
CrewAI Multi-Agent Data Pipeline ETL

2026-01-02 · อ. บอม — SiamCafe.net · 8,607 words

CrewAI Multi-Agent ETL

Build an ETL data pipeline with CrewAI, a Python multi-agent AI framework: each agent gets a role, goal, tools, and an LLM, and together they handle Extract, Transform, Load, and orchestration alongside tools like Airflow, dbt, and Spark.

Agent       | Role                    | Tools                   | Output
------------|-------------------------|-------------------------|---------------
Extractor   | Pull data from sources  | API, DB, File Reader    | Raw Data
Validator   | Check data quality      | Schema Validator, Rules | Validated Data
Transformer | Transform data formats  | Pandas, SQL, dbt        | Clean Data
Loader      | Load into the warehouse | BigQuery, Snowflake     | Loaded Records
Monitor     | Watch the pipeline      | Metrics, Alerts         | Status Report

CrewAI Agent Design

# === CrewAI ETL Pipeline ===

# pip install crewai crewai-tools

# from crewai import Agent, Task, Crew, Process
# from crewai_tools import (
#     FileReadTool, CSVSearchTool, JSONSearchTool,
#     PGSearchTool, ScrapeWebsiteTool,
# )
#
# # Define Agents
# extractor = Agent(
#     role="Data Extraction Specialist",
#     goal="Extract data from multiple sources accurately",
#     backstory="""You are an expert data engineer who specializes in
#     extracting data from APIs, databases, and files. You handle
#     pagination, rate limits, and error recovery.""",
#     tools=[ScrapeWebsiteTool(), FileReadTool(), PGSearchTool()],
#     verbose=True,
#     allow_delegation=False,
# )
#
# validator = Agent(
#     role="Data Quality Analyst",
#     goal="Validate data quality and flag anomalies",
#     backstory="""You are a meticulous data quality analyst who checks
#     schema compliance, null values, duplicates, and business rules.
#     You never let bad data pass through.""",
#     tools=[CSVSearchTool()],
#     verbose=True,
# )
#
# transformer = Agent(
#     role="Data Transformation Engineer",
#     goal="Transform raw data into clean analytical format",
#     backstory="""You are a skilled data engineer who transforms messy
#     data into clean, normalized, enriched datasets ready for analysis.
#     You handle type casting, joins, and aggregations.""",
#     tools=[CSVSearchTool(), JSONSearchTool()],
#     verbose=True,
# )
#
# monitor = Agent(
#     role="Pipeline Monitor",
#     goal="Monitor pipeline health and report status",
#     backstory="""You monitor data pipeline execution, track metrics,
#     detect failures, and send alerts. You ensure SLA compliance.""",
#     verbose=True,
# )

from dataclasses import dataclass, field
from typing import List, Dict, Optional
from enum import Enum

class AgentStatus(Enum):
    IDLE = "idle"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"

@dataclass
class ETLAgent:
    name: str
    role: str
    tools: List[str]
    status: AgentStatus = AgentStatus.IDLE
    records_processed: int = 0
    errors: int = 0

@dataclass
class ETLPipeline:
    name: str
    agents: List[ETLAgent] = field(default_factory=list)
    schedule: str = ""
    source: str = ""
    destination: str = ""

    def run(self):
        # Simulated run: mark each agent completed with a stub record count
        for agent in self.agents:
            agent.status = AgentStatus.RUNNING
            print(f"  Running: {agent.name} ({agent.role})")
            agent.status = AgentStatus.COMPLETED
            agent.records_processed = 1000  # stub value for the demo

pipeline = ETLPipeline(
    name="daily-sales-etl",
    schedule="0 2 * * *",  # cron: daily at 02:00
    source="PostgreSQL + REST API",
    destination="BigQuery",
    agents=[
        ETLAgent("extractor", "Data Extraction", ["API Client", "DB Connector", "File Reader"]),
        ETLAgent("validator", "Data Quality", ["Schema Validator", "Rule Engine"]),
        ETLAgent("transformer", "Data Transform", ["Pandas", "SQL", "dbt"]),
        ETLAgent("loader", "Data Loading", ["BigQuery Client", "Bulk Insert"]),
        ETLAgent("monitor", "Pipeline Monitor", ["Prometheus", "Slack Alert"]),
    ],
)

print("=== ETL Pipeline ===")
print(f"  Name: {pipeline.name}")
print(f"  Schedule: {pipeline.schedule}")
print(f"  Source: {pipeline.source} -> Dest: {pipeline.destination}")
print(f"\n  Agents:")
for agent in pipeline.agents:
    print(f"    [{agent.name}] {agent.role} — Tools: {', '.join(agent.tools)}")

Task Orchestration

# === Task Orchestration ===

# from crewai import Task
#
# extract_task = Task(
#     description="""Extract sales data from the following sources:
#     1. PostgreSQL database (orders, products, customers tables)
#     2. REST API (inventory endpoint, paginated)
#     3. CSV files from SFTP server
#     Combine all data into a unified raw dataset.""",
#     expected_output="Raw dataset with all records from all sources",
#     agent=extractor,
# )
#
# validate_task = Task(
#     description="""Validate the extracted data:
#     1. Check schema compliance (column names, types)
#     2. Check for null values in required fields
#     3. Check for duplicates on primary keys
#     4. Validate business rules (price > 0, quantity >= 0)
#     5. Flag anomalies (values outside 3 std dev)""",
#     expected_output="Validation report with pass/fail and anomalies",
#     agent=validator,
#     context=[extract_task],
# )
#
# transform_task = Task(
#     description="""Transform validated data:
#     1. Normalize customer names (trim, title case)
#     2. Convert currencies to THB
#     3. Calculate derived fields (total, profit margin)
#     4. Join orders with products and customers
#     5. Aggregate daily summaries""",
#     expected_output="Clean transformed dataset ready for loading",
#     agent=transformer,
#     context=[validate_task],
# )
#
# # Create Crew
# crew = Crew(
#     agents=[extractor, validator, transformer, monitor],
#     tasks=[extract_task, validate_task, transform_task],
#     process=Process.sequential,
#     verbose=True,
# )
#
# result = crew.kickoff()
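The checks described in validate_task and transform_task can be sketched in plain Python. This is a minimal illustration on hypothetical order records, not CrewAI output: the field names, the 3-sigma anomaly flag, and the business rules mirror the task descriptions above.

```python
from statistics import mean, stdev

REQUIRED = {"order_id", "customer", "price", "quantity"}

def validate(records):
    """Return (valid, rejected) after schema, null, duplicate, and rule checks."""
    valid, rejected, seen = [], [], set()
    for r in records:
        if not REQUIRED <= r.keys():                  # schema compliance
            rejected.append((r, "missing fields")); continue
        if any(r[k] is None for k in REQUIRED):       # nulls in required fields
            rejected.append((r, "null value")); continue
        if r["order_id"] in seen:                     # duplicate primary key
            rejected.append((r, "duplicate")); continue
        if r["price"] <= 0 or r["quantity"] < 0:      # business rules
            rejected.append((r, "rule violation")); continue
        seen.add(r["order_id"])
        valid.append(r)
    # Flag anomalies: price more than 3 standard deviations from the mean
    prices = [r["price"] for r in valid]
    if len(prices) > 1:
        mu, sigma = mean(prices), stdev(prices)
        for r in valid:
            r["anomaly"] = sigma > 0 and abs(r["price"] - mu) > 3 * sigma
    return valid, rejected

def transform(records):
    """Normalize names and add the derived total, as in transform_task."""
    for r in records:
        r["customer"] = r["customer"].strip().title()
        r["total"] = r["price"] * r["quantity"]
    return records

orders = [
    {"order_id": 1, "customer": "  somchai ", "price": 100.0, "quantity": 2},
    {"order_id": 1, "customer": "somchai", "price": 100.0, "quantity": 2},  # duplicate
    {"order_id": 2, "customer": "malee", "price": -5.0, "quantity": 1},     # bad rule
    {"order_id": 3, "customer": "anan", "price": 250.0, "quantity": 1},
]
valid, rejected = validate(orders)
clean = transform(valid)
print(f"valid={len(clean)} rejected={len(rejected)}")
# → valid=2 rejected=2
```

In a real crew, the validator agent would produce this kind of pass/fail report as its expected_output, and the transformer would consume only the valid rows via task context.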

@dataclass
class TaskResult:
    task: str
    agent: str
    status: str
    records_in: int
    records_out: int
    duration_sec: float
    errors: int

results = [
    TaskResult("Extract", "extractor", "completed", 0, 50000, 120.5, 3),
    TaskResult("Validate", "validator", "completed", 50000, 49850, 45.2, 150),
    TaskResult("Transform", "transformer", "completed", 49850, 49850, 89.7, 0),
    TaskResult("Load", "loader", "completed", 49850, 49850, 30.1, 0),
    TaskResult("Monitor", "monitor", "completed", 0, 0, 5.0, 0),
]

print("\n=== Pipeline Execution Results ===")
total_time = 0
for r in results:
    total_time += r.duration_sec
    print(f"  [{r.status.upper()}] {r.task} by {r.agent}")
    print(f"    Records: {r.records_in} -> {r.records_out} | "
          f"Time: {r.duration_sec}s | Errors: {r.errors}")

print(f"\n  Total Time: {total_time:.1f}s ({total_time/60:.1f} min)")
print(f"  Final Records: {results[-2].records_out:,}")
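From the same task results, the Monitor agent could derive simple health metrics: throughput, drop rate, and an SLA check. A sketch under assumed values (the 300-second SLA threshold is an illustrative choice, not from the source):

```python
from dataclasses import dataclass

@dataclass
class TaskResult:
    task: str
    records_in: int
    records_out: int
    duration_sec: float
    errors: int

results = [
    TaskResult("Extract", 0, 50000, 120.5, 3),
    TaskResult("Validate", 50000, 49850, 45.2, 150),
    TaskResult("Transform", 49850, 49850, 89.7, 0),
    TaskResult("Load", 49850, 49850, 30.1, 0),
]

total_time = sum(r.duration_sec for r in results)
loaded = results[-1].records_out
throughput = loaded / total_time                 # records per second
dropped = results[0].records_out - loaded        # rows filtered out in validation
drop_rate = dropped / results[0].records_out
sla_ok = total_time <= 300                       # assumed SLA: 5 minutes end-to-end

print(f"throughput={throughput:.0f} rec/s drop_rate={drop_rate:.2%} sla_ok={sla_ok}")
```

In production these numbers would be pushed to Prometheus and alerted on via Slack or PagerDuty rather than printed.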

# ETL vs ELT
comparison = {
    "ETL (Extract Transform Load)": {
        "flow": "Source -> Transform -> Warehouse",
        "transform": "before load (on the ETL server)",
        "use_case": "Legacy Systems, On-premise",
        "tools": "Informatica, Talend, SSIS",
    },
    "ELT (Extract Load Transform)": {
        "flow": "Source -> Warehouse -> Transform",
        "transform": "after load (inside the warehouse)",
        "use_case": "Cloud Warehouse, Big Data",
        "tools": "dbt, Fivetran, Airbyte + BigQuery/Snowflake",
    },
}

print(f"\n\nETL vs ELT:")
for approach, info in comparison.items():
    print(f"\n  [{approach}]")
    for k, v in info.items():
        print(f"    {k}: {v}")
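The ELT flow can be demonstrated with sqlite3 standing in for the warehouse: load raw rows first, then transform with SQL inside the "warehouse", which is the pattern dbt applies on BigQuery or Snowflake. Table and column names here are illustrative.

```python
import sqlite3

conn = sqlite3.connect(":memory:")  # stand-in for a cloud warehouse

# E + L: load raw data as-is, no transformation yet
conn.execute("CREATE TABLE raw_orders (order_id INT, price REAL, quantity INT)")
conn.executemany(
    "INSERT INTO raw_orders VALUES (?, ?, ?)",
    [(1, 100.0, 2), (2, 250.0, 1), (3, 80.0, 5)],
)

# T: transform after load, in-warehouse, with SQL
conn.execute("""
    CREATE TABLE clean_orders AS
    SELECT order_id, price * quantity AS total
    FROM raw_orders
    WHERE price > 0 AND quantity >= 0
""")
rows = conn.execute(
    "SELECT order_id, total FROM clean_orders ORDER BY order_id"
).fetchall()
print(rows)  # → [(1, 200.0), (2, 250.0), (3, 400.0)]
```

The appeal of ELT is exactly this: raw data stays queryable in the warehouse, and transformations become versioned SQL models instead of code running on a separate ETL server.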

Production Deployment

# === Production ETL Architecture ===

architecture = {
    "Orchestration": {
        "tools": "Apache Airflow, Prefect, Dagster",
        "desc": "Define schedules, dependencies, and retry logic",
    },
    "Data Sources": {
        "tools": "PostgreSQL, MySQL, REST API, S3, SFTP",
        "desc": "Upstream data sources in many formats",
    },
    "Processing": {
        "tools": "CrewAI Agents, Spark, Pandas, dbt",
        "desc": "Extract, validate, and transform with AI agents",
    },
    "Storage": {
        "tools": "BigQuery, Snowflake, Redshift, Delta Lake",
        "desc": "Data warehouse for analytics",
    },
    "Monitoring": {
        "tools": "Prometheus, Grafana, PagerDuty, Slack",
        "desc": "Track pipeline health and send alerts",
    },
    "Data Quality": {
        "tools": "Great Expectations, dbt tests, Soda",
        "desc": "Automated data quality checks",
    },
}

print("Production ETL Architecture:")
for layer, info in architecture.items():
    print(f"\n  [{layer}]")
    print(f"    Tools: {info['tools']}")
    print(f"    {info['desc']}")

# Best Practices
practices = [
    "Idempotent — reruns produce the same result",
    "Incremental — load only new or changed data",
    "Schema Evolution — tolerate upstream schema changes",
    "Data Quality Checks — validate at every step",
    "Retry Logic — retry automatically on failure",
    "Alerting — notify immediately when something breaks",
    "Lineage — track where every record came from",
]

print(f"\n\nETL Best Practices:")
for i, p in enumerate(practices, 1):
    print(f"  {i}. {p}")
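Two of the practices above, incremental loading via a watermark and retry with exponential backoff, can be sketched together. extract_since() is a hypothetical stand-in for a real source query against the database or API.

```python
import time

def extract_since(watermark, source):
    """Hypothetical source query: rows with updated_at after the watermark."""
    return [r for r in source if r["updated_at"] > watermark]

def with_retry(fn, attempts=3, base_delay=0.01):
    """Retry fn with exponential backoff; re-raise after the last attempt."""
    for i in range(attempts):
        try:
            return fn()
        except Exception:
            if i == attempts - 1:
                raise
            time.sleep(base_delay * 2 ** i)

source = [
    {"id": 1, "updated_at": "2026-01-01"},
    {"id": 2, "updated_at": "2026-01-02"},
]

watermark = "2026-01-01"  # persisted after the last successful load
new_rows = with_retry(lambda: extract_since(watermark, source))
print(new_rows)  # only rows newer than the watermark
if new_rows:
    watermark = max(r["updated_at"] for r in new_rows)  # advance on success
print(watermark)
```

Idempotence comes from the same discipline: rerunning with an unchanged watermark re-selects the same rows, and loading them as keyed upserts (MERGE on id) leaves the warehouse in the same state.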

Tips

What is CrewAI?

CrewAI is a Python framework for building multi-agent AI systems. Each agent is defined by a role, a goal, tools, and an LLM, and agents collaborate through task orchestration to handle complex work that needs several skills.

What is an ETL pipeline?

Extract, Transform, Load: pull data from sources, reshape it, and load it into a warehouse, typically orchestrated with tools such as Airflow, dbt, Spark, or Prefect. The ELT variant loads raw data first and transforms it inside the warehouse.

How is multi-agent better than a single agent?

Work is divided by expertise: each agent specializes in one area, which reduces complexity, allows parallel execution, and scales better. It is also easier to debug, and swapping out one agent does not affect the others.

How do CrewAI and LangChain differ?

LangChain focuses on building LLM applications (chains, RAG, tool integrations); CrewAI focuses on multi-agent orchestration, with roles, goals, backstories, and team collaboration. The two can be used together.

Summary

CrewAI turns an ETL pipeline into a team of specialized agents: extract, validate, transform, load, and monitor. Combine that agent design and task orchestration with data quality tooling such as Great Expectations and dbt tests, a scheduler such as Airflow, and incremental, idempotent loads with monitoring to run the pipeline in production.
