Delta Lake Event Driven Design — ออกแบบ Data
Delta Lake Event Driven
Delta Lake รองรับการออกแบบระบบแบบ Event-driven ด้วย ACID Transaction, Change Data Feed (CDF), Streaming ผ่าน Spark, Merge/Upsert, Schema Evolution และ Time Travel บน Object Storage อย่าง S3, ADLS และ GCS
| Feature | Delta Lake | Apache Iceberg | Apache Hudi |
|---|---|---|---|
| ACID Transactions | Yes | Yes | Yes |
| Change Data Feed | CDF (Built-in) | Incremental Read | CDC (Built-in) |
| Streaming | Spark Structured Streaming (+ Flink Connector) | Flink / Spark Structured Streaming | Spark / Flink |
| Merge/Upsert | MERGE INTO | MERGE INTO | Upsert |
| Time Travel | Version/Timestamp | Snapshot | Timeline |
| Optimize | OPTIMIZE + Z-ORDER | Compaction | Compaction |
Architecture & Setup
# === Delta Lake Event-driven Pipeline ===
# The Spark snippets in this section need a live Spark + Delta Lake session,
# so they are kept as comments; only the PipelineStage catalog further down
# runs as plain Python.
#
# pip install delta-spark pyspark
# from pyspark.sql import SparkSession
# from delta import configure_spark_with_delta_pip
# from delta.tables import DeltaTable
#
# builder = SparkSession.builder \
#     .appName("DeltaEventDriven") \
#     .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
#     .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
# # With a pip-installed delta-spark the Delta jars are not on Spark's
# # classpath by default; configure_spark_with_delta_pip adds them.
# spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Create Delta Table with CDF enabled.
# Delta cannot partition by an expression such as date(event_time), so a
# generated event_date column is derived from event_time and used as the
# partition column instead.
# DeltaTable.createIfNotExists(spark) \
#     .tableName("events") \
#     .addColumn("event_id", "STRING") \
#     .addColumn("user_id", "STRING") \
#     .addColumn("event_type", "STRING") \
#     .addColumn("payload", "STRING") \
#     .addColumn("event_time", "TIMESTAMP") \
#     .addColumn("event_date", "DATE", generatedAlwaysAs="CAST(event_time AS DATE)") \
#     .partitionedBy("event_date") \
#     .property("delta.enableChangeDataFeed", "true") \
#     .execute()

# Streaming read of the table's Change Data Feed from the first version
# (CDF is enabled when the table is created, so version 0 already has it).
# stream_df = spark.readStream \
#     .format("delta") \
#     .option("readChangeFeed", "true") \
#     .option("startingVersion", 0) \
#     .table("events")

# Auto Loader - load new files automatically.
# NOTE: the "cloudFiles" source is Databricks-only; on open-source Spark use
# spark.readStream.format("json").schema(...).load(...) instead.
# auto_df = spark.readStream \
#     .format("cloudFiles") \
#     .option("cloudFiles.format", "json") \
#     .option("cloudFiles.schemaLocation", "/schema/events") \
#     .load("s3://raw-data/events/")
from dataclasses import dataclass


@dataclass
class PipelineStage:
    """One hop of the Bronze -> Silver -> Gold -> Serving event pipeline.

    Every field is a free-form, human-readable description that is printed
    by the loop below; nothing here talks to Spark.
    """

    stage: str       # layer name, e.g. "Ingestion (Bronze)"
    source: str      # where the stage reads from
    processing: str  # what the stage does to the data
    sink: str        # where the stage writes its output
    trigger: str     # what makes the stage run


# The "delta://..." strings are display labels for the tables; a real Delta
# location is a storage URI such as s3://, abfss:// or gs://.
# Each stage's sink is the next stage's source, chaining the layers.
stages = [
    PipelineStage("Ingestion (Bronze)",
                  "Kafka / S3 / API Webhook",
                  "Auto Loader หรือ Kafka Source → Raw Delta Table",
                  "delta://bronze/events",
                  "Continuous (Streaming) หรือ Trigger.AvailableNow"),
    PipelineStage("Cleansing (Silver)",
                  "delta://bronze/events (CDF)",
                  "Schema Validation, Dedup, Type Cast, Null Handling",
                  "delta://silver/events_clean",
                  "CDF Trigger เมื่อ Bronze มีข้อมูลใหม่"),
    PipelineStage("Aggregation (Gold)",
                  "delta://silver/events_clean (CDF)",
                  "Aggregate, Join, Business Logic, Metrics",
                  "delta://gold/event_metrics",
                  "CDF Trigger เมื่อ Silver มีข้อมูลใหม่"),
    PipelineStage("Serving",
                  "delta://gold/event_metrics",
                  "Query via Trino/Presto/Spark SQL",
                  "Dashboard / API / ML Feature Store",
                  "On-demand Query"),
]

print("=== Pipeline Stages ===")
for s in stages:
    print(f"\n [{s.stage}]")
    print(f" Source: {s.source}")
    print(f" Process: {s.processing}")
    print(f" Sink: {s.sink}")
    print(f" Trigger: {s.trigger}")
Change Data Feed & Merge
# === Change Data Feed & Merge Operations ===
# Read Change Data Feed (batch) for a bounded range of table versions.
# changes_df = spark.read \
#     .format("delta") \
#     .option("readChangeFeed", "true") \
#     .option("startingVersion", 5) \
#     .option("endingVersion", 10) \
#     .table("events")
#
# # CDF columns: _change_type, _commit_version, _commit_timestamp
# # _change_type: insert, update_preimage, update_postimage, delete
# # update_preimage rows carry the OLD values; to replay changes downstream
# # use only insert + update_postimage + delete.
#
# inserts = changes_df.filter("_change_type = 'insert'")
# updates = changes_df.filter("_change_type = 'update_postimage'")
# deletes = changes_df.filter("_change_type = 'delete'")

# Merge (Upsert) - an idempotent write: re-applying the same batch leaves the
# table unchanged. End-to-end exactly-once additionally relies on the
# stream's checkpoint (e.g. run this MERGE inside foreachBatch).
#
# MERGE fails when several source rows match the same target row, so keep
# only the latest event per user_id first. updates_df stands for the incoming
# batch of user events.
# from pyspark.sql import Window, functions as F
# latest_first = Window.partitionBy("user_id").orderBy(F.col("event_time").desc())
# source_df = updates_df \
#     .withColumn("_rn", F.row_number().over(latest_first)) \
#     .filter("_rn = 1") \
#     .drop("_rn")
#
# deltaTable = DeltaTable.forPath(spark, "s3://lake/silver/users")
# deltaTable.alias("target").merge(
#     source_df.alias("source"),
#     "target.user_id = source.user_id"
# ).whenMatchedUpdate(
#     # Skip late / replayed events so an older event never overwrites newer data.
#     condition="source.event_time > target.updated_at",
#     set={
#         "name": "source.name",
#         "email": "source.email",
#         "updated_at": "source.event_time"
#     }
# ).whenNotMatchedInsert(values={
#     "user_id": "source.user_id",
#     "name": "source.name",
#     "email": "source.email",
#     "created_at": "source.event_time",
#     "updated_at": "source.event_time"
# }).execute()


@dataclass
class MergePattern:
    """A common Delta Lake write pattern, described as MERGE-style clauses.

    All fields are human-readable descriptions printed by the loop below.
    """

    pattern: str             # short name of the pattern
    use_case: str            # when to use it
    match_condition: str     # the ON / matching condition
    matched_action: str      # action for rows that match
    not_matched_action: str  # action for source rows with no match


patterns = [
    MergePattern("SCD Type 1 (Overwrite)",
                 "อัพเดทข้อมูลล่าสุด ไม่เก็บประวัติ",
                 "target.id = source.id",
                 "UPDATE SET * (Overwrite ทุก Column)",
                 "INSERT * (Insert Row ใหม่)"),
    # NOTE: in a single MERGE a changed key matches its current row, so its new
    # version is not inserted by WHEN NOT MATCHED alone. The usual recipe stages
    # the source twice (once with merge_key = id, once with merge_key = NULL for
    # changed rows) and matches on target.id = source.merge_key.
    MergePattern("SCD Type 2 (History)",
                 "เก็บประวัติทุก Version",
                 "target.id = source.id AND target.is_current = true",
                 "UPDATE SET is_current=false, end_date=now()",
                 "INSERT with is_current=true, start_date=now()"),
    # An insert-only MERGE does not collapse duplicates inside the same source
    # batch; deduplicate the batch itself first (e.g. dropDuplicates(["event_id"])).
    MergePattern("Deduplication",
                 "ลบ Duplicate Event",
                 "target.event_id = source.event_id",
                 "ไม่ทำอะไร (Skip)",
                 "INSERT * (Insert เฉพาะ Event ใหม่)"),
    # A MERGE on "target.date = source.date" with DELETE / INSERT actions loses
    # data: every source row of an existing date matches, so it is never
    # inserted, while the old rows are deleted. Replace a partition atomically
    # with df.write.format("delta").mode("overwrite").option("replaceWhere", ...).
    MergePattern("Delete + Insert",
                 "Replace Partition ด้วยข้อมูลใหม่ (ใช้ replaceWhere แทน MERGE)",
                 "replaceWhere: date = <partition date>",
                 "DELETE (ลบข้อมูลเดิมของ Partition ใน Commit เดียวกัน)",
                 "INSERT * (Insert ข้อมูลใหม่ทั้ง Partition)"),
]

print("=== Merge Patterns ===")
for m in patterns:
    print(f"\n [{m.pattern}] {m.use_case}")
    print(f" Match: {m.match_condition}")
    print(f" Matched: {m.matched_action}")
    print(f" Not Matched: {m.not_matched_action}")
Operations & Monitoring
# === Delta Lake Operations ===
@dataclass
class Operation:
    """A routine Delta Lake maintenance or inspection command.

    All fields are human-readable descriptions printed by the loop below.
    """

    operation: str  # short name of the operation
    command: str    # example SQL statement
    frequency: str  # when to run it
    impact: str     # expected effect


# NOTE(review): the speed-up / storage-saving figures in `impact` are
# illustrative and workload-dependent, not measured here.
operations = [
    Operation("OPTIMIZE (Compaction)",
              "OPTIMIZE delta.`s3://data/events` ZORDER BY (user_id, event_type)",
              "ทุกวัน หรือเมื่อ Small Files > 1000",
              "Query เร็วขึ้น 2-10x ลด File Count"),
    # RETAIN 168 HOURS keeps 7 days of history; time travel / RESTORE to a
    # version whose files have been vacuumed will fail.
    Operation("VACUUM (Cleanup)",
              "VACUUM delta.`s3://data/events` RETAIN 168 HOURS",
              "ทุกสัปดาห์",
              "ลดพื้นที่ Storage 30-50% หลัง Vacuum"),
    Operation("ANALYZE (Statistics)",
              "ANALYZE TABLE events COMPUTE STATISTICS FOR ALL COLUMNS",
              "หลัง OPTIMIZE",
              "Query Optimizer เลือก Plan ดีขึ้น"),
    Operation("DESCRIBE HISTORY",
              "DESCRIBE HISTORY delta.`s3://data/events`",
              "เมื่อต้อง Debug หรือ Audit",
              "ดู Version History ทุกการเปลี่ยนแปลง"),
    Operation("RESTORE (Time Travel)",
              "RESTORE TABLE events TO VERSION AS OF 42",
              "เมื่อต้อง Rollback ข้อมูลผิด",
              "คืนข้อมูลกลับไป Version ก่อนหน้า"),
]

print("=== Operations ===")
for o in operations:
    # Leading blank line separates entries, matching the other sections' output.
    print(f"\n [{o.operation}]")
    print(f" Command: {o.command}")
    print(f" Frequency: {o.frequency}")
    print(f" Impact: {o.impact}")
เคล็ดลับ
- CDF: เปิด Change Data Feed ให้ทุก Table ที่มี Downstream ต้องอ่านการเปลี่ยนแปลงแบบ Event-driven (CDF เขียนไฟล์ Change เพิ่ม จึงมี Storage Overhead)
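- Merge: ใช้ MERGE (Upsert) แทน Overwrite เพื่อให้การเขียนซ้ำเป็น Idempotent — เมื่อใช้ร่วมกับ Checkpoint ของ Structured Streaming จะได้ผลลัพธ์แบบ Exactly-once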
- Z-Order: Z-Order ตาม Column ที่ Filter บ่อยที่สุด
- Partition: Partition ตาม Date ไม่เกิน 10K Partitions
- Vacuum: VACUUM ทุกสัปดาห์ Retain 7 วัน ลด Storage Cost (หลัง VACUUM จะ Time Travel หรือ RESTORE ไปยัง Version ที่เก่ากว่าช่วง Retain ไม่ได้)
Delta Lake คืออะไร
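Delta Lake คือ Open Source Storage Layer ที่เพิ่ม ACID Transaction ให้กับไฟล์ Parquet บน S3, ADLS หรือ GCS ทำงานร่วมกับ Spark และรองรับ Merge, Time Travel, Change Data Feed (CDF), Schema Evolution รวมถึงคำสั่งดูแลตารางอย่าง OPTIMIZE และ VACUUM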
อ่านเพิ่ม: Apache Kafka เจาะลึก สอน Kafka Streams, Connect, Schema Registry · อ่านเพิ่ม: PostgreSQL ขั้นสูง สอน Indexing, Query Optimization, Replication · อ่านเพิ่ม: Event-Driven Architecture คืออะไร? สอนออกแบบระบบ Event Sourcing