SiamCafe.net Blog
Technology

LangChain Agent Event Driven Design — สร้าง AI Agent ที่ตอบสนองต่อ Events

langchain agent event driven design
LangChain Agent Event Driven Design | SiamCafe Blog
2026-03-25· อ. บอม — SiamCafe.net· 1,551 คำ

LangChain Agent คืออะไรและ Event-Driven Design

LangChain เป็น framework สำหรับสร้าง applications ที่ใช้ Large Language Models (LLMs) Agent ใน LangChain คือ system ที่ใช้ LLM เป็น reasoning engine เพื่อตัดสินใจว่าจะทำอะไรต่อไป เลือกใช้ tools ไหน และประมวลผลข้อมูลอย่างไร Agent สามารถ chain actions หลายขั้นตอนเพื่อแก้ปัญหาที่ซับซ้อนได้

Event-Driven Design เป็น architectural pattern ที่ระบบตอบสนองต่อ events แทนที่จะทำงานแบบ sequential เมื่อรวมกับ LangChain Agent ได้ระบบที่ Agent ถูก trigger โดย events เช่น ข้อความใหม่จากผู้ใช้ ข้อมูลเข้ามาใน queue การแจ้งเตือนจาก monitoring system หรือ scheduled tasks

ข้อดีของ Event-Driven Agent Architecture คือ Scalability ที่รองรับ requests จำนวนมากโดย decouple processing Resilience ที่ถ้า Agent ล้มเหลว event ยังอยู่ใน queue รอ retry Flexibility ที่เพิ่ม Agent ใหม่ได้โดยไม่กระทบ Agent เดิม และ Observability ที่ track ทุก event และ Agent action ได้

Use cases ที่เหมาะกับ Event-Driven Agent เช่น Customer support chatbot ที่ตอบจาก ticket queue, Document processing pipeline ที่ process documents เมื่อ upload, Monitoring agent ที่ตอบสนองต่อ alerts และ Data enrichment pipeline ที่ enrich data เมื่อมีข้อมูลใหม่เข้ามา

ติดตั้ง LangChain และสร้าง Agent แรก

ขั้นตอนการติดตั้งและสร้าง basic agent

# ติดตั้ง LangChain
pip install langchain langchain-openai langchain-community
pip install langgraph langsmith
pip install redis celery fastapi uvicorn

# หรือใช้ Ollama สำหรับ local LLM
pip install langchain-ollama

# ตรวจสอบ
python -c "import langchain; print(langchain.__version__)"

# === Basic Agent ===
from langchain_openai import ChatOpenAI
from langchain.agents import AgentExecutor, create_react_agent
from langchain.tools import tool
from langchain import hub
import os

# ใช้ Ollama แทน OpenAI (local, ฟรี)
from langchain_ollama import ChatOllama
llm = ChatOllama(model="llama3:8b", temperature=0)

# หรือใช้ OpenAI
# llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

# สร้าง Tools
@tool
def search_database(query: str) -> str:
    """Search the product database for information."""
    # simulate database search
    products = {
        "laptop": {"name": "ThinkPad X1", "price": 45000, "stock": 15},
        "mouse": {"name": "MX Master 3S", "price": 3500, "stock": 50},
        "keyboard": {"name": "HHKB Pro 3", "price": 9800, "stock": 8},
    }
    for key, product in products.items():
        if key in query.lower():
            return f"Found: {product['name']}, Price: {product['price']} THB, Stock: {product['stock']}"
    return "No products found matching your query."

@tool
def calculate_discount(price: float, discount_percent: float) -> str:
    """Calculate discounted price."""
    discounted = price * (1 - discount_percent / 100)
    savings = price - discounted
    return f"Original: {price} THB, Discount: {discount_percent}%, Final: {discounted:.0f} THB, Savings: {savings:.0f} THB"

@tool
def check_order_status(order_id: str) -> str:
    """Check the status of an order by order ID."""
    statuses = {
        "ORD-001": "Shipped - Tracking: TH123456",
        "ORD-002": "Processing - Expected ship date: tomorrow",
        "ORD-003": "Delivered - Signed by: John",
    }
    return statuses.get(order_id, f"Order {order_id} not found")

# สร้าง Agent
tools = [search_database, calculate_discount, check_order_status]
prompt = hub.pull("hwchase17/react")

agent = create_react_agent(llm, tools, prompt)
agent_executor = AgentExecutor(
    agent=agent, tools=tools,
    verbose=True, max_iterations=5,
    handle_parsing_errors=True,
)

# ใช้งาน
result = agent_executor.invoke({"input": "ราคา laptop เท่าไหร่ ถ้าลด 10% จะเหลือเท่าไหร่"})
print(result["output"])

สร้าง Event-Driven Agent Architecture

ออกแบบระบบ Agent ที่ทำงานตาม events

#!/usr/bin/env python3
# event_agent.py — Event-Driven Agent System
import json
import redis
import logging
from datetime import datetime
from typing import Dict, Any, Optional
from dataclasses import dataclass, asdict
from langchain_ollama import ChatOllama
from langchain.agents import AgentExecutor, create_react_agent
from langchain.tools import tool
from langchain import hub

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("event_agent")

@dataclass
class Event:
    event_id: str
    event_type: str
    payload: Dict[str, Any]
    timestamp: str = ""
    source: str = ""
    
    def __post_init__(self):
        if not self.timestamp:
            self.timestamp = datetime.utcnow().isoformat()

class EventBus:
    def __init__(self, redis_url="redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)
    
    def publish(self, channel: str, event: Event):
        message = json.dumps(asdict(event))
        self.redis.publish(f"agents:{channel}", message)
        self.redis.lpush(f"events_log:{channel}", message)
        self.redis.ltrim(f"events_log:{channel}", 0, 999)
        logger.info(f"Published event {event.event_id} to {channel}")
    
    def subscribe(self, channels):
        pubsub = self.redis.pubsub()
        pubsub.psubscribe([f"agents:{c}" for c in channels])
        return pubsub
    
    def push_task(self, queue: str, event: Event):
        self.redis.rpush(f"task_queue:{queue}", json.dumps(asdict(event)))
    
    def pop_task(self, queue: str, timeout: int = 5):
        result = self.redis.blpop(f"task_queue:{queue}", timeout=timeout)
        if result:
            return Event(**json.loads(result[1]))
        return None

class EventDrivenAgent:
    def __init__(self, agent_name: str, event_bus: EventBus):
        self.name = agent_name
        self.bus = event_bus
        self.llm = ChatOllama(model="llama3:8b", temperature=0)
        self.tools = self._create_tools()
        self.agent = self._create_agent()
        self.handlers = {}
    
    def _create_tools(self):
        @tool
        def emit_event(event_type: str, data: str) -> str:
            """Emit a new event to the event bus. Data should be JSON string."""
            try:
                payload = json.loads(data)
            except json.JSONDecodeError:
                payload = {"message": data}
            
            event = Event(
                event_id=f"evt_{datetime.utcnow().timestamp()}",
                event_type=event_type,
                payload=payload,
                source=self.name,
            )
            self.bus.publish(event_type, event)
            return f"Event emitted: {event_type}"
        
        @tool
        def query_knowledge_base(question: str) -> str:
            """Query the knowledge base for information."""
            kb = {
                "refund_policy": "Refunds within 30 days with receipt. Process takes 5-7 business days.",
                "shipping": "Free shipping over 1000 THB. Standard delivery 3-5 days.",
                "warranty": "1 year warranty on all electronics. Extended warranty available.",
            }
            for key, value in kb.items():
                if any(word in question.lower() for word in key.split("_")):
                    return value
            return "No relevant information found."
        
        @tool
        def update_ticket(ticket_id: str, status: str) -> str:
            """Update a support ticket status."""
            logger.info(f"Ticket {ticket_id} updated to: {status}")
            return f"Ticket {ticket_id} status updated to '{status}'"
        
        return [emit_event, query_knowledge_base, update_ticket]
    
    def _create_agent(self):
        prompt = hub.pull("hwchase17/react")
        agent = create_react_agent(self.llm, self.tools, prompt)
        return AgentExecutor(
            agent=agent, tools=self.tools,
            verbose=True, max_iterations=5,
            handle_parsing_errors=True,
        )
    
    def register_handler(self, event_type: str, handler):
        self.handlers[event_type] = handler
    
    def handle_event(self, event: Event):
        logger.info(f"[{self.name}] Handling event: {event.event_type}")
        
        if event.event_type in self.handlers:
            return self.handlers[event.event_type](event)
        
        prompt = f"""You are {self.name}, an AI agent.
Handle this event:
Type: {event.event_type}
Data: {json.dumps(event.payload)}
Source: {event.source}

Take appropriate actions using your tools."""
        
        result = self.agent.invoke({"input": prompt})
        return result["output"]
    
    def run(self, queue_name: str):
        logger.info(f"[{self.name}] Started, listening on queue: {queue_name}")
        
        while True:
            event = self.bus.pop_task(queue_name, timeout=5)
            if event:
                try:
                    result = self.handle_event(event)
                    logger.info(f"[{self.name}] Result: {result}")
                except Exception as e:
                    logger.error(f"[{self.name}] Error: {e}")

# ใช้งาน
bus = EventBus()
agent = EventDrivenAgent("support-agent", bus)

# Push event to queue
event = Event(
    event_id="evt_001",
    event_type="customer.inquiry",
    payload={"message": "สอบถามนโยบายการคืนสินค้า", "customer_id": "C001"},
    source="web_chat",
)
bus.push_task("support", event)

# agent.run("support")  # Start agent loop

Custom Tools และ Memory Management

สร้าง tools ที่ซับซ้อนและจัดการ conversation memory

#!/usr/bin/env python3
# agent_memory.py — Agent with Memory and Custom Tools
from langchain_ollama import ChatOllama
from langchain.memory import ConversationBufferWindowMemory
from langchain.agents import AgentExecutor, create_react_agent
from langchain.tools import tool, StructuredTool
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import OllamaEmbeddings
from pydantic import BaseModel, Field
from typing import Optional
import json
import requests

# === Structured Tool with Schema ===
class OrderLookupInput(BaseModel):
    order_id: str = Field(description="The order ID to look up")
    include_items: bool = Field(default=False, description="Include order items detail")

def lookup_order(order_id: str, include_items: bool = False) -> str:
    orders = {
        "ORD-001": {
            "status": "shipped",
            "tracking": "TH123456",
            "items": [{"name": "Laptop", "qty": 1, "price": 45000}],
            "total": 45000,
        },
    }
    order = orders.get(order_id)
    if not order:
        return f"Order {order_id} not found"
    
    result = f"Order {order_id}: Status={order['status']}, Total={order['total']} THB"
    if include_items:
        items_str = ", ".join([f"{i['name']} x{i['qty']}" for i in order["items"]])
        result += f", Items: {items_str}"
    return result

order_tool = StructuredTool.from_function(
    func=lookup_order,
    name="lookup_order",
    description="Look up order details by order ID",
    args_schema=OrderLookupInput,
)

# === RAG Tool (Retrieval Augmented Generation) ===
class RAGTool:
    def __init__(self):
        self.embeddings = OllamaEmbeddings(model="nomic-embed-text")
        self.docs = [
            "นโยบายคืนสินค้า: คืนได้ภายใน 30 วัน สินค้าต้องอยู่ในสภาพสมบูรณ์ พร้อมใบเสร็จ",
            "การจัดส่ง: ฟรีค่าส่งเมื่อสั่งซื้อ 1000 บาทขึ้นไป จัดส่ง 3-5 วันทำการ",
            "การรับประกัน: สินค้าอิเล็กทรอนิกส์รับประกัน 1 ปี ไม่รวมความเสียหายจากผู้ใช้",
            "วิธีชำระเงิน: รองรับ บัตรเครดิต โอนธนาคาร PromptPay และเก็บเงินปลายทาง",
            "เวลาทำการ: จันทร์-ศุกร์ 9:00-18:00 เสาร์ 9:00-12:00 หยุดวันอาทิตย์และวันหยุดราชการ",
        ]
        self.vectorstore = FAISS.from_texts(self.docs, self.embeddings)
    
    def search(self, query: str) -> str:
        results = self.vectorstore.similarity_search(query, k=2)
        return "\n".join([doc.page_content for doc in results])

rag = RAGTool()

@tool
def search_knowledge(query: str) -> str:
    """Search company knowledge base for policies and information."""
    return rag.search(query)

# === Agent with Memory ===
class MemoryAgent:
    def __init__(self):
        self.llm = ChatOllama(model="llama3:8b", temperature=0)
        self.memory = ConversationBufferWindowMemory(
            memory_key="chat_history",
            return_messages=True,
            k=10,
        )
        self.tools = [order_tool, search_knowledge]
        
        self.prompt = ChatPromptTemplate.from_messages([
            ("system", """You are a helpful customer support agent for an e-commerce store.
Use tools to look up information. Always be polite and helpful.
Respond in the same language as the customer."""),
            MessagesPlaceholder(variable_name="chat_history"),
            ("human", "{input}"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ])
        
        agent = create_react_agent(self.llm, self.tools, self.prompt)
        self.executor = AgentExecutor(
            agent=agent, tools=self.tools,
            memory=self.memory,
            verbose=True, max_iterations=5,
            handle_parsing_errors=True,
        )
    
    def chat(self, message: str) -> str:
        result = self.executor.invoke({"input": message})
        return result["output"]
    
    def get_history(self):
        return self.memory.load_memory_variables({})

# ใช้งาน
agent = MemoryAgent()
print(agent.chat("สวัสดี ขอสอบถามนโยบายการคืนสินค้า"))
print(agent.chat("แล้วถ้าสินค้าเสียหายล่ะ"))
print(agent.chat("ตรวจสอบคำสั่งซื้อ ORD-001 ให้ด้วย"))

Multi-Agent System ด้วย LangGraph

สร้างระบบ Multi-Agent ที่ทำงานร่วมกัน

#!/usr/bin/env python3
# multi_agent.py — Multi-Agent System with LangGraph
from langgraph.graph import StateGraph, END
from langchain_ollama import ChatOllama
from langchain_core.messages import HumanMessage, AIMessage
from typing import TypedDict, Annotated, Sequence
import operator
import json

class AgentState(TypedDict):
    messages: Annotated[Sequence, operator.add]
    current_agent: str
    task_type: str
    context: dict

llm = ChatOllama(model="llama3:8b", temperature=0)

# === Router Agent ===
def router_agent(state: AgentState) -> AgentState:
    last_message = state["messages"][-1].content if state["messages"] else ""
    
    prompt = f"""Classify this customer request into one category:
- billing: payment, invoice, refund, pricing
- technical: bug, error, setup, configuration
- general: information, hours, policy, shipping

Request: {last_message}
Reply with ONLY the category name."""
    
    response = llm.invoke(prompt)
    category = response.content.strip().lower()
    
    if "billing" in category:
        task_type = "billing"
    elif "technical" in category:
        task_type = "technical"
    else:
        task_type = "general"
    
    return {
        "messages": [AIMessage(content=f"[Router] Classified as: {task_type}")],
        "current_agent": task_type,
        "task_type": task_type,
        "context": state.get("context", {}),
    }

# === Billing Agent ===
def billing_agent(state: AgentState) -> AgentState:
    user_message = state["messages"][0].content
    
    prompt = f"""You are a billing support specialist.
Help the customer with their billing question.
Be specific about amounts, dates, and processes.

Customer: {user_message}
Context: {json.dumps(state.get('context', {}))}"""
    
    response = llm.invoke(prompt)
    
    return {
        "messages": [AIMessage(content=f"[Billing Agent] {response.content}")],
        "current_agent": "billing",
        "task_type": state["task_type"],
        "context": state.get("context", {}),
    }

# === Technical Agent ===
def technical_agent(state: AgentState) -> AgentState:
    user_message = state["messages"][0].content
    
    prompt = f"""You are a technical support engineer.
Help diagnose and resolve the technical issue.
Provide step-by-step instructions when possible.

Customer: {user_message}"""
    
    response = llm.invoke(prompt)
    
    return {
        "messages": [AIMessage(content=f"[Technical Agent] {response.content}")],
        "current_agent": "technical",
        "task_type": state["task_type"],
        "context": state.get("context", {}),
    }

# === General Agent ===
def general_agent(state: AgentState) -> AgentState:
    user_message = state["messages"][0].content
    
    prompt = f"""You are a friendly customer service representative.
Answer general questions about the company and its services.

Customer: {user_message}"""
    
    response = llm.invoke(prompt)
    
    return {
        "messages": [AIMessage(content=f"[General Agent] {response.content}")],
        "current_agent": "general",
        "task_type": state["task_type"],
        "context": state.get("context", {}),
    }

# === Quality Check Agent ===
def quality_check(state: AgentState) -> AgentState:
    agent_response = state["messages"][-1].content
    
    prompt = f"""Review this customer support response for quality:
{agent_response}

Check: Is it helpful? Professional? Complete?
If good, reply "APPROVED". If needs improvement, reply "NEEDS_REVISION: [reason]"."""
    
    response = llm.invoke(prompt)
    
    return {
        "messages": [AIMessage(content=f"[QA] {response.content}")],
        "current_agent": "qa",
        "task_type": state["task_type"],
        "context": state.get("context", {}),
    }

# === Build Graph ===
def route_to_agent(state: AgentState) -> str:
    return state.get("task_type", "general")

workflow = StateGraph(AgentState)

workflow.add_node("router", router_agent)
workflow.add_node("billing", billing_agent)
workflow.add_node("technical", technical_agent)
workflow.add_node("general", general_agent)
workflow.add_node("quality_check", quality_check)

workflow.set_entry_point("router")

workflow.add_conditional_edges("router", route_to_agent, {
    "billing": "billing",
    "technical": "technical",
    "general": "general",
})

workflow.add_edge("billing", "quality_check")
workflow.add_edge("technical", "quality_check")
workflow.add_edge("general", "quality_check")
workflow.add_edge("quality_check", END)

app = workflow.compile()

# ใช้งาน
result = app.invoke({
    "messages": [HumanMessage(content="ขอคืนเงินคำสั่งซื้อ ORD-001 ได้ไหม")],
    "current_agent": "",
    "task_type": "",
    "context": {},
})

for msg in result["messages"]:
    print(f"{msg.content}\n")

Production Deployment และ Monitoring

Deploy Agent system สำหรับ production

#!/usr/bin/env python3
# agent_api.py — FastAPI Server for Event-Driven Agent
from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel
from typing import Optional, Dict, Any
import uvicorn
import json
import time
import logging
from datetime import datetime

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("agent_api")

app = FastAPI(title="LangChain Agent API", version="1.0")

# Metrics
metrics = {
    "total_requests": 0,
    "successful": 0,
    "failed": 0,
    "avg_latency_ms": 0,
    "latencies": [],
}

class ChatRequest(BaseModel):
    message: str
    session_id: Optional[str] = None
    context: Optional[Dict[str, Any]] = None

class ChatResponse(BaseModel):
    response: str
    session_id: str
    agent_used: str
    latency_ms: float

class EventRequest(BaseModel):
    event_type: str
    payload: Dict[str, Any]
    source: Optional[str] = "api"

@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    start = time.time()
    metrics["total_requests"] += 1
    
    try:
        # ใช้ agent ที่สร้างไว้
        # result = agent.chat(request.message)
        result = f"Response to: {request.message}"  # placeholder
        
        latency = (time.time() - start) * 1000
        metrics["successful"] += 1
        metrics["latencies"].append(latency)
        metrics["latencies"] = metrics["latencies"][-100:]
        metrics["avg_latency_ms"] = sum(metrics["latencies"]) / len(metrics["latencies"])
        
        return ChatResponse(
            response=result,
            session_id=request.session_id or "default",
            agent_used="support-agent",
            latency_ms=round(latency, 2),
        )
    except Exception as e:
        metrics["failed"] += 1
        logger.error(f"Chat error: {e}")
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/events")
async def handle_event(request: EventRequest, background_tasks: BackgroundTasks):
    background_tasks.add_task(process_event, request)
    return {"status": "accepted", "event_type": request.event_type}

async def process_event(request: EventRequest):
    logger.info(f"Processing event: {request.event_type}")
    # bus.push_task("support", Event(...))

@app.get("/health")
async def health():
    return {
        "status": "healthy",
        "timestamp": datetime.utcnow().isoformat(),
        "metrics": {
            "total_requests": metrics["total_requests"],
            "successful": metrics["successful"],
            "failed": metrics["failed"],
            "avg_latency_ms": round(metrics["avg_latency_ms"], 2),
        },
    }

@app.get("/metrics")
async def get_metrics():
    return metrics

# Docker Compose สำหรับ production
# docker-compose.yml:
# services:
#   agent-api:
#     build: .
#     ports: ["8000:8000"]
#     environment:
#       REDIS_URL: redis://redis:6379
#       OLLAMA_URL: http://ollama:11434
#     depends_on: [redis, ollama]
#
#   ollama:
#     image: ollama/ollama:latest
#     ports: ["11434:11434"]
#     volumes:
#       - ollama_data:/root/.ollama
#     deploy:
#       resources:
#         reservations:
#           devices:
#             - driver: nvidia
#               count: 1
#               capabilities: [gpu]
#
#   redis:
#     image: redis:7-alpine
#     ports: ["6379:6379"]
#
#   worker:
#     build: .
#     command: python event_agent.py
#     depends_on: [redis, ollama]

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

FAQ คำถามที่พบบ่อย

Q: LangChain Agent กับ simple prompt chaining ต่างกันอย่างไร?

A: Prompt chaining เป็นการกำหนดลำดับขั้นตอนล่วงหน้าแบบ static เช่น step 1 ทำอะไร step 2 ทำอะไร Agent ใช้ LLM ตัดสินใจ dynamically ว่าจะใช้ tool ไหน ทำขั้นตอนไหนต่อ วนกี่รอบ ตามข้อมูลที่ได้ Agent เหมาะสำหรับงานที่ไม่รู้ล่วงหน้าว่าต้องทำกี่ขั้นตอน

Q: LangGraph กับ LangChain Agent ต่างกันอย่างไร?

A: LangChain Agent ใช้ ReAct pattern ที่ LLM ตัดสินใจทุกขั้นตอน LangGraph ให้ control flow ที่ชัดเจนกว่าโดยสร้างเป็น state machine graph ที่กำหนด nodes (agents/functions) และ edges (transitions) ได้ LangGraph เหมาะสำหรับ multi-agent systems ที่ต้องการ deterministic routing และ complex workflows

Q: ใช้ Ollama กับ LangChain Agent ได้ดีแค่ไหน?

A: Ollama กับ Llama 3 8B ใช้ได้ดีสำหรับ simple tools และ classification tasks แต่ model เล็กอาจมีปัญหากับ complex reasoning และ tool calling ที่ซับซ้อน สำหรับ production ที่ต้องการ reliability สูง แนะนำใช้ GPT-4o-mini หรือ Claude สำหรับ development และ testing Ollama ช่วยลดค่าใช้จ่ายได้มาก

Q: Event-Driven Agent มี overhead มากไหม?

A: มี overhead จาก message queue (Redis/RabbitMQ) ประมาณ 1-5ms ต่อ event ซึ่งน้อยมากเมื่อเทียบกับ LLM inference time (1-30 วินาที) ข้อดีที่ได้คือ decoupling, retry capability, scaling และ observability ซึ่งคุ้มค่ากับ overhead สำหรับ production systems

📖 บทความที่เกี่ยวข้อง

LangChain Agent Home Lab Setupอ่านบทความ → LangChain Agent IoT Gatewayอ่านบทความ → Healthchecks.io Domain Driven Design DDDอ่านบทความ → LangChain Agent Container Orchestrationอ่านบทความ → LangChain Agent Database Migrationอ่านบทความ →

📚 ดูบทความทั้งหมด →