Technology

Dagster Pipeline Interview Preparation

dagster pipeline interview preparation
Dagster Pipeline Interview Preparation | SiamCafe Blog
2026-05-05 · อ. บอม — SiamCafe.net · 8,678 คำ

Dagster Interview

Dagster Pipeline Interview Preparation Software-defined Assets Orchestration Testing Data Engineer Asset Op Job Resource Partition Sensor Schedule Dagit Production

| Orchestrator | Core Concept | UI | Testing | Learning Curve | เหมาะกับ |
| --- | --- | --- | --- | --- | --- |
| Dagster | Software-defined Assets | Dagit (excellent) | Built-in | ปานกลาง | Modern Data Platform |
| Airflow | Task DAG | Airflow UI | Limited | ปานกลาง | Legacy Established |
| Prefect | Flow Task | Prefect Cloud | Good | ง่าย | Python-first |
| dbt | SQL Models | dbt Cloud | Built-in | ง่าย | SQL Transforms |
| Mage | Block Pipeline | Notebook-like | Good | ง่าย | Quick Start |

Dagster Core Concepts

# === Dagster Software-defined Assets ===

# pip install dagster dagster-webserver dagster-duckdb dagster-dbt

from dagster import (
    asset, AssetIn, AssetKey, MaterializeResult,
    MetadataValue, Output, job, op, schedule,
    sensor, RunRequest, Definitions,
    DailyPartitionsDefinition, StaticPartitionsDefinition,
)
# import pandas as pd
# import duckdb

# === Assets ===

# @asset(
#     description="Raw orders from API",
#     group_name="raw",
#     compute_kind="python",
# )
# def raw_orders() -> pd.DataFrame:
#     """Extract raw orders from API"""
#     response = requests.get("https://api.example.com/orders")
#     df = pd.DataFrame(response.json())
#     return df

# @asset(
#     description="Cleaned and validated orders",
#     group_name="staging",
#     ins={"raw_orders": AssetIn()},
# )
# def stg_orders(raw_orders: pd.DataFrame) -> pd.DataFrame:
#     """Clean and validate orders"""
#     df = raw_orders.copy()
#     df = df.dropna(subset=["order_id", "customer_id", "total"])
#     df["total"] = df["total"].astype(float)
#     df["order_date"] = pd.to_datetime(df["order_date"])
#     df = df[df["total"] > 0]
#     return df

# @asset(
#     description="Daily order metrics",
#     group_name="marts",
#     ins={"stg_orders": AssetIn()},
# )
# def daily_order_metrics(stg_orders: pd.DataFrame) -> pd.DataFrame:
#     """Aggregate daily metrics"""
#     df = stg_orders.groupby(stg_orders["order_date"].dt.date).agg(
#         total_orders=("order_id", "count"),
#         total_revenue=("total", "sum"),
#         avg_order_value=("total", "mean"),
#         unique_customers=("customer_id", "nunique"),
#     ).reset_index()
#     return df

# === Partitioned Assets ===
# daily_partitions = DailyPartitionsDefinition(start_date="2024-01-01")
#
# @asset(partitions_def=daily_partitions)
# def partitioned_orders(context) -> pd.DataFrame:
#     partition_date = context.partition_key
#     df = fetch_orders_for_date(partition_date)
#     context.log.info(f"Fetched {len(df)} orders for {partition_date}")
#     return df

from dataclasses import dataclass

@dataclass
class DagsterConcept:
    """A single Dagster core concept, summarized for interview review."""

    concept: str      # Concept name as used in the Dagster docs (e.g. "Asset").
    description: str  # One-line explanation of what the concept is.
    use_case: str     # Typical situation where the concept applies.
    example: str      # Minimal API/code snippet illustrating usage.

# (concept, description, use case, example) rows, expanded into instances below.
_concept_rows = [
    ("Asset", "Data object produced by computation", "Table File Model", "@asset def my_table():"),
    ("Op", "Unit of computation", "Single transform step", "@op def process():"),
    ("Job", "Executable graph of ops", "Manual or scheduled run", "@job def my_job():"),
    ("Resource", "External service connection", "DB connection API client", "ConfigurableResource"),
    ("Partition", "Divide data into chunks", "Daily partitions", "DailyPartitionsDefinition"),
    ("Sensor", "Event-driven trigger", "New file detected", "@sensor def my_sensor():"),
    ("Schedule", "Time-based trigger", "Run daily at 2am", "@schedule(cron='0 2 * * *')"),
    ("IO Manager", "Read/write asset storage", "S3 DuckDB BigQuery", "ConfigurableIOManager"),
]
concepts = [DagsterConcept(*row) for row in _concept_rows]

print("=== Dagster Core Concepts ===")
for c in concepts:
    summary = f"  [{c.concept}] {c.description}"
    detail = f"    Use Case: {c.use_case} | Example: {c.example}"
    print(summary)
    print(detail)

Interview Questions

# === Common Interview Questions ===

@dataclass
class InterviewQ:
    """One interview question plus the talking points a strong answer covers."""

    category: str    # Topic bucket ("Dagster", "Design", "Testing", ...).
    question: str    # The question as it might be asked.
    key_points: str  # Comma-separated points a good answer should hit.
    difficulty: str  # Rough difficulty label ("Medium" / "Hard").

# (category, question, key points, difficulty) rows, expanded into instances below.
_question_rows = [
    ("Dagster", "Explain Software-defined Assets",
     "Data-centric vs task-centric, Lineage, Materialization history, Auto dependency",
     "Medium"),
    ("Dagster", "Dagster vs Airflow — when to use which?",
     "Asset vs Task, Testing, UI, Modern vs Legacy, Team size",
     "Medium"),
    ("Design", "Design ETL pipeline for e-commerce analytics",
     "Source → Staging → Marts, Partitioning, Incremental, Error handling",
     "Hard"),
    ("Design", "How to handle late-arriving data?",
     "Partition backfill, Upsert strategy, Watermark, Reprocessing window",
     "Hard"),
    ("Testing", "How to test data pipelines?",
     "Unit test ops, Integration test assets, Data quality checks, Dagster testing utilities",
     "Medium"),
    ("Production", "How to monitor pipeline health?",
     "Dagster sensors, Asset freshness, SLA alerts, Metadata logging",
     "Medium"),
    ("SQL", "Window functions for running total and rank",
     "SUM() OVER, ROW_NUMBER, RANK, PARTITION BY, ORDER BY",
     "Medium"),
    ("Python", "Implement idempotent data loader",
     "Upsert logic, Dedup, Transaction, Checkpoint, Retry safe",
     "Hard"),
]
questions = [InterviewQ(*row) for row in _question_rows]

print("=== Interview Questions ===")
for q in questions:
    heading = f"  [{q.difficulty}] [{q.category}] {q.question}"
    points = f"    Key Points: {q.key_points}"
    print(heading)
    print(points)

# SQL Interview Example
# -- Running total and rank
# SELECT
#   order_date,
#   customer_id,
#   total,
#   SUM(total) OVER (PARTITION BY customer_id ORDER BY order_date) as running_total,
#   ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY total DESC) as rank_by_total,
#   LAG(total) OVER (PARTITION BY customer_id ORDER BY order_date) as prev_order_total
# FROM orders
# WHERE order_date >= '2024-01-01'
# ORDER BY customer_id, order_date;

Testing and Production

# === Testing Dagster Pipelines ===

# from dagster import build_asset_context
#
# def test_stg_orders():
#     raw_data = pd.DataFrame({
#         "order_id": [1, 2, 3, None],
#         "customer_id": [10, 20, 30, 40],
#         "total": [100.0, -50.0, 200.0, 150.0],
#         "order_date": ["2024-01-01", "2024-01-02", "2024-01-03", "2024-01-04"],
#     })
#     result = stg_orders(raw_data)
#     assert len(result) == 2  # Removed null order_id and negative total
#     assert result["total"].min() > 0
#
# def test_daily_order_metrics():
#     stg_data = pd.DataFrame({
#         "order_id": [1, 2, 3],
#         "customer_id": [10, 10, 20],
#         "total": [100.0, 200.0, 150.0],
#         "order_date": pd.to_datetime(["2024-01-01", "2024-01-01", "2024-01-02"]),
#     })
#     result = daily_order_metrics(stg_data)
#     assert len(result) == 2  # 2 dates
#     jan1 = result[result["order_date"] == "2024-01-01"].iloc[0]
#     assert jan1["total_orders"] == 2
#     assert jan1["total_revenue"] == 300.0

@dataclass
class PrepItem:
    """One study-plan entry in the interview preparation schedule."""

    category: str     # Skill area ("Dagster", "SQL", "Cloud", ...).
    topic: str        # What to study within that area.
    resources: str    # Where to study it.
    time_needed: str  # Budget, formatted as "<N> day"/"<N> days".
    priority: str     # "Critical" / "High" / "Medium".

# (category, topic, resources, time, priority) rows, expanded into instances below.
_prep_rows = [
    ("Dagster", "Core Concepts + Assets", "Dagster docs + tutorials", "2 days", "Critical"),
    ("Dagster", "Partitions + Sensors + IO Managers", "Dagster docs advanced", "1 day", "High"),
    ("SQL", "Window Functions CTEs Subqueries", "LeetCode SQL problems", "3 days", "Critical"),
    ("Python", "Data Processing pandas numpy", "Coding challenges", "2 days", "Critical"),
    ("Design", "ETL Architecture Data Modeling", "System design articles", "2 days", "High"),
    ("Cloud", "AWS S3 Redshift BigQuery", "Cloud docs hands-on", "1 day", "Medium"),
    ("Testing", "Unit Integration Data Quality", "Dagster testing docs", "1 day", "Medium"),
    ("Behavioral", "STAR Method past projects", "Practice with friend", "1 day", "High"),
]
prep_items = [PrepItem(*row) for row in _prep_rows]

print("\nInterview Prep Plan:")
for p in prep_items:
    print(f"  [{p.priority}] [{p.category}] {p.topic}")
    print(f"    Resources: {p.resources} | Time: {p.time_needed}")

# The leading token of every time_needed string is the day count ("2 days" -> 2).
total_days = sum(int(p.time_needed.split()[0]) for p in prep_items)
print(f"\n  Total Prep Time: ~{total_days} days")

เคล็ดลับ

Dagster คืออะไร

Data Orchestrator Software-defined Assets Dependency Partition Sensor Schedule Dagit Type System Materialization History Modern Pipeline

Asset-based ต่างจาก Task-based อย่างไร

Task-based Airflow DAG ไม่รู้ผลลัพธ์ Asset-based Dagster รู้ข้อมูลที่สร้าง Lineage Materialization Debug Refactor ง่าย

คำถามสัมภาษณ์ Dagster มีอะไรบ้าง

Software-defined Assets เปรียบเทียบ Airflow Pipeline Design Partition Error Handling Testing Resource Configuration Sensor Schedule

เตรียมตัวสัมภาษณ์ Data Engineer อย่างไร

Dagster Core Concepts Portfolio GitHub SQL Window Functions Python Cloud Docker CI/CD System Design Data Modeling STAR Behavioral

สรุป

Dagster Pipeline Interview Preparation Software-defined Assets Orchestration Testing SQL Window Functions Data Modeling Portfolio Production Career

📖 บทความที่เกี่ยวข้อง

- SSE Security Interview Preparation — อ่านบทความ →
- Whisper Speech Interview Preparation — อ่านบทความ →
- Dagster Pipeline Progressive Delivery — อ่านบทความ →
- Ansible Vault Interview Preparation — อ่านบทความ →
- Dagster Pipeline Career Development IT — อ่านบทความ →

📚 ดูบทความทั้งหมด →