Dagster คืออะไร
Dagster เป็น Data Orchestration Platform ที่ออกแบบมาสำหรับ Data Engineering สมัยใหม่ จุดเด่นคือแนวคิด Software-defined Assets ที่ให้คิดเรื่อง Data Pipeline ในรูปแบบของ Asset (ข้อมูลที่ต้องการสร้าง) แทนที่จะคิดเป็น Task (งานที่ต้องทำ) วิธีคิดนี้ทำให้เห็น Data Lineage ได้ชัดเจนและ Debug ปัญหาได้ง่ายกว่า
Dagster มีคุณสมบัติเด่นได้แก่ Type System สำหรับตรวจสอบ Data Quality, IO Managers สำหรับจัดการ Storage แบบ Abstract, Built-in Testing Framework, Partitions สำหรับจัดการ Incremental Processing และ Sensors/Schedules สำหรับ Trigger Pipeline อัตโนมัติ
12-Factor App Methodology กับ Data Pipeline
12-Factor App เป็นแนวทาง 12 ข้อสำหรับออกแบบ Application ที่ Deploy ได้ง่าย Scale ได้ดี และ Maintain ได้ในระยะยาว เมื่อนำมาใช้กับ Data Pipeline จะได้ Pipeline ที่ Portable ข้าม Environment และ Reliable มากขึ้น
| Factor | หลักการ | การนำไปใช้กับ Dagster |
|---|---|---|
| I. Codebase | One codebase, many deploys | เก็บ Pipeline Code ใน Git Repository เดียว Deploy ได้ทุก Environment |
| II. Dependencies | Explicitly declare dependencies | ใช้ requirements.txt หรือ pyproject.toml จัดการ Python Dependencies |
| III. Config | Store config in environment | ใช้ EnvVar ใน Dagster Resources และ dagster.yaml |
| IV. Backing Services | Treat as attached resources | ใช้ IO Managers สำหรับ Database, S3, GCS แบบ Abstract |
| V. Build, Release, Run | Strictly separate stages | Build Docker Image, Tag Version, Deploy ด้วย Helm |
| VI. Processes | Stateless processes | Dagster Run แต่ละตัว Stateless ใช้ External Storage สำหรับ State |
| VII. Port Binding | Export services via port | Dagster Webserver Bind Port สำหรับ UI และ GraphQL API |
| VIII. Concurrency | Scale via processes | ใช้ K8sRunLauncher สร้าง Pod แยกสำหรับแต่ละ Run |
| IX. Disposability | Fast startup, graceful shutdown | Pipeline Run สามารถ Cancel ได้ทันทีและ Resume จาก Checkpoint |
| X. Dev/Prod Parity | Keep environments similar | ใช้ Resource Configuration แยกตาม Environment |
| XI. Logs | Treat logs as event streams | Dagster ส่ง Log ไปยัง stdout ใช้ Structured Logging |
| XII. Admin | Run admin as one-off | ใช้ Dagster CLI สำหรับ One-off Tasks เช่น Backfill |
การตั้งค่า Dagster Project ตาม 12-Factor
# โครงสร้าง Project
# dagster-pipeline/
# ├── dagster_pipeline/
# │ ├── __init__.py
# │ ├── assets/
# │ │ ├── __init__.py
# │ │ ├── raw_data.py
# │ │ ├── transformed.py
# │ │ └── analytics.py
# │ ├── resources/
# │ │ ├── __init__.py
# │ │ ├── database.py
# │ │ └── storage.py
# │ ├── sensors/
# │ │ └── file_sensor.py
# │ └── schedules/
# │ └── daily_schedule.py
# ├── dagster.yaml
# ├── workspace.yaml
# ├── pyproject.toml
# ├── Dockerfile
# └── helm/
# └── values.yaml
# pyproject.toml
cat > pyproject.toml << 'TOML'
[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"
[project]
name = "dagster-pipeline"
version = "1.0.0"
dependencies = [
"dagster>=1.6.0",
"dagster-postgres>=0.22.0",
"dagster-k8s>=0.22.0",
"dagster-aws>=0.22.0",
"pandas>=2.0",
"sqlalchemy>=2.0",
"boto3>=1.28",
]
[project.optional-dependencies]
dev = [
"dagster-webserver>=1.6.0",
"pytest>=7.0",
]
TOML
Factor III: Config ใน Environment Variables
# dagster_pipeline/resources/database.py
from dagster import ConfigurableResource, EnvVar
from sqlalchemy import create_engine
from contextlib import contextmanager
class PostgresResource(ConfigurableResource):
"""Database Resource ที่ใช้ Environment Variables ตาม 12-Factor"""
host: str = EnvVar("POSTGRES_HOST")
port: int = 5432
database: str = EnvVar("POSTGRES_DB")
username: str = EnvVar("POSTGRES_USER")
password: str = EnvVar("POSTGRES_PASSWORD")
@contextmanager
def get_connection(self):
url = f"postgresql://{self.username}:{self.password}@{self.host}:{self.port}/{self.database}"
engine = create_engine(url, pool_size=5, max_overflow=10)
conn = engine.connect()
try:
yield conn
finally:
conn.close()
engine.dispose()
# dagster_pipeline/resources/storage.py
class S3Resource(ConfigurableResource):
"""S3 Storage Resource ตาม Factor IV: Backing Services"""
bucket: str = EnvVar("S3_BUCKET")
region: str = EnvVar("AWS_REGION")
endpoint_url: str = EnvVar("S3_ENDPOINT_URL")
def upload_dataframe(self, df, key):
import boto3
import io
s3 = boto3.client("s3", region_name=self.region,
endpoint_url=self.endpoint_url or None)
buf = io.BytesIO()
df.to_parquet(buf, index=False)
buf.seek(0)
s3.upload_fileobj(buf, self.bucket, key)
def read_dataframe(self, key):
import boto3
import pandas as pd
s3 = boto3.client("s3", region_name=self.region,
endpoint_url=self.endpoint_url or None)
obj = s3.get_object(Bucket=self.bucket, Key=key)
return pd.read_parquet(io.BytesIO(obj["Body"].read()))
Software-defined Assets
# dagster_pipeline/assets/raw_data.py
from dagster import asset, AssetExecutionContext, MetadataValue
import pandas as pd
@asset(
group_name="raw",
description="ดึงข้อมูล Raw จาก Source Database",
compute_kind="postgres",
)
def raw_transactions(
context: AssetExecutionContext,
postgres: PostgresResource,
) -> pd.DataFrame:
"""ดึงข้อมูล Transaction จาก Source Database"""
query = """
SELECT id, user_id, amount, currency, status,
created_at, updated_at
FROM transactions
WHERE created_at >= CURRENT_DATE - INTERVAL '1 day'
"""
with postgres.get_connection() as conn:
df = pd.read_sql(query, conn)
context.log.info(f"ดึงข้อมูลได้ {len(df)} rows")
context.add_output_metadata({
"row_count": MetadataValue.int(len(df)),
"columns": MetadataValue.text(", ".join(df.columns)),
"date_range": MetadataValue.text(
f"{df['created_at'].min()} to {df['created_at'].max()}"
),
})
return df
# dagster_pipeline/assets/transformed.py
@asset(
group_name="transformed",
description="แปลงข้อมูล Transaction เป็น Daily Summary",
compute_kind="pandas",
)
def daily_transaction_summary(
context: AssetExecutionContext,
raw_transactions: pd.DataFrame,
) -> pd.DataFrame:
"""สร้าง Daily Summary จาก Raw Transactions"""
df = raw_transactions.copy()
df["date"] = pd.to_datetime(df["created_at"]).dt.date
summary = df.groupby(["date", "currency", "status"]).agg(
total_amount=("amount", "sum"),
transaction_count=("id", "count"),
avg_amount=("amount", "mean"),
).reset_index()
context.log.info(f"สร้าง Summary ได้ {len(summary)} rows")
return summary
# dagster_pipeline/assets/analytics.py
@asset(
group_name="analytics",
description="Upload Analytics Data ไปยัง S3",
compute_kind="s3",
)
def analytics_report(
context: AssetExecutionContext,
daily_transaction_summary: pd.DataFrame,
s3: S3Resource,
) -> None:
"""Upload Report ไปยัง S3 Data Lake"""
from datetime import date
key = f"analytics/daily_summary/{date.today().isoformat()}.parquet"
s3.upload_dataframe(daily_transaction_summary, key)
context.log.info(f"Upload ไปยัง s3://{s3.bucket}/{key}")
Definitions และ Resource Configuration
# dagster_pipeline/__init__.py
from dagster import Definitions, EnvVar
from dagster_pipeline.assets.raw_data import raw_transactions
from dagster_pipeline.assets.transformed import daily_transaction_summary
from dagster_pipeline.assets.analytics import analytics_report
from dagster_pipeline.resources.database import PostgresResource
from dagster_pipeline.resources.storage import S3Resource
# Factor X: Dev/Prod Parity — ใช้ Resource เดียวกัน Config ต่างกัน
defs = Definitions(
assets=[raw_transactions, daily_transaction_summary, analytics_report],
resources={
"postgres": PostgresResource(
host=EnvVar("POSTGRES_HOST"),
port=5432,
database=EnvVar("POSTGRES_DB"),
username=EnvVar("POSTGRES_USER"),
password=EnvVar("POSTGRES_PASSWORD"),
),
"s3": S3Resource(
bucket=EnvVar("S3_BUCKET"),
region=EnvVar("AWS_REGION"),
endpoint_url=EnvVar("S3_ENDPOINT_URL"),
),
},
)
Docker และ Kubernetes Deployment
# Dockerfile — Factor V: Build, Release, Run
FROM python:3.11-slim AS builder
WORKDIR /app
COPY pyproject.toml .
RUN pip install --no-cache-dir ".[dev]"
COPY . .
RUN pip install --no-cache-dir -e .
FROM python:3.11-slim
WORKDIR /app
COPY --from=builder /usr/local/lib/python3.11/site-packages /usr/local/lib/python3.11/site-packages
COPY --from=builder /usr/local/bin /usr/local/bin
COPY --from=builder /app /app
EXPOSE 3000
CMD ["dagster", "api", "grpc", "-h", "0.0.0.0", "-p", "3000", "-m", "dagster_pipeline"]
---
# helm/values.yaml — Kubernetes Deployment
dagsterWebserver:
replicaCount: 2
image:
repository: registry.company.com/dagster-pipeline
tag: v1.0.0
env:
- name: POSTGRES_HOST
valueFrom:
secretKeyRef:
name: dagster-secrets
key: postgres-host
- name: POSTGRES_DB
value: dagster
- name: POSTGRES_USER
valueFrom:
secretKeyRef:
name: dagster-secrets
key: postgres-user
- name: POSTGRES_PASSWORD
valueFrom:
secretKeyRef:
name: dagster-secrets
key: postgres-password
- name: S3_BUCKET
value: company-data-lake
- name: AWS_REGION
value: ap-southeast-1
- name: S3_ENDPOINT_URL
value: ""
dagsterDaemon:
enabled: true
runLauncher:
type: K8sRunLauncher
config:
k8sRunLauncher:
envVars:
- POSTGRES_HOST
- POSTGRES_DB
- POSTGRES_USER
- POSTGRES_PASSWORD
- S3_BUCKET
- AWS_REGION
resources:
requests:
cpu: 250m
memory: 512Mi
limits:
cpu: 1000m
memory: 2Gi
---
# Deploy ด้วย Helm
helm repo add dagster https://dagster-io.github.io/helm
helm upgrade --install dagster dagster/dagster \
-f helm/values.yaml \
-n dagster --create-namespace
Testing ตาม 12-Factor
# tests/test_assets.py
from dagster import materialize, build_asset_context
from dagster_pipeline.assets.raw_data import raw_transactions
from dagster_pipeline.assets.transformed import daily_transaction_summary
import pandas as pd
def test_daily_transaction_summary():
"""ทดสอบ Transformation Logic"""
# สร้าง Test Data
test_data = pd.DataFrame({
"id": [1, 2, 3, 4],
"user_id": [100, 101, 100, 102],
"amount": [500.0, 1200.0, 300.0, 800.0],
"currency": ["THB", "THB", "USD", "THB"],
"status": ["completed", "completed", "completed", "pending"],
"created_at": pd.to_datetime(["2026-01-15"] * 4),
"updated_at": pd.to_datetime(["2026-01-15"] * 4),
})
context = build_asset_context()
result = daily_transaction_summary(context, test_data)
assert len(result) > 0
assert "total_amount" in result.columns
assert "transaction_count" in result.columns
# ตรวจสอบ THB completed
thb_completed = result[
(result["currency"] == "THB") & (result["status"] == "completed")
]
assert thb_completed["total_amount"].iloc[0] == 1700.0
assert thb_completed["transaction_count"].iloc[0] == 2
# รัน Test
# pytest tests/ -v --tb=short
Dagster คืออะไรและต่างจาก Airflow อย่างไร
Dagster เป็น Data Orchestrator ที่เน้น Software-defined Assets คือคิดเรื่อง Data ที่ต้องการสร้างก่อนแล้วค่อยกำหนดวิธีสร้าง ต่างจาก Airflow ที่เน้น Task-based DAGs Dagster มี Type System, Built-in Testing, IO Managers และ Asset Lineage ทำให้ Debug ง่ายกว่าและ Test ได้สะดวกกว่า
12-Factor App คืออะไรและเกี่ยวอะไรกับ Data Pipeline
12-Factor App เป็นแนวทาง 12 ข้อสำหรับออกแบบ Application ให้ Deploy ง่าย Scale ดี และ Maintain ได้ เมื่อนำมาใช้กับ Data Pipeline เช่นเก็บ Config ใน Environment Variables, ใช้ Stateless Processes, แยก Build/Release/Run ชัดเจน ช่วยให้ Pipeline Portable ข้าม Environment และ Reliable มากขึ้น
วิธีจัดการ Config ของ Dagster ตาม 12-Factor ทำอย่างไร
ใช้ ConfigurableResource ร่วมกับ EnvVar สำหรับ Inject Config ที่เปลี่ยนตาม Environment เช่น Database Connection, API Keys ใส่ใน Kubernetes Secrets หรือ Environment Variables ของ Container ไม่ Hardcode ค่าใน Code เด็ดขาด
Dagster รองรับ Kubernetes Deployment หรือไม่
รองรับผ่าน dagster-k8s Package และ Official Helm Chart ซึ่ง Deploy Dagster Webserver, Daemon และ User Code แยกกัน ใช้ K8sRunLauncher สำหรับสร้าง Pod ใหม่สำหรับแต่ละ Pipeline Run ทำให้ Scale ได้ดีและแต่ละ Run แยก Resource กัน
สรุปและแนวทางปฏิบัติ
การนำหลัก 12-Factor App มาใช้กับ Dagster Data Pipeline ช่วยให้ได้ Pipeline ที่ Portable, Scalable และ Maintainable สิ่งสำคัญคือเก็บ Config ใน Environment Variables ผ่าน ConfigurableResource ใช้ IO Managers สำหรับ Abstract Storage Layer เขียน Test สำหรับ Transformation Logic ทุกตัว และ Deploy ด้วย Helm Chart บน Kubernetes ที่ใช้ K8sRunLauncher สำหรับ Isolation ระหว่าง Pipeline Runs
