Spark Structured Streaming Home Lab Setup
Spark Streaming Lab

This guide sets up a Spark Structured Streaming home lab covering Kafka integration, window aggregation, watermarks, checkpoints, Delta Lake, and real-time analytics patterns for production.
| Component | Tool | Port | Purpose |
|---|---|---|---|
| Message Broker | Apache Kafka | 9092 | Stream source and sink |
| Stream Engine | Apache Spark 3.5 | 4040 | Structured Streaming processing |
| Schema Registry | Confluent Schema Registry | 8081 | Avro/Protobuf schema management |
| Storage | MinIO (S3) | 9000 | Checkpoint and output storage |
| Lakehouse | Delta Lake | N/A | ACID table on object storage |
| Notebook | JupyterLab | 8888 | Interactive development |
| Kafka UI | Kafka UI | 8080 | Monitor topics and messages |
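Once the containers are up, a quick way to confirm that each component in the table is listening is a TCP probe against its host port. The sketch below uses only the Python standard library. It assumes everything runs on `localhost` with the default ports above, and `port_open` and `check_lab` are hypothetical helper names. The Spark UI on 4040 only appears while a Spark application is running. Schema Registry and JupyterLab are not defined in the Compose file below, so they will report DOWN unless you add them.

```python
import socket

# Host ports from the component table (assumed defaults).
LAB_PORTS = {
    "Kafka": 9092,
    "Spark UI": 4040,
    "Schema Registry": 8081,
    "MinIO": 9000,
    "JupyterLab": 8888,
    "Kafka UI": 8080,
}


def port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, unreachable, or timed out
        return False


def check_lab(host: str = "localhost", ports: dict = LAB_PORTS) -> dict:
    """Map each component name to whether its port accepts connections."""
    return {name: port_open(host, port) for name, port in ports.items()}


if __name__ == "__main__":
    for name, ok in check_lab().items():
        print(f"{name:<16} {'UP' if ok else 'DOWN'}")
```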
Docker Compose Setup
# === Docker Compose for Streaming Lab ===
# docker-compose.yml
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    container_name: kafka          # fixed name so `docker exec kafka ...` works
    depends_on: [zookeeper]
    ports: ["9092:9092"]
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # kafka:29092 for containers on the Compose network, localhost:9092 for the host
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1   # required with a single broker
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
  spark:
    image: bitnami/spark:3.5
    ports: ["4040:4040", "8888:8888"]
    environment:
      SPARK_MODE: master
    volumes:
      - ./notebooks:/opt/notebooks
  minio:
    image: minio/minio
    ports: ["9000:9000", "9001:9001"]
    command: server /data --console-address ":9001"
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
  kafka-ui:
    image: provectuslabs/kafka-ui
    depends_on: [kafka]
    ports: ["8080:8080"]
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
# Create the Kafka topic
docker exec kafka kafka-topics --create \
  --bootstrap-server localhost:9092 \
  --topic events --partitions 3 --replication-factor 1
# Produce test events (one JSON event per line, Ctrl+D to finish)
docker exec -it kafka kafka-console-producer \
  --bootstrap-server localhost:9092 --topic events
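Typing events by hand quickly gets tedious. The following is a minimal sketch of a generator for JSON lines that match the schema used in the streaming code (`user_id`, `event_type`, `amount`, `timestamp`). The event types, user IDs, amount range, and the script name `gen_events.py` are illustrative assumptions. Timestamps are written as ISO-8601 strings, which Spark's `from_json` should parse into `TimestampType` with its default timestamp format.

```python
import json
import random
from datetime import datetime, timedelta, timezone
from typing import Iterator, Optional

# Illustrative values only; the lab schema just needs the four fields below.
EVENT_TYPES = ["view", "add_to_cart", "purchase"]


def make_event(rng: random.Random, ts: datetime) -> dict:
    """Build one event with the fields the streaming schema expects."""
    return {
        "user_id": f"user_{rng.randint(1, 50)}",
        "event_type": rng.choice(EVENT_TYPES),
        "amount": round(rng.uniform(1.0, 200.0), 2),
        # ISO-8601, e.g. 2024-01-01T10:00:00+00:00
        "timestamp": ts.isoformat(timespec="seconds"),
    }


def generate(n: int, seed: int = 42,
             start: Optional[datetime] = None) -> Iterator[str]:
    """Yield n JSON lines, one event every 10 seconds from `start` (default: now, UTC)."""
    rng = random.Random(seed)
    start = start or datetime.now(timezone.utc)
    for i in range(n):
        yield json.dumps(make_event(rng, start + timedelta(seconds=10 * i)))


if __name__ == "__main__":
    for line in generate(100):
        print(line)
```

To feed it to the console producer, pipe it in with `-i` but without `-t`, because stdin is a pipe rather than a terminal: `python gen_events.py | docker exec -i kafka kafka-console-producer --bootstrap-server localhost:9092 --topic events`.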
from dataclasses import dataclass

@dataclass
class LabService:
    service: str
    image: str
    ram: str
    purpose: str

lab = [
    LabService("Zookeeper", "cp-zookeeper:7.5", "512MB", "Kafka coordination"),
    LabService("Kafka", "cp-kafka:7.5", "1GB", "Message broker"),
    LabService("Spark Master", "bitnami/spark:3.5", "2GB", "Stream processing"),
    LabService("Spark Worker", "bitnami/spark:3.5", "2GB", "Processing executor"),
    LabService("MinIO", "minio/minio", "512MB", "S3-compatible storage"),
    LabService("Kafka UI", "kafka-ui", "256MB", "Topic monitoring"),
]

def ram_to_mb(ram: str) -> int:
    """Convert strings like '512MB' or '2GB' to megabytes (1GB = 1024MB)."""
    value, unit = float(ram[:-2]), ram[-2:]
    return int(value * 1024) if unit == "GB" else int(value)

print("=== Lab Services ===")
total_ram = 0
for s in lab:
    total_ram += ram_to_mb(s.ram)
    print(f"  [{s.service}] Image: {s.image} | RAM: {s.ram}")
    print(f"    Purpose: {s.purpose}")
print(f"\n  Total RAM needed: ~{total_ram / 1024:.1f}GB")
Structured Streaming Code
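# === PySpark Structured Streaming ===
# Assumes the MinIO buckets "checkpoint" and "lakehouse" already exist
# (create them in the MinIO console at http://localhost:9001).
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, count, from_json, window
from pyspark.sql.functions import sum as sum_
from pyspark.sql.types import (DoubleType, StringType, StructField,
                               StructType, TimestampType)

spark = (
    SparkSession.builder
    .appName("StreamingLab")
    # Kafka source, Delta Lake, and S3A connectors for Spark 3.5 / Scala 2.12;
    # hadoop-aws must match the Hadoop version bundled with Spark (3.3.4 for 3.5)
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,"
            "io.delta:delta-spark_2.12:3.0.0,"
            "org.apache.hadoop:hadoop-aws:3.3.4")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # Route s3a:// paths to MinIO using the credentials from docker-compose.yml
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin")
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .getOrCreate()
)

# Read from Kafka via the in-network listener
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:29092")
    .option("subscribe", "events")
    .option("startingOffsets", "latest")
    .load()
)

# Parse JSON events from the Kafka message value
schema = StructType([
    StructField("user_id", StringType()),
    StructField("event_type", StringType()),
    StructField("amount", DoubleType()),
    StructField("timestamp", TimestampType()),
])
events = df.select(
    from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")

# Tumbling window: revenue per 5 minutes, accepting events up to 10 minutes late
revenue_5min = (
    events
    .withWatermark("timestamp", "10 minutes")
    .groupBy(window("timestamp", "5 minutes"), "event_type")
    .agg(
        sum_("amount").alias("total_amount"),
        count("*").alias("event_count"),
        avg("amount").alias("avg_amount"),
    )
)

# Write to Delta Lake; append mode emits each window once the watermark passes its end
query = (
    revenue_5min.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "s3a://checkpoint/revenue")
    .start("s3a://lakehouse/revenue_5min")
)

# Sliding window: 10-minute moving average, recomputed every 5 minutes
moving_avg = (
    events
    .withWatermark("timestamp", "10 minutes")
    .groupBy(window("timestamp", "10 minutes", "5 minutes"))
    .agg(avg("amount").alias("moving_avg"))
)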
from dataclasses import dataclass

@dataclass
class WindowType:
    window: str
    syntax: str
    use_case: str
    example: str

windows = [
    WindowType("Tumbling", 'window("ts", "5 minutes")',
               "Fixed, non-overlapping intervals",
               "Revenue per 5 minutes, hourly counts"),
    WindowType("Sliding", 'window("ts", "10 minutes", "5 minutes")',
               "Overlapping intervals for smoothing",
               "Moving average, trend detection"),
    WindowType("Session", 'session_window("ts", "30 minutes")',
               "Activity-based grouping; a session closes after a 30-minute gap",
               "User session analysis, visit duration"),
]

print("\n=== Window Types ===")
for w in windows:
    print(f"  [{w.window}] Syntax: {w.syntax}")
    print(f"    Use case: {w.use_case}")
    print(f"    Example: {w.example}")
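The three window types differ only in how an event time is mapped to windows. The pure-Python sketch below illustrates that mapping using integer epoch seconds. It assumes Spark's default window alignment, where boundaries fall at multiples of the slide from the Unix epoch (a `startTime` of 0). For sessions, it assumes that each event opens a gap-long session, which merges with any overlapping one. It is an illustration, not Spark's implementation, and the function names are hypothetical.

```python
from typing import List, Tuple

Window = Tuple[int, int]  # half-open [start, end) in epoch seconds


def tumbling_window(ts: int, size: int) -> Window:
    """The single fixed window of length `size` that contains ts."""
    start = ts - ts % size
    return (start, start + size)


def sliding_windows(ts: int, size: int, slide: int) -> List[Window]:
    """Every window of length `size`, starting each `slide` seconds, that contains ts."""
    windows = []
    start = ts - ts % slide          # latest window start at or before ts
    while start > ts - size:         # this window still covers ts
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


def session_windows(timestamps: List[int], gap: int) -> List[Window]:
    """Group events into sessions: each event opens [t, t + gap); overlaps merge."""
    sessions: List[Window] = []
    for t in sorted(timestamps):
        if sessions and t < sessions[-1][1]:
            # Event lands inside the open session: extend it to t + gap
            sessions[-1] = (sessions[-1][0], t + gap)
        else:
            sessions.append((t, t + gap))
    return sessions
```

For instance, an event at 00:12 (720 s) falls into the tumbling window [00:10, 00:15). It also falls into two sliding windows, [00:05, 00:15) and [00:10, 00:20). This is why, with a 10-minute window and a 5-minute slide, every event contributes to two sliding aggregates.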
Production Patterns
# === Production Streaming Patterns ===
from dataclasses import dataclass

@dataclass
class StreamPattern:
    pattern: str
    description: str
    checkpoint: str
    output_mode: str
    guarantee: str

patterns = [
    StreamPattern("ETL Pipeline", "Kafka → Transform → Delta Lake",
                  "S3 checkpoint per query", "Append", "Exactly-once"),
    StreamPattern("Real-time Aggregation", "Kafka → Window Agg → Dashboard DB",
                  "S3 checkpoint + state store", "Update/Complete",
                  "Exactly-once if the DB write is an idempotent upsert"),
    StreamPattern("Event Enrichment", "Kafka → Join with dim table → Kafka",
                  "S3 checkpoint", "Append", "At-least-once"),
    StreamPattern("Anomaly Detection", "Kafka → ML Score → Alert",
                  "S3 checkpoint", "Append", "At-least-once"),
    StreamPattern("CDC Processing", "Debezium → Spark → Delta Lake",
                  "S3 checkpoint", "Append", "Exactly-once"),
]

print("Production Patterns:")
for p in patterns:
    print(f"  [{p.pattern}] {p.description}")
    print(f"    Checkpoint: {p.checkpoint}")
    print(f"    Output: {p.output_mode} | Guarantee: {p.guarantee}")
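# Monitoring: metrics worth tracking per query (most appear in StreamingQuery.lastProgress)
monitoring = {
    "Processing Rate": "Records processed per second (processedRowsPerSecond)",
    "Batch Duration": "Time to process each micro-batch; should stay below the trigger interval",
    "Input Rate": "Records arriving per second from Kafka (inputRowsPerSecond)",
    "Watermark Delay": "How far the watermark trails the current (wall-clock) time",
    "State Size": "Rows and memory held by the aggregation state",
    "Kafka Lag": "How far the query's offsets trail the latest offset in each partition",
}

print("\nMonitoring Metrics:")
for k, v in monitoring.items():
    print(f"  [{k}]: {v}")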
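Most of these metrics can be read from `query.lastProgress`, which in PySpark 3.5 returns the latest progress report as a dict (or `None` before the first batch). The helper below is a sketch that pulls them out. The field names are assumed to match Spark 3.x progress reports and should be checked against your version: `timestamp`, `inputRowsPerSecond`, `processedRowsPerSecond`, `durationMs.triggerExecution`, `eventTime.watermark`, `stateOperators`, and the Kafka source metric `maxOffsetsBehindLatest`. Missing fields come back as `None`, and `summarize_progress` is a hypothetical name.

```python
from datetime import datetime
from typing import Any, Dict, Optional


def _parse_ts(value: Optional[str]) -> Optional[datetime]:
    """Parse progress timestamps such as '2024-01-01T10:00:00.000Z'."""
    if not value:
        return None
    # fromisoformat() before Python 3.11 rejects a trailing 'Z', so swap it for +00:00
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def summarize_progress(progress: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    """Extract the monitoring metrics from one lastProgress dict."""
    progress = progress or {}  # lastProgress is None before the first batch
    trigger_ts = _parse_ts(progress.get("timestamp"))
    watermark_ts = _parse_ts(progress.get("eventTime", {}).get("watermark"))
    state_ops = progress.get("stateOperators", [])
    sources = progress.get("sources", [])
    # Assumed Kafka source metric; values in this map are strings
    lag = sources[0].get("metrics", {}).get("maxOffsetsBehindLatest") if sources else None
    return {
        "input_rate": progress.get("inputRowsPerSecond"),
        "processing_rate": progress.get("processedRowsPerSecond"),
        "batch_duration_ms": progress.get("durationMs", {}).get("triggerExecution"),
        # How far the watermark trails the trigger (wall-clock) time
        "watermark_delay_s": (
            (trigger_ts - watermark_ts).total_seconds()
            if trigger_ts and watermark_ts else None
        ),
        "state_rows": sum(op.get("numRowsTotal", 0) for op in state_ops),
        "state_memory_bytes": sum(op.get("memoryUsedBytes", 0) for op in state_ops),
        "kafka_max_offsets_behind": float(lag) if lag is not None else None,
    }
```

For example, call `summarize_progress(query.lastProgress)` periodically on the revenue query. A steadily growing `watermark_delay_s` or `kafka_max_offsets_behind` suggests the query is falling behind its input.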
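Tips
- Watermark: always set a watermark when using window aggregation; otherwise the aggregation state grows without bound.
- Checkpoint: in production, store checkpoints on S3 (or other durable shared storage), not on a local disk.
- Partition: by default each Kafka partition is read by one Spark task, so the partition count caps read parallelism (the Kafka source's `minPartitions` option can split partitions further).
- State: monitor state size; if it keeps growing, you may need to tighten the watermark.
- Test: test recovery by stopping Spark, restarting it, and checking that no data was lost or duplicated.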
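To see why a watermark keeps state bounded, here is a small pure-Python simulation of the append-mode revenue query (5-minute tumbling windows, 10-minute watermark). It approximates Spark's behavior and is not its code. The simulation proceeds in three steps within each micro-batch:

1. Rows whose window already ended at or before the current watermark are dropped.
2. Windows ending at or before that watermark are emitted and evicted from state.
3. Only then does the watermark advance to the batch's maximum event time minus the delay. Spark likewise applies a new watermark from the next batch onward.

Spark's extra no-data batches are not modeled. Times are integer seconds, and `run_append_mode` is a hypothetical name.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

WINDOW = 5 * 60   # window("timestamp", "5 minutes"), in seconds
DELAY = 10 * 60   # withWatermark("timestamp", "10 minutes"), in seconds


def run_append_mode(batches: List[List[Tuple[int, float]]]):
    """Simulate append-mode 5-minute sums with a 10-minute watermark.

    Each batch is a list of (event_time_seconds, amount). Returns the finalized
    (window_start, total) rows in emit order, the dropped late events, and the
    number of open windows held in state after each batch.
    """
    state: Dict[int, float] = defaultdict(float)  # window_start -> running sum
    watermark = float("-inf")                     # no watermark before the first batch
    max_event_time = float("-inf")
    emitted, dropped, state_sizes = [], [], []
    for batch in batches:
        for ts, amount in batch:
            start = ts - ts % WINDOW
            if start + WINDOW <= watermark:
                dropped.append((ts, amount))      # window already finalized: too late
                continue
            state[start] += amount
        max_event_time = max([max_event_time] + [ts for ts, _ in batch])
        # Emit and evict every window that ends at or before the current watermark
        for start in sorted(s for s in state if s + WINDOW <= watermark):
            emitted.append((start, state.pop(start)))
        state_sizes.append(len(state))
        # The advanced watermark only applies from the next batch onward
        watermark = max(watermark, max_event_time - DELAY)
    return emitted, dropped, state_sizes
```

Without the eviction step, `state` would keep one entry per window forever. With it, state only holds windows that are still inside the 10-minute lateness horizon.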
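What is Spark Structured Streaming?
Spark Structured Streaming is Apache Spark's stream processing engine. It uses the same DataFrame API as batch Spark and processes data in near real time as a series of micro-batches. It reads from sources such as Kafka and files and writes to sinks such as Delta Lake. It also supports watermarks, window aggregation, and exactly-once processing, given a replayable source, checkpointing, and an idempotent or transactional sink.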
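The exactly-once guarantee rests on two pieces: a replayable source with checkpointed offsets, and a sink that tolerates a replayed batch. The toy simulation below is a sketch rather than Spark's implementation, and it shows how the two fit together. An in-memory list stands in for a Kafka partition, a dict stands in for the checkpoint, and another dict keyed by batch ID stands in for the sink. `run_stream` and `CrashAfterWrite` are hypothetical names. In Spark, the planned offset range is recorded in the checkpoint's offset log before the batch runs, so a restart re-runs exactly the same range. Here the range is simply recomputed deterministically from the last committed offset.

```python
from typing import Dict, List, Optional


class CrashAfterWrite(Exception):
    """Simulated failure after the sink write but before the offset commit."""


def run_stream(log: List[str], checkpoint: Dict[str, int],
               sink: Dict[int, List[str]], batch_size: int = 2,
               crash_at_batch: Optional[int] = None) -> None:
    """Process `log` in micro-batches, resuming from the checkpointed offset.

    checkpoint holds 'committed_offset' (first offset not yet committed) and
    'next_batch_id'; sink maps batch_id -> records written by that batch.
    """
    while checkpoint["committed_offset"] < len(log):
        batch_id = checkpoint["next_batch_id"]
        start = checkpoint["committed_offset"]
        end = min(start + batch_size, len(log))        # offset range for this batch
        records = [r.upper() for r in log[start:end]]  # stand-in transformation
        sink[batch_id] = records  # keyed by batch_id: a replay overwrites, never duplicates
        if batch_id == crash_at_batch:
            raise CrashAfterWrite(batch_id)
        # Commit only after the sink write succeeded
        checkpoint["committed_offset"] = end
        checkpoint["next_batch_id"] = batch_id + 1
```

Suppose the job crashes after batch 1 is written but before its commit. Calling `run_stream` again with the same checkpoint rewrites batch 1 under the same ID instead of appending it a second time. That is the behavior the recovery test in the tips is meant to confirm.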