Better Uptime Real-time Processing คืออะไร
Better Uptime เป็น monitoring platform สำหรับตรวจสอบ uptime ของเว็บไซต์, APIs และ services แบบ real-time รองรับ HTTP, TCP, UDP, DNS, ICMP monitoring พร้อม incident management, status pages และ on-call scheduling ส่วน Real-time Processing คือการประมวลผลข้อมูลทันทีที่ได้รับโดยไม่ต้องรอ batch การรวมสองแนวคิดนี้ช่วยสร้างระบบ monitoring ที่ detect downtime ได้ภายในไม่กี่วินาที, alert ทีมทันที และ auto-remediate ปัญหาโดยอัตโนมัติ
Better Uptime Architecture
# betteruptime_arch.py — Better Uptime architecture
import json
class BetterUptimeArch:
    """Static catalogue of Better Uptime feature areas plus a console printer.

    The class holds no instance state; ``FEATURES`` is shared class-level data.
    """

    # Feature key -> display metadata. Every entry has "name" and
    # "description"; some additionally carry "protocols", "locations",
    # "features" or "channels".
    FEATURES = {
        "uptime_monitoring": {
            "name": "Uptime Monitoring",
            "description": "ตรวจสอบ availability ทุก 30 วินาที จากหลาย locations ทั่วโลก",
            "protocols": ["HTTP/HTTPS", "TCP", "UDP", "DNS", "ICMP Ping", "Keyword check"],
            "locations": "30+ global locations",
        },
        "incident_management": {
            "name": "Incident Management",
            "description": "สร้าง incident อัตโนมัติเมื่อ monitor ตรวจพบ downtime",
            "features": ["Auto-create incidents", "Escalation policies", "Acknowledgement", "Post-mortem"],
        },
        "status_pages": {
            "name": "Status Pages",
            "description": "Public/private status pages สำหรับแจ้ง users เรื่อง system status",
            "features": ["Custom domain", "Branded design", "Subscriber notifications", "Component groups"],
        },
        "on_call": {
            "name": "On-Call Scheduling",
            "description": "จัดตาราง on-call rotation สำหรับทีม",
            "features": ["Rotation schedules", "Override", "Escalation chains", "Calendar integration"],
        },
        "alerting": {
            "name": "Multi-Channel Alerting",
            "description": "แจ้งเตือนผ่านหลายช่องทาง",
            "channels": ["Email", "SMS", "Phone call", "Slack", "Teams", "PagerDuty", "Webhook"],
        },
    }

    def show_features(self) -> None:
        """Print each feature's name and description to stdout.

        For entries that define them, the first four protocols or the first
        five channels are listed as well. Other per-feature lists (such as
        "features" or "locations") are not printed.
        """
        print("=== Better Uptime Features ===\n")
        # Only the metadata is used, so iterate values rather than items.
        for feat in self.FEATURES.values():
            print(f"[{feat['name']}]")
            print(f" {feat['description']}")
            if 'protocols' in feat:
                print(f" Protocols: {', '.join(feat['protocols'][:4])}")
            elif 'channels' in feat:
                print(f" Channels: {', '.join(feat['channels'][:5])}")
            # Blank line separates one feature from the next.
            print()
# Demo: build the catalogue object and print the feature overview to stdout.
arch = BetterUptimeArch()
arch.show_features()
Real-time Processing Pipeline
# realtime_pipeline.py — Real-time monitoring pipeline
import json
class RealtimePipeline:
    """Static description of a five-stage real-time monitoring pipeline.

    The class holds no instance state; ``STAGES`` is shared class-level data.
    """

    # Stage key -> display metadata, in pipeline order. Every entry has
    # "name" and "description" plus one stage-specific detail field
    # ("output", "tech", "logic", "latency" or "tools").
    STAGES = {
        "collect": {
            "name": "1. Data Collection",
            "description": "Probes ทั่วโลกส่ง health check requests ทุก 30 วินาที",
            "output": "Response time, status code, SSL validity, content match",
        },
        "process": {
            "name": "2. Stream Processing",
            "description": "ประมวลผลแบบ real-time — detect anomalies, calculate SLI/SLO",
            "tech": "Kafka Streams / Flink สำหรับ event processing",
        },
        "evaluate": {
            "name": "3. Incident Evaluation",
            "description": "ตรวจสอบจากหลาย locations — ป้องกัน false positives",
            "logic": "Confirm from 3+ locations ก่อน trigger incident",
        },
        "alert": {
            "name": "4. Alert & Escalation",
            "description": "แจ้งเตือนตาม escalation policy — primary → secondary → manager",
            "latency": "< 30 วินาที จาก detection ถึง alert delivery",
        },
        "remediate": {
            "name": "5. Auto-Remediation",
            "description": "Execute runbook อัตโนมัติ — restart service, scale up, failover",
            "tools": "Webhooks → Lambda/Cloud Functions → kubectl/API calls",
        },
    }

    def show_pipeline(self) -> None:
        """Print each stage's name and description to stdout, in dict order.

        The stage-specific detail fields are not printed.
        """
        print("=== Real-time Pipeline ===\n")
        # Only the metadata is used, so iterate values rather than items.
        for stage in self.STAGES.values():
            print(f"[{stage['name']}]")
            print(f" {stage['description']}")
            # Blank line separates one stage from the next.
            print()
# Demo: build the pipeline description and print its stages to stdout.
pipeline = RealtimePipeline()
pipeline.show_pipeline()
Python Integration
# integration.py — Better Uptime API integration
import json
import random
class BetterUptimeAPI:
    """Demo wrapper that displays a sample API client and a simulated dashboard.

    Nothing in this class performs network calls: ``CODE`` is sample source
    text that is only printed, and ``dashboard`` fabricates its numbers with
    the ``random`` module.
    """

    # Sample client source shown by show_code(). It is kept as text and never
    # executed here; its contents start at column 0 so the snippet is valid
    # Python when copied out of the string.
    CODE = """
# betteruptime_client.py — Python client for Better Uptime API
import requests
import json
from datetime import datetime, timedelta
class BetterUptimeClient:
    BASE_URL = "https://betteruptime.com/api/v2"
    def __init__(self, api_token):
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {api_token}",
            "Content-Type": "application/json",
        })
    def list_monitors(self):
        resp = self.session.get(f"{self.BASE_URL}/monitors")
        resp.raise_for_status()
        return resp.json()["data"]
    def create_monitor(self, url, monitor_type="status", check_frequency=30):
        data = {
            "monitor_type": monitor_type,
            "url": url,
            "check_frequency": check_frequency,
            "regions": ["us", "eu", "as", "au"],
            "confirmation_period": 3,
            "request_timeout": 15,
            "ssl_expiration": 30,
        }
        resp = self.session.post(f"{self.BASE_URL}/monitors", json=data)
        return resp.json()
    def get_incidents(self, status="ongoing"):
        resp = self.session.get(
            f"{self.BASE_URL}/incidents",
            params={"status": status}
        )
        return resp.json()["data"]
    def acknowledge_incident(self, incident_id):
        resp = self.session.post(
            f"{self.BASE_URL}/incidents/{incident_id}/acknowledge"
        )
        return resp.json()
    def resolve_incident(self, incident_id):
        resp = self.session.post(
            f"{self.BASE_URL}/incidents/{incident_id}/resolve"
        )
        return resp.json()
    def get_uptime_sla(self, monitor_id, days=30):
        from_date = (datetime.now() - timedelta(days=days)).isoformat()
        resp = self.session.get(
            f"{self.BASE_URL}/monitors/{monitor_id}/sla",
            params={"from": from_date}
        )
        return resp.json()
    def create_status_page(self, name, subdomain):
        data = {
            "company_name": name,
            "subdomain": subdomain,
            "timezone": "Asia/Bangkok",
        }
        resp = self.session.post(f"{self.BASE_URL}/status-pages", json=data)
        return resp.json()
client = BetterUptimeClient("YOUR_API_TOKEN")
monitors = client.list_monitors()
for m in monitors:
    attrs = m["attributes"]
    print(f" [{attrs['status']}] {attrs['url']} — {attrs['monitor_type']}")
"""

    def show_code(self) -> None:
        """Print a header followed by the first 600 characters of ``CODE``."""
        print("=== Better Uptime Client ===")
        print(self.CODE[:600])

    def dashboard(self) -> None:
        """Print a simulated status line for five hard-coded services.

        Uptime percentages are drawn with ``random.uniform`` on every call, and
        "Payment API" is reported as down when ``random.random()`` is not
        greater than 0.1, so the output differs between runs.
        """
        print("\n=== Monitoring Dashboard ===")
        services = [
            {"name": "API Gateway", "status": "up", "uptime": random.uniform(99.9, 100)},
            {"name": "Web App", "status": "up", "uptime": random.uniform(99.8, 100)},
            {"name": "Database", "status": "up", "uptime": random.uniform(99.5, 100)},
            {"name": "CDN", "status": "up", "uptime": random.uniform(99.9, 100)},
            {"name": "Payment API", "status": "up" if random.random() > 0.1 else "down", "uptime": random.uniform(99.0, 100)},
        ]
        for svc in services:
            icon = "OK" if svc["status"] == "up" else "DOWN"
            print(f" [{icon:>4}] {svc['name']:<15} Uptime: {svc['uptime']:.3f}%")
# Demo: print the start of the sample client source, then the simulated
# dashboard.
api = BetterUptimeAPI()
api.show_code()
api.dashboard()
Auto-Remediation & Webhooks
# remediation.py — Auto-remediation with webhooks
import json
class AutoRemediation:
    """Demo wrapper that displays a sample webhook handler and an example timeline.

    ``CODE`` is sample source text that is only printed; no webhook server is
    started and no commands are run by this class.
    """

    # Sample FastAPI webhook handler shown by show_code(). It is kept as text
    # and never executed here; its contents start at column 0 so the snippet
    # is valid Python when copied out of the string.
    CODE = """
# webhook_handler.py — Handle Better Uptime webhooks
from fastapi import FastAPI, Request
import subprocess
import json
import logging
app = FastAPI()
logger = logging.getLogger(__name__)
RUNBOOKS = {
    "api-gateway": {
        "action": "restart",
        "command": "kubectl rollout restart deployment/api-gateway -n production",
        "max_retries": 2,
    },
    "web-app": {
        "action": "scale",
        "command": "kubectl scale deployment/web-app --replicas=5 -n production",
        "max_retries": 1,
    },
    "database": {
        "action": "failover",
        "command": "python scripts/db_failover.py",
        "max_retries": 1,
        "notify": ["dba-team"],
    },
}
@app.post("/webhook/betteruptime")
async def handle_incident(request: Request):
    payload = await request.json()
    event_type = payload.get("data", {}).get("attributes", {}).get("event", "")
    monitor_url = payload.get("data", {}).get("attributes", {}).get("url", "")
    logger.info(f"Event: {event_type} for {monitor_url}")
    if event_type == "incident.started":
        # Find matching runbook
        for service, runbook in RUNBOOKS.items():
            if service in monitor_url:
                logger.info(f"Executing runbook for {service}: {runbook['action']}")
                result = subprocess.run(
                    runbook["command"].split(),
                    capture_output=True, text=True, timeout=60
                )
                if result.returncode == 0:
                    logger.info(f"Remediation successful for {service}")
                else:
                    logger.error(f"Remediation failed: {result.stderr}")
                return {"status": "remediated", "service": service}
    elif event_type == "incident.resolved":
        logger.info(f"Incident resolved for {monitor_url}")
    return {"status": "received"}
@app.get("/health")
async def health():
    return {"status": "ok"}
"""

    def show_code(self) -> None:
        """Print a header followed by the first 600 characters of ``CODE``."""
        print("=== Auto-Remediation ===")
        print(self.CODE[:600])

    def incident_timeline(self) -> None:
        """Print a fixed, illustrative incident timeline to stdout.

        The events are hard-coded example data, not measurements.
        """
        print("\n=== Incident Timeline (Example) ===")
        # (mm:ss offset, what happened) pairs, in chronological order.
        events = [
            ("00:00", "Monitor detects failure from US region"),
            ("00:15", "Confirmed from EU + AS regions (3 locations)"),
            ("00:20", "Incident created automatically"),
            ("00:25", "Webhook fires → auto-restart triggered"),
            ("00:45", "Service back up — auto-resolve check"),
            ("01:00", "Confirmed recovery from all regions"),
            ("01:05", "Incident resolved — total downtime: ~45 seconds"),
        ]
        for timestamp, event in events:
            print(f" [{timestamp}s] {event}")
# Demo: print the start of the sample webhook handler, then the example
# incident timeline.
rem = AutoRemediation()
rem.show_code()
rem.incident_timeline()
SLI/SLO Monitoring
# sli_slo.py — SLI/SLO monitoring
import json
import random
class SLISLOMonitoring:
    """Demo wrapper that displays a sample SLO tracker and a simulated SLO table.

    ``CODE`` is sample source text that is only printed; ``slo_dashboard``
    fabricates its "actual" uptime figures with the ``random`` module.
    """

    # Sample SLOTracker source shown by show_code(). It is kept as text and
    # never executed here; its contents start at column 0 so the snippet is
    # valid Python when copied out of the string.
    CODE = """
# slo_tracker.py — Track SLI/SLO compliance
class SLOTracker:
    def __init__(self, target_uptime=99.9):
        self.target = target_uptime
        self.error_budget = 100 - target_uptime # 0.1% for 99.9%
    def calculate_error_budget(self, actual_uptime, period_hours=720):
        budget_minutes = (self.error_budget / 100) * period_hours * 60
        used_minutes = ((100 - actual_uptime) / 100) * period_hours * 60
        remaining_minutes = budget_minutes - used_minutes
        remaining_pct = (remaining_minutes / budget_minutes) * 100
        return {
            "target": f"{self.target}%",
            "actual": f"{actual_uptime}%",
            "budget_total_min": round(budget_minutes, 1),
            "budget_used_min": round(used_minutes, 1),
            "budget_remaining_min": round(remaining_minutes, 1),
            "budget_remaining_pct": round(remaining_pct, 1),
            "on_track": remaining_pct > 0,
        }
"""

    def show_code(self) -> None:
        """Print a header followed by the first 500 characters of ``CODE``."""
        print("=== SLO Tracker ===")
        print(self.CODE[:500])

    def slo_dashboard(self) -> None:
        """Print a simulated error-budget table for four hard-coded services.

        "Actual" uptime is drawn with ``random.uniform`` on every call, so the
        output differs between runs. Budgets are computed in minutes over a
        720-hour window, and the remaining budget is clamped at zero; a
        service is marked "BREACH" when nothing remains.
        """
        print("\n=== SLO Dashboard (30-day) ===")
        services = [
            {"name": "API", "target": 99.9, "actual": random.uniform(99.85, 100)},
            {"name": "Web", "target": 99.5, "actual": random.uniform(99.3, 100)},
            {"name": "DB", "target": 99.95, "actual": random.uniform(99.9, 100)},
            {"name": "CDN", "target": 99.99, "actual": random.uniform(99.95, 100)},
        ]
        print(f" {'Service':<10} {'Target':>8} {'Actual':>8} {'Budget':>10} {'Status':>8}")
        for svc in services:
            # 720 hours * 60 = minutes in the window; the multiplication order
            # is kept as-is so floating-point results are unchanged.
            budget_total = (100 - svc["target"]) / 100 * 720 * 60  # minutes
            budget_used = (100 - svc["actual"]) / 100 * 720 * 60
            remaining = max(0, budget_total - budget_used)
            status = "OK" if remaining > 0 else "BREACH"
            print(f" {svc['name']:<10} {svc['target']:>7.2f}% {svc['actual']:>7.3f}% {remaining:>8.1f}m {status:>8}")
# Demo: print the start of the sample SLO tracker, then the simulated SLO
# dashboard.
slo = SLISLOMonitoring()
slo.show_code()
slo.slo_dashboard()
FAQ - คำถามที่พบบ่อย
Q: Better Uptime กับ UptimeRobot อันไหนดี?
A: Better Uptime: incident management ดีกว่า, มี on-call scheduling, status pages สวยกว่า, API ครบกว่า; UptimeRobot: ถูกกว่า, free tier ดี (50 monitors), เก่าแก่กว่า, community ใหญ่; เลือก Better Uptime: ถ้าต้องการ incident management + on-call + status pages ครบ; เลือก UptimeRobot: ถ้าต้องการแค่ basic uptime monitoring ราคาถูก
Q: Real-time processing จำเป็นสำหรับ monitoring ไหม?
A: จำเป็นมาก — downtime ทุกนาทีมีค่าใช้จ่าย: E-commerce: downtime 1 นาที = สูญเสีย $5,600 (average); SaaS: downtime กระทบ SLA → refund + reputation damage; Real-time: detect ภายใน 30 วินาที + alert ภายใน 1 นาที = ลด MTTR อย่างมาก; Batch (ทุก 5 นาที): อาจพลาด downtime นานถึง 5 นาทีก่อน detect
Q: False positive ป้องกันอย่างไร?
A: Multi-location confirmation: ตรวจจาก 3+ locations ก่อน trigger incident; Confirmation period: รอ 2-3 checks ก่อน confirm downtime; Keyword check: ตรวจ response body ด้วย — ไม่ใช่แค่ status code; Maintenance windows: exclude scheduled maintenance จาก monitoring; Smart alerting: group related alerts → ลด noise
Q: ค่าใช้จ่ายเท่าไหร่?
A: Better Uptime pricing: Free: 10 monitors, 3-minute checks, email alerts; Starter: $20/month — 50 monitors, 30-second checks; Team: $40/month — unlimited monitors, on-call, phone alerts; Business: $80/month — multiple status pages, advanced features; เทียบ: UptimeRobot Free 50 monitors, Pingdom $10+/month, Datadog $23+/host/month
