Streaming IO
Datum's io module provides byte-stream IO for the most common sources and sinks: synchronous file IO (FileIO), async Tokio-backed file and TCP IO (TokioFileIO, TokioTcp), byte-stream framing (Framing), compression (Compression), and adapters for any std::io::Read or std::io::Write (StreamConverters). All of these compose with the standard Source/Flow/Sink DSL.
File IO
FileIO is a synchronous, low-overhead file adapter built on StreamConverters. It runs the blocking reader on Datum's thread pool and keeps a bounded queue between the reader thread and the stream consumer, so the reader cannot read ahead of the consumer without limit.
- FileIO::from_path(path, chunk_size) → Source<Vec<u8>>
- FileIO::from_path_default(path) → Source<Vec<u8>> (chunk_size = 8192)
- FileIO::to_path(path) → Sink<Vec<u8>, StreamCompletion<NotUsed>>

to_path creates the file if it does not exist and truncates it if it does. The materialized StreamCompletion<NotUsed> resolves when the file is fully flushed and closed.
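The bounded read-ahead described above can be illustrated with the standard library alone. The sketch below is not Datum's implementation. `read_bounded` is a hypothetical helper, and a `std::sync::mpsc::sync_channel` stands in for Datum's internal queue. It shows why a bounded queue caps memory: once `capacity` chunks are waiting, the reader thread blocks on `send`.

```rust
use std::io::{ErrorKind, Read};
use std::sync::mpsc::sync_channel;
use std::thread;

/// Hypothetical sketch: read `reader` on a background thread in chunks of at
/// most `chunk_size` bytes. The channel holds at most `capacity` chunks, so
/// the reader blocks once it is that far ahead of the consumer.
fn read_bounded<R: Read + Send + 'static>(
    mut reader: R,
    chunk_size: usize,
    capacity: usize,
) -> std::io::Result<Vec<Vec<u8>>> {
    let (tx, rx) = sync_channel::<std::io::Result<Vec<u8>>>(capacity);
    let producer = thread::spawn(move || {
        let mut buf = vec![0u8; chunk_size];
        loop {
            match reader.read(&mut buf) {
                Ok(0) => break, // EOF: dropping `tx` ends the stream
                Ok(n) => {
                    // Blocks while `capacity` chunks are already queued.
                    if tx.send(Ok(buf[..n].to_vec())).is_err() {
                        break; // consumer hung up
                    }
                }
                Err(e) if e.kind() == ErrorKind::Interrupted => continue,
                Err(e) => {
                    let _ = tx.send(Err(e)); // forward the error, then stop
                    break;
                }
            }
        }
    });
    // Consumer side: drain chunks until the reader drops its sender.
    let mut chunks = Vec::new();
    for item in rx {
        chunks.push(item?);
    }
    producer.join().expect("reader thread panicked");
    Ok(chunks)
}
```

With `capacity = 2`, at most two chunks are queued at any moment, whatever the size of the input.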
```rust
use datum::{FileIO, Sink, Source};
use std::fs;

// Use a unique temp path to avoid conflicts between parallel test runs.
let path = std::env::temp_dir().join(format!("datum_docs_file_io_{}.bin", std::process::id()));

// Write a file with FileIO::to_path, then read it back with FileIO::from_path.
Source::single(b"datum streaming io test".to_vec())
    .run_with(FileIO::to_path(&path))
    .unwrap()
    .wait()
    .unwrap();

let chunks: Vec<Vec<u8>> = FileIO::from_path(&path, 8)
    .run_with(Sink::collect())
    .unwrap()
    .wait()
    .unwrap();
let data: Vec<u8> = chunks.into_iter().flatten().collect();
assert_eq!(data, b"datum streaming io test");

// Remove the temp file once the round trip is verified.
fs::remove_file(&path).unwrap();
```

Performance. Sync file source: 2.21x faster than Akka on wall-clock time (107 µs vs 236 µs). Sync file sink: 2.55x faster (1,435 µs vs 3,667 µs). In both cases CPU time stays at or below wall time (100 µs and 500 µs), so there is no hidden busy-spin cost.
Tokio file IO
TokioFileIO uses Tokio's async file IO. Compared with FileIO, it adds one producer Tokio task and uses ConsumerWaker::unpark to wake the stream thread after each chunk arrives. The materialized value carries an IoResult with a byte count and terminal status — useful for observability and auditing.
- TokioFileIO::from_path(path, chunk_size) → TokioByteSource (= Source<Vec<u8>, StreamCompletion<IoResult>>)
- TokioFileIO::from_path_default(path) → TokioByteSource
- TokioFileIO::to_path(path) → TokioByteSink (= Sink<Vec<u8>, StreamCompletion<IoResult>>)

IoResult methods:

| Method | Description |
|---|---|
| .bytes() | Bytes successfully transferred |
| .is_success() | true when no error occurred |
| .status() | StreamResult<()>: Ok(()) on success, Err(StreamError) on failure |
```rust
use datum::{Framing, Keep, Sink, TokioFileIO};

// Read a log file, split it into lines, and keep both materialized values:
// the source's IoResult and the collected frames.
let (io_result, frames) = TokioFileIO::from_path("events.log", 8192)
    .via(Framing::delimiter(b"\n".to_vec(), 4096, true))
    .to_mat(Sink::collect(), Keep::both)
    .run()
    .unwrap();
let lines = frames.wait().unwrap();
let result = io_result.wait().unwrap();
println!("{} bytes read, {} lines", result.bytes(), lines.len());
```

Performance.
- Tokio file source: 1.46x Akka wall-clock (159 µs vs 231 µs). CPU time is 200 µs, which is higher than wall. This is tracked and accepted. The source uses a two-thread producer/consumer model: the Tokio task calls ConsumerWaker::unpark() after each chunk send, and residual Tokio task-scheduling overhead keeps CPU above wall. The lever for closing this gap is the two-thread model itself.
- Tokio file sink: 1.77x Akka wall-clock (2,032 µs vs 3,604 µs). CPU time (2,300 µs) is close to wall, so there is no spin concern.
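The two-thread handoff described above can be sketched with plain std threads. In Datum, a producer task wakes the consumer with ConsumerWaker::unpark after each chunk. In this sketch, `std::thread::park`/`Thread::unpark` and an mpsc channel stand in for Datum's Tokio task and ConsumerWaker. `park_unpark_transfer` is a hypothetical name that illustrates the pattern; it is not Datum's code.

```rust
use std::sync::mpsc::{channel, TryRecvError};
use std::thread;

/// Producer thread sends chunks and unparks the consumer after each one;
/// the consumer (the calling thread) drains the channel and parks when empty.
fn park_unpark_transfer(chunks: Vec<Vec<u8>>) -> Vec<Vec<u8>> {
    let (tx, rx) = channel::<Vec<u8>>();
    let consumer = thread::current();
    let producer = thread::spawn(move || {
        for chunk in chunks {
            tx.send(chunk).expect("consumer dropped the receiver");
            consumer.unpark(); // wake the consumer after every chunk
        }
        drop(tx); // signal end of stream...
        consumer.unpark(); // ...and wake the consumer so it observes it
    });
    let mut received = Vec::new();
    loop {
        match rx.try_recv() {
            Ok(chunk) => received.push(chunk),
            // Nothing queued: sleep until unparked. park() may also return
            // spuriously, in which case the loop simply re-checks.
            Err(TryRecvError::Empty) => thread::park(),
            Err(TryRecvError::Disconnected) => break,
        }
    }
    producer.join().expect("producer panicked");
    received
}
```

The final `unpark` after `drop(tx)` matters. Without it, a consumer that parked just before the sender was dropped would never wake to see the disconnect.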
Framing
Framing turns a raw byte stream (possibly delivered in arbitrary chunks) into a stream of complete logical frames. All framing flows have type Flow<Vec<u8>, Vec<u8>> and compose with any byte source.
Delimiter framing
```rust
use datum::{Framing, Sink, Source};

// Framing::delimiter(delimiter, max_frame_length, allow_truncation)
// splits a byte stream on a byte sequence and strips the delimiter.
// allow_truncation = true emits a partial final frame when the stream ends
// without a terminating delimiter; false would fail with StreamError.
let chunks: Vec<Vec<u8>> = vec![
    b"hello\nworld\n".to_vec(),
    b"foo".to_vec(), // no trailing newline: emitted as a truncated frame
];
let frames: Vec<Vec<u8>> = Source::from_iter(chunks)
    .via(Framing::delimiter(b"\n".to_vec(), 256, true))
    .run_with(Sink::collect())
    .unwrap()
    .wait()
    .unwrap();
assert_eq!(frames, vec![b"hello".to_vec(), b"world".to_vec(), b"foo".to_vec()]);
```

allow_truncation:
- true: emit the remaining buffer as a final frame when the stream ends without a terminating delimiter.
- false: fail the stream with StreamError if the stream ends mid-frame.
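The incremental logic behind delimiter framing can be sketched in plain Rust. `DelimiterFramer` is a hypothetical type, not Datum's implementation. It buffers bytes across chunk boundaries, emits each frame with the delimiter stripped, and applies the allow_truncation rule when upstream finishes. Errors are plain `String`s here rather than StreamError.

```rust
/// Hypothetical std-only delimiter framer.
struct DelimiterFramer {
    delimiter: Vec<u8>,
    max_frame_length: usize,
    buf: Vec<u8>,
}

/// Position of the first occurrence of `needle` in `haystack`, if any.
fn find(haystack: &[u8], needle: &[u8]) -> Option<usize> {
    haystack.windows(needle.len()).position(|w| w == needle)
}

impl DelimiterFramer {
    fn new(delimiter: &[u8], max_frame_length: usize) -> Self {
        assert!(!delimiter.is_empty(), "delimiter must be non-empty");
        DelimiterFramer { delimiter: delimiter.to_vec(), max_frame_length, buf: Vec::new() }
    }

    /// Feed one upstream chunk; returns every frame it completes.
    fn push(&mut self, chunk: &[u8]) -> Result<Vec<Vec<u8>>, String> {
        self.buf.extend_from_slice(chunk);
        let mut frames: Vec<Vec<u8>> = Vec::new();
        while let Some(pos) = find(&self.buf, &self.delimiter) {
            if pos > self.max_frame_length {
                return Err(format!("frame of {} bytes exceeds {}", pos, self.max_frame_length));
            }
            let frame: Vec<u8> = self.buf.drain(..pos).collect();
            self.buf.drain(..self.delimiter.len()); // strip the delimiter
            frames.push(frame);
        }
        // The tail may end with a partial delimiter, so allow room for it.
        if self.buf.len() > self.max_frame_length + self.delimiter.len() - 1 {
            return Err(format!("no delimiter within {} bytes", self.max_frame_length));
        }
        Ok(frames)
    }

    /// Upstream finished: flush the tail according to `allow_truncation`.
    fn finish(self, allow_truncation: bool) -> Result<Option<Vec<u8>>, String> {
        if self.buf.is_empty() {
            Ok(None)
        } else if allow_truncation {
            Ok(Some(self.buf))
        } else {
            Err("stream ended mid-frame".to_string())
        }
    }
}
```

A multi-byte delimiter such as `\r\n` can be split across two chunks. The framer still finds it because it searches the accumulated buffer, not each chunk in isolation.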
JSON object framing
```rust
use datum::{Framing, Sink, Source};

// Framing::json(max_object_length) splits concatenated JSON objects.
// Input chunks may split anywhere, even inside a key string.
let input: Vec<Vec<u8>> = vec![b"{\"a\":1}{\"b\"".to_vec(), b":2}".to_vec()];
let frames: Vec<Vec<u8>> = Source::from_iter(input)
    .via(Framing::json(1024))
    .run_with(Sink::collect())
    .unwrap()
    .wait()
    .unwrap();
assert_eq!(frames, vec![b"{\"a\":1}".to_vec(), b"{\"b\":2}".to_vec()]);
```

Framing::json(max_object_length) extracts top-level JSON objects from a concatenated byte stream. It handles objects split across arbitrary chunk boundaries and skips outer array brackets and commas, so it accepts both {...}{...} and [{...},{...}] input.
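One way to find object boundaries across arbitrary chunk splits is a small byte-level state machine. This sketch is an assumption about the technique, not Datum's scanner. The hypothetical `JsonObjectFramer` tracks brace depth, ignores braces inside strings, and honours backslash escapes. Between objects it skips array brackets, commas and whitespace. It does not validate JSON.

```rust
/// Hypothetical std-only sketch of top-level JSON object framing.
struct JsonObjectFramer {
    max_object_length: usize,
    buf: Vec<u8>,
    depth: usize,
    in_string: bool,
    escaped: bool,
}

impl JsonObjectFramer {
    fn new(max_object_length: usize) -> Self {
        JsonObjectFramer { max_object_length, buf: Vec::new(), depth: 0, in_string: false, escaped: false }
    }

    /// Feed one chunk; state carries over, so objects may span chunks.
    fn push(&mut self, chunk: &[u8]) -> Result<Vec<Vec<u8>>, String> {
        let mut frames: Vec<Vec<u8>> = Vec::new();
        for &b in chunk {
            if self.depth == 0 {
                // Between objects: skip outer array brackets, commas and whitespace.
                match b {
                    b'{' => {
                        self.depth = 1;
                        self.buf.push(b);
                    }
                    b'[' | b']' | b',' | b' ' | b'\t' | b'\r' | b'\n' => {}
                    other => return Err(format!("unexpected byte {:?} between objects", other as char)),
                }
                continue;
            }
            self.buf.push(b);
            if self.buf.len() > self.max_object_length {
                return Err(format!("object exceeds {} bytes", self.max_object_length));
            }
            if self.in_string {
                // Braces inside strings do not count; an escaped quote does not end the string.
                if self.escaped {
                    self.escaped = false;
                } else if b == b'\\' {
                    self.escaped = true;
                } else if b == b'"' {
                    self.in_string = false;
                }
            } else {
                match b {
                    b'"' => self.in_string = true,
                    b'{' => self.depth += 1,
                    b'}' => {
                        self.depth -= 1;
                        if self.depth == 0 {
                            frames.push(std::mem::take(&mut self.buf)); // object complete
                        }
                    }
                    _ => {}
                }
            }
        }
        Ok(frames)
    }
}
```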
Framing methods
| Method | Splits on |
|---|---|
| Framing::delimiter(delimiter, max_len, allow_truncation) | Byte sequence; strips the delimiter |
| Framing::length_field(field_len, field_offset, max_len, byte_order) | Length-prefixed frames; field_len is 1–4 bytes |
| Framing::json(max_len) | Top-level JSON objects |
FramingByteOrder::BigEndian and FramingByteOrder::LittleEndian control how length_field interprets the length header.
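Length-field framing can be sketched in the same incremental style. Everything here is hypothetical: `ByteOrder` is a local stand-in for FramingByteOrder, and `LengthFieldFramer` is not Datum's type. The sketch also assumes that the header value counts only the payload bytes after the header, and that emitted frames keep the header. The text above does not say which convention Datum uses.

```rust
/// Local stand-in for the two FramingByteOrder variants.
#[derive(Clone, Copy)]
enum ByteOrder {
    BigEndian,
    LittleEndian,
}

/// Decodes a 1..=4 byte unsigned length field.
fn decode_len(field: &[u8], order: ByteOrder) -> usize {
    let mut n = 0usize;
    match order {
        ByteOrder::BigEndian => {
            for &b in field {
                n = (n << 8) | b as usize;
            }
        }
        ByteOrder::LittleEndian => {
            for &b in field.iter().rev() {
                n = (n << 8) | b as usize;
            }
        }
    }
    n
}

/// Hypothetical length-prefixed framer (header convention is an assumption).
struct LengthFieldFramer {
    field_len: usize,
    field_offset: usize,
    max_len: usize,
    order: ByteOrder,
    buf: Vec<u8>,
}

impl LengthFieldFramer {
    fn new(field_len: usize, field_offset: usize, max_len: usize, order: ByteOrder) -> Self {
        assert!((1..=4).contains(&field_len), "field_len must be 1-4 bytes");
        LengthFieldFramer { field_len, field_offset, max_len, order, buf: Vec::new() }
    }

    fn push(&mut self, chunk: &[u8]) -> Result<Vec<Vec<u8>>, String> {
        self.buf.extend_from_slice(chunk);
        let header_end = self.field_offset + self.field_len;
        let mut frames: Vec<Vec<u8>> = Vec::new();
        // A partial header waits for more input.
        while self.buf.len() >= header_end {
            let payload = decode_len(&self.buf[self.field_offset..header_end], self.order);
            let frame_len = header_end + payload;
            if frame_len > self.max_len {
                return Err(format!("frame of {} bytes exceeds {}", frame_len, self.max_len));
            }
            if self.buf.len() < frame_len {
                break; // payload not complete yet
            }
            frames.push(self.buf.drain(..frame_len).collect());
        }
        Ok(frames)
    }
}
```

The frame-size check happens as soon as the header is decoded, before the payload arrives. An oversized frame therefore fails the stream without first being buffered.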
Performance. Delimiter framing: 1.46x Akka, 3.18x lower allocation. JSON framing: at parity (1.00x); no gap to close.
Compression
Compression flows compress or decompress a Vec<u8> byte stream in-process using flate2. All four variants return Flow<Vec<u8>, Vec<u8>>.
| Flow | Direction |
|---|---|
| Compression::gzip() | Compress (gzip format) |
| Compression::gunzip() | Decompress (gzip format) |
| Compression::deflate() | Compress (zlib/deflate format) |
| Compression::inflate() | Decompress (zlib/deflate format) |
```rust
use datum::{Compression, Sink, Source};

// Compress a byte stream with gzip, then decompress it back.
let compressed: Vec<Vec<u8>> = Source::from_iter([b"hello ".to_vec(), b"world".to_vec()])
    .via(Compression::gzip())
    .run_with(Sink::collect())
    .unwrap()
    .wait()
    .unwrap();
let recovered: Vec<u8> = Source::from_iter(compressed)
    .via(Compression::gunzip())
    .run_with(Sink::collect())
    .unwrap()
    .wait()
    .unwrap()
    .into_iter()
    .flatten()
    .collect();
assert_eq!(recovered, b"hello world");
```

Compression and decompression flows compose directly with framing. For example, this pipeline gzips a newline-delimited log file after framing it by line:
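```rust
use datum::{Compression, FileIO, Framing, Sink};

// Framing::delimiter strips each "\n", so the gzip stream contains the
// lines concatenated without separators.
let compressed_lines: Vec<Vec<u8>> = FileIO::from_path_default("events.log")
    .via(Framing::delimiter(b"\n".to_vec(), 4096, true))
    .via(Compression::gzip())
    .run_with(Sink::collect())
    .unwrap()
    .wait()
    .unwrap();
```

StreamConverters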
StreamConverters wraps any std::io::Read or std::io::Write implementation as a Source<Vec<u8>> or Sink<Vec<u8>, StreamCompletion<NotUsed>>. This is the primary extension point for custom IO: HTTP response bodies, in-memory cursors, pipes, and any other Read/Write all work without extra adapters.
The factory closure is called at materialization, not at blueprint construction. Building the source or sink has no side effects.
- StreamConverters::from_reader(factory, chunk_size) → Source<Vec<u8>>, where factory: Fn() -> std::io::Result<R> + Send + Sync + 'static and R: std::io::Read + Send + 'static
- StreamConverters::to_writer(factory) → Sink<Vec<u8>, StreamCompletion<NotUsed>>, where factory: Fn() -> std::io::Result<W> + Send + Sync + 'static and W: std::io::Write + Send + 'static

```rust
use datum::{Sink, StreamConverters};
use std::io::Cursor;

// StreamConverters::from_reader(factory, chunk_size) wraps any std::io::Read.
// The factory is called at materialization (blueprint-safe, not at construction).
let chunks: Vec<Vec<u8>> = StreamConverters::from_reader(
    || Ok(Cursor::new(b"hello world".to_vec())),
    4, // emit at most 4 bytes per chunk
)
.run_with(Sink::collect())
.unwrap()
.wait()
.unwrap();
let combined: Vec<u8> = chunks.into_iter().flatten().collect();
assert_eq!(combined, b"hello world");
```

The writer sink flushes exactly once, either on stream completion or on error, and drops the writer after flushing. Upstream errors are forwarded to the materialized StreamCompletion.
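The "factory at materialization" rule can be illustrated without Datum. In the sketch below, `LazyReaderSource` is a hypothetical blueprint type. Constructing it only stores the closure. The closure is called once, inside `run`, which plays the role of materialization, and `run` then reads chunks of at most `chunk_size` bytes. The Send + Sync bounds from the real signature are dropped for brevity.

```rust
use std::io::{self, Read};

/// Hypothetical blueprint: stores the factory but opens nothing until `run`.
struct LazyReaderSource<F> {
    factory: F,
    chunk_size: usize,
}

impl<F, R> LazyReaderSource<F>
where
    F: Fn() -> io::Result<R>,
    R: Read,
{
    fn new(factory: F, chunk_size: usize) -> Self {
        // No side effects here: the factory is only stored.
        LazyReaderSource { factory, chunk_size }
    }

    /// "Materialize": call the factory, then read chunks until EOF.
    fn run(&self) -> io::Result<Vec<Vec<u8>>> {
        let mut reader = (self.factory)()?;
        let mut buf = vec![0u8; self.chunk_size];
        let mut chunks = Vec::new();
        loop {
            let n = match reader.read(&mut buf) {
                Ok(0) => return Ok(chunks),
                Ok(n) => n,
                Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
                Err(e) => return Err(e),
            };
            chunks.push(buf[..n].to_vec());
        }
    }
}
```

Because each `run` calls the factory again, the same blueprint can be materialized repeatedly, and each run gets a fresh reader.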
TCP
TokioTcp provides plain TCP server and client sources backed by Tokio's async runtime. TLS and UDP are outside the current scope.
Binding a server
TokioTcp::bind_default(addr) returns a Source<TcpIncomingConnection, StreamCompletion<TcpBinding>>. The materialized TcpBinding is available as soon as the listener is bound — before any connections are accepted. Each element in the source is one accepted TcpIncomingConnection.
```rust
use datum::{Keep, Sink, TokioTcp};

// Bind on an OS-assigned port.
let (binding_completion, incoming) = TokioTcp::bind_default("127.0.0.1:0")
    .to_mat(Sink::head(), Keep::both)
    .run()
    .unwrap();
let binding = binding_completion.wait().unwrap();
println!("Listening on {}", binding.local_addr());

// Accept one connection (this blocks until a client connects) and split it
// into an independent byte source and sink.
let conn = incoming.wait().unwrap();
let (source, sink) = conn.into_parts();
// source: TokioByteSource, reads bytes from the connection
// sink: TokioByteSink, writes bytes to the connection
```

TcpIncomingConnection methods:
| Method | Returns |
|---|---|
| .local_addr() | SocketAddr of the server side |
| .remote_addr() | SocketAddr of the connected client |
| .connection() | TcpConnection metadata struct |
| .into_parts() | (TokioByteSource, TokioByteSink): independent halves |
| .into_flow() | Flow<Vec<u8>, Vec<u8>, NotUsed>: coupled echo flow |
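The standard library has a similar shape to into_parts. `TcpStream::try_clone` returns a second handle to the same socket, and `io::copy` from one handle into the other echoes the connection. The sketch below uses blocking `std::net` rather than Datum or Tokio. `echo_once` is a hypothetical helper that binds an OS-assigned port, accepts one client, and echoes it back.

```rust
use std::io::{self, Read, Write};
use std::net::{Shutdown, TcpListener, TcpStream};
use std::thread;

/// Binds 127.0.0.1:0, echoes one connection, and returns what the client got back.
fn echo_once(payload: &[u8]) -> io::Result<Vec<u8>> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?; // known before any connection is accepted

    let server = thread::spawn(move || -> io::Result<u64> {
        let (stream, _peer) = listener.accept()?;
        let mut read_half = stream.try_clone()?; // independent handle on the same socket
        let mut write_half = stream;
        // Copy until the client half-closes its write side, then close ours.
        let copied = io::copy(&mut read_half, &mut write_half)?;
        write_half.shutdown(Shutdown::Write)?;
        Ok(copied)
    });

    let mut client = TcpStream::connect(addr)?;
    client.write_all(payload)?;
    client.shutdown(Shutdown::Write)?; // tell the server no more input is coming
    let mut echoed = Vec::new();
    client.read_to_end(&mut echoed)?;
    server.join().expect("server thread panicked")?; // surface server-side I/O errors
    Ok(echoed)
}
```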
Outgoing connections
TokioTcp::outgoing_connection_default(addr) returns Flow<Vec<u8>, Vec<u8>, StreamCompletion<TcpConnection>>. The upstream input is written to the socket; bytes received from the socket are emitted downstream. The materialized TcpConnection carries the local and remote addresses.
```rust
use datum::{Keep, Sink, Source, TokioTcp};

// Send "ping" to a server and collect whatever it echoes back.
let (conn_completion, response) = Source::single(b"ping".to_vec())
    .via(TokioTcp::outgoing_connection_default("127.0.0.1:8080"))
    .to_mat(Sink::collect(), Keep::both)
    .run()
    .unwrap();
let bytes: Vec<u8> = response.wait().unwrap().into_iter().flatten().collect();
let conn = conn_completion.wait().unwrap();
println!("Connected {} → {}, received {} bytes", conn.local_addr(), conn.remote_addr(), bytes.len());
```

Minimal echo server
```rust
use datum::{Keep, Sink, TokioTcp};

// Bind the server on an OS-assigned port.
let (binding_completion, first_incoming) = TokioTcp::bind_default("127.0.0.1:0")
    .to_mat(Sink::head(), Keep::both)
    .run()
    .unwrap();
let binding = binding_completion.wait().unwrap();
println!("Echo server listening on {}", binding.local_addr());

// Accept one connection and echo everything back by running the
// connection's read half straight into its write half.
let conn = first_incoming.wait().unwrap();
let (source, sink) = conn.into_parts();
source
    .run_with(sink)
    .unwrap()
    .wait()
    .ok(); // ignore errors from an abrupt client disconnect
```

Performance. TCP echo round-trip (64-byte payload): 1.70x Akka wall-clock (205 µs vs 350 µs). CPU time is 600 µs, significantly higher than wall. This is tracked and accepted: 64-byte items complete within the 256-yield spin window, so the consumer thread is still spinning when unpark fires. Reducing the spin budget caused a measured ~4x wall-clock regression. The lever for this trade-off is the spin budget constant, READ_READY_SPINS.
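The CPU-above-wall trade-off comes from a spin-then-park wait, which can be sketched with std primitives. The consumer first yields up to a fixed number of times while polling a ready flag. Only after that does it park. The names `SPIN_YIELDS`, `wait_ready` and `signal_ready` are hypothetical. The value 256 mirrors the spin window quoted above, but this is not Datum's READ_READY_SPINS code.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;

/// Spin budget for this sketch (mirrors the 256-yield window described above).
const SPIN_YIELDS: u32 = 256;

/// Waits until `ready` is set. Returns true if the flag was seen while
/// spinning, false if the thread had to park.
fn wait_ready(ready: &AtomicBool) -> bool {
    for _ in 0..SPIN_YIELDS {
        if ready.load(Ordering::Acquire) {
            return true; // fast path: costs CPU but skips a park/unpark round trip
        }
        thread::yield_now();
    }
    // Slow path: sleep until unparked; loop because park() can wake spuriously.
    while !ready.load(Ordering::Acquire) {
        thread::park();
    }
    false
}

/// Producer side: publish the flag, then unpark the waiting thread.
fn signal_ready(ready: &AtomicBool, waiter: &thread::Thread) {
    ready.store(true, Ordering::Release);
    waiter.unpark();
}
```

A larger budget makes short items (such as 64-byte echoes) land on the fast path. That shortens wall time, but the spin loop adds CPU time. A smaller budget saves CPU, but each item then pays the park/unpark latency.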
Performance summary
| Scenario | Datum wall µs/op | Datum CPU µs/op | Akka wall µs/op | Speedup (wall) | Notes |
|---|---|---|---|---|---|
| delimiter_framing_lines_10k | 919 | 875 | 1,343 | 1.46x | 3.18x lower alloc |
| json_framing_objects_10k | 1,065 | 1,125 | 1,065 | 1.00x | at parity |
| file_source_256k_chunk8192 | 107 | 100 | 236 | 2.21x | sync FileIO |
| file_sink_256k_chunk8192 | 1,435 | 500 | 3,667 | 2.55x | sync FileIO |
| tokio_file_source_256k_chunk8192 | 159 | 200 | 231 | 1.46x | CPU > wall: tracked-acceptable (two-thread model) |
| tokio_file_sink_256k_chunk8192 | 2,032 | 2,300 | 3,604 | 1.77x | |
| tcp_echo_roundtrip_64b | 205 | 600 | 350 | 1.70x | CPU > wall: tracked-acceptable (spin budget) |
Next steps
- Futures Interop: integrating async/await with streams
- Execution Model: how the thread pool, fused paths, and spin-then-park interact