datasynth-runtime
Runtime orchestration, parallel execution, and memory management.
Overview
datasynth-runtime provides the execution layer for SyntheticData:
- GenerationOrchestrator: Coordinates the complete generation workflow
- Parallel Execution: Multi-threaded generation with Rayon
- Memory Management: Integration with memory guard for OOM prevention
- Progress Tracking: Real-time progress reporting with pause/resume
Key Components
| Component | Description |
|---|---|
| GenerationOrchestrator | Main workflow coordinator |
| EnhancedOrchestrator | Extended orchestrator with all enterprise features |
| ParallelExecutor | Thread pool management |
| ProgressTracker | Progress bars and status reporting |
Generation Workflow
The orchestrator executes phases in order:
1. Initialize: Load configuration and validate settings
2. Master Data: Generate vendors, customers, materials, and assets
3. Opening Balances: Create a coherent opening balance sheet
4. Transactions: Generate journal entries with document flows
5. Period Close: Run monthly, quarterly, and annual close processes
6. Anomalies: Inject configured anomalies and data quality issues
7. Export: Write outputs and generate ML labels
8. Banking: Generate KYC/AML data (if enabled)
9. Audit: Generate ISA-compliant audit data (if enabled)
10. Graphs: Build and export ML graphs (if enabled)
11. LLM Enrichment: Enrich data with LLM-generated metadata (v0.5.0, if enabled)
12. Diffusion Enhancement: Blend diffusion model outputs (v0.5.0, if enabled)
13. Causal Overlay: Apply causal structure (v0.5.0, if enabled)
14. S2C Sourcing: Generate the Source-to-Contract procurement pipeline (v0.6.0, if enabled)
15. Financial Reporting: Generate bank reconciliations and financial statements (v0.6.0, if enabled)
16. HR Data: Generate payroll runs, time entries, and expense reports (v0.6.0, if enabled)
17. Accounting Standards: Generate revenue recognition and impairment data (v0.6.0, if enabled)
18. Manufacturing: Generate production orders, quality inspections, and cycle counts (v0.6.0, if enabled)
19. Sales/KPIs/Budgets: Generate sales quotes, management KPIs, and budget variance data (v0.6.0, if enabled)
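The fixed ordering with optional, flag-gated phases can be pictured as a loop over an ordered list. The following is a minimal sketch using only the standard library; the `Phase` struct and `run_phases` function are hypothetical illustrations, not the crate's API.

```rust
/// One step of the workflow (hypothetical type, for illustration only).
struct Phase {
    name: &'static str,
    /// Core phases are always on; the "(if enabled)" phases come from config flags.
    enabled: bool,
}

/// Runs enabled phases strictly in list order and returns the names of the
/// phases that actually executed. Disabled phases are skipped, never reordered.
fn run_phases(phases: &[Phase]) -> Vec<&'static str> {
    let mut executed = Vec::new();
    for phase in phases {
        if !phase.enabled {
            continue;
        }
        // A real orchestrator would perform the phase's work here and could
        // abort the loop by returning an error.
        executed.push(phase.name);
    }
    executed
}

fn main() {
    let phases = [
        Phase { name: "Initialize", enabled: true },
        Phase { name: "Master Data", enabled: true },
        Phase { name: "Transactions", enabled: true },
        Phase { name: "Banking", enabled: false },
        Phase { name: "Graphs", enabled: true },
    ];
    // prints ["Initialize", "Master Data", "Transactions", "Graphs"]
    println!("{:?}", run_phases(&phases));
}
```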
Key Types
GenerationOrchestrator
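```rust
use std::sync::Arc;

// Config, MasterDataState, DocumentState, BalanceState, JournalEntry,
// LabeledAnomaly and MemoryGuard are defined elsewhere in the workspace.
pub struct GenerationOrchestrator {
    config: Config,
    state: GenerationState,
    progress: Arc<ProgressTracker>,
    memory_guard: MemoryGuard,
}

/// Everything produced so far, accumulated across phases.
pub struct GenerationState {
    pub master_data: MasterDataState,
    pub entries: Vec<JournalEntry>,
    pub documents: DocumentState,
    pub balances: BalanceState,
    pub anomaly_labels: Vec<LabeledAnomaly>,
}
```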
ProgressTracker
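```rust
use std::sync::atomic::{AtomicBool, AtomicU64};
use std::time::{Duration, Instant};

/// Shared, thread-safe tracker updated by the workers.
pub struct ProgressTracker {
    pub current: AtomicU64,
    pub total: u64,
    pub phase: String,
    pub paused: AtomicBool,
    pub start_time: Instant,
}

/// Point-in-time snapshot handed to progress callbacks.
pub struct Progress {
    pub current: u64,
    pub total: u64,
    pub percent: f64,
    pub phase: String,
    pub entries_per_second: f64,
    pub elapsed: Duration,
    pub estimated_remaining: Duration,
}
```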
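The derived fields of `Progress` follow from the raw counter and the elapsed time. The sketch below shows one plausible way to compute them, assuming constant throughput for the remaining-time estimate; the `snapshot` function is hypothetical and the `phase` field is omitted for brevity.

```rust
use std::time::Duration;

/// Mirrors the numeric fields of `Progress` listed above.
#[allow(dead_code)]
#[derive(Debug)]
struct Progress {
    current: u64,
    total: u64,
    percent: f64,
    entries_per_second: f64,
    elapsed: Duration,
    estimated_remaining: Duration,
}

/// Builds a snapshot from a raw counter and the elapsed wall-clock time.
/// Remaining time assumes the average rate so far stays constant.
fn snapshot(current: u64, total: u64, elapsed: Duration) -> Progress {
    let secs = elapsed.as_secs_f64();
    let percent = if total == 0 { 100.0 } else { current as f64 / total as f64 * 100.0 };
    let rate = if secs > 0.0 { current as f64 / secs } else { 0.0 };
    let remaining_entries = total.saturating_sub(current) as f64;
    let estimated_remaining = if rate > 0.0 {
        Duration::from_secs_f64(remaining_entries / rate)
    } else {
        Duration::ZERO // no throughput measured yet, so no estimate
    };
    Progress { current, total, percent, entries_per_second: rate, elapsed, estimated_remaining }
}

fn main() {
    let p = snapshot(250, 1_000, Duration::from_secs(10));
    // prints "25.0% at 25/s, ~30s left"
    println!("{:.1}% at {:.0}/s, ~{:?} left", p.percent, p.entries_per_second, p.estimated_remaining);
}
```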
Usage Examples
Basic Generation
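```rust
use datasynth_runtime::GenerationOrchestrator;
// `Config` comes from the workspace's configuration crate (import not shown).

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let config = Config::from_yaml_file("config.yaml")?;
    let mut orchestrator = GenerationOrchestrator::new(config)?;
    // Run the full generation workflow
    orchestrator.run()?;
    Ok(())
}
```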
With Progress Callback
```rust
// `orchestrator` as created in the previous example
orchestrator.run_with_progress(|progress| {
    println!(
        "[{:.1}%] {} - {}/{} ({:.0} entries/sec)",
        progress.percent,
        progress.phase,
        progress.current,
        progress.total,
        progress.entries_per_second,
    );
})?;
```
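A callback-style API like `run_with_progress` typically invokes the closure once per completed unit of work. The sketch below illustrates that pattern with a batch loop; `Report` and this `run_with_progress` are simplified, hypothetical stand-ins, not the crate's implementation.

```rust
/// Simplified progress report passed to the callback (hypothetical subset).
struct Report {
    current: u64,
    total: u64,
    percent: f64,
}

/// Processes `total` items in batches of `batch_size` and calls `on_progress`
/// after each batch. `FnMut` lets the callback mutate captured state.
fn run_with_progress<F: FnMut(&Report)>(total: u64, batch_size: u64, mut on_progress: F) {
    assert!(batch_size > 0, "batch_size must be positive");
    let mut current = 0;
    while current < total {
        // Pretend one batch was generated; the last batch may be partial.
        current = (current + batch_size).min(total);
        let percent = current as f64 / total as f64 * 100.0;
        on_progress(&Report { current, total, percent });
    }
}

fn main() {
    run_with_progress(1_000, 300, |p| {
        println!("[{:.1}%] {}/{}", p.percent, p.current, p.total);
    });
}
```

Taking the callback as a generic `FnMut` keeps the call site as short as the closure-based usage above.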
Parallel Execution
```rust
use datasynth_runtime::ParallelExecutor;

// `config`, `seed`, and `batch_size` are assumed to be defined earlier.
let executor = ParallelExecutor::new(4); // 4 worker threads
let results: Vec<JournalEntry> = executor.run(|thread_id| {
    // A distinct seed per thread keeps the combined output reproducible.
    let mut generator = JournalEntryGenerator::new(config.clone(), seed + thread_id);
    generator.generate_batch(batch_size)
})?;
```
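Seeding each worker with `seed + thread_id` and joining results in thread order makes parallel output reproducible. The crate uses Rayon; to stay dependency-free, this sketch uses `std::thread::scope` instead and substitutes the well-known SplitMix64 PRNG for a real entry generator. `generate_parallel` is a hypothetical name.

```rust
use std::thread;

/// SplitMix64: a small deterministic PRNG standing in for a seeded generator.
fn splitmix64(state: &mut u64) -> u64 {
    *state = state.wrapping_add(0x9E37_79B9_7F4A_7C15);
    let mut z = *state;
    z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
    z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
    z ^ (z >> 31)
}

/// Spawns one worker per thread id, seeds each with `base_seed + thread_id`,
/// and concatenates the batches in thread-id order, so the result is the same
/// on every run regardless of how the OS schedules the threads.
fn generate_parallel(threads: u64, batch_size: usize, base_seed: u64) -> Vec<u64> {
    thread::scope(|s| {
        let handles: Vec<_> = (0..threads)
            .map(|thread_id| {
                s.spawn(move || {
                    let mut state = base_seed.wrapping_add(thread_id);
                    (0..batch_size).map(|_| splitmix64(&mut state)).collect::<Vec<u64>>()
                })
            })
            .collect();
        handles
            .into_iter()
            .flat_map(|h| h.join().expect("worker thread panicked"))
            .collect()
    })
}

fn main() {
    let entries = generate_parallel(4, 1_000, 42);
    println!("generated {} values", entries.len()); // prints "generated 4000 values"
}
```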
Memory-Aware Generation
```rust
use datasynth_runtime::GenerationOrchestrator;
use datasynth_core::memory_guard::MemoryGuardConfig;

let memory_config = MemoryGuardConfig {
    soft_limit: 1024 * 1024 * 1024,     // 1 GiB
    hard_limit: 2 * 1024 * 1024 * 1024, // 2 GiB
    check_interval_ms: 1000,            // check once per second
    ..Default::default()
};
let mut orchestrator = GenerationOrchestrator::with_memory_config(config, memory_config)?;
orchestrator.run()?;
```
Pause/Resume
On Unix systems, generation can be paused and resumed by sending SIGUSR1 to the process; each signal toggles between paused and running:
```bash
# Start generation in the background
datasynth-data generate --config config.yaml --output ./output &
# Send SIGUSR1 to toggle pause (send it again to resume)
kill -USR1 $(pgrep datasynth-data)
# The progress bar shows the pause state:
# [████████░░░░░░░░░░░░] 40% (PAUSED)
```
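Because each SIGUSR1 toggles the state, the handler only needs to flip a shared flag. The sketch below shows that toggle with `AtomicBool::fetch_xor`, a single lock-free operation of the kind that is acceptable inside a signal handler. Registering the handler itself is not shown (it requires a crate such as `signal-hook` or `libc`), and `PauseFlag` is a hypothetical type.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Shared pause state; a SIGUSR1 handler would call `toggle`.
struct PauseFlag(AtomicBool);

impl PauseFlag {
    const fn new() -> Self {
        PauseFlag(AtomicBool::new(false))
    }

    /// Flips the state atomically and returns the new state.
    /// `fetch_xor(true)` returns the previous value, hence the negation.
    fn toggle(&self) -> bool {
        !self.0.fetch_xor(true, Ordering::SeqCst)
    }

    /// Suffix appended to the progress bar.
    fn status_suffix(&self) -> &'static str {
        if self.0.load(Ordering::SeqCst) { " (PAUSED)" } else { "" }
    }
}

fn main() {
    let flag = PauseFlag::new();
    flag.toggle(); // first SIGUSR1: pause
    println!("40%{}", flag.status_suffix()); // prints "40% (PAUSED)"
    flag.toggle(); // second SIGUSR1: resume
    println!("40%{}", flag.status_suffix()); // prints "40%"
}
```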
Programmatic Pause/Resume
```rust
// Pause
orchestrator.pause();

// Check state
if orchestrator.is_paused() {
    println!("Generation paused");
}

// Resume
orchestrator.resume();
```
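On the worker side, a pause typically takes effect at a checkpoint between batches. The sketch below is one way to wire `pause`, `resume`, and `is_paused` to such a checkpoint; `Control` and `worker` are hypothetical names, and the polling sleep is a simplifying choice (a `Condvar` would avoid periodic wake-ups).

```rust
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Minimal controller exposing pause/resume/is_paused (hypothetical).
#[derive(Default)]
struct Control {
    paused: AtomicBool,
    done: AtomicU64, // batches completed, for observation
}

impl Control {
    fn pause(&self) { self.paused.store(true, Ordering::SeqCst); }
    fn resume(&self) { self.paused.store(false, Ordering::SeqCst); }
    fn is_paused(&self) -> bool { self.paused.load(Ordering::SeqCst) }
}

/// Checks the flag before each batch and sleeps while paused, so a batch
/// already in progress always finishes before the pause takes effect.
fn worker(ctrl: Arc<Control>, batches: u64) {
    for _ in 0..batches {
        while ctrl.is_paused() {
            thread::sleep(Duration::from_millis(10));
        }
        // ... generate one batch here ...
        ctrl.done.fetch_add(1, Ordering::SeqCst);
    }
}

fn main() {
    let ctrl = Arc::new(Control::default());
    ctrl.pause();
    let handle = {
        let ctrl = Arc::clone(&ctrl);
        thread::spawn(move || worker(ctrl, 5))
    };
    thread::sleep(Duration::from_millis(50));
    // Still paused, so no batch can have completed yet.
    assert_eq!(ctrl.done.load(Ordering::SeqCst), 0);
    ctrl.resume();
    handle.join().unwrap();
    println!("completed {} batches", ctrl.done.load(Ordering::SeqCst)); // prints "completed 5 batches"
}
```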
Enhanced Orchestrator
The EnhancedOrchestrator includes additional enterprise features:
```rust
use datasynth_runtime::EnhancedOrchestrator;

let orchestrator = EnhancedOrchestrator::new(config)?;

// Enable all optional features, then run
orchestrator
    .with_document_flows()
    .with_intercompany()
    .with_subledgers()
    .with_fx()
    .with_period_close()
    .with_anomaly_injection()
    .with_graph_export()
    .run()?;
```
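Chaining `with_*` calls directly on `orchestrator` works naturally with a consuming builder, where each method takes `self` by value and returns it. The sketch below illustrates that pattern with a few of the features above; `Features`, `Orchestrator`, and the `run` return type are hypothetical, not the crate's actual definitions.

```rust
/// Hypothetical feature switches toggled by the builder methods.
#[derive(Debug, Default, Clone, Copy, PartialEq)]
struct Features {
    document_flows: bool,
    intercompany: bool,
    fx: bool,
    graph_export: bool,
}

/// Consuming builder: every `with_*` takes `self` and returns it, so calls
/// chain without intermediate bindings and the final `run` consumes the value.
#[derive(Default)]
struct Orchestrator {
    features: Features,
}

impl Orchestrator {
    fn with_document_flows(mut self) -> Self { self.features.document_flows = true; self }
    fn with_intercompany(mut self) -> Self { self.features.intercompany = true; self }
    fn with_fx(mut self) -> Self { self.features.fx = true; self }
    fn with_graph_export(mut self) -> Self { self.features.graph_export = true; self }

    /// Stand-in for generation; reports which features were enabled.
    fn run(self) -> Result<Features, String> {
        Ok(self.features)
    }
}

fn main() {
    let enabled = Orchestrator::default()
        .with_document_flows()
        .with_intercompany()
        .with_fx()
        .with_graph_export()
        .run()
        .expect("run failed");
    println!("{enabled:?}");
}
```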
Enterprise Process Chain Phases (v0.6.0)
The EnhancedOrchestrator supports six new phases for enterprise process chains, controlled by PhaseConfig:
| Phase | Config Flag | Description |
|---|---|---|
| 14 | generate_sourcing | S2C procurement pipeline: spend analysis through supplier scorecards |
| 15 | generate_financial_statements / generate_bank_reconciliation | Financial statements and bank reconciliations |
| 16 | generate_hr | Payroll runs, time entries, expense reports |
| 17 | generate_accounting_standards | Revenue recognition (ASC 606/IFRS 15), impairment testing |
| 18 | generate_manufacturing | Production orders, quality inspections, cycle counts |
| 19 | generate_sales_kpi_budgets | Sales quotes, management KPIs, budget variance analysis |
Each phase is independently enabled and gracefully skips when its dependencies (e.g., master data) are unavailable.
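Skipping instead of failing can be expressed as a per-phase check: disabled phases do nothing, and enabled phases whose prerequisites are missing report a skip. The sketch below uses three of the flag names from the table; the specific dependencies (vendors for sourcing, employees for HR, materials for manufacturing) and the `Available`/`Outcome` types are illustrative assumptions.

```rust
/// Subset of the flags named in the table; the struct itself is a sketch.
#[derive(Default)]
struct PhaseConfig {
    generate_sourcing: bool,
    generate_hr: bool,
    generate_manufacturing: bool,
}

/// Counts of master data produced by earlier phases (hypothetical).
struct Available {
    vendors: usize,
    employees: usize,
    materials: usize,
}

#[derive(Debug, PartialEq)]
enum Outcome {
    Ran,
    Disabled,
    Skipped(&'static str), // names the missing dependency
}

fn run_phase(enabled: bool, dependency_count: usize, dependency: &'static str) -> Outcome {
    if !enabled {
        Outcome::Disabled
    } else if dependency_count == 0 {
        // Missing prerequisites: skip this phase rather than abort the run.
        Outcome::Skipped(dependency)
    } else {
        Outcome::Ran
    }
}

fn run_enterprise_phases(cfg: &PhaseConfig, data: &Available) -> Vec<(&'static str, Outcome)> {
    vec![
        ("sourcing", run_phase(cfg.generate_sourcing, data.vendors, "vendors")),
        ("hr", run_phase(cfg.generate_hr, data.employees, "employees")),
        ("manufacturing", run_phase(cfg.generate_manufacturing, data.materials, "materials")),
    ]
}

fn main() {
    let cfg = PhaseConfig { generate_sourcing: true, generate_hr: true, ..Default::default() };
    let data = Available { vendors: 50, employees: 0, materials: 20 };
    for (name, outcome) in run_enterprise_phases(&cfg, &data) {
        println!("{name}: {outcome:?}");
    }
}
```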
Output Coordination
The orchestrator coordinates output to multiple sinks. It automatically:
1. Creates output directories
2. Writes master data files
3. Writes transaction files
4. Writes subledger files
5. Writes labels for ML
6. Generates graphs (if enabled)
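The output sequence amounts to creating each group's directory before writing into it, in a fixed order. The sketch below writes one empty placeholder file per group with `std::fs`; the directory and file names and the `write_outputs` function are hypothetical, not the crate's real output layout.

```rust
use std::fs;
use std::io;
use std::path::Path;

/// Writes one placeholder file per output group, in the order listed above.
/// Directory and file names are hypothetical.
fn write_outputs(root: &Path, graphs_enabled: bool) -> io::Result<Vec<String>> {
    let mut groups = vec![
        ("master_data", "vendors.csv"),
        ("transactions", "journal_entries.csv"),
        ("subledgers", "ar_open_items.csv"),
        ("labels", "anomaly_labels.csv"),
    ];
    if graphs_enabled {
        groups.push(("graphs", "transaction_graph.json"));
    }
    let mut written = Vec::new();
    for (dir, file) in groups {
        let dir_path = root.join(dir);
        fs::create_dir_all(&dir_path)?; // no-op if the directory already exists
        fs::write(dir_path.join(file), "")?; // real code would stream records here
        written.push(format!("{dir}/{file}"));
    }
    Ok(written)
}

fn main() -> io::Result<()> {
    let root = std::env::temp_dir().join("datasynth_output_sketch");
    for path in write_outputs(&root, true)? {
        println!("wrote {path}");
    }
    fs::remove_dir_all(&root)
}
```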
Error Handling
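```rust
pub enum RuntimeError {
    /// Invalid or inconsistent configuration
    ConfigurationError(ConfigError),
    /// A generator failed during a phase
    GenerationError(String),
    /// Memory usage exceeded the configured limit
    MemoryExceeded { limit: u64, current: u64 },
    /// Writing to an output sink failed
    OutputError(OutputError),
    /// Generation was stopped before completion
    Interrupted,
}
```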
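An error enum like this becomes usable with `?` and `Box<dyn Error>` once it implements `Display` and `std::error::Error`. The sketch below is a standalone mirror of `RuntimeError` in which the wrapped `ConfigError` and `OutputError` are replaced by `String`; the message wording and the assumption that memory values are in bytes are illustrative.

```rust
use std::fmt;

/// Standalone mirror of `RuntimeError` (wrapped error types simplified to String).
#[allow(dead_code)]
#[derive(Debug)]
enum RuntimeError {
    ConfigurationError(String),
    GenerationError(String),
    MemoryExceeded { limit: u64, current: u64 }, // assumed to be bytes
    OutputError(String),
    Interrupted,
}

impl fmt::Display for RuntimeError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            RuntimeError::ConfigurationError(e) => write!(f, "configuration error: {e}"),
            RuntimeError::GenerationError(msg) => write!(f, "generation failed: {msg}"),
            RuntimeError::MemoryExceeded { limit, current } => {
                write!(f, "memory limit exceeded: {current} bytes in use, limit is {limit} bytes")
            }
            RuntimeError::OutputError(e) => write!(f, "output error: {e}"),
            RuntimeError::Interrupted => write!(f, "generation interrupted"),
        }
    }
}

// Debug + Display are all that `std::error::Error` requires here.
impl std::error::Error for RuntimeError {}

fn main() {
    let err = RuntimeError::MemoryExceeded { limit: 2_147_483_648, current: 2_200_000_000 };
    eprintln!("error: {err}");
}
```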
Performance Considerations
Thread Count
```rust
// Auto-detect (uses all available cores)
let orchestrator = GenerationOrchestrator::new(config.clone())?;

// Alternatively, set the thread count manually
let orchestrator = GenerationOrchestrator::with_threads(config, 4)?;
```
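Auto-detection usually means asking the OS how much parallelism is available and letting an explicit setting override it. This sketch uses `std::thread::available_parallelism`, which may report fewer than the physical core count (for example under a CPU affinity mask); `resolve_threads` is a hypothetical helper, not the crate's API.

```rust
use std::num::NonZeroUsize;
use std::thread;

/// An explicit, non-zero request wins; otherwise use the parallelism the OS
/// reports as available, falling back to 1 if that query fails.
fn resolve_threads(requested: Option<usize>) -> usize {
    match requested {
        Some(n) if n > 0 => n,
        _ => thread::available_parallelism().map(NonZeroUsize::get).unwrap_or(1),
    }
}

fn main() {
    println!("auto: {} threads", resolve_threads(None));
    println!("manual: {} threads", resolve_threads(Some(4))); // prints "manual: 4 threads"
}
```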
Memory Management
The orchestrator monitors memory usage and can:
- Slow down generation as the soft limit is approached
- Pause generation at the hard limit
- Stream output instead of buffering it, reducing memory pressure
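The soft/hard limit behavior can be captured as a function from a memory reading to an action. In this sketch the throttle starts once usage reaches the soft limit; the real guard may trigger earlier, and `MemoryAction`/`memory_action` are hypothetical names.

```rust
/// Action for a given memory reading (hypothetical names).
#[derive(Debug, PartialEq)]
enum MemoryAction {
    /// Below the soft limit: generate at full speed.
    Continue,
    /// Between soft and hard limit: slow down, e.g. sleep between batches.
    Throttle,
    /// At or above the hard limit: stop producing until memory is released.
    Pause,
}

fn memory_action(used: u64, soft_limit: u64, hard_limit: u64) -> MemoryAction {
    if used >= hard_limit {
        MemoryAction::Pause
    } else if used >= soft_limit {
        MemoryAction::Throttle
    } else {
        MemoryAction::Continue
    }
}

fn main() {
    const GIB: u64 = 1024 * 1024 * 1024;
    for used in [GIB / 2, GIB + GIB / 2, 2 * GIB] {
        println!("{used} bytes -> {:?}", memory_action(used, GIB, 2 * GIB));
    }
}
```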
Batch Sizes
Batch sizes are automatically tuned based on:
- Available memory
- Number of threads
- Target throughput
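As a rough illustration of how memory and thread count can bound a batch size, the sketch below splits half of the available memory across threads, divides by an estimated per-entry size, and clamps the result. This heuristic, its constants, and the `batch_size` function are purely hypothetical, not the crate's tuning algorithm; target throughput is not modeled.

```rust
/// Hypothetical batch-size heuristic: half the available memory, split
/// evenly across threads, divided by the estimated size of one entry,
/// then clamped so batches are neither tiny nor unbounded.
fn batch_size(available_bytes: u64, threads: u64, bytes_per_entry: u64) -> u64 {
    const MIN_BATCH: u64 = 100;
    const MAX_BATCH: u64 = 50_000;
    // Keep half of the memory as headroom for output buffers and other state.
    let budget_per_thread = available_bytes / 2 / threads.max(1);
    let fit = budget_per_thread / bytes_per_entry.max(1);
    fit.clamp(MIN_BATCH, MAX_BATCH)
}

fn main() {
    // 64 MiB, 8 threads, ~4 KiB per entry -> 1024 entries per batch
    println!("{}", batch_size(64 << 20, 8, 4096));
}
```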