SiamCafe.net Blog

Spark Structured Streaming Internal Developer Platform

2026-01-07 · อ. บอม — SiamCafe.net · 11,388 words

Spark Streaming Platform


Engine           | Model       | Latency           | Ecosystem    | Best for
Spark Streaming  | Micro-batch | seconds           | very large   | Batch + Stream
Apache Flink     | True stream | milliseconds      | large        | Low-latency
Kafka Streams    | True stream | milliseconds      | Kafka only   | Kafka-native
Apache Beam      | Unified     | depends on runner | Multi-runner | Portable
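The micro-batch model in the table can be sketched in plain Python: events are buffered and processed one batch per trigger, trading latency for throughput. Everything here (the `MicroBatcher` class, the 3-event trigger) is illustrative, not a Spark API.

```python
from collections import deque

class MicroBatcher:
    """Toy micro-batch engine: buffer events, process them batch-at-a-time."""
    def __init__(self, batch_size):
        self.batch_size = batch_size
        self.buffer = deque()
        self.batches = []

    def ingest(self, event):
        self.buffer.append(event)
        if len(self.buffer) >= self.batch_size:
            self._trigger()

    def _trigger(self):
        # One "trigger": drain the buffer and process it as a single batch,
        # the way one micro-batch is processed in Spark Structured Streaming.
        batch = [self.buffer.popleft() for _ in range(len(self.buffer))]
        self.batches.append(sum(batch))  # e.g. aggregate per batch

mb = MicroBatcher(batch_size=3)
for amount in [10, 20, 30, 40, 50, 60]:
    mb.ingest(amount)
print(mb.batches)  # -> [60, 150]
```

A true-stream engine like Flink would instead update state per record, which is why its latency column reads milliseconds rather than seconds.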

Structured Streaming

# === Spark Structured Streaming ===

# pip install pyspark delta-spark

# from pyspark.sql import SparkSession
# from pyspark.sql.functions import *
# from pyspark.sql.types import *
#
# spark = SparkSession.builder \
#     .appName("StreamingPipeline") \
#     .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
#     .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
#     .getOrCreate()
#
# # Read from Kafka
# kafka_df = spark.readStream \
#     .format("kafka") \
#     .option("kafka.bootstrap.servers", "kafka:9092") \
#     .option("subscribe", "events") \
#     .option("startingOffsets", "latest") \
#     .load()
#
# # Parse JSON
# schema = StructType([
#     StructField("user_id", StringType()),
#     StructField("event_type", StringType()),
#     StructField("timestamp", TimestampType()),
#     StructField("amount", DoubleType()),
# ])
#
# events = kafka_df \
#     .select(from_json(col("value").cast("string"), schema).alias("data")) \
#     .select("data.*")
#
# # Windowed Aggregation
# windowed = events \
#     .withWatermark("timestamp", "10 minutes") \
#     .groupBy(
#         window("timestamp", "5 minutes"),
#         "event_type"
#     ) \
#     .agg(
#         count("*").alias("event_count"),
#         sum("amount").alias("total_amount"),
#         approx_count_distinct("user_id").alias("unique_users")
#     )
#
# # Write to Delta Lake
# query = windowed.writeStream \
#     .format("delta") \
#     .outputMode("append") \
#     .option("checkpointLocation", "/checkpoints/events") \
#     .start("/data/delta/event_summary")
#
# query.awaitTermination()  # block the driver so the stream keeps running
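The withWatermark + window aggregation above can be illustrated without Spark. This toy version (all names hypothetical) assigns each event to a 5-minute tumbling window by event time and drops events older than the watermark, i.e. the max event time seen minus the allowed delay:

```python
from collections import defaultdict

WINDOW = 5 * 60  # 5-minute tumbling windows (seconds)
DELAY = 10 * 60  # watermark delay: accept data up to 10 minutes late

def run(events):
    """events: list of (event_time_seconds, amount); returns window -> total."""
    totals = defaultdict(int)
    max_event_time = 0
    for ts, amount in events:
        max_event_time = max(max_event_time, ts)
        watermark = max_event_time - DELAY
        if ts < watermark:
            continue  # too late: dropped, as Spark's watermark would do
        window_start = (ts // WINDOW) * WINDOW
        totals[window_start] += amount
    return dict(totals)

# The third event arrives 20 minutes behind the max event time -> discarded.
print(run([(0, 1), (1800, 1), (600, 1)]))  # -> {0: 1, 1800: 1}
```

Real Spark also uses the watermark to decide when a window's state can be finalized and emitted in append mode; this sketch only shows the late-data filter.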

from dataclasses import dataclass

@dataclass
class Pipeline:
    name: str
    source: str
    sink: str
    throughput: str
    latency: str
    status: str

pipelines = [
    Pipeline("User Events", "Kafka: events", "Delta: event_summary", "50K msg/s", "5s", "Running"),
    Pipeline("Order Stream", "Kafka: orders", "Delta: order_facts", "10K msg/s", "3s", "Running"),
    Pipeline("Clickstream", "Kafka: clicks", "Delta: click_agg", "200K msg/s", "10s", "Running"),
    Pipeline("CDC Postgres", "Debezium: users", "Delta: users_cdc", "5K msg/s", "2s", "Running"),
    Pipeline("Log Analytics", "Kafka: app-logs", "Elasticsearch", "100K msg/s", "5s", "Running"),
]

print("=== Streaming Pipelines ===")
for p in pipelines:
    print(f"  [{p.status}] {p.name}")
    print(f"    {p.source} -> {p.sink} | {p.throughput} | Latency: {p.latency}")

Developer Platform

# === Internal Developer Platform ===

# Pipeline Template — YAML Config
# pipeline:
#   name: user-events-to-delta
#   source:
#     type: kafka
#     topic: user-events
#     format: json
#     schema: schemas/user_event.avsc
#   transformations:
#     - type: filter
#       condition: "event_type != 'heartbeat'"
#     - type: watermark
#       column: timestamp
#       delay: "10 minutes"
#     - type: aggregate
#       window: "5 minutes"
#       group_by: [event_type, country]
#       metrics:
#         - count: event_count
#         - sum(amount): total_amount
#         - approx_count_distinct(user_id): unique_users
#   sink:
#     type: delta
#     path: /data/delta/user_events_agg
#     mode: append
#     checkpoint: /checkpoints/user_events
#   monitoring:
#     alert_on_lag: 60  # seconds
#     alert_channel: slack

# Self-service Pipeline Builder
# 1. The data engineer picks a template
# 2. Defines the source (Kafka topic, database)
# 3. Defines the transformations
# 4. Defines the sink (Delta Lake, Elasticsearch)
# 5. Sets monitoring alerts
# 6. Submits -> CI/CD deploys automatically
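The submit step can be guarded by a validator that rejects a template before it reaches CI/CD. A minimal sketch, assuming the YAML has already been loaded into a dict; the field names follow the template above, but the accepted type sets are illustrative:

```python
REQUIRED = {"name": str, "source": dict, "sink": dict}
SOURCE_TYPES = {"kafka", "debezium"}
SINK_TYPES = {"delta", "elasticsearch"}

def validate(pipeline: dict) -> list[str]:
    """Return a list of human-readable errors; an empty list means valid."""
    errors = []
    for field, typ in REQUIRED.items():
        if not isinstance(pipeline.get(field), typ):
            errors.append(f"missing or invalid field: {field}")
    if pipeline.get("source", {}).get("type") not in SOURCE_TYPES:
        errors.append("source.type must be one of " + ", ".join(sorted(SOURCE_TYPES)))
    if pipeline.get("sink", {}).get("type") not in SINK_TYPES:
        errors.append("sink.type must be one of " + ", ".join(sorted(SINK_TYPES)))
    return errors

cfg = {"name": "user-events-to-delta",
       "source": {"type": "kafka", "topic": "user-events"},
       "sink": {"type": "delta", "path": "/data/delta/user_events_agg"}}
print(validate(cfg))  # -> []
```

Failing fast here is what makes the self-service flow safe: a bad config never reaches the Spark cluster.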

@dataclass
class PlatformFeature:
    feature: str
    description: str
    tool: str
    status: str

features = [
    PlatformFeature("Pipeline Templates", "Ready-made templates for common patterns", "YAML Config", "GA"),
    PlatformFeature("Self-service UI", "Build pipelines through a web UI", "React + FastAPI", "Beta"),
    PlatformFeature("Schema Registry", "Manage Avro and Protobuf schemas", "Confluent SR", "GA"),
    PlatformFeature("Data Catalog", "Search datasets and lineage", "DataHub", "GA"),
    PlatformFeature("Monitoring", "Pipeline health and lag alerts", "Grafana + PagerDuty", "GA"),
    PlatformFeature("CI/CD", "Auto-deploy pipeline code", "GitHub Actions + ArgoCD", "GA"),
    PlatformFeature("Cost Dashboard", "Track Spark cluster cost", "Grafana + Prometheus", "Beta"),
]

print("\n=== Platform Features ===")
for f in features:
    print(f"  [{f.status}] {f.feature}")
    print(f"    {f.description} | Tool: {f.tool}")

Production Operations

# === Production Monitoring ===

# Spark UI Metrics
# spark.metrics.conf:
# *.sink.prometheus.class=org.apache.spark.metrics.sink.PrometheusSink
# *.sink.prometheus.port=8090

# Key Metrics
# - spark_streaming_lastCompletedBatch_processingDelay
# - spark_streaming_lastCompletedBatch_totalDelay
# - spark_streaming_numRecordsPerSecond
# - spark_streaming_stateOperator_numRows
# - kafka_consumer_lag
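Of the metrics above, kafka_consumer_lag is the usual paging signal. A sketch of how lag is computed per consumer group and checked against the 10,000-message alert threshold used later in this post; the partition offsets here are made-up numbers:

```python
def consumer_lag(end_offsets, committed_offsets):
    """Total lag = sum over partitions of (log end offset - committed offset)."""
    return sum(end_offsets[p] - committed_offsets[p] for p in end_offsets)

LAG_ALERT_THRESHOLD = 10_000  # messages

end = {0: 120_000, 1: 118_500, 2: 121_000}        # latest offset per partition
committed = {0: 119_600, 1: 118_200, 2: 120_500}  # consumer group's position

lag = consumer_lag(end, committed)
print(lag, "ALERT" if lag > LAG_ALERT_THRESHOLD else "OK")  # -> 1200 OK
```

In production this computation is done by an exporter (e.g. a Kafka lag exporter feeding Prometheus) rather than hand-rolled, but the arithmetic is the same.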

# Kubernetes — Spark Operator
# helm install spark-operator spark-operator/spark-operator
# kubectl apply -f spark-application.yaml
#
# apiVersion: sparkoperator.k8s.io/v1beta2
# kind: SparkApplication
# metadata:
#   name: streaming-pipeline
# spec:
#   type: Python
#   mode: cluster
#   image: spark-streaming:latest
#   mainApplicationFile: local:///app/pipeline.py
#   sparkConf:
#     # note: for Structured Streaming, rate-limit the Kafka source with
#     # .option("maxOffsetsPerTrigger", ...) on readStream; the DStream-era
#     # spark.streaming.kafka.maxRatePerPartition setting does not apply here
#     spark.sql.shuffle.partitions: "200"
#   driver:
#     cores: 2
#     memory: "4g"
#   executor:
#     cores: 4
#     instances: 3
#     memory: "8g"

operational_metrics = {
    "Active Pipelines": "15",
    "Total Throughput": "500K events/sec",
    "Avg Processing Delay": "3.2 seconds",
    "Kafka Consumer Lag": "1,200 messages",
    "Spark Executors": "45 (3 clusters)",
    "Delta Tables": "28",
    "Daily Data Processed": "2.5 TB",
    "Monthly Cost": "$4,500",
}

print("Operations Dashboard:")
for k, v in operational_metrics.items():
    print(f"  {k}: {v}")
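The dashboard numbers above imply a unit cost worth tracking over time: monthly cost divided by data processed. A quick check with the figures shown (assuming a 30-day month):

```python
monthly_cost_usd = 4_500
daily_tb = 2.5
days = 30

monthly_tb = daily_tb * days                 # 75 TB processed per month
cost_per_tb = monthly_cost_usd / monthly_tb  # $60 per TB
print(f"${cost_per_tb:.0f}/TB over {monthly_tb:.0f} TB")  # -> $60/TB over 75 TB
```

Tracking $/TB rather than raw cluster cost makes the cost dashboard comparable across pipelines of different sizes.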

best_practices = [
    "Checkpoint every pipeline to prevent data loss",
    "Use watermarks so late data is bounded relative to event time",
    "Give each pipeline its own Kafka consumer group",
    "Auto-scale executors based on consumer lag",
    "Run Delta Lake OPTIMIZE with Z-ORDER weekly",
    "Support schema evolution for added and removed columns",
    "Alert when consumer lag exceeds 10,000 messages",
]

print("\nBest Practices:")
for i, p in enumerate(best_practices, 1):
    print(f"  {i}. {p}")
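The auto-scaling practice above can be sketched as a simple proportional policy: size the executor count to the current consumer lag, clamped to cluster limits. The function name, target, and bounds are illustrative, not a Spark or Kubernetes API:

```python
def desired_executors(lag, target_lag_per_executor=5_000,
                      min_execs=2, max_execs=20):
    """Size the executor count so each executor handles ~target lag."""
    needed = max(1, -(-lag // target_lag_per_executor))  # ceiling division
    return max(min_execs, min(max_execs, needed))

print(desired_executors(lag=1_200))    # -> 2  (scale down to the floor)
print(desired_executors(lag=60_000))   # -> 12 (scale up)
print(desired_executors(lag=500_000))  # -> 20 (capped at the ceiling)
```

In practice the same idea is usually delegated to Spark dynamic allocation or a Kubernetes autoscaler driven by the lag metric, but the decision rule looks like this.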

FAQ

What is Spark Structured Streaming?

A stream-processing engine built on Apache Spark's DataFrame API. It reads from sources such as Kafka and Kinesis in near real time and supports exactly-once semantics, watermarks, and windowed and stateful aggregations, with APIs in Python and Scala.

What is an Internal Developer Platform?

An IDP lets developers self-serve: they create, deploy, and manage data pipelines themselves through templates, CI/CD, and monitoring, using a UI or config files instead of waiting on an Ops team.

How do Spark and Flink differ?

Spark runs micro-batches with second-level latency and has a very large ecosystem including ML libraries; Flink is a true streaming engine with millisecond latency and strong stateful processing. Choose Spark for combined batch + stream workloads and Flink for low-latency ones.

How do you build a data platform?

Start from Spark, Kafka, Delta Lake, and Airflow; then add templates for common patterns, a self-service UI, monitoring, CI/CD, a data catalog, and a schema registry.

Summary

A Spark Structured Streaming internal developer platform combines Kafka sources, Delta Lake sinks, self-service pipeline templates, CI/CD, monitoring, a schema registry, and a data catalog, and runs in production on Kubernetes.
