Databricks Unity Catalog Automation Script

2026-03-22 · Aj. Bom, SiamCafe.net · 10,887 words

Databricks Unity Catalog

Unity Catalog is a data governance solution that manages all data assets from a single place. It uses a 3-level namespace and provides fine-grained access control, data lineage, and audit logging.

Automation scripts let you manage Unity Catalog automatically: creating catalogs and schemas, granting permissions, and monitoring quality through the API and SDK.

Unity Catalog Setup

-- === Unity Catalog Setup with SQL ===

-- 1. Create catalogs
CREATE CATALOG IF NOT EXISTS production
COMMENT 'Production data catalog';

CREATE CATALOG IF NOT EXISTS development
COMMENT 'Development data catalog';

CREATE CATALOG IF NOT EXISTS staging
COMMENT 'Staging data catalog';

-- 2. Create schemas
CREATE SCHEMA IF NOT EXISTS production.sales
COMMENT 'Sales domain data';

CREATE SCHEMA IF NOT EXISTS production.marketing
COMMENT 'Marketing domain data';

CREATE SCHEMA IF NOT EXISTS production.finance
COMMENT 'Finance domain data';

-- 3. Create a table
CREATE TABLE IF NOT EXISTS production.sales.orders (
    order_id BIGINT NOT NULL,
    customer_id BIGINT NOT NULL,
    product_id BIGINT NOT NULL,
    amount DECIMAL(10, 2) NOT NULL,
    status STRING NOT NULL,
    created_at TIMESTAMP NOT NULL,
    updated_at TIMESTAMP
)
USING DELTA
COMMENT 'Customer orders'
TBLPROPERTIES (
    'delta.autoOptimize.optimizeWrite' = 'true',
    'delta.autoOptimize.autoCompact' = 'true',
    'quality' = 'gold'
);

-- 4. Grant Permissions
-- Data Engineers: full access
GRANT USE CATALOG ON CATALOG production TO `data-engineers`;
GRANT USE SCHEMA ON SCHEMA production.sales TO `data-engineers`;
GRANT ALL PRIVILEGES ON SCHEMA production.sales TO `data-engineers`;

-- Data Analysts: read-only
GRANT USE CATALOG ON CATALOG production TO `data-analysts`;
GRANT USE SCHEMA ON SCHEMA production.sales TO `data-analysts`;
GRANT SELECT ON SCHEMA production.sales TO `data-analysts`;

-- Data Scientists: read + create models
GRANT USE CATALOG ON CATALOG production TO `data-scientists`;
GRANT USE SCHEMA ON SCHEMA production.sales TO `data-scientists`;
GRANT SELECT ON SCHEMA production.sales TO `data-scientists`;
GRANT CREATE MODEL ON SCHEMA production.sales TO `data-scientists`;

-- 5. Tags and Classification
ALTER TABLE production.sales.orders
SET TAGS ('domain' = 'sales', 'pii' = 'false', 'tier' = 'gold');

ALTER TABLE production.sales.orders
ALTER COLUMN customer_id SET TAGS ('pii' = 'true');

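-- 6. Row-level Security
-- Assumptions: production.sales.orders has a `region` column (the table
-- definition above does not include one), and current_user_region() is a
-- user-defined function you provide; it is not a Databricks built-in.
CREATE FUNCTION production.sales.region_filter(region STRING)
RETURN IF(IS_ACCOUNT_GROUP_MEMBER('global-access'), true, region = current_user_region());

ALTER TABLE production.sales.orders
SET ROW FILTER production.sales.region_filter ON (region);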

-- 7. Column Masking
-- Assumes a production.sales.customers table with an `email` column already exists.
CREATE FUNCTION production.sales.mask_email(email STRING)
RETURN IF(IS_ACCOUNT_GROUP_MEMBER('pii-access'), email, regexp_replace(email, '(.).*@', '$1***@'));

ALTER TABLE production.sales.customers
ALTER COLUMN email SET MASK production.sales.mask_email;
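
To see what the masking function returns before attaching it to a column, the same regular expression can be tried locally. This is only a sketch of the masking logic in Python: the `has_pii_access` flag is a hypothetical stand-in for `IS_ACCOUNT_GROUP_MEMBER('pii-access')`, and the sample address is made up.

```python
import re

def mask_email(email: str, has_pii_access: bool) -> str:
    """Mirror of the SQL mask: keep the first character and the domain."""
    if has_pii_access:
        return email
    # Same pattern as the SQL function; Python writes the backreference
    # as \1 where Spark SQL's regexp_replace uses $1.
    return re.sub(r"(.).*@", r"\1***@", email)

print(mask_email("alice@example.com", has_pii_access=False))  # a***@example.com
print(mask_email("alice@example.com", has_pii_access=True))   # alice@example.com
```

Everything between the first character and the `@` is replaced, so the length of the local part is not leaked either.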

Automation Scripts

# unity_catalog_automation.py — Unity Catalog Automation
# pip install requests
# (this script calls the REST API directly; the databricks-sdk package is an alternative)

from dataclasses import dataclass, field
from typing import List, Dict, Optional
from datetime import datetime
import json

@dataclass
class CatalogConfig:
    name: str
    comment: str
    schemas: List[Dict] = field(default_factory=list)
    owner: str = "data-platform-team"

@dataclass
class PermissionGrant:
    principal: str  # group or user
    catalog: str
    schema: Optional[str] = None
    privileges: List[str] = field(default_factory=list)

class UnityCatalogAutomation:
    """Automation for Unity Catalog."""

    def __init__(self, host, token):
        self.host = host
        self.token = token
        self.catalogs_created = 0
        self.schemas_created = 0
        self.grants_applied = 0

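    def _api_call(self, method, endpoint, data=None):
        """Call the Databricks Unity Catalog REST API."""
        import requests  # imported lazily so the config demo below runs without it
        url = f"{self.host.rstrip('/')}/api/2.1/unity-catalog{endpoint}"
        headers = {"Authorization": f"Bearer {self.token}"}

        # Fail loudly instead of hitting an undefined `resp` further down
        if method not in ("GET", "POST", "PATCH"):
            raise ValueError(f"Unsupported HTTP method: {method}")

        # requests.request covers all three verbs; json=None sends no body
        resp = requests.request(method, url, headers=headers, json=data, timeout=30)

        # Parsed body on success, otherwise a dict that callers check for "error"
        return resp.json() if resp.status_code < 400 else {"error": resp.text}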

    def create_catalog(self, config: CatalogConfig):
        """Create a catalog, then its schemas."""
        result = self._api_call("POST", "/catalogs", {
            "name": config.name,
            "comment": config.comment,
            "properties": {"owner": config.owner, "created_by": "automation"},
        })

        if "error" not in result:
            self.catalogs_created += 1
            print(f"  Created catalog: {config.name}")

            # Create the schemas declared for this catalog
            for schema in config.schemas:
                self.create_schema(config.name, schema["name"], schema.get("comment", ""))

        return result

    def create_schema(self, catalog, name, comment=""):
        """Create a schema inside the given catalog."""
        result = self._api_call("POST", "/schemas", {
            "name": name,
            "catalog_name": catalog,
            "comment": comment,
        })

        if "error" not in result:
            self.schemas_created += 1
            print(f"    Created schema: {catalog}.{name}")

        return result

    def grant_permissions(self, grant: PermissionGrant):
        """Grant Permissions"""
        securable_type = "schema" if grant.schema else "catalog"
        securable_name = f"{grant.catalog}.{grant.schema}" if grant.schema else grant.catalog

        changes = [{
            "principal": grant.principal,
            "add": grant.privileges,
        }]

        result = self._api_call("PATCH",
            f"/permissions/{securable_type}/{securable_name}",
            {"changes": changes})

        if "error" not in result:
            self.grants_applied += 1
            privs = ", ".join(grant.privileges)
            print(f"  Granted [{privs}] on {securable_name} to {grant.principal}")

        return result

    def setup_environment(self, env_config):
        """Set up the whole environment from a config dict."""
        print(f"\n{'='*55}")
        print(f"Unity Catalog Setup — {datetime.now().strftime('%Y-%m-%d %H:%M')}")
        print(f"{'='*55}")

        # Create catalogs
        for catalog_cfg in env_config.get("catalogs", []):
            config = CatalogConfig(**catalog_cfg)
            self.create_catalog(config)

        # Apply Permissions
        for grant_cfg in env_config.get("permissions", []):
            grant = PermissionGrant(**grant_cfg)
            self.grant_permissions(grant)

        print(f"\n  Summary:")
        print(f"    Catalogs: {self.catalogs_created}")
        print(f"    Schemas: {self.schemas_created}")
        print(f"    Grants: {self.grants_applied}")

# === Configuration File ===
env_config = {
    "catalogs": [
        {
            "name": "production",
            "comment": "Production data",
            "schemas": [
                {"name": "sales", "comment": "Sales domain"},
                {"name": "marketing", "comment": "Marketing domain"},
                {"name": "finance", "comment": "Finance domain"},
                {"name": "ml_models", "comment": "ML models and features"},
            ],
        },
        {
            "name": "development",
            "comment": "Development sandbox",
            "schemas": [
                {"name": "sandbox", "comment": "Developer sandbox"},
                {"name": "experiments", "comment": "ML experiments"},
            ],
        },
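    ],
    "permissions": [
        {"principal": "data-engineers", "catalog": "production",
         "privileges": ["USE_CATALOG"]},
        {"principal": "data-engineers", "catalog": "production",
         "schema": "sales", "privileges": ["ALL_PRIVILEGES"]},
        # Analysts also need USE_CATALOG on the parent catalog,
        # otherwise the schema-level grants below are unusable
        {"principal": "data-analysts", "catalog": "production",
         "privileges": ["USE_CATALOG"]},
        {"principal": "data-analysts", "catalog": "production",
         "schema": "sales", "privileges": ["USE_SCHEMA", "SELECT"]},
    ],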
}

# automation = UnityCatalogAutomation("https://my-workspace.cloud.databricks.com", "token")
# automation.setup_environment(env_config)
print(f"Config: {len(env_config['catalogs'])} catalogs, {len(env_config['permissions'])} grants")
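
A permissions list like the one in `env_config` is easy to get subtly wrong, because a principal needs `USE_CATALOG` on the parent catalog before any schema-level privilege is usable. The following is a minimal sketch of a pre-flight check that could run before `setup_environment`. The function name `find_missing_use_catalog` and the sample data are assumptions for illustration, not part of the script above.

```python
from typing import Dict, List, Tuple

def find_missing_use_catalog(permissions: List[Dict]) -> List[Tuple[str, str]]:
    """Return (principal, catalog) pairs that hold schema-level grants
    but no USE_CATALOG (or ALL_PRIVILEGES) on the parent catalog."""
    catalog_access = set()
    for grant in permissions:
        is_catalog_level = grant.get("schema") is None
        if is_catalog_level and {"USE_CATALOG", "ALL_PRIVILEGES"} & set(grant["privileges"]):
            catalog_access.add((grant["principal"], grant["catalog"]))

    missing = []
    for grant in permissions:
        key = (grant["principal"], grant["catalog"])
        if grant.get("schema") and key not in catalog_access and key not in missing:
            missing.append(key)
    return missing

# Hypothetical sample: the analysts entry lacks a catalog-level grant
sample_permissions = [
    {"principal": "data-engineers", "catalog": "production",
     "privileges": ["USE_CATALOG"]},
    {"principal": "data-engineers", "catalog": "production",
     "schema": "sales", "privileges": ["ALL_PRIVILEGES"]},
    {"principal": "data-analysts", "catalog": "production",
     "schema": "sales", "privileges": ["USE_SCHEMA", "SELECT"]},
]

print(find_missing_use_catalog(sample_permissions))
# [('data-analysts', 'production')]
```

Running a check like this in CI catches the gap before any API call is made.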

Data Lineage and Audit

# lineage_audit.py: Data Lineage and Audit Monitoring

from dataclasses import dataclass
from typing import List, Dict
from datetime import datetime

@dataclass
class LineageEdge:
    source_table: str
    target_table: str
    transformation: str
    job_name: str

@dataclass
class AuditEvent:
    timestamp: str
    user: str
    action: str
    resource: str
    details: str

class DataGovernanceMonitor:
    """Monitor Data Governance"""

    def __init__(self):
        self.lineage: List[LineageEdge] = []
        self.audit_events: List[AuditEvent] = []

    def add_lineage(self, source, target, transform, job):
        self.lineage.append(LineageEdge(source, target, transform, job))

    def add_audit(self, user, action, resource, details=""):
        self.audit_events.append(AuditEvent(
            datetime.now().isoformat(), user, action, resource, details))

    def lineage_report(self):
        """Print the data lineage edges."""
        print(f"\nData Lineage ({len(self.lineage)} edges):")
        for edge in self.lineage:
            print(f"  {edge.source_table}")
            print(f"    -> [{edge.transformation}] ({edge.job_name})")
            print(f"    -> {edge.target_table}")

    def audit_report(self):
        """Print the ten most recent audit events."""
        print(f"\nAudit Trail ({len(self.audit_events)} events):")
        for event in self.audit_events[-10:]:
            print(f"  [{event.timestamp[:16]}] {event.user}: "
                  f"{event.action} on {event.resource}")

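    def compliance_check(self):
        """Run the compliance checks."""
        # Placeholder results: every check is hardcoded to True for the demo.
        # Replace each value with a real query against catalog metadata.
        checks = [
            ("PII tables have row filters", True),
            ("All catalogs have owners", True),
            ("Audit logging enabled", True),
            ("Column masking on PII", True),
            ("No public access grants", True),
        ]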

        print(f"\nCompliance Check:")
        passed = sum(1 for _, ok in checks if ok)
        for check, ok in checks:
            status = "PASS" if ok else "FAIL"
            print(f"  [{status}] {check}")
        print(f"\n  Score: {passed}/{len(checks)} ({passed/len(checks)*100:.0f}%)")

# Example
monitor = DataGovernanceMonitor()

monitor.add_lineage("bronze.raw_orders", "silver.clean_orders", "Clean + Dedupe", "etl-orders")
monitor.add_lineage("silver.clean_orders", "gold.daily_revenue", "Aggregate", "agg-revenue")
monitor.add_lineage("silver.clean_orders", "gold.customer_segments", "ML Features", "ml-features")

monitor.add_audit("alice@company.com", "SELECT", "production.sales.orders")
monitor.add_audit("bob@company.com", "GRANT", "production.sales", "Granted SELECT to analysts")
monitor.add_audit("automation", "CREATE TABLE", "production.sales.daily_metrics")

monitor.lineage_report()
monitor.audit_report()
monitor.compliance_check()
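
Lineage edges become more useful once they can be traversed, for instance to answer "which tables are affected if this source changes?". The following is a minimal sketch of that impact analysis using a breadth-first search over (source, target) pairs. The function name `downstream_tables` is an assumption for illustration; the edges reuse the table names from the example above.

```python
from collections import defaultdict, deque
from typing import Iterable, List, Tuple

def downstream_tables(edges: Iterable[Tuple[str, str]], start: str) -> List[str]:
    """Return every table reachable from `start`, in breadth-first order."""
    graph = defaultdict(list)
    for source, target in edges:
        graph[source].append(target)

    seen = {start}
    order = []
    queue = deque([start])
    while queue:
        table = queue.popleft()
        for target in graph[table]:
            if target not in seen:  # guards against cycles and duplicates
                seen.add(target)
                order.append(target)
                queue.append(target)
    return order

edges = [
    ("bronze.raw_orders", "silver.clean_orders"),
    ("silver.clean_orders", "gold.daily_revenue"),
    ("silver.clean_orders", "gold.customer_segments"),
]

print(downstream_tables(edges, "bronze.raw_orders"))
# ['silver.clean_orders', 'gold.daily_revenue', 'gold.customer_segments']
```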

Best Practices

Putting It into Practice in an Organization

For medium to large organizations, a three-tier architecture is recommended: a Core Layer that forms the backbone of the system, a Distribution Layer that distributes traffic, and an Access Layer that connects directly to users. Clear layer separation makes troubleshooting easier and lets the system scale as needed.

Network security matters just as much. Install a next-generation firewall capable of deep packet inspection, use network segmentation with a separate VLAN for each department, deploy IDS/IPS to detect attacks, and run a regular security audit at least twice a year.

What is Unity Catalog?

Unity Catalog is Databricks' data governance solution. It manages data assets (tables, views, volumes, models, and functions) from a single place, with a 3-level namespace, fine-grained access control, lineage, auditing, and multi-cloud support.

What is Data Governance?

Data governance is the process of managing data so that it is high quality, secure, and compliant with regulations. It covers access control, data quality, lineage, classification, and the audit trail.
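
Governance rules such as "column masking on PII" can be computed from metadata instead of being asserted by hand. The following is a minimal sketch of one such check. The inventory structure, the `phone` column, and the function name `unmasked_pii_columns` are all assumptions for illustration; this is not the shape returned by the Unity Catalog API.

```python
from typing import Dict, List

def unmasked_pii_columns(tables: List[Dict]) -> List[str]:
    """List columns tagged pii=true that have no mask function attached."""
    findings = []
    for table in tables:
        for column in table["columns"]:
            is_pii = column.get("tags", {}).get("pii") == "true"
            if is_pii and not column.get("mask"):
                findings.append(f"{table['name']}.{column['name']}")
    return findings

# Hypothetical inventory, e.g. assembled from tag and mask metadata
inventory = [
    {"name": "production.sales.customers", "columns": [
        {"name": "email", "tags": {"pii": "true"},
         "mask": "production.sales.mask_email"},
        {"name": "phone", "tags": {"pii": "true"}},
    ]},
    {"name": "production.sales.orders", "columns": [
        {"name": "amount", "tags": {}},
    ]},
]

print(unmasked_pii_columns(inventory))
# ['production.sales.customers.phone']
```

An empty result means the check passes; a non-empty one tells you exactly which columns still need a mask.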

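What is the 3-level Namespace?

Objects are addressed as catalog.schema.table, for example production.sales.orders. The catalog is the top level, the schema groups objects by domain, and the table is the data asset itself. This structure keeps data organized and makes permissions easy to control.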
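
Automation code often has to split a fully qualified name back into its three parts. The following is a minimal sketch of such a helper. The names `TableName` and `parse_table_name` are assumptions for illustration, and the function deliberately does not handle backtick-quoted identifiers that contain dots.

```python
from typing import NamedTuple

class TableName(NamedTuple):
    catalog: str
    schema: str
    table: str

def parse_table_name(full_name: str) -> TableName:
    """Split 'catalog.schema.table' into its three parts, or raise ValueError."""
    parts = full_name.split(".")
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"Expected catalog.schema.table, got: {full_name!r}")
    return TableName(*parts)

name = parse_table_name("production.sales.orders")
print(name.catalog, name.schema, name.table)  # production sales orders
```

Rejecting two-part names early avoids silently resolving a table against whatever catalog happens to be the session default.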

How do you automate Data Governance?

Use the Databricks REST API or SDK to write scripts that create catalogs and schemas, grant permissions, monitor quality, and generate reports. These can run in a CI/CD pipeline, alongside Terraform or Databricks Asset Bundles.
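
One way to fit this into a CI/CD pipeline is to render the desired state as idempotent SQL that a reviewer can read in a pull request before anything is applied. The following is a minimal sketch under that assumption. The function name `render_setup_sql` and the sample config are made up for illustration, and identifiers are interpolated without escaping, so the config is assumed to come from a trusted repository.

```python
from typing import Dict, List

def render_setup_sql(config: Dict) -> List[str]:
    """Turn a catalogs/permissions config into CREATE and GRANT statements."""
    statements = []
    for catalog in config.get("catalogs", []):
        statements.append(f"CREATE CATALOG IF NOT EXISTS {catalog['name']};")
        for schema in catalog.get("schemas", []):
            statements.append(
                f"CREATE SCHEMA IF NOT EXISTS {catalog['name']}.{schema['name']};")

    for grant in config.get("permissions", []):
        if grant.get("schema"):
            target = f"SCHEMA {grant['catalog']}.{grant['schema']}"
        else:
            target = f"CATALOG {grant['catalog']}"
        for privilege in grant["privileges"]:
            # API-style names use underscores; SQL uses spaces (USE_CATALOG -> USE CATALOG)
            sql_privilege = privilege.replace("_", " ")
            statements.append(
                f"GRANT {sql_privilege} ON {target} TO `{grant['principal']}`;")
    return statements

# Hypothetical config in the same shape as env_config
config = {
    "catalogs": [{"name": "development", "schemas": [{"name": "sandbox"}]}],
    "permissions": [{"principal": "data-analysts", "catalog": "development",
                     "privileges": ["USE_CATALOG"]}],
}

for statement in render_setup_sql(config):
    print(statement)
# CREATE CATALOG IF NOT EXISTS development;
# CREATE SCHEMA IF NOT EXISTS development.sandbox;
# GRANT USE CATALOG ON CATALOG development TO `data-analysts`;
```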

Summary

Databricks Unity Catalog provides comprehensive data governance: a 3-level namespace for organization, fine-grained access control for least privilege, column masking and row-level security for PII protection, automation through the API, SDK, and Terraform, audit logging to review access, and data lineage to trace where data comes from and where it goes.
