ai

Databricks Unity Catalog Business Continuity

Databricks Unity Catalog Business Continuity

Databricks Unity Catalog Business Continuity คืออะไร

Databricks Unity Catalog Business Continuity

Databricks Unity Catalog เป็น unified governance solution สำหรับ data และ AI assets บน Databricks Lakehouse Platform ช่วยจัดการ data access, lineage, auditing และ discovery ข้าม workspaces Business Continuity คือการวางแผนให้ระบบทำงานต่อเนื่องแม้เกิดเหตุการณ์ไม่คาดคิด เช่น region outage, data corruption หรือ human error การรวมสองแนวคิดนี้ช่วยให้องค์กรมี data platform ที่ resilient มี disaster recovery plan และ governance ที่ต่อเนื่องสำหรับ data lakehouse

Unity Catalog Architecture

# unity_catalog.py — Unity Catalog architecture

import json



class UnityCatalog:

    HIERARCHY = {

        "metastore": {

            "name": "Metastore",

            "description": "Top-level container — 1 per region, shared across workspaces",

            "contains": "Catalogs",

        },

        "catalog": {

            "name": "Catalog",

            "description": "กลุ่มของ schemas — เทียบได้กับ database server",

            "contains": "Schemas",

            "examples": "production, staging, development",

        },

        "schema": {

            "name": "Schema (Database)",

            "description": "กลุ่มของ tables, views, functions",

            "contains": "Tables, Views, Functions, Models",

            "examples": "sales, marketing, ml_models",

        },

        "table": {

            "name": "Table",

            "description": "Delta Lake tables — managed หรือ external",

            "types": ["Managed Table (Databricks จัดการ storage)", "External Table (user จัดการ storage location)"],

        },

    }



    GOVERNANCE = {

        "access_control": "GRANT/REVOKE permissions ที่ระดับ catalog, schema, table",

        "data_lineage": "ติดตาม data flow จาก source ถึง downstream tables",

        "audit_logs": "บันทึกทุก access, query, modification",

        "data_sharing": "Delta Sharing — แชร์ data ข้าม organizations",

        "tagging": "Tag sensitive data (PII, PHI) สำหรับ compliance",

    }



    def show_hierarchy(self):

        print("=== Unity Catalog Hierarchy ===\n")

        for key, level in self.HIERARCHY.items():

            print(f"[{level['name']}]")

            print(f"  {level['description']}")

            print()



    def show_governance(self):

        print("=== Governance Features ===")

        for feature, desc in self.GOVERNANCE.items():

            print(f"  [{feature}] {desc}")



uc = UnityCatalog()

uc.show_hierarchy()

uc.show_governance()

Business Continuity Planning

# bcp.py — Business Continuity Plan for Unity Catalog

import json



class BusinessContinuityPlan:

    RISKS = {

        "region_outage": {

            "name": "Cloud Region Outage",

            "impact": "Critical — ทุก workspaces ใน region ใช้งานไม่ได้",

            "probability": "Low",

            "rto": "4-8 hours",

            "rpo": "< 1 hour",

        },

        "data_corruption": {

            "name": "Data Corruption / Accidental Delete",

            "impact": "High — data สูญหายหรือเสียหาย",

            "probability": "Medium",

            "rto": "1-4 hours",

            "rpo": "< 15 minutes (Delta time travel)",

        },

        "metastore_failure": {

            "name": "Metastore Failure",

            "impact": "Critical — governance metadata สูญหาย",

            "probability": "Low",

            "rto": "2-4 hours",

            "rpo": "< 1 hour",

        },

        "credential_compromise": {

            "name": "Credential/Access Compromise",

            "impact": "Critical — unauthorized data access",

            "probability": "Medium",

            "rto": "< 1 hour (revoke + rotate)",

            "rpo": "N/A (audit logs preserved)",

        },

    }



    STRATEGIES = {

        "multi_region": {

            "name": "Multi-Region Deployment",

            "description": "Deploy Unity Catalog metastore ใน 2+ regions",

            "implementation": "Primary region + DR region with replicated metastore",

        },

        "delta_time_travel": {

            "name": "Delta Time Travel",

            "description": "กู้คืน data จากจุดใดก็ได้ใน retention period (default 30 days)",

            "implementation": "RESTORE TABLE my_table TO VERSION AS OF 42",

        },

        "backup_metadata": {

            "name": "Metadata Backup",

            "description": "Export Unity Catalog metadata (permissions, tags, lineage) เป็น backup",

            "implementation": "Databricks REST API + scheduled export job",

        },

        "access_audit": {

            "name": "Continuous Access Auditing",

            "description": "Monitor access patterns, detect anomalies, auto-revoke",

            "implementation": "System tables + alerting pipeline",

        },

    }



    def show_risks(self):

        print("=== Risk Assessment ===\n")

        for key, risk in self.RISKS.items():

            print(f"[{risk['name']}] Impact: {risk['impact']}")

            print(f"  RTO: {risk['rto']} | RPO: {risk['rpo']}")

            print()



    def show_strategies(self):

        print("=== BC Strategies ===")

        for key, strat in self.STRATEGIES.items():

            print(f"\n[{strat['name']}]")

            print(f"  {strat['description']}")

            print(f"  Implementation: {strat['implementation']}")



bcp = BusinessContinuityPlan()

bcp.show_risks()

bcp.show_strategies()

Disaster Recovery Implementation

Databricks Unity Catalog Business Continuity
# dr_implementation.py — DR implementation

import json



class DRImplementation:

    DELTA_RECOVERY = """

# delta_recovery.py — Delta Lake recovery operations

from pyspark.sql import SparkSession



spark = SparkSession.builder.getOrCreate()



# 1. Time Travel — restore to specific version

spark.sql("RESTORE TABLE production.sales.orders TO VERSION AS OF 42")



# 2. Time Travel — restore to timestamp

spark.sql("RESTORE TABLE production.sales.orders TO TIMESTAMP AS OF '2024-01-15 10:30:00'")



# 3. View table history

history = spark.sql("DESCRIBE HISTORY production.sales.orders")

history.show(20, False)



# 4. Read old version (without restoring)

old_data = spark.read.format("delta").option("versionAsOf", 42).table("production.sales.orders")



# 5. Clone table for backup

spark.sql('''

    CREATE TABLE backup.sales.orders_backup_20240115

    DEEP CLONE production.sales.orders

    VERSION AS OF 42

''')



# 6. Shallow clone (metadata only, shares data files)

spark.sql('''

    CREATE TABLE staging.sales.orders_staging

    SHALLOW CLONE production.sales.orders

''')

"""



    METADATA_BACKUP = """

# metadata_backup.py — Export Unity Catalog metadata

import requests

import json

from datetime import datetime



class UCMetadataBackup:

    def __init__(self, workspace_url, token):

        self.base_url = f"{workspace_url}/api/2.1/unity-catalog"

        self.headers = {"Authorization": f"Bearer {token}"}

    

    def export_catalogs(self):

        resp = requests.get(f"{self.base_url}/catalogs", headers=self.headers)

        return resp.json().get("catalogs", [])

    

    def export_schemas(self, catalog_name):

        resp = requests.get(

            f"{self.base_url}/schemas",

            headers=self.headers,

            params={"catalog_name": catalog_name},

        )

        return resp.json().get("schemas", [])

    

    def export_tables(self, catalog_name, schema_name):

        resp = requests.get(

            f"{self.base_url}/tables",

            headers=self.headers,

            params={"catalog_name": catalog_name, "schema_name": schema_name},

        )

        return resp.json().get("tables", [])

    

    def export_permissions(self, securable_type, full_name):

        resp = requests.get(

            f"{self.base_url}/permissions/{securable_type}/{full_name}",

            headers=self.headers,

        )

        return resp.json()

    

    def full_backup(self, output_path):

        backup = {"timestamp": datetime.utcnow().isoformat(), "catalogs": []}

        

        for catalog in self.export_catalogs():

            cat_data = {**catalog, "schemas": []}

            for schema in self.export_schemas(catalog["name"]):

                schema_data = {**schema, "tables": []}

                tables = self.export_tables(catalog["name"], schema["name"])

                schema_data["tables"] = tables

                cat_data["schemas"].append(schema_data)

            backup["catalogs"].append(cat_data)

        

        with open(output_path, "w") as f:

            json.dump(backup, f, indent=2)

        

        print(f"Backup saved: {output_path}")

        return backup



# Usage

# backup = UCMetadataBackup("https://workspace.databricks.com", "token")

# backup.full_backup("/dbfs/backups/uc_backup_20240115.json")

"""



    def show_delta(self):

        print("=== Delta Lake Recovery ===")

        print(self.DELTA_RECOVERY[:500])



    def show_metadata(self):

        print(f"\n=== Metadata Backup ===")

        print(self.METADATA_BACKUP[:500])



dr = DRImplementation()

dr.show_delta()

dr.show_metadata()

Monitoring & Alerting

# monitoring.py — Unity Catalog monitoring

import json

import random



class UCMonitoring:

    SYSTEM_TABLES = """

# system_tables.py — Query Unity Catalog system tables

from pyspark.sql import SparkSession



spark = SparkSession.builder.getOrCreate()



# 1. Audit logs — ใครเข้าถึงอะไรเมื่อไหร่

audit = spark.sql('''

    SELECT 

        event_time, user_identity.email, action_name,

        request_params.full_name_arg as resource,

        response.status_code

    FROM system.access.audit

    WHERE event_date >= current_date() - INTERVAL 7 DAYS

    ORDER BY event_time DESC

    LIMIT 100

''')



# 2. Table lineage — data flow tracking

lineage = spark.sql('''

    SELECT 

        source_table_full_name,

        target_table_full_name,

        source_type,

        event_time

    FROM system.access.table_lineage

    WHERE event_date >= current_date() - INTERVAL 30 DAYS

''')



# 3. Storage usage — capacity monitoring

storage = spark.sql('''

    SELECT 

        catalog_name, schema_name, table_name,

        total_size_bytes / (1024*1024*1024) as size_gb,

        row_count

    FROM system.information_schema.tables

    WHERE table_schema != 'information_schema'

    ORDER BY total_size_bytes DESC

    LIMIT 20

''')



# 4. Query history — performance monitoring

queries = spark.sql('''

    SELECT 

        user_name, 

        total_duration_ms / 1000 as duration_sec,

        rows_produced,

        statement_text

    FROM system.query.history

    WHERE start_time >= current_date() - INTERVAL 1 DAY

    ORDER BY total_duration_ms DESC

    LIMIT 10

''')

"""



    def show_queries(self):

        print("=== System Table Queries ===")

        print(self.SYSTEM_TABLES[:500])



    def health_dashboard(self):

        print(f"\n=== Unity Catalog Health ===")

        checks = [

            {"name": "Metastore", "status": "Healthy", "latency": f"{random.randint(5, 30)}ms"},

            {"name": "Access Control", "status": "Healthy", "active_policies": random.randint(50, 200)},

            {"name": "Delta Tables", "status": "Healthy", "total": random.randint(500, 2000)},

            {"name": "Lineage Tracking", "status": "Healthy", "edges": random.randint(1000, 5000)},

            {"name": "Audit Logging", "status": "Healthy", "events_24h": random.randint(5000, 50000)},

        ]

        for c in checks:

            print(f"  [{c['status']:>7}] {c['name']}")



    def capacity_report(self):

        print(f"\n=== Storage Capacity ===")

        catalogs = [

            {"name": "production", "size_tb": random.uniform(1, 10), "tables": random.randint(100, 500)},

            {"name": "staging", "size_tb": random.uniform(0.5, 3), "tables": random.randint(50, 200)},

            {"name": "development", "size_tb": random.uniform(0.1, 1), "tables": random.randint(20, 100)},

        ]

        total = 0

        for c in catalogs:

            total += c["size_tb"]

            print(f"  [{c['name']}] {c['size_tb']:.1f} TB | {c['tables']} tables")

        print(f"  Total: {total:.1f} TB")



mon = UCMonitoring()

mon.show_queries()

mon.health_dashboard()

mon.capacity_report()

Compliance & Governance

# compliance.py — Compliance and governance

import json



class Compliance:

    FRAMEWORKS = {

        "pdpa": {

            "name": "PDPA (Thailand)",

            "uc_features": ["Data classification tags (PII)", "Access audit logs", "Data lineage", "Right to erasure support"],

        },

        "gdpr": {

            "name": "GDPR (EU)",

            "uc_features": ["Data subject access requests", "Purpose limitation tags", "Data processing records", "Cross-border transfer controls"],

        },

        "sox": {

            "name": "SOX (Financial)",

            "uc_features": ["Change audit trail", "Access control evidence", "Data integrity verification", "Segregation of duties"],

        },

    }



    GOVERNANCE_AUTOMATION = """

# governance_automation.py — Automated governance checks

from pyspark.sql import SparkSession



spark = SparkSession.builder.getOrCreate()



class GovernanceChecker:

    def check_untagged_pii(self):

        '''Find tables that might contain PII but aren't tagged'''

        tables = spark.sql('''

            SELECT full_name, column_name

            FROM system.information_schema.columns

            WHERE (

                column_name LIKE '%email%' OR

                column_name LIKE '%phone%' OR

                column_name LIKE '%ssn%' OR

                column_name LIKE '%national_id%'

            )

            AND full_name NOT IN (

                SELECT full_name FROM system.information_schema.table_tags

                WHERE tag_name = 'pii'

            )

        ''')

        return tables

    

    def check_stale_permissions(self):

        '''Find permissions for users who haven't accessed in 90 days'''

        return spark.sql('''

            SELECT grantee, privilege, securable_full_name

            FROM system.information_schema.table_privileges

            WHERE grantee NOT IN (

                SELECT DISTINCT user_identity.email

                FROM system.access.audit

                WHERE event_date >= current_date() - INTERVAL 90 DAYS

            )

        ''')



checker = GovernanceChecker()

untagged = checker.check_untagged_pii()

print(f"Untagged PII columns: {untagged.count()}")

"""



    def show_frameworks(self):

        print("=== Compliance Frameworks ===\n")

        for key, fw in self.FRAMEWORKS.items():

            print(f"[{fw['name']}]")

            for feature in fw["uc_features"][:3]:

                print(f"  • {feature}")

            print()



    def show_automation(self):

        print("=== Governance Automation ===")

        print(self.GOVERNANCE_AUTOMATION[:500])



comp = Compliance()

comp.show_frameworks()

comp.show_automation()

FAQ - คำถามที่พบบ่อย

Q: Unity Catalog กับ Hive Metastore ต่างกัน?

A: Unity Catalog: centralized governance ข้าม workspaces, fine-grained access control, lineage, auditing Hive Metastore: per-workspace, basic permissions, no lineage, no cross-workspace sharing Unity Catalog เป็น evolution ของ Hive Metastore — แนะนำ migrate ทุก workspace

เนื้อหาเกี่ยวข้อง — ดูเพิ่มเติมเรื่อง A/B Testing ML Troubleshooting แก้ปัญหา

Q: DR สำหรับ Unity Catalog ต้องทำอะไร?

แนะนำเพิ่มเติม — สัญญาณเทรดรายวัน XM Signal

A: 1) Delta Time Travel: restore tables จาก version/timestamp (built-in) 2) Deep Clone: สร้าง backup copies ของ critical tables 3) Metadata Export: backup permissions, tags, lineage ผ่าน REST API 4) Multi-region: replicate metastore ข้าม regions 5) Audit Log Retention: เก็บ audit logs ใน separate storage

เนื้อหาเกี่ยวข้อง — Prometheus Alertmanager Low Code No Code

Q: RTO/RPO ที่ realistic สำหรับ Databricks?

A: Data recovery (Delta Time Travel): RPO < 15 min, RTO < 1 hour Metadata recovery (API backup): RPO < 1 hour, RTO 2-4 hours Region failover: RPO < 1 hour, RTO 4-8 hours (ถ้ามี DR region) ปรับตาม criticality ของ data — financial data ต้อง RPO ต่ำกว่า analytics data

แนะนำเพิ่มเติม — คู่มือเทรดจาก SiamCafeBook

เนื้อหาเกี่ยวข้อง — ดูเพิ่มเติมเรื่อง Healthchecks.io Infrastructure as Code

Q: Unity Catalog มีค่าใช้จ่ายเพิ่มไหม?

A: Unity Catalog มาฟรีกับ Databricks workspace (Premium tier ขึ้นไป) ค่าใช้จ่ายเพิ่ม: storage สำหรับ system tables (audit logs, lineage), compute สำหรับ governance jobs ไม่มีค่า license แยก — เป็นส่วนหนึ่งของ Databricks platform

เนื้อหาเกี่ยวข้อง — สมัคร Eightcap — คู่มือฉบับสมบูรณ์ 2026

XM Legend · เทรดเดอร์ & ผู้สอน Forex 13 ปี

ผู้ก่อตั้ง SiamCafe ตั้งแต่ปี 1997 · เทรดเดอร์สาย Forex มากกว่า 13 ปี ได้รับการยกย่องเป็น XM Legend · แบ่งปันความรู้ Forex, ไอที, AI และการเทรด จากประสบการณ์จริงในตลาดจริง