What Is Midjourney and Why Prompt Engineering Matters
Midjourney is an AI image generation tool that creates high-quality images from text prompts, built on a diffusion model. It can produce a wide range of styles, from realistic photography to digital art, anime, watercolor, and more. Midjourney is used through a Discord bot or its web interface.
Prompt engineering is the craft of writing text descriptions that steer the AI toward the image you want. A good prompt combines several core elements: Subject (what to depict), Style (the visual style), Composition (camera angle, layout), Lighting, Quality keywords, and Parameters (--ar, --v, --s, --c).
A production service that offers AI image generation to many users needs a High Availability (HA) setup to absorb heavy traffic, minimize downtime, and keep response times low.
Combining prompt engineering expertise with HA infrastructure lets you build a service where users can generate high-quality images at any time, no matter how heavy the traffic gets.
Prompt-Writing Techniques for Midjourney
Structure and advanced techniques for prompts
# === Midjourney Prompt Structure ===
# [Subject] + [Style] + [Composition] + [Lighting] + [Quality] + [Parameters]
# Example prompts by category:
# 1. Realistic Photography
# "Professional portrait of a Thai woman in traditional silk dress,
# soft studio lighting, bokeh background, Canon EOS R5, 85mm f/1.4,
# 8k, ultra detailed --ar 3:4 --v 6 --s 250"
# 2. Digital Art / Concept Art
# "Futuristic Bangkok skyline at sunset, cyberpunk style, neon lights,
# flying vehicles, detailed architecture, concept art, artstation trending,
# dramatic lighting --ar 16:9 --v 6 --s 500"
# 3. Product Photography
# "Minimalist product shot of a premium wireless headphone on marble surface,
# soft diffused lighting, clean white background, commercial photography,
# studio lighting --ar 1:1 --v 6 --s 250"
# 4. Architectural Visualization
# "Modern tropical villa in Phuket, infinity pool overlooking ocean,
# lush green garden, golden hour lighting, architectural photography,
# wide angle lens, interior design magazine --ar 16:9 --v 6"
# === Advanced Parameters ===
# --ar [ratio] : Aspect ratio (16:9, 3:4, 1:1, etc.)
# --v [version] : Model version (5, 5.2, 6, 6.1)
# --s [0-1000] : Stylize (higher = more artistic)
# --c [0-100] : Chaos (higher = more varied results)
# --q [0.25-2] : Quality (higher = more detail, slower)
# --no [item] : Negative prompt (exclude elements)
# --seed [number] : Reproducible results
# --tile : Create tileable patterns
# --iw [0-2] : Image weight (for image prompts)
# === Prompt Formulas ===
# Formula 1: [Subject], [Medium], [Style], [Lighting], [Camera]
# "Ancient Thai temple, oil painting, impressionist style,
# golden hour warm light, wide angle perspective"
# Formula 2: [Subject] in the style of [Artist/Reference]
# "Bangkok street food market in the style of Studio Ghibli,
# warm colors, detailed, whimsical atmosphere"
# Formula 3: [Subject], [Mood], [Color Palette], [Texture]
# "Tropical rainforest waterfall, serene peaceful mood,
# emerald green and sapphire blue palette, photorealistic texture"
# === Negative Prompts (--no) ===
# --no text, watermark, signature, blurry, low quality, distorted
# --no extra fingers, extra limbs, deformed hands
# === Multi-Prompt Weighting ===
# "beautiful garden::2 ancient temple::1 sunset::0.5"
# Higher weight = more emphasis on that element
# === Style References ===
# --sref [URL] : Style reference image
# --cref [URL] : Character reference
# --sw [0-1000] : Style reference weight (default 100)
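The prompt structure and parameter flags above can also be assembled programmatically. A minimal sketch — the function name and signature here are illustrative helpers, not part of any Midjourney API:

```python
def build_prompt(subject: str, *descriptors: str, **params) -> str:
    """Join prompt parts with commas and append Midjourney-style --flag values.

    Follows the formula: [Subject] + [Style] + [Composition] + ... + [Parameters].
    """
    parts = [subject, *descriptors]
    prompt = ", ".join(p.strip() for p in parts if p and p.strip())
    flags = " ".join(f"--{k} {v}" for k, v in params.items())
    return f"{prompt} {flags}".strip() if flags else prompt

print(build_prompt(
    "Modern tropical villa in Phuket",
    "golden hour lighting",
    "architectural photography",
    ar="16:9", v="6",
))
# → Modern tropical villa in Phuket, golden hour lighting, architectural photography --ar 16:9 --v 6
```

Keyword arguments keep their insertion order in Python 3.7+, so flags come out in the order you write them.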
Building a Prompt Management System
A prompt management system for production
#!/usr/bin/env python3
# prompt_manager.py — Prompt Management System
import json
import sqlite3
from datetime import datetime
from typing import List, Optional, Dict
from dataclasses import dataclass, field
@dataclass
class PromptTemplate:
name: str
category: str
template: str
parameters: Dict[str, str] = field(default_factory=dict)
tags: List[str] = field(default_factory=list)
created_at: str = ""
version: int = 1
def __post_init__(self):
if not self.created_at:
self.created_at = datetime.utcnow().isoformat()
def render(self, **kwargs) -> str:
prompt = self.template
for key, value in kwargs.items():
prompt = prompt.replace(f"{{{key}}}", str(value))
params = " ".join([f"--{k} {v}" for k, v in self.parameters.items()])
if params:
prompt += f" {params}"
return prompt
class PromptDB:
def __init__(self, db_path="prompts.db"):
self.conn = sqlite3.connect(db_path)
self._init_db()
def _init_db(self):
self.conn.executescript("""
CREATE TABLE IF NOT EXISTS prompts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL,
category TEXT NOT NULL,
template TEXT NOT NULL,
parameters TEXT DEFAULT '{}',
tags TEXT DEFAULT '[]',
version INTEGER DEFAULT 1,
created_at TEXT,
updated_at TEXT
);
CREATE TABLE IF NOT EXISTS generations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
prompt_id INTEGER,
rendered_prompt TEXT,
seed INTEGER,
result_url TEXT,
rating INTEGER DEFAULT 0,
created_at TEXT,
FOREIGN KEY (prompt_id) REFERENCES prompts(id)
);
CREATE INDEX IF NOT EXISTS idx_prompts_category ON prompts(category);
CREATE INDEX IF NOT EXISTS idx_generations_prompt ON generations(prompt_id);
""")
def save_prompt(self, template: PromptTemplate):
now = datetime.utcnow().isoformat()
self.conn.execute("""
INSERT OR REPLACE INTO prompts
(name, category, template, parameters, tags, version, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (
template.name, template.category, template.template,
json.dumps(template.parameters), json.dumps(template.tags),
template.version, template.created_at, now
))
self.conn.commit()
def get_prompt(self, name: str) -> Optional[PromptTemplate]:
row = self.conn.execute(
"SELECT * FROM prompts WHERE name = ?", (name,)
).fetchone()
if not row:
return None
return PromptTemplate(
name=row[1], category=row[2], template=row[3],
parameters=json.loads(row[4]), tags=json.loads(row[5]),
version=row[6], created_at=row[7]
)
def search_prompts(self, category=None, tag=None) -> List[PromptTemplate]:
query = "SELECT * FROM prompts WHERE 1=1"
params = []
if category:
query += " AND category = ?"
params.append(category)
if tag:
query += " AND tags LIKE ?"
params.append(f"%{tag}%")
rows = self.conn.execute(query, params).fetchall()
return [PromptTemplate(
name=r[1], category=r[2], template=r[3],
parameters=json.loads(r[4]), tags=json.loads(r[5]),
version=r[6], created_at=r[7]
) for r in rows]
def log_generation(self, prompt_id, rendered, seed, result_url):
self.conn.execute("""
INSERT INTO generations (prompt_id, rendered_prompt, seed, result_url, created_at)
VALUES (?, ?, ?, ?, ?)
""", (prompt_id, rendered, seed, result_url, datetime.utcnow().isoformat()))
self.conn.commit()
# Usage
db = PromptDB()
# Create a template
product_template = PromptTemplate(
name="product_shot",
category="photography",
template="{product} on {surface}, {lighting} lighting, commercial photography, studio shot",
parameters={"ar": "1:1", "v": "6", "s": "250"},
tags=["product", "commercial", "studio"],
)
db.save_prompt(product_template)
# Render prompt
prompt = product_template.render(
product="premium watch",
surface="dark marble",
lighting="soft diffused"
)
print(prompt)
# Output: premium watch on dark marble, soft diffused lighting,
# commercial photography, studio shot --ar 1:1 --v 6 --s 250
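A natural extension of the `generations` log — hypothetical, not part of the module above — is deduplication: the same rendered prompt with the same seed produces the same image, so a hash of the pair makes a stable cache key for Redis or the database.

```python
import hashlib
from typing import Optional

def prompt_cache_key(rendered_prompt: str, seed: Optional[int] = None) -> str:
    """Stable key so identical prompt+seed pairs can reuse a cached result."""
    payload = f"{rendered_prompt}|seed={seed}".encode("utf-8")
    return hashlib.sha256(payload).hexdigest()[:16]

key = prompt_cache_key(
    "premium watch on dark marble, soft diffused lighting --ar 1:1 --v 6", seed=42
)
```

Before enqueueing a job, look the key up; on a hit, return the cached `result_url` instead of spending GPU time.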
High Availability Architecture for an AI Image Service
Designing HA infrastructure for AI image generation
# === HA Architecture Diagram ===
#
# Users -> CDN (CloudFront) -> ALB
# |
# ┌───────────┼───────────┐
# v v v
# [API Server] [API Server] [API Server]
# | | |
# └───────────┼───────────┘
# |
# ┌───────────┼───────────┐
# v v v
# [Queue Worker] [Queue Worker] [Queue Worker]
# | | |
# └───────────┼───────────┘
# |
# ┌───────────┼───────────┐
# v v v
# [GPU Node 1] [GPU Node 2] [GPU Node 3]
# | | |
# └───────────┼───────────┘
# |
# ┌───────────┼───────────┐
# v v v
# [Redis Cluster] [PostgreSQL] [S3 Storage]
# === Kubernetes Deployment ===
# api-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: image-api
spec:
replicas: 3
selector:
matchLabels:
app: image-api
template:
metadata:
labels:
app: image-api
spec:
containers:
- name: api
image: image-service-api:latest
ports:
- containerPort: 8000
env:
- name: REDIS_URL
value: "redis://redis-cluster:6379"
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: db-credentials
key: url
resources:
requests:
cpu: "500m"
memory: "1Gi"
limits:
cpu: "1000m"
memory: "2Gi"
readinessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 10
livenessProbe:
httpGet:
path: /health
port: 8000
initialDelaySeconds: 30
---
# gpu-worker-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: gpu-worker
spec:
replicas: 3
selector:
matchLabels:
app: gpu-worker
  template:
    metadata:
      labels:
        app: gpu-worker
    spec:
containers:
- name: worker
image: image-service-worker:latest
env:
- name: REDIS_URL
value: "redis://redis-cluster:6379"
- name: MODEL_PATH
value: "/models/stable-diffusion-xl"
resources:
limits:
nvidia.com/gpu: 1
memory: "16Gi"
requests:
nvidia.com/gpu: 1
memory: "12Gi"
volumeMounts:
- name: models
mountPath: /models
volumes:
- name: models
persistentVolumeClaim:
claimName: model-storage
tolerations:
- key: nvidia.com/gpu
operator: Exists
effect: NoSchedule
---
# HPA for auto-scaling
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: api-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: image-api
minReplicas: 3
maxReplicas: 20
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Pods
pods:
metric:
name: http_requests_per_second
target:
type: AverageValue
averageValue: "100"
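The HPA above follows the standard Kubernetes scaling rule, desiredReplicas = ceil(currentReplicas × currentMetricValue / targetValue), clamped to the min/max bounds. A quick sketch of that arithmetic:

```python
import math

def hpa_desired_replicas(current_replicas: int, current_value: float,
                         target_value: float, min_r: int = 3, max_r: int = 20) -> int:
    """Kubernetes HPA rule: desired = ceil(current * current/target), clamped."""
    desired = math.ceil(current_replicas * current_value / target_value)
    return max(min_r, min(max_r, desired))

# 3 pods averaging 90% CPU against the 70% target -> scale to 4
print(hpa_desired_replicas(3, 90, 70))  # 4
```

The same rule applies to the `http_requests_per_second` pods metric: 3 pods at 200 req/s each against the 100 req/s target would scale to 6.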
Auto-scaling and Load Balancing
An auto-scaling setup for GPU workloads
#!/usr/bin/env python3
# image_service.py — HA Image Generation Service
from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel, Field
from typing import Optional
import redis
import json
import uuid
import time
from datetime import datetime
app = FastAPI(title="AI Image Generation Service")
redis_client = redis.Redis(host="redis-cluster", port=6379, decode_responses=True)
QUEUE_NAME = "image_generation_queue"
RESULT_PREFIX = "result:"
MAX_QUEUE_SIZE = 10000
class GenerationRequest(BaseModel):
prompt: str = Field(..., min_length=1, max_length=2000)
negative_prompt: Optional[str] = ""
width: int = Field(default=1024, ge=256, le=2048)
height: int = Field(default=1024, ge=256, le=2048)
steps: int = Field(default=30, ge=10, le=100)
seed: Optional[int] = None
style: Optional[str] = "default"
class GenerationResponse(BaseModel):
job_id: str
status: str
queue_position: Optional[int] = None
estimated_time: Optional[float] = None
@app.post("/generate", response_model=GenerationResponse)
async def generate_image(request: GenerationRequest):
queue_size = redis_client.llen(QUEUE_NAME)
if queue_size >= MAX_QUEUE_SIZE:
raise HTTPException(503, "Service is at capacity. Please try again later.")
job_id = str(uuid.uuid4())
job = {
"job_id": job_id,
"prompt": request.prompt,
"negative_prompt": request.negative_prompt,
"width": request.width,
"height": request.height,
"steps": request.steps,
"seed": request.seed,
"style": request.style,
"created_at": datetime.utcnow().isoformat(),
"status": "queued",
}
redis_client.set(f"{RESULT_PREFIX}{job_id}", json.dumps(job), ex=3600)
redis_client.rpush(QUEUE_NAME, json.dumps(job))
avg_time = float(redis_client.get("avg_generation_time") or 15)
estimated = avg_time * (queue_size + 1)
return GenerationResponse(
job_id=job_id,
status="queued",
queue_position=queue_size + 1,
estimated_time=round(estimated, 1),
)
@app.get("/status/{job_id}")
async def get_status(job_id: str):
result = redis_client.get(f"{RESULT_PREFIX}{job_id}")
if not result:
raise HTTPException(404, "Job not found")
job = json.loads(result)
return {
"job_id": job_id,
"status": job.get("status", "unknown"),
"result_url": job.get("result_url"),
"error": job.get("error"),
"created_at": job.get("created_at"),
"completed_at": job.get("completed_at"),
}
@app.get("/health")
async def health():
try:
redis_client.ping()
queue_size = redis_client.llen(QUEUE_NAME)
return {
"status": "healthy",
"queue_size": queue_size,
"timestamp": datetime.utcnow().isoformat(),
}
except Exception as e:
raise HTTPException(503, f"Unhealthy: {e}")
@app.get("/metrics")
async def metrics():
return {
"queue_size": redis_client.llen(QUEUE_NAME),
"total_generated": int(redis_client.get("total_generated") or 0),
"avg_generation_time": float(redis_client.get("avg_generation_time") or 0),
"active_workers": int(redis_client.get("active_workers") or 0),
}
# GPU Worker (runs separately)
# def worker_loop():
# while True:
# job_data = redis_client.blpop(QUEUE_NAME, timeout=5)
# if not job_data:
# continue
#
# job = json.loads(job_data[1])
# job_id = job["job_id"]
#
# try:
# redis_client.incr("active_workers")
# job["status"] = "processing"
# redis_client.set(f"{RESULT_PREFIX}{job_id}", json.dumps(job), ex=3600)
#
# # Generate image with Stable Diffusion
# # image = pipeline(prompt=job["prompt"], ...)
# # Upload to S3
# # result_url = upload_to_s3(image)
#
# job["status"] = "completed"
# job["result_url"] = result_url
# job["completed_at"] = datetime.utcnow().isoformat()
# redis_client.set(f"{RESULT_PREFIX}{job_id}", json.dumps(job), ex=3600)
# redis_client.incr("total_generated")
#
# except Exception as e:
# job["status"] = "failed"
# job["error"] = str(e)
# redis_client.set(f"{RESULT_PREFIX}{job_id}", json.dumps(job), ex=3600)
# finally:
# redis_client.decr("active_workers")
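The commented loop above can be factored into a pure function that is testable without Redis, a GPU, or S3. Here `generate` and `upload` are injected stand-ins for the diffusion pipeline and the S3 upload, not real library calls:

```python
from datetime import datetime, timezone

def process_job(job: dict, generate, upload) -> dict:
    """Run one queued job through the pipeline, mirroring the worker loop above.

    generate(job) returns image bytes; upload(image) returns a URL.
    Both are injected so the step can be unit-tested with fakes.
    """
    try:
        image = generate(job)
        job["result_url"] = upload(image)
        job["status"] = "completed"
        job["completed_at"] = datetime.now(timezone.utc).isoformat()
    except Exception as e:
        job["status"] = "failed"
        job["error"] = str(e)
    return job

# Fake pipeline for illustration
done = process_job({"job_id": "j1", "prompt": "test"},
                   generate=lambda j: b"png-bytes",
                   upload=lambda img: "https://example.com/j1.png")
print(done["status"])  # completed
```

The real worker would wrap this with the Redis bookkeeping shown above (`blpop`, status writes, `active_workers` counters).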
Monitoring and Disaster Recovery
Monitoring and a DR plan
#!/bin/bash
# ha_monitor.sh — HA Monitoring for Image Generation Service
set -euo pipefail
API_URL=""        # set to the service endpoint before running
ALERT_WEBHOOK=""  # optional: incoming webhook for alerts (blank = log only)
LOG="/var/log/image_service_monitor.log"
log() { echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a "$LOG"; }
alert() {
log "ALERT: $1"
[ -n "$ALERT_WEBHOOK" ] && curl -s -X POST "$ALERT_WEBHOOK" \
-H "Content-Type: application/json" \
-d "{\"text\":\"[Image Service] $1\"}" > /dev/null 2>&1 || true
}
ISSUES=0
# 1. Health check
log "Checking API health..."
HTTP_CODE=$(curl -s -o /dev/null -w "%{http_code}" "$API_URL/health" --max-time 5 || echo "000")
if [ "$HTTP_CODE" != "200" ]; then
alert "API health check FAILED (HTTP $HTTP_CODE)"
ISSUES=$((ISSUES + 1))
fi
# 2. Queue size
log "Checking queue size..."
METRICS=$(curl -s "$API_URL/metrics" --max-time 5 || echo '{}')
QUEUE_SIZE=$(echo "$METRICS" | python3 -c "import sys, json; print(json.load(sys.stdin).get('queue_size',0))" 2>/dev/null || echo 0)
if [ "$QUEUE_SIZE" -gt 5000 ]; then
alert "Queue size critically high: $QUEUE_SIZE"
ISSUES=$((ISSUES + 1))
elif [ "$QUEUE_SIZE" -gt 1000 ]; then
log "WARNING: Queue size elevated: $QUEUE_SIZE"
fi
# 3. Active workers
WORKERS=$(echo "$METRICS" | python3 -c "import sys, json; print(json.load(sys.stdin).get('active_workers',0))" 2>/dev/null || echo 0)
if [ "$WORKERS" -eq 0 ] && [ "$QUEUE_SIZE" -gt 0 ]; then
alert "No active workers but queue has $QUEUE_SIZE jobs!"
ISSUES=$((ISSUES + 1))
fi
# 4. GPU nodes
log "Checking GPU nodes..."
GPU_NODES=$(kubectl get nodes -l nvidia.com/gpu=present --no-headers 2>/dev/null | grep -c "Ready" || true)
if [ "$GPU_NODES" -lt 2 ]; then
alert "Only $GPU_NODES GPU nodes available (minimum: 2)"
ISSUES=$((ISSUES + 1))
fi
# 5. Response time
log "Checking response time..."
RESPONSE_TIME=$(curl -s -o /dev/null -w "%{time_total}" "$API_URL/health" --max-time 10 || echo "99")
RT_MS=$(echo "$RESPONSE_TIME * 1000" | bc | cut -d. -f1)
if [ "$RT_MS" -gt 2000 ]; then
alert "API response time slow: ${RT_MS}ms"
ISSUES=$((ISSUES + 1))
fi
# 6. Disk space for models
DISK_USAGE=$(df /models 2>/dev/null | tail -1 | awk '{print $5}' | tr -d '%' || echo 0)
if [ "$DISK_USAGE" -gt 85 ]; then
alert "Model storage disk usage: ${DISK_USAGE}%"
fi
# Summary
if [ "$ISSUES" -eq 0 ]; then
log "Health: OK (queue=$QUEUE_SIZE, workers=$WORKERS, gpus=$GPU_NODES, rt=${RT_MS}ms)"
else
log "Health: $ISSUES issue(s) (queue=$QUEUE_SIZE, workers=$WORKERS)"
fi
# === Disaster Recovery Plan ===
# 1. API layer failure:
# - ALB automatically routes to healthy instances
# - HPA scales up new pods within 60 seconds
# - Failover to secondary region if primary is down
#
# 2. GPU worker failure:
# - Jobs remain in Redis queue (persistent)
# - Kubernetes restarts failed pods automatically
# - Scale up remaining workers to compensate
#
# 3. Redis failure:
# - Redis Cluster with 3 masters + 3 replicas
# - Automatic failover within seconds
# - Jobs in queue are replicated
#
# 4. Full region failure:
# - DNS failover to secondary region (Route53)
# - Secondary region has standby GPU nodes
# - S3 cross-region replication for generated images
# - RTO: 5 minutes, RPO: 1 minute
FAQ (Frequently Asked Questions)
Q: What makes a good Midjourney prompt?
A: A good prompt has a clear Subject (what you want generated), style keywords (realistic, anime, watercolor), a lighting description (golden hour, studio lighting), quality keywords (8k, ultra detailed, professional), and appropriate parameters (--ar for aspect ratio, --s for stylization, --v for model version). Avoid prompts that are overly long or vague.
Q: How does HA for GPU workloads differ from CPU workloads?
A: GPU nodes are far more expensive (often 10-50x), so utilization must be kept high. Use queue-based processing rather than synchronous requests, since image generation takes 10-60 seconds per image. Auto-scaling GPU nodes is also slower than scaling CPU nodes (GPU hardware has to be provisioned), so keep buffer capacity on hand. Spot/preemptible instances cut costs but need a fallback path.
Q: Can Stable Diffusion be used instead of Midjourney?
A: Yes. Stable Diffusion is open source, so you can self-host it, control the infrastructure yourself, and avoid subscription fees; SDXL and SD 3.0 come close to Midjourney in quality. The upsides are deep customizability, the ability to train custom models, and no imposed content restrictions. The downsides are managing GPU infrastructure yourself and more parameter tuning.
Q: Why does a queue-based architecture suit image generation?
A: Image generation takes 10-60 seconds per image; a synchronous API would time out and hold connections open. With a queue, the API responds immediately with a job ID while workers process in the background, and users poll the status endpoint or receive a webhook when the job finishes. This absorbs traffic spikes, jobs survive worker crashes, and workers scale independently of the API.
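The poll-for-status flow can be sketched as a small client helper with exponential backoff. `fetch` stands in for an HTTP GET against the `/status/{job_id}` endpoint, injected so the sketch runs without a server:

```python
import time

def poll_status(fetch, job_id: str, max_attempts: int = 10, base_delay: float = 0.5):
    """Poll until the job reaches a terminal state, backing off exponentially.

    fetch(job_id) returns the status dict from /status/{job_id}.
    """
    for attempt in range(max_attempts):
        job = fetch(job_id)
        if job["status"] in ("completed", "failed"):
            return job
        time.sleep(min(base_delay * 2 ** attempt, 10.0))  # cap the backoff
    raise TimeoutError(f"job {job_id} did not finish within {max_attempts} polls")

# Simulated server: the job finishes on the third poll
states = iter(["queued", "processing", "completed"])
result = poll_status(lambda jid: {"status": next(states)}, "j1", base_delay=0.01)
print(result["status"])  # completed
```

In production, a webhook callback avoids polling entirely; the backoff version is the fallback for clients that cannot receive callbacks.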
