Data Lakehouse on Kubernetes: Pod Scheduling for Data Workloads
A Data Lakehouse is an architecture that combines the strengths of a Data Lake (flexible storage for any data type, schema-on-read, low cost) with those of a Data Warehouse (ACID transactions, schema enforcement, query performance), typically built on open table formats such as Delta Lake, Apache Iceberg, or Apache Hudi.
Running a Data Lakehouse on Kubernetes means scheduling many kinds of workloads: Spark jobs (batch processing, ETL), Trino/Presto (interactive queries), Airflow (orchestration), MinIO/S3 (object storage), and Hive Metastore (metadata). Each workload has a different resource profile: Spark is CPU- and memory-heavy, Trino is memory-heavy, and the storage layer is disk-I/O-heavy.
Pod scheduling matters for a Data Lakehouse because you need to place pods on the right node type (compute-optimized, memory-optimized, storage-optimized), avoid noisy neighbors (one workload starving another of resources), auto-scale each workload independently, and give production queries priority over batch jobs.
Pod Scheduling Strategies for Data Workloads
How to schedule pods for each part of the data platform
# === Pod Scheduling Strategies ===
# 1. Node Pool Configuration
cat > node_pools.yaml << 'EOF'
node_pools:
  compute_optimized:
    description: "Spark executors, heavy ETL"
    instance_type: "c6i.4xlarge"  # 16 vCPU, 32 GB
    labels:
      workload-type: compute
      node-pool: compute-optimized
    taints:
      - key: workload-type
        value: compute
        effect: NoSchedule
    autoscaling:
      min: 2
      max: 20
  memory_optimized:
    description: "Trino workers, large joins, caching"
    instance_type: "r6i.4xlarge"  # 16 vCPU, 128 GB
    labels:
      workload-type: memory
      node-pool: memory-optimized
    taints:
      - key: workload-type
        value: memory
        effect: NoSchedule
    autoscaling:
      min: 2
      max: 10
  storage_optimized:
    description: "MinIO, local storage, shuffle"
    instance_type: "i3.2xlarge"  # 8 vCPU, 61 GB, NVMe
    labels:
      workload-type: storage
      node-pool: storage-optimized
    taints:
      - key: workload-type
        value: storage
        effect: NoSchedule
    autoscaling:
      min: 3
      max: 6
  general:
    description: "Airflow, Hive Metastore, monitoring"
    instance_type: "m6i.2xlarge"  # 8 vCPU, 32 GB
    labels:
      workload-type: general
    autoscaling:
      min: 2
      max: 5
EOF
# 2. Terraform for EKS Node Groups
cat > eks_nodegroups.tf << 'EOF'
resource "aws_eks_node_group" "compute" {
  cluster_name    = aws_eks_cluster.lakehouse.name
  node_group_name = "compute-optimized"
  node_role_arn   = aws_iam_role.node.arn
  subnet_ids      = var.private_subnet_ids
  instance_types  = ["c6i.4xlarge"]

  scaling_config {
    desired_size = 2
    min_size     = 2
    max_size     = 20
  }

  labels = {
    "workload-type" = "compute"
    "node-pool"     = "compute-optimized"
  }

  taint {
    key    = "workload-type"
    value  = "compute"
    effect = "NO_SCHEDULE"
  }
}
EOF
echo "Node pools configured"
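The taints above keep pods off each pool unless they explicitly tolerate it. A minimal sketch of that matching rule (a hypothetical helper, not part of the platform code — the real logic lives in the kube-scheduler):

```python
# taint_check.py -- sketch of taint/toleration matching (hypothetical helper):
# a pod may land on a tainted node only if it tolerates every NoSchedule taint.

def tolerates(taint: dict, tolerations: list) -> bool:
    """True if any toleration matches this taint (operator: Equal, as used above)."""
    return any(
        t.get("key") == taint["key"]
        and t.get("operator", "Equal") == "Equal"
        and t.get("value") == taint["value"]
        and t.get("effect") == taint["effect"]
        for t in tolerations
    )

def schedulable(node_taints: list, pod_tolerations: list) -> bool:
    """A pod is schedulable if it tolerates all NoSchedule taints on the node."""
    return all(
        tolerates(t, pod_tolerations)
        for t in node_taints
        if t["effect"] == "NoSchedule"
    )

compute_taints = [{"key": "workload-type", "value": "compute", "effect": "NoSchedule"}]
spark_executor = [{"key": "workload-type", "operator": "Equal", "value": "compute", "effect": "NoSchedule"}]
trino_worker = [{"key": "workload-type", "operator": "Equal", "value": "memory", "effect": "NoSchedule"}]

print(schedulable(compute_taints, spark_executor))  # True  -> executor allowed on compute pool
print(schedulable(compute_taints, trino_worker))    # False -> Trino kept off compute pool
```

Note that a taint only repels pods; to also *attract* the right pods to the pool, the manifests below pair each toleration with a matching nodeSelector.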
Node Affinity and Topology
Placing each pod on the right node
# === Node Affinity & Topology ===
# 1. Spark on Compute Nodes
cat > spark/spark-application.yaml << 'EOF'
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: etl-daily-transform
  namespace: data-platform
spec:
  type: Python
  mode: cluster
  image: myregistry/spark:3.5.0
  mainApplicationFile: "s3a://scripts/daily_transform.py"
  driver:
    cores: 2
    memory: "4g"
    nodeSelector:
      workload-type: general
    tolerations:
      - key: workload-type
        operator: Equal
        value: general
        effect: NoSchedule
  executor:
    cores: 4
    memory: "8g"
    instances: 10
    nodeSelector:
      workload-type: compute
    tolerations:
      - key: workload-type
        operator: Equal
        value: compute
        effect: NoSchedule
    # Prefer spreading executors across nodes to avoid hotspots
    affinity:
      podAntiAffinity:
        preferredDuringSchedulingIgnoredDuringExecution:
          - weight: 100
            podAffinityTerm:
              labelSelector:
                matchExpressions:
                  - key: spark-role
                    operator: In
                    values: [executor]
              topologyKey: kubernetes.io/hostname
---
# 2. Trino on Memory Nodes
apiVersion: apps/v1
kind: Deployment
metadata:
  name: trino-worker
  namespace: data-platform
spec:
  replicas: 5
  selector:
    matchLabels:
      app: trino-worker
  template:
    metadata:
      labels:
        app: trino-worker
    spec:
      nodeSelector:
        workload-type: memory
      tolerations:
        - key: workload-type
          operator: Equal
          value: memory
          effect: NoSchedule
      # Spread workers evenly across availability zones
      topologySpreadConstraints:
        - maxSkew: 1
          topologyKey: topology.kubernetes.io/zone
          whenUnsatisfiable: DoNotSchedule
          labelSelector:
            matchLabels:
              app: trino-worker
      containers:
        - name: trino
          image: trinodb/trino:440
          resources:
            requests:
              cpu: "8"
              memory: "64Gi"
            limits:
              cpu: "14"
              memory: "100Gi"
          env:
            - name: JAVA_TOOL_OPTIONS
              value: "-Xmx80G -XX:+UseG1GC"
---
# 3. MinIO on Storage Nodes
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: minio
  namespace: data-platform
spec:
  replicas: 4
  serviceName: minio
  selector:
    matchLabels:
      app: minio
  template:
    metadata:
      labels:
        app: minio
    spec:
      nodeSelector:
        workload-type: storage
      tolerations:
        - key: workload-type
          operator: Equal
          value: storage
          effect: NoSchedule
      # Hard anti-affinity: at most one MinIO pod per node
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchLabels:
                  app: minio
              topologyKey: kubernetes.io/hostname
      containers:
        - name: minio
          image: minio/minio:latest
          args: ["server", "/data"]  # minimal single-drive config for illustration
EOF
kubectl apply -f spark/spark-application.yaml
echo "Affinity and topology configured"
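The Trino Deployment's `maxSkew: 1` constraint bounds how unevenly workers may be spread across zones. A simplified sketch of that computation (illustration only — the real scheduler also counts eligible zones that currently hold zero matching pods):

```python
# spread_skew.py -- simplified view of how topologySpreadConstraints evaluates
# maxSkew: skew = (max pods per zone) - (min pods per zone), here computed only
# over zones that already have pods.

from collections import Counter

def skew(pod_zones: list) -> int:
    """Difference between the most- and least-loaded zones."""
    counts = Counter(pod_zones)
    return max(counts.values()) - min(counts.values())

# 5 Trino workers across 3 zones, as in the Deployment above
print(skew(["a", "a", "b", "b", "c"]))  # 1 -> satisfies maxSkew: 1

# An unbalanced placement the scheduler would refuse (whenUnsatisfiable: DoNotSchedule)
print(skew(["a", "a", "a", "b", "c"]))  # 2 -> violates maxSkew: 1
```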
Resource Management for Spark/Trino
Sizing resources for data workloads
#!/usr/bin/env python3
# resource_manager.py -- Data Lakehouse Resource Management
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("resources")


class LakehouseResourceManager:
    """Resource management for a Data Lakehouse on Kubernetes."""

    def __init__(self):
        self.node_pools = {}
        self.workloads = {}

    def add_node_pool(self, name, nodes, cpu_per_node, memory_gb_per_node):
        self.node_pools[name] = {
            "nodes": nodes,
            "cpu_per_node": cpu_per_node,
            "memory_gb": memory_gb_per_node,
            "total_cpu": nodes * cpu_per_node,
            "total_memory_gb": nodes * memory_gb_per_node,
        }

    def calculate_spark_resources(self, data_size_gb, complexity="medium"):
        """Calculate Spark executor resources based on data size."""
        multipliers = {"low": 0.5, "medium": 1.0, "high": 2.0}
        mult = multipliers.get(complexity, 1.0)
        # Rule of thumb: 1 executor per 2-5 GB of data
        num_executors = max(2, int(data_size_gb / 3 * mult))
        executor_cores = 4
        executor_memory_gb = 8
        # Driver
        driver_cores = 2
        driver_memory_gb = 4
        total_cpu = num_executors * executor_cores + driver_cores
        total_memory = num_executors * executor_memory_gb + driver_memory_gb
        return {
            "executors": num_executors,
            "executor_cores": executor_cores,
            "executor_memory_gb": executor_memory_gb,
            "driver_cores": driver_cores,
            "driver_memory_gb": driver_memory_gb,
            "total_cpu": total_cpu,
            "total_memory_gb": total_memory,
            "estimated_runtime_min": int(data_size_gb / num_executors * 2 * mult),
        }

    def calculate_trino_resources(self, concurrent_queries, avg_data_scanned_gb):
        """Calculate Trino worker resources."""
        # 1 worker per 2-3 concurrent queries
        num_workers = max(3, concurrent_queries // 2)
        worker_memory_gb = max(32, avg_data_scanned_gb * 2)
        worker_cpu = 8
        return {
            "workers": num_workers,
            "worker_cpu": worker_cpu,
            "worker_memory_gb": worker_memory_gb,
            "coordinator_cpu": 4,
            "coordinator_memory_gb": 16,
            "total_cpu": num_workers * worker_cpu + 4,
            "total_memory_gb": num_workers * worker_memory_gb + 16,
        }

    def resource_quota(self):
        return {
            "namespaces": {
                "spark-production": {
                    "cpu": "200",
                    "memory": "800Gi",
                    "pods": 500,
                    "priority": "production",
                },
                "spark-development": {
                    "cpu": "50",
                    "memory": "200Gi",
                    "pods": 100,
                    "priority": "development",
                },
                "trino": {
                    "cpu": "100",
                    "memory": "1Ti",
                    "pods": 50,
                    "priority": "production",
                },
            },
        }


manager = LakehouseResourceManager()
manager.add_node_pool("compute", 10, 16, 32)
manager.add_node_pool("memory", 5, 16, 128)
manager.add_node_pool("storage", 4, 8, 61)

# Calculate for daily ETL (500 GB data)
spark = manager.calculate_spark_resources(500, "medium")
print("Spark Resources (500GB ETL):")
print(f"  Executors: {spark['executors']} x {spark['executor_cores']}CPU/{spark['executor_memory_gb']}GB")
print(f"  Total: {spark['total_cpu']} CPU, {spark['total_memory_gb']} GB")
print(f"  Est. runtime: {spark['estimated_runtime_min']} min")

# Calculate for Trino (20 concurrent queries)
trino = manager.calculate_trino_resources(20, 50)
print("\nTrino Resources (20 concurrent queries):")
print(f"  Workers: {trino['workers']} x {trino['worker_cpu']}CPU/{trino['worker_memory_gb']}GB")
print(f"  Total: {trino['total_cpu']} CPU, {trino['total_memory_gb']} GB")
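One thing a sizing calculation like this should account for: on Kubernetes, Spark adds non-heap overhead to each executor pod (`spark.kubernetes.memoryOverheadFactor`, which defaults to 0.1 for JVM workloads, with a 384 MiB floor), so the pod's actual memory request exceeds the configured executor memory. A sketch:

```python
# pod_memory.py -- sketch: the real memory request for a Spark executor pod is
# executor memory plus overhead (spark.kubernetes.memoryOverheadFactor, default
# 0.1 for JVM jobs, floored at 384 MiB) -- worth including so pods fit on nodes.

def executor_pod_memory_mb(executor_memory_gb: int, overhead_factor: float = 0.1) -> int:
    """Approximate total pod memory request in MiB."""
    base_mb = executor_memory_gb * 1024
    overhead_mb = max(384, int(base_mb * overhead_factor))
    return base_mb + overhead_mb

# The 8 GB executors sized above actually request ~8.8 GiB each
print(executor_pod_memory_mb(8))  # 9011

# So a 32 GB node fits 3 such executors, not 4 (and kubelet/system reserves
# shave the allocatable total down further in practice)
print((32 * 1024) // executor_pod_memory_mb(8))  # 3
```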
Priority and Preemption
Making sure critical workloads come first
# === Priority Classes & Preemption ===
cat > priority_classes.yaml << 'EOF'
# Priority Classes for Data Lakehouse
apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
  name: critical-infrastructure
value: 1000000
globalDefault: false
description: "MinIO, Hive Metastore, Airflow scheduler"
preemptionPolicy: PreemptLowerPriority
---
apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
  name: production-queries
value: 500000
globalDefault: false
description: "Production Trino queries, real-time dashboards"
preemptionPolicy: PreemptLowerPriority
---
apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
  name: production-etl
value: 300000
globalDefault: false
description: "Production Spark ETL jobs"
preemptionPolicy: PreemptLowerPriority
---
apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
  name: development
value: 100000
globalDefault: true
description: "Development and ad-hoc workloads"
preemptionPolicy: PreemptLowerPriority
---
apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
  name: batch-low-priority
value: 10000
globalDefault: false
description: "Non-urgent batch jobs, backfills"
preemptionPolicy: Never
---
# Resource Quotas per namespace
apiVersion: v1
kind: ResourceQuota
metadata:
  name: spark-production-quota
  namespace: spark-production
spec:
  hard:
    requests.cpu: "200"
    requests.memory: "800Gi"
    limits.cpu: "250"
    limits.memory: "1000Gi"
    pods: "500"
---
apiVersion: v1
kind: LimitRange
metadata:
  name: spark-limits
  namespace: spark-production
spec:
  limits:
    - type: Container
      default:
        cpu: "4"
        memory: "8Gi"
      defaultRequest:
        cpu: "2"
        memory: "4Gi"
      max:
        cpu: "16"
        memory: "64Gi"
      min:
        cpu: "0.5"
        memory: "512Mi"
EOF
kubectl apply -f priority_classes.yaml
echo "Priority classes and quotas configured"
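When a high-priority pod cannot be scheduled, the scheduler may evict lower-priority pods to make room; the PriorityClass values above determine who yields to whom. A sketch of that ordering (illustration only — the real scheduler also weighs node fit, PodDisruptionBudgets, and graceful termination):

```python
# preemption_order.py -- sketch (illustration only): pods strictly below the
# incoming pod's priority are preemption candidates, lowest-value first. Pods
# with preemptionPolicy: Never (batch-low-priority) never evict others, but can
# still themselves be evicted.

PRIORITIES = {
    "critical-infrastructure": 1000000,
    "production-queries": 500000,
    "production-etl": 300000,
    "development": 100000,
    "batch-low-priority": 10000,
}

def preemption_candidates(incoming: str, running: list) -> list:
    """Running pods with priority strictly below the incoming pod, lowest first."""
    threshold = PRIORITIES[incoming]
    return sorted(
        (p for p in running if PRIORITIES[p] < threshold),
        key=PRIORITIES.get,
    )

running = ["development", "production-etl", "batch-low-priority", "critical-infrastructure"]
print(preemption_candidates("production-queries", running))
# ['batch-low-priority', 'development', 'production-etl']
```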
Monitoring and Optimization
Tracking how well scheduling is working
#!/usr/bin/env python3
# scheduling_monitor.py -- Pod Scheduling Monitor
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("scheduler")


class SchedulingMonitor:
    def dashboard(self):
        return {
            "cluster_utilization": {
                "compute_pool": {"cpu_used": "72%", "memory_used": "68%", "nodes": "8/10"},
                "memory_pool": {"cpu_used": "45%", "memory_used": "82%", "nodes": "5/5"},
                "storage_pool": {"cpu_used": "35%", "memory_used": "55%", "nodes": "4/4"},
                "general_pool": {"cpu_used": "60%", "memory_used": "55%", "nodes": "3/3"},
            },
            "scheduling_stats_1h": {
                "pods_scheduled": 145,
                "pods_pending": 8,
                "pods_preempted": 2,
                "avg_scheduling_latency_ms": 250,
                "failed_scheduling": 1,
                "reason": "Insufficient memory on memory-optimized nodes",
            },
            "workload_breakdown": {
                "spark_executors": {"running": 80, "pending": 5, "cpu": "320", "memory": "640Gi"},
                "trino_workers": {"running": 5, "pending": 0, "cpu": "40", "memory": "320Gi"},
                "airflow_tasks": {"running": 12, "pending": 3, "cpu": "24", "memory": "48Gi"},
                "minio": {"running": 4, "pending": 0, "cpu": "8", "memory": "32Gi"},
            },
            "optimization_suggestions": [
                "Compute pool: Scale up 2 nodes (CPU utilization > 70%)",
                "Memory pool: Trino workers using 82% memory, consider r6i.8xlarge",
                "Spark: 5 executors pending, increase compute pool max to 25",
                "Development jobs running during peak hours, consider scheduling off-peak",
            ],
        }


monitor = SchedulingMonitor()
dash = monitor.dashboard()
print("Data Lakehouse Scheduling Dashboard:")
print("\nCluster Utilization:")
for pool, stats in dash["cluster_utilization"].items():
    print(f"  {pool}: CPU={stats['cpu_used']}, Mem={stats['memory_used']}, Nodes={stats['nodes']}")
print(f"\nScheduling (1h): {dash['scheduling_stats_1h']['pods_scheduled']} scheduled, {dash['scheduling_stats_1h']['pods_pending']} pending")
print("\nWorkloads:")
for name, info in dash["workload_breakdown"].items():
    print(f"  {name}: {info['running']} running, {info['pending']} pending")
print("\nSuggestions:")
for s in dash["optimization_suggestions"]:
    print(f"  - {s}")
FAQ: Frequently Asked Questions
Q: Why separate node pools for data workloads?
A: Data workloads have very different resource profiles. Spark executors are compute-bound and suit compute-optimized instances (c-series); Trino workers are memory-bound and suit memory-optimized instances (r-series); MinIO and other storage is disk-I/O-bound and suits storage-optimized instances (i-series). Without separate pools, Spark might land on a memory node (wasting expensive RAM) or Trino on a compute node (not enough memory). Separate pools plus taints/tolerations guarantee that each workload lands on the right node type, improving both cost efficiency and performance.
Q: How do Spark on Kubernetes and Spark on YARN differ?
A: Spark on YARN uses Hadoop YARN as the resource manager: you must maintain a Hadoop cluster, in exchange for resource sharing with the Hadoop ecosystem (Hive, HBase) and mature infrastructure. Spark on Kubernetes uses the K8s scheduler to run executors as pods: no Hadoop cluster to maintain, containerized (consistent environments), better auto-scaling (Cluster Autoscaler, Karpenter), and infrastructure shared with other services. The trend is clearly toward Spark on K8s; unless you are deeply invested in the Hadoop ecosystem, YARN is rarely the better choice. The Spark Operator (GoogleCloudPlatform/spark-on-k8s-operator) makes submitting Spark jobs to K8s much easier.
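Without the Spark Operator, the same job can be submitted directly with spark-submit against the K8s API server. A rough sketch of the equivalent configuration for the ETL job shown earlier (values are illustrative, mirroring the Operator manifest above):

```python
# spark_k8s_submit.py -- sketch: building a spark-submit command for Spark on
# Kubernetes without the Operator. The spark.kubernetes.* keys are standard
# Spark configuration properties; the image/namespace values are placeholders.

conf = {
    "spark.master": "k8s://https://kubernetes.default.svc",
    "spark.kubernetes.namespace": "data-platform",
    "spark.kubernetes.container.image": "myregistry/spark:3.5.0",
    "spark.executor.instances": "10",
    "spark.executor.cores": "4",
    "spark.executor.memory": "8g",
    # Pin executors to the compute pool, like the nodeSelector in the manifest
    "spark.kubernetes.node.selector.workload-type": "compute",
}

args = [f"--conf {k}={v}" for k, v in conf.items()]
cmd = " ".join(["spark-submit"] + args + ["s3a://scripts/daily_transform.py"])
print(cmd)
```

The Operator adds declarative YAML, retries, and status reporting on top of this; the underlying submission mechanism is the same.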
Q: How do Karpenter and Cluster Autoscaler differ?
A: Cluster Autoscaler (CA) is the default K8s autoscaler: it scales existing node groups when pods are pending, is relatively slow (1-3 minutes), and requires pre-defined instance types per node group. Karpenter is an AWS-native autoscaler that picks the instance type to match pod requirements, reacts faster than CA (roughly 30 seconds to 1 minute), supports consolidation (repacking pods onto fewer nodes to cut cost), and has strong Spot instance support. For a Data Lakehouse, Karpenter is usually the better fit because Spark jobs have variable resource needs; by right-sizing instance types to requirements it can reduce cost by 20-40% compared with fixed node groups.
Q: Should you use Spot Instances for Spark?
A: Spot Instances cost 60-90% less than On-Demand and suit Spark executors (stateless, replaceable) well, but not Spark drivers, Trino coordinators, or MinIO (stateful). The pattern: run executors on Spot and the driver on On-Demand; use spark.kubernetes.node.selector to target Spot nodes; enable Dynamic Allocation so Spark can replace lost executors; enable graceful decommission for when Spot capacity is reclaimed; diversify instance types to reduce interruptions; and keep shuffle data in external storage (S3) rather than on local disk. Karpenter can manage Spot with On-Demand fallback automatically.
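A back-of-envelope sketch of the executors-on-Spot pattern above (the price and discount are illustrative placeholders, not real quotes, and it simplistically assumes one instance per pod):

```python
# spot_savings.py -- rough cost sketch for the driver-on-On-Demand,
# executors-on-Spot pattern. ON_DEMAND_HR and SPOT_DISCOUNT are hypothetical
# placeholder values; real Spot prices fluctuate by instance type and AZ.

ON_DEMAND_HR = 0.68   # hypothetical c6i.4xlarge on-demand $/hr
SPOT_DISCOUNT = 0.70  # 70% off, within the 60-90% range cited above

def hourly_cost(executors: int, driver_on_demand: bool = True) -> float:
    """Driver stays on On-Demand; executors ride Spot."""
    spot_hr = ON_DEMAND_HR * (1 - SPOT_DISCOUNT)
    return executors * spot_hr + (ON_DEMAND_HR if driver_on_demand else 0.0)

all_on_demand = 11 * ON_DEMAND_HR  # 10 executors + driver, no Spot
mixed = hourly_cost(10)
print(f"all on-demand: ${all_on_demand:.2f}/hr, mixed: ${mixed:.2f}/hr")
print(f"savings: {1 - mixed / all_on_demand:.0%}")
```

The driver's On-Demand cost caps the savings below the raw Spot discount, which is why the larger the executor fleet, the closer the blended savings approach the discount itself.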