Databricks Unity Catalog: Data Governance and Audit Logging with the ELK Stack
Databricks Unity Catalog is the unified governance solution for data and AI assets on the Databricks Lakehouse Platform. It provides a centralized metadata store that covers the data catalog, access control, auditing, and lineage tracking across all workspaces.
Unity Catalog organizes data assets in a three-level namespace (Catalog > Schema > Table/View/Function), which makes them easy to locate and govern. It supports fine-grained access control down to the row and column level, tracks data lineage automatically, and records an audit log of operations.
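In practice this means a table is referenced by its fully qualified three-level name. A minimal sketch from a Databricks notebook (spark is the session the Databricks runtime provides; the names match the catalog created later in this article):
# Fully qualified three-level reference: catalog.schema.table
events = spark.sql("SELECT * FROM production_catalog.raw_data.events")

# Or set defaults once, then use short names
spark.sql("USE CATALOG production_catalog")
spark.sql("USE SCHEMA raw_data")
events = spark.table("events")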
The ELK Stack (Elasticsearch, Logstash, Kibana) is a widely used log management platform. Combined with Unity Catalog, it lets you analyze and visualize Databricks audit logs for compliance monitoring, security analysis, and operational troubleshooting.
Installing and Configuring Unity Catalog
Set up Unity Catalog and the ELK Stack
# === Unity Catalog Setup ===
# 1. Create Unity Catalog via Databricks CLI
pip install databricks-cli
databricks configure --token
# 2. Create Catalog and Schema
databricks unity-catalog catalogs create \
  --name production_catalog \
  --comment "Production data catalog"

databricks unity-catalog schemas create \
  --catalog-name production_catalog \
  --name raw_data \
  --comment "Raw ingested data"

databricks unity-catalog schemas create \
  --catalog-name production_catalog \
  --name processed_data \
  --comment "Processed and cleaned data"
# 3. SQL Commands in Databricks Notebook
# CREATE CATALOG IF NOT EXISTS production_catalog;
# CREATE SCHEMA IF NOT EXISTS production_catalog.raw_data;
# CREATE SCHEMA IF NOT EXISTS production_catalog.processed_data;
#
# -- Create managed table
# CREATE TABLE production_catalog.raw_data.events (
#   event_id STRING,
#   event_type STRING,
#   user_id STRING,
#   timestamp TIMESTAMP,
#   payload STRING
# ) USING DELTA;
#
# -- Grant permissions
# GRANT USE CATALOG ON CATALOG production_catalog TO `data-engineers`;
# GRANT USE SCHEMA ON SCHEMA production_catalog.raw_data TO `data-engineers`;
# GRANT SELECT ON TABLE production_catalog.raw_data.events TO `data-analysts`;
# 4. ELK Stack Installation
# docker-compose.yml for ELK
cat > docker-compose-elk.yml << 'EOF'
version: "3.8"
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.12.0
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=true
      - ELASTIC_PASSWORD=changeme
      - "ES_JAVA_OPTS=-Xms2g -Xmx2g"
    ports:
      - "9200:9200"
    volumes:
      - es_data:/usr/share/elasticsearch/data
  logstash:
    image: docker.elastic.co/logstash/logstash:8.12.0
    volumes:
      - ./logstash/pipeline:/usr/share/logstash/pipeline
    ports:
      - "5044:5044"
    depends_on:
      - elasticsearch
  kibana:
    image: docker.elastic.co/kibana/kibana:8.12.0
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
      - ELASTICSEARCH_USERNAME=kibana_system
      - ELASTICSEARCH_PASSWORD=changeme
    ports:
      - "5601:5601"
    depends_on:
      - elasticsearch
volumes:
  es_data:
EOF
docker compose -f docker-compose-elk.yml up -d
echo "Unity Catalog and ELK Stack configured"
Log Management with the ELK Stack
Configure a Logstash pipeline for Databricks audit logs
# === Logstash Pipeline Configuration ===
# logstash/pipeline/databricks-audit.conf
mkdir -p logstash/pipeline
cat > logstash/pipeline/databricks-audit.conf << 'EOF'
input {
  http {
    port => 5044
    codec => json
    additional_codecs => { "application/json" => "json" }
  }
  file {
    path => "/var/log/databricks/audit/*.json"
    start_position => "beginning"
    sincedb_path => "/var/lib/logstash/sincedb_databricks"
    codec => json
  }
}

filter {
  # Parse timestamp
  date {
    match => ["timestamp", "ISO8601", "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"]
    target => "@timestamp"
  }

  # Enrich with action category
  if [actionName] {
    mutate {
      add_field => { "action_category" => "" }
    }
    if [actionName] =~ /^(createTable|deleteTable|alterTable)/ {
      mutate { replace => { "action_category" => "schema_change" } }
    } else if [actionName] =~ /^(select|read)/ {
      mutate { replace => { "action_category" => "data_access" } }
    } else if [actionName] =~ /^(grant|revoke|deny)/ {
      mutate { replace => { "action_category" => "permission_change" } }
    } else if [actionName] =~ /^(login|logout|tokenCreate)/ {
      mutate { replace => { "action_category" => "authentication" } }
    }
  }

  # GeoIP for source IP
  if [sourceIPAddress] {
    geoip {
      source => "sourceIPAddress"
      target => "geoip"
    }
  }

  # Remove sensitive fields
  mutate {
    remove_field => ["password", "token", "secret"]
  }
}

output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    user => "elastic"
    password => "changeme"
    index => "databricks-audit-%{+YYYY.MM.dd}"
    template_name => "databricks-audit"
  }

  # Alert on critical events
  if [action_category] == "permission_change" or [action_category] == "schema_change" {
    http {
      url => "https://hooks.slack.com/services/xxx/yyy/zzz"
      http_method => "post"
      format => "json"
      mapping => {
        "text" => "Databricks Alert: %{actionName} by %{userIdentity} on %{requestParams}"
      }
    }
  }
}
EOF
echo "Logstash pipeline configured"
Building the Data Pipeline
Build a pipeline that ships audit logs from Databricks to ELK
#!/usr/bin/env python3
# audit_pipeline.py - Databricks Audit Log Pipeline
import json
import logging
import requests
from datetime import datetime, timedelta
from typing import Dict, List

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("pipeline")


class DatabricksAuditPipeline:
    def __init__(self, workspace_url, token, elk_url):
        self.workspace_url = workspace_url
        self.token = token
        self.elk_url = elk_url
        self.headers = {"Authorization": f"Bearer {token}"}

    def fetch_audit_logs(self, start_date, end_date):
        """Fetch audit logs from Databricks API"""
        url = f"{self.workspace_url}/api/2.0/unity-catalog/audit-logs"
        params = {
            "start_time": start_date.isoformat(),
            "end_time": end_date.isoformat(),
            "limit": 1000,
        }
        all_logs = []
        while True:
            response = requests.get(url, headers=self.headers, params=params)
            data = response.json()
            logs = data.get("events", [])
            all_logs.extend(logs)
            next_token = data.get("next_page_token")
            if not next_token:
                break
            params["page_token"] = next_token
        return all_logs

    def transform_log(self, log_entry):
        """Transform audit log for ELK ingestion"""
        return {
            "@timestamp": log_entry.get("timestamp"),
            "workspace_id": log_entry.get("workspaceId"),
            "action_name": log_entry.get("actionName"),
            "service_name": log_entry.get("serviceName"),
            "user_identity": log_entry.get("userIdentity", {}).get("email"),
            "source_ip": log_entry.get("sourceIPAddress"),
            "request_id": log_entry.get("requestId"),
            "request_params": json.dumps(log_entry.get("requestParams", {})),
            "response_status": log_entry.get("response", {}).get("statusCode"),
            "audit_level": log_entry.get("auditLevel"),
        }

    def send_to_elk(self, logs):
        """Bulk send logs to Elasticsearch"""
        bulk_data = []
        for log in logs:
            transformed = self.transform_log(log)
            index_name = f"databricks-audit-{datetime.utcnow().strftime('%Y.%m.%d')}"
            bulk_data.append(json.dumps({"index": {"_index": index_name}}))
            bulk_data.append(json.dumps(transformed))
        if bulk_data:
            payload = "\n".join(bulk_data) + "\n"
            response = requests.post(
                f"{self.elk_url}/_bulk",
                headers={"Content-Type": "application/x-ndjson"},
                auth=("elastic", "changeme"),
                data=payload,
            )
            return response.json()
        return {"items": []}

    def run_pipeline(self):
        """Execute full pipeline"""
        end_date = datetime.utcnow()
        start_date = end_date - timedelta(hours=1)
        logger.info(f"Fetching logs from {start_date} to {end_date}")
        logs = self.fetch_audit_logs(start_date, end_date)
        logger.info(f"Fetched {len(logs)} audit events")
        if logs:
            result = self.send_to_elk(logs)
            errors = sum(1 for item in result.get("items", []) if item.get("index", {}).get("error"))
            logger.info(f"Sent {len(logs)} events, {errors} errors")
        return {"events_processed": len(logs)}


# Demo
pipeline = DatabricksAuditPipeline(
    "https://adb-123456789.azuredatabricks.net",
    "dapi_token_xxx",
    "http://localhost:9200",
)
print("Pipeline configured")
print("Run: pipeline.run_pipeline()")
Monitoring and Auditing
Create Kibana dashboards and monitoring queries
#!/usr/bin/env python3
# monitoring.py - Audit Monitoring Dashboard
import json
import logging
from datetime import datetime

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("monitoring")


class AuditMonitor:
    def __init__(self):
        self.alerts = []

    def kibana_queries(self):
        return {
            "failed_logins": {
                "query": 'action_name:"login" AND response_status:401',
                "visualization": "line_chart",
                "alert_threshold": "5 per hour",
            },
            "permission_changes": {
                "query": 'action_name:("grantPermission" OR "revokePermission")',
                "visualization": "data_table",
                "alert_threshold": "any outside business hours",
            },
            "schema_modifications": {
                "query": 'action_name:("createTable" OR "deleteTable" OR "alterTable")',
                "visualization": "timeline",
                "alert_threshold": "deleteTable always alert",
            },
            "data_exports": {
                "query": 'action_name:"downloadResults" OR action_name:"exportTable"',
                "visualization": "bar_chart",
                "alert_threshold": "10 per user per day",
            },
            "top_users": {
                "query": "*",
                "aggregation": "terms on user_identity, top 20",
                "visualization": "pie_chart",
            },
            "error_rate": {
                "query": "response_status:[400 TO 599]",
                "visualization": "gauge",
                "alert_threshold": "error_rate > 5%",
            },
        }

    def compliance_report(self):
        return {
            "report_date": datetime.utcnow().isoformat(),
            "period": "last_30_days",
            "summary": {
                "total_events": 245000,
                "unique_users": 89,
                "data_access_events": 180000,
                "permission_changes": 34,
                "schema_changes": 12,
                "failed_authentications": 156,
                "data_exports": 89,
            },
            "risk_findings": [
                {"severity": "high", "finding": "User admin@corp.com accessed 15 tables outside normal hours"},
                {"severity": "medium", "finding": "3 users have unused ADMIN permissions for 90+ days"},
                {"severity": "low", "finding": "5 service accounts have not rotated tokens in 180 days"},
            ],
            "recommendations": [
                "Review and revoke unused admin permissions",
                "Implement token rotation policy (max 90 days)",
                "Enable IP allowlisting for production workspace",
                "Set up automated alerts for after-hours data access",
            ],
        }


monitor = AuditMonitor()
queries = monitor.kibana_queries()
print("Kibana Queries:")
for name, q in queries.items():
    print(f"  {name}: {q['query'][:60]}")
report = monitor.compliance_report()
print(f"\nCompliance: {report['summary']['total_events']} events, {len(report['risk_findings'])} findings")
Security and Governance
Data governance best practices
# === Data Governance Configuration ===
# 1. Unity Catalog Access Control
# SQL commands in Databricks notebook:
# -- Groups for Unity Catalog are account-level groups, created in the account
# -- console or via the SCIM API (not via SQL), and then referenced in GRANTs:
# --   data-engineers, data-analysts, data-scientists
# -- Catalog-level permissions
# GRANT USE CATALOG ON CATALOG production_catalog TO `data-engineers`;
# GRANT USE CATALOG ON CATALOG production_catalog TO `data-analysts`;
# -- Schema-level permissions
# GRANT ALL PRIVILEGES ON SCHEMA production_catalog.raw_data TO `data-engineers`;
# GRANT USE SCHEMA, SELECT ON SCHEMA production_catalog.processed_data TO `data-analysts`;
# -- Table-level permissions
# GRANT SELECT ON TABLE production_catalog.processed_data.user_metrics TO `data-scientists`;
# -- Row-level security
# CREATE FUNCTION production_catalog.default.region_filter(region STRING)
# RETURN IF(IS_ACCOUNT_GROUP_MEMBER('apac-team'), region = 'APAC',
# IF(IS_ACCOUNT_GROUP_MEMBER('emea-team'), region = 'EMEA', TRUE));
# -- Column masking
# CREATE FUNCTION production_catalog.default.mask_email(email STRING)
# RETURN IF(IS_ACCOUNT_GROUP_MEMBER('data-engineers'), email,
# CONCAT(LEFT(email, 2), '***@', SPLIT(email, '@')[1]));
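# -- The filter and mask only take effect once they are attached to a table.
# -- (Illustrative: assumes user_metrics has region and email columns.)
# ALTER TABLE production_catalog.processed_data.user_metrics
#   SET ROW FILTER production_catalog.default.region_filter ON (region);
# ALTER TABLE production_catalog.processed_data.user_metrics
#   ALTER COLUMN email SET MASK production_catalog.default.mask_email;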
# 2. Index Lifecycle Management (ILM) for ELK
cat > ilm-policy.json << 'EOF'
{
  "policy": {
    "phases": {
      "hot": {
        "min_age": "0ms",
        "actions": {
          "rollover": {
            "max_size": "50gb",
            "max_age": "7d"
          },
          "set_priority": { "priority": 100 }
        }
      },
      "warm": {
        "min_age": "30d",
        "actions": {
          "shrink": { "number_of_shards": 1 },
          "forcemerge": { "max_num_segments": 1 },
          "set_priority": { "priority": 50 }
        }
      },
      "cold": {
        "min_age": "90d",
        "actions": {
          "freeze": {},
          "set_priority": { "priority": 0 }
        }
      },
      "delete": {
        "min_age": "365d",
        "actions": { "delete": {} }
      }
    }
  }
}
EOF
# Apply ILM policy
curl -X PUT "localhost:9200/_ilm/policy/databricks-audit-policy" \
  -H "Content-Type: application/json" \
  -u elastic:changeme \
  -d @ilm-policy.json
echo "Security and governance configured"
Frequently Asked Questions (FAQ)
Q: How is Unity Catalog different from the Hive Metastore?
A: The Hive Metastore is the legacy metastore scoped to a single workspace; it has no fine-grained access control, no data lineage, and no audit logging. Unity Catalog is a centralized metastore shared across workspaces, with RBAC down to the row and column level, built-in data lineage, audit logging, and support for external locations (S3, ADLS, GCS). Databricks recommends migrating from the Hive Metastore to Unity Catalog.
Q: Which is better for audit logs, ELK or Splunk?
A: ELK (Elasticsearch, Logstash, Kibana) is open source with no license cost (unless you use Elastic Cloud) and is highly customizable, but you have to manage the infrastructure yourself and it demands engineering capability. Splunk is a commercial product that is easier to use, ships with more features and vendor support, but its license cost is high (priced by data volume); it suits enterprises that want an out-of-the-box solution. For Databricks audit logs both work well, so choose based on budget and team capability.
Q: How does data lineage work in Unity Catalog?
A: Unity Catalog tracks lineage automatically from Spark jobs, SQL queries, and Delta Lake operations. When a query reads from table A and writes to table B, the system records that B depends on A. Lineage is visible in the Lineage tab of the Databricks UI and through the REST API. Typical uses are impact analysis (before changing a schema, see which downstream tables are affected), root cause analysis (when data looks wrong, trace it back to its source), and compliance (demonstrate where data comes from and where it flows).
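For example, lineage for a single table can be pulled over the REST API. A sketch; the workspace URL and token are the placeholders used earlier in this article, and the endpoint is the Databricks lineage-tracking API:
#!/usr/bin/env python3
# fetch_lineage.py - pull upstream/downstream lineage for one table
import requests

resp = requests.get(
    "https://adb-123456789.azuredatabricks.net/api/2.0/lineage-tracking/table-lineage",
    headers={"Authorization": "Bearer dapi_token_xxx"},
    params={"table_name": "production_catalog.processed_data.user_metrics"},
    timeout=30,
)
print(resp.json())  # upstream and downstream entities for the given table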
Q: How long should audit logs be retained?
A: Databricks keeps audit logs in system tables for 365 days by default. If compliance requires longer retention, export them to external storage. In ELK, use ILM (Index Lifecycle Management): hot storage for 7-30 days for active search, warm for 30-90 days with reduced resources, cold for 90-365 days with frozen indices, and delete after 1-7 years depending on the compliance requirement. For long-term archiving, use S3 Glacier or Azure Cool Storage rather than keeping everything in Elasticsearch.
