SiamCafe · Blog
CrowdSec IPS Message Queue Design —
บทความ

CrowdSec IPS Message Queue Design —

เผยแพร่ 28 พฤษภาคม 2569

CrowdSec IPS Message Queue

CrowdSec IPS Intrusion Prevention Crowd Intelligence Bouncer Message Queue Kafka RabbitMQ Log Parsing Behavior Detection Distributed Security Brute Force DDoS

IPS ToolDetectionIntelligenceDistributedราคา
CrowdSecBehaviorCrowd (Global)ใช่ฟรี/Premium
Fail2banRegexไม่มีไม่ฟรี
SuricataSignature+AnomalyRulesใช่ฟรี
SnortSignatureRulesใช่ฟรี/Paid

CrowdSec Setup

=== CrowdSec Installation & Configuration ===

Install CrowdSec

curl -s https://packagecloud.io/install/repositories/crowdsec/crowdsec/script.deb.sh | sudo bash

sudo apt install crowdsec

sudo apt install crowdsec-firewall-bouncer-iptables

Configuration

/etc/crowdsec/acquis.yaml

---

filenames:

  • /var/log/nginx/access.log
  • /var/log/nginx/error.log

labels:

type: nginx

---

filenames:

  • /var/log/auth.log

labels:

type: syslog

---

source: kafka

topic: server-logs

brokers:

  • kafka-01:9092
  • kafka-02:9092

labels:

type: nginx

group_id: crowdsec-consumer

Install Collections (Detection Rules)

sudo cscli collections install crowdsecurity/nginx

sudo cscli collections install crowdsecurity/sshd

sudo cscli collections install crowdsecurity/linux

sudo cscli collections install crowdsecurity/http-cve

Check Status

sudo cscli metrics

sudo cscli decisions list

sudo cscli alerts list

sudo cscli bouncers list

from dataclasses import dataclass

from typing import List

@dataclass

class CrowdSecAlert:

ip: str

scenario: str

country: str

decisions: str

timestamp: str

alerts = [

CrowdSecAlert("185.220.101.5", "crowdsecurity/ssh-bf", "RU", "ban 4h", "10:00:05"),

CrowdSecAlert("45.33.32.156", "crowdsecurity/http-crawl-non_statics", "US", "ban 1h", "10:02:15"),

CrowdSecAlert("103.152.118.24", "crowdsecurity/http-bad-user-agent", "CN", "ban 24h", "10:05:30"),

CrowdSecAlert("192.168.1.100", "crowdsecurity/http-bf-wordpress_bf", "TH", "ban 4h", "10:10:00"),

CrowdSecAlert("91.242.217.18", "crowdsecurity/http-cve-2021-41773", "DE", "ban 48h", "10:15:22"),

]

print("=== CrowdSec Alerts ===")

for a in alerts:

print(f" [{a.decisions}] {a.ip} ({a.country})")

print(f" Scenario: {a.scenario} | Time: {a.timestamp}")

Message Queue Integration

=== Kafka Message Queue for IPS ===

Architecture:

Servers -> Filebeat -> Kafka -> CrowdSec -> Decision Queue -> Bouncers

Filebeat -> Kafka

filebeat.yml

filebeat.inputs:

  • type: log

paths: ["/var/log/nginx/access.log"]

fields: { log_type: nginx }

output.kafka:

hosts: ["kafka-01:9092", "kafka-02:9092"]

topic: "server-logs"

partition.round_robin:

reachable_only: true

Python — Kafka Producer for Logs

from kafka import KafkaProducer

import json

import time

producer = KafkaProducer(

bootstrap_servers=['kafka-01:9092'],

value_serializer=lambda v: json.dumps(v).encode('utf-8'),

)

def send_log_event(log_line, source):

event = {

"message": log_line,

"source": source,

"timestamp": time.time(),

}

producer.send('server-logs', value=event)

Decision Consumer — Apply Bans

from kafka import KafkaConsumer

import subprocess

consumer = KafkaConsumer(

'crowdsec-decisions',

bootstrap_servers=['kafka-01:9092'],

value_deserializer=lambda m: json.loads(m.decode('utf-8')),

group_id='bouncer-group',

)

for message in consumer:

decision = message.value

if decision['type'] == 'ban':

ip = decision['value']

duration = decision['duration']

subprocess.run(['iptables', '-A', 'INPUT', '-s', ip, '-j', 'DROP'])

@dataclass

class QueueMetrics:

topic: str

partitions: int

messages_sec: int

consumer_lag: int

status: str

queues = [

QueueMetrics("server-logs", 6, 5000, 120, "Healthy"),

QueueMetrics("crowdsec-decisions", 3, 50, 0, "Healthy"),

QueueMetrics("alert-notifications", 2, 10, 5, "Healthy"),

QueueMetrics("threat-intel", 1, 100, 2000, "Lagging"),

]

print("\n=== Message Queue Status ===")

for q in queues:

print(f" [{q.status}] {q.topic}")

print(f" Partitions: {q.partitions} | Rate: {q.messages_sec}/s | Lag: {q.consumer_lag}")

Distributed Security

# === Distributed CrowdSec === # Multi-Server Setup # Server 1 (LAPI - Local API): # sudo cscli machines add server2 --auto # sudo cscli machines add server3 --auto # Server 2/3 (Agent): # /etc/crowdsec/local_api_credentials.yaml # url: http://lapi-server:8080 # login: server2 # password: # Bouncer Registration # sudo cscli bouncers add nginx-bouncer-server1 # sudo cscli bouncers add fw-bouncer-server2 # CrowdSec Console (Cloud Dashboard) # sudo cscli console enroll architecture = { "Log Sources": "Nginx, SSH, WordPress, Custom Apps", "Filebeat": "Ship Logs ไป Kafka ทุก Server", "Kafka": "Message Queue กลาง Buffer + Distribute", "CrowdSec Engine": "วิเคราะห์ Log ตรวจจับ Attack", "LAPI": "Local API แชร์ Decision ระหว่าง Agents", "Bouncers": "Enforce Ban ที่ iptables/Nginx/Cloudflare", "Console": "Cloud Dashboard ดู Alerts ทั้งหมด", "Community": "แชร์ IP ร้ายกับผู้ใช้ CrowdSec ทั่วโลก", } print("Distributed Architecture:") for component, role in architecture.items(): print(f" [{component}]: {role}") # Detection Scenarios scenarios = { "SSH Brute Force": {"ban": "4h", "threshold": "5 fails/min"}, "HTTP Crawl": {"ban": "1h", "threshold": "50 non-static/10s"}, "WordPress Brute Force": {"ban": "4h", "threshold": "10 login fails/min"}, "CVE Exploitation": {"ban": "48h", "threshold": "1 attempt"}, "Bad User Agent": {"ban": "24h", "threshold": "SQLmap/Nikto detected"}, "DDoS L7": {"ban": "12h", "threshold": "1000 req/10s"}, } print(f"\n\nDetection Scenarios:") for name, info in scenarios.items(): print(f" [{name}] Ban: {info['ban']} | Threshold: {info['threshold']}")

เคล็ดลับ

  • Collections: ติดตั้ง Collection ที่ตรงกับ Service เช่น nginx sshd
  • Kafka: ใช้ Kafka รวม Log จากหลาย Server ไม่ต้องอ่าน File
  • Whitelist: Whitelist IP ของตัวเองก่อน ป้องกันแบนตัวเอง
  • Console: ใช้ CrowdSec Console ดู Dashboard ฟรี
  • Community: เปิด Community Blocklist รับ IP ร้ายจากทั่วโลก

การนำไปใช้งานจริงในองค์กร

สำหรับองค์กรขนาดกลางถึงใหญ่ แนะนำให้ใช้หลัก Three-Tier Architecture คือ Core Layer ที่เป็นแกนกลางของระบบ Distribution Layer ที่ทำหน้าที่กระจาย Traffic และ Access Layer ที่เชื่อมต่อกับผู้ใช้โดยตรง การแบ่ง Layer ชัดเจนช่วยให้การ Troubleshoot ง่ายขึ้นและสามารถ Scale ระบบได้ตามความต้องการ

เรื่อง Network Security ก็สำคัญไม่แพ้กัน ควรติดตั้ง Next-Generation Firewall ที่สามารถ Deep Packet Inspection ได้ ใช้ Network Segmentation แยก VLAN สำหรับแต่ละแผนก ติดตั้ง IDS/IPS เพื่อตรวจจับการโจมตี และทำ Regular Security Audit อย่างน้อยปีละ 2 ครั้ง

CrowdSec คืออะไร

Open Source IPS Crowd Intelligence Log Behavior Detection Brute Force DDoS Bouncer แบน IP อัตโนมัติ Community ฟรี

Message Queue ใช้กับ IPS อย่างไร

รวม Log หลาย Source Queue กลาง CrowdSec Consumer วิเคราะห์ Decision Queue Bouncer Distributed Scale Fault Tolerant

CrowdSec กับ Fail2ban ต่างกันอย่างไร

CrowdSec Behavior Crowd Intelligence Distributed Dashboard API Fail2ban Regex Local ง่าย CrowdSec ทันสมัย Multi-server

Bouncer คืออะไร

Enforce Decision Block IP Firewall iptables Nginx Cloudflare Custom API REST Ban Decision CrowdSec Engine หลายแบบ

สรุป

CrowdSec IPS Crowd Intelligence Bouncer Message Queue Kafka Log Parsing Behavior Detection Distributed Brute Force DDoS Firewall Nginx Cloudflare Community Blocklist