
Databricks Unity Catalog Incident Management


What Is Databricks Unity Catalog Incident Management?


Databricks Unity Catalog is a unified governance solution for data and AI assets on the Databricks Lakehouse Platform. It covers the data catalog, access control, lineage tracking, and audit logging. Incident management is the process of handling abnormal security and data events, from detection, triage, containment, and remediation through to the post-mortem. Combining Unity Catalog with incident management helps teams detect data breaches, unauthorized access, and policy violations quickly, with a complete audit trail for compliance.

Unity Catalog Security Features

# unity_catalog.py: Unity Catalog security features


class UnityCatalogSecurity:
    """Summary of the Unity Catalog security features discussed in this article."""

    FEATURES = {
        "access_control": {
            "name": "Fine-Grained Access Control",
            "description": "Set privileges at the catalog, schema, and table level; "
                           "restrict columns and rows with masks and filters",
            "example": "GRANT SELECT ON TABLE sales.transactions TO `data-analysts`",
        },
        "audit_logging": {
            "name": "Audit Logging",
            "description": "Records every action: data access, permission changes, schema modifications",
            "retention": "90 days default, configurable",
        },
        "lineage": {
            "name": "Data Lineage",
            "description": "Tracks data flow: source → transformations → downstream tables",
            "benefit": "Impact analysis during an incident: shows which data was affected",
        },
        "column_masking": {
            "name": "Dynamic Column Masking",
            "description": "Hides sensitive data (PII) by showing a masked value based on the user's role",
            "example": "email → j***@example.com for analysts",
        },
        "row_filters": {
            "name": "Row-Level Security",
            "description": "Filters data per user so each user sees only the rows they are authorized for",
            "example": "The sales team sees only its own region",
        },
    }

    def show_features(self):
        """Print each feature's name, description, and example (when present)."""
        print("=== Unity Catalog Security ===\n")
        for feat in self.FEATURES.values():
            print(f"[{feat['name']}]")
            print(f"  {feat['description']}")
            if "example" in feat:
                print(f"  Example: {feat['example']}")
            print()


uc = UnityCatalogSecurity()
uc.show_features()
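
The GRANT example in the access-control entry can also be generated from code when grants are managed by automation. The following is a minimal sketch, not a Databricks API: `build_grant`, `ALLOWED_PRIVILEGES`, and the name-validation rule are hypothetical helpers introduced here, and the privilege list is an assumption you would adapt to your own policy.

```python
import re

# Privileges this sketch accepts (assumption: extend the set for your own policy).
ALLOWED_PRIVILEGES = {"SELECT", "MODIFY", "USE SCHEMA", "USE CATALOG"}

# Each part of catalog.schema.table must be a plain identifier.
_NAME_PART = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def build_grant(privilege: str, table: str, principal: str) -> str:
    """Return a GRANT statement for a table after validating every input."""
    if privilege not in ALLOWED_PRIVILEGES:
        raise ValueError(f"unsupported privilege: {privilege!r}")
    parts = table.split(".")
    if not 1 <= len(parts) <= 3 or not all(_NAME_PART.match(p) for p in parts):
        raise ValueError(f"invalid table name: {table!r}")
    if "`" in principal:
        raise ValueError("principal must not contain a backtick")
    return f"GRANT {privilege} ON TABLE {table} TO `{principal}`"


print(build_grant("SELECT", "sales.transactions", "data-analysts"))
```

Validating the inputs before building the statement keeps a malformed table name or principal from turning into unintended SQL.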

Incident Detection

# detection.py: Incident detection from Unity Catalog audit logs


class IncidentDetection:
    """Holds the detector source as a string so an excerpt can be printed."""

    CODE = """
# incident_detector.py: Detect security incidents from audit logs
from pyspark.sql import functions as F


class UnityIncidentDetector:
    def __init__(self, spark):
        self.spark = spark
        self.audit_log = "system.access.audit"

    def _window(self, hours):
        '''SQL predicate for the last N hours, computed on the cluster clock
        so the comparison does not depend on the driver's local time zone'''
        return f"event_time > current_timestamp() - INTERVAL {int(hours)} HOUR"

    def detect_unauthorized_access(self, hours=24):
        '''Detect unauthorized data access attempts'''
        incidents = self.spark.sql(f'''
            SELECT
                user_identity.email AS user_email,
                request_params.full_name_arg AS resource,
                action_name,
                response.status_code,
                event_time,
                source_ip_address
            FROM {self.audit_log}
            WHERE {self._window(hours)}
            AND response.status_code = 403
            ORDER BY event_time DESC
        ''')

        # Group by user and flag anyone with more than 10 denied attempts
        flagged = incidents.groupBy("user_email").agg(
            F.count("*").alias("denied_count"),
            F.collect_set("resource").alias("resources_attempted"),
            F.max("event_time").alias("last_attempt"),
        ).filter("denied_count > 10")

        return flagged

    def detect_bulk_data_export(self, hours=24):
        '''Detect unusual bulk data downloads'''
        exports = self.spark.sql(f'''
            SELECT
                user_identity.email AS user_email,
                request_params.full_name_arg AS table_name,
                response.result AS response_result,
                event_time,
                source_ip_address
            FROM {self.audit_log}
            WHERE {self._window(hours)}
            AND action_name IN ('getTable', 'commandSubmit')
            AND response.status_code = 200
        ''')

        # Flag users with an unusually high number of requests or tables
        flagged = exports.groupBy("user_email").agg(
            F.count("*").alias("query_count"),
            F.countDistinct("table_name").alias("tables_accessed"),
        ).filter("query_count > 100 OR tables_accessed > 20")

        return flagged

    def detect_permission_changes(self, hours=24):
        '''Detect suspicious permission changes'''
        changes = self.spark.sql(f'''
            SELECT
                user_identity.email AS changed_by,
                action_name,
                request_params,
                event_time
            FROM {self.audit_log}
            WHERE {self._window(hours)}
            AND action_name IN (
                'updatePermissions', 'createMetastore',
                'updateExternalLocation', 'createStorageCredential'
            )
            ORDER BY event_time DESC
        ''')

        return changes

    def detect_off_hours_access(self, hours=24):
        '''Detect data access outside business hours (07:00-22:59).
        hour() uses the Spark session time zone, so set it to local time.'''
        off_hours = self.spark.sql(f'''
            SELECT
                user_identity.email AS user_email,
                action_name,
                request_params.full_name_arg AS resource,
                event_time,
                hour(event_time) AS access_hour
            FROM {self.audit_log}
            WHERE {self._window(hours)}
            AND (hour(event_time) < 7 OR hour(event_time) > 22)
            AND response.status_code = 200
        ''')

        return off_hours


# detector = UnityIncidentDetector(spark)
# unauthorized = detector.detect_unauthorized_access(24)
# bulk_exports = detector.detect_bulk_data_export(24)
"""

    def show_code(self):
        """Print the first 600 characters of the detector source."""
        print("=== Incident Detector ===")
        print(self.CODE[:600])


detector = IncidentDetection()
detector.show_code()
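
The thresholds used by the detector can be checked without a cluster. The sketch below reimplements two of its rules in plain Python on in-memory records: more than 10 denied requests per user, and access before 07:00 or after the 22:00 hour. The function names, the sample events, and the UTC+7 offset are assumptions for illustration only. Real audit rows come from `system.access.audit`.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone


def flag_denied_access(events, threshold=10):
    """Return {user: denied_count} for users with more than `threshold` 403 responses."""
    counts = defaultdict(int)
    for event in events:
        if event["status_code"] == 403:
            counts[event["user_email"]] += 1
    return {user: n for user, n in counts.items() if n > threshold}


def is_off_hours(event_time_utc, utc_offset_hours=7, start_hour=7, end_hour=22):
    """True when the local hour is before start_hour or after end_hour."""
    local_tz = timezone(timedelta(hours=utc_offset_hours))
    local = event_time_utc.astimezone(local_tz)
    return local.hour < start_hour or local.hour > end_hour


# Hypothetical sample events shaped like the columns the detector selects.
events = (
    [{"user_email": "a@example.com", "status_code": 403}] * 11
    + [{"user_email": "b@example.com", "status_code": 403}] * 3
    + [{"user_email": "b@example.com", "status_code": 200}] * 50
)
print(flag_denied_access(events))

# 20:30 UTC is 03:30 at UTC+7, so it counts as off-hours there.
print(is_off_hours(datetime(2024, 1, 15, 20, 30, tzinfo=timezone.utc)))
```

Converting to local time before checking the hour matters because audit timestamps are usually stored in UTC, while "business hours" are a local concept.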

Incident Response Workflow

# response.py: Incident response workflow


class IncidentResponse:
    """The six stages of the incident response workflow."""

    WORKFLOW = {
        "detect": {
            "name": "1. Detection",
            "description": "Detect incidents from audit logs, alerts, and user reports",
            "actions": [
                "Automated monitoring every 5 minutes",
                "Alert rules for suspicious patterns",
                "User-reported incidents",
            ],
        },
        "triage": {
            "name": "2. Triage & Classification",
            "description": "Assess severity and set priority",
            "severity": {
                "P1 Critical": "PII data breach, unauthorized admin access",
                "P2 High": "Unusual bulk data export, permission escalation",
                "P3 Medium": "Off-hours access, policy violations",
                "P4 Low": "Failed login attempts, minor policy deviations",
            },
        },
        "contain": {
            "name": "3. Containment",
            "description": "Stop the damage: revoke access, disable accounts",
            "actions": [
                "REVOKE ALL PRIVILEGES from the compromised user",
                "Disable API tokens / personal access tokens",
                "Isolate affected tables (restrict access)",
                "Block suspicious IP addresses",
            ],
        },
        "investigate": {
            "name": "4. Investigation",
            "description": "Analyze the root cause using audit logs and lineage",
            "actions": [
                "Query audit logs for the affected user/resource",
                "Trace data lineage to see which data was accessed",
                "Check permission history to see who changed what",
                "Identify scope of impact",
            ],
        },
        "remediate": {
            "name": "5. Remediation",
            "description": "Fix the root cause: correct permissions, patch vulnerabilities",
            "actions": [
                "Reset compromised credentials",
                "Fix permission policies",
                "Add missing column masking/row filters",
                "Update monitoring rules",
            ],
        },
        "postmortem": {
            "name": "6. Post-Mortem",
            "description": "Review after the incident: lessons learned, preventive measures",
            "deliverables": ["Incident timeline", "Root cause analysis", "Action items", "Updated playbooks"],
        },
    }

    def show_workflow(self):
        """Print every stage with its description and all of its actions."""
        print("=== Incident Response Workflow ===\n")
        for step in self.WORKFLOW.values():
            print(f"[{step['name']}]")
            print(f"  {step['description']}")
            # Stages without an "actions" list (triage, post-mortem) print nothing here
            for action in step.get("actions", []):
                print(f"    • {action}")
            print()


ir = IncidentResponse()
ir.show_workflow()
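
The severity table in the triage stage can be turned into a small lookup so that detections are prioritized consistently. This is a sketch under assumptions: the incident-type keys are hypothetical labels chosen here, and only the P1 to P4 assignments come from the workflow above.

```python
# Severity per incident type, following the triage table in the workflow.
# The keys are hypothetical labels introduced for this sketch.
SEVERITY_BY_TYPE = {
    "pii_data_breach": "P1",
    "unauthorized_admin_access": "P1",
    "bulk_data_export": "P2",
    "permission_escalation": "P2",
    "off_hours_access": "P3",
    "policy_violation": "P3",
    "failed_login": "P4",
    "minor_policy_deviation": "P4",
}


def triage(incident_types):
    """Return the most severe level among the given incident types."""
    if not incident_types:
        raise ValueError("at least one incident type is required")
    unknown = [t for t in incident_types if t not in SEVERITY_BY_TYPE]
    if unknown:
        raise ValueError(f"unknown incident types: {unknown}")
    # "P1" < "P2" < "P3" < "P4" as strings, so min() picks the most severe.
    return min(SEVERITY_BY_TYPE[t] for t in incident_types)


print(triage(["off_hours_access", "bulk_data_export"]))
```

Rejecting unknown types instead of guessing a default keeps unclassified events visible, so someone has to decide where they belong.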

Automated Remediation

# remediation.py: Automated incident remediation
import random


class AutoRemediation:
    """Holds the remediator source as a string and prints a simulated dashboard."""

    CODE = """
# auto_remediate.py: Automated remediation for Unity Catalog incidents
import random

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import PermissionsChange, Privilege


class UnityRemediator:
    def __init__(self, spark=None):
        self.client = WorkspaceClient()
        # SparkSession, needed only by add_column_mask
        self.spark = spark

    def revoke_user_access(self, user_email, catalog=None):
        '''Revoke a user's catalog privileges and delete their access tokens'''
        if catalog:
            # Removes the ALL_PRIVILEGES grant; privileges granted individually
            # may need to be listed explicitly.
            self.client.grants.update(
                securable_type="catalog",  # some SDK releases expect SecurableType.CATALOG
                full_name=catalog,
                changes=[PermissionsChange(
                    principal=user_email,
                    remove=[Privilege.ALL_PRIVILEGES],
                )],
            )

        # Delete the user's personal access tokens
        for token in self.client.token_management.list():
            if token.created_by_username == user_email:
                self.client.token_management.delete(token.token_id)

        return {"status": "access_revoked", "user": user_email}

    def quarantine_table(self, table_name):
        '''Restrict access to a compromised table by removing every grant'''
        current_grants = self.client.grants.get(
            securable_type="table",
            full_name=table_name,
        )

        # Ownership is tracked separately from grants, so the owner keeps access
        for grant in current_grants.privilege_assignments or []:
            self.client.grants.update(
                securable_type="table",
                full_name=table_name,
                changes=[PermissionsChange(
                    principal=grant.principal,
                    remove=list(grant.privileges or []),
                )],
            )

        return {"status": "quarantined", "table": table_name}

    def add_column_mask(self, table_name, column_name, mask_function):
        '''Add dynamic column masking to a sensitive column'''
        if self.spark is None:
            raise RuntimeError("add_column_mask requires a SparkSession")
        self.spark.sql(f'''
            ALTER TABLE {table_name}
            ALTER COLUMN {column_name}
            SET MASK {mask_function}
        ''')
        return {"status": "masked", "column": f"{table_name}.{column_name}"}

    def create_incident_ticket(self, incident):
        '''Build an incident ticket record for the tracking system'''
        ticket = {
            # Random 4-digit ID for illustration; a real tracker assigns unique IDs
            "id": f"INC-{random.randint(1000, 9999)}",
            "severity": incident["severity"],
            "type": incident["type"],
            "affected_resources": incident.get("resources", []),
            "status": "open",
            "assigned_to": "security-team",
        }
        return ticket


# remediate = UnityRemediator()
# remediate.revoke_user_access("suspicious@company.com", catalog="production")
# remediate.quarantine_table("production.customers.pii_data")
"""

    def show_code(self):
        """Print the first 600 characters of the remediator source."""
        print("=== Auto Remediation ===")
        print(self.CODE[:600])

    def dashboard(self):
        """Print a sample dashboard; all numbers are randomly generated placeholders."""
        print("\n=== Incident Dashboard (30-day) ===")
        print(f"  Total incidents: {random.randint(5, 30)}")
        print(f"  P1 Critical: {random.randint(0, 2)}")
        print(f"  P2 High: {random.randint(1, 5)}")
        print(f"  P3 Medium: {random.randint(3, 15)}")
        print(f"  P4 Low: {random.randint(5, 20)}")
        print(f"  Avg MTTR: {random.uniform(0.5, 4):.1f} hours")
        print(f"  Auto-remediated: {random.randint(50, 90)}%")


rem = AutoRemediation()
rem.show_code()
rem.dashboard()
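
The dashboard above prints random placeholder values. As a hedged illustration of how the "Avg MTTR" figure could be derived from real tickets, the sketch below averages the time between opening and resolving each ticket. The `opened_at` and `resolved_at` fields and the sample tickets are assumptions. They are not part of the ticket structure shown earlier.

```python
from datetime import datetime, timedelta


def mean_time_to_resolve(tickets):
    """Average resolution time over tickets that have a resolved_at timestamp."""
    durations = [
        t["resolved_at"] - t["opened_at"]
        for t in tickets
        if t.get("resolved_at") is not None
    ]
    if not durations:
        return None  # nothing resolved yet
    return sum(durations, timedelta()) / len(durations)


# Hypothetical tickets; the third one is still open and is excluded.
tickets = [
    {"id": "INC-1001", "opened_at": datetime(2024, 1, 1, 9, 0), "resolved_at": datetime(2024, 1, 1, 10, 0)},
    {"id": "INC-1002", "opened_at": datetime(2024, 1, 2, 9, 0), "resolved_at": datetime(2024, 1, 2, 12, 0)},
    {"id": "INC-1003", "opened_at": datetime(2024, 1, 3, 9, 0), "resolved_at": None},
]

mttr = mean_time_to_resolve(tickets)
print(f"Avg MTTR: {mttr.total_seconds() / 3600:.1f} hours")
```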

Monitoring & Alerting

# monitoring.py: Continuous monitoring setup
import random


class MonitoringSetup:
    """Holds the alert queries as a string and prints a simulated dashboard."""

    ALERT_RULES = """
-- alert_rules.sql: Databricks SQL alert queries

-- Alert: Multiple failed access attempts
SELECT
    user_identity.email,
    COUNT(*) AS failed_count,
    COLLECT_SET(request_params.full_name_arg) AS resources
FROM system.access.audit
WHERE event_time > current_timestamp() - INTERVAL 1 HOUR
AND response.status_code = 403
GROUP BY user_identity.email
HAVING failed_count > 10;

-- Alert: Permission escalation
SELECT
    user_identity.email AS changed_by,
    request_params.changes AS permission_changes,
    event_time
FROM system.access.audit
WHERE event_time > current_timestamp() - INTERVAL 1 HOUR
AND action_name = 'updatePermissions'
AND request_params.changes LIKE '%ALL_PRIVILEGES%';

-- Alert: New external location created
SELECT
    user_identity.email,
    request_params.name AS location_name,
    request_params.url AS storage_url,
    event_time
FROM system.access.audit
WHERE event_time > current_timestamp() - INTERVAL 24 HOUR
AND action_name = 'createExternalLocation';

-- Alert: Unusual query volume per user
SELECT
    user_identity.email,
    COUNT(*) AS query_count,
    COUNT(DISTINCT request_params.full_name_arg) AS distinct_tables
FROM system.access.audit
WHERE event_time > current_timestamp() - INTERVAL 1 HOUR
AND action_name = 'commandSubmit'
GROUP BY user_identity.email
HAVING query_count > 200 OR distinct_tables > 30;
"""

    def show_alerts(self):
        """Print the first 500 characters of the alert queries."""
        print("=== Alert Rules ===")
        print(self.ALERT_RULES[:500])

    def monitoring_dashboard(self):
        """Print a sample dashboard; all numbers are randomly generated placeholders."""
        print("\n=== Real-time Monitoring ===")
        print(f"  Active users: {random.randint(50, 300)}")
        print(f"  Queries/min: {random.randint(100, 2000)}")
        print(f"  Failed access (1h): {random.randint(0, 20)}")
        print(f"  Permission changes (24h): {random.randint(0, 10)}")
        print(f"  Tables accessed (1h): {random.randint(50, 500)}")
        print(f"  Alerts triggered (24h): {random.randint(0, 5)}")


mon = MonitoringSetup()
mon.show_alerts()
mon.monitoring_dashboard()

FAQ - Frequently Asked Questions

Q: How long are Unity Catalog audit logs retained?

A: The default is 90 days in the system.access.audit table. To keep logs longer, export them to a Delta Lake table in cloud storage and set the retention policy according to your compliance requirements: PCI DSS calls for 1 year, and GDPR ties retention to the purpose of processing. Recommendation: create a scheduled job that exports the audit logs every day for long-term storage.
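
A daily export job of this kind could build one statement per calendar day. The sketch below only generates the SQL text. The archive table name `security.audit_archive.events` is a hypothetical placeholder, and the sketch assumes the archive table already exists with the same schema as `system.access.audit`.

```python
from datetime import date, timedelta


def build_archive_sql(day: date, archive_table: str = "security.audit_archive.events") -> str:
    """Return an INSERT that copies one calendar day of audit events to the archive.

    The date strings are interpreted in the SQL session's time zone.
    """
    next_day = day + timedelta(days=1)
    return (
        f"INSERT INTO {archive_table} "
        f"SELECT * FROM system.access.audit "
        f"WHERE event_time >= '{day.isoformat()}' "
        f"AND event_time < '{next_day.isoformat()}'"
    )


print(build_archive_sql(date(2024, 1, 31)))
```

In a scheduled notebook the statement would be passed to `spark.sql(...)` for the previous day. Running it twice for the same day would insert duplicates, so a production job would need a guard such as a MERGE or a delete of that day's rows first.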


Q: Should incident detection be real-time or batch?


A: Both.
Real-time (streaming): for P1/P2 incidents such as unauthorized access and data breaches, which must be detected within minutes.
Batch (scheduled): for P3/P4 incidents such as anomaly detection and compliance reporting, run every 15-60 minutes.
Use Databricks SQL Alerts for near real-time detection (every 1-5 minutes) and Structured Streaming for true real-time detection.


Q: Who should be responsible for incident management?

A: Define the roles clearly before an incident happens, for example in a RACI matrix:
Data Security Team: leads incident response and investigation
Platform Team: technical remediation and access management
Data Owners: impact assessment and data classification
Compliance Team: regulatory reporting and audits
Legal: data breach notification (72 hours under GDPR)
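
The 72-hour GDPR notification window is easy to lose track of during an incident, so it can help to compute the deadline as soon as the breach is confirmed. The following is a minimal sketch. The function names are hypothetical, and the timestamps must be timezone-aware so the arithmetic is unambiguous.

```python
from datetime import datetime, timedelta, timezone

GDPR_NOTIFICATION_WINDOW = timedelta(hours=72)


def notification_deadline(aware_of_breach_at: datetime) -> datetime:
    """Latest time to notify, counted from when the breach became known."""
    return aware_of_breach_at + GDPR_NOTIFICATION_WINDOW


def hours_remaining(aware_of_breach_at: datetime, now: datetime) -> float:
    """Hours left until the deadline; negative once the deadline has passed."""
    remaining = notification_deadline(aware_of_breach_at) - now
    return remaining.total_seconds() / 3600


detected = datetime(2024, 3, 1, 8, 0, tzinfo=timezone.utc)
print(notification_deadline(detected).isoformat())
print(hours_remaining(detected, datetime(2024, 3, 2, 8, 0, tzinfo=timezone.utc)))
```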


Q: What is the difference between column masking and row-level security?

A: Column masking hides the values in a column (for example, email → j***@example.com): everyone sees the row, but the value is masked. Row-level security filters rows per user, so each user sees only the rows they are authorized for. The two can be combined: a row filter restricts by region while a column mask hides PII. Both are built into Unity Catalog, so you do not have to implement them yourself.
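
To make the difference concrete, the sketch below imitates both behaviors on in-memory rows. It is only an illustration of the semantics, not how Unity Catalog implements them: `mask_email`, `visible_rows`, and `view_for` are hypothetical names, and in practice the platform applies the mask and the filter for you.

```python
def mask_email(email: str) -> str:
    """Keep the first character of the local part and the domain: j***@example.com."""
    local, sep, domain = email.partition("@")
    if not sep or not local:
        return "***"  # not a usable address, hide it entirely
    return f"{local[0]}***@{domain}"


def visible_rows(rows, user_region):
    """Row filter: keep only the rows for the user's own region."""
    return [row for row in rows if row["region"] == user_region]


def view_for(rows, user_region, can_see_pii):
    """Apply the row filter first, then mask the email column unless PII is allowed."""
    result = []
    for row in visible_rows(rows, user_region):
        shown = dict(row)  # copy so the source rows are not modified
        if not can_see_pii:
            shown["email"] = mask_email(row["email"])
        result.append(shown)
    return result


rows = [
    {"region": "APAC", "email": "john@example.com"},
    {"region": "EMEA", "email": "maria@example.com"},
]
print(view_for(rows, "APAC", can_see_pii=False))
```

The row filter changes how many rows come back, while the mask changes what a value looks like. That is why the two are complementary rather than alternatives.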


XM Legend · Forex trader & instructor for 13 years

Founder of SiamCafe since 1997 · Forex trader for more than 13 years, recognized as an XM Legend · Shares knowledge about Forex, IT, AI, and trading from real experience in real markets