Cybersecurity

Databricks Unity Catalog Edge Deployment

databricks unity catalog edge deployment
Databricks Unity Catalog Edge Deployment | SiamCafe Blog
2026-02-21 · อ. บอม — SiamCafe.net · 9,420 คำ

Unity Catalog Edge

Databricks Unity Catalog Edge Deployment Data Governance Catalog Schema Access Control Lineage Audit Model Registry MLflow Delta Lake IoT Factory Retail

| Component | Purpose | Scope | Key Feature |
| --- | --- | --- | --- |
| Metastore | Top-level Container | Account-level | Workspace Binding |
| Catalog | First Namespace | Environment | production, staging |
| Schema | Second Namespace | Domain | sales, analytics, ml |
| Table/View | Data Objects | Object-level | Delta Lake Format |
| Model | ML Models | Object-level | Version, Alias |

Unity Catalog Setup

# === Unity Catalog Configuration ===

# SQL — Create Catalog and Schema
# CREATE CATALOG IF NOT EXISTS production;
# CREATE SCHEMA IF NOT EXISTS production.ml_models;
# CREATE SCHEMA IF NOT EXISTS production.feature_store;
# CREATE SCHEMA IF NOT EXISTS production.edge_metrics;
#
# -- Grant permissions
# GRANT USE CATALOG ON CATALOG production TO `data-engineers`;
# GRANT USE SCHEMA ON SCHEMA production.ml_models TO `ml-engineers`;
# GRANT SELECT ON SCHEMA production.feature_store TO `ml-engineers`;
# GRANT CREATE TABLE ON SCHEMA production.edge_metrics TO `edge-devices`;
#
# -- Row-level security
# CREATE FUNCTION production.ml_models.region_filter(region STRING)
# RETURN IF(IS_ACCOUNT_GROUP_MEMBER('global-admin'), true, region = current_user_region());
#
# ALTER TABLE production.ml_models.predictions
# SET ROW FILTER production.ml_models.region_filter ON (region);
#
# -- Column masking
# CREATE FUNCTION production.ml_models.mask_pii(val STRING)
# RETURN IF(IS_ACCOUNT_GROUP_MEMBER('pii-access'), val, '***MASKED***');
#
# ALTER TABLE production.feature_store.customers
# ALTER COLUMN email SET MASK production.ml_models.mask_pii;

# Python — Unity Catalog with MLflow
# import mlflow
# from mlflow.models import infer_signature
#
# mlflow.set_registry_uri("databricks-uc")
#
# # Register model in Unity Catalog
# with mlflow.start_run():
#     model = train_model(X_train, y_train)
#     signature = infer_signature(X_train, model.predict(X_train))
#
#     mlflow.sklearn.log_model(
#         model,
#         artifact_path="model",
#         registered_model_name="production.ml_models.edge_detector",
#         signature=signature,
#         input_example=X_train[:5],
#     )
#
# # Set model alias
# from mlflow import MlflowClient
# client = MlflowClient()
# client.set_registered_model_alias(
#     name="production.ml_models.edge_detector",
#     alias="champion",
#     version=3
# )

from dataclasses import dataclass

@dataclass
class CatalogObject:
    """One governed asset registered in Unity Catalog."""

    full_name: str    # three-level name: catalog.schema.object
    object_type: str  # Model / Table / Function
    owner: str
    access: str       # "<principal>: <privilege>" summary
    lineage: str      # upstream → downstream summary


# Demo inventory: the governed objects referenced throughout this article.
_CATALOG_ROWS = (
    ("production.ml_models.edge_detector", "Model", "ml-team", "ml-engineers: USE", "training_data → model"),
    ("production.feature_store.user_features", "Table", "data-team", "ml-engineers: SELECT", "raw_events → features"),
    ("production.edge_metrics.predictions", "Table", "edge-service", "analysts: SELECT", "edge_device → metrics"),
    ("production.ml_models.preprocessing", "Function", "ml-team", "ml-engineers: EXECUTE", "feature_store → transform"),
)
objects = [CatalogObject(*row) for row in _CATALOG_ROWS]

print("=== Unity Catalog Objects ===")
for o in objects:
    print(f"  [{o.object_type}] {o.full_name}")
    print(f"    Owner: {o.owner} | Access: {o.access}")
    print(f"    Lineage: {o.lineage}")

Edge Deployment Pipeline

# === Edge Model Deployment ===

# Deployment Pipeline
# 1. Train model in Databricks
# 2. Register in Unity Catalog
# 3. Set alias "champion"
# 4. Export to ONNX/TensorRT
# 5. Push to Edge Registry (Harbor/ECR)
# 6. Deploy to Edge devices via GitOps
# 7. Monitor metrics → send back to Unity Catalog

# Edge Deployment Script
# import mlflow
# import onnx
# import docker
#
# def deploy_to_edge(model_name, alias, edge_devices):
#     client = MlflowClient()
#     model_version = client.get_model_version_by_alias(model_name, alias)
#     model_uri = f"models:/{model_name}@{alias}"
#
#     # Export to ONNX
#     model = mlflow.pyfunc.load_model(model_uri)
#     onnx_path = export_to_onnx(model, "model.onnx")
#
#     # Build edge container
#     docker_client = docker.from_env()
#     image = docker_client.images.build(
#         path="./edge-container",
#         tag=f"edge-model:{model_version.version}",
#         buildargs={"MODEL_PATH": onnx_path}
#     )
#
#     # Push to registry
#     docker_client.images.push(f"registry/edge-model:{model_version.version}")
#
#     # Deploy to edge devices
#     for device in edge_devices:
#         deploy_to_device(device, f"edge-model:{model_version.version}")
#         log_deployment(model_name, model_version.version, device)

@dataclass
class EdgeDevice:
    """Snapshot of one edge node running the deployed model."""

    device_id: str
    location: str
    model_version: str
    status: str        # Running / Updating / Offline
    latency_ms: float  # 0 when the device is not currently serving
    accuracy: float    # 0 when the device is not currently serving
    last_sync: str     # human-readable freshness note


# Demo fleet state for the dashboard printout below.
_FLEET_ROWS = (
    ("edge-001", "Factory A - Line 1", "v3.2", "Running", 12.5, 0.965, "2 min ago"),
    ("edge-002", "Factory A - Line 2", "v3.2", "Running", 14.2, 0.958, "1 min ago"),
    ("edge-003", "Factory B - Line 1", "v3.1", "Updating", 0, 0, "Deploying v3.2"),
    ("edge-004", "Retail Store 1", "v3.2", "Running", 18.7, 0.942, "5 min ago"),
    ("edge-005", "Retail Store 2", "v3.2", "Offline", 0, 0, "Last seen 2h ago"),
)
devices = [EdgeDevice(*row) for row in _FLEET_ROWS]

print("\n=== Edge Device Fleet ===")
running = len([d for d in devices if d.status == "Running"])
print(f"  Active: {running}/{len(devices)}")
for d in devices:
    print(f"  [{d.status}] {d.device_id} @ {d.location}")
    if d.status == "Running":
        print(f"    Model: {d.model_version} | Latency: {d.latency_ms}ms | Accuracy: {d.accuracy:.1%}")
    print(f"    Last Sync: {d.last_sync}")

Monitoring

# === Edge Monitoring with Unity Catalog ===

# Edge metrics table in Unity Catalog
# CREATE TABLE production.edge_metrics.inference_logs (
#   device_id STRING,
#   timestamp TIMESTAMP,
#   model_version STRING,
#   prediction DOUBLE,
#   confidence DOUBLE,
#   latency_ms DOUBLE,
#   input_hash STRING
# ) USING DELTA
# PARTITIONED BY (date(timestamp))
# TBLPROPERTIES ('delta.autoOptimize.optimizeWrite' = 'true');

# Monitoring Dashboard Queries
# -- Model performance by device
# SELECT device_id, model_version,
#   COUNT(*) as predictions,
#   AVG(confidence) as avg_confidence,
#   AVG(latency_ms) as avg_latency,
#   PERCENTILE(latency_ms, 0.99) as p99_latency
# FROM production.edge_metrics.inference_logs
# WHERE timestamp > current_timestamp() - INTERVAL 1 HOUR
# GROUP BY device_id, model_version;

# -- Data drift detection
# SELECT date(timestamp) as day,
#   AVG(confidence) as avg_confidence,
#   STDDEV(confidence) as std_confidence
# FROM production.edge_metrics.inference_logs
# GROUP BY day
# ORDER BY day DESC LIMIT 30;

@dataclass
class MonitoringAlert:
    """One alert rule evaluated against edge telemetry."""

    alert: str
    condition: str  # human-readable trigger expression
    severity: str   # Critical / Warning / Info
    action: str     # operator response when triggered
    current: str    # latest observed value vs. the threshold


# Demo alert rules and their current evaluation state.
_ALERT_ROWS = (
    ("Low Accuracy", "accuracy < 0.90", "Critical", "Rollback model", "0.965 OK"),
    ("High Latency", "p99 > 50ms", "Warning", "Check device resources", "28ms OK"),
    ("Device Offline", "no sync > 1h", "Critical", "Check network/device", "1 offline"),
    ("Data Drift", "confidence stddev > 0.15", "Warning", "Retrain model", "0.08 OK"),
    ("Version Mismatch", "not latest champion", "Info", "Schedule update", "1 updating"),
)
alerts = [MonitoringAlert(*row) for row in _ALERT_ROWS]

print("Edge Monitoring Alerts:")
for a in alerts:
    print(f"  [{a.severity}] {a.alert}")
    print(f"    Condition: {a.condition} | Current: {a.current}")
    print(f"    Action: {a.action}")

# Governance capabilities Unity Catalog adds on top of the edge pipeline.
governance = {
    "Data Lineage": "Auto-tracked training data → model → edge predictions",
    "Access Control": "GRANT/REVOKE per Catalog Schema Table",
    "Audit Log": "Every query access deployment logged",
    "Row Security": "Region-based row filtering",
    "Column Masking": "PII columns masked for non-authorized",
    "Delta Sharing": "Share data securely across organizations",
}

# Fix: the original used an f-string with no placeholders (ruff F541);
# a plain string literal is equivalent and intentional here.
print("\n\nGovernance Features:")
for k, v in governance.items():
    print(f"  [{k}]: {v}")

เคล็ดลับ

Unity Catalog คืออะไร

Unity Catalog is Databricks' unified governance layer for data assets — tables, views, functions, and ML models — organized as catalog → schema → object, with GRANT/REVOKE access control, lineage tracking, audit logging, and secure sharing over Delta Lake (Delta Sharing).

Edge Deployment คืออะไร

Edge deployment pushes models out to edge devices (IoT gateways, factory lines, retail stores) for low latency, offline operation, and bandwidth savings, while Unity Catalog continues to govern each deployed model's versions, lineage, and deployment audit trail.

ตั้งค่า Unity Catalog อย่างไร

Metastore Workspace Catalog Schema Table GRANT Permission Row Security Column Masking Lineage Audit Log SIEM production staging

จัดการ Model สำหรับ Edge อย่างไร

Register Unity Catalog MLflow Version Alias champion challenger ONNX TensorRT Pipeline Push Device Monitor Metrics Baseline Rollback

สรุป

In summary: Databricks Unity Catalog supplies data governance for edge deployment — catalog/schema organization, access control, lineage, and audit — plus an MLflow-backed model registry; models are exported (e.g. to ONNX) and deployed to edge devices, with production monitoring feeding metrics back into the catalog.

📖 บทความที่เกี่ยวข้อง

Databricks Unity Catalog Chaos Engineeringอ่านบทความ → Databricks Unity Catalog Site Reliability SREอ่านบทความ → Databricks Unity Catalog Data Pipeline ETLอ่านบทความ → Databricks Unity Catalog Cloud Native Designอ่านบทความ →

📚 ดูบทความทั้งหมด →