Technology

Dagster Pipeline Team Productivity

dagster pipeline team productivity
Dagster Pipeline Team Productivity | SiamCafe Blog
2026-05-09· อ. บอม — SiamCafe.net· 9,894 คำ

Dagster Data Orchestrator

Dagster เป็น Data Orchestrator แบบ Open-source ที่เน้นแนวคิด Software-defined Assets ใช้ Python ในการสร้าง Pipelines มี Type System สำหรับตรวจสอบ Data Quality และมี UI สำหรับแสดง Asset Lineage

Dagster ช่วยเพิ่ม Team Productivity ด้วยแนวทางแบบ Asset-based: ทีมเห็น Data Lineage ได้ชัดเจน แบ่ง Code Locations ตามทีมได้ และนำ Asset Factories กลับมาใช้ซ้ำ (Reusable) ได้

Dagster Assets และ Jobs

# === Dagster Software-defined Assets ===
# pip install dagster dagster-webserver dagster-postgres dagster-duckdb

import dagster as dg
from dagster import (
    asset, AssetExecutionContext, MaterializeResult,
    MetadataValue, AssetKey, Definitions,
    ScheduleDefinition, SensorDefinition,
)
from typing import Dict, List
import json

# 1. Software-defined Assets
@asset(
    description="ดึงข้อมูล Orders จาก API",
    group_name="raw",
    metadata={"source": "REST API", "format": "JSON"},
)
def raw_orders(context: AssetExecutionContext) -> Dict:
    """Extract the raw orders dataset.

    The HTTP call is stubbed out. Instead, 100 synthetic orders are
    generated with ids 0..99, ``amount = 100 + 10 * id`` and status
    ``"completed"``.

    Args:
        context: Dagster execution context, used for logging.

    Returns:
        ``{"orders": [...], "count": <number of orders>}``.
    """
    context.log.info("Fetching orders from API...")
    # Real extraction would be:
    #   response = requests.get("https://api.example.com/orders")
    #   orders = response.json()
    orders = [
        {"id": order_id, "amount": 10 * order_id + 100, "status": "completed"}
        for order_id in range(100)
    ]
    fetched = len(orders)
    context.log.info(f"Fetched {fetched} orders")
    return {"orders": orders, "count": fetched}

@asset(
    description="ดึงข้อมูล Customers จาก Database",
    group_name="raw",
    metadata={"source": "PostgreSQL", "table": "customers"},
)
def raw_customers(context: AssetExecutionContext) -> Dict:
    """Extract the raw customers dataset.

    The database read is stubbed out. Instead, 50 synthetic customers are
    generated with ids 0..49. Every id divisible by 3 is tier ``"gold"``;
    all others are ``"silver"``.

    Args:
        context: Dagster execution context (unused here).

    Returns:
        ``{"customers": [...], "count": <number of customers>}``.
    """

    def _customer(cust_id: int) -> Dict:
        # A non-zero remainder means "not a multiple of 3" -> silver.
        return {
            "id": cust_id,
            "name": f"Customer {cust_id}",
            "tier": "silver" if cust_id % 3 else "gold",
        }

    customers = [_customer(cust_id) for cust_id in range(50)]
    return {"customers": customers, "count": len(customers)}

@asset(
    description="รวม Orders กับ Customers",
    group_name="transformed",
    # No ``deps=`` here: the ``raw_orders`` / ``raw_customers`` parameters
    # below already declare both upstream assets as inputs. Dagster rejects
    # asset keys that are listed in ``deps`` and also declared as inputs.
)
def enriched_orders(
    context: AssetExecutionContext,
    raw_orders: Dict,
    raw_customers: Dict,
) -> Dict:
    """Join each order with its customer's name and tier.

    Args:
        context: Dagster execution context, used for logging.
        raw_orders: Output of the ``raw_orders`` asset; its ``"orders"``
            items must carry an ``"id"``.
        raw_customers: Output of the ``raw_customers`` asset; its
            ``"customers"`` items carry ``"id"``, ``"name"`` and ``"tier"``.

    Returns:
        ``{"orders": [...], "count": int}``. Each order is copied and extended
        with ``customer_name`` and ``customer_tier``. These fall back to
        ``"Unknown"`` / ``"standard"`` when no customer matches.
    """
    customers_by_id = {c["id"]: c for c in raw_customers["customers"]}

    enriched = []
    for order in raw_orders["orders"]:
        # Join on a real customer_id when the order has one. The demo orders
        # have none, so fall back to the synthetic key used by this example
        # (order id modulo the 50 demo customers).
        if "customer_id" in order:
            customer_id = order["customer_id"]
        else:
            customer_id = order["id"] % 50
        customer = customers_by_id.get(customer_id, {})
        enriched.append({
            **order,
            "customer_name": customer.get("name", "Unknown"),
            "customer_tier": customer.get("tier", "standard"),
        })

    context.log.info(f"Enriched {len(enriched)} orders")
    return {"orders": enriched, "count": len(enriched)}

@asset(
    description="สรุปยอดขายรายวัน",
    group_name="analytics",
    # The upstream dependency is declared by the ``enriched_orders``
    # parameter. Also listing it in ``deps=`` is rejected by Dagster.
)
def daily_sales_summary(
    context: AssetExecutionContext,
    enriched_orders: Dict,
) -> MaterializeResult:
    """Aggregate enriched orders into sales-summary metadata.

    No asset value is stored. The summary is attached to the
    materialization as metadata.

    Args:
        context: Dagster execution context, used for logging.
        enriched_orders: Output of the ``enriched_orders`` asset; its
            ``"orders"`` items must have ``"amount"`` and ``"customer_tier"``.

    Returns:
        MaterializeResult with metadata for:
        - total revenue;
        - order count;
        - average order value (0.0 when there are no orders);
        - revenue per customer tier.
    """
    orders = enriched_orders["orders"]
    total_revenue = sum(o["amount"] for o in orders)
    avg_order = total_revenue / len(orders) if orders else 0

    by_tier = {}
    for o in orders:
        tier = o["customer_tier"]
        by_tier[tier] = by_tier.get(tier, 0) + o["amount"]

    context.log.info(f"Total Revenue: {total_revenue}")

    return MaterializeResult(
        metadata={
            # MetadataValue.float type-checks its argument as a float. The sum
            # of integer amounts and the empty-input fallback ``0`` are ints,
            # so cast explicitly.
            "total_revenue": MetadataValue.float(float(total_revenue)),
            "total_orders": MetadataValue.int(len(orders)),
            "avg_order_value": MetadataValue.float(float(avg_order)),
            "revenue_by_tier": MetadataValue.json(by_tier),
        }
    )

# 2. Schedule: materialize every asset ("*" selection) once a day.
daily_schedule = ScheduleDefinition(
    name="daily_pipeline",
    target="*",  # All assets
    # Every day at 06:00. Dagster interprets the cron in the schedule's
    # execution_timezone (UTC unless one is set) — set it if local time is meant.
    cron_schedule="0 6 * * *",
)

# 3. Definitions: the module-level object Dagster loads for this code
# location. Without it the schedule above is never registered.
defs = Definitions(
    assets=[raw_orders, raw_customers, enriched_orders, daily_sales_summary],
    schedules=[daily_schedule],
)

print("Dagster Assets:")
print("  raw_orders -> enriched_orders -> daily_sales_summary")
print("  raw_customers -> enriched_orders")
print("  Schedule: Daily 6:00 AM")

Team Productivity Patterns

# team_productivity.py — Dagster Team Productivity Patterns
from dataclasses import dataclass, field
from typing import List, Dict
from datetime import datetime

@dataclass()
class TeamMember:
    """A data-team member and the Dagster assets they are responsible for."""

    name: str  # display name, e.g. "Alice"
    role: str  # job title, e.g. "Data Engineer"
    code_location: str  # code location the member works in
    assets_owned: List[str]  # names of the assets this member owns

@dataclass()
class ProductivityMetric:
    """A before/after comparison for one team-productivity measurement.

    All values are free-form display strings (e.g. "2-3 weeks").
    """

    metric: str  # what is being measured
    before_dagster: str  # value before adopting Dagster
    after_dagster: str  # value after adopting Dagster
    improvement: str  # human-readable summary of the change

class TeamProductivityDashboard:
    """Collect team members and productivity metrics and print text reports.

    Every report method writes to stdout via ``print`` and returns ``None``.
    """

    # (practice, description) pairs printed by ``best_practices``.
    _BEST_PRACTICES = (
        ("Asset Naming Convention",
         "ใช้ prefix ตาม layer: raw_, clean_, agg_, ml_"),
        ("Code Locations",
         "แบ่ง Code Location ตามทีม/Domain"),
        ("Asset Factories",
         "สร้าง Factory Functions สำหรับ Assets ที่คล้ายกัน"),
        ("Testing",
         "เขียน Unit Test สำหรับทุก Asset ด้วย materialize_to_memory"),
        ("Documentation",
         "ใส่ description และ metadata ทุก Asset"),
        ("Alerting",
         "ตั้ง Slack/PagerDuty alerts เมื่อ Asset fail"),
        ("Partitioning",
         "ใช้ Daily/Monthly Partitions สำหรับ Time-series Data"),
        ("CI/CD",
         "ใช้ dagster-cloud CLI deploy อัตโนมัติ"),
    )

    def __init__(self):
        self.team: List[TeamMember] = []
        self.metrics: List[ProductivityMetric] = []

    def add_member(self, member: TeamMember):
        """Append *member* to the team roster."""
        self.team.append(member)

    def add_metric(self, metric: ProductivityMetric):
        """Append *metric* to the list of reported productivity metrics."""
        self.metrics.append(metric)

    def team_overview(self):
        """Print the team grouped by code location, with each member's assets.

        Locations appear in the order they are first seen in ``self.team``.
        """
        rule = "=" * 55
        print(f"\n{rule}")
        print("Data Team Overview")
        print(rule)

        # dicts preserve insertion order, so groups print in first-seen order.
        locations: Dict[str, List[TeamMember]] = {}
        for member in self.team:
            locations.setdefault(member.code_location, []).append(member)

        for loc, members in locations.items():
            print(f"\n  [{loc}]")
            for m in members:
                print(f"    {m.name} ({m.role})")
                print(f"      Assets: {', '.join(m.assets_owned)}")

    def productivity_report(self):
        """Print each metric's before/after values and improvement summary."""
        print("\n  Productivity Improvements:")
        for m in self.metrics:
            print(f"    {m.metric}:")
            print(f"      Before: {m.before_dagster}")
            print(f"      After:  {m.after_dagster}")
            print(f"      Improvement: {m.improvement}")

    def best_practices(self):
        """Print the team's Dagster best-practice checklist."""
        print("\n  Best Practices:")
        for name, desc in self._BEST_PRACTICES:
            print(f"    {name}: {desc}")

# Example: build a dashboard for a four-person team and print every report.
dashboard = TeamProductivityDashboard()

team = [
    TeamMember(name="Alice", role="Data Engineer", code_location="ingestion",
               assets_owned=["raw_orders", "raw_customers", "raw_products"]),
    TeamMember(name="Bob", role="Data Engineer", code_location="transformation",
               assets_owned=["enriched_orders", "clean_customers"]),
    TeamMember(name="Charlie", role="Analytics Engineer", code_location="analytics",
               assets_owned=["daily_sales", "monthly_report", "customer_segments"]),
    TeamMember(name="Diana", role="ML Engineer", code_location="ml_pipeline",
               assets_owned=["feature_store", "model_training", "predictions"]),
]

metrics = [
    ProductivityMetric(metric="Pipeline Development Time",
                       before_dagster="2-3 weeks", after_dagster="3-5 days",
                       improvement="70% faster"),
    ProductivityMetric(metric="Debugging Time",
                       before_dagster="4-8 hours", after_dagster="30 min-1 hour",
                       improvement="80% reduction"),
    ProductivityMetric(metric="Data Quality Issues",
                       before_dagster="5-10 per week", after_dagster="1-2 per week",
                       improvement="75% reduction"),
    ProductivityMetric(metric="Onboarding New Member",
                       before_dagster="2-3 weeks", after_dagster="3-5 days",
                       improvement="70% faster"),
    ProductivityMetric(metric="Deployment Frequency",
                       before_dagster="Weekly", after_dagster="Daily (CI/CD)",
                       improvement="5x more frequent"),
]

for member in team:
    dashboard.add_member(member)
for metric in metrics:
    dashboard.add_metric(metric)

# Reports print in this fixed order: team, metrics, best practices.
for report in (dashboard.team_overview,
               dashboard.productivity_report,
               dashboard.best_practices):
    report()

Testing และ CI/CD

# === Dagster Testing และ CI/CD ===

# 1. Unit Testing Assets
# tests/test_assets.py
# from dagster import materialize_to_memory
# from my_project.assets import raw_orders, raw_customers, enriched_orders
#
# def test_raw_orders():
#     result = materialize_to_memory([raw_orders])
#     assert result.success
#     output = result.output_for_node("raw_orders")
#     assert output["count"] > 0
#     assert "orders" in output
#
# def test_enriched_orders():
#     result = materialize_to_memory(
#         [raw_orders, raw_customers, enriched_orders]
#     )
#     assert result.success
#     output = result.output_for_node("enriched_orders")
#     assert output["count"] > 0
#     for order in output["orders"]:
#         assert "customer_name" in order
#         assert "customer_tier" in order

# 2. GitHub Actions CI/CD
# .github/workflows/dagster.yml
# name: Dagster CI/CD
# on:
#   push:
#     branches: [main]
#   pull_request:
#     branches: [main]
#
# jobs:
#   test:
#     runs-on: ubuntu-latest
#     steps:
#       - uses: actions/checkout@v4
#       - uses: actions/setup-python@v5
#         with:
#           python-version: '3.11'
#       - run: pip install -e ".[dev]"
#       - run: pytest tests/ -v
#       - run: dagster asset list  # Verify assets
#
#   deploy:
#     needs: test
#     if: github.ref == 'refs/heads/main'
#     runs-on: ubuntu-latest
#     steps:
#       - uses: actions/checkout@v4
#       - run: pip install dagster-cloud
#       - run: dagster-cloud workspace sync
#         env:
#           DAGSTER_CLOUD_API_TOKEN: ${{ secrets.DAGSTER_CLOUD_API_TOKEN }}

# 3. Asset Factory Pattern
# def create_table_asset(table_name, schema, source_query):
#     @asset(
#         name=f"raw_{table_name}",
#         group_name="raw",
#         metadata={"table": table_name, "schema": schema},
#     )
#     def _asset(context):
#         # db.execute(source_query)
#         context.log.info(f"Loaded {table_name}")
#         return {"table": table_name, "rows": 1000}
#     return _asset
#
# # สร้าง Assets จาก Factory
# tables = ["orders", "customers", "products", "payments"]
# raw_assets = [create_table_asset(t, "public", f"SELECT * FROM {t}") for t in tables]

# Testing / CI-CD cheat sheet: technique -> how it is applied with Dagster.
testing_ci = {
    "Unit Tests": "materialize_to_memory() ทดสอบ Asset ใน Memory",
    "Integration Tests": "ทดสอบกับ Database จริงใน CI",
    "Asset Validation": "dagster asset list ตรวจสอบ Assets",
    "CI Pipeline": "GitHub Actions: test -> deploy",
    "CD": "dagster-cloud workspace sync",
}

print("Testing & CI/CD:")
# One line per entry, in insertion order (same output as one print per item).
print("\n".join(f"  {topic}: {detail}" for topic, detail in testing_ci.items()))

Best Practices

Dagster คืออะไร

Dagster คือ Data Orchestrator แบบ Open-source ที่เน้น Software-defined Assets ใช้ Python เขียน Data Pipelines มี Type System ช่วยตรวจ Data Quality มี UI แสดง Asset Lineage และรองรับ Partitioning, Scheduling และ Sensors จึงเหมาะกับ Modern Data Stack

Dagster ต่างจาก Airflow อย่างไร

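Dagster เป็นแบบ Asset-based คือยึดข้อมูล (Assets) เป็นศูนย์กลาง ส่วน Airflow เป็นแบบ Task-based คือยึดงาน (Tasks) เป็นศูนย์กลาง นอกจากนี้ Dagster ยังมี Type System สำหรับตรวจ Data Quality ในตัว รองรับการเขียน Unit Test และสร้าง Data Lineage จาก Software-defined Assets ให้อัตโนมัติ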

Software-defined Assets คืออะไร

คือการกำหนด Data Assets เช่น Tables, Files หรือ ML Models ด้วย Code โดยระบุ Dependencies อย่างชัดเจน จากนั้น Dagster จะสร้าง DAG และ Data Lineage ทั้งหมดให้อัตโนมัติ และสามารถสั่ง Materialize (สร้าง Asset ใหม่) ได้เมื่อต้องการ

วิธีเพิ่ม Team Productivity ด้วย Dagster ทำอย่างไร

ใช้ Software-defined Assets เพื่อให้ทีมเห็น Data Lineage ร่วมกัน แบ่ง Code Locations ตามทีม ใช้ Dagster Cloud เป็น Managed Infrastructure สร้าง Asset Factories ที่นำกลับมาใช้ซ้ำได้ ใช้ Sensors สำหรับงานแบบ Event-driven และตั้ง Alerting เมื่อ Pipeline ล้มเหลว

สรุป

Dagster เป็น Data Orchestrator ที่เน้น Software-defined Assets ช่วยให้ทีมเห็น Data Lineage ชัดเจนและแบ่ง Code Locations ตามทีมได้ ใช้ Asset Factories สร้าง Assets ที่คล้ายกันซ้ำได้ มี Type System ตรวจ Data Quality เขียน Unit Test ได้ด้วย materialize_to_memory และทำ CI/CD ได้ด้วย dagster-cloud

📖 บทความที่เกี่ยวข้อง

TypeScript tRPC Team Productivity อ่านบทความ → | Linkerd Service Mesh Team Productivity อ่านบทความ → | Flux CD GitOps Team Productivity อ่านบทความ → | Neon Serverless Postgres Team Productivity อ่านบทความ → | Kafka Connect Team Productivity อ่านบทความ →

📚 ดูบทความทั้งหมด →