🔧 CDC Pipeline Developer Guide

Complete Technical Reference & Implementation Details

CDC Data Pipeline - Developer Guide

How to Build This System from Scratch

This guide walks you through building the CDC Data Pipeline step by step. Follow the steps in order - each builds on the previous one.


Table of Contents

  1. Understanding CDC
  2. Project Setup
  3. Step 1: Configuration
  4. Step 2: Bronze Layer
  5. Step 3: Data Quality
  6. Step 4: CDC Detection
  7. Step 5: Silver Layer
  8. Step 6: Gold Layer
  9. Step 7: Main Entry Point
  10. Testing Your Implementation

1. Understanding CDC

What is CDC?

CDC (Change Data Capture) is a technique to identify data that has changed between two points in time.

The Problem CDC Solves

Imagine you have a customer database with 1 million records. Every day, you receive a full export of all customers. But only 100 customers actually changed.

WITHOUT CDC:
-----------
Day 1: Process 1,000,000 records    --> 1,000,000 processed
Day 2: Process 1,000,000 records    --> 1,000,000 processed (even if only 100 changed!)
Day 3: Process 1,000,000 records    --> 1,000,000 processed

WITH CDC:
---------
Day 1: Process 1,000,000 records    --> 1,000,000 NEW records
Day 2: Process only 100 changes     --> 100 processed (much faster!)
Day 3: Process only 50 changes      --> 50 processed

How Hash-Based CDC Works

We use MD5 hashing to detect changes. Here's the concept:

Step 1: Generate a "fingerprint" (hash) for each record

A hash is like a fingerprint for data. If any value changes, the fingerprint changes.

Record: customer_id=C001, name="John", city="NYC"

We take the VALUE columns (name, city) and create a hash:
Hash = MD5("John" + "||" + "NYC") = "abc123..."

Important: We do NOT include the KEY (customer_id) in the hash.
The key is used to MATCH records. The hash is used to DETECT changes.
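
The same idea outside Spark, shown with plain Python and hashlib (the values and the "||" separator mirror the Spark code built later in this guide):

import hashlib

def row_fingerprint(values):
    # Join the VALUE columns with "||" and hash the result,
    # treating None as an empty string.
    joined = "||".join("" if v is None else str(v) for v in values)
    return hashlib.md5(joined.encode("utf-8")).hexdigest()

print(row_fingerprint(["John", "NYC"]))    # Day 1 fingerprint
print(row_fingerprint(["Johnny", "NYC"]))  # Name changed --> different fingerprint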

Step 2: Save the hash as a "snapshot"

After processing Day 1, we save:

{ customer_id: "C001", hash: "abc123..." }

Step 3: Next day, compare hashes

Today's data:     C001, hash = "xyz789..."  (name changed to "Johnny")
Yesterday's data: C001, hash = "abc123..."

The hashes are different! --> This record is UPDATED

Step 4: Categorize each record

NEW:       Key exists TODAY but NOT in YESTERDAY
UPDATED:   Key exists in BOTH but hash is DIFFERENT
UNCHANGED: Key exists in BOTH and hash is SAME

Visual Example

YESTERDAY'S SNAPSHOT          TODAY'S DATA              RESULT
--------------------          ------------              ------

C001 | abc123                  C001 | abc123            UNCHANGED (same hash)
C002 | def456                  C002 | xxx999            UPDATED (hash changed!)
C003 | ghi789                  C003 | ghi789            UNCHANGED
                               C004 | jkl012            NEW (not in yesterday)
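
The same comparison, sketched with plain Python dictionaries (the hashes are placeholders matching the table above):

yesterday = {"C001": "abc123", "C002": "def456", "C003": "ghi789"}
today     = {"C001": "abc123", "C002": "xxx999", "C003": "ghi789", "C004": "jkl012"}

for key, row_hash in today.items():
    if key not in yesterday:
        print(key, "NEW")
    elif yesterday[key] != row_hash:
        print(key, "UPDATED")
    else:
        print(key, "UNCHANGED")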

2. Project Setup

Create the Folder Structure

mkdir -p de-cdc/src
mkdir -p de-cdc/data/source
mkdir -p de-cdc/orchestration
cd de-cdc

Create requirements.txt

cat > requirements.txt << 'EOF'
pyspark>=3.5.0
click>=8.0.0
EOF

Install Dependencies

pip install -r requirements.txt

Files We Will Create (in order)

| Order | File | Purpose | Why This Order |
|-------|------|---------|----------------|
| 1 | config.py | Configuration and helper functions | All other files import from here |
| 2 | src/bronze.py | Read source files, write Parquet | First step in pipeline |
| 3 | src/data_quality.py | Validate data | Runs after Bronze, before CDC |
| 4 | src/cdc.py | Detect changes (the core logic) | Needs Bronze data |
| 5 | src/silver.py | Transform data | Needs CDC results |
| 6 | src/gold.py | Export to CSV | Final step |
| 7 | main.py | Entry point to run everything | Ties all steps together |

Step 1: Configuration (config.py)

Why We Need This File

Configuration should be in ONE place. If you need to change a path or add a new entity, you only change one file.

What This File Does

  1. Defines all folder paths (where source files are, where to write output)
  2. Defines entity configurations (which columns are keys, which are values)
  3. Provides helper functions for working with dates and entities

Create config.py

"""
Configuration for CDC Data Pipeline.

This file is the SINGLE SOURCE OF TRUTH for all settings.
Other files import from here.
"""
from pathlib import Path

# ============================================================
# PATHS - Where data lives
# ============================================================

# Get the project root directory (where this file is located)
PROJECT_ROOT = Path(__file__).parent

# All data goes under the "data" folder
DATA_DIR = PROJECT_ROOT / "data"

# Define paths for each layer
SOURCE_PATH = DATA_DIR / "source"           # Input: raw text files
BRONZE_PATH = DATA_DIR / "bronze"           # Output: raw Parquet files
SILVER_PATH = DATA_DIR / "silver"           # Output: transformed Parquet
GOLD_PATH = DATA_DIR / "gold"               # Output: final CSV files
CDC_SNAPSHOTS_PATH = DATA_DIR / "cdc_snapshots"  # Hashes for CDC comparison

# Create directories if they don't exist
# This runs when the file is imported
for path in [SOURCE_PATH, BRONZE_PATH, SILVER_PATH, GOLD_PATH, CDC_SNAPSHOTS_PATH]:
    path.mkdir(parents=True, exist_ok=True)


# ============================================================
# ENTITY CONFIGURATION
# ============================================================

# Each entity needs to know:
# - key_columns: Used to MATCH records across days (like a primary key)
# - value_columns: Used to DETECT changes (we hash these)

ENTITIES = {
    "customer": {
        "key_columns": ["customer_id"],
        "value_columns": ["first_name", "last_name", "email", "phone", "status"],
    },
    "account": {
        "key_columns": ["account_id"],
        "value_columns": ["account_type", "balance", "currency", "status"],
    },
}


# ============================================================
# HELPER FUNCTIONS
# ============================================================

def get_source_path_for_date(date_str: str) -> Path:
    """
    Convert a date string to the source folder path.

    WHY: Source files are organized by year/month/day.
    This function builds that path from a date string.

    Example:
        Input:  "20260113"
        Output: source/2026/01/13/

    Args:
        date_str: Date in YYYYMMDD format (e.g., "20260113")

    Returns:
        Path object pointing to the source folder for that date
    """
    year = date_str[:4]    # First 4 characters: "2026"
    month = date_str[4:6]  # Next 2 characters: "01"
    day = date_str[6:8]    # Last 2 characters: "13"
    return SOURCE_PATH / year / month / day


def discover_entities(date_str: str) -> list:
    """
    Find all .txt files in the source folder for a given date.

    WHY: We want the pipeline to automatically discover new entities.
    If someone adds a new file like "Product.txt", it should be processed
    without changing any code.

    Example:
        If source/2026/01/13/ contains:
            - Customer.txt
            - Account.txt
            - Product.txt

        This function returns: ["account", "customer", "product"]

    Args:
        date_str: Date in YYYYMMDD format

    Returns:
        List of entity names (lowercase), sorted alphabetically
    """
    source_dir = get_source_path_for_date(date_str)

    # If the folder doesn't exist, return empty list
    if not source_dir.exists():
        return []

    entities = []

    # Find all .txt files in the folder
    for txt_file in source_dir.glob("*.txt"):
        # Get the filename without extension, in lowercase
        # Customer.txt --> customer
        entity_name = txt_file.stem.lower()
        entities.append(entity_name)

    return sorted(entities)


def get_entity_config(entity: str, sample_df=None) -> dict:
    """
    Get configuration for an entity (key columns and value columns).

    WHY: Different entities have different primary keys.
    For known entities (customer, account), we have predefined configs.
    For NEW entities (discovered automatically), we auto-detect the columns.

    Auto-detection logic:
    1. Look for columns ending with "_id" --> use as key
    2. If no "_id" columns, use the first column as key
    3. All other columns are value columns

    Args:
        entity: Entity name (e.g., "customer", "product")
        sample_df: Optional DataFrame to detect columns from (for new entities)

    Returns:
        Dictionary with "key_columns" and "value_columns" lists
    """
    # If entity is already defined, return its config
    if entity in ENTITIES:
        return ENTITIES[entity]

    # For new entities, auto-detect from the DataFrame
    if sample_df is not None:
        # Get column names, excluding internal columns (starting with "_")
        columns = [c for c in sample_df.columns if not c.startswith("_")]

        # Strategy 1: Find columns ending with "_id"
        key_cols = [c for c in columns if c.endswith("_id")]

        # Strategy 2: If no "_id" columns, use first column
        if not key_cols:
            key_cols = [columns[0]]
            print(f"  [Auto-detect] {entity}: Using '{columns[0]}' as key")

        # Everything else is a value column
        value_cols = [c for c in columns if c not in key_cols]

        return {"key_columns": key_cols, "value_columns": value_cols}

    # Fallback: assume entity_id is the key
    return {"key_columns": [f"{entity}_id"], "value_columns": []}

Function Summary

| Function | What It Does | When It's Called |
|----------|--------------|------------------|
| get_source_path_for_date("20260113") | Converts a date to a folder path | Bronze layer |
| discover_entities("20260113") | Finds all .txt files | Bronze layer |
| get_entity_config("customer", df) | Gets key/value columns | CDC layer |

Step 2: Bronze Layer (src/bronze.py)

Why We Need This File

Bronze is the ENTRY POINT of the pipeline. It reads raw source files and stores them in a format that's easy to process (Parquet).

What This File Does

  1. Creates a Spark session (the engine that processes data)
  2. Reads pipe-delimited text files
  3. Adds metadata (when was it ingested, what date is it for)
  4. Saves as Parquet format (efficient columnar storage)

Create src/bronze.py

"""
Bronze Layer - Ingest raw source files.

This is the FIRST step in the pipeline.
It reads raw text files and converts them to Parquet format.
"""
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import current_timestamp, lit
from pathlib import Path

# Import our configuration
import config


def get_spark() -> SparkSession:
    """
    Create and return a SparkSession.

    WHY: SparkSession is the entry point to Spark. You need it to read
    files, create DataFrames, and run SQL queries.

    WHAT THIS DOES:
    - Creates a Spark application named "CDC-Pipeline"
    - Runs locally using all available CPU cores (local[*])

    Returns:
        SparkSession object
    """
    return (
        SparkSession.builder
        .appName("CDC-Pipeline")     # Name shown in Spark UI
        .master("local[*]")          # Run locally with all cores
        .getOrCreate()               # Create new or get existing session
    )


def ingest_file(spark: SparkSession, entity: str, date_str: str) -> DataFrame:
    """
    Read a single pipe-delimited source file into a DataFrame.

    WHY: Source systems typically export data as text files.
    We need to read these files and add tracking information.

    WHAT THIS DOES:
    1. Builds the file path (e.g., source/2026/01/13/Customer.txt)
    2. Reads the pipe-delimited file using Spark
    3. Adds metadata columns for tracking
    4. Returns the DataFrame

    Args:
        spark: SparkSession to use for reading
        entity: Entity name (e.g., "customer")
        date_str: Date in YYYYMMDD format

    Returns:
        DataFrame containing the file data plus metadata columns

    Raises:
        FileNotFoundError: If the source file doesn't exist
    """
    # Build the full file path
    # Example: source/2026/01/13/Customer.txt
    source_dir = config.get_source_path_for_date(date_str)
    file_path = source_dir / f"{entity.capitalize()}.txt"

    print(f"  Reading: {file_path}")

    # Check if file exists
    if not file_path.exists():
        raise FileNotFoundError(f"File not found: {file_path}")

    # Read the pipe-delimited file
    # Options explained:
    # - header=true: First row contains column names
    # - delimiter=|: Columns are separated by pipe character
    # - inferSchema=false: Keep all columns as strings (safer)
    df = (
        spark.read
        .option("header", "true")
        .option("delimiter", "|")
        .option("inferSchema", "false")
        .csv(str(file_path))
    )

    # Add metadata columns (prefixed with _ to identify as internal)
    # These help us track when and where the data came from
    df = (
        df
        .withColumn("_ingestion_timestamp", current_timestamp())  # When we read it
        .withColumn("_processing_date", lit(date_str))            # Which date's data
        .withColumn("_source_entity", lit(entity))                # Which entity
    )

    return df


def write_bronze(df: DataFrame, entity: str, date_str: str) -> Path:
    """
    Write a DataFrame to the Bronze layer as Parquet.

    WHY: Parquet is a columnar format that's much faster to read than text.
    It also supports compression and schema information.

    WHAT THIS DOES:
    1. Builds the output path (e.g., bronze/customer/date=20260113/)
    2. Writes the DataFrame as Parquet files
    3. Returns the output path

    Args:
        df: DataFrame to write
        entity: Entity name (e.g., "customer")
        date_str: Date in YYYYMMDD format

    Returns:
        Path where the data was written
    """
    # Build output path with date partition
    # Example: bronze/customer/date=20260113/
    output_path = config.BRONZE_PATH / entity / f"date={date_str}"

    # Write as Parquet
    # mode="overwrite" means: replace if already exists
    df.write.mode("overwrite").parquet(str(output_path))

    # Print confirmation
    record_count = df.count()
    print(f"  Wrote {record_count} records to {output_path}")

    return output_path


def process_bronze(spark: SparkSession, date_str: str) -> dict:
    """
    Main entry point: Process all entities for the Bronze layer.

    WHY: This is the function that gets called from main.py.
    It orchestrates the entire Bronze layer processing.

    WHAT THIS DOES:
    1. Discovers all .txt files for this date (using config.discover_entities)
    2. For each file: reads it and writes to Bronze
    3. Returns a dictionary with record counts

    Args:
        spark: SparkSession to use
        date_str: Date in YYYYMMDD format

    Returns:
        Dictionary mapping entity names to record counts
        Example: {"customer": 5, "account": 6}
    """
    print(f"\n{'='*50}")
    print(f"BRONZE LAYER - {date_str}")
    print(f"{'='*50}")

    # Step 1: Discover all entities (find all .txt files)
    entities = config.discover_entities(date_str)
    print(f"  Discovered: {entities}")

    # Step 2: Process each entity
    results = {}
    for entity in entities:
        # Read the file
        df = ingest_file(spark, entity, date_str)

        # Write to Bronze
        write_bronze(df, entity, date_str)

        # Track the count
        results[entity] = df.count()

    return results

Function Summary

| Function | Purpose | Called By |
|----------|---------|-----------|
| get_spark() | Creates the Spark session | main.py |
| ingest_file(spark, entity, date) | Reads one .txt file | process_bronze() |
| write_bronze(df, entity, date) | Writes to Parquet | process_bronze() |
| process_bronze(spark, date) | Orchestrates the Bronze layer | main.py |
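
You can also exercise the Bronze layer on its own from a Python shell in the project root (the date assumes the sample data created in the testing section):

from src.bronze import get_spark, process_bronze

spark = get_spark()
counts = process_bronze(spark, "20260113")  # e.g. {"account": 6, "customer": 5}
print(counts)
spark.stop()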

Step 3: Data Quality (src/data_quality.py)

Why We Need This File

Before processing data, we should check if it's valid. Bad data can cause problems downstream.

What This File Does

  1. Defines reusable check functions (null check, unique check, etc.)
  2. Defines validators for each entity type
  3. Runs all checks and returns results

Create src/data_quality.py

"""
Data Quality - Validate data before processing.

This runs AFTER Bronze and BEFORE CDC.
It checks for common data problems.
"""
from pyspark.sql import DataFrame
from pyspark.sql.functions import col
from dataclasses import dataclass
from typing import List, Dict
import json

# Import our configuration
import config


@dataclass
class QualityResult:
    """
    Container for quality check results.

    WHY: Using a dataclass makes it easy to pass results around
    and access them with clear attribute names.

    Attributes:
        entity: Name of the entity that was checked
        total_records: How many records were in the data
        passed_checks: List of check names that passed
        failed_checks: List of check names that failed
        overall_passed: True if ALL checks passed
    """
    entity: str
    total_records: int
    passed_checks: List[str]
    failed_checks: List[str]
    overall_passed: bool


# ============================================================
# REUSABLE CHECK FUNCTIONS
# ============================================================
# These functions can be used by any validator.
# They each check ONE thing and return the result.

def check_not_null(df: DataFrame, columns: List[str]) -> Dict:
    """
    Check that specified columns have no null values.

    WHY: Null values in required fields can break downstream processing.
    For example, a customer without an ID cannot be matched.

    WHAT THIS DOES:
    For each column, count how many rows have null values.
    If count is 0, the check passes.

    Args:
        df: DataFrame to check
        columns: List of column names to check

    Returns:
        Dictionary mapping column names to True (passed) or False (failed)
        Example: {"customer_id": True, "email": False}
    """
    results = {}
    for col_name in columns:
        # Count rows where this column is null
        null_count = df.filter(col(col_name).isNull()).count()
        # Pass if no nulls found
        results[col_name] = (null_count == 0)
    return results


def check_unique(df: DataFrame, columns: List[str]) -> Dict:
    """
    Check that there are no duplicate values in the specified columns.

    WHY: Primary keys should be unique. Duplicates can cause
    incorrect joins and data corruption.

    WHAT THIS DOES:
    Compare total row count with distinct row count.
    If they're equal, all values are unique.

    Args:
        df: DataFrame to check
        columns: List of column names to check together

    Returns:
        Dictionary with "unique" key set to True or False
    """
    total = df.count()
    distinct = df.select(*columns).distinct().count()
    return {"unique": total == distinct}


def check_allowed_values(df: DataFrame, column: str, allowed: List[str]) -> Dict:
    """
    Check that a column only contains values from an allowed list.

    WHY: Some columns should only have specific values.
    For example, status should only be A, I, or D.

    WHAT THIS DOES:
    Count rows where the value is NOT in the allowed list.
    If count is 0, the check passes.

    Args:
        df: DataFrame to check
        column: Column name to check
        allowed: List of allowed values

    Returns:
        Dictionary with "{column}_values" key set to True or False
    """
    # Find rows with invalid values (not null AND not in allowed list)
    invalid = df.filter(
        col(column).isNotNull() & ~col(column).isin(allowed)
    ).count()
    return {f"{column}_values": invalid == 0}


def check_email_format(df: DataFrame, column: str) -> Dict:
    """
    Check that email addresses have a valid format.

    WHY: Invalid emails can cause issues when sending notifications
    or matching customer records.

    WHAT THIS DOES:
    Uses a regex pattern to validate email format.
    The pattern checks for: name@domain.tld

    Args:
        df: DataFrame to check
        column: Column name containing emails

    Returns:
        Dictionary with "{column}_format" key set to True or False
    """
    # Simple email pattern: name@domain.tld
    pattern = r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$"

    # Find rows with invalid emails
    invalid = df.filter(
        col(column).isNotNull() & ~col(column).rlike(pattern)
    ).count()

    return {f"{column}_format": invalid == 0}


# ============================================================
# ENTITY-SPECIFIC VALIDATORS
# ============================================================
# Each entity can have its own validation rules.

def validate_customer(df: DataFrame) -> QualityResult:
    """
    Run all quality checks for customer data.

    CHECKS:
    1. customer_id, first_name, last_name are not null
    2. customer_id is unique
    3. email has valid format
    4. status is one of: A, I, D, P
    """
    passed = []
    failed = []

    # Check 1: Required columns not null
    null_checks = check_not_null(df, ["customer_id", "first_name", "last_name"])
    for check, result in null_checks.items():
        # Add to passed or failed list based on result
        (passed if result else failed).append(f"not_null:{check}")

    # Check 2: Customer ID is unique
    unique_check = check_unique(df, ["customer_id"])
    (passed if unique_check["unique"] else failed).append("unique:customer_id")

    # Check 3: Email format
    email_check = check_email_format(df, "email")
    (passed if email_check["email_format"] else failed).append("format:email")

    # Check 4: Status values
    status_check = check_allowed_values(df, "status", ["A", "I", "D", "P"])
    (passed if status_check["status_values"] else failed).append("values:status")

    # Return result
    return QualityResult(
        entity="customer",
        total_records=df.count(),
        passed_checks=passed,
        failed_checks=failed,
        overall_passed=len(failed) == 0
    )


def validate_account(df: DataFrame) -> QualityResult:
    """
    Run all quality checks for account data.

    CHECKS:
    1. account_id, account_type are not null
    2. account_id is unique
    """
    passed = []
    failed = []

    null_checks = check_not_null(df, ["account_id", "account_type"])
    for check, result in null_checks.items():
        (passed if result else failed).append(f"not_null:{check}")

    unique_check = check_unique(df, ["account_id"])
    (passed if unique_check["unique"] else failed).append("unique:account_id")

    return QualityResult(
        entity="account",
        total_records=df.count(),
        passed_checks=passed,
        failed_checks=failed,
        overall_passed=len(failed) == 0
    )


def validate_generic(df: DataFrame, entity: str) -> QualityResult:
    """
    Generic validator for unknown entities.

    WHY: When a new entity is discovered (like Product.txt),
    we don't have a specific validator for it.
    This function provides basic checks based on column naming conventions.

    CHECKS:
    1. All columns ending with "_id" are not null
    2. The first "_id" column is unique
    """
    passed = []
    failed = []

    # Find all ID columns (columns ending with "_id")
    id_cols = [c for c in df.columns if c.endswith("_id") and not c.startswith("_")]

    if id_cols:
        # Check: ID columns not null
        null_checks = check_not_null(df, id_cols)
        for check, result in null_checks.items():
            (passed if result else failed).append(f"not_null:{check}")

        # Check: First ID column is unique (assume it's the primary key)
        unique_check = check_unique(df, [id_cols[0]])
        (passed if unique_check["unique"] else failed).append(f"unique:{id_cols[0]}")

    return QualityResult(
        entity=entity,
        total_records=df.count(),
        passed_checks=passed,
        failed_checks=failed,
        overall_passed=len(failed) == 0
    )


# ============================================================
# MAIN FUNCTION
# ============================================================

def run_quality_checks(spark, date_str: str) -> Dict[str, QualityResult]:
    """
    Main entry point: Run quality checks for all entities.

    WHAT THIS DOES:
    1. Finds all entities in the Bronze layer
    2. For each entity, runs the appropriate validator
    3. Prints results to the console
    4. Returns a results dictionary

    Args:
        spark: SparkSession
        date_str: Date in YYYYMMDD format

    Returns:
        Dictionary mapping entity names to QualityResult objects
    """
    print(f"\n{'='*50}")
    print(f"DATA QUALITY - {date_str}")
    print(f"{'='*50}")

    results = {}

    # Map of known validators
    validators = {
        "customer": validate_customer,
        "account": validate_account,
    }

    # Find all entities in Bronze layer
    bronze_entities = [d.name for d in config.BRONZE_PATH.iterdir() if d.is_dir()]

    for entity in bronze_entities:
        # Build path to Bronze data for this date
        bronze_path = config.BRONZE_PATH / entity / f"date={date_str}"

        if not bronze_path.exists():
            continue

        # Read the Bronze data
        df = spark.read.parquet(str(bronze_path))

        # Get the appropriate validator (or use generic)
        if entity in validators:
            validator = validators[entity]
            result = validator(df)
        else:
            result = validate_generic(df, entity)

        # Store result
        results[entity] = result

        # Print summary
        status = "PASSED" if result.overall_passed else "FAILED"
        print(f"  {entity}: {status} "
              f"({len(result.passed_checks)} passed, {len(result.failed_checks)} failed)")

        # Print failed checks
        for check in result.failed_checks:
            print(f"    [FAILED] {check}")

    return results

Function Summary

| Function | Purpose | Reusable? |
|----------|---------|-----------|
| check_not_null(df, columns) | Check columns for nulls | Yes |
| check_unique(df, columns) | Check for duplicates | Yes |
| check_allowed_values(df, col, list) | Check values are in an allowed list | Yes |
| check_email_format(df, column) | Validate email pattern | Yes |
| validate_customer(df) | Customer-specific checks | No |
| validate_account(df) | Account-specific checks | No |
| validate_generic(df, entity) | Generic checks for new entities | No |
| run_quality_checks(spark, date) | Main entry point | No |
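
To get a feel for the reusable checks, you can run them against a tiny hand-made DataFrame (toy data, not part of the pipeline):

from pyspark.sql import SparkSession
from src.data_quality import check_not_null, check_unique

spark = SparkSession.builder.master("local[*]").getOrCreate()

df = spark.createDataFrame(
    [("C001", "John"), ("C002", None), ("C001", "Bob")],  # duplicate id, one null name
    ["customer_id", "first_name"],
)

print(check_not_null(df, ["customer_id", "first_name"]))  # {'customer_id': True, 'first_name': False}
print(check_unique(df, ["customer_id"]))                  # {'unique': False}

spark.stop()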

Step 4: CDC Detection (src/cdc.py)

Why We Need This File

This is the CORE of the pipeline. It implements the Change Data Capture logic.

What This File Does

  1. Generates MD5 hashes for each record
  2. Loads previous day's snapshot (if exists)
  3. Compares hashes to find NEW and UPDATED records
  4. Saves current snapshot for next run

Create src/cdc.py

"""
CDC (Change Data Capture) - Detect new and updated records.

This is the CORE logic of the pipeline.
It compares today's data with yesterday's snapshot to find changes.
"""
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, concat_ws, md5, lit, coalesce
from dataclasses import dataclass
from typing import Optional

# Import our configuration
import config


@dataclass
class CDCResult:
    """
    Container for CDC detection results.

    Attributes:
        entity: Name of the entity
        new_records: DataFrame containing NEW records (flag='I' for Insert)
        updated_records: DataFrame containing UPDATED records (flag='U' for Update)
        unchanged_records: DataFrame containing UNCHANGED records (no flag)
        total_new: Count of new records
        total_updated: Count of updated records
        total_unchanged: Count of unchanged records
    """
    entity: str
    new_records: DataFrame
    updated_records: DataFrame
    unchanged_records: DataFrame
    total_new: int = 0
    total_updated: int = 0
    total_unchanged: int = 0


def generate_hash(df: DataFrame, columns: list) -> DataFrame:
    """
    Generate MD5 hash of specified columns.

    WHY: We use hashing to detect changes efficiently.
    Instead of comparing every column of every row,
    we just compare one hash value per row.

    HOW IT WORKS:
    1. Take each value column and convert to string
    2. Replace nulls with empty string (nulls can't be hashed)
    3. Concatenate all values with "||" separator
    4. Calculate MD5 hash of the concatenated string

    Example:
        columns = ["name", "city"]
        values = ["John", "NYC"]
        concatenated = "John||NYC"
        hash = MD5("John||NYC") = "abc123..."

    Args:
        df: DataFrame to add hash to
        columns: List of columns to include in the hash

    Returns:
        DataFrame with new "_row_hash" column
    """
    # Prepare columns for hashing
    # coalesce replaces null with empty string
    cols_for_hash = [
        coalesce(col(c).cast("string"), lit(""))
        for c in columns
    ]

    # Generate the hash
    # concat_ws joins all column values with "||"
    # md5 calculates the hash
    return df.withColumn(
        "_row_hash",
        md5(concat_ws("||", *cols_for_hash))
    )


def get_previous_snapshot(spark: SparkSession, entity: str, current_date: str) -> Optional[DataFrame]:
    """
    Load the previous day's snapshot for comparison.

    WHY: To detect changes, we need to know what the data looked like yesterday.
    Snapshots store only the key columns and hash (not full data).

    Args:
        spark: SparkSession
        entity: Entity name
        current_date: Current date being processed

    Returns:
        DataFrame with previous snapshot, or None if not found
    """
    # Get previous date (this function is in config.py)
    prev_date = config.get_previous_date(current_date)

    if prev_date is None:
        return None

    # Build path to previous snapshot
    snapshot_path = config.CDC_SNAPSHOTS_PATH / entity / f"snapshot_{prev_date}.parquet"

    if not snapshot_path.exists():
        return None

    print(f"  Loading previous snapshot: {prev_date}")
    return spark.read.parquet(str(snapshot_path))


def save_snapshot(df: DataFrame, entity: str, date_str: str) -> None:
    """
    Save current hashes as snapshot for next run.

    WHY: Tomorrow's CDC will need today's data to compare against.
    We save only key + hash to minimize storage.

    Args:
        df: DataFrame with _row_hash column
        entity: Entity name
        date_str: Current date
    """
    snapshot_path = config.CDC_SNAPSHOTS_PATH / entity / f"snapshot_{date_str}.parquet"
    snapshot_path.parent.mkdir(parents=True, exist_ok=True)

    # Get key columns from config
    entity_config = config.get_entity_config(entity, df)
    key_cols = entity_config["key_columns"]

    # Save only key columns + hash (not full data)
    df.select(*key_cols, "_row_hash").write.mode("overwrite").parquet(str(snapshot_path))


def detect_changes(spark: SparkSession, current_df: DataFrame, entity: str, date_str: str) -> CDCResult:
    """
    Main CDC logic: Compare current data with previous snapshot.

    THIS IS THE CORE ALGORITHM:

    1. Generate hash for each row in current data
    2. Load previous snapshot (key + hash from yesterday)
    3. Find NEW records: keys that exist today but not yesterday
    4. Find UPDATED records: keys exist in both, but hash is different
    5. Find UNCHANGED records: keys exist in both, hash is same
    6. Save current snapshot for next run

    Args:
        spark: SparkSession
        current_df: DataFrame with today's data
        entity: Entity name
        date_str: Today's date

    Returns:
        CDCResult containing DataFrames for new, updated, and unchanged records
    """
    # Get column configuration
    entity_config = config.get_entity_config(entity, current_df)
    key_cols = entity_config["key_columns"]
    value_cols = entity_config["value_columns"]

    # STEP 1: Generate hash for current data
    current_with_hash = generate_hash(current_df, value_cols)

    # STEP 2: Load previous snapshot
    previous_df = get_previous_snapshot(spark, entity, date_str)

    # STEP 3: If no previous data, everything is NEW
    if previous_df is None:
        print(f"  No previous snapshot - all records are NEW")

        new_records = current_with_hash.withColumn("_cdc_flag", lit("I"))
        save_snapshot(current_with_hash, entity, date_str)

        return CDCResult(
            entity=entity,
            new_records=new_records,
            updated_records=new_records.limit(0),  # Empty DataFrame
            unchanged_records=current_with_hash.limit(0),
            total_new=new_records.count(),
            total_updated=0,
            total_unchanged=0
        )

    # STEP 4: Find NEW records
    # LEFT ANTI JOIN: Records in current but NOT in previous
    new_records = (
        current_with_hash.alias("curr")
        .join(previous_df.alias("prev"), on=key_cols, how="left_anti")
        .withColumn("_cdc_flag", lit("I"))  # I = Insert (new record)
    )

    # STEP 5: Find UPDATED records
    # INNER JOIN where hashes are DIFFERENT
    updated_records = (
        current_with_hash.alias("curr")
        .join(
            previous_df.select(*key_cols, "_row_hash").alias("prev"),
            on=key_cols,
            how="inner"
        )
        .where(col("curr._row_hash") != col("prev._row_hash"))
        .select("curr.*")
        .withColumn("_cdc_flag", lit("U"))  # U = Update
    )

    # STEP 6: Find UNCHANGED records
    # INNER JOIN where hashes are SAME
    unchanged_records = (
        current_with_hash.alias("curr")
        .join(
            previous_df.select(*key_cols, "_row_hash").alias("prev"),
            on=key_cols,
            how="inner"
        )
        .where(col("curr._row_hash") == col("prev._row_hash"))
        .select("curr.*")
    )

    # STEP 7: Save snapshot for next run
    save_snapshot(current_with_hash, entity, date_str)

    return CDCResult(
        entity=entity,
        new_records=new_records,
        updated_records=updated_records,
        unchanged_records=unchanged_records,
        total_new=new_records.count(),
        total_updated=updated_records.count(),
        total_unchanged=unchanged_records.count()
    )


def process_cdc(spark: SparkSession, date_str: str) -> dict:
    """
    Main entry point: Run CDC for all entities.

    Args:
        spark: SparkSession
        date_str: Date to process

    Returns:
        Dictionary mapping entity names to CDCResult objects
    """
    print(f"\n{'='*50}")
    print(f"CDC DETECTION - {date_str}")
    print(f"{'='*50}")

    results = {}

    # Find all entities in Bronze layer
    bronze_entities = [d.name for d in config.BRONZE_PATH.iterdir() if d.is_dir()]

    for entity in bronze_entities:
        bronze_path = config.BRONZE_PATH / entity / f"date={date_str}"

        if not bronze_path.exists():
            continue

        # Read Bronze data
        current_df = spark.read.parquet(str(bronze_path))

        # Run CDC detection
        cdc_result = detect_changes(spark, current_df, entity, date_str)

        print(f"  {entity}: NEW={cdc_result.total_new}, "
              f"UPDATED={cdc_result.total_updated}, "
              f"UNCHANGED={cdc_result.total_unchanged}")

        results[entity] = cdc_result

    return results

The CDC Algorithm Visualized

STEP 1: Hash Current Data
-------------------------
Current DataFrame:
+-------+------+------+-----------+
|cust_id| name | city | _row_hash |
+-------+------+------+-----------+
| C001  | John | NYC  | abc123    |
| C002  | Jane | LA   | def456    |
| C003  | Bob  | CHI  | ghi789    |
+-------+------+------+-----------+

STEP 2: Load Previous Snapshot
------------------------------
Previous Snapshot (from yesterday):
+-------+-----------+
|cust_id| _row_hash |
+-------+-----------+
| C001  | abc123    |
| C002  | zzz999    |  <-- Different hash! Jane's data changed
+-------+-----------+

STEP 3-5: Compare
-----------------
C001: Hash same (abc123 = abc123)     --> UNCHANGED
C002: Hash different (def456 != zzz999) --> UPDATED
C003: Not in previous                 --> NEW

RESULT:
new_records: [C003]
updated_records: [C002]
unchanged_records: [C001]
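
You can also reproduce the comparison on toy DataFrames to see the joins in isolation (the names and cities are made up):

from pyspark.sql import SparkSession
from pyspark.sql.functions import coalesce, col, concat_ws, lit, md5

spark = SparkSession.builder.master("local[*]").getOrCreate()

def with_hash(df, value_cols):
    # Same hashing recipe as generate_hash() above
    parts = [coalesce(col(c).cast("string"), lit("")) for c in value_cols]
    return df.withColumn("_row_hash", md5(concat_ws("||", *parts)))

# "Yesterday": Jane lived in SF; the snapshot keeps only key + hash
yesterday = with_hash(
    spark.createDataFrame([("C001", "John", "NYC"), ("C002", "Jane", "SF")],
                          ["cust_id", "name", "city"]),
    ["name", "city"],
).select("cust_id", "_row_hash")

# "Today": Jane moved to LA, Bob is brand new
today = with_hash(
    spark.createDataFrame([("C001", "John", "NYC"), ("C002", "Jane", "LA"), ("C003", "Bob", "CHI")],
                          ["cust_id", "name", "city"]),
    ["name", "city"],
)

new = today.join(yesterday, on=["cust_id"], how="left_anti")
updated = (
    today.alias("curr")
    .join(yesterday.alias("prev"), on=["cust_id"], how="inner")
    .where(col("curr._row_hash") != col("prev._row_hash"))
    .select("curr.*")
)

print([r.cust_id for r in new.collect()])      # ['C003']
print([r.cust_id for r in updated.collect()])  # ['C002']

spark.stop()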

Line-by-Line Code Breakdown

Here are detailed explanations of the most important functions:

generate_hash() - Line by Line

def generate_hash(df: DataFrame, columns: list) -> DataFrame:

Line 1: Define function that takes a DataFrame and list of columns, returns DataFrame with hash added.

    cols_for_hash = [
        coalesce(col(c).cast("string"), lit(""))
        for c in columns
    ]

Lines 2-5: Create a list of column expressions for hashing.
  - col(c) references the column by name
  - .cast("string") converts it to a string (hashing needs strings)
  - coalesce(..., lit("")) substitutes an empty string when the value is null
  - The whole thing is a list comprehension that runs once for each column in columns

    return df.withColumn(
        "_row_hash",
        md5(concat_ws("||", *cols_for_hash))
    )

Lines 6-9: Add the hash column to the DataFrame.
  - withColumn("_row_hash", ...) adds a new column named "_row_hash"
  - concat_ws("||", ...) joins all column values with the "||" separator (example: ["John", "NYC"] becomes "John||NYC")
  - md5(...) calculates the MD5 hash of the concatenated string
  - *cols_for_hash unpacks the list (Python syntax)


detect_changes() - Finding NEW Records

    new_records = (
        current_with_hash.alias("curr")
        .join(previous_df.alias("prev"), on=key_cols, how="left_anti")
        .withColumn("_cdc_flag", lit("I"))
    )

Breaking this down:

  1. current_with_hash.alias("curr")
     - Give the current DataFrame an alias "curr" for reference

  2. .join(..., how="left_anti")
     - LEFT ANTI JOIN returns rows from the LEFT table that have NO match in the RIGHT table
     - Think of it as: "Give me rows in current that are NOT in previous"

  3. on=key_cols
     - Join on the key columns (e.g., customer_id)

  4. .withColumn("_cdc_flag", lit("I"))
     - Add a flag column with value "I" (Insert = new record)

Visual:

CURRENT                  PREVIOUS             RESULT (left_anti)
+------+                 +------+             +------+
| C001 |   -- matches -- | C001 |             |      |  (excluded)
| C002 |   -- matches -- | C002 |             |      |  (excluded)
| C003 |   -- NO match   |      |     -->     | C003 |  (included!)
+------+                 +------+             +------+

detect_changes() - Finding UPDATED Records

    updated_records = (
        current_with_hash.alias("curr")
        .join(
            previous_df.select(*key_cols, "_row_hash").alias("prev"),
            on=key_cols,
            how="inner"
        )
        .where(col("curr._row_hash") != col("prev._row_hash"))
        .select("curr.*")
        .withColumn("_cdc_flag", lit("U"))
    )

Breaking this down:

  1. previous_df.select(*key_cols, "_row_hash")
     - From the previous data, select only the key columns and the hash

  2. .join(..., how="inner")
     - INNER JOIN returns rows that exist in BOTH tables

  3. .where(col("curr._row_hash") != col("prev._row_hash"))
     - Filter to rows where the hashes are DIFFERENT
     - curr._row_hash refers to the hash from the current data
     - prev._row_hash refers to the hash from the previous data

  4. .select("curr.*")
     - Select all columns from the current data (not the previous)

  5. .withColumn("_cdc_flag", lit("U"))
     - Add flag "U" (Update)

Visual:

CURRENT                    PREVIOUS                RESULT
+------+--------+          +------+--------+       +------+
| C001 | abc123 |  inner   | C001 | abc123 |       |      | hash same
| C002 | def456 |  join    | C002 | zzz999 | -->   | C002 | hash different!
+------+--------+          +------+--------+       +------+

apply_replacements() - Line by Line (this function lives in src/silver.py, created in Step 5)

def apply_replacements(df: DataFrame, column: str, mappings: dict) -> DataFrame:
    if column not in df.columns:
        return df

Lines 1-3: If the column doesn't exist, return DataFrame unchanged.

    items = list(mappings.items())
    case_expr = when(col(column) == items[0][0], lit(items[0][1]))

Lines 4-5: Start building the CASE expression.
  - Convert the mappings dict to a list of (key, value) pairs
  - Begin the CASE expression with the first mapping
  - items[0][0] is the first key ("A"), items[0][1] is the first value ("Active")

    for old_val, new_val in items[1:]:
        case_expr = case_expr.when(col(column) == old_val, lit(new_val))

Lines 6-7: Add the remaining mappings to the CASE expression.
  - items[1:] means all items except the first
  - Each .when() adds another condition

    return df.withColumn(column, case_expr.otherwise(col(column)))

Line 8: Apply the CASE expression.
  - .otherwise(col(column)) means: if there is no match, keep the original value
  - withColumn(column, ...) replaces the existing column

The final SQL equivalent:

CASE 
    WHEN status = 'A' THEN 'Active'
    WHEN status = 'I' THEN 'Inactive'
    WHEN status = 'D' THEN 'Deleted'
    ELSE status
END

Step 5: Silver Layer (src/silver.py)

Why We Need This File

Silver layer transforms the data - applying business rules and cleaning.

What This File Does

  1. Takes only NEW + UPDATED records from CDC
  2. Applies value replacements (e.g., "A" → "Active")
  3. Saves transformed data

Create src/silver.py

"""
Silver Layer - Transform data.

This runs AFTER CDC.
It only processes NEW and UPDATED records (not unchanged).
"""
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, when, lit
from pathlib import Path

import config


# Value replacement rules
# These map coded values to readable values
REPLACEMENT_RULES = {
    "status": {
        "A": "Active",
        "I": "Inactive",
        "D": "Deleted",
    },
}


def apply_replacements(df: DataFrame, column: str, mappings: dict) -> DataFrame:
    """
    Replace coded values with readable values.

    WHY: Source systems often use short codes (A, I, D).
    Users prefer readable values (Active, Inactive, Deleted).

    HOW IT WORKS:
    Builds a CASE WHEN expression:
    CASE WHEN column = 'A' THEN 'Active'
         WHEN column = 'I' THEN 'Inactive'
         ELSE column  -- Keep original if no match
    END

    Args:
        df: DataFrame to transform
        column: Column name to replace values in
        mappings: Dictionary of old_value -> new_value

    Returns:
        DataFrame with replaced values
    """
    # Skip if column doesn't exist
    if column not in df.columns:
        return df

    # Build the CASE WHEN expression
    items = list(mappings.items())

    # Start with first item
    case_expr = when(col(column) == items[0][0], lit(items[0][1]))

    # Add remaining items
    for old_val, new_val in items[1:]:
        case_expr = case_expr.when(col(column) == old_val, lit(new_val))

    # Apply transformation
    # .otherwise() keeps original value if no match
    return df.withColumn(column, case_expr.otherwise(col(column)))


def transform_entity(df: DataFrame, entity: str) -> DataFrame:
    """
    Apply all transformations for an entity.

    Currently applies:
    1. Value replacements from REPLACEMENT_RULES

    You can add more transformations here!

    Args:
        df: DataFrame to transform
        entity: Entity name (not used yet, but available for entity-specific logic)

    Returns:
        Transformed DataFrame
    """
    result = df

    # Apply all replacement rules
    for column, mappings in REPLACEMENT_RULES.items():
        result = apply_replacements(result, column, mappings)

    return result


def process_silver(spark: SparkSession, cdc_results: dict, date_str: str) -> dict:
    """
    Main entry point: Process Silver layer.

    IMPORTANT: This only processes NEW + UPDATED records!
    Unchanged records are skipped (they're already in Silver from before).

    Args:
        spark: SparkSession
        cdc_results: Dictionary of entity -> CDCResult from CDC step
        date_str: Date being processed

    Returns:
        Dictionary mapping entity names to record counts
    """
    print(f"\n{'='*50}")
    print(f"SILVER LAYER - {date_str}")
    print(f"{'='*50}")

    results = {}

    for entity, cdc_result in cdc_results.items():
        # Combine NEW and UPDATED records
        # These are the only ones we need to process
        changes_df = cdc_result.new_records.union(cdc_result.updated_records)

        count = changes_df.count()
        if count == 0:
            print(f"  {entity}: No changes to process")
            results[entity] = 0
            continue

        # Apply transformations
        transformed = transform_entity(changes_df, entity)

        # Write to Silver
        output_path = config.SILVER_PATH / entity / f"date={date_str}"
        transformed.write.mode("overwrite").parquet(str(output_path))

        print(f"  {entity}: {count} records transformed")
        results[entity] = count

    return results
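
To see the replacement logic in isolation, you can run apply_replacements() against a few toy rows from the project root:

from pyspark.sql import SparkSession
from src.silver import REPLACEMENT_RULES, apply_replacements

spark = SparkSession.builder.master("local[*]").getOrCreate()

df = spark.createDataFrame(
    [("C001", "A"), ("C002", "I"), ("C003", "X")],  # "X" has no mapping
    ["customer_id", "status"],
)

apply_replacements(df, "status", REPLACEMENT_RULES["status"]).show()
# "A" --> "Active", "I" --> "Inactive", "X" stays "X" (the .otherwise() branch)

spark.stop()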

Step 6: Gold Layer (src/gold.py)

Why We Need This File

Gold layer creates the final output - clean CSV files for downstream systems.

What This File Does

  1. Takes NEW + UPDATED records
  2. Removes internal columns (prefixed with _)
  3. Exports as single CSV file with clean name

Create src/gold.py

"""
Gold Layer - Export final CSV files.

This is the LAST step in the pipeline.
It creates clean CSV files for downstream systems.
"""
from pyspark.sql import DataFrame
from pathlib import Path
import shutil

import config


def export_to_csv(df: DataFrame, entity: str, date_str: str) -> Path:
    """
    Export DataFrame to a single clean CSV file.

    WHY: Spark typically creates multiple part files.
    Downstream systems often expect a single file.
    This function creates one clean file.

    WHAT THIS DOES:
    1. Removes internal columns (starting with _)
    2. Writes to temp directory (Spark creates part- files)
    3. Renames the part file to our clean filename
    4. Cleans up temp files

    Output filename: new_customer_20260113.csv

    Args:
        df: DataFrame to export
        entity: Entity name
        date_str: Date being processed

    Returns:
        Path to the created CSV file
    """
    output_dir = config.GOLD_PATH / entity
    output_dir.mkdir(parents=True, exist_ok=True)

    # Create clean filename
    final_file = output_dir / f"new_{entity}_{date_str}.csv"

    # Remove internal columns (prefixed with _)
    columns_to_keep = [c for c in df.columns if not c.startswith("_")]
    df_clean = df.select(*columns_to_keep)

    # Write to temp directory
    # coalesce(1) forces everything into one file
    temp_dir = output_dir / f"_temp_{entity}_{date_str}"
    (
        df_clean
        .coalesce(1)
        .write
        .mode("overwrite")
        .option("header", "true")
        .csv(str(temp_dir))
    )

    # Find the part file and rename it
    for file in temp_dir.glob("part-*.csv"):
        if final_file.exists():
            final_file.unlink()  # Delete existing file
        shutil.move(str(file), str(final_file))
        break

    # Clean up temp directory
    shutil.rmtree(str(temp_dir), ignore_errors=True)

    return final_file


def process_gold(spark, cdc_results: dict, date_str: str) -> dict:
    """
    Main entry point: Export changed records to CSV.

    Args:
        spark: SparkSession (not used but kept for consistency)
        cdc_results: Dictionary of entity -> CDCResult
        date_str: Date being processed

    Returns:
        Dictionary mapping entity names to record counts
    """
    print(f"\n{'='*50}")
    print(f"GOLD LAYER - {date_str}")
    print(f"{'='*50}")

    results = {}

    for entity, cdc_result in cdc_results.items():
        # Combine NEW and UPDATED records
        changes_df = cdc_result.new_records.union(cdc_result.updated_records)
        count = changes_df.count()

        if count == 0:
            print(f"  {entity}: No changes to export")
            results[entity] = 0
            continue

        # Export to CSV
        output_path = export_to_csv(changes_df, entity, date_str)

        print(f"  {entity}: {count} records --> {output_path.name}")
        results[entity] = count

    return results
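
After a run, you can inspect the exported file with plain Python (the path assumes the sample date used in the testing section below):

import csv
from pathlib import Path

gold_file = Path("data/gold/customer/new_customer_20260113.csv")

with gold_file.open() as f:
    for row in csv.reader(f):
        print(row)  # header first, then one row per NEW/UPDATED customer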

Step 7: Main Entry Point (main.py)

Why We Need This File

This is the command-line interface. It ties all the steps together.

Create main.py

#!/usr/bin/env python3
"""
CDC Data Pipeline - Main Entry Point

Run with: python main.py --date 20260113
"""
import click

import config
from src.bronze import get_spark, process_bronze
from src.data_quality import run_quality_checks
from src.cdc import process_cdc
from src.silver import process_silver
from src.gold import process_gold


def run_pipeline(date_str: str) -> None:
    """
    Run the complete pipeline for one date.

    STEPS:
    1. Bronze: Ingest source files
    2. Data Quality: Validate data
    3. CDC: Detect changes
    4. Silver: Transform data
    5. Gold: Export to CSV
    """
    print(f"\n{'='*60}")
    print(f"CDC DATA PIPELINE - {date_str}")
    print(f"{'='*60}")

    # Create Spark session
    spark = get_spark()
    spark.sparkContext.setLogLevel("WARN")  # Reduce log noise

    try:
        # Step 1: Bronze (Ingestion)
        bronze_results = process_bronze(spark, date_str)

        # Step 2: Data Quality
        quality_results = run_quality_checks(spark, date_str)

        # Step 3: CDC Detection
        cdc_results = process_cdc(spark, date_str)

        # Step 4: Silver (Transformations)
        silver_results = process_silver(spark, cdc_results, date_str)

        # Step 5: Gold (Export)
        gold_results = process_gold(spark, cdc_results, date_str)

        # Print completion message
        print(f"\n{'='*60}")
        print(f"COMPLETED: {date_str}")
        print(f"{'='*60}")

    finally:
        # Always stop Spark when done
        spark.stop()


@click.command()
@click.option('--date', type=str, required=True, help='Date to process (YYYYMMDD)')
def main(date: str):
    """Run the CDC pipeline for a specific date."""
    run_pipeline(date)


if __name__ == "__main__":
    main()
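
Besides the CLI, you can call run_pipeline() directly, for example to backfill consecutive dates (the dates match the testing section below):

from main import run_pipeline

# Process the days in order so each run can compare against the previous snapshot
for date_str in ["20260113", "20260114"]:
    run_pipeline(date_str)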

Testing Your Implementation

1. Create Sample Data

mkdir -p data/source/2026/01/13

cat > data/source/2026/01/13/Customer.txt << 'EOF'
customer_id|first_name|last_name|email|phone|status
C001|John|Doe|john.doe@example.com|1234567890|A
C002|Jane|Smith|jane.smith@example.com|2345678901|A
EOF

2. Run Day 1

python main.py --date 20260113

Expected: All records are NEW (no previous snapshot).

3. Create Day 2 Data (with changes)

mkdir -p data/source/2026/01/14

cat > data/source/2026/01/14/Customer.txt << 'EOF'
customer_id|first_name|last_name|email|phone|status
C001|John|Doe|john.doe@example.com|1234567890|A
C002|Jane|Johnson|jane.johnson@example.com|2345678901|A
C003|Bob|Wilson|bob.wilson@example.com|3456789012|A
EOF

4. Run Day 2

python main.py --date 20260114

Expected:
  - C001: UNCHANGED (same data)
  - C002: UPDATED (last_name and email changed)
  - C003: NEW (not in Day 1)


Summary

You have built a complete CDC pipeline:

| Step | File | What It Does |
|------|------|--------------|
| 1 | config.py | Configuration and helpers |
| 2 | bronze.py | Ingest source files to Parquet |
| 3 | data_quality.py | Validate data quality |
| 4 | cdc.py | Detect NEW and UPDATED records |
| 5 | silver.py | Apply transformations |
| 6 | gold.py | Export to CSV |
| 7 | main.py | Orchestrate everything |

The key insight: CDC uses hash comparison to efficiently detect changes without comparing every field of every record.


Line-by-Line: Data Quality Functions

check_not_null() - How It Works

def check_not_null(df: DataFrame, columns: List[str]) -> Dict:
    results = {}
    for col_name in columns:
        null_count = df.filter(col(col_name).isNull()).count()
        results[col_name] = (null_count == 0)
    return results

Step by step:

  1. results = {}
     - Create an empty dictionary to store results

  2. for col_name in columns:
     - Loop through each column name we want to check

  3. df.filter(col(col_name).isNull())
     - Filter the DataFrame to only rows where this column IS NULL
     - col(col_name) references the column
     - .isNull() checks whether the value is null

  4. .count()
     - Count how many rows have null values

  5. results[col_name] = (null_count == 0)
     - Store True if no nulls were found, False if any nulls exist

Example:

DataFrame:
+------+-------+
|  id  | name  |
+------+-------+
| C001 | John  |
| C002 | null  |  <-- This is null!
| C003 | Bob   |
+------+-------+

check_not_null(df, ["id", "name"])

Result: {"id": True, "name": False}

check_unique() - How It Works

def check_unique(df: DataFrame, columns: List[str]) -> Dict:
    total = df.count()
    distinct = df.select(*columns).distinct().count()
    return {"unique": total == distinct}

Step by step:

  1. total = df.count()
     - Count the total number of rows

  2. df.select(*columns)
     - Select only the columns we're checking
     - *columns unpacks the list: ["id"] becomes just "id"

  3. .distinct().count()
     - Get the unique rows and count them

  4. return {"unique": total == distinct}
     - If total equals distinct, all values are unique

Example:

DataFrame:
+------+
|  id  |
+------+
| C001 |
| C002 |
| C001 |  <-- Duplicate!
+------+

total = 3
distinct = 2

Result: {"unique": False}  (3 != 2)

validate_customer() - How It Works

def validate_customer(df: DataFrame) -> QualityResult:
    passed = []
    failed = []

    # Check 1: Not null
    null_checks = check_not_null(df, ["customer_id", "first_name"])
    for check, result in null_checks.items():
        (passed if result else failed).append(f"not_null:{check}")

Breaking down the pattern:

  1. passed = [] and failed = []
     - Two lists to collect results

  2. null_checks = check_not_null(df, [...])
     - Call our reusable check function
     - Returns, e.g.: {"customer_id": True, "first_name": True}

  3. for check, result in null_checks.items():
     - Loop through each result
     - check = column name, result = True/False

  4. (passed if result else failed).append(...)
     - This is a Python trick!
     - If result is True: passed.append(...)
     - If result is False: failed.append(...)

Visual flow:

null_checks = {"customer_id": True, "first_name": False}

Loop iteration 1:
  check = "customer_id", result = True
  passed.append("not_null:customer_id")

Loop iteration 2:
  check = "first_name", result = False
  failed.append("not_null:first_name")

Final:
  passed = ["not_null:customer_id"]
  failed = ["not_null:first_name"]

How to Extend the Pipeline

This section shows you how to add new functionality.


Extension 1: Add a New Data Quality Check

Scenario: You want to check that a phone number contains only digits.

Step 1: Create the check function in src/data_quality.py

def check_phone_format(df: DataFrame, column: str) -> Dict:
    """
    Check that phone numbers contain only digits.

    Valid: 1234567890
    Invalid: 123-456-7890, (123) 456-7890
    """
    # Regex pattern: start (^) + one or more digits ([0-9]+) + end ($)
    pattern = r"^[0-9]+$"

    # Count invalid rows
    invalid = df.filter(
        col(column).isNotNull() & ~col(column).rlike(pattern)
    ).count()

    # Return True if no invalid rows found
    return {f"{column}_format": invalid == 0}

Step 2: Use it in a validator

def validate_customer(df: DataFrame) -> QualityResult:
    passed = []
    failed = []

    # ... existing checks ...

    # Add your new check
    phone_check = check_phone_format(df, "phone")
    (passed if phone_check["phone_format"] else failed).append("format:phone")

    # ... rest of function ...

Step 3: Run and verify

python main.py --date 20260113
# Look for "format:phone" in the output

Extension 2: Add a New Transformation

Scenario: You want to create a full_name column combining first and last name.

Step 1: Create the function in src/silver.py

from pyspark.sql.functions import concat, lit

def add_full_name(df: DataFrame) -> DataFrame:
    """
    Create full_name column: "John" + " " + "Doe" = "John Doe"
    """
    # Check if columns exist
    if "first_name" not in df.columns or "last_name" not in df.columns:
        return df

    # concat() joins values, lit(" ") adds a space
    return df.withColumn(
        "full_name",
        concat(col("first_name"), lit(" "), col("last_name"))
    )

Step 2: Call it in transform_entity()

def transform_entity(df: DataFrame, entity: str) -> DataFrame:
    result = df

    # Existing transformations
    for column, mappings in REPLACEMENT_RULES.items():
        result = apply_replacements(result, column, mappings)

    # NEW: Add full_name for customer
    if entity == "customer":
        result = add_full_name(result)

    return result

Step 3: Verify by viewing Silver data

python main.py --date 20260113
python view_data.py silver customer --date 20260113
# You should see the new "full_name" column
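
If you don't have a view_data.py helper handy, you can read the Silver output directly (the path follows the layout defined in config.py):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.read.parquet("data/silver/customer/date=20260113").show()  # look for the full_name column
spark.stop()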

Extension 3: Add a New Value Replacement

Scenario: You want to convert country codes to full names.

Step 1: Add to REPLACEMENT_RULES in src/silver.py

REPLACEMENT_RULES = {
    "status": {
        "A": "Active",
        "I": "Inactive",
        "D": "Deleted",
    },
    # NEW: Add country code mappings
    "country_code": {
        "US": "United States",
        "UK": "United Kingdom",
        "CA": "Canada",
        "AU": "Australia",
    },
}

That's it! The Silver layer automatically applies all rules from REPLACEMENT_RULES.


Extension 4: Add a New Entity Validator

Scenario: You added a new entity "Product" and want custom validation.

Step 1: Create validator function in src/data_quality.py

def validate_product(df: DataFrame) -> QualityResult:
    """Custom validation for Product entity."""
    passed = []
    failed = []

    # Check 1: product_id not null
    null_checks = check_not_null(df, ["product_id", "name"])
    for check, result in null_checks.items():
        (passed if result else failed).append(f"not_null:{check}")

    # Check 2: product_id unique
    unique_check = check_unique(df, ["product_id"])
    (passed if unique_check["unique"] else failed).append("unique:product_id")

    # Check 3: price must not be negative
    invalid_price = df.filter(col("price").cast("double") < 0).count()
    (passed if invalid_price == 0 else failed).append("check:positive_price")

    return QualityResult(
        entity="product",
        total_records=df.count(),
        passed_checks=passed,
        failed_checks=failed,
        overall_passed=len(failed) == 0
    )

Step 2: Register it in run_quality_checks()

def run_quality_checks(spark, date_str: str) -> Dict[str, QualityResult]:
    # ...

    validators = {
        "customer": validate_customer,
        "account": validate_account,
        "product": validate_product,  # NEW: Add your validator
    }

    # ...

Extension 5: Add a New Entity Configuration

Scenario: You added "Transaction" data and want specific CDC columns.

Step 1: Add to ENTITIES in config.py

ENTITIES = {
    "customer": {
        "key_columns": ["customer_id"],
        "value_columns": ["first_name", "last_name", "email", "phone", "status"],
    },
    "account": {
        "key_columns": ["account_id"],
        "value_columns": ["account_type", "balance", "currency", "status"],
    },
    # NEW: Add transaction config
    "transaction": {
        "key_columns": ["transaction_id"],
        "value_columns": ["customer_id", "amount", "transaction_type", "timestamp"],
    },
}

Why do this? Without explicit config, CDC auto-detects columns. Adding explicit config gives you control over exactly which columns to use for key matching and change detection.


Extension Checklist

| Want to... | Edit This File | Add This |
|------------|----------------|----------|
| New DQ check function | data_quality.py | def check_xxx(df, ...) |
| New entity validator | data_quality.py | def validate_xxx(df) + register in the validators dict |
| New transformation | silver.py | def transform_xxx(df) + call in transform_entity() |
| New value mapping | silver.py | Add to the REPLACEMENT_RULES dict |
| New entity config | config.py | Add to the ENTITIES dict |
| New column in output | silver.py | Add withColumn() in the transformation |

Quick Reference for Developers

Common PySpark Operations

| What You Want | PySpark Code |
|---------------|--------------|
| Check if a column is null | col("name").isNull() |
| Check if a column is NOT null | col("name").isNotNull() |
| Filter rows | df.filter(condition) |
| Add a new column | df.withColumn("new_col", expression) |
| Select specific columns | df.select("col1", "col2") |
| Count rows | df.count() |
| Get distinct values | df.select("col").distinct() |
| Regex match | col("email").rlike(pattern) |
| Convert type | col("price").cast("double") |
| Concatenate strings | concat(col("a"), lit(" "), col("b")) |
| CASE WHEN | when(condition, value).otherwise(default) |
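
A small scratchpad that chains several of these operations on toy data:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat, lit, when

spark = SparkSession.builder.master("local[*]").getOrCreate()

df = spark.createDataFrame(
    [("John", "Doe", "A"), ("Jane", None, "I")],
    ["first_name", "last_name", "status"],
)

(
    df.filter(col("last_name").isNotNull())                                                  # filter rows
      .withColumn("full_name", concat(col("first_name"), lit(" "), col("last_name")))        # add a column
      .withColumn("status", when(col("status") == "A", "Active").otherwise(col("status")))   # CASE WHEN
      .show()
)

spark.stop()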

File Edit Quick Reference

| File | What It Controls |
|------|------------------|
| config.py | Paths and entity configs |
| bronze.py | How files are read |