Databricks Unity Catalog Incident Management คืออะไร
Databricks Unity Catalog เป็น unified governance solution สำหรับ data และ AI assets บน Databricks Lakehouse Platform ครอบคลุม data catalog, access control, lineage tracking และ audit logging Incident Management คือกระบวนการจัดการเหตุการณ์ด้าน security และ data ที่ผิดปกติ ตั้งแต่ detection, triage, containment, remediation จนถึง post-mortem การรวม Unity Catalog กับ Incident Management ช่วยให้ตรวจจับ data breaches, unauthorized access และ policy violations ได้เร็ว พร้อม audit trail ครบถ้วนสำหรับ compliance
Unity Catalog Security Features
# unity_catalog.py — Unity Catalog security features
import json
class UnityCatalogSecurity:
    """Registry of Unity Catalog security capabilities, printable as a summary."""

    # Feature registry: key -> metadata. Every entry has "name" and
    # "description"; "example" / "retention" / "benefit" are optional extras.
    FEATURES = {
        "access_control": {
            "name": "Fine-Grained Access Control",
            "description": "กำหนดสิทธิ์ระดับ catalog, schema, table, column, row",
            "example": "GRANT SELECT ON TABLE sales.transactions TO `data-analysts`",
        },
        "audit_logging": {
            "name": "Audit Logging",
            "description": "บันทึกทุก action: data access, permission changes, schema modifications",
            "retention": "90 days default, configurable",
        },
        "lineage": {
            "name": "Data Lineage",
            "description": "ติดตาม data flow: source → transformations → downstream tables",
            "benefit": "Impact analysis เมื่อมี incident — รู้ว่า data ไหนได้รับผลกระทบ",
        },
        "column_masking": {
            "name": "Dynamic Column Masking",
            "description": "ซ่อนข้อมูล sensitive (PII) — แสดง masked value ตาม user role",
            "example": "email → j***@example.com สำหรับ analysts",
        },
        "row_filters": {
            "name": "Row-Level Security",
            "description": "กรองข้อมูลตาม user — เห็นเฉพาะ rows ที่มีสิทธิ์",
            "example": "Sales team เห็นเฉพาะ region ของตัวเอง",
        },
    }

    def show_features(self):
        """Print each feature's name, description, and example (when present)."""
        print("=== Unity Catalog Security ===\n")
        for feature in self.FEATURES.values():
            print(f"[{feature['name']}]")
            print(f" {feature['description']}")
            example = feature.get("example")
            if example is not None:
                print(f" Example: {example}")
            print()
# Demo: print the Unity Catalog security feature summary.
catalog_security = UnityCatalogSecurity()
catalog_security.show_features()
Incident Detection
# detection.py — Incident detection from Unity Catalog audit logs
import json
class IncidentDetection:
    """Carries an embedded PySpark example that detects security incidents
    from Unity Catalog audit logs, and prints an excerpt of it.

    The example is stored as plain text (CODE) so this module itself has no
    PySpark/Databricks dependency; it is never executed here.
    """

    # NOTE: everything between the triple quotes is data, not executable code
    # in this module. The example shows four detectors over
    # system.access.audit: repeated 403 denials, bulk data export,
    # suspicious permission changes, and off-hours access.
    CODE = """
# incident_detector.py — Detect security incidents from audit logs
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from datetime import datetime, timedelta
class UnityIncidentDetector:
def __init__(self, spark):
self.spark = spark
self.audit_log = "system.access.audit"
def detect_unauthorized_access(self, hours=24):
'''Detect unauthorized data access attempts'''
cutoff = (datetime.now() - timedelta(hours=hours)).isoformat()
incidents = self.spark.sql(f'''
SELECT
user_identity.email AS user_email,
request_params.full_name_arg AS resource,
action_name,
response.status_code,
event_time,
source_ip_address
FROM {self.audit_log}
WHERE event_time > '{cutoff}'
AND response.status_code = 403
ORDER BY event_time DESC
''')
# Group by user — flag if > 10 denied attempts
flagged = incidents.groupBy("user_email").agg(
F.count("*").alias("denied_count"),
F.collect_set("resource").alias("resources_attempted"),
F.max("event_time").alias("last_attempt"),
).filter("denied_count > 10")
return flagged
def detect_bulk_data_export(self, hours=24):
'''Detect unusual bulk data downloads'''
cutoff = (datetime.now() - timedelta(hours=hours)).isoformat()
exports = self.spark.sql(f'''
SELECT
user_identity.email AS user_email,
request_params.full_name_arg AS table_name,
response.result AS rows_returned,
event_time,
source_ip_address
FROM {self.audit_log}
WHERE event_time > '{cutoff}'
AND action_name IN ('getTable', 'commandSubmit')
AND response.status_code = 200
''')
# Flag users with unusually high data access
flagged = exports.groupBy("user_email").agg(
F.count("*").alias("query_count"),
F.countDistinct("table_name").alias("tables_accessed"),
).filter("query_count > 100 OR tables_accessed > 20")
return flagged
def detect_permission_changes(self, hours=24):
'''Detect suspicious permission changes'''
cutoff = (datetime.now() - timedelta(hours=hours)).isoformat()
changes = self.spark.sql(f'''
SELECT
user_identity.email AS changed_by,
action_name,
request_params,
event_time
FROM {self.audit_log}
WHERE event_time > '{cutoff}'
AND action_name IN (
'updatePermissions', 'createMetastore',
'updateExternalLocation', 'createStorageCredential'
)
ORDER BY event_time DESC
''')
return changes
def detect_off_hours_access(self, hours=24):
'''Detect data access outside business hours'''
cutoff = (datetime.now() - timedelta(hours=hours)).isoformat()
off_hours = self.spark.sql(f'''
SELECT
user_identity.email AS user_email,
action_name,
request_params.full_name_arg AS resource,
event_time,
hour(event_time) AS access_hour
FROM {self.audit_log}
WHERE event_time > '{cutoff}'
AND (hour(event_time) < 7 OR hour(event_time) > 22)
AND response.status_code = 200
''')
return off_hours
# detector = UnityIncidentDetector(spark)
# unauthorized = detector.detect_unauthorized_access(24)
# bulk_exports = detector.detect_bulk_data_export(24)
"""

    def show_code(self):
        """Print a header and the first 600 characters of the embedded example."""
        print("=== Incident Detector ===")
        print(self.CODE[:600])
# Demo: print an excerpt of the embedded detector example.
incident_detection = IncidentDetection()
incident_detection.show_code()
Incident Response Workflow
# response.py — Incident response workflow
import json
class IncidentResponse:
    """Six-phase incident response workflow, printable as a short playbook."""

    # Phase registry: key -> metadata. Every phase has "name" and
    # "description"; "actions" / "severity" / "deliverables" are optional.
    WORKFLOW = {
        "detect": {
            "name": "1. Detection",
            "description": "ตรวจจับ incident จาก audit logs, alerts, user reports",
            "actions": [
                "Automated monitoring ทุก 5 นาที",
                "Alert rules สำหรับ suspicious patterns",
                "User-reported incidents",
            ],
        },
        "triage": {
            "name": "2. Triage & Classification",
            "description": "ประเมินความรุนแรง, จัดลำดับความสำคัญ",
            "severity": {
                "P1 Critical": "Data breach ข้อมูล PII, unauthorized admin access",
                "P2 High": "Bulk data export ผิดปกติ, permission escalation",
                "P3 Medium": "Off-hours access, policy violations",
                "P4 Low": "Failed login attempts, minor policy deviations",
            },
        },
        "contain": {
            "name": "3. Containment",
            "description": "หยุดความเสียหาย — revoke access, disable accounts",
            "actions": [
                "REVOKE ALL PRIVILEGES จาก compromised user",
                "Disable API tokens / personal access tokens",
                "Isolate affected tables (restrict access)",
                "Block suspicious IP addresses",
            ],
        },
        "investigate": {
            "name": "4. Investigation",
            "description": "วิเคราะห์ root cause — ใช้ audit logs + lineage",
            "actions": [
                "Query audit logs สำหรับ affected user/resource",
                "Trace data lineage — ข้อมูลไหนถูก access",
                "Check permission history — ใครเปลี่ยนอะไร",
                "Identify scope of impact",
            ],
        },
        "remediate": {
            "name": "5. Remediation",
            "description": "แก้ไข root cause — fix permissions, patch vulnerabilities",
            "actions": [
                "Reset compromised credentials",
                "Fix permission policies",
                "Add missing column masking/row filters",
                "Update monitoring rules",
            ],
        },
        "postmortem": {
            "name": "6. Post-Mortem",
            "description": "วิเคราะห์หลังเหตุการณ์ — lessons learned, preventive measures",
            "deliverables": ["Incident timeline", "Root cause analysis", "Action items", "Updated playbooks"],
        },
    }

    def show_workflow(self):
        """Print each phase with its description and up to three actions."""
        print("=== Incident Response Workflow ===\n")
        for phase in self.WORKFLOW.values():
            print(f"[{phase['name']}]")
            print(f" {phase['description']}")
            for action in phase.get("actions", [])[:3]:
                print(f" • {action}")
            print()
# Demo: print the six-phase incident response workflow.
response_demo = IncidentResponse()
response_demo.show_workflow()
Automated Remediation
# remediation.py — Automated incident remediation
import json
import random
class AutoRemediation:
    """Embedded example of automated Unity Catalog remediation, plus a
    simulated 30-day incident dashboard.

    The remediation script is stored as plain text (CODE) so this module has
    no databricks-sdk dependency; it is never executed here.
    """

    # NOTE: everything between the triple quotes is data, not executable code
    # in this module. BUGFIX: the example calls random.randint() in
    # create_incident_ticket but previously imported only json — as published
    # it would raise NameError; "import random" is now included.
    # NOTE(review): add_column_mask in the example relies on a notebook-global
    # `spark` — valid inside a Databricks notebook; confirm for other contexts.
    CODE = """
# auto_remediate.py — Automated remediation for Unity Catalog incidents
from databricks.sdk import WorkspaceClient
import json
import random
class UnityRemediator:
def __init__(self):
self.client = WorkspaceClient()
def revoke_user_access(self, user_email, catalog=None):
'''Revoke all permissions for a user'''
if catalog:
self.client.grants.update(
securable_type="catalog",
full_name=catalog,
changes=[{
"principal": user_email,
"remove": ["ALL_PRIVILEGES"],
}]
)
# Disable personal access tokens
tokens = self.client.token_management.list()
for token in tokens:
if token.created_by == user_email:
self.client.token_management.delete(token.token_id)
return {"status": "access_revoked", "user": user_email}
def quarantine_table(self, table_name):
'''Restrict access to a compromised table'''
# Remove all grants except owner
current_grants = self.client.grants.get(
securable_type="table",
full_name=table_name
)
for grant in current_grants.privilege_assignments:
if grant.principal != "owner":
self.client.grants.update(
securable_type="table",
full_name=table_name,
changes=[{
"principal": grant.principal,
"remove": [p.privilege for p in grant.privileges],
}]
)
return {"status": "quarantined", "table": table_name}
def add_column_mask(self, table_name, column_name, mask_function):
'''Add dynamic column masking to sensitive column'''
spark.sql(f'''
ALTER TABLE {table_name}
ALTER COLUMN {column_name}
SET MASK {mask_function}
''')
return {"status": "masked", "column": f"{table_name}.{column_name}"}
def create_incident_ticket(self, incident):
'''Create incident ticket in tracking system'''
ticket = {
"id": f"INC-{random.randint(1000, 9999)}",
"severity": incident["severity"],
"type": incident["type"],
"affected_resources": incident.get("resources", []),
"status": "open",
"assigned_to": "security-team",
}
return ticket
# remediate = UnityRemediator()
# remediate.revoke_user_access("suspicious@company.com", catalog="production")
# remediate.quarantine_table("production.customers.pii_data")
"""

    def show_code(self):
        """Print a header and the first 600 characters of the embedded example."""
        print("=== Auto Remediation ===")
        print(self.CODE[:600])

    def dashboard(self):
        """Print a simulated 30-day incident summary (random demo numbers)."""
        print(f"\n=== Incident Dashboard (30-day) ===")
        print(f" Total incidents: {random.randint(5, 30)}")
        print(f" P1 Critical: {random.randint(0, 2)}")
        print(f" P2 High: {random.randint(1, 5)}")
        print(f" P3 Medium: {random.randint(3, 15)}")
        print(f" P4 Low: {random.randint(5, 20)}")
        print(f" Avg MTTR: {random.uniform(0.5, 4):.1f} hours")
        print(f" Auto-remediated: {random.randint(50, 90)}%")
# Demo: show the remediation example and the incident dashboard.
remediation_demo = AutoRemediation()
remediation_demo.show_code()
remediation_demo.dashboard()
Monitoring & Alerting
# monitoring.py — Continuous monitoring setup
import json
import random
class MonitoringSetup:
    """Holds Databricks SQL alert queries (as text) and prints simulated
    real-time monitoring metrics."""

    # SQL alert rules over system.access.audit — data, not executed here.
    ALERT_RULES = """
-- alert_rules.sql — Databricks SQL alert queries
-- Alert: Multiple failed access attempts
SELECT
user_identity.email,
COUNT(*) AS failed_count,
COLLECT_SET(request_params.full_name_arg) AS resources
FROM system.access.audit
WHERE event_time > current_timestamp() - INTERVAL 1 HOUR
AND response.status_code = 403
GROUP BY user_identity.email
HAVING failed_count > 10;
-- Alert: Permission escalation
SELECT
user_identity.email AS changed_by,
request_params.changes AS permission_changes,
event_time
FROM system.access.audit
WHERE event_time > current_timestamp() - INTERVAL 1 HOUR
AND action_name = 'updatePermissions'
AND request_params.changes LIKE '%ALL_PRIVILEGES%';
-- Alert: New external location created
SELECT
user_identity.email,
request_params.name AS location_name,
request_params.url AS storage_url,
event_time
FROM system.access.audit
WHERE event_time > current_timestamp() - INTERVAL 24 HOUR
AND action_name = 'createExternalLocation';
-- Alert: Unusual query volume per user
SELECT
user_identity.email,
COUNT(*) AS query_count,
COUNT(DISTINCT request_params.full_name_arg) AS distinct_tables
FROM system.access.audit
WHERE event_time > current_timestamp() - INTERVAL 1 HOUR
AND action_name = 'commandSubmit'
GROUP BY user_identity.email
HAVING query_count > 200 OR distinct_tables > 30;
"""

    def show_alerts(self):
        """Print a header and the first 500 characters of the alert-rule SQL."""
        print("=== Alert Rules ===")
        print(self.ALERT_RULES[:500])

    def monitoring_dashboard(self):
        """Print simulated real-time monitoring metrics (random demo numbers)."""
        print("\n=== Real-time Monitoring ===")
        # Label plus the (low, high) bounds passed to random.randint — same
        # call order and arguments as the original line-by-line version.
        metrics = (
            ("Active users", 50, 300),
            ("Queries/min", 100, 2000),
            ("Failed access (1h)", 0, 20),
            ("Permission changes (24h)", 0, 10),
            ("Tables accessed (1h)", 50, 500),
            ("Alerts triggered (24h)", 0, 5),
        )
        for label, low, high in metrics:
            print(f" {label}: {random.randint(low, high)}")
# Demo: show alert rules and the real-time monitoring metrics.
monitoring_demo = MonitoringSetup()
monitoring_demo.show_alerts()
monitoring_demo.monitoring_dashboard()
FAQ - คำถามที่พบบ่อย
Q: Unity Catalog audit logs เก็บได้นานแค่ไหน?
A: Default: 90 วัน ใน system.access.audit table ถ้าต้องเก็บนานกว่า: export ไป Delta Lake table ใน cloud storage กำหนด retention policy ตาม compliance: PCI DSS 1 ปี, GDPR ตาม purpose แนะนำ: สร้าง scheduled job export audit logs ทุกวัน → เก็บถาวร
Q: Incident detection ควร real-time หรือ batch?
A: ทั้งสองอย่าง: Real-time (streaming): สำหรับ P1/P2 — unauthorized access, data breach ต้อง detect ภายในนาที Batch (scheduled): สำหรับ P3/P4 — anomaly detection, compliance reporting ทุก 15-60 นาที ใช้ Databricks SQL Alerts สำหรับ near real-time (ทุก 1-5 นาที) ใช้ Structured Streaming สำหรับ true real-time
Q: ใครควรรับผิดชอบ incident management?
A: Data Security Team: lead incident response, investigation Platform Team: technical remediation, access management Data Owners: impact assessment, data classification Compliance Team: regulatory reporting, audit Legal: data breach notification (GDPR 72 ชั่วโมง) RACI matrix: กำหนดบทบาทชัดเจนก่อนเกิด incident
Q: Column masking กับ row-level security ต่างกันอย่างไร?
A: Column masking: ซ่อนค่าใน column (เช่น email → j***@example.com) — ทุกคนเห็น row แต่ค่าถูก mask Row-level security: กรอง rows ตาม user — เห็นเฉพาะ rows ที่มีสิทธิ์ ใช้ร่วมกัน: row filter กรอง region + column mask ซ่อน PII ทั้งสองเป็น built-in ใน Unity Catalog — ไม่ต้อง implement เอง
