🌊 Spark Structured Streaming

Learning Objectives

By the end of this tutorial, you will:

  • ✅ Understand what streaming is and why it matters
  • ✅ Know when to use streaming vs batch processing
  • ✅ Build real-time data processing applications
  • ✅ Handle time-based operations and late data
  • ✅ Create production-ready streaming pipelines

Prerequisites: Basic PySpark knowledge (DataFrames, transformations)

Table of Contents

  1. Introduction to Streaming
  2. Setup
  3. Part 1: Basic Concepts
  4. Part 2: File Streaming
  5. Part 3: Socket Streaming
  6. Part 4: Time Windows
  7. Part 5: Real-World Example
  8. Summary & Best Practices

Introduction to Streaming

What is Streaming?

Traditional Batch Processing:

[Collect Data] → [Wait Hours] → [Process All at Once] → [Get Results]
Example: Process yesterday's sales data every morning

Stream Processing:

[Data Arrives] → [Process Immediately] → [Get Results] → [Continuous...]
Example: Detect fraud as transactions happen

Why Do We Need Streaming?

Real-World Use Cases:

  1. Financial Services

    • Fraud detection (catch fraudulent transactions in milliseconds)
    • Stock trading (make decisions on real-time market data)
    • Risk monitoring (track exposure continuously)
  2. E-commerce

    • Real-time recommendations (suggest products as users browse)
    • Inventory management (update stock levels instantly)
    • Clickstream analysis (understand user behavior immediately)
  3. IoT & Manufacturing

    • Sensor monitoring (detect equipment failures before they happen)
    • Quality control (identify defects in real-time)
    • Predictive maintenance (schedule repairs proactively)
  4. Social Media & Gaming

    • Trending topics (detect what's viral right now)
    • Live leaderboards (update rankings instantly)
    • Content moderation (flag inappropriate content immediately)

When to Use Streaming vs Batch?

Use Streaming When:

  • Results are needed within seconds or minutes (fraud alerts, live dashboards, monitoring)
  • Data arrives continuously and loses value the longer it waits

Use Batch When:

  • Results can wait hours or days (e.g., processing yesterday's sales every morning)
  • You are analyzing a large, bounded dataset as a whole

What is Structured Streaming?

Key Concept: Treat a stream as a continuously growing table.

Traditional Streaming (Complex):
- Manual state management
- Handle failures yourself
- Complex event time logic

Structured Streaming (Easy):
- Automatic state management
- Built-in fault tolerance
- Simple SQL-like operations

Mental Model:

Input Stream          Query                Result Stream
┌─────────────┐      ┌──────┐             ┌─────────────┐
│ New Row 1   │──────│ SQL  │─────────────│ Output 1    │
│ New Row 2   │──────│Logic │─────────────│ Output 2    │
│ New Row 3   │──────│      │─────────────│ Output 3    │
│    ...      │      └──────┘             │    ...      │
└─────────────┘                           └─────────────┘
(Unbounded Table)                        (Continuous Results)

Setup

Quick Start

Step 1: Generate All Tutorial Files

# Run the initial setup script
python initial_setup.py

# Choose option 2 (Scripts + Sample Data) - RECOMMENDED
# This creates everything you need including sample data

Step 2: Navigate to Tutorial Directory

cd streaming_tutorial

Step 3: Verify Setup

python 00_setup_environment.py

You should see:

✓ Created: data/input_stream
✓ Created: data/ecommerce_events
✓ Created: checkpoints/...
✓ Environment setup complete!

What You'll Have:

streaming_tutorial/
├── data/
│   ├── input_stream/           # 3 sample transaction files (if option 2)
│   └── ecommerce_events/       # 5 sample event files (if option 2)
├── checkpoints/                # For fault tolerance
├── output/                     # Results storage
├── 01_basic_streaming.py       # Example scripts
├── 02_output_modes.py
├── ... (13 scripts total)
├── README.md
└── QUICK_REFERENCE.md

Part 1: Basic Streaming Concepts

🎓 Concept: What is a Streaming DataFrame?

Regular DataFrame (Batch):

Streaming DataFrame:

The Key Difference:

# Batch
df = spark.read.parquet("data.parquet")  # Read once
df.count()  # Returns a number

# Streaming
df = spark.readStream.parquet("data/")   # Continuous read
df.isStreaming  # Returns True
# Can't call .count() or .show() directly!
# Need to start a streaming query

Example 1: Your First Streaming Application

🎯 What We'll Learn:

💡 Why This Matters: Before connecting to real data sources, we need to understand streaming fundamentals. The "rate" source generates test data automatically - perfect for learning!

πŸ“ Concept: The Rate Source

The rate source generates rows automatically at a specified rate:
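You can inspect what it produces without starting a query (a minimal sketch, assuming `spark` is the SparkSession created by the tutorial scripts; `timestamp` and `value` are the two columns the built-in rate source emits):

# The rate source needs no schema - it generates `timestamp` and `value` columns
rate_df = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 5)   # rows generated per second
    .load()
)

rate_df.printSchema()
# root
#  |-- timestamp: timestamp (nullable = true)
#  |-- value: long (nullable = true)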

Run the Example:

python 01_basic_streaming.py

What You'll See:

Batch: 0
+--------------------+-----+--------------------+-------------+-------+
|timestamp           |value|timestamp_str       |value_squared|is_even|
+--------------------+-----+--------------------+-------------+-------+
|2025-01-15 10:30:00 |0    |2025-01-15 10:30:00 |0            |true   |
|2025-01-15 10:30:00 |1    |2025-01-15 10:30:00 |1            |false  |
...

Batch: 1
+--------------------+-----+--------------------+-------------+-------+
|2025-01-15 10:30:01 |5    |2025-01-15 10:30:01 |25           |false  |
|2025-01-15 10:30:01 |6    |2025-01-15 10:30:01 |36           |true   |
...

📚 Key Observations:

  1. Batch Numbers Increase: Each batch represents a micro-batch of data
  2. Timestamps Update: Shows when data was generated
  3. Continuous Processing: Runs until you stop it (Ctrl+C)
  4. Transformations Work: Just like regular DataFrames!

Code Walkthrough:

# 1. Create streaming source
streaming_df = (
    spark.readStream
    .format("rate")                # Use rate source
    .option("rowsPerSecond", 5)    # Generate 5 rows per second
    .load()
)

# 2. Check if it's streaming
print(streaming_df.isStreaming)  # True

# 3. Transform (same as batch!)
processed_df = streaming_df \
    .withColumn("value_squared", col("value") * col("value"))

# 4. Write to console (start the query)
query = (
    processed_df.writeStream
    .outputMode("append")          # Only new rows
    .format("console")             # Write to screen
    .start()                       # Start processing!
)

# 5. Wait for data
query.awaitTermination(timeout=20)  # Run for 20 seconds

# 6. Stop the query
query.stop()

🤔 Discussion Points:


Example 2: Understanding Output Modes

🎯 What We'll Learn:

💡 Why This Matters: Output modes control what data gets written to the sink. Choosing the wrong mode causes errors or unexpected results!

πŸ“ Concept: Output Modes

Think of a streaming result table that continuously updates:

APPEND Mode:

Time 1: [Row 1, Row 2]          ← Send these
Time 2: [Row 1, Row 2, Row 3]   ← Send only Row 3 (new)
Time 3: [Row 1, Row 2, Row 3, Row 4] ← Send only Row 4 (new)

COMPLETE Mode:

Time 1: [A:5, B:3]              ← Send entire table
Time 2: [A:8, B:7, C:2]         ← Send entire table again
Time 3: [A:10, B:9, C:5]        ← Send entire table again

UPDATE Mode:

Time 1: [A:5, B:3]              ← Send these
Time 2: [A:8, C:2]              ← Send only changed (A updated, C new)
Time 3: [B:9, C:5]              ← Send only changed (B updated, C updated)

Run the Example:

python 02_output_modes.py

What You'll See:

With Complete Mode:

Batch: 0
+--------+-----+---------+
|category|count|avg_value|
+--------+-----+---------+
|A       |2    |4.5      |
|B       |1    |2.0      |
|C       |2    |6.0      |
+--------+-----+---------+

Batch: 1  (Shows ENTIRE table again)
+--------+-----+---------+
|category|count|avg_value|
+--------+-----+---------+
|A       |5    |12.3     |  ← Updated
|B       |3    |8.1      |  ← Updated
|C       |4    |10.5     |  ← Updated
+--------+-----+---------+

With Update Mode:

Batch: 0
+--------+-----+---------+
|category|count|avg_value|
+--------+-----+---------+
|A       |2    |4.5      |
|B       |1    |2.0      |
|C       |2    |6.0      |
+--------+-----+---------+

Batch: 1  (Shows ONLY changed rows)
+--------+-----+---------+
|category|count|avg_value|
+--------+-----+---------+
|A       |5    |12.3     |  ← Only updated rows
|C       |4    |10.5     |
+--------+-----+---------+

Decision Tree:

Are you doing aggregations (groupBy, count, sum)?
├─ YES → Use Complete or Update
│         ├─ Need full results each time? → Complete
│         └─ Want efficient updates? → Update
│
└─ NO → Use Append
          └─ Just adding new rows
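To make the decision tree concrete, here is a minimal sketch (not the 02_output_modes.py script itself) contrasting the two branches, reusing the rate source from Example 1:

from pyspark.sql.functions import col

raw = spark.readStream.format("rate").option("rowsPerSecond", 5).load()

# No aggregation: every input row becomes exactly one output row -> append
append_query = (
    raw.withColumn("value_squared", col("value") * col("value"))
       .writeStream.outputMode("append").format("console").start()
)

# Aggregation: group results keep changing -> complete or update
counts = raw.groupBy((col("value") % 3).alias("category")).count()
update_query = (
    counts.writeStream.outputMode("update").format("console").start()
)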

🤔 Discussion Points:


Part 2: File Source Streaming

🎓 Concept: Streaming from Files

The Problem: Your application generates files continuously:

The Solution: Monitor a directory and process new files automatically!

How It Works:

Directory Monitoring:
data/input/
├── file1.json  ← Processed
├── file2.json  ← Processed
├── file3.json  ← Processing now...
└── file4.json  ← Waiting in queue

Spark Streaming:
- Watches the directory
- Detects new files
- Processes automatically
- Tracks what's been processed (checkpoint)

πŸ“ Key Concepts:

  1. Schema Definition (CRITICAL!):

    # MUST define schema for file streaming
    schema = StructType([
        StructField("transaction_id", StringType(), True),
        StructField("amount", DoubleType(), True)
    ])
    

    Why? Spark can't infer the schema from an empty directory at startup

  2. maxFilesPerTrigger:

    .option("maxFilesPerTrigger", 1)  # Process 1 file per batch
    

    Why? Controls throughput, prevents overwhelming system

  3. Checkpointing (Fault Tolerance):

    .option("checkpointLocation", "checkpoints/file_stream")
    

    Why? Remembers processed files, enables restart from failure


Example 3: File Streaming with Transactions

🎯 What We'll Learn:

💡 Why This Matters: File streaming is one of the most common production patterns. Many systems write data to files (logs, data lakes, exports) that need real-time processing.

Real-World Scenario: You're building a payment monitoring system. Transaction files arrive every few seconds from different payment gateways. You need to:

Setup:

If you chose Option 2 during setup, you already have 3 sample transaction files in data/input_stream/. If not, you'll need to run the simulator.

Run the Example:

python 03_file_streaming.py

If you have sample data, you'll see:

πŸ“ File streaming started!
   Watching directory: data/input_stream
   Found 3 sample files - processing now!

Batch: 0
+-------------------+------------+----------+-------------------+------------+----------+----+----------+
|transaction_id     |user_id     |amount    |timestamp          |category    |amount_usd|hour|date      |
+-------------------+------------+----------+-------------------+------------+----------+----+----------+
|TXN-SAMPLE-001-001 |USER-5431   |234.56    |2025-01-15 09:23:12|Electronics |234.56    |9   |2025-01-15|
|TXN-SAMPLE-001-002 |USER-7812   |89.99     |2025-01-15 09:45:30|Clothing    |89.99     |9   |2025-01-15|
...

To Generate More Data (Optional):

Open a second terminal:

cd streaming_tutorial
python simulate_file_data.py

This creates 5 additional files, one every 5 seconds.

Code Walkthrough:

# 1. Define schema (REQUIRED for file streaming!)
schema = StructType([
    StructField("transaction_id", StringType(), True),
    StructField("user_id", StringType(), True),
    StructField("amount", DoubleType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("category", StringType(), True)
])

# 2. Create streaming reader
streaming_df = (
    spark.readStream
    .schema(schema)                    # Must provide schema!
    .format("json")                    # File format
    .option("maxFilesPerTrigger", 1)   # Process 1 file per batch
    .load("data/input_stream")         # Directory to monitor
)

# 3. Apply transformations (business logic)
processed_df = streaming_df \
    .withColumn("amount_usd", round(col("amount"), 2)) \
    .withColumn("hour", hour(col("timestamp"))) \
    .withColumn("date", to_date(col("timestamp"))) \
    .filter(col("amount") > 0)             # Data quality rule

# 4. Write with checkpoint (for fault tolerance!)
query = (
    processed_df.writeStream
    .outputMode("append")
    .format("console")
    .option("checkpointLocation", "checkpoints/file_stream")  # CRITICAL!
    .start()
)

📚 What's Happening Behind the Scenes:

  1. File Discovery: Spark lists all files in the directory
  2. Checkpoint Check: "Have I processed this file before?"
  3. Read & Process: Parse JSON, apply transformations
  4. Write Results: Send to console (or any sink)
  5. Update Checkpoint: "I've processed file X"
  6. Wait for Next Trigger: Sleep, then check for new files

πŸ” Checkpoint Directory Contents:

ls -la checkpoints/file_stream/

You'll see:

commits/          ← What's been processed
metadata/         ← Stream configuration
offsets/          ← File tracking
sources/          ← Source information

💾 Fault Tolerance Demo:

  1. Run the streaming application
  2. Let it process 2-3 files
  3. Kill it (Ctrl+C)
  4. Start it again
  5. It won't reprocess old files! (thanks to checkpoint)

🤔 Discussion Points:


Example 4: Writing to Multiple Sinks

🎯 What We'll Learn:

💡 Why This Matters: Production systems often need multiple outputs:

Run the Example:

python 04_multiple_sinks.py

What You'll See:

  1. Console Output (for monitoring)
  2. In-Memory Table (for SQL queries)
  3. Parquet Files (for storage)

The script also queries the in-memory table:

SELECT category, COUNT(*) as count
FROM memory_table
GROUP BY category

Use Cases:

Sink Type        Use Case                 Example
Console          Development, debugging   See what's happening
Memory           Real-time SQL queries    Dashboard queries
File (Parquet)   Persistent storage       Data lake
Kafka            Downstream systems       Microservices
Database         Transactional updates    User updates
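A minimal sketch of the pattern (not necessarily how 04_multiple_sinks.py wires it up; the paths are illustrative and `schema` is the transaction schema from Example 3). The key point is that each sink is its own independent query with its own checkpoint:

events = spark.readStream.schema(schema).format("json").load("data/input_stream")

# Sink 1: console, for watching the stream during development
console_q = (events.writeStream.outputMode("append").format("console")
             .option("checkpointLocation", "checkpoints/multi_console").start())

# Sink 2: in-memory table, so spark.sql(...) can query "memory_table"
memory_q = (events.writeStream.outputMode("append").format("memory")
            .queryName("memory_table").start())

# Sink 3: Parquet files, for persistent storage
parquet_q = (events.writeStream.outputMode("append").format("parquet")
             .option("path", "output/multi_parquet")
             .option("checkpointLocation", "checkpoints/multi_parquet").start())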

🤔 Discussion Points:


Part 3: Socket Source Streaming

🎓 Concept: Real-Time Text Processing

The Problem: You need to process text data in real-time:

The Solution: Socket streaming - read data from a TCP socket as it arrives!

How It Works:

Socket Server          Spark Streaming
(Port 9999)
    │                      │
    │  "hello world"       │
    ├─────────────────────>│  Process
    │                      │  [hello: 1, world: 1]
    │  "hello spark"       │
    ├─────────────────────>│  Update
    │                      │  [hello: 2, world: 1, spark: 1]
    │  "streaming is fun"  │
    ├─────────────────────>│  Update
    │                      │  [hello: 2, world: 1, spark: 1, ...]

Example 5: Real-Time Word Count

🎯 What We'll Learn:

💡 Why This Matters: Word count is the "Hello World" of streaming. But it demonstrates crucial patterns:

Real-World Applications:

Setup Requires Two Terminals:

Terminal 1 - Start Socket Server:

python socket_server.py

You'll see:

✓ Server listening on localhost:9999
  Waiting for Spark to connect...

Terminal 2 - Start Spark Streaming:

python 05_socket_streaming.py

You'll see:

📡 Socket streaming started!
   Listening on: localhost:9999

Now Type in Terminal 1:

> spark streaming is awesome
  ✓ Sent: spark streaming is awesome

> spark makes real time data easy
  ✓ Sent: spark makes real time data easy

> streaming streaming streaming
  ✓ Sent: streaming streaming streaming

Terminal 2 Shows:

Batch: 0
+---------+-----+
|word     |count|
+---------+-----+
|spark    |1    |
|streaming|1    |
|is       |1    |
|awesome  |1    |
+---------+-----+

Batch: 1  (After second sentence)
+---------+-----+
|word     |count|
+---------+-----+
|streaming|2    |  ← Updated!
|spark    |2    |  ← Updated!
|data     |1    |
|real     |1    |
|time     |1    |
|makes    |1    |
|easy     |1    |
|is       |1    |
|awesome  |1    |
+---------+-----+

Batch: 2  (After third sentence)
+---------+-----+
|word     |count|
+---------+-----+
|streaming|5    |  ← See the accumulation!
|spark    |2    |
...

Code Walkthrough:

# 1. Read from socket
lines = (
    spark.readStream
    .format("socket")
    .option("host", "localhost")  # Where to connect
    .option("port", 9999)         # Port number
    .load()
)

# 2. Split lines into words
words = lines.select(
    explode(split(col("value"), " ")).alias("word")
)
# explode: ["hello world"] → [hello], [world]

# 3. Count words (stateful aggregation!)
word_counts = (
    words
    .filter(col("word") != "")    # Remove empty strings
    .groupBy("word")              # Group by word
    .count()                      # Count occurrences
    .orderBy(desc("count"))       # Most frequent first
)

# 4. Write (must use 'complete' for aggregations)
query = (
    word_counts.writeStream
    .outputMode("complete")       # Send full results each time
    .format("console")
    .start()
)

📚 Behind the Scenes - State Management:

Spark maintains state for each word:

Internal State Table:
┌──────────┬───────┐
│ Word     │ Count │
├──────────┼───────┤
│ spark    │   5   │
│ streaming│   8   │
│ awesome  │   2   │
└──────────┴───────┘

When "spark" arrives:
1. Look up "spark" in state → Found: 5
2. Increment: 5 + 1 = 6
3. Update state: spark → 6
4. Output updated result

🎯 Try These Experiments:

  1. Paste a long paragraph - Watch words accumulate
  2. Type the same word repeatedly - See count increase
  3. Stop and restart Spark - State is lost (no checkpoint! See the sketch after this list)
  4. Type fast - Spark batches multiple lines together
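About experiment 3: adding a checkpoint location is what lets Spark persist the word-count state; a minimal sketch (the path is illustrative). Keep in mind the socket source itself is not replayable, so recovery here is best-effort - with a file or Kafka source the same pattern gives full recovery:

query = (
    word_counts.writeStream
    .outputMode("complete")
    .format("console")
    .option("checkpointLocation", "checkpoints/wordcount")  # illustrative path
    .start()
)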

🤔 Discussion Points:


Part 4: Windowing & Time-Based Operations

🎓 Concept: Why Do We Need Windows?

The Problem:

In streaming, data flows continuously. How do you answer questions like:

Without windows, you'd aggregate all data forever → state grows infinitely!

The Solution: Time Windows

Divide the stream into time-based buckets:

Continuous Stream:
[Event @ 10:00:15] [Event @ 10:00:45] [Event @ 10:01:30] [Event @ 10:02:10]

With 1-Minute Windows:
Window 1 [10:00 - 10:01]: [Event @ 10:00:15, Event @ 10:00:45]
Window 2 [10:01 - 10:02]: [Event @ 10:01:30]
Window 3 [10:02 - 10:03]: [Event @ 10:02:10]

πŸ“ Two Types of Windows

1. Tumbling Windows (Non-Overlapping)

Time:  00:00   00:10   00:20   00:30   00:40
       ├───────┼───────┼───────┼───────┼───────>
Window:│  W1   │  W2   │  W3   │  W4   │  W5
       └───────┴───────┴───────┴───────┴───────

Each event belongs to ONE window
Example: Hourly sales reports

Use When:

  • You need non-overlapping, periodic reports (minute-by-minute metrics, hourly or daily totals)
  • Each event should be counted in exactly one window

2. Sliding Windows (Overlapping)

Time:  00:00   00:10   00:20   00:30   00:40
       ├───────┼───────┼───────┼───────┼───────>
Window:│      W1       │
       │       ├──────W2───────┤
       │       │       ├──────W3───────┤
       │       │       │       ├──────W4───────┤

Windows = 20 minutes, Slide = 10 minutes
Each event can belong to MULTIPLE windows

Use When:

  • You want smoothed or trending metrics refreshed more often than the window length (e.g., a 30-minute trend updated every 5 minutes)
  • It's acceptable for an event to count toward several overlapping windows


Example 6: Tumbling and Sliding Windows

🎯 What We'll Learn:

💡 Why This Matters: Time-based analytics are crucial for monitoring, alerting, and dashboards. Understanding windows is key to building production streaming apps.

Run the Example:

python 06_windowing.py

Part 1: Tumbling Windows (10 seconds)

Window [10:30:00 - 10:30:10]
+------------------+------------------+--------+-----+---------+
|window_start      |window_end        |category|count|avg_value|
+------------------+------------------+--------+-----+---------+
|2025-01-15 10:30:00|2025-01-15 10:30:10|A      |35   |45.2    |
|2025-01-15 10:30:00|2025-01-15 10:30:10|B      |32   |47.8    |
|2025-01-15 10:30:00|2025-01-15 10:30:10|C      |33   |46.5    |
+------------------+------------------+--------+-----+---------+

Window [10:30:10 - 10:30:20]  (Next window, no overlap)
+------------------+------------------+--------+-----+---------+
|window_start      |window_end        |category|count|avg_value|
+------------------+------------------+--------+-----+---------+
|2025-01-15 10:30:10|2025-01-15 10:30:20|A      |33   |48.1    |
|2025-01-15 10:30:10|2025-01-15 10:30:20|B      |34   |45.9    |
|2025-01-15 10:30:10|2025-01-15 10:30:20|C      |33   |46.3    |
+------------------+------------------+--------+-----+---------+

Part 2: Sliding Windows (20 sec window, 10 sec slide)

Window [10:30:00 - 10:30:20]  (First 20 seconds)
+------------------+------------------+--------+-----+---------+
|window_start      |window_end        |category|count|max_value|
+------------------+------------------+--------+-----+---------+
|2025-01-15 10:30:00|2025-01-15 10:30:20|A      |68   |95      |
|2025-01-15 10:30:00|2025-01-15 10:30:20|B      |66   |98      |
|2025-01-15 10:30:00|2025-01-15 10:30:20|C      |66   |99      |
+------------------+------------------+--------+-----+---------+

Window [10:30:10 - 10:30:30]  (Overlaps with previous!)
+------------------+------------------+--------+-----+---------+
|window_start      |window_end        |category|count|max_value|
+------------------+------------------+--------+-----+---------+
|2025-01-15 10:30:10|2025-01-15 10:30:30|A      |67   |99      |
|2025-01-15 10:30:10|2025-01-15 10:30:30|B      |68   |98      |
|2025-01-15 10:30:10|2025-01-15 10:30:30|C      |65   |97      |
+------------------+------------------+--------+-----+---------+

Code Examples:

# Tumbling Window: 10-second buckets
tumbling = df.groupBy(
    window(col("timestamp"), "10 seconds"),  # Window size
    col("category")
).agg(
    count("*").alias("count"),
    avg("value").alias("avg_value")
)

# Sliding Window: 20-second window, sliding every 10 seconds
sliding = df.groupBy(
    window(col("timestamp"), "20 seconds", "10 seconds"),  # (size, slide)
    col("category")
).agg(
    count("*").alias("count"),
    max("value").alias("max_value")
)

📊 Choosing Window Size:

Use Case                      Window Size   Slide      Type
Minute-by-minute metrics      1 minute      -          Tumbling
5-minute smoothed average     5 minutes     1 minute   Sliding
Hourly reports                1 hour        -          Tumbling
Real-time trending (30 min)   30 minutes    5 minutes  Sliding
Daily aggregations            1 day         -          Tumbling

🤔 Discussion Points:


Example 7: Watermarking - Handling Late Data

🎓 Concept: The Late Data Problem

Real-World Scenario:

Mobile app sends event:  "Purchase at 10:00 AM"
Network delay...
Event arrives at server: "10:05 AM"

Question: Which 5-minute window does this belong to?
- 10:00-10:05 window? (Event time)
- 10:05-10:10 window? (Arrival time)

Two Types of Time:

  1. Event Time (when it happened)

    • More accurate
    • Can be late
    • Example: Transaction timestamp
  2. Processing Time (when Spark sees it)

    • Always current
    • No late data problem
    • Example: Log ingestion time

The Problem with Late Data:

Without Watermark:
Window [10:00-10:05] State:
- Batch 1: 100 events   → Output: count=100
- Batch 2: 5 events     → Output: count=105
- Batch 3: 2 events     → Output: count=107
- Batch 100: 1 event    → Output: count=500
- (State kept forever! Memory grows unbounded!)

With Watermark (10 minutes):
Window [10:00-10:05] State:
- Batch at 10:08: accept late data (within 10 min)
- Batch at 10:16: DROP late data (> 10 min late)
- Batch at 10:20: DISCARD window state (save memory)

πŸ“ Watermark: The Solution

A watermark tells Spark: "I won't accept events older than X"

df.withWatermark("event_time", "10 minutes")
# Translation: "Events more than 10 minutes late will be dropped"

How It Works:

Current Max Event Time: 10:30:00
Watermark (10 min):     10:20:00
                            ↓
Events with timestamp < 10:20:00  → DROPPED
Events with timestamp >= 10:20:00 → PROCESSED

Benefits:

  1. ✅ Handles reasonable late data
  2. ✅ Bounds memory usage
  3. ✅ Enables state cleanup
  4. ✅ Required for production streaming

🎯 What We'll Learn:

💡 Why This Matters: Without watermarks, streaming applications eventually run out of memory! This is critical for production systems.

Run the Example:

python 07_watermarking.py

What You'll See:

Window [10:30:00 - 10:30:30]
+------------------+------------------+---------+--------+-------------+---------+
|window_start      |window_end        |sensor_id|avg_temp|reading_count|max_delay|
+------------------+------------------+---------+--------+-------------+---------+
|2025-01-15 10:30:00|2025-01-15 10:30:30|SENSOR-0 |25.3   |8            |18       |
|2025-01-15 10:30:00|2025-01-15 10:30:30|SENSOR-1 |27.1   |7            |17       |
|2025-01-15 10:30:00|2025-01-15 10:30:30|SENSOR-2 |23.8   |9            |19       |
...

Notice the max_delay column - shows how late data arrived (in seconds).

Code Walkthrough:

# Simulate events with delays (some arrive late)
df_with_events = (
    streaming_df
    .withColumn("sensor_id", ...)
    .withColumn("delay_seconds", (rand() * 30).cast("int"))  # 0-30 sec delay
    .withColumn("event_time",
                (col("timestamp").cast("long") - col("delay_seconds")).cast("timestamp"))
    # event_time = when it happened
    # timestamp  = when Spark sees it
)

# Apply watermark: tolerate 20 seconds of lateness
windowed_avg = (
    df_with_events
    .withWatermark("event_time", "20 seconds")    # CRITICAL LINE!
    .groupBy(
        window(col("event_time"), "30 seconds"),  # Use event_time, not timestamp
        col("sensor_id")
    )
    .agg(...)
)

📊 Watermark Trade-offs:

Watermark Size    Late Data Handling   Memory Usage   State Cleanup
Small (1 min)     Drops more data      Low            Fast
Medium (10 min)   Balanced             Medium         Moderate
Large (1 hour)    Accepts most data    High           Slow

Choosing Watermark:

Analysis of your data shows:
- 90% of events arrive within 5 minutes
- 95% arrive within 10 minutes
- 99% arrive within 30 minutes

Conservative choice: 30 minutes (catch 99%)
Balanced choice: 10 minutes (catch 95%, faster cleanup)
Aggressive choice: 5 minutes (catch 90%, minimal state)
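One way to get those percentiles is to measure lateness on data you already have. A minimal sketch, assuming a historical batch of events with both an ingestion timestamp and an event timestamp (the path and column names are assumptions for illustration):

from pyspark.sql.functions import col

history = spark.read.parquet("output/raw_events")   # hypothetical archive of past events

lateness = history.select(
    (col("ingest_time").cast("long") - col("event_time").cast("long")).alias("delay_sec")
)

# approxQuantile(column, probabilities, relative_error)
p90, p95, p99 = lateness.approxQuantile("delay_sec", [0.90, 0.95, 0.99], 0.01)
print(f"90th: {p90}s, 95th: {p95}s, 99th: {p99}s")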

🤔 Discussion Points:


Part 5: Real-World Example - E-commerce Analytics

🎓 Concept: Production Streaming Architecture

Real-World E-commerce Platform:

User Actions                  Streaming Pipeline              Outputs
     │                             │                             │
     ├─ View Product ─────┐        │                             │
     ├─ Add to Cart ──────┼───────>│ [File Stream] ─────┐        │
     ├─ Purchase ─────────┘        │                    │        │
     │                             │                    ↓        │
     │                         [Transform]          [Query 1]    │
     │                             │                Revenue ─────┼─> Dashboard
     │                             ↓                             │
     │                         [Windows]            [Query 2]    │
     │                         [Watermark]          Funnel ──────┼─> Alerts
     │                             ↓                             │
     │                         [Aggregations]       [Query 3]    │
     │                             │                Products ────┼─> ML Model
     │                             ↓                             │
     │                         [Multiple Sinks]                  │
     │                             │                             │
     └─────────────────────────────┴─────────────────────────────┘

Business Requirements:

  1. Revenue Tracking: How much money are we making right now? (5-min windows)
  2. Conversion Funnel: How many views → carts → purchases? (2-min windows)
  3. Trending Products: What's hot right now? (real-time, no windows)

Technical Requirements:


Example 8: Complete E-commerce Streaming Analytics

🎯 What We'll Learn:

💡 Why This Matters: This is what real production streaming looks like! Multiple queries, different requirements, coordinated processing.

Setup:

If you generated with Option 2, you have 5 sample e-commerce event files ready to go!

Run the Example:

python 08_ecommerce_streaming.py

If you have sample data:

Found 5 sample event files - processing now!

To generate more data (optional), open second terminal:

python generate_ecommerce_events.py

What You'll See:

Query 1 - Revenue by Category (5-minute windows):

+------------------+------------------+------------+--------+--------------+---------+
|window_start      |window_end        |category    |revenue |purchase_count|avg_price|
+------------------+------------------+------------+--------+--------------+---------+
|2025-01-15 10:30:00|2025-01-15 10:35:00|Electronics|5234.89 |12            |436.24   |
|2025-01-15 10:30:00|2025-01-15 10:35:00|Sports     |2145.67 |18            |119.20   |
|2025-01-15 10:30:00|2025-01-15 10:35:00|Home       |1876.23 |15            |125.08   |
|2025-01-15 10:30:00|2025-01-15 10:35:00|Fashion    |987.45  |8             |123.43   |
+------------------+------------------+------------+--------+--------------+---------+

Query 2 - Conversion Funnel (2-minute windows):

CONVERSION FUNNEL
+----------+-------------+
|event_type|total_events |
+----------+-------------+
|view      |1247         |  ← Top of funnel
|cart      |423          |  ← 33.9% conversion
|purchase  |187          |  ← 44.2% of carts convert
+----------+-------------+

Insights:
- View → Cart: 33.9%
- Cart → Purchase: 44.2%
- Overall: 15.0% buy what they view

Query 3 - Top Products (written to Parquet):

Written to: output/top_products/
- Laptop Pro 15: 45 interactions
- Smart Watch: 38 interactions
- Running Shoes: 32 interactions

Code Architecture:

# Single source
events_stream = spark.readStream \
    .schema(schema) \
    .format("json") \
    .load("data/ecommerce_events")

# Query 1: Revenue (5-min windows)
revenue_by_category = events_stream \
    .filter(col("event_type") == "purchase") \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(col("timestamp"), "5 minutes"),  # Business requirement
        col("category")
    ).agg(...)

query1 = revenue_by_category.writeStream \
    .outputMode("update") \
    .format("console") \
    .start()

# Query 2: Funnel (2-min windows)
conversion_funnel = events_stream \
    .groupBy(
        window(col("timestamp"), "2 minutes"),  # Faster updates
        col("event_type")
    ).agg(count("*").alias("event_count"))      # Column name used by the SQL below

query2 = (
    conversion_funnel.writeStream
    .outputMode("complete")
    .format("memory")                # For SQL queries!
    .queryName("conversion_funnel")
    .start()
)

# Query 3: Top Products (no windows - cumulative)
top_products = events_stream \
    .filter(col("event_type").isin("view", "purchase")) \
    .groupBy("product_name", "event_type") \
    .count()

# The file sink only supports append mode, so write each complete
# snapshot of the counts to Parquet via foreachBatch (for downstream ML)
def save_top_products(batch_df, batch_id):
    batch_df.write.mode("overwrite").parquet("output/top_products")

query3 = (
    top_products.writeStream
    .outputMode("complete")
    .foreachBatch(save_top_products)
    .option("checkpointLocation", "checkpoints/top_products")
    .start()
)

# All queries run simultaneously!
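Because several queries are started from one script, the driver has to stay alive until they finish. The usual pattern is to block on the query manager (a minimal sketch):

# Block the driver until any active query stops or fails
spark.streams.awaitAnyTermination()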

📊 Real-Time SQL Queries:

While streaming, you can query the in-memory table:

spark.sql("""
    SELECT 
        event_type,
        SUM(event_count) as total_events,
        ROUND(SUM(event_count) * 100.0 / SUM(SUM(event_count)) OVER (), 2) as percentage
    FROM conversion_funnel
    GROUP BY event_type
    ORDER BY 
        CASE event_type 
            WHEN 'view' THEN 1 
            WHEN 'cart' THEN 2 
            WHEN 'purchase' THEN 3 
        END
""").show()

Production Considerations:

  1. Trigger Intervals:

    .trigger(processingTime="10 seconds")  # Process every 10 seconds
    
    • Balance: latency vs throughput
    • Longer triggers = more efficient
    • Shorter triggers = faster results
  2. Checkpointing:

    .option("checkpointLocation", "checkpoints/top_products")
    
    • Must be unique per query
    • Enables exactly-once processing
    • Allows restart from failure
  3. Memory Management:

    • Watermarks cleanup old state
    • Update mode more efficient than complete
    • Monitor Spark UI for state size

🎯 Business Impact:

From this streaming pipeline, the business gets:

  1. Real-Time Revenue Dashboard:

    • Which categories are performing best RIGHT NOW
    • Update every 5 minutes
    • Alert if revenue drops suddenly
  2. Conversion Optimization:

    • Where are users dropping off?
    • A/B test results in real-time
    • Immediate feedback on changes
  3. Inventory Management:

    • What's trending RIGHT NOW
    • Auto-reorder popular products
    • Prevent stockouts during spikes
  4. ML Model Input:

    • Feed fresh data to recommendation engine
    • Update click-through predictions
    • Personalize in real-time

🤔 Discussion Points:


Summary & Best Practices

🎓 What You've Learned

Core Concepts:

Advanced Topics:


📋 Quick Reference

Reading Streams:

# File Stream
df = spark.readStream \
    .format("json") \
    .schema(schema) \
    .option("maxFilesPerTrigger", 1) \
    .load("path/")

# Socket Stream
df = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# Rate Stream (testing)
df = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 10) \
    .load()

Writing Streams:

query = (
    df.writeStream
    .outputMode("append")            # or "complete" or "update"
    .format("console")               # or "parquet", "memory", "kafka"
    .option("checkpointLocation", "checkpoints/")
    .trigger(processingTime="10 seconds")
    .start()
)

query.awaitTermination()
query.stop()

Windowing:

# Tumbling
window(col("timestamp"), "10 minutes")

# Sliding
window(col("timestamp"), "10 minutes", "5 minutes")

Watermarking:

df.withWatermark("event_time", "10 minutes")

✅ Best Practices for Production

1. Always Use Checkpoints

.option("checkpointLocation", "s3://bucket/checkpoints/query1")

2. Choose Appropriate Trigger Intervals

.trigger(processingTime="30 seconds")  # vs "1 second"

3. Set Watermarks for Aggregations

df.withWatermark("timestamp", "1 hour")

4. Monitor Your Streaming Queries

spark.streams.active          # List queries
query.status                  # Query health
query.lastProgress           # Performance metrics
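Beyond one-off checks, a simple polling loop over lastProgress covers basic monitoring; a minimal sketch (batchId, numInputRows, and durationMs are standard fields of the progress report):

import time

while spark.streams.active:                 # loop while any query is running
    for q in spark.streams.active:
        p = q.lastProgress                  # None until the first batch completes
        if p:
            print(q.name, "batch", p["batchId"],
                  "rows", p["numInputRows"],
                  "ms", p["durationMs"]["triggerExecution"])
    time.sleep(10)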

5. Use Appropriate Output Modes

Non-aggregations → append
Aggregations with full results → complete
Aggregations with updates → update

6. Handle Schema Evolution

.option("mergeSchema", "true")  # For evolving schemas

7. Partition Output Data

.partitionBy("date", "hour")  # For file sinks
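For example, the transaction stream from Example 3 already derives `date` and `hour` columns, so a partitioned Parquet sink could look like this sketch (paths are illustrative):

query = (
    processed_df.writeStream
    .outputMode("append")
    .format("parquet")
    .option("path", "output/transactions")
    .option("checkpointLocation", "checkpoints/transactions_by_date_hour")
    .partitionBy("date", "hour")   # one sub-directory per date and hour
    .start()
)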

8. Set Realistic Watermarks

Analyze your data first:
- 90th percentile latency → minimum watermark
- 99th percentile latency → recommended
- Business SLA → maximum

🚀 Next Steps

Immediate Practice:

  1. Modify examples with your own data
  2. Experiment with different window sizes
  3. Try writing to different sinks
  4. Combine multiple sources

Advanced Topics to Explore:

  1. Kafka Integration: Connect to real message queues
  2. Stream-Stream Joins: Join two streams together
  3. Stream-Static Joins: Enrich streams with batch data
  4. Arbitrary Stateful Processing: Custom state management
  5. Foreach Sink: Custom write logic
  6. Continuous Processing: Ultra-low latency mode

Production Readiness:

  1. Set up monitoring (Datadog, Grafana)
  2. Configure alerts (PagerDuty, Slack)
  3. Implement error handling
  4. Test failure scenarios
  5. Document runbooks

🎯 Common Use Cases Summary

Use Case          Source   Window                    Watermark   Output
Log Monitoring    File     None                      N/A         Kafka + Console
IoT Sensors       Kafka    5 min tumbling            10 min      Parquet + Alerts
Fraud Detection   Kafka    None                      N/A         Database + Alerts
Clickstream       File     1 hour sliding (10 min)   30 min      Parquet + Memory
Social Media      Kafka    2 min tumbling            5 min       Kafka + Dashboard

📚 Additional Resources

Official Documentation:

Community Resources:

Practice Datasets:


💬 Feedback & Support

Questions?

Troubleshooting:


🎉 Congratulations!

You've completed the Spark Structured Streaming tutorial! You now have:


Happy Streaming! 🚀
