Configuration

Datum's runtime and materialization surface is intentionally small: there is no HOCON file, no actor-system configuration, and no dispatcher table. Configuration happens through the Runtime type and the Attributes API.


Runtime and Materializer

Runtime is the entry point for all stream execution. Materializer is a type alias for Runtime — they are the same type:

rust
pub type Materializer = Runtime;

You can use either name; Materializer is the conventional alias when passing the runtime to graph constructors, mirroring Akka's naming.

Creating a runtime

rust
use datum::Runtime;

let runtime = Runtime::new();

Runtime::new() starts a shared timer thread (datum-timer-N) and a process-wide lazy thread pool (datum-stream-runtime). The timer is used by time-aware operators (delay, throttle, grouped_within, timeouts, etc.). The thread pool runs each materialized stream's drain loop on a dedicated worker thread.

The process-wide thread pool is shared across all Runtime instances. Workers are spawned on demand (one per concurrent materialized stream), parked when idle, and reaped after a 10-second idle timeout. A spawn failure falls back to running the stream inline on the caller's thread.
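The spawn-failure fallback can be illustrated with the standard library alone. This is a minimal sketch, not Datum's implementation: run_or_inline is a hypothetical helper and the thread name is only an example. The job sits in a shared slot because std's thread builder consumes its closure even when spawning fails.

```rust
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// Hypothetical helper (not part of Datum): try to run `job` on a named
/// worker thread; if the OS refuses to spawn one, run it on the caller's
/// thread. Returns the worker's handle, or `None` when the job ran inline.
fn run_or_inline<F>(thread_name: &str, job: F) -> Option<JoinHandle<()>>
where
    F: FnOnce() + Send + 'static,
{
    // `Builder::spawn` takes ownership of its closure even on failure, so the
    // job is parked in a slot that either side can take it from exactly once.
    let slot = Arc::new(Mutex::new(Some(job)));
    let worker_slot = Arc::clone(&slot);

    let spawned = thread::Builder::new()
        .name(thread_name.to_string())
        .spawn(move || {
            let job = worker_slot.lock().unwrap().take();
            if let Some(job) = job {
                job();
            }
        });

    match spawned {
        Ok(handle) => Some(handle),
        Err(_) => {
            // Fallback: no worker could be created, so run on this thread.
            let job = slot.lock().unwrap().take();
            if let Some(job) = job {
                job();
            }
            None
        }
    }
}

fn main() {
    let total = Arc::new(Mutex::new(0u32));
    let sink = Arc::clone(&total);

    let handle = run_or_inline("example-worker-0", move || {
        *sink.lock().unwrap() += (1..=4).sum::<u32>();
    });
    if let Some(handle) = handle {
        handle.join().unwrap();
    }
    println!("total = {}", *total.lock().unwrap());
}
```

Whether the job ran on a worker or inline, the caller sees the same result; only the thread that did the work differs.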

Configuring a runtime

Runtime is Clone. The two builder methods, with_name_prefix and with_attributes, each return a new Runtime that shares the same underlying timer and thread pool:

rust
use datum::{Attributes, Runtime};

let runtime = Runtime::new()
    .with_name_prefix("my-app")                   // names worker threads "my-app-N"
    .with_attributes(Attributes::named("root"));  // default attributes for all materialized streams

| Method | Effect |
| --- | --- |
| with_name_prefix(prefix) | Prefix for thread and stream names (default: "datum-stream") |
| with_attributes(attrs) | Default Attributes applied to every materialized stream; merged with per-graph attributes |
| name_prefix() | Returns the current prefix |
| attributes() | Returns the current default attributes |
| effective_attributes(local) | Merges default + local attributes (later attributes win) |

Materializing a graph

Use materializer.materialize(graph) to run a RunnableGraph explicitly:

rust
let completion = materializer.materialize(&runnable_graph)?;

Source::run_with_materializer(sink, materializer) and RunnableGraph::run_with_materializer(mat) are convenience shortcuts that do the same thing.

Lifecycle

| Method | Effect |
| --- | --- |
| shutdown() | Sets the shutdown flag; the timer stops and no new streams can be materialized |
| is_shutdown() | Returns true after shutdown() is called |
| active_streams() | Number of streams currently running under this runtime |

A runtime that has been shut down returns Err(StreamError::AbruptTermination) on any new materialization attempt. There is no restart — create a new Runtime instance after shutdown.
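The one-way nature of shutdown can be modelled in a few lines of plain Rust. The sketch below is an illustration only: SketchRuntime and SketchError are hypothetical stand-ins, not Datum types, and the model tracks nothing but the flag.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Stand-in for the error named in the text; not Datum's real type.
#[derive(Debug, PartialEq)]
enum SketchError {
    AbruptTermination,
}

/// Hypothetical model of a runtime that only tracks its shutdown flag.
#[derive(Default)]
struct SketchRuntime {
    shutdown: AtomicBool,
}

impl SketchRuntime {
    fn shutdown(&self) {
        self.shutdown.store(true, Ordering::SeqCst);
    }

    fn is_shutdown(&self) -> bool {
        self.shutdown.load(Ordering::SeqCst)
    }

    /// Refuses new work once the flag is set; nothing ever clears the flag.
    fn materialize(&self, graph_name: &str) -> Result<String, SketchError> {
        if self.is_shutdown() {
            return Err(SketchError::AbruptTermination);
        }
        Ok(format!("running {graph_name}"))
    }
}

fn main() {
    let runtime = SketchRuntime::default();
    println!("{:?}", runtime.materialize("first"));

    runtime.shutdown();
    println!("{:?}", runtime.materialize("second"));

    // Recovery means building a fresh runtime, not reviving the old one.
    let replacement = SketchRuntime::default();
    println!("{:?}", replacement.materialize("second"));
}
```

Callers that may outlive a shutdown should therefore treat the error as final for that runtime and construct a replacement if they still need to run streams.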

Timers

Runtime exposes three timer methods. All return a Cancellable handle:

rust
use std::time::Duration;

let cancellable = runtime.schedule_once(Duration::from_secs(5), || println!("fired"));

let cancellable = runtime.schedule_with_fixed_delay(
    Duration::from_millis(100),  // initial delay
    Duration::from_secs(1),      // delay between completions
    || println!("tick"),
);

let cancellable = runtime.schedule_at_fixed_rate(
    Duration::from_millis(100),  // initial delay
    Duration::from_secs(1),      // interval between firings
    || println!("rate tick"),
);

These are the same timer primitives used internally by tick, delay, throttle, and the timeout operators. schedule_with_fixed_delay waits the given delay after each task completes; schedule_at_fixed_rate fires at the given interval regardless of task duration.
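The difference between the two periodic modes is easiest to see as arithmetic on start times. The sketch below uses only std::time::Duration and does not call Datum; fixed_delay_starts and fixed_rate_starts are hypothetical helpers, and the fixed-rate model assumes each task finishes within one interval.

```rust
use std::time::Duration;

/// Start times (measured from the moment of scheduling) of the first `count`
/// runs under fixed-delay scheduling: each run begins `delay` after the
/// previous run has finished, so the task duration pushes later runs back.
fn fixed_delay_starts(
    initial: Duration,
    delay: Duration,
    task: Duration,
    count: usize,
) -> Vec<Duration> {
    let mut starts = Vec::with_capacity(count);
    let mut next = initial;
    for _ in 0..count {
        starts.push(next);
        next = next + task + delay;
    }
    starts
}

/// Start times under fixed-rate scheduling: run `n` begins at
/// `initial + n * interval`, independent of how long the task takes
/// (assuming it finishes within one interval).
fn fixed_rate_starts(initial: Duration, interval: Duration, count: usize) -> Vec<Duration> {
    (0..count).map(|n| initial + interval * n as u32).collect()
}

fn main() {
    let initial = Duration::from_millis(100);
    let period = Duration::from_secs(1);
    let task = Duration::from_millis(250);

    println!("fixed delay: {:?}", fixed_delay_starts(initial, period, task, 3));
    println!("fixed rate:  {:?}", fixed_rate_starts(initial, period, 3));
}
```

With a 250 ms task, the fixed-delay schedule drifts by 250 ms per run, while the fixed-rate schedule stays on a one-second grid. Prefer fixed delay when runs must never overlap or pile up, and fixed rate when the wall-clock cadence matters.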


Attributes and Attribute

Attributes is a list of Attribute values that annotate a blueprint or stage. They are hints to the materializer — not enforced by the type system — and are merged when a graph is materialized.

Available attributes

rust
pub enum Attribute {
    Name(Arc<str>),
    InputBuffer { initial: usize, max: usize },
    Dispatcher(Arc<str>),
}

| Attribute variant | Constructor | Effect |
| --- | --- | --- |
| Attribute::Name | Attributes::named("my-stage") | Labels a stage or graph segment; visible in thread names and debug output |
| Attribute::InputBuffer { initial, max } | Attributes::input_buffer(initial, max) | Hint for internal buffer sizing on detached stages |
| Attribute::Dispatcher | Attributes::dispatcher("my-dispatcher") | Named dispatcher hint; currently advisory, since dispatcher selection is not implemented beyond the default thread pool |

Creating and composing attributes

rust
use datum::Attributes;

// Single attribute
let attrs = Attributes::named("ingestion");
let attrs = Attributes::input_buffer(16, 64);
let attrs = Attributes::dispatcher("blocking");

// Combine two Attributes (right-hand side wins on conflict)
let combined = Attributes::named("foo").and(Attributes::input_buffer(8, 32));

// Empty (no-op)
let none = Attributes::none();

Applying attributes to a blueprint

Every Source, Flow, Sink, and RunnableGraph has three attribute methods:

rust
// Replace all attributes
source.with_attributes(attrs)

// Append attributes (merged with existing)
source.add_attributes(Attributes::named("debug"))

// Shorthand for a name attribute
source.named("my-source")

Attributes set on a Runtime (via with_attributes) act as defaults; per-graph or per-stage attributes are merged on top using effective_attributes(local).
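One way to picture "defaults first, local attributes on top" is as list concatenation followed by a lookup that prefers later entries. The sketch below is a standalone model, not Datum's code: the enum is a local copy of the variants listed earlier, and effective, resolved_name, and resolved_input_buffer are hypothetical helpers. How Datum actually stores and searches the list is an assumption here.

```rust
use std::sync::Arc;

/// Local copy of the documented variants, for illustration only.
#[allow(dead_code)] // Dispatcher is carried along but not resolved in this sketch
#[derive(Clone, Debug, PartialEq)]
enum Attribute {
    Name(Arc<str>),
    InputBuffer { initial: usize, max: usize },
    Dispatcher(Arc<str>),
}

/// Merge by concatenation: runtime defaults first, local attributes after.
fn effective(defaults: &[Attribute], local: &[Attribute]) -> Vec<Attribute> {
    defaults.iter().chain(local).cloned().collect()
}

/// Resolve the name by scanning from the end, so later entries win.
fn resolved_name(attrs: &[Attribute]) -> Option<&str> {
    attrs.iter().rev().find_map(|a| match a {
        Attribute::Name(name) => Some(&**name),
        _ => None,
    })
}

/// Resolve the buffer hint the same way; defaults apply when nothing overrides them.
fn resolved_input_buffer(attrs: &[Attribute]) -> Option<(usize, usize)> {
    attrs.iter().rev().find_map(|a| match a {
        Attribute::InputBuffer { initial, max } => Some((*initial, *max)),
        _ => None,
    })
}

fn main() {
    let runtime_defaults = vec![
        Attribute::Name("root".into()),
        Attribute::InputBuffer { initial: 16, max: 64 },
    ];
    let per_graph = vec![
        Attribute::Name("ingestion".into()),
        Attribute::Dispatcher("blocking".into()),
    ];

    let merged = effective(&runtime_defaults, &per_graph);
    println!("name:   {:?}", resolved_name(&merged));
    println!("buffer: {:?}", resolved_input_buffer(&merged));
}
```

In this model the per-graph name overrides the runtime default, while the input buffer hint falls through from the defaults because the graph did not set one.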

Reading attributes

rust
let attrs = source.attributes();

// Get the first name, if any
let name: Option<&str> = attrs.name();

// Get the input buffer hint, if any
let hint: Option<(usize, usize)> = attrs.input_buffer_hint();

// Get the dispatcher hint, if any
let dispatcher: Option<&str> = attrs.dispatcher_hint();

// Inspect the raw list
let list: &[Attribute] = attrs.attribute_list();

Async boundaries

An async boundary decouples two fused regions so they run concurrently on separate worker threads. In Datum, async boundaries are explicit:

rust
use datum::AsyncBoundary;

let flow = upstream
    .via(AsyncBoundary::new())
    .via(downstream);

AsyncBoundary materializes as an AsyncBoundaryExecutionConfig and inserts an internal channel between the upstream and downstream fused regions. The buffer size for the channel is controlled by an InputBuffer attribute on the boundary stage.
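The effect of a boundary can be approximated with two threads and a bounded channel from the standard library. This is a sketch of the general idea rather than Datum's internals: run_with_boundary is a hypothetical function, and the buffer argument plays the role that an InputBuffer hint would play on the boundary stage.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Runs an "upstream" stage (double each element) and a "downstream" stage
/// (add one) on separate threads, connected by a bounded channel.
fn run_with_boundary(items: Vec<u32>, buffer: usize) -> Vec<u32> {
    let (tx, rx) = sync_channel::<u32>(buffer);

    // Upstream region: `send` blocks once `buffer` elements are waiting,
    // which is how backpressure crosses the boundary.
    let upstream = thread::spawn(move || {
        for item in items {
            if tx.send(item * 2).is_err() {
                break; // the downstream side has gone away
            }
        }
        // `tx` is dropped here, which signals completion downstream.
    });

    // Downstream region: runs concurrently and drains until the sender is dropped.
    let downstream = thread::spawn(move || rx.iter().map(|x| x + 1).collect::<Vec<u32>>());

    upstream.join().unwrap();
    downstream.join().unwrap()
}

fn main() {
    let out = run_with_boundary(vec![1, 2, 3, 4], 2);
    println!("{:?}", out);
}
```

A larger buffer lets the upstream side run further ahead of a slow consumer at the cost of memory; a smaller one keeps the two sides in closer lockstep.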

AsyncBoundary is the equivalent of Akka's .async operator / Attributes.asyncBoundary.


No configuration file

Datum has no HOCON file and no environment-variable dispatcher configuration. There is no application.conf, no named dispatcher table, and no fork-join pool configuration. This is a deliberate design choice: the crate targets embedded use cases and benchmarks where transparent, predictable behavior matters more than runtime reconfigurability.

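If you need to bound concurrency, use map_async(parallelism, ...). If you need to isolate a slow stage, insert an AsyncBoundary with an InputBuffer hint. If you need to tell one group of streams apart from another, materialize it on its own Runtime with a distinct name prefix; the worker pool itself stays process-wide and shared across all Runtime instances, as described under "Creating a runtime".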