LLM Fine-Tuning with LoRA: a practical guide
LLM fine-tuning adapts a pre-trained language model to a specific task or domain. Instead of writing code to handle every case, you prepare a dataset of examples and fine-tune the model's weights so it learns the desired behavior directly from the data.
LoRA (Low-Rank Adaptation) is a parameter-efficient fine-tuning technique: instead of updating all of the model's weights, it injects small low-rank matrices into layers such as the attention projections. Trainable parameters shrink by roughly 10,000x — from billions to millions — cutting GPU memory requirements enough to fine-tune a 7B model on a single 16GB GPU.
Message Queue Design for fine-tuning jobs handles training requests from many users/teams: the queue orders jobs, allocates GPU resources, tracks progress, and notifies on completion — the backbone of an ML platform that offers fine-tuning as a service.
Part 1: Setting up the Fine-Tuning Environment
Set up the environment for LoRA fine-tuning:
# === LoRA Fine-Tuning Setup ===

# 1. Install Dependencies
pip install torch transformers datasets accelerate
pip install peft bitsandbytes trl
pip install wandb # for experiment tracking

# 2. Verify GPU
# FIX: the device-properties attribute is `total_memory` (bytes), not
# `total_mem` -- the original raised AttributeError.
python3 -c "
import torch
print(f'CUDA available: {torch.cuda.is_available()}')
print(f'GPU: {torch.cuda.get_device_name(0)}')
print(f'VRAM: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB')
"

# 3. Prepare Dataset
cat > prepare_data.py << 'PYEOF'
#!/usr/bin/env python3
from datasets import Dataset
import json

# Example: instruction-following dataset
data = [
    {
        "instruction": "??????????????????????????? Kubernetes ?????????????????????",
        "input": "",
        "output": "Kubernetes ????????? container orchestration platform..."
    },
    {
        "instruction": "??????????????? Python function ?????????????????? binary search",
        "input": "",
        "output": "def binary_search(arr, target):\n left, right = 0, len(arr) - 1..."
    },
]

# Format for training (Alpaca-style prompt; the Input section is omitted
# when the example has no input).
def format_prompt(example):
    if example["input"]:
        return f"### Instruction:\n{example['instruction']}\n\n### Input:\n{example['input']}\n\n### Response:\n{example['output']}"
    return f"### Instruction:\n{example['instruction']}\n\n### Response:\n{example['output']}"

formatted = [{"text": format_prompt(d)} for d in data]
dataset = Dataset.from_list(formatted)
dataset.save_to_disk("training_data")
print(f"Dataset: {len(dataset)} examples")
PYEOF
python3 prepare_data.py

# 4. Download Base Model
python3 -c "
from transformers import AutoModelForCausalLM, AutoTokenizer
model_name = 'meta-llama/Meta-Llama-3.1-8B-Instruct'
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype='auto')
print(f'Model loaded: {model_name}')
print(f'Parameters: {sum(p.numel() for p in model.parameters()) / 1e9:.1f}B')
"

echo "Environment ready"
Fine-Tune an LLM with LoRA
Training script for LoRA fine-tuning:
#!/usr/bin/env python3
# lora_finetune.py -- LoRA Fine-Tuning Script
#
# QLoRA pipeline: load the base model in 4-bit NF4, attach trainable
# low-rank adapters to the attention/MLP projections, train with TRL's
# SFTTrainer, and save only the small adapter.
import torch
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    TrainingArguments,
    BitsAndBytesConfig,
)
from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training
from trl import SFTTrainer
from datasets import load_from_disk
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("finetune")


def main():
    """Run the full QLoRA fine-tuning pipeline and save the LoRA adapter."""
    # 1. Load Model with 4-bit Quantization (QLoRA)
    model_name = "meta-llama/Meta-Llama-3.1-8B-Instruct"
    bnb_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.bfloat16,
        bnb_4bit_use_double_quant=True,  # second quantization pass saves ~0.4 bit/param
    )
    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        quantization_config=bnb_config,
        device_map="auto",
        trust_remote_code=True,
    )
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    tokenizer.pad_token = tokenizer.eos_token  # Llama tokenizers ship no pad token
    tokenizer.padding_side = "right"

    # 2. LoRA Configuration
    lora_config = LoraConfig(
        r=16,  # Rank (higher = more capacity, more memory)
        lora_alpha=32,  # Scaling factor (typically 2x rank)
        target_modules=[  # Which layers to add LoRA
            "q_proj", "k_proj", "v_proj", "o_proj",
            "gate_proj", "up_proj", "down_proj",
        ],
        lora_dropout=0.05,
        bias="none",
        task_type="CAUSAL_LM",
    )

    # 3. Prepare Model
    model = prepare_model_for_kbit_training(model)
    model = get_peft_model(model, lora_config)
    trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
    total = sum(p.numel() for p in model.parameters())
    logger.info(f"Trainable: {trainable:,} / {total:,} ({100*trainable/total:.2f}%)")

    # 4. Load Dataset
    dataset = load_from_disk("training_data")

    # 5. Training Arguments
    training_args = TrainingArguments(
        output_dir="./lora_output",
        num_train_epochs=3,
        per_device_train_batch_size=4,
        gradient_accumulation_steps=4,  # effective batch size 16
        learning_rate=2e-4,
        warmup_steps=100,
        lr_scheduler_type="cosine",
        logging_steps=10,
        save_steps=200,
        save_total_limit=3,
        # FIX: use bf16 to match bnb_4bit_compute_dtype=torch.bfloat16 above;
        # the original fp16=True mixed two half precisions, which can
        # destabilize QLoRA training.
        bf16=True,
        optim="paged_adamw_8bit",  # paged optimizer avoids OOM spikes
        max_grad_norm=0.3,
        report_to="wandb",
    )

    # 6. Train
    trainer = SFTTrainer(
        model=model,
        args=training_args,
        train_dataset=dataset,
        tokenizer=tokenizer,
        dataset_text_field="text",
        max_seq_length=2048,
        packing=True,  # pack short examples into full sequences
    )
    trainer.train()

    # 7. Save LoRA Adapter (small, ~50-200MB)
    trainer.save_model("./lora_adapter")
    logger.info("LoRA adapter saved to ./lora_adapter")


if __name__ == "__main__":
    main()
Message Queue Design for Fine-Tuning Jobs
Build a queue system for training jobs:
#!/usr/bin/env python3
# training_queue.py ??? Fine-Tuning Job Queue System
import json
import logging
import time
import uuid
from datetime import datetime, timezone
from enum import Enum
from typing import Dict, List, Optional
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("queue")
class JobStatus(str, Enum):
    """Lifecycle states of a fine-tuning job (string-valued for JSON output)."""

    PENDING = "pending"        # created, not yet enqueued
    QUEUED = "queued"          # waiting for a free GPU
    RUNNING = "running"        # training in progress
    COMPLETED = "completed"    # finished successfully
    FAILED = "failed"          # training errored out
    CANCELLED = "cancelled"    # aborted before completion
class TrainingJob:
    """A single fine-tuning request flowing through the queue."""

    def __init__(self, user_id, base_model, dataset_path, config):
        # Short id is fine for a demo-scale queue; not collision-proof.
        self.job_id = str(uuid.uuid4())[:8]
        self.user_id = user_id
        self.base_model = base_model
        self.dataset_path = dataset_path
        self.config = config  # LoRA/training hyperparameters
        self.status = JobStatus.PENDING
        # FIX: datetime.utcnow() is naive and deprecated (Python 3.12+);
        # use an explicit timezone-aware UTC timestamp instead.
        self.created_at = datetime.now(timezone.utc).isoformat()
        self.started_at = None    # set by the scheduler
        self.completed_at = None  # set on completion
        self.gpu_id = None        # assigned by the scheduler
        self.metrics = {}         # filled in when the job completes

    def to_dict(self):
        """Return a JSON-serializable summary of the job for API responses."""
        return {
            "job_id": self.job_id,
            "user_id": self.user_id,
            "base_model": self.base_model,
            "status": self.status.value,
            "created_at": self.created_at,
            "gpu_id": self.gpu_id,
            "metrics": self.metrics,
        }
class TrainingQueueManager:
    """FIFO scheduler mapping queued fine-tuning jobs onto a fixed GPU pool."""

    def __init__(self, max_concurrent=4):
        self.queue: List[TrainingJob] = []         # FIFO of waiting jobs
        self.running: Dict[str, TrainingJob] = {}  # job_id -> running job
        self.completed: List[TrainingJob] = []
        self.max_concurrent = max_concurrent
        # gpu_id -> currently assigned job (None = free)
        self.gpu_pool = {f"gpu-{i}": None for i in range(max_concurrent)}

    def submit_job(self, user_id, base_model, dataset_path, config=None):
        """Submit a new fine-tuning job; starts immediately if a GPU is free."""
        if config is None:
            # Sensible QLoRA defaults (see the fine-tuning script above).
            config = {
                "lora_r": 16,
                "lora_alpha": 32,
                "epochs": 3,
                "batch_size": 4,
                "learning_rate": 2e-4,
                "max_seq_length": 2048,
            }
        job = TrainingJob(user_id, base_model, dataset_path, config)
        self.queue.append(job)
        job.status = JobStatus.QUEUED
        logger.info(f"Job {job.job_id} submitted by {user_id}")
        self._try_schedule()
        return job.to_dict()

    def _try_schedule(self):
        """Assign queued jobs to free GPUs in FIFO order."""
        for gpu_id, current_job in self.gpu_pool.items():
            if current_job is None and self.queue:
                job = self.queue.pop(0)
                job.status = JobStatus.RUNNING
                # FIX: timezone-aware UTC timestamp (utcnow() is deprecated).
                job.started_at = datetime.now(timezone.utc).isoformat()
                job.gpu_id = gpu_id
                self.gpu_pool[gpu_id] = job
                self.running[job.job_id] = job
                logger.info(f"Job {job.job_id} started on {gpu_id}")

    def complete_job(self, job_id, metrics=None):
        """Mark a running job completed, free its GPU, and backfill the queue.

        Returns the job summary dict, or None when job_id is not running.
        """
        if job_id in self.running:
            job = self.running.pop(job_id)
            job.status = JobStatus.COMPLETED
            job.completed_at = datetime.now(timezone.utc).isoformat()
            job.metrics = metrics or {}
            self.gpu_pool[job.gpu_id] = None  # release the GPU
            self.completed.append(job)
            self._try_schedule()  # give the freed GPU to the next queued job
            return job.to_dict()
        return None

    def get_status(self):
        """Aggregate queue/GPU counters for dashboards and the API."""
        return {
            "queued": len(self.queue),
            "running": len(self.running),
            "completed": len(self.completed),
            "available_gpus": sum(1 for v in self.gpu_pool.values() if v is None),
            "total_gpus": len(self.gpu_pool),
        }
# Demo: 6 jobs against 4 GPUs -- the last two stay queued.
manager = TrainingQueueManager(max_concurrent=4)

for idx in range(6):
    submitted = manager.submit_job(
        user_id=f"user_{idx}",
        base_model="meta-llama/Meta-Llama-3.1-8B-Instruct",
        dataset_path=f"s3://datasets/user_{idx}/train.jsonl",
    )
    print(f"Submitted: {submitted['job_id']} -> {submitted['status']}")

status = manager.get_status()
print(f"\nQueue Status: {json.dumps(status, indent=2)}")
Distributed Training Pipeline
Deploy the training pipeline on Kubernetes:
# === Distributed Fine-Tuning Pipeline ===

# 1. Kubernetes Job for Training
# Template manifest: the empty name suffix / env values are filled in by the
# submitter when it renders a concrete Job per training request.
cat > k8s/training-job.yaml << 'EOF'
apiVersion: batch/v1
kind: Job
metadata:
  name: lora-finetune-
  namespace: ml-training
  labels:
    app: lora-finetune
    job-id: ""
spec:
  backoffLimit: 2
  activeDeadlineSeconds: 86400
  template:
    metadata:
      labels:
        app: lora-finetune
    spec:
      restartPolicy: OnFailure
      containers:
        - name: trainer
          image: ghcr.io/myorg/lora-trainer:latest
          env:
            - name: JOB_ID
              value: ""
            - name: BASE_MODEL
              value: ""
            - name: DATASET_PATH
              value: ""
            - name: REDIS_URL
              valueFrom:
                secretKeyRef:
                  name: ml-secrets
                  key: redis-url
            - name: WANDB_API_KEY
              valueFrom:
                secretKeyRef:
                  name: ml-secrets
                  key: wandb-key
          resources:
            limits:
              nvidia.com/gpu: 1
              memory: "32Gi"
              cpu: "8"
            requests:
              nvidia.com/gpu: 1
              memory: "24Gi"
              cpu: "4"
          volumeMounts:
            - name: model-cache
              mountPath: /root/.cache/huggingface
            - name: output
              mountPath: /output
            - name: shm
              mountPath: /dev/shm
      volumes:
        - name: model-cache
          persistentVolumeClaim:
            claimName: hf-model-cache
        - name: output
          persistentVolumeClaim:
            claimName: training-output
        - name: shm
          emptyDir:
            medium: Memory
            sizeLimit: "8Gi"
      tolerations:
        - key: nvidia.com/gpu
          operator: Exists
          effect: NoSchedule
EOF
# 2. Queue Worker (processes training requests)
cat > worker.py << 'PYEOF'
#!/usr/bin/env python3
"""Blocking Redis queue worker: pops jobs and runs lora_finetune.py per job."""
import redis
import json
import subprocess
import os

r = redis.Redis.from_url(os.environ["REDIS_URL"])
print("Training worker started...")

while True:
    # BRPOP blocks up to 30s and returns None on timeout, keeping the loop alive.
    job_data = r.brpop("training:queue", timeout=30)
    if job_data is None:
        continue
    job = json.loads(job_data[1])
    job_id = job["job_id"]
    r.hset(f"training:job:{job_id}", "status", "running")
    try:
        # Run training as a subprocess with a 24h hard ceiling.
        result = subprocess.run([
            "python3", "lora_finetune.py",
            "--model", job["base_model"],
            "--dataset", job["dataset_path"],
            "--output", f"/output/{job_id}",
            "--epochs", str(job.get("epochs", 3)),
            "--lora-r", str(job.get("lora_r", 16)),
        ], capture_output=True, text=True, timeout=86400)
        if result.returncode == 0:
            r.hset(f"training:job:{job_id}", mapping={
                "status": "completed",
                "output_path": f"/output/{job_id}",
            })
            print(f"Job {job_id} completed")
        else:
            r.hset(f"training:job:{job_id}", mapping={
                "status": "failed",
                "error": result.stderr[:1000],
            })
            # FIX: surface failures in worker logs (exit code + stderr tail)
            # instead of recording them only in Redis where nobody sees them.
            print(f"Job {job_id} failed (rc={result.returncode}): {result.stderr[:200]}")
    except Exception as e:
        # Covers subprocess.TimeoutExpired plus unexpected worker errors.
        r.hset(f"training:job:{job_id}", mapping={
            "status": "failed",
            "error": str(e),
        })
        print(f"Job {job_id} failed with exception: {e}")
    # Notify subscribers (API/UI) that the job reached a terminal state.
    r.publish(f"training:done:{job_id}", "done")
PYEOF
# 3. API Endpoint
cat > api.py << 'PYEOF'
from fastapi import FastAPI
import redis
import json
import uuid
import os

app = FastAPI(title="Fine-Tuning API")
# FIX: read the Redis URL from the environment for consistency with
# worker.py; the original in-cluster address stays the default, so
# behavior is unchanged when REDIS_URL is unset.
r = redis.Redis.from_url(os.environ.get("REDIS_URL", "redis://redis:6379"))

@app.post("/finetune")
async def submit_finetune(base_model: str, dataset_path: str, epochs: int = 3):
    """Enqueue a fine-tuning job and return its id immediately (async)."""
    job_id = str(uuid.uuid4())[:8]
    job = {"job_id": job_id, "base_model": base_model, "dataset_path": dataset_path, "epochs": epochs}
    r.lpush("training:queue", json.dumps(job))
    r.hset(f"training:job:{job_id}", "status", "queued")
    return {"job_id": job_id, "status": "queued"}

@app.get("/finetune/{job_id}")
async def get_status(job_id: str):
    """Return the job's Redis hash; an unknown job id yields an empty dict."""
    data = r.hgetall(f"training:job:{job_id}")
    return {k.decode(): v.decode() for k, v in data.items()}
PYEOF
echo "Pipeline deployed"
Monitoring and Evaluation
Monitor training and evaluate the model:
#!/usr/bin/env python3
# training_monitor.py ??? Training Monitoring & Evaluation
import json
import logging
from typing import Dict, List
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("monitor")
class TrainingMonitor:
    """Reference tables for monitoring, tuning, and costing LoRA runs.

    key_metrics() and lora_hyperparameter_guide() return static guidance;
    cost_estimate() produces a rough back-of-envelope training cost.
    """

    def __init__(self):
        # Experiment registry (reserved for callers; not used internally yet).
        self.experiments = {}

    def key_metrics(self):
        """Return metrics worth tracking, grouped by category."""
        return {
            "training_metrics": {
                "train_loss": "Should decrease steadily",
                "eval_loss": "Should decrease, watch for overfitting (diverges from train)",
                "learning_rate": "Follows scheduler (cosine decay typical)",
                "grad_norm": "Should stay < 1.0 with gradient clipping",
                "tokens_per_second": "Throughput metric",
            },
            "evaluation_metrics": {
                "perplexity": "Lower is better, measures prediction confidence",
                "bleu_score": "For translation tasks (0-100)",
                "rouge_score": "For summarization tasks",
                "exact_match": "For QA tasks",
                "human_eval": "Human preference rating (most important)",
            },
            "resource_metrics": {
                "gpu_utilization": "Should be > 90%",
                "gpu_memory": "Monitor for OOM",
                "training_time": "Track per epoch",
                "cost": "GPU hours * cost per hour",
            },
        }

    def lora_hyperparameter_guide(self):
        """Return recommended LoRA hyperparameter choices and defaults."""
        return {
            "rank_r": {
                "4": "Minimal adaptation, fast training, small adapter",
                "8": "Light adaptation, good for simple tasks",
                "16": "Balanced (recommended default)",
                "32": "More capacity, for complex tasks",
                "64": "High capacity, approaches full fine-tuning",
                "128": "Very high, usually diminishing returns",
            },
            "lora_alpha": {
                "rule": "Typically 2x rank (alpha=32 for r=16)",
                "effect": "Higher alpha = stronger LoRA effect",
            },
            "target_modules": {
                "minimal": ["q_proj", "v_proj"],
                "recommended": ["q_proj", "k_proj", "v_proj", "o_proj"],
                "full": ["q_proj", "k_proj", "v_proj", "o_proj", "gate_proj", "up_proj", "down_proj"],
            },
            "learning_rate": {
                "qlora_4bit": "2e-4 (higher because quantized)",
                "lora_fp16": "1e-4",
                "full_finetune": "2e-5 (much lower)",
            },
        }

    def cost_estimate(self, model_size_b, dataset_size_examples, epochs=3):
        """Estimate fine-tuning time and cost (very rough heuristics).

        Args:
            model_size_b: model size in billions of parameters.
            dataset_size_examples: number of training examples.
            epochs: training epochs (default 3).

        Returns:
            dict with GPU requirement, estimated hours, dollar cost, and
            approximate LoRA adapter size.
        """
        # VRAM needed (GB) keyed by model size in B params (QLoRA assumed).
        gpu_memory_needed = {7: 16, 8: 16, 13: 24, 70: 80}
        closest = min(gpu_memory_needed.keys(), key=lambda x: abs(x - model_size_b))
        vram = gpu_memory_needed[closest]
        # Training time estimate (very rough)
        steps = (dataset_size_examples * epochs) / 16  # batch_size * grad_accum
        time_per_step = 0.5 if model_size_b <= 13 else 2.0  # seconds
        total_hours = (steps * time_per_step) / 3600
        gpu_cost_per_hour = 1.0 if vram <= 24 else 3.5  # A10G vs A100
        total_cost = total_hours * gpu_cost_per_hour
        return {
            "model_size": f"{model_size_b}B",
            "gpu_needed": f"{vram}GB VRAM",
            "estimated_hours": round(total_hours, 1),
            # FIX: the original f-string was empty, silently dropping the
            # computed total_cost (which was otherwise unused).
            "estimated_cost": f"${total_cost:.2f}",
            "adapter_size": f"~{model_size_b * 10}MB (LoRA r=16)",
        }
# Demo: print the rank guide and a sample cost estimate.
monitor = TrainingMonitor()
rank_guide = monitor.lora_hyperparameter_guide()["rank_r"]
print("LoRA Rank Guide:")
for rank, desc in rank_guide.items():
    print(f" r={rank}: {desc}")

cost = monitor.cost_estimate(model_size_b=8, dataset_size_examples=10000, epochs=3)
print(f"\nCost Estimate: {json.dumps(cost, indent=2)}")
FAQ: Frequently Asked Questions
Q: How is LoRA different from full fine-tuning, and when should I use each?
A: Full fine-tuning updates every weight in the model, so it needs a lot of GPU memory (~80GB for an 8B model), is slow and expensive, and risks catastrophic forgetting. LoRA trains only small low-rank matrices attached to the attention layers: trainable parameters drop to roughly 0.1-1% of the total, GPU memory falls about 10x (an 8B model with QLoRA fits in 16GB), and quality typically reaches 90-95% of full fine-tuning. The adapter is tiny (~50-200MB) and you can switch between adapters easily. For roughly 90% of use cases, LoRA is the more practical choice.
Q: What is QLoRA, and how does it differ from LoRA?
A: QLoRA is Quantized LoRA: the base model is loaded in 4-bit quantization (NF4) while the LoRA adapters themselves are trained in FP16/BF16. It cuts GPU memory a further 2-4x versus plain LoRA — you can fine-tune a 7-8B model on a 16GB GPU, and a 70B model on a single 80GB GPU (A100). Compared to LoRA in FP16, training is somewhat slower (~10-20%) due to quantize/dequantize overhead, but when GPU memory is the constraint, QLoRA is usually the right choice.
Q: How much data do I need for fine-tuning?
A: It depends on task complexity. Simple tasks (classification, formatting): 100-500 examples may be enough. Moderate tasks (domain QA, style transfer): 1,000-5,000 examples. Complex tasks (code generation, reasoning): 10,000-50,000 examples. Quality matters more than quantity — 500 clean, well-labeled examples often beat 10,000 noisy ones. Tips: keep the data diverse and include edge cases, remove duplicates, verify label quality, and consider synthetic data (e.g. generated with GPT-4). Start with 500-1,000 examples, evaluate, then expand if needed.
Q: Why use a message queue in a fine-tuning pipeline?
A: A message queue (Redis, RabbitMQ, Kafka) solves several problems at once: Job Scheduling — training jobs queue up in order instead of being allocated manually; Resource Management — each job gets a GPU and releases it when done; Fault Tolerance — failed training runs can be re-queued; Scalability — add GPU workers simply by adding consumers; Async Processing — submit a job and get notified when it finishes; Multi-tenancy — multiple users/teams share the GPU pool fairly. For an ML platform offering fine-tuning as a service, a message queue is an essential component.
