it

Delta Lake Event Driven Design — ออกแบบ Data

Delta Lake Event Driven Design — ออกแบบ Data

Delta Lake Event Driven

Delta Lake Event Driven Design — ออกแบบ Data

Delta Lake Event Driven ACID Transaction Change Data Feed Streaming Spark Merge Upsert Schema Evolution Time Travel S3 ADLS GCS

เนื้อหาเกี่ยวข้อง — แนะนำให้อ่าน Immutable OS Fedora CoreOS Cloud Migration

FeatureDelta LakeApache IcebergApache Hudi
ACID TransactionsYesYesYes
Change Data FeedCDF (Built-in)Incremental ReadCDC (Built-in)
StreamingSpark Structured StreamingFlinkSpark/Flink
Merge/UpsertMERGE INTOMERGE INTOUpsert
Time TravelVersion/TimestampSnapshotTimeline
OptimizeOPTIMIZE + Z-ORDERCompactionCompaction

Architecture & Setup

# === Delta Lake Event-driven Pipeline ===



# pip install delta-spark pyspark



# from pyspark.sql import SparkSession

# from delta.tables import DeltaTable

#

# spark = SparkSession.builder \

#     .appName("DeltaEventDriven") \

#     .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \

#     .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \

#     .getOrCreate()



# Create Delta Table with CDF enabled

# spark.sql("""

#     CREATE TABLE events (

#         event_id STRING,

#         user_id STRING,

#         event_type STRING,

#         payload STRING,

#         event_time TIMESTAMP

#     ) USING DELTA

#     PARTITIONED BY (date(event_time))

#     TBLPROPERTIES (delta.enableChangeDataFeed = true)

# """)



# Streaming Read from Delta Table

# stream_df = spark.readStream \

#     .format("delta") \

#     .option("readChangeFeed", "true") \

#     .option("startingVersion", 0) \

#     .table("events")



# Auto Loader - Load new files automatically

# auto_df = spark.readStream \

#     .format("cloudFiles") \

#     .option("cloudFiles.format", "json") \

#     .option("cloudFiles.schemaLocation", "/schema/events") \

#     .load("s3://raw-data/events/")



from dataclasses import dataclass



@dataclass

class PipelineStage:

    stage: str

    source: str

    processing: str

    sink: str

    trigger: str



stages = [

    PipelineStage("Ingestion (Bronze)",

        "Kafka / S3 / API Webhook",

        "Auto Loader หรือ Kafka Source → Raw Delta Table",

        "delta://bronze/events",

        "Continuous (Streaming) หรือ Trigger.AvailableNow"),

    PipelineStage("Cleansing (Silver)",

        "delta://bronze/events (CDF)",

        "Schema Validation, Dedup, Type Cast, Null Handling",

        "delta://silver/events_clean",

        "CDF Trigger เมื่อ Bronze มีข้อมูลใหม่"),

    PipelineStage("Aggregation (Gold)",

        "delta://silver/events_clean (CDF)",

        "Aggregate, Join, Business Logic, Metrics",

        "delta://gold/event_metrics",

        "CDF Trigger เมื่อ Silver มีข้อมูลใหม่"),

    PipelineStage("Serving",

        "delta://gold/event_metrics",

        "Query via Trino/Presto/Spark SQL",

        "Dashboard / API / ML Feature Store",

        "On-demand Query"),

]



print("=== Pipeline Stages ===")

for s in stages:

    print(f"\n  [{s.stage}]")

    print(f"    Source: {s.source}")

    print(f"    Process: {s.processing}")

    print(f"    Sink: {s.sink}")

    print(f"    Trigger: {s.trigger}")

Change Data Feed & Merge

Delta Lake Event Driven Design — ออกแบบ Data
# === Change Data Feed & Merge Operations ===



# Read Change Data Feed

# changes_df = spark.read \

#     .format("delta") \

#     .option("readChangeFeed", "true") \

#     .option("startingVersion", 5) \

#     .option("endingVersion", 10) \

#     .table("events")

#

# # CDF columns: _change_type, _commit_version, _commit_timestamp

# # _change_type: insert, update_preimage, update_postimage, delete

#

# inserts = changes_df.filter("_change_type = 'insert'")

# updates = changes_df.filter("_change_type = 'update_postimage'")

# deletes = changes_df.filter("_change_type = 'delete'")



# Merge (Upsert) - Exactly-once Processing

# deltaTable = DeltaTable.forPath(spark, "delta://silver/users")

# deltaTable.alias("target").merge(

#     source_df.alias("source"),

#     "target.user_id = source.user_id"

# ).whenMatchedUpdate(set={

#     "name": "source.name",

#     "email": "source.email",

#     "updated_at": "source.event_time"

# }).whenNotMatchedInsert(values={

#     "user_id": "source.user_id",

#     "name": "source.name",

#     "email": "source.email",

#     "created_at": "source.event_time",

#     "updated_at": "source.event_time"

# }).execute()



@dataclass

class MergePattern:

    pattern: str

    use_case: str

    match_condition: str

    matched_action: str

    not_matched_action: str



patterns = [

    MergePattern("SCD Type 1 (Overwrite)",

        "อัพเดทข้อมูลล่าสุด ไม่เก็บประวัติ",

        "target.id = source.id",

        "UPDATE SET * (Overwrite ทุก Column)",

        "INSERT * (Insert Row ใหม่)"),

    MergePattern("SCD Type 2 (History)",

        "เก็บประวัติทุก Version",

        "target.id = source.id AND target.is_current = true",

        "UPDATE SET is_current=false, end_date=now()",

        "INSERT with is_current=true, start_date=now()"),

    MergePattern("Deduplication",

        "ลบ Duplicate Event",

        "target.event_id = source.event_id",

        "ไม่ทำอะไร (Skip)",

        "INSERT * (Insert เฉพาะ Event ใหม่)"),

    MergePattern("Delete + Insert",

        "Replace Partition ด้วยข้อมูลใหม่",

        "target.date = source.date",

        "DELETE (ลบ Partition เก่า)",

        "INSERT * (Insert ข้อมูลใหม่ทั้ง Partition)"),

]



print("=== Merge Patterns ===")

for m in patterns:

    print(f"\n  [{m.pattern}] {m.use_case}")

    print(f"    Match: {m.match_condition}")

    print(f"    Matched: {m.matched_action}")

    print(f"    Not Matched: {m.not_matched_action}")

Operations & Monitoring

# === Delta Lake Operations ===



@dataclass

class Operation:

    operation: str

    command: str

    frequency: str

    impact: str



operations = [

    Operation("OPTIMIZE (Compaction)",

        "OPTIMIZE delta.`s3://data/events` ZORDER BY (user_id, event_type)",

        "ทุกวัน หรือเมื่อ Small Files > 1000",

        "Query เร็วขึ้น 2-10x ลด File Count"),

    Operation("VACUUM (Cleanup)",

        "VACUUM delta.`s3://data/events` RETAIN 168 HOURS",

        "ทุกสัปดาห์",

        "ลดพื้นที่ Storage 30-50% หลัง Vacuum"),

    Operation("ANALYZE (Statistics)",

        "ANALYZE TABLE events COMPUTE STATISTICS FOR ALL COLUMNS",

        "หลัง OPTIMIZE",

        "Query Optimizer เลือก Plan ดีขึ้น"),

    Operation("DESCRIBE HISTORY",

        "DESCRIBE HISTORY delta.`s3://data/events`",

        "เมื่อต้อง Debug หรือ Audit",

        "ดู Version History ทุกการเปลี่ยนแปลง"),

    Operation("RESTORE (Time Travel)",

        "RESTORE TABLE events TO VERSION AS OF 42",

        "เมื่อต้อง Rollback ข้อมูลผิด",

        "คืนข้อมูลกลับไป Version ก่อนหน้า"),

]



print("=== Operations ===")

for o in operations:

    print(f"  [{o.operation}]")

    print(f"    Command: {o.command}")

    print(f"    Frequency: {o.frequency}")

    print(f"    Impact: {o.impact}")

เคล็ดลับ

  • CDF: เปิด Change Data Feed ทุก Table สำหรับ Event-driven
  • Merge: ใช้ Merge แทน Overwrite สำหรับ Exactly-once
  • Z-Order: Z-Order ตาม Column ที่ Filter บ่อยที่สุด
  • Partition: Partition ตาม Date ไม่เกิน 10K Partitions
  • Vacuum: VACUUM ทุกสัปดาห์ Retain 7 วัน ลด Storage Cost

Delta Lake คืออะไร

Open Source Storage Layer ACID Transaction S3 ADLS GCS Spark Merge Time Travel CDF Schema Evolution Optimize Vacuum Parquet

เนื้อหาเกี่ยวข้อง — แนะนำให้อ่าน non farm payroll 2022

อ่านเพิ่ม: Apache Kafka เจาะลึก สอน Kafka Streams, Connect, Schema Regi · อ่านเพิ่ม: PostgreSQL ขั้นสูง สอน Indexing, Query Optimization, Replica · อ่านเพิ่ม: Event-Driven Architecture คืออะไร? สอนออกแบบระบบ Event Sourc

แนะนำเพิ่มเติม — แหล่งความรู้ Forex iCafeForex

เนื้อหาเกี่ยวข้อง — แนะนำให้อ่าน functional programming concepts

XM Legend · เทรดเดอร์ & ผู้สอน Forex 13 ปี

ผู้ก่อตั้ง SiamCafe ตั้งแต่ปี 1997 · เทรดเดอร์สาย Forex มากกว่า 13 ปี ได้รับการยกย่องเป็น XM Legend · แบ่งปันความรู้ Forex, ไอที, AI และการเทรด จากประสบการณ์จริงในตลาดจริง