CrowdSec IPS Message Queue
CrowdSec IPS Intrusion Prevention Crowd Intelligence Bouncer Message Queue Kafka RabbitMQ Log Parsing Behavior Detection Distributed Security Brute Force DDoS
| IPS Tool | Detection | Intelligence | Distributed | ราคา |
|---|---|---|---|---|
| CrowdSec | Behavior | Crowd (Global) | ใช่ | ฟรี/Premium |
| Fail2ban | Regex | ไม่มี | ไม่ | ฟรี |
| Suricata | Signature+Anomaly | Rules | ใช่ | ฟรี |
| Snort | Signature | Rules | ใช่ | ฟรี/Paid |
CrowdSec Setup
# === CrowdSec Installation & Configuration ===
# Install CrowdSec
# curl -s https://packagecloud.io/install/repositories/crowdsec/crowdsec/script.deb.sh | sudo bash
# sudo apt install crowdsec
# sudo apt install crowdsec-firewall-bouncer-iptables
# Configuration
# /etc/crowdsec/acquis.yaml
# ---
# filenames:
# - /var/log/nginx/access.log
# - /var/log/nginx/error.log
# labels:
# type: nginx
# ---
# filenames:
# - /var/log/auth.log
# labels:
# type: syslog
# ---
# source: kafka
# topic: server-logs
# brokers:
# - kafka-01:9092
# - kafka-02:9092
# labels:
# type: nginx
# group_id: crowdsec-consumer
# Install Collections (Detection Rules)
# sudo cscli collections install crowdsecurity/nginx
# sudo cscli collections install crowdsecurity/sshd
# sudo cscli collections install crowdsecurity/linux
# sudo cscli collections install crowdsecurity/http-cve
# Check Status
# sudo cscli metrics
# sudo cscli decisions list
# sudo cscli alerts list
# sudo cscli bouncers list
from dataclasses import dataclass
from typing import List
@dataclass
class CrowdSecAlert:
ip: str
scenario: str
country: str
decisions: str
timestamp: str
alerts = [
CrowdSecAlert("185.220.101.5", "crowdsecurity/ssh-bf", "RU", "ban 4h", "10:00:05"),
CrowdSecAlert("45.33.32.156", "crowdsecurity/http-crawl-non_statics", "US", "ban 1h", "10:02:15"),
CrowdSecAlert("103.152.118.24", "crowdsecurity/http-bad-user-agent", "CN", "ban 24h", "10:05:30"),
CrowdSecAlert("192.168.1.100", "crowdsecurity/http-bf-wordpress_bf", "TH", "ban 4h", "10:10:00"),
CrowdSecAlert("91.242.217.18", "crowdsecurity/http-cve-2021-41773", "DE", "ban 48h", "10:15:22"),
]
print("=== CrowdSec Alerts ===")
for a in alerts:
print(f" [{a.decisions}] {a.ip} ({a.country})")
print(f" Scenario: {a.scenario} | Time: {a.timestamp}")
Message Queue Integration
# === Kafka Message Queue for IPS ===
# Architecture:
# Servers -> Filebeat -> Kafka -> CrowdSec -> Decision Queue -> Bouncers
# Filebeat -> Kafka
# filebeat.yml
# filebeat.inputs:
# - type: log
# paths: ["/var/log/nginx/access.log"]
# fields: { log_type: nginx }
# output.kafka:
# hosts: ["kafka-01:9092", "kafka-02:9092"]
# topic: "server-logs"
# partition.round_robin:
# reachable_only: true
# Python — Kafka Producer for Logs
# from kafka import KafkaProducer
# import json
# import time
#
# producer = KafkaProducer(
# bootstrap_servers=['kafka-01:9092'],
# value_serializer=lambda v: json.dumps(v).encode('utf-8'),
# )
#
# def send_log_event(log_line, source):
# event = {
# "message": log_line,
# "source": source,
# "timestamp": time.time(),
# }
# producer.send('server-logs', value=event)
# Decision Consumer — Apply Bans
# from kafka import KafkaConsumer
# import subprocess
#
# consumer = KafkaConsumer(
# 'crowdsec-decisions',
# bootstrap_servers=['kafka-01:9092'],
# value_deserializer=lambda m: json.loads(m.decode('utf-8')),
# group_id='bouncer-group',
# )
#
# for message in consumer:
# decision = message.value
# if decision['type'] == 'ban':
# ip = decision['value']
# duration = decision['duration']
# subprocess.run(['iptables', '-A', 'INPUT', '-s', ip, '-j', 'DROP'])
@dataclass
class QueueMetrics:
topic: str
partitions: int
messages_sec: int
consumer_lag: int
status: str
queues = [
QueueMetrics("server-logs", 6, 5000, 120, "Healthy"),
QueueMetrics("crowdsec-decisions", 3, 50, 0, "Healthy"),
QueueMetrics("alert-notifications", 2, 10, 5, "Healthy"),
QueueMetrics("threat-intel", 1, 100, 2000, "Lagging"),
]
print("\n=== Message Queue Status ===")
for q in queues:
print(f" [{q.status}] {q.topic}")
print(f" Partitions: {q.partitions} | Rate: {q.messages_sec}/s | Lag: {q.consumer_lag}")
Distributed Security
# === Distributed CrowdSec ===
# Multi-Server Setup
# Server 1 (LAPI - Local API):
# sudo cscli machines add server2 --auto
# sudo cscli machines add server3 --auto
# Server 2/3 (Agent):
# /etc/crowdsec/local_api_credentials.yaml
# url: http://lapi-server:8080
# login: server2
# password:
# Bouncer Registration
# sudo cscli bouncers add nginx-bouncer-server1
# sudo cscli bouncers add fw-bouncer-server2
# CrowdSec Console (Cloud Dashboard)
# sudo cscli console enroll
architecture = {
"Log Sources": "Nginx, SSH, WordPress, Custom Apps",
"Filebeat": "Ship Logs ไป Kafka ทุก Server",
"Kafka": "Message Queue กลาง Buffer + Distribute",
"CrowdSec Engine": "วิเคราะห์ Log ตรวจจับ Attack",
"LAPI": "Local API แชร์ Decision ระหว่าง Agents",
"Bouncers": "Enforce Ban ที่ iptables/Nginx/Cloudflare",
"Console": "Cloud Dashboard ดู Alerts ทั้งหมด",
"Community": "แชร์ IP ร้ายกับผู้ใช้ CrowdSec ทั่วโลก",
}
print("Distributed Architecture:")
for component, role in architecture.items():
print(f" [{component}]: {role}")
# Detection Scenarios
scenarios = {
"SSH Brute Force": {"ban": "4h", "threshold": "5 fails/min"},
"HTTP Crawl": {"ban": "1h", "threshold": "50 non-static/10s"},
"WordPress Brute Force": {"ban": "4h", "threshold": "10 login fails/min"},
"CVE Exploitation": {"ban": "48h", "threshold": "1 attempt"},
"Bad User Agent": {"ban": "24h", "threshold": "SQLmap/Nikto detected"},
"DDoS L7": {"ban": "12h", "threshold": "1000 req/10s"},
}
print(f"\n\nDetection Scenarios:")
for name, info in scenarios.items():
print(f" [{name}] Ban: {info['ban']} | Threshold: {info['threshold']}")
เคล็ดลับ
- Collections: ติดตั้ง Collection ที่ตรงกับ Service เช่น nginx sshd
- Kafka: ใช้ Kafka รวม Log จากหลาย Server ไม่ต้องอ่าน File
- Whitelist: Whitelist IP ของตัวเองก่อน ป้องกันแบนตัวเอง
- Console: ใช้ CrowdSec Console ดู Dashboard ฟรี
- Community: เปิด Community Blocklist รับ IP ร้ายจากทั่วโลก
การนำไปใช้งานจริงในองค์กร
สำหรับองค์กรขนาดกลางถึงใหญ่ แนะนำให้ใช้หลัก Three-Tier Architecture คือ Core Layer ที่เป็นแกนกลางของระบบ Distribution Layer ที่ทำหน้าที่กระจาย Traffic และ Access Layer ที่เชื่อมต่อกับผู้ใช้โดยตรง การแบ่ง Layer ชัดเจนช่วยให้การ Troubleshoot ง่ายขึ้นและสามารถ Scale ระบบได้ตามความต้องการ
เรื่อง Network Security ก็สำคัญไม่แพ้กัน ควรติดตั้ง Next-Generation Firewall ที่สามารถ Deep Packet Inspection ได้ ใช้ Network Segmentation แยก VLAN สำหรับแต่ละแผนก ติดตั้ง IDS/IPS เพื่อตรวจจับการโจมตี และทำ Regular Security Audit อย่างน้อยปีละ 2 ครั้ง
CrowdSec คืออะไร
Open Source IPS Crowd Intelligence Log Behavior Detection Brute Force DDoS Bouncer แบน IP อัตโนมัติ Community ฟรี
Message Queue ใช้กับ IPS อย่างไร
รวม Log หลาย Source Queue กลาง CrowdSec Consumer วิเคราะห์ Decision Queue Bouncer Distributed Scale Fault Tolerant
CrowdSec กับ Fail2ban ต่างกันอย่างไร
CrowdSec Behavior Crowd Intelligence Distributed Dashboard API Fail2ban Regex Local ง่าย CrowdSec ทันสมัย Multi-server
Bouncer คืออะไร
Enforce Decision Block IP Firewall iptables Nginx Cloudflare Custom API REST Ban Decision CrowdSec Engine หลายแบบ
สรุป
CrowdSec IPS Crowd Intelligence Bouncer Message Queue Kafka Log Parsing Behavior Detection Distributed Brute Force DDoS Firewall Nginx Cloudflare Community Blocklist
