SiamCafe.net Blog
Technology

MLOps Pipeline Batch Processing Pipeline

mlops pipeline batch processing pipeline
MLOps Pipeline Batch Processing Pipeline | SiamCafe Blog
2025-11-06· อ. บอม — SiamCafe.net· 8,466 คำ

MLOps Pipeline คืออะไร

MLOps Pipeline คือ Automated Workflow ที่จัดการทุกขั้นตอนของ Machine Learning Lifecycle อย่างเป็นระบบ ตั้งแต่การดึงข้อมูล (Data Ingestion) การทำ Feature Engineering การ Train Model การ Evaluate ผลลัพธ์ การ Deploy Model ไปจนถึงการ Monitor Model Performance ในระบบ Production

Batch Processing Pipeline เป็น Pattern ที่พบบ่อยที่สุดใน MLOps เพราะงาน ML ส่วนใหญ่ไม่ต้องการผลลัพธ์แบบ Real-time เช่น การ Retrain Model ด้วยข้อมูลใหม่ทุกสัปดาห์ การ Generate Recommendations สำหรับผู้ใช้ทุกู้คืนทุกวัน หรือการ Score ลูกค้าเพื่อทำ Marketing Campaign

สถาปัตยกรรม Batch MLOps Pipeline

Airflow DAG สำหรับ Batch ML Pipeline

# dags/ml_batch_pipeline.py — Airflow DAG สำหรับ ML Batch Pipeline
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
import mlflow
import json

default_args = {
    "owner": "ml-team",
    "depends_on_past": False,
    "email_on_failure": True,
    "email": ["ml-team@company.com"],
    "retries": 2,
    "retry_delay": timedelta(minutes=10),
}

dag = DAG(
    "ml_batch_pipeline",
    default_args=default_args,
    description="Batch ML Pipeline: Data → Features → Train → Evaluate → Inference",
    schedule_interval="0 2 * * *",  # รันทุกวันตี 2
    start_date=days_ago(1),
    catchup=False,
    tags=["ml", "batch", "production"],
)

def validate_data(**context):
    """ตรวจสอบคุณภาพข้อมูล"""
    import pandas as pd
    from great_expectations.core import ExpectationSuite

    df = pd.read_parquet("s3://data-lake/raw/transactions/latest/")
    
    checks = {
        "row_count": len(df) > 1000,
        "no_nulls_in_id": df["customer_id"].notna().all(),
        "amount_positive": (df["amount"] > 0).all(),
        "date_range_valid": df["date"].max() >= pd.Timestamp.now() - pd.Timedelta(days=2),
    }
    
    failed = [k for k, v in checks.items() if not v]
    if failed:
        raise ValueError(f"Data Validation Failed: {failed}")
    
    context["ti"].xcom_push(key="row_count", value=len(df))
    return {"status": "passed", "rows": len(df)}

def train_model(**context):
    """Train Model และ Log ไปยัง MLflow"""
    import pandas as pd
    from sklearn.ensemble import GradientBoostingClassifier
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import f1_score, precision_score, recall_score
    
    mlflow.set_tracking_uri("https://mlflow.company.com")
    mlflow.set_experiment("churn-prediction-daily")
    
    df = pd.read_parquet("s3://data-lake/features/churn_features/latest/")
    X = df.drop(columns=["customer_id", "churned"])
    y = df["churned"]
    
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )
    
    with mlflow.start_run(run_name=f"daily-{context['ds']}"):
        params = {
            "n_estimators": 200,
            "max_depth": 6,
            "learning_rate": 0.1,
            "min_samples_split": 20,
        }
        mlflow.log_params(params)
        
        model = GradientBoostingClassifier(**params, random_state=42)
        model.fit(X_train, y_train)
        
        y_pred = model.predict(X_test)
        metrics = {
            "f1": f1_score(y_test, y_pred),
            "precision": precision_score(y_test, y_pred),
            "recall": recall_score(y_test, y_pred),
            "train_size": len(X_train),
            "test_size": len(X_test),
        }
        mlflow.log_metrics(metrics)
        mlflow.sklearn.log_model(model, "model",
                                  registered_model_name="churn-predictor")
        
        context["ti"].xcom_push(key="f1_score", value=metrics["f1"])
        context["ti"].xcom_push(key="run_id", value=mlflow.active_run().info.run_id)

def evaluate_and_promote(**context):
    """เปรียบเทียบกับ Production Model แล้ว Promote ถ้าดีกว่า"""
    from mlflow.tracking import MlflowClient
    
    client = MlflowClient()
    new_f1 = context["ti"].xcom_pull(key="f1_score")
    
    # ดึง Production Model metrics
    prod_versions = client.get_latest_versions("churn-predictor", stages=["Production"])
    if prod_versions:
        prod_run = client.get_run(prod_versions[0].run_id)
        prod_f1 = float(prod_run.data.metrics.get("f1", 0))
    else:
        prod_f1 = 0
    
    if new_f1 > prod_f1 + 0.005:  # ต้องดีกว่าอย่างน้อย 0.5%
        new_version = client.get_latest_versions("churn-predictor", stages=["None"])[0]
        client.transition_model_version_stage(
            name="churn-predictor",
            version=new_version.version,
            stage="Production",
            archive_existing_versions=True,
        )
        return f"Promoted v{new_version.version}: F1 {new_f1:.4f} > {prod_f1:.4f}"
    return f"Kept current: New F1 {new_f1:.4f} <= Prod F1 {prod_f1:.4f}"

def batch_inference(**context):
    """รัน Batch Inference บนข้อมูลทั้งหมด"""
    import pandas as pd
    import mlflow
    
    model = mlflow.sklearn.load_model("models:/churn-predictor/Production")
    df = pd.read_parquet("s3://data-lake/features/churn_features/latest/")
    
    X = df.drop(columns=["customer_id", "churned"])
    df["churn_probability"] = model.predict_proba(X)[:, 1]
    df["churn_prediction"] = (df["churn_probability"] >= 0.5).astype(int)
    
    # เก็บผลลัพธ์
    output = df[["customer_id", "churn_probability", "churn_prediction"]]
    output.to_parquet(f"s3://data-lake/predictions/churn/{context['ds']}/")
    
    # สรุปผล
    high_risk = (df["churn_probability"] >= 0.7).sum()
    return {"total": len(df), "high_risk": int(high_risk)}

# สร้าง Tasks
t_validate = PythonOperator(task_id="validate_data", python_callable=validate_data, dag=dag)

t_features = SparkSubmitOperator(
    task_id="feature_engineering",
    application="s3://ml-code/spark/feature_engineering.py",
    conn_id="spark_default",
    conf={"spark.executor.memory": "4g", "spark.executor.cores": "2"},
    dag=dag,
)

t_train = PythonOperator(task_id="train_model", python_callable=train_model, dag=dag)
t_evaluate = PythonOperator(task_id="evaluate_promote", python_callable=evaluate_and_promote, dag=dag)
t_inference = PythonOperator(task_id="batch_inference", python_callable=batch_inference, dag=dag)

t_notify = SlackWebhookOperator(
    task_id="notify_slack",
    slack_webhook_conn_id="slack_ml",
    message="ML Pipeline completed: {{ ti.xcom_pull(task_ids='batch_inference') }}",
    dag=dag,
)

# กำหนดลำดับ
t_validate >> t_features >> t_train >> t_evaluate >> t_inference >> t_notify

Spark Job สำหรับ Feature Engineering

# spark/feature_engineering.py — PySpark Feature Engineering
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from datetime import datetime, timedelta

spark = SparkSession.builder \
    .appName("ChurnFeatureEngineering") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

# อ่านข้อมูล
transactions = spark.read.parquet("s3://data-lake/raw/transactions/")
customers = spark.read.parquet("s3://data-lake/raw/customers/")

# Feature 1: Transaction Aggregations (30 วัน, 90 วัน)
cutoff_30d = (datetime.now() - timedelta(days=30)).strftime("%Y-%m-%d")
cutoff_90d = (datetime.now() - timedelta(days=90)).strftime("%Y-%m-%d")

txn_30d = transactions.filter(F.col("date") >= cutoff_30d).groupBy("customer_id").agg(
    F.count("*").alias("txn_count_30d"),
    F.sum("amount").alias("txn_amount_30d"),
    F.avg("amount").alias("txn_avg_30d"),
    F.max("amount").alias("txn_max_30d"),
    F.countDistinct("category").alias("txn_categories_30d"),
)

txn_90d = transactions.filter(F.col("date") >= cutoff_90d).groupBy("customer_id").agg(
    F.count("*").alias("txn_count_90d"),
    F.sum("amount").alias("txn_amount_90d"),
    F.avg("amount").alias("txn_avg_90d"),
)

# Feature 2: Recency (วันนับจาก Transaction ล่าสุด)
recency = transactions.groupBy("customer_id").agg(
    F.datediff(F.current_date(), F.max("date")).alias("days_since_last_txn"),
    F.datediff(F.max("date"), F.min("date")).alias("customer_tenure_days"),
)

# Feature 3: Trend (เปรียบเทียบ 30 วันล่าสุดกับ 30 วันก่อนหน้า)
txn_prev_30d = transactions.filter(
    (F.col("date") >= (datetime.now() - timedelta(days=60)).strftime("%Y-%m-%d")) &
    (F.col("date") < cutoff_30d)
).groupBy("customer_id").agg(
    F.count("*").alias("txn_count_prev_30d"),
    F.sum("amount").alias("txn_amount_prev_30d"),
)

# รวม Features ทั้งหมด
features = customers.select("customer_id", "age", "gender", "registration_date") \
    .join(txn_30d, "customer_id", "left") \
    .join(txn_90d, "customer_id", "left") \
    .join(recency, "customer_id", "left") \
    .join(txn_prev_30d, "customer_id", "left")

# คำนวณ Derived Features
features = features.withColumn(
    "txn_trend", 
    F.when(F.col("txn_count_prev_30d") > 0,
           (F.col("txn_count_30d") - F.col("txn_count_prev_30d")) / F.col("txn_count_prev_30d")
    ).otherwise(0)
).fillna(0)

# เขียนผลลัพธ์
features.write.mode("overwrite").parquet("s3://data-lake/features/churn_features/latest/")

print(f"Features generated: {features.count()} rows, {len(features.columns)} columns")
spark.stop()

Docker และ Kubernetes สำหรับ Pipeline

# Dockerfile สำหรับ ML Pipeline Worker
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY src/ ./src/
COPY configs/ ./configs/

ENV PYTHONPATH=/app
ENV MLFLOW_TRACKING_URI=https://mlflow.company.com

CMD ["python", "-m", "src.pipeline.run"]

---
# requirements.txt
apache-airflow==2.8.0
mlflow==2.10.0
scikit-learn==1.4.0
pandas==2.2.0
pyarrow==15.0.0
great-expectations==0.18.0
boto3==1.34.0

---
# kubernetes/ml-pipeline-cronjob.yaml
apiVersion: batch/v1
kind: CronJob
metadata:
  name: ml-batch-inference
  namespace: ml-platform
spec:
  schedule: "0 3 * * *"  # ทุกวันตี 3
  concurrencyPolicy: Forbid
  successfulJobsHistoryLimit: 7
  failedJobsHistoryLimit: 3
  jobTemplate:
    spec:
      backoffLimit: 2
      activeDeadlineSeconds: 7200  # Timeout 2 ชั่วโมง
      template:
        spec:
          restartPolicy: Never
          containers:
            - name: inference
              image: registry.company.com/ml-pipeline:latest
              command: ["python", "-m", "src.inference.batch_run"]
              resources:
                requests:
                  cpu: "2"
                  memory: 8Gi
                limits:
                  cpu: "4"
                  memory: 16Gi
              env:
                - name: S3_BUCKET
                  value: "data-lake"
                - name: MODEL_NAME
                  value: "churn-predictor"
                - name: MLFLOW_TRACKING_URI
                  valueFrom:
                    configMapKeyRef:
                      name: ml-config
                      key: mlflow-uri
              volumeMounts:
                - name: ml-secrets
                  mountPath: /secrets
                  readOnly: true
          volumes:
            - name: ml-secrets
              secret:
                secretName: ml-pipeline-secrets

Monitoring และ Alerting สำหรับ Batch Pipeline

MLOps Pipeline คืออะไร

MLOps Pipeline คือ Automated Workflow ที่จัดการทุกขั้นตอนของ Machine Learning ตั้งแต่ Data Ingestion, Feature Engineering, Model Training, Evaluation, Deployment ไปจนถึง Monitoring นำหลักการ DevOps มาใช้กับ ML เพื่อให้ทำซ้ำได้ Scale ได้ และ Maintain ง่าย

Batch Processing ต่างจาก Real-time Processing อย่างไร

Batch Processing ประมวลผลข้อมูลเป็นชุดใหญ่ตามรอบเวลา เช่น ทุกวันหรือทุกชั่วโมง เหมาะกับ Model Training และ Batch Inference ส่วน Real-time ประมวลผลทันทีที่ข้อมูลเข้ามา เหมาะกับ Fraud Detection, Real-time Recommendation Batch ง่ายกว่าในการจัดการและ Debug

เครื่องมือที่ใช้สร้าง MLOps Batch Pipeline มีอะไรบ้าง

Orchestration: Apache Airflow หรือ Prefect สำหรับจัดลำดับ Tasks, Data Processing: Apache Spark หรือ Dask สำหรับข้อมูลใหญ่, Feature Store: Feast, Model Tracking: MLflow, Containerization: Docker + Kubernetes สำหรับ Scheduling และ Scaling

ควรรัน Batch Pipeline บ่อยแค่ไหน

ขึ้นอยู่กับ Use Case: Model Retraining รันสัปดาห์ละครั้งหรือเดือนละครั้ง, Batch Inference เช่น Recommendation รันทุกวัน, Feature Engineering รันทุกชั่วโมง พิจารณาจากความถี่ที่ข้อมูลเปลี่ยน, Business SLA และ Cost ของ Compute Resources

สรุปและแนวทางปฏิบัติ

MLOps Batch Pipeline เป็น Pattern ที่จำเป็นสำหรับทุกองค์กรที่ใช้ ML ใน Production การใช้ Airflow สำหรับ Orchestration, Spark สำหรับ Feature Engineering ขนาดใหญ่, MLflow สำหรับ Experiment Tracking และ Model Registry และ Kubernetes สำหรับ Scheduling จะทำให้ Pipeline มีความ Reliable, Reproducible และ Scalable สิ่งสำคัญคือ Data Validation ทุกรอบ, Model Evaluation ก่อน Promote และ Monitoring หลัง Deploy เพื่อจับ Data Drift และ Performance Degradation ได้เร็ว

📖 บทความที่เกี่ยวข้อง

AWS Amplify Batch Processing Pipelineอ่านบทความ → Crowdsec IPS Batch Processing Pipelineอ่านบทความ → Azure Container Apps Batch Processing Pipelineอ่านบทความ → ArgoCD ApplicationSet Batch Processing Pipelineอ่านบทความ → Apache Beam Pipeline Event Driven Designอ่านบทความ →

📚 ดูบทความทั้งหมด →