SSE Security Batch Processing Pipeline คืออะไร
Server-Sent Events (SSE) เป็น web technology สำหรับส่งข้อมูลจาก server ไป client แบบ one-way real-time ผ่าน HTTP connection เดียว SSE Security หมายถึงการรักษาความปลอดภัยของ SSE connections รวมถึง authentication, authorization, data validation และ rate limiting Batch Processing Pipeline คือระบบประมวลผลข้อมูลเป็น batch สำหรับ security events การรวมสามแนวคิดนี้ช่วยสร้างระบบ security monitoring ที่รับ events แบบ real-time ผ่าน SSE แล้วประมวลผลเป็น batch สำหรับ threat detection, log analysis และ compliance reporting
SSE Fundamentals & Security
# sse_basics.py — SSE fundamentals with security
import json
class SSEFundamentals:
    """Reference tables for SSE basics and its security concerns, plus
    console printers for each table.

    Purely informational: holds two class-level dicts and prints them.
    """

    # Protocol cheat-sheet printed by show_overview().
    SSE_OVERVIEW = {
        "protocol": "HTTP-based, text/event-stream content type",
        "direction": "Server → Client (one-way)",
        "reconnect": "Auto-reconnect built-in (browser handles)",
        "format": "data: {json}\\n\\n (double newline separator)",
        "vs_websocket": "SSE: simpler, HTTP-native, one-way | WebSocket: bidirectional, complex",
    }

    # Threat catalog printed by show_security(): display name, the risk,
    # and the recommended mitigation for each concern.
    SECURITY_CONCERNS = {
        "authentication": {
            "name": "Authentication",
            "risk": "Unauthorized access to SSE stream",
            "mitigation": "JWT/Bearer token in URL param or EventSource polyfill with headers",
        },
        "authorization": {
            "name": "Authorization",
            "risk": "User sees events they shouldn't (cross-tenant leak)",
            "mitigation": "Server-side filtering — only send events user is authorized to see",
        },
        "injection": {
            "name": "Event Injection",
            "risk": "Malicious data injected into SSE stream",
            "mitigation": "Server-side input validation, output encoding, CSP headers",
        },
        "dos": {
            "name": "DoS via Connection Exhaustion",
            "risk": "Too many SSE connections exhaust server resources",
            "mitigation": "Connection limits per user, rate limiting, connection pooling",
        },
        "data_exposure": {
            "name": "Sensitive Data Exposure",
            "risk": "PII or secrets leaked through SSE events",
            "mitigation": "Data classification, field-level encryption, masking",
        },
    }

    def show_overview(self):
        """Print each SSE_OVERVIEW entry as '[key] value'."""
        print("=== SSE Overview ===\n")
        for key, value in self.SSE_OVERVIEW.items():
            print(f" [{key}] {value}")

    def show_security(self):
        """Print name, risk, and mitigation for every security concern."""
        # Plain string — was an f-string with no placeholders (ruff F541).
        print("\n=== Security Concerns ===")
        # Keys are unused, so iterate values only (ruff PERF102).
        for concern in self.SECURITY_CONCERNS.values():
            print(f"\n[{concern['name']}]")
            print(f" Risk: {concern['risk']}")
            print(f" Fix: {concern['mitigation']}")
# Demo entry point: print the overview and the security-concern catalog.
sse = SSEFundamentals()
sse.show_overview()
sse.show_security()
Secure SSE Server Implementation
# sse_server.py — Secure SSE server with FastAPI
from fastapi import FastAPI, Request, Depends, HTTPException
from fastapi.responses import StreamingResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import asyncio
import json
import jwt
import time
import os

app = FastAPI(title="Secure SSE Security Events")
security = HTTPBearer()
# SECURITY: never ship a hardcoded JWT secret. Read it from the environment,
# keeping the original placeholder only as a local-development fallback.
SECRET_KEY = os.environ.get("JWT_SECRET", "your-secret-key")
# Per-user cap on simultaneous SSE streams (DoS mitigation).
MAX_CONNECTIONS_PER_USER = 5
# user_id -> number of currently open SSE streams.
# NOTE(review): plain dict, not thread/process-safe — fine for a single
# asyncio worker; use Redis for multi-worker deployments.
active_connections = {}
async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Decode and validate the bearer JWT; return the caller's identity.

    Returns a dict with user_id, tenant_id and roles (roles defaults to []).
    Raises HTTPException(401) for an invalid/expired token — and also when a
    syntactically valid token lacks the required `sub`/`tenant_id` claims,
    which previously escaped as a KeyError and surfaced as a 500.
    """
    try:
        payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=["HS256"])
        return {
            "user_id": payload["sub"],
            "tenant_id": payload["tenant_id"],
            "roles": payload.get("roles", []),
        }
    except jwt.InvalidTokenError:
        raise HTTPException(401, "Invalid token")
    except KeyError:
        # Signature was valid but a mandatory claim is missing — treat as
        # unauthorized rather than letting the KeyError become a 500.
        raise HTTPException(401, "Token missing required claims")
def check_connection_limit(user_id: str):
    """Reject with HTTP 429 when the user already holds the maximum
    number of concurrent SSE streams; otherwise return normally."""
    open_streams = active_connections.get(user_id, 0)
    if open_streams < MAX_CONNECTIONS_PER_USER:
        return
    raise HTTPException(429, f"Max {MAX_CONNECTIONS_PER_USER} SSE connections per user")
async def security_event_generator(user: dict):
    """Yield SSE-framed security events for one authenticated user.

    Infinite generator: polls the tenant's event store every 5 seconds,
    filters by role, masks sensitive fields, and frames each event per the
    text/event-stream wire format. Tracks the open stream in
    `active_connections` and always releases the slot on disconnect.
    """
    user_id = user["user_id"]
    tenant_id = user["tenant_id"]
    active_connections[user_id] = active_connections.get(user_id, 0) + 1
    try:
        while True:
            # Authorization: fetch events scoped to the caller's tenant only.
            events = await fetch_tenant_events(tenant_id)
            for event in events:
                # Role-based filtering: skip events the user may not see.
                if event.get("min_role") and event["min_role"] not in user["roles"]:
                    continue
                # Mask PII/secrets before data leaves the server.
                sanitized = mask_sensitive_data(event)
                data = json.dumps(sanitized, ensure_ascii=False)
                yield f"event: security_event\ndata: {data}\n\n"
            # Heartbeat comment once per poll cycle (~5s) so proxies don't
            # time out the idle connection. (The old comment claimed 30s,
            # which did not match the 5-second loop below.)
            yield f": heartbeat {int(time.time())}\n\n"
            await asyncio.sleep(5)
    finally:
        # Release this connection slot. Drop the key entirely at zero so the
        # dict doesn't accumulate stale zero-count entries forever.
        remaining = active_connections.get(user_id, 1) - 1
        if remaining > 0:
            active_connections[user_id] = remaining
        else:
            active_connections.pop(user_id, None)
def mask_sensitive_data(event):
    """Return a shallow copy of *event* with known sensitive fields redacted.

    Redaction heuristics per value (stringified first):
    - contains '@'      -> email-style mask: first 3 chars of local part
    - four dotted parts -> IPv4-style mask: keep first two octets
    - anything else     -> keep first 4 chars, append '***'
    The input dict is never mutated.
    """

    def _redact(raw):
        text = str(raw)
        if "@" in text:
            segments = text.split("@")
            return f"{segments[0][:3]}***@{segments[1]}"
        if "." in text and len(text.split(".")) == 4:
            octets = text.split(".")
            return f"{octets[0]}.{octets[1]}.xxx.xxx"
        return text[:4] + "***"

    masked = event.copy()
    for field in ["source_ip", "user_email", "api_key"]:
        if field in masked:
            masked[field] = _redact(masked[field])
    return masked
async def fetch_tenant_events(tenant_id):
    """Simulated per-tenant event-store query.

    Placeholder returning two canned events stamped with the current time —
    swap in a real (tenant-scoped) query for production use.
    """
    sample_events = [
        {"type": "login_attempt", "status": "failed", "source_ip": "10.0.1.50", "timestamp": time.time()},
        {"type": "api_access", "endpoint": "/admin", "status": "blocked", "timestamp": time.time()},
    ]
    return sample_events
@app.get("/events/security")
async def security_stream(user: dict = Depends(verify_token)):
    """SSE endpoint: stream tenant-scoped security events to the caller.

    Authentication happens in the verify_token dependency; the per-user
    connection cap is enforced before the stream starts.
    """
    check_connection_limit(user["user_id"])
    sse_headers = {
        "Cache-Control": "no-cache",   # SSE responses must never be cached
        "X-Accel-Buffering": "no",     # disable nginx response buffering
        "Connection": "keep-alive",
    }
    return StreamingResponse(
        security_event_generator(user),
        media_type="text/event-stream",
        headers=sse_headers,
    )
Batch Processing Pipeline
# batch_pipeline.py — Security event batch processing
import json
import time
from collections import defaultdict
class SecurityBatchPipeline:
    """Presentation wrapper for the batch-processing stage.

    Holds a display-only source listing (CODE) of a batch processor and
    prints it, plus simulated pipeline statistics. Nothing in CODE is
    executed by this module.
    """

    # Reference listing printed (truncated to 600 chars) by show_code().
    # NOTE(review): the embedded code uses `time` and `defaultdict` without
    # importing them — it would rely on this module's imports if executed.
    CODE = """
# batch_processor.py — Batch process security events
import json
from datetime import datetime, timedelta
class SecurityEventProcessor:
    def __init__(self, batch_size=1000, flush_interval=60):
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.buffer = []
        self.last_flush = time.time()
        self.stats = defaultdict(int)
    def ingest(self, event):
        '''Add event to buffer'''
        enriched = self.enrich(event)
        self.buffer.append(enriched)
        self.stats[event.get("type", "unknown")] += 1
        if len(self.buffer) >= self.batch_size:
            return self.flush()
        if time.time() - self.last_flush > self.flush_interval:
            return self.flush()
        return None
    def enrich(self, event):
        '''Enrich event with additional context'''
        event["processed_at"] = datetime.utcnow().isoformat()
        event["severity"] = self.classify_severity(event)
        event["geo"] = self.lookup_geo(event.get("source_ip", ""))
        return event
    def classify_severity(self, event):
        '''Classify event severity'''
        rules = {
            "brute_force": "critical",
            "login_failed": "warning",
            "api_blocked": "high",
            "config_change": "medium",
            "login_success": "info",
        }
        return rules.get(event.get("type"), "info")
    def flush(self):
        '''Process and store batch'''
        if not self.buffer:
            return None
        batch = self.buffer.copy()
        self.buffer.clear()
        self.last_flush = time.time()
        # Aggregate
        aggregated = self.aggregate(batch)
        # Detect threats
        threats = self.detect_threats(batch)
        # Store
        self.store_batch(batch, aggregated, threats)
        return {
            "batch_size": len(batch),
            "threats": len(threats),
            "aggregated": aggregated,
        }
    def aggregate(self, batch):
        '''Aggregate batch statistics'''
        stats = {
            "total_events": len(batch),
            "by_type": defaultdict(int),
            "by_severity": defaultdict(int),
            "unique_ips": set(),
        }
        for event in batch:
            stats["by_type"][event.get("type", "unknown")] += 1
            stats["by_severity"][event.get("severity", "info")] += 1
            if "source_ip" in event:
                stats["unique_ips"].add(event["source_ip"])
        stats["unique_ips"] = len(stats["unique_ips"])
        return stats
    def detect_threats(self, batch):
        '''Simple threat detection rules'''
        threats = []
        ip_failures = defaultdict(int)
        for event in batch:
            if event.get("type") == "login_failed":
                ip_failures[event.get("source_ip", "")] += 1
        # Brute force detection: > 10 failures from same IP
        for ip, count in ip_failures.items():
            if count > 10:
                threats.append({
                    "type": "brute_force",
                    "source_ip": ip,
                    "attempts": count,
                    "severity": "critical",
                })
        return threats
processor = SecurityEventProcessor(batch_size=500, flush_interval=30)
"""

    def show_code(self):
        """Print the first 600 characters of the reference listing."""
        print("=== Batch Processor ===")
        print(self.CODE[:600])

    def pipeline_stats(self):
        """Print simulated last-hour pipeline metrics (random demo values)."""
        import random
        print(f"\n=== Pipeline Stats (Last Hour) ===")
        print(f" Events ingested: {random.randint(10000, 100000):,}")
        print(f" Batches processed: {random.randint(20, 200)}")
        print(f" Threats detected: {random.randint(0, 15)}")
        print(f" Avg batch size: {random.randint(200, 1000)}")
        print(f" Processing latency: {random.uniform(0.5, 5.0):.1f}s")
# Demo entry point: print the processor listing and simulated stats.
pipeline = SecurityBatchPipeline()
pipeline.show_code()
pipeline.pipeline_stats()
Threat Detection Rules
# threat_detection.py — Threat detection rules engine
import json
import random
class ThreatDetection:
    """Catalog of threat-detection rules with two console views:
    the full rule listing and a simulated 24-hour trigger dashboard."""

    # rule key -> display name, trigger condition, severity, response action.
    RULES = {
        "brute_force": {
            "name": "Brute Force Detection",
            "condition": "> 10 failed logins from same IP in 5 minutes",
            "severity": "critical",
            "action": "Block IP + Alert SOC team",
        },
        "impossible_travel": {
            "name": "Impossible Travel",
            "condition": "Login from 2 locations > 500km apart within 30 minutes",
            "severity": "high",
            "action": "Flag account + MFA challenge",
        },
        "privilege_escalation": {
            "name": "Privilege Escalation",
            "condition": "User gains admin role without approval workflow",
            "severity": "critical",
            "action": "Revert role change + Alert + Investigate",
        },
        "data_exfiltration": {
            "name": "Data Exfiltration",
            "condition": "Unusual download volume (> 3x normal) or bulk API calls",
            "severity": "high",
            "action": "Rate limit + Alert + Review access",
        },
        "off_hours_access": {
            "name": "Off-hours Access",
            "condition": "Admin access outside business hours (weekday 8-18, no weekends)",
            "severity": "medium",
            "action": "Log + Alert if from new device/location",
        },
    }

    def show_rules(self):
        """Print every rule: severity-tagged name, condition, and action."""
        print("=== Threat Detection Rules ===\n")
        for rule in self.RULES.values():
            severity_tag = rule["severity"].upper()
            print(f"[{severity_tag:>8}] {rule['name']}")
            print(f" Condition: {rule['condition']}")
            print(f" Action: {rule['action']}")
            print()

    def detection_dashboard(self):
        """Print simulated 24h trigger counts per rule (random demo data)."""
        print("=== Detection Dashboard (24h) ===")
        for rule in self.RULES.values():
            # Lower-severity rules fire more often in the simulation.
            if rule["severity"] in ("medium", "info"):
                count = random.randint(0, 20)
            else:
                count = random.randint(0, 5)
            print(f" [{rule['severity']:>8}] {rule['name']:<30} Triggers: {count}")
# Demo entry point: print the rule catalog and the simulated dashboard.
td = ThreatDetection()
td.show_rules()
td.detection_dashboard()
Infrastructure & Deployment
# infra.py — Infrastructure for SSE security pipeline
import json
class Infrastructure:
    """Deployment reference: a docker-compose listing and scaling notes,
    with console printers for each. Display-only — nothing is deployed."""

    # Display-only compose file, printed (truncated to 500 chars) by
    # show_compose(). YAML nesting reconstructed to standard compose layout.
    DOCKER_COMPOSE = """
# docker-compose.yml — SSE Security Pipeline
version: '3.8'
services:
  sse-server:
    build: ./sse-server
    ports: ["8080:8080"]
    environment:
      JWT_SECRET:
      REDIS_URL: redis://redis:6379
      KAFKA_BROKERS: kafka:9092
    depends_on: [redis, kafka]
    deploy:
      replicas: 3
      resources:
        limits:
          cpus: '1'
          memory: 512M
  batch-processor:
    build: ./batch-processor
    environment:
      KAFKA_BROKERS: kafka:9092
      ELASTICSEARCH_URL: http://elasticsearch:9200
      BATCH_SIZE: 1000
      FLUSH_INTERVAL: 30
    depends_on: [kafka, elasticsearch]
    deploy:
      replicas: 2
  redis:
    image: redis:7-alpine
    ports: ["6379:6379"]
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_NUM_PARTITIONS: 12
  elasticsearch:
    image: elasticsearch:8.11.0
    environment:
      discovery.type: single-node
      xpack.security.enabled: "false"
    ports: ["9200:9200"]
  kibana:
    image: kibana:8.11.0
    ports: ["5601:5601"]
    depends_on: [elasticsearch]
"""

    # component -> one-line scaling strategy, printed by show_scaling().
    SCALING = {
        "sse_server": "Horizontal scaling behind load balancer (sticky sessions)",
        "batch_processor": "Scale by Kafka partition count — 1 consumer per partition",
        "storage": "Elasticsearch hot-warm architecture — hot: 7 days SSD, warm: 90 days HDD",
        "monitoring": "Prometheus + Grafana — SSE connections, batch throughput, detection latency",
    }

    def show_compose(self):
        """Print the first 500 characters of the compose listing."""
        print("=== Docker Compose ===")
        print(self.DOCKER_COMPOSE[:500])

    def show_scaling(self):
        """Print the per-component scaling strategy table."""
        print(f"\n=== Scaling Strategy ===")
        for comp, strategy in self.SCALING.items():
            print(f" [{comp}] {strategy}")
# Demo entry point: print the compose listing and scaling notes.
infra = Infrastructure()
infra.show_compose()
infra.show_scaling()
FAQ - คำถามที่พบบ่อย
Q: SSE กับ WebSocket อันไหนดีสำหรับ security events?
A: SSE ดีกว่าสำหรับ security events เพราะ: One-way (server → client) = เพียงพอสำหรับ event streaming, ง่ายกว่า, HTTP-native, auto-reconnect, ผ่าน proxy/firewall ง่าย WebSocket: ใช้เมื่อต้องการ bidirectional (เช่น chat, interactive dashboard) SSE + REST API: ดีที่สุด — SSE รับ events, REST ส่ง commands กลับ
Q: Batch size เท่าไหร่ดี?
A: ขึ้นกับ requirements: Low latency (real-time alerts): batch size 100-500, flush interval 10-30s Balanced: batch size 500-2000, flush interval 30-60s High throughput: batch size 5000-10000, flush interval 60-300s กฎ: detection latency ต้อง < SLA — ถ้า SLA 1 minute → flush ≤ 30 seconds
Q: SSE connection limit เท่าไหร่?
A: Browser limit: 6 connections per domain (HTTP/1.1) ส่วน HTTP/2 ไม่ได้ "unlimited" — ถูกจำกัดด้วย concurrent streams ที่ server ประกาศ (ค่า default ทั่วไป ~100 streams ต่อ connection) Server: ขึ้นกับ RAM — 1 SSE connection ≈ 50-200KB Production: ตั้ง limit per user (3-10 connections), use connection pooling HTTP/2 แนะนำ: multiplexing ลด connection overhead
Q: เก็บ security events นานเท่าไหร่?
A: Hot storage (fast query): 7-30 วัน (SSD/Elasticsearch) Warm storage (slower): 90-365 วัน (HDD/S3 + Athena) Cold storage (archive): 1-7 ปี (S3 Glacier) ขึ้นกับ compliance: PDPA ไม่กำหนดชัด, PCI-DSS 1 ปี, SOX 7 ปี ใช้ lifecycle policies อัตโนมัติ — hot → warm → cold → delete
