📊 CDC Data Pipeline

Training Guide - Change Data Capture with Medallion Architecture

Table of Contents

  1. Project Overview
  2. Folder Structure
  3. How the Pipeline Works
  4. Configuration
  5. Source Files Deep Dive
  6. Dynamic Entity Discovery
  7. Data Quality Reports
  8. How to Extend
  9. Exercises

1. Project Overview

Business Scenario

Company: Global Bank Corp (fictional)

Situation: The bank receives daily data files from multiple source systems:

  - Customer System: Customer personal information (name, email, phone, status)
  - Account System: Bank accounts (type, balance, currency)
  - Relationship System: Links customers to their accounts

Problem:

  - Every day, source systems send FULL data extracts (not just changes)
  - Processing millions of records daily is slow and expensive
  - Downstream systems only need NEW and UPDATED records
  - No visibility into data quality issues before processing
  - Currently no way to track what changed between days

Business Need:

  1. Detect which records are NEW or UPDATED each day
  2. Transform data with business rules (e.g., status codes to readable values)
  3. Export only the changed records for downstream consumption
  4. Generate data quality reports for governance


Requirements

#    Requirement        Description
R1   Daily Ingestion    Ingest pipe-delimited files from source systems
R2   Data Quality       Validate data (null checks, uniqueness, format validation)
R3   Change Detection   Identify NEW and UPDATED records compared to previous day
R4   Transformations    Apply business rules (value replacements, derived columns)
R5   Export             Generate CSV files containing only changed records
R6   Reporting          Create quality reports for each run
R7   Dynamic            Support new entities without code changes
R8   Scalable           Handle large data volumes efficiently

Solution: CDC Data Pipeline

We will build a Change Data Capture (CDC) pipeline using Medallion Architecture:

ARCHITECTURE OVERVIEW
=====================

Source Systems          Pipeline Layers                    Output
--------------          ---------------                    ------

Customer.txt  --.                                      .-> new_customer.csv
                |       +--------+    +--------+       |
Account.txt  ---|------>| BRONZE |--->| SILVER |-------|-> new_account.csv
                |       | (Raw)  |    | (Clean)|       |
Relation.txt --'        +--------+    +--------+       '-> new_relation.csv
                             |             ^
                             v             |
                        +--------+    +--------+
                        |   DQ   |    |  CDC   |
                        | Check  |    | Detect |
                        +--------+    +--------+

Medallion Architecture Layers:

Layer    Purpose                        Format
Bronze   Raw data exactly as received   Parquet
Silver   Cleaned and transformed data   Parquet
Gold     Business-ready output          CSV

CDC Approach: Hash-based comparison

  - Generate an MD5 hash of each record's value columns
  - Compare hashes between the current and previous day
  - Same key + different hash = UPDATED record
  - New key = NEW record


Data Flow Diagram

                           CDC DATA PIPELINE
    ============================================================

    STEP 1: INGESTION (Bronze)
    --------------------------
    Source Files (.txt)          Bronze Layer (Parquet)
    +------------------+         +----------------------+
    | Customer.txt     |  --->   | bronze/customer/     |
    | Account.txt      |  READ   | bronze/account/      |
    | Relationship.txt |  --->   | bronze/relationship/ |
    | [Any new .txt]   |         | bronze/[new entity]/ |
    +------------------+         +----------------------+
                                          |
                                          v
                                 STEP 2: DATA QUALITY
                                 --------------------
                                 - Check not null
                                 - Check unique keys
                                 - Validate formats
                                 - Generate DQ Report
                                          |
                                          v
                                 STEP 3: CDC DETECTION
                                 ---------------------
                                 Compare with previous snapshot
                                          |
                                 +--------+--------+
                                 |        |        |
                                 v        v        v
                                NEW    UPDATED  UNCHANGED
                               (I)      (U)      (skip)
                                 |        |
                                 +---+----+
                                     |
                                     v
                                 STEP 4: SILVER LAYER
                                 --------------------
                                 Apply transformations
                                 (only NEW + UPDATED)
                                          |
                                          v
                                 STEP 5: GOLD LAYER
                                 ------------------
                                 Export to CSV
                                 +----------------------+
                                 | new_customer_DATE.csv|
                                 | new_account_DATE.csv |
                                 +----------------------+

    ============================================================

Key Concepts

Concept             Description
CDC                 Change Data Capture - detects NEW and UPDATED records
Medallion           Bronze (raw) to Silver (clean) to Gold (output)
Hash-based CDC      Uses MD5 hash of row values to detect changes
Dynamic Discovery   Automatically finds new entities without code changes

2. Folder Structure

de-cdc/
|-- main.py                 # Entry point - runs the pipeline
|-- config.py               # All configuration in one place
|-- view_data.py            # Utility to view Parquet data
|-- requirements.txt        # Python dependencies
|
|-- data/
|   |-- source/             # Input: year/month/day/Entity.txt
|   |   +-- 2026/
|   |       +-- 01/
|   |           |-- 13/     # Day 1 data
|   |           |   |-- Customer.txt
|   |           |   |-- Account.txt
|   |           |   |-- Relationship.txt
|   |           |   +-- Product.txt   <-- New files auto-discovered
|   |           |-- 14/     # Day 2 data
|   |           +-- 15/     # Day 3 data
|   |
|   |-- bronze/             # Raw Parquet files
|   |-- silver/             # Transformed Parquet
|   |-- gold/               # Final CSV exports
|   |-- cdc_snapshots/      # Hash snapshots for CDC comparison
|   +-- reports/            # Data quality reports
|
|-- src/
|   |-- bronze.py           # Step 1: Ingestion
|   |-- data_quality.py     # Step 2: Validation
|   |-- cdc.py              # Step 3: Change detection
|   |-- silver.py           # Step 4: Transformations
|   +-- gold.py             # Step 5: CSV export
|
+-- orchestration/
    +-- dagster_pipeline.py # Dagster DAG for scheduling

3. How the Pipeline Works

Step-by-Step Execution

python main.py --date 20260113

STEP 1: BRONZE LAYER
  1. Scans the source/2026/01/13/ folder for all .txt files
  2. Reads each pipe-delimited file
  3. Adds metadata columns (_ingestion_timestamp, _processing_date)
  4. Writes as Parquet to bronze/{entity}/date=YYYYMMDD/

STEP 2: DATA QUALITY
  1. Reads Bronze layer data
  2. Runs validation checks (not_null, unique, format, allowed_values)
  3. Generates a quality report
  4. Pipeline continues even if checks fail (configurable)

STEP 3: CDC DETECTION
  1. Generates an MD5 hash of the value columns
  2. Compares with the previous day's snapshot
  3. Categorizes records:
     - NEW: key not in previous data (flag = 'I')
     - UPDATED: key exists but hash changed (flag = 'U')
     - UNCHANGED: skip processing
  4. Saves the current snapshot for the next run

STEP 4: SILVER LAYER
  1. Takes only NEW + UPDATED records
  2. Applies value replacements (e.g., "A" to "Active")
  3. Writes to the Silver layer as Parquet

STEP 5: GOLD LAYER
  1. Takes NEW + UPDATED records
  2. Removes internal columns
  3. Exports a clean CSV file
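
For orientation, here is a hedged sketch of how those steps could be chained together using the function names documented later in this guide. It skips the data-quality step and assumes the Bronze read path and return types, so treat it as illustrative rather than the actual main.py:

# Illustrative wiring only -- data quality is omitted for brevity, and the
# Bronze read path and return shapes are assumptions; see main.py for the real flow.
from config import discover_entities, BRONZE_PATH
from src.bronze import process_bronze
from src.cdc import detect_changes
from src.silver import transform_entity
from src.gold import export_to_csv


def run_pipeline(spark, date_str):
    process_bronze(spark, date_str)                                # Step 1: Bronze ingestion

    for entity in discover_entities(date_str):                     # dynamic entity discovery
        bronze_df = spark.read.parquet(str(BRONZE_PATH / entity / f"date={date_str}"))
        changes_df = detect_changes(bronze_df, entity, date_str)   # Step 3: NEW + UPDATED only
        silver_df = transform_entity(changes_df, entity)           # Step 4: business rules
        export_to_csv(silver_df, entity, date_str)                 # Step 5: CSV export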


4. Configuration (config.py)

Path Settings

SOURCE_PATH = DATA_DIR / "source"     # Input files
BRONZE_PATH = DATA_DIR / "bronze"     # Raw Parquet
SILVER_PATH = DATA_DIR / "silver"     # Transformed
GOLD_PATH   = DATA_DIR / "gold"       # Final CSV

Entity Configuration (Optional)

# These are OPTIONAL predefined entities with CUSTOM configurations.
# New entities are auto-discovered and use default settings.

ENTITIES = {
    "customer": {
        "key_columns": ["customer_id"],      # Primary key for CDC
        "value_columns": ["first_name", "last_name", ...],  # Columns to hash
    },
    ...
}

Why predefined entities?

  - Provides CUSTOM column mappings for known entities
  - Allows specific validators for known data structures
  - NEW entities use AUTO-DETECTION (first column with _id = key)

Value Replacements

REPLACEMENT_RULES = {
    "status": {"A": "Active", "I": "Inactive", "D": "Deleted"},
    "country_code": {"US": "United States", "UK": "United Kingdom"},
}

Helper Functions

Function                        Purpose
discover_entities(date_str)     Find all .txt files for a date
get_entity_config(entity, df)   Get config or auto-detect columns
get_available_dates()           List all dates with source data
get_previous_date(date_str)     Find previous date for CDC comparison
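
As a rough idea of what discover_entities() does, here is a minimal sketch assuming the source layout shown in the folder structure above; the real config.py may differ:

from pathlib import Path

SOURCE_PATH = Path("data") / "source"   # assumed location, mirroring the folder structure

def discover_entities(date_str):
    """Return lowercase entity names for every .txt file under source/YYYY/MM/DD/."""
    day_dir = SOURCE_PATH / date_str[:4] / date_str[4:6] / date_str[6:8]
    if not day_dir.exists():
        return []
    # Customer.txt -> "customer", Product.txt -> "product", ...
    return sorted(f.stem.lower() for f in day_dir.glob("*.txt"))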

5. Source Files Deep Dive

5.1 bronze.py - Raw Ingestion

Purpose: Read text files and save as Parquet

def ingest_file(spark, entity, date_str):
    # 1. Build path: source/2026/01/13/Customer.txt
    # 2. Read pipe-delimited file
    # 3. Add metadata columns
    # 4. Return DataFrame

def process_bronze(spark, date_str):
    # 1. Call discover_entities() to find all .txt files
    # 2. For each entity: ingest and write to Parquet
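
A possible shape for ingest_file(), assuming standard spark.read options for pipe-delimited files with a header row and the source layout above; the real bronze.py may differ in the details:

from pyspark.sql import functions as F

def ingest_file(spark, entity, date_str):
    # e.g. source/2026/01/13/Customer.txt (source files use a capitalized entity name)
    path = f"data/source/{date_str[:4]}/{date_str[4:6]}/{date_str[6:8]}/{entity.capitalize()}.txt"

    df = (
        spark.read
        .option("header", True)   # first line holds the column names
        .option("sep", "|")       # pipe-delimited source files
        .csv(path)
    )

    # Metadata columns consumed by the later layers.
    return (
        df.withColumn("_ingestion_timestamp", F.current_timestamp())
          .withColumn("_processing_date", F.lit(date_str))
    )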

5.2 data_quality.py - Validation

Purpose: Validate data before processing

Available checks:

  - check_not_null(df, columns) - Ensure no null values
  - check_unique(df, columns) - Ensure no duplicates
  - check_email_format(df, column) - Validate email pattern
  - check_allowed_values(df, column, allowed) - Validate against list

Validators:

  - validate_customer(df) - Customer-specific rules
  - validate_account(df) - Account-specific rules
  - validate_generic(df, entity) - Auto-generated for new entities
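
For intuition, an individual check usually boils down to a filter-and-count. A sketch of check_not_null(), assuming checks return a small dict keyed by check name; the real data_quality.py may differ:

from pyspark.sql.functions import col

def check_not_null(df, columns):
    """Return {"not_null:<column>": True/False} for each column."""
    results = {}
    for c in columns:
        null_count = df.filter(col(c).isNull()).count()
        results[f"not_null:{c}"] = (null_count == 0)   # passes only when no nulls are found
    return results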

5.3 cdc.py - Change Detection

Purpose: Identify NEW and UPDATED records

def generate_hash(df, columns):
    """
    Create MD5 hash of value columns.

    Example:
    customer_id | first_name | last_name --> _row_hash
    C001        | John       | Doe       --> a1b2c3d4e5...

    If any value changes, the hash changes.
    """

def detect_changes(current_df, entity, date_str):
    """
    Compare current vs previous:

    CURRENT         PREVIOUS        RESULT
    -------         --------        ------
    C001, abc123    C001, abc123    UNCHANGED
    C002, def456    C002, xyz789    UPDATED (hash changed)
    C003, ghi789    (not found)     NEW
    """

5.4 silver.py - Transformations

Purpose: Apply business rules

def apply_replacements(df, column, mappings):
    # "A" --> "Active", "I" --> "Inactive"

def transform_entity(df, entity):
    # Apply all REPLACEMENT_RULES from config
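
One way apply_replacements() could be implemented is with a chained when()/otherwise() expression. This is a sketch under that assumption; the real silver.py may use a different mechanism (e.g. DataFrame.replace()):

from pyspark.sql import functions as F

def apply_replacements(df, column, mappings):
    """Replace coded values in `column` using a dict such as {"A": "Active", "I": "Inactive"}."""
    if column not in df.columns:
        return df   # not every entity has every configured column; leave the frame untouched

    expr = None
    for code, label in mappings.items():
        condition = F.col(column) == code
        expr = F.when(condition, label) if expr is None else expr.when(condition, label)

    # Keep the original value when no mapping matches.
    return df.withColumn(column, expr.otherwise(F.col(column)))

transform_entity() can then simply loop over REPLACEMENT_RULES from config and call this helper for each configured column.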

5.5 gold.py - Final Export

Purpose: Create clean CSV output

def export_to_csv(df, entity, date_str):
    # 1. Remove internal columns (starting with _)
    # 2. Write single CSV file: new_customer_20260113.csv
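
Writing one clean CSV per entity is commonly done by coalescing to a single partition and renaming Spark's part file. A sketch under that assumption (the gold_dir parameter is illustrative, and gold.py may take a different route, e.g. converting to pandas first):

import shutil
from pathlib import Path

def export_to_csv(df, entity, date_str, gold_dir="data/gold"):
    # Drop internal/metadata columns such as _row_hash and _ingestion_timestamp.
    business_cols = [c for c in df.columns if not c.startswith("_")]

    tmp_dir = Path(gold_dir) / f"_tmp_{entity}_{date_str}"
    (
        df.select(*business_cols)
          .coalesce(1)                 # single output file
          .write.mode("overwrite")
          .option("header", True)
          .csv(str(tmp_dir))
    )

    # Rename Spark's part-*.csv output to the final name, e.g. new_customer_20260113.csv.
    part_file = next(tmp_dir.glob("part-*.csv"))
    part_file.replace(Path(gold_dir) / f"new_{entity}_{date_str}.csv")
    shutil.rmtree(tmp_dir)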

6. Dynamic Entity Discovery

How It Works

The pipeline automatically discovers new entities:

  1. Bronze Layer: Scans source/YYYY/MM/DD/ for all .txt files
  2. CDC: Auto-detects key/value columns for new entities
  3. Data Quality: Uses generic validator for unknown entities
  4. Silver/Gold: Process whatever CDC returns

Why Key and Value Columns Matter

CDC needs to distinguish between key columns and value columns:

KEY COLUMN       VALUE COLUMNS
-----------      ------------------
customer_id      first_name, last_name, email, status
     |                        |
     v                        v
Used to MATCH            Used to DETECT
records across           if anything CHANGED
days (same ID            (hash comparison)
= same record)

Example:

Day 1:  C001 | John | Doe       hash("John|Doe") = abc123
Day 2:  C001 | John | Smith     hash("John|Smith") = xyz789

Result: C001 is UPDATED (same key, different hash)

Without knowing which column is the key:

  - We cannot match records across days
  - We cannot detect NEW vs UPDATED

Auto-Detection Rules

Priority   Logic
1st        Look for columns ending with _id --> use as key(s)
2nd        If no _id columns found --> use the FIRST column as the key
Then       All remaining columns --> value columns (for the CDC hash)
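
The detection itself is only a few lines. Here is a sketch of the logic get_entity_config() applies for an unknown entity (the helper name auto_detect_columns is illustrative):

def auto_detect_columns(df):
    """Pick key columns (ending in _id) and treat everything else as value columns."""
    key_columns = [c for c in df.columns if c.endswith("_id")]
    if not key_columns:
        key_columns = [df.columns[0]]   # fall back to the first column
    value_columns = [c for c in df.columns if c not in key_columns]
    return {"key_columns": key_columns, "value_columns": value_columns}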

Example: Adding a New Entity

# Create new file
cat > data/source/2026/01/13/Product.txt << 'EOF'
product_id|name|price|category
P001|Widget|19.99|Electronics
P002|Gadget|29.99|Electronics
EOF

# Run pipeline - Product is automatically discovered
python main.py --date 20260113

# Output:
# Discovered entities: ['account', 'customer', 'product', 'relationship']
# Product: 2 records --> bronze/product/...

Removing Files

If a file is removed, the pipeline skips it without errors:

rm data/source/2026/01/14/Product.txt
python main.py --date 20260114
# Only processes Customer, Account, Relationship

7. Data Quality Reports

Console Output

During pipeline execution, DQ results are printed:

DATA QUALITY - 20260113
==================================================
  Customer: PASSED (6 passed, 0 failed)
  Account: PASSED (3 passed, 0 failed)
  Product: ISSUES (1 passed, 1 failed)
    Failed: check:positive_price

JSON Report

Quality results are saved to data/reports/dq_report_YYYYMMDD.json:

{
  "date": "20260113",
  "overall_passed": false,
  "entities": {
    "customer": {
      "passed": true,
      "total_records": 5,
      "passed_checks": ["not_null:customer_id", "unique:customer_id", ...],
      "failed_checks": []
    },
    "product": {
      "passed": false,
      "total_records": 2,
      "passed_checks": ["not_null:product_id"],
      "failed_checks": ["check:positive_price"]
    }
  }
}

Pipeline Behavior on Failure

Current behavior: the pipeline continues even if DQ checks fail. To stop the run on failure, set STOP_ON_DQ_FAILURE = True in config.py.
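
A hedged sketch of how main.py could honor that flag, using the overall_passed field from the JSON report (the helper name and exact wiring are assumptions):

from config import STOP_ON_DQ_FAILURE

def enforce_dq_gate(dq_report, date_str):
    """Abort the run when DQ failed and the config flag requests a hard stop."""
    if STOP_ON_DQ_FAILURE and not dq_report.get("overall_passed", True):
        raise RuntimeError(f"Data quality checks failed for {date_str}; aborting pipeline run")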


8. How to Extend

8.1 Add Custom Validator for Known Entity

Edit data_quality.py:

def validate_product(df):
    from pyspark.sql.functions import col

    passed, failed = [], []

    # Check price is positive
    invalid = df.filter(col("price") < 0).count()
    (passed if invalid == 0 else failed).append("check:positive_price")

    return QualityResult(entity="product", ...)

# Add to validators dict
validators = {
    ...
    "product": validate_product,
}

8.2 Add New Quality Check

def check_date_not_future(df, column):
    from datetime import datetime
    from pyspark.sql.functions import col

    today = datetime.now().strftime("%Y-%m-%d")
    invalid = df.filter(col(column) > today).count()
    return {f"{column}_not_future": invalid == 0}

8.3 Add New Transformation

Edit silver.py:

def calculate_full_name(df):
    from pyspark.sql.functions import col, concat, lit
    return df.withColumn(
        "full_name",
        concat(col("first_name"), lit(" "), col("last_name"))
    )

def transform_entity(df, entity):
    result = df
    # Existing transformations...

    if entity == "customer":
        result = calculate_full_name(result)

    return result

8.4 Add New Value Replacement

Edit config.py:

REPLACEMENT_RULES = {
    ...
    "account_type": {
        "SAVINGS": "Savings Account",
        "CHECKING": "Checking Account",
    },
}

9. Exercises


Exercise 1: Add Phone Number Validation

Currently, the pipeline does not validate phone number format. Your task is to add a data quality check that ensures phone numbers contain only digits.

Problem: Phone numbers like 123-456-7890 or (123) 456-7890 should fail validation. Only 1234567890 is valid.

What to do:

  1. Open src/data_quality.py
  2. Create a new function called check_phone_format(df, column)
  3. Use the regex pattern ^[0-9]+$ to validate
  4. Add this check inside the validate_customer() function
  5. Run the pipeline and verify "format:phone" appears in the output
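
If you get stuck, one possible shape for the check (a hint only; nulls are ignored here, and your version may handle them differently):

from pyspark.sql.functions import col

def check_phone_format(df, column="phone"):
    """Pass only when every non-null phone value contains digits only."""
    invalid = df.filter(~col(column).rlike("^[0-9]+$")).count()
    return {f"format:{column}": invalid == 0}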


Exercise 2: Add Balance Check for Savings Accounts

Currently, there is no check to prevent negative balances in savings accounts. Your task is to add a data quality check for this business rule.

Problem: A SAVINGS account with balance -500 should fail validation.

What to do:

  1. Open src/data_quality.py
  2. Create a new function called check_positive_balance(df)
  3. Filter records where account_type == "SAVINGS" AND balance < 0
  4. Add this check inside the validate_account() function
  5. Run the pipeline and verify "check:positive_balance" appears in the output


Exercise 3: Add Readable Account Type Labels

Currently, account types appear as codes like "SAVINGS", "CHECKING". Your task is to make these more readable in the Silver layer output.

Problem: Users want to see "Savings Account" instead of "SAVINGS".

What to do:

  1. Open config.py
  2. Find the REPLACEMENT_RULES dictionary
  3. Add a new entry for "account_type" with the mappings:
     - SAVINGS --> Savings Account
     - CHECKING --> Checking Account
     - CREDIT --> Credit Card
  4. Run the pipeline and view the Silver data to verify the change


Exercise 4: Extract Email Domain

Currently, we only store the full email address. Your task is to create a new column that extracts just the domain part.

Problem: From john@gmail.com, we want to create a new column email_domain containing gmail.com.

What to do:

  1. Open src/silver.py
  2. Create a new function called extract_email_domain(df)
  3. Use PySpark's split() function to split by "@" and take the second part
  4. Call this function inside transform_entity() for the customer entity
  5. Run the pipeline and view the Silver customer data to verify the new column
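
One possible approach using split() and getItem() (a hint; your version may differ):

from pyspark.sql.functions import col, split

def extract_email_domain(df):
    # "john@gmail.com" -> ["john", "gmail.com"] -> "gmail.com"
    return df.withColumn("email_domain", split(col("email"), "@").getItem(1))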


Exercise 5: Create a CDC Summary Report

Currently, the pipeline prints results but there is no formatted summary. Your task is to create a nice summary table.

Expected output:

CDC Summary for 20260113
========================
Entity       | NEW | UPDATED | UNCHANGED
-------------|-----|---------|----------
Customer     |   5 |       0 |         0
Account      |   6 |       0 |         0

What to do:

  1. Create a new file src/summary.py
  2. Create a function called print_summary(cdc_results, date_str)
  3. Loop through the CDC results and print a formatted table
  4. Open main.py and import your new function
  5. Call print_summary() after the CDC detection step
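
A possible starting point; the shape of cdc_results is an assumption (a dict of per-entity counts), so adapt the key names to whatever your CDC step actually returns:

def print_summary(cdc_results, date_str):
    """cdc_results: {entity: {"new": int, "updated": int, "unchanged": int}} (assumed shape)."""
    print(f"CDC Summary for {date_str}")
    print("=" * 24)
    print(f"{'Entity':<12} | {'NEW':>3} | {'UPDATED':>7} | {'UNCHANGED':>9}")
    print(f"{'-' * 13}|{'-' * 5}|{'-' * 9}|{'-' * 10}")
    for entity, counts in cdc_results.items():
        print(
            f"{entity.capitalize():<12} | {counts.get('new', 0):>3} "
            f"| {counts.get('updated', 0):>7} | {counts.get('unchanged', 0):>9}"
        )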


Exercise 6: Save Record Counts to a File

Currently, record counts are only shown on screen. Your task is to save them to a text file for later reference.

Expected output file: data/reports/record_counts_20260113.txt

What to do:

  1. Open main.py
  2. Create a function called save_record_counts(date_str, bronze_results, cdc_results)
  3. Write the counts to a text file in the data/reports/ folder
  4. Call this function at the end of run_pipeline()
  5. Run the pipeline and verify the file was created


Quick Reference

Commands

Command                                               Description
python main.py --all                                  Process all available dates
python main.py --date 20260113                        Process a specific date
python main.py --today                                Process today's date
python view_data.py bronze customer --date 20260113   View data

File Naming

Layer    Format                        Example
Source   {Entity}.txt                  Customer.txt
Bronze   {entity}/date={YYYYMMDD}/     customer/date=20260113/
Gold     new_{entity}_{YYYYMMDD}.csv   new_customer_20260113.csv

Congratulations! You now understand the CDC Data Pipeline.
