
Whisper Speech Message Queue Design

2026-05-14 · อ. บอม, SiamCafe.net · 1,593 words

What Is Whisper Speech Message Queue Design?

Whisper is an open source speech recognition model from OpenAI that supports roughly 99 languages, including Thai. It transcribes audio to text and can translate speech into English. A message queue is middleware for asynchronous communication between services; examples include RabbitMQ, Apache Kafka, and Redis Streams. Combining the two yields a scalable speech-to-text system that handles many concurrent requests, which suits call center transcription, meeting notes, podcast indexing, and accessibility services.

Whisper Model Overview

# whisper_overview.py — Whisper model fundamentals
import json

class WhisperOverview:
    # "speed" is throughput relative to the large model (as listed in the
    # Whisper README), not a multiple of real time; actual speed depends on hardware.
    MODELS = {
        "tiny": {"params": "39M", "vram": "~1GB", "speed": "~32x", "accuracy": "low"},
        "base": {"params": "74M", "vram": "~1GB", "speed": "~16x", "accuracy": "moderate"},
        "small": {"params": "244M", "vram": "~2GB", "speed": "~6x", "accuracy": "good"},
        "medium": {"params": "769M", "vram": "~5GB", "speed": "~2x", "accuracy": "very good"},
        "large-v3": {"params": "1.55B", "vram": "~10GB", "speed": "1x", "accuracy": "best"},
    }

    FEATURES = {
        "transcription": "Speech to text (roughly 99 languages)",
        "translation": "Translate speech into English",
        "language_detect": "Automatic language detection",
        "timestamps": "Word-level and segment-level timestamps",
        "vad": "Voice Activity Detection (finds speech regions; vad_filter in faster-whisper)",
    }

    SETUP = """
# pip install openai-whisper
# pip install faster-whisper  (CTranslate2 backend, up to ~4x faster)

import whisper

# Standard Whisper
model = whisper.load_model("medium")
result = model.transcribe("audio.mp3", language="th")
print(result["text"])

# Faster Whisper (recommended for production)
from faster_whisper import WhisperModel

model = WhisperModel("medium", device="cuda", compute_type="float16")
segments, info = model.transcribe("audio.mp3", language="th")

for segment in segments:
    print(f"[{segment.start:.1f}s - {segment.end:.1f}s] {segment.text}")
"""

    def show_models(self):
        print("=== Whisper Models ===\n")
        print(f"  {'Model':<12} {'Params':>8} {'VRAM':>8} {'Speed':>15} {'Accuracy'}")
        print(f"  {'-'*55}")
        for name, info in self.MODELS.items():
            print(f"  {name:<12} {info['params']:>8} {info['vram']:>8} {info['speed']:>15} {info['accuracy']}")

    def show_features(self):
        print(f"\n=== Features ===")
        for feat, desc in self.FEATURES.items():
            print(f"  [{feat}] {desc}")

# Named "overview" so the instance does not shadow the "whisper" module
overview = WhisperOverview()
overview.show_models()
overview.show_features()
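
The model table above implies a simple selection rule: pick the most accurate model whose VRAM estimate fits the GPU. The sketch below is one way to express that rule. The `pick_model` name and the 1 GB headroom default are assumptions for illustration, and the VRAM figures are the approximate values from the table, not measurements.

```python
from typing import Optional

# Approximate VRAM needs in GB, copied from the MODELS table above.
VRAM_GB = {"tiny": 1, "base": 1, "small": 2, "medium": 5, "large-v3": 10}

# Ordered from most to least accurate, so the first fit is the best fit.
BY_ACCURACY = ["large-v3", "medium", "small", "base", "tiny"]


def pick_model(available_vram_gb: float, headroom_gb: float = 1.0) -> Optional[str]:
    """Return the most accurate model that fits, or None if nothing fits.

    headroom_gb is an assumed safety margin for audio buffers and the
    CUDA context; tune it for your own hardware.
    """
    budget = available_vram_gb - headroom_gb
    for name in BY_ACCURACY:
        if VRAM_GB[name] <= budget:
            return name
    return None


if __name__ == "__main__":
    for vram in (2, 8, 16):
        print(f"{vram} GB -> {pick_model(vram)}")
```

Quantized compute types (for example INT8 in faster-whisper) lower the real memory footprint, so treat the table as a conservative starting point.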

Message Queue Architecture

# mq_architecture.py — Message queue design for Whisper
import json

class MQArchitecture:
    COMPONENTS = {
        "api_gateway": {
            "name": "API Gateway",
            "role": "Receive audio files from clients → validate → publish to queue",
            "tech": "FastAPI + S3 upload",
        },
        "message_queue": {
            "name": "Message Queue",
            "role": "Buffer between the API and workers: decoupling + load leveling",
            "tech": "RabbitMQ (simple) or Kafka (high throughput)",
        },
        "transcription_workers": {
            "name": "Transcription Workers",
            "role": "Consume messages → download audio → Whisper transcribe → publish result",
            "tech": "Python + faster-whisper + GPU",
        },
        "result_store": {
            "name": "Result Store",
            "role": "Store transcription results: text, timestamps, metadata",
            "tech": "PostgreSQL + Redis cache",
        },
        "notification": {
            "name": "Notification Service",
            "role": "Notify the client when transcription finishes",
            "tech": "WebSocket, webhook, email",
        },
    }

    QUEUE_DESIGN = {
        "exchanges": {
            # A fanout exchange copies every message to every bound queue, so it
            # cannot pick one priority queue; a direct exchange routes by key.
            "audio.incoming": "Direct → route by routing key to the priority queues",
            "audio.results": "Direct → route results to clients",
            "audio.dlx": "Dead letter exchange for failed messages",
        },
        "queues": {
            "transcribe.high": "Priority queue: short audio (< 5 min), paid users",
            "transcribe.normal": "Normal queue: standard requests",
            "transcribe.batch": "Batch queue: large files, low priority",
            "transcribe.dlq": "Dead letter queue: failed after max retries",
        },
    }

    def show_components(self):
        print("=== Architecture Components ===\n")
        for key, comp in self.COMPONENTS.items():
            print(f"[{comp['name']}]")
            print(f"  Role: {comp['role']}")
            print(f"  Tech: {comp['tech']}")
            print()

    def show_queues(self):
        print("=== Queue Design ===")
        for q, desc in self.QUEUE_DESIGN["queues"].items():
            print(f"  [{q}] {desc}")

arch = MQArchitecture()
arch.show_components()
arch.show_queues()
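
The queue descriptions above amount to a routing rule that the API gateway would apply before publishing. The sketch below is one possible reading of it. The `Job` type, the `choose_queue` function, and the one-hour threshold for "large files" are assumptions, and "short audio, paid users" is interpreted here as either condition being enough for the high-priority queue.

```python
from dataclasses import dataclass

HIGH_MAX_SECONDS = 5 * 60    # "short audio (< 5 min)" from the queue design
BATCH_MIN_SECONDS = 60 * 60  # assumption: an hour or more counts as a "large file"


@dataclass
class Job:
    duration_seconds: float
    paid_user: bool = False


def choose_queue(job: Job) -> str:
    """Map a job to one of the three work queues from the queue design."""
    # Very long files always go to the batch queue so they cannot starve others.
    if job.duration_seconds >= BATCH_MIN_SECONDS:
        return "transcribe.batch"
    # Assumed rule: paid users OR short clips get the high-priority queue.
    if job.paid_user or job.duration_seconds < HIGH_MAX_SECONDS:
        return "transcribe.high"
    return "transcribe.normal"


if __name__ == "__main__":
    for job in (Job(120), Job(600), Job(600, paid_user=True), Job(7200)):
        print(job, "->", choose_queue(job))
```

The returned name can be used directly as the routing key on a direct exchange, or on the default exchange when the queue name and routing key match.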

Worker Implementation

# worker.py — Whisper transcription worker
import json

class TranscriptionWorker:
    CODE = """
# whisper_worker.py — Transcription worker with message queue
import pika
import json
import os
import time
import logging
from faster_whisper import WhisperModel

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class WhisperWorker:
    def __init__(self, model_size="medium", device="cuda", 
                 rabbitmq_url="amqp://guest:guest@localhost:5672/"):
        self.model = WhisperModel(model_size, device=device, compute_type="float16")
        self.connection = pika.BlockingConnection(
            pika.URLParameters(rabbitmq_url)
        )
        self.channel = self.connection.channel()
        self._setup_queues()
    
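    def _setup_queues(self):
        # Declare the exchanges first: publishing to an undeclared exchange
        # makes the broker close the channel.
        self.channel.exchange_declare("audio.results", exchange_type="direct", durable=True)
        self.channel.exchange_declare("audio.dlx", exchange_type="direct", durable=True)

        # Dead letter queue, bound to the DLX with the routing key used below
        self.channel.queue_declare("transcribe.dlq", durable=True)
        self.channel.queue_bind("transcribe.dlq", "audio.dlx", routing_key="failed")

        # Work queues send rejected messages to audio.dlx
        args = {
            "x-dead-letter-exchange": "audio.dlx",
            "x-dead-letter-routing-key": "failed",
        }
        self.channel.queue_declare("transcribe.high", durable=True, arguments=args)
        self.channel.queue_declare("transcribe.normal", durable=True, arguments=args)
        
        # prefetch_count=1: each worker holds one unacknowledged job at a time.
        # This gives fair dispatch, not strict priority between the two queues.
        self.channel.basic_qos(prefetch_count=1)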
    
    def transcribe(self, audio_path, language=None):
        start = time.time()
        
        segments, info = self.model.transcribe(
            audio_path,
            language=language,
            beam_size=5,
            vad_filter=True,
            vad_parameters=dict(min_silence_duration_ms=500),
        )
        
        results = []
        full_text = []
        for segment in segments:
            results.append({
                "start": round(segment.start, 2),
                "end": round(segment.end, 2),
                "text": segment.text.strip(),
            })
            full_text.append(segment.text.strip())
        
        duration = time.time() - start
        return {
            "text": " ".join(full_text),
            "segments": results,
            "language": info.language,
            "language_probability": round(info.language_probability, 3),
            "duration_seconds": round(duration, 2),
            "audio_duration": round(info.duration, 2),
            "realtime_factor": round(duration / max(info.duration, 0.1), 2),
        }
    
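    def process_message(self, ch, method, properties, body):
        job_id = None
        try:
            # Parse inside the try block so a malformed payload is dead-lettered
            # instead of crashing the consumer.
            message = json.loads(body)
            job_id = message.get("job_id")
            audio_path = message.get("audio_path")
            language = message.get("language")
            
            logger.info(f"Processing job {job_id}: {audio_path}")
            
            # NOTE: audio_path must be a local file here. If the API publishes an
            # s3:// URI, download the object before calling transcribe().
            # A long transcription also blocks pika's I/O loop, so heartbeats can
            # be missed; raise the heartbeat timeout for long files.
            result = self.transcribe(audio_path, language)
            result["job_id"] = job_id
            result["status"] = "completed"
            
            # Publish the result as a persistent message
            ch.basic_publish(
                exchange="audio.results",
                routing_key=message.get("reply_to", "results"),
                body=json.dumps(result, ensure_ascii=False),
                properties=pika.BasicProperties(
                    content_type="application/json",
                    delivery_mode=2,  # 2 = persistent
                ),
            )
            
            ch.basic_ack(delivery_tag=method.delivery_tag)
            logger.info(f"Completed {job_id}: {result['duration_seconds']}s")
            
        except Exception as e:
            logger.error(f"Failed {job_id}: {e}")
            # requeue=False routes the message to the dead letter exchange
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)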
    
    def start(self):
        self.channel.basic_consume("transcribe.high", self.process_message)
        self.channel.basic_consume("transcribe.normal", self.process_message)
        logger.info("Worker started. Waiting for messages...")
        self.channel.start_consuming()

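if __name__ == "__main__":
    # Read the settings that docker-compose passes in as environment variables
    worker = WhisperWorker(
        model_size=os.environ.get("WHISPER_MODEL", "medium"),
        device=os.environ.get("DEVICE", "cuda"),
        rabbitmq_url=os.environ.get("RABBITMQ_URL", "amqp://guest:guest@localhost:5672/"),
    )
    worker.start()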
"""

    def show_code(self):
        print("=== Transcription Worker ===")
        print(self.CODE[:600])

worker = TranscriptionWorker()
worker.show_code()
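
The worker above dead-letters a failed job on the first error, while the queue design describes `transcribe.dlq` as holding jobs that "failed after max retries". One common way to bridge that gap is to count earlier failures from the `x-death` header that RabbitMQ attaches to dead-lettered messages. The sketch below shows only the counting step. The helper names and `MAX_RETRIES = 3` are assumptions, and it presumes a retry topology (for example a TTL retry queue that feeds messages back to the work queue) that the article does not describe.

```python
MAX_RETRIES = 3  # assumed limit; the article does not state one


def death_count(headers, queue_name):
    """Return how many times the message was dead-lettered from queue_name.

    headers is the message's header dict (properties.headers in pika),
    which is None for a message that has never been dead-lettered.
    """
    if not headers:
        return 0
    total = 0
    for entry in headers.get("x-death", []):
        if entry.get("queue") == queue_name:
            total += int(entry.get("count", 0))
    return total


def should_retry(headers, queue_name, max_retries=MAX_RETRIES):
    """True while the message has failed fewer than max_retries times."""
    return death_count(headers, queue_name) < max_retries


if __name__ == "__main__":
    sample = {"x-death": [{"queue": "transcribe.normal", "reason": "rejected", "count": 2}]}
    print(should_retry(sample, "transcribe.normal"))
```

In a consumer callback, a job for which `should_retry` returns false would be published to the dead letter queue and acknowledged rather than rejected again.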

API & Docker Infrastructure

# infra.py — API and Docker infrastructure
import json

class Infrastructure:
    DOCKER_COMPOSE = """
# docker-compose.yml — Whisper transcription pipeline
version: '3.8'
services:
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: secret
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  postgres:
    image: postgres:16
    environment:
      POSTGRES_DB: whisper_db
      POSTGRES_USER: whisper
      POSTGRES_PASSWORD: secret
    volumes:
      - postgres_data:/var/lib/postgresql/data

  api:
    build:
      context: .
      dockerfile: Dockerfile.api
    ports:
      - "8000:8000"
    environment:
      RABBITMQ_URL: amqp://admin:secret@rabbitmq:5672/
      REDIS_URL: redis://redis:6379/0
      DATABASE_URL: postgresql://whisper:secret@postgres/whisper_db
      S3_BUCKET: whisper-audio
    depends_on:
      - rabbitmq
      - redis
      - postgres

  worker:
    build:
      context: .
      dockerfile: Dockerfile.worker
    deploy:
      replicas: 2
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]
    environment:
      RABBITMQ_URL: amqp://admin:secret@rabbitmq:5672/
      WHISPER_MODEL: medium
      DEVICE: cuda
    depends_on:
      - rabbitmq

  monitor:
    image: grafana/grafana
    ports:
      - "3000:3000"

volumes:
  rabbitmq_data:
  postgres_data:
"""

    API_CODE = """
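# api.py: FastAPI for the Whisper service
# File uploads in FastAPI also require: pip install python-multipart
import json
import os
import uuid
from typing import Optional

import boto3
import pika
from fastapi import FastAPI, File, HTTPException, UploadFile

# Settings come from the environment variables set in docker-compose.yml
RABBITMQ_URL = os.environ.get("RABBITMQ_URL", "amqp://guest:guest@localhost:5672/")
S3_BUCKET = os.environ.get("S3_BUCKET", "whisper-audio")
# Only queues the worker declares; add "batch" once a consumer declares transcribe.batch
ALLOWED_PRIORITIES = {"high", "normal"}

app = FastAPI(title="Whisper Transcription API")

# Plain "def" rather than "async def": boto3 and pika are blocking, so FastAPI
# runs this handler in its thread pool instead of stalling the event loop.
@app.post("/api/transcribe")
def transcribe(
    file: UploadFile = File(...),
    language: Optional[str] = None,
    priority: str = "normal",
):
    if priority not in ALLOWED_PRIORITIES:
        raise HTTPException(status_code=400, detail=f"Unknown priority: {priority}")
    
    # Keep the full UUID: truncating to 8 characters invites collisions
    job_id = str(uuid.uuid4())
    
    # Upload to S3
    s3 = boto3.client("s3")
    s3_key = f"audio/{job_id}/{file.filename}"
    s3.upload_fileobj(file.file, S3_BUCKET, s3_key)
    
    # Publish to queue
    queue = f"transcribe.{priority}"
    message = {
        "job_id": job_id,
        "audio_path": f"s3://{S3_BUCKET}/{s3_key}",
        "language": language,
        "reply_to": "results",
    }
    
    connection = pika.BlockingConnection(pika.URLParameters(RABBITMQ_URL))
    try:
        channel = connection.channel()
        channel.basic_publish(
            exchange="",
            routing_key=queue,
            body=json.dumps(message),
            properties=pika.BasicProperties(delivery_mode=2),  # 2 = persistent
        )
    finally:
        connection.close()
    
    return {"job_id": job_id, "status": "queued", "queue": queue}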
"""

    def show_docker(self):
        print("=== Docker Compose ===")
        print(self.DOCKER_COMPOSE[:500])

    def show_api(self):
        print(f"\n=== API ===")
        print(self.API_CODE[:500])

infra = Infrastructure()
infra.show_docker()
infra.show_api()
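
The API publishes `audio_path` as an `s3://bucket/key` URI, whereas the transcription call expects a local file, so a worker has to split the URI before downloading. The helper below is a minimal sketch of that split. The name `split_s3_uri` is hypothetical, and it uses plain string handling so that keys containing `?` or `#` are not truncated.

```python
from typing import Tuple


def split_s3_uri(uri: str) -> Tuple[str, str]:
    """Split 's3://bucket/some/key' into ('bucket', 'some/key').

    Raises ValueError for anything that is not a complete S3 URI.
    """
    prefix = "s3://"
    if not uri.startswith(prefix):
        raise ValueError(f"not an S3 URI: {uri!r}")
    # partition() splits on the first "/" only, so the key keeps its slashes
    bucket, _, key = uri[len(prefix):].partition("/")
    if not bucket or not key:
        raise ValueError(f"S3 URI needs both bucket and key: {uri!r}")
    return bucket, key


if __name__ == "__main__":
    print(split_s3_uri("s3://whisper-audio/audio/1234/meeting.mp3"))
```

The resulting pair can be passed to a boto3 S3 client's `download_file(bucket, key, local_path)` to fetch the audio into a temporary file before transcription.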

Monitoring & Scaling

# scaling.py — Monitoring and auto-scaling
import json
import random

class ScalingStrategy:
    METRICS = {
        "queue_depth": "Messages waiting in the queue; high = add workers",
        "processing_time": "Transcription time per file; tracks performance",
        "worker_gpu_util": "GPU usage of the workers; low = remove workers",
        "error_rate": "Share of failed jobs; alert if > 1%",
        "realtime_factor": "processing time / audio duration; must stay < 1.0",
    }

    SCALING_RULES = {
        "scale_up": "Queue depth > 100 for 5 min → add 1 worker",
        "scale_down": "Queue depth = 0 for 10 min → remove 1 worker (min 1)",
        "max_workers": "Limited by GPU availability: 1 worker per GPU",
        "burst": "Queue depth > 500 → scale to max immediately",
    }

    def dashboard(self):
        # Demo only: every value below is a random placeholder, not a real metric
        print("=== Pipeline Dashboard (simulated) ===\n")
        print(f"  Queue depth: {random.randint(5, 150)}")
        print(f"  Workers: {random.randint(2, 4)} active")
        print(f"  Avg realtime factor: {random.uniform(0.3, 1.5):.1f} (processing time / audio duration)")
        print(f"  GPU util: {random.randint(60, 95)}%")
        print(f"  Today processed: {random.randint(500, 3000):,} files")
        print(f"  Error rate: {random.uniform(0, 0.5):.2f}%")
        print(f"  Avg wait time: {random.uniform(1, 30):.0f}s")

    def show_scaling(self):
        print(f"\n=== Scaling Rules ===")
        for rule, desc in self.SCALING_RULES.items():
            print(f"  [{rule}] {desc}")

scaling = ScalingStrategy()
scaling.dashboard()
scaling.show_scaling()
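
The scaling rules above are stated as text only. The sketch below turns them into a single decision function, as one possible interpretation. The function name and its parameters are assumptions, and the caller is expected to track how long the queue has been above 100 or empty.

```python
def desired_workers(current, max_workers, queue_depth,
                    minutes_over_100=0, minutes_empty=0):
    """Apply the four scaling rules and return the target worker count.

    max_workers should equal the number of available GPUs
    (the "1 worker per GPU" rule).
    """
    # burst: a very deep queue jumps straight to the maximum
    if queue_depth > 500:
        return max_workers
    # scale_up: sustained backlog adds one worker, capped at the maximum
    if queue_depth > 100 and minutes_over_100 >= 5:
        return min(current + 1, max_workers)
    # scale_down: a long-idle queue removes one worker, never below 1
    if queue_depth == 0 and minutes_empty >= 10:
        return max(current - 1, 1)
    return current


if __name__ == "__main__":
    print(desired_workers(current=2, max_workers=4, queue_depth=150, minutes_over_100=5))
```

Checking the burst rule first keeps the rules from conflicting, since a depth above 500 also satisfies the scale-up condition.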

FAQ - Frequently Asked Questions

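Q: Whisper or Google Speech-to-Text: which is better?

A: Whisper is free to license, self-hosted, works offline, covers 99+ languages, and has no API limits. Google STT is inexpensive (about $0.006 per 15 seconds), supports streaming, and has higher accuracy for some languages. Whisper is better for privacy (audio never leaves your infrastructure), cost (no per-request fee, though you pay for your own GPU), and offline use. Google is better for real-time streaming, enterprise support, and punctuation. For batch processing, self-hosted Whisper is usually far more cost-effective.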

Q: Why use a message queue?

A: Whisper transcription is slow (seconds to minutes per file), so it is a poor fit for a synchronous API. A message queue decouples the API from the workers, levels load during traffic bursts, allows failed jobs to be retried, supports priority queues, and enables horizontal scaling. Without a queue, the API blocks until transcription finishes, which leads to timeouts and poor UX.
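
The decoupling described in this answer can be illustrated without a broker. The sketch below uses Python's in-process `queue.Queue` and a thread as stand-ins for RabbitMQ and a GPU worker. All names are hypothetical, and the "transcript" is a placeholder string rather than real Whisper output.

```python
import queue
import threading
import uuid

jobs = queue.Queue()  # stand-in for the message broker
results = {}          # stand-in for the result store


def submit(audio_name):
    """API side: enqueue the job and return an ID without waiting."""
    job_id = uuid.uuid4().hex
    jobs.put((job_id, audio_name))
    return job_id


def worker():
    """Worker side: drain the queue until the None sentinel arrives."""
    while True:
        item = jobs.get()
        if item is None:
            jobs.task_done()
            break
        job_id, audio_name = item
        results[job_id] = f"transcript of {audio_name}"  # placeholder for Whisper
        jobs.task_done()


def run_demo(names):
    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    ids = [submit(name) for name in names]  # returns immediately per job
    jobs.put(None)                          # tell the worker to stop
    jobs.join()                             # wait until everything is processed
    return [results[job_id] for job_id in ids]


if __name__ == "__main__":
    print(run_demo(["call-01.mp3", "call-02.mp3"]))
```

`submit` returns as soon as the job is enqueued, which is the property that keeps a real API responsive while transcription runs elsewhere.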

Q: How does faster-whisper differ from whisper?

A: faster-whisper runs the same models on CTranslate2, an optimized inference engine: it is up to 4x faster, uses about 50% less memory, and supports INT8 quantization. Accuracy is the same because it uses the same model weights. Recommendation: use faster-whisper for production.

Q: RabbitMQ or Kafka: which one should I use?

A: RabbitMQ is simpler, has flexible routing, and follows the task queue pattern, which fits a Whisper pipeline. Kafka offers higher throughput, event streaming, and replay, which fits when jobs must be reprocessed. Recommendation: use RabbitMQ for Whisper transcription (task-based, no replay needed), and Kafka if you have other use cases that also need event streaming.
