Databricks Unity Catalog กับการจัดการ Data Governance
Unity Catalog เป็น centralized governance layer ของ Databricks ที่ช่วยจัดการ data assets ทั้งหมดในที่เดียว ไม่ว่าจะเป็น tables, views, volumes, models หรือ functions ทุกอย่างถูกจัดระเบียบภายใต้ 3-level namespace: catalog → schema → object ทำให้ทีมที่ใช้ Agile/Scrum สามารถแบ่ง workspace ตาม sprint หรือ project ได้ชัดเจน
สร้าง Unity Catalog สำหรับทีม Agile
การออกแบบ catalog structure ที่ดีควรสะท้อน workflow ของทีม ตัวอย่างด้านล่างแบ่งตาม environment และ domain
-- สร้าง catalog สำหรับแต่ละ environment
CREATE CATALOG IF NOT EXISTS dev_analytics;
CREATE CATALOG IF NOT EXISTS staging_analytics;
CREATE CATALOG IF NOT EXISTS prod_analytics;
-- สร้าง schema ตาม domain (แต่ละ schema = 1 Kanban board)
CREATE SCHEMA IF NOT EXISTS dev_analytics.marketing;
CREATE SCHEMA IF NOT EXISTS dev_analytics.sales;
CREATE SCHEMA IF NOT EXISTS dev_analytics.operations;
-- กำหนดสิทธิ์ให้ทีม data engineering
GRANT USE CATALOG ON CATALOG dev_analytics TO `data-engineering-team`;
GRANT USE SCHEMA ON SCHEMA dev_analytics.marketing TO `data-engineering-team`;
GRANT CREATE TABLE ON SCHEMA dev_analytics.marketing TO `data-engineering-team`;
GRANT SELECT ON SCHEMA dev_analytics.marketing TO `data-analyst-team`;
-- ดู permissions ทั้งหมด
SHOW GRANTS ON CATALOG dev_analytics;
ตั้งค่า Databricks CLI สำหรับ CI/CD Pipeline
ทีมที่ทำ Scrum ต้องมี CI/CD pipeline ที่ deploy data pipelines อัตโนมัติทุก sprint ใช้ Databricks CLI ร่วมกับ Databricks Asset Bundles (DABs)
# ติดตั้ง Databricks CLI
curl -fsSL https://raw.githubusercontent.com/databricks/setup-cli/main/install.sh | sh
# ตรวจสอบเวอร์ชัน
databricks --version
# Databricks CLI v0.218.0
# ตั้งค่า authentication
databricks configure --token
# Databricks Host: https://your-workspace.cloud.databricks.com
# Personal Access Token: dapi_xxxxxxxxxxxxx
# ทดสอบ connection
databricks workspace list /
databricks catalogs list
สร้าง Databricks Asset Bundle
# สร้างโปรเจค bundle ใหม่
databricks bundle init
# โครงสร้างไฟล์
# my-project/
# ├── databricks.yml # Bundle configuration
# ├── src/
# │ ├── marketing_pipeline.py
# │ ├── sales_etl.py
# │ └── ops_reporting.py
# ├── tests/
# │ ├── test_marketing.py
# │ └── test_sales.py
# └── resources/
# ├── jobs.yml
# └── pipelines.yml
# databricks.yml
bundle:
name: analytics-pipelines
workspace:
host: https://your-workspace.cloud.databricks.com
variables:
catalog:
description: "Target Unity Catalog"
default: dev_analytics
targets:
dev:
mode: development
default: true
variables:
catalog: dev_analytics
workspace:
root_path: /Users//.bundle//dev
staging:
variables:
catalog: staging_analytics
workspace:
root_path: /Shared/.bundle//staging
prod:
mode: production
variables:
catalog: prod_analytics
workspace:
root_path: /Shared/.bundle//prod
permissions:
- level: CAN_MANAGE
group_name: data-platform-admins
- level: CAN_VIEW
group_name: data-engineering-team
สร้าง Delta Live Tables Pipeline
Delta Live Tables (DLT) ช่วยสร้าง data pipeline แบบ declarative ทำให้ทีม Agile สามารถ iterate ได้เร็ว ไม่ต้องเขียน boilerplate code มาก
# src/marketing_pipeline.py
import dlt
from pyspark.sql.functions import col, current_timestamp, when
@dlt.table(
name="raw_website_events",
comment="Raw clickstream data from website",
table_properties={"quality": "bronze"}
)
def raw_website_events():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "/mnt/schema/website_events")
.load("/mnt/raw/website_events/")
)
@dlt.table(
name="cleaned_events",
comment="Cleaned and deduplicated events",
table_properties={"quality": "silver"}
)
@dlt.expect_or_drop("valid_user_id", "user_id IS NOT NULL")
@dlt.expect_or_drop("valid_timestamp", "event_time IS NOT NULL")
@dlt.expect("valid_event_type", "event_type IN ('pageview','click','purchase','signup')")
def cleaned_events():
return (
dlt.read_stream("raw_website_events")
.dropDuplicates(["event_id"])
.withColumn("processed_at", current_timestamp())
.select(
col("event_id"),
col("user_id"),
col("event_type"),
col("event_time").cast("timestamp"),
col("page_url"),
col("referrer"),
col("device_type"),
col("processed_at")
)
)
@dlt.table(
name="marketing_attribution",
comment="Marketing channel attribution per user",
table_properties={"quality": "gold"}
)
def marketing_attribution():
return (
dlt.read("cleaned_events")
.filter(col("event_type").isin("purchase", "signup"))
.groupBy("referrer", "device_type")
.agg(
{"event_id": "count", "user_id": "approx_count_distinct"}
)
.withColumnRenamed("count(event_id)", "total_conversions")
.withColumnRenamed("approx_count_distinct(user_id)", "unique_users")
)
จัดการ Sprint ด้วย Kanban Board สำหรับ Data Tasks
การใช้ Kanban board กับ data engineering มีความแตกต่างจาก software development ตรงที่ task มักมี dependency chain ยาว ต้องออกแบบ board ให้สะท้อน data pipeline stages
# resources/jobs.yml — Databricks Job สำหรับแต่ละ Sprint Task
resources:
jobs:
sprint_42_marketing_etl:
name: "[Sprint-42] Marketing ETL Pipeline"
schedule:
quartz_cron_expression: "0 0 6 * * ?"
timezone_id: Asia/Bangkok
tasks:
- task_key: ingest_raw
pipeline_task:
pipeline_id:
- task_key: run_quality_checks
depends_on:
- task_key: ingest_raw
notebook_task:
notebook_path: ../src/quality_checks.py
base_parameters:
catalog:
schema: marketing
- task_key: update_dashboard
depends_on:
- task_key: run_quality_checks
notebook_task:
notebook_path: ../src/refresh_dashboards.py
email_notifications:
on_failure:
- data-team@company.com
tags:
sprint: "42"
team: marketing
priority: high
Data Quality Monitoring ด้วย Unity Catalog
Unity Catalog มี built-in data quality monitoring ที่ช่วยให้ทีม Scrum ตรวจจับปัญหา data drift ก่อนที่จะกระทบ downstream consumers
-- สร้าง monitoring สำหรับ gold table
CREATE OR REFRESH MONITOR prod_analytics.marketing.marketing_attribution
USING PROFILE
WITH SCHEDULE CRON '0 0 8 * * ?'
WITH TIME_SERIES (event_time);
-- ดู monitoring results
SELECT * FROM prod_analytics.marketing.marketing_attribution_profile_metrics
WHERE log_time > current_date() - INTERVAL 7 DAYS
ORDER BY log_time DESC;
-- ตั้ง alert เมื่อ data quality ต่ำ
CREATE ALERT marketing_null_check
USING (
SELECT COUNT(*) as null_count
FROM prod_analytics.marketing.cleaned_events
WHERE user_id IS NULL AND event_time > current_date()
)
WHEN null_count > 100
THEN NOTIFY 'data-team-webhook';
Lineage Tracking สำหรับ Sprint Retrospective
Unity Catalog บันทึก data lineage อัตโนมัติ ทำให้ตอน sprint retrospective ทีมเห็นภาพรวมว่า pipeline ไหนใช้ data จากไหน impact analysis ทำได้ง่าย
# ดู lineage ผ่าน REST API
import requests
WORKSPACE_URL = "https://your-workspace.cloud.databricks.com"
TOKEN = "dapi_xxxxxxxxxxxxx"
# ดู downstream dependencies ของ table
resp = requests.get(
f"{WORKSPACE_URL}/api/2.0/lineage-tracking/table-lineage",
headers={"Authorization": f"Bearer {TOKEN}"},
json={
"table_name": "prod_analytics.marketing.cleaned_events",
"include_entity_lineage": True
}
)
lineage = resp.json()
print("Downstream tables:")
for dep in lineage.get("downstreams", []):
print(f" → {dep['tableInfo']['name']}")
print("\nUpstream tables:")
for dep in lineage.get("upstreams", []):
print(f" ← {dep['tableInfo']['name']}")
# CI/CD: deploy bundle ตอนจบ sprint
# GitHub Actions workflow
# .github/workflows/deploy.yml
name: Deploy Data Pipelines
on:
push:
branches: [main]
jobs:
deploy-staging:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: databricks/setup-cli@main
- run: databricks bundle validate -t staging
- run: databricks bundle deploy -t staging
test-staging:
needs: deploy-staging
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: databricks/setup-cli@main
- run: databricks bundle run sprint_42_marketing_etl -t staging
- run: python tests/integration/test_data_quality.py
deploy-prod:
needs: test-staging
runs-on: ubuntu-latest
environment: production
steps:
- uses: actions/checkout@v4
- uses: databricks/setup-cli@main
- run: databricks bundle deploy -t prod
FAQ — คำถามที่พบบ่อย
Q: Unity Catalog ใช้ได้กับ Databricks Community Edition ไหม?
A: ไม่ได้ Unity Catalog ต้องใช้กับ Databricks Premium plan ขึ้นไป เพราะต้องการ metastore server ที่ Databricks จัดการให้ ถ้าต้องการทดลองใช้ต้องสมัคร trial ของ premium tier
Q: ควรแบ่ง Catalog ตาม environment หรือตาม domain?
A: แนะนำให้แบ่งตาม environment เป็นหลัก (dev/staging/prod) แล้วใช้ schema แบ่งตาม domain ภายใน catalog เพราะ permission model ของ Unity Catalog ทำงานที่ระดับ catalog ได้สะดวกกว่า ทำให้ควบคุม access ระหว่าง environment ได้ง่าย
Q: Agile กับ Data Engineering ต่างจาก Software Engineering อย่างไร?
A: Data Engineering sprint มักมี task ที่ต้องรอ data arrive หรือรอ pipeline run เสร็จ ทำให้ velocity ผันผวนกว่า แนะนำให้ใช้ Kanban มากกว่า Scrum แบบเคร่งครัด โดยจัด column เป็น Backlog → Data Exploration → Development → Testing → Production → Done แทนที่จะเป็น To Do → In Progress → Done แบบทั่วไป
Q: Delta Live Tables กับ Structured Streaming ต่างกันอย่างไร?
A: DLT เป็น abstraction layer ที่อยู่บน Structured Streaming อีกชั้น ข้อดีคือเขียน code น้อยกว่า มี data quality expectations built-in และ Databricks จัดการ infrastructure ให้ทั้งหมด ข้อเสียคือ flexibility น้อยกว่าการเขียน Structured Streaming เอง ถ้า pipeline ไม่ซับซ้อนมากให้ใช้ DLT ถ้าต้อง custom logic เยอะให้ใช้ Structured Streaming ตรงๆ
