Delta Lake คืออะไร
Delta Lake เป็น open source storage layer ที่เพิ่ม ACID transactions, scalable metadata handling และ unified batch/streaming processing บน data lake ทำให้ data lake มีความน่าเชื่อถือเหมือน data warehouse พัฒนาโดย Databricks ทำงานบน Apache Spark และ storage systems เช่น S3, ADLS, GCS
Features หลักของ Delta Lake ได้แก่ ACID Transactions รับประกัน atomicity, consistency, isolation, durability สำหรับ data lake operations, Time Travel ย้อนดูข้อมูลในอดีตได้ทุก version, Schema Evolution เปลี่ยน schema ได้โดยไม่ต้อง rewrite data, Unified Batch/Streaming ใช้ table เดียวสำหรับทั้ง batch และ streaming, Z-Ordering data clustering สำหรับ faster queries และ Data Skipping ข้าม files ที่ไม่เกี่ยวข้องอัตโนมัติ
Capacity Planning สำคัญเพราะ Delta Lake เก็บ transaction log (_delta_log) ที่โตตาม operations, small files problem ทำให้ performance ลดลง, Z-Ordering ใช้ compute resources มาก, Time Travel เก็บ old versions ที่ใช้ storage เพิ่ม และ VACUUM operations ต้อง plan ให้เหมาะสมเพื่อคุม storage costs
ติดตั้ง Delta Lake
วิธีติดตั้งและเริ่มต้นใช้งาน
# === Install Delta Lake ===

# 1. PySpark with Delta Lake — versions must match: delta-spark 3.1.x targets Spark 3.5.x
pip install pyspark==3.5.0 delta-spark==3.1.0

# 2. Start the PySpark shell with the Delta SQL extension and catalog enabled
pyspark --packages io.delta:delta-spark_2.12:3.1.0 \
  --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
  --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"

# 3. Python setup: create a Delta table, read it back, time travel, show history
python3 << 'PYEOF'
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

builder = (
    SparkSession.builder
    .appName("DeltaLakeDemo")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)
spark = configure_spark_with_delta_pip(builder).getOrCreate()

# Create a Delta table with 1M rows
data = spark.range(0, 1000000)
data = data.withColumn("value", data.id * 2)
data.write.format("delta").mode("overwrite").save("/tmp/delta-demo")

# Read the Delta table back
df = spark.read.format("delta").load("/tmp/delta-demo")
print(f"Row count: {df.count()}")
print(f"Schema: {df.schema}")

# Time travel: read the table as of its first commit (version 0)
df_v0 = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta-demo")
print(f"Version 0 rows: {df_v0.count()}")

# Table history — one row per commit
from delta.tables import DeltaTable
dt = DeltaTable.forPath(spark, "/tmp/delta-demo")
dt.history().show(truncate=False)
spark.stop()
PYEOF

# 4. Docker Compose: standalone Spark master + one worker
# NOTE: the indentation inside the heredoc is required for the YAML to be valid.
cat > docker-compose.yml << 'EOF'
version: '3'
services:
  spark-master:
    image: bitnami/spark:3.5
    environment:
      - SPARK_MODE=master
      - SPARK_MASTER_HOST=spark-master
    ports:
      - "8080:8080"
      - "7077:7077"
    volumes:
      - spark_data:/data
  spark-worker:
    image: bitnami/spark:3.5
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
      - SPARK_WORKER_MEMORY=4G
      - SPARK_WORKER_CORES=2
    depends_on:
      - spark-master
volumes:
  spark_data:
EOF
docker-compose up -d
echo "Delta Lake installed"
Capacity Planning และ Storage Optimization
คำนวณและ optimize storage
#!/usr/bin/env python3
# delta_capacity_planner.py — Delta Lake Capacity Planning
import json
import math
import logging
from typing import Dict, List
from dataclasses import dataclass
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("capacity")
@dataclass
class TableProfile:
    """Workload profile for a single Delta table, used for storage estimation."""
    name: str                   # table identifier
    row_count: int              # current number of rows
    avg_row_size_bytes: int     # average uncompressed row size
    daily_inserts: int          # rows inserted per day
    daily_updates: int          # rows updated per day
    daily_deletes: int          # rows deleted per day
    partitions: int             # number of table partitions
    compression: str            # codec: snappy, zstd, gzip
    time_travel_days: int       # retention window for old versions


class DeltaCapacityPlanner:
    """Estimates Delta Lake storage footprints and emits optimization hints."""

    # Approximate size ratio (compressed / raw) per Parquet codec.
    COMPRESSION_RATIOS = {
        "none": 1.0,
        "snappy": 0.5,
        "zstd": 0.35,
        "gzip": 0.3,
    }

    def __init__(self):
        self.tables: List[TableProfile] = []

    def add_table(self, table: TableProfile):
        """Register a table profile so it is included in fleet-wide totals."""
        self.tables.append(table)

    def estimate_table_storage(self, table: TableProfile):
        """Estimate one table's storage (data + log + time travel), in a dict of GB figures."""
        # Unknown codecs fall back to the snappy-like 0.5 ratio.
        ratio = self.COMPRESSION_RATIOS.get(table.compression, 0.5)
        raw_bytes = table.row_count * table.avg_row_size_bytes
        compressed_bytes = raw_bytes * ratio
        # Parquet metadata/footers add roughly 10% on top of the column data.
        overhead_bytes = compressed_bytes * 0.1
        # Heuristic: ~10k changed rows per commit; each commit writes ~1KB of JSON log.
        ops_per_day = table.daily_inserts + table.daily_updates + table.daily_deletes
        commits_per_day = math.ceil(ops_per_day / 10000)
        log_bytes = commits_per_day * 1024 * 365  # one year of transaction log
        # Old versions kept for time travel: updates rewrite whole rows;
        # deletes only leave small tombstone entries (~10% of a row).
        churn_bytes_per_day = (
            (table.daily_updates * table.avg_row_size_bytes * ratio)
            + (table.daily_deletes * table.avg_row_size_bytes * ratio * 0.1)
        )
        time_travel_bytes = churn_bytes_per_day * table.time_travel_days
        # Parquet checkpoint roughly every 10 commits, each ~1% of table size.
        checkpoint_bytes = compressed_bytes * 0.01 * (commits_per_day / 10)
        total_bytes = (
            compressed_bytes + overhead_bytes + log_bytes
            + time_travel_bytes + checkpoint_bytes
        )
        return {
            "table": table.name,
            "raw_size_gb": round(raw_bytes / 1e9, 2),
            "compressed_size_gb": round(compressed_bytes / 1e9, 2),
            "compression_ratio": ratio,
            "parquet_overhead_gb": round(overhead_bytes / 1e9, 3),
            "delta_log_gb": round(log_bytes / 1e9, 3),
            "time_travel_gb": round(time_travel_bytes / 1e9, 2),
            "total_storage_gb": round(total_bytes / 1e9, 2),
            "monthly_growth_gb": round(
                (table.daily_inserts * table.avg_row_size_bytes * ratio * 30) / 1e9, 2
            ),
        }

    def estimate_total_storage(self):
        """Aggregate per-table estimates into fleet-wide totals and projections."""
        per_table = [self.estimate_table_storage(t) for t in self.tables]
        total_gb = sum(e["total_storage_gb"] for e in per_table)
        growth_gb = sum(e["monthly_growth_gb"] for e in per_table)
        return {
            "tables": per_table,
            "total_storage_gb": round(total_gb, 2),
            "total_storage_tb": round(total_gb / 1024, 3),
            "monthly_growth_gb": round(growth_gb, 2),
            "yearly_projection_tb": round((total_gb + growth_gb * 12) / 1024, 3),
        }

    def optimize_recommendations(self, table: TableProfile):
        """Produce tuning recommendations based on a table's estimated footprint."""
        est = self.estimate_table_storage(table)
        recs = []
        if table.compression == "snappy" and est["total_storage_gb"] > 100:
            recs.append("Switch to ZSTD compression for 30% more savings")
        if table.daily_updates > table.row_count * 0.1:
            recs.append("High update rate — consider OPTIMIZE more frequently")
        # Rough heuristic: assume ~1M rows per data file.
        files_per_partition = math.ceil(table.row_count / max(table.partitions, 1) / 1000000)
        if files_per_partition > 10:
            recs.append(f"Too many files per partition ({files_per_partition}) — run OPTIMIZE with ZORDER")
        if table.time_travel_days > 30:
            recs.append(f"Time travel retention {table.time_travel_days}d — consider reducing to save storage")
        optimal_file_size_mb = 128
        target_files = math.ceil(est["compressed_size_gb"] * 1024 / optimal_file_size_mb)
        recs.append(f"Target file count: {target_files} files ({optimal_file_size_mb}MB each)")
        return {"table": table.name, "recommendations": recs, "storage": est}
# Demo: profile three tables and print fleet-wide estimates plus
# recommendations for the first one.
capacity = DeltaCapacityPlanner()
for profile in (
    TableProfile("orders", 50_000_000, 500, 100_000, 5_000, 100, 365, "snappy", 30),
    TableProfile("events", 500_000_000, 200, 2_000_000, 0, 0, 30, "zstd", 7),
    TableProfile("users", 5_000_000, 1000, 10_000, 50_000, 100, 1, "snappy", 90),
):
    capacity.add_table(profile)
print(json.dumps(capacity.estimate_total_storage(), indent=2))
print(json.dumps(capacity.optimize_recommendations(capacity.tables[0]), indent=2))
Performance Tuning
ปรับแต่ง Delta Lake performance
# === Delta Lake Performance Tuning ===

# 1. OPTIMIZE — compact small files
# ===================================
# SQL:
# OPTIMIZE delta.`/data/orders`
# OPTIMIZE delta.`/data/orders` WHERE date >= '2025-01-01'
# PySpark:
# from delta.tables import DeltaTable
# dt = DeltaTable.forPath(spark, "/data/orders")
# dt.optimize().executeCompaction()
# With Z-ORDER (data clustering):
# OPTIMIZE delta.`/data/orders` ZORDER BY (customer_id, order_date)
# dt.optimize().executeZOrderBy("customer_id", "order_date")

# 2. VACUUM — remove old files
# ===================================
# Remove files older than 7 days (default):
# VACUUM delta.`/data/orders` RETAIN 168 HOURS
# dt.vacuum(168) # hours
# WARNING: VACUUM deletes Time Travel data!
# Set retention period carefully:
# spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
# dt.vacuum(0) # Remove ALL old files (dangerous!)

# 3. Auto Optimize
# ===================================
# Enable auto compaction:
# ALTER TABLE delta.`/data/orders`
# SET TBLPROPERTIES (
#   'delta.autoOptimize.optimizeWrite' = 'true',
#   'delta.autoOptimize.autoCompact' = 'true'
# )
# Spark config:
# spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
# spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")

# 4. Partition Strategy
# ===================================
# Good: partition by date (low cardinality)
# df.write.format("delta").partitionBy("date").save("/data/orders")
# Bad: partition by customer_id (high cardinality = too many small files)
# Instead, use Z-ORDER for high cardinality columns.
# Ideal partition size: 1GB per partition
# Formula: total_data_size / target_partition_size = number_of_partitions

# 5. File Size Tuning
# ===================================
# Target file size (default 1GB, reduce for faster queries):
# ALTER TABLE delta.`/data/orders`
# SET TBLPROPERTIES ('delta.targetFileSize' = '128mb')
# For streaming tables with frequent small writes:
# SET TBLPROPERTIES ('delta.targetFileSize' = '64mb')

# 6. Caching
# ===================================
# spark.conf.set("spark.databricks.io.cache.enabled", "true")
# spark.conf.set("spark.databricks.io.cache.maxDiskUsage", "50g")
# spark.conf.set("spark.databricks.io.cache.maxMetaDataCache", "2g")

# 7. Benchmark script (queries stay commented until a SparkSession named
#    `spark` is in scope)
python3 << 'PYEOF'
import time
import json

def benchmark_query(spark, path, query_name, sql):
    """Run a SQL query, force execution via count(), and return timing info."""
    start = time.time()
    result = spark.sql(sql)
    count = result.count()
    elapsed = time.time() - start
    return {
        "query": query_name,
        "rows": count,
        "time_seconds": round(elapsed, 2),
    }

# results = []
# results.append(benchmark_query(spark, path, "full_scan",
#     "SELECT count(*) FROM delta.`/data/orders`"))
# results.append(benchmark_query(spark, path, "filtered",
#     "SELECT count(*) FROM delta.`/data/orders` WHERE date = '2025-01-15'"))
# results.append(benchmark_query(spark, path, "aggregation",
#     "SELECT date, SUM(amount) FROM delta.`/data/orders` GROUP BY date"))
# print(json.dumps(results, indent=2))
PYEOF
echo "Delta Lake tuning complete"
Data Pipeline ด้วย Delta Lake
สร้าง data pipeline
#!/usr/bin/env python3
# delta_pipeline.py — Delta Lake Data Pipeline
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("pipeline")
class DeltaPipeline:
    """Medallion-architecture (bronze → silver → gold) pipeline over Delta Lake.

    All Spark operations are kept as commented reference code so the pipeline
    can be dry-run without a cluster; each layer returns a small summary dict.
    """

    # Bound on the class so the pipeline is self-contained; resolves to the
    # same underlying logger as logging.getLogger("pipeline") at module level.
    _logger = logging.getLogger("pipeline")

    def __init__(self, spark_session=None):
        self.spark = spark_session   # live SparkSession, or None for a dry run
        self.pipeline_runs = []      # result dicts recorded by run_pipeline()

    @staticmethod
    def _utc_now():
        """Timezone-aware current UTC time.

        Replaces datetime.utcnow(), which is deprecated since Python 3.12.
        """
        from datetime import timezone
        return datetime.now(timezone.utc)

    def bronze_layer(self, source_path, target_path, source_format="json"):
        """Raw data ingestion — append only."""
        self._logger.info(f"Bronze: Ingesting from {source_path}")
        # df = self.spark.read.format(source_format).load(source_path)
        # df = df.withColumn("_ingested_at", current_timestamp())
        # df = df.withColumn("_source_file", input_file_name())
        # df.write.format("delta").mode("append").save(target_path)
        return {"layer": "bronze", "source": source_path, "target": target_path}

    def silver_layer(self, source_path, target_path, transformations=None):
        """Cleaned and transformed data, upserted with Delta MERGE."""
        self._logger.info(f"Silver: Transforming from {source_path}")
        # df = self.spark.read.format("delta").load(source_path)
        #
        # # Deduplicate
        # df = df.dropDuplicates(["id"])
        #
        # # Clean nulls
        # df = df.na.fill({"amount": 0, "status": "unknown"})
        #
        # # Type casting
        # df = df.withColumn("amount", col("amount").cast("double"))
        #
        # # Write with merge (upsert)
        # from delta.tables import DeltaTable
        # if DeltaTable.isDeltaTable(self.spark, target_path):
        #     dt = DeltaTable.forPath(self.spark, target_path)
        #     dt.alias("target").merge(
        #         df.alias("source"),
        #         "target.id = source.id"
        #     ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
        # else:
        #     df.write.format("delta").save(target_path)
        return {"layer": "silver", "source": source_path, "target": target_path}

    def gold_layer(self, source_path, target_path, aggregation_sql=None):
        """Business-level aggregations."""
        self._logger.info(f"Gold: Aggregating from {source_path}")
        # df = self.spark.read.format("delta").load(source_path)
        # df.createOrReplaceTempView("silver_data")
        #
        # result = self.spark.sql(aggregation_sql or """
        #     SELECT
        #         date,
        #         category,
        #         COUNT(*) as order_count,
        #         SUM(amount) as total_revenue,
        #         AVG(amount) as avg_order_value,
        #         COUNT(DISTINCT customer_id) as unique_customers
        #     FROM silver_data
        #     GROUP BY date, category
        # """)
        #
        # result.write.format("delta").mode("overwrite").save(target_path)
        return {"layer": "gold", "source": source_path, "target": target_path}

    def run_pipeline(self, config):
        """Run bronze → silver → gold and return (and record) a run summary.

        config must contain the keys: source, bronze_path, silver_path,
        gold_path. Failures are caught at this boundary and reported in the
        returned dict ("status": "failed", "error": ...) rather than raised.
        """
        started = self._utc_now()
        run = {
            "run_id": f"run-{started.strftime('%Y%m%d%H%M%S')}",
            "start_time": started.isoformat(),
            "steps": [],
            "status": "running",
        }
        try:
            run["steps"].append(
                self.bronze_layer(config["source"], config["bronze_path"]))
            run["steps"].append(
                self.silver_layer(config["bronze_path"], config["silver_path"]))
            run["steps"].append(
                self.gold_layer(config["silver_path"], config["gold_path"]))
            run["status"] = "completed"
        except Exception as e:
            run["status"] = "failed"
            run["error"] = str(e)
        run["end_time"] = self._utc_now().isoformat()
        self.pipeline_runs.append(run)
        return run
# Demo: run the medallion-architecture pipeline end-to-end with example paths
# and print the recorded run summary.
demo_config = {
    "source": "s3://raw-data/orders/2025-01-15/",
    "bronze_path": "/data/bronze/orders",
    "silver_path": "/data/silver/orders",
    "gold_path": "/data/gold/daily_summary",
}
demo_pipeline = DeltaPipeline()
print(json.dumps(demo_pipeline.run_pipeline(demo_config), indent=2))
Monitoring และ Maintenance
Monitor และ maintain Delta tables
#!/usr/bin/env python3
# delta_monitor.py — Delta Lake Monitoring
import json
import logging
from datetime import datetime
from typing import Dict, List
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("delta_mon")
class DeltaTableMonitor:
    """Collects Delta table statistics and runs rule-based health checks."""

    def __init__(self):
        self.table_stats = {}  # table name -> most recent stats dict

    @staticmethod
    def _utc_now_iso():
        """ISO-8601 UTC timestamp.

        Replaces datetime.utcnow(), which is deprecated since Python 3.12.
        """
        from datetime import timezone
        return datetime.now(timezone.utc).isoformat()

    def collect_stats(self, table_name, path):
        """Record stats for a table and return them.

        Values below are mock placeholders; a real implementation would read
        from the Delta log, e.g.:
        # dt = DeltaTable.forPath(spark, path)
        # detail = dt.detail().collect()[0]
        # history = dt.history(100).collect()
        """
        stats = {
            "table": table_name,
            "path": path,
            "checked_at": self._utc_now_iso(),
            "num_files": 150,
            "size_gb": 45.2,
            "num_partitions": 365,
            "avg_file_size_mb": round(45.2 * 1024 / 150, 1),
            # NOTE(review): mock min/max are inconsistent with the computed
            # average (~308.6MB > max 256MB) — placeholder data only; real
            # values would come from dt.detail().
            "min_file_size_mb": 12,
            "max_file_size_mb": 256,
            "small_files_count": 23,  # files smaller than ~32MB
            "last_optimize": "2025-01-14T02:00:00",
            "last_vacuum": "2025-01-14T03:00:00",
            "total_versions": 1250,
        }
        self.table_stats[table_name] = stats
        return stats

    def health_check(self, table_name):
        """Evaluate the last collected stats and return issues with actions."""
        stats = self.table_stats.get(table_name, {})
        if not stats:
            # No stats collected yet for this table.
            return {"status": "unknown"}
        issues = []
        # Small-files check: warn when >20% of files are below the threshold.
        if stats["small_files_count"] > stats["num_files"] * 0.2:
            issues.append({
                "severity": "warning",
                "issue": f"Too many small files: {stats['small_files_count']}/{stats['num_files']}",
                "action": "Run OPTIMIZE",
            })
        # File-size variance: largest file much bigger than the average.
        if stats["max_file_size_mb"] > stats["avg_file_size_mb"] * 3:
            issues.append({
                "severity": "info",
                "issue": "High file size variance",
                "action": "Run OPTIMIZE with ZORDER",
            })
        # Optimize freshness check (not implemented):
        # would compare last_optimize against the current time.
        # Version count: a long commit history inflates the transaction log.
        if stats["total_versions"] > 1000:
            issues.append({
                "severity": "info",
                "issue": f"High version count: {stats['total_versions']}",
                "action": "Consider running VACUUM and checkpoint cleanup",
            })
        return {
            "table": table_name,
            "healthy": len(issues) == 0,
            "issues": issues,
            "stats": stats,
        }

    def maintenance_schedule(self):
        """Return the recommended recurring maintenance tasks by cadence."""
        return {
            "daily": [
                "OPTIMIZE tables with high write volume",
                "Check for small files accumulation",
                "Monitor query latencies",
            ],
            "weekly": [
                "VACUUM tables (retain 7 days minimum)",
                "OPTIMIZE with ZORDER on query columns",
                "Review partition sizes",
                "Check delta log size",
            ],
            "monthly": [
                "Review storage costs and growth trends",
                "Evaluate compression settings",
                "Update partition strategies if needed",
                "Capacity planning review",
            ],
        }
# === Maintenance cron jobs ===
# Daily OPTIMIZE at 02:00
# 0 2 * * * spark-submit optimize_tables.py >> /var/log/delta-optimize.log 2>&1
#
# Weekly VACUUM, Sunday 03:00
# 0 3 * * 0 spark-submit vacuum_tables.py >> /var/log/delta-vacuum.log 2>&1

# === Prometheus metrics to export ===
# delta_table_size_bytes{table="orders"}
# delta_table_num_files{table="orders"}
# delta_table_small_files{table="orders"}
# delta_optimize_duration_seconds{table="orders"}
# delta_vacuum_duration_seconds{table="orders"}

# Demo: collect stats for one table, run the health check, print the report
# and the maintenance schedule.
table_monitor = DeltaTableMonitor()
table_monitor.collect_stats("orders", "/data/silver/orders")
print(json.dumps(table_monitor.health_check("orders"), indent=2))
print(json.dumps(table_monitor.maintenance_schedule(), indent=2, ensure_ascii=False))
FAQ คำถามที่พบบ่อย
Q: Delta Lake กับ Apache Iceberg ต่างกันอย่างไร?
A: Delta Lake พัฒนาโดย Databricks ทำงานดีที่สุดกับ Spark ecosystem มี Spark integration ที่ลึกกว่า auto-optimize, Z-ORDER ใช้ง่าย Apache Iceberg พัฒนาโดย Netflix เป็น engine-agnostic ทำงานกับ Spark, Flink, Trino, Presto, Dremio ดีกว่าสำหรับ multi-engine environments มี hidden partitioning ที่ดีกว่า ถ้าใช้ Databricks เลือก Delta Lake ถ้าต้องการ engine flexibility เลือก Iceberg
Q: OPTIMIZE ควรรันบ่อยแค่ไหน?
A: ขึ้นกับ write patterns tables ที่มี streaming writes ควร OPTIMIZE ทุก 1-6 ชั่วโมง tables ที่มี batch writes วันละครั้งควร OPTIMIZE หลัง batch job เสร็จ tables ที่ read-heavy ควร OPTIMIZE สัปดาห์ละครั้ง ใช้ auto-optimize สำหรับ tables ที่ write บ่อย แต่ระวัง auto-optimize อาจทำให้ write latency เพิ่มขึ้น สำหรับ critical tables ควร schedule OPTIMIZE ในช่วง off-peak
Q: VACUUM ลบอะไรบ้างและปลอดภัยไหม?
A: VACUUM ลบ data files ที่ไม่ถูก reference โดย Delta table แล้ว (old versions จาก updates/deletes) default retention 7 วัน หมายความว่า Time Travel ย้อนได้แค่ 7 วัน หลัง VACUUM ปลอดภัยถ้าไม่มี long-running queries ที่ใช้ old versions อยู่ ห้ามตั้ง retention เป็น 0 ใน production เพราะอาจทำให้ concurrent readers fail ตั้ง retention ให้มากกว่า longest running query
Q: Storage costs สำหรับ Delta Lake เป็นอย่างไร?
A: ค่าใช้จ่ายหลักคือ storage (S3: $0.023/GB/month, ADLS: $0.018/GB/month) Delta Lake เพิ่ม overhead 10-20% จาก transaction log, checkpoints, Time Travel data วิธีลด costs ใช้ ZSTD compression (ลด size 30% vs Snappy), VACUUM เป็นประจำ, ลด Time Travel retention ให้เหมาะสม, partition และ Z-ORDER ที่ถูกต้อง (ลด data scanned), ใช้ lifecycle policies ย้าย old data ไป cheaper storage tiers
