MLOps Pipeline คืออะไร
MLOps Pipeline คือ Automated Workflow ที่จัดการทุกขั้นตอนของ Machine Learning Lifecycle อย่างเป็นระบบ ตั้งแต่การดึงข้อมูล (Data Ingestion) การทำ Feature Engineering การ Train Model การ Evaluate ผลลัพธ์ การ Deploy Model ไปจนถึงการ Monitor Model Performance ในระบบ Production
Batch Processing Pipeline เป็น Pattern ที่พบบ่อยที่สุดใน MLOps เพราะงาน ML ส่วนใหญ่ไม่ต้องการผลลัพธ์แบบ Real-time เช่น การ Retrain Model ด้วยข้อมูลใหม่ทุกสัปดาห์ การ Generate Recommendations สำหรับผู้ใช้ทุกู้คืนทุกวัน หรือการ Score ลูกค้าเพื่อทำ Marketing Campaign
สถาปัตยกรรม Batch MLOps Pipeline
- Data Ingestion: ดึงข้อมูลจาก Database, API, S3 มาเก็บใน Data Lake
- Data Validation: ตรวจสอบคุณภาพข้อมูล Schema, Missing Values, Drift
- Feature Engineering: แปลงข้อมูลดิบเป็น Features สำหรับ Model
- Model Training: Train Model ด้วย Features ใหม่ Log ไปยัง MLflow
- Model Evaluation: เปรียบเทียบ Model ใหม่กับ Model เดิม
- Model Registry: Promote Model ที่ดีกว่าไป Production
- Batch Inference: ใช้ Model ทำ Prediction บนข้อมูลขนาดใหญ่
- Result Storage: เก็บผลลัพธ์ใน Database หรือ Data Warehouse
Airflow DAG สำหรับ Batch ML Pipeline
# dags/ml_batch_pipeline.py — Airflow DAG สำหรับ ML Batch Pipeline
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
import mlflow
import json
default_args = {
"owner": "ml-team",
"depends_on_past": False,
"email_on_failure": True,
"email": ["ml-team@company.com"],
"retries": 2,
"retry_delay": timedelta(minutes=10),
}
dag = DAG(
"ml_batch_pipeline",
default_args=default_args,
description="Batch ML Pipeline: Data → Features → Train → Evaluate → Inference",
schedule_interval="0 2 * * *", # รันทุกวันตี 2
start_date=days_ago(1),
catchup=False,
tags=["ml", "batch", "production"],
)
def validate_data(**context):
"""ตรวจสอบคุณภาพข้อมูล"""
import pandas as pd
from great_expectations.core import ExpectationSuite
df = pd.read_parquet("s3://data-lake/raw/transactions/latest/")
checks = {
"row_count": len(df) > 1000,
"no_nulls_in_id": df["customer_id"].notna().all(),
"amount_positive": (df["amount"] > 0).all(),
"date_range_valid": df["date"].max() >= pd.Timestamp.now() - pd.Timedelta(days=2),
}
failed = [k for k, v in checks.items() if not v]
if failed:
raise ValueError(f"Data Validation Failed: {failed}")
context["ti"].xcom_push(key="row_count", value=len(df))
return {"status": "passed", "rows": len(df)}
def train_model(**context):
"""Train Model และ Log ไปยัง MLflow"""
import pandas as pd
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, precision_score, recall_score
mlflow.set_tracking_uri("https://mlflow.company.com")
mlflow.set_experiment("churn-prediction-daily")
df = pd.read_parquet("s3://data-lake/features/churn_features/latest/")
X = df.drop(columns=["customer_id", "churned"])
y = df["churned"]
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
with mlflow.start_run(run_name=f"daily-{context['ds']}"):
params = {
"n_estimators": 200,
"max_depth": 6,
"learning_rate": 0.1,
"min_samples_split": 20,
}
mlflow.log_params(params)
model = GradientBoostingClassifier(**params, random_state=42)
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
metrics = {
"f1": f1_score(y_test, y_pred),
"precision": precision_score(y_test, y_pred),
"recall": recall_score(y_test, y_pred),
"train_size": len(X_train),
"test_size": len(X_test),
}
mlflow.log_metrics(metrics)
mlflow.sklearn.log_model(model, "model",
registered_model_name="churn-predictor")
context["ti"].xcom_push(key="f1_score", value=metrics["f1"])
context["ti"].xcom_push(key="run_id", value=mlflow.active_run().info.run_id)
def evaluate_and_promote(**context):
"""เปรียบเทียบกับ Production Model แล้ว Promote ถ้าดีกว่า"""
from mlflow.tracking import MlflowClient
client = MlflowClient()
new_f1 = context["ti"].xcom_pull(key="f1_score")
# ดึง Production Model metrics
prod_versions = client.get_latest_versions("churn-predictor", stages=["Production"])
if prod_versions:
prod_run = client.get_run(prod_versions[0].run_id)
prod_f1 = float(prod_run.data.metrics.get("f1", 0))
else:
prod_f1 = 0
if new_f1 > prod_f1 + 0.005: # ต้องดีกว่าอย่างน้อย 0.5%
new_version = client.get_latest_versions("churn-predictor", stages=["None"])[0]
client.transition_model_version_stage(
name="churn-predictor",
version=new_version.version,
stage="Production",
archive_existing_versions=True,
)
return f"Promoted v{new_version.version}: F1 {new_f1:.4f} > {prod_f1:.4f}"
return f"Kept current: New F1 {new_f1:.4f} <= Prod F1 {prod_f1:.4f}"
def batch_inference(**context):
"""รัน Batch Inference บนข้อมูลทั้งหมด"""
import pandas as pd
import mlflow
model = mlflow.sklearn.load_model("models:/churn-predictor/Production")
df = pd.read_parquet("s3://data-lake/features/churn_features/latest/")
X = df.drop(columns=["customer_id", "churned"])
df["churn_probability"] = model.predict_proba(X)[:, 1]
df["churn_prediction"] = (df["churn_probability"] >= 0.5).astype(int)
# เก็บผลลัพธ์
output = df[["customer_id", "churn_probability", "churn_prediction"]]
output.to_parquet(f"s3://data-lake/predictions/churn/{context['ds']}/")
# สรุปผล
high_risk = (df["churn_probability"] >= 0.7).sum()
return {"total": len(df), "high_risk": int(high_risk)}
# สร้าง Tasks
t_validate = PythonOperator(task_id="validate_data", python_callable=validate_data, dag=dag)
t_features = SparkSubmitOperator(
task_id="feature_engineering",
application="s3://ml-code/spark/feature_engineering.py",
conn_id="spark_default",
conf={"spark.executor.memory": "4g", "spark.executor.cores": "2"},
dag=dag,
)
t_train = PythonOperator(task_id="train_model", python_callable=train_model, dag=dag)
t_evaluate = PythonOperator(task_id="evaluate_promote", python_callable=evaluate_and_promote, dag=dag)
t_inference = PythonOperator(task_id="batch_inference", python_callable=batch_inference, dag=dag)
t_notify = SlackWebhookOperator(
task_id="notify_slack",
slack_webhook_conn_id="slack_ml",
message="ML Pipeline completed: {{ ti.xcom_pull(task_ids='batch_inference') }}",
dag=dag,
)
# กำหนดลำดับ
t_validate >> t_features >> t_train >> t_evaluate >> t_inference >> t_notify
Spark Job สำหรับ Feature Engineering
# spark/feature_engineering.py — PySpark Feature Engineering
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from datetime import datetime, timedelta
spark = SparkSession.builder \
.appName("ChurnFeatureEngineering") \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.sql.shuffle.partitions", "200") \
.getOrCreate()
# อ่านข้อมูล
transactions = spark.read.parquet("s3://data-lake/raw/transactions/")
customers = spark.read.parquet("s3://data-lake/raw/customers/")
# Feature 1: Transaction Aggregations (30 วัน, 90 วัน)
cutoff_30d = (datetime.now() - timedelta(days=30)).strftime("%Y-%m-%d")
cutoff_90d = (datetime.now() - timedelta(days=90)).strftime("%Y-%m-%d")
txn_30d = transactions.filter(F.col("date") >= cutoff_30d).groupBy("customer_id").agg(
F.count("*").alias("txn_count_30d"),
F.sum("amount").alias("txn_amount_30d"),
F.avg("amount").alias("txn_avg_30d"),
F.max("amount").alias("txn_max_30d"),
F.countDistinct("category").alias("txn_categories_30d"),
)
txn_90d = transactions.filter(F.col("date") >= cutoff_90d).groupBy("customer_id").agg(
F.count("*").alias("txn_count_90d"),
F.sum("amount").alias("txn_amount_90d"),
F.avg("amount").alias("txn_avg_90d"),
)
# Feature 2: Recency (วันนับจาก Transaction ล่าสุด)
recency = transactions.groupBy("customer_id").agg(
F.datediff(F.current_date(), F.max("date")).alias("days_since_last_txn"),
F.datediff(F.max("date"), F.min("date")).alias("customer_tenure_days"),
)
# Feature 3: Trend (เปรียบเทียบ 30 วันล่าสุดกับ 30 วันก่อนหน้า)
txn_prev_30d = transactions.filter(
(F.col("date") >= (datetime.now() - timedelta(days=60)).strftime("%Y-%m-%d")) &
(F.col("date") < cutoff_30d)
).groupBy("customer_id").agg(
F.count("*").alias("txn_count_prev_30d"),
F.sum("amount").alias("txn_amount_prev_30d"),
)
# รวม Features ทั้งหมด
features = customers.select("customer_id", "age", "gender", "registration_date") \
.join(txn_30d, "customer_id", "left") \
.join(txn_90d, "customer_id", "left") \
.join(recency, "customer_id", "left") \
.join(txn_prev_30d, "customer_id", "left")
# คำนวณ Derived Features
features = features.withColumn(
"txn_trend",
F.when(F.col("txn_count_prev_30d") > 0,
(F.col("txn_count_30d") - F.col("txn_count_prev_30d")) / F.col("txn_count_prev_30d")
).otherwise(0)
).fillna(0)
# เขียนผลลัพธ์
features.write.mode("overwrite").parquet("s3://data-lake/features/churn_features/latest/")
print(f"Features generated: {features.count()} rows, {len(features.columns)} columns")
spark.stop()
Docker และ Kubernetes สำหรับ Pipeline
# Dockerfile สำหรับ ML Pipeline Worker
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY src/ ./src/
COPY configs/ ./configs/
ENV PYTHONPATH=/app
ENV MLFLOW_TRACKING_URI=https://mlflow.company.com
CMD ["python", "-m", "src.pipeline.run"]
---
# requirements.txt
apache-airflow==2.8.0
mlflow==2.10.0
scikit-learn==1.4.0
pandas==2.2.0
pyarrow==15.0.0
great-expectations==0.18.0
boto3==1.34.0
---
# kubernetes/ml-pipeline-cronjob.yaml
apiVersion: batch/v1
kind: CronJob
metadata:
name: ml-batch-inference
namespace: ml-platform
spec:
schedule: "0 3 * * *" # ทุกวันตี 3
concurrencyPolicy: Forbid
successfulJobsHistoryLimit: 7
failedJobsHistoryLimit: 3
jobTemplate:
spec:
backoffLimit: 2
activeDeadlineSeconds: 7200 # Timeout 2 ชั่วโมง
template:
spec:
restartPolicy: Never
containers:
- name: inference
image: registry.company.com/ml-pipeline:latest
command: ["python", "-m", "src.inference.batch_run"]
resources:
requests:
cpu: "2"
memory: 8Gi
limits:
cpu: "4"
memory: 16Gi
env:
- name: S3_BUCKET
value: "data-lake"
- name: MODEL_NAME
value: "churn-predictor"
- name: MLFLOW_TRACKING_URI
valueFrom:
configMapKeyRef:
name: ml-config
key: mlflow-uri
volumeMounts:
- name: ml-secrets
mountPath: /secrets
readOnly: true
volumes:
- name: ml-secrets
secret:
secretName: ml-pipeline-secrets
Monitoring และ Alerting สำหรับ Batch Pipeline
- Pipeline Health: ติดตาม Success/Failure Rate ของแต่ละ Task, Duration Trend และ SLA Compliance
- Data Quality: ตรวจสอบ Row Count, Schema Changes, Distribution Drift ของ Input Data ทุกรอบ
- Model Performance: ติดตาม Prediction Distribution, F1 Score Trend, Feature Importance Changes
- Resource Usage: CPU, Memory, Disk ของ Pipeline Workers เพื่อ Right-sizing
- Cost Tracking: คำนวณ Cost ต่อ Pipeline Run (Compute + Storage + Network)
MLOps Pipeline คืออะไร
MLOps Pipeline คือ Automated Workflow ที่จัดการทุกขั้นตอนของ Machine Learning ตั้งแต่ Data Ingestion, Feature Engineering, Model Training, Evaluation, Deployment ไปจนถึง Monitoring นำหลักการ DevOps มาใช้กับ ML เพื่อให้ทำซ้ำได้ Scale ได้ และ Maintain ง่าย
Batch Processing ต่างจาก Real-time Processing อย่างไร
Batch Processing ประมวลผลข้อมูลเป็นชุดใหญ่ตามรอบเวลา เช่น ทุกวันหรือทุกชั่วโมง เหมาะกับ Model Training และ Batch Inference ส่วน Real-time ประมวลผลทันทีที่ข้อมูลเข้ามา เหมาะกับ Fraud Detection, Real-time Recommendation Batch ง่ายกว่าในการจัดการและ Debug
เครื่องมือที่ใช้สร้าง MLOps Batch Pipeline มีอะไรบ้าง
Orchestration: Apache Airflow หรือ Prefect สำหรับจัดลำดับ Tasks, Data Processing: Apache Spark หรือ Dask สำหรับข้อมูลใหญ่, Feature Store: Feast, Model Tracking: MLflow, Containerization: Docker + Kubernetes สำหรับ Scheduling และ Scaling
ควรรัน Batch Pipeline บ่อยแค่ไหน
ขึ้นอยู่กับ Use Case: Model Retraining รันสัปดาห์ละครั้งหรือเดือนละครั้ง, Batch Inference เช่น Recommendation รันทุกวัน, Feature Engineering รันทุกชั่วโมง พิจารณาจากความถี่ที่ข้อมูลเปลี่ยน, Business SLA และ Cost ของ Compute Resources
สรุปและแนวทางปฏิบัติ
MLOps Batch Pipeline เป็น Pattern ที่จำเป็นสำหรับทุกองค์กรที่ใช้ ML ใน Production การใช้ Airflow สำหรับ Orchestration, Spark สำหรับ Feature Engineering ขนาดใหญ่, MLflow สำหรับ Experiment Tracking และ Model Registry และ Kubernetes สำหรับ Scheduling จะทำให้ Pipeline มีความ Reliable, Reproducible และ Scalable สิ่งสำคัญคือ Data Validation ทุกรอบ, Model Evaluation ก่อน Promote และ Monitoring หลัง Deploy เพื่อจับ Data Drift และ Performance Degradation ได้เร็ว
