
Proxmox VE Cluster Stream Processing

2026-02-09 · อ. บอม, SiamCafe.net · 1,495 words

What Is Proxmox VE Cluster Stream Processing?

Proxmox VE (Virtual Environment) is an open source virtualization platform that combines the KVM hypervisor and LXC containers in a single management interface. It supports clustering, high availability, software-defined storage (Ceph, ZFS), and backup. Stream processing is the continuous processing of data in real time as it arrives. Combining a Proxmox VE cluster with stream processing lets you build real-time data pipelines on virtualized infrastructure, automate resource allocation through the Proxmox API, and scale with the workload.

Proxmox VE Cluster Architecture

# proxmox_arch.py: Proxmox VE cluster architecture overview

class ProxmoxArch:
    # Building blocks of the cluster and what each one is used for
    COMPONENTS = {
        "node": {
            "name": "Proxmox Node",
            "role": "Physical server that runs Proxmox VE",
            "specs": "CPU: 8+ cores, RAM: 64+ GB, Storage: SSD + HDD",
        },
        "cluster": {
            "name": "Proxmox Cluster",
            "role": "Group of nodes managed together (Corosync + pmxcfs)",
            "min_nodes": "3 nodes (quorum)",
        },
        "vm": {
            "name": "KVM Virtual Machines",
            "role": "Full virtualization; runs any guest OS",
            "use": "Kafka brokers, Flink workers, databases",
        },
        "ct": {
            "name": "LXC Containers",
            "role": "Lightweight containers that share the host kernel",
            "use": "Monitoring, lightweight services, dev environments",
        },
        "storage": {
            "name": "Storage (Ceph, ZFS, NFS)",
            "role": "Storage for VMs/CTs; shared storage enables live migration",
            "options": "Ceph RBD (distributed), ZFS (local), NFS (shared)",
        },
        "ha": {
            "name": "High Availability",
            "role": "Automatically restarts VMs when a node fails",
            "config": "HA Manager + Fencing (IPMI/iLO)",
        },
    }

    CLUSTER_SETUP = """
    Proxmox VE Cluster for Stream Processing:
    
    [Node 1]              [Node 2]              [Node 3]
    ├─ VM: Kafka-1        ├─ VM: Kafka-2        ├─ VM: Kafka-3
    ├─ VM: Flink-TM-1     ├─ VM: Flink-TM-2     ├─ VM: Flink-JM
    ├─ CT: Prometheus     ├─ CT: Grafana        ├─ CT: AlertManager
    └─ Ceph OSD + MON     └─ Ceph OSD + MON     └─ Ceph OSD + MON
    
    Network:
    ├─ Management: 10.0.0.0/24 (vmbr0)
    ├─ Cluster: 10.0.1.0/24 (Corosync)
    ├─ Ceph: 10.0.2.0/24 (storage replication)
    └─ VM Traffic: 10.0.3.0/24 (stream data)
    """

    def show_components(self):
        print("=== Proxmox Components ===\n")
        for key, comp in self.COMPONENTS.items():
            print(f"[{comp['name']}]")
            print(f"  {comp['role']}")
            print()

    def show_cluster(self):
        print("=== Cluster Layout ===")
        print(self.CLUSTER_SETUP)

arch = ProxmoxArch()
arch.show_components()
arch.show_cluster()
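
The layout above separates management, Corosync, Ceph, and VM traffic into four subnets. A minimal sketch of a sanity check for that plan, using only the standard library: it confirms the subnets do not overlap and reports which network a given address belongs to. The role names (`management`, `corosync`, and so on) and the sample address are labels chosen for this sketch, not Proxmox settings.

```python
import ipaddress
from itertools import combinations

# Subnets copied from the cluster layout; the keys are labels for this sketch.
NETWORKS = {
    "management": "10.0.0.0/24",
    "corosync": "10.0.1.0/24",
    "ceph": "10.0.2.0/24",
    "vm_traffic": "10.0.3.0/24",
}


def find_overlaps(networks):
    """Return pairs of role names whose subnets overlap."""
    parsed = {name: ipaddress.ip_network(cidr) for name, cidr in networks.items()}
    return [
        (a, b)
        for a, b in combinations(parsed, 2)
        if parsed[a].overlaps(parsed[b])
    ]


def network_for(address, networks):
    """Return the role name of the subnet that contains `address`, or None."""
    ip = ipaddress.ip_address(address)
    for name, cidr in networks.items():
        if ip in ipaddress.ip_network(cidr):
            return name
    return None


if __name__ == "__main__":
    print("Overlaps:", find_overlaps(NETWORKS))
    print("10.0.3.11 is on:", network_for("10.0.3.11", NETWORKS))
```

Keeping Corosync on its own subnet matters most here, because cluster membership traffic is latency-sensitive and should not compete with Ceph replication or stream data.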

Stream Processing Stack on Proxmox

# stream_stack.py: stream processing stack and its sizing

class StreamStack:
    # Each component with its role, deployment shape, and per-instance resources
    COMPONENTS = {
        "kafka": {
            "name": "Apache Kafka",
            "role": "Distributed message broker for event streaming",
            "deploy": "3 VMs (KVM), 1 broker per node",
            "resources": "4 vCPU, 16GB RAM, 500GB SSD per broker",
        },
        "flink": {
            "name": "Apache Flink",
            "role": "Stream processing engine (real-time computation)",
            "deploy": "1 JobManager VM + 2-4 TaskManager VMs",
            "resources": "JM: 2 vCPU 8GB | TM: 4 vCPU 16GB each",
        },
        "schema_registry": {
            "name": "Schema Registry",
            "role": "Manages Avro/Protobuf schemas for Kafka",
            "deploy": "1 LXC container",
            "resources": "2 vCPU, 4GB RAM",
        },
        "connect": {
            "name": "Kafka Connect",
            "role": "Source/sink connectors (DB, S3, Elasticsearch)",
            "deploy": "2 VMs (distributed mode)",
            "resources": "2 vCPU, 8GB RAM per worker",
        },
        "monitoring": {
            "name": "Prometheus + Grafana",
            "role": "Monitoring for both Proxmox and stream processing",
            "deploy": "LXC containers",
            "resources": "2 vCPU, 4GB RAM each",
        },
    }

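    TERRAFORM_PROXMOX = """
# main.tf: Terraform for Proxmox VMs
terraform {
  required_providers {
    proxmox = {
      source  = "Telmate/proxmox"
      version = "3.0.1-rc1"
    }
  }
}

variable "proxmox_password" {
  type      = string
  sensitive = true
}

provider "proxmox" {
  pm_api_url      = "https://proxmox.local:8006/api2/json"
  pm_user         = "terraform@pam"
  pm_password     = var.proxmox_password
  pm_tls_insecure = true  # lab only; use a trusted certificate in production
}

resource "proxmox_vm_qemu" "kafka" {
  count       = 3
  name        = "kafka-${count.index + 1}"  # kafka-1 .. kafka-3
  target_node = "pve-${count.index + 1}"    # one broker per physical node
  clone       = "ubuntu-template"
  
  cores   = 4
  memory  = 16384
  sockets = 1
  
  # The disk block schema differs between provider releases;
  # check the documentation for the version pinned above.
  disk {
    storage = "ceph-pool"
    size    = "500G"
    type    = "scsi"
  }
  
  network {
    model  = "virtio"
    bridge = "vmbr3"
  }
  
  # Each broker needs its own address; the .11-.13 range is an assumption.
  ipconfig0 = "ip=10.0.3.${count.index + 11}/24,gw=10.0.3.1"
}
"""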

    def show_stack(self):
        print("=== Stream Processing Stack ===\n")
        for key, comp in self.COMPONENTS.items():
            print(f"[{comp['name']}] — {comp['role']}")
            print(f"  Deploy: {comp['deploy']} | Resources: {comp['resources']}")
            print()

    def show_terraform(self):
        print("=== Terraform Config ===")
        print(self.TERRAFORM_PROXMOX.strip())  # print the whole config, not a truncated slice

stack = StreamStack()
stack.show_stack()
stack.show_terraform()
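
The per-instance sizes above add up quickly, so it helps to total them before creating VMs. The following is a rough capacity sketch under stated assumptions: it uses 2 TaskManagers (the low end of the "2-4" range) and 2 monitoring containers, takes 64 GB per node from the node spec earlier, and ignores hypervisor and Ceph OSD memory overhead. The `Guest` class and function names are hypothetical helpers for this sketch.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Guest:
    name: str
    count: int
    vcpu: int
    ram_gb: int


# Sizes from the stack listing; TaskManager and monitoring counts are assumptions.
STACK = [
    Guest("kafka-broker", 3, 4, 16),
    Guest("flink-jobmanager", 1, 2, 8),
    Guest("flink-taskmanager", 2, 4, 16),
    Guest("schema-registry", 1, 2, 4),
    Guest("kafka-connect", 2, 2, 8),
    Guest("monitoring", 2, 2, 4),
]


def totals(stack):
    """Return (total vCPU, total RAM in GB) for every guest in the stack."""
    vcpu = sum(g.count * g.vcpu for g in stack)
    ram_gb = sum(g.count * g.ram_gb for g in stack)
    return vcpu, ram_gb


def fits_after_node_loss(stack, nodes, ram_per_node_gb):
    """True if all guest RAM still fits on the nodes left after one node fails."""
    _, ram_gb = totals(stack)
    return ram_gb <= (nodes - 1) * ram_per_node_gb


if __name__ == "__main__":
    vcpu, ram_gb = totals(STACK)
    print(f"Stack needs {vcpu} vCPU and {ram_gb} GB RAM")
    print("Survives one node failure:", fits_after_node_loss(STACK, 3, 64))
```

Under these assumptions the stack fits on two surviving 64 GB nodes, but adding two more TaskManagers would not, which is worth knowing before enabling HA for every guest.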

Kafka & Flink Configuration

# kafka_flink.py: Kafka and Flink setup on Proxmox

class KafkaFlinkSetup:
    KAFKA_CONFIG = """
# server.properties: Kafka broker config (broker 1)
# broker.id and advertised.listeners must be unique on each broker.
broker.id=1
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://kafka-1:9092
log.dirs=/data/kafka-logs
num.partitions=12
default.replication.factor=3
min.insync.replicas=2
log.retention.hours=168
log.segment.bytes=1073741824
zookeeper.connect=zk-1:2181,zk-2:2181,zk-3:2181

# KRaft mode (no ZooKeeper): use these instead of zookeeper.connect
# process.roles=broker,controller
# node.id=1
# controller.quorum.voters=1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
"""

    FLINK_EXAMPLE = """
# flink_job.py: Flink stream processing job (PyFlink)
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)
t_env = StreamTableEnvironment.create(env)

# Kafka source
t_env.execute_sql('''
    CREATE TABLE events (
        event_id STRING,
        user_id STRING,
        event_type STRING,
        amount DECIMAL(10, 2),
        event_time TIMESTAMP(3),
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'user-events',
        'properties.bootstrap.servers' = 'kafka-1:9092,kafka-2:9092,kafka-3:9092',
        'properties.group.id' = 'flink-processor',
        'format' = 'json',
        'scan.startup.mode' = 'latest-offset'
    )
''')

# Real-time aggregation
t_env.execute_sql('''
    SELECT
        user_id,
        event_type,
        COUNT(*) as event_count,
        SUM(amount) as total_amount,
        TUMBLE_START(event_time, INTERVAL '1' MINUTE) as window_start
    FROM events
    GROUP BY
        user_id,
        event_type,
        TUMBLE(event_time, INTERVAL '1' MINUTE)
''').print()
"""

    def show_kafka(self):
        print("=== Kafka Config ===")
        print(self.KAFKA_CONFIG.strip())  # print the full config

    def show_flink(self):
        print("\n=== Flink Job Example ===")
        print(self.FLINK_EXAMPLE.strip())  # print the full job

kf = KafkaFlinkSetup()
kf.show_kafka()
kf.show_flink()

Monitoring & Management

# monitoring.py: Proxmox and stream monitoring (simulated values)
import random

class ProxmoxMonitoring:
    """Prints a mock dashboard. All numbers are random placeholders, not real metrics."""
    def cluster_status(self):
        print("=== Proxmox Cluster Status ===\n")
        nodes = [
            {"name": "pve-1", "cpu": random.randint(30, 70), "ram": random.randint(50, 80), "vms": 4, "status": "online"},
            {"name": "pve-2", "cpu": random.randint(25, 65), "ram": random.randint(45, 75), "vms": 3, "status": "online"},
            {"name": "pve-3", "cpu": random.randint(20, 60), "ram": random.randint(40, 70), "vms": 4, "status": "online"},
        ]
        for node in nodes:
            print(f"  [{node['status']:>6}] {node['name']}: CPU={node['cpu']}% RAM={node['ram']}% VMs={node['vms']}")

    def stream_metrics(self):
        print(f"\n=== Stream Processing Metrics ===")
        metrics = {
            "Kafka throughput": f"{random.randint(10, 100)}K msgs/sec",
            "Kafka lag (max)": f"{random.randint(0, 5000)} msgs",
            "Flink checkpoints": f"{random.randint(95, 100)}% success",
            "Flink processing time": f"{random.randint(10, 200)}ms avg",
            "Events processed/min": f"{random.randint(50, 500)}K",
        }
        for m, v in metrics.items():
            print(f"  {m}: {v}")

    def alerts(self):
        print(f"\n=== Active Alerts ===")
        alerts = [
            {"level": "INFO", "msg": "Kafka broker-2 leader election completed"},
            {"level": "WARN", "msg": f"Flink checkpoint took {random.randint(30, 60)}s (threshold: 30s)"},
            {"level": "INFO", "msg": f"Proxmox storage usage: {random.randint(50, 70)}%"},
        ]
        for a in alerts:
            print(f"  [{a['level']:>4}] {a['msg']}")

    def proxmox_api(self):
        print(f"\n=== Proxmox API Examples ===")
        cmds = [
            "GET /api2/json/cluster/status    # Cluster status",
            "GET /api2/json/nodes             # List nodes",
            "POST /api2/json/nodes/{node}/qemu/{vmid}/status/start  # Start VM",
            "GET /api2/json/nodes/{node}/qemu/{vmid}/rrddata  # VM metrics",
        ]
        for cmd in cmds:
            print(f"  {cmd}")

mon = ProxmoxMonitoring()
mon.cluster_status()
mon.stream_metrics()
mon.alerts()
mon.proxmox_api()
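
The endpoints listed above can be called with any HTTP client. Below is a minimal sketch using only `urllib` and a Proxmox API token, which is sent in an `Authorization` header of the form `PVEAPIToken=USER@REALM!TOKENID=SECRET`. The host `proxmox.local` comes from the Terraform example; the token ID `monitor@pve!metrics` and the helper names are hypothetical, and `fetch_nodes` assumes the node's certificate is trusted by the client.

```python
import json
import urllib.request


def build_api_request(host, path, token_id, token_secret, method="GET"):
    """Build an authenticated request for a Proxmox API path such as '/nodes'."""
    url = f"https://{host}:8006/api2/json{path}"
    req = urllib.request.Request(url, method=method)
    # token_id has the form 'user@realm!tokenname'
    req.add_header("Authorization", f"PVEAPIToken={token_id}={token_secret}")
    return req


def fetch_nodes(host, token_id, token_secret, timeout=10):
    """Call GET /nodes and return the 'data' list from the JSON response."""
    req = build_api_request(host, "/nodes", token_id, token_secret)
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.load(resp)["data"]
```

A monitoring script would call `fetch_nodes("proxmox.local", "monitor@pve!metrics", secret)` on a schedule and feed the result into its own alerting. Giving the token a read-only role keeps a leaked secret from being able to start or stop guests.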

High Availability & Scaling

# ha_scaling.py: HA and scaling strategies

class HAScaling:
    # HA mechanisms at each layer: hypervisor, broker, and stream engine
    HA_CONFIG = {
        "proxmox_ha": {
            "name": "Proxmox HA Manager",
            "description": "Automatically restarts VMs when a node fails",
            "setup": "Datacenter → HA → Add VM → Set priority",
            "requirement": "3+ nodes, shared storage (Ceph), fencing",
        },
        "kafka_ha": {
            "name": "Kafka Replication",
            "description": "replication.factor=3, min.insync.replicas=2",
            "setup": "Topic-level config, spread across Proxmox nodes",
            "requirement": "3 brokers on different physical nodes",
        },
        "flink_ha": {
            "name": "Flink HA (ZooKeeper/K8s)",
            "description": "JobManager failover + checkpoint recovery",
            "setup": "high-availability.type: zookeeper",
            "requirement": "Shared storage for checkpoints",
        },
    }

    SCALING = {
        "vertical": {
            "name": "Vertical Scaling",
            "how": "Add vCPU/RAM to a VM (Proxmox hot-plug)",
            "when": "A single service needs more resources",
            "limit": "Physical host resources",
        },
        "horizontal": {
            "name": "Horizontal Scaling",
            "how": "Add VMs or nodes to the cluster",
            "when": "Throughput is insufficient and load must be spread out",
            "limit": "Network bandwidth, Kafka partition count",
        },
        "auto": {
            "name": "Auto-scaling (Proxmox API)",
            "how": "A script checks metrics → clones/starts VMs automatically",
            "when": "Workload is uneven (peak hours)",
            "limit": "Physical capacity, VM template readiness",
        },
    }

    def show_ha(self):
        print("=== High Availability ===\n")
        for key, ha in self.HA_CONFIG.items():
            print(f"[{ha['name']}]")
            print(f"  {ha['description']}")
            print(f"  Requires: {ha['requirement']}")
            print()

    def show_scaling(self):
        print("=== Scaling Strategies ===")
        for key, s in self.SCALING.items():
            print(f"  [{s['name']}] {s['how']}")

ha = HAScaling()
ha.show_ha()
ha.show_scaling()
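
The auto-scaling strategy above boils down to a decision function that a script runs on each metrics poll. Here is one possible sketch for Flink TaskManagers driven by Kafka consumer lag. The thresholds (5000 and 500 messages) are assumptions chosen for illustration, the 2 to 4 range comes from the stack sizing earlier, and the function name is hypothetical. The gap between the two thresholds gives hysteresis, so the count does not flap when lag hovers around one value.

```python
def desired_taskmanagers(current, lag, scale_out_lag=5000, scale_in_lag=500,
                         min_tm=2, max_tm=4):
    """Return the TaskManager count to aim for, moving at most one step per call."""
    if lag > scale_out_lag:
        target = current + 1   # falling behind: add one worker
    elif lag < scale_in_lag:
        target = current - 1   # mostly idle: remove one worker
    else:
        target = current       # inside the hysteresis band: do nothing
    # Never leave the allowed range, whatever the lag says.
    return max(min_tm, min(max_tm, target))


if __name__ == "__main__":
    for current, lag in [(2, 8000), (4, 8000), (3, 100), (3, 2000)]:
        print(f"current={current} lag={lag} -> {desired_taskmanagers(current, lag)}")
```

The script that acts on the result would then clone or start a VM through the Proxmox API, or shut one down, and wait for the next poll before changing anything else.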

FAQ: Frequently Asked Questions

Q: Proxmox or VMware ESXi: which is better for stream processing?

A: Proxmox is free and open source, has Ceph integration built in, delivers good KVM performance, and exposes a simple API. VMware offers enterprise support, vMotion, and a mature ecosystem, but its licensing is expensive. For a home lab or SME, Proxmox is the economical choice and its features are sufficient. For an enterprise, VMware brings vendor support and a larger ecosystem. For most workloads, Proxmox performance is close to VMware's.

Q: Can LXC containers be used instead of VMs?

A: Partly. LXC works well for monitoring (Prometheus, Grafana), lightweight services, and dev environments. It is a poor fit for Kafka, which needs dedicated resources, and for Flink, which does heavy computation. Rule of thumb: run production stream processing in KVM VMs and supporting services in LXC.

Q: What is the minimum number of nodes for a Proxmox cluster?

A: 3 nodes is the minimum for quorum-based HA. A 2-node cluster works, but without HA: when one node goes down, the remaining node loses quorum. For stream processing, use 3 nodes (one Kafka broker per node) plus shared Ceph storage. On a limited budget, run mixed workloads on those 3 nodes.
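
The arithmetic behind that answer is simple majority voting. A short sketch, assuming each node has exactly one vote and no external vote source (such as a QDevice) is configured:

```python
def quorum(total_votes):
    """Votes needed for a strict majority."""
    return total_votes // 2 + 1


def tolerated_failures(total_votes):
    """How many voters can fail while the rest still hold a majority."""
    return total_votes - quorum(total_votes)


if __name__ == "__main__":
    for n in (2, 3, 4, 5):
        print(f"{n} nodes: quorum={quorum(n)}, can lose {tolerated_failures(n)}")
```

Two nodes need both votes, so losing either one stops the cluster from making changes. Four nodes tolerate no more failures than three, which is why odd cluster sizes are the usual choice.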

Q: Ceph or ZFS: which is better for Proxmox?

A: Ceph is distributed and shared across nodes, so it supports live migration and HA. ZFS is local to each node (not shared), faster, and has good snapshots. Use Ceph when you need HA, live migration, and shared storage. Use ZFS on a single node or when local performance matters most. A mixed setup also works: Ceph for VMs and ZFS for local backups.
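
One cost of choosing Ceph is capacity: a replicated pool stores several copies of every block. The sketch below estimates usable space from raw space, assuming a replicated pool with 3 copies and a target of staying under 85% full. Both values are parameters you should match to your own pool settings, and the 6000 GB raw figure is a made-up example.

```python
def usable_gb(raw_gb, replicas=3, max_fill_pct=85):
    """Approximate usable capacity of a replicated pool, in whole GB."""
    return raw_gb * max_fill_pct // (replicas * 100)


def raw_needed_gb(logical_gb, replicas=3):
    """Raw capacity consumed by `logical_gb` of guest disks."""
    return logical_gb * replicas


if __name__ == "__main__":
    kafka_disks_gb = 3 * 500  # three brokers with 500 GB each, from the stack sizing
    print("Usable from 6000 GB raw:", usable_gb(6000), "GB")
    print("Raw used by Kafka disks:", raw_needed_gb(kafka_disks_gb), "GB")
```

Kafka already replicates each partition across brokers, so placing broker disks on a 3-copy Ceph pool multiplies the physical copies again. That trade-off is one reason some deployments keep Kafka log directories on local ZFS and rely on Kafka replication instead.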
