Data Lakehouse Architecture
Data Lakehouse รวมข้อดีของ Data Lake กับ Data Warehouse ใช้ Open Table Formats เช่น Delta Lake เก็บข้อมูลบน Object Storage มี ACID Transactions, Schema Enforcement และ SQL Query Performance
SRE Practices ช่วยให้ Data Lakehouse เสถียรและน่าเชื่อถือ กำหนด SLO สำหรับ Data Freshness, Quality และ Availability วัดด้วย SLI ใช้ Error Budget ตัดสินใจ
Data Lakehouse ด้วย Delta Lake
# lakehouse_setup.py — Data Lakehouse ด้วย Delta Lake + PySpark
# pip install pyspark delta-spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
col, current_timestamp, lit, when, count,
sum as spark_sum, avg, max as spark_max,
)
from pyspark.sql.types import *
from delta import configure_spark_with_delta_pip
from datetime import datetime, timedelta
# --- Spark session wired for Delta Lake ---
# Register the Delta SQL extension/catalog and allow automatic schema merges.
_delta_conf = {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    "spark.databricks.delta.schema.autoMerge.enabled": "true",
}
builder = SparkSession.builder.appName("DataLakehouse")
for _key, _value in _delta_conf.items():
    builder = builder.config(_key, _value)
spark = configure_spark_with_delta_pip(builder).getOrCreate()
# === Bronze Layer (Raw Data) ===
def ingest_to_bronze(source_path, table_name):
    """Land raw JSON records in the Bronze layer, tagged with ingestion metadata.

    Appends to s3://lakehouse/bronze/<table_name> with schema merging enabled,
    then prints the ingested row count.
    """
    raw = (
        spark.read.format("json").load(source_path)
        .withColumn("_ingested_at", current_timestamp())
        .withColumn("_source", lit(source_path))
    )
    (
        raw.write.format("delta")
        .mode("append")
        .option("mergeSchema", "true")
        .save(f"s3://lakehouse/bronze/{table_name}")
    )
    # NOTE: count() re-evaluates the lazy read after the write.
    print(f"Bronze: {table_name} — {raw.count()} rows ingested")
# === Silver Layer (Cleaned Data) ===
def transform_to_silver(bronze_table, silver_table):
    """Apply data-quality rules to a Bronze table and upsert into Silver.

    Rules: non-null id, plausible email format, de-duplicated by id.
    Existing Silver tables are merged on id; new ones are written outright.
    """
    from delta.tables import DeltaTable

    source_df = spark.read.format("delta").load(f"s3://lakehouse/bronze/{bronze_table}")
    # Quality gates before promotion to Silver.
    cleaned = (
        source_df
        .filter(col("id").isNotNull())
        .filter(col("email").rlike(r"^[a-zA-Z0-9+_.-]+@[a-zA-Z0-9.-]+$"))
        .dropDuplicates(["id"])
        .withColumn("_cleaned_at", current_timestamp())
    )
    destination = f"s3://lakehouse/silver/{silver_table}"
    if not DeltaTable.isDeltaTable(spark, destination):
        # First run: materialize the Silver table directly.
        cleaned.write.format("delta").save(destination)
    else:
        # Subsequent runs: merge on id so reruns stay idempotent.
        (
            DeltaTable.forPath(spark, destination).alias("target")
            .merge(cleaned.alias("source"), "target.id = source.id")
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
        )
    print(f"Silver: {silver_table} — {cleaned.count()} rows")
# === Gold Layer (Business Aggregations) ===
def aggregate_to_gold():
    """Build Gold-layer business aggregates from the Silver orders table.

    Writes two Gold tables (both overwritten each run):
      - daily_revenue: revenue/count/average per order_date
      - user_segments: per-user lifetime stats bucketed into spend segments
    """
    # FIX: the original also loaded s3://lakehouse/silver/users into an unused
    # `users` variable — a dead read of the table. Removed.
    orders = spark.read.format("delta").load("s3://lakehouse/silver/orders")

    # Revenue rollup per calendar day.
    daily_revenue = orders \
        .groupBy("order_date") \
        .agg(
            spark_sum("amount").alias("total_revenue"),
            count("*").alias("order_count"),
            avg("amount").alias("avg_order_value"),
        )
    daily_revenue.write.format("delta") \
        .mode("overwrite") \
        .save("s3://lakehouse/gold/daily_revenue")

    # Per-user lifetime stats, bucketed by total spend.
    user_segments = orders \
        .groupBy("user_id") \
        .agg(
            spark_sum("amount").alias("total_spent"),
            count("*").alias("order_count"),
            spark_max("order_date").alias("last_order"),
        ) \
        .withColumn("segment", when(col("total_spent") > 10000, "VIP")
                    .when(col("total_spent") > 5000, "Premium")
                    .otherwise("Standard"))
    user_segments.write.format("delta") \
        .mode("overwrite") \
        .save("s3://lakehouse/gold/user_segments")
    print("Gold Layer aggregations complete")
# === Time Travel ===
def query_historical(table_path, version=None, timestamp=None):
    """Read a Delta table as of a past state (time travel).

    `version` takes precedence over `timestamp`; with neither set the
    current snapshot is returned.
    """
    reader = spark.read.format("delta")
    if version is not None:
        return reader.option("versionAsOf", version).load(table_path)
    if timestamp is not None:
        return reader.option("timestampAsOf", timestamp).load(table_path)
    return reader.load(table_path)
# ตัวอย่าง
# ingest_to_bronze("/data/users/*.json", "users")
# transform_to_silver("users", "users")
# aggregate_to_gold()
# old_data = query_historical("s3://lakehouse/silver/users", version=5)
SRE Monitoring สำหรับ Data Lakehouse
# lakehouse_sre.py — SRE Monitoring สำหรับ Data Pipelines
import time
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from enum import Enum
class SLOStatus(Enum):
    """Health bucket of an SLI relative to its SLO target (see SLI.status)."""
    HEALTHY = "healthy"    # at or above target
    WARNING = "warning"    # within 5% of target
    CRITICAL = "critical"  # within 10% of target
    BREACHED = "breached"  # more than 10% below target
@dataclass
class SLI:
    """A Service Level Indicator compared against its SLO target.

    By default a larger `current_value` is better (availability, quality).
    For latency-style indicators — where staying *under* the target is the
    goal — pass `higher_is_better=False`; otherwise a healthy 4.2s against a
    5.0s target would incorrectly report as BREACHED.
    """
    name: str
    description: str
    current_value: float
    target: float
    unit: str = "%"
    # Backward-compatible addition: the default preserves original behavior.
    higher_is_better: bool = True

    @property
    def status(self):
        """Bucket the SLI into an SLOStatus by its distance from target.

        ratio >= 1.0 -> HEALTHY, >= 0.95 -> WARNING, >= 0.90 -> CRITICAL,
        otherwise BREACHED.
        """
        if self.higher_is_better:
            if self.target <= 0:
                # Degenerate target: nothing to miss; avoid ZeroDivisionError.
                return SLOStatus.HEALTHY
            ratio = self.current_value / self.target
        else:
            if self.current_value <= 0:
                # Zero (or negative) measurement always meets a latency target.
                return SLOStatus.HEALTHY
            # Invert the ratio so "below target" maps to ratio >= 1.0.
            ratio = self.target / self.current_value
        if ratio >= 1.0:
            return SLOStatus.HEALTHY
        if ratio >= 0.95:
            return SLOStatus.WARNING
        if ratio >= 0.90:
            return SLOStatus.CRITICAL
        return SLOStatus.BREACHED
@dataclass
class ErrorBudget:
    """Tracks how much unreliability budget remains in a rolling window."""
    slo_target: float        # e.g. 99.5 (percent)
    window_days: int = 30
    total_minutes: int = 0   # minutes in the observation window
    error_minutes: int = 0   # minutes spent out of SLO

    @property
    def budget_total(self):
        """Total allowed bad minutes: the (1 - SLO) share of the window."""
        allowed_fraction = 1 - self.slo_target / 100
        return self.total_minutes * allowed_fraction

    @property
    def budget_remaining(self):
        """Unspent bad minutes, floored at zero."""
        leftover = self.budget_total - self.error_minutes
        return leftover if leftover > 0 else 0

    @property
    def budget_pct(self):
        """Remaining budget as a percentage of the total (0 when no budget)."""
        total = self.budget_total
        if total == 0:
            return 0
        return self.budget_remaining / total * 100
class DataLakehouseSRE:
    """SRE dashboard for a Data Lakehouse: SLIs, error budgets, incidents."""

    def __init__(self):
        self.slis: List[SLI] = []
        self.error_budgets: Dict[str, ErrorBudget] = {}
        self.incidents: List[Dict] = []

    def add_sli(self, sli: SLI):
        """Register an SLI for dashboard display."""
        self.slis.append(sli)

    def add_error_budget(self, name: str, budget: ErrorBudget):
        """Register a named error budget."""
        self.error_budgets[name] = budget

    def record_incident(self, title, severity, duration_min, impact):
        """Append an incident record stamped with the current time."""
        entry = {
            "title": title,
            "severity": severity,
            "duration_min": duration_min,
            "impact": impact,
            "timestamp": datetime.now().isoformat(),
        }
        self.incidents.append(entry)

    def dashboard(self):
        """Print the SRE dashboard: SLIs, error budgets, recent incidents."""
        rule = "=" * 60
        print(f"\n{rule}")
        print(f"Data Lakehouse SRE Dashboard")
        print(f"{rule}")

        # SLI section: status label plus a 20-char proportional bar.
        print(f"\n Service Level Indicators:")
        for sli in self.slis:
            label = sli.status.value.upper()
            fill = "█" * int(sli.current_value / sli.target * 20)
            print(f" [{label:>8}] {sli.name:<30} "
                  f"{sli.current_value:.1f}/{sli.target:.1f}{sli.unit} {fill}")

        # Error-budget section: OK above 20% remaining, WARN above 0%.
        print(f"\n Error Budgets (30-day window):")
        for name, budget in self.error_budgets.items():
            remaining_pct = budget.budget_pct
            if remaining_pct > 20:
                label = "OK"
            elif remaining_pct > 0:
                label = "WARN"
            else:
                label = "EXHAUSTED"
            print(f" [{label:>9}] {name:<25} "
                  f"Remaining: {budget.budget_remaining:.0f}/{budget.budget_total:.0f} min "
                  f"({remaining_pct:.0f}%)")

        # Incident section: last five only.
        if self.incidents:
            print(f"\n Recent Incidents:")
            for inc in self.incidents[-5:]:
                print(f" [{inc['severity']:>8}] {inc['title']} "
                      f"({inc['duration_min']} min)")

    def should_freeze_releases(self):
        """Return True (announcing the first culprit) when any budget is spent."""
        for name, budget in self.error_budgets.items():
            if budget.budget_pct > 0:
                continue
            print(f"\n RELEASE FREEZE: {name} error budget exhausted!")
            return True
        return False
# === Example usage ===
sre = DataLakehouseSRE()
# SLIs: current measured value vs. SLO target.
sre.add_sli(SLI("Data Freshness", "Data พร้อมใช้ภายใน 15 นาที", 99.2, 99.5, "%"))
sre.add_sli(SLI("Data Quality", "Records ผ่าน Validation", 99.95, 99.9, "%"))
sre.add_sli(SLI("Pipeline Availability", "ไม่มี Failed Jobs", 99.8, 99.5, "%"))
# NOTE(review): latency is lower-is-better, but SLI.status treats larger
# values as better, so 4.2s against a 5.0s target renders as BREACHED here
# even though it meets the SLO — confirm intent.
sre.add_sli(SLI("Query Latency P95", "Query ต่ำกว่า 5 วินาที", 4.2, 5.0, "s"))
# Error budgets over a 30-day window (43200 minutes total).
sre.add_error_budget("Data Freshness", ErrorBudget(
    slo_target=99.5, total_minutes=43200, error_minutes=180))
sre.add_error_budget("Pipeline Availability", ErrorBudget(
    slo_target=99.5, total_minutes=43200, error_minutes=50))
# Sample incidents feeding the dashboard's "Recent Incidents" section.
sre.record_incident("Bronze ingestion delay", "medium", 25, "Data 25 min late")
sre.record_incident("Silver transform OOM", "high", 45, "3 tables affected")
sre.dashboard()
sre.should_freeze_releases()
Data Quality Checks
# data_quality.py — Data Quality Framework
# pip install great-expectations
from dataclasses import dataclass
from typing import List, Callable, Any
from datetime import datetime
@dataclass
class QualityCheck:
    """One data-quality rule: a check function bound to a table."""
    name: str
    table: str
    check_fn: Callable  # zero-arg callable returning (passed: bool, details: str)
    severity: str  # critical, warning, info
    description: str = ""
@dataclass
class QualityResult:
    """Outcome of running a single QualityCheck."""
    check_name: str
    table: str
    passed: bool
    details: str    # human-readable result, or "Error: ..." when the check raised
    timestamp: str  # ISO-8601 time the check ran
    severity: str   # copied from the check; forced to "critical" on exception
class DataQualityFramework:
    """Runs registered QualityCheck rules and reports an overall quality score."""

    def __init__(self):
        self.checks: List[QualityCheck] = []
        self.results: List[QualityResult] = []

    def add_check(self, check: QualityCheck):
        """Register a check to be executed by run_all()."""
        self.checks.append(check)

    def run_all(self):
        """Execute every registered check, recording one QualityResult each.

        A check that raises is recorded as a failed, critical result rather
        than aborting the run.
        """
        self.results = []
        for check in self.checks:
            try:
                ok, info = check.check_fn()
            except Exception as exc:
                outcome = QualityResult(
                    check_name=check.name,
                    table=check.table,
                    passed=False,
                    details=f"Error: {str(exc)}",
                    timestamp=datetime.now().isoformat(),
                    severity="critical",
                )
            else:
                outcome = QualityResult(
                    check_name=check.name,
                    table=check.table,
                    passed=ok,
                    details=info,
                    timestamp=datetime.now().isoformat(),
                    severity=check.severity,
                )
            self.results.append(outcome)

    def report(self):
        """Print pass/fail summary, failed-check details, and a quality score."""
        total = len(self.results)
        passed = sum(1 for r in self.results if r.passed)
        failed = total - passed
        rule = "=" * 60
        print(f"\n{rule}")
        print(f"Data Quality Report — {datetime.now().strftime('%Y-%m-%d %H:%M')}")
        print(f"{rule}")
        print(f" Total: {total} | Passed: {passed} | Failed: {failed}")
        if failed:
            print(f"\n Failed Checks:")
            for r in self.results:
                if r.passed:
                    continue
                print(f" [{r.severity:>8}] {r.table}.{r.check_name}: {r.details}")
        score = passed / total * 100 if total else 0
        print(f"\n Quality Score: {score:.0f}%")
# === Example checks ===
# check_fn stubs return fixed (passed, details) tuples; real implementations
# would query the lakehouse tables.
dq = DataQualityFramework()
dq.add_check(QualityCheck(
    name="not_null_id", table="users",
    check_fn=lambda: (True, "0 null IDs"),
    severity="critical",
))
dq.add_check(QualityCheck(
    name="valid_email", table="users",
    check_fn=lambda: (True, "99.8% valid emails"),
    severity="warning",
))
dq.add_check(QualityCheck(
    name="freshness", table="orders",
    check_fn=lambda: (False, "Data 25 min late (SLO: 15 min)"),
    severity="critical",
))
dq.add_check(QualityCheck(
    name="row_count", table="orders",
    check_fn=lambda: (True, "15,234 rows (expected: 10,000+)"),
    severity="warning",
))
dq.run_all()
dq.report()
Best Practices
- Medallion Architecture: ใช้ Bronze-Silver-Gold Layers แยก Raw, Cleaned, Aggregated Data
- ACID Transactions: ใช้ Delta Lake/Iceberg สำหรับ ACID Transactions บน Object Storage
- SLO-based Monitoring: กำหนด SLO สำหรับ Freshness, Quality, Availability วัดด้วย SLI
- Error Budget: ใช้ Error Budget ตัดสินใจ Feature vs Reliability
- Data Quality Gates: ตรวจสอบ Quality ก่อน Promote ข้อมูลไป Layer ถัดไป
- Time Travel: ใช้ Time Travel สำหรับ Debug และ Rollback Data Issues
Data Lakehouse คืออะไร
Architecture รวมข้อดี Data Lake (เก็บข้อมูลดิบทุกประเภท ราคาถูก) กับ Data Warehouse (SQL เร็ว ACID Schema) ใช้ Delta Lake Iceberg Hudi เก็บบน Object Storage S3 GCS
SRE คืออะไร
Site Reliability Engineering จาก Google ใช้ Software Engineering แก้ปัญหา Operations กำหนด SLO วัดด้วย SLI ใช้ Error Budget ตัดสินใจ Release Feature หรือ Fix Reliability
SLO สำหรับ Data Pipeline ตั้งอย่างไร
ตาม Data Consumers เช่น Freshness 99.5% พร้อมใน 15 นาที Quality 99.9% ผ่าน Validation Availability 99.5% ไม่มี Failed Jobs Query Latency P95 ต่ำกว่า 5 วินาที
Delta Lake คืออะไร
Open-source Storage Layer เพิ่ม ACID Transactions Schema Enforcement Time Travel Data Versioning บน Data Lake ทำงานบน Spark ใช้ Parquet Files รองรับ Upsert Delete Merge
สรุป
Data Lakehouse ร่วมกับ SRE Practices ให้ Data Platform ที่เสถียรและน่าเชื่อถือ ใช้ Medallion Architecture แยก Layers Delta Lake สำหรับ ACID Transactions SLO สำหรับ Freshness Quality Availability Error Budget ตัดสินใจ Data Quality Gates ตรวจสอบก่อน Promote Time Travel สำหรับ Debug
