CDC Data Pipeline - Developer Guide
How to Build This System from Scratch
This guide walks you through building the CDC Data Pipeline step by step. Follow the steps in order - each builds on the previous one.
Table of Contents
- Understanding CDC
- Project Setup
- Step 1: Configuration
- Step 2: Bronze Layer
- Step 3: Data Quality
- Step 4: CDC Detection
- Step 5: Silver Layer
- Step 6: Gold Layer
- Step 7: Main Entry Point
- Testing Your Implementation
1. Understanding CDC
What is CDC?
CDC (Change Data Capture) is a technique to identify data that has changed between two points in time.
The Problem CDC Solves
Imagine you have a customer database with 1 million records. Every day, you receive a full export of all customers. But only 100 customers actually changed.
WITHOUT CDC:
-----------
Day 1: Process 1,000,000 records --> 1,000,000 processed
Day 2: Process 1,000,000 records --> 1,000,000 processed (even if only 100 changed!)
Day 3: Process 1,000,000 records --> 1,000,000 processed
WITH CDC:
---------
Day 1: Process 1,000,000 records --> 1,000,000 NEW records
Day 2: Process only 100 changes --> 100 processed (much faster!)
Day 3: Process only 50 changes --> 50 processed
How Hash-Based CDC Works
We use MD5 hashing to detect changes. Here's the concept:
Step 1: Generate a "fingerprint" (hash) for each record
A hash is like a fingerprint for data. If any value changes, the fingerprint changes.
Record: customer_id=C001, name="John", city="NYC"
We take the VALUE columns (name, city) and create a hash:
Hash = MD5("John" + "||" + "NYC") = "abc123..."
Important: We do NOT include the KEY (customer_id) in the hash.
The key is used to MATCH records. The hash is used to DETECT changes.
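Here is a minimal plain-Python sketch of the same fingerprint idea (it uses Python's hashlib instead of Spark's md5 function, and the values are made up for illustration):
import hashlib

def fingerprint(*values: str) -> str:
    # Join the VALUE columns with a separator, then hash the result
    return hashlib.md5("||".join(values).encode("utf-8")).hexdigest()

day1_hash = fingerprint("John", "NYC")     # yesterday's values
day2_hash = fingerprint("Johnny", "NYC")   # the name changed, so the hash changes
print(day1_hash == day2_hash)              # False --> this record is UPDATED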
Step 2: Save the hash as a "snapshot"
After processing Day 1, we save:
{ customer_id: "C001", hash: "abc123..." }
Step 3: Next day, compare hashes
Today's data: C001, hash = "xyz789..." (name changed to "Johnny")
Yesterday's data: C001, hash = "abc123..."
The hashes are different! --> This record is UPDATED
Step 4: Categorize each record
NEW: Key exists TODAY but NOT in YESTERDAY
UPDATED: Key exists in BOTH but hash is DIFFERENT
UNCHANGED: Key exists in BOTH and hash is SAME
Visual Example
YESTERDAY'S SNAPSHOT      TODAY'S DATA        RESULT
--------------------      ------------        ------
C001 | abc123             C001 | abc123       UNCHANGED (same hash)
C002 | def456             C002 | xxx999       UPDATED (hash changed!)
C003 | ghi789             C003 | ghi789       UNCHANGED
                          C004 | jkl012       NEW (not in yesterday)
2. Project Setup
Create the Folder Structure
mkdir -p de-cdc/src
mkdir -p de-cdc/data/source
mkdir -p de-cdc/orchestration
cd de-cdc
Create requirements.txt
cat > requirements.txt << 'EOF'
pyspark>=3.5.0
click>=8.0.0
EOF
Install Dependencies
pip install -r requirements.txt
Files We Will Create (in order)
| Order | File | Purpose | Why This Order |
|---|---|---|---|
| 1 | config.py | Configuration and helper functions | All other files import from here |
| 2 | src/bronze.py | Read source files, write Parquet | First step in pipeline |
| 3 | src/data_quality.py | Validate data | Runs after Bronze, before CDC |
| 4 | src/cdc.py | Detect changes (the core logic) | Needs Bronze data |
| 5 | src/silver.py | Transform data | Needs CDC results |
| 6 | src/gold.py | Export to CSV | Final step |
| 7 | main.py | Entry point to run everything | Ties all steps together |
Step 1: Configuration (config.py)
Why We Need This File
Configuration should be in ONE place. If you need to change a path or add a new entity, you only change one file.
What This File Does
- Defines all folder paths (where source files are, where to write output)
- Defines entity configurations (which columns are keys, which are values)
- Provides helper functions for working with dates and entities
Create config.py
"""
Configuration for CDC Data Pipeline.
This file is the SINGLE SOURCE OF TRUTH for all settings.
Other files import from here.
"""
from pathlib import Path
# ============================================================
# PATHS - Where data lives
# ============================================================
# Get the project root directory (where this file is located)
PROJECT_ROOT = Path(__file__).parent
# All data goes under the "data" folder
DATA_DIR = PROJECT_ROOT / "data"
# Define paths for each layer
SOURCE_PATH = DATA_DIR / "source" # Input: raw text files
BRONZE_PATH = DATA_DIR / "bronze" # Output: raw Parquet files
SILVER_PATH = DATA_DIR / "silver" # Output: transformed Parquet
GOLD_PATH = DATA_DIR / "gold" # Output: final CSV files
CDC_SNAPSHOTS_PATH = DATA_DIR / "cdc_snapshots" # Hashes for CDC comparison
# Create directories if they don't exist
# This runs when the file is imported
for path in [SOURCE_PATH, BRONZE_PATH, SILVER_PATH, GOLD_PATH, CDC_SNAPSHOTS_PATH]:
path.mkdir(parents=True, exist_ok=True)
# ============================================================
# ENTITY CONFIGURATION
# ============================================================
# Each entity needs to know:
# - key_columns: Used to MATCH records across days (like a primary key)
# - value_columns: Used to DETECT changes (we hash these)
ENTITIES = {
"customer": {
"key_columns": ["customer_id"],
"value_columns": ["first_name", "last_name", "email", "phone", "status"],
},
"account": {
"key_columns": ["account_id"],
"value_columns": ["account_type", "balance", "currency", "status"],
},
}
# ============================================================
# HELPER FUNCTIONS
# ============================================================
def get_source_path_for_date(date_str: str) -> Path:
"""
Convert a date string to the source folder path.
WHY: Source files are organized by year/month/day.
This function builds that path from a date string.
Example:
Input: "20260113"
Output: source/2026/01/13/
Args:
date_str: Date in YYYYMMDD format (e.g., "20260113")
Returns:
Path object pointing to the source folder for that date
"""
year = date_str[:4] # First 4 characters: "2026"
month = date_str[4:6] # Next 2 characters: "01"
day = date_str[6:8] # Last 2 characters: "13"
return SOURCE_PATH / year / month / day
def discover_entities(date_str: str) -> list:
"""
Find all .txt files in the source folder for a given date.
WHY: We want the pipeline to automatically discover new entities.
If someone adds a new file like "Product.txt", it should be processed
without changing any code.
Example:
If source/2026/01/13/ contains:
- Customer.txt
- Account.txt
- Product.txt
This function returns: ["account", "customer", "product"]
Args:
date_str: Date in YYYYMMDD format
Returns:
List of entity names (lowercase), sorted alphabetically
"""
source_dir = get_source_path_for_date(date_str)
# If the folder doesn't exist, return empty list
if not source_dir.exists():
return []
entities = []
# Find all .txt files in the folder
for txt_file in source_dir.glob("*.txt"):
# Get the filename without extension, in lowercase
# Customer.txt --> customer
entity_name = txt_file.stem.lower()
entities.append(entity_name)
return sorted(entities)
def get_entity_config(entity: str, sample_df=None) -> dict:
"""
Get configuration for an entity (key columns and value columns).
WHY: Different entities have different primary keys.
For known entities (customer, account), we have predefined configs.
For NEW entities (discovered automatically), we auto-detect the columns.
Auto-detection logic:
1. Look for columns ending with "_id" --> use as key
2. If no "_id" columns, use the first column as key
3. All other columns are value columns
Args:
entity: Entity name (e.g., "customer", "product")
sample_df: Optional DataFrame to detect columns from (for new entities)
Returns:
Dictionary with "key_columns" and "value_columns" lists
"""
# If entity is already defined, return its config
if entity in ENTITIES:
return ENTITIES[entity]
# For new entities, auto-detect from the DataFrame
if sample_df is not None:
# Get column names, excluding internal columns (starting with "_")
columns = [c for c in sample_df.columns if not c.startswith("_")]
# Strategy 1: Find columns ending with "_id"
key_cols = [c for c in columns if c.endswith("_id")]
# Strategy 2: If no "_id" columns, use first column
if not key_cols:
key_cols = [columns[0]]
print(f" [Auto-detect] {entity}: Using '{columns[0]}' as key")
# Everything else is a value column
value_cols = [c for c in columns if c not in key_cols]
return {"key_columns": key_cols, "value_columns": value_cols}
# Fallback: assume entity_id is the key
return {"key_columns": [f"{entity}_id"], "value_columns": []}
Function Summary
| Function | What It Does | When It's Called |
|---|---|---|
| get_source_path_for_date("20260113") | Converts date to folder path | Bronze layer |
| discover_entities("20260113") | Finds all .txt files | Bronze layer |
| get_entity_config("customer", df) | Gets key/value columns | CDC layer |
Step 2: Bronze Layer (src/bronze.py)
Why We Need This File
Bronze is the ENTRY POINT of the pipeline. It reads raw source files and stores them in a format that's easy to process (Parquet).
What This File Does
- Creates a Spark session (the engine that processes data)
- Reads pipe-delimited text files
- Adds metadata (when was it ingested, what date is it for)
- Saves as Parquet format (efficient columnar storage)
Create src/bronze.py
"""
Bronze Layer - Ingest raw source files.
This is the FIRST step in the pipeline.
It reads raw text files and converts them to Parquet format.
"""
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import current_timestamp, lit
from pathlib import Path
# Import our configuration
import config
def get_spark() -> SparkSession:
"""
Create and return a SparkSession.
WHY: SparkSession is the entry point to Spark. You need it to read
files, create DataFrames, and run SQL queries.
WHAT THIS DOES:
- Creates a Spark application named "CDC-Pipeline"
- Runs locally using all available CPU cores (local[*])
Returns:
SparkSession object
"""
return (
SparkSession.builder
.appName("CDC-Pipeline") # Name shown in Spark UI
.master("local[*]") # Run locally with all cores
.getOrCreate() # Create new or get existing session
)
def ingest_file(spark: SparkSession, entity: str, date_str: str) -> DataFrame:
"""
Read a single pipe-delimited source file into a DataFrame.
WHY: Source systems typically export data as text files.
We need to read these files and add tracking information.
WHAT THIS DOES:
1. Builds the file path (e.g., source/2026/01/13/Customer.txt)
2. Reads the pipe-delimited file using Spark
3. Adds metadata columns for tracking
4. Returns the DataFrame
Args:
spark: SparkSession to use for reading
entity: Entity name (e.g., "customer")
date_str: Date in YYYYMMDD format
Returns:
DataFrame containing the file data plus metadata columns
Raises:
FileNotFoundError: If the source file doesn't exist
"""
# Build the full file path
# Example: source/2026/01/13/Customer.txt
source_dir = config.get_source_path_for_date(date_str)
file_path = source_dir / f"{entity.capitalize()}.txt"
print(f" Reading: {file_path}")
# Check if file exists
if not file_path.exists():
raise FileNotFoundError(f"File not found: {file_path}")
# Read the pipe-delimited file
# Options explained:
# - header=true: First row contains column names
# - delimiter=|: Columns are separated by pipe character
# - inferSchema=false: Keep all columns as strings (safer)
df = (
spark.read
.option("header", "true")
.option("delimiter", "|")
.option("inferSchema", "false")
.csv(str(file_path))
)
# Add metadata columns (prefixed with _ to identify as internal)
# These help us track when and where the data came from
df = (
df
.withColumn("_ingestion_timestamp", current_timestamp()) # When we read it
.withColumn("_processing_date", lit(date_str)) # Which date's data
.withColumn("_source_entity", lit(entity)) # Which entity
)
return df
def write_bronze(df: DataFrame, entity: str, date_str: str) -> Path:
"""
Write a DataFrame to the Bronze layer as Parquet.
WHY: Parquet is a columnar format that's much faster to read than text.
It also supports compression and schema information.
WHAT THIS DOES:
1. Builds the output path (e.g., bronze/customer/date=20260113/)
2. Writes the DataFrame as Parquet files
3. Returns the output path
Args:
df: DataFrame to write
entity: Entity name (e.g., "customer")
date_str: Date in YYYYMMDD format
Returns:
Path where the data was written
"""
# Build output path with date partition
# Example: bronze/customer/date=20260113/
output_path = config.BRONZE_PATH / entity / f"date={date_str}"
# Write as Parquet
# mode="overwrite" means: replace if already exists
df.write.mode("overwrite").parquet(str(output_path))
# Print confirmation
record_count = df.count()
print(f" Wrote {record_count} records to {output_path}")
return output_path
def process_bronze(spark: SparkSession, date_str: str) -> dict:
"""
Main entry point: Process all entities for the Bronze layer.
WHY: This is the function that gets called from main.py.
It orchestrates the entire Bronze layer processing.
WHAT THIS DOES:
1. Discovers all .txt files for this date (using config.discover_entities)
2. For each file: reads it and writes to Bronze
3. Returns a dictionary with record counts
Args:
spark: SparkSession to use
date_str: Date in YYYYMMDD format
Returns:
Dictionary mapping entity names to record counts
Example: {"customer": 5, "account": 6}
"""
print(f"\n{'='*50}")
print(f"BRONZE LAYER - {date_str}")
print(f"{'='*50}")
# Step 1: Discover all entities (find all .txt files)
entities = config.discover_entities(date_str)
print(f" Discovered: {entities}")
# Step 2: Process each entity
results = {}
for entity in entities:
# Read the file
df = ingest_file(spark, entity, date_str)
# Write to Bronze
write_bronze(df, entity, date_str)
# Track the count
results[entity] = df.count()
return results
Function Summary
| Function | Purpose | Called By |
|---|---|---|
| get_spark() | Creates Spark session | main.py |
| ingest_file(spark, entity, date) | Reads one .txt file | process_bronze() |
| write_bronze(df, entity, date) | Writes to Parquet | process_bronze() |
| process_bronze(spark, date) | Orchestrates Bronze layer | main.py |
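You can smoke-test the Bronze layer on its own from a Python shell started in the project root. This assumes the Day 1 sample data from the Testing section already exists under data/source/2026/01/13/:
from src.bronze import get_spark, process_bronze

spark = get_spark()
counts = process_bronze(spark, "20260113")   # reads data/source/2026/01/13/*.txt
print(counts)                                # e.g. {"customer": 2} for the Day 1 sample data
spark.stop()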
Step 3: Data Quality (src/data_quality.py)
Why We Need This File
Before processing data, we should check if it's valid. Bad data can cause problems downstream.
What This File Does
- Defines reusable check functions (null check, unique check, etc.)
- Defines validators for each entity type
- Runs all checks and returns results
Create src/data_quality.py
"""
Data Quality - Validate data before processing.
This runs AFTER Bronze and BEFORE CDC.
It checks for common data problems.
"""
from pyspark.sql import DataFrame
from pyspark.sql.functions import col
from dataclasses import dataclass
from typing import List, Dict
# Import our configuration
import config
@dataclass
class QualityResult:
"""
Container for quality check results.
WHY: Using a dataclass makes it easy to pass results around
and access them with clear attribute names.
Attributes:
entity: Name of the entity that was checked
total_records: How many records were in the data
passed_checks: List of check names that passed
failed_checks: List of check names that failed
overall_passed: True if ALL checks passed
"""
entity: str
total_records: int
passed_checks: List[str]
failed_checks: List[str]
overall_passed: bool
# ============================================================
# REUSABLE CHECK FUNCTIONS
# ============================================================
# These functions can be used by any validator.
# They each check ONE thing and return the result.
def check_not_null(df: DataFrame, columns: List[str]) -> Dict:
"""
Check that specified columns have no null values.
WHY: Null values in required fields can break downstream processing.
For example, a customer without an ID cannot be matched.
WHAT THIS DOES:
For each column, count how many rows have null values.
If count is 0, the check passes.
Args:
df: DataFrame to check
columns: List of column names to check
Returns:
Dictionary mapping column names to True (passed) or False (failed)
Example: {"customer_id": True, "email": False}
"""
results = {}
for col_name in columns:
# Count rows where this column is null
null_count = df.filter(col(col_name).isNull()).count()
# Pass if no nulls found
results[col_name] = (null_count == 0)
return results
def check_unique(df: DataFrame, columns: List[str]) -> Dict:
"""
Check that there are no duplicate values in the specified columns.
WHY: Primary keys should be unique. Duplicates can cause
incorrect joins and data corruption.
WHAT THIS DOES:
Compare total row count with distinct row count.
If they're equal, all values are unique.
Args:
df: DataFrame to check
columns: List of column names to check together
Returns:
Dictionary with "unique" key set to True or False
"""
total = df.count()
distinct = df.select(*columns).distinct().count()
return {"unique": total == distinct}
def check_allowed_values(df: DataFrame, column: str, allowed: List[str]) -> Dict:
"""
Check that a column only contains values from an allowed list.
WHY: Some columns should only have specific values.
For example, status should only be A, I, or D.
WHAT THIS DOES:
Count rows where the value is NOT in the allowed list.
If count is 0, the check passes.
Args:
df: DataFrame to check
column: Column name to check
allowed: List of allowed values
Returns:
Dictionary with "{column}_values" key set to True or False
"""
# Find rows with invalid values (not null AND not in allowed list)
invalid = df.filter(
col(column).isNotNull() & ~col(column).isin(allowed)
).count()
return {f"{column}_values": invalid == 0}
def check_email_format(df: DataFrame, column: str) -> Dict:
"""
Check that email addresses have a valid format.
WHY: Invalid emails can cause issues when sending notifications
or matching customer records.
WHAT THIS DOES:
Uses a regex pattern to validate email format.
The pattern checks for: text@domain.tld
Args:
df: DataFrame to check
column: Column name containing emails
Returns:
Dictionary with "{column}_format" key set to True or False
"""
# Simple email pattern: text@domain.tld
pattern = r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$"
# Find rows with invalid emails
invalid = df.filter(
col(column).isNotNull() & ~col(column).rlike(pattern)
).count()
return {f"{column}_format": invalid == 0}
# ============================================================
# ENTITY-SPECIFIC VALIDATORS
# ============================================================
# Each entity can have its own validation rules.
def validate_customer(df: DataFrame) -> QualityResult:
"""
Run all quality checks for customer data.
CHECKS:
1. customer_id, first_name, last_name are not null
2. customer_id is unique
3. email has valid format
4. status is one of: A, I, D, P
"""
passed = []
failed = []
# Check 1: Required columns not null
null_checks = check_not_null(df, ["customer_id", "first_name", "last_name"])
for check, result in null_checks.items():
# Add to passed or failed list based on result
(passed if result else failed).append(f"not_null:{check}")
# Check 2: Customer ID is unique
unique_check = check_unique(df, ["customer_id"])
(passed if unique_check["unique"] else failed).append("unique:customer_id")
# Check 3: Email format
email_check = check_email_format(df, "email")
(passed if email_check["email_format"] else failed).append("format:email")
# Check 4: Status values
status_check = check_allowed_values(df, "status", ["A", "I", "D", "P"])
(passed if status_check["status_values"] else failed).append("values:status")
# Return result
return QualityResult(
entity="customer",
total_records=df.count(),
passed_checks=passed,
failed_checks=failed,
overall_passed=len(failed) == 0
)
def validate_account(df: DataFrame) -> QualityResult:
"""
Run all quality checks for account data.
CHECKS:
1. account_id, account_type are not null
2. account_id is unique
"""
passed = []
failed = []
null_checks = check_not_null(df, ["account_id", "account_type"])
for check, result in null_checks.items():
(passed if result else failed).append(f"not_null:{check}")
unique_check = check_unique(df, ["account_id"])
(passed if unique_check["unique"] else failed).append("unique:account_id")
return QualityResult(
entity="account",
total_records=df.count(),
passed_checks=passed,
failed_checks=failed,
overall_passed=len(failed) == 0
)
def validate_generic(df: DataFrame, entity: str) -> QualityResult:
"""
Generic validator for unknown entities.
WHY: When a new entity is discovered (like Product.txt),
we don't have a specific validator for it.
This function provides basic checks based on column naming conventions.
CHECKS:
1. All columns ending with "_id" are not null
2. The first "_id" column is unique
"""
passed = []
failed = []
# Find all ID columns (columns ending with "_id")
id_cols = [c for c in df.columns if c.endswith("_id") and not c.startswith("_")]
if id_cols:
# Check: ID columns not null
null_checks = check_not_null(df, id_cols)
for check, result in null_checks.items():
(passed if result else failed).append(f"not_null:{check}")
# Check: First ID column is unique (assume it's the primary key)
unique_check = check_unique(df, [id_cols[0]])
(passed if unique_check["unique"] else failed).append(f"unique:{id_cols[0]}")
return QualityResult(
entity=entity,
total_records=df.count(),
passed_checks=passed,
failed_checks=failed,
overall_passed=len(failed) == 0
)
# ============================================================
# MAIN FUNCTION
# ============================================================
def run_quality_checks(spark, date_str: str) -> Dict[str, QualityResult]:
"""
Main entry point: Run quality checks for all entities.
WHAT THIS DOES:
1. Finds all entities in the Bronze layer
2. For each entity, runs the appropriate validator
3. Prints results to console
4. Returns results dictionary
Args:
spark: SparkSession
date_str: Date in YYYYMMDD format
Returns:
Dictionary mapping entity names to QualityResult objects
"""
print(f"\n{'='*50}")
print(f"DATA QUALITY - {date_str}")
print(f"{'='*50}")
results = {}
# Map of known validators
validators = {
"customer": validate_customer,
"account": validate_account,
}
# Find all entities in Bronze layer
bronze_entities = [d.name for d in config.BRONZE_PATH.iterdir() if d.is_dir()]
for entity in bronze_entities:
# Build path to Bronze data for this date
bronze_path = config.BRONZE_PATH / entity / f"date={date_str}"
if not bronze_path.exists():
continue
# Read the Bronze data
df = spark.read.parquet(str(bronze_path))
# Get the appropriate validator (or use generic)
if entity in validators:
validator = validators[entity]
result = validator(df)
else:
result = validate_generic(df, entity)
# Store result
results[entity] = result
# Print summary
status = "PASSED" if result.overall_passed else "FAILED"
print(f" {entity}: {status} "
f"({len(result.passed_checks)} passed, {len(result.failed_checks)} failed)")
# Print failed checks
for check in result.failed_checks:
print(f" [FAILED] {check}")
return results
Function Summary
| Function | Purpose | Reusable? |
|---|---|---|
| check_not_null(df, columns) | Check columns for nulls | Yes |
| check_unique(df, columns) | Check for duplicates | Yes |
| check_allowed_values(df, col, list) | Check value is in list | Yes |
| check_email_format(df, column) | Validate email pattern | Yes |
| validate_customer(df) | Customer-specific checks | No |
| validate_account(df) | Account-specific checks | No |
| validate_generic(df, entity) | Generic checks for new entities | No |
| run_quality_checks(spark, date) | Main entry point | No |
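The reusable checks are easy to exercise on a tiny hand-built DataFrame before wiring them into a validator. A quick sketch, run from the project root so src.data_quality is importable:
from pyspark.sql import SparkSession
from src.data_quality import check_not_null, check_unique

spark = SparkSession.builder.master("local[*]").getOrCreate()

df = spark.createDataFrame(
    [("C001", "John"), ("C002", None), ("C001", "Bob")],
    ["customer_id", "first_name"],
)

print(check_not_null(df, ["customer_id", "first_name"]))
# {'customer_id': True, 'first_name': False}   (one null first_name)
print(check_unique(df, ["customer_id"]))
# {'unique': False}                            (C001 appears twice)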
Step 4: CDC Detection (src/cdc.py)
Why We Need This File
This is the CORE of the pipeline. It implements the Change Data Capture logic.
What This File Does
- Generates MD5 hashes for each record
- Loads previous day's snapshot (if exists)
- Compares hashes to find NEW and UPDATED records
- Saves current snapshot for next run
Create src/cdc.py
"""
CDC (Change Data Capture) - Detect new and updated records.
This is the CORE logic of the pipeline.
It compares today's data with yesterday's snapshot to find changes.
"""
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, concat_ws, md5, lit, coalesce
from dataclasses import dataclass
from typing import Optional
# Import our configuration
import config
@dataclass
class CDCResult:
"""
Container for CDC detection results.
Attributes:
entity: Name of the entity
new_records: DataFrame containing NEW records (flag='I' for Insert)
updated_records: DataFrame containing UPDATED records (flag='U' for Update)
unchanged_records: DataFrame containing UNCHANGED records (no flag)
total_new: Count of new records
total_updated: Count of updated records
total_unchanged: Count of unchanged records
"""
entity: str
new_records: DataFrame
updated_records: DataFrame
unchanged_records: DataFrame
total_new: int = 0
total_updated: int = 0
total_unchanged: int = 0
def generate_hash(df: DataFrame, columns: list) -> DataFrame:
"""
Generate MD5 hash of specified columns.
WHY: We use hashing to detect changes efficiently.
Instead of comparing every column of every row,
we just compare one hash value per row.
HOW IT WORKS:
1. Take each value column and convert to string
2. Replace nulls with empty string (nulls can't be hashed)
3. Concatenate all values with "||" separator
4. Calculate MD5 hash of the concatenated string
Example:
columns = ["name", "city"]
values = ["John", "NYC"]
concatenated = "John||NYC"
hash = MD5("John||NYC") = "abc123..."
Args:
df: DataFrame to add hash to
columns: List of columns to include in the hash
Returns:
DataFrame with new "_row_hash" column
"""
# Prepare columns for hashing
# coalesce replaces null with empty string
cols_for_hash = [
coalesce(col(c).cast("string"), lit(""))
for c in columns
]
# Generate the hash
# concat_ws joins all column values with "||"
# md5 calculates the hash
return df.withColumn(
"_row_hash",
md5(concat_ws("||", *cols_for_hash))
)
def get_previous_snapshot(spark: SparkSession, entity: str, current_date: str) -> Optional[DataFrame]:
"""
Load the previous day's snapshot for comparison.
WHY: To detect changes, we need to know what the data looked like yesterday.
Snapshots store only the key columns and hash (not full data).
Args:
spark: SparkSession
entity: Entity name
current_date: Current date being processed
Returns:
DataFrame with previous snapshot, or None if not found
"""
# Get previous date (this function is in config.py)
prev_date = config.get_previous_date(current_date)
if prev_date is None:
return None
# Build path to previous snapshot
snapshot_path = config.CDC_SNAPSHOTS_PATH / entity / f"snapshot_{prev_date}.parquet"
if not snapshot_path.exists():
return None
print(f" Loading previous snapshot: {prev_date}")
return spark.read.parquet(str(snapshot_path))
def save_snapshot(df: DataFrame, entity: str, date_str: str) -> None:
"""
Save current hashes as snapshot for next run.
WHY: Tomorrow's CDC will need today's data to compare against.
We save only key + hash to minimize storage.
Args:
df: DataFrame with _row_hash column
entity: Entity name
date_str: Current date
"""
snapshot_path = config.CDC_SNAPSHOTS_PATH / entity / f"snapshot_{date_str}.parquet"
snapshot_path.parent.mkdir(parents=True, exist_ok=True)
# Get key columns from config
entity_config = config.get_entity_config(entity, df)
key_cols = entity_config["key_columns"]
# Save only key columns + hash (not full data)
df.select(*key_cols, "_row_hash").write.mode("overwrite").parquet(str(snapshot_path))
def detect_changes(spark: SparkSession, current_df: DataFrame, entity: str, date_str: str) -> CDCResult:
"""
Main CDC logic: Compare current data with previous snapshot.
THIS IS THE CORE ALGORITHM:
1. Generate hash for each row in current data
2. Load previous snapshot (key + hash from yesterday)
3. Find NEW records: keys that exist today but not yesterday
4. Find UPDATED records: keys exist in both, but hash is different
5. Find UNCHANGED records: keys exist in both, hash is same
6. Save current snapshot for next run
Args:
spark: SparkSession
current_df: DataFrame with today's data
entity: Entity name
date_str: Today's date
Returns:
CDCResult containing DataFrames for new, updated, and unchanged records
"""
# Get column configuration
entity_config = config.get_entity_config(entity, current_df)
key_cols = entity_config["key_columns"]
value_cols = entity_config["value_columns"]
# STEP 1: Generate hash for current data
current_with_hash = generate_hash(current_df, value_cols)
# STEP 2: Load previous snapshot
previous_df = get_previous_snapshot(spark, entity, date_str)
# STEP 3: If no previous data, everything is NEW
if previous_df is None:
print(f" No previous snapshot - all records are NEW")
new_records = current_with_hash.withColumn("_cdc_flag", lit("I"))
save_snapshot(current_with_hash, entity, date_str)
return CDCResult(
entity=entity,
new_records=new_records,
updated_records=new_records.limit(0), # Empty DataFrame
unchanged_records=current_with_hash.limit(0),
total_new=new_records.count(),
total_updated=0,
total_unchanged=0
)
# STEP 4: Find NEW records
# LEFT ANTI JOIN: Records in current but NOT in previous
new_records = (
current_with_hash.alias("curr")
.join(previous_df.alias("prev"), on=key_cols, how="left_anti")
.withColumn("_cdc_flag", lit("I")) # I = Insert (new record)
)
# STEP 5: Find UPDATED records
# INNER JOIN where hashes are DIFFERENT
updated_records = (
current_with_hash.alias("curr")
.join(
previous_df.select(*key_cols, "_row_hash").alias("prev"),
on=key_cols,
how="inner"
)
.where(col("curr._row_hash") != col("prev._row_hash"))
.select("curr.*")
.withColumn("_cdc_flag", lit("U")) # U = Update
)
# STEP 6: Find UNCHANGED records
# INNER JOIN where hashes are SAME
unchanged_records = (
current_with_hash.alias("curr")
.join(
previous_df.select(*key_cols, "_row_hash").alias("prev"),
on=key_cols,
how="inner"
)
.where(col("curr._row_hash") == col("prev._row_hash"))
.select("curr.*")
)
# STEP 7: Save snapshot for next run
save_snapshot(current_with_hash, entity, date_str)
return CDCResult(
entity=entity,
new_records=new_records,
updated_records=updated_records,
unchanged_records=unchanged_records,
total_new=new_records.count(),
total_updated=updated_records.count(),
total_unchanged=unchanged_records.count()
)
def process_cdc(spark: SparkSession, date_str: str) -> dict:
"""
Main entry point: Run CDC for all entities.
Args:
spark: SparkSession
date_str: Date to process
Returns:
Dictionary mapping entity names to CDCResult objects
"""
print(f"\n{'='*50}")
print(f"CDC DETECTION - {date_str}")
print(f"{'='*50}")
results = {}
# Find all entities in Bronze layer
bronze_entities = [d.name for d in config.BRONZE_PATH.iterdir() if d.is_dir()]
for entity in bronze_entities:
bronze_path = config.BRONZE_PATH / entity / f"date={date_str}"
if not bronze_path.exists():
continue
# Read Bronze data
current_df = spark.read.parquet(str(bronze_path))
# Run CDC detection
cdc_result = detect_changes(spark, current_df, entity, date_str)
print(f" {entity}: NEW={cdc_result.total_new}, "
f"UPDATED={cdc_result.total_updated}, "
f"UNCHANGED={cdc_result.total_unchanged}")
results[entity] = cdc_result
return results
The CDC Algorithm Visualized
STEP 1: Hash Current Data
-------------------------
Current DataFrame:
+-------+------+------+-----------+
|cust_id| name | city | _row_hash |
+-------+------+------+-----------+
| C001 | John | NYC | abc123 |
| C002 | Jane | LA | def456 |
| C003 | Bob | CHI | ghi789 |
+-------+------+------+-----------+
STEP 2: Load Previous Snapshot
------------------------------
Previous Snapshot (from yesterday):
+-------+-----------+
|cust_id| _row_hash |
+-------+-----------+
| C001 | abc123 |
| C002 | zzz999 | <-- Different hash! Jane's data changed
+-------+-----------+
STEP 3-5: Compare
-----------------
C001: Hash same (abc123 = abc123) --> UNCHANGED
C002: Hash different (def456 != zzz999) --> UPDATED
C003: Not in previous --> NEW
RESULT:
new_records: [C003]
updated_records: [C002]
unchanged_records: [C001]
Line-by-Line Code Breakdown
Here are detailed explanations of the most important functions:
generate_hash() - Line by Line
def generate_hash(df: DataFrame, columns: list) -> DataFrame:
Line 1: Define function that takes a DataFrame and list of columns, returns DataFrame with hash added.
cols_for_hash = [
coalesce(col(c).cast("string"), lit(""))
for c in columns
]
Lines 2-5: Create a list of column expressions for hashing.
- col(c) - Reference the column by name
- .cast("string") - Convert to string (hashing needs strings)
- coalesce(..., lit("")) - If value is null, use empty string instead
- This is a list comprehension - runs for each column in columns
return df.withColumn(
"_row_hash",
md5(concat_ws("||", *cols_for_hash))
)
Lines 6-9: Add the hash column to the DataFrame.
- withColumn("_row_hash", ...) - Add new column named "_row_hash"
- concat_ws("||", ...) - Join all column values with "||" separator
- Example: ["John", "NYC"] becomes "John||NYC"
- md5(...) - Calculate MD5 hash of the concatenated string
- *cols_for_hash - Unpack the list (Python syntax)
detect_changes() - Finding NEW Records
new_records = (
current_with_hash.alias("curr")
.join(previous_df.alias("prev"), on=key_cols, how="left_anti")
.withColumn("_cdc_flag", lit("I"))
)
Breaking this down:
- current_with_hash.alias("curr") - Give the current DataFrame an alias "curr" for reference
- .join(..., how="left_anti") - LEFT ANTI JOIN returns rows from the LEFT table that have NO match in the RIGHT table. Think of it as: "Give me rows in current that are NOT in previous"
- on=key_cols - Join on the key columns (e.g., customer_id)
- .withColumn("_cdc_flag", lit("I")) - Add a flag column with value "I" (Insert = new record)
Visual:
CURRENT                    PREVIOUS          RESULT (left_anti)
+------+                   +------+          +------+
| C001 |  -- matches --    | C001 |          |      |  (excluded)
| C002 |  -- matches --    | C002 |          |      |  (excluded)
| C003 |  -- NO match      |      |   -->    | C003 |  (included!)
+------+                   +------+          +------+
detect_changes() - Finding UPDATED Records
updated_records = (
current_with_hash.alias("curr")
.join(
previous_df.select(*key_cols, "_row_hash").alias("prev"),
on=key_cols,
how="inner"
)
.where(col("curr._row_hash") != col("prev._row_hash"))
.select("curr.*")
.withColumn("_cdc_flag", lit("U"))
)
Breaking this down:
- previous_df.select(*key_cols, "_row_hash") - From previous data, only select the key columns and the hash
- .join(..., how="inner") - INNER JOIN returns rows that exist in BOTH tables
- .where(col("curr._row_hash") != col("prev._row_hash")) - Filter to rows where the hashes are DIFFERENT; curr._row_hash refers to the hash from current data, prev._row_hash to the hash from previous data
- .select("curr.*") - Select all columns from current data (not previous)
- .withColumn("_cdc_flag", lit("U")) - Add flag "U" (Update)
Visual:
CURRENT                     PREVIOUS               RESULT
+------+--------+           +------+--------+      +------+
| C001 | abc123 |   inner   | C001 | abc123 |      |      |  hash same
| C002 | def456 |   join    | C002 | zzz999 |  --> | C002 |  hash different!
+------+--------+           +------+--------+      +------+
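Both joins can be tried in isolation with two tiny in-memory DataFrames that mimic the snapshot layout (a standalone sketch with made-up hashes):
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.master("local[*]").getOrCreate()

current = spark.createDataFrame(
    [("C001", "abc123"), ("C002", "def456"), ("C003", "ghi789")],
    ["customer_id", "_row_hash"],
)
previous = spark.createDataFrame(
    [("C001", "abc123"), ("C002", "zzz999")],
    ["customer_id", "_row_hash"],
)

# NEW: keys in current that have no match in previous
current.join(previous, on=["customer_id"], how="left_anti").show()   # only C003

# UPDATED: keys in both, but the hashes differ
(
    current.alias("curr")
    .join(previous.alias("prev"), on=["customer_id"], how="inner")
    .where(col("curr._row_hash") != col("prev._row_hash"))
    .select("curr.*")
    .show()                                                          # only C002
)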
apply_replacements() - Line by Line (this function is created in Step 5, src/silver.py)
def apply_replacements(df: DataFrame, column: str, mappings: dict) -> DataFrame:
if column not in df.columns:
return df
Lines 1-3: If the column doesn't exist, return DataFrame unchanged.
items = list(mappings.items())
case_expr = when(col(column) == items[0][0], lit(items[0][1]))
Lines 4-5:
- Convert mappings dict to list of (key, value) pairs
- Start building CASE expression with first mapping
- items[0][0] is first key ("A"), items[0][1] is first value ("Active")
for old_val, new_val in items[1:]:
case_expr = case_expr.when(col(column) == old_val, lit(new_val))
Lines 6-7: Add remaining mappings to CASE expression.
- items[1:] means all items except the first
- Each .when() adds another condition
return df.withColumn(column, case_expr.otherwise(col(column)))
Line 8: Apply the CASE expression.
- .otherwise(col(column)) means: if no match, keep original value
- withColumn(column, ...) replaces the existing column
The final SQL equivalent:
CASE
WHEN status = 'A' THEN 'Active'
WHEN status = 'I' THEN 'Inactive'
WHEN status = 'D' THEN 'Deleted'
ELSE status
END
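The same CASE chain can also be tried standalone, without going through apply_replacements (illustrative data only):
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lit

spark = SparkSession.builder.master("local[*]").getOrCreate()

df = spark.createDataFrame(
    [("C001", "A"), ("C002", "I"), ("C003", "X")],
    ["customer_id", "status"],
)

case_expr = (
    when(col("status") == "A", lit("Active"))
    .when(col("status") == "I", lit("Inactive"))
    .when(col("status") == "D", lit("Deleted"))
    .otherwise(col("status"))          # unknown codes (like "X") pass through unchanged
)

df.withColumn("status", case_expr).show()
# C001 -> Active, C002 -> Inactive, C003 -> X (kept as-is)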
Step 5: Silver Layer (src/silver.py)
Why We Need This File
Silver layer transforms the data - applying business rules and cleaning.
What This File Does
- Takes only NEW + UPDATED records from CDC
- Applies value replacements (e.g., "A" → "Active")
- Saves transformed data
Create src/silver.py
"""
Silver Layer - Transform data.
This runs AFTER CDC.
It only processes NEW and UPDATED records (not unchanged).
"""
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, when, lit
from pathlib import Path
import config
# Value replacement rules
# These map coded values to readable values
REPLACEMENT_RULES = {
"status": {
"A": "Active",
"I": "Inactive",
"D": "Deleted",
},
}
def apply_replacements(df: DataFrame, column: str, mappings: dict) -> DataFrame:
"""
Replace coded values with readable values.
WHY: Source systems often use short codes (A, I, D).
Users prefer readable values (Active, Inactive, Deleted).
HOW IT WORKS:
Builds a CASE WHEN expression:
CASE WHEN column = 'A' THEN 'Active'
WHEN column = 'I' THEN 'Inactive'
ELSE column -- Keep original if no match
END
Args:
df: DataFrame to transform
column: Column name to replace values in
mappings: Dictionary of old_value -> new_value
Returns:
DataFrame with replaced values
"""
# Skip if column doesn't exist
if column not in df.columns:
return df
# Build the CASE WHEN expression
items = list(mappings.items())
# Start with first item
case_expr = when(col(column) == items[0][0], lit(items[0][1]))
# Add remaining items
for old_val, new_val in items[1:]:
case_expr = case_expr.when(col(column) == old_val, lit(new_val))
# Apply transformation
# .otherwise() keeps original value if no match
return df.withColumn(column, case_expr.otherwise(col(column)))
def transform_entity(df: DataFrame, entity: str) -> DataFrame:
"""
Apply all transformations for an entity.
Currently applies:
1. Value replacements from REPLACEMENT_RULES
You can add more transformations here!
Args:
df: DataFrame to transform
entity: Entity name (not used yet, but available for entity-specific logic)
Returns:
Transformed DataFrame
"""
result = df
# Apply all replacement rules
for column, mappings in REPLACEMENT_RULES.items():
result = apply_replacements(result, column, mappings)
return result
def process_silver(spark: SparkSession, cdc_results: dict, date_str: str) -> dict:
"""
Main entry point: Process Silver layer.
IMPORTANT: This only processes NEW + UPDATED records!
Unchanged records are skipped (they're already in Silver from before).
Args:
spark: SparkSession
cdc_results: Dictionary of entity -> CDCResult from CDC step
date_str: Date being processed
Returns:
Dictionary mapping entity names to record counts
"""
print(f"\n{'='*50}")
print(f"SILVER LAYER - {date_str}")
print(f"{'='*50}")
results = {}
for entity, cdc_result in cdc_results.items():
# Combine NEW and UPDATED records
# These are the only ones we need to process
changes_df = cdc_result.new_records.union(cdc_result.updated_records)
count = changes_df.count()
if count == 0:
print(f" {entity}: No changes to process")
results[entity] = 0
continue
# Apply transformations
transformed = transform_entity(changes_df, entity)
# Write to Silver
output_path = config.SILVER_PATH / entity / f"date={date_str}"
transformed.write.mode("overwrite").parquet(str(output_path))
print(f" {entity}: {count} records transformed")
results[entity] = count
return results
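After a run you can eyeball the Silver output for a date directly (this assumes the default paths from config.py and the customer entity):
import config
from src.bronze import get_spark

spark = get_spark()
spark.read.parquet(
    str(config.SILVER_PATH / "customer" / "date=20260113")
).show(truncate=False)
spark.stop()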
Step 6: Gold Layer (src/gold.py)
Why We Need This File
Gold layer creates the final output - clean CSV files for downstream systems.
What This File Does
- Takes NEW + UPDATED records
- Removes internal columns (prefixed with _)
- Exports as single CSV file with clean name
Create src/gold.py
"""
Gold Layer - Export final CSV files.
This is the LAST step in the pipeline.
It creates clean CSV files for downstream systems.
"""
from pyspark.sql import DataFrame
from pathlib import Path
import shutil
import config
def export_to_csv(df: DataFrame, entity: str, date_str: str) -> Path:
"""
Export DataFrame to a single clean CSV file.
WHY: Spark typically creates multiple part files.
Downstream systems often expect a single file.
This function creates one clean file.
WHAT THIS DOES:
1. Removes internal columns (starting with _)
2. Writes to temp directory (Spark creates part- files)
3. Renames the part file to our clean filename
4. Cleans up temp files
Output filename: new_customer_20260113.csv
Args:
df: DataFrame to export
entity: Entity name
date_str: Date being processed
Returns:
Path to the created CSV file
"""
output_dir = config.GOLD_PATH / entity
output_dir.mkdir(parents=True, exist_ok=True)
# Create clean filename
final_file = output_dir / f"new_{entity}_{date_str}.csv"
# Remove internal columns (prefixed with _)
columns_to_keep = [c for c in df.columns if not c.startswith("_")]
df_clean = df.select(*columns_to_keep)
# Write to temp directory
# coalesce(1) forces everything into one file
temp_dir = output_dir / f"_temp_{entity}_{date_str}"
(
df_clean
.coalesce(1)
.write
.mode("overwrite")
.option("header", "true")
.csv(str(temp_dir))
)
# Find the part file and rename it
for file in temp_dir.glob("part-*.csv"):
if final_file.exists():
final_file.unlink() # Delete existing file
shutil.move(str(file), str(final_file))
break
# Clean up temp directory
shutil.rmtree(str(temp_dir), ignore_errors=True)
return final_file
def process_gold(spark, cdc_results: dict, date_str: str) -> dict:
"""
Main entry point: Export changed records to CSV.
Args:
spark: SparkSession (not used but kept for consistency)
cdc_results: Dictionary of entity -> CDCResult
date_str: Date being processed
Returns:
Dictionary mapping entity names to record counts
"""
print(f"\n{'='*50}")
print(f"GOLD LAYER - {date_str}")
print(f"{'='*50}")
results = {}
for entity, cdc_result in cdc_results.items():
# Combine NEW and UPDATED records
changes_df = cdc_result.new_records.union(cdc_result.updated_records)
count = changes_df.count()
if count == 0:
print(f" {entity}: No changes to export")
results[entity] = 0
continue
# Export to CSV
output_path = export_to_csv(changes_df, entity, date_str)
print(f" {entity}: {count} records --> {output_path.name}")
results[entity] = count
return results
Step 7: Main Entry Point (main.py)
Why We Need This File
This is the command-line interface. It ties all the steps together.
Create main.py
#!/usr/bin/env python3
"""
CDC Data Pipeline - Main Entry Point
Run with: python main.py --date 20260113
"""
import click
import config
from src.bronze import get_spark, process_bronze
from src.data_quality import run_quality_checks
from src.cdc import process_cdc
from src.silver import process_silver
from src.gold import process_gold
def run_pipeline(date_str: str) -> None:
"""
Run the complete pipeline for one date.
STEPS:
1. Bronze: Ingest source files
2. Data Quality: Validate data
3. CDC: Detect changes
4. Silver: Transform data
5. Gold: Export to CSV
"""
print(f"\n{'='*60}")
print(f"CDC DATA PIPELINE - {date_str}")
print(f"{'='*60}")
# Create Spark session
spark = get_spark()
spark.sparkContext.setLogLevel("WARN") # Reduce log noise
try:
# Step 1: Bronze (Ingestion)
bronze_results = process_bronze(spark, date_str)
# Step 2: Data Quality
quality_results = run_quality_checks(spark, date_str)
# Step 3: CDC Detection
cdc_results = process_cdc(spark, date_str)
# Step 4: Silver (Transformations)
silver_results = process_silver(spark, cdc_results, date_str)
# Step 5: Gold (Export)
gold_results = process_gold(spark, cdc_results, date_str)
# Print completion message
print(f"\n{'='*60}")
print(f"COMPLETED: {date_str}")
print(f"{'='*60}")
finally:
# Always stop Spark when done
spark.stop()
@click.command()
@click.option('--date', type=str, required=True, help='Date to process (YYYYMMDD)')
def main(date: str):
"""Run the CDC pipeline for a specific date."""
run_pipeline(date)
if __name__ == "__main__":
main()
Testing Your Implementation
1. Create Sample Data
mkdir -p data/source/2026/01/13
cat > data/source/2026/01/13/Customer.txt << 'EOF'
customer_id|first_name|last_name|email|phone|status
C001|John|Doe|john.doe@example.com|1234567890|A
C002|Jane|Smith|jane.smith@example.com|2345678901|A
EOF
2. Run Day 1
python main.py --date 20260113
Expected: All records are NEW (no previous snapshot).
3. Create Day 2 Data (with changes)
mkdir -p data/source/2026/01/14
cat > data/source/2026/01/14/Customer.txt << 'EOF'
customer_id|first_name|last_name|email|phone|status
C001|John|Doe|john.doe@example.com|1234567890|A
C002|Jane|Johnson|jane.johnson@example.com|2345678901|A
C003|Bob|Wilson|bob.wilson@example.com|3456789012|A
EOF
4. Run Day 2
python main.py --date 20260114
Expected:
- C001: UNCHANGED (same data)
- C002: UPDATED (last_name and email changed)
- C003: NEW (not in Day 1)
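To verify without any extra tooling, read the Day 2 Gold CSV and the saved snapshot back with Spark (paths assume the defaults from config.py):
import config
from src.bronze import get_spark

spark = get_spark()

# Gold CSV for Day 2: should contain only C002 (updated) and C003 (new)
spark.read.option("header", "true").csv(
    str(config.GOLD_PATH / "customer" / "new_customer_20260114.csv")
).show()

# Snapshot for Day 2: one key + hash per customer, ready for the next run
spark.read.parquet(
    str(config.CDC_SNAPSHOTS_PATH / "customer" / "snapshot_20260114.parquet")
).show()

spark.stop()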
Summary
You have built a complete CDC pipeline:
| Step | File | What It Does |
|---|---|---|
| 1 | config.py | Configuration and helpers |
| 2 | bronze.py | Ingest source files to Parquet |
| 3 | data_quality.py | Validate data quality |
| 4 | cdc.py | Detect NEW and UPDATED records |
| 5 | silver.py | Apply transformations |
| 6 | gold.py | Export to CSV |
| 7 | main.py | Orchestrate everything |
The key insight: CDC uses hash comparison to efficiently detect changes without comparing every field of every record.
Line-by-Line: Data Quality Functions
check_not_null() - How It Works
def check_not_null(df: DataFrame, columns: List[str]) -> Dict:
results = {}
for col_name in columns:
null_count = df.filter(col(col_name).isNull()).count()
results[col_name] = (null_count == 0)
return results
Step by step:
- results = {} - Create an empty dictionary to store results
- for col_name in columns: - Loop through each column name we want to check
- df.filter(col(col_name).isNull()) - Filter the DataFrame to only the rows where this column IS NULL; col(col_name) references the column and .isNull() checks if the value is null
- .count() - Count how many rows have null values
- results[col_name] = (null_count == 0) - Store True if no nulls were found, False if any nulls exist
Example:
DataFrame:
+------+-------+
| id | name |
+------+-------+
| C001 | John |
| C002 | null | <-- This is null!
| C003 | Bob |
+------+-------+
check_not_null(df, ["id", "name"])
Result: {"id": True, "name": False}
check_unique() - How It Works
def check_unique(df: DataFrame, columns: List[str]) -> Dict:
total = df.count()
distinct = df.select(*columns).distinct().count()
return {"unique": total == distinct}
Step by step:
- total = df.count() - Count the total number of rows
- df.select(*columns) - Select only the columns we're checking; *columns unpacks the list (["id"] becomes just "id")
- .distinct().count() - Get the unique rows and count them
- return {"unique": total == distinct} - If total equals distinct, all values are unique
Example:
DataFrame:
+------+
| id |
+------+
| C001 |
| C002 |
| C001 | <-- Duplicate!
+------+
total = 3
distinct = 2
Result: {"unique": False} (3 != 2)
validate_customer() - How It Works
def validate_customer(df: DataFrame) -> QualityResult:
passed = []
failed = []
# Check 1: Not null
null_checks = check_not_null(df, ["customer_id", "first_name"])
for check, result in null_checks.items():
(passed if result else failed).append(f"not_null:{check}")
Breaking down the pattern:
- passed = [] and failed = [] - Two lists to collect results
- null_checks = check_not_null(df, [...]) - Call our reusable check function; it returns something like {"customer_id": True, "first_name": True}
- for check, result in null_checks.items(): - Loop through each result; check is the column name, result is True/False
- (passed if result else failed).append(...) - This is a Python trick: if result is True it runs passed.append(...), and if result is False it runs failed.append(...)
Visual flow:
null_checks = {"customer_id": True, "first_name": False}
Loop iteration 1:
check = "customer_id", result = True
passed.append("not_null:customer_id")
Loop iteration 2:
check = "first_name", result = False
failed.append("not_null:first_name")
Final:
passed = ["not_null:customer_id"]
failed = ["not_null:first_name"]
How to Extend the Pipeline
This section shows you how to add new functionality.
Extension 1: Add a New Data Quality Check
Scenario: You want to check that a phone number contains only digits.
Step 1: Create the check function in src/data_quality.py
def check_phone_format(df: DataFrame, column: str) -> Dict:
"""
Check that phone numbers contain only digits.
Valid: 1234567890
Invalid: 123-456-7890, (123) 456-7890
"""
# Regex pattern: start (^) + one or more digits ([0-9]+) + end ($)
pattern = r"^[0-9]+$"
# Count invalid rows
invalid = df.filter(
col(column).isNotNull() & ~col(column).rlike(pattern)
).count()
# Return True if no invalid rows found
return {f"{column}_format": invalid == 0}
Step 2: Use it in a validator
def validate_customer(df: DataFrame) -> QualityResult:
passed = []
failed = []
# ... existing checks ...
# Add your new check
phone_check = check_phone_format(df, "phone")
(passed if phone_check["phone_format"] else failed).append("format:phone")
# ... rest of function ...
Step 3: Run and verify
python main.py --date 20260113
# Look for "format:phone" in the output
Extension 2: Add a New Transformation
Scenario: You want to create a full_name column combining first and last name.
Step 1: Create the function in src/silver.py
from pyspark.sql.functions import concat, lit
def add_full_name(df: DataFrame) -> DataFrame:
"""
Create full_name column: "John" + " " + "Doe" = "John Doe"
"""
# Check if columns exist
if "first_name" not in df.columns or "last_name" not in df.columns:
return df
# concat() joins values, lit(" ") adds a space
return df.withColumn(
"full_name",
concat(col("first_name"), lit(" "), col("last_name"))
)
Step 2: Call it in transform_entity()
def transform_entity(df: DataFrame, entity: str) -> DataFrame:
result = df
# Existing transformations
for column, mappings in REPLACEMENT_RULES.items():
result = apply_replacements(result, column, mappings)
# NEW: Add full_name for customer
if entity == "customer":
result = add_full_name(result)
return result
Step 3: Verify by viewing Silver data
python main.py --date 20260113
python view_data.py silver customer --date 20260113
# view_data.py is a separate helper script (not one of the files built in this guide);
# you can also read the Silver Parquet directly with spark.read.parquet, as shown at the end of Step 5.
# Either way, you should see the new "full_name" column.
Extension 3: Add a New Value Replacement
Scenario: You want to convert country codes to full names.
Step 1: Add to REPLACEMENT_RULES in src/silver.py (where the dict is defined in Step 5)
REPLACEMENT_RULES = {
"status": {
"A": "Active",
"I": "Inactive",
"D": "Deleted",
},
# NEW: Add country code mappings
"country_code": {
"US": "United States",
"UK": "United Kingdom",
"CA": "Canada",
"AU": "Australia",
},
}
That's it! The Silver layer automatically applies all rules from REPLACEMENT_RULES.
Extension 4: Add a New Entity Validator
Scenario: You added a new entity "Product" and want custom validation.
Step 1: Create validator function in src/data_quality.py
def validate_product(df: DataFrame) -> QualityResult:
"""Custom validation for Product entity."""
passed = []
failed = []
# Check 1: product_id not null
null_checks = check_not_null(df, ["product_id", "name"])
for check, result in null_checks.items():
(passed if result else failed).append(f"not_null:{check}")
# Check 2: product_id unique
unique_check = check_unique(df, ["product_id"])
(passed if unique_check["unique"] else failed).append("unique:product_id")
# Check 3: price must not be negative
invalid_price = df.filter(col("price").cast("double") < 0).count()
(passed if invalid_price == 0 else failed).append("check:positive_price")
return QualityResult(
entity="product",
total_records=df.count(),
passed_checks=passed,
failed_checks=failed,
overall_passed=len(failed) == 0
)
Step 2: Register it in run_quality_checks()
def run_quality_checks(spark, date_str: str) -> Dict[str, QualityResult]:
# ...
validators = {
"customer": validate_customer,
"account": validate_account,
"product": validate_product, # NEW: Add your validator
}
# ...
Extension 5: Add a New Entity Configuration
Scenario: You added "Transaction" data and want specific CDC columns.
Step 1: Add to ENTITIES in config.py
ENTITIES = {
"customer": {
"key_columns": ["customer_id"],
"value_columns": ["first_name", "last_name", "email", "phone", "status"],
},
"account": {
"key_columns": ["account_id"],
"value_columns": ["account_type", "balance", "currency", "status"],
},
# NEW: Add transaction config
"transaction": {
"key_columns": ["transaction_id"],
"value_columns": ["customer_id", "amount", "transaction_type", "timestamp"],
},
}
Why do this? Without explicit config, CDC auto-detects columns. Adding explicit config gives you control over exactly which columns to use for key matching and change detection.
Extension Checklist
| Want to... | Edit This File | Add This |
|---|---|---|
| New DQ check function | data_quality.py | def check_xxx(df, ...) |
| New entity validator | data_quality.py | def validate_xxx(df) + register in validators dict |
| New transformation | silver.py | def transform_xxx(df) + call in transform_entity() |
| New value mapping | silver.py | Add to REPLACEMENT_RULES dict |
| New entity config | config.py | Add to ENTITIES dict |
| New column in output | silver.py | Add withColumn() in transformation |
Quick Reference for Developers
Common PySpark Operations
| What You Want | PySpark Code |
|---|---|
| Check if column is null | col("name").isNull() |
| Check if column is NOT null | col("name").isNotNull() |
| Filter rows | df.filter(condition) |
| Add a new column | df.withColumn("new_col", expression) |
| Select specific columns | df.select("col1", "col2") |
| Count rows | df.count() |
| Get distinct values | df.select("col").distinct() |
| Regex match | col("email").rlike(pattern) |
| Convert type | col("price").cast("double") |
| Concatenate strings | concat(col("a"), lit(" "), col("b")) |
| CASE WHEN | when(condition, value).otherwise(default) |
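A few of these operations chained together on a throwaway DataFrame (purely illustrative):
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat, lit, when

spark = SparkSession.builder.master("local[*]").getOrCreate()

df = spark.createDataFrame(
    [("C001", "John", "Doe", "A"), ("C002", "Jane", None, "I")],
    ["customer_id", "first_name", "last_name", "status"],
)

result = (
    df.filter(col("last_name").isNotNull())        # keep only rows with a last_name
      .withColumn("full_name", concat(col("first_name"), lit(" "), col("last_name")))
      .withColumn("status", when(col("status") == "A", lit("Active")).otherwise(col("status")))
)
result.show()
print(result.count())   # 1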
File Edit Quick Reference
| File | What It Controls |
|---|---|
| config.py | Paths and entity configurations |
| bronze.py | How files are read |