Databricks Unity Catalog MLOps Workflow: Management

An MLOps workflow on Databricks Unity Catalog combines the Model Registry, Feature Store, MLflow, CI/CD with Databricks Asset Bundles (DABs), and data governance (lineage and access control) to take models to production.

| Component           | Tool                  | Unity Catalog Feature          | Stage                |
|---------------------|-----------------------|--------------------------------|----------------------|
| Data Governance     | Unity Catalog         | Access control, audit, lineage | All stages           |
| Feature Store       | UC Feature Tables     | Feature lookup, online store   | Training + inference |
| Experiment Tracking | MLflow                | Runs, metrics, parameters      | Training             |
| Model Registry      | UC Model Registry     | Versions, aliases, lineage     | Registration         |
| Model Serving       | Serving Endpoints     | Real-time inference            | Deployment           |
| CI/CD               | DABs + GitHub Actions | Bundle deploy                  | Automation           |
| Monitoring          | Lakehouse Monitoring  | Data drift, model quality      | Production           |

Model Training & Registry

# === MLOps with Unity Catalog ===
# Databricks Notebook

import mlflow
from sklearn.ensemble import RandomForestClassifier
from databricks.feature_engineering import FeatureEngineeringClient, FeatureLookup

# Feature Engineering Client
fe = FeatureEngineeringClient()

# Load Features from Unity Catalog.
# labels_df is assumed to be an existing Spark DataFrame with
# user_id, item_id, and target columns.
training_set = fe.create_training_set(
    df=labels_df,
    feature_lookups=[
        FeatureLookup(
            table_name="production.ml.user_features",
            lookup_key="user_id",
        ),
        FeatureLookup(
            table_name="production.ml.item_features",
            lookup_key="item_id",
        ),
    ],
    label="target",
)

# Materialize the joined labels + features as a Spark DataFrame
training_df = training_set.load_df()
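Conceptually, each FeatureLookup joins the label rows to a feature table on its lookup key. The block below is a minimal pure-Python sketch of that idea, not the Feature Engineering client's implementation. The helper `lookup_features`, the sample rows, and the feature columns `tenure_days` and `price` are hypothetical and exist only for illustration.

```python
from typing import Any

Row = dict[str, Any]


def lookup_features(labels: list[Row], feature_table: list[Row], lookup_key: str) -> list[Row]:
    """Left-join each label row to its feature row on lookup_key (illustrative only)."""
    # Index the feature table by its key so each lookup is a dictionary access.
    index = {row[lookup_key]: row for row in feature_table}
    joined = []
    for label_row in labels:
        features = index.get(label_row[lookup_key], {})
        # Label columns win on name clashes; in this sketch an unmatched key
        # simply yields a row with no feature columns.
        joined.append({**features, **label_row})
    return joined


# Hypothetical sample data standing in for labels_df and the two feature tables.
labels = [
    {"user_id": 1, "item_id": 10, "target": 1},
    {"user_id": 2, "item_id": 11, "target": 0},
]
user_features = [{"user_id": 1, "tenure_days": 400}, {"user_id": 2, "tenure_days": 35}]
item_features = [{"item_id": 10, "price": 9.5}, {"item_id": 11, "price": 20.0}]

# Apply the lookups in order, one per feature table.
training_rows = lookup_features(labels, user_features, "user_id")
training_rows = lookup_features(training_rows, item_features, "item_id")
print(training_rows[0])
```

Because the same lookups are recorded with the training set, the model can later be scored from the keys alone, which is what keeps training and inference features consistent.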

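# Train Model
from sklearn.metrics import accuracy_score, f1_score
from sklearn.model_selection import train_test_split

# Register models in Unity Catalog instead of the workspace registry
mlflow.set_registry_uri("databricks-uc")

# Hold out a test split. Assumes the data fits in driver memory,
# the remaining columns are numeric, and the target is binary.
pdf = training_df.toPandas()
X = pdf.drop(columns=["target"])
y = pdf["target"]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

with mlflow.start_run():
    model = RandomForestClassifier(n_estimators=100)
    model.fit(X_train, y_train)

    # Evaluate on the held-out split and log the metrics to the run
    y_pred = model.predict(X_test)
    mlflow.log_metric("accuracy", accuracy_score(y_test, y_pred))
    mlflow.log_metric("f1_score", f1_score(y_test, y_pred))

    # Log Model with Feature Engineering (records the feature lookups with the model)
    fe.log_model(
        model=model,
        artifact_path="model",
        flavor=mlflow.sklearn,
        training_set=training_set,
        registered_model_name="production.ml.churn_model",
    )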

from dataclasses import dataclass

@dataclass
class MLOpsStage:
    stage: str
    tool: str
    uc_feature: str
    code_example: str

stages = [
    MLOpsStage("Feature Engineering",
        "Feature Engineering Client",
        "production.ml.user_features (UC Table)",
        "fe.create_training_set(df, feature_lookups=[...])"),
    MLOpsStage("Experiment Tracking",
        "MLflow",
        "mlflow.log_metric() mlflow.log_param()",
        "with mlflow.start_run(): ..."),
    MLOpsStage("Model Registration",
        "UC Model Registry",
        "production.ml.churn_model (UC Model)",
        "fe.log_model(registered_model_name='catalog.schema.model')"),
    MLOpsStage("Model Alias",
        "MLflow Client",
        "Champion / Challenger / Archived",
        "client.set_registered_model_alias('model', 'Champion', 3)"),
    MLOpsStage("Model Serving",
        "Serving Endpoints",
        "Auto-scale GPU/CPU Endpoint",
        "POST /serving-endpoints/churn-model/invocations"),
]

print("=== MLOps Stages ===")
for s in stages:
    print(f"  [{s.stage}] Tool: {s.tool}")
    print(f"    UC: {s.uc_feature}")
    print(f"    Code: {s.code_example}")
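The Model Serving stage above lists only the invocation path. As a rough sketch of what a client call could look like, the block below builds (but does not send) a POST request using the `dataframe_records` JSON input format. The helper `build_invocation_request`, the token placeholder, and the sample record are hypothetical, and the example assumes the endpoint can look up the remaining features from the keys.

```python
import json
import urllib.request


def build_invocation_request(host: str, endpoint: str, token: str, records: list[dict]) -> urllib.request.Request:
    """Build (but do not send) a POST request for a serving endpoint."""
    url = f"{host.rstrip('/')}/serving-endpoints/{endpoint}/invocations"
    body = json.dumps({"dataframe_records": records}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
    )


# Placeholder host and token; the record carries the lookup keys used in training.
request = build_invocation_request(
    host="https://adb-123456.azuredatabricks.net",
    endpoint="churn-model",
    token="<personal-access-token>",
    records=[{"user_id": 1, "item_id": 10}],
)
print(request.get_method(), request.full_url)
# To send it, pass the request to urllib.request.urlopen and JSON-decode the response body.
```

Keeping request construction separate from sending makes the payload easy to inspect in a notebook before any call reaches the endpoint.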

CI/CD with DABs

# === Databricks Asset Bundles CI/CD ===
# databricks.yml

bundle:
  name: churn-model-pipeline

workspace:
  host: https://adb-123456.azuredatabricks.net

resources:
  jobs:
    training_job:
      name: "Churn Model Training"
      tasks:
        - task_key: train
          notebook_task:
            notebook_path: ./notebooks/train.py
          existing_cluster_id: "0123-456789-abc"
    inference_job:
      name: "Churn Model Batch Inference"
      tasks:
        - task_key: inference
          notebook_task:
            notebook_path: ./notebooks/inference.py

targets:
  dev:
    workspace:
      host: https://adb-dev.azuredatabricks.net
  staging:
    workspace:
      host: https://adb-staging.azuredatabricks.net
  production:
    workspace:
      host: https://adb-prod.azuredatabricks.net
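In the bundle file, settings under a target override the top-level settings when that target is selected with `-t`. The block below is a simplified illustration of that override behavior as a recursive dictionary merge. It is not the Databricks CLI's implementation, and the helper `resolve_target` is hypothetical.

```python
from copy import deepcopy


def resolve_target(bundle: dict, target: str) -> dict:
    """Overlay one target's settings on the top-level settings (simplified)."""

    def merge(base: dict, override: dict) -> dict:
        result = deepcopy(base)
        for key, value in override.items():
            if isinstance(value, dict) and isinstance(result.get(key), dict):
                # Merge nested mappings key by key.
                result[key] = merge(result[key], value)
            else:
                # Scalars and lists from the target replace the top-level value.
                result[key] = deepcopy(value)
        return result

    top_level = {k: v for k, v in bundle.items() if k != "targets"}
    return merge(top_level, bundle["targets"][target])


# A trimmed-down mirror of the databricks.yml structure.
bundle = {
    "bundle": {"name": "churn-model-pipeline"},
    "workspace": {"host": "https://adb-123456.azuredatabricks.net"},
    "targets": {
        "dev": {"workspace": {"host": "https://adb-dev.azuredatabricks.net"}},
        "production": {"workspace": {"host": "https://adb-prod.azuredatabricks.net"}},
    },
}

print(resolve_target(bundle, "production")["workspace"]["host"])
```

The same job definitions are therefore deployed to each workspace, and only the values listed under the chosen target differ.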

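# GitHub Actions workflow

name: Deploy ML Pipeline

on:
  push:
    branches: [main]

jobs:
  deploy:
    runs-on: ubuntu-latest
    # The CLI also needs workspace credentials, for example DATABRICKS_HOST and
    # DATABRICKS_TOKEN supplied as environment variables from repository secrets.
    steps:
      - uses: actions/checkout@v4
      - uses: databricks/setup-cli@main
      - run: databricks bundle validate -t production
      - run: databricks bundle deploy -t production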

from dataclasses import dataclass

@dataclass
class CICDStep:
    step: str
    trigger: str
    action: str
    rollback: str

cicd_steps = [
    CICDStep("Validate Bundle",
        "PR Created",
        "databricks bundle validate -t staging",
        "Block the PR if validation fails"),
    CICDStep("Deploy to Staging",
        "PR Merged to main",
        "databricks bundle deploy -t staging",
        "databricks bundle destroy -t staging"),
    CICDStep("Run Training Job",
        "After Staging Deploy",
        "databricks bundle run -t staging training_job",
        "Use the previous model version"),
    CICDStep("Model Validation",
        "After Training Complete",
        "Compare metrics with the Champion model",
        "Do not promote if metrics are not better"),
    CICDStep("Promote to Production",
        "Validation Pass",
        "client.set_registered_model_alias(name, 'Champion', new_version)",
        "client.set_registered_model_alias(name, 'Champion', old_version)"),
]

print("=== CI/CD Pipeline ===")
for s in cicd_steps:
    print(f"  [{s.step}] Trigger: {s.trigger}")
    print(f"    Action: {s.action}")
    print(f"    Rollback: {s.rollback}")
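The Model Validation step compares a newly trained version against the current Champion before promotion. One way to express that gate is sketched below, assuming higher is better for every tracked metric. The function `should_promote`, the `min_gain` margin, and the metric values are hypothetical and not part of any Databricks or MLflow API.

```python
def should_promote(challenger: dict[str, float], champion: dict[str, float], min_gain: float = 0.0) -> bool:
    """Return True only if the challenger beats the champion on every tracked metric."""
    return all(
        # A metric missing from the challenger counts as a failure.
        challenger.get(name, float("-inf")) > value + min_gain
        for name, value in champion.items()
    )


champion_metrics = {"accuracy": 0.91, "f1_score": 0.84}    # hypothetical values
challenger_metrics = {"accuracy": 0.93, "f1_score": 0.86}  # hypothetical values

if should_promote(challenger_metrics, champion_metrics):
    print("promote: point the Champion alias at the new version")
else:
    print("keep: leave the Champion alias on the current version")
```

Requiring an improvement on every metric is a conservative choice. A pipeline could instead gate on a single primary metric, or raise `min_gain` so that small, noisy differences do not trigger a promotion.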

Monitoring & Governance

# === Lakehouse Monitoring ===

from dataclasses import dataclass

@dataclass
class MonitorMetric:
    metric: str
    source: str
    threshold: str
    action: str

monitor_metrics = [
    MonitorMetric("Model Accuracy",
        "Lakehouse Monitoring (Inference Table)",
        "Drop > 5% from baseline",
        "Alert + Trigger Retraining Job"),
    MonitorMetric("Data Drift (PSI)",
        "Lakehouse Monitoring (Feature Table)",
        "PSI > 0.2 (Significant Drift)",
        "Alert + Review Feature Pipeline"),
    MonitorMetric("Prediction Drift",
        "Inference Log Table",
        "Distribution Change > 10%",
        "Alert + Compare with Ground Truth"),
    MonitorMetric("Latency (Serving)",
        "Serving Endpoint Metrics",
        "P99 > 200ms",
        "Scale up the endpoint or optimize the model"),
    MonitorMetric("Data Quality",
        "Lakehouse Monitoring (Source Table)",
        "Null rate > 5% or schema change",
        "Alert + Block Pipeline"),
    MonitorMetric("Access Audit",
        "Unity Catalog Audit Log",
        "Unauthorized Access Attempt",
        "Alert Security Team"),
]

print("=== Monitoring Metrics ===")
for m in monitor_metrics:
    print(f"  [{m.metric}] Source: {m.source}")
    print(f"    Threshold: {m.threshold}")
    print(f"    Action: {m.action}")
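The data drift row uses the Population Stability Index (PSI) with a 0.2 threshold. For reference, a minimal sketch of how PSI can be computed for one numeric feature follows. It uses equal-width bins derived from the baseline sample. The bin count, the `eps` smoothing value, and the synthetic samples are assumptions for illustration, and Lakehouse Monitoring's own calculation may differ in its details.

```python
import math


def psi(baseline: list[float], current: list[float], bins: int = 10, eps: float = 1e-6) -> float:
    """Population Stability Index between two numeric samples."""
    lo, hi = min(baseline), max(baseline)
    width = (hi - lo) / bins or 1.0  # fall back to 1.0 when the baseline is constant

    def proportions(values: list[float]) -> list[float]:
        counts = [0] * bins
        for v in values:
            # Clamp out-of-range values into the first or last bin.
            idx = min(max(int((v - lo) / width), 0), bins - 1)
            counts[idx] += 1
        # eps keeps empty bins from producing log(0) or a division by zero.
        return [max(c / len(values), eps) for c in counts]

    expected = proportions(baseline)
    actual = proportions(current)
    return sum((a - e) * math.log(a / e) for a, e in zip(actual, expected))


# Synthetic samples: a uniform baseline and the same values shifted upward.
baseline = [i / 100 for i in range(100)]
shifted = [v + 0.3 for v in baseline]

score = psi(baseline, shifted)
print(f"PSI = {score:.3f}", "-> significant drift" if score > 0.2 else "-> stable")
```

Comparing a sample with itself gives a PSI of zero, and the value grows as mass moves between bins, which is why a fixed threshold such as 0.2 can serve as an alert trigger.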

Tips

  • UC: Use Unity Catalog as the single source of truth for data and models.
  • Alias: Use Champion/Challenger aliases instead of stage transitions.
  • DABs: Use Databricks Asset Bundles for infrastructure-as-code CI/CD.
  • Feature Store: Use the UC Feature Store for consistent features.
  • Monitor: Set up Lakehouse Monitoring to check for drift every day.
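With aliases, consumers refer to a model by name and alias rather than by a fixed version number, so a promotion or rollback only moves the alias. The sketch below builds such a reference using MLflow's `models:/<name>@<alias>` URI form. The helper `model_uri` is hypothetical.

```python
def model_uri(full_name: str, alias: str) -> str:
    """Build an MLflow model URI that resolves a Unity Catalog model by alias."""
    if full_name.count(".") != 2:
        raise ValueError("expected a three-level name: catalog.schema.model")
    return f"models:/{full_name}@{alias}"


champion_uri = model_uri("production.ml.churn_model", "Champion")
print(champion_uri)
# In a Databricks notebook, a URI like this can be passed as model_uri to
# fe.score_batch(model_uri=champion_uri, df=...) for batch inference.
```

Because batch jobs resolve the alias at run time, they pick up a newly promoted version without any change to their own code.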

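What is Unity Catalog?

Unity Catalog is the unified governance layer for Databricks. It organizes assets in a three-level Catalog.Schema.Table namespace and provides access control, lineage, audit, Delta Sharing, and cross-workspace access, along with the Model Registry and Feature Store.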
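To make the three-level namespace and access control concrete, the sketch below parses a full table name and renders a `GRANT SELECT` statement for it. The `TableName` class and the group name `data-scientists` are hypothetical, and the statement is only built as a string here, not executed.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TableName:
    catalog: str
    schema: str
    table: str

    @classmethod
    def parse(cls, full_name: str) -> "TableName":
        """Split catalog.schema.table into its three parts, rejecting anything else."""
        parts = full_name.split(".")
        if len(parts) != 3 or not all(parts):
            raise ValueError(f"expected catalog.schema.table, got {full_name!r}")
        return cls(*parts)

    def grant_select(self, principal: str) -> str:
        """Render a GRANT statement giving a principal read access to this table."""
        return f"GRANT SELECT ON TABLE {self.catalog}.{self.schema}.{self.table} TO `{principal}`"


name = TableName.parse("production.ml.user_features")
statement = name.grant_select("data-scientists")  # hypothetical group name
print(statement)
```

In a notebook the resulting statement could be run with `spark.sql(statement)`. Reading the table also requires `USE CATALOG` and `USE SCHEMA` on its parent catalog and schema.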
