What Is an MLOps Pipeline?
MLOps (Machine Learning Operations) is a set of practices that combines ML development (ML) with IT operations (Ops) to automate and standardize the ML lifecycle, from data preparation, model training, and evaluation through deployment and monitoring in production.
An MLOps pipeline typically consists of: a Data Pipeline handling data ingestion, transformation, and feature engineering; a Training Pipeline automating model training, hyperparameter tuning, and experiment tracking; an Evaluation Pipeline testing model quality before deployment; a Deployment Pipeline providing CI/CD for model deployment; and a Monitoring Pipeline tracking model performance and data drift in production.
MLOps trends from Tech Conference 2026 include LLMOps (managing Large Language Models in production), Feature Stores as a Service (e.g., Tecton, Feast, Databricks Feature Store), ML Platform Engineering (building internal ML platforms for data scientists), GPU Infrastructure Management (operating GPU clusters for training and inference), and AI Governance frameworks for responsible AI.
Building an MLOps Pipeline from Scratch
Steps to build the pipeline
#!/usr/bin/env python3
# mlops_pipeline.py — MLOps Pipeline Framework
import json
import logging
import hashlib
from datetime import datetime
from typing import Dict, List, Optional
from dataclasses import dataclass, field

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("mlops")

@dataclass
class PipelineStep:
    name: str
    status: str = "pending"
    start_time: Optional[str] = None
    end_time: Optional[str] = None
    metrics: Dict = field(default_factory=dict)
    artifacts: List[str] = field(default_factory=list)

class MLOpsPipeline:
    def __init__(self, name, version="1.0"):
        self.name = name
        self.version = version
        self.steps: List[PipelineStep] = []
        self.run_id = hashlib.md5(
            f"{name}-{datetime.utcnow().isoformat()}".encode()
        ).hexdigest()[:12]

    def add_step(self, name):
        step = PipelineStep(name=name)
        self.steps.append(step)
        return step

    def run_step(self, step_name, func, **kwargs):
        step = next((s for s in self.steps if s.name == step_name), None)
        if not step:
            step = self.add_step(step_name)
        step.status = "running"
        step.start_time = datetime.utcnow().isoformat()
        logger.info(f"[{self.run_id}] Running: {step_name}")
        try:
            result = func(**kwargs)
            step.status = "completed"
            step.metrics = result.get("metrics", {})
            step.artifacts = result.get("artifacts", [])
        except Exception as e:
            step.status = "failed"
            step.metrics = {"error": str(e)}
            logger.error(f"[{self.run_id}] Failed: {step_name} — {e}")
        step.end_time = datetime.utcnow().isoformat()
        return step

    def get_summary(self):
        return {
            "pipeline": self.name,
            "version": self.version,
            "run_id": self.run_id,
            "steps": [
                {"name": s.name, "status": s.status, "metrics": s.metrics}
                for s in self.steps
            ],
            "overall_status": "completed" if all(
                s.status == "completed" for s in self.steps
            ) else "failed",
        }

# Pipeline Steps
def data_validation(**kwargs):
    logger.info("Validating training data...")
    return {
        "metrics": {"rows": 50000, "features": 25, "null_pct": 0.2, "schema_valid": True},
        "artifacts": ["data_profile.html"],
    }

def feature_engineering(**kwargs):
    logger.info("Engineering features...")
    return {
        "metrics": {"features_created": 15, "features_selected": 10},
        "artifacts": ["feature_store/v1"],
    }

def model_training(**kwargs):
    logger.info("Training model...")
    return {
        "metrics": {"accuracy": 0.94, "f1": 0.91, "auc": 0.97, "training_time_sec": 120},
        "artifacts": ["model.pkl", "model_config.json"],
    }

def model_evaluation(**kwargs):
    logger.info("Evaluating model...")
    return {
        "metrics": {
            "test_accuracy": 0.93, "test_f1": 0.90,
            "bias_check": "passed", "fairness_check": "passed",
        },
        "artifacts": ["evaluation_report.html"],
    }

def model_registry(**kwargs):
    logger.info("Registering model...")
    return {
        "metrics": {"model_version": "v1.2", "stage": "staging"},
        "artifacts": ["registry/fraud-detector/v1.2"],
    }

# Run Pipeline
pipeline = MLOpsPipeline("fraud-detection-pipeline", "2.0")
pipeline.run_step("data_validation", data_validation)
pipeline.run_step("feature_engineering", feature_engineering)
pipeline.run_step("model_training", model_training)
pipeline.run_step("model_evaluation", model_evaluation)
pipeline.run_step("model_registry", model_registry)
print(json.dumps(pipeline.get_summary(), indent=2))
CI/CD for Machine Learning
An automated CI/CD pipeline for ML
# === ML CI/CD Pipeline ===
# 1. GitHub Actions for ML
# ===================================
cat > .github/workflows/ml-pipeline.yml << 'EOF'
name: ML Pipeline
on:
  push:
    branches: [main]
    paths:
      - 'src/**'
      - 'data/**'
      - 'configs/**'

jobs:
  data-validation:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - run: pip install -r requirements.txt
      - run: python src/validate_data.py
      - uses: actions/upload-artifact@v4
        with:
          name: data-profile
          path: reports/data_profile.html

  train:
    needs: data-validation
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - run: pip install -r requirements.txt
      - run: |
          python src/train.py \
            --config configs/train_config.yaml \
            --experiment-name "ci-${{ github.run_id }}"
      - uses: actions/upload-artifact@v4
        with:
          name: model-artifacts
          path: outputs/

  evaluate:
    needs: train
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/download-artifact@v4
        with:
          name: model-artifacts
          path: outputs/
      - run: pip install -r requirements.txt
      - run: python src/evaluate.py --model outputs/model.pkl
      - run: |
          ACCURACY=$(cat outputs/metrics.json | jq '.accuracy')
          if (( $(echo "$ACCURACY < 0.85" | bc -l) )); then
            echo "Model accuracy too low: $ACCURACY"
            exit 1
          fi

  deploy-staging:
    needs: evaluate
    runs-on: ubuntu-latest
    environment: staging
    steps:
      - uses: actions/checkout@v4
      - uses: actions/download-artifact@v4
        with:
          name: model-artifacts
          path: outputs/
      - run: |
          python src/deploy.py \
            --model outputs/model.pkl \
            --target staging \
            --version "${{ github.sha }}"

  integration-test:
    needs: deploy-staging
    runs-on: ubuntu-latest
    steps:
      - run: |
          curl -s https://staging-api.example.com/predict \
            -H "Content-Type: application/json" \
            -d '{"features": [1.0, 2.0, 3.0]}' | \
            jq '.prediction'

  deploy-production:
    needs: integration-test
    runs-on: ubuntu-latest
    environment: production
    steps:
      - uses: actions/checkout@v4
      - uses: actions/download-artifact@v4
        with:
          name: model-artifacts
          path: outputs/
      - run: |
          python src/deploy.py \
            --model outputs/model.pkl \
            --target production \
            --canary-percentage 10
EOF
# 2. DVC (Data Version Control) Integration
# ===================================
pip install dvc dvc-s3
dvc init
dvc remote add -d myremote s3://my-dvc-bucket/data
# Track data files
dvc add data/training_data.csv
git add data/training_data.csv.dvc .gitignore
git commit -m "Add training data"
# Push data to remote
dvc push
# Pull data in CI
# dvc pull
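Beyond tracking individual files, DVC can also describe the pipeline stages themselves in a `dvc.yaml`, so `dvc repro` re-runs only the stages whose inputs changed. A minimal sketch; the stage names, script paths, and output paths below are illustrative assumptions matching the workflow above:

```shell
cat > dvc.yaml << 'EOF'
stages:
  prepare:
    cmd: python src/validate_data.py
    deps:
      - data/training_data.csv
      - src/validate_data.py
    outs:
      - data/prepared
  train:
    cmd: python src/train.py --config configs/train_config.yaml
    deps:
      - data/prepared
      - src/train.py
      - configs/train_config.yaml
    outs:
      - outputs/model.pkl
    metrics:
      - outputs/metrics.json:
          cache: false
EOF
# dvc repro   # re-runs only the stages whose deps changed
```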
# 3. Model Testing
# ===================================
cat > tests/test_model.py << 'PYEOF'
import pytest
import json
def test_model_accuracy():
with open("outputs/metrics.json") as f:
metrics = json.load(f)
assert metrics["accuracy"] >= 0.85
assert metrics["f1_score"] >= 0.80
def test_model_latency():
with open("outputs/metrics.json") as f:
metrics = json.load(f)
assert metrics["inference_latency_ms"] < 100
def test_model_bias():
with open("outputs/fairness_report.json") as f:
report = json.load(f)
for group in report["groups"]:
assert abs(group["accuracy"] - report["overall_accuracy"]) < 0.05
PYEOF
echo "ML CI/CD configured"
Model Serving and Inference
Deploying models for production inference
#!/usr/bin/env python3
# model_server.py — Model Serving Framework
import json
import random
import time
import logging
from typing import Dict, List
from dataclasses import dataclass

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("serving")

@dataclass
class ModelVersion:
    name: str
    version: str
    framework: str
    endpoint: str
    traffic_pct: int = 100

class ModelServer:
    def __init__(self):
        self.models: Dict[str, List[ModelVersion]] = {}
        self.prediction_log = []

    def deploy_model(self, name, version, framework="pytorch", traffic_pct=100):
        if name not in self.models:
            self.models[name] = []
        # Adjust traffic for existing versions
        if traffic_pct < 100:
            for mv in self.models[name]:
                mv.traffic_pct = 100 - traffic_pct
        mv = ModelVersion(
            name=name, version=version,
            framework=framework,
            endpoint=f"/v1/models/{name}/versions/{version}",
            traffic_pct=traffic_pct)
        self.models[name].append(mv)
        logger.info(f"Deployed {name}:{version} ({traffic_pct}% traffic)")
        return mv

    def canary_deploy(self, name, new_version, canary_pct=10):
        """Canary deployment — gradual traffic shift"""
        return self.deploy_model(name, new_version, traffic_pct=canary_pct)

    def promote_canary(self, name, version):
        """Promote canary to 100% traffic"""
        versions = self.models.get(name, [])
        for mv in versions:
            if mv.version == version:
                mv.traffic_pct = 100
            else:
                mv.traffic_pct = 0
        logger.info(f"Promoted {name}:{version} to 100%")

    def rollback(self, name, target_version):
        """Rollback to previous version"""
        versions = self.models.get(name, [])
        for mv in versions:
            mv.traffic_pct = 100 if mv.version == target_version else 0
        logger.info(f"Rolled back {name} to {target_version}")

    def predict(self, name, features):
        start = time.time()
        versions = self.models.get(name, [])
        if not versions:
            return {"error": f"Model {name} not found"}
        # Route based on traffic percentage
        rand = random.randint(1, 100)
        cumulative = 0
        selected = versions[0]
        for mv in versions:
            cumulative += mv.traffic_pct
            if rand <= cumulative:
                selected = mv
                break
        # Simulate prediction
        prediction = sum(features) * 0.1 + random.gauss(0, 0.01)
        latency_ms = (time.time() - start) * 1000 + random.gauss(5, 1)
        result = {
            "model": name,
            "version": selected.version,
            "prediction": round(prediction, 4),
            "latency_ms": round(latency_ms, 2),
        }
        self.prediction_log.append(result)
        return result

    def get_serving_status(self):
        status = {}
        for name, versions in self.models.items():
            status[name] = [
                {"version": mv.version, "traffic_pct": mv.traffic_pct, "endpoint": mv.endpoint}
                for mv in versions
            ]
        return status

server = ModelServer()
server.deploy_model("fraud-detector", "v1.0")
server.canary_deploy("fraud-detector", "v1.1", canary_pct=10)

random.seed(42)
for _ in range(10):
    server.predict("fraud-detector", [random.random() * 100, random.randint(1, 10)])
print(json.dumps(server.get_serving_status(), indent=2))
Monitoring and Observability
Monitoring the ML pipeline in production
# === MLOps Monitoring Stack ===
# 1. Prometheus Metrics for ML Pipeline
# ===================================
# Training metrics:
# ml_training_duration_seconds{model="fraud-detector", version="v1.1"}
# ml_training_loss{model="fraud-detector", epoch="10"}
# ml_training_accuracy{model="fraud-detector", dataset="validation"}
# Serving metrics:
# ml_prediction_latency_seconds{model="fraud-detector", version="v1.1"}
# ml_prediction_total{model="fraud-detector", result="fraud"}
# ml_prediction_errors_total{model="fraud-detector"}
# Data quality metrics:
# ml_feature_drift_score{model="fraud-detector", feature="amount"}
# ml_prediction_drift_score{model="fraud-detector"}
# ml_data_quality_score{pipeline="etl-daily"}
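The metric names above follow the Prometheus text exposition format. As a rough illustration of the format only (a real serving process would use the `prometheus_client` library rather than formatting samples by hand):

```python
import time

def format_metric(name: str, value: float, labels: dict) -> str:
    """Render one sample in Prometheus text exposition format."""
    label_str = ",".join(f'{k}="{v}"' for k, v in sorted(labels.items()))
    return f"{name}{{{label_str}}} {value}"

def timed_prediction(model: str, version: str, predict_fn, features):
    """Run a prediction and emit a latency sample for scraping."""
    start = time.time()
    result = predict_fn(features)
    latency = time.time() - start
    sample = format_metric(
        "ml_prediction_latency_seconds", round(latency, 6),
        {"model": model, "version": version},
    )
    return result, sample
```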
# 2. Grafana Dashboard
# ===================================
# Panels:
# - Model prediction rate (requests/sec)
# - P50/P95/P99 latency
# - Error rate
# - Feature drift scores over time
# - Model accuracy trend
# - Training pipeline success rate
# - GPU utilization (training)
# - Data freshness
# 3. Alert Rules
# ===================================
# groups:
# - name: mlops-alerts
# rules:
# - alert: ModelLatencyHigh
# expr: histogram_quantile(0.99, ml_prediction_latency_seconds_bucket) > 0.2
# for: 5m
# labels: {severity: warning}
#
# - alert: DataDriftDetected
# expr: ml_feature_drift_score > 2.0
# for: 30m
# labels: {severity: warning}
# annotations:
# summary: "Data drift on {{ $labels.feature }}"
#
# - alert: TrainingPipelineFailed
# expr: ml_pipeline_status{stage="training"} == 0
# for: 1m
# labels: {severity: critical}
#
# - alert: ModelAccuracyDegraded
# expr: ml_model_accuracy < 0.85
# for: 1h
# labels: {severity: critical}
# annotations:
# runbook: "Consider retraining with latest data"
# 4. Automated Retraining Trigger
# ===================================
#!/usr/bin/env python3
# retrain_trigger.py
# def check_and_retrain():
# drift_score = get_current_drift_score("fraud-detector")
# accuracy = get_current_accuracy("fraud-detector")
#
# should_retrain = False
# reason = ""
#
# if drift_score > 2.0:
# should_retrain = True
# reason = f"Data drift detected: {drift_score}"
#
# if accuracy < 0.85:
# should_retrain = True
# reason = f"Accuracy degraded: {accuracy}"
#
# if should_retrain:
# trigger_training_pipeline(reason=reason)
# notify_team(f"Retraining triggered: {reason}")
echo "MLOps monitoring configured"
Trends from Tech Conference 2026
The latest trends from the conference
# === MLOps Trends 2026 ===
# 1. LLMOps — Operations for Large Language Models
# ===================================
# Challenges:
# - GPU cost management ($2-10/hour for A100)
# - Prompt versioning and management
# - Evaluation of generative outputs (no single metric)
# - RAG pipeline monitoring
# - Fine-tuning pipeline automation
# - Model serving at scale (vLLM, TGI, TensorRT-LLM)
# Tools:
# - LangSmith (LangChain monitoring)
# - Weights & Biases Prompts
# - Humanloop (prompt management)
# - Arize Phoenix (LLM observability)
# 2. Feature Stores
# ===================================
# - Tecton (managed feature platform)
# - Feast (open source)
# - Databricks Feature Store (Unity Catalog)
# - Hopsworks (open source)
#
# Key capabilities:
# - Online serving (low latency < 10ms)
# - Offline store (batch features for training)
# - Feature versioning and lineage
# - Point-in-time correctness
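Point-in-time correctness means training features are joined as of the label event's timestamp, never from the future (which would leak information). A minimal stdlib sketch of that lookup, assuming feature rows are (timestamp, value) pairs sorted by time:

```python
from bisect import bisect_right

def point_in_time_lookup(feature_rows, event_time):
    """Return the latest feature value observed at or before event_time,
    avoiding leakage from feature values recorded after the label event."""
    times = [t for t, _ in feature_rows]
    idx = bisect_right(times, event_time) - 1
    return feature_rows[idx][1] if idx >= 0 else None
```

Feature stores implement this join at scale; the sketch only shows the per-row rule.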
# 3. ML Platform Engineering
# ===================================
# Internal Developer Platform for ML:
# - Self-service model training (Kubeflow, MLflow)
# - GPU cluster management (NVIDIA GPU Operator)
# - Experiment tracking (W&B, MLflow)
# - Model registry and deployment
# - Data catalog and discovery
# - Cost management and chargeback
# 4. AI Governance
# ===================================
# - Model cards (documentation)
# - Bias and fairness testing
# - Explainability (SHAP, LIME)
# - Audit trails for model decisions
# - Regulatory compliance (EU AI Act, PDPA)
# - Red teaming for LLMs
# 5. GPU Infrastructure
# ===================================
# - Multi-node training (DeepSpeed, FSDP)
# - GPU sharing (MIG, time-slicing)
# - Spot/preemptible instances for training
# - Inference optimization (quantization, batching)
# - Edge deployment (TensorRT, ONNX Runtime)
echo "MLOps trends 2026 documented"
FAQ: Frequently Asked Questions
Q: How does MLOps differ from DevOps?
A: DevOps manages the software lifecycle (code, build, test, deploy); MLOps adds the data and model lifecycle on top. The key differences: MLOps must handle data versioning (DVC, Delta Lake), experiment tracking (W&B, MLflow), model versioning and registry, data/model drift monitoring, training pipeline automation, and GPU resource management. DevOps focuses on code quality, while MLOps covers both code quality and model quality.
Q: Which tools should you start with for MLOps?
A: For a small team (1-3 people), start with Git for code versioning, DVC for data versioning, MLflow for experiment tracking, GitHub Actions for CI/CD, and Docker for reproducibility. A mid-size team (3-10 people) can add W&B for experiment tracking, Kubeflow or Airflow for pipeline orchestration, and a feature store (Feast). Large teams (10+ people) should consider managed platforms such as Databricks, SageMaker, or Vertex AI.
Q: How do you detect model drift?
A: There are two main types. Data drift occurs when the input data distribution changes; detect it with statistical tests (KS test, PSI, chi-square) comparing recent data against the training data. Concept drift occurs when the relationship between inputs and outputs changes; detecting it requires ground-truth labels. Common tools include Evidently AI (open source), NannyML, Arize AI, and WhyLabs. Set a threshold on the drift score, alert when it is exceeded, and trigger automatic retraining.
Q: How do you do canary deployment for ML models?
A: Deploy the new model version with a small share of traffic (5-10%), then compare metrics between the old and new versions: prediction latency, error rate, prediction distribution, and business metrics. If the new version is as good or better, gradually increase traffic (10% -> 25% -> 50% -> 100%); if metrics degrade, roll back immediately. Use a service mesh (Istio) or an API gateway for traffic splitting; tools such as Seldon Core, KServe, and BentoML support canary deployments for ML.
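The ramp-up logic described above can be sketched as a small helper; `set_traffic` and `metrics_ok` are hypothetical callbacks standing in for your traffic-splitting layer (e.g., Istio weights) and your metric comparison:

```python
def ramp_canary(set_traffic, metrics_ok, steps=(10, 25, 50, 100)):
    """Shift traffic to the canary step by step; roll back on bad metrics."""
    for pct in steps:
        set_traffic(pct)       # e.g., update Istio VirtualService weights
        if not metrics_ok():   # compare latency/error rate vs. the old version
            set_traffic(0)     # immediate rollback
            return False
    return True                # canary now serves 100% of traffic
```

Keeping the rollback inside the loop means a bad metric check at any step immediately drains the canary rather than waiting for the ramp to finish.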
