Distributed Tracing, CQRS และ Event Sourcing คืออะไร
Distributed Tracing คือเทคนิคติดตาม requests ที่เดินทางผ่านหลาย microservices เพื่อ debug และ monitor ระบบ CQRS (Command Query Responsibility Segregation) คือ pattern ที่แยก read model กับ write model ออกจากกัน Event Sourcing คือการเก็บ state changes เป็น events แทนการเก็บ current state การรวมสามแนวคิดนี้ช่วยสร้างระบบ distributed ที่ traceable, scalable และ auditable บทความนี้อธิบาย architecture, implementation และ observability สำหรับระบบ CQRS + Event Sourcing ด้วย distributed tracing
Architecture Overview
# architecture.py — CQRS + Event Sourcing + Tracing
import json
class Architecture:
    """Describe the building blocks and request flow of a CQRS +
    Event Sourcing system instrumented with distributed tracing."""

    # Catalog of the four architectural areas, keyed by a stable identifier.
    COMPONENTS = {
        "command_side": {
            "name": "Command Side (Write)",
            "description": "รับ commands → validate → produce events → store in event store",
            "components": ["API Gateway", "Command Handler", "Aggregate", "Event Store"],
        },
        "query_side": {
            "name": "Query Side (Read)",
            "description": "Subscribe events → project เป็น read model → serve queries",
            "components": ["Event Consumer", "Projector", "Read Database", "Query API"],
        },
        "event_store": {
            "name": "Event Store",
            "description": "เก็บ events ทั้งหมดตามลำดับเวลา — single source of truth",
            "options": ["EventStoreDB", "Apache Kafka", "PostgreSQL (append-only)", "DynamoDB Streams"],
        },
        "tracing": {
            "name": "Distributed Tracing",
            "description": "ติดตาม trace ตั้งแต่ command → event → projection → query",
            "tools": ["OpenTelemetry", "Jaeger", "Zipkin", "Grafana Tempo"],
        },
    }

    # Text diagram of one request travelling through the whole system.
    FLOW = """
CQRS + Event Sourcing Flow:
[Client] → POST /orders (Command)
↓
[Command API] → CreateOrderCommand
↓ (trace: span-1)
[Command Handler] → validate + business logic
↓ (trace: span-2)
[Aggregate] → produce OrderCreatedEvent
↓ (trace: span-3)
[Event Store] → append event (Kafka/EventStoreDB)
↓ (trace: span-4, async)
[Event Consumer] → read event
↓ (trace: span-5)
[Projector] → update Read DB (PostgreSQL/Elasticsearch)
↓
[Client] → GET /orders/123 (Query)
↓ (trace: span-6)
[Query API] → read from Read DB → return response
"""

    def show_components(self):
        """Print the name and description of every component group."""
        print("=== Architecture Components ===\n")
        # Keys are not needed for display, so iterate the values directly.
        for component in self.COMPONENTS.values():
            print(f"[{component['name']}]")
            print(f" {component['description']}")
            print()

    def show_flow(self):
        """Print a 500-character preview of the request-flow diagram."""
        print("=== Request Flow ===")
        diagram = self.FLOW
        print(diagram[:500])
# Demo: print the component catalog followed by the end-to-end flow.
arch = Architecture()
for section in (arch.show_components, arch.show_flow):
    section()
Event Sourcing Implementation
# event_sourcing.py — Event Sourcing implementation
import json
from datetime import datetime
class EventStore:
    """Display wrapper holding the event-sourcing reference implementation."""

    # Full sample source as display text; show_code() prints a preview of it.
    CODE = """
# event_store.py — Python Event Store implementation
import json
import uuid
from datetime import datetime
from dataclasses import dataclass, field, asdict
@dataclass
class Event:
event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
event_type: str = ""
aggregate_id: str = ""
aggregate_type: str = ""
data: dict = field(default_factory=dict)
metadata: dict = field(default_factory=dict)
timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())
version: int = 0
class EventStore:
def __init__(self):
self.events = [] # In production: use Kafka/EventStoreDB
def append(self, event: Event):
event.version = len([e for e in self.events if e.aggregate_id == event.aggregate_id]) + 1
self.events.append(event)
return event
def get_events(self, aggregate_id):
return [e for e in self.events if e.aggregate_id == aggregate_id]
def get_all_events(self, event_type=None):
if event_type:
return [e for e in self.events if e.event_type == event_type]
return self.events
class Aggregate:
def __init__(self, aggregate_id):
self.id = aggregate_id
self.version = 0
self.changes = []
def apply_event(self, event):
handler = getattr(self, f"on_{event.event_type}", None)
if handler:
handler(event.data)
self.version += 1
def load_from_history(self, events):
for event in events:
self.apply_event(event)
class OrderAggregate(Aggregate):
def __init__(self, order_id):
super().__init__(order_id)
self.status = None
self.items = []
self.total = 0
def create_order(self, items, customer_id):
total = sum(i["price"] * i["qty"] for i in items)
event = Event(
event_type="OrderCreated",
aggregate_id=self.id,
aggregate_type="Order",
data={"items": items, "customer_id": customer_id, "total": total},
)
self.changes.append(event)
self.apply_event(event)
return event
def on_OrderCreated(self, data):
self.status = "created"
self.items = data["items"]
self.total = data["total"]
# Usage
store = EventStore()
order = OrderAggregate("order-001")
event = order.create_order(
items=[{"name": "Widget", "price": 100, "qty": 2}],
customer_id="cust-123",
)
store.append(event)
print(f"Order: {order.id} | Status: {order.status} | Total: {order.total}")
"""

    def show_code(self):
        """Print a 600-character preview of the sample implementation."""
        print("=== Event Sourcing Implementation ===")
        preview = self.CODE[:600]
        print(preview)
# Demo: print a preview of the event-sourcing sample code.
es = EventStore()
es.show_code()
Distributed Tracing with OpenTelemetry
# tracing.py — OpenTelemetry distributed tracing
import json
class DistributedTracing:
    """Display wrapper for OpenTelemetry tracing recipes covering the
    command side, the event consumer, and the tracer bootstrap."""

    # Tracer/exporter bootstrap snippet (display text).
    # NOTE(review): the Jaeger thrift exporter shown in the snippet is
    # deprecated upstream in favor of OTLP — confirm before copying into
    # production code.
    SETUP = """
# tracing_setup.py — OpenTelemetry setup for CQRS
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.propagate import set_global_textmap
from opentelemetry.propagators.b3 import B3MultiFormat
# Setup tracer
provider = TracerProvider()
jaeger = JaegerExporter(agent_host_name="jaeger", agent_port=6831)
provider.add_span_processor(BatchSpanProcessor(jaeger))
trace.set_tracer_provider(provider)
tracer = trace.get_tracer("cqrs-service")
# Propagate trace context across services
set_global_textmap(B3MultiFormat())
"""

    # Command-side tracing snippet: spans around validation, aggregate
    # creation, and event storage; injects trace context into event metadata.
    COMMAND_TRACING = """
# command_tracing.py — Trace command handling
from opentelemetry import trace, context
from opentelemetry.trace import SpanKind
tracer = trace.get_tracer("command-service")
class TracedCommandHandler:
def __init__(self, event_store):
self.event_store = event_store
def handle_create_order(self, command):
with tracer.start_as_current_span(
"handle_create_order",
kind=SpanKind.SERVER,
attributes={
"command.type": "CreateOrder",
"order.customer_id": command["customer_id"],
},
) as span:
# Validate
with tracer.start_as_current_span("validate_command"):
self._validate(command)
# Create aggregate and events
with tracer.start_as_current_span("create_aggregate") as agg_span:
order = OrderAggregate(str(uuid.uuid4()))
event = order.create_order(command["items"], command["customer_id"])
agg_span.set_attribute("order.id", order.id)
agg_span.set_attribute("order.total", order.total)
# Store event (inject trace context into event metadata)
with tracer.start_as_current_span("store_event", kind=SpanKind.PRODUCER) as store_span:
# Inject trace context into event metadata for async propagation
ctx = {}
from opentelemetry.propagate import inject
inject(ctx)
event.metadata["trace_context"] = ctx
self.event_store.append(event)
store_span.set_attribute("event.type", event.event_type)
store_span.set_attribute("event.id", event.event_id)
span.set_attribute("result", "success")
return {"order_id": order.id, "status": "created"}
"""

    # Consumer-side snippet: extracts the propagated context so the async
    # projection continues the same trace started by the command service.
    EVENT_CONSUMER_TRACING = """
# consumer_tracing.py — Trace event consumption
tracer = trace.get_tracer("query-service")
class TracedEventConsumer:
def process_event(self, event):
# Extract trace context from event metadata
ctx = event.metadata.get("trace_context", {})
from opentelemetry.propagate import extract
parent_ctx = extract(ctx)
# Continue the trace from command service
with tracer.start_as_current_span(
"process_event",
context=parent_ctx,
kind=SpanKind.CONSUMER,
attributes={
"event.type": event.event_type,
"event.aggregate_id": event.aggregate_id,
},
):
with tracer.start_as_current_span("project_read_model"):
self._project(event)
"""

    def show_setup(self):
        """Print a 500-character preview of the tracer setup snippet."""
        print("=== OpenTelemetry Setup ===")
        print(self.SETUP[:500])

    def show_command(self):
        """Print a 500-character preview of the command tracing snippet."""
        # Fix: original used an f-string with no placeholders (ruff F541).
        print("\n=== Command Tracing ===")
        print(self.COMMAND_TRACING[:500])

    def show_consumer(self):
        """Print a 400-character preview of the consumer tracing snippet."""
        # Fix: original used an f-string with no placeholders (ruff F541).
        print("\n=== Event Consumer Tracing ===")
        print(self.EVENT_CONSUMER_TRACING[:400])
# Demo: print the setup and command-side snippets.
# (The consumer snippet is intentionally not printed here.)
tracing = DistributedTracing()
for section in (tracing.show_setup, tracing.show_command):
    section()
CQRS Read Model Projection
# projection.py — Read model projection
import json
import random
class ReadModelProjection:
    """Display wrapper for the projector/query-service sample plus a
    mock order dashboard populated with random figures."""

    # Projector and query-service sample source (display text).
    CODE = """
# projector.py — Event projector for read model
import json
class OrderProjector:
def __init__(self, read_db):
self.db = read_db
def project(self, event):
handler = getattr(self, f"on_{event.event_type}", None)
if handler:
handler(event)
def on_OrderCreated(self, event):
self.db.upsert("orders", {
"id": event.aggregate_id,
"customer_id": event.data["customer_id"],
"items": event.data["items"],
"total": event.data["total"],
"status": "created",
"created_at": event.timestamp,
"version": event.version,
})
def on_OrderPaid(self, event):
self.db.update("orders", event.aggregate_id, {
"status": "paid",
"paid_at": event.timestamp,
"payment_method": event.data.get("method"),
})
def on_OrderShipped(self, event):
self.db.update("orders", event.aggregate_id, {
"status": "shipped",
"shipped_at": event.timestamp,
"tracking": event.data.get("tracking_number"),
})
class QueryService:
def __init__(self, read_db):
self.db = read_db
def get_order(self, order_id):
return self.db.find("orders", order_id)
def get_orders_by_customer(self, customer_id):
return self.db.find_many("orders", {"customer_id": customer_id})
def get_order_summary(self):
orders = self.db.find_all("orders")
return {
"total_orders": len(orders),
"total_revenue": sum(o["total"] for o in orders),
"by_status": {s: len([o for o in orders if o["status"] == s])
for s in set(o["status"] for o in orders)},
}
"""

    def show_code(self):
        """Print a 500-character preview of the projection sample."""
        print("=== Read Model Projection ===")
        print(self.CODE[:500])

    def query_dashboard(self):
        """Print a mock dashboard; counts and revenue are random stand-ins
        for real read-model query results."""
        # Fix: original used an f-string with no placeholders (ruff F541).
        print("\n=== Order Query Dashboard ===")
        # Same random.randint call order as the original, so seeded runs
        # produce identical figures; reformatted from one long line.
        statuses = {
            "created": random.randint(10, 30),
            "paid": random.randint(20, 50),
            "shipped": random.randint(30, 80),
            "delivered": random.randint(50, 100),
        }
        total_revenue = random.randint(100000, 500000)
        print(f" Total orders: {sum(statuses.values())}")
        print(f" Total revenue: {total_revenue:,} บาท")
        for status, count in statuses.items():
            print(f" [{status}]: {count}")
# Demo: print the projection sample, then the mock dashboard.
proj = ReadModelProjection()
for part in (proj.show_code, proj.query_dashboard):
    part()
Infrastructure & Monitoring
# infra.py — Infrastructure setup
import json
class Infrastructure:
    """Display wrapper for the docker-compose stack definition and a
    checklist of monitoring practices."""

    # docker-compose stack: Kafka event store, Postgres read model,
    # Jaeger tracing, and the two application services (display text).
    DOCKER_COMPOSE = """
# docker-compose.yml — CQRS + Tracing infrastructure
version: '3.8'
services:
# Event Store (Kafka)
kafka:
image: confluentinc/cp-kafka:7.5.0
ports: ["9092:9092"]
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
# Read Database
postgres:
image: postgres:16
environment:
POSTGRES_DB: read_model
POSTGRES_PASSWORD: password
ports: ["5432:5432"]
# Distributed Tracing
jaeger:
image: jaegertracing/all-in-one:latest
ports:
- "16686:16686" # Jaeger UI
- "6831:6831/udp" # Jaeger agent
- "4318:4318" # OTLP HTTP
environment:
COLLECTOR_OTLP_ENABLED: "true"
# Command Service
command-service:
build: ./command-service
environment:
KAFKA_BROKER: kafka:9092
JAEGER_AGENT_HOST: jaeger
depends_on: [kafka, jaeger]
# Query Service
query-service:
build: ./query-service
environment:
DATABASE_URL: postgres://postgres:password@postgres:5432/read_model
KAFKA_BROKER: kafka:9092
JAEGER_AGENT_HOST: jaeger
depends_on: [postgres, kafka, jaeger]
"""

    def show_compose(self):
        """Print a 600-character preview of the compose file."""
        print("=== Docker Compose ===")
        print(self.DOCKER_COMPOSE[:600])

    def monitoring_tips(self):
        """Print a bullet list of monitoring practices for CQRS pipelines."""
        # Fix: original used an f-string with no placeholders (ruff F541).
        print("\n=== Monitoring Tips ===")
        tips = [
            "ใช้ trace_id เชื่อม command → event → projection → query",
            "ตั้ง alert เมื่อ event processing lag > threshold (e.g., 5s)",
            "Monitor read model consistency — compare event count vs projected count",
            "Track span duration สำหรับแต่ละ stage (command, store, consume, project)",
            "ใช้ Grafana dashboard รวม traces + metrics + logs",
        ]
        for tip in tips:
            print(f" • {tip}")
# Demo: print the compose stack, then the monitoring checklist.
infra = Infrastructure()
for step in (infra.show_compose, infra.monitoring_tips):
    step()
FAQ - คำถามที่พบบ่อย
Q: CQRS กับ Event Sourcing ต้องใช้ด้วยกันเสมอไหม?
A: ไม่จำเป็น — ใช้แยกกันได้ CQRS อย่างเดียว: แยก read/write model แต่ไม่เก็บ events Event Sourcing อย่างเดียว: เก็บ events แต่ใช้ model เดียว ใช้ด้วยกัน: ได้ประโยชน์สูงสุด — audit trail, temporal queries, scalability แนะนำ: เริ่มจาก CQRS ก่อน → เพิ่ม Event Sourcing เมื่อจำเป็น
Q: Distributed Tracing จำเป็นไหม?
A: สำหรับ CQRS + Event Sourcing จำเป็นมาก เพราะ: 1) Request ผ่านหลาย services (command → event store → consumer → read DB) 2) Async processing ทำให้ debug ยาก ถ้าไม่มี trace 3) ต้อง correlate command กับ event กับ projection Tools: OpenTelemetry (standard) + Jaeger หรือ Grafana Tempo
Q: Event Store ใช้อะไรดี?
A: EventStoreDB: purpose-built, subscriptions, projections built-in Apache Kafka: scalable, ecosystem ใหญ่, ใช้เป็น event store + message broker PostgreSQL: ง่าย, append-only table, เหมาะเริ่มต้น เลือกตาม scale: เล็ก → PostgreSQL, กลาง → EventStoreDB, ใหญ่ → Kafka
Q: Event Sourcing มีข้อเสียอะไร?
A: 1) Complexity สูง — เรียนรู้ยากกว่า CRUD 2) Event schema evolution — เปลี่ยน event format ต้องระวัง 3) Read model lag — eventual consistency ไม่ใช่ strong consistency 4) Storage — events สะสมมากขึ้นเรื่อยๆ (ต้อง snapshot) 5) Debugging — ต้องมี tooling ดี (tracing, event replay)
