Streaming Output

SyntheticData provides streaming output sinks for real-time data generation, allowing large datasets to be exported without ever holding the full dataset in memory.

Overview

The streaming module in datasynth-output implements the StreamingSink trait for four output formats:

| Sink | Description | File Extension |
|------|-------------|----------------|
| CsvStreamingSink | CSV with automatic headers | .csv |
| JsonStreamingSink | Pretty-printed JSON arrays | .json |
| NdjsonStreamingSink | Newline-delimited JSON | .jsonl / .ndjson |
| ParquetStreamingSink | Apache Parquet columnar | .parquet |

All streaming sinks accept StreamEvent values through the process() method:

pub enum StreamEvent<T> {
    Data(T),       // A data record to write
    Flush,         // Force flush to disk
    Close,         // Close the stream
}

StreamingSink Trait

All streaming sinks implement:

pub trait StreamingSink<T: Serialize + Send> {
    /// Process a single stream event (data, flush, or close).
    fn process(&mut self, event: StreamEvent<T>) -> SynthResult<()>;

    /// Close the stream and flush remaining data.
    fn close(&mut self) -> SynthResult<()>;

    /// Return the number of items written so far.
    fn items_written(&self) -> u64;

    /// Return the number of bytes written so far.
    fn bytes_written(&self) -> u64;
}
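
Custom destinations can implement the same trait. The following is a minimal sketch of a hypothetical in-memory sink written against the signatures above; VecSink, its fields, and the import paths for StreamingSink and SynthResult are illustrative assumptions, not part of datasynth-output.

use datasynth_core::traits::{StreamEvent, StreamingSink}; // assumed paths
use datasynth_core::SynthResult;                          // assumed path
use serde::Serialize;

/// Hypothetical sink that collects serialized records in memory (illustration only).
#[derive(Default)]
pub struct VecSink {
    lines: Vec<String>,
    bytes: u64,
}

impl<T: Serialize + Send> StreamingSink<T> for VecSink {
    fn process(&mut self, event: StreamEvent<T>) -> SynthResult<()> {
        match event {
            StreamEvent::Data(record) => {
                // Serialize each record to one JSON line and track the byte count.
                let line = serde_json::to_string(&record)
                    .expect("generated records are expected to serialize cleanly");
                self.bytes += line.len() as u64;
                self.lines.push(line);
            }
            // Nothing is buffered outside `lines`, so Flush is a no-op here.
            StreamEvent::Flush => {}
            // A file-backed sink would flush buffers and drop its writer here.
            StreamEvent::Close => {}
        }
        Ok(())
    }

    fn close(&mut self) -> SynthResult<()> {
        // Nothing to release for an in-memory sink.
        Ok(())
    }

    fn items_written(&self) -> u64 {
        self.lines.len() as u64
    }

    fn bytes_written(&self) -> u64 {
        self.bytes
    }
}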

When to Use Streaming vs Batch

| Scenario | Recommendation |
|----------|----------------|
| < 100K records | Batch (CsvSink / JsonSink) — simpler API |
| 100K–10M records | Streaming — lower memory footprint |
| > 10M records | Streaming with Parquet — columnar compression |
| Real-time consumers | Streaming NDJSON — line-by-line parsing |
| REST/WebSocket API | Streaming — integrated with server endpoints |

CSV Streaming

use datasynth_output::streaming::CsvStreamingSink;
use datasynth_core::traits::StreamEvent;

let mut sink = CsvStreamingSink::<JournalEntry>::new("output.csv".into())?;

// Write records one at a time (memory efficient)
for entry in generate_entries() {
    sink.process(StreamEvent::Data(entry))?;
}

// Periodic flush (optional — ensures data is on disk)
sink.process(StreamEvent::Flush)?;

// Close when done
sink.close()?;

println!("Wrote {} items ({} bytes)", sink.items_written(), sink.bytes_written());

Headers are written automatically on the first Data event.
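
For long-running generation, a common pattern is to emit a Flush every fixed number of records so buffered data reaches disk at predictable intervals. A sketch building on the example above (the interval of 10,000 is arbitrary, not a datasynth default):

use datasynth_output::streaming::CsvStreamingSink;
use datasynth_core::traits::StreamEvent;

const FLUSH_INTERVAL: u64 = 10_000; // arbitrary, tune to your durability needs

let mut sink = CsvStreamingSink::<JournalEntry>::new("output.csv".into())?;
for entry in generate_entries() {
    sink.process(StreamEvent::Data(entry))?;
    // Force a write to disk every FLUSH_INTERVAL records to bound buffered data.
    if sink.items_written() % FLUSH_INTERVAL == 0 {
        sink.process(StreamEvent::Flush)?;
    }
}
sink.close()?;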

JSON Streaming

Pretty-printed JSON Array

use datasynth_output::streaming::JsonStreamingSink;

let mut sink = JsonStreamingSink::<JournalEntry>::new("output.json".into())?;
for entry in entries {
    sink.process(StreamEvent::Data(entry))?;
}
sink.close()?;  // Writes closing bracket

Output:

[
  { "document_id": "abc-001", ... },
  { "document_id": "abc-002", ... }
]

Newline-Delimited JSON (NDJSON)

use datasynth_output::streaming::NdjsonStreamingSink;

let mut sink = NdjsonStreamingSink::<JournalEntry>::new("output.jsonl".into())?;
for entry in entries {
    sink.process(StreamEvent::Data(entry))?;
}
sink.close()?;

Output:

{"document_id":"abc-001",...}
{"document_id":"abc-002",...}

NDJSON is ideal for streaming consumers that process records line by line (e.g., jq, Kafka, log aggregators).
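
Because every line is a complete JSON document, the file can be read back with nothing more than a buffered reader and serde_json. A sketch, assuming JournalEntry also derives Deserialize:

use std::fs::File;
use std::io::{BufRead, BufReader};

// Process the NDJSON output one record at a time, without loading the whole file.
let reader = BufReader::new(File::open("output.jsonl")?);
for line in reader.lines() {
    let entry: JournalEntry = serde_json::from_str(&line?)?;
    // ... handle one record ...
}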

Parquet Streaming

Apache Parquet provides columnar compression, making it ideal for large analytical datasets:

use datasynth_output::streaming::ParquetStreamingSink;

let mut sink = ParquetStreamingSink::<JournalEntry>::new("output.parquet".into())?;
for entry in entries {
    sink.process(StreamEvent::Data(entry))?;
}
sink.close()?;

Parquet benefits:

  • Columnar storage: Efficient for analytical queries that touch few columns
  • Built-in compression: Snappy, Gzip, or Zstd per column group
  • Schema embedding: Self-describing files with full type information
  • Predicate pushdown: Query engines can skip irrelevant row groups
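
Because the schema is embedded, the output can be verified with any standard Parquet reader. For example, a quick metadata check with the upstream parquet crate (not a datasynth API; shown only to confirm the file is self-describing):

use std::fs::File;
use parquet::file::reader::{FileReader, SerializedFileReader};

// Read the footer metadata that ParquetStreamingSink embedded in the file.
let reader = SerializedFileReader::new(File::open("output.parquet")?)?;
let metadata = reader.metadata();
println!(
    "{} rows in {} row group(s)",
    metadata.file_metadata().num_rows(),
    metadata.num_row_groups()
);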

Configuration

Streaming output is enabled when using the server API or when the runtime detects memory pressure:

output:
  format: csv           # csv, json, jsonl, parquet
  streaming: true       # Enable streaming mode
  compression: none     # none, gzip, zstd (CSV/JSON) or snappy/gzip/zstd (Parquet)
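
For orientation, the output block above maps onto a small set of fields. The struct below is a hypothetical serde mirror of those keys, not the actual datasynth config type, which may be named and nested differently:

use serde::Deserialize;

// Hypothetical mirror of the `output` section shown above.
#[derive(Debug, Deserialize)]
struct OutputConfig {
    format: String,      // "csv", "json", "jsonl", or "parquet"
    streaming: bool,     // enable streaming sinks instead of batch sinks
    compression: String, // "none", "gzip", "zstd", or "snappy" (Parquet only)
}

let yaml = "format: csv\nstreaming: true\ncompression: none\n";
let cfg: OutputConfig = serde_yaml::from_str(yaml)?;
assert!(cfg.streaming);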

Server Streaming

The server API uses streaming sinks for the /api/stream/ endpoints:

# Start streaming generation
curl -X POST http://localhost:3000/api/stream/start \
  -H "Content-Type: application/json" \
  -d '{"config": {...}, "format": "ndjson"}'

# WebSocket streaming
wscat -c ws://localhost:3000/ws/events
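
A client can also consume the NDJSON stream from Rust. The sketch below uses reqwest's blocking API and assumes the start endpoint streams NDJSON records in its response body; check the server reference for the actual request/response contract, and substitute a real generation config for the placeholder body.

use std::io::{BufRead, BufReader};

// Hypothetical client: requires reqwest with the "blocking" feature enabled.
let response = reqwest::blocking::Client::new()
    .post("http://localhost:3000/api/stream/start")
    .header("Content-Type", "application/json")
    .body(r#"{"config": {}, "format": "ndjson"}"#) // placeholder config
    .send()?;

// Parse each NDJSON line as it arrives instead of buffering the full response.
for line in BufReader::new(response).lines() {
    let record: serde_json::Value = serde_json::from_str(&line?)?;
    println!("{record}");
}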

Backpressure

Streaming sinks monitor write throughput and provide backpressure signals (a usage sketch follows the list):

  • items_written() / bytes_written(): Track progress for rate limiting
  • Flush events: Force periodic disk writes to bound memory usage
  • Disk space monitoring: The runtime’s DiskGuard can pause generation when disk space runs low
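
As one example of putting the counters to work, the sketch below rotates to a new NDJSON file whenever bytes_written() crosses a size threshold; the threshold and file-naming scheme are illustrative, not datasynth defaults.

use datasynth_output::streaming::NdjsonStreamingSink;
use datasynth_core::traits::StreamEvent;

const MAX_BYTES_PER_FILE: u64 = 256 * 1024 * 1024; // illustrative ~256 MiB cap

let mut part = 0u32;
let mut sink = NdjsonStreamingSink::<JournalEntry>::new(format!("output-{part:04}.jsonl").into())?;

for entry in generate_entries() {
    sink.process(StreamEvent::Data(entry))?;

    // Rotate once the current file grows past the threshold.
    if sink.bytes_written() >= MAX_BYTES_PER_FILE {
        sink.close()?;
        part += 1;
        sink = NdjsonStreamingSink::<JournalEntry>::new(format!("output-{part:04}.jsonl").into())?;
    }
}
sink.close()?;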

Performance

| Format | Throughput | File Size | Use Case |
|--------|------------|-----------|----------|
| CSV | ~150K rows/sec | Largest | Universal compatibility |
| NDJSON | ~120K rows/sec | Large | Streaming consumers |
| JSON | ~100K rows/sec | Large | Human-readable |
| Parquet | ~80K rows/sec | Smallest | Analytics, data lakes |

Throughput varies with record complexity and disk speed.

See Also