Postman Newman Scaling Strategy วิธี Scale คืออะไร
Postman เป็นเครื่องมือยอดนิยมสำหรับ API testing และ development Newman เป็น command-line collection runner ของ Postman ที่รัน collections จาก CLI ได้โดยไม่ต้องเปิด Postman GUI เหมาะสำหรับ CI/CD pipelines การ Scale Newman คือการออกแบบระบบให้รัน API tests จำนวนมากได้อย่างมีประสิทธิภาพ ทั้ง parallel execution, distributed testing, load testing และ test orchestration บทความนี้อธิบาย strategies สำหรับ scale Postman/Newman testing ตั้งแต่ single runner จนถึง distributed testing infrastructure
Newman Basics & CLI
# newman_basics.py — Newman fundamentals
import json
class NewmanBasics:
COMMANDS = {
"basic_run": {
"name": "Basic Collection Run",
"command": "newman run collection.json",
"description": "รัน Postman collection จากไฟล์ JSON",
},
"with_environment": {
"name": "Run with Environment",
"command": "newman run collection.json -e staging.json",
"description": "รัน collection พร้อม environment variables",
},
"with_data": {
"name": "Data-driven Testing",
"command": "newman run collection.json -d test_data.csv --iteration-count 100",
"description": "รัน collection กับ data file หลาย iterations",
},
"reporters": {
"name": "Custom Reporters",
"command": "newman run collection.json -r cli, htmlextra, junit --reporter-htmlextra-export report.html",
"description": "Export results ในหลายรูปแบบ — HTML, JUnit XML, JSON",
},
"timeout": {
"name": "Timeout & Delay",
"command": "newman run collection.json --timeout-request 10000 --delay-request 500",
"description": "ตั้ง timeout per request + delay ระหว่าง requests",
},
}
INSTALL = """
# Install Newman globally
npm install -g newman
# Install reporters
npm install -g newman-reporter-htmlextra
npm install -g newman-reporter-junitfull
# Export collection from Postman
# Postman → Collection → Export → Collection v2.1
# Export environment
# Postman → Environment → Export
"""
def show_commands(self):
print("=== Newman Commands ===\n")
for key, cmd in self.COMMANDS.items():
print(f"[{cmd['name']}]")
print(f" $ {cmd['command']}")
print(f" {cmd['description']}")
print()
basics = NewmanBasics()
basics.show_commands()
Parallel Execution Strategy
# parallel.py — Parallel Newman execution
import json
class ParallelExecution:
CODE = """
# parallel_newman.py — Run Newman collections in parallel
import subprocess
import json
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
class ParallelNewmanRunner:
def __init__(self, max_workers=4, environment=None):
self.max_workers = max_workers
self.environment = environment
self.results = []
def run_collection(self, collection_path, env_override=None):
'''Run a single Newman collection'''
cmd = ["newman", "run", collection_path]
env = env_override or self.environment
if env:
cmd.extend(["-e", env])
cmd.extend([
"-r", "json, cli",
"--reporter-json-export", f"results/{Path(collection_path).stem}.json",
"--timeout-request", "30000",
"--suppress-exit-code",
])
start = time.time()
result = subprocess.run(cmd, capture_output=True, text=True)
duration = time.time() - start
return {
"collection": collection_path,
"exit_code": result.returncode,
"duration_sec": round(duration, 2),
"stdout": result.stdout[-500:], # Last 500 chars
"stderr": result.stderr[-200:] if result.stderr else None,
}
def run_parallel(self, collections):
'''Run multiple collections in parallel'''
os.makedirs("results", exist_ok=True)
print(f"Running {len(collections)} collections with {self.max_workers} workers...")
start_total = time.time()
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = {
executor.submit(self.run_collection, col): col
for col in collections
}
for future in as_completed(futures):
result = future.result()
self.results.append(result)
status = "PASS" if result["exit_code"] == 0 else "FAIL"
print(f" [{status}] {result['collection']} ({result['duration_sec']}s)")
total_time = time.time() - start_total
passed = sum(1 for r in self.results if r["exit_code"] == 0)
return {
"total": len(self.results),
"passed": passed,
"failed": len(self.results) - passed,
"total_time_sec": round(total_time, 2),
"results": self.results,
}
def split_collection(self, collection_path, chunks=4):
'''Split a large collection into smaller chunks'''
with open(collection_path) as f:
collection = json.load(f)
items = collection.get("item", [])
chunk_size = len(items) // chunks + 1
split_files = []
for i in range(0, len(items), chunk_size):
chunk = collection.copy()
chunk["item"] = items[i:i+chunk_size]
chunk["info"]["name"] = f"{collection['info']['name']}_chunk_{i//chunk_size}"
filename = f"chunks/chunk_{i//chunk_size}.json"
os.makedirs("chunks", exist_ok=True)
with open(filename, 'w') as f:
json.dump(chunk, f)
split_files.append(filename)
return split_files
# Usage
# runner = ParallelNewmanRunner(max_workers=4, environment="staging.json")
# collections = ["auth_tests.json", "user_tests.json", "payment_tests.json", "search_tests.json"]
# results = runner.run_parallel(collections)
"""
def show_code(self):
print("=== Parallel Newman ===")
print(self.CODE[:600])
parallel = ParallelExecution()
parallel.show_code()
CI/CD Integration
# cicd.py — CI/CD pipeline with Newman
import json
class CICDIntegration:
GITHUB_ACTIONS = """
# .github/workflows/api-tests.yml
name: API Tests (Newman)
on:
push:
branches: [main, develop]
pull_request:
schedule:
- cron: '0 */6 * * *' # Every 6 hours
jobs:
api-tests:
runs-on: ubuntu-latest
strategy:
matrix:
collection:
- auth_tests
- user_api_tests
- payment_tests
- search_tests
fail-fast: false
steps:
- uses: actions/checkout@v4
- uses: actions/setup-node@v4
with:
node-version: '20'
- name: Install Newman
run: |
npm install -g newman
npm install -g newman-reporter-htmlextra
npm install -g newman-reporter-junitfull
- name: Run API Tests
run: |
newman run collections/}.json \\
-e environments/}.json \\
-r cli, htmlextra, junitfull \\
--reporter-htmlextra-export reports/}.html \\
--reporter-junitfull-export reports/}.xml \\
--timeout-request 30000 \\
--iteration-count 1
- name: Upload Report
if: always()
uses: actions/upload-artifact@v4
with:
name: test-report-}
path: reports/
- name: Publish Test Results
if: always()
uses: dorny/test-reporter@v1
with:
name: Newman }
path: reports/*.xml
reporter: java-junit
notify:
needs: api-tests
if: failure()
runs-on: ubuntu-latest
steps:
- name: Notify Slack
uses: 8398a7/action-slack@v3
with:
status: failure
text: "API Tests Failed!"
env:
SLACK_WEBHOOK_URL: }
"""
def show_pipeline(self):
print("=== GitHub Actions Pipeline ===")
print(self.GITHUB_ACTIONS[:600])
cicd = CICDIntegration()
cicd.show_pipeline()
Distributed Testing
# distributed.py — Distributed Newman testing
import json
import random
class DistributedTesting:
CODE = """
# distributed_newman.py — Distributed test execution
import docker
import json
import time
from concurrent.futures import ThreadPoolExecutor
class DistributedNewmanRunner:
def __init__(self, docker_image="postman/newman:latest"):
self.client = docker.from_env()
self.image = docker_image
self.results = []
def run_in_container(self, collection_path, environment_path=None):
'''Run Newman in a Docker container'''
volumes = {
os.path.abspath("collections"): {"bind": "/etc/newman/collections", "mode": "ro"},
os.path.abspath("environments"): {"bind": "/etc/newman/environments", "mode": "ro"},
os.path.abspath("results"): {"bind": "/etc/newman/results", "mode": "rw"},
}
cmd = f"run /etc/newman/collections/{collection_path}"
if environment_path:
cmd += f" -e /etc/newman/environments/{environment_path}"
cmd += " -r json --reporter-json-export /etc/newman/results/result.json"
container = self.client.containers.run(
self.image,
command=cmd,
volumes=volumes,
detach=True,
mem_limit="512m",
cpu_period=100000,
cpu_quota=50000, # 50% CPU
)
result = container.wait()
logs = container.logs().decode()
container.remove()
return {
"collection": collection_path,
"exit_code": result["StatusCode"],
"logs": logs[-500:],
}
def run_distributed(self, collections, max_containers=8):
'''Run collections across multiple containers'''
with ThreadPoolExecutor(max_workers=max_containers) as executor:
futures = [
executor.submit(self.run_in_container, col)
for col in collections
]
for future in futures:
self.results.append(future.result())
return self.results
# Kubernetes-based distributed testing
K8S_JOB = '''
apiVersion: batch/v1
kind: Job
metadata:
name: newman-test-{collection}
spec:
parallelism: 1
template:
spec:
containers:
- name: newman
image: postman/newman:latest
command: ["newman", "run", "/collections/{collection}.json",
"-e", "/environments/staging.json",
"-r", "json", "--reporter-json-export", "/results/result.json"]
volumeMounts:
- name: collections
mountPath: /collections
- name: results
mountPath: /results
volumes:
- name: collections
configMap:
name: newman-collections
- name: results
emptyDir: {}
restartPolicy: Never
'''
"""
def show_code(self):
print("=== Distributed Testing ===")
print(self.CODE[:600])
def scaling_tiers(self):
print(f"\n=== Scaling Tiers ===")
tiers = [
{"tier": "Single", "tests": "< 50", "time": "5-10 min", "infra": "1 Newman process"},
{"tier": "Parallel", "tests": "50-200", "time": "3-5 min", "infra": "4-8 parallel threads"},
{"tier": "Docker", "tests": "200-1000", "time": "5-10 min", "infra": "8-16 containers"},
{"tier": "Kubernetes", "tests": "1000+", "time": "5-15 min", "infra": "K8s Jobs, auto-scale"},
]
for t in tiers:
print(f" [{t['tier']:<12}] {t['tests']:<10} tests → {t['time']:<10} ({t['infra']})")
dist = DistributedTesting()
dist.show_code()
dist.scaling_tiers()
Monitoring & Reporting
# monitoring.py — Test monitoring and reporting
import json
import random
class TestMonitoring:
DASHBOARD = """
# test_dashboard.py — Newman test monitoring dashboard
from flask import Flask, jsonify, render_template
import json
import glob
from datetime import datetime
app = Flask(__name__)
@app.route("/api/results")
def get_results():
results = []
for f in glob.glob("results/*.json"):
with open(f) as fh:
data = json.load(fh)
results.append({
"collection": data.get("collection", {}).get("info", {}).get("name"),
"total": data.get("run", {}).get("stats", {}).get("assertions", {}).get("total", 0),
"passed": data.get("run", {}).get("stats", {}).get("assertions", {}).get("total", 0) -
data.get("run", {}).get("stats", {}).get("assertions", {}).get("failed", 0),
"failed": data.get("run", {}).get("stats", {}).get("assertions", {}).get("failed", 0),
"duration_ms": data.get("run", {}).get("timings", {}).get("completed", 0),
})
return jsonify(results)
@app.route("/api/trends")
def get_trends():
# Aggregate test results over time
return jsonify({"message": "Historical trends"})
"""
def show_dashboard(self):
print("=== Test Dashboard ===")
print(self.DASHBOARD[:500])
def sample_dashboard(self):
print(f"\n=== Test Results Dashboard ===")
collections = [
{"name": "Auth API", "total": random.randint(20, 50), "passed": 0, "time": random.uniform(5, 30)},
{"name": "User API", "total": random.randint(30, 80), "passed": 0, "time": random.uniform(10, 60)},
{"name": "Payment API", "total": random.randint(40, 100), "passed": 0, "time": random.uniform(15, 90)},
{"name": "Search API", "total": random.randint(15, 40), "passed": 0, "time": random.uniform(3, 20)},
]
for c in collections:
c["passed"] = c["total"] - random.randint(0, 3)
c["failed"] = c["total"] - c["passed"]
status = "PASS" if c["failed"] == 0 else "FAIL"
print(f" [{status}] {c['name']:<15} {c['passed']}/{c['total']} tests ({c['time']:.1f}s)")
total = sum(c["total"] for c in collections)
passed = sum(c["passed"] for c in collections)
print(f"\n Total: {passed}/{total} ({passed/total*100:.0f}% pass rate)")
mon = TestMonitoring()
mon.show_dashboard()
mon.sample_dashboard()
FAQ - คำถามที่พบบ่อย
Q: Newman กับ Postman Cloud Runs ต่างกันอย่างไร?
A: Newman: CLI tool, self-hosted, ฟรี, integrate กับ CI/CD ได้ง่าย, full control Postman Cloud: managed service, GUI-based scheduling, $$$, ง่ายกว่า แนะนำ: ใช้ Newman สำหรับ CI/CD + development, Postman Cloud สำหรับ non-technical team ที่ต้องการ schedule tests
Q: รัน Newman parallel กี่ workers ดี?
A: ขึ้นกับ target API capacity: Development: 2-4 workers (อย่า overload dev server) Staging: 4-8 workers Production (read-only tests): 2-4 workers (ระวัง rate limiting) หลักการ: เริ่มน้อย → เพิ่มจนเจอ bottleneck → หา sweet spot Monitor: API response time + error rate ขณะรัน test
Q: จัดการ test data อย่างไร?
A: CSV/JSON data files: สำหรับ data-driven testing Environment variables: สำหรับ API keys, base URLs, tokens Pre-request scripts: generate dynamic data (timestamps, UUIDs) Postman Collection Variables: สำหรับ data ที่ share ข้าม requests Setup/Teardown: สร้าง test data ก่อน → ลบหลัง test
Q: Newman เหมาะกับ load testing ไหม?
A: ไม่เหมาะ — Newman ออกแบบสำหรับ functional testing ไม่ใช่ load testing สำหรับ load testing: ใช้ k6, Artillery, Locust, JMeter Newman ทำได้: smoke testing (ทดสอบว่า API ทำงานได้), data-driven testing กับ หลาย datasets รวมกัน: Newman สำหรับ functional + k6 สำหรับ performance
