
SSE Security Batch Processing Pipeline

2025-08-02 · อ. บอม — SiamCafe.net · 1,728 words

What Is an SSE Security Batch Processing Pipeline?

Server-Sent Events (SSE) is a web technology for pushing data from server to client one-way, in real time, over a single HTTP connection. SSE Security means securing those SSE connections, covering authentication, authorization, data validation, and rate limiting. A Batch Processing Pipeline is a system that processes security events in batches. Combining these three concepts gives you a security-monitoring system that receives events in real time over SSE and then processes them in batches for threat detection, log analysis, and compliance reporting.
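Before diving into the server code below, it helps to see the SSE wire format itself. This is a minimal illustrative parser (not part of the pipeline) that splits a raw text/event-stream payload into (event, data) pairs, skipping comment lines such as heartbeats:

```python
# sse_parse.py — minimal illustration of the SSE wire format (not the pipeline code)

def parse_sse_stream(raw: str):
    """Split a raw text/event-stream payload into (event_type, data) tuples."""
    events = []
    for frame in raw.split("\n\n"):          # frames are separated by a blank line
        event_type, data_lines = "message", []
        for line in frame.split("\n"):
            if line.startswith("event:"):
                event_type = line[len("event:"):].strip()
            elif line.startswith("data:"):
                data_lines.append(line[len("data:"):].strip())
            # lines starting with ":" are comments (e.g. heartbeats) and are ignored
        if data_lines:
            events.append((event_type, "\n".join(data_lines)))
    return events

raw = 'event: security_event\ndata: {"type": "login_attempt"}\n\n: heartbeat 1722556800\n\n'
parsed = parse_sse_stream(raw)
# parsed == [("security_event", '{"type": "login_attempt"}')]
```

Note how the heartbeat comment frame produces no event: browsers treat `:`-prefixed lines the same way, which is why servers can use them as keep-alives.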

SSE Fundamentals & Security

# sse_basics.py — SSE fundamentals with security
import json

class SSEFundamentals:
    SSE_OVERVIEW = {
        "protocol": "HTTP-based, text/event-stream content type",
        "direction": "Server → Client (one-way)",
        "reconnect": "Auto-reconnect built-in (browser handles)",
        "format": "data: {json}\\n\\n (double newline separator)",
        "vs_websocket": "SSE: simpler, HTTP-native, one-way | WebSocket: bidirectional, complex",
    }

    SECURITY_CONCERNS = {
        "authentication": {
            "name": "Authentication",
            "risk": "Unauthorized access to SSE stream",
            "mitigation": "JWT/Bearer token in URL param or EventSource polyfill with headers",
        },
        "authorization": {
            "name": "Authorization",
            "risk": "User sees events they shouldn't (cross-tenant leak)",
            "mitigation": "Server-side filtering — only send events user is authorized to see",
        },
        "injection": {
            "name": "Event Injection",
            "risk": "Malicious data injected into SSE stream",
            "mitigation": "Server-side input validation, output encoding, CSP headers",
        },
        "dos": {
            "name": "DoS via Connection Exhaustion",
            "risk": "Too many SSE connections exhaust server resources",
            "mitigation": "Connection limits per user, rate limiting, connection pooling",
        },
        "data_exposure": {
            "name": "Sensitive Data Exposure",
            "risk": "PII or secrets leaked through SSE events",
            "mitigation": "Data classification, field-level encryption, masking",
        },
    }

    def show_overview(self):
        print("=== SSE Overview ===\n")
        for key, value in self.SSE_OVERVIEW.items():
            print(f"  [{key}] {value}")

    def show_security(self):
        print(f"\n=== Security Concerns ===")
        for key, sec in self.SECURITY_CONCERNS.items():
            print(f"\n[{sec['name']}]")
            print(f"  Risk: {sec['risk']}")
            print(f"  Fix: {sec['mitigation']}")

sse = SSEFundamentals()
sse.show_overview()
sse.show_security()

Secure SSE Server Implementation

# sse_server.py — Secure SSE server with FastAPI
from fastapi import FastAPI, Request, Depends, HTTPException
from fastapi.responses import StreamingResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import asyncio
import json
import jwt
import time

app = FastAPI(title="Secure SSE Security Events")
security = HTTPBearer()

SECRET_KEY = "your-secret-key"  # demo only; load from env or a secret manager in production
MAX_CONNECTIONS_PER_USER = 5
active_connections = {}

async def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)):
    try:
        payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=["HS256"])
        return {
            "user_id": payload["sub"],
            "tenant_id": payload["tenant_id"],
            "roles": payload.get("roles", []),
        }
    except jwt.InvalidTokenError:
        raise HTTPException(401, "Invalid token")

def check_connection_limit(user_id: str):
    count = active_connections.get(user_id, 0)
    if count >= MAX_CONNECTIONS_PER_USER:
        raise HTTPException(429, f"Max {MAX_CONNECTIONS_PER_USER} SSE connections per user")

async def security_event_generator(user: dict):
    user_id = user["user_id"]
    tenant_id = user["tenant_id"]
    active_connections[user_id] = active_connections.get(user_id, 0) + 1
    
    try:
        while True:
            # Fetch events for this tenant only (authorization)
            events = await fetch_tenant_events(tenant_id)
            
            for event in events:
                # Filter by user's roles
                if event.get("min_role") and event["min_role"] not in user["roles"]:
                    continue
                
                # Mask sensitive fields
                sanitized = mask_sensitive_data(event)
                
                data = json.dumps(sanitized, ensure_ascii=False)
                yield f"event: security_event\ndata: {data}\n\n"
            
            # Comment-line heartbeat each poll cycle keeps proxies from closing the idle stream
            yield f": heartbeat {int(time.time())}\n\n"
            await asyncio.sleep(5)  # poll interval: 5 seconds
    
    finally:
        active_connections[user_id] = max(0, active_connections.get(user_id, 1) - 1)

def mask_sensitive_data(event):
    masked = event.copy()
    sensitive_fields = ["source_ip", "user_email", "api_key"]
    for field in sensitive_fields:
        if field in masked:
            value = str(masked[field])
            if "@" in value:
                parts = value.split("@")
                masked[field] = f"{parts[0][:3]}***@{parts[1]}"
            elif "." in value and len(value.split(".")) == 4:
                parts = value.split(".")
                masked[field] = f"{parts[0]}.{parts[1]}.xxx.xxx"
            else:
                masked[field] = value[:4] + "***"
    return masked

async def fetch_tenant_events(tenant_id):
    # Simulated — replace with actual event store query
    return [
        {"type": "login_attempt", "status": "failed", "source_ip": "10.0.1.50", "timestamp": time.time()},
        {"type": "api_access", "endpoint": "/admin", "status": "blocked", "timestamp": time.time()},
    ]

@app.get("/events/security")
async def security_stream(user: dict = Depends(verify_token)):
    check_connection_limit(user["user_id"])
    return StreamingResponse(
        security_event_generator(user),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
            "Connection": "keep-alive",
        },
    )
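The server above never sets an `id:` field, so a reconnecting client replays from scratch. SSE has built-in resume support: the server tags each frame with `id:`, the browser stores the last one seen, and sends it back in the `Last-Event-ID` header on reconnect. A sketch of that mechanic with hypothetical helper names:

```python
# sse_resume.py — sketch of id-based resume on SSE reconnect (hypothetical helpers)

def format_event(event_id: int, data: str) -> str:
    """Serialize one SSE frame carrying an id the client can resume from."""
    return f"id: {event_id}\nevent: security_event\ndata: {data}\n\n"

def events_since(last_event_id: int, log: list[tuple[int, str]]) -> list[str]:
    """Replay only events newer than the client's Last-Event-ID header value."""
    return [format_event(i, d) for i, d in log if i > last_event_id]

log = [(1, "a"), (2, "b"), (3, "c")]
# A client reconnecting with "Last-Event-ID: 2" receives only event 3:
frames = events_since(2, log)
# frames == ["id: 3\nevent: security_event\ndata: c\n\n"]
```

In the real server this would mean keeping a short replay buffer (e.g. in Redis) keyed by tenant, so a dropped connection does not lose security events.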

Batch Processing Pipeline

# batch_pipeline.py — Security event batch processing
import json
import time
from collections import defaultdict

class SecurityBatchPipeline:
    CODE = """
# batch_processor.py — Batch process security events
import json
import time
from collections import defaultdict
from datetime import datetime, timedelta

class SecurityEventProcessor:
    def __init__(self, batch_size=1000, flush_interval=60):
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.buffer = []
        self.last_flush = time.time()
        self.stats = defaultdict(int)
    
    def ingest(self, event):
        '''Add event to buffer'''
        enriched = self.enrich(event)
        self.buffer.append(enriched)
        self.stats[event.get("type", "unknown")] += 1
        
        if len(self.buffer) >= self.batch_size:
            return self.flush()
        if time.time() - self.last_flush > self.flush_interval:
            return self.flush()
        return None
    
    def enrich(self, event):
        '''Enrich event with additional context'''
        event["processed_at"] = datetime.utcnow().isoformat()
        event["severity"] = self.classify_severity(event)
        event["geo"] = self.lookup_geo(event.get("source_ip", ""))
        return event
    
    def classify_severity(self, event):
        '''Classify event severity'''
        rules = {
            "brute_force": "critical",
            "login_failed": "warning",
            "api_blocked": "high",
            "config_change": "medium",
            "login_success": "info",
        }
        return rules.get(event.get("type"), "info")
    
    def flush(self):
        '''Process and store batch'''
        if not self.buffer:
            return None
        
        batch = self.buffer.copy()
        self.buffer.clear()
        self.last_flush = time.time()
        
        # Aggregate
        aggregated = self.aggregate(batch)
        
        # Detect threats
        threats = self.detect_threats(batch)
        
        # Store
        self.store_batch(batch, aggregated, threats)
        
        return {
            "batch_size": len(batch),
            "threats": len(threats),
            "aggregated": aggregated,
        }
    
    def aggregate(self, batch):
        '''Aggregate batch statistics'''
        stats = {
            "total_events": len(batch),
            "by_type": defaultdict(int),
            "by_severity": defaultdict(int),
            "unique_ips": set(),
        }
        for event in batch:
            stats["by_type"][event.get("type", "unknown")] += 1
            stats["by_severity"][event.get("severity", "info")] += 1
            if "source_ip" in event:
                stats["unique_ips"].add(event["source_ip"])
        
        stats["unique_ips"] = len(stats["unique_ips"])
        return stats
    
    def detect_threats(self, batch):
        '''Simple threat detection rules'''
        threats = []
        ip_failures = defaultdict(int)
        
        for event in batch:
            if event.get("type") == "login_failed":
                ip_failures[event.get("source_ip", "")] += 1
        
        # Brute force detection: > 10 failures from same IP
        for ip, count in ip_failures.items():
            if count > 10:
                threats.append({
                    "type": "brute_force",
                    "source_ip": ip,
                    "attempts": count,
                    "severity": "critical",
                })
        
        return threats

processor = SecurityEventProcessor(batch_size=500, flush_interval=30)
"""

    def show_code(self):
        print("=== Batch Processor ===")
        print(self.CODE[:600])

    def pipeline_stats(self):
        import random
        print(f"\n=== Pipeline Stats (Last Hour) ===")
        print(f"  Events ingested: {random.randint(10000, 100000):,}")
        print(f"  Batches processed: {random.randint(20, 200)}")
        print(f"  Threats detected: {random.randint(0, 15)}")
        print(f"  Avg batch size: {random.randint(200, 1000)}")
        print(f"  Processing latency: {random.uniform(0.5, 5.0):.1f}s")

pipeline = SecurityBatchPipeline()
pipeline.show_code()
pipeline.pipeline_stats()
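Since `show_code()` only prints the first 600 characters of the embedded processor, here is a condensed, runnable version of its core idea: the size-triggered flush. Numbers are shrunk for demonstration; the flush returns the same kind of aggregate summary:

```python
# mini_batch.py — condensed, runnable version of the size-triggered flush above
from collections import defaultdict

class MiniBatch:
    def __init__(self, batch_size=3):
        self.batch_size = batch_size
        self.buffer = []

    def ingest(self, event):
        """Buffer an event; flush automatically once the batch is full."""
        self.buffer.append(event)
        if len(self.buffer) >= self.batch_size:
            return self.flush()
        return None                      # batch not full yet

    def flush(self):
        """Drain the buffer and return an aggregate summary."""
        batch, self.buffer = self.buffer, []
        by_type = defaultdict(int)
        for e in batch:
            by_type[e["type"]] += 1
        return {"batch_size": len(batch), "by_type": dict(by_type)}

mb = MiniBatch(batch_size=3)
mb.ingest({"type": "login_failed"})      # returns None: 1 of 3
mb.ingest({"type": "login_failed"})      # returns None: 2 of 3
result = mb.ingest({"type": "api_blocked"})
# result == {"batch_size": 3, "by_type": {"login_failed": 2, "api_blocked": 1}}
```

The full processor adds the time-based trigger (`flush_interval`) on top of this, so sparse traffic still gets flushed on schedule.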

Threat Detection Rules

# threat_detection.py — Threat detection rules engine
import json
import random

class ThreatDetection:
    RULES = {
        "brute_force": {
            "name": "Brute Force Detection",
            "condition": "> 10 failed logins from same IP in 5 minutes",
            "severity": "critical",
            "action": "Block IP + Alert SOC team",
        },
        "impossible_travel": {
            "name": "Impossible Travel",
            "condition": "Login from 2 locations > 500km apart within 30 minutes",
            "severity": "high",
            "action": "Flag account + MFA challenge",
        },
        "privilege_escalation": {
            "name": "Privilege Escalation",
            "condition": "User gains admin role without approval workflow",
            "severity": "critical",
            "action": "Revert role change + Alert + Investigate",
        },
        "data_exfiltration": {
            "name": "Data Exfiltration",
            "condition": "Unusual download volume (> 3x normal) or bulk API calls",
            "severity": "high",
            "action": "Rate limit + Alert + Review access",
        },
        "off_hours_access": {
            "name": "Off-hours Access",
            "condition": "Admin access outside business hours (weekday 8-18, no weekends)",
            "severity": "medium",
            "action": "Log + Alert if from new device/location",
        },
    }

    def show_rules(self):
        print("=== Threat Detection Rules ===\n")
        for key, rule in self.RULES.items():
            print(f"[{rule['severity'].upper():>8}] {rule['name']}")
            print(f"  Condition: {rule['condition']}")
            print(f"  Action: {rule['action']}")
            print()

    def detection_dashboard(self):
        print("=== Detection Dashboard (24h) ===")
        for key, rule in self.RULES.items():
            count = random.randint(0, 20) if rule["severity"] in ("medium", "info") else random.randint(0, 5)
            print(f"  [{rule['severity']:>8}] {rule['name']:<30} Triggers: {count}")

td = ThreatDetection()
td.show_rules()
td.detection_dashboard()
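The brute-force rule in the table mirrors the `detect_threats` method embedded in the batch-processor string earlier; extracted as a standalone function, it runs like this (the 12-failure sample batch is illustrative):

```python
# brute_force_check.py — the brute-force rule from the table, runnable standalone
from collections import defaultdict

def detect_brute_force(batch, threshold=10):
    """Flag any IP with more than `threshold` failed logins in one batch."""
    failures = defaultdict(int)
    for event in batch:
        if event.get("type") == "login_failed":
            failures[event.get("source_ip", "")] += 1
    return [
        {"type": "brute_force", "source_ip": ip, "attempts": n, "severity": "critical"}
        for ip, n in failures.items() if n > threshold
    ]

batch = [{"type": "login_failed", "source_ip": "10.0.1.50"}] * 12
threats = detect_brute_force(batch)
# threats == [{"type": "brute_force", "source_ip": "10.0.1.50",
#              "attempts": 12, "severity": "critical"}]
```

One caveat: counting per batch approximates the "in 5 minutes" window only if the flush interval is comparable; a production rule would count over a sliding time window instead.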

Infrastructure & Deployment

# infra.py — Infrastructure for SSE security pipeline
import json

class Infrastructure:
    DOCKER_COMPOSE = """
# docker-compose.yml — SSE Security Pipeline
version: '3.8'
services:
  sse-server:
    build: ./sse-server
    ports: ["8080:8080"]
    environment:
      JWT_SECRET: 
      REDIS_URL: redis://redis:6379
      KAFKA_BROKERS: kafka:9092
    depends_on: [redis, kafka]
    deploy:
      replicas: 3
      resources:
        limits:
          cpus: '1'
          memory: 512M

  batch-processor:
    build: ./batch-processor
    environment:
      KAFKA_BROKERS: kafka:9092
      ELASTICSEARCH_URL: http://elasticsearch:9200
      BATCH_SIZE: 1000
      FLUSH_INTERVAL: 30
    depends_on: [kafka, elasticsearch]
    deploy:
      replicas: 2

  redis:
    image: redis:7-alpine
    ports: ["6379:6379"]

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_NUM_PARTITIONS: 12

  elasticsearch:
    image: elasticsearch:8.11.0
    environment:
      discovery.type: single-node
      xpack.security.enabled: "false"
    ports: ["9200:9200"]

  kibana:
    image: kibana:8.11.0
    ports: ["5601:5601"]
    depends_on: [elasticsearch]
"""

    SCALING = {
        "sse_server": "Horizontal scaling behind load balancer (sticky sessions)",
        "batch_processor": "Scale by Kafka partition count — 1 consumer per partition",
        "storage": "Elasticsearch hot-warm architecture — hot: 7 days SSD, warm: 90 days HDD",
        "monitoring": "Prometheus + Grafana — SSE connections, batch throughput, detection latency",
    }

    def show_compose(self):
        print("=== Docker Compose ===")
        print(self.DOCKER_COMPOSE[:500])

    def show_scaling(self):
        print(f"\n=== Scaling Strategy ===")
        for comp, strategy in self.SCALING.items():
            print(f"  [{comp}] {strategy}")

infra = Infrastructure()
infra.show_compose()
infra.show_scaling()
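The "1 consumer per partition" rule in the scaling table can be made concrete: Kafka assigns each partition to exactly one consumer in a group, so replicas beyond the partition count sit idle. A small sketch of round-robin assignment (simplified; the real broker uses its own assignor strategies):

```python
# partition_math.py — illustrate the Kafka "consumers <= partitions" scaling rule

def assign_partitions(partitions: int, consumers: int) -> dict[int, list[int]]:
    """Round-robin partition assignment; consumers beyond the
    partition count would receive nothing and sit idle."""
    active = min(consumers, partitions)
    assignment = {c: [] for c in range(active)}
    for p in range(partitions):
        assignment[p % active].append(p)
    return assignment

# 12 partitions (KAFKA_NUM_PARTITIONS above) across the 2 batch-processor replicas:
layout = assign_partitions(12, 2)
# layout == {0: [0, 2, 4, 6, 8, 10], 1: [1, 3, 5, 7, 9, 11]}
```

This is why the compose file pairs 12 partitions with 2 processor replicas: headroom to scale the processors up to 12 without repartitioning the topic.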

FAQ - Frequently Asked Questions

Q: SSE or WebSocket: which is better for security events?

A: SSE is the better fit for security events: one-way (server → client) delivery is all event streaming needs, and SSE is simpler, HTTP-native, auto-reconnects, and passes through proxies and firewalls easily. Use WebSocket when you need bidirectional traffic (e.g., chat, interactive dashboards). Best of both: SSE delivers events, and a REST API carries commands back.

Q: What batch size should I use?

A: It depends on your requirements. Low latency (real-time alerts): batch size 100-500, flush interval 10-30 s. Balanced: batch size 500-2,000, flush interval 30-60 s. High throughput: batch size 5,000-10,000, flush interval 60-300 s. Rule of thumb: detection latency must stay under the SLA; if the SLA is 1 minute, flush every 30 seconds or less.
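The sizing rule above can be sanity-checked with a small calculation. An event waits until the buffer fills or the interval elapses (whichever comes first), then pays the batch processing time; the event rate and processing time here are illustrative assumptions:

```python
# latency_check.py — worst-case detection latency for a size/interval pair
# (event rate and processing time are illustrative assumptions)

def worst_case_latency(batch_size, flush_interval, events_per_sec, processing_sec):
    """An event waits until the buffer fills or the interval elapses,
    then pays the batch processing time on top."""
    fill_time = batch_size / events_per_sec        # time to fill the buffer at this rate
    return min(fill_time, flush_interval) + processing_sec

# 500-event batches at 100 events/s fill in 5 s, so the 30 s interval never triggers:
latency = worst_case_latency(500, 30, 100, 2.0)
# latency == 7.0, comfortably under a 60-second SLA
```

At low traffic the interval dominates instead: the same settings at 10 events/s give min(50, 30) + 2 = 32 s, which is why sparse streams need shorter intervals.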

Q: What SSE connection limit should I set?

A: Browser limit: 6 connections per domain on HTTP/1.1; effectively unlimited on HTTP/2. Server side it depends on RAM: one SSE connection costs roughly 50-200 KB. In production, cap connections per user (3-10) and use connection pooling. HTTP/2 is recommended: multiplexing reduces connection overhead.

Q: How long should security events be retained?

A: Hot storage (fast queries): 7-30 days (SSD/Elasticsearch). Warm storage (slower): 90-365 days (HDD/S3 + Athena). Cold storage (archive): 1-7 years (S3 Glacier). Retention depends on compliance: PDPA sets no explicit period, PCI-DSS requires 1 year, SOX requires 7 years. Use automated lifecycle policies: hot → warm → cold → delete.
