datasynth-output

Output sinks for CSV, JSON, and streaming formats.

Overview

datasynth-output provides the output layer for SyntheticData:

  • CSV Sink: High-performance CSV writing with optional compression
  • JSON Sink: JSON and JSONL (newline-delimited) output
  • Streaming: Async streaming output for real-time generation
  • Control Export: Internal control and SoD rule export

Supported Formats

Standard Formats

Format    Description                      Extension
CSV       Standard comma-separated values  .csv
JSON      Pretty-printed JSON arrays       .json
JSONL     Newline-delimited JSON           .jsonl
Parquet   Apache Parquet columnar format   .parquet

ERP Formats

Target ERP    Exporter          Tables
SAP S/4HANA   SapExporter       BKPF, BSEG, ACDOCA, LFA1, KNA1, MARA, CSKS, CEPC
Oracle EBS    OracleExporter    GL_JE_HEADERS, GL_JE_LINES, GL_JE_BATCHES
NetSuite      NetSuiteExporter  Journal entries with subsidiary/multi-book support

Streaming Sinks

Sink                  Description
CsvStreamingSink      Streaming CSV with automatic headers
JsonStreamingSink     Streaming JSON arrays
NdjsonStreamingSink   Streaming newline-delimited JSON
ParquetStreamingSink  Streaming Apache Parquet

Features

  • Configurable compression (gzip, zstd, snappy for Parquet)
  • Streaming writes for memory efficiency with backpressure support
  • ERP-native table schemas (SAP, Oracle, NetSuite)
  • Decimal values serialized as strings (IEEE 754 safe)
  • Configurable field ordering and headers
  • Automatic directory creation

Key Types

OutputConfig

pub struct OutputConfig {
    pub format: OutputFormat,
    pub compression: CompressionType,
    pub compression_level: u32,
    pub include_headers: bool,
    pub decimal_precision: u32,
}

pub enum OutputFormat {
    Csv,
    Json,
    Jsonl,
}

pub enum CompressionType {
    None,
    Gzip,
    Zstd,
}

CsvSink

pub struct CsvSink<T> {
    writer: BufWriter<Box<dyn Write>>,
    config: OutputConfig,
    headers_written: bool,
    _phantom: PhantomData<T>,
}

JsonSink

pub struct JsonSink<T> {
    writer: BufWriter<Box<dyn Write>>,
    format: JsonFormat,
    first_written: bool,
    _phantom: PhantomData<T>,
}
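The first_written flag suggests how a single sink can emit either a JSON array or JSONL. The following is a minimal std-only sketch of that framing logic, not the crate's implementation: Framing, FramedJsonWriter, write_record, and finish are hypothetical names, and records are assumed to be already serialized to JSON text.

```rust
use std::io::{self, Write};

/// Hypothetical framing modes; the crate's own `JsonFormat` may differ.
#[derive(Clone, Copy)]
pub enum Framing {
    /// One JSON array containing every record.
    Array,
    /// One record per line (JSONL).
    Lines,
}

/// Writes records that are already serialized to JSON text.
pub struct FramedJsonWriter<W: Write> {
    writer: W,
    framing: Framing,
    first_written: bool,
}

impl<W: Write> FramedJsonWriter<W> {
    pub fn new(writer: W, framing: Framing) -> Self {
        Self { writer, framing, first_written: false }
    }

    /// Appends one pre-serialized JSON record.
    pub fn write_record(&mut self, json: &str) -> io::Result<()> {
        match self.framing {
            Framing::Array => {
                // Open the array before the first record; separate later ones with commas.
                let prefix = if self.first_written { ",\n" } else { "[\n" };
                self.writer.write_all(prefix.as_bytes())?;
                self.writer.write_all(json.as_bytes())?;
            }
            Framing::Lines => {
                self.writer.write_all(json.as_bytes())?;
                self.writer.write_all(b"\n")?;
            }
        }
        self.first_written = true;
        Ok(())
    }

    /// Closes the array (if any), flushes, and returns the inner writer.
    pub fn finish(mut self) -> io::Result<W> {
        if let Framing::Array = self.framing {
            // An array with no records still has to be valid JSON.
            let tail = if self.first_written { "\n]\n" } else { "[]\n" };
            self.writer.write_all(tail.as_bytes())?;
        }
        self.writer.flush()?;
        Ok(self.writer)
    }
}
```

In array mode the closing bracket can only be written once the caller signals the end of the stream, which is why the sketch has an explicit finish step. JSONL needs no such step, so it tends to suit streaming output better.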

Usage Examples

CSV Output

use datasynth_output::{CompressionType, CsvSink, OutputConfig, OutputFormat};

// Create sink
let config = OutputConfig {
    format: OutputFormat::Csv,
    compression: CompressionType::None,
    include_headers: true,
    ..Default::default()
};

let mut sink = CsvSink::new("output/journal_entries.csv", config)?;

// Write data
sink.write_batch(&entries)?;
sink.flush()?;

Compressed Output

use datasynth_output::{CompressionType, CsvSink, OutputConfig};

let config = OutputConfig {
    compression: CompressionType::Gzip,
    compression_level: 6,
    ..Default::default()
};

let mut sink = CsvSink::new("output/entries.csv.gz", config)?;
sink.write_batch(&entries)?;
// Flush so buffered, compressed data reaches the file
sink.flush()?;

JSON Streaming

use datasynth_output::{JsonSink, OutputConfig, OutputFormat};

let config = OutputConfig {
    format: OutputFormat::Jsonl,
    ..Default::default()
};

let mut sink = JsonSink::new("output/entries.jsonl", config)?;

// Stream writes (memory efficient): one record at a time
for entry in &entries {
    sink.write(entry)?;
}
sink.flush()?;

Control Export

use datasynth_output::ControlExporter;

let exporter = ControlExporter::new("output/controls/");

// Export all control-related data
exporter.export_controls(&internal_controls)?;
exporter.export_sod_rules(&sod_rules)?;
exporter.export_control_mappings(&mappings)?;

Sink Trait Implementation

All sinks implement the Sink trait:

impl<T: Serialize> Sink<T> for CsvSink<T> {
    type Error = OutputError;

    fn write(&mut self, item: &T) -> Result<(), Self::Error> {
        // Single item write
    }

    fn write_batch(&mut self, items: &[T]) -> Result<(), Self::Error> {
        // Batch write for efficiency
    }

    fn flush(&mut self) -> Result<(), Self::Error> {
        // Ensure all data written to disk
    }
}
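The trait definition itself is not shown here. As a rough sketch, a trait of the following shape would be consistent with the impl above. The default write_batch body, LineSink, and export_all are assumptions made for illustration, and the sketch uses Display in place of Serialize so that it needs only the standard library.

```rust
use std::fmt::Display;
use std::io::{self, Write};

/// Hypothetical trait shape, inferred from the `impl` shown above.
pub trait Sink<T> {
    type Error;

    fn write(&mut self, item: &T) -> Result<(), Self::Error>;

    /// Default batch write: forward each item to `write`.
    /// A real sink could override this to amortize per-item overhead.
    fn write_batch(&mut self, items: &[T]) -> Result<(), Self::Error> {
        for item in items {
            self.write(item)?;
        }
        Ok(())
    }

    fn flush(&mut self) -> Result<(), Self::Error>;
}

/// Illustrative sink that writes one `Display`-formatted item per line.
pub struct LineSink<W: Write> {
    writer: W,
}

impl<W: Write> LineSink<W> {
    pub fn new(writer: W) -> Self {
        Self { writer }
    }

    pub fn into_inner(self) -> W {
        self.writer
    }
}

impl<T: Display, W: Write> Sink<T> for LineSink<W> {
    type Error = io::Error;

    fn write(&mut self, item: &T) -> Result<(), Self::Error> {
        writeln!(self.writer, "{}", item)
    }

    fn flush(&mut self) -> Result<(), Self::Error> {
        self.writer.flush()
    }
}

/// Code written against the trait works with any sink implementation.
pub fn export_all<T, S: Sink<T>>(sink: &mut S, items: &[T]) -> Result<(), S::Error> {
    sink.write_batch(items)?;
    sink.flush()
}
```

Writing generators against a function like export_all, rather than against a concrete sink type, would let the same code target CSV, JSON, or an in-memory buffer in tests.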

Decimal Serialization

Financial amounts are serialized as strings to prevent IEEE 754 floating-point issues:

// Assumes the `dec!` macro from the rust_decimal_macros crate
use rust_decimal_macros::dec;

// Internal: Decimal
let amount = dec!(1234.56);

// CSV output: "1234.56" (string)
// JSON output: "1234.56" (string, not number)

This ensures exact decimal representation across all systems.
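To illustrate the problem that string serialization avoids, here is a small std-only sketch. It does not use the crate's Decimal type: format_minor_units and float_sum_is_inexact are hypothetical helpers, and amounts are assumed to be held as integer minor units (for example, cents).

```rust
/// Formats an integer count of minor units (e.g. cents) as a decimal string
/// with `scale` fractional digits, without going through floating point.
/// Assumes `scale` is small enough that 10^scale fits in a u64 (scale <= 19).
pub fn format_minor_units(units: i64, scale: u32) -> String {
    if scale == 0 {
        return units.to_string();
    }
    let base = 10u64.pow(scale);
    let sign = if units < 0 { "-" } else { "" };
    let magnitude = units.unsigned_abs();
    format!(
        "{}{}.{:0width$}",
        sign,
        magnitude / base,
        magnitude % base,
        width = scale as usize
    )
}

/// Returns true when adding 0.1 and 0.2 as f64 does not yield exactly 0.3.
pub fn float_sum_is_inexact() -> bool {
    0.1_f64 + 0.2_f64 != 0.3_f64
}
```

Here format_minor_units(123456, 2) yields "1234.56", and the integer sum 10 + 20 formats as "0.30", whereas float_sum_is_inexact() returns true. A consumer that parses the string into its own decimal type keeps the exact value; one that reads a JSON number into a double may not.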

Performance Tips

Batch Writes

Prefer batch writes over individual writes:

// Good: Single batch write
sink.write_batch(&entries)?;

// Less efficient: Multiple writes
for entry in &entries {
    sink.write(entry)?;
}

Buffer Size

The default buffer size is 8 KB. For very large outputs, consider a larger buffer:

let mut sink = CsvSink::with_buffer_size(
    "output/large.csv",
    config,
    64 * 1024, // 64 KB buffer
)?;
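The effect of buffer size can be observed with the standard library alone. The sketch below is an illustration rather than a benchmark of the crate: CountingWriter and count_writes are hypothetical helpers that count how often a BufWriter of a given capacity hands data to the underlying writer.

```rust
use std::io::{self, BufWriter, Write};

/// Writer that counts how many times the underlying `write` is called.
#[derive(Default)]
pub struct CountingWriter {
    pub calls: usize,
    pub bytes: usize,
}

impl Write for CountingWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.calls += 1;
        self.bytes += buf.len();
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Writes `rows` short lines through a `BufWriter` of the given capacity and
/// returns (underlying write calls, total bytes written).
pub fn count_writes(rows: usize, capacity: usize) -> io::Result<(usize, usize)> {
    let mut writer = BufWriter::with_capacity(capacity, CountingWriter::default());
    for i in 0..rows {
        writeln!(writer, "row-{}", i)?;
    }
    // Push any remaining buffered bytes to the underlying writer.
    writer.flush()?;
    let inner = writer.get_ref();
    Ok((inner.calls, inner.bytes))
}
```

Comparing count_writes(1000, 16) with count_writes(1000, 64 * 1024) shows the same number of bytes reaching the underlying writer in far fewer calls with the larger buffer. When the underlying writer is a file, each of those calls is typically a system call, which is where the savings come from.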

Compression Trade-offs

Compression  Speed    Size      Use Case
None         Fastest  Largest   Development, streaming
Gzip         Medium   Small     General purpose
Zstd         Fast     Smallest  Production, archival

Output Structure

The output module creates an organized directory structure:

output/
├── master_data/
│   ├── vendors.csv
│   └── customers.csv
├── transactions/
│   ├── journal_entries.csv
│   └── acdoca.csv
├── controls/
│   ├── internal_controls.csv
│   └── sod_rules.csv
└── labels/
    └── anomaly_labels.csv
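
Automatic directory creation for a layout like this can be sketched with the standard library. The helpers below, create_with_parents and scaffold_output, are hypothetical and only create empty files; they are not the crate's own functions.

```rust
use std::fs::{self, File};
use std::io;
use std::path::Path;

/// Creates (or truncates) a file, creating missing parent directories first.
pub fn create_with_parents(path: &Path) -> io::Result<File> {
    if let Some(parent) = path.parent() {
        // A bare file name has an empty parent; nothing to create in that case.
        if !parent.as_os_str().is_empty() {
            // `create_dir_all` also succeeds when the directory already exists.
            fs::create_dir_all(parent)?;
        }
    }
    File::create(path)
}

/// Lays out the example tree under `root`, leaving every file empty.
pub fn scaffold_output(root: &Path) -> io::Result<()> {
    for relative in [
        "master_data/vendors.csv",
        "master_data/customers.csv",
        "transactions/journal_entries.csv",
        "transactions/acdoca.csv",
        "controls/internal_controls.csv",
        "controls/sod_rules.csv",
        "labels/anomaly_labels.csv",
    ] {
        create_with_parents(&root.join(relative))?;
    }
    Ok(())
}
```

Because create_dir_all tolerates existing directories, calling scaffold_output twice on the same root is safe. Existing files are truncated, though, since File::create is used.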

Error Handling

pub enum OutputError {
    IoError(std::io::Error),
    SerializationError(String),
    CompressionError(String),
    DirectoryCreationError(PathBuf),
}
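
For the ? operator in the usage examples to work with an error type like this, it usually needs a From conversion and, conventionally, Display and Error implementations. The sketch below repeats the enum so that it is self-contained. The trait implementations, the message wording, and read_len are assumptions, not the crate's code.

```rust
use std::fmt;
use std::io;
use std::path::PathBuf;

/// Same variants as listed above, repeated so the sketch compiles on its own.
#[derive(Debug)]
pub enum OutputError {
    IoError(io::Error),
    SerializationError(String),
    CompressionError(String),
    DirectoryCreationError(PathBuf),
}

impl fmt::Display for OutputError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            OutputError::IoError(e) => write!(f, "I/O error: {}", e),
            OutputError::SerializationError(msg) => write!(f, "serialization failed: {}", msg),
            OutputError::CompressionError(msg) => write!(f, "compression failed: {}", msg),
            OutputError::DirectoryCreationError(path) => {
                write!(f, "could not create directory {}", path.display())
            }
        }
    }
}

impl std::error::Error for OutputError {}

/// Assumed conversion so that `?` can lift `io::Error` into `OutputError`.
impl From<io::Error> for OutputError {
    fn from(e: io::Error) -> Self {
        OutputError::IoError(e)
    }
}

/// Hypothetical caller: a failed read surfaces as `OutputError::IoError`.
pub fn read_len(path: &str) -> Result<usize, OutputError> {
    let bytes = std::fs::read(path)?; // io::Error -> OutputError via `From`
    Ok(bytes.len())
}
```

Callers can then match on the variant to decide whether to retry (I/O), fix the data (serialization), or fix the configuration (compression, directory creation).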
