This hands-on exercise simulates a real-world fintech scenario where you'll process large-scale transaction data using PySpark. You'll work with roughly 5 million generated transaction records (controlled by NUM_RECORDS in the generator script) and perform ETL operations to build a medallion architecture (Bronze → Silver → Gold).
You work as a Data Engineer at FinPay, a digital payment platform. Your task is to process daily transaction data, clean it, enrich it, and create analytical datasets for the business intelligence team.
Save this as generate_fintech_data.py:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import random
from datetime import datetime, timedelta
# Initialize Spark
# Note: with master("local[*]") the executors run inside the driver JVM
spark = SparkSession.builder \
    .appName("FinTech Data Generator") \
    .master("local[*]") \
    .config("spark.executor.memory", "12g") \
    .config("spark.sql.shuffle.partitions", "5") \
    .getOrCreate()
# Configuration
NUM_RECORDS = 5_000_000
OUTPUT_PATH = "./data/raw_transactions"
print(f"Generating {NUM_RECORDS:,} transaction records...")
# Start and end timestamps (Unix timestamp in seconds)
start_timestamp = int(datetime(2024, 1, 1).timestamp())
end_timestamp = int(datetime(2024, 12, 31, 23, 59, 59).timestamp())
timestamp_range = end_timestamp - start_timestamp
# Generate data using Spark's range and SQL functions
df = spark.range(0, NUM_RECORDS) \
.withColumn("transaction_id", concat(lit("TXN"), lpad(col("id").cast("string"), 10, "0"))) \
.withColumn("user_id", concat(lit("USR"), lpad((rand() * 500000).cast("int").cast("string"), 8, "0"))) \
.withColumn("merchant_id", concat(lit("MER"), lpad((rand() * 50000).cast("int").cast("string"), 6, "0"))) \
.withColumn("transaction_date",
from_unixtime(lit(start_timestamp) + (rand() * lit(timestamp_range)).cast("long")).cast("timestamp")) \
.withColumn("amount", round(rand() * 5000 + 1, 2)) \
.withColumn("currency",
when(rand() < 0.7, "MYR")
.when(rand() < 0.85, "USD")
.when(rand() < 0.95, "SGD")
.otherwise("EUR")) \
.withColumn("payment_method",
when(rand() < 0.4, "CREDIT_CARD")
.when(rand() < 0.7, "DEBIT_CARD")
.when(rand() < 0.85, "E_WALLET")
.when(rand() < 0.95, "BANK_TRANSFER")
.otherwise("CRYPTO")) \
.withColumn("merchant_category",
when(rand() < 0.15, "GROCERIES")
.when(rand() < 0.30, "DINING")
.when(rand() < 0.45, "SHOPPING")
.when(rand() < 0.55, "TRANSPORTATION")
.when(rand() < 0.65, "ENTERTAINMENT")
.when(rand() < 0.75, "UTILITIES")
.when(rand() < 0.85, "HEALTHCARE")
.otherwise("OTHERS")) \
.withColumn("status",
when(rand() < 0.92, "SUCCESS")
.when(rand() < 0.96, "FAILED")
.otherwise("PENDING")) \
.withColumn("country",
when(rand() < 0.60, "Malaysia")
.when(rand() < 0.75, "Singapore")
.when(rand() < 0.85, "Indonesia")
.when(rand() < 0.92, "Thailand")
.otherwise("Philippines")) \
.withColumn("device_type",
when(rand() < 0.65, "MOBILE")
.when(rand() < 0.85, "WEB")
.otherwise("POS")) \
.withColumn("is_international", when(col("country") != "Malaysia", True).otherwise(False)) \
.withColumn("fraud_score", round(rand() * 100, 2)) \
.withColumn("processing_fee", round(col("amount") * 0.025, 2))
# Add some data quality issues for training purposes
df = df.withColumn("amount",
when(rand() < 0.02, lit(None)) # 2% null amounts
.when(rand() < 0.01, lit(-1 * col("amount"))) # 1% negative amounts
.otherwise(col("amount"))) \
.withColumn("user_id",
when(rand() < 0.005, lit(None)) # 0.5% null user_ids
.otherwise(col("user_id")))
# Drop the id column
df = df.drop("id")
# Show sample before writing
print("\nSample of generated data:")
df.show(5, truncate=False)
# Write to parquet with partitioning by date
print("\nWriting data to parquet files...")
print("This may take 5-10 minutes depending on your system...")
# Create date partition column for writing
df_with_partition = df.withColumn("partition_date", to_date(col("transaction_date")))
# Write with date partitioning
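# Note: partitioning by calendar date yields one directory per day (366 for 2024),
# mirroring the Bronze layout used later in the exercise.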
df_with_partition.write \
.mode("overwrite") \
.partitionBy("partition_date") \
.parquet(OUTPUT_PATH)
print(f"\n✓ Data generation complete!")
print(f"✓ Output location: {OUTPUT_PATH}")
# Verify the data
print("\nVerifying written data...")
verify_df = spark.read.parquet(OUTPUT_PATH)
record_count = verify_df.count()
print(f"✓ Total records verified: {record_count:,}")
print(f"✓ Number of date partitions: {verify_df.select('partition_date').distinct().count()}")
# Show some statistics
print("\n=== Data Statistics ===")
print("\nTransactions by Status:")
verify_df.groupBy("status").count().orderBy(desc("count")).show()
print("\nTransactions by Currency:")
verify_df.groupBy("currency").count().orderBy(desc("count")).show()
print("\nTransactions by Country:")
verify_df.groupBy("country").count().orderBy(desc("count")).show()
print("\nData Quality Issues:")
null_amounts = verify_df.filter(col("amount").isNull()).count()
negative_amounts = verify_df.filter(col("amount") < 0).count()
null_users = verify_df.filter(col("user_id").isNull()).count()
print(f"Null amounts: {null_amounts:,} ({null_amounts/record_count*100:.2f}%)")
print(f"Negative amounts: {negative_amounts:,} ({negative_amounts/record_count*100:.2f}%)")
print(f"Null user_ids: {null_users:,} ({null_users/record_count*100:.2f}%)")
print("\n" + "="*60)
print("✓ DATA GENERATION SUCCESSFUL!")
print("="*60)
print(f"\nYou can now start the exercise by reading from:")
print(f" {OUTPUT_PATH}")
print("\nFor the exercise, use 'transaction_date' column (not 'partition_date')")
print("="*60)
spark.stop()
To generate the data:
python generate_fintech_data.py
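If you use a standalone Spark installation rather than the pip-installed pyspark package, the same script can also be launched with spark-submit (the master URL is already set in the code):

spark-submit generate_fintech_data.py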
Exercise tasks

Bronze layer:
- Read the raw data from ./data/raw_transactions.
- Write it to ./data/bronze/transactions, partitioned by transaction_date (date only, not timestamp).

Silver layer (clean and enrich):
- Remove records with a null user_id or a null amount.
- Convert transaction_date to date format (extract the date from the timestamp).
- Add transaction_year: extract the year.
- Add transaction_month: extract the month.
- Add transaction_day: extract the day.
- Add transaction_hour: extract the hour.
- Standardize payment_method values to uppercase.
- Add an amount_in_myr column: convert all amounts to MYR using fixed per-currency exchange rates.
- Add an is_high_value column: flag transactions > 1000 MYR.
- Add an is_risky column: flag transactions with fraud_score > 75.
- Add a time_of_day column: categorize hours into "MORNING" (6-12), "AFTERNOON" (12-18), "EVENING" (18-24), "NIGHT" (0-6).
- Write to ./data/silver/transactions, partitioned by transaction_year and transaction_month.

Gold layer (analytical datasets):
- Create a daily aggregation grouped by transaction_date; write it to ./data/gold/daily_summary, partitioned by transaction_year, transaction_month.
- Create merchant category performance metrics grouped by merchant_category, transaction_year, transaction_month; write them to ./data/gold/category_analysis, partitioned by transaction_year, transaction_month.
- Create user segments based on transaction patterns, grouped by user_id; write them to ./data/gold/user_segments (no partitioning).
- Create a fraud risk dashboard dataset grouped by transaction_date; write it to ./data/gold/fraud_risk, partitioned by transaction_year, transaction_month.
- Create country performance metrics with window functions, grouped by country and transaction_date; write them to ./data/gold/country_trends, partitioned by transaction_year, transaction_month.

Minimal PySpark sketches for the Silver and Gold steps are provided after the directory layout below.

Expected directory layout:

data/
├── raw_transactions/ # Generated data
├── bronze/
│ └── transactions/
│ └── transaction_date=YYYY-MM-DD/
├── silver/
│ └── transactions/
│ └── transaction_year=YYYY/
│ └── transaction_month=MM/
└── gold/
├── daily_summary/
│ └── transaction_year=YYYY/
│ └── transaction_month=MM/
├── category_analysis/
│ └── transaction_year=YYYY/
│ └── transaction_month=MM/
├── user_segments/
├── fraud_risk/
│ └── transaction_year=YYYY/
│ └── transaction_month=MM/
└── country_trends/
└── transaction_year=YYYY/
└── transaction_month=MM/
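Below are the promised sketches. They are illustrative only: the exchange rates, the aggregated metrics, and the 7-day moving average are assumptions chosen for illustration rather than the exercise's reference answers, and they assume the Bronze table still carries the full transaction_date timestamp.

A possible Silver-layer job:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("FinPay Silver Layer").master("local[*]").getOrCreate()

bronze_df = spark.read.parquet("./data/bronze/transactions")

silver_df = (
    bronze_df
    # Drop the injected quality issues: null user_id, null or negative amount
    .filter(F.col("user_id").isNotNull() & F.col("amount").isNotNull() & (F.col("amount") > 0))
    # Date/time breakdown (extract the hour before collapsing the timestamp to a date)
    .withColumn("transaction_year", F.year("transaction_date"))
    .withColumn("transaction_month", F.month("transaction_date"))
    .withColumn("transaction_day", F.dayofmonth("transaction_date"))
    .withColumn("transaction_hour", F.hour("transaction_date"))
    .withColumn("transaction_date", F.to_date("transaction_date"))
    # Standardize payment_method values
    .withColumn("payment_method", F.upper(F.col("payment_method")))
    # Currency conversion -- placeholder rates, substitute the rates given in the exercise
    .withColumn("amount_in_myr", F.round(
        F.when(F.col("currency") == "USD", F.col("amount") * 4.7)
         .when(F.col("currency") == "SGD", F.col("amount") * 3.5)
         .when(F.col("currency") == "EUR", F.col("amount") * 5.1)
         .otherwise(F.col("amount")), 2))
    # Business flags
    .withColumn("is_high_value", F.col("amount_in_myr") > 1000)
    .withColumn("is_risky", F.col("fraud_score") > 75)
    # Time-of-day bucket
    .withColumn("time_of_day",
        F.when((F.col("transaction_hour") >= 6) & (F.col("transaction_hour") < 12), "MORNING")
         .when((F.col("transaction_hour") >= 12) & (F.col("transaction_hour") < 18), "AFTERNOON")
         .when(F.col("transaction_hour") >= 18, "EVENING")
         .otherwise("NIGHT"))
)

silver_df.write.mode("overwrite") \
    .partitionBy("transaction_year", "transaction_month") \
    .parquet("./data/silver/transactions")

And a possible Gold-layer job covering the daily summary and the window-function country trends; note the .cache() on the Silver DataFrame, which is reused by both outputs:

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("FinPay Gold Layer").master("local[*]").getOrCreate()

silver_df = spark.read.parquet("./data/silver/transactions")
silver_df.cache()  # reused by several Gold outputs, so cache it once

# Daily summary -- example metrics only; the exercise defines the exact list
daily_summary = (
    silver_df
    .groupBy("transaction_date", "transaction_year", "transaction_month")
    .agg(
        F.count("*").alias("transaction_count"),
        F.round(F.sum("amount_in_myr"), 2).alias("total_amount_myr"),
        F.round(F.avg("amount_in_myr"), 2).alias("avg_amount_myr"))
)
daily_summary.write.mode("overwrite") \
    .partitionBy("transaction_year", "transaction_month") \
    .parquet("./data/gold/daily_summary")

# Country trends -- a 7-day moving average per country as one example of a window function
daily_by_country = (
    silver_df
    .groupBy("country", "transaction_date", "transaction_year", "transaction_month")
    .agg(F.round(F.sum("amount_in_myr"), 2).alias("daily_amount_myr"))
)
last_7_days = Window.partitionBy("country").orderBy("transaction_date").rowsBetween(-6, 0)
country_trends = daily_by_country.withColumn(
    "amount_7d_moving_avg", F.round(F.avg("daily_amount_myr").over(last_7_days), 2))
country_trends.write.mode("overwrite") \
    .partitionBy("transaction_year", "transaction_month") \
    .parquet("./data/gold/country_trends")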
Tip: Use .cache() or .persist() for DataFrames that are used multiple times.

Good luck with your PySpark ETL journey! 🚀