AWS Bedrock AI Pub Sub Architecture

โดย อ. บอมกิตติทัศน์เจริญพนาสิทธิ์ | อัปเดต 24 ก. พ. 2026 | อ่าน 16 นาที
- AWS Bedrock คืออะไร — Managed Foundation Model API
- Foundation Model ที่รองรับ — Claude, Llama, Titan
- เรียกใช้ Bedrock API พื้นฐาน
- Bedrock Knowledge Bases — RAG Architecture
- Bedrock Agents — AI ที่ทำงานอัตโนมัติ
- Pub/Sub Architecture คืออะไร — ภาพรวม
- AWS Messaging Services — SNS, SQS, EventBridge
- ออกแบบ AI Pipeline ด้วย Pub/Sub
- ตัวอย่าง: Async AI Chat ด้วย SQS + Bedrock
- ตัวอย่าง: Event-driven AI ด้วย EventBridge
- Streaming Response ด้วย WebSocket
- Cost Management และ Throttling
- Security และ Guardrails
- Best Practices และสรุป
AWS Bedrock คืออะไร — Managed Foundation Model API
Amazon Bedrock เป็น Fully Managed Service ที่ให้เรียกใช้ Foundation Model (FM) จาก Provider ชั้นนำผ่าน API เดียวโดยไม่ต้องจัดการ GPU Infrastructure, Model Deployment หรือ Scaling เองเพียง Enable Model ที่ต้องการใน Console แล้วเรียก API ได้ทันทีจ่ายตาม Token ที่ใช้จริง
Bedrock เหมาะกับองค์กรที่ต้องการนำ Generative AI มาใช้งานอย่างรวดเร็วโดยไม่ต้องลงทุน GPU Cluster หรือจ้าง ML Engineer เฉพาะทางเหมาะกับ Chatbot, Content Generation, Summarization, Code Generation, RAG (Retrieval-Augmented Generation) และ AI Agent
Foundation Model ที่รองรับ — Claude, Llama, Titan
| Provider | Model | จุดเด่น | ราคา (Input/Output per 1M tokens) |
|---|---|---|---|
| Anthropic | Claude 3.5 Sonnet | ฉลาดมากดี Coding, Analysis | $3 / $15 |
| Anthropic | Claude 3 Haiku | เร็วมากราคาถูก | $0.25 / $1.25 |
| Meta | Llama 3.1 405B | Open Source ใหญ่สุด | $5.32 / $16 |
| Meta | Llama 3.1 70B | สมดุล Performance/Cost | $2.65 / $3.50 |
| Amazon | Titan Text Premier | ราคาถูก AWS Native | $0.50 / $1.50 |
| Amazon | Titan Embeddings V2 | สำหรับ Vector Search | $0.02 / - |
| Cohere | Command R+ | ดี RAG, Multilingual | $3 / $15 |
| Stability AI | SDXL 1.0 | Image Generation | $0.04/image |
เรียกใช้ Bedrock API พื้นฐาน
# Python — เรียก Claude 3.5 Sonnet ผ่าน Bedrock
import boto3
import json
bedrock = boto3.client('bedrock-runtime', region_name='us-east-1')
response = bedrock.invoke_model(
modelId='anthropic.claude-3-5-sonnet-20241022-v2:0',
contentType='application/json',
accept='application/json',
body=json.dumps({
'anthropic_version': 'bedrock-2023-05-31',
'max_tokens': 4096,
'messages': [
{
'role': 'user',
'content': 'อธิบาย Pub/Sub Architecture เป็นภาษาไทยแบบเข้าใจง่าย'
}
],
'temperature': 0.7
})
)
result = json.loads(response['body'].read())
print(result['content'][0]['text'])
# Streaming Response
response = bedrock.invoke_model_with_response_stream(
modelId='anthropic.claude-3-5-sonnet-20241022-v2:0',
contentType='application/json',
accept='application/json',
body=json.dumps({...})
)
for event in response['body']:
chunk = json.loads(event['chunk']['bytes'])
if chunk['type'] == 'content_block_delta':
print(chunk['delta']['text'], end='', flush=True)
Bedrock Knowledge Bases — RAG Architecture
Knowledge Bases ให้สร้าง RAG (Retrieval-Augmented Generation) ได้ง่ายโดยอัปโหลดเอกสาร (PDF, HTML, Markdown, Word) ขึ้น S3 แล้ว Bedrock จะ Chunk, Embed และเก็บใน Vector Database อัตโนมัติเมื่อ User ถามคำถามระบบจะค้นหาเอกสารที่เกี่ยวข้องแล้วส่งให้ LLM สร้างคำตอบพร้อม Citation
# เรียกใช้ Knowledge Base
bedrock_agent = boto3.client('bedrock-agent-runtime', region_name='us-east-1')
response = bedrock_agent.retrieve_and_generate(
input={'text': 'วิธีตั้งค่า VPN บน Rocky Linux'},
retrieveAndGenerateConfiguration={
'type': 'KNOWLEDGE_BASE',
'knowledgeBaseConfiguration': {
'knowledgeBaseId': 'KB_ID_HERE',
'modelArn': 'arn:aws:bedrock:us-east-1::foundation-model/anthropic.claude-3-5-sonnet-20241022-v2:0'
}
}
)
print(response['output']['text'])
for citation in response['citations']:
print(f"Source: {citation['retrievedReferences'][0]['location']}")
Bedrock Agents — AI ที่ทำงานอัตโนมัติ
Bedrock Agents ให้สร้าง AI Agent ที่ทำงานอัตโนมัติได้เช่นรับคำสั่งจาก User → วางแผน → เรียก API/Tool → สรุปผลลัพธ์ตัวอย่าง Agent ที่รับ Order → ตรวจ Stock → คำนวณราคา → สร้าง Invoice ทุกขั้นตอนทำอัตโนมัติผ่าน Lambda Function ที่กำหนดเป็น Action Group
Pub/Sub Architecture คืออะไร — ภาพรวม
Pub/Sub (Publish/Subscribe) เป็น Messaging Pattern ที่แยก Producer (Publisher) กับ Consumer (Subscriber) ออกจากกัน Publisher ส่ง Message ไปยัง Topic/Channel โดยไม่ต้องรู้ว่า Subscriber คือใครส่วน Subscriber ลงทะเบียนรับ Message จาก Topic ที่สนใจเมื่อมี Message ใหม่จะได้รับอัตโนมัติ
ข้อดีของ Pub/Sub ได้แก่ Decoupling Publisher ไม่ต้องรู้จัก Subscriber, Scalability เพิ่ม Subscriber ได้ไม่จำกัด, Reliability Message ไม่หายแม้ Subscriber ล่ม (ถ้ามี Queue) และ Flexibility เพิ่ม/ลด Consumer ได้โดยไม่กระทบ Producer
AWS Messaging Services — SNS, SQS, EventBridge
| Service | Pattern | เหมาะกับ |
|---|---|---|
| SNS (Simple Notification Service) | Pub/Sub Topic | Fan-out ส่ง Message ไปหลาย Subscriber พร้อมกัน |
| SQS (Simple Queue Service) | Point-to-Point Queue | Buffer Message ให้ Consumer Process ทีละตัว |
| EventBridge | Event Bus + Rules | Event-driven Architecture กรอง Event ตามเงื่อนไข |
| SNS + SQS (Fan-out) | Pub/Sub + Queue | กระจาย Event ไปหลาย Queue แต่ละ Queue มี Consumer แยก |
ออกแบบ AI Pipeline ด้วย Pub/Sub
AI Inference ใช้เวลานาน (1-30 วินาที) ถ้าทำ Synchronous จะ Block User และ Timeout ได้ง่ายใช้ Pub/Sub ให้ User ส่ง Request แล้วได้ผลลัพธ์ทีหลัง
Flow: User → API Gateway → SQS (Request Queue) → Lambda Worker → Bedrock API → SQS (Response Queue) → WebSocket/Callback → User
ข้อดีของ Pattern นี้ได้แก่ User ไม่ต้องรอ (Async), Buffer Request ป้องกัน Bedrock Throttling, Retry อัตโนมัติเมื่อ Bedrock Error, Scale Worker ตาม Queue Depth และ Dead Letter Queue สำหรับ Failed Request
ตัวอย่าง: Async AI Chat ด้วย SQS + Bedrock
# Lambda Worker — อ่านจาก SQS แล้วเรียก Bedrock
import boto3
import json
bedrock = boto3.client('bedrock-runtime')
sqs = boto3.client('sqs')
RESPONSE_QUEUE = 'https://sqs.ap-southeast-1.amazonaws.com/123456789/ai-responses'
def handler(event, context):
for record in event['Records']:
body = json.loads(record['body'])
request_id = body['request_id']
user_message = body['message']
try:
# เรียก Bedrock
response = bedrock.invoke_model(
modelId='anthropic.claude-3-5-sonnet-20241022-v2:0',
body=json.dumps({
'anthropic_version': 'bedrock-2023-05-31',
'max_tokens': 4096,
'messages': [{'role': 'user', 'content': user_message}]
})
)
result = json.loads(response['body'].read())
ai_response = result['content'][0]['text']
# ส่งผลลัพธ์กลับ Response Queue
sqs.send_message(
QueueUrl=RESPONSE_QUEUE,
MessageBody=json.dumps({
'request_id': request_id,
'status': 'success',
'response': ai_response,
'usage': result.get('usage', {})
})
)
except Exception as e:
sqs.send_message(
QueueUrl=RESPONSE_QUEUE,
MessageBody=json.dumps({
'request_id': request_id,
'status': 'error',
'error': str(e)
})
)
ตัวอย่าง: Event-driven AI ด้วย EventBridge
# EventBridge Rule: เมื่อ S3 มีไฟล์ใหม่ → สรุปเนื้อหาด้วย AI
# rule.json
{
"source": ["aws.s3"],
"detail-type": ["Object Created"],
"detail": {
"bucket": {"name": ["documents-bucket"]},
"object": {"key": [{"suffix": ".pdf"}]}
}
}
# Lambda Target: อ่าน PDF → สรุปด้วย Bedrock → เก็บผลใน DynamoDB
import boto3
import json
s3 = boto3.client('s3')
bedrock = boto3.client('bedrock-runtime')
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('document-summaries')
def handler(event, context):
bucket = event['detail']['bucket']['name']
key = event['detail']['object']['key']
# อ่านเนื้อหา (สมมติแปลง PDF เป็น Text แล้ว)
obj = s3.get_object(Bucket=bucket, Key=key)
content = obj['Body'].read().decode('utf-8')[:10000]
# สรุปด้วย Bedrock
response = bedrock.invoke_model(
modelId='anthropic.claude-3-5-sonnet-20241022-v2:0',
body=json.dumps({
'anthropic_version': 'bedrock-2023-05-31',
'max_tokens': 1000,
'messages': [{'role': 'user', 'content': f'สรุปเอกสารนี้เป็นภาษาไทย 3 ย่อหน้า:\n\n{content}'}]
})
)
summary = json.loads(response['body'].read())['content'][0]['text']
# เก็บผลใน DynamoDB
table.put_item(Item={
'document_key': key,
'summary': summary,
'processed_at': context.get_remaining_time_in_millis()
})
Streaming Response ด้วย WebSocket
สำหรับ Chat Application ที่ต้องการแสดงผลแบบ Real-time (ตัวอักษรทีละตัว) ใช้ API Gateway WebSocket + Lambda + Bedrock Streaming
# WebSocket Handler — Stream Bedrock Response กลับ Client
import boto3
import json
bedrock = boto3.client('bedrock-runtime')
apigw = boto3.client('apigatewaymanagementapi',
endpoint_url='https://xxxx.execute-api.ap-southeast-1.amazonaws.com/prod')
def handler(event, context):
connection_id = event['requestContext']['connectionId']
body = json.loads(event['body'])
response = bedrock.invoke_model_with_response_stream(
modelId='anthropic.claude-3-5-sonnet-20241022-v2:0',
body=json.dumps({
'anthropic_version': 'bedrock-2023-05-31',
'max_tokens': 4096,
'messages': [{'role': 'user', 'content': body['message']}],
'stream': True
})
)
for event_stream in response['body']:
chunk = json.loads(event_stream['chunk']['bytes'])
if chunk['type'] == 'content_block_delta':
apigw.post_to_connection(
ConnectionId=connection_id,
Data=json.dumps({
'type': 'delta',
'text': chunk['delta']['text']
})
)
apigw.post_to_connection(
ConnectionId=connection_id,
Data=json.dumps({'type': 'done'})
)
Cost Management และ Throttling
- Token Budget — กำหนด max_tokens ให้พอดีไม่ใช่ตั้งสูงสุดเสมอลด Output Token = ลดค่าใช้จ่าย
- Model Selection — ใช้ Haiku สำหรับงานง่ายใช้ Sonnet สำหรับงานยากไม่ต้องใช้ Model ใหญ่ทุกงาน
- Caching — Cache คำตอบสำหรับคำถามที่ซ้ำลดการเรียก API
- Provisioned Throughput — สำหรับ Workload ที่คาดเดาได้ลดราคาต่อ Token
- SQS + Rate Limiting — ใช้ SQS Buffer Request ตั้ง Lambda Concurrency ไม่ให้เกิน Bedrock Quota
- CloudWatch Billing Alarm — ตั้ง Alert เมื่อค่าใช้จ่ายเกิน Budget
Security และ Guardrails
- Bedrock Guardrails — กำหนด Content Filter ป้องกัน Harmful Content, PII Redaction ลบข้อมูลส่วนบุคคลออกก่อนส่งให้ LLM
- IAM Policy — จำกัดว่าใครเรียก Model ไหนได้ใช้ Resource-level Permission
- VPC Endpoint — เรียก Bedrock ผ่าน Private Link ไม่ผ่าน Internet
- CloudTrail Logging — Log ทุก API Call สำหรับ Audit
- Model Invocation Logging — เก็บ Input/Output ทุก Request สำหรับ Compliance
Best Practices และสรุป
- ใช้ Async Pattern สำหรับ AI Workload — อย่าทำ Synchronous ที่ API Gateway (Timeout 30 วินาที)
- เลือก Model ตาม Use Case — ไม่ต้องใช้ Model ใหญ่ที่สุดเสมอ
- ใช้ RAG แทน Fine-tuning — สำหรับ Domain Knowledge ส่วนใหญ่ RAG เพียงพอ
- ตั้ง Guardrails — ป้องกัน Harmful Content และ PII Leakage
- Monitor Token Usage — ตั้ง Budget Alert ป้องกันค่าใช้จ่ายเกิน
- ใช้ SNS+SQS Fan-out — สำหรับ Event ที่ต้อง Trigger หลาย AI Pipeline
- Retry ด้วย Exponential Backoff — Bedrock อาจ Throttle ใช้ SQS Retry อัตโนมัติ
- Cache คำตอบที่ซ้ำ — ลดค่าใช้จ่ายและ Latency
AWS Bedrock + Pub/Sub Architecture เป็นรูปแบบที่เหมาะสมสำหรับ AI Application ระดับ Production ที่ต้อง Scale ได้ Reliable และ Cost-effective ติดตามบทความใหม่ๆได้ที่ SiamCafe.net
Q: AWS Bedrock คืออะไร
Managed Service เรียกใช้ Foundation Model (Claude, Llama, Titan) ผ่าน API ไม่ต้องจัดการ GPU จ่ายตาม Token ที่ใช้จริง