
Spark Structured Streaming Home Lab Setup


This guide sets up a Spark Structured Streaming home lab and covers Kafka integration, window aggregation, watermarks, checkpoints, Delta Lake, and production patterns for real-time analytics.

Component       | Tool                      | Port | Purpose
Message Broker  | Apache Kafka              | 9092 | Stream source and sink
Stream Engine   | Apache Spark 3.5          | 4040 | Structured Streaming processing
Schema Registry | Confluent Schema Registry | 8081 | Avro/Protobuf schema management
Storage         | MinIO (S3)                | 9000 | Checkpoint and output storage
Lakehouse       | Delta Lake                | N/A  | ACID table on object storage
Notebook        | JupyterLab                | 8888 | Interactive development
Kafka UI        | Kafka UI                  | 8080 | Monitor topics and messages
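Once the containers are running, a quick way to see which of the ports in the table above are reachable is a plain TCP connection check. The sketch below is only an illustration: the `LAB_PORTS` mapping, the function names, and the assumption that every service is published on `localhost` are mine, not part of the original setup.

```python
import socket

# Ports copied from the component table; the names are labels for this sketch only.
LAB_PORTS = {
    "Kafka": 9092,
    "Spark UI": 4040,
    "Schema Registry": 8081,
    "MinIO": 9000,
    "JupyterLab": 8888,
    "Kafka UI": 8080,
}


def port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unresolvable hosts.
        return False


def check_lab(host: str = "localhost") -> dict:
    """Map each component name to whether its port accepts connections."""
    return {name: port_open(host, port) for name, port in LAB_PORTS.items()}
```

Calling `check_lab()` from the host returns a dictionary such as `{"Kafka": True, ...}`. An open port only shows that something is listening there, not that the service is healthy.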

Docker Compose Setup

# === Docker Compose for Streaming Lab ===


# docker-compose.yml
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on: [zookeeper]
    ports: ["9092:9092"]
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # PLAINTEXT is used inside the Docker network, HOST from the host machine
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      # Single broker: the offsets topic cannot use the default replication factor of 3
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'

  spark:
    image: bitnami/spark:3.5
    ports: ["4040:4040", "8888:8888"]
    environment:
      SPARK_MODE: master
    volumes:
      - ./notebooks:/opt/notebooks

  minio:
    image: minio/minio
    ports: ["9000:9000", "9001:9001"]
    command: server /data --console-address ":9001"
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin

  kafka-ui:
    image: provectuslabs/kafka-ui
    ports: ["8080:8080"]
    environment:
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092

# Create Kafka topic
# (docker compose exec resolves the service name; a plain "docker exec kafka"
#  only works if the container is explicitly named "kafka")
docker compose exec kafka kafka-topics --create \
  --bootstrap-server localhost:9092 \
  --topic events --partitions 3 --replication-factor 1

# Produce test events (type one JSON object per line)
docker compose exec kafka kafka-console-producer \
  --bootstrap-server localhost:9092 --topic events
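Typing events by hand into the console producer gets tedious, so a small generator can help. The sketch below assumes the events are JSON objects with the fields `user_id`, `event_type`, `amount`, and `timestamp`, which is the shape the streaming job later in this article parses. The sample event types, value ranges, and function names are hypothetical.

```python
import json
import random
from datetime import datetime, timezone

# Hypothetical sample values; only the field names come from the article's schema.
EVENT_TYPES = ["purchase", "refund", "view"]


def make_event(rng: random.Random, now: datetime) -> str:
    """Build one JSON event line with user_id, event_type, amount, timestamp."""
    event = {
        "user_id": f"user-{rng.randint(1, 100)}",
        "event_type": rng.choice(EVENT_TYPES),
        "amount": round(rng.uniform(1.0, 500.0), 2),
        # ISO-8601 string with a UTC offset, e.g. 2024-01-01T12:00:00+00:00
        "timestamp": now.isoformat(timespec="seconds"),
    }
    return json.dumps(event)


def make_events(n: int, seed: int = 42) -> list:
    """Return n JSON lines; the seed keeps the random fields reproducible."""
    rng = random.Random(seed)
    now = datetime.now(timezone.utc)
    return [make_event(rng, now) for _ in range(n)]


if __name__ == "__main__":
    for line in make_events(5):
        print(line)
```

Saved under a name of your choice, the script's output can be piped into the console producer, one JSON object per line. Spark's default JSON timestamp format is expected to accept this ISO-8601 form, but it is worth confirming on the first few events.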

from dataclasses import dataclass

@dataclass
class LabService:
    service: str
    image: str
    ram: str
    purpose: str

lab = [
    LabService("Zookeeper", "cp-zookeeper:7.5", "512MB", "Kafka coordination"),
    LabService("Kafka", "cp-kafka:7.5", "1GB", "Message broker"),
    LabService("Spark Master", "bitnami/spark:3.5", "2GB", "Stream processing"),
    LabService("Spark Worker", "bitnami/spark:3.5", "2GB", "Processing executor"),
    LabService("MinIO", "minio/minio", "512MB", "S3-compatible storage"),
    LabService("Kafka UI", "kafka-ui", "256MB", "Topic monitoring"),
]

print("=== Lab Services ===")
total_ram = 0
for s in lab:
    # Convert "512MB" / "2GB" to megabytes (1GB is counted as 1000MB for a rough total)
    ram_mb = int(s.ram.replace("MB", "").replace("GB", "000"))
    total_ram += ram_mb
    print(f" [{s.service}] Image: {s.image} | RAM: {s.ram}")
    print(f" Purpose: {s.purpose}")

print(f"\n Total RAM needed: ~{total_ram/1000:.1f}GB")

Structured Streaming Code

# === PySpark Structured Streaming ===
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, count, from_json, sum as sum_, window
from pyspark.sql.types import (
    DoubleType, StringType, StructField, StructType, TimestampType,
)

spark = (
    SparkSession.builder
    .appName("StreamingLab")
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,"
            "io.delta:delta-spark_2.12:3.0.0,"
            # Assumption: hadoop-aws matching Spark 3.5.0's Hadoop build, needed for s3a://
            "org.apache.hadoop:hadoop-aws:3.3.4")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # Assumed S3A settings for the MinIO service in docker-compose.yml;
    # the "checkpoint" and "lakehouse" buckets must already exist
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin")
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .getOrCreate()
)

# Read from Kafka
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:29092")
    .option("subscribe", "events")
    .option("startingOffsets", "latest")
    .load()
)

# Parse JSON events
schema = StructType([
    StructField("user_id", StringType()),
    StructField("event_type", StringType()),
    StructField("amount", DoubleType()),
    StructField("timestamp", TimestampType()),
])

events = df.select(
    from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")

# Tumbling window: revenue per 5 minutes
revenue_5min = (
    events
    .withWatermark("timestamp", "10 minutes")
    .groupBy(window("timestamp", "5 minutes"), "event_type")
    .agg(
        sum_("amount").alias("total_amount"),
        count("*").alias("event_count"),
        avg("amount").alias("avg_amount"),
    )
)

# Write to Delta Lake
query = (
    revenue_5min.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "s3a://checkpoint/revenue")
    .start("s3a://lakehouse/revenue_5min")
)

# Sliding window: 10-minute moving average, recomputed every 5 minutes.
# Defined only; it needs its own writeStream sink and checkpoint to run.
moving_avg = (
    events
    .withWatermark("timestamp", "10 minutes")
    .groupBy(window("timestamp", "10 minutes", "5 minutes"))
    .agg(avg("amount").alias("moving_avg"))
)

from dataclasses import dataclass

@dataclass
class WindowType:
    window: str
    syntax: str
    use_case: str
    example: str

windows = [
    WindowType("Tumbling", 'window("ts", "5 minutes")',
               "Fixed non-overlapping intervals",
               "Revenue per 5 minutes, hourly counts"),
    WindowType("Sliding", 'window("ts", "10 min", "5 min")',
               "Overlapping intervals for smoothing",
               "Moving average, trend detection"),
    WindowType("Session", 'session_window("ts", "30 min")',
               "Activity-based grouping",
               "User session analysis, visit duration"),
]

print("\n=== Window Types ===")
for w in windows:
    print(f" [{w.window}] Syntax: {w.syntax}")
    print(f" Use case: {w.use_case}")
    print(f" Example: {w.example}")
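To make the difference between tumbling and sliding windows concrete, the sketch below computes which windows a single event timestamp falls into. It is plain Python rather than Spark code, it works on integer epoch seconds, and it assumes windows aligned to time zero. Session windows are left out because they depend on neighbouring events rather than on one timestamp. The function names are hypothetical.

```python
def tumbling_window(ts: int, size: int) -> tuple:
    """Return the (start, end) of the fixed window that contains ts."""
    start = ts - (ts % size)
    return (start, start + size)


def sliding_windows(ts: int, size: int, slide: int) -> list:
    """Return every (start, end) window of length size, stepped by slide, that contains ts."""
    windows = []
    start = ts - (ts % slide)      # latest window start at or before ts
    while start > ts - size:       # [start, start + size) still covers ts
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


# An event at t=620s with 5-minute (300s) tumbling windows lands in one window.
print(tumbling_window(620, 300))        # (600, 900)

# With 10-minute windows sliding every 5 minutes it lands in two.
print(sliding_windows(620, 600, 300))   # [(300, 900), (600, 1200)]
```

Each event is counted once per window it belongs to, so a sliding window with `size / slide = 2` roughly doubles the aggregation state compared with a tumbling window of the same slide.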

Production Patterns

# === Production Streaming Patterns ===
from dataclasses import dataclass

@dataclass
class StreamPattern:
    pattern: str
    description: str
    checkpoint: str
    output_mode: str
    guarantee: str

patterns = [
    StreamPattern("ETL Pipeline", "Kafka → Transform → Delta Lake",
        "S3 checkpoint per query", "Append", "Exactly-once"),
    StreamPattern("Real-time Aggregation", "Kafka → Window Agg → Dashboard DB",
        "S3 checkpoint + state store", "Update/Complete", "Exactly-once (needs idempotent sink)"),
    StreamPattern("Event Enrichment", "Kafka → Join with dim table → Kafka",
        "S3 checkpoint", "Append", "At-least-once"),
    StreamPattern("Anomaly Detection", "Kafka → ML Score → Alert",
        "S3 checkpoint", "Append", "At-least-once"),
    StreamPattern("CDC Processing", "Debezium → Spark → Delta Lake",
        "S3 checkpoint", "Append", "Exactly-once"),
]

print("Production Patterns:")
for p in patterns:
    print(f"  [{p.pattern}] {p.description}")
    print(f"    Checkpoint: {p.checkpoint}")
    print(f"    Output: {p.output_mode} | Guarantee: {p.guarantee}")

# Monitoring
monitoring = {
    "Processing Rate": "Records processed per second per batch",
    "Batch Duration": "Time to process each micro-batch",
    "Input Rate": "Records arriving per second from Kafka",
    "Watermark Delay": "How far the watermark lags behind the latest event time",
    "State Size": "Memory used by aggregation state",
    "Kafka Lag": "Consumer group lag per partition",
}

print("\n\nMonitoring Metrics:")
for k, v in monitoring.items():
    print(f"  [{k}]: {v}")
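Several of the metrics above can be read from a running query's progress report. The sketch below assumes the progress is available as a dictionary, as `query.lastProgress` returns in PySpark 3.5, and picks out the relevant fields. The helper name and the sample values are made up for illustration, and Kafka lag is not covered here.

```python
def summarize_progress(progress: dict) -> dict:
    """Pick monitoring metrics out of a StreamingQueryProgress-style dict."""
    state_ops = progress.get("stateOperators", [])
    return {
        "input_rate": progress.get("inputRowsPerSecond"),
        "processing_rate": progress.get("processedRowsPerSecond"),
        "batch_duration_ms": progress.get("durationMs", {}).get("triggerExecution"),
        "watermark": progress.get("eventTime", {}).get("watermark"),
        # A query can have several stateful operators, so sum across them.
        "state_rows": sum(op.get("numRowsTotal", 0) for op in state_ops),
        "state_bytes": sum(op.get("memoryUsedBytes", 0) for op in state_ops),
    }


# Made-up sample in the shape of a progress report; not real measurements.
sample = {
    "inputRowsPerSecond": 120.0,
    "processedRowsPerSecond": 150.0,
    "durationMs": {"triggerExecution": 800},
    "eventTime": {"watermark": "2024-01-01T00:00:00.000Z"},
    "stateOperators": [{"numRowsTotal": 42, "memoryUsedBytes": 2048}],
}

for name, value in summarize_progress(sample).items():
    print(f"{name}: {value}")
```

In the lab this would be called as `summarize_progress(query.lastProgress)` once at least one batch has completed, since `lastProgress` is `None` before that. A processing rate that stays below the input rate is the usual sign that the query is falling behind.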

Tips

  • Watermark: Always set a watermark when using window aggregation so that state does not grow without bound.
  • Checkpoint: In production, store checkpoints in S3 rather than on local disk.
  • Partition: The number of Kafka partitions sets the maximum read parallelism by default.
  • State: Monitor state size. If it grows large, the watermark may need tuning.
  • Test: Test recovery by stopping Spark and restarting it, then verify that no data is lost.
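The watermark tip can be illustrated with a small simulation. The sketch below is a simplified model in plain Python, not Spark's implementation: it advances the watermark after every event instead of once per micro-batch, counts events per tumbling window, finalizes a window once its end is at or behind the watermark, and drops events whose window is already finalized. All names are hypothetical.

```python
def simulate_watermark(event_times, window_size, delay):
    """Return (emitted windows, dropped late events, still-open windows)."""
    max_seen = None
    watermark = None
    open_windows = {}   # window start -> event count (the "state")
    emitted = []        # finalized (start, end, count) tuples
    dropped = []        # events that arrived after their window was finalized

    for ts in event_times:
        start = ts - (ts % window_size)
        if watermark is not None and start + window_size <= watermark:
            dropped.append(ts)
            continue
        open_windows[start] = open_windows.get(start, 0) + 1
        max_seen = ts if max_seen is None else max(max_seen, ts)
        watermark = max_seen - delay
        # Finalize and free every window that the watermark has passed.
        for s in sorted(open_windows):
            if s + window_size <= watermark:
                emitted.append((s, s + window_size, open_windows.pop(s)))
    return emitted, dropped, open_windows


# 5-minute windows with a 10-minute watermark, times in seconds.
emitted, dropped, still_open = simulate_watermark(
    [100, 400, 120, 950, 200, 1300], window_size=300, delay=600
)
print(emitted)      # [(0, 300, 2), (300, 600, 1)]
print(dropped)      # [200]
print(still_open)   # {900: 1, 1200: 1}
```

The event at 120 arrives out of order but is still counted, because the watermark had not yet passed its window. The event at 200 arrives after that window was finalized and is dropped. Without a watermark nothing would ever leave `open_windows`, which is the unbounded state growth the tip warns about.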

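What is Spark Structured Streaming?

Spark Structured Streaming is Apache Spark's stream processing engine, built on the DataFrame API. It processes real-time data in micro-batches, reads from sources such as Kafka, writes to sinks such as files and Delta Lake, and supports exactly-once processing, watermarks, and window aggregation.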
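The micro-batch and exactly-once ideas listed above can be sketched as a toy model. It assumes a replayable source (a Python list indexed by offset), a checkpoint that records each batch's offset range before processing and a commit after it, and a sink that overwrites by batch id. This is a simplified illustration of the general approach, not Spark's actual code, and every name in it is hypothetical.

```python
class IdempotentSink:
    """Stores output per batch id, so rewriting a batch replaces it instead of duplicating it."""

    def __init__(self):
        self.batches = {}

    def write(self, batch_id, records):
        self.batches[batch_id] = list(records)

    def all_records(self):
        return [r for b in sorted(self.batches) for r in self.batches[b]]


def run(source, checkpoint, sink, batch_size, crash_before_commit_of=None):
    """Process source in micro-batches; checkpoint is a dict that survives restarts."""
    offsets = checkpoint.setdefault("offsets", {})    # batch_id -> (start, end)
    commits = checkpoint.setdefault("commits", set())  # batch ids fully written
    while True:
        batch_id = len(commits)
        if batch_id not in offsets:
            # Plan a new batch and record its range before doing any work.
            start = offsets[batch_id - 1][1] if batch_id > 0 else 0
            end = min(start + batch_size, len(source))
            if start == end:
                return  # no new data
            offsets[batch_id] = (start, end)
        start, end = offsets[batch_id]
        sink.write(batch_id, source[start:end])
        if batch_id == crash_before_commit_of:
            raise RuntimeError("simulated crash after write, before commit")
        commits.add(batch_id)


source = list(range(10))
checkpoint, sink = {}, IdempotentSink()
try:
    run(source, checkpoint, sink, batch_size=4, crash_before_commit_of=1)
except RuntimeError:
    pass  # batch 1 was written but never committed

run(source, checkpoint, sink, batch_size=4)  # restart from the same checkpoint
print(sink.all_records())  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

After the restart, batch 1 is replayed with exactly the offsets recorded before the crash, and the sink overwrites its earlier copy, so every record appears once. With a sink that appended instead, records 4 to 7 would appear twice, which is the at-least-once case.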

XM Legend · Forex trader and instructor for 13 years

Founder of SiamCafe since 1997 · Forex trader for more than 13 years, recognized as an XM Legend · Shares knowledge of Forex, IT, AI, and trading from real experience in live markets