Spark Structured Streaming Internal Developer Platform
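An internal developer platform built on Spark Structured Streaming gives teams self-service, real-time data pipelines: Kafka as the source, Delta Lake as the sink, Airflow for orchestration, and DataFrame-based micro-batch processing with exactly-once semantics.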
| Engine | Processing model | Latency | Ecosystem | Best for |
|---|---|---|---|---|
| Spark Structured Streaming | Micro-batch | Seconds | Very large | Combined batch and stream workloads |
| Apache Flink | True streaming | Milliseconds | Large | Low-latency processing |
| Kafka Streams | True streaming | Milliseconds | Kafka only | Kafka-native applications |
| Apache Beam | Unified | Depends on the runner | Multi-runner | Portable pipelines |
Structured Streaming
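# === Spark Structured Streaming ===
# pip install pyspark delta-spark
from pyspark.sql import SparkSession
# Explicit imports; note that sum here is Spark's aggregate and shadows the built-in
from pyspark.sql.functions import approx_count_distinct, col, count, from_json, sum, window
from pyspark.sql.types import DoubleType, StringType, StructField, StructType, TimestampType

# Package versions are examples (assuming Spark 3.5 / Scala 2.12); match your own build
spark = SparkSession.builder \
    .appName("StreamingPipeline") \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,"
            "io.delta:delta-spark_2.12:3.1.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()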

# Read from Kafka
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "events") \
    .option("startingOffsets", "latest") \
    .load()

# Parse JSON
schema = StructType([
    StructField("user_id", StringType()),
    StructField("event_type", StringType()),
    StructField("timestamp", TimestampType()),
    StructField("amount", DoubleType()),
])
events = kafka_df \
    .select(from_json(col("value").cast("string"), schema).alias("data")) \
    .select("data.*")

# Windowed Aggregation
windowed = events \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window("timestamp", "5 minutes"),
        "event_type"
    ) \
    .agg(
        count("*").alias("event_count"),
        sum("amount").alias("total_amount"),
        approx_count_distinct("user_id").alias("unique_users")
    )

# Write to Delta Lake
query = windowed.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/checkpoints/events") \
    .start("/data/delta/event_summary")

# Keep the driver alive while the streaming query runs
query.awaitTermination()

from dataclasses import dataclass

@dataclass
class Pipeline:
    name: str
    source: str
    sink: str
    throughput: str
    latency: str
    status: str

pipelines = [
    Pipeline("User Events", "Kafka: events", "Delta: event_summary", "50K msg/s", "5s", "Running"),
    Pipeline("Order Stream", "Kafka: orders", "Delta: order_facts", "10K msg/s", "3s", "Running"),
    Pipeline("Clickstream", "Kafka: clicks", "Delta: click_agg", "200K msg/s", "10s", "Running"),
    Pipeline("CDC Postgres", "Debezium: users", "Delta: users_cdc", "5K msg/s", "2s", "Running"),
    Pipeline("Log Analytics", "Kafka: app-logs", "Elasticsearch", "100K msg/s", "5s", "Running"),
]

print("=== Streaming Pipelines ===")
for p in pipelines:
    print(f"  [{p.status}] {p.name}")
    print(f"    {p.source} -> {p.sink} | {p.throughput} | Latency: {p.latency}")
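
The 10-minute watermark and 5-minute window used in the aggregation can be hard to picture, so here is a minimal pure-Python sketch of the idea. It is a simplification and not Spark's implementation: the watermark is advanced after every event rather than once per micro-batch, and the names `window_start` and `aggregate` are made up for this illustration.

```python
from collections import defaultdict
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=5)            # window length used in the aggregation
WATERMARK_DELAY = timedelta(minutes=10)  # delay passed to withWatermark
EPOCH = datetime(1970, 1, 1)


def window_start(ts):
    """Floor an event time to the start of its tumbling window."""
    return ts - ((ts - EPOCH) % WINDOW)


def aggregate(events):
    """Count events per (window start, event type) in arrival order.

    An event is dropped when its window already ended at or before the
    watermark, where watermark = max event time seen so far - delay.
    """
    counts = defaultdict(int)
    dropped = []
    max_event_time = None
    for ts, event_type in events:
        if max_event_time is not None:
            watermark = max_event_time - WATERMARK_DELAY
            if window_start(ts) + WINDOW <= watermark:
                dropped.append((ts, event_type))
                continue
        counts[(window_start(ts), event_type)] += 1
        if max_event_time is None or ts > max_event_time:
            max_event_time = ts
    return dict(counts), dropped


day = datetime(2024, 1, 1)
sample = [
    (day.replace(hour=12, minute=1), "click"),
    (day.replace(hour=12, minute=4), "click"),
    (day.replace(hour=12, minute=17), "view"),   # moves the watermark to 12:07
    (day.replace(hour=12, minute=3), "click"),   # window [12:00, 12:05) is closed: dropped
    (day.replace(hour=12, minute=8), "click"),   # late, but [12:05, 12:10) is still open: kept
]
counts, dropped = aggregate(sample)
for (start, event_type), n in sorted(counts.items()):
    print(start.time(), event_type, n)
print("dropped:", len(dropped))
```

A longer watermark delay keeps more windows open, which admits more late events at the cost of more state held in memory.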
Developer Platform
=== Internal Developer Platform ===
Pipeline Template: YAML Config
pipeline:
  name: user-events-to-delta
  source:
    type: kafka
    topic: user-events
    format: json
    schema: schemas/user_event.avsc
  transformations:
    - type: filter
      condition: "event_type != 'heartbeat'"
    - type: watermark
      column: timestamp
      delay: "10 minutes"
    - type: aggregate
      window: "5 minutes"
      group_by: [event_type, country]
      metrics:
        - count: event_count
        - sum(amount): total_amount
        - approx_count_distinct(user_id): unique_users
  sink:
    type: delta
    path: /data/delta/user_events_agg
    mode: append
    checkpoint: /checkpoints/user_events
  monitoring:
    alert_on_lag: 60  # seconds
    alert_channel: slack
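
To make the template concrete, the sketch below shows one way a platform might turn such a config into an ordered list of streaming steps. It is an assumption about how the mapping could work, not the platform's actual code: `Step` and `plan_from_config` are hypothetical names, and the config is written as a Python dict mirroring the YAML so the example needs no YAML parser.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Step:
    op: str      # DataFrame operation the step would map to
    detail: str  # human-readable arguments


def plan_from_config(cfg: dict) -> List[Step]:
    """Translate a pipeline config into an ordered list of steps."""
    p = cfg["pipeline"]
    src, sink = p["source"], p["sink"]
    steps = [Step("readStream", f"{src['type']} topic={src['topic']} format={src['format']}")]
    for t in p.get("transformations", []):
        kind = t["type"]
        if kind == "filter":
            steps.append(Step("filter", t["condition"]))
        elif kind == "watermark":
            steps.append(Step("withWatermark", f"{t['column']}, {t['delay']}"))
        elif kind == "aggregate":
            keys = ", ".join(t["group_by"])
            steps.append(Step("groupBy+agg", f"window={t['window']} keys=[{keys}]"))
        else:
            raise ValueError(f"unsupported transformation type: {kind!r}")
    steps.append(Step("writeStream", f"{sink['type']} path={sink['path']} mode={sink['mode']}"))
    return steps


# Same content as the YAML template, minus the monitoring section
CONFIG = {
    "pipeline": {
        "name": "user-events-to-delta",
        "source": {"type": "kafka", "topic": "user-events", "format": "json"},
        "transformations": [
            {"type": "filter", "condition": "event_type != 'heartbeat'"},
            {"type": "watermark", "column": "timestamp", "delay": "10 minutes"},
            {"type": "aggregate", "window": "5 minutes",
             "group_by": ["event_type", "country"],
             "metrics": [{"count": "event_count"}, {"sum(amount)": "total_amount"}]},
        ],
        "sink": {"type": "delta", "path": "/data/delta/user_events_agg",
                 "mode": "append", "checkpoint": "/checkpoints/user_events"},
    }
}

plan = plan_from_config(CONFIG)
for step in plan:
    print(f"{step.op}: {step.detail}")
```

Rejecting unknown transformation types up front keeps a typo in the config from silently producing a pipeline with a missing step.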
Self-service Pipeline Builder
1. The data engineer picks a template
2. Define the source (Kafka topic, database)
3. Define the transformations
4. Define the sink (Delta Lake, Elasticsearch)
5. Set up monitoring alerts
6. Submit; CI/CD deploys the pipeline automatically
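
Before the submit step hands a pipeline to CI/CD, a platform would typically validate it. The sketch below is one hypothetical set of checks, not the platform's actual rules: `validate` and `REQUIRED_SECTIONS` are illustrative names. The watermark rule reflects a real Spark constraint, since an aggregation written in append mode needs a watermark so Spark knows when a window is final.

```python
REQUIRED_SECTIONS = ("name", "source", "sink")


def validate(pipeline: dict) -> list:
    """Return a list of problems; an empty list means the spec can be submitted."""
    errors = []
    for key in REQUIRED_SECTIONS:
        if key not in pipeline:
            errors.append(f"missing section: {key}")

    sink = pipeline.get("sink", {})
    if sink and "checkpoint" not in sink:
        errors.append("sink.checkpoint is required so the query can recover after a restart")

    kinds = [t.get("type") for t in pipeline.get("transformations", [])]
    if "aggregate" in kinds and sink.get("mode") == "append":
        # Only transformations declared before the aggregate count
        if "watermark" not in kinds[: kinds.index("aggregate")]:
            errors.append("append-mode aggregation needs a watermark before the aggregate step")
    return errors


good = {
    "name": "user-events-to-delta",
    "source": {"type": "kafka", "topic": "user-events"},
    "transformations": [{"type": "watermark"}, {"type": "aggregate"}],
    "sink": {"type": "delta", "mode": "append", "checkpoint": "/checkpoints/user_events"},
}
missing_watermark = dict(good, transformations=[{"type": "aggregate"}])

print(validate(good))
print(validate(missing_watermark))
```

Returning every problem at once, rather than failing on the first, lets the engineer fix the whole spec in a single pass.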
from dataclasses import dataclass

@dataclass
class PlatformFeature:
    feature: str
    description: str
    tool: str
    status: str

features = [
    PlatformFeature("Pipeline Templates", "Ready-made templates for common patterns", "YAML Config", "GA"),
    PlatformFeature("Self-service UI", "Build pipelines through a web UI", "React + FastAPI", "Beta"),
    PlatformFeature("Schema Registry", "Manage Avro and Protobuf schemas", "Confluent SR", "GA"),
    PlatformFeature("Data Catalog", "Search datasets and lineage", "DataHub", "GA"),
    PlatformFeature("Monitoring", "Pipeline health and lag alerts", "Grafana + PagerDuty", "GA"),
    PlatformFeature("CI/CD", "Automatically deploy pipeline code", "GitHub Actions + ArgoCD", "GA"),
    PlatformFeature("Cost Dashboard", "Track Spark cluster cost", "Grafana + Prometheus", "Beta"),
]

print("\n=== Platform Features ===")
for f in features:
    print(f"  [{f.status}] {f.feature}")
    print(f"    {f.description} | Tool: {f.tool}")
Production Operations
=== Production Monitoring ===
Spark UI Metrics
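metrics.properties (the file referenced by spark.metrics.conf):
# Built-in Prometheus endpoint (Spark 3.0+), served on the Spark UI port
*.sink.prometheusServlet.class=org.apache.spark.metrics.sink.PrometheusServlet
*.sink.prometheusServlet.path=/metrics/prometheus
# Structured Streaming publishes query metrics only when spark.sql.streaming.metricsEnabled=true
Key Metrics (exported names depend on the sink and its naming rules; treat these as illustrative)
- spark_streaming_lastCompletedBatch_processingDelay
- spark_streaming_lastCompletedBatch_totalDelay
- spark_streaming_numRecordsPerSecond
- spark_streaming_stateOperator_numRows
- kafka_consumer_lag
The lastCompletedBatch names follow the legacy DStream metrics. For Structured Streaming, the same signals are available from StreamingQueryProgress: inputRowsPerSecond, processedRowsPerSecond, durationMs, and stateOperators.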
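
As an illustration of how these metrics could drive alerts, here is a minimal threshold check. It is a sketch under assumptions: `Sample` and `evaluate` are hypothetical names, and the thresholds reuse the 10,000-message lag and 60-second values that appear elsewhere in this article rather than any recommended defaults.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Sample:
    pipeline: str
    consumer_lag: int          # messages behind the latest Kafka offset
    processing_delay_s: float  # seconds spent processing the last batch


# Assumed thresholds, taken from values mentioned in this article
LAG_THRESHOLD = 10_000
DELAY_THRESHOLD_S = 60.0


def evaluate(sample: Sample) -> List[str]:
    """Return one alert message per breached threshold."""
    alerts = []
    if sample.consumer_lag > LAG_THRESHOLD:
        alerts.append(
            f"{sample.pipeline}: consumer lag {sample.consumer_lag} exceeds {LAG_THRESHOLD} messages"
        )
    if sample.processing_delay_s > DELAY_THRESHOLD_S:
        alerts.append(
            f"{sample.pipeline}: processing delay {sample.processing_delay_s}s exceeds {DELAY_THRESHOLD_S}s"
        )
    return alerts


healthy = Sample("User Events", consumer_lag=1_200, processing_delay_s=3.2)
behind = Sample("Clickstream", consumer_lag=25_000, processing_delay_s=75.0)

for sample in (healthy, behind):
    for alert in evaluate(sample):
        print(alert)
```

In practice these rules would more likely live in the alerting system (for example as Grafana or Prometheus alert rules) than in application code; the Python form is only meant to show the logic.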
Kubernetes: Spark Operator
helm install spark-operator spark-operator/spark-operator
kubectl apply -f spark-application.yaml

# spark-application.yaml
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: streaming-pipeline
spec:
  type: Python
  mode: cluster
  image: spark-streaming:latest
  mainApplicationFile: local:///app/pipeline.py
  sparkVersion: "3.5.0"  # assumed to match the Spark version of the image
  sparkConf:
    # spark.streaming.kafka.maxRatePerPartition only affects the legacy DStream API;
    # for Structured Streaming, cap intake with the Kafka source option maxOffsetsPerTrigger
    spark.sql.shuffle.partitions: "200"
  driver:
    cores: 2
    memory: "4g"
  executor:
    cores: 4
    instances: 3
    memory: "8g"
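
On a self-service platform, a manifest like this would probably be generated per pipeline rather than written by hand. The following sketch shows one possible way to do that; `spark_application` is a hypothetical helper, the `sparkVersion` default is an assumption, and the output is JSON, which `kubectl apply -f` accepts alongside YAML.

```python
import json


def spark_application(name, image, main_file, executors=3,
                      executor_cores=4, executor_memory="8g",
                      spark_version="3.5.0"):
    """Build a SparkApplication manifest as a plain dict."""
    return {
        "apiVersion": "sparkoperator.k8s.io/v1beta2",
        "kind": "SparkApplication",
        "metadata": {"name": name},
        "spec": {
            "type": "Python",
            "mode": "cluster",
            "image": image,
            "mainApplicationFile": main_file,
            "sparkVersion": spark_version,  # assumed; should match the image
            "sparkConf": {"spark.sql.shuffle.partitions": "200"},
            "driver": {"cores": 2, "memory": "4g"},
            "executor": {
                "cores": executor_cores,
                "instances": executors,
                "memory": executor_memory,
            },
        },
    }


manifest = spark_application(
    name="streaming-pipeline",
    image="spark-streaming:latest",
    main_file="local:///app/pipeline.py",
)
print(json.dumps(manifest, indent=2))
```

Keeping the sizing values as parameters means each pipeline can request its own executor count without copying the whole manifest.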
operational_metrics = {
    "Active Pipelines": "15",
    "Total Throughput": "500K events/sec",
    "Avg Processing Delay": "3.2 seconds",
    "Kafka Consumer Lag": "1,200 messages",
    "Spark Executors": "45 (3 clusters)",
    "Delta Tables": "28",
    "Daily Data Processed": "2.5 TB",
    "Monthly Cost": "$4,500",
}

print("Operations Dashboard:")
for k, v in operational_metrics.items():
    print(f"  {k}: {v}")

best_practices = [
    "Checkpoint every pipeline to prevent data loss",
    "Set a watermark on the event-time column to bound how late data may arrive",
    "Use a separate Kafka consumer group per pipeline",
    "Auto-scale executors based on lag",
    "Run Delta Lake OPTIMIZE with Z-ORDER weekly",
    "Plan for schema evolution as columns are added and removed",
    "Monitor consumer lag and alert at 10,000 messages",
]

print("\n\nBest Practices:")
for i, p in enumerate(best_practices, 1):
    print(f"  {i}. {p}")
Tips
- Checkpoint: configure a checkpoint for every pipeline to prevent data loss
- Watermark: always set a watermark to handle late data
- Delta: use Delta Lake for ACID transactions
- Template: build templates the team can reuse to save time
- Monitor: alert on Kafka lag and processing delay
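
Why a checkpoint prevents data loss, and how it combines with an idempotent sink to avoid duplicates, can be shown with a small simulation. This is a conceptual sketch only and does not reproduce Spark's or Delta Lake's internals: `Checkpoint`, `IdempotentSink`, and `run` are names invented for the example.

```python
class Checkpoint:
    """Stands in for the checkpoint directory: remembers the next offset to read."""

    def __init__(self):
        self.next_offset = 0


class IdempotentSink:
    """Stands in for a transactional sink: each batch id is applied at most once."""

    def __init__(self):
        self.batches = {}

    def write(self, batch_id, rows):
        # A replayed batch id is ignored, so a retry cannot duplicate rows
        self.batches.setdefault(batch_id, list(rows))

    def rows(self):
        return [r for b in sorted(self.batches) for r in self.batches[b]]


def run(source, checkpoint, sink, batch_size, crash_after_write_of=None):
    """Process micro-batches from the last committed offset onward."""
    while checkpoint.next_offset < len(source):
        start = checkpoint.next_offset
        batch = source[start:start + batch_size]
        batch_id = start // batch_size
        sink.write(batch_id, batch)
        if crash_after_write_of == batch_id:
            raise RuntimeError("simulated crash before the offset was committed")
        checkpoint.next_offset = start + len(batch)  # commit progress


source = list(range(10))
checkpoint, sink = Checkpoint(), IdempotentSink()

try:
    run(source, checkpoint, sink, batch_size=4, crash_after_write_of=1)
except RuntimeError:
    pass  # batch 1 was written, but its offset was never committed

# Restart: batch 1 is replayed from the checkpoint, then processing continues
run(source, checkpoint, sink, batch_size=4)
print(sink.rows())
```

Without the checkpoint, the restart would have no committed offset to resume from; without the idempotent sink, the replayed batch would be written twice.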
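What is Spark Structured Streaming?
Spark Structured Streaming is Apache Spark's stream-processing engine, built on the DataFrame API. It processes real-time data from sources such as Kafka and Kinesis, supports watermarks, windowed aggregations, and stateful operations, can deliver exactly-once results when the source is replayable and the sink is idempotent, and is usable from Python and Scala.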