SiamCafe.net Blog
Technology

Postman Newman Scaling Strategy: How to Scale

2026-03-25 · อ. บอม — SiamCafe.net · 1,693 words

What Is a Postman Newman Scaling Strategy?

Postman is a popular tool for API testing and development. Newman is Postman's command-line collection runner: it runs collections from the CLI without opening the Postman GUI, which makes it a natural fit for CI/CD pipelines. Scaling Newman means designing your setup so that it can run large numbers of API tests efficiently, covering parallel execution, distributed testing, load testing, and test orchestration. This article walks through strategies for scaling Postman/Newman testing, from a single runner all the way up to a distributed testing infrastructure.

Newman Basics & CLI

# newman_basics.py — Newman fundamentals
import json

class NewmanBasics:
    COMMANDS = {
        "basic_run": {
            "name": "Basic Collection Run",
            "command": "newman run collection.json",
            "description": "รัน Postman collection จากไฟล์ JSON",
        },
        "with_environment": {
            "name": "Run with Environment",
            "command": "newman run collection.json -e staging.json",
            "description": "รัน collection พร้อม environment variables",
        },
        "with_data": {
            "name": "Data-driven Testing",
            "command": "newman run collection.json -d test_data.csv --iteration-count 100",
            "description": "รัน collection กับ data file หลาย iterations",
        },
        "reporters": {
            "name": "Custom Reporters",
            "command": "newman run collection.json -r cli, htmlextra, junit --reporter-htmlextra-export report.html",
            "description": "Export results ในหลายรูปแบบ — HTML, JUnit XML, JSON",
        },
        "timeout": {
            "name": "Timeout & Delay",
            "command": "newman run collection.json --timeout-request 10000 --delay-request 500",
            "description": "ตั้ง timeout per request + delay ระหว่าง requests",
        },
    }

    INSTALL = """
# Install Newman globally
npm install -g newman

# Install reporters
npm install -g newman-reporter-htmlextra
npm install -g newman-reporter-junitfull

# Export collection from Postman
# Postman → Collection → Export → Collection v2.1

# Export environment
# Postman → Environment → Export
"""

    def show_commands(self):
        print("=== Newman Commands ===\n")
        for key, cmd in self.COMMANDS.items():
            print(f"[{cmd['name']}]")
            print(f"  $ {cmd['command']}")
            print(f"  {cmd['description']}")
            print()

basics = NewmanBasics()
basics.show_commands()
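One detail worth automating: Newman expects the reporter list as a single comma-separated token with no spaces (`-r cli,junit`, not `-r cli, junit`). A minimal sketch of a command builder that gets this right — the file names and options here are illustrative, not fixed conventions:

```python
# build_newman_cmd.py — assemble a `newman run` argv list from options
# (a minimal sketch; the file names below are examples)

def build_newman_cmd(collection, environment=None, data=None,
                     iterations=None, reporters=("cli",)):
    """Return the argv list for a `newman run` invocation."""
    cmd = ["newman", "run", collection]
    if environment:
        cmd += ["-e", environment]
    if data:
        cmd += ["-d", data]
    if iterations:
        cmd += ["--iteration-count", str(iterations)]
    # Newman wants reporters as one comma-separated token, no spaces
    cmd += ["-r", ",".join(reporters)]
    return cmd

cmd = build_newman_cmd("collection.json", environment="staging.json",
                       data="test_data.csv", iterations=100,
                       reporters=("cli", "junit"))
print(" ".join(cmd))
```

Building the argv as a list (rather than one shell string) also makes it safe to hand straight to `subprocess.run` without shell quoting issues.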

Parallel Execution Strategy

# parallel.py — Parallel Newman execution
import json

class ParallelExecution:
    CODE = """
# parallel_newman.py — Run Newman collections in parallel
import subprocess
import json
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path

class ParallelNewmanRunner:
    def __init__(self, max_workers=4, environment=None):
        self.max_workers = max_workers
        self.environment = environment
        self.results = []
    
    def run_collection(self, collection_path, env_override=None):
        '''Run a single Newman collection'''
        cmd = ["newman", "run", collection_path]
        
        env = env_override or self.environment
        if env:
            cmd.extend(["-e", env])
        
        cmd.extend([
            "-r", "json, cli",
            "--reporter-json-export", f"results/{Path(collection_path).stem}.json",
            "--timeout-request", "30000",
            "--suppress-exit-code",
        ])
        
        start = time.time()
        result = subprocess.run(cmd, capture_output=True, text=True)
        duration = time.time() - start
        
        return {
            "collection": collection_path,
            "exit_code": result.returncode,
            "duration_sec": round(duration, 2),
            "stdout": result.stdout[-500:],  # Last 500 chars
            "stderr": result.stderr[-200:] if result.stderr else None,
        }
    
    def run_parallel(self, collections):
        '''Run multiple collections in parallel'''
        os.makedirs("results", exist_ok=True)
        
        print(f"Running {len(collections)} collections with {self.max_workers} workers...")
        start_total = time.time()
        
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {
                executor.submit(self.run_collection, col): col
                for col in collections
            }
            
            for future in as_completed(futures):
                result = future.result()
                self.results.append(result)
                status = "PASS" if result["exit_code"] == 0 else "FAIL"
                print(f"  [{status}] {result['collection']} ({result['duration_sec']}s)")
        
        total_time = time.time() - start_total
        passed = sum(1 for r in self.results if r["exit_code"] == 0)
        
        return {
            "total": len(self.results),
            "passed": passed,
            "failed": len(self.results) - passed,
            "total_time_sec": round(total_time, 2),
            "results": self.results,
        }
    
    def split_collection(self, collection_path, chunks=4):
        '''Split a large collection into smaller chunks'''
        with open(collection_path) as f:
            collection = json.load(f)
        
        items = collection.get("item", [])
        chunk_size = -(-len(items) // chunks)  # ceil division, so we get at most `chunks` files
        
        split_files = []
        for i in range(0, len(items), chunk_size):
            chunk = collection.copy()
            chunk["item"] = items[i:i+chunk_size]
            chunk["info"]["name"] = f"{collection['info']['name']}_chunk_{i//chunk_size}"
            
            filename = f"chunks/chunk_{i//chunk_size}.json"
            os.makedirs("chunks", exist_ok=True)
            with open(filename, 'w') as f:
                json.dump(chunk, f)
            split_files.append(filename)
        
        return split_files

# Usage
# runner = ParallelNewmanRunner(max_workers=4, environment="staging.json")
# collections = ["auth_tests.json", "user_tests.json", "payment_tests.json", "search_tests.json"]
# results = runner.run_parallel(collections)
"""

    def show_code(self):
        print("=== Parallel Newman ===")
        print(self.CODE[:600])

parallel = ParallelExecution()
parallel.show_code()
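Once the parallel runs finish, the per-collection JSON reports need to be rolled up into one summary. A minimal sketch of such a merger, assuming the shape of Newman's JSON reporter output (`run.stats.assertions.total` / `.failed`); in practice you would load each dict from the files under `results/`:

```python
# merge_results.py — aggregate stats from Newman JSON reporter outputs
# (sketch assuming the reporter schema: run.stats.assertions.{total,failed})

def merge_newman_reports(reports):
    """Combine per-collection assertion stats into one summary dict."""
    total = failed = 0
    for rep in reports:
        stats = rep.get("run", {}).get("stats", {}).get("assertions", {})
        total += stats.get("total", 0)
        failed += stats.get("failed", 0)
    passed = total - failed
    return {
        "assertions": total,
        "passed": passed,
        "failed": failed,
        "pass_rate": round(passed / total * 100, 1) if total else 0.0,
    }

# Two hand-written reports standing in for parsed results/*.json files
reports = [
    {"run": {"stats": {"assertions": {"total": 40, "failed": 2}}}},
    {"run": {"stats": {"assertions": {"total": 25, "failed": 0}}}},
]
print(merge_newman_reports(reports))
# {'assertions': 65, 'passed': 63, 'failed': 2, 'pass_rate': 96.9}
```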

CI/CD Integration

# cicd.py — CI/CD pipeline with Newman
import json

class CICDIntegration:
    GITHUB_ACTIONS = """
# .github/workflows/api-tests.yml
name: API Tests (Newman)
on:
  push:
    branches: [main, develop]
  pull_request:
  schedule:
    - cron: '0 */6 * * *'  # Every 6 hours

jobs:
  api-tests:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        collection:
          - auth_tests
          - user_api_tests
          - payment_tests
          - search_tests
      fail-fast: false
    
    steps:
      - uses: actions/checkout@v4
      
      - uses: actions/setup-node@v4
        with:
          node-version: '20'
      
      - name: Install Newman
        run: |
          npm install -g newman
          npm install -g newman-reporter-htmlextra
          npm install -g newman-reporter-junitfull
      
      - name: Run API Tests
        run: |
          newman run collections/${{ matrix.collection }}.json \\
            -e environments/${{ matrix.collection }}.json \\
            -r cli,htmlextra,junitfull \\
            --reporter-htmlextra-export reports/${{ matrix.collection }}.html \\
            --reporter-junitfull-export reports/${{ matrix.collection }}.xml \\
            --timeout-request 30000 \\
            --iteration-count 1
      
      - name: Upload Report
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: test-report-${{ matrix.collection }}
          path: reports/
      
      - name: Publish Test Results
        if: always()
        uses: dorny/test-reporter@v1
        with:
          name: Newman ${{ matrix.collection }}
          path: reports/*.xml
          reporter: java-junit

  notify:
    needs: api-tests
    if: failure()
    runs-on: ubuntu-latest
    steps:
      - name: Notify Slack
        uses: 8398a7/action-slack@v3
        with:
          status: failure
          text: "API Tests Failed!"
        env:
          SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
"""

    def show_pipeline(self):
        print("=== GitHub Actions Pipeline ===")
        print(self.GITHUB_ACTIONS[:600])

cicd = CICDIntegration()
cicd.show_pipeline()
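The matrix above lists collections by hand; once there are dozens of them, it can help to shard the collection files programmatically and let each CI job run its slice. A hypothetical round-robin sharder (the file names and shard count are examples):

```python
# shard_collections.py — split collection files across N CI jobs
# (hypothetical helper; num_shards would come from the CI matrix size)

def shard(collections, num_shards, shard_index):
    """Return the collections assigned to one CI job (round-robin)."""
    return [c for i, c in enumerate(collections) if i % num_shards == shard_index]

files = ["auth.json", "user.json", "payment.json", "search.json", "admin.json"]
for idx in range(2):
    print(idx, shard(files, 2, idx))
```

Round-robin keeps shard sizes within one collection of each other; if run times vary a lot per collection, sorting by historical duration before sharding balances wall-clock time better.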

Distributed Testing

# distributed.py — Distributed Newman testing
import json
import random

class DistributedTesting:
    CODE = """
# distributed_newman.py — Distributed test execution
import docker
import json
import os
import time
from concurrent.futures import ThreadPoolExecutor

class DistributedNewmanRunner:
    def __init__(self, docker_image="postman/newman:latest"):
        self.client = docker.from_env()
        self.image = docker_image
        self.results = []
    
    def run_in_container(self, collection_path, environment_path=None):
        '''Run Newman in a Docker container'''
        volumes = {
            os.path.abspath("collections"): {"bind": "/etc/newman/collections", "mode": "ro"},
            os.path.abspath("environments"): {"bind": "/etc/newman/environments", "mode": "ro"},
            os.path.abspath("results"): {"bind": "/etc/newman/results", "mode": "rw"},
        }
        
        cmd = f"run /etc/newman/collections/{collection_path}"
        if environment_path:
            cmd += f" -e /etc/newman/environments/{environment_path}"
        cmd += " -r json --reporter-json-export /etc/newman/results/result.json"
        
        container = self.client.containers.run(
            self.image,
            command=cmd,
            volumes=volumes,
            detach=True,
            mem_limit="512m",
            cpu_period=100000,
            cpu_quota=50000,  # 50% CPU
        )
        
        result = container.wait()
        logs = container.logs().decode()
        container.remove()
        
        return {
            "collection": collection_path,
            "exit_code": result["StatusCode"],
            "logs": logs[-500:],
        }
    
    def run_distributed(self, collections, max_containers=8):
        '''Run collections across multiple containers'''
        with ThreadPoolExecutor(max_workers=max_containers) as executor:
            futures = [
                executor.submit(self.run_in_container, col)
                for col in collections
            ]
            
            for future in futures:
                self.results.append(future.result())
        
        return self.results

# Kubernetes-based distributed testing
K8S_JOB = '''
apiVersion: batch/v1
kind: Job
metadata:
  name: newman-test-{collection}
spec:
  parallelism: 1
  template:
    spec:
      containers:
        - name: newman
          image: postman/newman:latest
          command: ["newman", "run", "/collections/{collection}.json",
                    "-e", "/environments/staging.json",
                    "-r", "json", "--reporter-json-export", "/results/result.json"]
          volumeMounts:
            - name: collections
              mountPath: /collections
            - name: results
              mountPath: /results
      volumes:
        - name: collections
          configMap:
            name: newman-collections
        - name: results
          emptyDir: {}
      restartPolicy: Never
'''
"""

    def show_code(self):
        print("=== Distributed Testing ===")
        print(self.CODE[:600])

    def scaling_tiers(self):
        print(f"\n=== Scaling Tiers ===")
        tiers = [
            {"tier": "Single", "tests": "< 50", "time": "5-10 min", "infra": "1 Newman process"},
            {"tier": "Parallel", "tests": "50-200", "time": "3-5 min", "infra": "4-8 parallel threads"},
            {"tier": "Docker", "tests": "200-1000", "time": "5-10 min", "infra": "8-16 containers"},
            {"tier": "Kubernetes", "tests": "1000+", "time": "5-15 min", "infra": "K8s Jobs, auto-scale"},
        ]
        for t in tiers:
            print(f"  [{t['tier']:<12}] {t['tests']:<10} tests → {t['time']:<10} ({t['infra']})")

dist = DistributedTesting()
dist.show_code()
dist.scaling_tiers()
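The `K8S_JOB` template in the snippet above takes a `{collection}` placeholder, so a small renderer can stamp out one Job manifest per collection. A sketch with an abbreviated template — the lowercasing and underscore replacement are there because Kubernetes resource names must be DNS-1123 labels:

```python
# render_jobs.py — render one Kubernetes Job manifest per collection
# (sketch; the template is abbreviated from the K8S_JOB example above)

K8S_JOB = """apiVersion: batch/v1
kind: Job
metadata:
  name: newman-test-{collection}
"""

def render_jobs(collections):
    # K8s names must be lowercase DNS-1123 labels; normalise underscores
    return [K8S_JOB.format(collection=c.replace("_", "-").lower())
            for c in collections]

manifests = render_jobs(["auth_tests", "payment_tests"])
print(manifests[0])
```

Each rendered manifest can then be applied with `kubectl apply -f -` or via the Kubernetes Python client.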

Monitoring & Reporting

# monitoring.py — Test monitoring and reporting
import json
import random

class TestMonitoring:
    DASHBOARD = """
# test_dashboard.py — Newman test monitoring dashboard
from flask import Flask, jsonify, render_template
import json
import glob
from datetime import datetime

app = Flask(__name__)

@app.route("/api/results")
def get_results():
    results = []
    for f in glob.glob("results/*.json"):
        with open(f) as fh:
            data = json.load(fh)
            results.append({
                "collection": data.get("collection", {}).get("info", {}).get("name"),
                "total": data.get("run", {}).get("stats", {}).get("assertions", {}).get("total", 0),
                "passed": data.get("run", {}).get("stats", {}).get("assertions", {}).get("total", 0) -
                          data.get("run", {}).get("stats", {}).get("assertions", {}).get("failed", 0),
                "failed": data.get("run", {}).get("stats", {}).get("assertions", {}).get("failed", 0),
                "duration_ms": data.get("run", {}).get("timings", {}).get("completed", 0),
            })
    return jsonify(results)

@app.route("/api/trends")
def get_trends():
    # Aggregate test results over time
    return jsonify({"message": "Historical trends"})
"""

    def show_dashboard(self):
        print("=== Test Dashboard ===")
        print(self.DASHBOARD[:500])

    def sample_dashboard(self):
        print(f"\n=== Test Results Dashboard ===")
        collections = [
            {"name": "Auth API", "total": random.randint(20, 50), "passed": 0, "time": random.uniform(5, 30)},
            {"name": "User API", "total": random.randint(30, 80), "passed": 0, "time": random.uniform(10, 60)},
            {"name": "Payment API", "total": random.randint(40, 100), "passed": 0, "time": random.uniform(15, 90)},
            {"name": "Search API", "total": random.randint(15, 40), "passed": 0, "time": random.uniform(3, 20)},
        ]
        for c in collections:
            c["passed"] = c["total"] - random.randint(0, 3)
            c["failed"] = c["total"] - c["passed"]
            status = "PASS" if c["failed"] == 0 else "FAIL"
            print(f"  [{status}] {c['name']:<15} {c['passed']}/{c['total']} tests ({c['time']:.1f}s)")
        
        total = sum(c["total"] for c in collections)
        passed = sum(c["passed"] for c in collections)
        print(f"\n  Total: {passed}/{total} ({passed/total*100:.0f}% pass rate)")

mon = TestMonitoring()
mon.show_dashboard()
mon.sample_dashboard()
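Beyond a single run, keeping a short pass/fail history per collection makes flaky tests easy to spot: anything that both passes and fails across recent runs deserves attention. A minimal sketch — the history format here is an assumption, not Newman output:

```python
# flaky_detector.py — flag collections that fail intermittently across runs
# (hypothetical helper; history would be built from stored run summaries)

def find_flaky(history):
    """history: {collection_name: [bool pass/fail per run]}.
    Flaky = at least one pass AND at least one failure."""
    return sorted(name for name, runs in history.items()
                  if any(runs) and not all(runs))

history = {
    "auth_tests": [True, True, True],      # stable pass
    "payment_tests": [True, False, True],  # intermittent → flaky
    "search_tests": [False, False, False], # stable fail, not flaky
}
print(find_flaky(history))  # ['payment_tests']
```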

FAQ - Frequently Asked Questions

Q: How do Newman and Postman Cloud Runs differ?

A: Newman is a free, self-hosted CLI tool that integrates easily with CI/CD and gives you full control. Postman Cloud is a paid managed service with GUI-based scheduling, which makes it easier to operate. Recommendation: use Newman for CI/CD and development, and Postman Cloud for non-technical teams that need scheduled tests.

Q: How many parallel Newman workers should I run?

A: It depends on the target API's capacity. Development: 2-4 workers (don't overload the dev server). Staging: 4-8 workers. Production (read-only tests): 2-4 workers (watch out for rate limiting). Rule of thumb: start small, increase until you hit a bottleneck, then settle on the sweet spot. Monitor the API's response time and error rate while tests are running.
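That "start small, increase until you hit a bottleneck" loop can be sketched as a function that stops scaling up once the marginal speedup drops below a threshold. The 10% cutoff and the measured durations below are illustrative, not a rule:

```python
# sweet_spot.py — pick a worker count from measured total run durations
# (illustrative; the 10% improvement threshold is an assumption)

def pick_workers(durations, min_gain=0.10):
    """durations: {worker_count: total_seconds}. Stop scaling up once the
    next step improves total time by less than min_gain (a fraction)."""
    best = None
    for workers in sorted(durations):
        if best is not None:
            gain = (durations[best] - durations[workers]) / durations[best]
            if gain < min_gain:
                return best  # diminishing returns: keep the previous count
        best = workers
    return best

# Hypothetical timings from trial runs at 2, 4, 8, and 16 workers
measured = {2: 300, 4: 170, 8: 160, 16: 158}
print(pick_workers(measured))  # 4
```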

Q: How should I manage test data?

A: CSV/JSON data files for data-driven testing. Environment variables for API keys, base URLs, and tokens. Pre-request scripts to generate dynamic data (timestamps, UUIDs). Postman collection variables for data shared across requests. Setup/teardown requests to create test data before the run and delete it afterwards.
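For the data-driven case, the CSV passed to `newman run -d` can be generated in a setup step. A sketch, assuming the collection references `username`, `email`, and `user_id` variables (the column names must match whatever your collection actually uses):

```python
# gen_test_data.py — generate a CSV data file for `newman run -d`
# (sketch; column names are assumptions that must match the collection)
import csv
import io
import uuid

def make_csv(rows):
    """Return CSV text with one data row per Newman iteration."""
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=["username", "email", "user_id"])
    writer.writeheader()
    for i in range(rows):
        writer.writerow({
            "username": f"user_{i}",
            "email": f"user_{i}@example.com",
            "user_id": str(uuid.uuid4()),  # fresh ID per row
        })
    return buf.getvalue()

data = make_csv(3)
print(data.splitlines()[0])  # username,email,user_id
```

Writing the result to a file and passing it with `-d file.csv --iteration-count 3` gives Newman one iteration per row.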

Q: Is Newman suitable for load testing?

A: No. Newman is designed for functional testing, not load testing. For load testing, use k6, Artillery, Locust, or JMeter. What Newman does well: smoke testing (verifying the API works at all) and data-driven testing across multiple datasets. A common combination is Newman for functional tests plus k6 for performance tests.
