Materialization

Datum separates building a stream graph from running it. Calling .map(...), .filter(...), .via(...), and .to(...) constructs an immutable blueprint. Execution begins only at materialization.
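
The same build-then-run split can be seen in Rust's standard iterators, which makes for a quick analogy. The sketch below uses only the standard library, not Datum: the `map` closure is stored when the pipeline is built and runs only once `collect` drives it. The function name `lazy_pipeline_demo` is made up for illustration.

```rust
use std::cell::Cell;

/// Builds a lazy pipeline, then runs it, reporting how many times the
/// `map` closure had been called before and after the run.
fn lazy_pipeline_demo() -> (u32, u32, Vec<u64>) {
    let calls = Cell::new(0_u32);

    // Building: nothing executes yet, the closure is only stored.
    let blueprint = (1_u64..=3).map(|x| {
        calls.set(calls.get() + 1);
        x * 10
    });
    let calls_before_run = calls.get();

    // Running: `collect` drives the pipeline and the closure finally fires.
    let items: Vec<u64> = blueprint.collect();
    let calls_after_run = calls.get();

    (calls_before_run, calls_after_run, items)
}
```

Unlike a Datum blueprint, a standard iterator is consumed by running it, so the analogy covers only the "nothing happens until you run it" part.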

run_with — the common path

run_with(sink) materializes a Source with a sink and returns the sink's materialized value:

rust
use datum::{Sink, Source};

// run_with keeps the Sink's materialized value (the right side by default).
let completion = Source::from_iter(1_u64..=3)
    .run_with(Sink::collect())
    .unwrap(); // -> StreamCompletion<Vec<u64>>

let items: Vec<u64> = completion.wait().unwrap();

run_with keeps the sink's materialized value and discards the source's (NotUsed in most cases). For a sink such as Sink::collect, the result has type StreamResult<StreamCompletion<T>>, which has two layers:

  • StreamResult — fails if the stream could not be started (e.g. scheduler shut down).
  • StreamCompletion<T> — a handle to the running stream. Call .wait() to block until done.

StreamCompletion<T>

StreamCompletion<T> is returned as soon as the stream is materialized; the caller does not have to wait for the computation to finish first. The stream may run inline (for small, synchronous, bounded chains) or on a background thread. .wait() blocks until the result is ready and works correctly in either case. The run_with example above shows the full pattern: run_with returns a StreamCompletion, then .wait() retrieves the result.
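
To see why a single .wait() can serve both execution modes, here is a minimal standard-library model of a completion handle. It is not Datum's implementation: `Completion`, `run_inline`, and `run_in_background` are hypothetical names, and the sketch assumes the handle simply owns the receiving end of a channel on which the result arrives.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;

/// Hypothetical stand-in for a completion handle: it owns the receiving
/// end of a channel on which the finished result will arrive.
struct Completion<T> {
    rx: Receiver<T>,
}

impl<T> Completion<T> {
    /// Blocks until the result is available. Returns `None` if the
    /// producer went away without sending a result.
    fn wait(self) -> Option<T> {
        self.rx.recv().ok()
    }
}

/// Runs the work on the calling thread; the result is already queued
/// by the time the handle is returned.
fn run_inline<T>(work: impl FnOnce() -> T) -> Completion<T> {
    let (tx, rx) = mpsc::channel();
    let _ = tx.send(work());
    Completion { rx }
}

/// Runs the work on a background thread; the handle is returned at once.
fn run_in_background<T: Send + 'static>(
    work: impl FnOnce() -> T + Send + 'static,
) -> Completion<T> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let _ = tx.send(work());
    });
    Completion { rx }
}
```

In this model the caller cannot tell which mode was used: `wait` returns the queued value straight away in the inline case and blocks on the channel in the background case.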

Keep — choosing which materialized value to keep

By default, run_with keeps the sink's materialized value. To keep both, or to keep the source's value, use to_mat(sink, combinator) and pass a Keep function:

rust
use datum::{Keep, Sink, Source};

// to_mat(sink, Keep::both) returns a RunnableGraph<(SourceMat, SinkMat)>.
let (_, sink_mat) = Source::from_iter(1_u64..=3)
    .to_mat(Sink::collect(), Keep::both)
    .run()
    .unwrap();

let items: Vec<u64> = sink_mat.wait().unwrap();

The Keep combinators:

Combinator    Returns
Keep::right   Sink's materialized value (default for run_with)
Keep::left    Source's materialized value
Keep::both    (SourceMat, SinkMat) tuple
Keep::none    NotUsed (discard both)

Keep::both and Keep::left are useful when the source itself produces a meaningful handle — for example, a UniqueKillSwitch / SharedKillSwitch or a MergeHub producer port.
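
Conceptually, each Keep combinator is a two-argument function from (left, right) to whatever should be kept. The sketch below models that idea with plain functions. It is an illustration, not Datum's definitions: `keep_left`, `keep_right`, `keep_both`, `keep_none`, `combine`, and the local `NotUsed` type are all hypothetical.

```rust
/// Marker for "no useful materialized value" (hypothetical stand-in).
#[derive(Debug, PartialEq)]
struct NotUsed;

/// Keeps the upstream (source) side.
fn keep_left<L, R>(left: L, _right: R) -> L {
    left
}

/// Keeps the downstream (sink) side.
fn keep_right<L, R>(_left: L, right: R) -> R {
    right
}

/// Keeps both sides as a tuple.
fn keep_both<L, R>(left: L, right: R) -> (L, R) {
    (left, right)
}

/// Discards both sides.
fn keep_none<L, R>(_left: L, _right: R) -> NotUsed {
    NotUsed
}

/// Combines two materialized values with any two-argument combinator.
fn combine<L, R, M>(left: L, right: R, combinator: impl FnOnce(L, R) -> M) -> M {
    combinator(left, right)
}
```

For example, `combine("switch", 42, keep_both)` evaluates to `("switch", 42)`, while `combine("switch", 42, keep_right)` evaluates to `42`.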

RunnableGraph and .run()

to_mat(...) produces a RunnableGraph<Mat>. With Keep::right, calling .run() on it is equivalent to calling run_with on the original source. The same RunnableGraph can be run multiple times to produce independent executions (see Blueprint vs. Run).

rust
use datum::{Keep, Sink, Source};

// `to_mat` builds a RunnableGraph — no execution yet.
let blueprint = Source::from_iter(1_u64..=5)
    .map(|x| x * x)
    .to_mat(Sink::collect(), Keep::right);

// Each call to `run()` starts a completely independent execution.
let run1: Vec<u64> = blueprint.run().unwrap().wait().unwrap();
let run2: Vec<u64> = blueprint.run().unwrap().wait().unwrap();
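
A reusable blueprint can be modeled as a stored recipe that is borrowed, not consumed, on each run. The following standard-library sketch assumes nothing about Datum's internals: `Blueprint`, its `run` method, and `squares_twice` are hypothetical names.

```rust
/// Hypothetical stand-in for a runnable blueprint: it stores a recipe
/// (a closure) and performs no work until `run` is called.
struct Blueprint<F> {
    recipe: F,
}

impl<F> Blueprint<F> {
    fn new(recipe: F) -> Self {
        Blueprint { recipe }
    }

    /// Borrows the blueprint, so it can be run any number of times.
    /// Each call executes the recipe from scratch.
    fn run<T>(&self) -> T
    where
        F: Fn() -> T,
    {
        (self.recipe)()
    }
}

/// Runs the same blueprint twice and returns both results.
fn squares_twice() -> (Vec<u64>, Vec<u64>) {
    let blueprint = Blueprint::new(|| (1_u64..=5).map(|x| x * x).collect::<Vec<u64>>());
    let run1 = blueprint.run();
    let run2 = blueprint.run();
    (run1, run2)
}
```

Because `run` takes `&self`, the two calls share the recipe but no state, so each produces its own freshly computed vector.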

Materialized values in graphs

In the linear DSL (Source → Flow → Sink), only sources and sinks carry meaningful materialized values. Flow always materializes to NotUsed.

In the GraphDSL layer, junctions like MergeHub and BroadcastHub expose producer/consumer ports as materialized values. Combining them with Keep or a custom combinator lets you thread those handles out to the caller.
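
A custom combinator can be pictured as a function with the same two-in, one-out shape as a Keep function that returns a named struct in place of a tuple. The sketch below rests on that assumption and does not use Datum's real port types: `ProducerPort`, `ConsumerPort`, `Handles`, and `into_handles` are hypothetical.

```rust
/// Hypothetical handle standing in for a hub's producer port.
#[derive(Debug, PartialEq)]
struct ProducerPort(u32);

/// Hypothetical handle standing in for a hub's consumer port.
#[derive(Debug, PartialEq)]
struct ConsumerPort(u32);

/// What the caller wants back after materialization.
#[derive(Debug, PartialEq)]
struct Handles {
    producer: ProducerPort,
    consumer: ConsumerPort,
}

/// A custom combinator: two materialized values in, one named struct out.
fn into_handles(producer: ProducerPort, consumer: ConsumerPort) -> Handles {
    Handles { producer, consumer }
}
```

Naming the fields keeps call sites readable when several handles are threaded out of a larger graph, where nested tuples would otherwise pile up.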

Summary

Call                                     Returns                               Keeps
source.run_with(sink)                    StreamResult<SinkMat>                 sink's mat
source.to_mat(sink, Keep::left).run()    StreamResult<SourceMat>               source's mat
source.to_mat(sink, Keep::both).run()    StreamResult<(SourceMat, SinkMat)>    both
source.to_mat(sink, Keep::none).run()    StreamResult<NotUsed>                 neither