Generation Pipeline

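Step-by-step generation process orchestrated by datasynth-runtime. The overview diagram shows the seven core stages (Stages 1 to 7); Stages 8 to 19 are optional and run only when the corresponding feature is enabled in the configuration.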

Pipeline Overview

┌─────────────────────────────────────────────────────────────────────┐
│                      GenerationOrchestrator                          │
│                                                                      │
│  ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐   │
│  │Init  │→│Master│→│Open  │→│Trans │→│Close │→│Inject│→│Export│   │
│  │      │ │Data  │ │Bal   │ │      │ │      │ │      │ │      │   │
│  └──────┘ └──────┘ └──────┘ └──────┘ └──────┘ └──────┘ └──────┘   │
│                                                                      │
└─────────────────────────────────────────────────────────────────────┘

Stage 1: Initialization

Purpose: Prepare generation environment

pub fn initialize(&mut self) -> Result<()> {
    // 1. Validate configuration
    ConfigValidator::new().validate(&self.config)?;

    // 2. Initialize RNG with seed
    self.rng = ChaCha8Rng::seed_from_u64(self.config.global.seed);

    // 3. Create UUID factory
    self.uuid_factory = DeterministicUuidFactory::new(self.config.global.seed);

    // 4. Set up memory guard
    self.memory_guard = MemoryGuard::new(self.config.memory_config());

    // 5. Create output directories
    fs::create_dir_all(&self.output_path)?;

    Ok(())
}

Outputs:

  • Validated configuration
  • Initialized RNG
  • UUID factory
  • Memory guard
  • Output directories
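Seeding both the RNG and the UUID factory from the same global seed is what makes a run reproducible. The internals of DeterministicUuidFactory are not shown on this page, so the following is only a minimal sketch of the idea, assuming a SplitMix64 mixer in place of the real generator. SketchUuidFactory, splitmix64, and next_id are hypothetical names, and the output is UUID-shaped rather than RFC 4122 compliant.

```rust
// Hypothetical sketch: not the datasynth implementation.

/// One SplitMix64 step. Used here only as a small, deterministic mixer.
fn splitmix64(state: &mut u64) -> u64 {
    *state = state.wrapping_add(0x9E37_79B9_7F4A_7C15);
    let mut z = *state;
    z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
    z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
    z ^ (z >> 31)
}

/// Issues identifiers that depend only on the seed and on how many
/// identifiers were issued before.
pub struct SketchUuidFactory {
    state: u64,
}

impl SketchUuidFactory {
    pub fn new(seed: u64) -> Self {
        Self { state: seed }
    }

    /// Returns the next identifier in 8-4-4-4-12 hexadecimal layout.
    pub fn next_id(&mut self) -> String {
        let hi = splitmix64(&mut self.state);
        let lo = splitmix64(&mut self.state);
        let bytes = (((hi as u128) << 64) | (lo as u128)).to_be_bytes();
        let hex: String = bytes.iter().map(|b| format!("{:02x}", b)).collect();
        format!(
            "{}-{}-{}-{}-{}",
            &hex[0..8],
            &hex[8..12],
            &hex[12..16],
            &hex[16..20],
            &hex[20..32]
        )
    }
}
```

Two factories created with the same seed yield the same sequence of identifiers, while a different seed yields a different sequence.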

Stage 2: Master Data Generation

Purpose: Generate all entity master records

pub fn generate_master_data(&mut self) -> Result<MasterDataState> {
    let mut state = MasterDataState::new();

    // 1. Chart of Accounts
    let coa_gen = CoaGenerator::new(&self.config, &mut self.rng);
    state.chart_of_accounts = coa_gen.generate()?;

    // 2. Employees (needed for approvals)
    let emp_gen = EmployeeGenerator::new(&self.config, &mut self.rng);
    state.employees = emp_gen.generate()?;

    // 3. Vendors (reference employees)
    let vendor_gen = VendorGenerator::new(&self.config, &mut self.rng);
    state.vendors = vendor_gen.generate()?;

    // 4. Customers
    let customer_gen = CustomerGenerator::new(&self.config, &mut self.rng);
    state.customers = customer_gen.generate()?;

    // 5. Materials
    let material_gen = MaterialGenerator::new(&self.config, &mut self.rng);
    state.materials = material_gen.generate()?;

    // 6. Fixed Assets
    let asset_gen = AssetGenerator::new(&self.config, &mut self.rng);
    state.fixed_assets = asset_gen.generate()?;

    // 7. Register in entity registry
    self.registry.register_all(&state);

    Ok(state)
}

Outputs:

  • Chart of Accounts
  • Vendors, Customers
  • Materials, Fixed Assets
  • Employees
  • Entity Registry

Stage 3: Opening Balance Generation

Purpose: Create coherent opening balance sheet

pub fn generate_opening_balances(&mut self) -> Result<Vec<JournalEntry>> {
    let generator = OpeningBalanceGenerator::new(
        &self.config,
        &self.state.chart_of_accounts,
        &mut self.rng,
    );

    let entries = generator.generate()?;

    // Initialize balance tracker and post every opening entry to it
    self.balance_tracker = BalanceTracker::new(&self.state.chart_of_accounts);
    for entry in &entries {
        self.balance_tracker.post(entry)?;
    }

    // Verify the accounting equation: Assets = Liabilities + Equity
    assert!(self.balance_tracker.is_balanced());

    Ok(entries)
}

Outputs:

  • Opening balance entries
  • Initialized balance tracker
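The balance check in this stage can be pictured with a much smaller tracker. The sketch below is an illustration under stated assumptions, not the real BalanceTracker: amounts are integer cents, only three account types exist, and SketchBalanceTracker, AccountType, and Line are hypothetical names.

```rust
use std::collections::HashMap;

// Hypothetical sketch: not the datasynth BalanceTracker.

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub enum AccountType {
    Asset,
    Liability,
    Equity,
}

/// One journal entry line, with amounts in integer cents.
pub struct Line {
    pub account_type: AccountType,
    pub debit: i64,
    pub credit: i64,
}

#[derive(Default)]
pub struct SketchBalanceTracker {
    balances: HashMap<AccountType, i64>,
}

impl SketchBalanceTracker {
    pub fn new() -> Self {
        Self::default()
    }

    /// Posts one entry. Rejects entries whose debits and credits differ.
    pub fn post(&mut self, lines: &[Line]) -> Result<(), String> {
        let debits: i64 = lines.iter().map(|l| l.debit).sum();
        let credits: i64 = lines.iter().map(|l| l.credit).sum();
        if debits != credits {
            return Err(format!(
                "unbalanced entry: debits {debits} != credits {credits}"
            ));
        }
        for l in lines {
            // Assets are debit-normal; liabilities and equity are credit-normal.
            let delta = match l.account_type {
                AccountType::Asset => l.debit - l.credit,
                AccountType::Liability | AccountType::Equity => l.credit - l.debit,
            };
            *self.balances.entry(l.account_type).or_insert(0) += delta;
        }
        Ok(())
    }

    fn total(&self, t: AccountType) -> i64 {
        self.balances.get(&t).copied().unwrap_or(0)
    }

    /// Checks Assets = Liabilities + Equity.
    pub fn is_balanced(&self) -> bool {
        self.total(AccountType::Asset)
            == self.total(AccountType::Liability) + self.total(AccountType::Equity)
    }
}
```

Because every accepted entry has equal debits and credits, the accounting equation holds after each post, which is why the check after the opening entries is expected to pass.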

Stage 4: Transaction Generation

Purpose: Generate main transaction volume

pub fn generate_transactions(&mut self) -> Result<Vec<JournalEntry>> {
    let target = self.config.transactions.target_count;
    let mut entries = Vec::with_capacity(target as usize);

    // Calculate counts by source
    let p2p_count = (target as f64 * self.config.document_flows.p2p.flow_rate) as u64;
    let o2c_count = (target as f64 * self.config.document_flows.o2c.flow_rate) as u64;
    // Saturate so that flow rates summing to more than 1.0 cannot underflow
    let other_count = target.saturating_sub(p2p_count).saturating_sub(o2c_count);

    // 1. Generate P2P flows
    let p2p_entries = self.generate_p2p_flows(p2p_count)?;
    entries.extend(p2p_entries);

    // 2. Generate O2C flows
    let o2c_entries = self.generate_o2c_flows(o2c_count)?;
    entries.extend(o2c_entries);

    // 3. Generate other entries (manual, recurring, etc.)
    let other_entries = self.generate_other_entries(other_count)?;
    entries.extend(other_entries);

    // 4. Sort by posting date
    entries.sort_by_key(|e| e.header.posting_date);

    // 5. Update balance tracker
    for entry in &entries {
        self.balance_tracker.post(entry)?;
    }

    Ok(entries)
}
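To make the count allocation concrete, here is a small worked sketch of the arithmetic. The function name allocate_counts is hypothetical, and clamping the rates and saturating the remainder are assumptions added for safety rather than documented behaviour.

```rust
/// Splits a target count into (p2p, o2c, other) using the two flow rates.
/// Hypothetical helper: rates are clamped to [0, 1] and the remainder
/// saturates at zero instead of underflowing.
pub fn allocate_counts(target: u64, p2p_rate: f64, o2c_rate: f64) -> (u64, u64, u64) {
    // Casting to u64 truncates toward zero, so fractional counts round down.
    let p2p = (target as f64 * p2p_rate.clamp(0.0, 1.0)) as u64;
    let o2c = (target as f64 * o2c_rate.clamp(0.0, 1.0)) as u64;
    let other = target.saturating_sub(p2p).saturating_sub(o2c);
    (p2p, o2c, other)
}
```

With a target of 1000 and rates of 0.25 and 0.5, the split is 250 P2P, 500 O2C, and 250 other entries. If the two rates sum to more than 1.0, the remainder is zero and the combined P2P and O2C counts exceed the target, which is better caught during configuration validation.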

P2P Flow Generation

fn generate_p2p_flows(&mut self, count: u64) -> Result<Vec<JournalEntry>> {
    let mut p2p_gen = P2pGenerator::new(&self.config, &self.registry, &mut self.rng);
    let mut doc_gen = DocumentFlowJeGenerator::new(&self.config);

    let mut entries = Vec::new();

    for _ in 0..count {
        // 1. Generate document flow
        let flow = p2p_gen.generate_flow()?;
        self.state.documents.add_p2p_flow(&flow);

        // 2. Generate journal entries from flow
        let flow_entries = doc_gen.generate_from_p2p(&flow)?;
        entries.extend(flow_entries);
    }

    Ok(entries)
}

Outputs:

  • Journal entries
  • Document records
  • Updated balances

Stage 5: Period Close

Purpose: Run period-end processes

pub fn run_period_close(&mut self) -> Result<()> {
    let close_engine = CloseEngine::new(&self.config.period_close);

    for period in self.config.periods() {
        // 1. Monthly close
        let monthly_entries = close_engine.run_monthly_close(
            period,
            &self.state,
            &mut self.balance_tracker,
        )?;
        self.state.entries.extend(monthly_entries);

        // 2. Quarterly close (if applicable)
        if period.is_quarter_end() {
            let quarterly_entries = close_engine.run_quarterly_close(
                period,
                &self.state,
            )?;
            self.state.entries.extend(quarterly_entries);
        }

        // 3. Generate trial balance
        let trial_balance = self.balance_tracker.to_trial_balance(period);
        self.state.trial_balances.push(trial_balance);
    }

    // 4. Annual close
    if self.config.has_year_end() {
        let annual_entries = close_engine.run_annual_close(&self.state)?;
        self.state.entries.extend(annual_entries);
    }

    Ok(())
}

Outputs:

  • Accrual entries
  • Depreciation entries
  • Closing entries
  • Trial balances
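The order of close steps can be traced with a toy period type. This is only a sketch: SketchPeriod and close_steps are hypothetical names, and it assumes a calendar fiscal year in which months 3, 6, 9, and 12 end a quarter, which the real period type may not.

```rust
// Hypothetical sketch: assumes a calendar fiscal year.

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct SketchPeriod {
    pub year: i32,
    pub month: u32,
}

impl SketchPeriod {
    pub fn is_quarter_end(&self) -> bool {
        self.month % 3 == 0
    }

    pub fn is_year_end(&self) -> bool {
        self.month == 12
    }
}

/// Lists the close steps in the order the loop in this stage runs them.
pub fn close_steps(periods: &[SketchPeriod]) -> Vec<String> {
    let mut steps = Vec::new();
    for p in periods {
        let label = format!("{}-{:02}", p.year, p.month);
        steps.push(format!("{label} monthly close"));
        if p.is_quarter_end() {
            steps.push(format!("{label} quarterly close"));
        }
        steps.push(format!("{label} trial balance"));
    }
    // The annual close runs once, after all periods.
    if periods.iter().any(|p| p.is_year_end()) {
        steps.push("annual close".to_string());
    }
    steps
}
```

For January to March, this yields three monthly closes, one quarterly close for March, and three trial balances, with no annual close because no period ends the year.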

Stage 6: Anomaly Injection

Purpose: Add anomalies and generate labels

pub fn inject_anomalies(&mut self) -> Result<()> {
    if !self.config.anomaly_injection.enabled {
        return Ok(());
    }

    let mut injector = AnomalyInjector::new(
        &self.config.anomaly_injection,
        &mut self.rng,
    );

    // 1. Compute how many entries to modify from the configured total rate
    let target_count = (self.state.entries.len() as f64
        * self.config.anomaly_injection.total_rate) as usize;

    // 2. Inject anomalies (entries are modified in place)
    let (_modified, labels) = injector.inject(
        &mut self.state.entries,
        target_count,
    )?;

    // 3. Store labels
    self.state.anomaly_labels = labels;

    // 4. Apply data quality variations
    if self.config.data_quality.enabled {
        let dq_injector = DataQualityInjector::new(&self.config.data_quality);
        dq_injector.apply(&mut self.state)?;
    }

    Ok(())
}

Outputs:

  • Modified entries with anomalies
  • Anomaly labels for ML
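How AnomalyInjector chooses which entries to modify is not shown here. One plausible approach, sketched below purely as an assumption, is to sample entry indices without replacement using a seeded partial Fisher-Yates shuffle, so that the same seed always marks the same entries. select_for_injection and splitmix64 are hypothetical names, and the modulo reduction introduces a negligible bias that a production generator would avoid.

```rust
// Hypothetical sketch: not the datasynth AnomalyInjector.

/// One SplitMix64 step, standing in for the seeded RNG.
fn splitmix64(state: &mut u64) -> u64 {
    *state = state.wrapping_add(0x9E37_79B9_7F4A_7C15);
    let mut z = *state;
    z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
    z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
    z ^ (z >> 31)
}

/// Returns the sorted indices of the entries that would receive an anomaly.
pub fn select_for_injection(entry_count: usize, total_rate: f64, seed: u64) -> Vec<usize> {
    // Same arithmetic as the stage: entries * rate, truncated.
    let target = (entry_count as f64 * total_rate.clamp(0.0, 1.0)) as usize;
    let mut indices: Vec<usize> = (0..entry_count).collect();
    let mut state = seed;

    // Partial Fisher-Yates: after `target` swaps, the first `target`
    // slots hold a sample drawn without replacement.
    for i in 0..target {
        let remaining = (entry_count - i) as u64;
        let j = i + (splitmix64(&mut state) % remaining) as usize;
        indices.swap(i, j);
    }

    indices.truncate(target);
    indices.sort_unstable();
    indices
}
```

Each selected index would then be paired with a label describing the injected anomaly, which is what the stored labels represent.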

Stage 7: Export

Purpose: Write all outputs

pub fn export(&self) -> Result<()> {
    // 1. Master data
    self.export_master_data()?;

    // 2. Transactions
    self.export_transactions()?;

    // 3. Documents
    self.export_documents()?;

    // 4. Subledgers
    self.export_subledgers()?;

    // 5. Trial balances
    self.export_trial_balances()?;

    // 6. Labels
    self.export_labels()?;

    // 7. Controls
    self.export_controls()?;

    // 8. Graphs (if enabled)
    if self.config.graph_export.enabled {
        self.export_graphs()?;
    }

    Ok(())
}

Outputs:

  • CSV/JSON files
  • Graph exports
  • Label files

Stage 8: Banking & Process Mining

Purpose: Generate banking/KYC/AML data and OCEL 2.0 event logs

If banking or OCEL generation is enabled in the config, this stage produces banking transactions with KYC profiles and/or OCEL 2.0 event logs for process mining.

Outputs:

  • Banking customers, accounts, transactions
  • KYC profiles and AML typology labels
  • OCEL 2.0 event logs, objects, process variants

Stage 9: Audit Generation

Purpose: Generate ISA-compliant audit data

If audit generation is enabled, generates engagement records, workpapers, evidence, risks, findings, and professional judgments.

Outputs:

  • Audit engagements, workpapers, evidence
  • Risk assessments and findings
  • Professional judgment documentation

Stage 10: Graph Export

Purpose: Build and export ML-ready graphs

If graph export is enabled, builds transaction, approval, and entity graphs and exports to configured formats.

Outputs:

  • PyTorch Geometric tensors (.pt)
  • Neo4j CSV + Cypher scripts
  • DGL graph structures

Stage 11: LLM Enrichment (v0.5.0)

Purpose: Enrich generated data with LLM-generated metadata

When LLM enrichment is enabled, uses the configured LlmProvider (Mock, OpenAI, Anthropic, or Custom) to generate realistic:

  • Vendor names appropriate for industry and spend category
  • Transaction descriptions and memo fields
  • Natural language explanations for injected anomalies

The Mock provider is deterministic and requires no network access, making it suitable for CI/CD pipelines.
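A deterministic mock can be as simple as hashing the prompt and picking from a fixed list. The sketch below illustrates that idea only; the trait SketchLlmProvider, its complete method, and the name list are hypothetical and do not describe the actual LlmProvider interface. FNV-1a is used because its output is stable across Rust versions and platforms.

```rust
// Hypothetical sketch: not the datasynth LlmProvider trait.

pub trait SketchLlmProvider {
    fn complete(&self, prompt: &str) -> String;
}

/// Canned responses the mock chooses from.
pub const MOCK_VENDOR_NAMES: [&str; 4] = [
    "Northwind Industrial Supply",
    "Harbor Logistics Group",
    "Summit Office Products",
    "Keystone Facility Services",
];

pub struct MockProvider;

impl SketchLlmProvider for MockProvider {
    /// Same prompt in, same text out, with no network access.
    fn complete(&self, prompt: &str) -> String {
        // 64-bit FNV-1a hash of the prompt bytes.
        let mut hash: u64 = 0xcbf2_9ce4_8422_2325;
        for byte in prompt.bytes() {
            hash ^= byte as u64;
            hash = hash.wrapping_mul(0x0000_0100_0000_01b3);
        }
        let index = (hash % MOCK_VENDOR_NAMES.len() as u64) as usize;
        MOCK_VENDOR_NAMES[index].to_string()
    }
}
```

Because the response depends only on the prompt text, repeated CI runs produce identical enriched output.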

Outputs:

  • Enriched vendor master data
  • Enriched journal entry descriptions
  • Anomaly explanation text

Stage 12: Diffusion Enhancement (v0.5.0)

Purpose: Optionally blend diffusion model outputs with rule-based data

When diffusion is enabled, uses a StatisticalDiffusionBackend to generate samples through a learned denoising process. The HybridGenerator blends diffusion outputs with rule-based data using one of three strategies:

  • Interpolate: Weighted average of rule-based and diffusion values
  • Select: Per-record random selection from either source
  • Ensemble: Column-level blending (diffusion for amounts, rule-based for categoricals)
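The three strategies can be sketched on a two-field record. This is a simplified illustration, not the HybridGenerator itself: Record, BlendStrategy, blend, and the parameter names are hypothetical, the uniform draw u is supplied by the caller, and keeping the rule-based category under Interpolate is an assumption (categorical values cannot be averaged).

```rust
// Hypothetical sketch: not the datasynth HybridGenerator.

#[derive(Clone, Debug, PartialEq)]
pub struct Record {
    pub amount: f64,
    pub category: String,
}

pub enum BlendStrategy {
    /// Weighted average of the numeric values.
    Interpolate { diffusion_weight: f64 },
    /// Take the whole record from one source or the other.
    Select { diffusion_prob: f64 },
    /// Diffusion for amounts, rule-based for categoricals.
    Ensemble,
}

/// Blends one rule-based record with one diffusion record.
/// `u` is a uniform draw in [0, 1), used only by `Select`.
pub fn blend(rule: &Record, diffusion: &Record, strategy: &BlendStrategy, u: f64) -> Record {
    match strategy {
        BlendStrategy::Interpolate { diffusion_weight } => {
            let w = diffusion_weight.clamp(0.0, 1.0);
            Record {
                amount: w * diffusion.amount + (1.0 - w) * rule.amount,
                category: rule.category.clone(),
            }
        }
        BlendStrategy::Select { diffusion_prob } => {
            if u < *diffusion_prob {
                diffusion.clone()
            } else {
                rule.clone()
            }
        }
        BlendStrategy::Ensemble => Record {
            amount: diffusion.amount,
            category: rule.category.clone(),
        },
    }
}
```

For a rule-based amount of 100 and a diffusion amount of 200, Interpolate with a diffusion weight of 0.25 gives 0.25 × 200 + 0.75 × 100 = 125.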

Outputs:

  • Blended transaction amounts and attributes
  • Diffusion fit report (mean/std errors, correlation preservation)

Stage 13: Causal Overlay (v0.5.0)

Purpose: Apply causal structure to generated data

When causal generation is enabled, constructs a StructuralCausalModel from the configured causal graph (or a built-in template like fraud_detection or revenue_cycle) and generates data that respects causal relationships. Supports:

  • Observational generation: Data following the causal structure
  • Interventional generation: Data under do-calculus interventions (“what-if” scenarios)
  • Counterfactual generation: Counterfactual versions of existing records via abduction-action-prediction

The causal validator verifies that generated data preserves the specified causal structure.
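The three generation modes are easiest to see on a two-variable linear model. The sketch below assumes the structural equations X = U_x and Y = beta * X + U_y; LinearScm and its methods are hypothetical names, and the real StructuralCausalModel may support arbitrary graphs and mechanisms.

```rust
// Hypothetical sketch: a two-node linear SCM with X -> Y.

pub struct LinearScm {
    /// Causal effect of X on Y.
    pub beta: f64,
}

impl LinearScm {
    /// Observational: both variables follow their structural equations.
    pub fn observe(&self, u_x: f64, u_y: f64) -> (f64, f64) {
        let x = u_x;
        (x, self.beta * x + u_y)
    }

    /// Interventional: do(X = do_x) replaces the equation for X,
    /// while Y still responds through its own equation.
    pub fn intervene(&self, do_x: f64, u_y: f64) -> (f64, f64) {
        (do_x, self.beta * do_x + u_y)
    }

    /// Counterfactual for an existing record (x, y) under do(X = do_x).
    pub fn counterfactual(&self, x: f64, y: f64, do_x: f64) -> (f64, f64) {
        // Abduction: recover the noise term consistent with the factual record.
        let u_y = y - self.beta * x;
        // Action and prediction: set X, then recompute Y with the same noise.
        self.intervene(do_x, u_y)
    }
}
```

With beta = 2, a factual record (x = 3, y = 6.5) implies noise 0.5, so the counterfactual under do(X = 5) is y = 10.5. The factual and counterfactual records together form one counterfactual pair.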

Outputs:

  • Causally-structured records
  • Intervention results with effect estimates
  • Counterfactual pairs (factual + counterfactual)
  • Causal validation report

Stage 14: Source-to-Contract (v0.6.0)

Purpose: Generate the full S2C procurement pipeline

When source-to-pay is enabled, generates the complete sourcing lifecycle from spend analysis through supplier scorecards. The generation DAG follows:

Spend Analysis → Sourcing Project → Supplier Qualification → RFx Event → Bids →
Bid Evaluation → Procurement Contract → Catalog Items → [feeds into P2P] → Supplier Scorecard

Outputs:

  • Spend analysis records and category hierarchies
  • Sourcing projects with supplier qualification data
  • RFx events (RFI/RFP/RFQ), bids, and bid evaluations
  • Procurement contracts and catalog items
  • Supplier scorecards with performance metrics

Stage 15: Financial Reporting (v0.6.0)

Purpose: Generate bank reconciliations and financial statements

When financial reporting is enabled, produces bank reconciliations with auto-matching and full financial statement sets derived from the adjusted trial balance.

Bank reconciliations match payments to bank statement lines with configurable auto-match, manual match, and exception rates. Financial statements include:

  • Balance Sheet: Assets = Liabilities + Equity
  • Income Statement: Revenue - COGS - OpEx - Tax = Net Income
  • Cash Flow Statement: Indirect method with operating, investing, and financing categories
  • Statement of Changes in Equity: Retained earnings, dividends, comprehensive income

Also generates management KPIs (financial ratios) and budget variance analysis when configured.
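The statement identities listed above can be expressed as simple checks. The following is a minimal sketch with hypothetical type and field names, using integer cents; it is not the statement generator, and the closing-equity roll-forward ignores other comprehensive income for brevity.

```rust
// Hypothetical sketch: amounts are integer cents.

pub struct IncomeStatement {
    pub revenue: i64,
    pub cogs: i64,
    pub opex: i64,
    pub tax: i64,
}

impl IncomeStatement {
    /// Revenue - COGS - OpEx - Tax = Net Income
    pub fn net_income(&self) -> i64 {
        self.revenue - self.cogs - self.opex - self.tax
    }
}

pub struct BalanceSheet {
    pub assets: i64,
    pub liabilities: i64,
    pub equity: i64,
}

impl BalanceSheet {
    /// Assets = Liabilities + Equity
    pub fn is_balanced(&self) -> bool {
        self.assets == self.liabilities + self.equity
    }
}

/// Simplified roll-forward for the statement of changes in equity:
/// opening equity plus net income minus dividends.
pub fn closing_equity(opening_equity: i64, income: &IncomeStatement, dividends: i64) -> i64 {
    opening_equity + income.net_income() - dividends
}
```

Checks like these tie the statements together: the net income from the income statement flows into equity, and the resulting balance sheet must still balance.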

Outputs:

  • Bank reconciliations with statement lines and reconciling items
  • Financial statements (balance sheet, income statement, cash flow, changes in equity)
  • Management KPIs and financial ratios
  • Budget vs. actual variance reports

Stage 16: HR Data (v0.6.0)

Purpose: Generate Hire-to-Retire (H2R) process data

When HR generation is enabled, produces payroll runs, time entries, and expense reports linked to the employee master data generated in Stage 2.

Outputs:

  • Payroll runs with employee pay line items (gross, deductions, net, employer cost)
  • Time entries with regular hours, overtime, PTO, and sick leave
  • Expense reports with categorized line items and approval workflows

Stage 17: Accounting Standards (v0.6.0)

Purpose: Generate ASC 606/IFRS 15 revenue recognition and impairment testing data

When accounting standards generation is enabled, produces customer contracts with performance obligations for revenue recognition and asset impairment test records.

Outputs:

  • Customer contracts with performance obligations (ASC 606/IFRS 15)
  • Revenue recognition schedules
  • Asset impairment tests with recoverable amount calculations

Stage 18: Manufacturing (v0.6.0)

Purpose: Generate manufacturing process data

When manufacturing is enabled, produces production orders, quality inspections, and cycle counts linked to materials from the master data.

Outputs:

  • Production orders with BOM components and routing steps
  • Quality inspections with pass/fail/conditional results
  • Inventory cycle counts with variance analysis

Stage 19: Sales Quotes, KPIs, and Budgets (v0.6.0)

Purpose: Generate sales pipeline and financial planning data

When enabled, produces the quote-to-order pipeline, management KPI computations, and budget variance analysis.

Outputs:

  • Sales quotes with line items, conversion tracking, and win/loss rates
  • Management KPIs (liquidity, profitability, efficiency, leverage ratios)
  • Budget records with actual vs. planned variance analysis

Parallel Execution

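Transaction generation can be split across worker threads. Each worker creates its own generator, seeded from the base seed plus the thread index: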

use rayon::prelude::*;

// Parallel transaction generation: each worker gets its own seed
let entries: Vec<JournalEntry> = (0..num_threads)
    .into_par_iter()
    .flat_map(|thread_id| {
        // `gen` is a reserved keyword in the Rust 2024 edition, so use a longer name
        let mut generator = JournalEntryGenerator::new(
            &config,
            seed.wrapping_add(thread_id as u64),
        );
        generator.generate_batch(batch_size)
    })
    .collect();
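The per-worker seeding pattern can be tried without any external crates. The sketch below swaps rayon for std::thread::scope and uses a SplitMix64 mixer as a stand-in generator; generate_parallel, generate_batch, and splitmix64 are hypothetical names, and the u64 values stand in for journal entries.

```rust
use std::thread;

// Hypothetical sketch: std threads instead of rayon, numbers instead of entries.

/// One SplitMix64 step, standing in for the per-worker generator.
fn splitmix64(state: &mut u64) -> u64 {
    *state = state.wrapping_add(0x9E37_79B9_7F4A_7C15);
    let mut z = *state;
    z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
    z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
    z ^ (z >> 31)
}

/// Generates one worker's batch from its own seed.
fn generate_batch(seed: u64, batch_size: usize) -> Vec<u64> {
    let mut state = seed;
    (0..batch_size)
        .map(|_| splitmix64(&mut state) % 1_000_000)
        .collect()
}

/// Runs `num_threads` workers and concatenates their batches in
/// thread-index order, so the result does not depend on scheduling.
pub fn generate_parallel(seed: u64, num_threads: usize, batch_size: usize) -> Vec<u64> {
    thread::scope(|scope| {
        // Spawn every worker first so they run concurrently.
        let handles: Vec<_> = (0..num_threads)
            .map(|thread_id| {
                let worker_seed = seed.wrapping_add(thread_id as u64);
                scope.spawn(move || generate_batch(worker_seed, batch_size))
            })
            .collect();

        // Join in index order to keep the output deterministic.
        handles
            .into_iter()
            .flat_map(|handle| handle.join().expect("worker panicked"))
            .collect()
    })
}
```

Since each worker's output depends only on its own seed and the batches are joined in a fixed order, two runs with the same seed, thread count, and batch size produce identical results.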

Progress Tracking

pub fn run_with_progress<F>(&mut self, callback: F) -> Result<()>
where
    F: Fn(Progress),
{
    // Mutable because set_phase and advance update the tracker's state
    let mut tracker = ProgressTracker::new(self.config.total_items());

    for stage in self.stages() {
        tracker.set_phase(&stage.name);
        stage.run()?;
        tracker.advance(stage.items);
        // Report a snapshot after each completed stage
        callback(tracker.progress());
    }

    Ok(())
}
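A self-contained version of the tracker and callback loop might look like the following. It is a sketch only: Progress, SketchProgressTracker, and run_stages are hypothetical, stages are reduced to (name, item count) pairs, and the callback bound is FnMut so that callers can accumulate snapshots.

```rust
// Hypothetical sketch: not the datasynth ProgressTracker.

#[derive(Clone, Debug, PartialEq)]
pub struct Progress {
    pub phase: String,
    pub completed: u64,
    pub total: u64,
}

impl Progress {
    /// Completed share in [0, 1]; an empty workload counts as done.
    pub fn fraction(&self) -> f64 {
        if self.total == 0 {
            1.0
        } else {
            self.completed as f64 / self.total as f64
        }
    }
}

pub struct SketchProgressTracker {
    phase: String,
    completed: u64,
    total: u64,
}

impl SketchProgressTracker {
    pub fn new(total: u64) -> Self {
        Self { phase: String::new(), completed: 0, total }
    }

    pub fn set_phase(&mut self, name: &str) {
        self.phase = name.to_string();
    }

    /// Adds finished items, never exceeding the total.
    pub fn advance(&mut self, items: u64) {
        self.completed = self.completed.saturating_add(items).min(self.total);
    }

    pub fn progress(&self) -> Progress {
        Progress {
            phase: self.phase.clone(),
            completed: self.completed,
            total: self.total,
        }
    }
}

/// Walks the stages in order and reports a snapshot after each one.
pub fn run_stages<F: FnMut(Progress)>(stages: &[(&str, u64)], mut callback: F) {
    let total = stages.iter().map(|(_, items)| *items).sum();
    let mut tracker = SketchProgressTracker::new(total);
    for (name, items) in stages {
        tracker.set_phase(name);
        // The stage's real work would run here.
        tracker.advance(*items);
        callback(tracker.progress());
    }
}
```

A caller could pass a closure that prints a progress bar or, as in a test, pushes each snapshot into a vector for later inspection.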
