
Databricks Unity Catalog MLOps Workflow

2025-10-26 · อ. บอม — SiamCafe.net · 11,127 words

This post walks through an end-to-end MLOps workflow on Databricks with Unity Catalog: feature engineering with UC feature tables, experiment tracking with MLflow, the UC Model Registry, model serving, CI/CD with Databricks Asset Bundles (DABs), and production monitoring, with data governance, lineage, and access control applied at every stage.

Component           | Tool                   | Unity Catalog Feature          | Stage
Data Governance     | Unity Catalog          | Access Control, Audit, Lineage | All stages
Feature Store       | UC Feature Tables      | Feature Lookup, Online Store   | Training + Inference
Experiment Tracking | MLflow                 | Run Metrics, Parameters        | Training
Model Registry      | UC Model Registry      | Version, Alias, Lineage        | Registration
Model Serving       | Serving Endpoints      | Real-time Inference            | Deployment
CI/CD               | DABs + GitHub Actions  | Bundle Deploy                  | Automation
Monitoring          | Lakehouse Monitoring   | Data Drift, Model Quality      | Production

Model Training & Registry

# === MLOps with Unity Catalog ===

# Databricks notebook: train on Unity Catalog feature tables
import mlflow
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score
from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup

# Feature Engineering client
fe = FeatureEngineeringClient()

# Load features from Unity Catalog
# labels_df: Spark DataFrame with user_id, item_id, and the target label
training_set = fe.create_training_set(
    df=labels_df,
    feature_lookups=[
        FeatureLookup(
            table_name="production.ml.user_features",
            lookup_key="user_id"
        ),
        FeatureLookup(
            table_name="production.ml.item_features",
            lookup_key="item_id"
        ),
    ],
    label="target"
)
training_df = training_set.load_df()

# Train the model; register in Unity Catalog, not the workspace registry
mlflow.set_registry_uri("databricks-uc")
with mlflow.start_run():
    # X_train/y_train and X_val/y_val come from a split of training_df (omitted)
    model = RandomForestClassifier(n_estimators=100)
    model.fit(X_train, y_train)
    y_pred = model.predict(X_val)
    mlflow.log_metric("accuracy", accuracy_score(y_val, y_pred))
    mlflow.log_metric("f1_score", f1_score(y_val, y_pred))

    # Log the model with its feature lineage attached
    fe.log_model(
        model=model,
        artifact_path="model",
        flavor=mlflow.sklearn,
        training_set=training_set,
        registered_model_name="production.ml.churn_model"
    )
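Conceptually, `create_training_set` joins each label row with the feature tables on its lookup key. A minimal pure-Python stand-in of that join logic (dict-based tables and invented sample values, purely for illustration):

```python
# Hypothetical stand-ins for the UC feature tables and the labels DataFrame.
user_features = {  # production.ml.user_features
    "u1": {"age": 34, "tenure_days": 120},
    "u2": {"age": 27, "tenure_days": 45},
}
item_features = {  # production.ml.item_features
    "i1": {"price": 9.99},
    "i2": {"price": 4.50},
}
labels = [  # labels_df
    {"user_id": "u1", "item_id": "i2", "target": 1},
    {"user_id": "u2", "item_id": "i1", "target": 0},
]

def create_training_rows(labels, lookups):
    """Enrich each label row with features fetched by its lookup key."""
    rows = []
    for row in labels:
        enriched = dict(row)
        for table, key in lookups:
            enriched.update(table.get(row[key], {}))
        rows.append(enriched)
    return rows

training_rows = create_training_rows(
    labels, [(user_features, "user_id"), (item_features, "item_id")]
)
```

The real client does this as a point-in-time-correct Spark join, but the key-based enrichment is the same idea.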

from dataclasses import dataclass

@dataclass
class MLOpsStage:
    stage: str
    tool: str
    uc_feature: str
    code_example: str

stages = [
    MLOpsStage("Feature Engineering",
        "Feature Engineering Client",
        "production.ml.user_features (UC Table)",
        "fe.create_training_set(df, feature_lookups=[...])"),
    MLOpsStage("Experiment Tracking",
        "MLflow",
        "mlflow.log_metric() mlflow.log_param()",
        "with mlflow.start_run(): ..."),
    MLOpsStage("Model Registration",
        "UC Model Registry",
        "production.ml.churn_model (UC Model)",
        "fe.log_model(registered_model_name='catalog.schema.model')"),
    MLOpsStage("Model Alias",
        "MLflow Client",
        "Champion / Challenger / Archived",
        "client.set_registered_model_alias('model', 'Champion', 3)"),
    MLOpsStage("Model Serving",
        "Serving Endpoints",
        "Auto-scale GPU/CPU Endpoint",
        "POST /serving-endpoints/churn-model/invocations"),
]

print("=== MLOps Stages ===")
for s in stages:
    print(f"  [{s.stage}] Tool: {s.tool}")
    print(f"    UC: {s.uc_feature}")
    print(f"    Code: {s.code_example}")
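The Champion/Challenger pattern in the "Model Alias" stage maps an alias name to a model version, so promotion and rollback are just pointer updates. A registry-free sketch (a plain dict with hypothetical version numbers, not the MLflow API itself):

```python
# Alias -> version pointers, mimicking UC model registry aliases.
aliases = {"Champion": 2, "Challenger": 3}

def promote(aliases):
    """Point Champion at the Challenger's version; return the old one for rollback."""
    previous = aliases["Champion"]
    aliases["Champion"] = aliases["Challenger"]
    return previous

def rollback(aliases, previous):
    """Repoint Champion at the previously serving version."""
    aliases["Champion"] = previous

prev = promote(aliases)   # Champion now points at version 3
rollback(aliases, prev)   # Champion back at version 2
```

With the real registry, serving code loads `models:/catalog.schema.model@Champion`, so repointing the alias switches traffic without redeploying the endpoint.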

CI/CD with DABs

# === Databricks Asset Bundles CI/CD ===

# databricks.yml
bundle:
  name: churn-model-pipeline
workspace:
  host: https://adb-123456.azuredatabricks.net
resources:
  jobs:
    training_job:
      name: "Churn Model Training"
      tasks:
        - task_key: train
          notebook_task:
            notebook_path: ./notebooks/train.py
          existing_cluster_id: "0123-456789-abc"
    inference_job:
      name: "Churn Model Batch Inference"
      tasks:
        - task_key: inference
          notebook_task:
            notebook_path: ./notebooks/inference.py
targets:
  dev:
    workspace:
      host: https://adb-dev.azuredatabricks.net
  staging:
    workspace:
      host: https://adb-staging.azuredatabricks.net
  production:
    workspace:
      host: https://adb-prod.azuredatabricks.net

# GitHub Actions workflow
name: Deploy ML Pipeline
on:
  push:
    branches: [main]
jobs:
  deploy:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: databricks/setup-cli@main
      - run: databricks bundle validate -t production
      - run: databricks bundle deploy -t production

@dataclass
class CICDStep:
    step: str
    trigger: str
    action: str
    rollback: str

cicd_steps = [
    CICDStep("Validate Bundle",
        "PR Created",
        "databricks bundle validate -t staging",
        "Block the PR if validation fails"),
    CICDStep("Deploy to Staging",
        "PR Merged to main",
        "databricks bundle deploy -t staging",
        "databricks bundle destroy -t staging"),
    CICDStep("Run Training Job",
        "After Staging Deploy",
        "databricks jobs run-now --job-id $TRAINING_JOB",
        "Fall back to the previous model version"),
    CICDStep("Model Validation",
        "After Training Complete",
        "Compare metrics with the Champion model",
        "Do not promote if metrics are not better"),
    CICDStep("Promote to Production",
        "Validation Pass",
        "set_registered_model_alias('Champion', new_version)",
        "set_registered_model_alias('Champion', old_version)"),
]

print("=== CI/CD Pipeline ===")
for s in cicd_steps:
    print(f"  [{s.step}] Trigger: {s.trigger}")
    print(f"    Action: {s.action}")
    print(f"    Rollback: {s.rollback}")
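The "Model Validation" gate above boils down to a metric comparison: promote the challenger only if it beats the current Champion on every tracked metric. A minimal sketch (hypothetical metric names and values; a real gate might also require a minimum gain):

```python
def should_promote(champion_metrics, challenger_metrics, min_gain=0.0):
    """Promote only if the challenger beats the champion on every metric."""
    return all(
        challenger_metrics[name] > champion_metrics[name] + min_gain
        for name in champion_metrics
    )

champion = {"accuracy": 0.91, "f1_score": 0.88}
challenger = {"accuracy": 0.93, "f1_score": 0.90}
print(should_promote(champion, challenger))  # True: better on both metrics
```

If the gate returns False, the pipeline simply leaves the Champion alias untouched, which is the rollback behavior listed in the table.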

Monitoring & Governance

# === Lakehouse Monitoring ===

@dataclass
class MonitorMetric:
    metric: str
    source: str
    threshold: str
    action: str

monitor_metrics = [
    MonitorMetric("Model Accuracy",
        "Lakehouse Monitoring (Inference Table)",
        "Drop > 5% from baseline",
        "Alert + Trigger Retraining Job"),
    MonitorMetric("Data Drift (PSI)",
        "Lakehouse Monitoring (Feature Table)",
        "PSI > 0.2 (Significant Drift)",
        "Alert + Review Feature Pipeline"),
    MonitorMetric("Prediction Drift",
        "Inference Log Table",
        "Distribution Change > 10%",
        "Alert + Compare with Ground Truth"),
    MonitorMetric("Latency (Serving)",
        "Serving Endpoint Metrics",
        "P99 > 200ms",
        "Scale up the endpoint or optimize the model"),
    MonitorMetric("Data Quality",
        "Lakehouse Monitoring (Source Table)",
        "Null rate > 5% or schema change",
        "Alert + Block Pipeline"),
    MonitorMetric("Access Audit",
        "Unity Catalog Audit Log",
        "Unauthorized Access Attempt",
        "Alert Security Team"),
]

print("=== Monitoring Metrics ===")
for m in monitor_metrics:
    print(f"  [{m.metric}] Source: {m.source}")
    print(f"    Threshold: {m.threshold}")
    print(f"    Action: {m.action}")
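The PSI threshold in the table (drift flagged when PSI > 0.2) is computed from binned fractions of baseline vs. current feature values. A stdlib-only sketch of the metric, with invented sample data:

```python
import math

def psi(expected, actual, bins=10):
    """Population Stability Index between a baseline and a current sample.
    Rule of thumb: < 0.1 stable, 0.1-0.2 moderate shift, > 0.2 significant drift."""
    lo, hi = min(expected), max(expected)
    edges = [lo + (hi - lo) * i / bins for i in range(1, bins)]

    def fractions(sample):
        counts = [0] * bins
        for x in sample:
            counts[sum(x > e for e in edges)] += 1  # bin index = edges below x
        # clip zero fractions so log() stays defined
        return [max(c / len(sample), 1e-4) for c in counts]

    e, a = fractions(expected), fractions(actual)
    return sum((ai - ei) * math.log(ai / ei) for ei, ai in zip(e, a))

baseline = [i / 100 for i in range(100)]   # training-time feature values
current = [x + 0.5 for x in baseline]      # shifted serving-time values
print(round(psi(baseline, baseline), 4))   # 0.0: identical distributions
print(psi(baseline, current) > 0.2)        # True: significant drift
```

Lakehouse Monitoring computes this per feature against a baseline table; the sketch just shows what the number means.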

Tips

What is Unity Catalog?

Unified governance for Databricks: a three-level Catalog.Schema.Table namespace with access control, lineage, and audit logging, plus Delta Sharing and a cross-workspace Model Registry and Feature Store.

What does the MLOps workflow look like?

Features come from the Feature Store, training runs are tracked with MLflow, models are registered in the UC registry and promoted via aliases (Champion/Challenger), then served through Serving Endpoints or batch inference; DABs drive CI/CD, and Lakehouse Monitoring watches for drift.

How do you use the Model Registry?

Register models under a UC namespace (catalog.schema.model); each version carries aliases such as Champion and Challenger, plus lineage and access control (EXECUTE, MANAGE). Deploy the Champion alias to Serving, and roll back by repointing the alias to an older version.

How is CI/CD done?

Define the bundle in databricks.yml with DABs; GitHub Actions validates and deploys to staging, runs training and model validation, then promotes the new version to Champion in production. Rollback is repointing the alias to the previous version.

Summary

Databricks with Unity Catalog covers the full MLOps loop: Model Registry, Feature Store, MLflow tracking, DABs-based CI/CD, and Lakehouse Monitoring, with governance applied end to end in production.

📖 Related articles

Databricks Unity Catalog DevSecOps Integration — read article →
Databricks Unity Catalog — read article →
Databricks Unity Catalog Network Segmentation — read article →
Databricks Unity Catalog Disaster Recovery Plan — read article →
Databricks Unity Catalog DevOps Culture — read article →

📚 View all articles →