SiamCafe · Blog
Databricks Unity Catalog Agile Scrum Kanban
บทความ

Databricks Unity Catalog Agile Scrum Kanban

เผยแพร่ 28 พฤษภาคม 2569

Databricks Unity Catalog กับการจัดการ Data Governance

Unity Catalog เป็น centralized governance layer ของ Databricks ที่ช่วยจัดการ data assets ทั้งหมดในที่เดียว ไม่ว่าจะเป็น tables, views, volumes, models หรือ functions ทุกอย่างถูกจัดระเบียบภายใต้ 3-level namespace: catalog → schema → object ทำให้ทีมที่ใช้ Agile/Scrum สามารถแบ่ง workspace ตาม sprint หรือ project ได้ชัดเจน

สร้าง Unity Catalog สำหรับทีม Agile

การออกแบบ catalog structure ที่ดีควรสะท้อน workflow ของทีม ตัวอย่างด้านล่างแบ่งตาม environment และ domain

-- สร้าง catalog สำหรับแต่ละ environment
CREATE CATALOG IF NOT EXISTS dev_analytics;
CREATE CATALOG IF NOT EXISTS staging_analytics;
CREATE CATALOG IF NOT EXISTS prod_analytics;

-- สร้าง schema ตาม domain (แต่ละ schema = 1 Kanban board)
CREATE SCHEMA IF NOT EXISTS dev_analytics.marketing;
CREATE SCHEMA IF NOT EXISTS dev_analytics.sales;
CREATE SCHEMA IF NOT EXISTS dev_analytics.operations;

-- กำหนดสิทธิ์ให้ทีม data engineering
GRANT USE CATALOG ON CATALOG dev_analytics TO `data-engineering-team`;
GRANT USE SCHEMA ON SCHEMA dev_analytics.marketing TO `data-engineering-team`;
GRANT CREATE TABLE ON SCHEMA dev_analytics.marketing TO `data-engineering-team`;
GRANT SELECT ON SCHEMA dev_analytics.marketing TO `data-analyst-team`;

-- ดู permissions ทั้งหมด
SHOW GRANTS ON CATALOG dev_analytics;

ตั้งค่า Databricks CLI สำหรับ CI/CD Pipeline

ทีมที่ทำ Scrum ต้องมี CI/CD pipeline ที่ deploy data pipelines อัตโนมัติทุก sprint ใช้ Databricks CLI ร่วมกับ Databricks Asset Bundles (DABs)

ติดตั้ง Databricks CLI

curl -fsSL https://raw.githubusercontent.com/databricks/setup-cli/main/install.sh | sh

ตรวจสอบเวอร์ชัน

databricks --version

Databricks CLI v0.218.0

ตั้งค่า authentication

databricks configure --token

Databricks Host: https://your-workspace.cloud.databricks.com

Personal Access Token: dapi_xxxxxxxxxxxxx

ทดสอบ connection

databricks workspace list /

databricks catalogs list

สร้าง Databricks Asset Bundle

สร้างโปรเจค bundle ใหม่

databricks bundle init

โครงสร้างไฟล์

my-project/

├── databricks.yml # Bundle configuration

├── src/

│ ├── marketing_pipeline.py

│ ├── sales_etl.py

│ └── ops_reporting.py

├── tests/

│ ├── test_marketing.py

│ └── test_sales.py

└── resources/

├── jobs.yml

└── pipelines.yml

# databricks.yml
bundle:
  name: analytics-pipelines

workspace:
  host: https://your-workspace.cloud.databricks.com

variables:
  catalog:
    description: "Target Unity Catalog"
    default: dev_analytics

targets:
  dev:
    mode: development
    default: true
    variables:
      catalog: dev_analytics
    workspace:
      root_path: /Users//.bundle//dev

  staging:
    variables:
      catalog: staging_analytics
    workspace:
      root_path: /Shared/.bundle//staging

  prod:
    mode: production
    variables:
      catalog: prod_analytics
    workspace:
      root_path: /Shared/.bundle//prod
    permissions:
      - level: CAN_MANAGE
        group_name: data-platform-admins
      - level: CAN_VIEW
        group_name: data-engineering-team

สร้าง Delta Live Tables Pipeline

Delta Live Tables (DLT) ช่วยสร้าง data pipeline แบบ declarative ทำให้ทีม Agile สามารถ iterate ได้เร็ว ไม่ต้องเขียน boilerplate code มาก

# src/marketing_pipeline.py
import dlt
from pyspark.sql.functions import col, current_timestamp, when

@dlt.table(
    name="raw_website_events",
    comment="Raw clickstream data from website",
    table_properties={"quality": "bronze"}
)
def raw_website_events():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", "/mnt/schema/website_events")
        .load("/mnt/raw/website_events/")
    )

@dlt.table(
    name="cleaned_events",
    comment="Cleaned and deduplicated events",
    table_properties={"quality": "silver"}
)
@dlt.expect_or_drop("valid_user_id", "user_id IS NOT NULL")
@dlt.expect_or_drop("valid_timestamp", "event_time IS NOT NULL")
@dlt.expect("valid_event_type", "event_type IN ('pageview','click','purchase','signup')")
def cleaned_events():
    return (
        dlt.read_stream("raw_website_events")
        .dropDuplicates(["event_id"])
        .withColumn("processed_at", current_timestamp())
        .select(
            col("event_id"),
            col("user_id"),
            col("event_type"),
            col("event_time").cast("timestamp"),
            col("page_url"),
            col("referrer"),
            col("device_type"),
            col("processed_at")
        )
    )

@dlt.table(
    name="marketing_attribution",
    comment="Marketing channel attribution per user",
    table_properties={"quality": "gold"}
)
def marketing_attribution():
    return (
        dlt.read("cleaned_events")
        .filter(col("event_type").isin("purchase", "signup"))
        .groupBy("referrer", "device_type")
        .agg(
            {"event_id": "count", "user_id": "approx_count_distinct"}
        )
        .withColumnRenamed("count(event_id)", "total_conversions")
        .withColumnRenamed("approx_count_distinct(user_id)", "unique_users")
    )

จัดการ Sprint ด้วย Kanban Board สำหรับ Data Tasks

การใช้ Kanban board กับ data engineering มีความแตกต่างจาก software development ตรงที่ task มักมี dependency chain ยาว ต้องออกแบบ board ให้สะท้อน data pipeline stages

# resources/jobs.yml — Databricks Job สำหรับแต่ละ Sprint Task
resources:
  jobs:
    sprint_42_marketing_etl:
      name: "[Sprint-42] Marketing ETL Pipeline"
      schedule:
        quartz_cron_expression: "0 0 6 * * ?"
        timezone_id: Asia/Bangkok
      tasks:
        - task_key: ingest_raw
          pipeline_task:
            pipeline_id: 
          
        - task_key: run_quality_checks
          depends_on:
            - task_key: ingest_raw
          notebook_task:
            notebook_path: ../src/quality_checks.py
            base_parameters:
              catalog: 
              schema: marketing

        - task_key: update_dashboard
          depends_on:
            - task_key: run_quality_checks
          notebook_task:
            notebook_path: ../src/refresh_dashboards.py
      
      email_notifications:
        on_failure:
          - data-team@company.com
      
      tags:
        sprint: "42"
        team: marketing
        priority: high

Data Quality Monitoring ด้วย Unity Catalog

Unity Catalog มี built-in data quality monitoring ที่ช่วยให้ทีม Scrum ตรวจจับปัญหา data drift ก่อนที่จะกระทบ downstream consumers

-- สร้าง monitoring สำหรับ gold table
CREATE OR REFRESH MONITOR prod_analytics.marketing.marketing_attribution
  USING PROFILE
  WITH SCHEDULE CRON '0 0 8 * * ?'
  WITH TIME_SERIES (event_time);

-- ดู monitoring results
SELECT * FROM prod_analytics.marketing.marketing_attribution_profile_metrics
WHERE log_time > current_date() - INTERVAL 7 DAYS
ORDER BY log_time DESC;

-- ตั้ง alert เมื่อ data quality ต่ำ
CREATE ALERT marketing_null_check
  USING (
    SELECT COUNT(*) as null_count
    FROM prod_analytics.marketing.cleaned_events
    WHERE user_id IS NULL AND event_time > current_date()
  )
  WHEN null_count > 100
  THEN NOTIFY 'data-team-webhook';

Lineage Tracking สำหรับ Sprint Retrospective

Unity Catalog บันทึก data lineage อัตโนมัติ ทำให้ตอน sprint retrospective ทีมเห็นภาพรวมว่า pipeline ไหนใช้ data จากไหน impact analysis ทำได้ง่าย

# ดู lineage ผ่าน REST API
import requests

WORKSPACE_URL = "https://your-workspace.cloud.databricks.com"
TOKEN = "dapi_xxxxxxxxxxxxx"

# ดู downstream dependencies ของ table
resp = requests.get(
    f"{WORKSPACE_URL}/api/2.0/lineage-tracking/table-lineage",
    headers={"Authorization": f"Bearer {TOKEN}"},
    json={
        "table_name": "prod_analytics.marketing.cleaned_events",
        "include_entity_lineage": True
    }
)

lineage = resp.json()
print("Downstream tables:")
for dep in lineage.get("downstreams", []):
    print(f"  → {dep['tableInfo']['name']}")

print("\nUpstream tables:")
for dep in lineage.get("upstreams", []):
    print(f"  ← {dep['tableInfo']['name']}")
# CI/CD: deploy bundle ตอนจบ sprint
# GitHub Actions workflow
# .github/workflows/deploy.yml
name: Deploy Data Pipelines
on:
  push:
    branches: [main]

jobs:
  deploy-staging:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: databricks/setup-cli@main
      - run: databricks bundle validate -t staging
      - run: databricks bundle deploy -t staging

  test-staging:
    needs: deploy-staging
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: databricks/setup-cli@main
      - run: databricks bundle run sprint_42_marketing_etl -t staging
      - run: python tests/integration/test_data_quality.py

  deploy-prod:
    needs: test-staging
    runs-on: ubuntu-latest
    environment: production
    steps:
      - uses: actions/checkout@v4
      - uses: databricks/setup-cli@main
      - run: databricks bundle deploy -t prod

FAQ — คำถามที่พบบ่อย

Q: Unity Catalog ใช้ได้กับ Databricks Community Edition ไหม?

A: ไม่ได้ Unity Catalog ต้องใช้กับ Databricks Premium plan ขึ้นไป เพราะต้องการ metastore server ที่ Databricks จัดการให้ ถ้าต้องการทดลองใช้ต้องสมัคร trial ของ premium tier

Q: ควรแบ่ง Catalog ตาม environment หรือตาม domain?

A: แนะนำให้แบ่งตาม environment เป็นหลัก (dev/staging/prod) แล้วใช้ schema แบ่งตาม domain ภายใน catalog เพราะ permission model ของ Unity Catalog ทำงานที่ระดับ catalog ได้สะดวกกว่า ทำให้ควบคุม access ระหว่าง environment ได้ง่าย

Q: Agile กับ Data Engineering ต่างจาก Software Engineering อย่างไร?

A: Data Engineering sprint มักมี task ที่ต้องรอ data arrive หรือรอ pipeline run เสร็จ ทำให้ velocity ผันผวนกว่า แนะนำให้ใช้ Kanban มากกว่า Scrum แบบเคร่งครัด โดยจัด column เป็น Backlog → Data Exploration → Development → Testing → Production → Done แทนที่จะเป็น To Do → In Progress → Done แบบทั่วไป

Q: Delta Live Tables กับ Structured Streaming ต่างกันอย่างไร?

A: DLT เป็น abstraction layer ที่อยู่บน Structured Streaming อีกชั้น ข้อดีคือเขียน code น้อยกว่า มี data quality expectations built-in และ Databricks จัดการ infrastructure ให้ทั้งหมด ข้อเสียคือ flexibility น้อยกว่าการเขียน Structured Streaming เอง ถ้า pipeline ไม่ซับซ้อนมากให้ใช้ DLT ถ้าต้อง custom logic เยอะให้ใช้ Structured Streaming ตรงๆ