Technology

Delta Lake Event Driven Design

2025-11-18 · อ. บอม — SiamCafe.net · 10,152 words


This article walks through event-driven pipeline design on Delta Lake: ACID transactions, the Change Data Feed (CDF), Spark Structured Streaming, MERGE/upsert, schema evolution, and time travel on object storage such as S3, ADLS, and GCS.

| Feature            | Delta Lake                 | Apache Iceberg   | Apache Hudi    |
|--------------------|----------------------------|------------------|----------------|
| ACID Transactions  | Yes                        | Yes              | Yes            |
| Change Data Feed   | CDF (built-in)             | Incremental Read | CDC (built-in) |
| Streaming          | Spark Structured Streaming | Flink            | Spark/Flink    |
| Merge/Upsert       | MERGE INTO                 | MERGE INTO       | Upsert         |
| Time Travel        | Version/Timestamp          | Snapshot         | Timeline       |
| Optimize           | OPTIMIZE + Z-ORDER         | Compaction       | Compaction     |

Architecture & Setup

# === Delta Lake Event-driven Pipeline ===

# pip install delta-spark pyspark

# from pyspark.sql import SparkSession
# from delta.tables import DeltaTable
#
# spark = SparkSession.builder \
#     .appName("DeltaEventDriven") \
#     .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
#     .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
#     .getOrCreate()

# Create Delta Table with CDF enabled
# spark.sql("""
#     CREATE TABLE events (
#         event_id STRING,
#         user_id STRING,
#         event_type STRING,
#         payload STRING,
#         event_time TIMESTAMP,
#         -- expressions are not allowed in PARTITIONED BY, so use a generated date column
#         event_date DATE GENERATED ALWAYS AS (CAST(event_time AS DATE))
#     ) USING DELTA
#     PARTITIONED BY (event_date)
#     TBLPROPERTIES (delta.enableChangeDataFeed = true)
# """)

# Streaming Read from Delta Table
# stream_df = spark.readStream \
#     .format("delta") \
#     .option("readChangeFeed", "true") \
#     .option("startingVersion", 0) \
#     .table("events")

# Auto Loader - load new files automatically
# (the cloudFiles source is Databricks-only; on open-source Spark, stream the files with format("json") instead)
# auto_df = spark.readStream \
#     .format("cloudFiles") \
#     .option("cloudFiles.format", "json") \
#     .option("cloudFiles.schemaLocation", "/schema/events") \
#     .load("s3://raw-data/events/")

from dataclasses import dataclass

@dataclass
class PipelineStage:
    stage: str
    source: str
    processing: str
    sink: str
    trigger: str

stages = [
    PipelineStage("Ingestion (Bronze)",
        "Kafka / S3 / API Webhook",
        "Auto Loader หรือ Kafka Source → Raw Delta Table",
        "delta://bronze/events",
        "Continuous (Streaming) หรือ Trigger.AvailableNow"),
    PipelineStage("Cleansing (Silver)",
        "delta://bronze/events (CDF)",
        "Schema Validation, Dedup, Type Cast, Null Handling",
        "delta://silver/events_clean",
        "CDF Trigger เมื่อ Bronze มีข้อมูลใหม่"),
    PipelineStage("Aggregation (Gold)",
        "delta://silver/events_clean (CDF)",
        "Aggregate, Join, Business Logic, Metrics",
        "delta://gold/event_metrics",
        "CDF Trigger เมื่อ Silver มีข้อมูลใหม่"),
    PipelineStage("Serving",
        "delta://gold/event_metrics",
        "Query via Trino/Presto/Spark SQL",
        "Dashboard / API / ML Feature Store",
        "On-demand Query"),
]

print("=== Pipeline Stages ===")
for s in stages:
    print(f"\n  [{s.stage}]")
    print(f"    Source: {s.source}")
    print(f"    Process: {s.processing}")
    print(f"    Sink: {s.sink}")
    print(f"    Trigger: {s.trigger}")

Change Data Feed & Merge

# === Change Data Feed & Merge Operations ===

# Read Change Data Feed
# changes_df = spark.read \
#     .format("delta") \
#     .option("readChangeFeed", "true") \
#     .option("startingVersion", 5) \
#     .option("endingVersion", 10) \
#     .table("events")
#
# # CDF columns: _change_type, _commit_version, _commit_timestamp
# # _change_type: insert, update_preimage, update_postimage, delete
#
# inserts = changes_df.filter("_change_type = 'insert'")
# updates = changes_df.filter("_change_type = 'update_postimage'")
# deletes = changes_df.filter("_change_type = 'delete'")

# Merge (Upsert) - Exactly-once Processing
# deltaTable = DeltaTable.forPath(spark, "delta://silver/users")
# deltaTable.alias("target").merge(
#     source_df.alias("source"),
#     "target.user_id = source.user_id"
# ).whenMatchedUpdate(set={
#     "name": "source.name",
#     "email": "source.email",
#     "updated_at": "source.event_time"
# }).whenNotMatchedInsert(values={
#     "user_id": "source.user_id",
#     "name": "source.name",
#     "email": "source.email",
#     "created_at": "source.event_time",
#     "updated_at": "source.event_time"
# }).execute()
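
# Sketch: the same MERGE applied from a stream via foreachBatch for exactly-once upserts
# (the function name, checkpoint path, and dedup key are illustrative)
# def upsert_batch(batch_df, batch_id):
#     DeltaTable.forPath(spark, "delta://silver/users").alias("target").merge(
#         batch_df.dropDuplicates(["user_id"]).alias("source"),
#         "target.user_id = source.user_id"
#     ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
#
# stream_df.writeStream \
#     .foreachBatch(upsert_batch) \
#     .option("checkpointLocation", "s3://checkpoints/silver_users") \
#     .trigger(availableNow=True) \
#     .start()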

@dataclass
class MergePattern:
    pattern: str
    use_case: str
    match_condition: str
    matched_action: str
    not_matched_action: str

patterns = [
    MergePattern("SCD Type 1 (Overwrite)",
        "อัพเดทข้อมูลล่าสุด ไม่เก็บประวัติ",
        "target.id = source.id",
        "UPDATE SET * (Overwrite ทุก Column)",
        "INSERT * (Insert Row ใหม่)"),
    MergePattern("SCD Type 2 (History)",
        "เก็บประวัติทุก Version",
        "target.id = source.id AND target.is_current = true",
        "UPDATE SET is_current=false, end_date=now()",
        "INSERT with is_current=true, start_date=now()"),
    MergePattern("Deduplication",
        "ลบ Duplicate Event",
        "target.event_id = source.event_id",
        "ไม่ทำอะไร (Skip)",
        "INSERT * (Insert เฉพาะ Event ใหม่)"),
    MergePattern("Delete + Insert",
        "Replace Partition ด้วยข้อมูลใหม่",
        "target.date = source.date",
        "DELETE (ลบ Partition เก่า)",
        "INSERT * (Insert ข้อมูลใหม่ทั้ง Partition)"),
]

print("=== Merge Patterns ===")
for m in patterns:
    print(f"\n  [{m.pattern}] {m.use_case}")
    print(f"    Match: {m.match_condition}")
    print(f"    Matched: {m.matched_action}")
    print(f"    Not Matched: {m.not_matched_action}")

Operations & Monitoring

# === Delta Lake Operations ===

@dataclass
class Operation:
    operation: str
    command: str
    frequency: str
    impact: str

operations = [
    Operation("OPTIMIZE (Compaction)",
        "OPTIMIZE delta.`s3://data/events` ZORDER BY (user_id, event_type)",
        "ทุกวัน หรือเมื่อ Small Files > 1000",
        "Query เร็วขึ้น 2-10x ลด File Count"),
    Operation("VACUUM (Cleanup)",
        "VACUUM delta.`s3://data/events` RETAIN 168 HOURS",
        "ทุกสัปดาห์",
        "ลดพื้นที่ Storage 30-50% หลัง Vacuum"),
    Operation("ANALYZE (Statistics)",
        "ANALYZE TABLE events COMPUTE STATISTICS FOR ALL COLUMNS",
        "หลัง OPTIMIZE",
        "Query Optimizer เลือก Plan ดีขึ้น"),
    Operation("DESCRIBE HISTORY",
        "DESCRIBE HISTORY delta.`s3://data/events`",
        "เมื่อต้อง Debug หรือ Audit",
        "ดู Version History ทุกการเปลี่ยนแปลง"),
    Operation("RESTORE (Time Travel)",
        "RESTORE TABLE events TO VERSION AS OF 42",
        "เมื่อต้อง Rollback ข้อมูลผิด",
        "คืนข้อมูลกลับไป Version ก่อนหน้า"),
]

print("=== Operations ===")
for o in operations:
    print(f"  [{o.operation}]")
    print(f"    Command: {o.command}")
    print(f"    Frequency: {o.frequency}")
    print(f"    Impact: {o.impact}")

Tips

What is Delta Lake?

Delta Lake is an open-source storage layer that adds ACID transactions on top of Parquet files in object storage such as S3, ADLS, and GCS. Through Spark it provides MERGE, time travel, Change Data Feed (CDF), schema evolution, and maintenance commands such as OPTIMIZE and VACUUM.

Event Driven Design คืออะไร

ขับเคลื่อนด้วย Event CDF Change Data Feed Streaming Auto Loader Kafka Kinesis Low Latency Compute Cost Scalable Bronze Silver Gold

How do I set it up?

Create the table with USING DELTA and delta.enableChangeDataFeed = true, consume it with readStream plus readChangeFeed and startingVersion (or ingest raw files with cloudFiles), apply changes with MERGE INTO, and maintain the table with OPTIMIZE ... ZORDER BY and VACUUM ... RETAIN.

What are the production best practices?

Plan for schema evolution (mergeSchema, sketched below), pick partition and Z-Order columns deliberately, schedule OPTIMIZE and VACUUM, keep streaming checkpoints, send bad records to a dead letter queue, and make writes idempotent with exactly-once MERGE inside foreachBatch; test all of this before going to production.
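
As a concrete example, schema evolution can be opted into per write with the mergeSchema option. A minimal sketch in the same commented style as the snippets above; the DataFrame name and sink path are illustrative, not from the original pipeline:

# new_events_df.write \
#     .format("delta") \
#     .mode("append") \
#     .option("mergeSchema", "true") \
#     .save("delta://silver/events_clean")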

Summary

Delta Lake's ACID guarantees, Change Data Feed, Structured Streaming, and MERGE-based upserts make an event-driven Bronze/Silver/Gold pipeline practical, while OPTIMIZE with Z-Order, VACUUM, and time travel keep it manageable in production.

📖 Related Articles

Data Lakehouse Event Driven Design
Delta Lake Open Source Contribution
oVirt Virtualization Event Driven Design
Healthchecks.io Domain Driven Design DDD
Delta Lake Cloud Migration Strategy
