Delta Lake Stream Processing คืออะไร
Delta Lake เป็น open-source storage layer ที่เพิ่ม ACID transactions, schema enforcement และ time travel ให้กับ data lakes ที่ใช้ Apache Parquet format พัฒนาโดย Databricks และรองรับทั้ง batch และ streaming workloads บน Apache Spark ส่วน Stream Processing คือการประมวลผลข้อมูลแบบ real-time หรือ near-real-time เมื่อข้อมูลเข้ามา การรวม Delta Lake กับ Stream Processing ช่วยให้สร้าง reliable streaming pipelines ที่มี exactly-once semantics, schema evolution และ time travel สำหรับ debugging
Delta Lake Architecture
# delta_arch.py — Delta Lake architecture
import json
class DeltaLakeArchitecture:
    """Catalog of Delta Lake's core features and the medallion layer model.

    Pure presentation class: it holds display text and prints it to stdout;
    no Spark or Delta Lake calls are made.
    """

    # Feature key -> display record {name, description, benefit}.
    # Values are user-facing text (mixed English/Thai) — do not alter.
    FEATURES = {
        "acid_transactions": {
            "name": "ACID Transactions",
            "description": "Serializable isolation level — concurrent reads/writes ปลอดภัย",
            "benefit": "ไม่มี partial writes, ไม่มี dirty reads — data consistency",
        },
        "schema_enforcement": {
            "name": "Schema Enforcement & Evolution",
            "description": "บังคับ schema เมื่อ write — reject ข้อมูลที่ไม่ตรง schema",
            "benefit": "ป้องกัน bad data เข้า table + evolve schema ได้อย่างปลอดภัย",
        },
        "time_travel": {
            "name": "Time Travel (Data Versioning)",
            "description": "อ่านข้อมูล ณ เวลาใดก็ได้ในอดีต — ทุก write สร้าง version ใหม่",
            "benefit": "Audit, debugging, rollback, reproduce ML training data",
        },
        "unified_batch_streaming": {
            "name": "Unified Batch & Streaming",
            "description": "ใช้ Delta table เดียวกันสำหรับทั้ง batch reads/writes และ streaming",
            "benefit": "ไม่ต้องมี separate systems สำหรับ batch/streaming — Lambda → Delta",
        },
        "change_data_feed": {
            "name": "Change Data Feed (CDF)",
            "description": "Track row-level changes (insert, update, delete) — CDC for downstream",
            "benefit": "Stream changes ไป downstream systems — incremental processing",
        },
    }

    # Medallion architecture layers, ordered raw -> refined.
    LAYERS = {
        "bronze": "Raw data — ingest as-is from sources (Kafka, files, APIs)",
        "silver": "Cleaned data — deduplicated, validated, enriched",
        "gold": "Business-level aggregations — dashboards, ML features, reports",
    }

    def show_features(self):
        """Print each feature's name, description, and benefit to stdout."""
        print("=== Delta Lake Features ===\n")
        for feat in self.FEATURES.values():
            print(f"[{feat['name']}]")
            print(f"  {feat['description']}")
            # Fix: 'benefit' was stored in every FEATURES entry but never displayed.
            print(f"  Benefit: {feat['benefit']}")
            print()

    def show_layers(self):
        """Print the medallion architecture layers to stdout."""
        print("=== Medallion Architecture ===")
        for layer, desc in self.LAYERS.items():
            print(f"  [{layer}] {desc}")
# Demo entry point — guarded so importing this module does not print.
if __name__ == "__main__":
    arch = DeltaLakeArchitecture()
    arch.show_features()
    arch.show_layers()
Streaming with Delta Lake
# streaming.py — Stream processing with Delta Lake
import json
class DeltaStreaming:
    """Reference streaming patterns for Delta Lake (display-only; no Spark execution).

    Each PATTERNS entry holds a display name, a short description, and a
    PySpark Structured Streaming code sample stored as text for printing.
    """

    # Pattern key -> {name, description, code}. The 'code' values are
    # illustrative snippets, not executed by this class.
    PATTERNS = {
        "kafka_to_delta": {
            "name": "Kafka → Delta Lake (Bronze)",
            "description": "Ingest streaming data จาก Kafka ไป Delta table",
            "code": """
# PySpark Structured Streaming: Kafka → Delta
df = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "events")
    .option("startingOffsets", "latest")
    .load()
)

# Parse JSON payload
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType, TimestampType

schema = StructType() \\
    .add("event_id", StringType()) \\
    .add("user_id", StringType()) \\
    .add("event_type", StringType()) \\
    .add("timestamp", TimestampType()) \\
    .add("payload", StringType())

parsed = df.select(
    from_json(col("value").cast("string"), schema).alias("data")
).select("data.*")

# Write to Delta (Bronze)
(parsed.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoints/bronze_events")
    .trigger(processingTime="10 seconds")
    .toTable("bronze.events")
)
""",
        },
        "delta_to_delta": {
            "name": "Delta → Delta (Bronze → Silver)",
            "description": "Stream จาก Bronze Delta table → transform → write Silver",
            "code": """
# Stream from Bronze → transform → Silver
bronze_stream = (spark.readStream
    .format("delta")
    .table("bronze.events")
)

# Transform: deduplicate, validate, enrich
silver = (bronze_stream
    .dropDuplicates(["event_id"])
    .filter(col("event_type").isNotNull())
    .withColumn("processed_at", current_timestamp())
)

# Write to Silver
(silver.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoints/silver_events")
    .trigger(processingTime="30 seconds")
    .toTable("silver.events")
)
""",
        },
        "cdf_streaming": {
            "name": "Change Data Feed Streaming",
            "description": "Stream row-level changes จาก Delta table",
            "code": """
# Enable CDF on table
spark.sql("ALTER TABLE silver.events SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")

# Read changes as stream
changes = (spark.readStream
    .format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 0)
    .table("silver.events")
)

# changes has columns: _change_type (insert/update_preimage/update_postimage/delete)
# Process changes → send to downstream
""",
        },
    }

    def show_patterns(self):
        """Print each pattern's name and description (code samples omitted for brevity)."""
        print("=== Streaming Patterns ===\n")
        for pattern in self.PATTERNS.values():
            print(f"[{pattern['name']}]")
            print(f"  {pattern['description']}")
            print()
# Demo entry point — guarded so importing this module does not print.
if __name__ == "__main__":
    streaming = DeltaStreaming()
    streaming.show_patterns()
Python Implementation
# implementation.py — Python Delta Lake streaming tools
import json
class DeltaImplementation:
    """Holds a reference implementation of a Delta Lake streaming pipeline as display text.

    The CODE string is a complete PySpark sample (Kafka → Bronze → Silver →
    Gold plus a merge/upsert helper); this class only prints it, never runs it.
    """

    # Display-only pipeline sample; kept as one text block for show_code().
    CODE = """
# delta_pipeline.py — Delta Lake streaming pipeline manager
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, current_timestamp, window, count, avg
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType
from delta.tables import DeltaTable


class DeltaStreamPipeline:
    def __init__(self, app_name="delta-stream"):
        self.spark = (SparkSession.builder
            .appName(app_name)
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
            .getOrCreate()
        )

    def ingest_from_kafka(self, topic, servers, delta_table, checkpoint):
        '''Ingest from Kafka to Delta Bronze'''
        raw = (self.spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", servers)
            .option("subscribe", topic)
            .option("startingOffsets", "latest")
            .option("maxOffsetsPerTrigger", 10000)
            .load()
        )
        schema = StructType() \\
            .add("event_id", StringType()) \\
            .add("user_id", StringType()) \\
            .add("event_type", StringType()) \\
            .add("amount", DoubleType()) \\
            .add("timestamp", TimestampType())
        parsed = raw.select(
            from_json(col("value").cast("string"), schema).alias("data"),
            col("topic"), col("partition"), col("offset"), col("timestamp").alias("kafka_ts"),
        ).select("data.*", "topic", "partition", "offset", "kafka_ts")
        query = (parsed.writeStream
            .format("delta")
            .outputMode("append")
            .option("checkpointLocation", checkpoint)
            .trigger(processingTime="10 seconds")
            .toTable(delta_table)
        )
        return query

    def transform_bronze_to_silver(self, bronze_table, silver_table, checkpoint):
        '''Transform Bronze → Silver with dedup and validation'''
        bronze = (self.spark.readStream
            .format("delta")
            .table(bronze_table)
        )
        silver = (bronze
            .dropDuplicates(["event_id"])
            .filter(col("event_type").isNotNull())
            .filter(col("amount") > 0)
            .withColumn("processed_at", current_timestamp())
        )
        query = (silver.writeStream
            .format("delta")
            .outputMode("append")
            .option("checkpointLocation", checkpoint)
            .trigger(processingTime="30 seconds")
            .toTable(silver_table)
        )
        return query

    def aggregate_to_gold(self, silver_table, gold_table, checkpoint):
        '''Aggregate Silver → Gold (windowed aggregations)'''
        silver = (self.spark.readStream
            .format("delta")
            .table(silver_table)
        )
        gold = (silver
            .withWatermark("timestamp", "1 hour")
            .groupBy(
                window(col("timestamp"), "1 hour"),
                col("event_type"),
            )
            .agg(
                count("*").alias("event_count"),
                avg("amount").alias("avg_amount"),
            )
        )
        query = (gold.writeStream
            .format("delta")
            .outputMode("complete")
            .option("checkpointLocation", checkpoint)
            .trigger(processingTime="1 minute")
            .toTable(gold_table)
        )
        return query

    def merge_upsert(self, source_df, target_table, merge_key):
        '''Merge/Upsert into Delta table'''
        target = DeltaTable.forName(self.spark, target_table)
        (target.alias("target")
            .merge(source_df.alias("source"), f"target.{merge_key} = source.{merge_key}")
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
        )


# pipeline = DeltaStreamPipeline("my-pipeline")
# q1 = pipeline.ingest_from_kafka("events", "kafka:9092", "bronze.events", "/cp/bronze")
# q2 = pipeline.transform_bronze_to_silver("bronze.events", "silver.events", "/cp/silver")
# q3 = pipeline.aggregate_to_gold("silver.events", "gold.hourly_stats", "/cp/gold")
"""

    def show_code(self):
        """Print a header plus the first 600 characters of the sample code."""
        print("=== Delta Pipeline ===")
        # Truncated preview only; the full sample remains in self.CODE.
        print(self.CODE[:600])
# Demo entry point — guarded so importing this module does not print.
if __name__ == "__main__":
    impl = DeltaImplementation()
    impl.show_code()
Optimization & Monitoring
# optimization.py — Delta Lake streaming optimization
import json
class DeltaOptimization:
    """Optimization tips and monitoring pointers for Delta Lake streaming (display-only)."""

    # Tip key -> {name, description, config snippet/command}.
    TIPS = {
        "auto_optimize": {
            "name": "Auto Optimize",
            "description": "Auto compact small files + optimize write",
            "config": "delta.autoOptimize.optimizeWrite=true, delta.autoOptimize.autoCompact=true",
        },
        "z_order": {
            "name": "Z-Order Clustering",
            "description": "Colocate related data — ทำให้ query เร็วขึ้น",
            "config": "OPTIMIZE table ZORDER BY (user_id, timestamp)",
        },
        "partition": {
            "name": "Partitioning",
            "description": "Partition by date — ลด data scan",
            "config": "PARTITIONED BY (date) — ไม่ partition by high-cardinality columns",
        },
        "checkpoint_interval": {
            "name": "Checkpoint Interval",
            "description": "Delta log checkpoint ทุก 10 commits — เร็วขึ้นเมื่อ read",
            "config": "delta.checkpointInterval=10",
        },
        "trigger": {
            "name": "Trigger Interval",
            "description": "ตั้ง trigger ให้เหมาะสม — balance latency vs throughput",
            "config": "processingTime='10 seconds' (latency) vs '1 minute' (throughput)",
        },
    }

    # Monitoring handle -> what it shows (Spark/Delta observability surfaces).
    MONITORING = {
        "streaming_query": "spark.streams.active — ดู streaming queries ที่ running",
        "progress": "query.lastProgress — ดู input rows, processing time, state",
        "metrics": "numInputRows, inputRowsPerSecond, processedRowsPerSecond",
        "delta_history": "DESCRIBE HISTORY table — ดู version history, operations, metrics",
        "table_detail": "DESCRIBE DETAIL table — ดู size, files, partitions",
    }

    def show_tips(self):
        """Print each optimization tip with its configuration snippet."""
        print("=== Optimization Tips ===\n")
        for tip in self.TIPS.values():
            print(f"[{tip['name']}]")
            print(f"  {tip['description']}")
            print(f"  Config: {tip['config']}")
            print()

    def show_monitoring(self):
        """Print the monitoring surfaces and what each one reports."""
        print("=== Monitoring ===")
        for name, desc in self.MONITORING.items():
            print(f"  [{name}] {desc}")
# Demo entry point — guarded so importing this module does not print.
if __name__ == "__main__":
    opt = DeltaOptimization()
    opt.show_tips()
    opt.show_monitoring()
Use Cases
# use_cases.py — Delta Lake streaming use cases
import json
class DeltaUseCases:
    """Common Delta Lake streaming use cases with data flow and benefit (display-only)."""

    # Case key -> {name, flow (pipeline shape), benefit}.
    CASES = {
        "real_time_analytics": {
            "name": "Real-time Analytics Dashboard",
            "flow": "Kafka → Bronze (raw) → Silver (clean) → Gold (aggregated) → Dashboard",
            "benefit": "Near real-time dashboards กับ historical data ใน table เดียว",
        },
        "cdc_replication": {
            "name": "CDC Database Replication",
            "flow": "MySQL CDC (Debezium) → Kafka → Delta Lake → Data Warehouse",
            "benefit": "Replicate production DB changes to data lake — near real-time",
        },
        "ml_feature_store": {
            "name": "ML Feature Store",
            "flow": "Events → Delta Lake → Feature computation → Feature table → ML training",
            "benefit": "Time travel สำหรับ reproduce training data, versioned features",
        },
        "event_sourcing": {
            "name": "Event Sourcing",
            "flow": "Application events → Delta Lake (append-only) → Materialized views",
            "benefit": "ACID transactions + time travel = perfect event store",
        },
    }

    def show_cases(self):
        """Print each use case's name, flow, and benefit."""
        print("=== Use Cases ===\n")
        for case in self.CASES.values():
            print(f"[{case['name']}]")
            print(f"  Flow: {case['flow']}")
            print(f"  Benefit: {case['benefit']}")
            print()
# Demo entry point — guarded so importing this module does not print.
if __name__ == "__main__":
    cases = DeltaUseCases()
    cases.show_cases()
FAQ - คำถามที่พบบ่อย
Q: Delta Lake กับ Apache Iceberg อันไหนดีกว่า?
A: Delta Lake — Databricks ecosystem, Spark-first, mature streaming support, Unity Catalog; Iceberg — vendor-neutral, multi-engine (Spark, Trino, Flink), growing fast. เลือก Delta Lake ถ้าใช้ Databricks/Spark เป็นหลัก เพราะ integration ดีสุด; เลือก Iceberg ถ้าต้องการ vendor-neutral, multi-engine, ไม่ผูกกับ Spark. ทั้งสองมี ACID transactions, time travel, schema evolution — features คล้ายกัน
Q: Delta Lake streaming กับ Kafka Streams ต่างกันอย่างไร?
A: Delta Lake Streaming: Spark-based, batch + stream unified, complex transformations, ML integration Kafka Streams: Java library, lightweight, low-latency (ms), stateful processing เลือก Delta Lake: ถ้าต้องการ batch+stream unified, complex analytics, ML features เลือก Kafka Streams: ถ้าต้องการ low-latency (< 100ms), lightweight, Java/Kotlin app ใช้ร่วมกัน: Kafka Streams สำหรับ real-time processing → Delta Lake สำหรับ storage + analytics
Q: Small files problem คืออะไร?
A: Streaming writes สร้าง files เล็กมากจำนวนมาก (micro-batches) → query ช้า (ต้อง open หลาย files) แก้ไข: Auto Optimize (delta.autoOptimize.optimizeWrite=true), OPTIMIZE command (compact files), Auto Compaction แนะนำ: ตั้ง trigger interval ให้ไม่ถี่เกินไป + enable auto optimize + run OPTIMIZE เป็น schedule
Q: Exactly-once semantics ทำได้จริงไหม?
A: ได้ — Delta Lake + Structured Streaming ให้ exactly-once end-to-end: Checkpoint: Spark เก็บ offset ที่ process แล้ว — restart ไม่ซ้ำ ACID writes: Delta Lake ไม่มี partial writes — all or nothing Idempotent: reprocess batch เดิมได้ผลเหมือนเดิม ข้อแม้: source ต้อง replayable (Kafka ✓, socket ✗) + sink ต้อง idempotent (Delta ✓)
