Unity Catalog คืออะไรและเกี่ยวข้องกับ Progressive Delivery อย่างไร
Databricks Unity Catalog เป็นระบบ Unified Governance สำหรับจัดการข้อมูลทุกประเภทบน Databricks Platform ทั้ง Tables, Views, Volumes, ML Models และ Functions ในที่เดียว รองรับ Fine-grained Access Control ระดับ Row และ Column สามารถกำหนดสิทธิ์การเข้าถึงข้อมูลได้ละเอียดตามผู้ใช้หรือกลุ่ม
Progressive Delivery ในบริบทของ Data Platform หมายถึงการปล่อย Data Pipeline, ML Model หรือ Schema Change แบบค่อยๆเปิดให้ consumer ใช้ทีละส่วน แทนที่จะเปลี่ยนทั้งหมดพร้อมกัน เทคนิคนี้ช่วยลดความเสี่ยงจากการที่ Pipeline ใหม่ผลิตข้อมูลผิดพลาดหรือ Schema Change ทำให้ downstream job พัง
Unity Catalog มีฟีเจอร์ที่รองรับ Progressive Delivery โดยตรง ได้แก่ Catalog Versioning ที่สามารถสร้าง catalog หลายเวอร์ชันพร้อมกัน, Lineage Tracking ที่ติดตามว่าข้อมูลถูกใช้โดย job ไหนบ้าง และ Data Quality Monitoring ที่ตรวจสอบคุณภาพข้อมูลอัตโนมัติ
สถาปัตยกรรมของระบบ Progressive Data Delivery ประกอบด้วย 3 ชั้นคือ Bronze (raw data), Silver (cleaned data) และ Gold (aggregated data) โดยแต่ละชั้นสามารถ deploy แบบ progressive ได้แยกจากกัน
ติดตั้งและตั้งค่า Unity Catalog บน Databricks
การตั้งค่า Unity Catalog เริ่มจากการสร้าง Metastore แล้วกำหนด Catalog และ Schema สำหรับแต่ละ environment
-- สร้าง Catalog สำหรับแต่ละ environment
CREATE CATALOG IF NOT EXISTS prod_catalog;
CREATE CATALOG IF NOT EXISTS staging_catalog;
CREATE CATALOG IF NOT EXISTS canary_catalog;
-- สร้าง Schema ภายใน Catalog
CREATE SCHEMA IF NOT EXISTS prod_catalog.sales;
CREATE SCHEMA IF NOT EXISTS prod_catalog.analytics;
CREATE SCHEMA IF NOT EXISTS canary_catalog.sales;
CREATE SCHEMA IF NOT EXISTS canary_catalog.analytics;
-- กำหนดสิทธิ์การเข้าถึง
GRANT USE CATALOG ON CATALOG prod_catalog TO `data-engineers`;
GRANT USE CATALOG ON CATALOG canary_catalog TO `data-engineers`;
GRANT USE SCHEMA ON SCHEMA prod_catalog.sales TO `data-analysts`;
GRANT SELECT ON SCHEMA prod_catalog.sales TO `data-analysts`;
-- สร้าง External Location สำหรับเก็บข้อมูล
CREATE EXTERNAL LOCATION IF NOT EXISTS prod_data
URL 's3://mycompany-datalake/prod/'
WITH (STORAGE CREDENTIAL prod_credential);
CREATE EXTERNAL LOCATION IF NOT EXISTS canary_data
URL 's3://mycompany-datalake/canary/'
WITH (STORAGE CREDENTIAL canary_credential);
-- ตรวจสอบ Catalog ที่สร้าง
SHOW CATALOGS;
SHOW SCHEMAS IN prod_catalog;
DESCRIBE CATALOG EXTENDED prod_catalog;
ตั้งค่า Databricks CLI สำหรับจัดการ Unity Catalog ผ่าน command line
# ติดตั้ง Databricks CLI
pip install databricks-cli
# ตั้งค่า Authentication
databricks configure --token
# Host: https://your-workspace.cloud.databricks.com
# Token: dapi_your_personal_access_token
# ดูรายการ Catalogs
databricks unity-catalog catalogs list
# สร้าง Catalog ผ่าน CLI
databricks unity-catalog catalogs create \
--name canary_catalog \
--comment "Canary environment for progressive delivery"
# ดูรายการ Tables ใน Schema
databricks unity-catalog tables list \
--catalog-name prod_catalog \
--schema-name sales
สร้าง Data Pipeline แบบ Progressive ด้วย Delta Live Tables
Delta Live Tables (DLT) เป็นเครื่องมือสร้าง Data Pipeline แบบ declarative บน Databricks รองรับ Data Quality Expectations และ Auto-scaling สามารถตั้งค่าให้ deploy แบบ progressive ได้
# pipeline_progressive.py — Delta Live Tables Pipeline
import dlt
from pyspark.sql.functions import col, current_timestamp, lit
# กำหนด environment จาก pipeline parameter
ENV = spark.conf.get("pipeline.environment", "canary")
CATALOG = f"{ENV}_catalog"
@dlt.table(
name="raw_orders",
comment="Raw orders data from source systems",
table_properties={
"quality": "bronze",
"pipelines.autoOptimize.managed": "true"
}
)
def raw_orders():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", f"/mnt/{ENV}/schema/orders")
.load(f"s3://mycompany-source/orders/")
.withColumn("_ingested_at", current_timestamp())
.withColumn("_environment", lit(ENV))
)
@dlt.table(
name="cleaned_orders",
comment="Cleaned and validated orders",
table_properties={"quality": "silver"}
)
@dlt.expect_or_drop("valid_amount", "total_amount > 0")
@dlt.expect_or_drop("valid_customer", "customer_id IS NOT NULL")
@dlt.expect("valid_date", "order_date >= '2020-01-01'")
def cleaned_orders():
return (
dlt.read_stream("raw_orders")
.filter(col("status").isin(["completed", "processing", "shipped"]))
.select(
col("order_id"),
col("customer_id"),
col("product_id"),
col("total_amount").cast("decimal(10,2)"),
col("order_date").cast("date"),
col("status"),
col("_ingested_at")
)
)
@dlt.table(
name="daily_sales_summary",
comment="Daily aggregated sales metrics",
table_properties={"quality": "gold"}
)
def daily_sales_summary():
return (
dlt.read("cleaned_orders")
.groupBy("order_date")
.agg(
{"total_amount": "sum", "order_id": "count"}
)
.withColumnRenamed("sum(total_amount)", "total_revenue")
.withColumnRenamed("count(order_id)", "order_count")
)
ตั้งค่า Pipeline Configuration สำหรับ Canary และ Production
# pipeline_config_canary.json
{
"name": "sales-pipeline-canary",
"target": "canary_catalog.sales",
"clusters": [
{
"label": "default",
"autoscale": {
"min_workers": 1,
"max_workers": 4,
"mode": "ENHANCED"
}
}
],
"libraries": [
{"notebook": {"path": "/Repos/data-team/pipelines/pipeline_progressive"}}
],
"configuration": {
"pipeline.environment": "canary",
"spark.databricks.delta.preview.enabled": "true"
},
"continuous": false,
"development": false,
"channel": "CURRENT"
}
# สร้าง Pipeline ผ่าน API
# curl -X POST "https://your-workspace.cloud.databricks.com/api/2.0/pipelines" \
# -H "Authorization: Bearer dapi_token" \
# -H "Content-Type: application/json" \
# -d @pipeline_config_canary.json
ตั้งค่า Canary Data Release ด้วย Catalog Versioning
ใช้ Catalog Aliases และ Views เพื่อสร้างระบบ Canary Data Release ที่สามารถสลับระหว่าง production และ canary data ได้
-- สร้าง View ที่ชี้ไปยัง canary data สำหรับผู้ใช้กลุ่มทดสอบ
CREATE OR REPLACE VIEW prod_catalog.sales.orders_v2 AS
SELECT * FROM canary_catalog.sales.cleaned_orders;
-- สร้าง Function สำหรับเลือก data source ตาม user group
CREATE OR REPLACE FUNCTION prod_catalog.sales.get_orders(user_group STRING)
RETURNS TABLE(
order_id STRING,
customer_id STRING,
total_amount DECIMAL(10,2),
order_date DATE
)
RETURN
SELECT order_id, customer_id, total_amount, order_date
FROM (
CASE
WHEN user_group = 'canary'
THEN canary_catalog.sales.cleaned_orders
ELSE prod_catalog.sales.cleaned_orders
END
);
-- ตรวจสอบ Data Lineage
-- ใน Databricks UI: Catalog Explorer > Table > Lineage Tab
-- เปรียบเทียบข้อมูลระหว่าง canary และ production
SELECT
'production' AS source,
COUNT(*) AS row_count,
SUM(total_amount) AS total_revenue,
AVG(total_amount) AS avg_order_value
FROM prod_catalog.sales.cleaned_orders
WHERE order_date >= current_date() - INTERVAL 1 DAY
UNION ALL
SELECT
'canary' AS source,
COUNT(*) AS row_count,
SUM(total_amount) AS total_revenue,
AVG(total_amount) AS avg_order_value
FROM canary_catalog.sales.cleaned_orders
WHERE order_date >= current_date() - INTERVAL 1 DAY;
สร้าง Promotion Script ที่ย้ายข้อมูลจาก canary ไปยัง production เมื่อผ่านการตรวจสอบ
# promote_canary.py — Promote canary data to production
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import *
import sys
w = WorkspaceClient()
def compare_data_quality(canary_table, prod_table):
"""เปรียบเทียบคุณภาพข้อมูลระหว่าง canary กับ production"""
query = f"""
SELECT
(SELECT COUNT(*) FROM {canary_table} WHERE order_date = current_date() - 1) as canary_count,
(SELECT COUNT(*) FROM {prod_table} WHERE order_date = current_date() - 1) as prod_count,
(SELECT COUNT(*) FROM {canary_table} WHERE total_amount IS NULL) as canary_nulls,
(SELECT AVG(total_amount) FROM {canary_table} WHERE order_date = current_date() - 1) as canary_avg,
(SELECT AVG(total_amount) FROM {prod_table} WHERE order_date = current_date() - 1) as prod_avg
"""
result = w.statement_execution.execute_statement(
warehouse_id="your-sql-warehouse-id",
statement=query
)
row = result.result.data_array[0]
canary_count, prod_count = int(row[0]), int(row[1])
canary_nulls = int(row[2])
canary_avg, prod_avg = float(row[3]), float(row[4])
# ตรวจสอบว่า row count ไม่ต่างกันเกิน 10%
if prod_count > 0:
diff_pct = abs(canary_count - prod_count) / prod_count * 100
if diff_pct > 10:
print(f"FAIL: Row count diff {diff_pct:.1f}% exceeds 10% threshold")
return False
# ตรวจสอบว่าไม่มี null values เกิน 1%
if canary_count > 0 and (canary_nulls / canary_count) > 0.01:
print(f"FAIL: Null ratio {canary_nulls/canary_count:.2%} exceeds 1%")
return False
print(f"PASS: canary={canary_count} rows, prod={prod_count} rows, nulls={canary_nulls}")
return True
def promote_table(canary_table, prod_table):
"""แทนที่ production table ด้วย canary data"""
w.statement_execution.execute_statement(
warehouse_id="your-sql-warehouse-id",
statement=f"""
CREATE OR REPLACE TABLE {prod_table} AS
SELECT * FROM {canary_table}
"""
)
print(f"Promoted {canary_table} -> {prod_table}")
if __name__ == "__main__":
tables = [
("canary_catalog.sales.cleaned_orders", "prod_catalog.sales.cleaned_orders"),
("canary_catalog.sales.daily_sales_summary", "prod_catalog.sales.daily_sales_summary"),
]
for canary_t, prod_t in tables:
if compare_data_quality(canary_t, prod_t):
promote_table(canary_t, prod_t)
else:
print(f"BLOCKED: {canary_t} failed quality check")
sys.exit(1)
Monitoring Data Quality ระหว่าง Progressive Rollout
ตั้งค่า Databricks Lakehouse Monitoring เพื่อติดตามคุณภาพข้อมูลในระหว่าง Progressive Rollout
# monitoring_setup.py — ตั้งค่า Data Quality Monitoring
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import MonitorInfo, MonitorTimeSeries
w = WorkspaceClient()
# สร้าง Monitor สำหรับ table
monitor = w.quality_monitors.create(
table_name="canary_catalog.sales.cleaned_orders",
assets_dir="/Shared/monitoring/canary_sales",
output_schema_name="canary_catalog.monitoring",
schedule=MonitorCronSchedule(
quartz_cron_expression="0 0 * * * ?", # ทุกชั่วโมง
timezone_id="Asia/Bangkok"
),
time_series=MonitorTimeSeries(
timestamp_col="order_date",
granularities=["1 day"]
),
custom_metrics=[
{
"name": "null_rate",
"definition": "avg(CASE WHEN total_amount IS NULL THEN 1 ELSE 0 END)",
"input_columns": ["total_amount"],
"type": "aggregate",
"output_data_type": "DOUBLE"
},
{
"name": "negative_amount_rate",
"definition": "avg(CASE WHEN total_amount < 0 THEN 1 ELSE 0 END)",
"input_columns": ["total_amount"],
"type": "aggregate",
"output_data_type": "DOUBLE"
}
]
)
print(f"Monitor created: {monitor.monitor_name}")
print(f"Dashboard: {monitor.dashboard_id}")
# ดูผลลัพธ์ของ Monitor
results = w.quality_monitors.get(
table_name="canary_catalog.sales.cleaned_orders"
)
print(f"Status: {results.status}")
print(f"Last Run: {results.latest_monitor_run_time}")
สร้าง Alert ที่แจ้งเตือนเมื่อคุณภาพข้อมูลผิดปกติ
-- สร้าง SQL Alert สำหรับ Data Quality
-- ใน Databricks SQL > Alerts > Create Alert
-- Alert: Row Count Drop
SELECT
CASE
WHEN canary_count < prod_count * 0.8 THEN 'CRITICAL'
WHEN canary_count < prod_count * 0.9 THEN 'WARNING'
ELSE 'OK'
END AS status,
canary_count,
prod_count,
ROUND((canary_count - prod_count) / prod_count * 100, 1) AS diff_pct
FROM (
SELECT
(SELECT COUNT(*) FROM canary_catalog.sales.cleaned_orders
WHERE order_date = current_date() - 1) AS canary_count,
(SELECT COUNT(*) FROM prod_catalog.sales.cleaned_orders
WHERE order_date = current_date() - 1) AS prod_count
);
-- Alert: Schema Drift Detection
SELECT
c.col_name,
c.data_type AS canary_type,
p.data_type AS prod_type,
CASE WHEN c.data_type != p.data_type THEN 'DRIFT' ELSE 'OK' END AS status
FROM information_schema.columns c
JOIN information_schema.columns p
ON c.col_name = p.col_name
WHERE c.table_catalog = 'canary_catalog'
AND c.table_schema = 'sales'
AND c.table_name = 'cleaned_orders'
AND p.table_catalog = 'prod_catalog'
AND p.table_schema = 'sales'
AND p.table_name = 'cleaned_orders';
Automate Rollback เมื่อ Data Quality ต่ำกว่า Threshold
สร้าง Databricks Job ที่ตรวจสอบคุณภาพข้อมูลและ rollback อัตโนมัติเมื่อพบปัญหา
# auto_rollback_job.py — Automated rollback for data quality issues
from databricks.sdk import WorkspaceClient
import json
import requests
import os
w = WorkspaceClient()
SLACK_WEBHOOK = os.getenv("SLACK_WEBHOOK_URL")
WAREHOUSE_ID = os.getenv("SQL_WAREHOUSE_ID")
QUALITY_CHECKS = [
{
"name": "row_count_check",
"query": """
SELECT ABS(
(SELECT COUNT(*) FROM canary_catalog.sales.cleaned_orders WHERE order_date = current_date()-1) -
(SELECT COUNT(*) FROM prod_catalog.sales.cleaned_orders WHERE order_date = current_date()-1)
) / GREATEST(
(SELECT COUNT(*) FROM prod_catalog.sales.cleaned_orders WHERE order_date = current_date()-1), 1
) * 100 AS diff_pct
""",
"threshold": 15,
"operator": "less_than"
},
{
"name": "null_check",
"query": """
SELECT COUNT(*) * 100.0 / GREATEST(COUNT(*), 1) AS null_pct
FROM canary_catalog.sales.cleaned_orders
WHERE total_amount IS NULL AND order_date = current_date()-1
""",
"threshold": 1,
"operator": "less_than"
},
{
"name": "freshness_check",
"query": """
SELECT TIMESTAMPDIFF(HOUR, MAX(_ingested_at), current_timestamp()) AS hours_since_update
FROM canary_catalog.sales.cleaned_orders
""",
"threshold": 4,
"operator": "less_than"
}
]
def run_check(check):
result = w.statement_execution.execute_statement(
warehouse_id=WAREHOUSE_ID,
statement=check["query"]
)
value = float(result.result.data_array[0][0])
passed = value < check["threshold"] if check["operator"] == "less_than" else value > check["threshold"]
return {"name": check["name"], "value": value, "threshold": check["threshold"], "passed": passed}
def rollback_canary():
w.statement_execution.execute_statement(
warehouse_id=WAREHOUSE_ID,
statement="""
CREATE OR REPLACE TABLE canary_catalog.sales.cleaned_orders AS
SELECT * FROM prod_catalog.sales.cleaned_orders
"""
)
def notify(message):
if SLACK_WEBHOOK:
requests.post(SLACK_WEBHOOK, json={"text": message})
results = [run_check(c) for c in QUALITY_CHECKS]
failed = [r for r in results if not r["passed"]]
if failed:
msg = f"DATA QUALITY ALERT: {len(failed)} checks failed\n"
for f in failed:
msg += f" - {f['name']}: {f['value']:.2f} (threshold: {f['threshold']})\n"
msg += "Initiating canary rollback..."
notify(msg)
rollback_canary()
print("Rollback completed")
else:
print("All quality checks passed")
for r in results:
print(f" {r['name']}: {r['value']:.2f} (threshold: {r['threshold']})")
FAQ คำถามที่พบบ่อย
Q: Unity Catalog กับ Hive Metastore ต่างกันอย่างไร?
A: Unity Catalog เป็น centralized governance layer ที่ทำงานข้าม workspace ได้ รองรับ fine-grained access control ระดับ row/column มี data lineage tracking ในตัว และรองรับ data types หลากหลายทั้ง tables, volumes, models ส่วน Hive Metastore ทำงานเฉพาะภายใน workspace เดียวและรองรับเฉพาะ table-level permissions
Q: Progressive Data Delivery เหมาะกับ use case ไหน?
A: เหมาะกับระบบที่มี downstream consumers จำนวนมากเช่น Data Warehouse ที่มี dashboard หลายร้อยตัวหรือ ML Pipeline ที่ใช้ feature table ร่วมกัน การเปลี่ยน schema หรือ logic แบบ progressive ช่วยลดความเสี่ยงที่ downstream job จะพังพร้อมกันทั้งหมด
Q: จะ rollback ข้อมูลกลับได้อย่างไรถ้าพบปัญหาหลัง promote?
A: Delta Lake รองรับ Time Travel สามารถ RESTORE TABLE ไปยัง version ก่อนหน้าได้ทันที ด้วยคำสั่ง RESTORE TABLE prod_catalog.sales.orders TO VERSION AS OF 42 หรือ RESTORE TABLE prod_catalog.sales.orders TO TIMESTAMP AS OF '2026-02-28T10:00:00' ข้อมูลจะกลับมาเหมือนเดิมภายในไม่กี่วินาที
Q: ค่าใช้จ่ายของ Unity Catalog เท่าไหร่?
A: Unity Catalog รวมอยู่ใน Databricks Platform แล้วไม่มีค่าใช้จ่ายเพิ่มเติมสำหรับ governance features พื้นฐาน แต่ Lakehouse Monitoring และ Advanced Features บางอย่างอาจมีค่าใช้จ่ายเพิ่มตาม DBU ที่ใช้ ตรวจสอบ pricing page ล่าสุดของ Databricks สำหรับรายละเอียด
Q: สามารถใช้ Unity Catalog กับ Open Source Tools ได้ไหม?
A: ได้ Unity Catalog เปิด open source แล้วในชื่อ Unity Catalog OSS สามารถใช้ร่วมกับ Apache Spark, Trino, DuckDB และ tools อื่นๆที่รองรับ Iceberg REST Catalog Protocol ทำให้ไม่ต้อง lock-in กับ Databricks Platform เพียงอย่างเดียว
