Skip to main content

hydro_lang/live_collections/
singleton.rs

1//! Definitions for the [`Singleton`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::{Deref, Not};
6use std::rc::Rc;
7
8use sealed::sealed;
9use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q};
10
11use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
12use super::optional::Optional;
13use super::sliced::sliced;
14use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
15use crate::compile::builder::{CycleId, FlowState};
16use crate::compile::ir::{
17    CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, SingletonBoundKind,
18};
19#[cfg(stageleft_runtime)]
20use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
21use crate::forward_handle::{ForwardRef, TickCycle};
22#[cfg(stageleft_runtime)]
23use crate::location::dynamic::{DynLocation, LocationId};
24use crate::location::tick::{Atomic, NoAtomic};
25use crate::location::{Location, NoTick, Tick, check_matching_location};
26use crate::nondet::{NonDet, nondet};
27use crate::properties::{
28    ApplyMonotoneStream, ApplyOrderPreservingSingleton, MapFuncAlgebra, Proved,
29};
30
31/// A marker trait indicating which components of a [`Singleton`] may change.
32///
33/// In addition to [`Bounded`] (immutable) and [`Unbounded`] (arbitrarily mutable), this also
34/// includes an additional variant [`Monotonic`], which means that the value will only grow.
35pub trait SingletonBound {
36    /// The [`Boundedness`] that this [`Singleton`] would be erased to.
37    type UnderlyingBound: Boundedness + ApplyMonotoneStream<Proved, Self::StreamToMonotone>;
38
39    /// The [`Boundedness`] of this [`Singleton`] if it is produced from a [`Stream`] with [`Self`] boundedness.
40    type StreamToMonotone: SingletonBound<UnderlyingBound = Self::UnderlyingBound>;
41
42    /// Returns the [`SingletonBoundKind`] corresponding to this type.
43    fn bound_kind() -> SingletonBoundKind;
44}
45
46impl SingletonBound for Unbounded {
47    type UnderlyingBound = Unbounded;
48
49    type StreamToMonotone = Monotonic;
50
51    fn bound_kind() -> SingletonBoundKind {
52        SingletonBoundKind::Unbounded
53    }
54}
55
56impl SingletonBound for Bounded {
57    type UnderlyingBound = Bounded;
58
59    type StreamToMonotone = Bounded;
60
61    fn bound_kind() -> SingletonBoundKind {
62        SingletonBoundKind::Bounded
63    }
64}
65
66/// Marks that the [`Singleton`] is monotonic, which means that its value will only grow over time.
67pub struct Monotonic;
68
69impl SingletonBound for Monotonic {
70    type UnderlyingBound = Unbounded;
71
72    type StreamToMonotone = Monotonic;
73
74    fn bound_kind() -> SingletonBoundKind {
75        SingletonBoundKind::Monotonic
76    }
77}
78
79#[sealed]
80#[diagnostic::on_unimplemented(
81    message = "The input singleton must be monotonic (`Monotonic`) or bounded (`Bounded`), but has bound `{Self}`. Strengthen the monotonicity upstream or consider a different API.",
82    label = "required here",
83    note = "To intentionally process a non-deterministic snapshot or batch, you may want to use a `sliced!` region. This introduces non-determinism so avoid unless necessary."
84)]
85/// Marker trait that is implemented for the [`Monotonic`] boundedness guarantee.
86pub trait IsMonotonic: SingletonBound {}
87
88#[sealed]
89#[diagnostic::do_not_recommend]
90impl IsMonotonic for Monotonic {}
91
92#[sealed]
93#[diagnostic::do_not_recommend]
94impl<B: IsBounded> IsMonotonic for B {}
95
96/// A single Rust value that can asynchronously change over time.
97///
98/// If the singleton is [`Bounded`], the value is frozen and will not change. But if it is
99/// [`Unbounded`], the value will asynchronously change over time.
100///
101/// Singletons are often used to capture state in a Hydro program, such as an event counter which is
102/// a single number that will asynchronously change as events are processed. Singletons also appear
103/// when dealing with bounded collections, to perform regular Rust computations on concrete values,
104/// such as getting the length of a batch of requests.
105///
106/// Type Parameters:
107/// - `Type`: the type of the value in this singleton
108/// - `Loc`: the [`Location`] where the singleton is materialized
109/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
110pub struct Singleton<Type, Loc, Bound: SingletonBound> {
111    pub(crate) location: Loc,
112    pub(crate) ir_node: RefCell<HydroNode>,
113    pub(crate) flow_state: FlowState,
114
115    _phantom: PhantomData<(Type, Loc, Bound)>,
116}
117
118impl<T, L, B: SingletonBound> Drop for Singleton<T, L, B> {
119    fn drop(&mut self) {
120        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
121        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
122            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
123                input: Box::new(ir_node),
124                op_metadata: HydroIrOpMetadata::new(),
125            });
126        }
127    }
128}
129
130impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
131where
132    T: Clone,
133    L: Location<'a> + NoTick,
134{
135    fn from(value: Singleton<T, L, Bounded>) -> Self {
136        let tick = value.location().tick();
137        value.clone_into_tick(&tick).latest()
138    }
139}
140
141impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
142where
143    L: Location<'a>,
144{
145    type Location = Tick<L>;
146
147    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
148        let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
149            location.clone(),
150            HydroNode::DeferTick {
151                input: Box::new(HydroNode::CycleSource {
152                    cycle_id,
153                    metadata: location.new_node_metadata(Self::collection_kind()),
154                }),
155                metadata: location
156                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
157            },
158        );
159
160        from_previous_tick.unwrap_or(initial)
161    }
162}
163
164impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
165where
166    L: Location<'a>,
167{
168    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
169        assert_eq!(
170            Location::id(&self.location),
171            expected_location,
172            "locations do not match"
173        );
174        self.location
175            .flow_state()
176            .borrow_mut()
177            .push_root(HydroRoot::CycleSink {
178                cycle_id,
179                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
180                op_metadata: HydroIrOpMetadata::new(),
181            });
182    }
183}
184
185impl<'a, T, L> CycleCollection<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
186where
187    L: Location<'a>,
188{
189    type Location = Tick<L>;
190
191    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
192        Singleton::new(
193            location.clone(),
194            HydroNode::CycleSource {
195                cycle_id,
196                metadata: location.new_node_metadata(Self::collection_kind()),
197            },
198        )
199    }
200}
201
202impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
203where
204    L: Location<'a>,
205{
206    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
207        assert_eq!(
208            Location::id(&self.location),
209            expected_location,
210            "locations do not match"
211        );
212        self.location
213            .flow_state()
214            .borrow_mut()
215            .push_root(HydroRoot::CycleSink {
216                cycle_id,
217                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
218                op_metadata: HydroIrOpMetadata::new(),
219            });
220    }
221}
222
223impl<'a, T, L, B: SingletonBound> CycleCollection<'a, ForwardRef> for Singleton<T, L, B>
224where
225    L: Location<'a> + NoTick,
226{
227    type Location = L;
228
229    fn create_source(cycle_id: CycleId, location: L) -> Self {
230        Singleton::new(
231            location.clone(),
232            HydroNode::CycleSource {
233                cycle_id,
234                metadata: location.new_node_metadata(Self::collection_kind()),
235            },
236        )
237    }
238}
239
240impl<'a, T, L, B: SingletonBound> ReceiverComplete<'a, ForwardRef> for Singleton<T, L, B>
241where
242    L: Location<'a> + NoTick,
243{
244    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
245        assert_eq!(
246            Location::id(&self.location),
247            expected_location,
248            "locations do not match"
249        );
250        self.location
251            .flow_state()
252            .borrow_mut()
253            .push_root(HydroRoot::CycleSink {
254                cycle_id,
255                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
256                op_metadata: HydroIrOpMetadata::new(),
257            });
258    }
259}
260
261impl<'a, T, L, B: SingletonBound> Clone for Singleton<T, L, B>
262where
263    T: Clone,
264    L: Location<'a>,
265{
266    fn clone(&self) -> Self {
267        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
268            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
269            *self.ir_node.borrow_mut() = HydroNode::Tee {
270                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
271                metadata: self.location.new_node_metadata(Self::collection_kind()),
272            };
273        }
274
275        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
276            Singleton {
277                location: self.location.clone(),
278                flow_state: self.flow_state.clone(),
279                ir_node: HydroNode::Tee {
280                    inner: SharedNode(inner.0.clone()),
281                    metadata: metadata.clone(),
282                }
283                .into(),
284                _phantom: PhantomData,
285            }
286        } else {
287            unreachable!()
288        }
289    }
290}
291
292#[cfg(stageleft_runtime)]
293fn zip_inside_tick<'a, T, L: Location<'a>, B: SingletonBound, O>(
294    me: Singleton<T, Tick<L>, B>,
295    other: Optional<O, Tick<L>, B::UnderlyingBound>,
296) -> Optional<(T, O), Tick<L>, B::UnderlyingBound> {
297    let me_as_optional: Optional<T, Tick<L>, B::UnderlyingBound> = me.into();
298    super::optional::zip_inside_tick(me_as_optional, other)
299}
300
301impl<'a, T, L, B: SingletonBound> Singleton<T, L, B>
302where
303    L: Location<'a>,
304{
305    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
306        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
307        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
308        let flow_state = location.flow_state().clone();
309        Singleton {
310            location,
311            flow_state,
312            ir_node: RefCell::new(ir_node),
313            _phantom: PhantomData,
314        }
315    }
316
317    pub(crate) fn collection_kind() -> CollectionKind {
318        CollectionKind::Singleton {
319            bound: B::bound_kind(),
320            element_type: stageleft::quote_type::<T>().into(),
321        }
322    }
323
324    /// Returns the [`Location`] where this singleton is being materialized.
325    pub fn location(&self) -> &L {
326        &self.location
327    }
328
329    /// Drops the monotonicity property of the [`Singleton`].
330    pub fn ignore_monotonic(self) -> Singleton<T, L, B::UnderlyingBound> {
331        if B::bound_kind() == B::UnderlyingBound::bound_kind() {
332            Singleton::new(
333                self.location.clone(),
334                self.ir_node.replace(HydroNode::Placeholder),
335            )
336        } else {
337            Singleton::new(
338                self.location.clone(),
339                HydroNode::Cast {
340                    inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
341                    metadata:
342                        self.location.new_node_metadata(
343                            Singleton::<T, L, B::UnderlyingBound>::collection_kind(),
344                        ),
345                },
346            )
347        }
348    }
349
350    /// Transforms the singleton value by applying a function `f` to it,
351    /// continuously as the input is updated.
352    ///
353    /// # Example
354    /// ```rust
355    /// # #[cfg(feature = "deploy")] {
356    /// # use hydro_lang::prelude::*;
357    /// # use futures::StreamExt;
358    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
359    /// let tick = process.tick();
360    /// let singleton = tick.singleton(q!(5));
361    /// singleton.map(q!(|v| v * 2)).all_ticks()
362    /// # }, |mut stream| async move {
363    /// // 10
364    /// # assert_eq!(stream.next().await.unwrap(), 10);
365    /// # }));
366    /// # }
367    /// ```
368    pub fn map<U, F, OP, B2: SingletonBound>(
369        self,
370        f: impl IntoQuotedMut<'a, F, L, MapFuncAlgebra<OP>>,
371    ) -> Singleton<U, L, B2>
372    where
373        F: Fn(T) -> U + 'a,
374        B: ApplyOrderPreservingSingleton<OP, B2>,
375    {
376        let (f, proof) = f.splice_fn1_ctx_props(&self.location);
377        proof.register_proof(&f);
378        let f = f.into();
379        Singleton::new(
380            self.location.clone(),
381            HydroNode::Map {
382                f,
383                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
384                metadata: self
385                    .location
386                    .new_node_metadata(Singleton::<U, L, B2>::collection_kind()),
387            },
388        )
389    }
390
391    /// Transforms the singleton value by applying a function `f` to it and then flattening
392    /// the result into a stream, preserving the order of elements.
393    ///
394    /// The function `f` is applied to the singleton value to produce an iterator, and all items
395    /// from that iterator are emitted in the output stream in deterministic order.
396    ///
397    /// The implementation of [`Iterator`] for the output type `I` must produce items in a
398    /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
399    /// If the order is not deterministic, use [`Singleton::flat_map_unordered`] instead.
400    ///
401    /// # Example
402    /// ```rust
403    /// # #[cfg(feature = "deploy")] {
404    /// # use hydro_lang::prelude::*;
405    /// # use futures::StreamExt;
406    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
407    /// let tick = process.tick();
408    /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
409    /// singleton.flat_map_ordered(q!(|v| v)).all_ticks()
410    /// # }, |mut stream| async move {
411    /// // 1, 2, 3
412    /// # for w in vec![1, 2, 3] {
413    /// #     assert_eq!(stream.next().await.unwrap(), w);
414    /// # }
415    /// # }));
416    /// # }
417    /// ```
418    pub fn flat_map_ordered<U, I, F>(
419        self,
420        f: impl IntoQuotedMut<'a, F, L>,
421    ) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
422    where
423        B: IsBounded,
424        I: IntoIterator<Item = U>,
425        F: Fn(T) -> I + 'a,
426    {
427        self.into_stream().flat_map_ordered(f)
428    }
429
430    /// Like [`Singleton::flat_map_ordered`], but allows the implementation of [`Iterator`]
431    /// for the output type `I` to produce items in any order.
432    ///
433    /// The function `f` is applied to the singleton value to produce an iterator, and all items
434    /// from that iterator are emitted in the output stream in non-deterministic order.
435    ///
436    /// # Example
437    /// ```rust
438    /// # #[cfg(feature = "deploy")] {
439    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
440    /// # use futures::StreamExt;
441    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
442    /// let tick = process.tick();
443    /// let singleton = tick.singleton(q!(
444    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
445    /// ));
446    /// singleton.flat_map_unordered(q!(|v| v)).all_ticks()
447    /// # }, |mut stream| async move {
448    /// // 1, 2, 3, but in no particular order
449    /// # let mut results = Vec::new();
450    /// # for _ in 0..3 {
451    /// #     results.push(stream.next().await.unwrap());
452    /// # }
453    /// # results.sort();
454    /// # assert_eq!(results, vec![1, 2, 3]);
455    /// # }));
456    /// # }
457    /// ```
458    pub fn flat_map_unordered<U, I, F>(
459        self,
460        f: impl IntoQuotedMut<'a, F, L>,
461    ) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
462    where
463        B: IsBounded,
464        I: IntoIterator<Item = U>,
465        F: Fn(T) -> I + 'a,
466    {
467        self.into_stream().flat_map_unordered(f)
468    }
469
470    /// Flattens the singleton value into a stream, preserving the order of elements.
471    ///
472    /// The singleton value must implement [`IntoIterator`], and all items from that iterator
473    /// are emitted in the output stream in deterministic order.
474    ///
475    /// The implementation of [`Iterator`] for the element type `T` must produce items in a
476    /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
477    /// If the order is not deterministic, use [`Singleton::flatten_unordered`] instead.
478    ///
479    /// # Example
480    /// ```rust
481    /// # #[cfg(feature = "deploy")] {
482    /// # use hydro_lang::prelude::*;
483    /// # use futures::StreamExt;
484    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
485    /// let tick = process.tick();
486    /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
487    /// singleton.flatten_ordered().all_ticks()
488    /// # }, |mut stream| async move {
489    /// // 1, 2, 3
490    /// # for w in vec![1, 2, 3] {
491    /// #     assert_eq!(stream.next().await.unwrap(), w);
492    /// # }
493    /// # }));
494    /// # }
495    /// ```
496    pub fn flatten_ordered<U>(self) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
497    where
498        B: IsBounded,
499        T: IntoIterator<Item = U>,
500    {
501        self.flat_map_ordered(q!(|x| x))
502    }
503
504    /// Like [`Singleton::flatten_ordered`], but allows the implementation of [`Iterator`]
505    /// for the element type `T` to produce items in any order.
506    ///
507    /// The singleton value must implement [`IntoIterator`], and all items from that iterator
508    /// are emitted in the output stream in non-deterministic order.
509    ///
510    /// # Example
511    /// ```rust
512    /// # #[cfg(feature = "deploy")] {
513    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
514    /// # use futures::StreamExt;
515    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
516    /// let tick = process.tick();
517    /// let singleton = tick.singleton(q!(
518    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
519    /// ));
520    /// singleton.flatten_unordered().all_ticks()
521    /// # }, |mut stream| async move {
522    /// // 1, 2, 3, but in no particular order
523    /// # let mut results = Vec::new();
524    /// # for _ in 0..3 {
525    /// #     results.push(stream.next().await.unwrap());
526    /// # }
527    /// # results.sort();
528    /// # assert_eq!(results, vec![1, 2, 3]);
529    /// # }));
530    /// # }
531    /// ```
532    pub fn flatten_unordered<U>(self) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
533    where
534        B: IsBounded,
535        T: IntoIterator<Item = U>,
536    {
537        self.flat_map_unordered(q!(|x| x))
538    }
539
540    /// Creates an optional containing the singleton value if it satisfies a predicate `f`.
541    ///
542    /// If the predicate returns `true`, the output optional contains the same value.
543    /// If the predicate returns `false`, the output optional is empty.
544    ///
545    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
546    /// not modify or take ownership of the value. If you need to modify the value while filtering
547    /// use [`Singleton::filter_map`] instead.
548    ///
549    /// # Example
550    /// ```rust
551    /// # #[cfg(feature = "deploy")] {
552    /// # use hydro_lang::prelude::*;
553    /// # use futures::StreamExt;
554    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
555    /// let tick = process.tick();
556    /// let singleton = tick.singleton(q!(5));
557    /// singleton.filter(q!(|&x| x > 3)).all_ticks()
558    /// # }, |mut stream| async move {
559    /// // 5
560    /// # assert_eq!(stream.next().await.unwrap(), 5);
561    /// # }));
562    /// # }
563    /// ```
564    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B::UnderlyingBound>
565    where
566        F: Fn(&T) -> bool + 'a,
567    {
568        let f = f.splice_fn1_borrow_ctx(&self.location).into();
569        Optional::new(
570            self.location.clone(),
571            HydroNode::Filter {
572                f,
573                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
574                metadata: self
575                    .location
576                    .new_node_metadata(Optional::<T, L, B::UnderlyingBound>::collection_kind()),
577            },
578        )
579    }
580
581    /// An operator that both filters and maps. It yields the value only if the supplied
582    /// closure `f` returns `Some(value)`.
583    ///
584    /// If the closure returns `Some(new_value)`, the output optional contains `new_value`.
585    /// If the closure returns `None`, the output optional is empty.
586    ///
587    /// # Example
588    /// ```rust
589    /// # #[cfg(feature = "deploy")] {
590    /// # use hydro_lang::prelude::*;
591    /// # use futures::StreamExt;
592    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
593    /// let tick = process.tick();
594    /// let singleton = tick.singleton(q!("42"));
595    /// singleton
596    ///     .filter_map(q!(|s| s.parse::<i32>().ok()))
597    ///     .all_ticks()
598    /// # }, |mut stream| async move {
599    /// // 42
600    /// # assert_eq!(stream.next().await.unwrap(), 42);
601    /// # }));
602    /// # }
603    /// ```
604    pub fn filter_map<U, F>(
605        self,
606        f: impl IntoQuotedMut<'a, F, L>,
607    ) -> Optional<U, L, B::UnderlyingBound>
608    where
609        F: Fn(T) -> Option<U> + 'a,
610    {
611        let f = f.splice_fn1_ctx(&self.location).into();
612        Optional::new(
613            self.location.clone(),
614            HydroNode::FilterMap {
615                f,
616                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
617                metadata: self
618                    .location
619                    .new_node_metadata(Optional::<U, L, B::UnderlyingBound>::collection_kind()),
620            },
621        )
622    }
623
624    /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
625    ///
626    /// If the other value is a [`Singleton`], the output will be a [`Singleton`], but if it is an
627    /// [`Optional`], the output will be an [`Optional`] that is non-null only if the argument is
628    /// non-null. This is useful for combining several pieces of state together.
629    ///
630    /// # Example
631    /// ```rust
632    /// # #[cfg(feature = "deploy")] {
633    /// # use hydro_lang::prelude::*;
634    /// # use futures::StreamExt;
635    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
636    /// let tick = process.tick();
637    /// let numbers = process
638    ///   .source_iter(q!(vec![123, 456]))
639    ///   .batch(&tick, nondet!(/** test */));
640    /// let count = numbers.clone().count(); // Singleton
641    /// let max = numbers.max(); // Optional
642    /// count.zip(max).all_ticks()
643    /// # }, |mut stream| async move {
644    /// // [(2, 456)]
645    /// # for w in vec![(2, 456)] {
646    /// #     assert_eq!(stream.next().await.unwrap(), w);
647    /// # }
648    /// # }));
649    /// # }
650    /// ```
651    pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
652    where
653        Self: ZipResult<'a, O, Location = L>,
654        B: IsBounded,
655    {
656        check_matching_location(&self.location, &Self::other_location(&other));
657
658        if L::is_top_level()
659            && let Some(tick) = self.location.try_tick()
660        {
661            let other_location = <Self as ZipResult<'a, O>>::other_location(&other);
662            let out = zip_inside_tick(
663                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
664                Optional::<<Self as ZipResult<'a, O>>::OtherType, L, B>::new(
665                    other_location.clone(),
666                    HydroNode::Cast {
667                        inner: Box::new(Self::other_ir_node(other)),
668                        metadata: other_location.new_node_metadata(Optional::<
669                            <Self as ZipResult<'a, O>>::OtherType,
670                            Tick<L>,
671                            Bounded,
672                        >::collection_kind(
673                        )),
674                    },
675                )
676                .snapshot(&tick, nondet!(/** eventually stabilizes */)),
677            )
678            .latest();
679
680            Self::make(
681                out.location.clone(),
682                out.ir_node.replace(HydroNode::Placeholder),
683            )
684        } else {
685            Self::make(
686                self.location.clone(),
687                HydroNode::CrossSingleton {
688                    left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
689                    right: Box::new(Self::other_ir_node(other)),
690                    metadata: self.location.new_node_metadata(CollectionKind::Optional {
691                        bound: B::BOUND_KIND,
692                        element_type: stageleft::quote_type::<
693                            <Self as ZipResult<'a, O>>::ElementType,
694                        >()
695                        .into(),
696                    }),
697                },
698            )
699        }
700    }
701
702    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
703    /// boolean signal is `true`, otherwise the output is null.
704    ///
705    /// # Example
706    /// ```rust
707    /// # #[cfg(feature = "deploy")] {
708    /// # use hydro_lang::prelude::*;
709    /// # use futures::StreamExt;
710    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
711    /// let tick = process.tick();
712    /// // ticks are lazy by default, forces the second tick to run
713    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
714    ///
715    /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
716    /// let batch_first_tick = process
717    ///   .source_iter(q!(vec![1]))
718    ///   .batch(&tick, nondet!(/** test */));
719    /// let batch_second_tick = process
720    ///   .source_iter(q!(vec![1, 2, 3]))
721    ///   .batch(&tick, nondet!(/** test */))
722    ///   .defer_tick();
723    /// batch_first_tick.chain(batch_second_tick).count()
724    ///   .filter_if(signal)
725    ///   .all_ticks()
726    /// # }, |mut stream| async move {
727    /// // [1]
728    /// # for w in vec![1] {
729    /// #     assert_eq!(stream.next().await.unwrap(), w);
730    /// # }
731    /// # }));
732    /// # }
733    /// ```
734    pub fn filter_if(
735        self,
736        signal: Singleton<bool, L, B>,
737    ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
738    where
739        B: IsBounded,
740    {
741        self.zip(signal.filter(q!(|b| *b))).map(q!(|(d, _)| d))
742    }
743
744    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
745    /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
746    ///
747    /// Useful for conditionally processing, such as only emitting a singleton's value outside
748    /// a tick if some other condition is satisfied.
749    ///
750    /// # Example
751    /// ```rust
752    /// # #[cfg(feature = "deploy")] {
753    /// # use hydro_lang::prelude::*;
754    /// # use futures::StreamExt;
755    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
756    /// let tick = process.tick();
757    /// // ticks are lazy by default, forces the second tick to run
758    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
759    ///
760    /// let batch_first_tick = process
761    ///   .source_iter(q!(vec![1]))
762    ///   .batch(&tick, nondet!(/** test */));
763    /// let batch_second_tick = process
764    ///   .source_iter(q!(vec![1, 2, 3]))
765    ///   .batch(&tick, nondet!(/** test */))
766    ///   .defer_tick(); // appears on the second tick
767    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
768    /// batch_first_tick.chain(batch_second_tick).count()
769    ///   .filter_if_some(some_on_first_tick)
770    ///   .all_ticks()
771    /// # }, |mut stream| async move {
772    /// // [1]
773    /// # for w in vec![1] {
774    /// #     assert_eq!(stream.next().await.unwrap(), w);
775    /// # }
776    /// # }));
777    /// # }
778    /// ```
779    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
780    pub fn filter_if_some<U>(
781        self,
782        signal: Optional<U, L, B>,
783    ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
784    where
785        B: IsBounded,
786    {
787        self.filter_if(signal.is_some())
788    }
789
790    /// Filters this singleton into an [`Optional`], passing through the singleton value if the
791    /// argument (a [`Bounded`] [`Optional`]`) is null, otherwise the output is null.
792    ///
793    /// Like [`Singleton::filter_if_some`], this is useful for conditional processing, but inverts
794    /// the condition.
795    ///
796    /// # Example
797    /// ```rust
798    /// # #[cfg(feature = "deploy")] {
799    /// # use hydro_lang::prelude::*;
800    /// # use futures::StreamExt;
801    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
802    /// let tick = process.tick();
803    /// // ticks are lazy by default, forces the second tick to run
804    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
805    ///
806    /// let batch_first_tick = process
807    ///   .source_iter(q!(vec![1]))
808    ///   .batch(&tick, nondet!(/** test */));
809    /// let batch_second_tick = process
810    ///   .source_iter(q!(vec![1, 2, 3]))
811    ///   .batch(&tick, nondet!(/** test */))
812    ///   .defer_tick(); // appears on the second tick
813    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
814    /// batch_first_tick.chain(batch_second_tick).count()
815    ///   .filter_if_none(some_on_first_tick)
816    ///   .all_ticks()
817    /// # }, |mut stream| async move {
818    /// // [3]
819    /// # for w in vec![3] {
820    /// #     assert_eq!(stream.next().await.unwrap(), w);
821    /// # }
822    /// # }));
823    /// # }
824    /// ```
825    #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
826    pub fn filter_if_none<U>(
827        self,
828        other: Optional<U, L, B>,
829    ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
830    where
831        B: IsBounded,
832    {
833        self.filter_if(other.is_none())
834    }
835
836    /// Returns a [`Singleton`] containing `true` if this singleton's value equals the other's.
837    ///
838    /// # Example
839    /// ```rust
840    /// # #[cfg(feature = "deploy")] {
841    /// # use hydro_lang::prelude::*;
842    /// # use futures::StreamExt;
843    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
844    /// let tick = process.tick();
845    /// let a = tick.singleton(q!(5));
846    /// let b = tick.singleton(q!(5));
847    /// a.equals(b).all_ticks()
848    /// # }, |mut stream| async move {
849    /// // [true]
850    /// # assert_eq!(stream.next().await.unwrap(), true);
851    /// # }));
852    /// # }
853    /// ```
854    pub fn equals(self, other: Singleton<T, L, B>) -> Singleton<bool, L, B>
855    where
856        T: PartialEq,
857        B: IsBounded,
858    {
859        self.zip(other).map(q!(|(a, b)| a == b))
860    }
861
862    /// Returns a [`Stream`] that emits an event the first time the singleton has a value that is
863    /// greater than or equal to the provided threshold. The event will have the value of the
864    /// given threshold.
865    ///
866    /// This requires the incoming singleton to be monotonic, because otherwise the detection of
867    /// the threshold would be non-deterministic.
868    ///
869    /// # Example
870    /// ```rust
871    /// # #[cfg(feature = "deploy")] {
872    /// # use hydro_lang::prelude::*;
873    /// # use futures::StreamExt;
874    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
875    /// let a = // singleton 1 ~> 5 ~> 10
876    /// # process.singleton(q!(5));
877    /// let b = process.singleton(q!(4));
878    /// a.threshold_greater_or_equal(b)
879    /// # }, |mut stream| async move {
880    /// // [4]
881    /// # assert_eq!(stream.next().await.unwrap(), 4);
882    /// # }));
883    /// # }
884    /// ```
885    pub fn threshold_greater_or_equal<B2: IsBounded>(
886        self,
887        threshold: Singleton<T, L, B2>,
888    ) -> Stream<T, L, B::UnderlyingBound>
889    where
890        T: Clone + PartialOrd,
891        B: IsMonotonic,
892    {
893        let threshold = threshold.make_bounded();
894        match self.try_make_bounded() {
895            Ok(bounded) => {
896                let uncasted = threshold
897                    .zip(bounded)
898                    .into_stream()
899                    .filter_map(q!(|(t, m)| if m < t { None } else { Some(t) }));
900
901                Stream::new(
902                    uncasted.location.clone(),
903                    uncasted.ir_node.replace(HydroNode::Placeholder),
904                )
905            }
906            Err(me) => {
907                let uncasted = sliced! {
908                    let me = use(me, nondet!(/** thresholds are deterministic */));
909                    let mut remaining_threshold = use::state(|l| {
910                        let as_option: Optional<_, _, _> = threshold.clone_into_tick(l).into();
911                        as_option
912                    });
913
914                    let (not_passed, passed) = remaining_threshold.zip(me).into_stream().partition(q!(|(t, m)| m < t));
915                    remaining_threshold = not_passed.first().map(q!(|(t, _)| t));
916                    passed.map(q!(|(t, _)| t))
917                };
918
919                Stream::new(
920                    uncasted.location.clone(),
921                    uncasted.ir_node.replace(HydroNode::Placeholder),
922                )
923            }
924        }
925    }
926
927    /// An operator which allows you to "name" a `HydroNode`.
928    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
929    pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B> {
930        {
931            let mut node = self.ir_node.borrow_mut();
932            let metadata = node.metadata_mut();
933            metadata.tag = Some(name.to_owned());
934        }
935        self
936    }
937}
938
939impl<'a, L: Location<'a>, B: SingletonBound> Not for Singleton<bool, L, B> {
940    type Output = Singleton<bool, L, B::UnderlyingBound>;
941
942    fn not(self) -> Self::Output {
943        self.map(q!(|b| !b))
944    }
945}
946
947impl<'a, T, L, B: SingletonBound> Singleton<Option<T>, L, B>
948where
949    L: Location<'a>,
950{
951    /// Converts a `Singleton<Option<U>, L, B>` into an `Optional<U, L, B>` by unwrapping
952    /// the inner `Option`.
953    ///
954    /// This is implemented as an identity [`Singleton::filter_map`], passing through the
955    /// `Option<U>` directly. If the singleton's value is `Some(v)`, the resulting
956    /// [`Optional`] contains `v`; if `None`, the [`Optional`] is empty.
957    ///
958    /// # Example
959    /// ```rust
960    /// # #[cfg(feature = "deploy")] {
961    /// # use hydro_lang::prelude::*;
962    /// # use futures::StreamExt;
963    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
964    /// let tick = process.tick();
965    /// let singleton = tick.singleton(q!(Some(42)));
966    /// singleton.into_optional().all_ticks()
967    /// # }, |mut stream| async move {
968    /// // 42
969    /// # assert_eq!(stream.next().await.unwrap(), 42);
970    /// # }));
971    /// # }
972    /// ```
973    pub fn into_optional(self) -> Optional<T, L, B::UnderlyingBound> {
974        self.filter_map(q!(|v| v))
975    }
976}
977
978impl<'a, L, B: SingletonBound> Singleton<bool, L, B>
979where
980    L: Location<'a>,
981{
982    /// Returns a [`Singleton`] containing the logical AND of this and another boolean singleton.
983    ///
984    /// # Example
985    /// ```rust
986    /// # #[cfg(feature = "deploy")] {
987    /// # use hydro_lang::prelude::*;
988    /// # use futures::StreamExt;
989    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
990    /// let tick = process.tick();
991    /// // ticks are lazy by default, forces the second tick to run
992    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
993    ///
994    /// let a = tick.optional_first_tick(q!(())).is_some(); // true, false
995    /// let b = tick.singleton(q!(true)); // true, true
996    /// a.and(b).all_ticks()
997    /// # }, |mut stream| async move {
998    /// // [true, false]
999    /// # for w in vec![true, false] {
1000    /// #     assert_eq!(stream.next().await.unwrap(), w);
1001    /// # }
1002    /// # }));
1003    /// # }
1004    /// ```
1005    pub fn and(self, other: Singleton<bool, L, B>) -> Singleton<bool, L, Bounded>
1006    where
1007        B: IsBounded,
1008    {
1009        self.zip(other).map(q!(|(a, b)| a && b)).make_bounded()
1010    }
1011
1012    /// Returns a [`Singleton`] containing the logical OR of this and another boolean singleton.
1013    ///
1014    /// # Example
1015    /// ```rust
1016    /// # #[cfg(feature = "deploy")] {
1017    /// # use hydro_lang::prelude::*;
1018    /// # use futures::StreamExt;
1019    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1020    /// let tick = process.tick();
1021    /// // ticks are lazy by default, forces the second tick to run
1022    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1023    ///
1024    /// let a = tick.optional_first_tick(q!(())).is_some(); // true, false
1025    /// let b = tick.singleton(q!(false)); // false, false
1026    /// a.or(b).all_ticks()
1027    /// # }, |mut stream| async move {
1028    /// // [true, false]
1029    /// # for w in vec![true, false] {
1030    /// #     assert_eq!(stream.next().await.unwrap(), w);
1031    /// # }
1032    /// # }));
1033    /// # }
1034    /// ```
1035    pub fn or(self, other: Singleton<bool, L, B>) -> Singleton<bool, L, Bounded>
1036    where
1037        B: IsBounded,
1038    {
1039        self.zip(other).map(q!(|(a, b)| a || b)).make_bounded()
1040    }
1041}
1042
1043impl<'a, T, L, B: SingletonBound> Singleton<T, Atomic<L>, B>
1044where
1045    L: Location<'a> + NoTick,
1046{
1047    /// Returns a singleton value corresponding to the latest snapshot of the singleton
1048    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
1049    /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
1050    /// all snapshots of this singleton into the atomic-associated tick will observe the
1051    /// same value each tick.
1052    ///
1053    /// # Non-Determinism
1054    /// Because this picks a snapshot of a singleton whose value is continuously changing,
1055    /// the output singleton has a non-deterministic value since the snapshot can be at an
1056    /// arbitrary point in time.
1057    pub fn snapshot_atomic(
1058        self,
1059        tick: &Tick<L>,
1060        _nondet: NonDet,
1061    ) -> Singleton<T, Tick<L>, Bounded> {
1062        Singleton::new(
1063            tick.clone(),
1064            HydroNode::Batch {
1065                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1066                metadata: tick
1067                    .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
1068            },
1069        )
1070    }
1071
1072    /// Returns this singleton back into a top-level, asynchronous execution context where updates
1073    /// to the value will be asynchronously propagated.
1074    pub fn end_atomic(self) -> Singleton<T, L, B> {
1075        Singleton::new(
1076            self.location.tick.l.clone(),
1077            HydroNode::EndAtomic {
1078                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1079                metadata: self
1080                    .location
1081                    .tick
1082                    .l
1083                    .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
1084            },
1085        )
1086    }
1087}
1088
1089impl<'a, T, L, B: SingletonBound> Singleton<T, L, B>
1090where
1091    L: Location<'a>,
1092{
1093    /// Shifts this singleton into an atomic context, which guarantees that any downstream logic
1094    /// will observe the same version of the value and will be executed synchronously before any
1095    /// outputs are yielded (in [`Optional::end_atomic`]).
1096    ///
1097    /// This is useful to enforce local consistency constraints, such as ensuring that several readers
1098    /// see a consistent version of local state (since otherwise each [`Singleton::snapshot`] may pick
1099    /// a different version).
1100    pub fn atomic(self) -> Singleton<T, Atomic<L>, B> {
1101        let id = self.location.flow_state().borrow_mut().next_clock_id();
1102        let out_location = Atomic {
1103            tick: Tick {
1104                id,
1105                l: self.location.clone(),
1106            },
1107        };
1108        Singleton::new(
1109            out_location.clone(),
1110            HydroNode::BeginAtomic {
1111                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1112                metadata: out_location
1113                    .new_node_metadata(Singleton::<T, Atomic<L>, B>::collection_kind()),
1114            },
1115        )
1116    }
1117
1118    /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton
1119    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
1120    /// relevant data that contributed to the snapshot at tick `t`.
1121    ///
1122    /// # Non-Determinism
1123    /// Because this picks a snapshot of a singleton whose value is continuously changing,
1124    /// the output singleton has a non-deterministic value since the snapshot can be at an
1125    /// arbitrary point in time.
1126    pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
1127        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1128        Singleton::new(
1129            tick.clone(),
1130            HydroNode::Batch {
1131                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1132                metadata: tick
1133                    .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
1134            },
1135        )
1136    }
1137
1138    /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots
1139    /// with order corresponding to increasing prefixes of data contributing to the singleton.
1140    ///
1141    /// # Non-Determinism
1142    /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due
1143    /// to non-deterministic batching and arrival of inputs, the output stream is
1144    /// non-deterministic.
1145    pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1146    where
1147        L: NoTick,
1148    {
1149        sliced! {
1150            let snapshot = use(self, nondet);
1151            snapshot.into_stream()
1152        }
1153        .weaken_retries()
1154    }
1155
1156    /// Given a time interval, returns a stream corresponding to snapshots of the singleton
1157    /// value taken at various points in time. Because the input singleton may be
1158    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
1159    /// represent the value of the singleton given some prefix of the streams leading up to
1160    /// it.
1161    ///
1162    /// # Non-Determinism
1163    /// The output stream is non-deterministic in which elements are sampled, since this
1164    /// is controlled by a clock.
1165    pub fn sample_every(
1166        self,
1167        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1168        nondet: NonDet,
1169    ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1170    where
1171        L: NoTick + NoAtomic,
1172    {
1173        let samples = self.location.source_interval(interval, nondet);
1174        sliced! {
1175            let snapshot = use(self, nondet);
1176            let sample_batch = use(samples, nondet);
1177
1178            snapshot.filter_if(sample_batch.first().is_some()).into_stream()
1179        }
1180        .weaken_retries()
1181    }
1182
1183    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
1184    /// implies that `B == Bounded`.
1185    pub fn make_bounded(self) -> Singleton<T, L, Bounded>
1186    where
1187        B: IsBounded,
1188    {
1189        Singleton::new(
1190            self.location.clone(),
1191            self.ir_node.replace(HydroNode::Placeholder),
1192        )
1193    }
1194
1195    #[expect(clippy::result_large_err, reason = "internal use only")]
1196    fn try_make_bounded(self) -> Result<Singleton<T, L, Bounded>, Singleton<T, L, B>> {
1197        if B::UnderlyingBound::BOUNDED {
1198            Ok(Singleton::new(
1199                self.location.clone(),
1200                self.ir_node.replace(HydroNode::Placeholder),
1201            ))
1202        } else {
1203            Err(self)
1204        }
1205    }
1206
1207    /// Clones this bounded singleton into a tick, returning a singleton that has the
1208    /// same value as the outer singleton. Because the outer singleton is bounded, this
1209    /// is deterministic because there is only a single immutable version.
1210    pub fn clone_into_tick(self, tick: &Tick<L>) -> Singleton<T, Tick<L>, Bounded>
1211    where
1212        B: IsBounded,
1213        T: Clone,
1214    {
1215        // TODO(shadaj): avoid printing simulator logs for this snapshot
1216        self.snapshot(
1217            tick,
1218            nondet!(/** bounded top-level singleton so deterministic */),
1219        )
1220    }
1221
1222    /// Converts this singleton into a [`Stream`] containing a single element, the value.
1223    ///
1224    /// # Example
1225    /// ```rust
1226    /// # #[cfg(feature = "deploy")] {
1227    /// # use hydro_lang::prelude::*;
1228    /// # use futures::StreamExt;
1229    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1230    /// let tick = process.tick();
1231    /// let batch_input = process
1232    ///   .source_iter(q!(vec![123, 456]))
1233    ///   .batch(&tick, nondet!(/** test */));
1234    /// batch_input.clone().chain(
1235    ///   batch_input.count().into_stream()
1236    /// ).all_ticks()
1237    /// # }, |mut stream| async move {
1238    /// // [123, 456, 2]
1239    /// # for w in vec![123, 456, 2] {
1240    /// #     assert_eq!(stream.next().await.unwrap(), w);
1241    /// # }
1242    /// # }));
1243    /// # }
1244    /// ```
1245    pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce>
1246    where
1247        B: IsBounded,
1248    {
1249        Stream::new(
1250            self.location.clone(),
1251            HydroNode::Cast {
1252                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1253                metadata: self.location.new_node_metadata(Stream::<
1254                    T,
1255                    Tick<L>,
1256                    Bounded,
1257                    TotalOrder,
1258                    ExactlyOnce,
1259                >::collection_kind()),
1260            },
1261        )
1262    }
1263
1264    /// Resolves the singleton's [`Future`] value by blocking until it completes,
1265    /// producing a singleton of the resolved output.
1266    ///
1267    /// This is useful when the singleton contains an async computation that must
1268    /// be awaited before further processing. The future is polled to completion
1269    /// before the output value is emitted.
1270    ///
1271    /// # Example
1272    /// ```rust
1273    /// # #[cfg(feature = "deploy")] {
1274    /// # use hydro_lang::prelude::*;
1275    /// # use futures::StreamExt;
1276    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1277    /// let tick = process.tick();
1278    /// let singleton = tick.singleton(q!(5));
1279    /// singleton
1280    ///     .map(q!(|v| async move { v * 2 }))
1281    ///     .resolve_future_blocking()
1282    ///     .all_ticks()
1283    /// # }, |mut stream| async move {
1284    /// // 10
1285    /// # assert_eq!(stream.next().await.unwrap(), 10);
1286    /// # }));
1287    /// # }
1288    /// ```
1289    pub fn resolve_future_blocking(
1290        self,
1291    ) -> Singleton<T::Output, L, <B as SingletonBound>::UnderlyingBound>
1292    where
1293        T: Future,
1294        B: IsBounded,
1295    {
1296        Singleton::new(
1297            self.location.clone(),
1298            HydroNode::ResolveFuturesBlocking {
1299                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1300                metadata: self
1301                    .location
1302                    .new_node_metadata(Singleton::<T::Output, L, B>::collection_kind()),
1303            },
1304        )
1305    }
1306}
1307
1308impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
1309where
1310    L: Location<'a>,
1311{
1312    /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1313    /// which will stream the value computed in _each_ tick as a separate stream element.
1314    ///
1315    /// Unlike [`Singleton::latest`], the value computed in each tick is emitted separately,
1316    /// producing one element in the output for each tick. This is useful for batched computations,
1317    /// where the results from each tick must be combined together.
1318    ///
1319    /// # Example
1320    /// ```rust
1321    /// # #[cfg(feature = "deploy")] {
1322    /// # use hydro_lang::prelude::*;
1323    /// # use futures::StreamExt;
1324    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1325    /// let tick = process.tick();
1326    /// # // ticks are lazy by default, forces the second tick to run
1327    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1328    /// # let batch_first_tick = process
1329    /// #   .source_iter(q!(vec![1]))
1330    /// #   .batch(&tick, nondet!(/** test */));
1331    /// # let batch_second_tick = process
1332    /// #   .source_iter(q!(vec![1, 2, 3]))
1333    /// #   .batch(&tick, nondet!(/** test */))
1334    /// #   .defer_tick(); // appears on the second tick
1335    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1336    /// input_batch // first tick: [1], second tick: [1, 2, 3]
1337    ///     .count()
1338    ///     .all_ticks()
1339    /// # }, |mut stream| async move {
1340    /// // [1, 3]
1341    /// # for w in vec![1, 3] {
1342    /// #     assert_eq!(stream.next().await.unwrap(), w);
1343    /// # }
1344    /// # }));
1345    /// # }
1346    /// ```
1347    pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1348        self.into_stream().all_ticks()
1349    }
1350
1351    /// Synchronously yields the value of this singleton outside the tick as an unbounded stream,
1352    /// which will stream the value computed in _each_ tick as a separate stream element.
1353    ///
1354    /// Unlike [`Singleton::all_ticks`], this preserves synchronous execution, as the output stream
1355    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1356    /// singleton's [`Tick`] context.
1357    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1358        self.into_stream().all_ticks_atomic()
1359    }
1360
1361    /// Asynchronously yields this singleton outside the tick as an unbounded singleton, which will
1362    /// be asynchronously updated with the latest value of the singleton inside the tick.
1363    ///
1364    /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1365    /// tick that tracks the inner value. This is useful for getting the value as of the
1366    /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1367    ///
1368    /// # Example
1369    /// ```rust
1370    /// # #[cfg(feature = "deploy")] {
1371    /// # use hydro_lang::prelude::*;
1372    /// # use futures::StreamExt;
1373    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1374    /// let tick = process.tick();
1375    /// # // ticks are lazy by default, forces the second tick to run
1376    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1377    /// # let batch_first_tick = process
1378    /// #   .source_iter(q!(vec![1]))
1379    /// #   .batch(&tick, nondet!(/** test */));
1380    /// # let batch_second_tick = process
1381    /// #   .source_iter(q!(vec![1, 2, 3]))
1382    /// #   .batch(&tick, nondet!(/** test */))
1383    /// #   .defer_tick(); // appears on the second tick
1384    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1385    /// input_batch // first tick: [1], second tick: [1, 2, 3]
1386    ///     .count()
1387    ///     .latest()
1388    /// # .sample_eager(nondet!(/** test */))
1389    /// # }, |mut stream| async move {
1390    /// // asynchronously changes from 1 ~> 3
1391    /// # for w in vec![1, 3] {
1392    /// #     assert_eq!(stream.next().await.unwrap(), w);
1393    /// # }
1394    /// # }));
1395    /// # }
1396    /// ```
1397    pub fn latest(self) -> Singleton<T, L, Unbounded> {
1398        Singleton::new(
1399            self.location.outer().clone(),
1400            HydroNode::YieldConcat {
1401                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1402                metadata: self
1403                    .location
1404                    .outer()
1405                    .new_node_metadata(Singleton::<T, L, Unbounded>::collection_kind()),
1406            },
1407        )
1408    }
1409
1410    /// Synchronously yields this singleton outside the tick as an unbounded singleton, which will
1411    /// be updated with the latest value of the singleton inside the tick.
1412    ///
1413    /// Unlike [`Singleton::latest`], this preserves synchronous execution, as the output singleton
1414    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1415    /// singleton's [`Tick`] context.
1416    pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
1417        let out_location = Atomic {
1418            tick: self.location.clone(),
1419        };
1420        Singleton::new(
1421            out_location.clone(),
1422            HydroNode::YieldConcat {
1423                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1424                metadata: out_location
1425                    .new_node_metadata(Singleton::<T, Atomic<L>, Unbounded>::collection_kind()),
1426            },
1427        )
1428    }
1429}
1430
1431#[doc(hidden)]
1432/// Helper trait that determines the output collection type for [`Singleton::zip`].
1433///
1434/// The output will be an [`Optional`] if the second input is an [`Optional`], otherwise it is a
1435/// [`Singleton`].
1436#[sealed::sealed]
1437pub trait ZipResult<'a, Other> {
1438    /// The output collection type.
1439    type Out;
1440    /// The type of the tupled output value.
1441    type ElementType;
1442    /// The type of the other collection's value.
1443    type OtherType;
1444    /// The location where the tupled result will be materialized.
1445    type Location: Location<'a>;
1446
1447    /// The location of the second input to the `zip`.
1448    fn other_location(other: &Other) -> Self::Location;
1449    /// The IR node of the second input to the `zip`.
1450    fn other_ir_node(other: Other) -> HydroNode;
1451
1452    /// Constructs the output live collection given an IR node containing the zip result.
1453    fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
1454}
1455
1456#[sealed::sealed]
1457impl<'a, T, U, L, B: SingletonBound> ZipResult<'a, Singleton<U, L, B>> for Singleton<T, L, B>
1458where
1459    L: Location<'a>,
1460{
1461    type Out = Singleton<(T, U), L, B>;
1462    type ElementType = (T, U);
1463    type OtherType = U;
1464    type Location = L;
1465
1466    fn other_location(other: &Singleton<U, L, B>) -> L {
1467        other.location.clone()
1468    }
1469
1470    fn other_ir_node(other: Singleton<U, L, B>) -> HydroNode {
1471        other.ir_node.replace(HydroNode::Placeholder)
1472    }
1473
1474    fn make(location: L, ir_node: HydroNode) -> Self::Out {
1475        Singleton::new(
1476            location.clone(),
1477            HydroNode::Cast {
1478                inner: Box::new(ir_node),
1479                metadata: location.new_node_metadata(Self::Out::collection_kind()),
1480            },
1481        )
1482    }
1483}
1484
1485#[sealed::sealed]
1486impl<'a, T, U, L, B: SingletonBound> ZipResult<'a, Optional<U, L, B::UnderlyingBound>>
1487    for Singleton<T, L, B>
1488where
1489    L: Location<'a>,
1490{
1491    type Out = Optional<(T, U), L, B::UnderlyingBound>;
1492    type ElementType = (T, U);
1493    type OtherType = U;
1494    type Location = L;
1495
1496    fn other_location(other: &Optional<U, L, B::UnderlyingBound>) -> L {
1497        other.location.clone()
1498    }
1499
1500    fn other_ir_node(other: Optional<U, L, B::UnderlyingBound>) -> HydroNode {
1501        other.ir_node.replace(HydroNode::Placeholder)
1502    }
1503
1504    fn make(location: L, ir_node: HydroNode) -> Self::Out {
1505        Optional::new(location, ir_node)
1506    }
1507}
1508
1509#[cfg(test)]
1510mod tests {
1511    #[cfg(feature = "deploy")]
1512    use futures::{SinkExt, StreamExt};
1513    #[cfg(feature = "deploy")]
1514    use hydro_deploy::Deployment;
1515    #[cfg(any(feature = "deploy", feature = "sim"))]
1516    use stageleft::q;
1517
1518    #[cfg(any(feature = "deploy", feature = "sim"))]
1519    use crate::compile::builder::FlowBuilder;
1520    #[cfg(feature = "deploy")]
1521    use crate::live_collections::stream::ExactlyOnce;
1522    #[cfg(any(feature = "deploy", feature = "sim"))]
1523    use crate::location::Location;
1524    #[cfg(any(feature = "deploy", feature = "sim"))]
1525    use crate::nondet::nondet;
1526
1527    #[cfg(feature = "deploy")]
1528    #[tokio::test]
1529    async fn tick_cycle_cardinality() {
1530        let mut deployment = Deployment::new();
1531
1532        let mut flow = FlowBuilder::new();
1533        let node = flow.process::<()>();
1534        let external = flow.external::<()>();
1535
1536        let (input_send, input) = node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);
1537
1538        let node_tick = node.tick();
1539        let (complete_cycle, singleton) = node_tick.cycle_with_initial(node_tick.singleton(q!(0)));
1540        let counts = singleton
1541            .clone()
1542            .into_stream()
1543            .count()
1544            .filter_if(
1545                input
1546                    .batch(&node_tick, nondet!(/** testing */))
1547                    .first()
1548                    .is_some(),
1549            )
1550            .all_ticks()
1551            .send_bincode_external(&external);
1552        complete_cycle.complete_next_tick(singleton);
1553
1554        let nodes = flow
1555            .with_process(&node, deployment.Localhost())
1556            .with_external(&external, deployment.Localhost())
1557            .deploy(&mut deployment);
1558
1559        deployment.deploy().await.unwrap();
1560
1561        let mut tick_trigger = nodes.connect(input_send).await;
1562        let mut external_out = nodes.connect(counts).await;
1563
1564        deployment.start().await.unwrap();
1565
1566        tick_trigger.send(()).await.unwrap();
1567
1568        assert_eq!(external_out.next().await.unwrap(), 1);
1569
1570        tick_trigger.send(()).await.unwrap();
1571
1572        assert_eq!(external_out.next().await.unwrap(), 1);
1573    }
1574
1575    #[cfg(feature = "sim")]
1576    #[test]
1577    #[should_panic]
1578    fn sim_fold_intermediate_states() {
1579        let mut flow = FlowBuilder::new();
1580        let node = flow.process::<()>();
1581
1582        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1583        let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));
1584
1585        let tick = node.tick();
1586        let batch = folded.snapshot(&tick, nondet!(/** test */));
1587        let out_recv = batch.all_ticks().sim_output();
1588
1589        flow.sim().exhaustive(async || {
1590            assert_eq!(out_recv.next().await.unwrap(), 10);
1591        });
1592    }
1593
1594    #[cfg(feature = "sim")]
1595    #[test]
1596    fn sim_fold_intermediate_state_count() {
1597        let mut flow = FlowBuilder::new();
1598        let node = flow.process::<()>();
1599
1600        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1601        let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));
1602
1603        let tick = node.tick();
1604        let batch = folded.snapshot(&tick, nondet!(/** test */));
1605        let out_recv = batch.all_ticks().sim_output();
1606
1607        let instance_count = flow.sim().exhaustive(async || {
1608            let out = out_recv.collect::<Vec<_>>().await;
1609            assert_eq!(out.last(), Some(&10));
1610        });
1611
1612        assert_eq!(
1613            instance_count,
1614            16 // 2^4 possible subsets of intermediates (including initial state)
1615        )
1616    }
1617
1618    #[cfg(feature = "sim")]
1619    #[test]
1620    fn sim_fold_no_repeat_initial() {
1621        // check that we don't repeat the initial state of the fold in autonomous decisions
1622
1623        let mut flow = FlowBuilder::new();
1624        let node = flow.process::<()>();
1625
1626        let (in_port, input) = node.sim_input();
1627        let folded = input.fold(q!(|| 0), q!(|a, b| *a += b));
1628
1629        let tick = node.tick();
1630        let batch = folded.snapshot(&tick, nondet!(/** test */));
1631        let out_recv = batch.all_ticks().sim_output();
1632
1633        flow.sim().exhaustive(async || {
1634            assert_eq!(out_recv.next().await.unwrap(), 0);
1635
1636            in_port.send(123);
1637
1638            assert_eq!(out_recv.next().await.unwrap(), 123);
1639        });
1640    }
1641
1642    #[cfg(feature = "sim")]
1643    #[test]
1644    #[should_panic]
1645    fn sim_fold_repeats_snapshots() {
1646        // when the tick is driven by a snapshot AND something else, the snapshot can
1647        // "stutter" and repeat the same state multiple times
1648
1649        let mut flow = FlowBuilder::new();
1650        let node = flow.process::<()>();
1651
1652        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
1653        let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));
1654
1655        let tick = node.tick();
1656        let batch = source
1657            .batch(&tick, nondet!(/** test */))
1658            .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
1659        let out_recv = batch.all_ticks().sim_output();
1660
1661        flow.sim().exhaustive(async || {
1662            if out_recv.next().await.unwrap() == (1, 3) && out_recv.next().await.unwrap() == (2, 3)
1663            {
1664                panic!("repeated snapshot");
1665            }
1666        });
1667    }
1668
1669    #[cfg(feature = "sim")]
1670    #[test]
1671    fn sim_fold_repeats_snapshots_count() {
1672        // check the number of instances
1673        let mut flow = FlowBuilder::new();
1674        let node = flow.process::<()>();
1675
1676        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2])));
1677        let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));
1678
1679        let tick = node.tick();
1680        let batch = source
1681            .batch(&tick, nondet!(/** test */))
1682            .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
1683        let out_recv = batch.all_ticks().sim_output();
1684
1685        let count = flow.sim().exhaustive(async || {
1686            let _ = out_recv.collect::<Vec<_>>().await;
1687        });
1688
1689        assert_eq!(count, 52);
1690        // don't have a combinatorial explanation for this number yet, but checked via logs
1691    }
1692
1693    #[cfg(feature = "sim")]
1694    #[test]
1695    fn sim_top_level_singleton_exhaustive() {
1696        // ensures that top-level singletons have only one snapshot
1697        let mut flow = FlowBuilder::new();
1698        let node = flow.process::<()>();
1699
1700        let singleton = node.singleton(q!(1));
1701        let tick = node.tick();
1702        let batch = singleton.snapshot(&tick, nondet!(/** test */));
1703        let out_recv = batch.all_ticks().sim_output();
1704
1705        let count = flow.sim().exhaustive(async || {
1706            let _ = out_recv.collect::<Vec<_>>().await;
1707        });
1708
1709        assert_eq!(count, 1);
1710    }
1711
1712    #[cfg(feature = "sim")]
1713    #[test]
1714    fn sim_top_level_singleton_join_count() {
1715        // if a tick consumes a static snapshot and a stream batch, only the batch require space
1716        // exploration
1717
1718        let mut flow = FlowBuilder::new();
1719        let node = flow.process::<()>();
1720
1721        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1722        let tick = node.tick();
1723        let batch = source_iter
1724            .batch(&tick, nondet!(/** test */))
1725            .cross_singleton(node.singleton(q!(123)).clone_into_tick(&tick));
1726        let out_recv = batch.all_ticks().sim_output();
1727
1728        let instance_count = flow.sim().exhaustive(async || {
1729            let _ = out_recv.collect::<Vec<_>>().await;
1730        });
1731
1732        assert_eq!(
1733            instance_count,
1734            16 // 2^4 ways to split up (including a possibly empty first batch)
1735        )
1736    }
1737
1738    #[cfg(feature = "sim")]
1739    #[test]
1740    fn top_level_singleton_into_stream_no_replay() {
1741        let mut flow = FlowBuilder::new();
1742        let node = flow.process::<()>();
1743
1744        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1745        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1746
1747        let out_recv = folded.into_stream().sim_output();
1748
1749        flow.sim().exhaustive(async || {
1750            out_recv.assert_yields_only([10]).await;
1751        });
1752    }
1753
1754    #[cfg(feature = "sim")]
1755    #[test]
1756    fn inside_tick_singleton_zip() {
1757        use crate::live_collections::Stream;
1758        use crate::live_collections::sliced::sliced;
1759
1760        let mut flow = FlowBuilder::new();
1761        let node = flow.process::<()>();
1762
1763        let source_iter: Stream<_, _> = node.source_iter(q!(vec![1, 2])).into();
1764        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1765
1766        let out_recv = sliced! {
1767            let v = use(folded, nondet!(/** test */));
1768            v.clone().zip(v).into_stream()
1769        }
1770        .sim_output();
1771
1772        let count = flow.sim().exhaustive(async || {
1773            let out = out_recv.collect::<Vec<_>>().await;
1774            assert_eq!(out.last(), Some(&(3, 3)));
1775        });
1776
1777        assert_eq!(count, 4);
1778    }
1779}