Futures Interop
Datum is Tokio-first: tokio is a hard dependency. Ractor (the actor runtime) runs on Tokio, and all async dispatch uses the Tokio runtime. Acceptance of std::future::Future is an optional portability surface, not the default model.
Futures driven by map_async, map_async_unordered, and related operators must not block inside poll. They are polled by the Tokio runtime; blocking a Tokio worker thread starves other tasks.
map_async
map_async(parallelism, f) applies an async transformation to each element. Up to parallelism futures run concurrently. Output order matches input order regardless of future completion order.
```rust
use datum::{Sink, Source};
// map_async(parallelism, f) applies an async transformation and preserves
// element order in the output regardless of future completion order.
let items: Vec<u64> = Source::from_iter(1_u64..=5)
    .map_async(2, |x| async move { Ok(x * 2) })
    .run_with(Sink::collect())
    .unwrap()
    .wait()
    .unwrap();
assert_eq!(items, vec![2, 4, 6, 8, 10]); // input order is preserved
```

The parallelism argument must be greater than zero. Each future receives an owned copy of the element and returns StreamResult<Next>. A future returning Err fails the stream unless a supervision decider is attached via map_async_with_supervision.
Datum polls each future once on the drain thread. A future that is already complete on that first poll (it returns Poll::Ready) finishes without crossing to a Tokio thread; a future that returns Poll::Pending is moved onto the Tokio runtime.
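One way to picture the ordering guarantee is a reorder buffer: a result that finishes early is held until every earlier element has been emitted. The sketch below uses only the standard library and is not Datum's implementation; ReorderBuffer and its complete method are hypothetical names, and completed futures are stood in for by (sequence, value) pairs that arrive in arbitrary order.

```rust
use std::collections::BTreeMap;

/// Hypothetical helper (not Datum API): re-sequences results that complete
/// out of order so they are released strictly in input order.
pub struct ReorderBuffer<T> {
    next_seq: u64,             // sequence number the consumer expects next
    pending: BTreeMap<u64, T>, // results that finished ahead of their turn
}

impl<T> ReorderBuffer<T> {
    pub fn new() -> Self {
        Self { next_seq: 0, pending: BTreeMap::new() }
    }

    /// Records one completed result and returns every result that can now
    /// be emitted in input order (possibly none).
    pub fn complete(&mut self, seq: u64, value: T) -> Vec<T> {
        self.pending.insert(seq, value);
        let mut ready = Vec::new();
        // Drain the contiguous run starting at the next expected sequence number.
        while let Some(v) = self.pending.remove(&self.next_seq) {
            ready.push(v);
            self.next_seq += 1;
        }
        ready
    }
}
```

If the futures for elements 2, 0 and 1 finish in that order, the first call releases nothing, the second releases element 0, and the third releases elements 1 and 2 together.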
map_async_unordered
map_async_unordered(parallelism, f) runs the same concurrent transformation but emits results in completion order — not input order. Use it when downstream processing is independent of element sequence and lower latency per element matters more than stable ordering.
```rust
use datum::{Sink, Source};
// map_async_unordered(parallelism, f) applies an async transformation and
// emits results in completion order, not necessarily input order.
// Sort before asserting when order does not matter.
let mut items: Vec<u64> = Source::from_iter(1_u64..=5)
    .map_async_unordered(2, |x| async move { Ok(x * 2) })
    .run_with(Sink::collect())
    .unwrap()
    .wait()
    .unwrap();
items.sort_unstable();
assert_eq!(items, vec![2, 4, 6, 8, 10]);
```

Sort the output before asserting equality when order is non-deterministic.
map_async_unordered_with_supervision is the supervised variant.
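For contrast, the sketch below emits in completion order using a bounded worker pool. It uses standard-library OS threads in place of Tokio tasks, and map_unordered is a hypothetical name, not Datum API. The point is only that each result is pushed to the output channel as soon as it finishes, so output order depends on scheduling, which is why the check sorts before comparing.

```rust
use std::collections::VecDeque;
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical sketch (not Datum API): applies `f` with at most `parallelism`
/// workers and returns results in completion order, not input order.
pub fn map_unordered<T, U, F>(items: Vec<T>, parallelism: usize, f: F) -> Vec<U>
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> U + Send + Sync + 'static,
{
    assert!(parallelism > 0, "parallelism must be greater than zero");
    let queue = Arc::new(Mutex::new(VecDeque::from(items)));
    let f = Arc::new(f);
    let (tx, rx) = mpsc::channel();

    let workers: Vec<_> = (0..parallelism)
        .map(|_| {
            let queue = Arc::clone(&queue);
            let f = Arc::clone(&f);
            let tx = tx.clone();
            thread::spawn(move || loop {
                // Take the next input; the lock is released before `f` runs.
                let next = queue.lock().unwrap().pop_front();
                match next {
                    Some(item) => tx.send(f(item)).unwrap(),
                    None => break,
                }
            })
        })
        .collect();
    drop(tx); // the receiver ends once every worker's sender is dropped

    // Results arrive in whatever order the workers finish them.
    let results: Vec<U> = rx.iter().collect();
    for w in workers {
        w.join().unwrap();
    }
    results
}
```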
map_async_partitioned
map_async_partitioned(parallelism, per_partition, partition_fn, f) provides key-based ordering: elements with the same key are processed in order relative to each other; elements with different keys proceed independently up to the concurrency limits.
```rust
use datum::{Sink, Source};
// map_async_partitioned(parallelism, per_partition, partition_fn, f) gives
// key-based ordering: elements with the same key are processed in order;
// different keys proceed independently.
let mut items: Vec<String> = Source::from_iter(0_u64..6)
    .map_async_partitioned(
        4,         // total concurrent futures
        2,         // max concurrent per partition key
        |x| x % 3, // partition key: 0, 1, or 2
        |x| async move { Ok(format!("item-{x}")) },
    )
    .run_with(Sink::collect())
    .unwrap()
    .wait()
    .unwrap();
items.sort(); // cross-partition order is non-deterministic; sort before asserting
assert_eq!(items, ["item-0", "item-1", "item-2", "item-3", "item-4", "item-5"]);
```

Parameters:
- parallelism: maximum total concurrent futures across all partitions.
- per_partition: maximum concurrent futures for a single partition key.
- partition_fn: Fn(&Out) -> Key; the key must be Clone + Eq + Hash + Send + 'static.
- f: Fn(Out) -> Fut, the async transformation.
Elements within the same partition emerge in input order. Elements from different partitions may interleave freely. This is useful for per-user or per-key async enrichment where intra-key ordering must be preserved.
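A simplified sketch of key-based ordering, assuming a per-partition concurrency of 1 and no global parallelism cap: inputs are grouped by key, each group is processed sequentially on its own thread, and results from all groups are merged as they arrive. map_partitioned is a hypothetical name, not Datum API; Datum's operator additionally allows up to per_partition futures per key while still emitting each key's results in input order.

```rust
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::mpsc;
use std::thread;

/// Hypothetical sketch (not Datum API), per-partition concurrency fixed at 1:
/// each key gets its own worker that handles that key's elements in input
/// order, while different keys run in parallel and interleave in the output.
pub fn map_partitioned<T, K, U, P, F>(items: Vec<T>, partition_fn: P, f: F) -> Vec<(K, U)>
where
    T: Send + 'static,
    K: Clone + Eq + Hash + Send + 'static,
    U: Send + 'static,
    P: Fn(&T) -> K,
    F: Fn(T) -> U + Send + Clone + 'static,
{
    // Group inputs by key; each group keeps the original relative order.
    let mut groups: HashMap<K, Vec<T>> = HashMap::new();
    for item in items {
        groups.entry(partition_fn(&item)).or_default().push(item);
    }

    let (tx, rx) = mpsc::channel();
    let mut workers = Vec::new();
    for (key, group) in groups {
        let tx = tx.clone();
        let f = f.clone();
        workers.push(thread::spawn(move || {
            for item in group {
                // Sequential within the partition, so intra-key order is preserved.
                tx.send((key.clone(), f(item))).unwrap();
            }
        }));
    }
    drop(tx); // the merged stream ends when every partition worker is done

    let merged: Vec<(K, U)> = rx.iter().collect();
    for w in workers {
        w.join().unwrap();
    }
    merged
}
```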
Async sources
Source::future
Source::future(f) evaluates the factory f, runs the returned future, and emits the single resulting element. The factory is called once per materialization.
```rust
use datum::{Sink, Source};
// Source::future(f) evaluates the factory, runs the returned future,
// and emits the single resulting element.
let items: Vec<u64> = Source::<u64, _>::future(|| async { Ok(42_u64) })
    .run_with(Sink::collect())
    .unwrap()
    .wait()
    .unwrap();
assert_eq!(items, vec![42]);
```

Source::lazy_future has equivalent behavior. Both are blueprint-safe: the factory is not called until the source is materialized.
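The blueprint-safety property can be sketched as a struct that stores the factory and calls it only inside materialize, once per materialization. FutureSource, materialize, and the small block_on executor are hypothetical stand-ins written against the standard library (Datum's real sources run on Tokio), and the String error type is an assumption standing in for StreamResult.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Minimal std-only executor (hypothetical helper, not Datum API).
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark(); // resume the thread parked in block_on
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

/// Hypothetical blueprint: holds the factory without calling it.
struct FutureSource<F> {
    factory: F,
}

impl<F> FutureSource<F> {
    fn new(factory: F) -> Self {
        // Building the blueprint does not invoke the factory.
        Self { factory }
    }

    /// Each materialization calls the factory once, runs the fresh future,
    /// and emits its single element.
    fn materialize<Fut, T>(&self) -> Result<Vec<T>, String>
    where
        F: Fn() -> Fut,
        Fut: Future<Output = Result<T, String>>,
    {
        let value = block_on((self.factory)())?;
        Ok(vec![value])
    }
}
```

Constructing the value leaves any side effect in the factory untouched; each call to materialize runs the factory and its future afresh.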
Source::future_source
Source::future_source(f) runs a future that itself returns a Source<Out>. Elements of the inner source are emitted in sequence after the future resolves:
```rust
use datum::{Sink, Source};
// Source::future_source(f) runs a future that returns a Source<Out>.
// Elements of the inner source are emitted in sequence after the future resolves.
let items: Vec<u64> = Source::future_source(|| async { Ok(Source::from_iter(1_u64..=3)) })
    .run_with(Sink::collect())
    .unwrap()
    .wait()
    .unwrap();
assert_eq!(items, vec![1, 2, 3]);
```

Source::unfold_async
Source::unfold_async(initial, f) generates a stream by repeatedly calling f with a state value. The future returns Ok(Some((next_state, element))) to emit an element, or Ok(None) to complete:
```rust
use datum::{Sink, Source};
// Source::unfold_async(initial, f) generates elements by calling f with a state.
// Return Ok(Some((next_state, element))) to emit, Ok(None) to complete.
let items: Vec<u64> = Source::unfold_async(3_u64, |n| async move {
    if n == 0 {
        Ok(None)
    } else {
        Ok(Some((n - 1, n)))
    }
})
.run_with(Sink::collect())
.unwrap()
.wait()
.unwrap();
assert_eq!(items, vec![3, 2, 1]);
```

Source::unfold_resource_async
Source::unfold_resource_async(create, read, close) manages an async resource lifecycle:
- create: Fn() -> Fut; opens the resource. The future resolves to StreamResult<Resource>.
- read: Fn(Resource) -> Fut; reads the next element and resolves to StreamResult<Option<(Resource, Out)>>. Return None to signal end-of-resource.
- close: Fn(Resource) -> Fut; releases the resource. Called when the stream completes or fails.
This maps to Akka's Source.unfoldResourceAsync. Use it for async file handles, database cursors, or any resource that requires explicit teardown.
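The create/read/close contract can be sketched as a loop that opens the resource once, reads until None, and runs close on both normal completion and failure. This is a std-only sketch, not Datum's implementation: drain_resource and block_on are hypothetical, errors are plain Strings, and because read takes the resource by value as described above, the sketch additionally assumes R: Clone so that a handle remains available for close when read returns None or Err.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Minimal std-only executor (hypothetical helper, not Datum API).
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark(); // resume the thread parked in block_on
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

/// Hypothetical sketch of the create/read/close lifecycle. Assumes R: Clone
/// so a handle survives for `close` after `read` consumes the resource.
async fn drain_resource<R, T, C, CFut, Rd, RFut, Cl, ClFut>(
    create: C,
    read: Rd,
    close: Cl,
) -> Result<Vec<T>, String>
where
    R: Clone,
    C: Fn() -> CFut,
    CFut: Future<Output = Result<R, String>>,
    Rd: Fn(R) -> RFut,
    RFut: Future<Output = Result<Option<(R, T)>, String>>,
    Cl: Fn(R) -> ClFut,
    ClFut: Future<Output = Result<(), String>>,
{
    // Open the resource once; if this fails there is nothing to close.
    let mut resource = create().await?;
    let mut out = Vec::new();
    loop {
        let handle = resource.clone();
        match read(resource).await {
            Ok(Some((next, elem))) => {
                out.push(elem);
                resource = next;
            }
            Ok(None) => {
                // End of resource: release it, then complete normally.
                close(handle).await?;
                return Ok(out);
            }
            Err(e) => {
                // Failure: still release the resource, then report the read error.
                let _ = close(handle).await;
                return Err(e);
            }
        }
    }
}
```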
Async sinks
Sink::foreach_async
Sink::foreach_async(parallelism, f) drives up to parallelism side-effect futures concurrently. The materialized value is StreamCompletion<NotUsed>; call .wait() to block until all futures complete and retrieve NotUsed.
```rust
use datum::{NotUsed, Sink, Source};
// Sink::foreach_async(parallelism, f) drives up to `parallelism` async
// side-effect futures concurrently. run_with returns StreamCompletion<NotUsed>;
// .wait().unwrap() blocks until all futures complete and yields NotUsed.
let result: NotUsed = Source::from_iter(1_u64..=3)
    .run_with(Sink::foreach_async(2, |_x| async move { Ok(()) }))
    .unwrap()
    .wait()
    .unwrap();
```

Completion order across concurrent futures is not guaranteed. Each future receives an owned element and returns StreamResult<()>. A future returning Err fails the stream.
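The concurrency and failure rules above can be sketched with scoped threads: at most parallelism calls run at once, completion order is unspecified, and the first error is returned after the calls already in flight finish. foreach_bounded is a hypothetical name, not Datum API; threads stand in for Tokio tasks, and synchronous closures stand in for the side-effect futures.

```rust
use std::sync::Mutex;
use std::thread;

/// Hypothetical sketch (not Datum API): runs the side effect `f` on every
/// element with at most `parallelism` calls in flight. Completion order is
/// unspecified; the first error observed fails the whole run.
pub fn foreach_bounded<T, E, F>(items: Vec<T>, parallelism: usize, f: F) -> Result<(), E>
where
    T: Send,
    E: Send,
    F: Fn(T) -> Result<(), E> + Sync,
{
    assert!(parallelism > 0, "parallelism must be greater than zero");
    let queue = Mutex::new(items.into_iter());
    let first_error: Mutex<Option<E>> = Mutex::new(None);

    // The scope joins every worker before returning.
    thread::scope(|s| {
        for _ in 0..parallelism {
            s.spawn(|| loop {
                // Stop pulling new work once any call has failed.
                if first_error.lock().unwrap().is_some() {
                    break;
                }
                let next = queue.lock().unwrap().next();
                let Some(item) = next else { break };
                if let Err(e) = f(item) {
                    // Keep only the first error; later ones are dropped.
                    first_error.lock().unwrap().get_or_insert(e);
                    break;
                }
            });
        }
    });

    match first_error.into_inner().unwrap() {
        Some(e) => Err(e),
        None => Ok(()),
    }
}
```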
Sink::future_sink
Sink::future_sink(f) defers sink construction to a future. The factory f is called at materialization time; the future resolves to a Sink<In, InnerMat> that handles the actual stream:
```rust
use datum::{Sink, Source};
// Sink::future_sink(f) defers sink construction to a future. The factory is
// called at materialization; the future resolves to the actual Sink<In, InnerMat>.
let _completion = Source::from_iter(1_u64..=3)
    .run_with(Sink::future_sink(|| async { Ok(Sink::ignore()) }))
    .unwrap()
    .wait()
    .unwrap();
```

Sink::lazy_future_sink has equivalent behavior.
Dispatching to Tokio
When map_async or map_async_unordered receives an element, it calls the user factory to produce a future, then polls it once on the drain thread. If the future is already complete (Poll::Ready), the result is available immediately and no Tokio handoff occurs. If the future is pending (Poll::Pending), it is moved onto the Tokio runtime's executor via tokio::spawn.
Because futures can cross the drain-to-Tokio thread boundary, they must be Send + 'static. Futures must not block inside poll. A blocking call inside an async future blocks the Tokio worker thread that drives it and degrades throughput for all tasks sharing that runtime.
Use tokio::task::spawn_blocking inside the async closure to offload genuinely blocking work to a dedicated thread pool.
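The poll-once-then-hand-off step can be sketched with the standard task API: box the future, poll it once with a throwaway waker, and either return the value or return the boxed future for a runtime to drive. FirstPoll and poll_once are hypothetical names, not Datum API. In the flow described above, the Handoff case is where tokio::spawn would take over; the Send + 'static bound on the future mirrors what tokio::spawn requires (tokio::spawn also requires the output to be Send + 'static).

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough for a single probing poll.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Outcome of the first poll on the calling thread (hypothetical type).
pub enum FirstPoll<T> {
    /// Already complete: no runtime handoff is needed.
    Ready(T),
    /// Still pending: a runtime (e.g. via tokio::spawn) must drive it further.
    Handoff(Pin<Box<dyn Future<Output = T> + Send + 'static>>),
}

pub fn poll_once<F, T>(fut: F) -> FirstPoll<T>
where
    F: Future<Output = T> + Send + 'static,
{
    // Boxing first means the future never moves after it has been polled.
    let mut boxed: Pin<Box<dyn Future<Output = T> + Send + 'static>> = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let first = boxed.as_mut().poll(&mut cx);
    match first {
        Poll::Ready(v) => FirstPoll::Ready(v),
        Poll::Pending => FirstPoll::Handoff(boxed),
    }
}
```

Because the probe uses a no-op waker, any wake-up registered during that first poll is discarded. That is harmless only if the runtime polls the handed-off future again with its own waker, as a runtime does when it first runs a newly spawned task.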
std-future compatibility
Datum's async operators accept std::future::Future directly — the same trait used by Tokio, async-std, and the Rust standard library. There is no adapter layer. tokio::spawn, tokio::time::sleep, and any other Tokio utilities work inside async closures without extra wrapping:
```rust
use datum::{Sink, Source};
use std::time::Duration;
// Tokio utilities like tokio::time::sleep work inside async closures without
// extra wrapping; Datum dispatches pending futures to the Tokio runtime.
let items: Vec<u64> = Source::from_iter(1_u64..=3)
    .map_async(2, |x| async move {
        tokio::time::sleep(Duration::from_micros(1)).await;
        Ok(x * 10)
    })
    .run_with(Sink::collect())
    .unwrap()
    .wait()
    .unwrap();
assert_eq!(items, vec![10, 20, 30]); // map_async preserves input order
```

std-future acceptance is an optional portability surface for integrating with non-Tokio async ecosystems. The default execution path is Tokio.
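To make the shared contract concrete, the sketch below implements std::future::Future by hand and drives it with a minimal std-only executor instead of Tokio. Countdown and block_on are hypothetical illustrations, not Datum types: the future returns Pending a fixed number of times, asks to be woken through the Context's waker rather than blocking, and then completes with the number of polls it received.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Hand-written future with no runtime dependency (hypothetical example).
pub struct Countdown {
    pub remaining: u32, // how many more times to return Pending
    pub polls: u32,     // how many times poll has been called
}

impl Future for Countdown {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        self.polls += 1;
        if self.remaining == 0 {
            Poll::Ready(self.polls)
        } else {
            self.remaining -= 1;
            // Request another poll instead of blocking inside poll.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

// Minimal std-only executor: parks the thread until the waker unparks it.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}
```

block_on(Countdown { remaining: 3, polls: 0 }) completes on the fourth poll and returns 4.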
Next steps
- Actors Interop: ActorFlow::ask and Ractor integration
- Execution Model: drain thread, Tokio dispatch, spin-then-park