ModSecurity WAF Batch Processing Pipeline คืออะไร
ModSecurity เป็น open source Web Application Firewall (WAF) ที่ทำงานเป็น module ของ Apache, Nginx หรือ IIS ใช้ป้องกัน web attacks เช่น SQL Injection, XSS, CSRF และความเสี่ยงอื่นๆ ใน OWASP Top 10
Batch Processing Pipeline คือการประมวลผล ModSecurity logs และ audit data เป็นชุดเพื่อวิเคราะห์ threats, สร้าง reports, ปรับปรุง rules และตรวจจับ attack patterns
การรวมสองแนวคิดนี้ช่วยให้ security teams วิเคราะห์ WAF data ได้อย่างมีประสิทธิภาพ ลด false positives และปรับปรุง security posture อย่างต่อเนื่อง
ModSecurity Architecture
# modsec_arch.py — ModSecurity architecture
import json
class ModSecurityArch:
    """Reference tables and a sample config describing a ModSecurity setup.

    All data lives in class-level constants; the ``show_*`` methods only
    print that data to stdout and return ``None``.
    """

    # Main building blocks of a deployment, keyed by a short identifier.
    COMPONENTS = {
        "engine": {
            "name": "ModSecurity Engine",
            "description": "Core WAF engine ที่วิเคราะห์ HTTP requests/responses",
            "versions": "v2 (Apache module), v3 (libmodsecurity — standalone library)",
        },
        "rules": {
            "name": "OWASP CRS (Core Rule Set)",
            "description": "ชุด rules มาตรฐานสำหรับป้องกัน common attacks",
            "version": "CRS v4.x — 200+ rules",
        },
        "audit_log": {
            "name": "Audit Log",
            "description": "บันทึกรายละเอียด requests ที่ถูก detect/block",
            "format": "JSON หรือ Serial format",
        },
        "connector": {
            "name": "Connector (Nginx/Apache)",
            "description": "เชื่อม ModSecurity engine กับ web server",
            "options": "Apache mod_security2, Nginx modsecurity-nginx connector",
        },
    }

    # The three SecRuleEngine settings and what each one does.
    PROCESSING_MODES = {
        "detection": {
            "name": "Detection Only (SecRuleEngine DetectionOnly)",
            "action": "Log แต่ไม่ block",
            "use": "Testing, tuning rules",
        },
        "on": {
            "name": "On (SecRuleEngine On)",
            "action": "Detect + Block malicious requests",
            "use": "Production",
        },
        "off": {
            "name": "Off (SecRuleEngine Off)",
            "action": "ปิด WAF",
            "use": "Debugging",
        },
    }

    # Sample nginx.conf text; it is only printed, never parsed by this class.
    NGINX_CONFIG = """
# nginx.conf — ModSecurity with Nginx
load_module modules/ngx_http_modsecurity_module.so;
http {
modsecurity on;
modsecurity_rules_file /etc/modsecurity/modsecurity.conf;
server {
listen 443 ssl;
server_name example.com;
location / {
proxy_pass http://backend;
modsecurity_rules '
SecRuleEngine On
SecAuditEngine RelevantOnly
SecAuditLogFormat JSON
SecAuditLog /var/log/modsecurity/audit.json
';
}
}
}
"""

    def show_components(self):
        """Print the name and description of every entry in COMPONENTS."""
        print("=== ModSecurity Components ===\n")
        # Only the values are needed; the dict keys are not displayed.
        for comp in self.COMPONENTS.values():
            print(f"[{comp['name']}]")
            print(f" {comp['description']}")
            print()

    def show_modes(self):
        """Print one line per entry in PROCESSING_MODES (name → action)."""
        print("=== Processing Modes ===")
        for mode in self.PROCESSING_MODES.values():
            print(f" [{mode['name']}] → {mode['action']}")

    def show_config(self):
        """Print a header followed by the first 400 characters of NGINX_CONFIG."""
        print("\n=== Nginx Config ===")
        print(self.NGINX_CONFIG[:400])
# Demo run: build the reference object and print each section in order.
arch = ModSecurityArch()
for _show in (arch.show_components, arch.show_modes, arch.show_config):
    _show()
Batch Processing Pipeline
# pipeline.py — ModSecurity batch processing pipeline
import json
import random
from datetime import datetime, timedelta
class WAFBatchPipeline:
    """Describe the stages of a WAF log batch pipeline and show a sample parser.

    The class holds static data only; the ``show_*`` methods print it to
    stdout and return ``None``.
    """

    # Ordered pipeline stages; each entry carries a label, description and tools.
    PIPELINE_STAGES = {
        "collect": {
            "name": "1. Collect",
            "description": "รวบรวม audit logs จาก ModSecurity instances",
            "tools": "Filebeat, Fluentd, rsync",
        },
        "parse": {
            "name": "2. Parse",
            "description": "Parse audit log format → structured JSON",
            "tools": "Python parser, Logstash, custom ETL",
        },
        "enrich": {
            "name": "3. Enrich",
            "description": "เพิ่มข้อมูล: GeoIP, ASN, threat intel, reputation",
            "tools": "MaxMind GeoIP, AbuseIPDB, VirusTotal",
        },
        "analyze": {
            "name": "4. Analyze",
            "description": "วิเคราะห์ patterns, false positives, attack campaigns",
            "tools": "Python analytics, Elasticsearch, SQL",
        },
        "report": {
            "name": "5. Report",
            "description": "สร้าง reports, dashboards, alerts",
            "tools": "Grafana, Kibana, custom reports",
        },
        "tune": {
            "name": "6. Tune",
            "description": "ปรับปรุง rules: ลด FP, เพิ่ม detection",
            "tools": "CRS exclusion rules, custom rules",
        },
    }

    # Sample parser script shown to the reader. Its indentation is significant
    # Python syntax, so the snippet is stored in a form that compiles.
    # NOTE(review): the field names it reads (transaction.time,
    # audit_data.messages[].details.ruleId, ...) should be checked against the
    # audit-log JSON schema of the ModSecurity version actually deployed.
    PYTHON_PARSER = """
# modsec_parser.py — Parse ModSecurity audit logs
import json
import os
from datetime import datetime
from collections import Counter
class ModSecLogParser:
    def __init__(self, log_dir="/var/log/modsecurity"):
        self.log_dir = log_dir
        self.events = []
    def parse_json_log(self, filepath):
        with open(filepath) as f:
            for line in f:
                try:
                    event = json.loads(line.strip())
                    parsed = {
                        "timestamp": event.get("transaction", {}).get("time", ""),
                        "client_ip": event.get("transaction", {}).get("client_ip", ""),
                        "method": event.get("request", {}).get("method", ""),
                        "uri": event.get("request", {}).get("uri", ""),
                        "status": event.get("response", {}).get("status", 0),
                        "rules_matched": [],
                    }
                    for msg in event.get("audit_data", {}).get("messages", []):
                        parsed["rules_matched"].append({
                            "id": msg.get("details", {}).get("ruleId", ""),
                            "severity": msg.get("details", {}).get("severity", ""),
                            "message": msg.get("message", ""),
                        })
                    self.events.append(parsed)
                except json.JSONDecodeError:
                    continue
    def summary(self):
        ip_counts = Counter(e["client_ip"] for e in self.events)
        rule_counts = Counter()
        for e in self.events:
            for r in e["rules_matched"]:
                rule_counts[r["id"]] += 1
        print(f"Total events: {len(self.events)}")
        print(f"Unique IPs: {len(ip_counts)}")
        print(f"Top IPs: {ip_counts.most_common(5)}")
        print(f"Top Rules: {rule_counts.most_common(5)}")
parser = ModSecLogParser()
parser.parse_json_log("/var/log/modsecurity/audit.json")
parser.summary()
"""

    def show_pipeline(self):
        """Print name, description and tools for every stage in PIPELINE_STAGES."""
        print("=== Batch Pipeline Stages ===\n")
        # Only the values are needed; the dict keys are not displayed.
        for stage in self.PIPELINE_STAGES.values():
            print(f"[{stage['name']}]")
            print(f" {stage['description']}")
            print(f" Tools: {stage['tools']}")
            print()

    def show_parser(self):
        """Print a header followed by the first 600 characters of PYTHON_PARSER."""
        print("=== Python Parser ===")
        print(self.PYTHON_PARSER[:600])
# Demo run: print the stage table first, then the sample parser excerpt.
pipeline = WAFBatchPipeline()
for _show in (pipeline.show_pipeline, pipeline.show_parser):
    _show()
Analytics & False Positive Tuning
# analytics.py — WAF analytics and FP tuning
import json
import random
class WAFAnalytics:
    """Print simulated WAF analytics: attack counts, false positives, exclusions.

    The numbers are drawn from ``random.randint`` on every call, so the
    figures are illustrative sample data rather than measurements.
    """

    def attack_analysis(self):
        """Print one line per attack category, sorted by count (highest first)."""
        print("=== Attack Analysis (Daily Batch) ===\n")
        attacks = [
            {"type": "SQL Injection (CRS 942xxx)", "count": random.randint(50, 500), "blocked": random.randint(95, 100)},
            {"type": "XSS (CRS 941xxx)", "count": random.randint(30, 300), "blocked": random.randint(90, 100)},
            {"type": "Path Traversal (CRS 930xxx)", "count": random.randint(20, 200), "blocked": random.randint(95, 100)},
            {"type": "Scanner Detection (CRS 913xxx)", "count": random.randint(100, 1000), "blocked": random.randint(98, 100)},
            {"type": "PHP Injection (CRS 933xxx)", "count": random.randint(10, 100), "blocked": random.randint(95, 100)},
        ]
        for a in sorted(attacks, key=lambda x: x["count"], reverse=True):
            print(f" [{a['type']}] Count: {a['count']:>5} | Blocked: {a['blocked']}%")

    def false_positive_report(self):
        """Print rule id, description, simulated FP count and suggested action."""
        print("\n=== False Positive Analysis ===")
        fps = [
            {"rule": "942100", "description": "SQL Injection (libinjection)", "fp_count": random.randint(5, 30), "action": "Add URI exclusion"},
            {"rule": "941100", "description": "XSS via libinjection", "fp_count": random.randint(3, 20), "action": "Whitelist parameter"},
            {"rule": "920350", "description": "Host header is IP address", "fp_count": random.randint(10, 50), "action": "Allow health checks"},
        ]
        for fp in fps:
            print(f" [Rule {fp['rule']}] {fp['description']}")
            print(f" FP count: {fp['fp_count']} | Action: {fp['action']}")

    def tuning_rules(self):
        """Print a header and the first 400 characters of sample exclusion rules."""
        print("\n=== Tuning Rules (CRS Exclusions) ===")
        # Each "\\" below is a single backslash at runtime (the SecRule
        # line-continuation character).
        exclusions = """
# modsecurity_crs_exclusions.conf
# Exclude health check endpoint from all rules
SecRule REQUEST_URI "@streq /health" \\
"id:1001, phase:1, pass, nolog, ctl:ruleEngine=Off"
# Exclude specific parameter from SQL injection check
SecRule REQUEST_URI "@beginsWith /api/search" \\
"id:1002, phase:1, pass, nolog,\\
ctl:ruleRemoveTargetById=942100;ARGS:query"
# Exclude admin IP from WAF
SecRule REMOTE_ADDR "@ipMatch 10.0.0.0/8" \\
"id:1003, phase:1, pass, nolog, ctl:ruleEngine=Off"
"""
        print(exclusions[:400])
# Demo run: print the three analytics sections in their original order.
analytics = WAFAnalytics()
for _show in (
    analytics.attack_analysis,
    analytics.false_positive_report,
    analytics.tuning_rules,
):
    _show()
Automated Pipeline with Airflow
# airflow_dag.py — Airflow DAG for WAF batch processing
import json
class WAFAirflowDAG:
    """Hold a sample Airflow DAG definition and print simulated run metrics.

    Airflow itself is not imported here; the DAG source is kept as text in
    ``DAG_CODE`` and is only printed.
    """

    # Sample DAG source shown to the reader.
    # NOTE(review): the snippet passes `schedule_interval`, which newer Airflow
    # releases deprecate in favour of `schedule` — confirm against the Airflow
    # version in use before copying it.
    DAG_CODE = """
# dags/waf_batch_pipeline.py — Airflow DAG
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'security-team',
'depends_on_past': False,
'start_date': datetime(2025, 1, 1),
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'waf_batch_pipeline',
default_args=default_args,
schedule_interval='0 6 * * *', # Daily at 6 AM
catchup=False,
)
collect = BashOperator(
task_id='collect_logs',
bash_command='rsync -az waf-server:/var/log/modsecurity/ /data/waf/raw/',
dag=dag,
)
parse = PythonOperator(
task_id='parse_logs',
python_callable=lambda: __import__('modsec_parser').parse_daily(),
dag=dag,
)
enrich = PythonOperator(
task_id='enrich_data',
python_callable=lambda: __import__('enricher').enrich_with_geoip(),
dag=dag,
)
analyze = PythonOperator(
task_id='analyze_threats',
python_callable=lambda: __import__('analyzer').run_analysis(),
dag=dag,
)
report = PythonOperator(
task_id='generate_report',
python_callable=lambda: __import__('reporter').daily_report(),
dag=dag,
)
tune = PythonOperator(
task_id='suggest_tuning',
python_callable=lambda: __import__('tuner').fp_analysis(),
dag=dag,
)
collect >> parse >> enrich >> analyze >> [report, tune]
"""

    def show_dag(self):
        """Print a header followed by the first 600 characters of DAG_CODE."""
        print("=== Airflow DAG ===")
        print(self.DAG_CODE[:600])

    def pipeline_metrics(self):
        """Print simulated pipeline metrics; values come from ``random.randint``."""
        # Function-scope import: this script section does not import `random`
        # at module level.
        import random

        print("\n=== Pipeline Metrics ===")
        metrics = {
            "Logs processed/day": f"{random.randint(100, 500)}K events",
            "Processing time": f"{random.randint(5, 30)} minutes",
            "False positives detected": f"{random.randint(10, 100)}",
            "New rules suggested": f"{random.randint(0, 5)}",
            "Reports generated": "Daily + Weekly",
        }
        for m, v in metrics.items():
            print(f" {m}: {v}")
# Demo run: print the DAG excerpt, then the simulated metrics.
dag = WAFAirflowDAG()
for _show in (dag.show_dag, dag.pipeline_metrics):
    _show()
Threat Intelligence Integration
# threat_intel.py — Threat intelligence enrichment
import json
import random
class ThreatIntel:
    """List threat-intelligence sources and show a sample enrichment script.

    The class holds static data only; the ``show_*`` methods print it to
    stdout and return ``None``.
    """

    # Enrichment sources: display name, data offered, and pricing note.
    SOURCES = {
        "geoip": {"name": "MaxMind GeoIP", "data": "Country, City, ASN", "cost": "Free (GeoLite2)"},
        "abuseipdb": {"name": "AbuseIPDB", "data": "IP reputation score, reports", "cost": "Free tier: 1K lookups/day"},
        "virustotal": {"name": "VirusTotal", "data": "URL/IP reputation, malware", "cost": "Free: 4 lookups/min"},
        "otx": {"name": "AlienVault OTX", "data": "Threat indicators, pulses", "cost": "Free"},
        "shodan": {"name": "Shodan", "data": "Open ports, services, vulns", "cost": "Free: limited"},
    }

    # Sample enrichment script shown to the reader. Its indentation is
    # significant Python syntax, so the snippet is stored in a form that compiles.
    # NOTE(review): in the snippet, `requests.get` is called without a
    # `timeout`, and a new GeoIP reader is opened on every uncached lookup
    # without being closed — worth fixing before the script is used for real.
    ENRICHER = """
# enricher.py — Enrich WAF events with threat intel
import requests
import json
class WAFEnricher:
    def __init__(self):
        self.geoip_db = None # MaxMind GeoIP2 database
        self.cache = {}
    def enrich_ip(self, ip):
        if ip in self.cache:
            return self.cache[ip]
        result = {"ip": ip}
        # GeoIP lookup
        try:
            import geoip2.database
            reader = geoip2.database.Reader('GeoLite2-City.mmdb')
            response = reader.city(ip)
            result["country"] = response.country.name
            result["city"] = response.city.name
            result["asn"] = response.traits.autonomous_system_number
        except Exception:
            result["country"] = "Unknown"
        # AbuseIPDB lookup
        try:
            resp = requests.get(
                "https://api.abuseipdb.com/api/v2/check",
                params={"ipAddress": ip},
                headers={"Key": "YOUR_API_KEY", "Accept": "application/json"}
            )
            data = resp.json().get("data", {})
            result["abuse_score"] = data.get("abuseConfidenceScore", 0)
            result["total_reports"] = data.get("totalReports", 0)
        except Exception:
            result["abuse_score"] = -1
        self.cache[ip] = result
        return result
enricher = WAFEnricher()
print(enricher.enrich_ip("1.2.3.4"))
"""

    def show_sources(self):
        """Print one line per entry in SOURCES (name, data, cost)."""
        print("=== Threat Intel Sources ===\n")
        # Only the values are needed; the dict keys are not displayed.
        for source in self.SOURCES.values():
            print(f" [{source['name']}] {source['data']} ({source['cost']})")

    def show_enricher(self):
        """Print a header followed by the first 500 characters of ENRICHER."""
        print("\n=== Enricher Script ===")
        print(self.ENRICHER[:500])
# Demo run: list the intel sources, then print the enricher excerpt.
ti = ThreatIntel()
for _show in (ti.show_sources, ti.show_enricher):
    _show()
FAQ - คำถามที่พบบ่อย
Q: ModSecurity กับ AWS WAF/Cloudflare WAF อันไหนดี?
A:
- ModSecurity: ฟรี, self-hosted, full control, ปรับ rules ได้ทุกอย่าง
- AWS WAF/Cloudflare: managed, ง่ายกว่า, scale อัตโนมัติ, ไม่ต้องดูแลเอง
- ใช้ ModSecurity เมื่อ: on-premise, ต้องการ full control, budget จำกัด
- ใช้ managed WAF เมื่อ: cloud-native, ไม่มี ops team, ต้องการความง่าย
Q: CRS rules ทำให้เว็บช้าไหม?
A: เพิ่ม latency ประมาณ 1-5ms ต่อ request (เล็กน้อย)
- Paranoia Level 1 (default): เร็ว, FP น้อย
- PL2-4: ช้าขึ้น, strict ขึ้น, FP มากขึ้น
- เริ่มที่ PL1 แล้วค่อย tune ขึ้น
- ถ้าต้องการ performance: ใช้ ModSecurity v3 (เร็วกว่า v2)
Q: Batch processing กับ real-time ใช้อันไหน?
A:
- Real-time: blocking attacks, urgent alerts (ModSecurity engine)
- Batch: analytics, reports, FP tuning, trend analysis
- ใช้ทั้งคู่: ModSecurity block real-time + batch pipeline วิเคราะห์ logs รายวัน
- Batch ช่วย tune rules → ลด FP → real-time detection ดีขึ้น
Q: False positives มากทำอย่างไร?
A:
1. เริ่มด้วย DetectionOnly mode (log ไม่ block)
2. วิเคราะห์ logs ด้วย batch pipeline เพื่อหา FP
3. สร้าง exclusion rules สำหรับ known good traffic
4. ค่อยๆ เปิด On mode หลัง tune เสร็จ
5. Review FP report รายสัปดาห์ และปรับ rules อย่างต่อเนื่อง
