Spark Structured Streaming
Spark Structured Streaming Real-time Stream Processing Kafka Window Watermark State Management Production Deployment
| Feature | Structured Streaming | Kafka Streams | Flink |
|---|---|---|---|
| Processing | Micro-batch / Continuous | Record-by-record | Record-by-record |
| Latency | 100ms - seconds | ms | ms |
| Exactly-once | Yes (with checkpoint) | Yes | Yes |
| State | Built-in + RocksDB | Built-in + RocksDB | Built-in + RocksDB |
| SQL Support | Full SQL | KSQL (separate) | Flink SQL |
| Best For | Batch + Stream unified | Kafka-native apps | Low-latency stream |
Core Concepts
# === Structured Streaming Basics ===
# Read from Kafka
# df = spark.readStream \
# .format("kafka") \
# .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092") \
# .option("subscribe", "events") \
# .option("startingOffsets", "latest") \
# .load()
#
# # Parse JSON payload
# from pyspark.sql.functions import from_json, col, window, count, sum, avg
# from pyspark.sql.types import StructType, StringType, TimestampType, DoubleType
#
# schema = StructType() \
# .add("user_id", StringType()) \
# .add("event_type", StringType()) \
# .add("amount", DoubleType()) \
# .add("timestamp", TimestampType())
#
# parsed = df.select(
# from_json(col("value").cast("string"), schema).alias("data")
# ).select("data.*")
#
# # Tumbling Window aggregation
# windowed = parsed \
# .withWatermark("timestamp", "10 minutes") \
# .groupBy(
# window(col("timestamp"), "5 minutes"),
# col("event_type")
# ).agg(
# count("*").alias("event_count"),
# sum("amount").alias("total_amount"),
# avg("amount").alias("avg_amount")
# )
#
# # Write to Delta Lake
# query = windowed.writeStream \
# .format("delta") \
# .outputMode("append") \
# .option("checkpointLocation", "/checkpoint/events") \
# .trigger(processingTime="30 seconds") \
# .start("/data/event_metrics")
from dataclasses import dataclass


@dataclass
class StreamConcept:
    """A core Structured Streaming building block and how it is used."""

    concept: str      # name of the concept (e.g. "readStream")
    description: str  # short description of what it does
    api: str          # representative API snippet
    use_case: str     # typical scenario where it applies


# (concept, description, api, use_case) rows, unpacked into dataclass instances.
_CONCEPT_ROWS = [
    ("readStream",
     "อ่านข้อมูลแบบ Streaming จาก Source",
     "spark.readStream.format('kafka').load()",
     "อ่าน Event จาก Kafka Topic"),
    ("writeStream",
     "เขียนข้อมูลแบบ Streaming ไป Sink",
     "df.writeStream.format('delta').start()",
     "เขียนผลลัพธ์ไป Delta Lake"),
    ("Trigger",
     "กำหนดความถี่ในการประมวลผล Micro-batch",
     "trigger(processingTime='30 seconds')",
     "ควบคุม Latency vs Throughput"),
    ("Checkpoint",
     "เก็บ State สำหรับ Recovery เมื่อ Restart",
     "option('checkpointLocation', '/path')",
     "Exactly-once guarantee, Fault tolerance"),
    ("Output Mode",
     "กำหนดว่าเขียนข้อมูลอย่างไร",
     "outputMode('append'|'complete'|'update')",
     "append = new rows, complete = all, update = changed"),
]
concepts = [StreamConcept(*row) for row in _CONCEPT_ROWS]

print("=== Core Concepts ===")
for entry in concepts:
    print(f" [{entry.concept}] {entry.description}")
    print(f" API: {entry.api}")
    print(f" Use: {entry.use_case}")
Window and Watermark
# === Window Functions ===
# Tumbling Window (ไม่ซ้อน)
# .groupBy(window(col("ts"), "5 minutes"))
# |--5min--|--5min--|--5min--|
#
# Sliding Window (ซ้อนกัน)
# .groupBy(window(col("ts"), "10 minutes", "5 minutes"))
# |----10min----|
# |----10min----|
# |----10min----|
#
# Session Window (ตาม Gap)
# .groupBy(session_window(col("ts"), "10 minutes"))
# |--events--gap>10min--|--events--gap>10min--|
# Watermark example
# df.withWatermark("event_time", "10 minutes") \
# .groupBy(window("event_time", "5 minutes")) \
# .count()
#
# Timeline:
# Max event_time seen = 12:30
# Watermark = 12:30 - 10min = 12:20
# Window [12:10-12:15] CLOSED  (window end 12:15 <  watermark 12:20)
# Window [12:15-12:20] CLOSING (window end 12:20 == watermark 12:20)
# Window [12:20-12:25] OPEN    (window end 12:25 >  watermark 12:20)
# Late data with event_time < 12:20 will be DROPPED
@dataclass
class WindowType:
    """One streaming window strategy and its state-size tradeoff."""

    window_type: str  # window family name
    syntax: str       # example groupBy syntax
    overlap: str      # whether/how windows overlap
    use_case: str     # typical analytics scenario
    state_size: str   # relative state-store footprint


# (window_type, syntax, overlap, use_case, state_size) rows.
_WINDOW_ROWS = [
    ("Tumbling",
     "window(col('ts'), '5 minutes')",
     "ไม่ซ้อนกัน ทุก Event อยู่ใน 1 Window",
     "Metrics ทุก 5 นาที, Hourly Report",
     "เล็ก — 1 Window ต่อช่วงเวลา"),
    ("Sliding",
     "window(col('ts'), '10 min', '5 min')",
     "ซ้อนกัน 1 Event อยู่ได้หลาย Window",
     "Moving Average, Rolling Count",
     "กลาง — หลาย Window overlap"),
    ("Session",
     "session_window(col('ts'), '10 min')",
     "ไม่ซ้อน แต่ขนาดไม่เท่ากัน ตาม Gap",
     "User Session Analysis, Clickstream",
     "ใหญ่ — ไม่รู้จำนวน Session ล่วงหน้า"),
]
windows = [WindowType(*row) for row in _WINDOW_ROWS]

print("=== Window Types ===")
for win in windows:
    print(f" [{win.window_type}] {win.syntax}")
    print(f" Overlap: {win.overlap}")
    print(f" Use: {win.use_case}")
    print(f" State: {win.state_size}")
Production Operations
# === Production Checklist ===
@dataclass
class ProdConfig:
    """A production-readiness setting and the risk of leaving the default."""

    config: str        # configuration area
    setting: str       # recommended setting
    why: str           # rationale for the recommendation
    default_risk: str  # failure mode if left at default


# (config, setting, why, default_risk) rows.
_PROD_ROWS = [
    ("Checkpoint Location",
     "HDFS/S3 reliable storage, NOT local disk",
     "Recovery after restart, exactly-once guarantee",
     "Local disk = lose state on node failure"),
    ("Watermark Duration",
     "Based on max expected lateness of data",
     "Controls state size and late data handling",
     "No watermark = state grows forever = OOM"),
    ("Trigger Interval",
     "processingTime based on latency requirement",
     "Latency vs throughput tradeoff",
     "Default = as fast as possible = resource waste"),
    ("Max Offsets Per Trigger",
     "maxOffsetsPerTrigger = 100000",
     "Prevent processing huge backlog at once",
     "No limit = OOM when catching up from lag"),
    ("State Store Backend",
     "RocksDB for large state",
     "In-memory default can OOM with large state",
     "Default HashMap = OOM with millions of keys"),
    ("Kafka Consumer Config",
     "max.poll.records, session.timeout.ms",
     "Prevent consumer timeout during heavy processing",
     "Default may timeout on slow processing"),
]
configs = [ProdConfig(*row) for row in _PROD_ROWS]

print("=== Production Config ===")
for cfg in configs:
    print(f" [{cfg.config}]")
    print(f" Setting: {cfg.setting}")
    print(f" Why: {cfg.why}")
    print(f" Risk if default: {cfg.default_risk}")
เคล็ดลับ
- Watermark: ตั้ง Watermark เสมอ ป้องกัน State โตไม่จำกัด
- Checkpoint: ใช้ S3/HDFS สำหรับ Checkpoint ไม่ใช้ Local Disk
- RocksDB: ใช้ RocksDB State Store สำหรับ State ขนาดใหญ่
- Monitor: ดู Processing Time vs Trigger Interval ถ้าเกิน = Lag
- Delta: ใช้ Delta Lake เป็น Sink สำหรับ ACID + Time Travel
การนำความรู้ไปประยุกต์ใช้งานจริง
แหล่งเรียนรู้ที่แนะนำ ได้แก่ Official Documentation ซึ่งอัปเดตล่าสุดเสมอ, Online Course จาก Coursera, Udemy, edX, ช่อง YouTube คุณภาพทั้งภาษาไทยและภาษาอังกฤษ และ Community อย่าง Discord, Reddit, Stack Overflow ที่ช่วยให้แลกเปลี่ยนประสบการณ์กับนักพัฒนาทั่วโลก
Structured Streaming คืออะไร
Spark SQL Stream Processing Micro-batch Continuous Exactly-once Kafka Delta Lake DataFrame API Catalyst Optimizer Event-time
Window Functions ใช้อย่างไร
Tumbling 5 นาทีไม่ซ้อน Sliding 10 นาที Slide 5 นาทีซ้อน Session Gap 10 นาที groupBy window session_window Metrics Average
Watermark ทำงานอย่างไร
Late Data จัดการข้อมูลสาย withWatermark timestamp duration State ไม่โต Memory ตัดข้อมูลเกิน Window ค่ามากใช้ Memory น้อยตัดข้อมูล
Production Deployment ทำอย่างไร
Checkpoint S3 HDFS Trigger Interval Kafka Source RocksDB State Monitor Spark UI Prometheus Alert Lag Delta Lake ACID Scale K8s YARN
สรุป
Spark Structured Streaming Kafka Window Tumbling Sliding Session Watermark State Checkpoint RocksDB Delta Lake Production Monitor
