What is an MLOps Pipeline, and how does it differ from a Data Pipeline?
MLOps (Machine Learning Operations) is a practice that combines ML, DevOps, and Data Engineering. Its goal is to deploy, monitor, and maintain ML models in production efficiently. An MLOps pipeline covers everything from data ingestion, feature engineering, model training, and evaluation through deployment and monitoring.
A Data Pipeline (ETL/ELT) focuses on Extracting data from sources, Transforming it into the required shape, and Loading it into a destination such as a data warehouse. An MLOps pipeline adds further components: a Feature Store holding ready-to-use features, a Training Pipeline for training models, a Model Registry for versioning models, Serving Infrastructure for inference, and Monitoring for detecting data drift and model degradation.
The two pipelines work together: the Data Pipeline prepares the data that the MLOps pipeline trains on. Data that has passed through ETL is pushed to the Feature Store, the Training Pipeline pulls features from it for training, and the result is a model deployed through the Serving Pipeline.
Popular MLOps tools include Apache Airflow for orchestration, MLflow for experiment tracking, DVC for data versioning, Feast as a Feature Store, Seldon/BentoML for model serving, and Evidently/WhyLabs for monitoring.
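The hand-off described above can be sketched in a few lines of plain Python. Every function name below is an illustrative placeholder, not an API of any of the tools listed:

```python
# Conceptual sketch of the Data Pipeline -> MLOps Pipeline hand-off.
# All names here are hypothetical; real systems would use Airflow, Feast, etc.

def etl_extract_transform(raw_rows):
    """Data Pipeline: clean raw records (drop rows missing a customer id)."""
    return [r for r in raw_rows if r.get("customer_id") is not None]

def load_feature_store(clean_rows):
    """Materialize per-customer features (here: order count and total spend)."""
    features = {}
    for r in clean_rows:
        f = features.setdefault(r["customer_id"], {"orders": 0, "spend": 0.0})
        f["orders"] += 1
        f["spend"] += r["amount"]
    return features

def training_pipeline(feature_store):
    """Training Pipeline: pull features and produce a (toy) 'model'."""
    avg_spend = sum(f["spend"] for f in feature_store.values()) / len(feature_store)
    return {"type": "threshold_model", "threshold": avg_spend}

raw = [
    {"customer_id": 1, "amount": 120.0},
    {"customer_id": 1, "amount": 80.0},
    {"customer_id": 2, "amount": 40.0},
    {"customer_id": None, "amount": 99.0},  # dropped by ETL
]
store = load_feature_store(etl_extract_transform(raw))
model = training_pipeline(store)
print(store[1]["orders"], model["threshold"])  # prints: 2 120.0
```

The point of the separation is the same as in the real stack: the ETL step owns data quality, the Feature Store owns feature computation, and the training step only consumes features.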
Designing an End-to-End MLOps Architecture
A complete MLOps pipeline architecture
# MLOps Pipeline Architecture
#
# === Data Layer ===
# Sources -> Ingestion -> Raw Storage -> ETL -> Feature Store
#
# [Databases] ──┐
# [APIs] ──┤──> [Airflow ETL] ──> [S3/GCS Raw] ──> [Feature Engineering]
# [Streams] ──┤ |
# [Files] ──┘ v
# [Feast Feature Store]
# |
# === Training Layer === v
# [Training Pipeline]
# [Feature Store] ──> [Data Validation] ──> [Training] ──> [Evaluation]
# | | |
# [Great Expectations] [MLflow] [Metrics Check]
# |
# Pass?─────┤
# / \
# Yes No
# | |
# === Deployment Layer === v v
# [Model Registry] [Alert Team]
# |
# [Staging Deploy]
# |
# [A/B Testing]
# |
# [Production Deploy]
# |
# === Monitoring Layer === v
# [Model Serving]
# |
# ┌───────────┤───────────┐
# v v v
# [Data Drift] [Model Perf] [System Metrics]
# | | |
# └───────────┤───────────┘
# v
# [Alert/Retrain]
#
# === Tools Stack ===
# Orchestration: Apache Airflow
# ETL: Spark / dbt / Pandas
# Feature Store: Feast
# Experiment Tracking: MLflow
# Data Versioning: DVC
# Model Registry: MLflow Model Registry
# Serving: BentoML / Seldon Core
# Monitoring: Evidently AI
# CI/CD: GitHub Actions
# Infrastructure: Kubernetes / Docker
Building a Feature Engineering Pipeline in Python
Code for the feature engineering pipeline
#!/usr/bin/env python3
# feature_pipeline.py — Feature Engineering Pipeline
import logging
from datetime import datetime, timedelta

import pandas as pd
from feast import FeatureStore

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("feature_pipeline")


class FeatureEngineer:
    def __init__(self, raw_data_path, feature_store_path="feature_repo/"):
        self.raw_path = raw_data_path
        self.store = FeatureStore(repo_path=feature_store_path)

    def extract(self):
        logger.info("Extracting raw data...")
        orders = pd.read_parquet(f"{self.raw_path}/orders.parquet")
        customers = pd.read_parquet(f"{self.raw_path}/customers.parquet")
        # products is unused below for now; kept for future product-level features
        products = pd.read_parquet(f"{self.raw_path}/products.parquet")
        return orders, customers, products

    def transform_customer_features(self, orders, customers):
        logger.info("Engineering customer features...")
        now = pd.Timestamp.now()
        # RFM features
        rfm = orders.groupby("customer_id").agg(
            recency=("order_date", lambda x: (now - x.max()).days),
            frequency=("order_id", "count"),
            monetary=("total_amount", "sum"),
            avg_order_value=("total_amount", "mean"),
            max_order_value=("total_amount", "max"),
            min_order_value=("total_amount", "min"),
            std_order_value=("total_amount", "std"),
            first_order_date=("order_date", "min"),
            last_order_date=("order_date", "max"),
        ).reset_index()
        # Customer lifetime
        rfm["customer_lifetime_days"] = (rfm["last_order_date"] - rfm["first_order_date"]).dt.days
        rfm["avg_days_between_orders"] = rfm["customer_lifetime_days"] / rfm["frequency"].clip(lower=1)
        # Time-based features
        last_30d = orders[orders["order_date"] >= now - timedelta(days=30)]
        recent_activity = last_30d.groupby("customer_id").agg(
            orders_last_30d=("order_id", "count"),
            spend_last_30d=("total_amount", "sum"),
        ).reset_index()
        # Product diversity
        product_diversity = orders.groupby("customer_id").agg(
            unique_products=("product_id", "nunique"),
            unique_categories=("category", "nunique"),
        ).reset_index()
        # Merge all features
        features = (
            customers[["customer_id", "created_at", "status"]]
            .merge(rfm, on="customer_id", how="left")
            .merge(recent_activity, on="customer_id", how="left")
            .merge(product_diversity, on="customer_id", how="left")
        )
        # Fill NAs in numeric columns only (filling datetime columns with 0 would fail)
        num_cols = features.select_dtypes(include="number").columns
        features[num_cols] = features[num_cols].fillna(0)
        # Add event timestamp for Feast
        features["event_timestamp"] = now
        logger.info(
            f"Generated {len(features)} customer feature rows with {len(features.columns)} features"
        )
        return features

    def validate_features(self, features):
        logger.info("Validating features...")
        checks = {
            "no_nulls_in_key": features["customer_id"].notna().all(),
            "positive_monetary": (features["monetary"] >= 0).all(),
            "valid_frequency": (features["frequency"] >= 0).all(),
            "recency_range": (features["recency"] >= 0).all(),
            "row_count": len(features) > 0,
        }
        failed = [k for k, v in checks.items() if not v]
        if failed:
            raise ValueError(f"Feature validation failed: {failed}")
        logger.info("All feature validations passed")
        return True

    def load_to_feature_store(self, features):
        logger.info("Loading features to Feature Store...")
        features.to_parquet("feature_repo/data/customer_features.parquet", index=False)
        self.store.materialize_incremental(end_date=datetime.now())
        logger.info("Features materialized to online store")

    def run(self):
        orders, customers, products = self.extract()
        features = self.transform_customer_features(orders, customers)
        self.validate_features(features)
        self.load_to_feature_store(features)
        return features


# Feast Feature Definition (pre-0.21 Feast API; newer releases declare a
# schema of Field objects instead of Feature/ValueType)
# feature_repo/features.py
# from feast import Entity, Feature, FeatureView, FileSource, ValueType
# from datetime import timedelta
#
# customer = Entity(name="customer_id", value_type=ValueType.INT64)
#
# customer_source = FileSource(
#     path="data/customer_features.parquet",
#     event_timestamp_column="event_timestamp",
# )
#
# customer_features = FeatureView(
#     name="customer_features",
#     entities=["customer_id"],
#     ttl=timedelta(days=1),
#     features=[
#         Feature(name="recency", dtype=ValueType.INT64),
#         Feature(name="frequency", dtype=ValueType.INT64),
#         Feature(name="monetary", dtype=ValueType.DOUBLE),
#         Feature(name="avg_order_value", dtype=ValueType.DOUBLE),
#         Feature(name="orders_last_30d", dtype=ValueType.INT64),
#         Feature(name="unique_products", dtype=ValueType.INT64),
#     ],
#     source=customer_source,
# )

if __name__ == "__main__":
    pipeline = FeatureEngineer("s3://datalake/raw/")
    pipeline.run()
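The RFM aggregation at the heart of transform_customer_features can be sanity-checked on a tiny synthetic frame without any Feast setup. Column names match the pipeline; the data below is made up:

```python
import pandas as pd

# Tiny synthetic orders frame with the columns the pipeline expects.
orders = pd.DataFrame({
    "customer_id": [1, 1, 2],
    "order_id": [10, 11, 12],
    "order_date": pd.to_datetime(["2024-01-01", "2024-03-01", "2024-02-15"]),
    "total_amount": [100.0, 50.0, 200.0],
})
now = pd.Timestamp("2024-04-01")  # fixed "now" so results are deterministic

rfm = orders.groupby("customer_id").agg(
    recency=("order_date", lambda x: (now - x.max()).days),
    frequency=("order_id", "count"),
    monetary=("total_amount", "sum"),
).reset_index()

print(rfm.to_dict("records"))
# customer 1: recency=31, frequency=2, monetary=150.0
# customer 2: recency=46, frequency=1, monetary=200.0
```

Pinning `now` to a fixed timestamp is what makes a check like this usable as a unit test; the production pipeline uses `pd.Timestamp.now()` instead.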
Training Pipeline and Experiment Tracking
A pipeline for training models with MLflow tracking
#!/usr/bin/env python3
# training_pipeline.py — Model Training with MLflow
import json

import joblib
import mlflow
import mlflow.sklearn
import pandas as pd
from feast import FeatureStore
from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
from sklearn.metrics import (accuracy_score, f1_score, precision_score,
                             recall_score, roc_auc_score)
from sklearn.model_selection import cross_val_score, train_test_split
from sklearn.preprocessing import StandardScaler


class TrainingPipeline:
    def __init__(self, experiment_name="churn_prediction"):
        mlflow.set_tracking_uri("http://mlflow:5000")
        mlflow.set_experiment(experiment_name)
        self.store = FeatureStore(repo_path="feature_repo/")
        self.best_model = None
        self.best_metrics = {}

    def fetch_training_data(self):
        entity_df = pd.read_parquet("data/training_entities.parquet")
        training_data = self.store.get_historical_features(
            entity_df=entity_df,
            features=[
                "customer_features:recency",
                "customer_features:frequency",
                "customer_features:monetary",
                "customer_features:avg_order_value",
                "customer_features:orders_last_30d",
                "customer_features:unique_products",
            ],
        ).to_df()
        return training_data

    def prepare_data(self, df):
        feature_cols = ["recency", "frequency", "monetary", "avg_order_value",
                        "orders_last_30d", "unique_products"]
        X = df[feature_cols].fillna(0)
        y = df["churned"].astype(int)
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=y
        )
        scaler = StandardScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        X_test_scaled = scaler.transform(X_test)
        self.scaler = scaler
        return X_train_scaled, X_test_scaled, y_train, y_test, feature_cols

    def train_and_evaluate(self, X_train, X_test, y_train, y_test, feature_cols):
        models = {
            "random_forest": RandomForestClassifier(
                n_estimators=200, max_depth=10, min_samples_split=5,
                random_state=42, n_jobs=-1,
            ),
            "gradient_boosting": GradientBoostingClassifier(
                n_estimators=200, max_depth=5, learning_rate=0.1,
                random_state=42,
            ),
        }
        best_f1 = 0
        for name, model in models.items():
            with mlflow.start_run(run_name=name):
                model.fit(X_train, y_train)
                y_pred = model.predict(X_test)
                y_prob = model.predict_proba(X_test)[:, 1]
                metrics = {
                    "accuracy": accuracy_score(y_test, y_pred),
                    "precision": precision_score(y_test, y_pred),
                    "recall": recall_score(y_test, y_pred),
                    "f1": f1_score(y_test, y_pred),
                    "auc_roc": roc_auc_score(y_test, y_prob),
                }
                cv_scores = cross_val_score(model, X_train, y_train, cv=5, scoring="f1")
                metrics["cv_f1_mean"] = cv_scores.mean()
                metrics["cv_f1_std"] = cv_scores.std()
                mlflow.log_params(model.get_params())
                mlflow.log_metrics(metrics)
                mlflow.log_param("features", json.dumps(feature_cols))
                if hasattr(model, "feature_importances_"):
                    importance = dict(zip(feature_cols, model.feature_importances_))
                    mlflow.log_dict(importance, "feature_importance.json")
                # Log under artifact path "model" so the "runs:/.../model" URI
                # used in register_model() resolves
                mlflow.sklearn.log_model(model, "model")
                # Persist the scaler alongside the model — serving must apply it too
                joblib.dump(self.scaler, "scaler.joblib")
                mlflow.log_artifact("scaler.joblib")
                print(f"{name}: F1={metrics['f1']:.4f}, AUC={metrics['auc_roc']:.4f}")
                if metrics["f1"] > best_f1:
                    best_f1 = metrics["f1"]
                    self.best_model = model
                    self.best_metrics = metrics
                    self.best_run_id = mlflow.active_run().info.run_id
        return self.best_model, self.best_metrics

    def register_model(self, model_name="churn_predictor"):
        if self.best_model is None:
            raise ValueError("No model trained yet")
        if self.best_metrics["f1"] < 0.7:
            print(f"Model F1 ({self.best_metrics['f1']:.4f}) below threshold (0.7). Not registering.")
            return None
        model_uri = f"runs:/{self.best_run_id}/model"
        result = mlflow.register_model(model_uri, model_name)
        client = mlflow.tracking.MlflowClient()
        client.transition_model_version_stage(
            name=model_name, version=result.version, stage="Staging"
        )
        print(f"Model registered: {model_name} v{result.version} (Staging)")
        return result

    def run(self):
        data = self.fetch_training_data()
        X_train, X_test, y_train, y_test, features = self.prepare_data(data)
        model, metrics = self.train_and_evaluate(X_train, X_test, y_train, y_test, features)
        result = self.register_model()
        return model, metrics, result


if __name__ == "__main__":
    pipeline = TrainingPipeline()
    model, metrics, reg = pipeline.run()
    print(f"Best model metrics: {metrics}")
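The champion-selection and quality-gate logic embedded in train_and_evaluate and register_model can be factored into a small pure function, which makes the promotion rule easy to unit-test in isolation. select_and_gate is an illustrative helper, not part of MLflow; the 0.7 threshold mirrors the gate above:

```python
def select_and_gate(results, threshold=0.7):
    """Pick the candidate with the highest F1; return None if it misses the gate.

    `results` maps model name -> metrics dict containing an "f1" key.
    """
    best_name = max(results, key=lambda name: results[name]["f1"])
    if results[best_name]["f1"] < threshold:
        return None  # below quality gate: do not register
    return best_name

runs = {
    "random_forest": {"f1": 0.74},
    "gradient_boosting": {"f1": 0.81},
}
print(select_and_gate(runs))                  # gradient_boosting
print(select_and_gate({"rf": {"f1": 0.55}}))  # None — nothing gets registered
```

Keeping the rule in one place also means the Airflow branch task later can call the same function instead of duplicating the threshold.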
Model Serving and Monitoring Pipeline
Deploying the model for inference and monitoring
#!/usr/bin/env python3
# serving_pipeline.py — Model Serving and Monitoring
from datetime import datetime

import mlflow
import pandas as pd
from evidently.metric_preset import DataDriftPreset, TargetDriftPreset
from evidently.report import Report


class ModelServer:
    def __init__(self, model_name="churn_predictor", stage="Production"):
        self.model = mlflow.sklearn.load_model(f"models:/{model_name}/{stage}")
        self.predictions_log = []

    def predict(self, features):
        # NOTE: the model was trained on StandardScaler-transformed features;
        # apply the same fitted scaler (logged as an artifact during training)
        # to `features` before calling this in production.
        prediction = self.model.predict(features)
        probability = self.model.predict_proba(features)[:, 1]
        self.predictions_log.append({
            "timestamp": datetime.utcnow().isoformat(),
            "features": features.tolist() if hasattr(features, "tolist") else features,
            "prediction": int(prediction[0]),
            "probability": float(probability[0]),
        })
        return {"prediction": int(prediction[0]), "probability": float(probability[0])}


class ModelMonitor:
    def __init__(self, reference_data_path):
        self.reference_data = pd.read_parquet(reference_data_path)

    def check_data_drift(self, current_data):
        report = Report(metrics=[DataDriftPreset()])
        report.run(reference_data=self.reference_data, current_data=current_data)
        result = report.as_dict()
        drift_detected = result["metrics"][0]["result"]["dataset_drift"]
        drift_share = result["metrics"][0]["result"]["share_of_drifted_columns"]
        return {
            "drift_detected": drift_detected,
            "drift_share": drift_share,
            "timestamp": datetime.utcnow().isoformat(),
            "reference_rows": len(self.reference_data),
            "current_rows": len(current_data),
        }

    def check_model_performance(self, predictions, actuals):
        from sklearn.metrics import accuracy_score, f1_score
        metrics = {
            "accuracy": accuracy_score(actuals, predictions),
            "f1": f1_score(actuals, predictions),
            "timestamp": datetime.utcnow().isoformat(),
        }
        if metrics["f1"] < 0.6:
            metrics["alert"] = "Model performance degraded! F1 below threshold"
            metrics["action"] = "retrain"
        return metrics

    def generate_monitoring_report(self, current_data, output_path="monitoring_report.html"):
        report = Report(metrics=[DataDriftPreset(), TargetDriftPreset()])
        report.run(reference_data=self.reference_data, current_data=current_data)
        report.save_html(output_path)
        return output_path
# BentoML Service Definition
# service.py
# import bentoml
# import numpy as np
# from bentoml.io import JSON, NumpyNdarray
#
# runner = bentoml.mlflow.get("churn_predictor:latest").to_runner()
# svc = bentoml.Service("churn_prediction_service", runners=[runner])
#
# @svc.api(input=NumpyNdarray(), output=JSON())
# async def predict(input_array: np.ndarray) -> dict:
#     result = await runner.predict.async_run(input_array)
#     return {"prediction": int(result[0])}
# # NOTE: the mlflow pyfunc runner only exposes predict; to also return a
# # probability, serve the sklearn flavor and call its predict_proba signature.

# Dockerfile for serving
# FROM python:3.11-slim
# RUN pip install bentoml mlflow scikit-learn
# COPY . /app
# WORKDIR /app
# CMD ["bentoml", "serve", "service:svc", "--host", "0.0.0.0", "--port", "3000"]

# Kubernetes deployment
# apiVersion: apps/v1
# kind: Deployment
# metadata:
#   name: churn-predictor
# spec:
#   replicas: 3
#   selector:
#     matchLabels:
#       app: churn-predictor
#   template:
#     metadata:
#       labels:
#         app: churn-predictor    # must match the selector above
#     spec:
#       containers:
#         - name: model
#           image: churn-predictor:latest
#           ports:
#             - containerPort: 3000
#           resources:
#             requests: {cpu: "500m", memory: "512Mi"}
#             limits: {cpu: "1000m", memory: "1Gi"}
Orchestrating Everything with Airflow
An Airflow DAG that orchestrates the MLOps pipeline
#!/usr/bin/env python3
# mlops_dag.py — Airflow DAG for the MLOps Pipeline
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator

default_args = {
    "owner": "ml-team",
    "retries": 2,
    "retry_delay": timedelta(minutes=10),
    "execution_timeout": timedelta(hours=2),
}


def run_feature_pipeline(**kwargs):
    from feature_pipeline import FeatureEngineer
    pipeline = FeatureEngineer("s3://datalake/raw/")
    features = pipeline.run()
    kwargs["ti"].xcom_push(key="feature_count", value=len(features))


def run_training(**kwargs):
    from training_pipeline import TrainingPipeline
    pipeline = TrainingPipeline()
    model, metrics, reg = pipeline.run()
    kwargs["ti"].xcom_push(key="metrics", value=metrics)
    kwargs["ti"].xcom_push(key="model_version", value=reg.version if reg else None)


def evaluate_model(**kwargs):
    metrics = kwargs["ti"].xcom_pull(key="metrics", task_ids="train_model")
    if metrics and metrics.get("f1", 0) >= 0.7:
        return "deploy_staging"
    return "notify_failure"


def run_monitoring(**kwargs):
    import pandas as pd
    from serving_pipeline import ModelMonitor
    monitor = ModelMonitor("data/reference_data.parquet")
    current = pd.read_parquet("data/current_predictions.parquet")
    drift = monitor.check_data_drift(current)
    kwargs["ti"].xcom_push(key="drift_result", value=drift)
    if drift["drift_detected"]:
        return "trigger_retrain"
    return "monitoring_ok"


with DAG(
    "mlops_pipeline",
    default_args=default_args,
    schedule_interval="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["mlops", "ml", "production"],
) as dag:
    ingest = BashOperator(
        task_id="ingest_data",
        bash_command="python3 scripts/ingest.py --source all --date {{ ds }}",
    )
    features = PythonOperator(
        task_id="feature_engineering",
        python_callable=run_feature_pipeline,
    )
    validate = BashOperator(
        task_id="validate_data",
        bash_command="soda scan -d production -c soda/config.yml soda/checks/features.yml",
    )
    train = PythonOperator(
        task_id="train_model",
        python_callable=run_training,
    )
    evaluate = BranchPythonOperator(
        task_id="evaluate_model",
        python_callable=evaluate_model,
    )
    deploy_staging = BashOperator(
        task_id="deploy_staging",
        bash_command="kubectl apply -f k8s/staging-deployment.yml",
    )
    integration_test = BashOperator(
        task_id="integration_test",
        bash_command="pytest tests/integration/ -v --tb=short",
    )
    deploy_prod = BashOperator(
        task_id="deploy_production",
        bash_command="kubectl apply -f k8s/production-deployment.yml",
    )
    monitor = BranchPythonOperator(
        task_id="monitoring_check",
        python_callable=run_monitoring,
    )
    # Branch targets for monitoring_check — these task_ids are what
    # run_monitoring() returns, so they must exist in the DAG
    monitoring_ok = EmptyOperator(task_id="monitoring_ok")
    trigger_retrain = TriggerDagRunOperator(
        task_id="trigger_retrain",
        trigger_dag_id="mlops_pipeline",
    )
    notify_fail = SlackWebhookOperator(
        task_id="notify_failure",
        slack_webhook_conn_id="slack",
        message="MLOps Pipeline: Model evaluation failed. F1 below threshold.",
    )

    ingest >> features >> validate >> train >> evaluate
    evaluate >> deploy_staging >> integration_test >> deploy_prod >> monitor
    evaluate >> notify_fail
    monitor >> [monitoring_ok, trigger_retrain]
FAQ: Frequently Asked Questions
Q: How do MLOps and DataOps differ?
A: DataOps focuses on keeping data pipelines high-quality, fast, and reliable, covering data ingestion, transformation, quality, and governance. MLOps focuses on the ML model lifecycle, from training through production monitoring. The two work together: DataOps prepares the data that MLOps consumes.
Q: Is a Feature Store necessary?
A: For a small team with only a few models, perhaps not. But as the team grows and multiple models share features, a Feature Store cuts duplicated feature computation, keeps online and offline features consistent, and serves as the single source of truth for features shared across models.
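One concrete piece of that consistency guarantee is the point-in-time join that Feast's get_historical_features performs: each training label sees only the latest feature value observed at or before the label's timestamp, never a future one. pandas can sketch the idea with merge_asof (synthetic data; column names follow the pipeline above):

```python
import pandas as pd

# Feature values as they were observed over time (sorted by timestamp,
# as merge_asof requires).
features = pd.DataFrame({
    "event_timestamp": pd.to_datetime(["2024-01-01", "2024-02-01", "2024-03-01"]),
    "customer_id": [1, 1, 1],
    "frequency": [3, 5, 9],
}).sort_values("event_timestamp")

# A training label dated between the second and third feature snapshots.
labels = pd.DataFrame({
    "event_timestamp": pd.to_datetime(["2024-02-15"]),
    "customer_id": [1],
    "churned": [0],
}).sort_values("event_timestamp")

# Backward as-of join: last feature row at or before each label timestamp.
joined = pd.merge_asof(labels, features, on="event_timestamp", by="customer_id")
print(int(joined.loc[0, "frequency"]))  # 5 — not the future value 9
```

Joining naively on customer_id alone would leak the March value (9) into a February label, which is exactly the training/serving skew a Feature Store is meant to prevent.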
Q: Should we use MLflow or Weights and Biases?
A: MLflow is open source, self-hosted, free, and ships with a built-in Model Registry, which suits teams that want full control. W&B is a cloud service with a better UI and stronger collaboration features, but it costs money. For a startup, a common path is to start with MLflow and switch later if scale demands it.
Q: What should model monitoring cover?
A: Monitor four areas: Data Drift (have the input features shifted away from the training data?), Concept Drift (has the relationship between features and target changed?), Model Performance (are accuracy, F1, AUC declining?), and System Metrics (latency, throughput, error rate). When drift is detected, the retrain pipeline should be triggered automatically.
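As a concrete example of a data-drift check, here is a minimal Population Stability Index (PSI) in plain NumPy. PSI is one common drift score among several (Evidently uses statistical tests by default); the 0.2 alert threshold below is a widely used rule of thumb, not a standard:

```python
import numpy as np

def psi(reference, current, bins=10):
    """Population Stability Index between two 1-D samples.

    Bins are deciles of the reference sample; a small epsilon
    guards the log against empty bins.
    """
    edges = np.quantile(reference, np.linspace(0, 1, bins + 1))[1:-1]  # interior edges
    ref_pct = np.bincount(np.digitize(reference, edges), minlength=bins) / len(reference)
    cur_pct = np.bincount(np.digitize(current, edges), minlength=bins) / len(current)
    eps = 1e-6
    return float(np.sum((cur_pct - ref_pct) * np.log((cur_pct + eps) / (ref_pct + eps))))

rng = np.random.default_rng(42)
ref = rng.normal(0, 1, 10_000)
print(psi(ref, rng.normal(0, 1, 10_000)) < 0.1)  # True: same distribution, low PSI
print(psi(ref, rng.normal(1, 1, 10_000)) > 0.2)  # True: mean shift triggers the alert
```

Running a check like this per feature, per day, and alerting when any PSI crosses the threshold is a lightweight starting point before adopting a full monitoring tool.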
