Spark Streaming Lab

A home lab for Spark Structured Streaming: Kafka integration, window aggregation, watermarks, checkpoints, Delta Lake, and real-time analytics patterns for production.

Component        Tool                       Port  Purpose
Message Broker   Apache Kafka               9092  Stream source and sink
Stream Engine    Apache Spark 3.5           4040  Structured Streaming processing
Schema Registry  Confluent Schema Registry  8081  Avro/Protobuf schema management
Storage          MinIO (S3)                 9000  Checkpoint and output storage
Lakehouse        Delta Lake                 N/A   ACID table on object storage
Notebook         JupyterLab                 8888  Interactive development
Kafka UI         Kafka UI                   8080  Monitor topics and messages

Docker Compose Setup

# === Docker Compose for Streaming Lab ===

# docker-compose.yml
# version: '3.8'
# services:
#   zookeeper:
#     image: confluentinc/cp-zookeeper:7.5.0
#     environment:
#       ZOOKEEPER_CLIENT_PORT: 2181
#
#   kafka:
#     image: confluentinc/cp-kafka:7.5.0
#     ports: ["9092:9092"]
#     environment:
#       KAFKA_BROKER_ID: 1
#       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
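#       KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,HOST://localhost:9092
#       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,HOST:PLAINTEXT
#       # A single broker cannot satisfy the default replication factor of 3
#       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
#       KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'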
#
#   spark:
#     image: bitnami/spark:3.5
#     ports: ["4040:4040", "8888:8888"]
#     environment:
#       SPARK_MODE: master
#     volumes:
#       - ./notebooks:/opt/notebooks
#
#   minio:
#     image: minio/minio
#     ports: ["9000:9000", "9001:9001"]
#     command: server /data --console-address ":9001"
#     environment:
#       MINIO_ROOT_USER: minioadmin
#       MINIO_ROOT_PASSWORD: minioadmin
#
#   kafka-ui:
#     image: provectuslabs/kafka-ui
#     ports: ["8080:8080"]
#     environment:
#       KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092

# Create Kafka topic
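# (docker compose exec targets the service name, so no container_name is needed)
# docker compose exec kafka kafka-topics --create \
#   --bootstrap-server localhost:9092 \
#   --topic events --partitions 3 --replication-factor 1

# Produce test events (type one JSON message per line, Ctrl+C to exit)
# docker compose exec kafka kafka-console-producer \
#   --bootstrap-server localhost:9092 --topic events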

from dataclasses import dataclass

@dataclass
class LabService:
    service: str
    image: str
    ram: str
    purpose: str

lab = [
    LabService("Zookeeper", "cp-zookeeper:7.5", "512MB", "Kafka coordination"),
    LabService("Kafka", "cp-kafka:7.5", "1GB", "Message broker"),
    LabService("Spark Master", "bitnami/spark:3.5", "2GB", "Stream processing"),
    LabService("Spark Worker", "bitnami/spark:3.5", "2GB", "Processing executor"),
    LabService("MinIO", "minio/minio", "512MB", "S3-compatible storage"),
    LabService("Kafka UI", "kafka-ui", "256MB", "Topic monitoring"),
]

print("=== Lab Services ===")
total_ram = 0
for s in lab:
    # Convert "512MB" or "2GB" to megabytes (1GB counted as 1000MB for a rough estimate)
    ram_mb = int(s.ram[:-2]) * (1000 if s.ram.endswith("GB") else 1)
    total_ram += ram_mb
    print(f"  [{s.service}] Image: {s.image} | RAM: {s.ram}")
    print(f"    Purpose: {s.purpose}")
print(f"\n  Total RAM needed: ~{total_ram/1000:.1f}GB")
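The console producer shown earlier reads one message per line from standard input. As a minimal sketch, the script below generates JSON test events with the four fields that this lab's streaming job parses (user_id, event_type, amount, timestamp). The function name generate_events, the list of event types, and the value ranges are illustrative assumptions, not part of the original lab.

```python
import json
import random
from datetime import datetime, timezone

# Hypothetical event types for testing; the lab does not prescribe any.
EVENT_TYPES = ["purchase", "refund", "view"]


def generate_events(n, seed=None):
    """Return n JSON strings, one event per string."""
    rng = random.Random(seed)
    lines = []
    for _ in range(n):
        event = {
            "user_id": f"user-{rng.randint(1, 100)}",
            "event_type": rng.choice(EVENT_TYPES),
            "amount": round(rng.uniform(1.0, 500.0), 2),
            # ISO-8601 UTC timestamp with millisecond precision
            "timestamp": datetime.now(timezone.utc).isoformat(timespec="milliseconds"),
        }
        lines.append(json.dumps(event))
    return lines


if __name__ == "__main__":
    # Print one event per line so the output can be piped to a producer
    for line in generate_events(10):
        print(line)
```

Saved under a file name such as gen_events.py (a placeholder), the output could be piped into the producer with something like: python gen_events.py | docker compose exec -T kafka kafka-console-producer --bootstrap-server localhost:9092 --topic events. The -T flag disables TTY allocation so that the pipe works.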

Structured Streaming Code

# === PySpark Structured Streaming ===

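# from pyspark.sql import SparkSession
# # Note: the wildcard import replaces Python's built-in sum() with Spark's
# # column function, which is what the aggregations below rely on.
# from pyspark.sql.functions import *
# from pyspark.sql.types import *
#
# # hadoop-aws provides the s3a:// filesystem; its version should match the
# # Hadoop version bundled with Spark (3.3.4 for Spark 3.5.0).
# # The s3a settings point at the MinIO service from docker-compose.yml.
# spark = SparkSession.builder \
#     .appName("StreamingLab") \
#     .config("spark.jars.packages",
#         "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,"
#         "io.delta:delta-spark_2.12:3.0.0,"
#         "org.apache.hadoop:hadoop-aws:3.3.4") \
#     .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
#     .config("spark.sql.catalog.spark_catalog",
#         "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
#     .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
#     .config("spark.hadoop.fs.s3a.access.key", "minioadmin") \
#     .config("spark.hadoop.fs.s3a.secret.key", "minioadmin") \
#     .config("spark.hadoop.fs.s3a.path.style.access", "true") \
#     .getOrCreate()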
#
# # Read from Kafka
# df = spark.readStream \
#     .format("kafka") \
#     .option("kafka.bootstrap.servers", "kafka:29092") \
#     .option("subscribe", "events") \
#     .option("startingOffsets", "latest") \
#     .load()
#
# # Parse JSON events
# schema = StructType([
#     StructField("user_id", StringType()),
#     StructField("event_type", StringType()),
#     StructField("amount", DoubleType()),
#     StructField("timestamp", TimestampType()),
# ])
#
# events = df.select(
#     from_json(col("value").cast("string"), schema).alias("data")
# ).select("data.*")
#
# # Tumbling Window — Revenue per 5 minutes
# revenue_5min = events \
#     .withWatermark("timestamp", "10 minutes") \
#     .groupBy(
#         window("timestamp", "5 minutes"),
#         "event_type"
#     ) \
#     .agg(
#         sum("amount").alias("total_amount"),
#         count("*").alias("event_count"),
#         avg("amount").alias("avg_amount")
#     )
#
# # Write to Delta Lake (the "checkpoint" and "lakehouse" buckets must already exist in MinIO)
# query = revenue_5min.writeStream \
#     .format("delta") \
#     .outputMode("append") \
#     .option("checkpointLocation", "s3a://checkpoint/revenue") \
#     .start("s3a://lakehouse/revenue_5min")
#
# # Sliding Window — Moving average
# moving_avg = events \
#     .withWatermark("timestamp", "10 minutes") \
#     .groupBy(
#         window("timestamp", "10 minutes", "5 minutes")
#     ) \
#     .agg(avg("amount").alias("moving_avg"))

@dataclass
class WindowType:
    window: str
    syntax: str
    use_case: str
    example: str

windows = [
    WindowType("Tumbling", 'window("ts", "5 minutes")',
        "Fixed non-overlapping intervals",
        "Revenue per 5 minutes, hourly counts"),
    WindowType("Sliding", 'window("ts", "10 min", "5 min")',
        "Overlapping intervals for smoothing",
        "Moving average, trend detection"),
    WindowType("Session", 'session_window("ts", "30 min")',
        "Activity-based grouping",
        "User session analysis, visit duration"),
]

print("\n=== Window Types ===")
for w in windows:
    print(f"  [{w.window}] Syntax: {w.syntax}")
    print(f"    Use case: {w.use_case}")
    print(f"    Example: {w.example}")
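To make the difference between tumbling and sliding windows concrete, the sketch below computes in plain Python which windows a single event timestamp falls into. It assumes that window boundaries are aligned to the Unix epoch, which is Spark's default when no start offset is given. The helper assign_windows is a hypothetical name and only mimics the bucketing logic; it is not how Spark implements it internally.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def assign_windows(ts, size, slide=None):
    """Return the [start, end) windows containing ts, earliest first.

    With slide omitted the windows are tumbling (slide == size).
    """
    slide = slide or size
    # Latest window start at or before ts, aligned to the epoch
    start = ts - (ts - EPOCH) % slide
    windows = []
    # Walk backwards while the window still covers ts
    while start + size > ts:
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


ts = datetime(2024, 1, 1, 12, 7, 30, tzinfo=timezone.utc)

print("Tumbling (5 min):")
for start, end in assign_windows(ts, timedelta(minutes=5)):
    print(f"  {start:%H:%M} - {end:%H:%M}")

print("Sliding (10 min window, 5 min slide):")
for start, end in assign_windows(ts, timedelta(minutes=10), timedelta(minutes=5)):
    print(f"  {start:%H:%M} - {end:%H:%M}")
```

A tumbling window assigns each event to exactly one bucket, while a sliding window with a 10-minute size and 5-minute slide assigns each event to two overlapping buckets, which is why sliding aggregations hold roughly twice the state.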

Production Patterns

# === Production Streaming Patterns ===

@dataclass
class StreamPattern:
    pattern: str
    description: str
    checkpoint: str
    output_mode: str
    guarantee: str

patterns = [
    StreamPattern("ETL Pipeline", "Kafka → Transform → Delta Lake",
        "S3 checkpoint per query", "Append", "Exactly-once"),
    StreamPattern("Real-time Aggregation", "Kafka → Window Agg → Dashboard DB",
        "S3 checkpoint + state store", "Update/Complete",
        "At-least-once (exactly-once if writes are idempotent)"),
    StreamPattern("Event Enrichment", "Kafka → Join with dim table → Kafka",
        "S3 checkpoint", "Append", "At-least-once"),
    StreamPattern("Anomaly Detection", "Kafka → ML Score → Alert",
        "S3 checkpoint", "Append", "At-least-once"),
    StreamPattern("CDC Processing", "Debezium → Spark → Delta Lake",
        "S3 checkpoint", "Append", "Exactly-once"),
]

print("Production Patterns:")
for p in patterns:
    print(f"  [{p.pattern}] {p.description}")
    print(f"    Checkpoint: {p.checkpoint}")
    print(f"    Output: {p.output_mode} | Guarantee: {p.guarantee}")

# Monitoring
monitoring = {
    "Processing Rate": "Records processed per second in each micro-batch",
    "Batch Duration": "Time to process each micro-batch",
    "Input Rate": "Records arriving per second from Kafka",
    "Watermark Delay": "How far the watermark lags behind the current time",
    "State Size": "Memory and row count held by aggregation state",
    "Kafka Lag": "Consumer group lag per partition",
}

print("\n\nMonitoring Metrics:")
for k, v in monitoring.items():
    print(f"  [{k}]: {v}")
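Several of these metrics can be read from a running query. In PySpark 3.5, query.lastProgress returns the most recent progress report as a dictionary (or None before the first batch completes). The sketch below pulls the relevant fields out of such a dictionary; summarize_progress and is_falling_behind are hypothetical helper names, and the sample report is made-up illustrative data, not output from a real run.

```python
def summarize_progress(progress):
    """Extract key health metrics from a StreamingQuery progress dictionary."""
    state_ops = progress.get("stateOperators", [])
    return {
        "input_rate": progress.get("inputRowsPerSecond"),
        "processing_rate": progress.get("processedRowsPerSecond"),
        "batch_duration_ms": progress.get("durationMs", {}).get("triggerExecution"),
        "watermark": progress.get("eventTime", {}).get("watermark"),
        "state_rows": sum(op.get("numRowsTotal", 0) for op in state_ops),
        "state_memory_bytes": sum(op.get("memoryUsedBytes", 0) for op in state_ops),
    }


def is_falling_behind(summary):
    """True when records arrive faster than the query processes them."""
    input_rate = summary["input_rate"]
    processing_rate = summary["processing_rate"]
    if input_rate is None or processing_rate is None:
        return False
    return input_rate > processing_rate


# Illustrative sample only; in the lab this would be query.lastProgress
sample_progress = {
    "batchId": 12,
    "numInputRows": 3000,
    "inputRowsPerSecond": 300.0,
    "processedRowsPerSecond": 250.0,
    "durationMs": {"triggerExecution": 12000},
    "eventTime": {"watermark": "2024-01-01T12:00:00.000Z"},
    "stateOperators": [{"numRowsTotal": 48, "memoryUsedBytes": 20480}],
}

summary = summarize_progress(sample_progress)
for name, value in summary.items():
    print(f"  {name}: {value}")
print(f"  falling behind: {is_falling_behind(summary)}")
```

Kafka consumer lag is not part of this summary; it is usually easier to watch in Kafka UI or with the Kafka command-line tools.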

Tips

  • Watermark: Always set a watermark when using window aggregation, so that state does not grow without bound.
  • Checkpoint: In production, store checkpoints in S3 rather than on local disk.
  • Partition: The number of Kafka partitions sets the maximum read parallelism by default.
  • State: Monitor state size. If it grows large, the watermark may need tuning.
  • Test: Test recovery by stopping Spark and restarting it, then verify that no data was lost.
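The watermark tip can be illustrated with a small simulation. The sketch below tracks a watermark as the maximum event time seen minus the allowed delay, and drops an event once the window it belongs to has already closed. It is a simplification under stated assumptions: every event is treated as its own micro-batch, windows are tumbling and aligned to the Unix epoch, and simulate_watermark is a hypothetical helper rather than a Spark API.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def simulate_watermark(event_times, delay, window_size):
    """Return (timestamp, decision) pairs for events in arrival order."""
    watermark = None
    max_event_time = None
    decisions = []
    for ts in event_times:
        # End of the tumbling window that contains this event
        window_end = ts - (ts - EPOCH) % window_size + window_size
        # The window is closed once the watermark has passed its end
        late = watermark is not None and window_end <= watermark
        decisions.append((ts, "dropped" if late else "kept"))
        # Advance the watermark after the event, as after a micro-batch
        if max_event_time is None or ts > max_event_time:
            max_event_time = ts
        watermark = max_event_time - delay
    return decisions


base = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
arrivals = [base + timedelta(minutes=m) for m in (1, 20, 3, 12)]

for ts, decision in simulate_watermark(
    arrivals, delay=timedelta(minutes=10), window_size=timedelta(minutes=5)
):
    print(f"  {ts:%H:%M} {decision}")
```

Because closed windows can be evicted from the state store, a shorter delay keeps state smaller but drops more late events, while a longer delay does the opposite. This is the trade-off behind tuning the watermark when state grows.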

What Is Spark Structured Streaming?

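Spark Structured Streaming is the stream processing engine of Apache Spark, built on the DataFrame API. It processes data in near real time as a series of micro-batches, works with sources and sinks such as Kafka, files, and Delta Lake, and supports exactly-once processing, watermarks, and window aggregation.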