
BetterUptime กับ Chaos Engineering — วิธีใช้
BetterUptime และ Chaos Engineering

BetterUptime เป็นแพลตฟอร์ม Uptime Monitoring ที่ตรวจสอบเว็บไซต์และ API จากหลาย Locations ทั่วโลก แจ้งเตือนทันทีเมื่อเกิด Downtime เมื่อใช้ร่วมกับ Chaos Engineering จะได้ระบบ Monitoring ที่ตรวจจับปัญหาได้จริงระหว่าง Chaos Experiments
Chaos Engineering คือการทดสอบความทนทานของระบบโดยจงใจสร้างปัญหา ช่วยค้นหาจุดอ่อนก่อนเกิดปัญหาจริง BetterUptime ทำหน้าที่เป็น "ตาวิเศษ" ที่คอยตรวจสอบว่าระบบยังทำงานปกติหรือไม่ระหว่าง Experiment
Setup BetterUptime Monitoring
# === BetterUptime Setup ===
# สมัครที่ https://betteruptime.com
# 1. API สำหรับ Automation
# BetterUptime API: https://betteruptime.com/api/v2
# === Python Client สำหรับ BetterUptime API ===
import requests
import json
from datetime import datetime, timedelta
class BetterUptimeClient:
"""BetterUptime API Client"""
BASE_URL = "https://betteruptime.com/api/v2"
def __init__(self, api_token):
self.session = requests.Session()
self.session.headers = {
"Authorization": f"Bearer {api_token}",
"Content-Type": "application/json",
}
# === Monitors ===
def create_monitor(self, url, monitor_type="status",
check_frequency=30):
"""สร้าง Monitor ใหม่"""
payload = {
"url": url,
"monitor_type": monitor_type,
"check_frequency": check_frequency,
"request_timeout": 15,
"confirmation_period": 3,
"regions": ["us", "eu", "as", "au"],
"follow_redirects": True,
"remember_cookies": False,
}
resp = self.session.post(f"{self.BASE_URL}/monitors",
json=payload)
return resp.json()
def list_monitors(self):
"""ดู Monitors ทั้งหมด"""
resp = self.session.get(f"{self.BASE_URL}/monitors")
return resp.json()
def get_monitor(self, monitor_id):
"""ดู Monitor เฉพาะ"""
resp = self.session.get(f"{self.BASE_URL}/monitors/{monitor_id}")
return resp.json()
def pause_monitor(self, monitor_id):
"""หยุด Monitor ชั่วคราว (ระหว่าง Chaos Experiment)"""
resp = self.session.patch(
f"{self.BASE_URL}/monitors/{monitor_id}",
json={"paused": True},
)
return resp.json()
def resume_monitor(self, monitor_id):
"""เปิด Monitor กลับ"""
resp = self.session.patch(
f"{self.BASE_URL}/monitors/{monitor_id}",
json={"paused": False},
)
return resp.json()
# === Incidents ===
def list_incidents(self, resolved=None):
"""ดู Incidents"""
params = {}
if resolved is not None:
params["resolved"] = str(resolved).lower()
resp = self.session.get(f"{self.BASE_URL}/incidents",
params=params)
return resp.json()
# === Status Pages ===
def create_status_page(self, name, subdomain):
"""สร้าง Status Page"""
payload = {
"company_name": name,
"subdomain": subdomain,
"timezone": "Asia/Bangkok",
}
resp = self.session.post(f"{self.BASE_URL}/status-pages",
json=payload)
return resp.json()
# === Heartbeats ===
def create_heartbeat(self, name, period=300, grace=60):
"""สร้าง Heartbeat Monitor (สำหรับ Cron Jobs)"""
payload = {
"name": name,
"period": period,
"grace": grace,
}
resp = self.session.post(f"{self.BASE_URL}/heartbeats",
json=payload)
return resp.json()
def print_status(self):
"""แสดงสถานะ Monitors ทั้งหมด"""
data = self.list_monitors()
monitors = data.get("data", [])
print(f"\n{'='*60}")
print(f"BetterUptime Status — {datetime.now():%Y-%m-%d %H:%M}")
print(f"{'='*60}")
up = sum(1 for m in monitors
if m["attributes"]["status"] == "up")
down = sum(1 for m in monitors
if m["attributes"]["status"] == "down")
print(f" Total: {len(monitors)} | Up: {up} | Down: {down}")
for m in monitors:
attr = m["attributes"]
status = attr["status"].upper()
url = attr["url"]
uptime = attr.get("availability", 0)
print(f" [{status:>4}] {url:<40} {uptime:.2f}%")
# client = BetterUptimeClient("YOUR_API_TOKEN")
# client.create_monitor("https://api.example.com/health")
# client.print_status()
Chaos Engineering Experiments

# chaos_experiments.py — Chaos Engineering Experiments
import subprocess
import time
import random
import logging
from dataclasses import dataclass, field
from typing import List, Optional, Callable
from datetime import datetime
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class ChaosExperiment:
name: str
hypothesis: str
steady_state: Callable
experiment: Callable
rollback: Callable
duration_seconds: int = 300
blast_radius: str = "staging"
class ChaosRunner:
"""รัน Chaos Experiments"""
def __init__(self, monitoring_client=None):
self.monitoring = monitoring_client
self.results = []
def run_experiment(self, exp: ChaosExperiment):
"""รัน Experiment ตาม Scientific Method"""
logger.info(f"\n{'='*50}")
logger.info(f"Chaos Experiment: {exp.name}")
logger.info(f"Hypothesis: {exp.hypothesis}")
logger.info(f"Blast Radius: {exp.blast_radius}")
logger.info(f"Duration: {exp.duration_seconds}s")
logger.info(f"{'='*50}")
result = {
"name": exp.name,
"start": datetime.now().isoformat(),
"hypothesis": exp.hypothesis,
"passed": False,
"notes": [],
}
try:
# Step 1: Verify Steady State
logger.info("Step 1: Verifying steady state...")
if not exp.steady_state():
result["notes"].append("Steady state check FAILED — aborting")
logger.error("Steady state not met, aborting experiment")
self.results.append(result)
return result
result["notes"].append("Steady state verified")
# Step 2: Run Experiment
logger.info("Step 2: Running chaos experiment...")
exp.experiment()
result["notes"].append("Experiment started")
# Step 3: Observe
logger.info(f"Step 3: Observing for {exp.duration_seconds}s...")
time.sleep(exp.duration_seconds)
# Step 4: Verify Steady State Again
logger.info("Step 4: Verifying steady state after experiment...")
passed = exp.steady_state()
result["passed"] = passed
if passed:
result["notes"].append("System maintained steady state — PASSED")
logger.info("RESULT: PASSED")
else:
result["notes"].append("System DID NOT maintain steady state — FAILED")
logger.warning("RESULT: FAILED")
except Exception as e:
result["notes"].append(f"Error: {str(e)}")
logger.error(f"Experiment error: {e}")
finally:
# Step 5: Rollback
logger.info("Step 5: Rolling back...")
try:
exp.rollback()
result["notes"].append("Rollback completed")
except Exception as e:
result["notes"].append(f"Rollback error: {e}")
logger.error(f"Rollback failed: {e}")
result["end"] = datetime.now().isoformat()
self.results.append(result)
return result
def print_report(self):
"""แสดงรายงาน"""
print(f"\n{'='*60}")
print(f"Chaos Engineering Report")
print(f"{'='*60}")
passed = sum(1 for r in self.results if r["passed"])
failed = len(self.results) - passed
print(f" Total: {len(self.results)} | Passed: {passed} | Failed: {failed}")
for r in self.results:
status = "PASS" if r["passed"] else "FAIL"
print(f"\n [{status}] {r['name']}")
print(f" Hypothesis: {r['hypothesis']}")
for note in r["notes"]:
print(f" - {note}")
# === ตัวอย่าง Experiments ===
def check_api_health():
"""Steady State: API ตอบ 200"""
try:
import requests
resp = requests.get("http://localhost:8080/health", timeout=5)
return resp.status_code == 200
except:
return False
def kill_random_pod():
"""Experiment: ลบ Pod แบบสุ่ม"""
subprocess.run([
"kubectl", "delete", "pod",
"--selector=app=myapp",
"--field-selector=status.phase=Running",
"--grace-period=0", "--force",
"-n", "staging",
], capture_output=True)
def rollback_pods():
"""Rollback: ให้ K8s สร้าง Pod ใหม่อัตโนมัติ"""
subprocess.run([
"kubectl", "rollout", "status", "deployment/myapp",
"-n", "staging", "--timeout=120s",
], capture_output=True)
# runner = ChaosRunner()
# exp = ChaosExperiment(
# name="Pod Failure Recovery",
# hypothesis="ระบบควรทำงานต่อได้เมื่อ Pod ถูกลบ",
# steady_state=check_api_health,
# experiment=kill_random_pod,
# rollback=rollback_pods,
# duration_seconds=60,
# blast_radius="staging",
# )
# runner.run_experiment(exp)
# runner.print_report()
Chaos Experiment Catalog
# === Chaos Experiment Catalog ===
experiments_catalog = [
{
"name": "Pod Failure",
"description": "ลบ Pod แบบสุ่มดูว่า K8s สร้างใหม่ทันไหม",
"command": "kubectl delete pod -l app=myapp --grace-period=0",
"expected": "ระบบควรมี Downtime ไม่เกิน 30 วินาที",
"tools": ["kubectl"],
"risk": "low",
},
{
"name": "Network Latency",
"description": "เพิ่ม Latency 500ms ระหว่าง Services",
"command": "tc qdisc add dev eth0 root netem delay 500ms",
"expected": "ระบบควร Timeout gracefully ไม่ crash",
"tools": ["tc", "Chaos Mesh"],
"risk": "medium",
},
{
"name": "CPU Stress",
"description": "ใช้ CPU 100% ใน Pod",
"command": "stress-ng --cpu 4 --timeout 300s",
"expected": "HPA ควร Scale up, Response time อาจเพิ่มแต่ไม่ล่ม",
"tools": ["stress-ng"],
"risk": "medium",
},
{
"name": "Memory Pressure",
"description": "ใช้ Memory จนเกือบ Limit",
"command": "stress-ng --vm 2 --vm-bytes 90% --timeout 300s",
"expected": "OOMKiller ควร Kill Pod, K8s สร้างใหม่อัตโนมัติ",
"tools": ["stress-ng"],
"risk": "medium",
},
{
"name": "Database Failover",
"description": "ปิด Primary Database ดู Failover",
"command": "kubectl delete pod mysql-primary-0",
"expected": "Replica ควร Promote เป็น Primary ใน 30 วินาที",
"tools": ["kubectl"],
"risk": "high",
},
{
"name": "DNS Failure",
"description": "ทำให้ DNS Resolution ล้มเหลว",
"command": "kubectl scale deployment coredns --replicas=0 -n kube-system",
"expected": "Cached DNS ยังใช้ได้ ระบบไม่ล่มทันที",
"tools": ["kubectl"],
"risk": "high",
},
]
print("Chaos Experiment Catalog")
print("=" * 60)
for exp in experiments_catalog:
risk_color = {"low": "LOW", "medium": "MED", "high": "HIGH"}
print(f"\n [{risk_color[exp['risk']]:>4}] {exp['name']}")
print(f" {exp['description']}")
print(f" Command: {exp['command']}")
print(f" Expected: {exp['expected']}")
print(f" Tools: {', '.join(exp['tools'])}")
Best Practices
- เริ่มจาก Staging: ทำ Chaos Experiments ใน Staging ก่อน ไม่ทำ Production ถ้ายังไม่พร้อม
- มี Monitoring พร้อม: ต้องมี Monitoring (เช่น BetterUptime) ดูผลกระทบก่อนทำ Experiment
- Blast Radius เล็ก: เริ่มจากผลกระทบเล็กก่อน (1 Pod) แล้วค่อยขยาย
- มี Rollback Plan: ทุก Experiment ต้องมีวิธี Rollback ถ้าเกิดปัญหาหนัก
- แจ้งทีม: แจ้งทุกคนที่เกี่ยวข้องก่อนทำ Experiment ใช้ Status Page ถ้าทำใน Production
- Game Day: จัด Game Day ฝึกซ้อมทีมรับมือ Incidents อย่างน้อยเดือนละครั้ง
BetterUptime คืออะไร
แพลตฟอร์ม Uptime Monitoring ตรวจสอบเว็บไซต์และ API ทุก 30 วินาที แจ้งเตือนผ่าน SMS Email Slack PagerDuty เมื่อ Downtime มี Status Page Incident Management On-call Scheduling