hydro_lang/live_collections/singleton.rs
1//! Definitions for the [`Singleton`] live collection.
2
3use std::cell::RefCell;
4use std::marker::PhantomData;
5use std::ops::{Deref, Not};
6use std::rc::Rc;
7
8use sealed::sealed;
9use stageleft::{IntoQuotedMut, QuotedWithContext, QuotedWithContextWithProps, q};
10
11use super::boundedness::{Bounded, Boundedness, IsBounded, Unbounded};
12use super::optional::Optional;
13use super::sliced::sliced;
14use super::stream::{AtLeastOnce, ExactlyOnce, NoOrder, Stream, TotalOrder};
15use crate::compile::builder::{CycleId, FlowState};
16use crate::compile::ir::{
17 CollectionKind, HydroIrOpMetadata, HydroNode, HydroRoot, SharedNode, SingletonBoundKind,
18};
19#[cfg(stageleft_runtime)]
20use crate::forward_handle::{CycleCollection, CycleCollectionWithInitial, ReceiverComplete};
21use crate::forward_handle::{ForwardRef, TickCycle};
22#[cfg(stageleft_runtime)]
23use crate::location::dynamic::{DynLocation, LocationId};
24use crate::location::tick::{Atomic, NoAtomic};
25use crate::location::{Location, NoTick, Tick, check_matching_location};
26use crate::nondet::{NonDet, nondet};
27use crate::properties::{
28 ApplyMonotoneStream, ApplyOrderPreservingSingleton, MapFuncAlgebra, Proved,
29};
30
/// A marker trait indicating which components of a [`Singleton`] may change.
///
/// In addition to [`Bounded`] (immutable) and [`Unbounded`] (arbitrarily mutable), this also
/// includes an additional variant [`Monotonic`], which means that the value will only grow.
pub trait SingletonBound {
    /// The [`Boundedness`] that this [`Singleton`] would be erased to.
    ///
    /// This is the bound of the result of [`Singleton::ignore_monotonic`].
    type UnderlyingBound: Boundedness + ApplyMonotoneStream<Proved, Self::StreamToMonotone>;

    /// The [`SingletonBound`] of a [`Singleton`] that is produced from a [`Stream`] whose
    /// boundedness is [`Self`]. It must erase to the same [`Self::UnderlyingBound`].
    type StreamToMonotone: SingletonBound<UnderlyingBound = Self::UnderlyingBound>;

    /// Returns the [`SingletonBoundKind`] corresponding to this type.
    fn bound_kind() -> SingletonBoundKind;
}
45
46impl SingletonBound for Unbounded {
47 type UnderlyingBound = Unbounded;
48
49 type StreamToMonotone = Monotonic;
50
51 fn bound_kind() -> SingletonBoundKind {
52 SingletonBoundKind::Unbounded
53 }
54}
55
56impl SingletonBound for Bounded {
57 type UnderlyingBound = Bounded;
58
59 type StreamToMonotone = Bounded;
60
61 fn bound_kind() -> SingletonBoundKind {
62 SingletonBoundKind::Bounded
63 }
64}
65
/// Marks that the [`Singleton`] is monotonic, which means that its value will only grow over time.
///
/// As a [`SingletonBound`], this erases to [`Unbounded`] (see its `UnderlyingBound`), so
/// [`Singleton::ignore_monotonic`] on a monotonic singleton yields an [`Unbounded`] one.
pub struct Monotonic;
68
69impl SingletonBound for Monotonic {
70 type UnderlyingBound = Unbounded;
71
72 type StreamToMonotone = Monotonic;
73
74 fn bound_kind() -> SingletonBoundKind {
75 SingletonBoundKind::Monotonic
76 }
77}
78
#[sealed]
#[diagnostic::on_unimplemented(
    message = "The input singleton must be monotonic (`Monotonic`) or bounded (`Bounded`), but has bound `{Self}`. Strengthen the monotonicity upstream or consider a different API.",
    label = "required here",
    note = "To intentionally process a non-deterministic snapshot or batch, you may want to use a `sliced!` region. This introduces non-determinism so avoid unless necessary."
)]
/// Marker trait for [`SingletonBound`]s that are at least monotonic.
///
/// It is implemented for [`Monotonic`] and for every bound that satisfies [`IsBounded`].
/// APIs such as [`Singleton::threshold_greater_or_equal`] require it of their input.
pub trait IsMonotonic: SingletonBound {}
87
// `Monotonic` satisfies `IsMonotonic` directly.
#[sealed]
#[diagnostic::do_not_recommend]
impl IsMonotonic for Monotonic {}
91
// Every bound that satisfies `IsBounded` also satisfies `IsMonotonic`.
#[sealed]
#[diagnostic::do_not_recommend]
impl<B: IsBounded> IsMonotonic for B {}
95
/// A single Rust value that can asynchronously change over time.
///
/// If the singleton is [`Bounded`], the value is frozen and will not change. But if it is
/// [`Unbounded`], the value will asynchronously change over time.
///
/// Singletons are often used to capture state in a Hydro program, such as an event counter which is
/// a single number that will asynchronously change as events are processed. Singletons also appear
/// when dealing with bounded collections, to perform regular Rust computations on concrete values,
/// such as getting the length of a batch of requests.
///
/// Type Parameters:
/// - `Type`: the type of the value in this singleton
/// - `Loc`: the [`Location`] where the singleton is materialized
/// - `Bound`: tracks whether the value is [`Bounded`] (fixed), [`Monotonic`] (only grows over
///   time), or [`Unbounded`] (changing asynchronously)
pub struct Singleton<Type, Loc, Bound: SingletonBound> {
    /// The location where this singleton is materialized.
    pub(crate) location: Loc,
    /// The IR node that produces this singleton. Operators that consume the singleton take the
    /// node out and leave [`HydroNode::Placeholder`] behind.
    pub(crate) ir_node: RefCell<HydroNode>,
    /// Flow state handle cloned from the location at construction; the `Drop` impl uses it to
    /// register a node that was never consumed.
    pub(crate) flow_state: FlowState,

    _phantom: PhantomData<(Type, Loc, Bound)>,
}
117
118impl<T, L, B: SingletonBound> Drop for Singleton<T, L, B> {
119 fn drop(&mut self) {
120 let ir_node = self.ir_node.replace(HydroNode::Placeholder);
121 if !matches!(ir_node, HydroNode::Placeholder) && !ir_node.is_shared_with_others() {
122 self.flow_state.borrow_mut().try_push_root(HydroRoot::Null {
123 input: Box::new(ir_node),
124 op_metadata: HydroIrOpMetadata::new(),
125 });
126 }
127 }
128}
129
130impl<'a, T, L> From<Singleton<T, L, Bounded>> for Singleton<T, L, Unbounded>
131where
132 T: Clone,
133 L: Location<'a> + NoTick,
134{
135 fn from(value: Singleton<T, L, Bounded>) -> Self {
136 let tick = value.location().tick();
137 value.clone_into_tick(&tick).latest()
138 }
139}
140
141impl<'a, T, L> CycleCollectionWithInitial<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
142where
143 L: Location<'a>,
144{
145 type Location = Tick<L>;
146
147 fn create_source_with_initial(cycle_id: CycleId, initial: Self, location: Tick<L>) -> Self {
148 let from_previous_tick: Optional<T, Tick<L>, Bounded> = Optional::new(
149 location.clone(),
150 HydroNode::DeferTick {
151 input: Box::new(HydroNode::CycleSource {
152 cycle_id,
153 metadata: location.new_node_metadata(Self::collection_kind()),
154 }),
155 metadata: location
156 .new_node_metadata(Optional::<T, Tick<L>, Bounded>::collection_kind()),
157 },
158 );
159
160 from_previous_tick.unwrap_or(initial)
161 }
162}
163
164impl<'a, T, L> ReceiverComplete<'a, TickCycle> for Singleton<T, Tick<L>, Bounded>
165where
166 L: Location<'a>,
167{
168 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
169 assert_eq!(
170 Location::id(&self.location),
171 expected_location,
172 "locations do not match"
173 );
174 self.location
175 .flow_state()
176 .borrow_mut()
177 .push_root(HydroRoot::CycleSink {
178 cycle_id,
179 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
180 op_metadata: HydroIrOpMetadata::new(),
181 });
182 }
183}
184
185impl<'a, T, L> CycleCollection<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
186where
187 L: Location<'a>,
188{
189 type Location = Tick<L>;
190
191 fn create_source(cycle_id: CycleId, location: Tick<L>) -> Self {
192 Singleton::new(
193 location.clone(),
194 HydroNode::CycleSource {
195 cycle_id,
196 metadata: location.new_node_metadata(Self::collection_kind()),
197 },
198 )
199 }
200}
201
202impl<'a, T, L> ReceiverComplete<'a, ForwardRef> for Singleton<T, Tick<L>, Bounded>
203where
204 L: Location<'a>,
205{
206 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
207 assert_eq!(
208 Location::id(&self.location),
209 expected_location,
210 "locations do not match"
211 );
212 self.location
213 .flow_state()
214 .borrow_mut()
215 .push_root(HydroRoot::CycleSink {
216 cycle_id,
217 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
218 op_metadata: HydroIrOpMetadata::new(),
219 });
220 }
221}
222
223impl<'a, T, L, B: SingletonBound> CycleCollection<'a, ForwardRef> for Singleton<T, L, B>
224where
225 L: Location<'a> + NoTick,
226{
227 type Location = L;
228
229 fn create_source(cycle_id: CycleId, location: L) -> Self {
230 Singleton::new(
231 location.clone(),
232 HydroNode::CycleSource {
233 cycle_id,
234 metadata: location.new_node_metadata(Self::collection_kind()),
235 },
236 )
237 }
238}
239
240impl<'a, T, L, B: SingletonBound> ReceiverComplete<'a, ForwardRef> for Singleton<T, L, B>
241where
242 L: Location<'a> + NoTick,
243{
244 fn complete(self, cycle_id: CycleId, expected_location: LocationId) {
245 assert_eq!(
246 Location::id(&self.location),
247 expected_location,
248 "locations do not match"
249 );
250 self.location
251 .flow_state()
252 .borrow_mut()
253 .push_root(HydroRoot::CycleSink {
254 cycle_id,
255 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
256 op_metadata: HydroIrOpMetadata::new(),
257 });
258 }
259}
260
261impl<'a, T, L, B: SingletonBound> Clone for Singleton<T, L, B>
262where
263 T: Clone,
264 L: Location<'a>,
265{
266 fn clone(&self) -> Self {
267 if !matches!(self.ir_node.borrow().deref(), HydroNode::Tee { .. }) {
268 let orig_ir_node = self.ir_node.replace(HydroNode::Placeholder);
269 *self.ir_node.borrow_mut() = HydroNode::Tee {
270 inner: SharedNode(Rc::new(RefCell::new(orig_ir_node))),
271 metadata: self.location.new_node_metadata(Self::collection_kind()),
272 };
273 }
274
275 if let HydroNode::Tee { inner, metadata } = self.ir_node.borrow().deref() {
276 Singleton {
277 location: self.location.clone(),
278 flow_state: self.flow_state.clone(),
279 ir_node: HydroNode::Tee {
280 inner: SharedNode(inner.0.clone()),
281 metadata: metadata.clone(),
282 }
283 .into(),
284 _phantom: PhantomData,
285 }
286 } else {
287 unreachable!()
288 }
289 }
290}
291
/// Zips a singleton with an optional inside a tick by first converting the singleton into an
/// [`Optional`] and then delegating to the optional-level `zip_inside_tick`.
#[cfg(stageleft_runtime)]
fn zip_inside_tick<'a, T, L: Location<'a>, B: SingletonBound, O>(
    me: Singleton<T, Tick<L>, B>,
    other: Optional<O, Tick<L>, B::UnderlyingBound>,
) -> Optional<(T, O), Tick<L>, B::UnderlyingBound> {
    super::optional::zip_inside_tick(
        Into::<Optional<T, Tick<L>, B::UnderlyingBound>>::into(me),
        other,
    )
}
300
301impl<'a, T, L, B: SingletonBound> Singleton<T, L, B>
302where
303 L: Location<'a>,
304{
305 pub(crate) fn new(location: L, ir_node: HydroNode) -> Self {
306 debug_assert_eq!(ir_node.metadata().location_id, Location::id(&location));
307 debug_assert_eq!(ir_node.metadata().collection_kind, Self::collection_kind());
308 let flow_state = location.flow_state().clone();
309 Singleton {
310 location,
311 flow_state,
312 ir_node: RefCell::new(ir_node),
313 _phantom: PhantomData,
314 }
315 }
316
317 pub(crate) fn collection_kind() -> CollectionKind {
318 CollectionKind::Singleton {
319 bound: B::bound_kind(),
320 element_type: stageleft::quote_type::<T>().into(),
321 }
322 }
323
    /// Returns the [`Location`] where this singleton is being materialized.
    ///
    /// The location is returned by reference; the singleton is not consumed.
    pub fn location(&self) -> &L {
        &self.location
    }
328
329 /// Drops the monotonicity property of the [`Singleton`].
330 pub fn ignore_monotonic(self) -> Singleton<T, L, B::UnderlyingBound> {
331 if B::bound_kind() == B::UnderlyingBound::bound_kind() {
332 Singleton::new(
333 self.location.clone(),
334 self.ir_node.replace(HydroNode::Placeholder),
335 )
336 } else {
337 Singleton::new(
338 self.location.clone(),
339 HydroNode::Cast {
340 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
341 metadata:
342 self.location.new_node_metadata(
343 Singleton::<T, L, B::UnderlyingBound>::collection_kind(),
344 ),
345 },
346 )
347 }
348 }
349
350 /// Transforms the singleton value by applying a function `f` to it,
351 /// continuously as the input is updated.
352 ///
353 /// # Example
354 /// ```rust
355 /// # #[cfg(feature = "deploy")] {
356 /// # use hydro_lang::prelude::*;
357 /// # use futures::StreamExt;
358 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
359 /// let tick = process.tick();
360 /// let singleton = tick.singleton(q!(5));
361 /// singleton.map(q!(|v| v * 2)).all_ticks()
362 /// # }, |mut stream| async move {
363 /// // 10
364 /// # assert_eq!(stream.next().await.unwrap(), 10);
365 /// # }));
366 /// # }
367 /// ```
368 pub fn map<U, F, OP, B2: SingletonBound>(
369 self,
370 f: impl IntoQuotedMut<'a, F, L, MapFuncAlgebra<OP>>,
371 ) -> Singleton<U, L, B2>
372 where
373 F: Fn(T) -> U + 'a,
374 B: ApplyOrderPreservingSingleton<OP, B2>,
375 {
376 let (f, proof) = f.splice_fn1_ctx_props(&self.location);
377 proof.register_proof(&f);
378 let f = f.into();
379 Singleton::new(
380 self.location.clone(),
381 HydroNode::Map {
382 f,
383 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
384 metadata: self
385 .location
386 .new_node_metadata(Singleton::<U, L, B2>::collection_kind()),
387 },
388 )
389 }
390
391 /// Transforms the singleton value by applying a function `f` to it and then flattening
392 /// the result into a stream, preserving the order of elements.
393 ///
394 /// The function `f` is applied to the singleton value to produce an iterator, and all items
395 /// from that iterator are emitted in the output stream in deterministic order.
396 ///
397 /// The implementation of [`Iterator`] for the output type `I` must produce items in a
398 /// **deterministic** order. For example, `I` could be a `Vec`, but not a `HashSet`.
399 /// If the order is not deterministic, use [`Singleton::flat_map_unordered`] instead.
400 ///
401 /// # Example
402 /// ```rust
403 /// # #[cfg(feature = "deploy")] {
404 /// # use hydro_lang::prelude::*;
405 /// # use futures::StreamExt;
406 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
407 /// let tick = process.tick();
408 /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
409 /// singleton.flat_map_ordered(q!(|v| v)).all_ticks()
410 /// # }, |mut stream| async move {
411 /// // 1, 2, 3
412 /// # for w in vec![1, 2, 3] {
413 /// # assert_eq!(stream.next().await.unwrap(), w);
414 /// # }
415 /// # }));
416 /// # }
417 /// ```
418 pub fn flat_map_ordered<U, I, F>(
419 self,
420 f: impl IntoQuotedMut<'a, F, L>,
421 ) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
422 where
423 B: IsBounded,
424 I: IntoIterator<Item = U>,
425 F: Fn(T) -> I + 'a,
426 {
427 self.into_stream().flat_map_ordered(f)
428 }
429
430 /// Like [`Singleton::flat_map_ordered`], but allows the implementation of [`Iterator`]
431 /// for the output type `I` to produce items in any order.
432 ///
433 /// The function `f` is applied to the singleton value to produce an iterator, and all items
434 /// from that iterator are emitted in the output stream in non-deterministic order.
435 ///
436 /// # Example
437 /// ```rust
438 /// # #[cfg(feature = "deploy")] {
439 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
440 /// # use futures::StreamExt;
441 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
442 /// let tick = process.tick();
443 /// let singleton = tick.singleton(q!(
444 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
445 /// ));
446 /// singleton.flat_map_unordered(q!(|v| v)).all_ticks()
447 /// # }, |mut stream| async move {
448 /// // 1, 2, 3, but in no particular order
449 /// # let mut results = Vec::new();
450 /// # for _ in 0..3 {
451 /// # results.push(stream.next().await.unwrap());
452 /// # }
453 /// # results.sort();
454 /// # assert_eq!(results, vec![1, 2, 3]);
455 /// # }));
456 /// # }
457 /// ```
458 pub fn flat_map_unordered<U, I, F>(
459 self,
460 f: impl IntoQuotedMut<'a, F, L>,
461 ) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
462 where
463 B: IsBounded,
464 I: IntoIterator<Item = U>,
465 F: Fn(T) -> I + 'a,
466 {
467 self.into_stream().flat_map_unordered(f)
468 }
469
470 /// Flattens the singleton value into a stream, preserving the order of elements.
471 ///
472 /// The singleton value must implement [`IntoIterator`], and all items from that iterator
473 /// are emitted in the output stream in deterministic order.
474 ///
475 /// The implementation of [`Iterator`] for the element type `T` must produce items in a
476 /// **deterministic** order. For example, `T` could be a `Vec`, but not a `HashSet`.
477 /// If the order is not deterministic, use [`Singleton::flatten_unordered`] instead.
478 ///
479 /// # Example
480 /// ```rust
481 /// # #[cfg(feature = "deploy")] {
482 /// # use hydro_lang::prelude::*;
483 /// # use futures::StreamExt;
484 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
485 /// let tick = process.tick();
486 /// let singleton = tick.singleton(q!(vec![1, 2, 3]));
487 /// singleton.flatten_ordered().all_ticks()
488 /// # }, |mut stream| async move {
489 /// // 1, 2, 3
490 /// # for w in vec![1, 2, 3] {
491 /// # assert_eq!(stream.next().await.unwrap(), w);
492 /// # }
493 /// # }));
494 /// # }
495 /// ```
    pub fn flatten_ordered<U>(self) -> Stream<U, L, Bounded, TotalOrder, ExactlyOnce>
    where
        B: IsBounded,
        T: IntoIterator<Item = U>,
    {
        // Flattening is `flat_map_ordered` with the identity function: the singleton's value
        // is itself the iterator.
        self.flat_map_ordered(q!(|x| x))
    }
503
504 /// Like [`Singleton::flatten_ordered`], but allows the implementation of [`Iterator`]
505 /// for the element type `T` to produce items in any order.
506 ///
507 /// The singleton value must implement [`IntoIterator`], and all items from that iterator
508 /// are emitted in the output stream in non-deterministic order.
509 ///
510 /// # Example
511 /// ```rust
512 /// # #[cfg(feature = "deploy")] {
513 /// # use hydro_lang::{prelude::*, live_collections::stream::{NoOrder, ExactlyOnce}};
514 /// # use futures::StreamExt;
515 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test::<_, _, _, NoOrder, ExactlyOnce>(|process| {
516 /// let tick = process.tick();
517 /// let singleton = tick.singleton(q!(
518 /// std::collections::HashSet::<i32>::from_iter(vec![1, 2, 3])
519 /// ));
520 /// singleton.flatten_unordered().all_ticks()
521 /// # }, |mut stream| async move {
522 /// // 1, 2, 3, but in no particular order
523 /// # let mut results = Vec::new();
524 /// # for _ in 0..3 {
525 /// # results.push(stream.next().await.unwrap());
526 /// # }
527 /// # results.sort();
528 /// # assert_eq!(results, vec![1, 2, 3]);
529 /// # }));
530 /// # }
531 /// ```
    pub fn flatten_unordered<U>(self) -> Stream<U, L, Bounded, NoOrder, ExactlyOnce>
    where
        B: IsBounded,
        T: IntoIterator<Item = U>,
    {
        // Flattening is `flat_map_unordered` with the identity function: the singleton's value
        // is itself the iterator.
        self.flat_map_unordered(q!(|x| x))
    }
539
540 /// Creates an optional containing the singleton value if it satisfies a predicate `f`.
541 ///
542 /// If the predicate returns `true`, the output optional contains the same value.
543 /// If the predicate returns `false`, the output optional is empty.
544 ///
545 /// The closure `f` receives a reference `&T` rather than an owned value `T` because filtering does
546 /// not modify or take ownership of the value. If you need to modify the value while filtering
547 /// use [`Singleton::filter_map`] instead.
548 ///
549 /// # Example
550 /// ```rust
551 /// # #[cfg(feature = "deploy")] {
552 /// # use hydro_lang::prelude::*;
553 /// # use futures::StreamExt;
554 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
555 /// let tick = process.tick();
556 /// let singleton = tick.singleton(q!(5));
557 /// singleton.filter(q!(|&x| x > 3)).all_ticks()
558 /// # }, |mut stream| async move {
559 /// // 5
560 /// # assert_eq!(stream.next().await.unwrap(), 5);
561 /// # }));
562 /// # }
563 /// ```
564 pub fn filter<F>(self, f: impl IntoQuotedMut<'a, F, L>) -> Optional<T, L, B::UnderlyingBound>
565 where
566 F: Fn(&T) -> bool + 'a,
567 {
568 let f = f.splice_fn1_borrow_ctx(&self.location).into();
569 Optional::new(
570 self.location.clone(),
571 HydroNode::Filter {
572 f,
573 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
574 metadata: self
575 .location
576 .new_node_metadata(Optional::<T, L, B::UnderlyingBound>::collection_kind()),
577 },
578 )
579 }
580
581 /// An operator that both filters and maps. It yields the value only if the supplied
582 /// closure `f` returns `Some(value)`.
583 ///
584 /// If the closure returns `Some(new_value)`, the output optional contains `new_value`.
585 /// If the closure returns `None`, the output optional is empty.
586 ///
587 /// # Example
588 /// ```rust
589 /// # #[cfg(feature = "deploy")] {
590 /// # use hydro_lang::prelude::*;
591 /// # use futures::StreamExt;
592 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
593 /// let tick = process.tick();
594 /// let singleton = tick.singleton(q!("42"));
595 /// singleton
596 /// .filter_map(q!(|s| s.parse::<i32>().ok()))
597 /// .all_ticks()
598 /// # }, |mut stream| async move {
599 /// // 42
600 /// # assert_eq!(stream.next().await.unwrap(), 42);
601 /// # }));
602 /// # }
603 /// ```
604 pub fn filter_map<U, F>(
605 self,
606 f: impl IntoQuotedMut<'a, F, L>,
607 ) -> Optional<U, L, B::UnderlyingBound>
608 where
609 F: Fn(T) -> Option<U> + 'a,
610 {
611 let f = f.splice_fn1_ctx(&self.location).into();
612 Optional::new(
613 self.location.clone(),
614 HydroNode::FilterMap {
615 f,
616 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
617 metadata: self
618 .location
619 .new_node_metadata(Optional::<U, L, B::UnderlyingBound>::collection_kind()),
620 },
621 )
622 }
623
624 /// Combines this singleton with another [`Singleton`] or [`Optional`] by tupling their values.
625 ///
626 /// If the other value is a [`Singleton`], the output will be a [`Singleton`], but if it is an
627 /// [`Optional`], the output will be an [`Optional`] that is non-null only if the argument is
628 /// non-null. This is useful for combining several pieces of state together.
629 ///
630 /// # Example
631 /// ```rust
632 /// # #[cfg(feature = "deploy")] {
633 /// # use hydro_lang::prelude::*;
634 /// # use futures::StreamExt;
635 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
636 /// let tick = process.tick();
637 /// let numbers = process
638 /// .source_iter(q!(vec![123, 456]))
639 /// .batch(&tick, nondet!(/** test */));
640 /// let count = numbers.clone().count(); // Singleton
641 /// let max = numbers.max(); // Optional
642 /// count.zip(max).all_ticks()
643 /// # }, |mut stream| async move {
644 /// // [(2, 456)]
645 /// # for w in vec![(2, 456)] {
646 /// # assert_eq!(stream.next().await.unwrap(), w);
647 /// # }
648 /// # }));
649 /// # }
650 /// ```
    pub fn zip<O>(self, other: O) -> <Self as ZipResult<'a, O>>::Out
    where
        Self: ZipResult<'a, O, Location = L>,
        B: IsBounded,
    {
        // Compare this singleton's location with the location of the other input.
        check_matching_location(&self.location, &Self::other_location(&other));

        // Two strategies: for a top-level location that can provide a tick, the zip is performed
        // inside that tick; otherwise a `CrossSingleton` IR node is emitted directly.
        if L::is_top_level()
            && let Some(tick) = self.location.try_tick()
        {
            let other_location = <Self as ZipResult<'a, O>>::other_location(&other);
            // Both sides are snapshotted into the tick and zipped there; `latest()` brings the
            // result back out of the tick. The other input's IR node is wrapped in a `Cast` so
            // it can be handled as an `Optional`.
            // NOTE(review): the cast metadata is built from `Optional<_, Tick<L>, Bounded>`
            // while the value is typed `Optional<_, L, B>`; presumably these yield the same
            // collection kind because `B: IsBounded` — confirm.
            let out = zip_inside_tick(
                self.snapshot(&tick, nondet!(/** eventually stabilizes */)),
                Optional::<<Self as ZipResult<'a, O>>::OtherType, L, B>::new(
                    other_location.clone(),
                    HydroNode::Cast {
                        inner: Box::new(Self::other_ir_node(other)),
                        metadata: other_location.new_node_metadata(Optional::<
                            <Self as ZipResult<'a, O>>::OtherType,
                            Tick<L>,
                            Bounded,
                        >::collection_kind(
                        )),
                    },
                )
                .snapshot(&tick, nondet!(/** eventually stabilizes */)),
            )
            .latest();

            // Re-wrap the resulting IR node in the output type chosen by `ZipResult`.
            Self::make(
                out.location.clone(),
                out.ir_node.replace(HydroNode::Placeholder),
            )
        } else {
            // The node metadata always uses the `Optional` collection kind here; `Self::make`
            // then produces the output type chosen by `ZipResult`.
            Self::make(
                self.location.clone(),
                HydroNode::CrossSingleton {
                    left: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
                    right: Box::new(Self::other_ir_node(other)),
                    metadata: self.location.new_node_metadata(CollectionKind::Optional {
                        bound: B::BOUND_KIND,
                        element_type: stageleft::quote_type::<
                            <Self as ZipResult<'a, O>>::ElementType,
                        >()
                        .into(),
                    }),
                },
            )
        }
    }
701
702 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
703 /// boolean signal is `true`, otherwise the output is null.
704 ///
705 /// # Example
706 /// ```rust
707 /// # #[cfg(feature = "deploy")] {
708 /// # use hydro_lang::prelude::*;
709 /// # use futures::StreamExt;
710 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
711 /// let tick = process.tick();
712 /// // ticks are lazy by default, forces the second tick to run
713 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
714 ///
715 /// let signal = tick.optional_first_tick(q!(())).is_some(); // true on tick 1, false on tick 2
716 /// let batch_first_tick = process
717 /// .source_iter(q!(vec![1]))
718 /// .batch(&tick, nondet!(/** test */));
719 /// let batch_second_tick = process
720 /// .source_iter(q!(vec![1, 2, 3]))
721 /// .batch(&tick, nondet!(/** test */))
722 /// .defer_tick();
723 /// batch_first_tick.chain(batch_second_tick).count()
724 /// .filter_if(signal)
725 /// .all_ticks()
726 /// # }, |mut stream| async move {
727 /// // [1]
728 /// # for w in vec![1] {
729 /// # assert_eq!(stream.next().await.unwrap(), w);
730 /// # }
731 /// # }));
732 /// # }
733 /// ```
    pub fn filter_if(
        self,
        signal: Singleton<bool, L, B>,
    ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
    where
        B: IsBounded,
    {
        // `signal.filter(..)` is an `Optional` that is non-null only when the signal is `true`.
        // Zipping with it yields a non-null pair only in that case, and the final `map` keeps
        // this singleton's value while discarding the signal.
        self.zip(signal.filter(q!(|b| *b))).map(q!(|(d, _)| d))
    }
743
744 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
    /// argument (a [`Bounded`] [`Optional`]) is non-null, otherwise the output is null.
746 ///
747 /// Useful for conditionally processing, such as only emitting a singleton's value outside
748 /// a tick if some other condition is satisfied.
749 ///
750 /// # Example
751 /// ```rust
752 /// # #[cfg(feature = "deploy")] {
753 /// # use hydro_lang::prelude::*;
754 /// # use futures::StreamExt;
755 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
756 /// let tick = process.tick();
757 /// // ticks are lazy by default, forces the second tick to run
758 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
759 ///
760 /// let batch_first_tick = process
761 /// .source_iter(q!(vec![1]))
762 /// .batch(&tick, nondet!(/** test */));
763 /// let batch_second_tick = process
764 /// .source_iter(q!(vec![1, 2, 3]))
765 /// .batch(&tick, nondet!(/** test */))
766 /// .defer_tick(); // appears on the second tick
767 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
768 /// batch_first_tick.chain(batch_second_tick).count()
769 /// .filter_if_some(some_on_first_tick)
770 /// .all_ticks()
771 /// # }, |mut stream| async move {
772 /// // [1]
773 /// # for w in vec![1] {
774 /// # assert_eq!(stream.next().await.unwrap(), w);
775 /// # }
776 /// # }));
777 /// # }
778 /// ```
779 #[deprecated(note = "use `filter_if` with `Optional::is_some()` instead")]
    pub fn filter_if_some<U>(
        self,
        signal: Optional<U, L, B>,
    ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
    where
        B: IsBounded,
    {
        // Thin wrapper: turn the optional into a boolean signal with `is_some()` and delegate
        // to `filter_if`.
        self.filter_if(signal.is_some())
    }
789
790 /// Filters this singleton into an [`Optional`], passing through the singleton value if the
    /// argument (a [`Bounded`] [`Optional`]) is null, otherwise the output is null.
792 ///
793 /// Like [`Singleton::filter_if_some`], this is useful for conditional processing, but inverts
794 /// the condition.
795 ///
796 /// # Example
797 /// ```rust
798 /// # #[cfg(feature = "deploy")] {
799 /// # use hydro_lang::prelude::*;
800 /// # use futures::StreamExt;
801 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
802 /// let tick = process.tick();
803 /// // ticks are lazy by default, forces the second tick to run
804 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
805 ///
806 /// let batch_first_tick = process
807 /// .source_iter(q!(vec![1]))
808 /// .batch(&tick, nondet!(/** test */));
809 /// let batch_second_tick = process
810 /// .source_iter(q!(vec![1, 2, 3]))
811 /// .batch(&tick, nondet!(/** test */))
812 /// .defer_tick(); // appears on the second tick
813 /// let some_on_first_tick = tick.optional_first_tick(q!(()));
814 /// batch_first_tick.chain(batch_second_tick).count()
815 /// .filter_if_none(some_on_first_tick)
816 /// .all_ticks()
817 /// # }, |mut stream| async move {
818 /// // [3]
819 /// # for w in vec![3] {
820 /// # assert_eq!(stream.next().await.unwrap(), w);
821 /// # }
822 /// # }));
823 /// # }
824 /// ```
825 #[deprecated(note = "use `filter_if` with `!Optional::is_some()` instead")]
    pub fn filter_if_none<U>(
        self,
        other: Optional<U, L, B>,
    ) -> Optional<T, L, <B as SingletonBound>::UnderlyingBound>
    where
        B: IsBounded,
    {
        // Thin wrapper: turn the optional into a boolean signal with `is_none()` and delegate
        // to `filter_if`.
        self.filter_if(other.is_none())
    }
835
836 /// Returns a [`Singleton`] containing `true` if this singleton's value equals the other's.
837 ///
838 /// # Example
839 /// ```rust
840 /// # #[cfg(feature = "deploy")] {
841 /// # use hydro_lang::prelude::*;
842 /// # use futures::StreamExt;
843 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
844 /// let tick = process.tick();
845 /// let a = tick.singleton(q!(5));
846 /// let b = tick.singleton(q!(5));
847 /// a.equals(b).all_ticks()
848 /// # }, |mut stream| async move {
849 /// // [true]
850 /// # assert_eq!(stream.next().await.unwrap(), true);
851 /// # }));
852 /// # }
853 /// ```
    pub fn equals(self, other: Singleton<T, L, B>) -> Singleton<bool, L, B>
    where
        T: PartialEq,
        B: IsBounded,
    {
        // Pair the two values with `zip`, then compare the pair with `==`.
        self.zip(other).map(q!(|(a, b)| a == b))
    }
861
862 /// Returns a [`Stream`] that emits an event the first time the singleton has a value that is
863 /// greater than or equal to the provided threshold. The event will have the value of the
864 /// given threshold.
865 ///
866 /// This requires the incoming singleton to be monotonic, because otherwise the detection of
867 /// the threshold would be non-deterministic.
868 ///
869 /// # Example
870 /// ```rust
871 /// # #[cfg(feature = "deploy")] {
872 /// # use hydro_lang::prelude::*;
873 /// # use futures::StreamExt;
874 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
875 /// let a = // singleton 1 ~> 5 ~> 10
876 /// # process.singleton(q!(5));
877 /// let b = process.singleton(q!(4));
878 /// a.threshold_greater_or_equal(b)
879 /// # }, |mut stream| async move {
880 /// // [4]
881 /// # assert_eq!(stream.next().await.unwrap(), 4);
882 /// # }));
883 /// # }
884 /// ```
    pub fn threshold_greater_or_equal<B2: IsBounded>(
        self,
        threshold: Singleton<T, L, B2>,
    ) -> Stream<T, L, B::UnderlyingBound>
    where
        T: Clone + PartialOrd,
        B: IsMonotonic,
    {
        let threshold = threshold.make_bounded();
        // `B: IsMonotonic` covers both bounded and monotonic inputs; `try_make_bounded`
        // selects which of the two strategies below is used.
        match self.try_make_bounded() {
            Ok(bounded) => {
                // Bounded input: pair the threshold `t` with the value `m` and emit `t`
                // unless the value is below it.
                let uncasted = threshold
                    .zip(bounded)
                    .into_stream()
                    .filter_map(q!(|(t, m)| if m < t { None } else { Some(t) }));

                // Re-wrap the IR node in a `Stream` of the declared return type.
                Stream::new(
                    uncasted.location.clone(),
                    uncasted.ir_node.replace(HydroNode::Placeholder),
                )
            }
            Err(me) => {
                // Non-bounded input: observe the value in a `sliced!` region while keeping the
                // not-yet-passed threshold as state. Each slice pairs the remaining threshold
                // with the current value and partitions on `m < t` (presumably the first output
                // holds the pairs matching the predicate — confirm against `partition`). The
                // not-passed threshold is stored back as state and the passed one is emitted.
                let uncasted = sliced! {
                    let me = use(me, nondet!(/** thresholds are deterministic */));
                    let mut remaining_threshold = use::state(|l| {
                        let as_option: Optional<_, _, _> = threshold.clone_into_tick(l).into();
                        as_option
                    });

                    let (not_passed, passed) = remaining_threshold.zip(me).into_stream().partition(q!(|(t, m)| m < t));
                    remaining_threshold = not_passed.first().map(q!(|(t, _)| t));
                    passed.map(q!(|(t, _)| t))
                };

                // Re-wrap the IR node in a `Stream` of the declared return type.
                Stream::new(
                    uncasted.location.clone(),
                    uncasted.ir_node.replace(HydroNode::Placeholder),
                )
            }
        }
    }
926
927 /// An operator which allows you to "name" a `HydroNode`.
928 /// This is only used for testing, to correlate certain `HydroNode`s with IDs.
929 pub fn ir_node_named(self, name: &str) -> Singleton<T, L, B> {
930 {
931 let mut node = self.ir_node.borrow_mut();
932 let metadata = node.metadata_mut();
933 metadata.tag = Some(name.to_owned());
934 }
935 self
936 }
937}
938
/// Logical negation of a boolean [`Singleton`].
///
/// The output carries the erased bound `B::UnderlyingBound` rather than `B` itself.
impl<'a, L: Location<'a>, B: SingletonBound> Not for Singleton<bool, L, B> {
    type Output = Singleton<bool, L, B::UnderlyingBound>;

    fn not(self) -> Self::Output {
        // Implemented by mapping `!` over the current value.
        self.map(q!(|b| !b))
    }
}
946
947impl<'a, T, L, B: SingletonBound> Singleton<Option<T>, L, B>
948where
949 L: Location<'a>,
950{
951 /// Converts a `Singleton<Option<U>, L, B>` into an `Optional<U, L, B>` by unwrapping
952 /// the inner `Option`.
953 ///
954 /// This is implemented as an identity [`Singleton::filter_map`], passing through the
955 /// `Option<U>` directly. If the singleton's value is `Some(v)`, the resulting
956 /// [`Optional`] contains `v`; if `None`, the [`Optional`] is empty.
957 ///
958 /// # Example
959 /// ```rust
960 /// # #[cfg(feature = "deploy")] {
961 /// # use hydro_lang::prelude::*;
962 /// # use futures::StreamExt;
963 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
964 /// let tick = process.tick();
965 /// let singleton = tick.singleton(q!(Some(42)));
966 /// singleton.into_optional().all_ticks()
967 /// # }, |mut stream| async move {
968 /// // 42
969 /// # assert_eq!(stream.next().await.unwrap(), 42);
970 /// # }));
971 /// # }
972 /// ```
    pub fn into_optional(self) -> Optional<T, L, B::UnderlyingBound> {
        // The identity closure hands the inner `Option<T>` to `filter_map` unchanged.
        self.filter_map(q!(|v| v))
    }
976}
977
978impl<'a, L, B: SingletonBound> Singleton<bool, L, B>
979where
980 L: Location<'a>,
981{
982 /// Returns a [`Singleton`] containing the logical AND of this and another boolean singleton.
983 ///
984 /// # Example
985 /// ```rust
986 /// # #[cfg(feature = "deploy")] {
987 /// # use hydro_lang::prelude::*;
988 /// # use futures::StreamExt;
989 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
990 /// let tick = process.tick();
991 /// // ticks are lazy by default, forces the second tick to run
992 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
993 ///
994 /// let a = tick.optional_first_tick(q!(())).is_some(); // true, false
995 /// let b = tick.singleton(q!(true)); // true, true
996 /// a.and(b).all_ticks()
997 /// # }, |mut stream| async move {
998 /// // [true, false]
999 /// # for w in vec![true, false] {
1000 /// # assert_eq!(stream.next().await.unwrap(), w);
1001 /// # }
1002 /// # }));
1003 /// # }
1004 /// ```
1005 pub fn and(self, other: Singleton<bool, L, B>) -> Singleton<bool, L, Bounded>
1006 where
1007 B: IsBounded,
1008 {
1009 self.zip(other).map(q!(|(a, b)| a && b)).make_bounded()
1010 }
1011
1012 /// Returns a [`Singleton`] containing the logical OR of this and another boolean singleton.
1013 ///
1014 /// # Example
1015 /// ```rust
1016 /// # #[cfg(feature = "deploy")] {
1017 /// # use hydro_lang::prelude::*;
1018 /// # use futures::StreamExt;
1019 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1020 /// let tick = process.tick();
1021 /// // ticks are lazy by default, forces the second tick to run
1022 /// tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1023 ///
1024 /// let a = tick.optional_first_tick(q!(())).is_some(); // true, false
1025 /// let b = tick.singleton(q!(false)); // false, false
1026 /// a.or(b).all_ticks()
1027 /// # }, |mut stream| async move {
1028 /// // [true, false]
1029 /// # for w in vec![true, false] {
1030 /// # assert_eq!(stream.next().await.unwrap(), w);
1031 /// # }
1032 /// # }));
1033 /// # }
1034 /// ```
1035 pub fn or(self, other: Singleton<bool, L, B>) -> Singleton<bool, L, Bounded>
1036 where
1037 B: IsBounded,
1038 {
1039 self.zip(other).map(q!(|(a, b)| a || b)).make_bounded()
1040 }
1041}
1042
1043impl<'a, T, L, B: SingletonBound> Singleton<T, Atomic<L>, B>
1044where
1045 L: Location<'a> + NoTick,
1046{
1047 /// Returns a singleton value corresponding to the latest snapshot of the singleton
1048 /// being atomically processed. The snapshot at tick `t + 1` is guaranteed to include
1049 /// at least all relevant data that contributed to the snapshot at tick `t`. Furthermore,
1050 /// all snapshots of this singleton into the atomic-associated tick will observe the
1051 /// same value each tick.
1052 ///
1053 /// # Non-Determinism
1054 /// Because this picks a snapshot of a singleton whose value is continuously changing,
1055 /// the output singleton has a non-deterministic value since the snapshot can be at an
1056 /// arbitrary point in time.
1057 pub fn snapshot_atomic(
1058 self,
1059 tick: &Tick<L>,
1060 _nondet: NonDet,
1061 ) -> Singleton<T, Tick<L>, Bounded> {
1062 Singleton::new(
1063 tick.clone(),
1064 HydroNode::Batch {
1065 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1066 metadata: tick
1067 .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
1068 },
1069 )
1070 }
1071
1072 /// Returns this singleton back into a top-level, asynchronous execution context where updates
1073 /// to the value will be asynchronously propagated.
1074 pub fn end_atomic(self) -> Singleton<T, L, B> {
1075 Singleton::new(
1076 self.location.tick.l.clone(),
1077 HydroNode::EndAtomic {
1078 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1079 metadata: self
1080 .location
1081 .tick
1082 .l
1083 .new_node_metadata(Singleton::<T, L, B>::collection_kind()),
1084 },
1085 )
1086 }
1087}
1088
1089impl<'a, T, L, B: SingletonBound> Singleton<T, L, B>
1090where
1091 L: Location<'a>,
1092{
1093 /// Shifts this singleton into an atomic context, which guarantees that any downstream logic
1094 /// will observe the same version of the value and will be executed synchronously before any
    /// outputs are yielded (in [`Singleton::end_atomic`]).
1096 ///
1097 /// This is useful to enforce local consistency constraints, such as ensuring that several readers
1098 /// see a consistent version of local state (since otherwise each [`Singleton::snapshot`] may pick
1099 /// a different version).
1100 pub fn atomic(self) -> Singleton<T, Atomic<L>, B> {
1101 let id = self.location.flow_state().borrow_mut().next_clock_id();
1102 let out_location = Atomic {
1103 tick: Tick {
1104 id,
1105 l: self.location.clone(),
1106 },
1107 };
1108 Singleton::new(
1109 out_location.clone(),
1110 HydroNode::BeginAtomic {
1111 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1112 metadata: out_location
1113 .new_node_metadata(Singleton::<T, Atomic<L>, B>::collection_kind()),
1114 },
1115 )
1116 }
1117
1118 /// Given a tick, returns a singleton value corresponding to a snapshot of the singleton
1119 /// as of that tick. The snapshot at tick `t + 1` is guaranteed to include at least all
1120 /// relevant data that contributed to the snapshot at tick `t`.
1121 ///
1122 /// # Non-Determinism
1123 /// Because this picks a snapshot of a singleton whose value is continuously changing,
1124 /// the output singleton has a non-deterministic value since the snapshot can be at an
1125 /// arbitrary point in time.
1126 pub fn snapshot(self, tick: &Tick<L>, _nondet: NonDet) -> Singleton<T, Tick<L>, Bounded> {
1127 assert_eq!(Location::id(tick.outer()), Location::id(&self.location));
1128 Singleton::new(
1129 tick.clone(),
1130 HydroNode::Batch {
1131 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1132 metadata: tick
1133 .new_node_metadata(Singleton::<T, Tick<L>, Bounded>::collection_kind()),
1134 },
1135 )
1136 }
1137
1138 /// Eagerly samples the singleton as fast as possible, returning a stream of snapshots
1139 /// with order corresponding to increasing prefixes of data contributing to the singleton.
1140 ///
1141 /// # Non-Determinism
1142 /// At runtime, the singleton will be arbitrarily sampled as fast as possible, but due
1143 /// to non-deterministic batching and arrival of inputs, the output stream is
1144 /// non-deterministic.
1145 pub fn sample_eager(self, nondet: NonDet) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1146 where
1147 L: NoTick,
1148 {
1149 sliced! {
1150 let snapshot = use(self, nondet);
1151 snapshot.into_stream()
1152 }
1153 .weaken_retries()
1154 }
1155
1156 /// Given a time interval, returns a stream corresponding to snapshots of the singleton
1157 /// value taken at various points in time. Because the input singleton may be
1158 /// [`Unbounded`], there are no guarantees on what these snapshots are other than they
1159 /// represent the value of the singleton given some prefix of the streams leading up to
1160 /// it.
1161 ///
1162 /// # Non-Determinism
1163 /// The output stream is non-deterministic in which elements are sampled, since this
1164 /// is controlled by a clock.
1165 pub fn sample_every(
1166 self,
1167 interval: impl QuotedWithContext<'a, std::time::Duration, L> + Copy + 'a,
1168 nondet: NonDet,
1169 ) -> Stream<T, L, Unbounded, TotalOrder, AtLeastOnce>
1170 where
1171 L: NoTick + NoAtomic,
1172 {
1173 let samples = self.location.source_interval(interval, nondet);
1174 sliced! {
1175 let snapshot = use(self, nondet);
1176 let sample_batch = use(samples, nondet);
1177
1178 snapshot.filter_if(sample_batch.first().is_some()).into_stream()
1179 }
1180 .weaken_retries()
1181 }
1182
1183 /// Strengthens the boundedness guarantee to `Bounded`, given that `B: IsBounded`, which
1184 /// implies that `B == Bounded`.
1185 pub fn make_bounded(self) -> Singleton<T, L, Bounded>
1186 where
1187 B: IsBounded,
1188 {
1189 Singleton::new(
1190 self.location.clone(),
1191 self.ir_node.replace(HydroNode::Placeholder),
1192 )
1193 }
1194
1195 #[expect(clippy::result_large_err, reason = "internal use only")]
1196 fn try_make_bounded(self) -> Result<Singleton<T, L, Bounded>, Singleton<T, L, B>> {
1197 if B::UnderlyingBound::BOUNDED {
1198 Ok(Singleton::new(
1199 self.location.clone(),
1200 self.ir_node.replace(HydroNode::Placeholder),
1201 ))
1202 } else {
1203 Err(self)
1204 }
1205 }
1206
1207 /// Clones this bounded singleton into a tick, returning a singleton that has the
1208 /// same value as the outer singleton. Because the outer singleton is bounded, this
1209 /// is deterministic because there is only a single immutable version.
1210 pub fn clone_into_tick(self, tick: &Tick<L>) -> Singleton<T, Tick<L>, Bounded>
1211 where
1212 B: IsBounded,
1213 T: Clone,
1214 {
1215 // TODO(shadaj): avoid printing simulator logs for this snapshot
1216 self.snapshot(
1217 tick,
1218 nondet!(/** bounded top-level singleton so deterministic */),
1219 )
1220 }
1221
1222 /// Converts this singleton into a [`Stream`] containing a single element, the value.
1223 ///
1224 /// # Example
1225 /// ```rust
1226 /// # #[cfg(feature = "deploy")] {
1227 /// # use hydro_lang::prelude::*;
1228 /// # use futures::StreamExt;
1229 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1230 /// let tick = process.tick();
1231 /// let batch_input = process
1232 /// .source_iter(q!(vec![123, 456]))
1233 /// .batch(&tick, nondet!(/** test */));
1234 /// batch_input.clone().chain(
1235 /// batch_input.count().into_stream()
1236 /// ).all_ticks()
1237 /// # }, |mut stream| async move {
1238 /// // [123, 456, 2]
1239 /// # for w in vec![123, 456, 2] {
1240 /// # assert_eq!(stream.next().await.unwrap(), w);
1241 /// # }
1242 /// # }));
1243 /// # }
1244 /// ```
1245 pub fn into_stream(self) -> Stream<T, L, Bounded, TotalOrder, ExactlyOnce>
1246 where
1247 B: IsBounded,
1248 {
1249 Stream::new(
1250 self.location.clone(),
1251 HydroNode::Cast {
1252 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1253 metadata: self.location.new_node_metadata(Stream::<
1254 T,
1255 Tick<L>,
1256 Bounded,
1257 TotalOrder,
1258 ExactlyOnce,
1259 >::collection_kind()),
1260 },
1261 )
1262 }
1263
1264 /// Resolves the singleton's [`Future`] value by blocking until it completes,
1265 /// producing a singleton of the resolved output.
1266 ///
1267 /// This is useful when the singleton contains an async computation that must
1268 /// be awaited before further processing. The future is polled to completion
1269 /// before the output value is emitted.
1270 ///
1271 /// # Example
1272 /// ```rust
1273 /// # #[cfg(feature = "deploy")] {
1274 /// # use hydro_lang::prelude::*;
1275 /// # use futures::StreamExt;
1276 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1277 /// let tick = process.tick();
1278 /// let singleton = tick.singleton(q!(5));
1279 /// singleton
1280 /// .map(q!(|v| async move { v * 2 }))
1281 /// .resolve_future_blocking()
1282 /// .all_ticks()
1283 /// # }, |mut stream| async move {
1284 /// // 10
1285 /// # assert_eq!(stream.next().await.unwrap(), 10);
1286 /// # }));
1287 /// # }
1288 /// ```
1289 pub fn resolve_future_blocking(
1290 self,
1291 ) -> Singleton<T::Output, L, <B as SingletonBound>::UnderlyingBound>
1292 where
1293 T: Future,
1294 B: IsBounded,
1295 {
1296 Singleton::new(
1297 self.location.clone(),
1298 HydroNode::ResolveFuturesBlocking {
1299 input: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1300 metadata: self
1301 .location
1302 .new_node_metadata(Singleton::<T::Output, L, B>::collection_kind()),
1303 },
1304 )
1305 }
1306}
1307
1308impl<'a, T, L> Singleton<T, Tick<L>, Bounded>
1309where
1310 L: Location<'a>,
1311{
1312 /// Asynchronously yields the value of this singleton outside the tick as an unbounded stream,
1313 /// which will stream the value computed in _each_ tick as a separate stream element.
1314 ///
1315 /// Unlike [`Singleton::latest`], the value computed in each tick is emitted separately,
1316 /// producing one element in the output for each tick. This is useful for batched computations,
1317 /// where the results from each tick must be combined together.
1318 ///
1319 /// # Example
1320 /// ```rust
1321 /// # #[cfg(feature = "deploy")] {
1322 /// # use hydro_lang::prelude::*;
1323 /// # use futures::StreamExt;
1324 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1325 /// let tick = process.tick();
1326 /// # // ticks are lazy by default, forces the second tick to run
1327 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1328 /// # let batch_first_tick = process
1329 /// # .source_iter(q!(vec![1]))
1330 /// # .batch(&tick, nondet!(/** test */));
1331 /// # let batch_second_tick = process
1332 /// # .source_iter(q!(vec![1, 2, 3]))
1333 /// # .batch(&tick, nondet!(/** test */))
1334 /// # .defer_tick(); // appears on the second tick
1335 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1336 /// input_batch // first tick: [1], second tick: [1, 2, 3]
1337 /// .count()
1338 /// .all_ticks()
1339 /// # }, |mut stream| async move {
1340 /// // [1, 3]
1341 /// # for w in vec![1, 3] {
1342 /// # assert_eq!(stream.next().await.unwrap(), w);
1343 /// # }
1344 /// # }));
1345 /// # }
1346 /// ```
1347 pub fn all_ticks(self) -> Stream<T, L, Unbounded, TotalOrder, ExactlyOnce> {
1348 self.into_stream().all_ticks()
1349 }
1350
1351 /// Synchronously yields the value of this singleton outside the tick as an unbounded stream,
1352 /// which will stream the value computed in _each_ tick as a separate stream element.
1353 ///
1354 /// Unlike [`Singleton::all_ticks`], this preserves synchronous execution, as the output stream
1355 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1356 /// singleton's [`Tick`] context.
1357 pub fn all_ticks_atomic(self) -> Stream<T, Atomic<L>, Unbounded, TotalOrder, ExactlyOnce> {
1358 self.into_stream().all_ticks_atomic()
1359 }
1360
1361 /// Asynchronously yields this singleton outside the tick as an unbounded singleton, which will
1362 /// be asynchronously updated with the latest value of the singleton inside the tick.
1363 ///
1364 /// This converts a bounded value _inside_ a tick into an asynchronous value outside the
1365 /// tick that tracks the inner value. This is useful for getting the value as of the
1366 /// "most recent" tick, but note that updates are propagated asynchronously outside the tick.
1367 ///
1368 /// # Example
1369 /// ```rust
1370 /// # #[cfg(feature = "deploy")] {
1371 /// # use hydro_lang::prelude::*;
1372 /// # use futures::StreamExt;
1373 /// # tokio_test::block_on(hydro_lang::test_util::stream_transform_test(|process| {
1374 /// let tick = process.tick();
1375 /// # // ticks are lazy by default, forces the second tick to run
1376 /// # tick.spin_batch(q!(1)).all_ticks().for_each(q!(|_| {}));
1377 /// # let batch_first_tick = process
1378 /// # .source_iter(q!(vec![1]))
1379 /// # .batch(&tick, nondet!(/** test */));
1380 /// # let batch_second_tick = process
1381 /// # .source_iter(q!(vec![1, 2, 3]))
1382 /// # .batch(&tick, nondet!(/** test */))
1383 /// # .defer_tick(); // appears on the second tick
1384 /// # let input_batch = batch_first_tick.chain(batch_second_tick);
1385 /// input_batch // first tick: [1], second tick: [1, 2, 3]
1386 /// .count()
1387 /// .latest()
1388 /// # .sample_eager(nondet!(/** test */))
1389 /// # }, |mut stream| async move {
1390 /// // asynchronously changes from 1 ~> 3
1391 /// # for w in vec![1, 3] {
1392 /// # assert_eq!(stream.next().await.unwrap(), w);
1393 /// # }
1394 /// # }));
1395 /// # }
1396 /// ```
1397 pub fn latest(self) -> Singleton<T, L, Unbounded> {
1398 Singleton::new(
1399 self.location.outer().clone(),
1400 HydroNode::YieldConcat {
1401 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1402 metadata: self
1403 .location
1404 .outer()
1405 .new_node_metadata(Singleton::<T, L, Unbounded>::collection_kind()),
1406 },
1407 )
1408 }
1409
1410 /// Synchronously yields this singleton outside the tick as an unbounded singleton, which will
1411 /// be updated with the latest value of the singleton inside the tick.
1412 ///
1413 /// Unlike [`Singleton::latest`], this preserves synchronous execution, as the output singleton
1414 /// is emitted in an [`Atomic`] context that will process elements synchronously with the input
1415 /// singleton's [`Tick`] context.
1416 pub fn latest_atomic(self) -> Singleton<T, Atomic<L>, Unbounded> {
1417 let out_location = Atomic {
1418 tick: self.location.clone(),
1419 };
1420 Singleton::new(
1421 out_location.clone(),
1422 HydroNode::YieldConcat {
1423 inner: Box::new(self.ir_node.replace(HydroNode::Placeholder)),
1424 metadata: out_location
1425 .new_node_metadata(Singleton::<T, Atomic<L>, Unbounded>::collection_kind()),
1426 },
1427 )
1428 }
1429}
1430
#[doc(hidden)]
/// Helper trait that determines the output collection type for [`Singleton::zip`].
///
/// `Other` is the type of the second input to the zip. The output is an [`Optional`] if the
/// second input is an [`Optional`]; otherwise it is a [`Singleton`].
#[sealed::sealed]
pub trait ZipResult<'a, Other> {
    /// The output collection type.
    type Out;
    /// The type of the tupled output value.
    type ElementType;
    /// The type of the value held by the other (second) collection.
    type OtherType;
    /// The location where the tupled result will be materialized.
    type Location: Location<'a>;

    /// Returns the location of the second input to the `zip`.
    fn other_location(other: &Other) -> Self::Location;
    /// Consumes the second input to the `zip` and returns its IR node.
    fn other_ir_node(other: Other) -> HydroNode;

    /// Constructs the output live collection given an IR node containing the zip result.
    fn make(location: Self::Location, ir_node: HydroNode) -> Self::Out;
}
1455
1456#[sealed::sealed]
1457impl<'a, T, U, L, B: SingletonBound> ZipResult<'a, Singleton<U, L, B>> for Singleton<T, L, B>
1458where
1459 L: Location<'a>,
1460{
1461 type Out = Singleton<(T, U), L, B>;
1462 type ElementType = (T, U);
1463 type OtherType = U;
1464 type Location = L;
1465
1466 fn other_location(other: &Singleton<U, L, B>) -> L {
1467 other.location.clone()
1468 }
1469
1470 fn other_ir_node(other: Singleton<U, L, B>) -> HydroNode {
1471 other.ir_node.replace(HydroNode::Placeholder)
1472 }
1473
1474 fn make(location: L, ir_node: HydroNode) -> Self::Out {
1475 Singleton::new(
1476 location.clone(),
1477 HydroNode::Cast {
1478 inner: Box::new(ir_node),
1479 metadata: location.new_node_metadata(Self::Out::collection_kind()),
1480 },
1481 )
1482 }
1483}
1484
/// Zipping a singleton with an [`Optional`] yields an [`Optional`] of the pair, typed with the
/// singleton's underlying boundedness.
#[sealed::sealed]
impl<'a, T, U, L, B: SingletonBound> ZipResult<'a, Optional<U, L, B::UnderlyingBound>>
    for Singleton<T, L, B>
where
    L: Location<'a>,
{
    type Out = Optional<(T, U), L, B::UnderlyingBound>;
    type ElementType = (T, U);
    type OtherType = U;
    type Location = L;

    /// Clones the location of the optional input.
    fn other_location(other: &Optional<U, L, B::UnderlyingBound>) -> L {
        other.location.clone()
    }

    /// Moves the IR node out of the optional input, leaving a placeholder behind.
    fn other_ir_node(other: Optional<U, L, B::UnderlyingBound>) -> HydroNode {
        other.ir_node.replace(HydroNode::Placeholder)
    }

    /// Wraps the zipped node directly as an [`Optional`]; unlike the singleton/singleton
    /// case, no cast node is inserted.
    fn make(location: L, ir_node: HydroNode) -> Self::Out {
        Optional::new(location, ir_node)
    }
}
1508
1509#[cfg(test)]
1510mod tests {
1511 #[cfg(feature = "deploy")]
1512 use futures::{SinkExt, StreamExt};
1513 #[cfg(feature = "deploy")]
1514 use hydro_deploy::Deployment;
1515 #[cfg(any(feature = "deploy", feature = "sim"))]
1516 use stageleft::q;
1517
1518 #[cfg(any(feature = "deploy", feature = "sim"))]
1519 use crate::compile::builder::FlowBuilder;
1520 #[cfg(feature = "deploy")]
1521 use crate::live_collections::stream::ExactlyOnce;
1522 #[cfg(any(feature = "deploy", feature = "sim"))]
1523 use crate::location::Location;
1524 #[cfg(any(feature = "deploy", feature = "sim"))]
1525 use crate::nondet::nondet;
1526
    /// Checks that a singleton fed back through a tick cycle still holds exactly one value
    /// on each observed tick: counting its elements yields `1` both times.
    #[cfg(feature = "deploy")]
    #[tokio::test]
    async fn tick_cycle_cardinality() {
        let mut deployment = Deployment::new();

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();
        let external = flow.external::<()>();

        let (input_send, input) = node.source_external_bincode::<_, _, _, ExactlyOnce>(&external);

        let node_tick = node.tick();
        // Cycle seeded with `0`; it is closed below by feeding the singleton back unchanged.
        let (complete_cycle, singleton) = node_tick.cycle_with_initial(node_tick.singleton(q!(0)));
        // The count is only released on ticks whose external input batch is non-empty.
        let counts = singleton
            .clone()
            .into_stream()
            .count()
            .filter_if(
                input
                    .batch(&node_tick, nondet!(/** testing */))
                    .first()
                    .is_some(),
            )
            .all_ticks()
            .send_bincode_external(&external);
        complete_cycle.complete_next_tick(singleton);

        let nodes = flow
            .with_process(&node, deployment.Localhost())
            .with_external(&external, deployment.Localhost())
            .deploy(&mut deployment);

        deployment.deploy().await.unwrap();

        let mut tick_trigger = nodes.connect(input_send).await;
        let mut external_out = nodes.connect(counts).await;

        deployment.start().await.unwrap();

        // First trigger: the singleton must contain exactly one element.
        tick_trigger.send(()).await.unwrap();

        assert_eq!(external_out.next().await.unwrap(), 1);

        // Second trigger: still exactly one element after going around the cycle.
        tick_trigger.send(()).await.unwrap();

        assert_eq!(external_out.next().await.unwrap(), 1);
    }
1574
    /// Expected to panic: the assertion that the first snapshot of the fold is already the
    /// final sum `10` must fail in at least one explored execution.
    /// NOTE(review): presumably because the simulator exposes intermediate fold states --
    /// confirm against the simulator's semantics.
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_fold_intermediate_states() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
        let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));

        let tick = node.tick();
        let batch = folded.snapshot(&tick, nondet!(/** test */));
        let out_recv = batch.all_ticks().sim_output();

        flow.sim().exhaustive(async || {
            assert_eq!(out_recv.next().await.unwrap(), 10);
        });
    }
1593
    /// Checks that every explored execution ends on the final sum `10`, and that the
    /// exhaustive simulation explores exactly 16 instances for a four-element fold.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_fold_intermediate_state_count() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
        let folded = source.fold(q!(|| 0), q!(|a, b| *a += b));

        let tick = node.tick();
        let batch = folded.snapshot(&tick, nondet!(/** test */));
        let out_recv = batch.all_ticks().sim_output();

        // `exhaustive` returns the number of instances it explored.
        let instance_count = flow.sim().exhaustive(async || {
            let out = out_recv.collect::<Vec<_>>().await;
            assert_eq!(out.last(), Some(&10));
        });

        assert_eq!(
            instance_count,
            16 // 2^4 possible subsets of intermediates (including initial state)
        )
    }
1617
    /// Checks that the initial fold state `0` is observed once and that the next observed
    /// snapshot, after sending `123`, is `123`.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_fold_no_repeat_initial() {
        // check that we don't repeat the initial state of the fold in autonomous decisions

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let (in_port, input) = node.sim_input();
        let folded = input.fold(q!(|| 0), q!(|a, b| *a += b));

        let tick = node.tick();
        let batch = folded.snapshot(&tick, nondet!(/** test */));
        let out_recv = batch.all_ticks().sim_output();

        flow.sim().exhaustive(async || {
            // Before any input arrives, the snapshot is the fold's initial value.
            assert_eq!(out_recv.next().await.unwrap(), 0);

            in_port.send(123);

            // The very next output must reflect the input, not a repeated `0`.
            assert_eq!(out_recv.next().await.unwrap(), 123);
        });
    }
1641
    /// Expected to panic: some explored execution must pair two consecutive stream elements
    /// with the same snapshot value (`(1, 3)` followed by `(2, 3)`).
    #[cfg(feature = "sim")]
    #[test]
    #[should_panic]
    fn sim_fold_repeats_snapshots() {
        // when the tick is driven by a snapshot AND something else, the snapshot can
        // "stutter" and repeat the same state multiple times

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2, 3, 4])));
        let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));

        let tick = node.tick();
        // Each batched element is paired with the fold snapshot taken in the same tick.
        let batch = source
            .batch(&tick, nondet!(/** test */))
            .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
        let out_recv = batch.all_ticks().sim_output();

        flow.sim().exhaustive(async || {
            if out_recv.next().await.unwrap() == (1, 3) && out_recv.next().await.unwrap() == (2, 3)
            {
                panic!("repeated snapshot");
            }
        });
    }
1668
    /// Pins the number of instances explored when a tick consumes both a stream batch and a
    /// fold snapshot of a two-element source.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_fold_repeats_snapshots_count() {
        // check the number of instances
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let source = node.source_stream(q!(tokio_stream::iter(vec![1, 2])));
        let folded = source.clone().fold(q!(|| 0), q!(|a, b| *a += b));

        let tick = node.tick();
        let batch = source
            .batch(&tick, nondet!(/** test */))
            .cross_singleton(folded.snapshot(&tick, nondet!(/** test */)));
        let out_recv = batch.all_ticks().sim_output();

        // The outputs themselves are irrelevant here; only the instance count is checked.
        let count = flow.sim().exhaustive(async || {
            let _ = out_recv.collect::<Vec<_>>().await;
        });

        assert_eq!(count, 52);
        // don't have a combinatorial explanation for this number yet, but checked via logs
    }
1692
    /// Checks that snapshotting a constant top-level singleton leads to exactly one explored
    /// instance.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_singleton_exhaustive() {
        // ensures that top-level singletons have only one snapshot
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let singleton = node.singleton(q!(1));
        let tick = node.tick();
        let batch = singleton.snapshot(&tick, nondet!(/** test */));
        let out_recv = batch.all_ticks().sim_output();

        // Only the number of explored instances matters, not the collected output.
        let count = flow.sim().exhaustive(async || {
            let _ = out_recv.collect::<Vec<_>>().await;
        });

        assert_eq!(count, 1);
    }
1711
    /// Checks that crossing a stream batch with a constant singleton cloned into the tick
    /// explores 16 instances for a four-element source.
    #[cfg(feature = "sim")]
    #[test]
    fn sim_top_level_singleton_join_count() {
        // if a tick consumes a static snapshot and a stream batch, only the batch requires
        // space exploration

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
        let tick = node.tick();
        let batch = source_iter
            .batch(&tick, nondet!(/** test */))
            .cross_singleton(node.singleton(q!(123)).clone_into_tick(&tick));
        let out_recv = batch.all_ticks().sim_output();

        let instance_count = flow.sim().exhaustive(async || {
            let _ = out_recv.collect::<Vec<_>>().await;
        });

        assert_eq!(
            instance_count,
            16 // 2^4 ways to split up (including a possibly empty first batch)
        )
    }
1737
    /// Checks that converting a top-level fold result into a stream yields only the final
    /// value `10` in every explored execution.
    #[cfg(feature = "sim")]
    #[test]
    fn top_level_singleton_into_stream_no_replay() {
        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let source_iter = node.source_iter(q!(vec![1, 2, 3, 4]));
        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));

        // No tick or snapshot is involved: the singleton is converted directly.
        let out_recv = folded.into_stream().sim_output();

        flow.sim().exhaustive(async || {
            out_recv.assert_yields_only([10]).await;
        });
    }
1753
    /// Checks that zipping a snapshot with a clone of itself inside a slice ends on the pair
    /// `(3, 3)` in every explored execution, and pins the explored instance count at 4.
    #[cfg(feature = "sim")]
    #[test]
    fn inside_tick_singleton_zip() {
        use crate::live_collections::Stream;
        use crate::live_collections::sliced::sliced;

        let mut flow = FlowBuilder::new();
        let node = flow.process::<()>();

        let source_iter: Stream<_, _> = node.source_iter(q!(vec![1, 2])).into();
        let folded = source_iter.fold(q!(|| 0), q!(|a, b| *a += b));

        // Both sides of the zip come from the same snapshot `v`.
        let out_recv = sliced! {
            let v = use(folded, nondet!(/** test */));
            v.clone().zip(v).into_stream()
        }
        .sim_output();

        let count = flow.sim().exhaustive(async || {
            let out = out_recv.collect::<Vec<_>>().await;
            assert_eq!(out.last(), Some(&(3, 3)));
        });

        assert_eq!(count, 4);
    }
1779}