Airbyte ELK Log Management
Centralized log management for Airbyte ETL pipelines with the ELK stack (Elasticsearch, Logstash, Kibana) and Filebeat: collecting sync and connector logs, building Kibana dashboards, and alerting in production.
| Log Type | Source | Volume | Retention |
|---|---|---|---|
| Sync Logs | Airbyte Worker Pods | High (every sync run) | Hot 7d, Warm 30d, Cold 90d |
| Connector Logs | Source/Destination Containers | High (API calls) | Hot 7d, Warm 30d |
| Platform Logs | Scheduler, Server, DB | Medium | Hot 7d, Warm 30d |
| Audit Logs | API Server | Low | Hot 30d, Warm 90d, Cold 365d |
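The retention tiers in the table can be sketched as a simple age-to-tier lookup. This is illustrative only — in a real deployment the transitions are performed by Elasticsearch ILM, not application code, and the function name below is made up:

```python
# Illustrative sketch: map a log's age to the storage tier from the table above.
RETENTION_DAYS = {
    # log type: (hot, warm, cold); None = tier not used for this log type
    "sync": (7, 30, 90),
    "connector": (7, 30, None),
    "platform": (7, 30, None),
    "audit": (30, 90, 365),
}

def tier_for(log_type: str, age_days: int) -> str:
    hot, warm, cold = RETENTION_DAYS[log_type]
    if age_days <= hot:
        return "hot"
    if age_days <= warm:
        return "warm"
    if cold is not None and age_days <= cold:
        return "cold"
    return "delete"

print(tier_for("sync", 3))        # hot
print(tier_for("sync", 45))       # cold
print(tier_for("connector", 45))  # delete (connector logs have no cold tier)
```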
ELK Setup
# === ELK Stack for Airbyte ===
# Docker Compose (docker-compose-elk.yml)
# version: '3.8'
# services:
# elasticsearch:
# image: elasticsearch:8.12.0
# environment:
# - discovery.type=single-node
# - xpack.security.enabled=true
# - ELASTIC_PASSWORD=changeme
# ports: ["9200:9200"]
# volumes: ["es-data:/usr/share/elasticsearch/data"]
#
# kibana:
# image: kibana:8.12.0
# environment:
# - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
# - ELASTICSEARCH_USERNAME=kibana_system
# - ELASTICSEARCH_PASSWORD=changeme
# ports: ["5601:5601"]
#
# filebeat:
# image: elastic/filebeat:8.12.0
# volumes:
# - ./filebeat.yml:/usr/share/filebeat/filebeat.yml
# - /var/lib/docker/containers:/var/lib/docker/containers:ro
# - /var/run/docker.sock:/var/run/docker.sock:ro
# depends_on: [elasticsearch]
# filebeat.yml
# filebeat.inputs:
# - type: container
# paths: ['/var/lib/docker/containers/*/*.log']
# processors:
# - add_docker_metadata: ~
# - decode_json_fields:
# fields: ["message"]
# target: "airbyte"
# output.elasticsearch:
# hosts: ["elasticsearch:9200"]
# username: "elastic"
# password: "changeme"
# index: "airbyte-logs-%{+yyyy.MM.dd}"
# setup.template.name: "airbyte-logs"       # required when overriding index
# setup.template.pattern: "airbyte-logs-*"
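The `%{+yyyy.MM.dd}` token in the `index` setting resolves to the event's date, so Filebeat writes one index per day. A minimal sketch of the resulting index name (the date is just an example):

```python
from datetime import date

def daily_index(day: date, prefix: str = "airbyte-logs") -> str:
    """Mirror Filebeat's %{+yyyy.MM.dd} suffix: one index per day."""
    return f"{prefix}-{day:%Y.%m.%d}"

print(daily_index(date(2024, 1, 15)))  # airbyte-logs-2024.01.15
```

Daily indices make dropping or archiving a whole day a cheap index-level operation, which is exactly what an ILM rollover/delete policy relies on.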
from dataclasses import dataclass
@dataclass
class ELKComponent:
    component: str
    role: str
    config: str
    resources: str
components = [
ELKComponent("Elasticsearch",
"Store + Index + Search Logs",
"Single Node (Dev) / 3 Node Cluster (Prod)",
"RAM 4GB+ (Dev) 16GB+ (Prod) SSD Storage"),
ELKComponent("Filebeat",
"Collect Logs from Docker/K8s Containers",
"DaemonSet (K8s) / Docker Container",
"RAM 256MB CPU 0.5 Core (Lightweight)"),
ELKComponent("Logstash (Optional)",
"Parse Filter Transform Complex Logs",
"Pipeline Config grok json mutate",
"RAM 2GB+ CPU 1 Core (Heavy Processing)"),
ELKComponent("Kibana",
"Dashboard Visualization Search Alert",
"Connect to Elasticsearch Cluster",
"RAM 2GB CPU 1 Core"),
]
print("=== ELK Components ===")
for c in components:
    print(f" [{c.component}] {c.role}")
    print(f" Config: {c.config}")
    print(f" Resources: {c.resources}")
Log Pipeline
# === Log Processing Pipeline ===
# Elasticsearch Ingest Pipeline
# PUT _ingest/pipeline/airbyte-logs
# {
# "processors": [
# {"json": {"field": "message", "target_field": "airbyte"}},
# {"date": {"field": "airbyte.timestamp", "formats": ["ISO8601"]}},
# {"set": {"field": "severity",
# "value": "error",
# "if": "ctx.airbyte?.level == 'ERROR'"}},
# {"grok": {"field": "airbyte.message",
# "patterns": ["%{WORD:action}: %{DATA:connector} \\| rows=%{NUMBER:rows:int}"]}},
# {"remove": {"field": "message"}}
# ]
# }
# ILM Policy
# PUT _ilm/policy/airbyte-logs-policy
# {
# "policy": {
# "phases": {
# "hot": {"actions": {"rollover": {"max_size": "50gb", "max_age": "7d"}}},
# "warm": {"min_age": "7d", "actions": {"shrink": {"number_of_shards": 1}}},
# "cold": {"min_age": "30d", "actions": {"searchable_snapshot": {"snapshot_repository": "s3-repo"}}},
# "delete": {"min_age": "90d", "actions": {"delete": {}}}
# }
# }
# }
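The grok and set processors above can be simulated locally with Python's `re` module to sanity-check the pattern before installing the pipeline. A sketch, with a hypothetical sample log line:

```python
import re

# Python-regex equivalent of the grok pattern in the ingest pipeline above:
# %{WORD:action}: %{DATA:connector} \| rows=%{NUMBER:rows:int}
GROK = re.compile(r"(?P<action>\w+): (?P<connector>.+?) \| rows=(?P<rows>\d+)")

def process(doc: dict) -> dict:
    """Mimic the 'grok' and 'set' processors of the airbyte-logs pipeline."""
    out = dict(doc)
    m = GROK.search(doc.get("message", ""))
    if m:
        out.update(m.groupdict())
        out["rows"] = int(out["rows"])  # grok's %{NUMBER:rows:int} conversion
    if doc.get("level") == "ERROR":     # the 'set' processor's condition
        out["severity"] = "error"
    return out

doc = process({"message": "SYNC: postgres-to-bq | rows=1500", "level": "ERROR"})
print(doc["action"], doc["rows"], doc["severity"])  # SYNC 1500 error
```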
@dataclass
class PipelineStage:
    stage: str
    tool: str
    action: str
    output: str
pipeline = [
PipelineStage("Collection",
"Filebeat DaemonSet",
"Collect container logs and add Docker/K8s metadata",
"Raw JSON Logs + Metadata → Elasticsearch"),
PipelineStage("Parsing",
"Elasticsearch Ingest Pipeline",
"Parse JSON and extract fields: sync_id, connector, status, rows",
"Structured documents ready for search"),
PipelineStage("Enrichment",
"Ingest Pipeline + Lookup",
"Add severity, environment, team, connector_type",
"Enriched documents for filtering and aggregation"),
PipelineStage("Storage",
"Elasticsearch ILM",
"Hot 7d (SSD) → Warm 30d → Cold 90d (S3) → Delete",
"Cost-optimized storage by age"),
PipelineStage("Visualization",
"Kibana Dashboards",
"Build dashboards: sync rate, error trend, duration",
"Real-time dashboards for the data team"),
PipelineStage("Alerting",
"Kibana Rules / ElastAlert",
"Alert Sync Failure Error Spike Duration Anomaly",
"Slack PagerDuty Email Jira Notification"),
]
print("=== Log Pipeline Stages ===")
for p in pipeline:
    print(f" [{p.stage}] Tool: {p.tool}")
    print(f" Action: {p.action}")
    print(f" Output: {p.output}")
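The Enrichment stage's lookup can be sketched as a small mapping from connector name to team/environment metadata. The connector names and metadata below are made up for illustration; in production this could be an Elasticsearch enrich policy instead of a static dictionary:

```python
# Hypothetical lookup table for the Enrichment stage.
CONNECTOR_META = {
    "postgres-to-bq": {"team": "analytics", "environment": "prod",
                       "connector_type": "database"},
    "stripe-to-s3":   {"team": "finance", "environment": "prod",
                       "connector_type": "api"},
}

def enrich(doc: dict) -> dict:
    """Attach team/environment/connector_type so dashboards can filter on them."""
    meta = CONNECTOR_META.get(doc.get("connector"), {})
    return {**doc, **meta}

print(enrich({"connector": "stripe-to-s3", "rows": 42})["team"])  # finance
```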
Dashboard & Alert
# === Kibana Dashboard & Alert Rules ===
@dataclass
class AlertRule:
    name: str
    condition: str
    severity: str
    channel: str
alerts = [
AlertRule("Sync Failure Spike",
"status='failed' count > 3 in 1 hour",
"P1 Critical",
"Slack #data-alerts + PagerDuty"),
AlertRule("Error Log Spike",
"level='ERROR' count > 100 in 10 min",
"P1 Critical",
"Slack + PagerDuty"),
AlertRule("Sync Duration Anomaly",
"duration > 2x rolling_avg(7d)",
"P2 High",
"Slack #data-perf"),
AlertRule("Data Volume Drop",
"rows_synced < 50% of previous run",
"P2 High",
"Slack #data-quality"),
AlertRule("No Sync Activity (Flatline)",
"No sync logs for 2x schedule interval",
"P1 Critical",
"PagerDuty (Pipeline may be stuck)"),
AlertRule("Disk Usage High",
"Elasticsearch disk > 85%",
"P2 High",
"Slack #infra + Auto ILM Force Merge"),
]
print("=== Alert Rules ===")
for a in alerts:
    print(f" [{a.name}] Severity: {a.severity}")
    print(f" Condition: {a.condition}")
    print(f" Channel: {a.channel}")
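Two of the conditions above — the failure spike and the duration anomaly — can be sketched as plain functions over recent sync records. Thresholds are copied from the table; the function and parameter names are assumptions for illustration:

```python
def failure_spike(recent_statuses: list, threshold: int = 3) -> bool:
    """True when failed syncs in the window exceed the threshold (P1 rule)."""
    return recent_statuses.count("failed") > threshold

def duration_anomaly(duration_s: float, history_s: list) -> bool:
    """True when a run takes more than 2x the rolling average (P2 rule)."""
    if not history_s:
        return False
    return duration_s > 2 * (sum(history_s) / len(history_s))

print(failure_spike(["failed"] * 4))           # True
print(duration_anomaly(900, [300, 400, 350]))  # True (avg 350s, 2x = 700s)
```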
Tips
- Filebeat: use Filebeat instead of Logstash for collection (much lighter)
- ILM: set up ILM to delete old logs automatically and save storage
- JSON: configure Airbyte to emit JSON logs, which are easier to parse
- Index Pattern: use one index per day, so deleting/archiving is easy
- ElastAlert: use ElastAlert for complex alert rules
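Following the ElastAlert tip, the "Error Log Spike" condition might look roughly like the rule below. Index name, thresholds, and channel settings are assumptions, and the webhook URL is a placeholder; check the ElastAlert documentation for your version's exact fields:

```yaml
# Hypothetical ElastAlert rule for the "Error Log Spike" condition.
name: airbyte-error-log-spike
type: frequency            # fire when num_events occur within timeframe
index: airbyte-logs-*
num_events: 100
timeframe:
  minutes: 10
filter:
  - term:
      airbyte.level: "ERROR"
alert:
  - slack
slack_webhook_url: "https://hooks.slack.com/services/CHANGEME"  # placeholder
```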
What is Airbyte Log Management?
Centralizing Airbyte's sync, connector, platform, and audit logs from containers and pods into the ELK stack (Elasticsearch + Kibana, shipped by Filebeat), giving the team full-text search, dashboards, and alerts in one place.
How do you set up the ELK Stack?
Elasticsearch stores and indexes the logs, Filebeat runs as a DaemonSet (or Docker container) to collect them, Logstash optionally parses complex formats, and Kibana provides dashboards. Start with Docker Compose for development; for production, run a 3-node cluster on Kubernetes with security and TLS enabled.
How do you design the log pipeline?
Collection (Filebeat) → Parsing (Elasticsearch ingest pipeline) → Enrichment → Storage with ILM (Hot → Warm → Cold → Delete) → Visualization (Kibana) → Alerting (Kibana Rules or ElastAlert).
How do you set up alerts?
Use Kibana Rules or ElastAlert to detect sync failures, error spikes, duration anomalies, data volume drops, and flatlines, then route notifications to Slack, PagerDuty, email, or Jira according to severity.
Summary
Airbyte log management on the ELK stack: Filebeat ships container logs to Elasticsearch, ingest pipelines structure and enrich them, ILM tiers the storage (Hot/Warm/Cold), and Kibana plus ElastAlert provide production dashboards and alerting.
