Databricks Unity Catalog กับ Automation Script —
Databricks Unity Catalog

Unity Catalog เป็น Data Governance Solution ที่จัดการ Data Assets ทั้งหมดจากจุดเดียว ใช้ 3-level Namespace มี Fine-grained Access Control, Data Lineage และ Audit Logging
Automation Scripts ช่วยให้จัดการ Unity Catalog ได้อัตโนมัติ สร้าง Catalogs, Schemas, Grant Permissions, Monitor Quality ผ่าน API และ SDK
Unity Catalog Setup
# === Unity Catalog Setup ด้วย SQL ===
-- 1. สร้าง Catalog
CREATE CATALOG IF NOT EXISTS production
COMMENT 'Production data catalog';
CREATE CATALOG IF NOT EXISTS development
COMMENT 'Development data catalog';
CREATE CATALOG IF NOT EXISTS staging
COMMENT 'Staging data catalog';
-- 2. สร้าง Schema
CREATE SCHEMA IF NOT EXISTS production.sales
COMMENT 'Sales domain data';
CREATE SCHEMA IF NOT EXISTS production.marketing
COMMENT 'Marketing domain data';
CREATE SCHEMA IF NOT EXISTS production.finance
COMMENT 'Finance domain data';
-- 3. สร้าง Table
CREATE TABLE IF NOT EXISTS production.sales.orders (
order_id BIGINT NOT NULL,
customer_id BIGINT NOT NULL,
product_id BIGINT NOT NULL,
amount DECIMAL(10, 2) NOT NULL,
status STRING NOT NULL,
created_at TIMESTAMP NOT NULL,
updated_at TIMESTAMP
)
USING DELTA
COMMENT 'Customer orders'
TBLPROPERTIES (
'delta.autoOptimize.optimizeWrite' = 'true',
'delta.autoOptimize.autoCompact' = 'true',
'quality' = 'gold'
);
-- 4. Grant Permissions
-- Data Engineers: full access
GRANT USE CATALOG ON CATALOG production TO `data-engineers`;
GRANT USE SCHEMA ON SCHEMA production.sales TO `data-engineers`;
GRANT ALL PRIVILEGES ON SCHEMA production.sales TO `data-engineers`;
-- Data Analysts: read-only
GRANT USE CATALOG ON CATALOG production TO `data-analysts`;
GRANT USE SCHEMA ON SCHEMA production.sales TO `data-analysts`;
GRANT SELECT ON SCHEMA production.sales TO `data-analysts`;
-- Data Scientists: read + create models
GRANT USE CATALOG ON CATALOG production TO `data-scientists`;
GRANT USE SCHEMA ON SCHEMA production.sales TO `data-scientists`;
GRANT SELECT ON SCHEMA production.sales TO `data-scientists`;
-- 5. Tags and Classification
ALTER TABLE production.sales.orders
SET TAGS ('domain' = 'sales', 'pii' = 'false', 'tier' = 'gold');
ALTER TABLE production.sales.orders
ALTER COLUMN customer_id SET TAGS ('pii' = 'true');
-- 6. Row-level Security
CREATE FUNCTION production.sales.region_filter(region STRING)
RETURN IF(IS_ACCOUNT_GROUP_MEMBER('global-access'), true, region = current_user_region());
ALTER TABLE production.sales.orders
SET ROW FILTER production.sales.region_filter ON (region);
-- 7. Column Masking
CREATE FUNCTION production.sales.mask_email(email STRING)
RETURN IF(IS_ACCOUNT_GROUP_MEMBER('pii-access'), email, regexp_replace(email, '(.).*@', '$1***@'));
ALTER TABLE production.sales.customers
ALTER COLUMN email SET MASK production.sales.mask_email;
Automation Scripts
# unity_catalog_automation.py — Unity Catalog Automation
# pip install databricks-sdk
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from datetime import datetime
import json
@dataclass
class CatalogConfig:
name: str
comment: str
schemas: List[Dict] = field(default_factory=list)
owner: str = "data-platform-team"
@dataclass
class PermissionGrant:
principal: str # group or user
catalog: str
schema: Optional[str] = None
privileges: List[str] = field(default_factory=list)
class UnityCatalogAutomation:
"""Automation สำหรับ Unity Catalog"""
def __init__(self, host, token):
self.host = host
self.token = token
self.catalogs_created = 0
self.schemas_created = 0
self.grants_applied = 0
def _api_call(self, method, endpoint, data=None):
"""เรียก Databricks REST API"""
import requests
url = f"{self.host}/api/2.1/unity-catalog{endpoint}"
headers = {"Authorization": f"Bearer {self.token}"}
if method == "GET":
resp = requests.get(url, headers=headers)
elif method == "POST":
resp = requests.post(url, headers=headers, json=data)
elif method == "PATCH":
resp = requests.patch(url, headers=headers, json=data)
return resp.json() if resp.status_code < 400 else {"error": resp.text}
def create_catalog(self, config: CatalogConfig):
"""สร้าง Catalog"""
result = self._api_call("POST", "/catalogs", {
"name": config.name,
"comment": config.comment,
"properties": {"owner": config.owner, "created_by": "automation"},
})
if "error" not in result:
self.catalogs_created += 1
print(f" Created catalog: {config.name}")
# สร้าง Schemas
for schema in config.schemas:
self.create_schema(config.name, schema["name"], schema.get("comment", ""))
return result
def create_schema(self, catalog, name, comment=""):
"""สร้าง Schema"""
result = self._api_call("POST", "/schemas", {
"name": name,
"catalog_name": catalog,
"comment": comment,
})
if "error" not in result:
self.schemas_created += 1
print(f" Created schema: {catalog}.{name}")
return result
def grant_permissions(self, grant: PermissionGrant):
"""Grant Permissions"""
securable_type = "schema" if grant.schema else "catalog"
securable_name = f"{grant.catalog}.{grant.schema}" if grant.schema else grant.catalog
changes = [{
"principal": grant.principal,
"add": grant.privileges,
}]
result = self._api_call("PATCH",
f"/permissions/{securable_type}/{securable_name}",
{"changes": changes})
if "error" not in result:
self.grants_applied += 1
privs = ", ".join(grant.privileges)
print(f" Granted [{privs}] on {securable_name} to {grant.principal}")
return result
def setup_environment(self, env_config):
"""Setup ทั้ง Environment"""
print(f"\n{'='*55}")
print(f"Unity Catalog Setup — {datetime.now().strftime('%Y-%m-%d %H:%M')}")
print(f"{'='*55}")
# สร้าง Catalogs
for catalog_cfg in env_config.get("catalogs", []):
config = CatalogConfig(**catalog_cfg)
self.create_catalog(config)
# Apply Permissions
for grant_cfg in env_config.get("permissions", []):
grant = PermissionGrant(**grant_cfg)
self.grant_permissions(grant)
print(f"\n Summary:")
print(f" Catalogs: {self.catalogs_created}")
print(f" Schemas: {self.schemas_created}")
print(f" Grants: {self.grants_applied}")
# === Configuration File ===
env_config = {
"catalogs": [
{
"name": "production",
"comment": "Production data",
"schemas": [
{"name": "sales", "comment": "Sales domain"},
{"name": "marketing", "comment": "Marketing domain"},
{"name": "finance", "comment": "Finance domain"},
{"name": "ml_models", "comment": "ML models and features"},
],
},
{
"name": "development",
"comment": "Development sandbox",
"schemas": [
{"name": "sandbox", "comment": "Developer sandbox"},
{"name": "experiments", "comment": "ML experiments"},
],
},
],
"permissions": [
{"principal": "data-engineers", "catalog": "production",
"privileges": ["USE_CATALOG"]},
{"principal": "data-engineers", "catalog": "production",
"schema": "sales", "privileges": ["ALL_PRIVILEGES"]},
{"principal": "data-analysts", "catalog": "production",
"schema": "sales", "privileges": ["USE_SCHEMA", "SELECT"]},
],
}
# automation = UnityCatalogAutomation("https://my-workspace.cloud.databricks.com", "token")
# automation.setup_environment(env_config)
print(f"Config: {len(env_config['catalogs'])} catalogs, {len(env_config['permissions'])} grants")
Data Lineage และ Audit
# lineage_audit.py — Data Lineage และ Audit Monitoring
from dataclasses import dataclass
from typing import List, Dict
from datetime import datetime
@dataclass
class LineageEdge:
source_table: str
target_table: str
transformation: str
job_name: str
@dataclass
class AuditEvent:
timestamp: str
user: str
action: str
resource: str
details: str
class DataGovernanceMonitor:
"""Monitor Data Governance"""
def __init__(self):
self.lineage: List[LineageEdge] = []
self.audit_events: List[AuditEvent] = []
def add_lineage(self, source, target, transform, job):
self.lineage.append(LineageEdge(source, target, transform, job))
def add_audit(self, user, action, resource, details=""):
self.audit_events.append(AuditEvent(
datetime.now().isoformat(), user, action, resource, details))
def lineage_report(self):
"""แสดง Data Lineage"""
print(f"\nData Lineage ({len(self.lineage)} edges):")
for edge in self.lineage:
print(f" {edge.source_table}")
print(f" -> [{edge.transformation}] ({edge.job_name})")
print(f" -> {edge.target_table}")
def audit_report(self):
"""แสดง Audit Report"""
print(f"\nAudit Trail ({len(self.audit_events)} events):")
for event in self.audit_events[-10:]:
print(f" [{event.timestamp[:16]}] {event.user}: "
f"{event.action} on {event.resource}")
def compliance_check(self):
"""ตรวจสอบ Compliance"""
checks = [
("PII tables have row filters", True),
("All catalogs have owners", True),
("Audit logging enabled", True),
("Column masking on PII", True),
("No public access grants", True),
]
print(f"\nCompliance Check:")
passed = sum(1 for _, ok in checks if ok)
for check, ok in checks:
status = "PASS" if ok else "FAIL"
print(f" [{status}] {check}")
print(f"\n Score: {passed}/{len(checks)} ({passed/len(checks)*100:.0f}%)")
# ตัวอย่าง
monitor = DataGovernanceMonitor()
monitor.add_lineage("bronze.raw_orders", "silver.clean_orders", "Clean + Dedupe", "etl-orders")
monitor.add_lineage("silver.clean_orders", "gold.daily_revenue", "Aggregate", "agg-revenue")
monitor.add_lineage("silver.clean_orders", "gold.customer_segments", "ML Features", "ml-features")
monitor.add_audit("alice@company.com", "SELECT", "production.sales.orders")
monitor.add_audit("bob@company.com", "GRANT", "production.sales", "Granted SELECT to analysts")
monitor.add_audit("automation", "CREATE TABLE", "production.sales.daily_metrics")
monitor.lineage_report()
monitor.audit_report()
monitor.compliance_check()
Best Practices
- 3-level Namespace: ใช้ Catalog แยกตาม Environment (prod/dev/staging) Schema แยกตาม Domain
- Least Privilege: ให้สิทธิ์น้อยที่สุดที่จำเป็น ใช้ Groups แทน Users
- PII Protection: ใช้ Column Masking และ Row-level Security สำหรับข้อมูล PII
- Automation: ใช้ Terraform หรือ Databricks Asset Bundles จัดการ Infrastructure as Code
- Audit Logging: เปิด Audit Logging ตรวจสอบการเข้าถึงข้อมูลทุกครั้ง
- Data Classification: ใช้ Tags จัดประเภทข้อมูล (PII, Confidential, Public)
การนำไปใช้งานจริงในองค์กร

สำหรับองค์กรขนาดกลางถึงใหญ่ แนะนำให้ใช้หลัก Three-Tier Architecture คือ Core Layer ที่เป็นแกนกลางของระบบ Distribution Layer ที่ทำหน้าที่กระจาย Traffic และ Access Layer ที่เชื่อมต่อกับผู้ใช้โดยตรง การแบ่ง Layer ชัดเจนช่วยให้การ Troubleshoot ง่ายขึ้นและสามารถ Scale ระบบได้ตามความต้องการ
เนื้อหาเกี่ยวข้อง — อ่านต่อ: Text Generation WebUI Container Orchestration
เรื่อง Network Security ก็สำคัญไม่แพ้กัน ควรติดตั้ง Next-Generation Firewall ที่สามารถ Deep Packet Inspection ได้ ใช้ Network Segmentation แยก VLAN สำหรับแต่ละแผนก ติดตั้ง IDS/IPS เพื่อตรวจจับการโจมตี และทำ Regular Security Audit อย่างน้อยปีละ 2 ครั้ง
แนะนำเพิ่มเติม — อีบุ๊กการลงทุน SiamCafeBook
Unity Catalog คืออะไร
Data Governance Solution ของ Databricks จัดการ Data Assets จากจุดเดียว Tables Views Volumes Models Functions 3-level Namespace Fine-grained Access Control Lineage Audit Multi-cloud
เนื้อหาเกี่ยวข้อง — บทความที่เกี่ยวข้อง: LocalAI Self-hosted Automation Script
Data Governance คืออะไร
กระบวนการจัดการข้อมูลให้มีคุณภาพปลอดภัยตามกฎระเบียบ Access Control Data Quality Lineage Classification Audit Trail
3-level Namespace คืออะไร
Catalog.Schema.Table เช่น production.sales.orders Catalog ระดับสูงสุด Schema จัดกลุ่มตาม Domain Table เป็น Data Asset จัดระเบียบควบคุมสิทธิ์ง่าย
แนะนำเพิ่มเติม — บทวิเคราะห์จาก XM Signal
เนื้อหาเกี่ยวข้อง — อ่านต่อ: nist cybersecurity framework คือ
วิธี Automate Data Governance ทำอย่างไร
ใช้ Databricks REST API SDK สร้าง Scripts Create Catalogs Schemas Grant Permissions Monitor Quality Generate Reports CI/CD Pipeline Terraform Databricks Asset Bundles
สรุป
Databricks Unity Catalog ให้ Data Governance ที่ครบถ้วน 3-level Namespace จัดระเบียบ Fine-grained Access Control Least Privilege Column Masking Row-level Security PII Protection Automation ด้วย API SDK Terraform Audit Logging ตรวจสอบการเข้าถึง Data Lineage ติดตามข้อมูล
เนื้อหาเกี่ยวข้อง — อ่านต่อ: AWS Glue ETL API Gateway Pattern — คู่มือฉบับสมบูรณ์ 2026 สำหรับนักพัฒนา





