SiamCafe · Blog
Burp Suite Pro Pub Sub Architecture
บทความ

Burp Suite Pro Pub Sub Architecture

เผยแพร่ 28 พฤษภาคม 2569

Burp Suite Pro Pub Sub Architecture คืออะไร

Burp Suite Professional เป็นเครื่องมือ web application security testing ยอดนิยมจาก PortSwigger ใช้สำหรับ penetration testing, vulnerability scanning และ security research Pub/Sub (Publish-Subscribe) Architecture เป็นรูปแบบ messaging ที่ publishers ส่ง messages ไปยัง topics และ subscribers รับ messages ที่สนใจ โดยไม่ต้องรู้จักกัน การรวม Burp Suite กับ Pub/Sub ช่วยสร้างระบบ security testing แบบ distributed ที่ scan หลาย targets พร้อมกัน แชร์ findings แบบ real-time และ integrate กับ CI/CD pipeline

Burp Suite Pro Features

# burp_features.py — Burp Suite Pro features overview
import json

class BurpSuiteFeatures:
    FEATURES = {
        "scanner": {
            "name": "Active/Passive Scanner",
            "description": "สแกน vulnerabilities อัตโนมัติ — SQL injection, XSS, SSRF, XXE และอื่นๆ",
            "modes": ["Active scan (ส่ง payloads)", "Passive scan (วิเคราะห์ traffic)"],
        },
        "intruder": {
            "name": "Intruder",
            "description": "Automated attack tool — brute force, fuzzing, parameter tampering",
            "attack_types": ["Sniper", "Battering Ram", "Pitchfork", "Cluster Bomb"],
        },
        "repeater": {
            "name": "Repeater",
            "description": "ส่ง HTTP requests ซ้ำๆ แก้ไขได้ — ใช้ทดสอบ manually",
        },
        "extensions_api": {
            "name": "Extensions API (Montoya API)",
            "description": "เขียน extensions ด้วย Java/Python/Ruby — extend Burp functionality",
            "use_cases": ["Custom scan checks", "Auto-exploit", "Report generation", "Integration"],
        },
        "collaborator": {
            "name": "Burp Collaborator",
            "description": "Out-of-band testing — detect blind SSRF, blind XSS, DNS exfiltration",
        },
    }

    def show_features(self):
        print("=== Burp Suite Pro Features ===\n")
        for key, feat in self.FEATURES.items():
            print(f"[{feat['name']}]")
            print(f"  {feat['description']}")
            print()

burp = BurpSuiteFeatures()
burp.show_features()

Pub/Sub Architecture Design

# pubsub_arch.py — Pub/Sub architecture for security testing
import json

class PubSubArchitecture:
    COMPONENTS = {
        "scan_coordinator": {
            "name": "Scan Coordinator (Publisher)",
            "description": "จัดการ scan jobs — สร้าง tasks, กระจายไป workers, รวบรวมผลลัพธ์",
            "publishes_to": ["scan-requests", "scan-config"],
        },
        "burp_workers": {
            "name": "Burp Workers (Subscribers)",
            "description": "Burp Suite instances ที่รับ scan tasks จาก queue — ทำ active/passive scan",
            "subscribes_to": ["scan-requests"],
            "publishes_to": ["scan-results", "vulnerability-findings"],
        },
        "results_aggregator": {
            "name": "Results Aggregator",
            "description": "รวบรวม findings จากทุก workers — deduplicate, prioritize, report",
            "subscribes_to": ["scan-results", "vulnerability-findings"],
        },
        "notification_service": {
            "name": "Notification Service",
            "description": "แจ้งเตือนเมื่อพบ critical vulnerabilities — Slack, email, Jira",
            "subscribes_to": ["vulnerability-findings"],
        },
        "message_broker": {
            "name": "Message Broker",
            "description": "RabbitMQ หรือ Redis Pub/Sub — route messages ระหว่าง components",
            "topics": ["scan-requests", "scan-results", "vulnerability-findings", "scan-status"],
        },
    }

    def show_architecture(self):
        print("=== Pub/Sub Architecture ===\n")
        for key, comp in self.COMPONENTS.items():
            print(f"[{comp['name']}]")
            print(f"  {comp['description']}")
            if 'publishes_to' in comp:
                print(f"  Publishes: {', '.join(comp['publishes_to'])}")
            if 'subscribes_to' in comp:
                print(f"  Subscribes: {', '.join(comp['subscribes_to'])}")
            print()

arch = PubSubArchitecture()
arch.show_architecture()

Burp Extension for Pub/Sub

# burp_extension.py — Burp Suite extension with Pub/Sub
import json

class BurpPubSubExtension:
    JAVA_CODE = """
// BurpPubSubExtension.java — Burp extension that publishes findings
package com.example.burppubsub;

import burp.api.montoya.BurpExtension;
import burp.api.montoya.MontoyaApi;
import burp.api.montoya.scanner.audit.issues.AuditIssue;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.google.gson.Gson;

public class BurpPubSubExtension implements BurpExtension {
    private MontoyaApi api;
    private Channel channel;
    private Gson gson = new Gson();
    
    @Override
    public void initialize(MontoyaApi api) {
        this.api = api;
        api.extension().setName("Pub/Sub Scanner");
        
        // Connect to RabbitMQ
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("rabbitmq.internal");
            Connection connection = factory.newConnection();
            channel = connection.createChannel();
            channel.exchangeDeclare("security-findings", "topic", true);
            
            api.logging().logToOutput("Connected to RabbitMQ");
        } catch (Exception e) {
            api.logging().logToError("RabbitMQ connection failed: " + e.getMessage());
        }
        
        // Register scan listener
        api.scanner().registerAuditIssueHandler(this::handleIssue);
    }
    
    private void handleIssue(AuditIssue issue) {
        Finding finding = new Finding(
            issue.name(),
            issue.severity().name(),
            issue.baseUrl(),
            issue.detail(),
            issue.remediation()
        );
        
        try {
            String routingKey = "finding." + issue.severity().name().toLowerCase();
            channel.basicPublish(
                "security-findings",
                routingKey,
                null,
                gson.toJson(finding).getBytes()
            );
            api.logging().logToOutput("Published: " + finding.name);
        } catch (Exception e) {
            api.logging().logToError("Publish failed: " + e.getMessage());
        }
    }
}
"""

    PYTHON_WORKER = """
# scan_worker.py — Python worker that coordinates Burp scans
import pika
import json
import subprocess
import requests
import time

class BurpScanWorker:
    def __init__(self, rabbitmq_url, burp_api_url="http://localhost:1337"):
        self.burp_api = burp_api_url
        self.connection = pika.BlockingConnection(
            pika.URLParameters(rabbitmq_url)
        )
        self.channel = self.connection.channel()
        self.channel.queue_declare("scan-requests", durable=True)
        self.channel.basic_qos(prefetch_count=1)
    
    def start_scan(self, target_url, scan_config=None):
        '''Start a Burp scan via REST API'''
        payload = {
            "urls": [target_url],
            "scan_configurations": scan_config or [
                {"type": "NamedConfiguration", "name": "Audit checks - all"}
            ]
        }
        resp = requests.post(
            f"{self.burp_api}/v0.1/scan",
            json=payload
        )
        return resp.headers.get("Location")  # scan task ID
    
    def get_scan_status(self, task_id):
        resp = requests.get(f"{self.burp_api}/v0.1/scan/{task_id}")
        return resp.json()
    
    def process_message(self, ch, method, properties, body):
        message = json.loads(body)
        target = message.get("target_url")
        
        print(f"Starting scan: {target}")
        task_id = self.start_scan(target)
        
        # Wait for completion
        while True:
            status = self.get_scan_status(task_id)
            if status.get("scan_status") == "succeeded":
                # Publish results
                self.publish_results(status.get("issue_events", []))
                break
            time.sleep(10)
        
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    def publish_results(self, issues):
        for issue in issues:
            self.channel.basic_publish(
                exchange="security-findings",
                routing_key=f"finding.{issue.get('severity', 'info').lower()}",
                body=json.dumps(issue)
            )
    
    def start(self):
        self.channel.basic_consume("scan-requests", self.process_message)
        print("Worker started. Waiting for scan requests...")
        self.channel.start_consuming()

# worker = BurpScanWorker("amqp://guest:guest@rabbitmq:5672/")
# worker.start()
"""

    def show_java(self):
        print("=== Burp Extension (Java) ===")
        print(self.JAVA_CODE[:600])

    def show_python(self):
        print(f"\n=== Scan Worker (Python) ===")
        print(self.PYTHON_WORKER[:600])

ext = BurpPubSubExtension()
ext.show_java()
ext.show_python()

Results Aggregation & Reporting

# aggregation.py — Aggregate and report findings
import json
import random

class ResultsAggregation:
    CODE = """
# aggregator.py — Aggregate security findings
import pika
import json
from collections import Counter
from datetime import datetime

class FindingsAggregator:
    def __init__(self, rabbitmq_url):
        self.connection = pika.BlockingConnection(
            pika.URLParameters(rabbitmq_url)
        )
        self.channel = self.connection.channel()
        self.findings = []
        
        # Subscribe to all findings
        self.channel.exchange_declare("security-findings", "topic", durable=True)
        result = self.channel.queue_declare("", exclusive=True)
        queue_name = result.method.queue
        self.channel.queue_bind(queue_name, "security-findings", "finding.*")
        self.channel.basic_consume(queue_name, self.on_finding, auto_ack=True)
    
    def on_finding(self, ch, method, properties, body):
        finding = json.loads(body)
        
        # Deduplicate
        key = f"{finding.get('name')}|{finding.get('url')}"
        if not any(f.get('_key') == key for f in self.findings):
            finding['_key'] = key
            finding['found_at'] = datetime.utcnow().isoformat()
            self.findings.append(finding)
            
            # Alert on critical/high
            severity = finding.get('severity', '').lower()
            if severity in ('critical', 'high'):
                self.send_alert(finding)
    
    def send_alert(self, finding):
        # Send to Slack/Jira
        print(f"ALERT: [{finding['severity']}] {finding['name']} at {finding.get('url')}")
    
    def generate_report(self):
        severity_counts = Counter(f.get('severity', 'unknown') for f in self.findings)
        
        return {
            'total_findings': len(self.findings),
            'by_severity': dict(severity_counts),
            'critical': [f for f in self.findings if f.get('severity') == 'critical'],
            'scan_date': datetime.utcnow().isoformat(),
        }

# aggregator = FindingsAggregator("amqp://guest:guest@rabbitmq:5672/")
# aggregator.channel.start_consuming()
"""

    def show_code(self):
        print("=== Findings Aggregator ===")
        print(self.CODE[:600])

    def sample_report(self):
        print(f"\n=== Security Scan Report ===")
        print(f"  Targets scanned: {random.randint(5, 20)}")
        print(f"  Total findings: {random.randint(20, 100)}")
        print(f"  Critical: {random.randint(0, 3)}")
        print(f"  High: {random.randint(2, 10)}")
        print(f"  Medium: {random.randint(5, 30)}")
        print(f"  Low: {random.randint(10, 50)}")
        print(f"  Info: {random.randint(5, 20)}")
        vulns = ["SQL Injection", "XSS (Reflected)", "CSRF", "SSRF", "Open Redirect", "IDOR"]
        print(f"  Top finding: {random.choice(vulns)}")

agg = ResultsAggregation()
agg.show_code()
agg.sample_report()

CI/CD Integration

# cicd.py — CI/CD pipeline integration
import json

class CICDIntegration:
    GITHUB_ACTIONS = """
# .github/workflows/security-scan.yml
name: Security Scan (Burp Suite)
on:
  pull_request:
    branches: [main]
  schedule:
    - cron: '0 2 * * 1'  # Weekly Monday 2 AM

jobs:
  security-scan:
    runs-on: self-hosted  # Need Burp Suite installed
    steps:
      - uses: actions/checkout@v4
      
      - name: Start target application
        run: docker compose up -d
      
      - name: Wait for app to be ready
        run: |
          for i in $(seq 1 30); do
            curl -s http://localhost:8080/health && break
            sleep 2
          done
      
      - name: Submit scan request
        run: |
          python3 scripts/submit_scan.py \\
            --target http://localhost:8080 \\
            --rabbitmq-url } \\
            --wait-for-results \\
            --timeout 1800
      
      - name: Check results
        run: |
          python3 scripts/check_results.py \\
            --rabbitmq-url } \\
            --fail-on-severity high \\
            --output report.html
      
      - name: Upload report
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: security-report
          path: report.html
      
      - name: Cleanup
        if: always()
        run: docker compose down
"""

    SCAN_SUBMIT = """
# submit_scan.py — Submit scan request to Pub/Sub
import pika
import json
import argparse

def submit_scan(target, rabbitmq_url):
    connection = pika.BlockingConnection(pika.URLParameters(rabbitmq_url))
    channel = connection.channel()
    channel.queue_declare("scan-requests", durable=True)
    
    message = {
        "target_url": target,
        "scan_type": "full",
        "config": {"max_crawl_depth": 5, "audit_checks": "all"},
    }
    
    channel.basic_publish(
        exchange="",
        routing_key="scan-requests",
        body=json.dumps(message),
        properties=pika.BasicProperties(delivery_mode=2),
    )
    
    print(f"Scan submitted for {target}")
    connection.close()

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--target", required=True)
    parser.add_argument("--rabbitmq-url", required=True)
    args = parser.parse_args()
    submit_scan(args.target, args.rabbitmq_url)
"""

    def show_pipeline(self):
        print("=== CI/CD Pipeline ===")
        print(self.GITHUB_ACTIONS[:500])

    def show_submit(self):
        print(f"\n=== Scan Submit Script ===")
        print(self.SCAN_SUBMIT[:400])

cicd = CICDIntegration()
cicd.show_pipeline()
cicd.show_submit()

FAQ - คำถามที่พบบ่อย

Q: Burp Suite Pro จำเป็นต้องใช้ Pro ไหม?

A: Community Edition: ใช้ได้ฟรี แต่ไม่มี active scanner, limited Intruder Pro Edition ($449/ปี): active scanner, unlimited Intruder, Burp Collaborator, REST API Enterprise Edition: สำหรับ CI/CD, centralized scanning, multi-user สำหรับ Pub/Sub architecture: ต้อง Pro ขึ้นไป (ใช้ REST API)

Q: ทำไมต้องใช้ Pub/Sub กับ security testing?

A: Scale: scan หลาย targets พร้อมกัน — กระจาย load ไปหลาย Burp instances Decouple: scan coordinator ไม่ต้องรู้จัก workers โดยตรง Real-time: findings ส่งถึง aggregator ทันที — ไม่ต้องรอ scan เสร็จ Integration: subscribe ได้หลาย consumers (Slack, Jira, SIEM, dashboard)

Q: RabbitMQ กับ Kafka ใช้อันไหน?

A: RabbitMQ: แนะนำ — task queue pattern เหมาะกับ scan jobs, ง่ายกว่า, routing flexible Kafka: ถ้าต้อง store findings ระยะยาว + replay + high throughput Redis Pub/Sub: ง่ายที่สุด แต่ไม่มี persistence (messages หายถ้า subscriber offline) แนะนำ: เริ่มจาก RabbitMQ → ย้ายไป Kafka ถ้าต้องการ event streaming

Q: Security scan ใน CI/CD ช้าไหม?

A: Full scan: 30-60 นาที (ขึ้นกับ app complexity) เร่งได้: ใช้ lightweight scan config (เฉพาะ critical checks), จำกัด crawl depth, scan เฉพาะ changed endpoints Strategy: PR = quick scan (5-10 นาที), Weekly = full scan (60 นาที) Parallel: Pub/Sub ช่วยกระจาย scan → ลดเวลารวม