ในโลกของ Backend Development ปี 2026 ระบบที่ต้องรองรับผู้ใช้หลายล้านคนพร้อมกันไม่สามารถพึ่งพาสถาปัตยกรรมแบบ Request-Response แบบดั้งเดิมได้อีกต่อไป Event-Driven Architecture (EDA) กลายเป็นรูปแบบสถาปัตยกรรมที่สำคัญที่สุดสำหรับระบบ Distributed Systems ขนาดใหญ่ ตั้งแต่ระบบ E-commerce ไปจนถึงระบบ Financial Trading
บทความนี้จะพาคุณเข้าใจ Event-Driven Architecture อย่างลึกซึ้ง ตั้งแต่แนวคิดพื้นฐานไปจนถึง Pattern ขั้นสูงอย่าง Event Sourcing, CQRS, Saga Pattern และ Outbox Pattern พร้อมตัวอย่างการใช้งานจริงกับ Apache Kafka และ RabbitMQ
Event-Driven Architecture (EDA) คืออะไร?
Event-Driven Architecture คือรูปแบบสถาปัตยกรรมซอฟต์แวร์ที่ใช้ "เหตุการณ์" (Events) เป็นตัวขับเคลื่อนการทำงานของระบบ แทนที่จะเรียกใช้งานบริการอื่นโดยตรงแบบ Synchronous เหมือนระบบ Request-Response ทั่วไป ระบบจะ "ปล่อย" (Emit/Publish) เหตุการณ์ออกมา แล้วบริการอื่นที่สนใจจะ "รับฟัง" (Subscribe/Consume) เหตุการณ์เหล่านั้นไปทำงานต่อ
ลองนึกภาพง่ายๆ: ในร้านอาหาร ลูกค้าสั่งอาหาร (Event) → พนักงานเสิร์ฟเขียน Order → ส่งไปห้องครัว → เชฟรับ Order ไปทำ → อาหารเสร็จ (Event) → พนักงานเสิร์ฟรับไปส่ง ทุกขั้นตอนขับเคลื่อนด้วย "เหตุการณ์" ไม่ใช่การสั่งงานโดยตรง
ความแตกต่างระหว่าง Events, Commands และ Queries
การเข้าใจความแตกต่างระหว่างสามแนวคิดนี้เป็นพื้นฐานสำคัญ:
| ประเภท | ความหมาย | ตัวอย่าง | ลักษณะ |
|---|---|---|---|
| Event | สิ่งที่เกิดขึ้นแล้ว (Past tense) | OrderPlaced, UserRegistered | Immutable, ย้อนไม่ได้ |
| Command | คำสั่งให้ทำบางอย่าง (Imperative) | PlaceOrder, RegisterUser | อาจสำเร็จหรือล้มเหลว |
| Query | คำถามเพื่อดึงข้อมูล | GetOrderById, ListUsers | ไม่เปลี่ยนแปลง State |
Event จะใช้ชื่อเป็น Past Tense เสมอ เช่น OrderPlaced ไม่ใช่ PlaceOrder (นั่นคือ Command) เพราะ Event คือสิ่งที่ "เกิดขึ้นแล้ว" เป็นข้อเท็จจริง (fact) ที่ไม่สามารถเปลี่ยนแปลงได้
ประเภทของ Events
Domain Events
เหตุการณ์ที่เกิดขึ้นภายใน Bounded Context เดียวกัน เป็นเรื่องของ Business Logic ภายใน เช่น PaymentReceived, InventoryReserved ใช้ภายใน Service เดียวกันเพื่อแยก Logic ให้ชัดเจน
# ตัวอย่าง Domain Event
class OrderPlaced:
order_id: str
customer_id: str
items: list
total_amount: float
placed_at: datetime
# Domain Event อยู่ภายใน Order Service
# ใช้ trigger การคำนวณ loyalty points,
# อัปเดต order history ภายใน service เดียวกัน
Integration Events
เหตุการณ์ที่ส่งข้าม Bounded Context หรือข้ามระหว่าง Microservices ใช้สำหรับการสื่อสารระหว่างบริการที่แตกต่างกัน เช่น Order Service ส่ง OrderPlaced event ไปยัง Payment Service และ Inventory Service
# ตัวอย่าง Integration Event
class OrderPlacedIntegrationEvent:
event_id: str # Unique ID สำหรับ idempotency
event_type: str # "order.placed"
timestamp: datetime
version: int # Schema version
payload:
order_id: str
customer_id: str
total_amount: float
metadata:
correlation_id: str
source_service: str
Event-Driven Patterns หลักที่ต้องรู้
1. Event Notification
Pattern ที่ง่ายที่สุด Service A ส่ง Event แจ้งว่ามีอะไรเกิดขึ้น โดย Event มีข้อมูลน้อยมาก (แค่ ID) Service ที่รับต้องกลับไปถาม Service A ถ้าต้องการข้อมูลเพิ่มเติม
# Event Notification - ข้อมูลน้อย
{
"event_type": "order.placed",
"order_id": "ORD-12345",
"timestamp": "2026-04-08T10:00:00Z"
}
# Consumer ต้องเรียก GET /orders/ORD-12345 เพื่อดึงรายละเอียด
ข้อดีคือ Coupling ต่ำมาก แต่ข้อเสียคือต้องเรียก API กลับไปหา Producer ซึ่งสร้าง Load เพิ่มเติม
2. Event-Carried State Transfer
Event พกข้อมูลทั้งหมดที่ Consumer ต้องการมาด้วย ไม่ต้องเรียก API กลับไป ทำให้ Consumer สามารถเก็บ Local Copy ของข้อมูลไว้ได้
# Event-Carried State Transfer - ข้อมูลครบ
{
"event_type": "order.placed",
"order_id": "ORD-12345",
"customer_id": "CUST-789",
"customer_name": "สมชาย ใจดี",
"customer_email": "somchai@example.com",
"items": [
{"product_id": "P001", "name": "Laptop", "qty": 1, "price": 35000},
{"product_id": "P002", "name": "Mouse", "qty": 2, "price": 500}
],
"total_amount": 36000,
"shipping_address": "123 กรุงเทพฯ",
"timestamp": "2026-04-08T10:00:00Z"
}
ข้อดีคือ Consumer ไม่ต้องเรียก API กลับ ลด latency และ coupling แต่ข้อเสียคือ Event มีขนาดใหญ่ และอาจมี Stale Data ได้
3. Event Sourcing
แทนที่จะเก็บสถานะปัจจุบัน (Current State) ในฐานข้อมูล Event Sourcing เก็บ "ทุกเหตุการณ์" ที่เกิดขึ้นตามลำดับ แล้ว Replay events เหล่านั้นเพื่อสร้าง Current State ขึ้นมาใหม่ได้ทุกเมื่อ
4. CQRS (Command Query Responsibility Segregation)
แยก Model สำหรับการเขียน (Command) และการอ่าน (Query) ออกจากกัน ทำให้สามารถ Optimize แต่ละด้านได้อย่างอิสระ
Event Sourcing Deep Dive
Event Sourcing เป็น Pattern ที่ทรงพลังที่สุดใน EDA แนวคิดคือ "อย่าเก็บแค่ผลลัพธ์ จงเก็บทุกอย่างที่เกิดขึ้น" เหมือนบัญชีธนาคารที่ไม่ได้เก็บแค่ยอดเงินคงเหลือ แต่เก็บทุก Transaction ที่เคยเกิดขึ้น
Event Store
Event Store คือฐานข้อมูลสำหรับเก็บ Events ทั้งหมด โดยมีคุณสมบัติสำคัญคือ Append-Only (เพิ่มได้อย่างเดียว ลบไม่ได้ แก้ไขไม่ได้) และเรียงตามลำดับเวลา
# โครงสร้าง Event Store Table
CREATE TABLE event_store (
event_id UUID PRIMARY KEY,
aggregate_id UUID NOT NULL, -- เช่น order_id
aggregate_type VARCHAR(100) NOT NULL, -- เช่น 'Order'
event_type VARCHAR(100) NOT NULL, -- เช่น 'OrderPlaced'
event_data JSONB NOT NULL, -- ข้อมูล Event
metadata JSONB, -- correlation_id, user_id
version INT NOT NULL, -- ลำดับของ Event ใน Aggregate
created_at TIMESTAMP DEFAULT NOW(),
UNIQUE(aggregate_id, version) -- ป้องกัน Concurrent writes
);
-- Index สำหรับ query ตาม aggregate
CREATE INDEX idx_event_store_aggregate
ON event_store(aggregate_id, version);
# ตัวอย่าง Events ของ Order Aggregate
# Version 1: สร้าง Order
{
"event_type": "OrderCreated",
"aggregate_id": "ORD-001",
"version": 1,
"event_data": {
"customer_id": "CUST-789",
"items": [{"product": "Laptop", "qty": 1, "price": 35000}]
}
}
# Version 2: เพิ่มสินค้า
{
"event_type": "ItemAdded",
"aggregate_id": "ORD-001",
"version": 2,
"event_data": {
"product": "Mouse",
"qty": 2,
"price": 500
}
}
# Version 3: ยืนยัน Order
{
"event_type": "OrderConfirmed",
"aggregate_id": "ORD-001",
"version": 3,
"event_data": {
"confirmed_by": "CUST-789",
"total_amount": 36000
}
}
# Version 4: ชำระเงิน
{
"event_type": "PaymentReceived",
"aggregate_id": "ORD-001",
"version": 4,
"event_data": {
"payment_method": "credit_card",
"amount": 36000,
"transaction_id": "TXN-456"
}
}
Projections (Read Models)
เนื่องจาก Event Store เก็บ Events แต่ไม่สะดวกสำหรับการ Query ข้อมูลจึงต้องมี Projections คือกระบวนการอ่าน Events แล้วสร้าง Read Model (View) ขึ้นมาในรูปแบบที่เหมาะสำหรับการ Query
# Python - Event Projection สำหรับ Order
class OrderProjection:
def __init__(self):
self.orders = {} # In-memory store (ใช้ DB จริงใน Production)
def handle(self, event):
handler = getattr(self, f"on_{event['event_type']}", None)
if handler:
handler(event)
def on_OrderCreated(self, event):
data = event['event_data']
self.orders[event['aggregate_id']] = {
'order_id': event['aggregate_id'],
'customer_id': data['customer_id'],
'items': data['items'],
'status': 'created',
'total': sum(i['price'] * i['qty'] for i in data['items']),
'created_at': event['created_at']
}
def on_ItemAdded(self, event):
order = self.orders[event['aggregate_id']]
data = event['event_data']
order['items'].append(data)
order['total'] += data['price'] * data['qty']
def on_OrderConfirmed(self, event):
self.orders[event['aggregate_id']]['status'] = 'confirmed'
def on_PaymentReceived(self, event):
order = self.orders[event['aggregate_id']]
order['status'] = 'paid'
order['payment_txn'] = event['event_data']['transaction_id']
def on_OrderShipped(self, event):
order = self.orders[event['aggregate_id']]
order['status'] = 'shipped'
order['tracking_number'] = event['event_data']['tracking_number']
Snapshots
เมื่อ Aggregate มี Events จำนวนมาก (เช่น บัญชีธนาคารที่มีหลายพัน Transactions) การ Replay ทุก Event จะช้ามาก Snapshots คือการเก็บสถานะ ณ จุดใดจุดหนึ่ง เพื่อไม่ต้อง Replay ตั้งแต่ต้น
# Snapshot Strategy
class SnapshotRepository:
SNAPSHOT_INTERVAL = 100 # สร้าง Snapshot ทุก 100 events
def save_snapshot(self, aggregate_id, state, version):
"""เก็บ Snapshot ลง DB"""
db.execute("""
INSERT INTO snapshots (aggregate_id, state, version, created_at)
VALUES (%s, %s, %s, NOW())
ON CONFLICT (aggregate_id) DO UPDATE
SET state = %s, version = %s, created_at = NOW()
""", (aggregate_id, json.dumps(state), version,
json.dumps(state), version))
def load_aggregate(self, aggregate_id):
"""โหลด Aggregate จาก Snapshot + Events ที่เหลือ"""
# 1. หา Snapshot ล่าสุด
snapshot = db.query(
"SELECT state, version FROM snapshots WHERE aggregate_id = %s",
(aggregate_id,)
)
if snapshot:
state = json.loads(snapshot['state'])
from_version = snapshot['version'] + 1
else:
state = {}
from_version = 1
# 2. โหลด Events หลังจาก Snapshot
events = db.query("""
SELECT * FROM event_store
WHERE aggregate_id = %s AND version >= %s
ORDER BY version
""", (aggregate_id, from_version))
# 3. Replay events ที่เหลือ
for event in events:
state = apply_event(state, event)
return state
CQRS Pattern (Command Query Responsibility Segregation)
CQRS คือการแยก Model สำหรับเขียนและอ่านออกจากกัน ในระบบทั่วไปเราใช้ Model เดียวกันทั้งอ่านและเขียน แต่ใน CQRS เราแยกเป็น Command Model (Write) และ Query Model (Read) โดยแต่ละด้านสามารถ Optimize ได้อย่างอิสระ
ทำไมต้อง CQRS?
- Read/Write Ratio: ระบบส่วนใหญ่อ่านมากกว่าเขียน 10-100 เท่า การแยก Model ทำให้ Scale ด้านอ่านได้ง่าย
- Optimization: Write Model optimize สำหรับ consistency, Read Model optimize สำหรับ query performance
- Complexity: Write side มี Business Logic ซับซ้อน แต่ Read side ต้องการแค่ Data ที่พร้อมแสดง
- Scaling: Scale Read replicas แยกจาก Write database ได้
# CQRS Architecture Example
# ===== COMMAND SIDE (Write) =====
class OrderCommandHandler:
def __init__(self, event_store, event_bus):
self.event_store = event_store
self.event_bus = event_bus
def handle_place_order(self, command):
"""รับ Command → Validate → สร้าง Event → บันทึก"""
# 1. Validate business rules
if not command.items:
raise ValueError("Order must have at least one item")
if command.total_amount <= 0:
raise ValueError("Invalid total amount")
# 2. Check inventory (call Inventory Service)
for item in command.items:
if not inventory_client.check_stock(item.product_id, item.qty):
raise InsufficientStockError(item.product_id)
# 3. Create Domain Event
event = OrderPlacedEvent(
order_id=generate_id(),
customer_id=command.customer_id,
items=command.items,
total_amount=command.total_amount,
placed_at=datetime.utcnow()
)
# 4. Save to Event Store
self.event_store.append(event)
# 5. Publish to Event Bus
self.event_bus.publish(event)
return event.order_id
# ===== QUERY SIDE (Read) =====
class OrderQueryHandler:
def __init__(self, read_db):
self.read_db = read_db # Optimized read database
def get_order(self, order_id):
"""ดึง Order จาก Read Model (denormalized, fast)"""
return self.read_db.query("""
SELECT o.*, c.name as customer_name, c.email
FROM order_views o
JOIN customer_views c ON o.customer_id = c.id
WHERE o.order_id = %s
""", (order_id,))
def list_orders_by_customer(self, customer_id, page=1, size=20):
"""ดึงรายการ Orders - Read Model ถูก optimize สำหรับ query นี้"""
return self.read_db.query("""
SELECT * FROM order_views
WHERE customer_id = %s
ORDER BY created_at DESC
LIMIT %s OFFSET %s
""", (customer_id, size, (page-1)*size))
# ===== EVENT HANDLER (Sync Read Model) =====
class OrderProjectionHandler:
"""รับ Events จาก Event Bus แล้วอัปเดต Read Model"""
def on_order_placed(self, event):
self.read_db.execute("""
INSERT INTO order_views
(order_id, customer_id, items, total_amount, status, created_at)
VALUES (%s, %s, %s, %s, 'placed', %s)
""", (event.order_id, event.customer_id,
json.dumps(event.items), event.total_amount, event.placed_at))
def on_order_shipped(self, event):
self.read_db.execute("""
UPDATE order_views SET status='shipped',
tracking_number=%s, shipped_at=%s
WHERE order_id = %s
""", (event.tracking_number, event.shipped_at, event.order_id))
การ Implement EDA ด้วย Kafka และ RabbitMQ
Apache Kafka
Kafka เหมาะสำหรับ High-throughput, Event Streaming ที่ต้องการเก็บ Events ระยะยาว รองรับหลายล้าน Events ต่อวินาที
# Python - Kafka Producer
from confluent_kafka import Producer
import json
producer = Producer({
'bootstrap.servers': 'kafka-broker:9092',
'client.id': 'order-service',
'acks': 'all', # รอ Broker ทุกตัว ACK
'enable.idempotence': True, # ป้องกัน Duplicate
'max.in.flight.requests.per.connection': 5,
'retries': 10
})
def publish_order_event(order_event):
"""Publish Event ไป Kafka Topic"""
event_data = {
'event_id': str(uuid4()),
'event_type': 'order.placed',
'timestamp': datetime.utcnow().isoformat(),
'version': 1,
'payload': {
'order_id': order_event.order_id,
'customer_id': order_event.customer_id,
'total_amount': order_event.total_amount
}
}
producer.produce(
topic='orders.events',
key=order_event.order_id, # ใช้ order_id เป็น key
value=json.dumps(event_data), # เพื่อให้ Events ของ Order เดียวกัน
callback=delivery_callback # ไปอยู่ Partition เดียวกัน (ordering)
)
producer.flush()
# Kafka Consumer
from confluent_kafka import Consumer
consumer = Consumer({
'bootstrap.servers': 'kafka-broker:9092',
'group.id': 'payment-service',
'auto.offset.reset': 'earliest',
'enable.auto.commit': False # Manual commit เพื่อ at-least-once
})
consumer.subscribe(['orders.events'])
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
handle_error(msg.error())
continue
event = json.loads(msg.value())
try:
process_event(event) # ประมวลผล Event
consumer.commit(msg) # Commit offset หลังประมวลผลสำเร็จ
except Exception as e:
handle_processing_error(e, event)
RabbitMQ
RabbitMQ เหมาะสำหรับ Task Queue และ Routing ที่ซับซ้อน รองรับหลาย Exchange types (Direct, Fanout, Topic, Headers)
# Python - RabbitMQ Publisher
import pika, json
connection = pika.BlockingConnection(
pika.ConnectionParameters('rabbitmq-host', 5672,
credentials=pika.PlainCredentials('user', 'pass'))
)
channel = connection.channel()
# ประกาศ Exchange แบบ Topic
channel.exchange_declare(
exchange='order_events',
exchange_type='topic',
durable=True
)
def publish_event(event_type, payload):
"""Publish event ผ่าน Topic Exchange"""
message = {
'event_id': str(uuid4()),
'event_type': event_type,
'timestamp': datetime.utcnow().isoformat(),
'payload': payload
}
channel.basic_publish(
exchange='order_events',
routing_key=f'order.{event_type}', # เช่น order.placed
body=json.dumps(message),
properties=pika.BasicProperties(
delivery_mode=2, # Persistent message
content_type='application/json',
message_id=message['event_id']
)
)
# RabbitMQ Consumer
channel.queue_declare(queue='payment_processor', durable=True)
channel.queue_bind(
queue='payment_processor',
exchange='order_events',
routing_key='order.placed' # รับเฉพาะ order.placed events
)
def callback(ch, method, properties, body):
event = json.loads(body)
try:
process_payment(event)
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception:
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
channel.basic_qos(prefetch_count=10)
channel.basic_consume(queue='payment_processor', on_message_callback=callback)
Kafka vs RabbitMQ เลือกอันไหน?
| เกณฑ์ | Apache Kafka | RabbitMQ |
|---|---|---|
| รูปแบบ | Distributed Log (Pull) | Message Queue (Push) |
| Throughput | สูงมาก (ล้าน msg/sec) | ปานกลาง (หมื่น msg/sec) |
| Message Retention | เก็บถาวรได้ | ลบหลัง Consume |
| Ordering | ภายใน Partition | ภายใน Queue |
| Replay | ได้ (offset reset) | ไม่ได้ |
| Routing | Topic + Partition | Exchange + Routing Key (ยืดหยุ่นกว่า) |
| เหมาะกับ | Event Streaming, Log Aggregation | Task Queue, Complex Routing |
Saga Pattern สำหรับ Distributed Transactions
ในระบบ Microservices ไม่สามารถใช้ Database Transaction แบบเดิมข้ามหลาย Services ได้ Saga Pattern เป็นวิธีจัดการ Distributed Transaction โดยแบ่งเป็นหลายขั้นตอน (Steps) และมี Compensating Transaction สำหรับ Rollback แต่ละขั้นตอน
Choreography Saga
แต่ละ Service ตัดสินใจเองว่าจะทำอะไรต่อ โดย Listen Events จาก Services อื่น ไม่มีตัวกลางควบคุม
# Choreography Saga Flow - สั่งซื้อสินค้า
#
# Order Service → OrderPlaced event
# ↓
# Payment Service → PaymentProcessed event (หรือ PaymentFailed)
# ↓
# Inventory Service → InventoryReserved event (หรือ InventoryFailed)
# ↓
# Shipping Service → ShipmentCreated event
#
# ถ้า Payment ล้มเหลว:
# Payment Service → PaymentFailed event
# ↓
# Order Service → ยกเลิก Order (Compensating Transaction)
#
# ถ้า Inventory ล้มเหลว:
# Inventory Service → InventoryFailed event
# ↓
# Payment Service → Refund (Compensating Transaction)
# ↓
# Order Service → ยกเลิก Order (Compensating Transaction)
Orchestration Saga
มี Saga Orchestrator เป็นตัวกลางควบคุมทุกขั้นตอน รู้ว่าต้องทำอะไรต่อ และ Rollback อย่างไรเมื่อเกิดความผิดพลาด
# Orchestration Saga - Order Saga Orchestrator
class OrderSagaOrchestrator:
def __init__(self, payment_svc, inventory_svc, shipping_svc):
self.payment = payment_svc
self.inventory = inventory_svc
self.shipping = shipping_svc
def execute(self, order):
"""ควบคุมทุกขั้นตอนของ Saga"""
saga_log = SagaLog(order.id)
try:
# Step 1: Reserve Inventory
saga_log.start_step("reserve_inventory")
inventory_result = self.inventory.reserve(order.items)
saga_log.complete_step("reserve_inventory", inventory_result)
# Step 2: Process Payment
saga_log.start_step("process_payment")
payment_result = self.payment.charge(
order.customer_id, order.total_amount
)
saga_log.complete_step("process_payment", payment_result)
# Step 3: Create Shipment
saga_log.start_step("create_shipment")
shipment = self.shipping.create(order.id, order.shipping_address)
saga_log.complete_step("create_shipment", shipment)
saga_log.mark_completed()
return SagaResult.SUCCESS
except PaymentFailedError:
# Compensate: Release inventory
self.inventory.release(order.items)
saga_log.mark_compensated()
return SagaResult.PAYMENT_FAILED
except InventoryFailedError:
saga_log.mark_compensated()
return SagaResult.OUT_OF_STOCK
except ShippingFailedError:
# Compensate: Refund payment + Release inventory
self.payment.refund(payment_result.transaction_id)
self.inventory.release(order.items)
saga_log.mark_compensated()
return SagaResult.SHIPPING_FAILED
Choreography vs Orchestration
| เกณฑ์ | Choreography | Orchestration |
|---|---|---|
| Coupling | ต่ำ (Decentralized) | สูงกว่า (Centralized) |
| Complexity | ซับซ้อนเมื่อ Services เยอะ | จัดการง่ายกว่า |
| Visibility | ยากที่จะเห็นภาพรวม Flow | เห็น Flow ชัดเจนใน Orchestrator |
| Single Point of Failure | ไม่มี | Orchestrator เป็น SPOF |
| Testing | ยากกว่า (ต้อง Test Integration) | ง่ายกว่า (Test Orchestrator) |
| เหมาะกับ | 2-4 Services, Flow ง่าย | 4+ Services, Flow ซับซ้อน |
Outbox Pattern
ปัญหาสำคัญของ EDA คือ Dual Write Problem: เมื่อ Service ต้องทั้งบันทึกลง Database และ Publish Event ไป Message Broker พร้อมกัน ถ้าอย่างใดอย่างหนึ่งล้มเหลว ข้อมูลจะไม่สอดคล้องกัน
# ปัญหา Dual Write
def place_order(order):
db.save(order) # 1. บันทึกลง DB - สำเร็จ
kafka.publish(order_event) # 2. Publish Event - ล้มเหลว!
# ผลลัพธ์: Order อยู่ใน DB แต่ไม่มี Event ส่งออกไป
# Services อื่นไม่รู้ว่ามี Order ใหม่!
Outbox Pattern แก้ปัญหานี้โดยเขียน Event ลง Outbox Table ใน Database เดียวกันกับ Business Data ภายใน Transaction เดียวกัน จากนั้นมี Background Process (Relay/Poller) อ่าน Outbox แล้ว Publish ไป Message Broker
# Outbox Pattern Implementation
def place_order(order):
"""ใช้ Transaction เดียวกันสำหรับทั้ง Business Data และ Event"""
with db.transaction():
# 1. บันทึก Order
db.execute("INSERT INTO orders (...) VALUES (...)", order)
# 2. บันทึก Event ลง Outbox (Transaction เดียวกัน)
db.execute("""
INSERT INTO outbox (id, aggregate_type, aggregate_id,
event_type, payload, created_at)
VALUES (%s, 'Order', %s, 'OrderPlaced', %s, NOW())
""", (uuid4(), order.id, json.dumps(order.to_dict())))
# ทั้งสองจะ Commit หรือ Rollback พร้อมกัน
# Background Relay Process (แยก Thread/Process)
class OutboxRelay:
"""อ่าน Outbox แล้ว Publish ไป Kafka"""
def run(self):
while True:
events = db.query("""
SELECT * FROM outbox
WHERE published = FALSE
ORDER BY created_at
LIMIT 100
FOR UPDATE SKIP LOCKED
""")
for event in events:
try:
kafka.publish(event['event_type'], event['payload'])
db.execute(
"UPDATE outbox SET published = TRUE WHERE id = %s",
(event['id'],)
)
except Exception as e:
log.error(f"Failed to publish {event['id']}: {e}")
time.sleep(1) # Poll interval
Idempotency — ประมวลผลซ้ำได้อย่างปลอดภัย
ในระบบ Event-Driven ที่ใช้ at-least-once delivery อาจได้รับ Event เดียวกันมากกว่าหนึ่งครั้ง ทุก Consumer ต้องเป็น Idempotent คือประมวลผลซ้ำกี่ครั้งก็ได้ผลลัพธ์เหมือนเดิม
# Idempotency Implementation
class IdempotentConsumer:
def __init__(self, db):
self.db = db
def process(self, event):
event_id = event['event_id']
# 1. ตรวจสอบว่าเคยประมวลผลแล้วหรือยัง
existing = self.db.query(
"SELECT id FROM processed_events WHERE event_id = %s",
(event_id,)
)
if existing:
log.info(f"Event {event_id} already processed, skipping")
return # ข้ามไป ไม่ทำซ้ำ
# 2. ประมวลผล + บันทึกว่าทำแล้ว ใน Transaction เดียวกัน
with self.db.transaction():
self.handle_event(event)
self.db.execute(
"INSERT INTO processed_events (event_id, processed_at) VALUES (%s, NOW())",
(event_id,)
)
def handle_event(self, event):
"""Business logic ที่ต้อง implement"""
if event['event_type'] == 'order.placed':
self.process_payment(event['payload'])
elif event['event_type'] == 'payment.completed':
self.reserve_inventory(event['payload'])
Eventual Consistency
ระบบ Event-Driven ไม่มี Strong Consistency แบบ Database Transaction เดี่ยว แต่ใช้ Eventual Consistency คือข้อมูลจะ "สอดคล้องกันในที่สุด" แม้ว่าในช่วงเวลาสั้นๆ อาจมีความไม่สอดคล้อง
ตัวอย่าง: เมื่อลูกค้าสั่งซื้อสินค้า Order Service บันทึก Order แล้ว แต่ Inventory Service อาจยังไม่ได้อัปเดตสต็อก ในช่วงเวลาสั้นๆ นี้ ข้อมูลไม่สอดคล้อง แต่เมื่อ Event ถูกประมวลผลเสร็จ ข้อมูลจะสอดคล้องกัน
วิธีจัดการ Eventual Consistency
- UI/UX: แสดงสถานะ "กำลังดำเนินการ" แทน "สำเร็จ" ทันที ให้ผู้ใช้เข้าใจว่าต้องรอ
- Retry + Dead Letter Queue: ถ้า Event ประมวลผลไม่สำเร็จ ส่งเข้า DLQ เพื่อตรวจสอบภายหลัง
- Compensating Transactions: เมื่อพบว่าข้อมูลไม่สอดคล้อง ทำ Compensation เพื่อแก้ไข
- Reconciliation: มี Background Job ตรวจสอบความสอดคล้องของข้อมูลเป็นระยะ
Event Schema Evolution
เมื่อระบบพัฒนาไป Event Schema จะต้องเปลี่ยนแปลง ต้องจัดการ Versioning ให้ดีเพื่อไม่ให้ Consumer เก่าพัง
# Schema Evolution Strategies
# 1. Backward Compatible Changes (ปลอดภัย)
# - เพิ่ม Field ใหม่ (Optional)
# - เพิ่ม Default value
# Version 1
{"event_type": "order.placed", "order_id": "ORD-001", "amount": 1000}
# Version 2 (เพิ่ม currency field - backward compatible)
{"event_type": "order.placed", "order_id": "ORD-001", "amount": 1000, "currency": "THB"}
# Consumer เก่าที่ไม่รู้จัก "currency" จะข้ามไป ไม่พัง
# 2. Breaking Changes (ต้องจัดการ)
# - ลบ Field
# - เปลี่ยน Type
# - เปลี่ยนความหมาย
# ใช้ Event Type Versioning
{"event_type": "order.placed.v2", "version": 2, ...}
# หรือใช้ Schema Registry (Avro/Protobuf)
# Confluent Schema Registry จัดการ compatibility check อัตโนมัติ
Event-Driven Microservices Architecture
ในระบบ Microservices ขนาดใหญ่ EDA เป็นหัวใจสำคัญ แต่ละ Service สื่อสารกันผ่าน Events ทำให้ Loosely Coupled และ Scale ได้อย่างอิสระ
# ตัวอย่าง E-commerce Event-Driven Architecture
#
# [API Gateway]
# |
# [Order Service] --publish--> [Kafka: orders.events]
# | |
# | +---------+---------+----------+
# | | | | |
# [Payment Svc] [Inventory] [Notification] [Analytics]
# | | | |
# payment.events inventory. notification. (consume only)
# | events events
# |
# [Shipping Svc]
เครื่องมือสำหรับ Event-Driven Architecture
| เครื่องมือ | ประเภท | จุดเด่น |
|---|---|---|
| Apache Kafka | Event Streaming | High throughput, durability, replay |
| RabbitMQ | Message Broker | Flexible routing, priority queue |
| EventStoreDB | Event Store | Purpose-built สำหรับ Event Sourcing |
| Axon Framework | CQRS/ES Framework | Java/Kotlin, built-in Saga support |
| Debezium | CDC | Capture DB changes เป็น Events |
| Apache Pulsar | Event Streaming | Multi-tenancy, geo-replication |
| AWS EventBridge | Serverless Event Bus | Managed, schema registry |
| NATS | Message System | Lightweight, low latency |
Testing Event-Driven Systems
# Unit Test - Event Handler
def test_order_placed_creates_payment():
handler = PaymentEventHandler(mock_db, mock_payment_gateway)
event = {
'event_id': 'evt-001',
'event_type': 'order.placed',
'payload': {'order_id': 'ORD-001', 'amount': 1000}
}
handler.process(event)
# Verify payment was created
mock_payment_gateway.charge.assert_called_once_with(
order_id='ORD-001', amount=1000
)
# Integration Test - Saga
def test_order_saga_compensates_on_payment_failure():
inventory_svc = MockInventoryService(should_succeed=True)
payment_svc = MockPaymentService(should_succeed=False)
saga = OrderSagaOrchestrator(payment_svc, inventory_svc, None)
result = saga.execute(test_order)
assert result == SagaResult.PAYMENT_FAILED
# Verify inventory was released (compensation)
assert inventory_svc.release_called
assert inventory_svc.released_items == test_order.items
# Contract Test - Event Schema
def test_order_placed_event_schema():
"""ตรวจสอบว่า Event ตรง Schema ที่ตกลงกับ Consumer"""
event = create_order_placed_event(test_order)
schema = {
'type': 'object',
'required': ['event_id', 'event_type', 'timestamp', 'payload'],
'properties': {
'payload': {
'type': 'object',
'required': ['order_id', 'customer_id', 'total_amount']
}
}
}
jsonschema.validate(event, schema) # ถ้า Schema ไม่ตรงจะ Fail
Debugging และ Observability
ระบบ Event-Driven ยากต่อการ Debug เพราะ Flow กระจายไปหลาย Services สิ่งที่จำเป็น:
- Correlation ID: ทุก Event ต้องมี Correlation ID เดียวกันตลอด Flow เพื่อ Trace ได้ว่า Request หนึ่งผ่านอะไรบ้าง
- Distributed Tracing: ใช้ OpenTelemetry + Jaeger/Zipkin เพื่อดู Flow ทั้งหมดข้าม Services
- Event Catalog: เอกสารรวบรวม Events ทั้งหมดในระบบ ใครเป็น Producer ใครเป็น Consumer
- Dead Letter Queue: Events ที่ประมวลผลไม่สำเร็จต้องไปอยู่ใน DLQ เพื่อตรวจสอบ
- Metrics: วัด Event processing latency, throughput, error rate, consumer lag
# Correlation ID propagation
import uuid
class EventContext:
def __init__(self, correlation_id=None):
self.correlation_id = correlation_id or str(uuid.uuid4())
self.causation_id = None # ID ของ Event ที่ trigger Event นี้
def create_child_event(self, event_type, payload):
"""สร้าง Event ลูกที่มี Correlation ID เดียวกัน"""
return {
'event_id': str(uuid.uuid4()),
'event_type': event_type,
'correlation_id': self.correlation_id, # เหมือนกันตลอด Flow
'causation_id': self.event_id, # ชี้ไปที่ Event แม่
'timestamp': datetime.utcnow().isoformat(),
'payload': payload
}
เมื่อไหร่ควรใช้ EDA vs Request-Response?
| ใช้ EDA เมื่อ | ใช้ Request-Response เมื่อ |
|---|---|
| ต้องการ Loose Coupling ระหว่าง Services | ต้องการ Response ทันที |
| Throughput สูง ต้องรับ Load มากๆ | Flow ง่ายๆ 2-3 Services |
| ต้องการ Audit Trail / Event History | ต้องการ Strong Consistency |
| หลาย Services ต้องรับรู้เหตุการณ์เดียวกัน | ทีมเล็ก ระบบไม่ซับซ้อน |
| ต้อง Scale แต่ละ Service แยกกัน | ต้องการ Simple debugging |
| รองรับ Eventual Consistency ได้ | ต้องการ Transactional guarantee |
สรุป
Event-Driven Architecture เป็นสถาปัตยกรรมที่ทรงพลังสำหรับระบบ Distributed Systems ในปี 2026 การเข้าใจ Patterns ต่างๆ อย่าง Event Sourcing, CQRS, Saga Pattern และ Outbox Pattern จะช่วยให้คุณออกแบบระบบที่ Scale ได้ มีความยืดหยุ่น และจัดการ Failure ได้ดี
สิ่งสำคัญคือต้องเลือกใช้ Pattern ที่เหมาะสมกับปัญหา ไม่ใช่ใช้ทุก Pattern ทุกที่ เริ่มจากสิ่งง่ายๆ แล้วค่อยเพิ่มความซับซ้อนเมื่อจำเป็น Event-Driven Architecture ไม่ได้เหมาะกับทุกระบบ แต่เมื่อเหมาะสมแล้ว มันจะเป็นหัวใจสำคัญที่ทำให้ระบบของคุณรองรับการเติบโตได้อย่างยั่งยืน
