
SigNoz Observability Backup Recovery Strategy

2026-03-05 · อ. บอม · SiamCafe.net · 8,377 words

Backup Recovery Strategy

A Backup Recovery Strategy defines an RPO and RTO for each system and covers full backups, incrementals, and offsite storage, with recovery tested regularly.

SigNoz helps monitor backup jobs: track duration, success rate, and storage usage, alert when a backup fails, and build a dashboard showing the status of every job.

Backup Automation

#!/bin/bash
# === Backup Automation Script ===
# backup_automated.sh - Automated backup with monitoring

set -euo pipefail

# Configuration
BACKUP_DIR="/backup"
S3_BUCKET="s3://company-backups"
RETENTION_DAYS=30
DATE=$(date +%Y%m%d_%H%M%S)
LOG_FILE="/var/log/backup/backup_${DATE}.log"

# Databases to backup
DATABASES=("postgres_main" "postgres_analytics" "redis_cache" "clickhouse_events")

# OpenTelemetry: send metrics to SigNoz
OTEL_ENDPOINT="http://signoz:4318/v1/metrics"

send_metric() {
    local name=$1
    local value=$2
    local labels=$3  # reserved; not attached in this minimal payload
    # Send the metric via OTLP HTTP (JSON)
    curl -s -X POST "$OTEL_ENDPOINT" \
      -H "Content-Type: application/json" \
      -d "{\"resourceMetrics\":[{\"resource\":{\"attributes\":[{\"key\":\"service.name\",\"value\":{\"stringValue\":\"backup-service\"}}]},\"scopeMetrics\":[{\"metrics\":[{\"name\":\"$name\",\"gauge\":{\"dataPoints\":[{\"asDouble\":$value}]}}]}]}]}" \
      > /dev/null 2>&1 || true
}

backup_database() {
    local db=$1
    local start_time=$(date +%s)
    local backup_file="${BACKUP_DIR}/${db}_${DATE}.sql.gz"

    echo "[$(date)] Starting backup: $db" | tee -a "$LOG_FILE"

    case "$db" in
        postgres_*)
            pg_dump "$db" | gzip > "$backup_file" 2>> "$LOG_FILE"
            ;;
        redis_*)
            backup_file="${BACKUP_DIR}/${db}_${DATE}.rdb.gz"
            # Note: BGSAVE is asynchronous; in production, wait for it to
            # finish (e.g. poll LASTSAVE) before copying the dump file
            redis-cli BGSAVE
            gzip -c /var/lib/redis/dump.rdb > "$backup_file" 2>> "$LOG_FILE"
            ;;
        clickhouse_*)
            # Table name assumed to be the db label without its "clickhouse_" prefix
            clickhouse-client --query "SELECT * FROM ${db#clickhouse_}" \
              --format Native | gzip > "$backup_file" 2>> "$LOG_FILE"
            ;;
    esac

    local end_time=$(date +%s)
    local duration=$((end_time - start_time))
    local size=$(stat -f%z "$backup_file" 2>/dev/null || stat -c%s "$backup_file" 2>/dev/null || echo 0)

    # Verify backup
    if gzip -t "$backup_file" 2>/dev/null; then
        echo "[$(date)] Backup OK: $db (${duration}s, ${size} bytes)" | tee -a "$LOG_FILE"
        send_metric "backup.duration" "$duration" "db=$db"
        send_metric "backup.size_bytes" "$size" "db=$db"
        send_metric "backup.success" "1" "db=$db"

        # Upload to S3
        aws s3 cp "$backup_file" "${S3_BUCKET}/${db}/" --storage-class STANDARD_IA
        echo "[$(date)] Uploaded to S3: $db" | tee -a "$LOG_FILE"
    else
        echo "[$(date)] FAILED: $db" | tee -a "$LOG_FILE"
        send_metric "backup.success" "0" "db=$db"
    fi
}

# Main
echo "[$(date)] === Backup Started ===" | tee -a "$LOG_FILE"
total_start=$(date +%s)

for db in "${DATABASES[@]}"; do
    backup_database "$db"
done

# Cleanup old backups
find "$BACKUP_DIR" -name "*.sql.gz" -mtime +$RETENTION_DAYS -delete
find "$BACKUP_DIR" -name "*.rdb*" -mtime +$RETENTION_DAYS -delete

total_end=$(date +%s)
total_duration=$((total_end - total_start))

echo "[$(date)] === Backup Complete (${total_duration}s) ===" | tee -a "$LOG_FILE"
send_metric "backup.total_duration" "$total_duration" ""

# Crontab: 0 2 * * * /opt/scripts/backup_automated.sh

Recovery Monitoring with SigNoz

# recovery_monitor.py — Backup Recovery Monitoring
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from datetime import datetime, timedelta
from enum import Enum

class BackupStatus(Enum):
    SUCCESS = "success"
    FAILED = "failed"
    RUNNING = "running"
    VERIFYING = "verifying"

@dataclass
class BackupJob:
    database: str
    timestamp: datetime
    status: BackupStatus
    duration_seconds: int
    size_bytes: int
    location: str  # local, s3, gcs
    verified: bool = False

@dataclass
class RecoveryTest:
    database: str
    timestamp: datetime
    recovery_time_seconds: int
    data_loss_seconds: int  # Actual RPO achieved
    success: bool
    notes: str = ""

class BackupRecoveryMonitor:
    """Backup recovery monitor for SigNoz"""

    def __init__(self):
        self.backups: List[BackupJob] = []
        self.recovery_tests: List[RecoveryTest] = []
        self.sla = {
            "postgres_main": {"rpo_hours": 1, "rto_hours": 2},
            "postgres_analytics": {"rpo_hours": 6, "rto_hours": 4},
            "redis_cache": {"rpo_hours": 24, "rto_hours": 1},
            "clickhouse_events": {"rpo_hours": 4, "rto_hours": 6},
        }

    def add_backup(self, job: BackupJob):
        self.backups.append(job)

    def add_recovery_test(self, test: RecoveryTest):
        self.recovery_tests.append(test)

    def dashboard(self):
        """Backup Dashboard"""
        print(f"\n{'='*60}")
        print(f"Backup Recovery Dashboard — {datetime.now().strftime('%Y-%m-%d %H:%M')}")
        print(f"{'='*60}")

        # Summary
        total = len(self.backups)
        success = sum(1 for b in self.backups if b.status == BackupStatus.SUCCESS)
        rate = success / total * 100 if total > 0 else 0

        print(f"  Backups: {success}/{total} successful ({rate:.0f}%)")

        total_size = sum(b.size_bytes for b in self.backups if b.status == BackupStatus.SUCCESS)
        print(f"  Total Size: {total_size / (1024**3):.1f} GB")

        # Per Database
        print(f"\n  Per Database:")
        dbs = set(b.database for b in self.backups)
        for db in sorted(dbs):
            db_backups = [b for b in self.backups if b.database == db]
            latest = max(db_backups, key=lambda b: b.timestamp)
            avg_dur = sum(b.duration_seconds for b in db_backups) / len(db_backups)

            sla = self.sla.get(db, {})
            rpo = sla.get("rpo_hours", "N/A")
            rto = sla.get("rto_hours", "N/A")

            status = "OK" if latest.status == BackupStatus.SUCCESS else "FAIL"
            print(f"    [{status:>4}] {db:<25} "
                  f"Last: {latest.timestamp.strftime('%m-%d %H:%M')} "
                  f"Avg: {avg_dur:.0f}s "
                  f"RPO:{rpo}h RTO:{rto}h")

        # Recovery Tests
        if self.recovery_tests:
            print(f"\n  Recovery Tests:")
            for test in self.recovery_tests[-5:]:
                status = "PASS" if test.success else "FAIL"
                print(f"    [{status}] {test.database}: "
                      f"Recovery={test.recovery_time_seconds}s "
                      f"DataLoss={test.data_loss_seconds}s")

    def compliance_check(self):
        """SLA Compliance Check"""
        print(f"\n  SLA Compliance:")
        for db, targets in self.sla.items():
            db_backups = [b for b in self.backups
                         if b.database == db and b.status == BackupStatus.SUCCESS]

            if not db_backups:
                print(f"    [FAIL] {db}: No successful backups")
                continue

            latest = max(db_backups, key=lambda b: b.timestamp)
            hours_since = (datetime.now() - latest.timestamp).total_seconds() / 3600

            rpo_ok = hours_since <= targets["rpo_hours"]
            status = "PASS" if rpo_ok else "FAIL"
            print(f"    [{status}] {db}: "
                  f"Last backup {hours_since:.1f}h ago "
                  f"(RPO: {targets['rpo_hours']}h)")

# Example
monitor = BackupRecoveryMonitor()

now = datetime.now()
backups = [
    BackupJob("postgres_main", now - timedelta(hours=1), BackupStatus.SUCCESS,
              120, 5 * 1024**3, "s3", True),
    BackupJob("postgres_analytics", now - timedelta(hours=3), BackupStatus.SUCCESS,
              300, 20 * 1024**3, "s3", True),
    BackupJob("redis_cache", now - timedelta(hours=12), BackupStatus.SUCCESS,
              15, 500 * 1024**2, "s3", True),
    BackupJob("clickhouse_events", now - timedelta(hours=2), BackupStatus.SUCCESS,
              600, 50 * 1024**3, "s3", True),
    BackupJob("postgres_main", now - timedelta(hours=25), BackupStatus.SUCCESS,
              115, 4.8 * 1024**3, "s3", True),
]

for b in backups:
    monitor.add_backup(b)

monitor.add_recovery_test(RecoveryTest(
    "postgres_main", now - timedelta(days=7), 1800, 300, True, "DR Drill Q1"))
monitor.add_recovery_test(RecoveryTest(
    "clickhouse_events", now - timedelta(days=7), 3600, 900, True, "DR Drill Q1"))

monitor.dashboard()
monitor.compliance_check()

Disaster Recovery Plan

# === Disaster Recovery Plan ===

dr_plan = {
    "tier1_critical": {
        "systems": ["postgres_main", "api-gateway", "auth-service"],
        "rpo": "1 hour",
        "rto": "2 hours",
        "strategy": "Active-Active Cross-Region",
        "backup": "Continuous Replication + Hourly Snapshots",
        "recovery": [
            "1. Detect failure (SigNoz Alert)",
            "2. Verify scope of failure",
            "3. Failover DNS to DR region",
            "4. Verify services in DR region",
            "5. Notify stakeholders",
        ],
    },
    "tier2_important": {
        "systems": ["postgres_analytics", "clickhouse_events", "redis_cache"],
        "rpo": "4 hours",
        "rto": "4 hours",
        "strategy": "Warm Standby + S3 Backup",
        "backup": "4-hourly Full + Continuous WAL",
        "recovery": [
            "1. Detect failure (SigNoz Alert)",
            "2. Provision new instances from AMI",
            "3. Restore from latest backup",
            "4. Apply WAL logs to minimize data loss",
            "5. Update DNS and config",
            "6. Verify data integrity",
        ],
    },
    "tier3_non_critical": {
        "systems": ["dev-db", "staging-db", "log-archive"],
        "rpo": "24 hours",
        "rto": "8 hours",
        "strategy": "Daily Backup to S3",
        "backup": "Daily Full Backup",
        "recovery": [
            "1. Provision new instances",
            "2. Restore from S3 backup",
            "3. Verify services",
        ],
    },
}

# DR Drill Schedule
dr_drills = [
    {"quarter": "Q1", "type": "Tabletop Exercise", "scope": "All tiers"},
    {"quarter": "Q2", "type": "Tier 1 Failover Test", "scope": "Critical systems"},
    {"quarter": "Q3", "type": "Full DR Drill", "scope": "All tiers"},
    {"quarter": "Q4", "type": "Tier 2 Recovery Test", "scope": "Important systems"},
]

print("Disaster Recovery Plan:")
for tier, plan in dr_plan.items():
    print(f"\n  [{tier}]")
    print(f"    Systems: {', '.join(plan['systems'])}")
    print(f"    RPO: {plan['rpo']} | RTO: {plan['rto']}")
    print(f"    Strategy: {plan['strategy']}")
    print(f"    Steps: {len(plan['recovery'])}")

print(f"\n  DR Drill Schedule:")
for drill in dr_drills:
    print(f"    {drill['quarter']}: {drill['type']} ({drill['scope']})")

Best Practices

What is a Backup Recovery Strategy?

A backup and recovery plan. It defines RPO (the most data you can afford to lose) and RTO (how fast you must recover), and covers full and incremental backups, offsite storage, automated testing, and disaster recovery.
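As an illustration of full vs. incremental selection, here is a minimal sketch; the `files` records and `last_full_at` name are hypothetical, not any particular tool's API:

```python
from datetime import datetime

def select_for_backup(files, last_full_at, mode="incremental"):
    """Return the files to include: everything for a full backup,
    only files modified since the last full for an incremental one."""
    if mode == "full":
        return list(files)
    return [f for f in files if f["mtime"] > last_full_at]

# Hypothetical file records; `mtime` is the last-modified timestamp
last_full_at = datetime(2026, 3, 4, 2, 0)
files = [
    {"path": "/data/a.db", "mtime": datetime(2026, 3, 3, 23, 0)},
    {"path": "/data/b.db", "mtime": datetime(2026, 3, 4, 9, 30)},
]
print([f["path"] for f in select_for_backup(files, last_full_at)])
# ['/data/b.db']
```

Only the file modified after the last full backup lands in the incremental set, which is why incrementals stay small but need the full backup plus every later incremental to restore.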

How do RPO and RTO differ?

RPO (Recovery Point Objective) is the most data you can afford to lose, e.g. 1 hour's worth. RTO (Recovery Time Objective) is how quickly the system must be back, e.g. 4 hours. RPO is about data; RTO is about time.
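As a worked example with assumed timestamps: RPO is measured backwards from the incident to the last good backup, RTO forwards from the incident to restoration.

```python
from datetime import datetime

# Assumed timeline for illustration
incident = datetime(2026, 3, 5, 10, 0)          # system goes down
last_backup = datetime(2026, 3, 5, 9, 15)       # last good backup
service_restored = datetime(2026, 3, 5, 11, 30) # service back up

# RPO achieved = data written between last backup and the incident (lost)
achieved_rpo_h = (incident - last_backup).total_seconds() / 3600
# RTO achieved = downtime between the incident and restoration
achieved_rto_h = (service_restored - incident).total_seconds() / 3600

print(f"RPO achieved: {achieved_rpo_h:.2f}h (target 1h) -> "
      f"{'PASS' if achieved_rpo_h <= 1 else 'FAIL'}")
print(f"RTO achieved: {achieved_rto_h:.2f}h (target 4h) -> "
      f"{'PASS' if achieved_rto_h <= 4 else 'FAIL'}")
```

Here 0.75 h of data loss meets a 1-hour RPO and 1.5 h of downtime meets a 4-hour RTO; tightening either target means more frequent backups (RPO) or faster failover (RTO).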

How do you monitor backups with SigNoz?

Send OTel custom metrics (backup_duration, backup_size, backup_success_rate) and a trace per job; set alerts for failed backups; build a dashboard showing status, duration trends, and storage usage.
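The bash script above assembles the OTLP/HTTP JSON body by hand; the same minimal gauge payload can be built in Python. This is a simplified sketch: a production data point would also carry `timeUnixNano` and attributes for labels, which are omitted here.

```python
import json

def otlp_gauge_payload(name, value, service="backup-service"):
    """Build a minimal OTLP/HTTP JSON body for one gauge data point,
    mirroring the curl call in the backup script."""
    return {
        "resourceMetrics": [{
            "resource": {"attributes": [
                {"key": "service.name", "value": {"stringValue": service}}
            ]},
            "scopeMetrics": [{"metrics": [
                {"name": name,
                 "gauge": {"dataPoints": [{"asDouble": float(value)}]}}
            ]}],
        }]
    }

body = json.dumps(otlp_gauge_payload("backup.duration", 120))
# POST `body` to http://<signoz>:4318/v1/metrics
# with header Content-Type: application/json
print(body[:60])
```

In practice you would use the OpenTelemetry SDK with an OTLP exporter instead of hand-rolled JSON; the payload above only shows what travels over the wire.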

What should a Disaster Recovery Plan include?

RPO and RTO per system; 3-2-1 backups with offsite copies; cross-region replication; automated recovery; DR drills twice a year; a communication plan; and documentation that is kept up to date.
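The 3-2-1 rule (at least 3 copies, on 2 different media types, with 1 offsite) can be checked mechanically. A hypothetical sketch, where `copies` describes each stored copy's media type and offsite flag:

```python
def check_3_2_1(copies):
    """3-2-1 rule: >= 3 copies, on >= 2 media types, >= 1 offsite.
    `copies` is an illustrative list of {media, offsite} dicts."""
    media_types = {c["media"] for c in copies}
    has_offsite = any(c["offsite"] for c in copies)
    return len(copies) >= 3 and len(media_types) >= 2 and has_offsite

copies = [
    {"media": "local-disk", "offsite": False},  # primary copy
    {"media": "s3",         "offsite": True},   # offsite object storage
    {"media": "tape",       "offsite": False},  # second media type
]
print(check_3_2_1(copies))  # True
```

Dropping any leg (fewer than three copies, a single media type, or no offsite copy) makes the check fail, which is the property a compliance alert would key on.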

Summary

SigNoz combined with a Backup Recovery Strategy gives observability for backup jobs: track duration, success rate, and storage; set RPO and RTO by business impact; follow the 3-2-1 rule with immutable backups; test recovery monthly; run DR drills twice a year; and alert whenever a backup fails.
