SASE Security และ Clean Architecture
SASE (Secure Access Service Edge) เป็น Framework ที่รวม Network และ Security Services เมื่อออกแบบด้วย Clean Architecture ได้ระบบที่ Maintainable, Testable และเปลี่ยน Provider ได้ง่าย แยก Business Logic ออกจาก Infrastructure ชัดเจน
Clean Architecture แบ่งระบบเป็น 4 Layers หลัก โดย Dependencies ชี้เข้าด้านในเสมอ (Dependency Rule) ทำให้ทดสอบ Security Policies ได้โดยไม่ต้องมี Infrastructure จริง
Clean Architecture Layers สำหรับ SASE
# === Clean Architecture สำหรับ SASE Security ===
# โครงสร้าง Project
# sase_security/
# ├── domain/ # Layer 1: Entities
# │ ├── entities/
# │ │ ├── user.py
# │ │ ├── device.py
# │ │ ├── policy.py
# │ │ └── threat.py
# │ └── value_objects/
# │ ├── ip_address.py
# │ └── risk_score.py
# │
# ├── use_cases/ # Layer 2: Use Cases
# │ ├── evaluate_access.py
# │ ├── detect_threat.py
# │ ├── enforce_policy.py
# │ └── interfaces/
# │ ├── policy_repository.py
# │ ├── threat_detector.py
# │ └── access_gateway.py
# │
# ├── adapters/ # Layer 3: Interface Adapters
# │ ├── controllers/
# │ │ ├── access_controller.py
# │ │ └── policy_controller.py
# │ ├── presenters/
# │ │ └── security_dashboard.py
# │ └── gateways/
# │ ├── cloudflare_gateway.py
# │ └── zscaler_gateway.py
# │
# └── infrastructure/ # Layer 4: Frameworks
# ├── api/
# │ └── fastapi_app.py
# ├── database/
# │ └── postgres_repository.py
# └── external/
# ├── cloudflare_client.py
# └── zscaler_client.py
# === Layer 1: Entities (Domain) ===
from dataclasses import dataclass, field
from typing import List, Optional
from enum import Enum
from datetime import datetime
class RiskLevel(Enum):
LOW = 1
MEDIUM = 2
HIGH = 3
CRITICAL = 4
class AccessDecision(Enum):
ALLOW = "allow"
DENY = "deny"
CHALLENGE = "challenge" # MFA Required
@dataclass
class User:
user_id: str
email: str
department: str
role: str
mfa_enabled: bool = False
risk_score: float = 0.0
@dataclass
class Device:
device_id: str
os: str
os_version: str
compliant: bool = True
managed: bool = True
last_seen: Optional[datetime] = None
@dataclass
class AccessRequest:
user: User
device: Device
source_ip: str
destination: str
timestamp: datetime
location: str = ""
protocol: str = "HTTPS"
@dataclass
class SecurityPolicy:
policy_id: str
name: str
conditions: dict = field(default_factory=dict)
action: AccessDecision = AccessDecision.DENY
priority: int = 100
enabled: bool = True
@dataclass
class ThreatEvent:
event_id: str
source_ip: str
event_type: str
risk_level: RiskLevel
description: str
timestamp: datetime
mitigated: bool = False
print("Layer 1: Entities defined")
print(" - User, Device, AccessRequest")
print(" - SecurityPolicy, ThreatEvent")
print(" - RiskLevel, AccessDecision")
Use Cases และ Repository Interfaces
# === Layer 2: Use Cases ===
from abc import ABC, abstractmethod
from typing import List, Optional
# --- Interfaces (Ports) ---
class PolicyRepository(ABC):
"""Interface สำหรับ Policy Storage"""
@abstractmethod
def get_policies(self, destination: str) -> List[SecurityPolicy]:
pass
@abstractmethod
def save_policy(self, policy: SecurityPolicy) -> bool:
pass
class ThreatDetector(ABC):
"""Interface สำหรับ Threat Detection"""
@abstractmethod
def analyze(self, request: AccessRequest) -> Optional[ThreatEvent]:
pass
class AccessGateway(ABC):
"""Interface สำหรับ Access Enforcement"""
@abstractmethod
def enforce(self, request: AccessRequest, decision: AccessDecision) -> bool:
pass
class AuditLogger(ABC):
"""Interface สำหรับ Audit Logging"""
@abstractmethod
def log_access(self, request: AccessRequest, decision: AccessDecision, reason: str):
pass
# --- Use Cases ---
class EvaluateAccessUseCase:
"""Use Case: ประเมิน Access Request"""
def __init__(self, policy_repo: PolicyRepository,
threat_detector: ThreatDetector,
access_gateway: AccessGateway,
audit_logger: AuditLogger):
self.policy_repo = policy_repo
self.threat_detector = threat_detector
self.gateway = access_gateway
self.audit = audit_logger
def execute(self, request: AccessRequest) -> AccessDecision:
"""ประเมินและตัดสินใจ Access"""
# 1. ตรวจ Threat
threat = self.threat_detector.analyze(request)
if threat and threat.risk_level.value >= RiskLevel.HIGH.value:
self.audit.log_access(request, AccessDecision.DENY,
f"Threat detected: {threat.event_type}")
self.gateway.enforce(request, AccessDecision.DENY)
return AccessDecision.DENY
# 2. ตรวจ Device Compliance
if not request.device.compliant:
self.audit.log_access(request, AccessDecision.DENY,
"Device not compliant")
self.gateway.enforce(request, AccessDecision.DENY)
return AccessDecision.DENY
# 3. ตรวจ Policies
policies = self.policy_repo.get_policies(request.destination)
for policy in sorted(policies, key=lambda p: p.priority):
if not policy.enabled:
continue
if self._matches(request, policy):
decision = policy.action
self.audit.log_access(request, decision,
f"Policy: {policy.name}")
self.gateway.enforce(request, decision)
return decision
# 4. Default: ต้องมี MFA
if not request.user.mfa_enabled:
self.audit.log_access(request, AccessDecision.CHALLENGE,
"MFA required")
return AccessDecision.CHALLENGE
# 5. Default Allow
self.audit.log_access(request, AccessDecision.ALLOW, "Default allow")
self.gateway.enforce(request, AccessDecision.ALLOW)
return AccessDecision.ALLOW
def _matches(self, request, policy):
"""ตรวจสอบว่า Request ตรงกับ Policy Conditions"""
conditions = policy.conditions
if "department" in conditions:
if request.user.department not in conditions["department"]:
return False
if "managed_device" in conditions:
if request.device.managed != conditions["managed_device"]:
return False
if "min_risk_score" in conditions:
if request.user.risk_score < conditions["min_risk_score"]:
return False
return True
# === Unit Test ===
class MockPolicyRepo(PolicyRepository):
def __init__(self, policies=None):
self.policies = policies or []
def get_policies(self, destination):
return self.policies
def save_policy(self, policy):
self.policies.append(policy)
return True
class MockThreatDetector(ThreatDetector):
def __init__(self, threat=None):
self.threat = threat
def analyze(self, request):
return self.threat
class MockGateway(AccessGateway):
def __init__(self):
self.last_decision = None
def enforce(self, request, decision):
self.last_decision = decision
return True
class MockAuditLogger(AuditLogger):
def __init__(self):
self.logs = []
def log_access(self, request, decision, reason):
self.logs.append({"decision": decision, "reason": reason})
# Test
user = User("u1", "alice@example.com", "engineering", "developer", mfa_enabled=True)
device = Device("d1", "macOS", "14.0", compliant=True, managed=True)
req = AccessRequest(user, device, "10.0.0.1", "api.internal", datetime.now())
policy = SecurityPolicy("p1", "Allow Engineering",
conditions={"department": ["engineering"]},
action=AccessDecision.ALLOW, priority=1)
repo = MockPolicyRepo([policy])
detector = MockThreatDetector()
gateway = MockGateway()
logger = MockAuditLogger()
uc = EvaluateAccessUseCase(repo, detector, gateway, logger)
result = uc.execute(req)
print(f"\nAccess Decision: {result.value}")
print(f"Audit Logs: {len(logger.logs)}")
Adapters และ Infrastructure
# === Layer 3 & 4: Adapters + Infrastructure ===
# --- Cloudflare Gateway Adapter ---
class CloudflareAccessGateway(AccessGateway):
"""Adapter สำหรับ Cloudflare Zero Trust"""
def __init__(self, api_token, account_id):
self.api_token = api_token
self.account_id = account_id
self.base_url = f"https://api.cloudflare.com/client/v4/accounts/{account_id}"
def enforce(self, request, decision):
"""Enforce ผ่าน Cloudflare Access"""
import requests
headers = {"Authorization": f"Bearer {self.api_token}"}
if decision == AccessDecision.DENY:
# Block IP
payload = {
"mode": "block",
"configuration": {
"target": "ip",
"value": request.source_ip,
},
"notes": f"Blocked by SASE policy for {request.user.email}",
}
resp = requests.post(
f"{self.base_url}/firewall/access_rules/rules",
headers=headers, json=payload,
)
return resp.status_code == 200
return True
# --- PostgreSQL Policy Repository ---
class PostgresPolicyRepository(PolicyRepository):
"""Adapter สำหรับ PostgreSQL"""
def __init__(self, connection_string):
import psycopg2
self.conn = psycopg2.connect(connection_string)
def get_policies(self, destination):
cur = self.conn.cursor()
cur.execute(
"SELECT * FROM security_policies WHERE destination = %s AND enabled = true ORDER BY priority",
(destination,)
)
rows = cur.fetchall()
return [self._row_to_policy(r) for r in rows]
def save_policy(self, policy):
cur = self.conn.cursor()
cur.execute(
"INSERT INTO security_policies (policy_id, name, conditions, action, priority, enabled) VALUES (%s, %s, %s, %s, %s, %s)",
(policy.policy_id, policy.name, str(policy.conditions), policy.action.value, policy.priority, policy.enabled)
)
self.conn.commit()
return True
def _row_to_policy(self, row):
return SecurityPolicy(
policy_id=row[0], name=row[1],
conditions=eval(row[2]), action=AccessDecision(row[3]),
priority=row[4], enabled=row[5],
)
# --- FastAPI Controller ---
# from fastapi import FastAPI, HTTPException
# app = FastAPI()
#
# @app.post("/evaluate")
# async def evaluate_access(request_data: dict):
# # Build AccessRequest from request_data
# # Call EvaluateAccessUseCase
# # Return decision
# pass
#
# @app.get("/policies")
# async def list_policies(destination: str):
# policies = policy_repo.get_policies(destination)
# return {"policies": [p.__dict__ for p in policies]}
print("\nClean Architecture Layers:")
print(" Layer 1 (Entities): User, Device, Policy, Threat")
print(" Layer 2 (Use Cases): EvaluateAccess, DetectThreat")
print(" Layer 3 (Adapters): Cloudflare, Zscaler, PostgreSQL")
print(" Layer 4 (Frameworks): FastAPI, PostgreSQL Driver")
Best Practices
- Dependency Rule: Dependencies ชี้เข้าด้านในเสมอ Layer ในไม่รู้จัก Layer นอก
- Interfaces: ใช้ Abstract Classes (Interfaces) เชื่อม Layers ทำให้เปลี่ยน Implementation ได้
- Unit Testing: ทดสอบ Use Cases ด้วย Mock Repositories ไม่ต้องมี Infrastructure จริง
- Provider Agnostic: เปลี่ยน SASE Provider (Cloudflare → Zscaler) โดยแก้แค่ Adapter Layer
- Domain Events: ใช้ Events สื่อสารระหว่าง Use Cases แทน Direct Coupling
- Separation of Concerns: แยก Security Logic ออกจาก Framework Code ชัดเจน
Clean Architecture คืออะไร
หลักการออกแบบซอฟต์แวร์ของ Uncle Bob แบ่งเป็น Layers แยก Concerns Entities Use Cases Interface Adapters Frameworks Dependencies ชี้เข้าด้านในเสมอ
SASE กับ Clean Architecture เกี่ยวกันอย่างไร
Clean Architecture ช่วยออกแบบ SASE ให้ Maintainable Testable แยก Security Logic ออกจาก Infrastructure เปลี่ยน Provider ได้ไม่กระทบ Business Logic ทดสอบ Policies ไม่ต้องมี Infrastructure จริง
Dependency Rule คืออะไร
กฎสำคัญกำหนดว่า Dependencies ชี้จาก Layer นอกเข้า Layer ใน Frameworks ขึ้นกับ Adapters, Adapters ขึ้นกับ Use Cases, Use Cases ขึ้นกับ Entities Layer ในไม่รู้จัก Layer นอก
วิธีทดสอบ Security Policies ทำอย่างไร
แยก Security Logic เป็น Use Cases ทดสอบด้วย Unit Tests Mock Repository Interfaces ทดสอบ Policy Evaluation Access Control Threat Detection แยกจาก Network Database External APIs
สรุป
SASE Security ร่วมกับ Clean Architecture ให้ระบบที่ Maintainable Testable และ Provider Agnostic แบ่งเป็น 4 Layers ตาม Dependency Rule ใช้ Interfaces เชื่อม Layers ทดสอบ Use Cases ด้วย Mock เปลี่ยน SASE Provider โดยแก้แค่ Adapter Layer ใช้ Domain Events สื่อสารระหว่าง Use Cases
