Spark Structured Streaming Troubleshooting
A troubleshooting guide for Spark Structured Streaming covering OOM errors, checkpoints, watermarks, performance, Kafka, the state store, AQE, and partition skew.
| Problem | Cause | Fix | Priority |
|---|---|---|---|
| OOM (Out of Memory) | Large state store / shuffle / large batch | Watermark + RocksDB + AQE | Critical |
| Slow Batch | Partition skew / no AQE / low parallelism | More partitions + AQE + broadcast join | High |
| Checkpoint Corrupt | Disk full / schema change / bug | Delete checkpoint + idempotent sink | High |
| Late Data Loss | Watermark drops data too early | Increase watermark duration | Medium |
| Data Duplication | Checkpoint recovery / at-least-once delivery | Idempotent sink (upsert) | Medium |
| Kafka Lag | Processing slower than ingestion | Scale executors + tune maxOffsetsPerTrigger | High |
Memory & State Issues
# === Spark Structured Streaming Memory Fix ===
# spark-submit configuration (shell command, shown commented out)
# spark-submit \
#   --master yarn \
#   --deploy-mode cluster \
#   --num-executors 10 \
#   --executor-memory 8g \
#   --executor-cores 4 \
#   --driver-memory 4g \
#   --conf spark.sql.shuffle.partitions=500 \
#   --conf spark.sql.adaptive.enabled=true \
#   --conf spark.sql.adaptive.coalescePartitions.enabled=true \
#   --conf spark.sql.streaming.stateStore.providerClass=org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider \
#   --conf spark.memory.offHeap.enabled=true \
#   --conf spark.memory.offHeap.size=4g \
#   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
#   streaming_app.py
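# Watermark + State Cleanup
# from pyspark.sql.functions import count, window
#
# df = spark.readStream \
#     .format("kafka") \
#     .option("kafka.bootstrap.servers", "kafka:9092") \
#     .option("subscribe", "events") \
#     .option("maxOffsetsPerTrigger", 100000) \
#     .load()
#
# # The Kafka source only exposes key, value, topic, partition, offset and
# # timestamp columns. This assumes event_time (a timestamp) and user_id
# # have already been parsed out of `value` before the aggregation.
# result = df \
#     .withWatermark("event_time", "1 hour") \
#     .groupBy(
#         window("event_time", "5 minutes"),
#         "user_id"
#     ) \
#     .agg(count("*").alias("event_count"))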
from dataclasses import dataclass


@dataclass
class MemoryFix:
    problem: str
    symptom: str
    config: str
    value: str   # what the setting does
    effect: str


fixes = [
    MemoryFix("State Store OOM",
              "java.lang.OutOfMemoryError: Java heap space",
              "withWatermark('event_time', '1 hour')",
              "Set a watermark so state older than 1 hour is dropped",
              "State size drops sharply (roughly 80-90%, workload-dependent)"),
    MemoryFix("State Store OOM (Large)",
              "State size > executor memory",
              "spark.sql.streaming.stateStore.providerClass=RocksDBStateStoreProvider",
              "Use RocksDB to keep state on local disk instead of the JVM heap",
              "Supports TB-scale state"),
    MemoryFix("Shuffle OOM",
              "OOM during groupBy / join",
              "spark.sql.shuffle.partitions=500",
              "More partitions means less data per partition",
              "Lower memory use per task"),
    MemoryFix("Large Batch OOM",
              "OOM when a large backlog accumulates",
              "maxOffsetsPerTrigger=100000",
              "Cap the number of records read per batch",
              "Consistent batch size"),
    MemoryFix("AQE Not Enabled",
              "Partition skew makes tasks slow or OOM",
              "spark.sql.adaptive.enabled=true",
              "AQE adjusts partitions automatically",
              "Less skew and fewer undersized partitions"),
]

print("=== Memory Fixes ===")
for f in fixes:
    print(f"\n  [{f.problem}]")
    print(f"    Symptom: {f.symptom}")
    print(f"    Config: {f.config}")
    print(f"    Value: {f.value}")
    print(f"    Effect: {f.effect}")
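The effect of a watermark on state size can be illustrated without a cluster. The following is a simplified pure-Python model, not Spark's implementation: it assumes the watermark is recomputed after each micro-batch as the maximum event time seen minus the delay, that events older than the current watermark are dropped, and that a window is evicted once its end is at or before the watermark. The names `run_batches` and `window_start` are hypothetical helpers for this sketch.

```python
from collections import defaultdict

WINDOW_SECONDS = 5 * 60      # 5-minute tumbling windows, as in the query above
WATERMARK_DELAY = 60 * 60    # 1-hour watermark delay


def window_start(event_time: int) -> int:
    """Return the start of the tumbling window that contains event_time."""
    return event_time - (event_time % WINDOW_SECONDS)


def run_batches(batches):
    """Process micro-batches of (event_time, user_id); return state size per batch."""
    state = defaultdict(int)   # (window_start, user_id) -> event count
    max_event_time = 0
    watermark = 0
    sizes = []
    for batch in batches:
        for event_time, user_id in batch:
            if event_time < watermark:
                continue       # later than the watermark allows: dropped
            state[(window_start(event_time), user_id)] += 1
            max_event_time = max(max_event_time, event_time)
        # Advance the watermark after the batch; it never moves backwards.
        watermark = max(watermark, max_event_time - WATERMARK_DELAY)
        # Evict every window that ended at or before the watermark.
        for key in [k for k in state if k[0] + WINDOW_SECONDS <= watermark]:
            del state[key]
        sizes.append(len(state))
    return sizes


demo = [
    [(0, "a"), (300, "a"), (600, "a")],   # three windows open
    [(7200, "a")],                        # watermark jumps to 3600, old windows evicted
    [(100, "a"), (7300, "a")],            # the event at t=100 is too late and is dropped
]
print(run_batches(demo))  # [3, 1, 1]
```

Without the eviction step the state would only grow, which is the unbounded-state pattern behind the State Store OOM entry above. The same model also shows the trade-off behind late data loss: a shorter delay evicts sooner but drops more late events.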
Checkpoint & Recovery
# === Checkpoint Management ===
# Checkpoint Configuration
# query = result.writeStream \
#     .format("parquet") \
#     .option("checkpointLocation", "s3://bucket/checkpoints/app-v1") \
#     .option("path", "s3://bucket/output/") \
#     .trigger(processingTime="30 seconds") \
#     .outputMode("append") \
#     .start()

# Idempotent Sink (Upsert to Database)
# def upsert_to_db(batch_df, batch_id):
#     # NOTE: a plain JDBC append is NOT idempotent by itself; a batch that is
#     # replayed after recovery inserts duplicate rows. For true idempotency,
#     # write to a staging table and merge it with
#     # INSERT ... ON CONFLICT DO UPDATE.
#     batch_df.write \
#         .format("jdbc") \
#         .option("url", "jdbc:postgresql://db:5432/analytics") \
#         .option("dbtable", "events") \
#         .option("user", "app") \
#         .option("password", "secret") \
#         .mode("append") \
#         .save()
#
# query = result.writeStream \
#     .foreachBatch(upsert_to_db) \
#     .option("checkpointLocation", "s3://bucket/checkpoints/db-sink") \
#     .start()
from dataclasses import dataclass


@dataclass
class CheckpointIssue:
    issue: str
    cause: str
    fix: str
    prevention: str


checkpoint_issues = [
    CheckpointIssue("Checkpoint Corruption",
                    "Disk full or network error while writing the checkpoint",
                    "Delete the checkpoint directory and restart; rely on an idempotent sink",
                    "Use S3/HDFS instead of local disk; monitor disk usage"),
    CheckpointIssue("Schema Change Break",
                    "A schema change made the old checkpoint incompatible",
                    "Delete the checkpoint and set the Kafka starting offset to earliest/latest",
                    "Use a Schema Registry with compatible schema evolution"),
    CheckpointIssue("Checkpoint Size Too Large",
                    "State store holds millions of keys and has no watermark",
                    "Add a watermark to shrink state; use RocksDB",
                    "Set a watermark on every stateful query"),
    CheckpointIssue("Slow Recovery",
                    "A large checkpoint makes restarting the stream very slow",
                    "Lower spark.sql.streaming.minBatchesToRetain (default 100) to 10 to keep less history",
                    "Set a state TTL and compact the checkpoint periodically"),
]

print("=== Checkpoint Issues ===")
for c in checkpoint_issues:
    print(f"  [{c.issue}]")
    print(f"    Cause: {c.cause}")
    print(f"    Fix: {c.fix}")
    print(f"    Prevent: {c.prevention}")
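To make the idempotent-sink idea concrete, here is a minimal sketch of an upsert that tolerates a replayed micro-batch. It uses Python's built-in `sqlite3` as a stand-in for PostgreSQL (both accept `INSERT ... ON CONFLICT ... DO UPDATE`; SQLite needs version 3.24 or newer). The table layout, column names, and the `upsert_batch` helper are assumptions made for illustration; a real `foreachBatch` function would run the equivalent statement through a database driver.

```python
import sqlite3


def upsert_batch(conn, rows):
    """Write (window_start, user_id, event_count) rows; a replay overwrites, never duplicates."""
    conn.executemany(
        """
        INSERT INTO events (window_start, user_id, event_count)
        VALUES (?, ?, ?)
        ON CONFLICT (window_start, user_id)
        DO UPDATE SET event_count = excluded.event_count
        """,
        rows,
    )
    conn.commit()


conn = sqlite3.connect(":memory:")
# The primary key is what makes the conflict detectable.
conn.execute(
    "CREATE TABLE events ("
    "window_start TEXT, user_id TEXT, event_count INTEGER, "
    "PRIMARY KEY (window_start, user_id))"
)

batch = [("2024-01-01 00:00", "u1", 3), ("2024-01-01 00:00", "u2", 5)]
upsert_batch(conn, batch)
upsert_batch(conn, batch)   # simulated replay of the same batch after recovery

row_count = conn.execute("SELECT COUNT(*) FROM events").fetchone()[0]
total = conn.execute("SELECT SUM(event_count) FROM events").fetchone()[0]
print(row_count, total)  # 2 8
```

Because the second write hits the same primary keys, the table still holds two rows with the original counts. With a plain append, the replay would have doubled both the row count and the totals.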
Performance Tuning
# === Performance Tuning Checklist ===
from dataclasses import dataclass


@dataclass
class TuningParam:
    category: str
    param: str
    default_val: str
    recommended: str
    impact: str


tuning = [
    TuningParam("Parallelism",
                "spark.sql.shuffle.partitions",
                "200",
                "2-3x Kafka Partitions (e.g. 500)",
                "Reduces skew and keeps all cores busy"),
    TuningParam("AQE",
                "spark.sql.adaptive.enabled",
                "false (Spark 3.0-3.1) / true (3.2+)",
                "true",
                "Auto-coalesce + Skew join + Optimize"),
    TuningParam("Batch Size",
                "maxOffsetsPerTrigger",
                "Unlimited",
                "100000-500000",
                "Consistent batch size, no OOM"),
    TuningParam("Trigger",
                "trigger(processingTime=...)",
                "ASAP (0ms)",
                "10-60 seconds",
                "Balance Latency vs Throughput"),
    TuningParam("Serializer",
                "spark.serializer",
                "JavaSerializer",
                "KryoSerializer",
                "Cuts serialization time, by up to 30-50% in favorable cases"),
    TuningParam("Join",
                "spark.sql.autoBroadcastJoinThreshold",
                "10MB",
                "50-100MB",
                "Broadcast join avoids a shuffle"),
    TuningParam("State Store",
                "spark.sql.streaming.stateStore.providerClass",
                "HDFSBackedStateStoreProvider",
                "RocksDBStateStoreProvider",
                "Supports very large state"),
    TuningParam("Compression",
                "spark.sql.streaming.stateStore.compression.codec",
                "lz4",
                "lz4 (default) or zstd",
                "Smaller state store checkpoint files"),
]

print("=== Performance Tuning ===")
for t in tuning:
    print(f"  [{t.category}] {t.param}")
    print(f"    Default: {t.default_val} → Recommended: {t.recommended}")
    print(f"    Impact: {t.impact}")
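The batch size and trigger settings interact: `maxOffsetsPerTrigger` must be at least the ingestion rate times the trigger interval, or Kafka lag grows without bound. The arithmetic can be sketched as below. Both function names are hypothetical, and the estimate assumes a steady ingestion rate and that every batch finishes within its trigger interval.

```python
import math


def min_offsets_per_trigger(ingest_rate_per_sec: float, trigger_seconds: float) -> int:
    """Smallest maxOffsetsPerTrigger that keeps pace with steady ingestion."""
    return math.ceil(ingest_rate_per_sec * trigger_seconds)


def drain_time_seconds(backlog, max_offsets_per_trigger, ingest_rate_per_sec, trigger_seconds):
    """Estimate seconds needed to clear a backlog, or None if the lag never shrinks."""
    # Records consumed per batch beyond what arrives during that batch.
    surplus_per_batch = max_offsets_per_trigger - ingest_rate_per_sec * trigger_seconds
    if surplus_per_batch <= 0:
        return None
    return math.ceil(backlog / surplus_per_batch) * trigger_seconds


# 2,000 records/s with a 30-second trigger needs at least 60,000 offsets per batch.
print(min_offsets_per_trigger(2000, 30))                   # 60000
# At 100,000 per batch, a 1,000,000-record backlog drains in 25 batches.
print(drain_time_seconds(1_000_000, 100_000, 2000, 30))    # 750
# At 50,000 per batch the stream cannot keep up at all.
print(drain_time_seconds(1_000_000, 50_000, 2000, 30))     # None
```

A cap that is only slightly above the minimum keeps batches small but makes recovery from a backlog slow, so it is worth checking the drain time for a realistic outage before settling on a value.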
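Tips
- Watermark: set a watermark on every stateful query to prevent state-related OOM.
- RocksDB: use the RocksDB state store for large state.
- AQE: enable Adaptive Query Execution to reduce skew automatically. Note that open-source Spark has historically disabled AQE for the streaming query plan itself, so the benefit shows up mainly in batch work done inside foreachBatch.
- Idempotent: use an idempotent sink (upsert) to prevent duplicate data.
- Monitor: watch batch duration, processing rate, and state size in the Spark UI.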
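The monitoring tip can also be automated. The sketch below inspects one progress report in the dictionary form that PySpark's `query.lastProgress` returns, using the `inputRowsPerSecond`, `processedRowsPerSecond`, `durationMs.triggerExecution`, and `stateOperators[].numRowsTotal` fields. The `check_progress` function and its thresholds are assumptions for illustration, not recommended values.

```python
def check_progress(progress: dict, trigger_seconds: float, max_state_rows: int) -> list:
    """Return human-readable warnings for one streaming progress dictionary."""
    warnings = []

    # Rates can be missing or None on the very first batch.
    input_rate = progress.get("inputRowsPerSecond") or 0.0
    processed_rate = progress.get("processedRowsPerSecond") or 0.0
    if input_rate > processed_rate:
        warnings.append("falling behind: input rate exceeds processing rate")

    batch_ms = progress.get("durationMs", {}).get("triggerExecution", 0)
    if batch_ms > trigger_seconds * 1000:
        warnings.append("slow batch: duration exceeds trigger interval")

    state_rows = sum(op.get("numRowsTotal", 0) for op in progress.get("stateOperators", []))
    if state_rows > max_state_rows:
        warnings.append("large state: check watermark and state store")

    return warnings


sample = {
    "inputRowsPerSecond": 5000.0,
    "processedRowsPerSecond": 3000.0,
    "durationMs": {"triggerExecution": 45000},
    "stateOperators": [{"numRowsTotal": 2_000_000}],
}
for warning in check_progress(sample, trigger_seconds=30, max_state_rows=1_000_000):
    print(warning)
```

In a running job this could be called periodically with `query.lastProgress`, after checking that it is not `None`, and the warnings forwarded to whatever alerting is already in place.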
What are the common problems?
The most common problems are state store OOM, partition skew, slow batches, checkpoint corruption, late data, Kafka offset issues, serialization overhead, schema evolution, and backlog.