Spark Streaming Troubleshooting
A troubleshooting guide for Spark Structured Streaming: out-of-memory errors, checkpoints, watermarks, performance, Kafka, the state store, AQE, and partition skew.
| Problem | Cause | Fix | Priority |
|---|---|---|---|
| OOM (Out of Memory) | Large state store / shuffle / large batches | Watermark + RocksDB + AQE | Critical |
| Slow Batch | Partition skew / AQE disabled / low parallelism | More partitions + AQE + broadcast join | High |
| Checkpoint Corrupt | Disk full / schema change / bug | Delete checkpoint + idempotent sink | High |
| Late Data Loss | Watermark drops data too aggressively | Increase watermark duration | Medium |
| Data Duplication | Checkpoint recovery / at-least-once delivery | Idempotent sink (upsert) | Medium |
| Kafka Lag | Processing slower than ingestion | Scale executors + maxOffsetsPerTrigger | High |
Memory & State Issues
# === Spark Structured Streaming Memory Fix ===
# spark-submit configuration
# spark-submit \
# --master yarn \
# --deploy-mode cluster \
# --num-executors 10 \
# --executor-memory 8g \
# --executor-cores 4 \
# --driver-memory 4g \
# --conf spark.sql.shuffle.partitions=500 \
# --conf spark.sql.adaptive.enabled=true \
# --conf spark.sql.adaptive.coalescePartitions.enabled=true \
# --conf spark.sql.streaming.stateStore.providerClass=org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider \
# --conf spark.memory.offHeap.enabled=true \
# --conf spark.memory.offHeap.size=4g \
# --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
# streaming_app.py
# Watermark + State Cleanup
# from pyspark.sql.functions import window, count
# df = spark.readStream \
# .format("kafka") \
# .option("kafka.bootstrap.servers", "kafka:9092") \
# .option("subscribe", "events") \
# .option("maxOffsetsPerTrigger", 100000) \
# .load()
#
# result = df \
# .withWatermark("event_time", "1 hour") \
# .groupBy(
# window("event_time", "5 minutes"),
# "user_id"
# ) \
# .agg(count("*").alias("event_count"))
from dataclasses import dataclass
@dataclass
class MemoryFix:
problem: str
symptom: str
config: str
value: str
effect: str
fixes = [
    MemoryFix("State Store OOM",
              "java.lang.OutOfMemoryError: Java heap space",
              "withWatermark('event_time', '1 hour')",
              "Set a watermark so state older than 1 hour is evicted",
              "State size drops 80-90%"),
    MemoryFix("State Store OOM (Large)",
              "State size > executor memory",
              "stateStore.providerClass=RocksDBStateStoreProvider",
              "Use RocksDB to keep state on disk instead of in heap memory",
              "Handles TB-scale state"),
    MemoryFix("Shuffle OOM",
              "OOM during groupBy / join",
              "spark.sql.shuffle.partitions=500",
              "More partitions means less data per partition",
              "Lower memory per task"),
    MemoryFix("Large Batch OOM",
              "OOM when a large backlog has accumulated",
              "maxOffsetsPerTrigger=100000",
              "Cap the amount of data pulled per batch",
              "Consistent batch sizes"),
    MemoryFix("AQE Not Enabled",
              "Partition skew makes tasks slow or OOM",
              "spark.sql.adaptive.enabled=true",
              "AQE rebalances partitions automatically",
              "Less skew, and tiny partitions get coalesced"),
]
print("=== Memory Fixes ===")
for f in fixes:
print(f"\n [{f.problem}]")
print(f" Symptom: {f.symptom}")
print(f" Config: {f.config}")
print(f" Value: {f.value}")
print(f" Effect: {f.effect}")
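The watermark mechanics behind the first two fixes can be sketched without Spark: the engine tracks the maximum event time seen, subtracts the allowed lateness, and evicts any windowed state older than that cutoff. This is a minimal illustrative model, not a Spark API (`Watermark`, `observe`, and `evict` are made-up names):

```python
from datetime import datetime, timedelta

# Illustrative model of watermark-based state eviction (not a Spark API).
class Watermark:
    def __init__(self, delay: timedelta):
        self.delay = delay
        self.max_event_time = datetime.min

    def observe(self, event_time: datetime) -> None:
        # The watermark only moves forward, trailing the max event time seen.
        self.max_event_time = max(self.max_event_time, event_time)

    @property
    def cutoff(self) -> datetime:
        return self.max_event_time - self.delay

    def evict(self, state: dict) -> dict:
        # Drop window state older than the cutoff -- this is what keeps
        # the state store from growing without bound.
        return {w: v for w, v in state.items() if w >= self.cutoff}

wm = Watermark(timedelta(hours=1))
state = {}
for ts in [datetime(2024, 1, 1, 10, 0),
           datetime(2024, 1, 1, 11, 30),
           datetime(2024, 1, 1, 12, 0)]:
    wm.observe(ts)
    state[ts] = state.get(ts, 0) + 1
state = wm.evict(state)
print(sorted(state))  # the 10:00 window is gone; 11:30 and 12:00 survive
```

With the max event time at 12:00 and a 1-hour delay, the cutoff is 11:00, so only the 10:00 window is evicted; this mirrors why a tighter watermark shrinks state but risks dropping genuinely late data.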
Checkpoint & Recovery
# === Checkpoint Management ===
# Checkpoint Configuration
# query = result.writeStream \
# .format("parquet") \
# .option("checkpointLocation", "s3://bucket/checkpoints/app-v1") \
# .option("path", "s3://bucket/output/") \
# .trigger(processingTime="30 seconds") \
# .outputMode("append") \
# .start()
# Idempotent Sink (Upsert to Database)
# def upsert_to_db(batch_df, batch_id):
# batch_df.write \
# .format("jdbc") \
# .option("url", "jdbc:postgresql://db:5432/analytics") \
# .option("dbtable", "events") \
# .option("user", "app") \
# .option("password", "secret") \
# .mode("append") \
# .save()
# # Use ON CONFLICT DO UPDATE for true idempotency
#
# query = result.writeStream \
# .foreachBatch(upsert_to_db) \
# .option("checkpointLocation", "s3://bucket/checkpoints/db-sink") \
# .start()
@dataclass
class CheckpointIssue:
issue: str
cause: str
fix: str
prevention: str
checkpoint_issues = [
    CheckpointIssue("Checkpoint Corruption",
                    "Disk full or a network error while writing the checkpoint",
                    "Delete the checkpoint directory, restart, rely on an idempotent sink",
                    "Use S3/HDFS rather than local disk + monitor disk usage"),
    CheckpointIssue("Schema Change Break",
                    "Schema changed, so the old checkpoint is incompatible",
                    "Delete the checkpoint + set the Kafka offset to earliest/latest",
                    "Use a schema registry + compatible schema evolution"),
    CheckpointIssue("Checkpoint Size Too Large",
                    "State store holds millions of keys and has no watermark",
                    "Add a watermark to shrink state + switch to RocksDB",
                    "Set a watermark on every stateful query"),
    CheckpointIssue("Slow Recovery",
                    "Large checkpoint makes restarting the stream very slow",
                    "Lower minBatchesToRetain (e.g. to 10) to keep less history",
                    "Set a state TTL + compact the checkpoint periodically"),
]
print("=== Checkpoint Issues ===")
for c in checkpoint_issues:
print(f" [{c.issue}]")
print(f" Cause: {c.cause}")
print(f" Fix: {c.fix}")
print(f" Prevent: {c.prevention}")
Performance Tuning
# === Performance Tuning Checklist ===
@dataclass
class TuningParam:
category: str
param: str
default_val: str
recommended: str
impact: str
tuning = [
TuningParam("Parallelism",
"spark.sql.shuffle.partitions",
"200",
"2-3x Kafka Partitions (e.g. 500)",
                "Less skew, full use of cores"),
TuningParam("AQE",
"spark.sql.adaptive.enabled",
"false (Spark 3.0) / true (3.2+)",
"true",
"Auto-coalesce + Skew join + Optimize"),
TuningParam("Batch Size",
"maxOffsetsPerTrigger",
"Unlimited",
"100000-500000",
                "Consistent batch sizes, no OOM"),
TuningParam("Trigger",
"trigger(processingTime=...)",
"ASAP (0ms)",
"10-60 seconds",
"Balance Latency vs Throughput"),
TuningParam("Serializer",
"spark.serializer",
"JavaSerializer",
"KryoSerializer",
                "Cuts serialization time by 30-50%"),
TuningParam("Join",
"spark.sql.autoBroadcastJoinThreshold",
"10MB",
"50-100MB",
                "Broadcast join avoids a shuffle"),
TuningParam("State Store",
"stateStore.providerClass",
"HDFSBackedStateStoreProvider",
"RocksDBStateStoreProvider",
                "Handles very large state"),
TuningParam("Compression",
"spark.sql.streaming.stateStore.compression.codec",
"none",
"lz4",
                "Cuts checkpoint size by 40-60%"),
]
print("=== Performance Tuning ===")
for t in tuning:
print(f" [{t.category}] {t.param}")
print(f" Default: {t.default_val} → Recommended: {t.recommended}")
print(f" Impact: {t.impact}")
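The "2-3x Kafka partitions" rule of thumb from the table can be made concrete with a small helper. The function name and the rounding to a multiple of total cores are illustrative assumptions, not Spark behavior:

```python
def recommended_shuffle_partitions(kafka_partitions: int,
                                   total_cores: int,
                                   factor: int = 2) -> int:
    # Rule of thumb: 2-3x the Kafka partition count, rounded up to a
    # multiple of total cores so no core sits idle in the final wave
    # of shuffle tasks.
    base = kafka_partitions * factor
    waves = -(-base // total_cores)  # ceiling division
    return waves * total_cores

# 100 Kafka partitions, 10 executors x 4 cores = 40 cores
print(recommended_shuffle_partitions(100, 40))            # 200
print(recommended_shuffle_partitions(100, 40, factor=3))  # 320
```

The resulting value would be passed as `spark.sql.shuffle.partitions`; with AQE enabled it serves as an upper bound that auto-coalescing can shrink per batch.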
Tips
- Watermark: set a watermark on every stateful query to prevent state OOM
- RocksDB: use the RocksDB state store for large state
- AQE: enable Adaptive Query Execution to mitigate skew automatically
- Idempotent: use an idempotent sink (upsert) to prevent duplicate records
- Monitor: watch batch duration, processing rate, and state size in the Spark UI
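The monitoring tip boils down to one invariant: if input outpaces processing, lag grows without bound. A sketch of that check (the function and parameter names are made up for illustration; the real numbers come from a StreamingQueryListener or the Spark UI):

```python
def lag_trend(input_rows_per_sec: float,
              processed_rows_per_sec: float,
              trigger_interval_s: float) -> float:
    # Rows of lag accumulated per trigger interval; a positive value means
    # the stream is falling behind and needs more executors or a smaller
    # maxOffsetsPerTrigger cap, a negative value means backlog is draining.
    return (input_rows_per_sec - processed_rows_per_sec) * trigger_interval_s

print(lag_trend(50_000, 40_000, 30))  # falling behind by 300000 rows per trigger
print(lag_trend(50_000, 60_000, 30))  # negative: the backlog is draining
```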
What are the most common problems?
OOM, state store growth, partition skew, slow batches, checkpoint corruption, late data loss, Kafka offset issues, serialization overhead, schema evolution breaks, and backlog.
How do you fix memory issues?
Set a watermark to evict old state, use the disk-based RocksDB state store, raise shuffle.partitions, cap maxOffsetsPerTrigger, enable AQE, turn on off-heap memory, use the Kryo serializer, and increase executor memory.
How do you fix checkpoint issues?
Delete the checkpoint and restart with an idempotent (upsert) sink, keep checkpoints on S3/HDFS, use a schema registry, set watermarks, switch to RocksDB, and lower minBatchesToRetain to speed up recovery.
How do you tune performance?
Tune shuffle.partitions, enable AQE, cap maxOffsetsPerTrigger, pick a trigger interval, use Kryo, prefer broadcast joins, use RocksDB with lz4 compression, and monitor the Spark UI with alerts.
Summary
Troubleshooting Spark Structured Streaming in production comes down to watermarks and RocksDB against OOM, idempotent sinks against checkpoint-recovery duplicates, and AQE plus batch caps to keep up with Kafka.
