First Stream

Let's build a stream that sums even numbers from a range. The full example compiles and asserts its output — every code block in this documentation is imported directly from a cargo test–verified integration test.

rust
use datum::{Sink, Source};

// Build a stream blueprint: iterate 0..1_000, shift by 1, keep evens, sum.
// Nothing executes yet — `run_with` below triggers materialization.
let sum = Source::from_iter(0_u64..1_000)
    .map(|item| item + 1)
    .filter(|item| item % 2 == 0)
    .run_with(Sink::fold(0_u64, |acc, item| acc + item))
    .unwrap() // StreamResult<StreamCompletion<u64>> -> StreamCompletion<u64>
    .wait() // blocks until the stream finishes
    .unwrap(); // StreamResult<u64> -> u64

assert_eq!(sum, 250_500);

Line by line

Source::from_iter(0_u64..1_000)

Creates a blueprint — a Source<u64, NotUsed> that, when run, will emit the integers 0, 1, 2, …, 999. Nothing executes yet. No thread is spawned. The range iterator is stored lazily so the same blueprint can be materialized multiple times.

.map(|item| item + 1)

Appends a synchronous map operator to the source blueprint. Returns a new Source<u64, NotUsed> — the original is consumed, not mutated. No allocation; the chain fuses into a single pulled iterator at materialization time.
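The "nothing runs until materialization" behaviour has a close analogue in the standard library's lazy iterators. The following is a minimal sketch using only `std`, not datum; the function name `lazy_map_demo` and the call counter are hypothetical, introduced purely to make the laziness observable.

```rust
use std::cell::Cell;

/// Returns (closure calls before consuming, closure calls after consuming, sum).
fn lazy_map_demo() -> (u32, u32, u64) {
    let calls = Cell::new(0_u32);

    // Describe the pipeline. The map closure counts how often it is invoked.
    let pipeline = (0_u64..10)
        .map(|item| {
            calls.set(calls.get() + 1);
            item + 1
        })
        .filter(|item| item % 2 == 0);

    // Building the chain has not invoked the closure yet.
    let before = calls.get();

    // Consuming the iterator is what drives the closure, once per element.
    let sum: u64 = pipeline.sum();
    let after = calls.get();

    (before, after, sum)
}

fn main() {
    let (before, after, sum) = lazy_map_demo();
    println!("before={before} after={after} sum={sum}");
}
```

The standard-library iterator plays the role of the blueprint here only by analogy; datum's own fusion and scheduling are not modelled.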

.filter(|item| item % 2 == 0)

Appends a filter predicate. Elements that fail the test are dropped before reaching downstream.
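As a cross-check on the asserted value, the same map-then-filter pipeline can be written with plain standard-library iterators. This sketch does not use datum, and the function names `map_then_filter` and `filter_then_map` are made up for illustration. It also shows that operator order matters: swapping the two stages produces a different sum.

```rust
/// Shift by 1, keep evens, sum: the same operator order as the stream example.
fn map_then_filter() -> u64 {
    (0_u64..1_000)
        .map(|item| item + 1)
        .filter(|item| item % 2 == 0)
        .sum()
}

/// Keep evens first, then shift by 1: a different pipeline with a different sum.
fn filter_then_map() -> u64 {
    (0_u64..1_000)
        .filter(|item| item % 2 == 0)
        .map(|item| item + 1)
        .sum()
}

fn main() {
    // 2 + 4 + ... + 1000
    assert_eq!(map_then_filter(), 250_500);
    // 1 + 3 + ... + 999
    assert_eq!(filter_then_map(), 250_000);
    println!("{} {}", map_then_filter(), filter_then_map());
}
```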

.run_with(Sink::fold(0_u64, |acc, item| acc + item))

run_with materializes the blueprint — this is where execution begins. It:

  1. Takes a Sink::fold as the terminal consumer.
  2. Fuses the entire Source → map → filter → Sink chain.
  3. Spawns a stream task (or runs inline for small bounded sources) on the Datum thread pool.
  4. Returns StreamResult<StreamCompletion<u64>> immediately.

run_with keeps the sink's materialized value (the fold accumulator). The source's materialized value (NotUsed) is discarded. See Materialization for how to keep both.

.unwrap() → StreamCompletion<u64>

Unwraps the StreamResult, panicking if the stream could not be started (e.g. the thread pool is shut down). The returned StreamCompletion<u64> is a handle to the running stream: it exists before the computation finishes.

.wait() → StreamResult<u64>

Blocks the calling thread until the stream finishes and returns its result. For the inline fast path, the stream may already be done by the time .wait() is called.

.unwrap() → u64

Unwraps the stream result, panicking if the stream itself failed (e.g. a map function panicked or returned an error).
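The two-stage shape "could it start?" then "did it succeed?" also appears in the standard library's thread API, which may help build intuition. The sketch below is an analogy only, not how datum is implemented; `spawn_and_sum` and `join_reports_panic` are hypothetical helper names.

```rust
use std::thread;

/// Start a background computation and collect its result in two fallible steps.
fn spawn_and_sum() -> u64 {
    thread::Builder::new()
        .spawn(|| (1_u64..=1_000).filter(|n| n % 2 == 0).sum::<u64>())
        .unwrap() // io::Result<JoinHandle<u64>> -> JoinHandle<u64>: could the work start?
        .join() // blocks until the thread finishes
        .unwrap() // thread::Result<u64> -> u64: did the work succeed?
}

/// A panic inside the spawned work surfaces as an Err at the second step.
fn join_reports_panic() -> bool {
    let handle = thread::Builder::new()
        .spawn(|| -> u64 { panic!("stage failed") })
        .unwrap(); // starting the thread still succeeds
    handle.join().is_err()
}

fn main() {
    println!("sum = {}", spawn_and_sum());
    println!("panic surfaced at join: {}", join_reports_panic());
}
```

In real code, matching on each result or propagating it with `?` is usually preferable to calling `unwrap` twice; the example above keeps `unwrap` only to mirror the chain being explained.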

Blueprint vs. run

Notice that .map(...) and .filter(...) do not execute anything — they extend the blueprint. Only run_with triggers execution. The same blueprint can be .run()-ed multiple times to produce independent, isolated stream executions. This is a core invariant; see Blueprint vs. Run.
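One way to picture the blueprint/run split without any datum types is a function that builds a fresh lazy iterator on every call. This is a standard-library analogy under that assumption, with a hypothetical `blueprint` function; it says nothing about how datum stores or re-materializes its stages.

```rust
/// The "blueprint": calling this only describes the pipeline.
/// No element is produced until the returned iterator is consumed.
fn blueprint() -> impl Iterator<Item = u64> {
    (0_u64..1_000)
        .map(|item| item + 1)
        .filter(|item| item % 2 == 0)
}

/// A "run": consume one fresh instance of the pipeline.
fn run_once() -> u64 {
    blueprint().sum()
}

fn main() {
    // Two runs of the same description are independent and give the same result.
    let first = run_once();
    let second = run_once();
    assert_eq!(first, second);
    println!("{first} {second}");
}
```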

Next steps