Airbyte ELK Log Management
Managing Airbyte ETL logs in production with the ELK stack (Elasticsearch, Logstash, Kibana) and Filebeat: collecting sync and connector logs, processing them in a pipeline, and building dashboards and alerts.
| Log Type | Source | Volume | Retention |
|---|---|---|---|
| Sync Logs | Airbyte Worker Pods | High (every sync run) | Hot 7d, Warm 30d, Cold 90d |
| Connector Logs | Source/Destination Containers | High (API calls) | Hot 7d, Warm 30d |
| Platform Logs | Scheduler, Server, DB | Medium | Hot 7d, Warm 30d |
| Audit Logs | API Server | Low | Hot 30d, Warm 90d, Cold 365d |
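The retention column can be read as a schedule of tier boundaries. Below is a minimal Python sketch. It assumes each listed duration is the age at which that tier ends, and that logs are deleted after the last tier; the table does not state either point explicitly. `RETENTION` and `tier_for` are hypothetical names.

```python
# Assumption: each (tier, end_day) pair means "stay in this tier until the log is
# end_day days old"; after the last tier the log is deleted.
RETENTION: dict[str, list[tuple[str, int]]] = {
    "Sync Logs": [("hot", 7), ("warm", 30), ("cold", 90)],
    "Connector Logs": [("hot", 7), ("warm", 30)],
    "Platform Logs": [("hot", 7), ("warm", 30)],
    "Audit Logs": [("hot", 30), ("warm", 90), ("cold", 365)],
}


def tier_for(log_type: str, age_days: int) -> str:
    """Return the storage tier for a log of the given age, or 'delete'."""
    for tier, end_day in RETENTION[log_type]:
        if age_days < end_day:
            return tier
    return "delete"


for log_type in RETENTION:
    print(log_type, [tier_for(log_type, d) for d in (0, 10, 45, 200)])
```

Under this reading, Sync Logs move to warm on day 7, to cold on day 30, and are deleted on day 90. That matches the `min_age` values in the ILM policy later in this article.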
ELK Setup
# === ELK Stack for Airbyte ===
# Docker Compose (docker-compose-elk.yml)
# version: '3.8'
# services:
#   elasticsearch:
#     image: elasticsearch:8.12.0
#     environment:
#       - discovery.type=single-node
#       - xpack.security.enabled=true
#       - ELASTIC_PASSWORD=changeme
#     ports: ["9200:9200"]
#     volumes: ["es-data:/usr/share/elasticsearch/data"]
#
#   kibana:
#     image: kibana:8.12.0
#     environment:
#       - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
#       - ELASTICSEARCH_USERNAME=kibana_system
#       # ELASTIC_PASSWORD only sets the "elastic" user; set kibana_system's password first,
#       # e.g. POST _security/user/kibana_system/_password {"password": "changeme"}
#       - ELASTICSEARCH_PASSWORD=changeme
#     ports: ["5601:5601"]
#     depends_on: [elasticsearch]
#
#   filebeat:
#     image: elastic/filebeat:8.12.0
#     user: root  # needed to read other containers' log files
#     volumes:
#       - ./filebeat.yml:/usr/share/filebeat/filebeat.yml:ro
#       - /var/lib/docker/containers:/var/lib/docker/containers:ro
#       - /var/run/docker.sock:/var/run/docker.sock:ro
#     depends_on: [elasticsearch]
#
# volumes:
#   es-data:  # named volumes must be declared at the top level
# filebeat.yml
# filebeat.inputs:
#   - type: container
#     paths: ['/var/lib/docker/containers/*/*.log']
# processors:
#   - add_docker_metadata: ~
#   - decode_json_fields:
#       fields: ["message"]
#       target: "airbyte"
# output.elasticsearch:
#   hosts: ["elasticsearch:9200"]
#   username: "elastic"
#   password: "changeme"
#   index: "airbyte-logs-%{+yyyy.MM.dd}"
# # A custom index name also requires a matching template name and pattern
# setup.template.name: "airbyte-logs"
# setup.template.pattern: "airbyte-logs-*"
from dataclasses import dataclass

@dataclass
class ELKComponent:
    component: str
    role: str
    config: str
    resources: str

components = [
    ELKComponent("Elasticsearch",
                 "Store + Index + Search Logs",
                 "Single Node (Dev) / 3 Node Cluster (Prod)",
                 "RAM 4GB+ (Dev) 16GB+ (Prod) SSD Storage"),
    ELKComponent("Filebeat",
                 "Collect Logs from Docker/K8s Containers",
                 "DaemonSet (K8s) / Docker Container",
                 "RAM 256MB CPU 0.5 Core (Lightweight)"),
    ELKComponent("Logstash (Optional)",
                 "Parse Filter Transform Complex Logs",
                 "Pipeline Config grok json mutate",
                 "RAM 2GB+ CPU 1 Core (Heavy Processing)"),
    ELKComponent("Kibana",
                 "Dashboard Visualization Search Alert",
                 "Connect to Elasticsearch Cluster",
                 "RAM 2GB CPU 1 Core"),
]

print("=== ELK Components ===")
for c in components:
    print(f" [{c.component}] {c.role}")
    print(f" Config: {c.config}")
    print(f" Resources: {c.resources}")
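Here is a rough, stdlib-only Python sketch of what the Filebeat config above does to a single log line. It assumes Docker's default json-file log driver, where each line on disk is a JSON object with `log`, `stream` and `time` keys. The sample Airbyte line with `level`, `sync_id` and `message` keys is hypothetical, and so is `process_docker_line`, which is not a Filebeat API. Real Filebeat events also carry many more fields, such as container metadata, host and agent.

```python
import json


def process_docker_line(raw_line: str) -> dict:
    """Roughly mimic Filebeat's `container` input + `decode_json_fields` for one line.

    Illustrative only: real Filebeat events carry many more fields.
    """
    # Docker's json-file driver wraps each output line: {"log": ..., "stream": ..., "time": ...}
    wrapper = json.loads(raw_line)
    event = {
        "message": wrapper["log"].rstrip("\n"),
        "stream": wrapper["stream"],
        "@timestamp": wrapper["time"],
    }
    # decode_json_fields: if `message` holds a JSON object, decode it under the "airbyte" target
    try:
        decoded = json.loads(event["message"])
    except json.JSONDecodeError:
        return event  # plain-text lines are kept unchanged
    if isinstance(decoded, dict):
        event["airbyte"] = decoded
    return event


# Hypothetical Airbyte log line as Docker would store it on disk
inner = {"level": "ERROR", "sync_id": 42, "message": "sync failed"}
raw = json.dumps({"log": json.dumps(inner) + "\n", "stream": "stdout",
                  "time": "2024-03-01T10:00:00.000000000Z"})
event = process_docker_line(raw)
print(event["airbyte"]["level"], event["airbyte"]["sync_id"])
```

This is also why the tips recommend JSON output from Airbyte. A JSON line becomes searchable fields under `airbyte.*`, while a plain-text line stays a single `message` string.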
Log Pipeline
# === Log Processing Pipeline ===
# Elasticsearch Ingest Pipeline
# Wire it up with `pipeline: "airbyte-logs"` under output.elasticsearch in filebeat.yml
# (or the index.default_pipeline index setting); grok uses ignore_failure so
# messages that don't match the pattern are still indexed.
# PUT _ingest/pipeline/airbyte-logs
# {
#   "processors": [
#     {"json": {"field": "message", "target_field": "airbyte"}},
#     {"date": {"field": "airbyte.timestamp", "formats": ["ISO8601"]}},
#     {"set": {"field": "severity",
#              "value": "error",
#              "if": "ctx.airbyte?.level == 'ERROR'"}},
#     {"grok": {"field": "airbyte.message",
#               "patterns": ["%{WORD:action}: %{DATA:connector} \\| rows=%{NUMBER:rows:int}"],
#               "ignore_failure": true}},
#     {"remove": {"field": "message"}}
#   ]
# }
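# ILM Policy
# Note: rollover only works with a data stream or a write alias, not with the dated
# daily index names in filebeat.yml; without rollover, min_age counts from index creation.
# The cold-phase searchable_snapshot action needs an Enterprise license and a
# registered snapshot repository (here "s3-repo").
# PUT _ilm/policy/airbyte-logs-policy
# {
#   "policy": {
#     "phases": {
#       "hot": {"actions": {"rollover": {"max_size": "50gb", "max_age": "7d"}}},
#       "warm": {"min_age": "7d", "actions": {"shrink": {"number_of_shards": 1}}},
#       "cold": {"min_age": "30d", "actions": {"searchable_snapshot": {"snapshot_repository": "s3-repo"}}},
#       "delete": {"min_age": "90d", "actions": {"delete": {}}}
#     }
#   }
# }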
@dataclass
class PipelineStage:
    stage: str
    tool: str
    action: str
    output: str

pipeline = [
    PipelineStage("Collection",
                  "Filebeat DaemonSet",
                  "Collect container logs and add Docker/K8s metadata",
                  "Raw JSON logs + metadata → Elasticsearch"),
    PipelineStage("Parsing",
                  "Elasticsearch Ingest Pipeline",
                  "Parse JSON and extract fields: sync_id, connector, status, rows",
                  "Structured documents ready for search"),
    PipelineStage("Enrichment",
                  "Ingest Pipeline + Lookup",
                  "Add severity, environment, team, connector_type",
                  "Enriched documents for filtering and aggregation"),
    PipelineStage("Storage",
                  "Elasticsearch ILM",
                  "Hot 7d (SSD) → Warm 30d → Cold 90d (S3) → Delete",
                  "Cost-optimized storage by age"),
    PipelineStage("Visualization",
                  "Kibana Dashboards",
                  "Build dashboards for sync rate, error trend, duration",
                  "Real-time dashboards for the data team"),
    PipelineStage("Alerting",
                  "Kibana Rules / ElastAlert",
                  "Alert on sync failures, error spikes, duration anomalies",
                  "Slack, PagerDuty, email, Jira notifications"),
]

print("=== Log Pipeline Stages ===")
for p in pipeline:
    print(f" [{p.stage}] Tool: {p.tool}")
    print(f" Action: {p.action}")
    print(f" Output: {p.output}")
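You can emulate the ingest pipeline above in plain Python to check a sample document before sending it to Elasticsearch. This is a sketch, not Elasticsearch's implementation. The grok pattern is hand-translated to a Python regex (`WORD` → `\w+`, `DATA` → `.*?`, with `NUMBER` simplified to integers). The sample message follows the `action: connector | rows=N` shape that the pattern expects. `run_ingest_sketch` is a hypothetical name.

```python
import json
import re
from datetime import datetime

# Hand translation of %{WORD:action}: %{DATA:connector} \| rows=%{NUMBER:rows:int}
# (NUMBER simplified to an integer here)
GROK_LIKE = re.compile(r"(?P<action>\w+): (?P<connector>.*?) \| rows=(?P<rows>\d+)")


def run_ingest_sketch(doc: dict) -> dict:
    """Apply json -> date -> set -> grok -> remove to a copy of `doc`."""
    out = dict(doc)
    # json processor: parse `message` into the `airbyte` object
    out["airbyte"] = json.loads(out["message"])
    # date processor: ISO8601 `airbyte.timestamp` -> `@timestamp`
    ts = out["airbyte"]["timestamp"].replace("Z", "+00:00")
    out["@timestamp"] = datetime.fromisoformat(ts).isoformat()
    # set processor, guarded like: ctx.airbyte?.level == 'ERROR'
    if out["airbyte"].get("level") == "ERROR":
        out["severity"] = "error"
    # grok processor; a non-matching message is skipped (like ignore_failure: true)
    match = GROK_LIKE.search(out["airbyte"].get("message", ""))
    if match:
        out["action"] = match["action"]
        out["connector"] = match["connector"]
        out["rows"] = int(match["rows"])
    # remove processor
    del out["message"]
    return out


sample = {"message": json.dumps({
    "timestamp": "2024-03-01T10:00:00Z",
    "level": "ERROR",
    "message": "sync: postgres-to-bigquery | rows=1200",
})}
result = run_ingest_sketch(sample)
print(result["severity"], result["connector"], result["rows"])
```

For real documents, Elasticsearch's own simulate API (`POST _ingest/pipeline/airbyte-logs/_simulate`) is the authoritative check. The sketch only helps reason about the processor order.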
Dashboard & Alert
# === Kibana Dashboard & Alert Rules ===
@dataclass
class AlertRule:
    name: str
    condition: str
    severity: str
    channel: str

alerts = [
    AlertRule("Sync Failure Spike",
              "status='failed' count > 3 in 1 hour",
              "P1 Critical",
              "Slack #data-alerts + PagerDuty"),
    AlertRule("Error Log Spike",
              "level='ERROR' count > 100 in 10 min",
              "P1 Critical",
              "Slack + PagerDuty"),
    AlertRule("Sync Duration Anomaly",
              "duration > 2x rolling_avg(7d)",
              "P2 High",
              "Slack #data-perf"),
    AlertRule("Data Volume Drop",
              "rows_synced < 50% of previous run",
              "P2 High",
              "Slack #data-quality"),
    AlertRule("No Sync Activity (Flatline)",
              "No sync logs for 2x schedule interval",
              "P1 Critical",
              "PagerDuty (Pipeline may be stuck)"),
    AlertRule("Disk Usage High",
              "Elasticsearch disk > 85%",
              "P2 High",
              "Slack #infra + Auto ILM Force Merge"),
]

print("=== Alert Rules ===")
for a in alerts:
    print(f" [{a.name}] Severity: {a.severity}")
    print(f" Condition: {a.condition}")
    print(f" Channel: {a.channel}")
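Several of these rules use simple threshold logic, so you can prototype them before encoding them as Kibana rules or ElastAlert rules. Below is a minimal Python sketch. It assumes each sync event is a dict with hypothetical `ts` (datetime) and `status` keys, and that durations and row counts have already been extracted from the logs. All function names are illustrative and are not part of Kibana or ElastAlert.

```python
from datetime import datetime, timedelta
from statistics import mean


def sync_failure_spike(events: list[dict], now: datetime,
                       threshold: int = 3, window: timedelta = timedelta(hours=1)) -> bool:
    """Sync Failure Spike: more than `threshold` failed syncs inside `window`."""
    failed = [e for e in events if e["status"] == "failed" and now - e["ts"] <= window]
    return len(failed) > threshold


def duration_anomaly(durations_7d: list[float], latest: float) -> bool:
    """Sync Duration Anomaly: latest duration above 2x the 7-day average."""
    return bool(durations_7d) and latest > 2 * mean(durations_7d)


def data_volume_drop(previous_rows: int, rows: int) -> bool:
    """Data Volume Drop: fewer than 50% of the previous run's rows."""
    return rows < 0.5 * previous_rows


def flatline(last_sync: datetime, now: datetime, schedule_interval: timedelta) -> bool:
    """No Sync Activity: no sync log for more than 2x the schedule interval."""
    return now - last_sync > 2 * schedule_interval


now = datetime(2024, 3, 1, 12, 0)
events = [{"ts": now - timedelta(minutes=m), "status": "failed"} for m in (5, 15, 30, 45)]
print(sync_failure_spike(events, now))                      # 4 failures within 1 hour
print(duration_anomaly([10.0, 12.0, 11.0], 30.0))           # 30 vs 2 * 11
print(data_volume_drop(1000, 400))                          # 400 vs 500
print(flatline(now - timedelta(hours=5), now, timedelta(hours=2)))  # 5h vs 4h
```

Keeping each rule as a small pure function makes the thresholds easy to unit-test and tune before they are copied into the alerting tool.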
Tips
- Filebeat: Use Filebeat instead of Logstash for log collection; it is much lighter.
- ILM: Configure ILM to delete old logs automatically and save storage.
- JSON: Set Airbyte's log output to JSON so the logs are easy to parse.
- Index Pattern: Use one index per day so old data is easy to delete or archive.
- ElastAlert: Use ElastAlert for complex alert rules.
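The per-day index tip pairs with the `airbyte-logs-%{+yyyy.MM.dd}` index name from the Filebeat config. Because the date is part of the name, finding indices that fall outside a retention window is just string parsing. This sketch is for setups without ILM, or for previewing what a retention rule would remove. `daily_index` and `indices_older_than` are hypothetical helpers. In practice the list of index names would come from the cluster, for example via the `_cat/indices` API.

```python
from datetime import date, datetime, timedelta

PREFIX = "airbyte-logs"


def daily_index(day: date, prefix: str = PREFIX) -> str:
    """Build the index name that "airbyte-logs-%{+yyyy.MM.dd}" yields for `day`."""
    return f"{prefix}-{day:%Y.%m.%d}"


def indices_older_than(names: list[str], today: date, keep_days: int,
                       prefix: str = PREFIX) -> list[str]:
    """Return daily indices dated more than `keep_days` before `today`."""
    cutoff = today - timedelta(days=keep_days)
    old = []
    for name in names:
        if not name.startswith(prefix + "-"):
            continue  # ignore unrelated indices
        try:
            day = datetime.strptime(name[len(prefix) + 1:], "%Y.%m.%d").date()
        except ValueError:
            continue  # not a dated daily index
        if day < cutoff:
            old.append(name)
    return old


today = date(2024, 3, 5)
names = [daily_index(today - timedelta(days=n)) for n in (0, 10, 31, 95)] + ["kibana-sample"]
print(indices_older_than(names, today, keep_days=30))
```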
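What is Airbyte Log Management?
Airbyte log management means collecting the Sync, Connector, Platform, and Audit logs from every Airbyte container and pod into one central system, such as an ELK stack (Elasticsearch, Kibana, Filebeat). From there, teams can run full-text search, build dashboards, and alert on problems.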