MLOps Pipeline
MLOps pipeline workflow: data pipeline → feature engineering → model training → experiment tracking → model registry → CI/CD → deployment → monitoring → retraining, feeding back into production.
| Stage | Tool | Input | Output | Automation |
|---|---|---|---|---|
| Data Ingestion | Airflow / Dagster | Raw data sources | Clean dataset | Scheduled daily |
| Feature Engineering | Feast / Custom | Clean dataset | Feature vectors | Pipeline step |
| Training | PyTorch / sklearn | Features + labels | Trained model | Triggered |
| Evaluation | MLflow / custom | Model + test data | Metrics report | Auto after train |
| Registry | MLflow Registry | Best model | Versioned model | Auto promote |
| Deployment | BentoML / Seldon | Registry model | API endpoint | CI/CD pipeline |
| Monitoring | Evidently / WhyLabs | Predictions + actuals | Drift alerts | Continuous |
Pipeline Implementation
# === MLOps Pipeline with MLflow ===
# pip install mlflow scikit-learn pandas
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from dataclasses import dataclass
import numpy as np
# mlflow.set_tracking_uri("http://mlflow-server:5000")
# mlflow.set_experiment("churn-prediction")
@dataclass
class ExperimentConfig:
    """One tracked training run: model family, hyperparameters, and data lineage."""

    name: str             # run name shown in the experiment tracker
    model_type: str       # model family label, e.g. "RandomForest"
    hyperparams: dict     # keyword arguments forwarded to the estimator
    dataset_version: str  # version tag of the training dataset
    feature_count: int    # number of input features in that dataset version


# (name, hyperparams, dataset_version, feature_count) — every run here is RandomForest.
_EXPERIMENT_SPECS = (
    ("baseline",
     {"n_estimators": 100, "max_depth": 10, "min_samples_split": 5}, "v2.1", 25),
    ("tuned_v1",
     {"n_estimators": 500, "max_depth": 20, "min_samples_split": 2}, "v2.1", 25),
    ("feature_v2",
     {"n_estimators": 300, "max_depth": 15, "min_samples_split": 3}, "v2.2", 35),
)
configs = [
    ExperimentConfig(run_name, "RandomForest", params, data_version, n_features)
    for run_name, params, data_version, n_features in _EXPERIMENT_SPECS
]
# Training Pipeline
# def train_pipeline(config):
# with mlflow.start_run(run_name=config.name):
# # Log parameters
# mlflow.log_params(config.hyperparams)
# mlflow.log_param("dataset_version", config.dataset_version)
# mlflow.log_param("feature_count", config.feature_count)
#
# # Load and split data
# X_train, X_test, y_train, y_test = load_data(config.dataset_version)
#
# # Train model
# model = RandomForestClassifier(**config.hyperparams)
# model.fit(X_train, y_train)
#
# # Evaluate
# y_pred = model.predict(X_test)
# metrics = {
# "accuracy": accuracy_score(y_test, y_pred),
# "f1": f1_score(y_test, y_pred),
# "precision": precision_score(y_test, y_pred),
# "recall": recall_score(y_test, y_pred),
# }
# mlflow.log_metrics(metrics)
#
# # Log model
# mlflow.sklearn.log_model(model, "model",
# registered_model_name="churn-predictor")
#
# return metrics
# Summarize every configured experiment on stdout.
print("=== Experiment Configs ===")
for cfg in configs:
    summary = (
        f" [{cfg.name}] Model: {cfg.model_type}\n"
        f" Params: {cfg.hyperparams}\n"
        f" Dataset: {cfg.dataset_version} | Features: {cfg.feature_count}"
    )
    print(summary)
Model Serving
# === Model Serving with BentoML ===
# bentofile.yaml
# service: "service:svc"
# include:
# - "*.py"
# python:
# packages:
# - scikit-learn
# - mlflow
# - pandas
# docker:
# base_image: python:3.11-slim
# service.py
# import bentoml
# import mlflow
# import pandas as pd
#
# model = mlflow.sklearn.load_model("models:/churn-predictor/Production")
#
# svc = bentoml.Service("churn-predictor")
#
# @svc.api(input=bentoml.io.JSON(), output=bentoml.io.JSON())
# def predict(input_data: dict) -> dict:
# df = pd.DataFrame([input_data])
# prediction = model.predict(df)[0]
# probability = model.predict_proba(df)[0].max()
# return {
# "prediction": int(prediction),
# "probability": float(probability),
# "model_version": "v2.1",
# }
# Kubernetes Deployment
# apiVersion: apps/v1
# kind: Deployment
# metadata:
# name: churn-predictor
# spec:
# replicas: 3
# selector:
# matchLabels:
# app: churn-predictor
# template:
# spec:
# containers:
# - name: model
# image: registry.io/churn-predictor:v2.1
# ports:
# - containerPort: 3000
# resources:
# requests: { cpu: "500m", memory: "1Gi" }
# limits: { cpu: "2", memory: "4Gi" }
# readinessProbe:
# httpGet: { path: /healthz, port: 3000 }
@dataclass
class ServingConfig:
    """One model-serving option with its typical latency/throughput envelope."""

    method: str      # serving style, e.g. "REST API"
    tool: str        # representative tooling for this style
    latency: str     # typical end-to-end latency range
    throughput: str  # typical sustained throughput
    use_case: str    # workload this style fits best


# Rows mirror the ServingConfig field order: method, tool, latency, throughput, use_case.
_SERVING_ROWS = (
    ("REST API", "BentoML / FastAPI", "10-50ms", "100-1000 req/s", "Real-time prediction"),
    ("gRPC", "Seldon Core / Triton", "5-20ms", "500-5000 req/s", "High-throughput"),
    ("Batch", "Spark / Airflow", "Minutes-hours", "Millions/batch", "Offline scoring"),
    ("Streaming", "Kafka + Flink", "50-200ms", "10K-100K events/s", "Real-time events"),
    ("Edge", "ONNX Runtime / TFLite", "1-10ms", "Device-limited", "Mobile/IoT"),
)
methods = [ServingConfig(*row) for row in _SERVING_ROWS]
# Print the serving-method comparison table.
print("\n=== Serving Methods ===")
for option in methods:
    rows = (
        f" [{option.method}] Tool: {option.tool}",
        f" Latency: {option.latency} | Throughput: {option.throughput}",
        f" Use case: {option.use_case}",
    )
    print("\n".join(rows))
Monitoring and Retraining
# === Model Monitoring ===
# Evidently AI — Data Drift Detection
# from evidently.report import Report
# from evidently.metric_preset import DataDriftPreset, TargetDriftPreset
#
# report = Report(metrics=[
# DataDriftPreset(),
# TargetDriftPreset(),
# ])
# report.run(reference_data=train_df, current_data=prod_df)
# report.save_html("drift_report.html")
# Monitoring Alerts
# if drift_score > 0.3:
# trigger_retraining_pipeline()
# send_alert("Data drift detected", drift_score)
@dataclass
class MonitorMetric:
    """A production monitoring signal with its alert threshold and response."""

    metric: str     # what is being watched
    threshold: str  # condition that triggers the action
    action: str     # operational response when the threshold is breached
    frequency: str  # how often the check runs


# Rows mirror the MonitorMetric field order: metric, threshold, action, frequency.
_MONITOR_ROWS = (
    ("Prediction Accuracy", "< 85% (5% drop from baseline)",
     "Alert team + trigger retraining", "Daily"),
    ("Data Drift Score", "> 0.3 (Evidently drift score)",
     "Investigate features + retrain if needed", "Daily"),
    ("Feature Distribution", "KS test p-value < 0.05",
     "Check data pipeline + source changes", "Daily"),
    ("Prediction Latency", "> 100ms p99",
     "Scale up replicas or optimize model", "Real-time"),
    ("Error Rate", "> 1% of predictions",
     "Rollback to previous model version", "Real-time"),
    ("Model Staleness", "> 30 days since last train",
     "Schedule retraining regardless of metrics", "Weekly check"),
)
monitors = [MonitorMetric(*row) for row in _MONITOR_ROWS]
# Print each monitoring check with its threshold, response, and cadence.
print("Monitoring Metrics:")
for check in monitors:
    print(
        f" [{check.metric}] Threshold: {check.threshold}\n"
        f" Action: {check.action}\n"
        f" Frequency: {check.frequency}"
    )
# MLOps maturity ladder: each level adds more automation and feedback over the last.
maturity = {
    "Level 0 — Manual": "Jupyter notebooks, manual deploy, no monitoring",
    "Level 1 — Pipeline": "Automated training pipeline, manual deploy",
    "Level 2 — CI/CD": "Automated testing + deployment, basic monitoring",
    "Level 3 — Auto Retrain": "Drift detection triggers retraining automatically",
    "Level 4 — Full Auto": "End-to-end automation with feedback loops",
}
# Plain string literal: the original used an f-string with no placeholders (ruff F541).
print("\n\nMLOps Maturity:")
for level, description in maturity.items():
    print(f" [{level}]: {description}")
เคล็ดลับ
- MLflow: ใช้ MLflow Track ทุก Experiment ตั้งแต่วันแรก
- Version: Version Control ทั้ง Code Data Model
- Monitor: ติดตาม Data Drift และ Model Performance ใน Production
- Automate: เริ่มจาก Level 1 แล้วค่อยๆเพิ่ม Automation
- Test: เขียน Unit Test สำหรับ Data Pipeline และ Feature Engineering
MLOps คืออะไร
ML Engineering DevOps Data Engineering Deploy Production Lifecycle Data Feature Training Evaluation Deployment Monitoring Retraining Automation Version Control
MLOps Workflow มีขั้นตอนอะไรบ้าง
Data Ingestion Validation Feature Engineering Training Evaluation Registry Deployment Monitoring Retraining Pipeline Automation CI/CD
ใช้เครื่องมืออะไรบ้าง
MLflow W&B Feast Airflow Dagster PyTorch sklearn BentoML Seldon Evidently WhyLabs GitHub Actions DVC Model Registry
วัดผล MLOps Maturity อย่างไร
Level 0 Manual Level 1 Pipeline Level 2 CI/CD Level 3 Auto Retrain Level 4 Full Automation Monitoring Alert Feedback Loop เป้าหมาย Level 2-3
สรุป
MLOps Pipeline Workflow MLflow BentoML Experiment Tracking Model Registry CI/CD Deployment Monitoring Evidently Data Drift Retraining Maturity Production
