Cybersecurity

Burp Suite Pro Pub Sub Architecture

burp suite pro pub sub architecture
Burp Suite Pro Pub Sub Architecture | SiamCafe Blog
2025-08-03· อ. บอม — SiamCafe.net· 1,800 คำ

Burp Suite Pro Pub Sub Architecture คืออะไร

Burp Suite Professional เป็นเครื่องมือ web application security testing ยอดนิยมจาก PortSwigger ใช้สำหรับ penetration testing, vulnerability scanning และ security research Pub/Sub (Publish-Subscribe) Architecture เป็นรูปแบบ messaging ที่ publishers ส่ง messages ไปยัง topics และ subscribers รับ messages ที่สนใจ โดยไม่ต้องรู้จักกัน การรวม Burp Suite กับ Pub/Sub ช่วยสร้างระบบ security testing แบบ distributed ที่ scan หลาย targets พร้อมกัน แชร์ findings แบบ real-time และ integrate กับ CI/CD pipeline

Burp Suite Pro Features

# burp_features.py — Burp Suite Pro features overview
import json

class BurpSuiteFeatures:
    FEATURES = {
        "scanner": {
            "name": "Active/Passive Scanner",
            "description": "สแกน vulnerabilities อัตโนมัติ — SQL injection, XSS, SSRF, XXE และอื่นๆ",
            "modes": ["Active scan (ส่ง payloads)", "Passive scan (วิเคราะห์ traffic)"],
        },
        "intruder": {
            "name": "Intruder",
            "description": "Automated attack tool — brute force, fuzzing, parameter tampering",
            "attack_types": ["Sniper", "Battering Ram", "Pitchfork", "Cluster Bomb"],
        },
        "repeater": {
            "name": "Repeater",
            "description": "ส่ง HTTP requests ซ้ำๆ แก้ไขได้ — ใช้ทดสอบ manually",
        },
        "extensions_api": {
            "name": "Extensions API (Montoya API)",
            "description": "เขียน extensions ด้วย Java/Python/Ruby — extend Burp functionality",
            "use_cases": ["Custom scan checks", "Auto-exploit", "Report generation", "Integration"],
        },
        "collaborator": {
            "name": "Burp Collaborator",
            "description": "Out-of-band testing — detect blind SSRF, blind XSS, DNS exfiltration",
        },
    }

    def show_features(self):
        print("=== Burp Suite Pro Features ===\n")
        for key, feat in self.FEATURES.items():
            print(f"[{feat['name']}]")
            print(f"  {feat['description']}")
            print()

burp = BurpSuiteFeatures()
burp.show_features()

Pub/Sub Architecture Design

# pubsub_arch.py — Pub/Sub architecture for security testing
import json

class PubSubArchitecture:
    COMPONENTS = {
        "scan_coordinator": {
            "name": "Scan Coordinator (Publisher)",
            "description": "จัดการ scan jobs — สร้าง tasks, กระจายไป workers, รวบรวมผลลัพธ์",
            "publishes_to": ["scan-requests", "scan-config"],
        },
        "burp_workers": {
            "name": "Burp Workers (Subscribers)",
            "description": "Burp Suite instances ที่รับ scan tasks จาก queue — ทำ active/passive scan",
            "subscribes_to": ["scan-requests"],
            "publishes_to": ["scan-results", "vulnerability-findings"],
        },
        "results_aggregator": {
            "name": "Results Aggregator",
            "description": "รวบรวม findings จากทุก workers — deduplicate, prioritize, report",
            "subscribes_to": ["scan-results", "vulnerability-findings"],
        },
        "notification_service": {
            "name": "Notification Service",
            "description": "แจ้งเตือนเมื่อพบ critical vulnerabilities — Slack, email, Jira",
            "subscribes_to": ["vulnerability-findings"],
        },
        "message_broker": {
            "name": "Message Broker",
            "description": "RabbitMQ หรือ Redis Pub/Sub — route messages ระหว่าง components",
            "topics": ["scan-requests", "scan-results", "vulnerability-findings", "scan-status"],
        },
    }

    def show_architecture(self):
        print("=== Pub/Sub Architecture ===\n")
        for key, comp in self.COMPONENTS.items():
            print(f"[{comp['name']}]")
            print(f"  {comp['description']}")
            if 'publishes_to' in comp:
                print(f"  Publishes: {', '.join(comp['publishes_to'])}")
            if 'subscribes_to' in comp:
                print(f"  Subscribes: {', '.join(comp['subscribes_to'])}")
            print()

arch = PubSubArchitecture()
arch.show_architecture()

Burp Extension for Pub/Sub

# burp_extension.py — Burp Suite extension with Pub/Sub
import json

class BurpPubSubExtension:
    JAVA_CODE = """
// BurpPubSubExtension.java — Burp extension that publishes findings
package com.example.burppubsub;

import burp.api.montoya.BurpExtension;
import burp.api.montoya.MontoyaApi;
import burp.api.montoya.scanner.audit.issues.AuditIssue;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.google.gson.Gson;

public class BurpPubSubExtension implements BurpExtension {
    private MontoyaApi api;
    private Channel channel;
    private Gson gson = new Gson();
    
    @Override
    public void initialize(MontoyaApi api) {
        this.api = api;
        api.extension().setName("Pub/Sub Scanner");
        
        // Connect to RabbitMQ
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("rabbitmq.internal");
            Connection connection = factory.newConnection();
            channel = connection.createChannel();
            channel.exchangeDeclare("security-findings", "topic", true);
            
            api.logging().logToOutput("Connected to RabbitMQ");
        } catch (Exception e) {
            api.logging().logToError("RabbitMQ connection failed: " + e.getMessage());
        }
        
        // Register scan listener
        api.scanner().registerAuditIssueHandler(this::handleIssue);
    }
    
    private void handleIssue(AuditIssue issue) {
        Finding finding = new Finding(
            issue.name(),
            issue.severity().name(),
            issue.baseUrl(),
            issue.detail(),
            issue.remediation()
        );
        
        try {
            String routingKey = "finding." + issue.severity().name().toLowerCase();
            channel.basicPublish(
                "security-findings",
                routingKey,
                null,
                gson.toJson(finding).getBytes()
            );
            api.logging().logToOutput("Published: " + finding.name);
        } catch (Exception e) {
            api.logging().logToError("Publish failed: " + e.getMessage());
        }
    }
}
"""

    PYTHON_WORKER = """
# scan_worker.py — Python worker that coordinates Burp scans
import pika
import json
import subprocess
import requests
import time

class BurpScanWorker:
    def __init__(self, rabbitmq_url, burp_api_url="http://localhost:1337"):
        self.burp_api = burp_api_url
        self.connection = pika.BlockingConnection(
            pika.URLParameters(rabbitmq_url)
        )
        self.channel = self.connection.channel()
        self.channel.queue_declare("scan-requests", durable=True)
        self.channel.basic_qos(prefetch_count=1)
    
    def start_scan(self, target_url, scan_config=None):
        '''Start a Burp scan via REST API'''
        payload = {
            "urls": [target_url],
            "scan_configurations": scan_config or [
                {"type": "NamedConfiguration", "name": "Audit checks - all"}
            ]
        }
        resp = requests.post(
            f"{self.burp_api}/v0.1/scan",
            json=payload
        )
        return resp.headers.get("Location")  # scan task ID
    
    def get_scan_status(self, task_id):
        resp = requests.get(f"{self.burp_api}/v0.1/scan/{task_id}")
        return resp.json()
    
    def process_message(self, ch, method, properties, body):
        message = json.loads(body)
        target = message.get("target_url")
        
        print(f"Starting scan: {target}")
        task_id = self.start_scan(target)
        
        # Wait for completion
        while True:
            status = self.get_scan_status(task_id)
            if status.get("scan_status") == "succeeded":
                # Publish results
                self.publish_results(status.get("issue_events", []))
                break
            time.sleep(10)
        
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    def publish_results(self, issues):
        for issue in issues:
            self.channel.basic_publish(
                exchange="security-findings",
                routing_key=f"finding.{issue.get('severity', 'info').lower()}",
                body=json.dumps(issue)
            )
    
    def start(self):
        self.channel.basic_consume("scan-requests", self.process_message)
        print("Worker started. Waiting for scan requests...")
        self.channel.start_consuming()

# worker = BurpScanWorker("amqp://guest:guest@rabbitmq:5672/")
# worker.start()
"""

    def show_java(self):
        print("=== Burp Extension (Java) ===")
        print(self.JAVA_CODE[:600])

    def show_python(self):
        print(f"\n=== Scan Worker (Python) ===")
        print(self.PYTHON_WORKER[:600])

ext = BurpPubSubExtension()
ext.show_java()
ext.show_python()

Results Aggregation & Reporting

# aggregation.py — Aggregate and report findings
import json
import random

class ResultsAggregation:
    CODE = """
# aggregator.py — Aggregate security findings
import pika
import json
from collections import Counter
from datetime import datetime

class FindingsAggregator:
    def __init__(self, rabbitmq_url):
        self.connection = pika.BlockingConnection(
            pika.URLParameters(rabbitmq_url)
        )
        self.channel = self.connection.channel()
        self.findings = []
        
        # Subscribe to all findings
        self.channel.exchange_declare("security-findings", "topic", durable=True)
        result = self.channel.queue_declare("", exclusive=True)
        queue_name = result.method.queue
        self.channel.queue_bind(queue_name, "security-findings", "finding.*")
        self.channel.basic_consume(queue_name, self.on_finding, auto_ack=True)
    
    def on_finding(self, ch, method, properties, body):
        finding = json.loads(body)
        
        # Deduplicate
        key = f"{finding.get('name')}|{finding.get('url')}"
        if not any(f.get('_key') == key for f in self.findings):
            finding['_key'] = key
            finding['found_at'] = datetime.utcnow().isoformat()
            self.findings.append(finding)
            
            # Alert on critical/high
            severity = finding.get('severity', '').lower()
            if severity in ('critical', 'high'):
                self.send_alert(finding)
    
    def send_alert(self, finding):
        # Send to Slack/Jira
        print(f"ALERT: [{finding['severity']}] {finding['name']} at {finding.get('url')}")
    
    def generate_report(self):
        severity_counts = Counter(f.get('severity', 'unknown') for f in self.findings)
        
        return {
            'total_findings': len(self.findings),
            'by_severity': dict(severity_counts),
            'critical': [f for f in self.findings if f.get('severity') == 'critical'],
            'scan_date': datetime.utcnow().isoformat(),
        }

# aggregator = FindingsAggregator("amqp://guest:guest@rabbitmq:5672/")
# aggregator.channel.start_consuming()
"""

    def show_code(self):
        print("=== Findings Aggregator ===")
        print(self.CODE[:600])

    def sample_report(self):
        print(f"\n=== Security Scan Report ===")
        print(f"  Targets scanned: {random.randint(5, 20)}")
        print(f"  Total findings: {random.randint(20, 100)}")
        print(f"  Critical: {random.randint(0, 3)}")
        print(f"  High: {random.randint(2, 10)}")
        print(f"  Medium: {random.randint(5, 30)}")
        print(f"  Low: {random.randint(10, 50)}")
        print(f"  Info: {random.randint(5, 20)}")
        vulns = ["SQL Injection", "XSS (Reflected)", "CSRF", "SSRF", "Open Redirect", "IDOR"]
        print(f"  Top finding: {random.choice(vulns)}")

agg = ResultsAggregation()
agg.show_code()
agg.sample_report()

CI/CD Integration

# cicd.py — CI/CD pipeline integration
import json

class CICDIntegration:
    GITHUB_ACTIONS = """
# .github/workflows/security-scan.yml
name: Security Scan (Burp Suite)
on:
  pull_request:
    branches: [main]
  schedule:
    - cron: '0 2 * * 1'  # Weekly Monday 2 AM

jobs:
  security-scan:
    runs-on: self-hosted  # Need Burp Suite installed
    steps:
      - uses: actions/checkout@v4
      
      - name: Start target application
        run: docker compose up -d
      
      - name: Wait for app to be ready
        run: |
          for i in $(seq 1 30); do
            curl -s http://localhost:8080/health && break
            sleep 2
          done
      
      - name: Submit scan request
        run: |
          python3 scripts/submit_scan.py \\
            --target http://localhost:8080 \\
            --rabbitmq-url } \\
            --wait-for-results \\
            --timeout 1800
      
      - name: Check results
        run: |
          python3 scripts/check_results.py \\
            --rabbitmq-url } \\
            --fail-on-severity high \\
            --output report.html
      
      - name: Upload report
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: security-report
          path: report.html
      
      - name: Cleanup
        if: always()
        run: docker compose down
"""

    SCAN_SUBMIT = """
# submit_scan.py — Submit scan request to Pub/Sub
import pika
import json
import argparse

def submit_scan(target, rabbitmq_url):
    connection = pika.BlockingConnection(pika.URLParameters(rabbitmq_url))
    channel = connection.channel()
    channel.queue_declare("scan-requests", durable=True)
    
    message = {
        "target_url": target,
        "scan_type": "full",
        "config": {"max_crawl_depth": 5, "audit_checks": "all"},
    }
    
    channel.basic_publish(
        exchange="",
        routing_key="scan-requests",
        body=json.dumps(message),
        properties=pika.BasicProperties(delivery_mode=2),
    )
    
    print(f"Scan submitted for {target}")
    connection.close()

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--target", required=True)
    parser.add_argument("--rabbitmq-url", required=True)
    args = parser.parse_args()
    submit_scan(args.target, args.rabbitmq_url)
"""

    def show_pipeline(self):
        print("=== CI/CD Pipeline ===")
        print(self.GITHUB_ACTIONS[:500])

    def show_submit(self):
        print(f"\n=== Scan Submit Script ===")
        print(self.SCAN_SUBMIT[:400])

cicd = CICDIntegration()
cicd.show_pipeline()
cicd.show_submit()

FAQ - คำถามที่พบบ่อย

Q: Burp Suite Pro จำเป็นต้องใช้ Pro ไหม?

A: Community Edition: ใช้ได้ฟรี แต่ไม่มี active scanner, limited Intruder Pro Edition ($449/ปี): active scanner, unlimited Intruder, Burp Collaborator, REST API Enterprise Edition: สำหรับ CI/CD, centralized scanning, multi-user สำหรับ Pub/Sub architecture: ต้อง Pro ขึ้นไป (ใช้ REST API)

Q: ทำไมต้องใช้ Pub/Sub กับ security testing?

A: Scale: scan หลาย targets พร้อมกัน — กระจาย load ไปหลาย Burp instances Decouple: scan coordinator ไม่ต้องรู้จัก workers โดยตรง Real-time: findings ส่งถึง aggregator ทันที — ไม่ต้องรอ scan เสร็จ Integration: subscribe ได้หลาย consumers (Slack, Jira, SIEM, dashboard)

Q: RabbitMQ กับ Kafka ใช้อันไหน?

A: RabbitMQ: แนะนำ — task queue pattern เหมาะกับ scan jobs, ง่ายกว่า, routing flexible Kafka: ถ้าต้อง store findings ระยะยาว + replay + high throughput Redis Pub/Sub: ง่ายที่สุด แต่ไม่มี persistence (messages หายถ้า subscriber offline) แนะนำ: เริ่มจาก RabbitMQ → ย้ายไป Kafka ถ้าต้องการ event streaming

Q: Security scan ใน CI/CD ช้าไหม?

A: Full scan: 30-60 นาที (ขึ้นกับ app complexity) เร่งได้: ใช้ lightweight scan config (เฉพาะ critical checks), จำกัด crawl depth, scan เฉพาะ changed endpoints Strategy: PR = quick scan (5-10 นาที), Weekly = full scan (60 นาที) Parallel: Pub/Sub ช่วยกระจาย scan → ลดเวลารวม

📖 บทความที่เกี่ยวข้อง

Burp Suite Pro API Integration เชื่อมต่อระบบอ่านบทความ → Burp Suite Pro Observability Stackอ่านบทความ → Burp Suite Pro Automation Scriptอ่านบทความ → Burp Suite Pro Edge Deploymentอ่านบทความ →

📚 ดูบทความทั้งหมด →