SiamCafe · Blog
Databricks Unity Catalog Edge Deployment —
บทความ

Databricks Unity Catalog Edge Deployment —

เผยแพร่ 28 พฤษภาคม 2569

Unity Catalog Edge

Databricks Unity Catalog Edge Deployment —

Databricks Unity Catalog Edge Deployment: Data Governance, Catalog, Schema, Access Control, Lineage, Audit, Model Registry, MLflow, Delta Lake, IoT, Factory, Retail

Component | Purpose | Scope | Key Feature
Metastore | Top-level Container | Account-level | Workspace Binding
Catalog | First Namespace | Environment | production, staging
Schema | Second Namespace | Domain | sales, analytics, ml
Table/View | Data Objects | Object-level | Delta Lake Format
Model | ML Models | Object-level | Version Alias

Unity Catalog Setup

=== Unity Catalog Configuration ===

SQL — Create Catalog and Schema

-- Create the `production` catalog and the three schemas used in this article
-- (ml_models, feature_store, edge_metrics). IF NOT EXISTS lets every
-- statement be re-run without failing when the object is already there.
CREATE CATALOG IF NOT EXISTS production;

CREATE SCHEMA IF NOT EXISTS production.ml_models;

CREATE SCHEMA IF NOT EXISTS production.feature_store;

CREATE SCHEMA IF NOT EXISTS production.edge_metrics;

-- Grant permissions
-- NOTE(review): only `data-engineers` receives USE CATALOG below. Unity Catalog
-- normally requires USE CATALOG on the parent catalog (and USE SCHEMA on the
-- parent schema) before schema-level privileges can be exercised -- confirm
-- that `ml-engineers` and `edge-devices` get those grants elsewhere.

GRANT USE CATALOG ON CATALOG production TO `data-engineers`;

GRANT USE SCHEMA ON SCHEMA production.ml_models TO `ml-engineers`;

GRANT SELECT ON SCHEMA production.feature_store TO `ml-engineers`;

GRANT CREATE TABLE ON SCHEMA production.edge_metrics TO `edge-devices`;

-- Row-level security
-- region_filter returns true for every row when the caller is a member of the
-- `global-admin` account group; otherwise it returns true only for rows whose
-- `region` equals current_user_region().
-- NOTE(review): current_user_region() is not defined anywhere in this snippet;
-- presumably it is a user-defined function -- confirm it exists before use.

CREATE FUNCTION production.ml_models.region_filter(region STRING)

RETURN IF(IS_ACCOUNT_GROUP_MEMBER('global-admin'), true, region = current_user_region());

-- Attach the filter to the predictions table, feeding it the `region` column.
ALTER TABLE production.ml_models.predictions

SET ROW FILTER production.ml_models.region_filter ON (region);

-- Column masking
-- mask_pii returns the original value when the caller is a member of the
-- `pii-access` account group and the literal '***MASKED***' otherwise.

CREATE FUNCTION production.ml_models.mask_pii(val STRING)

RETURN IF(IS_ACCOUNT_GROUP_MEMBER('pii-access'), val, '***MASKED***');

-- Apply the mask to the `email` column of the customers table. The function
-- lives in the ml_models schema while the table is in feature_store.
ALTER TABLE production.feature_store.customers

ALTER COLUMN email SET MASK production.ml_models.mask_pii;

Python — Unity Catalog with MLflow

import mlflow

from mlflow.models import infer_signature

# Use Unity Catalog as the MLflow model registry.
mlflow.set_registry_uri("databricks-uc")

# Register model in Unity Catalog
# NOTE: train_model, X_train and y_train are not defined in this snippet; they
# must be provided by the surrounding notebook or script.
with mlflow.start_run():
    model = train_model(X_train, y_train)
    # Infer the signature from the training inputs and the model's
    # predictions on those same inputs.
    signature = infer_signature(X_train, model.predict(X_train))
    # Passing registered_model_name logs the model and registers it under the
    # three-level name in a single call.
    mlflow.sklearn.log_model(
        model,
        artifact_path="model",
        registered_model_name="production.ml_models.edge_detector",
        signature=signature,
        input_example=X_train[:5],
    )

# Set model alias

from mlflow import MlflowClient

# Point the "champion" alias at version 3 of the registered edge detector.
client = MlflowClient()
client.set_registered_model_alias(
    name="production.ml_models.edge_detector", alias="champion", version=3
)

from dataclasses import dataclass

@dataclass
class CatalogObject:
    """One Unity Catalog object as shown in the demo inventory of this file."""

    full_name: str  # three-level name, e.g. "production.ml_models.edge_detector"
    object_type: str  # e.g. "Model", "Table", "Function"
    owner: str  # owning team or service, e.g. "ml-team"
    access: str  # grant summary as text, e.g. "ml-engineers: USE"
    lineage: str  # lineage summary as text, e.g. "training_data → model"

# Demo inventory: one entry per Unity Catalog object with its owner, the
# access granted on it and a short lineage description.
objects = [
    CatalogObject(
        "production.ml_models.edge_detector",
        "Model",
        "ml-team",
        "ml-engineers: USE",
        "training_data → model",
    ),
    CatalogObject(
        "production.feature_store.user_features",
        "Table",
        "data-team",
        "ml-engineers: SELECT",
        "raw_events → features",
    ),
    CatalogObject(
        "production.edge_metrics.predictions",
        "Table",
        "edge-service",
        "analysts: SELECT",
        "edge_device → metrics",
    ),
    CatalogObject(
        "production.ml_models.preprocessing",
        "Function",
        "ml-team",
        "ml-engineers: EXECUTE",
        "feature_store → transform",
    ),
]

print("=== Unity Catalog Objects ===")
for o in objects:
    # All three lines read the loop variable, so they form the loop body.
    print(f" [{o.object_type}] {o.full_name}")
    print(f" Owner: {o.owner} | Access: {o.access}")
    print(f" Lineage: {o.lineage}")

Edge Deployment Pipeline

=== Edge Model Deployment ===

Deployment Pipeline

1. Train model in Databricks

2. Register in Unity Catalog

3. Set alias "champion"

4. Export to ONNX/TensorRT

5. Push to Edge Registry (Harbor/ECR)

6. Deploy to Edge devices via GitOps

7. Monitor metrics → send back to Unity Catalog

Edge Deployment Script

import mlflow

import onnx

import docker

def deploy_to_edge(model_name, alias, edge_devices):
    """Export an aliased registry model to ONNX, containerize it and roll it out.

    Resolves ``alias`` of ``model_name`` to a concrete model version, loads
    the model through ``mlflow.pyfunc``, exports it with ``export_to_onnx``,
    builds and pushes a Docker image tagged with that version, then calls
    ``deploy_to_device`` and ``log_deployment`` once per device.

    ``export_to_onnx``, ``deploy_to_device`` and ``log_deployment`` are
    helpers that are not defined in this snippet.

    Args:
        model_name: Registered model name used in the ``models:/`` URI
            (presumably the three-level Unity Catalog name -- confirm).
        alias: Model alias to deploy, e.g. ``"champion"``.
        edge_devices: Iterable of devices; each item is passed unchanged to
            ``deploy_to_device`` and ``log_deployment``.
    """
    client = MlflowClient()
    model_version = client.get_model_version_by_alias(model_name, alias)
    model_uri = f"models:/{model_name}@{alias}"

    # Export to ONNX
    model = mlflow.pyfunc.load_model(model_uri)
    onnx_path = export_to_onnx(model, "model.onnx")

    # Build edge container
    docker_client = docker.from_env()
    image = docker_client.images.build(
        path="./edge-container",
        tag=f"edge-model:{model_version.version}",
        buildargs={"MODEL_PATH": onnx_path},
    )

    # Push to registry
    # NOTE(review): the image is built as "edge-model:<version>" but pushed as
    # "registry/edge-model:<version>". Confirm the image is also tagged with
    # the registry-qualified name somewhere, otherwise there is no local image
    # under the pushed name.
    docker_client.images.push(f"registry/edge-model:{model_version.version}")

    # Deploy to edge devices
    for device in edge_devices:
        deploy_to_device(device, f"edge-model:{model_version.version}")
        # Logged per device: the call reads the loop variable.
        log_deployment(model_name, model_version.version, device)

@dataclass
class EdgeDevice:
    """Status snapshot of one edge device in the demo fleet."""

    device_id: str  # e.g. "edge-001"
    location: str  # e.g. "Factory A - Line 1"
    model_version: str  # e.g. "v3.2"
    status: str  # values used in this file: "Running", "Updating", "Offline"
    latency_ms: float
    accuracy: float  # fraction; the fleet report prints it with the .1% format
    last_sync: str  # free text, e.g. "2 min ago" or "Deploying v3.2"

# Demo fleet. Devices that are not running carry 0 for latency and accuracy
# and use last_sync for a status note instead of a sync time.
devices = [
    EdgeDevice("edge-001", "Factory A - Line 1", "v3.2", "Running", 12.5, 0.965, "2 min ago"),
    EdgeDevice("edge-002", "Factory A - Line 2", "v3.2", "Running", 14.2, 0.958, "1 min ago"),
    EdgeDevice("edge-003", "Factory B - Line 1", "v3.1", "Updating", 0, 0, "Deploying v3.2"),
    EdgeDevice("edge-004", "Retail Store 1", "v3.2", "Running", 18.7, 0.942, "5 min ago"),
    EdgeDevice("edge-005", "Retail Store 2", "v3.2", "Offline", 0, 0, "Last seen 2h ago"),
]

print("\n=== Edge Device Fleet ===")
running = sum(1 for d in devices if d.status == "Running")
print(f" Active: {running}/{len(devices)}")

for d in devices:
    print(f" [{d.status}] {d.device_id} @ {d.location}")
    # Model metrics are only shown for devices that are running.
    if d.status == "Running":
        print(f" Model: {d.model_version} | Latency: {d.latency_ms}ms | Accuracy: {d.accuracy:.1%}")
    # NOTE(review): the original indentation was lost. This line is placed at
    # loop level because last_sync holds the status note for non-running
    # devices ("Deploying v3.2", "Last seen 2h ago") -- confirm.
    print(f" Last Sync: {d.last_sync}")

Monitoring

# === Edge Monitoring with Unity Catalog ===

# Edge metrics table in Unity Catalog
# CREATE TABLE production.edge_metrics.inference_logs (
#   device_id STRING,
#   timestamp TIMESTAMP,
#   model_version STRING,
#   prediction DOUBLE,
#   confidence DOUBLE,
#   latency_ms DOUBLE,
#   input_hash STRING
# ) USING DELTA
# PARTITIONED BY (date(timestamp))
# TBLPROPERTIES ('delta.autoOptimize.optimizeWrite' = 'true');

# Monitoring Dashboard Queries
# -- Model performance by device
# SELECT device_id, model_version,
#   COUNT(*) as predictions,
#   AVG(confidence) as avg_confidence,
#   AVG(latency_ms) as avg_latency,
#   PERCENTILE(latency_ms, 0.99) as p99_latency
# FROM production.edge_metrics.inference_logs
# WHERE timestamp > current_timestamp() - INTERVAL 1 HOUR
# GROUP BY device_id, model_version;

# -- Data drift detection
# SELECT date(timestamp) as day,
#   AVG(confidence) as avg_confidence,
#   STDDEV(confidence) as std_confidence
# FROM production.edge_metrics.inference_logs
# GROUP BY day
# ORDER BY day DESC LIMIT 30;

@dataclass
class MonitoringAlert:
    """A single monitoring rule shown in the edge alert summary."""

    alert: str  # display name, e.g. "Low Accuracy"
    condition: str  # trigger condition as text, e.g. "accuracy < 0.90"
    severity: str  # "Critical", "Warning" or "Info" in this file
    action: str  # remediation text, e.g. "Rollback model"
    current: str  # current reading as text, e.g. "0.965 OK"

# Alert rules for the summary below. Each row is
# (alert, condition, severity, action, current), matching the field order of
# MonitoringAlert.
alerts = [
    MonitoringAlert(*row)
    for row in (
        ("Low Accuracy", "accuracy < 0.90", "Critical", "Rollback model", "0.965 OK"),
        ("High Latency", "p99 > 50ms", "Warning", "Check device resources", "28ms OK"),
        ("Device Offline", "no sync > 1h", "Critical", "Check network/device", "1 offline"),
        ("Data Drift", "confidence stddev > 0.15", "Warning", "Retrain model", "0.08 OK"),
        ("Version Mismatch", "not latest champion", "Info", "Schedule update", "1 updating"),
    )
]

print("Edge Monitoring Alerts:")
for a in alerts:
    # One call per alert; sep="\n" yields the same three output lines.
    print(
        f"  [{a.severity}] {a.alert}",
        f"    Condition: {a.condition} | Current: {a.current}",
        f"    Action: {a.action}",
        sep="\n",
    )

# Governance capabilities listed at the end of the report: name -> summary.
governance = dict(
    [
        ("Data Lineage", "Auto-tracked training data → model → edge predictions"),
        ("Access Control", "GRANT/REVOKE per Catalog Schema Table"),
        ("Audit Log", "Every query access deployment logged"),
        ("Row Security", "Region-based row filtering"),
        ("Column Masking", "PII columns masked for non-authorized"),
        ("Delta Sharing", "Share data securely across organizations"),
    ]
)

# Two leading newlines separate this section from the alert output above.
print("\n\nGovernance Features:")
for feature, summary in governance.items():
    print(f"  [{feature}]: {summary}")

เคล็ดลับ

  • Namespace: ใช้ Catalog.Schema.Object จัดระเบียบ Data
  • Alias: ใช้ Model Alias champion/challenger แทน Version Number
  • Lineage: เปิด Lineage ติดตามที่มาข้อมูลอัตโนมัติ
  • Monitor: ส่ง Edge Metrics กลับ Unity Catalog ทุก Inference
  • Rollback: เตรียม Rollback Plan เสมอเมื่อ Deploy Model ใหม่

Unity Catalog คืออะไร

Unity Catalog คือระบบ Unified Governance ของ Databricks สำหรับ Data Assets ทั้ง Tables, Views, Functions และ Models จัดระเบียบด้วย Catalog.Schema.Object ควบคุมสิทธิ์ด้วย GRANT/REVOKE พร้อม Lineage, Audit, Delta Lake และ Delta Sharing