Data Lakehouse Site Reliability SRE

2025-09-10 · อ. บอม — SiamCafe.net · 10,068 words

Data Lakehouse Architecture

A Data Lakehouse combines the strengths of a Data Lake and a Data Warehouse. It uses open table formats such as Delta Lake to store data on object storage, providing ACID transactions, schema enforcement, and SQL query performance.

SRE practices keep a Data Lakehouse stable and reliable: define SLOs for data freshness, quality, and availability, measure them with SLIs, and use error budgets to drive decisions.
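
To make the error-budget idea concrete, here is a minimal arithmetic sketch; the 99.5% target and 30-day window match the monitoring example later in this post.

# Sketch: how an error budget falls out of an availability SLO.
slo_target = 99.5                  # SLO target in percent
window_minutes = 30 * 24 * 60      # 30-day window = 43,200 minutes
error_budget = window_minutes * (1 - slo_target / 100)
print(f"Error budget: {error_budget:.0f} minutes per 30 days")  # 216 minutes
# If incidents have already consumed, say, 180 of those minutes,
# only 36 minutes remain, a strong signal to prioritize reliability work over new releases.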

Data Lakehouse with Delta Lake

# lakehouse_setup.py — Data Lakehouse with Delta Lake + PySpark
# pip install pyspark delta-spark

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, current_timestamp, lit, when, count,
    sum as spark_sum, avg, max as spark_max,
)
from pyspark.sql.types import *
from delta import configure_spark_with_delta_pip
from datetime import datetime, timedelta

# === Spark Session with Delta Lake ===
builder = SparkSession.builder \
    .appName("DataLakehouse") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.databricks.delta.schema.autoMerge.enabled", "true")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

# === Bronze Layer (Raw Data) ===
def ingest_to_bronze(source_path, table_name):
    """Ingest Raw Data ไป Bronze Layer"""
    df = spark.read.format("json").load(source_path)
    df = df.withColumn("_ingested_at", current_timestamp()) \
           .withColumn("_source", lit(source_path))

    df.write.format("delta") \
        .mode("append") \
        .option("mergeSchema", "true") \
        .save(f"s3://lakehouse/bronze/{table_name}")

    print(f"Bronze: {table_name} — {df.count()} rows ingested")

# === Silver Layer (Cleaned Data) ===
def transform_to_silver(bronze_table, silver_table):
    """Clean และ Transform Data ไป Silver Layer"""
    bronze_df = spark.read.format("delta").load(f"s3://lakehouse/bronze/{bronze_table}")

    # Data Quality Rules
    silver_df = bronze_df \
        .filter(col("id").isNotNull()) \
        .filter(col("email").rlike(r"^[a-zA-Z0-9+_.-]+@[a-zA-Z0-9.-]+$")) \
        .dropDuplicates(["id"]) \
        .withColumn("_cleaned_at", current_timestamp())

    # Upsert (merge) into Silver
    from delta.tables import DeltaTable

    silver_path = f"s3://lakehouse/silver/{silver_table}"

    if DeltaTable.isDeltaTable(spark, silver_path):
        delta_table = DeltaTable.forPath(spark, silver_path)
        delta_table.alias("target") \
            .merge(silver_df.alias("source"), "target.id = source.id") \
            .whenMatchedUpdateAll() \
            .whenNotMatchedInsertAll() \
            .execute()
    else:
        silver_df.write.format("delta").save(silver_path)

    print(f"Silver: {silver_table} — {silver_df.count()} rows")

# === Gold Layer (Business Aggregations) ===
def aggregate_to_gold():
    """Aggregate Data สำหรับ Business Use Cases"""
    users = spark.read.format("delta").load("s3://lakehouse/silver/users")
    orders = spark.read.format("delta").load("s3://lakehouse/silver/orders")

    # Daily Revenue
    daily_revenue = orders \
        .groupBy("order_date") \
        .agg(
            spark_sum("amount").alias("total_revenue"),
            count("*").alias("order_count"),
            avg("amount").alias("avg_order_value"),
        )

    daily_revenue.write.format("delta") \
        .mode("overwrite") \
        .save("s3://lakehouse/gold/daily_revenue")

    # User Segments
    user_segments = orders \
        .groupBy("user_id") \
        .agg(
            spark_sum("amount").alias("total_spent"),
            count("*").alias("order_count"),
            spark_max("order_date").alias("last_order"),
        ) \
        .withColumn("segment", when(col("total_spent") > 10000, "VIP")
                    .when(col("total_spent") > 5000, "Premium")
                    .otherwise("Standard"))

    user_segments.write.format("delta") \
        .mode("overwrite") \
        .save("s3://lakehouse/gold/user_segments")

    print("Gold Layer aggregations complete")

# === Time Travel ===
def query_historical(table_path, version=None, timestamp=None):
    """Query Historical Data ด้วย Time Travel"""
    reader = spark.read.format("delta")
    if version is not None:
        reader = reader.option("versionAsOf", version)
    elif timestamp is not None:
        reader = reader.option("timestampAsOf", timestamp)
    return reader.load(table_path)

# Example usage
# ingest_to_bronze("/data/users/*.json", "users")
# transform_to_silver("users", "users")
# aggregate_to_gold()
# old_data = query_historical("s3://lakehouse/silver/users", version=5)

SRE Monitoring for a Data Lakehouse

# lakehouse_sre.py — SRE monitoring for data pipelines
import time
from datetime import datetime, timedelta
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from enum import Enum

class SLOStatus(Enum):
    HEALTHY = "healthy"
    WARNING = "warning"
    CRITICAL = "critical"
    BREACHED = "breached"

@dataclass
class SLI:
    name: str
    description: str
    current_value: float
    target: float
    unit: str = "%"
    higher_is_better: bool = True  # set False for latency-style SLIs (lower is better)

    @property
    def status(self):
        if self.higher_is_better:
            ratio = self.current_value / self.target
        else:
            # invert the ratio for "lower is better" SLIs such as query latency
            ratio = self.target / self.current_value if self.current_value else float("inf")
        if ratio >= 1.0:
            return SLOStatus.HEALTHY
        elif ratio >= 0.95:
            return SLOStatus.WARNING
        elif ratio >= 0.90:
            return SLOStatus.CRITICAL
        return SLOStatus.BREACHED

@dataclass
class ErrorBudget:
    slo_target: float  # e.g. 99.5
    window_days: int = 30
    total_minutes: int = 0
    error_minutes: int = 0

    @property
    def budget_total(self):
        return self.total_minutes * (1 - self.slo_target / 100)

    @property
    def budget_remaining(self):
        return max(0, self.budget_total - self.error_minutes)

    @property
    def budget_pct(self):
        if self.budget_total == 0:
            return 0
        return self.budget_remaining / self.budget_total * 100

class DataLakehouseSRE:
    """SRE Dashboard สำหรับ Data Lakehouse"""

    def __init__(self):
        self.slis: List[SLI] = []
        self.error_budgets: Dict[str, ErrorBudget] = {}
        self.incidents: List[Dict] = []

    def add_sli(self, sli: SLI):
        self.slis.append(sli)

    def add_error_budget(self, name: str, budget: ErrorBudget):
        self.error_budgets[name] = budget

    def record_incident(self, title, severity, duration_min, impact):
        self.incidents.append({
            "title": title,
            "severity": severity,
            "duration_min": duration_min,
            "impact": impact,
            "timestamp": datetime.now().isoformat(),
        })

    def dashboard(self):
        """แสดง SRE Dashboard"""
        print(f"\n{'='*60}")
        print(f"Data Lakehouse SRE Dashboard")
        print(f"{'='*60}")

        # SLIs
        print(f"\n  Service Level Indicators:")
        for sli in self.slis:
            status = sli.status.value.upper()
            bar = "█" * int(sli.current_value / sli.target * 20)
            print(f"    [{status:>8}] {sli.name:<30} "
                  f"{sli.current_value:.1f}/{sli.target:.1f}{sli.unit} {bar}")

        # Error Budgets
        print(f"\n  Error Budgets (30-day window):")
        for name, budget in self.error_budgets.items():
            pct = budget.budget_pct
            status = "OK" if pct > 20 else ("WARN" if pct > 0 else "EXHAUSTED")
            print(f"    [{status:>9}] {name:<25} "
                  f"Remaining: {budget.budget_remaining:.0f}/{budget.budget_total:.0f} min "
                  f"({pct:.0f}%)")

        # Recent Incidents
        if self.incidents:
            print(f"\n  Recent Incidents:")
            for inc in self.incidents[-5:]:
                print(f"    [{inc['severity']:>8}] {inc['title']} "
                      f"({inc['duration_min']} min)")

    def should_freeze_releases(self):
        """ตรวจสอบว่าควร Freeze Releases หรือไม่"""
        for name, budget in self.error_budgets.items():
            if budget.budget_pct <= 0:
                print(f"\n  RELEASE FREEZE: {name} error budget exhausted!")
                return True
        return False

# === Example usage ===
sre = DataLakehouseSRE()

# SLIs
sre.add_sli(SLI("Data Freshness", "Data พร้อมใช้ภายใน 15 นาที", 99.2, 99.5, "%"))
sre.add_sli(SLI("Data Quality", "Records ผ่าน Validation", 99.95, 99.9, "%"))
sre.add_sli(SLI("Pipeline Availability", "ไม่มี Failed Jobs", 99.8, 99.5, "%"))
sre.add_sli(SLI("Query Latency P95", "Query ต่ำกว่า 5 วินาที", 4.2, 5.0, "s"))

# Error Budgets
sre.add_error_budget("Data Freshness", ErrorBudget(
    slo_target=99.5, total_minutes=43200, error_minutes=180))
sre.add_error_budget("Pipeline Availability", ErrorBudget(
    slo_target=99.5, total_minutes=43200, error_minutes=50))

# Incidents
sre.record_incident("Bronze ingestion delay", "medium", 25, "Data 25 min late")
sre.record_incident("Silver transform OOM", "high", 45, "3 tables affected")

sre.dashboard()
sre.should_freeze_releases()

Data Quality Checks

# data_quality.py — Data Quality Framework
# Standalone implementation (no external packages); great_expectations is a heavier, off-the-shelf alternative

from dataclasses import dataclass
from typing import List, Callable, Any
from datetime import datetime

@dataclass
class QualityCheck:
    name: str
    table: str
    check_fn: Callable
    severity: str  # critical, warning, info
    description: str = ""

@dataclass
class QualityResult:
    check_name: str
    table: str
    passed: bool
    details: str
    timestamp: str
    severity: str

class DataQualityFramework:
    """Data Quality Framework สำหรับ Lakehouse"""

    def __init__(self):
        self.checks: List[QualityCheck] = []
        self.results: List[QualityResult] = []

    def add_check(self, check: QualityCheck):
        self.checks.append(check)

    def run_all(self):
        """รัน Quality Checks ทั้งหมด"""
        self.results = []
        for check in self.checks:
            try:
                passed, details = check.check_fn()
                self.results.append(QualityResult(
                    check_name=check.name,
                    table=check.table,
                    passed=passed,
                    details=details,
                    timestamp=datetime.now().isoformat(),
                    severity=check.severity,
                ))
            except Exception as e:
                self.results.append(QualityResult(
                    check_name=check.name,
                    table=check.table,
                    passed=False,
                    details=f"Error: {str(e)}",
                    timestamp=datetime.now().isoformat(),
                    severity="critical",
                ))

    def report(self):
        """แสดง Quality Report"""
        passed = sum(1 for r in self.results if r.passed)
        failed = len(self.results) - passed

        print(f"\n{'='*60}")
        print(f"Data Quality Report — {datetime.now().strftime('%Y-%m-%d %H:%M')}")
        print(f"{'='*60}")
        print(f"  Total: {len(self.results)} | Passed: {passed} | Failed: {failed}")

        if failed > 0:
            print(f"\n  Failed Checks:")
            for r in self.results:
                if not r.passed:
                    print(f"    [{r.severity:>8}] {r.table}.{r.check_name}: {r.details}")

        score = passed / len(self.results) * 100 if self.results else 0
        print(f"\n  Quality Score: {score:.0f}%")

# Example checks
dq = DataQualityFramework()

dq.add_check(QualityCheck(
    name="not_null_id", table="users",
    check_fn=lambda: (True, "0 null IDs"),
    severity="critical",
))
dq.add_check(QualityCheck(
    name="valid_email", table="users",
    check_fn=lambda: (True, "99.8% valid emails"),
    severity="warning",
))
dq.add_check(QualityCheck(
    name="freshness", table="orders",
    check_fn=lambda: (False, "Data 25 min late (SLO: 15 min)"),
    severity="critical",
))
dq.add_check(QualityCheck(
    name="row_count", table="orders",
    check_fn=lambda: (True, "15,234 rows (expected: 10,000+)"),
    severity="warning",
))

dq.run_all()
dq.report()

FAQ

What is a Data Lakehouse?

An architecture that combines the strengths of a Data Lake (cheap storage for raw data of any type) with those of a Data Warehouse (fast SQL, ACID, schema). It is built on open table formats such as Delta Lake, Iceberg, or Hudi, stored on object storage like S3 or GCS.

What is SRE?

Site Reliability Engineering, a discipline from Google that applies software engineering to operations problems. You define SLOs, measure them with SLIs, and use error budgets to decide between releasing features and fixing reliability.

How do you set SLOs for a data pipeline?

Base them on what data consumers actually need, for example: Freshness 99.5% of data ready within 15 minutes, Quality 99.9% of records passing validation, Availability 99.5% of runs with no failed jobs, and Query Latency P95 under 5 seconds.
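
A rough sketch of how such a freshness SLI might be measured; the function name and the run records here are illustrative, not part of the pipeline above.

# Illustrative sketch: measuring a data-freshness SLI from pipeline run records.
# `runs` is a hypothetical list of (scheduled_at, landed_at) datetime pairs.
from datetime import datetime, timedelta

def freshness_sli(runs, max_delay=timedelta(minutes=15)):
    """Percentage of runs whose data landed within the freshness target."""
    if not runs:
        return 100.0
    on_time = sum(1 for scheduled_at, landed_at in runs
                  if landed_at - scheduled_at <= max_delay)
    return on_time / len(runs) * 100

runs = [
    (datetime(2025, 9, 10, 8, 0), datetime(2025, 9, 10, 8, 9)),   # 9 min late: on time
    (datetime(2025, 9, 10, 9, 0), datetime(2025, 9, 10, 9, 25)),  # 25 min late: breach
]
print(f"Freshness SLI: {freshness_sli(runs):.1f}%")  # 50.0%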

What is Delta Lake?

An open-source storage layer that adds ACID transactions, schema enforcement, time travel, and data versioning on top of a Data Lake. It runs on Spark, stores data as Parquet files, and supports upsert, delete, and merge.
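
A brief sketch of those capabilities, assuming the spark session and silver table from lakehouse_setup.py; the delete condition is illustrative, and the SQL time-travel syntax needs a reasonably recent Delta Lake release.

# Illustrative sketch: Delta Lake delete, history, and SQL time travel.
from delta.tables import DeltaTable
from pyspark.sql.functions import col

users = DeltaTable.forPath(spark, "s3://lakehouse/silver/users")

# ACID delete: remove a single record (condition is illustrative)
users.delete(col("id") == "user_123")

# Every write is a versioned commit; inspect the table history
users.history().select("version", "timestamp", "operation").show(5)

# Time travel via SQL, equivalent to query_historical() above
spark.sql(
    "SELECT COUNT(*) FROM delta.`s3://lakehouse/silver/users` VERSION AS OF 5"
).show()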

Summary

A Data Lakehouse combined with SRE practices gives you a stable and reliable data platform: use the Medallion Architecture to separate layers, Delta Lake for ACID transactions, SLOs for freshness, quality, and availability, error budgets to drive decisions, data quality gates before promoting data between layers, and time travel for debugging.
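
A minimal sketch of the quality-gate idea, reusing the DataQualityFramework instance from the data_quality.py example; the gate function itself is illustrative.

# Illustrative sketch: block promotion to the next layer when any critical check fails.
def promote_if_clean(dq, promote_fn):
    """Run all checks and promote only when no critical check has failed."""
    dq.run_all()
    critical_failures = [r for r in dq.results
                         if not r.passed and r.severity == "critical"]
    if critical_failures:
        print(f"Promotion blocked: {len(critical_failures)} critical check(s) failed")
        return False
    promote_fn()  # e.g. transform_to_silver("users", "users")
    return True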
