Appearance
Testing Streams
datum::testkit provides TestSource and TestSink probes for deterministic, synchronous stream testing without mocking. Tests drive the stream step by step using explicit credit — no elements move until the test asks for them.
Setting up probes
rust
use datum::testkit::{TestSink, TestSource};TestSource::probe::<T>() returns a Source<T, TestPublisherProbe<T>>. The materialized value is a TestPublisherProbe<T> that lets the test control what the source emits.
TestSink::probe::<T>() returns a Sink<T, TestSubscriberProbe<T>>. The materialized value is a TestSubscriberProbe<T> that lets the test control demand and inspect results.
Use Keep::both with to_mat and run_with_materializer to retain both probes:
rust
use datum::{
Keep, Materializer,
testkit::{TestSink, TestSource},
};
// Create a materializer, wire TestSource through a map into TestSink,
// and keep both probes with Keep::both.
let materializer = Materializer::new();
let (source, sink) = TestSource::probe::<i32>()
.map(|x| x * 10)
.to_mat(TestSink::probe(), Keep::both)
.run_with_materializer(&materializer)
.expect("probe graph materializes");
// Subscriber issues credit; publisher sees one pull per element.
sink.request(1);
assert_eq!(source.expect_request(), 1);
source.send_next(3);
sink.assert_next(30);
sink.request(1);
assert_eq!(source.expect_request(), 1);
source.send_next(7);
sink.assert_next(70);
// Publisher completes; subscriber observes completion after one more pull.
sink.request(1);
assert_eq!(source.expect_request(), 1);
source.send_complete();
sink.expect_complete();When you only need the subscriber side — for example when the source drives itself — use run_with directly:
rust
let sink = Source::from_iter(1_i32..=4)
.run_with(TestSink::probe())
.expect("materializes");Publisher probe — controlling the upstream
TestPublisherProbe<T> methods:
| Method | Description |
|---|---|
send_next(element) | Enqueue one element to be delivered on the next pull |
send_complete() | Signal completion; no further sends are allowed |
send_error(error) | Signal failure with the given StreamError |
expect_request() | Block until downstream issues one pull; returns 1 |
expect_cancellation() | Block until downstream cancels |
set_timeout(duration) | Override the default 3-second wait timeout |
expect_request() returns 1 per pull in Datum's current pull model, not batched demand as in Akka Streams. Tests must not assume batching.
send_complete() and send_error() mark the source as terminated; any subsequent send_* call on the same probe panics.
Subscriber probe — controlling downstream demand
TestSubscriberProbe<T> methods:
| Method | Description |
|---|---|
request(n) | Issue n pull credits; the worker consumes one credit per element |
expect_next() | Block until one element arrives; returns it |
assert_next(expected) | Like expect_next() but asserts the value |
expect_next_n(n) | Block and collect the next n elements |
assert_next_n(expected_iter) | Like expect_next_n but asserts the values |
expect_complete() | Block until stream completion is observed |
expect_error() | Block until a stream error is observed; returns the StreamError |
expect_no_message(timeout) | Assert nothing arrives within timeout |
drain_until_complete() | Issue large credit, drain all elements, return Vec<T> |
cancel() | Cancel the stream cooperatively |
set_timeout(duration) | Override the default 3-second wait timeout |
The simpler probe-driven pattern:
rust
use datum::{Source, testkit::TestSink};
// Drive a finite iterator source through a TestSink probe.
let sink = Source::from_iter(1_i32..=4)
.run_with(TestSink::probe())
.expect("sink probe materializes");
// Request two elements, assert both arrive.
sink.request(2);
sink.assert_next_n([1, 2]);
// Request and consume the third element individually.
sink.request(1);
let third = sink.expect_next();
assert_eq!(third, 3);
// Request and consume the fourth (last) element.
sink.request(1);
assert_eq!(sink.expect_next(), 4);
// Completion is observed on the next pull after the source is exhausted.
sink.request(1);
sink.expect_complete();Asserting elements — free functions
datum::testkit also exports two free functions for use outside probe methods:
rust
use datum::testkit::{assert_next_eq, assert_next_n_eq};
assert_next_eq(&actual, &expected);
assert_next_n_eq(&actual_slice, &expected_slice);Both panic with a descriptive message that includes both the actual and expected values.
Error testing
Terminal signals are observed through the pull loop; a failed source still requires request(1) before expect_error():
rust
use datum::{Source, StreamError, testkit::TestSink};
// A source that fails immediately.
let sink = Source::<i32>::failed(StreamError::Failed("boom".into()))
.run_with(TestSink::probe())
.expect("sink probe materializes");
// Terminal signals require outstanding credit: issue request(1) first.
sink.request(1);
let err = sink.expect_error();
assert_eq!(err, StreamError::Failed("boom".into()));Cancellation testing
cancel() is cooperative with the pull loop. The worker observes cancellation at its next credit wait. If the worker is blocked inside upstream next(), it unblocks when the upstream probe is dropped.
After calling sink.cancel(), the publisher probe should see cancellation:
rust
use datum::{
Keep, Materializer,
testkit::{TestSink, TestSource},
};
// Wire TestSource directly into TestSink — no intermediate operators.
let materializer = Materializer::new();
let (source, mut sink) = TestSource::probe::<i32>()
.to_mat(TestSink::probe(), Keep::both)
.run_with_materializer(&materializer)
.expect("probe graph materializes");
// Cancellation is cooperative: the worker observes it at the next credit wait.
sink.cancel();
// The publisher probe should now see cancellation arrive from downstream.
source.expect_cancellation();Draining all elements
drain_until_complete() issues request(usize::MAX / 2) internally and returns every element until the stream completes:
rust
use datum::{Source, testkit::TestSink};
// drain_until_complete issues a large credit internally and returns all elements.
let sink = Source::from_iter(1_i32..=5)
.run_with(TestSink::probe())
.expect("sink probe materializes");
let items = sink.drain_until_complete();
assert_eq!(items, vec![1, 2, 3, 4, 5]);Use drain_until_complete() when the exact element count is not known ahead of time.
Timeouts
Both probe types default to a 3-second wait timeout. Override it with set_timeout:
rust
let (mut source, mut sink) = TestSource::probe::<i32>()
.to_mat(TestSink::probe(), Keep::both)
.run_with_materializer(&materializer)
.expect("materializes");
source.set_timeout(Duration::from_millis(200));
sink.set_timeout(Duration::from_millis(200));A timeout expires with a panic that names what was waited for and how long elapsed.
Pull semantics note
Datum's probes observe terminal signals through the normal pull loop. Completion and errors are not surfaced until a credit is outstanding. For any stream that ends immediately — empty sources, Source::failed(...) — always call request(1) before expect_complete() or expect_error().
The testkit does not issue speculative pulls on behalf of the test. That would consume elements the test never requested, making assertions order-sensitive in ways that are hard to reason about.
Next steps
- Error Handling —
recover,map_error, supervision, and restart wrappers - Execution Model — how blueprints materialize and where work runs