Appearance
Error Handling
In Datum, failures flow through StreamResult<T> (Result<T, StreamError>), not panics. A failed stream emits an error and then terminates — subsequent operators do not receive any more elements. By default that error propagates to the caller through StreamCompletion::wait().
StreamError variants
StreamError is an enum of recognized failure modes:
| Variant | When it occurs |
|---|---|
Failed(String) | User-generated failure (most Source::failed(...) calls) |
Cancelled | The stream was cancelled via a kill switch |
AbruptTermination | The materializer shut down before the stream finished |
LimitExceeded { max } | A limit(max) or limit_weighted bound was exceeded |
EmptyStream | Sink::head or Sink::last ran on an empty stream |
ActorAskTimeout | An ActorFlow::ask call exceeded its timeout |
recover
recover(f) intercepts the first error and optionally emits a fallback element. The stream then completes normally:
rust
use datum::{Sink, Source, StreamError};
// recover(f) catches the first stream error and emits an optional fallback element.
// The stream completes normally after the fallback is emitted.
let items: Vec<u64> = Source::from_iter(1_u64..=2)
.concat(Source::failed(StreamError::Failed(
"simulated error".into(),
)))
.recover(|_err| Some(0_u64))
.run_with(Sink::collect())
.unwrap()
.wait()
.unwrap();If f returns None, the original error is re-emitted and the stream fails.
recover_with(f) replaces the error with a Source<Out> fallback — the fallback source's elements are emitted and the stream continues from there:
rust
source.recover_with(|_err| Some(Source::from_iter([99_u64, 100])))recover_with_retries(retries, f) re-attempts the fallback source up to retries times.
on_error_complete() swallows the first error and completes the stream. Note that this is not equivalent to recover(|_| None) — the latter re-emits the original error and fails.
map_error
map_error(f) transforms an error value without recovering from it — the stream still fails, but with a different (typically more descriptive) StreamError:
rust
use datum::{Sink, Source, StreamError};
// map_error(f) transforms the error type without recovering — the stream still fails,
// but with a different (typically more informative) error value.
let result: Result<Vec<u64>, _> = Source::failed(StreamError::Failed("raw".into()))
.map_error(|e| StreamError::Failed(format!("wrapped: {e}")))
.run_with(Sink::collect())
.unwrap()
.wait();Supervision
For operators that apply a user closure to each element (map, filter, fold, etc.), Datum provides supervised variants that consult a SupervisionDecider when the closure fails:
rust
use datum::{Supervision, SupervisionDirective};
let decider = Supervision::resuming_decider(); // or restarting_decider() / stopping_decider()
source.via(
Flow::identity()
.map_result_with_supervision(|x| {
if x == 0 { Err(StreamError::Failed("zero".into())) }
else { Ok(1_u64 / x) }
}, decider)
)SupervisionDirective values:
| Directive | Behavior |
|---|---|
Stop | Fail the stream (default without supervision) |
Resume | Drop the failing element and continue |
Restart | Drop the failing element and reset any accumulated state |
Supervision::stopping_decider(), resuming_decider(), and restarting_decider() are convenience constructors. Custom deciders are any Arc<dyn Fn(&StreamError) -> SupervisionDirective + Send + Sync>.
Supervised variants are available on Flow for: map_result_with_supervision, filter_map_result_with_supervision, fold_result_with_supervision, scan_result_with_supervision, reduce_result_with_supervision, and foreach_result_with_supervision.
RestartSource, RestartFlow, RestartSink
These three types wrap a factory function and rematerialize the inner component when it fails (or also on clean completion, depending on the variant). They mirror Akka's RestartSource.withBackoff / RestartFlow.withBackoff / RestartSink.withBackoff.
Configure backoff with RestartSettings:
rust
use std::time::Duration;
use datum::RestartSettings;
let settings = RestartSettings::new(
Duration::from_millis(100), // min_backoff
Duration::from_secs(30), // max_backoff
0.2, // random_factor (adds up to 20% extra delay)
)
.with_max_restarts(5, Duration::from_secs(60)); // at most 5 restarts within 60 sRestartSource::on_failures_with_backoff restarts only on failure; a clean completion flows through unchanged. RestartSource::with_backoff restarts on both failure and completion:
rust
use datum::{RestartSettings, RestartSource, Sink};
use std::time::Duration;
let settings = RestartSettings::new(
Duration::from_millis(10), // min backoff
Duration::from_millis(50), // max backoff
0.1, // 10% random jitter
);
// on_failures_with_backoff rematerializes the factory only when the inner
// source fails; a clean completion flows through unchanged.
let items: Vec<u64> =
RestartSource::on_failures_with_backoff(settings, || datum::Source::from_iter(1_u64..=3))
.take(3)
.run_with(Sink::collect())
.unwrap()
.wait()
.unwrap();RestartFlow and RestartSink follow the same pattern with matching with_backoff and on_failures_with_backoff constructors.
Note: Restart flows are lossy across restarts. The element that triggers the wrapped flow failure — and any elements already buffered in the restarted flow — are dropped when the restart occurs.
RetryFlow
RetryFlow::with_backoff retries individual elements through a wrapped one-in / one-out Flow when a caller-supplied decider says to retry:
rust
use std::time::Duration;
use datum::{Flow, RetryFlow};
let retrying = RetryFlow::with_backoff(
Duration::from_millis(10), // min_backoff
Duration::from_millis(100), // max_backoff
0.1, // random_factor
3, // max_retries
Flow::identity::<String>().map(|s| s.to_uppercase()),
|original: &String, result: &String| {
// Return Some(retry_input) to retry, None to accept the result.
if result.is_empty() { Some(original.clone()) } else { None }
},
);The wrapped flow must emit exactly one element per input; Datum validates this at runtime and fails the stream otherwise.
Summary
| Mechanism | What it does |
|---|---|
recover(f) | Emit optional fallback element, then complete normally |
recover_with(f) | Switch to a fallback source |
on_error_complete() | Silently swallow the error and complete |
map_error(f) | Transform the error; stream still fails |
*_with_supervision(…, decider) | Per-element Resume / Restart / Stop |
RestartSource/Flow/Sink | Rematerialize on failure with exponential backoff |
RetryFlow | Per-element retry with backoff |
Next steps
- Substreams — group_by, split_when, flat_map_concat
- Testing Streams — asserting on stream output and failures