Appearance
Operator Reference โ
Complete index of Datum's stream operators, grouped by category and cross-referenced to their Akka Streams equivalents. Status follows the M2 ledger: โ shipped ยท ๐ก partial ยท โญ๏ธ deferred ยท โ won't implement.
For narrative context on the major areas, see the corresponding guides: Graphs ยท Buffers & Rate ยท Error Handling ยท Futures Interop ยท Actors Interop ยท Dynamic Streams ยท Streaming IO ยท Testing Streams ยท Substreams
Source operators โ
| Akka operator | Datum operator | Notes |
|---|---|---|
Source.empty | Source::empty() | โ |
Source.single | Source::single(item) | โ |
Source.repeat | Source::repeat(item) | โ |
Source.failed | Source::failed(error) | โ |
Source.fromIterator | Source::from_fn_iter(factory) | โ |
Source.from / fromIterable | Source::from_iterable(items) | โ |
Source.range | Source::from_iterable(a..b) | โ
range(a, b) = from_iterable(a..b) |
Source.future | Source::future(fut_factory) | โ |
Source.futureSource | Source::future_source(fut_factory) | โ |
Source.maybe | Source::maybe() | โ
returns (MaybeHandle<T>, Source<T>) |
Source.never | Source::never() | โ |
Source.tick | Source::tick(initial, interval, element) | โ |
Source.cycle | Source::cycle(factory) | โ factory called each cycle |
Source.unfold | Source::unfold(initial, f) | โ |
Source.unfoldAsync | Source::unfold_async(initial, f) | โ Tokio-dispatched |
Source.unfoldResource | Source::unfold_resource(create, read, close) | โ |
Source.unfoldResourceAsync | Source::unfold_resource_async(create, read, close) | โ Tokio-dispatched |
Source.lazily / lazySource | Source::lazy_source(factory) | โ defers construction to materialization |
Source.lazySingle | Source::lazy_single(factory) | โ |
Source.lazyFuture | Source::lazy_future(factory) | โ |
Source.lazyFutureSource | Source::lazy_future_source(factory) | โ |
Source.combine | Source::combine(sources, strategy) | โ
SourceCombineStrategy |
Source.zipN | Source::zip_n(sources) | โ |
Source.zipWithN | Source::zip_with_n(sources, zipper) | โ |
Source.queue | Source::queue(capacity, strategy) / Source::queue_bounded(capacity) | โ
SourceQueue<T> / BoundedSourceQueue<T> are the materialized values |
Source.asSourceWithContext | Source::as_source_with_context(extract) | โ |
Source.mergePrioritizedN | Source::merge_prioritized_n(sources_with_priority) | โ |
Source.asSubscriber / fromPublisher | โ | โ Reactive Streams โ out of scope; use Tokio futures |
Source.fromCompletionStage / JVM fromJavaStream | โ | โ JVM-only; use Source::future / Source::from_iterable |
Sink operators โ
| Akka operator | Datum operator | Notes |
|---|---|---|
Sink.ignore | Sink::ignore() | โ |
Sink.head | Sink::head() | โ fails on empty stream |
Sink.headOption | Sink::head_option() | โ |
Sink.last | Sink::last() | โ |
Sink.lastOption | Sink::last_option() | โ |
Sink.seq / collect | Sink::collect() | โ
returns Vec<T> |
Sink.collection | Sink::collection() | โ |
Sink.takeLast | Sink::take_last(n) | โ |
Sink.fold | Sink::fold(zero, f) | โ |
Sink.reduce | Sink::reduce(f) | โ |
Sink.foreach | Sink::foreach(f) | โ |
Sink.foreachAsync | Sink::foreach_async(parallelism, f) | โ |
Sink.cancelled | Sink::cancelled() | โ |
Sink.never | Sink::never() | โ |
Sink.onComplete | Sink::on_complete(callback) | โ |
Sink.futureSink | Sink::future_sink(factory) | โ |
Sink.lazySink | Sink::lazy_sink(factory) | โ |
Sink.lazyFutureSink | Sink::lazy_future_sink(factory) | โ |
Sink.fromMaterializer / setup | Sink::from_materializer(f) / Sink::setup(f) | โ |
Sink.preMaterialize | Sink::pre_materialize() | โ |
Sink.combine | Sink::combine(sinks, strategy) | โ
SinkCombineStrategy |
Sink.queue | Sink::queue() | โ
SinkQueue |
Sink.asPublisher / fromSubscriber | โ | โ Reactive Streams โ out of scope |
Sink.asInputStream / asOutputStream | โ | โญ๏ธ deferred; WP-12 delivered the forward direction (StreamConverters::from_reader / to_writer); adapters for stream-as-Read/Write handle are post-v0.2.0 |
JVM javaCollector* / asJavaStream | โ | โ JVM-only; use Sink::collect() |
Flow operators (simple / transforming) โ
| Akka operator | Datum operator | Notes |
|---|---|---|
map | .map(f) | โ |
filter | .filter(predicate) | โ |
filterNot | .filter_not(predicate) | โ |
collect | .filter_map(f) | โ
Rust-native rename; collect is a keyword conflict |
mapConcat | .map_concat(f) | โ |
grouped | .grouped(size) | โ |
sliding | .sliding(size, step) | โ |
scan | .scan(seed, f) | โ |
fold | .fold(zero, f) | โ |
reduce | .reduce(f) | โ |
take | .take(n) | โ |
takeWhile | .take_while(predicate) | โ |
drop | .drop(n) | โ |
dropWhile | .drop_while(predicate) | โ |
limit | .limit(max) | โ |
statefulMap | .stateful_map(seed, f) | โ |
statefulMapConcat | .stateful_map_concat(seed, f) | โ |
intersperse | .intersperse(inject) | โ |
flattenOptional | .flatten_optional() | โ |
groupedWeighted | .grouped_weighted(max_weight, cost_fn) | โ |
limitWeighted | .limit_weighted(max_weight, cost_fn) | โ |
contramap | .contramap(f) | โ |
log / logWithMarker | .monitor(callback) | โ logging via closure callback |
monitor | .monitor(callback) | โ |
watchTermination | .watch_termination(materialize_callback) | โ |
foldAsync | .fold_async(zero, f) | โ Tokio-dispatched |
scanAsync | .scan_async(seed, f) | โ Tokio-dispatched |
mapWithResource | .map_with_resource(create, f, close) | โ |
futureFlow | Flow::future_flow(factory) | โ |
lazyFlow | Flow::lazy_flow(factory) | โ |
lazyFutureFlow | Flow::lazy_future_flow(factory) | โ |
throttle | .throttle(elements, per, burst, mode) | โ
ThrottleMode::Shaping / Enforcing |
detach | .detach() | โ decouples fused regions |
asFlowWithContext | .as_flow_with_context(collapse, extract) | โ |
fromSinkAndSource | Flow::from_sink_and_source(sink, source) | โ |
fromSinkAndSourceCoupled | Flow::from_sink_and_source_coupled(sink, source) | โ |
collectType | โ | โ equiv: use `.filter_map( |
Async operators โ
| Akka operator | Datum operator | Notes |
|---|---|---|
mapAsync | .map_async(parallelism, f) | โ ordered, Tokio-dispatched |
mapAsyncUnordered | .map_async_unordered(parallelism, f) | โ unordered, Tokio-dispatched |
mapAsyncPartitioned | .map_async_partitioned(parallelism, partition_fn, f) | โ per-partition ordering |
Time-aware operators โ
| Akka operator | Datum operator | Notes |
|---|---|---|
delay | .delay(duration, strategy) | โ
DelayOverflowStrategy |
delayWith | .delay_with(supplier, strategy) | โ per-element delay |
initialDelay | .initial_delay(duration) | โ |
dropWithin | .drop_within(timeout) | โ |
takeWithin | .take_within(timeout) | โ |
groupedWithin | .grouped_within(max_number, interval) | โ |
groupedWeightedWithin | .grouped_weighted_within(max_weight, interval, cost_fn) | โ |
idleTimeout | .idle_timeout(timeout) | โ |
backpressureTimeout | .backpressure_timeout(timeout) | โ |
completionTimeout | .completion_timeout(timeout) | โ |
initialTimeout | .initial_timeout(timeout) | โ |
keepAlive | .keep_alive(timeout, inject) | โ |
Backpressure & rate operators โ
| Akka operator | Datum operator | Notes |
|---|---|---|
buffer | .buffer(size, strategy) | โ
all OverflowStrategy variants |
conflate | .conflate(aggregate) | โ |
conflateWithSeed | .conflate_with_seed(seed, aggregate) | โ |
expand | .expand(expand) | โ |
extrapolate | .extrapolate(expand, initial) | โ |
batch | .batch(max, seed, aggregate) | โ |
batchWeighted | .batch_weighted(max, cost_fn, seed, aggregate) | โ |
aggregateWithBoundary | .aggregate_with_boundary(allocate, aggregate, harvest, boundary) | โ |
OverflowStrategy variants: DropHead, DropTail, DropBuffer, DropNew, Backpressure, Fail.
Substream operators โ
| Akka operator | Datum operator | Notes |
|---|---|---|
flatMapConcat | .flat_map_concat(f) | โ |
flatMapMerge | .flat_map_merge(breadth, f) | โ |
flatMapPrefix | .flat_map_prefix(n, f) | โ |
groupBy | .group_by(max_substreams, key_fn, eager_complete) | โ closed keys are retained until parent completes |
prefixAndTail | .prefix_and_tail(n) | โ
emits (Vec<T>, Source<T>) |
splitWhen | .split_when(predicate) | โ
emits Source<T> per segment |
splitAfter | .split_after(predicate) | โ
emits Source<T> per segment |
See Substreams guide for the SubFlow-vs-nested-source behavioral difference.
Fan-in operators โ
| Akka operator | Datum operator | Notes |
|---|---|---|
merge | Merge<T>::new(n) (graph junction) | โ |
mergePreferred | MergePreferred<T>::new(n_secondary) | โ |
mergePrioritized | MergePrioritized<T>::new(weights) | โ |
mergePrioritizedN | Source::merge_prioritized_n(sources) | โ |
zip | Zip<L, R>::new() (graph junction) | โ |
concat | .concat(source) / Concat<T>::new(n) | โ |
concatLazy | .concat_lazy(source) | โ |
concatAllLazy | .concat_all_lazy(sources) | โ |
prepend | .prepend(source) | โ |
prependLazy | .prepend_lazy(source) | โ |
orElse | .or_else(secondary) | โ |
interleave | .interleave(source, segment_size) / Interleave<T>::new(n, segment_size) | โ |
interleaveAll | .interleave_all(sources, segment_size, eager_close) | โ |
mergeSorted | .merge_sorted(source) / MergeSorted<T>::new() | โ |
MergeSequence | MergeSequence<T>::new(n, extract_seq) | โ |
mergeLatest | .merge_latest(source, eager_complete) / MergeLatest<T>::new(inputs, eager_complete) | โ |
mergeAll | .merge_all(sources, eager_complete) | โ |
zipWith | .zip_with(source, f) | โ |
zipLatest | .zip_latest(source) | โ |
zipLatestWith | .zip_latest_with(source, f) | โ |
zipWithIndex | .zip_with_index() | โ
emits (T, u64) |
zipAll | .zip_all(source, this_default, that_default) | โ |
zipN | Source::zip_n(sources) | โ |
zipWithN | Source::zip_with_n(sources, zipper) | โ |
Fan-out operators โ
| Akka operator | Datum operator | Notes |
|---|---|---|
Broadcast | Broadcast<T>::new(n) | โ |
Balance | Balance<T>::new(n) | โ |
Partition | Partition<T>::new(n, f) | โ |
Unzip | Unzip<A, B>::new() | โ |
UnzipWith | UnzipWith<In, A, B>::new(f) | โ |
alsoTo | .also_to(sink) | โ |
alsoToAll | .also_to_all(sinks) | โ |
divertTo | .divert_to(sink, predicate) | โ |
wireTap | .wire_tap(sink) | โ |
Actor interop operators โ
See Actors Interop guide for usage examples.
| Akka operator | Datum operator | Notes |
|---|---|---|
ActorFlow.ask | ActorFlow::ask(actor_ref, parallelism, timeout, make_msg) | โ |
ActorFlow.askWithStatus | ActorFlow::ask_with_status(actor_ref, parallelism, timeout, make_msg) | โ
unwraps ActorStatus::Ok/Err |
ActorFlow.askWithContext | ActorFlow::ask_with_context(actor_ref, parallelism, timeout, make_msg) | โ preserves context |
ActorFlow.askWithStatusAndContext | ActorFlow::ask_with_status_and_context(actor_ref, parallelism, timeout, make_msg) | โ |
ActorSource (actorRef) | ActorSource::actor_ref() | โ
typed ActorSourceMessage<T> protocol |
ActorSource (actorRef backpressured) | ActorSource::actor_ref_with_backpressure() | โ |
ActorSource typed | ActorSource::typed() | โ |
ActorSink (actorRef) | ActorSink::actor_ref(...) | โ |
ActorSink (actorRef backpressured) | ActorSink::actor_ref_with_backpressure(...) | โ |
ActorSink typed | ActorSink::typed() / ActorSink::typed_with_backpressure() | โ |
PubSub source / sink | ActorPubSub::source(group) / ActorPubSub::sink(group) | โ |
watch | ActorFlow::watch(actor_ref) | โ emits pass-through elements; terminates on actor exit |
Error-handling operators โ
See Error Handling guide.
| Akka operator | Datum operator | Notes |
|---|---|---|
recover | .recover(f) | โ |
recoverWith | .recover_with(f) | โ |
recoverWithRetries | .recover_with_retries(retries, f) | โ |
mapError | .map_error(f) | โ |
onErrorComplete | .on_error_complete() | โ |
| Supervision deciders | Supervision::stopping_decider() / resuming_decider() / restarting_decider() | โ |
| Supervised callbacks | .map_result_with_supervision(f, decider) (and equivalents for filter_map, fold, scan, reduce, foreach) | โ
fallible callbacks use *_result variants |
RestartSource.withBackoff | RestartSource::with_backoff(settings, factory) | โ |
RestartSource.onFailuresWithBackoff | RestartSource::on_failures_with_backoff(settings, factory) | โ |
RestartFlow.withBackoff | RestartFlow::with_backoff(settings, factory) | โ |
RestartFlow.onFailuresWithBackoff | RestartFlow::on_failures_with_backoff(settings, factory) | โ |
RestartSink.withBackoff | RestartSink::with_backoff(settings, factory) | โ
wraps sinks that materialize StreamCompletion<NotUsed> |
RetryFlow.withBackoff | RetryFlow::with_backoff(min, max, factor, f) | โ |
RetryFlow.withBackoffAndContext | RetryFlow::with_backoff_and_context(...) | โ |
IO & compression operators โ
See Streaming IO guide.
| Akka operator | Datum operator | Notes |
|---|---|---|
gzip / gunzip | Compression::gzip() / Compression::gunzip() | โ
via flate2 |
deflate / inflate | Compression::deflate() / Compression::inflate() | โ |
fromPath / toPath | FileIO::from_path(path) / FileIO::to_path(path) | โ
sync std::fs |
| Tokio async file IO | TokioFileIO::from_path(path) / TokioFileIO::to_path(path) | โ
Tokio fs |
fromInputStream (std) | StreamConverters::from_reader(reader) | โ |
fromOutputStream (std) | StreamConverters::to_writer(writer) | โ |
| TCP | TokioByteSink / TokioByteSource / TokioTcp | โ plain TCP; TLS out of scope |
| Framing | Framing::delimiter(...) / Framing::length_field(...) | โ |
| JSON framing | Framing::json(...) | โ |
asInputStream / asOutputStream handle | โ | โญ๏ธ deferred โ stream-as-Read/Write adapters post-v0.2.0 |
JVM javaCollector* / fromJavaStream | โ | โ JVM-only |
Dynamic stream operators โ
| Akka concept | Datum operator | Notes |
|---|---|---|
KillSwitches.single | KillSwitches::single() | โ
UniqueKillSwitch |
KillSwitches.shared | KillSwitches::shared(name) | โ
SharedKillSwitch |
MergeHub | MergeHub::source(buffer_size) | โ |
BroadcastHub | BroadcastHub::sink(buffer_size) | โ |
PartitionHub | PartitionHub::sink(router, start_after, buffer_size) | โ |
Context propagation operators โ
| Akka operator | Datum operator | Notes |
|---|---|---|
asSourceWithContext | Source::as_source_with_context(extract) | โ |
asFlowWithContext | Flow::as_flow_with_context(collapse, extract) | โ |
FlowWithContext | FlowWithContext<In, CtxIn, Out, CtxOut, Mat> | โ |
SourceWithContext | SourceWithContext<Out, Ctx, Mat> | โ |
Testing operators โ
| Akka testkit concept | Datum equivalent | Notes |
|---|---|---|
TestSource.probe | datum::testkit::TestSource | โ manual demand + emit/complete/fail |
TestSink.probe | datum::testkit::TestSink | โ
request / expect_next / expect_complete / expect_error |
Graph stage API โ
See Working with Graphs guide.
| Akka concept | Datum equivalent | Notes |
|---|---|---|
GraphStage | GraphStage<S> trait | โ |
GraphStageLogic | GraphStageLogic | โ
pull / grab / push / complete / fail |
InHandler / OutHandler | InHandler / OutHandler | โ |
Async callbacks (getAsyncCallback) | via GraphStageLogic side-channel | โ |
| Timers in stages | via GraphStageLogic timer API | โ |
AsyncBoundary | AsyncBoundary / .async() hint | โ
AsyncBoundary::new() |
| Graph cycles | โ | โญ๏ธ deferred (WP-16 stretch) โ GraphBuilder rejects cycles at validation time; requires a queued interpreter |
Deferred and out-of-scope summary โ
| Category | Status |
|---|---|
Reactive Streams interop (asPublisher, asSubscriber, fromPublisher, fromSubscriber) | โ out of scope โ Rust has no equivalent ubiquitous standard; use Tokio futures |
JVM-only operators (fromJavaStream, asJavaStream, javaCollector*, CompletionStage*) | โ JVM-only โ capability covered by Source::from_iterable and Source::future |
| StreamRefs (distributed stream references) | โญ๏ธ WP-15 (stretch) โ requires ractor_cluster; no concrete use case yet |
| Graph cycles + liveness | โญ๏ธ WP-16 (stretch) โ requires queued interpreter with per-edge demand tracking |
asInputStream / asOutputStream sink/source adapters | โญ๏ธ post-v0.2.0 โ forward direction (from_reader/to_writer) is shipped |
| TLS, UDP | โญ๏ธ out of v0.2.0 scope |