SiamCafe · Blog
Dagster Pipeline กับ 12-Factor App — วิธีออกแบบ
บทความ

Dagster Pipeline กับ 12-Factor App — วิธีออกแบบ

เผยแพร่ 28 พฤษภาคม 2569

Dagster คืออะไร

Dagster Pipeline กับ 12-Factor App — วิธีออกแบบ

Dagster เป็น Data Orchestration Platform ที่ออกแบบมาสำหรับ Data Engineering สมัยใหม่ จุดเด่นคือแนวคิด Software-defined Assets ที่ให้คิดเรื่อง Data Pipeline ในรูปแบบของ Asset (ข้อมูลที่ต้องการสร้าง) แทนที่จะคิดเป็น Task (งานที่ต้องทำ) วิธีคิดนี้ทำให้เห็น Data Lineage ได้ชัดเจนและ Debug ปัญหาได้ง่ายกว่า

Dagster มีคุณสมบัติเด่นได้แก่ Type System สำหรับตรวจสอบ Data Quality, IO Managers สำหรับจัดการ Storage แบบ Abstract, Built-in Testing Framework, Partitions สำหรับจัดการ Incremental Processing และ Sensors/Schedules สำหรับ Trigger Pipeline อัตโนมัติ

12-Factor App Methodology กับ Data Pipeline

12-Factor App เป็นแนวทาง 12 ข้อสำหรับออกแบบ Application ที่ Deploy ได้ง่าย Scale ได้ดี และ Maintain ได้ในระยะยาว เมื่อนำมาใช้กับ Data Pipeline จะได้ Pipeline ที่ Portable ข้าม Environment และ Reliable มากขึ้น

Factorหลักการการนำไปใช้กับ Dagster
I. CodebaseOne codebase, many deploysเก็บ Pipeline Code ใน Git Repository เดียว Deploy ได้ทุก Environment
II. DependenciesExplicitly declare dependenciesใช้ requirements.txt หรือ pyproject.toml จัดการ Python Dependencies
III. ConfigStore config in environmentใช้ EnvVar ใน Dagster Resources และ dagster.yaml
IV. Backing ServicesTreat as attached resourcesใช้ IO Managers สำหรับ Database, S3, GCS แบบ Abstract
V. Build, Release, RunStrictly separate stagesBuild Docker Image, Tag Version, Deploy ด้วย Helm
VI. ProcessesStateless processesDagster Run แต่ละตัว Stateless ใช้ External Storage สำหรับ State
VII. Port BindingExport services via portDagster Webserver Bind Port สำหรับ UI และ GraphQL API
VIII. ConcurrencyScale via processesใช้ K8sRunLauncher สร้าง Pod แยกสำหรับแต่ละ Run
IX. DisposabilityFast startup, graceful shutdownPipeline Run สามารถ Cancel ได้ทันทีและ Resume จาก Checkpoint
X. Dev/Prod ParityKeep environments similarใช้ Resource Configuration แยกตาม Environment
XI. LogsTreat logs as event streamsDagster ส่ง Log ไปยัง stdout ใช้ Structured Logging
XII. AdminRun admin as one-offใช้ Dagster CLI สำหรับ One-off Tasks เช่น Backfill

การตั้งค่า Dagster Project ตาม 12-Factor

# โครงสร้าง Project
# dagster-pipeline/
# ├── dagster_pipeline/
# │   ├── __init__.py
# │   ├── assets/
# │   │   ├── __init__.py
# │   │   ├── raw_data.py
# │   │   ├── transformed.py
# │   │   └── analytics.py
# │   ├── resources/
# │   │   ├── __init__.py
# │   │   ├── database.py
# │   │   └── storage.py
# │   ├── sensors/
# │   │   └── file_sensor.py
# │   └── schedules/
# │       └── daily_schedule.py
# ├── dagster.yaml
# ├── workspace.yaml
# ├── pyproject.toml
# ├── Dockerfile
# └── helm/
#     └── values.yaml

# pyproject.toml
cat > pyproject.toml << 'TOML'
[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"

[project]
name = "dagster-pipeline"
version = "1.0.0"
dependencies = [
    "dagster>=1.6.0",
    "dagster-postgres>=0.22.0",
    "dagster-k8s>=0.22.0",
    "dagster-aws>=0.22.0",
    "pandas>=2.0",
    "sqlalchemy>=2.0",
    "boto3>=1.28",
]

[project.optional-dependencies]
dev = [
    "dagster-webserver>=1.6.0",
    "pytest>=7.0",
]
TOML

Factor III: Config ใน Environment Variables

# dagster_pipeline/resources/database.py
from dagster import ConfigurableResource, EnvVar
from sqlalchemy import create_engine
from contextlib import contextmanager

class PostgresResource(ConfigurableResource):
    """Database Resource ที่ใช้ Environment Variables ตาม 12-Factor"""
    host: str = EnvVar("POSTGRES_HOST")
    port: int = 5432
    database: str = EnvVar("POSTGRES_DB")
    username: str = EnvVar("POSTGRES_USER")
    password: str = EnvVar("POSTGRES_PASSWORD")

    @contextmanager
    def get_connection(self):
        url = f"postgresql://{self.username}:{self.password}@{self.host}:{self.port}/{self.database}"
        engine = create_engine(url, pool_size=5, max_overflow=10)
        conn = engine.connect()
        try:
            yield conn
        finally:
            conn.close()
            engine.dispose()

# dagster_pipeline/resources/storage.py
class S3Resource(ConfigurableResource):
    """S3 Storage Resource ตาม Factor IV: Backing Services"""
    bucket: str = EnvVar("S3_BUCKET")
    region: str = EnvVar("AWS_REGION")
    endpoint_url: str = EnvVar("S3_ENDPOINT_URL")

    def upload_dataframe(self, df, key):
        import boto3
        import io
        s3 = boto3.client("s3", region_name=self.region,
                          endpoint_url=self.endpoint_url or None)
        buf = io.BytesIO()
        df.to_parquet(buf, index=False)
        buf.seek(0)
        s3.upload_fileobj(buf, self.bucket, key)

    def read_dataframe(self, key):
        import boto3
        import pandas as pd
        s3 = boto3.client("s3", region_name=self.region,
                          endpoint_url=self.endpoint_url or None)
        obj = s3.get_object(Bucket=self.bucket, Key=key)
        return pd.read_parquet(io.BytesIO(obj["Body"].read()))

Software-defined Assets

Dagster Pipeline กับ 12-Factor App — วิธีออกแบบ
# dagster_pipeline/assets/raw_data.py
from dagster import asset, AssetExecutionContext, MetadataValue
import pandas as pd

@asset(
    group_name="raw",
    description="ดึงข้อมูล Raw จาก Source Database",
    compute_kind="postgres",
)
def raw_transactions(
    context: AssetExecutionContext,
    postgres: PostgresResource,
) -> pd.DataFrame:
    """ดึงข้อมูล Transaction จาก Source Database"""
    query = """
        SELECT id, user_id, amount, currency, status,
               created_at, updated_at
        FROM transactions
        WHERE created_at >= CURRENT_DATE - INTERVAL '1 day'
    """
    with postgres.get_connection() as conn:
        df = pd.read_sql(query, conn)

    context.log.info(f"ดึงข้อมูลได้ {len(df)} rows")
    context.add_output_metadata({
        "row_count": MetadataValue.int(len(df)),
        "columns": MetadataValue.text(", ".join(df.columns)),
        "date_range": MetadataValue.text(
            f"{df['created_at'].min()} to {df['created_at'].max()}"
        ),
    })
    return df

# dagster_pipeline/assets/transformed.py
@asset(
    group_name="transformed",
    description="แปลงข้อมูล Transaction เป็น Daily Summary",
    compute_kind="pandas",
)
def daily_transaction_summary(
    context: AssetExecutionContext,
    raw_transactions: pd.DataFrame,
) -> pd.DataFrame:
    """สร้าง Daily Summary จาก Raw Transactions"""
    df = raw_transactions.copy()
    df["date"] = pd.to_datetime(df["created_at"]).dt.date

    summary = df.groupby(["date", "currency", "status"]).agg(
        total_amount=("amount", "sum"),
        transaction_count=("id", "count"),
        avg_amount=("amount", "mean"),
    ).reset_index()

    context.log.info(f"สร้าง Summary ได้ {len(summary)} rows")
    return summary

# dagster_pipeline/assets/analytics.py
@asset(
    group_name="analytics",
    description="Upload Analytics Data ไปยัง S3",
    compute_kind="s3",
)
def analytics_report(
    context: AssetExecutionContext,
    daily_transaction_summary: pd.DataFrame,
    s3: S3Resource,
) -> None:
    """Upload Report ไปยัง S3 Data Lake"""
    from datetime import date
    key = f"analytics/daily_summary/{date.today().isoformat()}.parquet"
    s3.upload_dataframe(daily_transaction_summary, key)
    context.log.info(f"Upload ไปยัง s3://{s3.bucket}/{key}")

Definitions และ Resource Configuration

# dagster_pipeline/__init__.py
from dagster import Definitions, EnvVar
from dagster_pipeline.assets.raw_data import raw_transactions
from dagster_pipeline.assets.transformed import daily_transaction_summary
from dagster_pipeline.assets.analytics import analytics_report
from dagster_pipeline.resources.database import PostgresResource
from dagster_pipeline.resources.storage import S3Resource

# Factor X: Dev/Prod Parity — ใช้ Resource เดียวกัน Config ต่างกัน
defs = Definitions(
    assets=[raw_transactions, daily_transaction_summary, analytics_report],
    resources={
        "postgres": PostgresResource(
            host=EnvVar("POSTGRES_HOST"),
            port=5432,
            database=EnvVar("POSTGRES_DB"),
            username=EnvVar("POSTGRES_USER"),
            password=EnvVar("POSTGRES_PASSWORD"),
        ),
        "s3": S3Resource(
            bucket=EnvVar("S3_BUCKET"),
            region=EnvVar("AWS_REGION"),
            endpoint_url=EnvVar("S3_ENDPOINT_URL"),
        ),
    },
)

Docker และ Kubernetes Deployment

# Dockerfile — Factor V: Build, Release, Run
FROM python:3.11-slim AS builder
WORKDIR /app
COPY pyproject.toml .
RUN pip install --no-cache-dir ".[dev]"
COPY . .
RUN pip install --no-cache-dir -e .

FROM python:3.11-slim
WORKDIR /app
COPY --from=builder /usr/local/lib/python3.11/site-packages /usr/local/lib/python3.11/site-packages
COPY --from=builder /usr/local/bin /usr/local/bin
COPY --from=builder /app /app
EXPOSE 3000
CMD ["dagster", "api", "grpc", "-h", "0.0.0.0", "-p", "3000", "-m", "dagster_pipeline"]

---
# helm/values.yaml — Kubernetes Deployment
dagsterWebserver:
  replicaCount: 2
  image:
    repository: registry.company.com/dagster-pipeline
    tag: v1.0.0
  env:
    - name: POSTGRES_HOST
      valueFrom:
        secretKeyRef:
          name: dagster-secrets
          key: postgres-host
    - name: POSTGRES_DB
      value: dagster
    - name: POSTGRES_USER
      valueFrom:
        secretKeyRef:
          name: dagster-secrets
          key: postgres-user
    - name: POSTGRES_PASSWORD
      valueFrom:
        secretKeyRef:
          name: dagster-secrets
          key: postgres-password
    - name: S3_BUCKET
      value: company-data-lake
    - name: AWS_REGION
      value: ap-southeast-1
    - name: S3_ENDPOINT_URL
      value: ""

dagsterDaemon:
  enabled: true

runLauncher:
  type: K8sRunLauncher
  config:
    k8sRunLauncher:
      envVars:
        - POSTGRES_HOST
        - POSTGRES_DB
        - POSTGRES_USER
        - POSTGRES_PASSWORD
        - S3_BUCKET
        - AWS_REGION
      resources:
        requests:
          cpu: 250m
          memory: 512Mi
        limits:
          cpu: 1000m
          memory: 2Gi

---
# Deploy ด้วย Helm
helm repo add dagster https://dagster-io.github.io/helm
helm upgrade --install dagster dagster/dagster \
  -f helm/values.yaml \
  -n dagster --create-namespace

Testing ตาม 12-Factor

# tests/test_assets.py
from dagster import materialize, build_asset_context
from dagster_pipeline.assets.raw_data import raw_transactions
from dagster_pipeline.assets.transformed import daily_transaction_summary
import pandas as pd

def test_daily_transaction_summary():
    """ทดสอบ Transformation Logic"""
    # สร้าง Test Data
    test_data = pd.DataFrame({
        "id": [1, 2, 3, 4],
        "user_id": [100, 101, 100, 102],
        "amount": [500.0, 1200.0, 300.0, 800.0],
        "currency": ["THB", "THB", "USD", "THB"],
        "status": ["completed", "completed", "completed", "pending"],
        "created_at": pd.to_datetime(["2026-01-15"] * 4),
        "updated_at": pd.to_datetime(["2026-01-15"] * 4),
    })

    context = build_asset_context()
    result = daily_transaction_summary(context, test_data)

    assert len(result) > 0
    assert "total_amount" in result.columns
    assert "transaction_count" in result.columns

    # ตรวจสอบ THB completed
    thb_completed = result[
        (result["currency"] == "THB") & (result["status"] == "completed")
    ]
    assert thb_completed["total_amount"].iloc[0] == 1700.0
    assert thb_completed["transaction_count"].iloc[0] == 2

# รัน Test
# pytest tests/ -v --tb=short

Dagster คืออะไรและต่างจาก Airflow อย่างไร

Dagster เป็น Data Orchestrator ที่เน้น Software-defined Assets คือคิดเรื่อง Data ที่ต้องการสร้างก่อนแล้วค่อยกำหนดวิธีสร้าง ต่างจาก Airflow ที่เน้น Task-based DAGs Dagster มี Type System, Built-in Testing, IO Managers และ Asset Lineage ทำให้ Debug ง่ายกว่าและ Test ได้สะดวกกว่า