Dagster Interview
Dagster Pipeline Interview Preparation Software-defined Assets Orchestration Testing Data Engineer Asset Op Job Resource Partition Sensor Schedule Dagit Production
| Orchestrator | Core Concept | UI | Testing | Learning Curve | เหมาะกับ |
|---|---|---|---|---|---|
| Dagster | Software-defined Assets | Dagit (excellent) | Built-in | ปานกลาง | Modern Data Platform |
| Airflow | Task DAG | Airflow UI | Limited | ปานกลาง | Legacy / Established systems |
| Prefect | Flow Task | Prefect Cloud | Good | ง่าย | Python-first |
| dbt | SQL Models | dbt Cloud | Built-in | ง่าย | SQL Transforms |
| Mage | Block Pipeline | Notebook-like | Good | ง่าย | Quick Start |
Dagster Core Concepts
# === Dagster Software-defined Assets ===
# pip install dagster dagster-webserver dagster-duckdb dagster-dbt
from dagster import (
asset, AssetIn, AssetKey, MaterializeResult,
MetadataValue, Output, job, op, schedule,
sensor, RunRequest, Definitions,
DailyPartitionsDefinition, StaticPartitionsDefinition,
)
# import requests
# import pandas as pd
# import duckdb
# === Assets ===
# @asset(
# description="Raw orders from API",
# group_name="raw",
# compute_kind="python",
# )
# def raw_orders() -> pd.DataFrame:
# """Extract raw orders from API"""
# response = requests.get("https://api.example.com/orders")
# df = pd.DataFrame(response.json())
# return df
# @asset(
# description="Cleaned and validated orders",
# group_name="staging",
# ins={"raw_orders": AssetIn()},
# )
# def stg_orders(raw_orders: pd.DataFrame) -> pd.DataFrame:
# """Clean and validate orders"""
# df = raw_orders.copy()
# df = df.dropna(subset=["order_id", "customer_id", "total"])
# df["total"] = df["total"].astype(float)
# df["order_date"] = pd.to_datetime(df["order_date"])
# df = df[df["total"] > 0]
# return df
# @asset(
# description="Daily order metrics",
# group_name="marts",
# ins={"stg_orders": AssetIn()},
# )
# def daily_order_metrics(stg_orders: pd.DataFrame) -> pd.DataFrame:
# """Aggregate daily metrics"""
# df = stg_orders.groupby(stg_orders["order_date"].dt.date).agg(
# total_orders=("order_id", "count"),
# total_revenue=("total", "sum"),
# avg_order_value=("total", "mean"),
# unique_customers=("customer_id", "nunique"),
# ).reset_index()
# return df
# === Partitioned Assets ===
# daily_partitions = DailyPartitionsDefinition(start_date="2024-01-01")
#
# @asset(partitions_def=daily_partitions)
# def partitioned_orders(context) -> pd.DataFrame:
# partition_date = context.partition_key
# df = fetch_orders_for_date(partition_date)
# context.log.info(f"Fetched {len(df)} orders for {partition_date}")
# return df
from dataclasses import dataclass


@dataclass
class DagsterConcept:
    """One core Dagster building block with a short usage reference.

    Attributes are plain strings so the table below stays easy to scan.
    """

    concept: str      # name of the Dagster primitive
    description: str  # what it is, in one line
    use_case: str     # when to reach for it
    example: str      # minimal code sketch


# Reference table of Dagster primitives, roughly in order of importance.
concepts = [
    DagsterConcept("Asset", "Data object produced by computation", "Table File Model", "@asset def my_table():"),
    DagsterConcept("Op", "Unit of computation", "Single transform step", "@op def process():"),
    DagsterConcept("Job", "Executable graph of ops", "Manual or scheduled run", "@job def my_job():"),
    DagsterConcept("Resource", "External service connection", "DB connection API client", "ConfigurableResource"),
    DagsterConcept("Partition", "Divide data into chunks", "Daily partitions", "DailyPartitionsDefinition"),
    DagsterConcept("Sensor", "Event-driven trigger", "New file detected", "@sensor def my_sensor():"),
    DagsterConcept("Schedule", "Time-based trigger", "Run daily at 2am", "@schedule(cron='0 2 * * *')"),
    DagsterConcept("IO Manager", "Read/write asset storage", "S3 DuckDB BigQuery", "ConfigurableIOManager"),
]

# Print the reference table for quick review.
print("=== Dagster Core Concepts ===")
for entry in concepts:
    print(f" [{entry.concept}] {entry.description}")
    print(f" Use Case: {entry.use_case} | Example: {entry.example}")
Interview Questions
# === Common Interview Questions ===
@dataclass
class InterviewQ:
    """A single interview question plus the talking points a strong answer hits."""

    category: str    # topic area: Dagster, Design, Testing, Production, SQL, Python
    question: str    # the prompt as it might be asked
    key_points: str  # comma-separated rubric of points to cover
    difficulty: str  # "Medium" or "Hard"


# Question bank; key_points double as a self-grading rubric when practicing.
questions = [
    InterviewQ(
        "Dagster",
        "Explain Software-defined Assets",
        "Data-centric vs task-centric, Lineage, Materialization history, Auto dependency",
        "Medium",
    ),
    InterviewQ(
        "Dagster",
        "Dagster vs Airflow — when to use which?",
        "Asset vs Task, Testing, UI, Modern vs Legacy, Team size",
        "Medium",
    ),
    InterviewQ(
        "Design",
        "Design ETL pipeline for e-commerce analytics",
        "Source → Staging → Marts, Partitioning, Incremental, Error handling",
        "Hard",
    ),
    InterviewQ(
        "Design",
        "How to handle late-arriving data?",
        "Partition backfill, Upsert strategy, Watermark, Reprocessing window",
        "Hard",
    ),
    InterviewQ(
        "Testing",
        "How to test data pipelines?",
        "Unit test ops, Integration test assets, Data quality checks, Dagster testing utilities",
        "Medium",
    ),
    InterviewQ(
        "Production",
        "How to monitor pipeline health?",
        "Dagster sensors, Asset freshness, SLA alerts, Metadata logging",
        "Medium",
    ),
    InterviewQ(
        "SQL",
        "Window functions for running total and rank",
        "SUM() OVER, ROW_NUMBER, RANK, PARTITION BY, ORDER BY",
        "Medium",
    ),
    InterviewQ(
        "Python",
        "Implement idempotent data loader",
        "Upsert logic, Dedup, Transaction, Checkpoint, Retry safe",
        "Hard",
    ),
]

# Dump the question bank for review practice.
print("=== Interview Questions ===")
for item in questions:
    print(f" [{item.difficulty}] [{item.category}] {item.question}")
    print(f" Key Points: {item.key_points}")
# SQL Interview Example
# -- Running total and rank
# SELECT
# order_date,
# customer_id,
# total,
# SUM(total) OVER (PARTITION BY customer_id ORDER BY order_date) as running_total,
# ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY total DESC) as rank_by_total,
# LAG(total) OVER (PARTITION BY customer_id ORDER BY order_date) as prev_order_total
# FROM orders
# WHERE order_date >= '2024-01-01'
# ORDER BY customer_id, order_date;
Testing and Production
# === Testing Dagster Pipelines ===
# from dagster import build_asset_context
#
# def test_stg_orders():
# raw_data = pd.DataFrame({
# "order_id": [1, 2, 3, None],
# "customer_id": [10, 20, 30, 40],
# "total": [100.0, -50.0, 200.0, 150.0],
# "order_date": ["2024-01-01", "2024-01-02", "2024-01-03", "2024-01-04"],
# })
# result = stg_orders(raw_data)
# assert len(result) == 2 # Removed null order_id and negative total
# assert result["total"].min() > 0
#
# def test_daily_order_metrics():
# stg_data = pd.DataFrame({
# "order_id": [1, 2, 3],
# "customer_id": [10, 10, 20],
# "total": [100.0, 200.0, 150.0],
# "order_date": pd.to_datetime(["2024-01-01", "2024-01-01", "2024-01-02"]),
# })
# result = daily_order_metrics(stg_data)
# assert len(result) == 2 # 2 dates
# jan1 = result[result["order_date"] == "2024-01-01"].iloc[0]
# assert jan1["total_orders"] == 2
# assert jan1["total_revenue"] == 300.0
@dataclass
class PrepItem:
    """One study topic in the interview prep plan."""

    category: str     # study area (Dagster, SQL, Python, ...)
    topic: str        # what to study
    resources: str    # where to study it
    time_needed: str  # e.g. "2 days" — the leading integer is summed below
    priority: str     # "Critical", "High", or "Medium"


# Prep plan ordered roughly by priority; time_needed totals ~2 weeks.
prep_items = [
    PrepItem("Dagster", "Core Concepts + Assets", "Dagster docs + tutorials", "2 days", "Critical"),
    PrepItem("Dagster", "Partitions + Sensors + IO Managers", "Dagster docs advanced", "1 day", "High"),
    PrepItem("SQL", "Window Functions CTEs Subqueries", "LeetCode SQL problems", "3 days", "Critical"),
    PrepItem("Python", "Data Processing pandas numpy", "Coding challenges", "2 days", "Critical"),
    PrepItem("Design", "ETL Architecture Data Modeling", "System design articles", "2 days", "High"),
    PrepItem("Cloud", "AWS S3 Redshift BigQuery", "Cloud docs hands-on", "1 day", "Medium"),
    PrepItem("Testing", "Unit Integration Data Quality", "Dagster testing docs", "1 day", "Medium"),
    PrepItem("Behavioral", "STAR Method past projects", "Practice with friend", "1 day", "High"),
]

# Print the plan, then the total number of prep days (leading int of time_needed).
print("\nInterview Prep Plan:")
for item in prep_items:
    print(f" [{item.priority}] [{item.category}] {item.topic}")
    print(f" Resources: {item.resources} | Time: {item.time_needed}")
total_days = sum(int(item.time_needed.split()[0]) for item in prep_items)
print(f"\n Total Prep Time: ~{total_days} days")
เคล็ดลับ
- Assets: เข้าใจ Asset-based Thinking เป็นหัวใจ Dagster
- Portfolio: สร้าง End-to-end Pipeline บน GitHub แสดงผลงาน
- SQL: ฝึก Window Functions ทุกวัน ออกสอบบ่อยมาก
- Design: ฝึกออกแบบ Pipeline วาด Diagram อธิบายได้
- STAR: เตรียม STAR Method สำหรับ Behavioral Questions
Dagster คืออะไร
Data Orchestrator Software-defined Assets Dependency Partition Sensor Schedule Dagit Type System Materialization History Modern Pipeline
Asset-based ต่างจาก Task-based อย่างไร
Task-based Airflow DAG ไม่รู้ผลลัพธ์ Asset-based Dagster รู้ข้อมูลที่สร้าง Lineage Materialization Debug Refactor ง่าย
คำถามสัมภาษณ์ Dagster มีอะไรบ้าง
Software-defined Assets เปรียบเทียบ Airflow Pipeline Design Partition Error Handling Testing Resource Configuration Sensor Schedule
เตรียมตัวสัมภาษณ์ Data Engineer อย่างไร
Dagster Core Concepts Portfolio GitHub SQL Window Functions Python Cloud Docker CI/CD System Design Data Modeling STAR Behavioral
สรุป
Dagster Pipeline Interview Preparation Software-defined Assets Orchestration Testing SQL Window Functions Data Modeling Portfolio Production Career
