ai

MLOps Pipeline กับ Batch Processing — วิธีสร้าง

MLOps Pipeline กับ Batch Processing — วิธีสร้าง

MLOps Pipeline คืออะไร

MLOps Pipeline กับ Batch Processing — วิธีสร้าง

MLOps Pipeline คือ Automated Workflow ที่จัดการทุกขั้นตอนของ Machine Learning Lifecycle อย่างเป็นระบบ ตั้งแต่การดึงข้อมูล (Data Ingestion) การทำ Feature Engineering การ Train Model การ Evaluate ผลลัพธ์ การ Deploy Model ไปจนถึงการ Monitor Model Performance ในระบบ Production

เนื้อหาเกี่ยวข้อง — put call option — ข้อมูลครบถ้วน 2026

Batch Processing Pipeline เป็น Pattern ที่พบบ่อยที่สุดใน MLOps เพราะงาน ML ส่วนใหญ่ไม่ต้องการผลลัพธ์แบบ Real-time เช่น การ Retrain Model ด้วยข้อมูลใหม่ทุกสัปดาห์ การ Generate Recommendations สำหรับผู้ใช้ทุกคนทุกวัน หรือการ Score ลูกค้าเพื่อทำ Marketing Campaign

เนื้อหาเกี่ยวข้อง — อ่านต่อ: Ollama Local LLM RBAC ABAC Policy —

สถาปัตยกรรม Batch MLOps Pipeline

  • Data Ingestion: ดึงข้อมูลจาก Database, API, S3 มาเก็บใน Data Lake
  • Data Validation: ตรวจสอบคุณภาพข้อมูล Schema, Missing Values, Drift
  • Feature Engineering: แปลงข้อมูลดิบเป็น Features สำหรับ Model
  • Model Training: Train Model ด้วย Features ใหม่ Log ไปยัง MLflow
  • Model Evaluation: เปรียบเทียบ Model ใหม่กับ Model เดิม
  • Model Registry: Promote Model ที่ดีกว่าไป Production
  • Batch Inference: ใช้ Model ทำ Prediction บนข้อมูลขนาดใหญ่
  • Result Storage: เก็บผลลัพธ์ใน Database หรือ Data Warehouse

Airflow DAG สำหรับ Batch ML Pipeline

# dags/ml_batch_pipeline.py — Airflow DAG สำหรับ ML Batch Pipeline

from airflow import DAG

from airflow.operators.python import PythonOperator

from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator

from airflow.utils.dates import days_ago

from datetime import timedelta

import mlflow

import json



default_args = {

    "owner": "ml-team",

    "depends_on_past": False,

    "email_on_failure": True,

    "email": ["ml-team@company.com"],

    "retries": 2,

    "retry_delay": timedelta(minutes=10),

}



dag = DAG(

    "ml_batch_pipeline",

    default_args=default_args,

    description="Batch ML Pipeline: Data → Features → Train → Evaluate → Inference",

    schedule_interval="0 2 * * *",  # รันทุกวันตี 2

    start_date=days_ago(1),

    catchup=False,

    tags=["ml", "batch", "production"],

)



def validate_data(**context):

    """ตรวจสอบคุณภาพข้อมูล"""

    import pandas as pd

    from great_expectations.core import ExpectationSuite



    df = pd.read_parquet("s3://data-lake/raw/transactions/latest/")

    

    checks = {

        "row_count": len(df) > 1000,

        "no_nulls_in_id": df["customer_id"].notna().all(),

        "amount_positive": (df["amount"] > 0).all(),

        "date_range_valid": df["date"].max() >= pd.Timestamp.now() - pd.Timedelta(days=2),

    }

    

    failed = [k for k, v in checks.items() if not v]

    if failed:

        raise ValueError(f"Data Validation Failed: {failed}")

    

    context["ti"].xcom_push(key="row_count", value=len(df))

    return {"status": "passed", "rows": len(df)}



def train_model(**context):

    """Train Model และ Log ไปยัง MLflow"""

    import pandas as pd

    from sklearn.ensemble import GradientBoostingClassifier

    from sklearn.model_selection import train_test_split

    from sklearn.metrics import f1_score, precision_score, recall_score

    

    mlflow.set_tracking_uri("https://mlflow.company.com")

    mlflow.set_experiment("churn-prediction-daily")

    

    df = pd.read_parquet("s3://data-lake/features/churn_features/latest/")

    X = df.drop(columns=["customer_id", "churned"])

    y = df["churned"]

    

    X_train, X_test, y_train, y_test = train_test_split(

        X, y, test_size=0.2, random_state=42, stratify=y

    )

    

    with mlflow.start_run(run_name=f"daily-{context['ds']}"):

        params = {

            "n_estimators": 200,

            "max_depth": 6,

            "learning_rate": 0.1,

            "min_samples_split": 20,

        }

        mlflow.log_params(params)

        

        model = GradientBoostingClassifier(**params, random_state=42)

        model.fit(X_train, y_train)

        

        y_pred = model.predict(X_test)

        metrics = {

            "f1": f1_score(y_test, y_pred),

            "precision": precision_score(y_test, y_pred),

            "recall": recall_score(y_test, y_pred),

            "train_size": len(X_train),

            "test_size": len(X_test),

        }

        mlflow.log_metrics(metrics)

        mlflow.sklearn.log_model(model, "model",

                                  registered_model_name="churn-predictor")

        

        context["ti"].xcom_push(key="f1_score", value=metrics["f1"])

        context["ti"].xcom_push(key="run_id", value=mlflow.active_run().info.run_id)



def evaluate_and_promote(**context):

    """เปรียบเทียบกับ Production Model แล้ว Promote ถ้าดีกว่า"""

    from mlflow.tracking import MlflowClient

    

    client = MlflowClient()

    new_f1 = context["ti"].xcom_pull(key="f1_score")

    

    # ดึง Production Model metrics

    prod_versions = client.get_latest_versions("churn-predictor", stages=["Production"])

    if prod_versions:

        prod_run = client.get_run(prod_versions[0].run_id)

        prod_f1 = float(prod_run.data.metrics.get("f1", 0))

    else:

        prod_f1 = 0

    

    if new_f1 > prod_f1 + 0.005:  # ต้องดีกว่าอย่างน้อย 0.5%

        new_version = client.get_latest_versions("churn-predictor", stages=["None"])[0]

        client.transition_model_version_stage(

            name="churn-predictor",

            version=new_version.version,

            stage="Production",

            archive_existing_versions=True,

        )

        return f"Promoted v{new_version.version}: F1 {new_f1:.4f} > {prod_f1:.4f}"

    return f"Kept current: New F1 {new_f1:.4f} <= Prod F1 {prod_f1:.4f}"



def batch_inference(**context):

    """รัน Batch Inference บนข้อมูลทั้งหมด"""

    import pandas as pd

    import mlflow

    

    model = mlflow.sklearn.load_model("models:/churn-predictor/Production")

    df = pd.read_parquet("s3://data-lake/features/churn_features/latest/")

    

    X = df.drop(columns=["customer_id", "churned"])

    df["churn_probability"] = model.predict_proba(X)[:, 1]

    df["churn_prediction"] = (df["churn_probability"] >= 0.5).astype(int)

    

    # เก็บผลลัพธ์

    output = df[["customer_id", "churn_probability", "churn_prediction"]]

    output.to_parquet(f"s3://data-lake/predictions/churn/{context['ds']}/")

    

    # สรุปผล

    high_risk = (df["churn_probability"] >= 0.7).sum()

    return {"total": len(df), "high_risk": int(high_risk)}



# สร้าง Tasks

t_validate = PythonOperator(task_id="validate_data", python_callable=validate_data, dag=dag)



t_features = SparkSubmitOperator(

    task_id="feature_engineering",

    application="s3://ml-code/spark/feature_engineering.py",

    conn_id="spark_default",

    conf={"spark.executor.memory": "4g", "spark.executor.cores": "2"},

    dag=dag,

)



t_train = PythonOperator(task_id="train_model", python_callable=train_model, dag=dag)

t_evaluate = PythonOperator(task_id="evaluate_promote", python_callable=evaluate_and_promote, dag=dag)

t_inference = PythonOperator(task_id="batch_inference", python_callable=batch_inference, dag=dag)



t_notify = SlackWebhookOperator(

    task_id="notify_slack",

    slack_webhook_conn_id="slack_ml",

    message="ML Pipeline completed: {{ ti.xcom_pull(task_ids='batch_inference') }}",

    dag=dag,

)



# กำหนดลำดับ

t_validate >> t_features >> t_train >> t_evaluate >> t_inference >> t_notify

Spark Job สำหรับ Feature Engineering

MLOps Pipeline กับ Batch Processing — วิธีสร้าง
# spark/feature_engineering.py — PySpark Feature Engineering

from pyspark.sql import SparkSession

from pyspark.sql import functions as F

from pyspark.sql.window import Window

from datetime import datetime, timedelta



spark = SparkSession.builder \

    .appName("ChurnFeatureEngineering") \

    .config("spark.sql.adaptive.enabled", "true") \

    .config("spark.sql.shuffle.partitions", "200") \

    .getOrCreate()



# อ่านข้อมูล

transactions = spark.read.parquet("s3://data-lake/raw/transactions/")

customers = spark.read.parquet("s3://data-lake/raw/customers/")



# Feature 1: Transaction Aggregations (30 วัน, 90 วัน)

cutoff_30d = (datetime.now() - timedelta(days=30)).strftime("%Y-%m-%d")

cutoff_90d = (datetime.now() - timedelta(days=90)).strftime("%Y-%m-%d")



txn_30d = transactions.filter(F.col("date") >= cutoff_30d).groupBy("customer_id").agg(

    F.count("*").alias("txn_count_30d"),

    F.sum("amount").alias("txn_amount_30d"),

    F.avg("amount").alias("txn_avg_30d"),

    F.max("amount").alias("txn_max_30d"),

    F.countDistinct("category").alias("txn_categories_30d"),

)



txn_90d = transactions.filter(F.col("date") >= cutoff_90d).groupBy("customer_id").agg(

    F.count("*").alias("txn_count_90d"),

    F.sum("amount").alias("txn_amount_90d"),

    F.avg("amount").alias("txn_avg_90d"),

)



# Feature 2: Recency (วันนับจาก Transaction ล่าสุด)

recency = transactions.groupBy("customer_id").agg(

    F.datediff(F.current_date(), F.max("date")).alias("days_since_last_txn"),

    F.datediff(F.max("date"), F.min("date")).alias("customer_tenure_days"),

)



# Feature 3: Trend (เปรียบเทียบ 30 วันล่าสุดกับ 30 วันก่อนหน้า)

txn_prev_30d = transactions.filter(

    (F.col("date") >= (datetime.now() - timedelta(days=60)).strftime("%Y-%m-%d")) &

    (F.col("date") < cutoff_30d)

).groupBy("customer_id").agg(

    F.count("*").alias("txn_count_prev_30d"),

    F.sum("amount").alias("txn_amount_prev_30d"),

)



# รวม Features ทั้งหมด

features = customers.select("customer_id", "age", "gender", "registration_date") \

    .join(txn_30d, "customer_id", "left") \

    .join(txn_90d, "customer_id", "left") \

    .join(recency, "customer_id", "left") \

    .join(txn_prev_30d, "customer_id", "left")



# คำนวณ Derived Features

features = features.withColumn(

    "txn_trend", 

    F.when(F.col("txn_count_prev_30d") > 0,

           (F.col("txn_count_30d") - F.col("txn_count_prev_30d")) / F.col("txn_count_prev_30d")

    ).otherwise(0)

).fillna(0)



# เขียนผลลัพธ์

features.write.mode("overwrite").parquet("s3://data-lake/features/churn_features/latest/")



print(f"Features generated: {features.count()} rows, {len(features.columns)} columns")

spark.stop()

Docker และ Kubernetes สำหรับ Pipeline

# Dockerfile สำหรับ ML Pipeline Worker

FROM python:3.11-slim



WORKDIR /app



COPY requirements.txt .

RUN pip install --no-cache-dir -r requirements.txt



COPY src/ ./src/

COPY configs/ ./configs/



ENV PYTHONPATH=/app

ENV MLFLOW_TRACKING_URI=https://mlflow.company.com



CMD ["python", "-m", "src.pipeline.run"]



---

# requirements.txt

apache-airflow==2.8.0

mlflow==2.10.0

scikit-learn==1.4.0

pandas==2.2.0

pyarrow==15.0.0

great-expectations==0.18.0

boto3==1.34.0



---

# kubernetes/ml-pipeline-cronjob.yaml

apiVersion: batch/v1

kind: CronJob

metadata:

  name: ml-batch-inference

  namespace: ml-platform

spec:

  schedule: "0 3 * * *"  # ทุกวันตี 3

  concurrencyPolicy: Forbid

  successfulJobsHistoryLimit: 7

  failedJobsHistoryLimit: 3

  jobTemplate:

    spec:

      backoffLimit: 2

      activeDeadlineSeconds: 7200  # Timeout 2 ชั่วโมง

      template:

        spec:

          restartPolicy: Never

          containers:

            - name: inference

              image: registry.company.com/ml-pipeline:latest

              command: ["python", "-m", "src.inference.batch_run"]

              resources:

                requests:

                  cpu: "2"

                  memory: 8Gi

                limits:

                  cpu: "4"

                  memory: 16Gi

              env:

                - name: S3_BUCKET

                  value: "data-lake"

                - name: MODEL_NAME

                  value: "churn-predictor"

                - name: MLFLOW_TRACKING_URI

                  valueFrom:

                    configMapKeyRef:

                      name: ml-config

                      key: mlflow-uri

              volumeMounts:

                - name: ml-secrets

                  mountPath: /secrets

                  readOnly: true

          volumes:

            - name: ml-secrets

              secret:

                secretName: ml-pipeline-secrets

Monitoring และ Alerting สำหรับ Batch Pipeline

  • Pipeline Health: ติดตาม Success/Failure Rate ของแต่ละ Task, Duration Trend และ SLA Compliance
  • Data Quality: ตรวจสอบ Row Count, Schema Changes, Distribution Drift ของ Input Data ทุกรอบ
  • Model Performance: ติดตาม Prediction Distribution, F1 Score Trend, Feature Importance Changes
  • Resource Usage: CPU, Memory, Disk ของ Pipeline Workers เพื่อ Right-sizing
  • Cost Tracking: คำนวณ Cost ต่อ Pipeline Run (Compute + Storage + Network)

MLOps Pipeline คืออะไร

MLOps Pipeline คือ Automated Workflow ที่จัดการทุกขั้นตอนของ Machine Learning ตั้งแต่ Data Ingestion, Feature Engineering, Model Training, Evaluation, Deployment ไปจนถึง Monitoring นำหลักการ DevOps มาใช้กับ ML เพื่อให้ทำซ้ำได้ Scale ได้ และ Maintain ง่าย

แนะนำเพิ่มเติม — iCafeForex

เนื้อหาเกี่ยวข้อง — Linkerd Service Mesh Disaster Recovery Plan

XM Legend · เทรดเดอร์ & ผู้สอน Forex 13 ปี

ผู้ก่อตั้ง SiamCafe ตั้งแต่ปี 1997 · เทรดเดอร์สาย Forex มากกว่า 13 ปี ได้รับการยกย่องเป็น XM Legend · แบ่งปันความรู้ Forex, ไอที, AI และการเทรด จากประสบการณ์จริงในตลาดจริง