
Working with Graphs

The GraphDSL layer lets you build arbitrary fan-in / fan-out topologies that the linear Source → Flow → Sink API cannot express: diamonds, cycles, multi-port junctions, and reusable partial graphs. It mirrors the Akka Streams GraphDSL API.

Building a graph

GraphDsl::try_create takes a builder closure, runs it, and returns StreamResult<GraphBlueprint<S>>, where S is the shape the closure returns:

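```rust
use datum::{Broadcast, GraphDsl, GraphFlowShape, Merge};

// The final `?` needs an enclosing function that returns a compatible Result.
let graph = GraphDsl::try_create(|builder| {
    // Add two junctions; each call returns the stage's shape.
    let bcast = builder.add(Broadcast::<u64>::new(2));
    let merge = builder.add(Merge::<u64>::new(2));

    // Wire both Broadcast outlets into the two Merge inlets.
    builder.connect(bcast.outlet(0)?, merge.inlet(0)?)?;
    builder.connect(bcast.outlet(1)?, merge.inlet(1)?)?;

    // Expose the Broadcast inlet and the Merge outlet as the graph's ports.
    Ok(GraphFlowShape::new(bcast.inlet(), merge.outlet()))
})?;
```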

Inside the closure:

  • builder.add(stage) allocates the stage's typed ports and returns its shape.
  • builder.connect(outlet, inlet) wires two ports; types are checked at runtime.
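  • The closure must return Ok(shape), where shape describes the graph's external ports. Errors from fallible calls such as outlet(i)?, inlet(i)?, and connect(...)? propagate out of the closure via ?.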
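The runtime type check can be pictured with a small, self-contained model. The sketch below does not use datum: ToyBuilder, PortId, ToyError, and toy_try_create are hypothetical names, and recording a TypeId per port is an assumption about how such a check could work, not a description of datum's builder. It also shows how a failed connect short-circuits the whole closure through ?.

```rust
use std::any::{type_name, TypeId};

/// Hypothetical port handle: an index into the builder's port table.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct PortId(usize);

/// Hypothetical wiring error, standing in for a StreamResult error.
#[derive(Debug, PartialEq, Eq)]
pub enum ToyError {
    TypeMismatch { outlet: &'static str, inlet: &'static str },
}

/// Toy builder (not datum's): every allocated port records its element type.
#[derive(Default)]
pub struct ToyBuilder {
    port_types: Vec<(TypeId, &'static str)>,
    edges: Vec<(PortId, PortId)>, // (outlet, inlet) pairs
}

impl ToyBuilder {
    /// Allocate a port that carries elements of type `T`.
    pub fn port<T: 'static>(&mut self) -> PortId {
        self.port_types.push((TypeId::of::<T>(), type_name::<T>()));
        PortId(self.port_types.len() - 1)
    }

    /// Wire an outlet to an inlet, comparing element types at runtime.
    pub fn connect(&mut self, outlet: PortId, inlet: PortId) -> Result<(), ToyError> {
        let (out_ty, out_name) = self.port_types[outlet.0];
        let (in_ty, in_name) = self.port_types[inlet.0];
        if out_ty != in_ty {
            return Err(ToyError::TypeMismatch { outlet: out_name, inlet: in_name });
        }
        self.edges.push((outlet, inlet));
        Ok(())
    }
}

/// Run a builder closure; the first `?` error inside it becomes the result.
/// On success, returns the closure's shape plus the number of edges wired.
pub fn toy_try_create<S>(
    build: impl FnOnce(&mut ToyBuilder) -> Result<S, ToyError>,
) -> Result<(S, usize), ToyError> {
    let mut builder = ToyBuilder::default();
    let shape = build(&mut builder)?;
    Ok((shape, builder.edges.len()))
}
```

Wiring a u64 outlet to a String inlet makes toy_try_create return Err(ToyError::TypeMismatch { .. }) instead of a shape, which is the behaviour the bullet list describes for mismatched ports.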

Shapes

A Shape lists the graph's external inlets and outlets. Common shapes:

| Shape | External ports |
|---|---|
| `FlowShape<In, Out>` | 1 inlet, 1 outlet; use for a graph that replaces a Flow |
| `SourceShape<Out>` | 0 inlets, 1 outlet; a graph that acts as a source |
| `SinkShape<In>` | 1 inlet, 0 outlets; a graph that acts as a sink |
| `FanInShape<In, Out>` | N inlets, 1 outlet |
| `FanOutShape<In, Out>` | 1 inlet, N outlets |

GraphFlowShape is a re-export alias for FlowShape.
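Because a shape is just the list of external ports, the table above can be read as a mapping from port counts to shape kinds. The sketch below encodes that mapping in plain Rust. ShapeKind and classify are hypothetical helpers, not part of datum, and the Other case for combinations the table does not list (for example a closed graph with no external ports) is an assumption.

```rust
/// Hypothetical summary of a graph's external ports (not a datum type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ShapeKind {
    Source, // 0 inlets, 1 outlet
    Sink,   // 1 inlet, 0 outlets
    Flow,   // 1 inlet, 1 outlet
    FanIn,  // N inlets, 1 outlet (N > 1 here; 1-in/1-out is reported as Flow)
    FanOut, // 1 inlet, N outlets (N > 1)
    Other,  // anything the table does not list
}

/// Classify a graph by the number of external inlets and outlets.
pub fn classify(inlets: usize, outlets: usize) -> ShapeKind {
    match (inlets, outlets) {
        (0, 1) => ShapeKind::Source,
        (1, 0) => ShapeKind::Sink,
        (1, 1) => ShapeKind::Flow,
        (n, 1) if n > 1 => ShapeKind::FanIn,
        (1, n) if n > 1 => ShapeKind::FanOut,
        _ => ShapeKind::Other,
    }
}
```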

Junctions

All built-in junctions are re-exported from datum:

| Junction | Inputs | Outputs | Semantics |
|---|---|---|---|
| `Broadcast<T>::new(n)` | 1 | N | Push each element to all N outlets |
| `Balance<T>::new(n)` | 1 | N | Round-robin across N outlets |
| `Merge<T>::new(n)` | N | 1 | Merge from whichever inlet has an element |
| `MergePreferred<T>::new(n_secondary)` | 1+N | 1 | Always drain preferred inlet first |
| `MergePrioritized<T>::new(weights)` | N | 1 | Weighted probability merge |
| `MergeSequence<T>::new(n, extract_seq)` | N | 1 | Ordered-by-sequence-number merge |
| `MergeSorted<T>::new()` | 2 | 1 | Sorted merge of two pre-sorted streams |
| `Zip<L, R>::new()` | 2 | 1 | Pair elements from two inlets |
| `Unzip<A, B>::new()` | 1 | 2 | Split (A, B) pairs to two outlets |
| `UnzipWith<In, A, B>::new(f)` | 1 | 2 | Split each element using f |
| `Partition<T>::new(n, f)` | 1 | N | Route each element to outlet `f(&elem)` |
| `Concat<T>::new(n)` | N | 1 | Drain inputs in order |
| `Interleave<T>::new(n, segment_size)` | N | 1 | Round-robin in chunks of segment_size |
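The Semantics column describes routing in a concurrent, demand-driven setting. As a reference point, the sketch below models several junctions as plain functions over fully buffered inputs. That is a simplifying assumption: it ignores timing, backpressure, and completion, so it says nothing about how datum schedules elements. The functions balance, partition, concat, interleave, and merge_sorted are hypothetical helpers, and skipping exhausted inputs in interleave is an assumption about completion behaviour.

```rust
/// Balance: deal elements round-robin across `n` outlets.
pub fn balance<T: Clone>(input: &[T], n: usize) -> Vec<Vec<T>> {
    let mut outs = vec![Vec::new(); n];
    for (i, x) in input.iter().enumerate() {
        outs[i % n].push(x.clone());
    }
    outs
}

/// Partition: route each element to outlet `f(&elem)`.
pub fn partition<T: Clone>(input: &[T], n: usize, f: impl Fn(&T) -> usize) -> Vec<Vec<T>> {
    let mut outs = vec![Vec::new(); n];
    for x in input {
        let idx = f(x);
        assert!(idx < n, "partition index out of range");
        outs[idx].push(x.clone());
    }
    outs
}

/// Concat: drain each input completely, in order.
pub fn concat<T: Clone>(inputs: &[Vec<T>]) -> Vec<T> {
    inputs.concat()
}

/// Interleave: take up to `segment_size` elements from each input in turn,
/// skipping inputs that are already exhausted.
pub fn interleave<T: Clone>(inputs: &[Vec<T>], segment_size: usize) -> Vec<T> {
    assert!(segment_size > 0, "segment_size must be positive");
    let mut pos = vec![0; inputs.len()];
    let mut out = Vec::new();
    loop {
        let mut progressed = false;
        for (i, input) in inputs.iter().enumerate() {
            let end = (pos[i] + segment_size).min(input.len());
            if pos[i] < end {
                out.extend_from_slice(&input[pos[i]..end]);
                pos[i] = end;
                progressed = true;
            }
        }
        if !progressed {
            return out;
        }
    }
}

/// MergeSorted: merge two already-sorted inputs into one sorted output.
pub fn merge_sorted<T: Ord + Clone>(a: &[T], b: &[T]) -> Vec<T> {
    let (mut i, mut j) = (0, 0);
    let mut out = Vec::with_capacity(a.len() + b.len());
    while i < a.len() && j < b.len() {
        if a[i] <= b[j] {
            out.push(a[i].clone());
            i += 1;
        } else {
            out.push(b[j].clone());
            j += 1;
        }
    }
    // One side is exhausted; the rest of the other is already sorted.
    out.extend_from_slice(&a[i..]);
    out.extend_from_slice(&b[j..]);
    out
}
```

For example, interleave(&[vec![1, 2, 3], vec![10, 20, 30, 40]], 2) yields 1, 2, 10, 20, 3, 30, 40.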

Shape accessors

Each junction returns a shape from builder.add(...). The accessors vary by shape type:

  • FanOutShape (Broadcast, Balance, Partition): .inlet(), .outlet(i)?, .outlet_count()
  • FanOutShape2 (Unzip, UnzipWith): .inlet(), .out0(), .out1()
  • FanInShape (Merge, Concat, Interleave): .inlet(i)?, .outlet(), .inlet_count()
  • ZipShape: .in0(), .in1(), .outlet()
  • MergePreferredShape: .preferred(), .secondary(i)?, .outlet()
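A plausible reason for the split between fallible and infallible accessors is arity: N-way shapes only learn their port count when the stage is constructed, so an index can be out of range, while fixed-arity shapes name each port directly. The sketch below models that distinction. ToyFanOutShape, ToyFanOutShape2, Port, and PortIndexError are hypothetical, and the error contents are an assumption about what .outlet(i)? might report.

```rust
/// Hypothetical port handle.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Port(pub usize);

/// Hypothetical error for an out-of-range port index.
#[derive(Debug, PartialEq, Eq)]
pub struct PortIndexError {
    pub index: usize,
    pub count: usize,
}

/// Toy N-way fan-out shape: one inlet and a runtime-sized list of outlets.
pub struct ToyFanOutShape {
    inlet: Port,
    outlets: Vec<Port>,
}

impl ToyFanOutShape {
    pub fn new(n: usize) -> Self {
        // Port 0 is the inlet; ports 1..=n are the outlets.
        ToyFanOutShape { inlet: Port(0), outlets: (1..=n).map(Port).collect() }
    }
    pub fn inlet(&self) -> Port {
        self.inlet
    }
    pub fn outlet_count(&self) -> usize {
        self.outlets.len()
    }
    /// Fallible: `i` can only be checked against the arity at runtime.
    pub fn outlet(&self, i: usize) -> Result<Port, PortIndexError> {
        self.outlets
            .get(i)
            .copied()
            .ok_or(PortIndexError { index: i, count: self.outlets.len() })
    }
}

/// Toy two-outlet shape (like Unzip): the arity is fixed by the type,
/// so each accessor always succeeds.
pub struct ToyFanOutShape2 {
    inlet: Port,
    out0: Port,
    out1: Port,
}

impl ToyFanOutShape2 {
    pub fn new() -> Self {
        ToyFanOutShape2 { inlet: Port(0), out0: Port(1), out1: Port(2) }
    }
    pub fn inlet(&self) -> Port {
        self.inlet
    }
    pub fn out0(&self) -> Port {
        self.out0
    }
    pub fn out1(&self) -> Port {
        self.out1
    }
}
```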

Running a graph

A GraphBlueprint<FlowShape<In, Out>> exposes run_with_input(iter), which executes the graph directly from an iterator and collects its output:

```rust
use datum::{Broadcast, GraphDsl, GraphFlowShape, Merge};

// Build a broadcast→merge diamond: each input element is broadcast to two
// Merge inlets, so every element appears twice in the output.
let graph = GraphDsl::try_create(|builder| {
    let bcast = builder.add(Broadcast::<u64>::new(2));
    let merge = builder.add(Merge::<u64>::new(2));

    builder.connect(bcast.outlet(0)?, merge.inlet(0)?)?;
    builder.connect(bcast.outlet(1)?, merge.inlet(1)?)?;

    Ok(GraphFlowShape::new(bcast.inlet(), merge.outlet()))
})
.unwrap();

let mut items: Vec<u64> = graph.run_with_input(1_u64..=3).unwrap();
// Merge order across inlets is not guaranteed, so sort before asserting.
items.sort_unstable();
assert_eq!(items, vec![1, 1, 2, 2, 3, 3]);
```

The broadcast→merge diamond sends each input element to both Merge inlets, so every element appears twice in the output. Order across inlets is not guaranteed; sort if you need a stable assertion.

run_with_input accepts any IntoIterator<Item = In>. Use run_count_with_input to get only the output count.
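Because Broadcast::new(2) duplicates every element and Merge::new(2) forwards everything it receives, the diamond's output contains each input exactly twice, and its count is twice the input length. The pure-Rust model below checks that arithmetic without depending on datum. diamond_model and diamond_count are hypothetical names, and the model fixes one particular interleaving, whereas the real graph may emit elements in a different order.

```rust
/// Reference model of the broadcast -> merge diamond using plain iterators.
/// `fan_out` plays the role of Broadcast::new(fan_out); flattening the
/// copies plays the role of Merge.
pub fn diamond_model<I>(input: I, fan_out: usize) -> Vec<u64>
where
    I: IntoIterator<Item = u64>,
{
    input
        .into_iter()
        .flat_map(move |x| std::iter::repeat(x).take(fan_out))
        .collect()
}

/// Output count only, analogous to run_count_with_input.
pub fn diamond_count<I>(input: I, fan_out: usize) -> usize
where
    I: IntoIterator<Item = u64>,
{
    input.into_iter().count() * fan_out
}
```

For the input 1..=3 with two branches, the model produces six elements, which sort to [1, 1, 2, 2, 3, 3].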

Partial graphs

GraphDsl::partial builds a reusable fragment that can be imported into multiple parent graphs. The returned PartialGraph<S> is a blueprint that is wired into a parent graph with builder.import(&partial), which returns the fragment's shape S:

```rust
use datum::{GraphDsl, PartialGraph, GraphFlowShape, MergePrioritized};

let fragment: PartialGraph<GraphFlowShape<u64, u64>> = GraphDsl::partial(|builder| {
    let stage = builder.add(MergePrioritized::<u64>::new(vec![3, 1]));
    Ok(GraphFlowShape::new(stage.inlet(0)?, stage.outlet()))
});

// Use in another graph. The final `?` needs an enclosing function
// that returns a compatible Result.
let outer = GraphDsl::try_create(|builder| {
    let shape = builder.import(&fragment)?;
    // wire shape.inlet() / shape.outlet() to other stages
    Ok(shape)
})?;
```
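Importing the same PartialGraph more than once is what makes it reusable, which suggests each import instantiates fresh stages and ports rather than sharing one copy. That reading is an assumption; the sketch below models it with a recipe closure that is re-run for every import. ImportBuilder, ToyFlowShape, and ToyPartial are hypothetical types unrelated to datum's.

```rust
/// Toy builder that hands out fresh port ids.
#[derive(Default)]
pub struct ImportBuilder {
    next_port: usize,
}

impl ImportBuilder {
    /// Allocate a new, never-before-used port id.
    pub fn alloc(&mut self) -> usize {
        self.next_port += 1;
        self.next_port - 1
    }

    /// Instantiate a fragment inside this builder by re-running its recipe.
    pub fn import<S>(&mut self, partial: &ToyPartial<S>) -> S {
        (partial.recipe)(self)
    }
}

/// Toy flow shape: one inlet id and one outlet id.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ToyFlowShape {
    pub inlet: usize,
    pub outlet: usize,
}

/// A reusable fragment: a recipe, not an already-built graph.
pub struct ToyPartial<S> {
    recipe: Box<dyn Fn(&mut ImportBuilder) -> S>,
}

impl<S> ToyPartial<S> {
    pub fn new(recipe: impl Fn(&mut ImportBuilder) -> S + 'static) -> Self {
        ToyPartial { recipe: Box::new(recipe) }
    }
}
```

Importing one fragment twice into the same builder yields two shapes with disjoint ports, so the copies can be wired independently.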

Performance

Datum's fused executor has two execution paths (ratios in this section are relative to Akka; values above 1.0x mean Datum is faster):

  • Typed-linear fast path (typed_linear_plan): applies when the graph is a straight linear chain of type-safe stages. It avoids all DatumValue boxing and runs 16–46x faster than Akka on the benchmarked shapes (e.g. 622 µs vs 27,814 µs for a 10-stage typed identity chain).
  • Erased path: used for non-linear graphs and junction topologies. It boxes every element as DatumValue.

WP-18 extended the typed-linear plan to cover shapes that previously took the erased path: rows that formerly measured 0.28–0.31x Akka now auto-select the typed plan and run at 14.2–42.6x. The M1 Tier-1 performance gate is closed.

Junctions and graph construction:

  • Junctions use dedicated StageKind fast paths: Broadcast+Zip at 1.11x, Balance+Merge at 1.03x, MergePreferred at 1.10x, Concat at 1.00x. WP-18 also resolved two previously trailing junctions, MergeSequence (0.73x → 7.79x, Phase 3a) and MergeLatest (0.55x → 2.16x, Phase 3b); both are now above Akka parity.
  • Graph build is at or near Akka parity (0.94–1.68x depending on depth).

All numbers are from roadmap/benchmarks/graph.md.
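The practical difference between the two paths is per-element boxing. The sketch below contrasts a typed chain of fn(u64) -> u64 stages with an erased chain in which every stage takes and returns Box<dyn Any>. Box<dyn Any> is only a stand-in for DatumValue, which is an assumption: datum's value representation and executor are not shown here. The sketch makes no timing claims; speedup only reproduces the ratio convention used above, e.g. 27,814 µs / 622 µs ≈ 44.7x for the 10-stage identity chain, inside the quoted 16–46x range.

```rust
use std::any::Any;

/// Typed chain: each stage is a plain `fn(u64) -> u64`, so elements stay
/// unboxed as they pass from stage to stage.
pub fn run_typed(input: &[u64], stages: &[fn(u64) -> u64]) -> Vec<u64> {
    input
        .iter()
        .map(|&x| stages.iter().fold(x, |acc, stage| stage(acc)))
        .collect()
}

/// Erased stage: consumes and produces a boxed, dynamically typed value
/// (a hypothetical stand-in for a stage operating on DatumValue).
pub type ErasedStage = Box<dyn Fn(Box<dyn Any>) -> Box<dyn Any>>;

/// Wrap a typed stage for the erased path: every call downcasts the input
/// and allocates a fresh box for the output.
pub fn erase(stage: fn(u64) -> u64) -> ErasedStage {
    Box::new(move |value: Box<dyn Any>| {
        let x = *value.downcast::<u64>().expect("erased stage received a non-u64 value");
        Box::new(stage(x)) as Box<dyn Any>
    })
}

/// Erased chain: one heap allocation per element per stage.
pub fn run_erased(input: &[u64], stages: &[ErasedStage]) -> Vec<u64> {
    input
        .iter()
        .map(|&x| {
            let boxed = stages
                .iter()
                .fold(Box::new(x) as Box<dyn Any>, |acc, stage| stage(acc));
            *boxed.downcast::<u64>().expect("chain did not produce a u64")
        })
        .collect()
}

/// Speed-up ratio as used in this section: baseline time over Datum time.
pub fn speedup(baseline_us: f64, datum_us: f64) -> f64 {
    baseline_us / datum_us
}
```

Both chains compute the same values; the erased one additionally pays a downcast and an allocation at every stage boundary, which is the overhead the typed-linear plan avoids.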

Next steps