Postman Newman Scaling Strategy: How to Scale

What Is a Postman Newman Scaling Strategy?

Postman is a popular tool for API testing and development. Newman is Postman's command-line collection runner: it runs collections from the CLI without opening the Postman GUI, which makes it a good fit for CI/CD pipelines. Scaling Newman means designing a system that runs large numbers of API tests efficiently, covering parallel execution, distributed testing, load testing, and test orchestration. This article describes strategies for scaling Postman/Newman testing, from a single runner up to a distributed testing infrastructure.

Newman Basics & CLI

# newman_basics.py: Newman fundamentals

class NewmanBasics:
    COMMANDS = {
        "basic_run": {
            "name": "Basic Collection Run",
            "command": "newman run collection.json",
            "description": "Run a Postman collection from a JSON file",
        },
        "with_environment": {
            "name": "Run with Environment",
            "command": "newman run collection.json -e staging.json",
            "description": "Run a collection together with environment variables",
        },
        "with_data": {
            "name": "Data-driven Testing",
            "command": "newman run collection.json -d test_data.csv --iteration-count 100",
            "description": "Run the collection against a data file for multiple iterations",
        },
        "reporters": {
            "name": "Custom Reporters",
            # Reporter names must be comma-separated with no spaces
            "command": "newman run collection.json -r cli,htmlextra,junit --reporter-htmlextra-export report.html",
            "description": "Export results in several formats: CLI output, HTML, JUnit XML",
        },
        "timeout": {
            "name": "Timeout & Delay",
            "command": "newman run collection.json --timeout-request 10000 --delay-request 500",
            "description": "Set a per-request timeout and a delay between requests",
        },
    }

    INSTALL = """
# Install Newman globally
npm install -g newman

# Install reporters
npm install -g newman-reporter-htmlextra
npm install -g newman-reporter-junitfull

# Export collection from Postman
# Postman → Collection → Export → Collection v2.1

# Export environment
# Postman → Environment → Export
"""

    def show_commands(self):
        print("=== Newman Commands ===\n")
        for key, cmd in self.COMMANDS.items():
            print(f"[{cmd['name']}]")
            print(f"  $ {cmd['command']}")
            print(f"  {cmd['description']}")
            print()

basics = NewmanBasics()
basics.show_commands()
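
The commands above are plain strings, which is where mistakes such as a stray space in the reporter list tend to creep in. As a minimal sketch, a hypothetical helper (`build_newman_args` is not part of Newman) could assemble the argument list from Python values, using only the flags already shown above:

```python
from typing import List, Optional, Sequence


def build_newman_args(
    collection: str,
    environment: Optional[str] = None,
    data_file: Optional[str] = None,
    reporters: Sequence[str] = ("cli",),
    timeout_request_ms: Optional[int] = None,
    delay_request_ms: Optional[int] = None,
) -> List[str]:
    """Build an argv list for `newman run` (hypothetical helper)."""
    args = ["newman", "run", collection]
    if environment:
        args += ["-e", environment]
    if data_file:
        args += ["-d", data_file]
    if reporters:
        # Join reporter names with commas and no spaces.
        args += ["-r", ",".join(name.strip() for name in reporters)]
    if timeout_request_ms is not None:
        args += ["--timeout-request", str(timeout_request_ms)]
    if delay_request_ms is not None:
        args += ["--delay-request", str(delay_request_ms)]
    return args


if __name__ == "__main__":
    argv = build_newman_args(
        "collection.json",
        environment="staging.json",
        reporters=["cli", "junit"],
    )
    print(" ".join(argv))
```

Passing the resulting list to `subprocess.run` avoids shell quoting issues, since each flag and value is already a separate argument.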

Parallel Execution Strategy

# parallel.py: Parallel Newman execution

class ParallelExecution:
    CODE = """
# parallel_newman.py — Run Newman collections in parallel
import subprocess
import json
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path

class ParallelNewmanRunner:
    def __init__(self, max_workers=4, environment=None):
        self.max_workers = max_workers
        self.environment = environment
        self.results = []
    
    def run_collection(self, collection_path, env_override=None):
        '''Run a single Newman collection'''
        cmd = ["newman", "run", collection_path]
        
        env = env_override or self.environment
        if env:
            cmd.extend(["-e", env])
        
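        cmd.extend([
            "-r", "json,cli",  # reporter names are comma-separated with no spaces
            "--reporter-json-export", f"results/{Path(collection_path).stem}.json",
            "--timeout-request", "30000",
            # --suppress-exit-code is deliberately omitted: it forces exit code 0,
            # which would make every run look like a PASS in run_parallel()
        ])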
        
        start = time.time()
        result = subprocess.run(cmd, capture_output=True, text=True)
        duration = time.time() - start
        
        return {
            "collection": collection_path,
            "exit_code": result.returncode,
            "duration_sec": round(duration, 2),
            "stdout": result.stdout[-500:],  # Last 500 chars
            "stderr": result.stderr[-200:] if result.stderr else None,
        }
    
    def run_parallel(self, collections):
        '''Run multiple collections in parallel'''
        os.makedirs("results", exist_ok=True)
        
        print(f"Running {len(collections)} collections with {self.max_workers} workers...")
        start_total = time.time()
        
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {
                executor.submit(self.run_collection, col): col
                for col in collections
            }
            
            for future in as_completed(futures):
                result = future.result()
                self.results.append(result)
                status = "PASS" if result["exit_code"] == 0 else "FAIL"
                print(f"  [{status}] {result['collection']} ({result['duration_sec']}s)")
        
        total_time = time.time() - start_total
        passed = sum(1 for r in self.results if r["exit_code"] == 0)
        
        return {
            "total": len(self.results),
            "passed": passed,
            "failed": len(self.results) - passed,
            "total_time_sec": round(total_time, 2),
            "results": self.results,
        }
    
    def split_collection(self, collection_path, chunks=4):
        '''Split a large collection into smaller chunks of top-level items'''
        with open(collection_path) as f:
            collection = json.load(f)
        
        items = collection.get("item", [])
        # Ceiling division so that at most `chunks` files are produced
        chunk_size = max(1, -(-len(items) // chunks))
        base_name = collection["info"]["name"]
        os.makedirs("chunks", exist_ok=True)
        
        split_files = []
        for index, start in enumerate(range(0, len(items), chunk_size)):
            # Copy "info" as well: a shallow copy would share it between chunks
            # and the chunk suffixes would pile up on the same name
            chunk = {**collection, "info": dict(collection["info"])}
            chunk["item"] = items[start:start + chunk_size]
            chunk["info"]["name"] = f"{base_name}_chunk_{index}"
            
            filename = f"chunks/chunk_{index}.json"
            with open(filename, "w") as out:
                json.dump(chunk, out)
            split_files.append(filename)
        
        return split_files

# Usage
# runner = ParallelNewmanRunner(max_workers=4, environment="staging.json")
# collections = ["auth_tests.json", "user_tests.json", "payment_tests.json", "search_tests.json"]
# results = runner.run_parallel(collections)
"""

    def show_code(self):
        print("=== Parallel Newman ===")
        print(self.CODE[:600])

parallel = ParallelExecution()
parallel.show_code()
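
Splitting a collection only helps if the chunks are roughly the same size, since the slowest chunk determines the total run time. The following is a sketch of one way to balance them, working on an in-memory collection dict rather than files. `split_items` is a hypothetical name, and the approach assumes top-level items (folders or requests) are independent of each other; requests that pass variables to later requests should stay in the same chunk.

```python
import copy
from typing import Any, Dict, List


def split_items(collection: Dict[str, Any], chunks: int) -> List[Dict[str, Any]]:
    """Split top-level items into `chunks` parts whose sizes differ by at most one."""
    items = collection.get("item", [])
    # Never create more chunks than there are items (and always at least one).
    chunks = max(1, min(chunks, len(items) or 1))
    base, extra = divmod(len(items), chunks)
    base_name = collection.get("info", {}).get("name", "collection")

    parts = []
    start = 0
    for index in range(chunks):
        size = base + (1 if index < extra else 0)
        # Deep-copy everything except the item list so chunks share no state.
        part = {k: copy.deepcopy(v) for k, v in collection.items() if k != "item"}
        part.setdefault("info", {})["name"] = f"{base_name}_chunk_{index}"
        part["item"] = copy.deepcopy(items[start:start + size])
        parts.append(part)
        start += size
    return parts


if __name__ == "__main__":
    demo = {"info": {"name": "demo"}, "item": [{"name": f"req{i}"} for i in range(10)]}
    for part in split_items(demo, 4):
        print(part["info"]["name"], len(part["item"]))
```

Each returned dict can be written to its own JSON file and passed to the parallel runner as a separate collection.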

CI/CD Integration

# cicd.py: CI/CD pipeline with Newman

class CICDIntegration:
    GITHUB_ACTIONS = """
# .github/workflows/api-tests.yml
name: API Tests (Newman)
on:
  push:
    branches: [main, develop]
  pull_request:
  schedule:
    - cron: '0 */6 * * *'  # Every 6 hours

jobs:
  api-tests:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        collection:
          - auth_tests
          - user_api_tests
          - payment_tests
          - search_tests
      fail-fast: false
    
    steps:
      - uses: actions/checkout@v4
      
      - uses: actions/setup-node@v4
        with:
          node-version: '20'
      
      - name: Install Newman
        run: |
          npm install -g newman
          npm install -g newman-reporter-htmlextra
          npm install -g newman-reporter-junitfull
      
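      - name: Run API Tests
        run: |
          # Assumption: tests target the staging environment (environments/staging.json)
          newman run collections/${{ matrix.collection }}.json \\
            -e environments/staging.json \\
            -r cli,htmlextra,junitfull \\
            --reporter-htmlextra-export reports/${{ matrix.collection }}.html \\
            --reporter-junitfull-export reports/${{ matrix.collection }}.xml \\
            --timeout-request 30000 \\
            --iteration-count 1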
      
      - name: Upload Report
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: test-report-${{ matrix.collection }}
          path: reports/
      
      - name: Publish Test Results
        if: always()
        uses: dorny/test-reporter@v1
        with:
          name: Newman ${{ matrix.collection }}
          path: reports/*.xml
          reporter: java-junit

  notify:
    needs: api-tests
    if: failure()
    runs-on: ubuntu-latest
    steps:
      - name: Notify Slack
        uses: 8398a7/action-slack@v3
        with:
          status: failure
          text: "API Tests Failed!"
        env:
          SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
"""

    def show_pipeline(self):
        print("=== GitHub Actions Pipeline ===")
        print(self.GITHUB_ACTIONS[:600])

cicd = CICDIntegration()
cicd.show_pipeline()
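
The workflow above lists its collections by hand in the matrix, so every new collection file needs a workflow change. One possible alternative, sketched here under the assumption that collections live in a `collections/` directory as `<name>.json`, is to generate the matrix from the files on disk. `build_matrix` is a hypothetical helper; a workflow could capture its output in an earlier job and feed it to the matrix with the `fromJSON` expression function.

```python
import json
from pathlib import Path
from typing import Dict, List


def build_matrix(collections_dir: str) -> Dict[str, List[str]]:
    """Return a matrix dict listing the stem of every collection file, sorted."""
    stems = sorted(p.stem for p in Path(collections_dir).glob("*.json"))
    return {"collection": stems}


if __name__ == "__main__":
    # Printed as one line so a workflow step can store it as a job output.
    print("matrix=" + json.dumps(build_matrix("collections")))
```

Sorting keeps the job order stable between runs, which makes reports easier to compare.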

Distributed Testing

# distributed.py: Distributed Newman testing

class DistributedTesting:
    CODE = """
# distributed_newman.py — Distributed test execution
import docker
import json
import os  # needed for os.path.abspath() in run_in_container()
import time
from concurrent.futures import ThreadPoolExecutor

class DistributedNewmanRunner:
    def __init__(self, docker_image="postman/newman:latest"):
        self.client = docker.from_env()
        self.image = docker_image
        self.results = []
    
    def run_in_container(self, collection_path, environment_path=None):
        '''Run Newman in a Docker container'''
        volumes = {
            os.path.abspath("collections"): {"bind": "/etc/newman/collections", "mode": "ro"},
            os.path.abspath("environments"): {"bind": "/etc/newman/environments", "mode": "ro"},
            os.path.abspath("results"): {"bind": "/etc/newman/results", "mode": "rw"},
        }
        
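        # Give every collection its own result file so that parallel containers
        # do not overwrite each other's output
        result_name = collection_path.rsplit(".", 1)[0]
        cmd = f"run /etc/newman/collections/{collection_path}"
        if environment_path:
            cmd += f" -e /etc/newman/environments/{environment_path}"
        cmd += f" -r json --reporter-json-export /etc/newman/results/{result_name}.json"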
        
        container = self.client.containers.run(
            self.image,
            command=cmd,
            volumes=volumes,
            detach=True,
            mem_limit="512m",
            cpu_period=100000,
            cpu_quota=50000,  # 50% CPU
        )
        
        result = container.wait()
        logs = container.logs().decode()
        container.remove()
        
        return {
            "collection": collection_path,
            "exit_code": result["StatusCode"],
            "logs": logs[-500:],
        }
    
    def run_distributed(self, collections, max_containers=8):
        '''Run collections across multiple containers'''
        with ThreadPoolExecutor(max_workers=max_containers) as executor:
            futures = [
                executor.submit(self.run_in_container, col)
                for col in collections
            ]
            
            for future in futures:
                self.results.append(future.result())
        
        return self.results

# Kubernetes-based distributed testing
K8S_JOB = '''
apiVersion: batch/v1
kind: Job
metadata:
  name: newman-test-{collection}
spec:
  parallelism: 1
  template:
    spec:
      containers:
        - name: newman
          image: postman/newman:latest
          command: ["newman", "run", "/collections/{collection}.json",
                    "-e", "/environments/staging.json",
                    "-r", "json", "--reporter-json-export", "/results/result.json"]
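          volumeMounts:
            - name: collections
              mountPath: /collections
            - name: environments
              mountPath: /environments
            - name: results
              mountPath: /results
      volumes:
        - name: collections
          configMap:
            name: newman-collections
        - name: environments
          configMap:
            name: newman-environments  # assumed ConfigMap holding staging.json
        - name: results
          emptyDir: {}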
      restartPolicy: Never
'''
"""

    def show_code(self):
        print("=== Distributed Testing ===")
        print(self.CODE[:600])

    def scaling_tiers(self):
        print(f"\n=== Scaling Tiers ===")
        tiers = [
            {"tier": "Single", "tests": "< 50", "time": "5-10 min", "infra": "1 Newman process"},
            {"tier": "Parallel", "tests": "50-200", "time": "3-5 min", "infra": "4-8 parallel threads"},
            {"tier": "Docker", "tests": "200-1000", "time": "5-10 min", "infra": "8-16 containers"},
            {"tier": "Kubernetes", "tests": "1000+", "time": "5-15 min", "infra": "K8s Jobs, auto-scale"},
        ]
        for t in tiers:
            print(f"  [{t['tier']:<12}] {t['tests']:<10} tests → {t['time']:<10} ({t['infra']})")

dist = DistributedTesting()
dist.show_code()
dist.scaling_tiers()
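
The Job manifest above contains `{collection}` placeholders next to a literal `emptyDir: {}`, so filling it with `str.format` would fail on the empty braces. The sketch below shows one way around that using `string.Template`, together with a hypothetical `to_job_name` helper that turns collection names such as `auth_tests` into names Kubernetes accepts (lowercase letters, digits, and hyphens). The manifest is trimmed for brevity and is not a complete replacement for the one above.

```python
import re
from string import Template

# Trimmed manifest; $-style placeholders leave the literal {} untouched.
JOB_TEMPLATE = Template("""\
apiVersion: batch/v1
kind: Job
metadata:
  name: $job_name
spec:
  template:
    spec:
      containers:
        - name: newman
          image: postman/newman:latest
          command: ["newman", "run", "/collections/${collection}.json"]
          volumeMounts:
            - name: results
              mountPath: /results
      volumes:
        - name: results
          emptyDir: {}
      restartPolicy: Never
""")


def to_job_name(collection: str, prefix: str = "newman-test-") -> str:
    """Lowercase the name and replace anything outside [a-z0-9-] with a hyphen."""
    slug = re.sub(r"[^a-z0-9-]+", "-", collection.lower()).strip("-")
    # Keep the name within 63 characters and avoid a trailing hyphen.
    return (prefix + slug)[:63].rstrip("-")


def render_job(collection: str) -> str:
    """Render one Job manifest for the given collection file stem."""
    return JOB_TEMPLATE.substitute(
        job_name=to_job_name(collection),
        collection=collection,
    )


if __name__ == "__main__":
    print(render_job("auth_tests"))
```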

Monitoring & Reporting

# monitoring.py: Test monitoring and reporting
import random

class TestMonitoring:
    DASHBOARD = """
# test_dashboard.py — Newman test monitoring dashboard
from flask import Flask, jsonify
import json
import glob

app = Flask(__name__)

@app.route("/api/results")
def get_results():
    results = []
    for f in glob.glob("results/*.json"):
        with open(f) as fh:
            data = json.load(fh)
            results.append({
                "collection": data.get("collection", {}).get("info", {}).get("name"),
                "total": data.get("run", {}).get("stats", {}).get("assertions", {}).get("total", 0),
                "passed": data.get("run", {}).get("stats", {}).get("assertions", {}).get("total", 0) -
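                          data.get("run", {}).get("stats", {}).get("assertions", {}).get("failed", 0),
                "failed": data.get("run", {}).get("stats", {}).get("assertions", {}).get("failed", 0),
                # timings.started and timings.completed are epoch timestamps in ms,
                # so the duration is their difference
                "duration_ms": data.get("run", {}).get("timings", {}).get("completed", 0) -
                               data.get("run", {}).get("timings", {}).get("started", 0),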
            })
    return jsonify(results)

@app.route("/api/trends")
def get_trends():
    # Aggregate test results over time
    return jsonify({"message": "Historical trends"})
"""

    def show_dashboard(self):
        print("=== Test Dashboard ===")
        print(self.DASHBOARD[:500])

    def sample_dashboard(self):
        print(f"\n=== Test Results Dashboard ===")
        collections = [
            {"name": "Auth API", "total": random.randint(20, 50), "passed": 0, "time": random.uniform(5, 30)},
            {"name": "User API", "total": random.randint(30, 80), "passed": 0, "time": random.uniform(10, 60)},
            {"name": "Payment API", "total": random.randint(40, 100), "passed": 0, "time": random.uniform(15, 90)},
            {"name": "Search API", "total": random.randint(15, 40), "passed": 0, "time": random.uniform(3, 20)},
        ]
        for c in collections:
            c["passed"] = c["total"] - random.randint(0, 3)
            c["failed"] = c["total"] - c["passed"]
            status = "PASS" if c["failed"] == 0 else "FAIL"
            print(f"  [{status}] {c['name']:<15} {c['passed']}/{c['total']} tests ({c['time']:.1f}s)")
        
        total = sum(c["total"] for c in collections)
        passed = sum(c["passed"] for c in collections)
        print(f"\n  Total: {passed}/{total} ({passed/total*100:.0f}% pass rate)")

mon = TestMonitoring()
mon.show_dashboard()
mon.sample_dashboard()
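
The sample dashboard above prints random numbers. As a sketch of the same summary computed from real data, the function below aggregates already-loaded Newman JSON reports. It assumes the report layout used by the dashboard code (`collection.info.name` and `run.stats.assertions` with `total` and `failed`); `summarize` is a hypothetical name.

```python
from typing import Any, Dict, Iterable


def summarize(reports: Iterable[Dict[str, Any]]) -> Dict[str, Any]:
    """Aggregate assertion counts across several Newman JSON reports."""
    total = 0
    failed = 0
    failing = []
    for report in reports:
        assertions = report.get("run", {}).get("stats", {}).get("assertions", {})
        total += assertions.get("total", 0)
        report_failed = assertions.get("failed", 0)
        failed += report_failed
        if report_failed:
            name = report.get("collection", {}).get("info", {}).get("name", "unknown")
            failing.append(name)
    # Guard against division by zero when no assertions were recorded.
    pass_rate = (total - failed) / total * 100 if total else 0.0
    return {
        "total": total,
        "passed": total - failed,
        "failed": failed,
        "pass_rate": round(pass_rate, 1),
        "failing_collections": failing,
    }


if __name__ == "__main__":
    sample = [
        {"collection": {"info": {"name": "Auth API"}},
         "run": {"stats": {"assertions": {"total": 10, "failed": 0}}}},
        {"collection": {"info": {"name": "Payment API"}},
         "run": {"stats": {"assertions": {"total": 30, "failed": 2}}}},
    ]
    print(summarize(sample))
```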

FAQ: Frequently Asked Questions

Q: What is the difference between Newman and Postman Cloud runs?

A: Newman is a free, self-hosted CLI tool that integrates easily with CI/CD and gives you full control. Postman Cloud is a paid managed service with GUI-based scheduling and is easier to use. Recommendation: use Newman for CI/CD and development, and Postman Cloud for non-technical teams that need to schedule tests.

Q: How many parallel workers should Newman use?

A: It depends on the capacity of the target API. Development: 2-4 workers (do not overload the dev server). Staging: 4-8 workers. Production (read-only tests): 2-4 workers (watch for rate limiting). The rule of thumb is to start small, add workers until you hit a bottleneck, and then settle on the sweet spot. While the tests run, monitor the API's response time and error rate.
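
The ramp-up described in this answer can be sketched as a small search loop. Everything here is hypothetical: `measure` stands in for whatever runs the suite at a given worker count and reports the observed error rate and latency, and the thresholds are placeholders to tune per environment.

```python
from typing import Callable, Iterable, Optional, Tuple


def find_sweet_spot(
    measure: Callable[[int], Tuple[float, float]],
    candidates: Iterable[int] = (1, 2, 4, 8, 16),
    max_error_rate: float = 0.01,
    max_latency_ms: float = 1000.0,
) -> Optional[int]:
    """Return the largest worker count tried before a threshold is breached.

    `measure(workers)` must return (error_rate, latency_ms) for that worker count.
    """
    best = None
    for workers in sorted(candidates):
        error_rate, latency_ms = measure(workers)
        if error_rate > max_error_rate or latency_ms > max_latency_ms:
            break  # bottleneck reached: stop ramping up
        best = workers
    return best


if __name__ == "__main__":
    # Fake measurement for illustration: latency grows linearly with workers.
    def fake_measure(workers: int) -> Tuple[float, float]:
        return 0.0, 100.0 * workers

    print(find_sweet_spot(fake_measure, max_latency_ms=500.0))
```

Stopping at the first breach, rather than trying every candidate, avoids pushing more load onto an API that is already struggling.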

Q: How should test data be managed?

A: Use CSV/JSON data files for data-driven testing; environment variables for API keys, base URLs, and tokens; pre-request scripts to generate dynamic data (timestamps, UUIDs); Postman collection variables for data shared across requests; and setup/teardown steps that create test data before a run and delete it afterwards.
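
For the data-file approach, a CSV can be generated just before the run so that every iteration gets fresh values. This is a minimal sketch with hypothetical column names (`user_id`, `email`, `created_at`); the header row supplies the variable names that requests can reference, for example as `{{email}}`.

```python
import csv
import uuid
from datetime import datetime, timezone
from typing import List


def write_test_data(path: str, rows: int) -> List[str]:
    """Write `rows` lines of unique test data to a CSV file; return the header."""
    fieldnames = ["user_id", "email", "created_at"]
    with open(path, "w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=fieldnames)
        writer.writeheader()
        for _ in range(rows):
            user_id = str(uuid.uuid4())
            writer.writerow({
                "user_id": user_id,
                # example.com is reserved for documentation and testing
                "email": f"user-{user_id[:8]}@example.com",
                "created_at": datetime.now(timezone.utc).isoformat(),
            })
    return fieldnames


if __name__ == "__main__":
    write_test_data("test_data.csv", 100)
```

The resulting file can then be passed to Newman with `-d test_data.csv`, as in the data-driven command shown earlier.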

Q: Is Newman suitable for load testing?

A: No. Newman is designed for functional testing, not load testing. For load testing, use k6, Artillery, Locust, or JMeter. Newman is suited to smoke testing (checking that the API works) and data-driven testing against multiple datasets. The two combine well: Newman for functional tests and k6 for performance tests.

XM Legend · Forex trader and instructor for 13 years

Founder of SiamCafe since 1997 · Forex trader for more than 13 years, recognized as an XM Legend · Shares knowledge on Forex, IT, AI, and trading from real market experience