SiamCafe · Blog
Proxmox VE Cluster Real Time Processing —
บทความ

Proxmox VE Cluster Real Time Processing —

เผยแพร่ 28 พฤษภาคม 2569

Real-Time Processing บน Proxmox VE Cluster

Real-Time Processing คือการประมวลผลข้อมูลทันทีที่ข้อมูลเข้ามา แทนที่จะรอสะสมแล้วประมวลผลเป็น batch เหมาะสำหรับ use cases เช่น real-time analytics, fraud detection, IoT sensor data, live dashboards และ event-driven architectures

Proxmox VE Cluster เป็น platform ที่ดีสำหรับ host real-time processing infrastructure เพราะรองรับ KVM VMs และ LXC containers, high availability ที่ migrate workloads อัตโนมัติเมื่อ node ล่ม, Ceph storage สำหรับ shared persistent storage และ network isolation ด้วย VLANs และ firewalls

Technology stack สำหรับ real-time processing ที่ deploy บน Proxmox ได้แก่ Apache Kafka สำหรับ message streaming, Apache Flink สำหรับ stream processing, Apache Spark Structured Streaming สำหรับ micro-batch processing, Redis Streams สำหรับ lightweight streaming และ ClickHouse สำหรับ real-time analytics queries

ข้อดีของการใช้ Proxmox แทน cloud คือ ควบคุม hardware เอง ไม่มี network latency ไป cloud, cost predictable ไม่มี surprise bills, data sovereignty เก็บข้อมูลใน premises และ customization สูง เลือก hardware ที่เหมาะกับ workload ได้

ติดตั้ง Proxmox Cluster สำหรับ Real-Time Workloads

ตั้งค่า Proxmox cluster ที่ optimize สำหรับ streaming

=== Proxmox Cluster Setup สำหรับ Real-Time Processing ===

Hardware Recommendations:

  • CPU: AMD EPYC / Intel Xeon (16+ cores per node)
  • RAM: 128GB+ per node (Kafka/Flink ใช้ memory มาก)
  • Storage: NVMe SSDs (IOPS สำคัญมากสำหรับ Kafka)
  • Network: 10GbE minimum (25GbE recommended)
  • Nodes: minimum 3 nodes สำหรับ HA

สร้าง Cluster (Node 1)

pvecm create realtime-cluster

Join nodes

ssh node2: pvecm add 192.168.1.10

ssh node3: pvecm add 192.168.1.10

ตรวจสอบ cluster

pvecm status

pvecm nodes

=== Network Configuration ===

แยก network สำหรับ data plane

/etc/network/interfaces (ทุก node)

Management network

auto vmbr0

iface vmbr0 inet static

address 192.168.1.10/24

gateway 192.168.1.1

bridge-ports eno1

Data network (10GbE)

auto vmbr1

iface vmbr1 inet static

address 10.10.10.10/24

bridge-ports eno2

bridge-stp off

Storage network (Ceph)

auto vmbr2

iface vmbr2 inet static

address 10.20.20.10/24

bridge-ports eno3

=== VM Template สำหรับ Kafka/Flink nodes ===

สร้าง Ubuntu 22.04 template

qm create 9000 --name ubuntu-template --memory 4096 --cores 4 \

--net0 virtio, bridge=vmbr0 --net1 virtio, bridge=vmbr1

Download cloud image

wget https://cloud-images.ubuntu.com/jammy/current/jammy-server-cloudimg-amd64.img

qm importdisk 9000 jammy-server-cloudimg-amd64.img local-lvm

Configure

qm set 9000 --scsihw virtio-scsi-pci --scsi0 local-lvm:vm-9000-disk-0

qm set 9000 --boot order=scsi0 --serial0 socket --vga serial0

qm set 9000 --agent enabled=1

qm template 9000

Clone VMs สำหรับ Kafka cluster

qm clone 9000 101 --name kafka-1 --full

qm clone 9000 102 --name kafka-2 --full

qm clone 9000 103 --name kafka-3 --full

Clone VMs สำหรับ Flink cluster

qm clone 9000 201 --name flink-jobmanager --full

qm clone 9000 202 --name flink-taskmanager-1 --full

qm clone 9000 203 --name flink-taskmanager-2 --full

Resize disks

qm resize 101 scsi0 +50G

qm resize 102 scsi0 +50G

qm resize 103 scsi0 +50G

Set resources

qm set 101 --memory 16384 --cores 8

qm set 102 --memory 16384 --cores 8

qm set 103 --memory 16384 --cores 8

Start VMs

qm start 101 && qm start 102 && qm start 103

Deploy Apache Kafka บน Proxmox

ติดตั้ง Kafka cluster สำหรับ real-time streaming

#!/bin/bash
# deploy_kafka.sh — Deploy Kafka Cluster on Proxmox VMs
set -euo pipefail

KAFKA_VERSION="3.7.0"
SCALA_VERSION="2.13"

# ติดตั้ง Java
sudo apt update
sudo apt install -y openjdk-17-jdk-headless

# ดาวน์โหลด Kafka
wget "https://downloads.apache.org/kafka//kafka_-.tgz"
tar -xzf "kafka_-.tgz"
sudo mv "kafka_-" /opt/kafka

# === KRaft Mode Configuration (ไม่ต้องใช้ ZooKeeper) ===
# /opt/kafka/config/kraft/server.properties

# Node 1 (kafka-1)
cat > /opt/kafka/config/kraft/server.properties << 'CONF'
process.roles=broker,controller
node.id=1
controller.quorum.voters=1@kafka-1:9093,2@kafka-2:9093,3@kafka-3:9093
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://kafka-1:9092
controller.listener.names=CONTROLLER
log.dirs=/data/kafka-logs
num.partitions=6
default.replication.factor=3
min.insync.replicas=2
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
num.network.threads=8
num.io.threads=16
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
CONF

# สร้าง data directory
sudo mkdir -p /data/kafka-logs
sudo chown -R kafka:kafka /data/kafka-logs

# Generate cluster ID
KAFKA_CLUSTER_ID=$(/opt/kafka/bin/kafka-storage.sh random-uuid)
echo "Cluster ID: $KAFKA_CLUSTER_ID"

# Format storage
/opt/kafka/bin/kafka-storage.sh format \
  -t "$KAFKA_CLUSTER_ID" \
  -c /opt/kafka/config/kraft/server.properties

# Systemd service
cat > /etc/systemd/system/kafka.service << 'SERVICE'
[Unit]
Description=Apache Kafka
After=network.target

[Service]
Type=simple
User=kafka
Environment="KAFKA_HEAP_OPTS=-Xmx8G -Xms8G"
Environment="KAFKA_JVM_PERFORMANCE_OPTS=-XX:+UseG1GC -XX:MaxGCPauseMillis=20"
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/kraft/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
Restart=on-failure
LimitNOFILE=65536

[Install]
WantedBy=multi-user.target
SERVICE

sudo systemctl daemon-reload
sudo systemctl enable kafka
sudo systemctl start kafka

# ตรวจสอบ
/opt/kafka/bin/kafka-metadata.sh --snapshot /data/kafka-logs/__cluster_metadata-0/00000000000000000000.log --cluster-id "$KAFKA_CLUSTER_ID"

# สร้าง topic
/opt/kafka/bin/kafka-topics.sh --create \
  --topic events \
  --partitions 12 \
  --replication-factor 3 \
  --bootstrap-server kafka-1:9092

# Test produce/consume
echo "test message" | /opt/kafka/bin/kafka-console-producer.sh --topic events --bootstrap-server kafka-1:9092
/opt/kafka/bin/kafka-console-consumer.sh --topic events --from-beginning --bootstrap-server kafka-1:9092 --max-messages 1

สร้าง Stream Processing ด้วย Apache Flink

Deploy Flink และเขียน stream processing jobs

#!/usr/bin/env python3
# stream_processor.py — Real-Time Stream Processing with PyFlink
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
from pyflink.common import WatermarkStrategy, Types
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer
from pyflink.common.serialization import SimpleStringSchema
import json
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("stream_processor")

def create_kafka_source(env, topic, bootstrap_servers, group_id):
    source = KafkaSource.builder() \
        .set_bootstrap_servers(bootstrap_servers) \
        .set_topics(topic) \
        .set_group_id(group_id) \
        .set_starting_offsets(KafkaOffsetsInitializer.latest()) \
        .set_value_only_deserializer(SimpleStringSchema()) \
        .build()
    
    return env.from_source(
        source,
        WatermarkStrategy.no_watermarks(),
        f"Kafka Source: {topic}"
    )

def process_events():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(4)
    
    # Enable checkpointing (fault tolerance)
    env.enable_checkpointing(60000)  # 60 seconds
    
    t_env = StreamTableEnvironment.create(env)
    
    # Create Kafka source table
    t_env.execute_sql("""
        CREATE TABLE events (
            event_id STRING,
            event_type STRING,
            user_id BIGINT,
            page_url STRING,
            event_time TIMESTAMP(3),
            properties MAP,
            WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'events',
            'properties.bootstrap.servers' = 'kafka-1:9092, kafka-2:9092, kafka-3:9092',
            'properties.group.id' = 'flink-processor',
            'scan.startup.mode' = 'latest-offset',
            'format' = 'json'
        )
    """)
    
    # Real-time aggregation: pageviews per minute
    t_env.execute_sql("""
        CREATE TABLE pageview_stats (
            window_start TIMESTAMP(3),
            window_end TIMESTAMP(3),
            page_url STRING,
            view_count BIGINT,
            unique_users BIGINT,
            PRIMARY KEY (window_start, page_url) NOT ENFORCED
        ) WITH (
            'connector' = 'jdbc',
            'url' = 'jdbc:postgresql://postgres:5432/analytics',
            'table-name' = 'pageview_stats',
            'username' = 'analytics',
            'password' = 'secret'
        )
    """)
    
    # Window aggregation query
    t_env.execute_sql("""
        INSERT INTO pageview_stats
        SELECT
            TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
            TUMBLE_END(event_time, INTERVAL '1' MINUTE) AS window_end,
            page_url,
            COUNT(*) AS view_count,
            COUNT(DISTINCT user_id) AS unique_users
        FROM events
        WHERE event_type = 'pageview'
        GROUP BY
            TUMBLE(event_time, INTERVAL '1' MINUTE),
            page_url
    """)
    
    # Fraud detection: too many events from same user
    t_env.execute_sql("""
        CREATE TABLE fraud_alerts (
            alert_time TIMESTAMP(3),
            user_id BIGINT,
            event_count BIGINT,
            alert_type STRING
        ) WITH (
            'connector' = 'kafka',
            'topic' = 'fraud-alerts',
            'properties.bootstrap.servers' = 'kafka-1:9092',
            'format' = 'json'
        )
    """)
    
    t_env.execute_sql("""
        INSERT INTO fraud_alerts
        SELECT
            TUMBLE_END(event_time, INTERVAL '1' MINUTE) AS alert_time,
            user_id,
            COUNT(*) AS event_count,
            'high_frequency' AS alert_type
        FROM events
        GROUP BY
            TUMBLE(event_time, INTERVAL '1' MINUTE),
            user_id
        HAVING COUNT(*) > 100
    """)
    
    logger.info("Stream processing started")

if __name__ == "__main__":
    process_events()

Monitoring Real-Time Pipeline

ระบบ monitoring สำหรับ streaming pipeline

#!/usr/bin/env python3
# pipeline_monitor.py — Real-Time Pipeline Monitoring
from prometheus_client import start_http_server, Gauge, Counter, Histogram
from confluent_kafka import Consumer, TopicPartition
import json
import time
import logging
import requests

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("pipeline_monitor")

# Prometheus Metrics
kafka_lag = Gauge("kafka_consumer_lag", "Consumer lag", ["topic", "partition", "group"])
kafka_throughput = Gauge("kafka_topic_throughput", "Messages per second", ["topic"])
flink_jobs_running = Gauge("flink_jobs_running", "Running Flink jobs")
flink_checkpoint_duration = Gauge("flink_checkpoint_duration_ms", "Checkpoint duration", ["job"])
pipeline_latency = Histogram("pipeline_end_to_end_latency_seconds", "E2E latency",
                              buckets=[0.1, 0.5, 1, 2, 5, 10, 30])

class PipelineMonitor:
    def __init__(self, kafka_brokers, flink_url="http://flink-jobmanager:8081"):
        self.kafka_brokers = kafka_brokers
        self.flink_url = flink_url
    
    def check_kafka_lag(self, group_id, topics):
        consumer = Consumer({
            "bootstrap.servers": self.kafka_brokers,
            "group.id": f"{group_id}-monitor",
            "enable.auto.commit": False,
        })
        
        total_lag = 0
        
        for topic in topics:
            metadata = consumer.list_topics(topic)
            partitions = metadata.topics[topic].partitions
            
            for pid in partitions:
                tp = TopicPartition(topic, pid)
                
                low, high = consumer.get_watermark_offsets(tp)
                
                committed = consumer.committed([tp])[0]
                current_offset = committed.offset if committed and committed.offset >= 0 else 0
                
                lag = high - current_offset
                total_lag += lag
                
                kafka_lag.labels(topic=topic, partition=str(pid), group=group_id).set(lag)
        
        consumer.close()
        logger.info(f"Kafka lag for {group_id}: {total_lag}")
        return total_lag
    
    def check_flink_status(self):
        try:
            resp = requests.get(f"{self.flink_url}/jobs/overview", timeout=10)
            jobs = resp.json().get("jobs", [])
            
            running = sum(1 for j in jobs if j["state"] == "RUNNING")
            flink_jobs_running.set(running)
            
            for job in jobs:
                if job["state"] == "RUNNING":
                    job_resp = requests.get(
                        f"{self.flink_url}/jobs/{job['jid']}/checkpoints",
                        timeout=10,
                    )
                    cp_data = job_resp.json()
                    
                    if cp_data.get("latest", {}).get("completed"):
                        duration = cp_data["latest"]["completed"]["duration"]
                        flink_checkpoint_duration.labels(job=job["name"]).set(duration)
            
            logger.info(f"Flink: {running} running jobs")
            return {"running": running, "total": len(jobs)}
            
        except Exception as e:
            logger.error(f"Flink check failed: {e}")
            return {"error": str(e)}
    
    def run(self, interval=15):
        start_http_server(9090)
        logger.info("Pipeline monitor started on :9090")
        
        while True:
            try:
                self.check_kafka_lag("flink-processor", ["events"])
                self.check_flink_status()
            except Exception as e:
                logger.error(f"Monitor error: {e}")
            
            time.sleep(interval)

# Grafana Dashboard Panels:
# 1. Kafka Consumer Lag (per partition)
#    Query: kafka_consumer_lag{group="flink-processor"}
#
# 2. Throughput (messages/sec)
#    Query: rate(kafka_topic_throughput[5m])
#
# 3. Flink Jobs Status
#    Query: flink_jobs_running
#
# 4. Checkpoint Duration
#    Query: flink_checkpoint_duration_ms
#
# 5. End-to-End Latency
#    Query: histogram_quantile(0.95, pipeline_end_to_end_latency_seconds_bucket)
#
# Alert Rules:
# - Kafka lag > 10000: WARNING
# - Kafka lag > 100000: CRITICAL
# - Flink jobs running < expected: CRITICAL
# - Checkpoint duration > 60s: WARNING
# - E2E latency P95 > 10s: WARNING

if __name__ == "__main__":
    monitor = PipelineMonitor("kafka-1:9092, kafka-2:9092, kafka-3:9092")
    monitor.run()

Performance Tuning และ HA Configuration

เทคนิค optimize performance สำหรับ real-time pipeline

=== Proxmox VM Tuning สำหรับ Kafka ===

1. CPU Pinning (ลด context switching)

qm set 101 --cpuunits 2048

qm set 101 --cpu host # passthrough CPU features

2. Memory Tuning

qm set 101 --balloon 0 # ปิด ballooning

qm set 101 --numa 1 # enable NUMA

3. Disk I/O Tuning

qm set 101 --scsi0 local-lvm:vm-101-disk-0, iothread=1, cache=none

cache=none สำหรับ Kafka (ใช้ OS page cache)

4. Network Tuning

qm set 101 --net0 virtio, bridge=vmbr1, queues=4

multiqueue สำหรับ high throughput

=== Kafka Performance Tuning ===

server.properties additions:

# Increase throughput

num.network.threads=8

num.io.threads=16

socket.send.buffer.bytes=1048576

socket.receive.buffer.bytes=1048576

# Log performance

log.flush.interval.messages=10000

log.flush.interval.ms=1000

# Replication

replica.fetch.max.bytes=10485760

replica.fetch.wait.max.ms=500

# Compression

compression.type=lz4

=== Flink Performance Tuning ===

flink-conf.yaml:

taskmanager.numberOfTaskSlots: 8

taskmanager.memory.process.size: 12g

taskmanager.memory.managed.fraction: 0.4

state.backend: rocksdb

state.checkpoints.dir: hdfs:///flink/checkpoints

execution.checkpointing.interval: 60000

execution.checkpointing.min-pause: 30000

execution.checkpointing.timeout: 600000

restart-strategy: fixed-delay

restart-strategy.fixed-delay.attempts: 3

restart-strategy.fixed-delay.delay: 10s

=== OS Tuning (ทุก VM) ===

/etc/sysctl.conf

cat >> /etc/sysctl.conf << 'SYSCTL'

Network

net.core.somaxconn=32768

net.core.netdev_max_backlog=16384

net.core.rmem_max=16777216

net.core.wmem_max=16777216

net.ipv4.tcp_max_syn_backlog=16384

net.ipv4.tcp_fin_timeout=15

VM

vm.swappiness=1

vm.dirty_ratio=80

vm.dirty_background_ratio=5

vm.max_map_count=262144

File descriptors

fs.file-max=2097152

SYSCTL

sysctl -p

/etc/security/limits.conf

kafka soft nofile 131072

kafka hard nofile 131072

kafka soft nproc 65536

kafka hard nproc 65536

=== HA Configuration ===

Proxmox HA สำหรับ Kafka VMs

ha-manager add vm:101 --state started --group kafka-group --max_restart 3

ha-manager add vm:102 --state started --group kafka-group --max_restart 3

ha-manager add vm:103 --state started --group kafka-group --max_restart 3

HA Group (distribute across nodes)

ha-manager groupadd kafka-group --nodes pve1,pve2,pve3 --nofailback 1

ตรวจสอบ HA status

ha-manager status

FAQ คำถามที่พบบ่อย

Q: Proxmox เหมาะกับ real-time processing ไหม?

A: เหมาะมากสำหรับ on-premise real-time processing Proxmox ให้ virtualization overhead ต่ำ (KVM ให้ near-native performance) รองรับ CPU pinning, NUMA awareness และ SR-IOV สำหรับ network bypass สำหรับ latency-sensitive workloads ใช้ LXC containers แทน VMs ได้เพื่อ overhead ที่ต่ำกว่า HA features ช่วยให้ pipeline ไม่หยุดเมื่อ hardware fail

Q: Kafka KRaft mode ดีกว่า ZooKeeper mode อย่างไร?

A: KRaft mode (Kafka Raft) ลบ dependency บน ZooKeeper ทำให้ deploy และ manage ง่ายขึ้น metadata propagation เร็วกว่า partition limit สูงกว่า (ล้าน partitions) startup time เร็วกว่า และ operational complexity ลดลง (ไม่ต้อง manage ZooKeeper cluster แยก) Kafka 3.5+ recommend KRaft สำหรับ production ใหม่ ZooKeeper mode จะถูกลบใน Kafka 4.0

Q: ใช้ containers แทน VMs สำหรับ Kafka ดีกว่าไหม?

A: ขึ้นอยู่กับ requirements LXC containers บน Proxmox ให้ performance ดีกว่า VMs (ไม่มี hypervisor overhead) เหมาะสำหรับ stateless workloads เช่น Flink TaskManagers แต่ VMs เหมาะกว่าสำหรับ Kafka brokers เพราะ isolation ดีกว่า (kernel แยก), disk I/O predictable กว่า และ live migration ง่ายกว่า แนะนำ VMs สำหรับ Kafka, containers สำหรับ processors

Q: ต้องการ throughput เท่าไหรถึงควรใช้ dedicated infrastructure?

A: ถ้า throughput เกิน 100MB/s หรือ 100,000 messages/s ควรใช้ dedicated infrastructure เพราะ shared resources อาจทำให้ latency ไม่ stable สำหรับ throughput ต่ำกว่านั้น cloud managed services เช่น Amazon MSK หรือ Confluent Cloud อาจคุ้มค่ากว่าในแง่ operational overhead สำหรับ latency-critical applications ที่ต้องการ sub-millisecond on-premise ดีกว่าเสมอ