SiamCafe · Blog
MLOps Pipeline MLOps Workflow — สร้าง ML
บทความ

MLOps Pipeline MLOps Workflow — สร้าง ML

เผยแพร่ 28 พฤษภาคม 2569

MLOps Pipeline

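MLOps Pipeline คือ Workflow ที่เชื่อมทุกขั้นตอนของงาน ML เข้าด้วยกัน ตั้งแต่ Data Pipeline, Feature Engineering, Model Training, Experiment Tracking, Model Registry, CI/CD, Deployment, Monitoring ไปจนถึง Retraining เพื่อนำโมเดลขึ้นสู่ Production อย่างเป็นระบบ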

| Stage | Tool | Input | Output | Automation |
|---|---|---|---|---|
| Data Ingestion | Airflow / Dagster | Raw data sources | Clean dataset | Scheduled daily |
| Feature Engineering | Feast / Custom | Clean dataset | Feature vectors | Pipeline step |
| Training | PyTorch / sklearn | Features + labels | Trained model | Triggered |
| Evaluation | MLflow / custom | Model + test data | Metrics report | Auto after train |
| Registry | MLflow Registry | Best model | Versioned model | Auto promote |
| Deployment | BentoML / Seldon | Registry model | API endpoint | CI/CD pipeline |
| Monitoring | Evidently / WhyLabs | Predictions + actuals | Drift alerts | Continuous |

Pipeline Implementation

# === MLOps Pipeline with MLflow ===

# pip install mlflow scikit-learn pandas

import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from dataclasses import dataclass
import numpy as np

# mlflow.set_tracking_uri("http://mlflow-server:5000")
# mlflow.set_experiment("churn-prediction")

@dataclass
class ExperimentConfig:
    """Describe one experiment run: model family, hyperparameters and data version.

    Fields are declared in positional-constructor order, so
    ``ExperimentConfig(name, model_type, hyperparams, dataset_version,
    feature_count)`` and the keyword form below build equal objects.
    """

    name: str             # run label (the commented-out pipeline uses it as the MLflow run_name)
    model_type: str       # model family label, e.g. "RandomForest"
    hyperparams: dict     # estimator kwargs (the commented-out pipeline unpacks them into RandomForestClassifier)
    dataset_version: str  # dataset version tag the run is meant to train on
    feature_count: int    # number of features in that dataset version


# Grid of experiments to compare; the later ones vary hyperparameters and dataset version.
configs = [
    ExperimentConfig(
        name="baseline",
        model_type="RandomForest",
        hyperparams={"n_estimators": 100, "max_depth": 10, "min_samples_split": 5},
        dataset_version="v2.1",
        feature_count=25,
    ),
    ExperimentConfig(
        name="tuned_v1",
        model_type="RandomForest",
        hyperparams={"n_estimators": 500, "max_depth": 20, "min_samples_split": 2},
        dataset_version="v2.1",
        feature_count=25,
    ),
    ExperimentConfig(
        name="feature_v2",
        model_type="RandomForest",
        hyperparams={"n_estimators": 300, "max_depth": 15, "min_samples_split": 3},
        dataset_version="v2.2",
        feature_count=35,
    ),
]

# Training Pipeline
# def train_pipeline(config):
#     with mlflow.start_run(run_name=config.name):
#         # Log parameters
#         mlflow.log_params(config.hyperparams)
#         mlflow.log_param("dataset_version", config.dataset_version)
#         mlflow.log_param("feature_count", config.feature_count)
#
#         # Load and split data
#         X_train, X_test, y_train, y_test = load_data(config.dataset_version)
#
#         # Train model
#         model = RandomForestClassifier(**config.hyperparams)
#         model.fit(X_train, y_train)
#
#         # Evaluate
#         y_pred = model.predict(X_test)
#         metrics = {
#             "accuracy": accuracy_score(y_test, y_pred),
#             "f1": f1_score(y_test, y_pred),
#             "precision": precision_score(y_test, y_pred),
#             "recall": recall_score(y_test, y_pred),
#         }
#         mlflow.log_metrics(metrics)
#
#         # Log model
#         mlflow.sklearn.log_model(model, "model",
#             registered_model_name="churn-predictor")
#
#         return metrics

# Echo every experiment definition so the grid can be eyeballed before training.
print("=== Experiment Configs ===")
for cfg in configs:
    summary = (
        f"  [{cfg.name}] Model: {cfg.model_type}\n"
        f"    Params: {cfg.hyperparams}\n"
        f"    Dataset: {cfg.dataset_version} | Features: {cfg.feature_count}"
    )
    print(summary)

Model Serving

# === Model Serving with BentoML ===

# bentofile.yaml
# service: "service:svc"
# include:
#   - "*.py"
# python:
#   packages:
#     - scikit-learn
#     - mlflow
#     - pandas
# docker:
#   base_image: python:3.11-slim

# service.py
# import bentoml
# import mlflow
# import pandas as pd
#
# model = mlflow.sklearn.load_model("models:/churn-predictor/Production")
#
# svc = bentoml.Service("churn-predictor")
#
# @svc.api(input=bentoml.io.JSON(), output=bentoml.io.JSON())
# def predict(input_data: dict) -> dict:
#     df = pd.DataFrame([input_data])
#     prediction = model.predict(df)[0]
#     probability = model.predict_proba(df)[0].max()
#     return {
#         "prediction": int(prediction),
#         "probability": float(probability),
#         "model_version": "v2.1",
#     }

# Kubernetes Deployment
# apiVersion: apps/v1
# kind: Deployment
# metadata:
#   name: churn-predictor
# spec:
#   replicas: 3
#   selector:
#     matchLabels:
#       app: churn-predictor
#   template:
#     metadata:
#       labels:
#         app: churn-predictor
#     spec:
#       containers:
#         - name: model
#           image: registry.io/churn-predictor:v2.1
#           ports:
#             - containerPort: 3000
#           resources:
#             requests: { cpu: "500m", memory: "1Gi" }
#             limits: { cpu: "2", memory: "4Gi" }
#           readinessProbe:
#             httpGet: { path: /healthz, port: 3000 }

@dataclass
class ServingConfig:
    """One row of the model-serving comparison.

    Every field is a display string. Latency and throughput are indicative
    ranges written as text (e.g. "10-50ms"), not parsed numbers.
    """

    method: str      # serving style, e.g. "REST API" or "Batch"
    tool: str        # frameworks commonly used for that style
    latency: str     # indicative latency range, as text
    throughput: str  # indicative throughput range, as text
    use_case: str    # workload the style fits


# Serving styles compared in this section.
methods = [
    ServingConfig(
        method="REST API",
        tool="BentoML / FastAPI",
        latency="10-50ms",
        throughput="100-1000 req/s",
        use_case="Real-time prediction",
    ),
    ServingConfig(
        method="gRPC",
        tool="Seldon Core / Triton",
        latency="5-20ms",
        throughput="500-5000 req/s",
        use_case="High-throughput",
    ),
    ServingConfig(
        method="Batch",
        tool="Spark / Airflow",
        latency="Minutes-hours",
        throughput="Millions/batch",
        use_case="Offline scoring",
    ),
    ServingConfig(
        method="Streaming",
        tool="Kafka + Flink",
        latency="50-200ms",
        throughput="10K-100K events/s",
        use_case="Real-time events",
    ),
    ServingConfig(
        method="Edge",
        tool="ONNX Runtime / TFLite",
        latency="1-10ms",
        throughput="Device-limited",
        use_case="Mobile/IoT",
    ),
]

# Show each serving option as a three-line summary under one header.
print("\n=== Serving Methods ===")
for option in methods:
    rows = (
        f"  [{option.method}] Tool: {option.tool}",
        f"    Latency: {option.latency} | Throughput: {option.throughput}",
        f"    Use case: {option.use_case}",
    )
    print("\n".join(rows))

Monitoring and Retraining

# === Model Monitoring ===

# Evidently AI — Data Drift Detection
# from evidently.report import Report
# from evidently.metric_preset import DataDriftPreset, TargetDriftPreset
#
# report = Report(metrics=[
#     DataDriftPreset(),
#     TargetDriftPreset(),
# ])
# report.run(reference_data=train_df, current_data=prod_df)
# report.save_html("drift_report.html")

# Monitoring Alerts
# if drift_score > 0.3:
#     trigger_retraining_pipeline()
#     send_alert("Data drift detected", drift_score)

@dataclass
class MonitorMetric:
    """A production monitoring rule: what to watch, when it fires, and the response.

    ``threshold`` and ``frequency`` are human-readable text, not parsed values.
    """

    metric: str     # signal being watched
    threshold: str  # trigger condition, as display text
    action: str     # response when the condition is met
    frequency: str  # how often the check runs


# Monitoring rules for the deployed model.
monitors = [
    MonitorMetric(
        metric="Prediction Accuracy",
        threshold="< 85% (5% drop from baseline)",
        action="Alert team + trigger retraining",
        frequency="Daily",
    ),
    MonitorMetric(
        metric="Data Drift Score",
        threshold="> 0.3 (Evidently drift score)",
        action="Investigate features + retrain if needed",
        frequency="Daily",
    ),
    MonitorMetric(
        metric="Feature Distribution",
        threshold="KS test p-value < 0.05",
        action="Check data pipeline + source changes",
        frequency="Daily",
    ),
    MonitorMetric(
        metric="Prediction Latency",
        threshold="> 100ms p99",
        action="Scale up replicas or optimize model",
        frequency="Real-time",
    ),
    MonitorMetric(
        metric="Error Rate",
        threshold="> 1% of predictions",
        action="Rollback to previous model version",
        frequency="Real-time",
    ),
    MonitorMetric(
        metric="Model Staleness",
        threshold="> 30 days since last train",
        action="Schedule retraining regardless of metrics",
        frequency="Weekly check",
    ),
]

# List every monitoring rule; sep="\n" places each detail on its own line.
print("Monitoring Metrics:")
for rule in monitors:
    print(
        f"  [{rule.metric}] Threshold: {rule.threshold}",
        f"    Action: {rule.action}",
        f"    Frequency: {rule.frequency}",
        sep="\n",
    )

# MLOps maturity ladder, lowest to highest.
# Dict insertion order keeps the levels in that order when printed.
maturity = {
    "Level 0 — Manual": "Jupyter notebooks, manual deploy, no monitoring",
    "Level 1 — Pipeline": "Automated training pipeline, manual deploy",
    "Level 2 — CI/CD": "Automated testing + deployment, basic monitoring",
    "Level 3 — Auto Retrain": "Drift detection triggers retraining automatically",
    "Level 4 — Full Auto": "End-to-end automation with feedback loops",
}

# Plain literal: the header has no placeholders, so an f-prefix would be a no-op (ruff F541).
print("\n\nMLOps Maturity:")
for level, description in maturity.items():
    print(f"  [{level}]: {description}")

เคล็ดลับ

  • MLflow: ใช้ MLflow Track ทุก Experiment ตั้งแต่วันแรก
  • Version: ใช้ Version Control กับทั้ง Code, Data และ Model
  • Monitor: ติดตาม Data Drift และ Model Performance ใน Production
  • Automate: เริ่มจาก Level 1 แล้วค่อยๆเพิ่ม Automation
  • Test: เขียน Unit Test สำหรับ Data Pipeline และ Feature Engineering

MLOps คืออะไร

MLOps คือการผสาน ML Engineering, DevOps และ Data Engineering เข้าด้วยกัน เพื่อ Deploy โมเดลขึ้น Production และดูแลตลอด Lifecycle ตั้งแต่ Data, Feature, Training, Evaluation, Deployment, Monitoring ไปจนถึง Retraining โดยอาศัย Automation และ Version Control