OWASP ZAP คืออะไรและใช้ทำอะไรใน Pipeline
OWASP ZAP (Zed Attack Proxy) เป็น open source web application security scanner ที่พัฒนาโดย OWASP community ใช้สำหรับค้นหาช่องโหว่ด้านความปลอดภัยใน web applications เช่น SQL Injection, Cross-Site Scripting (XSS), CSRF, Insecure Headers และอีกมากมาย ZAP เป็นเครื่องมือ DAST (Dynamic Application Security Testing) ที่ทดสอบ application ขณะทำงานจริง
การใช้ ZAP ใน Batch Processing Pipeline หมายถึงการ scan หลาย targets พร้อมกันหรือต่อเนื่องแบบอัตโนมัติ เหมาะสำหรับองค์กรที่มี web applications หลายตัวและต้องการ scan ทั้งหมดเป็นประจำ หรือ scan ทุกครั้งที่มีการ deploy
ZAP รองรับ automation ผ่านหลายช่องทาง ได้แก่ ZAP API (REST) สำหรับควบคุม ZAP ผ่าน HTTP, ZAP Automation Framework สำหรับ YAML-based configuration, ZAP Docker image สำหรับ containerized scanning และ ZAP CLI สำหรับ command-line execution
ข้อดีของ ZAP เมื่อเทียบกับ commercial tools เช่น Burp Suite Enterprise คือ ฟรีและ open source, community ใหญ่, API ครบถ้วน, Docker support ดี และมี Marketplace สำหรับ add-ons
ติดตั้ง ZAP และตั้งค่าสำหรับ Automation
วิธีติดตั้งและเตรียม ZAP สำหรับ batch scanning
# ติดตั้ง ZAP ด้วย Docker (แนะนำสำหรับ CI/CD)
docker pull ghcr.io/zaproxy/zaproxy:stable
# รัน ZAP ในโหมด daemon (headless)
docker run -d --name zap \
-p 8080:8080 \
-v $(pwd)/zap-work:/zap/wrk \
ghcr.io/zaproxy/zaproxy:stable \
zap.sh -daemon -host 0.0.0.0 -port 8080 \
-config api.key=your-api-key-here \
-config api.addrs.addr.name=.* \
-config api.addrs.addr.regex=true
# ตรวจสอบว่า ZAP ทำงาน
curl http://localhost:8080/JSON/core/view/version/?apikey=your-api-key-here
# === ZAP Automation Framework (YAML config) ===
# automation.yaml
# env:
# contexts:
# - name: "target-app"
# urls:
# - "https://target-app.example.com"
# includePaths:
# - "https://target-app.example.com/.*"
# excludePaths:
# - "https://target-app.example.com/logout.*"
# authentication:
# method: "form"
# parameters:
# loginUrl: "https://target-app.example.com/login"
# loginRequestData: "username={%username%}&password={%password%}"
# verification:
# method: "response"
# loggedInRegex: "Welcome.*"
# users:
# - name: "test-user"
# credentials:
# username: "testuser"
# password: "testpass123"
#
# jobs:
# - type: spider
# parameters:
# maxDuration: 5
# maxDepth: 5
# maxChildren: 10
#
# - type: spiderAjax
# parameters:
# maxDuration: 5
# maxCrawlDepth: 3
#
# - type: passiveScan-wait
# parameters:
# maxDuration: 10
#
# - type: activeScan
# parameters:
# maxRuleDurationInMins: 5
# maxScanDurationInMins: 30
#
# - type: report
# parameters:
# template: "traditional-html"
# reportDir: "/zap/wrk"
# reportFile: "zap-report"
# รัน Automation Framework
docker run --rm \
-v $(pwd):/zap/wrk:rw \
ghcr.io/zaproxy/zaproxy:stable \
zap.sh -cmd -autorun /zap/wrk/automation.yaml
# Quick scans
# Baseline scan (passive only, fast)
docker run --rm -v $(pwd):/zap/wrk ghcr.io/zaproxy/zaproxy:stable \
zap-baseline.py -t https://target.example.com -r baseline-report.html
# Full scan (active + passive)
docker run --rm -v $(pwd):/zap/wrk ghcr.io/zaproxy/zaproxy:stable \
zap-full-scan.py -t https://target.example.com -r full-report.html
# API scan
docker run --rm -v $(pwd):/zap/wrk ghcr.io/zaproxy/zaproxy:stable \
zap-api-scan.py -t https://target.example.com/openapi.json -f openapi -r api-report.html
สร้าง Batch Scanning Pipeline ด้วย Python
Python script สำหรับ scan หลาย targets
#!/usr/bin/env python3
# zap_batch_scanner.py — Batch DAST Scanner with OWASP ZAP
import requests
import time
import json
import logging
from datetime import datetime
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, asdict
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("zap_batch")
@dataclass
class ScanTarget:
name: str
url: str
scan_type: str = "baseline" # baseline, full, api
api_spec: str = ""
auth_token: str = ""
@dataclass
class ScanResult:
target_name: str
url: str
scan_type: str
status: str
high_alerts: int = 0
medium_alerts: int = 0
low_alerts: int = 0
info_alerts: int = 0
duration_seconds: float = 0
report_path: str = ""
error: str = ""
class ZAPBatchScanner:
def __init__(self, zap_url="http://localhost:8080", api_key="your-api-key"):
self.zap_url = zap_url
self.api_key = api_key
self.session = requests.Session()
self.results = []
def _api(self, endpoint, params=None):
params = params or {}
params["apikey"] = self.api_key
resp = self.session.get(f"{self.zap_url}{endpoint}", params=params)
resp.raise_for_status()
return resp.json()
def wait_for_zap(self, timeout=120):
start = time.time()
while time.time() - start < timeout:
try:
self._api("/JSON/core/view/version/")
logger.info("ZAP is ready")
return True
except Exception:
time.sleep(2)
raise TimeoutError("ZAP did not start in time")
def scan_target(self, target: ScanTarget) -> ScanResult:
start = time.time()
logger.info(f"Starting {target.scan_type} scan: {target.name} ({target.url})")
try:
# Clear previous session
self._api("/JSON/core/action/newSession/", {"name": target.name})
# Set auth header if provided
if target.auth_token:
self._api("/JSON/replacer/action/addRule/", {
"description": "Auth Header",
"enabled": "true",
"matchType": "REQ_HEADER",
"matchRegex": "false",
"matchString": "Authorization",
"replacement": f"Bearer {target.auth_token}",
})
# Spider
logger.info(f"[{target.name}] Spidering...")
scan_id = self._api("/JSON/spider/action/scan/", {
"url": target.url, "maxChildren": "10", "recurse": "true"
})["scan"]
while True:
progress = int(self._api("/JSON/spider/view/status/", {"scanId": scan_id})["status"])
if progress >= 100:
break
time.sleep(2)
# Passive scan wait
logger.info(f"[{target.name}] Waiting for passive scan...")
while int(self._api("/JSON/pscan/view/recordsToScan/")["recordsToScan"]) > 0:
time.sleep(1)
# Active scan (only for full scan type)
if target.scan_type == "full":
logger.info(f"[{target.name}] Active scanning...")
scan_id = self._api("/JSON/ascan/action/scan/", {
"url": target.url, "recurse": "true"
})["scan"]
while True:
progress = int(self._api("/JSON/ascan/view/status/", {"scanId": scan_id})["status"])
if progress >= 100:
break
time.sleep(5)
# Get alerts
alerts = self._api("/JSON/core/view/alerts/", {"baseurl": target.url})["alerts"]
high = sum(1 for a in alerts if a["risk"] == "High")
medium = sum(1 for a in alerts if a["risk"] == "Medium")
low = sum(1 for a in alerts if a["risk"] == "Low")
info = sum(1 for a in alerts if a["risk"] == "Informational")
# Generate report
report_dir = Path("reports")
report_dir.mkdir(exist_ok=True)
report_path = report_dir / f"{target.name}_{datetime.now().strftime('%Y%m%d_%H%M')}.html"
html_report = self.session.get(
f"{self.zap_url}/OTHER/core/other/htmlreport/",
params={"apikey": self.api_key}
).text
report_path.write_text(html_report)
duration = time.time() - start
result = ScanResult(
target_name=target.name, url=target.url,
scan_type=target.scan_type, status="completed",
high_alerts=high, medium_alerts=medium,
low_alerts=low, info_alerts=info,
duration_seconds=round(duration, 1),
report_path=str(report_path),
)
logger.info(f"[{target.name}] Done: H={high} M={medium} L={low} I={info} ({duration:.0f}s)")
return result
except Exception as e:
return ScanResult(
target_name=target.name, url=target.url,
scan_type=target.scan_type, status="error",
duration_seconds=round(time.time() - start, 1),
error=str(e),
)
def batch_scan(self, targets):
self.wait_for_zap()
for target in targets:
result = self.scan_target(target)
self.results.append(result)
self._generate_summary()
return self.results
def _generate_summary(self):
print(f"\n{'='*60}")
print(f"Batch Scan Summary — {datetime.now().strftime('%Y-%m-%d %H:%M')}")
print(f"{'='*60}")
for r in self.results:
status_icon = "PASS" if r.high_alerts == 0 else "FAIL"
print(f"[{status_icon}] {r.target_name}: H={r.high_alerts} M={r.medium_alerts} "
f"L={r.low_alerts} ({r.duration_seconds}s)")
total_high = sum(r.high_alerts for r in self.results)
print(f"\nTotal HIGH alerts: {total_high}")
summary = [asdict(r) for r in self.results]
Path("reports/summary.json").write_text(json.dumps(summary, indent=2))
# ใช้งาน
targets = [
ScanTarget("frontend", "https://app.example.com", "baseline"),
ScanTarget("api", "https://api.example.com", "full"),
ScanTarget("admin", "https://admin.example.com", "baseline", auth_token="xxx"),
]
scanner = ZAPBatchScanner()
results = scanner.batch_scan(targets)
รวม ZAP กับ CI/CD Pipeline
GitHub Actions workflow สำหรับ automated DAST
# .github/workflows/dast-scan.yml
name: DAST Security Scan
on:
push:
branches: [main]
schedule:
- cron: '0 2 * * 1' # ทุกวันจันทร์ 02:00 UTC
jobs:
dast-baseline:
runs-on: ubuntu-latest
services:
app:
image: }:latest
ports: ["8080:8080"]
steps:
- uses: actions/checkout@v4
- name: Wait for app
run: |
for i in $(seq 1 30); do
curl -s http://localhost:8080/health && break || sleep 2
done
- name: ZAP Baseline Scan
uses: zaproxy/action-baseline@v0.12.0
with:
target: 'http://localhost:8080'
rules_file_name: '.zap/rules.tsv'
cmd_options: '-a -j'
- name: Upload Report
if: always()
uses: actions/upload-artifact@v4
with:
name: zap-baseline-report
path: report_html.html
dast-full:
runs-on: ubuntu-latest
if: github.event_name == 'schedule'
steps:
- uses: actions/checkout@v4
- name: Deploy to staging
run: |
echo "Deploy app to staging environment"
- name: ZAP Full Scan
uses: zaproxy/action-full-scan@v0.10.0
with:
target: 'https://staging.example.com'
rules_file_name: '.zap/rules.tsv'
cmd_options: '-a -j -m 30'
- name: Check Results
if: always()
run: |
if grep -q '"risk": "High"' zap-report.json 2>/dev/null; then
echo "HIGH vulnerabilities found!"
exit 1
fi
- name: Notify Security Team
if: failure()
uses: slackapi/slack-github-action@v1
with:
payload: |
{"text": "DAST Scan FAILED: High vulnerabilities detected in staging"}
dast-api:
runs-on: ubuntu-latest
if: github.event_name == 'schedule'
steps:
- uses: actions/checkout@v4
- name: ZAP API Scan
uses: zaproxy/action-api-scan@v0.7.0
with:
target: 'https://staging.example.com/openapi.json'
format: openapi
rules_file_name: '.zap/api-rules.tsv'
# .zap/rules.tsv — Customize alert handling
# ID Action Description
# 10015 IGNORE Incomplete or No Cache-control Header Set
# 10037 IGNORE Server Leaks Information via X-Powered-By
# 10096 WARN Timestamp Disclosure
# 40012 FAIL Cross Site Scripting (Reflected)
# 40014 FAIL Cross Site Scripting (Persistent)
# 40018 FAIL SQL Injection
# 90033 FAIL Loosely Scoped Cookie
วิเคราะห์ผลลัพธ์และสร้าง Reports
สร้าง custom reports จากผลการ scan
#!/usr/bin/env python3
# zap_reporter.py — Custom Report Generator for ZAP Results
import json
from pathlib import Path
from datetime import datetime
from collections import Counter
class ZAPReporter:
def __init__(self, results_file="reports/summary.json"):
with open(results_file) as f:
self.results = json.load(f)
def generate_executive_summary(self):
total_targets = len(self.results)
passed = sum(1 for r in self.results if r["high_alerts"] == 0)
failed = total_targets - passed
total_high = sum(r["high_alerts"] for r in self.results)
total_medium = sum(r["medium_alerts"] for r in self.results)
total_low = sum(r["low_alerts"] for r in self.results)
report = f"""
# Security Scan Executive Summary
Date: {datetime.now().strftime('%Y-%m-%d %H:%M')}
## Overview
- **Targets Scanned**: {total_targets}
- **Passed**: {passed}
- **Failed**: {failed}
- **Pass Rate**: {passed/total_targets*100:.0f}%
## Vulnerability Summary
| Severity | Count |
|----------|-------|
| High | {total_high} |
| Medium | {total_medium} |
| Low | {total_low} |
## Target Details
"""
for r in self.results:
status = "PASS" if r["high_alerts"] == 0 else "FAIL"
report += f"- [{status}] **{r['target_name']}** ({r['url']}): "
report += f"H={r['high_alerts']} M={r['medium_alerts']} L={r['low_alerts']} "
report += f"({r['duration_seconds']}s)\n"
if failed > 0:
report += "\n## Action Required\n"
for r in self.results:
if r["high_alerts"] > 0:
report += f"- **{r['target_name']}**: {r['high_alerts']} HIGH vulnerabilities need immediate attention\n"
return report
def generate_trend_data(self, history_file="reports/history.json"):
history = []
if Path(history_file).exists():
with open(history_file) as f:
history = json.load(f)
current = {
"date": datetime.now().isoformat(),
"targets": len(self.results),
"high": sum(r["high_alerts"] for r in self.results),
"medium": sum(r["medium_alerts"] for r in self.results),
"low": sum(r["low_alerts"] for r in self.results),
}
history.append(current)
with open(history_file, "w") as f:
json.dump(history[-90:], f, indent=2)
return history
def check_policy(self, max_high=0, max_medium=5):
violations = []
for r in self.results:
if r["high_alerts"] > max_high:
violations.append(f"{r['target_name']}: {r['high_alerts']} HIGH (max: {max_high})")
if r["medium_alerts"] > max_medium:
violations.append(f"{r['target_name']}: {r['medium_alerts']} MEDIUM (max: {max_medium})")
return {
"passed": len(violations) == 0,
"violations": violations,
}
reporter = ZAPReporter()
summary = reporter.generate_executive_summary()
print(summary)
policy = reporter.check_policy(max_high=0, max_medium=5)
if not policy["passed"]:
print("POLICY VIOLATIONS:")
for v in policy["violations"]:
print(f" - {v}")
exit(1)
Advanced Configuration และ Custom Scan Policies
ตั้งค่า scan policies และ custom scripts
# === Custom Scan Policy ===
# สร้างผ่าน ZAP API
# Python script สำหรับสร้าง custom policy
import requests
ZAP_URL = "http://localhost:8080"
API_KEY = "your-api-key"
def zap_api(endpoint, params=None):
params = params or {}
params["apikey"] = API_KEY
return requests.get(f"{ZAP_URL}{endpoint}", params=params).json()
# สร้าง scan policy
zap_api("/JSON/ascan/action/addScanPolicy/", {"scanPolicyName": "production-policy"})
# ปิด scanners ที่ไม่ต้องการ (ลด scan time)
# Buffer Overflow (ไม่เกี่ยวกับ web apps ส่วนใหญ่)
zap_api("/JSON/ascan/action/disableScanners/", {
"ids": "30001,30002",
"scanPolicyName": "production-policy"
})
# เปิด scanners ที่สำคัญ
# SQL Injection, XSS, CSRF, Path Traversal
zap_api("/JSON/ascan/action/enableScanners/", {
"ids": "40018,40012,40014,6,40003",
"scanPolicyName": "production-policy"
})
# ตั้ง threshold และ strength
# Threshold: OFF, DEFAULT, LOW, MEDIUM, HIGH
# Strength: DEFAULT, LOW, MEDIUM, HIGH, INSANE
zap_api("/JSON/ascan/action/setScannerAlertThreshold/", {
"id": "40018", # SQL Injection
"alertThreshold": "LOW",
"scanPolicyName": "production-policy"
})
zap_api("/JSON/ascan/action/setScannerAttackStrength/", {
"id": "40018",
"attackStrength": "HIGH",
"scanPolicyName": "production-policy"
})
# === ZAP Script สำหรับ Custom Authentication ===
# authentication_script.js (Zest/JS)
# สำหรับ JWT-based authentication
# === Docker Compose สำหรับ Production ===
# docker-compose.yml:
# services:
# zap:
# image: ghcr.io/zaproxy/zaproxy:stable
# command: >
# zap.sh -daemon -host 0.0.0.0 -port 8080
# -config api.key=
# -config api.addrs.addr.name=.*
# -config api.addrs.addr.regex=true
# -config connection.timeoutInSecs=120
# ports: ["8080:8080"]
# volumes:
# - zap-data:/zap/wrk
# - ./policies:/home/zap/.ZAP/policies
# - ./scripts:/home/zap/.ZAP/scripts
# deploy:
# resources:
# limits:
# cpus: '2.0'
# memory: 4G
#
# scanner:
# build: ./scanner
# depends_on: [zap]
# environment:
# ZAP_URL: http://zap:8080
# ZAP_API_KEY:
# volumes:
# - ./reports:/app/reports
#
# volumes:
# zap-data:
FAQ คำถามที่พบบ่อย
Q: Baseline scan กับ Full scan ต่างกันอย่างไร?
A: Baseline scan ทำเฉพาะ passive scanning ที่วิเคราะห์ HTTP responses โดยไม่ส่ง attack payloads ใช้เวลา 1-5 นาที เหมาะสำหรับ CI/CD pipeline ทุก commit Full scan ทำทั้ง passive และ active scanning ที่ส่ง attack payloads จริง ใช้เวลา 30 นาทีถึงหลายชั่วโมง เหมาะสำหรับ scheduled weekly/monthly scans
Q: ZAP scan ทำให้ production ล่มได้ไหม?
A: Active scan ส่ง malicious payloads จริง อาจทำให้ application crash หรือ data corruption ได้ ไม่ควรรัน active scan กับ production โดยตรง ใช้ staging/testing environment แทน Baseline scan (passive only) ปลอดภัยกว่าแต่ก็อาจเพิ่ม load ได้ ควรจำกัด concurrent requests และ scan speed
Q: ZAP กับ Burp Suite เลือกอันไหน?
A: ZAP เป็น open source ฟรี เหมาะสำหรับ CI/CD automation มี Docker support ดี API ครบ Burp Suite Pro มี scanner ที่แม่นยำกว่า UI ดีกว่าสำหรับ manual testing มี extensions มากกว่า แต่มีค่าลิขสิทธิ์ สำหรับ automated pipeline scanning ZAP เพียงพอ สำหรับ penetration testing แบบ manual Burp Suite ดีกว่า หลายทีมใช้ทั้งสอง
Q: ลด false positives ของ ZAP ได้อย่างไร?
A: ใช้ rules.tsv file กำหนด IGNORE สำหรับ alerts ที่ไม่เกี่ยวข้อง ตั้ง alert threshold เป็น MEDIUM หรือ HIGH สร้าง context ที่กำหนด scope ชัดเจน exclude URLs ที่ไม่ต้อง scan (logout, static files) ใช้ custom scan policy ที่ปิด scanners ที่ไม่เกี่ยวข้องกับ tech stack และ review alerts เป็นประจำเพื่อ tune configuration
