ai

Databricks Unity Catalog Progressive Delivery — ระบบจัดการข้อมูลแบบ Progressive สำหรับ Data Platform

Databricks Unity Catalog Progressive Delivery — ระบบจัดการข้อมูลแบบ Progressive สำหรับ Data Platform

Unity Catalog คืออะไรและเกี่ยวข้องกับ Progressive Delivery อย่างไร

Databricks Unity Catalog Progressive Delivery — ระบบจัดการข้อมูลแบบ Progressive สำหรับ Data Platform

Databricks Unity Catalog เป็นระบบ Unified Governance สำหรับจัดการข้อมูลทุกประเภทบน Databricks Platform ทั้ง Tables, Views, Volumes, ML Models และ Functions ในที่เดียว รองรับ Fine-grained Access Control ระดับ Row และ Column สามารถกำหนดสิทธิ์การเข้าถึงข้อมูลได้ละเอียดตามผู้ใช้หรือกลุ่ม

Progressive Delivery ในบริบทของ Data Platform หมายถึงการปล่อย Data Pipeline, ML Model หรือ Schema Change แบบค่อยๆเปิดให้ consumer ใช้ทีละส่วน แทนที่จะเปลี่ยนทั้งหมดพร้อมกัน เทคนิคนี้ช่วยลดความเสี่ยงจากการที่ Pipeline ใหม่ผลิตข้อมูลผิดพลาดหรือ Schema Change ทำให้ downstream job พัง

Unity Catalog มีฟีเจอร์ที่รองรับ Progressive Delivery โดยตรง ได้แก่ Catalog Versioning ที่สามารถสร้าง catalog หลายเวอร์ชันพร้อมกัน, Lineage Tracking ที่ติดตามว่าข้อมูลถูกใช้โดย job ไหนบ้าง และ Data Quality Monitoring ที่ตรวจสอบคุณภาพข้อมูลอัตโนมัติ

สถาปัตยกรรมของระบบ Progressive Data Delivery ประกอบด้วย 3 ชั้นคือ Bronze (raw data), Silver (cleaned data) และ Gold (aggregated data) โดยแต่ละชั้นสามารถ deploy แบบ progressive ได้แยกจากกัน

ติดตั้งและตั้งค่า Unity Catalog บน Databricks

การตั้งค่า Unity Catalog เริ่มจากการสร้าง Metastore แล้วกำหนด Catalog และ Schema สำหรับแต่ละ environment

เนื้อหาเกี่ยวข้อง — GraphQL Federation GitOps Workflow

-- สร้าง Catalog สำหรับแต่ละ environment

CREATE CATALOG IF NOT EXISTS prod_catalog;

CREATE CATALOG IF NOT EXISTS staging_catalog;

CREATE CATALOG IF NOT EXISTS canary_catalog;



-- สร้าง Schema ภายใน Catalog

CREATE SCHEMA IF NOT EXISTS prod_catalog.sales;

CREATE SCHEMA IF NOT EXISTS prod_catalog.analytics;

CREATE SCHEMA IF NOT EXISTS canary_catalog.sales;

CREATE SCHEMA IF NOT EXISTS canary_catalog.analytics;



-- กำหนดสิทธิ์การเข้าถึง

GRANT USE CATALOG ON CATALOG prod_catalog TO `data-engineers`;

GRANT USE CATALOG ON CATALOG canary_catalog TO `data-engineers`;

GRANT USE SCHEMA ON SCHEMA prod_catalog.sales TO `data-analysts`;

GRANT SELECT ON SCHEMA prod_catalog.sales TO `data-analysts`;



-- สร้าง External Location สำหรับเก็บข้อมูล

CREATE EXTERNAL LOCATION IF NOT EXISTS prod_data

 URL 's3://mycompany-datalake/prod/'

 WITH (STORAGE CREDENTIAL prod_credential);



CREATE EXTERNAL LOCATION IF NOT EXISTS canary_data

 URL 's3://mycompany-datalake/canary/'

 WITH (STORAGE CREDENTIAL canary_credential);



-- ตรวจสอบ Catalog ที่สร้าง

SHOW CATALOGS;

SHOW SCHEMAS IN prod_catalog;

DESCRIBE CATALOG EXTENDED prod_catalog;

ตั้งค่า Databricks CLI สำหรับจัดการ Unity Catalog ผ่าน command line

# ติดตั้ง Databricks CLI

pip install databricks-cli



# ตั้งค่า Authentication

databricks configure --token

# Host: https://your-workspace.cloud.databricks.com

# Token: dapi_your_personal_access_token



# ดูรายการ Catalogs

databricks unity-catalog catalogs list



# สร้าง Catalog ผ่าน CLI

databricks unity-catalog catalogs create \

 --name canary_catalog \

 --comment "Canary environment for progressive delivery"



# ดูรายการ Tables ใน Schema

databricks unity-catalog tables list \

 --catalog-name prod_catalog \

 --schema-name sales

สร้าง Data Pipeline แบบ Progressive ด้วย Delta Live Tables

Delta Live Tables (DLT) เป็นเครื่องมือสร้าง Data Pipeline แบบ declarative บน Databricks รองรับ Data Quality Expectations และ Auto-scaling สามารถตั้งค่าให้ deploy แบบ progressive ได้

# pipeline_progressive.py — Delta Live Tables Pipeline

import dlt

from pyspark.sql.functions import col, current_timestamp, lit



# กำหนด environment จาก pipeline parameter

ENV = spark.conf.get("pipeline.environment", "canary")

CATALOG = f"{ENV}_catalog"



@dlt.table(

 name="raw_orders",

 comment="Raw orders data from source systems",

 table_properties={

 "quality": "bronze",

 "pipelines.autoOptimize.managed": "true"

 }

)

def raw_orders():

 return (

 spark.readStream

 .format("cloudFiles")

 .option("cloudFiles.format", "json")

 .option("cloudFiles.schemaLocation", f"/mnt/{ENV}/schema/orders")

 .load(f"s3://mycompany-source/orders/")

 .withColumn("_ingested_at", current_timestamp())

 .withColumn("_environment", lit(ENV))

 )



@dlt.table(

 name="cleaned_orders",

 comment="Cleaned and validated orders",

 table_properties={"quality": "silver"}

)

@dlt.expect_or_drop("valid_amount", "total_amount > 0")

@dlt.expect_or_drop("valid_customer", "customer_id IS NOT NULL")

@dlt.expect("valid_date", "order_date >= '2020-01-01'")

def cleaned_orders():

 return (

 dlt.read_stream("raw_orders")

 .filter(col("status").isin(["completed", "processing", "shipped"]))

 .select(

 col("order_id"),

 col("customer_id"),

 col("product_id"),

 col("total_amount").cast("decimal(10,2)"),

 col("order_date").cast("date"),

 col("status"),

 col("_ingested_at")

 )

 )



@dlt.table(

 name="daily_sales_summary",

 comment="Daily aggregated sales metrics",

 table_properties={"quality": "gold"}

)

def daily_sales_summary():

 return (

 dlt.read("cleaned_orders")

 .groupBy("order_date")

 .agg(

 {"total_amount": "sum", "order_id": "count"}

 )

 .withColumnRenamed("sum(total_amount)", "total_revenue")

 .withColumnRenamed("count(order_id)", "order_count")

 )

ตั้งค่า Pipeline Configuration สำหรับ Canary และ Production

แนะนำเพิ่มเติม — อ่านเพิ่มเติมที่ SiamCafeBook

# pipeline_config_canary.json

{

 "name": "sales-pipeline-canary",

 "target": "canary_catalog.sales",

 "clusters": [

 {

 "label": "default",

 "autoscale": {

 "min_workers": 1,

 "max_workers": 4,

 "mode": "ENHANCED"

 }

 }

 ],

 "libraries": [

 {"notebook": {"path": "/Repos/data-team/pipelines/pipeline_progressive"}}

 ],

 "configuration": {

 "pipeline.environment": "canary",

 "spark.databricks.delta.preview.enabled": "true"

 },

 "continuous": false,

 "development": false,

 "channel": "CURRENT"

}



# สร้าง Pipeline ผ่าน API

# curl -X POST "https://your-workspace.cloud.databricks.com/api/2.0/pipelines" \

# -H "Authorization: Bearer dapi_token" \

# -H "Content-Type: application/json" \

# -d @pipeline_config_canary.json

ตั้งค่า Canary Data Release ด้วย Catalog Versioning

Databricks Unity Catalog Progressive Delivery — ระบบจัดการข้อมูลแบบ Progressive สำหรับ Data Platform

ใช้ Catalog Aliases และ Views เพื่อสร้างระบบ Canary Data Release ที่สามารถสลับระหว่าง production และ canary data ได้

-- สร้าง View ที่ชี้ไปยัง canary data สำหรับผู้ใช้กลุ่มทดสอบ

CREATE OR REPLACE VIEW prod_catalog.sales.orders_v2 AS

SELECT * FROM canary_catalog.sales.cleaned_orders;



-- สร้าง Function สำหรับเลือก data source ตาม user group

CREATE OR REPLACE FUNCTION prod_catalog.sales.get_orders(user_group STRING)

RETURNS TABLE(

 order_id STRING,

 customer_id STRING,

 total_amount DECIMAL(10,2),

 order_date DATE

)

RETURN

 SELECT order_id, customer_id, total_amount, order_date

 FROM (

 CASE

 WHEN user_group = 'canary'

 THEN canary_catalog.sales.cleaned_orders

 ELSE prod_catalog.sales.cleaned_orders

 END

 );



-- ตรวจสอบ Data Lineage

-- ใน Databricks UI: Catalog Explorer > Table > Lineage Tab



-- เปรียบเทียบข้อมูลระหว่าง canary และ production

SELECT

 'production' AS source,

 COUNT(*) AS row_count,

 SUM(total_amount) AS total_revenue,

 AVG(total_amount) AS avg_order_value

FROM prod_catalog.sales.cleaned_orders

WHERE order_date >= current_date() - INTERVAL 1 DAY



UNION ALL



SELECT

 'canary' AS source,

 COUNT(*) AS row_count,

 SUM(total_amount) AS total_revenue,

 AVG(total_amount) AS avg_order_value

FROM canary_catalog.sales.cleaned_orders

WHERE order_date >= current_date() - INTERVAL 1 DAY;

สร้าง Promotion Script ที่ย้ายข้อมูลจาก canary ไปยัง production เมื่อผ่านการตรวจสอบ

เนื้อหาเกี่ยวข้อง — บทความที่เกี่ยวข้อง: Ollama Local LLM 12 Factor App —

# promote_canary.py — Promote canary data to production

from databricks.sdk import WorkspaceClient

from databricks.sdk.service.catalog import *

import sys



w = WorkspaceClient()



def compare_data_quality(canary_table, prod_table):

 """เปรียบเทียบคุณภาพข้อมูลระหว่าง canary กับ production"""

 query = f"""

 SELECT

 (SELECT COUNT(*) FROM {canary_table} WHERE order_date = current_date() - 1) as canary_count,

 (SELECT COUNT(*) FROM {prod_table} WHERE order_date = current_date() - 1) as prod_count,

 (SELECT COUNT(*) FROM {canary_table} WHERE total_amount IS NULL) as canary_nulls,

 (SELECT AVG(total_amount) FROM {canary_table} WHERE order_date = current_date() - 1) as canary_avg,

 (SELECT AVG(total_amount) FROM {prod_table} WHERE order_date = current_date() - 1) as prod_avg

 """

 result = w.statement_execution.execute_statement(

 warehouse_id="your-sql-warehouse-id",

 statement=query

 )

 row = result.result.data_array[0]

 canary_count, prod_count = int(row[0]), int(row[1])

 canary_nulls = int(row[2])

 canary_avg, prod_avg = float(row[3]), float(row[4])



 # ตรวจสอบว่า row count ไม่ต่างกันเกิน 10%

 if prod_count > 0:

 diff_pct = abs(canary_count - prod_count) / prod_count * 100

 if diff_pct > 10:

 print(f"FAIL: Row count diff {diff_pct:.1f}% exceeds 10% threshold")

 return False



 # ตรวจสอบว่าไม่มี null values เกิน 1%

 if canary_count > 0 and (canary_nulls / canary_count) > 0.01:

 print(f"FAIL: Null ratio {canary_nulls/canary_count:.2%} exceeds 1%")

 return False



 print(f"PASS: canary={canary_count} rows, prod={prod_count} rows, nulls={canary_nulls}")

 return True



def promote_table(canary_table, prod_table):

 """แทนที่ production table ด้วย canary data"""

 w.statement_execution.execute_statement(

 warehouse_id="your-sql-warehouse-id",

 statement=f"""

 CREATE OR REPLACE TABLE {prod_table} AS

 SELECT * FROM {canary_table}

 """

 )

 print(f"Promoted {canary_table} -> {prod_table}")



if __name__ == "__main__":

 tables = [

 ("canary_catalog.sales.cleaned_orders", "prod_catalog.sales.cleaned_orders"),

 ("canary_catalog.sales.daily_sales_summary", "prod_catalog.sales.daily_sales_summary"),

 ]

 for canary_t, prod_t in tables:

 if compare_data_quality(canary_t, prod_t):

 promote_table(canary_t, prod_t)

 else:

 print(f"BLOCKED: {canary_t} failed quality check")

 sys.exit(1)

Monitoring Data Quality ระหว่าง Progressive Rollout

ตั้งค่า Databricks Lakehouse Monitoring เพื่อติดตามคุณภาพข้อมูลในระหว่าง Progressive Rollout

# monitoring_setup.py — ตั้งค่า Data Quality Monitoring

from databricks.sdk import WorkspaceClient

from databricks.sdk.service.catalog import MonitorInfo, MonitorTimeSeries



w = WorkspaceClient()



# สร้าง Monitor สำหรับ table

monitor = w.quality_monitors.create(

 table_name="canary_catalog.sales.cleaned_orders",

 assets_dir="/Shared/monitoring/canary_sales",

 output_schema_name="canary_catalog.monitoring",

 schedule=MonitorCronSchedule(

 quartz_cron_expression="0 0 * * * ?", # ทุกชั่วโมง

 timezone_id="Asia/Bangkok"

 ),

 time_series=MonitorTimeSeries(

 timestamp_col="order_date",

 granularities=["1 day"]

 ),

 custom_metrics=[

 {

 "name": "null_rate",

 "definition": "avg(CASE WHEN total_amount IS NULL THEN 1 ELSE 0 END)",

 "input_columns": ["total_amount"],

 "type": "aggregate",

 "output_data_type": "DOUBLE"

 },

 {

 "name": "negative_amount_rate",

 "definition": "avg(CASE WHEN total_amount < 0 THEN 1 ELSE 0 END)",

 "input_columns": ["total_amount"],

 "type": "aggregate",

 "output_data_type": "DOUBLE"

 }

 ]

)



print(f"Monitor created: {monitor.monitor_name}")

print(f"Dashboard: {monitor.dashboard_id}")



# ดูผลลัพธ์ของ Monitor

results = w.quality_monitors.get(

 table_name="canary_catalog.sales.cleaned_orders"

)

print(f"Status: {results.status}")

print(f"Last Run: {results.latest_monitor_run_time}")

สร้าง Alert ที่แจ้งเตือนเมื่อคุณภาพข้อมูลผิดปกติ

-- สร้าง SQL Alert สำหรับ Data Quality

-- ใน Databricks SQL > Alerts > Create Alert



-- Alert: Row Count Drop

SELECT

 CASE

 WHEN canary_count < prod_count * 0.8 THEN 'CRITICAL'

 WHEN canary_count < prod_count * 0.9 THEN 'WARNING'

 ELSE 'OK'

 END AS status,

 canary_count,

 prod_count,

 ROUND((canary_count - prod_count) / prod_count * 100, 1) AS diff_pct

FROM (

 SELECT

 (SELECT COUNT(*) FROM canary_catalog.sales.cleaned_orders

 WHERE order_date = current_date() - 1) AS canary_count,

 (SELECT COUNT(*) FROM prod_catalog.sales.cleaned_orders

 WHERE order_date = current_date() - 1) AS prod_count

);



-- Alert: Schema Drift Detection

SELECT

 c.col_name,

 c.data_type AS canary_type,

 p.data_type AS prod_type,

 CASE WHEN c.data_type != p.data_type THEN 'DRIFT' ELSE 'OK' END AS status

FROM information_schema.columns c

JOIN information_schema.columns p

 ON c.col_name = p.col_name

WHERE c.table_catalog = 'canary_catalog'

 AND c.table_schema = 'sales'

 AND c.table_name = 'cleaned_orders'

 AND p.table_catalog = 'prod_catalog'

 AND p.table_schema = 'sales'

 AND p.table_name = 'cleaned_orders';

Automate Rollback เมื่อ Data Quality ต่ำกว่า Threshold

สร้าง Databricks Job ที่ตรวจสอบคุณภาพข้อมูลและ rollback อัตโนมัติเมื่อพบปัญหา

# auto_rollback_job.py — Automated rollback for data quality issues

from databricks.sdk import WorkspaceClient

import json

import requests

import os



w = WorkspaceClient()



SLACK_WEBHOOK = os.getenv("SLACK_WEBHOOK_URL")

WAREHOUSE_ID = os.getenv("SQL_WAREHOUSE_ID")



QUALITY_CHECKS = [

 {

 "name": "row_count_check",

 "query": """

 SELECT ABS(

 (SELECT COUNT(*) FROM canary_catalog.sales.cleaned_orders WHERE order_date = current_date()-1) -

 (SELECT COUNT(*) FROM prod_catalog.sales.cleaned_orders WHERE order_date = current_date()-1)

 ) / GREATEST(

 (SELECT COUNT(*) FROM prod_catalog.sales.cleaned_orders WHERE order_date = current_date()-1), 1

 ) * 100 AS diff_pct

 """,

 "threshold": 15,

 "operator": "less_than"

 },

 {

 "name": "null_check",

 "query": """

 SELECT COUNT(*) * 100.0 / GREATEST(COUNT(*), 1) AS null_pct

 FROM canary_catalog.sales.cleaned_orders

 WHERE total_amount IS NULL AND order_date = current_date()-1

 """,

 "threshold": 1,

 "operator": "less_than"

 },

 {

 "name": "freshness_check",

 "query": """

 SELECT TIMESTAMPDIFF(HOUR, MAX(_ingested_at), current_timestamp()) AS hours_since_update

 FROM canary_catalog.sales.cleaned_orders

 """,

 "threshold": 4,

 "operator": "less_than"

 }

]



def run_check(check):

 result = w.statement_execution.execute_statement(

 warehouse_id=WAREHOUSE_ID,

 statement=check["query"]

 )

 value = float(result.result.data_array[0][0])

 passed = value < check["threshold"] if check["operator"] == "less_than" else value > check["threshold"]

 return {"name": check["name"], "value": value, "threshold": check["threshold"], "passed": passed}



def rollback_canary():

 w.statement_execution.execute_statement(

 warehouse_id=WAREHOUSE_ID,

 statement="""

 CREATE OR REPLACE TABLE canary_catalog.sales.cleaned_orders AS

 SELECT * FROM prod_catalog.sales.cleaned_orders

 """

 )



def notify(message):

 if SLACK_WEBHOOK:

 requests.post(SLACK_WEBHOOK, json={"text": message})



results = [run_check(c) for c in QUALITY_CHECKS]

failed = [r for r in results if not r["passed"]]



if failed:

 msg = f"DATA QUALITY ALERT: {len(failed)} checks failed\n"

 for f in failed:

 msg += f" - {f['name']}: {f['value']:.2f} (threshold: {f['threshold']})\n"

 msg += "Initiating canary rollback..."

 notify(msg)

 rollback_canary()

 print("Rollback completed")

else:

 print("All quality checks passed")

 for r in results:

 print(f" {r['name']}: {r['value']:.2f} (threshold: {r['threshold']})")

FAQ คำถามที่พบบ่อย

Q: Unity Catalog กับ Hive Metastore ต่างกันอย่างไร?

แนะนำเพิ่มเติม — ระบบเทรดของ iCafeForex

A: Unity Catalog เป็น centralized governance layer ที่ทำงานข้าม workspace ได้ รองรับ fine-grained access control ระดับ row/column มี data lineage tracking ในตัว และรองรับ data types หลากหลายทั้ง tables, volumes, models ส่วน Hive Metastore ทำงานเฉพาะภายใน workspace เดียวและรองรับเฉพาะ table-level permissions

เนื้อหาเกี่ยวข้อง — ทำความเข้าใจ Lc คืออะไร — คู่มือฉบับสมบูรณ์ 2026

Q: Progressive Data Delivery เหมาะกับ use case ไหน?

A: เหมาะกับระบบที่มี downstream consumers จำนวนมากเช่น Data Warehouse ที่มี dashboard หลายร้อยตัวหรือ ML Pipeline ที่ใช้ feature table ร่วมกัน การเปลี่ยน schema หรือ logic แบบ progressive ช่วยลดความเสี่ยงที่ downstream job จะพังพร้อมกันทั้งหมด

Q: จะ rollback ข้อมูลกลับได้อย่างไรถ้าพบปัญหาหลัง promote?

A: Delta Lake รองรับ Time Travel สามารถ RESTORE TABLE ไปยัง version ก่อนหน้าได้ทันที ด้วยคำสั่ง RESTORE TABLE prod_catalog.sales.orders TO VERSION AS OF 42 หรือ RESTORE TABLE prod_catalog.sales.orders TO TIMESTAMP AS OF '2026-02-28T10:00:00' ข้อมูลจะกลับมาเหมือนเดิมภายในไม่กี่วินาที

Q: ค่าใช้จ่ายของ Unity Catalog เท่าไหร่?

เนื้อหาเกี่ยวข้อง — ดูเพิ่มเติมเรื่อง SOPS Encryption Message Queue Design — เข้ารหัส

A: Unity Catalog รวมอยู่ใน Databricks Platform แล้วไม่มีค่าใช้จ่ายเพิ่มเติมสำหรับ governance features พื้นฐาน แต่ Lakehouse Monitoring และ Advanced Features บางอย่างอาจมีค่าใช้จ่ายเพิ่มตาม DBU ที่ใช้ ตรวจสอบ pricing page ล่าสุดของ Databricks สำหรับรายละเอียด

Q: สามารถใช้ Unity Catalog กับ Open Source Tools ได้ไหม?

A: ได้ Unity Catalog เปิด open source แล้วในชื่อ Unity Catalog OSS สามารถใช้ร่วมกับ Apache Spark, Trino, DuckDB และ tools อื่นๆที่รองรับ Iceberg REST Catalog Protocol ทำให้ไม่ต้อง lock-in กับ Databricks Platform เพียงอย่างเดียว

XM Legend · เทรดเดอร์ & ผู้สอน Forex 13 ปี

ผู้ก่อตั้ง SiamCafe ตั้งแต่ปี 1997 · เทรดเดอร์สาย Forex มากกว่า 13 ปี ได้รับการยกย่องเป็น XM Legend · แบ่งปันความรู้ Forex, ไอที, AI และการเทรด จากประสบการณ์จริงในตลาดจริง