Substreams

Substream operators partition a single stream into a stream of streams. Each inner Source<Out> can be processed independently — folded, filtered, mapped — and then merged back into the outer stream.

flat_map_concat

flat_map_concat(f) maps each element to a sub-source and concatenates the sub-sources strictly in order. Each sub-source starts only after the previous one has completed:

```rust
use datum::{Sink, Source};

// flat_map_concat maps each element to a sub-source and concatenates them in order.
// Each sub-source only starts after the previous one has completed.
let items: Vec<u64> = Source::from_iter(1_u64..=3)
    .flat_map_concat(|x| Source::from_iter([x * 10, x * 10 + 1]))
    .run_with(Sink::collect())
    .unwrap()
    .wait()
    .unwrap();
// Sub-sources are drained one after another, so the order is deterministic.
assert_eq!(items, vec![10, 11, 20, 21, 30, 31]);
```

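This mirrors Akka's flatMapConcat. Use it when ordering matters and sub-sources are bounded: because each sub-source must complete before the next one starts, a sub-source that never completes blocks every later one.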
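The ordering guarantee can be checked against a plain standard-library model. The sketch below uses only `Iterator::flat_map`, which likewise drains each inner iterator before starting the next; `expand_in_order` is a hypothetical helper name, and nothing here reflects how datum implements the operator.

```rust
// Std-only model of the example above (not datum code): expand each element
// into a two-item sub-sequence and concatenate the sub-sequences in order.
fn expand_in_order(input: impl IntoIterator<Item = u64>) -> Vec<u64> {
    input
        .into_iter()
        // flat_map finishes one inner iterator before it starts the next.
        .flat_map(|x| [x * 10, x * 10 + 1])
        .collect()
}

fn main() {
    let items = expand_in_order(1..=3);
    assert_eq!(items, vec![10, 11, 20, 21, 30, 31]);
    println!("{:?}", items);
}
```

A datum pipeline built with flat_map_concat should produce the same sequence for the same input.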

flat_map_merge

flat_map_merge(breadth, f) runs up to breadth sub-sources concurrently and emits elements from whichever sub-source has output ready:

```rust
use datum::{Sink, Source};

// flat_map_merge(breadth, f) runs up to `breadth` sub-sources concurrently.
// Output order across active sub-sources is not guaranteed; sort before asserting.
let mut items: Vec<u64> = Source::from_iter(1_u64..=3)
    .flat_map_merge(2, |x| Source::from_iter([x * 10, x * 10 + 1]))
    .run_with(Sink::collect())
    .unwrap()
    .wait()
    .unwrap();
items.sort_unstable();
// The set of elements is fixed even though arrival order is not.
assert_eq!(items, vec![10, 11, 20, 21, 30, 31]);
```

With breadth 1, flat_map_merge behaves like flat_map_concat. A higher breadth increases parallelism at the cost of non-deterministic element ordering. breadth must be greater than 0.
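To see why merged output needs sorting before comparison, here is a rough standard-library illustration using threads and an `mpsc` channel. It is an assumption-laden sketch: `expand_concurrently` is a hypothetical helper, every sub-sequence gets its own thread, and no breadth limit is modeled, so it shows only the ordering effect and not datum's scheduling.

```rust
use std::sync::mpsc;
use std::thread;

// Std-only illustration, not datum's implementation: every "sub-source" runs on
// its own thread and sends into one shared channel. No breadth limit is modeled.
fn expand_concurrently(input: Vec<u64>) -> Vec<u64> {
    let (tx, rx) = mpsc::channel();
    let workers: Vec<thread::JoinHandle<()>> = input
        .into_iter()
        .map(|x| {
            let tx = tx.clone();
            thread::spawn(move || {
                for item in [x * 10, x * 10 + 1] {
                    tx.send(item).expect("receiver is still alive");
                }
            })
        })
        .collect();
    // Drop the original sender so the channel closes once every worker finishes.
    drop(tx);
    let merged: Vec<u64> = rx.iter().collect();
    for worker in workers {
        worker.join().expect("worker thread panicked");
    }
    merged
}

fn main() {
    let mut items = expand_concurrently(vec![1, 2, 3]);
    // Arrival order depends on thread scheduling, so sort before comparing.
    items.sort_unstable();
    assert_eq!(items, vec![10, 11, 20, 21, 30, 31]);
}
```

The same elements always arrive, but their interleaving can differ from run to run, which is why the datum example sorts before asserting.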

split_when and split_after

split_when(pred) starts a new sub-source each time the predicate returns true. The element that triggers the split begins the new segment (the old segment closes before that element):

```rust
use datum::{Sink, Source};

// split_when(pred) starts a new sub-source when pred returns true.
// The triggering element begins the new segment (it is NOT included in the old one).
// flat_map_concat(|sub| sub) flattens segments back in arrival order.
let items: Vec<u64> = Source::from_iter(1_u64..=6)
    .split_when(|&x| x == 4) // split before 4: segments [1,2,3] and [4,5,6]
    .flat_map_concat(|sub| sub)
    .run_with(Sink::collect())
    .unwrap()
    .wait()
    .unwrap();
// Splitting and then concatenating in order restores the original sequence.
assert_eq!(items, vec![1, 2, 3, 4, 5, 6]);
```

split_after(pred) is the counterpart: the triggering element ends the current segment (inclusive), and the next element opens a new one. With the same input and predicate as above, split_after yields the segments [1,2,3,4] and [5,6]. Use split_when when the boundary marker logically belongs to the next segment, and split_after when it logically closes the current one.
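The difference in where the boundary element lands is easiest to see on plain vectors. The following is a minimal standard-library model of the two rules as described above; `split_when_model` and `split_after_model` are hypothetical names, and the choice not to emit an empty leading segment when the first element matches is an assumption of this model, not a documented datum behavior.

```rust
// Model of split_when: a matching element opens a new segment.
fn split_when_model<T: Clone>(items: &[T], pred: impl Fn(&T) -> bool) -> Vec<Vec<T>> {
    let mut segments: Vec<Vec<T>> = Vec::new();
    for item in items {
        // Open a segment for the first element, and again on every match.
        if segments.is_empty() || pred(item) {
            segments.push(Vec::new());
        }
        segments
            .last_mut()
            .expect("a segment was opened above")
            .push(item.clone());
    }
    segments
}

// Model of split_after: a matching element closes the current segment.
fn split_after_model<T: Clone>(items: &[T], pred: impl Fn(&T) -> bool) -> Vec<Vec<T>> {
    let mut segments: Vec<Vec<T>> = Vec::new();
    let mut start_new = true;
    for item in items {
        if start_new {
            segments.push(Vec::new());
        }
        segments
            .last_mut()
            .expect("a segment was opened above")
            .push(item.clone());
        // The element after a match starts the next segment.
        start_new = pred(item);
    }
    segments
}

fn main() {
    let data: [u64; 6] = [1, 2, 3, 4, 5, 6];
    assert_eq!(
        split_when_model(&data, |&x| x == 4),
        vec![vec![1, 2, 3], vec![4, 5, 6]]
    );
    assert_eq!(
        split_after_model(&data, |&x| x == 4),
        vec![vec![1, 2, 3, 4], vec![5, 6]]
    );
}
```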

Sub-sources from split_when/split_after are consumed with flat_map_concat(|sub| sub) to preserve segment ordering, or flat_map_merge(n, |sub| sub) to process segments concurrently. Apply Source::fold to each sub-source to reduce a segment to a single value:

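```rust
// `source` and `boundary` are placeholders for an upstream Source<u64> and a split value.
// Each segment is folded to its sum; the sums are emitted in segment order.
source
    .split_when(|&x| x == boundary)
    .flat_map_concat(|sub| sub.fold(0_u64, |acc, x| acc + x))
```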

group_by

group_by(max_substreams, key_fn, allow_closed_substream_recreation) partitions the stream by a key function. Elements with the same key flow into the same sub-source:

```rust
use datum::{Sink, Source};

// group_by(max_substreams, key_fn, allow_recreation) partitions elements by key.
// Each group's sub-source can be folded or transformed independently.
let mut sums: Vec<u64> = Source::from_iter(1_u64..=6)
    .group_by(2, |x| *x % 2, false) // two keys: 0 (even), 1 (odd)
    .flat_map_merge(2, |sub| sub.fold(0_u64, |acc, x| acc + x))
    .run_with(Sink::collect())
    .unwrap()
    .wait()
    .unwrap();
sums.sort_unstable();
// Odd sum: 1 + 3 + 5 = 9. Even sum: 2 + 4 + 6 = 12.
assert_eq!(sums, vec![9, 12]);
```

Parameters:

  • max_substreams: the maximum number of concurrent open sub-sources. Exceeding this limit fails the stream.
  • key_fn: Fn(&Out) -> Key — must return a Clone + Eq + Hash + Send + 'static key.
  • allow_closed_substream_recreation: if true, a key that already had a sub-source that closed can open a new one when it reappears. If false, reappearance of a closed key fails the stream.

Sub-sources from group_by behave as live streams: they receive elements as the outer stream progresses and complete when no more elements for that key will arrive (i.e. when the outer stream completes). Use flat_map_merge with breadth equal to max_substreams to process all groups simultaneously.
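As a rough mental model of the grouping and of the max_substreams limit, the sketch below folds each key's elements into a sum with a `HashMap` and returns an error once more distinct keys appear than the limit allows. `group_sums` is a hypothetical helper; the model assumes every group stays open until the input ends, and it does not cover allow_closed_substream_recreation.

```rust
use std::collections::HashMap;
use std::hash::Hash;

// Std-only model (not datum code): sum the elements of each key group and
// fail when a new key would exceed `max_substreams` open groups.
fn group_sums<K, F>(
    items: &[u64],
    max_substreams: usize,
    key_fn: F,
) -> Result<HashMap<K, u64>, String>
where
    K: Eq + Hash,
    F: Fn(&u64) -> K,
{
    let mut sums: HashMap<K, u64> = HashMap::new();
    for item in items {
        let key = key_fn(item);
        // A previously unseen key opens a new group; reject it at the limit.
        if !sums.contains_key(&key) && sums.len() == max_substreams {
            return Err(format!("more than {} substreams", max_substreams));
        }
        *sums.entry(key).or_insert(0) += *item;
    }
    Ok(sums)
}

fn main() {
    let data: [u64; 6] = [1, 2, 3, 4, 5, 6];
    let sums = group_sums(&data, 2, |x| *x % 2).expect("two keys fit the limit");
    assert_eq!(sums.get(&0_u64).copied(), Some(12)); // 2 + 4 + 6
    assert_eq!(sums.get(&1_u64).copied(), Some(9)); // 1 + 3 + 5
    // With a limit of 1, the second key fails the whole computation.
    assert!(group_sums(&data, 1, |x| *x % 2).is_err());
}
```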

Performance

All substream operators landed as part of M2 WP-17b / WP-19. Numbers below are from roadmap/benchmarks/substreams.md (2026-06-09, branch wp19c-inline). All rows are at or above Akka parity — the v0.2.0 perf gate is fully closed.

| Scenario | Datum µs/op | Akka µs/op | Speedup |
|---|---|---|---|
| split_when_rebuild_10k | 606 | 3,767 | 6.2x |
| split_after_rebuild_10k | 599 | 3,644 | 6.1x |
| flat_map_concat_expand_2k_x4 | 380 | 4,437 | 11.7x |
| flat_map_merge_expand_2k_x4_b8 | 2,994 | 5,374 | 1.79x |
| group_by_single_key_10k | 569 | 3,034 | 5.3x |

flat_map_merge at breadth 8 with short 4-item inner sources (2,000 inner sources total) runs at 1.79x Akka after the WP-19c inline micro-source path. CPU is ~1.9x wall-clock (coordinator + consumer thread activity); see roadmap/benchmarks/substreams.md for the full story.
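The Speedup column is consistent with the ratio of Akka time to Datum time, and the short sketch below recomputes it from the figures reported above. That definition is inferred from the numbers rather than stated in the source, and `speedup` is a hypothetical helper.

```rust
// Speedup as Akka µs/op divided by Datum µs/op (an inferred definition).
fn speedup(datum_us: f64, akka_us: f64) -> f64 {
    akka_us / datum_us
}

fn main() {
    // (scenario, Datum µs/op, Akka µs/op), copied from the table above.
    let rows = [
        ("split_when_rebuild_10k", 606.0, 3767.0),
        ("split_after_rebuild_10k", 599.0, 3644.0),
        ("flat_map_concat_expand_2k_x4", 380.0, 4437.0),
        ("flat_map_merge_expand_2k_x4_b8", 2994.0, 5374.0),
        ("group_by_single_key_10k", 569.0, 3034.0),
    ];
    for (name, datum_us, akka_us) in rows {
        println!("{}: {:.2}x", name, speedup(datum_us, akka_us));
    }
}
```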

Next steps