SiamCafe.net Blog
Cybersecurity

OWASP ZAP Batch Processing Pipeline — สร้างระบบ DAST Scanning อัตโนมัติ

owasp zap batch processing pipeline
OWASP ZAP Batch Processing Pipeline | SiamCafe Blog
2025-08-29· อ. บอม — SiamCafe.net· 1,207 คำ

OWASP ZAP คืออะไรและใช้ทำอะไรใน Pipeline

OWASP ZAP (Zed Attack Proxy) เป็น open source web application security scanner ที่พัฒนาโดย OWASP community ใช้สำหรับค้นหาช่องโหว่ด้านความปลอดภัยใน web applications เช่น SQL Injection, Cross-Site Scripting (XSS), CSRF, Insecure Headers และอีกมากมาย ZAP เป็นเครื่องมือ DAST (Dynamic Application Security Testing) ที่ทดสอบ application ขณะทำงานจริง

การใช้ ZAP ใน Batch Processing Pipeline หมายถึงการ scan หลาย targets พร้อมกันหรือต่อเนื่องแบบอัตโนมัติ เหมาะสำหรับองค์กรที่มี web applications หลายตัวและต้องการ scan ทั้งหมดเป็นประจำ หรือ scan ทุกครั้งที่มีการ deploy

ZAP รองรับ automation ผ่านหลายช่องทาง ได้แก่ ZAP API (REST) สำหรับควบคุม ZAP ผ่าน HTTP, ZAP Automation Framework สำหรับ YAML-based configuration, ZAP Docker image สำหรับ containerized scanning และ ZAP CLI สำหรับ command-line execution

ข้อดีของ ZAP เมื่อเทียบกับ commercial tools เช่น Burp Suite Enterprise คือ ฟรีและ open source, community ใหญ่, API ครบถ้วน, Docker support ดี และมี Marketplace สำหรับ add-ons

ติดตั้ง ZAP และตั้งค่าสำหรับ Automation

วิธีติดตั้งและเตรียม ZAP สำหรับ batch scanning

# ติดตั้ง ZAP ด้วย Docker (แนะนำสำหรับ CI/CD)
docker pull ghcr.io/zaproxy/zaproxy:stable

# รัน ZAP ในโหมด daemon (headless)
docker run -d --name zap \
  -p 8080:8080 \
  -v $(pwd)/zap-work:/zap/wrk \
  ghcr.io/zaproxy/zaproxy:stable \
  zap.sh -daemon -host 0.0.0.0 -port 8080 \
  -config api.key=your-api-key-here \
  -config api.addrs.addr.name=.* \
  -config api.addrs.addr.regex=true

# ตรวจสอบว่า ZAP ทำงาน
curl http://localhost:8080/JSON/core/view/version/?apikey=your-api-key-here

# === ZAP Automation Framework (YAML config) ===
# automation.yaml
# env:
#   contexts:
#     - name: "target-app"
#       urls:
#         - "https://target-app.example.com"
#       includePaths:
#         - "https://target-app.example.com/.*"
#       excludePaths:
#         - "https://target-app.example.com/logout.*"
#       authentication:
#         method: "form"
#         parameters:
#           loginUrl: "https://target-app.example.com/login"
#           loginRequestData: "username={%username%}&password={%password%}"
#         verification:
#           method: "response"
#           loggedInRegex: "Welcome.*"
#       users:
#         - name: "test-user"
#           credentials:
#             username: "testuser"
#             password: "testpass123"
#
# jobs:
#   - type: spider
#     parameters:
#       maxDuration: 5
#       maxDepth: 5
#       maxChildren: 10
#
#   - type: spiderAjax
#     parameters:
#       maxDuration: 5
#       maxCrawlDepth: 3
#
#   - type: passiveScan-wait
#     parameters:
#       maxDuration: 10
#
#   - type: activeScan
#     parameters:
#       maxRuleDurationInMins: 5
#       maxScanDurationInMins: 30
#
#   - type: report
#     parameters:
#       template: "traditional-html"
#       reportDir: "/zap/wrk"
#       reportFile: "zap-report"

# รัน Automation Framework
docker run --rm \
  -v $(pwd):/zap/wrk:rw \
  ghcr.io/zaproxy/zaproxy:stable \
  zap.sh -cmd -autorun /zap/wrk/automation.yaml

# Quick scans
# Baseline scan (passive only, fast)
docker run --rm -v $(pwd):/zap/wrk ghcr.io/zaproxy/zaproxy:stable \
  zap-baseline.py -t https://target.example.com -r baseline-report.html

# Full scan (active + passive)
docker run --rm -v $(pwd):/zap/wrk ghcr.io/zaproxy/zaproxy:stable \
  zap-full-scan.py -t https://target.example.com -r full-report.html

# API scan
docker run --rm -v $(pwd):/zap/wrk ghcr.io/zaproxy/zaproxy:stable \
  zap-api-scan.py -t https://target.example.com/openapi.json -f openapi -r api-report.html

สร้าง Batch Scanning Pipeline ด้วย Python

Python script สำหรับ scan หลาย targets

#!/usr/bin/env python3
# zap_batch_scanner.py — Batch DAST Scanner with OWASP ZAP
import requests
import time
import json
import logging
from datetime import datetime
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, asdict

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("zap_batch")

@dataclass
class ScanTarget:
    name: str
    url: str
    scan_type: str = "baseline"  # baseline, full, api
    api_spec: str = ""
    auth_token: str = ""

@dataclass
class ScanResult:
    target_name: str
    url: str
    scan_type: str
    status: str
    high_alerts: int = 0
    medium_alerts: int = 0
    low_alerts: int = 0
    info_alerts: int = 0
    duration_seconds: float = 0
    report_path: str = ""
    error: str = ""

class ZAPBatchScanner:
    def __init__(self, zap_url="http://localhost:8080", api_key="your-api-key"):
        self.zap_url = zap_url
        self.api_key = api_key
        self.session = requests.Session()
        self.results = []
    
    def _api(self, endpoint, params=None):
        params = params or {}
        params["apikey"] = self.api_key
        resp = self.session.get(f"{self.zap_url}{endpoint}", params=params)
        resp.raise_for_status()
        return resp.json()
    
    def wait_for_zap(self, timeout=120):
        start = time.time()
        while time.time() - start < timeout:
            try:
                self._api("/JSON/core/view/version/")
                logger.info("ZAP is ready")
                return True
            except Exception:
                time.sleep(2)
        raise TimeoutError("ZAP did not start in time")
    
    def scan_target(self, target: ScanTarget) -> ScanResult:
        start = time.time()
        logger.info(f"Starting {target.scan_type} scan: {target.name} ({target.url})")
        
        try:
            # Clear previous session
            self._api("/JSON/core/action/newSession/", {"name": target.name})
            
            # Set auth header if provided
            if target.auth_token:
                self._api("/JSON/replacer/action/addRule/", {
                    "description": "Auth Header",
                    "enabled": "true",
                    "matchType": "REQ_HEADER",
                    "matchRegex": "false",
                    "matchString": "Authorization",
                    "replacement": f"Bearer {target.auth_token}",
                })
            
            # Spider
            logger.info(f"[{target.name}] Spidering...")
            scan_id = self._api("/JSON/spider/action/scan/", {
                "url": target.url, "maxChildren": "10", "recurse": "true"
            })["scan"]
            
            while True:
                progress = int(self._api("/JSON/spider/view/status/", {"scanId": scan_id})["status"])
                if progress >= 100:
                    break
                time.sleep(2)
            
            # Passive scan wait
            logger.info(f"[{target.name}] Waiting for passive scan...")
            while int(self._api("/JSON/pscan/view/recordsToScan/")["recordsToScan"]) > 0:
                time.sleep(1)
            
            # Active scan (only for full scan type)
            if target.scan_type == "full":
                logger.info(f"[{target.name}] Active scanning...")
                scan_id = self._api("/JSON/ascan/action/scan/", {
                    "url": target.url, "recurse": "true"
                })["scan"]
                
                while True:
                    progress = int(self._api("/JSON/ascan/view/status/", {"scanId": scan_id})["status"])
                    if progress >= 100:
                        break
                    time.sleep(5)
            
            # Get alerts
            alerts = self._api("/JSON/core/view/alerts/", {"baseurl": target.url})["alerts"]
            
            high = sum(1 for a in alerts if a["risk"] == "High")
            medium = sum(1 for a in alerts if a["risk"] == "Medium")
            low = sum(1 for a in alerts if a["risk"] == "Low")
            info = sum(1 for a in alerts if a["risk"] == "Informational")
            
            # Generate report
            report_dir = Path("reports")
            report_dir.mkdir(exist_ok=True)
            report_path = report_dir / f"{target.name}_{datetime.now().strftime('%Y%m%d_%H%M')}.html"
            
            html_report = self.session.get(
                f"{self.zap_url}/OTHER/core/other/htmlreport/",
                params={"apikey": self.api_key}
            ).text
            report_path.write_text(html_report)
            
            duration = time.time() - start
            result = ScanResult(
                target_name=target.name, url=target.url,
                scan_type=target.scan_type, status="completed",
                high_alerts=high, medium_alerts=medium,
                low_alerts=low, info_alerts=info,
                duration_seconds=round(duration, 1),
                report_path=str(report_path),
            )
            
            logger.info(f"[{target.name}] Done: H={high} M={medium} L={low} I={info} ({duration:.0f}s)")
            return result
            
        except Exception as e:
            return ScanResult(
                target_name=target.name, url=target.url,
                scan_type=target.scan_type, status="error",
                duration_seconds=round(time.time() - start, 1),
                error=str(e),
            )
    
    def batch_scan(self, targets):
        self.wait_for_zap()
        
        for target in targets:
            result = self.scan_target(target)
            self.results.append(result)
        
        self._generate_summary()
        return self.results
    
    def _generate_summary(self):
        print(f"\n{'='*60}")
        print(f"Batch Scan Summary — {datetime.now().strftime('%Y-%m-%d %H:%M')}")
        print(f"{'='*60}")
        
        for r in self.results:
            status_icon = "PASS" if r.high_alerts == 0 else "FAIL"
            print(f"[{status_icon}] {r.target_name}: H={r.high_alerts} M={r.medium_alerts} "
                  f"L={r.low_alerts} ({r.duration_seconds}s)")
        
        total_high = sum(r.high_alerts for r in self.results)
        print(f"\nTotal HIGH alerts: {total_high}")
        
        summary = [asdict(r) for r in self.results]
        Path("reports/summary.json").write_text(json.dumps(summary, indent=2))

# ใช้งาน
targets = [
    ScanTarget("frontend", "https://app.example.com", "baseline"),
    ScanTarget("api", "https://api.example.com", "full"),
    ScanTarget("admin", "https://admin.example.com", "baseline", auth_token="xxx"),
]

scanner = ZAPBatchScanner()
results = scanner.batch_scan(targets)

รวม ZAP กับ CI/CD Pipeline

GitHub Actions workflow สำหรับ automated DAST

# .github/workflows/dast-scan.yml
name: DAST Security Scan

on:
  push:
    branches: [main]
  schedule:
    - cron: '0 2 * * 1'  # ทุกวันจันทร์ 02:00 UTC

jobs:
  dast-baseline:
    runs-on: ubuntu-latest
    services:
      app:
        image: }:latest
        ports: ["8080:8080"]
    
    steps:
      - uses: actions/checkout@v4
      
      - name: Wait for app
        run: |
          for i in $(seq 1 30); do
            curl -s http://localhost:8080/health && break || sleep 2
          done
      
      - name: ZAP Baseline Scan
        uses: zaproxy/action-baseline@v0.12.0
        with:
          target: 'http://localhost:8080'
          rules_file_name: '.zap/rules.tsv'
          cmd_options: '-a -j'
      
      - name: Upload Report
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: zap-baseline-report
          path: report_html.html

  dast-full:
    runs-on: ubuntu-latest
    if: github.event_name == 'schedule'
    
    steps:
      - uses: actions/checkout@v4
      
      - name: Deploy to staging
        run: |
          echo "Deploy app to staging environment"
      
      - name: ZAP Full Scan
        uses: zaproxy/action-full-scan@v0.10.0
        with:
          target: 'https://staging.example.com'
          rules_file_name: '.zap/rules.tsv'
          cmd_options: '-a -j -m 30'
      
      - name: Check Results
        if: always()
        run: |
          if grep -q '"risk": "High"' zap-report.json 2>/dev/null; then
            echo "HIGH vulnerabilities found!"
            exit 1
          fi
      
      - name: Notify Security Team
        if: failure()
        uses: slackapi/slack-github-action@v1
        with:
          payload: |
            {"text": "DAST Scan FAILED: High vulnerabilities detected in staging"}

  dast-api:
    runs-on: ubuntu-latest
    if: github.event_name == 'schedule'
    
    steps:
      - uses: actions/checkout@v4
      
      - name: ZAP API Scan
        uses: zaproxy/action-api-scan@v0.7.0
        with:
          target: 'https://staging.example.com/openapi.json'
          format: openapi
          rules_file_name: '.zap/api-rules.tsv'

# .zap/rules.tsv — Customize alert handling
# ID	Action	Description
# 10015	IGNORE	Incomplete or No Cache-control Header Set
# 10037	IGNORE	Server Leaks Information via X-Powered-By
# 10096	WARN	Timestamp Disclosure
# 40012	FAIL	Cross Site Scripting (Reflected)
# 40014	FAIL	Cross Site Scripting (Persistent)
# 40018	FAIL	SQL Injection
# 90033	FAIL	Loosely Scoped Cookie

วิเคราะห์ผลลัพธ์และสร้าง Reports

สร้าง custom reports จากผลการ scan

#!/usr/bin/env python3
# zap_reporter.py — Custom Report Generator for ZAP Results
import json
from pathlib import Path
from datetime import datetime
from collections import Counter

class ZAPReporter:
    def __init__(self, results_file="reports/summary.json"):
        with open(results_file) as f:
            self.results = json.load(f)
    
    def generate_executive_summary(self):
        total_targets = len(self.results)
        passed = sum(1 for r in self.results if r["high_alerts"] == 0)
        failed = total_targets - passed
        
        total_high = sum(r["high_alerts"] for r in self.results)
        total_medium = sum(r["medium_alerts"] for r in self.results)
        total_low = sum(r["low_alerts"] for r in self.results)
        
        report = f"""
# Security Scan Executive Summary
Date: {datetime.now().strftime('%Y-%m-%d %H:%M')}

## Overview
- **Targets Scanned**: {total_targets}
- **Passed**: {passed}
- **Failed**: {failed}
- **Pass Rate**: {passed/total_targets*100:.0f}%

## Vulnerability Summary
| Severity | Count |
|----------|-------|
| High     | {total_high} |
| Medium   | {total_medium} |
| Low      | {total_low} |

## Target Details
"""
        for r in self.results:
            status = "PASS" if r["high_alerts"] == 0 else "FAIL"
            report += f"- [{status}] **{r['target_name']}** ({r['url']}): "
            report += f"H={r['high_alerts']} M={r['medium_alerts']} L={r['low_alerts']} "
            report += f"({r['duration_seconds']}s)\n"
        
        if failed > 0:
            report += "\n## Action Required\n"
            for r in self.results:
                if r["high_alerts"] > 0:
                    report += f"- **{r['target_name']}**: {r['high_alerts']} HIGH vulnerabilities need immediate attention\n"
        
        return report
    
    def generate_trend_data(self, history_file="reports/history.json"):
        history = []
        if Path(history_file).exists():
            with open(history_file) as f:
                history = json.load(f)
        
        current = {
            "date": datetime.now().isoformat(),
            "targets": len(self.results),
            "high": sum(r["high_alerts"] for r in self.results),
            "medium": sum(r["medium_alerts"] for r in self.results),
            "low": sum(r["low_alerts"] for r in self.results),
        }
        history.append(current)
        
        with open(history_file, "w") as f:
            json.dump(history[-90:], f, indent=2)
        
        return history
    
    def check_policy(self, max_high=0, max_medium=5):
        violations = []
        
        for r in self.results:
            if r["high_alerts"] > max_high:
                violations.append(f"{r['target_name']}: {r['high_alerts']} HIGH (max: {max_high})")
            if r["medium_alerts"] > max_medium:
                violations.append(f"{r['target_name']}: {r['medium_alerts']} MEDIUM (max: {max_medium})")
        
        return {
            "passed": len(violations) == 0,
            "violations": violations,
        }

reporter = ZAPReporter()
summary = reporter.generate_executive_summary()
print(summary)

policy = reporter.check_policy(max_high=0, max_medium=5)
if not policy["passed"]:
    print("POLICY VIOLATIONS:")
    for v in policy["violations"]:
        print(f"  - {v}")
    exit(1)

Advanced Configuration และ Custom Scan Policies

ตั้งค่า scan policies และ custom scripts

# === Custom Scan Policy ===
# สร้างผ่าน ZAP API

# Python script สำหรับสร้าง custom policy
import requests

ZAP_URL = "http://localhost:8080"
API_KEY = "your-api-key"

def zap_api(endpoint, params=None):
    params = params or {}
    params["apikey"] = API_KEY
    return requests.get(f"{ZAP_URL}{endpoint}", params=params).json()

# สร้าง scan policy
zap_api("/JSON/ascan/action/addScanPolicy/", {"scanPolicyName": "production-policy"})

# ปิด scanners ที่ไม่ต้องการ (ลด scan time)
# Buffer Overflow (ไม่เกี่ยวกับ web apps ส่วนใหญ่)
zap_api("/JSON/ascan/action/disableScanners/", {
    "ids": "30001,30002",
    "scanPolicyName": "production-policy"
})

# เปิด scanners ที่สำคัญ
# SQL Injection, XSS, CSRF, Path Traversal
zap_api("/JSON/ascan/action/enableScanners/", {
    "ids": "40018,40012,40014,6,40003",
    "scanPolicyName": "production-policy"
})

# ตั้ง threshold และ strength
# Threshold: OFF, DEFAULT, LOW, MEDIUM, HIGH
# Strength: DEFAULT, LOW, MEDIUM, HIGH, INSANE
zap_api("/JSON/ascan/action/setScannerAlertThreshold/", {
    "id": "40018",  # SQL Injection
    "alertThreshold": "LOW",
    "scanPolicyName": "production-policy"
})

zap_api("/JSON/ascan/action/setScannerAttackStrength/", {
    "id": "40018",
    "attackStrength": "HIGH",
    "scanPolicyName": "production-policy"
})

# === ZAP Script สำหรับ Custom Authentication ===
# authentication_script.js (Zest/JS)
# สำหรับ JWT-based authentication

# === Docker Compose สำหรับ Production ===
# docker-compose.yml:
# services:
#   zap:
#     image: ghcr.io/zaproxy/zaproxy:stable
#     command: >
#       zap.sh -daemon -host 0.0.0.0 -port 8080
#       -config api.key=
#       -config api.addrs.addr.name=.*
#       -config api.addrs.addr.regex=true
#       -config connection.timeoutInSecs=120
#     ports: ["8080:8080"]
#     volumes:
#       - zap-data:/zap/wrk
#       - ./policies:/home/zap/.ZAP/policies
#       - ./scripts:/home/zap/.ZAP/scripts
#     deploy:
#       resources:
#         limits:
#           cpus: '2.0'
#           memory: 4G
#
#   scanner:
#     build: ./scanner
#     depends_on: [zap]
#     environment:
#       ZAP_URL: http://zap:8080
#       ZAP_API_KEY: 
#     volumes:
#       - ./reports:/app/reports
#
# volumes:
#   zap-data:

FAQ คำถามที่พบบ่อย

Q: Baseline scan กับ Full scan ต่างกันอย่างไร?

A: Baseline scan ทำเฉพาะ passive scanning ที่วิเคราะห์ HTTP responses โดยไม่ส่ง attack payloads ใช้เวลา 1-5 นาที เหมาะสำหรับ CI/CD pipeline ทุก commit Full scan ทำทั้ง passive และ active scanning ที่ส่ง attack payloads จริง ใช้เวลา 30 นาทีถึงหลายชั่วโมง เหมาะสำหรับ scheduled weekly/monthly scans

Q: ZAP scan ทำให้ production ล่มได้ไหม?

A: Active scan ส่ง malicious payloads จริง อาจทำให้ application crash หรือ data corruption ได้ ไม่ควรรัน active scan กับ production โดยตรง ใช้ staging/testing environment แทน Baseline scan (passive only) ปลอดภัยกว่าแต่ก็อาจเพิ่ม load ได้ ควรจำกัด concurrent requests และ scan speed

Q: ZAP กับ Burp Suite เลือกอันไหน?

A: ZAP เป็น open source ฟรี เหมาะสำหรับ CI/CD automation มี Docker support ดี API ครบ Burp Suite Pro มี scanner ที่แม่นยำกว่า UI ดีกว่าสำหรับ manual testing มี extensions มากกว่า แต่มีค่าลิขสิทธิ์ สำหรับ automated pipeline scanning ZAP เพียงพอ สำหรับ penetration testing แบบ manual Burp Suite ดีกว่า หลายทีมใช้ทั้งสอง

Q: ลด false positives ของ ZAP ได้อย่างไร?

A: ใช้ rules.tsv file กำหนด IGNORE สำหรับ alerts ที่ไม่เกี่ยวข้อง ตั้ง alert threshold เป็น MEDIUM หรือ HIGH สร้าง context ที่กำหนด scope ชัดเจน exclude URLs ที่ไม่ต้อง scan (logout, static files) ใช้ custom scan policy ที่ปิด scanners ที่ไม่เกี่ยวข้องกับ tech stack และ review alerts เป็นประจำเพื่อ tune configuration

📖 บทความที่เกี่ยวข้อง

OWASP ZAP Citizen Developerอ่านบทความ → OWASP ZAP Audit Trail Loggingอ่านบทความ → OWASP ZAP Agile Scrum Kanbanอ่านบทความ → OWASP ZAP Shift Left Securityอ่านบทความ → OWASP ZAP Production Setup Guideอ่านบทความ →

📚 ดูบทความทั้งหมด →