Operator Reference

Complete index of Datum's stream operators, grouped by category and cross-referenced to their Akka Streams equivalents. Status follows the M2 ledger: ✅ shipped · 🟡 partial · ⏭️ deferred · ❌ won't implement.

For narrative context on the major areas, see the corresponding guides: Graphs · Buffers & Rate · Error Handling · Futures Interop · Actors Interop · Dynamic Streams · Streaming IO · Testing Streams · Substreams


Source operators

| Akka operator | Datum operator | Notes |
|---|---|---|
| Source.empty | Source::empty() | ✅ |
| Source.single | Source::single(item) | ✅ |
| Source.repeat | Source::repeat(item) | ✅ |
| Source.failed | Source::failed(error) | ✅ |
| Source.fromIterator | Source::from_fn_iter(factory) | ✅ |
| Source.from / fromIterable | Source::from_iterable(items) | ✅ |
| Source.range | Source::from_iterable(a..b) | ✅ range(a, b) = from_iterable(a..b) |
| Source.future | Source::future(fut_factory) | ✅ |
| Source.futureSource | Source::future_source(fut_factory) | ✅ |
| Source.maybe | Source::maybe() | ✅ returns (MaybeHandle<T>, Source<T>) |
| Source.never | Source::never() | ✅ |
| Source.tick | Source::tick(initial, interval, element) | ✅ |
| Source.cycle | Source::cycle(factory) | ✅ factory called each cycle |
| Source.unfold | Source::unfold(initial, f) | ✅ |
| Source.unfoldAsync | Source::unfold_async(initial, f) | ✅ Tokio-dispatched |
| Source.unfoldResource | Source::unfold_resource(create, read, close) | ✅ |
| Source.unfoldResourceAsync | Source::unfold_resource_async(create, read, close) | ✅ Tokio-dispatched |
| Source.lazily / lazySource | Source::lazy_source(factory) | ✅ defers construction to materialization |
| Source.lazySingle | Source::lazy_single(factory) | ✅ |
| Source.lazyFuture | Source::lazy_future(factory) | ✅ |
| Source.lazyFutureSource | Source::lazy_future_source(factory) | ✅ |
| Source.combine | Source::combine(sources, strategy) | ✅ SourceCombineStrategy |
| Source.zipN | Source::zip_n(sources) | ✅ |
| Source.zipWithN | Source::zip_with_n(sources, zipper) | ✅ |
| Source.queue | Source::queue(capacity, strategy) / Source::queue_bounded(capacity) | ✅ SourceQueue<T> / BoundedSourceQueue<T> are the materialized values |
| Source.asSourceWithContext | Source::as_source_with_context(extract) | ✅ |
| Source.mergePrioritizedN | Source::merge_prioritized_n(sources_with_priority) | ✅ |
| Source.asSubscriber / fromPublisher | - | ❌ Reactive Streams: out of scope; use Tokio futures |
| Source.fromCompletionStage / JVM fromJavaStream | - | ❌ JVM-only; use Source::future / Source::from_iterable |
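
The `Source::unfold(initial, f)` row follows the usual unfold pattern: a state is threaded through a step function that either emits an element or ends the stream. The sketch below models that pattern with a plain std iterator. It is not Datum's implementation, and the exact closure signature Datum expects is an assumption borrowed from Akka's `Source.unfold`, where the step returns an optional `(next_state, element)` pair.

```rust
/// Std-only model of unfold: `step` returns `Some((next_state, element))`
/// to emit an element, or `None` to complete the stream.
/// The signature is hypothetical; it mirrors Akka's `Source.unfold`.
fn unfold<S, T>(
    initial: S,
    mut step: impl FnMut(S) -> Option<(S, T)>,
) -> impl Iterator<Item = T> {
    let mut state = Some(initial);
    std::iter::from_fn(move || {
        // Once `step` returns None the state stays empty, so the iterator stays finished.
        let (next, item) = step(state.take()?)?;
        state = Some(next);
        Some(item)
    })
}
```

For example, `unfold((0u64, 1u64), |(a, b)| if a > 50 { None } else { Some(((b, a + b), a)) })` yields the Fibonacci numbers up to 34.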

Sink operators

| Akka operator | Datum operator | Notes |
|---|---|---|
| Sink.ignore | Sink::ignore() | ✅ |
| Sink.head | Sink::head() | ✅ fails on empty stream |
| Sink.headOption | Sink::head_option() | ✅ |
| Sink.last | Sink::last() | ✅ |
| Sink.lastOption | Sink::last_option() | ✅ |
| Sink.seq / collect | Sink::collect() | ✅ returns Vec<T> |
| Sink.collection | Sink::collection() | ✅ |
| Sink.takeLast | Sink::take_last(n) | ✅ |
| Sink.fold | Sink::fold(zero, f) | ✅ |
| Sink.reduce | Sink::reduce(f) | ✅ |
| Sink.foreach | Sink::foreach(f) | ✅ |
| Sink.foreachAsync | Sink::foreach_async(parallelism, f) | ✅ |
| Sink.cancelled | Sink::cancelled() | ✅ |
| Sink.never | Sink::never() | ✅ |
| Sink.onComplete | Sink::on_complete(callback) | ✅ |
| Sink.futureSink | Sink::future_sink(factory) | ✅ |
| Sink.lazySink | Sink::lazy_sink(factory) | ✅ |
| Sink.lazyFutureSink | Sink::lazy_future_sink(factory) | ✅ |
| Sink.fromMaterializer / setup | Sink::from_materializer(f) / Sink::setup(f) | ✅ |
| Sink.preMaterialize | Sink::pre_materialize() | ✅ |
| Sink.combine | Sink::combine(sinks, strategy) | ✅ SinkCombineStrategy |
| Sink.queue | Sink::queue() | ✅ SinkQueue |
| Sink.asPublisher / fromSubscriber | - | ❌ Reactive Streams: out of scope |
| Sink.asInputStream / asOutputStream | - | ⏭️ deferred; WP-12 delivered the forward direction (StreamConverters::from_reader / to_writer); adapters for stream-as-Read/Write handle are post-v0.2.0 |
| JVM javaCollector* / asJavaStream | - | ❌ JVM-only; use Sink::collect() |

Flow operators (simple / transforming)

| Akka operator | Datum operator | Notes |
|---|---|---|
| map | .map(f) | ✅ |
| filter | .filter(predicate) | ✅ |
| filterNot | .filter_not(predicate) | ✅ |
| collect | .filter_map(f) | ✅ Rust-native rename; collect is a keyword conflict |
| mapConcat | .map_concat(f) | ✅ |
| grouped | .grouped(size) | ✅ |
| sliding | .sliding(size, step) | ✅ |
| scan | .scan(seed, f) | ✅ |
| fold | .fold(zero, f) | ✅ |
| reduce | .reduce(f) | ✅ |
| take | .take(n) | ✅ |
| takeWhile | .take_while(predicate) | ✅ |
| drop | .drop(n) | ✅ |
| dropWhile | .drop_while(predicate) | ✅ |
| limit | .limit(max) | ✅ |
| statefulMap | .stateful_map(seed, f) | ✅ |
| statefulMapConcat | .stateful_map_concat(seed, f) | ✅ |
| intersperse | .intersperse(inject) | ✅ |
| flattenOptional | .flatten_optional() | ✅ |
| groupedWeighted | .grouped_weighted(max_weight, cost_fn) | ✅ |
| limitWeighted | .limit_weighted(max_weight, cost_fn) | ✅ |
| contramap | .contramap(f) | ✅ |
| log / logWithMarker | .monitor(callback) | ✅ logging via closure callback |
| monitor | .monitor(callback) | ✅ |
| watchTermination | .watch_termination(materialize_callback) | ✅ |
| foldAsync | .fold_async(zero, f) | ✅ Tokio-dispatched |
| scanAsync | .scan_async(seed, f) | ✅ Tokio-dispatched |
| mapWithResource | .map_with_resource(create, f, close) | ✅ |
| futureFlow | Flow::future_flow(factory) | ✅ |
| lazyFlow | Flow::lazy_flow(factory) | ✅ |
| lazyFutureFlow | Flow::lazy_future_flow(factory) | ✅ |
| throttle | .throttle(elements, per, burst, mode) | ✅ ThrottleMode::Shaping / Enforcing |
| detach | .detach() | ✅ decouples fused regions |
| asFlowWithContext | .as_flow_with_context(collapse, extract) | ✅ |
| fromSinkAndSource | Flow::from_sink_and_source(sink, source) | ✅ |
| fromSinkAndSourceCoupled | Flow::from_sink_and_source_coupled(sink, source) | ✅ |
| collectType | - | ✅ equiv: use `.filter_map(
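
The `scan` and `fold` rows take the same arguments but differ in what they emit. In Akka Streams, `scan` emits the seed first and then every intermediate accumulator, while `fold` emits only the final one. The std-only sketch below models that difference, assuming Datum's `.scan(seed, f)` follows Akka here (this page does not confirm it). Note that this is not how Rust's own `Iterator::scan` behaves, which never emits the initial state.

```rust
/// Akka-style scan over a finite input: the seed is emitted first,
/// followed by each updated accumulator. Hypothetical helper, not a Datum API.
fn scan_emitting_seed<T, S: Clone>(
    items: impl IntoIterator<Item = T>,
    seed: S,
    mut f: impl FnMut(S, T) -> S,
) -> Vec<S> {
    let mut emitted = vec![seed.clone()];
    let mut acc = seed;
    for item in items {
        acc = f(acc, item);
        emitted.push(acc.clone());
    }
    emitted
}

/// Fold over the same input: only the final accumulator is produced.
fn fold_final<T, S>(
    items: impl IntoIterator<Item = T>,
    zero: S,
    f: impl FnMut(S, T) -> S,
) -> S {
    items.into_iter().fold(zero, f)
}
```

With the inputs `[1, 2, 3]`, a seed of `0`, and addition, the scan model emits four values (the seed plus three running sums) and the fold model produces the single total.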

Async operators

| Akka operator | Datum operator | Notes |
|---|---|---|
| mapAsync | .map_async(parallelism, f) | ✅ ordered, Tokio-dispatched |
| mapAsyncUnordered | .map_async_unordered(parallelism, f) | ✅ unordered, Tokio-dispatched |
| mapAsyncPartitioned | .map_async_partitioned(parallelism, partition_fn, f) | ✅ per-partition ordering |

Time-aware operators

| Akka operator | Datum operator | Notes |
|---|---|---|
| delay | .delay(duration, strategy) | ✅ DelayOverflowStrategy |
| delayWith | .delay_with(supplier, strategy) | ✅ per-element delay |
| initialDelay | .initial_delay(duration) | ✅ |
| dropWithin | .drop_within(timeout) | ✅ |
| takeWithin | .take_within(timeout) | ✅ |
| groupedWithin | .grouped_within(max_number, interval) | ✅ |
| groupedWeightedWithin | .grouped_weighted_within(max_weight, interval, cost_fn) | ✅ |
| idleTimeout | .idle_timeout(timeout) | ✅ |
| backpressureTimeout | .backpressure_timeout(timeout) | ✅ |
| completionTimeout | .completion_timeout(timeout) | ✅ |
| initialTimeout | .initial_timeout(timeout) | ✅ |
| keepAlive | .keep_alive(timeout, inject) | ✅ |

Backpressure & rate operators

| Akka operator | Datum operator | Notes |
|---|---|---|
| buffer | .buffer(size, strategy) | ✅ all OverflowStrategy variants |
| conflate | .conflate(aggregate) | ✅ |
| conflateWithSeed | .conflate_with_seed(seed, aggregate) | ✅ |
| expand | .expand(expand) | ✅ |
| extrapolate | .extrapolate(expand, initial) | ✅ |
| batch | .batch(max, seed, aggregate) | ✅ |
| batchWeighted | .batch_weighted(max, cost_fn, seed, aggregate) | ✅ |
| aggregateWithBoundary | .aggregate_with_boundary(allocate, aggregate, harvest, boundary) | ✅ |

OverflowStrategy variants: DropHead, DropTail, DropBuffer, DropNew, Backpressure, Fail.
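
The variant names map onto what happens when a new element arrives at a full buffer. The sketch below models five of them on a `VecDeque`, assuming Datum keeps the Akka meanings (head is the oldest element, tail the newest). `Backpressure` is left out because it suspends the upstream instead of touching the buffer. The `Overflow` enum and both functions are hypothetical and exist only for illustration.

```rust
use std::collections::VecDeque;

/// Illustrative stand-in for the non-blocking overflow strategies.
#[derive(Debug, Clone, Copy)]
enum Overflow {
    DropHead,   // evict the oldest buffered element
    DropTail,   // evict the newest buffered element
    DropBuffer, // evict everything currently buffered
    DropNew,    // discard the incoming element
    Fail,       // fail the stream
}

/// Offers `item` to a buffer bounded at `cap` (assumed to be at least 1).
/// An `Err` models stream failure.
fn offer<T>(
    buf: &mut VecDeque<T>,
    cap: usize,
    item: T,
    strategy: Overflow,
) -> Result<(), &'static str> {
    if buf.len() < cap {
        buf.push_back(item);
        return Ok(());
    }
    match strategy {
        Overflow::DropHead => {
            buf.pop_front();
            buf.push_back(item);
        }
        Overflow::DropTail => {
            buf.pop_back();
            buf.push_back(item);
        }
        Overflow::DropBuffer => {
            buf.clear();
            buf.push_back(item);
        }
        Overflow::DropNew => {}
        Overflow::Fail => return Err("buffer overflow"),
    }
    Ok(())
}

/// Pushes 1..=5 into a buffer of capacity 3 and returns what is left.
fn fill(strategy: Overflow) -> Result<Vec<u32>, &'static str> {
    let mut buf = VecDeque::new();
    for item in 1..=5 {
        offer(&mut buf, 3, item, strategy)?;
    }
    Ok(buf.into_iter().collect())
}
```

Running `fill` with each variant shows which elements survive when five elements are offered to a buffer of three without any downstream demand.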


Substream operators

| Akka operator | Datum operator | Notes |
|---|---|---|
| flatMapConcat | .flat_map_concat(f) | ✅ |
| flatMapMerge | .flat_map_merge(breadth, f) | ✅ |
| flatMapPrefix | .flat_map_prefix(n, f) | ✅ |
| groupBy | .group_by(max_substreams, key_fn, eager_complete) | ✅ closed keys are retained until parent completes |
| prefixAndTail | .prefix_and_tail(n) | ✅ emits (Vec<T>, Source<T>) |
| splitWhen | .split_when(predicate) | ✅ emits Source<T> per segment |
| splitAfter | .split_after(predicate) | ✅ emits Source<T> per segment |

See Substreams guide for the SubFlow-vs-nested-source behavioral difference.
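
The two split operators differ only in where the matching element lands. The sketch below models each emitted `Source<T>` segment as a `Vec<T>`, assuming the Akka semantics carry over: `split_when` starts a new segment with the matching element, while `split_after` closes the current segment after it. Both functions are std-only illustrations, not Datum code.

```rust
/// The matching element becomes the first element of a new segment.
fn split_when<T>(items: Vec<T>, pred: impl Fn(&T) -> bool) -> Vec<Vec<T>> {
    let mut segments: Vec<Vec<T>> = Vec::new();
    for item in items {
        // The very first element always opens a segment, match or not.
        if segments.is_empty() || pred(&item) {
            segments.push(Vec::new());
        }
        segments.last_mut().unwrap().push(item);
    }
    segments
}

/// The matching element becomes the last element of the current segment.
fn split_after<T>(items: Vec<T>, pred: impl Fn(&T) -> bool) -> Vec<Vec<T>> {
    let mut segments: Vec<Vec<T>> = Vec::new();
    let mut current: Vec<T> = Vec::new();
    for item in items {
        let boundary = pred(&item);
        current.push(item);
        if boundary {
            segments.push(std::mem::take(&mut current));
        }
    }
    // Elements after the last boundary form a final, unterminated segment.
    if !current.is_empty() {
        segments.push(current);
    }
    segments
}
```

Feeding both functions the input `[1, 2, 0, 3, 0, 4]` with the predicate `x == 0` makes the difference visible: the zeros lead their segments in one case and trail them in the other.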


Fan-in operators

| Akka operator | Datum operator | Notes |
|---|---|---|
| merge | Merge<T>::new(n) (graph junction) | ✅ |
| mergePreferred | MergePreferred<T>::new(n_secondary) | ✅ |
| mergePrioritized | MergePrioritized<T>::new(weights) | ✅ |
| mergePrioritizedN | Source::merge_prioritized_n(sources) | ✅ |
| zip | Zip<L, R>::new() (graph junction) | ✅ |
| concat | .concat(source) / Concat<T>::new(n) | ✅ |
| concatLazy | .concat_lazy(source) | ✅ |
| concatAllLazy | .concat_all_lazy(sources) | ✅ |
| prepend | .prepend(source) | ✅ |
| prependLazy | .prepend_lazy(source) | ✅ |
| orElse | .or_else(secondary) | ✅ |
| interleave | .interleave(source, segment_size) / Interleave<T>::new(n, segment_size) | ✅ |
| interleaveAll | .interleave_all(sources, segment_size, eager_close) | ✅ |
| mergeSorted | .merge_sorted(source) / MergeSorted<T>::new() | ✅ |
| MergeSequence | MergeSequence<T>::new(n, extract_seq) | ✅ |
| mergeLatest | .merge_latest(source, eager_complete) / MergeLatest<T>::new(inputs, eager_complete) | ✅ |
| mergeAll | .merge_all(sources, eager_complete) | ✅ |
| zipWith | .zip_with(source, f) | ✅ |
| zipLatest | .zip_latest(source) | ✅ |
| zipLatestWith | .zip_latest_with(source, f) | ✅ |
| zipWithIndex | .zip_with_index() | ✅ emits (T, u64) |
| zipAll | .zip_all(source, this_default, that_default) | ✅ |
| zipN | Source::zip_n(sources) | ✅ |
| zipWithN | Source::zip_with_n(sources, zipper) | ✅ |
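
Of the fan-in operators, `merge_sorted` has a precondition worth spelling out: both inputs must already be sorted, and the operator then always emits the smaller of the two pending heads. A std-only sketch of that rule over two finite vectors follows. It models the semantics only and says nothing about how Datum schedules demand between the two inputs.

```rust
/// Merges two already-sorted inputs into one sorted output by repeatedly
/// taking the smaller pending head. Ties go to the left input.
fn merge_sorted<T: Ord>(left: Vec<T>, right: Vec<T>) -> Vec<T> {
    let mut out = Vec::with_capacity(left.len() + right.len());
    let mut l = left.into_iter().peekable();
    let mut r = right.into_iter().peekable();
    loop {
        let take_left = match (l.peek(), r.peek()) {
            (Some(a), Some(b)) => a <= b,
            (Some(_), None) => true,
            (None, Some(_)) => false,
            (None, None) => break,
        };
        let next = if take_left { l.next() } else { r.next() };
        out.extend(next);
    }
    out
}
```

If either input is not sorted, the output is not sorted either; the merge never looks beyond the two current heads.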

Fan-out operators

| Akka operator | Datum operator | Notes |
|---|---|---|
| Broadcast | Broadcast<T>::new(n) | ✅ |
| Balance | Balance<T>::new(n) | ✅ |
| Partition | Partition<T>::new(n, f) | ✅ |
| Unzip | Unzip<A, B>::new() | ✅ |
| UnzipWith | UnzipWith<In, A, B>::new(f) | ✅ |
| alsoTo | .also_to(sink) | ✅ |
| alsoToAll | .also_to_all(sinks) | ✅ |
| divertTo | .divert_to(sink, predicate) | ✅ |
| wireTap | .wire_tap(sink) | ✅ |
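
The three junctions `Broadcast`, `Balance`, and `Partition` all take `n` outputs but route elements differently. The sketch below models each output as a `Vec<T>`, under the assumption that Datum keeps the Akka semantics. Round-robin is used for `Balance` purely as an approximation: the real junction hands each element to whichever output has demand, so the distribution depends on consumer speed. All three functions are hypothetical illustrations.

```rust
/// Broadcast: every output receives every element.
fn broadcast<T: Clone>(items: Vec<T>, n: usize) -> Vec<Vec<T>> {
    (0..n).map(|_| items.clone()).collect()
}

/// Balance (approximated): each element goes to exactly one output.
/// Round-robin stands in for demand-driven routing.
fn balance_round_robin<T>(items: Vec<T>, n: usize) -> Vec<Vec<T>> {
    let mut outputs: Vec<Vec<T>> = (0..n).map(|_| Vec::new()).collect();
    for (i, item) in items.into_iter().enumerate() {
        outputs[i % n].push(item);
    }
    outputs
}

/// Partition: `route` picks the output index for each element.
fn partition<T>(items: Vec<T>, n: usize, route: impl Fn(&T) -> usize) -> Vec<Vec<T>> {
    let mut outputs: Vec<Vec<T>> = (0..n).map(|_| Vec::new()).collect();
    for item in items {
        let idx = route(&item);
        assert!(idx < n, "partitioner returned an out-of-range index");
        outputs[idx].push(item);
    }
    outputs
}
```

Only `broadcast` needs `T: Clone`, since it is the only one of the three that delivers the same element to more than one output.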

Actor interop operators

See Actors Interop guide for usage examples.

| Akka operator | Datum operator | Notes |
|---|---|---|
| ActorFlow.ask | ActorFlow::ask(actor_ref, parallelism, timeout, make_msg) | ✅ |
| ActorFlow.askWithStatus | ActorFlow::ask_with_status(actor_ref, parallelism, timeout, make_msg) | ✅ unwraps ActorStatus::Ok/Err |
| ActorFlow.askWithContext | ActorFlow::ask_with_context(actor_ref, parallelism, timeout, make_msg) | ✅ preserves context |
| ActorFlow.askWithStatusAndContext | ActorFlow::ask_with_status_and_context(actor_ref, parallelism, timeout, make_msg) | ✅ |
| ActorSource (actorRef) | ActorSource::actor_ref() | ✅ typed ActorSourceMessage<T> protocol |
| ActorSource (actorRef backpressured) | ActorSource::actor_ref_with_backpressure() | ✅ |
| ActorSource typed | ActorSource::typed() | ✅ |
| ActorSink (actorRef) | ActorSink::actor_ref(...) | ✅ |
| ActorSink (actorRef backpressured) | ActorSink::actor_ref_with_backpressure(...) | ✅ |
| ActorSink typed | ActorSink::typed() / ActorSink::typed_with_backpressure() | ✅ |
| PubSub source / sink | ActorPubSub::source(group) / ActorPubSub::sink(group) | ✅ |
| watch | ActorFlow::watch(actor_ref) | ✅ emits pass-through elements; terminates on actor exit |

Error-handling operators

See Error Handling guide.

| Akka operator | Datum operator | Notes |
|---|---|---|
| recover | .recover(f) | ✅ |
| recoverWith | .recover_with(f) | ✅ |
| recoverWithRetries | .recover_with_retries(retries, f) | ✅ |
| mapError | .map_error(f) | ✅ |
| onErrorComplete | .on_error_complete() | ✅ |
| Supervision deciders | Supervision::stopping_decider() / resuming_decider() / restarting_decider() | ✅ |
| Supervised callbacks | .map_result_with_supervision(f, decider) (and equivalents for filter_map, fold, scan, reduce, foreach) | ✅ fallible callbacks use *_result variants |
| RestartSource.withBackoff | RestartSource::with_backoff(settings, factory) | ✅ |
| RestartSource.onFailuresWithBackoff | RestartSource::on_failures_with_backoff(settings, factory) | ✅ |
| RestartFlow.withBackoff | RestartFlow::with_backoff(settings, factory) | ✅ |
| RestartFlow.onFailuresWithBackoff | RestartFlow::on_failures_with_backoff(settings, factory) | ✅ |
| RestartSink.withBackoff | RestartSink::with_backoff(settings, factory) | ✅ wraps sinks that materialize StreamCompletion<NotUsed> |
| RetryFlow.withBackoff | RetryFlow::with_backoff(min, max, factor, f) | ✅ |
| RetryFlow.withBackoffAndContext | RetryFlow::with_backoff_and_context(...) | ✅ |

IO & compression operators

See Streaming IO guide.

| Akka operator | Datum operator | Notes |
|---|---|---|
| gzip / gunzip | Compression::gzip() / Compression::gunzip() | ✅ via flate2 |
| deflate / inflate | Compression::deflate() / Compression::inflate() | ✅ |
| fromPath / toPath | FileIO::from_path(path) / FileIO::to_path(path) | ✅ sync std::fs |
| Tokio async file IO | TokioFileIO::from_path(path) / TokioFileIO::to_path(path) | ✅ Tokio fs |
| fromInputStream (std) | StreamConverters::from_reader(reader) | ✅ |
| fromOutputStream (std) | StreamConverters::to_writer(writer) | ✅ |
| TCP | TokioByteSink / TokioByteSource / TokioTcp | ✅ plain TCP; TLS out of scope |
| Framing | Framing::delimiter(...) / Framing::length_field(...) | ✅ |
| JSON framing | Framing::json(...) | ✅ |
| asInputStream / asOutputStream handle | - | ⏭️ deferred: stream-as-Read/Write adapters post-v0.2.0 |
| JVM javaCollector* / fromJavaStream | - | ❌ JVM-only |
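
Delimiter framing turns an arbitrary byte stream into discrete frames by cutting at a separator and rejecting frames that grow past a limit. The arguments of `Framing::delimiter(...)` are elided above, so the parameters in this sketch (`delimiter`, `max_frame_len`) are assumptions modeled on Akka's `Framing.delimiter`, and the function itself is a std-only illustration that works on a single in-memory chunk.

```rust
/// Splits `input` at every occurrence of `delimiter`.
/// Returns the complete frames plus the trailing bytes that have not yet
/// been terminated; a streaming framer would carry those into the next chunk.
/// Fails if a complete frame is longer than `max_frame_len`.
fn split_frames(
    input: &[u8],
    delimiter: &[u8],
    max_frame_len: usize,
) -> Result<(Vec<Vec<u8>>, Vec<u8>), String> {
    assert!(!delimiter.is_empty(), "delimiter must not be empty");
    let mut frames = Vec::new();
    let mut start = 0; // first byte of the frame being assembled
    let mut i = 0; // current scan position
    while i + delimiter.len() <= input.len() {
        if &input[i..i + delimiter.len()] == delimiter {
            let len = i - start;
            if len > max_frame_len {
                return Err(format!("frame of {len} bytes exceeds limit {max_frame_len}"));
            }
            frames.push(input[start..i].to_vec());
            i += delimiter.len();
            start = i;
        } else {
            i += 1;
        }
    }
    Ok((frames, input[start..].to_vec()))
}
```

Calling `split_frames(b"ab\ncd\nef", b"\n", 16)` returns the two complete frames and leaves `ef` as the unterminated remainder.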

Dynamic stream operators

See Dynamic Streams guide.

| Akka concept | Datum operator | Notes |
|---|---|---|
| KillSwitches.single | KillSwitches::single() | ✅ UniqueKillSwitch |
| KillSwitches.shared | KillSwitches::shared(name) | ✅ SharedKillSwitch |
| MergeHub | MergeHub::source(buffer_size) | ✅ |
| BroadcastHub | BroadcastHub::sink(buffer_size) | ✅ |
| PartitionHub | PartitionHub::sink(router, start_after, buffer_size) | ✅ |

Context propagation operators

| Akka operator | Datum operator | Notes |
|---|---|---|
| asSourceWithContext | Source::as_source_with_context(extract) | ✅ |
| asFlowWithContext | Flow::as_flow_with_context(collapse, extract) | ✅ |
| FlowWithContext | FlowWithContext<In, CtxIn, Out, CtxOut, Mat> | ✅ |
| SourceWithContext | SourceWithContext<Out, Ctx, Mat> | ✅ |

Testing operators

See Testing Streams guide.

| Akka testkit concept | Datum equivalent | Notes |
|---|---|---|
| TestSource.probe | datum::testkit::TestSource | ✅ manual demand + emit/complete/fail |
| TestSink.probe | datum::testkit::TestSink | ✅ request / expect_next / expect_complete / expect_error |

Graph stage API

See Working with Graphs guide.

| Akka concept | Datum equivalent | Notes |
|---|---|---|
| GraphStage | GraphStage<S> trait | ✅ |
| GraphStageLogic | GraphStageLogic | ✅ pull / grab / push / complete / fail |
| InHandler / OutHandler | InHandler / OutHandler | ✅ |
| Async callbacks (getAsyncCallback) | via GraphStageLogic side-channel | ✅ |
| Timers in stages | via GraphStageLogic timer API | ✅ |
| AsyncBoundary | AsyncBoundary / .async() hint | ✅ AsyncBoundary::new() |
| Graph cycles | - | ⏭️ deferred (WP-16 stretch): GraphBuilder rejects cycles at validation time; requires a queued interpreter |
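
Rejecting cycles at validation time amounts to checking that the stage graph is a DAG before it is materialized. How `GraphBuilder` performs this check is not described here; the sketch below shows one common technique, Kahn's topological sort, over a hypothetical representation in which stages are numbered `0..node_count` and each edge is a `(from, to)` pair.

```rust
use std::collections::VecDeque;

/// Returns true if the directed graph contains a cycle.
/// Kahn's algorithm: repeatedly remove nodes with no incoming edges;
/// any node that can never be removed sits on (or behind) a cycle.
fn has_cycle(node_count: usize, edges: &[(usize, usize)]) -> bool {
    let mut indegree = vec![0usize; node_count];
    let mut adjacency: Vec<Vec<usize>> = vec![Vec::new(); node_count];
    for &(from, to) in edges {
        adjacency[from].push(to);
        indegree[to] += 1;
    }

    // Start from every stage that has no upstream connection.
    let mut ready: VecDeque<usize> =
        (0..node_count).filter(|&n| indegree[n] == 0).collect();
    let mut visited = 0;
    while let Some(node) = ready.pop_front() {
        visited += 1;
        for &next in &adjacency[node] {
            indegree[next] -= 1;
            if indegree[next] == 0 {
                ready.push_back(next);
            }
        }
    }
    visited != node_count
}
```

A linear pipeline such as `0 -> 1 -> 2` passes the check, while adding a feedback edge `2 -> 1` makes `has_cycle` return true.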

Deferred and out-of-scope summary

| Category | Status |
|---|---|
| Reactive Streams interop (asPublisher, asSubscriber, fromPublisher, fromSubscriber) | ❌ out of scope: Rust has no equivalent ubiquitous standard; use Tokio futures |
| JVM-only operators (fromJavaStream, asJavaStream, javaCollector*, CompletionStage*) | ❌ JVM-only: capability covered by Source::from_iterable and Source::future |
| StreamRefs (distributed stream references) | ⏭️ WP-15 (stretch): requires ractor_cluster; no concrete use case yet |
| Graph cycles + liveness | ⏭️ WP-16 (stretch): requires queued interpreter with per-edge demand tracking |
| asInputStream / asOutputStream sink/source adapters | ⏭️ post-v0.2.0: forward direction (from_reader/to_writer) is shipped |
| TLS, UDP | ⏭️ out of v0.2.0 scope |