Why Test a Proxmox VE Cluster?
A Proxmox VE cluster is critical infrastructure that runs production workloads. Systematic testing reduces the risk of downtime caused by misconfiguration, hardware failures, and software bugs.
A testing strategy for a Proxmox cluster covers: Functional Testing to verify that features work correctly (VM creation, migration, snapshots); Performance Testing to measure behavior under various loads; High Availability Testing to exercise failover scenarios; Disaster Recovery Testing to validate backup/restore procedures; Security Testing to verify access controls and network isolation; and Integration Testing to confirm interoperability with external systems.
QA (Quality Assurance) for infrastructure differs from software QA: the test environment must mirror production as closely as possible, tests may affect running services and must be scheduled carefully, and recovery testing must be performed for real, not merely simulated.
The key principles are: test early and often; automate repeatable tests; document test procedures and results; and review and update test plans whenever the infrastructure changes.
Planning a Testing Strategy for Proxmox
Test plan structure for a Proxmox cluster
# === Proxmox VE Cluster Test Plan ===
# Test Environment Architecture:
# ┌─────────────────────────────────────────────┐
# │           Test Proxmox Cluster              │
# │  ┌─────────┐   ┌─────────┐   ┌─────────┐    │
# │  │ Node 1  │   │ Node 2  │   │ Node 3  │    │
# │  │ (pve1)  │   │ (pve2)  │   │ (pve3)  │    │
# │  └────┬────┘   └────┬────┘   └────┬────┘    │
# │       │             │             │         │
# │  ┌────┴─────────────┴─────────────┴────┐    │
# │  │        Cluster Network (VLAN)       │    │
# │  └─────────────────────────────────────┘    │
# │                                             │
# │  Storage: Ceph / NFS / ZFS                  │
# │  HA: Corosync + Watchdog                    │
# └─────────────────────────────────────────────┘
# === Test Categories ===
# 1. Smoke Tests (every deploy)
# - Cluster quorum status
# - All nodes online
# - Storage accessible
# - Network connectivity
# - API responsiveness
# 2. Functional Tests (weekly)
# - VM lifecycle (create, start, stop, delete)
# - Container lifecycle (LXC)
# - Live migration
# - Snapshot create/restore
# - Backup/restore
# - Firewall rules
# - User permissions
# 3. Performance Tests (monthly)
# - VM creation time
# - Migration speed
# - Storage IOPS
# - Network throughput
# - API response times under load
# 4. HA/Failover Tests (quarterly)
# - Node failure simulation
# - Storage failure handling
# - Network partition (split-brain)
# - Fencing mechanism
# - Auto-restart of HA VMs
# 5. DR Tests (bi-annually)
# - Full cluster restore from backup
# - Cross-site failover
# - Data integrity verification
# - RTO/RPO validation
# === Test Execution Schedule ===
# Daily: Smoke tests (automated)
# Weekly: Functional tests (automated)
# Monthly: Performance benchmarks (automated)
# Quarterly: HA failover tests (manual + automated)
# Bi-annually: Full DR drill (manual)
# === Test Results Tracking ===
# - Store results in SQLite/PostgreSQL
# - Generate HTML reports
# - Alert on failures via email/Slack
# - Track trends over time
# - Compare against baselines
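The results-tracking ideas above (store in SQLite, track trends, compare against baselines) can be sketched in a few lines of Python. This is a minimal illustration, not a fixed schema: the table name and columns are assumptions.

```python
# track_results.py — minimal sketch of the results-tracking idea above.
# The test_results table layout is illustrative, not part of any Proxmox API.
import sqlite3

SCHEMA = """
CREATE TABLE IF NOT EXISTS test_results (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,
    category TEXT NOT NULL,
    passed INTEGER NOT NULL,
    duration_ms REAL NOT NULL,
    run_at TEXT NOT NULL
)
"""

def save_results(db_path, results):
    """Persist one run; each result is (name, category, passed, duration_ms, run_at)."""
    conn = sqlite3.connect(db_path)
    conn.execute(SCHEMA)
    conn.executemany(
        "INSERT INTO test_results (name, category, passed, duration_ms, run_at) "
        "VALUES (?, ?, ?, ?, ?)",
        results,
    )
    conn.commit()
    conn.close()

def pass_rate(db_path):
    """Overall pass rate in percent, for trend tracking and alerting."""
    conn = sqlite3.connect(db_path)
    total, passed = conn.execute(
        "SELECT COUNT(*), COALESCE(SUM(passed), 0) FROM test_results"
    ).fetchone()
    conn.close()
    return 100.0 * passed / max(total, 1)
```

HTML reports and alerting can then be simple SELECT queries over this table; a Grafana SQLite/PostgreSQL datasource can chart pass_rate over time.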
Automated Testing with Scripts
Scripts for automated Proxmox testing
#!/usr/bin/env python3
# proxmox_tester.py — Automated Proxmox VE Cluster Testing
import requests
import json
import time
import logging
from datetime import datetime
from typing import Dict, List, Optional
from dataclasses import dataclass, field
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("pve_tester")
@dataclass
class TestResult:
    name: str
    category: str
    passed: bool
    duration_ms: float
    message: str
    timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())

class ProxmoxTester:
    def __init__(self, host, user, password, verify_ssl=False):
        self.base_url = f"https://{host}:8006/api2/json"
        self.verify = verify_ssl
        self.ticket = None
        self.csrf = None
        self.results: List[TestResult] = []
        self._authenticate(user, password)

    def _authenticate(self, user, password):
        resp = requests.post(
            f"{self.base_url}/access/ticket",
            data={"username": user, "password": password},
            verify=self.verify,
        )
        resp.raise_for_status()
        data = resp.json()["data"]
        self.ticket = data["ticket"]
        self.csrf = data["CSRFPreventionToken"]

    def _get(self, path):
        return requests.get(
            f"{self.base_url}{path}",
            cookies={"PVEAuthCookie": self.ticket},
            verify=self.verify,
        ).json()

    def _post(self, path, data=None):
        return requests.post(
            f"{self.base_url}{path}",
            data=data,
            cookies={"PVEAuthCookie": self.ticket},
            headers={"CSRFPreventionToken": self.csrf},
            verify=self.verify,
        ).json()

    def _run_test(self, name, category, test_fn):
        start = time.time()
        try:
            result = test_fn()
            duration = (time.time() - start) * 1000
            passed = result.get("passed", True)
            message = result.get("message", "OK")
            tr = TestResult(name, category, passed, duration, message)
        except Exception as e:
            duration = (time.time() - start) * 1000
            tr = TestResult(name, category, False, duration, str(e))
        self.results.append(tr)
        status = "PASS" if tr.passed else "FAIL"
        logger.info(f"[{status}] {name} ({tr.duration_ms:.0f}ms) - {tr.message}")
        return tr
    # === Smoke Tests ===
    def test_cluster_status(self):
        def check():
            data = self._get("/cluster/status")["data"]
            nodes_online = sum(1 for n in data if n["type"] == "node" and n.get("online", 0) == 1)
            total_nodes = sum(1 for n in data if n["type"] == "node")
            quorum = next((n for n in data if n["type"] == "cluster"), {}).get("quorate", 0)
            return {
                "passed": nodes_online == total_nodes and quorum == 1,
                "message": f"Nodes: {nodes_online}/{total_nodes}, Quorum: {'yes' if quorum else 'no'}",
            }
        return self._run_test("Cluster Status", "smoke", check)

    def test_storage_status(self):
        def check():
            data = self._get("/storage")["data"]
            active = sum(1 for s in data if s.get("active", 0) == 1)
            total = len(data)
            return {
                "passed": active == total,
                "message": f"Storage: {active}/{total} active",
            }
        return self._run_test("Storage Status", "smoke", check)

    def test_node_resources(self):
        def check():
            data = self._get("/cluster/resources")["data"]
            nodes = [r for r in data if r["type"] == "node"]
            issues = []
            for node in nodes:
                cpu = node.get("cpu", 0) * 100
                mem_pct = node.get("mem", 0) / max(node.get("maxmem", 1), 1) * 100
                if cpu > 90:
                    issues.append(f"{node['node']}: CPU {cpu:.0f}%")
                if mem_pct > 95:
                    issues.append(f"{node['node']}: MEM {mem_pct:.0f}%")
            return {
                "passed": len(issues) == 0,
                "message": "Nodes OK" if not issues else f"Issues: {', '.join(issues)}",
            }
        return self._run_test("Node Resources", "smoke", check)
    # === Functional Tests ===
    def test_vm_lifecycle(self, node, vmid=9999):
        def check():
            # Create VM
            self._post(f"/nodes/{node}/qemu", {
                "vmid": vmid,
                "name": "test-vm",
                "memory": 512,
                "cores": 1,
                "net0": "virtio,bridge=vmbr0",
                "ostype": "l26",
            })
            time.sleep(2)
            # Start VM
            self._post(f"/nodes/{node}/qemu/{vmid}/status/start")
            time.sleep(5)
            # Check status
            status = self._get(f"/nodes/{node}/qemu/{vmid}/status/current")["data"]
            running = status.get("status") == "running"
            # Stop VM
            self._post(f"/nodes/{node}/qemu/{vmid}/status/stop")
            time.sleep(3)
            # Delete VM
            requests.delete(
                f"{self.base_url}/nodes/{node}/qemu/{vmid}",
                cookies={"PVEAuthCookie": self.ticket},
                headers={"CSRFPreventionToken": self.csrf},
                verify=self.verify,
            )
            return {
                "passed": running,
                "message": f"VM lifecycle {'OK' if running else 'FAILED'}",
            }
        return self._run_test("VM Lifecycle", "functional", check)
    def test_api_response_time(self):
        def check():
            endpoints = ["/cluster/status", "/cluster/resources", "/storage", "/version"]
            times = []
            for ep in endpoints:
                start = time.time()
                self._get(ep)
                times.append((time.time() - start) * 1000)
            avg_ms = sum(times) / len(times)
            max_ms = max(times)
            return {
                "passed": max_ms < 2000,
                "message": f"API avg: {avg_ms:.0f}ms, max: {max_ms:.0f}ms",
            }
        return self._run_test("API Response Time", "performance", check)

    # === Run All Tests ===
    def run_smoke_tests(self):
        logger.info("=== Running Smoke Tests ===")
        self.test_cluster_status()
        self.test_storage_status()
        self.test_node_resources()
        self.test_api_response_time()

    def generate_report(self, output="test_report.json"):
        total = len(self.results)
        passed = sum(1 for r in self.results if r.passed)
        report = {
            "generated_at": datetime.utcnow().isoformat(),
            "summary": {
                "total": total,
                "passed": passed,
                "failed": total - passed,
                "pass_rate": f"{passed/max(total,1)*100:.0f}%",
            },
            "results": [
                {"name": r.name, "category": r.category, "passed": r.passed,
                 "duration_ms": round(r.duration_ms), "message": r.message}
                for r in self.results
            ],
        }
        with open(output, "w") as f:
            json.dump(report, f, indent=2)
        logger.info(f"Report: {passed}/{total} passed ({report['summary']['pass_rate']})")
        return report

# tester = ProxmoxTester("pve1.local", "root@pam", "password")
# tester.run_smoke_tests()
# tester.generate_report()
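For scheduled CI runs, the password-ticket login above can be replaced with a Proxmox API token, which requires no CSRF token and is easier to rotate. A minimal sketch using only the standard library (urllib instead of requests, so it stays dependency-free); the user and token names are placeholders:

```python
# token_auth.py — sketch of Proxmox API token authentication.
# Tokens are sent in an Authorization header; no ticket/CSRF dance is needed.
import json
import urllib.request

def token_headers(user, token_id, secret):
    """Build the PVEAPIToken Authorization header used by the Proxmox REST API."""
    return {"Authorization": f"PVEAPIToken={user}!{token_id}={secret}"}

def get_version(host, user, token_id, secret):
    """Fetch /version as a connectivity check (placeholder credentials assumed)."""
    req = urllib.request.Request(
        f"https://{host}:8006/api2/json/version",
        headers=token_headers(user, token_id, secret),
    )
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read())["data"]

# get_version("pve1.local", "root@pam", "ci", "12345678-aaaa-bbbb-cccc-1234567890ab")
```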
Performance Testing and Stress Testing
Testing the performance of a Proxmox cluster
#!/usr/bin/env python3
# perf_test.py — Proxmox Performance Testing
import subprocess
import time
import json
import logging
from datetime import datetime
from pathlib import Path
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("perf_test")
class ProxmoxPerfTest:
    def __init__(self, nodes):
        self.nodes = nodes
        self.results = {}

    def _ssh(self, node, command):
        result = subprocess.run(
            ["ssh", "-o", "StrictHostKeyChecking=no", f"root@{node}", command],
            capture_output=True, text=True, timeout=300,
        )
        return result.stdout.strip()

    def test_storage_iops(self, node, test_path="/tmp/fio_test"):
        logger.info(f"Testing storage IOPS on {node}")
        # Random read IOPS
        output = self._ssh(node, f"""
            fio --name=randread --ioengine=libaio --iodepth=32 \
                --rw=randread --bs=4k --direct=1 --size=1G \
                --numjobs=4 --runtime=30 --group_reporting \
                --filename={test_path} --output-format=json 2>/dev/null
        """)
        try:
            data = json.loads(output)
            read_iops = data["jobs"][0]["read"]["iops"]
        except Exception:
            read_iops = 0
        # Random write IOPS
        output = self._ssh(node, f"""
            fio --name=randwrite --ioengine=libaio --iodepth=32 \
                --rw=randwrite --bs=4k --direct=1 --size=1G \
                --numjobs=4 --runtime=30 --group_reporting \
                --filename={test_path} --output-format=json 2>/dev/null
        """)
        try:
            data = json.loads(output)
            write_iops = data["jobs"][0]["write"]["iops"]
        except Exception:
            write_iops = 0
        # Cleanup
        self._ssh(node, f"rm -f {test_path}")
        result = {
            "node": node,
            "read_iops": round(read_iops),
            "write_iops": round(write_iops),
        }
        logger.info(f"  Read: {result['read_iops']} IOPS, Write: {result['write_iops']} IOPS")
        return result

    def test_network_throughput(self, node_a, node_b):
        logger.info(f"Testing network: {node_a} <-> {node_b}")
        # Start iperf3 server on node_b
        self._ssh(node_b, "iperf3 -s -D -p 5201 --one-off")
        time.sleep(2)
        # Run iperf3 client on node_a
        output = self._ssh(node_a, f"iperf3 -c {node_b} -p 5201 -t 10 -J")
        try:
            data = json.loads(output)
            bps = data["end"]["sum_sent"]["bits_per_second"]
            gbps = bps / 1e9
        except Exception:
            gbps = 0
        result = {"from": node_a, "to": node_b, "throughput_gbps": round(gbps, 2)}
        logger.info(f"  Throughput: {result['throughput_gbps']} Gbps")
        return result
    def test_vm_creation_time(self, node, count=5):
        logger.info(f"Testing VM creation time on {node} (x{count})")
        times = []
        base_vmid = 9900
        for i in range(count):
            vmid = base_vmid + i
            start = time.time()
            self._ssh(node, f"""
                qm create {vmid} --name test-perf-{i} --memory 512 --cores 1 \
                    --net0 virtio,bridge=vmbr0 --ostype l26
            """)
            elapsed = time.time() - start
            times.append(elapsed)
            # Cleanup
            self._ssh(node, f"qm destroy {vmid} --purge")
        result = {
            "node": node,
            "avg_creation_ms": round(sum(times) / len(times) * 1000),
            "min_creation_ms": round(min(times) * 1000),
            "max_creation_ms": round(max(times) * 1000),
        }
        logger.info(f"  Avg: {result['avg_creation_ms']}ms")
        return result
    def run_all(self):
        logger.info("=== Performance Test Suite ===")
        self.results["storage"] = []
        for node in self.nodes:
            self.results["storage"].append(self.test_storage_iops(node))
        self.results["network"] = []
        for i in range(len(self.nodes)):
            for j in range(i + 1, len(self.nodes)):
                self.results["network"].append(
                    self.test_network_throughput(self.nodes[i], self.nodes[j])
                )
        self.results["vm_creation"] = []
        for node in self.nodes:
            self.results["vm_creation"].append(self.test_vm_creation_time(node))
        self.results["timestamp"] = datetime.utcnow().isoformat()
        Path("perf_results.json").write_text(json.dumps(self.results, indent=2))
        logger.info("Results saved to perf_results.json")
        return self.results

# perf = ProxmoxPerfTest(["pve1", "pve2", "pve3"])
# perf.run_all()
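A useful companion to this suite is a baseline comparison: record a known-good perf_results.json and flag regressions beyond a tolerance (the FAQ in this article suggests investigating deviations over 20%). A sketch, where the metric names mirror the results above and the tolerance is an assumption to tune:

```python
# baseline_check.py — sketch: flag metrics that regressed past a tolerance.
def compare_to_baseline(current, baseline, tolerance=0.20):
    """Return warnings for metrics below baseline * (1 - tolerance).

    current/baseline are flat dicts of metric name -> value, e.g.
    {"read_iops": 9500, "write_iops": 4800, "throughput_gbps": 9.4}.
    """
    warnings = []
    for metric, base_value in baseline.items():
        cur_value = current.get(metric, 0)
        if base_value > 0 and cur_value < base_value * (1 - tolerance):
            warnings.append(
                f"{metric}: {cur_value} vs baseline {base_value} "
                f"(-{(1 - cur_value / base_value) * 100:.0f}%)"
            )
    return warnings
```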
Disaster Recovery Testing
Testing disaster recovery procedures
#!/bin/bash
# dr_test.sh — Proxmox Disaster Recovery Test
set -euo pipefail
LOG_FILE="/var/log/proxmox_dr_test_$(date +%Y%m%d).log"
BACKUP_STORAGE="backup-nfs"
TEST_VMID=8888
TEST_NODE="pve1"
log() {
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*" | tee -a "$LOG_FILE"
}

test_result() {
    local name="$1" status="$2" details="$3"
    if [ "$status" = "PASS" ]; then
        log "PASS: $name - $details"
    else
        log "FAIL: $name - $details"
    fi
}
# === Test 1: Backup Creation ===
test_backup_create() {
    log "=== Test: Backup Creation ==="
    # Create test VM
    qm create $TEST_VMID --name dr-test-vm --memory 1024 --cores 1 \
        --net0 virtio,bridge=vmbr0 --ostype l26 \
        --scsi0 local-lvm:8 2>/dev/null
    qm start $TEST_VMID
    sleep 10
    # Create backup
    START=$(date +%s)
    vzdump $TEST_VMID --storage $BACKUP_STORAGE --compress zstd --mode snapshot
    END=$(date +%s)
    DURATION=$((END - START))
    # Verify backup exists
    BACKUP_FILE=$(pvesm list $BACKUP_STORAGE --vmid $TEST_VMID | tail -1 | awk '{print $1}')
    if [ -n "$BACKUP_FILE" ]; then
        test_result "Backup Creation" "PASS" "Duration: ${DURATION}s, File: $BACKUP_FILE"
    else
        test_result "Backup Creation" "FAIL" "Backup file not found"
    fi
    qm stop $TEST_VMID 2>/dev/null || true
    qm destroy $TEST_VMID --purge 2>/dev/null || true
}
# === Test 2: Backup Restore ===
test_backup_restore() {
    log "=== Test: Backup Restore ==="
    BACKUP_FILE=$(pvesm list $BACKUP_STORAGE --vmid $TEST_VMID | tail -1 | awk '{print $1}')
    if [ -z "$BACKUP_FILE" ]; then
        test_result "Backup Restore" "FAIL" "No backup file found"
        return
    fi
    RESTORE_VMID=8889
    START=$(date +%s)
    qmrestore "$BACKUP_FILE" $RESTORE_VMID --storage local-lvm
    END=$(date +%s)
    DURATION=$((END - START))
    # Verify restored VM
    STATUS=$(qm status $RESTORE_VMID 2>/dev/null | awk '{print $2}')
    if [ "$STATUS" = "stopped" ]; then
        test_result "Backup Restore" "PASS" "Duration: ${DURATION}s, VM status: $STATUS"
    else
        test_result "Backup Restore" "FAIL" "VM status: $STATUS"
    fi
    # Start restored VM
    qm start $RESTORE_VMID
    sleep 10
    RUNNING=$(qm status $RESTORE_VMID | awk '{print $2}')
    if [ "$RUNNING" = "running" ]; then
        test_result "Restored VM Boot" "PASS" "VM started successfully"
    else
        test_result "Restored VM Boot" "FAIL" "VM failed to start"
    fi
    qm stop $RESTORE_VMID 2>/dev/null || true
    qm destroy $RESTORE_VMID --purge 2>/dev/null || true
}
# === Test 3: HA Failover ===
test_ha_failover() {
    log "=== Test: HA Failover ==="
    HA_VMID=8890
    # Create HA VM
    qm create $HA_VMID --name ha-test-vm --memory 512 --cores 1 \
        --net0 virtio,bridge=vmbr0 --ostype l26
    # Add to HA
    ha-manager add vm:$HA_VMID --group ha-group --max_restart 3
    qm start $HA_VMID
    sleep 10
    # Check HA status
    HA_STATUS=$(ha-manager status | grep "vm:$HA_VMID" | awk '{print $3}')
    if [ "$HA_STATUS" = "started" ]; then
        test_result "HA Configuration" "PASS" "VM in HA, status: $HA_STATUS"
    else
        test_result "HA Configuration" "FAIL" "HA status: $HA_STATUS"
    fi
    # Cleanup
    ha-manager remove vm:$HA_VMID 2>/dev/null || true
    qm stop $HA_VMID 2>/dev/null || true
    qm destroy $HA_VMID --purge 2>/dev/null || true
}
# === Test 4: Cluster Quorum ===
test_cluster_quorum() {
    log "=== Test: Cluster Quorum ==="
    QUORUM=$(pvecm status | grep "Quorate:" | awk '{print $2}')
    NODES=$(pvecm status | grep "Node " | wc -l)
    EXPECTED=$(pvecm status | grep "Expected votes:" | awk '{print $3}')
    if [ "$QUORUM" = "Yes" ]; then
        test_result "Cluster Quorum" "PASS" "Quorate: $QUORUM, Nodes: $NODES, Expected: $EXPECTED"
    else
        test_result "Cluster Quorum" "FAIL" "Quorate: $QUORUM"
    fi
}
# === Run All DR Tests ===
log "========================================="
log "Proxmox DR Test Suite - $(date)"
log "========================================="
test_cluster_quorum
test_backup_create
test_backup_restore
test_ha_failover
log "========================================="
log "DR Test Suite Complete"
log "Results saved to: $LOG_FILE"
log "========================================="
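The log that dr_test.sh writes is easy to post-process, since test_result() emits uniform "[timestamp] PASS: name - details" lines. A sketch that tallies pass/fail counts from such a log, e.g. to feed the results-tracking database or a notification:

```python
# dr_summary.py — sketch: summarize a dr_test.sh log into pass/fail counts.
def summarize_dr_log(lines):
    """Count PASS/FAIL entries written by the test_result() helper."""
    passed = sum(1 for line in lines if " PASS: " in line)
    failed = sum(1 for line in lines if " FAIL: " in line)
    return {"passed": passed, "failed": failed, "ok": failed == 0}

# with open("/var/log/proxmox_dr_test_20240101.log") as f:
#     print(summarize_dr_log(f.readlines()))
```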
CI/CD for Infrastructure Testing
Integrating infrastructure tests into CI/CD
# .github/workflows/infra-test.yml
name: Infrastructure Testing

on:
  schedule:
    - cron: '0 6 * * *'  # Daily 06:00 UTC
  workflow_dispatch:
    inputs:
      test_suite:
        description: 'Test suite to run'
        required: true
        default: 'smoke'
        type: choice
        options: [smoke, functional, performance, dr]

jobs:
  smoke-tests:
    runs-on: self-hosted
    if: github.event.inputs.test_suite == 'smoke' || github.event_name == 'schedule'
    steps:
      - uses: actions/checkout@v4
      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: pip install requests jinja2
      - name: Run Smoke Tests
        env:
          PVE_HOST: ${{ secrets.PVE_HOST }}
          PVE_USER: ${{ secrets.PVE_USER }}
          PVE_PASSWORD: ${{ secrets.PVE_PASSWORD }}
        run: |
          python3 -c "
          import os
          from proxmox_tester import ProxmoxTester
          tester = ProxmoxTester(
              os.environ['PVE_HOST'],
              os.environ['PVE_USER'],
              os.environ['PVE_PASSWORD']
          )
          tester.run_smoke_tests()
          report = tester.generate_report('smoke_results.json')
          failed = report['summary']['failed']
          if failed > 0:
              print(f'FAILED: {failed} tests failed')
              exit(1)
          print('All smoke tests passed')
          "
      - name: Upload Results
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: smoke-test-results
          path: smoke_results.json
      - name: Notify on Failure
        if: failure()
        run: |
          curl -X POST "${{ secrets.SLACK_WEBHOOK_URL }}" \
            -H 'Content-type: application/json' \
            -d '{"text":"Proxmox smoke tests FAILED. Check GitHub Actions."}'
  performance-tests:
    runs-on: self-hosted
    if: github.event.inputs.test_suite == 'performance'
    steps:
      - uses: actions/checkout@v4
      - name: Run Performance Tests
        run: |
          python3 perf_test.py
          # Check against baselines
          python3 -c "
          import json
          results = json.load(open('perf_results.json'))
          # Storage IOPS baseline
          for s in results.get('storage', []):
              if s['read_iops'] < 10000:
                  print(f'WARNING: {s[\"node\"]} read IOPS below baseline')
              if s['write_iops'] < 5000:
                  print(f'WARNING: {s[\"node\"]} write IOPS below baseline')
          # Network baseline
          for n in results.get('network', []):
              if n['throughput_gbps'] < 1.0:
                  print(f'WARNING: {n[\"from\"]}->{n[\"to\"]} below 1Gbps')
          print('Performance tests complete')
          "
      - name: Upload Results
        uses: actions/upload-artifact@v4
        with:
          name: perf-test-results
          path: perf_results.json
# === Test Infrastructure as Code ===
# Ansible playbook for test environment setup
# test_setup.yml:
# - hosts: proxmox_test
#   tasks:
#     - name: Install test dependencies
#       apt:
#         name: [fio, iperf3, python3-pip, jq]
#         state: present
#
#     - name: Copy test scripts
#       copy:
#         src: tests/
#         dest: /opt/pve-tests/
#         mode: '0755'
#
#     - name: Setup cron for daily smoke tests
#       cron:
#         name: "PVE Smoke Tests"
#         hour: "6"
#         minute: "0"
#         job: "/opt/pve-tests/run_smoke.sh >> /var/log/pve_smoke.log 2>&1"
FAQ: Frequently Asked Questions
Q: How often should a Proxmox cluster be tested?
A: Smoke tests (cluster status, storage, network) should run daily (automated); functional tests (VM lifecycle, migration) weekly; performance tests monthly to track trends; HA failover tests quarterly; and a full DR drill every six months. In addition, run tests after every Proxmox upgrade and after hardware changes.
Q: How can HA failover be tested without affecting production?
A: The safest approach is a dedicated test cluster that mirrors production. If one is not available, use non-production test VMs for failover testing and schedule tests during a maintenance window. Use ha-manager set vm:VMID --state disabled to simulate a VM failure instead of shutting down a real node; for node failure testing, exercise the fencing mechanism against a non-production node.
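A drill like the one just described can be scripted so it runs the same way every quarter. A sketch intended to run on a cluster node via subprocess; vm:8890 is a placeholder test VM, and the 30-second wait is an assumed observation window for the HA manager:

```python
# ha_drill.py — sketch of a non-disruptive HA drill: disable an HA test VM,
# observe the HA manager's reaction, then re-enable it. Run on a cluster node.
import subprocess
import time

def drill_commands(sid):
    """The command pair for a drill: disable (simulated failure), then re-enable."""
    return [
        ["ha-manager", "set", sid, "--state", "disabled"],
        ["ha-manager", "set", sid, "--state", "started"],
    ]

def ha_drill(sid="vm:8890", wait_s=30):
    disable, enable = drill_commands(sid)
    subprocess.run(disable, check=True)  # simulate VM failure without touching a node
    time.sleep(wait_s)                   # observe HA manager behavior in the logs
    subprocess.run(enable, check=True)   # bring the resource back
```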
Q: What should a performance baseline include?
A: Storage IOPS (4K random read/write), sequential throughput (read/write), network throughput between nodes (iperf3), VM creation time, live migration time for VMs of various sizes, API response time, backup/restore duration, and snapshot creation time. Record baselines after a fresh install and after every major upgrade, compare results against the baseline, and investigate any deviation greater than 20%.
Q: How can test results be monitored continuously?
A: Store test results in a time series database (InfluxDB, Prometheus), build a Grafana dashboard showing pass rate, performance trends, and test duration, and set alerts when the pass rate drops below a threshold (e.g. 95%). Use GitHub Actions or Jenkins for scheduled tests, and send notifications via Slack/email when tests fail.
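Pushing results into a time series database can be as simple as converting each test result to InfluxDB line protocol before writing it to the HTTP API. A sketch; the measurement name (pve_test) and tag/field names are illustrative choices, not a fixed schema:

```python
# push_metrics.py — sketch: convert a test result to InfluxDB line protocol
# ("measurement,tags fields timestamp"), ready to POST to the /write endpoint.
def to_line_protocol(name, category, passed, duration_ms, ts_ns):
    """Tag values may not contain spaces in line protocol, so they are escaped here."""
    tags = f"pve_test,name={name.replace(' ', '_')},category={category}"
    fields = f"passed={1 if passed else 0}i,duration_ms={duration_ms}"
    return f"{tags} {fields} {ts_ns}"
```

One line per result can then be batched into a single POST body, which keeps write overhead low even for large test suites.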
