Postman Newman Pub Sub Architecture คืออะไร
Postman เป็น API development platform ที่ใช้สร้าง ทดสอบ และ document APIs ส่วน Newman เป็น command-line collection runner ที่รัน Postman collections ใน CI/CD pipelines Pub/Sub (Publish/Subscribe) Architecture เป็น messaging pattern ที่ publishers ส่ง messages ไปยัง topics และ subscribers รับ messages จาก topics ที่สนใจ โดยไม่ต้องรู้จักกัน การรวม Newman กับ Pub/Sub ช่วยให้ทดสอบ event-driven systems ได้อัตโนมัติ ตรวจสอบ message flow, schema validation และ end-to-end integration
Pub/Sub Architecture Fundamentals
# pubsub_basics.py — Pub/Sub architecture overview
import json
class PubSubBasics:
    """Static catalogue of Pub/Sub building blocks and messaging patterns.

    The class only holds two lookup tables and two helpers that print them;
    it keeps no per-instance state.
    """

    # Roles in a Pub/Sub system. Every entry has a display name, a short
    # description and a comma-separated list of examples.
    COMPONENTS = {
        "publisher": {
            "name": "Publisher",
            "description": "ส่ง messages ไปยัง topics — ไม่ต้องรู้ว่าใคร subscribe",
            "examples": "API services, IoT devices, user actions",
        },
        "topic": {
            "name": "Topic / Channel",
            "description": "ช่องทางจัดกลุ่ม messages — publishers ส่งไป, subscribers รับจาก",
            "examples": "orders.created, payments.completed, users.registered",
        },
        "subscriber": {
            "name": "Subscriber",
            "description": "รับ messages จาก topics ที่สนใจ — process asynchronously",
            "examples": "Email service, analytics, inventory update",
        },
        "broker": {
            "name": "Message Broker",
            "description": "ตัวกลางจัดการ messages — routing, delivery guarantee, persistence",
            "examples": "Google Pub/Sub, AWS SNS/SQS, RabbitMQ, Kafka, Redis Streams",
        },
    }

    # Pattern identifier -> one-line description.
    PATTERNS = {
        "fan_out": "Fan-out — 1 publisher, หลาย subscribers รับ message เดียวกัน",
        "fan_in": "Fan-in — หลาย publishers ส่งไป topic เดียว, 1 subscriber",
        "filtering": "Message Filtering — subscriber เลือกรับเฉพาะ messages ที่ตรงเงื่อนไข",
        "dead_letter": "Dead Letter Queue — messages ที่ process ไม่สำเร็จ → DLQ สำหรับ retry/investigate",
    }

    def show_components(self):
        """Print the name, description and examples of every component."""
        print("=== Pub/Sub Components ===\n")
        # Only the values are rendered, so the dictionary keys are not needed.
        for comp in self.COMPONENTS.values():
            print(f"[{comp['name']}]")
            print(f" {comp['description']}")
            print(f" Examples: {comp['examples']}")
            print()

    def show_patterns(self):
        """Print each messaging pattern as ``[identifier] description``."""
        print("=== Messaging Patterns ===")
        for key, desc in self.PATTERNS.items():
            print(f" [{key}] {desc}")
# Demo run: print the component catalogue first, then the messaging patterns.
basics = PubSubBasics()
basics.show_components()
basics.show_patterns()
Newman Collection for Pub/Sub Testing
# newman_collection.py — Postman collection for Pub/Sub testing
import json
class NewmanCollection:
    """Holds a sample Postman collection (format v2.1.0) for Pub/Sub API tests.

    The collection has three requests: publish an order event, check that the
    subscriber received it, and validate the stored message's fields.
    """

    # The collection is kept as JSON text so it can be printed or written to a
    # file as-is. ``request.body.raw`` must be a *string* in the Postman
    # collection format, so the event payload is embedded as escaped JSON
    # (``\\"`` in this Python literal yields ``\"`` in the JSON text).
    COLLECTION = """
{
  "info": {
    "name": "Pub/Sub API Tests",
    "schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json"
  },
  "item": [
    {
      "name": "1. Publish Order Event",
      "request": {
        "method": "POST",
        "url": "{{base_url}}/api/events/publish",
        "header": [
          {"key": "Content-Type", "value": "application/json"},
          {"key": "Authorization", "value": "Bearer {{api_key}}"}
        ],
        "body": {
          "mode": "raw",
          "raw": "{\\"topic\\": \\"orders.created\\", \\"message\\": {\\"order_id\\": \\"ORD-{{$randomInt}}\\", \\"customer_id\\": \\"CUST-001\\", \\"total\\": 1500.00, \\"items\\": [{\\"sku\\": \\"SKU-001\\", \\"qty\\": 2}], \\"timestamp\\": \\"{{$isoTimestamp}}\\"}}"
        }
      },
      "event": [
        {
          "listen": "test",
          "script": {
            "exec": [
              "pm.test('Publish success', function() {",
              " pm.response.to.have.status(200);",
              " var json = pm.response.json();",
              " pm.expect(json.message_id).to.exist;",
              " pm.environment.set('last_message_id', json.message_id);",
              "});"
            ]
          }
        }
      ]
    },
    {
      "name": "2. Check Subscriber Received",
      "request": {
        "method": "GET",
        "url": "{{base_url}}/api/subscriptions/orders/messages?limit=1"
      },
      "event": [
        {
          "listen": "test",
          "script": {
            "exec": [
              "pm.test('Subscriber received message', function() {",
              " pm.response.to.have.status(200);",
              " var json = pm.response.json();",
              " pm.expect(json.messages).to.have.length.greaterThan(0);",
              " pm.expect(json.messages[0].topic).to.equal('orders.created');",
              "});"
            ]
          }
        }
      ]
    },
    {
      "name": "3. Validate Message Schema",
      "request": {
        "method": "GET",
        "url": "{{base_url}}/api/events/{{last_message_id}}"
      },
      "event": [
        {
          "listen": "test",
          "script": {
            "exec": [
              "pm.test('Schema validation', function() {",
              " var json = pm.response.json();",
              " pm.expect(json).to.have.property('order_id');",
              " pm.expect(json).to.have.property('customer_id');",
              " pm.expect(json).to.have.property('total');",
              " pm.expect(json.total).to.be.a('number');",
              " pm.expect(json).to.have.property('timestamp');",
              "});"
            ]
          }
        }
      ]
    }
  ]
}
"""

    def show_collection(self):
        """Print a heading followed by the first 600 characters of COLLECTION."""
        print("=== Postman Collection ===")
        # Truncated on purpose: this is a preview, not a full dump.
        print(self.COLLECTION[:600])
# Demo run: print a preview of the sample Postman collection.
collection = NewmanCollection()
collection.show_collection()
Newman CI/CD Integration
# newman_cicd.py — Newman in CI/CD for Pub/Sub testing
import json
class NewmanCICD:
    """Sample CI/CD assets for running Newman against a Pub/Sub service.

    ``GITHUB_ACTIONS`` is a workflow file and ``NEWMAN_SCRIPT`` is a bash
    runner; both are stored as text and only printed by this class.
    """

    # Notes on the workflow text:
    # * ``--reporters`` takes ONE comma-separated argument; a space after a
    #   comma makes the shell pass the remaining reporter names as stray
    #   positional arguments.
    # * NOTE(review): the Slack payload originally ended in "FAILED on }",
    #   which looks like a stripped ``${{ ... }}`` expression; ``github.ref``
    #   is assumed here — confirm the intended context value.
    # * NOTE(review): the Slack step shows no webhook/token configuration —
    #   confirm how the action is meant to authenticate.
    GITHUB_ACTIONS = """
# .github/workflows/pubsub-test.yml
name: Pub/Sub API Tests
on:
  push:
    branches: [main, develop]
  schedule:
    - cron: '0 */6 * * *' # Every 6 hours
jobs:
  pubsub-tests:
    runs-on: ubuntu-latest
    services:
      redis:
        image: redis:7-alpine
        ports:
          - 6379:6379
    steps:
      - uses: actions/checkout@v4
      - name: Setup Node.js
        uses: actions/setup-node@v4
        with:
          node-version: '20'
      - name: Install Newman
        run: npm install -g newman newman-reporter-htmlextra
      - name: Start Pub/Sub Service
        run: |
          docker compose up -d pubsub-service
          sleep 10 # Wait for service
      - name: Run Pub/Sub Tests
        run: |
          newman run collections/pubsub-tests.json \\
            --environment environments/staging.json \\
            --reporters cli,htmlextra,junit \\
            --reporter-htmlextra-export reports/pubsub-report.html \\
            --reporter-junit-export reports/pubsub-junit.xml \\
            --iteration-count 5 \\
            --delay-request 2000
      - name: Run Schema Validation Tests
        run: |
          newman run collections/schema-validation.json \\
            --environment environments/staging.json \\
            --reporters cli,junit \\
            --reporter-junit-export reports/schema-junit.xml
      - name: Upload Reports
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: test-reports
          path: reports/
      - name: Notify on Failure
        if: failure()
        uses: slackapi/slack-github-action@v1
        with:
          payload: |
            {"text": "Pub/Sub tests FAILED on ${{ github.ref }}"}
"""

    # The shebang has to be the very first bytes of the script to take effect,
    # so the text starts with it instead of a leading newline or comment.
    NEWMAN_SCRIPT = """#!/bin/bash
# run_pubsub_tests.sh — Newman test runner
set -e
echo "=== Running Pub/Sub Tests ==="
# 1. Publish events
newman run collections/publish-events.json \\
  --environment environments/staging.json \\
  --iteration-count 10 \\
  --delay-request 500
echo "Waiting for message propagation..."
sleep 5
# 2. Verify subscribers received
newman run collections/verify-subscribers.json \\
  --environment environments/staging.json \\
  --reporters cli,htmlextra \\
  --reporter-htmlextra-export reports/subscriber-report.html
# 3. Validate dead letter queue
newman run collections/dlq-check.json \\
  --environment environments/staging.json
echo "=== All Pub/Sub Tests Passed ==="
"""

    def show_github(self):
        """Print a heading and the first 600 characters of the workflow."""
        print("=== GitHub Actions ===")
        print(self.GITHUB_ACTIONS[:600])

    def show_script(self):
        """Print a heading and the first 400 characters of the bash runner."""
        print("\n=== Test Runner Script ===")
        print(self.NEWMAN_SCRIPT[:400])
# Demo run: preview the GitHub Actions workflow, then the bash runner script.
cicd = NewmanCICD()
cicd.show_github()
cicd.show_script()
Python Pub/Sub Test Framework
# pubsub_test.py — Python test framework for Pub/Sub
import json
class PubSubTestFramework:
    """Holds the source text of a sample ``PubSubTester`` helper.

    ``CODE`` is never executed here; it is only printed. It is kept as valid,
    properly indented Python so it can be copied into ``pubsub_tester.py``.
    """

    # Changes made to the embedded sample so it is usable as written:
    # * indentation restored (the text must parse as Python);
    # * the HTTP publish call has a timeout and checks the status code, so a
    #   hung or failing API surfaces as an exception instead of blocking or
    #   returning an error body;
    # * the timestamp is timezone-aware (``datetime.utcnow()`` is deprecated);
    # * latency falls back to the local send time when the received message
    #   carries no ``published_at`` field, instead of measuring from epoch 0.
    CODE = """
# pubsub_tester.py — Test Pub/Sub message flow
import json
import time
import redis
import requests
from datetime import datetime, timezone


class PubSubTester:
    def __init__(self, api_url, redis_url="redis://localhost:6379"):
        self.api_url = api_url
        self.redis = redis.from_url(redis_url)

    def publish_event(self, topic, payload):
        '''Publish event via API'''
        resp = requests.post(
            f"{self.api_url}/events/publish",
            json={
                "topic": topic,
                "message": payload,
                "timestamp": datetime.now(timezone.utc).isoformat(),
            },
            timeout=10,
        )
        resp.raise_for_status()
        return resp.json()

    def wait_for_message(self, subscription, timeout=10):
        '''Wait for message on subscription'''
        start = time.time()
        while time.time() - start < timeout:
            msg = self.redis.lpop(f"sub:{subscription}")
            if msg:
                return json.loads(msg)
            time.sleep(0.5)
        return None

    def test_publish_subscribe(self, topic, subscription, payload):
        '''Test full pub/sub flow'''
        # Publish
        sent_at = time.time()
        pub_result = self.publish_event(topic, payload)
        assert 'message_id' in pub_result, "Publish failed"
        # Wait for subscriber
        received = self.wait_for_message(subscription, timeout=10)
        assert received is not None, "Message not received within timeout"
        assert received.get('topic') == topic, f"Wrong topic: {received.get('topic')}"
        # Use the local send time when the message has no published_at
        published_at = float(received.get('published_at', sent_at))
        return {
            'status': 'pass',
            'message_id': pub_result['message_id'],
            'latency_ms': (time.time() - published_at) * 1000,
        }

    def test_message_ordering(self, topic, count=10):
        '''Test message ordering'''
        sent_ids = []
        for i in range(count):
            result = self.publish_event(topic, {"sequence": i})
            sent_ids.append(result['message_id'])
        time.sleep(5)
        received_ids = []
        for _ in range(count):
            msg = self.wait_for_message(f"{topic}-ordered", timeout=5)
            if msg:
                received_ids.append(msg['message_id'])
        ordered = sent_ids == received_ids
        return {
            'status': 'pass' if ordered else 'fail',
            'sent': len(sent_ids),
            'received': len(received_ids),
            'in_order': ordered,
        }

    def test_dead_letter_queue(self, topic):
        '''Test DLQ handling'''
        # Publish invalid message
        self.publish_event(topic, {"invalid": True, "__force_error": True})
        time.sleep(5)
        dlq_msg = self.wait_for_message(f"dlq:{topic}", timeout=10)
        return {
            'status': 'pass' if dlq_msg else 'fail',
            'dlq_message': dlq_msg,
        }


# tester = PubSubTester("http://localhost:8080/api")
# result = tester.test_publish_subscribe("orders.created", "order-processor", {"order_id": "TEST-001"})
"""

    def show_code(self):
        """Print a heading and the first 600 characters of the sample source."""
        print("=== Pub/Sub Test Framework ===")
        print(self.CODE[:600])
# Demo run: print a preview of the sample test-framework source.
framework = PubSubTestFramework()
framework.show_code()
Monitoring & Reporting
# monitoring.py — Pub/Sub test monitoring
import json
import random
class PubSubMonitoring:
    """Metric glossary, Newman report command and mock results for Pub/Sub tests."""

    # Metric identifier -> what it measures.
    METRICS = {
        "publish_latency": "เวลาที่ใช้ publish message (P50, P95, P99)",
        "subscribe_latency": "เวลาตั้งแต่ publish ถึง subscriber ได้รับ",
        "message_throughput": "จำนวน messages ต่อวินาที",
        "delivery_rate": "% messages ที่ deliver สำเร็จ",
        "dlq_rate": "% messages ที่เข้า Dead Letter Queue",
        "test_pass_rate": "% Newman test cases ที่ผ่าน",
    }

    # ``--reporters`` must be a single comma-separated argument (no spaces),
    # otherwise the shell splits the reporter names into separate arguments.
    NEWMAN_REPORT = """
# Newman HTML Report Configuration
newman run collection.json \\
  --reporters cli,htmlextra,junit \\
  --reporter-htmlextra-export report.html \\
  --reporter-htmlextra-title "Pub/Sub Test Report" \\
  --reporter-htmlextra-showOnlyFails \\
  --reporter-htmlextra-browserTitle "Pub/Sub Tests" \\
  --reporter-junit-export junit-report.xml
"""

    def show_metrics(self):
        """Print every metric as ``[identifier] description``."""
        print("=== Key Metrics ===\n")
        for metric, desc in self.METRICS.items():
            print(f" [{metric}] {desc}")

    def show_report_config(self):
        """Print the Newman report command under a heading."""
        print("\n=== Newman Report ===")
        print(self.NEWMAN_REPORT)

    def sample_results(self):
        """Print randomly generated, internally consistent sample results.

        The numbers are mock data, not measurements. Passed is derived from
        Total and Failed so the three figures always add up.
        """
        total = random.randint(20, 50)
        failed = random.randint(0, 3)
        passed = total - failed
        print("\n=== Test Results ===")
        print(f" Total Tests: {total}")
        print(f" Passed: {passed}")
        print(f" Failed: {failed}")
        print(f" Publish Latency P95: {random.randint(5, 50)}ms")
        print(f" Subscribe Latency P95: {random.randint(20, 200)}ms")
        print(f" Delivery Rate: {random.uniform(99.0, 100):.2f}%")
        print(f" DLQ Messages: {random.randint(0, 5)}")
# Demo run: print the metric glossary, the report command, then mock results.
mon = PubSubMonitoring()
mon.show_metrics()
mon.show_report_config()
mon.sample_results()
FAQ - คำถามที่พบบ่อย
Q: Newman ทดสอบ Pub/Sub ได้จริงหรือ?
A: ได้ — แต่ทดสอบผ่าน HTTP API ไม่ใช่ subscribe โดยตรง
- วิธี: Publish ผ่าน API → Wait → Check subscriber state ผ่าน API
- ข้อจำกัด: ไม่สามารถ subscribe real-time ได้ — ต้องใช้ polling หรือ webhook
- เสริม: ใช้ Python/Node test framework สำหรับ real-time subscribe testing
Q: Kafka กับ Google Pub/Sub อันไหนดีกว่า?
A: ขึ้นกับความต้องการของระบบ:
- Kafka: self-hosted, high throughput, message replay, ordering guarantee, complex setup
- Google Pub/Sub: managed service, auto-scale, no ops, pay-per-use, simpler
- เลือก Kafka: ถ้าต้องการ control เต็ม, message replay, high volume (millions/sec)
- เลือก Pub/Sub: ถ้าอยู่บน GCP, ต้องการ managed service, ทีมเล็ก
Q: Message ordering สำคัญไหม?
A: ขึ้นกับ use case:
- สำคัญ: financial transactions, event sourcing, state machine transitions
- ไม่สำคัญ: notifications, analytics events, batch processing
- Kafka: ordering guarantee per partition
- Google Pub/Sub: ordering ด้วย ordering key
- RabbitMQ: FIFO per queue
- Test: ใช้ sequence numbers ตรวจสอบ ordering ใน Newman/Python tests
Q: Dead Letter Queue จำเป็นไหม?
A: จำเป็นมากสำหรับ production — messages ที่ process ไม่สำเร็จต้องมีที่ไป
- ไม่มี DLQ: messages หายไป หรือ retry loop ไม่สิ้นสุด → block queue
- มี DLQ: failed messages ไป DLQ → investigate, fix, replay ได้
- ทุก Pub/Sub system ควรมี DLQ + monitoring + alerting เมื่อ DLQ มี messages
