ai

Databricks Unity Catalog กับ Automation Script —

Databricks Unity Catalog กับ Automation Script —

Databricks Unity Catalog

Databricks Unity Catalog กับ Automation Script —

Unity Catalog เป็น Data Governance Solution ที่จัดการ Data Assets ทั้งหมดจากจุดเดียว ใช้ 3-level Namespace มี Fine-grained Access Control, Data Lineage และ Audit Logging

Automation Scripts ช่วยให้จัดการ Unity Catalog ได้อัตโนมัติ สร้าง Catalogs, Schemas, Grant Permissions, Monitor Quality ผ่าน API และ SDK

Unity Catalog Setup

# === Unity Catalog Setup ด้วย SQL ===



-- 1. สร้าง Catalog

CREATE CATALOG IF NOT EXISTS production

COMMENT 'Production data catalog';



CREATE CATALOG IF NOT EXISTS development

COMMENT 'Development data catalog';



CREATE CATALOG IF NOT EXISTS staging

COMMENT 'Staging data catalog';



-- 2. สร้าง Schema

CREATE SCHEMA IF NOT EXISTS production.sales

COMMENT 'Sales domain data';



CREATE SCHEMA IF NOT EXISTS production.marketing

COMMENT 'Marketing domain data';



CREATE SCHEMA IF NOT EXISTS production.finance

COMMENT 'Finance domain data';



-- 3. สร้าง Table

CREATE TABLE IF NOT EXISTS production.sales.orders (

    order_id BIGINT NOT NULL,

    customer_id BIGINT NOT NULL,

    product_id BIGINT NOT NULL,

    amount DECIMAL(10, 2) NOT NULL,

    status STRING NOT NULL,

    created_at TIMESTAMP NOT NULL,

    updated_at TIMESTAMP

)

USING DELTA

COMMENT 'Customer orders'

TBLPROPERTIES (

    'delta.autoOptimize.optimizeWrite' = 'true',

    'delta.autoOptimize.autoCompact' = 'true',

    'quality' = 'gold'

);



-- 4. Grant Permissions

-- Data Engineers: full access

GRANT USE CATALOG ON CATALOG production TO `data-engineers`;

GRANT USE SCHEMA ON SCHEMA production.sales TO `data-engineers`;

GRANT ALL PRIVILEGES ON SCHEMA production.sales TO `data-engineers`;



-- Data Analysts: read-only

GRANT USE CATALOG ON CATALOG production TO `data-analysts`;

GRANT USE SCHEMA ON SCHEMA production.sales TO `data-analysts`;

GRANT SELECT ON SCHEMA production.sales TO `data-analysts`;



-- Data Scientists: read + create models

GRANT USE CATALOG ON CATALOG production TO `data-scientists`;

GRANT USE SCHEMA ON SCHEMA production.sales TO `data-scientists`;

GRANT SELECT ON SCHEMA production.sales TO `data-scientists`;



-- 5. Tags and Classification

ALTER TABLE production.sales.orders

SET TAGS ('domain' = 'sales', 'pii' = 'false', 'tier' = 'gold');



ALTER TABLE production.sales.orders

ALTER COLUMN customer_id SET TAGS ('pii' = 'true');



-- 6. Row-level Security

CREATE FUNCTION production.sales.region_filter(region STRING)

RETURN IF(IS_ACCOUNT_GROUP_MEMBER('global-access'), true, region = current_user_region());



ALTER TABLE production.sales.orders

SET ROW FILTER production.sales.region_filter ON (region);



-- 7. Column Masking

CREATE FUNCTION production.sales.mask_email(email STRING)

RETURN IF(IS_ACCOUNT_GROUP_MEMBER('pii-access'), email, regexp_replace(email, '(.).*@', '$1***@'));



ALTER TABLE production.sales.customers

ALTER COLUMN email SET MASK production.sales.mask_email;

Automation Scripts

# unity_catalog_automation.py — Unity Catalog Automation

# pip install databricks-sdk



from dataclasses import dataclass, field

from typing import List, Dict, Optional

from datetime import datetime

import json



@dataclass

class CatalogConfig:

    name: str

    comment: str

    schemas: List[Dict] = field(default_factory=list)

    owner: str = "data-platform-team"



@dataclass

class PermissionGrant:

    principal: str  # group or user

    catalog: str

    schema: Optional[str] = None

    privileges: List[str] = field(default_factory=list)



class UnityCatalogAutomation:

    """Automation สำหรับ Unity Catalog"""



    def __init__(self, host, token):

        self.host = host

        self.token = token

        self.catalogs_created = 0

        self.schemas_created = 0

        self.grants_applied = 0



    def _api_call(self, method, endpoint, data=None):

        """เรียก Databricks REST API"""

        import requests

        url = f"{self.host}/api/2.1/unity-catalog{endpoint}"

        headers = {"Authorization": f"Bearer {self.token}"}



        if method == "GET":

            resp = requests.get(url, headers=headers)

        elif method == "POST":

            resp = requests.post(url, headers=headers, json=data)

        elif method == "PATCH":

            resp = requests.patch(url, headers=headers, json=data)



        return resp.json() if resp.status_code < 400 else {"error": resp.text}



    def create_catalog(self, config: CatalogConfig):

        """สร้าง Catalog"""

        result = self._api_call("POST", "/catalogs", {

            "name": config.name,

            "comment": config.comment,

            "properties": {"owner": config.owner, "created_by": "automation"},

        })



        if "error" not in result:

            self.catalogs_created += 1

            print(f"  Created catalog: {config.name}")



            # สร้าง Schemas

            for schema in config.schemas:

                self.create_schema(config.name, schema["name"], schema.get("comment", ""))



        return result



    def create_schema(self, catalog, name, comment=""):

        """สร้าง Schema"""

        result = self._api_call("POST", "/schemas", {

            "name": name,

            "catalog_name": catalog,

            "comment": comment,

        })



        if "error" not in result:

            self.schemas_created += 1

            print(f"    Created schema: {catalog}.{name}")



        return result



    def grant_permissions(self, grant: PermissionGrant):

        """Grant Permissions"""

        securable_type = "schema" if grant.schema else "catalog"

        securable_name = f"{grant.catalog}.{grant.schema}" if grant.schema else grant.catalog



        changes = [{

            "principal": grant.principal,

            "add": grant.privileges,

        }]



        result = self._api_call("PATCH",

            f"/permissions/{securable_type}/{securable_name}",

            {"changes": changes})



        if "error" not in result:

            self.grants_applied += 1

            privs = ", ".join(grant.privileges)

            print(f"  Granted [{privs}] on {securable_name} to {grant.principal}")



        return result



    def setup_environment(self, env_config):

        """Setup ทั้ง Environment"""

        print(f"\n{'='*55}")

        print(f"Unity Catalog Setup — {datetime.now().strftime('%Y-%m-%d %H:%M')}")

        print(f"{'='*55}")



        # สร้าง Catalogs

        for catalog_cfg in env_config.get("catalogs", []):

            config = CatalogConfig(**catalog_cfg)

            self.create_catalog(config)



        # Apply Permissions

        for grant_cfg in env_config.get("permissions", []):

            grant = PermissionGrant(**grant_cfg)

            self.grant_permissions(grant)



        print(f"\n  Summary:")

        print(f"    Catalogs: {self.catalogs_created}")

        print(f"    Schemas: {self.schemas_created}")

        print(f"    Grants: {self.grants_applied}")



# === Configuration File ===

env_config = {

    "catalogs": [

        {

            "name": "production",

            "comment": "Production data",

            "schemas": [

                {"name": "sales", "comment": "Sales domain"},

                {"name": "marketing", "comment": "Marketing domain"},

                {"name": "finance", "comment": "Finance domain"},

                {"name": "ml_models", "comment": "ML models and features"},

            ],

        },

        {

            "name": "development",

            "comment": "Development sandbox",

            "schemas": [

                {"name": "sandbox", "comment": "Developer sandbox"},

                {"name": "experiments", "comment": "ML experiments"},

            ],

        },

    ],

    "permissions": [

        {"principal": "data-engineers", "catalog": "production",

         "privileges": ["USE_CATALOG"]},

        {"principal": "data-engineers", "catalog": "production",

         "schema": "sales", "privileges": ["ALL_PRIVILEGES"]},

        {"principal": "data-analysts", "catalog": "production",

         "schema": "sales", "privileges": ["USE_SCHEMA", "SELECT"]},

    ],

}



# automation = UnityCatalogAutomation("https://my-workspace.cloud.databricks.com", "token")

# automation.setup_environment(env_config)

print(f"Config: {len(env_config['catalogs'])} catalogs, {len(env_config['permissions'])} grants")

Data Lineage และ Audit

# lineage_audit.py — Data Lineage และ Audit Monitoring



from dataclasses import dataclass

from typing import List, Dict

from datetime import datetime



@dataclass

class LineageEdge:

    source_table: str

    target_table: str

    transformation: str

    job_name: str



@dataclass

class AuditEvent:

    timestamp: str

    user: str

    action: str

    resource: str

    details: str



class DataGovernanceMonitor:

    """Monitor Data Governance"""



    def __init__(self):

        self.lineage: List[LineageEdge] = []

        self.audit_events: List[AuditEvent] = []



    def add_lineage(self, source, target, transform, job):

        self.lineage.append(LineageEdge(source, target, transform, job))



    def add_audit(self, user, action, resource, details=""):

        self.audit_events.append(AuditEvent(

            datetime.now().isoformat(), user, action, resource, details))



    def lineage_report(self):

        """แสดง Data Lineage"""

        print(f"\nData Lineage ({len(self.lineage)} edges):")

        for edge in self.lineage:

            print(f"  {edge.source_table}")

            print(f"    -> [{edge.transformation}] ({edge.job_name})")

            print(f"    -> {edge.target_table}")



    def audit_report(self):

        """แสดง Audit Report"""

        print(f"\nAudit Trail ({len(self.audit_events)} events):")

        for event in self.audit_events[-10:]:

            print(f"  [{event.timestamp[:16]}] {event.user}: "

                  f"{event.action} on {event.resource}")



    def compliance_check(self):

        """ตรวจสอบ Compliance"""

        checks = [

            ("PII tables have row filters", True),

            ("All catalogs have owners", True),

            ("Audit logging enabled", True),

            ("Column masking on PII", True),

            ("No public access grants", True),

        ]



        print(f"\nCompliance Check:")

        passed = sum(1 for _, ok in checks if ok)

        for check, ok in checks:

            status = "PASS" if ok else "FAIL"

            print(f"  [{status}] {check}")

        print(f"\n  Score: {passed}/{len(checks)} ({passed/len(checks)*100:.0f}%)")



# ตัวอย่าง

monitor = DataGovernanceMonitor()



monitor.add_lineage("bronze.raw_orders", "silver.clean_orders", "Clean + Dedupe", "etl-orders")

monitor.add_lineage("silver.clean_orders", "gold.daily_revenue", "Aggregate", "agg-revenue")

monitor.add_lineage("silver.clean_orders", "gold.customer_segments", "ML Features", "ml-features")



monitor.add_audit("alice@company.com", "SELECT", "production.sales.orders")

monitor.add_audit("bob@company.com", "GRANT", "production.sales", "Granted SELECT to analysts")

monitor.add_audit("automation", "CREATE TABLE", "production.sales.daily_metrics")



monitor.lineage_report()

monitor.audit_report()

monitor.compliance_check()

Best Practices

  • 3-level Namespace: ใช้ Catalog แยกตาม Environment (prod/dev/staging) Schema แยกตาม Domain
  • Least Privilege: ให้สิทธิ์น้อยที่สุดที่จำเป็น ใช้ Groups แทน Users
  • PII Protection: ใช้ Column Masking และ Row-level Security สำหรับข้อมูล PII
  • Automation: ใช้ Terraform หรือ Databricks Asset Bundles จัดการ Infrastructure as Code
  • Audit Logging: เปิด Audit Logging ตรวจสอบการเข้าถึงข้อมูลทุกครั้ง
  • Data Classification: ใช้ Tags จัดประเภทข้อมูล (PII, Confidential, Public)

การนำไปใช้งานจริงในองค์กร

Databricks Unity Catalog กับ Automation Script —

สำหรับองค์กรขนาดกลางถึงใหญ่ แนะนำให้ใช้หลัก Three-Tier Architecture คือ Core Layer ที่เป็นแกนกลางของระบบ Distribution Layer ที่ทำหน้าที่กระจาย Traffic และ Access Layer ที่เชื่อมต่อกับผู้ใช้โดยตรง การแบ่ง Layer ชัดเจนช่วยให้การ Troubleshoot ง่ายขึ้นและสามารถ Scale ระบบได้ตามความต้องการ

เนื้อหาเกี่ยวข้อง — อ่านต่อ: Text Generation WebUI Container Orchestration

เรื่อง Network Security ก็สำคัญไม่แพ้กัน ควรติดตั้ง Next-Generation Firewall ที่สามารถ Deep Packet Inspection ได้ ใช้ Network Segmentation แยก VLAN สำหรับแต่ละแผนก ติดตั้ง IDS/IPS เพื่อตรวจจับการโจมตี และทำ Regular Security Audit อย่างน้อยปีละ 2 ครั้ง

แนะนำเพิ่มเติม — อีบุ๊กการลงทุน SiamCafeBook

Unity Catalog คืออะไร

Data Governance Solution ของ Databricks จัดการ Data Assets จากจุดเดียว Tables Views Volumes Models Functions 3-level Namespace Fine-grained Access Control Lineage Audit Multi-cloud

เนื้อหาเกี่ยวข้อง — บทความที่เกี่ยวข้อง: LocalAI Self-hosted Automation Script

Data Governance คืออะไร

กระบวนการจัดการข้อมูลให้มีคุณภาพปลอดภัยตามกฎระเบียบ Access Control Data Quality Lineage Classification Audit Trail

3-level Namespace คืออะไร

Catalog.Schema.Table เช่น production.sales.orders Catalog ระดับสูงสุด Schema จัดกลุ่มตาม Domain Table เป็น Data Asset จัดระเบียบควบคุมสิทธิ์ง่าย

แนะนำเพิ่มเติม — บทวิเคราะห์จาก XM Signal

เนื้อหาเกี่ยวข้อง — อ่านต่อ: nist cybersecurity framework คือ

วิธี Automate Data Governance ทำอย่างไร

ใช้ Databricks REST API SDK สร้าง Scripts Create Catalogs Schemas Grant Permissions Monitor Quality Generate Reports CI/CD Pipeline Terraform Databricks Asset Bundles

สรุป

Databricks Unity Catalog ให้ Data Governance ที่ครบถ้วน 3-level Namespace จัดระเบียบ Fine-grained Access Control Least Privilege Column Masking Row-level Security PII Protection Automation ด้วย API SDK Terraform Audit Logging ตรวจสอบการเข้าถึง Data Lineage ติดตามข้อมูล

เนื้อหาเกี่ยวข้อง — อ่านต่อ: AWS Glue ETL API Gateway Pattern — คู่มือฉบับสมบูรณ์ 2026 สำหรับนักพัฒนา

XM Legend · เทรดเดอร์ & ผู้สอน Forex 13 ปี

ผู้ก่อตั้ง SiamCafe ตั้งแต่ปี 1997 · เทรดเดอร์สาย Forex มากกว่า 13 ปี ได้รับการยกย่องเป็น XM Legend · แบ่งปันความรู้ Forex, ไอที, AI และการเทรด จากประสบการณ์จริงในตลาดจริง