SiamCafe.net Blog
Technology

Spark Structured Streaming Scaling Strategy วิธี Scale — ปรับแต่งและขยายระบบ Streaming

spark structured streaming scaling strategy วธ scale
Spark Structured Streaming Scaling Strategy วิธี Scale | SiamCafe Blog
2025-10-10· อ. บอม — SiamCafe.net· 1,257 คำ

Spark Structured Streaming คืออะไรและทำงานอย่างไร

Spark Structured Streaming เป็น stream processing engine ที่สร้างบน Spark SQL engine ใช้แนวคิด micro-batch processing ที่แบ่ง streaming data เป็น batch เล็กๆแล้วประมวลผลด้วย Spark SQL ทำให้สามารถใช้ DataFrame/Dataset API เดียวกันสำหรับทั้ง batch และ streaming workloads

ตั้งแต่ Spark 2.3 เพิ่ม Continuous Processing mode ที่ประมวลผลแบบ record-by-record ได้ latency ต่ำถึง 1 millisecond แต่ยังเป็น experimental อยู่ ส่วน micro-batch mode ที่เป็นค่า default มี latency ประมาณ 100ms-หลายวินาทีแต่มี throughput สูงกว่าและ fault tolerance ดีกว่า

Structured Streaming รองรับ source หลายประเภทเช่น Kafka สำหรับ message streaming, File source สำหรับอ่านไฟล์ใหม่ที่ถูกเพิ่มเข้ามา, Socket source สำหรับทดสอบ และ Rate source สำหรับ generate ข้อมูลทดสอบ ส่วน sink รองรับ Kafka, File, Console, Memory และ ForeachBatch สำหรับ custom output

การ scale Structured Streaming ต้องพิจารณาหลายปัจจัยเช่น จำนวน partitions ของ Kafka topic, จำนวน executors และ cores, memory configuration, watermark สำหรับ late data handling และ checkpoint strategy สำหรับ fault tolerance

ตั้งค่า Spark Structured Streaming เบื้องต้น

สร้าง Streaming Application ที่อ่านจาก Kafka และเขียนไปยัง Delta Lake

# submit_streaming_job.sh — Submit Spark Streaming Job
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --name "kafka-to-delta-streaming" \
  --num-executors 10 \
  --executor-cores 4 \
  --executor-memory 8g \
  --driver-memory 4g \
  --conf spark.sql.shuffle.partitions=200 \
  --conf spark.streaming.kafka.maxRatePerPartition=10000 \
  --conf spark.sql.streaming.checkpointLocation=s3a://bucket/checkpoints/ \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.minExecutors=5 \
  --conf spark.dynamicAllocation.maxExecutors=50 \
  --conf spark.sql.adaptive.enabled=true \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0, io.delta:delta-spark_2.12:3.1.0 \
  streaming_app.py

# streaming_app.py — Spark Structured Streaming Application
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder \
    .appName("KafkaToDeltaStreaming") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Schema สำหรับ Kafka messages
event_schema = StructType([
    StructField("event_id", StringType()),
    StructField("user_id", StringType()),
    StructField("event_type", StringType()),
    StructField("timestamp", TimestampType()),
    StructField("properties", MapType(StringType(), StringType())),
])

# อ่านจาก Kafka
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka1:9092, kafka2:9092, kafka3:9092") \
    .option("subscribe", "user-events") \
    .option("startingOffsets", "latest") \
    .option("maxOffsetsPerTrigger", 1000000) \
    .option("kafka.consumer.group.id", "spark-streaming-group") \
    .option("failOnDataLoss", "false") \
    .load()

# Parse JSON messages
events_df = kafka_df \
    .select(from_json(col("value").cast("string"), event_schema).alias("data")) \
    .select("data.*") \
    .withWatermark("timestamp", "10 minutes")

# Aggregate: นับ events ต่อ user ทุก 5 นาที
user_counts = events_df \
    .groupBy(
        window("timestamp", "5 minutes"),
        "user_id",
        "event_type"
    ) \
    .count()

# เขียนไป Delta Lake
query = user_counts.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "s3a://bucket/checkpoints/user-counts") \
    .option("mergeSchema", "true") \
    .trigger(processingTime="30 seconds") \
    .start("s3a://bucket/delta/user-event-counts")

query.awaitTermination()

กลยุทธ์ Scaling สำหรับ Streaming Workloads

วิธี scale Spark Structured Streaming ให้รองรับ throughput สูง

# กลยุทธ์ Scaling Spark Structured Streaming
#
# === 1. Horizontal Scaling (เพิ่ม Executors) ===
# กฎ: 1 Kafka partition = 1 Spark task
# ถ้า Kafka topic มี 100 partitions → ต้องมีอย่างน้อย 100 cores
#
# spark.executor.instances = ceil(kafka_partitions / executor_cores)
# ตัวอย่าง: 100 partitions, 4 cores/executor → 25 executors
#
# === 2. Dynamic Allocation ===
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.minExecutors=5
--conf spark.dynamicAllocation.maxExecutors=100
--conf spark.dynamicAllocation.executorIdleTimeout=60s
--conf spark.dynamicAllocation.schedulerBacklogTimeout=1s

# === 3. Kafka Partition Scaling ===
# เพิ่ม partitions ของ Kafka topic
kafka-topics.sh --alter --topic user-events \
  --partitions 200 \
  --bootstrap-server kafka1:9092

# === 4. Trigger Interval Tuning ===
# ลด trigger interval สำหรับ low latency
.trigger(processingTime="10 seconds")  # micro-batch ทุก 10 วินาที
# หรือใช้ available-now สำหรับ backfill
.trigger(availableNow=True)

# === 5. Shuffle Partitions ===
# กฎ: shuffle partitions ≈ 2-3x จำนวน cores
spark.conf.set("spark.sql.shuffle.partitions", 400)
# หรือใช้ AQE (Adaptive Query Execution)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

# === 6. Memory Tuning ===
--executor-memory 8g
--conf spark.memory.fraction=0.6
--conf spark.memory.storageFraction=0.5
--conf spark.sql.streaming.stateStore.providerClass=org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
# RocksDB สำหรับ stateful streaming ที่ state ใหญ่มาก

# === 7. maxOffsetsPerTrigger ===
# จำกัด records ต่อ trigger เพื่อป้องกัน OOM
.option("maxOffsetsPerTrigger", 500000)

# === 8. Backpressure ===
spark.conf.set("spark.streaming.backpressure.enabled", "true")
spark.conf.set("spark.streaming.kafka.maxRatePerPartition", "10000")

Tuning Performance และ Resource Optimization

ปรับแต่ง performance สำหรับ production streaming

#!/usr/bin/env python3
# streaming_optimized.py — Optimized Streaming Application
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

def create_optimized_session():
    return SparkSession.builder \
        .appName("OptimizedStreaming") \
        .config("spark.sql.shuffle.partitions", "200") \
        .config("spark.sql.adaptive.enabled", "true") \
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
        .config("spark.sql.adaptive.coalescePartitions.minPartitionSize", "64MB") \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .config("spark.sql.streaming.stateStore.providerClass",
                "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider") \
        .config("spark.sql.streaming.stateStore.rocksdb.compactOnCommit", "true") \
        .config("spark.sql.streaming.metricsEnabled", "true") \
        .config("spark.metrics.conf.*.sink.prometheusServlet.class",
                "org.apache.spark.metrics.sink.PrometheusServlet") \
        .config("spark.ui.prometheus.enabled", "true") \
        .getOrCreate()

spark = create_optimized_session()

# Schema (กำหนดชัดเจนเพื่อ performance)
schema = StructType([
    StructField("event_id", StringType(), False),
    StructField("user_id", StringType(), False),
    StructField("action", StringType(), False),
    StructField("amount", DoubleType(), True),
    StructField("ts", LongType(), False),
])

# อ่านจาก Kafka พร้อม optimization
raw_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "transactions") \
    .option("startingOffsets", "latest") \
    .option("maxOffsetsPerTrigger", 500000) \
    .option("kafka.fetch.max.bytes", "52428800") \
    .option("kafka.max.partition.fetch.bytes", "10485760") \
    .load()

# Parse และ transform
events = raw_stream \
    .select(from_json(col("value").cast("string"), schema).alias("e")) \
    .select(
        col("e.event_id"),
        col("e.user_id"),
        col("e.action"),
        col("e.amount"),
        from_unixtime(col("e.ts")).cast("timestamp").alias("event_time")
    ) \
    .withWatermark("event_time", "15 minutes")

# Stateful aggregation: ยอดรวมต่อ user ทุก 10 นาที
agg = events \
    .groupBy(
        window("event_time", "10 minutes", "5 minutes"),
        "user_id"
    ) \
    .agg(
        count("*").alias("tx_count"),
        sum("amount").alias("total_amount"),
        avg("amount").alias("avg_amount"),
        max("amount").alias("max_amount"),
    )

# เขียนไป Kafka (สำหรับ downstream consumers)
output = agg \
    .select(
        col("user_id").alias("key"),
        to_json(struct("*")).alias("value")
    ) \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("topic", "user-aggregates") \
    .option("checkpointLocation", "/tmp/checkpoints/user-agg") \
    .trigger(processingTime="30 seconds") \
    .start()

output.awaitTermination()

Monitoring และ Alerting สำหรับ Streaming Jobs

สร้างระบบ monitoring สำหรับ Spark Streaming

#!/usr/bin/env python3
# streaming_monitor.py — Monitor Spark Streaming Jobs
import requests
import json
import time
from datetime import datetime

SPARK_UI_URL = "http://spark-master:4040"
SLACK_WEBHOOK = "https://hooks.slack.com/services/xxx"

def get_streaming_queries():
    """ดึงข้อมูล streaming queries จาก Spark UI REST API"""
    try:
        r = requests.get(f"{SPARK_UI_URL}/api/v1/applications")
        apps = r.json()
        
        results = []
        for app in apps:
            app_id = app["id"]
            r = requests.get(f"{SPARK_UI_URL}/api/v1/applications/{app_id}/streaming/statistics")
            if r.status_code == 200:
                results.append({"app": app["name"], "stats": r.json()})
        return results
    except Exception as e:
        print(f"Error: {e}")
        return []

def check_streaming_health(query_progress):
    """ตรวจสอบสุขภาพของ streaming query"""
    alerts = []
    
    if not query_progress:
        return [{"level": "critical", "message": "No streaming progress data"}]
    
    # ตรวจสอบ processing rate
    input_rate = query_progress.get("inputRowsPerSecond", 0)
    process_rate = query_progress.get("processedRowsPerSecond", 0)
    
    if process_rate > 0 and input_rate / process_rate > 1.5:
        alerts.append({
            "level": "warning",
            "message": f"Processing falling behind: input={input_rate:.0f}/s, process={process_rate:.0f}/s"
        })
    
    # ตรวจสอบ batch duration
    batch_duration = query_progress.get("batchDuration", 0)
    trigger_interval = query_progress.get("triggerExecution", {}).get("latency", {}).get("triggerExecution", 0)
    
    if batch_duration > 0 and trigger_interval > 0:
        utilization = batch_duration / trigger_interval * 100
        if utilization > 90:
            alerts.append({
                "level": "critical",
                "message": f"Batch utilization {utilization:.0f}% — near capacity"
            })
        elif utilization > 70:
            alerts.append({
                "level": "warning",
                "message": f"Batch utilization {utilization:.0f}% — consider scaling"
            })
    
    # ตรวจสอบ state store size
    state_size = query_progress.get("stateOperators", [{}])[0].get("numRowsTotal", 0) if query_progress.get("stateOperators") else 0
    if state_size > 10000000:
        alerts.append({
            "level": "warning",
            "message": f"Large state store: {state_size:,} rows"
        })
    
    return alerts

def notify(message, level="info"):
    icon = {"critical": "🔴", "warning": "🟡", "info": "🟢"}[level]
    print(f"[{icon}] {datetime.now()} {message}")
    if SLACK_WEBHOOK and level != "info":
        requests.post(SLACK_WEBHOOK, json={"text": f"{icon} Spark Streaming: {message}"})

def monitor_loop():
    print(f"Spark Streaming Monitor started: {SPARK_UI_URL}")
    
    while True:
        queries = get_streaming_queries()
        
        for q in queries:
            stats = q["stats"]
            alerts = check_streaming_health(stats)
            
            if not alerts:
                notify(f"{q['app']}: Healthy (rate={stats.get('processedRowsPerSecond', 0):.0f}/s)")
            else:
                for alert in alerts:
                    notify(f"{q['app']}: {alert['message']}", alert["level"])
        
        time.sleep(60)

if __name__ == "__main__":
    monitor_loop()

Fault Tolerance และ Exactly-Once Processing

ตั้งค่า checkpoint และ exactly-once semantics

# Fault Tolerance Configuration
#
# === Checkpointing ===
# Checkpoint เก็บ state ทั้งหมดของ streaming query:
# - Offsets ที่อ่านจาก source
# - State store data
# - Metadata ของ query
#
# ต้องใช้ reliable storage เช่น HDFS, S3, GCS
.option("checkpointLocation", "s3a://bucket/checkpoints/my-query")

# === Exactly-Once Semantics ===
# Structured Streaming รับประกัน exactly-once สำหรับ:
# - Kafka source + File/Delta sink (ใช้ checkpoint + idempotent writes)
# - Kafka source + Kafka sink (ใช้ transactional writes)
#
# ตั้งค่า Kafka transactional writes
.option("kafka.enable.idempotence", "true")
.option("kafka.transactional.id", "spark-streaming-tx")

# === Recovery Strategy ===
# 1. Auto-restart เมื่อ job fail
# ใน YARN:
spark-submit --conf spark.yarn.maxAppAttempts=5 \
  --conf spark.yarn.am.attemptFailuresValidityInterval=1h

# 2. Graceful shutdown
# ส่ง SIGTERM แทน SIGKILL
# query.stop()

# 3. Schema Evolution
# Delta Lake รองรับ schema evolution
.option("mergeSchema", "true")

# === ตัวอย่าง Recovery Test ===
# streaming_fault_test.py
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.appName("FaultTest").getOrCreate()

# อ่านจาก Kafka
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "test-topic") \
    .option("startingOffsets", "earliest") \
    .load()

# Transform
result = df.select(
    col("key").cast("string"),
    col("value").cast("string"),
    col("timestamp"),
    col("partition"),
    col("offset")
)

# เขียนไป Delta Lake พร้อม checkpoint
query = result.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/tmp/checkpoints/fault-test") \
    .trigger(processingTime="10 seconds") \
    .start("/tmp/delta/fault-test")

# ตรวจสอบ query status
import time
while query.isActive:
    progress = query.lastProgress
    if progress:
        print(f"Batch: {progress['batchId']}")
        print(f"  Input: {progress.get('numInputRows', 0)} rows")
        print(f"  Rate: {progress.get('processedRowsPerSecond', 0):.0f} rows/s")
        sources = progress.get('sources', [{}])
        if sources:
            print(f"  Offsets: {sources[0].get('endOffset', {})}")
    time.sleep(10)

FAQ คำถามที่พบบ่อย

Q: Spark Structured Streaming กับ Apache Flink ต่างกันอย่างไร?

A: Spark ใช้ micro-batch เป็นหลัก latency ประมาณ 100ms-วินาที เหมาะสำหรับ throughput สูง ใช้ DataFrame API เดียวกับ batch ส่วน Flink เป็น true streaming ที่ประมวลผลทีละ record latency ต่ำกว่า 10ms เหมาะสำหรับ low-latency use cases ถ้าต้องการ unified batch+streaming platform เลือก Spark ถ้าต้องการ low-latency เลือก Flink

Q: State store ใหญ่เกินไปทำอย่างไร?

A: ใช้ RocksDB state store provider แทน default in-memory store เพราะเก็บ state บน disk ได้ ตั้ง watermark ให้เหมาะสมเพื่อลบ state ที่เก่าเกินไป ใช้ mapGroupsWithState แทน groupBy เพื่อควบคุม state lifecycle เอง และพิจารณา state store cleanup ด้วย spark.sql.streaming.stateStore.maintenanceInterval

Q: maxOffsetsPerTrigger ควรตั้งเท่าไหร่?

A: ขึ้นอยู่กับ processing capacity ของ cluster เริ่มจากตั้งให้ batch ใช้เวลาประมาณ 50-70% ของ trigger interval เช่น trigger ทุก 30 วินาที batch ควรเสร็จใน 15-20 วินาที ถ้า batch ใช้เวลาเกิน trigger interval แสดงว่า maxOffsetsPerTrigger สูงเกินไป หรือต้องเพิ่ม resources

Q: จะ scale Kafka partitions โดยไม่หยุด streaming job ได้ไหม?

A: ได้ Spark Structured Streaming ตรวจจับ partitions ใหม่อัตโนมัติ แต่ต้อง restart streaming query เพื่อให้ consumer rebalance ใน production ใช้ rolling restart โดย stop query ด้วย query.stop() แล้ว start ใหม่ checkpoint จะ resume จาก offset ล่าสุดโดยอัตโนมัติ

📖 บทความที่เกี่ยวข้อง

Spark Structured Streaming AR VR Developmentอ่านบทความ → Spark Structured Streaming Microservices Architectureอ่านบทความ → Spark Structured Streaming RBAC ABAC Policyอ่านบทความ → Spark Structured Streaming 12 Factor Appอ่านบทความ → Spark Structured Streaming Open Source Contributionอ่านบทความ →

📚 ดูบทความทั้งหมด →