Burp Suite Pro Pub Sub Architecture คืออะไร
Burp Suite Professional เป็นเครื่องมือ web application security testing ยอดนิยมจาก PortSwigger ใช้สำหรับ penetration testing, vulnerability scanning และ security research ส่วน Pub/Sub (Publish-Subscribe) Architecture เป็นรูปแบบ messaging ที่ publishers ส่ง messages ไปยัง topics และ subscribers รับเฉพาะ messages ที่ตนสนใจ โดยทั้งสองฝ่ายไม่ต้องรู้จักกันโดยตรง การนำ Burp Suite มารวมกับ Pub/Sub ช่วยสร้างระบบ security testing แบบ distributed ที่สามารถ scan หลาย targets พร้อมกัน แชร์ findings แบบ real-time และ integrate กับ CI/CD pipeline ได้
Burp Suite Pro Features
# burp_features.py — Burp Suite Pro features overview
import json
class BurpSuiteFeatures:
    """Catalog of Burp Suite Professional features with a console printer."""

    # Registry keyed by a short identifier; each entry carries a display
    # name, a description, and optional extra detail lists.
    FEATURES = {
        "scanner": {
            "name": "Active/Passive Scanner",
            "description": "สแกน vulnerabilities อัตโนมัติ — SQL injection, XSS, SSRF, XXE และอื่นๆ",
            "modes": ["Active scan (ส่ง payloads)", "Passive scan (วิเคราะห์ traffic)"],
        },
        "intruder": {
            "name": "Intruder",
            "description": "Automated attack tool — brute force, fuzzing, parameter tampering",
            "attack_types": ["Sniper", "Battering Ram", "Pitchfork", "Cluster Bomb"],
        },
        "repeater": {
            "name": "Repeater",
            "description": "ส่ง HTTP requests ซ้ำๆ แก้ไขได้ — ใช้ทดสอบ manually",
        },
        "extensions_api": {
            "name": "Extensions API (Montoya API)",
            "description": "เขียน extensions ด้วย Java/Python/Ruby — extend Burp functionality",
            "use_cases": ["Custom scan checks", "Auto-exploit", "Report generation", "Integration"],
        },
        "collaborator": {
            "name": "Burp Collaborator",
            "description": "Out-of-band testing — detect blind SSRF, blind XSS, DNS exfiltration",
        },
    }

    def show_features(self):
        """Print every feature's display name and description."""
        print("=== Burp Suite Pro Features ===\n")
        for feature in self.FEATURES.values():
            print(f"[{feature['name']}]")
            print(f" {feature['description']}")
            print()
# Demo: print the feature overview.
features = BurpSuiteFeatures()
features.show_features()
Pub/Sub Architecture Design
# pubsub_arch.py — Pub/Sub architecture for security testing
import json
class PubSubArchitecture:
    """Describes the components of a Pub/Sub security-scanning pipeline."""

    # Component registry: each entry may declare the topics it publishes
    # to and/or subscribes to; the broker entry lists all topics.
    COMPONENTS = {
        "scan_coordinator": {
            "name": "Scan Coordinator (Publisher)",
            "description": "จัดการ scan jobs — สร้าง tasks, กระจายไป workers, รวบรวมผลลัพธ์",
            "publishes_to": ["scan-requests", "scan-config"],
        },
        "burp_workers": {
            "name": "Burp Workers (Subscribers)",
            "description": "Burp Suite instances ที่รับ scan tasks จาก queue — ทำ active/passive scan",
            "subscribes_to": ["scan-requests"],
            "publishes_to": ["scan-results", "vulnerability-findings"],
        },
        "results_aggregator": {
            "name": "Results Aggregator",
            "description": "รวบรวม findings จากทุก workers — deduplicate, prioritize, report",
            "subscribes_to": ["scan-results", "vulnerability-findings"],
        },
        "notification_service": {
            "name": "Notification Service",
            "description": "แจ้งเตือนเมื่อพบ critical vulnerabilities — Slack, email, Jira",
            "subscribes_to": ["vulnerability-findings"],
        },
        "message_broker": {
            "name": "Message Broker",
            "description": "RabbitMQ หรือ Redis Pub/Sub — route messages ระหว่าง components",
            "topics": ["scan-requests", "scan-results", "vulnerability-findings", "scan-status"],
        },
    }

    def show_architecture(self):
        """Print each component with its pub/sub topic wiring."""
        print("=== Pub/Sub Architecture ===\n")
        for component in self.COMPONENTS.values():
            print(f"[{component['name']}]")
            print(f" {component['description']}")
            publishes = component.get('publishes_to')
            if publishes:
                print(f" Publishes: {', '.join(publishes)}")
            subscribes = component.get('subscribes_to')
            if subscribes:
                print(f" Subscribes: {', '.join(subscribes)}")
            print()
# Demo: print the architecture overview.
architecture = PubSubArchitecture()
architecture.show_architecture()
Burp Extension for Pub/Sub
# burp_extension.py — Burp Suite extension with Pub/Sub
import json
class BurpPubSubExtension:
    """Holds embedded sample code for a Burp-to-Pub/Sub integration.

    JAVA_CODE is a Montoya-API Burp extension that publishes audit issues
    to a RabbitMQ topic exchange; PYTHON_WORKER is a pika consumer that
    drives Burp scans over its REST API. Both are string literals only —
    nothing here connects to Burp or RabbitMQ at runtime.
    """

    # Java extension source (sample text; displayed truncated by show_java).
    # NOTE(review): it references a `Finding` class not defined in the sample
    # — presumably a plain DTO; confirm before reusing the code as-is.
    JAVA_CODE = """
// BurpPubSubExtension.java — Burp extension that publishes findings
package com.example.burppubsub;
import burp.api.montoya.BurpExtension;
import burp.api.montoya.MontoyaApi;
import burp.api.montoya.scanner.audit.issues.AuditIssue;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.google.gson.Gson;
public class BurpPubSubExtension implements BurpExtension {
    private MontoyaApi api;
    private Channel channel;
    private Gson gson = new Gson();
    @Override
    public void initialize(MontoyaApi api) {
        this.api = api;
        api.extension().setName("Pub/Sub Scanner");
        // Connect to RabbitMQ
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("rabbitmq.internal");
            Connection connection = factory.newConnection();
            channel = connection.createChannel();
            channel.exchangeDeclare("security-findings", "topic", true);
            api.logging().logToOutput("Connected to RabbitMQ");
        } catch (Exception e) {
            api.logging().logToError("RabbitMQ connection failed: " + e.getMessage());
        }
        // Register scan listener
        api.scanner().registerAuditIssueHandler(this::handleIssue);
    }
    private void handleIssue(AuditIssue issue) {
        Finding finding = new Finding(
            issue.name(),
            issue.severity().name(),
            issue.baseUrl(),
            issue.detail(),
            issue.remediation()
        );
        try {
            String routingKey = "finding." + issue.severity().name().toLowerCase();
            channel.basicPublish(
                "security-findings",
                routingKey,
                null,
                gson.toJson(finding).getBytes()
            );
            api.logging().logToOutput("Published: " + finding.name);
        } catch (Exception e) {
            api.logging().logToError("Publish failed: " + e.getMessage());
        }
    }
}
"""

    # Python worker source (sample text; displayed truncated by show_python).
    PYTHON_WORKER = """
# scan_worker.py — Python worker that coordinates Burp scans
import pika
import json
import subprocess
import requests
import time
class BurpScanWorker:
    def __init__(self, rabbitmq_url, burp_api_url="http://localhost:1337"):
        self.burp_api = burp_api_url
        self.connection = pika.BlockingConnection(
            pika.URLParameters(rabbitmq_url)
        )
        self.channel = self.connection.channel()
        self.channel.queue_declare("scan-requests", durable=True)
        self.channel.basic_qos(prefetch_count=1)
    def start_scan(self, target_url, scan_config=None):
        '''Start a Burp scan via REST API'''
        payload = {
            "urls": [target_url],
            "scan_configurations": scan_config or [
                {"type": "NamedConfiguration", "name": "Audit checks - all"}
            ]
        }
        resp = requests.post(
            f"{self.burp_api}/v0.1/scan",
            json=payload
        )
        return resp.headers.get("Location") # scan task ID
    def get_scan_status(self, task_id):
        resp = requests.get(f"{self.burp_api}/v0.1/scan/{task_id}")
        return resp.json()
    def process_message(self, ch, method, properties, body):
        message = json.loads(body)
        target = message.get("target_url")
        print(f"Starting scan: {target}")
        task_id = self.start_scan(target)
        # Wait for completion
        while True:
            status = self.get_scan_status(task_id)
            if status.get("scan_status") == "succeeded":
                # Publish results
                self.publish_results(status.get("issue_events", []))
                break
            time.sleep(10)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    def publish_results(self, issues):
        for issue in issues:
            self.channel.basic_publish(
                exchange="security-findings",
                routing_key=f"finding.{issue.get('severity', 'info').lower()}",
                body=json.dumps(issue)
            )
    def start(self):
        self.channel.basic_consume("scan-requests", self.process_message)
        print("Worker started. Waiting for scan requests...")
        self.channel.start_consuming()
# worker = BurpScanWorker("amqp://guest:guest@rabbitmq:5672/")
# worker.start()
"""

    def show_java(self):
        """Print the first 600 characters of the Java extension sample."""
        print("=== Burp Extension (Java) ===")
        print(self.JAVA_CODE[:600])

    def show_python(self):
        """Print the first 600 characters of the Python worker sample."""
        print(f"\n=== Scan Worker (Python) ===")
        print(self.PYTHON_WORKER[:600])
# Demo: preview both embedded code listings (truncated).
extension = BurpPubSubExtension()
extension.show_java()
extension.show_python()
Results Aggregation & Reporting
# aggregation.py — Aggregate and report findings
import json
import random
class ResultsAggregation:
    """Holds an embedded findings-aggregator sample (CODE) and prints a
    demo report with randomized counts.

    CODE is a string literal only — nothing here consumes messages at
    runtime. sample_report's numbers come from ``random`` and are not
    real scan results.
    """

    # Aggregator source (sample text; displayed truncated by show_code).
    CODE = """
# aggregator.py — Aggregate security findings
import pika
import json
from collections import Counter
from datetime import datetime
class FindingsAggregator:
    def __init__(self, rabbitmq_url):
        self.connection = pika.BlockingConnection(
            pika.URLParameters(rabbitmq_url)
        )
        self.channel = self.connection.channel()
        self.findings = []
        # Subscribe to all findings
        self.channel.exchange_declare("security-findings", "topic", durable=True)
        result = self.channel.queue_declare("", exclusive=True)
        queue_name = result.method.queue
        self.channel.queue_bind(queue_name, "security-findings", "finding.*")
        self.channel.basic_consume(queue_name, self.on_finding, auto_ack=True)
    def on_finding(self, ch, method, properties, body):
        finding = json.loads(body)
        # Deduplicate
        key = f"{finding.get('name')}|{finding.get('url')}"
        if not any(f.get('_key') == key for f in self.findings):
            finding['_key'] = key
            finding['found_at'] = datetime.utcnow().isoformat()
            self.findings.append(finding)
            # Alert on critical/high
            severity = finding.get('severity', '').lower()
            if severity in ('critical', 'high'):
                self.send_alert(finding)
    def send_alert(self, finding):
        # Send to Slack/Jira
        print(f"ALERT: [{finding['severity']}] {finding['name']} at {finding.get('url')}")
    def generate_report(self):
        severity_counts = Counter(f.get('severity', 'unknown') for f in self.findings)
        return {
            'total_findings': len(self.findings),
            'by_severity': dict(severity_counts),
            'critical': [f for f in self.findings if f.get('severity') == 'critical'],
            'scan_date': datetime.utcnow().isoformat(),
        }
# aggregator = FindingsAggregator("amqp://guest:guest@rabbitmq:5672/")
# aggregator.channel.start_consuming()
"""

    def show_code(self):
        """Print the first 600 characters of the aggregator sample."""
        print("=== Findings Aggregator ===")
        print(self.CODE[:600])

    def sample_report(self):
        """Print a demo report with randomized counts (illustration only)."""
        print(f"\n=== Security Scan Report ===")
        print(f" Targets scanned: {random.randint(5, 20)}")
        print(f" Total findings: {random.randint(20, 100)}")
        print(f" Critical: {random.randint(0, 3)}")
        print(f" High: {random.randint(2, 10)}")
        print(f" Medium: {random.randint(5, 30)}")
        print(f" Low: {random.randint(10, 50)}")
        print(f" Info: {random.randint(5, 20)}")
        vulns = ["SQL Injection", "XSS (Reflected)", "CSRF", "SSRF", "Open Redirect", "IDOR"]
        print(f" Top finding: {random.choice(vulns)}")
# Demo: show the aggregator sample and a randomized report.
aggregation = ResultsAggregation()
aggregation.show_code()
aggregation.sample_report()
CI/CD Integration
# cicd.py — CI/CD pipeline integration
import json
class CICDIntegration:
    """Embeds sample CI/CD assets for the scanning pipeline.

    GITHUB_ACTIONS is a GitHub Actions workflow that submits a Burp scan
    through the Pub/Sub queue; SCAN_SUBMIT is the ``submit_scan.py``
    script that workflow calls. Both are string literals only, displayed
    truncated by the show_* methods.

    Fixes over the previous revision:
    - The workflow's ``--rabbitmq-url }`` lines were a garbled template;
      restored the ``${{ secrets.RABBITMQ_URL }}`` secrets expression.
    - ``submit_scan.py`` now accepts the ``--wait-for-results`` and
      ``--timeout`` flags the workflow passes, so argparse no longer
      rejects the invocation.
    """

    # Workflow YAML (sample text). The RabbitMQ URL is injected from
    # repository secrets at run time.
    GITHUB_ACTIONS = """
# .github/workflows/security-scan.yml
name: Security Scan (Burp Suite)
on:
  pull_request:
    branches: [main]
  schedule:
    - cron: '0 2 * * 1' # Weekly Monday 2 AM
jobs:
  security-scan:
    runs-on: self-hosted # Need Burp Suite installed
    steps:
      - uses: actions/checkout@v4
      - name: Start target application
        run: docker compose up -d
      - name: Wait for app to be ready
        run: |
          for i in $(seq 1 30); do
            curl -s http://localhost:8080/health && break
            sleep 2
          done
      - name: Submit scan request
        run: |
          python3 scripts/submit_scan.py \\
            --target http://localhost:8080 \\
            --rabbitmq-url "${{ secrets.RABBITMQ_URL }}" \\
            --wait-for-results \\
            --timeout 1800
      - name: Check results
        run: |
          python3 scripts/check_results.py \\
            --rabbitmq-url "${{ secrets.RABBITMQ_URL }}" \\
            --fail-on-severity high \\
            --output report.html
      - name: Upload report
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: security-report
          path: report.html
      - name: Cleanup
        if: always()
        run: docker compose down
"""

    # Submit-script source (sample text; displayed by show_submit).
    SCAN_SUBMIT = """
# submit_scan.py — Submit scan request to Pub/Sub
import pika
import json
import argparse
def submit_scan(target, rabbitmq_url):
    connection = pika.BlockingConnection(pika.URLParameters(rabbitmq_url))
    channel = connection.channel()
    channel.queue_declare("scan-requests", durable=True)
    message = {
        "target_url": target,
        "scan_type": "full",
        "config": {"max_crawl_depth": 5, "audit_checks": "all"},
    }
    channel.basic_publish(
        exchange="",
        routing_key="scan-requests",
        body=json.dumps(message),
        properties=pika.BasicProperties(delivery_mode=2),
    )
    print(f"Scan submitted for {target}")
    connection.close()
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--target", required=True)
    parser.add_argument("--rabbitmq-url", required=True)
    # Accepted for pipeline compatibility; result polling is done by
    # check_results.py, so these flags are not used here.
    parser.add_argument("--wait-for-results", action="store_true")
    parser.add_argument("--timeout", type=int, default=1800)
    args = parser.parse_args()
    submit_scan(args.target, args.rabbitmq_url)
"""

    def show_pipeline(self):
        """Print the first 500 characters of the workflow YAML."""
        print("=== CI/CD Pipeline ===")
        print(self.GITHUB_ACTIONS[:500])

    def show_submit(self):
        """Print the first 400 characters of the submit script."""
        print("\n=== Scan Submit Script ===")
        print(self.SCAN_SUBMIT[:400])
# Demo: print both CI/CD samples (truncated).
pipeline = CICDIntegration()
pipeline.show_pipeline()
pipeline.show_submit()
FAQ - คำถามที่พบบ่อย
Q: Burp Suite Pro จำเป็นต้องใช้ Pro ไหม?
A: Community Edition: ใช้ได้ฟรี แต่ไม่มี active scanner และ Intruder ถูกจำกัด • Pro Edition ($449/ปี): active scanner, unlimited Intruder, Burp Collaborator, REST API • Enterprise Edition: สำหรับ CI/CD, centralized scanning, multi-user • สำหรับ Pub/Sub architecture: ต้องใช้ Pro ขึ้นไป (เพราะต้องใช้ REST API)
Q: ทำไมต้องใช้ Pub/Sub กับ security testing?
A: Scale: scan หลาย targets พร้อมกัน — กระจาย load ไปหลาย Burp instances • Decouple: scan coordinator ไม่ต้องรู้จัก workers โดยตรง • Real-time: findings ส่งถึง aggregator ทันที — ไม่ต้องรอ scan เสร็จ • Integration: subscribe ได้หลาย consumers (Slack, Jira, SIEM, dashboard)
Q: RabbitMQ กับ Kafka ใช้อันไหน?
A: RabbitMQ: แนะนำ — task queue pattern เหมาะกับ scan jobs, ใช้งานง่ายกว่า, routing ยืดหยุ่น • Kafka: เหมาะเมื่อต้อง store findings ระยะยาว + replay + high throughput • Redis Pub/Sub: ง่ายที่สุด แต่ไม่มี persistence (messages หายถ้า subscriber offline) • แนะนำ: เริ่มจาก RabbitMQ → ย้ายไป Kafka ถ้าต้องการ event streaming
Q: Security scan ใน CI/CD ช้าไหม?
A: Full scan: 30-60 นาที (ขึ้นกับความซับซ้อนของ app) • เร่งได้โดย: ใช้ lightweight scan config (เฉพาะ critical checks), จำกัด crawl depth, scan เฉพาะ endpoints ที่เปลี่ยน • Strategy: PR = quick scan (5-10 นาที), Weekly = full scan (60 นาที) • Parallel: Pub/Sub ช่วยกระจาย scan ไปหลาย workers → ลดเวลารวม
