hydro_lang/live_collections/keyed_stream/
mod.rs

1//! Definitions for the [`KeyedStream`] live collection.
2
3use std::collections::HashMap;
4use std::hash::Hash;
5use std::marker::PhantomData;
6
7use stageleft::{IntoQuotedMut, QuotedWithContext, q};
8
9use super::boundedness::{Bounded, Boundedness, Unbounded};
10use super::keyed_singleton::KeyedSingleton;
11use super::optional::Optional;
12use super::stream::{ExactlyOnce, MinOrder, MinRetries, NoOrder, Stream, TotalOrder};
13use crate::compile::ir::HydroNode;
14use crate::forward_handle::ForwardRef;
15#[cfg(stageleft_runtime)]
16use crate::forward_handle::{CycleCollection, ReceiverComplete};
17use crate::live_collections::stream::{Ordering, Retries};
18use crate::location::dynamic::LocationId;
19use crate::location::tick::NoAtomic;
20use crate::location::{Atomic, Location, NoTick, Tick, check_matching_location};
21use crate::manual_expr::ManualExpr;
22use crate::nondet::{NonDet, nondet};
23
24pub mod networking;
25
26/// Streaming elements of type `V` grouped by a key of type `K`.
27///
28/// Keyed Streams capture streaming elements of type `V` grouped by a key of type `K`, where the
29/// order of keys is non-deterministic but the order *within* each group may be deterministic.
30///
31/// Although keyed streams are conceptually grouped by keys, values are not immediately grouped
32/// into buckets when constructing a keyed stream. Instead, keyed streams defer grouping until an
33/// operator such as [`KeyedStream::fold`] is called, which requires `K: Hash + Eq`.
34///
35/// Type Parameters:
36/// - `K`: the type of the key for each group
37/// - `V`: the type of the elements inside each group
38/// - `Loc`: the [`Location`] where the keyed stream is materialized
39/// - `Bound`: tracks whether the entries are [`Bounded`] (local and finite) or [`Unbounded`] (asynchronous and possibly infinite)
40/// - `Order`: tracks whether the elements within each group have deterministic order
41///   ([`TotalOrder`]) or not ([`NoOrder`])
42/// - `Retries`: tracks whether the elements within each group have deterministic cardinality
43///   ([`ExactlyOnce`]) or may have non-deterministic retries ([`crate::live_collections::stream::AtLeastOnce`])
44pub struct KeyedStream<
45    K,
46    V,
47    Loc,
48    Bound: Boundedness,
49    Order: Ordering = TotalOrder,
50    Retry: Retries = ExactlyOnce,
51> {
52    pub(crate) underlying: Stream<(K, V), Loc, Bound, NoOrder, Retry>,
53    pub(crate) _phantom_order: PhantomData<Order>,
54}
55
56impl<'a, K, V, L, B: Boundedness, R: Retries> From<KeyedStream<K, V, L, B, TotalOrder, R>>
57    for KeyedStream<K, V, L, B, NoOrder, R>
58where
59    L: Location<'a>,
60{
61    fn from(stream: KeyedStream<K, V, L, B, TotalOrder, R>) -> KeyedStream<K, V, L, B, NoOrder, R> {
62        KeyedStream {
63            underlying: stream.underlying,
64            _phantom_order: Default::default(),
65        }
66    }
67}
68
69impl<'a, K: Clone, V: Clone, Loc: Location<'a>, Bound: Boundedness, Order: Ordering, R: Retries>
70    Clone for KeyedStream<K, V, Loc, Bound, Order, R>
71{
72    fn clone(&self) -> Self {
73        KeyedStream {
74            underlying: self.underlying.clone(),
75            _phantom_order: PhantomData,
76        }
77    }
78}
79
80impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> CycleCollection<'a, ForwardRef>
81    for KeyedStream<K, V, L, B, O, R>
82where
83    L: Location<'a> + NoTick,
84{
85    type Location = L;
86
87    fn create_source(ident: syn::Ident, location: L) -> Self {
88        Stream::create_source(ident, location).into_keyed()
89    }
90}
91
92impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> ReceiverComplete<'a, ForwardRef>
93    for KeyedStream<K, V, L, B, O, R>
94where
95    L: Location<'a> + NoTick,
96{
97    fn complete(self, ident: syn::Ident, expected_location: LocationId) {
98        self.underlying.complete(ident, expected_location);
99    }
100}
101
102impl<'a, K, V, L: Location<'a>, B: Boundedness, O: Ordering, R: Retries>
103    KeyedStream<K, V, L, B, O, R>
104{
105    /// Explicitly "casts" the keyed stream to a type with a different ordering
106    /// guarantee for each group. Useful in unsafe code where the ordering cannot be proven
107    /// by the type-system.
108    ///
109    /// # Non-Determinism
110    /// This function is used as an escape hatch, and any mistakes in the
111    /// provided ordering guarantee will propagate into the guarantees
112    /// for the rest of the program.
113    pub fn assume_ordering<O2: Ordering>(self, _nondet: NonDet) -> KeyedStream<K, V, L, B, O2, R> {
114        KeyedStream {
115            underlying: self.underlying,
116            _phantom_order: PhantomData,
117        }
118    }
119
120    /// Explicitly "casts" the keyed stream to a type with a different retries
121    /// guarantee for each group. Useful in unsafe code where the lack of retries cannot
122    /// be proven by the type-system.
123    ///
124    /// # Non-Determinism
125    /// This function is used as an escape hatch, and any mistakes in the
126    /// provided retries guarantee will propagate into the guarantees
127    /// for the rest of the program.
128    pub fn assume_retries<R2: Retries>(self, nondet: NonDet) -> KeyedStream<K, V, L, B, O, R2> {
129        KeyedStream {
130            underlying: self.underlying.assume_retries::<R2>(nondet),
131            _phantom_order: PhantomData,
132        }
133    }
134
135    /// Flattens the keyed stream into an unordered stream of key-value pairs.
136    ///
137    /// # Example
138    /// ```rust
139    /// # use hydro_lang::prelude::*;
140    /// # use futures::StreamExt;
141    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
142    /// process
143    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
144    ///     .into_keyed()
145    ///     .entries()
146    /// # }, |mut stream| async move {
147    /// // (1, 2), (1, 3), (2, 4) in any order
148    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
149    /// #     assert_eq!(stream.next().await.unwrap(), w);
150    /// # }
151    /// # }));
152    /// ```
153    pub fn entries(self) -> Stream<(K, V), L, B, NoOrder, R> {
154        self.underlying
155    }
156
157    /// Flattens the keyed stream into an unordered stream of only the values.
158    ///
159    /// # Example
160    /// ```rust
161    /// # use hydro_lang::prelude::*;
162    /// # use futures::StreamExt;
163    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
164    /// process
165    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
166    ///     .into_keyed()
167    ///     .values()
168    /// # }, |mut stream| async move {
169    /// // 2, 3, 4 in any order
170    /// # for w in vec![2, 3, 4] {
171    /// #     assert_eq!(stream.next().await.unwrap(), w);
172    /// # }
173    /// # }));
174    /// ```
175    pub fn values(self) -> Stream<V, L, B, NoOrder, R> {
176        self.underlying.map(q!(|(_, v)| v))
177    }
178
179    /// Transforms each value by invoking `f` on each element, with keys staying the same
180    /// after transformation. If you need access to the key, see [`KeyedStream::map_with_key`].
181    ///
182    /// If you do not want to modify the stream and instead only want to view
183    /// each item use [`KeyedStream::inspect`] instead.
184    ///
185    /// # Example
186    /// ```rust
187    /// # use hydro_lang::prelude::*;
188    /// # use futures::StreamExt;
189    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
190    /// process
191    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
192    ///     .into_keyed()
193    ///     .map(q!(|v| v + 1))
194    /// #   .entries()
195    /// # }, |mut stream| async move {
196    /// // { 1: [3, 4], 2: [5] }
197    /// # for w in vec![(1, 3), (1, 4), (2, 5)] {
198    /// #     assert_eq!(stream.next().await.unwrap(), w);
199    /// # }
200    /// # }));
201    /// ```
202    pub fn map<U, F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, U, L, B, O, R>
203    where
204        F: Fn(V) -> U + 'a,
205    {
206        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
207        KeyedStream {
208            underlying: self.underlying.map(q!({
209                let orig = f;
210                move |(k, v)| (k, orig(v))
211            })),
212            _phantom_order: Default::default(),
213        }
214    }
215
216    /// Transforms each value by invoking `f` on each key-value pair. The resulting values are **not**
    /// re-grouped even if they are tuples; instead they will be grouped under the original key.
218    ///
219    /// If you do not want to modify the stream and instead only want to view
220    /// each item use [`KeyedStream::inspect_with_key`] instead.
221    ///
222    /// # Example
223    /// ```rust
224    /// # use hydro_lang::prelude::*;
225    /// # use futures::StreamExt;
226    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
227    /// process
228    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
229    ///     .into_keyed()
230    ///     .map_with_key(q!(|(k, v)| k + v))
231    /// #   .entries()
232    /// # }, |mut stream| async move {
233    /// // { 1: [3, 4], 2: [6] }
234    /// # for w in vec![(1, 3), (1, 4), (2, 6)] {
235    /// #     assert_eq!(stream.next().await.unwrap(), w);
236    /// # }
237    /// # }));
238    /// ```
239    pub fn map_with_key<U, F>(
240        self,
241        f: impl IntoQuotedMut<'a, F, L> + Copy,
242    ) -> KeyedStream<K, U, L, B, O, R>
243    where
244        F: Fn((K, V)) -> U + 'a,
245        K: Clone,
246    {
247        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
248        KeyedStream {
249            underlying: self.underlying.map(q!({
250                let orig = f;
251                move |(k, v)| {
252                    let out = orig((k.clone(), v));
253                    (k, out)
254                }
255            })),
256            _phantom_order: Default::default(),
257        }
258    }
259
260    /// Creates a stream containing only the elements of each group stream that satisfy a predicate
261    /// `f`, preserving the order of the elements within the group.
262    ///
    /// The closure `f` receives a reference `&V` rather than an owned value `V` because filtering does
264    /// not modify or take ownership of the values. If you need to modify the values while filtering
265    /// use [`KeyedStream::filter_map`] instead.
266    ///
267    /// # Example
268    /// ```rust
269    /// # use hydro_lang::prelude::*;
270    /// # use futures::StreamExt;
271    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
272    /// process
273    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
274    ///     .into_keyed()
275    ///     .filter(q!(|&x| x > 2))
276    /// #   .entries()
277    /// # }, |mut stream| async move {
278    /// // { 1: [3], 2: [4] }
279    /// # for w in vec![(1, 3), (2, 4)] {
280    /// #     assert_eq!(stream.next().await.unwrap(), w);
281    /// # }
282    /// # }));
283    /// ```
284    pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
285    where
286        F: Fn(&V) -> bool + 'a,
287    {
288        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
289        KeyedStream {
290            underlying: self.underlying.filter(q!({
291                let orig = f;
292                move |(_k, v)| orig(v)
293            })),
294            _phantom_order: Default::default(),
295        }
296    }
297
298    /// Creates a stream containing only the elements of each group stream that satisfy a predicate
299    /// `f` (which receives the key-value tuple), preserving the order of the elements within the group.
300    ///
301    /// The closure `f` receives a reference `&(K, V)` rather than an owned value `(K, V)` because filtering does
302    /// not modify or take ownership of the values. If you need to modify the values while filtering
303    /// use [`KeyedStream::filter_map_with_key`] instead.
304    ///
305    /// # Example
306    /// ```rust
307    /// # use hydro_lang::prelude::*;
308    /// # use futures::StreamExt;
309    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
310    /// process
311    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
312    ///     .into_keyed()
313    ///     .filter_with_key(q!(|&(k, v)| v - k == 2))
314    /// #   .entries()
315    /// # }, |mut stream| async move {
316    /// // { 1: [3], 2: [4] }
317    /// # for w in vec![(1, 3), (2, 4)] {
318    /// #     assert_eq!(stream.next().await.unwrap(), w);
319    /// # }
320    /// # }));
321    /// ```
322    pub fn filter_with_key<F>(
323        self,
324        f: impl IntoQuotedMut<'a, F, L> + Copy,
325    ) -> KeyedStream<K, V, L, B, O, R>
326    where
327        F: Fn(&(K, V)) -> bool + 'a,
328    {
329        KeyedStream {
330            underlying: self.underlying.filter(f),
331            _phantom_order: Default::default(),
332        }
333    }
334
335    /// An operator that both filters and maps each value, with keys staying the same.
336    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
337    /// If you need access to the key, see [`KeyedStream::filter_map_with_key`].
338    ///
339    /// # Example
340    /// ```rust
341    /// # use hydro_lang::prelude::*;
342    /// # use futures::StreamExt;
343    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
344    /// process
345    ///     .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "4")]))
346    ///     .into_keyed()
347    ///     .filter_map(q!(|s| s.parse::<usize>().ok()))
348    /// #   .entries()
349    /// # }, |mut stream| async move {
350    /// // { 1: [2], 2: [4] }
351    /// # for w in vec![(1, 2), (2, 4)] {
352    /// #     assert_eq!(stream.next().await.unwrap(), w);
353    /// # }
354    /// # }));
355    /// ```
356    pub fn filter_map<U, F>(
357        self,
358        f: impl IntoQuotedMut<'a, F, L> + Copy,
359    ) -> KeyedStream<K, U, L, B, O, R>
360    where
361        F: Fn(V) -> Option<U> + 'a,
362    {
363        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
364        KeyedStream {
365            underlying: self.underlying.filter_map(q!({
366                let orig = f;
367                move |(k, v)| orig(v).map(|o| (k, o))
368            })),
369            _phantom_order: Default::default(),
370        }
371    }
372
373    /// An operator that both filters and maps each key-value pair. The resulting values are **not**
    /// re-grouped even if they are tuples; instead they will be grouped under the original key.
375    /// It yields only the items for which the supplied closure `f` returns `Some(value)`.
376    ///
377    /// # Example
378    /// ```rust
379    /// # use hydro_lang::prelude::*;
380    /// # use futures::StreamExt;
381    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
382    /// process
383    ///     .source_iter(q!(vec![(1, "2"), (1, "hello"), (2, "2")]))
384    ///     .into_keyed()
385    ///     .filter_map_with_key(q!(|(k, s)| s.parse::<usize>().ok().filter(|v| v == &k)))
386    /// #   .entries()
387    /// # }, |mut stream| async move {
388    /// // { 2: [2] }
389    /// # for w in vec![(2, 2)] {
390    /// #     assert_eq!(stream.next().await.unwrap(), w);
391    /// # }
392    /// # }));
393    /// ```
394    pub fn filter_map_with_key<U, F>(
395        self,
396        f: impl IntoQuotedMut<'a, F, L> + Copy,
397    ) -> KeyedStream<K, U, L, B, O, R>
398    where
399        F: Fn((K, V)) -> Option<U> + 'a,
400        K: Clone,
401    {
402        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_ctx(ctx));
403        KeyedStream {
404            underlying: self.underlying.filter_map(q!({
405                let orig = f;
406                move |(k, v)| {
407                    let out = orig((k.clone(), v));
408                    out.map(|o| (k, o))
409                }
410            })),
411            _phantom_order: Default::default(),
412        }
413    }
414
415    /// An operator which allows you to "inspect" each element of a stream without
416    /// modifying it. The closure `f` is called on a reference to each value. This is
417    /// mainly useful for debugging, and should not be used to generate side-effects.
418    ///
419    /// # Example
420    /// ```rust
421    /// # use hydro_lang::prelude::*;
422    /// # use futures::StreamExt;
423    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
424    /// process
425    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
426    ///     .into_keyed()
427    ///     .inspect(q!(|v| println!("{}", v)))
428    /// #   .entries()
429    /// # }, |mut stream| async move {
430    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
431    /// #     assert_eq!(stream.next().await.unwrap(), w);
432    /// # }
433    /// # }));
434    /// ```
435    pub fn inspect<F>(self, f: impl IntoQuotedMut<'a, F, L> + Copy) -> KeyedStream<K, V, L, B, O, R>
436    where
437        F: Fn(&V) + 'a,
438    {
439        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn1_borrow_ctx(ctx));
440        KeyedStream {
441            underlying: self.underlying.inspect(q!({
442                let orig = f;
443                move |(_k, v)| orig(v)
444            })),
445            _phantom_order: Default::default(),
446        }
447    }
448
449    /// An operator which allows you to "inspect" each element of a stream without
450    /// modifying it. The closure `f` is called on a reference to each key-value pair. This is
451    /// mainly useful for debugging, and should not be used to generate side-effects.
452    ///
453    /// # Example
454    /// ```rust
455    /// # use hydro_lang::prelude::*;
456    /// # use futures::StreamExt;
457    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
458    /// process
459    ///     .source_iter(q!(vec![(1, 2), (1, 3), (2, 4)]))
460    ///     .into_keyed()
461    ///     .inspect_with_key(q!(|(k, v)| println!("{}: {}", k, v)))
462    /// #   .entries()
463    /// # }, |mut stream| async move {
464    /// # for w in vec![(1, 2), (1, 3), (2, 4)] {
465    /// #     assert_eq!(stream.next().await.unwrap(), w);
466    /// # }
467    /// # }));
468    /// ```
469    pub fn inspect_with_key<F>(
470        self,
471        f: impl IntoQuotedMut<'a, F, L>,
472    ) -> KeyedStream<K, V, L, B, O, R>
473    where
474        F: Fn(&(K, V)) + 'a,
475    {
476        KeyedStream {
477            underlying: self.underlying.inspect(f),
478            _phantom_order: Default::default(),
479        }
480    }
481
482    /// An operator which allows you to "name" a `HydroNode`.
483    /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
484    pub fn ir_node_named(self, name: &str) -> KeyedStream<K, V, L, B, O, R> {
485        {
486            let mut node = self.underlying.ir_node.borrow_mut();
487            let metadata = node.metadata_mut();
488            metadata.tag = Some(name.to_string());
489        }
490        self
491    }
492}
493
494impl<'a, K, V, L: Location<'a> + NoTick + NoAtomic, O: Ordering, R: Retries>
495    KeyedStream<K, V, L, Unbounded, O, R>
496{
497    /// Produces a new keyed stream that "merges" the inputs by interleaving the elements
498    /// of any overlapping groups. The result has [`NoOrder`] on each group because the
499    /// order of interleaving is not guaranteed. If the keys across both inputs do not overlap,
500    /// the ordering will be deterministic and you can safely use [`Self::assume_ordering`].
501    ///
502    /// Currently, both input streams must be [`Unbounded`].
503    ///
504    /// # Example
505    /// ```rust
506    /// # use hydro_lang::prelude::*;
507    /// # use futures::StreamExt;
508    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
509    /// let numbers1 = process.source_iter(q!(vec![(1, 2), (3, 4)])).into_keyed();
510    /// let numbers2 = process.source_iter(q!(vec![(1, 3), (3, 5)])).into_keyed();
511    /// numbers1.interleave(numbers2)
512    /// #   .entries()
513    /// # }, |mut stream| async move {
514    /// // { 1: [2, 3], 3: [4, 5] } with each group in unknown order
515    /// # for w in vec![(1, 2), (3, 4), (1, 3), (3, 5)] {
516    /// #     assert_eq!(stream.next().await.unwrap(), w);
517    /// # }
518    /// # }));
519    /// ```
520    pub fn interleave<O2: Ordering, R2: Retries>(
521        self,
522        other: KeyedStream<K, V, L, Unbounded, O2, R2>,
523    ) -> KeyedStream<K, V, L, Unbounded, NoOrder, <R as MinRetries<R2>>::Min>
524    where
525        R: MinRetries<R2>,
526    {
527        self.entries().interleave(other.entries()).into_keyed()
528    }
529}
530
/// The output of a Hydro generator created with [`KeyedStream::generator`], which can yield elements and
/// control the processing of future elements.
///
/// When used with [`KeyedStream::generator`], each returned value applies to the key group of the
/// element being processed: terminating one group does not stop processing of other groups.
pub enum Generate<T> {
    /// Emit the provided element, and keep processing future inputs in this group.
    Yield(T),
    /// Emit the provided element as the _final_ element of this group, and do not process
    /// future inputs in this group.
    Return(T),
    /// Do not emit anything, but continue processing future inputs in this group.
    Continue,
    /// Do not emit anything, and do not process further inputs in this group.
    Break,
}
543
544impl<'a, K, V, L, B: Boundedness> KeyedStream<K, V, L, B, TotalOrder, ExactlyOnce>
545where
546    K: Eq + Hash,
547    L: Location<'a>,
548{
549    /// A special case of [`Stream::scan`] for keyed streams. For each key group the values are transformed via the `f` combinator.
550    ///
551    /// Unlike [`Stream::fold_keyed`] which only returns the final accumulated value, `scan` produces a new stream
552    /// containing all intermediate accumulated values paired with the key. The scan operation can also terminate
553    /// early by returning `None`.
554    ///
555    /// The function takes a mutable reference to the accumulator and the current element, and returns
556    /// an `Option<U>`. If the function returns `Some(value)`, `value` is emitted to the output stream.
557    /// If the function returns `None`, the stream is terminated and no more elements are processed.
558    ///
559    /// # Example
560    /// ```rust
561    /// # use hydro_lang::prelude::*;
562    /// # use futures::StreamExt;
563    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
564    /// process
565    ///     .source_iter(q!(vec![(0, 1), (0, 3), (1, 3), (1, 4)]))
566    ///     .into_keyed()
567    ///     .scan(
568    ///         q!(|| 0),
569    ///         q!(|acc, x| {
570    ///             *acc += x;
571    ///             if *acc % 2 == 0 { None } else { Some(*acc) }
572    ///         }),
573    ///     )
574    /// #   .entries()
575    /// # }, |mut stream| async move {
576    /// // Output: { 0: [1], 1: [3, 7] }
577    /// # for w in vec![(0, 1), (1, 3), (1, 7)] {
578    /// #     assert_eq!(stream.next().await.unwrap(), w);
579    /// # }
580    /// # }));
581    /// ```
582    pub fn scan<A, U, I, F>(
583        self,
584        init: impl IntoQuotedMut<'a, I, L> + Copy,
585        f: impl IntoQuotedMut<'a, F, L> + Copy,
586    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
587    where
588        K: Clone,
589        I: Fn() -> A + 'a,
590        F: Fn(&mut A, V) -> Option<U> + 'a,
591    {
592        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
593        self.generator(
594            init,
595            q!({
596                let orig = f;
597                move |state, v| {
598                    if let Some(out) = orig(state, v) {
599                        Generate::Yield(out)
600                    } else {
601                        Generate::Break
602                    }
603                }
604            }),
605        )
606    }
607
608    /// Iteratively processes the elements in each group using a state machine that can yield
609    /// elements as it processes its inputs. This is designed to mirror the unstable generator
610    /// syntax in Rust, without requiring special syntax.
611    ///
612    /// Like [`KeyedStream::scan`], this function takes in an initializer that emits the initial
613    /// state for each group. The second argument defines the processing logic, taking in a
614    /// mutable reference to the group's state and the value to be processed. It emits a
615    /// [`Generate`] value, whose variants define what is emitted and whether further inputs
616    /// should be processed.
617    ///
618    /// # Example
619    /// ```rust
620    /// # use hydro_lang::prelude::*;
621    /// # use futures::StreamExt;
622    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
623    /// process
624    ///     .source_iter(q!(vec![(0, 1), (0, 3), (0, 100), (0, 10), (1, 3), (1, 4), (1, 3)]))
625    ///     .into_keyed()
626    ///     .generator(
627    ///         q!(|| 0),
628    ///         q!(|acc, x| {
629    ///             *acc += x;
630    ///             if *acc > 100 {
631    ///                 hydro_lang::live_collections::keyed_stream::Generate::Return(
632    ///                     "done!".to_string()
633    ///                 )
634    ///             } else if *acc % 2 == 0 {
635    ///                 hydro_lang::live_collections::keyed_stream::Generate::Yield(
636    ///                     "even".to_string()
637    ///                 )
638    ///             } else {
639    ///                 hydro_lang::live_collections::keyed_stream::Generate::Continue
640    ///             }
641    ///         }),
642    ///     )
643    /// #   .entries()
644    /// # }, |mut stream| async move {
645    /// // Output: { 0: ["even", "done!"], 1: ["even"] }
646    /// # for w in vec![(0, "even".to_string()), (0, "done!".to_string()), (1, "even".to_string())] {
647    /// #     assert_eq!(stream.next().await.unwrap(), w);
648    /// # }
649    /// # }));
650    /// ```
651    pub fn generator<A, U, I, F>(
652        self,
653        init: impl IntoQuotedMut<'a, I, L> + Copy,
654        f: impl IntoQuotedMut<'a, F, L> + Copy,
655    ) -> KeyedStream<K, U, L, B, TotalOrder, ExactlyOnce>
656    where
657        K: Clone,
658        I: Fn() -> A + 'a,
659        F: Fn(&mut A, V) -> Generate<U> + 'a,
660    {
661        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
662        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
663        let underlying_scanned = self
664            .underlying
665            .assume_ordering(nondet!(
666                /** we do not rely on the order of keys */
667            ))
668            .scan(
669                q!(|| HashMap::new()),
670                q!(move |acc, (k, v)| {
671                    let existing_state = acc.entry(k.clone()).or_insert_with(|| Some(init()));
672                    if let Some(existing_state_value) = existing_state {
673                        match f(existing_state_value, v) {
674                            Generate::Yield(out) => Some(Some((k, out))),
675                            Generate::Return(out) => {
676                                let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
677                                Some(Some((k, out)))
678                            }
679                            Generate::Break => {
680                                let _ = existing_state.take(); // TODO(shadaj): garbage collect with termination markers
681                                Some(None)
682                            }
683                            Generate::Continue => Some(None),
684                        }
685                    } else {
686                        Some(None)
687                    }
688                }),
689            )
690            .flatten_ordered();
691
692        KeyedStream {
693            underlying: underlying_scanned.into(),
694            _phantom_order: Default::default(),
695        }
696    }
697
698    /// A variant of [`Stream::fold`], intended for keyed streams. The aggregation is executed
699    /// in-order across the values in each group. But the aggregation function returns a boolean,
700    /// which when true indicates that the aggregated result is complete and can be released to
701    /// downstream computation. Unlike [`Stream::fold_keyed`], this means that even if the input
702    /// stream is [`super::boundedness::Unbounded`], the outputs of the fold can be processed like
703    /// normal stream elements.
704    ///
705    /// # Example
706    /// ```rust
707    /// # use hydro_lang::prelude::*;
708    /// # use futures::StreamExt;
709    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
710    /// process
711    ///     .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
712    ///     .into_keyed()
713    ///     .fold_early_stop(
714    ///         q!(|| 0),
715    ///         q!(|acc, x| {
716    ///             *acc += x;
717    ///             x % 2 == 0
718    ///         }),
719    ///     )
720    /// #   .entries()
721    /// # }, |mut stream| async move {
722    /// // Output: { 0: 2, 1: 9 }
723    /// # for w in vec![(0, 2), (1, 9)] {
724    /// #     assert_eq!(stream.next().await.unwrap(), w);
725    /// # }
726    /// # }));
727    /// ```
728    pub fn fold_early_stop<A, I, F>(
729        self,
730        init: impl IntoQuotedMut<'a, I, L> + Copy,
731        f: impl IntoQuotedMut<'a, F, L> + Copy,
732    ) -> KeyedSingleton<K, A, L, B::WhenValueBounded>
733    where
734        K: Clone,
735        I: Fn() -> A + 'a,
736        F: Fn(&mut A, V) -> bool + 'a,
737    {
738        let init: ManualExpr<I, _> = ManualExpr::new(move |ctx: &L| init.splice_fn0_ctx(ctx));
739        let f: ManualExpr<F, _> = ManualExpr::new(move |ctx: &L| f.splice_fn2_borrow_mut_ctx(ctx));
740        let out_without_bound_cast = self
741            .generator(
742                q!(move || Some(init())),
743                q!(move |key_state, v| {
744                    if let Some(key_state_value) = key_state.as_mut() {
745                        if f(key_state_value, v) {
746                            Generate::Return(key_state.take().unwrap())
747                        } else {
748                            Generate::Continue
749                        }
750                    } else {
751                        unreachable!()
752                    }
753                }),
754            )
755            .underlying;
756
757        KeyedSingleton {
758            underlying: out_without_bound_cast,
759        }
760    }
761
762    /// Gets the first element inside each group of values as a [`KeyedSingleton`] that preserves
763    /// the original group keys. Requires the input stream to have [`TotalOrder`] guarantees,
764    /// otherwise the first element would be non-deterministic.
765    ///
766    /// # Example
767    /// ```rust
768    /// # use hydro_lang::prelude::*;
769    /// # use futures::StreamExt;
770    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
771    /// process
772    ///     .source_iter(q!(vec![(0, 2), (0, 3), (1, 3), (1, 6)]))
773    ///     .into_keyed()
774    ///     .first()
775    /// #   .entries()
776    /// # }, |mut stream| async move {
777    /// // Output: { 0: 2, 1: 3 }
778    /// # for w in vec![(0, 2), (1, 3)] {
779    /// #     assert_eq!(stream.next().await.unwrap(), w);
780    /// # }
781    /// # }));
782    /// ```
783    pub fn first(self) -> KeyedSingleton<K, V, L, B::WhenValueBounded>
784    where
785        K: Clone,
786    {
787        self.fold_early_stop(
788            q!(|| None),
789            q!(|acc, v| {
790                *acc = Some(v);
791                true
792            }),
793        )
794        .map(q!(|v| v.unwrap()))
795    }
796
797    /// Like [`Stream::fold`], aggregates the values in each group via the `comb` closure.
798    ///
799    /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
800    /// to depend on the order of elements in the group.
801    ///
802    /// If the input and output value types are the same and do not require initialization then use
803    /// [`KeyedStream::reduce`].
804    ///
805    /// # Example
806    /// ```rust
807    /// # use hydro_lang::prelude::*;
808    /// # use futures::StreamExt;
809    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
810    /// let tick = process.tick();
811    /// let numbers = process
812    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
813    ///     .into_keyed();
814    /// let batch = numbers.batch(&tick, nondet!(/** test */));
815    /// batch
816    ///     .fold(q!(|| 0), q!(|acc, x| *acc += x))
817    ///     .entries()
818    ///     .all_ticks()
819    /// # }, |mut stream| async move {
820    /// // (1, 5), (2, 7)
821    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
822    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
823    /// # }));
824    /// ```
825    pub fn fold<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
826        self,
827        init: impl IntoQuotedMut<'a, I, L>,
828        comb: impl IntoQuotedMut<'a, F, L>,
829    ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
830        let init = init.splice_fn0_ctx(&self.underlying.location).into();
831        let comb = comb
832            .splice_fn2_borrow_mut_ctx(&self.underlying.location)
833            .into();
834
835        let out_ir = HydroNode::FoldKeyed {
836            init,
837            acc: comb,
838            input: Box::new(self.underlying.ir_node.into_inner()),
839            metadata: self.underlying.location.new_node_metadata::<(K, A)>(),
840        };
841
842        KeyedSingleton {
843            underlying: Stream::new(self.underlying.location, out_ir),
844        }
845    }
846
847    /// Like [`Stream::reduce`], aggregates the values in each group via the `comb` closure.
848    ///
849    /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
850    /// to depend on the order of elements in the stream.
851    ///
852    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold`].
853    ///
854    /// # Example
855    /// ```rust
856    /// # use hydro_lang::prelude::*;
857    /// # use futures::StreamExt;
858    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
859    /// let tick = process.tick();
860    /// let numbers = process
861    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
862    ///     .into_keyed();
863    /// let batch = numbers.batch(&tick, nondet!(/** test */));
864    /// batch.reduce(q!(|acc, x| *acc += x)).entries().all_ticks()
865    /// # }, |mut stream| async move {
866    /// // (1, 5), (2, 7)
867    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
868    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
869    /// # }));
870    /// ```
871    pub fn reduce<F: Fn(&mut V, V) + 'a>(
872        self,
873        comb: impl IntoQuotedMut<'a, F, L>,
874    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
875        let f = comb
876            .splice_fn2_borrow_mut_ctx(&self.underlying.location)
877            .into();
878
879        let out_ir = HydroNode::ReduceKeyed {
880            f,
881            input: Box::new(self.underlying.ir_node.into_inner()),
882            metadata: self.underlying.location.new_node_metadata::<(K, V)>(),
883        };
884
885        KeyedSingleton {
886            underlying: Stream::new(self.underlying.location, out_ir),
887        }
888    }
889
890    /// A special case of [`KeyedStream::reduce`] where tuples with keys less than the watermark are automatically deleted.
891    ///
892    /// Each group must have a [`TotalOrder`] guarantee, which means that the `comb` closure is allowed
893    /// to depend on the order of elements in the stream.
894    ///
895    /// # Example
896    /// ```rust
897    /// # use hydro_lang::prelude::*;
898    /// # use futures::StreamExt;
899    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
900    /// let tick = process.tick();
901    /// let watermark = tick.singleton(q!(1));
902    /// let numbers = process
903    ///     .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
904    ///     .into_keyed();
905    /// let batch = numbers.batch(&tick, nondet!(/** test */));
906    /// batch
907    ///     .reduce_watermark(watermark, q!(|acc, x| *acc += x))
908    ///     .entries()
909    ///     .all_ticks()
910    /// # }, |mut stream| async move {
911    /// // (2, 204)
912    /// # assert_eq!(stream.next().await.unwrap(), (2, 204));
913    /// # }));
914    /// ```
915    pub fn reduce_watermark<O, F>(
916        self,
917        other: impl Into<Optional<O, Tick<L::Root>, Bounded>>,
918        comb: impl IntoQuotedMut<'a, F, L>,
919    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
920    where
921        O: Clone,
922        F: Fn(&mut V, V) + 'a,
923    {
924        let other: Optional<O, Tick<L::Root>, Bounded> = other.into();
925        check_matching_location(&self.underlying.location.root(), other.location.outer());
926        let f = comb
927            .splice_fn2_borrow_mut_ctx(&self.underlying.location)
928            .into();
929
930        let out_ir = Stream::new(
931            self.underlying.location.clone(),
932            HydroNode::ReduceKeyedWatermark {
933                f,
934                input: Box::new(self.underlying.ir_node.into_inner()),
935                watermark: Box::new(other.ir_node.into_inner()),
936                metadata: self.underlying.location.new_node_metadata::<(K, V)>(),
937            },
938        );
939
940        KeyedSingleton { underlying: out_ir }
941    }
942}
943
944impl<'a, K, V, L, B: Boundedness, O: Ordering> KeyedStream<K, V, L, B, O, ExactlyOnce>
945where
946    K: Eq + Hash,
947    L: Location<'a>,
948{
949    /// Like [`Stream::fold_commutative`], aggregates the values in each group via the `comb` closure.
950    ///
951    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
952    ///
953    /// If the input and output value types are the same and do not require initialization then use
954    /// [`KeyedStream::reduce_commutative`].
955    ///
956    /// # Example
957    /// ```rust
958    /// # use hydro_lang::prelude::*;
959    /// # use futures::StreamExt;
960    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
961    /// let tick = process.tick();
962    /// let numbers = process
963    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
964    ///     .into_keyed();
965    /// let batch = numbers.batch(&tick, nondet!(/** test */));
966    /// batch
967    ///     .fold_commutative(q!(|| 0), q!(|acc, x| *acc += x))
968    ///     .entries()
969    ///     .all_ticks()
970    /// # }, |mut stream| async move {
971    /// // (1, 5), (2, 7)
972    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
973    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
974    /// # }));
975    /// ```
976    pub fn fold_commutative<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
977        self,
978        init: impl IntoQuotedMut<'a, I, L>,
979        comb: impl IntoQuotedMut<'a, F, L>,
980    ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
981        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
982            .fold(init, comb)
983    }
984
985    /// Like [`Stream::reduce_commutative`], aggregates the values in each group via the `comb` closure.
986    ///
987    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
988    ///
989    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_commutative`].
990    ///
991    /// # Example
992    /// ```rust
993    /// # use hydro_lang::prelude::*;
994    /// # use futures::StreamExt;
995    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
996    /// let tick = process.tick();
997    /// let numbers = process
998    ///     .source_iter(q!(vec![(1, 2), (2, 3), (1, 3), (2, 4)]))
999    ///     .into_keyed();
1000    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1001    /// batch
1002    ///     .reduce_commutative(q!(|acc, x| *acc += x))
1003    ///     .entries()
1004    ///     .all_ticks()
1005    /// # }, |mut stream| async move {
1006    /// // (1, 5), (2, 7)
1007    /// # assert_eq!(stream.next().await.unwrap(), (1, 5));
1008    /// # assert_eq!(stream.next().await.unwrap(), (2, 7));
1009    /// # }));
1010    /// ```
1011    pub fn reduce_commutative<F: Fn(&mut V, V) + 'a>(
1012        self,
1013        comb: impl IntoQuotedMut<'a, F, L>,
1014    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1015        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1016            .reduce(comb)
1017    }
1018
1019    /// A special case of [`KeyedStream::reduce_commutative`] where tuples with keys less than the watermark are automatically deleted.
1020    ///
1021    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed.
1022    ///
1023    /// # Example
1024    /// ```rust
1025    /// # use hydro_lang::prelude::*;
1026    /// # use futures::StreamExt;
1027    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1028    /// let tick = process.tick();
1029    /// let watermark = tick.singleton(q!(1));
1030    /// let numbers = process
1031    ///     .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
1032    ///     .into_keyed();
1033    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1034    /// batch
1035    ///     .reduce_watermark_commutative(watermark, q!(|acc, x| *acc += x))
1036    ///     .entries()
1037    ///     .all_ticks()
1038    /// # }, |mut stream| async move {
1039    /// // (2, 204)
1040    /// # assert_eq!(stream.next().await.unwrap(), (2, 204));
1041    /// # }));
1042    /// ```
1043    pub fn reduce_watermark_commutative<O2, F>(
1044        self,
1045        other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1046        comb: impl IntoQuotedMut<'a, F, L>,
1047    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1048    where
1049        O2: Clone,
1050        F: Fn(&mut V, V) + 'a,
1051    {
1052        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1053            .reduce_watermark(other, comb)
1054    }
1055}
1056
1057impl<'a, K, V, L, B: Boundedness, R: Retries> KeyedStream<K, V, L, B, TotalOrder, R>
1058where
1059    K: Eq + Hash,
1060    L: Location<'a>,
1061{
1062    /// Like [`Stream::fold_idempotent`], aggregates the values in each group via the `comb` closure.
1063    ///
1064    /// The `comb` closure must be **idempotent** as there may be non-deterministic duplicates.
1065    ///
1066    /// If the input and output value types are the same and do not require initialization then use
1067    /// [`KeyedStream::reduce_idempotent`].
1068    ///
1069    /// # Example
1070    /// ```rust
1071    /// # use hydro_lang::prelude::*;
1072    /// # use futures::StreamExt;
1073    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1074    /// let tick = process.tick();
1075    /// let numbers = process
1076    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1077    ///     .into_keyed();
1078    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1079    /// batch
1080    ///     .fold_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1081    ///     .entries()
1082    ///     .all_ticks()
1083    /// # }, |mut stream| async move {
1084    /// // (1, false), (2, true)
1085    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1086    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1087    /// # }));
1088    /// ```
1089    pub fn fold_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1090        self,
1091        init: impl IntoQuotedMut<'a, I, L>,
1092        comb: impl IntoQuotedMut<'a, F, L>,
1093    ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
1094        self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1095            .fold(init, comb)
1096    }
1097
1098    /// Like [`Stream::reduce_idempotent`], aggregates the values in each group via the `comb` closure.
1099    ///
1100    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1101    ///
1102    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_idempotent`].
1103    ///
1104    /// # Example
1105    /// ```rust
1106    /// # use hydro_lang::prelude::*;
1107    /// # use futures::StreamExt;
1108    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1109    /// let tick = process.tick();
1110    /// let numbers = process
1111    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1112    ///     .into_keyed();
1113    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1114    /// batch
1115    ///     .reduce_idempotent(q!(|acc, x| *acc |= x))
1116    ///     .entries()
1117    ///     .all_ticks()
1118    /// # }, |mut stream| async move {
1119    /// // (1, false), (2, true)
1120    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1121    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1122    /// # }));
1123    /// ```
1124    pub fn reduce_idempotent<F: Fn(&mut V, V) + 'a>(
1125        self,
1126        comb: impl IntoQuotedMut<'a, F, L>,
1127    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1128        self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1129            .reduce(comb)
1130    }
1131
1132    /// A special case of [`KeyedStream::reduce_idempotent`] where tuples with keys less than the watermark are automatically deleted.
1133    ///
1134    /// The `comb` closure must be **idempotent**, as there may be non-deterministic duplicates.
1135    ///
1136    /// # Example
1137    /// ```rust
1138    /// # use hydro_lang::prelude::*;
1139    /// # use futures::StreamExt;
1140    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1141    /// let tick = process.tick();
1142    /// let watermark = tick.singleton(q!(1));
1143    /// let numbers = process
1144    ///     .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
1145    ///     .into_keyed();
1146    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1147    /// batch
1148    ///     .reduce_watermark_idempotent(watermark, q!(|acc, x| *acc |= x))
1149    ///     .entries()
1150    ///     .all_ticks()
1151    /// # }, |mut stream| async move {
1152    /// // (2, true)
1153    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1154    /// # }));
1155    /// ```
1156    pub fn reduce_watermark_idempotent<O2, F>(
1157        self,
1158        other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1159        comb: impl IntoQuotedMut<'a, F, L>,
1160    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1161    where
1162        O2: Clone,
1163        F: Fn(&mut V, V) + 'a,
1164    {
1165        self.assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1166            .reduce_watermark(other, comb)
1167    }
1168}
1169
1170impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, L, B, O, R>
1171where
1172    K: Eq + Hash,
1173    L: Location<'a>,
1174{
1175    /// Like [`Stream::fold_commutative_idempotent`], aggregates the values in each group via the `comb` closure.
1176    ///
1177    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1178    /// as there may be non-deterministic duplicates.
1179    ///
1180    /// If the input and output value types are the same and do not require initialization then use
1181    /// [`KeyedStream::reduce_commutative_idempotent`].
1182    ///
1183    /// # Example
1184    /// ```rust
1185    /// # use hydro_lang::prelude::*;
1186    /// # use futures::StreamExt;
1187    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1188    /// let tick = process.tick();
1189    /// let numbers = process
1190    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1191    ///     .into_keyed();
1192    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1193    /// batch
1194    ///     .fold_commutative_idempotent(q!(|| false), q!(|acc, x| *acc |= x))
1195    ///     .entries()
1196    ///     .all_ticks()
1197    /// # }, |mut stream| async move {
1198    /// // (1, false), (2, true)
1199    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1200    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1201    /// # }));
1202    /// ```
1203    pub fn fold_commutative_idempotent<A, I: Fn() -> A + 'a, F: Fn(&mut A, V)>(
1204        self,
1205        init: impl IntoQuotedMut<'a, I, L>,
1206        comb: impl IntoQuotedMut<'a, F, L>,
1207    ) -> KeyedSingleton<K, A, L, B::WhenValueUnbounded> {
1208        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1209            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1210            .fold(init, comb)
1211    }
1212
1213    /// Like [`Stream::reduce_commutative_idempotent`], aggregates the values in each group via the `comb` closure.
1214    ///
1215    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1216    /// as there may be non-deterministic duplicates.
1217    ///
1218    /// If you need the accumulated value to have a different type than the input, use [`KeyedStream::fold_commutative_idempotent`].
1219    ///
1220    /// # Example
1221    /// ```rust
1222    /// # use hydro_lang::prelude::*;
1223    /// # use futures::StreamExt;
1224    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1225    /// let tick = process.tick();
1226    /// let numbers = process
1227    ///     .source_iter(q!(vec![(1, false), (2, true), (1, false), (2, false)]))
1228    ///     .into_keyed();
1229    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1230    /// batch
1231    ///     .reduce_commutative_idempotent(q!(|acc, x| *acc |= x))
1232    ///     .entries()
1233    ///     .all_ticks()
1234    /// # }, |mut stream| async move {
1235    /// // (1, false), (2, true)
1236    /// # assert_eq!(stream.next().await.unwrap(), (1, false));
1237    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1238    /// # }));
1239    /// ```
1240    pub fn reduce_commutative_idempotent<F: Fn(&mut V, V) + 'a>(
1241        self,
1242        comb: impl IntoQuotedMut<'a, F, L>,
1243    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded> {
1244        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1245            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1246            .reduce(comb)
1247    }
1248
    /// A special case of [`KeyedStream::reduce_commutative_idempotent`] where tuples with keys less than the watermark are automatically deleted.
1250    ///
1251    /// The `comb` closure must be **commutative**, as the order of input items is not guaranteed, and **idempotent**,
1252    /// as there may be non-deterministic duplicates.
1253    ///
1254    /// # Example
1255    /// ```rust
1256    /// # use hydro_lang::prelude::*;
1257    /// # use futures::StreamExt;
1258    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1259    /// let tick = process.tick();
1260    /// let watermark = tick.singleton(q!(1));
1261    /// let numbers = process
1262    ///     .source_iter(q!([(0, false), (1, false), (2, false), (2, true)]))
1263    ///     .into_keyed();
1264    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1265    /// batch
1266    ///     .reduce_watermark_commutative_idempotent(watermark, q!(|acc, x| *acc |= x))
1267    ///     .entries()
1268    ///     .all_ticks()
1269    /// # }, |mut stream| async move {
1270    /// // (2, true)
1271    /// # assert_eq!(stream.next().await.unwrap(), (2, true));
1272    /// # }));
1273    /// ```
1274    pub fn reduce_watermark_commutative_idempotent<O2, F>(
1275        self,
1276        other: impl Into<Optional<O2, Tick<L::Root>, Bounded>>,
1277        comb: impl IntoQuotedMut<'a, F, L>,
1278    ) -> KeyedSingleton<K, V, L, B::WhenValueUnbounded>
1279    where
1280        O2: Clone,
1281        F: Fn(&mut V, V) + 'a,
1282    {
1283        self.assume_ordering::<TotalOrder>(nondet!(/** the combinator function is commutative */))
1284            .assume_retries::<ExactlyOnce>(nondet!(/** the combinator function is idempotent */))
1285            .reduce_watermark(other, comb)
1286    }
1287
1288    /// Given a bounded stream of keys `K`, returns a new keyed stream containing only the groups
1289    /// whose keys are not in the bounded stream.
1290    ///
1291    /// # Example
1292    /// ```rust
1293    /// # use hydro_lang::prelude::*;
1294    /// # use futures::StreamExt;
1295    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1296    /// let tick = process.tick();
1297    /// let keyed_stream = process
1298    ///     .source_iter(q!(vec![ (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd') ]))
1299    ///     .batch(&tick, nondet!(/** test */))
1300    ///     .into_keyed();
1301    /// let keys_to_remove = process
1302    ///     .source_iter(q!(vec![1, 2]))
1303    ///     .batch(&tick, nondet!(/** test */));
1304    /// keyed_stream.filter_key_not_in(keys_to_remove).all_ticks()
1305    /// #   .entries()
1306    /// # }, |mut stream| async move {
1307    /// // { 3: ['c'], 4: ['d'] }
1308    /// # for w in vec![(3, 'c'), (4, 'd')] {
1309    /// #     assert_eq!(stream.next().await.unwrap(), w);
1310    /// # }
1311    /// # }));
1312    /// ```
1313    pub fn filter_key_not_in<O2: Ordering, R2: Retries>(
1314        self,
1315        other: Stream<K, L, Bounded, O2, R2>,
1316    ) -> Self {
1317        KeyedStream {
1318            underlying: self.entries().anti_join(other),
1319            _phantom_order: Default::default(),
1320        }
1321    }
1322}
1323
1324impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, L, B, O, R>
1325where
1326    L: Location<'a> + NoTick + NoAtomic,
1327{
1328    /// Shifts this keyed stream into an atomic context, which guarantees that any downstream logic
1329    /// will all be executed synchronously before any outputs are yielded (in [`KeyedStream::end_atomic`]).
1330    ///
1331    /// This is useful to enforce local consistency constraints, such as ensuring that a write is
1332    /// processed before an acknowledgement is emitted. Entering an atomic section requires a [`Tick`]
1333    /// argument that declares where the stream will be atomically processed. Batching a stream into
1334    /// the _same_ [`Tick`] will preserve the synchronous execution, while batching into a different
1335    /// [`Tick`] will introduce asynchrony.
1336    pub fn atomic(self, tick: &Tick<L>) -> KeyedStream<K, V, Atomic<L>, B, O, R> {
1337        KeyedStream {
1338            underlying: self.underlying.atomic(tick),
1339            _phantom_order: Default::default(),
1340        }
1341    }
1342
1343    /// Given a tick, returns a keyed stream corresponding to a batch of elements segmented by
1344    /// that tick. These batches are guaranteed to be contiguous across ticks and preserve
1345    /// the order of the input.
1346    ///
1347    /// # Non-Determinism
1348    /// The batch boundaries are non-deterministic and may change across executions.
1349    pub fn batch(
1350        self,
1351        tick: &Tick<L>,
1352        nondet: NonDet,
1353    ) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
1354        self.atomic(tick).batch(nondet)
1355    }
1356}
1357
1358impl<'a, K, V, L, B: Boundedness, O: Ordering, R: Retries> KeyedStream<K, V, Atomic<L>, B, O, R>
1359where
1360    L: Location<'a> + NoTick + NoAtomic,
1361{
1362    /// Returns a keyed stream corresponding to the latest batch of elements being atomically
1363    /// processed. These batches are guaranteed to be contiguous across ticks and preserve
1364    /// the order of the input. The output keyed stream will execute in the [`Tick`] that was
1365    /// used to create the atomic section.
1366    ///
1367    /// # Non-Determinism
1368    /// The batch boundaries are non-deterministic and may change across executions.
1369    pub fn batch(self, nondet: NonDet) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
1370        KeyedStream {
1371            underlying: self.underlying.batch(nondet),
1372            _phantom_order: Default::default(),
1373        }
1374    }
1375
1376    /// Yields the elements of this keyed stream back into a top-level, asynchronous execution context.
1377    /// See [`KeyedStream::atomic`] for more details.
1378    pub fn end_atomic(self) -> KeyedStream<K, V, L, B, O, R> {
1379        KeyedStream {
1380            underlying: self.underlying.end_atomic(),
1381            _phantom_order: Default::default(),
1382        }
1383    }
1384}
1385
1386impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, L, Bounded, O, R>
1387where
1388    L: Location<'a>,
1389{
1390    /// Produces a new keyed stream that combines the groups of the inputs by first emitting the
1391    /// elements of the `self` stream, and then emits the elements of the `other` stream (if a key
1392    /// is only present in one of the inputs, its values are passed through as-is). The output has
1393    /// a [`TotalOrder`] guarantee if and only if both inputs have a [`TotalOrder`] guarantee.
1394    ///
1395    /// Currently, both input streams must be [`Bounded`]. This operator will block
1396    /// on the first stream until all its elements are available. In a future version,
1397    /// we will relax the requirement on the `other` stream.
1398    ///
1399    /// # Example
1400    /// ```rust
1401    /// # use hydro_lang::prelude::*;
1402    /// # use futures::StreamExt;
1403    /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1404    /// let tick = process.tick();
1405    /// let numbers = process.source_iter(q!(vec![(0, 1), (1, 3)])).into_keyed();
1406    /// let batch = numbers.batch(&tick, nondet!(/** test */));
1407    /// batch.clone().map(q!(|x| x + 1)).chain(batch).all_ticks()
1408    /// # .entries()
1409    /// # }, |mut stream| async move {
1410    /// // { 0: [2, 1], 1: [4, 3] }
1411    /// # for w in vec![(0, 2), (1, 4), (0, 1), (1, 3)] {
1412    /// #     assert_eq!(stream.next().await.unwrap(), w);
1413    /// # }
1414    /// # }));
1415    /// ```
1416    pub fn chain<O2: Ordering>(
1417        self,
1418        other: KeyedStream<K, V, L, Bounded, O2, R>,
1419    ) -> KeyedStream<K, V, L, Bounded, <O as MinOrder<O2>>::Min, R>
1420    where
1421        O: MinOrder<O2>,
1422    {
1423        KeyedStream {
1424            underlying: self.underlying.chain(other.underlying),
1425            _phantom_order: Default::default(),
1426        }
1427    }
1428}
1429
1430impl<'a, K, V, L, O: Ordering, R: Retries> KeyedStream<K, V, Tick<L>, Bounded, O, R>
1431where
1432    L: Location<'a>,
1433{
1434    /// Asynchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
1435    /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
1436    /// each key.
1437    pub fn all_ticks(self) -> KeyedStream<K, V, L, Unbounded, O, R> {
1438        KeyedStream {
1439            underlying: self.underlying.all_ticks(),
1440            _phantom_order: Default::default(),
1441        }
1442    }
1443
1444    /// Synchronously yields this batch of keyed elements outside the tick as an unbounded keyed stream,
1445    /// which will stream all the elements across _all_ tick iterations by concatenating the batches for
1446    /// each key.
1447    ///
1448    /// Unlike [`KeyedStream::all_ticks`], this preserves synchronous execution, as the output stream
1449    /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1450    /// stream's [`Tick`] context.
1451    pub fn all_ticks_atomic(self) -> KeyedStream<K, V, L, Unbounded, O, R> {
1452        KeyedStream {
1453            underlying: self.underlying.all_ticks(),
1454            _phantom_order: Default::default(),
1455        }
1456    }
1457
1458    #[expect(missing_docs, reason = "TODO")]
1459    pub fn defer_tick(self) -> KeyedStream<K, V, Tick<L>, Bounded, O, R> {
1460        KeyedStream {
1461            underlying: self.underlying.defer_tick(),
1462            _phantom_order: Default::default(),
1463        }
1464    }
1465}
1466
1467#[cfg(test)]
1468mod tests {
1469    use futures::{SinkExt, StreamExt};
1470    use hydro_deploy::Deployment;
1471    use stageleft::q;
1472
1473    use crate::compile::builder::FlowBuilder;
1474    use crate::location::Location;
1475    use crate::nondet::nondet;
1476
1477    #[tokio::test]
1478    async fn reduce_watermark_filter() {
1479        let mut deployment = Deployment::new();
1480
1481        let flow = FlowBuilder::new();
1482        let node = flow.process::<()>();
1483        let external = flow.external::<()>();
1484
1485        let node_tick = node.tick();
1486        let watermark = node_tick.singleton(q!(1));
1487
1488        let sum = node
1489            .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
1490            .into_keyed()
1491            .reduce_watermark(
1492                watermark,
1493                q!(|acc, v| {
1494                    *acc += v;
1495                }),
1496            )
1497            .snapshot(&node_tick, nondet!(/** test */))
1498            .entries()
1499            .all_ticks()
1500            .send_bincode_external(&external);
1501
1502        let nodes = flow
1503            .with_process(&node, deployment.Localhost())
1504            .with_external(&external, deployment.Localhost())
1505            .deploy(&mut deployment);
1506
1507        deployment.deploy().await.unwrap();
1508
1509        let mut out = nodes.connect_source_bincode(sum).await;
1510
1511        deployment.start().await.unwrap();
1512
1513        assert_eq!(out.next().await.unwrap(), (2, 204));
1514    }
1515
1516    #[tokio::test]
1517    async fn reduce_watermark_garbage_collect() {
1518        let mut deployment = Deployment::new();
1519
1520        let flow = FlowBuilder::new();
1521        let node = flow.process::<()>();
1522        let external = flow.external::<()>();
1523        let (tick_send, tick_trigger) = node.source_external_bincode(&external);
1524
1525        let node_tick = node.tick();
1526        let (watermark_complete_cycle, watermark) =
1527            node_tick.cycle_with_initial(node_tick.singleton(q!(1)));
1528        let next_watermark = watermark.clone().map(q!(|v| v + 1));
1529        watermark_complete_cycle.complete_next_tick(next_watermark);
1530
1531        let tick_triggered_input = node
1532            .source_iter(q!([(3, 103)]))
1533            .batch(&node_tick, nondet!(/** test */))
1534            .filter_if_some(
1535                tick_trigger
1536                    .clone()
1537                    .batch(&node_tick, nondet!(/** test */))
1538                    .first(),
1539            )
1540            .all_ticks();
1541
1542        let sum = node
1543            .source_iter(q!([(0, 100), (1, 101), (2, 102), (2, 102)]))
1544            .interleave(tick_triggered_input)
1545            .into_keyed()
1546            .reduce_watermark_commutative(
1547                watermark,
1548                q!(|acc, v| {
1549                    *acc += v;
1550                }),
1551            )
1552            .snapshot(&node_tick, nondet!(/** test */))
1553            .entries()
1554            .all_ticks()
1555            .send_bincode_external(&external);
1556
1557        let nodes = flow
1558            .with_default_optimize()
1559            .with_process(&node, deployment.Localhost())
1560            .with_external(&external, deployment.Localhost())
1561            .deploy(&mut deployment);
1562
1563        deployment.deploy().await.unwrap();
1564
1565        let mut tick_send = nodes.connect_sink_bincode(tick_send).await;
1566        let mut out_recv = nodes.connect_source_bincode(sum).await;
1567
1568        deployment.start().await.unwrap();
1569
1570        assert_eq!(out_recv.next().await.unwrap(), (2, 204));
1571
1572        tick_send.send(()).await.unwrap();
1573
1574        assert_eq!(out_recv.next().await.unwrap(), (3, 103));
1575    }
1576}