ai

Data Lakehouse กับ Site Reliability Engineering

Data Lakehouse กับ Site Reliability Engineering

Data Lakehouse Architecture

Data Lakehouse กับ Site Reliability Engineering

Data Lakehouse รวมข้อดีของ Data Lake เข้ากับ Data Warehouse โดยใช้ Open Table Formats เช่น Delta Lake เก็บข้อมูลบน Object Storage ทำให้ได้ทั้ง ACID Transactions, Schema Enforcement และประสิทธิภาพในการ Query ด้วย SQL

เนื้อหาเกี่ยวข้อง — บทความที่เกี่ยวข้อง: Pulumi IaC Community Building

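แนวปฏิบัติแบบ SRE (SRE Practices) ช่วยให้ Data Lakehouse เสถียรและน่าเชื่อถือ โดยกำหนด SLO สำหรับ Data Freshness, Data Quality และ Availability วัดผลด้วย SLI และใช้ Error Budget ช่วยตัดสินใจว่าจะเดินหน้าปล่อยงานใหม่ต่อหรือหยุดเพื่อแก้ปัญหาความเสถียรก่อน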

เนื้อหาเกี่ยวข้อง — ทำความเข้าใจ Vue Composition API Automation Script

Data Lakehouse ด้วย Delta Lake

# lakehouse_setup.py — Data Lakehouse ด้วย Delta Lake + PySpark

# pip install pyspark delta-spark



from pyspark.sql import SparkSession

from pyspark.sql.functions import (

    col, current_timestamp, lit, when, count,

    sum as spark_sum, avg, max as spark_max,

)

from pyspark.sql.types import *

from delta import configure_spark_with_delta_pip

from datetime import datetime, timedelta



# === Spark Session กับ Delta Lake ===

# Spark session with the Delta Lake SQL extension and catalog registered.
builder = (
    SparkSession.builder
    .appName("DataLakehouse")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    # Schema evolution: let MERGE add columns that appear in the source.
    .config("spark.databricks.delta.schema.autoMerge.enabled", "true")
)

# configure_spark_with_delta_pip adds the Delta Lake package matching the
# installed delta-spark pip package before the session is created.
spark = configure_spark_with_delta_pip(builder).getOrCreate()



# === Bronze Layer (Raw Data) ===

def ingest_to_bronze(source_path, table_name):
    """Append raw JSON records from ``source_path`` to a Bronze Delta table.

    Two lineage columns are attached before writing: ``_ingested_at``
    (Spark's ``current_timestamp()``) and ``_source`` (the literal
    ``source_path`` string). The write uses append mode with
    ``mergeSchema`` enabled, so new JSON fields extend the table schema.

    Args:
        source_path: Path or glob passed to ``spark.read`` (JSON input).
        table_name: Directory name under ``s3://lakehouse/bronze/``.
    """
    target_path = f"s3://lakehouse/bronze/{table_name}"

    raw = spark.read.format("json").load(source_path)
    stamped = raw.withColumn("_ingested_at", current_timestamp())
    enriched = stamped.withColumn("_source", lit(source_path))

    writer = enriched.write.format("delta").mode("append").option("mergeSchema", "true")
    writer.save(target_path)

    # NOTE(review): count() is a second Spark action, so the source is read
    # again; if files under source_path change after the write, the printed
    # number may differ from what was actually written.
    print(f"Bronze: {table_name} — {enriched.count()} rows ingested")



# === Silver Layer (Cleaned Data) ===

def transform_to_silver(bronze_table, silver_table):
    """Clean a Bronze table and upsert it into the matching Silver Delta table.

    Quality rules:
      * drop rows whose ``id`` is null;
      * drop rows whose ``email`` does not match a basic address pattern
        (a null email also fails the filter);
      * keep a single row per ``id``.

    The Bronze layer is written in append mode, so the same ``id`` can appear
    in several ingest batches. When the ``_ingested_at`` lineage column is
    present, the most recently ingested row per ``id`` is kept. A plain
    ``dropDuplicates`` keeps an arbitrary row and can let an older record
    overwrite a newer one in Silver. Tables without that column fall back to
    ``dropDuplicates(["id"])``.

    The result is MERGEd into Silver on ``id`` (update all columns on match,
    insert otherwise). On the first run, when no Delta table exists at the
    Silver path, it is written as a new table.

    Args:
        bronze_table: Directory name under ``s3://lakehouse/bronze/``.
        silver_table: Directory name under ``s3://lakehouse/silver/``.
    """
    from delta.tables import DeltaTable
    from pyspark.sql import Window
    from pyspark.sql.functions import row_number

    bronze_df = spark.read.format("delta").load(f"s3://lakehouse/bronze/{bronze_table}")

    # Data quality rules
    valid_df = (
        bronze_df
        .filter(col("id").isNotNull())
        .filter(col("email").rlike(r"^[a-zA-Z0-9+_.-]+@[a-zA-Z0-9.-]+$"))
    )

    if "_ingested_at" in valid_df.columns:
        # Rank each id's rows newest-first and keep only the top one.
        # Rows with an identical _ingested_at (e.g. duplicates inside one
        # ingest batch) are still tie-broken arbitrarily.
        newest_first = Window.partitionBy("id").orderBy(col("_ingested_at").desc())
        deduped_df = (
            valid_df
            .withColumn("_silver_rank", row_number().over(newest_first))
            .filter(col("_silver_rank") == 1)
            .drop("_silver_rank")
        )
    else:
        deduped_df = valid_df.dropDuplicates(["id"])

    silver_df = deduped_df.withColumn("_cleaned_at", current_timestamp())

    # Upsert (MERGE) into Silver, or create the table on the first run.
    silver_path = f"s3://lakehouse/silver/{silver_table}"
    if DeltaTable.isDeltaTable(spark, silver_path):
        (
            DeltaTable.forPath(spark, silver_path).alias("target")
            .merge(silver_df.alias("source"), "target.id = source.id")
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
        )
    else:
        silver_df.write.format("delta").save(silver_path)

    # NOTE: count() is a separate Spark action and re-runs the read + transforms.
    print(f"Silver: {silver_table} — {silver_df.count()} rows")



# === Gold Layer (Business Aggregations) ===

def aggregate_to_gold(*, vip_threshold=10000, premium_threshold=5000):
    """Build Gold-layer business aggregates from the Silver ``orders`` table.

    Writes two Delta tables, each fully overwritten on every run:

    * ``gold/daily_revenue``: per ``order_date``, the total revenue, order
      count and average order value.
    * ``gold/user_segments``: per ``user_id``, the total spent, order count,
      latest ``order_date`` and a spend-based ``segment`` label.

    Only ``silver/orders`` is read. User attributes are not joined in.

    Args:
        vip_threshold: Users whose total spend is strictly greater than this
            are labelled ``"VIP"``.
        premium_threshold: Users above this (but not VIP) are labelled
            ``"Premium"``. Everyone else is ``"Standard"``.
    """
    orders = spark.read.format("delta").load("s3://lakehouse/silver/orders")

    # Daily revenue
    daily_revenue = (
        orders
        .groupBy("order_date")
        .agg(
            spark_sum("amount").alias("total_revenue"),
            count("*").alias("order_count"),
            avg("amount").alias("avg_order_value"),
        )
    )
    daily_revenue.write.format("delta") \
        .mode("overwrite") \
        .save("s3://lakehouse/gold/daily_revenue")

    # User segments: checked top-down, so VIP wins over Premium.
    user_segments = (
        orders
        .groupBy("user_id")
        .agg(
            spark_sum("amount").alias("total_spent"),
            count("*").alias("order_count"),
            spark_max("order_date").alias("last_order"),
        )
        .withColumn(
            "segment",
            when(col("total_spent") > vip_threshold, "VIP")
            .when(col("total_spent") > premium_threshold, "Premium")
            .otherwise("Standard"),
        )
    )
    user_segments.write.format("delta") \
        .mode("overwrite") \
        .save("s3://lakehouse/gold/user_segments")

    print("Gold Layer aggregations complete")



# === Time Travel ===

def query_historical(table_path, version=None, timestamp=None):
    """Read a Delta table as of an earlier version or timestamp (time travel).

    ``version`` takes precedence: when it is given, ``timestamp`` is ignored.
    With neither argument, the current snapshot is returned.

    Args:
        table_path: Location of the Delta table.
        version: Table version to read (``versionAsOf``).
        timestamp: Point in time to read (``timestampAsOf``).

    Returns:
        A DataFrame over the requested snapshot.
    """
    time_travel = {}
    if version is not None:
        time_travel["versionAsOf"] = version
    elif timestamp is not None:
        time_travel["timestampAsOf"] = timestamp
    return spark.read.format("delta").options(**time_travel).load(table_path)



# ตัวอย่าง

# ingest_to_bronze("/data/users/*.json", "users")

# transform_to_silver("users", "users")

# aggregate_to_gold()

# old_data = query_historical("s3://lakehouse/silver/users", version=5)

SRE Monitoring สำหรับ Data Lakehouse

Data Lakehouse กับ Site Reliability Engineering
# lakehouse_sre.py — SRE Monitoring สำหรับ Data Pipelines

import time

from datetime import datetime, timedelta

from dataclasses import dataclass, field

from typing import List, Dict, Optional

from enum import Enum



class SLOStatus(Enum):
    """Health bucket for an SLI, assigned by ``SLI.status``."""

    HEALTHY = "healthy"    # target met (attainment >= 1.0)
    WARNING = "warning"    # attainment in [0.95, 1.0)
    CRITICAL = "critical"  # attainment in [0.90, 0.95)
    BREACHED = "breached"  # attainment below 0.90


@dataclass
class SLI:
    """A Service Level Indicator: a measured value compared with its target.

    Attributes:
        name: Display name.
        description: What the indicator measures.
        current_value: Latest measured value.
        target: SLO target, in the same unit as ``current_value``.
        unit: Display unit. It is also used to infer the metric's direction.
        higher_is_better: ``True`` for success-rate style metrics and
            ``False`` for metrics where smaller is better (e.g. latency).
            ``None`` (the default) infers the direction from ``unit``: time
            units are lower-is-better, everything else is higher-is-better.
    """

    name: str
    description: str
    current_value: float
    target: float
    unit: str = "%"
    higher_is_better: Optional[bool] = None

    # Units treated as lower-is-better when higher_is_better is None.
    # Left unannotated so @dataclass does not turn it into a field.
    _LOWER_IS_BETTER_UNITS = frozenset({"ms", "s", "sec", "min", "h"})

    def _attainment(self) -> float:
        """Return how well the target is met as a ratio (>= 1.0 means met).

        Higher-is-better: current / target. Lower-is-better: target / current,
        so a latency below its target also scores >= 1.0. A zero denominator
        is treated as fully met rather than raising ZeroDivisionError.
        """
        higher = self.higher_is_better
        if higher is None:
            higher = self.unit not in self._LOWER_IS_BETTER_UNITS
        if higher:
            numerator, denominator = self.current_value, self.target
        else:
            numerator, denominator = self.target, self.current_value
        if denominator == 0:
            return float("inf")
        return numerator / denominator

    @property
    def status(self):
        """Classify attainment: >=1.0 HEALTHY, >=0.95 WARNING, >=0.90 CRITICAL, else BREACHED."""
        ratio = self._attainment()
        if ratio >= 1.0:
            return SLOStatus.HEALTHY
        elif ratio >= 0.95:
            return SLOStatus.WARNING
        elif ratio >= 0.90:
            return SLOStatus.CRITICAL
        return SLOStatus.BREACHED



@dataclass
class ErrorBudget:
    """Error budget for one SLO, measured in minutes.

    The budget is the share of ``total_minutes`` that may be spent in error
    under ``slo_target``, given as a percentage (99.5 allows 0.5%).
    ``window_days`` is stored for reference only; none of the properties
    below read it.
    """

    slo_target: float        # SLO as a percentage, e.g. 99.5
    window_days: int = 30    # informational window length
    total_minutes: int = 0   # minutes covered (e.g. 43200 for 30 days)
    error_minutes: int = 0   # minutes already spent violating the SLO

    @property
    def budget_total(self):
        """Allowed error minutes: total_minutes * (1 - slo_target / 100)."""
        allowed_fraction = 1 - self.slo_target / 100
        return self.total_minutes * allowed_fraction

    @property
    def budget_remaining(self):
        """Allowed minutes not yet consumed, floored at zero."""
        left = self.budget_total - self.error_minutes
        return left if left > 0 else 0

    @property
    def budget_pct(self):
        """Remaining budget as a percentage of the total; 0 when the total is 0."""
        total = self.budget_total
        if total == 0:
            return 0
        return self.budget_remaining / total * 100



class DataLakehouseSRE:
    """In-memory SRE dashboard for a Data Lakehouse.

    Collects SLIs, named error budgets and incident records.
    ``dashboard`` prints a text report, and ``should_freeze_releases``
    reports whether any error budget is used up.
    """

    # Width, in characters, of the bar for an SLI exactly at its target.
    BAR_WIDTH = 20

    def __init__(self):
        self.slis: List["SLI"] = []
        self.error_budgets: Dict[str, "ErrorBudget"] = {}
        self.incidents: List[Dict] = []

    def add_sli(self, sli: "SLI"):
        """Register an SLI to show on the dashboard."""
        self.slis.append(sli)

    def add_error_budget(self, name: str, budget: "ErrorBudget"):
        """Register (or replace) the error budget tracked under ``name``."""
        self.error_budgets[name] = budget

    def record_incident(self, title, severity, duration_min, impact):
        """Append an incident record stamped with the current local time (ISO 8601)."""
        self.incidents.append({
            "title": title,
            "severity": severity,
            "duration_min": duration_min,
            "impact": impact,
            "timestamp": datetime.now().isoformat(),
        })

    def _sli_bar(self, sli) -> str:
        """Bar proportional to current/target, capped at BAR_WIDTH; empty for a zero target."""
        if not sli.target:
            return ""
        filled = int(sli.current_value / sli.target * self.BAR_WIDTH)
        return "█" * min(filled, self.BAR_WIDTH)

    def _budget_window_label(self) -> str:
        """Describe the window(s) of the registered budgets, e.g. '30-day window'."""
        windows = sorted({budget.window_days for budget in self.error_budgets.values()})
        if not windows:
            # Nothing registered: keep the previous fixed label (ErrorBudget's default).
            return "30-day window"
        return ", ".join(f"{days}-day" for days in windows) + " window"

    def dashboard(self):
        """Print SLI status, error-budget usage and the last five incidents."""
        print(f"\n{'='*60}")
        print("Data Lakehouse SRE Dashboard")
        print(f"{'='*60}")

        # SLIs
        print("\n  Service Level Indicators:")
        for sli in self.slis:
            status = sli.status.value.upper()
            print(f"    [{status:>8}] {sli.name:<30} "
                  f"{sli.current_value:.1f}/{sli.target:.1f}{sli.unit} {self._sli_bar(sli)}")

        # Error budgets: label reflects each budget's own window_days.
        print(f"\n  Error Budgets ({self._budget_window_label()}):")
        for name, budget in self.error_budgets.items():
            pct = budget.budget_pct
            # OK above 20% remaining, WARN while any remains, EXHAUSTED at 0.
            status = "OK" if pct > 20 else ("WARN" if pct > 0 else "EXHAUSTED")
            print(f"    [{status:>9}] {name:<25} "
                  f"Remaining: {budget.budget_remaining:.0f}/{budget.budget_total:.0f} min "
                  f"({pct:.0f}%)")

        # Recent incidents (last five recorded)
        if self.incidents:
            print("\n  Recent Incidents:")
            for inc in self.incidents[-5:]:
                print(f"    [{inc['severity']:>8}] {inc['title']} "
                      f"({inc['duration_min']} min)")

    def should_freeze_releases(self):
        """Return True (and print a notice) if any error budget has 0% remaining."""
        for name, budget in self.error_budgets.items():
            if budget.budget_pct <= 0:
                print(f"\n  RELEASE FREEZE: {name} error budget exhausted!")
                return True
        return False



# === ตัวอย่าง ===

sre = DataLakehouseSRE()

# Service Level Indicators
sre.add_sli(SLI(name="Data Freshness", description="Data พร้อมใช้ภายใน 15 นาที",
                current_value=99.2, target=99.5, unit="%"))
sre.add_sli(SLI(name="Data Quality", description="Records ผ่าน Validation",
                current_value=99.95, target=99.9, unit="%"))
sre.add_sli(SLI(name="Pipeline Availability", description="ไม่มี Failed Jobs",
                current_value=99.8, target=99.5, unit="%"))
sre.add_sli(SLI(name="Query Latency P95", description="Query ต่ำกว่า 5 วินาที",
                current_value=4.2, target=5.0, unit="s"))

# Error budgets over 43,200 minutes (30 days)
sre.add_error_budget(
    "Data Freshness",
    ErrorBudget(slo_target=99.5, total_minutes=43200, error_minutes=180),
)
sre.add_error_budget(
    "Pipeline Availability",
    ErrorBudget(slo_target=99.5, total_minutes=43200, error_minutes=50),
)

# Incidents
sre.record_incident(title="Bronze ingestion delay", severity="medium",
                    duration_min=25, impact="Data 25 min late")
sre.record_incident(title="Silver transform OOM", severity="high",
                    duration_min=45, impact="3 tables affected")

# Report
sre.dashboard()
sre.should_freeze_releases()

Data Quality Checks

# data_quality.py — Data Quality Framework

# pip install great-expectations



from dataclasses import dataclass

from typing import List, Callable, Any

from datetime import datetime



@dataclass
class QualityCheck:
    """A single named data-quality rule attached to a table.

    ``check_fn`` is called with no arguments, and its result is unpacked
    as ``(passed, details)`` by ``DataQualityFramework.run_all``.
    """

    name: str                     # rule identifier shown in reports
    table: str                    # table the rule applies to
    check_fn: Callable[[], Any]   # zero-arg callable -> (passed, details)
    severity: str                 # critical, warning, info
    description: str = ""         # optional free-text explanation



@dataclass
class QualityResult:
    """Outcome of running one QualityCheck."""

    check_name: str   # name of the check that produced this result
    table: str        # table the check ran against
    passed: bool      # whether the check succeeded
    details: str      # human-readable detail or error message
    timestamp: str    # ISO-8601 time the result was recorded
    severity: str     # severity reported for this result



class DataQualityFramework:
    """Registry that runs data-quality checks and prints a summary report."""

    def __init__(self):
        self.checks: List[QualityCheck] = []
        self.results: List[QualityResult] = []

    def add_check(self, check: QualityCheck):
        """Register a check to be executed by ``run_all``."""
        self.checks.append(check)

    def run_all(self):
        """Run every registered check, replacing ``self.results``.

        A check that raises is recorded as a failed result with severity
        ``"critical"`` and the exception text in ``details``. Remaining
        checks still run.
        """
        self.results = []
        for chk in self.checks:
            try:
                ok, info = chk.check_fn()
                outcome = QualityResult(
                    check_name=chk.name,
                    table=chk.table,
                    passed=ok,
                    details=info,
                    timestamp=datetime.now().isoformat(),
                    severity=chk.severity,
                )
            except Exception as exc:
                outcome = QualityResult(
                    check_name=chk.name,
                    table=chk.table,
                    passed=False,
                    details=f"Error: {str(exc)}",
                    timestamp=datetime.now().isoformat(),
                    severity="critical",
                )
            self.results.append(outcome)

    def report(self):
        """Print totals, each failed check, and the pass-rate score (0 when empty)."""
        results = self.results
        n_passed = len([res for res in results if res.passed])
        n_failed = len(results) - n_passed
        rule = "=" * 60

        print("\n" + rule)
        print(f"Data Quality Report — {datetime.now().strftime('%Y-%m-%d %H:%M')}")
        print(rule)
        print(f"  Total: {len(results)} | Passed: {n_passed} | Failed: {n_failed}")

        if n_failed > 0:
            print("\n  Failed Checks:")
            for res in (r for r in results if not r.passed):
                print(f"    [{res.severity:>8}] {res.table}.{res.check_name}: {res.details}")

        if results:
            score = n_passed / len(results) * 100
        else:
            score = 0
        print(f"\n  Quality Score: {score:.0f}%")



# ตัวอย่าง Checks

dq = DataQualityFramework()

# Stub checks returning canned results.
# Positional field order: name, table, check_fn, severity.
dq.add_check(QualityCheck("not_null_id", "users",
                          lambda: (True, "0 null IDs"), "critical"))
dq.add_check(QualityCheck("valid_email", "users",
                          lambda: (True, "99.8% valid emails"), "warning"))
dq.add_check(QualityCheck("freshness", "orders",
                          lambda: (False, "Data 25 min late (SLO: 15 min)"), "critical"))
dq.add_check(QualityCheck("row_count", "orders",
                          lambda: (True, "15,234 rows (expected: 10,000+)"), "warning"))

dq.run_all()
dq.report()

Best Practices

  • Medallion Architecture: ใช้ Bronze-Silver-Gold Layers แยก Raw, Cleaned, Aggregated Data
  • ACID Transactions: ใช้ Delta Lake/Iceberg สำหรับ ACID Transactions บน Object Storage
  • SLO-based Monitoring: กำหนด SLO สำหรับ Freshness, Quality, Availability วัดด้วย SLI
  • Error Budget: ใช้ Error Budget ตัดสินใจว่าจะเน้นพัฒนา Feature ใหม่หรือเน้น Reliability เช่น Freeze Releases เมื่อ Error Budget หมด
  • Data Quality Gates: ตรวจสอบ Quality ก่อน Promote ข้อมูลไป Layer ถัดไป
  • Time Travel: ใช้ Time Travel สำหรับ Debug และ Rollback Data Issues

Data Lakehouse คืออะไร

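เป็น Architecture ที่รวมข้อดีของ Data Lake (เก็บข้อมูลดิบได้ทุกประเภทในราคาถูก) เข้ากับ Data Warehouse (Query ด้วย SQL ได้เร็ว รองรับ ACID และ Schema) โดยใช้ Table Format อย่าง Delta Lake, Apache Iceberg หรือ Apache Hudi เก็บข้อมูลบน Object Storage เช่น Amazon S3 หรือ Google Cloud Storage (GCS)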

แนะนำเพิ่มเติม — ดูสัญญาณเทรดที่ XM Signal

เนื้อหาเกี่ยวข้อง — ทำความเข้าใจ สร้างเว็บไซต์โรงเรียนฟรี

XM Legend · เทรดเดอร์ & ผู้สอน Forex 13 ปี

ผู้ก่อตั้ง SiamCafe ตั้งแต่ปี 1997 · เทรดเดอร์สาย Forex มากกว่า 13 ปี ได้รับการยกย่องเป็น XM Legend · แบ่งปันความรู้ Forex, ไอที, AI และการเทรด จากประสบการณ์จริงในตลาดจริง