Dagster Pipeline Low Code
Dagster Data Orchestrator สร้าง Data Pipelines Python Software-defined Assets Jobs Schedules Sensors Dagit Web UI Monitor Debug Testing
Low Code No Code ใช้โค้ดน้อย Visual Interface แบบ Drag & Drop (ลากวาง) Non-developer สร้างแอปได้เร็ว Dagster Config System Launchpad Reusable Assets
Dagster Assets และ Config
# dagster_lowcode.py — Dagster Low Code Pipeline
# pip install dagster dagster-webserver dagster-pandas
import dagster
from dagster import (
asset, AssetExecutionContext, Config,
Definitions, MaterializeResult, MetadataValue,
schedule, sensor, RunRequest, RunConfig,
)
from dataclasses import dataclass
import pandas as pd
# 1. Config Classes — กำหนด Parameters ผ่าน UI
class ExtractConfig(Config):
    """Parameters for the extract step, filled in by non-developers via the Dagit Launchpad UI."""
    source_url: str = "https://api.example.com/data"  # endpoint queried (or simulated) by raw_data
    date_from: str = "2024-01-01"  # start of the date range, ISO format
    date_to: str = "2024-12-31"    # end of the date range, ISO format
    limit: int = 1000              # number of rows to extract/generate
    format: str = "json"           # payload format; NOTE(review): field name shadows builtin `format` — acceptable for a config field
class TransformConfig(Config):
    """Parameters for the transform/clean step, supplied through the Dagit UI."""
    drop_nulls: bool = True        # drop rows containing nulls
    normalize: bool = True         # normalize values during transform
    columns: str = "id, name, value, date"  # comma-separated list of columns to keep
    filter_column: str = ""        # optional: column to filter on (empty = no filter)
    filter_value: str = ""         # optional: value the filter column must equal
class LoadConfig(Config):
    """Parameters for the load step — pick the destination through the UI."""
    destination: str = "warehouse"   # one of: warehouse, s3, local
    table_name: str = "processed_data"  # target table name at the destination
    mode: str = "append"             # write mode: append, replace, upsert
# 2. Assets — Reusable Data Components
@asset(
    description="Extract data from source API",
    group_name="etl_pipeline",
    compute_kind="python",
)
def raw_data(context: AssetExecutionContext, config: ExtractConfig):
    """Extract step — non-developers set ExtractConfig through the Dagit UI."""
    context.log.info(f"Extracting from {config.source_url}")
    context.log.info(f"Date range: {config.date_from} to {config.date_to}")
    # Simulate an API call by generating `limit` synthetic rows.
    row_ids = list(range(1, config.limit + 1))
    frame = pd.DataFrame({
        "id": row_ids,
        "name": [f"item_{i}" for i in row_ids],
        "value": [i * 10.5 for i in row_ids],
        "date": pd.date_range(config.date_from, periods=config.limit, freq="h"),
    })
    # Only metadata is reported back to Dagit; the simulated frame is not persisted.
    meta = {
        "num_rows": MetadataValue.int(len(frame)),
        "columns": MetadataValue.text(", ".join(frame.columns)),
        "source": MetadataValue.text(config.source_url),
    }
    return MaterializeResult(metadata=meta)
@asset(
    deps=["raw_data"],
    description="Transform and clean data",
    group_name="etl_pipeline",
    compute_kind="pandas",
)
def clean_data(context: AssetExecutionContext, config: TransformConfig):
    """Transform step — TransformConfig is supplied through the UI."""
    context.log.info(f"Transform: drop_nulls={config.drop_nulls}")
    context.log.info(f"Columns: {config.columns}")
    # Simulated transform: parse the comma-separated column list.
    selected = [name.strip() for name in config.columns.split(",")]
    context.log.info(f"Processing {len(selected)} columns")
    # The optional filter only applies when both the column and the value are set.
    if config.filter_column and config.filter_value:
        context.log.info(f"Filter: {config.filter_column} = {config.filter_value}")
    meta = {
        "columns_used": MetadataValue.text(config.columns),
        "drop_nulls": MetadataValue.bool(config.drop_nulls),
        "normalized": MetadataValue.bool(config.normalize),
    }
    return MaterializeResult(metadata=meta)
@asset(
    deps=["clean_data"],
    description="Load data to destination",
    group_name="etl_pipeline",
    compute_kind="database",
)
def warehouse_data(context: AssetExecutionContext, config: LoadConfig):
    """Load step — the destination is chosen through the UI."""
    target = config.destination
    table = config.table_name
    context.log.info(f"Loading to {target}/{table}")
    context.log.info(f"Mode: {config.mode}")
    meta = {
        "destination": MetadataValue.text(target),
        "table": MetadataValue.text(table),
        "mode": MetadataValue.text(config.mode),
    }
    return MaterializeResult(metadata=meta)
# 3. Definitions — register the three ETL assets so Dagster/Dagit can discover them.
defs = Definitions(assets=[raw_data, clean_data, warehouse_data])

for _line in (
    "Dagster Low Code Pipeline:",
    " Assets: raw_data -> clean_data -> warehouse_data",
    " Config: ExtractConfig, TransformConfig, LoadConfig",
    " UI: Dagit Launchpad สำหรับกรอก Config",
):
    print(_line)
Sensors และ Automation
# dagster_automation.py — Sensors และ Automation
from dagster import (
sensor, SensorEvaluationContext, RunRequest,
schedule, ScheduleEvaluationContext,
asset_sensor, AssetKey, EventLogEntry,
)
# 1. File Sensor — ตรวจจับไฟล์ใหม่
# @sensor(
# asset_selection=[AssetKey("raw_data")],
# minimum_interval_seconds=60,
# )
# def new_file_sensor(context: SensorEvaluationContext):
# """ตรวจจับไฟล์ใหม่ใน S3/Local — ทำงานอัตโนมัติ"""
# import os
# watch_dir = "/data/incoming"
# last_cursor = context.cursor or ""
#
# new_files = []
# for f in sorted(os.listdir(watch_dir)):
# if f > last_cursor and f.endswith(".csv"):
# new_files.append(f)
#
# if new_files:
# context.update_cursor(new_files[-1])
# for f in new_files:
# yield RunRequest(
# run_key=f,
# run_config={
# "ops": {
# "raw_data": {
# "config": {
# "source_url": f"/data/incoming/{f}",
# "format": "csv",
# }
# }
# }
# },
# )
# 2. Webhook Sensor — รับ Webhook จาก n8n/Zapier
# @sensor(asset_selection=[AssetKey("raw_data")])
# def webhook_sensor(context: SensorEvaluationContext):
# """รับ Trigger จาก Low Code Tools (n8n, Zapier, Make)"""
# import requests
# response = requests.get("http://localhost:8000/webhooks/pending")
# webhooks = response.json()
#
# for webhook in webhooks:
# yield RunRequest(
# run_key=webhook["id"],
# run_config=RunConfig(
# ops={"raw_data": ExtractConfig(
# source_url=webhook["source_url"],
# date_from=webhook["date_from"],
# date_to=webhook["date_to"],
# )}
# ),
# )
# 3. Schedule — รันตามเวลา
# @schedule(
# cron_schedule="0 6 * * *", # ทุกวัน 6:00
# target=[AssetKey("raw_data"), AssetKey("clean_data"), AssetKey("warehouse_data")],
# )
# def daily_etl_schedule(context: ScheduleEvaluationContext):
# """ETL รายวัน — ตั้งเวลาผ่าน UI"""
# from datetime import datetime, timedelta
# yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
# today = datetime.now().strftime("%Y-%m-%d")
#
# return RunRequest(
# run_config=RunConfig(
# ops={"raw_data": ExtractConfig(
# date_from=yesterday,
# date_to=today,
# )}
# ),
# )
# Low Code Integration Points
# Catalog of low-code tools that can drive Dagster, keyed by tool name.
# Each entry records the integration type, the wiring, and a typical use case.
integrations = {
    "n8n": {
        "type": "Webhook Trigger",
        "how": "n8n Workflow -> HTTP Request -> Dagster GraphQL API",
        "use_case": "Trigger ETL เมื่อมี Event ใน n8n",
    },
    "Zapier": {
        "type": "Webhook Trigger",
        "how": "Zapier Zap -> Webhook -> Dagster Sensor",
        "use_case": "Trigger Pipeline จาก Google Sheets, Email",
    },
    "Retool": {
        "type": "Admin Dashboard",
        "how": "Retool -> Dagster GraphQL API -> View/Trigger Runs",
        "use_case": "Dashboard สำหรับ Non-developer ดู Pipeline Status",
    },
    "Appsmith": {
        "type": "Internal Tool",
        "how": "Appsmith -> REST API -> Dagster Runs",
        "use_case": "Internal Tool สำหรับจัดการ Pipeline Config",
    },
}

print("Low Code Integrations:")
for tool, info in integrations.items():
    kind, how, use_case = info["type"], info["how"], info["use_case"]
    print()  # blank separator line before each tool entry
    print(f" [{tool}] ({kind})")
    print(f" How: {how}")
    print(f" Use Case: {use_case}")
Dagster vs Airflow
# comparison.py — Dagster vs Airflow
# Feature-by-feature comparison table: feature name -> per-tool summary strings.
comparison = {
    "Core Concept": {
        "Dagster": "Software-defined Assets (Data-centric)",
        "Airflow": "DAGs and Tasks (Task-centric)",
    },
    "Language": {
        "Dagster": "Python + Type hints",
        "Airflow": "Python",
    },
    "Testing": {
        "Dagster": "Built-in testing utilities ทดสอบง่าย",
        "Airflow": "ต้อง Mock มาก ทดสอบยาก",
    },
    "UI": {
        "Dagster": "Dagit — Asset Lineage, Launchpad Config",
        "Airflow": "Airflow UI — DAG Graph, Task Logs",
    },
    "Configuration": {
        "Dagster": "Config System + Dagit UI Input",
        "Airflow": "Variables + Connections + XCom",
    },
    "IO Management": {
        "Dagster": "IO Managers จัดการ Storage อัตโนมัติ",
        "Airflow": "ต้องจัดการ Storage เอง",
    },
    "Community": {
        "Dagster": "เติบโตเร็ว Modern",
        "Airflow": "ใหญ่มาก Plugin เยอะ",
    },
    "Low Code Support": {
        "Dagster": "Config UI, Launchpad, GraphQL API",
        "Airflow": "Limited — ต้องเขียน DAGs",
    },
}

print("Dagster vs Airflow:")
print(f"{'Feature':<20} {'Dagster':<35} {'Airflow':<35}")
print("-" * 90)
for feature, values in comparison.items():
    # Truncate each cell to 34 chars so the fixed-width columns stay aligned.
    dagster_cell = values["Dagster"][:34]
    airflow_cell = values["Airflow"][:34]
    print(f"{feature:<20} {dagster_cell:<35} {airflow_cell:<35}")
Best Practices
- Config System: ใช้ Config Classes กำหนด Parameters ให้ Non-developer กรอกผ่าน UI
- Reusable Assets: สร้าง Assets ที่ใช้ซ้ำได้ เปลี่ยน Config ตาม Use Case
- Sensors: ใช้ Sensors ตรวจจับ Events ทำงานอัตโนมัติ
- GraphQL API: ใช้ Dagster GraphQL API เชื่อมกับ Low Code Tools
- Testing: เขียน Unit Tests สำหรับ Assets ทุกตัว
- Monitoring: ใช้ Dagit UI Monitor Pipeline Status
Dagster คืออะไร
Data Orchestrator สร้าง Data Pipelines Python Software-defined Assets Jobs Schedules Sensors Dagit Web UI Monitor Debug Testing Data Engineering MLOps
Low Code No Code คืออะไร
Low Code โค้ดน้อย Visual Interface แบบ Drag & Drop (ลากวาง) No Code ไม่เขียนโค้ด GUI ทั้งหมด Non-developer สร้างแอปเร็ว Retool Appsmith n8n Zapier Make
Dagster ใช้กับ Low Code ได้อย่างไร
Config System กำหนด Parameters UI Dagit Launchpad กรอก Config Reusable Assets Sensors ตรวจจับ Events n8n Zapier Webhooks GraphQL API
Dagster เทียบกับ Airflow อย่างไร
Dagster Software-defined Assets Type-safe Testing ง่าย IO Managers Airflow DAGs Tasks Community ใหญ่ Plugin มาก Dagster ทันสมัย Developer Experience ดี
สรุป
Dagster Data Orchestrator Software-defined Assets Config System Dagit UI Low Code Non-developer กรอก Config Sensors ตรวจจับ Events อัตโนมัติ Integration n8n Zapier Retool GraphQL API Testing ง่าย Modern Developer Experience
