
Airbyte ETL Log Management ELK

2025-08-27 · Ajarn Bom — SiamCafe.net · 9,333 words

Airbyte ELK Log Management

A production guide to centralized log management for Airbyte ETL pipelines on the ELK stack: Filebeat ships sync and connector logs into Elasticsearch, Kibana provides the dashboards, and alert rules watch every sync run from collection through alerting.

| Log Type | Source | Volume | Retention |
|---|---|---|---|
| Sync Logs | Airbyte Worker Pods | High (every sync run) | Hot 7d, Warm 30d, Cold 90d |
| Connector Logs | Source/Destination Containers | High (API calls) | Hot 7d, Warm 30d |
| Platform Logs | Scheduler, Server, DB | Medium | Hot 7d, Warm 30d |
| Audit Logs | API Server | Low | Hot 30d, Warm 90d, Cold 365d |
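The retention tiers in the table can be sketched as a small lookup; the `RETENTION` map and `tier_for` helper below are illustrative names, not part of Airbyte or ELK:

```python
# Retention tiers per log type, mirroring the table above.
# Ages are in days; a log older than its last tier is deleted.
RETENTION = {
    "sync":      [("hot", 7), ("warm", 30), ("cold", 90)],
    "connector": [("hot", 7), ("warm", 30)],
    "platform":  [("hot", 7), ("warm", 30)],
    "audit":     [("hot", 30), ("warm", 90), ("cold", 365)],
}

def tier_for(log_type: str, age_days: int) -> str:
    """Return the storage tier for a log of the given type and age."""
    for tier, max_age in RETENTION[log_type]:
        if age_days <= max_age:
            return tier
    return "delete"

print(tier_for("sync", 3))     # hot
print(tier_for("sync", 45))    # cold
print(tier_for("audit", 400))  # delete
```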

ELK Setup

# === ELK Stack for Airbyte ===

# Docker Compose (docker-compose-elk.yml)
# version: '3.8'
# services:
#   elasticsearch:
#     image: elasticsearch:8.12.0
#     environment:
#       - discovery.type=single-node
#       - xpack.security.enabled=true
#       - ELASTIC_PASSWORD=changeme
#     ports: ["9200:9200"]
#     volumes: ["es-data:/usr/share/elasticsearch/data"]
#
#   kibana:
#     image: kibana:8.12.0
#     environment:
#       - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
#       - ELASTICSEARCH_USERNAME=kibana_system
#       - ELASTICSEARCH_PASSWORD=changeme
#     ports: ["5601:5601"]
#
#   filebeat:
#     image: elastic/filebeat:8.12.0
#     volumes:
#       - ./filebeat.yml:/usr/share/filebeat/filebeat.yml
#       - /var/lib/docker/containers:/var/lib/docker/containers:ro
#       - /var/run/docker.sock:/var/run/docker.sock:ro
#     depends_on: [elasticsearch]

# filebeat.yml
# filebeat.inputs:
#   - type: container
#     paths: ['/var/lib/docker/containers/*/*.log']
#     processors:
#       - add_docker_metadata: ~
#       - decode_json_fields:
#           fields: ["message"]
#           target: "airbyte"
# output.elasticsearch:
#   hosts: ["elasticsearch:9200"]
#   username: "elastic"
#   password: "changeme"
#   index: "airbyte-logs-%{+yyyy.MM.dd}"
# setup.template.name: "airbyte-logs"      # required when overriding index
# setup.template.pattern: "airbyte-logs-*"
# setup.ilm.enabled: false                 # Filebeat's own ILM would override the custom index

from dataclasses import dataclass

@dataclass
class ELKComponent:
    component: str
    role: str
    config: str
    resources: str

components = [
    ELKComponent("Elasticsearch",
        "Store + Index + Search Logs",
        "Single Node (Dev) / 3 Node Cluster (Prod)",
        "RAM 4GB+ (Dev) 16GB+ (Prod) SSD Storage"),
    ELKComponent("Filebeat",
        "Collect Logs from Docker/K8s Containers",
        "DaemonSet (K8s) / Docker Container",
        "RAM 256MB CPU 0.5 Core (Lightweight)"),
    ELKComponent("Logstash (Optional)",
        "Parse Filter Transform Complex Logs",
        "Pipeline Config grok json mutate",
        "RAM 2GB+ CPU 1 Core (Heavy Processing)"),
    ELKComponent("Kibana",
        "Dashboard Visualization Search Alert",
        "Connect to Elasticsearch Cluster",
        "RAM 2GB CPU 1 Core"),
]

print("=== ELK Components ===")
for c in components:
    print(f"  [{c.component}] {c.role}")
    print(f"    Config: {c.config}")
    print(f"    Resources: {c.resources}")
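To make the Filebeat processing concrete, here is a stdlib-only sketch of what `decode_json_fields` and the daily index pattern do; the helper names are hypothetical:

```python
import json
from datetime import datetime, timezone

def decode_json_fields(event: dict, field: str = "message",
                       target: str = "airbyte") -> dict:
    """Mimic Filebeat's decode_json_fields: parse a JSON string field
    into a nested object under `target`; leave non-JSON lines as-is."""
    try:
        event[target] = json.loads(event[field])
    except (ValueError, TypeError):
        pass  # not JSON: keep only the raw message
    return event

def daily_index(ts: datetime, prefix: str = "airbyte-logs") -> str:
    """Expand the `airbyte-logs-%{+yyyy.MM.dd}` index name pattern."""
    return f"{prefix}-{ts.strftime('%Y.%m.%d')}"

event = {"message": '{"level": "ERROR", "sync_id": "abc123"}'}
decoded = decode_json_fields(event)
print(decoded["airbyte"]["level"])  # ERROR
print(daily_index(datetime(2025, 8, 27, tzinfo=timezone.utc)))
# airbyte-logs-2025.08.27
```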

Log Pipeline

# === Log Processing Pipeline ===

# Elasticsearch Ingest Pipeline
# PUT _ingest/pipeline/airbyte-logs
# {
#   "processors": [
#     {"json": {"field": "message", "target_field": "airbyte"}},
#     {"date": {"field": "airbyte.timestamp", "formats": ["ISO8601"]}},
#     {"set": {"field": "severity",
#              "value": "error",
#              "if": "ctx.airbyte?.level == 'ERROR'"}},
#     {"grok": {"field": "airbyte.message",
#               "patterns": ["%{WORD:action}: %{DATA:connector} \\| rows=%{NUMBER:rows:int}"]}},
#     {"remove": {"field": "message"}}
#   ]
# }

# ILM Policy
# PUT _ilm/policy/airbyte-logs-policy
# {
#   "policy": {
#     "phases": {
#       "hot": {"actions": {"rollover": {"max_size": "50gb", "max_age": "7d"}}},
#       "warm": {"min_age": "7d", "actions": {"shrink": {"number_of_shards": 1}}},
#       "cold": {"min_age": "30d", "actions": {"searchable_snapshot": {"snapshot_repository": "s3-repo"}}},
#       "delete": {"min_age": "90d", "actions": {"delete": {}}}
#     }
#   }
# }
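Ignoring the size-based rollover (50gb), the age-based phase transitions in this ILM policy can be sketched as:

```python
def ilm_phase(index_age_days: int) -> str:
    """Map index age to the phases in airbyte-logs-policy:
    hot until rollover (max 7d), warm from 7d, cold from 30d, delete at 90d.
    A sketch only: real ILM also rolls over on index size."""
    if index_age_days >= 90:
        return "delete"
    if index_age_days >= 30:
        return "cold"
    if index_age_days >= 7:
        return "warm"
    return "hot"

for age in (1, 10, 45, 120):
    print(age, ilm_phase(age))
```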

@dataclass
class PipelineStage:
    stage: str
    tool: str
    action: str
    output: str

pipeline = [
    PipelineStage("Collection",
        "Filebeat DaemonSet",
        "Collect container logs, add Docker/K8s metadata",
        "Raw JSON logs + metadata → Elasticsearch"),
    PipelineStage("Parsing",
        "Elasticsearch Ingest Pipeline",
        "Parse JSON, extract fields: sync_id, connector, status, rows",
        "Structured documents ready for search"),
    PipelineStage("Enrichment",
        "Ingest Pipeline + Lookup",
        "Add severity, environment, team, connector_type",
        "Enriched documents for filtering and aggregation"),
    PipelineStage("Storage",
        "Elasticsearch ILM",
        "Hot 7d (SSD) → Warm 30d → Cold 90d (S3) → Delete",
        "Cost-optimized storage by age"),
    PipelineStage("Visualization",
        "Kibana Dashboards",
        "Build dashboards: sync rate, error trend, duration",
        "Real-time dashboards for the data team"),
    PipelineStage("Alerting",
        "Kibana Rules / ElastAlert",
        "Alert on sync failures, error spikes, duration anomalies",
        "Notifications to Slack, PagerDuty, Email, Jira"),
]

print("=== Log Pipeline Stages ===")
for p in pipeline:
    print(f"  [{p.stage}] Tool: {p.tool}")
    print(f"    Action: {p.action}")
    print(f"    Output: {p.output}")

Dashboard & Alert

# === Kibana Dashboard & Alert Rules ===

@dataclass
class AlertRule:
    name: str
    condition: str
    severity: str
    channel: str

alerts = [
    AlertRule("Sync Failure Spike",
        "status='failed' count > 3 in 1 hour",
        "P1 Critical",
        "Slack #data-alerts + PagerDuty"),
    AlertRule("Error Log Spike",
        "level='ERROR' count > 100 in 10 min",
        "P1 Critical",
        "Slack + PagerDuty"),
    AlertRule("Sync Duration Anomaly",
        "duration > 2x rolling_avg(7d)",
        "P2 High",
        "Slack #data-perf"),
    AlertRule("Data Volume Drop",
        "rows_synced < 50% of previous run",
        "P2 High",
        "Slack #data-quality"),
    AlertRule("No Sync Activity (Flatline)",
        "No sync logs for 2x schedule interval",
        "P1 Critical",
        "PagerDuty (Pipeline may be stuck)"),
    AlertRule("Disk Usage High",
        "Elasticsearch disk > 85%",
        "P2 High",
        "Slack #infra + Auto ILM Force Merge"),
]

print("=== Alert Rules ===")
for a in alerts:
    print(f"  [{a.name}] Severity: {a.severity}")
    print(f"    Condition: {a.condition}")
    print(f"    Channel: {a.channel}")
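As a rough illustration of how a count-based rule like "Sync Failure Spike" evaluates, assuming hypothetical event dicts with `status` and `ts` fields:

```python
from datetime import datetime, timedelta

def count_in_window(events: list[dict], *, field: str, value: str,
                    window: timedelta, now: datetime) -> int:
    """Count events matching field == value within the trailing window."""
    cutoff = now - window
    return sum(1 for e in events
               if e[field] == value and e["ts"] >= cutoff)

def sync_failure_spike(events: list[dict], now: datetime) -> bool:
    """Fires like the 'Sync Failure Spike' rule: status='failed' count > 3 in 1 hour."""
    return count_in_window(events, field="status", value="failed",
                           window=timedelta(hours=1), now=now) > 3

now = datetime(2025, 8, 27, 12, 0)
events = [{"status": "failed", "ts": now - timedelta(minutes=10 * i)}
          for i in range(5)]  # 5 failures in the last 50 minutes
print(sync_failure_spike(events, now))  # True
```

In practice Kibana Rules and ElastAlert run this as an Elasticsearch date-range query rather than in application code, but the threshold logic is the same.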

Tips

What is Airbyte log management?

Centralized collection of Airbyte's sync, connector, platform, and audit logs from containers and pods into the ELK stack (Elasticsearch, Kibana, Filebeat), giving the team full-text search, dashboards, and alerts in one place.

How do you set up the ELK stack?

Elasticsearch stores, indexes, and searches the logs; Filebeat runs as a DaemonSet to collect them; Logstash optionally handles heavy parsing; Kibana serves the dashboards. Use Docker Compose for development and a Kubernetes cluster with HA, security, and TLS for production.

How do you design the log pipeline?

Collection (Filebeat) → Parsing (ingest pipeline) → Enrichment → Storage (ILM hot/warm/cold/delete tiers) → Visualization (Kibana) → Alerting (Kibana Rules / ElastAlert).

How do you set up alerts?

Use Kibana Rules or ElastAlert to cover sync failure spikes, error spikes, duration anomalies, data volume drops, and flatlines, routed to Slack, PagerDuty, email, or Jira; ElastAlert's frequency and spike rule types handle most of these.

Summary

Managing Airbyte ETL logs with the ELK stack comes down to this: Filebeat ships container logs into Elasticsearch, an ingest pipeline structures them, ILM moves indices through hot/warm/cold tiers to control cost, and Kibana (with ElastAlert) provides the dashboards and alerts a production deployment needs.
