Whisper Speech Message Queue Design คืออะไร
Whisper เป็น open source speech recognition model จาก OpenAI ที่รองรับกว่า 99 ภาษา รวมถึงภาษาไทย สามารถ transcribe เสียงเป็นข้อความและแปลภาษาได้อย่างแม่นยำ Message Queue เป็น middleware สำหรับ asynchronous communication ระหว่าง services เช่น RabbitMQ, Apache Kafka, Redis Streams การรวมสองแนวคิดนี้ช่วยสร้างระบบ speech-to-text ที่ scalable รองรับ concurrent requests จำนวนมาก เหมาะสำหรับ call center transcription, meeting notes, podcast indexing และ accessibility services
Whisper Model Overview
# whisper_overview.py — Whisper model fundamentals
import json
class WhisperOverview:
    """Reference card for OpenAI Whisper: model sizes, features, and setup.

    Pure presentation helper — holds static lookup tables and prints them.
    """

    # Model size -> rough resource/quality trade-offs. Speeds are approximate,
    # relative to audio realtime on a single GPU. Thai accuracy labels are
    # user-facing display strings.
    MODELS = {
        "tiny": {"params": "39M", "vram": "~1GB", "speed": "~32x realtime", "accuracy": "ต่ำ"},
        "base": {"params": "74M", "vram": "~1GB", "speed": "~16x realtime", "accuracy": "ปานกลาง"},
        "small": {"params": "244M", "vram": "~2GB", "speed": "~6x realtime", "accuracy": "ดี"},
        "medium": {"params": "769M", "vram": "~5GB", "speed": "~2x realtime", "accuracy": "ดีมาก"},
        "large-v3": {"params": "1.55B", "vram": "~10GB", "speed": "~1x realtime", "accuracy": "ดีที่สุด"},
    }

    # Capability key -> short user-facing description.
    FEATURES = {
        "transcription": "แปลงเสียงเป็นข้อความ (99+ ภาษา)",
        "translation": "แปลเป็นภาษาอังกฤษอัตโนมัติ",
        "language_detect": "ตรวจจับภาษาอัตโนมัติ",
        "timestamps": "Word-level และ segment-level timestamps",
        "vad": "Voice Activity Detection (ตรวจจับช่วงที่มีเสียงพูด)",
    }

    # Installation / quick-start snippet shown to readers (display text only,
    # never executed by this module).
    SETUP = """
# pip install openai-whisper
# pip install faster-whisper (CTranslate2 optimized — เร็วกว่า 4x)
import whisper
# Standard Whisper
model = whisper.load_model("medium")
result = model.transcribe("audio.mp3", language="th")
print(result["text"])
# Faster Whisper (recommended for production)
from faster_whisper import WhisperModel
model = WhisperModel("medium", device="cuda", compute_type="float16")
segments, info = model.transcribe("audio.mp3", language="th")
for segment in segments:
    print(f"[{segment.start:.1f}s - {segment.end:.1f}s] {segment.text}")
"""

    def show_models(self):
        """Print the MODELS table with aligned columns."""
        print("=== Whisper Models ===\n")
        print(f" {'Model':<12} {'Params':>8} {'VRAM':>8} {'Speed':>15} {'Accuracy'}")
        print(f" {'-'*55}")
        for name, info in self.MODELS.items():
            print(f" {name:<12} {info['params']:>8} {info['vram']:>8} {info['speed']:>15} {info['accuracy']}")

    def show_features(self):
        """Print one '[key] description' line per FEATURES entry."""
        # No placeholders needed — plain string, not an f-string.
        print("\n=== Features ===")
        for feat, desc in self.FEATURES.items():
            print(f" [{feat}] {desc}")
# Demo: instantiate the overview and render the model table and feature list.
whisper = WhisperOverview()
whisper.show_models()
whisper.show_features()
Message Queue Architecture
# mq_architecture.py — Message queue design for Whisper
import json
class MQArchitecture:
    """Describes the components and queue topology of a Whisper MQ pipeline.

    Static documentation tables plus print helpers; no runtime behavior.
    """

    # Component key -> display name, responsibility, and suggested technology.
    COMPONENTS = {
        "api_gateway": {
            "name": "API Gateway",
            "role": "รับ audio files จาก clients → validate → publish to queue",
            "tech": "FastAPI + S3 upload",
        },
        "message_queue": {
            "name": "Message Queue",
            "role": "Buffer ระหว่าง API กับ workers — decouple + load leveling",
            "tech": "RabbitMQ (simple) หรือ Kafka (high throughput)",
        },
        "transcription_workers": {
            "name": "Transcription Workers",
            "role": "Consume messages → download audio → Whisper transcribe → publish result",
            "tech": "Python + faster-whisper + GPU",
        },
        "result_store": {
            "name": "Result Store",
            "role": "เก็บผลลัพธ์ transcription — text, timestamps, metadata",
            "tech": "PostgreSQL + Redis cache",
        },
        "notification": {
            "name": "Notification Service",
            "role": "แจ้ง client เมื่อ transcription เสร็จ",
            "tech": "WebSocket, webhook, email",
        },
    }

    # Exchange and queue topology (RabbitMQ terminology: exchanges route
    # messages into queues; the DLX catches failures).
    QUEUE_DESIGN = {
        "exchanges": {
            "audio.incoming": "Fanout → route to priority queues",
            "audio.results": "Direct → route results to clients",
            "audio.dlx": "Dead Letter Exchange สำหรับ failed messages",
        },
        "queues": {
            "transcribe.high": "Priority queue — short audio (< 5 min), paid users",
            "transcribe.normal": "Normal queue — standard requests",
            "transcribe.batch": "Batch queue — large files, low priority",
            "transcribe.dlq": "Dead letter queue — failed after max retries",
        },
    }

    def show_components(self):
        """Print name/role/tech for every architecture component."""
        print("=== Architecture Components ===\n")
        for key, comp in self.COMPONENTS.items():
            print(f"[{comp['name']}]")
            print(f" Role: {comp['role']}")
            print(f" Tech: {comp['tech']}")
            print()

    def show_queues(self):
        """Print one '[queue] description' line per declared queue."""
        print("=== Queue Design ===")
        for q, desc in self.QUEUE_DESIGN["queues"].items():
            print(f" [{q}] {desc}")
# Demo: print the architecture components and queue topology.
arch = MQArchitecture()
arch.show_components()
arch.show_queues()
Worker Implementation
# worker.py — Whisper transcription worker
import json
class TranscriptionWorker:
    """Holds the full Whisper worker reference implementation as display text.

    The CODE constant is a listing shown to readers — it is never executed
    by this module (it requires pika, faster-whisper, and a running broker).
    """

    # Complete worker source listing. Indentation reconstructed to
    # conventional Python style — NOTE(review): the original formatting was
    # lost in transit; confirm against the canonical worker file if one exists.
    CODE = """
# whisper_worker.py — Transcription worker with message queue
import pika
import json
import os
import time
import logging
from faster_whisper import WhisperModel

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class WhisperWorker:
    def __init__(self, model_size="medium", device="cuda",
                 rabbitmq_url="amqp://guest:guest@localhost:5672/"):
        self.model = WhisperModel(model_size, device=device, compute_type="float16")
        self.connection = pika.BlockingConnection(
            pika.URLParameters(rabbitmq_url)
        )
        self.channel = self.connection.channel()
        self._setup_queues()

    def _setup_queues(self):
        # Declare queues with DLX
        args = {
            "x-dead-letter-exchange": "audio.dlx",
            "x-dead-letter-routing-key": "failed",
        }
        self.channel.queue_declare("transcribe.high", durable=True, arguments=args)
        self.channel.queue_declare("transcribe.normal", durable=True, arguments=args)
        # Priority: consume high first
        self.channel.basic_qos(prefetch_count=1)

    def transcribe(self, audio_path, language=None):
        start = time.time()
        segments, info = self.model.transcribe(
            audio_path,
            language=language,
            beam_size=5,
            vad_filter=True,
            vad_parameters=dict(min_silence_duration_ms=500),
        )
        results = []
        full_text = []
        for segment in segments:
            results.append({
                "start": round(segment.start, 2),
                "end": round(segment.end, 2),
                "text": segment.text.strip(),
            })
            full_text.append(segment.text.strip())
        duration = time.time() - start
        return {
            "text": " ".join(full_text),
            "segments": results,
            "language": info.language,
            "language_probability": round(info.language_probability, 3),
            "duration_seconds": round(duration, 2),
            "audio_duration": round(info.duration, 2),
            "realtime_factor": round(duration / max(info.duration, 0.1), 2),
        }

    def process_message(self, ch, method, properties, body):
        message = json.loads(body)
        job_id = message.get("job_id")
        audio_path = message.get("audio_path")
        language = message.get("language")
        logger.info(f"Processing job {job_id}: {audio_path}")
        try:
            result = self.transcribe(audio_path, language)
            result["job_id"] = job_id
            result["status"] = "completed"
            # Publish result
            self.channel.basic_publish(
                exchange="audio.results",
                routing_key=message.get("reply_to", "results"),
                body=json.dumps(result),
            )
            ch.basic_ack(delivery_tag=method.delivery_tag)
            logger.info(f"Completed {job_id}: {result['duration_seconds']}s")
        except Exception as e:
            logger.error(f"Failed {job_id}: {e}")
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

    def start(self):
        self.channel.basic_consume("transcribe.high", self.process_message)
        self.channel.basic_consume("transcribe.normal", self.process_message)
        logger.info("Worker started. Waiting for messages...")
        self.channel.start_consuming()


worker = WhisperWorker(model_size="medium", device="cuda")
worker.start()
"""

    def show_code(self):
        """Print the header and the first 600 characters of the listing."""
        print("=== Transcription Worker ===")
        print(self.CODE[:600])
# Demo: print the (truncated) worker reference implementation.
worker = TranscriptionWorker()
worker.show_code()
API & Docker Infrastructure
# infra.py — API and Docker infrastructure
import json
class Infrastructure:
    """Holds Docker Compose and FastAPI reference listings as display text.

    Both constants are documentation strings shown to readers; nothing here
    starts containers or serves HTTP.
    """

    # docker-compose listing. YAML indentation reconstructed to conventional
    # 2-space nesting — NOTE(review): original formatting was lost in transit;
    # verify against the canonical compose file if one exists.
    DOCKER_COMPOSE = """
# docker-compose.yml — Whisper transcription pipeline
version: '3.8'
services:
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: secret
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
  postgres:
    image: postgres:16
    environment:
      POSTGRES_DB: whisper_db
      POSTGRES_USER: whisper
      POSTGRES_PASSWORD: secret
    volumes:
      - postgres_data:/var/lib/postgresql/data
  api:
    build:
      context: .
      dockerfile: Dockerfile.api
    ports:
      - "8000:8000"
    environment:
      RABBITMQ_URL: amqp://admin:secret@rabbitmq:5672/
      REDIS_URL: redis://redis:6379/0
      DATABASE_URL: postgresql://whisper:secret@postgres/whisper_db
      S3_BUCKET: whisper-audio
    depends_on:
      - rabbitmq
      - redis
      - postgres
  worker:
    build:
      context: .
      dockerfile: Dockerfile.worker
    deploy:
      replicas: 2
      resources:
        reservations:
          devices:
            - capabilities: [gpu]
    environment:
      RABBITMQ_URL: amqp://admin:secret@rabbitmq:5672/
      WHISPER_MODEL: medium
      DEVICE: cuda
    depends_on:
      - rabbitmq
  monitor:
    image: grafana/grafana
    ports:
      - "3000:3000"
volumes:
  rabbitmq_data:
  postgres_data:
"""

    # FastAPI endpoint listing (display text; requires fastapi/pika/boto3
    # plus a RABBITMQ_URL constant defined elsewhere to actually run).
    API_CODE = """
# api.py — FastAPI for Whisper service
from fastapi import FastAPI, UploadFile, File, HTTPException
import pika, json, uuid, boto3

app = FastAPI(title="Whisper Transcription API")


@app.post("/api/transcribe")
async def transcribe(
    file: UploadFile = File(...),
    language: str = None,
    priority: str = "normal"
):
    job_id = str(uuid.uuid4())[:8]
    # Upload to S3
    s3 = boto3.client("s3")
    s3_key = f"audio/{job_id}/{file.filename}"
    s3.upload_fileobj(file.file, "whisper-audio", s3_key)
    # Publish to queue
    queue = f"transcribe.{priority}"
    message = {
        "job_id": job_id,
        "audio_path": f"s3://whisper-audio/{s3_key}",
        "language": language,
        "reply_to": "results",
    }
    connection = pika.BlockingConnection(pika.URLParameters(RABBITMQ_URL))
    channel = connection.channel()
    channel.basic_publish(exchange="", routing_key=queue, body=json.dumps(message))
    connection.close()
    return {"job_id": job_id, "status": "queued", "queue": queue}
"""

    def show_docker(self):
        """Print the first 500 characters of the compose listing."""
        print("=== Docker Compose ===")
        print(self.DOCKER_COMPOSE[:500])

    def show_api(self):
        """Print the first 500 characters of the API listing."""
        # Plain string — no f-string placeholders needed.
        print("\n=== API ===")
        print(self.API_CODE[:500])
# Demo: print the (truncated) compose file and API listings.
infra = Infrastructure()
infra.show_docker()
infra.show_api()
Monitoring & Scaling
# scaling.py — Monitoring and auto-scaling
import json
import random
class ScalingStrategy:
    """Monitoring metrics and auto-scaling rules for the transcription fleet.

    dashboard() prints *randomly generated sample figures* — it is a mock,
    not a live metrics reader.
    """

    # Metric key -> what it tells us and how to react (user-facing Thai text).
    METRICS = {
        "queue_depth": "จำนวน messages ใน queue — ถ้าสูง = เพิ่ม workers",
        "processing_time": "เวลา transcribe ต่อไฟล์ — monitor performance",
        "worker_gpu_util": "GPU usage ของ workers — ถ้าต่ำ = ลด workers",
        "error_rate": "อัตรา failed jobs — alert ถ้า > 1%",
        "realtime_factor": "processing time / audio duration — ต้อง < 1.0",
    }

    # Rule key -> trigger condition and scaling action.
    SCALING_RULES = {
        "scale_up": "Queue depth > 100 for 5 min → เพิ่ม worker +1",
        "scale_down": "Queue depth = 0 for 10 min → ลด worker -1 (min 1)",
        "max_workers": "GPU availability — 1 worker per GPU",
        "burst": "Queue depth > 500 → scale to max immediately",
    }

    def dashboard(self):
        """Print a mock dashboard populated with random sample values."""
        print("=== Pipeline Dashboard ===\n")
        print(f" Queue depth: {random.randint(5, 150)}")
        print(f" Workers: {random.randint(2, 4)} active")
        print(f" Avg processing: {random.uniform(0.3, 1.5):.1f}x realtime")
        print(f" GPU util: {random.randint(60, 95)}%")
        print(f" Today processed: {random.randint(500, 3000):,} files")
        print(f" Error rate: {random.uniform(0, 0.5):.2f}%")
        print(f" Avg wait time: {random.uniform(1, 30):.0f}s")

    def show_scaling(self):
        """Print one '[rule] description' line per scaling rule."""
        # Plain string — the original used an f-string with no placeholders.
        print("\n=== Scaling Rules ===")
        for rule, desc in self.SCALING_RULES.items():
            print(f" [{rule}] {desc}")
# Demo: print the mock dashboard (random figures) and the scaling rules.
scaling = ScalingStrategy()
scaling.dashboard()
scaling.show_scaling()
FAQ - คำถามที่พบบ่อย
Q: Whisper กับ Google Speech-to-Text อันไหนดี?
A: Whisper: ฟรี, self-hosted, offline, 99+ ภาษา, ไม่มี API limits • Google STT: ราคาถูก ($0.006/15s), streaming, higher accuracy สำหรับบางภาษา • Whisper ดีกว่าด้าน privacy (data ไม่ออกนอกระบบ), cost (ฟรี), offline use • Google ดีกว่าด้าน real-time streaming, enterprise support, punctuation • สำหรับ batch processing: Whisper คุ้มกว่ามาก (ฟรี + self-hosted)
Q: ทำไมต้องใช้ Message Queue?
A: Whisper transcription ใช้เวลานาน (หลักวินาทีถึงนาทีต่อไฟล์) จึงไม่เหมาะกับ synchronous API • Message Queue ช่วย: decouple API กับ worker, load leveling (รองรับ burst traffic), retry failed jobs, priority queues, horizontal scaling • ถ้าไม่ใช้ queue: API จะ block จนกว่า transcribe เสร็จ ทำให้เกิด timeout และ poor UX
Q: faster-whisper ต่างจาก whisper ยังไง?
A: faster-whisper ใช้ CTranslate2 (optimized inference engine): เร็วกว่า 4x และใช้ memory น้อยกว่า ~50%, รองรับ INT8 quantization • Accuracy เท่ากัน เพราะใช้ model weights ชุดเดียวกัน • แนะนำ: ใช้ faster-whisper สำหรับ production เสมอ
Q: RabbitMQ กับ Kafka ใช้อันไหน?
A: RabbitMQ: ง่ายกว่า, routing flexible, เหมาะกับ task queue pattern — จึงเหมาะกับ Whisper pipeline • Kafka: throughput สูงกว่า, event streaming, replay ได้ — เหมาะถ้าต้อง process ข้อมูลซ้ำ • แนะนำ: RabbitMQ สำหรับ Whisper transcription (task-based, ไม่ต้อง replay) และเลือก Kafka ถ้ามี use case อื่นที่ต้องการ event streaming ด้วย
