Skip to main content

hydro_lang/live_collections/
optional.rs

1//! Definitions for the [`Optional`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9use syn::parse_quote;
10
11use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
12use super::singleton::Singleton;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::builder::{CycleId, FlowState};
15use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode};
16#[cfg(stageleft_runtime)]
17use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
18use crate::forward_handle::{ForwardRef, TickCycle};
19use crate::live_collections::singleton::SingletonBound;
20#[cfg(stageleft_runtime)]
21use crate::location::dynamic::{DynLocation, LocationId};
22use crate::location::tick::{Atomic, DeferTick, NoAtomic};
23use crate::location::{Location, NoTick, Tick, check_matching_location};
24use crate::nondet::{NonDet, nondet};
25use crate::prelude::KeyedSingleton;
26
27/// A *nullable* Rust value that can asynchronously change over time.
28///
29/// Optionals are the live collection equivalent of [`Option`]. If the optional is [`Bounded`],
30/// the value is frozen and will not change. But if it is [`Unbounded`], the value will
31/// asynchronously change over time, including becoming present of uninhabited.
32///
33/// Optionals are used in many of the same places as [`Singleton`], but when the value may be
34/// nullable. For example, the first element of a [`Stream`] is exposed as an [`Optional`].
35///
36/// Type Parameters:
37/// - `Type`: the type of the value in this optional (when it is not null)
38/// - `Loc`: the [`Location`] where the optional is materialized
39/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
40pub struct Optional<Type, Loc, Bound: Boundedness> {
41    pub(crate) location: Loc,
42    pub(crate) ir_node: RefCell<HydroNode>,
43    pub(crate) flow_state: FlowState,
44
45    _phantom: PhantomData<(Type, Loc, Bound)>,
46}
47
48impl<T, L, B: Boundedness> Drop for Optional<T, L, B> {
49    fn drop(&mut self) {
50        let ir_node = self.ir_node.replace(HydroNode::Placeholder);
51        if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
52            self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
53                input: Box::new(ir_node),
54                op_metadata: HydroIrOpMetadata::new(),
55            });
56        }
57    }
58}
59
60impl<'a, T, L> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded>
61where
62    T: Clone,
63    L: Location<'a> + NoTick,
64{
65    fn from(value: Optional<T, L, Bounded>) -> Self {
66        let tick = value.location().tick();
67        value.clone_into_tick(&tick).latest()
68    }
69}
70
71impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>
72where
73    L: Location<'a>,
74{
75    fn defer_tick(self) -> Self {
76        Optional::defer_tick(self)
77    }
78}
79
80impl<'a, T, L> CycleCollection<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
81where
82    L: Location<'a>,
83{
84    type Location = Tick<L>;
85
86    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
87        Optional::new(
88            location.clone(),
89            HydroNode::CycleSource {
90                cycle_id,
91                metadata: location.new_node_metadata(Self::collection_kind()),
92            },
93        )
94    }
95}
96
97impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
98where
99    L: Location<'a>,
100{
101    type Location = Tick<L>;
102
103    fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
104        let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
105            location.clone(),
106            HydroNode::DeferTick {
107                input: Box::new(HydroNode::CycleSource {
108                    cycle_id,
109                    metadata: location.new_node_metadata(Self::collection_kind()),
110                }),
111                metadata: location
112                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
113            },
114        );
115
116        from_previous_tick.or(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
117    }
118}
119
120impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
121where
122    L: Location<'a>,
123{
124    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
125        assert_eq!(
126            Location::id(&self.location),
127            expected_location,
128            "locations do not match"
129        );
130        self.location
131            .flow_state()
132            .borrow_mut()
133            .push_root(HydroRoot::CycleSink {
134                cycle_id,
135                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
136                op_metadata: HydroIrOpMetadata::new(),
137            });
138    }
139}
140
141impl<'a, T, L> CycleCollection<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
142where
143    L: Location<'a>,
144{
145    type Location = Tick<L>;
146
147    fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
148        Optional::new(
149            location.clone(),
150            HydroNode::CycleSource {
151                cycle_id,
152                metadata: location.new_node_metadata(Self::collection_kind()),
153            },
154        )
155    }
156}
157
158impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
159where
160    L: Location<'a>,
161{
162    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
163        assert_eq!(
164            Location::id(&self.location),
165            expected_location,
166            "locations do not match"
167        );
168        self.location
169            .flow_state()
170            .borrow_mut()
171            .push_root(HydroRoot::CycleSink {
172                cycle_id,
173                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
174                op_metadata: HydroIrOpMetadata::new(),
175            });
176    }
177}
178
179impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Optional<T, L, B>
180where
181    L: Location<'a> + NoTick,
182{
183    type Location = L;
184
185    fn create_source(cycle_id: CycleId, location: L) -> Self {
186        Optional::new(
187            location.clone(),
188            HydroNode::CycleSource {
189                cycle_id,
190                metadata: location.new_node_metadata(Self::collection_kind()),
191            },
192        )
193    }
194}
195
196impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Optional<T, L, B>
197where
198    L: Location<'a> + NoTick,
199{
200    fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
201        assert_eq!(
202            Location::id(&self.location),
203            expected_location,
204            "locations do not match"
205        );
206        self.location
207            .flow_state()
208            .borrow_mut()
209            .push_root(HydroRoot::CycleSink {
210                cycle_id,
211                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
212                op_metadata: HydroIrOpMetadata::new(),
213            });
214    }
215}
216
217impl<'a, T, L, B: SingletonBound> From<Singleton<T, L, B>> for Optional<T, L, B::UnderlyingBound>
218where
219    L: Location<'a>,
220{
221    fn from(singleton: Singleton<T, L, B>) -> Self {
222        Optional::new(
223            singleton.location.clone(),
224            HydroNode::Cast {
225                inner: Box::new(singleton.ir_node.replace(HydroNode::Placeholder)),
226                metadata: singleton
227                    .location
228                    .new_node_metadata(Self::collection_kind()),
229            },
230        )
231    }
232}
233
234#[cfg(stageleft_runtime)]
235pub(super) fn zip_inside_tick<'a, T, O, L: Location<'a>, B: Boundedness>(
236    me: Optional<T, L, B>,
237    other: Optional<O, L, B>,
238) -> Optional<(T, O), L, B> {
239    check_matching_location(&me.location, &other.location);
240
241    Optional::new(
242        me.location.clone(),
243        HydroNode::CrossSingleton {
244            left: Box::new(me.ir_node.replace(HydroNode::Placeholder)),
245            right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
246            metadata: me
247                .location
248                .new_node_metadata(Optional::<(T, O), L, B>::collection_kind()),
249        },
250    )
251}
252
253#[cfg(stageleft_runtime)]
254fn or_inside_tick<'a, T, L: Location<'a>, B: Boundedness>(
255    me: Optional<T, L, B>,
256    other: Optional<T, L, B>,
257) -> Optional<T, L, B> {
258    check_matching_location(&me.location, &other.location);
259
260    Optional::new(
261        me.location.clone(),
262        HydroNode::ChainFirst {
263            first: Box::new(me.ir_node.replace(HydroNode::Placeholder)),
264            second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
265            metadata: me
266                .location
267                .new_node_metadata(Optional::<T, L, B>::collection_kind()),
268        },
269    )
270}
271
272impl<'a, T, L, B: Boundedness> Clone for Optional<T, L, B>
273where
274    T: Clone,
275    L: Location<'a>,
276{
277    fn clone(&self) -> Self {
278        if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
279            let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
280            *self.ir_node.borrow_mut() = HydroNode::Tee {
281                inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
282                metadata: self.location.new_node_metadata(Self::collection_kind()),
283            };
284        }
285
286        if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
287            Optional {
288                location: self.location.clone(),
289                flow_state: self.flow_state.clone(),
290                ir_node: HydroNode::Tee {
291                    inner: SharedNode(inner.0.clone()),
292                    metadata: metadata.clone(),
293                }
294                .into(),
295                _phantom: PhantomData,
296            }
297        } else {
298            unreachable!()
299        }
300    }
301}
302
303impl<'a, T, L, B: Boundedness> Optional<T, L, B>
304where
305    L: Location<'a>,
306{
307    pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
308        debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
309        debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
310        let flow_state = location.flow_state().clone();
311        Optional {
312            location,
313            flow_state,
314            ir_node: RefCell::new(ir_node),
315            _phantom: PhantomData,
316        }
317    }
318
319    pub(crate) fn collection_kind() -> CollectionKind {
320        CollectionKind::Optional {
321            bound: B::BOUND_KIND,
322            element_type: stageleft::quote_type::<T>().into(),
323        }
324    }
325
326    /// Returns the [`Location`] where this optional is being materialized.
327    pub fn location(&self) -> &L {
328        &self.location
329    }
330
331    /// Transforms the optional value by applying a function `f` to it,
332    /// continuously as the input is updated.
333    ///
334    /// Whenever the optional is empty, the output optional is also empty.
335    ///
336    /// # Example
337    /// ```rust
338    /// # #[cfg(feature = "deploy")] {
339    /// # use hydro_lang::prelude::*;
340    /// # use futures::StreamExt;
341    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
342    /// let tick = process.tick();
343    /// let optional = tick.optional_first_tick(q!(1));
344    /// optional.map(q!(|v| v + 1)).all_ticks()
345    /// # }, |mut stream| async move {
346    /// // 2
347    /// # assert_eq!(stream.next().await.unwrap(), 2);
348    /// # }));
349    /// # }
350    /// ```
351    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
352    where
353        F: Fn(T) -> U + 'a,
354    {
355        let f = f.splice_fn1_ctx(&self.location).into();
356        Optional::new(
357            self.location.clone(),
358            HydroNode::Map {
359                f,
360                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
361                metadata: self
362                    .location
363                    .new_node_metadata(Optional::<U, L, B>::collection_kind()),
364            },
365        )
366    }
367
368    /// Transforms the optional value by applying a function `f` to it and then flattening
369    /// the result into a stream, preserving the order of elements.
370    ///
371    /// If the optional is empty, the output stream is also empty. If the optional contains
372    /// a value, `f` is applied to produce an iterator, and all items from that iterator
373    /// are emitted in the output stream in deterministic order.
374    ///
375    /// The implementation of [`Iterator`] for the output type `I` must produce items in a
376    /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
377    /// If the order is not deterministic, use [`Optional::flat_map_unordered`] instead.
378    ///
379    /// # Example
380    /// ```rust
381    /// # #[cfg(feature = "deploy")] {
382    /// # use hydro_lang::prelude::*;
383    /// # use futures::StreamExt;
384    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
385    /// let tick = process.tick();
386    /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
387    /// optional.flat_map_ordered(q!(|v| v)).all_ticks()
388    /// # }, |mut stream| async move {
389    /// // 1, 2, 3
390    /// # for w in vec![1, 2, 3] {
391    /// #     assert_eq!(stream.next().await.unwrap(), w);
392    /// # }
393    /// # }));
394    /// # }
395    /// ```
396    pub fn flat_map_ordered<U, I, F>(
397        self,
398        f: impl IntoQuotedMut<'a, F, L>,
399    ) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
400    where
401        B: IsBounded,
402        I: IntoIterator<Item = U>,
403        F: Fn(T) -> I + 'a,
404    {
405        self.into_stream().flat_map_ordered(f)
406    }
407
408    /// Like [`Optional::flat_map_ordered`], but allows the implementation of [`Iterator`]
409    /// for the output type `I` to produce items in any order.
410    ///
411    /// If the optional is empty, the output stream is also empty. If the optional contains
412    /// a value, `f` is applied to produce an iterator, and all items from that iterator
413    /// are emitted in the output stream in non-deterministic order.
414    ///
415    /// # Example
416    /// ```rust
417    /// # #[cfg(feature = "deploy")] {
418    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
419    /// # use futures::StreamExt;
420    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
421    /// let tick = process.tick();
422    /// let optional = tick.optional_first_tick(q!(
423    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
424    /// ));
425    /// optional.flat_map_unordered(q!(|v| v)).all_ticks()
426    /// # }, |mut stream| async move {
427    /// // 1, 2, 3, but in no particular order
428    /// # let mut results = Vec::new();
429    /// # for _ in 0..3 {
430    /// #     results.push(stream.next().await.unwrap());
431    /// # }
432    /// # results.sort();
433    /// # assert_eq!(results, vec![1, 2, 3]);
434    /// # }));
435    /// # }
436    /// ```
437    pub fn flat_map_unordered<U, I, F>(
438        self,
439        f: impl IntoQuotedMut<'a, F, L>,
440    ) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
441    where
442        B: IsBounded,
443        I: IntoIterator<Item = U>,
444        F: Fn(T) -> I + 'a,
445    {
446        self.into_stream().flat_map_unordered(f)
447    }
448
449    /// Flattens the optional value into a stream, preserving the order of elements.
450    ///
451    /// If the optional is empty, the output stream is also empty. If the optional contains
452    /// a value that implements [`IntoIterator`], all items from that iterator are emitted
453    /// in the output stream in deterministic order.
454    ///
455    /// The implementation of [`Iterator`] for the element type `T` must produce items in a
456    /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
457    /// If the order is not deterministic, use [`Optional::flatten_unordered`] instead.
458    ///
459    /// # Example
460    /// ```rust
461    /// # #[cfg(feature = "deploy")] {
462    /// # use hydro_lang::prelude::*;
463    /// # use futures::StreamExt;
464    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
465    /// let tick = process.tick();
466    /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
467    /// optional.flatten_ordered().all_ticks()
468    /// # }, |mut stream| async move {
469    /// // 1, 2, 3
470    /// # for w in vec![1, 2, 3] {
471    /// #     assert_eq!(stream.next().await.unwrap(), w);
472    /// # }
473    /// # }));
474    /// # }
475    /// ```
476    pub fn flatten_ordered<U>(self) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
477    where
478        B: IsBounded,
479        T: IntoIterator<Item = U>,
480    {
481        self.flat_map_ordered(q!(|v| v))
482    }
483
484    /// Like [`Optional::flatten_ordered`], but allows the implementation of [`Iterator`]
485    /// for the element type `T` to produce items in any order.
486    ///
487    /// If the optional is empty, the output stream is also empty. If the optional contains
488    /// a value that implements [`IntoIterator`], all items from that iterator are emitted
489    /// in the output stream in non-deterministic order.
490    ///
491    /// # Example
492    /// ```rust
493    /// # #[cfg(feature = "deploy")] {
494    /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
495    /// # use futures::StreamExt;
496    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
497    /// let tick = process.tick();
498    /// let optional = tick.optional_first_tick(q!(
499    ///     std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
500    /// ));
501    /// optional.flatten_unordered().all_ticks()
502    /// # }, |mut stream| async move {
503    /// // 1, 2, 3, but in no particular order
504    /// # let mut results = Vec::new();
505    /// # for _ in 0..3 {
506    /// #     results.push(stream.next().await.unwrap());
507    /// # }
508    /// # results.sort();
509    /// # assert_eq!(results, vec![1, 2, 3]);
510    /// # }));
511    /// # }
512    /// ```
513    pub fn flatten_unordered<U>(self) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
514    where
515        B: IsBounded,
516        T: IntoIterator<Item = U>,
517    {
518        self.flat_map_unordered(q!(|v| v))
519    }
520
521    /// Creates an optional containing only the value if it satisfies a predicate `f`.
522    ///
523    /// If the optional is empty, the output optional is also empty. If the optional contains
524    /// a value and the predicate returns `true`, the output optional contains the same value.
525    /// If the predicate returns `false`, the output optional is empty.
526    ///
527    /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
528    /// not modify or take ownership of the value. If you need to modify the value while filtering
529    /// use [`Optional::filter_map`] instead.
530    ///
531    /// # Example
532    /// ```rust
533    /// # #[cfg(feature = "deploy")] {
534    /// # use hydro_lang::prelude::*;
535    /// # use futures::StreamExt;
536    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
537    /// let tick = process.tick();
538    /// let optional = tick.optional_first_tick(q!(5));
539    /// optional.filter(q!(|&x| x > 3)).all_ticks()
540    /// # }, |mut stream| async move {
541    /// // 5
542    /// # assert_eq!(stream.next().await.unwrap(), 5);
543    /// # }));
544    /// # }
545    /// ```
546    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
547    where
548        F: Fn(&T) -> bool + 'a,
549    {
550        let f = f.splice_fn1_borrow_ctx(&self.location).into();
551        Optional::new(
552            self.location.clone(),
553            HydroNode::Filter {
554                f,
555                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
556                metadata: self.location.new_node_metadata(Self::collection_kind()),
557            },
558        )
559    }
560
561    /// An operator that both filters and maps. It yields only the value if the supplied
562    /// closure `f` returns `Some(value)`.
563    ///
564    /// If the optional is empty, the output optional is also empty. If the optional contains
565    /// a value and the closure returns `Some(new_value)`, the output optional contains `new_value`.
566    /// If the closure returns `None`, the output optional is empty.
567    ///
568    /// # Example
569    /// ```rust
570    /// # #[cfg(feature = "deploy")] {
571    /// # use hydro_lang::prelude::*;
572    /// # use futures::StreamExt;
573    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
574    /// let tick = process.tick();
575    /// let optional = tick.optional_first_tick(q!("42"));
576    /// optional
577    ///     .filter_map(q!(|s| s.parse::<i32>().ok()))
578    ///     .all_ticks()
579    /// # }, |mut stream| async move {
580    /// // 42
581    /// # assert_eq!(stream.next().await.unwrap(), 42);
582    /// # }));
583    /// # }
584    /// ```
585    pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
586    where
587        F: Fn(T) -> Option<U> + 'a,
588    {
589        let f = f.splice_fn1_ctx(&self.location).into();
590        Optional::new(
591            self.location.clone(),
592            HydroNode::FilterMap {
593                f,
594                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
595                metadata: self
596                    .location
597                    .new_node_metadata(Optional::<U, L, B>::collection_kind()),
598            },
599        )
600    }
601
602    /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
603    ///
604    /// If the other value is a [`Optional`], the output will be non-null only if the argument is
605    /// non-null. This is useful for combining several pieces of state together.
606    ///
607    /// # Example
608    /// ```rust
609    /// # #[cfg(feature = "deploy")] {
610    /// # use hydro_lang::prelude::*;
611    /// # use futures::StreamExt;
612    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
613    /// let tick = process.tick();
614    /// let numbers = process
615    ///   .source_iter(q!(vec![123, 456, 789]))
616    ///   .batch(&tick, nondet!(/** test */));
617    /// let min = numbers.clone().min(); // Optional
618    /// let max = numbers.max(); // Optional
619    /// min.zip(max).all_ticks()
620    /// # }, |mut stream| async move {
621    /// // [(123, 789)]
622    /// # for w in vec![(123, 789)] {
623    /// #     assert_eq!(stream.next().await.unwrap(), w);
624    /// # }
625    /// # }));
626    /// # }
627    /// ```
628    pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
629    where
630        B: IsBounded,
631    {
632        let other: Optional<O, L, B> = other.into();
633        check_matching_location(&self.location, &other.location);
634
635        if L::is_top_level()
636            && let Some(tick) = self.location.try_tick()
637        {
638            let out = zip_inside_tick(
639                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
640                other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
641            )
642            .latest();
643
644            Optional::new(
645                out.location.clone(),
646                out.ir_node.replace(HydroNode::Placeholder),
647            )
648        } else {
649            zip_inside_tick(self, other)
650        }
651    }
652
653    /// Passes through `self` when it has a value, otherwise passes through `other`.
654    ///
655    /// Like [`Option::or`], this is helpful for defining a fallback for an [`Optional`], when the
656    /// fallback itself is an [`Optional`]. If the fallback is a [`Singleton`], you can use
657    /// [`Optional::unwrap_or`] to ensure that the output is always non-null.
658    ///
659    /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
660    /// of the inputs change (including to/from null states).
661    ///
662    /// # Example
663    /// ```rust
664    /// # #[cfg(feature = "deploy")] {
665    /// # use hydro_lang::prelude::*;
666    /// # use futures::StreamExt;
667    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
668    /// let tick = process.tick();
669    /// // ticks are lazy by default, forces the second tick to run
670    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
671    ///
672    /// let some_first_tick = tick.optional_first_tick(q!(123));
673    /// let some_second_tick = tick.optional_first_tick(q!(456)).defer_tick();
674    /// some_first_tick.or(some_second_tick).all_ticks()
675    /// # }, |mut stream| async move {
676    /// // [123 /* first tick */, 456 /* second tick */]
677    /// # for w in vec![123, 456] {
678    /// #     assert_eq!(stream.next().await.unwrap(), w);
679    /// # }
680    /// # }));
681    /// # }
682    /// ```
683    pub fn or(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
684        check_matching_location(&self.location, &other.location);
685
686        if L::is_top_level()
687            && !B::BOUNDED // only if unbounded we need to use a tick
688            && let Some(tick) = self.location.try_tick()
689        {
690            let out = or_inside_tick(
691                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
692                other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
693            )
694            .latest();
695
696            Optional::new(
697                out.location.clone(),
698                out.ir_node.replace(HydroNode::Placeholder),
699            )
700        } else {
701            Optional::new(
702                self.location.clone(),
703                HydroNode::ChainFirst {
704                    first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
705                    second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
706                    metadata: self.location.new_node_metadata(Self::collection_kind()),
707                },
708            )
709        }
710    }
711
712    /// Gets the contents of `self` when it has a value, otherwise passes through `other`.
713    ///
714    /// Like [`Option::unwrap_or`], this is helpful for defining a fallback for an [`Optional`].
715    /// If the fallback is not always defined (an [`Optional`]), you can use [`Optional::or`].
716    ///
717    /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
718    /// of the inputs change (including to/from null states).
719    ///
720    /// # Example
721    /// ```rust
722    /// # #[cfg(feature = "deploy")] {
723    /// # use hydro_lang::prelude::*;
724    /// # use futures::StreamExt;
725    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
726    /// let tick = process.tick();
727    /// // ticks are lazy by default, forces the later ticks to run
728    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
729    ///
730    /// let some_first_tick = tick.optional_first_tick(q!(123));
731    /// some_first_tick
732    ///     .unwrap_or(tick.singleton(q!(456)))
733    ///     .all_ticks()
734    /// # }, |mut stream| async move {
735    /// // [123 /* first tick */, 456 /* second tick */, 456 /* third tick */, 456, ...]
736    /// # for w in vec![123, 456, 456, 456] {
737    /// #     assert_eq!(stream.next().await.unwrap(), w);
738    /// # }
739    /// # }));
740    /// # }
741    /// ```
742    pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
743        let res_option = self.or(other.into());
744        Singleton::new(
745            res_option.location.clone(),
746            HydroNode::Cast {
747                inner: Box::new(res_option.ir_node.replace(HydroNode::Placeholder)),
748                metadata: res_option
749                    .location
750                    .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
751            },
752        )
753    }
754
755    /// Gets the contents of `self` when it has a value, otherwise returns the default value of `T`.
756    ///
757    /// Like [`Option::unwrap_or_default`], this is helpful for defining a fallback for an
758    /// [`Optional`] when the default value of the type is a suitable fallback.
759    ///
760    /// # Example
761    /// ```rust
762    /// # #[cfg(feature = "deploy")] {
763    /// # use hydro_lang::prelude::*;
764    /// # use futures::StreamExt;
765    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
766    /// let tick = process.tick();
767    /// // ticks are lazy by default, forces the later ticks to run
768    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
769    ///
770    /// let some_first_tick = tick.optional_first_tick(q!(123i32));
771    /// some_first_tick.unwrap_or_default().all_ticks()
772    /// # }, |mut stream| async move {
773    /// // [123 /* first tick */, 0 /* second tick */, 0 /* third tick */, 0, ...]
774    /// # for w in vec![123, 0, 0, 0] {
775    /// #     assert_eq!(stream.next().await.unwrap(), w);
776    /// # }
777    /// # }));
778    /// # }
779    /// ```
780    pub fn unwrap_or_default(self) -> Singleton<T, L, B>
781    where
782        T: Default + Clone,
783    {
784        self.into_singleton().map(q!(|v| v.unwrap_or_default()))
785    }
786
787    /// Converts this optional into a [`Singleton`] with a Rust [`Option`] as its contents.
788    ///
789    /// Useful for writing custom Rust code that needs to interact with both the null and non-null
790    /// states of the [`Optional`]. When possible, you should use the native APIs on [`Optional`]
791    /// so that Hydro can skip any computation on null values.
792    ///
793    /// # Example
794    /// ```rust
795    /// # #[cfg(feature = "deploy")] {
796    /// # use hydro_lang::prelude::*;
797    /// # use futures::StreamExt;
798    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
799    /// let tick = process.tick();
800    /// // ticks are lazy by default, forces the later ticks to run
801    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
802    ///
803    /// let some_first_tick = tick.optional_first_tick(q!(123));
804    /// some_first_tick.into_singleton().all_ticks()
805    /// # }, |mut stream| async move {
806    /// // [Some(123) /* first tick */, None /* second tick */, None /* third tick */, None, ...]
807    /// # for w in vec![Some(123), None, None, None] {
808    /// #     assert_eq!(stream.next().await.unwrap(), w);
809    /// # }
810    /// # }));
811    /// # }
812    /// ```
813    pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
814    where
815        T: Clone,
816    {
817        let none: syn::Expr = parse_quote!(::std::option::Option::None);
818
819        let none_singleton = Singleton::new(
820            self.location.clone(),
821            HydroNode::SingletonSource {
822                value: none.into(),
823                first_tick_only: false,
824                metadata: self
825                    .location
826                    .new_node_metadata(Singleton::<Option<T>, L, B>::collection_kind()),
827            },
828        );
829
830        self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
831    }
832
833    /// Returns a [`Singleton`] containing `true` if this optional has a value, `false` otherwise.
834    ///
835    /// # Example
836    /// ```rust
837    /// # #[cfg(feature = "deploy")] {
838    /// # use hydro_lang::prelude::*;
839    /// # use futures::StreamExt;
840    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
841    /// let tick = process.tick();
842    /// // ticks are lazy by default, forces the second tick to run
843    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
844    ///
845    /// let some_first_tick = tick.optional_first_tick(q!(42));
846    /// some_first_tick.is_some().all_ticks()
847    /// # }, |mut stream| async move {
848    /// // [true /* first tick */, false /* second tick */, ...]
849    /// # for w in vec![true, false] {
850    /// #     assert_eq!(stream.next().await.unwrap(), w);
851    /// # }
852    /// # }));
853    /// # }
854    /// ```
855    #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
856    pub fn is_some(self) -> Singleton<bool, L, B> {
857        self.map(q!(|_| ()))
858            .into_singleton()
859            .map(q!(|o| o.is_some()))
860    }
861
862    /// Returns a [`Singleton`] containing `true` if this optional is null, `false` otherwise.
863    ///
864    /// # Example
865    /// ```rust
866    /// # #[cfg(feature = "deploy")] {
867    /// # use hydro_lang::prelude::*;
868    /// # use futures::StreamExt;
869    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
870    /// let tick = process.tick();
871    /// // ticks are lazy by default, forces the second tick to run
872    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
873    ///
874    /// let some_first_tick = tick.optional_first_tick(q!(42));
875    /// some_first_tick.is_none().all_ticks()
876    /// # }, |mut stream| async move {
877    /// // [false /* first tick */, true /* second tick */, ...]
878    /// # for w in vec![false, true] {
879    /// #     assert_eq!(stream.next().await.unwrap(), w);
880    /// # }
881    /// # }));
882    /// # }
883    /// ```
884    #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
885    pub fn is_none(self) -> Singleton<bool, L, B> {
886        self.map(q!(|_| ()))
887            .into_singleton()
888            .map(q!(|o| o.is_none()))
889    }
890
891    /// Returns a [`Singleton`] containing `true` if both optionals are non-null and their
892    /// values are equal, `false` otherwise (including when either is null).
893    ///
894    /// # Example
895    /// ```rust
896    /// # #[cfg(feature = "deploy")] {
897    /// # use hydro_lang::prelude::*;
898    /// # use futures::StreamExt;
899    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
900    /// let tick = process.tick();
901    /// // ticks are lazy by default, forces the second tick to run
902    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
903    ///
904    /// let a = tick.optional_first_tick(q!(5)); // Some(5), None
905    /// let b = tick.optional_first_tick(q!(5)); // Some(5), None
906    /// a.is_some_and_equals(b).all_ticks()
907    /// # }, |mut stream| async move {
908    /// // [true, false]
909    /// # for w in vec![true, false] {
910    /// #     assert_eq!(stream.next().await.unwrap(), w);
911    /// # }
912    /// # }));
913    /// # }
914    /// ```
915    #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
916    pub fn is_some_and_equals(self, other: Optional<T, L, B>) -> Singleton<bool, L, B>
917    where
918        T: PartialEq + Clone,
919        B: IsBounded,
920    {
921        self.into_singleton()
922            .zip(other.into_singleton())
923            .map(q!(|(a, b)| a.is_some() && a == b))
924    }
925
926    /// An operator which allows you to "name" a `HydroNode`.
927    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
928    pub fn ir_node_named(self, name: &str) -> Optional<T, L, B> {
929        {
930            let mut node = self.ir_node.borrow_mut();
931            let metadata = node.metadata_mut();
932            metadata.tag = Some(name.to_owned());
933        }
934        self
935    }
936
937    /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
938    /// implies that `B == Bounded`.
939    pub fn make_bounded(self) -> Optional<T, L, Bounded>
940    where
941        B: IsBounded,
942    {
943        Optional::new(
944            self.location.clone(),
945            self.ir_node.replace(HydroNode::Placeholder),
946        )
947    }
948
949    /// Clones this bounded optional into a tick, returning a optional that has the
950    /// same value as the outer optional. Because the outer optional is bounded, this
951    /// is deterministic because there is only a single immutable version.
952    pub fn clone_into_tick(self, tick: &Tick<L>) -> Optional<T, Tick<L>, Bounded>
953    where
954        B: IsBounded,
955        T: Clone,
956    {
957        // TODO(shadaj): avoid printing simulator logs for this snapshot
958        self.snapshot(
959            tick,
960            nondet!(/** bounded top-level optional so deterministic */),
961        )
962    }
963
964    /// Converts this optional into a [`Stream`] containing a single element, the value, if it is
965    /// non-null. Otherwise, the stream is empty.
966    ///
967    /// # Example
968    /// ```rust
969    /// # #[cfg(feature = "deploy")] {
970    /// # use hydro_lang::prelude::*;
971    /// # use futures::StreamExt;
972    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
973    /// # let tick = process.tick();
974    /// # // ticks are lazy by default, forces the second tick to run
975    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
976    /// # let batch_first_tick = process
977    /// #   .source_iter(q!(vec![]))
978    /// #   .batch(&tick, nondet!(/** test */));
979    /// # let batch_second_tick = process
980    /// #   .source_iter(q!(vec![123, 456]))
981    /// #   .batch(&tick, nondet!(/** test */))
982    /// #   .defer_tick(); // appears on the second tick
983    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
984    /// input_batch // first tick: [], second tick: [123, 456]
985    ///     .clone()
986    ///     .max()
987    ///     .into_stream()
988    ///     .chain(input_batch)
989    ///     .all_ticks()
990    /// # }, |mut stream| async move {
991    /// // [456, 123, 456]
992    /// # for w in vec![456, 123, 456] {
993    /// #     assert_eq!(stream.next().await.unwrap(), w);
994    /// # }
995    /// # }));
996    /// # }
997    /// ```
998    pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce>
999    where
1000        B: IsBounded,
1001    {
1002        Stream::new(
1003            self.location.clone(),
1004            HydroNode::Cast {
1005                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1006                metadata: self.location.new_node_metadata(Stream::<
1007                    T,
1008                    Tick<L>,
1009                    Bounded,
1010                    TotalOrder,
1011                    ExactlyOnce,
1012                >::collection_kind()),
1013            },
1014        )
1015    }
1016
1017    /// Filters this optional, passing through the value if the boolean signal is `true`,
1018    /// otherwise the output is null.
1019    ///
1020    /// # Example
1021    /// ```rust
1022    /// # #[cfg(feature = "deploy")] {
1023    /// # use hydro_lang::prelude::*;
1024    /// # use futures::StreamExt;
1025    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1026    /// let tick = process.tick();
1027    /// // ticks are lazy by default, forces the second tick to run
1028    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1029    ///
1030    /// let some_first_tick = tick.optional_first_tick(q!(()));
1031    /// let signal = some_first_tick.is_some(); // true on first tick, false on second
1032    /// let batch_first_tick = process
1033    ///   .source_iter(q!(vec![456]))
1034    ///   .batch(&tick, nondet!(/** test */));
1035    /// let batch_second_tick = process
1036    ///   .source_iter(q!(vec![789]))
1037    ///   .batch(&tick, nondet!(/** test */))
1038    ///   .defer_tick();
1039    /// batch_first_tick.chain(batch_second_tick).first()
1040    ///   .filter_if(signal)
1041    ///   .unwrap_or(tick.singleton(q!(0)))
1042    ///   .all_ticks()
1043    /// # }, |mut stream| async move {
1044    /// // [456, 0]
1045    /// # for w in vec![456, 0] {
1046    /// #     assert_eq!(stream.next().await.unwrap(), w);
1047    /// # }
1048    /// # }));
1049    /// # }
1050    /// ```
1051    pub fn filter_if(self, signal: Singleton<bool, L, B>) -> Optional<T, L, B>
1052    where
1053        B: IsBounded,
1054    {
1055        self.zip(signal.filter(q!(|b| *b))).map(q!(|(d, _)| d))
1056    }
1057
1058    /// Filters this optional, passing through the optional value if it is non-null **and** the
1059    /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
1060    ///
1061    /// Useful for conditionally processing, such as only emitting an optional's value outside
1062    /// a tick if some other condition is satisfied.
1063    ///
1064    /// # Example
1065    /// ```rust
1066    /// # #[cfg(feature = "deploy")] {
1067    /// # use hydro_lang::prelude::*;
1068    /// # use futures::StreamExt;
1069    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1070    /// let tick = process.tick();
1071    /// // ticks are lazy by default, forces the second tick to run
1072    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1073    ///
1074    /// let batch_first_tick = process
1075    ///   .source_iter(q!(vec![]))
1076    ///   .batch(&tick, nondet!(/** test */));
1077    /// let batch_second_tick = process
1078    ///   .source_iter(q!(vec![456]))
1079    ///   .batch(&tick, nondet!(/** test */))
1080    ///   .defer_tick(); // appears on the second tick
1081    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1082    /// batch_first_tick.chain(batch_second_tick).first()
1083    ///   .filter_if_some(some_on_first_tick)
1084    ///   .unwrap_or(tick.singleton(q!(789)))
1085    ///   .all_ticks()
1086    /// # }, |mut stream| async move {
1087    /// // [789, 789]
1088    /// # for w in vec![789, 789] {
1089    /// #     assert_eq!(stream.next().await.unwrap(), w);
1090    /// # }
1091    /// # }));
1092    /// # }
1093    /// ```
1094    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1095    pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B>
1096    where
1097        B: IsBounded,
1098    {
1099        self.filter_if(signal.is_some())
1100    }
1101
1102    /// Filters this optional, passing through the optional value if it is non-null **and** the
1103    /// argument (a [`Bounded`] [`Optional`]`) is _null_, otherwise the output is null.
1104    ///
1105    /// Useful for conditionally processing, such as only emitting an optional's value outside
1106    /// a tick if some other condition is satisfied.
1107    ///
1108    /// # Example
1109    /// ```rust
1110    /// # #[cfg(feature = "deploy")] {
1111    /// # use hydro_lang::prelude::*;
1112    /// # use futures::StreamExt;
1113    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1114    /// let tick = process.tick();
1115    /// // ticks are lazy by default, forces the second tick to run
1116    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1117    ///
1118    /// let batch_first_tick = process
1119    ///   .source_iter(q!(vec![]))
1120    ///   .batch(&tick, nondet!(/** test */));
1121    /// let batch_second_tick = process
1122    ///   .source_iter(q!(vec![456]))
1123    ///   .batch(&tick, nondet!(/** test */))
1124    ///   .defer_tick(); // appears on the second tick
1125    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1126    /// batch_first_tick.chain(batch_second_tick).first()
1127    ///   .filter_if_none(some_on_first_tick)
1128    ///   .unwrap_or(tick.singleton(q!(789)))
1129    ///   .all_ticks()
1130    /// # }, |mut stream| async move {
1131    /// // [789, 789]
1132    /// # for w in vec![789, 456] {
1133    /// #     assert_eq!(stream.next().await.unwrap(), w);
1134    /// # }
1135    /// # }));
1136    /// # }
1137    /// ```
1138    #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
1139    pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B>
1140    where
1141        B: IsBounded,
1142    {
1143        self.filter_if(other.is_none())
1144    }
1145
1146    /// If `self` is null, emits a null optional, but if it non-null, emits `value`.
1147    ///
1148    /// Useful for gating the release of a [`Singleton`] on a condition of the [`Optional`]
1149    /// having a value, such as only releasing a piece of state if the node is the leader.
1150    ///
1151    /// # Example
1152    /// ```rust
1153    /// # #[cfg(feature = "deploy")] {
1154    /// # use hydro_lang::prelude::*;
1155    /// # use futures::StreamExt;
1156    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1157    /// let tick = process.tick();
1158    /// // ticks are lazy by default, forces the second tick to run
1159    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1160    ///
1161    /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1162    /// some_on_first_tick
1163    ///     .if_some_then(tick.singleton(q!(456)))
1164    ///     .unwrap_or(tick.singleton(q!(123)))
1165    /// # .all_ticks()
1166    /// # }, |mut stream| async move {
1167    /// // 456 (first tick) ~> 123 (second tick onwards)
1168    /// # for w in vec![456, 123, 123] {
1169    /// #     assert_eq!(stream.next().await.unwrap(), w);
1170    /// # }
1171    /// # }));
1172    /// # }
1173    /// ```
1174    #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1175    pub fn if_some_then<U>(self, value: Singleton<U, L, B>) -> Optional<U, L, B>
1176    where
1177        B: IsBounded,
1178    {
1179        value.filter_if(self.is_some())
1180    }
1181}
1182
1183impl<'a, K, V, L, B: Boundedness> Optional<(K, V), L, B>
1184where
1185    L: Location<'a>,
1186{
1187    /// Converts this optional into a [`KeyedSingleton`] containing a single entry with the
1188    /// key-value pair of this [`Optional`].
1189    ///
1190    /// If this [`Optional`] is [`Bounded`], the [`KeyedSingleton`] will be [`Bounded`] as well
1191    /// if it is [`Unbounded`], the [`KeyedSingleton`] will be [`Unbounded`], which means that
1192    /// the entry will be updated and appear / disappear according to the state of the
1193    /// [`Optional`].
1194    pub fn into_keyed_singleton(self) -> KeyedSingleton<K, V, L, B> {
1195        KeyedSingleton::new(
1196            self.location.clone(),
1197            HydroNode::Cast {
1198                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1199                metadata: self
1200                    .location
1201                    .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1202            },
1203        )
1204    }
1205}
1206
1207impl<'a, T, L, B: Boundedness> Optional<T, Atomic<L>, B>
1208where
1209    L: Location<'a> + NoTick,
1210{
1211    /// Returns an optional value corresponding to the latest snapshot of the optional
1212    /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
1213    /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
1214    /// all snapshots of this optional into the atomic-associated tick will observe the
1215    /// same value each tick.
1216    ///
1217    /// # Non-Determinism
1218    /// Because this picks a snapshot of a optional whose value is continuously changing,
1219    /// the output optional has a non-deterministic value since the snapshot can be at an
1220    /// arbitrary point in time.
1221    pub fn snapshot_atomic(self, tick: &Tick<L>, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
1222        Optional::new(
1223            tick.clone(),
1224            HydroNode::Batch {
1225                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1226                metadata: tick
1227                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1228            },
1229        )
1230    }
1231
1232    /// Returns this optional back into a top-level, asynchronous execution context where updates
1233    /// to the value will be asynchronously propagated.
1234    pub fn end_atomic(self) -> Optional<T, L, B> {
1235        Optional::new(
1236            self.location.tick.l.clone(),
1237            HydroNode::EndAtomic {
1238                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1239                metadata: self
1240                    .location
1241                    .tick
1242                    .l
1243                    .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1244            },
1245        )
1246    }
1247}
1248
1249impl<'a, T, L, B: Boundedness> Optional<T, L, B>
1250where
1251    L: Location<'a>,
1252{
1253    /// Shifts this optional into an atomic context, which guarantees that any downstream logic
1254    /// will observe the same version of the value and will be executed synchronously before any
1255    /// outputs are yielded (in [`Optional::end_atomic`]).
1256    ///
1257    /// This is useful to enforce local consistency constraints, such as ensuring that several readers
1258    /// see a consistent version of local state (since otherwise each [`Optional::snapshot`] may pick
1259    /// a different version).
1260    pub fn atomic(self) -> Optional<T, Atomic<L>, B> {
1261        let id = self.location.flow_state().borrow_mut().next_clock_id();
1262        let out_location = Atomic {
1263            tick: Tick {
1264                id,
1265                l: self.location.clone(),
1266            },
1267        };
1268        Optional::new(
1269            out_location.clone(),
1270            HydroNode::BeginAtomic {
1271                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1272                metadata: out_location
1273                    .new_node_metadata(Optional::<T, Atomic<L>, B>::collection_kind()),
1274            },
1275        )
1276    }
1277
1278    /// Given a tick, returns a optional value corresponding to a snapshot of the optional
1279    /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
1280    /// relevant data that contributed to the snapshot at tick `t`.
1281    ///
1282    /// # Non-Determinism
1283    /// Because this picks a snapshot of a optional whose value is continuously changing,
1284    /// the output optional has a non-deterministic value since the snapshot can be at an
1285    /// arbitrary point in time.
1286    pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
1287        assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1288        Optional::new(
1289            tick.clone(),
1290            HydroNode::Batch {
1291                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1292                metadata: tick
1293                    .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1294            },
1295        )
1296    }
1297
1298    /// Eagerly samples the optional as fast as possible, returning a stream of snapshots
1299    /// with order corresponding to increasing prefixes of data contributing to the optional.
1300    ///
1301    /// # Non-Determinism
1302    /// At runtime, the optional will be arbitrarily sampled as fast as possible, but due
1303    /// to non-deterministic batching and arrival of inputs, the output stream is
1304    /// non-deterministic.
1305    pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1306    where
1307        L: NoTick,
1308    {
1309        let tick = self.location.tick();
1310        self.snapshot(&tick, nondet).all_ticks().weaken_retries()
1311    }
1312
1313    /// Given a time interval, returns a stream corresponding to snapshots of the optional
1314    /// value taken at various points in time. Because the input optional may be
1315    /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
1316    /// represent the value of the optional given some prefix of the streams leading up to
1317    /// it.
1318    ///
1319    /// # Non-Determinism
1320    /// The output stream is non-deterministic in which elements are sampled, since this
1321    /// is controlled by a clock.
1322    pub fn sample_every(
1323        self,
1324        interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1325        nondet: NonDet,
1326    ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1327    where
1328        L: NoTick + NoAtomic,
1329    {
1330        let samples = self.location.source_interval(interval, nondet);
1331        let tick = self.location.tick();
1332
1333        self.snapshot(&tick, nondet)
1334            .filter_if(samples.batch(&tick, nondet).first().is_some())
1335            .all_ticks()
1336            .weaken_retries()
1337    }
1338}
1339
1340impl<'a, T, L> Optional<T, Tick<L>, Bounded>
1341where
1342    L: Location<'a>,
1343{
1344    /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1345    /// which will stream the value computed in _each_ tick as a separate stream element (skipping
1346    /// null values).
1347    ///
1348    /// Unlike [`Optional::latest`], the value computed in each tick is emitted separately,
1349    /// producing one element in the output for each (non-null) tick. This is useful for batched
1350    /// computations, where the results from each tick must be combined together.
1351    ///
1352    /// # Example
1353    /// ```rust
1354    /// # #[cfg(feature = "deploy")] {
1355    /// # use hydro_lang::prelude::*;
1356    /// # use futures::StreamExt;
1357    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1358    /// # let tick = process.tick();
1359    /// # // ticks are lazy by default, forces the second tick to run
1360    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1361    /// # let batch_first_tick = process
1362    /// #   .source_iter(q!(vec![]))
1363    /// #   .batch(&tick, nondet!(/** test */));
1364    /// # let batch_second_tick = process
1365    /// #   .source_iter(q!(vec![1, 2, 3]))
1366    /// #   .batch(&tick, nondet!(/** test */))
1367    /// #   .defer_tick(); // appears on the second tick
1368    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1369    /// input_batch // first tick: [], second tick: [1, 2, 3]
1370    ///     .max()
1371    ///     .all_ticks()
1372    /// # }, |mut stream| async move {
1373    /// // [3]
1374    /// # for w in vec![3] {
1375    /// #     assert_eq!(stream.next().await.unwrap(), w);
1376    /// # }
1377    /// # }));
1378    /// # }
1379    /// ```
1380    pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1381        self.into_stream().all_ticks()
1382    }
1383
1384    /// Synchronously yields the value of this optional outside the tick as an unbounded stream,
1385    /// which will stream the value computed in _each_ tick as a separate stream element.
1386    ///
1387    /// Unlike [`Optional::all_ticks`], this preserves synchronous execution, as the output stream
1388    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1389    /// optional's [`Tick`] context.
1390    pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1391        self.into_stream().all_ticks_atomic()
1392    }
1393
1394    /// Asynchronously yields this optional outside the tick as an unbounded optional, which will
1395    /// be asynchronously updated with the latest value of the optional inside the tick, including
1396    /// whether the optional is null or not.
1397    ///
1398    /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1399    /// tick that tracks the inner value. This is useful for getting the value as of the
1400    /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1401    ///
1402    /// # Example
1403    /// ```rust
1404    /// # #[cfg(feature = "deploy")] {
1405    /// # use hydro_lang::prelude::*;
1406    /// # use futures::StreamExt;
1407    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1408    /// # let tick = process.tick();
1409    /// # // ticks are lazy by default, forces the second tick to run
1410    /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1411    /// # let batch_first_tick = process
1412    /// #   .source_iter(q!(vec![]))
1413    /// #   .batch(&tick, nondet!(/** test */));
1414    /// # let batch_second_tick = process
1415    /// #   .source_iter(q!(vec![1, 2, 3]))
1416    /// #   .batch(&tick, nondet!(/** test */))
1417    /// #   .defer_tick(); // appears on the second tick
1418    /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1419    /// input_batch // first tick: [], second tick: [1, 2, 3]
1420    ///     .max()
1421    ///     .latest()
1422    /// # .into_singleton()
1423    /// # .sample_eager(nondet!(/** test */))
1424    /// # }, |mut stream| async move {
1425    /// // asynchronously changes from None ~> 3
1426    /// # for w in vec![None, Some(3)] {
1427    /// #     assert_eq!(stream.next().await.unwrap(), w);
1428    /// # }
1429    /// # }));
1430    /// # }
1431    /// ```
1432    pub fn latest(self) -> Optional<T, L, Unbounded> {
1433        Optional::new(
1434            self.location.outer().clone(),
1435            HydroNode::YieldConcat {
1436                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1437                metadata: self
1438                    .location
1439                    .outer()
1440                    .new_node_metadata(Optional::<T, L, Unbounded>::collection_kind()),
1441            },
1442        )
1443    }
1444
1445    /// Synchronously yields this optional outside the tick as an unbounded optional, which will
1446    /// be updated with the latest value of the optional inside the tick.
1447    ///
1448    /// Unlike [`Optional::latest`], this preserves synchronous execution, as the output optional
1449    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1450    /// optional's [`Tick`] context.
1451    pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
1452        let out_location = Atomic {
1453            tick: self.location.clone(),
1454        };
1455
1456        Optional::new(
1457            out_location.clone(),
1458            HydroNode::YieldConcat {
1459                inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1460                metadata: out_location
1461                    .new_node_metadata(Optional::<T, Atomic<L>, Unbounded>::collection_kind()),
1462            },
1463        )
1464    }
1465
1466    /// Shifts the state in `self` to the **next tick**, so that the returned optional at tick `T`
1467    /// always has the state of `self` at tick `T - 1`.
1468    ///
1469    /// At tick `0`, the output optional is null, since there is no previous tick.
1470    ///
1471    /// This operator enables stateful iterative processing with ticks, by sending data from one
1472    /// tick to the next. For example, you can use it to compare state across consecutive batches.
1473    ///
1474    /// # Example
1475    /// ```rust
1476    /// # #[cfg(feature = "deploy")] {
1477    /// # use hydro_lang::prelude::*;
1478    /// # use futures::StreamExt;
1479    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1480    /// let tick = process.tick();
1481    /// // ticks are lazy by default, forces the second tick to run
1482    /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1483    ///
1484    /// let batch_first_tick = process
1485    ///   .source_iter(q!(vec![1, 2]))
1486    ///   .batch(&tick, nondet!(/** test */));
1487    /// let batch_second_tick = process
1488    ///   .source_iter(q!(vec![3, 4]))
1489    ///   .batch(&tick, nondet!(/** test */))
1490    ///   .defer_tick(); // appears on the second tick
1491    /// let current_tick_sum = batch_first_tick.chain(batch_second_tick)
1492    ///   .reduce(q!(|state, v| *state += v));
1493    ///
1494    /// current_tick_sum.clone().into_singleton().zip(
1495    ///   current_tick_sum.defer_tick().into_singleton() // state from previous tick
1496    /// ).all_ticks()
1497    /// # }, |mut stream| async move {
1498    /// // [(Some(3), None) /* first tick */, (Some(7), Some(3)) /* second tick */]
1499    /// # for w in vec![(Some(3), None), (Some(7), Some(3))] {
1500    /// #     assert_eq!(stream.next().await.unwrap(), w);
1501    /// # }
1502    /// # }));
1503    /// # }
1504    /// ```
1505    pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
1506        Optional::new(
1507            self.location.clone(),
1508            HydroNode::DeferTick {
1509                input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1510                metadata: self.location.new_node_metadata(Self::collection_kind()),
1511            },
1512        )
1513    }
1514}
1515
1516#[cfg(test)]
1517mod tests {
1518    #[cfg(feature = "deploy")]
1519    use futures::StreamExt;
1520    #[cfg(feature = "deploy")]
1521    use hydro_deploy::Deployment;
1522    #[cfg(any(feature = "deploy", feature = "sim"))]
1523    use stageleft::q;
1524
1525    #[cfg(feature = "deploy")]
1526    use super::Optional;
1527    #[cfg(any(feature = "deploy", feature = "sim"))]
1528    use crate::compile::builder::FlowBuilder;
1529    #[cfg(any(feature = "deploy", feature = "sim"))]
1530    use crate::location::Location;
1531    #[cfg(feature = "deploy")]
1532    use crate::nondet::nondet;
1533
1534    #[cfg(feature = "deploy")]
1535    #[tokio::test]
1536    async fn optional_or_cardinality() {
1537        let mut deployment = Deployment::new();
1538
1539        let mut flow = FlowBuilder::new();
1540        let node = flow.process::<()>();
1541        let external = flow.external::<()>();
1542
1543        let node_tick = node.tick();
1544        let tick_singleton = node_tick.singleton(q!(123));
1545        let tick_optional_inhabited: Optional<_, _, _> = tick_singleton.into();
1546        let counts = tick_optional_inhabited
1547            .clone()
1548            .or(tick_optional_inhabited)
1549            .into_stream()
1550            .count()
1551            .all_ticks()
1552            .send_bincode_external(&external);
1553
1554        let nodes = flow
1555            .with_process(&node, deployment.Localhost())
1556            .with_external(&external, deployment.Localhost())
1557            .deploy(&mut deployment);
1558
1559        deployment.deploy().await.unwrap();
1560
1561        let mut external_out = nodes.connect(counts).await;
1562
1563        deployment.start().await.unwrap();
1564
1565        assert_eq!(external_out.next().await.unwrap(), 1);
1566    }
1567
1568    #[cfg(feature = "deploy")]
1569    #[tokio::test]
1570    async fn into_singleton_top_level_none_cardinality() {
1571        let mut deployment = Deployment::new();
1572
1573        let mut flow = FlowBuilder::new();
1574        let node = flow.process::<()>();
1575        let external = flow.external::<()>();
1576
1577        let node_tick = node.tick();
1578        let top_level_none = node.singleton(q!(123)).filter(q!(|_| false));
1579        let into_singleton = top_level_none.into_singleton();
1580
1581        let tick_driver = node.spin();
1582
1583        let counts = into_singleton
1584            .snapshot(&node_tick, nondet!(/** test */))
1585            .into_stream()
1586            .count()
1587            .zip(tick_driver.batch(&node_tick, nondet!(/** test */)).count())
1588            .map(q!(|(c, _)| c))
1589            .all_ticks()
1590            .send_bincode_external(&external);
1591
1592        let nodes = flow
1593            .with_process(&node, deployment.Localhost())
1594            .with_external(&external, deployment.Localhost())
1595            .deploy(&mut deployment);
1596
1597        deployment.deploy().await.unwrap();
1598
1599        let mut external_out = nodes.connect(counts).await;
1600
1601        deployment.start().await.unwrap();
1602
1603        assert_eq!(external_out.next().await.unwrap(), 1);
1604        assert_eq!(external_out.next().await.unwrap(), 1);
1605        assert_eq!(external_out.next().await.unwrap(), 1);
1606    }
1607
1608    #[cfg(feature = "deploy")]
1609    #[tokio::test]
1610    async fn into_singleton_unbounded_top_level_none_cardinality() {
1611        let mut deployment = Deployment::new();
1612
1613        let mut flow = FlowBuilder::new();
1614        let node = flow.process::<()>();
1615        let external = flow.external::<()>();
1616
1617        let node_tick = node.tick();
1618        let top_level_none = node_tick.singleton(q!(123)).latest().filter(q!(|_| false));
1619        let into_singleton = top_level_none.into_singleton();
1620
1621        let tick_driver = node.spin();
1622
1623        let counts = into_singleton
1624            .snapshot(&node_tick, nondet!(/** test */))
1625            .into_stream()
1626            .count()
1627            .zip(tick_driver.batch(&node_tick, nondet!(/** test */)).count())
1628            .map(q!(|(c, _)| c))
1629            .all_ticks()
1630            .send_bincode_external(&external);
1631
1632        let nodes = flow
1633            .with_process(&node, deployment.Localhost())
1634            .with_external(&external, deployment.Localhost())
1635            .deploy(&mut deployment);
1636
1637        deployment.deploy().await.unwrap();
1638
1639        let mut external_out = nodes.connect(counts).await;
1640
1641        deployment.start().await.unwrap();
1642
1643        assert_eq!(external_out.next().await.unwrap(), 1);
1644        assert_eq!(external_out.next().await.unwrap(), 1);
1645        assert_eq!(external_out.next().await.unwrap(), 1);
1646    }
1647
1648    #[cfg(feature = "sim")]
1649    #[test]
1650    fn top_level_optional_some_into_stream_no_replay() {
1651        let mut flow = FlowBuilder::new();
1652        let node = flow.process::<()>();
1653
1654        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1655        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1656        let filtered_some = folded.filter(q!(|_| true));
1657
1658        let out_recv = filtered_some.into_stream().sim_output();
1659
1660        flow.sim().exhaustive(async || {
1661            out_recv.assert_yields_only([10]).await;
1662        });
1663    }
1664
1665    #[cfg(feature = "sim")]
1666    #[test]
1667    fn top_level_optional_none_into_stream_no_replay() {
1668        let mut flow = FlowBuilder::new();
1669        let node = flow.process::<()>();
1670
1671        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1672        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1673        let filtered_none = folded.filter(q!(|_| false));
1674
1675        let out_recv = filtered_none.into_stream().sim_output();
1676
1677        flow.sim().exhaustive(async || {
1678            out_recv.assert_yields_only([] as [i32; 0]).await;
1679        });
1680    }
1681}