hydro_lang/live_collections/optional.rs
1//! Definitions for the [`Optional`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::Deref;
6use std::rc::Rc;
7
8use stageleft::{IntoQuotedMut, QuotedWithContext, q};
9use syn::parse_quote;
10
11use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
12use super::singleton::Singleton;
13use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
14use crate::compile::builder::{CycleId, FlowState};
15use crate::compile::ir::{CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode};
16#[cfg(stageleft_runtime)]
17use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
18use crate::forward_handle::{ForwardRef, TickCycle};
19use crate::live_collections::singleton::SingletonBound;
20#[cfg(stageleft_runtime)]
21use crate::location::dynamic::{DynLocation, LocationId};
22use crate::location::tick::{Atomic, DeferTick, NoAtomic};
23use crate::location::{Location, NoTick, Tick, check_matching_location};
24use crate::nondet::{NonDet, nondet};
25use crate::prelude::KeyedSingleton;
26
27/// A *nullable* Rust value that can asynchronously change over time.
28///
29/// Optionals are the live collection equivalent of [`Option`]. If the optional is [`Bounded`],
30/// the value is frozen and will not change. But if it is [`Unbounded`], the value will
31/// asynchronously change over time, including becoming present of uninhabited.
32///
33/// Optionals are used in many of the same places as [`Singleton`], but when the value may be
34/// nullable. For example, the first element of a [`Stream`] is exposed as an [`Optional`].
35///
36/// Type Parameters:
37/// - `Type`: the type of the value in this optional (when it is not null)
38/// - `Loc`: the [`Location`] where the optional is materialized
39/// - `Bound`: tracks whether the value is [`Bounded`] (fixed) or [`Unbounded`] (changing asynchronously)
40pub struct Optional<Type, Loc, Bound: Boundedness> {
41 pub(crate) location: Loc,
42 pub(crate) ir_node: RefCell<HydroNode>,
43 pub(crate) flow_state: FlowState,
44
45 _phantom: PhantomData<(Type, Loc, Bound)>,
46}
47
48impl<T, L, B: Boundedness> Drop for Optional<T, L, B> {
49 fn drop(&mut self) {
50 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
51 if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
52 self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
53 input: Box::new(ir_node),
54 op_metadata: HydroIrOpMetadata::new(),
55 });
56 }
57 }
58}
59
60impl<'a, T, L> From<Optional<T, L, Bounded>> for Optional<T, L, Unbounded>
61where
62 T: Clone,
63 L: Location<'a> + NoTick,
64{
65 fn from(value: Optional<T, L, Bounded>) -> Self {
66 let tick = value.location().tick();
67 value.clone_into_tick(&tick).latest()
68 }
69}
70
71impl<'a, T, L> DeferTick for Optional<T, Tick<L>, Bounded>
72where
73 L: Location<'a>,
74{
75 fn defer_tick(self) -> Self {
76 Optional::defer_tick(self)
77 }
78}
79
80impl<'a, T, L> CycleCollection<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
81where
82 L: Location<'a>,
83{
84 type Location = Tick<L>;
85
86 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
87 Optional::new(
88 location.clone(),
89 HydroNode::CycleSource {
90 cycle_id,
91 metadata: location.new_node_metadata(Self::collection_kind()),
92 },
93 )
94 }
95}
96
97impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
98where
99 L: Location<'a>,
100{
101 type Location = Tick<L>;
102
103 fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
104 let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
105 location.clone(),
106 HydroNode::DeferTick {
107 input: Box::new(HydroNode::CycleSource {
108 cycle_id,
109 metadata: location.new_node_metadata(Self::collection_kind()),
110 }),
111 metadata: location
112 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
113 },
114 );
115
116 from_previous_tick.or(initial.filter_if(location.optional_first_tick(q!(())).is_some()))
117 }
118}
119
120impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Optional<T, Tick<L>, Bounded>
121where
122 L: Location<'a>,
123{
124 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
125 assert_eq!(
126 Location::id(&self.location),
127 expected_location,
128 "locations do not match"
129 );
130 self.location
131 .flow_state()
132 .borrow_mut()
133 .push_root(HydroRoot::CycleSink {
134 cycle_id,
135 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
136 op_metadata: HydroIrOpMetadata::new(),
137 });
138 }
139}
140
141impl<'a, T, L> CycleCollection<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
142where
143 L: Location<'a>,
144{
145 type Location = Tick<L>;
146
147 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
148 Optional::new(
149 location.clone(),
150 HydroNode::CycleSource {
151 cycle_id,
152 metadata: location.new_node_metadata(Self::collection_kind()),
153 },
154 )
155 }
156}
157
158impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Optional<T, Tick<L>, Bounded>
159where
160 L: Location<'a>,
161{
162 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
163 assert_eq!(
164 Location::id(&self.location),
165 expected_location,
166 "locations do not match"
167 );
168 self.location
169 .flow_state()
170 .borrow_mut()
171 .push_root(HydroRoot::CycleSink {
172 cycle_id,
173 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
174 op_metadata: HydroIrOpMetadata::new(),
175 });
176 }
177}
178
179impl<'a, T, L, B: Boundedness> CycleCollection<'a, ForwardRef> for Optional<T, L, B>
180where
181 L: Location<'a> + NoTick,
182{
183 type Location = L;
184
185 fn create_source(cycle_id: CycleId, location: L) -> Self {
186 Optional::new(
187 location.clone(),
188 HydroNode::CycleSource {
189 cycle_id,
190 metadata: location.new_node_metadata(Self::collection_kind()),
191 },
192 )
193 }
194}
195
196impl<'a, T, L, B: Boundedness> ReceiverComplete<'a, ForwardRef> for Optional<T, L, B>
197where
198 L: Location<'a> + NoTick,
199{
200 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
201 assert_eq!(
202 Location::id(&self.location),
203 expected_location,
204 "locations do not match"
205 );
206 self.location
207 .flow_state()
208 .borrow_mut()
209 .push_root(HydroRoot::CycleSink {
210 cycle_id,
211 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
212 op_metadata: HydroIrOpMetadata::new(),
213 });
214 }
215}
216
217impl<'a, T, L, B: SingletonBound> From<Singleton<T, L, B>> for Optional<T, L, B::UnderlyingBound>
218where
219 L: Location<'a>,
220{
221 fn from(singleton: Singleton<T, L, B>) -> Self {
222 Optional::new(
223 singleton.location.clone(),
224 HydroNode::Cast {
225 inner: Box::new(singleton.ir_node.replace(HydroNode::Placeholder)),
226 metadata: singleton
227 .location
228 .new_node_metadata(Self::collection_kind()),
229 },
230 )
231 }
232}
233
234#[cfg(stageleft_runtime)]
235pub(super) fn zip_inside_tick<'a, T, O, L: Location<'a>, B: Boundedness>(
236 me: Optional<T, L, B>,
237 other: Optional<O, L, B>,
238) -> Optional<(T, O), L, B> {
239 check_matching_location(&me.location, &other.location);
240
241 Optional::new(
242 me.location.clone(),
243 HydroNode::CrossSingleton {
244 left: Box::new(me.ir_node.replace(HydroNode::Placeholder)),
245 right: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
246 metadata: me
247 .location
248 .new_node_metadata(Optional::<(T, O), L, B>::collection_kind()),
249 },
250 )
251}
252
253#[cfg(stageleft_runtime)]
254fn or_inside_tick<'a, T, L: Location<'a>, B: Boundedness>(
255 me: Optional<T, L, B>,
256 other: Optional<T, L, B>,
257) -> Optional<T, L, B> {
258 check_matching_location(&me.location, &other.location);
259
260 Optional::new(
261 me.location.clone(),
262 HydroNode::ChainFirst {
263 first: Box::new(me.ir_node.replace(HydroNode::Placeholder)),
264 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
265 metadata: me
266 .location
267 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
268 },
269 )
270}
271
272impl<'a, T, L, B: Boundedness> Clone for Optional<T, L, B>
273where
274 T: Clone,
275 L: Location<'a>,
276{
277 fn clone(&self) -> Self {
278 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
279 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
280 *self.ir_node.borrow_mut() = HydroNode::Tee {
281 inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
282 metadata: self.location.new_node_metadata(Self::collection_kind()),
283 };
284 }
285
286 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
287 Optional {
288 location: self.location.clone(),
289 flow_state: self.flow_state.clone(),
290 ir_node: HydroNode::Tee {
291 inner: SharedNode(inner.0.clone()),
292 metadata: metadata.clone(),
293 }
294 .into(),
295 _phantom: PhantomData,
296 }
297 } else {
298 unreachable!()
299 }
300 }
301}
302
303impl<'a, T, L, B: Boundedness> Optional<T, L, B>
304where
305 L: Location<'a>,
306{
307 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
308 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
309 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
310 let flow_state = location.flow_state().clone();
311 Optional {
312 location,
313 flow_state,
314 ir_node: RefCell::new(ir_node),
315 _phantom: PhantomData,
316 }
317 }
318
319 pub(crate) fn collection_kind() -> CollectionKind {
320 CollectionKind::Optional {
321 bound: B::BOUND_KIND,
322 element_type: stageleft::quote_type::<T>().into(),
323 }
324 }
325
326 /// Returns the [`Location`] where this optional is being materialized.
327 pub fn location(&self) -> &L {
328 &self.location
329 }
330
331 /// Transforms the optional value by applying a function `f` to it,
332 /// continuously as the input is updated.
333 ///
334 /// Whenever the optional is empty, the output optional is also empty.
335 ///
336 /// # Example
337 /// ```rust
338 /// # #[cfg(feature = "deploy")] {
339 /// # use hydro_lang::prelude::*;
340 /// # use futures::StreamExt;
341 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
342 /// let tick = process.tick();
343 /// let optional = tick.optional_first_tick(q!(1));
344 /// optional.map(q!(|v| v + 1)).all_ticks()
345 /// # }, |mut stream| async move {
346 /// // 2
347 /// # assert_eq!(stream.next().await.unwrap(), 2);
348 /// # }));
349 /// # }
350 /// ```
351 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
352 where
353 F: Fn(T) -> U + 'a,
354 {
355 let f = f.splice_fn1_ctx(&self.location).into();
356 Optional::new(
357 self.location.clone(),
358 HydroNode::Map {
359 f,
360 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
361 metadata: self
362 .location
363 .new_node_metadata(Optional::<U, L, B>::collection_kind()),
364 },
365 )
366 }
367
368 /// Transforms the optional value by applying a function `f` to it and then flattening
369 /// the result into a stream, preserving the order of elements.
370 ///
371 /// If the optional is empty, the output stream is also empty. If the optional contains
372 /// a value, `f` is applied to produce an iterator, and all items from that iterator
373 /// are emitted in the output stream in deterministic order.
374 ///
375 /// The implementation of [`Iterator`] for the output type `I` must produce items in a
376 /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
377 /// If the order is not deterministic, use [`Optional::flat_map_unordered`] instead.
378 ///
379 /// # Example
380 /// ```rust
381 /// # #[cfg(feature = "deploy")] {
382 /// # use hydro_lang::prelude::*;
383 /// # use futures::StreamExt;
384 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
385 /// let tick = process.tick();
386 /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
387 /// optional.flat_map_ordered(q!(|v| v)).all_ticks()
388 /// # }, |mut stream| async move {
389 /// // 1, 2, 3
390 /// # for w in vec![1, 2, 3] {
391 /// # assert_eq!(stream.next().await.unwrap(), w);
392 /// # }
393 /// # }));
394 /// # }
395 /// ```
396 pub fn flat_map_ordered<U, I, F>(
397 self,
398 f: impl IntoQuotedMut<'a, F, L>,
399 ) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
400 where
401 B: IsBounded,
402 I: IntoIterator<Item = U>,
403 F: Fn(T) -> I + 'a,
404 {
405 self.into_stream().flat_map_ordered(f)
406 }
407
408 /// Like [`Optional::flat_map_ordered`], but allows the implementation of [`Iterator`]
409 /// for the output type `I` to produce items in any order.
410 ///
411 /// If the optional is empty, the output stream is also empty. If the optional contains
412 /// a value, `f` is applied to produce an iterator, and all items from that iterator
413 /// are emitted in the output stream in non-deterministic order.
414 ///
415 /// # Example
416 /// ```rust
417 /// # #[cfg(feature = "deploy")] {
418 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
419 /// # use futures::StreamExt;
420 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
421 /// let tick = process.tick();
422 /// let optional = tick.optional_first_tick(q!(
423 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
424 /// ));
425 /// optional.flat_map_unordered(q!(|v| v)).all_ticks()
426 /// # }, |mut stream| async move {
427 /// // 1, 2, 3, but in no particular order
428 /// # let mut results = Vec::new();
429 /// # for _ in 0..3 {
430 /// # results.push(stream.next().await.unwrap());
431 /// # }
432 /// # results.sort();
433 /// # assert_eq!(results, vec![1, 2, 3]);
434 /// # }));
435 /// # }
436 /// ```
437 pub fn flat_map_unordered<U, I, F>(
438 self,
439 f: impl IntoQuotedMut<'a, F, L>,
440 ) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
441 where
442 B: IsBounded,
443 I: IntoIterator<Item = U>,
444 F: Fn(T) -> I + 'a,
445 {
446 self.into_stream().flat_map_unordered(f)
447 }
448
449 /// Flattens the optional value into a stream, preserving the order of elements.
450 ///
451 /// If the optional is empty, the output stream is also empty. If the optional contains
452 /// a value that implements [`IntoIterator`], all items from that iterator are emitted
453 /// in the output stream in deterministic order.
454 ///
455 /// The implementation of [`Iterator`] for the element type `T` must produce items in a
456 /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
457 /// If the order is not deterministic, use [`Optional::flatten_unordered`] instead.
458 ///
459 /// # Example
460 /// ```rust
461 /// # #[cfg(feature = "deploy")] {
462 /// # use hydro_lang::prelude::*;
463 /// # use futures::StreamExt;
464 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
465 /// let tick = process.tick();
466 /// let optional = tick.optional_first_tick(q!(vec![1, 2, 3]));
467 /// optional.flatten_ordered().all_ticks()
468 /// # }, |mut stream| async move {
469 /// // 1, 2, 3
470 /// # for w in vec![1, 2, 3] {
471 /// # assert_eq!(stream.next().await.unwrap(), w);
472 /// # }
473 /// # }));
474 /// # }
475 /// ```
476 pub fn flatten_ordered<U>(self) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
477 where
478 B: IsBounded,
479 T: IntoIterator<Item = U>,
480 {
481 self.flat_map_ordered(q!(|v| v))
482 }
483
484 /// Like [`Optional::flatten_ordered`], but allows the implementation of [`Iterator`]
485 /// for the element type `T` to produce items in any order.
486 ///
487 /// If the optional is empty, the output stream is also empty. If the optional contains
488 /// a value that implements [`IntoIterator`], all items from that iterator are emitted
489 /// in the output stream in non-deterministic order.
490 ///
491 /// # Example
492 /// ```rust
493 /// # #[cfg(feature = "deploy")] {
494 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
495 /// # use futures::StreamExt;
496 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
497 /// let tick = process.tick();
498 /// let optional = tick.optional_first_tick(q!(
499 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
500 /// ));
501 /// optional.flatten_unordered().all_ticks()
502 /// # }, |mut stream| async move {
503 /// // 1, 2, 3, but in no particular order
504 /// # let mut results = Vec::new();
505 /// # for _ in 0..3 {
506 /// # results.push(stream.next().await.unwrap());
507 /// # }
508 /// # results.sort();
509 /// # assert_eq!(results, vec![1, 2, 3]);
510 /// # }));
511 /// # }
512 /// ```
513 pub fn flatten_unordered<U>(self) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
514 where
515 B: IsBounded,
516 T: IntoIterator<Item = U>,
517 {
518 self.flat_map_unordered(q!(|v| v))
519 }
520
521 /// Creates an optional containing only the value if it satisfies a predicate `f`.
522 ///
523 /// If the optional is empty, the output optional is also empty. If the optional contains
524 /// a value and the predicate returns `true`, the output optional contains the same value.
525 /// If the predicate returns `false`, the output optional is empty.
526 ///
527 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
528 /// not modify or take ownership of the value. If you need to modify the value while filtering
529 /// use [`Optional::filter_map`] instead.
530 ///
531 /// # Example
532 /// ```rust
533 /// # #[cfg(feature = "deploy")] {
534 /// # use hydro_lang::prelude::*;
535 /// # use futures::StreamExt;
536 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
537 /// let tick = process.tick();
538 /// let optional = tick.optional_first_tick(q!(5));
539 /// optional.filter(q!(|&x| x > 3)).all_ticks()
540 /// # }, |mut stream| async move {
541 /// // 5
542 /// # assert_eq!(stream.next().await.unwrap(), 5);
543 /// # }));
544 /// # }
545 /// ```
546 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B>
547 where
548 F: Fn(&T) -> bool + 'a,
549 {
550 let f = f.splice_fn1_borrow_ctx(&self.location).into();
551 Optional::new(
552 self.location.clone(),
553 HydroNode::Filter {
554 f,
555 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
556 metadata: self.location.new_node_metadata(Self::collection_kind()),
557 },
558 )
559 }
560
561 /// An operator that both filters and maps. It yields only the value if the supplied
562 /// closure `f` returns `Some(value)`.
563 ///
564 /// If the optional is empty, the output optional is also empty. If the optional contains
565 /// a value and the closure returns `Some(new_value)`, the output optional contains `new_value`.
566 /// If the closure returns `None`, the output optional is empty.
567 ///
568 /// # Example
569 /// ```rust
570 /// # #[cfg(feature = "deploy")] {
571 /// # use hydro_lang::prelude::*;
572 /// # use futures::StreamExt;
573 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
574 /// let tick = process.tick();
575 /// let optional = tick.optional_first_tick(q!("42"));
576 /// optional
577 /// .filter_map(q!(|s| s.parse::<i32>().ok()))
578 /// .all_ticks()
579 /// # }, |mut stream| async move {
580 /// // 42
581 /// # assert_eq!(stream.next().await.unwrap(), 42);
582 /// # }));
583 /// # }
584 /// ```
585 pub fn filter_map<U, F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<U, L, B>
586 where
587 F: Fn(T) -> Option<U> + 'a,
588 {
589 let f = f.splice_fn1_ctx(&self.location).into();
590 Optional::new(
591 self.location.clone(),
592 HydroNode::FilterMap {
593 f,
594 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
595 metadata: self
596 .location
597 .new_node_metadata(Optional::<U, L, B>::collection_kind()),
598 },
599 )
600 }
601
602 /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
603 ///
604 /// If the other value is a [`Optional`], the output will be non-null only if the argument is
605 /// non-null. This is useful for combining several pieces of state together.
606 ///
607 /// # Example
608 /// ```rust
609 /// # #[cfg(feature = "deploy")] {
610 /// # use hydro_lang::prelude::*;
611 /// # use futures::StreamExt;
612 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
613 /// let tick = process.tick();
614 /// let numbers = process
615 /// .source_iter(q!(vec![123, 456, 789]))
616 /// .batch(&tick, nondet!(/** test */));
617 /// let min = numbers.clone().min(); // Optional
618 /// let max = numbers.max(); // Optional
619 /// min.zip(max).all_ticks()
620 /// # }, |mut stream| async move {
621 /// // [(123, 789)]
622 /// # for w in vec![(123, 789)] {
623 /// # assert_eq!(stream.next().await.unwrap(), w);
624 /// # }
625 /// # }));
626 /// # }
627 /// ```
628 pub fn zip<O>(self, other: impl Into<Optional<O, L, B>>) -> Optional<(T, O), L, B>
629 where
630 B: IsBounded,
631 {
632 let other: Optional<O, L, B> = other.into();
633 check_matching_location(&self.location, &other.location);
634
635 if L::is_top_level()
636 && let Some(tick) = self.location.try_tick()
637 {
638 let out = zip_inside_tick(
639 self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
640 other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
641 )
642 .latest();
643
644 Optional::new(
645 out.location.clone(),
646 out.ir_node.replace(HydroNode::Placeholder),
647 )
648 } else {
649 zip_inside_tick(self, other)
650 }
651 }
652
653 /// Passes through `self` when it has a value, otherwise passes through `other`.
654 ///
655 /// Like [`Option::or`], this is helpful for defining a fallback for an [`Optional`], when the
656 /// fallback itself is an [`Optional`]. If the fallback is a [`Singleton`], you can use
657 /// [`Optional::unwrap_or`] to ensure that the output is always non-null.
658 ///
659 /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
660 /// of the inputs change (including to/from null states).
661 ///
662 /// # Example
663 /// ```rust
664 /// # #[cfg(feature = "deploy")] {
665 /// # use hydro_lang::prelude::*;
666 /// # use futures::StreamExt;
667 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
668 /// let tick = process.tick();
669 /// // ticks are lazy by default, forces the second tick to run
670 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
671 ///
672 /// let some_first_tick = tick.optional_first_tick(q!(123));
673 /// let some_second_tick = tick.optional_first_tick(q!(456)).defer_tick();
674 /// some_first_tick.or(some_second_tick).all_ticks()
675 /// # }, |mut stream| async move {
676 /// // [123 /* first tick */, 456 /* second tick */]
677 /// # for w in vec![123, 456] {
678 /// # assert_eq!(stream.next().await.unwrap(), w);
679 /// # }
680 /// # }));
681 /// # }
682 /// ```
683 pub fn or(self, other: Optional<T, L, B>) -> Optional<T, L, B> {
684 check_matching_location(&self.location, &other.location);
685
686 if L::is_top_level()
687 && !B::BOUNDED // only if unbounded we need to use a tick
688 && let Some(tick) = self.location.try_tick()
689 {
690 let out = or_inside_tick(
691 self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
692 other.snapshot(&tick, nondet!(/** eventually stabilizes */)),
693 )
694 .latest();
695
696 Optional::new(
697 out.location.clone(),
698 out.ir_node.replace(HydroNode::Placeholder),
699 )
700 } else {
701 Optional::new(
702 self.location.clone(),
703 HydroNode::ChainFirst {
704 first: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
705 second: Box::new(other.ir_node.replace(HydroNode::Placeholder)),
706 metadata: self.location.new_node_metadata(Self::collection_kind()),
707 },
708 )
709 }
710 }
711
712 /// Gets the contents of `self` when it has a value, otherwise passes through `other`.
713 ///
714 /// Like [`Option::unwrap_or`], this is helpful for defining a fallback for an [`Optional`].
715 /// If the fallback is not always defined (an [`Optional`]), you can use [`Optional::or`].
716 ///
717 /// If the inputs are [`Unbounded`], the output will be asynchronously updated as the contents
718 /// of the inputs change (including to/from null states).
719 ///
720 /// # Example
721 /// ```rust
722 /// # #[cfg(feature = "deploy")] {
723 /// # use hydro_lang::prelude::*;
724 /// # use futures::StreamExt;
725 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
726 /// let tick = process.tick();
727 /// // ticks are lazy by default, forces the later ticks to run
728 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
729 ///
730 /// let some_first_tick = tick.optional_first_tick(q!(123));
731 /// some_first_tick
732 /// .unwrap_or(tick.singleton(q!(456)))
733 /// .all_ticks()
734 /// # }, |mut stream| async move {
735 /// // [123 /* first tick */, 456 /* second tick */, 456 /* third tick */, 456, ...]
736 /// # for w in vec![123, 456, 456, 456] {
737 /// # assert_eq!(stream.next().await.unwrap(), w);
738 /// # }
739 /// # }));
740 /// # }
741 /// ```
742 pub fn unwrap_or(self, other: Singleton<T, L, B>) -> Singleton<T, L, B> {
743 let res_option = self.or(other.into());
744 Singleton::new(
745 res_option.location.clone(),
746 HydroNode::Cast {
747 inner: Box::new(res_option.ir_node.replace(HydroNode::Placeholder)),
748 metadata: res_option
749 .location
750 .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
751 },
752 )
753 }
754
755 /// Gets the contents of `self` when it has a value, otherwise returns the default value of `T`.
756 ///
757 /// Like [`Option::unwrap_or_default`], this is helpful for defining a fallback for an
758 /// [`Optional`] when the default value of the type is a suitable fallback.
759 ///
760 /// # Example
761 /// ```rust
762 /// # #[cfg(feature = "deploy")] {
763 /// # use hydro_lang::prelude::*;
764 /// # use futures::StreamExt;
765 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
766 /// let tick = process.tick();
767 /// // ticks are lazy by default, forces the later ticks to run
768 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
769 ///
770 /// let some_first_tick = tick.optional_first_tick(q!(123i32));
771 /// some_first_tick.unwrap_or_default().all_ticks()
772 /// # }, |mut stream| async move {
773 /// // [123 /* first tick */, 0 /* second tick */, 0 /* third tick */, 0, ...]
774 /// # for w in vec![123, 0, 0, 0] {
775 /// # assert_eq!(stream.next().await.unwrap(), w);
776 /// # }
777 /// # }));
778 /// # }
779 /// ```
780 pub fn unwrap_or_default(self) -> Singleton<T, L, B>
781 where
782 T: Default + Clone,
783 {
784 self.into_singleton().map(q!(|v| v.unwrap_or_default()))
785 }
786
787 /// Converts this optional into a [`Singleton`] with a Rust [`Option`] as its contents.
788 ///
789 /// Useful for writing custom Rust code that needs to interact with both the null and non-null
790 /// states of the [`Optional`]. When possible, you should use the native APIs on [`Optional`]
791 /// so that Hydro can skip any computation on null values.
792 ///
793 /// # Example
794 /// ```rust
795 /// # #[cfg(feature = "deploy")] {
796 /// # use hydro_lang::prelude::*;
797 /// # use futures::StreamExt;
798 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
799 /// let tick = process.tick();
800 /// // ticks are lazy by default, forces the later ticks to run
801 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
802 ///
803 /// let some_first_tick = tick.optional_first_tick(q!(123));
804 /// some_first_tick.into_singleton().all_ticks()
805 /// # }, |mut stream| async move {
806 /// // [Some(123) /* first tick */, None /* second tick */, None /* third tick */, None, ...]
807 /// # for w in vec![Some(123), None, None, None] {
808 /// # assert_eq!(stream.next().await.unwrap(), w);
809 /// # }
810 /// # }));
811 /// # }
812 /// ```
813 pub fn into_singleton(self) -> Singleton<Option<T>, L, B>
814 where
815 T: Clone,
816 {
817 let none: syn::Expr = parse_quote!(::std::option::Option::None);
818
819 let none_singleton = Singleton::new(
820 self.location.clone(),
821 HydroNode::SingletonSource {
822 value: none.into(),
823 first_tick_only: false,
824 metadata: self
825 .location
826 .new_node_metadata(Singleton::<Option<T>, L, B>::collection_kind()),
827 },
828 );
829
830 self.map(q!(|v| Some(v))).unwrap_or(none_singleton)
831 }
832
833 /// Returns a [`Singleton`] containing `true` if this optional has a value, `false` otherwise.
834 ///
835 /// # Example
836 /// ```rust
837 /// # #[cfg(feature = "deploy")] {
838 /// # use hydro_lang::prelude::*;
839 /// # use futures::StreamExt;
840 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
841 /// let tick = process.tick();
842 /// // ticks are lazy by default, forces the second tick to run
843 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
844 ///
845 /// let some_first_tick = tick.optional_first_tick(q!(42));
846 /// some_first_tick.is_some().all_ticks()
847 /// # }, |mut stream| async move {
848 /// // [true /* first tick */, false /* second tick */, ...]
849 /// # for w in vec![true, false] {
850 /// # assert_eq!(stream.next().await.unwrap(), w);
851 /// # }
852 /// # }));
853 /// # }
854 /// ```
855 #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
856 pub fn is_some(self) -> Singleton<bool, L, B> {
857 self.map(q!(|_| ()))
858 .into_singleton()
859 .map(q!(|o| o.is_some()))
860 }
861
862 /// Returns a [`Singleton`] containing `true` if this optional is null, `false` otherwise.
863 ///
864 /// # Example
865 /// ```rust
866 /// # #[cfg(feature = "deploy")] {
867 /// # use hydro_lang::prelude::*;
868 /// # use futures::StreamExt;
869 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
870 /// let tick = process.tick();
871 /// // ticks are lazy by default, forces the second tick to run
872 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
873 ///
874 /// let some_first_tick = tick.optional_first_tick(q!(42));
875 /// some_first_tick.is_none().all_ticks()
876 /// # }, |mut stream| async move {
877 /// // [false /* first tick */, true /* second tick */, ...]
878 /// # for w in vec![false, true] {
879 /// # assert_eq!(stream.next().await.unwrap(), w);
880 /// # }
881 /// # }));
882 /// # }
883 /// ```
884 #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
885 pub fn is_none(self) -> Singleton<bool, L, B> {
886 self.map(q!(|_| ()))
887 .into_singleton()
888 .map(q!(|o| o.is_none()))
889 }
890
891 /// Returns a [`Singleton`] containing `true` if both optionals are non-null and their
892 /// values are equal, `false` otherwise (including when either is null).
893 ///
894 /// # Example
895 /// ```rust
896 /// # #[cfg(feature = "deploy")] {
897 /// # use hydro_lang::prelude::*;
898 /// # use futures::StreamExt;
899 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
900 /// let tick = process.tick();
901 /// // ticks are lazy by default, forces the second tick to run
902 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
903 ///
904 /// let a = tick.optional_first_tick(q!(5)); // Some(5), None
905 /// let b = tick.optional_first_tick(q!(5)); // Some(5), None
906 /// a.is_some_and_equals(b).all_ticks()
907 /// # }, |mut stream| async move {
908 /// // [true, false]
909 /// # for w in vec![true, false] {
910 /// # assert_eq!(stream.next().await.unwrap(), w);
911 /// # }
912 /// # }));
913 /// # }
914 /// ```
915 #[expect(clippy::wrong_self_convention, reason = "Stream naming")]
916 pub fn is_some_and_equals(self, other: Optional<T, L, B>) -> Singleton<bool, L, B>
917 where
918 T: PartialEq + Clone,
919 B: IsBounded,
920 {
921 self.into_singleton()
922 .zip(other.into_singleton())
923 .map(q!(|(a, b)| a.is_some() && a == b))
924 }
925
926 /// An operator which allows you to "name" a `HydroNode`.
927 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
928 pub fn ir_node_named(self, name: &str) -> Optional<T, L, B> {
929 {
930 let mut node = self.ir_node.borrow_mut();
931 let metadata = node.metadata_mut();
932 metadata.tag = Some(name.to_owned());
933 }
934 self
935 }
936
937 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
938 /// implies that `B == Bounded`.
939 pub fn make_bounded(self) -> Optional<T, L, Bounded>
940 where
941 B: IsBounded,
942 {
943 Optional::new(
944 self.location.clone(),
945 self.ir_node.replace(HydroNode::Placeholder),
946 )
947 }
948
949 /// Clones this bounded optional into a tick, returning a optional that has the
950 /// same value as the outer optional. Because the outer optional is bounded, this
951 /// is deterministic because there is only a single immutable version.
952 pub fn clone_into_tick(self, tick: &Tick<L>) -> Optional<T, Tick<L>, Bounded>
953 where
954 B: IsBounded,
955 T: Clone,
956 {
957 // TODO(shadaj): avoid printing simulator logs for this snapshot
958 self.snapshot(
959 tick,
960 nondet!(/** bounded top-level optional so deterministic */),
961 )
962 }
963
964 /// Converts this optional into a [`Stream`] containing a single element, the value, if it is
965 /// non-null. Otherwise, the stream is empty.
966 ///
967 /// # Example
968 /// ```rust
969 /// # #[cfg(feature = "deploy")] {
970 /// # use hydro_lang::prelude::*;
971 /// # use futures::StreamExt;
972 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
973 /// # let tick = process.tick();
974 /// # // ticks are lazy by default, forces the second tick to run
975 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
976 /// # let batch_first_tick = process
977 /// # .source_iter(q!(vec![]))
978 /// # .batch(&tick, nondet!(/** test */));
979 /// # let batch_second_tick = process
980 /// # .source_iter(q!(vec![123, 456]))
981 /// # .batch(&tick, nondet!(/** test */))
982 /// # .defer_tick(); // appears on the second tick
983 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
984 /// input_batch // first tick: [], second tick: [123, 456]
985 /// .clone()
986 /// .max()
987 /// .into_stream()
988 /// .chain(input_batch)
989 /// .all_ticks()
990 /// # }, |mut stream| async move {
991 /// // [456, 123, 456]
992 /// # for w in vec![456, 123, 456] {
993 /// # assert_eq!(stream.next().await.unwrap(), w);
994 /// # }
995 /// # }));
996 /// # }
997 /// ```
998 pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce>
999 where
1000 B: IsBounded,
1001 {
1002 Stream::new(
1003 self.location.clone(),
1004 HydroNode::Cast {
1005 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1006 metadata: self.location.new_node_metadata(Stream::<
1007 T,
1008 Tick<L>,
1009 Bounded,
1010 TotalOrder,
1011 ExactlyOnce,
1012 >::collection_kind()),
1013 },
1014 )
1015 }
1016
1017 /// Filters this optional, passing through the value if the boolean signal is `true`,
1018 /// otherwise the output is null.
1019 ///
1020 /// # Example
1021 /// ```rust
1022 /// # #[cfg(feature = "deploy")] {
1023 /// # use hydro_lang::prelude::*;
1024 /// # use futures::StreamExt;
1025 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1026 /// let tick = process.tick();
1027 /// // ticks are lazy by default, forces the second tick to run
1028 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1029 ///
1030 /// let some_first_tick = tick.optional_first_tick(q!(()));
1031 /// let signal = some_first_tick.is_some(); // true on first tick, false on second
1032 /// let batch_first_tick = process
1033 /// .source_iter(q!(vec![456]))
1034 /// .batch(&tick, nondet!(/** test */));
1035 /// let batch_second_tick = process
1036 /// .source_iter(q!(vec![789]))
1037 /// .batch(&tick, nondet!(/** test */))
1038 /// .defer_tick();
1039 /// batch_first_tick.chain(batch_second_tick).first()
1040 /// .filter_if(signal)
1041 /// .unwrap_or(tick.singleton(q!(0)))
1042 /// .all_ticks()
1043 /// # }, |mut stream| async move {
1044 /// // [456, 0]
1045 /// # for w in vec![456, 0] {
1046 /// # assert_eq!(stream.next().await.unwrap(), w);
1047 /// # }
1048 /// # }));
1049 /// # }
1050 /// ```
1051 pub fn filter_if(self, signal: Singleton<bool, L, B>) -> Optional<T, L, B>
1052 where
1053 B: IsBounded,
1054 {
1055 self.zip(signal.filter(q!(|b| *b))).map(q!(|(d, _)| d))
1056 }
1057
1058 /// Filters this optional, passing through the optional value if it is non-null **and** the
1059 /// argument (a [`Bounded`] [`Optional`]`) is non-null, otherwise the output is null.
1060 ///
1061 /// Useful for conditionally processing, such as only emitting an optional's value outside
1062 /// a tick if some other condition is satisfied.
1063 ///
1064 /// # Example
1065 /// ```rust
1066 /// # #[cfg(feature = "deploy")] {
1067 /// # use hydro_lang::prelude::*;
1068 /// # use futures::StreamExt;
1069 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1070 /// let tick = process.tick();
1071 /// // ticks are lazy by default, forces the second tick to run
1072 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1073 ///
1074 /// let batch_first_tick = process
1075 /// .source_iter(q!(vec![]))
1076 /// .batch(&tick, nondet!(/** test */));
1077 /// let batch_second_tick = process
1078 /// .source_iter(q!(vec![456]))
1079 /// .batch(&tick, nondet!(/** test */))
1080 /// .defer_tick(); // appears on the second tick
1081 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1082 /// batch_first_tick.chain(batch_second_tick).first()
1083 /// .filter_if_some(some_on_first_tick)
1084 /// .unwrap_or(tick.singleton(q!(789)))
1085 /// .all_ticks()
1086 /// # }, |mut stream| async move {
1087 /// // [789, 789]
1088 /// # for w in vec![789, 789] {
1089 /// # assert_eq!(stream.next().await.unwrap(), w);
1090 /// # }
1091 /// # }));
1092 /// # }
1093 /// ```
1094 #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1095 pub fn filter_if_some<U>(self, signal: Optional<U, L, B>) -> Optional<T, L, B>
1096 where
1097 B: IsBounded,
1098 {
1099 self.filter_if(signal.is_some())
1100 }
1101
1102 /// Filters this optional, passing through the optional value if it is non-null **and** the
1103 /// argument (a [`Bounded`] [`Optional`]`) is _null_, otherwise the output is null.
1104 ///
1105 /// Useful for conditionally processing, such as only emitting an optional's value outside
1106 /// a tick if some other condition is satisfied.
1107 ///
1108 /// # Example
1109 /// ```rust
1110 /// # #[cfg(feature = "deploy")] {
1111 /// # use hydro_lang::prelude::*;
1112 /// # use futures::StreamExt;
1113 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1114 /// let tick = process.tick();
1115 /// // ticks are lazy by default, forces the second tick to run
1116 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1117 ///
1118 /// let batch_first_tick = process
1119 /// .source_iter(q!(vec![]))
1120 /// .batch(&tick, nondet!(/** test */));
1121 /// let batch_second_tick = process
1122 /// .source_iter(q!(vec![456]))
1123 /// .batch(&tick, nondet!(/** test */))
1124 /// .defer_tick(); // appears on the second tick
1125 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1126 /// batch_first_tick.chain(batch_second_tick).first()
1127 /// .filter_if_none(some_on_first_tick)
1128 /// .unwrap_or(tick.singleton(q!(789)))
1129 /// .all_ticks()
1130 /// # }, |mut stream| async move {
1131 /// // [789, 789]
1132 /// # for w in vec![789, 456] {
1133 /// # assert_eq!(stream.next().await.unwrap(), w);
1134 /// # }
1135 /// # }));
1136 /// # }
1137 /// ```
1138 #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
1139 pub fn filter_if_none<U>(self, other: Optional<U, L, B>) -> Optional<T, L, B>
1140 where
1141 B: IsBounded,
1142 {
1143 self.filter_if(other.is_none())
1144 }
1145
1146 /// If `self` is null, emits a null optional, but if it non-null, emits `value`.
1147 ///
1148 /// Useful for gating the release of a [`Singleton`] on a condition of the [`Optional`]
1149 /// having a value, such as only releasing a piece of state if the node is the leader.
1150 ///
1151 /// # Example
1152 /// ```rust
1153 /// # #[cfg(feature = "deploy")] {
1154 /// # use hydro_lang::prelude::*;
1155 /// # use futures::StreamExt;
1156 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1157 /// let tick = process.tick();
1158 /// // ticks are lazy by default, forces the second tick to run
1159 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1160 ///
1161 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
1162 /// some_on_first_tick
1163 /// .if_some_then(tick.singleton(q!(456)))
1164 /// .unwrap_or(tick.singleton(q!(123)))
1165 /// # .all_ticks()
1166 /// # }, |mut stream| async move {
1167 /// // 456 (first tick) ~> 123 (second tick onwards)
1168 /// # for w in vec![456, 123, 123] {
1169 /// # assert_eq!(stream.next().await.unwrap(), w);
1170 /// # }
1171 /// # }));
1172 /// # }
1173 /// ```
1174 #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
1175 pub fn if_some_then<U>(self, value: Singleton<U, L, B>) -> Optional<U, L, B>
1176 where
1177 B: IsBounded,
1178 {
1179 value.filter_if(self.is_some())
1180 }
1181}
1182
1183impl<'a, K, V, L, B: Boundedness> Optional<(K, V), L, B>
1184where
1185 L: Location<'a>,
1186{
1187 /// Converts this optional into a [`KeyedSingleton`] containing a single entry with the
1188 /// key-value pair of this [`Optional`].
1189 ///
1190 /// If this [`Optional`] is [`Bounded`], the [`KeyedSingleton`] will be [`Bounded`] as well
1191 /// if it is [`Unbounded`], the [`KeyedSingleton`] will be [`Unbounded`], which means that
1192 /// the entry will be updated and appear / disappear according to the state of the
1193 /// [`Optional`].
1194 pub fn into_keyed_singleton(self) -> KeyedSingleton<K, V, L, B> {
1195 KeyedSingleton::new(
1196 self.location.clone(),
1197 HydroNode::Cast {
1198 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1199 metadata: self
1200 .location
1201 .new_node_metadata(KeyedSingleton::<K, V, L, B>::collection_kind()),
1202 },
1203 )
1204 }
1205}
1206
1207impl<'a, T, L, B: Boundedness> Optional<T, Atomic<L>, B>
1208where
1209 L: Location<'a> + NoTick,
1210{
1211 /// Returns an optional value corresponding to the latest snapshot of the optional
1212 /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
1213 /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
1214 /// all snapshots of this optional into the atomic-associated tick will observe the
1215 /// same value each tick.
1216 ///
1217 /// # Non-Determinism
1218 /// Because this picks a snapshot of a optional whose value is continuously changing,
1219 /// the output optional has a non-deterministic value since the snapshot can be at an
1220 /// arbitrary point in time.
1221 pub fn snapshot_atomic(self, tick: &Tick<L>, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
1222 Optional::new(
1223 tick.clone(),
1224 HydroNode::Batch {
1225 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1226 metadata: tick
1227 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1228 },
1229 )
1230 }
1231
1232 /// Returns this optional back into a top-level, asynchronous execution context where updates
1233 /// to the value will be asynchronously propagated.
1234 pub fn end_atomic(self) -> Optional<T, L, B> {
1235 Optional::new(
1236 self.location.tick.l.clone(),
1237 HydroNode::EndAtomic {
1238 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1239 metadata: self
1240 .location
1241 .tick
1242 .l
1243 .new_node_metadata(Optional::<T, L, B>::collection_kind()),
1244 },
1245 )
1246 }
1247}
1248
1249impl<'a, T, L, B: Boundedness> Optional<T, L, B>
1250where
1251 L: Location<'a>,
1252{
1253 /// Shifts this optional into an atomic context, which guarantees that any downstream logic
1254 /// will observe the same version of the value and will be executed synchronously before any
1255 /// outputs are yielded (in [`Optional::end_atomic`]).
1256 ///
1257 /// This is useful to enforce local consistency constraints, such as ensuring that several readers
1258 /// see a consistent version of local state (since otherwise each [`Optional::snapshot`] may pick
1259 /// a different version).
1260 pub fn atomic(self) -> Optional<T, Atomic<L>, B> {
1261 let id = self.location.flow_state().borrow_mut().next_clock_id();
1262 let out_location = Atomic {
1263 tick: Tick {
1264 id,
1265 l: self.location.clone(),
1266 },
1267 };
1268 Optional::new(
1269 out_location.clone(),
1270 HydroNode::BeginAtomic {
1271 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1272 metadata: out_location
1273 .new_node_metadata(Optional::<T, Atomic<L>, B>::collection_kind()),
1274 },
1275 )
1276 }
1277
1278 /// Given a tick, returns a optional value corresponding to a snapshot of the optional
1279 /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
1280 /// relevant data that contributed to the snapshot at tick `t`.
1281 ///
1282 /// # Non-Determinism
1283 /// Because this picks a snapshot of a optional whose value is continuously changing,
1284 /// the output optional has a non-deterministic value since the snapshot can be at an
1285 /// arbitrary point in time.
1286 pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Optional<T, Tick<L>, Bounded> {
1287 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1288 Optional::new(
1289 tick.clone(),
1290 HydroNode::Batch {
1291 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1292 metadata: tick
1293 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
1294 },
1295 )
1296 }
1297
1298 /// Eagerly samples the optional as fast as possible, returning a stream of snapshots
1299 /// with order corresponding to increasing prefixes of data contributing to the optional.
1300 ///
1301 /// # Non-Determinism
1302 /// At runtime, the optional will be arbitrarily sampled as fast as possible, but due
1303 /// to non-deterministic batching and arrival of inputs, the output stream is
1304 /// non-deterministic.
1305 pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1306 where
1307 L: NoTick,
1308 {
1309 let tick = self.location.tick();
1310 self.snapshot(&tick, nondet).all_ticks().weaken_retries()
1311 }
1312
1313 /// Given a time interval, returns a stream corresponding to snapshots of the optional
1314 /// value taken at various points in time. Because the input optional may be
1315 /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
1316 /// represent the value of the optional given some prefix of the streams leading up to
1317 /// it.
1318 ///
1319 /// # Non-Determinism
1320 /// The output stream is non-deterministic in which elements are sampled, since this
1321 /// is controlled by a clock.
1322 pub fn sample_every(
1323 self,
1324 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1325 nondet: NonDet,
1326 ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1327 where
1328 L: NoTick + NoAtomic,
1329 {
1330 let samples = self.location.source_interval(interval, nondet);
1331 let tick = self.location.tick();
1332
1333 self.snapshot(&tick, nondet)
1334 .filter_if(samples.batch(&tick, nondet).first().is_some())
1335 .all_ticks()
1336 .weaken_retries()
1337 }
1338}
1339
1340impl<'a, T, L> Optional<T, Tick<L>, Bounded>
1341where
1342 L: Location<'a>,
1343{
1344 /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1345 /// which will stream the value computed in _each_ tick as a separate stream element (skipping
1346 /// null values).
1347 ///
1348 /// Unlike [`Optional::latest`], the value computed in each tick is emitted separately,
1349 /// producing one element in the output for each (non-null) tick. This is useful for batched
1350 /// computations, where the results from each tick must be combined together.
1351 ///
1352 /// # Example
1353 /// ```rust
1354 /// # #[cfg(feature = "deploy")] {
1355 /// # use hydro_lang::prelude::*;
1356 /// # use futures::StreamExt;
1357 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1358 /// # let tick = process.tick();
1359 /// # // ticks are lazy by default, forces the second tick to run
1360 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1361 /// # let batch_first_tick = process
1362 /// # .source_iter(q!(vec![]))
1363 /// # .batch(&tick, nondet!(/** test */));
1364 /// # let batch_second_tick = process
1365 /// # .source_iter(q!(vec![1, 2, 3]))
1366 /// # .batch(&tick, nondet!(/** test */))
1367 /// # .defer_tick(); // appears on the second tick
1368 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1369 /// input_batch // first tick: [], second tick: [1, 2, 3]
1370 /// .max()
1371 /// .all_ticks()
1372 /// # }, |mut stream| async move {
1373 /// // [3]
1374 /// # for w in vec![3] {
1375 /// # assert_eq!(stream.next().await.unwrap(), w);
1376 /// # }
1377 /// # }));
1378 /// # }
1379 /// ```
1380 pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1381 self.into_stream().all_ticks()
1382 }
1383
1384 /// Synchronously yields the value of this optional outside the tick as an unbounded stream,
1385 /// which will stream the value computed in _each_ tick as a separate stream element.
1386 ///
1387 /// Unlike [`Optional::all_ticks`], this preserves synchronous execution, as the output stream
1388 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1389 /// optional's [`Tick`] context.
1390 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1391 self.into_stream().all_ticks_atomic()
1392 }
1393
1394 /// Asynchronously yields this optional outside the tick as an unbounded optional, which will
1395 /// be asynchronously updated with the latest value of the optional inside the tick, including
1396 /// whether the optional is null or not.
1397 ///
1398 /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1399 /// tick that tracks the inner value. This is useful for getting the value as of the
1400 /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1401 ///
1402 /// # Example
1403 /// ```rust
1404 /// # #[cfg(feature = "deploy")] {
1405 /// # use hydro_lang::prelude::*;
1406 /// # use futures::StreamExt;
1407 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1408 /// # let tick = process.tick();
1409 /// # // ticks are lazy by default, forces the second tick to run
1410 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1411 /// # let batch_first_tick = process
1412 /// # .source_iter(q!(vec![]))
1413 /// # .batch(&tick, nondet!(/** test */));
1414 /// # let batch_second_tick = process
1415 /// # .source_iter(q!(vec![1, 2, 3]))
1416 /// # .batch(&tick, nondet!(/** test */))
1417 /// # .defer_tick(); // appears on the second tick
1418 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1419 /// input_batch // first tick: [], second tick: [1, 2, 3]
1420 /// .max()
1421 /// .latest()
1422 /// # .into_singleton()
1423 /// # .sample_eager(nondet!(/** test */))
1424 /// # }, |mut stream| async move {
1425 /// // asynchronously changes from None ~> 3
1426 /// # for w in vec![None, Some(3)] {
1427 /// # assert_eq!(stream.next().await.unwrap(), w);
1428 /// # }
1429 /// # }));
1430 /// # }
1431 /// ```
1432 pub fn latest(self) -> Optional<T, L, Unbounded> {
1433 Optional::new(
1434 self.location.outer().clone(),
1435 HydroNode::YieldConcat {
1436 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1437 metadata: self
1438 .location
1439 .outer()
1440 .new_node_metadata(Optional::<T, L, Unbounded>::collection_kind()),
1441 },
1442 )
1443 }
1444
1445 /// Synchronously yields this optional outside the tick as an unbounded optional, which will
1446 /// be updated with the latest value of the optional inside the tick.
1447 ///
1448 /// Unlike [`Optional::latest`], this preserves synchronous execution, as the output optional
1449 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1450 /// optional's [`Tick`] context.
1451 pub fn latest_atomic(self) -> Optional<T, Atomic<L>, Unbounded> {
1452 let out_location = Atomic {
1453 tick: self.location.clone(),
1454 };
1455
1456 Optional::new(
1457 out_location.clone(),
1458 HydroNode::YieldConcat {
1459 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1460 metadata: out_location
1461 .new_node_metadata(Optional::<T, Atomic<L>, Unbounded>::collection_kind()),
1462 },
1463 )
1464 }
1465
1466 /// Shifts the state in `self` to the **next tick**, so that the returned optional at tick `T`
1467 /// always has the state of `self` at tick `T - 1`.
1468 ///
1469 /// At tick `0`, the output optional is null, since there is no previous tick.
1470 ///
1471 /// This operator enables stateful iterative processing with ticks, by sending data from one
1472 /// tick to the next. For example, you can use it to compare state across consecutive batches.
1473 ///
1474 /// # Example
1475 /// ```rust
1476 /// # #[cfg(feature = "deploy")] {
1477 /// # use hydro_lang::prelude::*;
1478 /// # use futures::StreamExt;
1479 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1480 /// let tick = process.tick();
1481 /// // ticks are lazy by default, forces the second tick to run
1482 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1483 ///
1484 /// let batch_first_tick = process
1485 /// .source_iter(q!(vec![1, 2]))
1486 /// .batch(&tick, nondet!(/** test */));
1487 /// let batch_second_tick = process
1488 /// .source_iter(q!(vec![3, 4]))
1489 /// .batch(&tick, nondet!(/** test */))
1490 /// .defer_tick(); // appears on the second tick
1491 /// let current_tick_sum = batch_first_tick.chain(batch_second_tick)
1492 /// .reduce(q!(|state, v| *state += v));
1493 ///
1494 /// current_tick_sum.clone().into_singleton().zip(
1495 /// current_tick_sum.defer_tick().into_singleton() // state from previous tick
1496 /// ).all_ticks()
1497 /// # }, |mut stream| async move {
1498 /// // [(Some(3), None) /* first tick */, (Some(7), Some(3)) /* second tick */]
1499 /// # for w in vec![(Some(3), None), (Some(7), Some(3))] {
1500 /// # assert_eq!(stream.next().await.unwrap(), w);
1501 /// # }
1502 /// # }));
1503 /// # }
1504 /// ```
1505 pub fn defer_tick(self) -> Optional<T, Tick<L>, Bounded> {
1506 Optional::new(
1507 self.location.clone(),
1508 HydroNode::DeferTick {
1509 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1510 metadata: self.location.new_node_metadata(Self::collection_kind()),
1511 },
1512 )
1513 }
1514}
1515
1516#[cfg(test)]
1517mod tests {
1518 #[cfg(feature = "deploy")]
1519 use futures::StreamExt;
1520 #[cfg(feature = "deploy")]
1521 use hydro_deploy::Deployment;
1522 #[cfg(any(feature = "deploy", feature = "sim"))]
1523 use stageleft::q;
1524
1525 #[cfg(feature = "deploy")]
1526 use super::Optional;
1527 #[cfg(any(feature = "deploy", feature = "sim"))]
1528 use crate::compile::builder::FlowBuilder;
1529 #[cfg(any(feature = "deploy", feature = "sim"))]
1530 use crate::location::Location;
1531 #[cfg(feature = "deploy")]
1532 use crate::nondet::nondet;
1533
1534 #[cfg(feature = "deploy")]
1535 #[tokio::test]
1536 async fn optional_or_cardinality() {
1537 let mut deployment = Deployment::new();
1538
1539 let mut flow = FlowBuilder::new();
1540 let node = flow.process::<()>();
1541 let external = flow.external::<()>();
1542
1543 let node_tick = node.tick();
1544 let tick_singleton = node_tick.singleton(q!(123));
1545 let tick_optional_inhabited: Optional<_, _, _> = tick_singleton.into();
1546 let counts = tick_optional_inhabited
1547 .clone()
1548 .or(tick_optional_inhabited)
1549 .into_stream()
1550 .count()
1551 .all_ticks()
1552 .send_bincode_external(&external);
1553
1554 let nodes = flow
1555 .with_process(&node, deployment.Localhost())
1556 .with_external(&external, deployment.Localhost())
1557 .deploy(&mut deployment);
1558
1559 deployment.deploy().await.unwrap();
1560
1561 let mut external_out = nodes.connect(counts).await;
1562
1563 deployment.start().await.unwrap();
1564
1565 assert_eq!(external_out.next().await.unwrap(), 1);
1566 }
1567
1568 #[cfg(feature = "deploy")]
1569 #[tokio::test]
1570 async fn into_singleton_top_level_none_cardinality() {
1571 let mut deployment = Deployment::new();
1572
1573 let mut flow = FlowBuilder::new();
1574 let node = flow.process::<()>();
1575 let external = flow.external::<()>();
1576
1577 let node_tick = node.tick();
1578 let top_level_none = node.singleton(q!(123)).filter(q!(|_| false));
1579 let into_singleton = top_level_none.into_singleton();
1580
1581 let tick_driver = node.spin();
1582
1583 let counts = into_singleton
1584 .snapshot(&node_tick, nondet!(/** test */))
1585 .into_stream()
1586 .count()
1587 .zip(tick_driver.batch(&node_tick, nondet!(/** test */)).count())
1588 .map(q!(|(c, _)| c))
1589 .all_ticks()
1590 .send_bincode_external(&external);
1591
1592 let nodes = flow
1593 .with_process(&node, deployment.Localhost())
1594 .with_external(&external, deployment.Localhost())
1595 .deploy(&mut deployment);
1596
1597 deployment.deploy().await.unwrap();
1598
1599 let mut external_out = nodes.connect(counts).await;
1600
1601 deployment.start().await.unwrap();
1602
1603 assert_eq!(external_out.next().await.unwrap(), 1);
1604 assert_eq!(external_out.next().await.unwrap(), 1);
1605 assert_eq!(external_out.next().await.unwrap(), 1);
1606 }
1607
1608 #[cfg(feature = "deploy")]
1609 #[tokio::test]
1610 async fn into_singleton_unbounded_top_level_none_cardinality() {
1611 let mut deployment = Deployment::new();
1612
1613 let mut flow = FlowBuilder::new();
1614 let node = flow.process::<()>();
1615 let external = flow.external::<()>();
1616
1617 let node_tick = node.tick();
1618 let top_level_none = node_tick.singleton(q!(123)).latest().filter(q!(|_| false));
1619 let into_singleton = top_level_none.into_singleton();
1620
1621 let tick_driver = node.spin();
1622
1623 let counts = into_singleton
1624 .snapshot(&node_tick, nondet!(/** test */))
1625 .into_stream()
1626 .count()
1627 .zip(tick_driver.batch(&node_tick, nondet!(/** test */)).count())
1628 .map(q!(|(c, _)| c))
1629 .all_ticks()
1630 .send_bincode_external(&external);
1631
1632 let nodes = flow
1633 .with_process(&node, deployment.Localhost())
1634 .with_external(&external, deployment.Localhost())
1635 .deploy(&mut deployment);
1636
1637 deployment.deploy().await.unwrap();
1638
1639 let mut external_out = nodes.connect(counts).await;
1640
1641 deployment.start().await.unwrap();
1642
1643 assert_eq!(external_out.next().await.unwrap(), 1);
1644 assert_eq!(external_out.next().await.unwrap(), 1);
1645 assert_eq!(external_out.next().await.unwrap(), 1);
1646 }
1647
1648 #[cfg(feature = "sim")]
1649 #[test]
1650 fn top_level_optional_some_into_stream_no_replay() {
1651 let mut flow = FlowBuilder::new();
1652 let node = flow.process::<()>();
1653
1654 let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1655 let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1656 let filtered_some = folded.filter(q!(|_| true));
1657
1658 let out_recv = filtered_some.into_stream().sim_output();
1659
1660 flow.sim().exhaustive(async || {
1661 out_recv.assert_yields_only([10]).await;
1662 });
1663 }
1664
1665 #[cfg(feature = "sim")]
1666 #[test]
1667 fn top_level_optional_none_into_stream_no_replay() {
1668 let mut flow = FlowBuilder::new();
1669 let node = flow.process::<()>();
1670
1671 let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
1672 let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));
1673 let filtered_none = folded.filter(q!(|_| false));
1674
1675 let out_recv = filtered_none.into_stream().sim_output();
1676
1677 flow.sim().exhaustive(async || {
1678 out_recv.assert_yields_only([] as [i32; 0]).await;
1679 });
1680 }
1681}