Databricks Unity Catalog
Unity Catalog เป็น Data Governance Solution ที่จัดการ Data Assets ทั้งหมดจากจุดเดียว ใช้ 3-level Namespace มี Fine-grained Access Control, Data Lineage และ Audit Logging
Automation Scripts ช่วยให้จัดการ Unity Catalog ได้อัตโนมัติ สร้าง Catalogs, Schemas, Grant Permissions, Monitor Quality ผ่าน API และ SDK
Unity Catalog Setup
-- === Unity Catalog Setup with SQL ===
-- (Spark/Databricks SQL comments use `--` or /* ... */; `#` is not a comment.)
-- 1. Create one catalog per environment. IF NOT EXISTS makes every statement
--    safe to re-run.
CREATE CATALOG IF NOT EXISTS production
COMMENT 'Production data catalog';
CREATE CATALOG IF NOT EXISTS development
COMMENT 'Development data catalog';
CREATE CATALOG IF NOT EXISTS staging
COMMENT 'Staging data catalog';
-- 2. Create one schema per business domain inside the production catalog.
CREATE SCHEMA IF NOT EXISTS production.sales     COMMENT 'Sales domain data';
CREATE SCHEMA IF NOT EXISTS production.marketing COMMENT 'Marketing domain data';
CREATE SCHEMA IF NOT EXISTS production.finance   COMMENT 'Finance domain data';
-- 3. Create the Delta orders table in production.sales.
CREATE TABLE IF NOT EXISTS production.sales.orders (
  order_id    BIGINT         NOT NULL,
  customer_id BIGINT         NOT NULL,
  product_id  BIGINT         NOT NULL,
  amount      DECIMAL(10, 2) NOT NULL,
  status      STRING         NOT NULL,
  created_at  TIMESTAMP      NOT NULL,
  updated_at  TIMESTAMP      -- the only nullable column
)
USING DELTA
COMMENT 'Customer orders'
TBLPROPERTIES (
  -- Delta auto-optimize settings (optimized writes + auto compaction).
  'delta.autoOptimize.optimizeWrite' = 'true',
  'delta.autoOptimize.autoCompact'   = 'true',
  -- Custom (non-Delta) property recording the medallion tier.
  'quality' = 'gold'
);
-- 4. Grant Permissions (always to groups, never to individual users).
-- Each group gets USE CATALOG + USE SCHEMA first: without them, privileges on
-- objects inside production.sales cannot be exercised.
-- Data Engineers: full access
GRANT USE CATALOG ON CATALOG production TO `data-engineers`;
GRANT USE SCHEMA ON SCHEMA production.sales TO `data-engineers`;
GRANT ALL PRIVILEGES ON SCHEMA production.sales TO `data-engineers`;
-- Data Analysts: read-only
GRANT USE CATALOG ON CATALOG production TO `data-analysts`;
GRANT USE SCHEMA ON SCHEMA production.sales TO `data-analysts`;
GRANT SELECT ON SCHEMA production.sales TO `data-analysts`;
-- Data Scientists: read + create models
GRANT USE CATALOG ON CATALOG production TO `data-scientists`;
GRANT USE SCHEMA ON SCHEMA production.sales TO `data-scientists`;
GRANT SELECT ON SCHEMA production.sales TO `data-scientists`;
-- CREATE MODEL lets the group register models in this schema.
GRANT CREATE MODEL ON SCHEMA production.sales TO `data-scientists`;
-- 5. Tags and Classification
-- Table-level tags: domain, tier, and a table-wide 'pii' = 'false' marker.
ALTER TABLE production.sales.orders SET TAGS ('domain' = 'sales', 'pii' = 'false', 'tier' = 'gold');
-- Column-level tag: customer_id itself is classified as PII.
ALTER TABLE production.sales.orders ALTER COLUMN customer_id SET TAGS ('pii' = 'true');
-- 6. Row-level Security
-- The filter is bound to a `region` column. The orders table created in
-- section 3 has no such column, so add it first. ADD COLUMNS is not
-- idempotent: skip this statement if the column already exists.
ALTER TABLE production.sales.orders ADD COLUMNS (region STRING COMMENT 'Sales region of the order');
-- There is no built-in SQL function that returns "the current user's region",
-- so keep an explicit user -> region mapping table and look the caller up.
CREATE TABLE IF NOT EXISTS production.sales.user_regions (
  user_email STRING NOT NULL,
  region     STRING NOT NULL
)
COMMENT 'Regions each user may see in production.sales.orders';
-- A row is visible to members of `global-access`, or to users mapped to the
-- row's region. The parameter is named row_region so it cannot be confused
-- with the mapping table's `region` column inside the subquery.
CREATE OR REPLACE FUNCTION production.sales.region_filter(row_region STRING)
RETURN IS_ACCOUNT_GROUP_MEMBER('global-access')
  OR EXISTS (
    SELECT 1
    FROM production.sales.user_regions m
    WHERE m.user_email = current_user()
      AND m.region = row_region
  );
ALTER TABLE production.sales.orders
SET ROW FILTER production.sales.region_filter ON (region);
-- 7. Column Masking
-- The mask is attached to production.sales.customers, which no earlier
-- statement creates; create it here (no-op if it already exists).
CREATE TABLE IF NOT EXISTS production.sales.customers (
  customer_id BIGINT NOT NULL,
  email       STRING
)
USING DELTA
COMMENT 'Customer master data';
-- Members of `pii-access` see the raw email. Everyone else sees the first
-- character, then ***@ and the domain (alice@example.com -> a***@example.com).
CREATE FUNCTION production.sales.mask_email(email STRING)
RETURN IF(IS_ACCOUNT_GROUP_MEMBER('pii-access'), email, regexp_replace(email, '(.).*@', '$1***@'));
ALTER TABLE production.sales.customers
ALTER COLUMN email SET MASK production.sales.mask_email;
Automation Scripts
# unity_catalog_automation.py — Unity Catalog Automation
# pip install requests   (this script calls the Unity Catalog REST API directly via requests; it does not use databricks-sdk)
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from datetime import datetime
import json
@dataclass
class CatalogConfig:
    """Declarative description of one catalog to provision.

    Each entry in ``schemas`` is a dict with a required ``"name"`` key and an
    optional ``"comment"`` key; ``UnityCatalogAutomation.create_catalog``
    reads them that way.
    """

    name: str  # catalog name, e.g. "production"
    comment: str  # description sent with the create request
    schemas: List[Dict] = field(default_factory=lambda: [])  # fresh list per instance
    owner: str = field(default="data-platform-team")
@dataclass
class PermissionGrant:
    """One grant request: give ``principal`` the listed ``privileges``.

    The target securable is the schema ``catalog.schema`` when ``schema`` is
    set, otherwise the catalog itself.
    """

    principal: str  # group or user receiving the privileges
    catalog: str  # catalog name (also the parent of ``schema``)
    schema: Optional[str] = field(default=None)
    privileges: List[str] = field(default_factory=lambda: [])  # e.g. ["USE_SCHEMA", "SELECT"]
class UnityCatalogAutomation:
    """Provision Unity Catalog catalogs, schemas and grants via the REST API.

    Every API helper returns the decoded JSON response on success, or a dict
    with a single ``"error"`` key on failure. Failures are printed and do not
    stop the remaining steps. The ``catalogs_created`` / ``schemas_created`` /
    ``grants_applied`` counters only count successful calls.
    """

    # Seconds to wait for one REST call. Without a timeout a stalled
    # connection would block the whole setup run indefinitely.
    REQUEST_TIMEOUT = 30

    # HTTP verbs _api_call knows how to send.
    _SUPPORTED_METHODS = ("GET", "POST", "PATCH")

    def __init__(self, host, token):
        """Store the workspace URL and bearer token and zero the counters.

        Args:
            host: Workspace base URL, e.g. ``"https://<workspace>.cloud.databricks.com"``.
            token: Token sent as ``Authorization: Bearer <token>``.
        """
        self.host = host
        self.token = token
        self.catalogs_created = 0
        self.schemas_created = 0
        self.grants_applied = 0

    def _api_call(self, method, endpoint, data=None):
        """Call the Databricks REST API under ``/api/2.1/unity-catalog``.

        Args:
            method: HTTP verb, one of ``"GET"``, ``"POST"``, ``"PATCH"``
                (case-insensitive).
            endpoint: Path below ``/api/2.1/unity-catalog``, e.g. ``"/catalogs"``.
            data: JSON body for POST/PATCH (not sent for GET).

        Returns:
            The parsed JSON body on success (``{}`` when the body is empty or
            not JSON), or ``{"error": <message>}`` for HTTP status >= 400 and
            for network-level failures.

        Raises:
            ValueError: if ``method`` is not a supported verb. The previous
                version failed here with an UnboundLocalError.
        """
        import requests

        verb = method.upper()
        if verb not in self._SUPPORTED_METHODS:
            raise ValueError(f"Unsupported HTTP method: {method!r}")
        # rstrip avoids a '//api' path when host is given with a trailing slash.
        url = f"{self.host.rstrip('/')}/api/2.1/unity-catalog{endpoint}"
        headers = {"Authorization": f"Bearer {self.token}"}
        try:
            resp = requests.request(
                verb,
                url,
                headers=headers,
                json=None if verb == "GET" else data,
                timeout=self.REQUEST_TIMEOUT,
            )
        except requests.RequestException as exc:
            # Keep the same error-dict contract callers already check for.
            return {"error": str(exc)}
        if resp.status_code >= 400:
            return {"error": resp.text}
        try:
            return resp.json()
        except ValueError:  # empty or non-JSON success body
            return {}

    def create_catalog(self, config: CatalogConfig):
        """Create a catalog, then every schema listed in ``config.schemas``.

        Schemas are attempted even if the catalog call fails, so a re-run can
        still fill in missing schemas under an existing catalog.

        NOTE: ``config.owner`` is only stored as a catalog *property* named
        ``"owner"``; this request does not transfer catalog ownership.

        Returns:
            The result dict of the catalog create call.
        """
        result = self._api_call("POST", "/catalogs", {
            "name": config.name,
            "comment": config.comment,
            "properties": {"owner": config.owner, "created_by": "automation"},
        })
        if "error" in result:
            print(f" Failed to create catalog {config.name}: {result['error']}")
        else:
            self.catalogs_created += 1
            print(f" Created catalog: {config.name}")
        for schema in config.schemas:
            self.create_schema(config.name, schema["name"], schema.get("comment", ""))
        return result

    def create_schema(self, catalog, name, comment=""):
        """Create schema ``catalog.name`` and return the API result dict."""
        result = self._api_call("POST", "/schemas", {
            "name": name,
            "catalog_name": catalog,
            "comment": comment,
        })
        if "error" in result:
            print(f" Failed to create schema {catalog}.{name}: {result['error']}")
        else:
            self.schemas_created += 1
            print(f" Created schema: {catalog}.{name}")
        return result

    def grant_permissions(self, grant: PermissionGrant):
        """Add ``grant.privileges`` for ``grant.principal`` on a catalog or schema.

        Targets the schema ``catalog.schema`` when ``grant.schema`` is set,
        otherwise the catalog, via PATCH ``/permissions/{type}/{full_name}``.

        Returns:
            The API result dict.
        """
        if grant.schema:
            securable_type = "schema"
            securable_name = f"{grant.catalog}.{grant.schema}"
        else:
            securable_type = "catalog"
            securable_name = grant.catalog
        changes = [{
            "principal": grant.principal,
            "add": grant.privileges,
        }]
        result = self._api_call("PATCH",
                                f"/permissions/{securable_type}/{securable_name}",
                                {"changes": changes})
        privs = ", ".join(grant.privileges)
        if "error" in result:
            print(f" Failed to grant [{privs}] on {securable_name} "
                  f"to {grant.principal}: {result['error']}")
        else:
            self.grants_applied += 1
            print(f" Granted [{privs}] on {securable_name} to {grant.principal}")
        return result

    def setup_environment(self, env_config):
        """Create all catalogs/schemas, apply all grants, then print a summary.

        Args:
            env_config: dict with optional ``"catalogs"`` (each entry holds
                CatalogConfig keyword arguments) and ``"permissions"`` (each
                entry holds PermissionGrant keyword arguments) lists.
        """
        print(f"\n{'='*55}")
        print(f"Unity Catalog Setup — {datetime.now().strftime('%Y-%m-%d %H:%M')}")
        print(f"{'='*55}")
        # Catalogs first: grants below may target schemas created here.
        for catalog_cfg in env_config.get("catalogs", []):
            self.create_catalog(CatalogConfig(**catalog_cfg))
        for grant_cfg in env_config.get("permissions", []):
            self.grant_permissions(PermissionGrant(**grant_cfg))
        print(f"\n Summary:")
        print(f" Catalogs: {self.catalogs_created}")
        print(f" Schemas: {self.schemas_created}")
        print(f" Grants: {self.grants_applied}")
# === Configuration File ===
# Declarative environment definition for
# UnityCatalogAutomation.setup_environment(): each "catalogs" entry holds
# CatalogConfig fields and each "permissions" entry holds PermissionGrant fields.
env_config = {
    "catalogs": [
        {
            "name": "production",
            "comment": "Production data",
            "schemas": [
                {"name": "sales", "comment": "Sales domain"},
                {"name": "marketing", "comment": "Marketing domain"},
                {"name": "finance", "comment": "Finance domain"},
                {"name": "ml_models", "comment": "ML models and features"},
            ],
        },
        {
            "name": "development",
            "comment": "Development sandbox",
            "schemas": [
                {"name": "sandbox", "comment": "Developer sandbox"},
                {"name": "experiments", "comment": "ML experiments"},
            ],
        },
    ],
    "permissions": [
        # Every principal with schema-level privileges also needs USE_CATALOG
        # on the parent catalog, or those privileges cannot be exercised.
        {"principal": "data-engineers", "catalog": "production",
         "privileges": ["USE_CATALOG"]},
        {"principal": "data-engineers", "catalog": "production",
         "schema": "sales", "privileges": ["ALL_PRIVILEGES"]},
        {"principal": "data-analysts", "catalog": "production",
         "privileges": ["USE_CATALOG"]},
        {"principal": "data-analysts", "catalog": "production",
         "schema": "sales", "privileges": ["USE_SCHEMA", "SELECT"]},
    ],
}
# Example usage (needs a real workspace URL and an API token):
# automation = UnityCatalogAutomation("https://my-workspace.cloud.databricks.com", "token")
# automation.setup_environment(env_config)
print(f"Config: {len(env_config['catalogs'])} catalogs, {len(env_config['permissions'])} grants")
Data Lineage และ Audit
# lineage_audit.py — Data Lineage และ Audit Monitoring
from dataclasses import dataclass
from typing import List, Dict
from datetime import datetime
@dataclass
class LineageEdge:
    """One lineage hop: ``job_name`` produced ``target_table`` from ``source_table``."""

    source_table: str  # upstream table name, e.g. "bronze.raw_orders"
    target_table: str  # downstream table name
    transformation: str  # short description of the transformation applied
    job_name: str  # job that ran the transformation
@dataclass
class AuditEvent:
    """One recorded action: ``user`` performed ``action`` on ``resource``."""

    timestamp: str  # ISO 8601 string (DataGovernanceMonitor.add_audit uses isoformat())
    user: str  # who performed the action
    action: str  # e.g. "SELECT", "GRANT"
    resource: str  # affected object, e.g. "production.sales.orders"
    details: str  # free-text detail; may be empty
class DataGovernanceMonitor:
    """In-memory collector and printer for lineage edges and audit events.

    Nothing is fetched from Databricks. Callers record entries with
    ``add_lineage`` / ``add_audit`` and print reports built from them.
    """

    # Placeholder compliance results used when compliance_check() receives no
    # checks. They are hard-coded to pass and are NOT derived from any state.
    DEFAULT_CHECKS = (
        ("PII tables have row filters", True),
        ("All catalogs have owners", True),
        ("Audit logging enabled", True),
        ("Column masking on PII", True),
        ("No public access grants", True),
    )

    def __init__(self):
        self.lineage: List[LineageEdge] = []
        self.audit_events: List[AuditEvent] = []

    def add_lineage(self, source, target, transform, job):
        """Record that ``job`` produced ``target`` from ``source`` via ``transform``."""
        self.lineage.append(LineageEdge(source, target, transform, job))

    def add_audit(self, user, action, resource, details=""):
        """Record an audit event stamped with the current local time (ISO 8601)."""
        self.audit_events.append(AuditEvent(
            datetime.now().isoformat(), user, action, resource, details))

    def lineage_report(self):
        """Print every lineage edge as source -> [transformation] (job) -> target."""
        print(f"\nData Lineage ({len(self.lineage)} edges):")
        for edge in self.lineage:
            print(f" {edge.source_table}")
            print(f" -> [{edge.transformation}] ({edge.job_name})")
            print(f" -> {edge.target_table}")

    def audit_report(self, limit=10):
        """Print the total event count and the most recent ``limit`` events.

        Args:
            limit: how many trailing events to print (default 10). A value
                <= 0 prints none; a plain ``[-0:]`` slice would print all.
        """
        print(f"\nAudit Trail ({len(self.audit_events)} events):")
        recent = self.audit_events[-limit:] if limit > 0 else []
        for event in recent:
            print(f" [{event.timestamp[:16]}] {event.user}: "
                  f"{event.action} on {event.resource}")

    def compliance_check(self, checks=None):
        """Print a PASS/FAIL line per check and the overall score.

        Args:
            checks: iterable of ``(description, passed)`` pairs. Defaults to
                ``DEFAULT_CHECKS``, hard-coded placeholders that always pass.
                Supply real results to get a meaningful score.

        Returns:
            ``(passed, total)``. An empty ``checks`` gives ``(0, 0)`` and a 0%
            score instead of dividing by zero.
        """
        results = list(self.DEFAULT_CHECKS if checks is None else checks)
        print(f"\nCompliance Check:")
        passed = sum(1 for _, ok in results if ok)
        for check, ok in results:
            status = "PASS" if ok else "FAIL"
            print(f" [{status}] {check}")
        total = len(results)
        pct = passed / total * 100 if total else 0.0
        print(f"\n Score: {passed}/{total} ({pct:.0f}%)")
        return passed, total
# Example: record a small bronze -> silver -> gold pipeline plus a few audit
# events, then print the lineage, audit and compliance reports.
monitor = DataGovernanceMonitor()
for src, dst, transform, job in (
    ("bronze.raw_orders", "silver.clean_orders", "Clean + Dedupe", "etl-orders"),
    ("silver.clean_orders", "gold.daily_revenue", "Aggregate", "agg-revenue"),
    ("silver.clean_orders", "gold.customer_segments", "ML Features", "ml-features"),
):
    monitor.add_lineage(src, dst, transform, job)
for audit_args in (
    ("alice@company.com", "SELECT", "production.sales.orders"),
    ("bob@company.com", "GRANT", "production.sales", "Granted SELECT to analysts"),
    ("automation", "CREATE TABLE", "production.sales.daily_metrics"),
):
    monitor.add_audit(*audit_args)
for report in (monitor.lineage_report, monitor.audit_report, monitor.compliance_check):
    report()
Best Practices
- 3-level Namespace: ใช้ Catalog แยกตาม Environment (prod/dev/staging) Schema แยกตาม Domain
- Least Privilege: ให้สิทธิ์น้อยที่สุดที่จำเป็น ใช้ Groups แทน Users
- PII Protection: ใช้ Column Masking และ Row-level Security สำหรับข้อมูล PII
- Automation: ใช้ Terraform หรือ Databricks Asset Bundles จัดการ Infrastructure as Code
- Audit Logging: เปิด Audit Logging ตรวจสอบการเข้าถึงข้อมูลทุกครั้ง
- Data Classification: ใช้ Tags จัดประเภทข้อมูล (PII, Confidential, Public)
การนำไปใช้งานจริงในองค์กร
สำหรับองค์กรขนาดกลางถึงใหญ่ แนะนำให้ใช้หลัก Three-Tier Architecture คือ Core Layer ที่เป็นแกนกลางของระบบ Distribution Layer ที่ทำหน้าที่กระจาย Traffic และ Access Layer ที่เชื่อมต่อกับผู้ใช้โดยตรง การแบ่ง Layer ชัดเจนช่วยให้การ Troubleshoot ง่ายขึ้นและสามารถ Scale ระบบได้ตามความต้องการ
เรื่อง Network Security ก็สำคัญไม่แพ้กัน ควรติดตั้ง Next-Generation Firewall ที่สามารถ Deep Packet Inspection ได้ ใช้ Network Segmentation แยก VLAN สำหรับแต่ละแผนก ติดตั้ง IDS/IPS เพื่อตรวจจับการโจมตี และทำ Regular Security Audit อย่างน้อยปีละ 2 ครั้ง
Unity Catalog คืออะไร
Unity Catalog คือ Data Governance Solution ของ Databricks ที่จัดการ Data Assets ทุกประเภท (Tables, Views, Volumes, Models, Functions) จากจุดเดียว ใช้ 3-level Namespace และรองรับ Fine-grained Access Control, Data Lineage และ Audit Logging โดยใช้งานได้บนหลาย Cloud (Multi-cloud)
Data Governance คืออะไร
กระบวนการจัดการข้อมูลให้มีคุณภาพ ปลอดภัย และเป็นไปตามกฎระเบียบ ครอบคลุม Access Control, Data Quality, Data Lineage, Data Classification และ Audit Trail
3-level Namespace คืออะไร
โครงสร้างชื่อแบบ Catalog.Schema.Table เช่น production.sales.orders โดย Catalog เป็นระดับสูงสุด, Schema ใช้จัดกลุ่มตาม Domain และ Table คือ Data Asset ที่เก็บข้อมูลจริง ทำให้จัดระเบียบข้อมูลและควบคุมสิทธิ์ได้ง่าย
วิธี Automate Data Governance ทำอย่างไร
ใช้ Databricks REST API หรือ SDK เขียน Scripts สำหรับสร้าง Catalogs และ Schemas, Grant Permissions, Monitor Quality และ Generate Reports แล้วรันผ่าน CI/CD Pipeline หรือจัดการแบบ Infrastructure as Code ด้วย Terraform หรือ Databricks Asset Bundles
สรุป
Databricks Unity Catalog ให้ Data Governance ที่ครบถ้วน ได้แก่ 3-level Namespace สำหรับจัดระเบียบข้อมูล, Fine-grained Access Control ตามหลัก Least Privilege, Column Masking และ Row-level Security สำหรับปกป้องข้อมูล PII, Automation ด้วย REST API, SDK หรือ Terraform, Audit Logging สำหรับตรวจสอบการเข้าถึง และ Data Lineage สำหรับติดตามที่มาที่ไปของข้อมูล
