
Betteruptime Real-time Processing


What Is Betteruptime Real-time Processing?


Better Uptime is a monitoring platform that checks the uptime of websites, APIs, and services in real time. It supports HTTP, TCP, UDP, DNS, and ICMP monitoring, along with incident management, status pages, and on-call scheduling. Real-time processing means handling data the moment it arrives instead of waiting for a batch. Combining the two makes it possible to build a monitoring system that detects downtime within seconds, alerts the team immediately, and remediates problems automatically.

Better Uptime Architecture

# betteruptime_arch.py — Better Uptime architecture

import json



class BetterUptimeArch:

    FEATURES = {
        "uptime_monitoring": {
            "name": "Uptime Monitoring",
            "description": "Checks availability every 30 seconds from multiple locations worldwide",
            "protocols": ["HTTP/HTTPS", "TCP", "UDP", "DNS", "ICMP Ping", "Keyword check"],
            "locations": "30+ global locations",
        },
        "incident_management": {
            "name": "Incident Management",
            "description": "Automatically creates an incident when a monitor detects downtime",
            "features": ["Auto-create incidents", "Escalation policies", "Acknowledgement", "Post-mortem"],
        },
        "status_pages": {
            "name": "Status Pages",
            "description": "Public/private status pages that inform users about system status",
            "features": ["Custom domain", "Branded design", "Subscriber notifications", "Component groups"],
        },
        "on_call": {
            "name": "On-Call Scheduling",
            "description": "Schedules on-call rotations for the team",
            "features": ["Rotation schedules", "Override", "Escalation chains", "Calendar integration"],
        },
        "alerting": {
            "name": "Multi-Channel Alerting",
            "description": "Sends alerts through multiple channels",
            "channels": ["Email", "SMS", "Phone call", "Slack", "Teams", "PagerDuty", "Webhook"],
        },
    }



    def show_features(self):

        print("=== Better Uptime Features ===\n")

        for key, feat in self.FEATURES.items():

            print(f"[{feat['name']}]")

            print(f"  {feat['description']}")

            if 'protocols' in feat:

                print(f"  Protocols: {', '.join(feat['protocols'][:4])}")

            elif 'channels' in feat:

                print(f"  Channels: {', '.join(feat['channels'][:5])}")

            print()



arch = BetterUptimeArch()

arch.show_features()

Real-time Processing Pipeline

# realtime_pipeline.py — Real-time monitoring pipeline

import json



class RealtimePipeline:

    STAGES = {
        "collect": {
            "name": "1. Data Collection",
            "description": "Probes around the world send health check requests every 30 seconds",
            "output": "Response time, status code, SSL validity, content match",
        },
        "process": {
            "name": "2. Stream Processing",
            "description": "Processes events in real time: detects anomalies, calculates SLI/SLO",
            "tech": "Kafka Streams / Flink for event processing",
        },
        "evaluate": {
            "name": "3. Incident Evaluation",
            "description": "Verifies failures from multiple locations to prevent false positives",
            "logic": "Confirm from 3+ locations before triggering an incident",
        },
        "alert": {
            "name": "4. Alert & Escalation",
            "description": "Alerts according to the escalation policy: primary → secondary → manager",
            "latency": "< 30 seconds from detection to alert delivery",
        },
        "remediate": {
            "name": "5. Auto-Remediation",
            "description": "Executes runbooks automatically: restart service, scale up, failover",
            "tools": "Webhooks → Lambda/Cloud Functions → kubectl/API calls",
        },
    }



    def show_pipeline(self):

        print("=== Real-time Pipeline ===\n")

        for key, stage in self.STAGES.items():

            print(f"[{stage['name']}]")

            print(f"  {stage['description']}")

            print()



pipeline = RealtimePipeline()

pipeline.show_pipeline()
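
The incident evaluation stage (confirm from 3+ locations before triggering an incident) can be sketched as follows. This is a minimal illustration under stated assumptions, not Better Uptime's actual implementation: `ProbeResult`, `should_open_incident`, and the `min_failing_locations` parameter are hypothetical names introduced here.

```python
from dataclasses import dataclass


@dataclass
class ProbeResult:
    location: str  # hypothetical region label, e.g. "us" or "eu"
    ok: bool       # True if the health check passed


def should_open_incident(results, min_failing_locations=3):
    """Return True only when enough distinct locations report a failure."""
    failing = {r.location for r in results if not r.ok}
    return len(failing) >= min_failing_locations


results = [
    ProbeResult("us", False),
    ProbeResult("eu", False),
    ProbeResult("as", True),
    ProbeResult("au", True),
]
two_failing = should_open_incident(results)    # only 2 locations fail
results[2] = ProbeResult("as", False)
three_failing = should_open_incident(results)  # 3 locations fail
print(two_failing, three_failing)  # prints: False True
```

Counting distinct locations rather than raw failures means repeated failures from a single probe cannot open an incident on their own.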

Python Integration

# integration.py — Better Uptime API integration

import json

import random



class BetterUptimeAPI:

    CODE = """

# betteruptime_client.py — Python client for Better Uptime API

import requests

import json

from datetime import datetime, timedelta



class BetterUptimeClient:

    BASE_URL = "https://betteruptime.com/api/v2"

    

    def __init__(self, api_token):

        self.session = requests.Session()

        self.session.headers.update({

            "Authorization": f"Bearer {api_token}",

            "Content-Type": "application/json",

        })

    

    def list_monitors(self):

        resp = self.session.get(f"{self.BASE_URL}/monitors")

        resp.raise_for_status()

        return resp.json()["data"]

    

    def create_monitor(self, url, monitor_type="status", check_frequency=30):

        data = {

            "monitor_type": monitor_type,

            "url": url,

            "check_frequency": check_frequency,

            "regions": ["us", "eu", "as", "au"],

            "confirmation_period": 3,

            "request_timeout": 15,

            "ssl_expiration": 30,

        }

        resp = self.session.post(f"{self.BASE_URL}/monitors", json=data)

        resp.raise_for_status()  # surface HTTP errors instead of returning an error body

        return resp.json()

    

    def get_incidents(self, status="ongoing"):

        resp = self.session.get(

            f"{self.BASE_URL}/incidents",

            params={"status": status}

        )

        resp.raise_for_status()  # fail loudly before indexing into the payload

        return resp.json()["data"]

    

    def acknowledge_incident(self, incident_id):

        resp = self.session.post(

            f"{self.BASE_URL}/incidents/{incident_id}/acknowledge"

        )

        return resp.json()

    

    def resolve_incident(self, incident_id):

        resp = self.session.post(

            f"{self.BASE_URL}/incidents/{incident_id}/resolve"

        )

        return resp.json()

    

    def get_uptime_sla(self, monitor_id, days=30):

        from_date = (datetime.now() - timedelta(days=days)).date().isoformat()  # YYYY-MM-DD

        resp = self.session.get(

            f"{self.BASE_URL}/monitors/{monitor_id}/sla",

            params={"from": from_date}

        )

        return resp.json()

    

    def create_status_page(self, name, subdomain):

        data = {

            "company_name": name,

            "subdomain": subdomain,

            "timezone": "Asia/Bangkok",

        }

        resp = self.session.post(f"{self.BASE_URL}/status-pages", json=data)

        return resp.json()



client = BetterUptimeClient("YOUR_API_TOKEN")

monitors = client.list_monitors()

for m in monitors:

    attrs = m["attributes"]

    print(f"  [{attrs['status']}] {attrs['url']} — {attrs['monitor_type']}")

"""



    def show_code(self):

        print("=== Better Uptime Client ===")

        print(self.CODE[:600])



    def dashboard(self):

        print(f"\n=== Monitoring Dashboard ===")

        services = [

            {"name": "API Gateway", "status": "up", "uptime": random.uniform(99.9, 100)},

            {"name": "Web App", "status": "up", "uptime": random.uniform(99.8, 100)},

            {"name": "Database", "status": "up", "uptime": random.uniform(99.5, 100)},

            {"name": "CDN", "status": "up", "uptime": random.uniform(99.9, 100)},

            {"name": "Payment API", "status": "up" if random.random() > 0.1 else "down", "uptime": random.uniform(99.0, 100)},

        ]

        for svc in services:

            icon = "OK" if svc["status"] == "up" else "DOWN"

            print(f"  [{icon:>4}] {svc['name']:<15} Uptime: {svc['uptime']:.3f}%")



api = BetterUptimeAPI()

api.show_code()

api.dashboard()
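
Once `list_monitors()` returns the `data` array, a small helper can condense it into a per-status count. The sketch below assumes each item carries an `attributes` dict with a `status` key, as in the loop above. `summarize_monitors` and the sample records are hypothetical, and no network call is made.

```python
from collections import Counter


def summarize_monitors(monitors):
    """Count monitors per status from a list shaped like the API's "data" array."""
    return Counter(m["attributes"]["status"] for m in monitors)


# Hypothetical sample records that mimic the response shape used above.
sample = [
    {"attributes": {"url": "https://example.com", "status": "up", "monitor_type": "status"}},
    {"attributes": {"url": "https://example.org", "status": "down", "monitor_type": "status"}},
    {"attributes": {"url": "https://example.net", "status": "up", "monitor_type": "keyword"}},
]

summary = summarize_monitors(sample)
print(dict(summary))  # prints: {'up': 2, 'down': 1}
```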

Auto-Remediation & Webhooks

# remediation.py — Auto-remediation with webhooks

import json



class AutoRemediation:

    CODE = """

# webhook_handler.py — Handle Better Uptime webhooks

from fastapi import FastAPI, Request

import subprocess

import json

import logging



app = FastAPI()

logger = logging.getLogger(__name__)



RUNBOOKS = {

    "api-gateway": {

        "action": "restart",

        "command": "kubectl rollout restart deployment/api-gateway -n production",

        "max_retries": 2,

    },

    "web-app": {

        "action": "scale",

        "command": "kubectl scale deployment/web-app --replicas=5 -n production",

        "max_retries": 1,

    },

    "database": {

        "action": "failover",

        "command": "python scripts/db_failover.py",

        "max_retries": 1,

        "notify": ["dba-team"],

    },

}



@app.post("/webhook/betteruptime")

async def handle_incident(request: Request):

    payload = await request.json()

    

    event_type = payload.get("data", {}).get("attributes", {}).get("event", "")

    monitor_url = payload.get("data", {}).get("attributes", {}).get("url", "")

    

    logger.info(f"Event: {event_type} for {monitor_url}")

    

    if event_type == "incident.started":

        # Find matching runbook

        for service, runbook in RUNBOOKS.items():

            if service in monitor_url:

                logger.info(f"Executing runbook for {service}: {runbook['action']}")

                

                result = subprocess.run(

                    runbook["command"].split(),

                    capture_output=True, text=True, timeout=60

                )

                

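                if result.returncode == 0:

                    logger.info(f"Remediation successful for {service}")

                    return {"status": "remediated", "service": service}

                # Report the failure so callers do not assume the service recovered

                logger.error(f"Remediation failed: {result.stderr}")

                return {"status": "remediation_failed", "service": service}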

    

    elif event_type == "incident.resolved":

        logger.info(f"Incident resolved for {monitor_url}")

    

    return {"status": "received"}



@app.get("/health")

async def health():

    return {"status": "ok"}

"""



    def show_code(self):

        print("=== Auto-Remediation ===")

        print(self.CODE[:600])



    def incident_timeline(self):

        print(f"\n=== Incident Timeline (Example) ===")

        events = [

            ("00:00", "Monitor detects failure from US region"),

            ("00:15", "Confirmed from EU + AS regions (3 locations)"),

            ("00:20", "Incident created automatically"),

            ("00:25", "Webhook fires → auto-restart triggered"),

            ("00:45", "Service back up — auto-resolve check"),

            ("01:00", "Confirmed recovery from all regions"),

            ("01:05", "Incident resolved — total downtime: ~45 seconds"),

        ]

        for timestamp, event in events:

            print(f"  [{timestamp}] {event}")



rem = AutoRemediation()

rem.show_code()

rem.incident_timeline()
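
Each runbook above defines `max_retries`, but the handler runs its command only once. One way to honor that setting is a small retry wrapper. This is a sketch under assumptions: `run_with_retries` is a hypothetical helper, and it treats `max_retries` as the number of extra attempts after the first one. The demo commands call the current Python interpreter instead of `kubectl` so the example runs anywhere.

```python
import subprocess
import sys


def run_with_retries(command, max_retries=1, timeout=60):
    """Run a command, retrying on non-zero exit or timeout.

    Returns (succeeded, attempts_made).
    """
    attempts = 0
    for _ in range(max_retries + 1):
        attempts += 1
        try:
            result = subprocess.run(
                command, capture_output=True, text=True, timeout=timeout
            )
        except subprocess.TimeoutExpired:
            continue  # count the attempt and try again
        if result.returncode == 0:
            return True, attempts
    return False, attempts


# Stand-in commands: exit code 0 simulates success, 1 simulates failure.
ok, ok_attempts = run_with_retries(
    [sys.executable, "-c", "raise SystemExit(0)"], max_retries=2
)
failed, failed_attempts = run_with_retries(
    [sys.executable, "-c", "raise SystemExit(1)"], max_retries=2
)
print(ok, ok_attempts)          # prints: True 1
print(failed, failed_attempts)  # prints: False 3
```

Inside the FastAPI handler, a blocking call like this would hold up the event loop, so it is better suited to a background task or worker thread.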

SLI/SLO Monitoring

# sli_slo.py — SLI/SLO monitoring

import json

import random



class SLISLOMonitoring:

    CODE = """

# slo_tracker.py — Track SLI/SLO compliance

class SLOTracker:

    def __init__(self, target_uptime=99.9):

        self.target = target_uptime

        self.error_budget = 100 - target_uptime  # 0.1% for 99.9%

    

    def calculate_error_budget(self, actual_uptime, period_hours=720):

        budget_minutes = (self.error_budget / 100) * period_hours * 60

        used_minutes = ((100 - actual_uptime) / 100) * period_hours * 60

        remaining_minutes = budget_minutes - used_minutes

        remaining_pct = (remaining_minutes / budget_minutes) * 100

        

        return {

            "target": f"{self.target}%",

            "actual": f"{actual_uptime}%",

            "budget_total_min": round(budget_minutes, 1),

            "budget_used_min": round(used_minutes, 1),

            "budget_remaining_min": round(remaining_minutes, 1),

            "budget_remaining_pct": round(remaining_pct, 1),

            "on_track": remaining_pct > 0,

        }

"""



    def show_code(self):

        print("=== SLO Tracker ===")

        print(self.CODE[:500])



    def slo_dashboard(self):

        print(f"\n=== SLO Dashboard (30-day) ===")

        services = [

            {"name": "API", "target": 99.9, "actual": random.uniform(99.85, 100)},

            {"name": "Web", "target": 99.5, "actual": random.uniform(99.3, 100)},

            {"name": "DB", "target": 99.95, "actual": random.uniform(99.9, 100)},

            {"name": "CDN", "target": 99.99, "actual": random.uniform(99.95, 100)},

        ]

        print(f"  {'Service':<10} {'Target':>8} {'Actual':>8} {'Budget':>10} {'Status':>8}")

        for svc in services:

            budget_total = (100 - svc["target"]) / 100 * 720 * 60  # minutes

            budget_used = (100 - svc["actual"]) / 100 * 720 * 60

            remaining = max(0, budget_total - budget_used)

            status = "OK" if remaining > 0 else "BREACH"

            print(f"  {svc['name']:<10} {svc['target']:>7.2f}% {svc['actual']:>7.3f}% {remaining:>8.1f}m {status:>8}")



slo = SLISLOMonitoring()

slo.show_code()

slo.slo_dashboard()
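
As a worked example of the error budget arithmetic used above, the sketch below computes the allowed downtime for the four targets in the dashboard over a 30-day (720-hour) window. The function names are hypothetical, and the formula is the same one `SLOTracker` uses.

```python
def error_budget_minutes(target_pct, period_hours=720):
    """Allowed downtime in minutes for an uptime target over the period."""
    return (100 - target_pct) / 100 * period_hours * 60


def budget_remaining_pct(target_pct, actual_pct, period_hours=720):
    """Share of the error budget still unused, as a percentage."""
    total = error_budget_minutes(target_pct, period_hours)
    used = error_budget_minutes(actual_pct, period_hours)
    return (total - used) / total * 100


for target in (99.5, 99.9, 99.95, 99.99):
    print(f"{target}% target -> {error_budget_minutes(target):.1f} min allowed downtime")
# 99.5%  -> 216.0 min
# 99.9%  -> 43.2 min
# 99.95% -> 21.6 min
# 99.99% -> 4.3 min

# A service with a 99.9% target that actually achieved 99.95% has used half its budget.
print(f"{budget_remaining_pct(99.9, 99.95):.0f}% of the budget remains")  # prints: 50% ...
```

A target of exactly 100% leaves a zero budget, so `budget_remaining_pct` would divide by zero; callers should guard against that case.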

FAQ - Frequently Asked Questions

Q: Which is better, Better Uptime or UptimeRobot?

A: Better Uptime offers stronger incident management, on-call scheduling, more polished status pages, and a more complete API. UptimeRobot is cheaper, has a generous free tier (50 monitors), has been around longer, and has a larger community. Choose Better Uptime if you need incident management, on-call, and status pages in one product. Choose UptimeRobot if you only need basic, low-cost uptime monitoring.


Q: Is real-time processing necessary for monitoring?


A: Very much so, because every minute of downtime costs money.
E-commerce: 1 minute of downtime means about $5,600 lost (average).
SaaS: downtime affects the SLA, leading to refunds and reputation damage.
Real-time: detecting within 30 seconds and alerting within 1 minute greatly reduces MTTR.
Batch (every 5 minutes): an outage can go unnoticed for up to 5 minutes before it is detected.
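
The gap between 30-second and 5-minute checks can be put into numbers with a rough upper bound. The sketch assumes the outage starts right after a successful check and that each confirmation needs one more failing check at the same interval. It ignores request timeouts and alert delivery time, and `worst_case_detection_seconds` is a hypothetical helper.

```python
def worst_case_detection_seconds(check_interval_s, confirmations=1):
    """Upper bound on the time from outage start to confirmed detection."""
    return check_interval_s * confirmations


for label, interval in (("30-second checks", 30), ("5-minute checks", 300)):
    first = worst_case_detection_seconds(interval)
    confirmed = worst_case_detection_seconds(interval, confirmations=3)
    print(f"{label}: up to {first} s to the first failure, {confirmed} s with 3 confirmations")
# 30-second checks: up to 30 s to the first failure, 90 s with 3 confirmations
# 5-minute checks: up to 300 s to the first failure, 900 s with 3 confirmations
```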


Q: How do you prevent false positives?

A: Several techniques work together.
Multi-location confirmation: check from 3+ locations before triggering an incident.
Confirmation period: wait for 2-3 failed checks before confirming downtime.
Keyword check: inspect the response body as well, not just the status code.
Maintenance windows: exclude scheduled maintenance from monitoring.
Smart alerting: group related alerts to reduce noise.
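
Two of these ideas, the keyword check and the confirmation period, can be sketched in a few lines. This is an illustration only: `check_passes`, `confirmed_down`, and their parameters are hypothetical and do not reflect how Better Uptime implements these features.

```python
def check_passes(status_code, body, required_keyword):
    """A check passes only with a 2xx status AND the expected keyword in the body."""
    return 200 <= status_code < 300 and required_keyword in body


def confirmed_down(check_history, required_failures=3):
    """True when the most recent `required_failures` checks all failed.

    `check_history` is a list of booleans, oldest first (True = check passed).
    """
    if len(check_history) < required_failures:
        return False
    return not any(check_history[-required_failures:])


# An error page served with HTTP 200 still fails the keyword check.
print(check_passes(200, "<h1>Service Unavailable</h1>", "Welcome"))  # prints: False
print(check_passes(200, "<h1>Welcome back</h1>", "Welcome"))         # prints: True

# Two failures are not enough; the third consecutive failure confirms downtime.
print(confirmed_down([True, False, False]))         # prints: False
print(confirmed_down([True, False, False, False]))  # prints: True
```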


Q: How much does it cost?

A: Better Uptime pricing:
Free: 10 monitors, 3-minute checks, email alerts.
Starter: $20/month, 50 monitors, 30-second checks.
Team: $40/month, unlimited monitors, on-call, phone alerts.
Business: $80/month, multiple status pages, advanced features.
For comparison: UptimeRobot Free includes 50 monitors, Pingdom starts at $10+/month, and Datadog at $23+/host/month.


XM Legend · Forex trader & instructor for 13 years

Founder of SiamCafe since 1997 · Forex trader for more than 13 years, recognized as an XM Legend · Sharing knowledge on Forex, IT, AI, and trading from real experience in real markets