SiamCafe.net Blog

SASE Framework Event Driven Design

2025-11-12 · อ. บอม — SiamCafe.net · 1,566 words

What Is SASE Framework Event-Driven Design?

SASE (Secure Access Service Edge) is a cloud-native security framework that converges network security services: SD-WAN, ZTNA, CASB, FWaaS, and SWG. Event-Driven Architecture (EDA) is a software design style in which events drive the work rather than direct (synchronous) service calls: event producers publish events to an event broker, and consumers pick them up for (asynchronous) processing. Combining SASE with event-driven design lets the security stack respond to threats in real time, build automated security workflows, and scale well.
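The producer → broker → consumer flow described above can be sketched in a few lines of Python, using `asyncio.Queue` as a stand-in for the event broker. The event names are illustrative, not part of any SASE product:

```python
import asyncio

async def producer(queue: asyncio.Queue) -> None:
    # The producer only publishes events; it never calls the consumer directly.
    for event in ({"type": "user_login", "user": "alice"},
                  {"type": "file_upload", "user": "bob"}):
        await queue.put(event)
    await queue.put(None)  # sentinel: no more events

async def consumer(queue: asyncio.Queue, handled: list) -> None:
    # The consumer pulls events off the "broker" at its own pace (asynchronous).
    while (event := await queue.get()) is not None:
        handled.append(event["type"])

async def main() -> list:
    queue: asyncio.Queue = asyncio.Queue()  # stand-in for the event broker
    handled: list = []
    await asyncio.gather(producer(queue), consumer(queue, handled))
    return handled

print(asyncio.run(main()))  # ['user_login', 'file_upload']
```

The key property is decoupling: the producer finishes without knowing who, or how many, will consume its events.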

Event-Driven Architecture Fundamentals

# eda_basics.py — Event-Driven Architecture fundamentals

class EDABasics:
    COMPONENTS = {
        "producer": {
            "name": "Event Producer",
            "description": "Creates and emits events — e.g. user login, file access, network request",
            "sase_example": "ZTNA: user authentication event, CASB: file upload event",
        },
        "broker": {
            "name": "Event Broker (Message Queue)",
            "description": "Receives events from producers → forwards them to consumers — decouples producers/consumers",
            "tools": "Apache Kafka, RabbitMQ, AWS EventBridge, Azure Event Hub",
        },
        "consumer": {
            "name": "Event Consumer",
            "description": "Receives events → processes them → responds (alert, block, log)",
            "sase_example": "Security analyzer: detects anomalies, Incident responder: blocks IPs",
        },
        "event_store": {
            "name": "Event Store",
            "description": "Stores every event as an immutable log — audit trail, replay",
            "sase_example": "Security event log: every access event retained 90+ days for compliance",
        },
    }

    PATTERNS = {
        "pub_sub": {
            "name": "Pub/Sub (Publish-Subscribe)",
            "description": "Publisher sends an event to a topic → every subscriber of that topic receives it",
            "use_case": "Security alert → fan out to every consumer at once: SIEM, Slack, PagerDuty",
        },
        "event_sourcing": {
            "name": "Event Sourcing",
            "description": "Store every state change as an event — reconstruct state from the event history",
            "use_case": "Security posture: replay events to see the security state at any point in time",
        },
        "cqrs": {
            "name": "CQRS (Command Query Responsibility Segregation)",
            "description": "Separate writes (commands) from reads (queries) — optimize each side independently",
            "use_case": "Write: ingest security events fast, Read: fast analytics/dashboard queries",
        },
        "saga": {
            "name": "Saga Pattern",
            "description": "Orchestrate distributed transactions via events — with compensating actions",
            "use_case": "Incident response: detect → investigate → contain → remediate (each step = event)",
        },
    }

    def show_components(self):
        print("=== EDA Components ===\n")
        for comp in self.COMPONENTS.values():
            print(f"[{comp['name']}]")
            print(f"  {comp['description']}")
            print()

    def show_patterns(self):
        print("=== EDA Patterns ===")
        for p in self.PATTERNS.values():
            print(f"\n[{p['name']}]")
            print(f"  {p['description']}")
            print(f"  Use: {p['use_case']}")

eda = EDABasics()
eda.show_components()
eda.show_patterns()

SASE Event-Driven Security

# sase_events.py — SASE event-driven security architecture

class SASEEventDriven:
    EVENT_SOURCES = {
        "ztna": {
            "events": ["user_login", "user_logout", "access_denied", "mfa_challenge", "device_posture_check"],
            "volume": "High — every authentication event",
        },
        "casb": {
            "events": ["file_upload", "file_download", "sharing_external", "dlp_violation", "shadow_it_detected"],
            "volume": "Medium-High — every cloud app activity",
        },
        "swg": {
            "events": ["url_blocked", "malware_detected", "ssl_inspection", "category_violation"],
            "volume": "Very High — every web request",
        },
        "fwaas": {
            "events": ["connection_blocked", "port_scan_detected", "ddos_attempt", "rule_matched"],
            "volume": "High — every network connection",
        },
    }

    WORKFLOWS = {
        "threat_detection": {
            "name": "Real-time Threat Detection",
            "flow": "SWG event (malware) → Kafka → Threat Analyzer → Alert + Block IP → SIEM log",
            "latency": "< 1 second",
        },
        "data_leak_prevention": {
            "name": "Data Leak Prevention",
            "flow": "CASB event (file upload) → DLP Scanner → Match PII → Block + Alert → Audit log",
            "latency": "< 2 seconds",
        },
        "anomaly_response": {
            "name": "Anomaly Detection & Response",
            "flow": "ZTNA events (login patterns) → ML Analyzer → Anomaly detected → MFA challenge → Log",
            "latency": "< 5 seconds",
        },
        "compliance_audit": {
            "name": "Compliance Audit Trail",
            "flow": "All events → Event Store (Kafka) → Long-term storage (S3) → Query engine → Dashboard",
            "retention": "90+ days",
        },
    }

    def show_sources(self):
        print("=== SASE Event Sources ===\n")
        for source, info in self.EVENT_SOURCES.items():
            print(f"[{source.upper()}] Volume: {info['volume']}")
            for event in info['events'][:3]:
                print(f"  • {event}")
            print()

    def show_workflows(self):
        print("=== Security Workflows ===")
        for wf in self.WORKFLOWS.values():
            print(f"\n[{wf['name']}]")
            print(f"  Flow: {wf['flow']}")

sase = SASEEventDriven()
sase.show_sources()
sase.show_workflows()
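The threat-detection workflow above ("SWG event → Threat Analyzer → Alert + Block IP → SIEM log") can be modeled as a short chain of handlers. Everything below is a simplified stand-in for the real Kafka-backed pipeline; the event shapes and the single-rule analyzer are assumptions for illustration:

```python
def analyze(event: dict):
    # Stand-in threat analyzer: flag only malware events.
    if event["event_type"] == "malware_detected":
        return {"action": "block_ip", "ip": event["ip"], "reason": "malware"}
    return None

def run_pipeline(events: list) -> tuple:
    """Run each event through the chain: SIEM log → analyze → block."""
    blocked, siem_log = [], []
    for event in events:
        siem_log.append(event)          # every event lands in the SIEM log
        alert = analyze(event)
        if alert:
            blocked.append(alert["ip"])  # block step fires only on an alert
    return blocked, siem_log

events = [
    {"event_type": "url_blocked", "ip": "10.0.0.5"},
    {"event_type": "malware_detected", "ip": "10.0.0.9"},
]
blocked, siem_log = run_pipeline(events)
print(blocked)  # ['10.0.0.9']
```

In the real architecture each arrow in the flow is a Kafka topic hop, so each stage can scale and fail independently; the chain of function calls here collapses that into one process for clarity.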

Python Event Processing

# event_processor.py — Python event processing for SASE

class EventProcessor:
    CODE = """
# sase_event_processor.py — Process SASE security events
from datetime import datetime, timedelta
from collections import defaultdict
from dataclasses import dataclass, asdict
from typing import Optional, Callable

@dataclass
class SecurityEvent:
    event_id: str
    source: str       # ztna, casb, swg, fwaas
    event_type: str   # login, file_upload, url_blocked, etc.
    timestamp: str
    user_id: Optional[str] = None
    ip_address: Optional[str] = None
    resource: Optional[str] = None
    action: str = ''  # allow, block, alert
    severity: str = 'info'  # info, warning, critical
    details: Optional[dict] = None

class EventBus:
    '''Simple in-process event bus'''
    def __init__(self):
        self.subscribers = defaultdict(list)
        self.event_store = []
    
    def subscribe(self, event_type: str, handler: Callable):
        self.subscribers[event_type].append(handler)
    
    def publish(self, event: SecurityEvent):
        self.event_store.append(asdict(event))
        
        # Notify specific subscribers
        for handler in self.subscribers.get(event.event_type, []):
            handler(event)
        
        # Notify wildcard subscribers
        for handler in self.subscribers.get('*', []):
            handler(event)
    
    def query(self, source=None, event_type=None, severity=None, limit=100):
        results = self.event_store
        if source:
            results = [e for e in results if e['source'] == source]
        if event_type:
            results = [e for e in results if e['event_type'] == event_type]
        if severity:
            results = [e for e in results if e['severity'] == severity]
        return results[-limit:]

class ThreatDetector:
    '''Detect threats from event stream'''
    def __init__(self):
        self.login_attempts = defaultdict(list)
        self.alerts = []
    
    def on_event(self, event: SecurityEvent):
        if event.event_type == 'access_denied':
            self._check_brute_force(event)
        elif event.event_type == 'dlp_violation':
            self._alert_data_leak(event)
        elif event.event_type == 'malware_detected':
            self._block_and_alert(event)
    
    def _check_brute_force(self, event):
        key = event.ip_address or event.user_id
        self.login_attempts[key].append(event.timestamp)
        
        # Keep only attempts from the last 5 minutes
        cutoff = (datetime.utcnow() - timedelta(minutes=5)).isoformat()
        recent = [t for t in self.login_attempts[key] if t > cutoff]
        self.login_attempts[key] = recent
        
        if len(recent) >= 5:
            self.alerts.append({
                'type': 'brute_force',
                'severity': 'critical',
                'source': key,
                'attempts': len(recent),
                'action': 'block_ip',
            })
    
    def _alert_data_leak(self, event):
        self.alerts.append({
            'type': 'data_leak',
            'severity': 'critical',
            'user': event.user_id,
            'resource': event.resource,
            'action': 'block_and_notify',
        })
    
    def _block_and_alert(self, event):
        self.alerts.append({
            'type': 'malware',
            'severity': 'critical',
            'ip': event.ip_address,
            'resource': event.resource,
            'action': 'quarantine',
        })

# bus = EventBus()
# detector = ThreatDetector()
# bus.subscribe('*', detector.on_event)
# bus.publish(SecurityEvent(
#     event_id='evt_001', source='ztna', event_type='access_denied',
#     timestamp=datetime.utcnow().isoformat(), user_id='user123',
#     ip_address='1.2.3.4', severity='warning',
# ))
"""

    def show_code(self):
        print("=== Event Processor ===")
        print(self.CODE[:600])

processor = EventProcessor()
processor.show_code()
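To see the brute-force rule from `_check_brute_force` in action, here is a trimmed, self-contained version of just that detector: a sliding 5-minute window with a threshold of 5 failed attempts. The threshold and window match the listing above; the function name and event shape are simplified for the demo:

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone

THRESHOLD = 5                    # failed attempts before alerting
WINDOW = timedelta(minutes=5)    # sliding window

attempts = defaultdict(list)     # key (IP) -> recent attempt timestamps
alerts = []

def on_access_denied(ip: str, now: datetime) -> None:
    # Record the attempt, drop anything older than the window, then check.
    attempts[ip].append(now)
    attempts[ip] = [t for t in attempts[ip] if now - t <= WINDOW]
    if len(attempts[ip]) >= THRESHOLD:
        alerts.append({"type": "brute_force", "ip": ip,
                       "attempts": len(attempts[ip]), "action": "block_ip"})

start = datetime.now(timezone.utc)
for i in range(5):  # five failures within the window → exactly one alert
    on_access_denied("1.2.3.4", start + timedelta(seconds=i))
print(alerts[0]["type"])  # brute_force
```

Working with `datetime` objects instead of ISO strings keeps the window arithmetic explicit, which is exactly the comparison the full listing performs via timestamp strings.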

Kafka Integration

# kafka_setup.py — Apache Kafka for SASE events (named to avoid shadowing the kafka client package)

class KafkaSetup:
    TOPICS = {
        "sase.ztna.events": "Authentication and access events",
        "sase.casb.events": "Cloud application security events",
        "sase.swg.events": "Web gateway events",
        "sase.fwaas.events": "Firewall events",
        "sase.alerts": "Processed alerts (from analyzers)",
        "sase.audit": "Compliance audit trail",
    }

    ARCHITECTURE = """
# docker-compose.yaml — Kafka for SASE events
version: '3.8'
services:
  kafka:
    image: confluentinc/cp-kafka:7.6.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      CLUSTER_ID: MkU3OEVBNTcwNTJENDM2Qk  # any base64-encoded Kafka cluster UUID
      KAFKA_LOG_RETENTION_HOURS: 168  # 7 days
      KAFKA_NUM_PARTITIONS: 6
    volumes:
      - kafka-data:/var/lib/kafka/data

  schema-registry:
    image: confluentinc/cp-schema-registry:7.6.0
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:9092

  event-processor:
    build: ./processor
    environment:
      KAFKA_BOOTSTRAP_SERVERS: kafka:9092
      CONSUMER_GROUP: sase-processor
    depends_on:
      - kafka

volumes:
  kafka-data:
"""

    def show_topics(self):
        print("=== Kafka Topics ===\n")
        for topic, desc in self.TOPICS.items():
            print(f"  [{topic}] {desc}")

    def show_architecture(self):
        print("\n=== Docker Compose ===")
        print(self.ARCHITECTURE[:400])

kafka = KafkaSetup()
kafka.show_topics()
kafka.show_architecture()
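Given the topic layout above, routing an event to its topic is a simple mapping from event source to topic name. The `(topic, key, value)` envelope below matches what Kafka producer clients generally expect, but the helper itself and the event fields are an illustrative assumption, not a client API:

```python
import json

# Mirrors the TOPICS dict above: one topic per SASE event source.
TOPIC_BY_SOURCE = {
    "ztna": "sase.ztna.events",
    "casb": "sase.casb.events",
    "swg": "sase.swg.events",
    "fwaas": "sase.fwaas.events",
}

def to_record(event: dict) -> tuple:
    """Return (topic, key, value) ready to hand to a Kafka producer.

    Keying by user_id keeps one user's events in one partition,
    so per-user ordering is preserved across the 6 partitions.
    """
    topic = TOPIC_BY_SOURCE[event["source"]]
    key = event.get("user_id", "").encode()
    value = json.dumps(event).encode()
    return topic, key, value

topic, key, value = to_record(
    {"source": "swg", "user_id": "u1", "event_type": "url_blocked"}
)
print(topic)  # sase.swg.events
```

With a real client (e.g. confluent-kafka's `producer.produce(topic, key=key, value=value)`), this helper is the only SASE-specific piece; everything else is standard Kafka plumbing.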

Monitoring & Dashboards

# monitoring.py — Event-driven monitoring

class EventMonitoring:
    METRICS = {
        "throughput": "Events per second by source (ztna, casb, swg, fwaas)",
        "latency": "Event processing latency (P50, P95, P99)",
        "consumer_lag": "Kafka consumer lag — events waiting to be processed",
        "alert_rate": "Alerts generated per minute by severity",
        "false_positive": "False positive rate for threat detection",
    }

    DASHBOARDS = {
        "real_time": {
            "name": "Real-time Security Events",
            "panels": ["Event stream (live)", "Events by source", "Top blocked IPs", "Active alerts"],
        },
        "analytics": {
            "name": "Security Analytics",
            "panels": ["Threat trends (7 days)", "Top attack vectors", "User risk scores", "Geo-distribution"],
        },
        "operations": {
            "name": "Event Pipeline Operations",
            "panels": ["Kafka throughput", "Consumer lag", "Processing errors", "Pipeline health"],
        },
    }

    def show_metrics(self):
        print("=== Key Metrics ===\n")
        for name, desc in self.METRICS.items():
            print(f"  [{name}] {desc}")

    def show_dashboards(self):
        print("\n=== Dashboards ===")
        for dash in self.DASHBOARDS.values():
            print(f"\n[{dash['name']}]")
            for panel in dash['panels'][:3]:
                print(f"  • {panel}")

monitoring = EventMonitoring()
monitoring.show_metrics()
monitoring.show_dashboards()
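The P50/P95/P99 latency metric listed above can be computed from raw processing-time samples with the standard library alone; the sample values here are made up for the demo:

```python
from statistics import quantiles

def latency_percentiles(samples_ms: list) -> dict:
    """Compute P50/P95/P99 from per-event processing times (ms).

    quantiles(n=100) returns the 99 cut points P1..P99, so
    index 49 is P50, index 94 is P95, index 98 is P99.
    """
    cuts = quantiles(samples_ms, n=100)
    return {"p50": cuts[49], "p95": cuts[94], "p99": cuts[98]}

# Made-up samples: a uniform spread of 1..100 ms processing times.
samples = [float(ms) for ms in range(1, 101)]
print(latency_percentiles(samples))
```

In production these samples would come from per-event timestamps (event ingested vs. alert emitted), aggregated per window; percentiles matter here because a mean hides exactly the tail latency that breaks the "< 1 second" workflow targets.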

FAQ - Frequently Asked Questions

Q: Event-Driven Architecture ดีกว่า REST API อย่างไร?

A: EDA is asynchronous, loosely coupled, scalable, and real-time, which suits high-volume security events. REST is synchronous, simple, and request-response, which suits CRUD operations. Choose EDA for high event volume, real-time processing, and multiple consumers; choose REST for simple queries, low volume, and request-response patterns. They also work together: EDA for event processing plus REST for management APIs.

Q: Kafka กับ RabbitMQ อันไหนดีกว่าสำหรับ security events?

A: Kafka offers high throughput (millions of events/sec), durable storage, replay, and event sourcing. RabbitMQ offers lower latency, flexible routing, simpler setup, and traditional messaging. Choose Kafka when security event volume is high, events must be retained, and there are multiple consumers or analytics needs; choose RabbitMQ for low-to-medium volume, complex routing, or task queues. For SASE, Kafka is the better fit: high volume plus audit-trail and replay requirements.

Q: Event-Driven Security ต่างจาก SIEM อย่างไร?

A: A SIEM collects, stores, and analyzes logs, traditionally with batch processing and retrospective analysis. Event-driven security processes events in real time: detect and respond immediately. They work together: EDA processes events in real time and forwards enriched alerts to the SIEM for storage and compliance. Legacy SIEMs are log-based, batch, and slow; modern SIEMs (Elastic, Splunk) support streaming better. What EDA adds is real-time response (block, alert), event correlation, and automated workflows.

Q: ต้อง handle events กี่ events ต่อวินาที?

A: It depends on organization size. Small (100 users): 100-1,000 events/sec. Medium (1,000 users): 1,000-10,000 events/sec. Large (10,000+ users): 10,000-100,000+ events/sec. Kafka itself can handle millions of events/sec (partitioned, replicated). The key is a good partitioning strategy: partition by source, user_id, or event_type.
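The partitioning strategy in the last answer comes down to a stable hash of the chosen key. Real Kafka clients use murmur2 for this; the stdlib `crc32` below is only an illustration of the same idea, with the partition count matching the compose file above:

```python
import zlib

NUM_PARTITIONS = 6  # matches KAFKA_NUM_PARTITIONS in the compose file

def partition_for(key: str, num_partitions: int = NUM_PARTITIONS) -> int:
    # A stable hash means the same key always lands on the same partition,
    # preserving per-user (or per-source) event ordering.
    return zlib.crc32(key.encode()) % num_partitions

p1 = partition_for("user123")
p2 = partition_for("user123")
print(p1 == p2)  # True — deterministic routing
```

Partitioning by user_id keeps one user's events ordered; partitioning by source balances load across analyzers but gives up per-user ordering, so the choice should follow which consumers need which guarantee.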

📖 Related Articles

• Data Lakehouse Event Driven Design
• HTTP/3 QUIC Event Driven Design
• Python Pydantic Event Driven Design
• oVirt Virtualization Event Driven Design
• SASE Framework Backup Recovery Strategy

📚 View all articles →