# Migrating from Akka Streams

This guide is for Scala/Akka Streams users coming to Datum. It covers the name map, the behavioral differences that matter most, and what is intentionally absent.

## Name map

### Source constructors

| Akka (Scala) | Datum (Rust) |
|---|---|
| `Source.empty` | `Source::empty()` |
| `Source.single(x)` | `Source::single(x)` |
| `Source.repeat(x)` | `Source::repeat(x)` |
| `Source.failed(ex)` | `Source::failed(error)` |
| `Source.fromIterator(() => iter)` | `Source::from_fn_iter(\|\| iter)` |
| `Source(iterable)` / `Source.from(iterable)` | `Source::from_iterable(items)` |
| `Source(1 to 100)` / `Source.range(1, 100)` | `Source::from_iterable(1..=100)` |
| `Source.future(f)` | `Source::future(factory)` |
| `Source.futureSource(f)` | `Source::future_source(factory)` |
| `Source.maybe` | `Source::maybe()`, returning `(MaybeHandle<T>, Source<T>)` |
| `Source.never` | `Source::never()` |
| `Source.tick(init, interval, x)` | `Source::tick(init, interval, x)` |
| `Source.cycle(() => iter)` | `Source::cycle(factory)` |
| `Source.unfold(s)(f)` | `Source::unfold(s, f)` |
| `Source.unfoldAsync(s)(f)` | `Source::unfold_async(s, f)` |
| `Source.unfoldResource(create, read, close)` | `Source::unfold_resource(create, read, close)` |
| `Source.unfoldResourceAsync(...)` | `Source::unfold_resource_async(...)` |
| `Source.lazily(() => src)` | `Source::lazy_source(factory)` |
| `Source.lazyFuture(() => fut)` | `Source::lazy_future(factory)` |
| `Source.queue()` | `Source::queue(capacity, strategy)`, materializing a `SourceQueue<T>`; `Source::queue_bounded(capacity)` for a `BoundedSourceQueue<T>` |
| `Source.asSourceWithContext[Ctx](extract)` | `source.as_source_with_context(extract)` |
| `Source.combine(s1, s2, rest: _*)(merge)` | `Source::combine(sources, strategy)` |
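
`Source::unfold(s, f)` takes the same two ingredients as Akka's `Source.unfold(s)(f)`: a seed state and a function that returns either the next state plus an element, or nothing to complete. As a rough model only, the semantics can be reproduced over a std iterator. The `unfold` helper below is hypothetical, is not Datum's implementation, and assumes `f` has Akka's shape `S => Option[(S, E)]`.

```rust
/// Hypothetical std-only model of `unfold`: `f` maps the current state to
/// `Some((next_state, element))` to emit an element, or `None` to complete.
fn unfold<S, T, F>(seed: S, mut f: F) -> impl Iterator<Item = T>
where
    F: FnMut(S) -> Option<(S, T)>,
{
    // The state lives between pulls; `None` here means the source completed.
    let mut state = Some(seed);
    std::iter::from_fn(move || {
        let s = state.take()?;
        let (next, elem) = f(s)?;
        state = Some(next);
        Some(elem)
    })
}

/// Fibonacci numbers below `limit`, the classic `unfold` example.
fn fibs_below(limit: u64) -> Vec<u64> {
    unfold((0_u64, 1_u64), move |(a, b)| {
        if a >= limit {
            None
        } else {
            Some(((b, a + b), a))
        }
    })
    .collect()
}
```

Once `f` returns `None`, the state stays empty, so every later pull also completes, matching the way a completed source never emits again.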
### Sink constructors

| Akka (Scala) | Datum (Rust) |
|---|---|
| `Sink.ignore` | `Sink::ignore()` |
| `Sink.head` | `Sink::head()` |
| `Sink.headOption` | `Sink::head_option()` |
| `Sink.last` | `Sink::last()` |
| `Sink.lastOption` | `Sink::last_option()` |
| `Sink.seq` | `Sink::collect()`, returning `Vec<T>` |
| `Sink.takeLast(n)` | `Sink::take_last(n)` |
| `Sink.fold(z)(f)` | `Sink::fold(zero, f)` |
| `Sink.reduce(f)` | `Sink::reduce(f)` |
| `Sink.foreach(f)` | `Sink::foreach(f)` |
| `Sink.cancelled` | `Sink::cancelled()` |
| `Sink.onComplete(f)` | `Sink::on_complete(f)` |
| `Sink.fromMaterializer` | `Sink::from_materializer(factory)` |
| `Sink.queue()` | `Sink::queue()`, materializing a `SinkQueue<T>` |
| `Sink.combine(s1, s2)(strategy)` | `Sink::combine(sinks, strategy)` |
### Flow operators

| Akka (Scala) | Datum (Rust) | Notes |
|---|---|---|
| `.map(f)` | `.map(f)` | |
| `.filter(p)` | `.filter(p)` | |
| `.filterNot(p)` | `.filter_not(p)` | |
| `.collect { case ... }` | `.filter_map(f)` | Renamed because `collect` conflicts with `Iterator::collect` |
| `.mapConcat(f)` | `.map_concat(f)` | |
| `.grouped(n)` | `.grouped(n)` | |
| `.sliding(n, step)` | `.sliding(n, step)` | |
| `.scan(z)(f)` | `.scan(z, f)` | |
| `.fold(z)(f)` | `.fold(z, f)` | |
| `.reduce(f)` | `.reduce(f)` | |
| `.take(n)` | `.take(n)` | |
| `.takeWhile(p)` | `.take_while(p)` | |
| `.drop(n)` | `.drop(n)` | |
| `.dropWhile(p)` | `.drop_while(p)` | |
| `.limit(n)` | `.limit(n)` | |
| `.statefulMap(factory)(f)` | `.stateful_map(seed, f)` | |
| `.statefulMapConcat(factory)(f)` | `.stateful_map_concat(seed, f)` | |
| `.mapAsync(p)(f)` | `.map_async(p, f)` | |
| `.mapAsyncUnordered(p)(f)` | `.map_async_unordered(p, f)` | |
| `.mapAsyncPartitioned(p)(part)(f)` | `.map_async_partitioned(p, partition_fn, f)` | |
| `.intersperse(inject)` | `.intersperse(inject)` | |
| `.groupedWeighted(max)(cost)` | `.grouped_weighted(max_weight, cost_fn)` | |
| `.limitWeighted(max)(cost)` | `.limit_weighted(max_weight, cost_fn)` | |
| `.contramap(f)` | `.contramap(f)` | |
| `.log(name)` | `.monitor(callback)` | Datum uses a closure, not a marker/logging framework |
| `.monitor` | `.monitor(callback)` | |
| `.watchTermination()(f)` | `.watch_termination(materialize_callback)` | |
| `.foldAsync(z)(f)` | `.fold_async(z, f)` | |
| `.scanAsync(z)(f)` | `.scan_async(z, f)` | |
| `.mapWithResource(create)(f, close)` | `.map_with_resource(create, f, close)` | |
| `.throttle(elements, per, burst, mode)` | `.throttle(elements, per, burst, mode)` | |
| `.delay(d)` | `.delay(d, strategy)` | `DelayOverflowStrategy` required |
| `.initialDelay(d)` | `.initial_delay(d)` | |
| `.groupedWithin(n, d)` | `.grouped_within(n, d)` | |
| `.takeWithin(d)` | `.take_within(d)` | |
| `.dropWithin(d)` | `.drop_within(d)` | |
| `.idleTimeout(d)` | `.idle_timeout(d)` | |
| `.backpressureTimeout(d)` | `.backpressure_timeout(d)` | |
| `.completionTimeout(d)` | `.completion_timeout(d)` | |
| `.initialTimeout(d)` | `.initial_timeout(d)` | |
| `.keepAlive(d, inject)` | `.keep_alive(d, inject)` | |
| `.buffer(n, strategy)` | `.buffer(n, strategy)` | `OverflowStrategy` |
| `.conflate(f)` | `.conflate(f)` | |
| `.conflateWithSeed(seed)(f)` | `.conflate_with_seed(seed, f)` | |
| `.expand(f)` | `.expand(f)` | |
| `.extrapolate(f)` | `.extrapolate(f, initial)` | |
| `.batch(max, seed)(agg)` | `.batch(max, seed, aggregate)` | |
| `.batchWeighted(max, cost)(seed)(agg)` | `.batch_weighted(max, cost_fn, seed, aggregate)` | |
| `.recover { case ... }` | `.recover(f)` | |
| `.recoverWith { case ... }` | `.recover_with(f)` | |
| `.recoverWithRetries(n) { case ... }` | `.recover_with_retries(n, f)` | |
| `.mapError(f)` | `.map_error(f)` | |
| `RestartSource.withBackoff(settings)(() => src)` | `RestartSource::with_backoff(settings, factory)` | |
| `RestartFlow.withBackoff(settings)(() => flow)` | `RestartFlow::with_backoff(settings, factory)` | |
| `RetryFlow.withBackoff(...)` | `RetryFlow::with_backoff(min, max, factor, f)` | |
| `.flatMapConcat(f)` | `.flat_map_concat(f)` | |
| `.flatMapMerge(breadth, f)` | `.flat_map_merge(breadth, f)` | |
| `.prefixAndTail(n)` | `.prefix_and_tail(n)` | Emits `(Vec<T>, Source<T>)` |
| `.flatMapPrefix(n)(f)` | `.flat_map_prefix(n, f)` | |
| `.groupBy(max, key)` | `.group_by(max_substreams, key_fn, eager_complete)` | |
| `.splitWhen(p)` | `.split_when(p)` | Emits one `Source<T>` per segment |
| `.splitAfter(p)` | `.split_after(p)` | Emits one `Source<T>` per segment |
| `.concat(src)` | `.concat(src)` | |
| `.prepend(src)` | `.prepend(src)` | |
| `.orElse(src)` | `.or_else(src)` | |
| `.interleave(src, n)` | `.interleave(src, n)` | |
| `.mergeSorted(src)` | `.merge_sorted(src)` | |
| `.mergeLatest(src)` | `.merge_latest(src, eager_complete)` | |
| `.mergeAll(srcs, eager)` | `.merge_all(srcs, eager_complete)` | |
| `.zipWith(src)(f)` | `.zip_with(src, f)` | |
| `.zipLatest(src)` | `.zip_latest(src)` | |
| `.zipLatestWith(src)(f)` | `.zip_latest_with(src, f)` | |
| `.zipWithIndex` | `.zip_with_index()` | Emits `(T, u64)` |
| `.zipAll(src, thisZ, thatZ)` | `.zip_all(src, this_default, that_default)` | |
| `.alsoTo(sink)` | `.also_to(sink)` | |
| `.alsoToAll(sinks)` | `.also_to_all(sinks)` | |
| `.divertTo(sink, p)` | `.divert_to(sink, predicate)` | |
| `.wireTap(sink)` | `.wire_tap(sink)` | |
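
The rename of `.collect { case ... }` to `.filter_map(f)` follows Rust's own `Iterator::filter_map`: the Scala partial function becomes a closure returning `Option`, with `None` standing in for "no case matched". The sketch below shows the translation on a std iterator; the `Event` type is hypothetical, and it assumes Datum's `filter_map` accepts a closure of the same `T -> Option<U>` shape.

```rust
/// Hypothetical event type, used only for this illustration.
#[derive(Debug, Clone, PartialEq)]
enum Event {
    Click { x: i32, y: i32 },
    Key(char),
    Scroll(i32),
}

/// Scala: `events.collect { case Click(x, y) if x >= 0 => (x, y) }`
/// Rust: the partial function becomes a `match` that returns `Option`.
fn clicks_in_view(events: Vec<Event>) -> Vec<(i32, i32)> {
    events
        .into_iter()
        .filter_map(|e| match e {
            // The one case the Scala partial function is defined for.
            Event::Click { x, y } if x >= 0 => Some((x, y)),
            // Every other element is dropped, as with an unmatched `case`.
            _ => None,
        })
        .collect()
}
```

Guards (`if x >= 0`) carry over unchanged, so most `collect` blocks translate line for line.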
### Actor interop

| Akka (Scala) | Datum (Rust) |
|---|---|
| `ActorFlow.ask(ref, timeout)(f)` | `ActorFlow::ask(actor_ref, parallelism, timeout, make_msg)` |
| `ActorFlow.askWithStatus(ref, timeout)(f)` | `ActorFlow::ask_with_status(actor_ref, parallelism, timeout, make_msg)` |
| `ActorFlow.askWithContext(ref, timeout)(f)` | `ActorFlow::ask_with_context(actor_ref, parallelism, timeout, make_msg)` |
| `Source.actorRef(bufferSize, overflowStrategy)` | `ActorSource::actor_ref()` |
| `Sink.actorRef(ref, onCompleteMsg)` | `ActorSink::actor_ref(...)` |
## Behavioral differences

### Failures flow through `Result`, not exceptions

In Akka, user callbacks throw exceptions to signal failure, and supervision strategies intercept `Throwable`. In Datum, failures are explicit: a failed stream emits a `StreamError` and terminates. Operators that apply user closures have explicit fallible variants (e.g. `map_result`, `filter_map_result`, `fold_result`) whose closures return a `Result<_, StreamError>`. The non-fallible variants (`map`, `filter`, etc.) stay branch-free and are not retroactively supervised.
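When a callback can fail, use a `*_result` variant. To choose how a failure is handled instead of always failing the stream, pass a `SupervisionDecider`, as in this `map_result_with_supervision` call:

```rust
use datum::{StreamError, Supervision};

// Fragment: `source` is an existing stream of integers.
// A zero element is reported as a failure; the resuming decider drops it.
let resilient = source.map_result_with_supervision(
    |x| if x == 0 { Err(StreamError::Failed("zero".into())) } else { Ok(x) },
    Supervision::resuming_decider(),
);
```

`SupervisionDirective::Stop` (fail the stream), `Resume` (drop the element), and `Restart` (drop the element and reset the operator's state) work the same as Akka's supervision directives.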
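The three directives are easiest to compare on a stateful operator. The std-only model below is hypothetical (`Directive` and `supervised_running_sum` are not Datum types): it runs a running sum in which a zero element counts as a failure, and it assumes, as in Akka, that `Restart` resets the operator's state to its initial value.

```rust
/// Hypothetical mirror of the three supervision directives (not Datum's type).
#[derive(Debug, Clone, Copy)]
enum Directive {
    Stop,
    Resume,
    Restart,
}

/// Running sum over `input` in which a zero element is a failure.
/// `decider` picks the outcome: Stop fails the whole run, Resume drops the
/// element and keeps the sum, Restart drops the element and resets the sum.
fn supervised_running_sum(
    input: &[u64],
    decider: impl Fn(&str) -> Directive,
) -> Result<Vec<u64>, String> {
    let mut sum = 0_u64; // operator state
    let mut out = Vec::new();
    for &x in input {
        if x == 0 {
            let err = "zero";
            match decider(err) {
                Directive::Stop => return Err(err.to_string()),
                Directive::Resume => continue,
                Directive::Restart => {
                    sum = 0; // back to the initial state
                    continue;
                }
            }
        }
        sum += x;
        out.push(sum);
    }
    Ok(out)
}
```

On the input `[1, 2, 0, 3]`, `Stop` fails with `"zero"`, `Resume` yields `[1, 3, 6]`, and `Restart` yields `[1, 3, 3]`, because the sum starts over after the failed element.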
### Graph wiring is runtime-validated, not type-encoded

Akka's `GraphDSL` encodes port shapes and element types in the type system, so many illegal connections (for example, mismatched element types) are compile errors. Datum validates connections at materialization time and returns a `StreamResult`. This is a deliberate trade-off: it makes partial graphs and dynamic wiring easier to express, and it lets the API stay stable while behavior is being refined. Compile-time graph encoding is planned once behavior is fully stable.

This means `GraphBuilder::connect(outlet, inlet)?` can fail at runtime if the connection is invalid; always propagate the error.
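The kind of check this implies can be modelled with plain std types. In the sketch below, `Wiring`, its string port names, and the one-connection-per-port rule are hypothetical stand-ins rather than Datum's `GraphBuilder`; the point is that each `connect` returns a `Result`, and `?` stops the build at the first invalid connection.

```rust
use std::collections::HashSet;

/// Hypothetical model of runtime wiring checks (not Datum's API): each port
/// may be connected once, and validation reports any port left unconnected.
#[derive(Default)]
struct Wiring {
    outlets: Vec<&'static str>,
    inlets: Vec<&'static str>,
    used_outlets: HashSet<&'static str>,
    used_inlets: HashSet<&'static str>,
}

impl Wiring {
    fn new(outlets: &[&'static str], inlets: &[&'static str]) -> Self {
        Wiring { outlets: outlets.to_vec(), inlets: inlets.to_vec(), ..Default::default() }
    }

    fn connect(&mut self, outlet: &'static str, inlet: &'static str) -> Result<(), String> {
        if !self.outlets.contains(&outlet) || !self.inlets.contains(&inlet) {
            return Err(format!("unknown port in {outlet} -> {inlet}"));
        }
        // Check both ends before mutating, so a failed call changes nothing.
        if self.used_outlets.contains(&outlet) {
            return Err(format!("outlet {outlet} already connected"));
        }
        if self.used_inlets.contains(&inlet) {
            return Err(format!("inlet {inlet} already connected"));
        }
        self.used_outlets.insert(outlet);
        self.used_inlets.insert(inlet);
        Ok(())
    }

    fn validate(&self) -> Result<(), String> {
        if let Some(p) = self.outlets.iter().find(|p| !self.used_outlets.contains(*p)) {
            return Err(format!("outlet {p} is not connected"));
        }
        if let Some(p) = self.inlets.iter().find(|p| !self.used_inlets.contains(*p)) {
            return Err(format!("inlet {p} is not connected"));
        }
        Ok(())
    }
}

/// Builds a two-stage pipeline; `double_wire` adds an illegal second use of an outlet.
fn build_pipeline(double_wire: bool) -> Result<(), String> {
    let mut w = Wiring::new(&["src.out", "map.out"], &["map.in", "sink.in"]);
    w.connect("src.out", "map.in")?;
    w.connect("map.out", "sink.in")?;
    if double_wire {
        w.connect("src.out", "sink.in")?; // fails: outlet already connected
    }
    w.validate()
}
```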
### Blueprint vs materialization (same contract, explicit lifetime)

Both Akka and Datum distinguish between building a blueprint and running it. In Datum the distinction is more explicit: construction methods (`Source::from_iterable`, `.map(...)`, etc.) have no side effects and start nothing. Execution begins only at `.run()`, `.run_with()`, or `materializer.materialize(graph)`, which return materialized handles immediately. See the Blueprint vs Run concept page for details.
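Rust programmers already know this contract from std iterators, which are also lazy: adapters such as `map` describe work, and nothing runs until a consumer such as `collect` pulls. The analogy below uses only std and a side-effect counter; it illustrates the blueprint/run split and says nothing about how Datum's materializer works internally.

```rust
use std::cell::Cell;

/// Counts how often the mapping closure runs before and after "running".
fn calls_before_and_after_run() -> (usize, usize, Vec<u32>) {
    let calls = Cell::new(0_usize);
    // "Blueprint": building the pipeline executes no user code yet.
    let pipeline = (1..=3_u32).map(|x| {
        calls.set(calls.get() + 1);
        x * 10
    });
    let before = calls.get();
    // "Run": consuming the iterator executes the closure once per element.
    let out: Vec<u32> = pipeline.collect();
    (before, calls.get(), out)
}
```

One difference to keep in mind: a std iterator can be consumed only once, whereas an Akka blueprint can be materialized many times.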
### Tokio-first async; no Reactive Streams interop

Datum is built on Tokio. Ractor (the actor runtime) runs on Tokio, and async operators such as `map_async` dispatch futures onto the Tokio runtime. `std::future::Future` is accepted as an optional portability surface, but Tokio is the default and the primary tested path.

Reactive Streams publisher/subscriber interop (`asPublisher`, `asSubscriber`, `fromPublisher`, `fromSubscriber`) is intentionally absent. Rust has no equivalent ubiquitous standard; the idiomatic bridge is the async `Stream`/`Sink` traits used throughout the Tokio ecosystem (defined in the `futures` crates; `tokio-stream` re-exports `Stream`). See Reactive Streams interop below.
### SubFlow vs nested Source

Akka's `groupBy` / `splitWhen` / `splitAfter` return a `SubFlow`, a specialized type whose operators apply to each substream. Datum instead returns a `Source<Source<T>>`: each substream is a plain `Source<T>` you can operate on directly. As a result, `SubFlow`-specific methods such as `.mergeSubstreams()` and `.concatSubstreams()` have no named equivalents; use `flat_map_merge(breadth, |sub| sub)` or `flat_map_concat(|sub| sub)` instead.
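Because substreams are ordinary sources, the nested shape behaves much like a vector of vectors. The std-only sketch below models `split_when` on a slice (the helper is hypothetical and assumes Akka's rule that a new segment starts at the element matching the predicate), processes each segment independently, and flattens the segments back in order, which is the role `flat_map_concat(|sub| sub)` plays for nested sources.

```rust
/// Hypothetical model of `split_when`: a new segment begins at each element
/// for which `p` is true; no empty leading segment is produced.
fn split_when<T: Clone>(items: &[T], p: impl Fn(&T) -> bool) -> Vec<Vec<T>> {
    let mut segments: Vec<Vec<T>> = Vec::new();
    for item in items {
        if segments.is_empty() || p(item) {
            segments.push(Vec::new());
        }
        segments.last_mut().expect("a segment was just ensured").push(item.clone());
    }
    segments
}

/// Per-substream processing: each segment is handled on its own.
fn segment_sums(items: &[i32]) -> Vec<i32> {
    split_when(items, |x| *x == 0)
        .into_iter()
        .map(|seg| seg.iter().sum::<i32>())
        .collect()
}
```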
### `group_by` key retention

By default, Akka's `groupBy` remembers the keys of closed or cancelled substreams until the parent stream completes, so later elements for such a key do not open a fresh substream (recreation is opt-in via `allowClosedSubstreamRecreation`). Datum mirrors this: `group_by(max, key_fn, false)` retains closed keys until the parent stream completes. High-cardinality churn (many distinct keys that each emit one element) can therefore grow memory linearly in the number of distinct closed keys seen.
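The bookkeeping behind that memory growth fits in a few lines of std Rust. `KeyRouter` below is hypothetical, not Datum's implementation, and follows Akka's default of dropping late elements for a closed key; the set of closed keys is the part that grows with churn.

```rust
use std::collections::{HashMap, HashSet};
use std::hash::Hash;

/// Hypothetical model of group-by routing state (not Datum's code).
/// Open substreams buffer their elements; closed keys are remembered so that
/// late elements for them are dropped instead of reopening a substream.
struct KeyRouter<K, T> {
    open: HashMap<K, Vec<T>>,
    closed: HashSet<K>,
}

impl<K: Eq + Hash + Clone, T> KeyRouter<K, T> {
    fn new() -> Self {
        KeyRouter { open: HashMap::new(), closed: HashSet::new() }
    }

    /// Routes one element; returns false if it was dropped for a closed key.
    fn route(&mut self, key: K, elem: T) -> bool {
        if self.closed.contains(&key) {
            return false;
        }
        self.open.entry(key).or_default().push(elem);
        true
    }

    /// Closes the substream for `key`; the key itself is retained.
    fn close(&mut self, key: &K) -> Option<Vec<T>> {
        let sub = self.open.remove(key);
        self.closed.insert(key.clone());
        sub
    }

    /// Keys held only for bookkeeping; grows with every distinct closed key.
    fn retained_closed_keys(&self) -> usize {
        self.closed.len()
    }
}
```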
## What is intentionally absent

### Reactive Streams interop is out of scope

`asPublisher`, `asSubscriber`, `fromPublisher`, `fromSubscriber`, and the JDK `Flow.Publisher` / `Flow.Subscriber` adapters exist in Akka to bridge the JVM Reactive Streams ecosystem and its TCK. Rust has no equivalent ubiquitous standard. The idiomatic integration surface is the async `Stream` and `Sink` traits from the `futures` crates, which Tokio-based code builds on and which Datum already uses internally.

Mirroring the RS publisher/subscriber protocol would add a foreign abstraction and a TCK-conformance burden without serving a Rust audience. This is a firm v0.1.0 ground rule.
### JVM-specific operators

`fromJavaStream`, `asJavaStream`, `javaCollector*`, and the `CompletionStage` family are JVM API adapters with no meaningful Rust equivalent. The underlying capabilities are covered:

- `Source.fromJavaStream(() => javaStream)` → `Source::from_iterable(iter)`
- `Source.fromCompletionStage(cs)` → `Source::future(|| async { ... })`
### StreamRefs (deferred)

StreamRefs (distributed stream references over a Ractor cluster) are deferred past v0.2.0 (WP-15 stretch). They require the optional `ractor_cluster` transport and a serialization story, and no concrete distributed use case is driving them yet.
### Graph cycles (deferred)

`GraphBuilder` currently rejects cycles at validation time. Akka-style cycle semantics (a buffer in the cycle, `MergePreferred` liveness patterns) require a queued interpreter with per-edge demand tracking, which is deferred to WP-16 (stretch).
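Rejecting cycles at validation time amounts to checking that the stage graph is acyclic. One standard way to do that is Kahn's topological sort, sketched below over numbered stages; this is an illustrative choice, not necessarily how `GraphBuilder` performs the check, and the `check_acyclic` function is hypothetical.

```rust
use std::collections::VecDeque;

/// Returns Ok(topological order) if the graph of `n` stages with directed
/// `edges` (from, to) is acyclic, or Err with the stages that never became ready.
fn check_acyclic(n: usize, edges: &[(usize, usize)]) -> Result<Vec<usize>, Vec<usize>> {
    let mut indegree = vec![0_usize; n];
    let mut adj = vec![Vec::new(); n];
    for &(from, to) in edges {
        adj[from].push(to);
        indegree[to] += 1;
    }
    // Start from stages with no inputs (the sources).
    let mut ready: VecDeque<usize> = (0..n).filter(|&v| indegree[v] == 0).collect();
    let mut order = Vec::with_capacity(n);
    while let Some(v) = ready.pop_front() {
        order.push(v);
        for &w in &adj[v] {
            indegree[w] -= 1;
            if indegree[w] == 0 {
                ready.push_back(w);
            }
        }
    }
    if order.len() == n {
        Ok(order)
    } else {
        // These stages lie on a cycle or downstream of one.
        Err((0..n).filter(|&v| indegree[v] > 0).collect())
    }
}
```

For edges `0 → 1 → 2 → 1` plus `2 → 3`, only stage 0 ever becomes ready, so stages 1, 2 and 3 are reported.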
### SubFlow surface

Datum does not expose a dedicated `SubFlow` type. Use nested `Source<Source<T>>` and the `flat_map_*` family to process substream results.
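## Quick-start conversion example

Akka (Scala):

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
implicit val system: ActorSystem = ActorSystem("example")
val result = Source(1 to 10)
  .map(_ * 2)
  .filter(_ % 3 != 0)
  .runWith(Sink.seq) // Future[immutable.Seq[Int]]
```

Datum (Rust):

```rust
use datum::{Sink, Source};

// Inside a function that returns a `Result`, so `?` can propagate stream errors.
let result = Source::from_iterable(1_u64..=10)
    .map(|x| x * 2)
    .filter(|x| x % 3 != 0)
    .run_with(Sink::collect())?; // Vec<u64>
```

Akka's `runWith(Sink.seq)` yields a `Future[immutable.Seq[Int]]`, while the Datum version returns the `Vec<u64>` directly. The Datum call is synchronous: `.run_with()` blocks until the stream completes.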
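To check what both pipelines should produce, the same transformation can be run over a plain std iterator. This only pins down the data; it says nothing about Datum's execution model.

```rust
/// The quick-start transformation over a std iterator:
/// double each of 1..=10, then drop multiples of 3.
fn quick_start_expected() -> Vec<u64> {
    (1_u64..=10)
        .map(|x| x * 2)
        .filter(|x| x % 3 != 0)
        .collect()
}
// quick_start_expected() == [2, 4, 8, 10, 14, 16, 20]
```

Doubling gives 2 through 20 in steps of 2, and the filter removes 6, 12 and 18, leaving seven elements.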
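For a running graph with a materializer:

Akka (Scala):

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, KillSwitches}
import akka.stream.scaladsl.Keep

implicit val system: ActorSystem = ActorSystem("example")
val mat = ActorMaterializer() // deprecated since Akka 2.6 in favor of the system materializer
val (killSwitch, done) = source
  .viaMat(KillSwitches.single)(Keep.right)
  .toMat(sink)(Keep.both)
  .run()(mat)
```

Datum (Rust):

```rust
use datum::{Keep, KillSwitches, Runtime, Sink, Source};

let materializer = Runtime::new();
let (kill_switch, completion) = Source::repeat(1_u64)
    .via_mat(KillSwitches::single(), Keep::right) // keep the kill switch
    .to_mat(Sink::ignore(), Keep::both)           // plus the sink's completion
    .run_with_materializer(&materializer)?;
```

`Keep::left`, `Keep::right`, `Keep::both`, and `Keep::none` work identically to Akka's `Keep`.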
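Each `Keep` option is simply a combiner of two materialized values: the left one from the upstream side and the right one from the downstream side. The sketch below writes the four shapes as plain generic functions; these are hypothetical stand-ins, not Datum's definitions, and `keep_none` returns `()` as a stand-in for Akka's `NotUsed`.

```rust
/// Hypothetical stand-ins for the four `Keep` combiners.
fn keep_left<L, R>(left: L, _right: R) -> L {
    left
}
fn keep_right<L, R>(_left: L, right: R) -> R {
    right
}
fn keep_both<L, R>(left: L, right: R) -> (L, R) {
    (left, right)
}
/// Discards both values; `()` plays the role of Akka's `NotUsed`.
fn keep_none<L, R>(_left: L, _right: R) {}

/// Models `.via_mat(.., Keep::right).to_mat(.., Keep::both)` from the example:
/// the source's own value is discarded; kill switch and completion are kept.
fn example_materialized<S, K, C>(source_mat: S, kill_switch: K, completion: C) -> (K, C) {
    let after_via = keep_right(source_mat, kill_switch);
    keep_both(after_via, completion)
}
```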