
Spark Structured Streaming Home Lab Setup

2026-02-02 · อ. บอม, SiamCafe.net · 8,608 words

[Figure: Spark Streaming Lab]

| Component | Tool | Port | Purpose |
|---|---|---|---|
| Message Broker | Apache Kafka | 9092 | Stream source and sink |
| Stream Engine | Apache Spark 3.5 | 4040 | Structured Streaming processing |
| Schema Registry | Confluent Schema Registry | 8081 | Avro/Protobuf schema management |
| Storage | MinIO (S3) | 9000 | Checkpoint and output storage |
| Lakehouse | Delta Lake | N/A | ACID table on object storage |
| Notebook | JupyterLab | 8888 | Interactive development |
| Kafka UI | Kafka UI | 8080 | Monitor topics and messages |

Docker Compose Setup

# === Docker Compose for Streaming Lab ===

# docker-compose.yml
# version: '3.8'
# services:
#   zookeeper:
#     image: confluentinc/cp-zookeeper:7.5.0
#     environment:
#       ZOOKEEPER_CLIENT_PORT: 2181
#
#   kafka:
#     image: confluentinc/cp-kafka:7.5.0
#     ports: ["9092:9092"]
#     environment:
#       KAFKA_BROKER_ID: 1
#       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
#       KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,HOST://0.0.0.0:9092
#       KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,HOST://localhost:9092
#       KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,HOST:PLAINTEXT
#       KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
#       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
#       KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
#
#   schema-registry:
#     image: confluentinc/cp-schema-registry:7.5.0
#     depends_on: [kafka]
#     ports: ["8081:8081"]
#     environment:
#       SCHEMA_REGISTRY_HOST_NAME: schema-registry
#       SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
#
#   spark:
#     image: bitnami/spark:3.5
#     ports: ["4040:4040", "8888:8888"]
#     environment:
#       SPARK_MODE: master
#     volumes:
#       - ./notebooks:/opt/notebooks
#
#   minio:
#     image: minio/minio
#     ports: ["9000:9000", "9001:9001"]
#     command: server /data --console-address ":9001"
#     environment:
#       MINIO_ROOT_USER: minioadmin
#       MINIO_ROOT_PASSWORD: minioadmin
#
#   kafka-ui:
#     image: provectuslabs/kafka-ui
#     ports: ["8080:8080"]
#     environment:
#       KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092

# Create Kafka topic
# docker exec kafka kafka-topics --create \
#   --bootstrap-server localhost:9092 \
#   --topic events --partitions 3 --replication-factor 1

# Produce test events
# docker exec -it kafka kafka-console-producer \
#   --bootstrap-server localhost:9092 --topic events
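Instead of typing events by hand into the console producer, a small Python generator can feed the topic. A minimal sketch, assuming the `kafka-python` package is installed (`pip install kafka-python`); the event fields match the schema parsed later in the lab, and the topic and server names follow the compose file above.

```python
import json
import random
import time

def build_event():
    """One synthetic event matching the lab's JSON schema."""
    return {
        "user_id": f"user-{random.randint(1, 100)}",
        "event_type": random.choice(["view", "purchase", "refund"]),
        "amount": round(random.uniform(1.0, 500.0), 2),
        "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S"),
    }

def send_events(n=10, topic="events", servers="localhost:9092"):
    """Push n JSON events to Kafka; run only with the lab up."""
    from kafka import KafkaProducer  # pip install kafka-python
    producer = KafkaProducer(
        bootstrap_servers=servers,
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )
    for _ in range(n):
        producer.send(topic, build_event())
    producer.flush()
```

Keeping the Kafka import inside `send_events` lets `build_event` be reused (e.g. in tests) without a broker running.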

from dataclasses import dataclass

@dataclass
class LabService:
    service: str
    image: str
    ram: str
    purpose: str

lab = [
    LabService("Zookeeper", "cp-zookeeper:7.5", "512MB", "Kafka coordination"),
    LabService("Kafka", "cp-kafka:7.5", "1GB", "Message broker"),
    LabService("Spark Master", "bitnami/spark:3.5", "2GB", "Stream processing"),
    LabService("Spark Worker", "bitnami/spark:3.5", "2GB", "Processing executor"),
    LabService("MinIO", "minio/minio", "512MB", "S3-compatible storage"),
    LabService("Kafka UI", "kafka-ui", "256MB", "Topic monitoring"),
]

print("=== Lab Services ===")
total_ram = 0
for s in lab:
    ram_mb = int(s.ram.replace("MB", "").replace("GB", "000"))
    total_ram += ram_mb
    print(f"  [{s.service}] Image: {s.image} | RAM: {s.ram}")
    print(f"    Purpose: {s.purpose}")
print(f"\n  Total RAM needed: ~{total_ram/1000:.1f}GB")

Structured Streaming Code

# === PySpark Structured Streaming ===

# from pyspark.sql import SparkSession
# from pyspark.sql.functions import *
# from pyspark.sql.types import *
#
# spark = SparkSession.builder \
#     .appName("StreamingLab") \
#     .config("spark.jars.packages",
#         "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,"
#         "io.delta:delta-spark_2.12:3.0.0,"
#         "org.apache.hadoop:hadoop-aws:3.3.4") \
#     .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
#     .config("spark.sql.catalog.spark_catalog",
#         "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
#     .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
#     .config("spark.hadoop.fs.s3a.access.key", "minioadmin") \
#     .config("spark.hadoop.fs.s3a.secret.key", "minioadmin") \
#     .config("spark.hadoop.fs.s3a.path.style.access", "true") \
#     .getOrCreate()
#
# # Read from Kafka
# df = spark.readStream \
#     .format("kafka") \
#     .option("kafka.bootstrap.servers", "kafka:29092") \
#     .option("subscribe", "events") \
#     .option("startingOffsets", "latest") \
#     .load()
#
# # Parse JSON events
# schema = StructType([
#     StructField("user_id", StringType()),
#     StructField("event_type", StringType()),
#     StructField("amount", DoubleType()),
#     StructField("timestamp", TimestampType()),
# ])
#
# events = df.select(
#     from_json(col("value").cast("string"), schema).alias("data")
# ).select("data.*")
#
# # Tumbling Window — Revenue per 5 minutes
# revenue_5min = events \
#     .withWatermark("timestamp", "10 minutes") \
#     .groupBy(
#         window("timestamp", "5 minutes"),
#         "event_type"
#     ) \
#     .agg(
#         sum("amount").alias("total_amount"),
#         count("*").alias("event_count"),
#         avg("amount").alias("avg_amount")
#     )
#
# # Write to Delta Lake
# query = revenue_5min.writeStream \
#     .format("delta") \
#     .outputMode("append") \
#     .option("checkpointLocation", "s3a://checkpoint/revenue") \
#     .start("s3a://lakehouse/revenue_5min")
#
# # Sliding Window — Moving average
# moving_avg = events \
#     .withWatermark("timestamp", "10 minutes") \
#     .groupBy(
#         window("timestamp", "10 minutes", "5 minutes")
#     ) \
#     .agg(avg("amount").alias("moving_avg"))
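The `withWatermark` calls above bound how long aggregation state is kept for late events. A pure-Python sketch of the rule (illustrative, not Spark's implementation): the watermark trails the maximum observed event time by the configured delay, and events older than the watermark are dropped.

```python
from datetime import datetime, timedelta

class WatermarkTracker:
    """Sketch of the watermark rule: max event time minus delay."""
    def __init__(self, delay: timedelta):
        self.delay = delay
        self.max_event_time = None

    def observe(self, event_time: datetime) -> None:
        """Advance the watermark as newer events arrive."""
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time

    @property
    def watermark(self):
        if self.max_event_time is None:
            return None
        return self.max_event_time - self.delay

    def is_too_late(self, event_time: datetime) -> bool:
        """True if an event would be dropped (older than the watermark)."""
        wm = self.watermark
        return wm is not None and event_time < wm

# With a 10-minute delay, after an event at 10:30 the watermark
# sits at 10:20, so a straggler stamped 10:15 is dropped.
tracker = WatermarkTracker(timedelta(minutes=10))
tracker.observe(datetime(2026, 2, 2, 10, 30))
```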

@dataclass
class WindowType:
    window: str
    syntax: str
    use_case: str
    example: str

windows = [
    WindowType("Tumbling", 'window("ts", "5 minutes")',
        "Fixed non-overlapping intervals",
        "Revenue per 5 minutes, hourly counts"),
    WindowType("Sliding", 'window("ts", "10 min", "5 min")',
        "Overlapping intervals for smoothing",
        "Moving average, trend detection"),
    WindowType("Session", 'session_window("ts", "30 min")',
        "Activity-based grouping",
        "User session analysis, visit duration"),
]

print("\n=== Window Types ===")
for w in windows:
    print(f"  [{w.window}] Syntax: {w.syntax}")
    print(f"    Use case: {w.use_case}")
    print(f"    Example: {w.example}")
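To make the tumbling/sliding distinction concrete, here is a pure-Python sketch of how one event timestamp maps to window buckets, mirroring the bucketing that `window()` performs (illustrative arithmetic, not Spark's code). UTC timestamps keep the math deterministic.

```python
from datetime import datetime, timedelta, timezone

def tumbling_window(ts, size):
    """Return the single (start, end) bucket containing ts."""
    epoch = int(ts.timestamp())
    step = int(size.total_seconds())
    start = datetime.fromtimestamp(epoch - epoch % step, tz=timezone.utc)
    return (start, start + size)

def sliding_windows(ts, size, slide):
    """Return every overlapping (start, end) bucket containing ts."""
    epoch = int(ts.timestamp())
    step = int(slide.total_seconds())
    start = epoch - epoch % step  # latest window start at or before ts
    out = []
    while start > epoch - int(size.total_seconds()):
        s = datetime.fromtimestamp(start, tz=timezone.utc)
        out.append((s, s + size))
        start -= step
    return out

ts = datetime(2026, 2, 2, 10, 7, 30, tzinfo=timezone.utc)
# Tumbling 5 min: exactly one bucket, 10:05-10:10.
# Sliding 10 min every 5 min: two buckets, 10:05-10:15 and 10:00-10:10.
```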

Production Patterns

# === Production Streaming Patterns ===

@dataclass
class StreamPattern:
    pattern: str
    description: str
    checkpoint: str
    output_mode: str
    guarantee: str

patterns = [
    StreamPattern("ETL Pipeline", "Kafka → Transform → Delta Lake",
        "S3 checkpoint per query", "Append", "Exactly-once"),
    StreamPattern("Real-time Aggregation", "Kafka → Window Agg → Dashboard DB",
        "S3 checkpoint + state store", "Update/Complete", "Exactly-once"),
    StreamPattern("Event Enrichment", "Kafka → Join with dim table → Kafka",
        "S3 checkpoint", "Append", "At-least-once"),
    StreamPattern("Anomaly Detection", "Kafka → ML Score → Alert",
        "S3 checkpoint", "Append", "At-least-once"),
    StreamPattern("CDC Processing", "Debezium → Spark → Delta Lake",
        "S3 checkpoint", "Append", "Exactly-once"),
]

print("Production Patterns:")
for p in patterns:
    print(f"  [{p.pattern}] {p.description}")
    print(f"    Checkpoint: {p.checkpoint}")
    print(f"    Output: {p.output_mode} | Guarantee: {p.guarantee}")
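The at-least-once patterns above can still yield effectively-once results when the sink deduplicates. A minimal sketch keyed on a unique event id (the `event_id` field is a hypothetical addition to the payload, not part of the schema used earlier):

```python
class IdempotentSink:
    """Dedup on a unique key so replayed (at-least-once) events write once."""
    def __init__(self):
        self.seen = set()
        self.rows = []

    def write(self, event) -> bool:
        """Store the event; return False if this id was already written."""
        key = event["event_id"]
        if key in self.seen:
            return False
        self.seen.add(key)
        self.rows.append(event)
        return True

sink = IdempotentSink()
evt = {"event_id": "e-1", "event_type": "purchase", "amount": 99.0}
```

In practice the same idea is what Delta Lake's transaction log or a `MERGE` on a key column gives you at the table level.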

# Monitoring
monitoring = {
    "Processing Rate": "Records processed per second per batch",
    "Batch Duration": "Time to process each micro-batch",
    "Input Rate": "Records arriving per second from Kafka",
    "Watermark Delay": "How far behind the watermark is",
    "State Size": "Memory used by aggregation state",
    "Kafka Lag": "Consumer group lag per partition",
}

print(f"\n\nMonitoring Metrics:")
for k, v in monitoring.items():
    print(f"  [{k}]: {v}")
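Most of these metrics can be read off `query.lastProgress` in PySpark. A sketch against a simulated progress payload (field names follow Spark's `StreamingQueryProgress` JSON; the numbers are made up for illustration):

```python
# Simulated query.lastProgress payload; field names follow Spark's
# StreamingQueryProgress JSON, values are illustrative only.
progress = {
    "numInputRows": 1500,
    "inputRowsPerSecond": 500.0,
    "processedRowsPerSecond": 750.0,
    "durationMs": {"triggerExecution": 2000},
    "stateOperators": [{"numRowsTotal": 1200}],
}

def summarize(p):
    """Condense one progress report into the dashboard metrics above."""
    return {
        "batch_duration_s": p["durationMs"]["triggerExecution"] / 1000.0,
        # Falling behind whenever input outpaces processing.
        "keeping_up": p["processedRowsPerSecond"] >= p["inputRowsPerSecond"],
        "state_rows": sum(op["numRowsTotal"] for op in p.get("stateOperators", [])),
    }
```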

Tips

What is Spark Structured Streaming?

Spark Structured Streaming is Apache Spark's stream-processing engine built on the DataFrame API. It processes data in near-real-time micro-batches, reads from sources such as Kafka and files, writes to sinks such as Delta Lake, and combines watermarks with window aggregation to deliver exactly-once results.

How do you set up the home lab?

Run everything with Docker Compose: Kafka as the broker, Spark 3.5 with JupyterLab for interactive PySpark development, Kafka UI for monitoring, MinIO as S3-compatible storage for checkpoints and Delta Lake tables, and Schema Registry for Avro/Protobuf schema management.

How does window aggregation work?

Tumbling windows are fixed and non-overlapping, sliding windows overlap, and session windows group events by activity gaps. Call window() (or session_window()) on a timestamp column with a duration and optional slide inside groupBy/agg, and pair it with withWatermark to set the threshold for late data.

How does checkpointing work?

Checkpoints give a query fault-tolerant state: set checkpointLocation in writeStream and Spark persists Kafka offsets, aggregation state, and a commit log there so the query can restart where it left off. Use durable storage such as S3 or HDFS, and never delete the checkpoint directory of a running query.

Summary

This home lab runs Spark Structured Streaming on Docker with Kafka as the source, covering window aggregation, watermarks, checkpointing, and Delta Lake output in PySpark: the core building blocks for real-time analytics and a stepping stone toward production deployment.
