Unity Catalog Edge
Databricks Unity Catalog Edge Deployment Data Governance Catalog Schema Access Control Lineage Audit Model Registry MLflow Delta Lake IoT Factory Retail
| Component | Purpose | Scope | Key Feature |
|---|---|---|---|
| Metastore | Top-level Container | Account-level | Workspace Binding |
| Catalog | First Namespace | Environment | production staging |
| Schema | Second Namespace | Domain | sales analytics ml |
| Table/View | Data Objects | Object-level | Delta Lake Format |
| Model | ML Models | Object-level | Version Alias |
Unity Catalog Setup
# === Unity Catalog Configuration ===
# SQL — Create Catalog and Schema
# CREATE CATALOG IF NOT EXISTS production;
# CREATE SCHEMA IF NOT EXISTS production.ml_models;
# CREATE SCHEMA IF NOT EXISTS production.feature_store;
# CREATE SCHEMA IF NOT EXISTS production.edge_metrics;
#
# -- Grant permissions
# GRANT USE CATALOG ON CATALOG production TO `data-engineers`;
# GRANT USE SCHEMA ON SCHEMA production.ml_models TO `ml-engineers`;
# GRANT SELECT ON SCHEMA production.feature_store TO `ml-engineers`;
# GRANT CREATE TABLE ON SCHEMA production.edge_metrics TO `edge-devices`;
#
# -- Row-level security
# CREATE FUNCTION production.ml_models.region_filter(region STRING)
# RETURN IF(IS_ACCOUNT_GROUP_MEMBER('global-admin'), true, region = current_user_region());
#
# ALTER TABLE production.ml_models.predictions
# SET ROW FILTER production.ml_models.region_filter ON (region);
#
# -- Column masking
# CREATE FUNCTION production.ml_models.mask_pii(val STRING)
# RETURN IF(IS_ACCOUNT_GROUP_MEMBER('pii-access'), val, '***MASKED***');
#
# ALTER TABLE production.feature_store.customers
# ALTER COLUMN email SET MASK production.ml_models.mask_pii;
# Python — Unity Catalog with MLflow
# import mlflow
# from mlflow.models import infer_signature
#
# mlflow.set_registry_uri("databricks-uc")
#
# # Register model in Unity Catalog
# with mlflow.start_run():
# model = train_model(X_train, y_train)
# signature = infer_signature(X_train, model.predict(X_train))
#
# mlflow.sklearn.log_model(
# model,
# artifact_path="model",
# registered_model_name="production.ml_models.edge_detector",
# signature=signature,
# input_example=X_train[:5],
# )
#
# # Set model alias
# from mlflow import MlflowClient
# client = MlflowClient()
# client.set_registered_model_alias(
# name="production.ml_models.edge_detector",
# alias="champion",
# version=3
# )
from dataclasses import dataclass
@dataclass
class CatalogObject:
    """One Unity Catalog object shown in the demo inventory listing.

    Every field is a free-form display string; the class performs no
    validation and is only used to format the printed summary.
    """

    full_name: str    # three-level name, e.g. "production.ml_models.edge_detector"
    object_type: str  # label printed in brackets, e.g. "Model", "Table", "Function"
    owner: str        # owning team or service, e.g. "ml-team"
    access: str       # "<principal>: <privilege>" summary, e.g. "ml-engineers: USE"
    lineage: str      # "upstream → downstream" summary
# Sample inventory: one entry per kind of object the article registers
# (a model, two tables and a function).
objects = [
    CatalogObject("production.ml_models.edge_detector", "Model", "ml-team", "ml-engineers: USE", "training_data → model"),
    CatalogObject("production.feature_store.user_features", "Table", "data-team", "ml-engineers: SELECT", "raw_events → features"),
    CatalogObject("production.edge_metrics.predictions", "Table", "edge-service", "analysts: SELECT", "edge_device → metrics"),
    CatalogObject("production.ml_models.preprocessing", "Function", "ml-team", "ml-engineers: EXECUTE", "feature_store → transform"),
]

# Print a three-line summary (name, ownership/access, lineage) per object.
print("=== Unity Catalog Objects ===")
for o in objects:
    print(f" [{o.object_type}] {o.full_name}")
    print(f" Owner: {o.owner} | Access: {o.access}")
    print(f" Lineage: {o.lineage}")
Edge Deployment Pipeline
# === Edge Model Deployment ===
# Deployment Pipeline
# 1. Train model in Databricks
# 2. Register in Unity Catalog
# 3. Set alias "champion"
# 4. Export to ONNX/TensorRT
# 5. Push to Edge Registry (Harbor/ECR)
# 6. Deploy to Edge devices via GitOps
# 7. Monitor metrics → send back to Unity Catalog
# Edge Deployment Script
# import mlflow
# import onnx
# import docker
#
# def deploy_to_edge(model_name, alias, edge_devices):
# client = mlflow.MlflowClient()
# model_version = client.get_model_version_by_alias(model_name, alias)
# model_uri = f"models:/{model_name}@{alias}"
#
# # Export to ONNX
# model = mlflow.pyfunc.load_model(model_uri)
# onnx_path = export_to_onnx(model, "model.onnx")
#
# # Build edge container
# docker_client = docker.from_env()
# image = docker_client.images.build(
# path="./edge-container",
# tag=f"registry/edge-model:{model_version.version}",
# buildargs={"MODEL_PATH": onnx_path}
# )
#
# # Push to registry
# docker_client.images.push(f"registry/edge-model:{model_version.version}")
#
# # Deploy to edge devices
# for device in edge_devices:
# deploy_to_device(device, f"edge-model:{model_version.version}")
# log_deployment(model_name, model_version.version, device)
@dataclass
class EdgeDevice:
    """Snapshot of one edge device as shown in the fleet report.

    The class is a plain record: it stores whatever it is given and does
    no validation or unit conversion.
    """

    device_id: str      # e.g. "edge-001"
    location: str       # human-readable site, e.g. "Factory A - Line 1"
    model_version: str  # deployed model version label, e.g. "v3.2"
    status: str         # the demo data uses "Running", "Updating" and "Offline"
    latency_ms: float   # inference latency; the demo data stores 0 for non-running devices
    accuracy: float     # fraction (rendered with a percent format by the report); 0 when not running
    last_sync: str      # free-form text, e.g. "2 min ago" or "Deploying v3.2"
# Sample fleet: three running devices, one mid-update and one offline.
devices = [
    EdgeDevice("edge-001", "Factory A - Line 1", "v3.2", "Running", 12.5, 0.965, "2 min ago"),
    EdgeDevice("edge-002", "Factory A - Line 2", "v3.2", "Running", 14.2, 0.958, "1 min ago"),
    EdgeDevice("edge-003", "Factory B - Line 1", "v3.1", "Updating", 0, 0, "Deploying v3.2"),
    EdgeDevice("edge-004", "Retail Store 1", "v3.2", "Running", 18.7, 0.942, "5 min ago"),
    EdgeDevice("edge-005", "Retail Store 2", "v3.2", "Offline", 0, 0, "Last seen 2h ago"),
]

print("\n=== Edge Device Fleet ===")
running = sum(1 for d in devices if d.status == "Running")
print(f" Active: {running}/{len(devices)}")
for d in devices:
    print(f" [{d.status}] {d.device_id} @ {d.location}")
    # Latency/accuracy are placeholders (0) unless the device is running,
    # so only show them for running devices.
    if d.status == "Running":
        print(f" Model: {d.model_version} | Latency: {d.latency_ms}ms | Accuracy: {d.accuracy:.1%}")
    # last_sync carries a status note for non-running devices too, so it is
    # printed for every device.
    print(f" Last Sync: {d.last_sync}")
Monitoring
# === Edge Monitoring with Unity Catalog ===
# Edge metrics table in Unity Catalog
# CREATE TABLE production.edge_metrics.inference_logs (
# device_id STRING,
# timestamp TIMESTAMP,
# model_version STRING,
# prediction DOUBLE,
# confidence DOUBLE,
# latency_ms DOUBLE,
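# input_hash STRING,
# -- Delta partitions on columns, not expressions: derive the date as a generated column
# event_date DATE GENERATED ALWAYS AS (CAST(timestamp AS DATE))
# ) USING DELTA
# PARTITIONED BY (event_date)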
# TBLPROPERTIES ('delta.autoOptimize.optimizeWrite' = 'true');
# Monitoring Dashboard Queries
# -- Model performance by device
# SELECT device_id, model_version,
# COUNT(*) as predictions,
# AVG(confidence) as avg_confidence,
# AVG(latency_ms) as avg_latency,
# PERCENTILE(latency_ms, 0.99) as p99_latency
# FROM production.edge_metrics.inference_logs
# WHERE timestamp > current_timestamp() - INTERVAL 1 HOUR
# GROUP BY device_id, model_version;
# -- Data drift detection
# SELECT date(timestamp) as day,
# AVG(confidence) as avg_confidence,
# STDDEV(confidence) as std_confidence
# FROM production.edge_metrics.inference_logs
# GROUP BY day
# ORDER BY day DESC LIMIT 30;
@dataclass
class MonitoringAlert:
    """One row of the edge-monitoring alert table printed by the demo.

    All fields are display strings; the condition is descriptive text and
    is not evaluated by this code.
    """

    alert: str      # alert name, e.g. "Low Accuracy"
    condition: str  # trigger rule as text, e.g. "accuracy < 0.90"
    severity: str   # the demo data uses "Critical", "Warning" and "Info"
    action: str     # recommended response, e.g. "Rollback model"
    current: str    # current reading as text, e.g. "0.965 OK"
# Sample alert rules with their current (illustrative) readings.
alerts = [
    MonitoringAlert("Low Accuracy", "accuracy < 0.90", "Critical", "Rollback model", "0.965 OK"),
    MonitoringAlert("High Latency", "p99 > 50ms", "Warning", "Check device resources", "28ms OK"),
    MonitoringAlert("Device Offline", "no sync > 1h", "Critical", "Check network/device", "1 offline"),
    MonitoringAlert("Data Drift", "confidence stddev > 0.15", "Warning", "Retrain model", "0.08 OK"),
    MonitoringAlert("Version Mismatch", "not latest champion", "Info", "Schedule update", "1 updating"),
]

# Print a three-line summary (severity/name, rule/reading, action) per alert.
print("Edge Monitoring Alerts:")
for a in alerts:
    print(f" [{a.severity}] {a.alert}")
    print(f" Condition: {a.condition} | Current: {a.current}")
    print(f" Action: {a.action}")
# Governance feature name -> one-line description, printed in insertion order.
governance = {
    "Data Lineage": "Auto-tracked training data → model → edge predictions",
    "Access Control": "GRANT/REVOKE per Catalog Schema Table",
    "Audit Log": "Every query access deployment logged",
    "Row Security": "Region-based row filtering",
    "Column Masking": "PII columns masked for non-authorized",
    "Delta Sharing": "Share data securely across organizations",
}

# Plain string: there are no placeholders, so no f-string is needed.
print("\n\nGovernance Features:")
for k, v in governance.items():
    print(f" [{k}]: {v}")
เคล็ดลับ
- Namespace: ใช้ Catalog.Schema.Object จัดระเบียบ Data
- Alias: ใช้ Model Alias champion/challenger แทน Version Number
- Lineage: เปิด Lineage ติดตามที่มาข้อมูลอัตโนมัติ
- Monitor: ส่ง Edge Metrics กลับ Unity Catalog ทุก Inference
- Rollback: เตรียม Rollback Plan เสมอเมื่อ Deploy Model ใหม่
Unity Catalog คืออะไร
Unified Governance Databricks Data Assets Tables Views Functions Models Catalog Schema Object GRANT REVOKE Lineage Audit Delta Lake Sharing
Edge Deployment คืออะไร
Deploy Model Edge Device IoT Gateway Factory Retail Latency Offline Bandwidth Unity Catalog Model Version Lineage Audit Deployment
ตั้งค่า Unity Catalog อย่างไร
Metastore Workspace Catalog Schema Table GRANT Permission Row Security Column Masking Lineage Audit Log SIEM production staging
จัดการ Model สำหรับ Edge อย่างไร
Register Unity Catalog MLflow Version Alias champion challenger ONNX TensorRT Pipeline Push Device Monitor Metrics Baseline Rollback
สรุป
Databricks Unity Catalog Edge Deployment Data Governance Catalog Schema Access Control Lineage Audit Model Registry MLflow ONNX Edge Device Monitoring Production
