SiamCafe.net Blog
Cybersecurity

OWASP ZAP Event-Driven Design Automated Security Scanning Pipeline

owasp zap event driven design
OWASP ZAP Event Driven Design | SiamCafe Blog
2025-12-26· อ. บอม — SiamCafe.net· 1,266 คำ

OWASP ZAP ????????? Event-Driven Design ?????????????????????

OWASP ZAP (Zed Attack Proxy) ???????????? open-source web application security scanner ????????????????????????????????????????????????????????????????????? ??????????????????????????? DAST (Dynamic Application Security Testing) ?????? vulnerabilities ???????????? XSS, SQL Injection, CSRF, Broken Authentication ?????? web applications

Event-Driven Design ?????? context ????????? security testing ????????????????????? ??????????????????????????? security scanning pipeline ????????? trigger ????????? events ??????????????? ???????????? Code push ?????? repository, Pull request ???????????????????????????, Deployment ??????????????????, Scheduled scan (??????????????????/?????????????????????), Vulnerability discovered (trigger deeper scan)

???????????????????????? Event-Driven Security Scanning Shift-left security (scan ?????????????????????????????? development lifecycle), Automated ?????????????????????????????? manual scans, Consistent ????????? deployment ???????????? security scan, Scalable ?????????????????????????????? applications ????????????????????????, Feedback loop ???????????? developers ????????? vulnerabilities ???????????????

????????????????????? OWASP ZAP ?????????????????? Event-Driven Architecture

Setup ZAP ?????????????????? automated scanning

# === OWASP ZAP Event-Driven Setup ===

# 1. Docker Compose for ZAP + Event System
cat > docker-compose.yml << 'EOF'
version: '3.8'
services:
  zap:
    image: ghcr.io/zaproxy/zaproxy:stable
    command: zap.sh -daemon -host 0.0.0.0 -port 8080
      -config api.key=zap-api-key-12345
      -config api.addrs.addr.name=.*
      -config api.addrs.addr.regex=true
    ports:
      - "8080:8080"
    volumes:
      - zap-data:/home/zap/.ZAP
      - ./scripts:/home/zap/scripts
      - ./reports:/home/zap/reports
    networks:
      - security-net

  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: secret
    networks:
      - security-net

  scan-orchestrator:
    build: ./orchestrator
    environment:
      - ZAP_API_URL=http://zap:8080
      - ZAP_API_KEY=zap-api-key-12345
      - RABBITMQ_URL=amqp://admin:secret@rabbitmq:5672
      - SLACK_WEBHOOK_URL=
    depends_on:
      - zap
      - rabbitmq
    networks:
      - security-net

  results-processor:
    build: ./processor
    environment:
      - RABBITMQ_URL=amqp://admin:secret@rabbitmq:5672
      - DATABASE_URL=postgres://user:pass@postgres:5432/security
    depends_on:
      - rabbitmq
    networks:
      - security-net

volumes:
  zap-data:

networks:
  security-net:
EOF

# 2. ZAP Automation Framework config
cat > zap-automation.yaml << 'EOF'
env:
  contexts:
    - name: "WebApp"
      urls:
        - "https://app.example.com"
      includePaths:
        - "https://app.example.com/.*"
      excludePaths:
        - "https://app.example.com/logout.*"
        - "https://app.example.com/static/.*"
      authentication:
        method: "form"
        parameters:
          loginUrl: "https://app.example.com/login"
          loginRequestData: "username={%username%}&password={%password%}"
        verification:
          method: "response"
          loggedInRegex: "dashboard"

jobs:
  - type: spider
    parameters:
      maxDuration: 5
      maxDepth: 5
      maxChildren: 10

  - type: spiderAjax
    parameters:
      maxDuration: 5
      maxCrawlDepth: 3

  - type: passiveScan-wait
    parameters:
      maxDuration: 10

  - type: activeScan
    parameters:
      maxRuleDurationInMins: 5
      maxScanDurationInMins: 30
      policy: "Default Policy"

  - type: report
    parameters:
      template: "traditional-json"
      reportDir: "/home/zap/reports"
      reportFile: "scan-report"
EOF

echo "ZAP event-driven setup configured"

Automated Security Scanning Pipeline

??????????????? event-driven scanning pipeline

#!/usr/bin/env python3
# scan_orchestrator.py ??? Event-Driven Security Scan Orchestrator
import json
import logging
import time
from typing import Dict, List
from datetime import datetime

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("orchestrator")

class ZAPScanOrchestrator:
    """Orchestrate OWASP ZAP scans based on events"""
    
    def __init__(self, zap_url="http://localhost:8080", api_key="zap-api-key"):
        self.zap_url = zap_url
        self.api_key = api_key
        self.scan_queue = []
        self.results = []
    
    def handle_event(self, event):
        """Route events to appropriate scan type"""
        event_type = event.get("type", "")
        
        handlers = {
            "code_push": self._handle_code_push,
            "pull_request": self._handle_pull_request,
            "deployment": self._handle_deployment,
            "scheduled": self._handle_scheduled,
            "vulnerability_found": self._handle_vuln_found,
        }
        
        handler = handlers.get(event_type)
        if handler:
            return handler(event)
        else:
            logger.warning(f"Unknown event type: {event_type}")
            return None
    
    def _handle_code_push(self, event):
        """Quick baseline scan on code push"""
        return {
            "scan_type": "baseline",
            "target": event.get("preview_url", ""),
            "config": {
                "spider_duration": 2,
                "passive_only": True,
                "ajax_spider": False,
            },
            "priority": "low",
            "timeout_minutes": 10,
        }
    
    def _handle_pull_request(self, event):
        """Standard scan on PR"""
        return {
            "scan_type": "standard",
            "target": event.get("preview_url", ""),
            "config": {
                "spider_duration": 5,
                "passive_only": False,
                "active_scan": True,
                "ajax_spider": True,
            },
            "priority": "medium",
            "timeout_minutes": 30,
        }
    
    def _handle_deployment(self, event):
        """Full scan after deployment"""
        return {
            "scan_type": "full",
            "target": event.get("deploy_url", ""),
            "config": {
                "spider_duration": 10,
                "passive_only": False,
                "active_scan": True,
                "ajax_spider": True,
                "api_scan": True,
            },
            "priority": "high",
            "timeout_minutes": 60,
        }
    
    def _handle_scheduled(self, event):
        """Deep scan on schedule"""
        return {
            "scan_type": "deep",
            "target": event.get("target_url", ""),
            "config": {
                "spider_duration": 15,
                "active_scan": True,
                "ajax_spider": True,
                "api_scan": True,
                "full_policy": True,
            },
            "priority": "medium",
            "timeout_minutes": 120,
        }
    
    def _handle_vuln_found(self, event):
        """Focused scan on specific vulnerability area"""
        return {
            "scan_type": "focused",
            "target": event.get("affected_url", ""),
            "config": {
                "scan_rules": event.get("related_rules", []),
                "deep_scan": True,
            },
            "priority": "critical",
            "timeout_minutes": 15,
        }
    
    def process_results(self, scan_result):
        """Process and categorize scan results"""
        alerts = scan_result.get("alerts", [])
        
        categorized = {
            "critical": [a for a in alerts if a.get("risk") == "High" and a.get("confidence") == "High"],
            "high": [a for a in alerts if a.get("risk") == "High"],
            "medium": [a for a in alerts if a.get("risk") == "Medium"],
            "low": [a for a in alerts if a.get("risk") == "Low"],
            "info": [a for a in alerts if a.get("risk") == "Informational"],
        }
        
        return {
            "total_alerts": len(alerts),
            "by_risk": {k: len(v) for k, v in categorized.items()},
            "block_deployment": len(categorized["critical"]) > 0,
            "requires_review": len(categorized["high"]) > 0,
        }

# Demo
orchestrator = ZAPScanOrchestrator()

events = [
    {"type": "code_push", "preview_url": "https://preview-123.app.com"},
    {"type": "pull_request", "preview_url": "https://pr-456.app.com"},
    {"type": "deployment", "deploy_url": "https://staging.app.com"},
    {"type": "scheduled", "target_url": "https://app.com"},
]

print("Event-Driven Scan Orchestration:")
for event in events:
    scan = orchestrator.handle_event(event)
    if scan:
        print(f"\n  Event: {event['type']}")
        print(f"  Scan: {scan['scan_type']} (Priority: {scan['priority']}, Timeout: {scan['timeout_minutes']}min)")
        print(f"  Target: {scan['target']}")

Custom Scripts ????????? Event Hooks

??????????????? custom ZAP scripts ?????????????????? event-driven scanning

# === Custom ZAP Scripts ===

# 1. ZAP API Python client
cat > zap_client.py << 'PYTHON'
#!/usr/bin/env python3
"""ZAP API Client for Event-Driven Scanning"""
import requests
import time
import json
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("zap-client")

class ZAPClient:
    def __init__(self, base_url="http://localhost:8080", api_key=""):
        self.base_url = base_url
        self.api_key = api_key
    
    def _request(self, endpoint, params=None):
        params = params or {}
        params["apikey"] = self.api_key
        url = f"{self.base_url}{endpoint}"
        resp = requests.get(url, params=params)
        return resp.json()
    
    def spider_scan(self, target, max_children=10):
        """Start spider scan"""
        result = self._request("/JSON/spider/action/scan/", {
            "url": target,
            "maxChildren": max_children,
        })
        scan_id = result.get("scan", "0")
        logger.info(f"Spider started: {scan_id}")
        return scan_id
    
    def active_scan(self, target):
        """Start active scan"""
        result = self._request("/JSON/ascan/action/scan/", {
            "url": target,
            "recurse": "true",
        })
        scan_id = result.get("scan", "0")
        logger.info(f"Active scan started: {scan_id}")
        return scan_id
    
    def wait_for_scan(self, scan_type, scan_id, timeout=300):
        """Wait for scan to complete"""
        endpoint = f"/JSON/{scan_type}/view/status/"
        start = time.time()
        
        while time.time() - start < timeout:
            result = self._request(endpoint, {"scanId": scan_id})
            progress = int(result.get("status", "0"))
            logger.info(f"Scan {scan_id} progress: {progress}%")
            
            if progress >= 100:
                return True
            time.sleep(5)
        
        logger.warning(f"Scan {scan_id} timed out")
        return False
    
    def get_alerts(self, base_url=None):
        """Get all alerts"""
        params = {}
        if base_url:
            params["baseurl"] = base_url
        return self._request("/JSON/alert/view/alerts/", params)
    
    def generate_report(self, report_type="traditional-json"):
        """Generate scan report"""
        return self._request("/JSON/reports/action/generate/", {
            "template": report_type,
        })

# Usage
client = ZAPClient("http://localhost:8080", "zap-api-key")
print("ZAP Client ready")
print("Methods: spider_scan(), active_scan(), get_alerts(), generate_report()")
PYTHON

# 2. Webhook receiver for events
cat > webhook_receiver.py << 'PYTHON'
#!/usr/bin/env python3
"""Webhook receiver for CI/CD events"""
from http.server import HTTPServer, BaseHTTPRequestHandler
import json
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("webhook")

class WebhookHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        content_length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(content_length)
        event = json.loads(body)
        
        # Route event
        source = self.headers.get("X-Event-Source", "unknown")
        logger.info(f"Event from {source}: {event.get('action', 'unknown')}")
        
        # Map to scan event
        if source == "github":
            scan_event = self._map_github_event(event)
        elif source == "gitlab":
            scan_event = self._map_gitlab_event(event)
        else:
            scan_event = event
        
        # Queue scan (publish to RabbitMQ)
        logger.info(f"Queuing scan: {json.dumps(scan_event)}")
        
        self.send_response(200)
        self.end_headers()
        self.wfile.write(b'{"status": "queued"}')
    
    def _map_github_event(self, event):
        action = event.get("action", "")
        if action == "opened" and "pull_request" in event:
            return {
                "type": "pull_request",
                "preview_url": event["pull_request"].get("html_url", ""),
                "pr_number": event["pull_request"].get("number"),
            }
        return {"type": "code_push", "preview_url": ""}
    
    def _map_gitlab_event(self, event):
        return {"type": "code_push", "preview_url": ""}

print("Webhook receiver ready on :9000")
# HTTPServer(("0.0.0.0", 9000), WebhookHandler).serve_forever()
PYTHON

echo "Custom scripts ready"

CI/CD Integration

????????? ZAP ????????????????????? CI/CD pipeline

# === CI/CD Integration ===

# 1. GitHub Actions - ZAP Scan
cat > .github/workflows/zap-scan.yml << 'EOF'
name: OWASP ZAP Security Scan

on:
  pull_request:
    branches: [main]
  push:
    branches: [main]
  schedule:
    - cron: '0 2 * * 1'  # Weekly Monday 2AM

jobs:
  zap-baseline:
    if: github.event_name == 'pull_request'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      
      - name: Start application
        run: docker compose up -d app
      
      - name: ZAP Baseline Scan
        uses: zaproxy/action-baseline@v0.12.0
        with:
          target: 'http://localhost:3000'
          rules_file_name: '.zap/rules.tsv'
          cmd_options: '-a -j'
      
      - name: Upload Report
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: zap-baseline-report
          path: report_html.html

  zap-full:
    if: github.event_name == 'push' || github.event_name == 'schedule'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      
      - name: Start application
        run: docker compose up -d app
      
      - name: ZAP Full Scan
        uses: zaproxy/action-full-scan@v0.10.0
        with:
          target: 'http://localhost:3000'
          rules_file_name: '.zap/rules.tsv'
          cmd_options: '-a -j'
      
      - name: Check results
        run: |
          HIGH=$(cat report_json.json | python3 -c "
          import json,sys
          data=json.load(sys.stdin)
          alerts=[a for a in data.get('site',[{}])[0].get('alerts',[]) if a.get('riskcode','0')>='3']
          print(len(alerts))
          ")
          if [ "$HIGH" -gt 0 ]; then
            echo "::error::Found $HIGH high-risk vulnerabilities"
            exit 1
          fi

  zap-api:
    if: github.event_name == 'push'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      
      - name: ZAP API Scan
        uses: zaproxy/action-api-scan@v0.7.0
        with:
          target: 'http://localhost:3000/api/openapi.json'
          format: openapi
          cmd_options: '-a -j'
EOF

# 2. ZAP rules file (suppress false positives)
cat > .zap/rules.tsv << 'TSV'
10015	IGNORE	(Incomplete or No Cache-control Header Set)
10037	IGNORE	(Server Leaks Information via "X-Powered-By")
10098	IGNORE	(Cross-Domain Misconfiguration)
90033	WARN	(Loosely Scoped Cookie)
TSV

echo "CI/CD integration configured"

Monitoring ????????? Alerting

????????????????????????????????? scan ????????????????????????????????????

#!/usr/bin/env python3
# security_dashboard.py ??? Security Scan Dashboard
import json
import logging
from typing import Dict, List

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("dashboard")

class SecurityDashboard:
    def __init__(self):
        pass
    
    def dashboard(self):
        return {
            "scan_summary_30d": {
                "total_scans": 120,
                "baseline_scans": 80,
                "full_scans": 30,
                "api_scans": 10,
                "avg_scan_time": "12 minutes",
            },
            "vulnerability_trends": {
                "critical": {"current": 0, "prev_month": 1, "trend": "improved"},
                "high": {"current": 3, "prev_month": 5, "trend": "improved"},
                "medium": {"current": 12, "prev_month": 15, "trend": "improved"},
                "low": {"current": 25, "prev_month": 22, "trend": "worsened"},
            },
            "top_vulnerabilities": [
                {"name": "Cross-Site Scripting (XSS)", "count": 5, "risk": "High", "cwe": "CWE-79"},
                {"name": "SQL Injection", "count": 2, "risk": "High", "cwe": "CWE-89"},
                {"name": "Missing Security Headers", "count": 8, "risk": "Medium", "cwe": "CWE-693"},
                {"name": "Cookie Without Secure Flag", "count": 4, "risk": "Medium", "cwe": "CWE-614"},
                {"name": "Information Disclosure", "count": 6, "risk": "Low", "cwe": "CWE-200"},
            ],
            "blocked_deployments": {
                "total": 3,
                "reasons": [
                    "SQL Injection found in /api/search",
                    "Stored XSS in comment field",
                    "Authentication bypass in admin panel",
                ],
            },
            "compliance": {
                "owasp_top10_coverage": "85%",
                "scan_frequency_met": True,
                "mean_time_to_remediate": "3.5 days",
            },
        }

dash = SecurityDashboard()
data = dash.dashboard()
print("Security Scan Dashboard (30 days):")
summary = data["scan_summary_30d"]
print(f"  Scans: {summary['total_scans']} (Baseline: {summary['baseline_scans']}, Full: {summary['full_scans']})")

print(f"\nVulnerability Trends:")
for risk, info in data["vulnerability_trends"].items():
    arrow = "???" if info["trend"] == "improved" else "???"
    print(f"  {risk}: {info['current']} ({arrow} from {info['prev_month']})")

print(f"\nTop Vulnerabilities:")
for v in data["top_vulnerabilities"][:3]:
    print(f"  [{v['risk']}] {v['name']}: {v['count']} instances ({v['cwe']})")

print(f"\nBlocked Deployments: {data['blocked_deployments']['total']}")
comp = data["compliance"]
print(f"Compliance: OWASP Top 10 {comp['owasp_top10_coverage']}, MTTR {comp['mean_time_to_remediate']}")

FAQ ??????????????????????????????????????????

Q: OWASP ZAP ????????? Burp Suite ???????????????????????????????????????????

A: OWASP ZAP ????????? open-source, community-driven, automation ??????????????? (GitHub Actions, Docker), API ????????????????????? ??????????????? CI/CD, Automation Framework built-in ??????????????? DevSecOps teams ?????????????????????????????? automated scanning ?????? pipeline Burp Suite Pro ?????? license fee ($449/??????), manual testing features ???????????????????????????, extensions marketplace ????????????, Intruder/Repeater tools ?????????????????? ??????????????? penetration testers ??????????????? manual testing ??????????????? ????????? ZAP ?????????????????? automated CI/CD scanning (DAST), ????????? Burp Suite ?????????????????? manual pen testing ????????????????????????????????????????????????????????????????????????????????????????????????

Q: Event-Driven Scan ?????????????????? Scheduled Scan ??????????????????????

A: Event-Driven ?????????????????? ?????????????????? scan ????????? relevant events ???????????????????????? (code push, deployment) feedback ????????????????????????, ??????????????????????????? vulnerability ?????????????????? production, ??????????????????????????? weekly/monthly scan, Resource efficient scan ?????????????????????????????? changes Scheduled Scan ??????????????????????????? ?????????????????? full deep scan ???????????????????????????????????????, compliance requirements ????????????????????? regular scans, catch vulnerabilities ????????? dependency updates ??????????????? ????????????????????????????????????????????? Event-driven ?????????????????? quick feedback (baseline scan ????????? PR, standard scan ????????? deployment), Scheduled ?????????????????? deep scan ??????????????????????????????

Q: False Positives ????????????????????????????????????????

A: False positives ???????????????????????????????????????????????? DAST scanners ?????????????????????????????? Rules file (.tsv) ??????????????? IGNORE ?????????????????? rules ????????????????????? false positive, Context file ????????????????????? authentication, session management ?????????????????????????????? ?????? false positives, Custom scripts ??????????????? scripts ????????????????????????????????????????????????, Tuning ???????????? scan policy ????????? rules ?????????????????? relevant, Review process ????????? security team review alerts ???????????? assign ????????? developers ??????????????? ???????????? IGNORE ???????????????????????? ????????? false positive ???????????? verify ????????????, Document ??????????????????????????? ignore, Review ignored rules ???????????????????????????

Q: Scan ??????????????????????????????????????????????

A: ?????????????????????????????? spider crawl ??????????????????????????????????????????, active scan ????????? rules, target application ?????????????????? ???????????????????????? ?????? spider scope (exclude static files, limit depth/children), ????????? scan policy ???????????????????????? rules ??????????????? (????????????????????? all rules), ???????????? scan ???????????? phases (quick baseline ????????????, deep scan ??????????????????), ????????? Ajax Spider ??????????????? SPA applications, Parallel scanning (scan ???????????? targets ????????????????????????), ???????????? timeout ????????????????????? scan ???????????? Benchmark baseline scan ????????? < 10 ????????????, standard scan < 30 ????????????, full scan < 2 ?????????????????????

📖 บทความที่เกี่ยวข้อง

OWASP ZAP Citizen Developerอ่านบทความ → HTTP/3 QUIC Event Driven Designอ่านบทความ → OWASP ZAP Agile Scrum Kanbanอ่านบทความ → OWASP ZAP Shift Left Securityอ่านบทความ → OWASP ZAP Production Setup Guideอ่านบทความ →

📚 ดูบทความทั้งหมด →