
MLOps Pipeline กับ Batch Processing — วิธีสร้าง
MLOps Pipeline คืออะไร

MLOps Pipeline คือ Automated Workflow ที่จัดการทุกขั้นตอนของ Machine Learning Lifecycle อย่างเป็นระบบ ตั้งแต่การดึงข้อมูล (Data Ingestion) การทำ Feature Engineering การ Train Model การ Evaluate ผลลัพธ์ การ Deploy Model ไปจนถึงการ Monitor Model Performance ในระบบ Production
Batch Processing Pipeline เป็น Pattern ที่พบบ่อยที่สุดใน MLOps เพราะงาน ML ส่วนใหญ่ไม่ต้องการผลลัพธ์แบบ Real-time เช่น การ Retrain Model ด้วยข้อมูลใหม่ทุกสัปดาห์ การ Generate Recommendations สำหรับผู้ใช้ทุกคนทุกวัน หรือการ Score ลูกค้าเพื่อทำ Marketing Campaign
สถาปัตยกรรม Batch MLOps Pipeline
- Data Ingestion: ดึงข้อมูลจาก Database, API, S3 มาเก็บใน Data Lake
- Data Validation: ตรวจสอบคุณภาพข้อมูล Schema, Missing Values, Drift
- Feature Engineering: แปลงข้อมูลดิบเป็น Features สำหรับ Model
- Model Training: Train Model ด้วย Features ใหม่ Log ไปยัง MLflow
- Model Evaluation: เปรียบเทียบ Model ใหม่กับ Model เดิม
- Model Registry: Promote Model ที่ดีกว่าไป Production
- Batch Inference: ใช้ Model ทำ Prediction บนข้อมูลขนาดใหญ่
- Result Storage: เก็บผลลัพธ์ใน Database หรือ Data Warehouse
Airflow DAG สำหรับ Batch ML Pipeline
# dags/ml_batch_pipeline.py — Airflow DAG สำหรับ ML Batch Pipeline
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
import mlflow
import json
default_args = {
"owner": "ml-team",
"depends_on_past": False,
"email_on_failure": True,
"email": ["ml-team@company.com"],
"retries": 2,
"retry_delay": timedelta(minutes=10),
}
dag = DAG(
"ml_batch_pipeline",
default_args=default_args,
description="Batch ML Pipeline: Data → Features → Train → Evaluate → Inference",
schedule_interval="0 2 * * *", # รันทุกวันตี 2
start_date=days_ago(1),
catchup=False,
tags=["ml", "batch", "production"],
)
def validate_data(**context):
"""ตรวจสอบคุณภาพข้อมูล"""
import pandas as pd
from great_expectations.core import ExpectationSuite
df = pd.read_parquet("s3://data-lake/raw/transactions/latest/")
checks = {
"row_count": len(df) > 1000,
"no_nulls_in_id": df["customer_id"].notna().all(),
"amount_positive": (df["amount"] > 0).all(),
"date_range_valid": df["date"].max() >= pd.Timestamp.now() - pd.Timedelta(days=2),
}
failed = [k for k, v in checks.items() if not v]
if failed:
raise ValueError(f"Data Validation Failed: {failed}")
context["ti"].xcom_push(key="row_count", value=len(df))
return {"status": "passed", "rows": len(df)}
def train_model(**context):
"""Train Model และ Log ไปยัง MLflow"""
import pandas as pd
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, precision_score, recall_score
mlflow.set_tracking_uri("https://mlflow.company.com")
mlflow.set_experiment("churn-prediction-daily")
df = pd.read_parquet("s3://data-lake/features/churn_features/latest/")
X = df.drop(columns=["customer_id", "churned"])
y = df["churned"]
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
with mlflow.start_run(run_name=f"daily-{context['ds']}"):
params = {
"n_estimators": 200,
"max_depth": 6,
"learning_rate": 0.1,
"min_samples_split": 20,
}
mlflow.log_params(params)
model = GradientBoostingClassifier(**params, random_state=42)
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
metrics = {
"f1": f1_score(y_test, y_pred),
"precision": precision_score(y_test, y_pred),
"recall": recall_score(y_test, y_pred),
"train_size": len(X_train),
"test_size": len(X_test),
}
mlflow.log_metrics(metrics)
mlflow.sklearn.log_model(model, "model",
registered_model_name="churn-predictor")
context["ti"].xcom_push(key="f1_score", value=metrics["f1"])
context["ti"].xcom_push(key="run_id", value=mlflow.active_run().info.run_id)
def evaluate_and_promote(**context):
"""เปรียบเทียบกับ Production Model แล้ว Promote ถ้าดีกว่า"""
from mlflow.tracking import MlflowClient
client = MlflowClient()
new_f1 = context["ti"].xcom_pull(key="f1_score")
# ดึง Production Model metrics
prod_versions = client.get_latest_versions("churn-predictor", stages=["Production"])
if prod_versions:
prod_run = client.get_run(prod_versions[0].run_id)
prod_f1 = float(prod_run.data.metrics.get("f1", 0))
else:
prod_f1 = 0
if new_f1 > prod_f1 + 0.005: # ต้องดีกว่าอย่างน้อย 0.5%
new_version = client.get_latest_versions("churn-predictor", stages=["None"])[0]
client.transition_model_version_stage(
name="churn-predictor",
version=new_version.version,
stage="Production",
archive_existing_versions=True,
)
return f"Promoted v{new_version.version}: F1 {new_f1:.4f} > {prod_f1:.4f}"
return f"Kept current: New F1 {new_f1:.4f} <= Prod F1 {prod_f1:.4f}"
def batch_inference(**context):
"""รัน Batch Inference บนข้อมูลทั้งหมด"""
import pandas as pd
import mlflow
model = mlflow.sklearn.load_model("models:/churn-predictor/Production")
df = pd.read_parquet("s3://data-lake/features/churn_features/latest/")
X = df.drop(columns=["customer_id", "churned"])
df["churn_probability"] = model.predict_proba(X)[:, 1]
df["churn_prediction"] = (df["churn_probability"] >= 0.5).astype(int)
# เก็บผลลัพธ์
output = df[["customer_id", "churn_probability", "churn_prediction"]]
output.to_parquet(f"s3://data-lake/predictions/churn/{context['ds']}/")
# สรุปผล
high_risk = (df["churn_probability"] >= 0.7).sum()
return {"total": len(df), "high_risk": int(high_risk)}
# สร้าง Tasks
t_validate = PythonOperator(task_id="validate_data", python_callable=validate_data, dag=dag)
t_features = SparkSubmitOperator(
task_id="feature_engineering",
application="s3://ml-code/spark/feature_engineering.py",
conn_id="spark_default",
conf={"spark.executor.memory": "4g", "spark.executor.cores": "2"},
dag=dag,
)
t_train = PythonOperator(task_id="train_model", python_callable=train_model, dag=dag)
t_evaluate = PythonOperator(task_id="evaluate_promote", python_callable=evaluate_and_promote, dag=dag)
t_inference = PythonOperator(task_id="batch_inference", python_callable=batch_inference, dag=dag)
t_notify = SlackWebhookOperator(
task_id="notify_slack",
slack_webhook_conn_id="slack_ml",
message="ML Pipeline completed: {{ ti.xcom_pull(task_ids='batch_inference') }}",
dag=dag,
)
# กำหนดลำดับ
t_validate >> t_features >> t_train >> t_evaluate >> t_inference >> t_notify
Spark Job สำหรับ Feature Engineering

# spark/feature_engineering.py — PySpark Feature Engineering
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from datetime import datetime, timedelta
spark = SparkSession.builder \
.appName("ChurnFeatureEngineering") \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.sql.shuffle.partitions", "200") \
.getOrCreate()
# อ่านข้อมูล
transactions = spark.read.parquet("s3://data-lake/raw/transactions/")
customers = spark.read.parquet("s3://data-lake/raw/customers/")
# Feature 1: Transaction Aggregations (30 วัน, 90 วัน)
cutoff_30d = (datetime.now() - timedelta(days=30)).strftime("%Y-%m-%d")
cutoff_90d = (datetime.now() - timedelta(days=90)).strftime("%Y-%m-%d")
txn_30d = transactions.filter(F.col("date") >= cutoff_30d).groupBy("customer_id").agg(
F.count("*").alias("txn_count_30d"),
F.sum("amount").alias("txn_amount_30d"),
F.avg("amount").alias("txn_avg_30d"),
F.max("amount").alias("txn_max_30d"),
F.countDistinct("category").alias("txn_categories_30d"),
)
txn_90d = transactions.filter(F.col("date") >= cutoff_90d).groupBy("customer_id").agg(
F.count("*").alias("txn_count_90d"),
F.sum("amount").alias("txn_amount_90d"),
F.avg("amount").alias("txn_avg_90d"),
)
# Feature 2: Recency (วันนับจาก Transaction ล่าสุด)
recency = transactions.groupBy("customer_id").agg(
F.datediff(F.current_date(), F.max("date")).alias("days_since_last_txn"),
F.datediff(F.max("date"), F.min("date")).alias("customer_tenure_days"),
)
# Feature 3: Trend (เปรียบเทียบ 30 วันล่าสุดกับ 30 วันก่อนหน้า)
txn_prev_30d = transactions.filter(
(F.col("date") >= (datetime.now() - timedelta(days=60)).strftime("%Y-%m-%d")) &
(F.col("date") < cutoff_30d)
).groupBy("customer_id").agg(
F.count("*").alias("txn_count_prev_30d"),
F.sum("amount").alias("txn_amount_prev_30d"),
)
# รวม Features ทั้งหมด
features = customers.select("customer_id", "age", "gender", "registration_date") \
.join(txn_30d, "customer_id", "left") \
.join(txn_90d, "customer_id", "left") \
.join(recency, "customer_id", "left") \
.join(txn_prev_30d, "customer_id", "left")
# คำนวณ Derived Features
features = features.withColumn(
"txn_trend",
F.when(F.col("txn_count_prev_30d") > 0,
(F.col("txn_count_30d") - F.col("txn_count_prev_30d")) / F.col("txn_count_prev_30d")
).otherwise(0)
).fillna(0)
# เขียนผลลัพธ์
features.write.mode("overwrite").parquet("s3://data-lake/features/churn_features/latest/")
print(f"Features generated: {features.count()} rows, {len(features.columns)} columns")
spark.stop()
Docker และ Kubernetes สำหรับ Pipeline
# Dockerfile สำหรับ ML Pipeline Worker
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY src/ ./src/
COPY configs/ ./configs/
ENV PYTHONPATH=/app
ENV MLFLOW_TRACKING_URI=https://mlflow.company.com
CMD ["python", "-m", "src.pipeline.run"]
---
# requirements.txt
apache-airflow==2.8.0
mlflow==2.10.0
scikit-learn==1.4.0
pandas==2.2.0
pyarrow==15.0.0
great-expectations==0.18.0
boto3==1.34.0
---
# kubernetes/ml-pipeline-cronjob.yaml
apiVersion: batch/v1
kind: CronJob
metadata:
name: ml-batch-inference
namespace: ml-platform
spec:
schedule: "0 3 * * *" # ทุกวันตี 3
concurrencyPolicy: Forbid
successfulJobsHistoryLimit: 7
failedJobsHistoryLimit: 3
jobTemplate:
spec:
backoffLimit: 2
activeDeadlineSeconds: 7200 # Timeout 2 ชั่วโมง
template:
spec:
restartPolicy: Never
containers:
- name: inference
image: registry.company.com/ml-pipeline:latest
command: ["python", "-m", "src.inference.batch_run"]
resources:
requests:
cpu: "2"
memory: 8Gi
limits:
cpu: "4"
memory: 16Gi
env:
- name: S3_BUCKET
value: "data-lake"
- name: MODEL_NAME
value: "churn-predictor"
- name: MLFLOW_TRACKING_URI
valueFrom:
configMapKeyRef:
name: ml-config
key: mlflow-uri
volumeMounts:
- name: ml-secrets
mountPath: /secrets
readOnly: true
volumes:
- name: ml-secrets
secret:
secretName: ml-pipeline-secrets
Monitoring และ Alerting สำหรับ Batch Pipeline
- Pipeline Health: ติดตาม Success/Failure Rate ของแต่ละ Task, Duration Trend และ SLA Compliance
- Data Quality: ตรวจสอบ Row Count, Schema Changes, Distribution Drift ของ Input Data ทุกรอบ
- Model Performance: ติดตาม Prediction Distribution, F1 Score Trend, Feature Importance Changes
- Resource Usage: CPU, Memory, Disk ของ Pipeline Workers เพื่อ Right-sizing
- Cost Tracking: คำนวณ Cost ต่อ Pipeline Run (Compute + Storage + Network)
MLOps Pipeline คืออะไร
MLOps Pipeline คือ Automated Workflow ที่จัดการทุกขั้นตอนของ Machine Learning ตั้งแต่ Data Ingestion, Feature Engineering, Model Training, Evaluation, Deployment ไปจนถึง Monitoring นำหลักการ DevOps มาใช้กับ ML เพื่อให้ทำซ้ำได้ Scale ได้ และ Maintain ง่าย