SiamCafe.net Blog
Technology

MLOps Pipeline MLOps Workflow

mlops pipeline mlops workflow
MLOps Pipeline MLOps Workflow | SiamCafe Blog
2025-08-17· อ. บอม — SiamCafe.net· 10,141 คำ

MLOps Pipeline

MLOps Pipeline Workflow Data Pipeline Model Training Experiment Tracking Registry CI/CD Deployment Monitoring Retraining Feature Engineering Production

| Stage | Tool | Input | Output | Automation |
| --- | --- | --- | --- | --- |
| Data Ingestion | Airflow / Dagster | Raw data sources | Clean dataset | Scheduled daily |
| Feature Engineering | Feast / Custom | Clean dataset | Feature vectors | Pipeline step |
| Training | PyTorch / sklearn | Features + labels | Trained model | Triggered |
| Evaluation | MLflow / custom | Model + test data | Metrics report | Auto after train |
| Registry | MLflow Registry | Best model | Versioned model | Auto promote |
| Deployment | BentoML / Seldon | Registry model | API endpoint | CI/CD pipeline |
| Monitoring | Evidently / WhyLabs | Predictions + actuals | Drift alerts | Continuous |

Pipeline Implementation

# === MLOps Pipeline with MLflow ===

# pip install mlflow scikit-learn pandas

import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from dataclasses import dataclass
import numpy as np

# mlflow.set_tracking_uri("http://mlflow-server:5000")
# mlflow.set_experiment("churn-prediction")

@dataclass
class ExperimentConfig:
    """Configuration for one tracked training experiment."""

    name: str             # run name logged to the experiment tracker
    model_type: str       # human-readable model family label
    hyperparams: dict     # keyword arguments forwarded to the estimator
    dataset_version: str  # version tag of the training dataset
    feature_count: int    # number of features in that dataset version


# Rows are (name, model_type, hyperparams, dataset_version, feature_count).
_CONFIG_ROWS = [
    ("baseline", "RandomForest",
     {"n_estimators": 100, "max_depth": 10, "min_samples_split": 5},
     "v2.1", 25),
    ("tuned_v1", "RandomForest",
     {"n_estimators": 500, "max_depth": 20, "min_samples_split": 2},
     "v2.1", 25),
    ("feature_v2", "RandomForest",
     {"n_estimators": 300, "max_depth": 15, "min_samples_split": 3},
     "v2.2", 35),
]

# Materialize the experiment matrix from the raw rows.
configs = [ExperimentConfig(*row) for row in _CONFIG_ROWS]

# Training Pipeline
# def train_pipeline(config):
#     with mlflow.start_run(run_name=config.name):
#         # Log parameters
#         mlflow.log_params(config.hyperparams)
#         mlflow.log_param("dataset_version", config.dataset_version)
#         mlflow.log_param("feature_count", config.feature_count)
#
#         # Load and split data
#         X_train, X_test, y_train, y_test = load_data(config.dataset_version)
#
#         # Train model
#         model = RandomForestClassifier(**config.hyperparams)
#         model.fit(X_train, y_train)
#
#         # Evaluate
#         y_pred = model.predict(X_test)
#         metrics = {
#             "accuracy": accuracy_score(y_test, y_pred),
#             "f1": f1_score(y_test, y_pred),
#             "precision": precision_score(y_test, y_pred),
#             "recall": recall_score(y_test, y_pred),
#         }
#         mlflow.log_metrics(metrics)
#
#         # Log model
#         mlflow.sklearn.log_model(model, "model",
#             registered_model_name="churn-predictor")
#
#         return metrics

print("=== Experiment Configs ===")
for cfg in configs:
    # One header line plus two indented detail lines per experiment,
    # emitted as a single newline-joined write (same stdout as three prints).
    print("\n".join((
        f"  [{cfg.name}] Model: {cfg.model_type}",
        f"    Params: {cfg.hyperparams}",
        f"    Dataset: {cfg.dataset_version} | Features: {cfg.feature_count}",
    )))

Model Serving

# === Model Serving with BentoML ===

# bentofile.yaml
# service: "service:svc"
# include:
#   - "*.py"
# python:
#   packages:
#     - scikit-learn
#     - mlflow
#     - pandas
# docker:
#   base_image: python:3.11-slim

# service.py
# import bentoml
# import mlflow
# import pandas as pd
#
# model = mlflow.sklearn.load_model("models:/churn-predictor/Production")
#
# svc = bentoml.Service("churn-predictor")
#
# @svc.api(input=bentoml.io.JSON(), output=bentoml.io.JSON())
# def predict(input_data: dict) -> dict:
#     df = pd.DataFrame([input_data])
#     prediction = model.predict(df)[0]
#     probability = model.predict_proba(df)[0].max()
#     return {
#         "prediction": int(prediction),
#         "probability": float(probability),
#         "model_version": "v2.1",
#     }

# Kubernetes Deployment
# apiVersion: apps/v1
# kind: Deployment
# metadata:
#   name: churn-predictor
# spec:
#   replicas: 3
#   selector:
#     matchLabels:
#       app: churn-predictor
#   template:
#     spec:
#       containers:
#         - name: model
#           image: registry.io/churn-predictor:v2.1
#           ports:
#             - containerPort: 3000
#           resources:
#             requests: { cpu: "500m", memory: "1Gi" }
#             limits: { cpu: "2", memory: "4Gi" }
#           readinessProbe:
#             httpGet: { path: /healthz, port: 3000 }

@dataclass
class ServingConfig:
    """Describes one model-serving approach and its trade-offs."""

    method: str      # serving pattern (REST, gRPC, batch, ...)
    tool: str        # typical tooling used for the pattern
    latency: str     # expected per-request latency range
    throughput: str  # expected sustained throughput
    use_case: str    # scenario the pattern fits best


# Rows are (method, tool, latency, throughput, use_case).
_SERVING_ROWS = [
    ("REST API", "BentoML / FastAPI", "10-50ms", "100-1000 req/s", "Real-time prediction"),
    ("gRPC", "Seldon Core / Triton", "5-20ms", "500-5000 req/s", "High-throughput"),
    ("Batch", "Spark / Airflow", "Minutes-hours", "Millions/batch", "Offline scoring"),
    ("Streaming", "Kafka + Flink", "50-200ms", "10K-100K events/s", "Real-time events"),
    ("Edge", "ONNX Runtime / TFLite", "1-10ms", "Device-limited", "Mobile/IoT"),
]

# Materialize the serving-method catalogue from the raw rows.
methods = [ServingConfig(*row) for row in _SERVING_ROWS]

print("\n=== Serving Methods ===")
for cfg in methods:
    # Emit the three description lines for each serving pattern.
    for line in (
        f"  [{cfg.method}] Tool: {cfg.tool}",
        f"    Latency: {cfg.latency} | Throughput: {cfg.throughput}",
        f"    Use case: {cfg.use_case}",
    ):
        print(line)

Monitoring and Retraining

# === Model Monitoring ===

# Evidently AI — Data Drift Detection
# from evidently.report import Report
# from evidently.metric_preset import DataDriftPreset, TargetDriftPreset
#
# report = Report(metrics=[
#     DataDriftPreset(),
#     TargetDriftPreset(),
# ])
# report.run(reference_data=train_df, current_data=prod_df)
# report.save_html("drift_report.html")

# Monitoring Alerts
# if drift_score > 0.3:
#     trigger_retraining_pipeline()
#     send_alert("Data drift detected", drift_score)

@dataclass
class MonitorMetric:
    """One production-monitoring rule: what to watch and how to respond."""

    metric: str     # quantity being monitored
    threshold: str  # condition that triggers the action
    action: str     # response when the threshold is crossed
    frequency: str  # how often the check runs


# Rows are (metric, threshold, action, frequency).
_MONITOR_ROWS = [
    ("Prediction Accuracy", "< 85% (5% drop from baseline)",
     "Alert team + trigger retraining", "Daily"),
    ("Data Drift Score", "> 0.3 (Evidently drift score)",
     "Investigate features + retrain if needed", "Daily"),
    ("Feature Distribution", "KS test p-value < 0.05",
     "Check data pipeline + source changes", "Daily"),
    ("Prediction Latency", "> 100ms p99",
     "Scale up replicas or optimize model", "Real-time"),
    ("Error Rate", "> 1% of predictions",
     "Rollback to previous model version", "Real-time"),
    ("Model Staleness", "> 30 days since last train",
     "Schedule retraining regardless of metrics", "Weekly check"),
]

# Materialize the monitoring rule set from the raw rows.
monitors = [MonitorMetric(*row) for row in _MONITOR_ROWS]

print("Monitoring Metrics:")
for rule in monitors:
    # Header line with the threshold, then the response and cadence.
    print("\n".join((
        f"  [{rule.metric}] Threshold: {rule.threshold}",
        f"    Action: {rule.action}",
        f"    Frequency: {rule.frequency}",
    )))

# MLOps maturity ladder: level label -> what that level looks like in practice.
maturity = {
    "Level 0 — Manual": "Jupyter notebooks, manual deploy, no monitoring",
    "Level 1 — Pipeline": "Automated training pipeline, manual deploy",
    "Level 2 — CI/CD": "Automated testing + deployment, basic monitoring",
    "Level 3 — Auto Retrain": "Drift detection triggers retraining automatically",
    "Level 4 — Full Auto": "End-to-end automation with feedback loops",
}

# Fix: the header was an f-string with no placeholders (ruff F541);
# a plain string literal produces byte-identical output.
print("\n\nMLOps Maturity:")
for level, description in maturity.items():
    print(f"  [{level}]: {description}")

เคล็ดลับ

MLOps คืออะไร

ML Engineering DevOps Data Engineering Deploy Production Lifecycle Data Feature Training Evaluation Deployment Monitoring Retraining Automation Version Control

MLOps Workflow มีขั้นตอนอะไรบ้าง

Data Ingestion Validation Feature Engineering Training Evaluation Registry Deployment Monitoring Retraining Pipeline Automation CI/CD

ใช้เครื่องมืออะไรบ้าง

MLflow W&B Feast Airflow Dagster PyTorch sklearn BentoML Seldon Evidently WhyLabs GitHub Actions DVC Model Registry

วัดผล MLOps Maturity อย่างไร

Level 0 Manual Level 1 Pipeline Level 2 CI/CD Level 3 Auto Retrain Level 4 Full Automation Monitoring Alert Feedback Loop เป้าหมาย Level 2-3

สรุป

MLOps Pipeline Workflow MLflow BentoML Experiment Tracking Model Registry CI/CD Deployment Monitoring Evidently Data Drift Retraining Maturity Production

📖 บทความที่เกี่ยวข้อง

- WordPress Headless MLOps Workflow — อ่านบทความ →
- Tailwind CSS v4 MLOps Workflow — อ่านบทความ →
- gRPC Protobuf MLOps Workflow — อ่านบทความ →
- DALL-E API MLOps Workflow — อ่านบทความ →
- MLOps Pipeline Freelance IT Career — อ่านบทความ →

📚 ดูบทความทั้งหมด →