SiamCafe · Blog
Delta Lake Event Driven Design: Designing Data


Published 28 May 2569 (Buddhist Era; 2026 CE)

Delta Lake Event Driven

Event-driven design on Delta Lake builds on ACID transactions, Change Data Feed (CDF), Spark Structured Streaming, MERGE-based upserts, schema evolution, and time travel, with tables stored on S3, ADLS, or GCS.

Feature            | Delta Lake                 | Apache Iceberg   | Apache Hudi
ACID Transactions  | Yes                        | Yes              | Yes
Change Data Feed   | CDF (built-in)             | Incremental read | CDC (built-in)
Streaming          | Spark Structured Streaming | Flink            | Spark/Flink
Merge/Upsert       | MERGE INTO                 | MERGE INTO       | Upsert
Time Travel        | Version/Timestamp          | Snapshot         | Timeline
Optimize           | OPTIMIZE + Z-ORDER         | Compaction       | Compaction

Architecture & Setup

# === Delta Lake Event-driven Pipeline ===

# pip install delta-spark pyspark

# from pyspark.sql import SparkSession
# from delta import configure_spark_with_delta_pip
# from delta.tables import DeltaTable
#
# builder = SparkSession.builder \
#     .appName("DeltaEventDriven") \
#     .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
#     .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
# # Adds the Delta JARs that match the pip-installed delta-spark version
# spark = configure_spark_with_delta_pip(builder).getOrCreate()

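# Create a Delta table with CDF enabled.
# Delta cannot partition by an expression such as date(event_time) directly,
# so derive event_date as a generated column and partition by that column.
# spark.sql("""
#     CREATE TABLE events (
#         event_id STRING,
#         user_id STRING,
#         event_type STRING,
#         payload STRING,
#         event_time TIMESTAMP,
#         event_date DATE GENERATED ALWAYS AS (CAST(event_time AS DATE))
#     ) USING DELTA
#     PARTITIONED BY (event_date)
#     TBLPROPERTIES (delta.enableChangeDataFeed = true)
# """)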

# Streaming Read from Delta Table
# stream_df = spark.readStream \
#     .format("delta") \
#     .option("readChangeFeed", "true") \
#     .option("startingVersion", 0) \
#     .table("events")

# Auto Loader (Databricks only): picks up new files in the source path automatically
# auto_df = spark.readStream \
#     .format("cloudFiles") \
#     .option("cloudFiles.format", "json") \
#     .option("cloudFiles.schemaLocation", "/schema/events") \
#     .load("s3://raw-data/events/")

from dataclasses import dataclass

@dataclass
class PipelineStage:
    stage: str
    source: str
    processing: str
    sink: str
    trigger: str

stages = [
    PipelineStage("Ingestion (Bronze)",
        "Kafka / S3 / API Webhook",
        "Auto Loader (Databricks) or Kafka Source → Raw Delta Table",
        "delta://bronze/events",
        "Continuous (Streaming) or Trigger.AvailableNow"),
    PipelineStage("Cleansing (Silver)",
        "delta://bronze/events (CDF)",
        "Schema Validation, Dedup, Type Cast, Null Handling",
        "delta://silver/events_clean",
        "CDF-triggered when Bronze has new data"),
    PipelineStage("Aggregation (Gold)",
        "delta://silver/events_clean (CDF)",
        "Aggregate, Join, Business Logic, Metrics",
        "delta://gold/event_metrics",
        "CDF-triggered when Silver has new data"),
    PipelineStage("Serving",
        "delta://gold/event_metrics",
        "Query via Trino/Presto/Spark SQL",
        "Dashboard / API / ML Feature Store",
        "On-demand Query"),
]

print("=== Pipeline Stages ===")
for s in stages:
    print(f"\n  [{s.stage}]")
    print(f"    Source: {s.source}")
    print(f"    Process: {s.processing}")
    print(f"    Sink: {s.sink}")
    print(f"    Trigger: {s.trigger}")
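The stage list above says Silver and Gold are triggered by the CDF of the previous layer. A minimal sketch of that idea in plain Python, so it runs without Spark: a toy table records each commit under a version number, and the consumer remembers the next version to read, roughly what startingVersion plus a streaming checkpoint do in Spark. ToyCdfTable, process_bronze_to_silver, and the row fields are hypothetical names for illustration, not delta-spark APIs.

```python
from dataclasses import dataclass, field


@dataclass
class ToyCdfTable:
    """In-memory stand-in for a CDF-enabled table (not the delta-spark API).

    commits[v] holds the (change_type, row) pairs written by version v.
    """
    commits: list = field(default_factory=list)

    def commit(self, changes):
        """Append one commit and return its version number."""
        self.commits.append(list(changes))
        return len(self.commits) - 1

    def changes_since(self, starting_version):
        """Yield (version, change_type, row) for every version >= starting_version."""
        for version in range(starting_version, len(self.commits)):
            for change_type, row in self.commits[version]:
                yield version, change_type, row


def process_bronze_to_silver(bronze, silver_rows, checkpoint):
    """Hypothetical Silver step: validate and dedup new Bronze inserts.

    checkpoint is the next Bronze version to read; the return value is the
    checkpoint to store for the next run.
    """
    for _version, change_type, row in bronze.changes_since(checkpoint):
        if change_type != "insert":
            continue  # this sketch only handles appended raw events
        if row.get("event_id") is None or row.get("user_id") is None:
            continue  # drop rows that fail basic validation
        silver_rows.setdefault(row["event_id"], row)  # keep first copy per event_id
    return len(bronze.commits)


bronze = ToyCdfTable()
bronze.commit([
    ("insert", {"event_id": "e1", "user_id": "u1"}),
    ("insert", {"event_id": "e1", "user_id": "u1"}),  # duplicate event
    ("insert", {"event_id": "e2", "user_id": None}),  # fails validation
])
silver = {}
cp = process_bronze_to_silver(bronze, silver, 0)

bronze.commit([("insert", {"event_id": "e3", "user_id": "u2"})])
cp = process_bronze_to_silver(bronze, silver, cp)
print(sorted(silver), cp)
```

Re-running the step with the stored checkpoint reads nothing new, which is what makes a restart safe in this sketch.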

Change Data Feed & Merge

# === Change Data Feed & Merge Operations ===

# Read Change Data Feed
# changes_df = spark.read \
#     .format("delta") \
#     .option("readChangeFeed", "true") \
#     .option("startingVersion", 5) \
#     .option("endingVersion", 10) \
#     .table("events")
#
# # CDF columns: _change_type, _commit_version, _commit_timestamp
# # _change_type: insert, update_preimage, update_postimage, delete
#
# inserts = changes_df.filter("_change_type = 'insert'")
# updates = changes_df.filter("_change_type = 'update_postimage'")
# deletes = changes_df.filter("_change_type = 'delete'")

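# Merge (upsert): idempotent per key, so replaying a batch does not duplicate rows.
# source_df is the incoming batch with at most one row per user_id;
# MERGE fails if several source rows try to update the same target row.
# deltaTable = DeltaTable.forPath(spark, "s3://data/silver/users")  # example path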
# deltaTable.alias("target").merge(
#     source_df.alias("source"),
#     "target.user_id = source.user_id"
# ).whenMatchedUpdate(set={
#     "name": "source.name",
#     "email": "source.email",
#     "updated_at": "source.event_time"
# }).whenNotMatchedInsert(values={
#     "user_id": "source.user_id",
#     "name": "source.name",
#     "email": "source.email",
#     "created_at": "source.event_time",
#     "updated_at": "source.event_time"
# }).execute()

@dataclass
class MergePattern:
    pattern: str
    use_case: str
    match_condition: str
    matched_action: str
    not_matched_action: str

patterns = [
    MergePattern("SCD Type 1 (Overwrite)",
        "Keep only the latest values, no history",
        "target.id = source.id",
        "UPDATE SET * (overwrite every column)",
        "INSERT * (insert the new row)"),
    MergePattern("SCD Type 2 (History)",
        "Keep every version of a record",
        "target.id = source.merge_key AND target.is_current = true",
        "UPDATE SET is_current=false, end_date=now() (only when values changed)",
        "INSERT with is_current=true, start_date=now() "
        "(changed rows are staged again with merge_key=NULL so their new version is inserted)"),
    MergePattern("Deduplication",
        "Drop duplicate events",
        "target.event_id = source.event_id",
        "Do nothing (skip)",
        "INSERT * (insert only new events)"),
    MergePattern("Delete + Insert (Replace Partition)",
        "Replace a whole partition with new data",
        "Not a MERGE: overwrite with replaceWhere = \"date = '<partition date>'\"",
        "Existing rows matching the predicate are deleted",
        "All new rows for that partition are written"),
]

print("=== Merge Patterns ===")
for m in patterns:
    print(f"\n  [{m.pattern}] {m.use_case}")
    print(f"    Match: {m.match_condition}")
    print(f"    Matched: {m.matched_action}")
    print(f"    Not Matched: {m.not_matched_action}")
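The SCD Type 2 row above is the one pattern a plain MERGE cannot finish in one pass: a changed key matches, so the not-matched branch never inserts its new version. A common Delta recipe stages the source so that each changed row appears twice, once with a NULL merge key that can never match. The sketch below imitates that with plain Python lists and simulates MERGE semantics rather than calling delta-spark; scd2_merge and the columns id, email, is_current, start_date, end_date are hypothetical.

```python
from datetime import date


def scd2_merge(target, updates, today):
    """Toy SCD Type 2 upsert over lists of dicts (simulates MERGE, no Spark).

    target rows: id, email, is_current, start_date, end_date
    updates:     id, email (at most one row per id)
    """
    # Snapshot of current rows, like MERGE evaluating against the pre-merge table.
    current = {r["id"]: r for r in target if r["is_current"]}

    # Stage the source: a changed row appears twice. The copy with
    # merge_key=None can never match, so it falls into "not matched"
    # and becomes the new current version.
    staged = []
    for u in updates:
        old = current.get(u["id"])
        if old is not None and old["email"] != u["email"]:
            staged.append({**u, "merge_key": None})
        staged.append({**u, "merge_key": u["id"]})

    for s in staged:
        old = current.get(s["merge_key"])  # merge_key None never matches
        if old is not None:
            # whenMatched (only if values changed): close the current version
            if old["email"] != s["email"]:
                old["is_current"] = False
                old["end_date"] = today
        else:
            # whenNotMatched: insert a new current version
            target.append({"id": s["id"], "email": s["email"], "is_current": True,
                           "start_date": today, "end_date": None})
    return target


rows = [{"id": 1, "email": "a@old.example", "is_current": True,
         "start_date": date(2024, 1, 1), "end_date": None}]
updates = [{"id": 1, "email": "a@new.example"},   # changed -> new version
           {"id": 2, "email": "b@new.example"}]   # brand-new key -> insert
scd2_merge(rows, updates, date(2024, 6, 1))
for r in rows:
    print(r["id"], r["email"], r["is_current"], r["start_date"], r["end_date"])
```

Applying the same batch a second time changes nothing: unchanged rows match but fail the "values changed" check, so the merge stays idempotent.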

Operations & Monitoring

# === Delta Lake Operations ===

@dataclass
class Operation:
    operation: str
    command: str
    frequency: str
    impact: str

operations = [
    Operation("OPTIMIZE (Compaction)",
        "OPTIMIZE delta.`s3://data/events` ZORDER BY (user_id, event_type)",
        "Daily, or when the table has more than ~1,000 small files",
        "Fewer, larger files; queries often run 2-10x faster (workload-dependent)"),
    Operation("VACUUM (Cleanup)",
        "VACUUM delta.`s3://data/events` RETAIN 168 HOURS",
        "Weekly",
        "Deletes unreferenced files older than 7 days, which can cut storage 30-50%; "
        "time travel and CDF reads older than the retention window stop working"),
    Operation("ANALYZE (Statistics)",
        "ANALYZE TABLE events COMPUTE STATISTICS FOR ALL COLUMNS",
        "After OPTIMIZE",
        "Gives the query optimizer statistics to choose better plans"),
    Operation("DESCRIBE HISTORY",
        "DESCRIBE HISTORY delta.`s3://data/events`",
        "When debugging or auditing",
        "Shows the version history of every change"),
    Operation("RESTORE (Time Travel)",
        "RESTORE TABLE events TO VERSION AS OF 42",
        "When bad data must be rolled back",
        "Returns the table to an earlier version"),
]

print("=== Operations ===")
for o in operations:
    print(f"  [{o.operation}]")
    print(f"    Command: {o.command}")
    print(f"    Frequency: {o.frequency}")
    print(f"    Impact: {o.impact}")
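VACUUM only deletes files that the latest table version no longer references and that were removed longer ago than the retention window. A simplified sketch of that rule, with hypothetical file names and dates, also shows the side effect that matters for an event-driven pipeline: once old files are gone, time travel to the versions that used them fails, and a CDF consumer that has fallen further behind than the retention window can no longer catch up. It ignores details such as files Delta never tracked.

```python
from datetime import datetime, timedelta

RETENTION = timedelta(hours=168)  # same window as RETAIN 168 HOURS (7 days)


def vacuum_candidates(removed_at, now, retention=RETENTION):
    """Files removed from the table longer ago than `retention`.

    removed_at maps file path -> time a commit removed it from the table.
    Files still referenced by the latest version are simply not in the map.
    """
    cutoff = now - retention
    return {path for path, ts in removed_at.items() if ts < cutoff}


def readable_versions(version_files, deleted):
    """Versions whose data files all still exist (time travel still works)."""
    return [v for v, files in sorted(version_files.items())
            if not (set(files) & deleted)]


# Hypothetical history: each version rewrote the table into a single new file.
version_files = {0: ["part-000.parquet"], 1: ["part-001.parquet"], 2: ["part-002.parquet"]}
removed_at = {
    "part-000.parquet": datetime(2024, 5, 30),  # removed by version 1
    "part-001.parquet": datetime(2024, 6, 7),   # removed by version 2
}
now = datetime(2024, 6, 8, 12, 0)

deleted = vacuum_candidates(removed_at, now)
print(sorted(deleted), readable_versions(version_files, deleted))
```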

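Tips

  • CDF: enable Change Data Feed on every table that feeds the event-driven pipeline
  • Merge: use MERGE instead of overwrite so reprocessed batches stay idempotent (effectively exactly-once together with a streaming checkpoint)
  • Z-Order: Z-order by the columns that appear most often in filters
  • Partition: partition by date and keep the table under 10K partitions
  • Vacuum: run VACUUM weekly with 7-day retention to cut storage cost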
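The Z-Order tip works because Z-ordering sorts rows along a space-filling curve built by interleaving the bits of the chosen columns, so rows that are close in either column tend to share a file, and per-file min/max statistics let queries skip the rest. A toy illustration on a 4x4 grid of (x, y) values cut into files of 4 rows: it counts how many files a filter on each column must open under a plain sort versus a Morton (Z-order) sort. Delta's OPTIMIZE ZORDER BY differs in detail; this only shows the principle, and the helper names are hypothetical.

```python
def interleave_bits(x, y, bits=16):
    """Morton (Z-order) code: bit i of x -> bit 2i, bit i of y -> bit 2i+1."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)
        z |= ((y >> i) & 1) << (2 * i + 1)
    return z


def files_touched(rows, sort_key, rows_per_file, predicate):
    """Sort rows, cut them into fixed-size files, count files containing a match."""
    ordered = sorted(rows, key=sort_key)
    files = [ordered[i:i + rows_per_file] for i in range(0, len(ordered), rows_per_file)]
    return sum(1 for f in files if any(predicate(r) for r in f))


def linear_key(p):
    return (p[0], p[1])  # ORDER BY x, y


def z_key(p):
    return interleave_bits(p[0], p[1])  # Z-order on (x, y)


points = [(x, y) for x in range(4) for y in range(4)]
results = {}
for name, key in [("sort by x", linear_key), ("z-order", z_key)]:
    results[name] = (
        files_touched(points, key, 4, lambda p: p[0] == 0),  # WHERE x = 0
        files_touched(points, key, 4, lambda p: p[1] == 0),  # WHERE y = 0
    )
    print(name, results[name])
```

A plain sort is ideal for x (1 file) but poor for y (all 4 files); Z-order opens 2 files for either filter, which is why it pays off only on columns that are actually filtered often.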

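What is Delta Lake?

Delta Lake is an open-source storage layer that brings ACID transactions to data lakes on S3, ADLS, and GCS. Data is stored as Parquet files, and together with Spark it supports MERGE, time travel, Change Data Feed, schema evolution, OPTIMIZE, and VACUUM.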
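Delta Lake's ACID guarantees come from its transaction log: each commit is a JSON file named by a zero-padded version number in the table's _delta_log directory, and a writer wins version N only if it is the first to create that file. A minimal local-filesystem sketch of that optimistic-concurrency step, using an exclusive create; the add actions are trimmed to a path (real ones carry more fields), the helper names are hypothetical, and on object stores Delta delegates this put-if-absent step to a storage-specific LogStore.

```python
import json
import os
import tempfile


def commit_path(log_dir, version):
    """_delta_log-style file name: 20-digit zero-padded version + .json."""
    return os.path.join(log_dir, f"{version:020d}.json")


def latest_version(log_dir):
    """Highest committed version, or -1 for an empty log."""
    versions = [int(name[:-len(".json")]) for name in os.listdir(log_dir)
                if name.endswith(".json")]
    return max(versions, default=-1)


def try_commit(log_dir, version, actions):
    """Write commit `version` only if no other writer created it first."""
    try:
        fd = os.open(commit_path(log_dir, version), os.O_WRONLY | os.O_CREAT | os.O_EXCL)
    except FileExistsError:
        return False  # conflict: caller must re-read the log and retry
    with os.fdopen(fd, "w") as f:
        for action in actions:
            f.write(json.dumps(action) + "\n")  # one JSON action per line
    return True


log_dir = tempfile.mkdtemp()
v = latest_version(log_dir) + 1  # 0 for an empty log
ok_a = try_commit(log_dir, v, [{"add": {"path": "part-000.parquet"}}])
ok_b = try_commit(log_dir, v, [{"add": {"path": "part-001.parquet"}}])  # same version
if not ok_b:
    # Writer B lost the race; a real writer also checks for logical conflicts here.
    ok_b = try_commit(log_dir, latest_version(log_dir) + 1,
                      [{"add": {"path": "part-001.parquet"}}])
print(ok_a, ok_b, latest_version(log_dir))
```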
