SiamCafe · Blog
Spark Structured Streaming Troubleshooting
บทความ

Spark Structured Streaming Troubleshooting

เผยแพร่ 28 พฤษภาคม 2569

Spark Streaming Troubleshooting

Spark Structured Streaming Troubleshooting แก้ปัญหา OOM Checkpoint Watermark Performance Kafka State Store AQE Partition Skew

| ปัญหา | สาเหตุ | วิธีแก้ | Priority |
|---|---|---|---|
| OOM (Out of Memory) | State Store ใหญ่ / Shuffle / Large Batch | Watermark + RocksDB + AQE | Critical |
| Slow Batch | Partition Skew / No AQE / Low Parallelism | เพิ่ม Partitions + AQE + Broadcast | High |
| Checkpoint Corrupt | Disk Full / Schema Change / Bug | ลบ Checkpoint + Idempotent Sink | High |
| Late Data Loss | Watermark ตัดข้อมูลเร็วเกินไป | เพิ่ม Watermark Duration | Medium |
| Data Duplication | Checkpoint Recovery / At-least-once | Idempotent Sink (Upsert) | Medium |
| Kafka Lag | Processing ช้ากว่า Ingestion | Scale Executors + maxOffsetsPerTrigger | High |

Memory & State Issues

# === Spark Structured Streaming Memory Fix ===

# spark-submit configuration
# spark-submit \
#   --master yarn \
#   --deploy-mode cluster \
#   --num-executors 10 \
#   --executor-memory 8g \
#   --executor-cores 4 \
#   --driver-memory 4g \
#   --conf spark.sql.shuffle.partitions=500 \
#   --conf spark.sql.adaptive.enabled=true \
#   --conf spark.sql.adaptive.coalescePartitions.enabled=true \
#   --conf spark.sql.streaming.stateStore.providerClass=org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider \
#   --conf spark.memory.offHeap.enabled=true \
#   --conf spark.memory.offHeap.size=4g \
#   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
#   streaming_app.py

# Watermark + State Cleanup
# df = spark.readStream \
#     .format("kafka") \
#     .option("kafka.bootstrap.servers", "kafka:9092") \
#     .option("subscribe", "events") \
#     .option("maxOffsetsPerTrigger", 100000) \
#     .load()
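#
# NOTE: the Kafka source exposes only key, value, topic, partition, offset,
# timestamp and timestampType. Parse `value` (e.g. with from_json) into
# event_time and user_id columns before applying the watermark below.
#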
# result = df \
#     .withWatermark("event_time", "1 hour") \
#     .groupBy(
#         window("event_time", "5 minutes"),
#         "user_id"
#     ) \
#     .agg(count("*").alias("event_count"))

from dataclasses import dataclass

@dataclass
class MemoryFix:
    """One memory-related troubleshooting entry for a streaming job.

    Each field is free-form display text; the report loop in this file
    prints them under the labels Symptom / Config / Value / Effect.
    """

    problem: str  # short name of the failure mode, used as the entry heading
    symptom: str  # error message or observable behaviour
    config: str   # setting or API call that addresses the problem
    value: str    # text printed under the "Value" label
    effect: str   # expected outcome after applying the fix

# Catalogue of memory problems and the setting that addresses each one.
# NOTE(review): Structured Streaming has historically disabled AQE for
# streaming query plans — confirm spark.sql.adaptive.enabled takes effect
# on the target Spark version before relying on the last entry.
fixes = [
    MemoryFix(
        problem="State Store OOM",
        symptom="java.lang.OutOfMemoryError: Java heap space",
        config="withWatermark('event_time', '1 hour')",
        value="ตั้ง Watermark ลบ State เก่ากว่า 1 ชั่วโมง",
        effect="State Size ลดลง 80-90%",
    ),
    MemoryFix(
        problem="State Store OOM (Large)",
        symptom="State size > Executor Memory",
        config="stateStore.providerClass=RocksDBStateStoreProvider",
        value="ใช้ RocksDB เก็บ State บน Disk แทน Memory",
        effect="รองรับ State ขนาด TB",
    ),
    MemoryFix(
        problem="Shuffle OOM",
        symptom="OOM ตอน groupBy / join",
        config="spark.sql.shuffle.partitions=500",
        value="เพิ่ม Partition ลด Data ต่อ Partition",
        effect="ลด Memory ต่อ Task",
    ),
    MemoryFix(
        problem="Large Batch OOM",
        symptom="OOM เมื่อข้อมูลสะสมมาก",
        config="maxOffsetsPerTrigger=100000",
        value="จำกัดข้อมูลต่อ Batch ไม่ให้มากเกินไป",
        effect="Batch Size สม่ำเสมอ",
    ),
    MemoryFix(
        problem="AQE Not Enabled",
        symptom="Partition Skew ทำให้ Task ช้า/OOM",
        config="spark.sql.adaptive.enabled=true",
        value="AQE ปรับ Partition อัตโนมัติ",
        effect="ลด Skew + ลด Partition ที่เล็กเกินไป",
    ),
]

# Print the catalogue: a blank line, the bracketed heading, then one
# indented line per field.
print("=== Memory Fixes ===")
for fix in fixes:
    print(
        f"\n  [{fix.problem}]",
        f"    Symptom: {fix.symptom}",
        f"    Config: {fix.config}",
        f"    Value: {fix.value}",
        f"    Effect: {fix.effect}",
        sep="\n",
    )

Checkpoint & Recovery

# === Checkpoint Management ===

# Checkpoint Configuration
# query = result.writeStream \
#     .format("parquet") \
#     .option("checkpointLocation", "s3://bucket/checkpoints/app-v1") \
#     .option("path", "s3://bucket/output/") \
#     .trigger(processingTime="30 seconds") \
#     .outputMode("append") \
#     .start()

# Idempotent Sink (Upsert to Database)
# def upsert_to_db(batch_df, batch_id):
#     batch_df.write \
#         .format("jdbc") \
#         .option("url", "jdbc:postgresql://db:5432/analytics") \
#         .option("dbtable", "events") \
#         .option("user", "app") \
#         .option("password", "secret") \
#         .mode("append") \
#         .save()
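#     # NOTE: a plain JDBC append is NOT idempotent — a replayed batch inserts
#     # the same rows again. For true idempotency, write to a staging table and
#     # upsert with INSERT ... ON CONFLICT DO UPDATE (or dedupe on batch_id).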
#
# query = result.writeStream \
#     .foreachBatch(upsert_to_db) \
#     .option("checkpointLocation", "s3://bucket/checkpoints/db-sink") \
#     .start()

@dataclass
class CheckpointIssue:
    """One checkpoint-related problem with its remedy.

    All fields are free-form display text; the report loop in this file
    prints them under the labels Cause / Fix / Prevent.
    """

    issue: str       # short name of the problem, used as the entry heading
    cause: str       # what triggers the problem
    fix: str         # how to recover once it has happened
    prevention: str  # how to avoid it in the first place

# Catalogue of checkpoint failure modes, their recovery steps and prevention.
checkpoint_issues = [
    CheckpointIssue(
        issue="Checkpoint Corruption",
        cause="Disk Full, Network Error ขณะเขียน Checkpoint",
        fix="ลบ Checkpoint Directory เริ่มใหม่ + Idempotent Sink",
        prevention="ใช้ S3/HDFS ไม่ใช้ Local Disk + Monitor Disk Usage",
    ),
    CheckpointIssue(
        issue="Schema Change Break",
        cause="เปลี่ยน Schema แล้ว Checkpoint เก่า Incompatible",
        fix="ลบ Checkpoint + ตั้ง Kafka Offset เป็น earliest/latest",
        prevention="ใช้ Schema Registry + Compatible Schema Evolution",
    ),
    CheckpointIssue(
        issue="Checkpoint Size Too Large",
        cause="State Store มี Key ล้านๆ ตัว ไม่มี Watermark",
        fix="เพิ่ม Watermark ลด State + ใช้ RocksDB",
        prevention="ตั้ง Watermark ทุก Stateful Query",
    ),
    CheckpointIssue(
        issue="Slow Recovery",
        cause="Checkpoint ใหญ่ เริ่ม Stream ใหม่ช้ามาก",
        fix="เพิ่ม minBatchesToRetain=10 ลด History",
        prevention="ตั้ง State TTL + Compact Checkpoint เป็นระยะ",
    ),
]

# Print the catalogue: bracketed heading, then one indented line per field.
print("=== Checkpoint Issues ===")
for entry in checkpoint_issues:
    print(
        f"  [{entry.issue}]",
        f"    Cause: {entry.cause}",
        f"    Fix: {entry.fix}",
        f"    Prevent: {entry.prevention}",
        sep="\n",
    )

Performance Tuning

# === Performance Tuning Checklist ===

@dataclass
class TuningParam:
    """One row of the performance-tuning checklist.

    All fields are free-form display text; the report loop in this file
    prints them as "[category] param", "Default → Recommended" and "Impact".
    """

    category: str     # tuning area, printed in brackets as the heading
    param: str        # configuration key or API call being tuned
    default_val: str  # value in effect when nothing is configured
    recommended: str  # suggested value or range
    impact: str       # what changing the setting is expected to achieve

# Performance-tuning checklist: one TuningParam per setting, giving the
# default, the recommended value and the expected impact.
#
# NOTE(review): Structured Streaming has historically disabled AQE for
# streaming query plans — confirm spark.sql.adaptive.enabled takes effect
# on the target Spark version before relying on the "AQE" row.
tuning = [
    TuningParam("Parallelism",
        "spark.sql.shuffle.partitions",
        "200",
        "2-3x Kafka Partitions (e.g. 500)",
        "ลด Skew ใช้ Core เต็มที่"),
    TuningParam("AQE",
        "spark.sql.adaptive.enabled",
        "false (Spark 3.0) / true (3.2+)",
        "true",
        "Auto-coalesce + Skew join + Optimize"),
    TuningParam("Batch Size",
        "maxOffsetsPerTrigger",
        "Unlimited",
        "100000-500000",
        "Batch Size สม่ำเสมอ ไม่ OOM"),
    TuningParam("Trigger",
        "trigger(processingTime=...)",
        "ASAP (0ms)",
        "10-60 seconds",
        "Balance Latency vs Throughput"),
    TuningParam("Serializer",
        "spark.serializer",
        "JavaSerializer",
        "KryoSerializer",
        "ลด Serialization Time 30-50%"),
    TuningParam("Join",
        "spark.sql.autoBroadcastJoinThreshold",
        "10MB",
        "50-100MB",
        "Broadcast Join ลด Shuffle"),
    TuningParam("State Store",
        "stateStore.providerClass",
        "HDFSBackedStateStoreProvider",
        "RocksDBStateStoreProvider",
        "รองรับ State ขนาดใหญ่มาก"),
    # Spark's default for this codec is lz4, not "none": state store
    # delta/snapshot files are already compressed out of the box, so the
    # only real tuning choice is switching to a denser codec such as zstd.
    TuningParam("Compression",
        "spark.sql.streaming.stateStore.compression.codec",
        "lz4",
        "lz4 (default) / zstd",
        "zstd บีบอัดได้มากกว่า lz4 แต่ใช้ CPU เพิ่ม"),
]

# Print the checklist: heading line, default → recommended, then impact.
print("=== Performance Tuning ===")
for t in tuning:
    print(f"  [{t.category}] {t.param}")
    print(f"    Default: {t.default_val} → Recommended: {t.recommended}")
    print(f"    Impact: {t.impact}")

เคล็ดลับ

  • Watermark: ตั้ง Watermark ทุก Stateful Query ป้องกัน State OOM
  • RocksDB: ใช้ RocksDB State Store สำหรับ State ขนาดใหญ่
  • AQE: เปิด Adaptive Query Execution ลด Skew อัตโนมัติ
  • Idempotent: ใช้ Idempotent Sink (Upsert) ป้องกันข้อมูลซ้ำ
  • Monitor: ดู Spark UI Batch Duration Processing Rate State Size

ปัญหาที่พบบ่อยมีอะไร

OOM State Store Partition Skew Slow Batch Checkpoint Corruption Late Data Kafka Offset Serialization Schema Evolution Backlog