Delta Lake Home Lab
Set up a data lakehouse home lab with Apache Spark and MinIO: Delta Lake provides ACID transactions, time travel, and schema enforcement, and the pipeline follows the Bronze/Silver/Gold medallion pattern.
| Component | Tool | Resources | Purpose |
|---|---|---|---|
| Compute | Apache Spark 3.x | 4 cores, 8 GB RAM | Data processing engine |
| Storage | MinIO (S3-compatible) | 100 GB+ SSD | Object storage for Delta tables |
| Notebook | Jupyter + PySpark | 2 GB RAM | Interactive analysis |
| Metastore | Hive Metastore / Unity Catalog | 1 GB RAM + PostgreSQL | Table metadata management |
| Orchestration | Airflow / cron | 1 GB RAM | Pipeline scheduling |
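The compose file below does not include the metastore from this table. A minimal sketch of pointing Spark at an external Hive Metastore, assuming a standalone metastore service (backed by PostgreSQL) that you add yourself and that listens on the default thrift port at hive-metastore:9083:

from pyspark.sql import SparkSession

# Assumption: a Hive Metastore service named "hive-metastore" is reachable
# on port 9083; it is not part of the docker-compose.yml shown below.
spark = (
    SparkSession.builder
    .appName("MetastoreCheck")
    .config("hive.metastore.uris", "thrift://hive-metastore:9083")
    .config("spark.sql.warehouse.dir", "s3a://datalake/warehouse")
    .enableHiveSupport()
    .getOrCreate()
)

spark.sql("SHOW DATABASES").show()  # confirms the metastore connection works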
Docker Compose Setup
# === Delta Lake Home Lab Docker Compose ===
# docker-compose.yml
# version: '3.8'
# services:
#   spark-master:
#     image: bitnami/spark:3.5
#     environment:
#       - SPARK_MODE=master
#       - SPARK_MASTER_HOST=spark-master
#     ports:
#       - "8080:8080"   # Spark UI
#       - "7077:7077"   # Spark master
#     volumes:
#       - ./spark-defaults.conf:/opt/bitnami/spark/conf/spark-defaults.conf
#
#   spark-worker-1:
#     image: bitnami/spark:3.5
#     environment:
#       - SPARK_MODE=worker
#       - SPARK_MASTER_URL=spark://spark-master:7077
#       - SPARK_WORKER_MEMORY=4G
#       - SPARK_WORKER_CORES=2
#
#   minio:
#     image: minio/minio
#     command: server /data --console-address ":9001"
#     ports:
#       - "9000:9000"   # S3 API
#       - "9001:9001"   # Console
#     environment:
#       MINIO_ROOT_USER: minioadmin
#       MINIO_ROOT_PASSWORD: minioadmin
#     volumes:
#       - minio-data:/data
#
#   jupyter:
#     image: jupyter/pyspark-notebook
#     ports:
#       - "8888:8888"
#     environment:
#       - SPARK_MASTER=spark://spark-master:7077
#     volumes:
#       - ./notebooks:/home/jovyan/work
#
# # named volume used by minio must be declared at the top level
# volumes:
#   minio-data:
# spark-defaults.conf
# spark.jars.packages io.delta:delta-spark_2.12:3.0.0,org.apache.hadoop:hadoop-aws:3.3.4
# spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension
# spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog
# spark.hadoop.fs.s3a.endpoint http://minio:9000
# spark.hadoop.fs.s3a.access.key minioadmin
# spark.hadoop.fs.s3a.secret.key minioadmin
# spark.hadoop.fs.s3a.path.style.access true
# spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
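From the Jupyter container, the same settings can be applied when building the session. A minimal sketch, assuming delta-spark 3.0.0 is pip-installed in the notebook and the MinIO endpoint and minioadmin credentials from the compose file above:

from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

builder = (
    SparkSession.builder
    .appName("DeltaHomeLab")
    .master("spark://spark-master:7077")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
    .config("spark.hadoop.fs.s3a.access.key", "minioadmin")
    .config("spark.hadoop.fs.s3a.secret.key", "minioadmin")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
)

# configure_spark_with_delta_pip adds the Delta jars matching the installed
# pip package; hadoop-aws is pulled in via extra_packages so the S3A
# filesystem is available to the driver and executors as well
spark = configure_spark_with_delta_pip(
    builder, extra_packages=["org.apache.hadoop:hadoop-aws:3.3.4"]
).getOrCreate()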
from dataclasses import dataclass


@dataclass
class LabComponent:
    component: str
    image: str
    port: str
    resource: str
    config_tip: str
components = [
    LabComponent("Spark Master",
                 "bitnami/spark:3.5",
                 "8080 (UI), 7077 (master)",
                 "2 cores, 2 GB RAM",
                 "Set SPARK_DAEMON_MEMORY=1g for a home lab"),
    LabComponent("Spark Worker",
                 "bitnami/spark:3.5",
                 "8081 (worker UI)",
                 "2 cores, 4 GB RAM per worker",
                 "Start with 1-2 workers, depending on the machine"),
    LabComponent("MinIO",
                 "minio/minio",
                 "9000 (S3), 9001 (console)",
                 "1 core, 1 GB RAM, SSD",
                 "Use an SSD, not an HDD, for performance"),
    LabComponent("Jupyter",
                 "jupyter/pyspark-notebook",
                 "8888",
                 "1 core, 2 GB RAM",
                 "Install delta-spark in the notebook"),
]
print("=== Lab Components ===")
for c in components:
print(f" [{c.component}] Image: {c.image}")
print(f" Port: {c.port} | Resource: {c.resource}")
print(f" Tip: {c.config_tip}")
Data Pipeline (Medallion)
# === Medallion Architecture Pipeline ===
# from pyspark.sql import SparkSession
# from pyspark.sql import functions as F
# from delta.tables import DeltaTable
#
# spark = SparkSession.builder \
#     .appName("DeltaLakePipeline") \
#     .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.0.0") \
#     .getOrCreate()
#
# # Bronze: raw ingest, appended as-is
# raw_df = spark.read.json("s3a://datalake/raw/events/")
# raw_df.write.format("delta").mode("append") \
#     .save("s3a://datalake/bronze/events")
#
# # Silver: clean + transform
# bronze_df = spark.read.format("delta").load("s3a://datalake/bronze/events")
# silver_df = bronze_df \
#     .dropna(subset=["user_id", "event_type"]) \
#     .dropDuplicates(["event_id"]) \
#     .withColumn("event_date", F.to_date("timestamp"))
# silver_df.write.format("delta").mode("overwrite") \
#     .save("s3a://datalake/silver/events")
#
# # Gold: aggregate into business metrics
# silver_df = spark.read.format("delta").load("s3a://datalake/silver/events")
# gold_df = silver_df.groupBy("event_date", "event_type") \
#     .agg(F.count("*").alias("event_count"),
#          F.countDistinct("user_id").alias("unique_users"))
# gold_df.write.format("delta").mode("overwrite") \
#     .save("s3a://datalake/gold/daily_metrics")
#
# # Time travel: read an earlier version of the table
# df_v0 = spark.read.format("delta").option("versionAsOf", 0) \
#     .load("s3a://datalake/silver/events")
#
# # MERGE (upsert): new_data is a DataFrame of incoming user records
# delta_table = DeltaTable.forPath(spark, "s3a://datalake/silver/users")
# delta_table.alias("target").merge(
#     new_data.alias("source"),
#     "target.user_id = source.user_id"
# ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
@dataclass
class PipelineLayer:
    layer: str
    data: str
    operations: str
    schedule: str
    retention: str


layers = [
    PipelineLayer("Bronze (Raw)",
                  "Raw data as-is from the source",
                  "Ingest, append-only, no transformation",
                  "Real-time (streaming) or every 15 minutes",
                  "90 days (VACUUM)"),
    PipelineLayer("Silver (Cleaned)",
                  "Cleaned, validated, deduplicated",
                  "dropna, dropDuplicates, casts, schema enforcement",
                  "Hourly or triggered",
                  "1 year"),
    PipelineLayer("Gold (Aggregated)",
                  "Business metrics, KPI summaries",
                  "groupBy, agg, join, materialized views",
                  "Daily or hourly",
                  "Permanent"),
]

print("=== Pipeline Layers ===")
for l in layers:
    print(f"  [{l.layer}] Data: {l.data}")
    print(f"    Ops: {l.operations}")
    print(f"    Schedule: {l.schedule}")
    print(f"    Retention: {l.retention}")
Optimization & Maintenance
# === Delta Lake Maintenance ===
# # OPTIMIZE - Compact small files
# spark.sql("OPTIMIZE delta.`s3a://datalake/silver/events`")
#
# # OPTIMIZE with ZORDER
# spark.sql("""
# OPTIMIZE delta.`s3a://datalake/silver/events`
# ZORDER BY (event_date, user_id)
# """)
#
# # VACUUM - Remove old files
# spark.sql("VACUUM delta.`s3a://datalake/bronze/events` RETAIN 168 HOURS")
#
# # DESCRIBE HISTORY
# spark.sql("DESCRIBE HISTORY delta.`s3a://datalake/silver/events`").show()
@dataclass
class MaintenanceTask:
    task: str
    command: str
    schedule: str
    benefit: str
    warning: str


tasks = [
    MaintenanceTask("OPTIMIZE",
                    "OPTIMIZE table ZORDER BY (col1, col2)",
                    "Daily, after the batch ingest",
                    "Compacts small files; cuts read time by 50-80%",
                    "Resource-heavy; run during off-peak hours"),
    MaintenanceTask("VACUUM",
                    "VACUUM table RETAIN 168 HOURS",
                    "Weekly",
                    "Removes old files, reduces storage cost",
                    "Keep retention at 7 days or more, or time travel to older versions stops working"),
    MaintenanceTask("ANALYZE TABLE",
                    "ANALYZE TABLE table COMPUTE STATISTICS",
                    "After a large ingest",
                    "Updates statistics for the query optimizer",
                    "Takes a long time on large tables"),
    MaintenanceTask("Schema Check",
                    "DESCRIBE TABLE table",
                    "After schema evolution",
                    "Verifies the schema is correct",
                    "Schema evolution must stay compatible"),
    MaintenanceTask("Data Quality",
                    "ALTER TABLE ADD CONSTRAINT",
                    "When the table is created",
                    "Blocks bad data from entering the table",
                    "Constraints slow writes slightly"),
]

print("=== Maintenance Tasks ===")
for t in tasks:
    print(f"  [{t.task}] {t.command}")
    print(f"    Schedule: {t.schedule}")
    print(f"    Benefit: {t.benefit}")
    print(f"    Warning: {t.warning}")
Tips
- SSD: Use an SSD for MinIO and Spark shuffle; it is far faster than an HDD.
- ZORDER: Z-order on the columns you filter by most often.
- Partition: Partition by date for time-series data (see the sketch below).
- VACUUM: Schedule VACUUM weekly to keep storage in check.
- Docker: Docker Compose is the easiest way to start a home lab.
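A minimal sketch of the partitioning tip, assuming the silver_df DataFrame from the pipeline code above:

# Date-partitioned Delta write: queries that filter on event_date only
# scan the matching partition directories
silver_df.write.format("delta") \
    .mode("overwrite") \
    .partitionBy("event_date") \
    .save("s3a://datalake/silver/events")

# Partition pruning: only the 2024-01-01 partition is read (example date)
spark.read.format("delta") \
    .load("s3a://datalake/silver/events") \
    .where("event_date = '2024-01-01'") \
    .show()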
What is Delta Lake?
An open-source storage layer that turns a data lake into a lakehouse: Parquet files plus a transaction log provide ACID transactions, time travel, schema enforcement, and MERGE/OPTIMIZE/VACUUM, and it runs on Spark and Databricks.
What does the home lab need?
4 cores, 16 GB RAM, a 256 GB SSD, Ubuntu with Java and Python, plus Spark, MinIO, Docker, Jupyter, and a metastore; run it as a single node or as a Docker Compose cluster.
How is it installed?
Bring up the Spark master, workers, MinIO, and Jupyter with Docker Compose, add the delta-spark package and the S3A endpoint settings to spark-defaults.conf, and create the datalake bucket with mc mb.
How is the pipeline built?
Follow the medallion architecture: Bronze for raw ingest, Silver for cleaning, Gold for aggregates; use MERGE for upserts and time travel for audits, ingest via streaming or batch, maintain with OPTIMIZE, ZORDER, and VACUUM, and schedule it all with Airflow.
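A minimal Airflow sketch for that schedule, running the three layers in order once a day; it assumes Airflow 2.4+ and hypothetical scripts bronze_ingest.py, silver_clean.py, and gold_aggregate.py submitted with spark-submit:

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

# Assumption: the pipeline steps live in these standalone PySpark scripts
SUBMIT = "spark-submit --master spark://spark-master:7077"

with DAG(
    dag_id="delta_medallion_daily",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    bronze = BashOperator(task_id="bronze", bash_command=f"{SUBMIT} bronze_ingest.py")
    silver = BashOperator(task_id="silver", bash_command=f"{SUBMIT} silver_clean.py")
    gold = BashOperator(task_id="gold", bash_command=f"{SUBMIT} gold_aggregate.py")

    bronze >> silver >> gold  # run the layers strictly in order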
Summary
A Delta Lake home lab built on Spark, MinIO, and Docker Compose runs a Bronze/Silver/Gold medallion pipeline with ACID guarantees, time travel, and OPTIMIZE/VACUUM maintenance, mirroring a production lakehouse setup.
