Burp Suite Pro Pub Sub Architecture
Burp Suite Pro Pub Sub Architecture คืออะไร
Burp Suite Professional เป็นเครื่องมือ web application security testing ยอดนิยมจาก PortSwigger ใช้สำหรับ penetration testing, vulnerability scanning และ security research Pub/Sub (Publish-Subscribe) Architecture เป็นรูปแบบ messaging ที่ publishers ส่ง messages ไปยัง topics และ subscribers รับ messages ที่สนใจ โดยไม่ต้องรู้จักกัน การรวม Burp Suite กับ Pub/Sub ช่วยสร้างระบบ security testing แบบ distributed ที่ scan หลาย targets พร้อมกัน แชร์ findings แบบ real-time และ integrate กับ CI/CD pipeline
Burp Suite Pro Features
# burp_features.py — Burp Suite Pro features overview
import json
class BurpSuiteFeatures:
FEATURES = {
"scanner": {
"name": "Active/Passive Scanner",
"description": "สแกน vulnerabilities อัตโนมัติ — SQL injection, XSS, SSRF, XXE และอื่นๆ",
"modes": ["Active scan (ส่ง payloads)", "Passive scan (วิเคราะห์ traffic)"],
},
"intruder": {
"name": "Intruder",
"description": "Automated attack tool — brute force, fuzzing, parameter tampering",
"attack_types": ["Sniper", "Battering Ram", "Pitchfork", "Cluster Bomb"],
},
"repeater": {
"name": "Repeater",
"description": "ส่ง HTTP requests ซ้ำๆ แก้ไขได้ — ใช้ทดสอบ manually",
},
"extensions_api": {
"name": "Extensions API (Montoya API)",
"description": "เขียน extensions ด้วย Java/Python/Ruby — extend Burp functionality",
"use_cases": ["Custom scan checks", "Auto-exploit", "Report generation", "Integration"],
},
"collaborator": {
"name": "Burp Collaborator",
"description": "Out-of-band testing — detect blind SSRF, blind XSS, DNS exfiltration",
},
}
def show_features(self):
print("=== Burp Suite Pro Features ===\n")
for key, feat in self.FEATURES.items():
print(f"[{feat['name']}]")
print(f" {feat['description']}")
print()
burp = BurpSuiteFeatures()
burp.show_features()
Pub/Sub Architecture Design
# pubsub_arch.py — Pub/Sub architecture for security testing
import json
class PubSubArchitecture:
COMPONENTS = {
"scan_coordinator": {
"name": "Scan Coordinator (Publisher)",
"description": "จัดการ scan jobs — สร้าง tasks, กระจายไป workers, รวบรวมผลลัพธ์",
"publishes_to": ["scan-requests", "scan-config"],
},
"burp_workers": {
"name": "Burp Workers (Subscribers)",
"description": "Burp Suite instances ที่รับ scan tasks จาก queue — ทำ active/passive scan",
"subscribes_to": ["scan-requests"],
"publishes_to": ["scan-results", "vulnerability-findings"],
},
"results_aggregator": {
"name": "Results Aggregator",
"description": "รวบรวม findings จากทุก workers — deduplicate, prioritize, report",
"subscribes_to": ["scan-results", "vulnerability-findings"],
},
"notification_service": {
"name": "Notification Service",
"description": "แจ้งเตือนเมื่อพบ critical vulnerabilities — Slack, email, Jira",
"subscribes_to": ["vulnerability-findings"],
},
"message_broker": {
"name": "Message Broker",
"description": "RabbitMQ หรือ Redis Pub/Sub — route messages ระหว่าง components",
"topics": ["scan-requests", "scan-results", "vulnerability-findings", "scan-status"],
},
}
def show_architecture(self):
print("=== Pub/Sub Architecture ===\n")
for key, comp in self.COMPONENTS.items():
print(f"[{comp['name']}]")
print(f" {comp['description']}")
if 'publishes_to' in comp:
print(f" Publishes: {', '.join(comp['publishes_to'])}")
if 'subscribes_to' in comp:
print(f" Subscribes: {', '.join(comp['subscribes_to'])}")
print()
arch = PubSubArchitecture()
arch.show_architecture()
Burp Extension for Pub/Sub
# burp_extension.py — Burp Suite extension with Pub/Sub
import json
class BurpPubSubExtension:
JAVA_CODE = """
// BurpPubSubExtension.java — Burp extension that publishes findings
package com.example.burppubsub;
import burp.api.montoya.BurpExtension;
import burp.api.montoya.MontoyaApi;
import burp.api.montoya.scanner.audit.issues.AuditIssue;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.google.gson.Gson;
public class BurpPubSubExtension implements BurpExtension {
private MontoyaApi api;
private Channel channel;
private Gson gson = new Gson();
@Override
public void initialize(MontoyaApi api) {
this.api = api;
api.extension().setName("Pub/Sub Scanner");
// Connect to RabbitMQ
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("rabbitmq.internal");
Connection connection = factory.newConnection();
channel = connection.createChannel();
channel.exchangeDeclare("security-findings", "topic", true);
api.logging().logToOutput("Connected to RabbitMQ");
} catch (Exception e) {
api.logging().logToError("RabbitMQ connection failed: " + e.getMessage());
}
// Register scan listener
api.scanner().registerAuditIssueHandler(this::handleIssue);
}
private void handleIssue(AuditIssue issue) {
Finding finding = new Finding(
issue.name(),
issue.severity().name(),
issue.baseUrl(),
issue.detail(),
issue.remediation()
);
try {
String routingKey = "finding." + issue.severity().name().toLowerCase();
channel.basicPublish(
"security-findings",
routingKey,
null,
gson.toJson(finding).getBytes()
);
api.logging().logToOutput("Published: " + finding.name);
} catch (Exception e) {
api.logging().logToError("Publish failed: " + e.getMessage());
}
}
}
"""
PYTHON_WORKER = """
# scan_worker.py — Python worker that coordinates Burp scans
import pika
import json
import subprocess
import requests
import time
class BurpScanWorker:
def __init__(self, rabbitmq_url, burp_api_url="http://localhost:1337"):
self.burp_api = burp_api_url
self.connection = pika.BlockingConnection(
pika.URLParameters(rabbitmq_url)
)
self.channel = self.connection.channel()
self.channel.queue_declare("scan-requests", durable=True)
self.channel.basic_qos(prefetch_count=1)
def start_scan(self, target_url, scan_config=None):
'''Start a Burp scan via REST API'''
payload = {
"urls": [target_url],
"scan_configurations": scan_config or [
{"type": "NamedConfiguration", "name": "Audit checks - all"}
]
}
resp = requests.post(
f"{self.burp_api}/v0.1/scan",
json=payload
)
return resp.headers.get("Location") # scan task ID
def get_scan_status(self, task_id):
resp = requests.get(f"{self.burp_api}/v0.1/scan/{task_id}")
return resp.json()
def process_message(self, ch, method, properties, body):
message = json.loads(body)
target = message.get("target_url")
print(f"Starting scan: {target}")
task_id = self.start_scan(target)
# Wait for completion
while True:
status = self.get_scan_status(task_id)
if status.get("scan_status") == "succeeded":
# Publish results
self.publish_results(status.get("issue_events", []))
break
time.sleep(10)
ch.basic_ack(delivery_tag=method.delivery_tag)
def publish_results(self, issues):
for issue in issues:
self.channel.basic_publish(
exchange="security-findings",
routing_key=f"finding.{issue.get('severity', 'info').lower()}",
body=json.dumps(issue)
)
def start(self):
self.channel.basic_consume("scan-requests", self.process_message)
print("Worker started. Waiting for scan requests...")
self.channel.start_consuming()
# worker = BurpScanWorker("amqp://guest:guest@rabbitmq:5672/")
# worker.start()
"""
def show_java(self):
print("=== Burp Extension (Java) ===")
print(self.JAVA_CODE[:600])
def show_python(self):
print(f"\n=== Scan Worker (Python) ===")
print(self.PYTHON_WORKER[:600])
ext = BurpPubSubExtension()
ext.show_java()
ext.show_python()
Results Aggregation & Reporting
# aggregation.py — Aggregate and report findings
import json
import random
class ResultsAggregation:
CODE = """
# aggregator.py — Aggregate security findings
import pika
import json
from collections import Counter
from datetime import datetime
class FindingsAggregator:
def __init__(self, rabbitmq_url):
self.connection = pika.BlockingConnection(
pika.URLParameters(rabbitmq_url)
)
self.channel = self.connection.channel()
self.findings = []
# Subscribe to all findings
self.channel.exchange_declare("security-findings", "topic", durable=True)
result = self.channel.queue_declare("", exclusive=True)
queue_name = result.method.queue
self.channel.queue_bind(queue_name, "security-findings", "finding.*")
self.channel.basic_consume(queue_name, self.on_finding, auto_ack=True)
def on_finding(self, ch, method, properties, body):
finding = json.loads(body)
# Deduplicate
key = f"{finding.get('name')}|{finding.get('url')}"
if not any(f.get('_key') == key for f in self.findings):
finding['_key'] = key
finding['found_at'] = datetime.utcnow().isoformat()
self.findings.append(finding)
# Alert on critical/high
severity = finding.get('severity', '').lower()
if severity in ('critical', 'high'):
self.send_alert(finding)
def send_alert(self, finding):
# Send to Slack/Jira
print(f"ALERT: [{finding['severity']}] {finding['name']} at {finding.get('url')}")
def generate_report(self):
severity_counts = Counter(f.get('severity', 'unknown') for f in self.findings)
return {
'total_findings': len(self.findings),
'by_severity': dict(severity_counts),
'critical': [f for f in self.findings if f.get('severity') == 'critical'],
'scan_date': datetime.utcnow().isoformat(),
}
# aggregator = FindingsAggregator("amqp://guest:guest@rabbitmq:5672/")
# aggregator.channel.start_consuming()
"""
def show_code(self):
print("=== Findings Aggregator ===")
print(self.CODE[:600])
def sample_report(self):
print(f"\n=== Security Scan Report ===")
print(f" Targets scanned: {random.randint(5, 20)}")
print(f" Total findings: {random.randint(20, 100)}")
print(f" Critical: {random.randint(0, 3)}")
print(f" High: {random.randint(2, 10)}")
print(f" Medium: {random.randint(5, 30)}")
print(f" Low: {random.randint(10, 50)}")
print(f" Info: {random.randint(5, 20)}")
vulns = ["SQL Injection", "XSS (Reflected)", "CSRF", "SSRF", "Open Redirect", "IDOR"]
print(f" Top finding: {random.choice(vulns)}")
agg = ResultsAggregation()
agg.show_code()
agg.sample_report()
CI/CD Integration
# cicd.py — CI/CD pipeline integration
import json
class CICDIntegration:
GITHUB_ACTIONS = """
# .github/workflows/security-scan.yml
name: Security Scan (Burp Suite)
on:
pull_request:
branches: [main]
schedule:
- cron: '0 2 * * 1' # Weekly Monday 2 AM
jobs:
security-scan:
runs-on: self-hosted # Need Burp Suite installed
steps:
- uses: actions/checkout@v4
- name: Start target application
run: docker compose up -d
- name: Wait for app to be ready
run: |
for i in $(seq 1 30); do
curl -s http://localhost:8080/health && break
sleep 2
done
- name: Submit scan request
run: |
python3 scripts/submit_scan.py \\
--target http://localhost:8080 \\
--rabbitmq-url } \\
--wait-for-results \\
--timeout 1800
- name: Check results
run: |
python3 scripts/check_results.py \\
--rabbitmq-url } \\
--fail-on-severity high \\
--output report.html
- name: Upload report
if: always()
uses: actions/upload-artifact@v4
with:
name: security-report
path: report.html
- name: Cleanup
if: always()
run: docker compose down
"""
SCAN_SUBMIT = """
# submit_scan.py — Submit scan request to Pub/Sub
import pika
import json
import argparse
def submit_scan(target, rabbitmq_url):
connection = pika.BlockingConnection(pika.URLParameters(rabbitmq_url))
channel = connection.channel()
channel.queue_declare("scan-requests", durable=True)
message = {
"target_url": target,
"scan_type": "full",
"config": {"max_crawl_depth": 5, "audit_checks": "all"},
}
channel.basic_publish(
exchange="",
routing_key="scan-requests",
body=json.dumps(message),
properties=pika.BasicProperties(delivery_mode=2),
)
print(f"Scan submitted for {target}")
connection.close()
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--target", required=True)
parser.add_argument("--rabbitmq-url", required=True)
args = parser.parse_args()
submit_scan(args.target, args.rabbitmq_url)
"""
def show_pipeline(self):
print("=== CI/CD Pipeline ===")
print(self.GITHUB_ACTIONS[:500])
def show_submit(self):
print(f"\n=== Scan Submit Script ===")
print(self.SCAN_SUBMIT[:400])
cicd = CICDIntegration()
cicd.show_pipeline()
cicd.show_submit()
FAQ - คำถามที่พบบ่อย
Q: Burp Suite Pro จำเป็นต้องใช้ Pro ไหม?
A: Community Edition: ใช้ได้ฟรี แต่ไม่มี active scanner, limited Intruder Pro Edition ($449/ปี): active scanner, unlimited Intruder, Burp Collaborator, REST API Enterprise Edition: สำหรับ CI/CD, centralized scanning, multi-user สำหรับ Pub/Sub architecture: ต้อง Pro ขึ้นไป (ใช้ REST API)
Q: ทำไมต้องใช้ Pub/Sub กับ security testing?
A: Scale: scan หลาย targets พร้อมกัน — กระจาย load ไปหลาย Burp instances Decouple: scan coordinator ไม่ต้องรู้จัก workers โดยตรง Real-time: findings ส่งถึง aggregator ทันที — ไม่ต้องรอ scan เสร็จ Integration: subscribe ได้หลาย consumers (Slack, Jira, SIEM, dashboard)
Q: RabbitMQ กับ Kafka ใช้อันไหน?
A: RabbitMQ: แนะนำ — task queue pattern เหมาะกับ scan jobs, ง่ายกว่า, routing flexible Kafka: ถ้าต้อง store findings ระยะยาว + replay + high throughput Redis Pub/Sub: ง่ายที่สุด แต่ไม่มี persistence (messages หายถ้า subscriber offline) แนะนำ: เริ่มจาก RabbitMQ → ย้ายไป Kafka ถ้าต้องการ event streaming
Q: Security scan ใน CI/CD ช้าไหม?
A: Full scan: 30-60 นาที (ขึ้นกับ app complexity) เร่งได้: ใช้ lightweight scan config (เฉพาะ critical checks), จำกัด crawl depth, scan เฉพาะ changed endpoints Strategy: PR = quick scan (5-10 นาที), Weekly = full scan (60 นาที) Parallel: Pub/Sub ช่วยกระจาย scan → ลดเวลารวม