
CircleCI Orbs CQRS Event Sourcing — CI/CD for Event-Driven Architecture

2025-12-20 · อ. บอม — SiamCafe.net · 1,509 words

Text Gen WebUI Scheduling

Running Text Generation WebUI (Oobabooga) LLM inference on Kubernetes: GPU Pod scheduling on NVIDIA hardware, serving models such as Llama and Mistral with vLLM or TGI, quantization (GPTQ, AWQ, GGUF), and autoscaling with the HPA and Cluster Autoscaler.

Inference Engine    | Throughput | Latency | GPU Memory | Best for
--------------------|------------|---------|------------|---------------
vLLM                | Very high  | Low     | Medium     | Production API
TGI (HuggingFace)   | High       | Low     | Medium     | Production API
Oobabooga WebUI     | Medium     | Medium  | High       | Dev/Testing
llama.cpp (GGUF)    | Medium     | Medium  | Low        | CPU/Edge

Kubernetes GPU Setup

# === GPU Pod Scheduling ===

# Install NVIDIA GPU Operator
# helm repo add nvidia https://helm.ngc.nvidia.com/nvidia
# helm install gpu-operator nvidia/gpu-operator \
#   --namespace gpu-operator --create-namespace

# Pod with GPU — Text Generation WebUI
# apiVersion: apps/v1
# kind: Deployment
# metadata:
#   name: text-gen-webui
# spec:
#   replicas: 2
#   selector:
#     matchLabels:
#       app: text-gen-webui
#   template:
#     metadata:
#       labels:
#         app: text-gen-webui
#     spec:
#       nodeSelector:
#         accelerator: nvidia-a100
#       tolerations:
#         - key: nvidia.com/gpu
#           operator: Exists
#           effect: NoSchedule
#       containers:
#         - name: webui
#           image: atinoda/text-generation-webui:latest
#           resources:
#             requests:
#               cpu: "4"
#               memory: "16Gi"
#               nvidia.com/gpu: "1"
#             limits:
#               cpu: "8"
#               memory: "32Gi"
#               nvidia.com/gpu: "1"
#           ports:
#             - containerPort: 7860
#           volumeMounts:
#             - name: models
#               mountPath: /app/models
#       volumes:
#         - name: models
#           persistentVolumeClaim:
#             claimName: llm-models-pvc

# Pod Anti-affinity — spread GPU Pods so they don't land on the same Node
# affinity:
#   podAntiAffinity:
#     preferredDuringSchedulingIgnoredDuringExecution:
#       - weight: 100
#         podAffinityTerm:
#           labelSelector:
#             matchLabels:
#               app: text-gen-webui
#           topologyKey: kubernetes.io/hostname

from dataclasses import dataclass

@dataclass
class GPUNode:
    name: str
    gpu_type: str
    gpu_count: int
    vram_gb: int
    pods_running: int
    gpu_util_pct: float
    status: str

nodes = [
    GPUNode("gpu-node-01", "A100 80GB", 4, 320, 3, 72, "Ready"),
    GPUNode("gpu-node-02", "A100 80GB", 4, 320, 4, 85, "Ready"),
    GPUNode("gpu-node-03", "A10G 24GB", 4, 96, 2, 55, "Ready"),
    GPUNode("gpu-node-04", "T4 16GB", 4, 64, 4, 90, "Ready"),
    GPUNode("gpu-node-05", "A100 80GB", 4, 320, 0, 0, "Scaling Up"),
]

print("=== GPU Nodes ===")
total_gpus = 0
total_used = 0
for n in nodes:
    total_gpus += n.gpu_count
    total_used += n.pods_running
    print(f"  [{n.status}] {n.name}")
    print(f"    GPU: {n.gpu_type} x{n.gpu_count} | VRAM: {n.vram_gb}GB")
    print(f"    Pods: {n.pods_running} | Util: {n.gpu_util_pct}%")
print(f"\n  Total GPUs: {total_gpus} | Used: {total_used}")
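The inventory above is what a scheduler reasons over. As a rough illustration — not the real kube-scheduler, which filters and scores nodes across many plugins — a toy "filter then score" pass can be sketched like this; the `Node` class and figures are hypothetical:

```python
from dataclasses import dataclass

@dataclass
class Node:
    name: str
    free_gpus: int     # gpu_count - pods_running (assuming 1 GPU per pod)
    util_pct: float
    ready: bool

def pick_node(nodes, gpus_needed=1):
    """Filter: Ready nodes with enough free GPUs. Score: least GPU utilization."""
    candidates = [n for n in nodes if n.ready and n.free_gpus >= gpus_needed]
    return min(candidates, key=lambda n: n.util_pct, default=None)

nodes = [
    Node("gpu-node-01", free_gpus=1, util_pct=72, ready=True),
    Node("gpu-node-02", free_gpus=0, util_pct=85, ready=True),
    Node("gpu-node-03", free_gpus=2, util_pct=55, ready=True),
    Node("gpu-node-05", free_gpus=4, util_pct=0, ready=False),  # still scaling up
]
print(pick_node(nodes).name)  # gpu-node-03
```

A fully packed node (no free GPUs) or a node that is not Ready never makes it past the filter step, which is why the idle-but-scaling node loses to a busier Ready one.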

Inference Optimization

# === LLM Inference Optimization ===

# vLLM — High-throughput Serving
# pip install vllm
# python -m vllm.entrypoints.openai.api_server \
#   --model mistralai/Mistral-7B-Instruct-v0.2 \
#   --gpu-memory-utilization 0.9 \
#   --max-model-len 8192 \
#   --tensor-parallel-size 1

# Docker — vLLM Server
# docker run --gpus all -p 8000:8000 \
#   vllm/vllm-openai:latest \
#   --model TheBloke/Mistral-7B-Instruct-v0.2-GPTQ \
#   --quantization gptq \
#   --max-model-len 4096

# Quantization Comparison
# Model: Llama-2-13B
# FP16:    26GB VRAM, 100% quality
# GPTQ-4:  8GB VRAM,  97% quality, 2x faster
# AWQ-4:   8GB VRAM,  97% quality, 2x faster
# GGUF-Q4: 8GB VRAM,  95% quality, CPU possible
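The figures above follow a simple rule of thumb — weight memory is roughly parameter count times bits per weight — which can be sketched as follows; the 20% default overhead for KV cache and activations is an assumption, not a measured value:

```python
def estimate_vram_gb(params_billion: float, bits: int, overhead_pct: float = 0.2) -> float:
    """Rough VRAM for model weights plus KV-cache/activation overhead."""
    weights_gb = params_billion * bits / 8  # 1B params at 8 bits ≈ 1 GB
    return round(weights_gb * (1 + overhead_pct), 1)

print(estimate_vram_gb(13, 16, 0))  # 26.0 — matches the FP16 figure for Llama-2-13B
print(estimate_vram_gb(13, 4, 0))   # 6.5 — 4-bit GPTQ/AWQ weights before overhead
```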

@dataclass
class ModelConfig:
    model: str
    quantization: str
    vram_gb: float
    tokens_per_sec: int
    quality_pct: float
    max_context: int

configs = [
    ModelConfig("Llama-2-70B", "FP16", 140, 25, 100, 4096),
    ModelConfig("Llama-2-70B", "GPTQ-4bit", 40, 45, 97, 4096),
    ModelConfig("Mistral-7B", "FP16", 14, 80, 100, 32768),
    ModelConfig("Mistral-7B", "GPTQ-4bit", 5, 120, 97, 32768),
    ModelConfig("Mistral-7B", "GGUF-Q4", 5, 40, 95, 32768),
    ModelConfig("Phi-3-mini", "FP16", 8, 100, 100, 128000),
]

print("\n=== Model Configurations ===")
for c in configs:
    print(f"  [{c.model}] {c.quantization}")
    print(f"    VRAM: {c.vram_gb}GB | Speed: {c.tokens_per_sec} tok/s | Quality: {c.quality_pct}%")

Autoscaling

# === HPA + Cluster Autoscaler ===

# HPA — Scale on GPU Utilization
# apiVersion: autoscaling/v2
# kind: HorizontalPodAutoscaler
# metadata:
#   name: llm-hpa
# spec:
#   scaleTargetRef:
#     apiVersion: apps/v1
#     kind: Deployment
#     name: text-gen-webui
#   minReplicas: 2
#   maxReplicas: 10
#   metrics:
#     - type: Pods
#       pods:
#         metric:
#           name: gpu_utilization
#         target:
#           type: AverageValue
#           averageValue: "70"
#     - type: Pods
#       pods:
#         metric:
#           name: request_queue_length
#         target:
#           type: AverageValue
#           averageValue: "5"
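For each metric in the HPA above, Kubernetes applies the standard scaling rule desiredReplicas = ceil(currentReplicas × currentMetric / targetMetric) and takes the larger result. A sketch of that rule (the replica counts here are illustrative):

```python
import math

def desired_replicas(current: int, metric_avg: float, target: float,
                     min_r: int = 2, max_r: int = 10) -> int:
    """HPA rule: desired = ceil(current * currentMetric / targetMetric), clamped."""
    desired = math.ceil(current * metric_avg / target)
    return max(min_r, min(max_r, desired))

# GPU utilization averaging 90 against a target of 70 scales 6 pods to 8:
print(desired_replicas(6, 90, 70))  # 8
```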

# Karpenter — GPU Node Provisioner
# apiVersion: karpenter.sh/v1alpha5
# kind: Provisioner
# metadata:
#   name: gpu-provisioner
# spec:
#   requirements:
#     - key: node.kubernetes.io/instance-type
#       operator: In
#       values: ["p3.2xlarge", "g5.xlarge", "g5.2xlarge"]
#     - key: nvidia.com/gpu
#       operator: Exists
#   limits:
#     resources:
#       nvidia.com/gpu: "20"
#   ttlSecondsAfterEmpty: 300

scaling_metrics = {
    "Active Pods": "6 / 10 max",
    "GPU Utilization (avg)": "72%",
    "Request Queue": "3 pending",
    "Tokens/sec (total)": "480",
    "Concurrent Users": "25",
    "Avg Latency (TTFT)": "450ms",
    "GPU Nodes": "4 active, 1 scaling",
    "Monthly GPU Cost": "$8,500",
}

print("Autoscaling Dashboard:")
for k, v in scaling_metrics.items():
    print(f"  {k}: {v}")

tips = [
    "Quantization: use GPTQ/AWQ to cut VRAM by 50-75%",
    "vLLM: PagedAttention raises throughput 3-5x",
    "Batching: continuous batching merges requests",
    "Karpenter: auto-provision GPU Nodes on demand",
    "Spot Instances: use Spot for non-critical inference",
    "Model Cache: keep models on a PVC to avoid re-downloading",
    "DCGM: monitor GPU health, temperature, and memory",
]

print(f"\n\nOptimization Tips:")
for i, t in enumerate(tips, 1):
    print(f"  {i}. {t}")

FAQ

What is Text Generation WebUI?

Text Generation WebUI (Oobabooga) is an open-source web interface for running Large Language Models (LLMs) on your own machine. It supports many models (Llama, Mistral, Phi) and formats (GGUF, GPTQ, AWQ), offers a Chat mode, a Notebook mode, and an API server, and runs on NVIDIA or AMD GPUs as well as CPU. It works as a private ChatGPT alternative — no data leaves your machine.

What is Pod Scheduling on Kubernetes?

Pod Scheduling is the process by which Kubernetes picks a suitable Node for each Pod. Resource requests and limits declare CPU, memory, and GPU needs; a Node Selector targets Nodes that have GPUs; affinity keeps Pods close together while anti-affinity spreads them across Nodes; and taints with tolerations reserve Nodes for specific workloads.

How do you manage GPUs on Kubernetes?

Install the NVIDIA GPU Operator or Device Plugin and request GPUs via nvidia.com/gpu. Keep GPU Nodes in a separate pool and taint them so non-GPU Pods stay off. Share GPUs with MIG (Multi-Instance GPU) or time-slicing, add GPU Nodes automatically with Karpenter or the Cluster Autoscaler, and monitor GPU utilization with the DCGM Exporter.

How do you scale LLM Inference?

Drive a Horizontal Pod Autoscaler (HPA) from GPU utilization or request queue length. Serve with vLLM or TGI for high throughput, batch requests to raise throughput, optimize the KV cache to save memory, quantize with GPTQ/AWQ/GGUF to cut GPU memory by 50-75%, and use multi-GPU tensor parallelism for large models.

Summary

Production LLM inference combines the pieces above: Text Generation WebUI for a private interface, Kubernetes Pod scheduling on NVIDIA GPU Nodes, vLLM or TGI for serving, GPTQ/AWQ quantization to fit memory budgets, and HPA plus Karpenter for autoscaling.

# === Event Store SQL ===

# 1. Database schema
cat > migrations/001_event_store.sql << 'SQL'
-- Event Store Schema
CREATE TABLE IF NOT EXISTS events (
    event_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_id VARCHAR(255) NOT NULL,
    aggregate_type VARCHAR(255) NOT NULL,
    event_type VARCHAR(255) NOT NULL,
    event_data JSONB NOT NULL,
    metadata JSONB DEFAULT '{}',
    version INTEGER NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    CONSTRAINT unique_aggregate_version UNIQUE (aggregate_id, version)
);

CREATE INDEX idx_events_aggregate ON events(aggregate_id);
CREATE INDEX idx_events_type ON events(event_type);
CREATE INDEX idx_events_created ON events(created_at);

-- Snapshots (optimization for long event streams)
CREATE TABLE IF NOT EXISTS snapshots (
    aggregate_id VARCHAR(255) PRIMARY KEY,
    aggregate_type VARCHAR(255) NOT NULL,
    version INTEGER NOT NULL,
    state JSONB NOT NULL,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

-- Projections tracking
CREATE TABLE IF NOT EXISTS projection_positions (
    projection_name VARCHAR(255) PRIMARY KEY,
    last_event_id UUID,
    last_position BIGINT DEFAULT 0,
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

-- Read Model: Orders (projection)
CREATE TABLE IF NOT EXISTS read_orders (
    order_id VARCHAR(255) PRIMARY KEY,
    customer_id VARCHAR(255),
    status VARCHAR(50),
    items JSONB,
    total DECIMAL(10,2),
    created_at TIMESTAMPTZ,
    updated_at TIMESTAMPTZ
);

CREATE INDEX idx_read_orders_customer ON read_orders(customer_id);
CREATE INDEX idx_read_orders_status ON read_orders(status);

-- Read Model: Order Statistics (projection)
CREATE TABLE IF NOT EXISTS read_order_stats (
    date DATE PRIMARY KEY,
    total_orders INTEGER DEFAULT 0,
    total_revenue DECIMAL(12,2) DEFAULT 0,
    avg_order_value DECIMAL(10,2) DEFAULT 0,
    orders_by_status JSONB DEFAULT '{}'
);
SQL

# 2. Projection rebuilder
cat > rebuild_projections.py << 'PYTHON'
#!/usr/bin/env python3
"""Rebuild projections from event store"""
import json
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("rebuild")

class ProjectionRebuilder:
    def __init__(self, db_url):
        self.db_url = db_url

    def rebuild(self, projection_name):
        """Rebuild a specific projection"""
        logger.info(f"Rebuilding projection: {projection_name}")
        steps = [
            f"TRUNCATE TABLE read_{projection_name}",
            f"SELECT * FROM events WHERE aggregate_type = '{projection_name}' ORDER BY created_at",
            f"Process each event through projection handler",
            f"UPDATE projection_positions SET last_position = (SELECT MAX(rownum) FROM events)",
        ]
        for i, step in enumerate(steps, 1):
            logger.info(f"  Step {i}: {step}")
        return {"projection": projection_name, "status": "rebuilt", "events_processed": 1500}

rebuilder = ProjectionRebuilder("postgresql://localhost:5432/eventstore")
result = rebuilder.rebuild("orders")
print(f"Rebuild result: {json.dumps(result)}")
PYTHON

echo "Event Store configured"
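The unique (aggregate_id, version) constraint in the schema above is what gives the store optimistic concurrency control: two writers racing to append the same version collide, and one must retry. A minimal in-memory sketch of that behavior (illustrative, not production code):

```python
class ConcurrencyError(Exception):
    pass

class InMemoryEventStore:
    """Append-only streams keyed by aggregate_id; the expected-version check
    plays the role of the unique (aggregate_id, version) constraint."""
    def __init__(self):
        self._streams = {}

    def append(self, aggregate_id, event_type, data, expected_version):
        stream = self._streams.setdefault(aggregate_id, [])
        if len(stream) != expected_version:  # someone else appended first
            raise ConcurrencyError(
                f"expected v{expected_version}, stream is at v{len(stream)}")
        stream.append((expected_version + 1, event_type, data))

    def load(self, aggregate_id):
        return list(self._streams.get(aggregate_id, []))

store = InMemoryEventStore()
store.append("order-1", "OrderCreated", {"total": 1500}, expected_version=0)
store.append("order-1", "OrderPaid", {}, expected_version=1)
print(len(store.load("order-1")))  # 2
```

Appending again with expected_version=0 would raise ConcurrencyError, exactly as the SQL constraint would reject a duplicate (aggregate_id, version) row.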

Testing Strategy

How to test CQRS Event Sourcing applications

#!/usr/bin/env python3
# test_cqrs.py — Testing CQRS/ES Applications
import json
import logging
from typing import Dict, List

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("test")

class CQRSTestStrategy:
    def __init__(self):
        pass
    
    def test_categories(self):
        return {
            "command_handler_tests": {
                "description": "Test command handlers",
                "pattern": "Given events → When command → Then new events",
                "example": """
def test_create_order():
    # Given: no previous events
    store = InMemoryEventStore()
    
    # When: create order command
    handler = CreateOrderHandler(store)
    handler.handle(CreateOrderCommand(
        customer_id="C001",
        items=[{"product": "Widget", "qty": 2}],
        total=1500
    ))
    
    # Then: OrderCreated event emitted
    events = store.get_events("C001")
    assert len(events) == 1
    assert events[0].event_type == "OrderCreated"
    assert events[0].data["total"] == 1500
""",
            },
            "event_handler_tests": {
                "description": "Test event handlers (projections)",
                "pattern": "Given event → When processed → Then read model updated",
            },
            "aggregate_tests": {
                "description": "Test aggregate business rules",
                "pattern": "Given state → When action → Then state change or error",
            },
            "integration_tests": {
                "description": "Test the end-to-end flow",
                "pattern": "Send command → Verify events → Verify read model",
            },
            "projection_rebuild_tests": {
                "description": "Verify that rebuilding a projection produces the same results",
                "pattern": "Build projection → Rebuild → Compare results",
            },
        }

strategy = CQRSTestStrategy()
tests = strategy.test_categories()
print("CQRS/ES Testing Strategy:")
for name, info in tests.items():
    print(f"\n  {name}:")
    print(f"    {info['description']}")
    print(f"    Pattern: {info['pattern']}")
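The command-handler example embedded as a string above can be made runnable with small stand-in classes; `InMemoryEventStore`, `CreateOrderCommand`, and `CreateOrderHandler` here are illustrative sketches, not a real framework:

```python
from dataclasses import dataclass

@dataclass
class Event:
    event_type: str
    data: dict

class InMemoryEventStore:
    def __init__(self):
        self.streams = {}
    def append(self, stream_id, event):
        self.streams.setdefault(stream_id, []).append(event)
    def get_events(self, stream_id):
        return self.streams.get(stream_id, [])

@dataclass
class CreateOrderCommand:
    customer_id: str
    items: list
    total: int

class CreateOrderHandler:
    def __init__(self, store):
        self.store = store
    def handle(self, cmd):
        # A real handler would load the aggregate and enforce invariants first.
        self.store.append(cmd.customer_id,
                          Event("OrderCreated", {"items": cmd.items, "total": cmd.total}))

def test_create_order():
    store = InMemoryEventStore()                      # Given: no previous events
    CreateOrderHandler(store).handle(CreateOrderCommand(
        "C001", [{"product": "Widget", "qty": 2}], 1500))  # When: command handled
    events = store.get_events("C001")                 # Then: OrderCreated emitted
    assert len(events) == 1
    assert events[0].event_type == "OrderCreated"
    assert events[0].data["total"] == 1500

test_create_order()
print("test_create_order passed")
```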

Monitoring and Debugging

How to monitor and debug event-sourced applications

#!/usr/bin/env python3
# es_monitor.py — Event Sourcing Monitor
import json
import logging
from typing import Dict, List

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("monitor")

class ESMonitor:
    def __init__(self):
        pass
    
    def dashboard(self):
        return {
            "event_store": {
                "total_events": 1250000,
                "events_today": 8500,
                "events_per_second": 12,
                "avg_event_size_bytes": 450,
                "storage_used_gb": 2.1,
            },
            "projections": {
                "orders": {"status": "running", "lag": "0 events", "last_updated": "2s ago"},
                "inventory": {"status": "running", "lag": "3 events", "last_updated": "5s ago"},
                "analytics": {"status": "rebuilding", "lag": "15000 events", "progress": "85%"},
            },
            "command_processing": {
                "total_commands_1h": 3200,
                "success_rate": "99.8%",
                "avg_latency_ms": 25,
                "p99_latency_ms": 120,
            },
            "query_processing": {
                "total_queries_1h": 45000,
                "success_rate": "99.99%",
                "avg_latency_ms": 5,
                "p99_latency_ms": 30,
                "cache_hit_rate": "85%",
            },
            "alerts": [
                {"severity": "WARNING", "message": "Analytics projection rebuilding (85% complete, ETA 5 min)"},
                {"severity": "INFO", "message": "Event store size approaching 3GB threshold"},
            ],
        }

monitor = ESMonitor()
dash = monitor.dashboard()
es = dash["event_store"]
print(f"Event Store: {es['total_events']:,} events, {es['events_per_second']} eps, {es['storage_used_gb']}GB")

print(f"\nProjections:")
for name, info in dash["projections"].items():
    print(f"  {name}: {info['status']} (lag: {info['lag']})")

cmd = dash["command_processing"]
qry = dash["query_processing"]
print(f"\nCommands: {cmd['total_commands_1h']:,}/h, {cmd['avg_latency_ms']}ms avg, {cmd['success_rate']} success")
print(f"Queries: {qry['total_queries_1h']:,}/h, {qry['avg_latency_ms']}ms avg, Cache hit: {qry['cache_hit_rate']}")

for a in dash["alerts"]:
    print(f"\n[{a['severity']}] {a['message']}")

FAQ — Frequently Asked Questions

Q: How are CQRS and Event Sourcing different, and do you need both?

A: You can use CQRS (split read/write models) without Event Sourcing — for example, write to PostgreSQL and sync to Elasticsearch for reads. Event Sourcing without CQRS is possible but impractical (every read would have to replay the event store). In short, CQRS is for optimizing read performance, while Event Sourcing gives a complete audit trail plus the ability to rebuild read models. Reach for them when an audit trail is mandatory (finance, healthcare), read and write patterns differ sharply, reads and writes must scale independently, or the business logic is complex (aggregate patterns). Skip them when simple CRUD is enough, the team is new to the patterns (the learning curve is steep), or no audit trail is needed.

Q: How do CircleCI Orbs help with CQRS/ES deployment?

A: CQRS/ES applications carry more deployment complexity than CRUD apps: you must migrate the event store schema, deploy the command service and query service separately, rebuild projections when a schema changes, and blue-green deploy to avoid downtime. CircleCI Orbs help by packaging those deployment steps as reusable components: every project calls the same orb, which keeps deployments consistent; orbs are versioned alongside the application code; and they can be shared across teams. For CQRS/ES you could build an orb exposing commands such as setup-eventstore, test-event-handlers, rebuild-projections, deploy-command-service, and deploy-query-service — a couple of lines in config.yml instead of repeating the steps everywhere.

Q: Does the Event Store grow forever, and how do you manage its size?

A: The Event Store only appends events (append-only) and never deletes them, so it grows continuously. Mitigations: Snapshots — save aggregate state every N events (e.g. every 100); loading then reads the latest snapshot plus only the events after it instead of replaying everything. Archiving — move old events (older than about a year) to cold storage (S3, GCS) and keep only recent events in the primary store. Partitioning — split the table by date range. Compression — PostgreSQL's TOAST compression helps with JSONB. For sizing, figure roughly 200-1000 bytes per event, so a million events is about 200MB-1GB; growth usually only becomes a real concern past 10M+ events. And if an event turns out to be wrong, Event Sourcing never edits it in place — you append a CompensatingEvent instead.
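The snapshot optimization mentioned above — load the latest snapshot, then replay only newer events — can be sketched as follows; the merge-dicts `apply` function and the sample data are toy stand-ins for real aggregate logic:

```python
def load_aggregate(snapshot, events, apply):
    """Start from the snapshot (if any) and replay only events newer than it."""
    state, version = (snapshot["state"], snapshot["version"]) if snapshot else ({}, 0)
    for v, event in events:
        if v > version:            # skip events already folded into the snapshot
            state = apply(state, event)
            version = v
    return state, version

apply = lambda s, e: {**s, **e}    # toy event application: merge dicts
snapshot = {"version": 100, "state": {"total": 42}}
events = [(100, {"total": 42}), (101, {"status": "paid"})]
print(load_aggregate(snapshot, events, apply))  # ({'total': 42, 'status': 'paid'}, 101)
```

With a snapshot every 100 events, a 10,000-event stream needs at most 99 events replayed instead of all 10,000.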

Q: How do you handle Eventual Consistency?

A: CQRS/ES is eventually consistent between the write model and the read model (projections). Typical symptom: a user creates an order, refreshes, and does not see it yet because the projection has not processed the event. Techniques: Read-your-writes consistency — after a write, render the page from the command response instead of the read model; Polling — the UI polls the read model until the row appears (retry with backoff); WebSocket — push an update to the client when the projection is updated; Optimistic UI — show the result immediately (assume success) and roll back on failure; Causal consistency — return a version number from the write and have reads check the read model has caught up to it. In practice, projection lag under 100ms is rarely noticeable.
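The polling-with-backoff technique described above can be sketched as a small helper; the `read_model` dict and version numbers here are illustrative:

```python
import time

def wait_for_projection(read_model, key, min_version, retries=5, base_delay=0.05):
    """Retry the read model until it has caught up to the version the command
    returned, backing off exponentially between attempts."""
    for attempt in range(retries):
        row = read_model.get(key)
        if row and row["version"] >= min_version:
            return row
        time.sleep(base_delay * 2 ** attempt)  # 50ms, 100ms, 200ms, ...
    raise TimeoutError(f"projection lag: {key} not at v{min_version} yet")

read_model = {"order-1": {"version": 3, "status": "paid"}}
print(wait_for_projection(read_model, "order-1", min_version=3)["status"])  # paid
```

The version check is the causal-consistency idea from the answer above: the command returns the version it wrote, and the reader refuses to serve stale rows below it.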
