OWASP ZAP ????????? Event-Driven Design ?????????????????????
OWASP ZAP (Zed Attack Proxy) ???????????? open-source web application security scanner ????????????????????????????????????????????????????????????????????? ??????????????????????????? DAST (Dynamic Application Security Testing) ?????? vulnerabilities ???????????? XSS, SQL Injection, CSRF, Broken Authentication ?????? web applications
Event-Driven Design ?????? context ????????? security testing ????????????????????? ??????????????????????????? security scanning pipeline ????????? trigger ????????? events ??????????????? ???????????? Code push ?????? repository, Pull request ???????????????????????????, Deployment ??????????????????, Scheduled scan (??????????????????/?????????????????????), Vulnerability discovered (trigger deeper scan)
???????????????????????? Event-Driven Security Scanning Shift-left security (scan ?????????????????????????????? development lifecycle), Automated ?????????????????????????????? manual scans, Consistent ????????? deployment ???????????? security scan, Scalable ?????????????????????????????? applications ????????????????????????, Feedback loop ???????????? developers ????????? vulnerabilities ???????????????
????????????????????? OWASP ZAP ?????????????????? Event-Driven Architecture
Setup ZAP ?????????????????? automated scanning
# === OWASP ZAP Event-Driven Setup ===
# 1. Docker Compose for ZAP + Event System.
#    ZAP runs as a daemon; RabbitMQ carries scan events; the orchestrator
#    and results-processor consume them over the shared network.
mkdir -p scripts reports   # bind-mount sources must exist before `up`
cat > docker-compose.yml << 'EOF'
version: '3.8'

services:
  zap:
    image: ghcr.io/zaproxy/zaproxy:stable
    # Headless daemon; api.key guards the REST API, the regex addr rule
    # allows API access from the other containers on this network.
    command: >
      zap.sh -daemon -host 0.0.0.0 -port 8080
      -config api.key=zap-api-key-12345
      -config api.addrs.addr.name=.*
      -config api.addrs.addr.regex=true
    ports:
      - "8080:8080"
    volumes:
      - zap-data:/home/zap/.ZAP
      - ./scripts:/home/zap/scripts
      - ./reports:/home/zap/reports
    networks:
      - security-net

  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"    # AMQP
      - "15672:15672"  # management UI
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: secret
    networks:
      - security-net

  scan-orchestrator:
    build: ./orchestrator
    environment:
      - ZAP_API_URL=http://zap:8080
      - ZAP_API_KEY=zap-api-key-12345
      - RABBITMQ_URL=amqp://admin:secret@rabbitmq:5672
      - SLACK_WEBHOOK_URL=
    depends_on:
      - zap
      - rabbitmq
    networks:
      - security-net

  results-processor:
    build: ./processor
    environment:
      - RABBITMQ_URL=amqp://admin:secret@rabbitmq:5672
      - DATABASE_URL=postgres://user:pass@postgres:5432/security
    depends_on:
      - rabbitmq
    networks:
      - security-net

volumes:
  zap-data:

networks:
  security-net:
EOF

# 2. ZAP Automation Framework config: one authenticated context plus a
#    spider -> ajax spider -> passive wait -> active scan -> report pipeline.
cat > zap-automation.yaml << 'EOF'
env:
  contexts:
    - name: "WebApp"
      urls:
        - "https://app.example.com"
      includePaths:
        - "https://app.example.com/.*"
      excludePaths:
        - "https://app.example.com/logout.*"
        - "https://app.example.com/static/.*"
      authentication:
        method: "form"
        parameters:
          loginUrl: "https://app.example.com/login"
          loginRequestData: "username={%username%}&password={%password%}"
        verification:
          method: "response"
          loggedInRegex: "dashboard"

jobs:
  - type: spider
    parameters:
      maxDuration: 5
      maxDepth: 5
      maxChildren: 10
  - type: spiderAjax
    parameters:
      maxDuration: 5
      maxCrawlDepth: 3
  - type: passiveScan-wait
    parameters:
      maxDuration: 10
  - type: activeScan
    parameters:
      maxRuleDurationInMins: 5
      maxScanDurationInMins: 30
      policy: "Default Policy"
  - type: report
    parameters:
      template: "traditional-json"
      reportDir: "/home/zap/reports"
      reportFile: "scan-report"
EOF

echo "ZAP event-driven setup configured"
Automated Security Scanning Pipeline
??????????????? event-driven scanning pipeline
#!/usr/bin/env python3
# scan_orchestrator.py - Event-Driven Security Scan Orchestrator
# Maps CI/CD pipeline events (push, PR, deploy, schedule) to ZAP scan configs.
import json
import logging
import time
from typing import Dict, List
from datetime import datetime
# NOTE(review): json, time, Dict, List and datetime are not used in the code
# shown here — presumably kept for the queueing/reporting parts; confirm.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("orchestrator")
class ZAPScanOrchestrator:
    """Orchestrate OWASP ZAP scans based on pipeline events.

    Each incoming event dict is translated into a scan descriptor:
    scan type, target URL, ZAP config knobs, priority and timeout.
    """

    def __init__(self, zap_url="http://localhost:8080", api_key="zap-api-key"):
        self.zap_url = zap_url    # ZAP daemon API endpoint
        self.api_key = api_key    # ZAP API key
        self.scan_queue = []      # pending scan descriptors (filled by callers)
        self.results = []         # processed scan results (filled by callers)

    def handle_event(self, event):
        """Route an event dict to the handler for its ``type``.

        Returns a scan-descriptor dict, or ``None`` (with a warning logged)
        for unknown event types.
        """
        event_type = event.get("type", "")
        # Dispatch table keeps routing flat and easy to extend.
        handlers = {
            "code_push": self._handle_code_push,
            "pull_request": self._handle_pull_request,
            "deployment": self._handle_deployment,
            "scheduled": self._handle_scheduled,
            "vulnerability_found": self._handle_vuln_found,
        }
        handler = handlers.get(event_type)
        if handler:
            return handler(event)
        logger.warning(f"Unknown event type: {event_type}")
        return None

    def _handle_code_push(self, event):
        """Quick, passive-only baseline scan on code push."""
        return {
            "scan_type": "baseline",
            "target": event.get("preview_url", ""),
            "config": {
                "spider_duration": 2,
                "passive_only": True,
                "ajax_spider": False,
            },
            "priority": "low",
            "timeout_minutes": 10,
        }

    def _handle_pull_request(self, event):
        """Standard active scan against the PR preview environment."""
        return {
            "scan_type": "standard",
            "target": event.get("preview_url", ""),
            "config": {
                "spider_duration": 5,
                "passive_only": False,
                "active_scan": True,
                "ajax_spider": True,
            },
            "priority": "medium",
            "timeout_minutes": 30,
        }

    def _handle_deployment(self, event):
        """Full scan (incl. API scan) after a deployment."""
        return {
            "scan_type": "full",
            "target": event.get("deploy_url", ""),
            "config": {
                "spider_duration": 10,
                "passive_only": False,
                "active_scan": True,
                "ajax_spider": True,
                "api_scan": True,
            },
            "priority": "high",
            "timeout_minutes": 60,
        }

    def _handle_scheduled(self, event):
        """Deep scan with the full policy on a schedule."""
        return {
            "scan_type": "deep",
            "target": event.get("target_url", ""),
            "config": {
                "spider_duration": 15,
                "active_scan": True,
                "ajax_spider": True,
                "api_spider": True,
                "full_policy": True,
            },
            "priority": "medium",
            "timeout_minutes": 120,
        } if False else {
            "scan_type": "deep",
            "target": event.get("target_url", ""),
            "config": {
                "spider_duration": 15,
                "active_scan": True,
                "ajax_spider": True,
                "api_scan": True,
                "full_policy": True,
            },
            "priority": "medium",
            "timeout_minutes": 120,
        }

    def _handle_vuln_found(self, event):
        """Focused re-scan of the area where a vulnerability was reported."""
        return {
            "scan_type": "focused",
            "target": event.get("affected_url", ""),
            "config": {
                "scan_rules": event.get("related_rules", []),
                "deep_scan": True,
            },
            "priority": "critical",
            "timeout_minutes": 15,
        }

    def process_results(self, scan_result):
        """Categorize ZAP alerts by risk and derive gating decisions.

        Note: buckets overlap by design — "critical" (High risk + High
        confidence) is a subset of "high" (all High risk), so per-bucket
        counts do not sum to ``total_alerts``.
        """
        alerts = scan_result.get("alerts", [])
        categorized = {
            "critical": [a for a in alerts if a.get("risk") == "High" and a.get("confidence") == "High"],
            "high": [a for a in alerts if a.get("risk") == "High"],
            "medium": [a for a in alerts if a.get("risk") == "Medium"],
            "low": [a for a in alerts if a.get("risk") == "Low"],
            "info": [a for a in alerts if a.get("risk") == "Informational"],
        }
        return {
            "total_alerts": len(alerts),
            "by_risk": {k: len(v) for k, v in categorized.items()},
            "block_deployment": len(categorized["critical"]) > 0,
            "requires_review": len(categorized["high"]) > 0,
        }
# Demo: feed representative pipeline events through the orchestrator and
# print the scan descriptor chosen for each one.
orchestrator = ZAPScanOrchestrator()
events = [
    {"type": "code_push", "preview_url": "https://preview-123.app.com"},
    {"type": "pull_request", "preview_url": "https://pr-456.app.com"},
    {"type": "deployment", "deploy_url": "https://staging.app.com"},
    {"type": "scheduled", "target_url": "https://app.com"},
]
print("Event-Driven Scan Orchestration:")
for event in events:
    scan = orchestrator.handle_event(event)
    if scan:  # None means the event type was unknown
        print(f"\n Event: {event['type']}")
        print(f" Scan: {scan['scan_type']} (Priority: {scan['priority']}, Timeout: {scan['timeout_minutes']}min)")
        print(f" Target: {scan['target']}")
Custom Scripts ????????? Event Hooks
??????????????? custom ZAP scripts ?????????????????? event-driven scanning
# === Custom ZAP Scripts ===
# 1. ZAP API Python client
cat > zap_client.py << 'PYTHON'
#!/usr/bin/env python3
"""ZAP API Client for Event-Driven Scanning"""
import requests
import time
import json
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("zap-client")


class ZAPClient:
    """Thin wrapper over the ZAP JSON REST API."""

    def __init__(self, base_url="http://localhost:8080", api_key=""):
        self.base_url = base_url
        self.api_key = api_key

    def _request(self, endpoint, params=None):
        """GET a ZAP JSON endpoint with the API key attached; return parsed JSON."""
        params = params or {}
        params["apikey"] = self.api_key
        url = f"{self.base_url}{endpoint}"
        # Timeout so a hung ZAP daemon cannot block the pipeline forever;
        # raise_for_status surfaces HTTP errors instead of parsing error bodies.
        resp = requests.get(url, params=params, timeout=30)
        resp.raise_for_status()
        return resp.json()

    def spider_scan(self, target, max_children=10):
        """Start a spider scan; return the scan id."""
        result = self._request("/JSON/spider/action/scan/", {
            "url": target,
            "maxChildren": max_children,
        })
        scan_id = result.get("scan", "0")
        logger.info(f"Spider started: {scan_id}")
        return scan_id

    def active_scan(self, target):
        """Start an active scan; return the scan id."""
        result = self._request("/JSON/ascan/action/scan/", {
            "url": target,
            "recurse": "true",
        })
        scan_id = result.get("scan", "0")
        logger.info(f"Active scan started: {scan_id}")
        return scan_id

    def wait_for_scan(self, scan_type, scan_id, timeout=300):
        """Poll scan status until 100% or timeout; return True on completion."""
        endpoint = f"/JSON/{scan_type}/view/status/"
        start = time.time()
        while time.time() - start < timeout:
            result = self._request(endpoint, {"scanId": scan_id})
            progress = int(result.get("status", "0"))
            logger.info(f"Scan {scan_id} progress: {progress}%")
            if progress >= 100:
                return True
            time.sleep(5)
        logger.warning(f"Scan {scan_id} timed out")
        return False

    def get_alerts(self, base_url=None):
        """Fetch alerts, optionally filtered by base URL."""
        params = {}
        if base_url:
            params["baseurl"] = base_url
        return self._request("/JSON/alert/view/alerts/", params)

    def generate_report(self, report_type="traditional-json"):
        """Generate a scan report with the given template."""
        return self._request("/JSON/reports/action/generate/", {
            "template": report_type,
        })


# Usage
client = ZAPClient("http://localhost:8080", "zap-api-key")
print("ZAP Client ready")
print("Methods: spider_scan(), active_scan(), get_alerts(), generate_report()")
PYTHON
# 2. Webhook receiver for events
cat > webhook_receiver.py << 'PYTHON'
#!/usr/bin/env python3
"""Webhook receiver for CI/CD events"""
from http.server import HTTPServer, BaseHTTPRequestHandler
import json
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("webhook")


class WebhookHandler(BaseHTTPRequestHandler):
    """Accept CI/CD webhooks and map them to scan events."""

    def do_POST(self):
        content_length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(content_length)
        # Webhook bodies are untrusted input: reject malformed JSON with a
        # 400 instead of letting json.loads crash the handler.
        try:
            event = json.loads(body)
        except (json.JSONDecodeError, UnicodeDecodeError):
            self.send_response(400)
            self.end_headers()
            self.wfile.write(b'{"status": "invalid json"}')
            return
        # Route by declared source
        source = self.headers.get("X-Event-Source", "unknown")
        logger.info(f"Event from {source}: {event.get('action', 'unknown')}")
        # Map to scan event
        if source == "github":
            scan_event = self._map_github_event(event)
        elif source == "gitlab":
            scan_event = self._map_gitlab_event(event)
        else:
            scan_event = event
        # Queue scan (publish to RabbitMQ)
        logger.info(f"Queuing scan: {json.dumps(scan_event)}")
        self.send_response(200)
        self.end_headers()
        self.wfile.write(b'{"status": "queued"}')

    def _map_github_event(self, event):
        """Translate a GitHub webhook payload into a scan event."""
        action = event.get("action", "")
        if action == "opened" and "pull_request" in event:
            return {
                "type": "pull_request",
                "preview_url": event["pull_request"].get("html_url", ""),
                "pr_number": event["pull_request"].get("number"),
            }
        return {"type": "code_push", "preview_url": ""}

    def _map_gitlab_event(self, event):
        """Translate a GitLab webhook payload (placeholder mapping)."""
        return {"type": "code_push", "preview_url": ""}


print("Webhook receiver ready on :9000")
# HTTPServer(("0.0.0.0", 9000), WebhookHandler).serve_forever()
PYTHON

echo "Custom scripts ready"
CI/CD Integration
????????? ZAP ????????????????????? CI/CD pipeline
# === CI/CD Integration ===
# 1. GitHub Actions - ZAP Scan
# The heredoc redirections fail unless the target directories exist.
mkdir -p .github/workflows .zap
cat > .github/workflows/zap-scan.yml << 'EOF'
name: OWASP ZAP Security Scan

on:
  pull_request:
    branches: [main]
  push:
    branches: [main]
  schedule:
    - cron: '0 2 * * 1' # Weekly Monday 2AM

jobs:
  zap-baseline:
    if: github.event_name == 'pull_request'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Start application
        run: docker compose up -d app
      - name: ZAP Baseline Scan
        uses: zaproxy/action-baseline@v0.12.0
        with:
          target: 'http://localhost:3000'
          rules_file_name: '.zap/rules.tsv'
          cmd_options: '-a -j'
      - name: Upload Report
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: zap-baseline-report
          path: report_html.html

  zap-full:
    if: github.event_name == 'push' || github.event_name == 'schedule'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Start application
        run: docker compose up -d app
      - name: ZAP Full Scan
        uses: zaproxy/action-full-scan@v0.10.0
        with:
          target: 'http://localhost:3000'
          rules_file_name: '.zap/rules.tsv'
          cmd_options: '-a -j'
      - name: Check results
        # Fail the job when any High-risk (riskcode >= 3) alert is present.
        run: |
          HIGH=$(python3 -c "
          import json,sys
          data=json.load(sys.stdin)
          alerts=[a for a in data.get('site',[{}])[0].get('alerts',[]) if a.get('riskcode','0')>='3']
          print(len(alerts))
          " < report_json.json)
          if [ "$HIGH" -gt 0 ]; then
            echo "::error::Found $HIGH high-risk vulnerabilities"
            exit 1
          fi

  zap-api:
    if: github.event_name == 'push'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: ZAP API Scan
        uses: zaproxy/action-api-scan@v0.7.0
        with:
          target: 'http://localhost:3000/api/openapi.json'
          format: openapi
          cmd_options: '-a -j'
EOF

# 2. ZAP rules file (suppress false positives)
# NOTE(review): ZAP expects the rules file columns (id, action, note) to be
# TAB-separated — verify the separators below are real tabs before use.
cat > .zap/rules.tsv << 'TSV'
10015	IGNORE	(Incomplete or No Cache-control Header Set)
10037	IGNORE	(Server Leaks Information via "X-Powered-By")
10098	IGNORE	(Cross-Domain Misconfiguration)
90033	WARN	(Loosely Scoped Cookie)
TSV

echo "CI/CD integration configured"
Monitoring ????????? Alerting
????????????????????????????????? scan ????????????????????????????????????
#!/usr/bin/env python3
# security_dashboard.py - Security Scan Dashboard
# Static demo of aggregated scan metrics for reporting.
import json
import logging
from typing import Dict, List
# NOTE(review): json, Dict and List are unused in the code shown here —
# presumably kept for a fuller version; confirm before removing.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("dashboard")
class SecurityDashboard:
    """Aggregate security-scan metrics into a single report structure."""

    def __init__(self):
        # No state yet; data is hard-coded demo content in dashboard().
        pass

    def dashboard(self):
        """Return a (static, demo) 30-day dashboard snapshot.

        Keys: scan_summary_30d, vulnerability_trends, top_vulnerabilities,
        blocked_deployments, compliance.
        """
        return {
            "scan_summary_30d": {
                "total_scans": 120,
                "baseline_scans": 80,
                "full_scans": 30,
                "api_scans": 10,
                "avg_scan_time": "12 minutes",
            },
            "vulnerability_trends": {
                "critical": {"current": 0, "prev_month": 1, "trend": "improved"},
                "high": {"current": 3, "prev_month": 5, "trend": "improved"},
                "medium": {"current": 12, "prev_month": 15, "trend": "improved"},
                "low": {"current": 25, "prev_month": 22, "trend": "worsened"},
            },
            "top_vulnerabilities": [
                {"name": "Cross-Site Scripting (XSS)", "count": 5, "risk": "High", "cwe": "CWE-79"},
                {"name": "SQL Injection", "count": 2, "risk": "High", "cwe": "CWE-89"},
                {"name": "Missing Security Headers", "count": 8, "risk": "Medium", "cwe": "CWE-693"},
                {"name": "Cookie Without Secure Flag", "count": 4, "risk": "Medium", "cwe": "CWE-614"},
                {"name": "Information Disclosure", "count": 6, "risk": "Low", "cwe": "CWE-200"},
            ],
            "blocked_deployments": {
                "total": 3,
                "reasons": [
                    "SQL Injection found in /api/search",
                    "Stored XSS in comment field",
                    "Authentication bypass in admin panel",
                ],
            },
            "compliance": {
                "owasp_top10_coverage": "85%",
                "scan_frequency_met": True,
                "mean_time_to_remediate": "3.5 days",
            },
        }
# Demo: render the dashboard snapshot as plain text.
dash = SecurityDashboard()
data = dash.dashboard()
print("Security Scan Dashboard (30 days):")
summary = data["scan_summary_30d"]
print(f" Scans: {summary['total_scans']} (Baseline: {summary['baseline_scans']}, Full: {summary['full_scans']})")
print(f"\nVulnerability Trends:")
for risk, info in data["vulnerability_trends"].items():
    # NOTE(review): original arrow glyphs were mojibake; assuming "down"
    # means improved (fewer findings) — confirm the intended glyphs.
    arrow = "\u2193" if info["trend"] == "improved" else "\u2191"
    print(f" {risk}: {info['current']} ({arrow} from {info['prev_month']})")
print(f"\nTop Vulnerabilities:")
for v in data["top_vulnerabilities"][:3]:
    print(f" [{v['risk']}] {v['name']}: {v['count']} instances ({v['cwe']})")
print(f"\nBlocked Deployments: {data['blocked_deployments']['total']}")
comp = data["compliance"]
print(f"Compliance: OWASP Top 10 {comp['owasp_top10_coverage']}, MTTR {comp['mean_time_to_remediate']}")
FAQ ??????????????????????????????????????????
Q: OWASP ZAP ????????? Burp Suite ???????????????????????????????????????????
A: OWASP ZAP ????????? open-source, community-driven, automation ??????????????? (GitHub Actions, Docker), API ????????????????????? ??????????????? CI/CD, Automation Framework built-in ??????????????? DevSecOps teams ?????????????????????????????? automated scanning ?????? pipeline Burp Suite Pro ?????? license fee ($449/??????), manual testing features ???????????????????????????, extensions marketplace ????????????, Intruder/Repeater tools ?????????????????? ??????????????? penetration testers ??????????????? manual testing ??????????????? ????????? ZAP ?????????????????? automated CI/CD scanning (DAST), ????????? Burp Suite ?????????????????? manual pen testing ????????????????????????????????????????????????????????????????????????????????????????????????
Q: Event-Driven Scan ?????????????????? Scheduled Scan ??????????????????????
A: Event-Driven ?????????????????? ?????????????????? scan ????????? relevant events ???????????????????????? (code push, deployment) feedback ????????????????????????, ??????????????????????????? vulnerability ?????????????????? production, ??????????????????????????? weekly/monthly scan, Resource efficient scan ?????????????????????????????? changes Scheduled Scan ??????????????????????????? ?????????????????? full deep scan ???????????????????????????????????????, compliance requirements ????????????????????? regular scans, catch vulnerabilities ????????? dependency updates ??????????????? ????????????????????????????????????????????? Event-driven ?????????????????? quick feedback (baseline scan ????????? PR, standard scan ????????? deployment), Scheduled ?????????????????? deep scan ??????????????????????????????
Q: False Positives ????????????????????????????????????????
A: False positives ???????????????????????????????????????????????? DAST scanners ?????????????????????????????? Rules file (.tsv) ??????????????? IGNORE ?????????????????? rules ????????????????????? false positive, Context file ????????????????????? authentication, session management ?????????????????????????????? ?????? false positives, Custom scripts ??????????????? scripts ????????????????????????????????????????????????, Tuning ???????????? scan policy ????????? rules ?????????????????? relevant, Review process ????????? security team review alerts ???????????? assign ????????? developers ??????????????? ???????????? IGNORE ???????????????????????? ????????? false positive ???????????? verify ????????????, Document ??????????????????????????? ignore, Review ignored rules ???????????????????????????
Q: Scan ??????????????????????????????????????????????
A: ?????????????????????????????? spider crawl ??????????????????????????????????????????, active scan ????????? rules, target application ?????????????????? ???????????????????????? ?????? spider scope (exclude static files, limit depth/children), ????????? scan policy ???????????????????????? rules ??????????????? (????????????????????? all rules), ???????????? scan ???????????? phases (quick baseline ????????????, deep scan ??????????????????), ????????? Ajax Spider ??????????????? SPA applications, Parallel scanning (scan ???????????? targets ????????????????????????), ???????????? timeout ????????????????????? scan ???????????? Benchmark baseline scan ????????? < 10 ????????????, standard scan < 30 ????????????, full scan < 2 ?????????????????????
