SASE กับ Pub/Sub Architecture
SASE สร้าง Security Events จำนวนมาก Firewall Logs Access Logs Threat Alerts ใช้ Pub/Sub กระจาย Events ไป SIEM SOAR Analytics แบบ Real-time
Pub/Sub ให้ระบบ Loosely Coupled Publisher ส่ง Messages ไป Topic Subscriber รับเฉพาะ Events ที่สนใจ Scale ตาม Volume
Kafka Security Event Pipeline
# === Kafka Security Event Pipeline ===
# pip install confluent-kafka
import json
import time
from dataclasses import dataclass, field, asdict
from typing import List, Dict, Optional
from datetime import datetime
from enum import Enum
class EventSeverity(Enum):
CRITICAL = "critical"
HIGH = "high"
MEDIUM = "medium"
LOW = "low"
INFO = "info"
@dataclass
class SecurityEvent:
event_id: str
event_type: str # firewall, access, threat, dlp, audit
severity: str
source: str
destination: str
user: str
action: str # allow, block, alert
details: Dict = field(default_factory=dict)
timestamp: str = ""
def __post_init__(self):
if not self.timestamp:
self.timestamp = datetime.now().isoformat()
class SecurityEventProducer:
"""Kafka Producer สำหรับ Security Events"""
def __init__(self, bootstrap_servers="localhost:9092"):
self.servers = bootstrap_servers
self.events_sent = 0
# self.producer = Producer({
# 'bootstrap.servers': bootstrap_servers,
# 'security.protocol': 'SASL_SSL',
# 'sasl.mechanisms': 'PLAIN',
# 'sasl.username': 'security-producer',
# 'sasl.password': 'secret',
# })
def send_event(self, topic, event: SecurityEvent):
"""ส่ง Event ไป Kafka Topic"""
# self.producer.produce(
# topic,
# key=event.event_type,
# value=json.dumps(asdict(event)),
# callback=self.delivery_report,
# )
# self.producer.flush()
self.events_sent += 1
def delivery_report(self, err, msg):
if err:
print(f" Delivery failed: {err}")
else:
print(f" Delivered to {msg.topic()} [{msg.partition()}]")
# Kafka Topics สำหรับ Security Events
TOPICS = {
"security.firewall": "Firewall allow/block events",
"security.access": "User access events (login, logout, MFA)",
"security.threats": "Threat detection alerts",
"security.dlp": "Data Loss Prevention events",
"security.audit": "Audit trail events",
"security.compliance": "Compliance check results",
}
# Topic Configuration
# kafka-topics.sh --create \
# --topic security.firewall \
# --bootstrap-server localhost:9092 \
# --partitions 12 \
# --replication-factor 3 \
# --config retention.ms=604800000 \
# --config cleanup.policy=delete \
# --config min.insync.replicas=2
# ตัวอย่าง
producer = SecurityEventProducer()
events = [
SecurityEvent("evt-001", "firewall", "high", "198.51.100.1",
"internal-db", "unknown", "block",
{"rule": "deny-external-db", "port": 3306}),
SecurityEvent("evt-002", "access", "info", "vpn-gateway",
"app-server", "alice@company.com", "allow",
{"method": "mfa", "location": "Bangkok"}),
SecurityEvent("evt-003", "threat", "critical", "endpoint-01",
"malware-c2.evil.com", "bob@company.com", "block",
{"threat_type": "malware", "signature": "Trojan.Gen"}),
SecurityEvent("evt-004", "dlp", "high", "alice@company.com",
"dropbox.com", "alice@company.com", "block",
{"data_type": "credit-card", "file": "report.xlsx"}),
]
print("Security Event Pipeline:")
print(f" Topics: {len(TOPICS)}")
for topic, desc in TOPICS.items():
print(f" {topic}: {desc}")
print(f"\n Events:")
for event in events:
producer.send_event(f"security.{event.event_type}", event)
print(f" [{event.severity:>8}] {event.event_type}: "
f"{event.source} -> {event.destination} ({event.action})")
print(f"\n Total sent: {producer.events_sent}")
Event Consumer และ Processing
# event_consumer.py — Security Event Consumer
from dataclasses import dataclass, field
from typing import List, Dict, Callable
from datetime import datetime
from collections import Counter
@dataclass
class ProcessedEvent:
event_id: str
event_type: str
severity: str
action_taken: str
processor: str
class SecurityEventConsumer:
"""Kafka Consumer สำหรับ Security Events"""
def __init__(self, group_id="security-processor"):
self.group_id = group_id
self.processed: List[ProcessedEvent] = []
self.stats = Counter()
# self.consumer = Consumer({
# 'bootstrap.servers': 'localhost:9092',
# 'group.id': group_id,
# 'auto.offset.reset': 'earliest',
# 'security.protocol': 'SASL_SSL',
# })
def process_event(self, event_type, severity, event_id):
"""Process Security Event"""
self.stats[event_type] += 1
self.stats[severity] += 1
# Auto-response rules
action = "logged"
if severity == "critical":
action = "auto-blocked + incident-created"
elif severity == "high":
action = "alerted + investigated"
elif severity == "medium":
action = "alerted"
self.processed.append(ProcessedEvent(
event_id, event_type, severity, action, self.group_id))
return action
def dashboard(self):
"""Consumer Dashboard"""
print(f"\n{'='*55}")
print(f"Security Event Consumer: {self.group_id}")
print(f"{'='*55}")
print(f" Total Processed: {len(self.processed)}")
print(f"\n By Type:")
for event_type in ["firewall", "access", "threat", "dlp", "audit"]:
count = self.stats.get(event_type, 0)
if count > 0:
print(f" {event_type}: {count}")
print(f"\n By Severity:")
for severity in ["critical", "high", "medium", "low", "info"]:
count = self.stats.get(severity, 0)
if count > 0:
bar = "#" * min(count * 2, 20)
print(f" {severity:>8}: {count:>4} {bar}")
print(f"\n Recent Actions:")
for pe in self.processed[-5:]:
print(f" [{pe.severity:>8}] {pe.event_type}: {pe.action_taken}")
# Consumer Groups
consumer_groups = {
"siem-ingest": "ส่ง Events ไป SIEM (Splunk/Elastic)",
"soar-trigger": "Trigger SOAR Playbooks",
"ml-anomaly": "ML Anomaly Detection",
"compliance-check": "Compliance Monitoring",
"dashboard-update": "Real-time Dashboard Updates",
}
# ตัวอย่าง
consumer = SecurityEventConsumer("siem-ingest")
# Process events
test_events = [
("firewall", "high", "evt-001"),
("access", "info", "evt-002"),
("threat", "critical", "evt-003"),
("dlp", "high", "evt-004"),
("firewall", "low", "evt-005"),
("access", "info", "evt-006"),
("threat", "medium", "evt-007"),
("audit", "info", "evt-008"),
]
for event_type, severity, eid in test_events:
consumer.process_event(event_type, severity, eid)
consumer.dashboard()
print(f"\n Consumer Groups:")
for group, desc in consumer_groups.items():
print(f" {group}: {desc}")
Secure Kafka Configuration
# === Secure Kafka Configuration ===
# 1. server.properties — Kafka Broker Security
# listeners=SASL_SSL://0.0.0.0:9093
# advertised.listeners=SASL_SSL://kafka-1.internal:9093
#
# # SSL Configuration
# ssl.keystore.location=/etc/kafka/ssl/kafka.keystore.jks
# ssl.keystore.password=keystore-password
# ssl.key.password=key-password
# ssl.truststore.location=/etc/kafka/ssl/kafka.truststore.jks
# ssl.truststore.password=truststore-password
# ssl.client.auth=required # mTLS
#
# # SASL Configuration
# sasl.enabled.mechanisms=PLAIN, SCRAM-SHA-256
# sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
# security.inter.broker.protocol=SASL_SSL
#
# # ACLs
# authorizer.class.name=kafka.security.authorizer.AclAuthorizer
# super.users=User:admin
# allow.everyone.if.no.acl.found=false
# 2. ACL Configuration
# kafka-acls.sh --bootstrap-server localhost:9093 \
# --command-config admin.properties \
# --add \
# --allow-principal User:security-producer \
# --operation Write \
# --topic security.firewall
# kafka-acls.sh --bootstrap-server localhost:9093 \
# --command-config admin.properties \
# --add \
# --allow-principal User:siem-consumer \
# --operation Read \
# --topic security.firewall \
# --group siem-ingest
# 3. Docker Compose — Secure Kafka Cluster
# version: '3.8'
# services:
# kafka-1:
# image: confluentinc/cp-kafka:7.5.0
# ports:
# - "9093:9093"
# environment:
# KAFKA_BROKER_ID: 1
# KAFKA_LISTENERS: SASL_SSL://0.0.0.0:9093
# KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SASL_SSL
# KAFKA_SASL_ENABLED_MECHANISMS: SCRAM-SHA-256
# KAFKA_SSL_KEYSTORE_LOCATION: /etc/kafka/ssl/kafka.keystore.jks
# KAFKA_SSL_TRUSTSTORE_LOCATION: /etc/kafka/ssl/kafka.truststore.jks
# KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
# volumes:
# - ./ssl:/etc/kafka/ssl
# 4. Monitoring
# kafka-consumer-groups.sh --bootstrap-server localhost:9093 \
# --describe --group siem-ingest
# Columns: TOPIC, PARTITION, CURRENT-OFFSET, LOG-END-OFFSET, LAG
security_config = {
"encryption": "TLS 1.3 (in-transit) + AES-256 (at-rest)",
"authentication": "SASL/SCRAM-SHA-256 + mTLS",
"authorization": "Kafka ACLs (topic-level)",
"audit": "All access logged to audit topic",
"retention": "7 days (security events), 90 days (audit)",
"replication": "Factor 3, min.insync.replicas=2",
}
print("Secure Kafka Configuration:")
for key, value in security_config.items():
print(f" {key}: {value}")
Best Practices
- Topic Design: แยก Topic ตาม Event Type (firewall, access, threat) ง่ายต่อ ACL
- Partitioning: ใช้ 12+ Partitions สำหรับ High-volume Topics
- mTLS: ใช้ mTLS สำหรับ Broker-to-Broker และ Client-to-Broker
- ACLs: กำหนด ACL ระดับ Topic ให้ Producer/Consumer เข้าถึงเฉพาะที่จำเป็น
- Retention: ตั้ง Retention ตาม Compliance (7 วัน Events, 90 วัน Audit)
- Consumer Groups: แยก Consumer Group ตาม Use Case (SIEM, SOAR, ML)
Pub/Sub Architecture คืออะไร
Messaging Pattern Publisher ส่ง Messages ไป Topic Subscriber รับจาก Topic Loosely Coupled Scale ง่าย Kafka Google Pub/Sub AWS SNS/SQS RabbitMQ
SASE กับ Pub/Sub เกี่ยวข้องกันอย่างไร
SASE สร้าง Security Events มาก Firewall Access Threat Pub/Sub กระจาย Events ไป SIEM SOAR Analytics Real-time ไม่ Polling Scale ตาม Volume
Apache Kafka เหมาะกับ Security Events หรือไม่
เหมาะมาก High Throughput ล้าน Events/วินาที Durability เก็บ Log ย้อนหลัง Partitioning กระจาย Load Consumer Groups ประมวลผลขนาน SIEM SOAR ML
วิธี Secure Pub/Sub System ทำอย่างไร
TLS เข้ารหัส Messages Authentication SASL mTLS Authorization ACLs Topic-level Encrypt at Rest Audit Logging Network Segmentation แยก Broker Cluster
สรุป
SASE ร่วมกับ Pub/Sub Architecture ให้ Event-driven Security Processing Kafka รองรับ High Throughput Security Events แยก Topics ตาม Event Type ACLs ควบคุมสิทธิ์ mTLS เข้ารหัส Consumer Groups แยกตาม Use Case SIEM SOAR ML Real-time Processing
