SiamCafe · Blog
SASE Security กับ Clean Architecture
บทความ

SASE Security กับ Clean Architecture

เผยแพร่ 28 พฤษภาคม 2569

SASE Security และ Clean Architecture

SASE Security กับ Clean Architecture

SASE (Secure Access Service Edge) เป็น Framework ที่รวม Network และ Security Services เข้าไว้ด้วยกัน เมื่อออกแบบระบบด้วย Clean Architecture จะได้ระบบที่ Maintainable, Testable และเปลี่ยน Provider ได้ง่าย เพราะแยก Business Logic ออกจาก Infrastructure อย่างชัดเจน

Clean Architecture แบ่งระบบเป็น 4 Layers หลัก โดย Dependencies ชี้เข้าด้านในเสมอ (Dependency Rule) ทำให้ทดสอบ Security Policies ได้โดยไม่ต้องมี Infrastructure จริง

Clean Architecture Layers สำหรับ SASE

# === Clean Architecture สำหรับ SASE Security ===
# โครงสร้าง Project

# sase_security/
# ├── domain/                    # Layer 1: Entities
# │   ├── entities/
# │   │   ├── user.py
# │   │   ├── device.py
# │   │   ├── policy.py
# │   │   └── threat.py
# │   └── value_objects/
# │       ├── ip_address.py
# │       └── risk_score.py
# │
# ├── use_cases/                 # Layer 2: Use Cases
# │   ├── evaluate_access.py
# │   ├── detect_threat.py
# │   ├── enforce_policy.py
# │   └── interfaces/
# │       ├── policy_repository.py
# │       ├── threat_detector.py
# │       └── access_gateway.py
# │
# ├── adapters/                  # Layer 3: Interface Adapters
# │   ├── controllers/
# │   │   ├── access_controller.py
# │   │   └── policy_controller.py
# │   ├── presenters/
# │   │   └── security_dashboard.py
# │   └── gateways/
# │       ├── cloudflare_gateway.py
# │       └── zscaler_gateway.py
# │
# └── infrastructure/            # Layer 4: Frameworks
#     ├── api/
#     │   └── fastapi_app.py
#     ├── database/
#     │   └── postgres_repository.py
#     └── external/
#         ├── cloudflare_client.py
#         └── zscaler_client.py

# === Layer 1: Entities (Domain) ===

from dataclasses import dataclass, field
from typing import List, Optional
from enum import Enum
from datetime import datetime

class RiskLevel(Enum):
    """Severity of a detected threat.

    The integer values ascend from LOW to CRITICAL, so two levels can be
    ordered by comparing their ``.value`` attributes.
    """
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4

class AccessDecision(Enum):
    """Possible outcomes of evaluating an access request (string-valued)."""
    ALLOW = "allow"
    DENY = "deny"
    CHALLENGE = "challenge"  # the requester must pass MFA first

@dataclass
class User:
    """Domain entity describing the person behind an access request."""
    user_id: str
    email: str
    # Department name; compared against a policy's "department" condition.
    department: str
    role: str
    # Whether the user has MFA set up; defaults to False.
    mfa_enabled: bool = False
    # Numeric risk score; compared against a policy's "min_risk_score" condition.
    risk_score: float = 0.0

@dataclass
class Device:
    """Domain entity describing the device an access request comes from."""
    device_id: str
    os: str
    os_version: str
    # Compliance flag; defaults to True.
    compliant: bool = True
    # Managed-device flag; compared against a policy's "managed_device" condition.
    managed: bool = True
    # Optional timestamp; None when not supplied.
    # NOTE(review): presumably the last time the device was observed - confirm.
    last_seen: Optional[datetime] = None

@dataclass
class AccessRequest:
    """A single attempt by a user on a device to reach a destination."""
    user: User
    device: Device
    # Address the request originates from, kept as a plain string.
    source_ip: str
    # Target resource; used as the lookup key when fetching policies.
    destination: str
    timestamp: datetime
    # Free-form location; empty string when unknown.
    location: str = ""
    protocol: str = "HTTPS"

@dataclass
class SecurityPolicy:
    """A rule that maps matching access requests to an access decision."""
    policy_id: str
    name: str
    # Match criteria keyed by condition name; an empty dict places no
    # restriction. Each instance gets its own dict via default_factory.
    conditions: dict = field(default_factory=dict)
    # Decision applied when the policy matches; denies unless overridden.
    action: AccessDecision = AccessDecision.DENY
    # Ordering key for policy evaluation; defaults to 100.
    priority: int = 100
    # Whether the policy is active; defaults to True.
    enabled: bool = True

@dataclass
class ThreatEvent:
    """A threat finding attached to a source address."""
    event_id: str
    source_ip: str
    # Short label for the kind of threat.
    event_type: str
    # Severity of the finding.
    risk_level: RiskLevel
    description: str
    timestamp: datetime
    # Defaults to False.
    # NOTE(review): presumably set once the threat has been handled - confirm.
    mitigated: bool = False

# Print the Layer 1 summary in a single call; each argument becomes one line.
print(
    "Layer 1: Entities defined",
    "  - User, Device, AccessRequest",
    "  - SecurityPolicy, ThreatEvent",
    "  - RiskLevel, AccessDecision",
    sep="\n",
)

Use Cases และ Repository Interfaces

SASE Security กับ Clean Architecture
# === Layer 2: Use Cases ===
from abc import ABC, abstractmethod
from typing import List, Optional

# --- Interfaces (Ports) ---

class PolicyRepository(ABC):
    """Port (interface) for policy storage."""

    @abstractmethod
    def get_policies(self, destination: str) -> List[SecurityPolicy]:
        """Return the security policies stored for ``destination``."""

    @abstractmethod
    def save_policy(self, policy: SecurityPolicy) -> bool:
        """Store ``policy`` and return a boolean success flag."""

class ThreatDetector(ABC):
    """Port (interface) for threat detection."""

    @abstractmethod
    def analyze(self, request: AccessRequest) -> Optional[ThreatEvent]:
        """Inspect ``request``; return a ``ThreatEvent`` or ``None``."""

class AccessGateway(ABC):
    """Port (interface) for access enforcement."""

    @abstractmethod
    def enforce(self, request: AccessRequest, decision: AccessDecision) -> bool:
        """Apply ``decision`` to ``request`` and return a boolean success flag."""

class AuditLogger(ABC):
    """Port (interface) for audit logging."""

    @abstractmethod
    def log_access(self, request: AccessRequest, decision: AccessDecision, reason: str):
        """Record ``decision`` for ``request`` together with its ``reason``."""

# --- Use Cases ---

class EvaluateAccessUseCase:
    """Use case: evaluate an access request and decide how to answer it.

    Checks run in a fixed order and the first one that produces a decision
    wins:

    1. Threat detection - a threat at ``RiskLevel.HIGH`` or above denies.
    2. Device compliance - a non-compliant device denies.
    3. Policies - enabled policies for the destination are tried in
       ascending ``priority``; the first match supplies the decision.
    4. MFA - a user without MFA enabled is challenged.
    5. Otherwise the request is allowed.

    Every decision, including the MFA challenge, is written to the audit
    log and then handed to the access gateway.
    """

    def __init__(self, policy_repo: "PolicyRepository",
                 threat_detector: "ThreatDetector",
                 access_gateway: "AccessGateway",
                 audit_logger: "AuditLogger"):
        """Wire the use case to its ports.

        Args:
            policy_repo: Source of the policies for a destination.
            threat_detector: Analyzer asked for a threat on every request.
            access_gateway: Enforcement point that receives each decision.
            audit_logger: Sink that records each decision with its reason.
        """
        self.policy_repo = policy_repo
        self.threat_detector = threat_detector
        self.gateway = access_gateway
        self.audit = audit_logger

    def execute(self, request: "AccessRequest") -> "AccessDecision":
        """Evaluate ``request`` and return the resulting decision.

        Args:
            request: The access attempt to evaluate.

        Returns:
            The decision that was audited and passed to the gateway.
        """
        # 1. Threat check: only HIGH and CRITICAL threats block the request.
        threat = self.threat_detector.analyze(request)
        if threat and threat.risk_level.value >= RiskLevel.HIGH.value:
            return self._conclude(request, AccessDecision.DENY,
                                  f"Threat detected: {threat.event_type}")

        # 2. Device compliance check.
        if not request.device.compliant:
            return self._conclude(request, AccessDecision.DENY,
                                  "Device not compliant")

        # 3. Policy check: lowest priority number first, disabled ones skipped.
        policies = self.policy_repo.get_policies(request.destination)
        for policy in sorted(policies, key=lambda p: p.priority):
            if policy.enabled and self._matches(request, policy):
                return self._conclude(request, policy.action,
                                      f"Policy: {policy.name}")

        # 4. Default: MFA is required. The challenge goes through the gateway
        #    like every other decision so enforcement stays consistent.
        if not request.user.mfa_enabled:
            return self._conclude(request, AccessDecision.CHALLENGE,
                                  "MFA required")

        # 5. Default allow.
        return self._conclude(request, AccessDecision.ALLOW, "Default allow")

    def _conclude(self, request, decision, reason):
        """Audit ``decision``, pass it to the gateway and return it."""
        self.audit.log_access(request, decision, reason)
        self.gateway.enforce(request, decision)
        return decision

    def _matches(self, request, policy):
        """Return True when ``request`` satisfies every condition of ``policy``.

        Recognized condition keys are ``department`` (one name or a
        collection of names), ``managed_device`` (bool) and
        ``min_risk_score`` (number). Absent keys impose no restriction.
        """
        conditions = policy.conditions
        if "department" in conditions:
            allowed = conditions["department"]
            # A bare string would turn ``in`` into a substring test
            # ("eng" in "engineering"); treat it as a single department.
            if isinstance(allowed, str):
                allowed = (allowed,)
            if request.user.department not in allowed:
                return False
        if "managed_device" in conditions:
            if request.device.managed != conditions["managed_device"]:
                return False
        if "min_risk_score" in conditions:
            if request.user.risk_score < conditions["min_risk_score"]:
                return False
        return True

# === Unit Test ===
class MockPolicyRepo(PolicyRepository):
    """In-memory ``PolicyRepository`` stand-in for unit tests."""

    def __init__(self, policies=None):
        # Any falsy argument (None or an empty list) is swapped for a new list.
        self.policies = policies if policies else []

    def get_policies(self, destination):
        """Return all stored policies; ``destination`` is not used to filter."""
        return self.policies

    def save_policy(self, policy):
        """Append ``policy`` to the in-memory list and return True."""
        stored = self.policies
        stored.append(policy)
        return True

class MockThreatDetector(ThreatDetector):
    """``ThreatDetector`` stand-in that always reports one preset result."""

    def __init__(self, threat=None):
        # The value handed back by every analyze() call; None means no threat.
        self.threat = threat

    def analyze(self, request):
        """Ignore ``request`` and return the preset threat."""
        preset = self.threat
        return preset

class MockGateway(AccessGateway):
    """``AccessGateway`` stand-in that remembers the most recent decision."""

    def __init__(self):
        # Stays None until enforce() has been called at least once.
        self.last_decision = None

    def enforce(self, request, decision):
        """Record ``decision`` as the latest one and return True."""
        setattr(self, "last_decision", decision)
        return True

class MockAuditLogger(AuditLogger):
    """``AuditLogger`` stand-in that collects entries in a list."""

    def __init__(self):
        # One dict per log_access() call, in call order.
        self.logs = []

    def log_access(self, request, decision, reason):
        """Append the decision and reason as a dict; ``request`` is not stored."""
        entry = dict(decision=decision, reason=reason)
        self.logs.append(entry)

# Test
# Fixture: an engineering user with MFA on a compliant, managed device.
user = User(
    user_id="u1",
    email="alice@example.com",
    department="engineering",
    role="developer",
    mfa_enabled=True,
)
device = Device(
    device_id="d1",
    os="macOS",
    os_version="14.0",
    compliant=True,
    managed=True,
)
req = AccessRequest(
    user=user,
    device=device,
    source_ip="10.0.0.1",
    destination="api.internal",
    timestamp=datetime.now(),
)

# A single top-priority policy that allows the engineering department.
policy = SecurityPolicy(
    policy_id="p1",
    name="Allow Engineering",
    conditions={"department": ["engineering"]},
    action=AccessDecision.ALLOW,
    priority=1,
)

# Wire the use case to in-memory test doubles instead of real infrastructure.
repo = MockPolicyRepo([policy])
detector = MockThreatDetector()
gateway = MockGateway()
logger = MockAuditLogger()

uc = EvaluateAccessUseCase(
    policy_repo=repo,
    threat_detector=detector,
    access_gateway=gateway,
    audit_logger=logger,
)
result = uc.execute(req)
print(f"\nAccess Decision: {result.value}")
print(f"Audit Logs: {len(logger.logs)}")

Adapters และ Infrastructure

# === Layer 3 & 4: Adapters + Infrastructure ===

# --- Cloudflare Gateway Adapter ---
class CloudflareAccessGateway(AccessGateway):
    """Adapter that enforces access decisions through the Cloudflare API."""

    def __init__(self, api_token, account_id, timeout=10.0):
        """Store credentials and derive the account-scoped API base URL.

        Args:
            api_token: Token sent as a Bearer credential on every call.
            account_id: Cloudflare account the access rules are created in.
            timeout: Seconds to wait for Cloudflare before giving up.
        """
        self.api_token = api_token
        self.account_id = account_id
        self.timeout = timeout
        self.base_url = f"https://api.cloudflare.com/client/v4/accounts/{account_id}"

    def enforce(self, request, decision):
        """Enforce ``decision`` via Cloudflare.

        Only ``AccessDecision.DENY`` triggers an API call: it creates an
        access rule that blocks the request's source IP. Every other
        decision is a no-op that reports success.

        Args:
            request: The evaluated access request.
            decision: The decision to enforce.

        Returns:
            True when nothing had to be done or Cloudflare answered with
            HTTP 200; False for any other status code.

        Raises:
            requests.RequestException: On connection errors or when the
                call exceeds ``timeout``.
        """
        if decision != AccessDecision.DENY:
            return True

        import requests
        headers = {"Authorization": f"Bearer {self.api_token}"}
        # Block the source IP of the denied request.
        payload = {
            "mode": "block",
            "configuration": {
                "target": "ip",
                "value": request.source_ip,
            },
            "notes": f"Blocked by SASE policy for {request.user.email}",
        }
        # Without a timeout a stalled connection would hang the whole
        # access evaluation, so always bound the wait.
        resp = requests.post(
            f"{self.base_url}/firewall/access_rules/rules",
            headers=headers, json=payload, timeout=self.timeout,
        )
        return resp.status_code == 200

# --- PostgreSQL Policy Repository ---
class PostgresPolicyRepository(PolicyRepository):
    """``PolicyRepository`` adapter backed by PostgreSQL through psycopg2."""

    def __init__(self, connection_string):
        """Open a connection using ``connection_string``."""
        import psycopg2
        self.conn = psycopg2.connect(connection_string)

    def get_policies(self, destination):
        """Return the enabled policies for ``destination`` ordered by priority.

        Columns are selected by name so the positional mapping in
        ``_row_to_policy`` does not depend on the table's column order.
        """
        # ``with self.conn`` ends the transaction (commit, or rollback on
        # error); the cursor context closes the cursor.
        with self.conn, self.conn.cursor() as cur:
            cur.execute(
                "SELECT policy_id, name, conditions, action, priority, enabled "
                "FROM security_policies "
                "WHERE destination = %s AND enabled = true ORDER BY priority",
                (destination,),
            )
            rows = cur.fetchall()
        return [self._row_to_policy(r) for r in rows]

    def save_policy(self, policy):
        """Insert ``policy`` and commit; return True on success.

        A failed INSERT is rolled back and the exception propagates, so the
        connection is not left in an aborted transaction.
        """
        # NOTE(review): the INSERT does not set the ``destination`` column
        # that get_policies() filters on, and SecurityPolicy has no such
        # field - confirm how rows are tied to a destination.
        with self.conn, self.conn.cursor() as cur:
            cur.execute(
                "INSERT INTO security_policies (policy_id, name, conditions, action, priority, enabled) VALUES (%s, %s, %s, %s, %s, %s)",
                (policy.policy_id, policy.name, str(policy.conditions), policy.action.value, policy.priority, policy.enabled),
            )
        return True

    def _row_to_policy(self, row):
        """Build a ``SecurityPolicy`` from a row in the SELECT column order."""
        import ast

        raw_conditions = row[2]
        # SECURITY: conditions are stored as the repr of a dict (see
        # save_policy). They used to be read back with eval(), which would
        # execute arbitrary code placed in the table; literal_eval accepts
        # the same literal syntax without executing anything.
        if isinstance(raw_conditions, dict):
            conditions = raw_conditions
        else:
            conditions = ast.literal_eval(raw_conditions)
        return SecurityPolicy(
            policy_id=row[0], name=row[1],
            conditions=conditions, action=AccessDecision(row[3]),
            priority=row[4], enabled=row[5],
        )

# --- FastAPI Controller ---
# from fastapi import FastAPI, HTTPException
# app = FastAPI()
#
# @app.post("/evaluate")
# async def evaluate_access(request_data: dict):
#     # Build AccessRequest from request_data
#     # Call EvaluateAccessUseCase
#     # Return decision
#     pass
#
# @app.get("/policies")
# async def list_policies(destination: str):
#     policies = policy_repo.get_policies(destination)
#     return {"policies": [p.__dict__ for p in policies]}

# Print the layer overview in a single call; each argument becomes one line.
print(
    "\nClean Architecture Layers:",
    "  Layer 1 (Entities): User, Device, Policy, Threat",
    "  Layer 2 (Use Cases): EvaluateAccess, DetectThreat",
    "  Layer 3 (Adapters): Cloudflare, Zscaler, PostgreSQL",
    "  Layer 4 (Frameworks): FastAPI, PostgreSQL Driver",
    sep="\n",
)

Best Practices

  • Dependency Rule: Dependencies ชี้เข้าด้านในเสมอ Layer ในไม่รู้จัก Layer นอก
  • Interfaces: ใช้ Abstract Classes (Interfaces) เชื่อม Layers ทำให้เปลี่ยน Implementation ได้
  • Unit Testing: ทดสอบ Use Cases ด้วย Mock Repositories ไม่ต้องมี Infrastructure จริง
  • Provider Agnostic: เปลี่ยน SASE Provider (Cloudflare → Zscaler) โดยแก้แค่ Adapter Layer
  • Domain Events: ใช้ Events สื่อสารระหว่าง Use Cases แทน Direct Coupling
  • Separation of Concerns: แยก Security Logic ออกจาก Framework Code ชัดเจน

Clean Architecture คืออะไร

Clean Architecture คือหลักการออกแบบซอฟต์แวร์ของ Uncle Bob (Robert C. Martin) ที่แบ่งระบบเป็น Layers เพื่อแยก Concerns ได้แก่ Entities, Use Cases, Interface Adapters และ Frameworks โดย Dependencies ชี้เข้าด้านในเสมอ