# Working with Graphs
The GraphDSL layer lets you build arbitrary fan-in / fan-out topologies that the linear Source → Flow → Sink API cannot express: diamonds, cycles, multi-port junctions, and reusable partial graphs. It mirrors the Akka Streams GraphDSL API.
## Building a graph

`GraphDsl::try_create` takes a builder closure and returns a `StreamResult<GraphBlueprint<S>>`:
```rust
use datum::{Broadcast, GraphDsl, GraphFlowShape, Merge};

let graph = GraphDsl::try_create(|builder| {
    // Add two junctions; each `add` returns the junction's shape.
    let bcast = builder.add(Broadcast::<u64>::new(2));
    let merge = builder.add(Merge::<u64>::new(2));
    // Wire both Broadcast outlets into the two Merge inlets (a diamond).
    builder.connect(bcast.outlet(0)?, merge.inlet(0)?)?;
    builder.connect(bcast.outlet(1)?, merge.inlet(1)?)?;
    // Expose the Broadcast inlet and the Merge outlet as the graph's external ports.
    Ok(GraphFlowShape::new(bcast.inlet(), merge.outlet()))
})?;
```

Inside the closure:

- `builder.add(stage)` allocates the stage's typed ports and returns its shape.
- `builder.connect(outlet, inlet)` wires two ports. Port types are checked at runtime.
- The closure must return a `Shape` that describes the graph's external ports.
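Because `connect` checks types at runtime rather than at compile time, a mismatch surfaces as an error from the builder. One way such a check can work is to record a `TypeId` for every port when it is allocated and compare the two IDs on connect. The sketch below is a hypothetical toy, not datum's implementation. `ToyBuilder`, `PortId`, the error strings and the "each outlet is wired once" rule are all invented for illustration.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;

/// Hypothetical port handle: just an index into the builder's port table.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
struct PortId(usize);

/// Toy builder (NOT datum's API): remembers each port's element type
/// and which outlet feeds which inlet.
struct ToyBuilder {
    port_types: Vec<TypeId>,
    edges: HashMap<PortId, PortId>,
}

impl ToyBuilder {
    fn new() -> Self {
        ToyBuilder { port_types: Vec::new(), edges: HashMap::new() }
    }

    /// Allocate a port that carries elements of type `T`.
    fn port<T: Any>(&mut self) -> PortId {
        self.port_types.push(TypeId::of::<T>());
        PortId(self.port_types.len() - 1)
    }

    /// Wire `outlet` to `inlet`, comparing element types at runtime.
    fn connect(&mut self, outlet: PortId, inlet: PortId) -> Result<(), String> {
        if self.port_types[outlet.0] != self.port_types[inlet.0] {
            return Err(format!("type mismatch connecting {:?} -> {:?}", outlet, inlet));
        }
        // In this toy, each outlet may be wired only once.
        if self.edges.contains_key(&outlet) {
            return Err(format!("{:?} is already connected", outlet));
        }
        self.edges.insert(outlet, inlet);
        Ok(())
    }
}

fn main() {
    let mut b = ToyBuilder::new();
    let out_u64 = b.port::<u64>();
    let in_u64 = b.port::<u64>();
    let in_text = b.port::<String>();

    assert!(b.connect(out_u64, in_text).is_err()); // u64 -> String is rejected
    assert!(b.connect(out_u64, in_u64).is_ok());
    assert!(b.connect(out_u64, in_u64).is_err()); // outlet already wired
}
```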
## Shapes

A `Shape` lists the graph's external inlets and outlets. Common shapes:

| Shape | External ports |
|---|---|
| `FlowShape<In, Out>` | 1 inlet, 1 outlet; use for a graph that replaces a `Flow` |
| `SourceShape<Out>` | 0 inlets, 1 outlet; a graph that acts as a source |
| `SinkShape<In>` | 1 inlet, 0 outlets; a graph that acts as a sink |
| `FanInShape<In, Out>` | N inlets, 1 outlet |
| `FanOutShape<In, Out>` | 1 inlet, N outlets |

`GraphFlowShape` is a re-export alias for `FlowShape`.
## Junctions

All built-in junctions are re-exported from `datum`:

| Junction | Inlets | Outlets | Semantics |
|---|---|---|---|
| `Broadcast<T>::new(n)` | 1 | N | Push each element to all N outlets |
| `Balance<T>::new(n)` | 1 | N | Round-robin across N outlets |
| `Merge<T>::new(n)` | N | 1 | Merge from whichever inlet has an element |
| `MergePreferred<T>::new(n_secondary)` | 1 + N | 1 | Always drain the preferred inlet first |
| `MergePrioritized<T>::new(weights)` | N | 1 | Weighted probability merge |
| `MergeSequence<T>::new(n, extract_seq)` | N | 1 | Merge ordered by sequence number |
| `MergeSorted<T>::new()` | 2 | 1 | Sorted merge of two pre-sorted streams |
| `Zip<L, R>::new()` | 2 | 1 | Pair elements from two inlets |
| `Unzip<A, B>::new()` | 1 | 2 | Split `(A, B)` pairs to two outlets |
| `UnzipWith<In, A, B>::new(f)` | 1 | 2 | Split each element using `f` |
| `Partition<T>::new(n, f)` | 1 | N | Route each element to outlet `f(&elem)` |
| `Concat<T>::new(n)` | N | 1 | Drain inlets in order, one after another |
| `Interleave<T>::new(n, segment_size)` | N | 1 | Round-robin in chunks of `segment_size` |
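The `Interleave` and `MergePrioritized` rows are the least self-explanatory. The sketch below models both in plain Rust on finite, already-collected inputs. It is not datum code, and it ignores demand and backpressure entirely. It rests on two assumptions:

- An exhausted `Interleave` input is simply skipped while the others continue.
- "Weighted probability merge" means that, among the inlets with an element ready, one is picked with probability proportional to its weight. This is how Akka's `MergePrioritized`, which datum mirrors, is documented.

`interleave`, `pick_weighted` and `XorShift64` are hypothetical helpers.

```rust
/// Interleave model: take up to `segment_size` elements from each input in
/// turn; an exhausted input is skipped and the others continue.
fn interleave<T>(inputs: Vec<Vec<T>>, segment_size: usize) -> Vec<T> {
    let mut iters: Vec<std::vec::IntoIter<T>> =
        inputs.into_iter().map(|v| v.into_iter()).collect();
    let mut out = Vec::new();
    loop {
        let before = out.len();
        for it in iters.iter_mut() {
            out.extend(it.by_ref().take(segment_size));
        }
        if out.len() == before {
            break; // no input produced anything: all are exhausted
        }
    }
    out
}

/// Minimal xorshift64 PRNG so the sketch needs no external crates.
struct XorShift64(u64);

impl XorShift64 {
    fn next_u64(&mut self) -> u64 {
        let mut x = self.0;
        x ^= x << 13;
        x ^= x >> 7;
        x ^= x << 17;
        self.0 = x;
        x
    }
}

/// MergePrioritized model: among inlets with an element ready, pick one
/// with probability proportional to its weight.
fn pick_weighted(weights: &[u64], ready: &[bool], rng: &mut XorShift64) -> Option<usize> {
    let total: u64 = weights
        .iter()
        .zip(ready.iter())
        .filter(|&(_, &r)| r)
        .map(|(&w, _)| w)
        .sum();
    if total == 0 {
        return None; // nothing ready (or every ready weight is zero)
    }
    let mut roll = (rng.next_u64() >> 11) % total;
    for (i, (&w, &r)) in weights.iter().zip(ready.iter()).enumerate() {
        if r {
            if roll < w {
                return Some(i);
            }
            roll -= w;
        }
    }
    unreachable!("roll is always below the total ready weight")
}

fn main() {
    assert_eq!(
        interleave(vec![vec![1, 2, 3, 4], vec![10, 20]], 2),
        vec![1, 2, 10, 20, 3, 4]
    );

    let weights: [u64; 2] = [3, 1];
    let mut rng = XorShift64(0x9E37_79B9_7F4A_7C15);
    let mut hits = [0u32; 2];
    for _ in 0..10_000 {
        hits[pick_weighted(&weights, &[true, true], &mut rng).unwrap()] += 1;
    }
    // With both inlets ready, inlet 0 should win roughly 3/4 of the time.
    let share = hits[0] as f64 / 10_000.0;
    assert!((0.70..0.80).contains(&share));

    // If only the low-weight inlet is ready, it is always chosen.
    assert_eq!(pick_weighted(&weights, &[false, true], &mut rng), Some(1));
}
```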
### Shape accessors

`builder.add(...)` returns each junction's shape. The accessors vary by shape type:

- `FanOutShape` (`Broadcast`, `Balance`, `Partition`): `.inlet()`, `.outlet(i)?`, `.outlet_count()`
- `FanOutShape2` (`Unzip`, `UnzipWith`): `.inlet()`, `.out0()`, `.out1()`
- `FanInShape` (`Merge`, `Concat`, `Interleave`): `.inlet(i)?`, `.outlet()`, `.inlet_count()`
- `ZipShape`: `.in0()`, `.in1()`, `.outlet()`
- `MergePreferredShape`: `.preferred()`, `.secondary(i)?`, `.outlet()`
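The list above mixes plain getters with fallible indexed ones (`.outlet(i)?`, `.inlet(i)?`, `.secondary(i)?`). A plausible reason is bounds checking: the index is only known at runtime and may exceed the port count. The toy below shows that pattern with `?` propagating the first bad index. `ToyFanOutShape`, `Outlet` and `PortIndexError` are hypothetical and do not reflect datum's actual error type.

```rust
/// Hypothetical outlet handle; stands in for datum's typed ports.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct Outlet(usize);

/// Hypothetical error for an out-of-range port index.
#[derive(Debug, PartialEq, Eq)]
struct PortIndexError {
    index: usize,
    count: usize,
}

/// Toy fan-out shape with a fixed number of outlets.
struct ToyFanOutShape {
    outlets: Vec<Outlet>,
}

impl ToyFanOutShape {
    fn new(n: usize) -> Self {
        ToyFanOutShape { outlets: (0..n).map(Outlet).collect() }
    }

    fn outlet_count(&self) -> usize {
        self.outlets.len()
    }

    /// Indexed access is fallible because `i` may exceed the outlet count.
    fn outlet(&self, i: usize) -> Result<Outlet, PortIndexError> {
        self.outlets
            .get(i)
            .copied()
            .ok_or(PortIndexError { index: i, count: self.outlet_count() })
    }
}

/// `?` forwards the first out-of-range error to the caller.
fn first_two(shape: &ToyFanOutShape) -> Result<(Outlet, Outlet), PortIndexError> {
    Ok((shape.outlet(0)?, shape.outlet(1)?))
}

fn main() {
    let two = ToyFanOutShape::new(2);
    assert_eq!(first_two(&two), Ok((Outlet(0), Outlet(1))));

    let one = ToyFanOutShape::new(1);
    assert_eq!(first_two(&one), Err(PortIndexError { index: 1, count: 1 }));
}
```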
## Running a graph

A `GraphBlueprint<FlowShape<In, Out>>` exposes `run_with_input(iter)`, which executes the graph directly from an iterator:
```rust
use datum::{Broadcast, GraphDsl, GraphFlowShape, Merge};

// Build a broadcast→merge diamond: each input element is broadcast to two
// Merge inlets, so every element appears twice in the output.
let graph = GraphDsl::try_create(|builder| {
    let bcast = builder.add(Broadcast::<u64>::new(2));
    let merge = builder.add(Merge::<u64>::new(2));
    builder.connect(bcast.outlet(0)?, merge.inlet(0)?)?;
    builder.connect(bcast.outlet(1)?, merge.inlet(1)?)?;
    Ok(GraphFlowShape::new(bcast.inlet(), merge.outlet()))
})
.unwrap();

let mut items: Vec<u64> = graph.run_with_input(1_u64..=3).unwrap();
items.sort_unstable();
assert_eq!(items, vec![1, 1, 2, 2, 3, 3]);
```

Order across the `Merge` inlets is not guaranteed, so the example sorts the output before asserting on it.

`run_with_input` accepts any `IntoIterator<Item = In>`. Use `run_count_with_input` to get only the output count.
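The same diamond can be modeled with std threads and channels, which shows why ordering across inlets is not guaranteed. Each branch preserves its own order, but the merge sees elements in whatever order the two branch threads deliver them. This is a hypothetical sketch (`diamond` is an illustrative name). It says nothing about how datum's fused executor actually schedules stages.

```rust
use std::sync::mpsc;
use std::thread;

/// Models Broadcast(2) -> Merge(2): one forwarding thread per branch,
/// with both branches feeding a single merge channel.
fn diamond(input: Vec<u64>) -> Vec<u64> {
    let (merge_tx, merge_rx) = mpsc::channel::<u64>();
    let mut branch_txs = Vec::new();
    let mut workers = Vec::new();

    for _ in 0..2 {
        let (branch_tx, branch_rx) = mpsc::channel::<u64>();
        let to_merge = merge_tx.clone();
        // Each branch forwards its elements to the merge as they arrive.
        workers.push(thread::spawn(move || {
            for x in branch_rx {
                to_merge.send(x).unwrap();
            }
        }));
        branch_txs.push(branch_tx);
    }
    drop(merge_tx); // only the branch threads keep the merge channel open

    // Broadcast: every element goes to every branch.
    for x in input {
        for tx in &branch_txs {
            tx.send(x).unwrap();
        }
    }
    drop(branch_txs); // closing the branches lets the worker threads finish

    // Merge: collect in arrival order until both branches have finished.
    let merged: Vec<u64> = merge_rx.iter().collect();
    for w in workers {
        w.join().unwrap();
    }
    merged
}

fn main() {
    let mut items = diamond(vec![1, 2, 3]);
    items.sort_unstable(); // arrival order across branches may vary between runs
    assert_eq!(items, vec![1, 1, 2, 2, 3, 3]);
}
```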
## Partial graphs

`GraphDsl::partial` builds a reusable fragment that can be imported into multiple parent graphs. The returned `PartialGraph<S>` is a blueprint that you wire in with `builder.import(&partial)`:
```rust
use datum::{GraphDsl, GraphFlowShape, MergePrioritized, PartialGraph};

let fragment: PartialGraph<GraphFlowShape<u64, u64>> = GraphDsl::partial(|builder| {
    let stage = builder.add(MergePrioritized::<u64>::new(vec![3, 1]));
    Ok(GraphFlowShape::new(stage.inlet(0)?, stage.outlet()))
});

// Use in another graph:
let outer = GraphDsl::try_create(|builder| {
    let shape = builder.import(&fragment)?;
    // wire shape.inlet() / shape.outlet() to other stages
    Ok(shape)
})?;
```

## Performance
Datum's fused executor has two execution paths, and junctions also get dedicated fast paths. Ratios below are relative to Akka Streams; values above 1.00x mean Datum is faster.

- Typed-linear fast path (`typed_linear_plan`): applies when the graph is a straight linear chain of type-safe stages. It avoids all `DatumValue` boxing and runs 16–46x faster than Akka on the benchmarked shapes (e.g. 622 µs vs 27,814 µs for a 10-stage typed identity chain).
- Erased path: used for non-linear graphs and junction topologies; it boxes every element as `DatumValue`. WP-18 extended the typed-linear plan to cover shapes that previously took the erased path. Rows that formerly measured 0.28–0.31x Akka now auto-select the typed plan and run at 14.2–42.6x. The M1 Tier-1 performance gate is closed.
- Junctions use dedicated `StageKind` fast paths: `Broadcast` + `Zip` at 1.11x, `Balance` + `Merge` at 1.03x, `MergePreferred` at 1.10x, and `Concat` at 1.00x. WP-18 also resolved the two junctions that previously trailed Akka: `MergeSequence` (0.73x → 7.79x, Phase 3a) and `MergeLatest` (0.55x → 2.16x, Phase 3b). Both are now above Akka parity.
- Graph build is near or above Akka parity (0.94–1.68x depending on depth).
All numbers are from `roadmap/benchmarks/graph.md`.
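To make the boxing in the erased path concrete, here is a std-only sketch of a 10-stage add-one chain written two ways. The typed version passes plain `u64` values between `fn` stages. The erased version carries every element as a boxed `dyn Any`, so each stage must downcast before computing. `Erased` is a hypothetical stand-in for the idea behind `DatumValue`, not its real definition. In this toy each stage also allocates a fresh box, which may not match how datum manages allocations. The sketch demonstrates representation only and makes no timing claims; the figures above come from datum's own benchmarks.

```rust
use std::any::Any;

/// Hypothetical erased element (stand-in for the idea behind `DatumValue`).
type Erased = Box<dyn Any>;

/// Typed chain: each stage is a plain `fn(u64) -> u64`; no per-element allocation.
fn run_typed(input: &[u64], stages: &[fn(u64) -> u64]) -> Vec<u64> {
    input
        .iter()
        .map(|&x| stages.iter().fold(x, |acc, stage| stage(acc)))
        .collect()
}

/// Erased chain: each element is boxed up front, then every stage
/// downcasts, computes, and boxes a fresh result.
fn run_erased(input: &[u64], stages: &[fn(Erased) -> Erased]) -> Vec<u64> {
    input
        .iter()
        .map(|&x| {
            let boxed: Erased = Box::new(x);
            let result = stages.iter().fold(boxed, |acc, stage| stage(acc));
            *result.downcast::<u64>().expect("chain should produce a u64")
        })
        .collect()
}

fn add_one(x: u64) -> u64 {
    x + 1
}

fn add_one_erased(v: Erased) -> Erased {
    let x = *v.downcast::<u64>().expect("stage expects a u64");
    Box::new(x + 1)
}

fn main() {
    let typed = [add_one as fn(u64) -> u64; 10];
    let erased = [add_one_erased as fn(Erased) -> Erased; 10];
    // Both chains compute the same values; only the representation differs.
    assert_eq!(run_typed(&[1, 2, 3], &typed), vec![11, 12, 13]);
    assert_eq!(run_erased(&[1, 2, 3], &erased), vec![11, 12, 13]);
}
```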
## Next steps

- Materialization: how blueprints become running streams
- Buffers & Rate: decoupling producer and consumer speed
- Dynamic Streams: `MergeHub`, `BroadcastHub`, and kill switches