it

Dagster Pipeline Interview Preparation —

Dagster Pipeline Interview Preparation —

Dagster Interview

Dagster Pipeline Interview Preparation —

Dagster Pipeline Interview Preparation Software-defined Assets Orchestration Testing Data Engineer Asset Op Job Resource Partition Sensor Schedule Dagit Production

เนื้อหาเกี่ยวข้อง — แนะนำให้อ่าน PlanetScale Vitess DNS Management — คู่มือฉบับสมบูรณ์ 2026

| Orchestrator | Core Concept | UI | Testing | Learning Curve | เหมาะกับ |
|---|---|---|---|---|---|
| Dagster | Software-defined Assets | Dagit (excellent) | Built-in | ปานกลาง | Modern Data Platform |
| Airflow | Task DAG | Airflow UI | Limited | ปานกลาง | Legacy Established |
| Prefect | Flow Task | Prefect Cloud | Good | ง่าย | Python-first |
| dbt | SQL Models | dbt Cloud | Built-in | ง่าย | SQL Transforms |
| Mage | Block Pipeline | Notebook-like | Good | ง่าย | Quick Start |

Dagster Core Concepts

# === Dagster Software-defined Assets ===



# pip install dagster dagster-webserver dagster-duckdb dagster-dbt



from dagster import (

    asset, AssetIn, AssetKey, MaterializeResult,

    MetadataValue, Output, job, op, schedule,

    sensor, RunRequest, Definitions,

    DailyPartitionsDefinition, StaticPartitionsDefinition,

)

# import pandas as pd

# import duckdb



# === Assets ===



# @asset(

#     description="Raw orders from API",

#     group_name="raw",

#     compute_kind="python",

# )

# def raw_orders() -> pd.DataFrame:

#     """Extract raw orders from API"""

#     response = requests.get("https://api.example.com/orders")

#     df = pd.DataFrame(response.json())

#     return df



# @asset(

#     description="Cleaned and validated orders",

#     group_name="staging",

#     ins={"raw_orders": AssetIn()},

# )

# def stg_orders(raw_orders: pd.DataFrame) -> pd.DataFrame:

#     """Clean and validate orders"""

#     df = raw_orders.copy()

#     df = df.dropna(subset=["order_id", "customer_id", "total"])

#     df["total"] = df["total"].astype(float)

#     df["order_date"] = pd.to_datetime(df["order_date"])

#     df = df[df["total"] > 0]

#     return df



# @asset(

#     description="Daily order metrics",

#     group_name="marts",

#     ins={"stg_orders": AssetIn()},

# )

# def daily_order_metrics(stg_orders: pd.DataFrame) -> pd.DataFrame:

#     """Aggregate daily metrics"""

#     df = stg_orders.groupby(stg_orders["order_date"].dt.date).agg(

#         total_orders=("order_id", "count"),

#         total_revenue=("total", "sum"),

#         avg_order_value=("total", "mean"),

#         unique_customers=("customer_id", "nunique"),

#     ).reset_index()

#     return df



# === Partitioned Assets ===

# daily_partitions = DailyPartitionsDefinition(start_date="2024-01-01")

#

# @asset(partitions_def=daily_partitions)

# def partitioned_orders(context) -> pd.DataFrame:

#     partition_date = context.partition_key

#     df = fetch_orders_for_date(partition_date)

#     context.log.info(f"Fetched {len(df)} orders for {partition_date}")

#     return df



from dataclasses import dataclass



@dataclass
class DagsterConcept:
    """One entry in a reference table of Dagster building blocks.

    Attributes:
        concept: Short name of the concept.
        description: One-line explanation of what the concept is.
        use_case: Typical situation in which the concept is applied.
        example: Illustrative code fragment or class name for the concept.
    """

    concept: str
    description: str
    use_case: str
    example: str



# Reference table of the core Dagster building blocks, one entry per concept.
concepts = [
    DagsterConcept(
        concept="Asset",
        description="Data object produced by computation",
        use_case="Table File Model",
        example="@asset def my_table():",
    ),
    DagsterConcept(
        concept="Op",
        description="Unit of computation",
        use_case="Single transform step",
        example="@op def process():",
    ),
    DagsterConcept(
        concept="Job",
        description="Executable graph of ops",
        use_case="Manual or scheduled run",
        example="@job def my_job():",
    ),
    DagsterConcept(
        concept="Resource",
        description="External service connection",
        use_case="DB connection API client",
        example="ConfigurableResource",
    ),
    DagsterConcept(
        concept="Partition",
        description="Divide data into chunks",
        use_case="Daily partitions",
        example="DailyPartitionsDefinition",
    ),
    DagsterConcept(
        concept="Sensor",
        description="Event-driven trigger",
        use_case="New file detected",
        example="@sensor def my_sensor():",
    ),
    DagsterConcept(
        concept="Schedule",
        description="Time-based trigger",
        use_case="Run daily at 2am",
        example="@schedule(cron='0 2 * * *')",
    ),
    DagsterConcept(
        concept="IO Manager",
        description="Read/write asset storage",
        use_case="S3 DuckDB BigQuery",
        example="ConfigurableIOManager",
    ),
]

# Print every concept as a two-line entry: headline first, then the
# use case and example on an indented second line.
print("=== Dagster Core Concepts ===")
for c in concepts:
    print(
        f"  [{c.concept}] {c.description}\n"
        f"    Use Case: {c.use_case} | Example: {c.example}"
    )

Interview Questions

Dagster Pipeline Interview Preparation —
# === Common Interview Questions ===



@dataclass
class InterviewQ:
    """A single interview question together with its talking points.

    Attributes:
        category: Topic area the question belongs to.
        question: The question text as it would be asked.
        key_points: Comma-separated points an answer should cover.
        difficulty: Difficulty label for the question.
    """

    category: str
    question: str
    key_points: str
    difficulty: str



# Sample interview questions, each tagged with a category, the key points
# an answer should touch on, and a difficulty label.
questions = [
    InterviewQ(
        category="Dagster",
        question="Explain Software-defined Assets",
        key_points="Data-centric vs task-centric, Lineage, Materialization history, Auto dependency",
        difficulty="Medium",
    ),
    InterviewQ(
        category="Dagster",
        question="Dagster vs Airflow — when to use which?",
        key_points="Asset vs Task, Testing, UI, Modern vs Legacy, Team size",
        difficulty="Medium",
    ),
    InterviewQ(
        category="Design",
        question="Design ETL pipeline for e-commerce analytics",
        key_points="Source → Staging → Marts, Partitioning, Incremental, Error handling",
        difficulty="Hard",
    ),
    InterviewQ(
        category="Design",
        question="How to handle late-arriving data?",
        key_points="Partition backfill, Upsert strategy, Watermark, Reprocessing window",
        difficulty="Hard",
    ),
    InterviewQ(
        category="Testing",
        question="How to test data pipelines?",
        key_points="Unit test ops, Integration test assets, Data quality checks, Dagster testing utilities",
        difficulty="Medium",
    ),
    InterviewQ(
        category="Production",
        question="How to monitor pipeline health?",
        key_points="Dagster sensors, Asset freshness, SLA alerts, Metadata logging",
        difficulty="Medium",
    ),
    InterviewQ(
        category="SQL",
        question="Window functions for running total and rank",
        key_points="SUM() OVER, ROW_NUMBER, RANK, PARTITION BY, ORDER BY",
        difficulty="Medium",
    ),
    InterviewQ(
        category="Python",
        question="Implement idempotent data loader",
        key_points="Upsert logic, Dedup, Transaction, Checkpoint, Retry safe",
        difficulty="Hard",
    ),
]

# Print each question as a two-line entry: difficulty/category/question
# on the first line, key points indented on the second.
print("=== Interview Questions ===")
for q in questions:
    print(
        f"  [{q.difficulty}] [{q.category}] {q.question}\n"
        f"    Key Points: {q.key_points}"
    )



# SQL Interview Example

# -- Running total and rank

# SELECT

#   order_date,

#   customer_id,

#   total,

#   SUM(total) OVER (PARTITION BY customer_id ORDER BY order_date) as running_total,

#   ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY total DESC) as rank_by_total,

#   LAG(total) OVER (PARTITION BY customer_id ORDER BY order_date) as prev_order_total

# FROM orders

# WHERE order_date >= '2024-01-01'

# ORDER BY customer_id, order_date;

Testing and Production

# === Testing Dagster Pipelines ===



# from dagster import build_asset_context

#

# def test_stg_orders():

#     raw_data = pd.DataFrame({

#         "order_id": [1, 2, 3, None],

#         "customer_id": [10, 20, 30, 40],

#         "total": [100.0, -50.0, 200.0, 150.0],

#         "order_date": ["2024-01-01", "2024-01-02", "2024-01-03", "2024-01-04"],

#     })

#     result = stg_orders(raw_data)

#     assert len(result) == 2  # Removed null order_id and negative total

#     assert result["total"].min() > 0

#

# def test_daily_order_metrics():

#     stg_data = pd.DataFrame({

#         "order_id": [1, 2, 3],

#         "customer_id": [10, 10, 20],

#         "total": [100.0, 200.0, 150.0],

#         "order_date": pd.to_datetime(["2024-01-01", "2024-01-01", "2024-01-02"]),

#     })

#     result = daily_order_metrics(stg_data)

#     assert len(result) == 2  # 2 dates

#     jan1 = result[result["order_date"] == "2024-01-01"].iloc[0]

#     assert jan1["total_orders"] == 2

#     assert jan1["total_revenue"] == 300.0



@dataclass
class PrepItem:
    """One line item in an interview preparation plan.

    Attributes:
        category: Subject area the item belongs to.
        topic: What to study within that area.
        resources: Where to study it from.
        time_needed: Time estimate as text, e.g. "2 days".
        priority: Priority label for the item.
    """

    category: str
    topic: str
    resources: str
    time_needed: str
    priority: str



# Study plan: each item records what to study, from where, the estimated
# time as text, and a priority label.
prep_items = [
    PrepItem(
        category="Dagster",
        topic="Core Concepts + Assets",
        resources="Dagster docs + tutorials",
        time_needed="2 days",
        priority="Critical",
    ),
    PrepItem(
        category="Dagster",
        topic="Partitions + Sensors + IO Managers",
        resources="Dagster docs advanced",
        time_needed="1 day",
        priority="High",
    ),
    PrepItem(
        category="SQL",
        topic="Window Functions CTEs Subqueries",
        resources="LeetCode SQL problems",
        time_needed="3 days",
        priority="Critical",
    ),
    PrepItem(
        category="Python",
        topic="Data Processing pandas numpy",
        resources="Coding challenges",
        time_needed="2 days",
        priority="Critical",
    ),
    PrepItem(
        category="Design",
        topic="ETL Architecture Data Modeling",
        resources="System design articles",
        time_needed="2 days",
        priority="High",
    ),
    PrepItem(
        category="Cloud",
        topic="AWS S3 Redshift BigQuery",
        resources="Cloud docs hands-on",
        time_needed="1 day",
        priority="Medium",
    ),
    PrepItem(
        category="Testing",
        topic="Unit Integration Data Quality",
        resources="Dagster testing docs",
        time_needed="1 day",
        priority="Medium",
    ),
    PrepItem(
        category="Behavioral",
        topic="STAR Method past projects",
        resources="Practice with friend",
        time_needed="1 day",
        priority="High",
    ),
]

# Print the plan item by item while summing the estimated days.
print("\nInterview Prep Plan:")
total_days = 0
for p in prep_items:
    print(
        f"  [{p.priority}] [{p.category}] {p.topic}\n"
        f"    Resources: {p.resources} | Time: {p.time_needed}"
    )
    # time_needed is text like "2 days"; its first whitespace-separated
    # token is the integer day count.
    total_days += int(p.time_needed.split()[0])

print(f"\n  Total Prep Time: ~{total_days} days")

เคล็ดลับ

  • Assets: เข้าใจ Asset-based Thinking เป็นหัวใจ Dagster
  • Portfolio: สร้าง End-to-end Pipeline บน GitHub แสดงผลงาน
  • SQL: ฝึก Window Functions ทุกวัน ออกสอบบ่อยมาก
  • Design: ฝึกออกแบบ Pipeline วาด Diagram อธิบายได้
  • STAR: เตรียม STAR Method สำหรับ Behavioral Questions

Dagster คืออะไร

Data Orchestrator, Software-defined Assets, Dependency, Partition, Sensor, Schedule, Dagit, Type System, Materialization History, Modern Pipeline

แนะนำเพิ่มเติม — อ่านเพิ่มเติมที่ SiamCafeBook

เนื้อหาเกี่ยวข้อง — private network คือ

เนื้อหาเกี่ยวข้อง — ดูเพิ่มเติมเรื่อง Rust Serde Home Lab Setup

XM Legend · เทรดเดอร์ & ผู้สอน Forex 13 ปี

ผู้ก่อตั้ง SiamCafe ตั้งแต่ปี 1997 · เทรดเดอร์สาย Forex มากกว่า 13 ปี ได้รับการยกย่องเป็น XM Legend · แบ่งปันความรู้ Forex, ไอที, AI และการเทรด จากประสบการณ์จริงในตลาดจริง