SiamCafe.net Blog
Technology

Postman Newman Pub Sub Architecture

postman newman pub sub architecture
Postman Newman Pub Sub Architecture | SiamCafe Blog
2025-11-04· อ. บอม — SiamCafe.net· 1,446 คำ

Postman Newman Pub Sub Architecture คืออะไร

Postman เป็น API development platform ที่ใช้สร้าง ทดสอบ และ document APIs ส่วน Newman เป็น command-line collection runner ที่รัน Postman collections ใน CI/CD pipelines Pub/Sub (Publish/Subscribe) Architecture เป็น messaging pattern ที่ publishers ส่ง messages ไปยัง topics และ subscribers รับ messages จาก topics ที่สนใจ โดยไม่ต้องรู้จักกัน การรวม Newman กับ Pub/Sub ช่วยให้ทดสอบ event-driven systems ได้อัตโนมัติ ตรวจสอบ message flow, schema validation และ end-to-end integration

Pub/Sub Architecture Fundamentals

# pubsub_basics.py — Pub/Sub architecture overview
import json

class PubSubBasics:
    COMPONENTS = {
        "publisher": {
            "name": "Publisher",
            "description": "ส่ง messages ไปยัง topics — ไม่ต้องรู้ว่าใคร subscribe",
            "examples": "API services, IoT devices, user actions",
        },
        "topic": {
            "name": "Topic / Channel",
            "description": "ช่องทางจัดกลุ่ม messages — publishers ส่งไป, subscribers รับจาก",
            "examples": "orders.created, payments.completed, users.registered",
        },
        "subscriber": {
            "name": "Subscriber",
            "description": "รับ messages จาก topics ที่สนใจ — process asynchronously",
            "examples": "Email service, analytics, inventory update",
        },
        "broker": {
            "name": "Message Broker",
            "description": "ตัวกลางจัดการ messages — routing, delivery guarantee, persistence",
            "examples": "Google Pub/Sub, AWS SNS/SQS, RabbitMQ, Kafka, Redis Streams",
        },
    }

    PATTERNS = {
        "fan_out": "Fan-out — 1 publisher, หลาย subscribers รับ message เดียวกัน",
        "fan_in": "Fan-in — หลาย publishers ส่งไป topic เดียว, 1 subscriber",
        "filtering": "Message Filtering — subscriber เลือกรับเฉพาะ messages ที่ตรงเงื่อนไข",
        "dead_letter": "Dead Letter Queue — messages ที่ process ไม่สำเร็จ → DLQ สำหรับ retry/investigate",
    }

    def show_components(self):
        print("=== Pub/Sub Components ===\n")
        for key, comp in self.COMPONENTS.items():
            print(f"[{comp['name']}]")
            print(f"  {comp['description']}")
            print(f"  Examples: {comp['examples']}")
            print()

    def show_patterns(self):
        print("=== Messaging Patterns ===")
        for key, desc in self.PATTERNS.items():
            print(f"  [{key}] {desc}")

basics = PubSubBasics()
basics.show_components()
basics.show_patterns()

Newman Collection for Pub/Sub Testing

# newman_collection.py — Postman collection for Pub/Sub testing
import json

class NewmanCollection:
    COLLECTION = """
{
  "info": {
    "name": "Pub/Sub API Tests",
    "schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json"
  },
  "item": [
    {
      "name": "1. Publish Order Event",
      "request": {
        "method": "POST",
        "url": "{{base_url}}/api/events/publish",
        "header": [
          {"key": "Content-Type", "value": "application/json"},
          {"key": "Authorization", "value": "Bearer {{api_key}}"}
        ],
        "body": {
          "mode": "raw",
          "raw": {
            "topic": "orders.created",
            "message": {
              "order_id": "ORD-{{$randomInt}}",
              "customer_id": "CUST-001",
              "total": 1500.00,
              "items": [{"sku": "SKU-001", "qty": 2}],
              "timestamp": "{{$isoTimestamp}}"
            }
          }
        }
      },
      "event": [
        {
          "listen": "test",
          "script": {
            "exec": [
              "pm.test('Publish success', function() {",
              "  pm.response.to.have.status(200);",
              "  var json = pm.response.json();",
              "  pm.expect(json.message_id).to.exist;",
              "  pm.environment.set('last_message_id', json.message_id);",
              "});"
            ]
          }
        }
      ]
    },
    {
      "name": "2. Check Subscriber Received",
      "request": {
        "method": "GET",
        "url": "{{base_url}}/api/subscriptions/orders/messages?limit=1"
      },
      "event": [
        {
          "listen": "test",
          "script": {
            "exec": [
              "pm.test('Subscriber received message', function() {",
              "  pm.response.to.have.status(200);",
              "  var json = pm.response.json();",
              "  pm.expect(json.messages).to.have.length.greaterThan(0);",
              "  pm.expect(json.messages[0].topic).to.equal('orders.created');",
              "});"
            ]
          }
        }
      ]
    },
    {
      "name": "3. Validate Message Schema",
      "request": {
        "method": "GET",
        "url": "{{base_url}}/api/events/{{last_message_id}}"
      },
      "event": [
        {
          "listen": "test",
          "script": {
            "exec": [
              "pm.test('Schema validation', function() {",
              "  var json = pm.response.json();",
              "  pm.expect(json).to.have.property('order_id');",
              "  pm.expect(json).to.have.property('customer_id');",
              "  pm.expect(json).to.have.property('total');",
              "  pm.expect(json.total).to.be.a('number');",
              "  pm.expect(json).to.have.property('timestamp');",
              "});"
            ]
          }
        }
      ]
    }
  ]
}
"""

    def show_collection(self):
        print("=== Postman Collection ===")
        print(self.COLLECTION[:600])

collection = NewmanCollection()
collection.show_collection()

Newman CI/CD Integration

# newman_cicd.py — Newman in CI/CD for Pub/Sub testing
import json

class NewmanCICD:
    GITHUB_ACTIONS = """
# .github/workflows/pubsub-test.yml
name: Pub/Sub API Tests
on:
  push:
    branches: [main, develop]
  schedule:
    - cron: '0 */6 * * *'  # Every 6 hours

jobs:
  pubsub-tests:
    runs-on: ubuntu-latest
    services:
      redis:
        image: redis:7-alpine
        ports:
          - 6379:6379
    
    steps:
      - uses: actions/checkout@v4
      
      - name: Setup Node.js
        uses: actions/setup-node@v4
        with:
          node-version: '20'
      
      - name: Install Newman
        run: npm install -g newman newman-reporter-htmlextra
      
      - name: Start Pub/Sub Service
        run: |
          docker compose up -d pubsub-service
          sleep 10  # Wait for service
      
      - name: Run Pub/Sub Tests
        run: |
          newman run collections/pubsub-tests.json \\
            --environment environments/staging.json \\
            --reporters cli, htmlextra, junit \\
            --reporter-htmlextra-export reports/pubsub-report.html \\
            --reporter-junit-export reports/pubsub-junit.xml \\
            --iteration-count 5 \\
            --delay-request 2000
      
      - name: Run Schema Validation Tests
        run: |
          newman run collections/schema-validation.json \\
            --environment environments/staging.json \\
            --reporters cli, junit \\
            --reporter-junit-export reports/schema-junit.xml
      
      - name: Upload Reports
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: test-reports
          path: reports/
      
      - name: Notify on Failure
        if: failure()
        uses: slackapi/slack-github-action@v1
        with:
          payload: |
            {"text": "Pub/Sub tests FAILED on }"}
"""

    NEWMAN_SCRIPT = """
# run_pubsub_tests.sh — Newman test runner
#!/bin/bash
set -e

echo "=== Running Pub/Sub Tests ==="

# 1. Publish events
newman run collections/publish-events.json \\
  --environment environments/staging.json \\
  --iteration-count 10 \\
  --delay-request 500

echo "Waiting for message propagation..."
sleep 5

# 2. Verify subscribers received
newman run collections/verify-subscribers.json \\
  --environment environments/staging.json \\
  --reporters cli, htmlextra \\
  --reporter-htmlextra-export reports/subscriber-report.html

# 3. Validate dead letter queue
newman run collections/dlq-check.json \\
  --environment environments/staging.json

echo "=== All Pub/Sub Tests Passed ==="
"""

    def show_github(self):
        print("=== GitHub Actions ===")
        print(self.GITHUB_ACTIONS[:600])

    def show_script(self):
        print("\n=== Test Runner Script ===")
        print(self.NEWMAN_SCRIPT[:400])

cicd = NewmanCICD()
cicd.show_github()
cicd.show_script()

Python Pub/Sub Test Framework

# pubsub_test.py — Python test framework for Pub/Sub
import json

class PubSubTestFramework:
    CODE = """
# pubsub_tester.py — Test Pub/Sub message flow
import json
import time
import redis
import requests
from datetime import datetime

class PubSubTester:
    def __init__(self, api_url, redis_url="redis://localhost:6379"):
        self.api_url = api_url
        self.redis = redis.from_url(redis_url)
    
    def publish_event(self, topic, payload):
        '''Publish event via API'''
        resp = requests.post(f"{self.api_url}/events/publish", json={
            "topic": topic,
            "message": payload,
            "timestamp": datetime.utcnow().isoformat(),
        })
        return resp.json()
    
    def wait_for_message(self, subscription, timeout=10):
        '''Wait for message on subscription'''
        start = time.time()
        while time.time() - start < timeout:
            msg = self.redis.lpop(f"sub:{subscription}")
            if msg:
                return json.loads(msg)
            time.sleep(0.5)
        return None
    
    def test_publish_subscribe(self, topic, subscription, payload):
        '''Test full pub/sub flow'''
        # Publish
        pub_result = self.publish_event(topic, payload)
        assert 'message_id' in pub_result, "Publish failed"
        
        # Wait for subscriber
        received = self.wait_for_message(subscription, timeout=10)
        assert received is not None, f"Message not received within timeout"
        assert received.get('topic') == topic, f"Wrong topic: {received.get('topic')}"
        
        return {
            'status': 'pass',
            'message_id': pub_result['message_id'],
            'latency_ms': (time.time() - float(received.get('published_at', 0))) * 1000,
        }
    
    def test_message_ordering(self, topic, count=10):
        '''Test message ordering'''
        sent_ids = []
        for i in range(count):
            result = self.publish_event(topic, {"sequence": i})
            sent_ids.append(result['message_id'])
        
        time.sleep(5)
        
        received_ids = []
        for _ in range(count):
            msg = self.wait_for_message(f"{topic}-ordered", timeout=5)
            if msg:
                received_ids.append(msg['message_id'])
        
        ordered = sent_ids == received_ids
        return {
            'status': 'pass' if ordered else 'fail',
            'sent': len(sent_ids),
            'received': len(received_ids),
            'in_order': ordered,
        }
    
    def test_dead_letter_queue(self, topic):
        '''Test DLQ handling'''
        # Publish invalid message
        result = self.publish_event(topic, {"invalid": True, "__force_error": True})
        
        time.sleep(5)
        
        dlq_msg = self.wait_for_message(f"dlq:{topic}", timeout=10)
        return {
            'status': 'pass' if dlq_msg else 'fail',
            'dlq_message': dlq_msg,
        }

# tester = PubSubTester("http://localhost:8080/api")
# result = tester.test_publish_subscribe("orders.created", "order-processor", {"order_id": "TEST-001"})
"""

    def show_code(self):
        print("=== Pub/Sub Test Framework ===")
        print(self.CODE[:600])

framework = PubSubTestFramework()
framework.show_code()

Monitoring & Reporting

# monitoring.py — Pub/Sub test monitoring
import json
import random

class PubSubMonitoring:
    METRICS = {
        "publish_latency": "เวลาที่ใช้ publish message (P50, P95, P99)",
        "subscribe_latency": "เวลาตั้งแต่ publish ถึง subscriber ได้รับ",
        "message_throughput": "จำนวน messages ต่อวินาที",
        "delivery_rate": "% messages ที่ deliver สำเร็จ",
        "dlq_rate": "% messages ที่เข้า Dead Letter Queue",
        "test_pass_rate": "% Newman test cases ที่ผ่าน",
    }

    NEWMAN_REPORT = """
# Newman HTML Report Configuration
newman run collection.json \\
  --reporters cli, htmlextra, junit \\
  --reporter-htmlextra-export report.html \\
  --reporter-htmlextra-title "Pub/Sub Test Report" \\
  --reporter-htmlextra-showOnlyFails \\
  --reporter-htmlextra-browserTitle "Pub/Sub Tests" \\
  --reporter-junit-export junit-report.xml
"""

    def show_metrics(self):
        print("=== Key Metrics ===\n")
        for metric, desc in self.METRICS.items():
            print(f"  [{metric}] {desc}")

    def show_report_config(self):
        print(f"\n=== Newman Report ===")
        print(self.NEWMAN_REPORT)

    def sample_results(self):
        print(f"\n=== Test Results ===")
        print(f"  Total Tests: {random.randint(20, 50)}")
        print(f"  Passed: {random.randint(18, 48)}")
        print(f"  Failed: {random.randint(0, 3)}")
        print(f"  Publish Latency P95: {random.randint(5, 50)}ms")
        print(f"  Subscribe Latency P95: {random.randint(20, 200)}ms")
        print(f"  Delivery Rate: {random.uniform(99.0, 100):.2f}%")
        print(f"  DLQ Messages: {random.randint(0, 5)}")

mon = PubSubMonitoring()
mon.show_metrics()
mon.show_report_config()
mon.sample_results()

FAQ - คำถามที่พบบ่อย

Q: Newman ทดสอบ Pub/Sub ได้จริงหรือ?

A: ได้ — แต่ทดสอบผ่าน HTTP API ไม่ใช่ subscribe โดยตรง วิธี: Publish ผ่าน API → Wait → Check subscriber state ผ่าน API ข้อจำกัด: ไม่สามารถ subscribe real-time ได้ — ต้องใช้ polling หรือ webhook เสริม: ใช้ Python/Node test framework สำหรับ real-time subscribe testing

Q: Kafka กับ Google Pub/Sub อันไหนดีกว่า?

A: Kafka: self-hosted, high throughput, message replay, ordering guarantee, complex setup Google Pub/Sub: managed service, auto-scale, no ops, pay-per-use, simpler เลือก Kafka: ถ้าต้องการ control เต็ม, message replay, high volume (millions/sec) เลือก Pub/Sub: ถ้าอยู่บน GCP, ต้องการ managed service, ทีมเล็ก

Q: Message ordering สำคัญไหม?

A: ขึ้นกับ use case: สำคัญ: financial transactions, event sourcing, state machine transitions ไม่สำคัญ: notifications, analytics events, batch processing Kafka: ordering guarantee per partition Google Pub/Sub: ordering ด้วย ordering key RabbitMQ: FIFO per queue Test: ใช้ sequence numbers ตรวจสอบ ordering ใน Newman/Python tests

Q: Dead Letter Queue จำเป็นไหม?

A: จำเป็นมากสำหรับ production — messages ที่ process ไม่สำเร็จต้องมีที่ไป ไม่มี DLQ: messages หายไป หรือ retry loop ไม่สิ้นสุด → block queue มี DLQ: failed messages ไป DLQ → investigate, fix, replay ได้ ทุก Pub/Sub system ควรมี DLQ + monitoring + alerting เมื่อ DLQ มี messages

📖 บทความที่เกี่ยวข้อง

Postman Newman GreenOps Sustainabilityอ่านบทความ → Postman Newman Technical Debt Managementอ่านบทความ → Postman Newman DevOps Cultureอ่านบทความ → Postman Newman Shift Left Securityอ่านบทความ → Solid.js Signals Pub Sub Architectureอ่านบทความ →

📚 ดูบทความทั้งหมด →