Proxmox VE Cluster Batch Processing Pipeline คืออะไร
Proxmox VE (Virtual Environment) เป็น open source virtualization platform ที่รวม KVM hypervisor และ LXC containers เข้าด้วยกัน รองรับ clustering, high availability และ software-defined storage ส่วน Batch Processing Pipeline คือระบบประมวลผลข้อมูลเป็นชุดตามกำหนดเวลา การรวมสองแนวคิดนี้ช่วยให้สร้าง batch processing infrastructure บน Proxmox cluster ได้อย่างมีประสิทธิภาพ เช่น จัดสรร VMs อัตโนมัติตาม workload, schedule batch jobs และ optimize resources สำหรับ data processing tasks ขนาดใหญ่
Proxmox Cluster สำหรับ Batch Processing
# proxmox_batch.py — Proxmox cluster for batch processing
import json
class ProxmoxBatchCluster:
    """Reference layout of a Proxmox VE cluster used for batch processing.

    Holds two static catalogs — the node roles that make up the cluster
    (ARCHITECTURE) and the VM templates batch workers are cloned from
    (VM_TEMPLATES) — plus helpers that pretty-print both.
    """

    # Cluster node roles, keyed by a short role identifier.
    ARCHITECTURE = {
        "management_node": {
            "name": "Management Node",
            "role": "Proxmox web UI, API, scheduler, orchestrator",
            "specs": "4 vCPU, 8GB RAM, 100GB SSD",
            "services": ["Proxmox API", "Job Scheduler", "Monitoring"],
        },
        "compute_nodes": {
            "name": "Compute Nodes (3+)",
            "role": "รัน batch processing VMs/containers",
            "specs": "16-64 vCPU, 64-256GB RAM, NVMe SSDs",
            "services": ["KVM VMs", "LXC Containers", "Ceph OSD"],
        },
        "storage": {
            "name": "Shared Storage (Ceph)",
            "role": "Distributed storage สำหรับ data และ VM images",
            "specs": "Ceph cluster across compute nodes",
            "services": ["Ceph RBD (block)", "CephFS (file)", "Ceph RGW (object)"],
        },
    }

    # Reusable VM templates; workers are cloned from these template IDs.
    VM_TEMPLATES = {
        "spark_worker": {
            "name": "Spark Worker",
            "template_id": 9001,
            "specs": "8 vCPU, 32GB RAM, 200GB disk",
            "image": "Ubuntu 22.04 + Spark 3.5 + Python 3.11",
        },
        "airflow_worker": {
            "name": "Airflow Worker",
            "template_id": 9002,
            "specs": "4 vCPU, 16GB RAM, 100GB disk",
            "image": "Ubuntu 22.04 + Airflow 2.8 + Python 3.11",
        },
        "data_processor": {
            "name": "Data Processor (General)",
            "template_id": 9003,
            "specs": "4 vCPU, 16GB RAM, 100GB disk",
            "image": "Ubuntu 22.04 + Python 3.11 + pandas/dask",
        },
    }

    def show_architecture(self):
        """Print each node role with its purpose and hardware specs."""
        print("=== Proxmox Batch Cluster ===\n")
        for spec in self.ARCHITECTURE.values():
            # One multi-line print per node: header, role, specs, blank line.
            print(f"[{spec['name']}]\n Role: {spec['role']}\n Specs: {spec['specs']}\n")

    def show_templates(self):
        """Print a one-line summary of every VM template."""
        print("=== VM Templates ===")
        for spec in self.VM_TEMPLATES.values():
            print(f" [{spec['name']}] ID:{spec['template_id']} | {spec['specs']}")
# Demo: instantiate the cluster description and print its layout and templates.
cluster = ProxmoxBatchCluster()
cluster.show_architecture()
cluster.show_templates()
Batch Pipeline Orchestration
# orchestration.py — Batch pipeline orchestration
import json
class BatchOrchestration:
    """Displays the helper script used to manage batch-worker VMs via the Proxmox API.

    The script itself is stored as a string (PROXMOX_API) for display by
    show_api(); it is tutorial material and is never executed here.
    """

    # NOTE(review): the embedded listing authenticates with a plaintext password
    # and disables TLS verification — acceptable for a lab walkthrough only.
    PROXMOX_API = """
# proxmox_api.py — Proxmox API for batch VM management
import requests
import time
import urllib3
urllib3.disable_warnings()
class ProxmoxBatchManager:
def __init__(self, host, user, password, verify_ssl=False):
self.base_url = f"https://{host}:8006/api2/json"
self.verify = verify_ssl
self.ticket = self._auth(user, password)
def _auth(self, user, password):
resp = requests.post(
f"{self.base_url}/access/ticket",
data={"username": user, "password": password},
verify=self.verify,
)
data = resp.json()["data"]
self.headers = {
"CSRFPreventionToken": data["CSRFPreventionToken"],
"Cookie": f"PVEAuthCookie={data['ticket']}",
}
return data["ticket"]
def clone_vm(self, node, template_id, new_id, name):
resp = requests.post(
f"{self.base_url}/nodes/{node}/qemu/{template_id}/clone",
headers=self.headers,
data={"newid": new_id, "name": name, "full": 1},
verify=self.verify,
)
return resp.json()
def start_vm(self, node, vmid):
resp = requests.post(
f"{self.base_url}/nodes/{node}/qemu/{vmid}/status/start",
headers=self.headers,
verify=self.verify,
)
return resp.json()
def stop_vm(self, node, vmid):
resp = requests.post(
f"{self.base_url}/nodes/{node}/qemu/{vmid}/status/stop",
headers=self.headers,
verify=self.verify,
)
return resp.json()
def delete_vm(self, node, vmid):
resp = requests.delete(
f"{self.base_url}/nodes/{node}/qemu/{vmid}",
headers=self.headers,
verify=self.verify,
)
return resp.json()
def spawn_batch_workers(self, node, template_id, count, prefix="worker"):
workers = []
base_id = 2000
for i in range(count):
vmid = base_id + i
name = f"{prefix}-{i+1}"
self.clone_vm(node, template_id, vmid, name)
print(f" Cloned {name} (ID: {vmid})")
workers.append({"vmid": vmid, "name": name, "node": node})
time.sleep(30) # Wait for clones
for w in workers:
self.start_vm(w["node"], w["vmid"])
print(f" Started {w['name']}")
return workers
def cleanup_workers(self, workers):
for w in workers:
self.stop_vm(w["node"], w["vmid"])
time.sleep(5)
self.delete_vm(w["node"], w["vmid"])
print(f" Deleted {w['name']}")
# Usage
pm = ProxmoxBatchManager("proxmox.local", "root@pam", "password")
workers = pm.spawn_batch_workers("pve-1", 9001, 4, "spark-worker")
# ... run batch job ...
pm.cleanup_workers(workers)
"""

    def show_api(self):
        """Print the first 600 characters of the embedded API listing."""
        print("=== Proxmox Batch API ===")
        print(self.PROXMOX_API[:600])
# Demo: print the embedded Proxmox API listing.
orch = BatchOrchestration()
orch.show_api()
Data Pipeline Design
# pipeline.py — Data pipeline on Proxmox
import json
import random
class DataPipeline:
    """Describes a five-stage batch data pipeline hosted on a Proxmox cluster.

    STAGES lists the stages in execution order; AIRFLOW_DAG holds an example
    Airflow DAG (display-only listing) that drives the pipeline.
    """

    # Pipeline stages in execution order, keyed by a short identifier.
    STAGES = {
        "ingest": {
            "name": "1. Data Ingestion",
            "description": "รวบรวมข้อมูลจาก sources ต่างๆ",
            "tools": "Kafka, NiFi, Airbyte, custom scripts",
            "vm": "LXC container (lightweight)",
        },
        "process": {
            "name": "2. Processing",
            "description": "ประมวลผลข้อมูล: transform, clean, aggregate",
            "tools": "Spark, Dask, Pandas, dbt",
            "vm": "KVM VMs (heavy compute, auto-scaled)",
        },
        "store": {
            "name": "3. Storage",
            "description": "เก็บผลลัพธ์: data warehouse, data lake",
            "tools": "PostgreSQL, MinIO (S3-compatible), Parquet",
            "vm": "Ceph storage cluster",
        },
        "schedule": {
            "name": "4. Scheduling",
            "description": "จัดตาราง batch jobs: daily, hourly, event-triggered",
            "tools": "Airflow, Prefect, cron",
            "vm": "LXC container (always running)",
        },
        "monitor": {
            "name": "5. Monitoring",
            "description": "ติดตาม job status, performance, errors",
            "tools": "Prometheus, Grafana, Airflow UI",
            "vm": "LXC container",
        },
    }

    # Example Airflow DAG; a display-only listing shown by show_dag().
    AIRFLOW_DAG = """
# dags/batch_pipeline.py — Airflow DAG for Proxmox batch
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'data-team',
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'proxmox_batch_pipeline',
default_args=default_args,
schedule_interval='0 2 * * *', # Daily at 2 AM
catchup=False,
)
spawn_workers = PythonOperator(
task_id='spawn_workers',
python_callable=lambda: __import__('proxmox_api').spawn_workers(4),
dag=dag,
)
run_spark_job = BashOperator(
task_id='run_spark_job',
bash_command='spark-submit --master spark://master:7077 /opt/jobs/etl.py',
dag=dag,
)
store_results = PythonOperator(
task_id='store_results',
python_callable=lambda: __import__('data_store').save_to_warehouse(),
dag=dag,
)
cleanup = PythonOperator(
task_id='cleanup_workers',
python_callable=lambda: __import__('proxmox_api').cleanup_workers(),
dag=dag,
trigger_rule='all_done',
)
spawn_workers >> run_spark_job >> store_results >> cleanup
"""

    def show_stages(self):
        """Print every stage with its description, tooling and VM placement."""
        print("=== Pipeline Stages ===\n")
        for info in self.STAGES.values():
            # Header, description, tools/VM summary, then a blank separator line.
            print(f"[{info['name']}]\n {info['description']}\n Tools: {info['tools']} | VM: {info['vm']}\n")

    def show_dag(self):
        """Print the first 500 characters of the example Airflow DAG."""
        print("=== Airflow DAG ===")
        print(self.AIRFLOW_DAG[:500])
# Demo: print the pipeline stages and the example Airflow DAG.
pipeline = DataPipeline()
pipeline.show_stages()
pipeline.show_dag()
Auto-scaling & Resource Management
# autoscale.py — Auto-scaling batch workers
import json
import random
class AutoScaling:
    """Auto-scaling strategies for batch workers plus a sample scaler script.

    SCALING_RULES catalogs the trigger types; AUTOSCALER is a display-only
    listing of a queue-driven scaler; cluster_metrics() prints a simulated
    snapshot of cluster load.
    """

    # Scaling strategies keyed by trigger type.
    SCALING_RULES = {
        "queue_based": {
            "name": "Queue-Based Scaling",
            "trigger": "จำนวน pending jobs ใน queue",
            "rules": "0 jobs → 0 workers | 1-10 jobs → 2 workers | 10+ → 4 workers",
        },
        "time_based": {
            "name": "Time-Based Scaling",
            "trigger": "ช่วงเวลา (off-peak batch processing)",
            "rules": "2-6 AM → 4 workers | อื่นๆ → 1 worker",
        },
        "resource_based": {
            "name": "Resource-Based Scaling",
            "trigger": "Proxmox cluster CPU/RAM usage",
            "rules": "Cluster CPU < 50% → scale up | > 80% → scale down",
        },
    }

    # Sample auto-scaler; a display-only listing shown by show_autoscaler().
    AUTOSCALER = """
# autoscaler.py — Proxmox batch auto-scaler
import time
import schedule
from proxmox_api import ProxmoxBatchManager
class BatchAutoScaler:
def __init__(self, proxmox_host, user, password):
self.pm = ProxmoxBatchManager(proxmox_host, user, password)
self.current_workers = 0
self.max_workers = 8
self.template_id = 9003
self.node = "pve-1"
def check_queue(self):
# Check pending jobs (from Airflow/Redis/DB)
import redis
r = redis.Redis()
pending = r.llen("batch_queue")
return pending
def scale(self):
pending = self.check_queue()
desired = min(self.max_workers, max(0, (pending + 4) // 5))
if desired > self.current_workers:
to_add = desired - self.current_workers
print(f" Scaling UP: {self.current_workers} → {desired} (+{to_add})")
self.pm.spawn_batch_workers(self.node, self.template_id, to_add)
self.current_workers = desired
elif desired < self.current_workers:
to_remove = self.current_workers - desired
print(f" Scaling DOWN: {self.current_workers} → {desired} (-{to_remove})")
self.current_workers = desired
def run(self):
schedule.every(5).minutes.do(self.scale)
while True:
schedule.run_pending()
time.sleep(30)
scaler = BatchAutoScaler("proxmox.local", "root@pam", "password")
scaler.run()
"""

    def show_rules(self):
        """Print each scaling strategy with its trigger and thresholds."""
        print("=== Scaling Rules ===\n")
        for rule in self.SCALING_RULES.values():
            # Header, trigger, rules text, then a blank separator line.
            print(f"[{rule['name']}]\n Trigger: {rule['trigger']}\n Rules: {rule['rules']}\n")

    def show_autoscaler(self):
        """Print the first 500 characters of the sample auto-scaler script."""
        print("=== Auto-scaler Script ===")
        print(self.AUTOSCALER[:500])

    def cluster_metrics(self):
        """Print a simulated snapshot of cluster-wide batch metrics."""
        print("\n=== Cluster Metrics ===")
        # Built in a fixed order so random.randint draws stay deterministic
        # under a seeded RNG.
        snapshot = [
            ("Active workers", random.randint(0, 8)),
            ("Pending jobs", random.randint(0, 50)),
            ("Cluster CPU", f"{random.randint(20, 70)}%"),
            ("Cluster RAM", f"{random.randint(30, 80)}%"),
            ("Storage used", f"{random.randint(40, 75)}%"),
            ("Jobs completed today", random.randint(10, 100)),
            ("Avg job duration", f"{random.randint(5, 60)} min"),
        ]
        for label, value in snapshot:
            print(f" {label}: {value}")
# Demo: print scaling rules, the auto-scaler listing, and simulated metrics.
scale = AutoScaling()
scale.show_rules()
scale.show_autoscaler()
scale.cluster_metrics()
Monitoring & Troubleshooting
# monitoring.py — Batch pipeline monitoring
import json
import random
class BatchMonitoring:
    """Simulated monitoring views for the batch pipeline: jobs, nodes, issues."""

    def job_status(self):
        """Print today's batch jobs with simulated status, duration and volume."""
        print("=== Job Status (Today) ===\n")
        # The random draws happen in the same order as the list literal below,
        # keeping output deterministic under a seeded RNG.
        jobs = [
            {"name": "ETL Daily", "status": "success", "duration": f"{random.randint(10, 40)}min", "records": f"{random.randint(100, 500)}K"},
            {"name": "Report Generation", "status": "success", "duration": f"{random.randint(5, 20)}min", "records": f"{random.randint(10, 50)}K"},
            {"name": "ML Training", "status": random.choice(["success", "running"]), "duration": f"{random.randint(30, 120)}min", "records": f"{random.randint(1, 10)}M"},
            {"name": "Data Backup", "status": "success", "duration": f"{random.randint(15, 45)}min", "records": f"{random.randint(500, 2000)}K"},
        ]
        status_icons = {"success": "OK", "running": "RUN"}
        for job in jobs:
            icon = status_icons.get(job["status"], "FAIL")
            print(f" [{icon:>4}] {job['name']:<25} Duration: {job['duration']:>6} | Records: {job['records']}")

    def node_health(self):
        """Print simulated per-node CPU/RAM usage and running-VM counts."""
        print("\n=== Node Health ===")
        # (host, cpu range, ram range, vm-count range) — one row per node.
        node_specs = [
            ("pve-1", (20, 60), (40, 70), (2, 6)),
            ("pve-2", (25, 65), (45, 75), (1, 5)),
            ("pve-3", (15, 55), (35, 65), (2, 4)),
        ]
        for host, cpu_rng, ram_rng, vm_rng in node_specs:
            cpu = random.randint(*cpu_rng)
            ram = random.randint(*ram_rng)
            vms = random.randint(*vm_rng)
            print(f" [{host}] CPU: {cpu}% | RAM: {ram}% | VMs: {vms}")

    def troubleshooting(self):
        """Print common operational issues and their recommended fixes."""
        print("\n=== Common Issues ===")
        known_issues = [
            ("VM clone slow", "ใช้ linked clone แทน full clone, ใช้ Ceph RBD"),
            ("Disk I/O bottleneck", "ใช้ NVMe SSDs, Ceph with SSD journal"),
            ("Network bottleneck", "แยก storage network (10GbE), VLAN segmentation"),
            ("Job timeout", "เพิ่ม resources, optimize query, split large datasets"),
        ]
        for issue, fix in known_issues:
            print(f" Issue: {issue}\n Fix: {fix}")
# Demo: print job statuses, node health, and common troubleshooting tips.
mon = BatchMonitoring()
mon.job_status()
mon.node_health()
mon.troubleshooting()
FAQ - คำถามที่พบบ่อย
Q: Proxmox กับ Kubernetes อันไหนดีสำหรับ batch processing?
A: Proxmox: VMs + containers, simple setup, bare-metal performance, เหมาะ on-premise Kubernetes: container orchestration, auto-scaling ดีกว่า, cloud-native, ecosystem ใหญ่ ใช้ Proxmox: on-premise hardware, ต้องการ VMs, budget จำกัด, team เล็ก ใช้ K8s: cloud-native, complex scaling, microservices ผสม: Proxmox host → K8s clusters as VMs → batch jobs in pods
Q: Linked clone กับ Full clone อันไหนดี?
A: Linked clone: เร็วมาก (seconds), ใช้ disk น้อย แต่ depend on template Full clone: ช้ากว่า (minutes), independent, ใช้ disk เต็ม สำหรับ batch workers: linked clone (สร้าง/ลบบ่อย ไม่ต้อง persistent) สำหรับ permanent VMs: full clone (independent, portable)
Q: Ceph จำเป็นไหม?
A: จำเป็นถ้าต้องการ: HA (live migration), shared storage ข้าม nodes ไม่จำเป็นถ้า: single node, ใช้ local storage พอ Ceph ต้อง: 3+ nodes, dedicated network (10GbE แนะนำ) ทางเลือก: NFS, iSCSI, ZFS (local only)
Q: ประหยัดค่าใช้จ่ายอย่างไร?
A: Auto-scale: สร้าง workers เฉพาะเมื่อมี jobs, ลบหลังเสร็จ Off-peak: รัน batch jobs ช่วงกลางคืน (ใช้ resources ที่ว่าง) Template: clone จาก template (ไม่ต้อง install ใหม่ทุกครั้ง) LXC: ใช้ containers แทน VMs สำหรับ lightweight tasks (ใช้ resources น้อยกว่า)
