Substreams
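Substream operators work with a stream of streams. split_when, split_after and group_by partition a single stream into inner Source<Out> sub-sources, while flat_map_concat and flat_map_merge map each element to a sub-source and flatten the results back into one stream. Each inner sub-source can be processed independently (folded, filtered, mapped) before it is flattened back into the outer stream.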
flat_map_concat
flat_map_concat(f) maps each element to a sub-source and concatenates them strictly in order. The second sub-source does not start until the first has completed:
```rust
use datum::{Sink, Source};
// flat_map_concat maps each element to a sub-source and concatenates them in order.
// The second sub-source only starts after the first has completed.
let items: Vec<u64> = Source::from_iter(1_u64..=3)
    .flat_map_concat(|x| Source::from_iter([x * 10, x * 10 + 1]))
    .run_with(Sink::collect())
    .unwrap()
    .wait()
    .unwrap();
// Strict ordering: all of sub-source 1, then all of 2, then all of 3.
assert_eq!(items, vec![10, 11, 20, 21, 30, 31]);
```

This mirrors Akka's flatMapConcat. Use it when ordering matters and sub-sources are bounded: an unbounded sub-source never completes, so no sub-source after it would ever start.
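The ordering rule, and the reason sub-sources should be bounded, can be seen with plain Rust iterators. The sketch below is not datum's API: it uses std's Iterator::flat_map as a synchronous stand-in, assuming only that it shares flat_map_concat's "finish one sub-sequence before starting the next" behaviour. concat_model and starved_example are hypothetical names for illustration.

```rust
/// Hypothetical plain-iterator analogue of flat_map_concat: each input element
/// expands into a sub-sequence, and sub-sequences are emitted strictly in order.
/// `Iterator::flat_map` only pulls the next sub-iterator once the current one ends.
fn concat_model<T, U, I, F>(input: Vec<T>, f: F) -> Vec<U>
where
    F: FnMut(T) -> I,
    I: IntoIterator<Item = U>,
{
    input.into_iter().flat_map(f).collect()
}

/// Why bounded sub-sources matter: the first sub-sequence here is infinite,
/// so elements from the sub-sequences for 2 and 3 are never reached.
fn starved_example() -> Vec<u64> {
    (1_u64..=3).flat_map(std::iter::repeat).take(5).collect()
}
```

Here concat_model(vec![1_u64, 2, 3], |x| vec![x * 10, x * 10 + 1]) yields [10, 11, 20, 21, 30, 31], matching the stream example, while starved_example() only ever yields copies of 1.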
flat_map_merge
flat_map_merge(breadth, f) runs up to breadth sub-sources concurrently and emits elements from whichever sub-source has output ready:
```rust
use datum::{Sink, Source};
// flat_map_merge(breadth, f) runs up to `breadth` sub-sources concurrently.
// Output order across active sub-sources is not guaranteed; sort before asserting.
let mut items: Vec<u64> = Source::from_iter(1_u64..=3)
    .flat_map_merge(2, |x| Source::from_iter([x * 10, x * 10 + 1]))
    .run_with(Sink::collect())
    .unwrap()
    .wait()
    .unwrap();
items.sort_unstable();
assert_eq!(items, vec![10, 11, 20, 21, 30, 31]);
```

Breadth 1 degenerates to flat_map_concat. Higher breadth increases parallelism at the cost of a non-deterministic interleaving of elements across sub-sources. The breadth parameter must be greater than 0.
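To make the role of breadth concrete, here is a deterministic, single-threaded model in plain Rust. It is an assumption-laden sketch, not datum's scheduler: a real runtime emits from whichever sub-source is ready first, whereas merge_model (a hypothetical name) takes one element from each active sub-iterator in round-robin order. It does show the two documented properties: at most breadth sub-sources are active at once, and breadth 1 reduces to concat ordering.

```rust
use std::collections::VecDeque;

/// Hypothetical deterministic model of flat_map_merge(breadth, f).
/// At most `breadth` sub-iterators are active; round-robin stands in for
/// "whichever sub-source has output ready" in a real concurrent runtime.
fn merge_model<T, U, I, F>(input: Vec<T>, breadth: usize, mut f: F) -> Vec<U>
where
    F: FnMut(T) -> I,
    I: IntoIterator<Item = U>,
{
    assert!(breadth > 0, "breadth must be > 0");
    let mut pending = input.into_iter();
    let mut active: VecDeque<I::IntoIter> = VecDeque::new();
    let mut out = Vec::new();
    loop {
        // Refill the active set up to `breadth` sub-iterators.
        while active.len() < breadth {
            match pending.next() {
                Some(x) => active.push_back(f(x).into_iter()),
                None => break,
            }
        }
        // Take one element from the front sub-iterator, then rotate it to the back.
        match active.pop_front() {
            Some(mut sub) => {
                if let Some(item) = sub.next() {
                    out.push(item);
                    active.push_back(sub);
                }
                // An exhausted sub-iterator is dropped, freeing a slot for the next input.
            }
            None => return out,
        }
    }
}
```

With breadth 2 and the same input as above, the model interleaves the first two sub-sources ([10, 20, 11, 21, 30, 31]); sorting recovers the same multiset that breadth 1 produces in concat order.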
split_when and split_after
split_when(pred) starts a new sub-source each time the predicate returns true. The element that triggers the split begins the new segment (the old segment closes before that element):
```rust
use datum::{Sink, Source};
// split_when(pred) starts a new sub-source when pred returns true.
// The triggering element begins the new segment (it is NOT included in the old one).
// flat_map_concat(|sub| sub) flattens segments back in arrival order.
let items: Vec<u64> = Source::from_iter(1_u64..=6)
    .split_when(|&x| x == 4) // split before 4: segments [1,2,3] and [4,5,6]
    .flat_map_concat(|sub| sub)
    .run_with(Sink::collect())
    .unwrap()
    .wait()
    .unwrap();
// Flattening the segments in order restores the original sequence.
assert_eq!(items, vec![1, 2, 3, 4, 5, 6]);
```

split_after(pred) is the complement: the triggering element ends the current segment (inclusive), and the next element opens a new one. Use split_when when the boundary marker logically belongs to the next segment, and split_after when it logically closes the current one.
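The difference between the two split operators is easiest to see on plain vectors. The sketch below models only the segment boundaries, not datum's streaming sub-sources. split_when_model and split_after_model are hypothetical names, and the handling of a match on the very first element (no empty leading segment) is an assumption, not documented datum behaviour.

```rust
/// Hypothetical model of split_when: a matching element closes the current
/// segment and goes into the new one.
fn split_when_model<T>(items: Vec<T>, pred: impl Fn(&T) -> bool) -> Vec<Vec<T>> {
    let mut segments = Vec::new();
    let mut current = Vec::new();
    for x in items {
        // Assumption: a match on the very first element does not emit an empty segment.
        if pred(&x) && !current.is_empty() {
            segments.push(std::mem::take(&mut current));
        }
        current.push(x);
    }
    if !current.is_empty() {
        segments.push(current);
    }
    segments
}

/// Hypothetical model of split_after: a matching element is the last element
/// of its segment, and the following element opens a new one.
fn split_after_model<T>(items: Vec<T>, pred: impl Fn(&T) -> bool) -> Vec<Vec<T>> {
    let mut segments = Vec::new();
    let mut current = Vec::new();
    for x in items {
        let closes = pred(&x);
        current.push(x);
        if closes {
            segments.push(std::mem::take(&mut current));
        }
    }
    if !current.is_empty() {
        segments.push(current);
    }
    segments
}
```

On 1..=6, splitting "when x == 4" and splitting "after x == 3" produce the same two segments, [1, 2, 3] and [4, 5, 6]; splitting "after x == 4" instead gives [1, 2, 3, 4] and [5, 6].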
Sub-sources from split_when/split_after are consumed with flat_map_concat(|sub| sub) to preserve segment ordering, or flat_map_merge(n, |sub| sub) to process segments concurrently. Apply Source::fold to each sub-source to reduce a segment to a single value:
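```rust
use datum::{Sink, Source};
// Sum each segment: [1,2,3] -> 6, [4,5,6] -> 15.
let sums: Vec<u64> = Source::from_iter(1_u64..=6)
    .split_when(|&x| x == 4)
    .flat_map_concat(|sub| sub.fold(0_u64, |acc, x| acc + x))
    .run_with(Sink::collect()).unwrap().wait().unwrap();
```

group_by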
group_by(max_substreams, key_fn, allow_closed_substream_recreation) partitions the stream by a key function. Elements with the same key flow into the same sub-source:
```rust
use datum::{Sink, Source};
// group_by(max_substreams, key_fn, allow_closed_substream_recreation) partitions elements by key.
// Each group's sub-source can be folded or transformed independently.
let mut sums: Vec<u64> = Source::from_iter(1_u64..=6)
    .group_by(2, |x| *x % 2, false) // two keys: 0 (even), 1 (odd)
    .flat_map_merge(2, |sub| sub.fold(0_u64, |acc, x| acc + x))
    .run_with(Sink::collect())
    .unwrap()
    .wait()
    .unwrap();
// Group completion order is not guaranteed; sort before asserting.
sums.sort_unstable();
assert_eq!(sums, vec![9, 12]); // odd: 1 + 3 + 5, even: 2 + 4 + 6
```

Parameters:
- max_substreams: the maximum number of concurrently open sub-sources. Exceeding this limit fails the stream.
- key_fn: Fn(&Out) -> Key; the returned key must be Clone + Eq + Hash + Send + 'static.
- allow_closed_substream_recreation: if true, a key whose sub-source has already closed can open a new sub-source when it reappears. If false, the reappearance of a closed key fails the stream.
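Sub-sources from group_by behave as live streams: they receive elements as the outer stream progresses and complete only when no more elements for that key will arrive (i.e. when the outer stream completes). Use flat_map_merge with a breadth equal to max_substreams so that all groups are consumed simultaneously. With flat_map_concat, the second group would not be consumed until the first completed, which only happens when the entire outer stream does.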
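A rough plain-Rust model of the group_by + fold pipeline above makes the max_substreams rule concrete. Because groups stay open until the input ends, every distinct key counts against the limit at the same time. This is a hypothetical sketch (group_sums is an invented name, and results are returned in first-seen key order for determinism); it does not model allow_closed_substream_recreation, since no group closes early here.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Hypothetical model of group_by(max_substreams, key_fn, _) followed by a
/// per-group sum. Every group stays open until the input ends, so a new key
/// beyond `max_substreams` fails the whole run, as documented for group_by.
fn group_sums<K, F>(items: &[u64], max_substreams: usize, key_fn: F) -> Result<Vec<(K, u64)>, String>
where
    K: Clone + Eq + Hash,
    F: Fn(&u64) -> K,
{
    let mut index: HashMap<K, usize> = HashMap::new(); // key -> position in `groups`
    let mut groups: Vec<(K, u64)> = Vec::new(); // (key, running sum) in first-seen order
    for x in items {
        let key = key_fn(x);
        let slot = match index.get(&key).copied() {
            Some(i) => i,
            None => {
                // Opening one more group would exceed the limit: fail the run.
                if groups.len() == max_substreams {
                    return Err(format!("more than {} open substreams", max_substreams));
                }
                index.insert(key.clone(), groups.len());
                groups.push((key, 0));
                groups.len() - 1
            }
        };
        groups[slot].1 += *x;
    }
    Ok(groups)
}
```

For 1..=6 keyed by x % 2 with a limit of 2, the model yields the odd sum 9 and the even sum 12, the same values as the stream example. Three distinct keys under a limit of 2 produce an error instead.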
Performance
All substream operators landed as part of M2 WP-17b / WP-19. The numbers below are from roadmap/benchmarks/substreams.md (2026-06-09, branch wp19c-inline). Every row is at or above Akka parity, so the v0.2.0 perf gate is fully closed.
| Scenario | Datum µs/op | Akka µs/op | Speedup (Akka / Datum) |
|---|---|---|---|
| split_when_rebuild_10k | 606 | 3,767 | 6.2x |
| split_after_rebuild_10k | 599 | 3,644 | 6.1x |
| flat_map_concat_expand_2k_x4 | 380 | 4,437 | 11.7x |
| flat_map_merge_expand_2k_x4_b8 | 2,994 | 5,374 | 1.79x |
| group_by_single_key_10k | 569 | 3,034 | 5.3x |
flat_map_merge at breadth 8 with short 4-item inner sources (2,000 inner sources in total) runs 1.79x faster than Akka after the WP-19c inline micro-source path. CPU time is about 1.9x wall-clock time because of coordinator and consumer thread activity; see roadmap/benchmarks/substreams.md for the full story.
Next steps
- Error Handling: handling failures in substreams
- Dynamic Streams: MergeHub and BroadcastHub for runtime fan-in/fan-out