SASE Framework Event Driven Design คืออะไร
SASE (Secure Access Service Edge) เป็น cloud-native security framework ที่รวม network security services เข้าด้วยกัน ได้แก่ SD-WAN, ZTNA, CASB, FWaaS และ SWG Event-Driven Architecture (EDA) คือรูปแบบการออกแบบ software ที่ใช้ events เป็นตัวขับเคลื่อนการทำงาน แทนที่จะเรียก services โดยตรง (synchronous) จะใช้ event producers ส่ง events ไปยัง event brokers แล้ว consumers รับไปประมวลผล (asynchronous) การรวม SASE กับ Event-Driven Design ช่วยให้ระบบ security ตอบสนองต่อ threats แบบ real-time สร้าง automated security workflows และ scale ได้ดี
Event-Driven Architecture Fundamentals
# eda_basics.py — Event-Driven Architecture fundamentals
import json
class EDABasics:
COMPONENTS = {
"producer": {
"name": "Event Producer",
"description": "สร้างและส่ง events — เช่น user login, file access, network request",
"sase_example": "ZTNA: user authentication event, CASB: file upload event",
},
"broker": {
"name": "Event Broker (Message Queue)",
"description": "รับ events จาก producers → ส่งต่อให้ consumers — decouple producers/consumers",
"tools": "Apache Kafka, RabbitMQ, AWS EventBridge, Azure Event Hub",
},
"consumer": {
"name": "Event Consumer",
"description": "รับ events → ประมวลผล → ตอบสนอง (alert, block, log)",
"sase_example": "Security analyzer: ตรวจจับ anomaly, Incident responder: block IP",
},
"event_store": {
"name": "Event Store",
"description": "เก็บ events ทั้งหมดเป็น immutable log — audit trail, replay",
"sase_example": "Security event log: ทุก access event เก็บ 90+ วัน สำหรับ compliance",
},
}
PATTERNS = {
"pub_sub": {
"name": "Pub/Sub (Publish-Subscribe)",
"description": "Publisher ส่ง event ไป topic → ทุก subscriber ที่ subscribe topic ได้รับ",
"use_case": "Security alert → ส่งไปทุก consumer: SIEM, Slack, PagerDuty พร้อมกัน",
},
"event_sourcing": {
"name": "Event Sourcing",
"description": "เก็บทุก state change เป็น events — reconstruct state จาก event history",
"use_case": "Security posture: replay events เพื่อดู security state ณ เวลาใดก็ได้",
},
"cqrs": {
"name": "CQRS (Command Query Responsibility Segregation)",
"description": "แยก write (commands) กับ read (queries) — optimize แต่ละด้านแยกกัน",
"use_case": "Write: ingest security events เร็ว, Read: query analytics/dashboard เร็ว",
},
"saga": {
"name": "Saga Pattern",
"description": "Orchestrate distributed transactions ผ่าน events — compensating actions",
"use_case": "Incident response: detect → investigate → contain → remediate (each step = event)",
},
}
def show_components(self):
print("=== EDA Components ===\n")
for key, comp in self.COMPONENTS.items():
print(f"[{comp['name']}]")
print(f" {comp['description']}")
print()
def show_patterns(self):
print("=== EDA Patterns ===")
for key, p in self.PATTERNS.items():
print(f"\n[{p['name']}]")
print(f" {p['description']}")
print(f" Use: {p['use_case']}")
eda = EDABasics()
eda.show_components()
eda.show_patterns()
SASE Event-Driven Security
# sase_events.py — SASE event-driven security architecture
import json
class SASEEventDriven:
EVENT_SOURCES = {
"ztna": {
"events": ["user_login", "user_logout", "access_denied", "mfa_challenge", "device_posture_check"],
"volume": "High — ทุก authentication event",
},
"casb": {
"events": ["file_upload", "file_download", "sharing_external", "dlp_violation", "shadow_it_detected"],
"volume": "Medium-High — ทุก cloud app activity",
},
"swg": {
"events": ["url_blocked", "malware_detected", "ssl_inspection", "category_violation"],
"volume": "Very High — ทุก web request",
},
"fwaas": {
"events": ["connection_blocked", "port_scan_detected", "ddos_attempt", "rule_matched"],
"volume": "High — ทุก network connection",
},
}
WORKFLOWS = {
"threat_detection": {
"name": "Real-time Threat Detection",
"flow": "SWG event (malware) → Kafka → Threat Analyzer → Alert + Block IP → SIEM log",
"latency": "< 1 second",
},
"data_leak_prevention": {
"name": "Data Leak Prevention",
"flow": "CASB event (file upload) → DLP Scanner → Match PII → Block + Alert → Audit log",
"latency": "< 2 seconds",
},
"anomaly_response": {
"name": "Anomaly Detection & Response",
"flow": "ZTNA events (login patterns) → ML Analyzer → Anomaly detected → MFA challenge → Log",
"latency": "< 5 seconds",
},
"compliance_audit": {
"name": "Compliance Audit Trail",
"flow": "All events → Event Store (Kafka) → Long-term storage (S3) → Query engine → Dashboard",
"retention": "90+ days",
},
}
def show_sources(self):
print("=== SASE Event Sources ===\n")
for source, info in self.EVENT_SOURCES.items():
print(f"[{source.upper()}] Volume: {info['volume']}")
for event in info['events'][:3]:
print(f" • {event}")
print()
def show_workflows(self):
print("=== Security Workflows ===")
for key, wf in self.WORKFLOWS.items():
print(f"\n[{wf['name']}]")
print(f" Flow: {wf['flow']}")
sase = SASEEventDriven()
sase.show_sources()
sase.show_workflows()
Python Event Processing
# event_processor.py — Python event processing for SASE
import json
class EventProcessor:
CODE = """
# sase_event_processor.py — Process SASE security events
import json
import time
from datetime import datetime
from collections import defaultdict
from dataclasses import dataclass, asdict
from typing import List, Optional, Callable
@dataclass
class SecurityEvent:
event_id: str
source: str # ztna, casb, swg, fwaas
event_type: str # login, file_upload, url_blocked, etc.
timestamp: str
user_id: Optional[str] = None
ip_address: Optional[str] = None
resource: Optional[str] = None
action: str = '' # allow, block, alert
severity: str = 'info' # info, warning, critical
details: dict = None
class EventBus:
'''Simple in-process event bus'''
def __init__(self):
self.subscribers = defaultdict(list)
self.event_store = []
def subscribe(self, event_type: str, handler: Callable):
self.subscribers[event_type].append(handler)
def publish(self, event: SecurityEvent):
self.event_store.append(asdict(event))
# Notify specific subscribers
for handler in self.subscribers.get(event.event_type, []):
handler(event)
# Notify wildcard subscribers
for handler in self.subscribers.get('*', []):
handler(event)
def query(self, source=None, event_type=None, severity=None, limit=100):
results = self.event_store
if source:
results = [e for e in results if e['source'] == source]
if event_type:
results = [e for e in results if e['event_type'] == event_type]
if severity:
results = [e for e in results if e['severity'] == severity]
return results[-limit:]
class ThreatDetector:
'''Detect threats from event stream'''
def __init__(self):
self.login_attempts = defaultdict(list)
self.alerts = []
def on_event(self, event: SecurityEvent):
if event.event_type == 'access_denied':
self._check_brute_force(event)
elif event.event_type == 'dlp_violation':
self._alert_data_leak(event)
elif event.event_type == 'malware_detected':
self._block_and_alert(event)
def _check_brute_force(self, event):
key = event.ip_address or event.user_id
self.login_attempts[key].append(event.timestamp)
# Keep last 5 minutes
recent = [t for t in self.login_attempts[key]
if t > (datetime.utcnow().isoformat()[:-7])]
self.login_attempts[key] = recent
if len(recent) >= 5:
self.alerts.append({
'type': 'brute_force',
'severity': 'critical',
'source': key,
'attempts': len(recent),
'action': 'block_ip',
})
def _alert_data_leak(self, event):
self.alerts.append({
'type': 'data_leak',
'severity': 'critical',
'user': event.user_id,
'resource': event.resource,
'action': 'block_and_notify',
})
def _block_and_alert(self, event):
self.alerts.append({
'type': 'malware',
'severity': 'critical',
'ip': event.ip_address,
'resource': event.resource,
'action': 'quarantine',
})
# bus = EventBus()
# detector = ThreatDetector()
# bus.subscribe('*', detector.on_event)
# bus.publish(SecurityEvent(
# event_id='evt_001', source='ztna', event_type='access_denied',
# timestamp=datetime.utcnow().isoformat(), user_id='user123',
# ip_address='1.2.3.4', severity='warning',
# ))
"""
def show_code(self):
print("=== Event Processor ===")
print(self.CODE[:600])
processor = EventProcessor()
processor.show_code()
Kafka Integration
# kafka.py — Apache Kafka for SASE events
import json
class KafkaSetup:
TOPICS = {
"sase.ztna.events": "Authentication and access events",
"sase.casb.events": "Cloud application security events",
"sase.swg.events": "Web gateway events",
"sase.fwaas.events": "Firewall events",
"sase.alerts": "Processed alerts (from analyzers)",
"sase.audit": "Compliance audit trail",
}
ARCHITECTURE = """
# docker-compose.yaml — Kafka for SASE events
version: '3.8'
services:
kafka:
image: confluentinc/cp-kafka:7.6.0
ports:
- "9092:9092"
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: broker, controller
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092, CONTROLLER://0.0.0.0:9093
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093
KAFKA_LOG_RETENTION_HOURS: 168 # 7 days
KAFKA_NUM_PARTITIONS: 6
volumes:
- kafka-data:/var/lib/kafka/data
schema-registry:
image: confluentinc/cp-schema-registry:7.6.0
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:9092
event-processor:
build: ./processor
environment:
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
CONSUMER_GROUP: sase-processor
depends_on:
- kafka
volumes:
kafka-data:
"""
def show_topics(self):
print("=== Kafka Topics ===\n")
for topic, desc in self.TOPICS.items():
print(f" [{topic}] {desc}")
def show_architecture(self):
print("\n=== Docker Compose ===")
print(self.ARCHITECTURE[:400])
kafka = KafkaSetup()
kafka.show_topics()
kafka.show_architecture()
Monitoring & Dashboards
# monitoring.py — Event-driven monitoring
import json
class EventMonitoring:
METRICS = {
"throughput": "Events per second by source (ztna, casb, swg, fwaas)",
"latency": "Event processing latency (P50, P95, P99)",
"consumer_lag": "Kafka consumer lag — events waiting to be processed",
"alert_rate": "Alerts generated per minute by severity",
"false_positive": "False positive rate for threat detection",
}
DASHBOARDS = {
"real_time": {
"name": "Real-time Security Events",
"panels": ["Event stream (live)", "Events by source", "Top blocked IPs", "Active alerts"],
},
"analytics": {
"name": "Security Analytics",
"panels": ["Threat trends (7 days)", "Top attack vectors", "User risk scores", "Geo-distribution"],
},
"operations": {
"name": "Event Pipeline Operations",
"panels": ["Kafka throughput", "Consumer lag", "Processing errors", "Pipeline health"],
},
}
def show_metrics(self):
print("=== Key Metrics ===\n")
for name, desc in self.METRICS.items():
print(f" [{name}] {desc}")
def show_dashboards(self):
print(f"\n=== Dashboards ===")
for key, dash in self.DASHBOARDS.items():
print(f"\n[{dash['name']}]")
for panel in dash['panels'][:3]:
print(f" • {panel}")
monitoring = EventMonitoring()
monitoring.show_metrics()
monitoring.show_dashboards()
FAQ - คำถามที่พบบ่อย
Q: Event-Driven Architecture ดีกว่า REST API อย่างไร?
A: EDA: asynchronous, loosely coupled, scalable, real-time — เหมาะกับ security events ที่ volume สูง REST: synchronous, simple, request-response — เหมาะกับ CRUD operations เลือก EDA เมื่อ: high volume events, real-time processing, multiple consumers เลือก REST เมื่อ: simple queries, low volume, request-response pattern ใช้ร่วมกัน: EDA สำหรับ event processing + REST สำหรับ management APIs
Q: Kafka กับ RabbitMQ อันไหนดีกว่าสำหรับ security events?
A: Kafka: high throughput (millions events/sec), durable storage, replay ได้, event sourcing RabbitMQ: lower latency, flexible routing, simpler setup, traditional messaging เลือก Kafka: security events volume สูง, ต้อง retain events, multiple consumers, analytics เลือก RabbitMQ: volume ต่ำ-กลาง, complex routing, task queues สำหรับ SASE: Kafka เหมาะกว่า — volume สูง + ต้อง audit trail + replay
Q: Event-Driven Security ต่างจาก SIEM อย่างไร?
A: SIEM: collect + store + analyze logs — batch processing, retrospective analysis EDA Security: real-time event processing — detect + respond ทันที ใช้ร่วมกัน: EDA process events real-time → ส่ง enriched alerts ไป SIEM สำหรับ storage + compliance SIEM เก่า: log-based, batch, slow; Modern SIEM (Elastic, Splunk): รองรับ streaming ดีขึ้น EDA เพิ่ม: real-time response (block, alert), event correlation, automated workflows
Q: ต้อง handle events กี่ events ต่อวินาที?
A: ขึ้นกับขนาดองค์กร: Small (100 users): 100-1,000 events/sec Medium (1,000 users): 1,000-10,000 events/sec Large (10,000+ users): 10,000-100,000+ events/sec Kafka รองรับ: millions events/sec (partitioned, replicated) สำคัญ: ออกแบบ partitioning strategy ดี — partition by source, user_id, หรือ event_type
