Technology

Distributed Tracing Real-time Processing

distributed tracing real time processing
Distributed Tracing Real-time Processing | SiamCafe Blog
2025-07-14· อ. บอม — SiamCafe.net· 1,924 คำ

Distributed Tracing คืออะไรและทำไมต้องใช้กับระบบ Real-time

Distributed Tracing เป็นเทคนิคการติดตาม request ที่วิ่งผ่านหลาย service ในระบบ microservices โดยแต่ละ request จะถูกกำหนด trace ID เพื่อให้สามารถติดตามได้ตลอดเส้นทาง ตั้งแต่ต้นทางจนถึงปลายทาง เมื่อนำมาใช้กับระบบ real-time processing จะช่วยให้เห็นว่า request แต่ละตัวใช้เวลาเท่าไหร่ในแต่ละ service ติดคอขวดตรงไหน และ error เกิดที่จุดใด

ในระบบที่ต้อง process ข้อมูลแบบ real-time เช่น payment gateway, live streaming analytics หรือ IoT sensor data pipeline ถ้าไม่มี tracing จะแก้ปัญหา latency spike ได้ยากมาก เพราะ request วิ่งผ่านหลายสิบ service แต่ละตัวมี dependency ซ้อนกันหลายชั้น

เครื่องมือหลักสำหรับ Distributed Tracing

เครื่องมือที่นิยมใช้ในปัจจุบันมี 3 ตัวหลัก:

ติดตั้ง Jaeger ด้วย Docker Compose

# docker-compose.yml สำหรับ Jaeger all-in-one
version: '3.8'
services:
  jaeger:
    image: jaegertracing/all-in-one:1.54
    ports:
      - "16686:16686"   # Jaeger UI
      - "4317:4317"     # OTLP gRPC
      - "4318:4318"     # OTLP HTTP
      - "14250:14250"   # gRPC model
      - "6831:6831/udp" # Thrift compact
    environment:
      - COLLECTOR_OTLP_ENABLED=true
      - SPAN_STORAGE_TYPE=badger
      - BADGER_EPHEMERAL=false
      - BADGER_DIRECTORY_VALUE=/badger/data
      - BADGER_DIRECTORY_KEY=/badger/key
    volumes:
      - jaeger_data:/badger

volumes:
  jaeger_data:
# สั่งรัน
docker compose up -d

# ตรวจสอบสถานะ
docker compose ps
# NAME      IMAGE                            STATUS
# jaeger    jaegertracing/all-in-one:1.54    Up 2 minutes

# เปิด Jaeger UI
# http://localhost:16686

ติดตั้ง OpenTelemetry SDK ในแอปพลิเคชัน

OpenTelemetry (OTel) เป็นมาตรฐานกลางสำหรับ observability ที่รวม tracing, metrics และ logs เข้าด้วยกัน ตัวอย่างนี้ใช้ Python แต่หลักการเดียวกันใช้ได้กับทุกภาษา

# ติดตั้ง OpenTelemetry packages
pip install opentelemetry-api \
    opentelemetry-sdk \
    opentelemetry-exporter-otlp \
    opentelemetry-instrumentation-fastapi \
    opentelemetry-instrumentation-httpx \
    opentelemetry-instrumentation-sqlalchemy
# tracing_config.py
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource

def setup_tracing(service_name: str):
    resource = Resource.create({
        "service.name": service_name,
        "service.version": "1.0.0",
        "deployment.environment": "production",
    })

    provider = TracerProvider(resource=resource)

    # ส่ง trace ไปยัง Jaeger ผ่าน OTLP gRPC
    otlp_exporter = OTLPSpanExporter(
        endpoint="http://jaeger:4317",
        insecure=True,
    )

    # ใช้ BatchSpanProcessor เพื่อ performance ที่ดี
    provider.add_span_processor(
        BatchSpanProcessor(
            otlp_exporter,
            max_queue_size=2048,
            max_export_batch_size=512,
            schedule_delay_millis=5000,
        )
    )

    trace.set_tracer_provider(provider)
    return trace.get_tracer(service_name)

ใช้งานกับ FastAPI

# main.py
from fastapi import FastAPI
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from tracing_config import setup_tracing
import httpx

tracer = setup_tracing("payment-service")
app = FastAPI()
FastAPIInstrumentor.instrument_app(app)

@app.post("/api/payment/process")
async def process_payment(order_id: str, amount: float):
    with tracer.start_as_current_span("validate_payment") as span:
        span.set_attribute("order.id", order_id)
        span.set_attribute("payment.amount", amount)

        # เรียก fraud detection service
        async with httpx.AsyncClient() as client:
            with tracer.start_as_current_span("call_fraud_check"):
                resp = await client.post(
                    "http://fraud-service:8001/check",
                    json={"order_id": order_id, "amount": amount}
                )
                fraud_result = resp.json()

        span.set_attribute("fraud.score", fraud_result["score"])

        if fraud_result["score"] > 0.8:
            span.set_status(trace.StatusCode.ERROR, "High fraud score")
            return {"status": "rejected", "reason": "fraud_detected"}

        # เรียก billing service
        with tracer.start_as_current_span("call_billing"):
            async with httpx.AsyncClient() as client:
                billing_resp = await client.post(
                    "http://billing-service:8002/charge",
                    json={"order_id": order_id, "amount": amount}
                )

        return {"status": "success", "transaction_id": billing_resp.json()["txn_id"]}

การตั้งค่า Sampling Strategy สำหรับ Real-time Processing

ระบบ real-time ที่มี throughput สูง (หลายหมื่น request/วินาที) ไม่ควร trace ทุก request เพราะจะทำให้ storage บวมและ overhead สูง ต้องเลือก sampling strategy ที่เหมาะสม

Strategyเหมาะกับข้อดีข้อเสีย
Head-based (probability)traffic สม่ำเสมอง่าย, overhead ต่ำอาจพลาด error trace
Tail-basedต้องการจับ error ทุกตัวจับ anomaly ได้ดีใช้ memory สูง
Rate limitingtraffic ไม่สม่ำเสมอควบคุม cost ได้อาจพลาดช่วง spike
# ตั้งค่า tail-based sampling ใน OpenTelemetry Collector
# otel-collector-config.yaml
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
      http:
        endpoint: 0.0.0.0:4318

processors:
  tail_sampling:
    decision_wait: 10s
    num_traces: 100000
    expected_new_traces_per_sec: 10000
    policies:
      # เก็บ trace ที่มี error ทุกตัว
      - name: errors-policy
        type: status_code
        status_code:
          status_codes:
            - ERROR
      # เก็บ trace ที่ latency สูงกว่า 500ms
      - name: latency-policy
        type: latency
        latency:
          threshold_ms: 500
      # สุ่มเก็บ 10% ของ trace ปกติ
      - name: probabilistic-policy
        type: probabilistic
        probabilistic:
          sampling_percentage: 10

  batch:
    timeout: 5s
    send_batch_size: 1024

exporters:
  otlp/jaeger:
    endpoint: jaeger:4317
    tls:
      insecure: true

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [tail_sampling, batch]
      exporters: [otlp/jaeger]
# Docker Compose สำหรับ OTel Collector
# เพิ่มใน docker-compose.yml
  otel-collector:
    image: otel/opentelemetry-collector-contrib:0.96.0
    command: ["--config=/etc/otel-collector-config.yaml"]
    volumes:
      - ./otel-collector-config.yaml:/etc/otel-collector-config.yaml
    ports:
      - "4317:4317"
      - "4318:4318"
      - "8889:8889"  # Prometheus metrics
    depends_on:
      - jaeger

Real-time Processing Pipeline พร้อม Tracing

ตัวอย่างการสร้าง real-time data pipeline ที่มี distributed tracing ครบวงจร ใช้ Kafka เป็น message broker

# kafka-consumer พร้อม tracing
# consumer.py
from confluent_kafka import Consumer
from opentelemetry import trace, context
from opentelemetry.trace.propagation import TraceContextTextMapPropagator
from tracing_config import setup_tracing
import json

tracer = setup_tracing("stream-processor")
propagator = TraceContextTextMapPropagator()

consumer = Consumer({
    'bootstrap.servers': 'kafka:9092',
    'group.id': 'realtime-processor',
    'auto.offset.reset': 'latest',
    'enable.auto.commit': False,
})
consumer.subscribe(['events.raw'])

def process_message(msg):
    # ดึง trace context จาก Kafka header
    headers = {h[0]: h[1].decode() for h in (msg.headers() or [])}
    ctx = propagator.extract(carrier=headers)

    with tracer.start_as_current_span(
        "process_event",
        context=ctx,
        kind=trace.SpanKind.CONSUMER
    ) as span:
        payload = json.loads(msg.value())
        span.set_attribute("event.type", payload.get("type", "unknown"))
        span.set_attribute("kafka.topic", msg.topic())
        span.set_attribute("kafka.partition", msg.partition())
        span.set_attribute("kafka.offset", msg.offset())

        # Transform data
        with tracer.start_as_current_span("transform"):
            result = transform_event(payload)

        # เขียนลง database
        with tracer.start_as_current_span("write_to_db"):
            write_to_timescaledb(result)

        # ส่งต่อไปยัง downstream
        with tracer.start_as_current_span("publish_processed"):
            publish_to_kafka("events.processed", result)

    consumer.commit(msg)

while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        print(f"Consumer error: {msg.error()}")
        continue
    process_message(msg)

Monitoring Trace Data ด้วย Grafana

การดู trace อย่างเดียวไม่พอ ต้องสร้าง dashboard ที่แสดง RED metrics (Rate, Error, Duration) จาก trace data เพื่อ monitor ระบบ real-time processing

# Grafana datasource สำหรับ Tempo
# grafana-datasources.yaml
apiVersion: 1
datasources:
  - name: Tempo
    type: tempo
    access: proxy
    url: http://tempo:3200
    jsonData:
      httpMethod: GET
      tracesToLogs:
        datasourceUid: loki
        tags: ['service.name']
        mappedTags: [{ key: 'service.name', value: 'service' }]
        mapTagNamesEnabled: true
        filterByTraceID: true
      tracesToMetrics:
        datasourceUid: prometheus
        tags: [{ key: 'service.name', value: 'service' }]
        queries:
          - name: 'Request rate'
            query: 'sum(rate(traces_spanmetrics_calls_total{$$__tags}[5m]))'
          - name: 'Error rate'
            query: 'sum(rate(traces_spanmetrics_calls_total{$$__tags, status_code="STATUS_CODE_ERROR"}[5m]))'
      serviceMap:
        datasourceUid: prometheus
  - name: Loki
    type: loki
    uid: loki
    access: proxy
    url: http://loki:3100
# PromQL queries สำหรับ dashboard
# P99 latency ของแต่ละ service
histogram_quantile(0.99,
  sum(rate(traces_spanmetrics_duration_seconds_bucket[5m])) by (le, service)
)

# Error rate percentage
sum(rate(traces_spanmetrics_calls_total{status_code="STATUS_CODE_ERROR"}[5m])) by (service)
/
sum(rate(traces_spanmetrics_calls_total[5m])) by (service)
* 100

# Throughput per service
sum(rate(traces_spanmetrics_calls_total[5m])) by (service)

Alerting เมื่อ Latency พุ่งสูงในระบบ Real-time

สำหรับระบบ real-time ต้องตั้ง alert ที่ไวพอจะจับ latency spike ก่อนที่จะกระทบ user

# alerting-rules.yaml สำหรับ Prometheus/Grafana
groups:
  - name: tracing_alerts
    rules:
      - alert: HighP99Latency
        expr: |
          histogram_quantile(0.99,
            sum(rate(traces_spanmetrics_duration_seconds_bucket{service="payment-service"}[5m])) by (le)
          ) > 2.0
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Payment service P99 latency > 2s"
          description: "P99 latency is {{ $value | humanizeDuration }} for payment-service"

      - alert: HighErrorRate
        expr: |
          sum(rate(traces_spanmetrics_calls_total{status_code="STATUS_CODE_ERROR", service="stream-processor"}[5m]))
          /
          sum(rate(traces_spanmetrics_calls_total{service="stream-processor"}[5m]))
          > 0.05
        for: 1m
        labels:
          severity: warning
        annotations:
          summary: "Stream processor error rate > 5%"

      - alert: TraceIngestionLag
        expr: |
          kafka_consumer_group_lag{group="realtime-processor"} > 10000
        for: 3m
        labels:
          severity: critical
        annotations:
          summary: "Kafka consumer lag > 10k messages"

Performance Tuning สำหรับ High-throughput Tracing

เมื่อระบบ real-time มี throughput หลายหมื่น events/วินาที ต้อง tune ทั้ง tracing SDK และ collector เพื่อไม่ให้เป็น bottleneck

# ปรับ BatchSpanProcessor ใน Python SDK
from opentelemetry.sdk.trace.export import BatchSpanProcessor

processor = BatchSpanProcessor(
    exporter,
    max_queue_size=8192,          # เพิ่มจาก default 2048
    max_export_batch_size=1024,    # ส่งทีละ batch ใหญ่ขึ้น
    schedule_delay_millis=2000,    # ลด delay เพื่อส่งเร็วขึ้น
    export_timeout_millis=30000,   # timeout 30 วินาที
)
# ปรับ OTel Collector สำหรับ high throughput
# otel-collector-config.yaml
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
        max_recv_msg_size_mib: 16  # รับ message ใหญ่ขึ้น

processors:
  memory_limiter:
    check_interval: 1s
    limit_mib: 4096               # จำกัด memory ที่ 4GB
    spike_limit_mib: 512

  batch:
    timeout: 2s
    send_batch_size: 4096          # batch ใหญ่ขึ้น
    send_batch_max_size: 8192

extensions:
  zpages:
    endpoint: 0.0.0.0:55679       # debug endpoint

service:
  extensions: [zpages]
  pipelines:
    traces:
      receivers: [otlp]
      processors: [memory_limiter, tail_sampling, batch]
      exporters: [otlp/jaeger]

FAQ — คำถามที่พบบ่อย

Q: Distributed Tracing ทำให้แอปช้าลงไหม?

A: ถ้าตั้งค่าถูก overhead จะต่ำมาก ประมาณ 1-3% ของ latency ที่เพิ่มขึ้น สิ่งสำคัญคือใช้ BatchSpanProcessor แทน SimpleSpanProcessor และตั้ง sampling rate ที่เหมาะสม อย่า trace ทุก request ในระบบ production ที่มี traffic สูง

Q: ควรเลือก Jaeger หรือ Zipkin?

A: ถ้าเริ่มใหม่แนะนำ Jaeger เพราะรองรับ OpenTelemetry native และเป็น CNCF graduated project มี community ใหญ่กว่า ถ้าใช้ Spring Cloud อยู่แล้วและมี Zipkin อยู่ก็ไม่จำเป็นต้องย้าย เพราะทั้งคู่รับ OTLP ได้เหมือนกัน

Q: ใช้ Grafana Tempo แทน Jaeger ได้ไหม?

A: ได้ Tempo เหมาะกับ environment ที่ใช้ Grafana stack อยู่แล้ว (Prometheus + Loki + Grafana) ข้อดีของ Tempo คือใช้ object storage (S3, GCS) เป็น backend ทำให้ cost ต่ำกว่า Jaeger ที่ต้องใช้ Elasticsearch หรือ Cassandra ในระดับ production

Q: Trace data ควรเก็บนานแค่ไหน?

A: ขึ้นอยู่กับ use case แต่โดยทั่วไป 7-14 วันเพียงพอสำหรับ troubleshooting ถ้าต้องการเก็บนานกว่านั้นเพื่อ capacity planning ให้ export เป็น aggregated metrics แทนการเก็บ raw trace

Q: ต้องการ trace Kafka consumer อย่างไร?

A: ต้อง propagate trace context ผ่าน Kafka headers ตอน produce ให้ inject context เข้า header ตอน consume ให้ extract กลับมา ตัวอย่าง code อยู่ในหัวข้อ Real-time Processing Pipeline ด้านบน ถ้าใช้ Spring Kafka จะมี auto-instrumentation ทำให้อัตโนมัติ

📖 บทความที่เกี่ยวข้อง

ONNX Runtime Real-time Processingอ่านบทความ → AlmaLinux Setup Real-time Processingอ่านบทความ → Distributed Tracing Batch Processing Pipelineอ่านบทความ → PostgreSQL Full Text Search Real-time Processingอ่านบทความ → Proxmox VE Cluster Real-time Processingอ่านบทความ →

📚 ดูบทความทั้งหมด →