Pipeline Orchestration

New in v0.5.0

Integrate DataSynth into data engineering pipelines using Apache Airflow, dbt, MLflow, and Apache Spark.

Overview

DataSynth’s Python wrapper includes optional integrations for popular data engineering platforms, enabling synthetic data generation as part of automated workflows.

pip install datasynth-py[integrations]

Apache Airflow

Generate Data in a DAG

from airflow import DAG
from airflow.utils.dates import days_ago
from datasynth_py.integrations import (
    DataSynthOperator,
    DataSynthSensor,
    DataSynthValidateOperator,
)

config = {
    "global": {"industry": "retail", "start_date": "2024-01-01", "period_months": 12},
    "transactions": {"target_count": 50000},
}

with DAG("synthetic_data_pipeline", start_date=days_ago(1), schedule_interval="@weekly") as dag:

    validate = DataSynthValidateOperator(
        task_id="validate_config",
        config_path="/configs/retail.yaml",
    )

    generate = DataSynthOperator(
        task_id="generate_data",
        config=config,
        output_path="/data/synthetic/{{ ds }}",
    )

    wait = DataSynthSensor(
        task_id="wait_for_output",
        output_path="/data/synthetic/{{ ds }}",
    )

    validate >> generate >> wait
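
For quick local iteration, the DAG can be exercised without a scheduler. The sketch below assumes Airflow 2.5+ (which added DAG.test()) and that the DAG file above is importable as synthetic_data_pipeline.py; adjust the module name to your project layout.

# debug_pipeline.py -- run the full DAG in-process for a single logical date
# (assumes Airflow 2.5+; synthetic_data_pipeline is the hypothetical module
# containing the DAG defined above)
from synthetic_data_pipeline import dag

if __name__ == "__main__":
    dag.test()  # runs validate -> generate -> wait sequentially, no scheduler required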

dbt Integration

Generate dbt Sources from Synthetic Data

from datasynth_py.integrations import DbtSourceGenerator, create_dbt_project

# Generate sources.yml pointing to synthetic CSV files
gen = DbtSourceGenerator()
gen.generate_sources_yaml("./synthetic_output", "./my_dbt_project")

# Generate seed CSVs for dbt
gen.generate_seeds("./synthetic_output", "./my_dbt_project")

# Or create a complete dbt project structure
project = create_dbt_project("./synthetic_output", "my_dbt_project")

This creates:

  • models/sources.yml with table definitions
  • seeds/ directory with CSV files
  • Standard dbt project structure
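
To confirm what was written, you can inspect the generated layout using only the standard library; the exact file set may vary by DataSynth version:

from pathlib import Path

# Print the relative layout of the generated dbt project
project_dir = Path("./my_dbt_project")
for path in sorted(project_dir.rglob("*")):
    print(path.relative_to(project_dir))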

Testing dbt Models with Synthetic Data

# 1. Generate synthetic data
datasynth-data generate --config retail.yaml --output ./synthetic

# 2. Create dbt project from output
python -c "from datasynth_py.integrations import create_dbt_project; create_dbt_project('./synthetic', 'test_project')"

# 3. Run dbt
cd test_project && dbt seed && dbt run && dbt test

MLflow Tracking

Track Generation Experiments

from datasynth_py.integrations import DataSynthMlflowTracker

tracker = DataSynthMlflowTracker(experiment_name="data_generation")

# Track a generation run (logs config, metrics, artifacts)
run_info = tracker.track_generation("./output", config=config)

# Log additional quality metrics
tracker.log_quality_metrics({
    "benford_mad": 0.008,
    "correlation_preservation": 0.95,
    "completeness": 0.99,
})

# Compare recent runs
comparison = tracker.compare_runs(n=10)
for run in comparison:
    print(f"Run {run['run_id']}: quality={run['metrics'].get('statistical_fidelity', 'N/A')}")

A/B Testing Generation Configs

import mlflow

from datasynth_py import DataSynth

synth = DataSynth()

# baseline_config, diffusion_config, and llm_config are ordinary config dicts
# (e.g. variations of the config shown in the Airflow example above)
configs = [
    ("baseline", baseline_config),
    ("with_diffusion", diffusion_config),
    ("with_llm", llm_config),
]

for name, cfg in configs:
    with mlflow.start_run(run_name=name):
        result = synth.generate(config=cfg, output={"format": "csv", "sink": "temp_dir"})
        tracker.track_generation(result.output_dir, config=cfg)  # tracker from the section above
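
After the loop, the variants can be compared with the same compare_runs helper shown above:

# Compare the tracked variants side by side
for run in tracker.compare_runs(n=len(configs)):
    print(run["run_id"], run["metrics"])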

Apache Spark

Read Synthetic Data as DataFrames

from pyspark.sql import SparkSession

from datasynth_py.integrations import DataSynthSparkReader

spark = SparkSession.builder.appName("datasynth_example").getOrCreate()
reader = DataSynthSparkReader()

# Read a single table
je_df = reader.read_table(spark, "./output", "journal_entries")
je_df.show(5)

# Read all tables at once
tables = reader.read_all_tables(spark, "./output")
for name, df in tables.items():
    print(f"{name}: {df.count()} rows")

# Create temporary SQL views
reader.create_temp_views(spark, "./output")
spark.sql("""
    SELECT posting_date, SUM(amount) as total
    FROM journal_entries
    WHERE fiscal_period = 12
    GROUP BY posting_date
    ORDER BY posting_date
""").show()

End-to-End Pipeline Example

"""
Complete pipeline: Generate → Track → Load → Transform → Test
"""
from datasynth_py import DataSynth
from datasynth_py.config import blueprints
from datasynth_py.integrations import (
    DataSynthMlflowTracker,
    DataSynthSparkReader,
    DbtSourceGenerator,
)
from pyspark.sql import SparkSession

# 1. Generate
synth = DataSynth()
config = blueprints.retail_small(transactions=50000)
result = synth.generate(config=config, output={"format": "csv", "sink": "temp_dir"})

# 2. Track with MLflow
tracker = DataSynthMlflowTracker(experiment_name="pipeline_test")
tracker.track_generation(result.output_dir, config=config)

# 3. Load into Spark
spark = SparkSession.builder.appName("datasynth_pipeline").getOrCreate()
reader = DataSynthSparkReader()
reader.create_temp_views(spark, result.output_dir)

# 4. Create dbt project for transformation testing
gen = DbtSourceGenerator()
gen.generate_sources_yaml(result.output_dir, "./dbt_project")
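
# 5. Run dbt against the generated project (sketch: assumes the dbt CLI is
#    installed and ./dbt_project contains your models and a configured profile)
import subprocess

for cmd in (["dbt", "run"], ["dbt", "test"]):
    subprocess.run(cmd, cwd="./dbt_project", check=True)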

See Also