Materialization
Datum separates building a stream graph from running it. Calling .map(...), .filter(...), .via(...), and .to(...) constructs an immutable blueprint. Execution begins only at materialization.
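The split between building and running can be illustrated without Datum at all. The sketch below is a toy model using only the standard library: `Blueprint`, `from_vec`, and `demo_lazy` are hypothetical names invented for this illustration and are not part of Datum's API. It records `map` and `filter` steps as closures and executes them only when `run` is called.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Hypothetical stand-in for a stream blueprint: it stores the steps
/// to apply but does no work until `run` is called.
struct Blueprint {
    input: Vec<u64>,
    steps: Vec<Box<dyn Fn(u64) -> Option<u64>>>,
}

impl Blueprint {
    fn from_vec(input: Vec<u64>) -> Self {
        Blueprint { input, steps: Vec::new() }
    }

    /// Records a mapping step; nothing is executed here.
    fn map(mut self, f: impl Fn(u64) -> u64 + 'static) -> Self {
        self.steps.push(Box::new(move |x: u64| Some(f(x))));
        self
    }

    /// Records a filtering step; nothing is executed here.
    fn filter(mut self, p: impl Fn(&u64) -> bool + 'static) -> Self {
        self.steps
            .push(Box::new(move |x: u64| if p(&x) { Some(x) } else { None }));
        self
    }

    /// "Materializes" the blueprint: only now do the recorded steps run.
    /// Taking `&self` lets the same blueprint be run more than once.
    fn run(&self) -> Vec<u64> {
        self.input
            .iter()
            .filter_map(|&x| self.steps.iter().try_fold(x, |acc, step| step(acc)))
            .collect()
    }
}

/// Builds a blueprint whose map step counts its invocations, then reports
/// the counter before running, after one run, and after two runs.
fn demo_lazy() -> (usize, usize, usize, Vec<u64>) {
    let calls = Rc::new(Cell::new(0_usize));
    let counter = Rc::clone(&calls);
    let blueprint = Blueprint::from_vec(vec![1, 2, 3, 4])
        .map(move |x| {
            counter.set(counter.get() + 1);
            x * 10
        })
        .filter(|x| *x > 10);

    let before = calls.get(); // construction alone ran no step
    let out = blueprint.run();
    let after_one = calls.get();
    let _ = blueprint.run(); // a second, independent execution
    let after_two = calls.get();
    (before, after_one, after_two, out)
}
```

In this toy, the counter stays at zero until the first `run`, and each later `run` repeats the work from scratch. This is the same blueprint-then-execute shape described above, although Datum's real stages are asynchronous and considerably richer.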
run_with — the common path
run_with(sink) materializes a Source with a sink and returns the sink's materialized value:
rust
use datum::{Sink, Source};
// run_with keeps the Sink's materialized value (the right side by default).
let completion = Source::from_iter(1_u64..=3)
    .run_with(Sink::collect())
    .unwrap(); // -> StreamCompletion<Vec<u64>>
let items: Vec<u64> = completion.wait().unwrap();
run_with keeps the sink's materialized value and discards the source's (NotUsed in most cases). The result is wrapped in StreamResult<StreamCompletion<T>>:
- StreamResult: fails if the stream could not be started (e.g. the scheduler has shut down).
- StreamCompletion<T>: a handle to the running stream. Call .wait() to block until done.
StreamCompletion<T>
StreamCompletion<T> is returned immediately at materialization, before the computation finishes. The stream may run inline (for small, synchronous, bounded chains) or on a background thread. .wait() blocks until the result is ready and works correctly in either case. The run_with example above shows the full pattern: run_with returns a StreamCompletion, then .wait() retrieves the result.
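To make the "handle first, result later" idea concrete, here is a minimal sketch built on a standard-library channel. It does not show how Datum implements StreamCompletion. `Completion`, `StartError`, `run_inline`, `run_background`, `try_start`, and `sum_to` are all hypothetical names chosen for this illustration. The point is that one `wait()` method can serve both an inline run and a background run, and that a start failure is a separate, outer layer.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;

/// Hypothetical stand-in for a completion handle: it owns the receiving
/// end of a channel that will carry the final result exactly once.
struct Completion<T> {
    rx: Receiver<T>,
}

impl<T> Completion<T> {
    /// Blocks until the result is available. Returns an error if the
    /// producer went away without sending one.
    fn wait(self) -> Result<T, mpsc::RecvError> {
        self.rx.recv()
    }
}

/// Hypothetical "could not start" error, modelling the outer result layer.
#[derive(Debug, PartialEq)]
struct StartError;

/// Runs `work` inline on the caller's thread; the result is already
/// buffered in the channel by the time the handle is returned.
fn run_inline<T>(work: impl FnOnce() -> T) -> Completion<T> {
    let (tx, rx) = mpsc::channel();
    // The receiver is still alive here, so this send cannot fail.
    let _ = tx.send(work());
    Completion { rx }
}

/// Runs `work` on a background thread; the handle is returned right away.
fn run_background<T: Send + 'static>(
    work: impl FnOnce() -> T + Send + 'static,
) -> Completion<T> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // Ignore the case where the handle was dropped before completion.
        let _ = tx.send(work());
    });
    Completion { rx }
}

/// Outer layer: refuses to start when the (pretend) scheduler is down.
fn try_start<T: Send + 'static>(
    scheduler_up: bool,
    work: impl FnOnce() -> T + Send + 'static,
) -> Result<Completion<T>, StartError> {
    if scheduler_up {
        Ok(run_background(work))
    } else {
        Err(StartError)
    }
}

/// Small workload used by the examples: 1 + 2 + ... + n.
fn sum_to(n: u64) -> u64 {
    (1..=n).sum()
}
```

A caller would write something like `try_start(true, || sum_to(100))?.wait()`: the `?` handles the start failure, and `wait()` handles the running computation. These are the same two layers as the StreamResult and StreamCompletion pair described above.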
Keep — choosing which materialized value to keep
By default, run_with keeps the sink's materialized value. To keep both, or to keep the source's value, use to_mat(sink, combinator) and pass a Keep function:
rust
use datum::{Keep, Sink, Source};
// to_mat(sink, Keep::both) returns a RunnableGraph<(SourceMat, SinkMat)>.
let (_, sink_mat) = Source::from_iter(1_u64..=3)
    .to_mat(Sink::collect(), Keep::both)
    .run()
    .unwrap();
let items: Vec<u64> = sink_mat.wait().unwrap();
The Keep combinators:
| Combinator | Returns |
|---|---|
| Keep::right | Sink's materialized value (default for run_with) |
| Keep::left | Source's materialized value |
| Keep::both | (SourceMat, SinkMat) tuple |
| Keep::none | NotUsed (discard both) |
Keep::both and Keep::left are useful when the source itself produces a meaningful handle — for example, a UniqueKillSwitch / SharedKillSwitch or a MergeHub producer port.
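Conceptually, each combinator is just a two-argument function over the left and right materialized values. The sketch below shows that idea with plain functions. `keep_left`, `keep_right`, `keep_both`, `keep_none`, and `combine` are hypothetical names for this illustration, and the toy `keep_none` returns `()` where Datum returns NotUsed. Datum's actual signatures may differ.

```rust
/// Toy versions of the four combinators: each receives the upstream (left)
/// and downstream (right) materialized values and decides what survives.
fn keep_left<L, R>(left: L, _right: R) -> L {
    left
}

fn keep_right<L, R>(_left: L, right: R) -> R {
    right
}

fn keep_both<L, R>(left: L, right: R) -> (L, R) {
    (left, right)
}

/// Discards both values; `()` plays the role of NotUsed in this toy.
fn keep_none<L, R>(_left: L, _right: R) {}

/// Hypothetical `to_mat`-style helper: given both materialized values,
/// it lets the supplied combinator choose the final one.
fn combine<L, R, M>(source_mat: L, sink_mat: R, keep: impl FnOnce(L, R) -> M) -> M {
    keep(source_mat, sink_mat)
}
```

Because the combinator is an ordinary function in this model, a closure such as `|l, r| l + r` works just as well as the four predefined ones. This is one way to picture what a custom combinator does.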
RunnableGraph and .run()
to_mat(...) produces a RunnableGraph<Mat>. With Keep::right, calling .run() on it is equivalent to calling run_with(sink) on the original source. The same RunnableGraph can be run multiple times to produce independent executions (see Blueprint vs. Run).
rust
use datum::{Keep, Sink, Source};
// `to_mat` builds a RunnableGraph — no execution yet.
let blueprint = Source::from_iter(1_u64..=5)
    .map(|x| x * x)
    .to_mat(Sink::collect(), Keep::right);
// Each call to `run()` starts a completely independent execution.
let run1: Vec<u64> = blueprint.run().unwrap().wait().unwrap();
let run2: Vec<u64> = blueprint.run().unwrap().wait().unwrap();
Materialized values in graphs
In the linear DSL (Source → Flow → Sink), only sources and sinks carry meaningful materialized values. Flow always materializes to NotUsed.
In the GraphDSL layer, junctions like MergeHub and BroadcastHub expose producer/consumer ports as materialized values. Combining them with Keep or a custom combinator lets you thread those handles out to the caller.
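As a rough picture of a handle being "threaded out" to the caller, the sketch below models a hub-like materialization with standard-library channels and threads. It is not Datum's MergeHub. `materialize_hub` and `demo_hub` are hypothetical names, and the returned pair merely mimics a Keep::both result: a producer port on the left and a completion-like handle on the right.

```rust
use std::sync::mpsc::{self, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical hub-like materialization: starting the "graph" yields a
/// producer port (left) and a handle to the collected output (right).
fn materialize_hub() -> (Sender<u64>, JoinHandle<Vec<u64>>) {
    let (port, inbox) = mpsc::channel::<u64>();
    // The consumer runs until every clone of the producer port is dropped.
    let collected = thread::spawn(move || inbox.into_iter().collect::<Vec<u64>>());
    (port, collected)
}

/// Two independent producers push into the same running consumer through
/// clones of the port that materialization handed back.
fn demo_hub() -> Vec<u64> {
    let (port, collected) = materialize_hub();
    let second = port.clone();

    let a = thread::spawn(move || {
        for x in [1, 2] {
            port.send(x).unwrap();
        }
    });
    let b = thread::spawn(move || {
        for x in [10, 20] {
            second.send(x).unwrap();
        }
    });
    a.join().unwrap();
    b.join().unwrap();

    // Both ports were dropped with their threads, so the consumer finishes.
    let mut out = collected.join().unwrap();
    out.sort_unstable(); // arrival order across producers is not fixed
    out
}
```

The caller never touches the consumer directly. It only uses the two values that materialization returned, which is the role a combinator plays when it decides which handles reach the caller.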
Summary
| Call | Returns | Keeps |
|---|---|---|
| source.run_with(sink) | StreamResult<SinkMat> | sink's mat |
| source.to_mat(sink, Keep::left).run() | StreamResult<SourceMat> | source's mat |
| source.to_mat(sink, Keep::both).run() | StreamResult<(SourceMat, SinkMat)> | both |
| source.to_mat(sink, Keep::none).run() | StreamResult<NotUsed> | neither |