it
Dagster Pipeline กับ Low Code / No Code — สร้าง Data Pipeline แบบ Low Code ด้วย Dagster

Dagster คือ Data Orchestrator สำหรับสร้าง Data Pipelines ด้วย Python โดยมีแนวคิดหลักคือ Software-defined Assets พร้อม Jobs, Schedules และ Sensors รวมถึง Web UI (Dagster UI ซึ่งเดิมชื่อ Dagit) สำหรับ Monitor, Debug และ Testing
เนื้อหาเกี่ยวข้อง — แนะนำให้อ่าน CDK Construct Cache Strategy Redis
Low Code / No Code คือแนวทางพัฒนาที่ใช้โค้ดน้อยหรือไม่ต้องเขียนโค้ดเลย ผ่าน Visual Interface แบบลากวาง (Drag and Drop) ทำให้ Non-developer สร้างแอปได้เร็ว ใน Dagster แนวทางนี้ทำได้ผ่าน Config System, Launchpad และ Reusable Assets
เนื้อหาเกี่ยวข้อง — อ่านต่อ: Htmx Alpine.js 12 Factor App
Dagster Assets และ Config
# dagster_lowcode.py — Dagster Low Code Pipeline
# pip install dagster dagster-webserver dagster-pandas
import dagster
from dagster import (
asset, AssetExecutionContext, Config,
Definitions, MaterializeResult, MetadataValue,
schedule, sensor, RunRequest, RunConfig,
)
from dataclasses import dataclass
import pandas as pd
# 1. Config Classes — parameters that non-developers fill in through the UI
class ExtractConfig(Config):
    """Config for the Extract step (consumed by the ``raw_data`` asset)."""
    # Logged and reported as metadata by raw_data; no request is made (simulated).
    source_url: str = "https://api.example.com/data"
    # Start of the synthetic hourly "date" column that raw_data generates.
    date_from: str = "2024-01-01"
    # Only logged by raw_data; the generated timestamps are not bounded by it.
    date_to: str = "2024-12-31"
    # Number of synthetic rows raw_data generates.
    limit: int = 1000
    # Not read by raw_data in this module. NOTE(review): presumably the input
    # format (the example file sensor sets it to "csv") — confirm.
    format: str = "json"
class TransformConfig(Config):
    """Config for the Transform step (consumed by the ``clean_data`` asset)."""
    # Logged and reported as metadata by clean_data; no rows are dropped there.
    drop_nulls: bool = True
    # Reported as metadata by clean_data only.
    normalize: bool = True
    columns: str = "id, name, value, date"  # comma-separated column names
    # Optional filter; clean_data only applies (logs) it when both are non-empty.
    filter_column: str = ""
    filter_value: str = ""
class LoadConfig(Config):
    """Config for the Load step (consumed by the ``warehouse_data`` asset)."""
    destination: str = "warehouse"  # expected: warehouse, s3, local (not validated)
    table_name: str = "processed_data"
    mode: str = "append"  # expected: append, replace, upsert (not validated)
# 2. Assets — Reusable Data Components
@asset(
    description="Extract data from source API",
    group_name="etl_pipeline",
    compute_kind="python",
)
def raw_data(context: AssetExecutionContext, config: ExtractConfig):
    """Extract data using parameters a non-developer enters in the Dagster UI.

    This is a simulation: ``config.source_url`` is only logged. The function
    builds ``config.limit`` synthetic rows with hourly timestamps starting at
    ``config.date_from`` and reports metadata about them. ``config.date_to``
    is only logged; the generated timestamps are not clipped to it.

    Raises:
        ValueError: if ``config.limit`` is negative.
    """
    # Fail early with a clear message; otherwise pd.date_range raises a
    # confusing "Number of samples ... must be non-negative" error.
    if config.limit < 0:
        raise ValueError(f"limit must be >= 0, got {config.limit}")

    context.log.info(f"Extracting from {config.source_url}")
    context.log.info(f"Date range: {config.date_from} to {config.date_to}")

    # Simulate API call: derive every column from one 1-based id sequence.
    ids = range(1, config.limit + 1)
    data = pd.DataFrame({
        "id": ids,
        "name": [f"item_{i}" for i in ids],
        "value": [i * 10.5 for i in ids],
        "date": pd.date_range(config.date_from, periods=config.limit, freq="h"),
    })
    return MaterializeResult(
        metadata={
            "num_rows": MetadataValue.int(len(data)),
            "columns": MetadataValue.text(", ".join(data.columns)),
            "source": MetadataValue.text(config.source_url),
        },
    )
@asset(
    deps=["raw_data"],
    description="Transform and clean data",
    group_name="etl_pipeline",
    compute_kind="pandas",
)
def clean_data(context: AssetExecutionContext, config: TransformConfig):
    """Transform step whose options are entered through the Dagster UI.

    Currently a simulation: it parses and logs the configuration and reports
    it as materialization metadata; no data is read or modified.
    """
    context.log.info(f"Transform: drop_nulls={config.drop_nulls}")
    context.log.info(f"Columns: {config.columns}")

    # Parse the comma-separated list, skipping blank entries so that an empty
    # string or a trailing comma ("id, name,") does not count phantom columns.
    columns = [c.strip() for c in config.columns.split(",") if c.strip()]
    context.log.info(f"Processing {len(columns)} columns")

    # A filter needs both a column and a value; warn rather than silently
    # ignoring a half-filled filter entered in the UI.
    if config.filter_column and config.filter_value:
        context.log.info(f"Filter: {config.filter_column} = {config.filter_value}")
    elif config.filter_column or config.filter_value:
        context.log.warning(
            "Filter ignored: both filter_column and filter_value must be set "
            f"(got filter_column={config.filter_column!r}, "
            f"filter_value={config.filter_value!r})"
        )

    return MaterializeResult(
        metadata={
            "columns_used": MetadataValue.text(config.columns),
            "drop_nulls": MetadataValue.bool(config.drop_nulls),
            "normalized": MetadataValue.bool(config.normalize),
        },
    )
@asset(
    deps=["clean_data"],
    description="Load data to destination",
    group_name="etl_pipeline",
    compute_kind="database",
)
def warehouse_data(context: AssetExecutionContext, config: LoadConfig):
    """Simulate the Load step; the destination is chosen in the Dagster UI.

    Nothing is written: the target and write mode are logged and recorded
    as text metadata on the materialization.
    """
    dest, table, write_mode = config.destination, config.table_name, config.mode
    log = context.log
    log.info(f"Loading to {dest}/{table}")
    log.info(f"Mode: {write_mode}")

    details = {
        "destination": dest,
        "table": table,
        "mode": write_mode,
    }
    return MaterializeResult(
        metadata={key: MetadataValue.text(val) for key, val in details.items()},
    )
# 3. Definitions — register the three ETL assets in one Definitions object
defs = Definitions(
    assets=[raw_data, clean_data, warehouse_data],
)

# Console summary, emitted as one print whose output matches four separate lines.
print(
    "\n".join(
        (
            "Dagster Low Code Pipeline:",
            " Assets: raw_data -> clean_data -> warehouse_data",
            " Config: ExtractConfig, TransformConfig, LoadConfig",
            " UI: Dagit Launchpad สำหรับกรอก Config",
        )
    )
)
Sensors และ Automation

# dagster_automation.py — Sensors และ Automation
from dagster import (
sensor, SensorEvaluationContext, RunRequest,
schedule, ScheduleEvaluationContext,
asset_sensor, AssetKey, EventLogEntry,
)
# 1. File Sensor — ตรวจจับไฟล์ใหม่
# @sensor(
# asset_selection=[AssetKey("raw_data")],
# minimum_interval_seconds=60,
# )
# def new_file_sensor(context: SensorEvaluationContext):
# """ตรวจจับไฟล์ใหม่ใน S3/Local — ทำงานอัตโนมัติ"""
# import os
# watch_dir = "/data/incoming"
# last_cursor = context.cursor or ""
#
# new_files = []
# for f in sorted(os.listdir(watch_dir)):
# if f > last_cursor and f.endswith(".csv"):
# new_files.append(f)
#
# if new_files:
# context.update_cursor(new_files[-1])
# for f in new_files:
# yield RunRequest(
# run_key=f,
# run_config={
# "ops": {
# "raw_data": {
# "config": {
# "source_url": f"/data/incoming/{f}",
# "format": "csv",
# }
# }
# }
# },
# )
# 2. Webhook Sensor — รับ Webhook จาก n8n/Zapier
# @sensor(asset_selection=[AssetKey("raw_data")])
# def webhook_sensor(context: SensorEvaluationContext):
# """รับ Trigger จาก Low Code Tools (n8n, Zapier, Make)"""
# import requests
# response = requests.get("http://localhost:8000/webhooks/pending")
# webhooks = response.json()
#
# for webhook in webhooks:
# yield RunRequest(
# run_key=webhook["id"],
# run_config=RunConfig(
# ops={"raw_data": ExtractConfig(
# source_url=webhook["source_url"],
# date_from=webhook["date_from"],
# date_to=webhook["date_to"],
# )}
# ),
# )
# 3. Schedule — รันตามเวลา
# @schedule(
# cron_schedule="0 6 * * *", # ทุกวัน 6:00
# target=[AssetKey("raw_data"), AssetKey("clean_data"), AssetKey("warehouse_data")],
# )
# def daily_etl_schedule(context: ScheduleEvaluationContext):
# """ETL รายวัน — ตั้งเวลาผ่าน UI"""
# from datetime import datetime, timedelta
# yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
# today = datetime.now().strftime("%Y-%m-%d")
#
# return RunRequest(
# run_config=RunConfig(
# ops={"raw_data": ExtractConfig(
# date_from=yesterday,
# date_to=today,
# )}
# ),
# )
# Low Code Integration Points: how each external tool connects to Dagster.
integrations = {
    "n8n": dict(
        type="Webhook Trigger",
        how="n8n Workflow -> HTTP Request -> Dagster GraphQL API",
        use_case="Trigger ETL เมื่อมี Event ใน n8n",
    ),
    "Zapier": dict(
        type="Webhook Trigger",
        how="Zapier Zap -> Webhook -> Dagster Sensor",
        use_case="Trigger Pipeline จาก Google Sheets, Email",
    ),
    "Retool": dict(
        type="Admin Dashboard",
        how="Retool -> Dagster GraphQL API -> View/Trigger Runs",
        use_case="Dashboard สำหรับ Non-developer ดู Pipeline Status",
    ),
    "Appsmith": dict(
        type="Internal Tool",
        how="Appsmith -> REST API -> Dagster Runs",
        use_case="Internal Tool สำหรับจัดการ Pipeline Config",
    ),
}

print("Low Code Integrations:")
for tool_name, meta in integrations.items():
    kind, route, purpose = meta["type"], meta["how"], meta["use_case"]
    print(f"\n [{tool_name}] ({kind})")
    print(f" How: {route}")
    print(f" Use Case: {purpose}")
Dagster vs Airflow
# comparison.py — Dagster vs Airflow
import unicodedata

comparison = {
    "Core Concept": {
        "Dagster": "Software-defined Assets (Data-centric)",
        "Airflow": "DAGs and Tasks (Task-centric)",
    },
    "Language": {
        "Dagster": "Python + Type hints",
        "Airflow": "Python",
    },
    "Testing": {
        "Dagster": "Built-in testing utilities ทดสอบง่าย",
        "Airflow": "ต้อง Mock มาก ทดสอบยาก",
    },
    "UI": {
        "Dagster": "Dagit — Asset Lineage, Launchpad Config",
        "Airflow": "Airflow UI — DAG Graph, Task Logs",
    },
    "Configuration": {
        "Dagster": "Config System + Dagit UI Input",
        "Airflow": "Variables + Connections + XCom",
    },
    "IO Management": {
        "Dagster": "IO Managers จัดการ Storage อัตโนมัติ",
        "Airflow": "ต้องจัดการ Storage เอง",
    },
    "Community": {
        "Dagster": "เติบโตเร็ว Modern",
        "Airflow": "ใหญ่มาก Plugin เยอะ",
    },
    "Low Code Support": {
        "Dagster": "Config UI, Launchpad, GraphQL API",
        "Airflow": "Limited — ต้องเขียน DAGs",
    },
}


def char_width(ch):
    """Return how many terminal columns the single character *ch* occupies.

    Non-spacing/enclosing marks and format characters take no column. This
    covers the Thai tone and vowel marks used above (e.g. U+0E48, U+0E35),
    which is why plain ``len()``-based padding misaligns Thai rows. East Asian
    wide/full-width characters take two columns; everything else takes one.
    """
    if unicodedata.category(ch) in ("Mn", "Me", "Cf"):
        return 0
    if unicodedata.east_asian_width(ch) in ("W", "F"):
        return 2
    return 1


def display_width(text):
    """Return the number of terminal columns *text* occupies."""
    return sum(char_width(ch) for ch in text)


def fit_cell(text, width):
    """Return *text* right-padded with spaces to exactly *width* columns.

    At most ``width - 1`` columns hold content, so adjacent cells always keep
    at least one blank column (the same layout as ``[:34]`` inside ``:<35``).
    Longer text is cut on a character boundary, so combining marks stay with
    their base letter, and the cut is marked with "…" instead of being silent.
    """
    room = width - 1
    if display_width(text) > room:
        kept, used = [], 0
        for ch in text:
            w = char_width(ch)
            if used + w > room - 1:  # keep one column for the ellipsis
                break
            kept.append(ch)
            used += w
        text = "".join(kept) + "…"
    return text + " " * (width - display_width(text))


header = f"{'Feature':<20} {'Dagster':<35} {'Airflow':<35}"
print("Dagster vs Airflow:")
print(header)
print("-" * len(header))  # match the header width (was a hard-coded 90)
for feature, values in comparison.items():
    print(f"{feature:<20} {fit_cell(values['Dagster'], 35)} {fit_cell(values['Airflow'], 35)}")
Best Practices
- Config System: ใช้ Config Classes กำหนด Parameters ให้ Non-developer กรอกผ่าน UI
- Reusable Assets: สร้าง Assets ที่ใช้ซ้ำได้ เปลี่ยน Config ตาม Use Case
- Sensors: ใช้ Sensors ตรวจจับ Events ทำงานอัตโนมัติ
- GraphQL API: ใช้ Dagster GraphQL API เชื่อมกับ Low Code Tools
- Testing: เขียน Unit Tests สำหรับ Assets ทุกตัว
- Monitoring: ใช้ Dagit UI Monitor Pipeline Status
Dagster คืออะไร
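Dagster คือ Data Orchestrator สำหรับสร้าง Data Pipelines ด้วย Python ด้วยแนวคิด Software-defined Assets พร้อม Jobs, Schedules, Sensors และ Web UI (Dagster UI ซึ่งเดิมชื่อ Dagit) สำหรับ Monitor, Debug และ Testing เหมาะกับงาน Data Engineering และ MLOps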
แนะนำเพิ่มเติม — คอร์สเทรด Forex ที่ iCafeForex
เนื้อหาเกี่ยวข้อง — ทำความเข้าใจ python automation คือ





