SiamCafe.net Blog

Spark Structured Streaming Stream Processing

2025-12-02 · Aj. Bom, SiamCafe.net · 11,805 words

Spark Structured Streaming

Tags: Spark Structured Streaming · Real-time Stream Processing · Kafka · Window · Watermark · State Management · Production Deployment

Feature       | Structured Streaming     | Kafka Streams      | Flink
Processing    | Micro-batch / Continuous | Record-by-record   | Record-by-record
Latency       | 100 ms - seconds         | ms                 | ms
Exactly-once  | Yes (with checkpoint)    | Yes                | Yes
State         | Built-in + RocksDB       | Built-in + RocksDB | Built-in + RocksDB
SQL Support   | Full SQL                 | KSQL (separate)    | Flink SQL
Best For      | Batch + Stream unified   | Kafka-native apps  | Low-latency stream

Core Concepts

# === Structured Streaming Basics ===
# Requires an active SparkSession (spark) plus the Kafka and Delta packages

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType, TimestampType, DoubleType

# Read from Kafka
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092") \
    .option("subscribe", "events") \
    .option("startingOffsets", "latest") \
    .load()

# Parse the JSON payload
schema = StructType() \
    .add("user_id", StringType()) \
    .add("event_type", StringType()) \
    .add("amount", DoubleType()) \
    .add("timestamp", TimestampType())

parsed = df.select(
    F.from_json(F.col("value").cast("string"), schema).alias("data")
).select("data.*")

# Tumbling-window aggregation with a 10-minute watermark
windowed = parsed \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        F.window(F.col("timestamp"), "5 minutes"),
        F.col("event_type")
    ).agg(
        F.count("*").alias("event_count"),
        F.sum("amount").alias("total_amount"),
        F.avg("amount").alias("avg_amount")
    )

# Write to Delta Lake
query = windowed.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/checkpoint/events") \
    .trigger(processingTime="30 seconds") \
    .start("/data/event_metrics")

from dataclasses import dataclass

@dataclass
class StreamConcept:
    concept: str
    description: str
    api: str
    use_case: str

concepts = [
    StreamConcept("readStream",
        "Read streaming data from a source",
        "spark.readStream.format('kafka').load()",
        "Read events from a Kafka topic"),
    StreamConcept("writeStream",
        "Write streaming data to a sink",
        "df.writeStream.format('delta').start()",
        "Write results to Delta Lake"),
    StreamConcept("Trigger",
        "Set how often each micro-batch is processed",
        "trigger(processingTime='30 seconds')",
        "Control the latency vs throughput tradeoff"),
    StreamConcept("Checkpoint",
        "Persist state for recovery on restart",
        "option('checkpointLocation', '/path')",
        "Exactly-once guarantee, fault tolerance"),
    StreamConcept("Output Mode",
        "Define how results are written to the sink",
        "outputMode('append'|'complete'|'update')",
        "append = new rows, complete = all, update = changed"),
]

print("=== Core Concepts ===")
for c in concepts:
    print(f"  [{c.concept}] {c.description}")
    print(f"    API: {c.api}")
    print(f"    Use: {c.use_case}")
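The Trigger concept above can be sketched without a cluster. This is a minimal pure-Python illustration, not Spark code: the `micro_batches` helper is a hypothetical name, and it simply buckets timestamped records by trigger interval, the way a processingTime trigger groups arriving data into micro-batches.

```python
def micro_batches(records, trigger_interval):
    """Bucket (timestamp, value) records into micro-batches by trigger interval.

    Each bucket covers one trigger interval, mimicking how a
    processingTime trigger groups arriving records into batches.
    """
    buckets = {}
    for ts, value in records:
        buckets.setdefault(ts // trigger_interval, []).append(value)
    return [buckets[k] for k in sorted(buckets)]

# Records arriving at t = 0..61 s with a 30 s trigger form three batches
records = [(0, "a"), (10, "b"), (29, "c"), (35, "d"), (61, "e")]
print(micro_batches(records, 30))  # [['a', 'b', 'c'], ['d'], ['e']]
```

A shorter interval gives lower latency but more, smaller batches; a longer one amortizes overhead at the cost of latency, which is the tradeoff the table describes.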

Window and Watermark

# === Window Functions ===

# Tumbling window (non-overlapping)
# .groupBy(window(col("ts"), "5 minutes"))
# |--5min--|--5min--|--5min--|
#
# Sliding window (overlapping)
# .groupBy(window(col("ts"), "10 minutes", "5 minutes"))
# |----10min----|
#      |----10min----|
#           |----10min----|
#
# Session window (gap-based)
# .groupBy(session_window(col("ts"), "10 minutes"))
# |--events--gap>10min--|--events--gap>10min--|
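The window shapes sketched above can be made concrete in plain Python. These helpers (`tumbling_window` and `sliding_windows` are illustrative names, not Spark APIs) compute which window(s) a single event timestamp falls into, assuming integer timestamps and windows aligned to zero:

```python
def tumbling_window(ts, size):
    """Return the single [start, end) tumbling window containing timestamp ts."""
    start = (ts // size) * size
    return (start, start + size)

def sliding_windows(ts, size, slide):
    """Return every [start, end) sliding window (starts aligned to `slide`) containing ts."""
    # The earliest qualifying start is the first multiple of `slide` greater than ts - size
    start = ((ts - size) // slide + 1) * slide
    windows = []
    while start <= ts:
        windows.append((start, start + size))
        start += slide
    return windows

print(tumbling_window(12, 5))      # (10, 15): exactly one window
print(sliding_windows(12, 10, 5))  # [(5, 15), (10, 20)]: two overlapping windows
```

This shows why sliding windows hold more state than tumbling ones: the same event at minute 12 lands in one tumbling window but two overlapping sliding windows.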

# Watermark example
# df.withWatermark("event_time", "10 minutes") \
#   .groupBy(window("event_time", "5 minutes")) \
#   .count()
#
# Timeline:
# Max event_time seen = 12:30
# Watermark = 12:30 - 10 min = 12:20
# Window [12:10-12:15]: end 12:15 <= 12:20 -> closed
# Window [12:15-12:20]: end 12:20 <= 12:20 -> closing now
# Window [12:20-12:25]: end 12:25 >  12:20 -> still open
# Late data with event_time < 12:20 will be DROPPED
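To make the timeline above concrete, here is a minimal pure-Python simulation of the watermark mechanics (the `watermark_state` helper is a hypothetical name, not a Spark API): it tracks the max event time seen, derives the watermark, finalizes windows whose end falls at or below it, and drops events arriving after their window was finalized.

```python
def watermark_state(event_times, delay, window_size):
    """Simulate watermark-driven state cleanup for tumbling-window counts.

    Watermark = max event time seen - delay. Windows ending at or below the
    watermark are finalized and removed from state; events whose window is
    already past the watermark are dropped as late data.
    """
    max_event_time = None
    open_counts, closed, dropped = {}, {}, []
    for ts in event_times:
        max_event_time = ts if max_event_time is None else max(max_event_time, ts)
        watermark = max_event_time - delay
        # finalize windows that end at or before the watermark
        for s in [s for s in open_counts if s + window_size <= watermark]:
            closed[s] = open_counts.pop(s)
        start = (ts // window_size) * window_size
        if start + window_size <= watermark:
            dropped.append(ts)  # too late: its window is already past the watermark
        else:
            open_counts[start] = open_counts.get(start, 0) + 1
    return closed, dropped

# delay=10, window=5 (minutes as ints): the jump to t=30 moves the watermark
# to 20, finalizing window [5, 10); the late event at t=8 is then dropped.
closed, dropped = watermark_state([5, 7, 30, 8], delay=10, window_size=5)
print(closed)   # {5: 2}
print(dropped)  # [8]
```

This is the bounded-state guarantee in miniature: without the watermark, window [5, 10) would stay in memory forever waiting for stragglers.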

@dataclass
class WindowType:
    window_type: str
    syntax: str
    overlap: str
    use_case: str
    state_size: str

windows = [
    WindowType("Tumbling",
        "window(col('ts'), '5 minutes')",
        "No overlap; every event falls in exactly one window",
        "Metrics every 5 minutes, hourly reports",
        "Small: one window per interval"),
    WindowType("Sliding",
        "window(col('ts'), '10 min', '5 min')",
        "Overlapping; one event can belong to several windows",
        "Moving average, rolling count",
        "Medium: multiple overlapping windows"),
    WindowType("Session",
        "session_window(col('ts'), '10 min')",
        "No overlap, but variable length based on the gap",
        "User session analysis, clickstream",
        "Large: number of sessions unknown in advance"),
]

print("=== Window Types ===")
for w in windows:
    print(f"  [{w.window_type}] {w.syntax}")
    print(f"    Overlap: {w.overlap}")
    print(f"    Use: {w.use_case}")
    print(f"    State: {w.state_size}")
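The gap-based behaviour of session windows in the table above can be illustrated with a small pure-Python sketch (`session_windows` is an illustrative name, not the Spark API): events belong to one session until the gap to the next event exceeds the threshold, which is why session counts cannot be known in advance.

```python
def session_windows(timestamps, gap):
    """Group event times into (start, end, count) sessions split where the
    gap between consecutive events exceeds `gap`."""
    sessions, current = [], []
    for ts in sorted(timestamps):
        if current and ts - current[-1] > gap:
            sessions.append((current[0], current[-1], len(current)))
            current = []
        current.append(ts)
    if current:
        sessions.append((current[0], current[-1], len(current)))
    return sessions

# With a 10-unit gap, six clicks collapse into three variable-length sessions
print(session_windows([0, 3, 8, 25, 27, 50], gap=10))
# [(0, 8, 3), (25, 27, 2), (50, 50, 1)]
```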

Production Operations

# === Production Checklist ===

@dataclass
class ProdConfig:
    config: str
    setting: str
    why: str
    default_risk: str

configs = [
    ProdConfig("Checkpoint Location",
        "HDFS/S3 reliable storage, NOT local disk",
        "Recovery after restart, exactly-once guarantee",
        "Local disk = lose state on node failure"),
    ProdConfig("Watermark Duration",
        "Based on max expected lateness of data",
        "Controls state size and late data handling",
        "No watermark = state grows forever = OOM"),
    ProdConfig("Trigger Interval",
        "processingTime based on latency requirement",
        "Latency vs throughput tradeoff",
        "Default = as fast as possible = resource waste"),
    ProdConfig("Max Offsets Per Trigger",
        "maxOffsetsPerTrigger = 100000",
        "Prevent processing huge backlog at once",
        "No limit = OOM when catching up from lag"),
    ProdConfig("State Store Backend",
        "RocksDB for large state",
        "In-memory default can OOM with large state",
        "Default HashMap = OOM with millions of keys"),
    ProdConfig("Kafka Consumer Config",
        "max.poll.records, session.timeout.ms",
        "Prevent consumer timeout during heavy processing",
        "Default may timeout on slow processing"),
]

print("=== Production Config ===")
for c in configs:
    print(f"  [{c.config}]")
    print(f"    Setting: {c.setting}")
    print(f"    Why: {c.why}")
    print(f"    Risk if default: {c.default_risk}")
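The maxOffsetsPerTrigger row above comes down to simple arithmetic. A hypothetical `plan_catch_up` helper shows how a per-trigger cap turns a large Kafka backlog into several memory-bounded batches instead of one huge one:

```python
def plan_catch_up(backlog_offsets, max_offsets_per_trigger):
    """Split a Kafka backlog into per-trigger chunks capped by maxOffsetsPerTrigger."""
    batches = []
    remaining = backlog_offsets
    while remaining > 0:
        take = min(remaining, max_offsets_per_trigger)
        batches.append(take)
        remaining -= take
    return batches

# Catching up a 250k-offset lag with maxOffsetsPerTrigger = 100000
print(plan_catch_up(250_000, 100_000))  # [100000, 100000, 50000]
```

Without the cap, a restart after long downtime would try to process all 250k offsets in a single micro-batch, which is the OOM risk the checklist warns about.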

Tips

Applying this knowledge in practice

Recommended learning resources include the official documentation, which is always kept up to date; online courses from Coursera, Udemy, and edX; quality YouTube channels in both Thai and English; and communities such as Discord, Reddit, and Stack Overflow for exchanging experience with developers worldwide.

What is Structured Streaming?

Structured Streaming is Spark's stream processing engine built on Spark SQL. It processes data in micro-batches (or a continuous mode) with exactly-once guarantees, integrates with Kafka and Delta Lake, and exposes the DataFrame API, so queries benefit from the Catalyst Optimizer and event-time semantics.

How are window functions used?

Group by window() for tumbling windows (e.g. 5 minutes, non-overlapping) or sliding windows (e.g. 10-minute windows sliding every 5 minutes, overlapping), and by session_window() for gap-based sessions (e.g. a 10-minute gap). Typical uses are per-interval metrics and moving averages.

How does a watermark work?

withWatermark(timestamp, duration) tells Spark how much lateness to tolerate. It bounds state so memory does not grow without limit: events older than the watermark are dropped. A larger duration accepts more late data but keeps more state in memory; a smaller one saves memory but drops more late events.

How do you deploy to production?

Put the checkpoint on S3 or HDFS, tune the trigger interval and Kafka source options, use the RocksDB state store for large state, monitor via the Spark UI and Prometheus with alerts on consumer lag, write to Delta Lake for ACID guarantees, and scale on Kubernetes or YARN.

Summary

Spark Structured Streaming pairs naturally with Kafka. Tumbling, sliding, and session windows plus watermarks keep state bounded; checkpoints and RocksDB provide fault tolerance and exactly-once processing; Delta Lake and monitoring complete a production deployment.
