Dagster Data Orchestrator
Dagster เป็น Data Orchestrator แบบ Open-source ที่เน้นแนวคิด Software-defined Assets โดยใช้ Python สร้าง Pipelines มี Type System สำหรับตรวจสอบ Data Quality และมี UI สำหรับแสดง Asset Lineage
Dagster ช่วยเพิ่ม Team Productivity ด้วย Asset-based Approach: ทีมเห็น Data Lineage ชัดเจน แบ่ง Code Locations ตามทีมได้ และสร้าง Asset Factories ที่นำกลับมาใช้ซ้ำได้ (Reusable)
Dagster Assets และ Jobs
# === Dagster Software-defined Assets ===
# pip install dagster dagster-webserver dagster-postgres dagster-duckdb
import dagster as dg
from dagster import (
asset, AssetExecutionContext, MaterializeResult,
MetadataValue, AssetKey, Definitions,
ScheduleDefinition, SensorDefinition,
)
from typing import Dict, List
import json
# 1. Software-defined Assets
@asset(
    group_name="raw",
    description="ดึงข้อมูล Orders จาก API",
    metadata={"source": "REST API", "format": "JSON"},
)
def raw_orders(context: AssetExecutionContext) -> Dict:
    """Produce the raw order records that feed the rest of the pipeline.

    The real HTTP request is left commented out; 100 synthetic completed
    orders are generated instead so the example runs without a network.

    Args:
        context: Dagster execution context, used here only for logging.

    Returns:
        A dict holding the order list under ``"orders"`` and its length
        under ``"count"``.
    """
    log = context.log
    log.info("Fetching orders from API...")
    # Real extraction would look like:
    # response = requests.get("https://api.example.com/orders")
    # orders = response.json()
    orders = []
    for order_id in range(100):
        orders.append(
            {"id": order_id, "amount": 100 + order_id * 10, "status": "completed"}
        )
    total = len(orders)
    log.info(f"Fetched {total} orders")
    return {"orders": orders, "count": total}
@asset(
    group_name="raw",
    description="ดึงข้อมูล Customers จาก Database",
    metadata={"source": "PostgreSQL", "table": "customers"},
)
def raw_customers(context: AssetExecutionContext) -> Dict:
    """Produce the raw customer records.

    Generates 50 synthetic customers; every third id (0, 3, 6, ...) is
    tagged ``"gold"`` and all others ``"silver"``.

    Args:
        context: Dagster execution context (unused by this example).

    Returns:
        A dict holding the customer list under ``"customers"`` and its
        length under ``"count"``.
    """
    customers = []
    for customer_id in range(50):
        tier = "silver" if customer_id % 3 else "gold"
        customers.append(
            {"id": customer_id, "name": f"Customer {customer_id}", "tier": tier}
        )
    return {"customers": customers, "count": len(customers)}
@asset(
    description="รวม Orders กับ Customers",
    group_name="transformed",
)
def enriched_orders(
    context: AssetExecutionContext,
    raw_orders: Dict,
    raw_customers: Dict,
) -> Dict:
    """Join every order with its customer's name and tier.

    The upstream assets are declared through the ``raw_orders`` and
    ``raw_customers`` parameters: Dagster resolves them by name and loads
    their values as inputs. They are deliberately NOT repeated in ``deps=``,
    because Dagster rejects an asset key that is declared both as a ``deps``
    entry and as a loaded input.

    Args:
        context: Dagster execution context, used here only for logging.
        raw_orders: Output of the ``raw_orders`` asset; its ``"orders"`` list
            is read.
        raw_customers: Output of the ``raw_customers`` asset; its
            ``"customers"`` list is read.

    Returns:
        A dict holding the enriched order list under ``"orders"`` and its
        length under ``"count"``. Each enriched order keeps the original
        fields and gains ``"customer_name"`` and ``"customer_tier"``.
    """
    customers_by_id = {c["id"]: c for c in raw_customers["customers"]}

    enriched = []
    for order in raw_orders["orders"]:
        # Orders carry no customer id in this example, so one is derived from
        # the order id modulo 50. Unknown ids fall back to placeholder values.
        customer = customers_by_id.get(order["id"] % 50, {})
        enriched.append({
            **order,
            "customer_name": customer.get("name", "Unknown"),
            "customer_tier": customer.get("tier", "standard"),
        })

    context.log.info(f"Enriched {len(enriched)} orders")
    return {"orders": enriched, "count": len(enriched)}
@asset(
    description="สรุปยอดขายรายวัน",
    group_name="analytics",
)
def daily_sales_summary(
    context: AssetExecutionContext,
    enriched_orders: Dict,
) -> MaterializeResult:
    """Aggregate the enriched orders into revenue metadata.

    The upstream ``enriched_orders`` asset is declared through the function
    parameter, so it is not repeated in ``deps=`` (Dagster rejects a key that
    is both a ``deps`` entry and a loaded input).

    Args:
        context: Dagster execution context, used here only for logging.
        enriched_orders: Output of the ``enriched_orders`` asset; its
            ``"orders"`` list is read, and each order must provide
            ``"amount"`` and ``"customer_tier"``.

    Returns:
        A ``MaterializeResult`` whose metadata carries total revenue, order
        count, average order value, and revenue broken down by customer tier.
    """
    orders = enriched_orders["orders"]

    # Amounts may be ints; coerce so the float metadata entries always receive
    # a real float (presumably MetadataValue.float validates the type).
    total_revenue = float(sum(o["amount"] for o in orders))
    avg_order = total_revenue / len(orders) if orders else 0.0

    by_tier = {}
    for o in orders:
        tier = o["customer_tier"]
        by_tier[tier] = by_tier.get(tier, 0) + o["amount"]

    context.log.info(f"Total Revenue: {total_revenue}")
    return MaterializeResult(
        metadata={
            "total_revenue": MetadataValue.float(total_revenue),
            "total_orders": MetadataValue.int(len(orders)),
            "avg_order_value": MetadataValue.float(avg_order),
            "revenue_by_tier": MetadataValue.json(by_tier),
        }
    )
# 2. Schedule
# Run the whole asset graph once a day.
daily_schedule = ScheduleDefinition(
    cron_schedule="0 6 * * *",  # every day at 06:00
    target="*",  # select all assets
    name="daily_pipeline",
)
# 3. Definitions
# defs = Definitions(
# assets=[raw_orders, raw_customers, enriched_orders, daily_sales_summary],
# schedules=[daily_schedule],
# )
# Print a plain-text summary of the asset graph and its schedule.
print("\n".join((
    "Dagster Assets:",
    " raw_orders -> enriched_orders -> daily_sales_summary",
    " raw_customers -> enriched_orders",
    " Schedule: Daily 6:00 AM",
)))
Team Productivity Patterns
# team_productivity.py — Dagster Team Productivity Patterns
from dataclasses import dataclass, field
from typing import List, Dict
from datetime import datetime
@dataclass
class TeamMember:
    """A data-team member and the assets they are responsible for.

    Attributes:
        name: Display name of the member.
        role: Job title, e.g. ``"Data Engineer"``.
        code_location: Name of the Dagster code location the member works in;
            the dashboard groups members by this value.
        assets_owned: Names of the assets the member owns. Defaults to a
            fresh empty list so a member can be created before any assets
            are assigned.
    """

    name: str
    role: str
    code_location: str
    # default_factory gives each instance its own list (a shared mutable
    # default would leak assets between members).
    assets_owned: List[str] = field(default_factory=list)
@dataclass
class ProductivityMetric:
    """One before/after comparison shown in the productivity report.

    All four fields are free-form display strings; nothing is computed
    from them.
    """

    # Name of the thing being measured, e.g. "Debugging Time".
    metric: str
    # Value observed before adopting Dagster.
    before_dagster: str
    # Value observed after adopting Dagster.
    after_dagster: str
    # Human-readable summary of the change, e.g. "80% reduction".
    improvement: str
class TeamProductivityDashboard:
    """Console dashboard summarising a data team and its productivity metrics.

    Members and metrics are collected with ``add_member`` / ``add_metric``
    and rendered to stdout by ``team_overview``, ``productivity_report``
    and ``best_practices``.
    """

    # (practice name, description) pairs printed by ``best_practices``.
    # Kept as a class-level constant so the list is not rebuilt on every call.
    _BEST_PRACTICES = (
        ("Asset Naming Convention",
         "ใช้ prefix ตาม layer: raw_, clean_, agg_, ml_"),
        ("Code Locations",
         "แบ่ง Code Location ตามทีม/Domain"),
        ("Asset Factories",
         "สร้าง Factory Functions สำหรับ Assets ที่คล้ายกัน"),
        ("Testing",
         "เขียน Unit Test สำหรับทุก Asset ด้วย materialize_to_memory"),
        ("Documentation",
         "ใส่ description และ metadata ทุก Asset"),
        ("Alerting",
         "ตั้ง Slack/PagerDuty alerts เมื่อ Asset fail"),
        ("Partitioning",
         "ใช้ Daily/Monthly Partitions สำหรับ Time-series Data"),
        ("CI/CD",
         "ใช้ dagster-cloud CLI deploy อัตโนมัติ"),
    )

    def __init__(self):
        """Start with no members and no metrics."""
        self.team: List[TeamMember] = []
        self.metrics: List[ProductivityMetric] = []

    def add_member(self, member: TeamMember):
        """Append ``member`` to the team, preserving insertion order."""
        self.team.append(member)

    def add_metric(self, metric: ProductivityMetric):
        """Append ``metric`` to the report, preserving insertion order."""
        self.metrics.append(metric)

    def team_overview(self):
        """Print the team grouped by code location.

        Locations appear in the order their first member was added, and
        members keep their insertion order within each location.
        """
        rule = "=" * 55
        print(f"\n{rule}")
        print("Data Team Overview")
        print(rule)

        # Group members by code location; dicts preserve first-seen order.
        members_by_location = {}
        for member in self.team:
            members_by_location.setdefault(member.code_location, []).append(member)

        for loc, members in members_by_location.items():
            print(f"\n [{loc}]")
            for m in members:
                print(f" {m.name} ({m.role})")
                print(f" Assets: {', '.join(m.assets_owned)}")

    def productivity_report(self):
        """Print each metric's before/after values and its improvement."""
        print("\n Productivity Improvements:")
        for m in self.metrics:
            print(f" {m.metric}:")
            print(f" Before: {m.before_dagster}")
            print(f" After: {m.after_dagster}")
            print(f" Improvement: {m.improvement}")

    def best_practices(self):
        """Print the fixed list of recommended team practices."""
        print("\n Best Practices:")
        for name, desc in self._BEST_PRACTICES:
            print(f" {name}: {desc}")
# ตัวอย่าง
# Example: describe a four-person data team, record the before/after
# metrics, then print all three reports.
dashboard = TeamProductivityDashboard()

team = [
    TeamMember(name, role, location, assets)
    for name, role, location, assets in (
        ("Alice", "Data Engineer", "ingestion",
         ["raw_orders", "raw_customers", "raw_products"]),
        ("Bob", "Data Engineer", "transformation",
         ["enriched_orders", "clean_customers"]),
        ("Charlie", "Analytics Engineer", "analytics",
         ["daily_sales", "monthly_report", "customer_segments"]),
        ("Diana", "ML Engineer", "ml_pipeline",
         ["feature_store", "model_training", "predictions"]),
    )
]

metrics = [
    ProductivityMetric(*row)
    for row in (
        ("Pipeline Development Time", "2-3 weeks", "3-5 days", "70% faster"),
        ("Debugging Time", "4-8 hours", "30 min-1 hour", "80% reduction"),
        ("Data Quality Issues", "5-10 per week", "1-2 per week", "75% reduction"),
        ("Onboarding New Member", "2-3 weeks", "3-5 days", "70% faster"),
        ("Deployment Frequency", "Weekly", "Daily (CI/CD)", "5x more frequent"),
    )
]

# Register everything with the dashboard in declaration order.
for m in team:
    dashboard.add_member(m)
for m in metrics:
    dashboard.add_metric(m)

# Render the reports to stdout.
dashboard.team_overview()
dashboard.productivity_report()
dashboard.best_practices()
Testing และ CI/CD
# === Dagster Testing และ CI/CD ===
# 1. Unit Testing Assets
# tests/test_assets.py
# from dagster import materialize_to_memory
# from my_project.assets import raw_orders, raw_customers, enriched_orders
#
# def test_raw_orders():
# result = materialize_to_memory([raw_orders])
# assert result.success
# output = result.output_for_node("raw_orders")
# assert output["count"] > 0
# assert "orders" in output
#
# def test_enriched_orders():
# result = materialize_to_memory(
# [raw_orders, raw_customers, enriched_orders]
# )
# assert result.success
# output = result.output_for_node("enriched_orders")
# assert output["count"] > 0
# for order in output["orders"]:
# assert "customer_name" in order
# assert "customer_tier" in order
# 2. GitHub Actions CI/CD
# .github/workflows/dagster.yml
# name: Dagster CI/CD
# on:
# push:
# branches: [main]
# pull_request:
# branches: [main]
#
# jobs:
# test:
# runs-on: ubuntu-latest
# steps:
# - uses: actions/checkout@v4
# - uses: actions/setup-python@v5
# with:
# python-version: '3.11'
# - run: pip install -e ".[dev]"
# - run: pytest tests/ -v
# - run: dagster asset list # Verify assets
#
# deploy:
# needs: test
# if: github.ref == 'refs/heads/main'
# runs-on: ubuntu-latest
# steps:
# - uses: actions/checkout@v4
# - run: pip install dagster-cloud
# - run: dagster-cloud workspace sync
# env:
# DAGSTER_CLOUD_API_TOKEN: ${{ secrets.DAGSTER_CLOUD_API_TOKEN }}
# 3. Asset Factory Pattern
# def create_table_asset(table_name, schema, source_query):
# @asset(
# name=f"raw_{table_name}",
# group_name="raw",
# metadata={"table": table_name, "schema": schema},
# )
# def _asset(context):
# # db.execute(source_query)
# context.log.info(f"Loaded {table_name}")
# return {"table": table_name, "rows": 1000}
# return _asset
#
# # สร้าง Assets จาก Factory
# tables = ["orders", "customers", "products", "payments"]
# raw_assets = [create_table_asset(t, "public", f"SELECT * FROM {t}") for t in tables]
# Topic -> one-line description of each testing / CI/CD practice shown above.
testing_ci = dict([
    ("Unit Tests", "materialize_to_memory() ทดสอบ Asset ใน Memory"),
    ("Integration Tests", "ทดสอบกับ Database จริงใน CI"),
    ("Asset Validation", "dagster asset list ตรวจสอบ Assets"),
    ("CI Pipeline", "GitHub Actions: test -> deploy"),
    ("CD", "dagster-cloud workspace sync"),
])

# Print the summary, one practice per line, in declaration order.
print("Testing & CI/CD:")
for key in testing_ci:
    value = testing_ci[key]
    print(f" {key}: {value}")
Best Practices
- Asset-first: คิดเป็น Assets (Data Products) ไม่ใช่ Tasks
- Code Locations: แบ่ง Code Location ตามทีมหรือ Domain
- Asset Factories: สร้าง Factory Functions สำหรับ Assets ที่คล้ายกัน
- Type System: ใช้ Dagster Types ตรวจสอบ Data Quality
- Partitioning: ใช้ Partitions สำหรับ Time-series Data ลด Reprocessing
- Testing: เขียน Unit Test ทุก Asset ด้วย materialize_to_memory
Dagster คืออะไร
Dagster คือ Open-source Data Orchestrator ที่เน้น Software-defined Assets ใช้ Python สร้าง Data Pipelines มี Type System สำหรับตรวจสอบ Data Quality มี UI แสดง Asset Lineage รองรับ Partitioning, Scheduling และ Sensors และเหมาะกับ Modern Data Stack
Dagster ต่างจาก Airflow อย่างไร
Dagster เป็นแบบ Asset-based ที่มีข้อมูลเป็นศูนย์กลาง ส่วน Airflow เป็นแบบ Task-based ที่มีงาน (Task) เป็นศูนย์กลาง นอกจากนี้ Dagster ยังมี Type System สำหรับตรวจ Quality ในตัว รองรับ Unit Test มี Software-defined Assets และสร้าง Data Lineage ให้อัตโนมัติ
Software-defined Assets คืออะไร
Software-defined Assets คือการกำหนด Data Assets เช่น Tables, Files และ ML Models ด้วย Code โดยระบุ Dependencies อย่างชัดเจน Dagster จะสร้าง DAG ให้อัตโนมัติ แสดง Data Lineage ทั้งหมด และ Materialize (สร้างใหม่) Asset ได้เมื่อต้องการ
วิธีเพิ่ม Team Productivity ด้วย Dagster ทำอย่างไร
ใช้ Software-defined Assets เพื่อให้ทีมเห็น Data Lineage แบ่ง Code Locations ตามทีม ใช้ Dagster Cloud เป็น Managed Infrastructure สร้าง Reusable Asset Factories ใช้ Sensors สำหรับงานแบบ Event-driven และตั้ง Alerting เมื่อ Pipeline ล้มเหลว
สรุป
Dagster เป็น Data Orchestrator ที่เน้น Software-defined Assets ทำให้ทีมเห็น Data Lineage ชัดเจน แบ่ง Code Locations ตามทีมได้ ใช้ Asset Factories สร้าง Assets ที่คล้ายกันซ้ำได้ มี Type System ตรวจ Quality เขียน Unit Test ด้วย materialize_to_memory และทำ CI/CD ด้วย dagster-cloud
