hydro_lang/live_collections/stream/networking.rs
1//! Networking APIs for [`Stream`].
2
3use std::marker::PhantomData;
4
5use serde::Serialize;
6use serde::de::DeserializeOwned;
7use stageleft::{q, quote_type};
8use syn::parse_quote;
9
10use super::{ExactlyOnce, MinOrder, Ordering, Stream, TotalOrder};
11use crate::compile::ir::{DebugInstantiate, HydroIrOpMetadata, HydroNode, HydroRoot};
12use crate::live_collections::boundedness::{Boundedness, Unbounded};
13use crate::live_collections::keyed_singleton::KeyedSingleton;
14use crate::live_collections::keyed_stream::KeyedStream;
15use crate::live_collections::sliced::sliced;
16use crate::live_collections::stream::Retries;
17#[cfg(feature = "sim")]
18use crate::location::LocationKey;
19use crate::location::cluster::ClusterIds;
20#[cfg(stageleft_runtime)]
21use crate::location::dynamic::DynLocation;
22use crate::location::external_process::ExternalBincodeStream;
23use crate::location::{Cluster, External, Location, MemberId, MembershipEvent, NoTick, Process};
24use crate::networking::{NetworkFor, TCP};
25use crate::nondet::NonDet;
26#[cfg(feature = "sim")]
27use crate::sim::SimReceiver;
28use crate::staging_util::get_this_crate;
29
30// same as the one in `hydro_std`, but internal use only
31fn track_membership<'a, C, L: Location<'a> + NoTick>(
32 membership: KeyedStream<MemberId<C>, MembershipEvent, L, Unbounded>,
33) -> KeyedSingleton<MemberId<C>, bool, L, Unbounded> {
34 membership.fold(
35 q!(|| false),
36 q!(|present, event| {
37 match event {
38 MembershipEvent::Joined => *present = true,
39 MembershipEvent::Left => *present = false,
40 }
41 }),
42 )
43}
44
45fn serialize_bincode_with_type(is_demux: bool, t_type: &syn::Type) -> syn::Expr {
46 let root = get_this_crate();
47
48 if is_demux {
49 parse_quote! {
50 #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<(#root::__staged::location::MemberId<_>, #t_type), _>(
51 |(id, data)| {
52 (id.into_tagless(), #root::runtime_support::bincode::serialize(&data).unwrap().into())
53 }
54 )
55 }
56 } else {
57 parse_quote! {
58 #root::runtime_support::stageleft::runtime_support::fn1_type_hint::<#t_type, _>(
59 |data| {
60 #root::runtime_support::bincode::serialize(&data).unwrap().into()
61 }
62 )
63 }
64 }
65}
66
67pub(crate) fn serialize_bincode<T: Serialize>(is_demux: bool) -> syn::Expr {
68 serialize_bincode_with_type(is_demux, "e_type::<T>())
69}
70
71fn deserialize_bincode_with_type(tagged: Option<&syn::Type>, t_type: &syn::Type) -> syn::Expr {
72 let root = get_this_crate();
73 if let Some(c_type) = tagged {
74 parse_quote! {
75 |res| {
76 let (id, b) = res.unwrap();
77 (#root::__staged::location::MemberId::<#c_type>::from_tagless(id as #root::__staged::location::TaglessMemberId), #root::runtime_support::bincode::deserialize::<#t_type>(&b).unwrap())
78 }
79 }
80 } else {
81 parse_quote! {
82 |res| {
83 #root::runtime_support::bincode::deserialize::<#t_type>(&res.unwrap()).unwrap()
84 }
85 }
86 }
87}
88
89pub(crate) fn deserialize_bincode<T: DeserializeOwned>(tagged: Option<&syn::Type>) -> syn::Expr {
90 deserialize_bincode_with_type(tagged, "e_type::<T>())
91}
92
93impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Process<'a, L>, B, O, R> {
94 #[deprecated = "use Stream::send(..., TCP.fail_stop().bincode()) instead"]
95 /// "Moves" elements of this stream to a new distributed location by sending them over the network,
96 /// using [`bincode`] to serialize/deserialize messages.
97 ///
98 /// The returned stream captures the elements received at the destination, where values will
99 /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
100 /// preserves ordering and retries guarantees by using a single TCP channel to send the values. The
    /// recipient is guaranteed to receive a _prefix_ of the sent messages; if the TCP connection is
102 /// dropped no further messages will be sent.
103 ///
104 /// # Example
105 /// ```rust
106 /// # #[cfg(feature = "deploy")] {
107 /// # use hydro_lang::prelude::*;
108 /// # use futures::StreamExt;
109 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
110 /// let p1 = flow.process::<()>();
111 /// let numbers: Stream<_, Process<_>, Bounded> = p1.source_iter(q!(vec![1, 2, 3]));
112 /// let p2 = flow.process::<()>();
113 /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send_bincode(&p2);
114 /// // 1, 2, 3
115 /// # on_p2.send_bincode(&p_out)
116 /// # }, |mut stream| async move {
117 /// # for w in 1..=3 {
118 /// # assert_eq!(stream.next().await, Some(w));
119 /// # }
120 /// # }));
121 /// # }
122 /// ```
123 pub fn send_bincode<L2>(
124 self,
125 other: &Process<'a, L2>,
126 ) -> Stream<T, Process<'a, L2>, Unbounded, O, R>
127 where
128 T: Serialize + DeserializeOwned,
129 {
130 self.send(other, TCP.fail_stop().bincode())
131 }
132
133 /// "Moves" elements of this stream to a new distributed location by sending them over the network,
134 /// using the configuration in `via` to set up the message transport.
135 ///
136 /// The returned stream captures the elements received at the destination, where values will
137 /// asynchronously arrive over the network. Sending from a [`Process`] to another [`Process`]
138 /// preserves ordering and retries guarantees when using a single TCP channel to send the values.
    /// The recipient is guaranteed to receive a _prefix_ of the sent messages; if the connection is
140 /// dropped no further messages will be sent.
141 ///
142 /// # Example
143 /// ```rust
144 /// # #[cfg(feature = "deploy")] {
145 /// # use hydro_lang::prelude::*;
146 /// # use futures::StreamExt;
147 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p_out| {
148 /// let p1 = flow.process::<()>();
149 /// let numbers: Stream<_, Process<_>, Bounded> = p1.source_iter(q!(vec![1, 2, 3]));
150 /// let p2 = flow.process::<()>();
151 /// let on_p2: Stream<_, Process<_>, Unbounded> = numbers.send(&p2, TCP.fail_stop().bincode());
152 /// // 1, 2, 3
153 /// # on_p2.send(&p_out, TCP.fail_stop().bincode())
154 /// # }, |mut stream| async move {
155 /// # for w in 1..=3 {
156 /// # assert_eq!(stream.next().await, Some(w));
157 /// # }
158 /// # }));
159 /// # }
160 /// ```
161 pub fn send<L2, N: NetworkFor<T>>(
162 self,
163 to: &Process<'a, L2>,
164 via: N,
165 ) -> Stream<T, Process<'a, L2>, Unbounded, <O as MinOrder<N::OrderingGuarantee>>::Min, R>
166 where
167 T: Serialize + DeserializeOwned,
168 O: MinOrder<N::OrderingGuarantee>,
169 {
170 let serialize_pipeline = Some(N::serialize_thunk(false));
171 let deserialize_pipeline = Some(N::deserialize_thunk(None));
172
173 let name = via.name();
174 if to.multiversioned() && name.is_none() {
175 panic!(
176 "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
177 );
178 }
179
180 Stream::new(
181 to.clone(),
182 HydroNode::Network {
183 name: name.map(ToOwned::to_owned),
184 networking_info: N::networking_info(),
185 serialize_fn: serialize_pipeline.map(|e| e.into()),
186 instantiate_fn: DebugInstantiate::Building,
187 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
188 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
189 metadata: to.new_node_metadata(Stream::<
190 T,
191 Process<'a, L2>,
192 Unbounded,
193 <O as MinOrder<N::OrderingGuarantee>>::Min,
194 R,
195 >::collection_kind()),
196 },
197 )
198 }
199
200 #[deprecated = "use Stream::broadcast(..., TCP.fail_stop().bincode()) instead"]
201 /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
202 /// using [`bincode`] to serialize/deserialize messages.
203 ///
204 /// Each element in the stream will be sent to **every** member of the cluster based on the latest
205 /// membership information. This is a common pattern in distributed systems for broadcasting data to
206 /// all nodes in a cluster. Unlike [`Stream::demux_bincode`], which requires `(MemberId, T)` tuples to
207 /// target specific members, `broadcast_bincode` takes a stream of **only data elements** and sends
208 /// each element to all cluster members.
209 ///
210 /// # Non-Determinism
211 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
212 /// to the current cluster members _at that point in time_. Depending on when we are notified of
213 /// membership changes, we will broadcast each element to different members.
214 ///
215 /// # Example
216 /// ```rust
217 /// # #[cfg(feature = "deploy")] {
218 /// # use hydro_lang::prelude::*;
219 /// # use futures::StreamExt;
220 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
221 /// let p1 = flow.process::<()>();
222 /// let workers: Cluster<()> = flow.cluster::<()>();
223 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
224 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast_bincode(&workers, nondet!(/** assuming stable membership */));
225 /// # on_worker.send_bincode(&p2).entries()
226 /// // if there are 4 members in the cluster, each receives one element
227 /// // - MemberId::<()>(0): [123]
228 /// // - MemberId::<()>(1): [123]
229 /// // - MemberId::<()>(2): [123]
230 /// // - MemberId::<()>(3): [123]
231 /// # }, |mut stream| async move {
232 /// # let mut results = Vec::new();
233 /// # for w in 0..4 {
234 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
235 /// # }
236 /// # results.sort();
237 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
238 /// # }));
239 /// # }
240 /// ```
241 pub fn broadcast_bincode<L2: 'a>(
242 self,
243 other: &Cluster<'a, L2>,
244 nondet_membership: NonDet,
245 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
246 where
247 T: Clone + Serialize + DeserializeOwned,
248 {
249 self.broadcast(other, TCP.fail_stop().bincode(), nondet_membership)
250 }
251
252 /// Broadcasts elements of this stream to all members of a cluster by sending them over the network,
253 /// using the configuration in `via` to set up the message transport.
254 ///
255 /// Each element in the stream will be sent to **every** member of the cluster based on the latest
256 /// membership information. This is a common pattern in distributed systems for broadcasting data to
257 /// all nodes in a cluster. Unlike [`Stream::demux`], which requires `(MemberId, T)` tuples to
258 /// target specific members, `broadcast` takes a stream of **only data elements** and sends
259 /// each element to all cluster members.
260 ///
261 /// # Non-Determinism
262 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
263 /// to the current cluster members _at that point in time_. Depending on when we are notified of
264 /// membership changes, we will broadcast each element to different members.
265 ///
266 /// # Example
267 /// ```rust
268 /// # #[cfg(feature = "deploy")] {
269 /// # use hydro_lang::prelude::*;
270 /// # use futures::StreamExt;
271 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
272 /// let p1 = flow.process::<()>();
273 /// let workers: Cluster<()> = flow.cluster::<()>();
274 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
275 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.broadcast(&workers, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
276 /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
277 /// // if there are 4 members in the cluster, each receives one element
278 /// // - MemberId::<()>(0): [123]
279 /// // - MemberId::<()>(1): [123]
280 /// // - MemberId::<()>(2): [123]
281 /// // - MemberId::<()>(3): [123]
282 /// # }, |mut stream| async move {
283 /// # let mut results = Vec::new();
284 /// # for w in 0..4 {
285 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
286 /// # }
287 /// # results.sort();
288 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
289 /// # }));
290 /// # }
291 /// ```
292 pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
293 self,
294 to: &Cluster<'a, L2>,
295 via: N,
296 nondet_membership: NonDet,
297 ) -> Stream<T, Cluster<'a, L2>, Unbounded, <O as MinOrder<N::OrderingGuarantee>>::Min, R>
298 where
299 T: Clone + Serialize + DeserializeOwned,
300 O: MinOrder<N::OrderingGuarantee>,
301 {
302 let ids = track_membership(self.location.source_cluster_members(to));
303 sliced! {
304 let members_snapshot = use(ids, nondet_membership);
305 let elements = use(self, nondet_membership);
306
307 let current_members = members_snapshot.filter(q!(|b| *b));
308 elements.repeat_with_keys(current_members)
309 }
310 .demux(to, via)
311 }
312
313 /// Broadcasts elements of this stream to all members of a cluster,
314 /// assuming membership is closed (fixed at deploy time).
315 ///
316 /// Unlike [`Stream::broadcast`], this does not require a [`NonDet`] guard.
317 /// The membership set is obtained from deploy metadata via
318 /// [`ClusterIds`], producing a
319 /// `Bounded` stream. The cross-product of data × members is fully
320 /// deterministic.
321 ///
322 /// This is only available in deployment targets with static cluster
323 /// membership (legacy Hydro Deploy and simulation). There are no late
324 /// joiners in that context, so broadcast receivers are guaranteed to
325 /// get data from the start of the stream. On dynamic targets
326 /// (e.g. ECS), use [`Stream::broadcast`] instead.
327 ///
328 /// # Example
329 /// ```rust
330 /// # #[cfg(feature = "deploy")] {
331 /// # use hydro_lang::prelude::*;
332 /// # use futures::StreamExt;
333 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
334 /// let p1 = flow.process::<()>();
335 /// let workers: Cluster<()> = flow.cluster::<()>();
336 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![123]));
337 /// let on_worker = numbers.broadcast_closed(&workers, TCP.fail_stop().bincode());
338 /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
339 /// // each of the 4 cluster members receives 123
340 /// # }, |mut stream| async move {
341 /// # let mut results = Vec::new();
342 /// # for _ in 0..4 {
343 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
344 /// # }
345 /// # results.sort();
346 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 123)", "(MemberId::<()>(1), 123)", "(MemberId::<()>(2), 123)", "(MemberId::<()>(3), 123)"]);
347 /// # }));
348 /// # }
349 /// ```
350 pub fn broadcast_closed<L2: 'a, N: NetworkFor<T>>(
351 self,
352 to: &Cluster<'a, L2>,
353 via: N,
354 ) -> Stream<T, Cluster<'a, L2>, Unbounded, <O as MinOrder<N::OrderingGuarantee>>::Min, R>
355 where
356 T: Clone + Serialize + DeserializeOwned,
357 O: MinOrder<N::OrderingGuarantee>,
358 {
359 let cluster_ids = ClusterIds {
360 key: to.key,
361 _phantom: PhantomData,
362 };
363 let member_ids = self.location.source_iter(q!(cluster_ids
364 .iter()
365 .map(|id| MemberId::from_tagless(id.clone()))));
366
367 // Late joiners will receive no data from this broadcast, which is
368 // future-monotone and eventually consistent (a safe under-approximation).
369 self.cross_product(member_ids.weaken_retries())
370 .map(q!(|(data, member_id)| (member_id, data)))
371 .into_keyed()
372 .demux(to, via)
373 }
374
375 /// Sends the elements of this stream to an external (non-Hydro) process, using [`bincode`]
376 /// serialization. The external process can receive these elements by establishing a TCP
377 /// connection and decoding using [`tokio_util::codec::LengthDelimitedCodec`].
378 ///
379 /// # Example
380 /// ```rust
381 /// # #[cfg(feature = "deploy")] {
382 /// # use hydro_lang::prelude::*;
383 /// # use futures::StreamExt;
384 /// # tokio_test::block_on(async move {
385 /// let mut flow = FlowBuilder::new();
386 /// let process = flow.process::<()>();
387 /// let numbers: Stream<_, Process<_>, Bounded> = process.source_iter(q!(vec![1, 2, 3]));
388 /// let external = flow.external::<()>();
389 /// let external_handle = numbers.send_bincode_external(&external);
390 ///
391 /// let mut deployment = hydro_deploy::Deployment::new();
392 /// let nodes = flow
393 /// .with_process(&process, deployment.Localhost())
394 /// .with_external(&external, deployment.Localhost())
395 /// .deploy(&mut deployment);
396 ///
397 /// deployment.deploy().await.unwrap();
398 /// // establish the TCP connection
399 /// let mut external_recv_stream = nodes.connect(external_handle).await;
400 /// deployment.start().await.unwrap();
401 ///
402 /// for w in 1..=3 {
403 /// assert_eq!(external_recv_stream.next().await, Some(w));
404 /// }
405 /// # });
406 /// # }
407 /// ```
408 pub fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T, O, R>
409 where
410 T: Serialize + DeserializeOwned,
411 {
412 let serialize_pipeline = Some(serialize_bincode::<T>(false));
413
414 let mut flow_state_borrow = self.location.flow_state().borrow_mut();
415
416 let external_port_id = flow_state_borrow.next_external_port();
417
418 flow_state_borrow.push_root(HydroRoot::SendExternal {
419 to_external_key: other.key,
420 to_port_id: external_port_id,
421 to_many: false,
422 unpaired: true,
423 serialize_fn: serialize_pipeline.map(|e| e.into()),
424 instantiate_fn: DebugInstantiate::Building,
425 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
426 op_metadata: HydroIrOpMetadata::new(),
427 });
428
429 ExternalBincodeStream {
430 process_key: other.key,
431 port_id: external_port_id,
432 _phantom: PhantomData,
433 }
434 }
435
436 #[cfg(feature = "sim")]
437 /// Sets up a simulation output port for this stream, allowing test code to receive elements
438 /// sent to this stream during simulation.
439 pub fn sim_output(self) -> SimReceiver<T, O, R>
440 where
441 T: Serialize + DeserializeOwned,
442 {
443 let external_location: External<'a, ()> = External {
444 key: LocationKey::FIRST,
445 flow_state: self.location.flow_state().clone(),
446 _phantom: PhantomData,
447 };
448
449 let external = self.send_bincode_external(&external_location);
450
451 SimReceiver(external.port_id, PhantomData)
452 }
453}
454
455impl<'a, T, L: Location<'a> + NoTick, B: Boundedness> Stream<T, L, B, TotalOrder, ExactlyOnce> {
456 /// Creates an external output for embedded deployment mode.
457 ///
458 /// The `name` parameter specifies the name of the field in the generated
459 /// `EmbeddedOutputs` struct that will receive elements from this stream.
460 /// The generated function will accept an `EmbeddedOutputs` struct with an
461 /// `impl FnMut(T)` field with this name.
462 pub fn embedded_output(self, name: impl Into<String>) {
463 let ident = syn::Ident::new(&name.into(), proc_macro2::Span::call_site());
464
465 self.location
466 .flow_state()
467 .borrow_mut()
468 .push_root(HydroRoot::EmbeddedOutput {
469 ident,
470 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
471 op_metadata: HydroIrOpMetadata::new(),
472 });
473 }
474}
475
476impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
477 Stream<(MemberId<L2>, T), Process<'a, L>, B, O, R>
478{
479 #[deprecated = "use Stream::demux(..., TCP.fail_stop().bincode()) instead"]
480 /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
481 /// using [`bincode`] to serialize/deserialize messages.
482 ///
483 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
484 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
485 /// this API allows precise targeting of specific cluster members rather than broadcasting to
486 /// all members.
487 ///
488 /// # Example
489 /// ```rust
490 /// # #[cfg(feature = "deploy")] {
491 /// # use hydro_lang::prelude::*;
492 /// # use futures::StreamExt;
493 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
494 /// let p1 = flow.process::<()>();
495 /// let workers: Cluster<()> = flow.cluster::<()>();
496 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
497 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
498 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
499 /// .demux_bincode(&workers);
500 /// # on_worker.send_bincode(&p2).entries()
501 /// // if there are 4 members in the cluster, each receives one element
502 /// // - MemberId::<()>(0): [0]
503 /// // - MemberId::<()>(1): [1]
504 /// // - MemberId::<()>(2): [2]
505 /// // - MemberId::<()>(3): [3]
506 /// # }, |mut stream| async move {
507 /// # let mut results = Vec::new();
508 /// # for w in 0..4 {
509 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
510 /// # }
511 /// # results.sort();
512 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
513 /// # }));
514 /// # }
515 /// ```
516 pub fn demux_bincode(
517 self,
518 other: &Cluster<'a, L2>,
519 ) -> Stream<T, Cluster<'a, L2>, Unbounded, O, R>
520 where
521 T: Serialize + DeserializeOwned,
522 {
523 self.demux(other, TCP.fail_stop().bincode())
524 }
525
526 /// Sends elements of this stream to specific members of a cluster, identified by a [`MemberId`],
527 /// using the configuration in `via` to set up the message transport.
528 ///
529 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
530 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
531 /// this API allows precise targeting of specific cluster members rather than broadcasting to
532 /// all members.
533 ///
534 /// # Example
535 /// ```rust
536 /// # #[cfg(feature = "deploy")] {
537 /// # use hydro_lang::prelude::*;
538 /// # use futures::StreamExt;
539 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
540 /// let p1 = flow.process::<()>();
541 /// let workers: Cluster<()> = flow.cluster::<()>();
542 /// let numbers: Stream<_, Process<_>, _> = p1.source_iter(q!(vec![0, 1, 2, 3]));
543 /// let on_worker: Stream<_, Cluster<_>, _> = numbers
544 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)))
545 /// .demux(&workers, TCP.fail_stop().bincode());
546 /// # on_worker.send(&p2, TCP.fail_stop().bincode()).entries()
547 /// // if there are 4 members in the cluster, each receives one element
548 /// // - MemberId::<()>(0): [0]
549 /// // - MemberId::<()>(1): [1]
550 /// // - MemberId::<()>(2): [2]
551 /// // - MemberId::<()>(3): [3]
552 /// # }, |mut stream| async move {
553 /// # let mut results = Vec::new();
554 /// # for w in 0..4 {
555 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
556 /// # }
557 /// # results.sort();
558 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 0)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 2)", "(MemberId::<()>(3), 3)"]);
559 /// # }));
560 /// # }
561 /// ```
562 pub fn demux<N: NetworkFor<T>>(
563 self,
564 to: &Cluster<'a, L2>,
565 via: N,
566 ) -> Stream<T, Cluster<'a, L2>, Unbounded, <O as MinOrder<N::OrderingGuarantee>>::Min, R>
567 where
568 T: Serialize + DeserializeOwned,
569 O: MinOrder<N::OrderingGuarantee>,
570 {
571 self.into_keyed().demux(to, via)
572 }
573}
574
575impl<'a, T, L, B: Boundedness> Stream<T, Process<'a, L>, B, TotalOrder, ExactlyOnce> {
576 #[deprecated = "use Stream::round_robin(..., TCP.fail_stop().bincode()) instead"]
577 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
578 /// [`bincode`] to serialize/deserialize messages.
579 ///
580 /// This provides load balancing by evenly distributing work across cluster members. The
581 /// distribution is deterministic based on element order - the first element goes to member 0,
582 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
583 ///
584 /// # Non-Determinism
585 /// The set of cluster members may asynchronously change over time. Each element is distributed
586 /// based on the current cluster membership _at that point in time_. Depending on when cluster
587 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
588 /// membership is stable, the order of members in the round-robin pattern may change across runs.
589 ///
590 /// # Ordering Requirements
591 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
592 /// order of messages and retries affects the round-robin pattern.
593 ///
594 /// # Example
595 /// ```rust
596 /// # #[cfg(feature = "deploy")] {
597 /// # use hydro_lang::prelude::*;
598 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
599 /// # use futures::StreamExt;
600 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
601 /// let p1 = flow.process::<()>();
602 /// let workers: Cluster<()> = flow.cluster::<()>();
603 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
604 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers, nondet!(/** assuming stable membership */));
605 /// on_worker.send_bincode(&p2)
606 /// # .first().values() // we use first to assert that each member gets one element
607 /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
608 /// // - MemberId::<()>(?): [1]
609 /// // - MemberId::<()>(?): [2]
610 /// // - MemberId::<()>(?): [3]
611 /// // - MemberId::<()>(?): [4]
612 /// # }, |mut stream| async move {
613 /// # let mut results = Vec::new();
614 /// # for w in 0..4 {
615 /// # results.push(stream.next().await.unwrap());
616 /// # }
617 /// # results.sort();
618 /// # assert_eq!(results, vec![1, 2, 3, 4]);
619 /// # }));
620 /// # }
621 /// ```
622 pub fn round_robin_bincode<L2: 'a>(
623 self,
624 other: &Cluster<'a, L2>,
625 nondet_membership: NonDet,
626 ) -> Stream<T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
627 where
628 T: Serialize + DeserializeOwned,
629 {
630 self.round_robin(other, TCP.fail_stop().bincode(), nondet_membership)
631 }
632
633 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
634 /// the configuration in `via` to set up the message transport.
635 ///
636 /// This provides load balancing by evenly distributing work across cluster members. The
637 /// distribution is deterministic based on element order - the first element goes to member 0,
638 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
639 ///
640 /// # Non-Determinism
641 /// The set of cluster members may asynchronously change over time. Each element is distributed
642 /// based on the current cluster membership _at that point in time_. Depending on when cluster
643 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
644 /// membership is stable, the order of members in the round-robin pattern may change across runs.
645 ///
646 /// # Ordering Requirements
647 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
648 /// order of messages and retries affects the round-robin pattern.
649 ///
650 /// # Example
651 /// ```rust
652 /// # #[cfg(feature = "deploy")] {
653 /// # use hydro_lang::prelude::*;
654 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce};
655 /// # use futures::StreamExt;
656 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
657 /// let p1 = flow.process::<()>();
658 /// let workers: Cluster<()> = flow.cluster::<()>();
659 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(vec![1, 2, 3, 4]));
660 /// let on_worker: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
661 /// on_worker.send(&p2, TCP.fail_stop().bincode())
662 /// # .first().values() // we use first to assert that each member gets one element
663 /// // with 4 cluster members, elements are distributed (with a non-deterministic round-robin order):
664 /// // - MemberId::<()>(?): [1]
665 /// // - MemberId::<()>(?): [2]
666 /// // - MemberId::<()>(?): [3]
667 /// // - MemberId::<()>(?): [4]
668 /// # }, |mut stream| async move {
669 /// # let mut results = Vec::new();
670 /// # for w in 0..4 {
671 /// # results.push(stream.next().await.unwrap());
672 /// # }
673 /// # results.sort();
674 /// # assert_eq!(results, vec![1, 2, 3, 4]);
675 /// # }));
676 /// # }
677 /// ```
678 pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
679 self,
680 to: &Cluster<'a, L2>,
681 via: N,
682 nondet_membership: NonDet,
683 ) -> Stream<T, Cluster<'a, L2>, Unbounded, N::OrderingGuarantee, ExactlyOnce>
684 where
685 T: Serialize + DeserializeOwned,
686 {
687 let ids = track_membership(self.location.source_cluster_members(to));
688 sliced! {
689 let members_snapshot = use(ids, nondet_membership);
690 let elements = use(self.enumerate(), nondet_membership);
691
692 let current_members = members_snapshot
693 .filter(q!(|b| *b))
694 .keys()
695 .assume_ordering::<TotalOrder>(nondet_membership)
696 .collect_vec();
697
698 elements
699 .cross_singleton(current_members)
700 .filter_map(q!(|(data, members)| {
701 if members.is_empty() {
702 None
703 } else {
704 Some((members[data.0 % members.len()].clone(), data.1))
705 }
706 }))
707 }
708 .demux(to, via)
709 }
710}
711
712impl<'a, T, L, B: Boundedness> Stream<T, Cluster<'a, L>, B, TotalOrder, ExactlyOnce> {
713 #[deprecated = "use Stream::round_robin(..., TCP.fail_stop().bincode()) instead"]
714 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
715 /// [`bincode`] to serialize/deserialize messages.
716 ///
717 /// This provides load balancing by evenly distributing work across cluster members. The
718 /// distribution is deterministic based on element order - the first element goes to member 0,
719 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
720 ///
721 /// # Non-Determinism
722 /// The set of cluster members may asynchronously change over time. Each element is distributed
723 /// based on the current cluster membership _at that point in time_. Depending on when cluster
724 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
725 /// membership is stable, the order of members in the round-robin pattern may change across runs.
726 ///
727 /// # Ordering Requirements
728 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
729 /// order of messages and retries affects the round-robin pattern.
730 ///
731 /// # Example
732 /// ```rust
733 /// # #[cfg(feature = "deploy")] {
734 /// # use hydro_lang::prelude::*;
735 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
736 /// # use hydro_lang::location::MemberId;
737 /// # use futures::StreamExt;
738 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
739 /// let p1 = flow.process::<()>();
740 /// let workers1: Cluster<()> = flow.cluster::<()>();
741 /// let workers2: Cluster<()> = flow.cluster::<()>();
742 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
743 /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin_bincode(&workers1, nondet!(/** assuming stable membership */));
744 /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin_bincode(&workers2, nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
745 /// on_worker2.send_bincode(&p2)
746 /// # .entries()
747 /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
748 /// # }, |mut stream| async move {
749 /// # let mut results = Vec::new();
750 /// # let mut locations = std::collections::HashSet::new();
751 /// # for w in 0..=16 {
752 /// # let (location, v) = stream.next().await.unwrap();
753 /// # locations.insert(location);
754 /// # results.push(v);
755 /// # }
756 /// # results.sort();
757 /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
758 /// # assert_eq!(locations.len(), 16);
759 /// # }));
760 /// # }
761 /// ```
762 pub fn round_robin_bincode<L2: 'a>(
763 self,
764 other: &Cluster<'a, L2>,
765 nondet_membership: NonDet,
766 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, TotalOrder, ExactlyOnce>
767 where
768 T: Serialize + DeserializeOwned,
769 {
770 self.round_robin(other, TCP.fail_stop().bincode(), nondet_membership)
771 }
772
773 /// Distributes elements of this stream to cluster members in a round-robin fashion, using
774 /// the configuration in `via` to set up the message transport.
775 ///
776 /// This provides load balancing by evenly distributing work across cluster members. The
777 /// distribution is deterministic based on element order - the first element goes to member 0,
778 /// the second to member 1, and so on, wrapping around when reaching the end of the member list.
779 ///
780 /// # Non-Determinism
781 /// The set of cluster members may asynchronously change over time. Each element is distributed
782 /// based on the current cluster membership _at that point in time_. Depending on when cluster
783 /// members join and leave, the round-robin pattern will change. Furthermore, even when the
784 /// membership is stable, the order of members in the round-robin pattern may change across runs.
785 ///
786 /// # Ordering Requirements
787 /// This method is only available on streams with [`TotalOrder`] and [`ExactlyOnce`], since the
788 /// order of messages and retries affects the round-robin pattern.
789 ///
790 /// # Example
791 /// ```rust
792 /// # #[cfg(feature = "deploy")] {
793 /// # use hydro_lang::prelude::*;
794 /// # use hydro_lang::live_collections::stream::{TotalOrder, ExactlyOnce, NoOrder};
795 /// # use hydro_lang::location::MemberId;
796 /// # use futures::StreamExt;
797 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
798 /// let p1 = flow.process::<()>();
799 /// let workers1: Cluster<()> = flow.cluster::<()>();
800 /// let workers2: Cluster<()> = flow.cluster::<()>();
801 /// let numbers: Stream<_, Process<_>, _, TotalOrder, ExactlyOnce> = p1.source_iter(q!(0..=16));
802 /// let on_worker1: Stream<_, Cluster<_>, _> = numbers.round_robin(&workers1, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
803 /// let on_worker2: Stream<_, Cluster<_>, _> = on_worker1.round_robin(&workers2, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */)).entries().assume_ordering(nondet!(/** assuming stable membership */));
804 /// on_worker2.send(&p2, TCP.fail_stop().bincode())
805 /// # .entries()
806 /// # .map(q!(|(w2, (w1, v))| ((w2, w1), v)))
807 /// # }, |mut stream| async move {
808 /// # let mut results = Vec::new();
809 /// # let mut locations = std::collections::HashSet::new();
810 /// # for w in 0..=16 {
811 /// # let (location, v) = stream.next().await.unwrap();
812 /// # locations.insert(location);
813 /// # results.push(v);
814 /// # }
815 /// # results.sort();
816 /// # assert_eq!(results, (0..=16).collect::<Vec<_>>());
817 /// # assert_eq!(locations.len(), 16);
818 /// # }));
819 /// # }
820 /// ```
821 pub fn round_robin<L2: 'a, N: NetworkFor<T>>(
822 self,
823 to: &Cluster<'a, L2>,
824 via: N,
825 nondet_membership: NonDet,
826 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, N::OrderingGuarantee, ExactlyOnce>
827 where
828 T: Serialize + DeserializeOwned,
829 {
830 let ids = track_membership(self.location.source_cluster_members(to));
831 sliced! {
832 let members_snapshot = use(ids, nondet_membership);
833 let elements = use(self.enumerate(), nondet_membership);
834
835 let current_members = members_snapshot
836 .filter(q!(|b| *b))
837 .keys()
838 .assume_ordering::<TotalOrder>(nondet_membership)
839 .collect_vec();
840
841 elements
842 .cross_singleton(current_members)
843 .filter_map(q!(|(data, members)| {
844 if members.is_empty() {
845 None
846 } else {
847 Some((members[data.0 % members.len()].clone(), data.1))
848 }
849 }))
850 }
851 .demux(to, via)
852 }
853}
854
855impl<'a, T, L, B: Boundedness, O: Ordering, R: Retries> Stream<T, Cluster<'a, L>, B, O, R> {
856 #[deprecated = "use Stream::send(..., TCP.fail_stop().bincode()) instead"]
857 /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
858 /// using [`bincode`] to serialize/deserialize messages.
859 ///
860 /// Each cluster member sends its local stream elements, and they are collected at the destination
861 /// as a [`KeyedStream`] where keys identify the source cluster member.
862 ///
863 /// # Example
864 /// ```rust
865 /// # #[cfg(feature = "deploy")] {
866 /// # use hydro_lang::prelude::*;
867 /// # use futures::StreamExt;
868 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
869 /// let workers: Cluster<()> = flow.cluster::<()>();
870 /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
871 /// let all_received = numbers.send_bincode(&process); // KeyedStream<MemberId<()>, i32, ...>
872 /// # all_received.entries()
873 /// # }, |mut stream| async move {
874 /// // if there are 4 members in the cluster, we should receive 4 elements
875 /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
876 /// # let mut results = Vec::new();
877 /// # for w in 0..4 {
878 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
879 /// # }
880 /// # results.sort();
881 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
882 /// # }));
883 /// # }
884 /// ```
885 ///
886 /// If you don't need to know the source for each element, you can use `.values()`
887 /// to get just the data:
888 /// ```rust
889 /// # #[cfg(feature = "deploy")] {
890 /// # use hydro_lang::prelude::*;
891 /// # use hydro_lang::live_collections::stream::NoOrder;
892 /// # use futures::StreamExt;
893 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
894 /// # let workers: Cluster<()> = flow.cluster::<()>();
895 /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
896 /// let values: Stream<i32, _, _, NoOrder> = numbers.send_bincode(&process).values();
897 /// # values
898 /// # }, |mut stream| async move {
899 /// # let mut results = Vec::new();
900 /// # for w in 0..4 {
901 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
902 /// # }
903 /// # results.sort();
904 /// // if there are 4 members in the cluster, we should receive 4 elements
905 /// // 1, 1, 1, 1
906 /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
907 /// # }));
908 /// # }
909 /// ```
910 pub fn send_bincode<L2>(
911 self,
912 other: &Process<'a, L2>,
913 ) -> KeyedStream<MemberId<L>, T, Process<'a, L2>, Unbounded, O, R>
914 where
915 T: Serialize + DeserializeOwned,
916 {
917 self.send(other, TCP.fail_stop().bincode())
918 }
919
920 /// "Moves" elements of this stream from a cluster to a process by sending them over the network,
921 /// using the configuration in `via` to set up the message transport.
922 ///
923 /// Each cluster member sends its local stream elements, and they are collected at the destination
924 /// as a [`KeyedStream`] where keys identify the source cluster member.
925 ///
926 /// # Example
927 /// ```rust
928 /// # #[cfg(feature = "deploy")] {
929 /// # use hydro_lang::prelude::*;
930 /// # use futures::StreamExt;
931 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
932 /// let workers: Cluster<()> = flow.cluster::<()>();
933 /// let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
934 /// let all_received = numbers.send(&process, TCP.fail_stop().bincode()); // KeyedStream<MemberId<()>, i32, ...>
935 /// # all_received.entries()
936 /// # }, |mut stream| async move {
937 /// // if there are 4 members in the cluster, we should receive 4 elements
938 /// // { MemberId::<()>(0): [1], MemberId::<()>(1): [1], MemberId::<()>(2): [1], MemberId::<()>(3): [1] }
939 /// # let mut results = Vec::new();
940 /// # for w in 0..4 {
941 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
942 /// # }
943 /// # results.sort();
944 /// # assert_eq!(results, vec!["(MemberId::<()>(0), 1)", "(MemberId::<()>(1), 1)", "(MemberId::<()>(2), 1)", "(MemberId::<()>(3), 1)"]);
945 /// # }));
946 /// # }
947 /// ```
948 ///
949 /// If you don't need to know the source for each element, you can use `.values()`
950 /// to get just the data:
951 /// ```rust
952 /// # #[cfg(feature = "deploy")] {
953 /// # use hydro_lang::prelude::*;
954 /// # use hydro_lang::live_collections::stream::NoOrder;
955 /// # use futures::StreamExt;
956 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, process| {
957 /// # let workers: Cluster<()> = flow.cluster::<()>();
958 /// # let numbers: Stream<_, Cluster<_>, _> = workers.source_iter(q!(vec![1]));
959 /// let values: Stream<i32, _, _, NoOrder> =
960 /// numbers.send(&process, TCP.fail_stop().bincode()).values();
961 /// # values
962 /// # }, |mut stream| async move {
963 /// # let mut results = Vec::new();
964 /// # for w in 0..4 {
965 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
966 /// # }
967 /// # results.sort();
968 /// // if there are 4 members in the cluster, we should receive 4 elements
969 /// // 1, 1, 1, 1
970 /// # assert_eq!(results, vec!["1", "1", "1", "1"]);
971 /// # }));
972 /// # }
973 /// ```
974 #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
975 pub fn send<L2, N: NetworkFor<T>>(
976 self,
977 to: &Process<'a, L2>,
978 via: N,
979 ) -> KeyedStream<
980 MemberId<L>,
981 T,
982 Process<'a, L2>,
983 Unbounded,
984 <O as MinOrder<N::OrderingGuarantee>>::Min,
985 R,
986 >
987 where
988 T: Serialize + DeserializeOwned,
989 O: MinOrder<N::OrderingGuarantee>,
990 {
991 let serialize_pipeline = Some(N::serialize_thunk(false));
992
993 let deserialize_pipeline = Some(N::deserialize_thunk(Some("e_type::<L>())));
994
995 let name = via.name();
996 if to.multiversioned() && name.is_none() {
997 panic!(
998 "Cannot send to a multiversioned location without a channel name. Please provide a name for the network."
999 );
1000 }
1001
1002 let raw_stream: Stream<
1003 (MemberId<L>, T),
1004 Process<'a, L2>,
1005 Unbounded,
1006 <O as MinOrder<N::OrderingGuarantee>>::Min,
1007 R,
1008 > = Stream::new(
1009 to.clone(),
1010 HydroNode::Network {
1011 name: name.map(ToOwned::to_owned),
1012 networking_info: N::networking_info(),
1013 serialize_fn: serialize_pipeline.map(|e| e.into()),
1014 instantiate_fn: DebugInstantiate::Building,
1015 deserialize_fn: deserialize_pipeline.map(|e| e.into()),
1016 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1017 metadata: to.new_node_metadata(Stream::<
1018 (MemberId<L>, T),
1019 Process<'a, L2>,
1020 Unbounded,
1021 <O as MinOrder<N::OrderingGuarantee>>::Min,
1022 R,
1023 >::collection_kind()),
1024 },
1025 );
1026
1027 raw_stream.into_keyed()
1028 }
1029
1030 #[deprecated = "use Stream::broadcast(..., TCP.fail_stop().bincode()) instead"]
1031 /// Broadcasts elements of this stream at each source member to all members of a destination
1032 /// cluster, using [`bincode`] to serialize/deserialize messages.
1033 ///
1034 /// Each source member sends each of its stream elements to **every** member of the cluster
1035 /// based on its latest membership information. Unlike [`Stream::demux_bincode`], which requires
1036 /// `(MemberId, T)` tuples to target specific members, `broadcast_bincode` takes a stream of
1037 /// **only data elements** and sends each element to all cluster members.
1038 ///
1039 /// # Non-Determinism
1040 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
1041 /// to the current cluster members known _at that point in time_ at the source member. Depending
1042 /// on when each source member is notified of membership changes, it will broadcast each element
1043 /// to different members.
1044 ///
1045 /// # Example
1046 /// ```rust
1047 /// # #[cfg(feature = "deploy")] {
1048 /// # use hydro_lang::prelude::*;
1049 /// # use hydro_lang::location::MemberId;
1050 /// # use futures::StreamExt;
1051 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1052 /// # type Source = ();
1053 /// # type Destination = ();
1054 /// let source: Cluster<Source> = flow.cluster::<Source>();
1055 /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
1056 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1057 /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast_bincode(&destination, nondet!(/** assuming stable membership */));
1058 /// # on_destination.entries().send_bincode(&p2).entries()
1059 /// // if there are 4 members in the destination, each receives one element from each source member
1060 /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
1061 /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
1062 /// // - ...
1063 /// # }, |mut stream| async move {
1064 /// # let mut results = Vec::new();
1065 /// # for w in 0..16 {
1066 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
1067 /// # }
1068 /// # results.sort();
1069 /// # assert_eq!(results, vec![
1070 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
1071 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
1072 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
1073 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
1074 /// # ]);
1075 /// # }));
1076 /// # }
1077 /// ```
1078 pub fn broadcast_bincode<L2: 'a>(
1079 self,
1080 other: &Cluster<'a, L2>,
1081 nondet_membership: NonDet,
1082 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1083 where
1084 T: Clone + Serialize + DeserializeOwned,
1085 {
1086 self.broadcast(other, TCP.fail_stop().bincode(), nondet_membership)
1087 }
1088
1089 /// Broadcasts elements of this stream at each source member to all members of a destination
1090 /// cluster, using the configuration in `via` to set up the message transport.
1091 ///
1092 /// Each source member sends each of its stream elements to **every** member of the cluster
1093 /// based on its latest membership information. Unlike [`Stream::demux`], which requires
1094 /// `(MemberId, T)` tuples to target specific members, `broadcast` takes a stream of
1095 /// **only data elements** and sends each element to all cluster members.
1096 ///
1097 /// # Non-Determinism
1098 /// The set of cluster members may asynchronously change over time. Each element is only broadcast
1099 /// to the current cluster members known _at that point in time_ at the source member. Depending
1100 /// on when each source member is notified of membership changes, it will broadcast each element
1101 /// to different members.
1102 ///
1103 /// # Example
1104 /// ```rust
1105 /// # #[cfg(feature = "deploy")] {
1106 /// # use hydro_lang::prelude::*;
1107 /// # use hydro_lang::location::MemberId;
1108 /// # use futures::StreamExt;
1109 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1110 /// # type Source = ();
1111 /// # type Destination = ();
1112 /// let source: Cluster<Source> = flow.cluster::<Source>();
1113 /// let numbers: Stream<_, Cluster<Source>, _> = source.source_iter(q!(vec![123]));
1114 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1115 /// let on_destination: KeyedStream<MemberId<Source>, _, Cluster<Destination>, _> = numbers.broadcast(&destination, TCP.fail_stop().bincode(), nondet!(/** assuming stable membership */));
1116 /// # on_destination.entries().send(&p2, TCP.fail_stop().bincode()).entries()
1117 /// // if there are 4 members in the destination, each receives one element from each source member
1118 /// // - Destination(0): { Source(0): [123], Source(1): [123], ... }
1119 /// // - Destination(1): { Source(0): [123], Source(1): [123], ... }
1120 /// // - ...
1121 /// # }, |mut stream| async move {
1122 /// # let mut results = Vec::new();
1123 /// # for w in 0..16 {
1124 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
1125 /// # }
1126 /// # results.sort();
1127 /// # assert_eq!(results, vec![
1128 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 123))", "(MemberId::<()>(0), (MemberId::<()>(1), 123))", "(MemberId::<()>(0), (MemberId::<()>(2), 123))", "(MemberId::<()>(0), (MemberId::<()>(3), 123))",
1129 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 123))", "(MemberId::<()>(1), (MemberId::<()>(1), 123))", "(MemberId::<()>(1), (MemberId::<()>(2), 123))", "(MemberId::<()>(1), (MemberId::<()>(3), 123))",
1130 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 123))", "(MemberId::<()>(2), (MemberId::<()>(1), 123))", "(MemberId::<()>(2), (MemberId::<()>(2), 123))", "(MemberId::<()>(2), (MemberId::<()>(3), 123))",
1131 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 123))", "(MemberId::<()>(3), (MemberId::<()>(1), 123))", "(MemberId::<()>(3), (MemberId::<()>(2), 123))", "(MemberId::<()>(3), (MemberId::<()>(3), 123))"
1132 /// # ]);
1133 /// # }));
1134 /// # }
1135 /// ```
1136 #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
1137 pub fn broadcast<L2: 'a, N: NetworkFor<T>>(
1138 self,
1139 to: &Cluster<'a, L2>,
1140 via: N,
1141 nondet_membership: NonDet,
1142 ) -> KeyedStream<
1143 MemberId<L>,
1144 T,
1145 Cluster<'a, L2>,
1146 Unbounded,
1147 <O as MinOrder<N::OrderingGuarantee>>::Min,
1148 R,
1149 >
1150 where
1151 T: Clone + Serialize + DeserializeOwned,
1152 O: MinOrder<N::OrderingGuarantee>,
1153 {
1154 let ids = track_membership(self.location.source_cluster_members(to));
1155 sliced! {
1156 let members_snapshot = use(ids, nondet_membership);
1157 let elements = use(self, nondet_membership);
1158
1159 let current_members = members_snapshot.filter(q!(|b| *b));
1160 elements.repeat_with_keys(current_members)
1161 }
1162 .demux(to, via)
1163 }
1164
#[cfg(feature = "sim")]
/// Routes the elements of this cluster stream to the external location `other`, encoding
/// them with bincode, and returns a handle naming the external key and the newly
/// allocated port.
fn send_bincode_external<L2>(self, other: &External<L2>) -> ExternalBincodeStream<T, O, R>
where
    T: Serialize + DeserializeOwned,
{
    let serializer = serialize_bincode::<T>(false);

    let mut flow_state = self.location.flow_state().borrow_mut();
    let port_id = flow_state.next_external_port();

    // Register a root that drains this stream's IR node into the external port.
    flow_state.push_root(HydroRoot::SendExternal {
        to_external_key: other.key,
        to_port_id: port_id,
        to_many: false,
        unpaired: true,
        serialize_fn: Some(serializer.into()),
        instantiate_fn: DebugInstantiate::Building,
        input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
        op_metadata: HydroIrOpMetadata::new(),
    });

    ExternalBincodeStream {
        process_key: other.key,
        port_id,
        _phantom: PhantomData,
    }
}
1194
#[cfg(feature = "sim")]
/// Exposes this cluster stream as a simulation output port so that test code can
/// observe `(member_id, T)` pairs while the simulation runs.
pub fn sim_cluster_output(self) -> crate::sim::SimClusterReceiver<T, O, R>
where
    T: Serialize + DeserializeOwned,
{
    let flow_state = self.location.flow_state().clone();

    // The output is attached to an `External` keyed by `LocationKey::FIRST`.
    let sim_external: External<'a, ()> = External {
        key: LocationKey::FIRST,
        flow_state,
        _phantom: PhantomData,
    };

    let port_id = self.send_bincode_external(&sim_external).port_id;
    crate::sim::SimClusterReceiver(port_id, PhantomData)
}
1212}
1213
1214impl<'a, T, L, L2, B: Boundedness, O: Ordering, R: Retries>
1215 Stream<(MemberId<L2>, T), Cluster<'a, L>, B, O, R>
1216{
1217 #[deprecated = "use Stream::demux(..., TCP.fail_stop().bincode()) instead"]
1218 /// Sends elements of this stream at each source member to specific members of a destination
1219 /// cluster, identified by a [`MemberId`], using [`bincode`] to serialize/deserialize messages.
1220 ///
1221 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1222 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast_bincode`],
1223 /// this API allows precise targeting of specific cluster members rather than broadcasting to
1224 /// all members.
1225 ///
1226 /// Each cluster member sends its local stream elements, and they are collected at each
1227 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1228 ///
1229 /// # Example
1230 /// ```rust
1231 /// # #[cfg(feature = "deploy")] {
1232 /// # use hydro_lang::prelude::*;
1233 /// # use futures::StreamExt;
1234 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1235 /// # type Source = ();
1236 /// # type Destination = ();
1237 /// let source: Cluster<Source> = flow.cluster::<Source>();
1238 /// let to_send: Stream<_, Cluster<_>, _> = source
1239 /// .source_iter(q!(vec![0, 1, 2, 3]))
1240 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1241 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1242 /// let all_received = to_send.demux_bincode(&destination); // KeyedStream<MemberId<Source>, i32, ...>
1243 /// # all_received.entries().send_bincode(&p2).entries()
1244 /// # }, |mut stream| async move {
1245 /// // if there are 4 members in the destination cluster, each receives one message from each source member
1246 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1247 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1248 /// // - ...
1249 /// # let mut results = Vec::new();
1250 /// # for w in 0..16 {
1251 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
1252 /// # }
1253 /// # results.sort();
1254 /// # assert_eq!(results, vec![
1255 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1256 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1257 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1258 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1259 /// # ]);
1260 /// # }));
1261 /// # }
1262 /// ```
1263 pub fn demux_bincode(
1264 self,
1265 other: &Cluster<'a, L2>,
1266 ) -> KeyedStream<MemberId<L>, T, Cluster<'a, L2>, Unbounded, O, R>
1267 where
1268 T: Serialize + DeserializeOwned,
1269 {
1270 self.demux(other, TCP.fail_stop().bincode())
1271 }
1272
1273 /// Sends elements of this stream at each source member to specific members of a destination
1274 /// cluster, identified by a [`MemberId`], using the configuration in `via` to set up the
1275 /// message transport.
1276 ///
1277 /// Each element in the stream must be a tuple `(MemberId<L2>, T)` where the first element
1278 /// specifies which cluster member should receive the data. Unlike [`Stream::broadcast`],
1279 /// this API allows precise targeting of specific cluster members rather than broadcasting to
1280 /// all members.
1281 ///
1282 /// Each cluster member sends its local stream elements, and they are collected at each
1283 /// destination member as a [`KeyedStream`] where keys identify the source cluster member.
1284 ///
1285 /// # Example
1286 /// ```rust
1287 /// # #[cfg(feature = "deploy")] {
1288 /// # use hydro_lang::prelude::*;
1289 /// # use futures::StreamExt;
1290 /// # tokio_test::block_on(hydro_lang::test_util::multi_location_test(|flow, p2| {
1291 /// # type Source = ();
1292 /// # type Destination = ();
1293 /// let source: Cluster<Source> = flow.cluster::<Source>();
1294 /// let to_send: Stream<_, Cluster<_>, _> = source
1295 /// .source_iter(q!(vec![0, 1, 2, 3]))
1296 /// .map(q!(|x| (hydro_lang::location::MemberId::from_raw_id(x), x)));
1297 /// let destination: Cluster<Destination> = flow.cluster::<Destination>();
1298 /// let all_received = to_send.demux(&destination, TCP.fail_stop().bincode()); // KeyedStream<MemberId<Source>, i32, ...>
1299 /// # all_received.entries().send(&p2, TCP.fail_stop().bincode()).entries()
1300 /// # }, |mut stream| async move {
1301 /// // if there are 4 members in the destination cluster, each receives one message from each source member
1302 /// // - Destination(0): { Source(0): [0], Source(1): [0], ... }
1303 /// // - Destination(1): { Source(0): [1], Source(1): [1], ... }
1304 /// // - ...
1305 /// # let mut results = Vec::new();
1306 /// # for w in 0..16 {
1307 /// # results.push(format!("{:?}", stream.next().await.unwrap()));
1308 /// # }
1309 /// # results.sort();
1310 /// # assert_eq!(results, vec![
1311 /// # "(MemberId::<()>(0), (MemberId::<()>(0), 0))", "(MemberId::<()>(0), (MemberId::<()>(1), 0))", "(MemberId::<()>(0), (MemberId::<()>(2), 0))", "(MemberId::<()>(0), (MemberId::<()>(3), 0))",
1312 /// # "(MemberId::<()>(1), (MemberId::<()>(0), 1))", "(MemberId::<()>(1), (MemberId::<()>(1), 1))", "(MemberId::<()>(1), (MemberId::<()>(2), 1))", "(MemberId::<()>(1), (MemberId::<()>(3), 1))",
1313 /// # "(MemberId::<()>(2), (MemberId::<()>(0), 2))", "(MemberId::<()>(2), (MemberId::<()>(1), 2))", "(MemberId::<()>(2), (MemberId::<()>(2), 2))", "(MemberId::<()>(2), (MemberId::<()>(3), 2))",
1314 /// # "(MemberId::<()>(3), (MemberId::<()>(0), 3))", "(MemberId::<()>(3), (MemberId::<()>(1), 3))", "(MemberId::<()>(3), (MemberId::<()>(2), 3))", "(MemberId::<()>(3), (MemberId::<()>(3), 3))"
1315 /// # ]);
1316 /// # }));
1317 /// # }
1318 /// ```
1319 #[expect(clippy::type_complexity, reason = "MinOrder projection in return type")]
1320 pub fn demux<N: NetworkFor<T>>(
1321 self,
1322 to: &Cluster<'a, L2>,
1323 via: N,
1324 ) -> KeyedStream<
1325 MemberId<L>,
1326 T,
1327 Cluster<'a, L2>,
1328 Unbounded,
1329 <O as MinOrder<N::OrderingGuarantee>>::Min,
1330 R,
1331 >
1332 where
1333 T: Serialize + DeserializeOwned,
1334 O: MinOrder<N::OrderingGuarantee>,
1335 {
1336 self.into_keyed().demux(to, via)
1337 }
1338}
1339
1340#[cfg(test)]
1341mod tests {
1342 #[cfg(feature = "sim")]
1343 use stageleft::q;
1344
1345 #[cfg(feature = "sim")]
1346 use crate::live_collections::sliced::sliced;
1347 #[cfg(feature = "sim")]
1348 use crate::location::{Location, MemberId};
1349 #[cfg(feature = "sim")]
1350 use crate::networking::TCP;
1351 #[cfg(feature = "sim")]
1352 use crate::nondet::nondet;
1353 #[cfg(feature = "sim")]
1354 use crate::prelude::FlowBuilder;
1355
#[cfg(feature = "sim")]
#[test]
fn sim_send_bincode_o2o() {
    // `TCP` is already imported at module level (under the same `sim` feature gate),
    // so no function-local `use` is needed here.
    let mut flow = FlowBuilder::new();
    let node = flow.process::<()>();
    let node2 = flow.process::<()>();

    let (in_send, input) = node.sim_input();

    // Count the elements arriving in each tick on the receiving process.
    let out_recv = input
        .send(&node2, TCP.fail_stop().bincode())
        .batch(&node2.tick(), nondet!(/** test */))
        .count()
        .all_ticks()
        .sim_output();

    let instances = flow.sim().exhaustive(async || {
        in_send.send(());
        in_send.send(());
        in_send.send(());

        // However the three messages are batched, the per-tick counts must add up to 3.
        let received = out_recv.collect::<Vec<_>>().await;
        assert_eq!(received.into_iter().sum::<usize>(), 3);
    });

    assert_eq!(instances, 4); // 2^{3 - 1}
}
1385
#[cfg(feature = "sim")]
#[test]
fn sim_send_bincode_m2o() {
    let mut builder = FlowBuilder::new();
    let senders = builder.cluster::<()>();
    let receiver = builder.process::<()>();

    // Every cluster member emits the single value `1`.
    let ones = senders.source_iter(q!(vec![1]));

    let tagged = ones.send(&receiver, TCP.fail_stop().bincode()).entries();
    let out_recv = tagged
        .batch(&receiver.tick(), nondet!(/** test */))
        .all_ticks()
        .sim_output();

    let instances = builder
        .sim()
        .with_cluster_size(&senders, 4)
        .exhaustive(async || {
            // One `(sender, 1)` entry per cluster member, in any order.
            let expected = (0..4)
                .map(|raw_id| (MemberId::from_raw_id(raw_id), 1))
                .collect::<Vec<_>>();
            out_recv.assert_yields_only_unordered(expected).await
        });

    assert_eq!(instances, 75); // ∑ (k=1 to 4) S(4,k) × k! = 75
}
1418
#[cfg(feature = "sim")]
#[test]
fn sim_send_bincode_multiple_m2o() {
    let mut builder = FlowBuilder::new();
    let senders_a = builder.cluster::<()>();
    let senders_b = builder.cluster::<()>();
    let receiver = builder.process::<()>();

    // Two independent cluster-to-process channels, each with its own output port.
    let ones = senders_a.source_iter(q!(vec![1]));
    let out_recv_1 = ones
        .send(&receiver, TCP.fail_stop().bincode())
        .entries()
        .sim_output();

    let twos = senders_b.source_iter(q!(vec![2]));
    let out_recv_2 = twos
        .send(&receiver, TCP.fail_stop().bincode())
        .entries()
        .sim_output();

    let instances = builder
        .sim()
        .with_cluster_size(&senders_a, 3)
        .with_cluster_size(&senders_b, 4)
        .exhaustive(async || {
            // One entry per member of the first (3-member) cluster.
            let expected_ones = (0..3)
                .map(|raw_id| (MemberId::from_raw_id(raw_id), 1))
                .collect::<Vec<_>>();
            out_recv_1.assert_yields_only_unordered(expected_ones).await;

            // One entry per member of the second (4-member) cluster.
            let expected_twos = (0..4)
                .map(|raw_id| (MemberId::from_raw_id(raw_id), 2))
                .collect::<Vec<_>>();
            out_recv_2.assert_yields_only_unordered(expected_twos).await;
        });

    assert_eq!(instances, 1);
}
1464
#[cfg(feature = "sim")]
#[test]
fn sim_send_bincode_o2m() {
    let mut builder = FlowBuilder::new();
    let workers = builder.cluster::<()>();
    let hub = builder.process::<()>();

    // Two messages, each addressed to a specific cluster member.
    let addressed = hub.source_iter(q!(vec![
        (MemberId::from_raw_id(0), 123),
        (MemberId::from_raw_id(1), 456),
    ]));

    // process -> addressed member (+1 applied there) -> back to the process
    let incremented = addressed
        .demux(&workers, TCP.fail_stop().bincode())
        .map(q!(|x| x + 1));
    let out_recv = incremented
        .send(&hub, TCP.fail_stop().bincode())
        .entries()
        .sim_output();

    builder
        .sim()
        .with_cluster_size(&workers, 4)
        .exhaustive(async || {
            // Only the two addressed members reply, each with its incremented value.
            let expected = vec![
                (MemberId::from_raw_id(0), 124),
                (MemberId::from_raw_id(1), 457),
            ];
            out_recv.assert_yields_only_unordered(expected).await
        });
}
1495
#[cfg(feature = "sim")]
#[test]
fn sim_broadcast_bincode_o2m() {
    let mut builder = FlowBuilder::new();
    let members = builder.cluster::<()>();
    let hub = builder.process::<()>();

    let payloads = hub.source_iter(q!(vec![123, 456]));

    // process -> every known member (+1 applied there) -> back to the process
    let out_recv = payloads
        .broadcast(&members, TCP.fail_stop().bincode(), nondet!(/** test */))
        .map(q!(|x| x + 1))
        .send(&hub, TCP.fail_stop().bincode())
        .entries()
        .sim_output();

    // Coverage flags accumulated across all explored executions.
    let mut member_0_got_both = false;
    let mut member_1_got_both = false;
    let mut member_0_got_only_second = false;

    builder
        .sim()
        .with_cluster_size(&members, 2)
        .exhaustive(async || {
            let all_out = out_recv.collect_sorted::<Vec<_>>().await;

            let zero_has_first = all_out.contains(&(MemberId::from_raw_id(0), 124));
            let zero_has_second = all_out.contains(&(MemberId::from_raw_id(0), 457));
            let one_has_first = all_out.contains(&(MemberId::from_raw_id(1), 124));
            let one_has_second = all_out.contains(&(MemberId::from_raw_id(1), 457));

            // check that order is preserved: seeing the first message implies seeing the second
            if zero_has_first {
                assert!(zero_has_second);
                member_0_got_both = true;
            }

            if one_has_first {
                assert!(one_has_second);
                member_1_got_both = true;
            }

            if zero_has_second && !zero_has_first {
                member_0_got_only_second = true;
            }
        });

    // in at least one execution each, the cluster member received both messages
    assert!(member_0_got_both && member_1_got_both);

    // in at least one execution, the cluster member received 457 but not 124, this tests
    // that the simulator properly explores dynamic membership additions (a member that joins after 123 is broadcast)
    assert!(member_0_got_only_second);
}
1545
1546 #[cfg(feature = "sim")]
1547 #[test]
1548 fn sim_send_bincode_m2m() {
1549 let mut flow = FlowBuilder::new();
1550 let cluster = flow.cluster::<()>();
1551 let node = flow.process::<()>();
1552
1553 let input = node.source_iter(q!(vec![
1554 (MemberId::from_raw_id(0), 123),
1555 (MemberId::from_raw_id(1), 456),
1556 ]));
1557
1558 let out_recv = input
1559 .demux(&cluster, TCP.fail_stop().bincode())
1560 .map(q!(|x| x + 1))
1561 .flat_map_ordered(q!(|x| vec![
1562 (MemberId::from_raw_id(0), x),
1563 (MemberId::from_raw_id(1), x),
1564 ]))
1565 .demux(&cluster, TCP.fail_stop().bincode())
1566 .entries()
1567 .send(&node, TCP.fail_stop().bincode())
1568 .entries()
1569 .sim_output();
1570
1571 flow.sim()
1572 .with_cluster_size(&cluster, 4)
1573 .exhaustive(async || {
1574 out_recv
1575 .assert_yields_only_unordered(vec![
1576 (MemberId::from_raw_id(0), (MemberId::from_raw_id(0), 124)),
1577 (MemberId::from_raw_id(0), (MemberId::from_raw_id(1), 457)),
1578 (MemberId::from_raw_id(1), (MemberId::from_raw_id(0), 124)),
1579 (MemberId::from_raw_id(1), (MemberId::from_raw_id(1), 457)),
1580 ])
1581 .await
1582 });
1583 }
1584
1585 #[cfg(feature = "sim")]
1586 #[test]
1587 fn sim_lossy_delayed_forever_o2o() {
1588 use std::collections::HashSet;
1589
1590 use crate::properties::manual_proof;
1591
1592 let mut flow = FlowBuilder::new();
1593 let node = flow.process::<()>();
1594 let node2 = flow.process::<()>();
1595
1596 let received = node
1597 .source_iter(q!(0..3_u32))
1598 .send(&node2, TCP.lossy_delayed_forever().bincode())
1599 .fold(
1600 q!(|| std::collections::HashSet::<u32>::new()),
1601 q!(
1602 |set, v| {
1603 set.insert(v);
1604 },
1605 commutative = manual_proof!(/** set insert is commutative */)
1606 ),
1607 );
1608
1609 let out_recv = sliced! {
1610 let snapshot = use(received, nondet!(/** test */));
1611 snapshot.into_stream()
1612 }
1613 .sim_output();
1614
1615 let mut saw_non_contiguous = false;
1616
1617 flow.sim().test_safety_only().exhaustive(async || {
1618 let snapshots = out_recv.collect::<Vec<HashSet<u32>>>().await;
1619
1620 // Check each individual snapshot for a non-contiguous subset.
1621 for set in &snapshots {
1622 #[expect(clippy::disallowed_methods, reason = "min / max are deterministic")]
1623 if set.len() >= 2 && set.len() < 3 {
1624 let min = *set.iter().min().unwrap();
1625 let max = *set.iter().max().unwrap();
1626 if set.len() < (max - min + 1) as usize {
1627 saw_non_contiguous = true;
1628 }
1629 }
1630 }
1631 });
1632
1633 assert!(
1634 saw_non_contiguous,
1635 "Expected at least one execution with a non-contiguous subset of inputs"
1636 );
1637 }
1638
1639 #[cfg(feature = "sim")]
1640 #[test]
1641 fn sim_broadcast_closed_o2m() {
1642 let mut flow = FlowBuilder::new();
1643 let cluster = flow.cluster::<()>();
1644 let node = flow.process::<()>();
1645
1646 let input = node.source_iter(q!(vec![123, 456]));
1647
1648 let out_recv = input
1649 .broadcast_closed(&cluster, TCP.fail_stop().bincode())
1650 .send(&node, TCP.fail_stop().bincode())
1651 .entries()
1652 .sim_output();
1653
1654 flow.sim()
1655 .with_cluster_size(&cluster, 2)
1656 .exhaustive(async || {
1657 out_recv
1658 .assert_yields_only_unordered(vec![
1659 (MemberId::from_raw_id(0), 123),
1660 (MemberId::from_raw_id(0), 456),
1661 (MemberId::from_raw_id(1), 123),
1662 (MemberId::from_raw_id(1), 456),
1663 ])
1664 .await
1665 });
1666 }
1667}