
Dynamic Streams

Dynamic stream controls let you manage stream lifecycles at runtime and build fan-in / fan-out topologies where producers or consumers attach independently of each other. Datum provides two families of primitives: kill switches for lifecycle control, and hubs for dynamic fan-in and fan-out.

Kill Switches

A kill switch is a handle that lets external code complete or abort a running stream on demand. Datum's runtime is pull-based, so the terminal signal takes effect only on the next pull issued by downstream.

UniqueKillSwitch

KillSwitches::single() returns a Flow<T, T, UniqueKillSwitch> whose materialized value is an exclusive handle to that one stream:

rust
use datum::{
    Keep, KillSwitches, Materializer,
    testkit::{TestSink, TestSource},
};

// KillSwitches::single() injects a UniqueKillSwitch as the materialized value
// of a Flow.  The switch is owned by this stream only.
let materializer = Materializer::new();
let ((source, switch), sink) = TestSource::probe::<i32>()
    .via_mat(KillSwitches::single(), Keep::both)
    .to_mat(TestSink::probe(), Keep::both)
    .run_with_materializer(&materializer)
    .expect("graph materializes");

// Normal element delivery before the switch fires.
sink.request(1);
assert_eq!(source.expect_request(), 1);
source.send_next(42);
sink.assert_next(42);

// shutdown() completes downstream and cancels upstream on the next pull.
switch.shutdown();
sink.request(1);
sink.expect_complete();
source.expect_cancellation();
  • switch.shutdown() completes the downstream and cancels the upstream on the next pull.
  • switch.abort(error) fails the stream with the given StreamError on the next pull.
  • Both are idempotent: calling shutdown() twice, or abort() after shutdown(), is a no-op.
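The lifecycle rules above can be modelled with the standard library alone: the first terminal call wins, later calls are no-ops, and nothing happens until the next pull. The sketch below is a hypothetical stand-in, not Datum's implementation. `attach_switch`, `SwitchHandle`, `Killable` and `Pull` are illustrative names. An iterator plays the role of upstream, and dropping it stands in for cancellation.

```rust
use std::sync::{Arc, Mutex};

// Hypothetical model of a unique kill switch; not Datum's implementation.
#[derive(Clone, Debug, PartialEq)]
enum SwitchState {
    Running,
    ShutDown,
    Aborted(String),
}

/// What downstream observes when it pulls.
#[derive(Debug, PartialEq)]
pub enum Pull<T> {
    Next(T),
    Complete,
    Failed(String),
}

/// External handle: only the first shutdown/abort call has any effect.
pub struct SwitchHandle {
    state: Arc<Mutex<SwitchState>>,
}

impl SwitchHandle {
    pub fn shutdown(&self) {
        self.fire(SwitchState::ShutDown);
    }

    pub fn abort(&self, error: &str) {
        self.fire(SwitchState::Aborted(error.to_string()));
    }

    fn fire(&self, terminal: SwitchState) {
        let mut state = self.state.lock().unwrap();
        // Idempotent: once the switch has fired, later calls are ignored.
        if *state == SwitchState::Running {
            *state = terminal;
        }
    }
}

/// Stream stage that consults the switch only when downstream pulls.
pub struct Killable<I> {
    upstream: Option<I>,
    state: Arc<Mutex<SwitchState>>,
}

/// Wrap `upstream` and return the stage together with its exclusive handle.
pub fn attach_switch<I: Iterator>(upstream: I) -> (Killable<I>, SwitchHandle) {
    let state = Arc::new(Mutex::new(SwitchState::Running));
    let stage = Killable { upstream: Some(upstream), state: Arc::clone(&state) };
    (stage, SwitchHandle { state })
}

impl<I: Iterator> Killable<I> {
    pub fn pull(&mut self) -> Pull<I::Item> {
        // Snapshot the switch state; the lock is released at the end of this statement.
        let state = self.state.lock().unwrap().clone();
        match state {
            SwitchState::Running => match self.upstream.as_mut().and_then(|u| u.next()) {
                Some(item) => Pull::Next(item),
                None => Pull::Complete,
            },
            SwitchState::ShutDown => {
                self.upstream = None; // dropping upstream stands in for cancellation
                Pull::Complete
            }
            SwitchState::Aborted(error) => {
                self.upstream = None;
                Pull::Failed(error)
            }
        }
    }

    pub fn upstream_cancelled(&self) -> bool {
        self.upstream.is_none()
    }
}
```

In this model, `shutdown()` followed by `abort(..)` leaves the stream completing normally. Upstream is only dropped when downstream next calls `pull()`.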

SharedKillSwitch

KillSwitches::shared(name) creates a SharedKillSwitch that can be wired into any number of independent streams. Each call to switch.flow::<T>() returns a Flow<T, T, SharedKillSwitch> that shares the same on/off state:

rust
use datum::{
    Keep, KillSwitches, Materializer,
    testkit::{TestSink, TestSource},
};

// KillSwitches::shared(name) returns a SharedKillSwitch that can be wired
// into any number of independent streams via switch.flow().
let switch = KillSwitches::shared("my-switch");
let materializer = Materializer::new();

let ((source_a, _), sink_a) = TestSource::probe::<i32>()
    .via_mat(switch.flow(), Keep::both)
    .to_mat(TestSink::probe(), Keep::both)
    .run_with_materializer(&materializer)
    .expect("first stream");

let ((source_b, _), sink_b) = TestSource::probe::<i32>()
    .via_mat(switch.flow(), Keep::both)
    .to_mat(TestSink::probe(), Keep::both)
    .run_with_materializer(&materializer)
    .expect("second stream");

// A single shutdown() call terminates both streams.
switch.shutdown();

sink_a.request(1);
sink_b.request(1);
sink_a.expect_complete();
sink_b.expect_complete();
source_a.expect_cancellation();
source_b.expect_cancellation();
  • switch.name() returns the name given at construction.
  • switch.shutdown() and switch.abort(error) propagate to every attached flow.
  • SharedKillSwitch is Clone, and every clone controls the same switch, so you can hand copies to other threads and fire it from any of them.
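The sharing semantics can be pictured with a plain `Arc<AtomicBool>`. Every clone of the switch and every wrapped stream points at the same flag, so a single `shutdown()` ends all of them on their next pull, even when it is called from another thread. This is a hypothetical std-only model, not how Datum implements SharedKillSwitch. `SharedSwitchModel`, `Guarded` and `shutdown_from_thread` are illustrative names.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical shared switch: all clones share one flag.
#[derive(Clone)]
pub struct SharedSwitchModel {
    name: String,
    killed: Arc<AtomicBool>,
}

impl SharedSwitchModel {
    pub fn new(name: &str) -> Self {
        SharedSwitchModel {
            name: name.to_string(),
            killed: Arc::new(AtomicBool::new(false)),
        }
    }

    pub fn name(&self) -> &str {
        &self.name
    }

    /// Setting the flag again is harmless, so repeated calls are no-ops.
    pub fn shutdown(&self) {
        self.killed.store(true, Ordering::SeqCst);
    }

    /// Wrap one independent stream; every wrapped stream shares the flag.
    pub fn flow<I: Iterator>(&self, upstream: I) -> Guarded<I> {
        Guarded { upstream, killed: Arc::clone(&self.killed) }
    }
}

pub struct Guarded<I> {
    upstream: I,
    killed: Arc<AtomicBool>,
}

impl<I: Iterator> Iterator for Guarded<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<I::Item> {
        // The flag is consulted on each pull; nothing is pushed to the stream.
        if self.killed.load(Ordering::SeqCst) {
            None
        } else {
            self.upstream.next()
        }
    }
}

/// Fire a clone of the switch from another thread and wait for that thread.
pub fn shutdown_from_thread(switch: &SharedSwitchModel) {
    let remote = switch.clone();
    thread::spawn(move || remote.shutdown()).join().unwrap();
}
```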

Pull-based propagation

The terminal signal from shutdown() or abort() is observed only when downstream issues demand. If no consumer is pulling, the stream stays suspended until the next request. This matches Datum's pull-based execution model and means that tests must call sink.request(1) before sink.expect_complete() or sink.expect_error().

MergeHub — fan-in (N producers → 1 downstream)

MergeHub materializes a reusable Sink<T, NotUsed> that many producers can connect to independently. The downstream consumer runs as a single stream that interleaves elements from all attached producers.

Creating a hub

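MergeHub::source(per_producer_buffer_size) returns a Source<T, Sink<T, NotUsed>>, that is, a source whose materialized value is the sink that producers attach to. Running it yields that sink together with whatever the downstream materializes: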

rust
use datum::{Keep, MergeHub, Sink};

let (hub_sink, completion) = MergeHub::source::<u64>(4)
    .to_mat(Sink::collect(), Keep::both)
    .run()
    .unwrap();

The hub source waits for elements from attached producers. It completes only after the hub has entered a draining state and every currently attached producer has finished. To put the hub into that state, use MergeHub::source_with_draining, which also materializes a MergeHubDrainingControl.

Attaching producers and draining

rust
use datum::{Keep, MergeHub, Sink, Source};

// MergeHub::source_with_draining materializes a cloneable Sink<T, NotUsed>
// that producers use to attach, plus a MergeHubDrainingControl.
let ((hub_sink, control), completion) = MergeHub::source_with_draining::<u64>(4)
    .to_mat(Sink::collect(), Keep::both)
    .run()
    .unwrap();

// Each run_with call starts the producer in its own background thread.
// Clone the sink for each producer; the original handle is dropped below.
hub_sink
    .clone()
    .run_with(Source::from_iter(0_u64..3))
    .unwrap();
hub_sink
    .clone()
    .run_with(Source::from_iter(10_u64..13))
    .unwrap();
drop(hub_sink);

// drain_and_complete() signals that no new producers will attach.
// The hub completes once the currently-running producers finish.
control.drain_and_complete();

// Producers interleave non-deterministically, so sort before asserting.
let mut collected: Vec<u64> = completion.wait().unwrap();
collected.sort_unstable();
assert_eq!(collected, vec![0, 1, 2, 10, 11, 12]);

Key points:

  • hub_sink is a Sink<T, NotUsed> that is Clone. Each clone is an independent connection that can be attached to a different source.
  • Calling hub_sink.clone().run_with(source) starts the producer in a background thread managed by Datum's internal Materializer. The call returns immediately.
  • control.drain_and_complete() signals that no new producers will attach. Once all running producers finish, the hub source completes and the downstream Sink::collect returns.
  • Order of elements across producers is non-deterministic; sort before asserting.
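The same attach-then-drain lifecycle can be approximated with `std::sync::mpsc::sync_channel`. Cloned senders play the role of hub_sink clones, each producer runs on its own thread, and dropping the original sender plays the role of drain_and_complete(). This is only an analogy, not Datum's implementation. In particular, the channel has one shared buffer rather than a per-producer buffer. `attach_producer` and `fan_in_demo` are hypothetical names.

```rust
use std::sync::mpsc::{self, SyncSender};
use std::thread::{self, JoinHandle};

/// Spawn a producer thread that feeds `items` into the hub. Its sender clone
/// is dropped when the thread ends, which is how this model signals
/// "producer finished".
fn attach_producer(hub: &SyncSender<u64>, items: Vec<u64>) -> JoinHandle<()> {
    let tx = hub.clone();
    thread::spawn(move || {
        for item in items {
            // `send` blocks while the shared buffer is full (backpressure).
            if tx.send(item).is_err() {
                break; // the consumer has gone away
            }
        }
    })
}

/// Two producers fan in to one consumer; "draining" is modelled by dropping
/// the original sender so that no further producers can be attached.
pub fn fan_in_demo() -> Vec<u64> {
    let (hub, downstream) = mpsc::sync_channel::<u64>(4);
    let producers = vec![
        attach_producer(&hub, (0..3).collect()),
        attach_producer(&hub, (10..13).collect()),
    ];
    drop(hub); // analogous to drain_and_complete(): no new attachments

    // The receiver's iterator ends once every sender clone has been dropped,
    // i.e. once all attached producers have finished.
    let mut collected: Vec<u64> = downstream.iter().collect();
    for producer in producers {
        producer.join().unwrap();
    }
    collected.sort_unstable();
    collected
}
```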

Variant: source (without draining control)

MergeHub::source(buf) is shorthand for source_with_draining(buf) with the control discarded. It is useful when you know all producers will finish naturally and you control termination by some other means (e.g. by dropping all hub_sink clones when using a source that owns the upstream lifecycle).

Performance

MergeHub is functionally correct but not yet performance-competitive with Akka. With a single producer, Datum measures ~13,663 µs/op vs. Akka's ~56 µs/op. That is a speedup of about 0.004x (shown as 0.00x in the summary table), so Datum is roughly 245x slower. The gap grows with producer count. The bottleneck is coordination and wake-up overhead in the mutex-and-condvar rendezvous. Unlike the other hubs, MergeHub also allocates more than Akka: ~2.4x more bytes at p1 (alloc ratio 0.42x, where the ratio is Akka/Datum). This is a tracked open item; see roadmap/benchmarks/dynamic-streams.md for the full table.

BroadcastHub — fan-out (1 upstream → N consumers)

BroadcastHub materializes a reusable BroadcastHubConsumerSource<T> that lets many consumers attach to a single upstream source. Every consumer receives every element. The upstream adapts to the slowest active consumer.

Creating a hub

BroadcastHub::sink(buffer_size) starts upstream immediately (once the sink is materialized). BroadcastHub::sink_starting_after(n, buffer_size) defers upstream until at least n consumers have attached, which is useful when you need all consumers ready before the first element flows:

rust
use datum::{BroadcastHub, Sink, Source};

// BroadcastHub::sink_starting_after(n, buf) defers upstream until n
// consumers attach.  Each consumer_source.source() call creates an
// independent Source<T, NotUsed> that receives every upstream element.
let consumer_source = Source::from_iter(1_u64..=3)
    .run_with(BroadcastHub::sink_starting_after(1, 8))
    .unwrap();

// Materialize the consumer source; this attaches the consumer and starts
// upstream (start_after = 1 means one consumer is sufficient).
let items: Vec<u64> = consumer_source
    .source()
    .run_with(Sink::collect())
    .unwrap()
    .wait()
    .unwrap();

// The consumer attached before upstream started, so it sees every element.
assert_eq!(items, vec![1, 2, 3]);
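The deferred start of sink_starting_after(n, buffer_size) amounts to a gate: upstream waits until the consumer count reaches n. Below is a minimal sketch of such a gate built from `Mutex` and `Condvar`. It assumes a thread-based runtime and does not claim to reflect Datum's internals. `StartGate` and `gated_start` are hypothetical names, not Datum APIs.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical gate modelling "start upstream after n consumers attach".
pub struct StartGate {
    attached: Mutex<usize>,
    changed: Condvar,
}

impl StartGate {
    pub fn new() -> Self {
        StartGate { attached: Mutex::new(0), changed: Condvar::new() }
    }

    /// Called when a consumer attaches; wakes the waiting upstream.
    pub fn attach(&self) {
        *self.attached.lock().unwrap() += 1;
        self.changed.notify_all();
    }

    /// Blocks the caller (the upstream) until at least `n` consumers exist.
    /// The `while` loop also guards against spurious wake-ups.
    pub fn wait_for(&self, n: usize) -> usize {
        let mut count = self.attached.lock().unwrap();
        while *count < n {
            count = self.changed.wait(count).unwrap();
        }
        *count
    }
}

/// Run an upstream thread gated on `n` consumers, attach them, and return
/// how many consumers the upstream saw when it was allowed to start.
pub fn gated_start(n: usize) -> usize {
    let gate = Arc::new(StartGate::new());
    let upstream_gate = Arc::clone(&gate);
    let upstream = thread::spawn(move || upstream_gate.wait_for(n));
    for _ in 0..n {
        gate.attach();
    }
    upstream.join().unwrap()
}
```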

Attaching consumers

Each call to consumer_source.source() returns a fresh Source<T, NotUsed> blueprint. When that blueprint is materialized (run), the consumer attaches to the hub and begins receiving elements. Consumers that attach after upstream has started miss earlier elements — they see only elements produced after their attachment.

BroadcastHubConsumerSource<T> is Clone, so you can distribute it to multiple threads that each materialize their own consumer source.

Backpressure behavior

Unlike Akka, Datum's BroadcastHub blocks upstream immediately when there are zero consumers instead of pre-buffering up to buffer_size elements. When consumers are present, upstream waits until every consumer's per-consumer queue has room before emitting the next element, matching Akka's slowest-consumer backpressure contract.
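The emit rule described above can be expressed as a small single-threaded model. Upstream may emit only when at least one consumer is attached and every per-consumer queue has room. A consumer that attaches later starts with an empty queue, so it misses earlier elements. `BroadcastModel` is a hypothetical illustration of this contract, not Datum's BroadcastHub.

```rust
use std::collections::VecDeque;

/// Hypothetical single-threaded model of a broadcast hub's emit rule.
pub struct BroadcastModel<T: Clone> {
    capacity: usize,
    queues: Vec<VecDeque<T>>,
}

impl<T: Clone> BroadcastModel<T> {
    pub fn new(capacity: usize) -> Self {
        BroadcastModel { capacity, queues: Vec::new() }
    }

    /// Attach a consumer; it sees only elements offered after this point.
    pub fn attach(&mut self) -> usize {
        self.queues.push(VecDeque::new());
        self.queues.len() - 1
    }

    /// Upstream may emit only if a consumer exists and every queue has room,
    /// so the slowest consumer sets the pace.
    pub fn can_emit(&self) -> bool {
        !self.queues.is_empty() && self.queues.iter().all(|q| q.len() < self.capacity)
    }

    /// Offer an element to every consumer; `false` means upstream must wait.
    pub fn offer(&mut self, item: T) -> bool {
        if !self.can_emit() {
            return false;
        }
        for queue in &mut self.queues {
            queue.push_back(item.clone());
        }
        true
    }

    /// A consumer pulls its next element, if any.
    pub fn poll(&mut self, consumer: usize) -> Option<T> {
        self.queues[consumer].pop_front()
    }
}
```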

Performance

BroadcastHub is functionally correct but not yet competitive. With one consumer, Datum measures ~13,678 µs/op vs. Akka's ~923 µs/op (speedup 0.07x, roughly 15x slower). Datum allocates 4.88x less memory than Akka at c1, which points to coordination overhead rather than allocation as the dominant cost. This is a tracked open item in the roadmap.

PartitionHub — fan-out (1 upstream → 1 selected consumer per element)

PartitionHub is like BroadcastHub, but it routes each element to exactly one consumer, chosen by a partitioner function. The partitioner can also drop an element by returning -1.

Signature

rust
PartitionHub::sink(
    partitioner: impl Fn(&PartitionConsumerInfo, &T) -> isize + Send + Sync + 'static,
    start_after_nr_of_consumers: usize,
    buffer_size: usize,
) -> Sink<T, PartitionHubConsumerSource<T>>

The partitioner receives:

  • A &PartitionConsumerInfo describing all currently-attached consumers.
  • A &T reference to the element being routed.

It must return a consumer_id (an isize cast from the u64 consumer ID) or -1 to drop the element. The partitioner runs while the hub's internal lock is held, so it must be fast and non-blocking.
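The following hypothetical routing step shows how a partitioner's return value could be applied: -1 drops the element, and any other value must name an attached consumer. `route` and `Routed` are illustrative names. The document does not say how Datum reacts to an id that is not attached, so the sketch simply assumes it is an error.

```rust
use std::collections::BTreeMap;

/// Outcome of routing one element in this hypothetical model.
#[derive(Debug, PartialEq)]
pub enum Routed {
    Delivered(u64),
    Dropped,
}

/// Apply a partitioner's decision to one element. `queues` maps each attached
/// consumer id to its pending elements (BTreeMap keeps the ids ordered).
pub fn route<T>(
    queues: &mut BTreeMap<u64, Vec<T>>,
    item: T,
    partitioner: impl Fn(&[u64], &T) -> isize,
) -> Result<Routed, String> {
    let ids: Vec<u64> = queues.keys().copied().collect();
    match partitioner(ids.as_slice(), &item) {
        -1 => Ok(Routed::Dropped),
        id if id >= 0 => match queues.get_mut(&(id as u64)) {
            Some(queue) => {
                queue.push(item);
                Ok(Routed::Delivered(id as u64))
            }
            // Assumption: routing to an id that is not attached is an error.
            None => Err(format!("no consumer with id {id}")),
        },
        other => Err(format!("invalid partitioner result {other}")),
    }
}
```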

PartitionConsumerInfo accessors

| Method | Returns |
|---|---|
| size() | Number of currently attached consumers |
| consumer_ids() | Slice of active consumer IDs (&[u64]) |
| consumer_id_by_idx(idx) | Consumer ID at position idx |
| queue_size(consumer_id) | Current queue depth for a consumer |

Example

rust
use datum::{Materializer, PartitionHub, Sink, Source};

// PartitionHub::sink(partitioner, start_after, buffer_size) routes each
// element to one consumer selected by the partitioner function.
// partitioner(&info, &item) -> isize: return a consumer_id or -1 to drop.
let materializer = Materializer::new();
let hub = Source::from_iter(0_u64..6)
    .run_with_materializer(
        PartitionHub::sink(
            |info, item| {
                // Distribute elements round-robin across all consumers.
                let idx = (*item as usize) % info.size();
                info.consumer_id_by_idx(idx) as isize
            },
            2, // start_after_nr_of_consumers
            8, // buffer_size
        ),
        &materializer,
    )
    .unwrap();

// Attach two consumers sequentially so their IDs are assigned in order.
// Upstream starts once both are registered (start_after_nr_of_consumers = 2).
let completion_a = hub.source().run_with(Sink::collect()).unwrap();
let completion_b = hub.source().run_with(Sink::collect()).unwrap();

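let mut items_a: Vec<u64> = completion_a.wait().unwrap();
let mut items_b: Vec<u64> = completion_b.wait().unwrap();
items_a.sort_unstable();
items_b.sort_unstable();

// Round-robin by value: the consumer at index 0 gets the even items,
// the consumer at index 1 gets the odd ones.
assert_eq!(items_a, vec![0, 2, 4]);
assert_eq!(items_b, vec![1, 3, 5]);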

Consumers are attached sequentially before any elements flow, ensuring deterministic consumer_id_by_idx ordering. completion_a.wait() and completion_b.wait() block until the hub finishes processing all elements.

The round-robin index pattern (item % info.size()) distributes elements evenly when the number of consumers is stable. For sticky routing (e.g. all elements for a given key go to the same consumer), hash the key into a stable slot instead.
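A sticky partitioner needs a deterministic mapping from key to slot. One option, sketched below, hashes the key with the standard library's `DefaultHasher` and reduces it modulo the consumer count. It assumes each element exposes a hashable key, and `sticky_slot` is a hypothetical helper.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Map a key to a slot index in `0..consumer_count`. The same key always
/// lands in the same slot as long as `consumer_count` does not change.
pub fn sticky_slot<K: Hash>(key: &K, consumer_count: usize) -> Option<usize> {
    if consumer_count == 0 {
        return None; // nothing to route to; a partitioner would return -1
    }
    // DefaultHasher::new() starts from fixed keys, so the result is stable
    // within one build of the program (not guaranteed across Rust versions).
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    Some((hasher.finish() % consumer_count as u64) as usize)
}
```

Inside a partitioner, the slot would then be turned into an id with `info.consumer_id_by_idx(slot) as isize`. Adding or removing a consumer changes the modulus, so keys can move to a different consumer whenever the consumer set changes.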

Routing to the least-loaded consumer

rust
PartitionHub::sink(
    |info, _item| {
        // Select the consumer with the smallest pending queue.
        info.consumer_ids()
            .iter()
            .min_by_key(|&&id| info.queue_size(id))
            .map(|&id| id as isize)
            .unwrap_or(-1)
    },
    1,
    64,
)
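One detail of the partitioner above is how it breaks ties. `Iterator::min_by_key` returns the first of several equally small elements, so when queues are equally deep, the consumer listed first by consumer_ids() wins. With no consumers attached, the function returns -1 and the element is dropped. The check below runs the same selection logic against `InfoModel`, a hypothetical stand-in that exposes only the two accessors the partitioner uses.

```rust
/// Hypothetical stand-in for PartitionConsumerInfo with just two accessors.
pub struct InfoModel {
    pub ids: Vec<u64>,
    pub depths: Vec<usize>,
}

impl InfoModel {
    pub fn consumer_ids(&self) -> &[u64] {
        &self.ids
    }

    pub fn queue_size(&self, id: u64) -> usize {
        let pos = self.ids.iter().position(|&x| x == id).expect("id must be attached");
        self.depths[pos]
    }
}

/// Same selection logic as the least-loaded partitioner.
pub fn least_loaded(info: &InfoModel) -> isize {
    info.consumer_ids()
        .iter()
        .min_by_key(|&&id| info.queue_size(id))
        .map(|&id| id as isize)
        .unwrap_or(-1)
}
```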

Dropping elements

Return -1 from the partitioner to discard an element without routing it:

rust
PartitionHub::sink(
    |info, item| {
        if *item % 2 == 0 {
            info.consumer_id_by_idx(0) as isize
        } else {
            -1 // drop odd elements
        }
    },
    1,
    8,
)

Performance

PartitionHub is a correctness-first baseline. With one consumer, Datum measures ~15,581 µs/op vs. Akka's ~1,931 µs/op (speedup 0.12x, roughly 8x slower). Per-consumer queue bookkeeping and the clone-per-broadcast routing strategy keep behavior correct but add significant overhead. This is a tracked open item.

Performance summary

Numbers from roadmap/benchmarks/dynamic-streams.md (2026-06-05):

| Scenario | Datum µs/op | Akka µs/op | Speedup (Akka µs / Datum µs) | Datum alloc B/op | Datum alloc vs. Akka |
|---|---|---|---|---|---|
| kill_switch_shared_shutdown_fanout_1 | 16.4 | 37.6 | 2.30x | 1,120 | 7.54x less |
| kill_switch_shared_shutdown_fanout_100 | 398 | 375 | 0.94x | 100,151 | 7.23x less |
| kill_switch_shared_shutdown_fanout_10000 | 38,199 | 40,444 | 1.06x | 10,000,120 | 7.33x less |
| merge_hub_throughput_p1 | 13,663 | 55.7 | 0.00x | 84,917 | ~2.4x more (ratio 0.42x) |
| broadcast_hub_throughput_c1 | 13,678 | 923 | 0.07x | 83,987 | 4.88x less |
| partition_hub_throughput_c1 | 15,581 | 1,931 | 0.12x | 2,285,386 | 1.56x less |

SharedKillSwitch is competitive with Akka across all fan-out sizes (speedup 0.94x to 2.30x), with ~7x lower allocation. The hubs (MergeHub, BroadcastHub, PartitionHub) are functionally correct but materially slower than Akka, and the gap is dominated by coordination overhead rather than allocation pressure. These are open tracked items in the roadmap.
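The ratio columns are plain quotients. Speedup is read here as Akka time divided by Datum time. The allocation ratio is Akka bytes divided by Datum bytes, which is the definition given for MergeHub above. Under this reading, the formulas reproduce the published figures to within rounding; for example, 37.6 / 16.4 ≈ 2.29 against the listed 2.30x. The helpers below are illustrative and are not part of Datum's benchmark tooling.

```rust
/// Akka time divided by Datum time: values above 1.0 mean Datum is faster.
pub fn speedup(datum_us: f64, akka_us: f64) -> f64 {
    akka_us / datum_us
}

/// Datum time divided by Akka time: how many times slower Datum is.
pub fn slowdown(datum_us: f64, akka_us: f64) -> f64 {
    datum_us / akka_us
}

/// Akka bytes divided by Datum bytes: values above 1.0 mean Datum allocates less.
pub fn alloc_ratio(datum_bytes: f64, akka_bytes: f64) -> f64 {
    akka_bytes / datum_bytes
}
```

For MergeHub, `slowdown(13663.0, 55.7)` is about 245. This is why that row's two-decimal speedup rounds to 0.00x.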

Next steps