SiamCafe · Blog
Proxmox VE Cluster Audit Trail Logging —
บทความ

Proxmox VE Cluster Audit Trail Logging —

เผยแพร่ 28 พฤษภาคม 2569

Proxmox VE Cluster คืออะไรและทำไมต้อง Audit

Proxmox Virtual Environment (VE) เป็น open source virtualization platform ที่รวม KVM hypervisor และ LXC containers ไว้ใน platform เดียว รองรับ high availability clustering, live migration, software-defined storage (Ceph, ZFS) และ backup/restore มี web-based management interface ที่ใช้งานง่าย

Proxmox VE Cluster คือการรวม Proxmox nodes หลายตัวเข้าด้วยกันเป็น cluster ทำให้จัดการ VMs และ containers จากจุดเดียว รองรับ HA (High Availability) ที่ migrate VMs อัตโนมัติเมื่อ node ล่ม และ distributed storage ด้วย Ceph

Audit Trail Logging สำคัญมากสำหรับ Proxmox cluster เพราะต้องรู้ว่าใครทำอะไร เมื่อไหร่ กับ VM/container ไหน สำหรับ security incident investigation, compliance requirements (SOC 2, ISO 27001, PCI DSS), change management tracking และ capacity planning

Proxmox เก็บ logs หลายที่ได้แก่ /var/log/pveproxy/ สำหรับ API access logs, /var/log/pve/tasks/ สำหรับ task logs, /var/log/syslog สำหรับ system events, /var/log/auth.log สำหรับ authentication events และ pve-firewall.log สำหรับ firewall events

ตั้งค่า Proxmox VE Cluster

สร้าง Proxmox cluster และตั้งค่าพื้นฐาน

=== สร้าง Proxmox VE Cluster ===

Node 1 (สร้าง cluster)

pvecm create my-cluster

Node 2,3 (join cluster)

pvecm add 192.168.1.10

ใส่ root password ของ node 1

ตรวจสอบ cluster status

pvecm status

pvecm nodes

=== ตั้งค่า HA ===

เปิด HA สำหรับ VM

ha-manager add vm:100 --state started --group ha-group1

ha-manager status

=== Storage Configuration ===

เพิ่ม Ceph storage

pveceph init --network 10.10.10.0/24

pveceph createmon

pveceph createosd /dev/sdb

pveceph createpool vm-pool

=== Network Configuration ===

/etc/network/interfaces

auto vmbr0

iface vmbr0 inet static

address 192.168.1.10/24

gateway 192.168.1.1

bridge-ports eno1

bridge-stp off

bridge-fd 0

auto vmbr1

iface vmbr1 inet static

address 10.10.10.10/24

bridge-ports eno2

bridge-stp off

bridge-fd 0

=== User and Permission Setup ===

สร้าง audit user

pveum useradd auditor@pve -comment "Audit User"

pveum passwd auditor@pve

สร้าง role

pveum roleadd AuditRole -privs "Sys.Audit, VM.Audit, Datastore.Audit, Pool.Audit"

Assign role

pveum aclmod / -user auditor@pve -role AuditRole

สร้าง API token สำหรับ monitoring

pveum user token add root@pam monitoring --privsep 0

เก็บ token ไว้ใช้กับ monitoring tools

=== Enable Detailed Logging ===

/etc/default/pveproxy

ALLOW_FROM="192.168.1.0/24"

DENY_FROM="all"

LOG_LEVEL=info

Restart pveproxy

systemctl restart pveproxy

ตรวจสอบ logs

journalctl -u pveproxy -f

tail -f /var/log/pveproxy/access.log

ระบบ Audit Trail Logging สำหรับ Proxmox

สร้างระบบเก็บ audit logs แบบครบถ้วน

#!/usr/bin/env python3
# proxmox_audit.py — Proxmox VE Audit Trail System
import requests
import json
import sqlite3
import logging
from datetime import datetime, timedelta
from typing import List, Dict
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("pve_audit")

class ProxmoxAuditCollector:
    def __init__(self, host, user, token_name, token_value):
        self.base_url = f"https://{host}:8006/api2/json"
        self.headers = {
            "Authorization": f"PVEAPIToken={user}!{token_name}={token_value}"
        }
        self.db = sqlite3.connect("proxmox_audit.db")
        self._init_db()
    
    def _init_db(self):
        self.db.executescript("""
            CREATE TABLE IF NOT EXISTS audit_events (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT NOT NULL,
                node TEXT,
                user TEXT,
                action TEXT,
                target_type TEXT,
                target_id TEXT,
                status TEXT,
                details TEXT,
                source_ip TEXT,
                collected_at TEXT
            );
            CREATE INDEX IF NOT EXISTS idx_audit_timestamp ON audit_events(timestamp);
            CREATE INDEX IF NOT EXISTS idx_audit_user ON audit_events(user);
            CREATE INDEX IF NOT EXISTS idx_audit_action ON audit_events(action);
            
            CREATE TABLE IF NOT EXISTS resource_snapshots (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT,
                node TEXT,
                vmid INTEGER,
                vm_name TEXT,
                vm_type TEXT,
                status TEXT,
                cpu_usage REAL,
                mem_usage REAL,
                disk_usage REAL
            );
        """)
    
    def _api(self, endpoint):
        resp = requests.get(
            f"{self.base_url}{endpoint}",
            headers=self.headers,
            verify=False,
            timeout=30,
        )
        resp.raise_for_status()
        return resp.json().get("data", [])
    
    def collect_tasks(self, node, since_hours=24):
        tasks = self._api(f"/nodes/{node}/tasks?limit=500")
        
        cutoff = datetime.utcnow() - timedelta(hours=since_hours)
        new_events = 0
        
        for task in tasks:
            task_time = datetime.fromtimestamp(task.get("starttime", 0))
            if task_time < cutoff:
                continue
            
            self.db.execute("""
                INSERT OR IGNORE INTO audit_events 
                (timestamp, node, user, action, target_type, target_id, status, details, collected_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                task_time.isoformat(),
                node,
                task.get("user", "unknown"),
                task.get("type", "unknown"),
                "vm" if task.get("id", "").startswith("UPID") else "system",
                task.get("id", ""),
                task.get("status", "unknown"),
                json.dumps(task),
                datetime.utcnow().isoformat(),
            ))
            new_events += 1
        
        self.db.commit()
        logger.info(f"Collected {new_events} task events from {node}")
        return new_events
    
    def collect_resource_snapshot(self, node):
        vms = self._api(f"/nodes/{node}/qemu")
        containers = self._api(f"/nodes/{node}/lxc")
        
        now = datetime.utcnow().isoformat()
        
        for vm in vms + containers:
            vm_type = "qemu" if vm.get("type") != "lxc" else "lxc"
            self.db.execute("""
                INSERT INTO resource_snapshots
                (timestamp, node, vmid, vm_name, vm_type, status, cpu_usage, mem_usage, disk_usage)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                now, node, vm.get("vmid"), vm.get("name", ""),
                vm_type, vm.get("status", ""),
                vm.get("cpu", 0),
                vm.get("mem", 0) / max(vm.get("maxmem", 1), 1),
                vm.get("disk", 0) / max(vm.get("maxdisk", 1), 1),
            ))
        
        self.db.commit()
        logger.info(f"Snapshot: {len(vms)} VMs, {len(containers)} CTs on {node}")
    
    def collect_all_nodes(self):
        nodes = self._api("/nodes")
        
        for node in nodes:
            node_name = node["node"]
            try:
                self.collect_tasks(node_name)
                self.collect_resource_snapshot(node_name)
            except Exception as e:
                logger.error(f"Failed to collect from {node_name}: {e}")
    
    def query_audit(self, user=None, action=None, hours=24):
        query = "SELECT * FROM audit_events WHERE timestamp > ?"
        params = [(datetime.utcnow() - timedelta(hours=hours)).isoformat()]
        
        if user:
            query += " AND user = ?"
            params.append(user)
        if action:
            query += " AND action LIKE ?"
            params.append(f"%{action}%")
        
        query += " ORDER BY timestamp DESC LIMIT 100"
        
        cursor = self.db.execute(query, params)
        columns = [d[0] for d in cursor.description]
        return [dict(zip(columns, row)) for row in cursor.fetchall()]
    
    def generate_report(self, hours=24):
        events = self.query_audit(hours=hours)
        
        user_actions = {}
        action_types = {}
        
        for e in events:
            user = e.get("user", "unknown")
            action = e.get("action", "unknown")
            
            user_actions[user] = user_actions.get(user, 0) + 1
            action_types[action] = action_types.get(action, 0) + 1
        
        return {
            "period_hours": hours,
            "total_events": len(events),
            "unique_users": len(user_actions),
            "user_activity": dict(sorted(user_actions.items(), key=lambda x: -x[1])),
            "action_breakdown": dict(sorted(action_types.items(), key=lambda x: -x[1])),
            "generated_at": datetime.utcnow().isoformat(),
        }

# ใช้งาน
# collector = ProxmoxAuditCollector(
#     host="192.168.1.10",
#     user="root@pam",
#     token_name="monitoring",
#     token_value="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
# )
# collector.collect_all_nodes()
# report = collector.generate_report(hours=24)
# print(json.dumps(report, indent=2))

Centralized Logging ด้วย ELK Stack

ส่ง Proxmox logs ไป ELK Stack

=== Filebeat Configuration สำหรับ Proxmox ===

/etc/filebeat/filebeat.yml

filebeat.inputs:

  • type: log

enabled: true

paths:

  • /var/log/pveproxy/access.log

fields:

log_type: pve_access

fields_under_root: true

  • type: log

enabled: true

paths:

  • /var/log/syslog

fields:

log_type: pve_syslog

fields_under_root: true

multiline:

pattern: '^\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2}'

negate: true

match: after

  • type: log

enabled: true

paths:

  • /var/log/auth.log

fields:

log_type: pve_auth

fields_under_root: true

  • type: log

enabled: true

paths:

  • /var/log/pve-firewall.log

fields:

log_type: pve_firewall

fields_under_root: true

output.elasticsearch:

hosts: ["http://elk-server:9200"]

index: "proxmox-%{+yyyy.MM.dd}"

setup.kibana:

host: "http://elk-server:5601"

ติดตั้ง Filebeat บน Proxmox

apt-get install -y filebeat

systemctl enable filebeat

systemctl start filebeat

=== rsyslog สำหรับ centralized logging ===

/etc/rsyslog.d/50-proxmox-audit.conf

Template สำหรับ JSON output

template(name="proxmox-json" type="list") {

constant(value="{")

constant(value="\"timestamp\":\"")

property(name="timereported" dateFormat="rfc3339")

constant(value="\",\"hostname\":\"")

property(name="hostname")

constant(value="\",\"severity\":\"")

property(name="syslogseverity-text")

constant(value="\",\"facility\":\"")

property(name="syslogfacility-text")

constant(value="\",\"program\":\"")

property(name="programname")

constant(value="\",\"message\":\"")

property(name="msg" format="jsonf")

constant(value="\"}\n")

}

# ส่งไป remote syslog server

*.* @@elk-server:514;proxmox-json

# เก็บ local copy

*.* /var/log/proxmox-audit.json;proxmox-json

Restart rsyslog

systemctl restart rsyslog

=== Docker Compose สำหรับ ELK Stack ===

docker-compose.yml

services:

elasticsearch:

image: elasticsearch:8.12.0

environment:

  • discovery.type=single-node
  • xpack.security.enabled=false
  • "ES_JAVA_OPTS=-Xms2g -Xmx2g"

ports: ["9200:9200"]

volumes: ["es-data:/usr/share/elasticsearch/data"]

kibana:

image: kibana:8.12.0

ports: ["5601:5601"]

environment:

  • ELASTICSEARCH_HOSTS=http://elasticsearch:9200

depends_on: [elasticsearch]

logstash:

image: logstash:8.12.0

ports: ["5044:5044", "514:514/tcp", "514:514/udp"]

volumes:

  • ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf

depends_on: [elasticsearch]

volumes:

es-data:

=== Logstash Pipeline ===

logstash.conf

input {

beats { port => 5044 }

syslog { port => 514 }

}

filter {

if [log_type] == "pve_access" {

grok {

match => { "message" => "%{IP:client_ip} - %{DATA:user} \[%{HTTPDATE:timestamp}\] \"%{WORD:method} %{URIPATHPARAM:request}\"" }

}

}

if [log_type] == "pve_auth" {

grok {

match => { "message" => "%{SYSLOGTIMESTAMP:timestamp} %{HOSTNAME:node} %{DATA:program}: %{GREEDYDATA:auth_message}" }

}

}

}

output {

elasticsearch {

hosts => ["http://elasticsearch:9200"]

index => "proxmox-%{+YYYY.MM.dd}"

}

}

สร้าง Audit Dashboard และ Alerts

Monitoring dashboard สำหรับ Proxmox audit

#!/usr/bin/env python3
# pve_alerts.py — Proxmox Audit Alerting System
import json
import re
import logging
from datetime import datetime
from pathlib import Path

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("pve_alerts")

class ProxmoxAlertEngine:
    def __init__(self, webhook_url=None):
        self.webhook_url = webhook_url
        self.rules = self._default_rules()
        self.alert_history = []
    
    def _default_rules(self):
        return [
            {
                "name": "root_login",
                "description": "Root user login detected",
                "severity": "high",
                "condition": lambda e: e.get("user") == "root@pam" and "login" in e.get("action", "").lower(),
            },
            {
                "name": "vm_delete",
                "description": "VM or container deleted",
                "severity": "critical",
                "condition": lambda e: "destroy" in e.get("action", "").lower() or "delete" in e.get("action", "").lower(),
            },
            {
                "name": "failed_login",
                "description": "Failed authentication attempt",
                "severity": "medium",
                "condition": lambda e: e.get("status") == "failed" and "auth" in e.get("action", "").lower(),
            },
            {
                "name": "permission_change",
                "description": "User permission changed",
                "severity": "high",
                "condition": lambda e: "acl" in e.get("action", "").lower() or "role" in e.get("action", "").lower(),
            },
            {
                "name": "firewall_change",
                "description": "Firewall rules modified",
                "severity": "high",
                "condition": lambda e: "firewall" in e.get("action", "").lower(),
            },
            {
                "name": "backup_failure",
                "description": "Backup job failed",
                "severity": "critical",
                "condition": lambda e: "backup" in e.get("action", "").lower() and e.get("status") != "OK",
            },
            {
                "name": "node_offline",
                "description": "Cluster node went offline",
                "severity": "critical",
                "condition": lambda e: "offline" in e.get("details", "").lower() or "corosync" in e.get("action", "").lower(),
            },
        ]
    
    def evaluate_event(self, event):
        triggered = []
        
        for rule in self.rules:
            try:
                if rule["condition"](event):
                    alert = {
                        "rule": rule["name"],
                        "severity": rule["severity"],
                        "description": rule["description"],
                        "event": event,
                        "triggered_at": datetime.utcnow().isoformat(),
                    }
                    triggered.append(alert)
                    self.alert_history.append(alert)
                    
                    logger.warning(f"ALERT [{rule['severity'].upper()}]: {rule['description']}")
                    logger.warning(f"  User: {event.get('user')}, Action: {event.get('action')}")
                    
                    self._send_notification(alert)
            except Exception as e:
                logger.error(f"Rule evaluation failed: {rule['name']}: {e}")
        
        return triggered
    
    def _send_notification(self, alert):
        if not self.webhook_url:
            return
        
        import requests
        severity_emoji = {"critical": "🔴", "high": "🟠", "medium": "🟡", "low": "🟢"}
        emoji = severity_emoji.get(alert["severity"], "⚪")
        
        payload = {
            "text": (
                f"{emoji} *Proxmox Audit Alert*\n"
                f"*Severity:* {alert['severity'].upper()}\n"
                f"*Rule:* {alert['description']}\n"
                f"*User:* {alert['event'].get('user', 'unknown')}\n"
                f"*Action:* {alert['event'].get('action', 'unknown')}\n"
                f"*Node:* {alert['event'].get('node', 'unknown')}\n"
                f"*Time:* {alert['triggered_at']}"
            ),
        }
        
        try:
            requests.post(self.webhook_url, json=payload, timeout=5)
        except Exception as e:
            logger.error(f"Failed to send notification: {e}")
    
    def get_alert_summary(self, hours=24):
        from datetime import timedelta
        cutoff = (datetime.utcnow() - timedelta(hours=hours)).isoformat()
        
        recent = [a for a in self.alert_history if a["triggered_at"] > cutoff]
        
        by_severity = {}
        by_rule = {}
        
        for a in recent:
            sev = a["severity"]
            rule = a["rule"]
            by_severity[sev] = by_severity.get(sev, 0) + 1
            by_rule[rule] = by_rule.get(rule, 0) + 1
        
        return {
            "period_hours": hours,
            "total_alerts": len(recent),
            "by_severity": by_severity,
            "by_rule": by_rule,
        }

# Grafana Dashboard Panels (PromQL / Elasticsearch queries):
# - Total events per hour: count by timestamp
# - Top users by activity: terms aggregation on user field
# - Failed logins over time: filter status=failed
# - VM lifecycle events: filter action in (create, destroy, start, stop)
# - Alert timeline: overlay alerts on event graph
# - Node health: cluster status over time

engine = ProxmoxAlertEngine()
# engine.evaluate_event({"user": "root@pam", "action": "login", "node": "pve1"})

Compliance และ Log Retention Policies

นโยบายการเก็บรักษา logs และ compliance

#!/bin/bash
# log_retention.sh — Proxmox Log Retention and Compliance
set -euo pipefail

LOG_DIR="/var/log"
ARCHIVE_DIR="/backup/audit-logs"
RETENTION_DAYS=365  # SOC 2 requires 1 year minimum
COMPRESS_AFTER_DAYS=7
ELASTICSEARCH_URL="http://elk-server:9200"

log() { echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1"; }

# 1. Archive logs ที่เก่ากว่า 7 วัน
log "Archiving old logs..."
mkdir -p "$ARCHIVE_DIR/$(date +%Y/%m)"

for logfile in \
    "$LOG_DIR/pveproxy/access.log" \
    "$LOG_DIR/auth.log" \
    "$LOG_DIR/syslog" \
    "$LOG_DIR/pve-firewall.log"; do
    
    if [ -f "$logfile" ]; then
        BASENAME=$(basename "$logfile")
        ARCHIVE_NAME="_$(date +%Y%m%d_%H%M).gz"
        
        # Compress and archive
        gzip -c "$logfile" > "$ARCHIVE_DIR/$(date +%Y/%m)/$ARCHIVE_NAME"
        log "Archived: $BASENAME -> $ARCHIVE_NAME"
    fi
done

# 2. ลบ archives ที่เก่ากว่า retention period
log "Cleaning old archives (> $RETENTION_DAYS days)..."
find "$ARCHIVE_DIR" -name "*.gz" -mtime "+$RETENTION_DAYS" -delete
DELETED=$(find "$ARCHIVE_DIR" -name "*.gz" -mtime "+$RETENTION_DAYS" | wc -l)
log "Deleted $DELETED old archive files"

# 3. Rotate Elasticsearch indices
log "Managing Elasticsearch indices..."
# Delete indices older than retention period
CUTOFF_DATE=$(date -d "- days" +%Y.%m.%d)
curl -s -X DELETE "$ELASTICSEARCH_URL/proxmox-*-$(date -d "- days" +%Y)*" \
    > /dev/null 2>&1 || true

# 4. Verify log integrity
log "Verifying log integrity..."
CHECKSUM_FILE="$ARCHIVE_DIR/checksums_$(date +%Y%m%d).sha256"
find "$ARCHIVE_DIR/$(date +%Y)" -name "*.gz" -newer "$ARCHIVE_DIR/.last_check" 2>/dev/null | \
    while read -r file; do
        sha256sum "$file" >> "$CHECKSUM_FILE"
    done
touch "$ARCHIVE_DIR/.last_check"
log "Checksums updated: $CHECKSUM_FILE"

# 5. Compliance report
log "Generating compliance summary..."
TOTAL_ARCHIVES=$(find "$ARCHIVE_DIR" -name "*.gz" | wc -l)
OLDEST_ARCHIVE=$(find "$ARCHIVE_DIR" -name "*.gz" -printf '%T+ %p\n' 2>/dev/null | sort | head -1 | cut -d' ' -f1)
NEWEST_ARCHIVE=$(find "$ARCHIVE_DIR" -name "*.gz" -printf '%T+ %p\n' 2>/dev/null | sort -r | head -1 | cut -d' ' -f1)
TOTAL_SIZE=$(du -sh "$ARCHIVE_DIR" 2>/dev/null | cut -f1)

cat << EOF
=== Compliance Summary ===
Date: $(date '+%Y-%m-%d %H:%M')
Retention Policy: $RETENTION_DAYS days
Total Archives: $TOTAL_ARCHIVES files
Archive Size: $TOTAL_SIZE
Oldest Archive: $OLDEST_ARCHIVE
Newest Archive: $NEWEST_ARCHIVE

Standards Coverage:
  SOC 2 (1 year retention): $([ "$RETENTION_DAYS" -ge 365 ] && echo "PASS" || echo "FAIL")
  ISO 27001 (log protection): PASS (checksums verified)
  PCI DSS (1 year online, archive): $([ "$RETENTION_DAYS" -ge 365 ] && echo "PASS" || echo "FAIL")
===========================
EOF

log "Log retention completed"

# Cron job: 0 2 * * * /opt/scripts/log_retention.sh >> /var/log/log_retention.log 2>&1

FAQ คำถามที่พบบ่อย

Q: Proxmox VE ฟรีจริงไหม?

A: Proxmox VE เป็น open source ใช้ฟรีได้โดยไม่มีข้อจำกัด feature มี Community Edition ที่ใช้ no-subscription repository สำหรับ updates Subscription plans เริ่มต้นที่ EUR 95/year/socket ให้ enterprise repository (stable updates), technical support, access to Proxmox Customer Portal สำหรับ production environments แนะนำซื้อ subscription เพื่อความมั่นใจใน stability

Q: Audit logs เก็บนานแค่ไหนถึงจะ comply?

A: ขึ้นอยู่กับ compliance framework SOC 2 ต้องเก็บอย่างน้อย 1 ปี PCI DSS ต้องเก็บ 1 ปี online + archive ISO 27001 ไม่กำหนดระยะเวลาแน่นอนแต่ต้อง appropriate HIPAA ต้องเก็บ 6 ปี GDPR ต้องเก็บเท่าที่จำเป็น (data minimization) แนะนำเก็บ 1 ปีเป็นขั้นต่ำ 3-7 ปีสำหรับ regulated industries

Q: Proxmox กับ VMware ต่างกันอย่างไร?

A: Proxmox เป็น open source ฟรี ใช้ KVM/QEMU hypervisor มี web UI ที่ใช้งานง่าย รองรับ LXC containers built-in storage ด้วย Ceph/ZFS VMware vSphere เป็น commercial product มี feature enterprise มากกว่า (vMotion, DRS, NSX) มี ecosystem ใหญ่กว่า แต่ค่า license สูงมาก (vSphere 8 Standard เริ่มที่ $580/CPU) สำหรับ SME และ home lab Proxmox คุ้มค่ากว่ามาก

Q: ELK Stack กิน resources มากไหม?

A: Elasticsearch ต้องการ RAM อย่างน้อย 4GB (แนะนำ 8-16GB สำหรับ production) สำหรับ Proxmox cluster เล็ก (3-5 nodes) ELK Stack แบบ single node พร้อม 8GB RAM และ 100GB SSD เพียงพอ สำหรับ cluster ใหญ่ควรใช้ dedicated ELK cluster ทางเลือกที่ lightweight กว่าคือ Grafana Loki ที่ใช้ resources น้อยกว่า Elasticsearch มาก เหมาะสำหรับ log aggregation