
Databricks Unity Catalog Business Continuity

2025-12-20 · Ajarn Bom · SiamCafe.net · 1,510 words

What is Databricks Unity Catalog Business Continuity?

Databricks Unity Catalog is the unified governance solution for data and AI assets on the Databricks Lakehouse Platform. It manages data access, lineage, auditing, and discovery across workspaces. Business continuity is the practice of keeping systems running through unexpected events such as a region outage, data corruption, or human error. Combining the two gives an organization a resilient data platform: a disaster recovery plan plus uninterrupted governance for the data lakehouse.
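Every securable object in Unity Catalog is addressed by a three-level name, `catalog.schema.table`. As a minimal sketch of working with such names (an illustrative helper, not part of any Databricks API):

```python
# namespace_sketch.py — illustrative three-level name parser (not a Databricks API)

def parse_full_name(full_name: str) -> dict:
    """Split a Unity Catalog three-level name into its parts."""
    parts = full_name.split(".")
    if len(parts) != 3:
        raise ValueError(f"expected catalog.schema.table, got {full_name!r}")
    catalog, schema, table = parts
    return {"catalog": catalog, "schema": schema, "table": table}

print(parse_full_name("production.sales.orders"))
```

The same three levels are where GRANT/REVOKE permissions attach, which is why the hierarchy below matters for governance.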

Unity Catalog Architecture

# unity_catalog.py — Unity Catalog architecture

class UnityCatalog:
    HIERARCHY = {
        "metastore": {
            "name": "Metastore",
            "description": "Top-level container — 1 per region, shared across workspaces",
            "contains": "Catalogs",
        },
        "catalog": {
            "name": "Catalog",
            "description": "A group of schemas, comparable to a database server",
            "contains": "Schemas",
            "examples": "production, staging, development",
        },
        "schema": {
            "name": "Schema (Database)",
            "description": "A group of tables, views, and functions",
            "contains": "Tables, Views, Functions, Models",
            "examples": "sales, marketing, ml_models",
        },
        "table": {
            "name": "Table",
            "description": "Delta Lake tables, managed or external",
            "types": ["Managed Table (Databricks manages storage)", "External Table (user manages the storage location)"],
        },
    }

    GOVERNANCE = {
        "access_control": "GRANT/REVOKE permissions at the catalog, schema, and table level",
        "data_lineage": "Track data flow from source to downstream tables",
        "audit_logs": "Record every access, query, and modification",
        "data_sharing": "Delta Sharing: share data across organizations",
        "tagging": "Tag sensitive data (PII, PHI) for compliance",
    }

    def show_hierarchy(self):
        print("=== Unity Catalog Hierarchy ===\n")
        for key, level in self.HIERARCHY.items():
            print(f"[{level['name']}]")
            print(f"  {level['description']}")
            print()

    def show_governance(self):
        print("=== Governance Features ===")
        for feature, desc in self.GOVERNANCE.items():
            print(f"  [{feature}] {desc}")

uc = UnityCatalog()
uc.show_hierarchy()
uc.show_governance()

Business Continuity Planning

# bcp.py — Business Continuity Plan for Unity Catalog

class BusinessContinuityPlan:
    RISKS = {
        "region_outage": {
            "name": "Cloud Region Outage",
            "impact": "Critical — ทุก workspaces ใน region ใช้งานไม่ได้",
            "probability": "Low",
            "rto": "4-8 hours",
            "rpo": "< 1 hour",
        },
        "data_corruption": {
            "name": "Data Corruption / Accidental Delete",
            "impact": "High — data สูญหายหรือเสียหาย",
            "probability": "Medium",
            "rto": "1-4 hours",
            "rpo": "< 15 minutes (Delta time travel)",
        },
        "metastore_failure": {
            "name": "Metastore Failure",
            "impact": "Critical — governance metadata สูญหาย",
            "probability": "Low",
            "rto": "2-4 hours",
            "rpo": "< 1 hour",
        },
        "credential_compromise": {
            "name": "Credential/Access Compromise",
            "impact": "Critical — unauthorized data access",
            "probability": "Medium",
            "rto": "< 1 hour (revoke + rotate)",
            "rpo": "N/A (audit logs preserved)",
        },
    }

    STRATEGIES = {
        "multi_region": {
            "name": "Multi-Region Deployment",
            "description": "Deploy Unity Catalog metastore ใน 2+ regions",
            "implementation": "Primary region + DR region with replicated metastore",
        },
        "delta_time_travel": {
            "name": "Delta Time Travel",
            "description": "กู้คืน data จากจุดใดก็ได้ใน retention period (default 30 days)",
            "implementation": "RESTORE TABLE my_table TO VERSION AS OF 42",
        },
        "backup_metadata": {
            "name": "Metadata Backup",
            "description": "Export Unity Catalog metadata (permissions, tags, lineage) เป็น backup",
            "implementation": "Databricks REST API + scheduled export job",
        },
        "access_audit": {
            "name": "Continuous Access Auditing",
            "description": "Monitor access patterns, detect anomalies, auto-revoke",
            "implementation": "System tables + alerting pipeline",
        },
    }

    def show_risks(self):
        print("=== Risk Assessment ===\n")
        for key, risk in self.RISKS.items():
            print(f"[{risk['name']}] Impact: {risk['impact']}")
            print(f"  RTO: {risk['rto']} | RPO: {risk['rpo']}")
            print()

    def show_strategies(self):
        print("=== BC Strategies ===")
        for key, strat in self.STRATEGIES.items():
            print(f"\n[{strat['name']}]")
            print(f"  {strat['description']}")
            print(f"  Implementation: {strat['implementation']}")

bcp = BusinessContinuityPlan()
bcp.show_risks()
bcp.show_strategies()

Disaster Recovery Implementation

# dr_implementation.py — DR implementation

class DRImplementation:
    DELTA_RECOVERY = """
# delta_recovery.py — Delta Lake recovery operations
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# 1. Time Travel — restore to specific version
spark.sql("RESTORE TABLE production.sales.orders TO VERSION AS OF 42")

# 2. Time Travel — restore to timestamp
spark.sql("RESTORE TABLE production.sales.orders TO TIMESTAMP AS OF '2024-01-15 10:30:00'")

# 3. View table history
history = spark.sql("DESCRIBE HISTORY production.sales.orders")
history.show(20, False)

# 4. Read old version (without restoring)
old_data = spark.read.format("delta").option("versionAsOf", 42).table("production.sales.orders")

# 5. Clone table for backup
spark.sql('''
    CREATE TABLE backup.sales.orders_backup_20240115
    DEEP CLONE production.sales.orders
    VERSION AS OF 42
''')

# 6. Shallow clone (metadata only, shares data files)
spark.sql('''
    CREATE TABLE staging.sales.orders_staging
    SHALLOW CLONE production.sales.orders
''')
"""

    METADATA_BACKUP = """
# metadata_backup.py — Export Unity Catalog metadata
import requests
import json
from datetime import datetime

class UCMetadataBackup:
    def __init__(self, workspace_url, token):
        self.base_url = f"{workspace_url}/api/2.1/unity-catalog"
        self.headers = {"Authorization": f"Bearer {token}"}
    
    def export_catalogs(self):
        resp = requests.get(f"{self.base_url}/catalogs", headers=self.headers)
        return resp.json().get("catalogs", [])
    
    def export_schemas(self, catalog_name):
        resp = requests.get(
            f"{self.base_url}/schemas",
            headers=self.headers,
            params={"catalog_name": catalog_name},
        )
        return resp.json().get("schemas", [])
    
    def export_tables(self, catalog_name, schema_name):
        resp = requests.get(
            f"{self.base_url}/tables",
            headers=self.headers,
            params={"catalog_name": catalog_name, "schema_name": schema_name},
        )
        return resp.json().get("tables", [])
    
    def export_permissions(self, securable_type, full_name):
        resp = requests.get(
            f"{self.base_url}/permissions/{securable_type}/{full_name}",
            headers=self.headers,
        )
        return resp.json()
    
    def full_backup(self, output_path):
        backup = {"timestamp": datetime.utcnow().isoformat(), "catalogs": []}
        
        for catalog in self.export_catalogs():
            cat_data = {**catalog, "schemas": []}
            for schema in self.export_schemas(catalog["name"]):
                schema_data = {**schema, "tables": []}
                tables = self.export_tables(catalog["name"], schema["name"])
                schema_data["tables"] = tables
                cat_data["schemas"].append(schema_data)
            backup["catalogs"].append(cat_data)
        
        with open(output_path, "w") as f:
            json.dump(backup, f, indent=2)
        
        print(f"Backup saved: {output_path}")
        return backup

# Usage
# backup = UCMetadataBackup("https://workspace.databricks.com", "token")
# backup.full_backup("/dbfs/backups/uc_backup_20240115.json")
"""

    def show_delta(self):
        print("=== Delta Lake Recovery ===")
        print(self.DELTA_RECOVERY[:500])

    def show_metadata(self):
        print(f"\n=== Metadata Backup ===")
        print(self.METADATA_BACKUP[:500])

dr = DRImplementation()
dr.show_delta()
dr.show_metadata()

Monitoring & Alerting

# monitoring.py — Unity Catalog monitoring
import random

class UCMonitoring:
    SYSTEM_TABLES = """
# system_tables.py — Query Unity Catalog system tables
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# 1. Audit logs — who accessed what, and when
audit = spark.sql('''
    SELECT 
        event_time, user_identity.email, action_name,
        request_params.full_name_arg as resource,
        response.status_code
    FROM system.access.audit
    WHERE event_date >= current_date() - INTERVAL 7 DAYS
    ORDER BY event_time DESC
    LIMIT 100
''')

# 2. Table lineage — data flow tracking
lineage = spark.sql('''
    SELECT 
        source_table_full_name,
        target_table_full_name,
        source_type,
        event_time
    FROM system.access.table_lineage
    WHERE event_date >= current_date() - INTERVAL 30 DAYS
''')

# 3. Storage usage — capacity monitoring
storage = spark.sql('''
    SELECT 
        catalog_name, schema_name, table_name,
        total_size_bytes / (1024*1024*1024) as size_gb,
        row_count
    FROM system.information_schema.tables
    WHERE table_schema != 'information_schema'
    ORDER BY total_size_bytes DESC
    LIMIT 20
''')

# 4. Query history — performance monitoring
queries = spark.sql('''
    SELECT 
        user_name, 
        total_duration_ms / 1000 as duration_sec,
        rows_produced,
        statement_text
    FROM system.query.history
    WHERE start_time >= current_date() - INTERVAL 1 DAY
    ORDER BY total_duration_ms DESC
    LIMIT 10
''')
"""

    def show_queries(self):
        print("=== System Table Queries ===")
        print(self.SYSTEM_TABLES[:500])

    def health_dashboard(self):
        print(f"\n=== Unity Catalog Health ===")
        checks = [
            {"name": "Metastore", "status": "Healthy", "latency": f"{random.randint(5, 30)}ms"},
            {"name": "Access Control", "status": "Healthy", "active_policies": random.randint(50, 200)},
            {"name": "Delta Tables", "status": "Healthy", "total": random.randint(500, 2000)},
            {"name": "Lineage Tracking", "status": "Healthy", "edges": random.randint(1000, 5000)},
            {"name": "Audit Logging", "status": "Healthy", "events_24h": random.randint(5000, 50000)},
        ]
        for c in checks:
            print(f"  [{c['status']:>7}] {c['name']}")

    def capacity_report(self):
        print(f"\n=== Storage Capacity ===")
        catalogs = [
            {"name": "production", "size_tb": random.uniform(1, 10), "tables": random.randint(100, 500)},
            {"name": "staging", "size_tb": random.uniform(0.5, 3), "tables": random.randint(50, 200)},
            {"name": "development", "size_tb": random.uniform(0.1, 1), "tables": random.randint(20, 100)},
        ]
        total = 0
        for c in catalogs:
            total += c["size_tb"]
            print(f"  [{c['name']}] {c['size_tb']:.1f} TB | {c['tables']} tables")
        print(f"  Total: {total:.1f} TB")

mon = UCMonitoring()
mon.show_queries()
mon.health_dashboard()
mon.capacity_report()

Compliance & Governance

# compliance.py — Compliance and governance

class Compliance:
    FRAMEWORKS = {
        "pdpa": {
            "name": "PDPA (Thailand)",
            "uc_features": ["Data classification tags (PII)", "Access audit logs", "Data lineage", "Right to erasure support"],
        },
        "gdpr": {
            "name": "GDPR (EU)",
            "uc_features": ["Data subject access requests", "Purpose limitation tags", "Data processing records", "Cross-border transfer controls"],
        },
        "sox": {
            "name": "SOX (Financial)",
            "uc_features": ["Change audit trail", "Access control evidence", "Data integrity verification", "Segregation of duties"],
        },
    }

    GOVERNANCE_AUTOMATION = """
# governance_automation.py — Automated governance checks
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

class GovernanceChecker:
    def check_untagged_pii(self):
        '''Find tables that might contain PII but aren't tagged'''
        tables = spark.sql('''
            SELECT full_name, column_name
            FROM system.information_schema.columns
            WHERE (
                column_name LIKE '%email%' OR
                column_name LIKE '%phone%' OR
                column_name LIKE '%ssn%' OR
                column_name LIKE '%national_id%'
            )
            AND full_name NOT IN (
                SELECT full_name FROM system.information_schema.table_tags
                WHERE tag_name = 'pii'
            )
        ''')
        return tables
    
    def check_stale_permissions(self):
        '''Find permissions for users who haven't accessed in 90 days'''
        return spark.sql('''
            SELECT grantee, privilege, securable_full_name
            FROM system.information_schema.table_privileges
            WHERE grantee NOT IN (
                SELECT DISTINCT user_identity.email
                FROM system.access.audit
                WHERE event_date >= current_date() - INTERVAL 90 DAYS
            )
        ''')

checker = GovernanceChecker()
untagged = checker.check_untagged_pii()
print(f"Untagged PII columns: {untagged.count()}")
"""

    def show_frameworks(self):
        print("=== Compliance Frameworks ===\n")
        for key, fw in self.FRAMEWORKS.items():
            print(f"[{fw['name']}]")
            for feature in fw["uc_features"][:3]:
                print(f"  • {feature}")
            print()

    def show_automation(self):
        print("=== Governance Automation ===")
        print(self.GOVERNANCE_AUTOMATION[:500])

comp = Compliance()
comp.show_frameworks()
comp.show_automation()

FAQ - Frequently Asked Questions

Q: How do Unity Catalog and Hive Metastore differ?

A: Unity Catalog provides centralized governance across workspaces, fine-grained access control, lineage, and auditing. Hive Metastore is per-workspace, with basic permissions, no lineage, and no cross-workspace sharing. Unity Catalog is the evolution of the Hive Metastore, so migrating every workspace is recommended.
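One common migration path is Databricks' `SYNC` command, which upgrades external tables from `hive_metastore` into a Unity Catalog catalog. A hedged sketch that only builds the SQL statements (the catalog and schema names are examples; run each statement with `spark.sql(...)` in a notebook):

```python
# sync_sketch.py — build SYNC statements for a Hive Metastore upgrade;
# "production", "sales", and "marketing" are example names.

def sync_statements(target_catalog: str, schemas: list[str]) -> list[str]:
    # SYNC SCHEMA upgrades external tables from hive_metastore into UC
    return [
        f"SYNC SCHEMA {target_catalog}.{schema} FROM hive_metastore.{schema}"
        for schema in schemas
    ]

for stmt in sync_statements("production", ["sales", "marketing"]):
    print(stmt)
```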

Q: What does a DR plan for Unity Catalog involve?

A: 1) Delta Time Travel: restore tables from a version or timestamp (built in). 2) Deep Clone: create backup copies of critical tables. 3) Metadata Export: back up permissions, tags, and lineage via the REST API. 4) Multi-region: replicate the metastore across regions. 5) Audit Log Retention: keep audit logs in separate storage.
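Step 2 above can be scripted. A minimal sketch that generates dated DEEP CLONE statements for critical tables (the table names and the `backup` catalog are examples; a scheduled job would run each statement via `spark.sql(...)`):

```python
# clone_backup_sketch.py — generate dated DEEP CLONE backup statements;
# table names and the backup catalog are illustrative.
from datetime import date

def deep_clone_sql(table_full_name: str, backup_catalog: str, as_of: date) -> str:
    # backup lands in <backup_catalog>.<schema>.<table>_backup_<YYYYMMDD>
    _, schema, table = table_full_name.split(".")
    backup_name = f"{backup_catalog}.{schema}.{table}_backup_{as_of:%Y%m%d}"
    return f"CREATE TABLE IF NOT EXISTS {backup_name} DEEP CLONE {table_full_name}"

print(deep_clone_sql("production.sales.orders", "backup", date(2024, 1, 15)))
```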

Q: What are realistic RTO/RPO targets for Databricks?

A: Data recovery (Delta Time Travel): RPO < 15 min, RTO < 1 hour. Metadata recovery (API backup): RPO < 1 hour, RTO 2-4 hours. Region failover: RPO < 1 hour, RTO 4-8 hours (assuming a DR region exists). Adjust to the criticality of the data: financial data needs a lower RPO than analytics data.
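For runbooks it helps to keep these targets as data rather than prose. A sketch with the same numbers as the answer above (tune them to your own SLAs):

```python
# rto_rpo_sketch.py — the recovery targets above as a lookup table;
# numbers mirror the FAQ answer and should be tuned per workload.

TARGETS = {
    "data_recovery":     {"rpo_min": 15, "rto_min": 60},   # Delta Time Travel
    "metadata_recovery": {"rpo_min": 60, "rto_min": 240},  # API backup
    "region_failover":   {"rpo_min": 60, "rto_min": 480},  # failover to DR region
}

def describe(scenario: str) -> str:
    t = TARGETS[scenario]
    return f"{scenario}: RPO < {t['rpo_min']} min, RTO < {t['rto_min']} min"

for scenario in TARGETS:
    print(describe(scenario))
```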

Q: Does Unity Catalog cost extra?

A: Unity Catalog is included with a Databricks workspace (Premium tier and above). Additional costs: storage for system tables (audit logs, lineage) and compute for governance jobs. There is no separate license fee; it is part of the Databricks platform.
