Spark Structured Streaming
Learning Objectives
By the end of this tutorial, you will:
- ✅ Understand what streaming is and why it matters
- ✅ Know when to use streaming vs batch processing
- ✅ Build real-time data processing applications
- ✅ Handle time-based operations and late data
- ✅ Create production-ready streaming pipelines
Prerequisites: Basic PySpark knowledge (DataFrames, transformations)
Table of Contents
- Introduction to Streaming
- Setup
- Part 1: Basic Concepts
- Part 2: File Streaming
- Part 3: Socket Streaming
- Part 4: Time Windows
- Part 5: Real-World Example
- Summary & Best Practices
Introduction to Streaming
What is Streaming?
Traditional Batch Processing:
[Collect Data] → [Wait Hours] → [Process All at Once] → [Get Results]
Example: Process yesterday's sales data every morning
Stream Processing:
[Data Arrives] → [Process Immediately] → [Get Results] → [Continuous...]
Example: Detect fraud as transactions happen
Why Do We Need Streaming?
Real-World Use Cases:
Financial Services
- Fraud detection (catch fraudulent transactions in milliseconds)
- Stock trading (make decisions on real-time market data)
- Risk monitoring (track exposure continuously)
E-commerce
- Real-time recommendations (suggest products as users browse)
- Inventory management (update stock levels instantly)
- Clickstream analysis (understand user behavior immediately)
IoT & Manufacturing
- Sensor monitoring (detect equipment failures before they happen)
- Quality control (identify defects in real-time)
- Predictive maintenance (schedule repairs proactively)
Social Media & Gaming
- Trending topics (detect what's viral right now)
- Live leaderboards (update rankings instantly)
- Content moderation (flag inappropriate content immediately)
When to Use Streaming vs Batch?
Use Streaming When:
- ⏱️ Low latency matters (seconds to minutes)
- 🔄 Continuous processing needed
- 📊 Real-time dashboards required
- 🚨 Immediate alerts necessary
- 💰 Business value of fresh data is high
Use Batch When:
- 📅 Daily/weekly reports are sufficient
- 💾 Processing large historical data
- 🔢 Complex computations need the full dataset
- 💵 Cost optimization is the priority
- 📈 Trends over time are the focus
What is Structured Streaming?
Key Concept: Treat a stream as a continuously growing table.
Traditional Streaming (Complex):
- Manual state management
- Handle failures yourself
- Complex event time logic
Structured Streaming (Easy):
- Automatic state management
- Built-in fault tolerance
- Simple SQL-like operations
Mental Model:
Input Stream              Query              Result Stream
+---------------+       +--------+       +---------------+
|  New Row 1    | ----> |  SQL   | ----> |  Output 1     |
|  New Row 2    | ----> | Logic  | ----> |  Output 2     |
|  New Row 3    | ----> |        | ----> |  Output 3     |
|  ...          |       +--------+       |  ...          |
+---------------+                        +---------------+
(Unbounded Table)                        (Continuous Results)
Setup
Quick Start
Step 1: Generate All Tutorial Files
# Run the initial setup script
python initial_setup.py
# Choose option 2 (Scripts + Sample Data) - RECOMMENDED
# This creates everything you need including sample data
Step 2: Navigate to Tutorial Directory
cd streaming_tutorial
Step 3: Verify Setup
python 00_setup_environment.py
You should see:
✅ Created: data/input_stream
✅ Created: data/ecommerce_events
✅ Created: checkpoints/...
✅ Environment setup complete!
What You'll Have:
streaming_tutorial/
├── data/
│   ├── input_stream/          # 3 sample transaction files (if option 2)
│   └── ecommerce_events/      # 5 sample event files (if option 2)
├── checkpoints/               # For fault tolerance
├── output/                    # Results storage
├── 01_basic_streaming.py      # Example scripts
├── 02_output_modes.py
├── ... (13 scripts total)
├── README.md
└── QUICK_REFERENCE.md
Part 1: Basic Streaming Concepts
Concept: What is a Streaming DataFrame?
Regular DataFrame (Batch):
- Fixed data that doesn't change
- Process once, get result once
- Example: Read a CSV file
Streaming DataFrame:
- Data continuously arrives
- Process repeatedly as new data comes
- Example: Monitor a folder for new files
The Key Difference:
# Batch
df = spark.read.parquet("data.parquet") # Read once
df.count() # Returns a number
# Streaming
df = spark.readStream.parquet("data/") # Continuous read
df.isStreaming # Returns True
# Can't call .count() or .show() directly!
# Need to start a streaming query
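If you are curious what actually happens when you try, here is a small sketch using the rate source (no external data needed); the exact exception message varies by Spark version, so treat the comment as an illustration rather than a guarantee:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("streaming-vs-batch").getOrCreate()
# Streaming DataFrame from the rate source
stream_df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()
print(stream_df.isStreaming)   # True
try:
    stream_df.show()           # Batch-style actions are not allowed on streaming DataFrames
except Exception as e:
    # Typically an AnalysisException telling you to use writeStream.start() instead
    print(type(e).__name__, e)
spark.stop()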
Example 1: Your First Streaming Application
🎯 What We'll Learn:
- How to create a streaming DataFrame
- Basic transformations on streams
- Writing to console output
- Understanding the isStreaming property
💡 Why This Matters: Before connecting to real data sources, we need to understand streaming fundamentals. The "rate" source generates test data automatically - perfect for learning!
Concept: The Rate Source
The rate source generates rows automatically at a specified rate:
- rowsPerSecond: how fast to generate data
- Perfect for testing without external data
- Generates: timestamp + incremental value
Run the Example:
python 01_basic_streaming.py
What You'll See:
Batch: 0
+--------------------+-----+--------------------+-------------+-------+
|timestamp |value|timestamp_str |value_squared|is_even|
+--------------------+-----+--------------------+-------------+-------+
|2025-01-15 10:30:00 |0 |2025-01-15 10:30:00 |0 |true |
|2025-01-15 10:30:00 |1 |2025-01-15 10:30:00 |1 |false |
...
Batch: 1
+--------------------+-----+--------------------+-------------+-------+
|2025-01-15 10:30:01 |5 |2025-01-15 10:30:01 |25 |false |
|2025-01-15 10:30:01 |6 |2025-01-15 10:30:01 |36 |true |
...
Key Observations:
- Batch Numbers Increase: Each batch represents a micro-batch of data
- Timestamps Update: Shows when data was generated
- Continuous Processing: Runs until you stop it (Ctrl+C)
- Transformations Work: Just like regular DataFrames!
Code Walkthrough:
# 1. Create the streaming source
streaming_df = (
    spark.readStream
    .format("rate")                  # Use the rate source
    .option("rowsPerSecond", 5)      # Generate 5 rows per second
    .load()
)
# 2. Check that it's a streaming DataFrame
print(streaming_df.isStreaming)      # True
# 3. Transform (same API as batch!)
processed_df = streaming_df \
    .withColumn("value_squared", col("value") * col("value"))
# 4. Write to console (start the query)
query = (
    processed_df.writeStream
    .outputMode("append")            # Only new rows
    .format("console")               # Write to the screen
    .start()                         # Start processing!
)
# 5. Wait for data
query.awaitTermination(timeout=20)   # Run for about 20 seconds
# 6. Stop the query
query.stop()
🤔 Discussion Points:
- Why can't we use .show() on streaming DataFrames?
- What happens if we don't call .start()?
- How is this different from a batch job?
Example 2: Understanding Output Modes
🎯 What We'll Learn:
- Three output modes: Append, Complete, Update
- When to use each mode
- How aggregations work in streaming
💡 Why This Matters: Output modes control what data gets written to the sink. Choosing the wrong mode causes errors or unexpected results!
Concept: Output Modes
Think of a streaming result table that continuously updates:
APPEND Mode:
Time 1: [Row 1, Row 2]               → Send these
Time 2: [Row 1, Row 2, Row 3]        → Send only Row 3 (new)
Time 3: [Row 1, Row 2, Row 3, Row 4] → Send only Row 4 (new)
- Use for: Non-aggregations, new data only
- Example: Log monitoring, transaction processing
COMPLETE Mode:
Time 1: [A:5, B:3]       → Send entire table
Time 2: [A:8, B:7, C:2]  → Send entire table again
Time 3: [A:10, B:9, C:5] → Send entire table again
- Use for: Aggregations where you want full results
- Example: Dashboards, real-time counts
UPDATE Mode:
Time 1: [A:5, B:3] → Send these
Time 2: [A:8, C:2] → Send only changed rows (A updated, C new)
Time 3: [B:9, C:5] → Send only changed rows (B updated, C updated)
- Use for: Aggregations, only send changes
- Example: Efficient updates, incremental processing
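To make the modes concrete, here is a minimal sketch (using the rate source so no external data is assumed; the category column is derived just for illustration) that runs the same aggregation in complete mode - only the outputMode line changes to switch behavior:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.appName("output-modes-sketch").getOrCreate()
# Derive a small "category" column from the rate source so we have something to aggregate
events = (
    spark.readStream.format("rate").option("rowsPerSecond", 5).load()
    .withColumn("category", (col("value") % 3).cast("string"))
)
counts = events.groupBy("category").count()
# Complete mode: every trigger re-emits the full result table
query = (
    counts.writeStream
    .outputMode("complete")    # change to "update" to emit only the changed rows
    .format("console")
    .start()
)
query.awaitTermination(timeout=30)
query.stop()
spark.stop()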
Run the Example:
python 02_output_modes.py
What You'll See:
With Complete Mode:
Batch: 0
+--------+-----+---------+
|category|count|avg_value|
+--------+-----+---------+
|A |2 |4.5 |
|B |1 |2.0 |
|C |2 |6.0 |
+--------+-----+---------+
Batch: 1 (Shows ENTIRE table again)
+--------+-----+---------+
|category|count|avg_value|
+--------+-----+---------+
|A |5 |12.3 | ← Updated
|B |3 |8.1 | ← Updated
|C |4 |10.5 | ← Updated
+--------+-----+---------+
With Update Mode:
Batch: 0
+--------+-----+---------+
|category|count|avg_value|
+--------+-----+---------+
|A |2 |4.5 |
|B |1 |2.0 |
|C |2 |6.0 |
+--------+-----+---------+
Batch: 1 (Shows ONLY changed rows)
+--------+-----+---------+
|category|count|avg_value|
+--------+-----+---------+
|A |5 |12.3 | ← Only updated rows
|C |4 |10.5 |
+--------+-----+---------+
Decision Tree:
Are you doing aggregations (groupBy, count, sum)?
├── YES → Use Complete or Update
│    ├── Need full results each time? → Complete
│    └── Want efficient updates?      → Update
│
└── NO → Use Append
     └── Just adding new rows
🤔 Discussion Points:
- Why does Complete mode resend all data?
- When would Update mode be more efficient?
- Can you use Append with aggregations?
Part 2: File Source Streaming
Concept: Streaming from Files
The Problem: Your application generates files continuously:
- Logs written every minute
- Sensors dump data files
- ETL jobs produce output files
The Solution: Monitor a directory and process new files automatically!
How It Works:
Directory Monitoring:
data/input/
├── file1.json   ← Processed
├── file2.json   ← Processed
├── file3.json   ← Processing now...
└── file4.json   ← Waiting in queue
Spark Streaming:
- Watches the directory
- Detects new files
- Processes automatically
- Tracks what's been processed (checkpoint)
Key Concepts:
Schema Definition (CRITICAL!):
# MUST define the schema for file streaming
schema = StructType([
    StructField("transaction_id", StringType(), True),
    StructField("amount", DoubleType(), True)
])
Why? Spark can't infer the schema from an empty directory at startup.
maxFilesPerTrigger:
.option("maxFilesPerTrigger", 1)  # Process 1 file per batch
Why? Controls throughput and prevents overwhelming the system.
Checkpointing (Fault Tolerance):
.option("checkpointLocation", "checkpoints/file_stream")
Why? Remembers processed files and enables restart after failure.
Example 3: File Streaming with Transactions
🎯 What We'll Learn:
- Reading JSON files continuously
- Schema definition for streaming
- Checkpoint management
- Handling multiple file arrivals
💡 Why This Matters: File streaming is one of the most common production patterns. Many systems write data to files (logs, data lakes, exports) that need real-time processing.
Real-World Scenario: You're building a payment monitoring system. Transaction files arrive every few seconds from different payment gateways. You need to:
- Process each file as it arrives
- Apply validation rules
- Calculate hourly metrics
- Never miss a file (even if system restarts)
Setup:
If you chose Option 2 during setup, you already have 3 sample transaction files in data/input_stream/. If not, you'll need to run the simulator.
Run the Example:
python 03_file_streaming.py
If you have sample data, you'll see:
File streaming started!
Watching directory: data/input_stream
Found 3 sample files - processing now!
Batch: 0
+-------------------+------------+----------+-------------------+------------+----------+----+----------+
|transaction_id |user_id |amount |timestamp |category |amount_usd|hour|date |
+-------------------+------------+----------+-------------------+------------+----------+----+----------+
|TXN-SAMPLE-001-001 |USER-5431 |234.56 |2025-01-15 09:23:12|Electronics |234.56 |9 |2025-01-15|
|TXN-SAMPLE-001-002 |USER-7812 |89.99 |2025-01-15 09:45:30|Clothing |89.99 |9 |2025-01-15|
...
To Generate More Data (Optional):
Open a second terminal:
cd streaming_tutorial
python simulate_file_data.py
This creates 5 additional files, one every 5 seconds.
Code Walkthrough:
# 1. Define schema (REQUIRED for file streaming!)
schema = StructType([
StructField("transaction_id", StringType(), True),
StructField("user_id", StringType(), True),
StructField("amount", DoubleType(), True),
StructField("timestamp", TimestampType(), True),
StructField("category", StringType(), True)
])
# 2. Create the streaming reader
streaming_df = (
    spark.readStream
    .schema(schema)                    # Must provide the schema!
    .format("json")                    # File format
    .option("maxFilesPerTrigger", 1)   # Process 1 file per batch
    .load("data/input_stream")         # Directory to monitor
)
# 3. Apply transformations (business logic)
processed_df = streaming_df \
    .withColumn("amount_usd", round(col("amount"), 2)) \
    .withColumn("hour", hour(col("timestamp"))) \
    .withColumn("date", to_date(col("timestamp"))) \
    .filter(col("amount") > 0)         # Data quality rule
# 4. Write with a checkpoint (for fault tolerance!)
query = (
    processed_df.writeStream
    .outputMode("append")
    .format("console")
    .option("checkpointLocation", "checkpoints/file_stream")   # CRITICAL!
    .start()
)
What's Happening Behind the Scenes:
- File Discovery: Spark lists all files in the directory
- Checkpoint Check: "Have I processed this file before?"
- Read & Process: Parse JSON, apply transformations
- Write Results: Send to console (or any sink)
- Update Checkpoint: "I've processed file X"
- Wait for Next Trigger: Sleep, then check for new files
Checkpoint Directory Contents:
ls -la checkpoints/file_stream/
You'll see:
commits/   ← What's been processed
metadata/  ← Stream configuration
offsets/   ← File tracking
sources/   ← Source information
💾 Fault Tolerance Demo:
- Run the streaming application
- Let it process 2-3 files
- Kill it (Ctrl+C)
- Start it again
- It won't reprocess old files! (thanks to checkpoint)
🤔 Discussion Points:
- What happens if a file is corrupted?
- How does Spark know which files are new?
- Can we process files in order by timestamp?
- What if files arrive faster than we can process?
Example 4: Writing to Multiple Sinks
🎯 What We'll Learn:
- One stream → multiple outputs
- Console sink (debugging)
- Memory sink (SQL queries)
- File sink (persistence)
💡 Why This Matters: Production systems often need multiple outputs:
- Real-time dashboard (memory)
- Persistent storage (files)
- Monitoring/debugging (console/logs)
Run the Example:
python 04_multiple_sinks.py
What You'll See:
- Console Output (for monitoring)
- In-Memory Table (for SQL queries)
- Parquet Files (for storage)
The script also queries the in-memory table:
SELECT category, COUNT(*) as count
FROM memory_table
GROUP BY category
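As a sketch of how that in-memory table comes to exist (the table name memory_table is just an assumption to match the query above, and the rate source stands in for the script's real data), the memory sink registers the streaming result as an in-memory table you can hit with spark.sql:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.appName("memory-sink-sketch").getOrCreate()
events = (
    spark.readStream.format("rate").option("rowsPerSecond", 5).load()
    .withColumn("category", (col("value") % 3).cast("string"))
)
# Memory sink: results are kept in an in-memory table named by queryName()
query = (
    events.groupBy("category").count()
    .writeStream
    .outputMode("complete")
    .format("memory")
    .queryName("memory_table")   # becomes the table name usable in spark.sql
    .start()
)
query.awaitTermination(timeout=15)   # let a few micro-batches accumulate
spark.sql("SELECT category, count FROM memory_table ORDER BY count DESC").show()
query.stop()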
Use Cases:
| Sink Type | Use Case | Example |
|---|---|---|
| Console | Development, debugging | See what's happening |
| Memory | Real-time SQL queries | Dashboard queries |
| File (Parquet) | Persistent storage | Data lake |
| Kafka | Downstream systems | Microservices |
| Database | Transactional | User updates |
🤔 Discussion Points:
- Why write to multiple sinks?
- Performance impact of multiple queries?
- How to handle one sink failing?
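One common answer to the last two questions is foreachBatch: a single streaming query can write each micro-batch to several sinks inside one function, instead of running one query per sink. A minimal sketch under the assumption that the output paths below are writable (they are illustrative, not from the tutorial scripts):
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("foreach-batch-sketch").getOrCreate()
events = spark.readStream.format("rate").option("rowsPerSecond", 5).load()
def write_to_sinks(batch_df, batch_id):
    # Reuse the same micro-batch for several outputs; cache to avoid recomputation
    batch_df.persist()
    try:
        batch_df.write.mode("append").parquet("output/foreach_parquet")   # persistent storage
        batch_df.write.mode("append").json("output/foreach_json")         # second sink, illustrative
    finally:
        batch_df.unpersist()
query = (
    events.writeStream
    .foreachBatch(write_to_sinks)
    .option("checkpointLocation", "checkpoints/foreach_batch")
    .start()
)
query.awaitTermination(timeout=30)
query.stop()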
Part 3: Socket Source Streaming
Concept: Real-Time Text Processing
The Problem: You need to process text data in real-time:
- Social media feeds
- Chat messages
- Log streams
- IoT sensor messages
The Solution: Socket streaming - read data from a TCP socket as it arrives!
How It Works:
Socket Server                             Spark Streaming
(Port 9999)
     |                                         |
     |  "hello world"                          |
     |---------------------------------------->|  Process
     |                                         |  [hello: 1, world: 1]
     |  "hello spark"                          |
     |---------------------------------------->|  Update
     |                                         |  [hello: 2, world: 1, spark: 1]
     |  "streaming is fun"                     |
     |---------------------------------------->|  Update
     |                                         |  [hello: 2, world: 1, spark: 1, ...]
Example 5: Real-Time Word Count
🎯 What We'll Learn:
- Socket source configuration
- Text processing with explode
- Real-time aggregations
- Interactive streaming
💡 Why This Matters: Word count is the "Hello World" of streaming. But it demonstrates crucial patterns:
- Parsing text streams
- Stateful aggregations
- Continuous updates
Real-World Applications:
- Social Media: Trending hashtags, sentiment analysis
- Log Analysis: Error pattern detection, alert keywords
- Chat Moderation: Profanity detection, spam filtering
Setup Requires Two Terminals:
Terminal 1 - Start Socket Server:
python socket_server.py
You'll see:
✅ Server listening on localhost:9999
Waiting for Spark to connect...
Terminal 2 - Start Spark Streaming:
python 05_socket_streaming.py
You'll see:
📡 Socket streaming started!
Listening on: localhost:9999
Now Type in Terminal 1:
> spark streaming is awesome
✅ Sent: spark streaming is awesome
> spark makes real time data easy
✅ Sent: spark makes real time data easy
> streaming streaming streaming
✅ Sent: streaming streaming streaming
Terminal 2 Shows:
Batch: 0
+---------+-----+
|word |count|
+---------+-----+
|spark |1 |
|streaming|1 |
|is |1 |
|awesome |1 |
+---------+-----+
Batch: 1 (After second sentence)
+---------+-----+
|word |count|
+---------+-----+
|streaming|2 | ← Updated!
|spark |2 | ← Updated!
|data |1 |
|real |1 |
|time |1 |
|makes |1 |
|easy |1 |
|is |1 |
|awesome |1 |
+---------+-----+
Batch: 2 (After third sentence)
+---------+-----+
|word |count|
+---------+-----+
|streaming|5 | ← See the accumulation!
|spark |2 |
...
Code Walkthrough:
# 1. Read from the socket
lines = (
    spark.readStream
    .format("socket")
    .option("host", "localhost")   # Where to connect
    .option("port", 9999)          # Port number
    .load()
)
# 2. Split lines into words
words = lines.select(
    explode(split(col("value"), " ")).alias("word")
)
# explode: ["hello world"] → [hello], [world]
# 3. Count words (stateful aggregation!)
word_counts = (
    words
    .filter(col("word") != "")     # Remove empty strings
    .groupBy("word")               # Group by word
    .count()                       # Count occurrences
    .orderBy(desc("count"))        # Most frequent first
)
# 4. Write (must use 'complete' mode for this aggregation)
query = (
    word_counts.writeStream
    .outputMode("complete")        # Send full results each time
    .format("console")
    .start()
)
Behind the Scenes - State Management:
Spark maintains state for each word:
Internal State Table:
+-----------+-------+
| Word      | Count |
+-----------+-------+
| spark     | 5     |
| streaming | 8     |
| awesome   | 2     |
+-----------+-------+
When "spark" arrives:
1. Look up "spark" in state → Found: 5
2. Increment: 5 + 1 = 6
3. Update state: spark → 6
4. Output updated result
🎯 Try These Experiments:
- Paste a long paragraph - Watch words accumulate
- Type the same word repeatedly - See count increase
- Stop and restart Spark - State is lost (no checkpoint!)
- Type fast - Spark batches multiple lines together
🤔 Discussion Points:
- Where is the state stored?
- What happens if Spark crashes?
- How much memory does state use?
- Can we limit state size?
Part 4: Windowing & Time-Based Operations
Concept: Why Do We Need Windows?
The Problem:
In streaming, data flows continuously. How do you answer questions like:
- "How many transactions in the last hour?"
- "What's the average temperature every 5 minutes?"
- "Revenue per day?"
Without windows, you'd aggregate all data forever → state grows infinitely!
The Solution: Time Windows
Divide the stream into time-based buckets:
Continuous Stream:
[Event @ 10:00:15] [Event @ 10:00:45] [Event @ 10:01:30] [Event @ 10:02:10]
With 1-Minute Windows:
Window 1 [10:00 - 10:01]: [Event @ 10:00:15, Event @ 10:00:45]
Window 2 [10:01 - 10:02]: [Event @ 10:01:30]
Window 3 [10:02 - 10:03]: [Event @ 10:02:10]
Two Types of Windows
1. Tumbling Windows (Non-Overlapping)
Time:    00:00    00:10    00:20    00:30    00:40
         |--------|--------|--------|--------|------>
Window:  |   W1   |   W2   |   W3   |   W4   |  W5
         |--------|--------|--------|--------|------>
Each event belongs to ONE window
Example: Hourly sales reports
Use When:
- Need distinct time periods
- No data overlap between periods
- Example: Daily reports, hourly metrics
2. Sliding Windows (Overlapping)
Time:    00:00    00:10    00:20    00:30    00:40
         |--------|--------|--------|--------|------>
Window:  |--------W1-------|
                  |--------W2-------|
                           |--------W3-------|
                                    |--------W4-------|
Windows = 20 minutes, Slide = 10 minutes
Each event can belong to MULTIPLE windows
Use When:
- Need smooth transitions
- Want moving averages
- Example: "Traffic in last 30 min, updated every 5 min"
Example 6: Tumbling and Sliding Windows
🎯 What We'll Learn:
- Creating tumbling windows
- Creating sliding windows
- Window-based aggregations
- Choosing window size
💡 Why This Matters: Time-based analytics are crucial for monitoring, alerting, and dashboards. Understanding windows is key to building production streaming apps.
Run the Example:
python 06_windowing.py
Part 1: Tumbling Windows (10 seconds)
Window [10:30:00 - 10:30:10]
+------------------+------------------+--------+-----+---------+
|window_start |window_end |category|count|avg_value|
+------------------+------------------+--------+-----+---------+
|2025-01-15 10:30:00|2025-01-15 10:30:10|A |35 |45.2 |
|2025-01-15 10:30:00|2025-01-15 10:30:10|B |32 |47.8 |
|2025-01-15 10:30:00|2025-01-15 10:30:10|C |33 |46.5 |
+------------------+------------------+--------+-----+---------+
Window [10:30:10 - 10:30:20] (Next window, no overlap)
+------------------+------------------+--------+-----+---------+
|window_start |window_end |category|count|avg_value|
+------------------+------------------+--------+-----+---------+
|2025-01-15 10:30:10|2025-01-15 10:30:20|A |33 |48.1 |
|2025-01-15 10:30:10|2025-01-15 10:30:20|B |34 |45.9 |
|2025-01-15 10:30:10|2025-01-15 10:30:20|C |33 |46.3 |
+------------------+------------------+--------+-----+---------+
Part 2: Sliding Windows (20 sec window, 10 sec slide)
Window [10:30:00 - 10:30:20] (First 20 seconds)
+------------------+------------------+--------+-----+---------+
|window_start |window_end |category|count|max_value|
+------------------+------------------+--------+-----+---------+
|2025-01-15 10:30:00|2025-01-15 10:30:20|A |68 |95 |
|2025-01-15 10:30:00|2025-01-15 10:30:20|B |66 |98 |
|2025-01-15 10:30:00|2025-01-15 10:30:20|C |66 |99 |
+------------------+------------------+--------+-----+---------+
Window [10:30:10 - 10:30:30] (Overlaps with previous!)
+------------------+------------------+--------+-----+---------+
|window_start |window_end |category|count|max_value|
+------------------+------------------+--------+-----+---------+
|2025-01-15 10:30:10|2025-01-15 10:30:30|A |67 |99 |
|2025-01-15 10:30:10|2025-01-15 10:30:30|B |68 |98 |
|2025-01-15 10:30:10|2025-01-15 10:30:30|C |65 |97 |
+------------------+------------------+--------+-----+---------+
Code Examples:
# Tumbling Window: 10-second buckets
tumbling = df.groupBy(
window(col("timestamp"), "10 seconds"), # Window size
col("category")
).agg(
count("*").alias("count"),
avg("value").alias("avg_value")
)
# Sliding Window: 20-second window, sliding every 10 seconds
sliding = df.groupBy(
window(col("timestamp"), "20 seconds", "10 seconds"), # (size, slide)
col("category")
).agg(
count("*").alias("count"),
max("value").alias("max_value")
)
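For a self-contained version you can actually run, here is a sketch that substitutes the rate source for the tutorial's df (the category column is an assumption added so there is something to group on) and wires the tumbling window to a console sink:
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col, count, avg
spark = SparkSession.builder.appName("tumbling-window-sketch").getOrCreate()
# Rate source stands in for real data; derive a category column to group on
df = (
    spark.readStream.format("rate").option("rowsPerSecond", 10).load()
    .withColumn("category", (col("value") % 3).cast("string"))
)
tumbling = (
    df.groupBy(
        window(col("timestamp"), "10 seconds"),   # 10-second buckets
        col("category")
    )
    .agg(count("*").alias("count"), avg("value").alias("avg_value"))
)
query = (
    tumbling.writeStream
    .outputMode("update")            # emit only windows that changed in this batch
    .format("console")
    .option("truncate", "false")
    .start()
)
query.awaitTermination(timeout=60)
query.stop()
spark.stop()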
Choosing Window Size:
| Use Case | Window Size | Slide | Type |
|---|---|---|---|
| Minute-by-minute metrics | 1 minute | - | Tumbling |
| 5-minute smoothed average | 5 minutes | 1 minute | Sliding |
| Hourly reports | 1 hour | - | Tumbling |
| Real-time trending (30 min) | 30 minutes | 5 minutes | Sliding |
| Daily aggregations | 1 day | - | Tumbling |
🤔 Discussion Points:
- What happens to events at window boundaries?
- How does Spark know which window an event belongs to?
- Memory implications of large windows?
- Why use sliding windows over tumbling?
Example 7: Watermarking - Handling Late Data
Concept: The Late Data Problem
Real-World Scenario:
Mobile app sends event: "Purchase at 10:00 AM"
Network delay...
Event arrives at server: "10:05 AM"
Question: Which 5-minute window does this belong to?
- 10:00-10:05 window? (Event time)
- 10:05-10:10 window? (Arrival time)
Two Types of Time:
Event Time (when it happened)
- More accurate
- Can be late
- Example: Transaction timestamp
Processing Time (when Spark sees it)
- Always current
- No late data problem
- Example: Log ingestion time
The Problem with Late Data:
Without Watermark:
Window [10:00-10:05] State:
- Batch 1: 100 events → Output: count=100
- Batch 2: 5 events → Output: count=105
- Batch 3: 2 events → Output: count=107
- Batch 100: 1 event → Output: count=500
- (State kept forever! Memory grows unbounded!)
With Watermark (10 minutes):
Window [10:00-10:05] State:
- Batch at 10:08: accept late data (within 10 min)
- Batch at 10:16: DROP late data (> 10 min late)
- Batch at 10:20: DISCARD window state (save memory)
Watermark: The Solution
A watermark tells Spark: "I won't accept events older than X"
df.withWatermark("event_time", "10 minutes")
# Translation: "Events more than 10 minutes late will be dropped"
How It Works:
Current Max Event Time:  10:30:00
Watermark (10 min):      10:20:00
                             ↓
Events with timestamp <  10:20:00 → DROPPED
Events with timestamp >= 10:20:00 → PROCESSED
Benefits:
- ✅ Handles reasonable late data
- ✅ Bounds memory usage
- ✅ Enables state cleanup
- ✅ Required for production streaming
🎯 What We'll Learn:
- Setting watermarks
- Late data tolerance
- State cleanup
- Memory management
💡 Why This Matters: Without watermarks, streaming applications eventually run out of memory! This is critical for production systems.
Run the Example:
python 07_watermarking.py
What You'll See:
Window [10:30:00 - 10:30:30]
+------------------+------------------+---------+--------+-------------+---------+
|window_start |window_end |sensor_id|avg_temp|reading_count|max_delay|
+------------------+------------------+---------+--------+-------------+---------+
|2025-01-15 10:30:00|2025-01-15 10:30:30|SENSOR-0 |25.3 |8 |18 |
|2025-01-15 10:30:00|2025-01-15 10:30:30|SENSOR-1 |27.1 |7 |17 |
|2025-01-15 10:30:00|2025-01-15 10:30:30|SENSOR-2 |23.8 |9 |19 |
...
Notice the max_delay column - shows how late data arrived (in seconds).
Code Walkthrough:
# Simulate events with delays (some arrive late)
df_with_events = (
    streaming_df
    .withColumn("sensor_id", ...)
    .withColumn("delay_seconds", (rand() * 30).cast("int"))   # 0-30 sec delay
    .withColumn("event_time",                                  # subtract the delay to get the original event time
                (col("timestamp").cast("long") - col("delay_seconds")).cast("timestamp"))
)
# event_time = when it happened
# timestamp  = when Spark sees it
# Apply a watermark: tolerate 20 seconds of lateness
windowed_avg = (
    df_with_events
    .withWatermark("event_time", "20 seconds")     # CRITICAL LINE!
    .groupBy(
        window(col("event_time"), "30 seconds"),   # Use event_time, not timestamp
        col("sensor_id")
    )
    .agg(...)
)
Watermark Trade-offs:
| Watermark Size | Late Data Handling | Memory Usage | State Cleanup |
|---|---|---|---|
| Small (1 min) | Drops more data | Low | Fast |
| Medium (10 min) | Balanced | Medium | Moderate |
| Large (1 hour) | Accepts most data | High | Slow |
Choosing Watermark:
Analysis of your data shows:
- 90% of events arrive within 5 minutes
- 95% arrive within 10 minutes
- 99% arrive within 30 minutes
Conservative choice: 30 minutes (catch 99%)
Balanced choice: 10 minutes (catch 95%, faster cleanup)
Aggressive choice: 5 minutes (catch 90%, minimal state)
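If you have historical data with both an event timestamp and an ingestion timestamp, a quick batch job can estimate those lateness percentiles for you. A sketch under the assumption that your data has columns named event_time and ingest_time and lives at the path shown:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.appName("lateness-percentiles").getOrCreate()
# Historical events with both timestamps (path and column names are assumptions)
events = spark.read.parquet("data/historical_events")
lateness = events.withColumn(
    "lateness_seconds",
    col("ingest_time").cast("long") - col("event_time").cast("long")
)
# Approximate 90th / 95th / 99th percentile lateness, in seconds
p90, p95, p99 = lateness.approxQuantile("lateness_seconds", [0.9, 0.95, 0.99], 0.01)
print(f"p90={p90}s  p95={p95}s  p99={p99}s  -> pick a watermark at or above p95")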
🤔 Discussion Points:
- What happens to events beyond the watermark?
- How to choose watermark duration?
- Can watermark be too small?
- Impact on exactly-once processing?
Part 5: Real-World Example - E-commerce Analytics
Concept: Production Streaming Architecture
Real-World E-commerce Platform:
User Actions                 Streaming Pipeline                 Outputs

View Product  --+
Add to Cart   --+--> [File Stream]
Purchase      --+         |
                     [Transform]
                          |
                  [Windows + Watermark]
                          |
                    [Aggregations]
                          |
                   [Multiple Sinks]
                    |     |     |
                    |     |     +--> [Query 1: Revenue]  --> Dashboard
                    |     +--------> [Query 2: Funnel]   --> Alerts
                    +--------------> [Query 3: Products] --> ML Model
Business Requirements:
- Revenue Tracking: How much money are we making right now? (5-min windows)
- Conversion Funnel: How many views → carts → purchases? (2-min windows)
- Trending Products: What's hot right now? (real-time, no windows)
Technical Requirements:
- Multiple queries from single source (efficient!)
- Different window sizes per use case
- Memory table for SQL queries (for dashboard)
- File output for downstream ML
- Console for monitoring
Example 8: Complete E-commerce Streaming Analytics
🎯 What We'll Learn:
- Multiple streaming queries simultaneously
- Different window sizes for different use cases
- Combining real-time and windowed aggregations
- Production patterns and best practices
💡 Why This Matters: This is what real production streaming looks like! Multiple queries, different requirements, coordinated processing.
Setup:
If you generated with Option 2, you have 5 sample e-commerce event files ready to go!
Run the Example:
python 08_ecommerce_streaming.py
If you have sample data:
Found 5 sample event files - processing now!
To generate more data (optional), open second terminal:
python generate_ecommerce_events.py
What You'll See:
Query 1 - Revenue by Category (5-minute windows):
+------------------+------------------+------------+--------+--------------+---------+
|window_start |window_end |category |revenue |purchase_count|avg_price|
+------------------+------------------+------------+--------+--------------+---------+
|2025-01-15 10:30:00|2025-01-15 10:35:00|Electronics|5234.89 |12 |436.24 |
|2025-01-15 10:30:00|2025-01-15 10:35:00|Sports |2145.67 |18 |119.20 |
|2025-01-15 10:30:00|2025-01-15 10:35:00|Home |1876.23 |15 |125.08 |
|2025-01-15 10:30:00|2025-01-15 10:35:00|Fashion |987.45 |8 |123.43 |
+------------------+------------------+------------+--------+--------------+---------+
Query 2 - Conversion Funnel (2-minute windows):
CONVERSION FUNNEL
+----------+-------------+
|event_type|total_events |
+----------+-------------+
|view |1247 | ← Top of funnel
|cart |423 | ← 33.9% conversion
|purchase |187 | ← 44.2% of carts convert
+----------+-------------+
Insights:
- View → Cart: 33.9%
- Cart → Purchase: 44.2%
- Overall: 15.0% buy what they view
Query 3 - Top Products (written to Parquet):
Written to: output/top_products/
- Laptop Pro 15: 45 interactions
- Smart Watch: 38 interactions
- Running Shoes: 32 interactions
Code Architecture:
# Single source
events_stream = spark.readStream \
.schema(schema) \
.format("json") \
.load("data/ecommerce_events")
# Query 1: Revenue (5-min windows)
revenue_by_category = events_stream \
.filter(col("event_type") == "purchase") \
.withWatermark("timestamp", "10 minutes") \
.groupBy(
window(col("timestamp"), "5 minutes"), # Business requirement
col("category")
).agg(...)
query1 = revenue_by_category.writeStream \
.outputMode("update") \
.format("console") \
.start()
# Query 2: Funnel (2-min windows)
conversion_funnel = events_stream \
    .groupBy(
        window(col("timestamp"), "2 minutes"),   # Faster updates
        col("event_type")
    ).count()
query2 = conversion_funnel.writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("conversion_funnel") \
    .start()   # In-memory table, queryable with spark.sql!
# Query 3: Top Products (no windows - cumulative counts)
top_products = events_stream \
    .filter(col("event_type").isin("view", "purchase")) \
    .groupBy("product_name", "event_type") \
    .count()
# File sinks only support append mode, so write each full snapshot via foreachBatch
def save_top_products(batch_df, batch_id):
    batch_df.write.mode("overwrite").parquet("output/top_products")   # For downstream ML
query3 = top_products.writeStream \
    .outputMode("complete") \
    .foreachBatch(save_top_products) \
    .option("checkpointLocation", "checkpoints/top_products") \
    .start()
# All queries run simultaneously!
Real-Time SQL Queries:
While streaming, you can query the in-memory table:
spark.sql("""
    SELECT
        event_type,
        SUM(`count`) AS total_events,
        ROUND(SUM(`count`) * 100.0 / SUM(SUM(`count`)) OVER (), 2) AS percentage
    FROM conversion_funnel
    GROUP BY event_type
    ORDER BY
        CASE event_type
            WHEN 'view' THEN 1
            WHEN 'cart' THEN 2
            WHEN 'purchase' THEN 3
        END
""").show()
Production Considerations:
Trigger Intervals:
.trigger(processingTime="10 seconds")   # Process every 10 seconds
- Balance: latency vs throughput
- Longer triggers = more efficient
- Shorter triggers = faster results
Checkpointing:
.option("checkpointLocation", "checkpoints/top_products")
- Must be unique per query
- Enables exactly-once processing
- Allows restart from failure
Memory Management:
- Watermarks clean up old state
- Update mode is more efficient than complete
- Monitor the Spark UI for state size
🎯 Business Impact:
From this streaming pipeline, the business gets:
Real-Time Revenue Dashboard:
- Which categories are performing best RIGHT NOW
- Update every 5 minutes
- Alert if revenue drops suddenly
Conversion Optimization:
- Where are users dropping off?
- A/B test results in real-time
- Immediate feedback on changes
Inventory Management:
- What's trending RIGHT NOW
- Auto-reorder popular products
- Prevent stockouts during spikes
ML Model Input:
- Feed fresh data to recommendation engine
- Update click-through predictions
- Personalize in real-time
🤔 Discussion Points:
- Why multiple queries vs one big query?
- How to handle query failures?
- When to use complete vs update mode?
- How to monitor query performance?
Summary & Best Practices
What You've Learned
Core Concepts:
- ✅ Streaming vs batch processing
- ✅ Structured Streaming mental model
- ✅ Output modes: Append, Complete, Update
- ✅ Multiple data sources: Rate, File, Socket
- ✅ Multiple sinks: Console, Memory, File
Advanced Topics:
- ✅ Time-based windowing (tumbling, sliding)
- ✅ Watermarking for late data
- ✅ State management and memory
- ✅ Multi-query streaming architecture
- ✅ Production patterns and fault tolerance
Quick Reference
Reading Streams:
# File Stream
df = spark.readStream \
.format("json") \
.schema(schema) \
.option("maxFilesPerTrigger", 1) \
.load("path/")
# Socket Stream
df = spark.readStream \
.format("socket") \
.option("host", "localhost") \
.option("port", 9999) \
.load()
# Rate Stream (testing)
df = spark.readStream \
.format("rate") \
.option("rowsPerSecond", 10) \
.load()
Writing Streams:
query = df.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("checkpointLocation", "checkpoints/") \
    .trigger(processingTime="10 seconds") \
    .start()
# outputMode: "append", "complete", or "update"
# format: "console", "parquet", "memory", "kafka", ...
query.awaitTermination()
query.stop()
Windowing:
# Tumbling
window(col("timestamp"), "10 minutes")
# Sliding
window(col("timestamp"), "10 minutes", "5 minutes")
Watermarking:
df.withWatermark("event_time", "10 minutes")
Best Practices for Production
1. Always Use Checkpoints
.option("checkpointLocation", "s3://bucket/checkpoints/query1")
- Enables fault tolerance
- Allows exactly-once processing
- Must be unique per query
2. Choose Appropriate Trigger Intervals
.trigger(processingTime="30 seconds") # vs "1 second"
- Balance latency vs efficiency
- Consider: 5-30 seconds for most apps
3. Set Watermarks for Aggregations
df.withWatermark("timestamp", "1 hour")
- Prevents memory growth
- Enables state cleanup
- Critical for production
4. Monitor Your Streaming Queries
spark.streams.active # List queries
query.status # Query health
query.lastProgress # Performance metrics
- Check Spark UI: http://localhost:4040
- Set up alerts for failures
- Monitor state store size
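A sketch of a tiny monitoring loop built on those same calls (the progress fields shown are commonly available ones; exact keys can vary by Spark version, so treat them as assumptions):
import time
# Poll every active query and print a few headline metrics
while spark.streams.active:
    for q in spark.streams.active:
        progress = q.lastProgress          # dict describing the last micro-batch, or None before the first one
        if progress:
            print(
                q.name,
                "batch", progress.get("batchId"),
                "rows", progress.get("numInputRows"),
                "rows/sec", progress.get("processedRowsPerSecond"),
            )
    time.sleep(30)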
5. Use Appropriate Output Modes
Non-aggregations → append
Aggregations with full results → complete
Aggregations with updates → update
6. Handle Schema Evolution
.option("mergeSchema", "true") # For evolving schemas
7. Partition Output Data
.partitionBy("date", "hour") # For file sinks
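Putting checkpoints, triggers, and partitioning together, a sketch of a partitioned Parquet file sink (the rate source and the output paths are illustrative assumptions):
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, hour, col
spark = SparkSession.builder.appName("partitioned-sink-sketch").getOrCreate()
streaming_df = spark.readStream.format("rate").option("rowsPerSecond", 10).load()
# Derive partition columns from the timestamp
enriched = streaming_df \
    .withColumn("date", to_date(col("timestamp"))) \
    .withColumn("hour", hour(col("timestamp")))
query = enriched.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path", "output/events") \
    .option("checkpointLocation", "checkpoints/events") \
    .partitionBy("date", "hour") \
    .trigger(processingTime="30 seconds") \
    .start()
query.awaitTermination(timeout=90)
query.stop()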
8. Set Realistic Watermarks
Analyze your data first:
- 90th percentile latency → minimum watermark
- 99th percentile latency → recommended
- Business SLA → maximum
Next Steps
Immediate Practice:
- Modify examples with your own data
- Experiment with different window sizes
- Try writing to different sinks
- Combine multiple sources
Advanced Topics to Explore:
- Kafka Integration: Connect to real message queues
- Stream-Stream Joins: Join two streams together
- Stream-Static Joins: Enrich streams with batch data
- Arbitrary Stateful Processing: Custom state management
- Foreach Sink: Custom write logic
- Continuous Processing: Ultra-low latency mode
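As a first taste of the Kafka integration, a sketch of a minimal Kafka read (assuming a local broker on localhost:9092, a topic named events, and the Spark Kafka connector package on the classpath - all of which are illustrative assumptions):
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.appName("kafka-read-sketch").getOrCreate()
# Kafka rows arrive as binary key/value; cast value to a string for processing
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "events") \
    .option("startingOffsets", "latest") \
    .load() \
    .select(col("value").cast("string").alias("raw_event"))
query = kafka_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("checkpointLocation", "checkpoints/kafka_events") \
    .start()
query.awaitTermination()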
Production Readiness:
- Set up monitoring (Datadog, Grafana)
- Configure alerts (PagerDuty, Slack)
- Implement error handling
- Test failure scenarios
- Document runbooks
🎯 Common Use Cases Summary
| Use Case | Source | Window | Watermark | Output |
|---|---|---|---|---|
| Log Monitoring | File | None | N/A | Kafka + Console |
| IoT Sensors | Kafka | 5 min tumbling | 10 min | Parquet + Alerts |
| Fraud Detection | Kafka | None | N/A | Database + Alerts |
| Clickstream | File | 1 hour sliding (10 min) | 30 min | Parquet + Memory |
| Social Media | Kafka | 2 min tumbling | 5 min | Kafka + Dashboard |
Additional Resources
Official Documentation:
Community Resources:
- Stack Overflow: [spark-structured-streaming] tag
- Spark User Mailing List
- Databricks Community Forums
Practice Datasets:
- Kaggle: Real-time datasets
- GitHub: Sample streaming projects
- AWS Public Datasets
💬 Feedback & Support
Questions?
- Review examples again
- Check Spark UI (localhost:4040)
- Read error messages carefully
- Use spark.sparkContext.setLogLevel("DEBUG")
Troubleshooting:
- Port already in use → change the port number
- Out of memory → reduce rowsPerSecond, add watermarks
- No data appearing → check directory paths
- Query fails → check schema, output mode, watermark
Congratulations!
You've completed the Spark Structured Streaming tutorial! You now have:
- ✅ Solid understanding of streaming concepts
- ✅ Hands-on experience with multiple sources
- ✅ Knowledge of windowing and watermarking
- ✅ Real-world production patterns
- ✅ Foundation to build streaming applications
Happy Streaming!