
Streaming IO

Datum's io module provides byte-stream IO for the most common sources and sinks: synchronous file IO (FileIO), async Tokio-backed file and TCP IO (TokioFileIO, TokioTcp), byte-stream framing (Framing), compression (Compression), and adapters for any std::io::Read or std::io::Write (StreamConverters). All of these compose with the standard Source/Flow/Sink DSL.

File IO

FileIO is a synchronous, low-overhead file adapter built on StreamConverters. It runs the reader on Datum's thread pool and keeps a bounded queue between the reader thread and the stream consumer, so the reader cannot run arbitrarily far ahead of the consumer.
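The bounded read-ahead idea can be illustrated with the standard library alone. The sketch below is not Datum's implementation: `read_bounded` and its `capacity` parameter are hypothetical, a plain thread stands in for Datum's thread pool, and `std::sync::mpsc::sync_channel` plays the role of the bounded queue.

```rust
use std::io::{Cursor, Read};
use std::sync::mpsc::sync_channel;
use std::thread;

/// Reads `reader` on a background thread in pieces of at most `chunk_size`
/// bytes and hands them to the consumer over a channel that holds at most
/// `capacity` chunks. A full channel blocks `send`, so the reader can never
/// get more than `capacity` chunks ahead of the consumer.
fn read_bounded<R: Read + Send + 'static>(
    mut reader: R,
    chunk_size: usize,
    capacity: usize,
) -> Vec<Vec<u8>> {
    let (tx, rx) = sync_channel::<Vec<u8>>(capacity);
    let producer = thread::spawn(move || {
        let mut buf = vec![0u8; chunk_size];
        loop {
            match reader.read(&mut buf) {
                // EOF: returning drops `tx`, which ends the consumer's loop.
                Ok(0) => break,
                Ok(n) => {
                    // Blocks while `capacity` chunks are already queued.
                    if tx.send(buf[..n].to_vec()).is_err() {
                        break; // the consumer hung up
                    }
                }
                Err(_) => break, // error propagation is elided in this sketch
            }
        }
    });
    // Consumer side: receive chunks until the producer drops its sender.
    let chunks: Vec<Vec<u8>> = rx.iter().collect();
    producer.join().unwrap();
    chunks
}

fn main() {
    let input = b"datum streaming io test".to_vec();
    let chunks = read_bounded(Cursor::new(input), 8, 2);
    assert_eq!(chunks.len(), 3); // 8 + 8 + 7 bytes
    assert_eq!(chunks.concat(), b"datum streaming io test");
}
```

The channel capacity is the knob: a small value keeps memory in flight low at the cost of more producer blocking.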

FileIO::from_path(path, chunk_size)  →  Source<Vec<u8>>
FileIO::from_path_default(path)      →  Source<Vec<u8>>   (chunk_size = 8192)
FileIO::to_path(path)                →  Sink<Vec<u8>, StreamCompletion<NotUsed>>

to_path creates the file if it does not exist and truncates it if it does. The materialized StreamCompletion<NotUsed> resolves when the file is fully flushed and closed.

rust
use datum::{FileIO, Sink, Source};
use std::fs;

// Use a unique temp path to avoid conflicts between parallel test runs.
let path = std::env::temp_dir().join(format!("datum_docs_file_io_{}.bin", std::process::id()));

// Write a file with FileIO::to_path, then read it back with FileIO::from_path.
Source::single(b"datum streaming io test".to_vec())
    .run_with(FileIO::to_path(&path))
    .unwrap()
    .wait()
    .unwrap();

let chunks: Vec<Vec<u8>> = FileIO::from_path(&path, 8)
    .run_with(Sink::collect())
    .unwrap()
    .wait()
    .unwrap();

let data: Vec<u8> = chunks.into_iter().flatten().collect();
assert_eq!(data, b"datum streaming io test");

// Clean up the temporary file.
fs::remove_file(&path).unwrap();

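Performance. Sync file source: 2.21x faster than Akka (wall-clock and CPU). Sync file sink: 2.55x faster. Datum's CPU time stays at or below its wall-clock time for both (100 µs vs 107 µs for the source, 500 µs vs 1,435 µs for the sink), so there is no hidden busy-spin cost.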

Tokio file IO

TokioFileIO uses Tokio's async file IO. Compared with FileIO, it adds one producer Tokio task and uses ConsumerWaker::unpark to wake the stream thread after each chunk arrives. The materialized value carries an IoResult with a byte count and terminal status — useful for observability and auditing.
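ConsumerWaker is internal to Datum, so the following is only a std-only illustration of the producer/consumer handoff described above: `std::thread::park` and `Thread::unpark` stand in for ConsumerWaker, a plain thread stands in for the Tokio task, and `transfer` and `Shared` are hypothetical names. The producer also counts bytes, loosely mirroring IoResult::bytes().

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

/// State shared by the producer and the consumer.
struct Shared {
    queue: VecDeque<Vec<u8>>,
    done: bool,
}

/// Moves `chunks` from a producer thread to the calling (consumer) thread.
/// The producer unparks the consumer after every chunk; the consumer parks
/// only when the queue is empty. Returns the received chunks and the byte
/// count reported by the producer.
fn transfer(chunks: Vec<Vec<u8>>) -> (Vec<Vec<u8>>, usize) {
    let shared = Arc::new(Mutex::new(Shared { queue: VecDeque::new(), done: false }));
    let consumer = thread::current();

    let producer_shared = Arc::clone(&shared);
    let producer = thread::spawn(move || {
        let mut bytes = 0;
        for chunk in chunks {
            bytes += chunk.len();
            producer_shared.lock().unwrap().queue.push_back(chunk);
            consumer.unpark(); // wake the consumer after each chunk
        }
        producer_shared.lock().unwrap().done = true;
        consumer.unpark(); // final wake-up so the consumer observes `done`
        bytes
    });

    let mut received = Vec::new();
    loop {
        let mut state = shared.lock().unwrap();
        let next = state.queue.pop_front();
        match next {
            Some(chunk) => received.push(chunk),
            None if state.done => break,
            None => {
                drop(state); // never sleep while holding the lock
                // Returns at once if an unpark arrived after the check above.
                thread::park();
            }
        }
    }
    let bytes = producer.join().unwrap();
    (received, bytes)
}

fn main() {
    let (chunks, bytes) = transfer(vec![b"hello ".to_vec(), b"world".to_vec()]);
    assert_eq!(bytes, 11);
    assert_eq!(chunks.concat(), b"hello world");
}
```

Because `unpark` leaves a token when the target is not yet parked, a wake-up that races with the emptiness check is not lost.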

TokioFileIO::from_path(path, chunk_size)  →  TokioByteSource  (= Source<Vec<u8>, StreamCompletion<IoResult>>)
TokioFileIO::from_path_default(path)      →  TokioByteSource
TokioFileIO::to_path(path)                →  TokioByteSink    (= Sink<Vec<u8>, StreamCompletion<IoResult>>)

IoResult methods:

| Method | Description |
|---|---|
| .bytes() | Bytes successfully transferred |
| .is_success() | true when no error occurred |
| .status() | StreamResult<()>: Ok(()) on success, Err(StreamError) on failure |

rust
use datum::{Keep, Sink, TokioFileIO};

let (io_result, frames) = TokioFileIO::from_path("events.log", 8192)
    .via(datum::Framing::delimiter(b"\n".to_vec(), 4096, true))
    .to_mat(Sink::collect(), Keep::both)
    .run()
    .unwrap();

let lines = frames.wait().unwrap();
let result = io_result.wait().unwrap();
println!("{} bytes read, {} lines", result.bytes(), lines.len());

Performance.

  • Tokio file source: 1.46x faster than Akka in wall-clock time (159 µs vs 231 µs). CPU time is 200 µs, higher than wall-clock. This is tracked-acceptable: the source uses a two-thread producer/consumer model, and although the Tokio task calls ConsumerWaker::unpark() after each chunk send, residual Tokio task-scheduling overhead keeps CPU above wall-clock. The lever for closing this gap is the two-thread model itself.
  • Tokio file sink: 1.77x faster than Akka in wall-clock time. CPU time is close to wall-clock (2,300 µs vs 2,032 µs), so there is no spin concern.

Framing

Framing turns a raw byte stream (possibly delivered in arbitrary chunks) into a stream of complete logical frames. All framing flows have type Flow<Vec<u8>, Vec<u8>> and compose with any byte source.

Delimiter framing

rust
use datum::{Framing, Sink, Source};

// Framing::delimiter(delimiter, max_frame_length, allow_truncation)
// splits a byte stream on a byte sequence and strips the delimiter.
// allow_truncation = true emits a partial final frame when the stream ends
// without a terminating delimiter; false would fail with StreamError.
let chunks: Vec<Vec<u8>> = vec![
    b"hello\nworld\n".to_vec(),
    b"foo".to_vec(), // no trailing newline — emitted as a truncated frame
];

let frames: Vec<Vec<u8>> = Source::from_iter(chunks)
    .via(Framing::delimiter(b"\n".to_vec(), 256, true))
    .run_with(Sink::collect())
    .unwrap()
    .wait()
    .unwrap();

assert_eq!(frames, vec![b"hello".to_vec(), b"world".to_vec(), b"foo".to_vec()]);

allow_truncation:

  • true — emit the remaining buffer as a final frame when the stream ends without a terminating delimiter.
  • false — fail the stream with StreamError if the stream ends mid-frame.
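The delimiter behaviour, including allow_truncation, can be modelled as a small incremental state machine. This is a std-only sketch, not Datum's implementation: `DelimiterFramer`, `push`, and `finish` are hypothetical names, errors are plain `String`s rather than StreamError, and applying max_frame_length to the payload (delimiter excluded) is an assumption.

```rust
/// Returns the index of the first occurrence of `needle` in `haystack`.
fn find(haystack: &[u8], needle: &[u8]) -> Option<usize> {
    haystack.windows(needle.len()).position(|w| w == needle)
}

/// Incremental delimiter framer: feed it chunks split anywhere and it
/// emits complete frames with the delimiter stripped.
struct DelimiterFramer {
    delimiter: Vec<u8>,
    max_frame_length: usize,
    allow_truncation: bool,
    buffer: Vec<u8>,
}

impl DelimiterFramer {
    fn new(delimiter: &[u8], max_frame_length: usize, allow_truncation: bool) -> Self {
        assert!(!delimiter.is_empty(), "delimiter must not be empty");
        Self { delimiter: delimiter.to_vec(), max_frame_length, allow_truncation, buffer: Vec::new() }
    }

    /// Appends one chunk and returns every frame it completes.
    fn push(&mut self, chunk: &[u8]) -> Result<Vec<Vec<u8>>, String> {
        self.buffer.extend_from_slice(chunk);
        let mut frames = Vec::new();
        while let Some(pos) = find(&self.buffer, &self.delimiter) {
            if pos > self.max_frame_length {
                return Err(format!("frame longer than {} bytes", self.max_frame_length));
            }
            frames.push(self.buffer[..pos].to_vec()); // delimiter stripped
            self.buffer.drain(..pos + self.delimiter.len());
        }
        // A partial frame that is already too long can never become valid.
        if self.buffer.len() > self.max_frame_length {
            return Err(format!("frame longer than {} bytes", self.max_frame_length));
        }
        Ok(frames)
    }

    /// Called when the upstream completes.
    fn finish(self) -> Result<Option<Vec<u8>>, String> {
        if self.buffer.is_empty() {
            Ok(None)
        } else if self.allow_truncation {
            Ok(Some(self.buffer)) // emit the truncated final frame
        } else {
            Err("stream ended mid-frame".to_string())
        }
    }
}

fn main() {
    let mut framer = DelimiterFramer::new(b"\n", 256, true);
    // Chunk boundaries deliberately split "world" and omit a final newline.
    let chunks: [&[u8]; 3] = [b"hello\nwor", b"ld\n", b"foo"];
    let mut frames = Vec::new();
    for chunk in chunks {
        frames.extend(framer.push(chunk).unwrap());
    }
    frames.extend(framer.finish().unwrap());
    assert_eq!(frames, vec![b"hello".to_vec(), b"world".to_vec(), b"foo".to_vec()]);
}
```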

JSON object framing

rust
use datum::{Framing, Sink, Source};

// Framing::json(max_object_length) splits concatenated JSON objects.
// Input chunks may split anywhere — even inside a key string.
let input: Vec<Vec<u8>> = vec![b"{\"a\":1}{\"b\"".to_vec(), b":2}".to_vec()];

let frames: Vec<Vec<u8>> = Source::from_iter(input)
    .via(Framing::json(1024))
    .run_with(Sink::collect())
    .unwrap()
    .wait()
    .unwrap();

assert_eq!(frames, vec![b"{\"a\":1}".to_vec(), b"{\"b\":2}".to_vec()]);

Framing::json(max_object_length) extracts top-level JSON objects from a concatenated byte stream. It handles objects split across arbitrary chunk boundaries and advances past outer array brackets and commas so it works on both {...}{...} and [{...},{...}] input.
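One way such a framer can survive arbitrary chunk boundaries is to track brace depth plus string and escape state byte by byte. The std-only sketch below illustrates that approach under assumptions: `JsonFramer` is a hypothetical name, it is not Datum's code, and between objects it skips only whitespace, `[`, `]`, and `,`.

```rust
/// Incremental splitter for concatenated top-level JSON objects.
struct JsonFramer {
    max_object_length: usize,
    current: Vec<u8>, // bytes of the object being assembled
    depth: usize,     // nesting depth of {} and [] inside the current object
    in_string: bool,
    escaped: bool, // previous byte was a backslash inside a string
}

impl JsonFramer {
    fn new(max_object_length: usize) -> Self {
        Self { max_object_length, current: Vec::new(), depth: 0, in_string: false, escaped: false }
    }

    /// Feeds one chunk (split anywhere) and returns the objects it completes.
    fn push(&mut self, chunk: &[u8]) -> Result<Vec<Vec<u8>>, String> {
        let mut objects = Vec::new();
        for &b in chunk {
            if self.depth == 0 {
                // Between objects: skip whitespace, outer array brackets and commas.
                match b {
                    b'{' => {
                        self.depth = 1;
                        self.current.push(b);
                    }
                    b'[' | b']' | b',' | b' ' | b'\t' | b'\r' | b'\n' => {}
                    other => return Err(format!("unexpected byte {:?} between objects", other as char)),
                }
                continue;
            }
            self.current.push(b);
            if self.current.len() > self.max_object_length {
                return Err(format!("object longer than {} bytes", self.max_object_length));
            }
            if self.in_string {
                // Braces inside strings must not change the depth.
                if self.escaped {
                    self.escaped = false;
                } else if b == b'\\' {
                    self.escaped = true;
                } else if b == b'"' {
                    self.in_string = false;
                }
            } else {
                match b {
                    b'"' => self.in_string = true,
                    b'{' | b'[' => self.depth += 1,
                    b'}' | b']' => {
                        self.depth -= 1;
                        if self.depth == 0 {
                            objects.push(std::mem::take(&mut self.current));
                        }
                    }
                    _ => {}
                }
            }
        }
        Ok(objects)
    }
}

fn main() {
    let mut framer = JsonFramer::new(1024);
    // An array of objects, split inside the key "b"; one value contains a '}'.
    let chunks: [&[u8]; 2] = [b"[{\"a\":1},{\"b", b"\":\"}\"}]"];
    let mut objects = Vec::new();
    for chunk in chunks {
        objects.extend(framer.push(chunk).unwrap());
    }
    assert_eq!(objects, vec![br#"{"a":1}"#.to_vec(), br#"{"b":"}"}"#.to_vec()]);
}
```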

Framing methods

| Method | Splits on |
|---|---|
| Framing::delimiter(delimiter, max_len, allow_truncation) | Byte sequence; strips the delimiter |
| Framing::length_field(field_len, field_offset, max_len, byte_order) | Length-prefixed frames; field_len is 1–4 bytes |
| Framing::json(max_len) | Top-level JSON objects |

FramingByteOrder::BigEndian and FramingByteOrder::LittleEndian control how length_field interprets the length header.
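To show what the byte order changes, here is a std-only decoder for a single length-prefixed frame. The details are assumptions rather than documented Datum behaviour: `take_frame` and the local `ByteOrder` enum are hypothetical stand-ins, the decoded length is taken to count payload bytes only, `max_len` is checked against header plus payload, and the emitted frame keeps its header.

```rust
/// Local stand-in for FramingByteOrder.
#[derive(Clone, Copy)]
enum ByteOrder {
    BigEndian,
    LittleEndian,
}

/// Tries to cut one length-prefixed frame off the front of `buf`.
/// Returns Ok(None) when more bytes are needed.
fn take_frame(
    buf: &mut Vec<u8>,
    field_len: usize,
    field_offset: usize,
    max_len: usize,
    order: ByteOrder,
) -> Result<Option<Vec<u8>>, String> {
    assert!((1..=4).contains(&field_len), "field_len must be 1 to 4 bytes");
    let header_end = field_offset + field_len;
    if buf.len() < header_end {
        return Ok(None); // header not complete yet
    }
    let field = &buf[field_offset..header_end];
    // Fold bytes from most to least significant: big-endian header bytes are
    // already in that order, little-endian ones must be read in reverse.
    let shift_in = |acc: usize, b: &u8| (acc << 8) | *b as usize;
    let payload_len = match order {
        ByteOrder::BigEndian => field.iter().fold(0, shift_in),
        ByteOrder::LittleEndian => field.iter().rev().fold(0, shift_in),
    };
    let frame_len = header_end + payload_len; // assumption: length excludes the header
    if frame_len > max_len {
        return Err(format!("frame of {} bytes exceeds max_len {}", frame_len, max_len));
    }
    if buf.len() < frame_len {
        return Ok(None); // payload not complete yet
    }
    Ok(Some(buf.drain(..frame_len).collect())) // frame keeps its header
}

fn main() {
    // 2-byte big-endian header 0x0003, a 3-byte payload, then the start of a second frame.
    let mut buf = vec![0x00, 0x03, b'a', b'b', b'c', 0x00, 0x05, b'x'];
    let first = take_frame(&mut buf, 2, 0, 1024, ByteOrder::BigEndian).unwrap();
    assert_eq!(first, Some(vec![0x00, 0x03, b'a', b'b', b'c']));
    // The second frame announces 5 payload bytes but only 1 has arrived.
    assert_eq!(take_frame(&mut buf, 2, 0, 1024, ByteOrder::BigEndian).unwrap(), None);
}
```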

Performance. Delimiter framing: 1.46x faster than Akka, with 3.18x lower allocation. JSON framing: at parity with Akka (1.00x); no gap to close.

Compression

Compression flows compress or decompress a Vec<u8> byte stream in-process using flate2. All four variants return Flow<Vec<u8>, Vec<u8>>.

| Flow | Direction |
|---|---|
| Compression::gzip() | Compress (gzip format) |
| Compression::gunzip() | Decompress (gzip format) |
| Compression::deflate() | Compress (zlib/deflate format) |
| Compression::inflate() | Decompress (zlib/deflate format) |

rust
use datum::{Compression, Sink, Source};

// Compress a byte stream with gzip, then decompress it back.
let compressed: Vec<Vec<u8>> = Source::from_iter([b"hello ".to_vec(), b"world".to_vec()])
    .via(Compression::gzip())
    .run_with(Sink::collect())
    .unwrap()
    .wait()
    .unwrap();

let recovered: Vec<u8> = Source::from_iter(compressed)
    .via(Compression::gunzip())
    .run_with(Sink::collect())
    .unwrap()
    .wait()
    .unwrap()
    .into_iter()
    .flatten()
    .collect();

assert_eq!(recovered, b"hello world");

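Compression and decompression flows compose directly with framing. For example, the following gzips a newline-delimited log file after framing it by line. Because Framing::delimiter strips the delimiter, the decompressed output contains the lines concatenated without newlines; re-append the delimiter to each frame before compressing if the line structure must survive: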

rust
use datum::{Compression, FileIO, Framing, Sink};

let compressed_lines = FileIO::from_path_default("events.log")
    .via(Framing::delimiter(b"\n".to_vec(), 4096, true))
    .via(Compression::gzip())
    .run_with(Sink::collect())
    .unwrap()
    .wait()
    .unwrap();

StreamConverters

StreamConverters wraps any std::io::Read or std::io::Write implementation as a Source<Vec<u8>> or Sink<Vec<u8>, StreamCompletion<NotUsed>>. This is the primary extension point for custom IO: HTTP response bodies, in-memory cursors, pipes, and any other Read/Write all work without extra adapters.

The factory closure is called at materialization, not at blueprint construction. Building the source or sink has no side effects.

StreamConverters::from_reader(factory, chunk_size)  →  Source<Vec<u8>>
    where factory: Fn() -> std::io::Result<R> + Send + Sync + 'static
          R: std::io::Read + Send + 'static

StreamConverters::to_writer(factory)  →  Sink<Vec<u8>, StreamCompletion<NotUsed>>
    where factory: Fn() -> std::io::Result<W> + Send + Sync + 'static
          W: std::io::Write + Send + 'static
rust
use datum::{Sink, StreamConverters};
use std::io::Cursor;

// StreamConverters::from_reader(factory, chunk_size) wraps any std::io::Read.
// The factory is called at materialization (blueprint-safe, not at construction).
let chunks: Vec<Vec<u8>> = StreamConverters::from_reader(
    || Ok(Cursor::new(b"hello world".to_vec())),
    4, // emit at most 4 bytes per chunk
)
.run_with(Sink::collect())
.unwrap()
.wait()
.unwrap();

let combined: Vec<u8> = chunks.into_iter().flatten().collect();
assert_eq!(combined, b"hello world");
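The "called at materialization" rule can be pictured with a std-only analogue of a lazy reader source. `ReaderSource` and `run` are hypothetical names, and unlike Datum this sketch reads synchronously on the calling thread; it only demonstrates that holding the factory has no side effects and that each run opens a fresh reader.

```rust
use std::cell::Cell;
use std::io::{self, Cursor, Read};

/// A reusable "blueprint" that stores a reader factory without calling it.
struct ReaderSource<F> {
    factory: F,
    chunk_size: usize,
}

impl<R, F> ReaderSource<F>
where
    R: Read,
    F: Fn() -> io::Result<R>,
{
    fn new(factory: F, chunk_size: usize) -> Self {
        assert!(chunk_size > 0, "chunk_size must be positive");
        Self { factory, chunk_size }
    }

    /// "Materializes" the blueprint: only here is the factory invoked,
    /// so every run gets a fresh reader.
    fn run(&self) -> io::Result<Vec<Vec<u8>>> {
        let mut reader = (self.factory)()?;
        let mut buf = vec![0u8; self.chunk_size];
        let mut chunks = Vec::new();
        loop {
            let n = match reader.read(&mut buf) {
                Ok(0) => return Ok(chunks), // EOF
                Ok(n) => n,
                Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
                Err(e) => return Err(e),
            };
            chunks.push(buf[..n].to_vec()); // at most chunk_size bytes
        }
    }
}

fn main() {
    let calls = Cell::new(0);
    let source = ReaderSource::new(
        || {
            calls.set(calls.get() + 1);
            Ok(Cursor::new(b"hello world".to_vec()))
        },
        4,
    );
    assert_eq!(calls.get(), 0); // constructing the blueprint had no side effects
    let chunks = source.run().unwrap();
    assert_eq!(calls.get(), 1);
    assert_eq!(chunks, vec![b"hell".to_vec(), b"o wo".to_vec(), b"rld".to_vec()]);
}
```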

The writer sink flushes exactly once — either on stream completion or on error — and drops the writer after flushing. Upstream errors are forwarded to the materialized StreamCompletion.
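The flush-once contract can be sketched with std's `Write` trait. This is an illustration under assumptions, not Datum's code: `run_writer_sink` and the `Recorder` test writer are hypothetical, upstream elements are modelled as `io::Result<Vec<u8>>` so an upstream failure can be injected, and the returned `io::Result<()>` stands in for the materialized StreamCompletion.

```rust
use std::cell::RefCell;
use std::io::{self, Write};
use std::rc::Rc;

/// Writes each chunk to a writer from `factory`, flushes exactly once on both
/// the success and the error path, then drops the writer. The first error
/// (upstream, write, or flush) is what the caller sees.
fn run_writer_sink<W, F, I>(factory: F, chunks: I) -> io::Result<()>
where
    W: Write,
    F: Fn() -> io::Result<W>,
    I: IntoIterator<Item = io::Result<Vec<u8>>>,
{
    let mut writer = factory()?;
    let mut outcome = Ok(());
    for chunk in chunks {
        if let Err(e) = chunk.and_then(|bytes| writer.write_all(&bytes)) {
            outcome = Err(e);
            break; // stop consuming after the first failure
        }
    }
    let flushed = writer.flush(); // the single flush
    drop(writer);
    outcome.and(flushed) // keep the earlier error if there was one
}

/// Test writer that records bytes and counts flushes in shared state.
#[derive(Clone, Default)]
struct Recorder {
    data: Rc<RefCell<Vec<u8>>>,
    flushes: Rc<RefCell<usize>>,
}

impl Write for Recorder {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.data.borrow_mut().extend_from_slice(buf);
        Ok(buf.len())
    }
    fn flush(&mut self) -> io::Result<()> {
        *self.flushes.borrow_mut() += 1;
        Ok(())
    }
}

fn main() {
    let rec = Recorder::default();
    let handle = rec.clone();
    let upstream = vec![
        Ok(b"hello ".to_vec()),
        Err(io::Error::new(io::ErrorKind::Other, "upstream failed")),
        Ok(b"never written".to_vec()),
    ];
    let result = run_writer_sink(move || Ok(handle.clone()), upstream);
    assert!(result.is_err());
    assert_eq!(*rec.data.borrow(), b"hello ");
    assert_eq!(*rec.flushes.borrow(), 1); // flushed once despite the error
}
```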

TCP

TokioTcp provides plain TCP server and client sources backed by Tokio's async runtime. TLS and UDP are outside the current scope.

Binding a server

TokioTcp::bind_default(addr) returns a Source<TcpIncomingConnection, StreamCompletion<TcpBinding>>. The materialized TcpBinding is available as soon as the listener is bound — before any connections are accepted. Each element in the source is one accepted TcpIncomingConnection.

rust
use datum::{Keep, Sink, TokioTcp};

// Bind on an OS-assigned port.
let (binding_completion, incoming) = TokioTcp::bind_default("127.0.0.1:0")
    .to_mat(Sink::head(), Keep::both)
    .run()
    .unwrap();

let binding = binding_completion.wait().unwrap();
println!("Listening on {}", binding.local_addr());

// Accept one connection and split it into independent byte source and sink.
let conn = incoming.wait().unwrap();
let (source, sink) = conn.into_parts();
// source: TokioByteSource — reads bytes from the connection
// sink:   TokioByteSink   — writes bytes to the connection

TcpIncomingConnection methods:

| Method | Returns |
|---|---|
| .local_addr() | SocketAddr of the server side |
| .remote_addr() | SocketAddr of the connected client |
| .connection() | TcpConnection metadata struct |
| .into_parts() | (TokioByteSource, TokioByteSink): independent halves |
| .into_flow() | Flow<Vec<u8>, Vec<u8>, NotUsed>: coupled echo flow |

Outgoing connections

TokioTcp::outgoing_connection_default(addr) returns Flow<Vec<u8>, Vec<u8>, StreamCompletion<TcpConnection>>. The upstream input is written to the socket; bytes received from the socket are emitted downstream. The materialized TcpConnection carries the local and remote addresses.

rust
use datum::{Keep, Sink, Source, TokioTcp};

// Send "ping" to a server and collect whatever it echoes back.
let (conn_completion, response) = Source::single(b"ping".to_vec())
    .via(TokioTcp::outgoing_connection_default("127.0.0.1:8080"))
    .to_mat(Sink::collect(), Keep::both)
    .run()
    .unwrap();

let bytes: Vec<u8> = response.wait().unwrap().into_iter().flatten().collect();
let conn = conn_completion.wait().unwrap();
println!("Connected {} → {}, received {} bytes", conn.local_addr(), conn.remote_addr(), bytes.len());

Minimal echo server

rust
use datum::{Keep, Sink, TokioTcp};

// Bind the server on an OS-assigned port.
let (binding_completion, first_incoming) = TokioTcp::bind_default("127.0.0.1:0")
    .to_mat(Sink::head(), Keep::both)
    .run()
    .unwrap();
let binding = binding_completion.wait().unwrap();
println!("Echo server listening on {}", binding.local_addr());

// Accept one connection and pipe its byte source straight into its byte sink,
// so every byte the client sends is written back to it.
let conn = first_incoming.wait().unwrap();
let (source, sink) = conn.into_parts();
let io_result = source
    .run_with(sink)
    .unwrap()
    .wait()
    .unwrap();
println!("Echoed {} bytes", io_result.bytes());
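A one-connection echo can also be written with blocking `std::net`, which may help when reasoning about the stream version. This is a std-only analogue, not Datum code: `spawn_echo_once` is a hypothetical helper, `TcpStream::try_clone` plays roughly the role of splitting a connection into independent read and write halves, and `io::copy` plays the role of running the byte source into the byte sink.

```rust
use std::io::{self, Read, Write};
use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
use std::thread;

/// Binds on an OS-assigned port, then echoes the first accepted connection
/// on a background thread. The address is known as soon as `bind` returns,
/// before any connection is accepted.
fn spawn_echo_once() -> io::Result<(SocketAddr, thread::JoinHandle<io::Result<u64>>)> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;
    let handle = thread::spawn(move || -> io::Result<u64> {
        let (stream, _peer) = listener.accept()?;
        // Two handles to one socket: an independent read half and write half.
        let mut read_half = stream.try_clone()?;
        let mut write_half = stream;
        // Copy everything read back to the client until it closes its write side.
        io::copy(&mut read_half, &mut write_half)
    });
    Ok((addr, handle))
}

fn main() -> io::Result<()> {
    let (addr, server) = spawn_echo_once()?;
    let mut client = TcpStream::connect(addr)?;
    client.write_all(b"ping")?;
    client.shutdown(Shutdown::Write)?; // signal end of input to the server
    let mut reply = Vec::new();
    client.read_to_end(&mut reply)?; // returns once the server closes the socket
    assert_eq!(reply, b"ping");
    let echoed = server.join().expect("echo thread panicked")?;
    println!("echoed {} bytes through {}", echoed, addr);
    Ok(())
}
```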

Performance. TCP echo round-trip (64-byte payload): 1.70x faster than Akka in wall-clock time (205 µs vs 350 µs). CPU time is 600 µs, significantly higher than wall-clock. This is tracked-acceptable: 64-byte items complete within the 256-yield spin window, so the consumer thread is still spinning when unpark fires. Reducing the spin budget caused a measured ~4x wall-clock regression. The lever for this trade-off is the spin budget constant (READ_READY_SPINS).
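The trade-off above (spinning keeps wake-up latency low but shows up as CPU time) can be illustrated with a generic spin-then-park wait. This is a std-only sketch under assumptions: `wait_ready` and `SPIN_BUDGET` are hypothetical, `SPIN_BUDGET` merely echoes the 256-yield window, and Datum's actual READ_READY_SPINS logic is not shown on this page.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical spin budget, echoing the 256-yield window described above.
const SPIN_BUDGET: u32 = 256;

/// Waits until `ready` is set. First yields up to SPIN_BUDGET times, which
/// burns CPU but reacts quickly, then parks until unparked.
/// Returns true if readiness was observed during the spin phase.
fn wait_ready(ready: &AtomicBool) -> bool {
    for _ in 0..SPIN_BUDGET {
        if ready.load(Ordering::Acquire) {
            return true;
        }
        thread::yield_now();
    }
    while !ready.load(Ordering::Acquire) {
        thread::park(); // may wake spuriously, hence the loop
    }
    false
}

fn main() {
    let ready = Arc::new(AtomicBool::new(false));
    let consumer = thread::current();
    let flag = Arc::clone(&ready);
    let producer = thread::spawn(move || {
        flag.store(true, Ordering::Release);
        consumer.unpark(); // needed only if the consumer already parked
    });
    let caught_while_spinning = wait_ready(&ready);
    producer.join().unwrap();
    println!("ready observed while spinning: {}", caught_while_spinning);
}
```

A larger budget catches more fast items in the spin phase (lower latency, more CPU); a smaller one parks sooner and pays the wake-up cost instead.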

Performance summary

| Scenario | Datum wall µs/op | Datum CPU µs/op | Akka µs/op | Speedup | Notes |
|---|---|---|---|---|---|
| delimiter_framing_lines_10k | 919 | 875 | 1,343 | 1.46x | 3.18x lower alloc |
| json_framing_objects_10k | 1,065 | 1,125 | 1,065 | 1.00x | at parity |
| file_source_256k_chunk8192 | 107 | 100 | 236 | 2.21x | sync FileIO |
| file_sink_256k_chunk8192 | 1,435 | 500 | 3,667 | 2.55x | sync FileIO |
| tokio_file_source_256k_chunk8192 | 159 | 200 | 231 | 1.46x | CPU > wall: tracked-acceptable (two-thread model) |
| tokio_file_sink_256k_chunk8192 | 2,032 | 2,300 | 3,604 | 1.77x | |
| tcp_echo_roundtrip_64b | 205 | 600 | 350 | 1.70x | CPU > wall: tracked-acceptable (spin budget) |
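The Speedup column is consistent with Akka µs/op divided by Datum wall-clock µs/op. The short check below recomputes it from the table's values; `speedup` is a hypothetical helper, and the 0.01 tolerance is an assumption to absorb rounding in the displayed figures.

```rust
/// Speedup as the table appears to define it: Akka µs/op over Datum wall µs/op.
fn speedup(akka_us: f64, datum_wall_us: f64) -> f64 {
    akka_us / datum_wall_us
}

fn main() {
    // (scenario, Datum wall µs/op, Akka µs/op, reported speedup) from the summary table.
    let rows = [
        ("delimiter_framing_lines_10k", 919.0, 1343.0, 1.46),
        ("json_framing_objects_10k", 1065.0, 1065.0, 1.00),
        ("file_source_256k_chunk8192", 107.0, 236.0, 2.21),
        ("file_sink_256k_chunk8192", 1435.0, 3667.0, 2.55),
        ("tokio_file_source_256k_chunk8192", 159.0, 231.0, 1.46),
        ("tokio_file_sink_256k_chunk8192", 2032.0, 3604.0, 1.77),
        ("tcp_echo_roundtrip_64b", 205.0, 350.0, 1.70),
    ];
    for (name, datum, akka, reported) in rows {
        let computed = speedup(akka, datum);
        assert!((computed - reported).abs() < 0.01, "{name}: {computed:.3} vs {reported}");
        println!("{name}: {computed:.2}x");
    }
}
```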

Next steps