PySpark ETL Training Exercise: Fintech Transaction Analytics

Overview

This hands-on exercise simulates a real-world fintech scenario where you'll process large-scale transaction data using PySpark. You'll work with approximately 5 million transaction records (configurable via NUM_RECORDS in the generator script) and perform ETL operations to create a medallion architecture (Bronze → Silver → Gold).


Exercise Scenario

You work as a Data Engineer at FinPay, a digital payment platform. Your task is to process daily transaction data, clean it, enrich it, and create analytical datasets for the business intelligence team.

Business Requirements:

  1. Process raw transaction data (Bronze layer)
  2. Clean and standardize the data (Silver layer)
  3. Create aggregated business metrics (Gold layer)

Data Generation Script

Save this as generate_fintech_data.py:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from datetime import datetime

# Initialize Spark (in local mode the driver runs everything, so size the driver)
spark = SparkSession.builder \
    .appName("FinTech Data Generator") \
    .master("local[*]") \
    .config("spark.driver.memory", "12g") \
    .config("spark.sql.shuffle.partitions", "5") \
    .getOrCreate()

# Configuration
NUM_RECORDS = 5_000_000
OUTPUT_PATH = "./data/raw_transactions"

print(f"Generating {NUM_RECORDS:,} transaction records...")

# Start and end timestamps (Unix timestamp in seconds)
start_timestamp = int(datetime(2024, 1, 1).timestamp())
end_timestamp = int(datetime(2024, 12, 31, 23, 59, 59).timestamp())
timestamp_range = end_timestamp - start_timestamp

# Generate data using Spark's range and SQL functions.
# Each categorical column is driven by a single pre-computed random column so
# that the increasing thresholds below (0.70, 0.85, ...) behave as cumulative
# probabilities; calling rand() inside every when() branch would draw a fresh
# random value per branch and skew the distribution.
df = spark.range(0, NUM_RECORDS) \
    .withColumn("r_currency", rand()) \
    .withColumn("r_payment", rand()) \
    .withColumn("r_category", rand()) \
    .withColumn("r_status", rand()) \
    .withColumn("r_country", rand()) \
    .withColumn("r_device", rand()) \
    .withColumn("transaction_id", concat(lit("TXN"), lpad(col("id").cast("string"), 10, "0"))) \
    .withColumn("user_id", concat(lit("USR"), lpad((rand() * 500000).cast("int").cast("string"), 8, "0"))) \
    .withColumn("merchant_id", concat(lit("MER"), lpad((rand() * 50000).cast("int").cast("string"), 6, "0"))) \
    .withColumn("transaction_date",
                from_unixtime(lit(start_timestamp) + (rand() * lit(timestamp_range)).cast("long")).cast("timestamp")) \
    .withColumn("amount", round(rand() * 5000 + 1, 2)) \
    .withColumn("currency",
                when(col("r_currency") < 0.70, "MYR")
                .when(col("r_currency") < 0.85, "USD")
                .when(col("r_currency") < 0.95, "SGD")
                .otherwise("EUR")) \
    .withColumn("payment_method",
                when(col("r_payment") < 0.40, "CREDIT_CARD")
                .when(col("r_payment") < 0.70, "DEBIT_CARD")
                .when(col("r_payment") < 0.85, "E_WALLET")
                .when(col("r_payment") < 0.95, "BANK_TRANSFER")
                .otherwise("CRYPTO")) \
    .withColumn("merchant_category",
                when(col("r_category") < 0.15, "GROCERIES")
                .when(col("r_category") < 0.30, "DINING")
                .when(col("r_category") < 0.45, "SHOPPING")
                .when(col("r_category") < 0.55, "TRANSPORTATION")
                .when(col("r_category") < 0.65, "ENTERTAINMENT")
                .when(col("r_category") < 0.75, "UTILITIES")
                .when(col("r_category") < 0.85, "HEALTHCARE")
                .otherwise("OTHERS")) \
    .withColumn("status",
                when(col("r_status") < 0.92, "SUCCESS")
                .when(col("r_status") < 0.96, "FAILED")
                .otherwise("PENDING")) \
    .withColumn("country",
                when(col("r_country") < 0.60, "Malaysia")
                .when(col("r_country") < 0.75, "Singapore")
                .when(col("r_country") < 0.85, "Indonesia")
                .when(col("r_country") < 0.92, "Thailand")
                .otherwise("Philippines")) \
    .withColumn("device_type",
                when(col("r_device") < 0.65, "MOBILE")
                .when(col("r_device") < 0.85, "WEB")
                .otherwise("POS")) \
    .withColumn("is_international", col("country") != lit("Malaysia")) \
    .withColumn("fraud_score", round(rand() * 100, 2)) \
    .withColumn("processing_fee", round(col("amount") * 0.025, 2))

# Add some data quality issues for training purposes
df = df.withColumn("amount",
                   when(rand() < 0.02, lit(None))   # ~2% null amounts
                   .when(rand() < 0.01, -col("amount"))  # ~1% negative amounts
                   .otherwise(col("amount"))) \
    .withColumn("user_id",
                when(rand() < 0.005, lit(None))  # ~0.5% null user_ids
                .otherwise(col("user_id")))

# Drop the surrogate id and the helper random columns
df = df.drop("id", "r_currency", "r_payment", "r_category", "r_status", "r_country", "r_device")

# Show sample before writing
print("\nSample of generated data:")
df.show(5, truncate=False)

# Write to parquet with partitioning by date
print("\nWriting data to parquet files...")
print("This may take 5-10 minutes depending on your system...")

# Create a date partition column for writing, and cluster rows by that column so
# each date partition is written as a single file rather than many small ones
df_with_partition = df.withColumn("partition_date", to_date(col("transaction_date"))) \
    .repartition("partition_date")

# Write with date partitioning
df_with_partition.write \
    .mode("overwrite") \
    .partitionBy("partition_date") \
    .parquet(OUTPUT_PATH)

print(f"\n✓ Data generation complete!")
print(f"✓ Output location: {OUTPUT_PATH}")

# Verify the data
print("\nVerifying written data...")
verify_df = spark.read.parquet(OUTPUT_PATH)
record_count = verify_df.count()
print(f"✓ Total records verified: {record_count:,}")
print(f"✓ Number of date partitions: {verify_df.select('partition_date').distinct().count()}")

# Show some statistics
print("\n=== Data Statistics ===")
print("\nTransactions by Status:")
verify_df.groupBy("status").count().orderBy(desc("count")).show()

print("\nTransactions by Currency:")
verify_df.groupBy("currency").count().orderBy(desc("count")).show()

print("\nTransactions by Country:")
verify_df.groupBy("country").count().orderBy(desc("count")).show()

print("\nData Quality Issues:")
null_amounts = verify_df.filter(col("amount").isNull()).count()
negative_amounts = verify_df.filter(col("amount") < 0).count()
null_users = verify_df.filter(col("user_id").isNull()).count()

print(f"Null amounts: {null_amounts:,} ({null_amounts/record_count*100:.2f}%)")
print(f"Negative amounts: {negative_amounts:,} ({negative_amounts/record_count*100:.2f}%)")
print(f"Null user_ids: {null_users:,} ({null_users/record_count*100:.2f}%)")

print("\n" + "="*60)
print("✓ DATA GENERATION SUCCESSFUL!")
print("="*60)
print(f"\nYou can now start the exercise by reading from:")
print(f"  {OUTPUT_PATH}")
print("\nFor the exercise, use 'transaction_date' column (not 'partition_date')")
print("="*60)

spark.stop()

To generate the data:

python generate_fintech_data.py

Exercise Structure

BEGINNER LEVEL - Bronze Layer (Data Ingestion & Basic Transformations)

Task 1.1: Read Raw Data
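
Read the generated parquet files into a DataFrame. As a starting point, a minimal sketch (assuming the default OUTPUT_PATH from the generator script and local mode):

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Bronze - Ingest Raw Transactions") \
    .master("local[*]") \
    .getOrCreate()

# Spark discovers the partition_date directories automatically
raw_df = spark.read.parquet("./data/raw_transactions")

print(f"Raw record count: {raw_df.count():,}")
raw_df.printSchema()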

Task 1.2: Basic Data Exploration
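
Get a feel for the data before transforming it. One possible exploration pass (assuming raw_df from the Task 1.1 sketch):

from pyspark.sql.functions import min as min_, max as max_

raw_df.show(10, truncate=False)                                    # eyeball a few rows
raw_df.describe("amount", "fraud_score", "processing_fee").show()  # basic numeric stats
raw_df.select(min_("transaction_date"), max_("transaction_date")).show()  # date coverage
raw_df.groupBy("status").count().show()                            # categorical breakdowns
raw_df.groupBy("payment_method").count().orderBy("count", ascending=False).show()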

Task 1.3: Data Quality Check
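
The generator deliberately injects null amounts, negative amounts, and null user_ids, so your checks should at least surface those. A minimal sketch, assuming raw_df from the Task 1.1 sketch:

from pyspark.sql.functions import col

total = raw_df.count()
checks = {
    "null amounts": raw_df.filter(col("amount").isNull()).count(),
    "negative amounts": raw_df.filter(col("amount") < 0).count(),
    "null user_ids": raw_df.filter(col("user_id").isNull()).count(),
    "duplicate transaction_ids": total - raw_df.select("transaction_id").distinct().count(),
}
for issue, n in checks.items():
    print(f"{issue}: {n:,} ({n / total * 100:.2f}%)")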

Task 1.4: Save Bronze Layer
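
Persist the still-raw data as the Bronze layer. The sketch below is one possible shape: it keeps the full timestamp and partitions on a separate date-typed column (transaction_day is an assumed name; the expected output structure later in this document shows transaction_date=YYYY-MM-DD, so rename the partition column if you want the directories to match exactly):

from pyspark.sql.functions import col, to_date

# Bronze keeps the data essentially as ingested; the extra date column exists
# only to drive daily partitioning on disk
bronze_df = raw_df.withColumn("transaction_day", to_date(col("transaction_date")))

bronze_df.write \
    .mode("overwrite") \
    .partitionBy("transaction_day") \
    .parquet("./data/bronze/transactions")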


INTERMEDIATE LEVEL - Silver Layer (Data Cleaning & Enrichment)

Task 2.1: Data Cleaning
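
Remove the quality issues surfaced in Task 1.3. A minimal sketch, assuming the SparkSession from the earlier sketches and the Bronze path from the expected output structure; document whatever rules you actually apply:

from pyspark.sql.functions import col

bronze_df = spark.read.parquet("./data/bronze/transactions")

cleaned_df = bronze_df \
    .filter(col("amount").isNotNull() & (col("amount") > 0)) \
    .filter(col("user_id").isNotNull()) \
    .dropDuplicates(["transaction_id"])

print(f"Rows after cleaning: {cleaned_df.count():,}")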

Task 2.2: Data Standardization
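
Standardize formats and types so downstream consumers don't have to. The rules below (upper-cased codes, title-cased country names, decimal amounts) are illustrative assumptions; assumes cleaned_df from the Task 2.1 sketch:

from pyspark.sql.functions import col, trim, upper, initcap

standardized_df = cleaned_df \
    .withColumn("currency", upper(trim(col("currency")))) \
    .withColumn("payment_method", upper(trim(col("payment_method")))) \
    .withColumn("merchant_category", upper(trim(col("merchant_category")))) \
    .withColumn("status", upper(trim(col("status")))) \
    .withColumn("country", initcap(trim(col("country")))) \
    .withColumn("amount", col("amount").cast("decimal(12,2)")) \
    .withColumn("processing_fee", col("processing_fee").cast("decimal(12,2)"))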

Task 2.3: Data Enrichment
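
Add derived columns that later aggregations will need. The columns below are examples, not a required list; transaction_year and transaction_month also line up with the Silver partitioning in the expected output structure. Assumes standardized_df from the Task 2.2 sketch:

from pyspark.sql.functions import col, year, month, dayofweek, when

enriched_df = standardized_df \
    .withColumn("transaction_year", year(col("transaction_date"))) \
    .withColumn("transaction_month", month(col("transaction_date"))) \
    .withColumn("day_of_week", dayofweek(col("transaction_date"))) \
    .withColumn("net_amount", col("amount") - col("processing_fee")) \
    .withColumn("amount_band",
                when(col("amount") < 100, "LOW")
                .when(col("amount") < 1000, "MEDIUM")
                .otherwise("HIGH")) \
    .withColumn("is_high_fraud_risk", col("fraud_score") >= 80)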

Task 2.4: Save Silver Layer
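
Write the Silver layer partitioned by year and month, matching the expected output structure (assumes enriched_df from the Task 2.3 sketch):

enriched_df.write \
    .mode("overwrite") \
    .partitionBy("transaction_year", "transaction_month") \
    .parquet("./data/silver/transactions")

print(f"Silver rows written: {spark.read.parquet('./data/silver/transactions').count():,}")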


ADVANCED LEVEL - Gold Layer (Aggregations & Analytics)

Task 3.1: Daily Transaction Summary

Create a daily aggregation of transaction metrics (see the starter sketch below).
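
Since the exact metric list is up to you, this sketch assumes a typical set: transaction counts, amount totals and averages, fees, and a success rate. It reads Silver from the path in the expected output structure and reuses the SparkSession from the earlier sketches:

from pyspark.sql.functions import col, to_date, count, sum as sum_, avg, round as round_, when

silver_df = spark.read.parquet("./data/silver/transactions")

daily_summary = silver_df \
    .withColumn("transaction_day", to_date(col("transaction_date"))) \
    .groupBy("transaction_year", "transaction_month", "transaction_day") \
    .agg(
        count("*").alias("total_transactions"),
        round_(sum_("amount"), 2).alias("total_amount"),
        round_(avg("amount"), 2).alias("avg_amount"),
        round_(sum_("processing_fee"), 2).alias("total_fees"),
        round_(avg(when(col("status") == "SUCCESS", 1).otherwise(0)) * 100, 2).alias("success_rate_pct"),
    )

daily_summary.write \
    .mode("overwrite") \
    .partitionBy("transaction_year", "transaction_month") \
    .parquet("./data/gold/daily_summary")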

Task 3.2: Merchant Category Analysis

Create merchant category performance metrics (see the starter sketch below).
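
A possible shape, reusing silver_df from the Task 3.1 sketch; the chosen metrics (counts, distinct merchants and customers, total amount, average ticket size) are assumptions:

from pyspark.sql.functions import count, countDistinct, sum as sum_, avg, round as round_

category_analysis = silver_df \
    .groupBy("transaction_year", "transaction_month", "merchant_category") \
    .agg(
        count("*").alias("transaction_count"),
        countDistinct("merchant_id").alias("active_merchants"),
        countDistinct("user_id").alias("unique_customers"),
        round_(sum_("amount"), 2).alias("total_amount"),
        round_(avg("amount"), 2).alias("avg_ticket_size"),
    )

category_analysis.write \
    .mode("overwrite") \
    .partitionBy("transaction_year", "transaction_month") \
    .parquet("./data/gold/category_analysis")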

Task 3.3: User Behavior Segmentation

Create user segments based on transaction patterns (see the starter sketch below).
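
One way to segment users is to aggregate per-user activity and then bucket it. The thresholds and segment names below are illustrative assumptions, not part of the exercise specification; assumes silver_df from the Task 3.1 sketch:

from pyspark.sql.functions import col, count, sum as sum_, avg, round as round_, when

user_metrics = silver_df \
    .filter(col("status") == "SUCCESS") \
    .groupBy("user_id") \
    .agg(
        count("*").alias("txn_count"),
        round_(sum_("amount"), 2).alias("total_spend"),
        round_(avg("amount"), 2).alias("avg_spend"),
    )

user_segments = user_metrics.withColumn(
    "segment",
    when((col("txn_count") >= 20) & (col("total_spend") >= 20000), "VIP")
    .when(col("txn_count") >= 10, "REGULAR")
    .when(col("txn_count") >= 3, "OCCASIONAL")
    .otherwise("DORMANT"))

user_segments.write.mode("overwrite").parquet("./data/gold/user_segments")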

Task 3.4: Fraud Risk Analysis

Create a fraud risk dashboard dataset (see the starter sketch below).
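
A sketch that buckets transactions by fraud_score and aggregates exposure by month, country, and payment method; the bucket boundaries are assumptions. Assumes silver_df from the Task 3.1 sketch:

from pyspark.sql.functions import col, count, avg, sum as sum_, round as round_, when

fraud_risk = silver_df \
    .withColumn("risk_bucket",
                when(col("fraud_score") >= 80, "HIGH")
                .when(col("fraud_score") >= 50, "MEDIUM")
                .otherwise("LOW")) \
    .groupBy("transaction_year", "transaction_month", "country", "payment_method", "risk_bucket") \
    .agg(
        count("*").alias("transaction_count"),
        round_(avg("fraud_score"), 2).alias("avg_fraud_score"),
        round_(sum_("amount"), 2).alias("amount_at_risk"),
    )

fraud_risk.write \
    .mode("overwrite") \
    .partitionBy("transaction_year", "transaction_month") \
    .parquet("./data/gold/fraud_risk")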

Task 3.5: Country-wise Transaction Trends

Create country performance metrics with window functions (see the starter sketch below).
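
Window functions shine for trend metrics such as month-over-month growth. A sketch using lag over a per-country window (the growth metric itself is an assumption); assumes silver_df from the Task 3.1 sketch:

from pyspark.sql import Window
from pyspark.sql.functions import col, count, sum as sum_, round as round_, lag

monthly_country = silver_df \
    .groupBy("country", "transaction_year", "transaction_month") \
    .agg(
        count("*").alias("transaction_count"),
        round_(sum_("amount"), 2).alias("total_amount"),
    )

w = Window.partitionBy("country").orderBy("transaction_year", "transaction_month")

country_trends = monthly_country \
    .withColumn("prev_month_amount", lag("total_amount").over(w)) \
    .withColumn("mom_growth_pct",
                round_((col("total_amount") - col("prev_month_amount"))
                       / col("prev_month_amount") * 100, 2))

country_trends.write \
    .mode("overwrite") \
    .partitionBy("transaction_year", "transaction_month") \
    .parquet("./data/gold/country_trends")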


Submission Guidelines

  1. Create a Jupyter notebook or Python script for each layer
  2. Include comments explaining your approach
  3. Ensure all parquet files are properly partitioned
  4. Verify data quality at each layer
  5. Document any assumptions made

Evaluation Criteria

Beginner (30 points)

Intermediate (35 points)

Advanced (35 points)


Expected Output Structure

data/
├── raw_transactions/          # Generated data
├── bronze/
│   └── transactions/
│       └── transaction_date=YYYY-MM-DD/
├── silver/
│   └── transactions/
│       └── transaction_year=YYYY/
│           └── transaction_month=MM/
└── gold/
    ├── daily_summary/
    │   └── transaction_year=YYYY/
    │       └── transaction_month=MM/
    ├── category_analysis/
    │   └── transaction_year=YYYY/
    │       └── transaction_month=MM/
    ├── user_segments/
    ├── fraud_risk/
    │   └── transaction_year=YYYY/
    │       └── transaction_month=MM/
    └── country_trends/
        └── transaction_year=YYYY/
            └── transaction_month=MM/

Tips & Best Practices

  1. Performance: Use .cache() or .persist() for DataFrames used multiple times
  2. Partitioning: Choose partition columns with reasonable cardinality
  3. Memory: Monitor Spark UI for memory usage
  4. Testing: Develop against a small sample before processing the full dataset (see the sketch after this list)
  5. Documentation: Comment your code explaining business logic
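
To make tips 1 and 4 concrete, here is a small sketch of the sample-then-cache workflow (the path and sample fraction are assumptions):

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

# Work on a ~1% sample while developing, then point the same code at the full data
silver_df = spark.read.parquet("./data/silver/transactions")
dev_df = silver_df.sample(fraction=0.01, seed=42)

# Cache a DataFrame that several Gold aggregations will reuse
dev_df.cache()
print(f"Sample size: {dev_df.count():,}")  # the first action materializes the cache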

Bonus Challenges (Optional)

  1. Implement data validation checks using Great Expectations
  2. Add logging for each transformation step
  3. Create a data quality report with statistics
  4. Implement incremental loading logic
  5. Optimize join operations in user segmentation

Good luck with your PySpark ETL journey! 🚀