SiamCafe.net Blog
Technology

LangChain Agent Data Pipeline ETL: Building an Intelligent Data Pipeline with AI

2026-03-27 · อ. บอม (SiamCafe.net) · 1,278 words

LangChain Agent for Data Pipelines: An Overview

LangChain is a framework for building applications on top of Large Language Models (LLMs). A LangChain Agent goes a step further: instead of following a fixed script, it lets the LLM decide which tools to call, and in what order, to accomplish a task. This article applies that idea to Data Pipeline ETL, building intelligent data pipelines that can plan their own steps, adapt to changing data, and recover from failures.

ETL (Extract, Transform, Load) is the core workflow of data engineering. Extract pulls data out of source systems (databases, APIs, files), Transform reshapes it into the desired form (clean, aggregate, join), and Load writes the result into a data warehouse or data lake.
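
The three stages can be sketched in a few lines of plain Python. All names here (`extract_rows` and friends) are illustrative, not from any library; a real pipeline would read from a database or API and write to a warehouse.

```python
def extract_rows():
    """Extract: pull raw records from a source system (stubbed here)."""
    return [
        {"id": 1, "amount": "100.50", "region": "TH"},
        {"id": 2, "amount": "75.25", "region": "TH"},
        {"id": 2, "amount": "75.25", "region": "TH"},  # duplicate record
    ]

def transform_rows(rows):
    """Transform: deduplicate by id and cast amount from string to float."""
    seen, out = set(), []
    for row in rows:
        if row["id"] not in seen:
            seen.add(row["id"])
            out.append({**row, "amount": float(row["amount"])})
    return out

def load_rows(rows, target):
    """Load: append cleaned rows to a target (a list standing in for a table)."""
    target.extend(rows)
    return len(rows)

warehouse = []
loaded = load_rows(transform_rows(extract_rows()), warehouse)
print(f"Loaded {loaded} rows")  # the duplicate is dropped before load
```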

What a LangChain Agent adds on top of classic ETL:
- AI-chosen transformation logic, based on the characteristics of the data
- A natural language interface: describe the pipeline in plain words
- Automated error recovery: the agent diagnoses and fixes failures itself
- Dynamic schema mapping: mappings adjust when source schemas change
- Intelligent data quality checks that catch anomalies fixed rules miss

Installing LangChain for ETL

Setting up the LangChain ETL environment:

# === LangChain ETL Setup ===

# 1. Install Dependencies
pip install langchain langchain-openai langchain-community
pip install pandas sqlalchemy psycopg2-binary
pip install apache-airflow dbt-core
pip install great-expectations

# 2. Environment Configuration
cat > .env << 'EOF'
OPENAI_API_KEY=sk-your-api-key-here
DATABASE_URL=postgresql://user:pass@localhost:5432/warehouse
REDIS_URL=redis://localhost:6379
EOF

# 3. LangChain ETL Agent Setup
cat > etl_agent_config.yaml << 'EOF'
agent:
  model: "gpt-4o-mini"
  temperature: 0
  max_iterations: 10
  
tools:
  - name: "sql_query"
    description: "Execute SQL queries on source/target databases"
  - name: "data_profiler"
    description: "Profile data quality and statistics"
  - name: "schema_mapper"
    description: "Map source schema to target schema"
  - name: "transform_data"
    description: "Apply transformations (clean, aggregate, join)"
  - name: "load_data"
    description: "Load transformed data to target"
  - name: "validate_data"
    description: "Validate data quality with expectations"

sources:
  postgres:
    type: "postgresql"
    host: "source-db.example.com"
    port: 5432
    database: "production"
  api:
    type: "rest_api"
    base_url: "https://api.example.com/v1"
  s3:
    type: "s3"
    bucket: "data-lake-raw"
    
targets:
  warehouse:
    type: "postgresql"
    host: "warehouse.example.com"
    database: "analytics"
  bigquery:
    type: "bigquery"
    project: "analytics-prod"
    dataset: "transformed"
EOF

# 4. Verify Setup
python3 -c "
from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor
print('LangChain: OK')
import pandas as pd
print('Pandas: OK')
from sqlalchemy import create_engine
print('SQLAlchemy: OK')
print('ETL Agent environment ready')
"

echo "Setup complete"

Building an AI Agent That Manages the Data Pipeline

A LangChain Agent that plans and executes an ETL pipeline:

#!/usr/bin/env python3
# etl_agent.py - LangChain ETL Agent
import json
import logging
from typing import Dict, List, Any

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("etl_agent")

class ETLAgent:
    """AI-powered ETL Agent using LangChain patterns"""
    
    def __init__(self):
        self.tools = {}
        self.pipeline_state = {}
    
    def register_tool(self, name, func, description):
        self.tools[name] = {"func": func, "description": description}
    
    def plan_pipeline(self, user_request):
        """Agent plans ETL steps based on natural language request"""
        # In production: LLM generates this plan
        plan_templates = {
            "sync_users": [
                {"tool": "sql_query", "args": {"query": "SELECT * FROM users WHERE updated_at > :last_sync"}},
                {"tool": "transform_data", "args": {"operations": ["deduplicate", "normalize_email", "hash_pii"]}},
                {"tool": "validate_data", "args": {"checks": ["not_null:id", "unique:email", "format:phone"]}},
                {"tool": "load_data", "args": {"target": "warehouse.dim_users", "mode": "upsert"}},
            ],
            "aggregate_sales": [
                {"tool": "sql_query", "args": {"query": "SELECT * FROM orders WHERE date >= :start_date"}},
                {"tool": "transform_data", "args": {"operations": ["calculate_totals", "group_by_product", "add_metrics"]}},
                {"tool": "validate_data", "args": {"checks": ["positive:amount", "not_null:product_id"]}},
                {"tool": "load_data", "args": {"target": "warehouse.fact_sales", "mode": "append"}},
            ],
        }
        
        # Simple keyword matching (LLM does this in production)
        for key, plan in plan_templates.items():
            if key.replace("_", " ") in user_request.lower():
                return {"request": user_request, "steps": plan, "estimated_time": "2-5 minutes"}
        
        return {"request": user_request, "steps": [], "note": "LLM would generate custom plan"}
    
    def execute_pipeline(self, plan):
        """Execute planned ETL pipeline"""
        results = []
        for i, step in enumerate(plan["steps"]):
            logger.info(f"Step {i+1}/{len(plan['steps'])}: {step['tool']}")
            
            result = {
                "step": i + 1,
                "tool": step["tool"],
                "status": "success",
                "rows_processed": 1000 * (i + 1),
                "duration_sec": 5 + i * 3,
            }
            
            # Simulate validation failures
            if step["tool"] == "validate_data":
                result["validation"] = {
                    "passed": 985,
                    "failed": 15,
                    "pass_rate": 98.5,
                }
            
            results.append(result)
        
        return {
            "pipeline": plan["request"],
            "status": "completed",
            "steps_completed": len(results),
            "total_rows": results[-1]["rows_processed"] if results else 0,
            "total_duration": sum(r["duration_sec"] for r in results),
            "results": results,
        }

agent = ETLAgent()

# Plan pipeline from natural language
plan = agent.plan_pipeline("sync users from production to warehouse")
print(f"Pipeline: {plan['request']}")
print(f"Steps: {len(plan['steps'])}")
for step in plan["steps"]:
    print(f"  {step['tool']}: {json.dumps(step['args'], default=str)[:60]}...")

# Execute
result = agent.execute_pipeline(plan)
print(f"\nStatus: {result['status']}")
print(f"Rows: {result['total_rows']:,}, Duration: {result['total_duration']}s")
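
The listing above defines `register_tool` but never exercises it. The registry-and-dispatch pattern behind it can be shown in a self-contained sketch (all names here are hypothetical, not part of LangChain's API): each plan step names a tool, and the executor looks the tool up and calls it with the step's arguments.

```python
def make_registry():
    """Build a tool registry plus a dispatcher that runs one plan step."""
    tools = {}

    def register(name, func, description=""):
        tools[name] = {"func": func, "description": description}

    def run_step(step):
        tool = tools.get(step["tool"])
        if tool is None:
            # Unknown tool: refuse rather than guess (see the FAQ on hallucination)
            return {"tool": step["tool"], "status": "error", "reason": "unknown tool"}
        return {"tool": step["tool"], "status": "success",
                "output": tool["func"](**step.get("args", {}))}

    return register, run_step

register, run_step = make_registry()
register("row_count", lambda rows: len(rows), "Count rows in a batch")

result = run_step({"tool": "row_count", "args": {"rows": [1, 2, 3]}})
print(result)  # status: success, output: 3
```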

ETL Automation with LangChain Tools

Custom tools for the core ETL operations:

#!/usr/bin/env python3
# etl_tools.py - LangChain ETL Tools
import json
import logging
from typing import Dict, List

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("tools")

class DataProfiler:
    """Profile data quality and statistics"""
    
    def profile(self, data_sample):
        """Profile a dataset"""
        return {
            "row_count": len(data_sample),
            "columns": list(data_sample[0].keys()) if data_sample else [],
            "null_counts": {col: sum(1 for row in data_sample if row.get(col) is None) for col in (data_sample[0].keys() if data_sample else [])},
            "unique_counts": {col: len(set(row.get(col) for row in data_sample)) for col in (data_sample[0].keys() if data_sample else [])},
        }

class SchemaMapper:
    """Map source schema to target schema"""
    
    def auto_map(self, source_schema, target_schema):
        """AI-assisted schema mapping"""
        mappings = []
        for s_col in source_schema:
            best_match = None
            for t_col in target_schema:
                if s_col["name"].lower() == t_col["name"].lower():
                    best_match = t_col["name"]
                    break
                if s_col["name"].lower().replace("_", "") == t_col["name"].lower().replace("_", ""):
                    best_match = t_col["name"]
            
            mappings.append({
                "source": s_col["name"],
                "target": best_match,
                "transform": "direct" if best_match else "unmapped",
            })
        return mappings

class DataTransformer:
    """Transform data with various operations"""
    
    def transform(self, data, operations):
        """Apply transformation operations"""
        result = data.copy()
        applied = []
        
        for op in operations:
            if op == "deduplicate":
                seen = set()
                deduped = []
                for row in result:
                    key = json.dumps(row, sort_keys=True)
                    if key not in seen:
                        seen.add(key)
                        deduped.append(row)
                removed = len(result) - len(deduped)
                result = deduped
                applied.append({"operation": op, "removed": removed})
            
            elif op == "normalize_email":
                for row in result:
                    if "email" in row and row["email"]:
                        row["email"] = row["email"].lower().strip()
                applied.append({"operation": op, "affected": len(result)})
            
            elif op == "hash_pii":
                import hashlib
                for row in result:
                    for field in ["ssn", "national_id", "phone"]:
                        if field in row and row[field]:
                            row[field] = hashlib.sha256(str(row[field]).encode()).hexdigest()[:16]
                applied.append({"operation": op, "fields_hashed": ["ssn", "national_id", "phone"]})
        
        return {"data": result, "operations_applied": applied, "row_count": len(result)}

# Demo
profiler = DataProfiler()
sample_data = [
    {"id": 1, "name": "John", "email": "JOHN@Example.com", "phone": "0891234567"},
    {"id": 2, "name": "Jane", "email": "jane@example.com", "phone": "0891234568"},
    {"id": 1, "name": "John", "email": "JOHN@Example.com", "phone": "0891234567"},
    {"id": 3, "name": None, "email": "bob@test.com", "phone": None},
]

profile = profiler.profile(sample_data)
print(f"Profile: {profile['row_count']} rows, {len(profile['columns'])} columns")
print(f"Nulls: {profile['null_counts']}")

transformer = DataTransformer()
result = transformer.transform(sample_data, ["deduplicate", "normalize_email", "hash_pii"])
print(f"\nTransformed: {result['row_count']} rows")
for op in result["operations_applied"]:
    print(f"  {op['operation']}: {op}")
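
`SchemaMapper` above is defined but not demoed. Its matching rule, restated standalone so it runs on its own: an exact case-insensitive name match wins; failing that, names are compared with underscores stripped.

```python
def auto_map_columns(source_cols, target_cols):
    """Same name-matching rule as SchemaMapper.auto_map, on bare column names."""
    mappings = []
    for s in source_cols:
        match = None
        for t in target_cols:
            if s.lower() == t.lower():
                match = t  # exact (case-insensitive) match wins
                break
            if s.lower().replace("_", "") == t.lower().replace("_", ""):
                match = t  # fallback: match after stripping underscores
        mappings.append({"source": s, "target": match})
    return mappings

source = ["user_id", "EmailAddress", "signup_ts", "legacy_flag"]
target = ["userid", "email_address", "signup_ts", "created_at"]

for m in auto_map_columns(source, target):
    print(f"{m['source']} -> {m['target']}")
```

`user_id` maps to `userid` via the underscore-stripped fallback, `EmailAddress` to `email_address`, `signup_ts` exactly, and `legacy_flag` stays unmapped; in the agent version, unmapped columns are where the LLM would be asked to decide.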

Orchestration and Error Handling

Setting up pipeline orchestration:

# === Pipeline Orchestration ===

# 1. Airflow DAG with LangChain Agent
cat > dags/etl_agent_dag.py << 'PYEOF'
#!/usr/bin/env python3
"""Airflow DAG with LangChain ETL Agent"""
from datetime import datetime, timedelta
import json
import logging

logger = logging.getLogger("dag")

# DAG Configuration
DAG_CONFIG = {
    "dag_id": "langchain_etl_pipeline",
    "schedule": "0 2 * * *",  # Daily at 2 AM
    "start_date": "2024-01-01",
    "retries": 3,
    "retry_delay_minutes": 5,
}

class PipelineOrchestrator:
    def __init__(self):
        self.state = {}
    
    def extract(self, source_config):
        """Extract data from source"""
        logger.info(f"Extracting from {source_config['type']}")
        return {
            "status": "success",
            "rows_extracted": 10000,
            "source": source_config["type"],
            "timestamp": datetime.utcnow().isoformat(),
        }
    
    def transform_with_agent(self, data_meta, rules):
        """Use LangChain agent for intelligent transformation"""
        logger.info(f"Transforming {data_meta['rows_extracted']} rows")
        return {
            "status": "success",
            "rows_transformed": data_meta["rows_extracted"] - 50,
            "rows_rejected": 50,
            "transformations": rules,
        }
    
    def load(self, data_meta, target_config):
        """Load data to target"""
        logger.info(f"Loading to {target_config['type']}")
        return {
            "status": "success",
            "rows_loaded": data_meta["rows_transformed"],
            "target": target_config["type"],
        }
    
    def handle_error(self, error, context):
        """AI-assisted error handling"""
        error_handlers = {
            "connection_timeout": {
                "action": "retry",
                "max_retries": 3,
                "backoff": "exponential",
            },
            "schema_mismatch": {
                "action": "agent_remap",
                "description": "LangChain agent attempts schema remapping",
            },
            "data_quality_failure": {
                "action": "quarantine",
                "description": "Move failed records to quarantine table",
            },
            "disk_full": {
                "action": "alert_and_stop",
                "description": "Alert ops team, stop pipeline",
            },
        }
        
        handler = error_handlers.get(error, {"action": "alert", "description": "Unknown error"})
        return {"error": error, "handler": handler, "context": context}

orchestrator = PipelineOrchestrator()

# Run pipeline
extract_result = orchestrator.extract({"type": "postgresql", "table": "orders"})
print(f"Extract: {extract_result['rows_extracted']} rows")

transform_result = orchestrator.transform_with_agent(
    extract_result, ["clean", "aggregate", "enrich"]
)
print(f"Transform: {transform_result['rows_transformed']} rows ({transform_result['rows_rejected']} rejected)")

load_result = orchestrator.load(transform_result, {"type": "bigquery", "table": "fact_orders"})
print(f"Load: {load_result['rows_loaded']} rows to {load_result['target']}")

# Error handling demo
error = orchestrator.handle_error("schema_mismatch", {"table": "users", "column": "phone_number"})
print(f"\nError handling: {error['handler']['action']} - {error['handler']['description']}")
PYEOF

echo "Orchestration configured"
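
The `connection_timeout` handler above only declares a policy ("retry, exponential backoff"). A minimal sketch of what executing that policy could look like, assuming the delay doubles on each attempt (base, base*2, base*4, ...):

```python
import time

def retry_with_backoff(func, max_retries=3, base_delay=1.0, sleep=time.sleep):
    """Call func(); on exception, retry up to max_retries with doubling delay."""
    attempt = 0
    while True:
        try:
            return func()
        except Exception:
            attempt += 1
            if attempt > max_retries:
                raise  # exhausted retries: escalate to the alert path
            sleep(base_delay * (2 ** (attempt - 1)))

# Demo: a flaky extract that times out twice, then succeeds.
calls = {"n": 0}
def flaky_extract():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("timeout")
    return {"rows": 10000}

delays = []  # capture sleeps instead of actually waiting
result = retry_with_backoff(flaky_extract, sleep=delays.append)
print(result)  # {'rows': 10000}
print(delays)  # [1.0, 2.0]
```

Injecting `sleep` as a parameter keeps the demo (and tests) fast; production code would use the default `time.sleep`.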

Monitoring and Observability

Tracking pipeline performance:

#!/usr/bin/env python3
# pipeline_monitor.py - ETL Pipeline Monitoring
import json
import logging
from typing import Dict, List

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("monitor")

class PipelineMonitor:
    def __init__(self):
        self.metrics = {}
    
    def dashboard(self):
        return {
            "pipeline_health": {
                "total_pipelines": 15,
                "healthy": 13,
                "warning": 1,
                "failed": 1,
                "success_rate": 86.7,
            },
            "daily_stats": {
                "rows_processed": 2500000,
                "data_volume_gb": 45.2,
                "avg_latency_min": 12,
                "agent_decisions": 45,
                "auto_fixes": 3,
            },
            "top_pipelines": [
                {"name": "orders_sync", "rows": 500000, "duration": "8 min", "status": "success"},
                {"name": "user_profiles", "rows": 150000, "duration": "3 min", "status": "success"},
                {"name": "product_catalog", "rows": 50000, "duration": "2 min", "status": "success"},
                {"name": "clickstream", "rows": 1800000, "duration": "25 min", "status": "warning"},
            ],
            "agent_activity": {
                "schema_remappings": 2,
                "auto_retries": 5,
                "quality_checks": 15,
                "transformations_optimized": 3,
            },
            "cost_tracking": {
                "llm_api_calls": 450,
                "llm_cost_usd": 2.50,
                "compute_cost_usd": 15.00,
                "storage_cost_usd": 8.00,
                "total_daily_cost": 25.50,
            },
        }

monitor = PipelineMonitor()
data = monitor.dashboard()
print(f"Pipeline Health: {data['pipeline_health']['healthy']}/{data['pipeline_health']['total_pipelines']} healthy")
print(f"Daily: {data['daily_stats']['rows_processed']:,} rows, {data['daily_stats']['data_volume_gb']} GB")
print(f"Agent: {data['daily_stats']['agent_decisions']} decisions, {data['daily_stats']['auto_fixes']} auto-fixes")
print(f"Cost: ${data['cost_tracking']['total_daily_cost']}/day (LLM: ${data['cost_tracking']['llm_cost_usd']})")

FAQ: Common Questions

Q: Is a LangChain Agent a good fit for every ETL workload?

A: No. A LangChain Agent pays off where the ETL genuinely needs intelligence: dynamic schema mapping when source schemas change often, intelligent error recovery, natural language pipeline creation so pipelines can be described in plain words, and data quality analysis that spots anomalies. It is a poor fit for simple repetitive ETL with fixed logic (plain Airflow/dbt is cheaper and simpler), high-volume streaming that needs low latency (use Kafka/Flink), and cost-sensitive environments where LLM API charges add up. Rule of thumb: use the LangChain Agent as the orchestration layer, not for per-row transformation.

Q: How much does the LLM API cost when used for ETL?

A: Less than you might expect, because the LLM is not called per row; it is used only for planning, schema mapping, and error handling. Typical usage: pipeline planning, 1-5 calls per pipeline ($0.01-0.05); schema mapping, 1-3 calls per schema change ($0.01-0.03); error handling, 0-5 calls per day ($0-0.05); data quality analysis, 1-2 calls per pipeline ($0.01-0.02). That works out to roughly $0.50-5.00/day for 10-20 pipelines. Use gpt-4o-mini ($0.15/1M input tokens); gpt-4 costs roughly 10x more. Caching LLM responses for similar requests can cut costs by about 50%.
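
The daily figure can be sanity-checked with back-of-envelope arithmetic from the per-call numbers above (midpoints, purely illustrative):

```python
# Rough daily LLM cost for ~15 pipelines, using midpoint call counts and
# per-call costs from the FAQ answer. Illustrative only.
pipelines_per_day = 15

cost_per_pipeline = (
    3 * 0.03       # planning: ~1-5 calls at ~$0.01-0.05 -> 3 calls x $0.03
    + 1.5 * 0.015  # quality analysis: ~1-2 calls at ~$0.01-0.02
)
occasional = (
    2 * 0.02   # schema mapping: only when a schema actually changes
    + 2 * 0.02 # error handling: a few calls on a bad day
)

daily = pipelines_per_day * cost_per_pipeline + occasional
print(f"~${daily:.2f}/day")  # ~$1.77/day, inside the quoted $0.50-5.00 range
```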

Q: How do LangChain and LlamaIndex differ for data pipelines?

A: LangChain is an agent framework: it is strong at orchestration, tool usage, and complex workflows, offers multiple agent types (ReAct, Plan-and-Execute), and has a large ecosystem, which makes it a good fit for ETL pipeline orchestration. LlamaIndex focuses on data indexing and retrieval; it is strongest for RAG (Retrieval-Augmented Generation) and for search and Q&A over your data. For ETL, use LangChain for orchestration, and add LlamaIndex as a data catalog/search layer if you need one.

Q: How do you prevent LLM hallucination in an ETL context?

A: Hallucination in ETL is dangerous because it can silently corrupt data, so defend in depth: set temperature=0 to remove randomness; validate every LLM output before executing it (check SQL syntax and schema validity); use structured output (JSON mode) instead of free text; limit the tools the agent can call (never arbitrary code execution); run data quality checks at every step (Great Expectations); keep a human in the loop for critical changes (schema migrations, delete operations); provide a dry-run mode that shows the plan before executing; and keep an audit log of every LLM decision for later review.
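
One of those mitigations, validating structured LLM output before executing it, can be sketched as a plan validator. The plan schema and tool whitelist here are illustrative:

```python
import json

# Whitelist of tools the agent may invoke; anything else is rejected.
ALLOWED_TOOLS = {"sql_query", "transform_data", "validate_data", "load_data"}

def validate_plan(raw):
    """Reject a plan unless it is valid JSON with known tools and dict args."""
    try:
        plan = json.loads(raw)
    except json.JSONDecodeError:
        return False, "not valid JSON"
    steps = plan.get("steps")
    if not isinstance(steps, list) or not steps:
        return False, "missing steps"
    for i, step in enumerate(steps):
        if step.get("tool") not in ALLOWED_TOOLS:
            return False, f"step {i}: unknown tool {step.get('tool')!r}"
        if not isinstance(step.get("args", {}), dict):
            return False, f"step {i}: args must be an object"
    return True, "ok"

good = '{"steps": [{"tool": "sql_query", "args": {"query": "SELECT 1"}}]}'
bad = '{"steps": [{"tool": "run_shell", "args": {"cmd": "rm -rf /"}}]}'
print(validate_plan(good))  # (True, 'ok')
print(validate_plan(bad))   # rejected: unknown tool 'run_shell'
```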
