SiamCafe.net Blog
Cybersecurity

ModSecurity WAF Batch Processing Pipeline

2026-05-13 · อ. บอม — SiamCafe.net · 1,746 words

What Is a ModSecurity WAF Batch Processing Pipeline?

ModSecurity is an open-source Web Application Firewall (WAF) that runs as a module of Apache, Nginx, or IIS and protects against web attacks such as SQL Injection, XSS, CSRF, and the OWASP Top 10. A Batch Processing Pipeline processes ModSecurity logs and audit data in batches to analyze threats, generate reports, improve rules, and detect attack patterns. Combining the two lets security teams analyze WAF data efficiently, reduce false positives, and continuously improve their security posture.
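The core idea can be sketched in a few lines of Python: read a batch of JSON audit events and count which rules fired. Field names here follow the ModSecurity v3 JSON audit format; the sample event is illustrative, not real log data.

```python
# batch_sketch.py — minimal batch idea: count rule hits across audit events
import json
from collections import Counter

def count_rule_hits(lines):
    """Count CRS rule IDs across an iterable of JSON audit-log lines."""
    hits = Counter()
    for line in lines:
        try:
            tx = json.loads(line).get("transaction", {})
        except (json.JSONDecodeError, AttributeError):
            continue  # skip truncated or corrupt lines
        for msg in tx.get("messages", []):
            hits[msg.get("details", {}).get("ruleId", "?")] += 1
    return hits

sample = ['{"transaction": {"messages": [{"details": {"ruleId": "942100"}}]}}']
print(count_rule_hits(sample).most_common(1))  # → [('942100', 1)]
```

The rest of this article expands each stage of that loop into a full pipeline.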

ModSecurity Architecture

# modsec_arch.py — ModSecurity architecture

class ModSecurityArch:
    COMPONENTS = {
        "engine": {
            "name": "ModSecurity Engine",
            "description": "Core WAF engine ที่วิเคราะห์ HTTP requests/responses",
            "versions": "v2 (Apache module), v3 (libmodsecurity — standalone library)",
        },
        "rules": {
            "name": "OWASP CRS (Core Rule Set)",
            "description": "ชุด rules มาตรฐานสำหรับป้องกัน common attacks",
            "version": "CRS v4.x — 200+ rules",
        },
        "audit_log": {
            "name": "Audit Log",
            "description": "บันทึกรายละเอียด requests ที่ถูก detect/block",
            "format": "JSON หรือ Serial format",
        },
        "connector": {
            "name": "Connector (Nginx/Apache)",
            "description": "เชื่อม ModSecurity engine กับ web server",
            "options": "Apache mod_security2, Nginx modsecurity-nginx connector",
        },
    }

    PROCESSING_MODES = {
        "detection": {"name": "Detection Only (SecRuleEngine DetectionOnly)", "action": "Log แต่ไม่ block", "use": "Testing, tuning rules"},
        "on": {"name": "On (SecRuleEngine On)", "action": "Detect + Block malicious requests", "use": "Production"},
        "off": {"name": "Off (SecRuleEngine Off)", "action": "ปิด WAF", "use": "Debugging"},
    }

    NGINX_CONFIG = """
# nginx.conf — ModSecurity with Nginx
load_module modules/ngx_http_modsecurity_module.so;

http {
    modsecurity on;
    modsecurity_rules_file /etc/modsecurity/modsecurity.conf;
    
    server {
        listen 443 ssl;
        server_name example.com;
        
        location / {
            proxy_pass http://backend;
            modsecurity_rules '
                SecRuleEngine On
                SecAuditEngine RelevantOnly
                SecAuditLogFormat JSON
                SecAuditLog /var/log/modsecurity/audit.json
            ';
        }
    }
}
"""

    def show_components(self):
        print("=== ModSecurity Components ===\n")
        for key, comp in self.COMPONENTS.items():
            print(f"[{comp['name']}]")
            print(f"  {comp['description']}")
            print()

    def show_modes(self):
        print("=== Processing Modes ===")
        for key, mode in self.PROCESSING_MODES.items():
            print(f"  [{mode['name']}] → {mode['action']}")

    def show_config(self):
        print(f"\n=== Nginx Config ===")
        print(self.NGINX_CONFIG[:400])

arch = ModSecurityArch()
arch.show_components()
arch.show_modes()
arch.show_config()
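The nginx.conf above points at /etc/modsecurity/modsecurity.conf. A minimal sketch of that base file is shown below; the CRS include paths are assumptions and depend on where your distribution installs the Core Rule Set (JSON audit output also requires a libmodsecurity build with JSON support).

```
# /etc/modsecurity/modsecurity.conf — minimal base config (sketch)
SecRuleEngine DetectionOnly        # start in detection mode; switch to On after tuning
SecRequestBodyAccess On            # inspect POST bodies
SecAuditEngine RelevantOnly        # log only detected/blocked transactions
SecAuditLogFormat JSON
SecAuditLog /var/log/modsecurity/audit.json

# Load OWASP CRS (paths depend on your install)
Include /etc/modsecurity/crs/crs-setup.conf
Include /etc/modsecurity/crs/rules/*.conf
```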

Batch Processing Pipeline

# pipeline.py — ModSecurity batch processing pipeline

class WAFBatchPipeline:
    PIPELINE_STAGES = {
        "collect": {
            "name": "1. Collect",
            "description": "รวบรวม audit logs จาก ModSecurity instances",
            "tools": "Filebeat, Fluentd, rsync",
        },
        "parse": {
            "name": "2. Parse",
            "description": "Parse audit log format → structured JSON",
            "tools": "Python parser, Logstash, custom ETL",
        },
        "enrich": {
            "name": "3. Enrich",
            "description": "เพิ่มข้อมูล: GeoIP, ASN, threat intel, reputation",
            "tools": "MaxMind GeoIP, AbuseIPDB, VirusTotal",
        },
        "analyze": {
            "name": "4. Analyze",
            "description": "วิเคราะห์ patterns, false positives, attack campaigns",
            "tools": "Python analytics, Elasticsearch, SQL",
        },
        "report": {
            "name": "5. Report",
            "description": "สร้าง reports, dashboards, alerts",
            "tools": "Grafana, Kibana, custom reports",
        },
        "tune": {
            "name": "6. Tune",
            "description": "ปรับปรุง rules: ลด FP, เพิ่ม detection",
            "tools": "CRS exclusion rules, custom rules",
        },
    }

    PYTHON_PARSER = """
# modsec_parser.py — Parse ModSecurity audit logs
import json
from collections import Counter

class ModSecLogParser:
    def __init__(self, log_dir="/var/log/modsecurity"):
        self.log_dir = log_dir
        self.events = []
    
    def parse_json_log(self, filepath):
        with open(filepath) as f:
            for line in f:
                try:
                    event = json.loads(line.strip())
                    # v3 (libmodsecurity) JSON audit format nests everything
                    # under "transaction", including rule matches in "messages"
                    tx = event.get("transaction", {})
                    parsed = {
                        "timestamp": tx.get("time_stamp", ""),
                        "client_ip": tx.get("client_ip", ""),
                        "method": tx.get("request", {}).get("method", ""),
                        "uri": tx.get("request", {}).get("uri", ""),
                        "status": tx.get("response", {}).get("http_code", 0),
                        "rules_matched": [],
                    }
                    for msg in tx.get("messages", []):
                        parsed["rules_matched"].append({
                            "id": msg.get("details", {}).get("ruleId", ""),
                            "severity": msg.get("details", {}).get("severity", ""),
                            "message": msg.get("message", ""),
                        })
                    self.events.append(parsed)
                except json.JSONDecodeError:
                    continue
    
    def summary(self):
        ip_counts = Counter(e["client_ip"] for e in self.events)
        rule_counts = Counter()
        for e in self.events:
            for r in e["rules_matched"]:
                rule_counts[r["id"]] += 1
        
        print(f"Total events: {len(self.events)}")
        print(f"Unique IPs: {len(ip_counts)}")
        print(f"Top IPs: {ip_counts.most_common(5)}")
        print(f"Top Rules: {rule_counts.most_common(5)}")

parser = ModSecLogParser()
parser.parse_json_log("/var/log/modsecurity/audit.json")
parser.summary()
"""

    def show_pipeline(self):
        print("=== Batch Pipeline Stages ===\n")
        for key, stage in self.PIPELINE_STAGES.items():
            print(f"[{stage['name']}]")
            print(f"  {stage['description']}")
            print(f"  Tools: {stage['tools']}")
            print()

    def show_parser(self):
        print("=== Python Parser ===")
        print(self.PYTHON_PARSER[:600])

pipeline = WAFBatchPipeline()
pipeline.show_pipeline()
pipeline.show_parser()
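Between the parse and analyze stages, events are usually grouped into daily batches. A small sketch of that step, assuming timestamps have been normalized to ISO-8601 strings during parsing (the sample events are illustrative):

```python
# batch_partition.py — group parsed WAF events into daily batches (sketch)
from collections import defaultdict

def partition_by_day(events):
    """Group events by calendar day via the ISO date prefix of 'timestamp'."""
    batches = defaultdict(list)
    for e in events:
        day = e.get("timestamp", "")[:10]  # e.g. "2026-05-13"
        batches[day].append(e)
    return dict(batches)

events = [
    {"timestamp": "2026-05-13T01:02:03", "client_ip": "1.2.3.4"},
    {"timestamp": "2026-05-13T09:00:00", "client_ip": "5.6.7.8"},
    {"timestamp": "2026-05-14T00:00:01", "client_ip": "1.2.3.4"},
]
batches = partition_by_day(events)
print({day: len(evs) for day, evs in batches.items()})  # → {'2026-05-13': 2, '2026-05-14': 1}
```

Daily partitions map directly onto the daily Airflow schedule used later in the article.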

Analytics & False Positive Tuning

# analytics.py — WAF analytics and FP tuning
import random

class WAFAnalytics:
    def attack_analysis(self):
        print("=== Attack Analysis (Daily Batch) ===\n")
        attacks = [
            {"type": "SQL Injection (CRS 942xxx)", "count": random.randint(50, 500), "blocked": random.randint(95, 100)},
            {"type": "XSS (CRS 941xxx)", "count": random.randint(30, 300), "blocked": random.randint(90, 100)},
            {"type": "Path Traversal (CRS 930xxx)", "count": random.randint(20, 200), "blocked": random.randint(95, 100)},
            {"type": "Scanner Detection (CRS 913xxx)", "count": random.randint(100, 1000), "blocked": random.randint(98, 100)},
            {"type": "PHP Injection (CRS 933xxx)", "count": random.randint(10, 100), "blocked": random.randint(95, 100)},
        ]
        for a in sorted(attacks, key=lambda x: x["count"], reverse=True):
            print(f"  [{a['type']}] Count: {a['count']:>5} | Blocked: {a['blocked']}%")

    def false_positive_report(self):
        print(f"\n=== False Positive Analysis ===")
        fps = [
            {"rule": "942100", "description": "SQL Injection (libinjection)", "fp_count": random.randint(5, 30), "action": "Add URI exclusion"},
            {"rule": "941100", "description": "XSS via libinjection", "fp_count": random.randint(3, 20), "action": "Whitelist parameter"},
            {"rule": "920350", "description": "Host header is IP address", "fp_count": random.randint(10, 50), "action": "Allow health checks"},
        ]
        for fp in fps:
            print(f"  [Rule {fp['rule']}] {fp['description']}")
            print(f"    FP count: {fp['fp_count']} | Action: {fp['action']}")

    def tuning_rules(self):
        print(f"\n=== Tuning Rules (CRS Exclusions) ===")
        exclusions = """
# modsecurity_crs_exclusions.conf

# Exclude health check endpoint from all rules
SecRule REQUEST_URI "@streq /health" \\
    "id:1001, phase:1, pass, nolog, ctl:ruleEngine=Off"

# Exclude specific parameter from SQL injection check
SecRule REQUEST_URI "@beginsWith /api/search" \\
    "id:1002, phase:1, pass, nolog,\\
    ctl:ruleRemoveTargetById=942100;ARGS:query"

# Exclude admin IP from WAF
SecRule REMOTE_ADDR "@ipMatch 10.0.0.0/8" \\
    "id:1003, phase:1, pass, nolog, ctl:ruleEngine=Off"
"""
        print(exclusions[:400])

analytics = WAFAnalytics()
analytics.attack_analysis()
analytics.false_positive_report()
analytics.tuning_rules()
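An FP report like the one above can feed exclusion-rule generation directly. A hypothetical helper that renders per-URI exclusions (the rule IDs, URIs, and starting exclusion ID are illustrative; `ctl:ruleRemoveById` is the standard ModSecurity action for disabling one rule):

```python
# fp_to_exclusions.py — turn an FP report into CRS exclusion rules (sketch)

def render_exclusion(rule_id, uri_prefix, exclusion_id):
    """Disable a single CRS rule for requests under a given URI prefix."""
    return (
        f'SecRule REQUEST_URI "@beginsWith {uri_prefix}" \\\n'
        f'    "id:{exclusion_id},phase:1,pass,nolog,ctl:ruleRemoveById={rule_id}"'
    )

fp_report = [
    {"rule": "942100", "uri": "/api/search"},
    {"rule": "920350", "uri": "/health"},
]
for i, fp in enumerate(fp_report, start=1000):
    print(render_exclusion(fp["rule"], fp["uri"], i))
```

Generated exclusions should still be reviewed by a human before being committed, since an over-broad prefix silently disables detection for that path.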

Automated Pipeline with Airflow

# airflow_dag.py — Airflow DAG for WAF batch processing

class WAFAirflowDAG:
    DAG_CODE = """
# dags/waf_batch_pipeline.py — Airflow DAG
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'security-team',
    'depends_on_past': False,
    'start_date': datetime(2025, 1, 1),
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'waf_batch_pipeline',
    default_args=default_args,
    schedule_interval='0 6 * * *',  # Daily at 6 AM
    catchup=False,
)

collect = BashOperator(
    task_id='collect_logs',
    bash_command='rsync -az waf-server:/var/log/modsecurity/ /data/waf/raw/',
    dag=dag,
)

parse = PythonOperator(
    task_id='parse_logs',
    python_callable=lambda: __import__('modsec_parser').parse_daily(),
    dag=dag,
)

enrich = PythonOperator(
    task_id='enrich_data',
    python_callable=lambda: __import__('enricher').enrich_with_geoip(),
    dag=dag,
)

analyze = PythonOperator(
    task_id='analyze_threats',
    python_callable=lambda: __import__('analyzer').run_analysis(),
    dag=dag,
)

report = PythonOperator(
    task_id='generate_report',
    python_callable=lambda: __import__('reporter').daily_report(),
    dag=dag,
)

tune = PythonOperator(
    task_id='suggest_tuning',
    python_callable=lambda: __import__('tuner').fp_analysis(),
    dag=dag,
)

collect >> parse >> enrich >> analyze >> [report, tune]
"""

    def show_dag(self):
        print("=== Airflow DAG ===")
        print(self.DAG_CODE[:600])

    def pipeline_metrics(self):
        print(f"\n=== Pipeline Metrics ===")
        import random
        metrics = {
            "Logs processed/day": f"{random.randint(100, 500)}K events",
            "Processing time": f"{random.randint(5, 30)} minutes",
            "False positives detected": f"{random.randint(10, 100)}",
            "New rules suggested": f"{random.randint(0, 5)}",
            "Reports generated": "Daily + Weekly",
        }
        for m, v in metrics.items():
            print(f"  {m}: {v}")

dag = WAFAirflowDAG()
dag.show_dag()
dag.pipeline_metrics()
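The rsync-based collect task re-syncs the whole log directory on every run. A simple checkpoint file keeps daily runs incremental; this is a sketch, and the checkpoint path and file layout are assumptions.

```python
# checkpoint.py — process only audit files not seen by a previous run (sketch)
import json
import os

def load_checkpoint(path):
    """Return the set of filenames already processed by earlier runs."""
    if os.path.exists(path):
        with open(path) as f:
            return set(json.load(f))
    return set()

def select_new_files(all_files, done):
    """Files present on disk but not yet recorded in the checkpoint."""
    return sorted(set(all_files) - done)

def save_checkpoint(path, done):
    with open(path, "w") as f:
        json.dump(sorted(done), f)
```

A parse task would call `select_new_files` first, process the result, then `save_checkpoint` only after the batch succeeds, so a failed run is retried from the same point.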

Threat Intelligence Integration

# threat_intel.py — Threat intelligence enrichment

class ThreatIntel:
    SOURCES = {
        "geoip": {"name": "MaxMind GeoIP", "data": "Country, City, ASN", "cost": "Free (GeoLite2)"},
        "abuseipdb": {"name": "AbuseIPDB", "data": "IP reputation score, reports", "cost": "Free tier: 1K lookups/day"},
        "virustotal": {"name": "VirusTotal", "data": "URL/IP reputation, malware", "cost": "Free: 4 lookups/min"},
        "otx": {"name": "AlienVault OTX", "data": "Threat indicators, pulses", "cost": "Free"},
        "shodan": {"name": "Shodan", "data": "Open ports, services, vulns", "cost": "Free: limited"},
    }

    ENRICHER = """
# enricher.py — Enrich WAF events with threat intel
import requests
import json

class WAFEnricher:
    def __init__(self, mmdb_path="GeoLite2-City.mmdb"):
        import geoip2.database
        # Open the MaxMind GeoIP2 database once, not on every lookup
        self.geoip_db = geoip2.database.Reader(mmdb_path)
        self.cache = {}
    
    def enrich_ip(self, ip):
        if ip in self.cache:
            return self.cache[ip]
        
        result = {"ip": ip}
        
        # GeoIP lookup
        try:
            response = self.geoip_db.city(ip)
            result["country"] = response.country.name
            result["city"] = response.city.name
            # ASN is only populated by the ISP/Enterprise databases
            result["asn"] = response.traits.autonomous_system_number
        except Exception:
            result["country"] = "Unknown"
        
        # AbuseIPDB lookup
        try:
            resp = requests.get(
                "https://api.abuseipdb.com/api/v2/check",
                params={"ipAddress": ip},
                headers={"Key": "YOUR_API_KEY", "Accept": "application/json"},
                timeout=10,
            )
            data = resp.json().get("data", {})
            result["abuse_score"] = data.get("abuseConfidenceScore", 0)
            result["total_reports"] = data.get("totalReports", 0)
        except Exception:
            result["abuse_score"] = -1
        
        self.cache[ip] = result
        return result

enricher = WAFEnricher()
print(enricher.enrich_ip("1.2.3.4"))
"""

    def show_sources(self):
        print("=== Threat Intel Sources ===\n")
        for key, source in self.SOURCES.items():
            print(f"  [{source['name']}] {source['data']} ({source['cost']})")

    def show_enricher(self):
        print(f"\n=== Enricher Script ===")
        print(self.ENRICHER[:500])

ti = ThreatIntel()
ti.show_sources()
ti.show_enricher()
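Once events are enriched, batch analysis can turn the signals into a triage verdict. A sketch with illustrative thresholds (these are not AbuseIPDB recommendations; tune them against your own false-positive data):

```python
# verdict.py — combine enrichment signals into a triage verdict (sketch)

def classify_ip(enriched, block_threshold=75, review_threshold=25):
    """Map an enriched record to 'block', 'review', or 'allow'."""
    score = enriched.get("abuse_score", 0)
    if score >= block_threshold:
        return "block"
    if score >= review_threshold or enriched.get("total_reports", 0) > 10:
        return "review"
    return "allow"

print(classify_ip({"ip": "1.2.3.4", "abuse_score": 90}))  # → block
print(classify_ip({"ip": "5.6.7.8", "abuse_score": 10}))  # → allow
```

"Block" verdicts could feed an ipset or a ModSecurity @ipMatchFromFile list; "review" verdicts go into the daily report for an analyst.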

FAQ - Frequently Asked Questions

Q: Which is better: ModSecurity or AWS WAF/Cloudflare WAF?

A: ModSecurity is free and self-hosted, with full control and fully customizable rules. AWS WAF/Cloudflare are managed: easier to run, auto-scaling, nothing to maintain yourself. Choose ModSecurity when you are on-premise, need full control, or have a limited budget; choose a managed WAF when you are cloud-native, have no ops team, and want simplicity.

Q: Do CRS rules slow down the website?

A: They add roughly 1-5 ms of latency per request, which is minor. Paranoia Level 1 (the default) is fast with few false positives; PL2-4 are stricter but slower and produce more FPs. Start at PL1 and tune upward as needed. For better performance, use ModSecurity v3, which is faster than v2.
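The paranoia level is configured in crs-setup.conf. A sketch using the CRS v4 variable names (the directive IDs follow the CRS convention; verify the exact variable names against your CRS version):

```
# crs-setup.conf — set paranoia levels (CRS v4 variable names)
SecAction \
    "id:900000, phase:1, pass, t:none, nolog, \
    setvar:tx.blocking_paranoia_level=1"

# Optionally detect (log only) at a stricter level than you block at
SecAction \
    "id:900001, phase:1, pass, t:none, nolog, \
    setvar:tx.detection_paranoia_level=2"
```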

Q: Batch processing or real-time: which should I use?

A: Real-time handles blocking attacks and urgent alerts (the ModSecurity engine itself); batch handles analytics, reports, FP tuning, and trend analysis. Use both: ModSecurity blocks in real time while the batch pipeline analyzes logs daily. Batch results drive rule tuning, which reduces FPs and improves real-time detection.

Q: What should I do about excessive false positives?

A: Start in DetectionOnly mode (log without blocking). Analyze the logs with the batch pipeline to find FPs, and create exclusion rules for known-good traffic. Gradually switch to On mode once tuning is done. Review the FP report weekly and keep refining the rules.
