ModSecurity WAF Blue Green Canary Deploy คืออะไร
ModSecurity เป็น open-source Web Application Firewall (WAF) ที่ทำงานเป็น module ของ Apache, Nginx หรือ IIS ป้องกัน web attacks เช่น SQL Injection, XSS, CSRF และ OWASP Top 10 Blue-Green Deployment คือการ deploy โดยมี 2 environments (blue/green) สลับ traffic ระหว่างกัน Canary Deployment คือการปล่อย version ใหม่ให้ traffic ส่วันนี้อยก่อน การรวม ModSecurity WAF กับ Blue-Green และ Canary deployment ช่วยให้อัพเดท WAF rules อย่างปลอดภัย ทดสอบ rules ใหม่กับ traffic จริงก่อน rollout เต็ม ลดความเสี่ยงจาก false positives
ModSecurity Fundamentals
# modsecurity.py — ModSecurity WAF fundamentals
import json
class ModSecurityBasics:
FEATURES = {
"rule_engine": {
"name": "Rule Engine",
"description": "ประมวลผล HTTP requests/responses ตาม rules — block, allow, log",
"modes": "DetectionOnly (log only), On (block), Off (disabled)",
},
"crs": {
"name": "OWASP Core Rule Set (CRS)",
"description": "ชุด rules มาตรฐานป้องกัน OWASP Top 10 — SQL injection, XSS, RCE",
"version": "CRS v4.x — paranoia levels 1-4",
},
"audit_log": {
"name": "Audit Logging",
"description": "Log ทุก request ที่ match rules — forensic analysis, compliance",
},
"virtual_patching": {
"name": "Virtual Patching",
"description": "เขียน rules ป้องกัน vulnerabilities โดยไม่ต้องแก้ code — patch ชั่วคราว",
},
}
RULES_EXAMPLE = """
# ModSecurity rules example
# Block SQL injection
SecRule ARGS "@detectSQLi" \\
"id:100001,\\
phase:2,\\
deny,\\
status:403,\\
msg:'SQL Injection Detected',\\
tag:'attack-sqli',\\
severity:'CRITICAL'"
# Block XSS
SecRule ARGS "@detectXSS" \\
"id:100002,\\
phase:2,\\
deny,\\
status:403,\\
msg:'XSS Attack Detected',\\
tag:'attack-xss',\\
severity:'CRITICAL'"
# Rate limiting
SecRule IP:COUNTER "@gt 100" \\
"id:100003,\\
phase:1,\\
deny,\\
status:429,\\
msg:'Rate Limit Exceeded',\\
initcol:ip=%{REMOTE_ADDR},\\
setvar:ip.counter=+1,\\
expirevar:ip.counter=60"
"""
def show_features(self):
print("=== ModSecurity Features ===\n")
for key, feat in self.FEATURES.items():
print(f"[{feat['name']}]")
print(f" {feat['description']}")
print()
def show_rules(self):
print("=== Rules Example ===")
print(self.RULES_EXAMPLE[:400])
basics = ModSecurityBasics()
basics.show_features()
basics.show_rules()
Blue-Green WAF Deployment
# blue_green.py — Blue-Green deployment for ModSecurity
import json
class BlueGreenWAF:
ARCHITECTURE = {
"concept": "มี 2 WAF environments (Blue/Green) — สลับ traffic ด้วย load balancer",
"blue": "Blue: production ปัจจุบัน — รับ traffic 100%",
"green": "Green: version ใหม่ (rules ใหม่) — idle, พร้อม deploy",
"switch": "เมื่อ Green ผ่าน test → สลับ traffic จาก Blue ไป Green ทันที",
"rollback": "ถ้ามีปัญหา → สลับกลับ Blue ภายในวินาที",
}
STEPS = {
"step1": "Deploy WAF rules ใหม่ไป Green environment",
"step2": "รัน automated tests กับ Green (functional + security)",
"step3": "Smoke test: ส่ง sample traffic ไป Green → ตรวจ false positives",
"step4": "Switch traffic: Load balancer point ไป Green",
"step5": "Monitor: ดู error rate, false positive rate, latency",
"step6": "ถ้า OK → Green เป็น production ใหม่, Blue เป็น standby",
"step7": "ถ้ามีปัญหา → switch กลับ Blue ทันที",
}
NGINX_CONFIG = """
# nginx.conf — Blue-Green WAF switching
upstream waf_backend {
# Blue (current production)
server waf-blue:80 weight=100;
# Green (new version) — uncomment to switch
# server waf-green:80 weight=100;
}
# Alternative: use variable for instant switch
map $waf_target $waf_upstream {
"blue" waf-blue:80;
"green" waf-green:80;
}
server {
listen 443 ssl;
location / {
proxy_pass http://waf_backend;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
}
}
"""
def show_architecture(self):
print("=== Blue-Green Architecture ===\n")
for key, val in self.ARCHITECTURE.items():
print(f" [{key}] {val}")
def show_steps(self):
print(f"\n=== Deployment Steps ===")
for key, step in self.STEPS.items():
print(f" {step}")
def show_config(self):
print(f"\n=== NGINX Config ===")
print(self.NGINX_CONFIG[:400])
bg = BlueGreenWAF()
bg.show_architecture()
bg.show_steps()
Canary WAF Deployment
# canary.py — Canary deployment for WAF rules
import json
class CanaryWAF:
STRATEGY = {
"concept": "ปล่อย WAF rules ใหม่ให้ traffic ส่วันนี้อยก่อน (5-10%) → เพิ่มทีละขั้น",
"benefit": "ตรวจจับ false positives จาก rules ใหม่ก่อน rollout เต็ม",
"stages": [
"Stage 1: 5% traffic → monitor 1 ชั่วโมง",
"Stage 2: 25% traffic → monitor 4 ชั่วโมง",
"Stage 3: 50% traffic → monitor 12 ชั่วโมง",
"Stage 4: 100% traffic → full rollout",
],
"rollback_trigger": "false positive rate > 0.1% หรือ block rate spike > 2x baseline",
}
NGINX_CANARY = """
# nginx.conf — Canary WAF with split traffic
split_clients "" $waf_version {
5% "canary";
* "stable";
}
upstream waf_stable {
server waf-stable:80;
}
upstream waf_canary {
server waf-canary:80;
}
server {
listen 443 ssl;
location / {
if ($waf_version = "canary") {
proxy_pass http://waf_canary;
}
proxy_pass http://waf_stable;
# Add header for debugging
add_header X-WAF-Version $waf_version;
}
}
"""
METRICS = {
"false_positive_rate": "ถูก block โดยไม่ควร block — ต้อง < 0.1%",
"true_positive_rate": "block attacks ได้ถูกต้อง — ต้อง > 99%",
"latency_impact": "WAF เพิ่ม latency เท่าไหร่ — ต้อง < 5ms",
"block_rate": "% ของ requests ที่ถูก block — เปรียบเทียบ stable vs canary",
"error_rate": "5xx errors — ต้องไม่เพิ่มขึ้นจาก baseline",
}
def show_strategy(self):
print("=== Canary Strategy ===\n")
print(f" Concept: {self.STRATEGY['concept']}")
for stage in self.STRATEGY['stages']:
print(f" {stage}")
print(f"\n Rollback: {self.STRATEGY['rollback_trigger']}")
def show_metrics(self):
print(f"\n=== Canary Metrics ===")
for metric, desc in self.METRICS.items():
print(f" [{metric}] {desc}")
canary = CanaryWAF()
canary.show_strategy()
canary.show_metrics()
Python Automation
# automation.py — Python WAF deployment automation
import json
class WAFAutomation:
CODE = """
# waf_deployer.py — Automate ModSecurity WAF deployments
import subprocess
import json
import time
import requests
from datetime import datetime
from pathlib import Path
class WAFDeployer:
def __init__(self, stable_url, canary_url, lb_api_url):
self.stable_url = stable_url
self.canary_url = canary_url
self.lb_api_url = lb_api_url
self.metrics = {'stable': [], 'canary': []}
def deploy_rules(self, rules_path, target='canary'):
'''Deploy new WAF rules to target'''
url = self.canary_url if target == 'canary' else self.stable_url
rules = Path(rules_path).read_text()
# Upload rules via API
resp = requests.post(
f"{url}/admin/rules",
json={'rules': rules, 'action': 'replace'},
timeout=30,
)
if resp.status_code == 200:
# Reload WAF
requests.post(f"{url}/admin/reload", timeout=10)
return {'status': 'deployed', 'target': target}
return {'status': 'failed', 'error': resp.text}
def run_smoke_test(self, target='canary'):
'''Run smoke tests against WAF'''
url = self.canary_url if target == 'canary' else self.stable_url
tests = [
{'name': 'normal_request', 'path': '/', 'expected': 200},
{'name': 'sqli_blocked', 'path': "/?id=1' OR 1=1--", 'expected': 403},
{'name': 'xss_blocked', 'path': '/?q=', 'expected': 403},
{'name': 'normal_post', 'path': '/api/data', 'method': 'POST',
'data': {'name': 'test'}, 'expected': 200},
]
results = []
for test in tests:
try:
method = test.get('method', 'GET')
if method == 'GET':
resp = requests.get(f"{url}{test['path']}", timeout=10)
else:
resp = requests.post(f"{url}{test['path']}",
json=test.get('data'), timeout=10)
passed = resp.status_code == test['expected']
results.append({
'name': test['name'],
'expected': test['expected'],
'actual': resp.status_code,
'passed': passed,
})
except Exception as e:
results.append({'name': test['name'], 'passed': False, 'error': str(e)})
all_passed = all(r['passed'] for r in results)
return {'passed': all_passed, 'results': results}
def set_canary_weight(self, weight_pct):
'''Set canary traffic weight'''
resp = requests.put(
f"{self.lb_api_url}/canary/weight",
json={'weight': weight_pct},
timeout=10,
)
return resp.json()
def monitor_metrics(self, duration_minutes=60, interval_seconds=30):
'''Monitor WAF metrics during canary'''
start = time.time()
while time.time() - start < duration_minutes * 60:
for target in ['stable', 'canary']:
url = self.stable_url if target == 'stable' else self.canary_url
try:
resp = requests.get(f"{url}/metrics", timeout=5)
metrics = resp.json()
metrics['timestamp'] = datetime.utcnow().isoformat()
self.metrics[target].append(metrics)
except:
pass
time.sleep(interval_seconds)
return self.compare_metrics()
def compare_metrics(self):
'''Compare stable vs canary metrics'''
def avg(data, key):
vals = [d.get(key, 0) for d in data if key in d]
return sum(vals) / max(len(vals), 1)
return {
'stable': {
'avg_block_rate': round(avg(self.metrics['stable'], 'block_rate'), 3),
'avg_latency_ms': round(avg(self.metrics['stable'], 'latency_ms')),
'avg_error_rate': round(avg(self.metrics['stable'], 'error_rate'), 4),
},
'canary': {
'avg_block_rate': round(avg(self.metrics['canary'], 'block_rate'), 3),
'avg_latency_ms': round(avg(self.metrics['canary'], 'latency_ms')),
'avg_error_rate': round(avg(self.metrics['canary'], 'error_rate'), 4),
},
}
def should_promote(self):
'''Decide if canary should be promoted'''
comparison = self.compare_metrics()
s = comparison['stable']
c = comparison['canary']
block_diff = abs(c['avg_block_rate'] - s['avg_block_rate'])
latency_diff = c['avg_latency_ms'] - s['avg_latency_ms']
promote = (
block_diff < 0.01 and # Block rate diff < 1%
latency_diff < 5 and # Latency increase < 5ms
c['avg_error_rate'] < s['avg_error_rate'] * 1.5
)
return {
'promote': promote,
'block_rate_diff': round(block_diff, 4),
'latency_diff_ms': round(latency_diff),
'action': 'PROMOTE' if promote else 'ROLLBACK',
}
# deployer = WAFDeployer("http://waf-stable", "http://waf-canary", "http://lb-api")
# deployer.deploy_rules("new_rules.conf", target="canary")
# smoke = deployer.run_smoke_test("canary")
# deployer.set_canary_weight(5)
# decision = deployer.should_promote()
"""
def show_code(self):
print("=== WAF Deployer ===")
print(self.CODE[:600])
automation = WAFAutomation()
automation.show_code()
Docker Setup
# docker.py — Docker setup for Blue-Green WAF
import json
class DockerSetup:
COMPOSE = """
# docker-compose.yaml — Blue-Green ModSecurity WAF
version: '3.8'
services:
nginx-lb:
image: nginx:alpine
ports:
- "443:443"
- "80:80"
volumes:
- ./nginx/lb.conf:/etc/nginx/conf.d/default.conf
depends_on:
- waf-blue
- waf-green
waf-blue:
image: owasp/modsecurity-crs:nginx
environment:
- PARANOIA=1
- ANOMALY_INBOUND=5
- ANOMALY_OUTBOUND=4
volumes:
- ./rules/blue:/etc/modsecurity.d/owasp-crs/rules/custom
- ./modsecurity.conf:/etc/modsecurity.d/modsecurity.conf
waf-green:
image: owasp/modsecurity-crs:nginx
environment:
- PARANOIA=1
- ANOMALY_INBOUND=5
- ANOMALY_OUTBOUND=4
volumes:
- ./rules/green:/etc/modsecurity.d/owasp-crs/rules/custom
- ./modsecurity.conf:/etc/modsecurity.d/modsecurity.conf
app:
build: ./app
ports:
- "8080:8080"
"""
TESTING = {
"nikto": "nikto -h https://waf-canary — web vulnerability scanner",
"sqlmap": "sqlmap -u 'https://waf-canary/?id=1' — SQL injection test",
"zap": "OWASP ZAP active scan — comprehensive security test",
"custom": "Python script: ส่ง OWASP Top 10 payloads → verify blocks",
}
def show_compose(self):
print("=== Docker Compose ===")
print(self.COMPOSE[:500])
def show_testing(self):
print("\n=== Security Testing Tools ===")
for tool, desc in self.TESTING.items():
print(f" [{tool}] {desc}")
docker = DockerSetup()
docker.show_compose()
docker.show_testing()
FAQ - คำถามที่พบบ่อย
Q: ทำไมต้อง Blue-Green/Canary สำหรับ WAF rules?
A: WAF rules ใหม่อาจ block legitimate traffic (false positives) ถ้า deploy เต็ม 100% ทันที → ผู้ใช้ถูก block จำนวนมาก = downtime Blue-Green: สลับกลับได้ทันทีถ้ามีปัญหา (seconds) Canary: ทดสอบกับ traffic จริง 5% ก่อน → ตรวจจับ false positives ก่อน rollout เต็ม สำคัญมาก: WAF false positive = legitimate users ถูก block = เสียรายได้
Q: ModSecurity กับ Cloud WAF (Cloudflare, AWS WAF) อันไหนดีกว่า?
A: ModSecurity: open-source, ฟรี, custom rules ได้เต็มที่, self-hosted — control 100% Cloud WAF: managed, easy setup, DDoS protection built-in, global edge — ไม่ต้อง manage เลือก ModSecurity: ถ้าต้องการ control เต็ม, custom rules ซับซ้อน, ไม่อยากพึ่ง vendor เลือก Cloud WAF: ถ้าต้องการ easy management, DDoS protection, ไม่มี team ดูแล ใช้ร่วมกัน: Cloud WAF (layer 1) → ModSecurity (layer 2) = defense in depth
Q: Paranoia Level ควรตั้งเท่าไหร่?
A: PL 1: พื้นฐาน — block attacks ชัดเจน, false positive ต่ำ (แนะนำเริ่มต้น) PL 2: เพิ่ม rules — block มากขึ้น, false positive ปานกลาง PL 3: aggressive — block เยอะ, false positive สูง (ต้อง tune) PL 4: paranoid — block เกือบทุกอย่าง, false positive สูงมาก แนะนำ: เริ่ม PL 1 → ค่อยๆ เพิ่มทีละ level → tune exclusions สำหรับ false positives
Q: WAF rules update บ่อยแค่ไหน?
A: CRS updates: ทุก 2-3 เดือน (major releases) Custom rules: เมื่อมี vulnerability ใหม่หรือ false positive ที่ต้องแก้ Virtual patches: ทันทีเมื่อพบ vulnerability ใน app แนะนำ: Canary deploy ทุกครั้งที่ update rules — ไม่ว่าจะเป็น CRS update หรือ custom rules Automation: CI/CD pipeline สำหรับ WAF rules — test → canary → promote อัตโนมัติ
