Proxmox VE Cluster คืออะไรและทำไมต้อง Audit
Proxmox Virtual Environment (VE) เป็น open source virtualization platform ที่รวม KVM hypervisor และ LXC containers ไว้ใน platform เดียว รองรับ high availability clustering, live migration, software-defined storage (Ceph, ZFS) และ backup/restore มี web-based management interface ที่ใช้งานง่าย
Proxmox VE Cluster คือการรวม Proxmox nodes หลายตัวเข้าด้วยกันเป็น cluster ทำให้จัดการ VMs และ containers จากจุดเดียว รองรับ HA (High Availability) ที่ migrate VMs อัตโนมัติเมื่อ node ล่ม และ distributed storage ด้วย Ceph
Audit Trail Logging สำคัญมากสำหรับ Proxmox cluster เพราะต้องรู้ว่าใครทำอะไร เมื่อไหร่ กับ VM/container ไหน สำหรับ security incident investigation, compliance requirements (SOC 2, ISO 27001, PCI DSS), change management tracking และ capacity planning
Proxmox เก็บ logs หลายที่ได้แก่ /var/log/pveproxy/ สำหรับ API access logs, /var/log/pve/tasks/ สำหรับ task logs, /var/log/syslog สำหรับ system events, /var/log/auth.log สำหรับ authentication events และ pve-firewall.log สำหรับ firewall events
ตั้งค่า Proxmox VE Cluster
สร้าง Proxmox cluster และตั้งค่าพื้นฐาน
# === Create a Proxmox VE Cluster ===
# On node 1 (creates the cluster)
pvecm create my-cluster
# On nodes 2,3 (join the cluster)
pvecm add 192.168.1.10
# Enter the root password of node 1 when prompted
# Verify cluster status
pvecm status
pvecm nodes
# === HA Configuration ===
# Enable HA for a VM
ha-manager add vm:100 --state started --group ha-group1
ha-manager status
# === Storage Configuration ===
# Add Ceph storage
pveceph init --network 10.10.10.0/24
# NOTE: the old "createmon/createosd/createpool" spellings are deprecated;
# modern Proxmox VE uses the "<noun> create" subcommand form.
pveceph mon create
pveceph osd create /dev/sdb
pveceph pool create vm-pool
# === Network Configuration ===
# /etc/network/interfaces
# auto vmbr0
# iface vmbr0 inet static
#     address 192.168.1.10/24
#     gateway 192.168.1.1
#     bridge-ports eno1
#     bridge-stp off
#     bridge-fd 0
#
# auto vmbr1
# iface vmbr1 inet static
#     address 10.10.10.10/24
#     bridge-ports eno2
#     bridge-stp off
#     bridge-fd 0
# === User and Permission Setup ===
# Create a read-only audit user (the old "useradd/roleadd/aclmod"
# spellings still work as aliases, but the subcommand form is current)
pveum user add auditor@pve --comment "Audit User"
pveum passwd auditor@pve
# Create an audit role (all *.Audit privileges are read-only)
pveum role add AuditRole --privs "Sys.Audit,VM.Audit,Datastore.Audit,Pool.Audit"
# Assign the role at the root of the resource tree
pveum acl modify / --users auditor@pve --roles AuditRole
# Create an API token for monitoring tools
# (--privsep 0 gives the token the full permissions of root@pam)
pveum user token add root@pam monitoring --privsep 0
# Save the returned token value immediately — it is shown only once
# === Enable Detailed Logging ===
# /etc/default/pveproxy
# ALLOW_FROM="192.168.1.0/24"
# DENY_FROM="all"
# LOG_LEVEL=info
# NOTE(review): LOG_LEVEL is not a documented pveproxy option — verify
# against the pveproxy(8) man page before relying on it.
# Restart pveproxy to apply
systemctl restart pveproxy
# Tail the logs
journalctl -u pveproxy -f
tail -f /var/log/pveproxy/access.log
ระบบ Audit Trail Logging สำหรับ Proxmox
สร้างระบบเก็บ audit logs แบบครบถ้วน
#!/usr/bin/env python3
# proxmox_audit.py — Proxmox VE Audit Trail System
import requests
import json
import sqlite3
import logging
from datetime import datetime, timedelta
from typing import List, Dict
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("pve_audit")
class ProxmoxAuditCollector:
    """Collect audit events and resource usage snapshots from a Proxmox
    VE cluster via its REST API and persist them in a local SQLite DB.

    Authentication uses a Proxmox API token (PVEAPIToken header), so no
    ticket/CSRF handling is needed. TLS verification is disabled because
    PVE nodes commonly run with self-signed certificates.
    """

    def __init__(self, host, user, token_name, token_value):
        """host: PVE node address; user: e.g. "root@pam";
        token_name/token_value: an API token created with `pveum`."""
        self.base_url = f"https://{host}:8006/api2/json"
        self.headers = {
            "Authorization": f"PVEAPIToken={user}!{token_name}={token_value}"
        }
        self.db = sqlite3.connect("proxmox_audit.db")
        self._init_db()

    def _init_db(self):
        """Create tables and indexes if they do not exist (idempotent)."""
        self.db.executescript("""
            CREATE TABLE IF NOT EXISTS audit_events (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT NOT NULL,
                node TEXT,
                user TEXT,
                action TEXT,
                target_type TEXT,
                target_id TEXT,
                status TEXT,
                details TEXT,
                source_ip TEXT,
                collected_at TEXT
            );
            CREATE INDEX IF NOT EXISTS idx_audit_timestamp ON audit_events(timestamp);
            CREATE INDEX IF NOT EXISTS idx_audit_user ON audit_events(user);
            CREATE INDEX IF NOT EXISTS idx_audit_action ON audit_events(action);
            -- Without a unique constraint, "INSERT OR IGNORE" in
            -- collect_tasks never ignores anything and every collection
            -- run duplicates all events. This index makes dedup real.
            CREATE UNIQUE INDEX IF NOT EXISTS idx_audit_dedup
                ON audit_events(node, timestamp, action, target_id);
            CREATE TABLE IF NOT EXISTS resource_snapshots (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT,
                node TEXT,
                vmid INTEGER,
                vm_name TEXT,
                vm_type TEXT,
                status TEXT,
                cpu_usage REAL,
                mem_usage REAL,
                disk_usage REAL
            );
        """)

    def _api(self, endpoint):
        """GET an API endpoint; return the "data" payload (or [])."""
        resp = requests.get(
            f"{self.base_url}{endpoint}",
            headers=self.headers,
            verify=False,  # self-signed PVE certs
            timeout=30,
        )
        resp.raise_for_status()
        return resp.json().get("data", [])

    def collect_tasks(self, node, since_hours=24):
        """Fetch recent tasks for *node* and store any not seen before.

        Returns the number of events actually inserted; tasks already
        collected by a previous run are skipped via the dedup index.
        """
        tasks = self._api(f"/nodes/{node}/tasks?limit=500")
        cutoff = datetime.utcnow() - timedelta(hours=since_hours)
        new_events = 0
        for task in tasks:
            # starttime is a Unix epoch. Convert in UTC — the original
            # local-time fromtimestamp() skewed the comparison against
            # the UTC cutoff by the host's UTC offset.
            task_time = datetime.utcfromtimestamp(task.get("starttime", 0))
            if task_time < cutoff:
                continue
            # In the task index, "upid" is the unique task identifier
            # (it starts with "UPID:"), while "id" is the worker target
            # — for guest tasks this is the numeric VMID. The original
            # tested id.startswith("UPID"), which never matched.
            target = str(task.get("id", ""))
            cur = self.db.execute("""
                INSERT OR IGNORE INTO audit_events
                (timestamp, node, user, action, target_type, target_id, status, details, collected_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                task_time.isoformat(),
                node,
                task.get("user", "unknown"),
                task.get("type", "unknown"),
                "vm" if target.isdigit() else "system",
                task.get("upid", target),  # UPID is globally unique
                task.get("status", "unknown"),
                json.dumps(task),
                datetime.utcnow().isoformat(),
            ))
            # rowcount is 0 when the dedup index caused an ignore
            new_events += cur.rowcount
        self.db.commit()
        logger.info(f"Collected {new_events} task events from {node}")
        return new_events

    def collect_resource_snapshot(self, node):
        """Record current CPU/memory/disk usage of every guest on *node*."""
        vms = self._api(f"/nodes/{node}/qemu")
        containers = self._api(f"/nodes/{node}/lxc")
        now = datetime.utcnow().isoformat()
        for vm in vms + containers:
            # LXC entries carry type == "lxc"; QEMU entries do not
            vm_type = "qemu" if vm.get("type") != "lxc" else "lxc"
            self.db.execute("""
                INSERT INTO resource_snapshots
                (timestamp, node, vmid, vm_name, vm_type, status, cpu_usage, mem_usage, disk_usage)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                now, node, vm.get("vmid"), vm.get("name", ""),
                vm_type, vm.get("status", ""),
                vm.get("cpu", 0),
                # ratios guarded against division by zero on stopped guests
                vm.get("mem", 0) / max(vm.get("maxmem", 1), 1),
                vm.get("disk", 0) / max(vm.get("maxdisk", 1), 1),
            ))
        self.db.commit()
        logger.info(f"Snapshot: {len(vms)} VMs, {len(containers)} CTs on {node}")

    def collect_all_nodes(self):
        """Collect tasks and snapshots from every node in the cluster;
        a failure on one node does not abort the others."""
        nodes = self._api("/nodes")
        for node in nodes:
            node_name = node["node"]
            try:
                self.collect_tasks(node_name)
                self.collect_resource_snapshot(node_name)
            except Exception as e:
                logger.error(f"Failed to collect from {node_name}: {e}")

    def query_audit(self, user=None, action=None, hours=24):
        """Return up to 100 recent events (newest first) as dicts,
        optionally filtered by exact user and/or action substring."""
        query = "SELECT * FROM audit_events WHERE timestamp > ?"
        params = [(datetime.utcnow() - timedelta(hours=hours)).isoformat()]
        if user:
            query += " AND user = ?"
            params.append(user)
        if action:
            query += " AND action LIKE ?"
            params.append(f"%{action}%")
        query += " ORDER BY timestamp DESC LIMIT 100"
        cursor = self.db.execute(query, params)
        columns = [d[0] for d in cursor.description]
        return [dict(zip(columns, row)) for row in cursor.fetchall()]

    def generate_report(self, hours=24):
        """Summarise audit activity over the last *hours* hours:
        totals plus per-user and per-action breakdowns (descending)."""
        events = self.query_audit(hours=hours)
        user_actions = {}
        action_types = {}
        for e in events:
            user = e.get("user", "unknown")
            action = e.get("action", "unknown")
            user_actions[user] = user_actions.get(user, 0) + 1
            action_types[action] = action_types.get(action, 0) + 1
        return {
            "period_hours": hours,
            "total_events": len(events),
            "unique_users": len(user_actions),
            "user_activity": dict(sorted(user_actions.items(), key=lambda x: -x[1])),
            "action_breakdown": dict(sorted(action_types.items(), key=lambda x: -x[1])),
            "generated_at": datetime.utcnow().isoformat(),
        }
# ใช้งาน
# collector = ProxmoxAuditCollector(
# host="192.168.1.10",
# user="root@pam",
# token_name="monitoring",
# token_value="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
# )
# collector.collect_all_nodes()
# report = collector.generate_report(hours=24)
# print(json.dumps(report, indent=2))
Centralized Logging ด้วย ELK Stack
ส่ง Proxmox logs ไป ELK Stack
# === Filebeat Configuration for Proxmox ===
# /etc/filebeat/filebeat.yml
# NOTE(review): the "log" input type is deprecated in Filebeat 8.x in
# favour of "filestream" — kept as written; verify before production use.
# filebeat.inputs:
# - type: log
#   enabled: true
#   paths:
#     - /var/log/pveproxy/access.log
#   fields:
#     log_type: pve_access
#   fields_under_root: true
#
# - type: log
#   enabled: true
#   paths:
#     - /var/log/syslog
#   fields:
#     log_type: pve_syslog
#   fields_under_root: true
#   multiline:
#     pattern: '^\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2}'
#     negate: true
#     match: after
#
# - type: log
#   enabled: true
#   paths:
#     - /var/log/auth.log
#   fields:
#     log_type: pve_auth
#   fields_under_root: true
#
# - type: log
#   enabled: true
#   paths:
#     - /var/log/pve-firewall.log
#   fields:
#     log_type: pve_firewall
#   fields_under_root: true
#
# output.elasticsearch:
#   hosts: ["http://elk-server:9200"]
#   index: "proxmox-%{+yyyy.MM.dd}"
#
# setup.kibana:
#   host: "http://elk-server:5601"
# Install Filebeat on each Proxmox node
apt-get install -y filebeat
systemctl enable filebeat
systemctl start filebeat
# === rsyslog for centralized logging ===
# /etc/rsyslog.d/50-proxmox-audit.conf
# Template for JSON output
# template(name="proxmox-json" type="list") {
#   constant(value="{")
#   constant(value="\"timestamp\":\"")
#   property(name="timereported" dateFormat="rfc3339")
#   constant(value="\",\"hostname\":\"")
#   property(name="hostname")
#   constant(value="\",\"severity\":\"")
#   property(name="syslogseverity-text")
#   constant(value="\",\"facility\":\"")
#   property(name="syslogfacility-text")
#   constant(value="\",\"program\":\"")
#   property(name="programname")
#   constant(value="\",\"message\":\"")
#   property(name="msg" format="jsonf")
#   constant(value="\"}\n")
# }
# NOTE(review): format="jsonf" emits a full "name":"value" pair; the
# value-only JSON-escaped form is format="json" — verify which is intended.
#
# # Forward everything to the remote syslog server (@@ = TCP)
# *.* @@elk-server:514;proxmox-json
#
# # Keep a local copy
# *.* /var/log/proxmox-audit.json;proxmox-json
# Restart rsyslog to apply
systemctl restart rsyslog
# === Docker Compose for the ELK Stack ===
# docker-compose.yml
# services:
#   elasticsearch:
#     image: elasticsearch:8.12.0
#     environment:
#       - discovery.type=single-node
#       - xpack.security.enabled=false   # lab setup; enable security in production
#       - "ES_JAVA_OPTS=-Xms2g -Xmx2g"
#     ports: ["9200:9200"]
#     volumes: ["es-data:/usr/share/elasticsearch/data"]
#
#   kibana:
#     image: kibana:8.12.0
#     ports: ["5601:5601"]
#     environment:
#       - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
#     depends_on: [elasticsearch]
#
#   logstash:
#     image: logstash:8.12.0
#     # port 514 (< 1024) needs elevated privileges inside the container
#     ports: ["5044:5044", "514:514/tcp", "514:514/udp"]
#     volumes:
#       - ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf
#     depends_on: [elasticsearch]
#
# volumes:
#   es-data:
# === Logstash Pipeline ===
# logstash.conf — beats input feeds the filters below; the index name
# matches the one Filebeat writes directly ("proxmox-<date>").
# input {
#   beats { port => 5044 }
#   syslog { port => 514 }
# }
# filter {
#   if [log_type] == "pve_access" {
#     grok {
#       match => { "message" => "%{IP:client_ip} - %{DATA:user} \[%{HTTPDATE:timestamp}\] \"%{WORD:method} %{URIPATHPARAM:request}\"" }
#     }
#   }
#   if [log_type] == "pve_auth" {
#     grok {
#       match => { "message" => "%{SYSLOGTIMESTAMP:timestamp} %{HOSTNAME:node} %{DATA:program}: %{GREEDYDATA:auth_message}" }
#     }
#   }
# }
# output {
#   elasticsearch {
#     hosts => ["http://elasticsearch:9200"]
#     index => "proxmox-%{+YYYY.MM.dd}"
#   }
# }
สร้าง Audit Dashboard และ Alerts
Monitoring dashboard สำหรับ Proxmox audit
#!/usr/bin/env python3
# pve_alerts.py — Proxmox Audit Alerting System
import json
import re
import logging
from datetime import datetime
from pathlib import Path
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("pve_alerts")
class ProxmoxAlertEngine:
    """Rule-based alerting over Proxmox audit events.

    Events are plain dicts (user/action/status/details/node); each rule
    pairs a name, severity and predicate. Triggered alerts are kept in
    ``alert_history`` and optionally forwarded to a Slack-style webhook.
    """

    def __init__(self, webhook_url=None):
        self.webhook_url = webhook_url
        self.rules = self._default_rules()
        self.alert_history = []

    def _default_rules(self):
        """Return the built-in rule set."""

        def act(e):
            # lowercase action for case-insensitive substring matching
            return e.get("action", "").lower()

        return [
            {
                "name": "root_login",
                "description": "Root user login detected",
                "severity": "high",
                "condition": lambda e: e.get("user") == "root@pam" and "login" in act(e),
            },
            {
                "name": "vm_delete",
                "description": "VM or container deleted",
                "severity": "critical",
                "condition": lambda e: any(w in act(e) for w in ("destroy", "delete")),
            },
            {
                "name": "failed_login",
                "description": "Failed authentication attempt",
                "severity": "medium",
                "condition": lambda e: e.get("status") == "failed" and "auth" in act(e),
            },
            {
                "name": "permission_change",
                "description": "User permission changed",
                "severity": "high",
                "condition": lambda e: any(w in act(e) for w in ("acl", "role")),
            },
            {
                "name": "firewall_change",
                "description": "Firewall rules modified",
                "severity": "high",
                "condition": lambda e: "firewall" in act(e),
            },
            {
                "name": "backup_failure",
                "description": "Backup job failed",
                "severity": "critical",
                "condition": lambda e: "backup" in act(e) and e.get("status") != "OK",
            },
            {
                "name": "node_offline",
                "description": "Cluster node went offline",
                "severity": "critical",
                "condition": lambda e: "offline" in e.get("details", "").lower() or "corosync" in act(e),
            },
        ]

    def evaluate_event(self, event):
        """Run every rule against *event*; return the alerts that fired."""
        fired = []
        for rule in self.rules:
            try:
                if rule["condition"](event):
                    alert = {
                        "rule": rule["name"],
                        "severity": rule["severity"],
                        "description": rule["description"],
                        "event": event,
                        "triggered_at": datetime.utcnow().isoformat(),
                    }
                    fired.append(alert)
                    self.alert_history.append(alert)
                    logger.warning(f"ALERT [{rule['severity'].upper()}]: {rule['description']}")
                    logger.warning(f" User: {event.get('user')}, Action: {event.get('action')}")
                    self._send_notification(alert)
            except Exception as e:
                # a broken rule must not stop the remaining rules
                logger.error(f"Rule evaluation failed: {rule['name']}: {e}")
        return fired

    def _send_notification(self, alert):
        """POST the alert to the configured webhook (no-op when unset)."""
        if not self.webhook_url:
            return
        import requests
        emoji = {"critical": "🔴", "high": "🟠", "medium": "🟡", "low": "🟢"}.get(
            alert["severity"], "⚪"
        )
        body = [
            f"{emoji} *Proxmox Audit Alert*",
            f"*Severity:* {alert['severity'].upper()}",
            f"*Rule:* {alert['description']}",
            f"*User:* {alert['event'].get('user', 'unknown')}",
            f"*Action:* {alert['event'].get('action', 'unknown')}",
            f"*Node:* {alert['event'].get('node', 'unknown')}",
            f"*Time:* {alert['triggered_at']}",
        ]
        payload = {"text": "\n".join(body)}
        try:
            requests.post(self.webhook_url, json=payload, timeout=5)
        except Exception as e:
            logger.error(f"Failed to send notification: {e}")

    def get_alert_summary(self, hours=24):
        """Summarise alerts fired in the last *hours* hours by severity and rule."""
        from datetime import timedelta
        cutoff = (datetime.utcnow() - timedelta(hours=hours)).isoformat()
        recent = [a for a in self.alert_history if a["triggered_at"] > cutoff]
        by_severity = {}
        by_rule = {}
        for alert in recent:
            by_severity[alert["severity"]] = by_severity.get(alert["severity"], 0) + 1
            by_rule[alert["rule"]] = by_rule.get(alert["rule"], 0) + 1
        return {
            "period_hours": hours,
            "total_alerts": len(recent),
            "by_severity": by_severity,
            "by_rule": by_rule,
        }
# Grafana Dashboard Panels (PromQL / Elasticsearch queries):
# - Total events per hour: count by timestamp
# - Top users by activity: terms aggregation on user field
# - Failed logins over time: filter status=failed
# - VM lifecycle events: filter action in (create, destroy, start, stop)
# - Alert timeline: overlay alerts on event graph
# - Node health: cluster status over time
# NOTE: module-level instantiation — the engine (with no webhook
# configured) is created as a side effect of importing this script.
engine = ProxmoxAlertEngine()
# engine.evaluate_event({"user": "root@pam", "action": "login", "node": "pve1"})
Compliance และ Log Retention Policies
นโยบายการเก็บรักษา logs และ compliance
#!/bin/bash
# log_retention.sh — Proxmox Log Retention and Compliance
#
# Archives audit-relevant logs, enforces the retention window, rotates
# Elasticsearch indices and prints a compliance summary.
# Intended to run daily from cron (see the bottom of this file).
set -euo pipefail

LOG_DIR="/var/log"
ARCHIVE_DIR="/backup/audit-logs"
RETENTION_DAYS=365       # SOC 2 requires 1 year minimum
COMPRESS_AFTER_DAYS=7    # reserved: compression threshold for logrotate tuning
ELASTICSEARCH_URL="http://elk-server:9200"

log() { echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1"; }

# 1. Archive the current audit-relevant logs
log "Archiving old logs..."
mkdir -p "$ARCHIVE_DIR/$(date +%Y/%m)"
for logfile in \
    "$LOG_DIR/pveproxy/access.log" \
    "$LOG_DIR/auth.log" \
    "$LOG_DIR/syslog" \
    "$LOG_DIR/pve-firewall.log"; do
    if [ -f "$logfile" ]; then
        BASENAME=$(basename "$logfile")
        # Bug fix: include the source name in the archive file name.
        # The original "_YYYYmmdd_HHMM.gz" pattern made all four logs
        # collide on a single file, each overwriting the previous one.
        ARCHIVE_NAME="${BASENAME}_$(date +%Y%m%d_%H%M).gz"
        gzip -c "$logfile" > "$ARCHIVE_DIR/$(date +%Y/%m)/$ARCHIVE_NAME"
        log "Archived: $BASENAME -> $ARCHIVE_NAME"
    fi
done

# 2. Delete archives older than the retention period
log "Cleaning old archives (> $RETENTION_DAYS days)..."
# Count BEFORE deleting — counting after -delete always reports 0.
DELETED=$(find "$ARCHIVE_DIR" -name "*.gz" -mtime "+$RETENTION_DAYS" | wc -l)
find "$ARCHIVE_DIR" -name "*.gz" -mtime "+$RETENTION_DAYS" -delete
log "Deleted $DELETED old archive files"

# 3. Rotate Elasticsearch indices
log "Managing Elasticsearch indices..."
# Delete the daily index that has just fallen out of the retention
# window (the original used 'date -d "- days"', which is invalid syntax).
# For bulk cleanup prefer ILM or Curator; this handles the steady state.
CUTOFF_INDEX="proxmox-$(date -d "-$RETENTION_DAYS days" +%Y.%m.%d)"
curl -s -X DELETE "$ELASTICSEARCH_URL/$CUTOFF_INDEX" > /dev/null 2>&1 || true

# 4. Verify log integrity (tamper evidence for ISO 27001)
log "Verifying log integrity..."
CHECKSUM_FILE="$ARCHIVE_DIR/checksums_$(date +%Y%m%d).sha256"
# Bootstrap the marker so the very first run checksums everything
# instead of silently skipping (find errors out on a missing -newer ref).
[ -f "$ARCHIVE_DIR/.last_check" ] || touch -t 197001010000 "$ARCHIVE_DIR/.last_check"
find "$ARCHIVE_DIR/$(date +%Y)" -name "*.gz" -newer "$ARCHIVE_DIR/.last_check" 2>/dev/null | \
while read -r file; do
    sha256sum "$file" >> "$CHECKSUM_FILE"
done
touch "$ARCHIVE_DIR/.last_check"
log "Checksums updated: $CHECKSUM_FILE"

# 5. Compliance report
log "Generating compliance summary..."
TOTAL_ARCHIVES=$(find "$ARCHIVE_DIR" -name "*.gz" | wc -l)
OLDEST_ARCHIVE=$(find "$ARCHIVE_DIR" -name "*.gz" -printf '%T+ %p\n' 2>/dev/null | sort | head -1 | cut -d' ' -f1)
NEWEST_ARCHIVE=$(find "$ARCHIVE_DIR" -name "*.gz" -printf '%T+ %p\n' 2>/dev/null | sort -r | head -1 | cut -d' ' -f1)
TOTAL_SIZE=$(du -sh "$ARCHIVE_DIR" 2>/dev/null | cut -f1)
cat << EOF
=== Compliance Summary ===
Date: $(date '+%Y-%m-%d %H:%M')
Retention Policy: $RETENTION_DAYS days
Total Archives: $TOTAL_ARCHIVES files
Archive Size: $TOTAL_SIZE
Oldest Archive: $OLDEST_ARCHIVE
Newest Archive: $NEWEST_ARCHIVE
Standards Coverage:
SOC 2 (1 year retention): $([ "$RETENTION_DAYS" -ge 365 ] && echo "PASS" || echo "FAIL")
ISO 27001 (log protection): PASS (checksums verified)
PCI DSS (1 year online, archive): $([ "$RETENTION_DAYS" -ge 365 ] && echo "PASS" || echo "FAIL")
===========================
EOF
log "Log retention completed"
# Cron job: 0 2 * * * /opt/scripts/log_retention.sh >> /var/log/log_retention.log 2>&1
FAQ คำถามที่พบบ่อย
Q: Proxmox VE ฟรีจริงไหม?
A: Proxmox VE เป็น open source ใช้ฟรีได้โดยไม่มีข้อจำกัดด้าน feature มี Community Edition ที่ใช้ no-subscription repository สำหรับรับ updates ส่วน Subscription plans เริ่มต้นที่ EUR 95/year/socket ให้ enterprise repository (stable updates), technical support และ access to Proxmox Customer Portal สำหรับ production environments แนะนำให้ซื้อ subscription เพื่อความมั่นใจใน stability
Q: Audit logs เก็บนานแค่ไหนถึงจะ comply?
A: ขึ้นอยู่กับ compliance framework SOC 2 ต้องเก็บอย่างน้อย 1 ปี PCI DSS ต้องเก็บ 1 ปี online + archive ISO 27001 ไม่กำหนดระยะเวลาแน่นอนแต่ต้อง appropriate HIPAA ต้องเก็บ 6 ปี GDPR ต้องเก็บเท่าที่จำเป็น (data minimization) แนะนำเก็บ 1 ปีเป็นขั้นต่ำ 3-7 ปีสำหรับ regulated industries
Q: Proxmox กับ VMware ต่างกันอย่างไร?
A: Proxmox เป็น open source ฟรี ใช้ KVM/QEMU hypervisor มี web UI ที่ใช้งานง่าย รองรับ LXC containers built-in storage ด้วย Ceph/ZFS VMware vSphere เป็น commercial product มี feature enterprise มากกว่า (vMotion, DRS, NSX) มี ecosystem ใหญ่กว่า แต่ค่า license สูงมาก (vSphere 8 Standard เริ่มที่ $580/CPU) สำหรับ SME และ home lab Proxmox คุ้มค่ากว่ามาก
Q: ELK Stack กิน resources มากไหม?
A: Elasticsearch ต้องการ RAM อย่างน้อย 4GB (แนะนำ 8-16GB สำหรับ production) สำหรับ Proxmox cluster เล็ก (3-5 nodes) ELK Stack แบบ single node พร้อม 8GB RAM และ 100GB SSD เพียงพอ สำหรับ cluster ใหญ่ควรใช้ dedicated ELK cluster ทางเลือกที่ lightweight กว่าคือ Grafana Loki ที่ใช้ resources น้อยกว่า Elasticsearch มาก เหมาะสำหรับ log aggregation
