CrewAI Multi-Agent ETL

Building an ETL (Extract-Transform-Load) data pipeline with CrewAI — a Python multi-agent AI framework in which each agent is defined by a role, a goal, tools, and an LLM — orchestrated alongside data tools such as Airflow, dbt, and Spark.

| Agent       | Role                 | Tools                   | Output          |
|-------------|----------------------|-------------------------|-----------------|
| Extractor   | ดึงข้อมูลจากแหล่ง      | API, DB, File Reader    | Raw Data        |
| Validator   | ตรวจสอบคุณภาพ         | Schema Validator, Rules | Validated Data  |
| Transformer | แปลงรูปแบบข้อมูล      | Pandas, SQL, dbt        | Clean Data      |
| Loader      | โหลดเข้า Warehouse    | BigQuery, Snowflake     | Loaded Records  |
| Monitor     | ตรวจสอบ Pipeline      | Metrics, Alerts         | Status Report   |

CrewAI Agent Design

# === CrewAI ETL Pipeline ===

# pip install crewai crewai-tools

# from crewai import Agent, Task, Crew, Process
# from crewai_tools import (
# FileReadTool, CSVSearchTool, JSONSearchTool,
# DatabaseTool, WebScraperTool
# )
#
# # Define Agents
# extractor = Agent(
# role="Data Extraction Specialist",
# goal="Extract data from multiple sources accurately",
# backstory="""You are an expert data engineer who specializes in
# extracting data from APIs, databases, and files. You handle
# pagination, rate limits, and error recovery.""",
# tools=[WebScraperTool(), FileReadTool(), DatabaseTool()],
# verbose=True,
# allow_delegation=False,
# )
#
# validator = Agent(
# role="Data Quality Analyst",
# goal="Validate data quality and flag anomalies",
# backstory="""You are a meticulous data quality analyst who checks
# schema compliance, null values, duplicates, and business rules.
# You never let bad data pass through.""",
# tools=[CSVSearchTool()],
# verbose=True,
# )
#
# transformer = Agent(
# role="Data Transformation Engineer",
# goal="Transform raw data into clean analytical format",
# backstory="""You are a skilled data engineer who transforms messy
# data into clean, normalized, enriched datasets ready for analysis.
# You handle type casting, joins, and aggregations.""",
# tools=[CSVSearchTool(), JSONSearchTool()],
# verbose=True,
# )
#
# monitor = Agent(
# role="Pipeline Monitor",
# goal="Monitor pipeline health and report status",
# backstory="""You monitor data pipeline execution, track metrics,
# detect failures, and send alerts. You ensure SLA compliance.""",
# verbose=True,
# )

from dataclasses import dataclass, field
from typing import List, Dict, Optional
from enum import Enum

# Lifecycle states an ETL agent passes through during a pipeline run.
# Built with the Enum functional API; member values are the lowercase
# member names so they serialize cleanly (e.g. AgentStatus.IDLE.value == "idle").
AgentStatus = Enum(
    "AgentStatus",
    {
        "IDLE": "idle",
        "RUNNING": "running",
        "COMPLETED": "completed",
        "FAILED": "failed",
    },
)

@dataclass
class ETLAgent:
 """One worker in the ETL pipeline (e.g. extractor, validator, transformer, loader, monitor)."""
 name: str  # short identifier, e.g. "extractor"
 role: str  # human-readable role description
 tools: List[str]  # display names of the tools this agent uses
 status: AgentStatus = AgentStatus.IDLE  # lifecycle state, mutated by ETLPipeline.run()
 records_processed: int = 0  # record count from the most recent run
 errors: int = 0  # error count from the most recent run

@dataclass
class ETLPipeline:
    """A simulated ETL pipeline that runs its agents sequentially.

    Attributes:
        name: Pipeline identifier, e.g. "daily-sales-etl".
        agents: Ordered list of agents executed by run().
        schedule: Cron expression describing when the pipeline fires.
        source: Human-readable description of the data source(s).
        destination: Human-readable description of the target warehouse.
    """
    name: str
    agents: List[ETLAgent] = field(default_factory=list)
    schedule: str = ""
    source: str = ""
    destination: str = ""

    def run(self, records_per_agent: int = 1000) -> None:
        """Execute every agent in order, marking each one COMPLETED.

        This is a simulation: each agent is flipped to RUNNING, announced
        on stdout, then immediately marked COMPLETED and credited with a
        fixed record count.

        Args:
            records_per_agent: Simulated record count credited to each
                agent. Defaults to 1000, preserving the original
                hard-coded behavior.
        """
        for agent in self.agents:
            agent.status = AgentStatus.RUNNING
            print(f" Running: {agent.name} ({agent.role})")
            agent.status = AgentStatus.COMPLETED
            agent.records_processed = records_per_agent

# Assemble the demo pipeline: five single-responsibility agents wired into
# a daily (02:00) job moving sales data from PostgreSQL/REST into BigQuery.
_agent_specs = [
    ("extractor", "Data Extraction", ["API Client", "DB Connector", "File Reader"]),
    ("validator", "Data Quality", ["Schema Validator", "Rule Engine"]),
    ("transformer", "Data Transform", ["Pandas", "SQL", "dbt"]),
    ("loader", "Data Loading", ["BigQuery Client", "Bulk Insert"]),
    ("monitor", "Pipeline Monitor", ["Prometheus", "Slack Alert"]),
]

pipeline = ETLPipeline(
    name="daily-sales-etl",
    schedule="0 2 * * *",
    source="PostgreSQL + REST API",
    destination="BigQuery",
    agents=[ETLAgent(agent_name, agent_role, agent_tools)
            for agent_name, agent_role, agent_tools in _agent_specs],
)

# Print a summary of the pipeline configuration and its agent roster.
print("=== ETL Pipeline ===")
print(f" Name: {pipeline.name}")
print(f" Schedule: {pipeline.schedule}")
print(f" Source: {pipeline.source} -> Dest: {pipeline.destination}")
print(f"\n Agents:")
for member in pipeline.agents:
    print(f" [{member.name}] {member.role} — Tools: {', '.join(member.tools)}")

Task Orchestration

# === Task Orchestration ===

# from crewai import Task
#
# extract_task = Task(
# description="""Extract sales data from the following sources:
# 1. PostgreSQL database (orders, products, customers tables)
# 2. REST API (inventory endpoint, paginated)
# 3. CSV files from SFTP server
# Combine all data into a unified raw dataset.""",
# expected_output="Raw dataset with all records from all sources",
# agent=extractor,
# )
#
# validate_task = Task(
# description="""Validate the extracted data:
# 1. Check schema compliance (column names, types)
# 2. Check for null values in required fields
# 3. Check for duplicates on primary keys
# 4. Validate business rules (price > 0, quantity >= 0)
# 5. Flag anomalies (values outside 3 std dev)""",
# expected_output="Validation report with pass/fail and anomalies",
# agent=validator,
# context=[extract_task],
# )
#
# transform_task = Task(
# description="""Transform validated data:
# 1. Normalize customer names (trim, title case)
# 2. Convert currencies to THB
# 3. Calculate derived fields (total, profit margin)
# 4. Join orders with products and customers
# 5. Aggregate daily summaries""",
# expected_output="Clean transformed dataset ready for loading",
# agent=transformer,
# context=[validate_task],
# )
#
# # Create Crew
# crew = Crew(
# agents=[extractor, validator, transformer, monitor],
# tasks=[extract_task, validate_task, transform_task],
# process=Process.sequential,
# verbose=True,
# )
#
# result = crew.kickoff()

@dataclass
class TaskResult:
 """Execution metrics for a single pipeline stage."""
 task: str  # stage name, e.g. "Extract"
 agent: str  # name of the agent that ran the stage
 status: str  # outcome label, e.g. "completed"
 records_in: int  # records received from the previous stage
 records_out: int  # records emitted to the next stage
 duration_sec: float  # wall-clock runtime of the stage, in seconds
 errors: int  # errors flagged during the stage

# Simulated execution metrics for one pipeline run — one row per stage.
results = [
    TaskResult("Extract", "extractor", "completed", 0, 50000, 120.5, 3),
    TaskResult("Validate", "validator", "completed", 50000, 49850, 45.2, 150),
    TaskResult("Transform", "transformer", "completed", 49850, 49850, 89.7, 0),
    TaskResult("Load", "loader", "completed", 49850, 49850, 30.1, 0),
    TaskResult("Monitor", "monitor", "completed", 0, 0, 5.0, 0),
]

print("\n=== Pipeline Execution Results ===")
total_time = sum(outcome.duration_sec for outcome in results)
for outcome in results:
    print(f" [{outcome.status.upper()}] {outcome.task} by {outcome.agent}")
    print(f" Records: {outcome.records_in} -> {outcome.records_out} | "
          f"Time: {outcome.duration_sec}s | Errors: {outcome.errors}")

print(f"\n Total Time: {total_time:.1f}s ({total_time/60:.1f} min)")
# results[-2] is the Load stage; its records_out is what landed in the warehouse.
print(f" Final Records: {results[-2].records_out:,}")

# ETL vs ELT: the difference is where the transform step happens relative
# to the warehouse load (Thai descriptions preserved from the original).
comparison = {
    "ETL (Extract Transform Load)": {
        "flow": "Source -> Transform -> Warehouse",
        "transform": "ก่อนโหลด (ETL Server)",
        "use_case": "Legacy Systems, On-premise",
        "tools": "Informatica, Talend, SSIS",
    },
    "ELT (Extract Load Transform)": {
        "flow": "Source -> Warehouse -> Transform",
        "transform": "หลังโหลด (ใน Warehouse)",
        "use_case": "Cloud Warehouse, Big Data",
        "tools": "dbt, Fivetran, Airbyte + BigQuery/Snowflake",
    },
}

print(f"\n\nETL vs ELT:")
for label, details in comparison.items():
    print(f"\n [{label}]")
    for attr, desc in details.items():
        print(f" {attr}: {desc}")

Production Deployment

# === Production ETL Architecture ===
# Layered view of a production ETL stack: each layer lists representative
# tooling plus a short (Thai) description of its responsibility.
architecture = {
    "Orchestration": {
        "tools": "Apache Airflow, Prefect, Dagster",
        "desc": "กำหนด Schedule, Dependencies, Retry Logic",
    },
    "Data Sources": {
        "tools": "PostgreSQL, MySQL, REST API, S3, SFTP",
        "desc": "แหล่งข้อมูลต้นทาง หลายรูปแบบ",
    },
    "Processing": {
        "tools": "CrewAI Agents, Spark, Pandas, dbt",
        "desc": "Extract Validate Transform ด้วย AI Agents",
    },
    "Storage": {
        "tools": "BigQuery, Snowflake, Redshift, Delta Lake",
        "desc": "Data Warehouse สำหรับ Analytics",
    },
    "Monitoring": {
        "tools": "Prometheus, Grafana, PagerDuty, Slack",
        "desc": "ติดตาม Pipeline Health Alerts",
    },
    "Data Quality": {
        "tools": "Great Expectations, dbt tests, Soda",
        "desc": "ตรวจสอบคุณภาพข้อมูลอัตโนมัติ",
    },
}

print("Production ETL Architecture:")
for tier, spec in architecture.items():
    print(f"\n [{tier}]")
    print(f" Tools: {spec['tools']}")
    print(f" {spec['desc']}")

# Best practices every production ETL pipeline should follow
# (Thai explanations preserved from the original).
practices = [
    "Idempotent — รันซ้ำได้ผลเหมือนเดิม",
    "Incremental — โหลดเฉพาะข้อมูลใหม่/เปลี่ยนแปลง",
    "Schema Evolution — รองรับ Schema เปลี่ยนแปลง",
    "Data Quality Checks — ตรวจทุกขั้นตอน",
    "Retry Logic — ลองใหม่อัตโนมัติเมื่อล้มเหลว",
    "Alerting — แจ้งเตือนทันทีเมื่อมีปัญหา",
    "Lineage — ติดตามที่มาของข้อมูล",
]

print(f"\n\nETL Best Practices:")
for rank, tip in enumerate(practices, start=1):
    print(f" {rank}. {tip}")

เคล็ดลับ

  • Agent Design: แต่ละ Agent ทำหน้าที่เดียว Single Responsibility
  • Idempotent: Pipeline รันซ้ำได้ผลเหมือนเดิม
  • Incremental: โหลดเฉพาะข้อมูลที่เปลี่ยนแปลง ไม่ Full Load ทุกครั้ง
  • Testing: ใช้ Great Expectations ตรวจ Data Quality ทุกขั้นตอน
  • Monitoring: ตรวจ Row Count, Schema, Freshness ทุก Run

CrewAI คืออะไร

CrewAI is a Python framework for building multi-agent AI systems: each agent is defined by a role, a goal, tools, and an LLM, and agents collaborate through task orchestration to handle complex work that requires multiple skills (งานซับซ้อนหลายทักษะ).