ai

Databricks Unity Catalog Edge Deployment —

Databricks Unity Catalog Edge Deployment —

Unity Catalog Edge

Databricks Unity Catalog Edge Deployment —

Databricks Unity Catalog Edge Deployment Data Governance Catalog Schema Access Control Lineage Audit Model Registry MLflow Delta Lake IoT Factory Retail

ComponentPurposeScopeKey Feature
MetastoreTop-level ContainerAccount-levelWorkspace Binding
CatalogFirst NamespaceEnvironmentproduction staging
SchemaSecond NamespaceDomainsales analytics ml
Table/ViewData ObjectsObject-levelDelta Lake Format
ModelML ModelsObject-levelVersion Alias

Unity Catalog Setup

=== Unity Catalog Configuration ===

SQL — Create Catalog and Schema

CREATE CATALOG IF NOT EXISTS production;

CREATE SCHEMA IF NOT EXISTS production.ml_models;

CREATE SCHEMA IF NOT EXISTS production.feature_store;

CREATE SCHEMA IF NOT EXISTS production.edge_metrics;

-- Grant permissions

GRANT USE CATALOG ON CATALOG production TO `data-engineers`;

GRANT USE SCHEMA ON SCHEMA production.ml_models TO `ml-engineers`;

GRANT SELECT ON SCHEMA production.feature_store TO `ml-engineers`;

GRANT CREATE TABLE ON SCHEMA production.edge_metrics TO `edge-devices`;

-- Row-level security

CREATE FUNCTION production.ml_models.region_filter(region STRING)

RETURN IF(IS_ACCOUNT_GROUP_MEMBER('global-admin'), true, region = current_user_region());

ALTER TABLE production.ml_models.predictions

SET ROW FILTER production.ml_models.region_filter ON (region);

-- Column masking

CREATE FUNCTION production.ml_models.mask_pii(val STRING)

RETURN IF(IS_ACCOUNT_GROUP_MEMBER('pii-access'), val, '***MASKED***');

ALTER TABLE production.feature_store.customers

ALTER COLUMN email SET MASK production.ml_models.mask_pii;

Python — Unity Catalog with MLflow

import mlflow

เนื้อหาเกี่ยวข้อง — ดูเพิ่มเติมเรื่อง Tailscale Mesh Stream Processing

from mlflow.models import infer_signature

mlflow.set_registry_uri("databricks-uc")

# Register model in Unity Catalog

with mlflow.start_run():

model = train_model(X_train, y_train)

signature = infer_signature(X_train, model.predict(X_train))

mlflow.sklearn.log_model(

แนะนำเพิ่มเติม — สัญญาณเทรดรายวัน XM Signal

model,

artifact_path="model",

registered_model_name="production.ml_models.edge_detector",

signature=signature,

input_example=X_train[:5],

)

# Set model alias

from mlflow import MlflowClient

client = MlflowClient()

client.set_registered_model_alias(

name="production.ml_models.edge_detector",

alias="champion",

version=3

)

from dataclasses import dataclass

@dataclass

class CatalogObject:

full_name: str

เนื้อหาเกี่ยวข้อง — ดูเพิ่มเติมเรื่อง Several การใช — คู่มือฉบับสมบูรณ์ 2026

object_type: str

owner: str

access: str

lineage: str

objects = [

CatalogObject("production.ml_models.edge_detector", "Model", "ml-team", "ml-engineers: USE", "training_data → model"),

CatalogObject("production.feature_store.user_features", "Table", "data-team", "ml-engineers: SELECT", "raw_events → features"),

CatalogObject("production.edge_metrics.predictions", "Table", "edge-service", "analysts: SELECT", "edge_device → metrics"),

CatalogObject("production.ml_models.preprocessing", "Function", "ml-team", "ml-engineers: EXECUTE", "feature_store → transform"),

]

print("=== Unity Catalog Objects ===")

for o in objects:

print(f" [{o.object_type}] {o.full_name}")

แนะนำเพิ่มเติม — ระบบเทรดของ iCafeForex

print(f" Owner: {o.owner} | Access: {o.access}")

print(f" Lineage: {o.lineage}")

Edge Deployment Pipeline

=== Edge Model Deployment ===

Deployment Pipeline

1. Train model in Databricks

2. Register in Unity Catalog

3. Set alias "champion"

4. Export to ONNX/TensorRT

5. Push to Edge Registry (Harbor/ECR)

6. Deploy to Edge devices via GitOps

7. Monitor metrics → send back to Unity Catalog

เนื้อหาเกี่ยวข้อง — ดูเพิ่มเติมเรื่อง Java Micronaut DNS Management — คู่มือฉบับสมบูรณ์ 2026

Edge Deployment Script

import mlflow

import onnx

import docker

def deploy_to_edge(model_name, alias, edge_devices):

Databricks Unity Catalog Edge Deployment —

client = MlflowClient()

model_version = client.get_model_version_by_alias(model_name, alias)

model_uri = f"models:/{model_name}@{alias}"

# Export to ONNX

model = mlflow.pyfunc.load_model(model_uri)

onnx_path = export_to_onnx(model, "model.onnx")

# Build edge container

docker_client = docker.from_env()

image = docker_client.images.build(

path="./edge-container",

tag=f"edge-model:{model_version.version}",

buildargs={"MODEL_PATH": onnx_path}

)

# Push to registry

docker_client.images.push(f"registry/edge-model:{model_version.version}")

# Deploy to edge devices

for device in edge_devices:

deploy_to_device(device, f"edge-model:{model_version.version}")

log_deployment(model_name, model_version.version, device)

@dataclass

เนื้อหาเกี่ยวข้อง — แนะนำให้อ่าน Snyk Code Security Identity Access Management

class EdgeDevice:

device_id: str

location: str

model_version: str

status: str

latency_ms: float

accuracy: float

last_sync: str

devices = [

EdgeDevice("edge-001", "Factory A - Line 1", "v3.2", "Running", 12.5, 0.965, "2 min ago"),

EdgeDevice("edge-002", "Factory A - Line 2", "v3.2", "Running", 14.2, 0.958, "1 min ago"),

EdgeDevice("edge-003", "Factory B - Line 1", "v3.1", "Updating", 0, 0, "Deploying v3.2"),

EdgeDevice("edge-004", "Retail Store 1", "v3.2", "Running", 18.7, 0.942, "5 min ago"),

EdgeDevice("edge-005", "Retail Store 2", "v3.2", "Offline", 0, 0, "Last seen 2h ago"),

]

print("\n=== Edge Device Fleet ===")

running = sum(1 for d in devices if d.status == "Running")

print(f" Active: {running}/{len(devices)}")

for d in devices:

print(f" [{d.status}] {d.device_id} @ {d.location}")

if d.status == "Running":

print(f" Model: {d.model_version} | Latency: {d.latency_ms}ms | Accuracy: {d.accuracy:.1%}")

print(f" Last Sync: {d.last_sync}")

Monitoring

# === Edge Monitoring with Unity Catalog ===



# Edge metrics table in Unity Catalog

# CREATE TABLE production.edge_metrics.inference_logs (

#   device_id STRING,

#   timestamp TIMESTAMP,

#   model_version STRING,

#   prediction DOUBLE,

#   confidence DOUBLE,

#   latency_ms DOUBLE,

#   input_hash STRING

# ) USING DELTA

# PARTITIONED BY (date(timestamp))

# TBLPROPERTIES ('delta.autoOptimize.optimizeWrite' = 'true');



# Monitoring Dashboard Queries

# -- Model performance by device

# SELECT device_id, model_version,

#   COUNT(*) as predictions,

#   AVG(confidence) as avg_confidence,

#   AVG(latency_ms) as avg_latency,

#   PERCENTILE(latency_ms, 0.99) as p99_latency

# FROM production.edge_metrics.inference_logs

# WHERE timestamp > current_timestamp() - INTERVAL 1 HOUR

# GROUP BY device_id, model_version;



# -- Data drift detection

# SELECT date(timestamp) as day,

#   AVG(confidence) as avg_confidence,

#   STDDEV(confidence) as std_confidence

# FROM production.edge_metrics.inference_logs

# GROUP BY day

# ORDER BY day DESC LIMIT 30;



@dataclass

class MonitoringAlert:

    alert: str

    condition: str

    severity: str

    action: str

    current: str



alerts = [

    MonitoringAlert("Low Accuracy", "accuracy < 0.90", "Critical", "Rollback model", "0.965 OK"),

    MonitoringAlert("High Latency", "p99 > 50ms", "Warning", "Check device resources", "28ms OK"),

    MonitoringAlert("Device Offline", "no sync > 1h", "Critical", "Check network/device", "1 offline"),

    MonitoringAlert("Data Drift", "confidence stddev > 0.15", "Warning", "Retrain model", "0.08 OK"),

    MonitoringAlert("Version Mismatch", "not latest champion", "Info", "Schedule update", "1 updating"),

]



print("Edge Monitoring Alerts:")

for a in alerts:

    print(f"  [{a.severity}] {a.alert}")

    print(f"    Condition: {a.condition} | Current: {a.current}")

    print(f"    Action: {a.action}")



governance = {

    "Data Lineage": "Auto-tracked training data → model → edge predictions",

    "Access Control": "GRANT/REVOKE per Catalog Schema Table",

    "Audit Log": "Every query access deployment logged",

    "Row Security": "Region-based row filtering",

    "Column Masking": "PII columns masked for non-authorized",

    "Delta Sharing": "Share data securely across organizations",

}



print(f"\n\nGovernance Features:")

for k, v in governance.items():

    print(f"  [{k}]: {v}")

เคล็ดลับ

  • Namespace: ใช้ Catalog.Schema.Object จัดระเบียบ Data
  • Alias: ใช้ Model Alias champion/challenger แทน Version Number
  • Lineage: เปิด Lineage ติดตามที่มาข้อมูลอัตโนมัติ
  • Monitor: ส่ง Edge Metrics กลับ Unity Catalog ทุก Inference
  • Rollback: เตรียม Rollback Plan เสมอเมื่อ Deploy Model ใหม่

Unity Catalog คืออะไร

Unified Governance Databricks Data Assets Tables Views Functions Models Catalog Schema Object GRANT REVOKE Lineage Audit Delta Lake Sharing

XM Legend · เทรดเดอร์ & ผู้สอน Forex 13 ปี

ผู้ก่อตั้ง SiamCafe ตั้งแต่ปี 1997 · เทรดเดอร์สาย Forex มากกว่า 13 ปี ได้รับการยกย่องเป็น XM Legend · แบ่งปันความรู้ Forex, ไอที, AI และการเทรด จากประสบการณ์จริงในตลาดจริง