LangChain Agents for Data Pipelines: Building Intelligent ETL
LangChain is a framework for developing applications powered by Large Language Models (LLMs). A LangChain Agent is a component that lets an LLM decide for itself which tools to call, and in what order, to accomplish a task. This article shows how to apply LangChain Agents to data pipelines and ETL, building intelligent pipelines that can plan their own steps, adapt to the data, and recover from failures.
ETL (Extract, Transform, Load) is the core workflow of data engineering: Extract pulls data from source systems (databases, APIs, files), Transform reshapes it into the form you need (clean, aggregate, join), and Load writes the result into a data warehouse or data lake.
Why use a LangChain Agent for ETL? Because an AI agent can adapt transformation logic to the data's characteristics, offer a natural language interface for creating and modifying pipelines, recover from errors automatically, remap schemas dynamically when the source schema changes, and run intelligent data quality checks that catch anomalies a static rule set would miss.
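Stripped of the AI layer, the three ETL phases can be sketched in a few lines of plain Python. Everything below is invented for illustration (the rows, the email cleanup, and the `warehouse.dim_users` target); the sections that follow layer the agent on top of this skeleton.

```python
# Minimal ETL sketch: the three phases a LangChain agent would orchestrate.
# All data and names here are invented for illustration.

def extract():
    """Pretend to pull rows from a source system."""
    return [
        {"id": 1, "email": " JOHN@Example.com "},
        {"id": 2, "email": "jane@example.com"},
    ]

def transform(rows):
    """Normalize emails -- the kind of step an agent could choose dynamically."""
    return [{**r, "email": r["email"].strip().lower()} for r in rows]

def load(rows, target):
    """Pretend to write rows to a warehouse table."""
    print(f"Loaded {len(rows)} rows into {target}")
    return len(rows)

rows = transform(extract())
loaded = load(rows, "warehouse.dim_users")
```

An agent replaces the hard-coded `transform` choices with decisions made at run time.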
Getting Started with LangChain for ETL
Setting up the LangChain ETL environment:
# === LangChain ETL Setup ===
# 1. Install Dependencies
pip install langchain langchain-openai langchain-community
pip install pandas sqlalchemy psycopg2-binary
pip install apache-airflow dbt-core
pip install great-expectations
# 2. Environment Configuration
cat > .env << 'EOF'
OPENAI_API_KEY=sk-your-api-key-here
DATABASE_URL=postgresql://user:pass@localhost:5432/warehouse
REDIS_URL=redis://localhost:6379
EOF
# 3. LangChain ETL Agent Setup
cat > etl_agent_config.yaml << 'EOF'
agent:
  model: "gpt-4o-mini"
  temperature: 0
  max_iterations: 10
tools:
  - name: "sql_query"
    description: "Execute SQL queries on source/target databases"
  - name: "data_profiler"
    description: "Profile data quality and statistics"
  - name: "schema_mapper"
    description: "Map source schema to target schema"
  - name: "transform_data"
    description: "Apply transformations (clean, aggregate, join)"
  - name: "load_data"
    description: "Load transformed data to target"
  - name: "validate_data"
    description: "Validate data quality with expectations"
sources:
  postgres:
    type: "postgresql"
    host: "source-db.example.com"
    port: 5432
    database: "production"
  api:
    type: "rest_api"
    base_url: "https://api.example.com/v1"
  s3:
    type: "s3"
    bucket: "data-lake-raw"
targets:
  warehouse:
    type: "postgresql"
    host: "warehouse.example.com"
    database: "analytics"
  bigquery:
    type: "bigquery"
    project: "analytics-prod"
    dataset: "transformed"
EOF
# 4. Verify Setup
python3 -c "
from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor
print('LangChain: OK')
import pandas as pd
print('Pandas: OK')
from sqlalchemy import create_engine
print('SQLAlchemy: OK')
print('ETL Agent environment ready')
"
echo "Setup complete"
Building an AI Agent for the Data Pipeline
A LangChain-style agent that plans and executes an ETL pipeline:
#!/usr/bin/env python3
# etl_agent.py - LangChain ETL Agent
import json
import logging
from typing import Dict, List, Any

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("etl_agent")


class ETLAgent:
    """AI-powered ETL Agent using LangChain patterns"""

    def __init__(self):
        self.tools = {}
        self.pipeline_state = {}

    def register_tool(self, name, func, description):
        self.tools[name] = {"func": func, "description": description}

    def plan_pipeline(self, user_request):
        """Agent plans ETL steps based on natural language request"""
        # In production: LLM generates this plan
        plan_templates = {
            "sync_users": [
                {"tool": "sql_query", "args": {"query": "SELECT * FROM users WHERE updated_at > :last_sync"}},
                {"tool": "transform_data", "args": {"operations": ["deduplicate", "normalize_email", "hash_pii"]}},
                {"tool": "validate_data", "args": {"checks": ["not_null:id", "unique:email", "format:phone"]}},
                {"tool": "load_data", "args": {"target": "warehouse.dim_users", "mode": "upsert"}},
            ],
            "aggregate_sales": [
                {"tool": "sql_query", "args": {"query": "SELECT * FROM orders WHERE date >= :start_date"}},
                {"tool": "transform_data", "args": {"operations": ["calculate_totals", "group_by_product", "add_metrics"]}},
                {"tool": "validate_data", "args": {"checks": ["positive:amount", "not_null:product_id"]}},
                {"tool": "load_data", "args": {"target": "warehouse.fact_sales", "mode": "append"}},
            ],
        }
        # Simple keyword matching (LLM does this in production)
        for key, plan in plan_templates.items():
            if key.replace("_", " ") in user_request.lower():
                return {"request": user_request, "steps": plan, "estimated_time": "2-5 minutes"}
        return {"request": user_request, "steps": [], "note": "LLM would generate custom plan"}

    def execute_pipeline(self, plan):
        """Execute planned ETL pipeline"""
        results = []
        for i, step in enumerate(plan["steps"]):
            logger.info(f"Step {i+1}/{len(plan['steps'])}: {step['tool']}")
            result = {
                "step": i + 1,
                "tool": step["tool"],
                "status": "success",
                "rows_processed": 1000 * (i + 1),
                "duration_sec": 5 + i * 3,
            }
            # Simulate validation failures
            if step["tool"] == "validate_data":
                result["validation"] = {
                    "passed": 985,
                    "failed": 15,
                    "pass_rate": 98.5,
                }
            results.append(result)
        return {
            "pipeline": plan["request"],
            "status": "completed",
            "steps_completed": len(results),
            "total_rows": results[-1]["rows_processed"] if results else 0,
            "total_duration": sum(r["duration_sec"] for r in results),
            "results": results,
        }


agent = ETLAgent()

# Plan pipeline from natural language
plan = agent.plan_pipeline("sync users from production to warehouse")
print(f"Pipeline: {plan['request']}")
print(f"Steps: {len(plan['steps'])}")
for step in plan["steps"]:
    print(f"  {step['tool']}: {json.dumps(step['args'], default=str)[:60]}...")

# Execute
result = agent.execute_pipeline(plan)
print(f"\nStatus: {result['status']}")
print(f"Rows: {result['total_rows']:,}, Duration: {result['total_duration']}s")
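A natural companion to `execute_pipeline` is a guard that halts the run when a validation step's pass rate drops below a threshold. The sketch below is not part of the agent above: the 99% default threshold is an assumption, and the sample result simply mirrors the simulated validation output.

```python
def check_validation(results, min_pass_rate=99.0):
    """Return the failing validation steps, if any.
    `results` follows the shape produced by ETLAgent.execute_pipeline;
    the 99.0 default threshold is an assumption, not a recommendation."""
    failures = []
    for r in results:
        v = r.get("validation")
        if v and v["pass_rate"] < min_pass_rate:
            failures.append({"step": r["step"], "pass_rate": v["pass_rate"]})
    return failures

# Mirrors the simulated validation result above (98.5% fails a 99% bar)
sample = [
    {"step": 3, "tool": "validate_data",
     "validation": {"passed": 985, "failed": 15, "pass_rate": 98.5}},
]
bad = check_validation(sample)
if bad:
    print(f"Halting pipeline: {bad}")
```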
ETL Automation with LangChain Tools
Custom tools for the core ETL operations:
#!/usr/bin/env python3
# etl_tools.py - LangChain ETL Tools
import hashlib
import json
import logging
from typing import Dict, List

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("tools")


class DataProfiler:
    """Profile data quality and statistics"""

    def profile(self, data_sample):
        """Profile a dataset"""
        columns = list(data_sample[0].keys()) if data_sample else []
        return {
            "row_count": len(data_sample),
            "columns": columns,
            "null_counts": {col: sum(1 for row in data_sample if row.get(col) is None) for col in columns},
            "unique_counts": {col: len(set(row.get(col) for row in data_sample)) for col in columns},
        }


class SchemaMapper:
    """Map source schema to target schema"""

    def auto_map(self, source_schema, target_schema):
        """AI-assisted schema mapping"""
        mappings = []
        for s_col in source_schema:
            best_match = None
            for t_col in target_schema:
                if s_col["name"].lower() == t_col["name"].lower():
                    best_match = t_col["name"]
                    break
                if s_col["name"].lower().replace("_", "") == t_col["name"].lower().replace("_", ""):
                    best_match = t_col["name"]
            mappings.append({
                "source": s_col["name"],
                "target": best_match,
                "transform": "direct" if best_match else "unmapped",
            })
        return mappings


class DataTransformer:
    """Transform data with various operations"""

    def transform(self, data, operations):
        """Apply transformation operations"""
        result = data.copy()
        applied = []
        for op in operations:
            if op == "deduplicate":
                seen = set()
                deduped = []
                for row in result:
                    key = json.dumps(row, sort_keys=True)
                    if key not in seen:
                        seen.add(key)
                        deduped.append(row)
                removed = len(result) - len(deduped)
                result = deduped
                applied.append({"operation": op, "removed": removed})
            elif op == "normalize_email":
                for row in result:
                    if "email" in row and row["email"]:
                        row["email"] = row["email"].lower().strip()
                applied.append({"operation": op, "affected": len(result)})
            elif op == "hash_pii":
                for row in result:
                    for field in ["ssn", "national_id", "phone"]:
                        if field in row and row[field]:
                            row[field] = hashlib.sha256(str(row[field]).encode()).hexdigest()[:16]
                applied.append({"operation": op, "fields_hashed": ["ssn", "national_id", "phone"]})
        return {"data": result, "operations_applied": applied, "row_count": len(result)}


# Demo
profiler = DataProfiler()
sample_data = [
    {"id": 1, "name": "John", "email": "JOHN@Example.com", "phone": "0891234567"},
    {"id": 2, "name": "Jane", "email": "jane@example.com", "phone": "0891234568"},
    {"id": 1, "name": "John", "email": "JOHN@Example.com", "phone": "0891234567"},
    {"id": 3, "name": None, "email": "bob@test.com", "phone": None},
]
profile = profiler.profile(sample_data)
print(f"Profile: {profile['row_count']} rows, {len(profile['columns'])} columns")
print(f"Nulls: {profile['null_counts']}")

transformer = DataTransformer()
result = transformer.transform(sample_data, ["deduplicate", "normalize_email", "hash_pii"])
print(f"\nTransformed: {result['row_count']} rows")
for op in result["operations_applied"]:
    print(f"  {op['operation']}: {op}")
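The demo above exercises `DataProfiler` and `DataTransformer` but not `SchemaMapper`. A standalone copy of the same matching logic shows how it behaves on a small schema pair; the column names below are invented for illustration.

```python
def auto_map(source_schema, target_schema):
    """Match source columns to target columns by name:
    exact (case-insensitive) first, then ignoring underscores."""
    mappings = []
    for s_col in source_schema:
        best_match = None
        for t_col in target_schema:
            if s_col["name"].lower() == t_col["name"].lower():
                best_match = t_col["name"]
                break
            if s_col["name"].lower().replace("_", "") == t_col["name"].lower().replace("_", ""):
                best_match = t_col["name"]
        mappings.append({"source": s_col["name"], "target": best_match,
                         "transform": "direct" if best_match else "unmapped"})
    return mappings

# Invented schemas: two fuzzy matches, one unmapped column
source = [{"name": "user_id"}, {"name": "EmailAddress"}, {"name": "signup_ts"}]
target = [{"name": "userid"}, {"name": "email_address"}, {"name": "created_at"}]
for m in auto_map(source, target):
    print(m)
```

Unmapped columns are exactly where the LLM step earns its keep: the agent can propose `signup_ts -> created_at` from semantics, then ask for confirmation.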
Orchestration and Error Handling
Designing the pipeline orchestration:
# === Pipeline Orchestration ===
# 1. Airflow DAG with LangChain Agent
cat > dags/etl_agent_dag.py << 'PYEOF'
#!/usr/bin/env python3
"""Airflow DAG with LangChain ETL Agent"""
from datetime import datetime, timedelta
import json
import logging
logger = logging.getLogger("dag")
# DAG Configuration
DAG_CONFIG = {
    "dag_id": "langchain_etl_pipeline",
    "schedule": "0 2 * * *",  # Daily at 2 AM
    "start_date": "2024-01-01",
    "retries": 3,
    "retry_delay_minutes": 5,
}


class PipelineOrchestrator:
    def __init__(self):
        self.state = {}

    def extract(self, source_config):
        """Extract data from source"""
        logger.info(f"Extracting from {source_config['type']}")
        return {
            "status": "success",
            "rows_extracted": 10000,
            "source": source_config["type"],
            "timestamp": datetime.utcnow().isoformat(),
        }

    def transform_with_agent(self, data_meta, rules):
        """Use LangChain agent for intelligent transformation"""
        logger.info(f"Transforming {data_meta['rows_extracted']} rows")
        return {
            "status": "success",
            "rows_transformed": data_meta["rows_extracted"] - 50,
            "rows_rejected": 50,
            "transformations": rules,
        }

    def load(self, data_meta, target_config):
        """Load data to target"""
        logger.info(f"Loading to {target_config['type']}")
        return {
            "status": "success",
            "rows_loaded": data_meta["rows_transformed"],
            "target": target_config["type"],
        }

    def handle_error(self, error, context):
        """AI-assisted error handling"""
        error_handlers = {
            "connection_timeout": {
                "action": "retry",
                "max_retries": 3,
                "backoff": "exponential",
            },
            "schema_mismatch": {
                "action": "agent_remap",
                "description": "LangChain agent attempts schema remapping",
            },
            "data_quality_failure": {
                "action": "quarantine",
                "description": "Move failed records to quarantine table",
            },
            "disk_full": {
                "action": "alert_and_stop",
                "description": "Alert ops team, stop pipeline",
            },
        }
        handler = error_handlers.get(error, {"action": "alert", "description": "Unknown error"})
        return {"error": error, "handler": handler, "context": context}


orchestrator = PipelineOrchestrator()

# Run pipeline
extract_result = orchestrator.extract({"type": "postgresql", "table": "orders"})
print(f"Extract: {extract_result['rows_extracted']} rows")

transform_result = orchestrator.transform_with_agent(
    extract_result, ["clean", "aggregate", "enrich"]
)
print(f"Transform: {transform_result['rows_transformed']} rows ({transform_result['rows_rejected']} rejected)")

load_result = orchestrator.load(transform_result, {"type": "bigquery", "table": "fact_orders"})
print(f"Load: {load_result['rows_loaded']} rows to {load_result['target']}")

# Error handling demo
error = orchestrator.handle_error("schema_mismatch", {"table": "users", "column": "phone_number"})
print(f"\nError handling: {error['handler']['action']} - {error['handler']['description']}")
PYEOF
echo "Orchestration configured"
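The `connection_timeout` handler above names retry with exponential backoff; a minimal sketch of what that looks like in practice. The delay values and the simulated flaky source are assumptions for the demo.

```python
import time

def retry_with_backoff(func, max_retries=3, base_delay=1.0):
    """Call func, retrying on ConnectionError with exponentially
    growing delays: base_delay, 2x, 4x, ... (values are illustrative)."""
    for attempt in range(max_retries + 1):
        try:
            return func()
        except ConnectionError as exc:
            if attempt == max_retries:
                raise
            delay = base_delay * (2 ** attempt)
            print(f"Attempt {attempt + 1} failed ({exc}); retrying in {delay:.2f}s")
            time.sleep(delay)

# Simulate a source that fails twice before succeeding
attempts = {"n": 0}
def flaky_extract():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise ConnectionError("connection_timeout")
    return {"rows_extracted": 10000}

result = retry_with_backoff(flaky_extract, base_delay=0.01)
print(result)
```

In Airflow you would normally lean on the operator-level `retries`/`retry_delay` settings instead; a wrapper like this matters mainly for retries inside a single task.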
Monitoring and Observability
Tracking pipeline performance:
#!/usr/bin/env python3
# pipeline_monitor.py - ETL Pipeline Monitoring
import json
import logging
from typing import Dict, List

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("monitor")


class PipelineMonitor:
    def __init__(self):
        self.metrics = {}

    def dashboard(self):
        return {
            "pipeline_health": {
                "total_pipelines": 15,
                "healthy": 13,
                "warning": 1,
                "failed": 1,
                "success_rate": 86.7,
            },
            "daily_stats": {
                "rows_processed": 2500000,
                "data_volume_gb": 45.2,
                "avg_latency_min": 12,
                "agent_decisions": 45,
                "auto_fixes": 3,
            },
            "top_pipelines": [
                {"name": "orders_sync", "rows": 500000, "duration": "8 min", "status": "success"},
                {"name": "user_profiles", "rows": 150000, "duration": "3 min", "status": "success"},
                {"name": "product_catalog", "rows": 50000, "duration": "2 min", "status": "success"},
                {"name": "clickstream", "rows": 1800000, "duration": "25 min", "status": "warning"},
            ],
            "agent_activity": {
                "schema_remappings": 2,
                "auto_retries": 5,
                "quality_checks": 15,
                "transformations_optimized": 3,
            },
            "cost_tracking": {
                "llm_api_calls": 450,
                "llm_cost_usd": 2.50,
                "compute_cost_usd": 15.00,
                "storage_cost_usd": 8.00,
                "total_daily_cost": 25.50,
            },
        }


monitor = PipelineMonitor()
data = monitor.dashboard()
print(f"Pipeline Health: {data['pipeline_health']['healthy']}/{data['pipeline_health']['total_pipelines']} healthy")
print(f"Daily: {data['daily_stats']['rows_processed']:,} rows, {data['daily_stats']['data_volume_gb']} GB")
print(f"Agent: {data['daily_stats']['agent_decisions']} decisions, {data['daily_stats']['auto_fixes']} auto-fixes")
print(f"Cost: ${data['cost_tracking']['total_daily_cost']}/day (LLM: ${data['cost_tracking']['llm_cost_usd']})")
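A dashboard is most useful with alert rules attached. The sketch below evaluates thresholds over the metrics above; the 95% success-rate floor and $50/day cost ceiling are assumptions, not recommendations.

```python
def evaluate_alerts(dashboard, min_success_rate=95.0, max_daily_cost=50.0):
    """Return alert messages for metrics that breach thresholds.
    Thresholds are illustrative defaults, tune them per deployment."""
    alerts = []
    rate = dashboard["pipeline_health"]["success_rate"]
    if rate < min_success_rate:
        alerts.append(f"success_rate {rate}% below {min_success_rate}%")
    cost = dashboard["cost_tracking"]["total_daily_cost"]
    if cost > max_daily_cost:
        alerts.append(f"daily cost ${cost} exceeds ${max_daily_cost}")
    return alerts

# Mirrors the dashboard numbers above
sample = {
    "pipeline_health": {"success_rate": 86.7},
    "cost_tracking": {"total_daily_cost": 25.50},
}
for alert in evaluate_alerts(sample):
    print(f"ALERT: {alert}")
```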
FAQ: Frequently Asked Questions
Q: Is a LangChain Agent suitable for every ETL workload?
A: No. A LangChain Agent shines where ETL benefits from intelligence: dynamic schema mapping when source schemas change frequently, intelligent recovery from unfamiliar errors, natural-language pipeline creation for people who don't write code, and data quality analysis that spots anomalies automatically. For simple, repetitive ETL with fixed logic, plain Airflow/dbt is cheaper and more predictable; high-volume streaming that needs low latency belongs on Kafka/Flink; and in cost-sensitive environments the LLM API calls add up. A good compromise is to use the LangChain Agent as an orchestration layer rather than for per-row transformation.
Q: How much does calling an LLM API for ETL actually cost?
A: Less than you might expect, because you do not call the LLM for every row; the LLM handles planning, schema mapping, and error handling, which happen infrequently. Typical usage: pipeline planning, 1-5 calls per pipeline ($0.01-0.05); schema mapping, 1-3 calls per schema change ($0.01-0.03); error handling, 0-5 calls per day ($0-0.05); data quality analysis, 1-2 calls per pipeline ($0.01-0.02). That comes to roughly $0.50-5.00 per day for 10-20 pipelines. Use gpt-4o-mini ($0.15/1M input tokens) instead of gpt-4, which costs roughly 10x more, and cache LLM responses for similar requests to cut costs by about 50%.
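The per-call figures above can be turned into a back-of-the-envelope estimator. The call counts below follow the ranges quoted in the answer; the 20,000-tokens-per-call average is purely an assumption, and output-token costs are ignored.

```python
# Back-of-the-envelope daily LLM cost for agentic ETL (input tokens only).
PRICE_PER_M_INPUT = 0.15   # USD per 1M input tokens (gpt-4o-mini, as quoted)
TOKENS_PER_CALL = 20_000   # assumed average prompt + context size
NUM_PIPELINES = 15

daily_calls = {
    "pipeline_planning": 5 * NUM_PIPELINES,  # up to 5 calls per pipeline
    "schema_mapping": 3,                     # a few schema changes per day
    "error_handling": 5,                     # worst case from the range above
    "quality_analysis": 2 * NUM_PIPELINES,   # up to 2 calls per pipeline
}

total_calls = sum(daily_calls.values())
cost = total_calls * TOKENS_PER_CALL / 1_000_000 * PRICE_PER_M_INPUT
print(f"{total_calls} calls/day, ~${cost:.2f}/day in input tokens")
```

Swap in your own token counts and call volumes; the point is that per-pipeline calls, not per-row calls, keep the bill small.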
Q: LangChain ????????? LlamaIndex ?????????????????????????????????????????? ?????????????????? data pipeline?
A: LangChain ???????????? agent framework ????????????????????????????????? ????????????????????????????????? orchestration, tool usage, complex workflows ?????? agent types ???????????????????????? (ReAct, Plan-and-Execute) ecosystem ???????????? ??????????????? ETL pipeline orchestration LlamaIndex ???????????? data indexing ????????? retrieval ????????????????????????????????? RAG (Retrieval-Augmented Generation) ????????????????????????????????? search ????????? Q&A ?????? data ?????????????????? ETL ????????? LangChain ?????????????????? orchestration + LlamaIndex ?????????????????? data catalog/search ????????????????????????????????????
Q: How do you prevent LLM hallucination in ETL?
A: Hallucination in ETL is serious because it can corrupt data. Mitigations: set temperature=0 to remove randomness; validate every LLM output before executing it (check SQL syntax and schema validity); use structured output (JSON mode) instead of free text; give the agent only a restricted set of tools rather than letting it run arbitrary code; run data quality checks after every step (Great Expectations); keep a human in the loop for critical changes (schema migrations, delete operations); support a dry-run mode to preview changes before executing; and keep an audit log of every LLM decision for later review.
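The "validate LLM output before executing it" advice can be made concrete with a guard that only lets single, read-only SELECT statements through. This is illustrative only: a production guard should use a real SQL parser rather than a regex keyword list.

```python
import re

# Write/DDL keywords to reject; illustrative, not a complete SQL firewall.
FORBIDDEN = re.compile(r"\b(insert|update|delete|drop|alter|truncate|grant)\b", re.I)

def is_safe_select(sql: str) -> bool:
    """Accept only a single SELECT statement with no write keywords."""
    stripped = sql.strip().rstrip(";")
    if ";" in stripped:                        # reject multi-statement input
        return False
    if not stripped.lower().startswith("select"):
        return False
    return not FORBIDDEN.search(stripped)

print(is_safe_select("SELECT * FROM users WHERE updated_at > :last_sync"))  # True
print(is_safe_select("SELECT 1; DROP TABLE users"))                         # False
print(is_safe_select("DELETE FROM users"))                                  # False
```

Pair a check like this with read-only database credentials so that even a bad query slipping through cannot modify data.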
