Nuclei Scanner Monitoring และ Alerting คืออะไร
Nuclei เป็น open source vulnerability scanner จาก ProjectDiscovery ที่ใช้ template-based scanning สำหรับตรวจหา vulnerabilities, misconfigurations และ security issues ในเว็บแอปพลิเคชัน, APIs และ infrastructure รองรับ templates มากกว่า 8,000 รายการ ครอบคลุม CVEs, OWASP Top 10, default credentials และ exposures ส่วน Monitoring และ Alerting คือการตั้งระบบสแกนอัตโนมัติและแจ้งเตือนเมื่อพบ vulnerabilities ใหม่ เพื่อให้ทีม security ตอบสนองได้รวดเร็ว
Nuclei พื้นฐาน
# nuclei_basics.py — Nuclei scanner fundamentals
import json
class NucleiBasics:
    """Printable cheat sheet: Nuclei features, CLI usage and a sample template.

    All content lives in class-level constants; the ``show_*`` methods only
    print it to stdout and return ``None``.
    """

    # Feature catalogue keyed by a short slug. Every entry has a display
    # "name" and a "description"; the remaining keys are extra notes that
    # show_features() does not print.
    FEATURES = {
        "template_based": {
            "name": "Template-Based Scanning",
            "description": "ใช้ YAML templates กำหนด scan logic (ไม่ใช่ signature)",
            "templates": "8,000+ community templates + custom templates",
        },
        "fast": {
            "name": "High Performance",
            "description": "เขียนด้วย Go, สแกนเร็วมาก, รองรับ concurrent requests",
            "benchmark": "สแกน 1,000+ hosts ใน minutes",
        },
        "protocols": {
            "name": "Multi-Protocol",
            "description": "HTTP, DNS, TCP, SSL, File, Code, Headless browser",
            "use": "ตรวจได้ทั้ง web, network, DNS misconfig",
        },
        "severity": {
            "name": "Severity Levels",
            "description": "critical, high, medium, low, info",
            "use": "filter และ prioritize findings",
        },
    }

    # Shell snippets for installing and running Nuclei. The -severity lists
    # have no space after the commas: in an unquoted shell command a space
    # would split the list and hand "high" to nuclei as a separate argument.
    INSTALL = """
# Installation
go install -v github.com/projectdiscovery/nuclei/v3/cmd/nuclei@latest
# Or via Docker
docker pull projectdiscovery/nuclei:latest
# Update templates
nuclei -update-templates
# Basic scan
nuclei -u https://example.com -severity critical,high
# Scan multiple targets
nuclei -l targets.txt -severity critical,high,medium -o results.txt
# Scan with specific templates
nuclei -u https://example.com -t cves/ -t misconfigurations/
# JSON output for automation
nuclei -u https://example.com -jsonl -o results.jsonl
"""

    # Sample custom template. The YAML nesting is significant, and the two
    # matchers are combined with "and": with "or", any path answering
    # 200/302 would be reported as an exposed admin panel.
    TEMPLATE_EXAMPLE = """
# custom-template.yaml — Custom Nuclei template
id: admin-panel-exposed
info:
  name: Admin Panel Exposed
  author: security-team
  severity: high
  description: Admin panel is publicly accessible
  tags: exposure, admin
http:
  - method: GET
    path:
      - "{{BaseURL}}/admin"
      - "{{BaseURL}}/admin/login"
      - "{{BaseURL}}/wp-admin"
      - "{{BaseURL}}/administrator"
    matchers-condition: and
    matchers:
      - type: word
        words:
          - "admin"
          - "login"
          - "dashboard"
        condition: or
      - type: status
        status:
          - 200
          - 302
"""

    def show_features(self):
        """Print the name and description of every entry in FEATURES."""
        print("=== Nuclei Features ===\n")
        # Only the values are needed; the slug keys are not displayed.
        for feature in self.FEATURES.values():
            print(f"[{feature['name']}]")
            print(f" {feature['description']}")
            print()

    def show_install(self, max_chars=400):
        """Print a preview of the INSTALL snippet.

        Args:
            max_chars: Number of leading characters to print. The default
                keeps the short preview; pass ``None`` for the full text.
        """
        print("=== Installation & Usage ===")
        print(self.INSTALL[:max_chars])

    def show_template(self, max_chars=400):
        """Print a preview of the sample custom template.

        Args:
            max_chars: Number of leading characters to print. The default
                keeps the short preview; pass ``None`` for the full text.
        """
        print("\n=== Custom Template ===")
        print(self.TEMPLATE_EXAMPLE[:max_chars])
# Demo run: print every cheat-sheet section, in reading order.
nuclei = NucleiBasics()
for _show in (nuclei.show_features, nuclei.show_install, nuclei.show_template):
    _show()
Automated Scanning Pipeline
# pipeline.py — Automated Nuclei scanning pipeline
import json
class ScanPipeline:
    """Holds an example Python scan wrapper and a cron schedule, and prints them.

    Both constants are display text only; nothing in this class runs Nuclei.
    """

    # Example wrapper script. It is kept as valid, properly indented Python
    # so a reader can paste it into a file. "\\n" is escaped so the printed
    # example contains a literal "\n" escape rather than a raw line break.
    # The example reports a non-zero nuclei exit code instead of silently
    # discarding the subprocess result.
    PYTHON_SCANNER = """
# nuclei_scanner.py — Automated scanning with Python
import subprocess
import json
import os
from datetime import datetime
class NucleiScanner:
    def __init__(self, nuclei_path="nuclei"):
        self.nuclei_path = nuclei_path
        self.results_dir = "scan_results"
        os.makedirs(self.results_dir, exist_ok=True)
    def scan(self, targets, severity="critical,high,medium",
             templates=None, rate_limit=150):
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_file = f"{self.results_dir}/scan_{timestamp}.jsonl"
        cmd = [
            self.nuclei_path,
            "-severity", severity,
            "-rate-limit", str(rate_limit),
            "-jsonl",
            "-o", output_file,
            "-silent",
        ]
        if isinstance(targets, list):
            target_file = f"{self.results_dir}/targets_{timestamp}.txt"
            with open(target_file, "w") as f:
                f.write("\\n".join(targets))
            cmd.extend(["-l", target_file])
        else:
            cmd.extend(["-u", targets])
        if templates:
            for t in templates:
                cmd.extend(["-t", t])
        print(f"Scanning {len(targets) if isinstance(targets, list) else 1} targets...")
        result = subprocess.run(cmd, capture_output=True, text=True)
        if result.returncode != 0:
            print(f"nuclei exited with code {result.returncode}: {result.stderr.strip()}")
        findings = self.parse_results(output_file)
        return findings
    def parse_results(self, output_file):
        findings = []
        if os.path.exists(output_file):
            with open(output_file) as f:
                for line in f:
                    try:
                        findings.append(json.loads(line.strip()))
                    except json.JSONDecodeError:
                        continue
        return findings
    def summary(self, findings):
        severity_count = {}
        for f in findings:
            sev = f.get("info", {}).get("severity", "unknown")
            severity_count[sev] = severity_count.get(sev, 0) + 1
        print(f"Total findings: {len(findings)}")
        for sev, count in sorted(severity_count.items()):
            print(f" {sev}: {count}")
scanner = NucleiScanner()
targets = ["https://example.com", "https://api.example.com"]
findings = scanner.scan(targets)
scanner.summary(findings)
"""

    # Example crontab. "%" is special in crontab entries, hence the "\\%"
    # escapes (printed as "\%"). The severity list has no space after the
    # comma, otherwise cron's shell would split it into two arguments.
    CRON_SCHEDULE = """
# Cron schedule for automated scanning
# Daily scan (critical + high)
0 2 * * * /usr/local/bin/nuclei -l /opt/targets.txt -severity critical,high -jsonl -o /opt/results/daily_$(date +\\%Y\\%m\\%d).jsonl
# Weekly full scan (all severities)
0 3 * * 0 /usr/local/bin/nuclei -l /opt/targets.txt -jsonl -o /opt/results/weekly_$(date +\\%Y\\%m\\%d).jsonl
# Monthly template update
0 1 1 * * /usr/local/bin/nuclei -update-templates
"""

    def show_scanner(self, max_chars=600):
        """Print a preview of the example scanner script.

        Args:
            max_chars: Number of leading characters to print. The default
                keeps the short preview; pass ``None`` for the full script.
        """
        print("=== Python Scanner ===")
        print(self.PYTHON_SCANNER[:max_chars])

    def show_cron(self, max_chars=400):
        """Print a preview of the example cron schedule.

        Args:
            max_chars: Number of leading characters to print. The default
                keeps the short preview; pass ``None`` for the full schedule.
        """
        print("\n=== Cron Schedule ===")
        print(self.CRON_SCHEDULE[:max_chars])
# Demo run: show the scanner example first, then the cron schedule.
pipeline = ScanPipeline()
for _show in (pipeline.show_scanner, pipeline.show_cron):
    _show()
Alerting Integration
# alerting.py — Nuclei alerting integration
import json
import random
class NucleiAlerting:
    """Catalogue of alerting integrations plus a custom Slack example.

    The ``show_*`` methods and ``simulate`` only print to stdout; nothing
    here sends a real notification.
    """

    # Integration catalogue keyed by a short slug. "config" is an example
    # command line. Nuclei has no per-provider webhook flags ("-ms" is its
    # -matcher-status switch), so chat notifications are shown piped through
    # ProjectDiscovery's `notify` tool and Jira through a reporting config
    # file passed with -report-config.
    INTEGRATIONS = {
        "slack": {
            "name": "Slack",
            "method": "Webhook",
            "config": "nuclei -u target.com -severity critical,high -silent | notify -provider slack -provider-config notify.yaml",
        },
        "discord": {
            "name": "Discord",
            "method": "Webhook",
            "config": "nuclei -u target.com -severity critical -silent | notify -provider discord -provider-config notify.yaml",
        },
        "telegram": {
            "name": "Telegram",
            "method": "Bot API",
            "config": "nuclei -u target.com -silent | notify -provider telegram -provider-config notify.yaml",
        },
        "email": {
            "name": "Email (SMTP)",
            "method": "SMTP",
            "config": "Custom Python script → parse results → send email",
        },
        "jira": {
            "name": "Jira",
            "method": "API",
            "config": "nuclei -u target.com -report-config jira-report.yaml",
        },
    }

    # Example alert script, kept as valid, properly indented Python. The
    # POST carries a timeout and the response status is checked, so the
    # "alert sent" line is only printed after Slack accepted the message.
    SLACK_ALERT = """
# slack_alert.py — Custom Slack alerting
import requests
import json
SLACK_WEBHOOK = "https://hooks.slack.com/services/XXX/YYY/ZZZ"
def send_alert(findings):
    critical = [f for f in findings if f['info']['severity'] == 'critical']
    high = [f for f in findings if f['info']['severity'] == 'high']
    if not critical and not high:
        return
    blocks = [{
        "type": "header",
        "text": {"type": "plain_text", "text": f"🚨 Nuclei Scan Alert: {len(critical)} Critical, {len(high)} High"}
    }]
    for finding in (critical + high)[:10]:
        blocks.append({
            "type": "section",
            "text": {"type": "mrkdwn", "text":
                f"*[{finding['info']['severity'].upper()}]* {finding['info']['name']}\\n"
                f"Host: `{finding.get('host', 'N/A')}`\\n"
                f"Template: `{finding.get('template-id', 'N/A')}`"
            }
        })
    response = requests.post(SLACK_WEBHOOK, json={"blocks": blocks}, timeout=10)
    response.raise_for_status()
    print(f"Slack alert sent: {len(critical)} critical, {len(high)} high")
"""

    def show_integrations(self):
        """Print the display name and delivery method of every integration."""
        print("=== Alert Integrations ===\n")
        # Only the values are needed; the slug keys are not displayed.
        for integ in self.INTEGRATIONS.values():
            print(f" [{integ['name']}] {integ['method']}")

    def show_slack(self, max_chars=500):
        """Print a preview of the custom Slack alert script.

        Args:
            max_chars: Number of leading characters to print. The default
                keeps the short preview; pass ``None`` for the full script.
        """
        print("\n=== Custom Slack Alert ===")
        print(self.SLACK_ALERT[:max_chars])

    def simulate(self):
        """Print three hard-coded sample alerts, severity right-aligned to 8 columns."""
        print("\n=== Simulated Alert ===")
        alerts = [
            {"severity": "CRITICAL", "name": "CVE-2024-1234 RCE", "host": "api.example.com"},
            {"severity": "HIGH", "name": "SQL Injection", "host": "app.example.com/search"},
            {"severity": "HIGH", "name": "Exposed Admin Panel", "host": "admin.example.com"},
        ]
        for a in alerts:
            print(f" [{a['severity']:>8}] {a['name']} — {a['host']}")
# Demo run: integrations list, Slack example, then the simulated alerts.
alert = NucleiAlerting()
for _show in (alert.show_integrations, alert.show_slack, alert.simulate):
    _show()
Dashboard & Reporting
# dashboard.py — Nuclei monitoring dashboard
import json
import random
class NucleiDashboard:
    """Console mock-up of a monitoring dashboard filled with random demo numbers.

    Every method prints to stdout and returns ``None``. All figures come from
    the ``random`` module, so output differs between runs unless it is seeded.
    """

    def scan_history(self):
        """Print one summary line per day for the last seven days, oldest first."""
        print("=== Scan History (Last 7 Days) ===\n")
        for days_ago in reversed(range(1, 8)):
            # Draw order (critical, high, medium) is kept fixed so that a
            # seeded run always produces the same lines.
            n_critical = random.randint(0, 3)
            n_high = random.randint(1, 10)
            n_medium = random.randint(5, 30)
            n_total = n_critical + n_high + n_medium
            # One bar segment per three findings.
            histogram = (n_total // 3) * "█"
            print(
                f" Day -{days_ago}: C={n_critical} H={n_high} M={n_medium} "
                f"Total={n_total:>3} {histogram}"
            )

    def top_findings(self):
        """Print five sample finding types, most frequent first."""
        print("\n=== Top Findings ===")
        # (title, lowest count, highest count, severity) in draw order.
        catalogue = (
            ("Exposed .git directory", 3, 15, "HIGH"),
            ("Default credentials", 2, 8, "CRITICAL"),
            ("Missing security headers", 10, 50, "MEDIUM"),
            ("SSL certificate issues", 5, 20, "MEDIUM"),
            ("Open redirect", 1, 5, "HIGH"),
        )
        rows = [
            {"name": title, "count": random.randint(low, high), "severity": level}
            for title, low, high, level in catalogue
        ]
        # Stable descending sort: ties keep their catalogue order.
        rows.sort(key=lambda row: row["count"], reverse=True)
        for row in rows:
            print(f" [{row['severity']:>8}] {row['name']:<30} × {row['count']}")

    def asset_coverage(self):
        """Print total assets, scanned assets, coverage percentage and scan age."""
        print("\n=== Asset Coverage ===")
        total = random.randint(50, 200)
        unscanned = random.randint(0, 10)
        scanned = total - unscanned
        coverage = scanned / total * 100
        days_since_full_scan = random.randint(1, 7)
        report = (
            f" Total assets: {total}",
            f" Scanned: {scanned}",
            f" Coverage: {coverage:.1f}%",
            f" Last full scan: {days_since_full_scan} days ago",
        )
        for line in report:
            print(line)
# Demo run: history, top findings, then coverage (random draws in this order).
dash = NucleiDashboard()
for _show in (dash.scan_history, dash.top_findings, dash.asset_coverage):
    _show()
CI/CD Integration
# cicd.py — Nuclei in CI/CD pipeline
import json
class NucleiCICD:
    """Holds an example GitHub Actions workflow and a best-practice list.

    Both methods only print to stdout and return ``None``.
    """

    # Example workflow. YAML nesting is significant, so the indentation in
    # this literal must be kept. "\\" prints as a single shell line
    # continuation. The Slack message is sent by a separate step that reads
    # the webhook from a repository secret: Nuclei has no Slack webhook flag
    # of its own. Because the scan is limited to critical,high, a non-empty
    # results file means there is something to alert on.
    GITHUB_ACTION = """
# .github/workflows/nuclei-scan.yml
name: Nuclei Security Scan
on:
  schedule:
    - cron: '0 2 * * *' # Daily at 2 AM
  workflow_dispatch:
jobs:
  nuclei-scan:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Install Nuclei
        run: |
          go install -v github.com/projectdiscovery/nuclei/v3/cmd/nuclei@latest
          nuclei -update-templates
      - name: Run Scan
        run: |
          nuclei -l targets.txt \\
            -severity critical,high \\
            -jsonl -o results.jsonl
      - name: Notify Slack
        if: always()
        env:
          SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}
        run: |
          if [ -s results.jsonl ]; then
            curl -sS -X POST -H 'Content-Type: application/json' \\
              --data '{"text": "Nuclei scan reported critical/high findings"}' \\
              "$SLACK_WEBHOOK_URL"
          fi
      - name: Check Results
        run: |
          CRITICAL=$(grep -c '"critical"' results.jsonl || true)
          if [ "$CRITICAL" -gt 0 ]; then
            echo "CRITICAL vulnerabilities found!"
            exit 1
          fi
      - name: Upload Results
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: nuclei-results
          path: results.jsonl
"""

    def show_action(self, max_chars=600):
        """Print a preview of the example workflow.

        Args:
            max_chars: Number of leading characters to print. The default
                keeps the short preview; pass ``None`` for the full workflow.
        """
        print("=== GitHub Actions ===")
        print(self.GITHUB_ACTION[:max_chars])

    def best_practices(self):
        """Print the scanning best-practice checklist as a bulleted list."""
        print("\n=== Best Practices ===")
        practices = [
            "Rate limit: -rate-limit 100 เพื่อไม่ overload target",
            # Tag list has no space after the comma so it stays one argument.
            "Exclude: -exclude-tags dos,fuzz สำหรับ production",
            "Scope: สแกนเฉพาะ assets ที่ได้รับอนุญาต",
            "Schedule: Daily (critical/high), Weekly (all), Monthly (full audit)",
            "Triage: review findings ก่อน alert เพื่อลด false positives",
        ]
        for p in practices:
            print(f" • {p}")
# Demo run: print the workflow preview, then the best-practice list.
cicd = NucleiCICD()
for _show in (cicd.show_action, cicd.best_practices):
    _show()
FAQ - คำถามที่พบบ่อย
Q: Nuclei กับ Nessus/Qualys อันไหนดี?
A: Nuclei: ฟรี, open source, template-based, เร็วมาก, community-driven; Nessus/Qualys: enterprise, compliance-focused, GUI, professional support. ใช้ Nuclei สำหรับ web app scanning, bug bounty, DevSecOps pipeline; ใช้ Nessus/Qualys สำหรับ compliance (PCI DSS, ISO 27001), enterprise audit. หลายทีมใช้ทั้งคู่: Nuclei สำหรับ web + Nessus สำหรับ infrastructure
Q: False positives มากไหม?
A: น้อยกว่า traditional scanners เพราะ template-based (specific patterns) Community templates มีคุณภาพดี (reviewed) แต่ยังมีบ้าง โดยเฉพาะ info-level findings แนะนำ: เริ่มด้วย critical+high, triage ก่อน alert, สร้าง exclude list
Q: สแกน production ปลอดภัยไหม?
A: ปลอดภัยถ้าทำถูกวิธี: ใช้ -rate-limit (จำกัด requests/sec), ใช้ -exclude-tags dos,fuzz (ไม่ทำ DoS หรือ fuzzing), สแกนช่วง low-traffic, แจ้ง ops team ก่อนสแกน และทดสอบใน staging ก่อน production เสมอ
Q: Custom template เขียนยากไหม?
A: ง่ายมาก YAML format อ่านเข้าใจง่าย ดู community templates เป็นตัวอย่าง (8,000+) ใช้ nuclei -validate เพื่อ check template syntax Templates รองรับ: HTTP, DNS, TCP, SSL, file, code, headless เขียน template ใหม่ได้ใน 10-15 นาที
