
LangChain Agent Pub Sub Architecture

2025-11-13 · อ. บอม — SiamCafe.net · 11,868 words

An event-driven architecture for multi-agent communication with LangChain: agents exchange messages asynchronously through a message broker such as Kafka, Redis, or RabbitMQ, so the system can scale to production workloads.

| Broker | Persistence | Throughput | Complexity | Cost | Best For |
|---|---|---|---|---|---|
| Redis Pub/Sub | None | High | Low | Low | Prototypes, simple setups |
| Kafka | Yes (disk) | Very high | High | Medium-High | Production, replay |
| RabbitMQ | Yes (optional) | Medium | Medium | Medium | Task queues, routing |
| Google Pub/Sub | Yes | High | Low (managed) | Pay-per-use | GCP, serverless |
| NATS | Yes (JetStream) | Very high | Low | Low | Microservices, K8s |

Agent Design

# === Multi-Agent Pub/Sub Design ===

from dataclasses import dataclass

# Agent types
@dataclass
class AgentConfig:
    name: str
    role: str
    subscribes: str
    publishes: str
    llm: str
    tools: str

agents = [
    AgentConfig("Router Agent", "รับ Request กระจายงานไป Agent ที่เหมาะสม",
        "incoming_requests",
        "research_tasks, analysis_tasks, writing_tasks",
        "GPT-4o-mini (fast, cheap)",
        "None (routing only)"),
    AgentConfig("Research Agent", "ค้นหาข้อมูลจาก Web, DB, API",
        "research_tasks",
        "research_results, error_events",
        "GPT-4o",
        "Web Search, SQL Query, API Call"),
    AgentConfig("Analysis Agent", "วิเคราะห์ข้อมูลที่ได้จาก Research",
        "research_results",
        "analysis_results, error_events",
        "GPT-4o",
        "Python REPL, Data Analysis"),
    AgentConfig("Writer Agent", "สร้างเนื้อหาจากผลวิเคราะห์",
        "analysis_results, writing_tasks",
        "draft_content, error_events",
        "GPT-4o / Claude",
        "None (generation only)"),
    AgentConfig("Review Agent", "ตรวจสอบคุณภาพ Content",
        "draft_content",
        "final_output, revision_requests",
        "GPT-4o",
        "Fact Check, Grammar Check"),
]

print("=== Agent Configuration ===")
for a in agents:
    print(f"  [{a.name}] {a.role}")
    print(f"    Sub: {a.subscribes}")
    print(f"    Pub: {a.publishes}")
    print(f"    LLM: {a.llm} | Tools: {a.tools}")

# Message schema
# {
#   "message_id": "uuid",
#   "timestamp": "2024-01-15T10:30:00Z",
#   "source_agent": "research_agent",
#   "topic": "research_results",
#   "payload": { "query": "...", "results": [...] },
#   "metadata": { "trace_id": "xxx", "retry_count": 0 }
# }
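The schema comment above can be enforced in code. Below is a minimal sketch using a dataclass; the `AgentMessage` name and its defaults are illustrative, not part of LangChain:

```python
import json
import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone

@dataclass
class AgentMessage:
    """One message on the bus, matching the schema comment above."""
    source_agent: str
    topic: str
    payload: dict
    message_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat())
    metadata: dict = field(
        default_factory=lambda: {"trace_id": "", "retry_count": 0})

    def to_json(self) -> str:
        return json.dumps(asdict(self))

    @classmethod
    def from_json(cls, raw: str) -> "AgentMessage":
        return cls(**json.loads(raw))

# Round-trip: serialize for the broker, deserialize on the consumer side
msg = AgentMessage("research_agent", "research_results",
                   {"query": "latest LLM benchmarks", "results": []})
round_tripped = AgentMessage.from_json(msg.to_json())
print(round_tripped.message_id == msg.message_id)  # True
```

Generating `message_id` in a `default_factory` means every publisher gets a unique ID for free, which the idempotency checks later in this article rely on.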

Implementation

# === Redis Pub/Sub with LangChain ===

# import json
# import uuid
# from datetime import datetime, timezone
#
# import redis
# from langchain.agents import AgentExecutor
# from langchain_openai import ChatOpenAI
#
# r = redis.Redis(host='localhost', port=6379)
# pubsub = r.pubsub()
#
# # Publisher
# def publish_event(topic, payload, source_agent):
#     message = {
#         "message_id": str(uuid.uuid4()),
#         "timestamp": datetime.utcnow().isoformat(),
#         "source_agent": source_agent,
#         "topic": topic,
#         "payload": payload,
#     }
#     r.publish(topic, json.dumps(message))
#
# # Subscriber (Research Agent)
# class ResearchAgent:
#     def __init__(self):
#         self.llm = ChatOpenAI(model="gpt-4o")
#         self.tools = [web_search, sql_query]
#         self.agent = AgentExecutor(...)
#         pubsub.subscribe('research_tasks')
#
#     def listen(self):
#         for message in pubsub.listen():
#             if message['type'] == 'message':
#                 data = json.loads(message['data'])
#                 result = self.process(data['payload'])
#                 publish_event('research_results', result, 'research_agent')
#
#     def process(self, payload):
#         return self.agent.invoke({"input": payload["query"]})

# Kafka version
# from confluent_kafka import Producer, Consumer
#
# producer = Producer({'bootstrap.servers': 'localhost:9092'})
# consumer = Consumer({
#     'bootstrap.servers': 'localhost:9092',
#     'group.id': 'research-agent-group',
#     'auto.offset.reset': 'earliest',
# })
# consumer.subscribe(['research_tasks'])
#
# while True:
#     msg = consumer.poll(1.0)
#     if msg and not msg.error():
#         data = json.loads(msg.value())
#         result = agent.invoke(data['payload'])
#         producer.produce('research_results', json.dumps(result))
#         producer.flush()

@dataclass
class TopicConfig:
    topic: str
    producers: str
    consumers: str
    retention: str
    partitions: int

topics = [
    TopicConfig("incoming_requests", "API Gateway",
        "Router Agent", "7 days", 4),
    TopicConfig("research_tasks", "Router Agent",
        "Research Agent (consumer group, 3 instances)", "3 days", 6),
    TopicConfig("research_results", "Research Agent",
        "Analysis Agent", "3 days", 4),
    TopicConfig("analysis_results", "Analysis Agent",
        "Writer Agent", "3 days", 4),
    TopicConfig("draft_content", "Writer Agent",
        "Review Agent", "3 days", 2),
    TopicConfig("final_output", "Review Agent",
        "API Gateway / Storage", "30 days", 2),
    TopicConfig("error_events", "All Agents",
        "Alert System / Dead Letter Processor", "30 days", 2),
]

print("\n=== Topic Configuration ===")
for t in topics:
    print(f"  [{t.topic}] Partitions: {t.partitions} | Retention: {t.retention}")
    print(f"    Producers: {t.producers}")
    print(f"    Consumers: {t.consumers}")
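Partition counts matter because messages with the same key are routed to the same partition, preserving per-key ordering within a consumer group. A simplified hash-based assignment (real Kafka clients use murmur2, not MD5) looks like:

```python
import hashlib

def partition_for(key: str, num_partitions: int) -> int:
    """Deterministic hash-based partition assignment (simplified)."""
    digest = hashlib.md5(key.encode()).hexdigest()
    return int(digest, 16) % num_partitions

# Messages sharing a key (e.g. a trace_id) always land on the same
# partition, so events for one request are processed in order.
p1 = partition_for("trace-abc", 6)
p2 = partition_for("trace-abc", 6)
print(p1 == p2)  # True
```

This is why `research_tasks` gets 6 partitions in the table above: it caps at 6 parallel consumers in one group while still keeping each trace's messages ordered.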

Production Operations

# === Production Checklist ===

@dataclass
class ProdCheck:
    category: str
    item: str
    implementation: str
    importance: str

checks = [
    ProdCheck("Reliability", "Dead Letter Queue",
        "Failed messages → DLQ after 3 retries, manual review",
        "Critical"),
    ProdCheck("Reliability", "Idempotency",
        "Dedup by message_id, process each message exactly once",
        "Critical"),
    ProdCheck("Reliability", "Circuit Breaker",
        "Cut agent if error rate > 50%, auto-recover after cooldown",
        "High"),
    ProdCheck("Observability", "Distributed Tracing",
        "trace_id in every message, Jaeger/Zipkin for visualization",
        "High"),
    ProdCheck("Observability", "Consumer Lag",
        "Monitor lag per consumer group, alert if > 1000 messages",
        "High"),
    ProdCheck("Performance", "Rate Limiting",
        "Token bucket per agent, respect LLM API limits",
        "Critical"),
    ProdCheck("Performance", "Batch Processing",
        "Collect messages, process in batch for embedding/analysis",
        "Medium"),
    ProdCheck("Cost", "LLM Token Tracking",
        "Track tokens per agent per topic, set budget alerts",
        "High"),
    ProdCheck("Security", "Message Encryption",
        "TLS in transit, encrypt sensitive payload fields",
        "High"),
]

print("=== Production Checklist ===")
for c in checks:
    print(f"  [{c.importance}] {c.category}: {c.item}")
    print(f"    How: {c.implementation}")
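The idempotency item above can be sketched as a dedup guard keyed on `message_id`; in production the `processed` set would live in Redis with a TTL rather than in process memory:

```python
processed = set()  # in production: a Redis SET with a TTL

def handle_once(message: dict, process) -> bool:
    """Process a message only if its message_id hasn't been seen before."""
    mid = message["message_id"]
    if mid in processed:
        return False  # duplicate delivery from the broker — skip
    process(message)
    processed.add(mid)  # mark only after success, so failures get retried
    return True

outputs = []
msg = {"message_id": "m-1", "payload": {"query": "x"}}
handle_once(msg, lambda m: outputs.append(m["payload"]))
handle_once(msg, lambda m: outputs.append(m["payload"]))  # redelivery ignored
print(len(outputs))  # 1
```

Marking the ID only after `process()` succeeds turns the broker's at-least-once delivery into effectively-once processing, at the cost of a retry if the worker crashes mid-message.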

Tips

Real-world enterprise deployment

For medium to large organizations, a three-tier architecture is recommended: a Core Layer at the heart of the system, a Distribution Layer that distributes traffic, and an Access Layer that connects directly to users. Clear layer separation makes troubleshooting easier and lets the system scale as demand requires.

Network security is just as important. Deploy a next-generation firewall capable of deep packet inspection, use network segmentation with a separate VLAN per department, install IDS/IPS to detect attacks, and run a regular security audit at least twice a year.

What is LangChain Agent Pub/Sub?

A pattern in which AI agents communicate by publishing and subscribing to topics on a message broker (Redis, Kafka, RabbitMQ). It gives loose coupling between agents, asynchronous processing, and independent scaling.

How do you design the architecture?

Give each agent a single responsibility (Router, Research, Analysis, Writer, Review), define topics such as task, result, and error queues, add a dead-letter queue for failed messages, and validate every message against a schema.

Which message broker should you use?

Redis Pub/Sub is the easiest for prototypes; Kafka suits production workloads that need persistence and replay; RabbitMQ fits task queues and routing; Google Pub/Sub is a managed option on GCP; NATS works well for microservices.

How do you scale?

Run each agent in its own container, use consumer groups, auto-scale on queue depth, rate-limit calls to the LLM API, add circuit breakers and retries with backoff, and monitor consumer lag between producers and consumers.
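The token-bucket rate limiter used to respect LLM API limits can be sketched as follows; the clock is injected as a parameter so the behavior is deterministic (a real limiter would pass `time.monotonic()`):

```python
class TokenBucket:
    """Token-bucket rate limiter; `now` is injected for testability."""
    def __init__(self, rate_per_sec: float, capacity: float):
        self.rate = rate_per_sec      # tokens refilled per second
        self.capacity = capacity      # max burst size
        self.tokens = capacity
        self.last = 0.0

    def allow(self, now: float) -> bool:
        # Refill proportionally to elapsed time, capped at capacity
        self.tokens = min(self.capacity,
                          self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

bucket = TokenBucket(rate_per_sec=2, capacity=2)  # e.g. 2 LLM calls/sec/agent
print(bucket.allow(0.0), bucket.allow(0.0), bucket.allow(0.0))  # True True False
print(bucket.allow(1.0))  # True — tokens refilled after 1 second
```

A rejected call would be re-queued rather than dropped, so back-pressure accumulates in the broker instead of hammering the LLM API.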

สรุป

LangChain Agent Pub Sub Architecture Event-Driven Multi-Agent Kafka Redis RabbitMQ Async Scale DLQ Tracing Rate Limiting Production
