
Spark Structured Streaming Troubleshooting

2026-02-26 · Ajarn Bom — SiamCafe.net · 8,516 words

A production troubleshooting guide for Spark Structured Streaming: out-of-memory errors, checkpoint corruption, watermark tuning, slow batches, Kafka lag, state store growth, AQE, and partition skew.

Problem | Cause | Fix | Priority
OOM (Out of Memory) | Large state store / shuffle / oversized batch | Watermark + RocksDB + AQE | Critical
Slow Batch | Partition skew / no AQE / low parallelism | More partitions + AQE + broadcast join | High
Checkpoint Corrupt | Disk full / schema change / bug | Delete checkpoint + idempotent sink | High
Late Data Loss | Watermark cuts data off too early | Increase watermark duration | Medium
Data Duplication | Checkpoint recovery / at-least-once delivery | Idempotent sink (upsert) | Medium
Kafka Lag | Processing slower than ingestion | Scale executors + maxOffsetsPerTrigger | High

Memory & State Issues

# === Spark Structured Streaming Memory Fix ===

# spark-submit configuration
# spark-submit \
#   --master yarn \
#   --deploy-mode cluster \
#   --num-executors 10 \
#   --executor-memory 8g \
#   --executor-cores 4 \
#   --driver-memory 4g \
#   --conf spark.sql.shuffle.partitions=500 \
#   --conf spark.sql.adaptive.enabled=true \
#   --conf spark.sql.adaptive.coalescePartitions.enabled=true \
#   --conf spark.sql.streaming.stateStore.providerClass=org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider \
#   --conf spark.memory.offHeap.enabled=true \
#   --conf spark.memory.offHeap.size=4g \
#   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
#   streaming_app.py

# Watermark + state cleanup
# (needs: from pyspark.sql.functions import window, count)
# df = spark.readStream \
#     .format("kafka") \
#     .option("kafka.bootstrap.servers", "kafka:9092") \
#     .option("subscribe", "events") \
#     .option("maxOffsetsPerTrigger", 100000) \
#     .load()
#
# result = df \
#     .withWatermark("event_time", "1 hour") \
#     .groupBy(
#         window("event_time", "5 minutes"),
#         "user_id"
#     ) \
#     .agg(count("*").alias("event_count"))

from dataclasses import dataclass

@dataclass
class MemoryFix:
    problem: str
    symptom: str
    config: str
    value: str
    effect: str

fixes = [
    MemoryFix("State Store OOM",
        "java.lang.OutOfMemoryError: Java heap space",
        "withWatermark('event_time', '1 hour')",
        "Set a watermark so state older than 1 hour is dropped",
        "State size drops 80-90%"),
    MemoryFix("State Store OOM (Large)",
        "State size > executor memory",
        "stateStore.providerClass=RocksDBStateStoreProvider",
        "Keep state on disk with RocksDB instead of in JVM memory",
        "Handles TB-scale state"),
    MemoryFix("Shuffle OOM",
        "OOM during groupBy / join",
        "spark.sql.shuffle.partitions=500",
        "More partitions, less data per partition",
        "Lower memory per task"),
    MemoryFix("Large Batch OOM",
        "OOM once a backlog accumulates",
        "maxOffsetsPerTrigger=100000",
        "Cap how much data each batch pulls",
        "Consistent batch sizes"),
    MemoryFix("AQE Not Enabled",
        "Partition skew makes tasks slow or OOM",
        "spark.sql.adaptive.enabled=true",
        "AQE rebalances partitions automatically",
        "Less skew + coalesces tiny partitions"),
]

print("=== Memory Fixes ===")
for f in fixes:
    print(f"\n  [{f.problem}]")
    print(f"    Symptom: {f.symptom}")
    print(f"    Config: {f.config}")
    print(f"    Value: {f.value}")
    print(f"    Effect: {f.effect}")

Checkpoint & Recovery

# === Checkpoint Management ===

# Checkpoint Configuration
# query = result.writeStream \
#     .format("parquet") \
#     .option("checkpointLocation", "s3://bucket/checkpoints/app-v1") \
#     .option("path", "s3://bucket/output/") \
#     .trigger(processingTime="30 seconds") \
#     .outputMode("append") \
#     .start()

# Idempotent Sink (Upsert to Database)
# def upsert_to_db(batch_df, batch_id):
#     batch_df.write \
#         .format("jdbc") \
#         .option("url", "jdbc:postgresql://db:5432/analytics") \
#         .option("dbtable", "events") \
#         .option("user", "app") \
#         .option("password", "secret") \
#         .mode("append") \
#         .save()
#     # NOTE: plain append is only at-least-once; use ON CONFLICT DO UPDATE
#     # for true idempotency (see the upsert sketch after this section)
#
# query = result.writeStream \
#     .foreachBatch(upsert_to_db) \
#     .option("checkpointLocation", "s3://bucket/checkpoints/db-sink") \
#     .start()
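
When recovery stalls or you suspect a corrupt checkpoint, look inside the checkpoint directory before deleting anything: offsets/<batchId> records what a batch planned to read, and commits/<batchId> appears only once that batch finished. A small sketch, assuming a locally mounted checkpoint path; for S3 you would list objects through boto3 or the Hadoop FileSystem API instead of os.listdir.

# === Inspect checkpoint progress (sketch) ===
import os

def last_committed_batch(checkpoint_dir):
    """Return the highest committed batch id, or None if nothing committed.
    An offsets/ entry with no matching commits/ entry means the last
    batch never finished."""
    commits = os.path.join(checkpoint_dir, "commits")
    ids = [int(name) for name in os.listdir(commits) if name.isdigit()]
    return max(ids) if ids else None

# print(last_committed_batch("/mnt/checkpoints/app-v1"))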

@dataclass
class CheckpointIssue:
    issue: str
    cause: str
    fix: str
    prevention: str

checkpoint_issues = [
    CheckpointIssue("Checkpoint Corruption",
        "Disk full or network error while writing the checkpoint",
        "Delete the checkpoint directory, restart + idempotent sink",
        "Use S3/HDFS rather than local disk + monitor disk usage"),
    CheckpointIssue("Schema Change Break",
        "Schema changed; the old checkpoint is incompatible",
        "Delete checkpoint + set Kafka offsets to earliest/latest",
        "Use a Schema Registry + compatible schema evolution"),
    CheckpointIssue("Checkpoint Size Too Large",
        "State store holds millions of keys and no watermark is set",
        "Add a watermark to shrink state + use RocksDB",
        "Set a watermark on every stateful query"),
    CheckpointIssue("Slow Recovery",
        "Large checkpoint makes restarting the stream very slow",
        "Set spark.sql.streaming.minBatchesToRetain=10 to keep less history",
        "Set state TTL + compact the checkpoint periodically"),
]

print("=== Checkpoint Issues ===")
for c in checkpoint_issues:
    print(f"  [{c.issue}]")
    print(f"    Cause: {c.cause}")
    print(f"    Fix: {c.fix}")
    print(f"    Prevent: {c.prevention}")

Performance Tuning

# === Performance Tuning Checklist ===

@dataclass
class TuningParam:
    category: str
    param: str
    default_val: str
    recommended: str
    impact: str

tuning = [
    TuningParam("Parallelism",
        "spark.sql.shuffle.partitions",
        "200",
        "2-3x Kafka partitions (e.g. 500)",
        "Less skew, full use of all cores"),
    TuningParam("AQE",
        "spark.sql.adaptive.enabled",
        "false (Spark 3.0) / true (3.2+)",
        "true",
        "Auto-coalesce + skew-join handling + runtime re-optimization"),
    TuningParam("Batch Size",
        "maxOffsetsPerTrigger",
        "Unlimited",
        "100000-500000",
        "Consistent batch sizes, no OOM"),
    TuningParam("Trigger",
        "trigger(processingTime=...)",
        "ASAP (0ms)",
        "10-60 seconds",
        "Balances latency vs throughput"),
    TuningParam("Serializer",
        "spark.serializer",
        "JavaSerializer",
        "KryoSerializer",
        "Cuts serialization time 30-50%"),
    TuningParam("Join",
        "spark.sql.autoBroadcastJoinThreshold",
        "10MB",
        "50-100MB",
        "Broadcast joins avoid shuffles"),
    TuningParam("State Store",
        "stateStore.providerClass",
        "HDFSBackedStateStoreProvider",
        "RocksDBStateStoreProvider",
        "Handles very large state"),
    TuningParam("Compression",
        "spark.sql.streaming.stateStore.compression.codec",
        "none",
        "lz4",
        "Shrinks checkpoint size 40-60%"),
]

print("=== Performance Tuning ===")
for t in tuning:
    print(f"  [{t.category}] {t.param}")
    print(f"    Default: {t.default_val} → Recommended: {t.recommended}")
    print(f"    Impact: {t.impact}")

Tips

What problems come up most often?

OOM from oversized state stores, partition skew, slow batches, checkpoint corruption, late-data loss, Kafka offset and lag issues, serialization overhead, schema changes breaking checkpoints, and backlog buildup.

How do I fix memory issues?

Set a watermark to purge old state, switch to the disk-based RocksDB state store, raise spark.sql.shuffle.partitions, cap batches with maxOffsetsPerTrigger, enable AQE, turn on off-heap memory with the Kryo serializer, and size executor memory to match.

How do I fix checkpoint problems?

Delete the broken checkpoint and restart with an idempotent (upsert) sink, keep checkpoints on S3/HDFS, use a Schema Registry for compatible schema evolution, bound state with watermarks and RocksDB, lower minBatchesToRetain for faster recovery, and pin the Kafka offsets on restart, as sketched below.
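
After deleting a checkpoint, the Kafka source has no memory of its position, so pin it explicitly on the restart; otherwise the default (latest) silently skips the backlog. A sketch of the Kafka source's startingOffsets option; the topic name and offset numbers are placeholders, and the option only applies while no checkpoint exists yet.

# === Pin Kafka offsets after a checkpoint reset (sketch) ===
# df = spark.readStream \
#     .format("kafka") \
#     .option("kafka.bootstrap.servers", "kafka:9092") \
#     .option("subscribe", "events") \
#     .option("startingOffsets",
#             '{"events": {"0": 120000, "1": 118500}}') \
#     .load()
# "earliest" / "latest" also work; in the JSON form, -2 = earliest
# and -1 = latest for a given partition.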

How do I tune performance?

Raise spark.sql.shuffle.partitions, enable AQE, set maxOffsetsPerTrigger and a trigger interval, use Kryo and broadcast joins, switch to RocksDB with lz4 compression, and watch the Spark UI with monitoring and alerting in place.

Summary

Troubleshooting Spark Structured Streaming in production comes down to a few levers: watermarks and RocksDB against OOM, healthy checkpoints plus idempotent sinks for correct recovery, and AQE, partition tuning, and Kafka batch limits for steady performance.
