CDC Data Pipeline - Training Guide
Table of Contents
- Project Overview
- Folder Structure
- How the Pipeline Works
- Configuration
- Source Files Deep Dive
- Dynamic Entity Discovery
- Data Quality Reports
- How to Extend
- Exercises
1. Project Overview
Business Scenario
Company: Global Bank Corp (fictional)
Situation: The bank receives daily data files from multiple source systems:
- Customer System: Customer personal information (name, email, phone, status)
- Account System: Bank accounts (type, balance, currency)
- Relationship System: Links customers to their accounts
Problem:
- Every day, source systems send FULL data extracts (not just changes)
- Processing millions of records daily is slow and expensive
- Downstream systems only need NEW and UPDATED records
- No visibility into data quality issues before processing
- Currently no way to track what changed between days
Business Need:
1. Detect which records are NEW or UPDATED each day
2. Transform data with business rules (e.g., status codes to readable values)
3. Export only the changed records for downstream consumption
4. Generate data quality reports for governance
Requirements
| # | Requirement | Description |
|---|---|---|
| R1 | Daily Ingestion | Ingest pipe-delimited files from source systems |
| R2 | Data Quality | Validate data (null checks, uniqueness, format validation) |
| R3 | Change Detection | Identify NEW and UPDATED records compared to previous day |
| R4 | Transformations | Apply business rules (value replacements, derived columns) |
| R5 | Export | Generate CSV files containing only changed records |
| R6 | Reporting | Create quality reports for each run |
| R7 | Dynamic | Support new entities without code changes |
| R8 | Scalable | Handle large data volumes efficiently |
Solution: CDC Data Pipeline
We will build a Change Data Capture (CDC) pipeline using Medallion Architecture:
ARCHITECTURE OVERVIEW
=====================
Source Systems Pipeline Layers Output
-------------- --------------- ------
Customer.txt --. .-> new_customer.csv
| +--------+ +--------+ |
Account.txt ---|------>| BRONZE |--->| SILVER |-------|-> new_account.csv
| | (Raw) | | (Clean)| |
Relation.txt --' +--------+ +--------+ '-> new_relation.csv
| ^
v |
+--------+ +--------+
| DQ | | CDC |
| Check | | Detect |
+--------+ +--------+
Medallion Architecture Layers:
| Layer | Purpose | Format |
|---|---|---|
| Bronze | Raw data exactly as received | Parquet |
| Silver | Cleaned and transformed data | Parquet |
| Gold | Business-ready output | CSV |
CDC Approach: Hash-based comparison
- Generate an MD5 hash of each record's value columns
- Compare hashes between the current and previous day
- Same key + different hash = UPDATED record
- New key = NEW record
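A minimal PySpark sketch of this hashing step (illustrative only; the helper name with_row_hash and the column list are assumptions, not the actual cdc.py code):
from pyspark.sql import functions as F

def with_row_hash(df, value_columns):
    # MD5 over the pipe-joined value columns; nulls are replaced with empty strings
    filled = [F.coalesce(F.col(c).cast("string"), F.lit("")) for c in value_columns]
    return df.withColumn("_row_hash", F.md5(F.concat_ws("|", *filled)))

# Example: hash the customer value columns before comparing with the previous snapshot
# df = with_row_hash(df, ["first_name", "last_name", "email", "status"])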
Data Flow Diagram
CDC DATA PIPELINE
============================================================
STEP 1: INGESTION (Bronze)
--------------------------
Source Files (.txt) Bronze Layer (Parquet)
+------------------+ +----------------------+
| Customer.txt | ---> | bronze/customer/ |
| Account.txt | READ | bronze/account/ |
| Relationship.txt | ---> | bronze/relationship/ |
| [Any new .txt] | | bronze/[new entity]/ |
+------------------+ +----------------------+
|
v
STEP 2: DATA QUALITY
--------------------
- Check not null
- Check unique keys
- Validate formats
- Generate DQ Report
|
v
STEP 3: CDC DETECTION
---------------------
Compare with previous snapshot
|
+--------+--------+
| | |
v v v
NEW UPDATED UNCHANGED
(I) (U) (skip)
| |
+---+----+
|
v
STEP 4: SILVER LAYER
--------------------
Apply transformations
(only NEW + UPDATED)
|
v
STEP 5: GOLD LAYER
------------------
Export to CSV
+----------------------+
| new_customer_DATE.csv|
| new_account_DATE.csv |
+----------------------+
============================================================
Key Concepts
| Concept | Description |
|---|---|
| CDC | Change Data Capture - detects NEW and UPDATED records |
| Medallion | Bronze (raw) to Silver (clean) to Gold (output) |
| Hash-based CDC | Uses MD5 hash of row values to detect changes |
| Dynamic Discovery | Automatically finds new entities without code changes |
2. Folder Structure
de-cdc/
|-- main.py # Entry point - runs the pipeline
|-- config.py # All configuration in one place
|-- view_data.py # Utility to view Parquet data
|-- requirements.txt # Python dependencies
|
|-- data/
| |-- source/ # Input: year/month/day/Entity.txt
| | +-- 2026/
| | +-- 01/
| | |-- 13/ # Day 1 data
| | | |-- Customer.txt
| | | |-- Account.txt
| | | |-- Relationship.txt
| | | +-- Product.txt <-- New files auto-discovered
| | |-- 14/ # Day 2 data
| | +-- 15/ # Day 3 data
| |
| |-- bronze/ # Raw Parquet files
| |-- silver/ # Transformed Parquet
| |-- gold/ # Final CSV exports
| |-- cdc_snapshots/ # Hash snapshots for CDC comparison
| +-- reports/ # Data quality reports
|
|-- src/
| |-- bronze.py # Step 1: Ingestion
| |-- data_quality.py # Step 2: Validation
| |-- cdc.py # Step 3: Change detection
| |-- silver.py # Step 4: Transformations
| +-- gold.py # Step 5: CSV export
|
+-- orchestration/
+-- dagster_pipeline.py # Dagster DAG for scheduling
3. How the Pipeline Works
Step-by-Step Execution
python main.py --date 20260113
STEP 1: BRONZE LAYER
1. Scans source/2026/01/13/ folder for all .txt files
2. Reads each pipe-delimited file
3. Adds metadata columns (_ingestion_timestamp, _processing_date)
4. Writes as Parquet to bronze/{entity}/date=YYYYMMDD/
STEP 2: DATA QUALITY
1. Reads Bronze layer data
2. Runs validation checks (not_null, unique, format, allowed_values)
3. Generates quality report
4. Pipeline continues even if checks fail (configurable)
STEP 3: CDC DETECTION
1. Generates MD5 hash of value columns
2. Compares with previous day's snapshot
3. Categorizes records:
   - NEW: Key not in previous data (flag = 'I')
   - UPDATED: Key exists but hash changed (flag = 'U')
   - UNCHANGED: Skip processing
4. Saves current snapshot for next run
STEP 4: SILVER LAYER
1. Takes only NEW + UPDATED records
2. Applies value replacements (e.g., "A" to "Active")
3. Writes to Silver layer as Parquet
STEP 5: GOLD LAYER
1. Takes NEW + UPDATED records
2. Removes internal columns
3. Exports as clean CSV file
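These steps are wired together in main.py. Below is a sketch of that orchestration; apart from process_bronze, the step function names are assumptions based on the module descriptions, not the actual code:
from src import bronze, data_quality, cdc, silver, gold

def run_pipeline(spark, date_str):
    # Illustrative orchestration sketch (step function names are assumptions)
    bronze_results = bronze.process_bronze(spark, date_str)    # Step 1: ingest .txt -> Parquet
    dq_report = data_quality.run_checks(spark, date_str)       # Step 2: validate + write report
    cdc_results = cdc.process_cdc(spark, date_str)             # Step 3: detect NEW / UPDATED
    silver.process_silver(spark, date_str, cdc_results)        # Step 4: transform changed rows
    gold.process_gold(spark, date_str)                         # Step 5: export CSV
    return bronze_results, dq_report, cdc_results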
4. Configuration (config.py)
Path Settings
SOURCE_PATH = DATA_DIR / "source" # Input files
BRONZE_PATH = DATA_DIR / "bronze" # Raw Parquet
SILVER_PATH = DATA_DIR / "silver" # Transformed
GOLD_PATH = DATA_DIR / "gold" # Final CSV
Entity Configuration (Optional)
# These are OPTIONAL predefined entities with CUSTOM configurations.
# New entities are auto-discovered and use default settings.
ENTITIES = {
"customer": {
"key_columns": ["customer_id"], # Primary key for CDC
"value_columns": ["first_name", "last_name", ...], # Columns to hash
},
...
}
Why predefined entities?
- Provides CUSTOM column mappings for known entities
- Allows specific validators for known data structures
- NEW entities use AUTO-DETECTION (columns ending in _id become the key)
Value Replacements
REPLACEMENT_RULES = {
"status": {"A": "Active", "I": "Inactive", "D": "Deleted"},
"country_code": {"US": "United States", "UK": "United Kingdom"},
}
Helper Functions
| Function | Purpose |
|---|---|
| `discover_entities(date_str)` | Find all .txt files for a date |
| `get_entity_config(entity, df)` | Get config or auto-detect columns |
| `get_available_dates()` | List all dates with source data |
| `get_previous_date(date_str)` | Find previous date for CDC comparison |
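As an illustration, discover_entities could look roughly like this (a sketch assuming the source/YYYY/MM/DD layout from the folder structure, not the actual config.py):
from pathlib import Path

SOURCE_PATH = Path("data/source")

def discover_entities(date_str):
    # date_str is YYYYMMDD; entity name = lower-cased file name without the .txt suffix
    day_dir = SOURCE_PATH / date_str[:4] / date_str[4:6] / date_str[6:8]
    if not day_dir.exists():
        return []
    return sorted(p.stem.lower() for p in day_dir.glob("*.txt"))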
5. Source Files Deep Dive
5.1 bronze.py - Raw Ingestion
Purpose: Read text files and save as Parquet
def ingest_file(spark, entity, date_str):
# 1. Build path: source/2026/01/13/Customer.txt
# 2. Read pipe-delimited file
# 3. Add metadata columns
# 4. Return DataFrame
def process_bronze(spark, date_str):
# 1. Call discover_entities() to find all .txt files
# 2. For each entity: ingest and write to Parquet
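A runnable sketch of ingest_file, assuming pipe-delimited files with a header row and capitalized file names (illustrative only, not the actual bronze.py):
from pyspark.sql import functions as F

def ingest_file(spark, entity, date_str):
    # Build the path, e.g. data/source/2026/01/13/Customer.txt (file-name casing is assumed)
    path = f"data/source/{date_str[:4]}/{date_str[4:6]}/{date_str[6:8]}/{entity.capitalize()}.txt"
    return (
        spark.read.option("header", True).option("delimiter", "|").csv(path)
        .withColumn("_ingestion_timestamp", F.current_timestamp())
        .withColumn("_processing_date", F.lit(date_str))
    )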
5.2 data_quality.py - Validation
Purpose: Validate data before processing
Available checks:
- check_not_null(df, columns) - Ensure no null values
- check_unique(df, columns) - Ensure no duplicates
- check_email_format(df, column) - Validate email pattern
- check_allowed_values(df, column, allowed) - Validate against list
Validators:
- validate_customer(df) - Customer-specific rules
- validate_account(df) - Account-specific rules
- validate_generic(df, entity) - Auto-generated for new entities
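For illustration, two of the listed checks might be implemented like this (a sketch; the real signatures in data_quality.py may differ):
from pyspark.sql import functions as F

def check_not_null(df, columns):
    # True when none of the given columns contain nulls
    condition = None
    for c in columns:
        cond = F.col(c).isNull()
        condition = cond if condition is None else (condition | cond)
    return df.filter(condition).count() == 0

def check_unique(df, columns):
    # True when the key combination has no duplicate values
    return df.select(*columns).count() == df.select(*columns).distinct().count()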
5.3 cdc.py - Change Detection
Purpose: Identify NEW and UPDATED records
def generate_hash(df, columns):
"""
Create MD5 hash of value columns.
Example:
customer_id | first_name | last_name --> _row_hash
C001 | John | Doe --> a1b2c3d4e5...
If any value changes, the hash changes.
"""
def detect_changes(current_df, entity, date_str):
"""
Compare current vs previous:
CURRENT PREVIOUS RESULT
------- -------- ------
C001, abc123 C001, abc123 UNCHANGED
C002, def456 C002, xyz789 UPDATED (hash changed)
C003, ghi789 (not found) NEW
"""
5.4 silver.py - Transformations
Purpose: Apply business rules
def apply_replacements(df, column, mappings):
# "A" --> "Active", "I" --> "Inactive"
def transform_entity(df, entity):
# Apply all REPLACEMENT_RULES from config
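One straightforward way to implement apply_replacements is a chained when/otherwise expression (a sketch, not the actual silver.py):
from pyspark.sql import functions as F

def apply_replacements(df, column, mappings):
    # Unmapped values fall through unchanged
    expr = F.col(column)
    for old, new in mappings.items():
        expr = F.when(F.col(column) == old, F.lit(new)).otherwise(expr)
    return df.withColumn(column, expr)

# Usage with a rule from config.py:
# df = apply_replacements(df, "status", {"A": "Active", "I": "Inactive", "D": "Deleted"})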
5.5 gold.py - Final Export
Purpose: Create clean CSV output
def export_to_csv(df, entity, date_str):
# 1. Remove internal columns (starting with _)
# 2. Write single CSV file: new_customer_20260113.csv
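A sketch of the export using Spark's CSV writer. Spark writes a directory containing a part file, so the real gold.py presumably renames that part file to the new_{entity}_{YYYYMMDD}.csv naming shown above:
def export_to_csv(df, entity, date_str):
    # Keep only business columns (drop anything starting with "_")
    business_cols = [c for c in df.columns if not c.startswith("_")]
    out_path = f"data/gold/new_{entity}_{date_str}"
    (
        df.select(*business_cols)
          .coalesce(1)                       # force a single output part file
          .write.mode("overwrite")
          .option("header", True)
          .csv(out_path)
    )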
6. Dynamic Entity Discovery
How It Works
The pipeline automatically discovers new entities:
- Bronze Layer: Scans source/YYYY/MM/DD/ for all .txt files
- CDC: Auto-detects key/value columns for new entities
- Data Quality: Uses generic validator for unknown entities
- Silver/Gold: Process whatever CDC returns
Why Key and Value Columns Matter
CDC needs to distinguish between key columns and value columns:
KEY COLUMN VALUE COLUMNS
----------- ------------------
customer_id first_name, last_name, email, status
| |
v v
Used to MATCH Used to DETECT
records across if anything CHANGED
days (same ID (hash comparison)
= same record)
Example:
Day 1: C001 | John | Doe hash("John|Doe") = abc123
Day 2: C001 | John | Smith hash("John|Smith") = xyz789
Result: C001 is UPDATED (same key, different hash)
Without knowing which column is the key:
- We cannot match records across days
- We cannot detect NEW vs UPDATED
Auto-Detection Rules
| Priority | Logic |
|---|---|
| 1st | Look for columns ending with _id --> use as key(s) |
| 2nd | If no _id columns found --> use FIRST column as key |
| Then | All remaining columns --> value columns (for CDC hash) |
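These rules can be expressed in a few lines. The helper below is hypothetical (the real logic lives in get_entity_config), and skipping metadata columns that start with _ is an assumption:
def auto_detect_columns(df):
    # Columns ending in _id become the key; otherwise fall back to the first column
    key_columns = [c for c in df.columns if c.lower().endswith("_id")]
    if not key_columns:
        key_columns = [df.columns[0]]
    # Everything else (except internal _ columns) feeds the CDC hash
    value_columns = [c for c in df.columns
                     if c not in key_columns and not c.startswith("_")]
    return key_columns, value_columns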
Example: Adding a New Entity
# Create new file
cat > data/source/2026/01/13/Product.txt << 'EOF'
product_id|name|price|category
P001|Widget|19.99|Electronics
P002|Gadget|29.99|Electronics
EOF
# Run pipeline - Product is automatically discovered
python main.py --date 20260113
# Output:
# Discovered entities: ['account', 'customer', 'product', 'relationship']
# Product: 2 records --> bronze/product/...
Removing Files
If a file is removed, the pipeline skips it without errors:
rm data/source/2026/01/14/Product.txt
python main.py --date 20260114
# Only processes Customer, Account, Relationship
7. Data Quality Reports
Console Output
During pipeline execution, DQ results are printed:
DATA QUALITY - 20260113
==================================================
Customer: PASSED (6 passed, 0 failed)
Account: PASSED (3 passed, 0 failed)
Product: ISSUES (1 passed, 1 failed)
Failed: check:positive_price
JSON Report
Quality results are saved to data/reports/dq_report_YYYYMMDD.json:
{
"date": "20260113",
"overall_passed": false,
"entities": {
"customer": {
"passed": true,
"total_records": 5,
"passed_checks": ["not_null:customer_id", "unique:customer_id", ...],
"failed_checks": []
},
"product": {
"passed": false,
"total_records": 2,
"passed_checks": ["not_null:product_id"],
"failed_checks": ["check:positive_price"]
}
}
}
Pipeline Behavior on Failure
Current behavior: Pipeline continues even if DQ fails.
To stop on failure, set STOP_ON_DQ_FAILURE = True in config.
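A possible guard in main.py, assuming the report structure shown above (the enforce_dq_gate helper itself is hypothetical):
import sys
import config

def enforce_dq_gate(dq_report):
    # Abort the run when quality checks fail and the config flag is enabled
    if config.STOP_ON_DQ_FAILURE and not dq_report["overall_passed"]:
        print("Data quality checks failed - stopping pipeline.")
        sys.exit(1)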
8. How to Extend
8.1 Add Custom Validator for Known Entity
Edit data_quality.py:
def validate_product(df):
passed, failed = [], []
# Check price is positive
invalid = df.filter(col("price") < 0).count()
(passed if invalid == 0 else failed).append("check:positive_price")
return QualityResult(entity="product", ...)
# Add to validators dict
validators = {
...
"product": validate_product,
}
8.2 Add New Quality Check
def check_date_not_future(df, column):
    from datetime import datetime
    from pyspark.sql.functions import col
    today = datetime.now().strftime("%Y-%m-%d")
    invalid = df.filter(col(column) > today).count()
    return {f"{column}_not_future": invalid == 0}
8.3 Add New Transformation
Edit silver.py:
def calculate_full_name(df):
    from pyspark.sql.functions import col, concat, lit
    return df.withColumn(
        "full_name",
        concat(col("first_name"), lit(" "), col("last_name"))
    )
def transform_entity(df, entity):
result = df
# Existing transformations...
if entity == "customer":
result = calculate_full_name(result)
return result
8.4 Add New Value Replacement
Edit config.py:
REPLACEMENT_RULES = {
...
"account_type": {
"SAVINGS": "Savings Account",
"CHECKING": "Checking Account",
},
}
9. Exercises
Exercise 1: Add Phone Number Validation
Currently, the pipeline does not validate phone number format. Your task is to add a data quality check that ensures phone numbers contain only digits.
Problem: Phone numbers like 123-456-7890 or (123) 456-7890 should fail validation. Only 1234567890 is valid.
What to do:
1. Open src/data_quality.py
2. Create a new function called check_phone_format(df, column)
3. Use regex pattern ^[0-9]+$ to validate
4. Add this check inside the validate_customer() function
5. Run the pipeline and verify "format:phone" appears in the output
Exercise 2: Add Balance Check for Savings Accounts
Currently, there is no check to prevent negative balances in savings accounts. Your task is to add a data quality check for this business rule.
Problem: A SAVINGS account with balance -500 should fail validation.
What to do:
1. Open src/data_quality.py
2. Create a new function called check_positive_balance(df)
3. Filter records where account_type == "SAVINGS" AND balance < 0
4. Add this check inside the validate_account() function
5. Run the pipeline and verify "check:positive_balance" appears in the output
Exercise 3: Add Readable Account Type Labels
Currently, account types appear as codes like "SAVINGS", "CHECKING". Your task is to make these more readable in the Silver layer output.
Problem: Users want to see "Savings Account" instead of "SAVINGS".
What to do:
1. Open config.py
2. Find the REPLACEMENT_RULES dictionary
3. Add a new entry for "account_type" with mappings:
- SAVINGS --> Savings Account
- CHECKING --> Checking Account
- CREDIT --> Credit Card
4. Run the pipeline and view Silver data to verify the change
Exercise 4: Extract Email Domain
Currently, we only store the full email address. Your task is to create a new column that extracts just the domain part.
Problem: From john@gmail.com, we want to create a new column email_domain containing gmail.com.
What to do:
1. Open src/silver.py
2. Create a new function called extract_email_domain(df)
3. Use PySpark's split() function to split by "@" and get the second part
4. Call this function inside transform_entity() for the customer entity
5. Run the pipeline and view Silver customer data to verify the new column
Exercise 5: Create a CDC Summary Report
Currently, the pipeline prints results but there is no formatted summary. Your task is to create a nice summary table.
Expected output:
CDC Summary for 20260113
========================
Entity | NEW | UPDATED | UNCHANGED
-------------|-----|---------|----------
Customer | 5 | 0 | 0
Account | 6 | 0 | 0
What to do:
1. Create a new file src/summary.py
2. Create a function called print_summary(cdc_results, date_str)
3. Loop through the CDC results and print a formatted table
4. Open main.py and import your new function
5. Call print_summary() after the CDC detection step
Exercise 6: Save Record Counts to a File
Currently, record counts are only shown on screen. Your task is to save them to a text file for later reference.
Expected output file: data/reports/record_counts_20260113.txt
What to do:
1. Open main.py
2. Create a function called save_record_counts(date_str, bronze_results, cdc_results)
3. Write the counts to a text file in the data/reports/ folder
4. Call this function at the end of run_pipeline()
5. Run the pipeline and verify the file was created
Quick Reference
Commands
| Command | Description |
|---|---|
| `python main.py --all` | Process all available dates |
| `python main.py --date 20260113` | Process specific date |
| `python main.py --today` | Process today's date |
| `python view_data.py bronze customer --date 20260113` | View data |
File Naming
| Layer | Format | Example |
|---|---|---|
| Source | `{Entity}.txt` | `Customer.txt` |
| Bronze | `{entity}/date={YYYYMMDD}/` | `customer/date=20260113/` |
| Gold | `new_{entity}_{YYYYMMDD}.csv` | `new_customer_20260113.csv` |
Congratulations! You now understand the CDC Data Pipeline.