SiamCafe.net Blog
Technology

SASE Framework Pub Sub Architecture

sase framework pub sub architecture
SASE Framework Pub Sub Architecture | SiamCafe Blog
2025-06-29· อ. บอม — SiamCafe.net· 9,469 คำ

SASE กับ Pub/Sub Architecture

SASE สร้าง Security Events จำนวนมาก Firewall Logs Access Logs Threat Alerts ใช้ Pub/Sub กระจาย Events ไป SIEM SOAR Analytics แบบ Real-time

Pub/Sub ให้ระบบ Loosely Coupled Publisher ส่ง Messages ไป Topic Subscriber รับเฉพาะ Events ที่สนใจ Scale ตาม Volume

Kafka Security Event Pipeline

# === Kafka Security Event Pipeline ===
# pip install confluent-kafka

import json
import time
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from enum import Enum
from typing import List, Dict, Optional

# Severity levels used on the wire, ordered most-to-least urgent.
# Functional Enum API: members and values identical to a class-style
# declaration (CRITICAL="critical", ..., INFO="info").
EventSeverity = Enum(
    "EventSeverity",
    [
        ("CRITICAL", "critical"),
        ("HIGH", "high"),
        ("MEDIUM", "medium"),
        ("LOW", "low"),
        ("INFO", "info"),
    ],
)

@dataclass
class SecurityEvent:
    """Normalized security event published to the Kafka pipeline."""
    event_id: str
    event_type: str  # firewall, access, threat, dlp, audit
    severity: str    # one of the EventSeverity values ("critical" .. "info")
    source: str      # originating host/IP/identity
    destination: str
    user: str
    action: str  # allow, block, alert
    details: Dict = field(default_factory=dict)  # free-form extra context
    timestamp: str = ""  # ISO-8601; auto-filled when omitted

    def __post_init__(self):
        # Fix: stamp with timezone-aware UTC instead of naive
        # datetime.now(), so events from collectors in different
        # timezones sort and correlate consistently downstream.
        if not self.timestamp:
            self.timestamp = datetime.now(timezone.utc).isoformat()

class SecurityEventProducer:
    """Publishes SecurityEvent records to Kafka security topics.

    The real confluent-kafka Producer calls are left commented out so
    the example runs without a broker; only a sent-counter is kept.
    """

    def __init__(self, bootstrap_servers="localhost:9092"):
        self.servers = bootstrap_servers
        self.events_sent = 0
        # self.producer = Producer({
        #     'bootstrap.servers': bootstrap_servers,
        #     'security.protocol': 'SASL_SSL',
        #     'sasl.mechanisms': 'PLAIN',
        #     'sasl.username': 'security-producer',
        #     'sasl.password': 'secret',
        # })

    def send_event(self, topic, event: SecurityEvent):
        """Publish one event to the given Kafka topic (simulated)."""
        # Keying by event_type would route all events of one type to the
        # same partition, preserving their relative order.
        # self.producer.produce(
        #     topic,
        #     key=event.event_type,
        #     value=json.dumps(asdict(event)),
        #     callback=self.delivery_report,
        # )
        # self.producer.flush()
        self.events_sent += 1

    def delivery_report(self, err, msg):
        """Kafka delivery callback: log success or failure per message."""
        if not err:
            print(f"  Delivered to {msg.topic()} [{msg.partition()}]")
        else:
            print(f"  Delivery failed: {err}")

# Kafka topics for security events — one topic per event category so a
# consumer group can subscribe to only the categories it needs.
# NOTE(review): the producer demo publishes to f"security.{event_type}"
# with event_type "threat", i.e. topic "security.threat" — this map
# lists "security.threats" (plural); confirm which name is intended.
TOPICS = {
    "security.firewall":    "Firewall allow/block events",
    "security.access":      "User access events (login, logout, MFA)",
    "security.threats":     "Threat detection alerts",
    "security.dlp":         "Data Loss Prevention events",
    "security.audit":       "Audit trail events",
    "security.compliance":  "Compliance check results",
}

# Topic creation example: 12 partitions allow up to 12 parallel
# consumers per group; replication-factor 3 with min.insync.replicas=2
# keeps writes durable across a single-broker failure;
# retention.ms=604800000 is 7 days.
# kafka-topics.sh --create \
#   --topic security.firewall \
#   --bootstrap-server localhost:9092 \
#   --partitions 12 \
#   --replication-factor 3 \
#   --config retention.ms=604800000 \
#   --config cleanup.policy=delete \
#   --config min.insync.replicas=2

# Example: publish one representative event per category.
producer = SecurityEventProducer()

events = [
    SecurityEvent("evt-001", "firewall", "high", "198.51.100.1",
                 "internal-db", "unknown", "block",
                 {"rule": "deny-external-db", "port": 3306}),
    SecurityEvent("evt-002", "access", "info", "vpn-gateway",
                 "app-server", "alice@company.com", "allow",
                 {"method": "mfa", "location": "Bangkok"}),
    SecurityEvent("evt-003", "threat", "critical", "endpoint-01",
                 "malware-c2.evil.com", "bob@company.com", "block",
                 {"threat_type": "malware", "signature": "Trojan.Gen"}),
    SecurityEvent("evt-004", "dlp", "high", "alice@company.com",
                 "dropbox.com", "alice@company.com", "block",
                 {"data_type": "credit-card", "file": "report.xlsx"}),
]

print("Security Event Pipeline:")
print(f"  Topics: {len(TOPICS)}")
for topic, desc in TOPICS.items():
    print(f"    {topic}: {desc}")

# Fix: dropped the needless f-prefix on a placeholder-free string.
print("\n  Events:")
for event in events:
    # Each event is routed to its per-type topic, e.g. security.firewall.
    producer.send_event(f"security.{event.event_type}", event)
    print(f"    [{event.severity:>8}] {event.event_type}: "
          f"{event.source} -> {event.destination} ({event.action})")
print(f"\n  Total sent: {producer.events_sent}")

Event Consumer และ Processing

# event_consumer.py — Security Event Consumer
from dataclasses import dataclass, field
from typing import List, Dict, Callable
from datetime import datetime
from collections import Counter

@dataclass
class ProcessedEvent:
    """Record of one security event after consumer-side processing."""
    event_id: str
    event_type: str    # firewall, access, threat, dlp, audit
    severity: str      # critical / high / medium / low / info
    action_taken: str  # auto-response action chosen by process_event
    processor: str     # consumer group id that handled the event

class SecurityEventConsumer:
    """Kafka consumer that processes security events and tracks stats.

    The real confluent-kafka Consumer is left commented out so the
    example runs without a broker.
    """

    # Auto-response policy keyed by severity; severities not listed
    # here (low, info) are only logged.
    _AUTO_RESPONSE = {
        "critical": "auto-blocked + incident-created",
        "high": "alerted + investigated",
        "medium": "alerted",
    }

    def __init__(self, group_id="security-processor"):
        self.group_id = group_id
        self.processed: List[ProcessedEvent] = []
        # One Counter holds both per-type and per-severity tallies;
        # safe while type names and severity names never collide.
        self.stats = Counter()
        # self.consumer = Consumer({
        #     'bootstrap.servers': 'localhost:9092',
        #     'group.id': group_id,
        #     'auto.offset.reset': 'earliest',
        #     'security.protocol': 'SASL_SSL',
        # })

    def process_event(self, event_type, severity, event_id):
        """Process one event: update stats, pick an auto-response.

        Returns the action taken (str).
        """
        self.stats[event_type] += 1
        self.stats[severity] += 1

        # Table lookup replaces the if/elif severity ladder.
        action = self._AUTO_RESPONSE.get(severity, "logged")

        self.processed.append(ProcessedEvent(
            event_id, event_type, severity, action, self.group_id))

        return action

    def dashboard(self):
        """Print a summary of processed events by type and severity."""
        print(f"\n{'='*55}")
        print(f"Security Event Consumer: {self.group_id}")
        print(f"{'='*55}")
        print(f"  Total Processed: {len(self.processed)}")

        print("\n  By Type:")
        for event_type in ["firewall", "access", "threat", "dlp", "audit"]:
            # Counter returns 0 for missing keys, so .get(k, 0) was
            # redundant; plain indexing does not insert the key either.
            count = self.stats[event_type]
            if count > 0:
                print(f"    {event_type}: {count}")

        print("\n  By Severity:")
        for severity in ["critical", "high", "medium", "low", "info"]:
            count = self.stats[severity]
            if count > 0:
                # Bar capped at 20 chars, 2 chars per event.
                bar = "#" * min(count * 2, 20)
                print(f"    {severity:>8}: {count:>4} {bar}")

        print("\n  Recent Actions:")
        for pe in self.processed[-5:]:
            print(f"    [{pe.severity:>8}] {pe.event_type}: {pe.action_taken}")

# Consumer groups — Kafka keeps one offset cursor per group, so each
# group below independently receives the full event stream.
consumer_groups = {
    "siem-ingest": "ส่ง Events ไป SIEM (Splunk/Elastic)",
    "soar-trigger": "Trigger SOAR Playbooks",
    "ml-anomaly": "ML Anomaly Detection",
    "compliance-check": "Compliance Monitoring",
    "dashboard-update": "Real-time Dashboard Updates",
}

# Example: run sample events through the SIEM-ingest group.
consumer = SecurityEventConsumer("siem-ingest")

# (event_type, severity, event_id) tuples standing in for consumed
# Kafka messages.
test_events = [
    ("firewall", "high", "evt-001"),
    ("access", "info", "evt-002"),
    ("threat", "critical", "evt-003"),
    ("dlp", "high", "evt-004"),
    ("firewall", "low", "evt-005"),
    ("access", "info", "evt-006"),
    ("threat", "medium", "evt-007"),
    ("audit", "info", "evt-008"),
]

for event_type, severity, eid in test_events:
    consumer.process_event(event_type, severity, eid)

consumer.dashboard()

# Fix: dropped the needless f-prefix on a placeholder-free string.
print("\n  Consumer Groups:")
for group, desc in consumer_groups.items():
    print(f"    {group}: {desc}")

Secure Kafka Configuration

# === Secure Kafka Configuration ===

# 1. server.properties — Kafka Broker Security
# listeners=SASL_SSL://0.0.0.0:9093
# advertised.listeners=SASL_SSL://kafka-1.internal:9093
#
# # SSL Configuration
# ssl.keystore.location=/etc/kafka/ssl/kafka.keystore.jks
# ssl.keystore.password=keystore-password
# ssl.key.password=key-password
# ssl.truststore.location=/etc/kafka/ssl/kafka.truststore.jks
# ssl.truststore.password=truststore-password
# ssl.client.auth=required    # mTLS
#
# # SASL Configuration
# sasl.enabled.mechanisms=PLAIN, SCRAM-SHA-256
# sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
# security.inter.broker.protocol=SASL_SSL
#
# # ACLs
# authorizer.class.name=kafka.security.authorizer.AclAuthorizer
# super.users=User:admin
# allow.everyone.if.no.acl.found=false

# 2. ACL Configuration
# kafka-acls.sh --bootstrap-server localhost:9093 \
#   --command-config admin.properties \
#   --add \
#   --allow-principal User:security-producer \
#   --operation Write \
#   --topic security.firewall

# kafka-acls.sh --bootstrap-server localhost:9093 \
#   --command-config admin.properties \
#   --add \
#   --allow-principal User:siem-consumer \
#   --operation Read \
#   --topic security.firewall \
#   --group siem-ingest

# 3. Docker Compose — Secure Kafka Cluster
# version: '3.8'
# services:
#   kafka-1:
#     image: confluentinc/cp-kafka:7.5.0
#     ports:
#       - "9093:9093"
#     environment:
#       KAFKA_BROKER_ID: 1
#       KAFKA_LISTENERS: SASL_SSL://0.0.0.0:9093
#       KAFKA_SECURITY_INTER_BROKER_PROTOCOL: SASL_SSL
#       KAFKA_SASL_ENABLED_MECHANISMS: SCRAM-SHA-256
#       KAFKA_SSL_KEYSTORE_LOCATION: /etc/kafka/ssl/kafka.keystore.jks
#       KAFKA_SSL_TRUSTSTORE_LOCATION: /etc/kafka/ssl/kafka.truststore.jks
#       KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
#     volumes:
#       - ./ssl:/etc/kafka/ssl

# 4. Monitoring
# kafka-consumer-groups.sh --bootstrap-server localhost:9093 \
#   --describe --group siem-ingest
# Columns: TOPIC, PARTITION, CURRENT-OFFSET, LOG-END-OFFSET, LAG

# Summary of the security controls configured for the Kafka cluster
# in the snippets above.
security_config = {
    "encryption": "TLS 1.3 (in-transit) + AES-256 (at-rest)",
    "authentication": "SASL/SCRAM-SHA-256 + mTLS",
    "authorization": "Kafka ACLs (topic-level)",
    "audit": "All access logged to audit topic",
    "retention": "7 days (security events), 90 days (audit)",
    "replication": "Factor 3, min.insync.replicas=2",
}

print("Secure Kafka Configuration:")
for control, setting in security_config.items():
    print(f"  {control}: {setting}")

Best Practices

Pub/Sub Architecture คืออะไร

Messaging Pattern Publisher ส่ง Messages ไป Topic Subscriber รับจาก Topic Loosely Coupled Scale ง่าย Kafka Google Pub/Sub AWS SNS/SQS RabbitMQ

SASE กับ Pub/Sub เกี่ยวข้องกันอย่างไร

SASE สร้าง Security Events มาก Firewall Access Threat Pub/Sub กระจาย Events ไป SIEM SOAR Analytics Real-time ไม่ Polling Scale ตาม Volume

Apache Kafka เหมาะกับ Security Events หรือไม่

เหมาะมาก High Throughput ล้าน Events/วินาที Durability เก็บ Log ย้อนหลัง Partitioning กระจาย Load Consumer Groups ประมวลผลขนาน SIEM SOAR ML

วิธี Secure Pub/Sub System ทำอย่างไร

TLS เข้ารหัส Messages Authentication SASL mTLS Authorization ACLs Topic-level Encrypt at Rest Audit Logging Network Segmentation แยก Broker Cluster

สรุป

SASE ร่วมกับ Pub/Sub Architecture ให้ Event-driven Security Processing Kafka รองรับ High Throughput Security Events แยก Topics ตาม Event Type ACLs ควบคุมสิทธิ์ mTLS เข้ารหัส Consumer Groups แยกตาม Use Case SIEM SOAR ML Real-time Processing

📖 บทความที่เกี่ยวข้อง

SASE Framework Remote Work Setupอ่านบทความ → MySQL Replication Pub Sub Architectureอ่านบทความ → Redis Pub Sub SSL TLS Certificateอ่านบทความ → Redis Pub Sub Troubleshooting แก้ปัญหาอ่านบทความ → Radix UI Primitives Pub Sub Architectureอ่านบทความ →

📚 ดูบทความทั้งหมด →