MLOps Pipeline MLOps Workflow — สร้าง ML
MLOps Pipeline
MLOps Pipeline Workflow Data Pipeline Model Training Experiment Tracking Registry CI/CD Deployment Monitoring Retraining Feature Engineering Production
| Stage | Tool | Input | Output | Automation |
|---|---|---|---|---|
| Data Ingestion | Airflow / Dagster | Raw data sources | Clean dataset | Scheduled daily |
| Feature Engineering | Feast / Custom | Clean dataset | Feature vectors | Pipeline step |
| Training | PyTorch / sklearn | Features + labels | Trained model | Triggered |
| Evaluation | MLflow / custom | Model + test data | Metrics report | Auto after train |
| Registry | MLflow Registry | Best model | Versioned model | Auto promote |
| Deployment | BentoML / Seldon | Registry model | API endpoint | CI/CD pipeline |
| Monitoring | Evidently / WhyLabs | Predictions + actuals | Drift alerts | Continuous |
Pipeline Implementation
# === MLOps Pipeline with MLflow ===
# pip install mlflow scikit-learn pandas
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from dataclasses import dataclass
import numpy as np
# mlflow.set_tracking_uri("http://mlflow-server:5000")
# mlflow.set_experiment("churn-prediction")
@dataclass
class ExperimentConfig:
    """Settings describing a single training experiment.

    Attributes:
        name: Label identifying the experiment (e.g. ``"baseline"``).
        model_type: Name of the model family (e.g. ``"RandomForest"``).
        hyperparams: Hyperparameter names mapped to their values.
        dataset_version: Version tag of the dataset (e.g. ``"v2.1"``).
        feature_count: Number of input features.
    """

    name: str
    model_type: str
    hyperparams: dict
    dataset_version: str
    feature_count: int
# Experiment definitions: a baseline, a tuned variant on the same dataset,
# and a run on dataset v2.2 with 35 features.
configs = [
    ExperimentConfig(
        name="baseline",
        model_type="RandomForest",
        hyperparams={"n_estimators": 100, "max_depth": 10, "min_samples_split": 5},
        dataset_version="v2.1",
        feature_count=25,
    ),
    ExperimentConfig(
        name="tuned_v1",
        model_type="RandomForest",
        hyperparams={"n_estimators": 500, "max_depth": 20, "min_samples_split": 2},
        dataset_version="v2.1",
        feature_count=25,
    ),
    ExperimentConfig(
        name="feature_v2",
        model_type="RandomForest",
        hyperparams={"n_estimators": 300, "max_depth": 15, "min_samples_split": 3},
        dataset_version="v2.2",
        feature_count=35,
    ),
]
# Training Pipeline
# def train_pipeline(config):
#     with mlflow.start_run(run_name=config.name):
#         # Log parameters
#         mlflow.log_params(config.hyperparams)
#         mlflow.log_param("dataset_version", config.dataset_version)
#         mlflow.log_param("feature_count", config.feature_count)
#
#         # Load and split data
#         X_train, X_test, y_train, y_test = load_data(config.dataset_version)
#
#         # Train model
#         model = RandomForestClassifier(**config.hyperparams)
#         model.fit(X_train, y_train)
#
#         # Evaluate
#         y_pred = model.predict(X_test)
#         metrics = {
#             "accuracy": accuracy_score(y_test, y_pred),
#             "f1": f1_score(y_test, y_pred),
#             "precision": precision_score(y_test, y_pred),
#             "recall": recall_score(y_test, y_pred),
#         }
#         mlflow.log_metrics(metrics)
#
#         # Log model
#         mlflow.sklearn.log_model(model, "model",
#                                  registered_model_name="churn-predictor")
#
#         return metrics
# Print a short summary of every configured experiment.
print("=== Experiment Configs ===")
for c in configs:
    print(f" [{c.name}] Model: {c.model_type}")
    print(f" Params: {c.hyperparams}")
    print(f" Dataset: {c.dataset_version} | Features: {c.feature_count}")
Model Serving
# === Model Serving with BentoML ===
# bentofile.yaml
# service: "service:svc"
# include:
#   - "*.py"
# python:
#   packages:
#     - scikit-learn
#     - mlflow
#     - pandas
# docker:
#   base_image: python:3.11-slim
# service.py
# import bentoml
# import mlflow
# import pandas as pd
#
# model = mlflow.sklearn.load_model("models:/churn-predictor/Production")
#
# svc = bentoml.Service("churn-predictor")
#
# @svc.api(input=bentoml.io.JSON(), output=bentoml.io.JSON())
# def predict(input_data: dict) -> dict:
#     df = pd.DataFrame([input_data])
#     prediction = model.predict(df)[0]
#     probability = model.predict_proba(df)[0].max()
#     return {
#         "prediction": int(prediction),
#         "probability": float(probability),
#         "model_version": "v2.1",
#     }
# Kubernetes Deployment
# apiVersion: apps/v1
# kind: Deployment
# metadata:
#   name: churn-predictor
# spec:
#   replicas: 3
#   selector:
#     matchLabels:
#       app: churn-predictor
#   template:
#     metadata:
#       labels:
#         app: churn-predictor
#     spec:
#       containers:
#         - name: model
#           image: registry.io/churn-predictor:v2.1
#           ports:
#             - containerPort: 3000
#           resources:
#             requests: { cpu: "500m", memory: "1Gi" }
#             limits: { cpu: "2", memory: "4Gi" }
#           readinessProbe:
#             httpGet: { path: /healthz, port: 3000 }
@dataclass
class ServingConfig:
    """Descriptive record for one way of serving a model.

    All fields are free-form display strings; nothing here is parsed.

    Attributes:
        method: Serving approach (e.g. ``"REST API"``).
        tool: Tooling commonly used for that approach.
        latency: Indicative latency range, as text.
        throughput: Indicative throughput range, as text.
        use_case: Typical scenario for this approach.
    """

    method: str
    tool: str
    latency: str
    throughput: str
    use_case: str
# Catalogue of serving approaches with indicative latency/throughput figures.
methods = [
    ServingConfig("REST API", "BentoML / FastAPI", "10-50ms", "100-1000 req/s", "Real-time prediction"),
    ServingConfig("gRPC", "Seldon Core / Triton", "5-20ms", "500-5000 req/s", "High-throughput"),
    ServingConfig("Batch", "Spark / Airflow", "Minutes-hours", "Millions/batch", "Offline scoring"),
    ServingConfig("Streaming", "Kafka + Flink", "50-200ms", "10K-100K events/s", "Real-time events"),
    ServingConfig("Edge", "ONNX Runtime / TFLite", "1-10ms", "Device-limited", "Mobile/IoT"),
]

# Print one summary entry per serving method.
print("\n=== Serving Methods ===")
for s in methods:
    print(f" [{s.method}] Tool: {s.tool}")
    print(f" Latency: {s.latency} | Throughput: {s.throughput}")
    print(f" Use case: {s.use_case}")
Monitoring and Retraining
# === Model Monitoring ===
# Evidently AI — Data Drift Detection
# from evidently.report import Report
# from evidently.metric_preset import DataDriftPreset, TargetDriftPreset
#
# report = Report(metrics=[
#     DataDriftPreset(),
#     TargetDriftPreset(),
# ])
# report.run(reference_data=train_df, current_data=prod_df)
# report.save_html("drift_report.html")
# Monitoring Alerts
# if drift_score > 0.3:
#     trigger_retraining_pipeline()
#     send_alert("Data drift detected", drift_score)
@dataclass
class MonitorMetric:
    """Descriptive record for one production monitoring check.

    All fields are free-form display strings; nothing here is evaluated.

    Attributes:
        metric: Name of the monitored quantity.
        threshold: Condition that signals a problem, as text.
        action: What to do when the threshold is crossed.
        frequency: How often the check runs.
    """

    metric: str
    threshold: str
    action: str
    frequency: str
# Monitoring checks with their alert thresholds and follow-up actions.
monitors = [
    MonitorMetric("Prediction Accuracy", "< 85% (5% drop from baseline)",
                  "Alert team + trigger retraining", "Daily"),
    MonitorMetric("Data Drift Score", "> 0.3 (Evidently drift score)",
                  "Investigate features + retrain if needed", "Daily"),
    MonitorMetric("Feature Distribution", "KS test p-value < 0.05",
                  "Check data pipeline + source changes", "Daily"),
    MonitorMetric("Prediction Latency", "> 100ms p99",
                  "Scale up replicas or optimize model", "Real-time"),
    MonitorMetric("Error Rate", "> 1% of predictions",
                  "Rollback to previous model version", "Real-time"),
    MonitorMetric("Model Staleness", "> 30 days since last train",
                  "Schedule retraining regardless of metrics", "Weekly check"),
]

# Print one summary entry per monitoring check.
print("Monitoring Metrics:")
for m in monitors:
    print(f" [{m.metric}] Threshold: {m.threshold}")
    print(f" Action: {m.action}")
    print(f" Frequency: {m.frequency}")
# MLOps maturity levels mapped to a one-line description, in ascending order.
maturity = {
    "Level 0 — Manual": "Jupyter notebooks, manual deploy, no monitoring",
    "Level 1 — Pipeline": "Automated training pipeline, manual deploy",
    "Level 2 — CI/CD": "Automated testing + deployment, basic monitoring",
    "Level 3 — Auto Retrain": "Drift detection triggers retraining automatically",
    "Level 4 — Full Auto": "End-to-end automation with feedback loops",
}

# Plain string: the heading has no placeholders, so no f-prefix is needed.
print("\n\nMLOps Maturity:")
for k, v in maturity.items():
    print(f" [{k}]: {v}")
เคล็ดลับ
- MLflow: ใช้ MLflow Track ทุก Experiment ตั้งแต่วันแรก
- Version: ทำ Version Control ทั้ง Code, Data และ Model
- Monitor: ติดตาม Data Drift และ Model Performance ใน Production
- Automate: เริ่มจาก Level 1 แล้วค่อยๆเพิ่ม Automation
- Test: เขียน Unit Test สำหรับ Data Pipeline และ Feature Engineering
MLOps คืออะไร
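MLOps คือแนวปฏิบัติที่ผสาน ML Engineering, DevOps และ Data Engineering เข้าด้วยกัน เพื่อ Deploy โมเดลขึ้น Production และดูแลทั้ง Lifecycle ตั้งแต่ Data, Feature, Training, Evaluation, Deployment, Monitoring ไปจนถึง Retraining โดยอาศัย Automation และ Version Control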