LangChain Pub/Sub Architecture
LangChain Agent Pub Sub Architecture Event-Driven Multi-Agent Communication Kafka Redis RabbitMQ Async Processing Scale Production
| Broker | Persistence | Throughput | Complexity | Cost | Best For |
|---|---|---|---|---|---|
| Redis Pub/Sub | ไม่มี | สูง | ต่ำ | ต่ำ | Prototype, simple |
| Kafka | มี (disk) | สูงมาก | สูง | กลาง-สูง | Production, replay |
| RabbitMQ | มี (optional) | กลาง | กลาง | กลาง | Task queue, routing |
| Google Pub/Sub | มี | สูง | ต่ำ (managed) | ตามใช้งาน | GCP, serverless |
| NATS | มี (JetStream) | สูงมาก | ต่ำ | ต่ำ | Microservices, K8s |
Agent Design
# === Multi-Agent Pub/Sub Design ===
from dataclasses import dataclass
# Agent types
@dataclass
class AgentConfig:
    """Static description of one agent node in the pub/sub pipeline.

    All fields are plain display strings: this file only prints them, and
    the comma-separated topic lists are never parsed anywhere in view.
    """

    name: str        # human-readable agent name, e.g. "Router Agent"
    role: str        # one-line description of the agent's responsibility
    subscribes: str  # topic(s) this agent consumes (comma-separated string)
    publishes: str   # topic(s) this agent produces (comma-separated string)
    llm: str         # LLM model backing the agent
    tools: str       # tools available to the agent ("None (...)" if none)
# Agent wiring for the pipeline: Router fans requests out, then
# Research -> Analysis -> Writer -> Review form the processing chain.
# Keyword arguments make the field-to-value mapping explicit.
agents = [
    AgentConfig(
        name="Router Agent",
        role="รับ Request กระจายงานไป Agent ที่เหมาะสม",
        subscribes="incoming_requests",
        publishes="research_tasks, analysis_tasks, writing_tasks",
        llm="GPT-4o-mini (fast, cheap)",
        tools="None (routing only)",
    ),
    AgentConfig(
        name="Research Agent",
        role="ค้นหาข้อมูลจาก Web, DB, API",
        subscribes="research_tasks",
        publishes="research_results, error_events",
        llm="GPT-4o",
        tools="Web Search, SQL Query, API Call",
    ),
    AgentConfig(
        name="Analysis Agent",
        role="วิเคราะห์ข้อมูลที่ได้จาก Research",
        subscribes="research_results",
        publishes="analysis_results, error_events",
        llm="GPT-4o",
        tools="Python REPL, Data Analysis",
    ),
    AgentConfig(
        name="Writer Agent",
        role="สร้างเนื้อหาจากผลวิเคราะห์",
        subscribes="analysis_results, writing_tasks",
        publishes="draft_content, error_events",
        llm="GPT-4o / Claude",
        tools="None (generation only)",
    ),
    AgentConfig(
        name="Review Agent",
        role="ตรวจสอบคุณภาพ Content",
        subscribes="draft_content",
        publishes="final_output, revision_requests",
        llm="GPT-4o",
        tools="Fact Check, Grammar Check",
    ),
]
# Dump the agent wiring table to stdout, one multi-line entry per agent.
print("=== Agent Configuration ===")
for cfg in agents:
    entry = (
        f" [{cfg.name}] {cfg.role}",
        f" Sub: {cfg.subscribes}",
        f" Pub: {cfg.publishes}",
        f" LLM: {cfg.llm} | Tools: {cfg.tools}",
    )
    print("\n".join(entry))
# Message schema
# {
# "message_id": "uuid",
# "timestamp": "2024-01-15T10:30:00Z",
# "source_agent": "research_agent",
# "topic": "research_results",
# "payload": { "query": "...", "results": [...] },
# "metadata": { "trace_id": "xxx", "retry_count": 0 }
# }
Implementation
# === Redis Pub/Sub with LangChain ===
# import json
# import uuid
# from datetime import datetime, timezone
#
# import redis
# from langchain.agents import AgentExecutor
# from langchain_openai import ChatOpenAI
#
# r = redis.Redis(host='localhost', port=6379)
# pubsub = r.pubsub()
#
# # Publisher
# def publish_event(topic, payload, source_agent):
# message = {
# "message_id": str(uuid.uuid4()),
#         "timestamp": datetime.now(timezone.utc).isoformat(),  # utcnow() is deprecated; use tz-aware UTC
# "source_agent": source_agent,
# "topic": topic,
# "payload": payload,
# }
# r.publish(topic, json.dumps(message))
#
# # Subscriber (Research Agent)
# class ResearchAgent:
# def __init__(self):
# self.llm = ChatOpenAI(model="gpt-4o")
# self.tools = [web_search, sql_query]
# self.agent = AgentExecutor(...)
# pubsub.subscribe('research_tasks')
#
# def listen(self):
# for message in pubsub.listen():
# if message['type'] == 'message':
# data = json.loads(message['data'])
# result = self.process(data['payload'])
# publish_event('research_results', result, 'research_agent')
#
# def process(self, payload):
# return self.agent.invoke({"input": payload["query"]})
# Kafka version
# from confluent_kafka import Producer, Consumer
#
# producer = Producer({'bootstrap.servers': 'localhost:9092'})
# consumer = Consumer({
# 'bootstrap.servers': 'localhost:9092',
# 'group.id': 'research-agent-group',
# 'auto.offset.reset': 'earliest',
# })
# consumer.subscribe(['research_tasks'])
#
# while True:
# msg = consumer.poll(1.0)
# if msg and not msg.error():
# data = json.loads(msg.value())
# result = agent.invoke(data['payload'])
# producer.produce('research_results', json.dumps(result))
# producer.flush()
@dataclass
class TopicConfig:
    """Kafka-style configuration for one message topic.

    Producer/consumer fields are descriptive display strings; only
    ``partitions`` is numeric. This file merely prints these values.
    """

    topic: str       # topic name, e.g. "research_tasks"
    producers: str   # who publishes to this topic
    consumers: str   # who consumes from this topic
    retention: str   # human-readable retention period, e.g. "7 days"
    partitions: int  # partition count (bounds consumer-group parallelism)
# Topic layout for the pipeline: longer retention on entry/exit and
# error topics, more partitions where consumer parallelism is needed.
# Keyword arguments make each field explicit.
topics = [
    TopicConfig(
        topic="incoming_requests",
        producers="API Gateway",
        consumers="Router Agent",
        retention="7 days",
        partitions=4,
    ),
    TopicConfig(
        topic="research_tasks",
        producers="Router Agent",
        consumers="Research Agent (consumer group, 3 instances)",
        retention="3 days",
        partitions=6,
    ),
    TopicConfig(
        topic="research_results",
        producers="Research Agent",
        consumers="Analysis Agent",
        retention="3 days",
        partitions=4,
    ),
    TopicConfig(
        topic="analysis_results",
        producers="Analysis Agent",
        consumers="Writer Agent",
        retention="3 days",
        partitions=4,
    ),
    TopicConfig(
        topic="draft_content",
        producers="Writer Agent",
        consumers="Review Agent",
        retention="3 days",
        partitions=2,
    ),
    TopicConfig(
        topic="final_output",
        producers="Review Agent",
        consumers="API Gateway / Storage",
        retention="30 days",
        partitions=2,
    ),
    TopicConfig(
        topic="error_events",
        producers="All Agents",
        consumers="Alert System / Dead Letter Processor",
        retention="30 days",
        partitions=2,
    ),
]
# Dump the topic table to stdout, one multi-line entry per topic.
print("\n=== Topic Configuration ===")
for tc in topics:
    entry = (
        f" [{tc.topic}] Partitions: {tc.partitions} | Retention: {tc.retention}",
        f" Producers: {tc.producers}",
        f" Consumers: {tc.consumers}",
    )
    print("\n".join(entry))
Production Operations
# === Production Checklist ===
@dataclass
class ProdCheck:
    """One item of the production-readiness checklist.

    All fields are display strings; this file only prints them.
    """

    category: str        # checklist area, e.g. "Reliability", "Security"
    item: str            # short name of the check
    implementation: str  # how to implement / satisfy the check
    importance: str      # priority label: "Critical" / "High" / "Medium"
# Production-readiness checklist, grouped by category.
# Keyword arguments make each field explicit.
checks = [
    ProdCheck(
        category="Reliability",
        item="Dead Letter Queue",
        implementation="Failed messages → DLQ after 3 retries, manual review",
        importance="Critical",
    ),
    ProdCheck(
        category="Reliability",
        item="Idempotency",
        implementation="Dedup by message_id, process each message exactly once",
        importance="Critical",
    ),
    ProdCheck(
        category="Reliability",
        item="Circuit Breaker",
        implementation="Cut agent if error rate > 50%, auto-recover after cooldown",
        importance="High",
    ),
    ProdCheck(
        category="Observability",
        item="Distributed Tracing",
        implementation="trace_id in every message, Jaeger/Zipkin for visualization",
        importance="High",
    ),
    ProdCheck(
        category="Observability",
        item="Consumer Lag",
        implementation="Monitor lag per consumer group, alert if > 1000 messages",
        importance="High",
    ),
    ProdCheck(
        category="Performance",
        item="Rate Limiting",
        implementation="Token bucket per agent, respect LLM API limits",
        importance="Critical",
    ),
    ProdCheck(
        category="Performance",
        item="Batch Processing",
        implementation="Collect messages, process in batch for embedding/analysis",
        importance="Medium",
    ),
    ProdCheck(
        category="Cost",
        item="LLM Token Tracking",
        implementation="Track tokens per agent per topic, set budget alerts",
        importance="High",
    ),
    ProdCheck(
        category="Security",
        item="Message Encryption",
        implementation="TLS in transit, encrypt sensitive payload fields",
        importance="High",
    ),
]
# Dump the checklist to stdout, one two-line entry per check.
print("=== Production Checklist ===")
for chk in checks:
    entry = (
        f" [{chk.importance}] {chk.category}: {chk.item}",
        f" How: {chk.implementation}",
    )
    print("\n".join(entry))
เคล็ดลับ
- DLQ: ใส่ Dead Letter Queue ทุก Topic จัดการ Message ที่ Fail
- Idempotent: ออกแบบ Agent ให้ Idempotent ประมวลผลซ้ำได้ไม่มีปัญหา
- Trace: ใส่ trace_id ทุก Message ติดตามจาก Request จนถึง Response
- Cost: Track Token ต่อ Agent ป้องกัน Cost Explosion
- Simple: เริ่มจาก Redis Pub/Sub ค่อยย้ายไป Kafka เมื่อ Scale จริง
การนำไปใช้งานจริงในองค์กร
สำหรับองค์กรขนาดกลางถึงใหญ่ แนะนำให้ใช้หลัก Three-Tier Architecture คือ Core Layer ที่เป็นแกนกลางของระบบ Distribution Layer ที่ทำหน้าที่กระจาย Traffic และ Access Layer ที่เชื่อมต่อกับผู้ใช้โดยตรง การแบ่ง Layer ชัดเจนช่วยให้การ Troubleshoot ง่ายขึ้นและสามารถ Scale ระบบได้ตามความต้องการ
เรื่อง Network Security ก็สำคัญไม่แพ้กัน ควรติดตั้ง Next-Generation Firewall ที่สามารถ Deep Packet Inspection ได้ ใช้ Network Segmentation แยก VLAN สำหรับแต่ละแผนก ติดตั้ง IDS/IPS เพื่อตรวจจับการโจมตี และทำ Regular Security Audit อย่างน้อยปีละ 2 ครั้ง
LangChain Agent Pub/Sub คืออะไร
AI Agent Publish Subscribe Event-Driven Topic Message Broker Redis Kafka RabbitMQ Loose Coupling Scale Async
ออกแบบ Architecture อย่างไร
Agent Responsibility Router Research Analysis Writer Review Topics task_queue result_queue error_queue Dead Letter Schema Validation
ใช้ Message Broker อะไร
Redis Pub/Sub ง่าย Prototype Kafka Production Persistence RabbitMQ Task Queue Google Pub/Sub Managed NATS Microservices
Scale อย่างไร
Container แยก Consumer Group Auto-scaling Queue Depth Rate Limiting LLM API Circuit Breaker Retry Backoff Monitor Lag Producer Consumer
สรุป
LangChain Agent Pub Sub Architecture Event-Driven Multi-Agent Kafka Redis RabbitMQ Async Scale DLQ Tracing Rate Limiting Production
