Distributed Tracing คืออะไรและทำไมต้องใช้กับระบบ Real-time
Distributed Tracing เป็นเทคนิคการติดตาม request ที่วิ่งผ่านหลาย service ในระบบ microservices โดยแต่ละ request จะถูกกำหนด trace ID เพื่อให้สามารถติดตามได้ตลอดเส้นทาง ตั้งแต่ต้นทางจนถึงปลายทาง เมื่อนำมาใช้กับระบบ real-time processing จะช่วยให้เห็นว่า request แต่ละตัวใช้เวลาเท่าไหร่ในแต่ละ service ติดคอขวดตรงไหน และ error เกิดที่จุดใด
ในระบบที่ต้อง process ข้อมูลแบบ real-time เช่น payment gateway, live streaming analytics หรือ IoT sensor data pipeline ถ้าไม่มี tracing จะแก้ปัญหา latency spike ได้ยากมาก เพราะ request วิ่งผ่านหลายสิบ service แต่ละตัวมี dependency ซ้อนกันหลายชั้น
เครื่องมือหลักสำหรับ Distributed Tracing
เครื่องมือที่นิยมใช้ในปัจจุบันมี 3 ตัวหลัก:
- Jaeger — พัฒนาโดย Uber เปิดเป็น open source ภายใต้ CNCF รองรับ OpenTelemetry native
- Zipkin — โปรเจค open source เก่าแก่จาก Twitter เหมาะกับระบบที่ใช้ Java/Spring เป็นหลัก
- Grafana Tempo — backend storage สำหรับ trace ที่ทำงานร่วมกับ Grafana ได้สมบูรณ์ ใช้ object storage เป็น backend ทำให้ cost ต่ำ
ติดตั้ง Jaeger ด้วย Docker Compose
# docker-compose.yml สำหรับ Jaeger all-in-one
version: '3.8'
services:
jaeger:
image: jaegertracing/all-in-one:1.54
ports:
- "16686:16686" # Jaeger UI
- "4317:4317" # OTLP gRPC
- "4318:4318" # OTLP HTTP
- "14250:14250" # gRPC model
- "6831:6831/udp" # Thrift compact
environment:
- COLLECTOR_OTLP_ENABLED=true
- SPAN_STORAGE_TYPE=badger
- BADGER_EPHEMERAL=false
- BADGER_DIRECTORY_VALUE=/badger/data
- BADGER_DIRECTORY_KEY=/badger/key
volumes:
- jaeger_data:/badger
volumes:
jaeger_data:
# สั่งรัน
docker compose up -d
# ตรวจสอบสถานะ
docker compose ps
# NAME IMAGE STATUS
# jaeger jaegertracing/all-in-one:1.54 Up 2 minutes
# เปิด Jaeger UI
# http://localhost:16686
ติดตั้ง OpenTelemetry SDK ในแอปพลิเคชัน
OpenTelemetry (OTel) เป็นมาตรฐานกลางสำหรับ observability ที่รวม tracing, metrics และ logs เข้าด้วยกัน ตัวอย่างนี้ใช้ Python แต่หลักการเดียวกันใช้ได้กับทุกภาษา
# ติดตั้ง OpenTelemetry packages
pip install opentelemetry-api \
opentelemetry-sdk \
opentelemetry-exporter-otlp \
opentelemetry-instrumentation-fastapi \
opentelemetry-instrumentation-httpx \
opentelemetry-instrumentation-sqlalchemy
# tracing_config.py
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
def setup_tracing(service_name: str):
resource = Resource.create({
"service.name": service_name,
"service.version": "1.0.0",
"deployment.environment": "production",
})
provider = TracerProvider(resource=resource)
# ส่ง trace ไปยัง Jaeger ผ่าน OTLP gRPC
otlp_exporter = OTLPSpanExporter(
endpoint="http://jaeger:4317",
insecure=True,
)
# ใช้ BatchSpanProcessor เพื่อ performance ที่ดี
provider.add_span_processor(
BatchSpanProcessor(
otlp_exporter,
max_queue_size=2048,
max_export_batch_size=512,
schedule_delay_millis=5000,
)
)
trace.set_tracer_provider(provider)
return trace.get_tracer(service_name)
ใช้งานกับ FastAPI
# main.py
from fastapi import FastAPI
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from tracing_config import setup_tracing
import httpx
tracer = setup_tracing("payment-service")
app = FastAPI()
FastAPIInstrumentor.instrument_app(app)
@app.post("/api/payment/process")
async def process_payment(order_id: str, amount: float):
with tracer.start_as_current_span("validate_payment") as span:
span.set_attribute("order.id", order_id)
span.set_attribute("payment.amount", amount)
# เรียก fraud detection service
async with httpx.AsyncClient() as client:
with tracer.start_as_current_span("call_fraud_check"):
resp = await client.post(
"http://fraud-service:8001/check",
json={"order_id": order_id, "amount": amount}
)
fraud_result = resp.json()
span.set_attribute("fraud.score", fraud_result["score"])
if fraud_result["score"] > 0.8:
span.set_status(trace.StatusCode.ERROR, "High fraud score")
return {"status": "rejected", "reason": "fraud_detected"}
# เรียก billing service
with tracer.start_as_current_span("call_billing"):
async with httpx.AsyncClient() as client:
billing_resp = await client.post(
"http://billing-service:8002/charge",
json={"order_id": order_id, "amount": amount}
)
return {"status": "success", "transaction_id": billing_resp.json()["txn_id"]}
การตั้งค่า Sampling Strategy สำหรับ Real-time Processing
ระบบ real-time ที่มี throughput สูง (หลายหมื่น request/วินาที) ไม่ควร trace ทุก request เพราะจะทำให้ storage บวมและ overhead สูง ต้องเลือก sampling strategy ที่เหมาะสม
| Strategy | เหมาะกับ | ข้อดี | ข้อเสีย |
|---|---|---|---|
| Head-based (probability) | traffic สม่ำเสมอ | ง่าย, overhead ต่ำ | อาจพลาด error trace |
| Tail-based | ต้องการจับ error ทุกตัว | จับ anomaly ได้ดี | ใช้ memory สูง |
| Rate limiting | traffic ไม่สม่ำเสมอ | ควบคุม cost ได้ | อาจพลาดช่วง spike |
# ตั้งค่า tail-based sampling ใน OpenTelemetry Collector
# otel-collector-config.yaml
receivers:
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:4317
http:
endpoint: 0.0.0.0:4318
processors:
tail_sampling:
decision_wait: 10s
num_traces: 100000
expected_new_traces_per_sec: 10000
policies:
# เก็บ trace ที่มี error ทุกตัว
- name: errors-policy
type: status_code
status_code:
status_codes:
- ERROR
# เก็บ trace ที่ latency สูงกว่า 500ms
- name: latency-policy
type: latency
latency:
threshold_ms: 500
# สุ่มเก็บ 10% ของ trace ปกติ
- name: probabilistic-policy
type: probabilistic
probabilistic:
sampling_percentage: 10
batch:
timeout: 5s
send_batch_size: 1024
exporters:
otlp/jaeger:
endpoint: jaeger:4317
tls:
insecure: true
service:
pipelines:
traces:
receivers: [otlp]
processors: [tail_sampling, batch]
exporters: [otlp/jaeger]
# Docker Compose สำหรับ OTel Collector
# เพิ่มใน docker-compose.yml
otel-collector:
image: otel/opentelemetry-collector-contrib:0.96.0
command: ["--config=/etc/otel-collector-config.yaml"]
volumes:
- ./otel-collector-config.yaml:/etc/otel-collector-config.yaml
ports:
- "4317:4317"
- "4318:4318"
- "8889:8889" # Prometheus metrics
depends_on:
- jaeger
Real-time Processing Pipeline พร้อม Tracing
ตัวอย่างการสร้าง real-time data pipeline ที่มี distributed tracing ครบวงจร ใช้ Kafka เป็น message broker
# kafka-consumer พร้อม tracing
# consumer.py
from confluent_kafka import Consumer
from opentelemetry import trace, context
from opentelemetry.trace.propagation import TraceContextTextMapPropagator
from tracing_config import setup_tracing
import json
tracer = setup_tracing("stream-processor")
propagator = TraceContextTextMapPropagator()
consumer = Consumer({
'bootstrap.servers': 'kafka:9092',
'group.id': 'realtime-processor',
'auto.offset.reset': 'latest',
'enable.auto.commit': False,
})
consumer.subscribe(['events.raw'])
def process_message(msg):
# ดึง trace context จาก Kafka header
headers = {h[0]: h[1].decode() for h in (msg.headers() or [])}
ctx = propagator.extract(carrier=headers)
with tracer.start_as_current_span(
"process_event",
context=ctx,
kind=trace.SpanKind.CONSUMER
) as span:
payload = json.loads(msg.value())
span.set_attribute("event.type", payload.get("type", "unknown"))
span.set_attribute("kafka.topic", msg.topic())
span.set_attribute("kafka.partition", msg.partition())
span.set_attribute("kafka.offset", msg.offset())
# Transform data
with tracer.start_as_current_span("transform"):
result = transform_event(payload)
# เขียนลง database
with tracer.start_as_current_span("write_to_db"):
write_to_timescaledb(result)
# ส่งต่อไปยัง downstream
with tracer.start_as_current_span("publish_processed"):
publish_to_kafka("events.processed", result)
consumer.commit(msg)
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
print(f"Consumer error: {msg.error()}")
continue
process_message(msg)
Monitoring Trace Data ด้วย Grafana
การดู trace อย่างเดียวไม่พอ ต้องสร้าง dashboard ที่แสดง RED metrics (Rate, Error, Duration) จาก trace data เพื่อ monitor ระบบ real-time processing
# Grafana datasource สำหรับ Tempo
# grafana-datasources.yaml
apiVersion: 1
datasources:
- name: Tempo
type: tempo
access: proxy
url: http://tempo:3200
jsonData:
httpMethod: GET
tracesToLogs:
datasourceUid: loki
tags: ['service.name']
mappedTags: [{ key: 'service.name', value: 'service' }]
mapTagNamesEnabled: true
filterByTraceID: true
tracesToMetrics:
datasourceUid: prometheus
tags: [{ key: 'service.name', value: 'service' }]
queries:
- name: 'Request rate'
query: 'sum(rate(traces_spanmetrics_calls_total{$$__tags}[5m]))'
- name: 'Error rate'
query: 'sum(rate(traces_spanmetrics_calls_total{$$__tags, status_code="STATUS_CODE_ERROR"}[5m]))'
serviceMap:
datasourceUid: prometheus
- name: Loki
type: loki
uid: loki
access: proxy
url: http://loki:3100
# PromQL queries สำหรับ dashboard
# P99 latency ของแต่ละ service
histogram_quantile(0.99,
sum(rate(traces_spanmetrics_duration_seconds_bucket[5m])) by (le, service)
)
# Error rate percentage
sum(rate(traces_spanmetrics_calls_total{status_code="STATUS_CODE_ERROR"}[5m])) by (service)
/
sum(rate(traces_spanmetrics_calls_total[5m])) by (service)
* 100
# Throughput per service
sum(rate(traces_spanmetrics_calls_total[5m])) by (service)
Alerting เมื่อ Latency พุ่งสูงในระบบ Real-time
สำหรับระบบ real-time ต้องตั้ง alert ที่ไวพอจะจับ latency spike ก่อนที่จะกระทบ user
# alerting-rules.yaml สำหรับ Prometheus/Grafana
groups:
- name: tracing_alerts
rules:
- alert: HighP99Latency
expr: |
histogram_quantile(0.99,
sum(rate(traces_spanmetrics_duration_seconds_bucket{service="payment-service"}[5m])) by (le)
) > 2.0
for: 2m
labels:
severity: critical
annotations:
summary: "Payment service P99 latency > 2s"
description: "P99 latency is {{ $value | humanizeDuration }} for payment-service"
- alert: HighErrorRate
expr: |
sum(rate(traces_spanmetrics_calls_total{status_code="STATUS_CODE_ERROR", service="stream-processor"}[5m]))
/
sum(rate(traces_spanmetrics_calls_total{service="stream-processor"}[5m]))
> 0.05
for: 1m
labels:
severity: warning
annotations:
summary: "Stream processor error rate > 5%"
- alert: TraceIngestionLag
expr: |
kafka_consumer_group_lag{group="realtime-processor"} > 10000
for: 3m
labels:
severity: critical
annotations:
summary: "Kafka consumer lag > 10k messages"
Performance Tuning สำหรับ High-throughput Tracing
เมื่อระบบ real-time มี throughput หลายหมื่น events/วินาที ต้อง tune ทั้ง tracing SDK และ collector เพื่อไม่ให้เป็น bottleneck
# ปรับ BatchSpanProcessor ใน Python SDK
from opentelemetry.sdk.trace.export import BatchSpanProcessor
processor = BatchSpanProcessor(
exporter,
max_queue_size=8192, # เพิ่มจาก default 2048
max_export_batch_size=1024, # ส่งทีละ batch ใหญ่ขึ้น
schedule_delay_millis=2000, # ลด delay เพื่อส่งเร็วขึ้น
export_timeout_millis=30000, # timeout 30 วินาที
)
# ปรับ OTel Collector สำหรับ high throughput
# otel-collector-config.yaml
receivers:
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:4317
max_recv_msg_size_mib: 16 # รับ message ใหญ่ขึ้น
processors:
memory_limiter:
check_interval: 1s
limit_mib: 4096 # จำกัด memory ที่ 4GB
spike_limit_mib: 512
batch:
timeout: 2s
send_batch_size: 4096 # batch ใหญ่ขึ้น
send_batch_max_size: 8192
extensions:
zpages:
endpoint: 0.0.0.0:55679 # debug endpoint
service:
extensions: [zpages]
pipelines:
traces:
receivers: [otlp]
processors: [memory_limiter, tail_sampling, batch]
exporters: [otlp/jaeger]
FAQ — คำถามที่พบบ่อย
Q: Distributed Tracing ทำให้แอปช้าลงไหม?
A: ถ้าตั้งค่าถูก overhead จะต่ำมาก ประมาณ 1-3% ของ latency ที่เพิ่มขึ้น สิ่งสำคัญคือใช้ BatchSpanProcessor แทน SimpleSpanProcessor และตั้ง sampling rate ที่เหมาะสม อย่า trace ทุก request ในระบบ production ที่มี traffic สูง
Q: ควรเลือก Jaeger หรือ Zipkin?
A: ถ้าเริ่มใหม่แนะนำ Jaeger เพราะรองรับ OpenTelemetry native และเป็น CNCF graduated project มี community ใหญ่กว่า ถ้าใช้ Spring Cloud อยู่แล้วและมี Zipkin อยู่ก็ไม่จำเป็นต้องย้าย เพราะทั้งคู่รับ OTLP ได้เหมือนกัน
Q: ใช้ Grafana Tempo แทน Jaeger ได้ไหม?
A: ได้ Tempo เหมาะกับ environment ที่ใช้ Grafana stack อยู่แล้ว (Prometheus + Loki + Grafana) ข้อดีของ Tempo คือใช้ object storage (S3, GCS) เป็น backend ทำให้ cost ต่ำกว่า Jaeger ที่ต้องใช้ Elasticsearch หรือ Cassandra ในระดับ production
Q: Trace data ควรเก็บนานแค่ไหน?
A: ขึ้นอยู่กับ use case แต่โดยทั่วไป 7-14 วันเพียงพอสำหรับ troubleshooting ถ้าต้องการเก็บนานกว่านั้นเพื่อ capacity planning ให้ export เป็น aggregated metrics แทนการเก็บ raw trace
Q: ต้องการ trace Kafka consumer อย่างไร?
A: ต้อง propagate trace context ผ่าน Kafka headers ตอน produce ให้ inject context เข้า header ตอน consume ให้ extract กลับมา ตัวอย่าง code อยู่ในหัวข้อ Real-time Processing Pipeline ด้านบน ถ้าใช้ Spring Kafka จะมี auto-instrumentation ทำให้อัตโนมัติ
