
Databricks Unity Catalog Edge Deployment —
Unity Catalog Edge

Databricks Unity Catalog edge deployment: data governance (catalog, schema, access control, lineage, audit), a model registry with MLflow, and Delta Lake, applied to IoT, factory, and retail use cases.
| Component | Purpose | Scope | Key Feature |
|---|---|---|---|
| Metastore | Top-level Container | Account-level | Workspace Binding |
| Catalog | First Namespace | Environment | production staging |
| Schema | Second Namespace | Domain | sales analytics ml |
| Table/View | Data Objects | Object-level | Delta Lake Format |
| Model | ML Models | Object-level | Version Alias |
Unity Catalog Setup
=== Unity Catalog Configuration ===
SQL — Create Catalog and Schema
-- Create the production catalog and the three schemas referenced below.
-- IF NOT EXISTS keeps the statements safe to re-run.
CREATE CATALOG IF NOT EXISTS production;
CREATE SCHEMA IF NOT EXISTS production.ml_models;
CREATE SCHEMA IF NOT EXISTS production.feature_store;
CREATE SCHEMA IF NOT EXISTS production.edge_metrics;
-- Grant permissions
-- NOTE(review): only `data-engineers` receives USE CATALOG here. The other
-- groups presumably also need USE CATALOG on production (and USE SCHEMA on
-- each schema they touch) before the grants below are usable; confirm those
-- privileges are granted elsewhere.
GRANT USE CATALOG ON CATALOG production TO `data-engineers`;
GRANT USE SCHEMA ON SCHEMA production.ml_models TO `ml-engineers`;
GRANT SELECT ON SCHEMA production.feature_store TO `ml-engineers`;
GRANT CREATE TABLE ON SCHEMA production.edge_metrics TO `edge-devices`;
-- Row-level security
-- The filter returns true for members of the 'global-admin' account group;
-- for everyone else it compares the row's region to current_user_region().
-- NOTE(review): current_user_region() is not defined anywhere in this file;
-- presumably a user-defined function that must exist first. Confirm.
CREATE FUNCTION production.ml_models.region_filter(region STRING)
RETURN IF(IS_ACCOUNT_GROUP_MEMBER('global-admin'), true, region = current_user_region());
-- Attach the filter to the predictions table, feeding it the region column.
ALTER TABLE production.ml_models.predictions
SET ROW FILTER production.ml_models.region_filter ON (region);
-- Column masking
-- The mask returns the value unchanged for members of the 'pii-access'
-- account group and the literal '***MASKED***' for everyone else.
CREATE FUNCTION production.ml_models.mask_pii(val STRING)
RETURN IF(IS_ACCOUNT_GROUP_MEMBER('pii-access'), val, '***MASKED***');
-- The mask function lives in ml_models but is applied to a feature_store table.
ALTER TABLE production.feature_store.customers
ALTER COLUMN email SET MASK production.ml_models.mask_pii;
Python — Unity Catalog with MLflow
import mlflow
from mlflow.models import infer_signature
# Use the "databricks-uc" registry so the model is registered under a
# three-level catalog.schema.model name.
mlflow.set_registry_uri("databricks-uc")

# Register model in Unity Catalog
with mlflow.start_run():
    # NOTE(review): train_model, X_train and y_train are not defined in this
    # snippet; they are assumed to come from the surrounding notebook.
    model = train_model(X_train, y_train)
    # Derive the model signature from the training inputs and the model's
    # own predictions on them.
    signature = infer_signature(X_train, model.predict(X_train))
    mlflow.sklearn.log_model(
        model,
        artifact_path="model",
        registered_model_name="production.ml_models.edge_detector",
        signature=signature,
        input_example=X_train[:5],
    )

# Set model alias
from mlflow import MlflowClient

client = MlflowClient()
# NOTE(review): version 3 is hard-coded; presumably it should be the version
# created by the log_model call above. Confirm before reusing this snippet.
client.set_registered_model_alias(
    name="production.ml_models.edge_detector",
    alias="champion",
    version=3,
)
from dataclasses import dataclass
@dataclass
class CatalogObject:
    """One catalog object listed in the governance summary report.

    Every field is a plain display string; nothing is validated.
    """

    full_name: str    # three-level name, e.g. "production.ml_models.edge_detector"
    object_type: str  # e.g. "Model", "Table", "Function"
    owner: str        # owning team or service, e.g. "ml-team"
    access: str       # principal and privilege, e.g. "ml-engineers: SELECT"
    lineage: str      # upstream → downstream description
# Sample inventory of catalog objects used for the printed summary below.
objects = [
    CatalogObject("production.ml_models.edge_detector", "Model", "ml-team", "ml-engineers: USE", "training_data → model"),
    CatalogObject("production.feature_store.user_features", "Table", "data-team", "ml-engineers: SELECT", "raw_events → features"),
    CatalogObject("production.edge_metrics.predictions", "Table", "edge-service", "analysts: SELECT", "edge_device → metrics"),
    CatalogObject("production.ml_models.preprocessing", "Function", "ml-team", "ml-engineers: EXECUTE", "feature_store → transform"),
]

# Print three lines per object: type and name, owner and access, lineage.
print("=== Unity Catalog Objects ===")
for o in objects:
    print(f" [{o.object_type}] {o.full_name}")
    print(f" Owner: {o.owner} | Access: {o.access}")
    print(f" Lineage: {o.lineage}")
Edge Deployment Pipeline
=== Edge Model Deployment ===
Deployment Pipeline
1. Train model in Databricks
2. Register in Unity Catalog
3. Set alias "champion"
4. Export to ONNX/TensorRT
5. Push to Edge Registry (Harbor/ECR)
6. Deploy to Edge devices via GitOps
7. Monitor metrics → send back to Unity Catalog
Edge Deployment Script
import mlflow
import onnx
import docker
def deploy_to_edge(model_name, alias, edge_devices):
    """Export an aliased registry model to ONNX, containerize it, and roll it out.

    Args:
        model_name: Registered model name passed to the MLflow client.
        alias: Model alias to resolve; also used in the ``models:/name@alias``
            URI the model is loaded from.
        edge_devices: Iterable of device handles. Each one is passed unchanged
            to ``deploy_to_device`` and ``log_deployment``.

    NOTE(review): ``export_to_onnx``, ``deploy_to_device`` and
    ``log_deployment`` are not defined in this snippet; their contracts are
    assumed from how they are called here. Confirm against their definitions.
    """
    client = mlflow.MlflowClient()
    model_version = client.get_model_version_by_alias(model_name, alias)
    version = model_version.version
    model_uri = f"models:/{model_name}@{alias}"

    # Export to ONNX
    model = mlflow.pyfunc.load_model(model_uri)
    onnx_path = export_to_onnx(model, "model.onnx")

    # Build edge container
    docker_client = docker.from_env()
    # images.build() returns an (image, build_logs) pair, so unpack it rather
    # than binding the whole tuple to one name.
    image, _build_logs = docker_client.images.build(
        path="./edge-container",
        tag=f"edge-model:{version}",
        buildargs={"MODEL_PATH": onnx_path},
    )

    # Push to registry
    # The build only created the local "edge-model:<version>" tag. Add the
    # registry-qualified tag first so the push refers to an image that exists.
    image.tag("registry/edge-model", tag=str(version))
    docker_client.images.push("registry/edge-model", tag=str(version))

    # Deploy to edge devices
    for device in edge_devices:
        deploy_to_device(device, f"edge-model:{version}")
        log_deployment(model_name, version, device)
@dataclass
class EdgeDevice:
    """Status snapshot of one edge device in the fleet report.

    Nothing is validated; the sample data uses 0 for latency and accuracy on
    devices that are not running.
    """

    device_id: str      # e.g. "edge-001"
    location: str       # e.g. "Factory A - Line 1"
    model_version: str  # e.g. "v3.2"
    status: str         # sample values: "Running", "Updating", "Offline"
    latency_ms: float
    accuracy: float     # fraction, printed as a percentage by the report
    last_sync: str      # free-text, e.g. "2 min ago"
# Sample fleet used for the printed status report below.
devices = [
    EdgeDevice("edge-001", "Factory A - Line 1", "v3.2", "Running", 12.5, 0.965, "2 min ago"),
    EdgeDevice("edge-002", "Factory A - Line 2", "v3.2", "Running", 14.2, 0.958, "1 min ago"),
    EdgeDevice("edge-003", "Factory B - Line 1", "v3.1", "Updating", 0, 0, "Deploying v3.2"),
    EdgeDevice("edge-004", "Retail Store 1", "v3.2", "Running", 18.7, 0.942, "5 min ago"),
    EdgeDevice("edge-005", "Retail Store 2", "v3.2", "Offline", 0, 0, "Last seen 2h ago"),
]

print("\n=== Edge Device Fleet ===")
running = sum(1 for d in devices if d.status == "Running")
print(f" Active: {running}/{len(devices)}")
for d in devices:
    print(f" [{d.status}] {d.device_id} @ {d.location}")
    # Latency and accuracy are only meaningful while the device is serving.
    if d.status == "Running":
        print(f" Model: {d.model_version} | Latency: {d.latency_ms}ms | Accuracy: {d.accuracy:.1%}")
    # NOTE(review): the source lost its indentation. Last Sync is printed for
    # every device because the non-running samples carry informative text
    # here ("Deploying v3.2", "Last seen 2h ago"); confirm this placement.
    print(f" Last Sync: {d.last_sync}")
Monitoring
# === Edge Monitoring with Unity Catalog ===
# Edge metrics table in Unity Catalog
# CREATE TABLE production.edge_metrics.inference_logs (
# device_id STRING,
# timestamp TIMESTAMP,
# model_version STRING,
# prediction DOUBLE,
# confidence DOUBLE,
# latency_ms DOUBLE,
# input_hash STRING,
# event_date DATE GENERATED ALWAYS AS (CAST(timestamp AS DATE))
# ) USING DELTA
# PARTITIONED BY (event_date)
# TBLPROPERTIES ('delta.autoOptimize.optimizeWrite' = 'true');
# Monitoring Dashboard Queries
# -- Model performance by device
# SELECT device_id, model_version,
# COUNT(*) as predictions,
# AVG(confidence) as avg_confidence,
# AVG(latency_ms) as avg_latency,
# PERCENTILE(latency_ms, 0.99) as p99_latency
# FROM production.edge_metrics.inference_logs
# WHERE timestamp > current_timestamp() - INTERVAL 1 HOUR
# GROUP BY device_id, model_version;
# -- Data drift detection
# SELECT date(timestamp) as day,
# AVG(confidence) as avg_confidence,
# STDDEV(confidence) as std_confidence
# FROM production.edge_metrics.inference_logs
# GROUP BY day
# ORDER BY day DESC LIMIT 30;
@dataclass
class MonitoringAlert:
    """One row of the edge monitoring alert summary.

    Every field is a plain display string; conditions are descriptive text
    and are not evaluated.
    """

    alert: str      # alert name, e.g. "Low Accuracy"
    condition: str  # trigger description, e.g. "accuracy < 0.90"
    severity: str   # sample values: "Critical", "Warning", "Info"
    action: str     # suggested response, e.g. "Rollback model"
    current: str    # current reading, e.g. "0.965 OK"
# Sample alert definitions used for the printed summary below.
alerts = [
    MonitoringAlert("Low Accuracy", "accuracy < 0.90", "Critical", "Rollback model", "0.965 OK"),
    MonitoringAlert("High Latency", "p99 > 50ms", "Warning", "Check device resources", "28ms OK"),
    MonitoringAlert("Device Offline", "no sync > 1h", "Critical", "Check network/device", "1 offline"),
    MonitoringAlert("Data Drift", "confidence stddev > 0.15", "Warning", "Retrain model", "0.08 OK"),
    MonitoringAlert("Version Mismatch", "not latest champion", "Info", "Schedule update", "1 updating"),
]

# Print three lines per alert: severity and name, condition and reading, action.
print("Edge Monitoring Alerts:")
for a in alerts:
    print(f" [{a.severity}] {a.alert}")
    print(f" Condition: {a.condition} | Current: {a.current}")
    print(f" Action: {a.action}")
# Governance feature name -> one-line description, printed in insertion order.
governance = {
    "Data Lineage": "Auto-tracked training data → model → edge predictions",
    "Access Control": "GRANT/REVOKE per Catalog Schema Table",
    "Audit Log": "Every query access deployment logged",
    "Row Security": "Region-based row filtering",
    "Column Masking": "PII columns masked for non-authorized",
    "Delta Sharing": "Share data securely across organizations",
}

# Plain string: the heading has no placeholders, so no f-prefix is needed.
print("\n\nGovernance Features:")
for k, v in governance.items():
    print(f" [{k}]: {v}")
เคล็ดลับ
- Namespace: ใช้ Catalog.Schema.Object จัดระเบียบ Data
- Alias: ใช้ Model Alias champion/challenger แทน Version Number
- Lineage: เปิด Lineage ติดตามที่มาข้อมูลอัตโนมัติ
- Monitor: ส่ง Edge Metrics กลับ Unity Catalog ทุก Inference
- Rollback: เตรียม Rollback Plan เสมอเมื่อ Deploy Model ใหม่
Unity Catalog คืออะไร
Unified governance for Databricks data assets (tables, views, functions, and models), organized as Catalog.Schema.Object, with GRANT/REVOKE access control, lineage, audit logging, Delta Lake, and Delta Sharing.