Buffers & Rate
Datum is backpressure-first: producers slow down when consumers can't keep up (see Backpressure). The operators on this page override that default when you want to trade off latency, throughput, or memory.
buffer
buffer(n, strategy) inserts an n-element queue between producer and consumer. When the queue is full, strategy determines what happens to each new element:
```rust
use datum::{Flow, OverflowStrategy, Sink, Source};

// buffer(n, strategy) inserts an n-element buffer between producer and consumer.
// Backpressure holds the producer when the buffer is full.
let items: Vec<u64> = Source::from_iter(1_u64..=5)
    .via(Flow::identity().buffer(4, OverflowStrategy::Backpressure))
    .run_with(Sink::collect())
    .unwrap()
    .wait()
    .unwrap();
// Backpressure never discards elements, so every input arrives in order.
assert_eq!(items, vec![1, 2, 3, 4, 5]);
```

OverflowStrategy variants:
| Variant | On overflow |
|---|---|
| Backpressure | Slow the producer (default stream behavior without a buffer) |
| DropNew | Discard the incoming element |
| DropHead | Discard the oldest element in the buffer |
| DropTail | Discard the newest element already in the buffer |
| DropBuffer | Discard the entire buffer contents |
| Fail | Fail the stream with StreamError::Failed("Buffer overflow (max capacity was: N)") |
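To make the table concrete, here is a minimal std-only model of a full buffer receiving one more element under each strategy. It is a sketch, not Datum's implementation: the names Strategy, Offer and BoundedBuffer are hypothetical, and Backpressure is modeled by handing the element back so the producer can hold it and retry later.

```rust
use std::collections::VecDeque;

/// Hypothetical mirror of the overflow rules in the table (not Datum's types).
#[derive(Clone, Copy, Debug, PartialEq)]
enum Strategy {
    Backpressure,
    DropNew,
    DropHead,
    DropTail,
    DropBuffer,
    Fail,
}

/// Outcome of offering one element to the buffer.
#[derive(Debug, PartialEq)]
enum Offer<T> {
    /// The element is now buffered (possibly after evicting others).
    Enqueued,
    /// DropNew: the incoming element itself was discarded.
    Dropped(T),
    /// Backpressure: the producer must hold the element and retry later.
    Backpressured(T),
    /// Fail: the stream fails with this message.
    Failed(String),
}

struct BoundedBuffer<T> {
    capacity: usize,
    strategy: Strategy,
    queue: VecDeque<T>,
}

impl<T> BoundedBuffer<T> {
    fn new(capacity: usize, strategy: Strategy) -> Self {
        assert!(capacity > 0, "a buffer needs room for at least one element");
        BoundedBuffer { capacity, strategy, queue: VecDeque::with_capacity(capacity) }
    }

    fn offer(&mut self, elem: T) -> Offer<T> {
        if self.queue.len() < self.capacity {
            self.queue.push_back(elem);
            return Offer::Enqueued;
        }
        // The buffer is full: apply the configured strategy.
        match self.strategy {
            Strategy::Backpressure => Offer::Backpressured(elem),
            Strategy::DropNew => Offer::Dropped(elem),
            Strategy::DropHead => {
                self.queue.pop_front(); // evict the oldest buffered element
                self.queue.push_back(elem);
                Offer::Enqueued
            }
            Strategy::DropTail => {
                self.queue.pop_back(); // evict the newest buffered element
                self.queue.push_back(elem);
                Offer::Enqueued
            }
            Strategy::DropBuffer => {
                self.queue.clear(); // discard everything buffered so far
                self.queue.push_back(elem);
                Offer::Enqueued
            }
            Strategy::Fail => Offer::Failed(format!(
                "Buffer overflow (max capacity was: {})",
                self.capacity
            )),
        }
    }
}
```

Of these, only Backpressure makes the producer wait; the drop strategies keep it running by discarding data, and Fail turns an overflow into a stream error.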
detach() is a shorthand for buffer(1, OverflowStrategy::Backpressure). It breaks a fused chain at one point, allowing the two sides to run on separate scheduling ticks.
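As an analogy only (not a description of how Datum schedules stages), Rust's standard std::sync::mpsc::sync_channel(1) behaves like buffer(1, OverflowStrategy::Backpressure): it holds one element, and send blocks while that slot is occupied. Putting the producer on its own thread shows the decoupling that detach() provides, with no element lost. The function name detached_pipeline is hypothetical.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Hypothetical illustration: send `inputs` from a producer thread to the calling
/// (consumer) thread through a one-slot bounded channel and return what arrived.
fn detached_pipeline(inputs: Vec<u64>) -> Vec<u64> {
    // Capacity 1: `send` blocks while the slot is occupied, so the producer is
    // backpressured just as with buffer(1, OverflowStrategy::Backpressure).
    let (tx, rx) = sync_channel::<u64>(1);
    let producer = thread::spawn(move || {
        for x in inputs {
            tx.send(x).expect("consumer hung up");
        }
        // `tx` is dropped when this closure returns, which ends `rx.iter()` below.
    });
    // The consumer drains the channel at its own pace on a separate thread.
    let received: Vec<u64> = rx.iter().collect();
    producer.join().expect("producer panicked");
    received
}
```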
throttle
throttle(elements, per, maximum_burst, mode) limits throughput using a token bucket. The bucket gains elements tokens every per interval and each element consumes one token; maximum_burst sets the initial bucket level and also caps how many tokens can accumulate:
```rust
use datum::{Flow, Sink, Source, ThrottleMode};
use std::time::Duration;

// throttle(1, per, burst, Shaping) limits throughput to 1 element per `per`
// while spacing them evenly; Enforcing would fail the stream instead of waiting.
let items: Vec<u64> = Source::from_iter(1_u64..=3)
    .via(Flow::identity().throttle(1, Duration::from_micros(500), 1, ThrottleMode::Shaping))
    .run_with(Sink::collect())
    .unwrap()
    .wait()
    .unwrap();
// Shaping delays elements but never drops them.
assert_eq!(items, vec![1, 2, 3]);
```

ThrottleMode values:
| Mode | Behavior when the bucket is empty |
|---|---|
| Shaping | Hold the element until enough tokens accumulate |
| Enforcing | Fail the stream with StreamError::Failed("Maximum throttle throughput exceeded.") |
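The token-bucket rule can be sketched on a simulated clock. This is a hypothetical std-only model of throttle(elements, per, maximum_burst, mode), not Datum's code; it assumes tokens are credited in whole per intervals and that maximum_burst is at least 1. The Mode and TokenBucket names are illustrative. admit returns the time an element may be emitted (Shaping) or the error message from the table (Enforcing), leaving it to the caller to stop the stream.

```rust
use std::time::Duration;

/// Hypothetical stand-in for ThrottleMode.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Mode {
    Shaping,
    Enforcing,
}

/// Token bucket on a simulated clock (times are offsets from the start).
struct TokenBucket {
    elements: u64,           // tokens credited per `per` interval
    per_nanos: u128,         // length of one interval
    maximum_burst: u64,      // initial level and ceiling of the bucket
    tokens: u64,             // tokens currently available
    last_refill_nanos: u128, // start of the current interval
}

impl TokenBucket {
    fn new(elements: u64, per: Duration, maximum_burst: u64) -> Self {
        assert!(elements > 0 && maximum_burst > 0 && !per.is_zero());
        TokenBucket {
            elements,
            per_nanos: per.as_nanos(),
            maximum_burst,
            tokens: maximum_burst, // the bucket starts full
            last_refill_nanos: 0,
        }
    }

    /// Credit `elements` tokens for each whole interval elapsed, capped at the burst.
    fn refill(&mut self, now: Duration) {
        let elapsed = now.as_nanos().saturating_sub(self.last_refill_nanos);
        let intervals = elapsed / self.per_nanos;
        if intervals > 0 {
            let added = (intervals as u64).saturating_mul(self.elements);
            self.tokens = self.tokens.saturating_add(added).min(self.maximum_burst);
            self.last_refill_nanos += intervals * self.per_nanos;
        }
    }

    /// Admit one element arriving at `now`, returning when it may be emitted.
    fn admit(&mut self, now: Duration, mode: Mode) -> Result<Duration, String> {
        self.refill(now);
        if self.tokens > 0 {
            self.tokens -= 1;
            return Ok(now);
        }
        match mode {
            Mode::Enforcing => Err("Maximum throttle throughput exceeded.".to_string()),
            Mode::Shaping => {
                // Hold the element until the next interval credits fresh tokens.
                let next = Duration::from_nanos((self.last_refill_nanos + self.per_nanos) as u64);
                self.refill(next);
                self.tokens -= 1;
                Ok(next)
            }
        }
    }
}
```

For three back-to-back arrivals under throttle(1, 500µs, 1, Shaping), admit returns 0, 500µs and 1000µs: the initial burst token lets the first element through immediately, and each later one waits one interval. In Enforcing mode the second arrival gets the error instead.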
throttle_with_cost(cost, per, burst, cost_fn, mode) lets each element carry a variable cost instead of counting every element as 1.
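Weighting follows the same rule, except that an element needs cost_fn(element) tokens rather than one. The sketch below counts whole intervals instead of real time and reports the interval in which each element would be released under shaping. It assumes, by analogy with throttle, that cost is the number of tokens credited per interval and burst caps the bucket; shape_by_cost is a hypothetical helper, and it rejects any element costing more than burst because such an element could never fit in this model.

```rust
/// Hypothetical sketch: return the interval index (0, 1, 2, ...) in which each
/// item is released. Each interval credits `cost` tokens, capped at `burst`,
/// and the bucket starts full.
fn shape_by_cost<T>(items: &[T], cost: u64, burst: u64, cost_fn: impl Fn(&T) -> u64) -> Vec<u64> {
    assert!(cost > 0, "the bucket must refill");
    let mut tokens = burst;
    let mut interval = 0;
    let mut released = Vec::with_capacity(items.len());
    for item in items {
        let needed = cost_fn(item);
        assert!(needed <= burst, "an element costing more than `burst` never fits");
        // Wait (advance whole intervals) until enough tokens have accumulated.
        while tokens < needed {
            interval += 1;
            tokens = (tokens + cost).min(burst);
        }
        tokens -= needed;
        released.push(interval);
    }
    released
}
```

For example, the strings "ab", "cdef" and "g" with their byte length as cost, cost = 4 and burst = 4 are released in intervals 0, 1 and 2: "ab" fits in the initial bucket, "cdef" needs a refill, and it leaves nothing for "g". With every cost equal to 1 this reduces to the plain throttle.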
conflate
conflate(f) lets a fast producer merge elements into a single downstream value when the consumer is not ready. The function f(accumulated, next) is called each time a new element arrives while the consumer is busy:
```rust
use datum::{Flow, Sink, Source};

// conflate(f) merges pending elements when the consumer is slower than the
// producer. In a synchronous fused chain the consumer keeps up, so all items
// pass through unmerged. Because f adds, the emitted values sum to
// 1 + 2 + 3 + 4 = 10 whether or not any merging happens.
let items: Vec<u64> = Source::from_iter(1_u64..=4)
    .via(Flow::identity().conflate(|acc, x| acc + x))
    .run_with(Sink::collect())
    .unwrap()
    .wait()
    .unwrap();
assert_eq!(items.iter().sum::<u64>(), 10);
```

conflate_with_seed(seed, aggregate) provides separate functions for creating the initial accumulator (seed(first_element)) and extending it (aggregate(acc, next_element)).
In a synchronous fused chain where producer and consumer run at the same speed, conflate does not coalesce: elements pass through one at a time. Coalescing only occurs when the consumer is genuinely slower than the producer (e.g. there is an async boundary or a slow map downstream).
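The rate dependence is easiest to see with the consumer's readiness made explicit. Below is a hypothetical std-only model of conflate_with_seed, not Datum's API: push stands for an upstream element arriving before downstream has pulled, and pull for downstream becoming ready. In this model conflate(f) is the special case where seed is the identity and f is the aggregate; the Conflater name and its methods are illustrative.

```rust
/// Hypothetical model of conflate_with_seed's state (not Datum's code).
/// `seed` starts a new accumulator; `aggregate` folds further elements into it.
struct Conflater<S, Seed, Agg> {
    pending: Option<S>,
    seed: Seed,
    aggregate: Agg,
}

impl<S, Seed, Agg> Conflater<S, Seed, Agg> {
    fn new(seed: Seed, aggregate: Agg) -> Self {
        Conflater { pending: None, seed, aggregate }
    }

    /// Upstream delivers `x` while downstream has not pulled yet.
    fn push<T>(&mut self, x: T)
    where
        Seed: FnMut(T) -> S,
        Agg: FnMut(S, T) -> S,
    {
        let next = match self.pending.take() {
            Some(acc) => (self.aggregate)(acc, x),
            None => (self.seed)(x),
        };
        self.pending = Some(next);
    }

    /// Downstream is ready: emit whatever has accumulated, if anything.
    fn pull(&mut self) -> Option<S> {
        self.pending.take()
    }
}
```

Pulling after every push (a consumer that keeps up) returns each element unchanged; with addition as the aggregate, pushing 2, 3 and 4 before the next pull returns 9.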
batch and batch_weighted
batch(max, seed, aggregate) groups up to max elements into one aggregate. Unlike conflate, batch has a cap: once the aggregate's weight (for plain batch, its element count) reaches max, it is emitted and a new one starts:
```rust
use datum::{Flow, Sink, Source};

// batch(max, seed, aggregate) groups up to `max` elements into one aggregate value.
// seed(first_element) starts the batch; aggregate(batch, next) extends it.
// Like conflate, batching only coalesces when the producer outruns the consumer.
let items: Vec<Vec<u64>> = Source::from_iter(1_u64..=4)
    .via(Flow::identity().batch(
        8,
        |x| vec![x],
        |mut agg, x| {
            agg.push(x);
            agg
        },
    ))
    .run_with(Sink::collect())
    .unwrap()
    .wait()
    .unwrap();
// However the elements were grouped, flattening the batches restores the input.
assert_eq!(items.concat(), vec![1, 2, 3, 4]);
```

batch_weighted(max, cost_fn, seed, aggregate) uses a per-element cost function to determine when the batch is full.
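A batch differs from a conflated value mainly in its cap. The sketch below is a hypothetical std-only model, not Datum's code: it folds a run of elements that all arrive while the consumer is busy and returns the batches in emission order. It assumes a batch is closed as soon as the next element would push its accumulated cost past max (for unit costs this is the same as closing it once it reaches max), that elements are never split, and that an element costing more than max forms a batch by itself. batch(max, seed, aggregate) corresponds to every cost being 1.

```rust
/// Hypothetical model of batch_weighted(max, cost_fn, seed, aggregate) for a run
/// of elements that all arrive while the consumer is busy.
fn batch_weighted<T, S>(
    items: impl IntoIterator<Item = T>,
    max: u64,
    cost_fn: impl Fn(&T) -> u64,
    mut seed: impl FnMut(T) -> S,
    mut aggregate: impl FnMut(S, T) -> S,
) -> Vec<S> {
    let mut closed = Vec::new();
    let mut open: Option<(S, u64)> = None; // current batch and its accumulated cost
    for x in items {
        let c = cost_fn(&x);
        open = Some(match open.take() {
            // There is room: extend the open batch.
            Some((batch, weight)) if weight + c <= max => (aggregate(batch, x), weight + c),
            // No batch yet, or `x` would overflow it: close it and start anew.
            full_or_none => {
                if let Some((batch, _)) = full_or_none {
                    closed.push(batch);
                }
                (seed(x), c)
            }
        });
    }
    // Whatever is still open is emitted when the consumer next pulls.
    if let Some((batch, _)) = open {
        closed.push(batch);
    }
    closed
}
```

With unit costs and max = 3, the elements 1 through 7 come out as [1, 2, 3], [4, 5, 6], [7].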
expand and extrapolate
expand(f) is the mirror of conflate: it lets the downstream pull faster than the upstream produces. When downstream demands an element but the upstream has nothing ready, expand calls f on the last-seen element and emits from the resulting iterator until the upstream supplies a new element:
```rust
use datum::{Flow, Source};
// Repeat the last element up to 3 times while upstream is slow.
let expanded = Source::from_iter(1_u64..=3)
    .via(Flow::identity().expand(|x| std::iter::repeat(x).take(3)));
```

extrapolate(f, initial) is a variant that requires Out: Clone and uses an optional initial value for the very first upstream pull. It is typically used to extrapolate a sensor reading while waiting for the next measurement.
Both operators have rate-dependent behavior: in a synchronous fused chain where the upstream always has an element ready, elements pass through without expansion.
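To see the rule in isolation, the sketch below replays a hypothetical timeline of upstream arrivals and downstream demands. It is a std-only model, not Datum's implementation (Event and simulate_expand are illustrative names), and it assumes that a new upstream element simply replaces the iterator being drained and that an exhausted iterator means downstream has to wait, reported here as None.

```rust
/// One step of a simulated timeline seen by the expand stage.
enum Event<T> {
    /// A fresh element arrives from upstream.
    Upstream(T),
    /// Downstream asks for an element.
    Demand,
}

/// Hypothetical model of expand(f): returns what each Demand receives.
fn simulate_expand<T, I, F>(events: Vec<Event<T>>, mut f: F) -> Vec<Option<T>>
where
    F: FnMut(T) -> I,
    I: Iterator<Item = T>,
{
    let mut current: Option<I> = None;
    let mut emitted = Vec::new();
    for event in events {
        match event {
            // A fresh upstream element replaces whatever was being expanded.
            Event::Upstream(x) => current = Some(f(x)),
            // Demand with nothing new from upstream: draw from the last iterator.
            // None means the iterator is exhausted, so downstream has to wait.
            Event::Demand => emitted.push(current.as_mut().and_then(|it| it.next())),
        }
    }
    emitted
}
```

When upstream keeps up, every Demand is preceded by a fresh Upstream event, so each element is emitted once and nothing is expanded, matching the fused-chain behavior described above.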
Summary
| Operator | Direction | What it trades |
|---|---|---|
| buffer(n, strategy) | Producer → Consumer | Memory for smoothed throughput |
| throttle(n, per, burst, mode) | Producer → Consumer | Throughput for latency control |
| conflate(f) | Consumer ← Producer | Memory for producer speed |
| expand(f) | Consumer → Producer | Consumer speed for upstream gaps |
| batch(max, seed, agg) | Consumer ← Producer | Memory (bounded) for reduced message count |
| detach() | Both | One decoupling point |
Next steps
- Backpressure — the default flow-control model
- Error Handling — what happens when a stream fails