Delta Lake Home Lab Setup — สร้าง Data Lakehouse
Delta Lake Home Lab
Delta Lake Home Lab Setup Data Lakehouse Spark MinIO ACID Transactions Time Travel Schema Enforcement Pipeline Bronze Silver Gold
| Component | Tool | Resource | Purpose |
|---|---|---|---|
| Compute | Apache Spark 3.x | 4 Core 8GB RAM | Data Processing Engine |
| Storage | MinIO (S3-compatible) | SSD 100GB+ | Object Storage for Delta Tables |
| Notebook | Jupyter + PySpark | 2GB RAM | Interactive Analysis |
| Metastore | Hive Metastore / Unity Catalog | 1GB RAM + PostgreSQL | Table Metadata Management |
| Orchestration | Airflow / Cron | 1GB RAM | Pipeline Scheduling |
Docker Compose Setup
# === Delta Lake Home Lab Docker Compose ===
# docker-compose.yml
# Home-lab stack: one Spark master, one Spark worker, MinIO (S3-compatible
# object storage) and a Jupyter/PySpark notebook server.
version: '3.8'

services:
  spark-master:
    image: bitnami/spark:3.5
    environment:
      - SPARK_MODE=master
      - SPARK_MASTER_HOST=spark-master
    ports:
      - "8080:8080"  # Spark UI
      - "7077:7077"  # Spark Master
    volumes:
      - ./spark-defaults.conf:/opt/bitnami/spark/conf/spark-defaults.conf

  spark-worker-1:
    image: bitnami/spark:3.5
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
      - SPARK_WORKER_MEMORY=4G
      - SPARK_WORKER_CORES=2

  minio:
    image: minio/minio
    command: server /data --console-address ":9001"
    ports:
      - "9000:9000"  # S3 API
      - "9001:9001"  # Console
    environment:
      # Default credentials; fine for a local lab, change them anywhere else.
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
    volumes:
      - minio-data:/data

  jupyter:
    image: jupyter/pyspark-notebook
    ports:
      - "8888:8888"
    environment:
      - SPARK_MASTER=spark://spark-master:7077
    volumes:
      - ./notebooks:/home/jovyan/work

# Named volumes referenced by a service must be declared here, otherwise
# Compose rejects the file with an "undefined volume" error.
volumes:
  minio-data:
# spark-defaults.conf
# Maven packages fetched at startup: Delta Lake and the S3A connector.
# Comma-separated with no whitespace, so no stray space ends up in a coordinate.
spark.jars.packages io.delta:delta-spark_2.12:3.0.0,org.apache.hadoop:hadoop-aws:3.3.4
# Enable Delta SQL commands and the Delta-aware session catalog.
spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension
spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog
# Point the S3A filesystem at the MinIO service defined in docker-compose.
spark.hadoop.fs.s3a.endpoint http://minio:9000
spark.hadoop.fs.s3a.access.key minioadmin
spark.hadoop.fs.s3a.secret.key minioadmin
# Use path-style URLs (endpoint/bucket) instead of bucket subdomains.
spark.hadoop.fs.s3a.path.style.access true
spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
from dataclasses import dataclass
@dataclass
class LabComponent:
    """One service of the home-lab stack and how to run it."""

    component: str  # display name of the service
    image: str  # container image reference
    port: str  # exposed port(s), free-form text
    resource: str  # suggested CPU/RAM allocation
    config_tip: str  # short configuration hint shown to the reader


# Catalogue of the services in the lab, in the order they are printed.
components = [
    LabComponent(
        component="Spark Master",
        image="bitnami/spark:3.5",
        port="8080 (UI) 7077 (Master)",
        resource="2 Core 2GB RAM",
        config_tip="ตั้ง SPARK_DAEMON_MEMORY=1g สำหรับ Home Lab",
    ),
    LabComponent(
        component="Spark Worker",
        image="bitnami/spark:3.5",
        port="8081 (Worker UI)",
        resource="2 Core 4GB RAM ต่อ Worker",
        config_tip="เริ่ม 1-2 Workers ตามเครื่อง",
    ),
    LabComponent(
        component="MinIO",
        image="minio/minio",
        port="9000 (S3) 9001 (Console)",
        resource="1 Core 1GB RAM SSD",
        config_tip="ใช้ SSD ไม่ใช่ HDD สำหรับ Performance",
    ),
    LabComponent(
        component="Jupyter",
        image="jupyter/pyspark-notebook",
        port="8888",
        resource="1 Core 2GB RAM",
        config_tip="Install delta-spark ใน Notebook",
    ),
]

# Print a three-line summary for every component.
print("=== Lab Components ===")
for c in components:
    print(f" [{c.component}] Image: {c.image}")
    print(f" Port: {c.port} | Resource: {c.resource}")
    print(f" Tip: {c.config_tip}")
Data Pipeline (Medallion)
# === Medallion Architecture Pipeline ===
from pyspark.sql import SparkSession
from pyspark.sql import functions as F  # provides F.to_date / F.count / F.countDistinct
from delta.tables import DeltaTable

# Build the session with everything Delta-on-S3A needs. Setting
# spark.jars.packages here replaces any value from spark-defaults.conf, so the
# S3A connector has to be listed next to delta-spark or s3a:// paths will fail.
spark = (
    SparkSession.builder
    .appName("DeltaLakePipeline")
    .config(
        "spark.jars.packages",
        "io.delta:delta-spark_2.12:3.0.0,org.apache.hadoop:hadoop-aws:3.3.4",
    )
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .getOrCreate()
)

# Bronze: Raw Ingest (append-only copy of the source JSON)
raw_df = spark.read.json("s3a://datalake/raw/events/")
(
    raw_df.write.format("delta")
    .mode("append")
    .save("s3a://datalake/bronze/events")
)

# Silver: Clean + Transform
bronze_df = spark.read.format("delta").load("s3a://datalake/bronze/events")
silver_df = (
    bronze_df
    .dropna(subset=["user_id", "event_type"])
    .dropDuplicates(["event_id"])
    .withColumn("event_date", F.to_date("timestamp"))
)
(
    silver_df.write.format("delta")
    .mode("overwrite")
    .save("s3a://datalake/silver/events")
)

# Gold: Aggregate (re-read Silver from storage so Gold reflects the saved table)
silver_df = spark.read.format("delta").load("s3a://datalake/silver/events")
gold_df = silver_df.groupBy("event_date", "event_type").agg(
    F.count("*").alias("event_count"),
    F.countDistinct("user_id").alias("unique_users"),
)
(
    gold_df.write.format("delta")
    .mode("overwrite")
    .save("s3a://datalake/gold/daily_metrics")
)

# Time Travel: read the Silver table as of its first committed version
df_v0 = (
    spark.read.format("delta")
    .option("versionAsOf", 0)
    .load("s3a://datalake/silver/events")
)

# MERGE (Upsert)
# NOTE(review): the original snippet used `new_data` without defining it.
# The path below is a placeholder for the incoming batch of user records;
# replace it with the real source before running.
new_data = spark.read.json("s3a://datalake/raw/users/")
delta_table = DeltaTable.forPath(spark, "s3a://datalake/silver/users")
(
    delta_table.alias("target")
    .merge(new_data.alias("source"), "target.user_id = source.user_id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)
@dataclass
class PipelineLayer:
    """One layer of the medallion pipeline and how it is operated."""

    layer: str  # layer name, e.g. "Bronze (Raw)"
    data: str  # what kind of data the layer holds
    operations: str  # transformations applied when writing the layer
    schedule: str  # how often the layer is refreshed
    retention: str  # how long the layer's data is kept


# The three medallion layers, from rawest to most aggregated.
layers = [
    PipelineLayer(
        layer="Bronze (Raw)",
        data="Raw Data as-is จาก Source",
        operations="Ingest Append Only ไม่แปลง",
        schedule="Real-time (Streaming) หรือ ทุก 15 นาที",
        retention="90 วัน (VACUUM)",
    ),
    PipelineLayer(
        layer="Silver (Cleaned)",
        data="Cleaned Validated Deduplicated",
        operations="dropna dropDuplicates Cast Schema Enforce",
        schedule="ทุก 1 ชั่วโมง หรือ Triggered",
        retention="1 ปี",
    ),
    PipelineLayer(
        layer="Gold (Aggregated)",
        data="Business Metrics KPI Summary",
        operations="groupBy agg join Materialized View",
        schedule="ทุกวัน หรือ ทุก 1 ชั่วโมง",
        retention="ถาวร",
    ),
]

# Print a four-line summary for every layer.
print("=== Pipeline Layers ===")
for entry in layers:  # avoid the easily-misread single-letter name "l"
    print(f" [{entry.layer}] Data: {entry.data}")
    print(f" Ops: {entry.operations}")
    print(f" Schedule: {entry.schedule}")
    print(f" Retention: {entry.retention}")
Optimization & Maintenance
# === Delta Lake Maintenance ===
# # OPTIMIZE - Compact small files
# spark.sql("OPTIMIZE delta.`s3a://datalake/silver/events`")
#
# # OPTIMIZE with ZORDER
# spark.sql("""
# OPTIMIZE delta.`s3a://datalake/silver/events`
# ZORDER BY (event_date, user_id)
# """)
#
# # VACUUM - Remove old files
# spark.sql("VACUUM delta.`s3a://datalake/bronze/events` RETAIN 168 HOURS")
#
# # DESCRIBE HISTORY
# spark.sql("DESCRIBE HISTORY delta.`s3a://datalake/silver/events`").show()
@dataclass
class MaintenanceTask:
    """A recurring Delta Lake maintenance job and its trade-offs."""

    task: str  # short task name
    command: str  # SQL command template for the task
    schedule: str  # recommended cadence
    benefit: str  # what running the task gains
    warning: str  # caveat to keep in mind when running it


# Maintenance checklist, printed in this order.
tasks = [
    MaintenanceTask(
        task="OPTIMIZE",
        command="OPTIMIZE table ZORDER BY (col1, col2)",
        schedule="ทุกวัน หลัง Batch Ingest",
        benefit="Compact Small Files ลด Read Time 50-80%",
        warning="ใช้ Resource มาก รันตอน Off-peak",
    ),
    MaintenanceTask(
        task="VACUUM",
        command="VACUUM table RETAIN 168 HOURS",
        schedule="ทุกสัปดาห์",
        benefit="ลบไฟล์เก่า ลด Storage Cost",
        warning="ต้อง > 7 วัน ไม่งั้น Time Travel ใช้ไม่ได้",
    ),
    MaintenanceTask(
        task="ANALYZE TABLE",
        command="ANALYZE TABLE table COMPUTE STATISTICS",
        schedule="หลัง Large Ingest",
        benefit="อัพเดท Statistics สำหรับ Query Optimizer",
        warning="ใช้เวลานานสำหรับ Table ใหญ่",
    ),
    MaintenanceTask(
        task="Schema Check",
        command="DESCRIBE TABLE table",
        schedule="หลัง Schema Evolution",
        benefit="ตรวจ Schema ถูกต้อง",
        warning="Schema Evolution ต้อง Compatible",
    ),
    MaintenanceTask(
        task="Data Quality",
        command="ALTER TABLE ADD CONSTRAINT",
        schedule="เมื่อสร้าง Table",
        benefit="ป้องกัน Bad Data เข้า Table",
        warning="Constraint ทำ Write ช้าลงเล็กน้อย",
    ),
]

# Print a four-line summary for every task.
print("=== Maintenance Tasks ===")
for t in tasks:
    print(f" [{t.task}] {t.command}")
    print(f" Schedule: {t.schedule}")
    print(f" Benefit: {t.benefit}")
    print(f" Warning: {t.warning}")
เคล็ดลับ
- SSD: ใช้ SSD สำหรับ MinIO + Spark Shuffle ดีกว่า HDD มาก
- ZORDER: ใช้ ZORDER กับ Column ที่ Filter บ่อย
- Partition: Partition ตาม Date สำหรับ Time Series Data
- VACUUM: ตั้ง VACUUM ทุกสัปดาห์ ลด Storage
- Docker: เริ่มต้นด้วย Docker Compose ง่ายที่สุดสำหรับ Home Lab
Delta Lake คืออะไร
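Delta Lake คือ Open Source Storage Layer สำหรับ Lakehouse ที่เก็บข้อมูลเป็นไฟล์ Parquet พร้อม Transaction Log จึงรองรับ ACID Transactions, Time Travel, Schema Enforcement และคำสั่ง MERGE, OPTIMIZE, VACUUM ใช้งานได้กับ Spark และ Databricks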