Source, Flow & Sink

The three building blocks of a Datum stream are Source, Flow, and Sink. They compose left-to-right to form a closed graph that can be materialized and run.

Types

| Type | Produces | Consumes | Materialized value |
| --- | --- | --- | --- |
| Source<Out, Mat> | Out elements | nothing (no upstream) | Mat |
| Flow<In, Out> | Out elements | In elements | NotUsed (always) |
| Sink<In, Mat> | nothing | In elements | Mat |

NotUsed is a unit-like type that signals "no meaningful materialized value here."

Note: Flow<In, Out> is shorthand for Flow<In, Out, NotUsed>. The full type takes a third parameter, Mat, but the only flows with a meaningful materialized value come from special terminal operators that are rarely used in the linear DSL.

Composition

Source, Flow, and Sink compose in the order that data flows:

Source<A, _>  →  Flow<A, B>  →  Flow<B, C>  →  Sink<C, _>

Use .via(flow) to append a Flow, and .to(sink) to close the graph with a Sink:

```rust
use datum::{Flow, Sink, Source};

// A reusable Flow: keep even numbers and double them.
let double_evens: Flow<u64, u64> = Flow::identity().filter(|x| x % 2 == 0).map(|x| x * 2);

// Wire Source -> Flow -> Sink and materialize.
let result: Vec<u64> = Source::from_iter(1_u64..=6)
    .via(double_evens)
    .run_with(Sink::collect())
    .unwrap()
    .wait()
    .unwrap();
// Of 1..=6 only 2, 4, and 6 pass the filter; each is then doubled.
assert_eq!(result, vec![4, 8, 12]);
```

Flow::identity() creates a pass-through flow (no transformation). You can build a reusable Flow by chaining operators onto it, then plug it into multiple Sources via .via(...).
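To make the "reusable Flow" idea concrete without relying on any Datum API, here is a minimal sketch that uses only the standard library. It is an analogy, not Datum's implementation: the flow is modelled as a plain generic function over iterators, and the names double_evens, run_on_range, and run_on_list are hypothetical.

```rust
// Std-only analogy (NOT Datum's API): a "flow" is a function that turns one
// iterator into another, so the same logic can be attached to many sources.
fn double_evens(input: impl Iterator<Item = u64>) -> impl Iterator<Item = u64> {
    input.filter(|x| x % 2 == 0).map(|x| x * 2)
}

// First "source": a numeric range.
fn run_on_range() -> Vec<u64> {
    double_evens(1_u64..=6).collect()
}

// Second "source": an owned list, sent through the very same stage.
fn run_on_list(items: Vec<u64>) -> Vec<u64> {
    double_evens(items.into_iter()).collect()
}
```

The point of the sketch is only that the transformation is defined once and is independent of where the elements come from, which is the role Flow plays between a Source and a Sink.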

Sink terminals

Several common sinks are built into Sink:

```rust
use datum::{Sink, Source};

// Sink::fold: accumulate into a single value
let sum: u64 = Source::from_iter(1_u64..=100)
    .run_with(Sink::fold(0_u64, |acc, x| acc + x))
    .unwrap()
    .wait()
    .unwrap();
assert_eq!(sum, 5_050);

// Sink::collect: gather all elements into a Vec
let v: Vec<u32> = Source::from_iter([10_u32, 20, 30])
    .run_with(Sink::collect())
    .unwrap()
    .wait()
    .unwrap();
assert_eq!(v, vec![10, 20, 30]);
```

| Sink | Description |
| --- | --- |
| Sink::fold(init, f) | Accumulate all elements into a single value |
| Sink::collect() | Gather all elements into a Vec |
| Sink::ignore() | Consume and discard all elements |
| Sink::foreach(f) | Run a side effect for each element |
| Sink::head() | Take the first element (completes after one) |
| Sink::last() | Take the last element |
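For readers who think in terms of standard-library iterators, the sinks in the table above map roughly onto familiar Iterator consumers. The sketch below uses only std and is an analogy; it does not show Datum's signatures. In particular, returning Option from head and last is an assumption about how an empty stream might be represented, and all function names are hypothetical.

```rust
// Std-only analogues of the built-in sinks (NOT Datum's API).

// Like Sink::fold(init, f): reduce everything to one value.
fn fold_sum(items: impl Iterator<Item = u64>) -> u64 {
    items.fold(0, |acc, x| acc + x)
}

// Like Sink::collect(): gather every element into a Vec.
fn collect_all(items: impl Iterator<Item = u64>) -> Vec<u64> {
    items.collect()
}

// Like Sink::ignore(): drive the input to completion and discard elements.
fn ignore_all(items: impl Iterator<Item = u64>) {
    items.for_each(drop);
}

// Like Sink::foreach(f): run a side effect per element (here, record a line).
fn foreach_record(items: impl Iterator<Item = u64>, seen: &mut Vec<String>) {
    items.for_each(|x| seen.push(format!("saw {}", x)));
}

// Like Sink::head(): stop after the first element (None if there is none).
fn head(mut items: impl Iterator<Item = u64>) -> Option<u64> {
    items.next()
}

// Like Sink::last(): consume everything and keep only the final element.
fn last(items: impl Iterator<Item = u64>) -> Option<u64> {
    items.last()
}
```

Note the difference between head and last in this model: head pulls a single element and stops, while last has to consume the whole input before it can answer.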

Type parameters and ownership

When you call .via(flow), the Source is consumed (moved) and a new Source is returned. The operators are fused into the blueprint, so no element data is copied at construction time.
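The move semantics described here follow the usual Rust pattern of builder methods that take self by value. The sketch below illustrates that pattern with a hypothetical Blueprint type that only records stage names; it is not a Datum type and says nothing about how Datum stores its operators.

```rust
// Hypothetical stand-in for a stream blueprint (NOT a Datum type).
struct Blueprint {
    stages: Vec<&'static str>,
}

impl Blueprint {
    fn new(source: &'static str) -> Self {
        Blueprint { stages: vec![source] }
    }

    // Takes `self` by value: the old blueprint is moved in, extended, and
    // handed back. Only the description grows; no element data exists yet.
    fn via(mut self, stage: &'static str) -> Self {
        self.stages.push(stage);
        self
    }

    fn describe(&self) -> String {
        self.stages.join(" -> ")
    }
}

fn build() -> String {
    let source = Blueprint::new("range");
    let extended = source.via("filter").via("map");
    // `source` was moved by `.via(...)`; touching it here would not compile.
    extended.describe()
}
```

Because each call returns the extended value, chaining reads naturally, and the compiler rejects accidental reuse of a blueprint that has already been consumed.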

Operators like map and filter take closures that must be Send + Sync + 'static because the fused chain may run on a different thread from the caller.
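The thread-safety bounds can be seen with the standard library alone. The following sketch assumes nothing about Datum's scheduler: it simply moves a closure onto a worker thread with std::thread::spawn, which is what forces Send + 'static. The helper name map_on_worker is hypothetical.

```rust
use std::thread;

// Applies `f` to every item on a freshly spawned worker thread.
// `Send` lets the closure and data cross the thread boundary; `'static`
// guarantees they do not borrow anything that could die before the worker.
fn map_on_worker<T, U, F>(items: Vec<T>, f: F) -> Vec<U>
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> U + Send + 'static,
{
    thread::spawn(move || items.into_iter().map(f).collect::<Vec<U>>())
        .join()
        .expect("worker thread panicked")
}
```

A closure that captures its environment with move, such as move |x| x + offset, satisfies these bounds as long as everything it captures is itself Send + 'static. A Sync bound becomes necessary in addition when the same closure is shared by reference between several threads rather than moved to exactly one.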

Flow operators

Flow exposes the full Datum operator set. A selection:

| Operator | Description |
| --- | --- |
| .map(f) | Transform each element |
| .filter(pred) | Drop elements where pred returns false |
| .filter_map(f) | Map and drop None results (Akka's collect) |
| .take(n) | Keep the first n elements |
| .drop(n) | Skip the first n elements |
| .take_while(pred) | Keep elements while pred is true |
| .map_error(f) | Transform the error in a StreamResult |
| .recover(f) | Replace an error with a fallback value |
| .flat_map_concat(f) | Map each element to a sub-source and concatenate in order |
| .flat_map_merge(n, f) | Map each element to a sub-source and merge up to n concurrently |

See the API docs and src/stream.rs for the full list.
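Several of the operators listed above behave like their namesakes on std's Iterator, which can serve as a mental model. The sketch below is std-only and hedged accordingly: it shows the iterator counterparts (with skip standing in for .drop and flat_map for .flat_map_concat), not Datum's signatures, and it leaves out map_error, recover, and flat_map_merge, which have no direct iterator equivalent. All function names are hypothetical.

```rust
// Std-only analogues of some Flow operators (NOT Datum's API).

// filter_map: map and drop the `None` results in a single step.
fn parse_valid(inputs: Vec<&str>) -> Vec<u32> {
    inputs
        .into_iter()
        .filter_map(|s| s.parse::<u32>().ok())
        .collect()
}

// drop(n) then take(m): skip the first `n`, keep the next `m`.
fn window(items: Vec<u32>, n: usize, m: usize) -> Vec<u32> {
    items.into_iter().skip(n).take(m).collect()
}

// take_while: keep elements until the predicate first fails, then stop.
fn leading_below(items: Vec<u32>, limit: u32) -> Vec<u32> {
    items.into_iter().take_while(|x| *x < limit).collect()
}

// flat_map_concat: expand each element into a sub-sequence, keeping order.
fn expand(items: Vec<u32>) -> Vec<u32> {
    items.into_iter().flat_map(|x| vec![x, x * 10]).collect()
}
```

One behaviour worth noticing is that take_while stops at the first failing element and does not resume, so later elements that would pass the predicate are still dropped.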

Next: Materialization

Building a Source → Flow → Sink chain is side-effect-free. To actually run it, see Materialization.
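The same build-then-run split exists in std iterators, which makes it easy to observe. This sketch is an analogy only and assumes nothing about how Datum materializes a graph: it counts how often a map closure runs before and after the chain is consumed. The name build_then_run is hypothetical.

```rust
use std::cell::Cell;

// Returns (closure calls seen right after building, calls seen after running).
fn build_then_run() -> (u32, u32) {
    let calls = Cell::new(0_u32);

    // Building the chain only describes the work; the closure has not run.
    let pipeline = (1_u64..=3).map(|x| {
        calls.set(calls.get() + 1);
        x * 2
    });
    let after_build = calls.get();

    // Consuming the chain is what actually executes the closure.
    let _out: Vec<u64> = pipeline.collect();
    (after_build, calls.get())
}
```

In this model, collect plays the role that materialization plays for a stream: nothing observable happens until the description is handed to something that runs it.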