hydro_lang/live_collections/keyed_stream/mod.rs
1//! Definitions for the [`KeyedStream`] live collection.
2
3use std::collections::HashMap;
4use std::hash::Hash;
5use std::marker::PhantomData;
6
7use stageleft::{IntoQuotedMut, QuotedWithContext, q};
8
9use super::boundedness::{Bounded, Boundedness, Unbounded};
10use super::keyed_singleton::KeyedSingleton;
11use super::optional::Optional;
12use super::stream::{ExactlyOnce, MinOrder, MinRetries, NoOrder, Stream, TotalOrder};
13use crate::compile::ir::HydroNode;
14use crate::forward_handle::ForwardRef;
15#[cfg(stageleft_runtime)]
16use crate::forward_handle::{CycleCollection, ReceiverComplete};
17use crate::live_collections::stream::{Ordering, Retries};
18use crate::location::dynamic::LocationId;
19use crate::location::tick::NoAtomic;
20use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
21use crate::manual_expr::ManualExpr;
22use crate::nondet::{NonDet, nondet};
23
24pub mod networking;
25
26/// Streaming elements of type `V` grouped by a key of type `K`.
27///
28/// Keyed Streams capture streaming elements of type `V` grouped by a key of type `K`, where the
29/// order of keys is non-deterministic but the order *within* each group may be deterministic.
30///
31/// Although keyed streams are conceptually grouped by keys, values are not immediately grouped
32/// into buckets when constructing a keyed stream. Instead, keyed streams defer grouping until an
33/// operator such as [`KeyedStream::fold`] is called, which requires `K: Hash + Eq`.
34///
35/// Type Parameters:
36/// - `K`: the type of the key for each group
37/// - `V`: the type of the elements inside each group
38/// - `Loc`: the [`Location`] where the keyed stream is materialized
39/// - `Bound`: tracks whether the entries are [`Bounded`] (local and finite) or [`Unbounded`] (asynchronous and possibly infinite)
40/// - `Order`: tracks whether the elements within each group have deterministic order
41/// ([`TotalOrder`]) or not ([`NoOrder`])
42/// - `Retries`: tracks whether the elements within each group have deterministic cardinality
43/// ([`ExactlyOnce`]) or may have non-deterministic retries ([`crate::live_collections::stream::AtLeastOnce`])
44pub struct KeyedStream<
45 K,
46 V,
47 Loc,
48 Bound: Boundedness,
49 Order: Ordering = TotalOrder,
50 Retry: Retries = ExactlyOnce,
51> {
52 pub(crate) underlying: Stream<(K, V), Loc, Bound, NoOrder, Retry>,
53 pub(crate) _phantom_order: PhantomData<Order>,
54}
55
56impl<'a, K, V, L, B: Boundedness, R: Retries> From<KeyedStream<K, V, L, B, TotalOrder, R>>
57 for KeyedStream<K, V, L, B, NoOrder, R>
58where
59 L: Location<'a>,
60{
61 fn from(stream: KeyedStream<K, V, L, B, TotalOrder, R>) -> KeyedStream<K, V, L, B, NoOrder, R> {
62 KeyedStream {
63 underlying: stream.underlying,
64 _phantom_order: Default::default(),
65 }
66 }
67}
68
69impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: Boundedness, Order: Ordering, R: Retries>
70 Clone for KeyedStream<K, V, Loc, Bound, Order, R>
71{
72 fn clone(&self) -> Self {
73 KeyedStream {
74 underlying: self.underlying.clone(),
75 _phantom_order: PhantomData,
76 }
77 }
78}
79
80impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
81 for KeyedStream<K, V, L, B, O, R>
82where
83 L: Location<'a> + NoTick,
84{
85 type Location = L;
86
87 fn create_source(ident: syn::Ident, location: L) -> Self {
88 Stream::create_source(ident, location).into_keyed()
89 }
90}
91
92impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
93 for KeyedStream<K, V, L, B, O, R>
94where
95 L: Location<'a> + NoTick,
96{
97 fn complete(self, ident: syn::Ident, expected_location: LocationId) {
98 self.underlying.complete(ident, expected_location);
99 }
100}
101
102impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
103 KeyedStream<K, V, L, B, O, R>
104{
105 /// Explicitly "casts" the keyed stream to a type with a different ordering
106 /// guarantee for each group. Useful in unsafe code where the ordering cannot be proven
107 /// by the type-system.
108 ///
109 /// # Non-Determinism
110 /// This function is used as an escape hatch, and any mistakes in the
111 /// provided ordering guarantee will propagate into the guarantees
112 /// for the rest of the program.
113 pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O2, R> {
114 KeyedStream {
115 underlying: self.underlying,
116 _phantom_order: PhantomData,
117 }
118 }
119
120 /// Explicitly "casts" the keyed stream to a type with a different retries
121 /// guarantee for each group. Useful in unsafe code where the lack of retries cannot
122 /// be proven by the type-system.
123 ///
124 /// # Non-Determinism
125 /// This function is used as an escape hatch, and any mistakes in the
126 /// provided retries guarantee will propagate into the guarantees
127 /// for the rest of the program.
128 pub fn assume_retries<R2: Retries>(self, nondet: NonDet) -> KeyedStream<K, V, L, B, O, R2> {
129 KeyedStream {
130 underlying: self.underlying.assume_retries::<R2>(nondet),
131 _phantom_order: PhantomData,
132 }
133 }
134
135 /// Flattens the keyed stream into an unordered stream of key-value pairs.
136 ///
137 /// # Example
138 /// ```rust
139 /// # use hydro_lang::prelude::*;
140 /// # use futures::StreamExt;
141 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
142 /// process
143 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
144 /// .into_keyed()
145 /// .entries()
146 /// # }, |mut stream| async move {
147 /// // (1, 2), (1, 3), (2, 4) in any order
148 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
149 /// # assert_eq!(stream.next().await.unwrap(), w);
150 /// # }
151 /// # }));
152 /// ```
153 pub fn entries(self) -> Stream<(K, V), L, B, NoOrder, R> {
154 self.underlying
155 }
156
157 /// Flattens the keyed stream into an unordered stream of only the values.
158 ///
159 /// # Example
160 /// ```rust
161 /// # use hydro_lang::prelude::*;
162 /// # use futures::StreamExt;
163 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
164 /// process
165 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
166 /// .into_keyed()
167 /// .values()
168 /// # }, |mut stream| async move {
169 /// // 2, 3, 4 in any order
170 /// # for w in vec![2, 3, 4] {
171 /// # assert_eq!(stream.next().await.unwrap(), w);
172 /// # }
173 /// # }));
174 /// ```
175 pub fn values(self) -> Stream<V, L, B, NoOrder, R> {
176 self.underlying.map(q!(|(_, v)| v))
177 }
178
179 /// Transforms each value by invoking `f` on each element, with keys staying the same
180 /// after transformation. If you need access to the key, see [`KeyedStream::map_with_key`].
181 ///
182 /// If you do not want to modify the stream and instead only want to view
183 /// each item use [`KeyedStream::inspect`] instead.
184 ///
185 /// # Example
186 /// ```rust
187 /// # use hydro_lang::prelude::*;
188 /// # use futures::StreamExt;
189 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
190 /// process
191 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
192 /// .into_keyed()
193 /// .map(q!(|v| v + 1))
194 /// # .entries()
195 /// # }, |mut stream| async move {
196 /// // { 1: [3, 4], 2: [5] }
197 /// # for w in vec![(1, 3), (1, 4), (2, 5)] {
198 /// # assert_eq!(stream.next().await.unwrap(), w);
199 /// # }
200 /// # }));
201 /// ```
202 pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, U, L, B, O, R>
203 where
204 F: Fn(V) -> U + 'a,
205 {
206 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
207 KeyedStream {
208 underlying: self.underlying.map(q!({
209 let orig = f;
210 move |(k, v)| (k, orig(v))
211 })),
212 _phantom_order: Default::default(),
213 }
214 }
215
216 /// Transforms each value by invoking `f` on each key-value pair. The resulting values are **not**
217 /// re-grouped even they are tuples; instead they will be grouped under the original key.
218 ///
219 /// If you do not want to modify the stream and instead only want to view
220 /// each item use [`KeyedStream::inspect_with_key`] instead.
221 ///
222 /// # Example
223 /// ```rust
224 /// # use hydro_lang::prelude::*;
225 /// # use futures::StreamExt;
226 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
227 /// process
228 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
229 /// .into_keyed()
230 /// .map_with_key(q!(|(k, v)| k + v))
231 /// # .entries()
232 /// # }, |mut stream| async move {
233 /// // { 1: [3, 4], 2: [6] }
234 /// # for w in vec![(1, 3), (1, 4), (2, 6)] {
235 /// # assert_eq!(stream.next().await.unwrap(), w);
236 /// # }
237 /// # }));
238 /// ```
239 pub fn map_with_key<U, F>(
240 self,
241 f: impl IntoQuotedMut<'a, F, L> + Copy,
242 ) -> KeyedStream<K, U, L, B, O, R>
243 where
244 F: Fn((K, V)) -> U + 'a,
245 K: Clone,
246 {
247 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
248 KeyedStream {
249 underlying: self.underlying.map(q!({
250 let orig = f;
251 move |(k, v)| {
252 let out = orig((k.clone(), v));
253 (k, out)
254 }
255 })),
256 _phantom_order: Default::default(),
257 }
258 }
259
260 /// Creates a stream containing only the elements of each group stream that satisfy a predicate
261 /// `f`, preserving the order of the elements within the group.
262 ///
263 /// The closure `f` receives a reference `&V` rather than an owned value `v` because filtering does
264 /// not modify or take ownership of the values. If you need to modify the values while filtering
265 /// use [`KeyedStream::filter_map`] instead.
266 ///
267 /// # Example
268 /// ```rust
269 /// # use hydro_lang::prelude::*;
270 /// # use futures::StreamExt;
271 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
272 /// process
273 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
274 /// .into_keyed()
275 /// .filter(q!(|&x| x > 2))
276 /// # .entries()
277 /// # }, |mut stream| async move {
278 /// // { 1: [3], 2: [4] }
279 /// # for w in vec![(1, 3), (2, 4)] {
280 /// # assert_eq!(stream.next().await.unwrap(), w);
281 /// # }
282 /// # }));
283 /// ```
284 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
285 where
286 F: Fn(&V) -> bool + 'a,
287 {
288 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
289 KeyedStream {
290 underlying: self.underlying.filter(q!({
291 let orig = f;
292 move |(_k, v)| orig(v)
293 })),
294 _phantom_order: Default::default(),
295 }
296 }
297
298 /// Creates a stream containing only the elements of each group stream that satisfy a predicate
299 /// `f` (which receives the key-value tuple), preserving the order of the elements within the group.
300 ///
301 /// The closure `f` receives a reference `&(K, V)` rather than an owned value `(K, V)` because filtering does
302 /// not modify or take ownership of the values. If you need to modify the values while filtering
303 /// use [`KeyedStream::filter_map_with_key`] instead.
304 ///
305 /// # Example
306 /// ```rust
307 /// # use hydro_lang::prelude::*;
308 /// # use futures::StreamExt;
309 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
310 /// process
311 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
312 /// .into_keyed()
313 /// .filter_with_key(q!(|&(k, v)| v - k == 2))
314 /// # .entries()
315 /// # }, |mut stream| async move {
316 /// // { 1: [3], 2: [4] }
317 /// # for w in vec![(1, 3), (2, 4)] {
318 /// # assert_eq!(stream.next().await.unwrap(), w);
319 /// # }
320 /// # }));
321 /// ```
322 pub fn filter_with_key<F>(
323 self,
324 f: impl IntoQuotedMut<'a, F, L> + Copy,
325 ) -> KeyedStream<K, V, L, B, O, R>
326 where
327 F: Fn(&(K, V)) -> bool + 'a,
328 {
329 KeyedStream {
330 underlying: self.underlying.filter(f),
331 _phantom_order: Default::default(),
332 }
333 }
334
335 /// An operator that both filters and maps each value, with keys staying the same.
336 /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
337 /// If you need access to the key, see [`KeyedStream::filter_map_with_key`].
338 ///
339 /// # Example
340 /// ```rust
341 /// # use hydro_lang::prelude::*;
342 /// # use futures::StreamExt;
343 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
344 /// process
345 /// .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "4")]))
346 /// .into_keyed()
347 /// .filter_map(q!(|s| s.parse::<usize>().ok()))
348 /// # .entries()
349 /// # }, |mut stream| async move {
350 /// // { 1: [2], 2: [4] }
351 /// # for w in vec![(1, 2), (2, 4)] {
352 /// # assert_eq!(stream.next().await.unwrap(), w);
353 /// # }
354 /// # }));
355 /// ```
356 pub fn filter_map<U, F>(
357 self,
358 f: impl IntoQuotedMut<'a, F, L> + Copy,
359 ) -> KeyedStream<K, U, L, B, O, R>
360 where
361 F: Fn(V) -> Option<U> + 'a,
362 {
363 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
364 KeyedStream {
365 underlying: self.underlying.filter_map(q!({
366 let orig = f;
367 move |(k, v)| orig(v).map(|o| (k, o))
368 })),
369 _phantom_order: Default::default(),
370 }
371 }
372
373 /// An operator that both filters and maps each key-value pair. The resulting values are **not**
374 /// re-grouped even they are tuples; instead they will be grouped under the original key.
375 /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
376 ///
377 /// # Example
378 /// ```rust
379 /// # use hydro_lang::prelude::*;
380 /// # use futures::StreamExt;
381 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
382 /// process
383 /// .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "2")]))
384 /// .into_keyed()
385 /// .filter_map_with_key(q!(|(k, s)| s.parse::<usize>().ok().filter(|v| v == &k)))
386 /// # .entries()
387 /// # }, |mut stream| async move {
388 /// // { 2: [2] }
389 /// # for w in vec![(2, 2)] {
390 /// # assert_eq!(stream.next().await.unwrap(), w);
391 /// # }
392 /// # }));
393 /// ```
394 pub fn filter_map_with_key<U, F>(
395 self,
396 f: impl IntoQuotedMut<'a, F, L> + Copy,
397 ) -> KeyedStream<K, U, L, B, O, R>
398 where
399 F: Fn((K, V)) -> Option<U> + 'a,
400 K: Clone,
401 {
402 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
403 KeyedStream {
404 underlying: self.underlying.filter_map(q!({
405 let orig = f;
406 move |(k, v)| {
407 let out = orig((k.clone(), v));
408 out.map(|o| (k, o))
409 }
410 })),
411 _phantom_order: Default::default(),
412 }
413 }
414
415 /// An operator which allows you to "inspect" each element of a stream without
416 /// modifying it. The closure `f` is called on a reference to each value. This is
417 /// mainly useful for debugging, and should not be used to generate side-effects.
418 ///
419 /// # Example
420 /// ```rust
421 /// # use hydro_lang::prelude::*;
422 /// # use futures::StreamExt;
423 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
424 /// process
425 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
426 /// .into_keyed()
427 /// .inspect(q!(|v| println!("{}", v)))
428 /// # .entries()
429 /// # }, |mut stream| async move {
430 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
431 /// # assert_eq!(stream.next().await.unwrap(), w);
432 /// # }
433 /// # }));
434 /// ```
435 pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
436 where
437 F: Fn(&V) + 'a,
438 {
439 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
440 KeyedStream {
441 underlying: self.underlying.inspect(q!({
442 let orig = f;
443 move |(_k, v)| orig(v)
444 })),
445 _phantom_order: Default::default(),
446 }
447 }
448
449 /// An operator which allows you to "inspect" each element of a stream without
450 /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
451 /// mainly useful for debugging, and should not be used to generate side-effects.
452 ///
453 /// # Example
454 /// ```rust
455 /// # use hydro_lang::prelude::*;
456 /// # use futures::StreamExt;
457 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
458 /// process
459 /// .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
460 /// .into_keyed()
461 /// .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
462 /// # .entries()
463 /// # }, |mut stream| async move {
464 /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
465 /// # assert_eq!(stream.next().await.unwrap(), w);
466 /// # }
467 /// # }));
468 /// ```
469 pub fn inspect_with_key<F>(
470 self,
471 f: impl IntoQuotedMut<'a, F, L>,
472 ) -> KeyedStream<K, V, L, B, O, R>
473 where
474 F: Fn(&(K, V)) + 'a,
475 {
476 KeyedStream {
477 underlying: self.underlying.inspect(f),
478 _phantom_order: Default::default(),
479 }
480 }
481
482 /// An operator which allows you to "name" a `HydroNode`.
483 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
484 pub fn ir_node_named(self, name: &str) -> KeyedStream<K, V, L, B, O, R> {
485 {
486 let mut node = self.underlying.ir_node.borrow_mut();
487 let metadata = node.metadata_mut();
488 metadata.tag = Some(name.to_string());
489 }
490 self
491 }
492}
493
494impl<'a, K, V, L: Location<'a> + NoTick + NoAtomic, O: Ordering, R: Retries>
495 KeyedStream<K, V, L, Unbounded, O, R>
496{
497 /// Produces a new keyed stream that "merges" the inputs by interleaving the elements
498 /// of any overlapping groups. The result has [`NoOrder`] on each group because the
499 /// order of interleaving is not guaranteed. If the keys across both inputs do not overlap,
500 /// the ordering will be deterministic and you can safely use [`Self::assume_ordering`].
501 ///
502 /// Currently, both input streams must be [`Unbounded`].
503 ///
504 /// # Example
505 /// ```rust
506 /// # use hydro_lang::prelude::*;
507 /// # use futures::StreamExt;
508 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
509 /// let numbers1 = process.source_iter(q!(vec![(1, 2), (3, 4)])).into_keyed();
510 /// let numbers2 = process.source_iter(q!(vec![(1, 3), (3, 5)])).into_keyed();
511 /// numbers1.interleave(numbers2)
512 /// # .entries()
513 /// # }, |mut stream| async move {
514 /// // { 1: [2, 3], 3: [4, 5] } with each group in unknown order
515 /// # for w in vec![(1, 2), (3, 4), (1, 3), (3, 5)] {
516 /// # assert_eq!(stream.next().await.unwrap(), w);
517 /// # }
518 /// # }));
519 /// ```
520 pub fn interleave<O2: Ordering, R2: Retries>(
521 self,
522 other: KeyedStream<K, V, L, Unbounded, O2, R2>,
523 ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
524 where
525 R: MinRetries<R2>,
526 {
527 self.entries().interleave(other.entries()).into_keyed()
528 }
529}
530
/// The output of a Hydro generator created with [`KeyedStream::generator`], which can yield
/// elements and control the processing of future elements.
///
/// Derives the common comparison/formatting traits (bounded on `T`) so generator outputs can
/// be inspected, compared, and copied by callers and tests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Generate<T> {
    /// Emit the provided element, and keep processing future inputs.
    Yield(T),
    /// Emit the provided element as the _final_ element, do not process future inputs.
    Return(T),
    /// Do not emit anything, but continue processing future inputs.
    Continue,
    /// Do not emit anything, and do not process further inputs.
    Break,
}
543
544impl<'a, K, V, L, B: Boundedness> KeyedStream<K, V, L, B, TotalOrder, ExactlyOnce>
545where
546 K: Eq + Hash,
547 L: Location<'a>,
548{
549 /// A special case of [`Stream::scan`] for keyed streams. For each key group the values are transformed via the `f` combinator.
550 ///
551 /// Unlike [`Stream::fold_keyed`] which only returns the final accumulated value, `scan` produces a new stream
552 /// containing all intermediate accumulated values paired with the key. The scan operation can also terminate
553 /// early by returning `None`.
554 ///
555 /// The function takes a mutable reference to the accumulator and the current element, and returns
556 /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
557 /// If the function returns `None`, the stream is terminated and no more elements are processed.
558 ///
559 /// # Example
560 /// ```rust
561 /// # use hydro_lang::prelude::*;
562 /// # use futures::StreamExt;
563 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
564 /// process
565 /// .source_iter(q!(vec![(0, 1), (0, 3), (1, 3), (1, 4)]))
566 /// .into_keyed()
567 /// .scan(
568 /// q!(|| 0),
569 /// q!(|acc, x| {
570 /// *acc += x;
571 /// if *acc % 2 == 0 { None } else { Some(*acc) }
572 /// }),
573 /// )
574 /// # .entries()
575 /// # }, |mut stream| async move {
576 /// // Output: { 0: [1], 1: [3, 7] }
577 /// # for w in vec![(0, 1), (1, 3), (1, 7)] {
578 /// # assert_eq!(stream.next().await.unwrap(), w);
579 /// # }
580 /// # }));
581 /// ```
582 pub fn scan<A, U, I, F>(
583 self,
584 init: impl IntoQuotedMut<'a, I, L> + Copy,
585 f: impl IntoQuotedMut<'a, F, L> + Copy,
586 ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
587 where
588 K: Clone,
589 I: Fn() -> A + 'a,
590 F: Fn(&mut A, V) -> Option<U> + 'a,
591 {
592 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
593 self.generator(
594 init,
595 q!({
596 let orig = f;
597 move |state, v| {
598 if let Some(out) = orig(state, v) {
599 Generate::Yield(out)
600 } else {
601 Generate::Break
602 }
603 }
604 }),
605 )
606 }
607
608 /// Iteratively processes the elements in each group using a state machine that can yield
609 /// elements as it processes its inputs. This is designed to mirror the unstable generator
610 /// syntax in Rust, without requiring special syntax.
611 ///
612 /// Like [`KeyedStream::scan`], this function takes in an initializer that emits the initial
613 /// state for each group. The second argument defines the processing logic, taking in a
614 /// mutable reference to the group's state and the value to be processed. It emits a
615 /// [`Generate`] value, whose variants define what is emitted and whether further inputs
616 /// should be processed.
617 ///
618 /// # Example
619 /// ```rust
620 /// # use hydro_lang::prelude::*;
621 /// # use futures::StreamExt;
622 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
623 /// process
624 /// .source_iter(q!(vec![(0, 1), (0, 3), (0, 100), (0, 10), (1, 3), (1, 4), (1, 3)]))
625 /// .into_keyed()
626 /// .generator(
627 /// q!(|| 0),
628 /// q!(|acc, x| {
629 /// *acc += x;
630 /// if *acc > 100 {
631 /// hydro_lang::live_collections::keyed_stream::Generate::Return(
632 /// "done!".to_string()
633 /// )
634 /// } else if *acc % 2 == 0 {
635 /// hydro_lang::live_collections::keyed_stream::Generate::Yield(
636 /// "even".to_string()
637 /// )
638 /// } else {
639 /// hydro_lang::live_collections::keyed_stream::Generate::Continue
640 /// }
641 /// }),
642 /// )
643 /// # .entries()
644 /// # }, |mut stream| async move {
645 /// // Output: { 0: ["even", "done!"], 1: ["even"] }
646 /// # for w in vec![(0, "even".to_string()), (0, "done!".to_string()), (1, "even".to_string())] {
647 /// # assert_eq!(stream.next().await.unwrap(), w);
648 /// # }
649 /// # }));
650 /// ```
651 pub fn generator<A, U, I, F>(
652 self,
653 init: impl IntoQuotedMut<'a, I, L> + Copy,
654 f: impl IntoQuotedMut<'a, F, L> + Copy,
655 ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
656 where
657 K: Clone,
658 I: Fn() -> A + 'a,
659 F: Fn(&mut A, V) -> Generate<U> + 'a,
660 {
661 let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
662 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
663 let underlying_scanned = self
664 .underlying
665 .assume_ordering(nondet!(
666 /** we do not rely on the order of keys */
667 ))
668 .scan(
669 q!(|| HashMap::new()),
670 q!(move |acc, (k, v)| {
671 let existing_state = acc.entry(k.clone()).or_insert_with(|| Some(init()));
672 if let Some(existing_state_value) = existing_state {
673 match f(existing_state_value, v) {
674 Generate::Yield(out) => Some(Some((k, out))),
675 Generate::Return(out) => {
676 let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
677 Some(Some((k, out)))
678 }
679 Generate::Break => {
680 let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
681 Some(None)
682 }
683 Generate::Continue => Some(None),
684 }
685 } else {
686 Some(None)
687 }
688 }),
689 )
690 .flatten_ordered();
691
692 KeyedStream {
693 underlying: underlying_scanned.into(),
694 _phantom_order: Default::default(),
695 }
696 }
697
698 /// A variant of [`Stream::fold`], intended for keyed streams. The aggregation is executed
699 /// in-order across the values in each group. But the aggregation function returns a boolean,
700 /// which when true indicates that the aggregated result is complete and can be released to
701 /// downstream computation. Unlike [`Stream::fold_keyed`], this means that even if the input
702 /// stream is [`super::boundedness::Unbounded`], the outputs of the fold can be processed like
703 /// normal stream elements.
704 ///
705 /// # Example
706 /// ```rust
707 /// # use hydro_lang::prelude::*;
708 /// # use futures::StreamExt;
709 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
710 /// process
711 /// .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
712 /// .into_keyed()
713 /// .fold_early_stop(
714 /// q!(|| 0),
715 /// q!(|acc, x| {
716 /// *acc += x;
717 /// x % 2 == 0
718 /// }),
719 /// )
720 /// # .entries()
721 /// # }, |mut stream| async move {
722 /// // Output: { 0: 2, 1: 9 }
723 /// # for w in vec![(0, 2), (1, 9)] {
724 /// # assert_eq!(stream.next().await.unwrap(), w);
725 /// # }
726 /// # }));
727 /// ```
728 pub fn fold_early_stop<A, I, F>(
729 self,
730 init: impl IntoQuotedMut<'a, I, L> + Copy,
731 f: impl IntoQuotedMut<'a, F, L> + Copy,
732 ) -> KeyedSingleton<K, A, L, B::WhenValueBounded>
733 where
734 K: Clone,
735 I: Fn() -> A + 'a,
736 F: Fn(&mut A, V) -> bool + 'a,
737 {
738 let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
739 let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
740 let out_without_bound_cast = self
741 .generator(
742 q!(move || Some(init())),
743 q!(move |key_state, v| {
744 if let Some(key_state_value) = key_state.as_mut() {
745 if f(key_state_value, v) {
746 Generate::Return(key_state.take().unwrap())
747 } else {
748 Generate::Continue
749 }
750 } else {
751 unreachable!()
752 }
753 }),
754 )
755 .underlying;
756
757 KeyedSingleton {
758 underlying: out_without_bound_cast,
759 }
760 }
761
762 /// Gets the first element inside each group of values as a [`KeyedSingleton`] that preserves
763 /// the original group keys. Requires the input stream to have [`TotalOrder`] guarantees,
764 /// otherwise the first element would be non-deterministic.
765 ///
766 /// # Example
767 /// ```rust
768 /// # use hydro_lang::prelude::*;
769 /// # use futures::StreamExt;
770 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
771 /// process
772 /// .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
773 /// .into_keyed()
774 /// .first()
775 /// # .entries()
776 /// # }, |mut stream| async move {
777 /// // Output: { 0: 2, 1: 3 }
778 /// # for w in vec![(0, 2), (1, 3)] {
779 /// # assert_eq!(stream.next().await.unwrap(), w);
780 /// # }
781 /// # }));
782 /// ```
783 pub fn first(self) -> KeyedSingleton<K, V, L, B::WhenValueBounded>
784 where
785 K: Clone,
786 {
787 self.fold_early_stop(
788 q!(|| None),
789 q!(|acc, v| {
790 *acc = Some(v);
791 true
792 }),
793 )
794 .map(q!(|v| v.unwrap()))
795 }
796
797 /// Like [`Stream::fold`], aggregates the values in each group via the `comb` closure.
798 ///
799 /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
800 /// to depend on the order of elements in the group.
801 ///
802 /// If the input and output value types are the same and do not require initialization then use
803 /// [`KeyedStream::reduce`].
804 ///
805 /// # Example
806 /// ```rust
807 /// # use hydro_lang::prelude::*;
808 /// # use futures::StreamExt;
809 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
810 /// let tick = process.tick();
811 /// let numbers = process
812 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
813 /// .into_keyed();
814 /// let batch = numbers.batch(&tick, nondet!(/** test */));
815 /// batch
816 /// .fold(q!(|| 0), q!(|acc, x| *acc += x))
817 /// .entries()
818 /// .all_ticks()
819 /// # }, |mut stream| async move {
820 /// // (1, 5), (2, 7)
821 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
822 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
823 /// # }));
824 /// ```
825 pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
826 self,
827 init: impl IntoQuotedMut<'a, I, L>,
828 comb: impl IntoQuotedMut<'a, F, L>,
829 ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
830 let init = init.splice_fn0_ctx(&self.underlying.location).into();
831 let comb = comb
832 .splice_fn2_borrow_mut_ctx(&self.underlying.location)
833 .into();
834
835 let out_ir = HydroNode::FoldKeyed {
836 init,
837 acc: comb,
838 input: Box::new(self.underlying.ir_node.into_inner()),
839 metadata: self.underlying.location.new_node_metadata::<(K, A)>(),
840 };
841
842 KeyedSingleton {
843 underlying: Stream::new(self.underlying.location, out_ir),
844 }
845 }
846
847 /// Like [`Stream::reduce`], aggregates the values in each group via the `comb` closure.
848 ///
849 /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
850 /// to depend on the order of elements in the stream.
851 ///
852 /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold`].
853 ///
854 /// # Example
855 /// ```rust
856 /// # use hydro_lang::prelude::*;
857 /// # use futures::StreamExt;
858 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
859 /// let tick = process.tick();
860 /// let numbers = process
861 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
862 /// .into_keyed();
863 /// let batch = numbers.batch(&tick, nondet!(/** test */));
864 /// batch.reduce(q!(|acc, x| *acc += x)).entries().all_ticks()
865 /// # }, |mut stream| async move {
866 /// // (1, 5), (2, 7)
867 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
868 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
869 /// # }));
870 /// ```
871 pub fn reduce<F: Fn(&mut V, V) + 'a>(
872 self,
873 comb: impl IntoQuotedMut<'a, F, L>,
874 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
875 let f = comb
876 .splice_fn2_borrow_mut_ctx(&self.underlying.location)
877 .into();
878
879 let out_ir = HydroNode::ReduceKeyed {
880 f,
881 input: Box::new(self.underlying.ir_node.into_inner()),
882 metadata: self.underlying.location.new_node_metadata::<(K, V)>(),
883 };
884
885 KeyedSingleton {
886 underlying: Stream::new(self.underlying.location, out_ir),
887 }
888 }
889
890 /// A special case of [`KeyedStream::reduce`] where tuples with keys less than the watermark are automatically deleted.
891 ///
892 /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
893 /// to depend on the order of elements in the stream.
894 ///
895 /// # Example
896 /// ```rust
897 /// # use hydro_lang::prelude::*;
898 /// # use futures::StreamExt;
899 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
900 /// let tick = process.tick();
901 /// let watermark = tick.singleton(q!(1));
902 /// let numbers = process
903 /// .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
904 /// .into_keyed();
905 /// let batch = numbers.batch(&tick, nondet!(/** test */));
906 /// batch
907 /// .reduce_watermark(watermark, q!(|acc, x| *acc += x))
908 /// .entries()
909 /// .all_ticks()
910 /// # }, |mut stream| async move {
911 /// // (2, 204)
912 /// # assert_eq!(stream.next().await.unwrap(), (2, 204));
913 /// # }));
914 /// ```
915 pub fn reduce_watermark<O, F>(
916 self,
917 other: impl Into<Optional<O, Tick<L::Root>, Bounded>>,
918 comb: impl IntoQuotedMut<'a, F, L>,
919 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
920 where
921 O: Clone,
922 F: Fn(&mut V, V) + 'a,
923 {
924 let other: Optional<O, Tick<L::Root>, Bounded> = other.into();
925 check_matching_location(&self.underlying.location.root(), other.location.outer());
926 let f = comb
927 .splice_fn2_borrow_mut_ctx(&self.underlying.location)
928 .into();
929
930 let out_ir = Stream::new(
931 self.underlying.location.clone(),
932 HydroNode::ReduceKeyedWatermark {
933 f,
934 input: Box::new(self.underlying.ir_node.into_inner()),
935 watermark: Box::new(other.ir_node.into_inner()),
936 metadata: self.underlying.location.new_node_metadata::<(K, V)>(),
937 },
938 );
939
940 KeyedSingleton { underlying: out_ir }
941 }
942}
943
944impl<'a, K, V, L, B: Boundedness, O: Ordering> KeyedStream<K, V, L, B, O, ExactlyOnce>
945where
946 K: Eq + Hash,
947 L: Location<'a>,
948{
949 /// Like [`Stream::fold_commutative`], aggregates the values in each group via the `comb` closure.
950 ///
951 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
952 ///
953 /// If the input and output value types are the same and do not require initialization then use
954 /// [`KeyedStream::reduce_commutative`].
955 ///
956 /// # Example
957 /// ```rust
958 /// # use hydro_lang::prelude::*;
959 /// # use futures::StreamExt;
960 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
961 /// let tick = process.tick();
962 /// let numbers = process
963 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
964 /// .into_keyed();
965 /// let batch = numbers.batch(&tick, nondet!(/** test */));
966 /// batch
967 /// .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
968 /// .entries()
969 /// .all_ticks()
970 /// # }, |mut stream| async move {
971 /// // (1, 5), (2, 7)
972 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
973 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
974 /// # }));
975 /// ```
976 pub fn fold_commutative<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
977 self,
978 init: impl IntoQuotedMut<'a, I, L>,
979 comb: impl IntoQuotedMut<'a, F, L>,
980 ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
981 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
982 .fold(init, comb)
983 }
984
985 /// Like [`Stream::reduce_commutative`], aggregates the values in each group via the `comb` closure.
986 ///
987 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
988 ///
989 /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_commutative`].
990 ///
991 /// # Example
992 /// ```rust
993 /// # use hydro_lang::prelude::*;
994 /// # use futures::StreamExt;
995 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
996 /// let tick = process.tick();
997 /// let numbers = process
998 /// .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
999 /// .into_keyed();
1000 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1001 /// batch
1002 /// .reduce_commutative(q!(|acc, x| *acc += x))
1003 /// .entries()
1004 /// .all_ticks()
1005 /// # }, |mut stream| async move {
1006 /// // (1, 5), (2, 7)
1007 /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1008 /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1009 /// # }));
1010 /// ```
1011 pub fn reduce_commutative<F: Fn(&mut V, V) + 'a>(
1012 self,
1013 comb: impl IntoQuotedMut<'a, F, L>,
1014 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1015 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1016 .reduce(comb)
1017 }
1018
1019 /// A special case of [`KeyedStream::reduce_commutative`] where tuples with keys less than the watermark are automatically deleted.
1020 ///
1021 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1022 ///
1023 /// # Example
1024 /// ```rust
1025 /// # use hydro_lang::prelude::*;
1026 /// # use futures::StreamExt;
1027 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1028 /// let tick = process.tick();
1029 /// let watermark = tick.singleton(q!(1));
1030 /// let numbers = process
1031 /// .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
1032 /// .into_keyed();
1033 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1034 /// batch
1035 /// .reduce_watermark_commutative(watermark, q!(|acc, x| *acc += x))
1036 /// .entries()
1037 /// .all_ticks()
1038 /// # }, |mut stream| async move {
1039 /// // (2, 204)
1040 /// # assert_eq!(stream.next().await.unwrap(), (2, 204));
1041 /// # }));
1042 /// ```
1043 pub fn reduce_watermark_commutative<O2, F>(
1044 self,
1045 other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1046 comb: impl IntoQuotedMut<'a, F, L>,
1047 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1048 where
1049 O2: Clone,
1050 F: Fn(&mut V, V) + 'a,
1051 {
1052 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1053 .reduce_watermark(other, comb)
1054 }
1055}
1056
1057impl<'a, K, V, L, B: Boundedness, R: Retries> KeyedStream<K, V, L, B, TotalOrder, R>
1058where
1059 K: Eq + Hash,
1060 L: Location<'a>,
1061{
1062 /// Like [`Stream::fold_idempotent`], aggregates the values in each group via the `comb` closure.
1063 ///
1064 /// The `comb` closure must be **idempotent** as there may be non-deterministic duplicates.
1065 ///
1066 /// If the input and output value types are the same and do not require initialization then use
1067 /// [`KeyedStream::reduce_idempotent`].
1068 ///
1069 /// # Example
1070 /// ```rust
1071 /// # use hydro_lang::prelude::*;
1072 /// # use futures::StreamExt;
1073 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1074 /// let tick = process.tick();
1075 /// let numbers = process
1076 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1077 /// .into_keyed();
1078 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1079 /// batch
1080 /// .fold_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1081 /// .entries()
1082 /// .all_ticks()
1083 /// # }, |mut stream| async move {
1084 /// // (1, false), (2, true)
1085 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1086 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1087 /// # }));
1088 /// ```
1089 pub fn fold_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1090 self,
1091 init: impl IntoQuotedMut<'a, I, L>,
1092 comb: impl IntoQuotedMut<'a, F, L>,
1093 ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
1094 self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1095 .fold(init, comb)
1096 }
1097
1098 /// Like [`Stream::reduce_idempotent`], aggregates the values in each group via the `comb` closure.
1099 ///
1100 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1101 ///
1102 /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_idempotent`].
1103 ///
1104 /// # Example
1105 /// ```rust
1106 /// # use hydro_lang::prelude::*;
1107 /// # use futures::StreamExt;
1108 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1109 /// let tick = process.tick();
1110 /// let numbers = process
1111 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1112 /// .into_keyed();
1113 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1114 /// batch
1115 /// .reduce_idempotent(q!(|acc, x| *acc |= x))
1116 /// .entries()
1117 /// .all_ticks()
1118 /// # }, |mut stream| async move {
1119 /// // (1, false), (2, true)
1120 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1121 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1122 /// # }));
1123 /// ```
1124 pub fn reduce_idempotent<F: Fn(&mut V, V) + 'a>(
1125 self,
1126 comb: impl IntoQuotedMut<'a, F, L>,
1127 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1128 self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1129 .reduce(comb)
1130 }
1131
1132 /// A special case of [`KeyedStream::reduce_idempotent`] where tuples with keys less than the watermark are automatically deleted.
1133 ///
1134 /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1135 ///
1136 /// # Example
1137 /// ```rust
1138 /// # use hydro_lang::prelude::*;
1139 /// # use futures::StreamExt;
1140 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1141 /// let tick = process.tick();
1142 /// let watermark = tick.singleton(q!(1));
1143 /// let numbers = process
1144 /// .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
1145 /// .into_keyed();
1146 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1147 /// batch
1148 /// .reduce_watermark_idempotent(watermark, q!(|acc, x| *acc |= x))
1149 /// .entries()
1150 /// .all_ticks()
1151 /// # }, |mut stream| async move {
1152 /// // (2, true)
1153 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1154 /// # }));
1155 /// ```
1156 pub fn reduce_watermark_idempotent<O2, F>(
1157 self,
1158 other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1159 comb: impl IntoQuotedMut<'a, F, L>,
1160 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1161 where
1162 O2: Clone,
1163 F: Fn(&mut V, V) + 'a,
1164 {
1165 self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1166 .reduce_watermark(other, comb)
1167 }
1168}
1169
1170impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, L, B, O, R>
1171where
1172 K: Eq + Hash,
1173 L: Location<'a>,
1174{
1175 /// Like [`Stream::fold_commutative_idempotent`], aggregates the values in each group via the `comb` closure.
1176 ///
1177 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1178 /// as there may be non-deterministic duplicates.
1179 ///
1180 /// If the input and output value types are the same and do not require initialization then use
1181 /// [`KeyedStream::reduce_commutative_idempotent`].
1182 ///
1183 /// # Example
1184 /// ```rust
1185 /// # use hydro_lang::prelude::*;
1186 /// # use futures::StreamExt;
1187 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1188 /// let tick = process.tick();
1189 /// let numbers = process
1190 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1191 /// .into_keyed();
1192 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1193 /// batch
1194 /// .fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1195 /// .entries()
1196 /// .all_ticks()
1197 /// # }, |mut stream| async move {
1198 /// // (1, false), (2, true)
1199 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1200 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1201 /// # }));
1202 /// ```
1203 pub fn fold_commutative_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1204 self,
1205 init: impl IntoQuotedMut<'a, I, L>,
1206 comb: impl IntoQuotedMut<'a, F, L>,
1207 ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
1208 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1209 .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1210 .fold(init, comb)
1211 }
1212
1213 /// Like [`Stream::reduce_commutative_idempotent`], aggregates the values in each group via the `comb` closure.
1214 ///
1215 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1216 /// as there may be non-deterministic duplicates.
1217 ///
1218 /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_commutative_idempotent`].
1219 ///
1220 /// # Example
1221 /// ```rust
1222 /// # use hydro_lang::prelude::*;
1223 /// # use futures::StreamExt;
1224 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1225 /// let tick = process.tick();
1226 /// let numbers = process
1227 /// .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1228 /// .into_keyed();
1229 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1230 /// batch
1231 /// .reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
1232 /// .entries()
1233 /// .all_ticks()
1234 /// # }, |mut stream| async move {
1235 /// // (1, false), (2, true)
1236 /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1237 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1238 /// # }));
1239 /// ```
1240 pub fn reduce_commutative_idempotent<F: Fn(&mut V, V) + 'a>(
1241 self,
1242 comb: impl IntoQuotedMut<'a, F, L>,
1243 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1244 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1245 .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1246 .reduce(comb)
1247 }
1248
1249 /// A special case of [`Stream::reduce_keyed_commutative_idempotent`] where tuples with keys less than the watermark are automatically deleted.
1250 ///
1251 /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1252 /// as there may be non-deterministic duplicates.
1253 ///
1254 /// # Example
1255 /// ```rust
1256 /// # use hydro_lang::prelude::*;
1257 /// # use futures::StreamExt;
1258 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1259 /// let tick = process.tick();
1260 /// let watermark = tick.singleton(q!(1));
1261 /// let numbers = process
1262 /// .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
1263 /// .into_keyed();
1264 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1265 /// batch
1266 /// .reduce_watermark_commutative_idempotent(watermark, q!(|acc, x| *acc |= x))
1267 /// .entries()
1268 /// .all_ticks()
1269 /// # }, |mut stream| async move {
1270 /// // (2, true)
1271 /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1272 /// # }));
1273 /// ```
1274 pub fn reduce_watermark_commutative_idempotent<O2, F>(
1275 self,
1276 other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1277 comb: impl IntoQuotedMut<'a, F, L>,
1278 ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1279 where
1280 O2: Clone,
1281 F: Fn(&mut V, V) + 'a,
1282 {
1283 self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1284 .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1285 .reduce_watermark(other, comb)
1286 }
1287
1288 /// Given a bounded stream of keys `K`, returns a new keyed stream containing only the groups
1289 /// whose keys are not in the bounded stream.
1290 ///
1291 /// # Example
1292 /// ```rust
1293 /// # use hydro_lang::prelude::*;
1294 /// # use futures::StreamExt;
1295 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1296 /// let tick = process.tick();
1297 /// let keyed_stream = process
1298 /// .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1299 /// .batch(&tick, nondet!(/** test */))
1300 /// .into_keyed();
1301 /// let keys_to_remove = process
1302 /// .source_iter(q!(vec![1, 2]))
1303 /// .batch(&tick, nondet!(/** test */));
1304 /// keyed_stream.filter_key_not_in(keys_to_remove).all_ticks()
1305 /// # .entries()
1306 /// # }, |mut stream| async move {
1307 /// // { 3: ['c'], 4: ['d'] }
1308 /// # for w in vec![(3, 'c'), (4, 'd')] {
1309 /// # assert_eq!(stream.next().await.unwrap(), w);
1310 /// # }
1311 /// # }));
1312 /// ```
1313 pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
1314 self,
1315 other: Stream<K, L, Bounded, O2, R2>,
1316 ) -> Self {
1317 KeyedStream {
1318 underlying: self.entries().anti_join(other),
1319 _phantom_order: Default::default(),
1320 }
1321 }
1322}
1323
1324impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, L, B, O, R>
1325where
1326 L: Location<'a> + NoTick + NoAtomic,
1327{
1328 /// Shifts this keyed stream into an atomic context, which guarantees that any downstream logic
1329 /// will all be executed synchronously before any outputs are yielded (in [`KeyedStream::end_atomic`]).
1330 ///
1331 /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1332 /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
1333 /// argument that declares where the stream will be atomically processed. Batching a stream into
1334 /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
1335 /// [`Tick`] will introduce asynchrony.
1336 pub fn atomic(self, tick: &Tick<L>) -> KeyedStream<K, V, Atomic<L>, B, O, R> {
1337 KeyedStream {
1338 underlying: self.underlying.atomic(tick),
1339 _phantom_order: Default::default(),
1340 }
1341 }
1342
1343 /// Given a tick, returns a keyed stream corresponding to a batch of elements segmented by
1344 /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1345 /// the order of the input.
1346 ///
1347 /// # Non-Determinism
1348 /// The batch boundaries are non-deterministic and may change across executions.
1349 pub fn batch(
1350 self,
1351 tick: &Tick<L>,
1352 nondet: NonDet,
1353 ) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
1354 self.atomic(tick).batch(nondet)
1355 }
1356}
1357
1358impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, Atomic<L>, B, O, R>
1359where
1360 L: Location<'a> + NoTick + NoAtomic,
1361{
1362 /// Returns a keyed stream corresponding to the latest batch of elements being atomically
1363 /// processed. These batches are guaranteed to be contiguous across ticks and preserve
1364 /// the order of the input. The output keyed stream will execute in the [`Tick`] that was
1365 /// used to create the atomic section.
1366 ///
1367 /// # Non-Determinism
1368 /// The batch boundaries are non-deterministic and may change across executions.
1369 pub fn batch(self, nondet: NonDet) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
1370 KeyedStream {
1371 underlying: self.underlying.batch(nondet),
1372 _phantom_order: Default::default(),
1373 }
1374 }
1375
1376 /// Yields the elements of this keyed stream back into a top-level, asynchronous execution context.
1377 /// See [`KeyedStream::atomic`] for more details.
1378 pub fn end_atomic(self) -> KeyedStream<K, V, L, B, O, R> {
1379 KeyedStream {
1380 underlying: self.underlying.end_atomic(),
1381 _phantom_order: Default::default(),
1382 }
1383 }
1384}
1385
1386impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, L, Bounded, O, R>
1387where
1388 L: Location<'a>,
1389{
1390 /// Produces a new keyed stream that combines the groups of the inputs by first emitting the
1391 /// elements of the `self` stream, and then emits the elements of the `other` stream (if a key
1392 /// is only present in one of the inputs, its values are passed through as-is). The output has
1393 /// a [`TotalOrder`] guarantee if and only if both inputs have a [`TotalOrder`] guarantee.
1394 ///
1395 /// Currently, both input streams must be [`Bounded`]. This operator will block
1396 /// on the first stream until all its elements are available. In a future version,
1397 /// we will relax the requirement on the `other` stream.
1398 ///
1399 /// # Example
1400 /// ```rust
1401 /// # use hydro_lang::prelude::*;
1402 /// # use futures::StreamExt;
1403 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1404 /// let tick = process.tick();
1405 /// let numbers = process.source_iter(q!(vec![(0, 1), (1, 3)])).into_keyed();
1406 /// let batch = numbers.batch(&tick, nondet!(/** test */));
1407 /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1408 /// # .entries()
1409 /// # }, |mut stream| async move {
1410 /// // { 0: [2, 1], 1: [4, 3] }
1411 /// # for w in vec![(0, 2), (1, 4), (0, 1), (1, 3)] {
1412 /// # assert_eq!(stream.next().await.unwrap(), w);
1413 /// # }
1414 /// # }));
1415 /// ```
1416 pub fn chain<O2: Ordering>(
1417 self,
1418 other: KeyedStream<K, V, L, Bounded, O2, R>,
1419 ) -> KeyedStream<K, V, L, Bounded, <O as MinOrder<O2>>::Min, R>
1420 where
1421 O: MinOrder<O2>,
1422 {
1423 KeyedStream {
1424 underlying: self.underlying.chain(other.underlying),
1425 _phantom_order: Default::default(),
1426 }
1427 }
1428}
1429
1430impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, Tick<L>, Bounded, O, R>
1431where
1432 L: Location<'a>,
1433{
1434 /// Asynchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
1435 /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
1436 /// each key.
1437 pub fn all_ticks(self) -> KeyedStream<K, V, L, Unbounded, O, R> {
1438 KeyedStream {
1439 underlying: self.underlying.all_ticks(),
1440 _phantom_order: Default::default(),
1441 }
1442 }
1443
1444 /// Synchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
1445 /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
1446 /// each key.
1447 ///
1448 /// Unlike [`KeyedStream::all_ticks`], this preserves synchronous execution, as the output stream
1449 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1450 /// stream's [`Tick`] context.
1451 pub fn all_ticks_atomic(self) -> KeyedStream<K, V, L, Unbounded, O, R> {
1452 KeyedStream {
1453 underlying: self.underlying.all_ticks(),
1454 _phantom_order: Default::default(),
1455 }
1456 }
1457
1458 #[expect(missing_docs, reason = "TODO")]
1459 pub fn defer_tick(self) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
1460 KeyedStream {
1461 underlying: self.underlying.defer_tick(),
1462 _phantom_order: Default::default(),
1463 }
1464 }
1465}
1466
#[cfg(test)]
mod tests {
    use futures::{SinkExt, StreamExt};
    use hydro_deploy::Deployment;
    use stageleft::q;

    use crate::compile::builder::FlowBuilder;
    use crate::location::Location;
    use crate::nondet::nondet;

    /// Reduces with a fixed watermark of `1`. Expects the first snapshot entry to be `(2, 204)`,
    /// i.e. the two `(2, 102)` inputs summed.
    #[tokio::test]
    async fn reduce_watermark_filter() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let process = flow.process::<()>();
        let external = flow.external::<()>();

        let tick = process.tick();
        let watermark = tick.singleton(q!(1));

        let output_port = process
            .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
            .into_keyed()
            .reduce_watermark(
                watermark,
                q!(|acc, v| {
                    *acc += v;
                }),
            )
            .snapshot(&tick, nondet!(/** test */))
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        let deployed = flow
            .with_process(&process, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        // Connect to the output before starting so no emitted entry is missed.
        let mut results = deployed.connect_source_bincode(output_port).await;

        deployment.start().await.unwrap();

        assert_eq!(results.next().await.unwrap(), (2, 204));
    }

    /// Uses a watermark that starts at `1` and is incremented via a next-tick cycle. First
    /// expects `(2, 204)`. After an external trigger releases the gated `(3, 103)` input,
    /// expects `(3, 103)`.
    #[tokio::test]
    async fn reduce_watermark_garbage_collect() {
        let mut deployment = Deployment::new();

        let flow = FlowBuilder::new();
        let process = flow.process::<()>();
        let external = flow.external::<()>();
        let (trigger_port, trigger) = process.source_external_bincode(&external);

        let tick = process.tick();
        let (complete_watermark, watermark) = tick.cycle_with_initial(tick.singleton(q!(1)));
        let advanced_watermark = watermark.clone().map(q!(|v| v + 1));
        complete_watermark.complete_next_tick(advanced_watermark);

        // `(3, 103)` is only let through in a tick where the external trigger has fired.
        let gated_input = process
            .source_iter(q!([(3, 103)]))
            .batch(&tick, nondet!(/** test */))
            .filter_if_some(
                trigger
                    .clone()
                    .batch(&tick, nondet!(/** test */))
                    .first(),
            )
            .all_ticks();

        let output_port = process
            .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
            .interleave(gated_input)
            .into_keyed()
            .reduce_watermark_commutative(
                watermark,
                q!(|acc, v| {
                    *acc += v;
                }),
            )
            .snapshot(&tick, nondet!(/** test */))
            .entries()
            .all_ticks()
            .send_bincode_external(&external);

        let deployed = flow
            .with_default_optimize()
            .with_process(&process, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut trigger_sink = deployed.connect_sink_bincode(trigger_port).await;
        let mut results = deployed.connect_source_bincode(output_port).await;

        deployment.start().await.unwrap();

        assert_eq!(results.next().await.unwrap(), (2, 204));

        trigger_sink.send(()).await.unwrap();

        assert_eq!(results.next().await.unwrap(), (3, 103));
    }
}