SiamCafe · Blog
Databricks Unity Catalog MLOps Workflow — จัดการ
บทความ

Databricks Unity Catalog MLOps Workflow — จัดการ

เผยแพร่ 28 พฤษภาคม 2569

Databricks Unity Catalog MLOps

Databricks Unity Catalog MLOps Workflow — จัดการ

Databricks Unity Catalog MLOps Model Registry Feature Store MLflow CI/CD DABs Data Governance Lineage Access Control Production

ComponentToolUnity Catalog FeatureStage
Data GovernanceUnity CatalogAccess Control Audit Lineageทุก Stage
Feature StoreUC Feature TablesFeature Lookup Online StoreTraining + Inference
Experiment TrackingMLflowRun Metrics ParametersTraining
Model RegistryUC Model RegistryVersion Alias LineageRegistration
Model ServingServing EndpointsReal-time InferenceDeployment
CI/CDDABs + GitHub ActionsBundle DeployAutomation
MonitoringLakehouse MonitoringData Drift Model QualityProduction

Model Training & Registry

=== MLOps with Unity Catalog ===

Databricks Notebook

import mlflow

from mlflow.models import infer_signature

from sklearn.ensemble import RandomForestClassifier

from databricks.feature_engineering import FeatureEngineeringClient

# Feature Engineering Client

fe = FeatureEngineeringClient()

# Load Features from Unity Catalog

training_set = fe.create_training_set(

df=labels_df,

feature_lookups=[

FeatureLookup(

table_name="production.ml.user_features",

lookup_key="user_id"

),

FeatureLookup(

table_name="production.ml.item_features",

lookup_key="item_id"

),

],

label="target"

)

training_df = training_set.load_df()

# Train Model

mlflow.set_registry_uri("databricks-uc")

with mlflow.start_run():

model = RandomForestClassifier(n_estimators=100)

model.fit(X_train, y_train)

mlflow.log_metric("accuracy", accuracy)

mlflow.log_metric("f1_score", f1)

# Log Model with Feature Engineering

fe.log_model(

model=model,

artifact_path="model",

flavor=mlflow.sklearn,

training_set=training_set,

registered_model_name="production.ml.churn_model"

)

from dataclasses import dataclass

@dataclass

class MLOpsStage:

stage: str

tool: str

uc_feature: str

code_example: str

stages = [

MLOpsStage("Feature Engineering",

"Feature Engineering Client",

"production.ml.user_features (UC Table)",

"fe.create_training_set(df, feature_lookups=[...])"),

MLOpsStage("Experiment Tracking",

"MLflow",

"mlflow.log_metric() mlflow.log_param()",

"with mlflow.start_run(): ..."),

MLOpsStage("Model Registration",

"UC Model Registry",

"production.ml.churn_model (UC Model)",

"fe.log_model(registered_model_name='catalog.schema.model')"),

MLOpsStage("Model Alias",

"MLflow Client",

"Champion / Challenger / Archived",

"client.set_registered_model_alias('model', 'Champion', 3)"),

MLOpsStage("Model Serving",

"Serving Endpoints",

"Auto-scale GPU/CPU Endpoint",

"POST /serving-endpoints/churn-model/invocations"),

]

print("=== MLOps Stages ===")

for s in stages:

print(f" [{s.stage}] Tool: {s.tool}")

print(f" UC: {s.uc_feature}")

print(f" Code: {s.code_example}")

CI/CD with DABs

=== Databricks Asset Bundles CI/CD ===

databricks.yml

bundle:

name: churn-model-pipeline

workspace:

host: https://adb-123456.azuredatabricks.net

resources:

jobs:

training_job:

name: "Churn Model Training"

tasks:

  • task_key: train

notebook_task:

notebook_path: ./notebooks/train.py

existing_cluster_id: "0123-456789-abc"

inference_job:

name: "Churn Model Batch Inference"

tasks:

  • task_key: inference

notebook_task:

notebook_path: ./notebooks/inference.py

targets:

Databricks Unity Catalog MLOps Workflow — จัดการ

dev:

workspace:

host: https://adb-dev.azuredatabricks.net

staging:

workspace:

host: https://adb-staging.azuredatabricks.net

production:

workspace:

host: https://adb-prod.azuredatabricks.net

GitHub Actions

name: Deploy ML Pipeline

on:

push:

branches: [main]

jobs:

deploy:

runs-on: ubuntu-latest

steps:

  • uses: actions/checkout@v4
  • uses: databricks/setup-cli@main
  • run: databricks bundle validate -t production
  • run: databricks bundle deploy -t production

@dataclass

class CICDStep:

step: str

trigger: str

action: str

rollback: str

cicd_steps = [

CICDStep("Validate Bundle",

"PR Created",

"databricks bundle validate -t staging",

"Block PR ถ้า Validation Fail"),

CICDStep("Deploy to Staging",

"PR Merged to main",

"databricks bundle deploy -t staging",

"databricks bundle destroy -t staging"),

CICDStep("Run Training Job",

"After Staging Deploy",

"databricks jobs run-now --job-id $TRAINING_JOB",

"ใช้ Previous Model Version"),

CICDStep("Model Validation",

"After Training Complete",

"Compare Metrics กับ Champion Model",

"ไม่ Promote ถ้า Metrics ไม่ดีกว่า"),

CICDStep("Promote to Production",

"Validation Pass",

"set_registered_model_alias('Champion', new_version)",

"set_registered_model_alias('Champion', old_version)"),

]

print("=== CI/CD Pipeline ===")

for s in cicd_steps:

print(f" [{s.step}] Trigger: {s.trigger}")

print(f" Action: {s.action}")

print(f" Rollback: {s.rollback}")

Monitoring & Governance

# === Lakehouse Monitoring ===

@dataclass
class MonitorMetric:
    metric: str
    source: str
    threshold: str
    action: str

monitor_metrics = [
    MonitorMetric("Model Accuracy",
        "Lakehouse Monitoring (Inference Table)",
        "Drop > 5% จาก Baseline",
        "Alert + Trigger Retraining Job"),
    MonitorMetric("Data Drift (PSI)",
        "Lakehouse Monitoring (Feature Table)",
        "PSI > 0.2 (Significant Drift)",
        "Alert + Review Feature Pipeline"),
    MonitorMetric("Prediction Drift",
        "Inference Log Table",
        "Distribution Change > 10%",
        "Alert + Compare with Ground Truth"),
    MonitorMetric("Latency (Serving)",
        "Serving Endpoint Metrics",
        "P99 > 200ms",
        "Scale Up Endpoint หรือ Optimize Model"),
    MonitorMetric("Data Quality",
        "Lakehouse Monitoring (Source Table)",
        "Null Rate > 5% Schema Change",
        "Alert + Block Pipeline"),
    MonitorMetric("Access Audit",
        "Unity Catalog Audit Log",
        "Unauthorized Access Attempt",
        "Alert Security Team"),
]

print("=== Monitoring Metrics ===")
for m in monitor_metrics:
    print(f"  [{m.metric}] Source: {m.source}")
    print(f"    Threshold: {m.threshold}")
    print(f"    Action: {m.action}")

เคล็ดลับ

  • UC: ใช้ Unity Catalog เป็น Single Source of Truth สำหรับ Data + Model
  • Alias: ใช้ Champion/Challenger Alias แทน Stage Transition
  • DABs: ใช้ Databricks Asset Bundles สำหรับ IaC CI/CD
  • Feature Store: ใช้ UC Feature Store สำหรับ Consistent Features
  • Monitor: ตั้ง Lakehouse Monitoring ตรวจ Drift ทุกวัน

Unity Catalog คืออะไร

Unified Governance Databricks Catalog.Schema.Table Access Control Lineage Audit Delta Sharing Cross-workspace Model Registry Feature Store