Databricks Unity Catalog MLOps
Databricks Unity Catalog MLOps Model Registry Feature Store MLflow CI/CD DABs Data Governance Lineage Access Control Production
| Component | Tool | Unity Catalog Feature | Stage |
|---|---|---|---|
| Data Governance | Unity Catalog | Access Control Audit Lineage | ทุก Stage |
| Feature Store | UC Feature Tables | Feature Lookup Online Store | Training + Inference |
| Experiment Tracking | MLflow | Run Metrics Parameters | Training |
| Model Registry | UC Model Registry | Version Alias Lineage | Registration |
| Model Serving | Serving Endpoints | Real-time Inference | Deployment |
| CI/CD | DABs + GitHub Actions | Bundle Deploy | Automation |
| Monitoring | Lakehouse Monitoring | Data Drift Model Quality | Production |
Model Training & Registry
# === MLOps with Unity Catalog ===
# Databricks Notebook
# import mlflow
# from mlflow.models import infer_signature
# from sklearn.ensemble import RandomForestClassifier
# from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup
#
# # Feature Engineering Client
# fe = FeatureEngineeringClient()
#
# # Load Features from Unity Catalog
# training_set = fe.create_training_set(
# df=labels_df,
# feature_lookups=[
# FeatureLookup(
# table_name="production.ml.user_features",
# lookup_key="user_id"
# ),
# FeatureLookup(
# table_name="production.ml.item_features",
# lookup_key="item_id"
# ),
# ],
# label="target"
# )
# training_df = training_set.load_df()
#
# # Train Model
# mlflow.set_registry_uri("databricks-uc")
# with mlflow.start_run():
# model = RandomForestClassifier(n_estimators=100)
# model.fit(X_train, y_train)
# mlflow.log_metric("accuracy", accuracy)
# mlflow.log_metric("f1_score", f1)
#
# # Log Model with Feature Engineering
# fe.log_model(
# model=model,
# artifact_path="model",
# flavor=mlflow.sklearn,
# training_set=training_set,
# registered_model_name="production.ml.churn_model"
# )
from dataclasses import dataclass
@dataclass
class MLOpsStage:
    """One stage of the Unity Catalog MLOps workflow.

    Attributes:
        stage: Human-readable stage name.
        tool: Tool that implements the stage.
        uc_feature: Unity Catalog feature or object involved.
        code_example: Representative code/API snippet.
    """
    stage: str
    tool: str
    uc_feature: str
    code_example: str


# Row table: (stage, tool, Unity Catalog feature, example snippet).
_STAGE_ROWS = [
    ("Feature Engineering",
     "Feature Engineering Client",
     "production.ml.user_features (UC Table)",
     "fe.create_training_set(df, feature_lookups=[...])"),
    ("Experiment Tracking",
     "MLflow",
     "mlflow.log_metric() mlflow.log_param()",
     "with mlflow.start_run(): ..."),
    ("Model Registration",
     "UC Model Registry",
     "production.ml.churn_model (UC Model)",
     "fe.log_model(registered_model_name='catalog.schema.model')"),
    ("Model Alias",
     "MLflow Client",
     "Champion / Challenger / Archived",
     "client.set_registered_model_alias('model', 'Champion', 3)"),
    ("Model Serving",
     "Serving Endpoints",
     "Auto-scale GPU/CPU Endpoint",
     "POST /serving-endpoints/churn-model/invocations"),
]

stages = [MLOpsStage(*row) for row in _STAGE_ROWS]

print("=== MLOps Stages ===")
for entry in stages:
    print(f" [{entry.stage}] Tool: {entry.tool}")
    print(f" UC: {entry.uc_feature}")
    print(f" Code: {entry.code_example}")
CI/CD with DABs
# === Databricks Asset Bundles CI/CD ===
# databricks.yml
# bundle:
# name: churn-model-pipeline
# workspace:
# host: https://adb-123456.azuredatabricks.net
# resources:
# jobs:
# training_job:
# name: "Churn Model Training"
# tasks:
# - task_key: train
# notebook_task:
# notebook_path: ./notebooks/train.py
# existing_cluster_id: "0123-456789-abc"
# inference_job:
# name: "Churn Model Batch Inference"
# tasks:
# - task_key: inference
# notebook_task:
# notebook_path: ./notebooks/inference.py
# targets:
# dev:
# workspace:
# host: https://adb-dev.azuredatabricks.net
# staging:
# workspace:
# host: https://adb-staging.azuredatabricks.net
# production:
# workspace:
# host: https://adb-prod.azuredatabricks.net
# GitHub Actions
# name: Deploy ML Pipeline
# on:
# push:
# branches: [main]
# jobs:
# deploy:
# runs-on: ubuntu-latest
# steps:
# - uses: actions/checkout@v4
# - uses: databricks/setup-cli@main
# - run: databricks bundle validate -t production
# - run: databricks bundle deploy -t production
@dataclass
class CICDStep:
    """A single step in the DABs-based CI/CD pipeline.

    Attributes:
        step: Step name.
        trigger: Event that starts the step.
        action: Command or operation executed.
        rollback: Recovery action if the step fails or must be undone.
    """
    step: str
    trigger: str
    action: str
    rollback: str


# Pipeline steps in execution order; each carries an explicit rollback path.
cicd_steps = [
    CICDStep(
        step="Validate Bundle",
        trigger="PR Created",
        action="databricks bundle validate -t staging",
        rollback="Block PR ถ้า Validation Fail",
    ),
    CICDStep(
        step="Deploy to Staging",
        trigger="PR Merged to main",
        action="databricks bundle deploy -t staging",
        rollback="databricks bundle destroy -t staging",
    ),
    CICDStep(
        step="Run Training Job",
        trigger="After Staging Deploy",
        action="databricks jobs run-now --job-id $TRAINING_JOB",
        rollback="ใช้ Previous Model Version",
    ),
    CICDStep(
        step="Model Validation",
        trigger="After Training Complete",
        action="Compare Metrics กับ Champion Model",
        rollback="ไม่ Promote ถ้า Metrics ไม่ดีกว่า",
    ),
    CICDStep(
        step="Promote to Production",
        trigger="Validation Pass",
        action="set_registered_model_alias('Champion', new_version)",
        rollback="set_registered_model_alias('Champion', old_version)",
    ),
]

print("=== CI/CD Pipeline ===")
for item in cicd_steps:
    print(f" [{item.step}] Trigger: {item.trigger}")
    print(f" Action: {item.action}")
    print(f" Rollback: {item.rollback}")
Monitoring & Governance
# === Lakehouse Monitoring ===
@dataclass
class MonitorMetric:
    """One monitored metric with its source, alert threshold, and response.

    Attributes:
        metric: Metric name.
        source: Where the metric is measured.
        threshold: Condition that triggers an alert.
        action: Response taken when the threshold is breached.
    """
    metric: str
    source: str
    threshold: str
    action: str


# Row table: (metric, source, threshold, action).
_MONITOR_ROWS = [
    ("Model Accuracy",
     "Lakehouse Monitoring (Inference Table)",
     "Drop > 5% จาก Baseline",
     "Alert + Trigger Retraining Job"),
    ("Data Drift (PSI)",
     "Lakehouse Monitoring (Feature Table)",
     "PSI > 0.2 (Significant Drift)",
     "Alert + Review Feature Pipeline"),
    ("Prediction Drift",
     "Inference Log Table",
     "Distribution Change > 10%",
     "Alert + Compare with Ground Truth"),
    ("Latency (Serving)",
     "Serving Endpoint Metrics",
     "P99 > 200ms",
     "Scale Up Endpoint หรือ Optimize Model"),
    ("Data Quality",
     "Lakehouse Monitoring (Source Table)",
     "Null Rate > 5% Schema Change",
     "Alert + Block Pipeline"),
    ("Access Audit",
     "Unity Catalog Audit Log",
     "Unauthorized Access Attempt",
     "Alert Security Team"),
]

monitor_metrics = [MonitorMetric(*row) for row in _MONITOR_ROWS]

print("=== Monitoring Metrics ===")
for metric_entry in monitor_metrics:
    print(f" [{metric_entry.metric}] Source: {metric_entry.source}")
    print(f" Threshold: {metric_entry.threshold}")
    print(f" Action: {metric_entry.action}")
เคล็ดลับ
- UC: ใช้ Unity Catalog เป็น Single Source of Truth สำหรับ Data + Model
- Alias: ใช้ Champion/Challenger Alias แทน Stage Transition
- DABs: ใช้ Databricks Asset Bundles สำหรับ IaC CI/CD
- Feature Store: ใช้ UC Feature Store สำหรับ Consistent Features
- Monitor: ตั้ง Lakehouse Monitoring ตรวจ Drift ทุกวัน
Unity Catalog คืออะไร
Unity Catalog คือ unified governance layer ของ Databricks ใช้ namespace แบบ Catalog.Schema.Table ครอบคลุม Access Control, Lineage, Audit, Delta Sharing และใช้งานแบบ Cross-workspace ได้ รวมถึงเป็นที่เก็บ Model Registry และ Feature Store
MLOps Workflow เป็นอย่างไร
เริ่มจากดึง Features จาก Feature Store มา Training และ Track ด้วย MLflow จากนั้น Register Model เข้า Registry ตั้ง Alias (Champion/Challenger) แล้ว Deploy ผ่าน Serving Endpoint หรือ Batch Inference โดยมี CI/CD ด้วย DABs และ Monitoring ตรวจจับ Drift
Model Registry ใช้อย่างไร
ลงทะเบียนด้วย register_model ภายใต้ UC Namespace (catalog.schema.model) แต่ละ Version ตั้ง Alias เช่น Champion/Challenger ได้ มี Lineage และ Access Control (EXECUTE, MANAGE) ในตัว Deploy ไปยัง Serving และ Rollback ได้ด้วยการย้าย Alias กลับ Version เดิม
CI/CD ทำอย่างไร
นิยาม Pipeline ด้วย DABs ใน databricks.yml แล้วใช้ GitHub Actions สั่ง Validate และ Deploy ไป Staging รัน Training และ Model Validation ผ่านแล้วจึง Promote เป็น Champion ใน Production ส่วน Rollback ทำได้ด้วยการชี้ Alias กลับไป Version เดิม
สรุป
Databricks Unity Catalog MLOps Model Registry Feature Store MLflow DABs CI/CD Lakehouse Monitoring Governance Production
