SiamCafe · Blog
MLOps Pipeline กับ Batch Processing — วิธีสร้าง
บทความ

MLOps Pipeline กับ Batch Processing — วิธีสร้าง

เผยแพร่ 28 พฤษภาคม 2569

MLOps Pipeline คืออะไร

MLOps Pipeline กับ Batch Processing — วิธีสร้าง

MLOps Pipeline คือ Automated Workflow ที่จัดการทุกขั้นตอนของ Machine Learning Lifecycle อย่างเป็นระบบ ตั้งแต่การดึงข้อมูล (Data Ingestion) การทำ Feature Engineering การ Train Model การ Evaluate ผลลัพธ์ การ Deploy Model ไปจนถึงการ Monitor Model Performance ในระบบ Production

Batch Processing Pipeline เป็น Pattern ที่พบบ่อยที่สุดใน MLOps เพราะงาน ML ส่วนใหญ่ไม่ต้องการผลลัพธ์แบบ Real-time เช่น การ Retrain Model ด้วยข้อมูลใหม่ทุกสัปดาห์ การ Generate Recommendations สำหรับผู้ใช้ทุกคนทุกวัน หรือการ Score ลูกค้าเพื่อทำ Marketing Campaign

สถาปัตยกรรม Batch MLOps Pipeline

  • Data Ingestion: ดึงข้อมูลจาก Database, API, S3 มาเก็บใน Data Lake
  • Data Validation: ตรวจสอบคุณภาพข้อมูล Schema, Missing Values, Drift
  • Feature Engineering: แปลงข้อมูลดิบเป็น Features สำหรับ Model
  • Model Training: Train Model ด้วย Features ใหม่ Log ไปยัง MLflow
  • Model Evaluation: เปรียบเทียบ Model ใหม่กับ Model เดิม
  • Model Registry: Promote Model ที่ดีกว่าไป Production
  • Batch Inference: ใช้ Model ทำ Prediction บนข้อมูลขนาดใหญ่
  • Result Storage: เก็บผลลัพธ์ใน Database หรือ Data Warehouse

Airflow DAG สำหรับ Batch ML Pipeline

# dags/ml_batch_pipeline.py — Airflow DAG สำหรับ ML Batch Pipeline
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
import mlflow
import json

default_args = {
    "owner": "ml-team",
    "depends_on_past": False,
    "email_on_failure": True,
    "email": ["ml-team@company.com"],
    "retries": 2,
    "retry_delay": timedelta(minutes=10),
}

dag = DAG(
    "ml_batch_pipeline",
    default_args=default_args,
    description="Batch ML Pipeline: Data → Features → Train → Evaluate → Inference",
    schedule_interval="0 2 * * *",  # รันทุกวันตี 2
    start_date=days_ago(1),
    catchup=False,
    tags=["ml", "batch", "production"],
)

def validate_data(**context):
    """ตรวจสอบคุณภาพข้อมูล"""
    import pandas as pd
    from great_expectations.core import ExpectationSuite

    df = pd.read_parquet("s3://data-lake/raw/transactions/latest/")
    
    checks = {
        "row_count": len(df) > 1000,
        "no_nulls_in_id": df["customer_id"].notna().all(),
        "amount_positive": (df["amount"] > 0).all(),
        "date_range_valid": df["date"].max() >= pd.Timestamp.now() - pd.Timedelta(days=2),
    }
    
    failed = [k for k, v in checks.items() if not v]
    if failed:
        raise ValueError(f"Data Validation Failed: {failed}")
    
    context["ti"].xcom_push(key="row_count", value=len(df))
    return {"status": "passed", "rows": len(df)}

def train_model(**context):
    """Train Model และ Log ไปยัง MLflow"""
    import pandas as pd
    from sklearn.ensemble import GradientBoostingClassifier
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import f1_score, precision_score, recall_score
    
    mlflow.set_tracking_uri("https://mlflow.company.com")
    mlflow.set_experiment("churn-prediction-daily")
    
    df = pd.read_parquet("s3://data-lake/features/churn_features/latest/")
    X = df.drop(columns=["customer_id", "churned"])
    y = df["churned"]
    
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )
    
    with mlflow.start_run(run_name=f"daily-{context['ds']}"):
        params = {
            "n_estimators": 200,
            "max_depth": 6,
            "learning_rate": 0.1,
            "min_samples_split": 20,
        }
        mlflow.log_params(params)
        
        model = GradientBoostingClassifier(**params, random_state=42)
        model.fit(X_train, y_train)
        
        y_pred = model.predict(X_test)
        metrics = {
            "f1": f1_score(y_test, y_pred),
            "precision": precision_score(y_test, y_pred),
            "recall": recall_score(y_test, y_pred),
            "train_size": len(X_train),
            "test_size": len(X_test),
        }
        mlflow.log_metrics(metrics)
        mlflow.sklearn.log_model(model, "model",
                                  registered_model_name="churn-predictor")
        
        context["ti"].xcom_push(key="f1_score", value=metrics["f1"])
        context["ti"].xcom_push(key="run_id", value=mlflow.active_run().info.run_id)

def evaluate_and_promote(**context):
    """เปรียบเทียบกับ Production Model แล้ว Promote ถ้าดีกว่า"""
    from mlflow.tracking import MlflowClient
    
    client = MlflowClient()
    new_f1 = context["ti"].xcom_pull(key="f1_score")
    
    # ดึง Production Model metrics
    prod_versions = client.get_latest_versions("churn-predictor", stages=["Production"])
    if prod_versions:
        prod_run = client.get_run(prod_versions[0].run_id)
        prod_f1 = float(prod_run.data.metrics.get("f1", 0))
    else:
        prod_f1 = 0
    
    if new_f1 > prod_f1 + 0.005:  # ต้องดีกว่าอย่างน้อย 0.5%
        new_version = client.get_latest_versions("churn-predictor", stages=["None"])[0]
        client.transition_model_version_stage(
            name="churn-predictor",
            version=new_version.version,
            stage="Production",
            archive_existing_versions=True,
        )
        return f"Promoted v{new_version.version}: F1 {new_f1:.4f} > {prod_f1:.4f}"
    return f"Kept current: New F1 {new_f1:.4f} <= Prod F1 {prod_f1:.4f}"

def batch_inference(**context):
    """รัน Batch Inference บนข้อมูลทั้งหมด"""
    import pandas as pd
    import mlflow
    
    model = mlflow.sklearn.load_model("models:/churn-predictor/Production")
    df = pd.read_parquet("s3://data-lake/features/churn_features/latest/")
    
    X = df.drop(columns=["customer_id", "churned"])
    df["churn_probability"] = model.predict_proba(X)[:, 1]
    df["churn_prediction"] = (df["churn_probability"] >= 0.5).astype(int)
    
    # เก็บผลลัพธ์
    output = df[["customer_id", "churn_probability", "churn_prediction"]]
    output.to_parquet(f"s3://data-lake/predictions/churn/{context['ds']}/")
    
    # สรุปผล
    high_risk = (df["churn_probability"] >= 0.7).sum()
    return {"total": len(df), "high_risk": int(high_risk)}

# สร้าง Tasks
t_validate = PythonOperator(task_id="validate_data", python_callable=validate_data, dag=dag)

t_features = SparkSubmitOperator(
    task_id="feature_engineering",
    application="s3://ml-code/spark/feature_engineering.py",
    conn_id="spark_default",
    conf={"spark.executor.memory": "4g", "spark.executor.cores": "2"},
    dag=dag,
)

t_train = PythonOperator(task_id="train_model", python_callable=train_model, dag=dag)
t_evaluate = PythonOperator(task_id="evaluate_promote", python_callable=evaluate_and_promote, dag=dag)
t_inference = PythonOperator(task_id="batch_inference", python_callable=batch_inference, dag=dag)

t_notify = SlackWebhookOperator(
    task_id="notify_slack",
    slack_webhook_conn_id="slack_ml",
    message="ML Pipeline completed: {{ ti.xcom_pull(task_ids='batch_inference') }}",
    dag=dag,
)

# กำหนดลำดับ
t_validate >> t_features >> t_train >> t_evaluate >> t_inference >> t_notify

Spark Job สำหรับ Feature Engineering

MLOps Pipeline กับ Batch Processing — วิธีสร้าง
# spark/feature_engineering.py — PySpark Feature Engineering
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from datetime import datetime, timedelta

spark = SparkSession.builder \
    .appName("ChurnFeatureEngineering") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

# อ่านข้อมูล
transactions = spark.read.parquet("s3://data-lake/raw/transactions/")
customers = spark.read.parquet("s3://data-lake/raw/customers/")

# Feature 1: Transaction Aggregations (30 วัน, 90 วัน)
cutoff_30d = (datetime.now() - timedelta(days=30)).strftime("%Y-%m-%d")
cutoff_90d = (datetime.now() - timedelta(days=90)).strftime("%Y-%m-%d")

txn_30d = transactions.filter(F.col("date") >= cutoff_30d).groupBy("customer_id").agg(
    F.count("*").alias("txn_count_30d"),
    F.sum("amount").alias("txn_amount_30d"),
    F.avg("amount").alias("txn_avg_30d"),
    F.max("amount").alias("txn_max_30d"),
    F.countDistinct("category").alias("txn_categories_30d"),
)

txn_90d = transactions.filter(F.col("date") >= cutoff_90d).groupBy("customer_id").agg(
    F.count("*").alias("txn_count_90d"),
    F.sum("amount").alias("txn_amount_90d"),
    F.avg("amount").alias("txn_avg_90d"),
)

# Feature 2: Recency (วันนับจาก Transaction ล่าสุด)
recency = transactions.groupBy("customer_id").agg(
    F.datediff(F.current_date(), F.max("date")).alias("days_since_last_txn"),
    F.datediff(F.max("date"), F.min("date")).alias("customer_tenure_days"),
)

# Feature 3: Trend (เปรียบเทียบ 30 วันล่าสุดกับ 30 วันก่อนหน้า)
txn_prev_30d = transactions.filter(
    (F.col("date") >= (datetime.now() - timedelta(days=60)).strftime("%Y-%m-%d")) &
    (F.col("date") < cutoff_30d)
).groupBy("customer_id").agg(
    F.count("*").alias("txn_count_prev_30d"),
    F.sum("amount").alias("txn_amount_prev_30d"),
)

# รวม Features ทั้งหมด
features = customers.select("customer_id", "age", "gender", "registration_date") \
    .join(txn_30d, "customer_id", "left") \
    .join(txn_90d, "customer_id", "left") \
    .join(recency, "customer_id", "left") \
    .join(txn_prev_30d, "customer_id", "left")

# คำนวณ Derived Features
features = features.withColumn(
    "txn_trend", 
    F.when(F.col("txn_count_prev_30d") > 0,
           (F.col("txn_count_30d") - F.col("txn_count_prev_30d")) / F.col("txn_count_prev_30d")
    ).otherwise(0)
).fillna(0)

# เขียนผลลัพธ์
features.write.mode("overwrite").parquet("s3://data-lake/features/churn_features/latest/")

print(f"Features generated: {features.count()} rows, {len(features.columns)} columns")
spark.stop()

Docker และ Kubernetes สำหรับ Pipeline

# Dockerfile สำหรับ ML Pipeline Worker
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY src/ ./src/
COPY configs/ ./configs/

ENV PYTHONPATH=/app
ENV MLFLOW_TRACKING_URI=https://mlflow.company.com

CMD ["python", "-m", "src.pipeline.run"]

---
# requirements.txt
apache-airflow==2.8.0
mlflow==2.10.0
scikit-learn==1.4.0
pandas==2.2.0
pyarrow==15.0.0
great-expectations==0.18.0
boto3==1.34.0

---
# kubernetes/ml-pipeline-cronjob.yaml
apiVersion: batch/v1
kind: CronJob
metadata:
  name: ml-batch-inference
  namespace: ml-platform
spec:
  schedule: "0 3 * * *"  # ทุกวันตี 3
  concurrencyPolicy: Forbid
  successfulJobsHistoryLimit: 7
  failedJobsHistoryLimit: 3
  jobTemplate:
    spec:
      backoffLimit: 2
      activeDeadlineSeconds: 7200  # Timeout 2 ชั่วโมง
      template:
        spec:
          restartPolicy: Never
          containers:
            - name: inference
              image: registry.company.com/ml-pipeline:latest
              command: ["python", "-m", "src.inference.batch_run"]
              resources:
                requests:
                  cpu: "2"
                  memory: 8Gi
                limits:
                  cpu: "4"
                  memory: 16Gi
              env:
                - name: S3_BUCKET
                  value: "data-lake"
                - name: MODEL_NAME
                  value: "churn-predictor"
                - name: MLFLOW_TRACKING_URI
                  valueFrom:
                    configMapKeyRef:
                      name: ml-config
                      key: mlflow-uri
              volumeMounts:
                - name: ml-secrets
                  mountPath: /secrets
                  readOnly: true
          volumes:
            - name: ml-secrets
              secret:
                secretName: ml-pipeline-secrets

Monitoring และ Alerting สำหรับ Batch Pipeline

  • Pipeline Health: ติดตาม Success/Failure Rate ของแต่ละ Task, Duration Trend และ SLA Compliance
  • Data Quality: ตรวจสอบ Row Count, Schema Changes, Distribution Drift ของ Input Data ทุกรอบ
  • Model Performance: ติดตาม Prediction Distribution, F1 Score Trend, Feature Importance Changes
  • Resource Usage: CPU, Memory, Disk ของ Pipeline Workers เพื่อ Right-sizing
  • Cost Tracking: คำนวณ Cost ต่อ Pipeline Run (Compute + Storage + Network)

MLOps Pipeline คืออะไร

MLOps Pipeline คือ Automated Workflow ที่จัดการทุกขั้นตอนของ Machine Learning ตั้งแต่ Data Ingestion, Feature Engineering, Model Training, Evaluation, Deployment ไปจนถึง Monitoring นำหลักการ DevOps มาใช้กับ ML เพื่อให้ทำซ้ำได้ Scale ได้ และ Maintain ง่าย