
Actors Interop

Datum integrates with Ractor actors through four bridge APIs:

  • ActorSource — an actor-backed source that lets external code push elements into a stream.
  • ActorSink — routes stream elements to an actor's mailbox.
  • ActorFlow::ask — transforms each stream element by sending it to an actor and waiting for a reply.
  • ActorPubSub — named broadcast groups backed by Ractor's pg process groups.

Actor spawning is asynchronous because Ractor runs on Tokio. ActorSource spawns its actor internally, so you can send messages as soon as the stream is materialized. ActorSink and ActorFlow::ask require an already-running ActorRef, so spawn the target actor before materializing the stream.

ActorSource

ActorSource::actor_ref::<T>() builds a source whose materialized value is an ActorRef<ActorSourceMessage<T>>. The source creates and owns its own actor — no external spawning is needed. Send ActorSourceMessage::Element(x) to emit elements, ActorSourceMessage::Complete to complete normally, or ActorSourceMessage::Fail(reason) to fail the stream.

rust
use datum::{ActorSource, ActorSourceMessage, Keep, Sink};

// ActorSource::actor_ref() materializes an ActorRef that feeds the source.
// No external actor spawning is needed: the source creates and owns its actor.
let (actor_ref, completion) = ActorSource::actor_ref::<u64>()
    .to_mat(Sink::collect(), Keep::both)
    .run()
    .unwrap();

actor_ref.cast(ActorSourceMessage::Element(10)).unwrap();
actor_ref.cast(ActorSourceMessage::Element(20)).unwrap();
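actor_ref.cast(ActorSourceMessage::Complete).unwrap();

// Complete ends the stream, so the collected result can now be awaited.
let collected = completion.wait().unwrap();
assert_eq!(collected, vec![10, 20]);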

The source's internal actor mailbox is unbounded, so a fast sender can enqueue messages faster than downstream consumes them and the backlog grows without limit. Use ActorSource::actor_ref_with_backpressure when the producer may outpace demand: it sends an ack to a supplied actor after each element is consumed, which lets the producer wait for that ack before sending the next element.
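The ack protocol can be modeled with plain standard-library channels. This is a sketch of the idea only: `SourceMsg` and `run_with_acks` are hypothetical names, the "source" is a std thread rather than a Ractor actor, and nothing here is Datum's implementation. The producer sends one element and then blocks on the ack channel, so at most one element is ever queued no matter how fast the producer loop could run.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for the element/complete protocol (not a Datum type).
enum SourceMsg {
    Element(u64),
    Complete,
}

/// Sends `items` to a slow consumer, waiting for an ack after each element.
/// Returns what the consumer received, in order.
fn run_with_acks(items: Vec<u64>) -> Vec<u64> {
    // Unbounded channel, like the source's mailbox; acks are what bound it.
    let (msg_tx, msg_rx) = mpsc::channel::<SourceMsg>();
    let (ack_tx, ack_rx) = mpsc::channel::<()>();

    // Consumer: a deliberately slow downstream that acks after each element.
    let consumer = thread::spawn(move || {
        let mut seen = Vec::new();
        while let Ok(msg) = msg_rx.recv() {
            match msg {
                SourceMsg::Element(x) => {
                    thread::sleep(Duration::from_millis(1)); // simulate slow work
                    seen.push(x);
                    let _ = ack_tx.send(()); // element consumed: ack it
                }
                SourceMsg::Complete => break,
            }
        }
        seen
    });

    for x in items {
        msg_tx.send(SourceMsg::Element(x)).expect("consumer alive");
        // Block until the previous element is consumed before sending the next.
        ack_rx.recv().expect("consumer acks");
    }
    msg_tx.send(SourceMsg::Complete).expect("consumer alive");
    consumer.join().expect("consumer thread")
}
```

Removing the `ack_rx.recv()` line turns this back into the unbounded case: the loop finishes immediately and every element waits in the channel.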

ActorSource::typed is an alias for ActorSource::actor_ref with the same protocol.

ActorSink

ActorSink::typed(actor_ref) routes each stream element to actor_ref as an ActorSinkMessage::Element(x), followed by ActorSinkMessage::Complete when the stream ends (or ActorSinkMessage::Fail(error) on failure). The receiving actor must accept ActorSinkMessage<In>.

rust
use datum::actor::{Actor, ActorProcessingErr, ActorRef};
use datum::{ActorSink, ActorSinkMessage, Source};
use std::sync::{Arc, Mutex};
use std::time::Duration;

// A minimal actor that collects ActorSinkMessage<u64> into a shared Vec.
struct CollectorActor;

impl Actor for CollectorActor {
    type Msg = ActorSinkMessage<u64>;
    type State = Arc<Mutex<Vec<u64>>>;
    type Arguments = Arc<Mutex<Vec<u64>>>;

    async fn pre_start(
        &self,
        _myself: ActorRef<Self::Msg>,
        args: Self::Arguments,
    ) -> Result<Self::State, ActorProcessingErr> {
        Ok(args)
    }

    async fn handle(
        &self,
        _myself: ActorRef<Self::Msg>,
        message: Self::Msg,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        // Only Element carries data; Complete and Fail are ignored in this demo.
        if let ActorSinkMessage::Element(x) = message {
            state.lock().unwrap().push(x);
        }
        Ok(())
    }
}

let received = Arc::new(Mutex::new(Vec::<u64>::new()));
let rt = tokio::runtime::Builder::new_multi_thread()
    .enable_all()
    .build()
    .unwrap();
let (actor_ref, _handle) = rt.block_on(async {
    Actor::spawn(None, CollectorActor, Arc::clone(&received))
        .await
        .expect("actor spawns")
});

Source::from_iter(1_u64..=3)
    .run_with(ActorSink::typed(actor_ref))
    .unwrap()
    .wait()
    .unwrap();

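// ActorSink only enqueues messages; the actor handles them asynchronously, so
// give it a moment before reading the shared Vec. (A fixed sleep is enough for
// a demo; production code should wait on an explicit completion signal.)
std::thread::sleep(Duration::from_millis(50));
assert_eq!(*received.lock().unwrap(), vec![1, 2, 3]);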

ActorSink::actor_ref is the general form: you supply three closures that build the element, complete, and failure messages from your own enum. This avoids wrapping your existing message type in ActorSinkMessage:

rust
use datum::{ActorSink, Source};

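// MyMsg is your own enum; no ActorSinkMessage wrapper is needed.
// `actor_ref` is assumed to be an ActorRef<MyMsg> for an actor you spawned
// earlier, the same way CollectorActor was spawned above.
enum MyMsg { Item(u64), Done, Failed(String) }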

Source::from_iter(1_u64..=3)
    .run_with(ActorSink::actor_ref(
        actor_ref,
        MyMsg::Item,
        || MyMsg::Done,
        |err| MyMsg::Failed(err.to_string()),
    ))
    .unwrap()
    .wait()
    .unwrap();
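Conceptually, the three closures adapt stream events to your message type: one call per element, then exactly one terminal message. The sketch below models that mapping over a plain iterator of `Result`s. `route_events` and this local copy of `MyMsg` are hypothetical illustrations, not Datum APIs; the assumption that a failure ends delivery follows the Element, then Complete or Fail, protocol described for ActorSink.

```rust
/// Local mirror of the `MyMsg` enum above, with derives added for comparison.
#[derive(Debug, PartialEq)]
enum MyMsg {
    Item(u64),
    Done,
    Failed(String),
}

/// Hypothetical model of a sink driven by three user closures: one message per
/// element, then exactly one terminal message (completion or failure).
fn route_events<In, M, E>(
    upstream: impl IntoIterator<Item = Result<In, E>>,
    mut on_element: impl FnMut(In) -> M,
    on_complete: impl FnOnce() -> M,
    on_failure: impl FnOnce(E) -> M,
) -> Vec<M> {
    let mut mailbox = Vec::new();
    for item in upstream {
        match item {
            Ok(x) => mailbox.push(on_element(x)),
            Err(e) => {
                // A failure is terminal: emit the failure message and stop.
                mailbox.push(on_failure(e));
                return mailbox;
            }
        }
    }
    // Upstream ended without error: emit the completion message.
    mailbox.push(on_complete());
    mailbox
}
```

Because enum tuple variants are functions, `MyMsg::Item` and `MyMsg::Failed` can be passed directly as the element and failure closures.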

ActorSink has no backpressure signal from the destination actor: a fast upstream can saturate the actor's mailbox. Use ActorSink::actor_ref_with_backpressure (or the typed equivalent ActorSink::typed_with_backpressure) when the actor processes slowly or the element rate is unbounded — those variants send each element only after receiving an acknowledgement from the actor.
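To see why ack gating bounds the mailbox, the sketch below counts how many elements sit in a slow receiver's queue at once, with and without waiting for an ack after each send. It uses only `std::sync::mpsc` and threads: `max_queue_depth` is a hypothetical helper, and the std channel stands in for an actor mailbox.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;
use std::time::Duration;

/// Sends `items` elements to a slow receiver and returns the largest number of
/// elements that were queued but not yet processed at any point.
fn max_queue_depth(items: u64, ack_gated: bool) -> usize {
    // Elements sent but not yet processed by the receiver.
    let depth = Arc::new(AtomicUsize::new(0));
    let mut max_depth = 0;
    let (tx, rx) = mpsc::channel::<u64>();
    let (ack_tx, ack_rx) = mpsc::channel::<()>();

    let receiver = {
        let depth = Arc::clone(&depth);
        thread::spawn(move || {
            for _element in rx {
                thread::sleep(Duration::from_millis(1)); // a slow actor
                depth.fetch_sub(1, Ordering::SeqCst);
                let _ = ack_tx.send(()); // acknowledge after processing
            }
        })
    };

    for x in 0..items {
        // Count the element as queued before it enters the channel.
        let now = depth.fetch_add(1, Ordering::SeqCst) + 1;
        max_depth = max_depth.max(now);
        tx.send(x).expect("receiver alive");
        if ack_gated {
            // Do not send the next element until this one is processed.
            ack_rx.recv().expect("receiver acks");
        }
    }
    drop(tx); // close the channel so the receiver's loop ends
    receiver.join().expect("receiver thread");
    max_depth
}
```

With `ack_gated` set, the depth never exceeds 1. Without it, the depth typically climbs toward the element count, because the sender finishes long before a receiver that spends 1 ms per element.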

ActorFlow::ask

ActorFlow::ask turns each stream element into a request/reply round-trip to a Ractor actor:

rust
ActorFlow::ask(actor_ref, parallelism, timeout, make_msg)

  • actor_ref: the target actor.
  • parallelism: maximum number of requests in flight simultaneously (must be > 0).
  • timeout: per-request deadline; the stream fails with StreamError::ActorAskTimeout if a reply is not received in time.
  • make_msg: Fn(In, ReplyPort<Out>) -> Msg, which builds the actor message. Your actor must call reply_to.send(value) to deliver the reply.

Output order always matches input order regardless of reply order.
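The ordering guarantee can be illustrated with a bounded window of pending replies that is always drained from the front. In the sketch below, a std thread stands in for the actor and an `mpsc::Sender` stands in for `ReplyPort`; `Request`, `spawn_doubler`, and `ordered_ask` are hypothetical names, not Datum internals. The stand-in actor answers smaller values more slowly, so replies complete out of order, yet the output follows input order because the stage always waits for the oldest outstanding request.

```rust
use std::collections::VecDeque;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// A value plus a one-shot reply channel (standing in for a ReplyPort).
struct Request {
    value: u64,
    reply_to: mpsc::Sender<u64>,
}

/// Stand-in "actor": answers each request on its own thread after a delay
/// that shrinks as the value grows, so later requests can finish first.
fn spawn_doubler() -> mpsc::Sender<Request> {
    let (tx, rx) = mpsc::channel::<Request>();
    thread::spawn(move || {
        for req in rx {
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(10 / req.value.max(1)));
                let _ = req.reply_to.send(req.value * 2);
            });
        }
    });
    tx
}

/// Order-preserving ask with at most `parallelism` requests in flight.
fn ordered_ask(actor: &mpsc::Sender<Request>, inputs: Vec<u64>, parallelism: usize) -> Vec<u64> {
    assert!(parallelism > 0, "parallelism must be > 0");
    let mut pending: VecDeque<mpsc::Receiver<u64>> = VecDeque::new();
    let mut out = Vec::with_capacity(inputs.len());
    for value in inputs {
        if pending.len() == parallelism {
            // Window full: wait for the oldest request, not the fastest one.
            let oldest = pending.pop_front().expect("window is non-empty");
            out.push(oldest.recv().expect("actor replied"));
        }
        let (reply_tx, reply_rx) = mpsc::channel();
        actor.send(Request { value, reply_to: reply_tx }).expect("actor alive");
        pending.push_back(reply_rx);
    }
    // Drain the remaining replies in submission order.
    for rx in pending {
        out.push(rx.recv().expect("actor replied"));
    }
    out
}
```

This simplification waits on the oldest reply only when the window is full; a real stage would also emit the head reply as soon as it arrives.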

rust
use datum::actor::{Actor, ActorProcessingErr, ActorRef};
use datum::{ActorFlow, ReplyPort, Sink, Source};
use std::time::Duration;

// Define a message enum that carries the reply port.
enum DoubleMsg {
    Double {
        value: u64,
        reply_to: ReplyPort<u64>,
    },
}

// Without the `cluster` feature, any Send + 'static type implements
// ractor's Message trait automatically — no explicit impl needed.
#[cfg(feature = "cluster")]
impl datum::actor::Message for DoubleMsg {}

// A stateless actor that doubles each incoming value.
struct DoubleActor;

impl Actor for DoubleActor {
    type Msg = DoubleMsg;
    type State = ();
    type Arguments = ();

    async fn pre_start(
        &self,
        _myself: ActorRef<Self::Msg>,
        _args: Self::Arguments,
    ) -> Result<Self::State, ActorProcessingErr> {
        Ok(())
    }

    async fn handle(
        &self,
        _myself: ActorRef<Self::Msg>,
        message: Self::Msg,
        _state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        let DoubleMsg::Double { value, reply_to } = message;
        let _ = reply_to.send(value * 2);
        Ok(())
    }
}

// Spawn the actor on a dedicated Tokio runtime.
let rt = tokio::runtime::Builder::new_multi_thread()
    .enable_all()
    .build()
    .unwrap();
let (actor_ref, _handle) = rt.block_on(async {
    Actor::spawn(None, DoubleActor, ())
        .await
        .expect("actor spawns")
});

// Wire ActorFlow::ask into a stream: each element becomes a request/reply pair.
let result: Vec<u64> = Source::from_iter(1_u64..=4)
    .via(ActorFlow::ask(
        actor_ref.clone(),
        2, // max in-flight requests
        Duration::from_secs(1),
        |x, reply_to| DoubleMsg::Double { value: x, reply_to },
    ))
    .run_with(Sink::collect())
    .unwrap()
    .wait()
    .unwrap();

// Output follows input order: each of 1..=4, doubled.
assert_eq!(result, vec![2, 4, 6, 8]);

actor_ref.stop(None);

Without the cluster feature, any Send + 'static type satisfies Ractor's Message trait automatically. The #[cfg(feature = "cluster")] impl is only needed when building with cluster support enabled.

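Timeouts and failures

In this example the actor stores every reply port but never sends a reply, so the request fails once its deadline passes.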

rust
use datum::actor::{Actor, ActorProcessingErr, ActorRef};
use datum::{ActorFlow, ReplyPort, Sink, Source, StreamError};
use std::time::Duration;

// An actor that holds on to the reply port and never responds.
enum SilentMsg {
    Silent { _reply_to: ReplyPort<u64> },
}

#[cfg(feature = "cluster")]
impl datum::actor::Message for SilentMsg {}

struct SilentActor;

impl Actor for SilentActor {
    type Msg = SilentMsg;
    type State = Vec<ReplyPort<u64>>; // holds ports open to prevent Drop
    type Arguments = ();

    async fn pre_start(
        &self,
        _myself: ActorRef<Self::Msg>,
        _args: Self::Arguments,
    ) -> Result<Self::State, ActorProcessingErr> {
        Ok(Vec::new())
    }

    async fn handle(
        &self,
        _myself: ActorRef<Self::Msg>,
        message: Self::Msg,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        let SilentMsg::Silent { _reply_to } = message;
        state.push(_reply_to); // intentionally never send
        Ok(())
    }
}

let rt = tokio::runtime::Builder::new_multi_thread()
    .enable_all()
    .build()
    .unwrap();
let (actor_ref, _handle) = rt.block_on(async {
    Actor::spawn(None, SilentActor, ())
        .await
        .expect("actor spawns")
});

let timeout = Duration::from_millis(50);
let result = Source::single(1_u64)
    .via(ActorFlow::ask(
        actor_ref.clone(),
        1,
        timeout,
        |_x, reply_to| SilentMsg::Silent {
            _reply_to: reply_to,
        },
    ))
    .run_with(Sink::collect())
    .unwrap()
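    .wait();

// The actor never replies, so the stream fails with a timeout.
assert!(matches!(result, Err(StreamError::ActorAskTimeout { .. })));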

actor_ref.stop(None);

| Failure cause | StreamError variant |
|---|---|
| Reply not received within timeout | ActorAskTimeout { timeout } |
| Actor terminated before replying | ActorTerminated |
| Reply port dropped without a send | ActorAskResponseDropped |
| make_msg closure panics | ActorAskTaskFailed { reason } |
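The difference between the timeout row and the dropped-port row comes down to whether the reply sender is still alive when the deadline passes. The standard library's `Receiver::recv_timeout` draws the same line, which is also why `SilentActor` above stores its reply ports instead of dropping them. The sketch uses a hypothetical `AskError` enum and `await_reply` helper to model those two rows; it is not Datum's code.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::time::Duration;

/// Hypothetical error type mirroring two rows of the table above.
#[derive(Debug, PartialEq)]
enum AskError {
    Timeout(Duration),
    ResponseDropped,
}

/// Waits for a single reply with a deadline and classifies the failure.
fn await_reply(reply_rx: &mpsc::Receiver<u64>, timeout: Duration) -> Result<u64, AskError> {
    match reply_rx.recv_timeout(timeout) {
        Ok(value) => Ok(value),
        // The sender is still alive but stayed silent past the deadline.
        Err(RecvTimeoutError::Timeout) => Err(AskError::Timeout(timeout)),
        // Every sender was dropped without sending: the reply port is gone.
        Err(RecvTimeoutError::Disconnected) => Err(AskError::ResponseDropped),
    }
}
```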

ActorFlow::ask_with_status

When your actor needs to signal stream failures — not just return values — use ActorFlow::ask_with_status. The actor replies with ActorStatus::Ok(value) to emit an element or ActorStatus::Err(error) to fail the stream:

rust
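use datum::{ActorFlow, ActorStatus, Sink, Source};
use std::time::Duration;

// Assumed setup, not shown: StatusMsg is your own enum with a
// Process { value: u64, reply_to } variant, and `actor_ref` is an
// ActorRef<StatusMsg> whose handler answers with
// reply_to.send(ActorStatus::Ok(value * 2)) to emit the doubled value, or
// reply_to.send(ActorStatus::Err(error)) to fail the stream.
let timeout = Duration::from_secs(1);

Source::from_iter(1_u64..=3)
    .via(ActorFlow::ask_with_status(
        actor_ref,
        2, // max in-flight requests
        timeout,
        |x, reply_to| StatusMsg::Process { value: x, reply_to },
    ))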
    .run_with(Sink::collect())
    .unwrap()
    .wait()
    .unwrap();

ActorStatus::Err(error) fails the stream with the given StreamError, identical to an upstream failure.
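What downstream observes can be sketched as a fold over replies in input order: each `Ok` is emitted, and the first `Err` terminates the stream, so later replies are never seen. `Status` and `apply_statuses` are hypothetical stand-ins for `ActorStatus` and the stage's behavior, inferred from the description above rather than taken from Datum's source.

```rust
/// Hypothetical stand-in for ActorStatus (not the Datum type).
#[derive(Debug)]
enum Status<T> {
    Ok(T),
    Err(String),
}

/// Applies replies in order. Returns the elements emitted before termination
/// and the terminal outcome: Ok(()) for normal completion, Err for failure.
fn apply_statuses<T>(replies: impl IntoIterator<Item = Status<T>>) -> (Vec<T>, Result<(), String>) {
    let mut emitted = Vec::new();
    for reply in replies {
        match reply {
            Status::Ok(value) => emitted.push(value),
            // The first error fails the stream; remaining replies are ignored.
            Status::Err(error) => return (emitted, Err(error)),
        }
    }
    (emitted, Ok(()))
}
```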

ActorFlow::watch

ActorFlow::watch(actor_ref) is a pass-through flow that fails the stream if actor_ref terminates while the stream is running:

rust
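use datum::{ActorFlow, ActorSource, ActorSourceMessage, Keep, Sink};

// `guarded_ref` is assumed to be an ActorRef for an actor spawned earlier
// whose lifetime this stream depends on.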
let (source_ref, completion) = ActorSource::actor_ref::<u64>()
    .via(ActorFlow::watch(guarded_ref.clone()))
    .to_mat(Sink::collect(), Keep::both)
    .run()
    .unwrap();

// If guarded_ref stops, the stream fails with StreamError::Failed.

The flow uses Ractor's monitor supervision to observe the watched actor, so it reacts to termination independently of element traffic: once the watched actor terminates, the stream fails with StreamError::Failed containing a termination message, even if upstream is idle.
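The property that matters, failing while upstream is idle, holds because termination arrives as its own event instead of being checked when the next element passes through. The sketch below models that with one std channel carrying both elements and a termination notice, which a drop guard owned by the watched thread sends on exit. `Event`, `watch`, `NotifyOnDrop`, and `idle_upstream_then_actor_stops` are hypothetical names, and Ractor's real monitoring mechanism differs.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Events a hypothetical watch stage reacts to (not Datum types).
enum Event {
    Element(u64),
    UpstreamDone,
    WatchedTerminated,
}

/// Passes elements through until upstream completes or the watched actor
/// terminates, whichever happens first.
fn watch(events: mpsc::Receiver<Event>) -> Result<Vec<u64>, String> {
    let mut out = Vec::new();
    for event in events {
        match event {
            Event::Element(x) => out.push(x),
            Event::UpstreamDone => return Ok(out),
            Event::WatchedTerminated => return Err("watched actor terminated".to_string()),
        }
    }
    Err("all senders dropped".to_string())
}

/// Owned by the watched thread; reports termination when dropped (even on panic).
struct NotifyOnDrop(mpsc::Sender<Event>);

impl Drop for NotifyOnDrop {
    fn drop(&mut self) {
        let _ = self.0.send(Event::WatchedTerminated);
    }
}

/// Upstream sends one element and then goes idle; the watched "actor" exits.
fn idle_upstream_then_actor_stops() -> Result<Vec<u64>, String> {
    let (tx, rx) = mpsc::channel();
    tx.send(Event::Element(1)).expect("receiver alive");
    let guard = NotifyOnDrop(tx.clone());
    let actor = thread::spawn(move || {
        let _guard = guard; // dropped when this thread ends
        thread::sleep(Duration::from_millis(10));
    });
    // `tx` stays alive but sends nothing more, so only the guard can end this.
    let result = watch(rx);
    actor.join().expect("actor thread");
    result
}
```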

ActorPubSub

ActorPubSub uses Ractor's pg process groups as a local pub/sub bus. Each group name is a string identifier shared between sources and sinks.

ActorPubSub::source::<T>(group) subscribes its materialized actor to group. Multiple sources can subscribe to the same group simultaneously.

ActorPubSub::sink::<T>(group) broadcasts each stream element to every current group member as ActorSourceMessage::Element(x), and sends ActorSourceMessage::Complete when the stream ends.

rust
use datum::{ActorPubSub, Keep, Sink, Source};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};

static GROUP_COUNTER: AtomicUsize = AtomicUsize::new(0);

// Use a unique group name per test run to avoid cross-test interference.
let group = format!(
    "docs-pubsub-{}",
    GROUP_COUNTER.fetch_add(1, Ordering::SeqCst),
);

// Source subscribes its materialized actor to the named pg group.
let (_source_ref, completion) = ActorPubSub::source::<u64>(group.clone())
    .to_mat(Sink::collect(), Keep::both)
    .run()
    .unwrap();

// Wait until the source actor has joined the group.
let deadline = Instant::now() + Duration::from_secs(2);
while ractor::pg::get_members(&group).is_empty() {
    assert!(
        Instant::now() < deadline,
        "source actor did not join pg group"
    );
    std::thread::sleep(Duration::from_millis(1));
}

// Sink broadcasts to every current member of the group.
Source::from_iter([1_u64, 2])
    .run_with(ActorPubSub::sink(group))
    .unwrap()
    .wait()
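    .unwrap();

// The sink's Complete reaches the subscribed source, which then completes.
assert_eq!(completion.wait().unwrap(), vec![1, 2]);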

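Wait for the source actor to join the group before the sink starts sending (the pg::get_members poll above): the sink broadcasts to the members present at send time, so elements sent before the join never reach that source. The sink also skips members that terminate between the group snapshot and the send, so one stale entry does not block delivery to the remaining subscribers.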
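Skipping terminated members can be modeled with a registry of std channel senders: a send to a member whose receiver is gone returns an error, which the broadcast ignores before moving on to the next member. `Groups`, `join`, and `broadcast` are hypothetical names for illustration only; Ractor's `pg` module tracks actor cells, not channels.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{self, Receiver, Sender};

/// Hypothetical local registry: group name -> subscriber mailboxes.
#[derive(Default)]
struct Groups {
    members: HashMap<String, Vec<Sender<u64>>>,
}

impl Groups {
    /// Adds a subscriber to `group` and returns its mailbox.
    fn join(&mut self, group: &str) -> Receiver<u64> {
        let (tx, rx) = mpsc::channel();
        self.members.entry(group.to_string()).or_default().push(tx);
        rx
    }

    /// Sends `x` to every registered member of `group`. A member whose receiver
    /// was dropped yields a send error, which is skipped so the others still
    /// receive `x`. Returns the number of successful deliveries.
    fn broadcast(&self, group: &str, x: u64) -> usize {
        let mut delivered = 0;
        for member in self.members.get(group).into_iter().flatten() {
            if member.send(x).is_ok() {
                delivered += 1;
            }
        }
        delivered
    }
}
```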

Performance notes

ActorFlow::ask uses a spin-then-park wait loop for each in-flight request. For a single request with parallelism 1, the steady-state round-trip includes a Ractor mailbox send, the actor's async message dispatch, and the reply-port wake. Replies from fast actors (sub-microsecond handlers) arrive during the spin window, so the consumer thread never parks; with slower actors, the consumer thread parks until ReplyPort::send wakes it.
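A spin-then-park wait can be sketched with an atomic ready flag plus `std::thread::park` and `Thread::unpark`. This is a minimal illustration under stated assumptions, not Datum's implementation: `ReplySlot` is a hypothetical one-shot slot, the spin limit is an arbitrary parameter, and `wait` must be called on the thread that created the slot, because that is the thread `send` unparks.

```rust
use std::hint;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::thread::{self, Thread};

/// Hypothetical one-shot reply slot with a spin-then-park waiter.
struct ReplySlot {
    ready: AtomicBool,
    value: AtomicU64,
    waiter: Thread, // the thread that will call `wait`
}

impl ReplySlot {
    /// Creates a slot whose waiter is the calling thread.
    fn new() -> Arc<Self> {
        Arc::new(ReplySlot {
            ready: AtomicBool::new(false),
            value: AtomicU64::new(0),
            waiter: thread::current(),
        })
    }

    /// Replier side: store the value, publish readiness, wake the waiter.
    fn send(&self, v: u64) {
        self.value.store(v, Ordering::Relaxed);
        self.ready.store(true, Ordering::Release); // publishes `value`
        self.waiter.unpark(); // harmless if the waiter is still spinning
    }

    /// Waiter side: spin up to `spin_limit` times, then park.
    fn wait(&self, spin_limit: u32) -> u64 {
        for _ in 0..spin_limit {
            if self.ready.load(Ordering::Acquire) {
                return self.value.load(Ordering::Relaxed); // fast path
            }
            hint::spin_loop();
        }
        // Slow path. park() may return spuriously, so re-check in a loop.
        // An unpark() that ran before park() leaves a token, so no wakeup is lost.
        while !self.ready.load(Ordering::Acquire) {
            thread::park();
        }
        self.value.load(Ordering::Relaxed)
    }
}
```

The spin phase trades a little CPU for avoiding a park/unpark syscall pair when the reply is imminent; the park phase keeps slow replies from burning a core.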

The ordered_sum benchmark in roadmap/benchmarks/actor-ask.md records Datum vs Akka wall-clock and CPU numbers for this path.

Next steps