CrewAI Multi-Agent ETL
CrewAI Python Multi-Agent AI System Agents Role Goal Tools LLM ETL Extract Transform Load Data Pipeline Orchestration Airflow dbt Spark
| Agent | Role | Tools | Output |
|---|---|---|---|
| Extractor | ดึงข้อมูลจากแหล่ง | API, DB, File Reader | Raw Data |
| Validator | ตรวจสอบคุณภาพ | Schema Validator, Rules | Validated Data |
| Transformer | แปลงรูปแบบข้อมูล | Pandas, SQL, dbt | Clean Data |
| Loader | โหลดเข้า Warehouse | BigQuery, Snowflake | Loaded Records |
| Monitor | ตรวจสอบ Pipeline | Metrics, Alerts | Status Report |
CrewAI Agent Design
# === CrewAI ETL Pipeline ===
# pip install crewai crewai-tools
# from crewai import Agent, Task, Crew, Process
# from crewai_tools import (
# FileReadTool, CSVSearchTool, JSONSearchTool,
# DatabaseTool, WebScraperTool
# )
#
# # Define Agents
# extractor = Agent(
# role="Data Extraction Specialist",
# goal="Extract data from multiple sources accurately",
# backstory="""You are an expert data engineer who specializes in
# extracting data from APIs, databases, and files. You handle
# pagination, rate limits, and error recovery.""",
# tools=[WebScraperTool(), FileReadTool(), DatabaseTool()],
# verbose=True,
# allow_delegation=False,
# )
#
# validator = Agent(
# role="Data Quality Analyst",
# goal="Validate data quality and flag anomalies",
# backstory="""You are a meticulous data quality analyst who checks
# schema compliance, null values, duplicates, and business rules.
# You never let bad data pass through.""",
# tools=[CSVSearchTool()],
# verbose=True,
# )
#
# transformer = Agent(
# role="Data Transformation Engineer",
# goal="Transform raw data into clean analytical format",
# backstory="""You are a skilled data engineer who transforms messy
# data into clean, normalized, enriched datasets ready for analysis.
# You handle type casting, joins, and aggregations.""",
# tools=[CSVSearchTool(), JSONSearchTool()],
# verbose=True,
# )
#
# monitor = Agent(
# role="Pipeline Monitor",
# goal="Monitor pipeline health and report status",
# backstory="""You monitor data pipeline execution, track metrics,
# detect failures, and send alerts. You ensure SLA compliance.""",
# verbose=True,
# )
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from enum import Enum
class AgentStatus(Enum):
    """Lifecycle states an ETL agent passes through during a pipeline run."""
    IDLE = "idle"            # not yet started
    RUNNING = "running"      # currently executing its task
    COMPLETED = "completed"  # finished successfully
    FAILED = "failed"        # terminated with an error
@dataclass
class ETLAgent:
    """One agent in the simulated ETL crew: identity plus runtime counters."""
    name: str         # short identifier, e.g. "extractor"
    role: str         # human-readable responsibility
    tools: List[str]  # tool labels available to this agent
    # Mutable runtime state, updated by ETLPipeline.run():
    status: AgentStatus = AgentStatus.IDLE
    records_processed: int = 0
    errors: int = 0
@dataclass
class ETLPipeline:
    """A named ETL pipeline and the ordered crew of agents that executes it."""
    name: str                                             # pipeline identifier
    agents: List[ETLAgent] = field(default_factory=list)  # executed in list order
    schedule: str = ""                                    # cron expression
    source: str = ""                                      # upstream systems
    destination: str = ""                                 # target warehouse
    def run(self):
        """Simulate one sequential run: every agent goes IDLE -> RUNNING -> COMPLETED."""
        for member in self.agents:
            member.status = AgentStatus.RUNNING
            print(f" Running: {member.name} ({member.role})")
            # Simulated workload: mark done and credit a fixed batch of records.
            member.status = AgentStatus.COMPLETED
            member.records_processed = 1000
# Assemble the demo pipeline: five single-responsibility agents, run in order.
_agent_specs = [
    ("extractor", "Data Extraction", ["API Client", "DB Connector", "File Reader"]),
    ("validator", "Data Quality", ["Schema Validator", "Rule Engine"]),
    ("transformer", "Data Transform", ["Pandas", "SQL", "dbt"]),
    ("loader", "Data Loading", ["BigQuery Client", "Bulk Insert"]),
    ("monitor", "Pipeline Monitor", ["Prometheus", "Slack Alert"]),
]
pipeline = ETLPipeline(
    name="daily-sales-etl",
    schedule="0 2 * * *",
    source="PostgreSQL + REST API",
    destination="BigQuery",
    agents=[ETLAgent(agent_name, agent_role, agent_tools)
            for agent_name, agent_role, agent_tools in _agent_specs],
)
print("=== ETL Pipeline ===")
print(f" Name: {pipeline.name}")
print(f" Schedule: {pipeline.schedule}")
print(f" Source: {pipeline.source} -> Dest: {pipeline.destination}")
print("\n Agents:")
for member in pipeline.agents:
    print(f" [{member.name}] {member.role} — Tools: {', '.join(member.tools)}")
Task Orchestration
# === Task Orchestration ===
# from crewai import Task
#
# extract_task = Task(
# description="""Extract sales data from the following sources:
# 1. PostgreSQL database (orders, products, customers tables)
# 2. REST API (inventory endpoint, paginated)
# 3. CSV files from SFTP server
# Combine all data into a unified raw dataset.""",
# expected_output="Raw dataset with all records from all sources",
# agent=extractor,
# )
#
# validate_task = Task(
# description="""Validate the extracted data:
# 1. Check schema compliance (column names, types)
# 2. Check for null values in required fields
# 3. Check for duplicates on primary keys
# 4. Validate business rules (price > 0, quantity >= 0)
# 5. Flag anomalies (values outside 3 std dev)""",
# expected_output="Validation report with pass/fail and anomalies",
# agent=validator,
# context=[extract_task],
# )
#
# transform_task = Task(
# description="""Transform validated data:
# 1. Normalize customer names (trim, title case)
# 2. Convert currencies to THB
# 3. Calculate derived fields (total, profit margin)
# 4. Join orders with products and customers
# 5. Aggregate daily summaries""",
# expected_output="Clean transformed dataset ready for loading",
# agent=transformer,
# context=[validate_task],
# )
#
# # Create Crew
# crew = Crew(
# agents=[extractor, validator, transformer, monitor],
# tasks=[extract_task, validate_task, transform_task],
# process=Process.sequential,
# verbose=True,
# )
#
# result = crew.kickoff()
@dataclass
class TaskResult:
    """Outcome metrics recorded for one completed pipeline task."""
    task: str            # stage name, e.g. "Extract"
    agent: str           # agent that executed the stage
    status: str          # free-form status string, e.g. "completed"
    records_in: int      # records received from the previous stage
    records_out: int     # records emitted to the next stage
    duration_sec: float  # wall-clock duration of the stage
    errors: int          # number of record-level errors observed
# Simulated execution results for one pipeline run (Monitor moves no records).
results = [
    TaskResult("Extract", "extractor", "completed", 0, 50000, 120.5, 3),
    TaskResult("Validate", "validator", "completed", 50000, 49850, 45.2, 150),
    TaskResult("Transform", "transformer", "completed", 49850, 49850, 89.7, 0),
    TaskResult("Load", "loader", "completed", 49850, 49850, 30.1, 0),
    TaskResult("Monitor", "monitor", "completed", 0, 0, 5.0, 0),
]
print("\n=== Pipeline Execution Results ===")
for res in results:
    print(f" [{res.status.upper()}] {res.task} by {res.agent}")
    print(f" Records: {res.records_in} -> {res.records_out} | "
          f"Time: {res.duration_sec}s | Errors: {res.errors}")
total_time = sum(res.duration_sec for res in results)
print(f"\n Total Time: {total_time:.1f}s ({total_time/60:.1f} min)")
# Load is the last stage that moves records; Monitor (results[-1]) moves none.
print(f" Final Records: {results[-2].records_out:,}")
# Side-by-side comparison of the two pipeline philosophies.
comparison = {
    "ETL (Extract Transform Load)": {
        "flow": "Source -> Transform -> Warehouse",
        "transform": "ก่อนโหลด (ETL Server)",
        "use_case": "Legacy Systems, On-premise",
        "tools": "Informatica, Talend, SSIS",
    },
    "ELT (Extract Load Transform)": {
        "flow": "Source -> Warehouse -> Transform",
        "transform": "หลังโหลด (ใน Warehouse)",
        "use_case": "Cloud Warehouse, Big Data",
        "tools": "dbt, Fivetran, Airbyte + BigQuery/Snowflake",
    },
}
print("\n\nETL vs ELT:")
for label, details in comparison.items():
    print(f"\n [{label}]")
    for attr, val in details.items():
        print(f" {attr}: {val}")
Production Deployment
# === Production ETL Architecture ===
# Each layer maps to the tools typically used there, with a Thai description.
architecture = {
    "Orchestration": {
        "tools": "Apache Airflow, Prefect, Dagster",
        "desc": "กำหนด Schedule, Dependencies, Retry Logic",
    },
    "Data Sources": {
        "tools": "PostgreSQL, MySQL, REST API, S3, SFTP",
        "desc": "แหล่งข้อมูลต้นทาง หลายรูปแบบ",
    },
    "Processing": {
        "tools": "CrewAI Agents, Spark, Pandas, dbt",
        "desc": "Extract Validate Transform ด้วย AI Agents",
    },
    "Storage": {
        "tools": "BigQuery, Snowflake, Redshift, Delta Lake",
        "desc": "Data Warehouse สำหรับ Analytics",
    },
    "Monitoring": {
        "tools": "Prometheus, Grafana, PagerDuty, Slack",
        "desc": "ติดตาม Pipeline Health Alerts",
    },
    "Data Quality": {
        "tools": "Great Expectations, dbt tests, Soda",
        "desc": "ตรวจสอบคุณภาพข้อมูลอัตโนมัติ",
    },
}
print("Production ETL Architecture:")
for layer_name, layer_info in architecture.items():
    print(f"\n [{layer_name}]")
    print(f" Tools: {layer_info['tools']}")
    print(f" {layer_info['desc']}")
# Best Practices — battle-tested ETL guidelines (Thai annotations from source).
practices = [
    "Idempotent — รันซ้ำได้ผลเหมือนเดิม",
    "Incremental — โหลดเฉพาะข้อมูลใหม่/เปลี่ยนแปลง",
    "Schema Evolution — รองรับ Schema เปลี่ยนแปลง",
    "Data Quality Checks — ตรวจทุกขั้นตอน",
    "Retry Logic — ลองใหม่อัตโนมัติเมื่อล้มเหลว",
    "Alerting — แจ้งเตือนทันทีเมื่อมีปัญหา",
    "Lineage — ติดตามที่มาของข้อมูล",
]
print("\n\nETL Best Practices:")
for rank, tip in enumerate(practices, start=1):
    print(f" {rank}. {tip}")
เคล็ดลับ
- Agent Design: แต่ละ Agent ทำหน้าที่เดียว Single Responsibility
- Idempotent: Pipeline รันซ้ำได้ผลเหมือนเดิม
- Incremental: โหลดเฉพาะข้อมูลที่เปลี่ยนแปลง ไม่ Full Load ทุกครั้ง
- Testing: ใช้ Great Expectations ตรวจ Data Quality ทุกขั้นตอน
- Monitoring: ตรวจ Row Count, Schema, Freshness ทุก Run
CrewAI คืออะไร
Python Framework Multi-Agent AI System Agents Role Goal Tools LLM Collaboration Task Orchestration งานซับซ้อนหลายทักษะ
ETL Pipeline คืออะไร
Extract Transform Load ดึงข้อมูล แปลงรูปแบบ โหลดเข้า Warehouse Airflow dbt Spark Prefect ELT โหลดก่อน Transform
Multi-Agent ดีกว่า Single Agent อย่างไร
แบ่งงาน Expertise เชี่ยวชาญด้านเดียว ลด Complexity ทำงานขนาน Scale Debug ง่าย เปลี่ยน Agent ไม่กระทบ
CrewAI กับ LangChain ต่างกันอย่างไร
LangChain LLM Application Chain RAG Tools CrewAI Multi-Agent Orchestration Role Goal Backstory Team Collaboration ใช้ร่วมกันได้
สรุป
CrewAI Multi-Agent ETL Pipeline Extract Transform Load Agent Design Task Orchestration Data Quality Airflow dbt Spark Incremental Idempotent Monitoring Great Expectations Production
