it

Dagster Pipeline กับ 12-Factor App — วิธีออกแบบ

Dagster Pipeline กับ 12-Factor App — วิธีออกแบบ

Dagster คืออะไร

Dagster Pipeline กับ 12-Factor App — วิธีออกแบบ

Dagster เป็น Data Orchestration Platform ที่ออกแบบมาสำหรับ Data Engineering สมัยใหม่ จุดเด่นคือแนวคิด Software-defined Assets ที่ให้คิดเรื่อง Data Pipeline ในรูปแบบของ Asset (ข้อมูลที่ต้องการสร้าง) แทนที่จะคิดเป็น Task (งานที่ต้องทำ) วิธีคิดนี้ทำให้เห็น Data Lineage ได้ชัดเจนและ Debug ปัญหาได้ง่ายกว่า

Dagster มีคุณสมบัติเด่นได้แก่ Type System สำหรับตรวจสอบ Data Quality, IO Managers สำหรับจัดการ Storage แบบ Abstract, Built-in Testing Framework, Partitions สำหรับจัดการ Incremental Processing และ Sensors/Schedules สำหรับ Trigger Pipeline อัตโนมัติ

เนื้อหาเกี่ยวข้อง — C# Minimal API Community Building — คู่มือฉบับสมบูรณ์ 2026

12-Factor App Methodology กับ Data Pipeline

12-Factor App เป็นแนวทาง 12 ข้อสำหรับออกแบบ Application ที่ Deploy ได้ง่าย Scale ได้ดี และ Maintain ได้ในระยะยาว เมื่อนำมาใช้กับ Data Pipeline จะได้ Pipeline ที่ Portable ข้าม Environment และ Reliable มากขึ้น

แนะนำเพิ่มเติม — ดูสัญญาณเทรดที่ XM Signal

เนื้อหาเกี่ยวข้อง — แนะนำให้อ่าน React Server Components Testing Strategy QA

Factorหลักการการนำไปใช้กับ Dagster
I. CodebaseOne codebase, many deploysเก็บ Pipeline Code ใน Git Repository เดียว Deploy ได้ทุก Environment
II. DependenciesExplicitly declare dependenciesใช้ requirements.txt หรือ pyproject.toml จัดการ Python Dependencies
III. ConfigStore config in environmentใช้ EnvVar ใน Dagster Resources และ dagster.yaml
IV. Backing ServicesTreat as attached resourcesใช้ IO Managers สำหรับ Database, S3, GCS แบบ Abstract
V. Build, Release, RunStrictly separate stagesBuild Docker Image, Tag Version, Deploy ด้วย Helm
VI. ProcessesStateless processesDagster Run แต่ละตัว Stateless ใช้ External Storage สำหรับ State
VII. Port BindingExport services via portDagster Webserver Bind Port สำหรับ UI และ GraphQL API
VIII. ConcurrencyScale via processesใช้ K8sRunLauncher สร้าง Pod แยกสำหรับแต่ละ Run
IX. DisposabilityFast startup, graceful shutdownPipeline Run สามารถ Cancel ได้ทันทีและ Resume จาก Checkpoint
X. Dev/Prod ParityKeep environments similarใช้ Resource Configuration แยกตาม Environment
XI. LogsTreat logs as event streamsDagster ส่ง Log ไปยัง stdout ใช้ Structured Logging
XII. AdminRun admin as one-offใช้ Dagster CLI สำหรับ One-off Tasks เช่น Backfill

การตั้งค่า Dagster Project ตาม 12-Factor

# โครงสร้าง Project

# dagster-pipeline/

# ├── dagster_pipeline/

# │   ├── __init__.py

# │   ├── assets/

# │   │   ├── __init__.py

# │   │   ├── raw_data.py

# │   │   ├── transformed.py

# │   │   └── analytics.py

# │   ├── resources/

# │   │   ├── __init__.py

# │   │   ├── database.py

# │   │   └── storage.py

# │   ├── sensors/

# │   │   └── file_sensor.py

# │   └── schedules/

# │       └── daily_schedule.py

# ├── dagster.yaml

# ├── workspace.yaml

# ├── pyproject.toml

# ├── Dockerfile

# └── helm/

#     └── values.yaml



# pyproject.toml

cat > pyproject.toml << 'TOML'

[build-system]

requires = ["setuptools"]

build-backend = "setuptools.build_meta"



[project]

name = "dagster-pipeline"

version = "1.0.0"

dependencies = [

    "dagster>=1.6.0",

    "dagster-postgres>=0.22.0",

    "dagster-k8s>=0.22.0",

    "dagster-aws>=0.22.0",

    "pandas>=2.0",

    "sqlalchemy>=2.0",

    "boto3>=1.28",

]



[project.optional-dependencies]

dev = [

    "dagster-webserver>=1.6.0",

    "pytest>=7.0",

]

TOML

Factor III: Config ใน Environment Variables

# dagster_pipeline/resources/database.py

from dagster import ConfigurableResource, EnvVar

from sqlalchemy import create_engine

from contextlib import contextmanager



class PostgresResource(ConfigurableResource):

    """Database Resource ที่ใช้ Environment Variables ตาม 12-Factor"""

    host: str = EnvVar("POSTGRES_HOST")

    port: int = 5432

    database: str = EnvVar("POSTGRES_DB")

    username: str = EnvVar("POSTGRES_USER")

    password: str = EnvVar("POSTGRES_PASSWORD")



    @contextmanager

    def get_connection(self):

        url = f"postgresql://{self.username}:{self.password}@{self.host}:{self.port}/{self.database}"

        engine = create_engine(url, pool_size=5, max_overflow=10)

        conn = engine.connect()

        try:

            yield conn

        finally:

            conn.close()

            engine.dispose()



# dagster_pipeline/resources/storage.py

class S3Resource(ConfigurableResource):

    """S3 Storage Resource ตาม Factor IV: Backing Services"""

    bucket: str = EnvVar("S3_BUCKET")

    region: str = EnvVar("AWS_REGION")

    endpoint_url: str = EnvVar("S3_ENDPOINT_URL")



    def upload_dataframe(self, df, key):

        import boto3

        import io

        s3 = boto3.client("s3", region_name=self.region,

                          endpoint_url=self.endpoint_url or None)

        buf = io.BytesIO()

        df.to_parquet(buf, index=False)

        buf.seek(0)

        s3.upload_fileobj(buf, self.bucket, key)



    def read_dataframe(self, key):

        import boto3

        import pandas as pd

        s3 = boto3.client("s3", region_name=self.region,

                          endpoint_url=self.endpoint_url or None)

        obj = s3.get_object(Bucket=self.bucket, Key=key)

        return pd.read_parquet(io.BytesIO(obj["Body"].read()))

Software-defined Assets

Dagster Pipeline กับ 12-Factor App — วิธีออกแบบ
# dagster_pipeline/assets/raw_data.py

from dagster import asset, AssetExecutionContext, MetadataValue

import pandas as pd



@asset(

    group_name="raw",

    description="ดึงข้อมูล Raw จาก Source Database",

    compute_kind="postgres",

)

def raw_transactions(

    context: AssetExecutionContext,

    postgres: PostgresResource,

) -> pd.DataFrame:

    """ดึงข้อมูล Transaction จาก Source Database"""

    query = """

        SELECT id, user_id, amount, currency, status,

               created_at, updated_at

        FROM transactions

        WHERE created_at >= CURRENT_DATE - INTERVAL '1 day'

    """

    with postgres.get_connection() as conn:

        df = pd.read_sql(query, conn)



    context.log.info(f"ดึงข้อมูลได้ {len(df)} rows")

    context.add_output_metadata({

        "row_count": MetadataValue.int(len(df)),

        "columns": MetadataValue.text(", ".join(df.columns)),

        "date_range": MetadataValue.text(

            f"{df['created_at'].min()} to {df['created_at'].max()}"

        ),

    })

    return df



# dagster_pipeline/assets/transformed.py

@asset(

    group_name="transformed",

    description="แปลงข้อมูล Transaction เป็น Daily Summary",

    compute_kind="pandas",

)

def daily_transaction_summary(

    context: AssetExecutionContext,

    raw_transactions: pd.DataFrame,

) -> pd.DataFrame:

    """สร้าง Daily Summary จาก Raw Transactions"""

    df = raw_transactions.copy()

    df["date"] = pd.to_datetime(df["created_at"]).dt.date



    summary = df.groupby(["date", "currency", "status"]).agg(

        total_amount=("amount", "sum"),

        transaction_count=("id", "count"),

        avg_amount=("amount", "mean"),

    ).reset_index()



    context.log.info(f"สร้าง Summary ได้ {len(summary)} rows")

    return summary



# dagster_pipeline/assets/analytics.py

@asset(

    group_name="analytics",

    description="Upload Analytics Data ไปยัง S3",

    compute_kind="s3",

)

def analytics_report(

    context: AssetExecutionContext,

    daily_transaction_summary: pd.DataFrame,

    s3: S3Resource,

) -> None:

    """Upload Report ไปยัง S3 Data Lake"""

    from datetime import date

    key = f"analytics/daily_summary/{date.today().isoformat()}.parquet"

    s3.upload_dataframe(daily_transaction_summary, key)

    context.log.info(f"Upload ไปยัง s3://{s3.bucket}/{key}")

Definitions และ Resource Configuration

# dagster_pipeline/__init__.py

from dagster import Definitions, EnvVar

from dagster_pipeline.assets.raw_data import raw_transactions

from dagster_pipeline.assets.transformed import daily_transaction_summary

from dagster_pipeline.assets.analytics import analytics_report

from dagster_pipeline.resources.database import PostgresResource

from dagster_pipeline.resources.storage import S3Resource



# Factor X: Dev/Prod Parity — ใช้ Resource เดียวกัน Config ต่างกัน

defs = Definitions(

    assets=[raw_transactions, daily_transaction_summary, analytics_report],

    resources={

        "postgres": PostgresResource(

            host=EnvVar("POSTGRES_HOST"),

            port=5432,

            database=EnvVar("POSTGRES_DB"),

            username=EnvVar("POSTGRES_USER"),

            password=EnvVar("POSTGRES_PASSWORD"),

        ),

        "s3": S3Resource(

            bucket=EnvVar("S3_BUCKET"),

            region=EnvVar("AWS_REGION"),

            endpoint_url=EnvVar("S3_ENDPOINT_URL"),

        ),

    },

)

Docker และ Kubernetes Deployment

# Dockerfile — Factor V: Build, Release, Run

FROM python:3.11-slim AS builder

WORKDIR /app

COPY pyproject.toml .

RUN pip install --no-cache-dir ".[dev]"

COPY . .

RUN pip install --no-cache-dir -e .



FROM python:3.11-slim

WORKDIR /app

COPY --from=builder /usr/local/lib/python3.11/site-packages /usr/local/lib/python3.11/site-packages

COPY --from=builder /usr/local/bin /usr/local/bin

COPY --from=builder /app /app

EXPOSE 3000

CMD ["dagster", "api", "grpc", "-h", "0.0.0.0", "-p", "3000", "-m", "dagster_pipeline"]



---

# helm/values.yaml — Kubernetes Deployment

dagsterWebserver:

  replicaCount: 2

  image:

    repository: registry.company.com/dagster-pipeline

    tag: v1.0.0

  env:

    - name: POSTGRES_HOST

      valueFrom:

        secretKeyRef:

          name: dagster-secrets

          key: postgres-host

    - name: POSTGRES_DB

      value: dagster

    - name: POSTGRES_USER

      valueFrom:

        secretKeyRef:

          name: dagster-secrets

          key: postgres-user

    - name: POSTGRES_PASSWORD

      valueFrom:

        secretKeyRef:

          name: dagster-secrets

          key: postgres-password

    - name: S3_BUCKET

      value: company-data-lake

    - name: AWS_REGION

      value: ap-southeast-1

    - name: S3_ENDPOINT_URL

      value: ""



dagsterDaemon:

  enabled: true



runLauncher:

  type: K8sRunLauncher

  config:

    k8sRunLauncher:

      envVars:

        - POSTGRES_HOST

        - POSTGRES_DB

        - POSTGRES_USER

        - POSTGRES_PASSWORD

        - S3_BUCKET

        - AWS_REGION

      resources:

        requests:

          cpu: 250m

          memory: 512Mi

        limits:

          cpu: 1000m

          memory: 2Gi



---

# Deploy ด้วย Helm

helm repo add dagster https://dagster-io.github.io/helm

helm upgrade --install dagster dagster/dagster \

  -f helm/values.yaml \

  -n dagster --create-namespace

Testing ตาม 12-Factor

# tests/test_assets.py

from dagster import materialize, build_asset_context

from dagster_pipeline.assets.raw_data import raw_transactions

from dagster_pipeline.assets.transformed import daily_transaction_summary

import pandas as pd



def test_daily_transaction_summary():

    """ทดสอบ Transformation Logic"""

    # สร้าง Test Data

    test_data = pd.DataFrame({

        "id": [1, 2, 3, 4],

        "user_id": [100, 101, 100, 102],

        "amount": [500.0, 1200.0, 300.0, 800.0],

        "currency": ["THB", "THB", "USD", "THB"],

        "status": ["completed", "completed", "completed", "pending"],

        "created_at": pd.to_datetime(["2026-01-15"] * 4),

        "updated_at": pd.to_datetime(["2026-01-15"] * 4),

    })



    context = build_asset_context()

    result = daily_transaction_summary(context, test_data)



    assert len(result) > 0

    assert "total_amount" in result.columns

    assert "transaction_count" in result.columns



    # ตรวจสอบ THB completed

    thb_completed = result[

        (result["currency"] == "THB") & (result["status"] == "completed")

    ]

    assert thb_completed["total_amount"].iloc[0] == 1700.0

    assert thb_completed["transaction_count"].iloc[0] == 2



# รัน Test

# pytest tests/ -v --tb=short

Dagster คืออะไรและต่างจาก Airflow อย่างไร

Dagster เป็น Data Orchestrator ที่เน้น Software-defined Assets คือคิดเรื่อง Data ที่ต้องการสร้างก่อนแล้วค่อยกำหนดวิธีสร้าง ต่างจาก Airflow ที่เน้น Task-based DAGs Dagster มี Type System, Built-in Testing, IO Managers และ Asset Lineage ทำให้ Debug ง่ายกว่าและ Test ได้สะดวกกว่า

แนะนำเพิ่มเติม — เรียนเทรดกับ iCafeForex

เนื้อหาเกี่ยวข้อง — แพทเทรนกราฟ — วิธีตั้งค่าและใช้งานจริงพร้อมตัวอย่าง

XM Legend · เทรดเดอร์ & ผู้สอน Forex 13 ปี

ผู้ก่อตั้ง SiamCafe ตั้งแต่ปี 1997 · เทรดเดอร์สาย Forex มากกว่า 13 ปี ได้รับการยกย่องเป็น XM Legend · แบ่งปันความรู้ Forex, ไอที, AI และการเทรด จากประสบการณ์จริงในตลาดจริง