it

Dagster Pipeline กับ Low Code No Code —

Dagster Pipeline กับ Low Code No Code —

Dagster Pipeline Low Code

Dagster Pipeline กับ Low Code No Code —

Dagster Data Orchestrator สร้าง Data Pipelines Python Software-defined Assets Jobs Schedules Sensors Dagit Web UI Monitor Debug Testing

เนื้อหาเกี่ยวข้อง — แนะนำให้อ่าน CDK Construct Cache Strategy Redis

Low Code No Code ใช้โค้ดน้อย Visual Interface ลาก Drop Non-developer สร้างแอปได้เร็ว Dagster Config System Launchpad Reusable Assets

เนื้อหาเกี่ยวข้อง — อ่านต่อ: Htmx Alpine.js 12 Factor App

Dagster Assets และ Config

# dagster_lowcode.py — Dagster Low Code Pipeline

# pip install dagster dagster-webserver dagster-pandas



import dagster

from dagster import (

    asset, AssetExecutionContext, Config,

    Definitions, MaterializeResult, MetadataValue,

    schedule, sensor, RunRequest, RunConfig,

)

from dataclasses import dataclass

import pandas as pd



# 1. Config Classes — กำหนด Parameters ผ่าน UI

class ExtractConfig(Config):

    """Config สำหรับ Extract Data"""

    source_url: str = "https://api.example.com/data"

    date_from: str = "2024-01-01"

    date_to: str = "2024-12-31"

    limit: int = 1000

    format: str = "json"



class TransformConfig(Config):

    """Config สำหรับ Transform Data"""

    drop_nulls: bool = True

    normalize: bool = True

    columns: str = "id, name, value, date"  # comma-separated

    filter_column: str = ""

    filter_value: str = ""



class LoadConfig(Config):

    """Config สำหรับ Load Data"""

    destination: str = "warehouse"  # warehouse, s3, local

    table_name: str = "processed_data"

    mode: str = "append"  # append, replace, upsert



# 2. Assets — Reusable Data Components

@asset(

    description="Extract data from source API",

    group_name="etl_pipeline",

    compute_kind="python",

)

def raw_data(context: AssetExecutionContext, config: ExtractConfig):

    """Extract Data — Non-developer กำหนด Config ผ่าน Dagit UI"""

    context.log.info(f"Extracting from {config.source_url}")

    context.log.info(f"Date range: {config.date_from} to {config.date_to}")



    # Simulate API call

    data = pd.DataFrame({

        "id": range(1, config.limit + 1),

        "name": [f"item_{i}" for i in range(1, config.limit + 1)],

        "value": [i * 10.5 for i in range(1, config.limit + 1)],

        "date": pd.date_range(config.date_from, periods=config.limit, freq="h"),

    })



    return MaterializeResult(

        metadata={

            "num_rows": MetadataValue.int(len(data)),

            "columns": MetadataValue.text(", ".join(data.columns)),

            "source": MetadataValue.text(config.source_url),

        },

    )



@asset(

    deps=["raw_data"],

    description="Transform and clean data",

    group_name="etl_pipeline",

    compute_kind="pandas",

)

def clean_data(context: AssetExecutionContext, config: TransformConfig):

    """Transform Data — Config กำหนดผ่าน UI"""

    context.log.info(f"Transform: drop_nulls={config.drop_nulls}")

    context.log.info(f"Columns: {config.columns}")



    # Simulate transform

    columns = [c.strip() for c in config.columns.split(",")]

    context.log.info(f"Processing {len(columns)} columns")



    if config.filter_column and config.filter_value:

        context.log.info(f"Filter: {config.filter_column} = {config.filter_value}")



    return MaterializeResult(

        metadata={

            "columns_used": MetadataValue.text(config.columns),

            "drop_nulls": MetadataValue.bool(config.drop_nulls),

            "normalized": MetadataValue.bool(config.normalize),

        },

    )



@asset(

    deps=["clean_data"],

    description="Load data to destination",

    group_name="etl_pipeline",

    compute_kind="database",

)

def warehouse_data(context: AssetExecutionContext, config: LoadConfig):

    """Load Data — เลือก Destination ผ่าน UI"""

    context.log.info(f"Loading to {config.destination}/{config.table_name}")

    context.log.info(f"Mode: {config.mode}")



    return MaterializeResult(

        metadata={

            "destination": MetadataValue.text(config.destination),

            "table": MetadataValue.text(config.table_name),

            "mode": MetadataValue.text(config.mode),

        },

    )



# 3. Definitions

defs = Definitions(

    assets=[raw_data, clean_data, warehouse_data],

)



print("Dagster Low Code Pipeline:")

print("  Assets: raw_data -> clean_data -> warehouse_data")

print("  Config: ExtractConfig, TransformConfig, LoadConfig")

print("  UI: Dagit Launchpad สำหรับกรอก Config")

Sensors และ Automation

Dagster Pipeline กับ Low Code No Code —
# dagster_automation.py — Sensors และ Automation

from dagster import (

    sensor, SensorEvaluationContext, RunRequest,

    schedule, ScheduleEvaluationContext,

    asset_sensor, AssetKey, EventLogEntry,

)



# 1. File Sensor — ตรวจจับไฟล์ใหม่

# @sensor(

#     asset_selection=[AssetKey("raw_data")],

#     minimum_interval_seconds=60,

# )

# def new_file_sensor(context: SensorEvaluationContext):

#     """ตรวจจับไฟล์ใหม่ใน S3/Local — ทำงานอัตโนมัติ"""

#     import os

#     watch_dir = "/data/incoming"

#     last_cursor = context.cursor or ""

#

#     new_files = []

#     for f in sorted(os.listdir(watch_dir)):

#         if f > last_cursor and f.endswith(".csv"):

#             new_files.append(f)

#

#     if new_files:

#         context.update_cursor(new_files[-1])

#         for f in new_files:

#             yield RunRequest(

#                 run_key=f,

#                 run_config={

#                     "ops": {

#                         "raw_data": {

#                             "config": {

#                                 "source_url": f"/data/incoming/{f}",

#                                 "format": "csv",

#                             }

#                         }

#                     }

#                 },

#             )



# 2. Webhook Sensor — รับ Webhook จาก n8n/Zapier

# @sensor(asset_selection=[AssetKey("raw_data")])

# def webhook_sensor(context: SensorEvaluationContext):

#     """รับ Trigger จาก Low Code Tools (n8n, Zapier, Make)"""

#     import requests

#     response = requests.get("http://localhost:8000/webhooks/pending")

#     webhooks = response.json()

#

#     for webhook in webhooks:

#         yield RunRequest(

#             run_key=webhook["id"],

#             run_config=RunConfig(

#                 ops={"raw_data": ExtractConfig(

#                     source_url=webhook["source_url"],

#                     date_from=webhook["date_from"],

#                     date_to=webhook["date_to"],

#                 )}

#             ),

#         )



# 3. Schedule — รันตามเวลา

# @schedule(

#     cron_schedule="0 6 * * *",  # ทุกวัน 6:00

#     target=[AssetKey("raw_data"), AssetKey("clean_data"), AssetKey("warehouse_data")],

# )

# def daily_etl_schedule(context: ScheduleEvaluationContext):

#     """ETL รายวัน — ตั้งเวลาผ่าน UI"""

#     from datetime import datetime, timedelta

#     yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")

#     today = datetime.now().strftime("%Y-%m-%d")

#

#     return RunRequest(

#         run_config=RunConfig(

#             ops={"raw_data": ExtractConfig(

#                 date_from=yesterday,

#                 date_to=today,

#             )}

#         ),

#     )



# Low Code Integration Points

integrations = {

    "n8n": {

        "type": "Webhook Trigger",

        "how": "n8n Workflow -> HTTP Request -> Dagster GraphQL API",

        "use_case": "Trigger ETL เมื่อมี Event ใน n8n",

    },

    "Zapier": {

        "type": "Webhook Trigger",

        "how": "Zapier Zap -> Webhook -> Dagster Sensor",

        "use_case": "Trigger Pipeline จาก Google Sheets, Email",

    },

    "Retool": {

        "type": "Admin Dashboard",

        "how": "Retool -> Dagster GraphQL API -> View/Trigger Runs",

        "use_case": "Dashboard สำหรับ Non-developer ดู Pipeline Status",

    },

    "Appsmith": {

        "type": "Internal Tool",

        "how": "Appsmith -> REST API -> Dagster Runs",

        "use_case": "Internal Tool สำหรับจัดการ Pipeline Config",

    },

}



print("Low Code Integrations:")

for tool, info in integrations.items():

    print(f"\n  [{tool}] ({info['type']})")

    print(f"    How: {info['how']}")

    print(f"    Use Case: {info['use_case']}")

Dagster vs Airflow

# comparison.py — Dagster vs Airflow

comparison = {

    "Core Concept": {

        "Dagster": "Software-defined Assets (Data-centric)",

        "Airflow": "DAGs and Tasks (Task-centric)",

    },

    "Language": {

        "Dagster": "Python + Type hints",

        "Airflow": "Python",

    },

    "Testing": {

        "Dagster": "Built-in testing utilities ทดสอบง่าย",

        "Airflow": "ต้อง Mock มาก ทดสอบยาก",

    },

    "UI": {

        "Dagster": "Dagit — Asset Lineage, Launchpad Config",

        "Airflow": "Airflow UI — DAG Graph, Task Logs",

    },

    "Configuration": {

        "Dagster": "Config System + Dagit UI Input",

        "Airflow": "Variables + Connections + XCom",

    },

    "IO Management": {

        "Dagster": "IO Managers จัดการ Storage อัตโนมัติ",

        "Airflow": "ต้องจัดการ Storage เอง",

    },

    "Community": {

        "Dagster": "เติบโตเร็ว Modern",

        "Airflow": "ใหญ่มาก Plugin เยอะ",

    },

    "Low Code Support": {

        "Dagster": "Config UI, Launchpad, GraphQL API",

        "Airflow": "Limited — ต้องเขียน DAGs",

    },

}



print("Dagster vs Airflow:")

print(f"{'Feature':<20} {'Dagster':<35} {'Airflow':<35}")

print("-" * 90)

for feature, values in comparison.items():

    print(f"{feature:<20} {values['Dagster'][:34]:<35} {values['Airflow'][:34]:<35}")

Best Practices

  • Config System: ใช้ Config Classes กำหนด Parameters ให้ Non-developer กรอกผ่าน UI
  • Reusable Assets: สร้าง Assets ที่ใช้ซ้ำได้ เปลี่ยน Config ตาม Use Case
  • Sensors: ใช้ Sensors ตรวจจับ Events ทำงานอัตโนมัติ
  • GraphQL API: ใช้ Dagster GraphQL API เชื่อมกับ Low Code Tools
  • Testing: เขียน Unit Tests สำหรับ Assets ทุกตัว
  • Monitoring: ใช้ Dagit UI Monitor Pipeline Status

Dagster คืออะไร

Data Orchestrator สร้าง Data Pipelines Python Software-defined Assets Jobs Schedules Sensors Dagit Web UI Monitor Debug Testing Data Engineering MLOps

แนะนำเพิ่มเติม — คอร์สเทรด Forex ที่ iCafeForex

เนื้อหาเกี่ยวข้อง — ทำความเข้าใจ python automation คือ

XM Legend · เทรดเดอร์ & ผู้สอน Forex 13 ปี

ผู้ก่อตั้ง SiamCafe ตั้งแต่ปี 1997 · เทรดเดอร์สาย Forex มากกว่า 13 ปี ได้รับการยกย่องเป็น XM Legend · แบ่งปันความรู้ Forex, ไอที, AI และการเทรด จากประสบการณ์จริงในตลาดจริง