Ecosystem Integrations

New in v0.5.0

DataSynth’s Python wrapper includes optional integrations with popular data engineering and ML platforms, so synthetic data generation can be orchestrated, tracked, and consumed from existing pipelines.

Installation

# Install all integrations
pip install datasynth-py[integrations]

# Install specific integrations
pip install datasynth-py[airflow]
pip install datasynth-py[dbt]
pip install datasynth-py[mlflow]
pip install datasynth-py[spark]

Apache Airflow

The Airflow integration provides custom operators and sensors for orchestrating synthetic data generation in Airflow DAGs.

DataSynthOperator

Generates synthetic data as an Airflow task:

from datasynth_py.integrations import DataSynthOperator

generate = DataSynthOperator(
    task_id="generate_synthetic_data",
    config={
        "global": {"industry": "retail", "start_date": "2024-01-01", "period_months": 12},
        "transactions": {"target_count": 50000},
        "output": {"format": "csv"},
    },
    output_path="/data/synthetic/{{ ds }}",
)
Parameter     Type   Description
task_id       str    Airflow task identifier
config        dict   Generation configuration (inline)
config_path   str    Path to YAML config file (alternative to config)
output_path   str    Output directory (supports Jinja templates)

DataSynthSensor

Waits for synthetic data generation to complete:

from datasynth_py.integrations import DataSynthSensor

wait = DataSynthSensor(
    task_id="wait_for_data",
    output_path="/data/synthetic/{{ ds }}",
    poke_interval=30,
    timeout=600,
)

DataSynthValidateOperator

Validates a configuration file before generation:

from datasynth_py.integrations import DataSynthValidateOperator

validate = DataSynthValidateOperator(
    task_id="validate_config",
    config_path="/configs/retail.yaml",
)

Complete DAG Example

from datetime import datetime, timezone

from airflow import DAG
from datasynth_py.integrations import (
    DataSynthOperator,
    DataSynthSensor,
    DataSynthValidateOperator,
)

with DAG(
    "weekly_synthetic_data",
    start_date=datetime(2024, 1, 1, tzinfo=timezone.utc),
    schedule="@weekly",
    catchup=False,
) as dag:

    validate = DataSynthValidateOperator(
        task_id="validate",
        config_path="/configs/retail.yaml",
    )

    generate = DataSynthOperator(
        task_id="generate",
        config_path="/configs/retail.yaml",
        output_path="/data/synthetic/{{ ds }}",
    )

    wait = DataSynthSensor(
        task_id="wait",
        output_path="/data/synthetic/{{ ds }}",
    )

    validate >> generate >> wait
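
A downstream task can then consume the files the sensor waited for. The sketch below is illustrative rather than part of the integration package: it assumes pandas is available in the Airflow environment and that the output contains a journal_entries.csv (matching the seed files listed in the dbt section below).

from airflow.operators.python import PythonOperator
import pandas as pd

def summarize_output(output_path: str) -> None:
    # Load one generated table and log a simple row count.
    df = pd.read_csv(f"{output_path}/journal_entries.csv")
    print(f"Generated {len(df)} journal entries")

# Added inside the same `with DAG(...)` block as the tasks above:
summarize = PythonOperator(
    task_id="summarize",
    python_callable=summarize_output,
    op_kwargs={"output_path": "/data/synthetic/{{ ds }}"},
)

wait >> summarize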

dbt Integration

Generate dbt-compatible project structures from synthetic data output.

DbtSourceGenerator

from datasynth_py.integrations import DbtSourceGenerator

gen = DbtSourceGenerator()

Generate sources.yml

Creates a dbt sources.yml file pointing to synthetic data tables:

sources_path = gen.generate_sources_yaml(
    output_dir="./synthetic_output",
    dbt_project_dir="./my_dbt_project",
)
# Creates ./my_dbt_project/models/sources.yml
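
To confirm what was written, the file can be inspected like any other YAML document. A minimal sketch using PyYAML, assuming the generated file follows the standard dbt sources layout:

import yaml

with open(sources_path) as f:
    sources_doc = yaml.safe_load(f)

# Assumes the standard dbt layout: a top-level `sources:` list of entries
# that each carry a `name` and a `tables` list.
for source in sources_doc.get("sources", []):
    for table in source.get("tables", []):
        print(f"{source['name']}.{table['name']}")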

Generate Seeds

Copies synthetic CSV files as dbt seeds:

seeds_dir = gen.generate_seeds(
    output_dir="./synthetic_output",
    dbt_project_dir="./my_dbt_project",
)
# Copies CSVs to ./my_dbt_project/seeds/

create_dbt_project

Creates a complete dbt project structure from synthetic output:

from datasynth_py.integrations import create_dbt_project

project = create_dbt_project(
    output_dir="./synthetic_output",
    project_name="synthetic_test",
)

This creates:

synthetic_test/
├── dbt_project.yml
├── models/
│   └── sources.yml
├── seeds/
│   ├── journal_entries.csv
│   ├── vendors.csv
│   ├── customers.csv
│   └── ...
└── tests/

Usage with dbt CLI

cd synthetic_test
dbt seed      # Load synthetic CSVs
dbt run       # Run transformations
dbt test      # Run data tests
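
The same steps can be scripted from Python if you prefer to keep everything in one pipeline. A rough sketch using the standard library, assuming the dbt CLI is on PATH and a profile for the project is configured:

import subprocess

# Run the dbt commands from Python instead of the shell.
for command in (["dbt", "seed"], ["dbt", "run"], ["dbt", "test"]):
    subprocess.run(command, cwd="synthetic_test", check=True)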

MLflow Integration

Track synthetic data generation runs as MLflow experiments for comparison and reproducibility.

DataSynthMlflowTracker

from datasynth_py.integrations import DataSynthMlflowTracker

tracker = DataSynthMlflowTracker(experiment_name="synthetic_data_experiments")

Track a Generation Run

run_info = tracker.track_generation(
    output_dir="./output",
    config=config,
)
# Logs: config parameters, output file counts, generation metadata

Log Quality Metrics

tracker.log_quality_metrics({
    "completeness": 0.98,
    "benford_mad": 0.008,
    "correlation_preservation": 0.95,
    "statistical_fidelity": 0.92,
})

Compare Runs

comparison = tracker.compare_runs(n=5)
for run in comparison:
    print(f"Run {run['run_id']}: {run['metrics']}")

Experiment Comparison

Use MLflow to compare different generation configurations:

import mlflow

configs = {
    "baseline": baseline_config,
    "with_diffusion": diffusion_config,
    "high_fraud": high_fraud_config,
}

for name, cfg in configs.items():
    with mlflow.start_run(run_name=name):
        result = synth.generate(config=cfg, output={"format": "csv", "sink": "temp_dir"})
        tracker.track_generation(result.output_dir, config=cfg)
        tracker.log_quality_metrics(evaluate_quality(result.output_dir))
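
Note that evaluate_quality is not part of datasynth_py; it stands in for whatever quality checks you run on the output. A minimal sketch, assuming CSV output and pandas, that computes two of the metrics shown earlier (completeness and benford_mad):

import math
from pathlib import Path

import pandas as pd

def evaluate_quality(output_dir: str) -> dict:
    """Illustrative quality checks over generated CSV files."""
    frames = {p.stem: pd.read_csv(p) for p in Path(output_dir).glob("*.csv")}

    # Completeness: share of non-null cells across all tables.
    total = sum(df.size for df in frames.values())
    non_null = sum(int(df.count().sum()) for df in frames.values())
    metrics = {"completeness": non_null / total if total else 0.0}

    # Benford MAD on journal entry amounts, if that table and column exist.
    je = frames.get("journal_entries")
    if je is not None and "amount" in je.columns:
        first = je["amount"].abs().astype(str).str.lstrip("0.").str[0]
        first = first[first.str.isdigit().fillna(False)].astype(int)
        observed = first[first > 0].value_counts(normalize=True)
        expected = {d: math.log10(1 + 1 / d) for d in range(1, 10)}
        metrics["benford_mad"] = sum(
            abs(observed.get(d, 0.0) - expected[d]) for d in range(1, 10)
        ) / 9
    return metrics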

View results in the MLflow UI:

mlflow ui --port 5000
# Open http://localhost:5000

Apache Spark

Read synthetic data output directly as Spark DataFrames for large-scale analysis.

DataSynthSparkReader

from datasynth_py.integrations import DataSynthSparkReader

reader = DataSynthSparkReader()

Read a Single Table

df = reader.read_table(spark, "./output", "journal_entries")
df.printSchema()
df.show(5)

Read All Tables

tables = reader.read_all_tables(spark, "./output")
for name, df in tables.items():
    print(f"{name}: {df.count()} rows, {len(df.columns)} columns")

Create Temporary Views

views = reader.create_temp_views(spark, "./output")

# Now use SQL
spark.sql("""
    SELECT
        v.vendor_id,
        v.name,
        COUNT(p.document_id) as payment_count,
        SUM(p.amount) as total_paid
    FROM vendors v
    JOIN payments p ON v.vendor_id = p.vendor_id
    GROUP BY v.vendor_id, v.name
    ORDER BY total_paid DESC
    LIMIT 10
""").show()

Spark + DataSynth Pipeline

from pyspark.sql import SparkSession
from datasynth_py import DataSynth
from datasynth_py.config import blueprints
from datasynth_py.integrations import DataSynthSparkReader

# Generate
synth = DataSynth()
config = blueprints.retail_small(transactions=100000)
result = synth.generate(config=config, output={"format": "csv", "sink": "temp_dir"})

# Load into Spark
spark = SparkSession.builder.appName("SyntheticAnalysis").getOrCreate()
reader = DataSynthSparkReader()
reader.create_temp_views(spark, result.output_dir)

# Analyze
spark.sql("""
    SELECT fiscal_period, COUNT(*) as entry_count, SUM(amount) as total_amount
    FROM journal_entries
    GROUP BY fiscal_period
    ORDER BY fiscal_period
""").show()

Integration Dependencies

Integration   Required Package   Version
Airflow       apache-airflow     >= 2.5
dbt           dbt-core           >= 1.5
MLflow        mlflow             >= 2.0
Spark         pyspark            >= 3.3

All integrations are optional — install only what you need.
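
Because each extra is optional, a pipeline can check at runtime which ones are importable before wiring them in. A small sketch using only the standard library:

from importlib.util import find_spec

# Map each optional extra to the third-party package it relies on
# (dbt-core installs the `dbt` import package).
available = {
    extra: find_spec(module) is not None
    for extra, module in [
        ("airflow", "airflow"),
        ("dbt", "dbt"),
        ("mlflow", "mlflow"),
        ("spark", "pyspark"),
    ]
}
print(available)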

See Also