Spark Streaming Platform
Building a self-service data pipeline platform with Spark Structured Streaming, Kafka, Delta Lake, and Airflow: real-time processing on DataFrames with micro-batch execution and exactly-once guarantees, delivered through an Internal Developer Platform.
| Engine | Model | Latency | Ecosystem | Best For |
|---|---|---|---|---|
| Spark Streaming | Micro-batch | Seconds | Very large | Batch + Stream |
| Apache Flink | True streaming | Milliseconds | Large | Low-latency |
| Kafka Streams | True streaming | Milliseconds | Kafka only | Kafka-native |
| Apache Beam | Unified | Depends on runner | Multi-runner | Portability |
Structured Streaming
# === Spark Structured Streaming ===
# pip install pyspark delta-spark
# from pyspark.sql import SparkSession
# from pyspark.sql.functions import *
# from pyspark.sql.types import *
#
# # Delta needs both the SQL extension and the Delta catalog; the delta-spark
# # package version must match your Spark version (3.1.x pairs with Spark 3.5)
# spark = SparkSession.builder \
#     .appName("StreamingPipeline") \
#     .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,io.delta:delta-spark_2.12:3.1.0") \
#     .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
#     .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaSparkSessionCatalog") \
#     .getOrCreate()
#
# # Read from Kafka
# kafka_df = spark.readStream \
#     .format("kafka") \
#     .option("kafka.bootstrap.servers", "kafka:9092") \
#     .option("subscribe", "events") \
#     .option("startingOffsets", "latest") \
#     .load()
#
# # Parse JSON
# schema = StructType([
#     StructField("user_id", StringType()),
#     StructField("event_type", StringType()),
#     StructField("timestamp", TimestampType()),
#     StructField("amount", DoubleType()),
# ])
#
# events = kafka_df \
#     .select(from_json(col("value").cast("string"), schema).alias("data")) \
#     .select("data.*")
#
# # Windowed aggregation: the watermark bounds how late event-time data is accepted
# windowed = events \
#     .withWatermark("timestamp", "10 minutes") \
#     .groupBy(
#         window("timestamp", "5 minutes"),
#         "event_type"
#     ) \
#     .agg(
#         count("*").alias("event_count"),
#         sum("amount").alias("total_amount"),
#         approx_count_distinct("user_id").alias("unique_users")
#     )
#
# # Write to Delta Lake; in append mode a window is emitted once the watermark passes its end
# query = windowed.writeStream \
#     .format("delta") \
#     .outputMode("append") \
#     .option("checkpointLocation", "/checkpoints/events") \
#     .start("/data/delta/event_summary")
from dataclasses import dataclass

@dataclass
class Pipeline:
    name: str
    source: str
    sink: str
    throughput: str
    latency: str
    status: str

pipelines = [
    Pipeline("User Events", "Kafka: events", "Delta: event_summary", "50K msg/s", "5s", "Running"),
    Pipeline("Order Stream", "Kafka: orders", "Delta: order_facts", "10K msg/s", "3s", "Running"),
    Pipeline("Clickstream", "Kafka: clicks", "Delta: click_agg", "200K msg/s", "10s", "Running"),
    Pipeline("CDC Postgres", "Debezium: users", "Delta: users_cdc", "5K msg/s", "2s", "Running"),
    Pipeline("Log Analytics", "Kafka: app-logs", "Elasticsearch", "100K msg/s", "5s", "Running"),
]

print("=== Streaming Pipelines ===")
for p in pipelines:
    print(f"  [{p.status}] {p.name}")
    print(f"    {p.source} -> {p.sink} | {p.throughput} | Latency: {p.latency}")
Developer Platform
# === Internal Developer Platform ===
# Pipeline Template (YAML config)
#
# pipeline:
#   name: user-events-to-delta
#   source:
#     type: kafka
#     topic: user-events
#     format: json
#     schema: schemas/user_event.avsc
#   transformations:
#     - type: filter
#       condition: "event_type != 'heartbeat'"
#     - type: watermark
#       column: timestamp
#       delay: "10 minutes"
#     - type: aggregate
#       window: "5 minutes"
#       group_by: [event_type, country]
#       metrics:
#         - count: event_count
#         - sum(amount): total_amount
#         - approx_count_distinct(user_id): unique_users
#   sink:
#     type: delta
#     path: /data/delta/user_events_agg
#     mode: append
#     checkpoint: /checkpoints/user_events
#   monitoring:
#     alert_on_lag: 60  # seconds
#     alert_channel: slack
# Self-service Pipeline Builder
# 1. The data engineer picks a template
# 2. Defines the source (Kafka topic, database)
# 3. Defines the transformations
# 4. Defines the sink (Delta Lake, Elasticsearch)
# 5. Sets monitoring alerts
# 6. Submits -> CI/CD deploys automatically (see the sketch below)
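The sketch below gives a rough idea of how the platform might translate such a template into a running Structured Streaming job; the build_pipeline function, the hard-coded broker address, and the simplified schema and metric handling are illustrative assumptions, not the platform's actual code.
# === Template -> Pipeline (illustrative sketch) ===
# import yaml
# from pyspark.sql.functions import col, expr, from_json, window, count
#
# def build_pipeline(spark, template_path):
#     with open(template_path) as f:
#         cfg = yaml.safe_load(f)["pipeline"]
#
#     # read the raw stream (only the kafka source type is sketched here)
#     raw = spark.readStream \
#         .format("kafka") \
#         .option("kafka.bootstrap.servers", "kafka:9092") \
#         .option("subscribe", cfg["source"]["topic"]) \
#         .load()
#
#     # real code would resolve cfg["source"]["schema"] via the schema registry;
#     # a DDL string stands in for it here
#     ddl = "user_id STRING, event_type STRING, timestamp TIMESTAMP, amount DOUBLE, country STRING"
#     df = raw.select(from_json(col("value").cast("string"), ddl).alias("data")).select("data.*")
#
#     for t in cfg.get("transformations", []):
#         if t["type"] == "filter":
#             df = df.where(expr(t["condition"]))
#         elif t["type"] == "watermark":
#             df = df.withWatermark(t["column"], t["delay"])
#         elif t["type"] == "aggregate":
#             # simplified: only the count metric is wired up in this sketch
#             df = df.groupBy(window(col("timestamp"), t["window"]), *t["group_by"]) \
#                    .agg(count("*").alias("event_count"))
#
#     sink = cfg["sink"]
#     return df.writeStream \
#         .format(sink["type"]) \
#         .outputMode(sink["mode"]) \
#         .option("checkpointLocation", sink["checkpoint"]) \
#         .start(sink["path"])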
@dataclass
class PlatformFeature:
    feature: str
    description: str
    tool: str
    status: str

features = [
    PlatformFeature("Pipeline Templates", "Ready-made templates for common patterns", "YAML Config", "GA"),
    PlatformFeature("Self-service UI", "Build pipelines through a web UI", "React + FastAPI", "Beta"),
    PlatformFeature("Schema Registry", "Manage Avro/Protobuf schemas", "Confluent SR", "GA"),
    PlatformFeature("Data Catalog", "Search datasets and lineage", "DataHub", "GA"),
    PlatformFeature("Monitoring", "Pipeline health and lag alerts", "Grafana + PagerDuty", "GA"),
    PlatformFeature("CI/CD", "Auto-deploy pipeline code", "GitHub Actions + ArgoCD", "GA"),
    PlatformFeature("Cost Dashboard", "Track Spark cluster cost", "Grafana + Prometheus", "Beta"),
]

print("\n=== Platform Features ===")
for f in features:
    print(f"  [{f.status}] {f.feature}")
    print(f"    {f.description} | Tool: {f.tool}")
Production Operations
# === Production Monitoring ===
# Spark metrics -> Prometheus (Spark's built-in servlet sink)
# spark.metrics.conf:
#   *.sink.prometheusServlet.class=org.apache.spark.metrics.sink.PrometheusServlet
#   *.sink.prometheusServlet.path=/metrics/prometheus
# spark.ui.prometheus.enabled=true  # also exposes executor metrics on the driver UI
# Key Metrics
# - spark_streaming_lastCompletedBatch_processingDelay
# - spark_streaming_lastCompletedBatch_totalDelay
# - spark_streaming_numRecordsPerSecond
# - spark_streaming_stateOperator_numRows
# - kafka_consumer_lag
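Beyond the metrics sink, per-batch progress for Structured Streaming queries can be captured in code with a StreamingQueryListener (available in the Python API since Spark 3.4); the minimal sketch below only prints the numbers, and forwarding them to Prometheus or another backend is left as an assumption about your setup.
# === Streaming query progress via listener (sketch) ===
# from pyspark.sql.streaming import StreamingQueryListener
#
# class ProgressLogger(StreamingQueryListener):
#     def onQueryStarted(self, event):
#         print(f"query started: {event.id}")
#
#     def onQueryProgress(self, event):
#         p = event.progress
#         # these fields back the throughput / delay panels listed above
#         print(f"batch={p.batchId} rows={p.numInputRows} "
#               f"input/s={p.inputRowsPerSecond} processed/s={p.processedRowsPerSecond}")
#
#     def onQueryIdle(self, event):
#         pass
#
#     def onQueryTerminated(self, event):
#         print(f"query terminated: {event.id}")
#
# spark.streams.addListener(ProgressLogger())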
# Kubernetes: Spark Operator
# helm install spark-operator spark-operator/spark-operator
# kubectl apply -f spark-application.yaml
#
# apiVersion: sparkoperator.k8s.io/v1beta2
# kind: SparkApplication
# metadata:
#   name: streaming-pipeline
# spec:
#   type: Python
#   mode: cluster
#   image: spark-streaming:latest
#   mainApplicationFile: local:///app/pipeline.py
#   sparkConf:
#     spark.sql.shuffle.partitions: "200"
#     # note: for Structured Streaming, cap ingest with the maxOffsetsPerTrigger option on the
#     # Kafka source; spark.streaming.kafka.maxRatePerPartition only applies to the old DStream API
#   driver:
#     cores: 2
#     memory: "4g"
#   executor:
#     cores: 4
#     instances: 3
#     memory: "8g"
operational_metrics = {
    "Active Pipelines": "15",
    "Total Throughput": "500K events/sec",
    "Avg Processing Delay": "3.2 seconds",
    "Kafka Consumer Lag": "1,200 messages",
    "Spark Executors": "45 (3 clusters)",
    "Delta Tables": "28",
    "Daily Data Processed": "2.5 TB",
    "Monthly Cost": "$4,500",
}

print("Operations Dashboard:")
for k, v in operational_metrics.items():
    print(f"  {k}: {v}")
best_practices = [
    "Checkpoint every pipeline to prevent data loss",
    "Set a watermark so late event-time data is bounded",
    "Use a separate Kafka consumer group per pipeline",
    "Auto-scale executors based on consumer lag",
    "Run Delta Lake OPTIMIZE + Z-ORDER weekly",
    "Support schema evolution for added/removed columns",
    "Monitor consumer lag and alert at 10,000 messages",
]

print("\n\nBest Practices:")
for i, p in enumerate(best_practices, 1):
    print(f"  {i}. {p}")
Tips
- Checkpoint: use a checkpoint location on every pipeline to prevent data loss
- Watermark: always set a watermark to handle late data
- Delta: use Delta Lake for ACID transactions, and schedule regular OPTIMIZE runs (see the maintenance sketch below)
- Template: build reusable templates so teams ship pipelines faster
- Monitor: alert on Kafka consumer lag and processing delay
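For the Delta tip, a weekly maintenance pass might look like the sketch below; the table path and Z-ORDER column come from the event-summary example above, and the 168-hour vacuum retention (the default 7 days) is an assumption.
# === Delta Lake weekly maintenance (sketch) ===
# from delta.tables import DeltaTable
#
# table = DeltaTable.forPath(spark, "/data/delta/event_summary")
# # compact small files and co-locate data by the most common filter column
# table.optimize().executeZOrderBy("event_type")
# # drop files no longer referenced by the table (retention given in hours)
# table.vacuum(168)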
What is Spark Structured Streaming?
Spark Structured Streaming is Apache Spark's stream-processing engine. It treats a stream as an unbounded DataFrame, reads real-time data from sources such as Kafka and Kinesis, provides exactly-once guarantees, and supports watermarks, windowed and stateful aggregations, in both Python and Scala.
What is an Internal Developer Platform?
An IDP is a self-service layer that lets developers build, deploy, and manage data pipelines themselves through templates, UI/config, CI/CD, and monitoring, without waiting on an Ops team.
How do Spark and Flink differ?
Spark processes streams as micro-batches with second-level latency, has a very large ecosystem (including ML), and handles batch and streaming in one engine; Flink is a true streaming engine with millisecond latency and strong stateful processing, making it the better fit for low-latency workloads.
How do you build a data platform?
Combine Spark, Kafka, Delta Lake, and Airflow; provide templates for common patterns, a self-service UI, monitoring, CI/CD, a data catalog, and a schema registry. Airflow handles scheduled and maintenance jobs on the batch side.
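A minimal Airflow sketch for that orchestration piece, assuming Airflow 2.4+ and a hypothetical spark-submit job that runs the Delta maintenance code from the tips section:
# === Airflow DAG: weekly Delta maintenance (sketch) ===
# from datetime import datetime
# from airflow import DAG
# from airflow.operators.bash import BashOperator
#
# with DAG(
#     dag_id="delta_weekly_maintenance",   # hypothetical DAG id
#     start_date=datetime(2024, 1, 1),
#     schedule="@weekly",
#     catchup=False,
# ) as dag:
#     BashOperator(
#         task_id="optimize_event_summary",
#         # assumed path: a job that runs the OPTIMIZE / VACUUM snippet shown earlier
#         bash_command="spark-submit /app/jobs/optimize_delta.py",
#     )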
Summary
Spark Structured Streaming paired with an Internal Developer Platform brings together Kafka, Delta Lake, self-service templates, pipeline CI/CD, monitoring, a schema registry, and a data catalog, running in production on Kubernetes.
