SiamCafe.net Blog
Technology

LLM Fine-Tuning LoRA Message Queue Design สร้างระบบ Fine-Tune LLM อัตโนมัติ

llm fine tuning lora message queue design
LLM Fine-tuning LoRA Message Queue Design | SiamCafe Blog
2025-09-30· อ. บอม — SiamCafe.net· 1,388 คำ

LLM Fine-Tuning ด้วย LoRA และ Message Queue Design

LLM Fine-Tuning ?????????????????????????????? pre-trained language model ?????????????????????????????????????????????????????????????????????????????? ???????????? ????????????????????????????????????????????????????????? ??????????????? code ??????????????????????????? ?????????????????????????????????????????????????????????????????? ?????????????????? dataset ???????????????????????????????????? fine-tune weights ????????? model

LoRA (Low-Rank Adaptation) ?????????????????????????????? parameter-efficient fine-tuning ??????????????????????????????????????? weights ?????????????????????????????? model ???????????????????????? low-rank matrices ???????????????????????????????????????????????? attention layers ??????????????? trainable parameters ???????????? 10,000 ???????????? ????????? billions ??????????????? millions ????????? GPU memory ??????????????????????????? fine-tune model 7B ??????????????? GPU 16GB ???????????????

Message Queue Design ?????????????????? fine-tuning jobs ?????????????????? training requests ????????????????????? users/teams ??????????????????????????? queue ???????????????????????? jobs, allocate GPU resources, track progress, notify ?????????????????????????????? ????????????????????????????????? ML platform ???????????????????????????????????? fine-tuning as a service

การเตรียม Fine-Tuning Environment

Setup environment สำหรับ LoRA fine-tuning

# === LoRA Fine-Tuning Setup ===

# 1. Install Dependencies
pip install torch transformers datasets accelerate
pip install peft bitsandbytes trl
pip install wandb  # for experiment tracking

# 2. Verify GPU
# FIX: the CUDA device-properties attribute is `total_memory`;
# the original `total_mem` raises AttributeError.
python3 -c "
import torch
print(f'CUDA available: {torch.cuda.is_available()}')
print(f'GPU: {torch.cuda.get_device_name(0)}')
print(f'VRAM: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB')
"

# 3. Prepare Dataset
cat > prepare_data.py << 'PYEOF'
#!/usr/bin/env python3
"""Build a tiny instruction-following dataset and save it to disk.

Renders each example into an Alpaca-style prompt ("### Instruction /
### Input / ### Response") and writes a HuggingFace `datasets.Dataset`
to the `training_data` directory, which lora_finetune.py loads with
`load_from_disk`.
"""
from datasets import Dataset
import json  # NOTE(review): unused in this script — confirm before removing

# Example: instruction-following dataset.
# NOTE(review): the Thai text below was corrupted to "?" placeholders in this
# copy of the article — replace with real examples before training.
data = [
    {
        "instruction": "??????????????????????????? Kubernetes ?????????????????????",
        "input": "",
        "output": "Kubernetes ????????? container orchestration platform..."
    },
    {
        "instruction": "??????????????? Python function ?????????????????? binary search",
        "input": "",
        "output": "def binary_search(arr, target):\n    left, right = 0, len(arr) - 1..."
    },
]

# Format for training
def format_prompt(example: dict) -> str:
    """Render one example as an Alpaca-style prompt string.

    The "### Input" section is emitted only when the example carries a
    non-empty "input" field.
    """
    if example["input"]:
        return f"### Instruction:\n{example['instruction']}\n\n### Input:\n{example['input']}\n\n### Response:\n{example['output']}"
    return f"### Instruction:\n{example['instruction']}\n\n### Response:\n{example['output']}"

formatted = [{"text": format_prompt(d)} for d in data]
dataset = Dataset.from_list(formatted)
dataset.save_to_disk("training_data")  # consumed by lora_finetune.py
print(f"Dataset: {len(dataset)} examples")
PYEOF

python3 prepare_data.py

# 4. Download Base Model
# Pre-fetches weights into the local HF cache so the training job itself
# does not download at train time.
# NOTE(review): Meta-Llama 3.1 is likely a gated repo — confirm HF token access.
python3 -c "
from transformers import AutoModelForCausalLM, AutoTokenizer

model_name = 'meta-llama/Meta-Llama-3.1-8B-Instruct'
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name, torch_dtype='auto')
print(f'Model loaded: {model_name}')
print(f'Parameters: {sum(p.numel() for p in model.parameters()) / 1e9:.1f}B')
"

echo "Environment ready"

Fine-Tune LLM ด้วย LoRA

Training script สำหรับ LoRA fine-tuning

#!/usr/bin/env python3
# lora_finetune.py — LoRA Fine-Tuning Script
import torch
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    TrainingArguments,
    BitsAndBytesConfig,
)
from peft import LoraConfig, get_peft_model, prepare_model_for_kbit_training
from trl import SFTTrainer
from datasets import load_from_disk
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("finetune")

def main():
    """Fine-tune Llama 3.1 8B with QLoRA (4-bit NF4 base + LoRA adapters).

    Loads the base model quantized to 4-bit, attaches LoRA adapters to the
    attention and MLP projections, trains with TRL's SFTTrainer on the
    dataset produced by prepare_data.py, and saves only the small adapter
    (~50-200MB) to ./lora_adapter.
    """
    # 1. Load Model with 4-bit Quantization (QLoRA)
    model_name = "meta-llama/Meta-Llama-3.1-8B-Instruct"

    # Standard QLoRA recipe: NF4 storage, bf16 compute, double quantization.
    bnb_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.bfloat16,
        bnb_4bit_use_double_quant=True,
    )

    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        quantization_config=bnb_config,
        device_map="auto",          # place layers on available devices automatically
        trust_remote_code=True,
    )

    tokenizer = AutoTokenizer.from_pretrained(model_name)
    tokenizer.pad_token = tokenizer.eos_token  # Llama tokenizers ship without a pad token
    tokenizer.padding_side = "right"

    # 2. LoRA Configuration
    lora_config = LoraConfig(
        r=16,                      # Rank (higher = more capacity, more memory)
        lora_alpha=32,             # Scaling factor (typically 2x rank)
        target_modules=[           # Attention + MLP projections ("full" coverage)
            "q_proj", "k_proj", "v_proj", "o_proj",
            "gate_proj", "up_proj", "down_proj",
        ],
        lora_dropout=0.05,
        bias="none",
        task_type="CAUSAL_LM",
    )

    # 3. Prepare Model — cast layer norms / enable hooks for k-bit training,
    # then wrap with PEFT so only the LoRA matrices are trainable.
    model = prepare_model_for_kbit_training(model)
    model = get_peft_model(model, lora_config)

    trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
    total = sum(p.numel() for p in model.parameters())
    logger.info(f"Trainable: {trainable:,} / {total:,} ({100*trainable/total:.2f}%)")

    # 4. Load Dataset (produced by prepare_data.py)
    dataset = load_from_disk("training_data")

    # 5. Training Arguments
    training_args = TrainingArguments(
        output_dir="./lora_output",
        num_train_epochs=3,
        per_device_train_batch_size=4,
        gradient_accumulation_steps=4,   # effective batch size = 16
        learning_rate=2e-4,
        warmup_steps=100,
        lr_scheduler_type="cosine",
        logging_steps=10,
        save_steps=200,
        save_total_limit=3,
        # FIX: train in bf16 to match bnb_4bit_compute_dtype=torch.bfloat16.
        # The original fp16=True mixed fp16 autocast with bf16 compute,
        # risking dtype mismatches / instability.
        bf16=True,
        optim="paged_adamw_8bit",        # paged optimizer avoids OOM spikes
        max_grad_norm=0.3,
        report_to="wandb",
    )

    # 6. Train
    # NOTE(review): these SFTTrainer kwargs target trl < 0.12
    # (dataset_text_field / max_seq_length moved into SFTConfig later).
    trainer = SFTTrainer(
        model=model,
        args=training_args,
        train_dataset=dataset,
        tokenizer=tokenizer,
        dataset_text_field="text",
        max_seq_length=2048,
        packing=True,                    # pack short samples into full sequences
    )

    trainer.train()

    # 7. Save LoRA Adapter (small, ~50-200MB)
    trainer.save_model("./lora_adapter")
    logger.info("LoRA adapter saved to ./lora_adapter")

if __name__ == "__main__":
    main()

Message Queue Design สำหรับ Fine-Tuning Jobs

ออกแบบ queue system สำหรับ training jobs

#!/usr/bin/env python3
# training_queue.py — Fine-Tuning Job Queue System
import json
import logging
import time
import uuid
from datetime import datetime, timezone
from enum import Enum
from typing import Dict, List, Optional

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("queue")  # module-wide logger used by the queue manager

class JobStatus(str, Enum):
    """Lifecycle states for a fine-tuning job.

    Subclasses `str` so values serialize directly into JSON payloads.
    """
    PENDING = "pending"      # created, not yet placed in the queue
    QUEUED = "queued"        # waiting for a free GPU
    RUNNING = "running"      # training on an assigned GPU
    COMPLETED = "completed"  # finished successfully
    FAILED = "failed"        # training errored out
    CANCELLED = "cancelled"  # reserved; nothing in this file transitions to it

class TrainingJob:
    """A single fine-tuning request plus its lifecycle metadata.

    FIX: timestamps are now timezone-aware UTC ISO-8601 strings —
    `datetime.utcnow()` is deprecated (Python 3.12+) and produced naive
    timestamps that are ambiguous to downstream consumers.
    """

    def __init__(self, user_id, base_model, dataset_path, config):
        # 8 hex chars keeps IDs short; NOTE: collision risk grows with volume.
        self.job_id = str(uuid.uuid4())[:8]
        self.user_id = user_id
        self.base_model = base_model
        self.dataset_path = dataset_path
        self.config = config              # LoRA/training hyperparameters (dict)
        self.status = JobStatus.PENDING
        self.created_at = datetime.now(timezone.utc).isoformat()
        self.started_at = None            # set by the scheduler
        self.completed_at = None          # set on completion
        self.gpu_id = None                # e.g. "gpu-0", assigned by the scheduler
        self.metrics = {}                 # final training metrics, filled on completion

    def to_dict(self):
        """Serializable summary of the job (subset of fields for API responses)."""
        return {
            "job_id": self.job_id,
            "user_id": self.user_id,
            "base_model": self.base_model,
            "status": self.status.value,
            "created_at": self.created_at,
            "gpu_id": self.gpu_id,
            "metrics": self.metrics,
        }

class TrainingQueueManager:
    """In-memory FIFO scheduler mapping queued jobs onto a fixed GPU pool.

    One job per GPU; `max_concurrent` sets the pool size. Not thread-safe —
    suitable for a single-process control loop or demo.
    """

    def __init__(self, max_concurrent=4):
        self.queue: List[TrainingJob] = []          # FIFO of jobs waiting for a GPU
        self.running: Dict[str, TrainingJob] = {}   # job_id -> running job
        self.completed: List[TrainingJob] = []
        self.max_concurrent = max_concurrent
        # gpu id -> currently assigned job (None = free)
        self.gpu_pool = {f"gpu-{i}": None for i in range(max_concurrent)}

    def submit_job(self, user_id, base_model, dataset_path, config=None):
        """Submit a new fine-tuning job and try to schedule it immediately.

        Returns the job's serialized dict (see TrainingJob.to_dict).
        """
        if config is None:
            # Sensible QLoRA defaults (mirrors lora_finetune.py).
            config = {
                "lora_r": 16,
                "lora_alpha": 32,
                "epochs": 3,
                "batch_size": 4,
                "learning_rate": 2e-4,
                "max_seq_length": 2048,
            }

        job = TrainingJob(user_id, base_model, dataset_path, config)
        self.queue.append(job)
        job.status = JobStatus.QUEUED

        logger.info(f"Job {job.job_id} submitted by {user_id}")
        self._try_schedule()

        return job.to_dict()

    def _try_schedule(self):
        """Assign queued jobs (FIFO order) to any free GPUs."""
        for gpu_id, current_job in self.gpu_pool.items():
            if current_job is None and self.queue:
                job = self.queue.pop(0)
                job.status = JobStatus.RUNNING
                # FIX: timezone-aware UTC timestamp (utcnow() is deprecated).
                job.started_at = datetime.now(timezone.utc).isoformat()
                job.gpu_id = gpu_id
                self.gpu_pool[gpu_id] = job
                self.running[job.job_id] = job
                logger.info(f"Job {job.job_id} started on {gpu_id}")

    def complete_job(self, job_id, metrics=None):
        """Mark a running job completed, free its GPU, and backfill from the queue.

        Returns the job dict, or None if `job_id` is not currently running.
        """
        if job_id in self.running:
            job = self.running.pop(job_id)
            job.status = JobStatus.COMPLETED
            job.completed_at = datetime.now(timezone.utc).isoformat()
            job.metrics = metrics or {}
            self.gpu_pool[job.gpu_id] = None
            self.completed.append(job)
            self._try_schedule()  # the freed GPU can pick up the next queued job
            return job.to_dict()
        return None

    def get_status(self):
        """Aggregate queue/GPU counters for dashboards or API responses."""
        return {
            "queued": len(self.queue),
            "running": len(self.running),
            "completed": len(self.completed),
            "available_gpus": sum(1 for v in self.gpu_pool.values() if v is None),
            "total_gpus": len(self.gpu_pool),
        }

# Demo: six submissions against a four-GPU pool (the last two stay queued).
manager = TrainingQueueManager(max_concurrent=4)

for n in range(6):
    submitted = manager.submit_job(
        user_id=f"user_{n}",
        base_model="meta-llama/Meta-Llama-3.1-8B-Instruct",
        dataset_path=f"s3://datasets/user_{n}/train.jsonl",
    )
    print(f"Submitted: {submitted['job_id']} -> {submitted['status']}")

print(f"\nQueue Status: {json.dumps(manager.get_status(), indent=2)}")

Distributed Training Pipeline

Deploy training pipeline บน Kubernetes

# === Distributed Fine-Tuning Pipeline ===

# 1. Kubernetes Job for Training
cat > k8s/training-job.yaml << 'EOF'
apiVersion: batch/v1
kind: Job
metadata:
  # NOTE: `name` and the `job-id` label are placeholders — the submitter
  # fills/appends the per-job id before applying this manifest.
  name: lora-finetune-
  namespace: ml-training
  labels:
    app: lora-finetune
    job-id: ""
spec:
  backoffLimit: 2                # retry a failed trainer pod at most twice
  activeDeadlineSeconds: 86400   # hard 24h cap (matches worker.py's subprocess timeout)
  template:
    metadata:
      labels:
        app: lora-finetune
    spec:
      restartPolicy: OnFailure
      containers:
        - name: trainer
          image: ghcr.io/myorg/lora-trainer:latest
          env:
            # Per-job values, filled by the submitter.
            - name: JOB_ID
              value: ""
            - name: BASE_MODEL
              value: ""
            - name: DATASET_PATH
              value: ""
            # Credentials come from the ml-secrets Secret, never the manifest.
            - name: REDIS_URL
              valueFrom:
                secretKeyRef:
                  name: ml-secrets
                  key: redis-url
            - name: WANDB_API_KEY
              valueFrom:
                secretKeyRef:
                  name: ml-secrets
                  key: wandb-key
          resources:
            limits:
              nvidia.com/gpu: 1
              memory: "32Gi"
              cpu: "8"
            requests:
              nvidia.com/gpu: 1
              memory: "24Gi"
              cpu: "4"
          volumeMounts:
            # Shared HF cache PVC so base-model weights download only once.
            - name: model-cache
              mountPath: /root/.cache/huggingface
            - name: output
              mountPath: /output
            # Enlarged /dev/shm; presumably for DataLoader workers — confirm.
            - name: shm
              mountPath: /dev/shm
      volumes:
        - name: model-cache
          persistentVolumeClaim:
            claimName: hf-model-cache
        - name: output
          persistentVolumeClaim:
            claimName: training-output
        - name: shm
          emptyDir:
            medium: Memory
            sizeLimit: "8Gi"
      tolerations:
        # Allow scheduling onto tainted GPU nodes.
        - key: nvidia.com/gpu
          operator: Exists
          effect: NoSchedule
EOF

# 2. Queue Worker (processes training requests)
cat > worker.py << 'PYEOF'
#!/usr/bin/env python3
"""Queue worker: pops fine-tuning jobs from Redis and runs them as subprocesses."""
import redis
import json
import subprocess
import os

r = redis.Redis.from_url(os.environ["REDIS_URL"])

print("Training worker started...")
while True:
    # Blocking right-pop; returns None after 30s idle so the loop stays live.
    job_data = r.brpop("training:queue", timeout=30)
    if job_data is None:
        continue

    job = json.loads(job_data[1])  # brpop returns a (key, value) pair
    job_id = job["job_id"]

    r.hset(f"training:job:{job_id}", "status", "running")

    try:
        # Run training in a child process; hard 24h cap (86400s), same value
        # as the k8s Job's activeDeadlineSeconds.
        result = subprocess.run([
            "python3", "lora_finetune.py",
            "--model", job["base_model"],
            "--dataset", job["dataset_path"],
            "--output", f"/output/{job_id}",
            "--epochs", str(job.get("epochs", 3)),
            "--lora-r", str(job.get("lora_r", 16)),
        ], capture_output=True, text=True, timeout=86400)

        if result.returncode == 0:
            r.hset(f"training:job:{job_id}", mapping={
                "status": "completed",
                "output_path": f"/output/{job_id}",
            })
        else:
            # Store only the first 1000 chars of stderr to bound hash size.
            r.hset(f"training:job:{job_id}", mapping={
                "status": "failed",
                "error": result.stderr[:1000],
            })
    except Exception as e:
        # Broad catch: covers subprocess.TimeoutExpired plus Redis/OS errors,
        # so one bad job cannot kill the worker loop.
        r.hset(f"training:job:{job_id}", mapping={
            "status": "failed",
            "error": str(e),
        })

    # Notify any subscriber (e.g. the API layer) that this job finished.
    r.publish(f"training:done:{job_id}", "done")
PYEOF

# 3. API Endpoint
cat > api.py << 'PYEOF'
from fastapi import FastAPI
import redis
import json
import uuid

app = FastAPI(title="Fine-Tuning API")
# NOTE(review): Redis URL is hard-coded here but env-driven in worker.py —
# consider reading REDIS_URL from the environment for consistency.
r = redis.Redis.from_url("redis://redis:6379")

@app.post("/finetune")
async def submit_finetune(base_model: str, dataset_path: str, epochs: int = 3):
    """Enqueue a fine-tuning job and return its 8-char id immediately (async pattern)."""
    job_id = str(uuid.uuid4())[:8]
    job = {"job_id": job_id, "base_model": base_model, "dataset_path": dataset_path, "epochs": epochs}
    # lpush here + brpop in worker.py => FIFO ordering.
    r.lpush("training:queue", json.dumps(job))
    r.hset(f"training:job:{job_id}", "status", "queued")
    return {"job_id": job_id, "status": "queued"}

@app.get("/finetune/{job_id}")
async def get_status(job_id: str):
    """Return the job's status hash; an unknown job_id yields an empty object."""
    data = r.hgetall(f"training:job:{job_id}")
    return {k.decode(): v.decode() for k, v in data.items()}
PYEOF

echo "Pipeline deployed"

Monitoring ????????? Evaluation

Monitor training และ evaluate model

#!/usr/bin/env python3
# training_monitor.py — Training Monitoring & Evaluation
import json
import logging
from typing import Dict, List

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("monitor")

class TrainingMonitor:
    """Reference tables and cost estimation for LoRA fine-tuning runs."""

    def __init__(self):
        # experiment name -> metrics; reserved for future tracking (unused here)
        self.experiments = {}

    def key_metrics(self):
        """Return the metric checklist grouped by training / evaluation / resources."""
        return {
            "training_metrics": {
                "train_loss": "Should decrease steadily",
                "eval_loss": "Should decrease, watch for overfitting (diverges from train)",
                "learning_rate": "Follows scheduler (cosine decay typical)",
                "grad_norm": "Should stay < 1.0 with gradient clipping",
                "tokens_per_second": "Throughput metric",
            },
            "evaluation_metrics": {
                "perplexity": "Lower is better, measures prediction confidence",
                "bleu_score": "For translation tasks (0-100)",
                "rouge_score": "For summarization tasks",
                "exact_match": "For QA tasks",
                "human_eval": "Human preference rating (most important)",
            },
            "resource_metrics": {
                "gpu_utilization": "Should be > 90%",
                "gpu_memory": "Monitor for OOM",
                "training_time": "Track per epoch",
                "cost": "GPU hours * cost per hour",
            },
        }

    def lora_hyperparameter_guide(self):
        """Return rule-of-thumb guidance for LoRA rank, alpha, targets, and LR."""
        return {
            "rank_r": {
                "4": "Minimal adaptation, fast training, small adapter",
                "8": "Light adaptation, good for simple tasks",
                "16": "Balanced (recommended default)",
                "32": "More capacity, for complex tasks",
                "64": "High capacity, approaches full fine-tuning",
                "128": "Very high, usually diminishing returns",
            },
            "lora_alpha": {
                "rule": "Typically 2x rank (alpha=32 for r=16)",
                "effect": "Higher alpha = stronger LoRA effect",
            },
            "target_modules": {
                "minimal": ["q_proj", "v_proj"],
                "recommended": ["q_proj", "k_proj", "v_proj", "o_proj"],
                "full": ["q_proj", "k_proj", "v_proj", "o_proj", "gate_proj", "up_proj", "down_proj"],
            },
            "learning_rate": {
                "qlora_4bit": "2e-4 (higher because quantized)",
                "lora_fp16": "1e-4",
                "full_finetune": "2e-5 (much lower)",
            },
        }

    def cost_estimate(self, model_size_b, dataset_size_examples, epochs=3):
        """Roughly estimate GPU requirements, wall time, and dollar cost.

        Args:
            model_size_b: model size in billions of parameters.
            dataset_size_examples: number of training examples.
            epochs: training epochs (default 3).

        Returns a dict of human-readable estimate strings.
        """
        # Rough VRAM needs by model size (QLoRA assumptions); pick nearest tier.
        gpu_memory_needed = {7: 16, 8: 16, 13: 24, 70: 80}
        closest = min(gpu_memory_needed.keys(), key=lambda x: abs(x - model_size_b))
        vram = gpu_memory_needed[closest]

        # Training time estimate (very rough): effective batch 16 (= 4 * 4).
        steps = (dataset_size_examples * epochs) / 16  # batch_size * grad_accum
        time_per_step = 0.5 if model_size_b <= 13 else 2.0  # seconds
        total_hours = (steps * time_per_step) / 3600

        gpu_cost_per_hour = 1.0 if vram <= 24 else 3.5  # A10G vs A100
        total_cost = total_hours * gpu_cost_per_hour

        return {
            "model_size": f"{model_size_b}B",
            "gpu_needed": f"{vram}GB VRAM",
            "estimated_hours": round(total_hours, 1),
            # FIX: original emitted an empty f-string and never used total_cost.
            "estimated_cost": f"${total_cost:.2f}",
            "adapter_size": f"~{model_size_b * 10}MB (LoRA r=16)",
        }

# Demo: print the rank guide plus a sample cost estimate for an 8B model.
monitor = TrainingMonitor()
print("LoRA Rank Guide:")
for r_value, note in monitor.lora_hyperparameter_guide()["rank_r"].items():
    print(f"  r={r_value}: {note}")

estimate = monitor.cost_estimate(model_size_b=8, dataset_size_examples=10000, epochs=3)
print(f"\nCost Estimate: {json.dumps(estimate, indent=2)}")

FAQ คำถามที่พบบ่อย

Q: LoRA ????????? Full Fine-Tuning ???????????????????????????????????????????

A: Full Fine-Tuning ???????????? weights ?????????????????????????????? model ????????????????????? GPU memory ????????? (8B model ???????????? ~80GB) ????????????????????????????????????????????? ?????????????????? ?????????????????????????????? ?????????????????? catastrophic forgetting LoRA ??????????????? low-rank matrices ???????????????????????????????????? attention layers trainable parameters ????????? 0.1-1% ?????????????????????????????? ????????? GPU memory ???????????????????????? 10 ???????????? (8B QLoRA ?????????????????? 16GB) ???????????????????????????????????????????????? full fine-tuning (90-95%) adapter ???????????????????????? (~50-200MB) switch ????????????????????? adapters ????????????????????? ?????????????????? 90% ????????? use cases LoRA ????????????????????????????????????????????????????????????????????????

Q: QLoRA ????????????????????? ????????????????????? LoRA ??????????????????????

A: QLoRA ????????? Quantized LoRA ???????????? base model ???????????? 4-bit quantization (NF4) ??????????????????????????? LoRA adapters ????????????????????? FP16/BF16 ??????????????? ?????? GPU memory ????????? 2-4 ???????????????????????????????????? LoRA ???????????? fine-tune model 7-8B ??????????????? GPU 16GB ??????????????? fine-tune model 70B ??????????????? GPU 80GB (A100) ????????????????????????????????????????????? LoRA FP16 ????????????????????? training ????????????????????? LoRA FP16 ???????????????????????? (~10-20%) ??????????????? quantize/dequantize overhead ?????????????????? GPU memory ??????????????? QLoRA ?????????????????????????????????????????????????????????????????????

Q: ?????????????????? data ???????????????????????????????????????????????? fine-tuning?

A: ????????????????????? task complexity Simple task (classification, formatting) 100-500 examples ????????????????????? Moderate task (domain QA, style transfer) 1,000-5,000 examples Complex task (code generation, reasoning) 10,000-50,000 examples ??????????????????????????????????????????????????????????????? data 500 examples ????????????????????????????????? ?????????????????? 10,000 examples ??????????????????????????? tips data ???????????? diverse ???????????????????????? edge cases, ?????? duplicate, ????????????????????? label quality, ????????? synthetic data ???????????????????????? (????????? GPT-4 ???????????????) ???????????????????????? 500-1,000 examples ???????????? evaluate ?????????????????????????????????????????????????????????

Q: Message Queue ?????????????????????????????? fine-tuning pipeline?

A: Message Queue (Redis, RabbitMQ, Kafka) ??????????????????????????????????????????????????? Job Scheduling ???????????????????????? training jobs ??????????????????????????? ????????????????????? manual allocation, Resource Management allocate GPU ????????? job ??????????????????????????? ??????????????????????????? GPU, Fault Tolerance ????????? training fail re-queue ???????????????????????????, Scalability ??????????????? GPU workers ????????????????????? ???????????????????????? consumers, Async Processing submit job ??????????????????????????????????????? ????????? notification ??????????????????????????????, Multi-tenancy ???????????? users/teams share GPU pool ????????? ?????????????????? ML platform ???????????????????????????????????? fine-tuning message queue ???????????? component ???????????????

📖 บทความที่เกี่ยวข้อง

LLM Fine-tuning LoRA Real-time Processingอ่านบทความ → LLM Fine-tuning LoRA API Integration เชื่อมต่อระบบอ่านบทความ → LLM Fine-tuning LoRA Domain Driven Design DDDอ่านบทความ → LLM Fine-tuning LoRA GitOps Workflowอ่านบทความ → LLM Fine-tuning LoRA Metric Collectionอ่านบทความ →

📚 ดูบทความทั้งหมด →