Better Uptime การใช้งาน 2026 — ระบบ Monitoring
SASE Security กับ Clean Architecture — วิธีออกแบบระบบ Security ด้วย SASE และ Clean Architecture | SiamCafe Blog
เรียนรู้การออกแบบระบบ Security ด้วย SASE Framework ร่วมกับ Clean Architecture Principles ตั้งแต่ Layer Design, Security Policies, Use Cases ไปจนถึง Implementation พร้อม Code จริง
FAQ_Q:Clean Architecture คืออะไร
FAQ_A:Clean Architecture เป็นหลักการออกแบบซอฟต์แวร์ของ Robert C. Martin (Uncle Bob) แบ่งระบบเป็น Layers ที่แยก Concerns ชัดเจน Entities (Business Rules), Use Cases (Application Logic), Interface Adapters (Controllers, Presenters), Frameworks (UI, Database) Dependencies ชี้เข้าด้านในเสมอ
FAQ_Q:SASE กับ Clean Architecture เกี่ยวกันอย่างไร
FAQ_A:Clean Architecture ช่วยออกแบบระบบ SASE ให้ Maintainable และ Testable แยก Security Logic (Use Cases) ออกจาก Infrastructure (Frameworks) เปลี่ยน SASE Provider ได้โดยไม่กระทบ Business Logic ทดสอบ Security Policies ได้โดยไม่ต้องมี Infrastructure จริง
FAQ_Q:Dependency Rule คืออะไร
FAQ_A:Dependency Rule เป็นกฎสำคัญของ Clean Architecture กำหนดว่า Dependencies ต้องชี้จาก Layer นอกเข้า Layer ใน เช่น Frameworks ขึ้นกับ Interface Adapters, Interface Adapters ขึ้นกับ Use Cases, Use Cases ขึ้นกับ Entities Layer ในไม่รู้จัก Layer นอก
FAQ_Q: วิธีทดสอบ Security Policies ทำอย่างไร
FAQ_A: ใช้ Clean Architecture แยก Security Logic เป็น Use Cases ทดสอบด้วย Unit Tests โดยไม่ต้องมี Infrastructure จริง Mock Repository Interfaces ทดสอบ Policy Evaluation, Access Control, Threat Detection แยกจาก Network, Database, External APIs
BODY_START
SASE Security และ Clean Architecture
SASE (Secure Access Service Edge) เป็น Framework ที่รวม Network และ Security Services เมื่อออกแบบด้วย Clean Architecture ได้ระบบที่ Maintainable, Testable และเปลี่ยน Provider ได้ง่าย แยก Business Logic ออกจาก Infrastructure ชัดเจน
Clean Architecture แบ่งระบบเป็น 4 Layers หลัก โดย Dependencies ชี้เข้าด้านในเสมอ (Dependency Rule) ทำให้ทดสอบ Security Policies ได้โดยไม่ต้องมี Infrastructure จริง
Clean Architecture Layers สำหรับ SASE
# === Clean Architecture สำหรับ SASE Security ===
# โครงสร้าง Project
# sase_security/
# ├── domain/ # Layer 1: Entities
# │ ├── entities/
# │ │ ├── user.py
# │ │ ├── device.py
# │ │ ├── policy.py
# │ │ └── threat.py
# │ └── value_objects/
# │ ├── ip_address.py
# │ └── risk_score.py
# │
# ├── use_cases/ # Layer 2: Use Cases
# │ ├── evaluate_access.py
# │ ├── detect_threat.py
# │ ├── enforce_policy.py
# │ └── interfaces/
# │ ├── policy_repository.py
# │ ├── threat_detector.py
# │ └── access_gateway.py
# │
# ├── adapters/ # Layer 3: Interface Adapters
# │ ├── controllers/
# │ │ ├── access_controller.py
# │ │ └── policy_controller.py
# │ ├── presenters/
# │ │ └── security_dashboard.py
# │ └── gateways/
# │ ├── cloudflare_gateway.py
# │ └── zscaler_gateway.py
# │
# └── infrastructure/ # Layer 4: Frameworks
# ├── api/
# │ └── fastapi_app.py
# ├── database/
# │ └── postgres_repository.py
# └── external/
# ├── cloudflare_client.py
# └── zscaler_client.py
# === Layer 1: Entities (Domain) ===
from dataclasses import dataclass, field
from typing import List, Optional
from enum import Enum
from datetime import datetime
class RiskLevel(Enum):
LOW = 1
MEDIUM = 2
HIGH = 3
CRITICAL = 4
class AccessDecision(Enum):
ALLOW = "allow"
DENY = "deny"
CHALLENGE = "challenge" # MFA Required
@dataclass
class User:
user_id: str
email: str
department: str
role: str
mfa_enabled: bool = False
risk_score: float = 0.0
@dataclass
class Device:
device_id: str
os: str
os_version: str
compliant: bool = True
managed: bool = True
last_seen: Optional[datetime] = None
@dataclass
class AccessRequest:
user: User
device: Device
source_ip: str
destination: str
timestamp: datetime
location: str = ""
protocol: str = "HTTPS"
@dataclass
class SecurityPolicy:
policy_id: str
name: str
conditions: dict = field(default_factory=dict)
action: AccessDecision = AccessDecision.DENY
priority: int = 100
enabled: bool = True
@dataclass
class ThreatEvent:
event_id: str
source_ip: str
event_type: str
risk_level: RiskLevel
description: str
timestamp: datetime
mitigated: bool = False
print("Layer 1: Entities defined")
print(" - User, Device, AccessRequest")
print(" - SecurityPolicy, ThreatEvent")
print(" - RiskLevel, AccessDecision")
Use Cases และ Repository Interfaces
# === Layer 2: Use Cases ===
from abc import ABC, abstractmethod
from typing import List, Optional
# --- Interfaces (Ports) ---
class PolicyRepository(ABC):
"""Interface สำหรับ Policy Storage"""
@abstractmethod
def get_policies(self, destination: str) -> List[SecurityPolicy]:
pass
@abstractmethod
def save_policy(self, policy: SecurityPolicy) -> bool:
pass
class ThreatDetector(ABC):
"""Interface สำหรับ Threat Detection"""
@abstractmethod
def analyze(self, request: AccessRequest) -> Optional[ThreatEvent]:
pass
class AccessGateway(ABC):
"""Interface สำหรับ Access Enforcement"""
@abstractmethod
def enforce(self, request: AccessRequest, decision: AccessDecision) -> bool:
pass
class AuditLogger(ABC):
"""Interface สำหรับ Audit Logging"""
@abstractmethod
def log_access(self, request: AccessRequest, decision: AccessDecision, reason: str):
pass
# --- Use Cases ---
class EvaluateAccessUseCase:
"""Use Case: ประเมิน Access Request"""
def __init__(self, policy_repo: PolicyRepository,
threat_detector: ThreatDetector,
access_gateway: AccessGateway,
audit_logger: AuditLogger):
self.policy_repo = policy_repo
self.threat_detector = threat_detector
self.gateway = access_gateway
self.audit = audit_logger
def execute(self, request: AccessRequest) -> AccessDecision:
"""ประเมินและตัดสินใจ Access"""
# 1. ตรวจ Threat
threat = self.threat_detector.analyze(request)
if threat and threat.risk_level.value >= RiskLevel.HIGH.value:
self.audit.log_access(request, AccessDecision.DENY,
f"Threat detected: {threat.event_type}")
self.gateway.enforce(request, AccessDecision.DENY)
return AccessDecision.DENY
# 2. ตรวจ Device Compliance
if not request.device.compliant:
self.audit.log_access(request, AccessDecision.DENY,
"Device not compliant")
self.gateway.enforce(request, AccessDecision.DENY)
return AccessDecision.DENY
# 3. ตรวจ Policies
policies = self.policy_repo.get_policies(request.destination)
for policy in sorted(policies, key=lambda p: p.priority):
if not policy.enabled:
continue
if self._matches(request, policy):
decision = policy.action
self.audit.log_access(request, decision,
f"Policy: {policy.name}")
self.gateway.enforce(request, decision)
return decision
# 4. Default: ต้องมี MFA
if not request.user.mfa_enabled:
self.audit.log_access(request, AccessDecision.CHALLENGE,
"MFA required")
return AccessDecision.CHALLENGE
# 5. Default Allow
self.audit.log_access(request, AccessDecision.ALLOW, "Default allow")
self.gateway.enforce(request, AccessDecision.ALLOW)
return AccessDecision.ALLOW
def _matches(self, request, policy):
"""ตรวจสอบว่า Request ตรงกับ Policy Conditions"""
conditions = policy.conditions
if "department" in conditions:
if request.user.department not in conditions["department"]:
return False
if "managed_device" in conditions:
if request.device.managed != conditions["managed_device"]:
return False
if "min_risk_score" in conditions:
if request.user.risk_score < conditions["min_risk_score"]:
return False
return True
# === Unit Test ===
class MockPolicyRepo(PolicyRepository):
def __init__(self, policies=None):
self.policies = policies or []
def get_policies(self, destination):
return self.policies
def save_policy(self, policy):
self.policies.append(policy)
return True
class MockThreatDetector(ThreatDetector):
def __init__(self, threat=None):
self.threat = threat
def analyze(self, request):
return self.threat
class MockGateway(AccessGateway):
def __init__(self):
self.last_decision = None
def enforce(self, request, decision):
self.last_decision = decision
return True
class MockAuditLogger(AuditLogger):
def __init__(self):
self.logs = []
def log_access(self, request, decision, reason):
self.logs.append({"decision": decision, "reason": reason})
# Test
user = User("u1", "alice@example.com", "engineering", "developer", mfa_enabled=True)
device = Device("d1", "macOS", "14.0", compliant=True, managed=True)
req = AccessRequest(user, device, "10.0.0.1", "api.internal", datetime.now())
policy = SecurityPolicy("p1", "Allow Engineering",
conditions={"department": ["engineering"]},
action=AccessDecision.ALLOW, priority=1)
repo = MockPolicyRepo([policy])
detector = MockThreatDetector()
gateway = MockGateway()
logger = MockAuditLogger()
uc = EvaluateAccessUseCase(repo, detector, gateway, logger)
result = uc.execute(req)
print(f"\nAccess Decision: {result.value}")
print(f"Audit Logs: {len(logger.logs)}")
Adapters และ Infrastructure
# === Layer 3 & 4: Adapters + Infrastructure ===
# --- Cloudflare Gateway Adapter ---
class CloudflareAccessGateway(AccessGateway):
"""Adapter สำหรับ Cloudflare Zero Trust"""
def __init__(self, api_token, account_id):
self.api_token = api_token
self.account_id = account_id
self.base_url = f"https://api.cloudflare.com/client/v4/accounts/{account_id}"
def enforce(self, request, decision):
"""Enforce ผ่าน Cloudflare Access"""
import requests
headers = {"Authorization": f"Bearer {self.api_token}"}
if decision == AccessDecision.DENY:
# Block IP
payload = {
"mode": "block",
"configuration": {
"target": "ip",
"value": request.source_ip,
},
"notes": f"Blocked by SASE policy for {request.user.email}",
}
resp = requests.post(
f"{self.base_url}/firewall/access_rules/rules",
headers=headers, json=payload,
)
return resp.status_code == 200
return True
# --- PostgreSQL Policy Repository ---
class PostgresPolicyRepository(PolicyRepository):
"""Adapter สำหรับ PostgreSQL"""
def __init__(self, connection_string):
import psycopg2
self.conn = psycopg2.connect(connection_string)
def get_policies(self, destination):
cur = self.conn.cursor()
cur.execute(
"SELECT * FROM security_policies WHERE destination = %s AND enabled = true ORDER BY priority",
(destination,)
)
rows = cur.fetchall()
return [self._row_to_policy(r) for r in rows]
def save_policy(self, policy):
cur = self.conn.cursor()
cur.execute(
"INSERT INTO security_policies (policy_id, name, conditions, action, priority, enabled) VALUES (%s, %s, %s, %s, %s, %s)",
(policy.policy_id, policy.name, str(policy.conditions), policy.action.value, policy.priority, policy.enabled)
)
self.conn.commit()
return True
def _row_to_policy(self, row):
return SecurityPolicy(
policy_id=row[0], name=row[1],
conditions=eval(row[2]), action=AccessDecision(row[3]),
priority=row[4], enabled=row[5],
)
# --- FastAPI Controller ---
# from fastapi import FastAPI, HTTPException
# app = FastAPI()
#
# @app.post("/evaluate")
# async def evaluate_access(request_data: dict):
# # Build AccessRequest from request_data
# # Call EvaluateAccessUseCase
# # Return decision
# pass
#
# @app.get("/policies")
# async def list_policies(destination: str):
# policies = policy_repo.get_policies(destination)
# return {"policies": [p.__dict__ for p in policies]}
print("\nClean Architecture Layers:")
print(" Layer 1 (Entities): User, Device, Policy, Threat")
print(" Layer 2 (Use Cases): EvaluateAccess, DetectThreat")
print(" Layer 3 (Adapters): Cloudflare, Zscaler, PostgreSQL")
print(" Layer 4 (Frameworks): FastAPI, PostgreSQL Driver")
Best Practices
- Dependency Rule: Dependencies ชี้เข้าด้านในเสมอ Layer ในไม่รู้จัก Layer นอก
- Interfaces: ใช้ Abstract Classes (Interfaces) เชื่อม Layers ทำให้เปลี่ยน Implementation ได้
- Unit Testing: ทดสอบ Use Cases ด้วย Mock Repositories ไม่ต้องมี Infrastructure จริง
- Provider Agnostic: เปลี่ยน SASE Provider (Cloudflare → Zscaler) โดยแก้แค่ Adapter Layer
- Domain Events: ใช้ Events สื่อสารระหว่าง Use Cases แทน Direct Coupling
- Separation of Concerns: แยก Security Logic ออกจาก Framework Code ชัดเจน
Clean Architecture คืออะไร
หลักการออกแบบซอฟต์แวร์ของ Uncle Bob แบ่งเป็น Layers แยก Concerns Entities Use Cases Interface Adapters Frameworks Dependencies ชี้เข้าด้านในเสมอ