ai

Distributed Tracing Real-time Processing

Distributed Tracing Real-time Processing

Distributed Tracing คืออะไรและทำไมต้องใช้กับระบบ Real-time

Distributed Tracing Real-time Processing

Distributed Tracing เป็นเทคนิคการติดตาม request ที่วิ่งผ่านหลาย service ในระบบ microservices โดยแต่ละ request จะถูกกำหนด trace ID เพื่อให้สามารถติดตามได้ตลอดเส้นทาง ตั้งแต่ต้นทางจนถึงปลายทาง เมื่อนำมาใช้กับระบบ real-time processing จะช่วยให้เห็นว่า request แต่ละตัวใช้เวลาเท่าไหร่ในแต่ละ service ติดคอขวดตรงไหน และ error เกิดที่จุดใด

ในระบบที่ต้อง process ข้อมูลแบบ real-time เช่น payment gateway, live streaming analytics หรือ IoT sensor data pipeline ถ้าไม่มี tracing จะแก้ปัญหา latency spike ได้ยากมาก เพราะ request วิ่งผ่านหลายสิบ service แต่ละตัวมี dependency ซ้อนกันหลายชั้น

เครื่องมือหลักสำหรับ Distributed Tracing

เครื่องมือที่นิยมใช้ในปัจจุบันมี 3 ตัวหลัก:

  • Jaeger — พัฒนาโดย Uber เปิดเป็น open source ภายใต้ CNCF รองรับ OpenTelemetry native
  • Zipkin — โปรเจค open source เก่าแก่จาก Twitter เหมาะกับระบบที่ใช้ Java/Spring เป็นหลัก
  • Grafana Tempo — backend storage สำหรับ trace ที่ทำงานร่วมกับ Grafana ได้สมบูรณ์ ใช้ object storage เป็น backend ทำให้ cost ต่ำ

ติดตั้ง Jaeger ด้วย Docker Compose

# docker-compose.yml สำหรับ Jaeger all-in-one

version: '3.8'

services:

  jaeger:

    image: jaegertracing/all-in-one:1.54

    ports:

      - "16686:16686"   # Jaeger UI

      - "4317:4317"     # OTLP gRPC

      - "4318:4318"     # OTLP HTTP

      - "14250:14250"   # gRPC model

      - "6831:6831/udp" # Thrift compact

    environment:

      - COLLECTOR_OTLP_ENABLED=true

      - SPAN_STORAGE_TYPE=badger

      - BADGER_EPHEMERAL=false

      - BADGER_DIRECTORY_VALUE=/badger/data

      - BADGER_DIRECTORY_KEY=/badger/key

    volumes:

      - jaeger_data:/badger



volumes:

  jaeger_data:
# สั่งรัน

docker compose up -d



# ตรวจสอบสถานะ

docker compose ps

# NAME      IMAGE                            STATUS

# jaeger    jaegertracing/all-in-one:1.54    Up 2 minutes



# เปิด Jaeger UI

# http://localhost:16686

ติดตั้ง OpenTelemetry SDK ในแอปพลิเคชัน

OpenTelemetry (OTel) เป็นมาตรฐานกลางสำหรับ observability ที่รวม tracing, metrics และ logs เข้าด้วยกัน ตัวอย่างนี้ใช้ Python แต่หลักการเดียวกันใช้ได้กับทุกภาษา

# ติดตั้ง OpenTelemetry packages

pip install opentelemetry-api \

    opentelemetry-sdk \

    opentelemetry-exporter-otlp \

    opentelemetry-instrumentation-fastapi \

    opentelemetry-instrumentation-httpx \

    opentelemetry-instrumentation-sqlalchemy
# tracing_config.py

from opentelemetry import trace

from opentelemetry.sdk.trace import TracerProvider

from opentelemetry.sdk.trace.export import BatchSpanProcessor

from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

from opentelemetry.sdk.resources import Resource



def setup_tracing(service_name: str):

    resource = Resource.create({

        "service.name": service_name,

        "service.version": "1.0.0",

        "deployment.environment": "production",

    })



    provider = TracerProvider(resource=resource)



    # ส่ง trace ไปยัง Jaeger ผ่าน OTLP gRPC

    otlp_exporter = OTLPSpanExporter(

        endpoint="http://jaeger:4317",

        insecure=True,

    )



    # ใช้ BatchSpanProcessor เพื่อ performance ที่ดี

    provider.add_span_processor(

        BatchSpanProcessor(

            otlp_exporter,

            max_queue_size=2048,

            max_export_batch_size=512,

            schedule_delay_millis=5000,

        )

    )



    trace.set_tracer_provider(provider)

    return trace.get_tracer(service_name)

ใช้งานกับ FastAPI

# main.py

from fastapi import FastAPI

from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor

from tracing_config import setup_tracing

import httpx



tracer = setup_tracing("payment-service")

app = FastAPI()

FastAPIInstrumentor.instrument_app(app)



@app.post("/api/payment/process")

async def process_payment(order_id: str, amount: float):

    with tracer.start_as_current_span("validate_payment") as span:

        span.set_attribute("order.id", order_id)

        span.set_attribute("payment.amount", amount)



        # เรียก fraud detection service

        async with httpx.AsyncClient() as client:

            with tracer.start_as_current_span("call_fraud_check"):

                resp = await client.post(

                    "http://fraud-service:8001/check",

                    json={"order_id": order_id, "amount": amount}

                )

                fraud_result = resp.json()



        span.set_attribute("fraud.score", fraud_result["score"])



        if fraud_result["score"] > 0.8:

            span.set_status(trace.StatusCode.ERROR, "High fraud score")

            return {"status": "rejected", "reason": "fraud_detected"}



        # เรียก billing service

        with tracer.start_as_current_span("call_billing"):

            async with httpx.AsyncClient() as client:

                billing_resp = await client.post(

                    "http://billing-service:8002/charge",

                    json={"order_id": order_id, "amount": amount}

                )



        return {"status": "success", "transaction_id": billing_resp.json()["txn_id"]}

การตั้งค่า Sampling Strategy สำหรับ Real-time Processing

Distributed Tracing Real-time Processing

ระบบ real-time ที่มี throughput สูง (หลายหมื่น request/วินาที) ไม่ควร trace ทุก request เพราะจะทำให้ storage บวมและ overhead สูง ต้องเลือก sampling strategy ที่เหมาะสม

เนื้อหาเกี่ยวข้อง — อ่านต่อ: mTLS Service Mesh High Availability HA Setup —

Strategyเหมาะกับข้อดีข้อเสีย
Head-based (probability)traffic สม่ำเสมอง่าย, overhead ต่ำอาจพลาด error trace
Tail-basedต้องการจับ error ทุกตัวจับ anomaly ได้ดีใช้ memory สูง
Rate limitingtraffic ไม่สม่ำเสมอควบคุม cost ได้อาจพลาดช่วง spike
# ตั้งค่า tail-based sampling ใน OpenTelemetry Collector

# otel-collector-config.yaml

receivers:

  otlp:

    protocols:

      grpc:

        endpoint: 0.0.0.0:4317

      http:

        endpoint: 0.0.0.0:4318



processors:

  tail_sampling:

    decision_wait: 10s

    num_traces: 100000

    expected_new_traces_per_sec: 10000

    policies:

      # เก็บ trace ที่มี error ทุกตัว

      - name: errors-policy

        type: status_code

        status_code:

          status_codes:

            - ERROR

      # เก็บ trace ที่ latency สูงกว่า 500ms

      - name: latency-policy

        type: latency

        latency:

          threshold_ms: 500

      # สุ่มเก็บ 10% ของ trace ปกติ

      - name: probabilistic-policy

        type: probabilistic

        probabilistic:

          sampling_percentage: 10



  batch:

    timeout: 5s

    send_batch_size: 1024



exporters:

  otlp/jaeger:

    endpoint: jaeger:4317

    tls:

      insecure: true



service:

  pipelines:

    traces:

      receivers: [otlp]

      processors: [tail_sampling, batch]

      exporters: [otlp/jaeger]
# Docker Compose สำหรับ OTel Collector

# เพิ่มใน docker-compose.yml

  otel-collector:

    image: otel/opentelemetry-collector-contrib:0.96.0

    command: ["--config=/etc/otel-collector-config.yaml"]

    volumes:

      - ./otel-collector-config.yaml:/etc/otel-collector-config.yaml

    ports:

      - "4317:4317"

      - "4318:4318"

      - "8889:8889"  # Prometheus metrics

    depends_on:

      - jaeger

Real-time Processing Pipeline พร้อม Tracing

ตัวอย่างการสร้าง real-time data pipeline ที่มี distributed tracing ครบวงจร ใช้ Kafka เป็น message broker

# kafka-consumer พร้อม tracing

# consumer.py

from confluent_kafka import Consumer

from opentelemetry import trace, context

from opentelemetry.trace.propagation import TraceContextTextMapPropagator

from tracing_config import setup_tracing

import json



tracer = setup_tracing("stream-processor")

propagator = TraceContextTextMapPropagator()



consumer = Consumer({

    'bootstrap.servers': 'kafka:9092',

    'group.id': 'realtime-processor',

    'auto.offset.reset': 'latest',

    'enable.auto.commit': False,

})

consumer.subscribe(['events.raw'])



def process_message(msg):

    # ดึง trace context จาก Kafka header

    headers = {h[0]: h[1].decode() for h in (msg.headers() or [])}

    ctx = propagator.extract(carrier=headers)



    with tracer.start_as_current_span(

        "process_event",

        context=ctx,

        kind=trace.SpanKind.CONSUMER

    ) as span:

        payload = json.loads(msg.value())

        span.set_attribute("event.type", payload.get("type", "unknown"))

        span.set_attribute("kafka.topic", msg.topic())

        span.set_attribute("kafka.partition", msg.partition())

        span.set_attribute("kafka.offset", msg.offset())



        # Transform data

        with tracer.start_as_current_span("transform"):

            result = transform_event(payload)



        # เขียนลง database

        with tracer.start_as_current_span("write_to_db"):

            write_to_timescaledb(result)



        # ส่งต่อไปยัง downstream

        with tracer.start_as_current_span("publish_processed"):

            publish_to_kafka("events.processed", result)



    consumer.commit(msg)



while True:

    msg = consumer.poll(1.0)

    if msg is None:

        continue

    if msg.error():

        print(f"Consumer error: {msg.error()}")

        continue

    process_message(msg)

Monitoring Trace Data ด้วย Grafana

การดู trace อย่างเดียวไม่พอ ต้องสร้าง dashboard ที่แสดง RED metrics (Rate, Error, Duration) จาก trace data เพื่อ monitor ระบบ real-time processing

แนะนำเพิ่มเติม — XM Signal

# Grafana datasource สำหรับ Tempo

# grafana-datasources.yaml

apiVersion: 1

datasources:

  - name: Tempo

    type: tempo

    access: proxy

    url: http://tempo:3200

    jsonData:

      httpMethod: GET

      tracesToLogs:

        datasourceUid: loki

        tags: ['service.name']

        mappedTags: [{ key: 'service.name', value: 'service' }]

        mapTagNamesEnabled: true

        filterByTraceID: true

      tracesToMetrics:

        datasourceUid: prometheus

        tags: [{ key: 'service.name', value: 'service' }]

        queries:

          - name: 'Request rate'

            query: 'sum(rate(traces_spanmetrics_calls_total{$$__tags}[5m]))'

          - name: 'Error rate'

            query: 'sum(rate(traces_spanmetrics_calls_total{$$__tags, status_code="STATUS_CODE_ERROR"}[5m]))'

      serviceMap:

        datasourceUid: prometheus

  - name: Loki

    type: loki

    uid: loki

    access: proxy

    url: http://loki:3100
# PromQL queries สำหรับ dashboard

# P99 latency ของแต่ละ service

histogram_quantile(0.99,

  sum(rate(traces_spanmetrics_duration_seconds_bucket[5m])) by (le, service)

)



# Error rate percentage

sum(rate(traces_spanmetrics_calls_total{status_code="STATUS_CODE_ERROR"}[5m])) by (service)

/

sum(rate(traces_spanmetrics_calls_total[5m])) by (service)

* 100



# Throughput per service

sum(rate(traces_spanmetrics_calls_total[5m])) by (service)

Alerting เมื่อ Latency พุ่งสูงในระบบ Real-time

สำหรับระบบ real-time ต้องตั้ง alert ที่ไวพอจะจับ latency spike ก่อนที่จะกระทบ user

# alerting-rules.yaml สำหรับ Prometheus/Grafana

groups:

  - name: tracing_alerts

    rules:

      - alert: HighP99Latency

        expr: |

          histogram_quantile(0.99,

            sum(rate(traces_spanmetrics_duration_seconds_bucket{service="payment-service"}[5m])) by (le)

          ) > 2.0

        for: 2m

        labels:

          severity: critical

        annotations:

          summary: "Payment service P99 latency > 2s"

          description: "P99 latency is {{ $value | humanizeDuration }} for payment-service"



      - alert: HighErrorRate

        expr: |

          sum(rate(traces_spanmetrics_calls_total{status_code="STATUS_CODE_ERROR", service="stream-processor"}[5m]))

          /

          sum(rate(traces_spanmetrics_calls_total{service="stream-processor"}[5m]))

          > 0.05

        for: 1m

        labels:

          severity: warning

        annotations:

          summary: "Stream processor error rate > 5%"



      - alert: TraceIngestionLag

        expr: |

          kafka_consumer_group_lag{group="realtime-processor"} > 10000

        for: 3m

        labels:

          severity: critical

        annotations:

          summary: "Kafka consumer lag > 10k messages"

Performance Tuning สำหรับ High-throughput Tracing

เมื่อระบบ real-time มี throughput หลายหมื่น events/วินาที ต้อง tune ทั้ง tracing SDK และ collector เพื่อไม่ให้เป็น bottleneck

เนื้อหาเกี่ยวข้อง — ทำความเข้าใจ Airflow DAG Design Performance Tuning

# ปรับ BatchSpanProcessor ใน Python SDK

from opentelemetry.sdk.trace.export import BatchSpanProcessor



processor = BatchSpanProcessor(

    exporter,

    max_queue_size=8192,          # เพิ่มจาก default 2048

    max_export_batch_size=1024,    # ส่งทีละ batch ใหญ่ขึ้น

    schedule_delay_millis=2000,    # ลด delay เพื่อส่งเร็วขึ้น

    export_timeout_millis=30000,   # timeout 30 วินาที

)
# ปรับ OTel Collector สำหรับ high throughput

# otel-collector-config.yaml

receivers:

  otlp:

    protocols:

      grpc:

        endpoint: 0.0.0.0:4317

        max_recv_msg_size_mib: 16  # รับ message ใหญ่ขึ้น



processors:

  memory_limiter:

    check_interval: 1s

    limit_mib: 4096               # จำกัด memory ที่ 4GB

    spike_limit_mib: 512



  batch:

    timeout: 2s

    send_batch_size: 4096          # batch ใหญ่ขึ้น

    send_batch_max_size: 8192



extensions:

  zpages:

    endpoint: 0.0.0.0:55679       # debug endpoint



service:

  extensions: [zpages]

  pipelines:

    traces:

      receivers: [otlp]

      processors: [memory_limiter, tail_sampling, batch]

      exporters: [otlp/jaeger]

FAQ — คำถามที่พบบ่อย

Q: Distributed Tracing ทำให้แอปช้าลงไหม?

A: ถ้าตั้งค่าถูก overhead จะต่ำมาก ประมาณ 1-3% ของ latency ที่เพิ่มขึ้น สิ่งสำคัญคือใช้ BatchSpanProcessor แทน SimpleSpanProcessor และตั้ง sampling rate ที่เหมาะสม อย่า trace ทุก request ในระบบ production ที่มี traffic สูง

Q: ควรเลือก Jaeger หรือ Zipkin?

แนะนำเพิ่มเติม — อ่านเพิ่มเติมที่ SiamCafeBook

A: ถ้าเริ่มใหม่แนะนำ Jaeger เพราะรองรับ OpenTelemetry native และเป็น CNCF graduated project มี community ใหญ่กว่า ถ้าใช้ Spring Cloud อยู่แล้วและมี Zipkin อยู่ก็ไม่จำเป็นต้องย้าย เพราะทั้งคู่รับ OTLP ได้เหมือนกัน

เนื้อหาเกี่ยวข้อง — ทำความเข้าใจ Redis Cluster Real-time Processing — คู่มือฉบับสมบูรณ์ 2026

Q: ใช้ Grafana Tempo แทน Jaeger ได้ไหม?

A: ได้ Tempo เหมาะกับ environment ที่ใช้ Grafana stack อยู่แล้ว (Prometheus + Loki + Grafana) ข้อดีของ Tempo คือใช้ object storage (S3, GCS) เป็น backend ทำให้ cost ต่ำกว่า Jaeger ที่ต้องใช้ Elasticsearch หรือ Cassandra ในระดับ production

Q: Trace data ควรเก็บนานแค่ไหน?

A: ขึ้นอยู่กับ use case แต่โดยทั่วไป 7-14 วันเพียงพอสำหรับ troubleshooting ถ้าต้องการเก็บนานกว่านั้นเพื่อ capacity planning ให้ export เป็น aggregated metrics แทนการเก็บ raw trace

เนื้อหาเกี่ยวข้อง — บทความที่เกี่ยวข้อง: Burp Suite Pro สำหรับมือใหม่ Step by Step —

Q: ต้องการ trace Kafka consumer อย่างไร?

A: ต้อง propagate trace context ผ่าน Kafka headers ตอน produce ให้ inject context เข้า header ตอน consume ให้ extract กลับมา ตัวอย่าง code อยู่ในหัวข้อ Real-time Processing Pipeline ด้านบน ถ้าใช้ Spring Kafka จะมี auto-instrumentation ทำให้อัตโนมัติ

XM Legend · เทรดเดอร์ & ผู้สอน Forex 13 ปี

ผู้ก่อตั้ง SiamCafe ตั้งแต่ปี 1997 · เทรดเดอร์สาย Forex มากกว่า 13 ปี ได้รับการยกย่องเป็น XM Legend · แบ่งปันความรู้ Forex, ไอที, AI และการเทรด จากประสบการณ์จริงในตลาดจริง