SiamCafe.net Blog
Technology
Proxmox VE Cluster Automation Script | SiamCafe Blog
2026-01-29 · อ. บอม — SiamCafe.net · 11,631 words

Data Lakehouse on Kubernetes: Pod Scheduling for Data Workloads

Data Lakehouse ???????????? architecture ?????????????????????????????????????????? Data Lake (???????????????????????????????????????, schema-on-read, ?????????????????????) ????????? Data Warehouse (ACID transactions, schema enforcement, performance) ????????????????????????????????? ????????? open table formats ???????????? Delta Lake, Apache Iceberg, Apache Hudi

Running a Data Lakehouse on Kubernetes means scheduling a mix of workloads: Spark jobs (batch processing, ETL), Trino/Presto (interactive queries), Airflow (orchestration), MinIO/S3 (object storage), and Hive Metastore (metadata). Each workload has a different resource profile: Spark is CPU- and memory-heavy, Trino is memory-heavy, and storage services are disk-I/O-heavy.

Pod scheduling matters for a Data Lakehouse because it lets you place pods on the right node type (compute-optimized, memory-optimized, storage-optimized), avoid noisy neighbors (one workload starving another of resources), auto-scale each workload independently, and prioritize production queries over batch jobs.
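The workload-to-pool mapping described above can be sketched as a simple lookup. This is only an illustration; the pool names and profiles mirror the examples used later in this article, not fixed values:

```python
# Illustrative mapping of lakehouse workloads to node pool profiles.
# Pool names and "bound_by" labels are examples from this article.
WORKLOAD_POOLS = {
    "spark-executor": {"pool": "compute-optimized", "bound_by": "cpu+memory"},
    "trino-worker":   {"pool": "memory-optimized",  "bound_by": "memory"},
    "minio":          {"pool": "storage-optimized", "bound_by": "disk-io"},
    "airflow":        {"pool": "general",           "bound_by": "mixed"},
}

def pool_for(workload: str) -> str:
    """Return the node pool a workload should be scheduled on."""
    return WORKLOAD_POOLS[workload]["pool"]

print(pool_for("trino-worker"))  # memory-optimized
```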

Pod Scheduling Strategies for Data Workloads

How to schedule pods for a data platform:

# === Pod Scheduling Strategies ===

# 1. Node Pool Configuration
cat > node_pools.yaml << 'EOF'
node_pools:
  compute_optimized:
    description: "Spark executors, heavy ETL"
    instance_type: "c6i.4xlarge"  # 16 vCPU, 32 GB
    labels:
      workload-type: compute
      node-pool: compute-optimized
    taints:
      - key: workload-type
        value: compute
        effect: NoSchedule
    autoscaling:
      min: 2
      max: 20
      
  memory_optimized:
    description: "Trino workers, large joins, caching"
    instance_type: "r6i.4xlarge"  # 16 vCPU, 128 GB
    labels:
      workload-type: memory
      node-pool: memory-optimized
    taints:
      - key: workload-type
        value: memory
        effect: NoSchedule
    autoscaling:
      min: 2
      max: 10
      
  storage_optimized:
    description: "MinIO, local storage, shuffle"
    instance_type: "i3.2xlarge"  # 8 vCPU, 61 GB, NVMe
    labels:
      workload-type: storage
      node-pool: storage-optimized
    taints:
      - key: workload-type
        value: storage
        effect: NoSchedule
    autoscaling:
      min: 3
      max: 6

  general:
    description: "Airflow, Hive Metastore, monitoring"
    instance_type: "m6i.2xlarge"  # 8 vCPU, 32 GB
    labels:
      workload-type: general
    autoscaling:
      min: 2
      max: 5
EOF

# 2. Terraform for EKS Node Groups
cat > eks_nodegroups.tf << 'EOF'
resource "aws_eks_node_group" "compute" {
  cluster_name    = aws_eks_cluster.lakehouse.name
  node_group_name = "compute-optimized"
  node_role_arn   = aws_iam_role.node.arn
  subnet_ids      = var.private_subnet_ids
  instance_types  = ["c6i.4xlarge"]
  
  scaling_config {
    desired_size = 2
    min_size     = 2
    max_size     = 20
  }
  
  labels = {
    "workload-type" = "compute"
    "node-pool"     = "compute-optimized"
  }
  
  taint {
    key    = "workload-type"
    value  = "compute"
    effect = "NO_SCHEDULE"
  }
}
EOF

echo "Node pools configured"
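As a sanity check on the pools above, a short Python sketch can total the capacity each pool contributes at its autoscaling minimum and maximum. The per-node figures are taken from the instance-size comments in node_pools.yaml; this is back-of-the-envelope arithmetic, not a scheduler:

```python
# Per-pool capacity at autoscaling min/max, using the vCPU and memory
# figures from the node_pools.yaml comments above.
POOLS = {
    "compute_optimized": {"vcpu": 16, "mem_gb": 32,  "min": 2, "max": 20},
    "memory_optimized":  {"vcpu": 16, "mem_gb": 128, "min": 2, "max": 10},
    "storage_optimized": {"vcpu": 8,  "mem_gb": 61,  "min": 3, "max": 6},
    "general":           {"vcpu": 8,  "mem_gb": 32,  "min": 2, "max": 5},
}

def capacity(pool: dict, size_key: str) -> tuple:
    """Total (vCPU, memory GB) for a pool at the given node count key."""
    n = pool[size_key]
    return n * pool["vcpu"], n * pool["mem_gb"]

for name, p in POOLS.items():
    lo_cpu, lo_mem = capacity(p, "min")
    hi_cpu, hi_mem = capacity(p, "max")
    print(f"{name}: min {lo_cpu} vCPU/{lo_mem} GB, max {hi_cpu} vCPU/{hi_mem} GB")
```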

Node Affinity and Topology

Placing pods on the right nodes:

# === Node Affinity & Topology ===

# 1. Spark on Compute Nodes
cat > spark/spark-application.yaml << 'EOF'
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: etl-daily-transform
  namespace: data-platform
spec:
  type: Python
  mode: cluster
  image: myregistry/spark:3.5.0
  mainApplicationFile: "s3a://scripts/daily_transform.py"
  
  driver:
    cores: 2
    memory: "4g"
    nodeSelector:
      workload-type: general
    tolerations:
      - key: workload-type
        operator: Equal
        value: general
        effect: NoSchedule

  executor:
    cores: 4
    memory: "8g"
    instances: 10
    nodeSelector:
      workload-type: compute
    tolerations:
      - key: workload-type
        operator: Equal
        value: compute
        effect: NoSchedule
    affinity:
      podAntiAffinity:
        preferredDuringSchedulingIgnoredDuringExecution:
          - weight: 100
            podAffinityTerm:
              labelSelector:
                matchExpressions:
                  - key: spark-role
                    operator: In
                    values: [executor]
              topologyKey: kubernetes.io/hostname
---
# 2. Trino on Memory Nodes
apiVersion: apps/v1
kind: Deployment
metadata:
  name: trino-worker
  namespace: data-platform
spec:
  replicas: 5
  selector:
    matchLabels:
      app: trino-worker
  template:
    metadata:
      labels:
        app: trino-worker
    spec:
      nodeSelector:
        workload-type: memory
      tolerations:
        - key: workload-type
          operator: Equal
          value: memory
          effect: NoSchedule
      topologySpreadConstraints:
        - maxSkew: 1
          topologyKey: topology.kubernetes.io/zone
          whenUnsatisfiable: DoNotSchedule
          labelSelector:
            matchLabels:
              app: trino-worker
      containers:
        - name: trino
          image: trinodb/trino:440
          resources:
            requests:
              cpu: "8"
              memory: "64Gi"
            limits:
              cpu: "14"
              memory: "100Gi"
          env:
            - name: JAVA_TOOL_OPTIONS
              value: "-Xmx80G -XX:+UseG1GC"
---
# 3. MinIO on Storage Nodes
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: minio
  namespace: data-platform
spec:
  serviceName: minio
  replicas: 4
  selector:
    matchLabels:
      app: minio
  template:
    metadata:
      labels:
        app: minio
    spec:
      nodeSelector:
        workload-type: storage
      tolerations:
        - key: workload-type
          operator: Equal
          value: storage
          effect: NoSchedule
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchLabels:
                  app: minio
              topologyKey: kubernetes.io/hostname
      containers:
        - name: minio
          image: minio/minio:latest  # pin a specific release in production
          args: ["server", "/data"]
EOF

kubectl apply -f spark/spark-application.yaml
echo "Affinity and topology configured"
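The selector/taint mechanics above can be seen in miniature with a simplified feasibility check. This is a toy model of the scheduler's predicate phase that ignores resources, affinity weights, and taint effects other than NoSchedule:

```python
# Toy feasibility check: a pod fits a node if its nodeSelector labels
# all match and it tolerates every taint on the node.
def tolerates(taint: dict, tolerations: list) -> bool:
    return any(t["key"] == taint["key"] and t.get("value") == taint.get("value")
               for t in tolerations)

def fits(node: dict, pod: dict) -> bool:
    labels_ok = all(node["labels"].get(k) == v
                    for k, v in pod.get("nodeSelector", {}).items())
    taints_ok = all(tolerates(t, pod.get("tolerations", []))
                    for t in node.get("taints", []))
    return labels_ok and taints_ok

memory_node = {
    "labels": {"workload-type": "memory"},
    "taints": [{"key": "workload-type", "value": "memory"}],
}
trino = {
    "nodeSelector": {"workload-type": "memory"},
    "tolerations": [{"key": "workload-type", "value": "memory"}],
}
spark_exec = {"nodeSelector": {"workload-type": "compute"}, "tolerations": []}

print(fits(memory_node, trino))       # True
print(fits(memory_node, spark_exec))  # False
```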

Resource Management for Spark/Trino

Sizing resources for data workloads:

#!/usr/bin/env python3
# resource_manager.py - Data Lakehouse Resource Management
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("resources")

class LakehouseResourceManager:
    """Resource management for Data Lakehouse on Kubernetes"""
    
    def __init__(self):
        self.node_pools = {}
        self.workloads = {}
    
    def add_node_pool(self, name, nodes, cpu_per_node, memory_gb_per_node):
        self.node_pools[name] = {
            "nodes": nodes,
            "cpu_per_node": cpu_per_node,
            "memory_gb": memory_gb_per_node,
            "total_cpu": nodes * cpu_per_node,
            "total_memory_gb": nodes * memory_gb_per_node,
        }
    
    def calculate_spark_resources(self, data_size_gb, complexity="medium"):
        """Calculate Spark executor resources based on data size"""
        multipliers = {"low": 0.5, "medium": 1.0, "high": 2.0}
        mult = multipliers.get(complexity, 1.0)
        
        # Rule of thumb: 1 executor per 2-5 GB of data
        num_executors = max(2, int(data_size_gb / 3 * mult))
        executor_cores = 4
        executor_memory_gb = 8
        
        # Driver
        driver_cores = 2
        driver_memory_gb = 4
        
        total_cpu = num_executors * executor_cores + driver_cores
        total_memory = num_executors * executor_memory_gb + driver_memory_gb
        
        return {
            "executors": num_executors,
            "executor_cores": executor_cores,
            "executor_memory_gb": executor_memory_gb,
            "driver_cores": driver_cores,
            "driver_memory_gb": driver_memory_gb,
            "total_cpu": total_cpu,
            "total_memory_gb": total_memory,
            "estimated_runtime_min": int(data_size_gb / num_executors * 2 * mult),
        }
    
    def calculate_trino_resources(self, concurrent_queries, avg_data_scanned_gb):
        """Calculate Trino worker resources"""
        # 1 worker per 2-3 concurrent queries
        num_workers = max(3, concurrent_queries // 2)
        worker_memory_gb = max(32, avg_data_scanned_gb * 2)
        worker_cpu = 8
        
        return {
            "workers": num_workers,
            "worker_cpu": worker_cpu,
            "worker_memory_gb": worker_memory_gb,
            "coordinator_cpu": 4,
            "coordinator_memory_gb": 16,
            "total_cpu": num_workers * worker_cpu + 4,
            "total_memory_gb": num_workers * worker_memory_gb + 16,
        }
    
    def resource_quota(self):
        return {
            "namespaces": {
                "spark-production": {
                    "cpu": "200",
                    "memory": "800Gi",
                    "pods": 500,
                    "priority": "production",
                },
                "spark-development": {
                    "cpu": "50",
                    "memory": "200Gi",
                    "pods": 100,
                    "priority": "development",
                },
                "trino": {
                    "cpu": "100",
                    "memory": "1Ti",
                    "pods": 50,
                    "priority": "production",
                },
            },
        }

manager = LakehouseResourceManager()
manager.add_node_pool("compute", 10, 16, 32)
manager.add_node_pool("memory", 5, 16, 128)
manager.add_node_pool("storage", 4, 8, 61)

# Calculate for daily ETL (500 GB data)
spark = manager.calculate_spark_resources(500, "medium")
print(f"Spark Resources (500GB ETL):")
print(f"  Executors: {spark['executors']} x {spark['executor_cores']}CPU/{spark['executor_memory_gb']}GB")
print(f"  Total: {spark['total_cpu']} CPU, {spark['total_memory_gb']} GB")
print(f"  Est. runtime: {spark['estimated_runtime_min']} min")

# Calculate for Trino (20 concurrent queries)
trino = manager.calculate_trino_resources(20, 50)
print(f"\nTrino Resources (20 concurrent queries):")
print(f"  Workers: {trino['workers']} x {trino['worker_cpu']}CPU/{trino['worker_memory_gb']}GB")
print(f"  Total: {trino['total_cpu']} CPU, {trino['total_memory_gb']} GB")
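Building on the calculation above, it is worth checking whether a planned job actually fits in the compute pool at its autoscaling maximum. The job figures below are the ones `calculate_spark_resources(500, "medium")` produces (166 executors, 666 vCPU, 1332 GB); note that this exceeds the compute pool's 20-node ceiling of 320 vCPU, which is exactly the kind of gap the autoscaling limits must account for:

```python
# Rough fit check: does a planned Spark job fit in the compute pool at
# its autoscaling maximum? Pool sizes are the ones from this section.
def fits_pool(job: dict, nodes: int, cpu_per_node: int, mem_per_node: int) -> bool:
    return (job["total_cpu"] <= nodes * cpu_per_node
            and job["total_memory_gb"] <= nodes * mem_per_node)

# Matches calculate_spark_resources(500, "medium"): 166 executors.
job = {"total_cpu": 666, "total_memory_gb": 1332}
print(fits_pool(job, nodes=20, cpu_per_node=16, mem_per_node=32))  # False: 666 > 320 vCPU
```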

Priority and Preemption

Ranking workloads by importance:

# === Priority Classes & Preemption ===

cat > priority_classes.yaml << 'EOF'
# Priority Classes for Data Lakehouse
apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
  name: critical-infrastructure
value: 1000000
globalDefault: false
description: "MinIO, Hive Metastore, Airflow scheduler"
preemptionPolicy: PreemptLowerPriority
---
apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
  name: production-queries
value: 500000
globalDefault: false
description: "Production Trino queries, real-time dashboards"
preemptionPolicy: PreemptLowerPriority
---
apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
  name: production-etl
value: 300000
globalDefault: false
description: "Production Spark ETL jobs"
preemptionPolicy: PreemptLowerPriority
---
apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
  name: development
value: 100000
globalDefault: true
description: "Development and ad-hoc workloads"
preemptionPolicy: PreemptLowerPriority
---
apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
  name: batch-low-priority
value: 10000
globalDefault: false
description: "Non-urgent batch jobs, backfills"
preemptionPolicy: Never
---
# Resource Quotas per namespace
apiVersion: v1
kind: ResourceQuota
metadata:
  name: spark-production-quota
  namespace: spark-production
spec:
  hard:
    requests.cpu: "200"
    requests.memory: "800Gi"
    limits.cpu: "250"
    limits.memory: "1000Gi"
    pods: "500"
---
apiVersion: v1
kind: LimitRange
metadata:
  name: spark-limits
  namespace: spark-production
spec:
  limits:
    - type: Container
      default:
        cpu: "4"
        memory: "8Gi"
      defaultRequest:
        cpu: "2"
        memory: "4Gi"
      max:
        cpu: "16"
        memory: "64Gi"
      min:
        cpu: "0.5"
        memory: "512Mi"
EOF

kubectl apply -f priority_classes.yaml
echo "Priority classes and quotas configured"
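The effect of the priority classes above can be sketched as follows: when a high-priority pod cannot be scheduled, the scheduler considers evicting lower-priority pods, lowest first. This is a simplified model; the real eviction logic also weighs PodDisruptionBudgets, graceful termination, and node fit:

```python
# Simplified preemption model: candidate victims are running pods with
# a lower priority than the incoming pod, ordered lowest-priority-first.
PRIORITY = {
    "critical-infrastructure": 1_000_000,
    "production-queries": 500_000,
    "production-etl": 300_000,
    "development": 100_000,
    "batch-low-priority": 10_000,
}

def preemption_victims(incoming_class: str, running: list) -> list:
    threshold = PRIORITY[incoming_class]
    victims = [p for p in running if PRIORITY[p["class"]] < threshold]
    return sorted(victims, key=lambda p: PRIORITY[p["class"]])

running = [
    {"name": "trino-q1", "class": "production-queries"},
    {"name": "dev-notebook", "class": "development"},
    {"name": "backfill-job", "class": "batch-low-priority"},
]
for p in preemption_victims("production-etl", running):
    print(p["name"])  # backfill-job, then dev-notebook
```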

Monitoring and Optimization

Tracking and tuning scheduling:

#!/usr/bin/env python3
# scheduling_monitor.py - Pod Scheduling Monitor
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("scheduler")

class SchedulingMonitor:
    def __init__(self):
        pass
    
    def dashboard(self):
        return {
            "cluster_utilization": {
                "compute_pool": {"cpu_used": "72%", "memory_used": "68%", "nodes": "8/10"},
                "memory_pool": {"cpu_used": "45%", "memory_used": "82%", "nodes": "5/5"},
                "storage_pool": {"cpu_used": "35%", "memory_used": "55%", "nodes": "4/4"},
                "general_pool": {"cpu_used": "60%", "memory_used": "55%", "nodes": "3/3"},
            },
            "scheduling_stats_1h": {
                "pods_scheduled": 145,
                "pods_pending": 8,
                "pods_preempted": 2,
                "avg_scheduling_latency_ms": 250,
                "failed_scheduling": 1,
                "reason": "Insufficient memory on memory-optimized nodes",
            },
            "workload_breakdown": {
                "spark_executors": {"running": 80, "pending": 5, "cpu": "320", "memory": "640Gi"},
                "trino_workers": {"running": 5, "pending": 0, "cpu": "40", "memory": "320Gi"},
                "airflow_tasks": {"running": 12, "pending": 3, "cpu": "24", "memory": "48Gi"},
                "minio": {"running": 4, "pending": 0, "cpu": "8", "memory": "32Gi"},
            },
            "optimization_suggestions": [
                "Compute pool: Scale up 2 nodes (CPU utilization > 70%)",
                "Memory pool: Trino workers using 82% memory, consider r6i.8xlarge",
                "Spark: 5 executors pending, increase compute pool max to 25",
                "Development jobs running during peak hours, consider scheduling off-peak",
            ],
        }

monitor = SchedulingMonitor()
dash = monitor.dashboard()
print("Data Lakehouse Scheduling Dashboard:")
print("\nCluster Utilization:")
for pool, stats in dash["cluster_utilization"].items():
    print(f"  {pool}: CPU={stats['cpu_used']}, Mem={stats['memory_used']}, Nodes={stats['nodes']}")

print(f"\nScheduling (1h): {dash['scheduling_stats_1h']['pods_scheduled']} scheduled, {dash['scheduling_stats_1h']['pods_pending']} pending")

print("\nWorkloads:")
for name, info in dash["workload_breakdown"].items():
    print(f"  {name}: {info['running']} running, {info['pending']} pending")

print("\nSuggestions:")
for s in dash["optimization_suggestions"]:
    print(f"  - {s}")
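The suggestions in the dashboard above can be generated mechanically from the utilization figures. A minimal rule, assuming a 70% CPU scale-up threshold (the threshold is an example value, not a universal best practice):

```python
# Derive a scale-up hint per pool from CPU utilization percentages.
# The 70% threshold is an illustrative value, not a fixed rule.
def scale_hints(utilization: dict, threshold: int = 70) -> list:
    hints = []
    for pool, stats in utilization.items():
        cpu = int(stats["cpu_used"].rstrip("%"))
        if cpu > threshold:
            hints.append(f"{pool}: CPU at {cpu}%, consider scaling up")
    return hints

util = {
    "compute_pool": {"cpu_used": "72%"},
    "memory_pool": {"cpu_used": "45%"},
}
print(scale_hints(util))  # ['compute_pool: CPU at 72%, consider scaling up']
```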

FAQ

Q: Why use separate node pools for data workloads?

A: Data workloads have very different resource profiles. Spark executors are CPU-bound and suit compute-optimized instances (c-series); Trino workers are memory-bound and suit memory-optimized instances (r-series); MinIO and other storage services are disk-I/O-bound and suit storage-optimized instances (i-series). Without separate pools, Spark can land on a memory node (wasting expensive memory) or Trino on a compute node (too little memory). Separate pools plus taints/tolerations guarantee each workload runs on the right node type, which keeps both cost and performance predictable.

Q: Spark on Kubernetes vs Spark on YARN: what is the difference?

A: Spark on YARN uses Hadoop YARN as the resource manager: you must maintain a Hadoop cluster, but resource sharing with the Hadoop ecosystem (Hive, HBase) is mature if you already run that infrastructure. Spark on Kubernetes uses the K8s scheduler to run Spark as pods: there is no Hadoop cluster to maintain, everything is containerized (consistent environments), auto-scaling is easier (Cluster Autoscaler, Karpenter), and it shares infrastructure with other services. For new deployments, Spark on K8s is generally the better choice unless you depend on the Hadoop ecosystem, in which case YARN still fits. Use the Spark Operator (GoogleCloudPlatform/spark-on-k8s-operator) to submit Spark jobs to K8s conveniently.

Q: Karpenter vs Cluster Autoscaler: which should I use?

A: Cluster Autoscaler (CA) is the default K8s autoscaler: it scales pre-defined node groups when pods are pending, but it is slow (1-3 minutes) and locked to pre-defined instance types. Karpenter is an AWS-native autoscaler that picks instance types to match pod requirements, reacts faster than CA (roughly 30 seconds to 1 minute), supports consolidation (repacking pods onto fewer nodes to cut cost), and handles Spot instances well. For a Data Lakehouse, Karpenter is recommended: Spark jobs have variable resource needs, and letting Karpenter match instance types to requirements can save roughly 20-40% compared with fixed node groups.

Q: ?????????????????? Spot Instances ?????????????????? Spark ??????????????????????

A: Spot Instances cost 60-90% less than On-Demand and suit Spark executors (stateless, replaceable), but not Spark drivers, Trino coordinators, or MinIO (stateful). Recommended setup: run executors on Spot and the driver on On-Demand; use spark.kubernetes.node.selector to target Spot nodes; enable Dynamic Allocation so Spark can replace lost executors; enable graceful decommission for when Spot capacity is reclaimed; spread across multiple instance types to reduce interruptions; and write shuffle data to external storage (S3) instead of local disk. Karpenter can manage Spot with an On-Demand fallback automatically.
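To make the Spot economics above concrete, here is a quick cost sketch with the driver on On-Demand and executors on Spot. The hourly prices and the 70% Spot discount are illustrative placeholders, not AWS quotes:

```python
# Illustrative cost split: On-Demand driver + Spot executors.
# Hourly prices and the 70% Spot discount are placeholder numbers.
def hourly_cost(executors: int, exec_price: float, driver_price: float,
                spot_discount: float = 0.70) -> dict:
    on_demand = driver_price + executors * exec_price
    mixed = driver_price + executors * exec_price * (1 - spot_discount)
    return {"on_demand": round(on_demand, 2), "mixed": round(mixed, 2),
            "savings_pct": round(100 * (1 - mixed / on_demand), 1)}

print(hourly_cost(executors=10, exec_price=0.68, driver_price=0.68))
# {'on_demand': 7.48, 'mixed': 2.72, 'savings_pct': 63.6}
```

The overall saving stays below the raw 70% discount because the driver remains On-Demand.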
