Technology

Dagster Pipeline Low Code No Code

dagster pipeline low code no code
Dagster Pipeline Low Code No Code | SiamCafe Blog
2026-01-17· อ. บอม — SiamCafe.net· 10,196 คำ

Dagster Pipeline Low Code

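Dagster เป็น Data Orchestrator สำหรับสร้าง Data Pipeline ด้วย Python โดยมีแนวคิดหลักคือ Software-defined Assets ร่วมกับ Jobs, Schedules และ Sensors พร้อม Web UI (เดิมชื่อ Dagit ปัจจุบันเรียกว่า Dagster UI) สำหรับ Monitor, Debug และ Testing Pipeline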

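Low Code / No Code คือแนวทางที่ใช้โค้ดน้อยหรือไม่ต้องเขียนโค้ดเลย โดยทำงานผ่าน Visual Interface แบบลากและวาง (Drag and Drop) ช่วยให้ผู้ที่ไม่ใช่นักพัฒนา (Non-developer) สร้างแอปได้รวดเร็ว ใน Dagster แนวทางนี้ทำได้ผ่าน Config System, Launchpad และ Reusable Assets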

Dagster Assets และ Config

# dagster_lowcode.py — Dagster Low Code Pipeline
# pip install dagster dagster-webserver dagster-pandas

import dagster
from dagster import (
    asset, AssetExecutionContext, Config,
    Definitions, MaterializeResult, MetadataValue,
    schedule, sensor, RunRequest, RunConfig,
)
from dataclasses import dataclass
import pandas as pd

# 1. Config Classes — define parameters that can be filled in through the UI
class ExtractConfig(Config):
    """Config for the Extract step.

    Every field has a default, so a run can be launched without supplying
    any values; individual fields can be overridden in the run config.
    """
    source_url: str = "https://api.example.com/data"
    # Dates are plain strings (defaults use YYYY-MM-DD); this class does not
    # validate their format or that date_from <= date_to.
    date_from: str = "2024-01-01"
    date_to: str = "2024-12-31"
    limit: int = 1000  # NOTE(review): presumably a row/record cap — confirm with the consumer
    format: str = "json"  # free text, not validated here (the sensor example passes "csv")

class TransformConfig(Config):
    """Config for the Transform step (cleaning flags, column list, optional filter)."""
    drop_nulls: bool = True
    normalize: bool = True
    # Column names as ONE comma-separated string (not a list); consumers must
    # split it and strip whitespace around each name.
    columns: str = "id, name, value, date"  # comma-separated
    # Optional row filter; the empty-string default leaves it unset.
    filter_column: str = ""
    filter_value: str = ""

class LoadConfig(Config):
    """Config for the Load step: target destination, table name, and write mode."""
    destination: str = "warehouse"  # expected: warehouse, s3, local (free text, not validated here)
    table_name: str = "processed_data"
    mode: str = "append"  # expected: append, replace, upsert (free text, not validated here)

# 2. Assets — Reusable Data Components
@asset(
    description="Extract data from source API",
    group_name="etl_pipeline",
    compute_kind="python",
)
def raw_data(context: AssetExecutionContext, config: ExtractConfig):
    """Extract step: generate a simulated dataset described by ``ExtractConfig``.

    Non-developers set the config through the Dagster UI Launchpad. No real
    API call is made: ``limit`` hourly rows starting at ``date_from`` are
    generated, then any row falling after the end of ``date_to`` (inclusive,
    whole calendar day) is dropped so the data honours the configured window.

    Args:
        context: Dagster execution context, used for logging.
        config: Extract parameters. ``format`` is not used by this simulation.

    Returns:
        MaterializeResult with row count, column names and source URL as
        asset metadata. The generated DataFrame itself is not persisted.

    Raises:
        ValueError: if ``limit`` is negative, or a date string cannot be parsed.
    """
    context.log.info(f"Extracting from {config.source_url}")
    context.log.info(f"Date range: {config.date_from} to {config.date_to}")

    if config.limit < 0:
        raise ValueError(f"limit must be >= 0, got {config.limit}")

    start = pd.Timestamp(config.date_from)
    # date_to is a calendar date: include that whole day by cutting at the
    # start of the following day.
    end_exclusive = pd.Timestamp(config.date_to) + pd.Timedelta(days=1)

    # Simulate API call
    ids = range(1, config.limit + 1)
    data = pd.DataFrame({
        "id": ids,
        "name": [f"item_{i}" for i in ids],
        "value": [i * 10.5 for i in ids],
        "date": pd.date_range(start, periods=config.limit, freq="h"),
    })
    # Previously date_to was only logged, so rows could run past the window.
    data = data[data["date"] < end_exclusive]

    return MaterializeResult(
        metadata={
            "num_rows": MetadataValue.int(len(data)),
            "columns": MetadataValue.text(", ".join(data.columns)),
            "source": MetadataValue.text(config.source_url),
        },
    )

@asset(
    deps=["raw_data"],
    description="Transform and clean data",
    group_name="etl_pipeline",
    compute_kind="pandas",
)
def clean_data(context: AssetExecutionContext, config: TransformConfig):
    """Transform step: interpret ``TransformConfig`` and record it as metadata.

    The dependency on ``raw_data`` is declared by key only (``deps``), so no
    data is passed in; the transform itself is simulated.

    Args:
        context: Dagster execution context, used for logging.
        config: Transform options entered through the UI.

    Returns:
        MaterializeResult whose metadata holds the normalized column list
        (blank entries removed, joined with ", ") and the boolean flags.
    """
    context.log.info(f"Transform: drop_nulls={config.drop_nulls}")
    context.log.info(f"Columns: {config.columns}")

    # Simulate transform. Drop blank entries so inputs such as "id, name," or
    # "id,,name" do not count phantom empty columns.
    columns = [c.strip() for c in config.columns.split(",")]
    columns = [c for c in columns if c]
    context.log.info(f"Processing {len(columns)} columns")

    if config.filter_column and config.filter_value:
        context.log.info(f"Filter: {config.filter_column} = {config.filter_value}")
    elif config.filter_column or config.filter_value:
        # A half-filled filter form is easy to submit from a UI; say so
        # instead of silently ignoring the filter.
        context.log.warning(
            "Filter ignored: both filter_column and filter_value must be set "
            f"(got filter_column={config.filter_column!r}, "
            f"filter_value={config.filter_value!r})"
        )

    return MaterializeResult(
        metadata={
            "columns_used": MetadataValue.text(", ".join(columns)),
            "drop_nulls": MetadataValue.bool(config.drop_nulls),
            "normalized": MetadataValue.bool(config.normalize),
        },
    )

@asset(
    deps=["clean_data"],
    description="Load data to destination",
    group_name="etl_pipeline",
    compute_kind="database",
)
def warehouse_data(context: AssetExecutionContext, config: LoadConfig):
    """Load step: validate the chosen destination and write mode, then record them.

    ``destination`` and ``mode`` are free-text config fields that a
    non-developer types into the UI, so they are checked against the
    supported values here; a typo fails the run instead of being accepted.

    Args:
        context: Dagster execution context, used for logging.
        config: Load options (destination, table name, write mode).

    Returns:
        MaterializeResult with destination, table and mode as metadata.

    Raises:
        ValueError: if ``destination`` or ``mode`` is not a supported value.
    """
    valid_destinations = ("warehouse", "s3", "local")
    valid_modes = ("append", "replace", "upsert")

    context.log.info(f"Loading to {config.destination}/{config.table_name}")
    context.log.info(f"Mode: {config.mode}")

    if config.destination not in valid_destinations:
        raise ValueError(
            f"destination must be one of {valid_destinations}, got {config.destination!r}"
        )
    if config.mode not in valid_modes:
        raise ValueError(f"mode must be one of {valid_modes}, got {config.mode!r}")

    return MaterializeResult(
        metadata={
            "destination": MetadataValue.text(config.destination),
            "table": MetadataValue.text(config.table_name),
            "mode": MetadataValue.text(config.mode),
        },
    )

# 3. Definitions — the module-level object Dagster loads to find these assets
defs = Definitions(assets=[raw_data, clean_data, warehouse_data])

# Human-readable summary of the pipeline, emitted as four lines.
print(
    "\n".join(
        [
            "Dagster Low Code Pipeline:",
            "  Assets: raw_data -> clean_data -> warehouse_data",
            "  Config: ExtractConfig, TransformConfig, LoadConfig",
            "  UI: Dagit Launchpad สำหรับกรอก Config",
        ]
    )
)
print("  UI: Dagit Launchpad สำหรับกรอก Config")

Sensors และ Automation

# dagster_automation.py — Sensors และ Automation
from dagster import (
    sensor, SensorEvaluationContext, RunRequest,
    schedule, ScheduleEvaluationContext,
    asset_sensor, AssetKey, EventLogEntry,
)

# 1. File Sensor — ตรวจจับไฟล์ใหม่
# @sensor(
#     asset_selection=[AssetKey("raw_data")],
#     minimum_interval_seconds=60,
# )
# def new_file_sensor(context: SensorEvaluationContext):
#     """ตรวจจับไฟล์ใหม่ใน S3/Local — ทำงานอัตโนมัติ"""
#     import os
#     watch_dir = "/data/incoming"
#     last_cursor = context.cursor or ""
#
#     new_files = []
#     for f in sorted(os.listdir(watch_dir)):
#         if f > last_cursor and f.endswith(".csv"):
#             new_files.append(f)
#
#     if new_files:
#         context.update_cursor(new_files[-1])
#         for f in new_files:
#             yield RunRequest(
#                 run_key=f,
#                 run_config={
#                     "ops": {
#                         "raw_data": {
#                             "config": {
#                                 "source_url": f"/data/incoming/{f}",
#                                 "format": "csv",
#                             }
#                         }
#                     }
#                 },
#             )

# 2. Webhook Sensor — รับ Webhook จาก n8n/Zapier
# @sensor(asset_selection=[AssetKey("raw_data")])
# def webhook_sensor(context: SensorEvaluationContext):
#     """รับ Trigger จาก Low Code Tools (n8n, Zapier, Make)"""
#     import requests
#     response = requests.get("http://localhost:8000/webhooks/pending")
#     webhooks = response.json()
#
#     for webhook in webhooks:
#         yield RunRequest(
#             run_key=webhook["id"],
#             run_config=RunConfig(
#                 ops={"raw_data": ExtractConfig(
#                     source_url=webhook["source_url"],
#                     date_from=webhook["date_from"],
#                     date_to=webhook["date_to"],
#                 )}
#             ),
#         )

# 3. Schedule — รันตามเวลา
# @schedule(
#     cron_schedule="0 6 * * *",  # ทุกวัน 6:00
#     target=[AssetKey("raw_data"), AssetKey("clean_data"), AssetKey("warehouse_data")],
# )
# def daily_etl_schedule(context: ScheduleEvaluationContext):
#     """ETL รายวัน — ตั้งเวลาผ่าน UI"""
#     from datetime import datetime, timedelta
#     yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
#     today = datetime.now().strftime("%Y-%m-%d")
#
#     return RunRequest(
#         run_config=RunConfig(
#             ops={"raw_data": ExtractConfig(
#                 date_from=yesterday,
#                 date_to=today,
#             )}
#         ),
#     )

# Low Code Integration Points: for each external tool, how it connects to
# Dagster and a typical use case.
integrations = {
    "n8n": dict(
        type="Webhook Trigger",
        how="n8n Workflow -> HTTP Request -> Dagster GraphQL API",
        use_case="Trigger ETL เมื่อมี Event ใน n8n",
    ),
    "Zapier": dict(
        type="Webhook Trigger",
        how="Zapier Zap -> Webhook -> Dagster Sensor",
        use_case="Trigger Pipeline จาก Google Sheets, Email",
    ),
    "Retool": dict(
        type="Admin Dashboard",
        how="Retool -> Dagster GraphQL API -> View/Trigger Runs",
        use_case="Dashboard สำหรับ Non-developer ดู Pipeline Status",
    ),
    "Appsmith": dict(
        type="Internal Tool",
        how="Appsmith -> REST API -> Dagster Runs",
        use_case="Internal Tool สำหรับจัดการ Pipeline Config",
    ),
}

print("Low Code Integrations:")
for tool, info in integrations.items():
    # One print per tool: blank line, header, then the two detail lines.
    print(
        f"\n  [{tool}] ({info['type']})\n"
        f"    How: {info['how']}\n"
        f"    Use Case: {info['use_case']}"
    )

Dagster vs Airflow

# comparison.py — Dagster vs Airflow
comparison = {
    "Core Concept": {
        "Dagster": "Software-defined Assets (Data-centric)",
        "Airflow": "DAGs and Tasks (Task-centric)",
    },
    "Language": {
        "Dagster": "Python + Type hints",
        "Airflow": "Python",
    },
    "Testing": {
        "Dagster": "Built-in testing utilities ทดสอบง่าย",
        "Airflow": "ต้อง Mock มาก ทดสอบยาก",
    },
    "UI": {
        "Dagster": "Dagit — Asset Lineage, Launchpad Config",
        "Airflow": "Airflow UI — DAG Graph, Task Logs",
    },
    "Configuration": {
        "Dagster": "Config System + Dagit UI Input",
        "Airflow": "Variables + Connections + XCom",
    },
    "IO Management": {
        "Dagster": "IO Managers จัดการ Storage อัตโนมัติ",
        "Airflow": "ต้องจัดการ Storage เอง",
    },
    "Community": {
        "Dagster": "เติบโตเร็ว Modern",
        "Airflow": "ใหญ่มาก Plugin เยอะ",
    },
    "Low Code Support": {
        "Dagster": "Config UI, Launchpad, GraphQL API",
        "Airflow": "Limited — ต้องเขียน DAGs",
    },
}

print("Dagster vs Airflow:")
print(f"{'Feature':<20} {'Dagster':<35} {'Airflow':<35}")
print("-" * 90)
for feature, values in comparison.items():
    print(f"{feature:<20} {values['Dagster'][:34]:<35} {values['Airflow'][:34]:<35}")

Best Practices

Dagster คืออะไร

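Dagster เป็น Data Orchestrator สำหรับสร้าง Data Pipeline ด้วย Python โดยใช้ Software-defined Assets, Jobs, Schedules และ Sensors มี Web UI (Dagit หรือชื่อปัจจุบันคือ Dagster UI) สำหรับ Monitor, Debug และ Testing เหมาะกับงาน Data Engineering และ MLOps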

Low Code No Code คืออะไร

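Low Code คือการเขียนโค้ดให้น้อยที่สุดโดยอาศัย Visual Interface แบบลากและวาง (Drag and Drop) ส่วน No Code คือการไม่เขียนโค้ดเลยและทำทุกอย่างผ่าน GUI ทั้งสองแนวทางช่วยให้ Non-developer สร้างแอปได้รวดเร็ว ตัวอย่างเครื่องมือ เช่น Retool, Appsmith, n8n, Zapier และ Make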

Dagster ใช้กับ Low Code ได้อย่างไร

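ใช้ Config System กำหนด Parameters ให้ผู้ใช้กรอกผ่าน Launchpad ใน UI (Dagit) ได้โดยไม่ต้องแก้โค้ด สร้าง Reusable Assets ที่นำกลับมาใช้ซ้ำได้ ใช้ Sensors ตรวจจับ Events เพื่อสั่งรัน Pipeline อัตโนมัติ และเชื่อมต่อกับเครื่องมืออย่าง n8n หรือ Zapier ผ่าน Webhooks และ GraphQL API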

Dagster เทียบกับ Airflow อย่างไร

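Dagster เน้น Software-defined Assets เป็น Type-safe ทดสอบ (Testing) ได้ง่าย และมี IO Managers จัดการ Storage ให้ ส่วน Airflow ใช้แนวคิด DAGs และ Tasks มี Community ใหญ่และมี Plugin จำนวนมาก โดยรวมแล้ว Dagster ทันสมัยกว่าและให้ Developer Experience ที่ดีกว่า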

สรุป

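Dagster เป็น Data Orchestrator ที่ใช้ Software-defined Assets ร่วมกับ Config System และ Dagit UI จึงรองรับแนวทาง Low Code ได้ดี กล่าวคือ Non-developer กรอก Config ผ่าน UI ได้เอง Sensors ตรวจจับ Events แล้วรัน Pipeline อัตโนมัติ และเชื่อมต่อ (Integration) กับ n8n, Zapier และ Retool ผ่าน GraphQL API ได้ อีกทั้งยังทดสอบ (Testing) ได้ง่ายและให้ Developer Experience ที่ทันสมัย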

📖 บทความที่เกี่ยวข้อง

Cloudflare D1 Low Code No Code — อ่านบทความ →
MongoDB Change Streams Low Code No Code — อ่านบทความ →
QuestDB Time Series Low Code No Code — อ่านบทความ →
Snyk Code Security Low Code No Code — อ่านบทความ →
CSS Container Queries Code Review Best Practice — อ่านบทความ →

📚 ดูบทความทั้งหมด →