Futures Interop

Datum is Tokio-first. The tokio crate is a hard dependency: Ractor (the actor runtime) runs on Tokio, and all async dispatch uses the Tokio runtime. Accepting std::future::Future is an optional portability surface, not the default model.

Futures driven by map_async, map_async_unordered, and related operators must not block inside poll. They are polled by the Tokio runtime; blocking a Tokio worker thread starves other tasks.

map_async

map_async(parallelism, f) applies an async transformation to each element. Up to parallelism futures run concurrently. Output order matches input order regardless of future completion order.

rust
use datum::{Sink, Source};

// map_async(parallelism, f) applies an async transformation and preserves
// element order in the output regardless of future completion order.
let items: Vec<u64> = Source::from_iter(1_u64..=5)
    .map_async(2, |x| async move { Ok(x * 2) })
    .run_with(Sink::collect())
    .unwrap()
    .wait()
    .unwrap();

The parallelism argument must be greater than zero. Each future receives its element by value and returns StreamResult<Next>. A future returning Err fails the stream unless a supervision decider is attached via map_async_with_supervision.

Datum polls each future once on the drain thread. A future that is already complete returns Poll::Ready on that first poll and never crosses the Tokio thread boundary. A future that returns Poll::Pending is moved onto the Tokio runtime.

map_async_unordered

map_async_unordered(parallelism, f) runs the same concurrent transformation but emits results in completion order — not input order. Use it when downstream processing is independent of element sequence and lower latency per element matters more than stable ordering.

rust
use datum::{Sink, Source};

// map_async_unordered(parallelism, f) applies an async transformation and
// emits results in completion order — not necessarily input order.
// Sort before asserting, because completion order is non-deterministic.
let mut items: Vec<u64> = Source::from_iter(1_u64..=5)
    .map_async_unordered(2, |x| async move { Ok(x * 2) })
    .run_with(Sink::collect())
    .unwrap()
    .wait()
    .unwrap();
items.sort_unstable();

Sort the output before asserting equality when order is non-deterministic.

map_async_unordered_with_supervision is the supervised variant.
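
The difference between the two emission policies can be illustrated without Datum at all. The following is a minimal std-only sketch, not Datum's implementation: it assumes each completed future is tagged with its input index, and the names emit_in_input_order and emit_in_completion_order are hypothetical. Ordered emission holds back a result until every earlier index has been emitted; unordered emission forwards results as they arrive.

```rust
use std::collections::BTreeMap;

/// Emits values in input order. `completions` lists (input_index, value)
/// pairs in the order the hypothetical futures finished.
fn emit_in_input_order<T>(completions: Vec<(usize, T)>) -> Vec<T> {
    let mut buffered: BTreeMap<usize, T> = BTreeMap::new();
    let mut next_index = 0;
    let mut out = Vec::with_capacity(completions.len());
    for (index, value) in completions {
        buffered.insert(index, value);
        // Flush every buffered result that is contiguous with what was
        // already emitted.
        while let Some(ready) = buffered.remove(&next_index) {
            out.push(ready);
            next_index += 1;
        }
    }
    out
}

/// Emits values exactly in the order they completed.
fn emit_in_completion_order<T>(completions: Vec<(usize, T)>) -> Vec<T> {
    completions.into_iter().map(|(_, value)| value).collect()
}
```

The ordered policy trades latency for stability: a slow early element delays every later result that has already finished, which is the cost map_async_unordered avoids.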

map_async_partitioned

map_async_partitioned(parallelism, per_partition, partition_fn, f) provides key-based ordering: elements with the same key are emitted in input order relative to each other, while elements with different keys proceed independently up to the concurrency limits.

rust
use datum::{Sink, Source};

// map_async_partitioned(parallelism, per_partition, partition_fn, f) gives
// key-based ordering: elements with the same key are emitted in input order;
// different keys proceed independently.
let mut items: Vec<String> = Source::from_iter(0_u64..6)
    .map_async_partitioned(
        4,         // total concurrent futures
        2,         // max concurrent per partition key
        |x| x % 3, // partition key: 0, 1, or 2
        |x| async move { Ok(format!("item-{x}")) },
    )
    .run_with(Sink::collect())
    .unwrap()
    .wait()
    .unwrap();
items.sort(); // cross-partition order is non-deterministic; sort before asserting

Parameters:

  • parallelism: maximum total concurrent futures across all partitions.
  • per_partition: maximum concurrent futures for a single partition key.
  • partition_fn: Fn(&Out) -> Key — the key must be Clone + Eq + Hash + Send + 'static.
  • f: Fn(Out) -> Fut — the async transformation.

Elements within the same partition emerge in input order. Elements from different partitions may interleave freely. This is useful for per-user or per-key async enrichment where intra-key ordering must be preserved.
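
Sorting the output, as the example above does, discards the ordering guarantee it is meant to exercise. A test can instead check that each key's elements keep their relative input order. The helper below is a hypothetical std-only sketch (preserves_per_key_order and group_by_key are not part of Datum); it groups both sequences by key and compares the groups, so it also requires that the output contains the same elements as the input.

```rust
use std::collections::HashMap;
use std::hash::Hash;

/// Groups items by key, keeping each group's items in encounter order.
fn group_by_key<T, K>(items: &[T], key_fn: impl Fn(&T) -> K) -> HashMap<K, Vec<T>>
where
    T: Clone,
    K: Eq + Hash,
{
    let mut groups: HashMap<K, Vec<T>> = HashMap::new();
    for item in items {
        groups.entry(key_fn(item)).or_default().push(item.clone());
    }
    groups
}

/// Returns true when, for every key, `output` holds the same elements as
/// `input` in the same relative order. Order across keys is ignored.
fn preserves_per_key_order<T, K, F>(input: &[T], output: &[T], key_fn: F) -> bool
where
    T: Clone + PartialEq,
    K: Eq + Hash,
    F: Fn(&T) -> K,
{
    group_by_key(input, &key_fn) == group_by_key(output, &key_fn)
}
```

With the partition function from the example, `preserves_per_key_order(&input, &output, |x| x % 3)` accepts any interleaving across keys 0, 1, and 2 but rejects an output in which, say, 3 is emitted before 0.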

Async sources

Source::future

Source::future(f) evaluates the factory f, runs the returned future, and emits the single resulting element. The factory is called once per materialization.

rust
use datum::{Sink, Source};

// Source::future(f) evaluates the factory, runs the returned future,
// and emits the single resulting element.
let items: Vec<u64> = Source::<u64, _>::future(|| async { Ok(42_u64) })
    .run_with(Sink::collect())
    .unwrap()
    .wait()
    .unwrap();

Source::lazy_future has equivalent behavior. Both are blueprint-safe: the factory is not called until the source is materialized.

Source::future_source

Source::future_source(f) runs a future that itself returns a Source<Out>. Elements of the inner source are emitted in sequence after the future resolves:

rust
use datum::{Sink, Source};

// Source::future_source(f) runs a future that returns a Source<Out>.
// Elements of the inner source are emitted in sequence after the future resolves.
let items: Vec<u64> = Source::future_source(|| async { Ok(Source::from_iter(1_u64..=3)) })
    .run_with(Sink::collect())
    .unwrap()
    .wait()
    .unwrap();

Source::unfold_async

Source::unfold_async(initial, f) generates a stream by repeatedly calling f with a state value. The future returns Ok(Some((next_state, element))) to emit an element, or Ok(None) to complete:

rust
use datum::{Sink, Source};

// Source::unfold_async(initial, f) generates elements by calling f with a state.
// Return Ok(Some((next_state, element))) to emit, Ok(None) to complete.
let items: Vec<u64> = Source::unfold_async(3_u64, |n| async move {
    if n == 0 {
        Ok(None)
    } else {
        Ok(Some((n - 1, n)))
    }
})
.run_with(Sink::collect())
.unwrap()
.wait()
.unwrap();
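
To make the state threading concrete, here is a synchronous analogue written against the standard library only. It is a sketch of the unfold pattern, not of Datum's internals: the function name unfold is hypothetical, and plain Option stands in for the Ok(Some(..)) / Ok(None) results described above.

```rust
/// Synchronous analogue of an unfold: `step` maps the current state to
/// `Some((next_state, element))` to emit an element, or `None` to finish.
fn unfold<S, T>(initial: S, mut step: impl FnMut(S) -> Option<(S, T)>) -> Vec<T> {
    let mut state = Some(initial);
    let mut out = Vec::new();
    // `take` leaves `None` behind, so the loop ends unless `step`
    // supplies a next state.
    while let Some(current) = state.take() {
        if let Some((next, element)) = step(current) {
            out.push(element);
            state = Some(next);
        }
    }
    out
}
```

Applied to the countdown closure from the example, starting at 3, this yields 3, 2, 1: the element is emitted before the state is decremented, and reaching state 0 ends the sequence without emitting anything.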

Source::unfold_resource_async

Source::unfold_resource_async(create, read, close) manages an async resource lifecycle:

  • create: Fn() -> Fut — opens the resource; the future resolves to StreamResult<Resource>.
  • read: Fn(Resource) -> Fut — reads the next element; resolves to StreamResult<Option<(Resource, Out)>>. Resolve to Ok(None) to signal end-of-resource.
  • close: Fn(Resource) -> Fut — releases the resource; called when the stream completes or fails.

This maps to Akka's Source.unfoldResourceAsync. Use it for async file handles, database cursors, or any resource that requires explicit teardown.
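
The lifecycle can be sketched synchronously with the standard library. This is an illustration under stated assumptions, not Datum's API: unfold_resource is a hypothetical name, errors are plain String values rather than Datum's error type, and read borrows the resource (&mut R) instead of taking and returning it, which differs from the signatures listed above but keeps the sketch short. The point it shows is that close runs whether reading ends normally or fails.

```rust
/// Synchronous sketch of a create / read / close resource lifecycle.
/// Assumption: `String` errors and a borrowing `read`; neither is Datum's API.
fn unfold_resource<R, T>(
    create: impl FnOnce() -> Result<R, String>,
    mut read: impl FnMut(&mut R) -> Result<Option<T>, String>,
    close: impl FnOnce(R) -> Result<(), String>,
) -> Result<Vec<T>, String> {
    let mut resource = create()?;
    let mut out = Vec::new();
    let outcome = loop {
        match read(&mut resource) {
            Ok(Some(element)) => out.push(element),
            Ok(None) => break Ok(()),          // end of resource
            Err(error) => break Err(error),    // read failure
        }
    };
    // Close runs on both completion and failure. A read error takes
    // precedence over a close error in the returned result.
    let closed = close(resource);
    outcome.and(closed).map(|()| out)
}
```

If create itself fails, there is no resource to release, so close is not called in this sketch.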

Async sinks

Sink::foreach_async

Sink::foreach_async(parallelism, f) drives up to parallelism side-effect futures concurrently. The materialized value is StreamCompletion<NotUsed>; call .wait() to block until all futures complete and retrieve NotUsed.

rust
use datum::{NotUsed, Sink, Source};

// Sink::foreach_async(parallelism, f) drives up to `parallelism` async
// side-effect futures concurrently. run_with returns StreamCompletion<NotUsed>;
// .wait().unwrap() blocks until all futures complete and yields NotUsed.
let result: NotUsed = Source::from_iter(1_u64..=3)
    .run_with(Sink::foreach_async(2, |_x| async move { Ok(()) }))
    .unwrap()
    .wait()
    .unwrap();

Completion order across concurrent futures is not guaranteed. Each future receives an owned element and returns StreamResult<()>. A future returning Err fails the stream.

Sink::future_sink

Sink::future_sink(f) defers sink construction to a future. The factory f is called at materialization time; the future resolves to a Sink<In, InnerMat> that handles the actual stream:

rust
use datum::{Sink, Source};

// Sink::future_sink(f) defers sink construction to a future. The factory is
// called at materialization; the future resolves to the actual Sink<In, InnerMat>.
let _completion = Source::from_iter(1_u64..=3)
    .run_with(Sink::future_sink(|| async { Ok(Sink::ignore()) }))
    .unwrap()
    .wait()
    .unwrap();

Sink::lazy_future_sink has equivalent behavior.

Dispatching to Tokio

When map_async or map_async_unordered receives an element, it calls the user factory to produce a future, then polls it once on the drain thread. If the future is already complete (Poll::Ready), the result is available immediately and no Tokio handoff occurs. If the future is pending (Poll::Pending), it is moved onto the Tokio runtime's executor via tokio::spawn.
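
The single speculative poll can be reproduced with the standard library alone. The sketch below is not Datum's code: poll_once and NoopWake are hypothetical names, and a no-op waker is assumed to be sufficient because the future is polled only once before being handed elsewhere. A ready future yields its value immediately; a pending one is returned, pinned and boxed, for a runtime to take over.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough for one speculative poll.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Polls `future` exactly once. Returns `Ok(value)` if it was already
/// complete, or hands the still-pending future back in `Err`.
fn poll_once<F: Future>(future: F) -> Result<F::Output, Pin<Box<F>>> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut boxed = Box::pin(future);
    match boxed.as_mut().poll(&mut cx) {
        Poll::Ready(value) => Ok(value),
        // A runtime would take ownership here and poll with its own waker.
        Poll::Pending => Err(boxed),
    }
}
```

An async block with no await points, such as `async { 42_u64 }`, completes on the first poll and takes the `Ok` path; `std::future::pending::<u64>()` never completes and takes the `Err` path.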

Because futures can cross the drain-to-Tokio thread boundary, they must be Send + 'static. Futures must not block inside poll. A blocking call inside an async future blocks the Tokio worker thread that drives it and degrades throughput for all tasks sharing that runtime.

Use tokio::task::spawn_blocking inside the async closure to offload genuinely blocking work to a dedicated thread pool.

std-future compatibility

Datum's async operators accept std::future::Future directly — the same trait used by Tokio, async-std, and the Rust standard library. There is no adapter layer. tokio::spawn, tokio::time::sleep, and any other Tokio utilities work inside async closures without extra wrapping:

rust
use datum::{Sink, Source};
use std::time::Duration;

// Tokio utilities like tokio::time::sleep work inside async closures without
// extra wrapping — Datum dispatches pending futures to the Tokio runtime.
let items: Vec<u64> = Source::from_iter(1_u64..=3)
    .map_async(2, |x| async move {
        tokio::time::sleep(Duration::from_micros(1)).await;
        Ok(x * 10)
    })
    .run_with(Sink::collect())
    .unwrap()
    .wait()
    .unwrap();

std-future acceptance is an optional portability surface for integrating with non-Tokio async ecosystems. The default execution path is Tokio.

Next steps