Better Uptime Tech Conference 2026 มีอะไรน่าสนใจ
Better Uptime Tech Conference 2026 เป็นงานประชุมที่รวบรวมผู้เชี่ยวชาญด้าน Site Reliability Engineering, Incident Management และ Observability จากทั่วโลก หัวข้อหลักของปีนี้เน้นเรื่อง AI-Powered Incident Management, Platform Engineering และ Reliability at Scale
เทรนด์สำคัญที่ถูกพูดถึงมากที่สุดคือการใช้ AI/ML สำหรับ anomaly detection และ auto-remediation ที่ไม่ใช่แค่แจ้งเตือนแต่สามารถแก้ไขปัญหาเบื้องต้นได้อัตโนมัติ ลดเวลาที่ on-call engineer ต้องใช้ในการตอบสนองต่อ incident
อีกเทรนด์หนึ่งคือ OpenTelemetry ที่กลายเป็นมาตรฐานสำหรับ observability data collection ทำให้สามารถรวม traces, metrics และ logs จากทุก service เข้าด้วยกันในรูปแบบเดียวกัน ช่วยให้วิเคราะห์ root cause ได้เร็วขึ้น
Platform Engineering ก็เป็นเทรนด์ที่เติบโตมากในปี 2026 แทนที่จะให้ทุกทีมจัดการ infrastructure เอง Platform Team จะสร้าง Internal Developer Platform (IDP) ที่มี monitoring, alerting และ incident management ในตัว ทำให้ทุกทีมได้ระบบที่เชื่อถือได้โดยไม่ต้องเป็นผู้เชี่ยวชาญ
เทรนด์ Incident Management และ Observability ปี 2026
แนวทาง Incident Management ที่เปลี่ยนไปในปี 2026
# เทรนด์ Incident Management 2026
#
# 1. AIOps — ใช้ AI วิเคราะห์ข้อมูลจำนวนมากหา root cause
# - Anomaly Detection บน metrics/logs
# - Correlation Analysis ข้าม services
# - Auto-generated Incident Summary
# - Suggested Remediation Steps
#
# 2. OpenTelemetry เป็นมาตรฐาน
# - Unified Traces + Metrics + Logs
# - Vendor-agnostic data collection
# - Auto-instrumentation สำหรับทุกภาษา
#
# 3. GitOps สำหรับ Incident Response
# - Runbooks เก็บใน Git
# - Automated Rollback ผ่าน GitOps
# - Post-Incident Review เป็น PR
#
# 4. Platform Engineering
# - Internal Developer Platform (IDP)
# - Self-service monitoring/alerting
# - Golden Paths สำหรับ reliability
#
# 5. SLO-based Alerting
# - Alert เมื่อ error budget ใกล้หมด
# - ไม่ alert ทุก error แต่ alert เมื่อกระทบ user
# ตั้งค่า OpenTelemetry Collector สำหรับ unified observability
# otel-collector-config.yaml
receivers:
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:4317
http:
endpoint: 0.0.0.0:4318
prometheus:
config:
scrape_configs:
- job_name: 'kubernetes-pods'
kubernetes_sd_configs:
- role: pod
processors:
batch:
timeout: 5s
send_batch_size: 1000
memory_limiter:
check_interval: 1s
limit_mib: 2048
spike_limit_mib: 512
attributes:
actions:
- key: environment
value: production
action: upsert
exporters:
otlp/betteruptime:
endpoint: "https://otlp.betteruptime.com"
headers:
Authorization: "Bearer "
prometheus:
endpoint: "0.0.0.0:8889"
service:
pipelines:
traces:
receivers: [otlp]
processors: [memory_limiter, batch, attributes]
exporters: [otlp/betteruptime]
metrics:
receivers: [otlp, prometheus]
processors: [memory_limiter, batch]
exporters: [prometheus, otlp/betteruptime]
logs:
receivers: [otlp]
processors: [memory_limiter, batch]
exporters: [otlp/betteruptime]
ตั้งค่า Modern Incident Response Pipeline
สร้าง Incident Response Pipeline ที่ใช้ Better Uptime เป็น core พร้อม integration กับเครื่องมือต่างๆ
# incident-response-pipeline.yaml — Kubernetes CronJob สำหรับ Health Check
apiVersion: batch/v1
kind: CronJob
metadata:
name: synthetic-health-check
namespace: monitoring
spec:
schedule: "*/2 * * * *"
jobTemplate:
spec:
template:
spec:
containers:
- name: health-checker
image: curlimages/curl:latest
command:
- /bin/sh
- -c
- |
# ตรวจสอบ critical endpoints
ENDPOINTS="
https://api.example.com/health
https://api.example.com/v2/status
https://payment.example.com/health
https://auth.example.com/.well-known/openid-configuration
"
FAILED=0
for ep in $ENDPOINTS; do
STATUS=$(curl -s -o /dev/null -w "%{http_code}" --max-time 10 "$ep")
LATENCY=$(curl -s -o /dev/null -w "%{time_total}" --max-time 10 "$ep")
if [ "$STATUS" != "200" ]; then
echo "FAIL: $ep (status=$STATUS)"
FAILED=$((FAILED + 1))
else
echo "OK: $ep (status=$STATUS, latency=s)"
fi
done
# ส่ง heartbeat ถ้าทุกอย่างปกติ
if [ "$FAILED" -eq 0 ]; then
curl -s "https://betteruptime.com/api/v1/heartbeat/$HEARTBEAT_TOKEN"
fi
env:
- name: HEARTBEAT_TOKEN
valueFrom:
secretKeyRef:
name: betteruptime-secrets
key: heartbeat-token
restartPolicy: OnFailure
---
# Incident Webhook Handler
apiVersion: apps/v1
kind: Deployment
metadata:
name: incident-handler
namespace: monitoring
spec:
replicas: 2
selector:
matchLabels:
app: incident-handler
template:
metadata:
labels:
app: incident-handler
spec:
containers:
- name: handler
image: myregistry/incident-handler:v1.2.0
ports:
- containerPort: 8080
env:
- name: SLACK_WEBHOOK
valueFrom:
secretKeyRef:
name: incident-secrets
key: slack-webhook
- name: JIRA_TOKEN
valueFrom:
secretKeyRef:
name: incident-secrets
key: jira-token
- name: ARGOCD_TOKEN
valueFrom:
secretKeyRef:
name: incident-secrets
key: argocd-token
สร้าง Automated Runbook ด้วย Python และ Webhook
สร้าง automated runbook ที่ทำงานอัตโนมัติเมื่อได้รับ incident alert
#!/usr/bin/env python3
# automated_runbook.py — Incident Auto-Remediation Engine
import os
import json
import logging
import subprocess
from flask import Flask, request, jsonify
from datetime import datetime
import requests
app = Flask(__name__)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("runbook")
SLACK_WEBHOOK = os.getenv("SLACK_WEBHOOK")
K8S_NAMESPACE = os.getenv("K8S_NAMESPACE", "production")
RUNBOOKS = {
"high_memory": {
"description": "Pod ใช้ memory สูงเกินไป",
"actions": ["restart_pod", "scale_up"],
"auto_remediate": True,
},
"high_error_rate": {
"description": "Error rate สูงกว่าปกติ",
"actions": ["check_logs", "rollback_deployment"],
"auto_remediate": False,
},
"certificate_expiring": {
"description": "SSL Certificate ใกล้หมดอายุ",
"actions": ["renew_certificate"],
"auto_remediate": True,
},
"database_connection_pool": {
"description": "Database connection pool เต็ม",
"actions": ["restart_app", "increase_pool_size"],
"auto_remediate": True,
},
}
def kubectl(cmd):
result = subprocess.run(
f"kubectl {cmd} -n {K8S_NAMESPACE}".split(),
capture_output=True, text=True, timeout=30
)
return result.returncode == 0, result.stdout
def notify_slack(message, severity="warning"):
emoji = {"critical": "🔴", "warning": "🟡", "info": "🟢"}.get(severity, "⚪")
if SLACK_WEBHOOK:
requests.post(SLACK_WEBHOOK, json={
"text": f"{emoji} *Runbook Automation*: {message}"
})
def restart_pod(service_name):
logger.info(f"Restarting pods for {service_name}")
ok, output = kubectl(f"rollout restart deployment/{service_name}")
if ok:
notify_slack(f"Restarted {service_name}", "info")
return ok
def scale_up(service_name, replicas=None):
ok, output = kubectl(f"get deployment {service_name} -o jsonpath={{.spec.replicas}}")
current = int(output.strip()) if ok else 2
new_replicas = replicas or min(current + 2, 10)
logger.info(f"Scaling {service_name} from {current} to {new_replicas}")
ok, _ = kubectl(f"scale deployment/{service_name} --replicas={new_replicas}")
if ok:
notify_slack(f"Scaled {service_name} to {new_replicas} replicas", "info")
return ok
def rollback_deployment(service_name):
logger.info(f"Rolling back {service_name}")
ok, _ = kubectl(f"rollout undo deployment/{service_name}")
if ok:
notify_slack(f"Rolled back {service_name}", "warning")
return ok
def check_logs(service_name):
ok, logs = kubectl(f"logs deployment/{service_name} --tail=50 --since=5m")
error_lines = [l for l in logs.split("\n") if "error" in l.lower() or "exception" in l.lower()]
return error_lines[:10]
@app.route("/webhook/incident", methods=["POST"])
def handle_incident():
data = request.json
attrs = data.get("data", {}).get("attributes", {})
incident_name = attrs.get("name", "unknown")
cause = attrs.get("cause", "")
status = attrs.get("status", "")
logger.info(f"Incident: {incident_name} (status={status}, cause={cause})")
if status != "started":
return jsonify({"status": "ignored", "reason": f"status={status}"})
# Match runbook
matched_runbook = None
for key, rb in RUNBOOKS.items():
if key.replace("_", " ") in incident_name.lower() or key.replace("_", " ") in cause.lower():
matched_runbook = (key, rb)
break
if matched_runbook:
rb_name, rb_config = matched_runbook
notify_slack(
f"Incident: {incident_name}\nRunbook: {rb_name}\nAuto-remediate: {rb_config['auto_remediate']}",
"warning"
)
if rb_config["auto_remediate"]:
service = incident_name.split()[0].lower()
for action in rb_config["actions"]:
if action == "restart_pod":
restart_pod(service)
elif action == "scale_up":
scale_up(service)
return jsonify({"status": "processed", "runbook": matched_runbook[0] if matched_runbook else None})
if __name__ == "__main__":
app.run(host="0.0.0.0", port=8080)
ใช้ AI สำหรับ Anomaly Detection และ Auto-Remediation
สร้างระบบ anomaly detection อย่างง่ายด้วย statistical methods
#!/usr/bin/env python3
# anomaly_detector.py — ตรวจจับ anomaly จาก metrics
import numpy as np
from collections import deque
from datetime import datetime
import requests
import os
import time
PROMETHEUS_URL = os.getenv("PROMETHEUS_URL", "http://prometheus:9090")
BU_API = "https://betteruptime.com/api/v2"
BU_TOKEN = os.getenv("BU_TOKEN")
class AnomalyDetector:
def __init__(self, window_size=60, threshold=3.0):
self.window_size = window_size
self.threshold = threshold # จำนวน standard deviations
self.history = {}
def add_metric(self, metric_name, value):
if metric_name not in self.history:
self.history[metric_name] = deque(maxlen=self.window_size)
self.history[metric_name].append(value)
def is_anomaly(self, metric_name, value):
if metric_name not in self.history:
return False
data = list(self.history[metric_name])
if len(data) < 10:
return False
mean = np.mean(data)
std = np.std(data)
if std == 0:
return False
z_score = abs(value - mean) / std
return z_score > self.threshold
def get_stats(self, metric_name):
if metric_name not in self.history:
return None
data = list(self.history[metric_name])
return {
"mean": np.mean(data),
"std": np.std(data),
"min": np.min(data),
"max": np.max(data),
"count": len(data),
}
def query_prometheus(query):
r = requests.get(f"{PROMETHEUS_URL}/api/v1/query", params={"query": query})
if r.status_code == 200:
results = r.json().get("data", {}).get("result", [])
return [(r["metric"], float(r["value"][1])) for r in results]
return []
def create_incident(title, description):
requests.post(
f"{BU_API}/incidents",
headers={"Authorization": f"Bearer {BU_TOKEN}", "Content-Type": "application/json"},
json={"summary": title, "description": description}
)
METRICS_TO_WATCH = [
("http_requests_total_rate", 'rate(http_requests_total[5m])'),
("http_error_rate", 'rate(http_requests_total{status=~"5.."}[5m])'),
("response_latency_p99", 'histogram_quantile(0.99, rate(http_request_duration_seconds_bucket[5m]))'),
("cpu_usage", '100 - (avg by(instance)(rate(node_cpu_seconds_total{mode="idle"}[5m])) * 100)'),
("memory_usage_pct", '(1 - node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes) * 100'),
]
def main():
detector = AnomalyDetector(window_size=120, threshold=3.5)
print("Anomaly Detector started")
while True:
for name, query in METRICS_TO_WATCH:
results = query_prometheus(query)
for metric_labels, value in results:
key = f"{name}_{metric_labels.get('instance', 'default')}"
is_anomaly = detector.is_anomaly(key, value)
detector.add_metric(key, value)
if is_anomaly:
stats = detector.get_stats(key)
msg = (
f"ANOMALY: {key} = {value:.2f} "
f"(mean={stats['mean']:.2f}, std={stats['std']:.2f})"
)
print(f"[{datetime.now()}] {msg}")
create_incident(f"Anomaly Detected: {name}", msg)
time.sleep(30)
if __name__ == "__main__":
main()
วัดผล Incident Management ด้วย SLI SLO และ Error Budget
ตั้งค่า SLO (Service Level Objectives) และ Error Budget สำหรับ production services
#!/usr/bin/env python3
# slo_calculator.py — คำนวณ SLO และ Error Budget
import requests
import json
from datetime import datetime, timedelta
PROMETHEUS_URL = "http://prometheus:9090"
SLOS = {
"api-availability": {
"description": "API ต้อง available 99.9% ในรอบ 30 วัน",
"target": 0.999,
"window_days": 30,
"sli_query": '1 - (sum(rate(http_requests_total{status=~"5.."}[5m])) / sum(rate(http_requests_total[5m])))',
},
"api-latency-p99": {
"description": "99th percentile latency ต้องต่ำกว่า 500ms",
"target": 0.99,
"window_days": 30,
"sli_query": 'sum(rate(http_request_duration_seconds_bucket{le="0.5"}[5m])) / sum(rate(http_request_duration_seconds_count[5m]))',
},
"checkout-success-rate": {
"description": "Checkout ต้องสำเร็จ 99.5%",
"target": 0.995,
"window_days": 7,
"sli_query": 'sum(rate(checkout_completed_total[5m])) / sum(rate(checkout_attempted_total[5m]))',
},
}
def query_prometheus_range(query, start, end, step="5m"):
r = requests.get(f"{PROMETHEUS_URL}/api/v1/query_range", params={
"query": query, "start": start.timestamp(),
"end": end.timestamp(), "step": step
})
if r.status_code == 200:
results = r.json().get("data", {}).get("result", [])
if results:
values = [float(v[1]) for v in results[0]["values"] if v[1] != "NaN"]
return values
return []
def calculate_error_budget(slo_config):
end = datetime.now()
start = end - timedelta(days=slo_config["window_days"])
values = query_prometheus_range(slo_config["sli_query"], start, end)
if not values:
return None
current_sli = sum(values) / len(values)
target = slo_config["target"]
total_budget = 1 - target # เช่น 0.001 สำหรับ 99.9%
consumed = max(0, (1 - current_sli))
remaining = max(0, total_budget - consumed)
remaining_pct = (remaining / total_budget * 100) if total_budget > 0 else 100
# แปลง error budget เป็นนาที
window_minutes = slo_config["window_days"] * 24 * 60
total_budget_minutes = window_minutes * total_budget
remaining_minutes = window_minutes * remaining
return {
"sli_current": current_sli,
"slo_target": target,
"error_budget_total_minutes": round(total_budget_minutes, 1),
"error_budget_remaining_minutes": round(remaining_minutes, 1),
"error_budget_remaining_pct": round(remaining_pct, 1),
"burn_rate": round(consumed / total_budget * 100, 1) if total_budget > 0 else 0,
"status": "OK" if remaining_pct > 20 else ("WARNING" if remaining_pct > 0 else "EXCEEDED"),
}
def generate_slo_report():
print("=" * 60)
print(f"SLO Report — {datetime.now().strftime('%Y-%m-%d %H:%M')}")
print("=" * 60)
for name, config in SLOS.items():
result = calculate_error_budget(config)
if result:
status_icon = {"OK": "OK", "WARNING": "WARN", "EXCEEDED": "FAIL"}[result["status"]]
print(f"\n[{status_icon}] {name}")
print(f" Description: {config['description']}")
print(f" Current SLI: {result['sli_current']:.4f} (target: {result['slo_target']})")
print(f" Error Budget: {result['error_budget_remaining_minutes']}m / {result['error_budget_total_minutes']}m remaining")
print(f" Budget Used: {result['burn_rate']}%")
print(f" Budget Left: {result['error_budget_remaining_pct']}%")
if __name__ == "__main__":
generate_slo_report()
FAQ คำถามที่พบบ่อย
Q: SLO-based Alerting ต่างจาก Threshold-based Alerting อย่างไร?
A: Threshold-based alerting แจ้งเตือนเมื่อ metric ข้าม threshold เช่น CPU มากกว่า 80% ซึ่งอาจเป็น false alarm ถ้า CPU สูงแค่ชั่วคราว ส่วน SLO-based alerting แจ้งเตือนเมื่อ error budget ถูกใช้เร็วเกินไป โดยดูจาก burn rate เช่น ถ้า error budget 30 วันถูกใช้ไป 50% ภายใน 1 วัน ถือว่าวิกฤต ทำให้ alert มีความหมายมากกว่า
Q: OpenTelemetry กับ Prometheus ใช้ร่วมกันได้ไหม?
A: ได้ OpenTelemetry Collector สามารถ scrape Prometheus metrics ได้เหมือน Prometheus Server และยัง export metrics ไปยัง Prometheus ผ่าน remote write ได้ ทำให้สามารถค่อยๆ migrate จาก Prometheus ไปยัง OpenTelemetry ได้โดยไม่ต้องเปลี่ยนทุกอย่างพร้อมกัน
Q: Auto-Remediation ปลอดภัยหรือไม่?
A: ปลอดภัยถ้าออกแบบดี ควรจำกัด scope ของ auto-remediation เช่นอนุญาตแค่ restart pod หรือ scale up แต่ไม่อนุญาตให้ rollback deployment โดยอัตโนมัติ ต้องมี rate limiting ไม่ให้ remediation ทำงานซ้ำเร็วเกินไป และต้องมี circuit breaker ที่หยุด auto-remediation ถ้าไม่สำเร็จหลายครั้ง
Q: Error Budget Policy ควรกำหนดอย่างไร?
A: กำหนดชัดเจนว่าเมื่อ error budget เหลือน้อยจะทำอะไร เช่น เหลือ 50%: หยุด feature release ที่มีความเสี่ยง, เหลือ 25%: freeze deployment ทั้งหมด เน้น reliability fixes, เหลือ 0%: ทุก engineer ต้องแก้ reliability issues จนกว่า budget จะกลับมา สิ่งสำคัญคือ leadership ต้อง enforce policy นี้จริง
Q: MTTD MTTR MTBF คืออะไร?
A: MTTD (Mean Time to Detect) คือเวลาเฉลี่ยตั้งแต่เกิดปัญหาจนตรวจพบ MTTR (Mean Time to Resolve) คือเวลาเฉลี่ยตั้งแต่ตรวจพบจนแก้ไขเสร็จ MTBF (Mean Time Between Failures) คือเวลาเฉลี่ยระหว่าง incident สอง incident การปรับปรุง monitoring ช่วยลด MTTD ส่วน runbook และ auto-remediation ช่วยลด MTTR
