Spark Streaming Lab
A hands-on home lab for Spark Structured Streaming: Kafka integration, window aggregation, watermarks, checkpoints, and Delta Lake for real-time analytics in a production-like setup.
| Component | Tool | Port | Purpose |
|---|---|---|---|
| Message Broker | Apache Kafka | 9092 | Stream source and sink |
| Stream Engine | Apache Spark 3.5 | 4040 | Structured Streaming processing |
| Schema Registry | Confluent Schema Registry | 8081 | Avro/Protobuf schema management |
| Storage | MinIO (S3) | 9000 | Checkpoint and output storage |
| Lakehouse | Delta Lake | N/A | ACID table on object storage |
| Notebook | JupyterLab | 8888 | Interactive development |
| Kafka UI | Kafka UI | 8080 | Monitor topics and messages |
Docker Compose Setup
# === Docker Compose for Streaming Lab ===
# docker-compose.yml
# version: '3.8'
# services:
#   zookeeper:
#     image: confluentinc/cp-zookeeper:7.5.0
#     environment:
#       ZOOKEEPER_CLIENT_PORT: 2181
#
#   kafka:
#     image: confluentinc/cp-kafka:7.5.0
#     depends_on: [zookeeper]
#     ports: ["9092:9092"]
#     environment:
#       KAFKA_BROKER_ID: 1
#       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
#       KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,HOST://0.0.0.0:9092
#       KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,HOST://localhost:9092
#       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,HOST:PLAINTEXT
#       KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
#       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1   # required for a single broker
#       KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
#
#   spark:
#     image: bitnami/spark:3.5
#     ports: ["4040:4040", "8888:8888"]
#     environment:
#       SPARK_MODE: master
#     volumes:
#       - ./notebooks:/opt/notebooks
#
#   spark-worker:
#     image: bitnami/spark:3.5
#     depends_on: [spark]
#     environment:
#       SPARK_MODE: worker
#       SPARK_MASTER_URL: spark://spark:7077
#
#   minio:
#     image: minio/minio
#     ports: ["9000:9000", "9001:9001"]
#     command: server /data --console-address ":9001"
#     environment:
#       MINIO_ROOT_USER: minioadmin
#       MINIO_ROOT_PASSWORD: minioadmin
#
#   kafka-ui:
#     image: provectuslabs/kafka-ui
#     ports: ["8080:8080"]
#     environment:
#       KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
# Create Kafka topic
# docker exec kafka kafka-topics --create \
#   --bootstrap-server localhost:9092 \
#   --topic events --partitions 3 --replication-factor 1
#
# Produce test events
# docker exec -it kafka kafka-console-producer \
#   --bootstrap-server localhost:9092 --topic events
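Typing JSON by hand into the console producer gets tedious. A minimal sketch of a test-event generator whose output can be piped into `kafka-console-producer` (field names follow the schema used in the streaming code below; the value ranges and event types are made up for illustration):

```python
import json
import random
from datetime import datetime, timezone

def make_event() -> str:
    """Build one JSON event with the lab schema: user_id, event_type, amount, timestamp."""
    event = {
        "user_id": f"user_{random.randint(1, 100)}",
        "event_type": random.choice(["purchase", "refund", "view"]),
        "amount": round(random.uniform(1.0, 500.0), 2),
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    return json.dumps(event)

if __name__ == "__main__":
    # Print 10 events to stdout; pipe into kafka-console-producer
    for _ in range(10):
        print(make_event())
```

Run it as e.g. `python gen_events.py | docker exec -i kafka kafka-console-producer --bootstrap-server localhost:9092 --topic events`.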
from dataclasses import dataclass

@dataclass
class LabService:
    service: str
    image: str
    ram: str
    purpose: str

lab = [
    LabService("Zookeeper", "cp-zookeeper:7.5", "512MB", "Kafka coordination"),
    LabService("Kafka", "cp-kafka:7.5", "1GB", "Message broker"),
    LabService("Spark Master", "bitnami/spark:3.5", "2GB", "Stream processing"),
    LabService("Spark Worker", "bitnami/spark:3.5", "2GB", "Processing executor"),
    LabService("MinIO", "minio/minio", "512MB", "S3-compatible storage"),
    LabService("Kafka UI", "kafka-ui", "256MB", "Topic monitoring"),
]

print("=== Lab Services ===")
total_ram = 0
for s in lab:
    # "512MB" -> 512, "2GB" -> 2000 (treat 1GB as 1000MB for a rough total)
    ram_mb = int(s.ram.replace("MB", "").replace("GB", "000"))
    total_ram += ram_mb
    print(f" [{s.service}] Image: {s.image} | RAM: {s.ram}")
    print(f"   Purpose: {s.purpose}")
print(f"\n Total RAM needed: ~{total_ram/1000:.1f}GB")
Structured Streaming Code
# === PySpark Structured Streaming ===
# from pyspark.sql import SparkSession
# from pyspark.sql.functions import *
# from pyspark.sql.types import *
#
# spark = SparkSession.builder \
#     .appName("StreamingLab") \
#     .config("spark.jars.packages",
#             "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,"
#             "io.delta:delta-spark_2.12:3.0.0,"
#             "org.apache.hadoop:hadoop-aws:3.3.4") \
#     .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
#     .config("spark.sql.catalog.spark_catalog",
#             "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
#     .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
#     .config("spark.hadoop.fs.s3a.access.key", "minioadmin") \
#     .config("spark.hadoop.fs.s3a.secret.key", "minioadmin") \
#     .config("spark.hadoop.fs.s3a.path.style.access", "true") \
#     .getOrCreate()
#
# # Read from Kafka
# df = spark.readStream \
#     .format("kafka") \
#     .option("kafka.bootstrap.servers", "kafka:29092") \
#     .option("subscribe", "events") \
#     .option("startingOffsets", "latest") \
#     .load()
#
# # Parse JSON events
# schema = StructType([
#     StructField("user_id", StringType()),
#     StructField("event_type", StringType()),
#     StructField("amount", DoubleType()),
#     StructField("timestamp", TimestampType()),
# ])
#
# events = df.select(
#     from_json(col("value").cast("string"), schema).alias("data")
# ).select("data.*")
#
# # Tumbling window: revenue per 5 minutes; the 10-minute watermark
# # bounds how late an event may arrive and still be counted
# revenue_5min = events \
#     .withWatermark("timestamp", "10 minutes") \
#     .groupBy(
#         window("timestamp", "5 minutes"),
#         "event_type"
#     ) \
#     .agg(
#         sum("amount").alias("total_amount"),
#         count("*").alias("event_count"),
#         avg("amount").alias("avg_amount")
#     )
#
# # Write to Delta Lake with an S3 checkpoint for exactly-once recovery
# query = revenue_5min.writeStream \
#     .format("delta") \
#     .outputMode("append") \
#     .option("checkpointLocation", "s3a://checkpoint/revenue") \
#     .start("s3a://lakehouse/revenue_5min")
#
# # Sliding window: 10-minute moving average, advancing every 5 minutes
# moving_avg = events \
#     .withWatermark("timestamp", "10 minutes") \
#     .groupBy(
#         window("timestamp", "10 minutes", "5 minutes")
#     ) \
#     .agg(avg("amount").alias("moving_avg"))
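The tumbling window above assigns each event to exactly one 5-minute bucket by flooring its timestamp to the window boundary. A plain-Python sketch of that assignment (a simplification of what `window()` does internally):

```python
from datetime import datetime, timedelta

def tumbling_window(ts: datetime, size: timedelta) -> tuple:
    """Return the (start, end) of the tumbling window containing ts."""
    epoch = datetime(1970, 1, 1)
    offset = (ts - epoch) % size   # position of ts inside its window
    start = ts - offset
    return start, start + size

start, end = tumbling_window(datetime(2024, 1, 1, 10, 7, 30),
                             timedelta(minutes=5))
print(start, end)  # 2024-01-01 10:05:00  2024-01-01 10:10:00
```

Because the buckets are non-overlapping, each event contributes to exactly one `total_amount`, which is what makes tumbling windows suitable for per-interval revenue.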
@dataclass
class WindowType:
    window: str
    syntax: str
    use_case: str
    example: str

windows = [
    WindowType("Tumbling", 'window("ts", "5 minutes")',
               "Fixed non-overlapping intervals",
               "Revenue per 5 minutes, hourly counts"),
    WindowType("Sliding", 'window("ts", "10 min", "5 min")',
               "Overlapping intervals for smoothing",
               "Moving average, trend detection"),
    WindowType("Session", 'session_window("ts", "30 min")',
               "Activity-based grouping",
               "User session analysis, visit duration"),
]

print("\n=== Window Types ===")
for w in windows:
    print(f" [{w.window}] Syntax: {w.syntax}")
    print(f"   Use case: {w.use_case}")
    print(f"   Example: {w.example}")
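Unlike a tumbling window, a sliding window places each event into several overlapping buckets: with a 10-minute window sliding every 5 minutes, every event lands in two windows. A plain-Python sketch of that assignment, assuming the slide evenly divides the timeline:

```python
from datetime import datetime, timedelta

def sliding_windows(ts: datetime, size: timedelta, slide: timedelta) -> list:
    """All (start, end) sliding windows that contain ts."""
    epoch = datetime(1970, 1, 1)
    # Latest window start at or before ts, aligned to the slide interval
    start = ts - ((ts - epoch) % slide)
    windows = []
    while start > ts - size:
        windows.append((start, start + size))
        start -= slide
    return windows

wins = sliding_windows(datetime(2024, 1, 1, 10, 7),
                       timedelta(minutes=10), timedelta(minutes=5))
for w in wins:
    print(w)  # (10:05, 10:15) and (10:00, 10:10)
```

This overlap is exactly why sliding windows smooth a metric into a moving average, and also why they hold roughly `size / slide` times more state than a tumbling window.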
Production Patterns
# === Production Streaming Patterns ===
@dataclass
class StreamPattern:
    pattern: str
    description: str
    checkpoint: str
    output_mode: str
    guarantee: str

patterns = [
    StreamPattern("ETL Pipeline", "Kafka → Transform → Delta Lake",
                  "S3 checkpoint per query", "Append", "Exactly-once"),
    StreamPattern("Real-time Aggregation", "Kafka → Window Agg → Dashboard DB",
                  "S3 checkpoint + state store", "Update/Complete", "Exactly-once"),
    StreamPattern("Event Enrichment", "Kafka → Join with dim table → Kafka",
                  "S3 checkpoint", "Append", "At-least-once"),
    StreamPattern("Anomaly Detection", "Kafka → ML Score → Alert",
                  "S3 checkpoint", "Append", "At-least-once"),
    StreamPattern("CDC Processing", "Debezium → Spark → Delta Lake",
                  "S3 checkpoint", "Append", "Exactly-once"),
]

print("Production Patterns:")
for p in patterns:
    print(f" [{p.pattern}] {p.description}")
    print(f"   Checkpoint: {p.checkpoint}")
    print(f"   Output: {p.output_mode} | Guarantee: {p.guarantee}")
# Monitoring
monitoring = {
    "Processing Rate": "Records processed per second per batch",
    "Batch Duration": "Time to process each micro-batch",
    "Input Rate": "Records arriving per second from Kafka",
    "Watermark Delay": "How far the watermark lags behind the latest event time",
    "State Size": "Memory used by aggregation state",
    "Kafka Lag": "Consumer group lag per partition",
}

print("\nMonitoring Metrics:")
for k, v in monitoring.items():
    print(f" [{k}]: {v}")
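Most of these metrics can be read off `query.lastProgress`, which Spark exposes as a JSON-like dict. A sketch that pulls the lab's metrics out of one progress report; the field names follow Spark's `StreamingQueryProgress` JSON, but the sample values here are made up:

```python
# Hypothetical sample shaped like StreamingQuery.lastProgress
progress = {
    "inputRowsPerSecond": 1250.0,
    "processedRowsPerSecond": 1100.0,
    "durationMs": {"triggerExecution": 4200},
    "eventTime": {"watermark": "2024-01-01T10:00:00.000Z"},
    "stateOperators": [{"numRowsTotal": 48213}],
}

def summarize(p: dict) -> dict:
    """Extract the monitoring metrics listed above from one progress report."""
    return {
        "input_rate": p["inputRowsPerSecond"],
        "processing_rate": p["processedRowsPerSecond"],
        "batch_duration_ms": p["durationMs"]["triggerExecution"],
        "state_rows": sum(op["numRowsTotal"] for op in p["stateOperators"]),
        # Backlog grows whenever events arrive faster than they are processed
        "backlog_growing": p["inputRowsPerSecond"] > p["processedRowsPerSecond"],
    }

print(summarize(progress))
```

Polling this summary per batch (or via a `StreamingQueryListener`) is usually enough to spot a growing backlog or runaway state before it becomes an outage.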
Tips
- Watermark: always set a watermark when using window aggregation; without one, state grows without bound.
- Checkpoint: in production, keep checkpoints in S3, not on local disk.
- Partition: the number of Kafka partitions caps the maximum parallelism.
- State: monitor state size; if it keeps growing, tune the watermark.
- Test: verify recovery by stopping Spark and restarting, then confirm no data was lost.
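The watermark tip can be made concrete with a plain-Python sketch of the eviction rule (a simplification of Spark's actual state store): once the watermark, i.e. the maximum observed event time minus the allowed lateness, passes a window's end, that window's state can be dropped.

```python
from datetime import datetime, timedelta

def evict_closed_windows(state: dict, max_event_time: datetime,
                         delay: timedelta) -> dict:
    """Keep only windows whose end is still above the watermark."""
    watermark = max_event_time - delay
    return {w: v for w, v in state.items() if w[1] > watermark}

# Two 5-minute windows of aggregated revenue keyed by (start, end)
state = {
    (datetime(2024, 1, 1, 9, 50), datetime(2024, 1, 1, 9, 55)): 120.0,
    (datetime(2024, 1, 1, 10, 0), datetime(2024, 1, 1, 10, 5)): 300.0,
}
# Latest event seen: 10:07, lateness allowance: 10 minutes -> watermark 09:57
kept = evict_closed_windows(state, datetime(2024, 1, 1, 10, 7),
                            timedelta(minutes=10))
print(kept)  # only the (10:00, 10:05) window survives
```

Without a watermark there is no eviction step at all, which is exactly why unbounded state growth is the failure mode the tip warns about.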
What is Spark Structured Streaming?
A stream processing engine built on Apache Spark's DataFrame API. It processes real-time data as micro-batches, reads from sources such as Kafka and files, writes to sinks including Delta Lake, and supports exactly-once semantics, watermarks, and window aggregation.

How do you set up the home lab?
Use Docker Compose to run Kafka, Spark 3.5, JupyterLab, Kafka UI, and MinIO as S3-compatible storage, with Delta Lake on top of MinIO. PySpark in JupyterLab gives interactive development, and a Schema Registry can manage Avro/Protobuf schemas.

How does window aggregation work?
Tumbling windows are fixed and non-overlapping, sliding windows overlap, and session windows group events by activity gaps. Call window() with a timestamp column, a duration, and an optional slide inside groupBy/agg, and set a watermark as the late-data threshold.

How do checkpoints work?
Checkpoints provide state fault tolerance: set checkpointLocation on writeStream and Spark persists Kafka offsets, aggregation state, and a commit log so a restarted query resumes without data loss. Keep checkpoints in S3 or HDFS and do not delete them while the query is live.
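A small sketch of what lands under `checkpointLocation`: Spark writes a `metadata` file plus `offsets/` and `commits/` directories (and `state/` when the query holds aggregation state). The validator below is a hypothetical helper for a quick sanity check, not a Spark API:

```python
import tempfile
from pathlib import Path

# Core top-level entries Spark writes under checkpointLocation
EXPECTED = {"metadata", "offsets", "commits"}  # state/ appears for stateful queries

def checkpoint_looks_valid(root: str) -> bool:
    """Rough sanity check that a checkpoint directory has the core entries."""
    p = Path(root)
    entries = {c.name for c in p.iterdir()} if p.exists() else set()
    return EXPECTED <= entries

# Simulate the layout a stateful query leaves behind
demo = Path(tempfile.mkdtemp())
(demo / "metadata").write_text('{"id": "demo"}')
for d in ("offsets", "commits", "state"):
    (demo / d).mkdir()
print(checkpoint_looks_valid(str(demo)))  # True
```

A check like this is handy in the recovery test from the tips above: stop the query, confirm the checkpoint is intact, restart, and verify the stream resumes from the recorded offsets.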
Summary
This home lab runs Spark Structured Streaming against Kafka under Docker Compose, covering window aggregation, watermarks, checkpoints, and Delta Lake with PySpark, and closes with production patterns and monitoring metrics for real-time analytics deployments.
