Airbyte ELK Log Management

Managing Airbyte ETL logs with the ELK stack (Elasticsearch, Logstash, Kibana) and Filebeat: collecting sync and connector logs, processing them through a pipeline, and building dashboards and alerts for production.

| Log Type | Source | Volume | Retention |
|---|---|---|---|
| Sync Logs | Airbyte Worker Pods | High (every sync run) | Hot 7d, Warm 30d, Cold 90d |
| Connector Logs | Source/Destination Containers | High (API calls) | Hot 7d, Warm 30d |
| Platform Logs | Scheduler, Server, DB | Medium | Hot 7d, Warm 30d |
| Audit Logs | API Server | Low | Hot 30d, Warm 90d, Cold 365d |
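The retention column can be converted into ILM phase timings. The sketch below is a minimal illustration. It assumes each number in the table is the age at which data *leaves* that tier. Under that reading, "Hot 7d Warm 30d Cold 90d" means warm from day 7, cold from day 30, and deletion at day 90, which matches the ILM policy later in this article. `RETENTION` and `build_ilm_policy` are hypothetical names, and the generated phases contain only `min_age` timing, not actions such as rollover or shrink.

```python
import json

# Retention tiers from the table above, in days.
# Assumption: each number is the age at which data LEAVES that tier.
RETENTION = {
    "sync": {"hot": 7, "warm": 30, "cold": 90},
    "connector": {"hot": 7, "warm": 30},
    "platform": {"hot": 7, "warm": 30},
    "audit": {"hot": 30, "warm": 90, "cold": 365},
}


def build_ilm_policy(tiers: dict) -> dict:
    """Build an ILM policy body whose phase timings follow the tiers (no actions)."""
    phases = {"hot": {"actions": {}}}
    tier_end = tiers["hot"]  # hot lasts until this age
    for phase in ("warm", "cold"):
        if phase in tiers:
            # The next phase starts when the previous tier ends.
            phases[phase] = {"min_age": f"{tier_end}d", "actions": {}}
            tier_end = tiers[phase]
    # Delete once the last listed tier has ended.
    phases["delete"] = {"min_age": f"{tier_end}d", "actions": {"delete": {}}}
    return {"policy": {"phases": phases}}


# Usage: print the policy body for sync logs.
print(json.dumps(build_ilm_policy(RETENTION["sync"]), indent=2))
```

Audit logs have a very different retention from the other types, so in practice they would get their own policy, and their own index or template to attach it to.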

ELK Setup

# === ELK Stack for Airbyte ===

# Docker Compose (docker-compose-elk.yml)
# version: '3.8'
# services:
#   elasticsearch:
#     image: elasticsearch:8.12.0
#     environment:
#       - discovery.type=single-node
#       - xpack.security.enabled=true
#       - ELASTIC_PASSWORD=changeme
#     ports: ["9200:9200"]
#     volumes: ["es-data:/usr/share/elasticsearch/data"]
#
#   kibana:
#     image: kibana:8.12.0
#     environment:
#       - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
#       - ELASTICSEARCH_USERNAME=kibana_system
#       # ELASTIC_PASSWORD does not set kibana_system's password; set it first with
#       # POST /_security/user/kibana_system/_password {"password": "changeme"}
#       - ELASTICSEARCH_PASSWORD=changeme
#     ports: ["5601:5601"]
#     depends_on: [elasticsearch]
#
#   filebeat:
#     image: elastic/filebeat:8.12.0
#     user: root                        # needed to read other containers' log files
#     command: ["filebeat", "-e", "--strict.perms=false"]
#     volumes:
#       - ./filebeat.yml:/usr/share/filebeat/filebeat.yml:ro
#       - /var/lib/docker/containers:/var/lib/docker/containers:ro
#       - /var/run/docker.sock:/var/run/docker.sock:ro
#     depends_on: [elasticsearch]
#
# volumes:
#   es-data:                            # named volumes must be declared at top level

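# filebeat.yml
# filebeat.inputs:
#   - type: container
#     paths: ['/var/lib/docker/containers/*/*.log']
#     processors:
#       - add_docker_metadata: ~
#       - decode_json_fields:
#           fields: ["message"]
#           target: "airbyte"
# output.elasticsearch:
#   hosts: ["elasticsearch:9200"]
#   username: "elastic"
#   password: "changeme"
#   index: "airbyte-logs-%{+yyyy.MM.dd}"
#   pipeline: "airbyte-logs"            # run the ingest pipeline from the Log Pipeline section
# # A custom index name requires setup.template.name and setup.template.pattern.
# # Filebeat's own ILM setup is turned off; the template attaches airbyte-logs-policy instead.
# setup.ilm.enabled: false
# setup.template.name: "airbyte-logs"
# setup.template.pattern: "airbyte-logs-*"
# setup.template.settings:
#   index.lifecycle.name: "airbyte-logs-policy"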
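The following is a rough Python approximation of what Filebeat sends to Elasticsearch: the `container` input followed by `decode_json_fields`. It assumes Docker's default json-file log driver, which writes one JSON object per line with `log`, `stream`, and `time` keys. It also assumes Airbyte writes JSON log lines. The function name and the sample log fields are hypothetical. Real Filebeat also adds many more fields, such as host and container metadata.

```python
import json


def container_line_to_event(raw_line: str) -> dict:
    """Rough approximation of Filebeat's container input + decode_json_fields."""
    # Docker json-file driver: one JSON object per line with log/stream/time keys.
    record = json.loads(raw_line)
    event = {
        "@timestamp": record.get("time"),
        "stream": record.get("stream"),
        # The container input exposes the log text as "message".
        "message": record["log"].rstrip("\n"),
    }
    # decode_json_fields: parse "message" into the "airbyte" target if it is a JSON object.
    try:
        decoded = json.loads(event["message"])
    except json.JSONDecodeError:
        return event  # plain-text lines are left as they are
    if isinstance(decoded, dict):
        event["airbyte"] = decoded
    return event


# Usage with a hypothetical Airbyte log line.
line = json.dumps({
    "log": json.dumps({"level": "ERROR", "message": "sync failed"}) + "\n",
    "stream": "stdout",
    "time": "2024-01-15T10:00:00Z",
})
print(container_line_to_event(line))
```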

from dataclasses import dataclass

@dataclass
class ELKComponent:
    component: str
    role: str
    config: str
    resources: str

components = [
    ELKComponent("Elasticsearch",
        "Store, index, and search logs",
        "Single node (dev) / 3-node cluster (prod)",
        "RAM 4GB+ (dev), 16GB+ (prod); SSD storage"),
    ELKComponent("Filebeat",
        "Collect logs from Docker/K8s containers",
        "DaemonSet (K8s) / Docker container",
        "RAM 256MB, CPU 0.5 core (lightweight)"),
    ELKComponent("Logstash (Optional)",
        "Parse, filter, and transform complex logs",
        "Pipeline config with grok, json, mutate filters",
        "RAM 2GB+, CPU 1 core (heavy processing)"),
    ELKComponent("Kibana",
        "Dashboards, visualization, search, alerting",
        "Connects to the Elasticsearch cluster",
        "RAM 2GB, CPU 1 core"),
]

print("=== ELK Components ===")
for c in components:
    print(f"  [{c.component}] {c.role}")
    print(f"    Config: {c.config}")
    print(f"    Resources: {c.resources}")

Log Pipeline

# === Log Processing Pipeline ===

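# Elasticsearch Ingest Pipeline
# ignore_failure / ignore_missing keep non-matching lines instead of rejecting the document;
# "message" is only removed once it has been parsed into "airbyte".
# PUT _ingest/pipeline/airbyte-logs
# {
#   "processors": [
#     {"json": {"field": "message", "target_field": "airbyte", "ignore_failure": true}},
#     {"date": {"field": "airbyte.timestamp", "formats": ["ISO8601"], "ignore_failure": true}},
#     {"set": {"field": "severity",
#              "value": "error",
#              "if": "ctx.airbyte?.level == 'ERROR'"}},
#     {"grok": {"field": "airbyte.message",
#               "patterns": ["%{WORD:action}: %{DATA:connector} \\| rows=%{NUMBER:rows:int}"],
#               "ignore_missing": true, "ignore_failure": true}},
#     {"remove": {"field": "message", "if": "ctx.airbyte != null"}}
#   ]
# }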
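The processors run in order on every document. As a sketch, here is the same sequence in plain Python, which can help when checking a sample log line offline. Elasticsearch's own `POST _ingest/pipeline/airbyte-logs/_simulate` endpoint is the authoritative test. The regex is a hand translation of the grok pattern, simplified so `rows` matches integers only. The sample message format is an assumption implied by that pattern, and `simulate_airbyte_pipeline` is a hypothetical helper. In Elasticsearch, a non-matching grok fails the document unless `ignore_failure` is set. This sketch simply skips lines that do not match.

```python
import json
import re
from datetime import datetime

# Hand-translated equivalent of the grok pattern (simplified: rows is an integer).
GROK_RE = re.compile(r"(?P<action>\w+): (?P<connector>.*?) \| rows=(?P<rows>\d+)")


def simulate_airbyte_pipeline(doc: dict) -> dict:
    """Apply the airbyte-logs processors, in order, to a copy of doc."""
    out = dict(doc)
    # json: parse "message" into "airbyte" (raises on non-JSON, like the processor
    # would fail without ignore_failure).
    out["airbyte"] = json.loads(out["message"])
    airbyte = out["airbyte"]
    # date: airbyte.timestamp -> @timestamp (the date processor's default target).
    ts = airbyte.get("timestamp")
    if ts:
        out["@timestamp"] = datetime.fromisoformat(ts.replace("Z", "+00:00")).isoformat()
    # set: severity = "error" only when level is ERROR.
    if airbyte.get("level") == "ERROR":
        out["severity"] = "error"
    # grok: extract action, connector, rows (top-level fields, as grok does).
    match = GROK_RE.search(airbyte.get("message", ""))
    if match:
        out["action"] = match["action"]
        out["connector"] = match["connector"]
        out["rows"] = int(match["rows"])
    # remove: drop the raw message now that it has been parsed.
    out.pop("message", None)
    return out


sample = {"message": json.dumps({
    "timestamp": "2024-01-15T10:00:00Z",
    "level": "ERROR",
    "message": "sync: postgres-to-bq | rows=1200",
})}
print(simulate_airbyte_pipeline(sample))
```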

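# ILM Policy
# Note: rollover only works when writing through a data stream or a rollover alias;
# with date-named daily indices, drop the rollover action and let min_age drive the phases.
# searchable_snapshot needs an Enterprise license and a registered snapshot repository ("s3-repo").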
# PUT _ilm/policy/airbyte-logs-policy
# {
#   "policy": {
#     "phases": {
#       "hot": {"actions": {"rollover": {"max_size": "50gb", "max_age": "7d"}}},
#       "warm": {"min_age": "7d", "actions": {"shrink": {"number_of_shards": 1}}},
#       "cold": {"min_age": "30d", "actions": {"searchable_snapshot": {"snapshot_repository": "s3-repo"}}},
#       "delete": {"min_age": "90d", "actions": {"delete": {}}}
#     }
#   }
# }
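Under this policy, an index moves from hot to warm at 7 days and from warm to cold at 30 days, and is deleted at 90 days. The hypothetical helper below reads the policy JSON and reports which phase an index of a given age falls into. It assumes `min_age` values use only the `d` or `h` units. It also ignores two real-world details. First, when rollover is used, `min_age` counts from rollover rather than from index creation. Second, ILM only checks indices periodically, so transitions are not instantaneous.

```python
import json

# The airbyte-logs-policy body from above, as Python data.
POLICY = json.loads("""
{"policy": {"phases": {
  "hot":    {"actions": {"rollover": {"max_size": "50gb", "max_age": "7d"}}},
  "warm":   {"min_age": "7d",  "actions": {"shrink": {"number_of_shards": 1}}},
  "cold":   {"min_age": "30d", "actions": {"searchable_snapshot": {"snapshot_repository": "s3-repo"}}},
  "delete": {"min_age": "90d", "actions": {"delete": {}}}
}}}
""")


def min_age_days(value: str) -> float:
    """Convert an ILM min_age such as '7d' or '12h' to days (only d/h handled here)."""
    divisor = {"d": 1, "h": 24}
    return float(value[:-1]) / divisor[value[-1]]


def phase_for_age(policy: dict, age_days: float) -> str:
    """Return the latest phase whose min_age the index has already reached."""
    current, current_start = "hot", -1.0
    for name, body in policy["policy"]["phases"].items():
        start = min_age_days(body.get("min_age", "0d"))  # hot defaults to 0
        if start <= age_days and start > current_start:
            current, current_start = name, start
    return current


for age in (1, 10, 45, 120):
    print(age, phase_for_age(POLICY, age))
```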

@dataclass
class PipelineStage:
    stage: str
    tool: str
    action: str
    output: str

pipeline = [
    PipelineStage("Collection",
        "Filebeat DaemonSet",
        "Collect container logs and add Docker/K8s metadata",
        "Raw JSON logs + metadata → Elasticsearch"),
    PipelineStage("Parsing",
        "Elasticsearch Ingest Pipeline",
        "Parse JSON; extract sync_id, connector, status, rows",
        "Structured documents ready for search"),
    PipelineStage("Enrichment",
        "Ingest Pipeline + Lookup",
        "Add severity, environment, team, connector_type",
        "Enriched documents for filtering and aggregation"),
    PipelineStage("Storage",
        "Elasticsearch ILM",
        "Hot 0-7d (SSD) → Warm 7-30d → Cold 30-90d (S3) → Delete",
        "Cost-optimized storage by data age"),
    PipelineStage("Visualization",
        "Kibana Dashboards",
        "Build dashboards for sync rate, error trend, duration",
        "Real-time dashboards for the data team"),
    PipelineStage("Alerting",
        "Kibana Rules / ElastAlert",
        "Alert on sync failures, error spikes, duration anomalies",
        "Slack, PagerDuty, email, Jira notifications"),
]

print("=== Log Pipeline Stages ===")
for p in pipeline:
    print(f"  [{p.stage}] Tool: {p.tool}")
    print(f"    Action: {p.action}")
    print(f"    Output: {p.output}")
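You can think of the Enrichment stage as a lookup keyed on the connector name. In Elasticsearch, this would typically be an enrich policy used by the `enrich` ingest processor. The Python sketch below only illustrates the idea. The lookup table, connector names, teams, the `"info"` default severity, and the `enrich` function are all hypothetical. Severity is derived from `airbyte.level` in the same way as the ingest pipeline's `set` processor.

```python
# Hypothetical lookup table: connector name -> owning team and connector type.
CONNECTOR_LOOKUP = {
    "postgres-to-bq": {"team": "analytics", "connector_type": "database"},
    "stripe-to-bq": {"team": "finance", "connector_type": "api"},
}
UNKNOWN = {"team": "unknown", "connector_type": "unknown"}


def enrich(doc: dict, environment: str) -> dict:
    """Return a copy of doc with environment, severity, team, and connector_type added."""
    out = dict(doc)
    out["environment"] = environment
    level = out.get("airbyte", {}).get("level")
    # Keep a severity already set upstream; otherwise derive it (assumed "info" default).
    out.setdefault("severity", "error" if level == "ERROR" else "info")
    # Copy team/connector_type from the lookup, falling back to "unknown".
    out.update(CONNECTOR_LOOKUP.get(out.get("connector", ""), UNKNOWN))
    return out


print(enrich({"connector": "stripe-to-bq", "airbyte": {"level": "ERROR"}}, environment="prod"))
```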

Dashboard & Alert

# === Kibana Dashboard & Alert Rules ===

@dataclass
class AlertRule:
    name: str
    condition: str
    severity: str
    channel: str

alerts = [
    AlertRule("Sync Failure Spike",
        "status='failed' count > 3 in 1 hour",
        "P1 Critical",
        "Slack #data-alerts + PagerDuty"),
    AlertRule("Error Log Spike",
        "level='ERROR' count > 100 in 10 min",
        "P1 Critical",
        "Slack + PagerDuty"),
    AlertRule("Sync Duration Anomaly",
        "duration > 2x rolling_avg(7d)",
        "P2 High",
        "Slack #data-perf"),
    AlertRule("Data Volume Drop",
        "rows_synced < 50% of previous run",
        "P2 High",
        "Slack #data-quality"),
    AlertRule("No Sync Activity (Flatline)",
        "No sync logs for 2x schedule interval",
        "P1 Critical",
        "PagerDuty (Pipeline may be stuck)"),
    AlertRule("Disk Usage High",
        "Elasticsearch disk > 85% (the default low disk watermark)",
        "P2 High",
        "Slack #infra + review ILM delete ages (force merge needs extra disk temporarily)"),
]

print("=== Alert Rules ===")
for a in alerts:
    print(f"  [{a.name}] Severity: {a.severity}")
    print(f"    Condition: {a.condition}")
    print(f"    Channel: {a.channel}")
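Once the relevant values have been pulled from Elasticsearch, most of these conditions are simple checks. The sketch below expresses four of them as plain Python functions. In production, Kibana rules or ElastAlert would evaluate them against the index instead. The function names and parameters are hypothetical, and the thresholds default to the values in the table.

```python
from datetime import datetime, timedelta
from statistics import mean


def sync_failure_spike(failure_times: list, now: datetime,
                       threshold: int = 3, window: timedelta = timedelta(hours=1)) -> bool:
    """Sync Failure Spike: more than `threshold` failed syncs within the last window."""
    recent = [t for t in failure_times if now - window <= t <= now]
    return len(recent) > threshold


def duration_anomaly(duration_s: float, last_7d_durations: list, factor: float = 2.0) -> bool:
    """Sync Duration Anomaly: duration above factor x the 7-day rolling average."""
    if not last_7d_durations:
        return False  # no baseline yet
    return duration_s > factor * mean(last_7d_durations)


def volume_drop(rows_now: int, rows_previous: int, ratio: float = 0.5) -> bool:
    """Data Volume Drop: rows synced fell below ratio x the previous run."""
    return rows_previous > 0 and rows_now < ratio * rows_previous


def flatline(last_sync_log: datetime, now: datetime, schedule_interval: timedelta) -> bool:
    """No Sync Activity: no sync log for more than 2x the schedule interval."""
    return now - last_sync_log > 2 * schedule_interval


now = datetime(2024, 1, 15, 12, 0)
print(sync_failure_spike([now - timedelta(minutes=m) for m in (5, 15, 30, 50)], now))
print(duration_anomaly(650, [300, 310, 290]))
```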

Tips

  • Filebeat: Use Filebeat rather than Logstash for collection, since it is much lighter.
  • ILM: Set up ILM to delete old logs automatically and save storage.
  • JSON: Configure Airbyte to emit JSON logs so they are easy to parse.
  • Index Pattern: Use one index per day so old data is easy to delete or archive.
  • ElastAlert: Use ElastAlert for complex alert rules.
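With one index per day, retention becomes date arithmetic on index names. The following is a minimal sketch. It assumes indices are named as in filebeat.yml (`airbyte-logs-yyyy.MM.dd`), and `daily_index` and `indices_to_delete` are hypothetical helpers. ILM normally handles deletion itself, so a list like this is mainly useful for auditing, or for clusters that do not run ILM.

```python
from datetime import date, datetime, timedelta

INDEX_PREFIX = "airbyte-logs-"  # matches airbyte-logs-%{+yyyy.MM.dd} in filebeat.yml


def daily_index(day: date) -> str:
    """Name of the daily index for a given day."""
    return f"{INDEX_PREFIX}{day:%Y.%m.%d}"


def indices_to_delete(index_names: list, today: date, keep_days: int) -> list:
    """Daily indices strictly older than keep_days, sorted by name (i.e. by date)."""
    cutoff = today - timedelta(days=keep_days)
    expired = []
    for name in index_names:
        if not name.startswith(INDEX_PREFIX):
            continue  # some other index
        try:
            day = datetime.strptime(name[len(INDEX_PREFIX):], "%Y.%m.%d").date()
        except ValueError:
            continue  # not date-named, e.g. a rollover index like airbyte-logs-000001
        if day < cutoff:
            expired.append(name)
    return sorted(expired)


today = date(2024, 3, 31)
names = [daily_index(today - timedelta(days=d)) for d in (0, 30, 90, 91, 120)]
print(indices_to_delete(names, today, keep_days=90))
```

You could then pass each returned name to a `DELETE /<index>` request, after double-checking the list.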

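What is Airbyte Log Management?

Airbyte log management means collecting the sync, connector, platform, and audit logs produced by Airbyte's containers and pods into one central place. With the ELK stack, Filebeat ships the logs, Elasticsearch indexes them for full-text search, and Kibana provides dashboards and alerts.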