Spark Structured Streaming Internal Developer

Published 28 May 2026

Spark Streaming Platform

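This article walks through an internal developer platform built on Spark Structured Streaming: Kafka as the source, Delta Lake as the sink, Airflow for orchestration, and self-service data pipelines for real-time processing, using the DataFrame API with micro-batch execution and exactly-once guarantees.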

Engine                     | Model       | Latency           | Ecosystem    | Best for
Spark Structured Streaming | Micro-batch | Seconds           | Very large   | Batch + stream
Apache Flink               | True stream | Milliseconds      | Large        | Low latency
Kafka Streams              | True stream | Milliseconds      | Kafka only   | Kafka-native apps
Apache Beam                | Unified     | Depends on runner | Multi-runner | Portability

Structured Streaming

=== Spark Structured Streaming ===

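# pip install pyspark delta-spark
from pyspark.sql import SparkSession
# Explicit imports; sum is Spark's aggregate and shadows the Python builtin.
from pyspark.sql.functions import approx_count_distinct, col, count, from_json, sum, window
from pyspark.sql.types import DoubleType, StringType, StructField, StructType, TimestampType

# Delta needs its own package and catalog; 3.1.0 is an assumed version,
# match it to the installed delta-spark release.
spark = SparkSession.builder \
    .appName("StreamingPipeline") \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,io.delta:delta-spark_2.12:3.1.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()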

# Read from Kafka
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "events") \
    .option("startingOffsets", "latest") \
    .load()

# Parse JSON
schema = StructType([
    StructField("user_id", StringType()),
    StructField("event_type", StringType()),
    StructField("timestamp", TimestampType()),
    StructField("amount", DoubleType()),
])

# Kafka delivers value as bytes: cast to string, then parse against the schema.
events = kafka_df \
    .select(from_json(col("value").cast("string"), schema).alias("data")) \
    .select("data.*")

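# Windowed Aggregation
# The watermark lets Spark drop state for windows more than 10 minutes behind
# the latest event time, which append mode needs in order to finalize a window.
windowed = events \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window("timestamp", "5 minutes"),
        "event_type"
    ) \
    .agg(
        count("*").alias("event_count"),
        sum("amount").alias("total_amount"),
        approx_count_distinct("user_id").alias("unique_users")
    )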

# Write to Delta Lake
query = windowed.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/checkpoints/events") \
    .start("/data/delta/event_summary")
# In a standalone script, call query.awaitTermination() here so the driver stays alive.
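To make the interplay of `withWatermark("timestamp", "10 minutes")`, the 5-minute `window`, and `append` output mode concrete, the sketch below replays a handful of event times in plain Python, without Spark. It is a simplified model and not Spark's implementation: it advances the watermark after every event rather than once per micro-batch, and `window_start`, `simulate`, and the sample event times are hypothetical names and data chosen for illustration.

```python
from collections import defaultdict

WINDOW = 5 * 60   # tumbling window length, seconds
DELAY = 10 * 60   # watermark delay, seconds


def window_start(ts):
    """Start of the 5-minute tumbling window containing event time ts."""
    return ts - ts % WINDOW


def simulate(event_times):
    """Replay event times (seconds, in arrival order).

    Returns (emitted, dropped): emitted is a list of (window_start, count)
    for windows finalized by the watermark; dropped lists events that
    arrived after their window was already finalized.
    """
    counts = defaultdict(int)
    emitted, dropped = [], []
    max_seen = 0
    for ts in event_times:
        max_seen = max(max_seen, ts)
        watermark = max_seen - DELAY
        start = window_start(ts)
        if start + WINDOW <= watermark:
            dropped.append(ts)          # too late: its window is closed
        else:
            counts[start] += 1
        # Append mode: a window is output once, when the watermark passes its end.
        for closed in sorted(w for w in counts if w + WINDOW <= watermark):
            emitted.append((closed, counts.pop(closed)))
    return emitted, dropped


emitted, dropped = simulate([0, 60, 290, 310, 100, 950, 120, 1250])
print(emitted)  # [(0, 4), (300, 1)]
print(dropped)  # [120]
```

The event at 100 arrives out of order but is still counted because the watermark has not yet reached the end of its window; the event at 120 arrives after the watermark has moved past 300 and is discarded. Nothing is emitted for the windows starting at 900 and 1200, since the watermark has not passed their ends yet.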

from dataclasses import dataclass

@dataclass
class Pipeline:
    name: str
    source: str
    sink: str
    throughput: str
    latency: str
    status: str

pipelines = [
    Pipeline("User Events", "Kafka: events", "Delta: event_summary", "50K msg/s", "5s", "Running"),
    Pipeline("Order Stream", "Kafka: orders", "Delta: order_facts", "10K msg/s", "3s", "Running"),
    Pipeline("Clickstream", "Kafka: clicks", "Delta: click_agg", "200K msg/s", "10s", "Running"),
    Pipeline("CDC Postgres", "Debezium: users", "Delta: users_cdc", "5K msg/s", "2s", "Running"),
    Pipeline("Log Analytics", "Kafka: app-logs", "Elasticsearch", "100K msg/s", "5s", "Running"),
]

print("=== Streaming Pipelines ===")
for p in pipelines:
    print(f"  [{p.status}] {p.name}")
    print(f"    {p.source} -> {p.sink} | {p.throughput} | Latency: {p.latency}")

Developer Platform

=== Internal Developer Platform ===

Pipeline Template — YAML Config

pipeline:
  name: user-events-to-delta
  source:
    type: kafka
    topic: user-events
    format: json
    schema: schemas/user_event.avsc
  transformations:
    - type: filter
      condition: "event_type != 'heartbeat'"
    - type: watermark
      column: timestamp
      delay: "10 minutes"
    - type: aggregate
      window: "5 minutes"
      group_by: [event_type, country]
      metrics:
        - count: event_count
        - sum(amount): total_amount
        - approx_count_distinct(user_id): unique_users
  sink:
    type: delta
    path: /data/delta/user_events_agg
    mode: append
    checkpoint: /checkpoints/user_events
  monitoring:
    alert_on_lag: 60  # seconds
    alert_channel: slack
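A template like this is only useful if the platform rejects a bad config before deploying it. The sketch below shows one hypothetical way to run that check. It takes the config as a plain dict, as PyYAML's `yaml.safe_load` would return it, and lists the problems it finds. The function name `validate_pipeline`, the allowed type sets, and the specific rules are assumptions for illustration, not part of the platform described in this article.

```python
# Assumed allow-lists, taken from the source and sink types this article mentions.
ALLOWED_SOURCES = {"kafka", "database"}
ALLOWED_SINKS = {"delta", "elasticsearch"}
ALLOWED_TRANSFORMS = {"filter", "watermark", "aggregate"}


def validate_pipeline(config):
    """Return a list of human-readable problems; an empty list means it passed."""
    errors = []
    pipeline = config.get("pipeline", {})
    for key in ("name", "source", "sink"):
        if key not in pipeline:
            errors.append(f"missing required key: pipeline.{key}")

    source = pipeline.get("source", {})
    sink = pipeline.get("sink", {})
    if source and source.get("type") not in ALLOWED_SOURCES:
        errors.append(f"unsupported source type: {source.get('type')}")
    if sink and sink.get("type") not in ALLOWED_SINKS:
        errors.append(f"unsupported sink type: {sink.get('type')}")
    if sink and "checkpoint" not in sink:
        errors.append("sink.checkpoint is required for streaming sinks")

    kinds = [step.get("type") for step in pipeline.get("transformations", [])]
    for kind in kinds:
        if kind not in ALLOWED_TRANSFORMS:
            errors.append(f"unsupported transformation: {kind}")
    # A windowed aggregate written in append mode needs a watermark before it.
    if "aggregate" in kinds and sink.get("mode") == "append":
        if "watermark" not in kinds[:kinds.index("aggregate")]:
            errors.append("aggregate with append mode needs a watermark step before it")
    return errors


config = {
    "pipeline": {
        "name": "user-events-to-delta",
        "source": {"type": "kafka", "topic": "user-events", "format": "json"},
        "transformations": [
            {"type": "filter", "condition": "event_type != 'heartbeat'"},
            {"type": "watermark", "column": "timestamp", "delay": "10 minutes"},
            {"type": "aggregate", "window": "5 minutes",
             "group_by": ["event_type", "country"]},
        ],
        "sink": {"type": "delta", "path": "/data/delta/user_events_agg",
                 "mode": "append", "checkpoint": "/checkpoints/user_events"},
    }
}
print(validate_pipeline(config))  # []
```

Returning a list of messages instead of raising on the first problem lets a self-service UI or a CI check show every issue in one pass.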

Self-service Pipeline Builder

1. The data engineer picks a template
2. Define the source (Kafka topic, database)
3. Define the transformations
4. Define the sink (Delta Lake, Elasticsearch)
5. Set up monitoring alerts
6. Submit -> CI/CD deploys the pipeline automatically

from dataclasses import dataclass

@dataclass
class PlatformFeature:
    feature: str
    description: str
    tool: str
    status: str

features = [
    PlatformFeature("Pipeline Templates", "Ready-made templates for common patterns", "YAML Config", "GA"),
    PlatformFeature("Self-service UI", "Build pipelines through a web UI", "React + FastAPI", "Beta"),
    PlatformFeature("Schema Registry", "Manage Avro and Protobuf schemas", "Confluent SR", "GA"),
    PlatformFeature("Data Catalog", "Search datasets and lineage", "DataHub", "GA"),
    PlatformFeature("Monitoring", "Pipeline health and lag alerts", "Grafana + PagerDuty", "GA"),
    PlatformFeature("CI/CD", "Automatically deploy pipeline code", "GitHub Actions + ArgoCD", "GA"),
    PlatformFeature("Cost Dashboard", "Track Spark cluster cost", "Grafana + Prometheus", "Beta"),
]

print("\n=== Platform Features ===")
for f in features:
    print(f"  [{f.status}] {f.feature}")
    print(f"    {f.description} | Tool: {f.tool}")

Production Operations

=== Production Monitoring ===

Spark UI Metrics

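# metrics.properties (the file that spark.metrics.conf points to)
*.sink.prometheusServlet.class=org.apache.spark.metrics.sink.PrometheusServlet
*.sink.prometheusServlet.path=/metrics/prometheus
# Spark's built-in Prometheus sink is a servlet on the Spark UI port, so no sink port is set.
# Set spark.sql.streaming.metricsEnabled=true to publish streaming query metrics.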

Key Metrics

  • spark_streaming_lastCompletedBatch_processingDelay
  • spark_streaming_lastCompletedBatch_totalDelay
  • spark_streaming_numRecordsPerSecond
  • spark_streaming_stateOperator_numRows
  • kafka_consumer_lag
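Most alerts hang on `kafka_consumer_lag`, so it helps to see how a lag figure can be derived. The sketch below sums, per partition, the gap between the latest available offset and the offset the query has processed. The input shape mirrors the per-topic, per-partition offset maps that a Kafka source reports in Structured Streaming progress events, but treat that mapping, the helper names `total_lag` and `should_alert`, and the sample offsets as assumptions. The 10,000-message threshold is the one quoted in this article's best practices.

```python
def total_lag(end_offsets, latest_offsets):
    """Sum of (latest - processed) over all partitions of all topics."""
    lag = 0
    for topic, partitions in latest_offsets.items():
        processed = end_offsets.get(topic, {})
        for partition, latest in partitions.items():
            # Assumption: a partition that was never read counts from offset 0.
            lag += max(0, latest - processed.get(partition, 0))
    return lag


def should_alert(lag, threshold=10_000):
    """True when lag has reached the alert threshold (in messages)."""
    return lag >= threshold


# Hypothetical offsets for a three-partition topic.
end = {"events": {"0": 1500, "1": 1480, "2": 1510}}
latest = {"events": {"0": 1900, "1": 1900, "2": 1890}}

lag = total_lag(end, latest)
print(lag, should_alert(lag))  # 1200 False
```

Lag measured in messages says nothing about how long it takes to catch up, which is why it is usually read alongside the processing-delay metrics listed above.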

Kubernetes — Spark Operator

helm install spark-operator spark-operator/spark-operator

kubectl apply -f spark-application.yaml

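apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: streaming-pipeline
spec:
  type: Python
  mode: cluster
  image: spark-streaming:latest
  mainApplicationFile: local:///app/pipeline.py
  sparkVersion: "3.5.0"  # required by the operator; matches the Kafka package version used earlier
  sparkConf:
    # DStream-era setting that Structured Streaming ignores. To rate-limit the
    # Kafka source, set the maxOffsetsPerTrigger read option in the pipeline code.
    spark.streaming.kafka.maxRatePerPartition: "1000"
    spark.sql.shuffle.partitions: "200"
  driver:
    cores: 2
    memory: "4g"
  executor:
    cores: 4
    instances: 3
    memory: "8g"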
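The resource numbers in this manifest interact: 3 executors with 4 cores each give 12 task slots, while `spark.sql.shuffle.partitions` is 200, so each shuffle stage of a micro-batch runs in several waves of tasks. The sketch below does that arithmetic. `shuffle_waves` is a hypothetical helper that assumes the default of one CPU per task, and whether 200 partitions is too many for a given workload is something to measure rather than assume.

```python
import math


def shuffle_waves(executors, cores_per_executor, shuffle_partitions):
    """Number of task waves one shuffle stage needs, assuming 1 CPU per task."""
    slots = executors * cores_per_executor
    return math.ceil(shuffle_partitions / slots)


# Values from the SparkApplication spec: 3 executors x 4 cores, 200 partitions.
print(shuffle_waves(3, 4, 200))  # 17
```

With a short trigger interval, many small waves per micro-batch can add scheduling overhead, which is one common reason to lower the shuffle partition count for streaming jobs.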

operational_metrics = {
    "Active Pipelines": "15",
    "Total Throughput": "500K events/sec",
    "Avg Processing Delay": "3.2 seconds",
    "Kafka Consumer Lag": "1,200 messages",
    "Spark Executors": "45 (3 clusters)",
    "Delta Tables": "28",
    "Daily Data Processed": "2.5 TB",
    "Monthly Cost": "$4,500",
}

print("Operations Dashboard:")
for k, v in operational_metrics.items():
    print(f"  {k}: {v}")

best_practices = [
    "Checkpoint every pipeline to prevent data loss",
    "Set a watermark on the event-time column to bound how late data may arrive",
    "Use a separate Kafka consumer group per pipeline",
    "Auto-scale executors based on lag",
    "Run Delta Lake OPTIMIZE + Z-ORDER every week",
    "Support schema evolution for added and removed columns",
    "Monitor consumer lag and alert at 10,000 messages",
]

print("\n\nBest Practices:")
for i, p in enumerate(best_practices, 1):
    print(f"  {i}. {p}")

Tips

  • Checkpoint: use a checkpoint for every pipeline to prevent data loss
  • Watermark: always set a watermark for late data
  • Delta: use Delta Lake for ACID transactions
  • Template: build templates the team can reuse to save time
  • Monitor: alert on Kafka lag and processing delay

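What is Spark Structured Streaming?

Spark Structured Streaming is Apache Spark's stream processing engine, built on the DataFrame API. It processes real-time data from sources such as Kafka and Kinesis, offers exactly-once guarantees, supports watermarks and windowed, stateful operations, and can be used from Python or Scala.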