SiamCafe · Blog
Delta Lake Home Lab Setup — สร้าง Data Lakehouse
บทความ

Delta Lake Home Lab Setup — สร้าง Data Lakehouse

เผยแพร่ 28 พฤษภาคม 2569

Delta Lake Home Lab

Delta Lake Home Lab Setup Data Lakehouse Spark MinIO ACID Transactions Time Travel Schema Enforcement Pipeline Bronze Silver Gold

| Component | Tool | Resource | Purpose |
|---|---|---|---|
| Compute | Apache Spark 3.x | 4 Core 8GB RAM | Data Processing Engine |
| Storage | MinIO (S3-compatible) | SSD 100GB+ | Object Storage for Delta Tables |
| Notebook | Jupyter + PySpark | 2GB RAM | Interactive Analysis |
| Metastore | Hive Metastore / Unity Catalog | 1GB RAM + PostgreSQL | Table Metadata Management |
| Orchestration | Airflow / Cron | 1GB RAM | Pipeline Scheduling |

Docker Compose Setup

# === Delta Lake Home Lab Docker Compose ===
# docker-compose.yml
version: '3.8'

services:
  spark-master:
    image: bitnami/spark:3.5
    environment:
      - SPARK_MODE=master
      - SPARK_MASTER_HOST=spark-master
    ports:
      - "8080:8080"  # Spark UI
      - "7077:7077"  # Spark Master
    volumes:
      - ./spark-defaults.conf:/opt/bitnami/spark/conf/spark-defaults.conf

  spark-worker-1:
    image: bitnami/spark:3.5
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark-master:7077
      - SPARK_WORKER_MEMORY=4G
      - SPARK_WORKER_CORES=2

  minio:
    image: minio/minio
    command: server /data --console-address ":9001"
    ports:
      - "9000:9000"  # S3 API
      - "9001:9001"  # Console
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin
    volumes:
      - minio-data:/data

  jupyter:
    image: jupyter/pyspark-notebook
    ports:
      - "8888:8888"
    environment:
      - SPARK_MASTER=spark://spark-master:7077
    volumes:
      - ./notebooks:/home/jovyan/work

# The named volume used by the minio service must be declared at the top
# level, otherwise Compose rejects the file with an "undefined volume" error.
volumes:
  minio-data:

# spark-defaults.conf
# Delta Lake + S3A (MinIO) settings for the Spark containers.
# Maven coordinates are comma-separated with no whitespace: a space after the
# comma would become part of the next coordinate's group ID.
spark.jars.packages io.delta:delta-spark_2.12:3.0.0,org.apache.hadoop:hadoop-aws:3.3.4
spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension
spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog
spark.hadoop.fs.s3a.endpoint http://minio:9000
spark.hadoop.fs.s3a.access.key minioadmin
spark.hadoop.fs.s3a.secret.key minioadmin
spark.hadoop.fs.s3a.path.style.access true
spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem

from dataclasses import dataclass

@dataclass
class LabComponent:
    """Describe one service of the home-lab stack.

    All fields are free-form display strings; nothing is parsed from them.
    """

    component: str  # display name of the service
    image: str  # container image reference
    port: str  # exposed port(s), as descriptive text
    resource: str  # suggested CPU/RAM allocation, as descriptive text
    config_tip: str  # short configuration hint shown to the reader

# Catalogue of the containers that make up the lab, in display order.
components = [
    LabComponent(
        "Spark Master",
        "bitnami/spark:3.5",
        "8080 (UI) 7077 (Master)",
        "2 Core 2GB RAM",
        "ตั้ง SPARK_DAEMON_MEMORY=1g สำหรับ Home Lab",
    ),
    LabComponent(
        "Spark Worker",
        "bitnami/spark:3.5",
        "8081 (Worker UI)",
        "2 Core 4GB RAM ต่อ Worker",
        "เริ่ม 1-2 Workers ตามเครื่อง",
    ),
    LabComponent(
        "MinIO",
        "minio/minio",
        "9000 (S3) 9001 (Console)",
        "1 Core 1GB RAM SSD",
        "ใช้ SSD ไม่ใช่ HDD สำหรับ Performance",
    ),
    LabComponent(
        "Jupyter",
        "jupyter/pyspark-notebook",
        "8888",
        "1 Core 2GB RAM",
        "Install delta-spark ใน Notebook",
    ),
]

# Print a three-line summary for every component.
print("=== Lab Components ===")
for c in components:
    print(f" [{c.component}] Image: {c.image}")
    print(f" Port: {c.port} | Resource: {c.resource}")
    print(f" Tip: {c.config_tip}")

Data Pipeline (Medallion)

# === Medallion Architecture Pipeline ===
from pyspark.sql import SparkSession
from pyspark.sql import functions as F  # needed for F.to_date / F.count / F.countDistinct
from delta.tables import DeltaTable

# Only the Delta package is configured here. The Delta SQL extension, the
# Delta catalog and the S3A/MinIO settings are not set on this builder and
# must come from spark-defaults.conf (or additional .config() calls).
spark = (
    SparkSession.builder
    .appName("DeltaLakePipeline")
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.0.0")
    .getOrCreate()
)

# Bronze: Raw Ingest (append-only copy of the raw JSON events)
raw_df = spark.read.json("s3a://datalake/raw/events/")
raw_df.write.format("delta").mode("append").save("s3a://datalake/bronze/events")

# Silver: Clean + Transform
bronze_df = spark.read.format("delta").load("s3a://datalake/bronze/events")
silver_df = (
    bronze_df
    .dropna(subset=["user_id", "event_type"])
    .dropDuplicates(["event_id"])
    .withColumn("event_date", F.to_date("timestamp"))
)
silver_df.write.format("delta").mode("overwrite").save("s3a://datalake/silver/events")

# Gold: Aggregate (re-read Silver from storage, then summarise per day and type)
silver_df = spark.read.format("delta").load("s3a://datalake/silver/events")
gold_df = silver_df.groupBy("event_date", "event_type").agg(
    F.count("*").alias("event_count"),
    F.countDistinct("user_id").alias("unique_users"),
)
gold_df.write.format("delta").mode("overwrite").save("s3a://datalake/gold/daily_metrics")

# Time Travel: read the Silver table as of its first committed version
df_v0 = (
    spark.read.format("delta")
    .option("versionAsOf", 0)
    .load("s3a://datalake/silver/events")
)

# MERGE (Upsert)
# NOTE(review): `new_data` is not defined anywhere in this snippet. Supply a
# DataFrame of incoming user rows (it must have a `user_id` column) before
# running the merge below.
delta_table = DeltaTable.forPath(spark, "s3a://datalake/silver/users")
(
    delta_table.alias("target")
    .merge(new_data.alias("source"), "target.user_id = source.user_id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)

@dataclass
class PipelineLayer:
    """Describe one layer of the medallion pipeline.

    All fields are free-form display strings; nothing is parsed from them.
    """

    layer: str  # layer name, e.g. "Bronze (Raw)"
    data: str  # kind of data held in the layer
    operations: str  # transformations applied when loading the layer
    schedule: str  # how often the layer is refreshed
    retention: str  # how long the data is kept

# The three medallion layers, from rawest to most aggregated.
layers = [
    PipelineLayer(
        "Bronze (Raw)",
        "Raw Data as-is จาก Source",
        "Ingest Append Only ไม่แปลง",
        "Real-time (Streaming) หรือ ทุก 15 นาที",
        "90 วัน (VACUUM)",
    ),
    PipelineLayer(
        "Silver (Cleaned)",
        "Cleaned Validated Deduplicated",
        "dropna dropDuplicates Cast Schema Enforce",
        "ทุก 1 ชั่วโมง หรือ Triggered",
        "1 ปี",
    ),
    PipelineLayer(
        "Gold (Aggregated)",
        "Business Metrics KPI Summary",
        "groupBy agg join Materialized View",
        "ทุกวัน หรือ ทุก 1 ชั่วโมง",
        "ถาวร",
    ),
]

# Print a four-line summary for every layer.
print("=== Pipeline Layers ===")
for layer in layers:  # `layer` instead of the easily misread single-letter `l`
    print(f" [{layer.layer}] Data: {layer.data}")
    print(f" Ops: {layer.operations}")
    print(f" Schedule: {layer.schedule}")
    print(f" Retention: {layer.retention}")

Optimization & Maintenance

# === Delta Lake Maintenance ===

# # OPTIMIZE - Compact small files
# spark.sql("OPTIMIZE delta.`s3a://datalake/silver/events`")
#
# # OPTIMIZE with ZORDER
# spark.sql("""
#   OPTIMIZE delta.`s3a://datalake/silver/events`
#   ZORDER BY (event_date, user_id)
# """)
#
# # VACUUM - Remove old files
# spark.sql("VACUUM delta.`s3a://datalake/bronze/events` RETAIN 168 HOURS")
#
# # DESCRIBE HISTORY
# spark.sql("DESCRIBE HISTORY delta.`s3a://datalake/silver/events`").show()

@dataclass
class MaintenanceTask:
    """A single Delta Lake maintenance job as shown in the task report.

    Every field is a plain display string.
    """

    task: str      # short task name
    command: str   # command text to run for the task
    schedule: str  # when the task should be run
    benefit: str   # what running the task achieves
    warning: str   # caveat to keep in mind

# Maintenance checklist entries, listed in the order they are printed.
tasks = [
    MaintenanceTask(
        task="OPTIMIZE",
        command="OPTIMIZE table ZORDER BY (col1, col2)",
        schedule="ทุกวัน หลัง Batch Ingest",
        benefit="Compact Small Files ลด Read Time 50-80%",
        warning="ใช้ Resource มาก รันตอน Off-peak",
    ),
    MaintenanceTask(
        task="VACUUM",
        command="VACUUM table RETAIN 168 HOURS",
        schedule="ทุกสัปดาห์",
        benefit="ลบไฟล์เก่า ลด Storage Cost",
        warning="ต้อง > 7 วัน ไม่งั้น Time Travel ใช้ไม่ได้",
    ),
    MaintenanceTask(
        task="ANALYZE TABLE",
        command="ANALYZE TABLE table COMPUTE STATISTICS",
        schedule="หลัง Large Ingest",
        benefit="อัพเดท Statistics สำหรับ Query Optimizer",
        warning="ใช้เวลานานสำหรับ Table ใหญ่",
    ),
    MaintenanceTask(
        task="Schema Check",
        command="DESCRIBE TABLE table",
        schedule="หลัง Schema Evolution",
        benefit="ตรวจ Schema ถูกต้อง",
        warning="Schema Evolution ต้อง Compatible",
    ),
    MaintenanceTask(
        task="Data Quality",
        command="ALTER TABLE ADD CONSTRAINT",
        schedule="เมื่อสร้าง Table",
        benefit="ป้องกัน Bad Data เข้า Table",
        warning="Constraint ทำ Write ช้าลงเล็กน้อย",
    ),
]

print("=== Maintenance Tasks ===")
for t in tasks:
    # One header line followed by three indented detail lines per task.
    report_lines = (
        f"  [{t.task}] {t.command}",
        f"    Schedule: {t.schedule}",
        f"    Benefit: {t.benefit}",
        f"    Warning: {t.warning}",
    )
    print("\n".join(report_lines))

เคล็ดลับ

  • SSD: ใช้ SSD สำหรับ MinIO + Spark Shuffle ดีกว่า HDD มาก
  • ZORDER: ใช้ ZORDER กับ Column ที่ Filter บ่อย
  • Partition: Partition ตาม Date สำหรับ Time Series Data
  • VACUUM: ตั้ง VACUUM ทุกสัปดาห์ ลด Storage
  • Docker: เริ่ม Docker Compose ง่ายที่สุดสำหรับ Home Lab

Delta Lake คืออะไร

Open Source Storage Layer Lakehouse ACID Parquet Transaction Log Time Travel Schema Enforcement MERGE OPTIMIZE VACUUM Spark Databricks