SiamCafe.net Blog
Technology

Proxmox VE Cluster Real Time Processing — สร้างระบบ Streaming บน Proxmox

proxmox ve cluster real time processing
Proxmox VE Cluster Real-time Processing | SiamCafe Blog
2026-01-03· อ. บอม — SiamCafe.net· 1,308 คำ

Real-Time Processing บน Proxmox VE Cluster

Real-Time Processing คือการประมวลผลข้อมูลทันทีที่ข้อมูลเข้ามา แทนที่จะรอสะสมแล้วประมวลผลเป็น batch เหมาะสำหรับ use cases เช่น real-time analytics, fraud detection, IoT sensor data, live dashboards และ event-driven architectures

Proxmox VE Cluster เป็น platform ที่ดีสำหรับ host real-time processing infrastructure เพราะรองรับ KVM VMs และ LXC containers, high availability ที่ migrate workloads อัตโนมัติเมื่อ node ล่ม, Ceph storage สำหรับ shared persistent storage และ network isolation ด้วย VLANs และ firewalls

Technology stack สำหรับ real-time processing ที่ deploy บน Proxmox ได้แก่ Apache Kafka สำหรับ message streaming, Apache Flink สำหรับ stream processing, Apache Spark Structured Streaming สำหรับ micro-batch processing, Redis Streams สำหรับ lightweight streaming และ ClickHouse สำหรับ real-time analytics queries

ข้อดีของการใช้ Proxmox แทน cloud คือ ควบคุม hardware เอง ไม่มี network latency ไป cloud, cost predictable ไม่มี surprise bills, data sovereignty เก็บข้อมูลใน premises และ customization สูง เลือก hardware ที่เหมาะกับ workload ได้

ติดตั้ง Proxmox Cluster สำหรับ Real-Time Workloads

ตั้งค่า Proxmox cluster ที่ optimize สำหรับ streaming

# === Proxmox Cluster Setup สำหรับ Real-Time Processing ===

# Hardware Recommendations:
# - CPU: AMD EPYC / Intel Xeon (16+ cores per node)
# - RAM: 128GB+ per node (Kafka/Flink ใช้ memory มาก)
# - Storage: NVMe SSDs (IOPS สำคัญมากสำหรับ Kafka)
# - Network: 10GbE minimum (25GbE recommended)
# - Nodes: minimum 3 nodes สำหรับ HA

# สร้าง Cluster (Node 1)
pvecm create realtime-cluster

# Join nodes
# ssh node2: pvecm add 192.168.1.10
# ssh node3: pvecm add 192.168.1.10

# ตรวจสอบ cluster
pvecm status
pvecm nodes

# === Network Configuration ===
# แยก network สำหรับ data plane
# /etc/network/interfaces (ทุก node)

# Management network
# auto vmbr0
# iface vmbr0 inet static
#     address 192.168.1.10/24
#     gateway 192.168.1.1
#     bridge-ports eno1

# Data network (10GbE)
# auto vmbr1
# iface vmbr1 inet static
#     address 10.10.10.10/24
#     bridge-ports eno2
#     bridge-stp off

# Storage network (Ceph)
# auto vmbr2
# iface vmbr2 inet static
#     address 10.20.20.10/24
#     bridge-ports eno3

# === VM Template สำหรับ Kafka/Flink nodes ===
# สร้าง Ubuntu 22.04 template
qm create 9000 --name ubuntu-template --memory 4096 --cores 4 \
  --net0 virtio, bridge=vmbr0 --net1 virtio, bridge=vmbr1

# Download cloud image
wget https://cloud-images.ubuntu.com/jammy/current/jammy-server-cloudimg-amd64.img
qm importdisk 9000 jammy-server-cloudimg-amd64.img local-lvm

# Configure
qm set 9000 --scsihw virtio-scsi-pci --scsi0 local-lvm:vm-9000-disk-0
qm set 9000 --boot order=scsi0 --serial0 socket --vga serial0
qm set 9000 --agent enabled=1
qm template 9000

# Clone VMs สำหรับ Kafka cluster
qm clone 9000 101 --name kafka-1 --full
qm clone 9000 102 --name kafka-2 --full
qm clone 9000 103 --name kafka-3 --full

# Clone VMs สำหรับ Flink cluster
qm clone 9000 201 --name flink-jobmanager --full
qm clone 9000 202 --name flink-taskmanager-1 --full
qm clone 9000 203 --name flink-taskmanager-2 --full

# Resize disks
qm resize 101 scsi0 +50G
qm resize 102 scsi0 +50G
qm resize 103 scsi0 +50G

# Set resources
qm set 101 --memory 16384 --cores 8
qm set 102 --memory 16384 --cores 8
qm set 103 --memory 16384 --cores 8

# Start VMs
qm start 101 && qm start 102 && qm start 103

Deploy Apache Kafka บน Proxmox

ติดตั้ง Kafka cluster สำหรับ real-time streaming

#!/bin/bash
# deploy_kafka.sh — Deploy Kafka Cluster on Proxmox VMs
set -euo pipefail

KAFKA_VERSION="3.7.0"
SCALA_VERSION="2.13"

# ติดตั้ง Java
sudo apt update
sudo apt install -y openjdk-17-jdk-headless

# ดาวน์โหลด Kafka
wget "https://downloads.apache.org/kafka//kafka_-.tgz"
tar -xzf "kafka_-.tgz"
sudo mv "kafka_-" /opt/kafka

# === KRaft Mode Configuration (ไม่ต้องใช้ ZooKeeper) ===
# /opt/kafka/config/kraft/server.properties

# Node 1 (kafka-1)
cat > /opt/kafka/config/kraft/server.properties << 'CONF'
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://kafka-1:9092
controller.listener.names=CONTROLLER
log.dirs=/data/kafka-logs
num.partitions=6
default.replication.factor=3
min.insync.replicas=2
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
num.network.threads=8
num.io.threads=16
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
CONF

# สร้าง data directory
sudo mkdir -p /data/kafka-logs
sudo chown -R kafka:kafka /data/kafka-logs

# Generate cluster ID
KAFKA_CLUSTER_ID=$(/opt/kafka/bin/kafka-storage.sh random-uuid)
echo "Cluster ID: $KAFKA_CLUSTER_ID"

# Format storage
/opt/kafka/bin/kafka-storage.sh format \
  -t "$KAFKA_CLUSTER_ID" \
  -c /opt/kafka/config/kraft/server.properties

# Systemd service
cat > /etc/systemd/system/kafka.service << 'SERVICE'
[Unit]
Description=Apache Kafka
After=network.target

[Service]
Type=simple
User=kafka
Environment="KAFKA_HEAP_OPTS=-Xmx8G -Xms8G"
Environment="KAFKA_JVM_PERFORMANCE_OPTS=-XX:+UseG1GC -XX:MaxGCPauseMillis=20"
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/kraft/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
Restart=on-failure
LimitNOFILE=65536

[Install]
WantedBy=multi-user.target
SERVICE

sudo systemctl daemon-reload
sudo systemctl enable kafka
sudo systemctl start kafka

# ตรวจสอบ
/opt/kafka/bin/kafka-metadata.sh --snapshot /data/kafka-logs/__cluster_metadata-0/00000000000000000000.log --cluster-id "$KAFKA_CLUSTER_ID"

# สร้าง topic
/opt/kafka/bin/kafka-topics.sh --create \
  --topic events \
  --partitions 12 \
  --replication-factor 3 \
  --bootstrap-server kafka-1:9092

# Test produce/consume
echo "test message" | /opt/kafka/bin/kafka-console-producer.sh --topic events --bootstrap-server kafka-1:9092
/opt/kafka/bin/kafka-console-consumer.sh --topic events --from-beginning --bootstrap-server kafka-1:9092 --max-messages 1

สร้าง Stream Processing ด้วย Apache Flink

Deploy Flink และเขียน stream processing jobs

#!/usr/bin/env python3
# stream_processor.py — Real-Time Stream Processing with PyFlink
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
from pyflink.common import WatermarkStrategy, Types
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer
from pyflink.common.serialization import SimpleStringSchema
import json
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("stream_processor")

def create_kafka_source(env, topic, bootstrap_servers, group_id):
    source = KafkaSource.builder() \
        .set_bootstrap_servers(bootstrap_servers) \
        .set_topics(topic) \
        .set_group_id(group_id) \
        .set_starting_offsets(KafkaOffsetsInitializer.latest()) \
        .set_value_only_deserializer(SimpleStringSchema()) \
        .build()
    
    return env.from_source(
        source,
        WatermarkStrategy.no_watermarks(),
        f"Kafka Source: {topic}"
    )

def process_events():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(4)
    
    # Enable checkpointing (fault tolerance)
    env.enable_checkpointing(60000)  # 60 seconds
    
    t_env = StreamTableEnvironment.create(env)
    
    # Create Kafka source table
    t_env.execute_sql("""
        CREATE TABLE events (
            event_id STRING,
            event_type STRING,
            user_id BIGINT,
            page_url STRING,
            event_time TIMESTAMP(3),
            properties MAP,
            WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'events',
            'properties.bootstrap.servers' = 'kafka-1:9092, kafka-2:9092, kafka-3:9092',
            'properties.group.id' = 'flink-processor',
            'scan.startup.mode' = 'latest-offset',
            'format' = 'json'
        )
    """)
    
    # Real-time aggregation: pageviews per minute
    t_env.execute_sql("""
        CREATE TABLE pageview_stats (
            window_start TIMESTAMP(3),
            window_end TIMESTAMP(3),
            page_url STRING,
            view_count BIGINT,
            unique_users BIGINT,
            PRIMARY KEY (window_start, page_url) NOT ENFORCED
        ) WITH (
            'connector' = 'jdbc',
            'url' = 'jdbc:postgresql://postgres:5432/analytics',
            'table-name' = 'pageview_stats',
            'username' = 'analytics',
            'password' = 'secret'
        )
    """)
    
    # Window aggregation query
    t_env.execute_sql("""
        INSERT INTO pageview_stats
        SELECT
            TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
            TUMBLE_END(event_time, INTERVAL '1' MINUTE) AS window_end,
            page_url,
            COUNT(*) AS view_count,
            COUNT(DISTINCT user_id) AS unique_users
        FROM events
        WHERE event_type = 'pageview'
        GROUP BY
            TUMBLE(event_time, INTERVAL '1' MINUTE),
            page_url
    """)
    
    # Fraud detection: too many events from same user
    t_env.execute_sql("""
        CREATE TABLE fraud_alerts (
            alert_time TIMESTAMP(3),
            user_id BIGINT,
            event_count BIGINT,
            alert_type STRING
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'fraud-alerts',
            'properties.bootstrap.servers' = 'kafka-1:9092',
            'format' = 'json'
        )
    """)
    
    t_env.execute_sql("""
        INSERT INTO fraud_alerts
        SELECT
            TUMBLE_END(event_time, INTERVAL '1' MINUTE) AS alert_time,
            user_id,
            COUNT(*) AS event_count,
            'high_frequency' AS alert_type
        FROM events
        GROUP BY
            TUMBLE(event_time, INTERVAL '1' MINUTE),
            user_id
        HAVING COUNT(*) > 100
    """)
    
    logger.info("Stream processing started")

if __name__ == "__main__":
    process_events()

Monitoring Real-Time Pipeline

ระบบ monitoring สำหรับ streaming pipeline

#!/usr/bin/env python3
# pipeline_monitor.py — Real-Time Pipeline Monitoring
from prometheus_client import start_http_server, Gauge, Counter, Histogram
from confluent_kafka import Consumer, TopicPartition
import json
import time
import logging
import requests

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("pipeline_monitor")

# Prometheus Metrics
kafka_lag = Gauge("kafka_consumer_lag", "Consumer lag", ["topic", "partition", "group"])
kafka_throughput = Gauge("kafka_topic_throughput", "Messages per second", ["topic"])
flink_jobs_running = Gauge("flink_jobs_running", "Running Flink jobs")
flink_checkpoint_duration = Gauge("flink_checkpoint_duration_ms", "Checkpoint duration", ["job"])
pipeline_latency = Histogram("pipeline_end_to_end_latency_seconds", "E2E latency",
                              buckets=[0.1, 0.5, 1, 2, 5, 10, 30])

class PipelineMonitor:
    def __init__(self, kafka_brokers, flink_url="http://flink-jobmanager:8081"):
        self.kafka_brokers = kafka_brokers
        self.flink_url = flink_url
    
    def check_kafka_lag(self, group_id, topics):
        consumer = Consumer({
            "bootstrap.servers": self.kafka_brokers,
            "group.id": f"{group_id}-monitor",
            "enable.auto.commit": False,
        })
        
        total_lag = 0
        
        for topic in topics:
            metadata = consumer.list_topics(topic)
            partitions = metadata.topics[topic].partitions
            
            for pid in partitions:
                tp = TopicPartition(topic, pid)
                
                low, high = consumer.get_watermark_offsets(tp)
                
                committed = consumer.committed([tp])[0]
                current_offset = committed.offset if committed and committed.offset >= 0 else 0
                
                lag = high - current_offset
                total_lag += lag
                
                kafka_lag.labels(topic=topic, partition=str(pid), group=group_id).set(lag)
        
        consumer.close()
        logger.info(f"Kafka lag for {group_id}: {total_lag}")
        return total_lag
    
    def check_flink_status(self):
        try:
            resp = requests.get(f"{self.flink_url}/jobs/overview", timeout=10)
            jobs = resp.json().get("jobs", [])
            
            running = sum(1 for j in jobs if j["state"] == "RUNNING")
            flink_jobs_running.set(running)
            
            for job in jobs:
                if job["state"] == "RUNNING":
                    job_resp = requests.get(
                        f"{self.flink_url}/jobs/{job['jid']}/checkpoints",
                        timeout=10,
                    )
                    cp_data = job_resp.json()
                    
                    if cp_data.get("latest", {}).get("completed"):
                        duration = cp_data["latest"]["completed"]["duration"]
                        flink_checkpoint_duration.labels(job=job["name"]).set(duration)
            
            logger.info(f"Flink: {running} running jobs")
            return {"running": running, "total": len(jobs)}
            
        except Exception as e:
            logger.error(f"Flink check failed: {e}")
            return {"error": str(e)}
    
    def run(self, interval=15):
        start_http_server(9090)
        logger.info("Pipeline monitor started on :9090")
        
        while True:
            try:
                self.check_kafka_lag("flink-processor", ["events"])
                self.check_flink_status()
            except Exception as e:
                logger.error(f"Monitor error: {e}")
            
            time.sleep(interval)

# Grafana Dashboard Panels:
# 1. Kafka Consumer Lag (per partition)
#    Query: kafka_consumer_lag{group="flink-processor"}
#
# 2. Throughput (messages/sec)
#    Query: rate(kafka_topic_throughput[5m])
#
# 3. Flink Jobs Status
#    Query: flink_jobs_running
#
# 4. Checkpoint Duration
#    Query: flink_checkpoint_duration_ms
#
# 5. End-to-End Latency
#    Query: histogram_quantile(0.95, pipeline_end_to_end_latency_seconds_bucket)
#
# Alert Rules:
# - Kafka lag > 10000: WARNING
# - Kafka lag > 100000: CRITICAL
# - Flink jobs running < expected: CRITICAL
# - Checkpoint duration > 60s: WARNING
# - E2E latency P95 > 10s: WARNING

if __name__ == "__main__":
    monitor = PipelineMonitor("kafka-1:9092, kafka-2:9092, kafka-3:9092")
    monitor.run()

Performance Tuning และ HA Configuration

เทคนิค optimize performance สำหรับ real-time pipeline

# === Proxmox VM Tuning สำหรับ Kafka ===

# 1. CPU Pinning (ลด context switching)
qm set 101 --cpuunits 2048
qm set 101 --cpu host  # passthrough CPU features

# 2. Memory Tuning
qm set 101 --balloon 0  # ปิด ballooning
qm set 101 --numa 1     # enable NUMA

# 3. Disk I/O Tuning
qm set 101 --scsi0 local-lvm:vm-101-disk-0, iothread=1, cache=none
# cache=none สำหรับ Kafka (ใช้ OS page cache)

# 4. Network Tuning
qm set 101 --net0 virtio, bridge=vmbr1, queues=4
# multiqueue สำหรับ high throughput

# === Kafka Performance Tuning ===
# server.properties additions:
# 
# # Increase throughput
# num.network.threads=8
# num.io.threads=16
# socket.send.buffer.bytes=1048576
# socket.receive.buffer.bytes=1048576
# 
# # Log performance
# log.flush.interval.messages=10000
# log.flush.interval.ms=1000
# 
# # Replication
# replica.fetch.max.bytes=10485760
# replica.fetch.wait.max.ms=500
# 
# # Compression
# compression.type=lz4

# === Flink Performance Tuning ===
# flink-conf.yaml:
#
# taskmanager.numberOfTaskSlots: 8
# taskmanager.memory.process.size: 12g
# taskmanager.memory.managed.fraction: 0.4
# state.backend: rocksdb
# state.checkpoints.dir: hdfs:///flink/checkpoints
# execution.checkpointing.interval: 60000
# execution.checkpointing.min-pause: 30000
# execution.checkpointing.timeout: 600000
# restart-strategy: fixed-delay
# restart-strategy.fixed-delay.attempts: 3
# restart-strategy.fixed-delay.delay: 10s

# === OS Tuning (ทุก VM) ===
# /etc/sysctl.conf
cat >> /etc/sysctl.conf << 'SYSCTL'
# Network
net.core.somaxconn=32768
net.core.netdev_max_backlog=16384
net.core.rmem_max=16777216
net.core.wmem_max=16777216
net.ipv4.tcp_max_syn_backlog=16384
net.ipv4.tcp_fin_timeout=15

# VM
vm.swappiness=1
vm.dirty_ratio=80
vm.dirty_background_ratio=5
vm.max_map_count=262144

# File descriptors
fs.file-max=2097152
SYSCTL

sysctl -p

# /etc/security/limits.conf
# kafka soft nofile 131072
# kafka hard nofile 131072
# kafka soft nproc 65536
# kafka hard nproc 65536

# === HA Configuration ===
# Proxmox HA สำหรับ Kafka VMs
ha-manager add vm:101 --state started --group kafka-group --max_restart 3
ha-manager add vm:102 --state started --group kafka-group --max_restart 3
ha-manager add vm:103 --state started --group kafka-group --max_restart 3

# HA Group (distribute across nodes)
ha-manager groupadd kafka-group --nodes pve1,pve2,pve3 --nofailback 1

# ตรวจสอบ HA status
ha-manager status

FAQ คำถามที่พบบ่อย

Q: Proxmox เหมาะกับ real-time processing ไหม?

A: เหมาะมากสำหรับ on-premise real-time processing Proxmox ให้ virtualization overhead ต่ำ (KVM ให้ near-native performance) รองรับ CPU pinning, NUMA awareness และ SR-IOV สำหรับ network bypass สำหรับ latency-sensitive workloads ใช้ LXC containers แทน VMs ได้เพื่อ overhead ที่ต่ำกว่า HA features ช่วยให้ pipeline ไม่หยุดเมื่อ hardware fail

Q: Kafka KRaft mode ดีกว่า ZooKeeper mode อย่างไร?

A: KRaft mode (Kafka Raft) ลบ dependency บน ZooKeeper ทำให้ deploy และ manage ง่ายขึ้น metadata propagation เร็วกว่า partition limit สูงกว่า (ล้าน partitions) startup time เร็วกว่า และ operational complexity ลดลง (ไม่ต้อง manage ZooKeeper cluster แยก) Kafka 3.5+ recommend KRaft สำหรับ production ใหม่ ZooKeeper mode จะถูกลบใน Kafka 4.0

Q: ใช้ containers แทน VMs สำหรับ Kafka ดีกว่าไหม?

A: ขึ้นอยู่กับ requirements LXC containers บน Proxmox ให้ performance ดีกว่า VMs (ไม่มี hypervisor overhead) เหมาะสำหรับ stateless workloads เช่น Flink TaskManagers แต่ VMs เหมาะกว่าสำหรับ Kafka brokers เพราะ isolation ดีกว่า (kernel แยก), disk I/O predictable กว่า และ live migration ง่ายกว่า แนะนำ VMs สำหรับ Kafka, containers สำหรับ processors

Q: ต้องการ throughput เท่าไหรถึงควรใช้ dedicated infrastructure?

A: ถ้า throughput เกิน 100MB/s หรือ 100,000 messages/s ควรใช้ dedicated infrastructure เพราะ shared resources อาจทำให้ latency ไม่ stable สำหรับ throughput ต่ำกว่านั้น cloud managed services เช่น Amazon MSK หรือ Confluent Cloud อาจคุ้มค่ากว่าในแง่ operational overhead สำหรับ latency-critical applications ที่ต้องการ sub-millisecond on-premise ดีกว่าเสมอ

📖 บทความที่เกี่ยวข้อง

Proxmox VE Cluster อ่านบทความ → Proxmox VE Cluster Compliance Automationอ่านบทความ → Server-Sent Events Real-time Processingอ่านบทความ → Proxmox VE Cluster DevSecOps Integrationอ่านบทความ → ONNX Runtime Real-time Processingอ่านบทความ →

📚 ดูบทความทั้งหมด →