Dagster Data Orchestrator

Dagster เป็น Open-source Data Orchestrator เน้น Software-defined Assets ใช้ Python สร้าง Pipelines มี Type System ตรวจ Data Quality มี UI แสดง Asset Lineage

Asset-based Approach ช่วยเพิ่ม Team Productivity: ทีมเห็น Data Lineage ชัดเจน แบ่ง Code Locations ตามทีม และสร้าง Asset Factories ที่นำกลับมาใช้ซ้ำได้ (Reusable)

Dagster Assets และ Jobs

# === Dagster Software-defined Assets ===
# pip install dagster dagster-webserver dagster-postgres dagster-duckdb

import dagster as dg
from dagster import (
 asset, AssetExecutionContext, MaterializeResult,
 MetadataValue, AssetKey, Definitions,
 ScheduleDefinition, SensorDefinition,
)
from typing import Dict, List
import json

# 1. Software-defined Assets
@asset(
    description="ดึงข้อมูล Orders จาก API",
    group_name="raw",
    metadata={"source": "REST API", "format": "JSON"},
)
def raw_orders(context: AssetExecutionContext) -> Dict:
    """Produce the raw order records.

    The orders are generated in-process (100 synthetic records) as a
    stand-in for an HTTP call.

    Args:
        context: Execution context; only its logger is used here.

    Returns:
        A dict with ``"orders"`` (list of dicts carrying ``id``, ``amount``
        and ``status``) and ``"count"`` (the number of orders).
    """
    context.log.info("Fetching orders from API...")

    # Placeholder data. A real implementation would fetch it instead, e.g.:
    #   orders = requests.get("https://api.example.com/orders").json()
    orders = []
    for order_id in range(100):
        orders.append(
            {"id": order_id, "amount": 100 + order_id * 10, "status": "completed"}
        )

    context.log.info(f"Fetched {len(orders)} orders")
    return {"orders": orders, "count": len(orders)}

@asset(
    description="ดึงข้อมูล Customers จาก Database",
    group_name="raw",
    metadata={"source": "PostgreSQL", "table": "customers"},
)
def raw_customers(context: AssetExecutionContext) -> Dict:
    """Produce the raw customer records.

    The customers are generated in-process (50 synthetic records); every
    third id, starting at 0, is a "gold" customer and the rest are "silver".

    Args:
        context: Execution context (unused by this placeholder body).

    Returns:
        A dict with ``"customers"`` (list of dicts carrying ``id``, ``name``
        and ``tier``) and ``"count"`` (the number of customers).
    """
    customers = []
    for customer_id in range(50):
        if customer_id % 3 == 0:
            tier = "gold"
        else:
            tier = "silver"
        customers.append(
            {"id": customer_id, "name": f"Customer {customer_id}", "tier": tier}
        )

    return {"customers": customers, "count": len(customers)}

@asset(
    description="รวม Orders กับ Customers",
    group_name="transformed",
)
def enriched_orders(
    context: AssetExecutionContext,
    raw_orders: Dict,
    raw_customers: Dict,
) -> Dict:
    """Join every order with its customer's name and tier.

    The upstream assets are declared through the parameter names alone.
    They must not be repeated in ``deps=``: an asset key that is both a
    function input and a ``deps`` entry is rejected when the asset is
    defined.

    Args:
        context: Execution context; only its logger is used here.
        raw_orders: Dict with an ``"orders"`` list; each order needs an
            ``"id"`` key.
        raw_customers: Dict with a ``"customers"`` list; each customer needs
            an ``"id"`` key and may carry ``"name"`` and ``"tier"``.

    Returns:
        A dict with ``"orders"`` (each input order extended with
        ``customer_name`` and ``customer_tier``) and ``"count"``.
    """
    customers_by_id = {c["id"]: c for c in raw_customers["customers"]}

    enriched = []
    for order in raw_orders["orders"]:
        # Orders carry no customer id, so the order id is folded onto the
        # customer id range 0..49; unknown ids fall back to the defaults below.
        customer = customers_by_id.get(order["id"] % 50, {})
        enriched.append({
            **order,
            "customer_name": customer.get("name", "Unknown"),
            "customer_tier": customer.get("tier", "standard"),
        })

    context.log.info(f"Enriched {len(enriched)} orders")
    return {"orders": enriched, "count": len(enriched)}

@asset(
    description="สรุปยอดขายรายวัน",
    group_name="analytics",
)
def daily_sales_summary(
    context: AssetExecutionContext,
    enriched_orders: Dict,
) -> MaterializeResult:
    """Aggregate revenue figures over the enriched orders.

    The upstream asset is declared through the ``enriched_orders`` parameter
    alone; it is not repeated in ``deps=`` because an asset key cannot be
    both a function input and a ``deps`` entry.

    Args:
        context: Execution context; only its logger is used here.
        enriched_orders: Dict with an ``"orders"`` list; each order needs
            ``"amount"`` and ``"customer_tier"`` keys.

    Returns:
        A ``MaterializeResult`` whose metadata holds the total revenue, the
        order count, the average order value and the revenue per tier.
    """
    orders = enriched_orders["orders"]
    total_revenue = sum(o["amount"] for o in orders)
    # Guard against division by zero when there are no orders.
    avg_order = total_revenue / len(orders) if orders else 0.0

    by_tier = {}
    for o in orders:
        tier = o["customer_tier"]
        by_tier[tier] = by_tier.get(tier, 0) + o["amount"]

    context.log.info(f"Total Revenue: {total_revenue}")

    return MaterializeResult(
        metadata={
            # Amounts may be ints, so the sum can be an int; cast explicitly
            # so the float metadata entries always receive a float.
            "total_revenue": MetadataValue.float(float(total_revenue)),
            "total_orders": MetadataValue.int(len(orders)),
            "avg_order_value": MetadataValue.float(float(avg_order)),
            "revenue_by_tier": MetadataValue.json(by_tier),
        }
    )

# 2. Schedule
# Schedule that targets every asset and fires on the cron below.
daily_schedule = ScheduleDefinition(
    cron_schedule="0 6 * * *",  # minute 0, hour 6, every day
    target="*",  # "*" selects all assets
    name="daily_pipeline",
)

# 3. Definitions
# defs = Definitions(
# assets=[raw_orders, raw_customers, enriched_orders, daily_sales_summary],
# schedules=[daily_schedule],
# )

# Print a plain-text sketch of the asset graph and its schedule.
print(
    "Dagster Assets:",
    " raw_orders -> enriched_orders -> daily_sales_summary",
    " raw_customers -> enriched_orders",
    " Schedule: Daily 6:00 AM",
    sep="\n",
)

Team Productivity Patterns

# team_productivity.py — Dagster Team Productivity Patterns
from dataclasses import dataclass, field
from typing import List, Dict
from datetime import datetime

@dataclass
class TeamMember:
    """One person on the data team and the assets they are responsible for."""

    # Display name of the team member.
    name: str
    # Job title, e.g. "Data Engineer".
    role: str
    # Name of the code location the member works in.
    code_location: str
    # Names of the assets this member owns.
    assets_owned: List[str]

@dataclass
class ProductivityMetric:
    """A single before/after comparison, stored as display strings."""

    # What is being measured, e.g. "Debugging Time".
    metric: str
    # Value before adopting Dagster (free-form text).
    before_dagster: str
    # Value after adopting Dagster (free-form text).
    after_dagster: str
    # Human-readable summary of the change.
    improvement: str

class TeamProductivityDashboard:
    """Collects team members and productivity metrics and prints text reports.

    All report methods write to stdout with ``print`` and return ``None``.
    """

    def __init__(self) -> None:
        """Start with no members and no metrics."""
        self.team: List[TeamMember] = []
        self.metrics: List[ProductivityMetric] = []

    def add_member(self, member: TeamMember) -> None:
        """Append ``member`` to the team list."""
        self.team.append(member)

    def add_metric(self, metric: ProductivityMetric) -> None:
        """Append ``metric`` to the metrics list."""
        self.metrics.append(metric)

    def team_overview(self) -> None:
        """Print the team grouped by code location.

        Locations appear in the order in which they were first seen, and
        members keep their insertion order within each location.
        """
        print(f"\n{'='*55}")
        print("Data Team Overview")
        print(f"{'='*55}")

        # Group members by code location; dicts preserve insertion order.
        locations: Dict[str, List[TeamMember]] = {}
        for member in self.team:
            locations.setdefault(member.code_location, []).append(member)

        for loc, members in locations.items():
            print(f"\n [{loc}]")
            for m in members:
                print(f" {m.name} ({m.role})")
                print(f" Assets: {', '.join(m.assets_owned)}")

    def productivity_report(self) -> None:
        """Print every metric with its before, after and improvement values."""
        print("\n Productivity Improvements:")
        for m in self.metrics:
            print(f" {m.metric}:")
            print(f" Before: {m.before_dagster}")
            print(f" After: {m.after_dagster}")
            print(f" Improvement: {m.improvement}")

    def best_practices(self) -> None:
        """Print a fixed list of team best practices as ``name: description``."""
        practices = [
            ("Asset Naming Convention",
             "ใช้ prefix ตาม layer: raw_, clean_, agg_, ml_"),
            ("Code Locations",
             "แบ่ง Code Location ตามทีม/Domain"),
            ("Asset Factories",
             "สร้าง Factory Functions สำหรับ Assets ที่คล้ายกัน"),
            ("Testing",
             "เขียน Unit Test สำหรับทุก Asset ด้วย materialize_to_memory"),
            ("Documentation",
             "ใส่ description และ metadata ทุก Asset"),
            ("Alerting",
             "ตั้ง Slack/PagerDuty alerts เมื่อ Asset fail"),
            ("Partitioning",
             "ใช้ Daily/Monthly Partitions สำหรับ Time-series Data"),
            ("CI/CD",
             "ใช้ dagster-cloud CLI deploy อัตโนมัติ"),
        ]

        print("\n Best Practices:")
        for name, desc in practices:
            print(f" {name}: {desc}")

# Example: build a dashboard, register a sample team and metrics, print reports.
dashboard = TeamProductivityDashboard()

team = [
    TeamMember(
        name="Alice",
        role="Data Engineer",
        code_location="ingestion",
        assets_owned=["raw_orders", "raw_customers", "raw_products"],
    ),
    TeamMember(
        name="Bob",
        role="Data Engineer",
        code_location="transformation",
        assets_owned=["enriched_orders", "clean_customers"],
    ),
    TeamMember(
        name="Charlie",
        role="Analytics Engineer",
        code_location="analytics",
        assets_owned=["daily_sales", "monthly_report", "customer_segments"],
    ),
    TeamMember(
        name="Diana",
        role="ML Engineer",
        code_location="ml_pipeline",
        assets_owned=["feature_store", "model_training", "predictions"],
    ),
]

metrics = [
    ProductivityMetric(
        metric="Pipeline Development Time",
        before_dagster="2-3 weeks",
        after_dagster="3-5 days",
        improvement="70% faster",
    ),
    ProductivityMetric(
        metric="Debugging Time",
        before_dagster="4-8 hours",
        after_dagster="30 min-1 hour",
        improvement="80% reduction",
    ),
    ProductivityMetric(
        metric="Data Quality Issues",
        before_dagster="5-10 per week",
        after_dagster="1-2 per week",
        improvement="75% reduction",
    ),
    ProductivityMetric(
        metric="Onboarding New Member",
        before_dagster="2-3 weeks",
        after_dagster="3-5 days",
        improvement="70% faster",
    ),
    ProductivityMetric(
        metric="Deployment Frequency",
        before_dagster="Weekly",
        after_dagster="Daily (CI/CD)",
        improvement="5x more frequent",
    ),
]

# Register everything before printing so the reports see the full data set.
for m in team:
    dashboard.add_member(m)
for m in metrics:
    dashboard.add_metric(m)

dashboard.team_overview()
dashboard.productivity_report()
dashboard.best_practices()

Testing และ CI/CD

# === Dagster Testing และ CI/CD ===

# 1. Unit Testing Assets
# tests/test_assets.py
# from dagster import materialize_to_memory
# from my_project.assets import raw_orders, raw_customers, enriched_orders
#
# def test_raw_orders():
# result = materialize_to_memory([raw_orders])
# assert result.success
# output = result.output_for_node("raw_orders")
# assert output["count"] > 0
# assert "orders" in output
#
# def test_enriched_orders():
# result = materialize_to_memory(
# [raw_orders, raw_customers, enriched_orders]
# )
# assert result.success
# output = result.output_for_node("enriched_orders")
# assert output["count"] > 0
# for order in output["orders"]:
# assert "customer_name" in order
# assert "customer_tier" in order

# 2. GitHub Actions CI/CD
# .github/workflows/dagster.yml
# name: Dagster CI/CD
# on:
# push:
# branches: [main]
# pull_request:
# branches: [main]
#
# jobs:
# test:
# runs-on: ubuntu-latest
# steps:
# - uses: actions/checkout@v4
# - uses: actions/setup-python@v5
# with:
# python-version: '3.11'
# - run: pip install -e ".[dev]"
# - run: pytest tests/ -v
# - run: dagster asset list # Verify assets
#
# deploy:
# needs: test
# if: github.ref == 'refs/heads/main'
# runs-on: ubuntu-latest
# steps:
# - uses: actions/checkout@v4
# - run: pip install dagster-cloud
# - run: dagster-cloud workspace sync
# env:
# DAGSTER_CLOUD_API_TOKEN: ${{ secrets.DAGSTER_CLOUD_API_TOKEN }}

# 3. Asset Factory Pattern
# def create_table_asset(table_name, schema, source_query):
# @asset(
# name=f"raw_{table_name}",
# group_name="raw",
# metadata={"table": table_name, "schema": schema},
# )
# def _asset(context):
# # db.execute(source_query)
# context.log.info(f"Loaded {table_name}")
# return {"table": table_name, "rows": 1000}
# return _asset
#
# # สร้าง Assets จาก Factory
# tables = ["orders", "customers", "products", "payments"]
# raw_assets = [create_table_asset(t, "public", f"SELECT * FROM {t}") for t in tables]

# Summary of the testing and CI/CD practices, keyed by practice name.
testing_ci = {
    "Unit Tests": "materialize_to_memory() ทดสอบ Asset ใน Memory",
    "Integration Tests": "ทดสอบกับ Database จริงใน CI",
    "Asset Validation": "dagster asset list ตรวจสอบ Assets",
    "CI Pipeline": "GitHub Actions: test -> deploy",
    "CD": "dagster-cloud workspace sync",
}

# Print a header followed by one "name: description" line per practice.
print("Testing & CI/CD:")
for key, value in testing_ci.items():
    print(f" {key}: {value}")

Best Practices

  • Asset-first: คิดเป็น Assets (Data Products) ไม่ใช่ Tasks
  • Code Locations: แบ่ง Code Location ตามทีมหรือ Domain
  • Asset Factories: สร้าง Factory Functions สำหรับ Assets ที่คล้ายกัน
  • Type System: ใช้ Dagster Types ตรวจสอบ Data Quality
  • Partitioning: ใช้ Partitions สำหรับ Time-series Data ลด Reprocessing
  • Testing: เขียน Unit Test ทุก Asset ด้วย materialize_to_memory

Dagster คืออะไร

Dagster คือ Open-source Data Orchestrator ที่เน้น Software-defined Assets ใช้ Python สร้าง Data Pipelines มี Type System สำหรับตรวจสอบ Data Quality มี UI แสดง Asset Lineage รองรับ Partitioning, Scheduling และ Sensors และเข้ากับ Modern Data Stack