Technology

Dagster Pipeline 12 Factor App

dagster pipeline 12 factor app
Dagster Pipeline 12 Factor App | SiamCafe Blog
2025-12-03· อ. บอม — SiamCafe.net· 8,934 คำ

Dagster คืออะไร

Dagster เป็น Data Orchestration Platform ที่ออกแบบมาสำหรับ Data Engineering สมัยใหม่ จุดเด่นคือแนวคิด Software-defined Assets ที่ให้คิดเรื่อง Data Pipeline ในรูปแบบของ Asset (ข้อมูลที่ต้องการสร้าง) แทนที่จะคิดเป็น Task (งานที่ต้องทำ) วิธีคิดนี้ทำให้เห็น Data Lineage ได้ชัดเจนและ Debug ปัญหาได้ง่ายกว่า

Dagster มีคุณสมบัติเด่นได้แก่ Type System สำหรับตรวจสอบ Data Quality, IO Managers สำหรับจัดการ Storage แบบ Abstract, Built-in Testing Framework, Partitions สำหรับจัดการ Incremental Processing และ Sensors/Schedules สำหรับ Trigger Pipeline อัตโนมัติ

12-Factor App Methodology กับ Data Pipeline

12-Factor App เป็นแนวทาง 12 ข้อสำหรับออกแบบ Application ที่ Deploy ได้ง่าย Scale ได้ดี และ Maintain ได้ในระยะยาว เมื่อนำมาใช้กับ Data Pipeline จะได้ Pipeline ที่ Portable ข้าม Environment และ Reliable มากขึ้น

Factorหลักการการนำไปใช้กับ Dagster
I. CodebaseOne codebase, many deploysเก็บ Pipeline Code ใน Git Repository เดียว Deploy ได้ทุก Environment
II. DependenciesExplicitly declare dependenciesใช้ requirements.txt หรือ pyproject.toml จัดการ Python Dependencies
III. ConfigStore config in environmentใช้ EnvVar ใน Dagster Resources และ dagster.yaml
IV. Backing ServicesTreat as attached resourcesใช้ IO Managers สำหรับ Database, S3, GCS แบบ Abstract
V. Build, Release, RunStrictly separate stagesBuild Docker Image, Tag Version, Deploy ด้วย Helm
VI. ProcessesStateless processesDagster Run แต่ละตัว Stateless ใช้ External Storage สำหรับ State
VII. Port BindingExport services via portDagster Webserver Bind Port สำหรับ UI และ GraphQL API
VIII. ConcurrencyScale via processesใช้ K8sRunLauncher สร้าง Pod แยกสำหรับแต่ละ Run
IX. DisposabilityFast startup, graceful shutdownPipeline Run สามารถ Cancel ได้ทันทีและ Resume จาก Checkpoint
X. Dev/Prod ParityKeep environments similarใช้ Resource Configuration แยกตาม Environment
XI. LogsTreat logs as event streamsDagster ส่ง Log ไปยัง stdout ใช้ Structured Logging
XII. AdminRun admin as one-offใช้ Dagster CLI สำหรับ One-off Tasks เช่น Backfill

การตั้งค่า Dagster Project ตาม 12-Factor

# โครงสร้าง Project
# dagster-pipeline/
# ├── dagster_pipeline/
# │   ├── __init__.py
# │   ├── assets/
# │   │   ├── __init__.py
# │   │   ├── raw_data.py
# │   │   ├── transformed.py
# │   │   └── analytics.py
# │   ├── resources/
# │   │   ├── __init__.py
# │   │   ├── database.py
# │   │   └── storage.py
# │   ├── sensors/
# │   │   └── file_sensor.py
# │   └── schedules/
# │       └── daily_schedule.py
# ├── dagster.yaml
# ├── workspace.yaml
# ├── pyproject.toml
# ├── Dockerfile
# └── helm/
#     └── values.yaml

# pyproject.toml
cat > pyproject.toml << 'TOML'
[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"

[project]
name = "dagster-pipeline"
version = "1.0.0"
dependencies = [
    "dagster>=1.6.0",
    "dagster-postgres>=0.22.0",
    "dagster-k8s>=0.22.0",
    "dagster-aws>=0.22.0",
    "pandas>=2.0",
    "sqlalchemy>=2.0",
    "boto3>=1.28",
]

[project.optional-dependencies]
dev = [
    "dagster-webserver>=1.6.0",
    "pytest>=7.0",
]
TOML

Factor III: Config ใน Environment Variables

# dagster_pipeline/resources/database.py
from dagster import ConfigurableResource, EnvVar
from sqlalchemy import create_engine
from contextlib import contextmanager

class PostgresResource(ConfigurableResource):
    """Database Resource ที่ใช้ Environment Variables ตาม 12-Factor"""
    host: str = EnvVar("POSTGRES_HOST")
    port: int = 5432
    database: str = EnvVar("POSTGRES_DB")
    username: str = EnvVar("POSTGRES_USER")
    password: str = EnvVar("POSTGRES_PASSWORD")

    @contextmanager
    def get_connection(self):
        url = f"postgresql://{self.username}:{self.password}@{self.host}:{self.port}/{self.database}"
        engine = create_engine(url, pool_size=5, max_overflow=10)
        conn = engine.connect()
        try:
            yield conn
        finally:
            conn.close()
            engine.dispose()

# dagster_pipeline/resources/storage.py
class S3Resource(ConfigurableResource):
    """S3 Storage Resource ตาม Factor IV: Backing Services"""
    bucket: str = EnvVar("S3_BUCKET")
    region: str = EnvVar("AWS_REGION")
    endpoint_url: str = EnvVar("S3_ENDPOINT_URL")

    def upload_dataframe(self, df, key):
        import boto3
        import io
        s3 = boto3.client("s3", region_name=self.region,
                          endpoint_url=self.endpoint_url or None)
        buf = io.BytesIO()
        df.to_parquet(buf, index=False)
        buf.seek(0)
        s3.upload_fileobj(buf, self.bucket, key)

    def read_dataframe(self, key):
        import boto3
        import pandas as pd
        s3 = boto3.client("s3", region_name=self.region,
                          endpoint_url=self.endpoint_url or None)
        obj = s3.get_object(Bucket=self.bucket, Key=key)
        return pd.read_parquet(io.BytesIO(obj["Body"].read()))

Software-defined Assets

# dagster_pipeline/assets/raw_data.py
from dagster import asset, AssetExecutionContext, MetadataValue
import pandas as pd

@asset(
    group_name="raw",
    description="ดึงข้อมูล Raw จาก Source Database",
    compute_kind="postgres",
)
def raw_transactions(
    context: AssetExecutionContext,
    postgres: PostgresResource,
) -> pd.DataFrame:
    """ดึงข้อมูล Transaction จาก Source Database"""
    query = """
        SELECT id, user_id, amount, currency, status,
               created_at, updated_at
        FROM transactions
        WHERE created_at >= CURRENT_DATE - INTERVAL '1 day'
    """
    with postgres.get_connection() as conn:
        df = pd.read_sql(query, conn)

    context.log.info(f"ดึงข้อมูลได้ {len(df)} rows")
    context.add_output_metadata({
        "row_count": MetadataValue.int(len(df)),
        "columns": MetadataValue.text(", ".join(df.columns)),
        "date_range": MetadataValue.text(
            f"{df['created_at'].min()} to {df['created_at'].max()}"
        ),
    })
    return df

# dagster_pipeline/assets/transformed.py
@asset(
    group_name="transformed",
    description="แปลงข้อมูล Transaction เป็น Daily Summary",
    compute_kind="pandas",
)
def daily_transaction_summary(
    context: AssetExecutionContext,
    raw_transactions: pd.DataFrame,
) -> pd.DataFrame:
    """สร้าง Daily Summary จาก Raw Transactions"""
    df = raw_transactions.copy()
    df["date"] = pd.to_datetime(df["created_at"]).dt.date

    summary = df.groupby(["date", "currency", "status"]).agg(
        total_amount=("amount", "sum"),
        transaction_count=("id", "count"),
        avg_amount=("amount", "mean"),
    ).reset_index()

    context.log.info(f"สร้าง Summary ได้ {len(summary)} rows")
    return summary

# dagster_pipeline/assets/analytics.py
@asset(
    group_name="analytics",
    description="Upload Analytics Data ไปยัง S3",
    compute_kind="s3",
)
def analytics_report(
    context: AssetExecutionContext,
    daily_transaction_summary: pd.DataFrame,
    s3: S3Resource,
) -> None:
    """Upload Report ไปยัง S3 Data Lake"""
    from datetime import date
    key = f"analytics/daily_summary/{date.today().isoformat()}.parquet"
    s3.upload_dataframe(daily_transaction_summary, key)
    context.log.info(f"Upload ไปยัง s3://{s3.bucket}/{key}")

Definitions และ Resource Configuration

# dagster_pipeline/__init__.py
from dagster import Definitions, EnvVar
from dagster_pipeline.assets.raw_data import raw_transactions
from dagster_pipeline.assets.transformed import daily_transaction_summary
from dagster_pipeline.assets.analytics import analytics_report
from dagster_pipeline.resources.database import PostgresResource
from dagster_pipeline.resources.storage import S3Resource

# Factor X: Dev/Prod Parity — ใช้ Resource เดียวกัน Config ต่างกัน
defs = Definitions(
    assets=[raw_transactions, daily_transaction_summary, analytics_report],
    resources={
        "postgres": PostgresResource(
            host=EnvVar("POSTGRES_HOST"),
            port=5432,
            database=EnvVar("POSTGRES_DB"),
            username=EnvVar("POSTGRES_USER"),
            password=EnvVar("POSTGRES_PASSWORD"),
        ),
        "s3": S3Resource(
            bucket=EnvVar("S3_BUCKET"),
            region=EnvVar("AWS_REGION"),
            endpoint_url=EnvVar("S3_ENDPOINT_URL"),
        ),
    },
)

Docker และ Kubernetes Deployment

# Dockerfile — Factor V: Build, Release, Run
FROM python:3.11-slim AS builder
WORKDIR /app
COPY pyproject.toml .
RUN pip install --no-cache-dir ".[dev]"
COPY . .
RUN pip install --no-cache-dir -e .

FROM python:3.11-slim
WORKDIR /app
COPY --from=builder /usr/local/lib/python3.11/site-packages /usr/local/lib/python3.11/site-packages
COPY --from=builder /usr/local/bin /usr/local/bin
COPY --from=builder /app /app
EXPOSE 3000
CMD ["dagster", "api", "grpc", "-h", "0.0.0.0", "-p", "3000", "-m", "dagster_pipeline"]

---
# helm/values.yaml — Kubernetes Deployment
dagsterWebserver:
  replicaCount: 2
  image:
    repository: registry.company.com/dagster-pipeline
    tag: v1.0.0
  env:
    - name: POSTGRES_HOST
      valueFrom:
        secretKeyRef:
          name: dagster-secrets
          key: postgres-host
    - name: POSTGRES_DB
      value: dagster
    - name: POSTGRES_USER
      valueFrom:
        secretKeyRef:
          name: dagster-secrets
          key: postgres-user
    - name: POSTGRES_PASSWORD
      valueFrom:
        secretKeyRef:
          name: dagster-secrets
          key: postgres-password
    - name: S3_BUCKET
      value: company-data-lake
    - name: AWS_REGION
      value: ap-southeast-1
    - name: S3_ENDPOINT_URL
      value: ""

dagsterDaemon:
  enabled: true

runLauncher:
  type: K8sRunLauncher
  config:
    k8sRunLauncher:
      envVars:
        - POSTGRES_HOST
        - POSTGRES_DB
        - POSTGRES_USER
        - POSTGRES_PASSWORD
        - S3_BUCKET
        - AWS_REGION
      resources:
        requests:
          cpu: 250m
          memory: 512Mi
        limits:
          cpu: 1000m
          memory: 2Gi

---
# Deploy ด้วย Helm
helm repo add dagster https://dagster-io.github.io/helm
helm upgrade --install dagster dagster/dagster \
  -f helm/values.yaml \
  -n dagster --create-namespace

Testing ตาม 12-Factor

# tests/test_assets.py
from dagster import materialize, build_asset_context
from dagster_pipeline.assets.raw_data import raw_transactions
from dagster_pipeline.assets.transformed import daily_transaction_summary
import pandas as pd

def test_daily_transaction_summary():
    """ทดสอบ Transformation Logic"""
    # สร้าง Test Data
    test_data = pd.DataFrame({
        "id": [1, 2, 3, 4],
        "user_id": [100, 101, 100, 102],
        "amount": [500.0, 1200.0, 300.0, 800.0],
        "currency": ["THB", "THB", "USD", "THB"],
        "status": ["completed", "completed", "completed", "pending"],
        "created_at": pd.to_datetime(["2026-01-15"] * 4),
        "updated_at": pd.to_datetime(["2026-01-15"] * 4),
    })

    context = build_asset_context()
    result = daily_transaction_summary(context, test_data)

    assert len(result) > 0
    assert "total_amount" in result.columns
    assert "transaction_count" in result.columns

    # ตรวจสอบ THB completed
    thb_completed = result[
        (result["currency"] == "THB") & (result["status"] == "completed")
    ]
    assert thb_completed["total_amount"].iloc[0] == 1700.0
    assert thb_completed["transaction_count"].iloc[0] == 2

# รัน Test
# pytest tests/ -v --tb=short

Dagster คืออะไรและต่างจาก Airflow อย่างไร

Dagster เป็น Data Orchestrator ที่เน้น Software-defined Assets คือคิดเรื่อง Data ที่ต้องการสร้างก่อนแล้วค่อยกำหนดวิธีสร้าง ต่างจาก Airflow ที่เน้น Task-based DAGs Dagster มี Type System, Built-in Testing, IO Managers และ Asset Lineage ทำให้ Debug ง่ายกว่าและ Test ได้สะดวกกว่า

12-Factor App คืออะไรและเกี่ยวอะไรกับ Data Pipeline

12-Factor App เป็นแนวทาง 12 ข้อสำหรับออกแบบ Application ให้ Deploy ง่าย Scale ดี และ Maintain ได้ เมื่อนำมาใช้กับ Data Pipeline เช่นเก็บ Config ใน Environment Variables, ใช้ Stateless Processes, แยก Build/Release/Run ชัดเจน ช่วยให้ Pipeline Portable ข้าม Environment และ Reliable มากขึ้น

วิธีจัดการ Config ของ Dagster ตาม 12-Factor ทำอย่างไร

ใช้ ConfigurableResource ร่วมกับ EnvVar สำหรับ Inject Config ที่เปลี่ยนตาม Environment เช่น Database Connection, API Keys ใส่ใน Kubernetes Secrets หรือ Environment Variables ของ Container ไม่ Hardcode ค่าใน Code เด็ดขาด

Dagster รองรับ Kubernetes Deployment หรือไม่

รองรับผ่าน dagster-k8s Package และ Official Helm Chart ซึ่ง Deploy Dagster Webserver, Daemon และ User Code แยกกัน ใช้ K8sRunLauncher สำหรับสร้าง Pod ใหม่สำหรับแต่ละ Pipeline Run ทำให้ Scale ได้ดีและแต่ละ Run แยก Resource กัน

สรุปและแนวทางปฏิบัติ

การนำหลัก 12-Factor App มาใช้กับ Dagster Data Pipeline ช่วยให้ได้ Pipeline ที่ Portable, Scalable และ Maintainable สิ่งสำคัญคือเก็บ Config ใน Environment Variables ผ่าน ConfigurableResource ใช้ IO Managers สำหรับ Abstract Storage Layer เขียน Test สำหรับ Transformation Logic ทุกตัว และ Deploy ด้วย Helm Chart บน Kubernetes ที่ใช้ K8sRunLauncher สำหรับ Isolation ระหว่าง Pipeline Runs

📖 บทความที่เกี่ยวข้อง

Vue Nuxt Server 12 Factor Appอ่านบทความ → LVM Thin Provisioning 12 Factor Appอ่านบทความ → Dagster Pipeline Docker Container Deployอ่านบทความ → Dagster Pipeline Progressive Deliveryอ่านบทความ → Dagster Pipeline Career Development ITอ่านบทความ →

📚 ดูบทความทั้งหมด →