Spark Structured Streaming Pub Sub Architecture

2025-07-07 · อ. บอม, SiamCafe.net · 9,673 words

Spark Structured Streaming and Pub/Sub

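Spark Structured Streaming is a stream processing engine that uses the same DataFrame API as batch jobs but processes data in near real time. It reads from pub/sub sources such as Kafka (and, through connectors, services such as Event Hubs and Pub/Sub) and supports aggregation, windowing, and joins on streaming data.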

The pub/sub architecture here uses Kafka as the message broker, which decouples producers from consumers. Spark reads messages from Kafka topics, processes them, and writes the results to a sink.
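
To make the decoupling concrete, here is a minimal in-memory sketch of the pub/sub pattern. The `InMemoryBroker` class and its method names are hypothetical and exist only for illustration; a real broker such as Kafka also persists messages, partitions topics, and tracks consumer offsets, none of which is modelled here.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List


class InMemoryBroker:
    """Toy broker (hypothetical): routes each message to every subscriber of its topic."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[Callable[[dict], None]]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Callable[[dict], None]) -> None:
        """Register a handler that is called for every message on the topic."""
        self._subscribers[topic].append(handler)

    def publish(self, topic: str, message: dict) -> int:
        """Deliver the message to all handlers; return how many received it."""
        handlers = self._subscribers.get(topic, [])
        for handler in handlers:
            handler(message)
        return len(handlers)


def keep_purchases(event: dict) -> None:
    """Second subscriber: only records purchase events."""
    if event["event_type"] == "purchase":
        alert_log.append(event)


broker = InMemoryBroker()
page_view_log: List[dict] = []
alert_log: List[dict] = []

# Two independent subscribers on the same topic; the publisher knows neither.
broker.subscribe("user-events", page_view_log.append)
broker.subscribe("user-events", keep_purchases)

delivered = broker.publish("user-events", {"user_id": "user_1", "event_type": "purchase"})
ignored = broker.publish("unknown-topic", {"user_id": "user_2", "event_type": "click"})
```

The publisher only names a topic. Adding or removing a subscriber requires no change on the publishing side, which is the property the Kafka-based design relies on.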

PySpark Structured Streaming with Kafka

# spark_streaming.py — PySpark Structured Streaming + Kafka
# pip install pyspark

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    from_json, col, window, count, avg, to_timestamp, expr,
)
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType,
)

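# === 1. Spark Session ===
# The Kafka connector's Scala suffix (_2.12) and version (3.5.0) must match
# the Spark build that runs this job.
# Note: spark.streaming.stopGracefullyOnShutdown is read by the older DStreams
# API and is not expected to affect the Structured Streaming query below.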
spark = SparkSession.builder \
    .appName("RealTimeAnalytics") \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
    .config("spark.sql.shuffle.partitions", "8") \
    .config("spark.streaming.stopGracefullyOnShutdown", "true") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

# === 2. Schema for events ===
event_schema = StructType([
    StructField("event_id", StringType(), True),
    StructField("user_id", StringType(), True),
    StructField("event_type", StringType(), True),
    StructField("page", StringType(), True),
    StructField("timestamp", StringType(), True),
    StructField("duration_ms", IntegerType(), True),
    StructField("device", StringType(), True),
    StructField("country", StringType(), True),
])

# === 3. Read from Kafka ===
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "user-events") \
    .option("startingOffsets", "latest") \
    .option("failOnDataLoss", "false") \
    .load()

# Parse JSON
events_df = kafka_df \
    .select(from_json(col("value").cast("string"), event_schema).alias("data")) \
    .select("data.*") \
    .withColumn("event_time", to_timestamp("timestamp"))

# === 4. Real-time Aggregations ===

# 4a. Events per minute per page (Tumbling Window)
page_views = events_df \
    .withWatermark("event_time", "5 minutes") \
    .groupBy(
        window("event_time", "1 minute"),
        "page",
    ) \
    .agg(
        count("*").alias("view_count"),
        avg("duration_ms").alias("avg_duration"),
    )

# 4b. Active users per 5 minutes (Sliding Window)
active_users = events_df \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(
        window("event_time", "5 minutes", "1 minute"),
    ) \
    .agg(
        expr("approx_count_distinct(user_id)").alias("active_users"),
        count("*").alias("total_events"),
    )

# 4c. Events by country
country_stats = events_df \
    .withWatermark("event_time", "5 minutes") \
    .groupBy(
        window("event_time", "1 minute"),
        "country",
    ) \
    .agg(
        count("*").alias("events"),
        expr("approx_count_distinct(user_id)").alias("users"),
    )

# === 5. Write results ===

# 5a. Write back to Kafka
# (outside local testing, checkpointLocation should point to durable storage)
page_views_query = page_views \
    .selectExpr("to_json(struct(*)) AS value") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("topic", "page-views-agg") \
    .option("checkpointLocation", "/tmp/checkpoints/page-views") \
    .outputMode("update") \
    .start()

# 5b. Write to console (debug)
# active_users.writeStream \
#     .format("console") \
#     .outputMode("update") \
#     .option("truncate", "false") \
#     .start()

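# Block the driver until the query stops; without this call the script
# exits right after start() when run with spark-submit.
page_views_query.awaitTermination()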
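
The difference between the tumbling window in 4a and the sliding window in 4b is easier to see without Spark. The sketch below computes which windows a single event falls into. It assumes windows are aligned to the Unix epoch and uses naive datetimes for simplicity; `assign_windows` is a hypothetical helper, not a Spark function.

```python
from datetime import datetime, timedelta
from typing import List, Optional, Tuple

EPOCH = datetime(1970, 1, 1)


def assign_windows(
    event_time: datetime,
    size: timedelta,
    slide: Optional[timedelta] = None,
) -> List[Tuple[datetime, datetime]]:
    """Return every half-open window [start, end) that contains event_time."""
    slide = slide or size  # a tumbling window is a sliding window with slide == size
    # Latest epoch-aligned window start that is <= event_time.
    start = event_time - ((event_time - EPOCH) % slide)
    windows = []
    while start + size > event_time:
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


event = datetime(2025, 7, 7, 12, 3, 30)

# 4a: 1-minute tumbling window, so exactly one window per event.
tumbling = assign_windows(event, timedelta(minutes=1))

# 4b: 5-minute window sliding every minute, so each event lands in 5 windows.
sliding = assign_windows(event, timedelta(minutes=5), timedelta(minutes=1))

for start, end in sliding:
    print(start.time(), "to", end.time())
```

With a sliding window each event contributes to size / slide windows, so the 5-minute, 1-minute-slide aggregation in 4b keeps roughly five times as much state as a tumbling window of the same size.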

Kafka Producer and Consumer

# kafka_pubsub.py — Kafka Pub/Sub Producer + Consumer
# pip install confluent-kafka

from confluent_kafka import Producer, Consumer, KafkaError
from confluent_kafka.admin import AdminClient, NewTopic
import json
import time
import uuid
import random
from datetime import datetime

# === Kafka Configuration ===
KAFKA_BROKER = "localhost:9092"

# === Admin: create topics ===
def create_topics(topics):
    admin = AdminClient({"bootstrap.servers": KAFKA_BROKER})
    new_topics = [
        NewTopic(topic, num_partitions=6, replication_factor=1)
        for topic in topics
    ]
    fs = admin.create_topics(new_topics)
    for topic, f in fs.items():
        try:
            f.result()
            print(f"  Created topic: {topic}")
        except Exception as e:
            print(f"  Topic {topic}: {e}")

# === Producer ===
class EventProducer:
    def __init__(self):
        self.producer = Producer({
            "bootstrap.servers": KAFKA_BROKER,
            "acks": "all",
            "retries": 3,
            "linger.ms": 5,
            "batch.size": 16384,
        })

    def produce(self, topic, event):
        key = event.get("user_id", str(uuid.uuid4()))
        value = json.dumps(event)
        self.producer.produce(
            topic, key=key.encode(), value=value.encode(),
            callback=self._delivery_report,
        )
        self.producer.poll(0)

    def _delivery_report(self, err, msg):
        if err:
            print(f"Delivery failed: {err}")

    def flush(self):
        self.producer.flush()

    def generate_events(self, topic, n=1000, delay=0.01):
        """Generate simulated events."""
        pages = ["/home", "/products", "/cart", "/checkout", "/blog"]
        devices = ["mobile", "desktop", "tablet"]
        countries = ["TH", "US", "JP", "SG", "MY"]
        event_types = ["page_view", "click", "scroll", "purchase"]

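        for _ in range(n):
            event = {
                "event_id": str(uuid.uuid4()),
                "user_id": f"user_{random.randint(1, 500)}",
                "event_type": random.choice(event_types),
                "page": random.choice(pages),
                # Include the local UTC offset so the consumer parses the
                # same instant regardless of its own time zone.
                "timestamp": datetime.now().astimezone().isoformat(),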
                "duration_ms": random.randint(100, 30000),
                "device": random.choice(devices),
                "country": random.choice(countries),
            }
            self.produce(topic, event)
            if delay > 0:
                time.sleep(delay)

        self.flush()
        print(f"Produced {n} events to {topic}")

# === Consumer ===
class EventConsumer:
    def __init__(self, group_id="analytics-group"):
        self.consumer = Consumer({
            "bootstrap.servers": KAFKA_BROKER,
            "group.id": group_id,
            "auto.offset.reset": "latest",
            "enable.auto.commit": True,
        })

    def consume(self, topics, max_messages=100):
        self.consumer.subscribe(topics)
        count = 0

        try:
            while count < max_messages:
                msg = self.consumer.poll(1.0)
                if msg is None:
                    continue
                if msg.error():
                    if msg.error().code() != KafkaError._PARTITION_EOF:
                        print(f"Error: {msg.error()}")
                    continue

                event = json.loads(msg.value().decode())
                count += 1
                if count % 100 == 0:
                    print(f"  Consumed {count} messages")

        finally:
            self.consumer.close()
            print(f"Total consumed: {count}")

# create_topics(["user-events", "page-views-agg", "alerts"])
# producer = EventProducer()
# producer.generate_events("user-events", n=1000)
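
The producer above uses `user_id` as the message key. The point of a key is that all messages with the same key go to the same partition, which preserves per-user ordering. The sketch below illustrates the idea with CRC32 as a stand-in hash; the actual hash function depends on the client library and its partitioner setting, so `partition_for` is a hypothetical helper and its partition numbers will not match what a real client chooses.

```python
import zlib
from collections import defaultdict
from typing import DefaultDict, List, Tuple

NUM_PARTITIONS = 6  # same partition count as create_topics() uses


def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Illustrative hash partitioner: CRC32 of the key modulo the partition count."""
    return zlib.crc32(key.encode()) % num_partitions


events: List[Tuple[str, str]] = [
    ("user_1", "page_view"),
    ("user_2", "click"),
    ("user_1", "purchase"),
]

# Append each event to the log of the partition its key hashes to.
partitions: DefaultDict[int, List[Tuple[str, str]]] = defaultdict(list)
for user_id, event_type in events:
    partitions[partition_for(user_id)].append((user_id, event_type))

# All events for user_1 sit in one partition, in the order they were produced.
user_1_log = [e for e in partitions[partition_for("user_1")] if e[0] == "user_1"]
```

Because ordering is only guaranteed within a partition, choosing the key decides which events can be processed in order. Keying by `user_id` keeps each user's events ordered while still spreading users across partitions for parallelism.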

Docker Compose for a Kafka Cluster

# === Docker Compose — Kafka + Spark ===
# docker-compose.yml

# version: '3.8'
# services:
#   zookeeper:
#     image: confluentinc/cp-zookeeper:7.6.0
#     environment:
#       ZOOKEEPER_CLIENT_PORT: 2181
#
#   kafka:
#     image: confluentinc/cp-kafka:7.6.0
#     depends_on:
#       - zookeeper
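#     ports:
#       - "9092:29092"
#     environment:
#       KAFKA_BROKER_ID: 1
#       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
#       # Containers reach the broker at kafka:9092; host clients use localhost:9092,
#       # which maps to the PLAINTEXT_HOST listener on container port 29092.
#       KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:29092
#       KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9092
#       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
#       KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
#       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
#       KAFKA_NUM_PARTITIONS: 6
#       KAFKA_DEFAULT_REPLICATION_FACTOR: 1
#       KAFKA_LOG_RETENTION_HOURS: 168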
#
#   kafka-ui:
#     image: provectuslabs/kafka-ui:latest
#     ports:
#       - "8080:8080"
#     environment:
#       KAFKA_CLUSTERS_0_NAME: local
#       KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
#
#   spark-master:
#     image: bitnami/spark:3.5
#     ports:
#       - "8081:8080"
#       - "7077:7077"
#     environment:
#       SPARK_MODE: master
#
#   spark-worker:
#     image: bitnami/spark:3.5
#     depends_on:
#       - spark-master
#     environment:
#       SPARK_MODE: worker
#       SPARK_MASTER_URL: spark://spark-master:7077
#       SPARK_WORKER_MEMORY: 4G
#       SPARK_WORKER_CORES: 2

# Run the Spark job
# spark-submit --master spark://spark-master:7077 \
#   --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 \
#   spark_streaming.py

echo "Kafka + Spark Streaming Environment"
echo "  Kafka: localhost:9092"
echo "  Kafka UI: http://localhost:8080"
echo "  Spark Master: http://localhost:8081"

Best Practices

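What is Spark Structured Streaming?

It is a stream processing engine built on Apache Spark that uses the same DataFrame API as batch jobs but processes data in near real time. It supports event-time processing, watermarking, and windowing, and offers exactly-once processing when the source is replayable and the sink is idempotent or transactional (the built-in Kafka sink provides at-least-once delivery). Jobs can be written in Scala, Python, or Java.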
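
Exactly-once results usually come down to making the sink idempotent, so that a micro-batch replayed after a failure does not get applied twice. Spark's `foreachBatch` passes a batch ID to the callback, which can serve as the deduplication key. The sketch below shows only that idea in plain Python; `IdempotentSink` is a hypothetical class, and a real store would need to commit the data and the batch ID in one transaction.

```python
from typing import Dict, List, Set, Tuple


class IdempotentSink:
    """Toy sink (hypothetical): applies each batch_id at most once."""

    def __init__(self) -> None:
        self.committed_batches: Set[int] = set()
        self.totals: Dict[str, int] = {}

    def write_batch(self, batch_id: int, rows: List[Tuple[str, int]]) -> bool:
        """Apply the rows unless this batch was already committed.

        Returns True if the batch was applied, False if it was a replay.
        """
        if batch_id in self.committed_batches:
            return False
        for page, views in rows:
            self.totals[page] = self.totals.get(page, 0) + views
        # In a real store this must be atomic with the updates above.
        self.committed_batches.add(batch_id)
        return True


sink = IdempotentSink()
first = sink.write_batch(0, [("/home", 3), ("/cart", 1)])
replay = sink.write_batch(0, [("/home", 3), ("/cart", 1)])  # retry after a failure
second = sink.write_batch(1, [("/home", 2)])
```

After the replay the totals are unchanged, so at-least-once delivery from the engine still produces exactly-once results in the store.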

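What is a Pub/Sub architecture?

It is a messaging pattern in which publishers send messages to a topic and subscribers receive messages from the topics they are interested in, which decouples producers from consumers. Examples include Kafka, Google Pub/Sub, AWS SNS/SQS, and Redis Pub/Sub.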

Why use Spark with Kafka?

Kafka moves messages at high throughput, and Spark reads from Kafka to run real-time aggregations, joins, and windowing, then writes the results back to Kafka or to a database. Together they form a capable real-time analytics system.

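What is a watermark?

A watermark defines how long the engine waits for late data. With a 10-minute watermark, for example, events that arrive up to 10 minutes behind the latest event time seen so far are still included; events later than that may be dropped, and state for windows older than the watermark is cleaned up. This is how out-of-order events are handled.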
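
The watermark rule can be sketched in a few lines of plain Python. This is a simplified per-event model under stated assumptions: Spark advances the watermark between micro-batches and compares it against window boundaries, while the hypothetical `WatermarkTracker` below checks each event directly against the latest event time seen minus the allowed delay.

```python
from datetime import datetime, timedelta
from typing import Optional


class WatermarkTracker:
    """Simplified watermark (hypothetical): max event time seen minus a fixed delay."""

    def __init__(self, delay: timedelta) -> None:
        self.delay = delay
        self.max_event_time: Optional[datetime] = None

    @property
    def watermark(self) -> Optional[datetime]:
        """Current watermark, or None before any event has been seen."""
        if self.max_event_time is None:
            return None
        return self.max_event_time - self.delay

    def accept(self, event_time: datetime) -> bool:
        """Return True if the event is kept, False if it is older than the watermark."""
        current = self.watermark
        if current is not None and event_time < current:
            return False
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time
        return True


tracker = WatermarkTracker(delay=timedelta(minutes=10))
arrivals = [
    datetime(2025, 7, 7, 12, 0),   # first event
    datetime(2025, 7, 7, 12, 15),  # moves the watermark to 12:05
    datetime(2025, 7, 7, 12, 7),   # out of order but within 10 minutes: kept
    datetime(2025, 7, 7, 12, 4),   # older than the watermark: dropped
]
results = [tracker.accept(t) for t in arrivals]
```

A longer delay tolerates more lateness at the cost of keeping window state in memory for longer, which is the trade-off behind the 5-minute and 10-minute values used in the streaming job.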

Summary

Spark Structured Streaming combined with Kafka pub/sub gives a capable real-time analytics system. The DataFrame API is the same as for batch jobs, watermarks handle late data, checkpointing provides fault tolerance, Kafka partitions provide parallelism, a Schema Registry manages schemas, and the Spark UI covers monitoring.
