it

Spark Structured Streaming Troubleshooting

Spark Structured Streaming Troubleshooting

Spark Streaming Troubleshooting

Spark Structured Streaming Troubleshooting

Spark Structured Streaming Troubleshooting แก้ปัญหา OOM Checkpoint Watermark Performance Kafka State Store AQE Partition Skew

เนื้อหาเกี่ยวข้อง — อ่านต่อ: TypeScript Zod Architecture Design Pattern — คู่มือฉบับสมบูรณ์ 2026

ปัญหาสาเหตุวิธีแก้Priority
OOM (Out of Memory)State Store ใหญ่ / Shuffle / Large BatchWatermark + RocksDB + AQECritical
Slow BatchPartition Skew / No AQE / Low Parallelismเพิ่ม Partitions + AQE + BroadcastHigh
Checkpoint CorruptDisk Full / Schema Change / Bugลบ Checkpoint + Idempotent SinkHigh
Late Data LossWatermark ตัดข้อมูลเร็วเกินไปเพิ่ม Watermark DurationMedium
Data DuplicationCheckpoint Recovery / At-least-onceIdempotent Sink (Upsert)Medium
Kafka LagProcessing ช้ากว่า IngestionScale Executors + maxOffsetsPerTriggerHigh

Memory & State Issues

# === Spark Structured Streaming Memory Fix ===

# spark-submit configuration
# spark-submit \
#   --master yarn \
#   --deploy-mode cluster \
#   --num-executors 10 \
#   --executor-memory 8g \
#   --executor-cores 4 \
#   --driver-memory 4g \
#   --conf spark.sql.shuffle.partitions=500 \
#   --conf spark.sql.adaptive.enabled=true \
#   --conf spark.sql.adaptive.coalescePartitions.enabled=true \
#   --conf spark.sql.streaming.stateStore.providerClass=org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider \
#   --conf spark.memory.offHeap.enabled=true \
#   --conf spark.memory.offHeap.size=4g \
#   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
#   streaming_app.py

# Watermark + State Cleanup
# df = spark.readStream \
#     .format("kafka") \
#     .option("kafka.bootstrap.servers", "kafka:9092") \
#     .option("subscribe", "events") \
#     .option("maxOffsetsPerTrigger", 100000) \
#     .load()
#
# result = df \
#     .withWatermark("event_time", "1 hour") \
#     .groupBy(
#         window("event_time", "5 minutes"),
#         "user_id"
#     ) \
#     .agg(count("*").alias("event_count"))

from dataclasses import dataclass

@dataclass
class MemoryFix:
    problem: str
    symptom: str
    config: str
    value: str
    effect: str

fixes = [
    MemoryFix("State Store OOM",
        "java.lang.OutOfMemoryError: Java heap space",
        "withWatermark('event_time', '1 hour')",
        "ตั้ง Watermark ลบ State เก่ากว่า 1 ชั่วโมง",
        "State Size ลดลง 80-90%"),
    MemoryFix("State Store OOM (Large)",
        "State size > Executor Memory",
        "stateStore.providerClass=RocksDBStateStoreProvider",
        "ใช้ RocksDB เก็บ State บน Disk แทน Memory",
        "รองรับ State ขนาด TB"),
    MemoryFix("Shuffle OOM",
        "OOM ตอน groupBy / join",
        "spark.sql.shuffle.partitions=500",
        "เพิ่ม Partition ลด Data ต่อ Partition",
        "ลด Memory ต่อ Task"),
    MemoryFix("Large Batch OOM",
        "OOM เมื่อข้อมูลสะสมมาก",
        "maxOffsetsPerTrigger=100000",
        "จำกัดข้อมูลต่อ Batch ไม่ให้มากเกินไป",
        "Batch Size สม่ำเสมอ"),
    MemoryFix("AQE Not Enabled",
        "Partition Skew ทำให้ Task ช้า/OOM",
        "spark.sql.adaptive.enabled=true",
        "AQE ปรับ Partition อัตโนมัติ",
        "ลด Skew + ลด Partition ที่เล็กเกินไป"),
]

print("=== Memory Fixes ===")
for f in fixes:
    print(f"\n  [{f.problem}]")
    print(f"    Symptom: {f.symptom}")
    print(f"    Config: {f.config}")
    print(f"    Value: {f.value}")
    print(f"    Effect: {f.effect}")

Checkpoint & Recovery

Spark Structured Streaming Troubleshooting
# === Checkpoint Management ===

# Checkpoint Configuration
# query = result.writeStream \
#     .format("parquet") \
#     .option("checkpointLocation", "s3://bucket/checkpoints/app-v1") \
#     .option("path", "s3://bucket/output/") \
#     .trigger(processingTime="30 seconds") \
#     .outputMode("append") \
#     .start()

# Idempotent Sink (Upsert to Database)
# def upsert_to_db(batch_df, batch_id):
#     batch_df.write \
#         .format("jdbc") \
#         .option("url", "jdbc:postgresql://db:5432/analytics") \
#         .option("dbtable", "events") \
#         .option("user", "app") \
#         .option("password", "secret") \
#         .mode("append") \
#         .save()
#     # Use ON CONFLICT DO UPDATE for true idempotency
#
# query = result.writeStream \
#     .foreachBatch(upsert_to_db) \
#     .option("checkpointLocation", "s3://bucket/checkpoints/db-sink") \
#     .start()

@dataclass
class CheckpointIssue:
    issue: str
    cause: str
    fix: str
    prevention: str

checkpoint_issues = [
    CheckpointIssue("Checkpoint Corruption",
        "Disk Full, Network Error ขณะเขียน Checkpoint",
        "ลบ Checkpoint Directory เริ่มใหม่ + Idempotent Sink",
        "ใช้ S3/HDFS ไม่ใช้ Local Disk + Monitor Disk Usage"),
    CheckpointIssue("Schema Change Break",
        "เปลี่ยน Schema แล้ว Checkpoint เก่า Incompatible",
        "ลบ Checkpoint + ตั้ง Kafka Offset เป็น earliest/latest",
        "ใช้ Schema Registry + Compatible Schema Evolution"),
    CheckpointIssue("Checkpoint Size Too Large",
        "State Store มี Key ล้านๆ ตัว ไม่มี Watermark",
        "เพิ่ม Watermark ลด State + ใช้ RocksDB",
        "ตั้ง Watermark ทุก Stateful Query"),
    CheckpointIssue("Slow Recovery",
        "Checkpoint ใหญ่ เริ่ม Stream ใหม่ช้ามาก",
        "เพิ่ม minBatchesToRetain=10 ลด History",
        "ตั้ง State TTL + Compact Checkpoint เป็นระยะ"),
]

print("=== Checkpoint Issues ===")
for c in checkpoint_issues:
    print(f"  [{c.issue}]")
    print(f"    Cause: {c.cause}")
    print(f"    Fix: {c.fix}")
    print(f"    Prevent: {c.prevention}")

Performance Tuning

# === Performance Tuning Checklist ===

@dataclass
class TuningParam:
    category: str
    param: str
    default_val: str
    recommended: str
    impact: str

tuning = [
    TuningParam("Parallelism",
        "spark.sql.shuffle.partitions",
        "200",
        "2-3x Kafka Partitions (e.g. 500)",
        "ลด Skew ใช้ Core เต็มที่"),
    TuningParam("AQE",
        "spark.sql.adaptive.enabled",
        "false (Spark 3.0) / true (3.2+)",
        "true",
        "Auto-coalesce + Skew join + Optimize"),
    TuningParam("Batch Size",
        "maxOffsetsPerTrigger",
        "Unlimited",
        "100000-500000",
        "Batch Size สม่ำเสมอ ไม่ OOM"),
    TuningParam("Trigger",
        "trigger(processingTime=...)",
        "ASAP (0ms)",
        "10-60 seconds",
        "Balance Latency vs Throughput"),
    TuningParam("Serializer",
        "spark.serializer",
        "JavaSerializer",
        "KryoSerializer",
        "ลด Serialization Time 30-50%"),
    TuningParam("Join",
        "spark.sql.autoBroadcastJoinThreshold",
        "10MB",
        "50-100MB",
        "Broadcast Join ลด Shuffle"),
    TuningParam("State Store",
        "stateStore.providerClass",
        "HDFSBackedStateStoreProvider",
        "RocksDBStateStoreProvider",
        "รองรับ State ขนาดใหญ่มาก"),
    TuningParam("Compression",
        "spark.sql.streaming.stateStore.compression.codec",
        "none",
        "lz4",
        "ลด Checkpoint Size 40-60%"),
]

print("=== Performance Tuning ===")
for t in tuning:
    print(f"  [{t.category}] {t.param}")
    print(f"    Default: {t.default_val} → Recommended: {t.recommended}")
    print(f"    Impact: {t.impact}")

เคล็ดลับ

  • Watermark: ตั้ง Watermark ทุก Stateful Query ป้องกัน State OOM
  • RocksDB: ใช้ RocksDB State Store สำหรับ State ขนาดใหญ่
  • AQE: เปิด Adaptive Query Execution ลด Skew อัตโนมัติ
  • Idempotent: ใช้ Idempotent Sink (Upsert) ป้องกันข้อมูลซ้ำ
  • Monitor: ดู Spark UI Batch Duration Processing Rate State Size

ปัญหาที่พบบ่อยมีอะไร

OOM State Store Partition Skew Slow Batch Checkpoint Corruption Late Data Kafka Offset Serialization Schema Evolution Backlog

แนะนำเพิ่มเติม — ดูสัญญาณเทรดที่ XM Signal

เนื้อหาเกี่ยวข้อง — แนะนำให้อ่าน Azure Front Door Metric Collection

เนื้อหาเกี่ยวข้อง — ดูเพิ่มเติมเรื่อง dbt Data Transform Cost Optimization ลดค่าใช้จ่าย — คู่มือฉบับสมบูรณ์ 2026

XM Legend · เทรดเดอร์ & ผู้สอน Forex 13 ปี

ผู้ก่อตั้ง SiamCafe ตั้งแต่ปี 1997 · เทรดเดอร์สาย Forex มากกว่า 13 ปี ได้รับการยกย่องเป็น XM Legend · แบ่งปันความรู้ Forex, ไอที, AI และการเทรด จากประสบการณ์จริงในตลาดจริง