Skip to content

Error Handling

In Datum, failures flow through StreamResult<T> (Result<T, StreamError>), not panics. A failed stream emits an error and then terminates — subsequent operators do not receive any more elements. By default that error propagates to the caller through StreamCompletion::wait().

StreamError variants

StreamError is an enum of recognized failure modes:

VariantWhen it occurs
Failed(String)User-generated failure (most Source::failed(...) calls)
CancelledThe stream was cancelled via a kill switch
AbruptTerminationThe materializer shut down before the stream finished
LimitExceeded { max }A limit(max) or limit_weighted bound was exceeded
EmptyStreamSink::head or Sink::last ran on an empty stream
ActorAskTimeoutAn ActorFlow::ask call exceeded its timeout

recover

recover(f) intercepts the first error and optionally emits a fallback element. The stream then completes normally:

rust
use datum::{Sink, Source, StreamError};

// recover(f) catches the first stream error and emits an optional fallback element.
// The stream completes normally after the fallback is emitted.
let items: Vec<u64> = Source::from_iter(1_u64..=2)
    .concat(Source::failed(StreamError::Failed(
        "simulated error".into(),
    )))
    .recover(|_err| Some(0_u64))
    .run_with(Sink::collect())
    .unwrap()
    .wait()
    .unwrap();

If f returns None, the original error is re-emitted and the stream fails.

recover_with(f) replaces the error with a Source<Out> fallback — the fallback source's elements are emitted and the stream continues from there:

rust
source.recover_with(|_err| Some(Source::from_iter([99_u64, 100])))

recover_with_retries(retries, f) re-attempts the fallback source up to retries times.

on_error_complete() swallows the first error and completes the stream. Note that this is not equivalent to recover(|_| None) — the latter re-emits the original error and fails.

map_error

map_error(f) transforms an error value without recovering from it — the stream still fails, but with a different (typically more descriptive) StreamError:

rust
use datum::{Sink, Source, StreamError};

// map_error(f) transforms the error type without recovering — the stream still fails,
// but with a different (typically more informative) error value.
let result: Result<Vec<u64>, _> = Source::failed(StreamError::Failed("raw".into()))
    .map_error(|e| StreamError::Failed(format!("wrapped: {e}")))
    .run_with(Sink::collect())
    .unwrap()
    .wait();

Supervision

For operators that apply a user closure to each element (map, filter, fold, etc.), Datum provides supervised variants that consult a SupervisionDecider when the closure fails:

rust
use datum::{Supervision, SupervisionDirective};

let decider = Supervision::resuming_decider(); // or restarting_decider() / stopping_decider()
source.via(
    Flow::identity()
        .map_result_with_supervision(|x| {
            if x == 0 { Err(StreamError::Failed("zero".into())) }
            else { Ok(1_u64 / x) }
        }, decider)
)

SupervisionDirective values:

DirectiveBehavior
StopFail the stream (default without supervision)
ResumeDrop the failing element and continue
RestartDrop the failing element and reset any accumulated state

Supervision::stopping_decider(), resuming_decider(), and restarting_decider() are convenience constructors. Custom deciders are any Arc<dyn Fn(&StreamError) -> SupervisionDirective + Send + Sync>.

Supervised variants are available on Flow for: map_result_with_supervision, filter_map_result_with_supervision, fold_result_with_supervision, scan_result_with_supervision, reduce_result_with_supervision, and foreach_result_with_supervision.

RestartSource, RestartFlow, RestartSink

These three types wrap a factory function and rematerialize the inner component when it fails (or also on clean completion, depending on the variant). They mirror Akka's RestartSource.withBackoff / RestartFlow.withBackoff / RestartSink.withBackoff.

Configure backoff with RestartSettings:

rust
use std::time::Duration;
use datum::RestartSettings;

let settings = RestartSettings::new(
    Duration::from_millis(100),  // min_backoff
    Duration::from_secs(30),     // max_backoff
    0.2,                         // random_factor (adds up to 20% extra delay)
)
.with_max_restarts(5, Duration::from_secs(60)); // at most 5 restarts within 60 s

RestartSource::on_failures_with_backoff restarts only on failure; a clean completion flows through unchanged. RestartSource::with_backoff restarts on both failure and completion:

rust
use datum::{RestartSettings, RestartSource, Sink};
use std::time::Duration;

let settings = RestartSettings::new(
    Duration::from_millis(10), // min backoff
    Duration::from_millis(50), // max backoff
    0.1,                       // 10% random jitter
);

// on_failures_with_backoff rematerializes the factory only when the inner
// source fails; a clean completion flows through unchanged.
let items: Vec<u64> =
    RestartSource::on_failures_with_backoff(settings, || datum::Source::from_iter(1_u64..=3))
        .take(3)
        .run_with(Sink::collect())
        .unwrap()
        .wait()
        .unwrap();

RestartFlow and RestartSink follow the same pattern with matching with_backoff and on_failures_with_backoff constructors.

Note: Restart flows are lossy across restarts. The element that triggers the wrapped flow failure — and any elements already buffered in the restarted flow — are dropped when the restart occurs.

RetryFlow

RetryFlow::with_backoff retries individual elements through a wrapped one-in / one-out Flow when a caller-supplied decider says to retry:

rust
use std::time::Duration;
use datum::{Flow, RetryFlow};

let retrying = RetryFlow::with_backoff(
    Duration::from_millis(10),  // min_backoff
    Duration::from_millis(100), // max_backoff
    0.1,                        // random_factor
    3,                          // max_retries
    Flow::identity::<String>().map(|s| s.to_uppercase()),
    |original: &String, result: &String| {
        // Return Some(retry_input) to retry, None to accept the result.
        if result.is_empty() { Some(original.clone()) } else { None }
    },
);

The wrapped flow must emit exactly one element per input; Datum validates this at runtime and fails the stream otherwise.

Summary

MechanismWhat it does
recover(f)Emit optional fallback element, then complete normally
recover_with(f)Switch to a fallback source
on_error_complete()Silently swallow the error and complete
map_error(f)Transform the error; stream still fails
*_with_supervision(…, decider)Per-element Resume / Restart / Stop
RestartSource/Flow/SinkRematerialize on failure with exponential backoff
RetryFlowPer-element retry with backoff

Next steps