SiamCafe.net Blog
Technology

Spark Structured Streaming Message Queue Design

2025-10-06 · อ. บอม — SiamCafe.net · 1,534 words

What is Spark Structured Streaming Message Queue Design?

Apache Spark Structured Streaming is a scalable stream processing engine built on the Spark SQL engine; it processes real-time data streams using the same DataFrame/Dataset API as batch processing. A message queue is middleware for passing messages between systems asynchronously, such as Apache Kafka, RabbitMQ, or Amazon Kinesis. Designing a system that combines Spark Structured Streaming with a message queue yields a data pipeline that is scalable, fault-tolerant, and real-time, suitable for use cases such as event processing, real-time analytics, and ETL.
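Before diving into the real stack, here is a minimal pure-Python sketch of the core idea: a producer appends to an append-only buffer and a consumer drains it in bounded micro-batches, which is conceptually what Structured Streaming does against a Kafka topic. The names `MessageQueue` and `consume_batch` are illustrative only, not part of any Kafka or Spark API.

```python
from collections import deque

class MessageQueue:
    """Toy stand-in for a Kafka topic: an append-only buffer."""
    def __init__(self):
        self._log = deque()

    def produce(self, message):
        self._log.append(message)

    def consume_batch(self, max_records):
        """Drain up to max_records, analogous to maxOffsetsPerTrigger."""
        batch = []
        while self._log and len(batch) < max_records:
            batch.append(self._log.popleft())
        return batch

queue = MessageQueue()
for i in range(7):
    queue.produce({"event_id": i, "event_type": "click"})

# Micro-batch processing: each trigger drains one bounded batch
batch1 = queue.consume_batch(max_records=5)
batch2 = queue.consume_batch(max_records=5)
print(len(batch1), len(batch2))  # 5 2
```

The buffer decouples the producer's write rate from the consumer's processing rate, which is the main reason to put a queue between data sources and Spark.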

Architecture Overview

# architecture.py — Spark Streaming + Message Queue architecture

class StreamingArchitecture:
    COMPONENTS = {
        "producers": {
            "name": "Data Producers",
            "description": "Data sources — applications, IoT devices, logs, events",
            "examples": ["Web application events", "IoT sensor data", "Transaction logs", "User clickstream"],
        },
        "message_queue": {
            "name": "Message Queue (Kafka)",
            "description": "Buffers data between producers and consumers — decouples the systems",
            "features": ["Partitioning", "Replication", "Retention", "Consumer groups"],
        },
        "spark_streaming": {
            "name": "Spark Structured Streaming",
            "description": "Processes stream data — transform, aggregate, join, window",
            "modes": ["Append (new rows only)", "Update (changed rows)", "Complete (entire result table)"],
        },
        "sink": {
            "name": "Output Sinks",
            "description": "Destinations for the processed data",
            "options": ["Delta Lake", "PostgreSQL", "Elasticsearch", "Another Kafka topic", "S3/GCS"],
        },
    }

    FLOW = """
    Data Flow:
    
    [Producers] → [Kafka Topics] → [Spark Structured Streaming] → [Sinks]
    
    Producer App → topic: raw-events    → Spark: transform + aggregate → Delta Lake
    IoT Sensors → topic: sensor-data    → Spark: window + alert       → Kafka: alerts
    Logs        → topic: application-logs → Spark: parse + enrich      → Elasticsearch
    """

    def show_components(self):
        print("=== Architecture Components ===\n")
        for key, comp in self.COMPONENTS.items():
            print(f"[{comp['name']}]")
            print(f"  {comp['description']}")
            print()

    def show_flow(self):
        print("=== Data Flow ===")
        print(self.FLOW)

arch = StreamingArchitecture()
arch.show_components()
arch.show_flow()

Spark Structured Streaming Implementation

# streaming_impl.py — Spark Structured Streaming with Kafka

class SparkStreamingImpl:
    KAFKA_SOURCE = """
# spark_kafka_streaming.py — Read from Kafka with Spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession.builder \\
    .appName("KafkaStreaming") \\
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \\
    .getOrCreate()

# 1. Read from Kafka
kafka_df = spark.readStream \\
    .format("kafka") \\
    .option("kafka.bootstrap.servers", "kafka:9092") \\
    .option("subscribe", "raw-events") \\
    .option("startingOffsets", "latest") \\
    .option("maxOffsetsPerTrigger", 10000) \\
    .load()

# 2. Parse JSON messages
event_schema = StructType([
    StructField("event_id", StringType()),
    StructField("user_id", StringType()),
    StructField("event_type", StringType()),
    StructField("timestamp", TimestampType()),
    StructField("properties", MapType(StringType(), StringType())),
])

events_df = kafka_df \\
    .select(from_json(col("value").cast("string"), event_schema).alias("event")) \\
    .select("event.*") \\
    .withWatermark("timestamp", "10 minutes")

# 3. Windowed aggregation — events per minute per type
event_counts = events_df \\
    .groupBy(
        window("timestamp", "1 minute"),
        "event_type"
    ) \\
    .agg(
        count("*").alias("event_count"),
        countDistinct("user_id").alias("unique_users"),
    )

# 4. Write to Delta Lake
query = event_counts.writeStream \\
    .format("delta") \\
    .outputMode("append") \\
    .option("checkpointLocation", "/checkpoints/event_counts") \\
    .option("path", "/data/event_counts") \\
    .trigger(processingTime="30 seconds") \\
    .start()

query.awaitTermination()
"""

    KAFKA_SINK = """
# spark_to_kafka.py — Write processed data back to Kafka
from pyspark.sql.functions import to_json, struct

# Process and write alerts to Kafka
alerts_df = events_df \\
    .filter(col("event_type") == "error") \\
    .select(
        col("event_id").alias("key"),
        to_json(struct("*")).alias("value")
    )

alert_query = alerts_df.writeStream \\
    .format("kafka") \\
    .option("kafka.bootstrap.servers", "kafka:9092") \\
    .option("topic", "alerts") \\
    .option("checkpointLocation", "/checkpoints/alerts") \\
    .outputMode("append") \\
    .start()
"""

    def show_source(self):
        print("=== Kafka Source ===")
        print(self.KAFKA_SOURCE[:600])

    def show_sink(self):
        print(f"\n=== Kafka Sink ===")
        print(self.KAFKA_SINK[:400])

impl = SparkStreamingImpl()
impl.show_source()
impl.show_sink()

Message Queue Design Patterns

# patterns.py — Message queue design patterns

class MQPatterns:
    PATTERNS = {
        "fan_out": {
            "name": "Fan-out (One-to-Many)",
            "description": "1 producer sends a message → multiple consumers read it (via consumer groups)",
            "use_case": "Event notification — order created → inventory + payment + email",
            "kafka": "Multiple consumer groups subscribe to same topic",
        },
        "fan_in": {
            "name": "Fan-in (Many-to-One)",
            "description": "Multiple producers send → 1 consumer group processes",
            "use_case": "Log aggregation — multiple services → single processing pipeline",
            "kafka": "Multiple producers → single topic → Spark streaming",
        },
        "dead_letter": {
            "name": "Dead Letter Queue (DLQ)",
            "description": "Messages that fail processing → sent to a DLQ for retry/investigation",
            "use_case": "Failed payment processing → DLQ → manual review",
            "kafka": "Custom error handling → produce to DLQ topic",
        },
        "event_sourcing": {
            "name": "Event Sourcing",
            "description": "Store every event as an immutable log — reconstruct state from the events",
            "use_case": "Order lifecycle events → replay to rebuild state",
            "kafka": "Kafka is a natural event store (append-only, retention)",
        },
        "cqrs": {
            "name": "CQRS (Command Query Responsibility Segregation)",
            "description": "Separate the write path from the read path — Kafka as the bridge",
            "use_case": "Write → Kafka → Spark → Read DB (Elasticsearch, Redis)",
            "kafka": "Topic = command log, Spark = projector",
        },
    }

    KAFKA_BEST_PRACTICES = {
        "partitioning": "Choose a good partition key — distribute load evenly (e.g., user_id)",
        "retention": "Set retention per use case — 7 days (logs), 30 days (events), forever (event sourcing)",
        "replication": "Replication factor >= 3 for production",
        "serialization": "Use Avro/Protobuf instead of JSON — schema evolution, smaller size",
        "idempotency": "Enable the idempotent producer — prevents duplicate messages",
    }

    def show_patterns(self):
        print("=== Message Queue Patterns ===\n")
        for key, p in self.PATTERNS.items():
            print(f"[{p['name']}]")
            print(f"  {p['description']}")
            print(f"  Use: {p['use_case']}")
            print()

    def show_best_practices(self):
        print("=== Kafka Best Practices ===")
        for key, value in self.KAFKA_BEST_PRACTICES.items():
            print(f"  [{key}] {value}")

patterns = MQPatterns()
patterns.show_patterns()
patterns.show_best_practices()
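The DLQ pattern above can be sketched in pure Python. In a real Spark pipeline this routing would typically live inside a `foreachBatch` function, with failed records produced to a dedicated DLQ topic; here `process` is a hypothetical processing step and the lists stand in for output topics.

```python
import json

def process(message):
    """Hypothetical processing step: requires a parseable JSON payload
    with an event_type field."""
    return json.loads(message)["event_type"]

def route_with_dlq(messages):
    """Process each message; failures are routed to the dead letter queue
    with the raw payload and the error, so they can be retried or inspected."""
    processed, dlq = [], []
    for msg in messages:
        try:
            processed.append(process(msg))
        except (json.JSONDecodeError, KeyError) as err:
            dlq.append({"raw": msg, "error": str(err)})
    return processed, dlq

messages = ['{"event_type": "click"}', 'not-json', '{"other": 1}']
processed, dlq = route_with_dlq(messages)
print(processed)   # ['click']
print(len(dlq))    # 2
```

Keeping the raw payload in the DLQ record is what makes the manual-review and replay use cases possible.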

Performance Tuning

# performance.py — Spark Streaming performance tuning
import random

class PerformanceTuning:
    SPARK_CONFIG = """
# spark_config.py — Optimized Spark Streaming configuration
spark_conf = {
    # Kafka consumer rate limiting
    # Note: spark.streaming.kafka.maxRatePerPartition applies to the legacy
    # DStreams API; for Structured Streaming, set maxOffsetsPerTrigger as a
    # readStream .option() instead (shown here for reference).
    "spark.streaming.kafka.maxRatePerPartition": "10000",
    "maxOffsetsPerTrigger": "100000",
    
    # Processing
    "spark.sql.shuffle.partitions": "200",
    "spark.default.parallelism": "200",
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    
    # Memory
    "spark.executor.memory": "8g",
    "spark.executor.memoryOverhead": "2g",
    "spark.driver.memory": "4g",
    
    # Checkpoint
    "spark.sql.streaming.checkpointLocation": "/checkpoints",
    "spark.sql.streaming.minBatchesToRetain": "100",
    
    # Trigger — set via writeStream .trigger(), shown here for reference
    "trigger.processingTime": "30 seconds",  # Micro-batch interval
    # OR "trigger.availableNow": True,  # Process all available data then stop
    # OR "trigger.continuous": "1 second",  # Continuous processing (experimental)
}
"""

    TUNING_TIPS = {
        "partitions": {
            "name": "Partition Alignment",
            "tip": "Kafka partitions = Spark parallelism — match the number of Kafka partitions to Spark executor cores",
            "impact": "Throughput +50-200%",
        },
        "trigger": {
            "name": "Trigger Interval",
            "tip": "Lower trigger interval = lower latency but higher overhead — balance at 10-60 seconds",
            "impact": "Latency vs throughput tradeoff",
        },
        "watermark": {
            "name": "Watermark",
            "tip": "Set the watermark appropriately — too short = miss late data, too long = large state",
            "impact": "Memory + accuracy",
        },
        "state_store": {
            "name": "State Store",
            "tip": "Use the RocksDB state store for large state — the default in-memory store does not scale",
            "impact": "Memory reduction 50-80%",
        },
    }

    def show_config(self):
        print("=== Spark Config ===")
        print(self.SPARK_CONFIG[:500])

    def show_tips(self):
        print(f"\n=== Tuning Tips ===")
        for key, tip in self.TUNING_TIPS.items():
            print(f"  [{tip['name']}] {tip['tip']} → {tip['impact']}")

    def metrics_dashboard(self):
        print(f"\n=== Streaming Metrics ===")
        metrics = {
            "Input rate": f"{random.randint(5000, 50000):,} records/sec",
            "Processing rate": f"{random.randint(5000, 60000):,} records/sec",
            "Batch duration": f"{random.uniform(5, 30):.1f} seconds",
            "Trigger interval": "30 seconds",
            "Records in batch": f"{random.randint(100000, 500000):,}",
            "State rows": f"{random.randint(10000, 100000):,}",
            "Kafka lag": f"{random.randint(0, 5000):,} records",
        }
        for m, v in metrics.items():
            print(f"  {m}: {v}")

perf = PerformanceTuning()
perf.show_config()
perf.show_tips()
perf.metrics_dashboard()
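The watermark tradeoff in TUNING_TIPS can be made concrete with a small pure-Python sketch. Real watermarks in Spark advance monotonically across micro-batches; this toy version, an assumption for illustration only, computes one threshold over a single batch of `(event_time, payload)` pairs.

```python
def apply_watermark(events, watermark_seconds):
    """Drop events older than (max event time seen) - watermark.
    Conceptually mirrors withWatermark("timestamp", "...") for one batch."""
    max_event_time = max(t for t, _ in events)
    threshold = max_event_time - watermark_seconds
    return [(t, v) for t, v in events if t >= threshold]

# (event_time_seconds, payload); one record arrives very late
events = [(100, "a"), (160, "b"), (40, "late"), (165, "c")]

# Short watermark (70s): threshold = 95, the late event is dropped
print(apply_watermark(events, 70))

# Long watermark (150s): threshold = 15, the late event is kept,
# but the engine must hold state for a much longer window
print(apply_watermark(events, 150))
```

This is exactly the memory-versus-accuracy tradeoff: a longer watermark accepts more late data at the cost of larger streaming state.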

Infrastructure

# infra.py — Docker Compose infrastructure

class Infrastructure:
    DOCKER_COMPOSE = """
# docker-compose.yml — Spark + Kafka infrastructure
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on: [zookeeper]
    ports: ["9092:9092"]
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_NUM_PARTITIONS: 12
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1
      KAFKA_LOG_RETENTION_HOURS: 168

  spark-master:
    image: bitnami/spark:3.5
    ports: ["8080:8080", "7077:7077"]
    environment:
      SPARK_MODE: master

  spark-worker:
    image: bitnami/spark:3.5
    depends_on: [spark-master]
    environment:
      SPARK_MODE: worker
      SPARK_MASTER_URL: spark://spark-master:7077
      SPARK_WORKER_MEMORY: 8G
      SPARK_WORKER_CORES: 4
    deploy:
      replicas: 2

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports: ["8081:8080"]
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
"""

    def show_compose(self):
        print("=== Docker Compose ===")
        print(self.DOCKER_COMPOSE[:600])

    def monitoring_tips(self):
        print(f"\n=== Monitoring Checklist ===")
        tips = [
            "Kafka: consumer lag, partition distribution, broker health",
            "Spark: batch duration, input rate, processing rate, state size",
            "Alerts: lag > threshold, batch duration > trigger interval, OOM errors",
            "Tools: Grafana + Prometheus, Kafka UI, Spark UI (port 4040)",
        ]
        for tip in tips:
            print(f"  • {tip}")

infra = Infrastructure()
infra.show_compose()
infra.monitoring_tips()
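Consumer lag, the first item on the monitoring checklist, has a simple definition: per partition, lag = log-end offset minus the consumer group's committed offset. A minimal sketch of that computation (the offset values here are made up for illustration):

```python
def consumer_lag(log_end_offsets, committed_offsets):
    """Per-partition lag = log-end offset - committed consumer offset.
    A partition with no committed offset is treated as fully behind."""
    return {
        partition: log_end_offsets[partition] - committed_offsets.get(partition, 0)
        for partition in log_end_offsets
    }

log_end = {0: 1500, 1: 2300, 2: 900}     # latest offset per partition
committed = {0: 1500, 1: 2100, 2: 650}   # consumer group's committed offsets

lag = consumer_lag(log_end, committed)
print(lag)                # {0: 0, 1: 200, 2: 250}
print(sum(lag.values()))  # 450 records behind in total
```

A total lag that grows batch after batch means the processing rate is below the input rate, which is the main alert condition to wire into Grafana.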

FAQ - Frequently Asked Questions

Q: Which is better, Spark Structured Streaming or Flink?

A: Spark: micro-batch by default, a unified batch + stream API, and a large ecosystem (MLlib, Delta). Flink: true event-by-event streaming, lower latency, better stateful processing. Choose Spark if you already run a Spark ecosystem, want batch and streaming unified, or use Delta Lake. Choose Flink if you need low latency (< 1s), complex event processing, or very large state.

Q: How do Kafka and RabbitMQ differ?

A: Kafka is a distributed log: high throughput, retention, replayable, well suited to streaming. RabbitMQ is a traditional message broker: routing, per-message acknowledgment, well suited to task queues. Use Kafka for event streaming, log aggregation, and real-time analytics; use RabbitMQ for task distribution, RPC, and simple pub/sub.

Q: Why are checkpoints important?

A: Checkpoints are the basis of the exactly-once guarantee — they persist the progress of a streaming query. If Spark restarts, the query resumes from the checkpoint (no duplicates, no missed data). Set checkpointLocation for every streaming query, and keep it in reliable storage (HDFS, S3, GCS), not on local disk.
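The resume-from-checkpoint behavior can be sketched in pure Python. Real Spark checkpoints store Kafka offsets plus operator state under the checkpointLocation; this toy version, assumed for illustration, persists only an offset to a JSON file and "restarts" by reloading it.

```python
import json
import os
import tempfile

def load_checkpoint(path):
    """Resume from the last committed offset, or start at 0 on first run."""
    if os.path.exists(path):
        with open(path) as f:
            return json.load(f)["offset"]
    return 0

def save_checkpoint(path, offset):
    with open(path, "w") as f:
        json.dump({"offset": offset}, f)

ckpt = os.path.join(tempfile.mkdtemp(), "offsets.json")
stream = list(range(10))  # stand-in for a Kafka partition

# First run: process a batch of 6 records, commit, then "crash"
start = load_checkpoint(ckpt)
batch = stream[start:start + 6]
save_checkpoint(ckpt, start + len(batch))

# Restart: resumes at offset 6, so nothing is duplicated or missed
start = load_checkpoint(ckpt)
print(start)           # 6
print(stream[start:])  # [6, 7, 8, 9]
```

This is also why the checkpoint must live on reliable shared storage: if the file above were lost on restart, the query would reprocess from offset 0.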

Q: What is an appropriate Kafka cluster size?

A: Small (< 10K msg/s): 3 brokers, 6-12 partitions. Medium (10K-100K msg/s): 5-10 brokers, 50-100 partitions. Large (> 100K msg/s): 10-50 brokers, 100+ partitions. Replication factor: 3 (the production standard). Partitions are the unit of parallelism — more partitions means higher throughput.
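The sizing tiers above can be turned into a rough back-of-envelope calculator. The broker tiers follow the answer above; the per-partition capacity of 2,000 msg/s is an illustrative assumption only — measure your own workload before sizing.

```python
import math

def estimate_partitions(msgs_per_sec, per_partition_capacity=2000):
    """Rough partition count: target throughput / per-partition capacity,
    with a floor of 6 matching the smallest tier above."""
    return max(6, math.ceil(msgs_per_sec / per_partition_capacity))

def recommend_cluster(msgs_per_sec):
    """Map throughput to the tiers from the FAQ answer (upper bound of each)."""
    if msgs_per_sec < 10_000:
        brokers = 3
    elif msgs_per_sec <= 100_000:
        brokers = 10
    else:
        brokers = 50
    return {
        "brokers": brokers,
        "partitions": estimate_partitions(msgs_per_sec),
        "replication_factor": 3,  # production standard
    }

small = recommend_cluster(5_000)
medium = recommend_cluster(80_000)
print(small)   # {'brokers': 3, 'partitions': 6, 'replication_factor': 3}
print(medium)  # {'brokers': 10, 'partitions': 40, 'replication_factor': 3}
```

Remember that partitions can be added later but never removed, so it is common to over-provision partitions slightly at topic creation.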
