Technology

Better Uptime การใช้งาน 2026 — ระบบ Monitoring และ Incident Management ครบวงจร

betteruptime คมอฉบบสมบรณ 2026
Betteruptime | SiamCafe Blog
2025-06-15· อ. บอม — SiamCafe.net· 1,476 คำ
SASE Security กับ Clean Architecture — วิธีออกแบบระบบ Security ด้วย SASE และ Clean Architecture | SiamCafe Blog เรียนรู้การออกแบบระบบ Security ด้วย SASE Framework ร่วมกับ Clean Architecture Principles ตั้งแต่ Layer Design, Security Policies, Use Cases ไปจนถึง Implementation พร้อม Code จริง FAQ_Q:Clean Architecture คืออะไร FAQ_A:Clean Architecture เป็นหลักการออกแบบซอฟต์แวร์ของ Robert C. Martin (Uncle Bob) แบ่งระบบเป็น Layers ที่แยก Concerns ชัดเจน Entities (Business Rules), Use Cases (Application Logic), Interface Adapters (Controllers, Presenters), Frameworks (UI, Database) Dependencies ชี้เข้าด้านในเสมอ FAQ_Q:SASE กับ Clean Architecture เกี่ยวกันอย่างไร FAQ_A:Clean Architecture ช่วยออกแบบระบบ SASE ให้ Maintainable และ Testable แยก Security Logic (Use Cases) ออกจาก Infrastructure (Frameworks) เปลี่ยน SASE Provider ได้โดยไม่กระทบ Business Logic ทดสอบ Security Policies ได้โดยไม่ต้องมี Infrastructure จริง FAQ_Q:Dependency Rule คืออะไร FAQ_A:Dependency Rule เป็นกฎสำคัญของ Clean Architecture กำหนดว่า Dependencies ต้องชี้จาก Layer นอกเข้า Layer ใน เช่น Frameworks ขึ้นกับ Interface Adapters, Interface Adapters ขึ้นกับ Use Cases, Use Cases ขึ้นกับ Entities Layer ในไม่รู้จัก Layer นอก FAQ_Q: วิธีทดสอบ Security Policies ทำอย่างไร FAQ_A: ใช้ Clean Architecture แยก Security Logic เป็น Use Cases ทดสอบด้วย Unit Tests โดยไม่ต้องมี Infrastructure จริง Mock Repository Interfaces ทดสอบ Policy Evaluation, Access Control, Threat Detection แยกจาก Network, Database, External APIs BODY_START

SASE Security และ Clean Architecture

SASE (Secure Access Service Edge) เป็น Framework ที่รวม Network และ Security Services เมื่อออกแบบด้วย Clean Architecture ได้ระบบที่ Maintainable, Testable และเปลี่ยน Provider ได้ง่าย แยก Business Logic ออกจาก Infrastructure ชัดเจน

Clean Architecture แบ่งระบบเป็น 4 Layers หลัก โดย Dependencies ชี้เข้าด้านในเสมอ (Dependency Rule) ทำให้ทดสอบ Security Policies ได้โดยไม่ต้องมี Infrastructure จริง

Clean Architecture Layers สำหรับ SASE

# === Clean Architecture สำหรับ SASE Security ===
# โครงสร้าง Project

# sase_security/
# ├── domain/                    # Layer 1: Entities
# │   ├── entities/
# │   │   ├── user.py
# │   │   ├── device.py
# │   │   ├── policy.py
# │   │   └── threat.py
# │   └── value_objects/
# │       ├── ip_address.py
# │       └── risk_score.py
# │
# ├── use_cases/                 # Layer 2: Use Cases
# │   ├── evaluate_access.py
# │   ├── detect_threat.py
# │   ├── enforce_policy.py
# │   └── interfaces/
# │       ├── policy_repository.py
# │       ├── threat_detector.py
# │       └── access_gateway.py
# │
# ├── adapters/                  # Layer 3: Interface Adapters
# │   ├── controllers/
# │   │   ├── access_controller.py
# │   │   └── policy_controller.py
# │   ├── presenters/
# │   │   └── security_dashboard.py
# │   └── gateways/
# │       ├── cloudflare_gateway.py
# │       └── zscaler_gateway.py
# │
# └── infrastructure/            # Layer 4: Frameworks
#     ├── api/
#     │   └── fastapi_app.py
#     ├── database/
#     │   └── postgres_repository.py
#     └── external/
#         ├── cloudflare_client.py
#         └── zscaler_client.py

# === Layer 1: Entities (Domain) ===

from dataclasses import dataclass, field
from typing import List, Optional
from enum import Enum
from datetime import datetime

class RiskLevel(Enum):
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4

class AccessDecision(Enum):
    ALLOW = "allow"
    DENY = "deny"
    CHALLENGE = "challenge"  # MFA Required

@dataclass
class User:
    user_id: str
    email: str
    department: str
    role: str
    mfa_enabled: bool = False
    risk_score: float = 0.0

@dataclass
class Device:
    device_id: str
    os: str
    os_version: str
    compliant: bool = True
    managed: bool = True
    last_seen: Optional[datetime] = None

@dataclass
class AccessRequest:
    user: User
    device: Device
    source_ip: str
    destination: str
    timestamp: datetime
    location: str = ""
    protocol: str = "HTTPS"

@dataclass
class SecurityPolicy:
    policy_id: str
    name: str
    conditions: dict = field(default_factory=dict)
    action: AccessDecision = AccessDecision.DENY
    priority: int = 100
    enabled: bool = True

@dataclass
class ThreatEvent:
    event_id: str
    source_ip: str
    event_type: str
    risk_level: RiskLevel
    description: str
    timestamp: datetime
    mitigated: bool = False

print("Layer 1: Entities defined")
print("  - User, Device, AccessRequest")
print("  - SecurityPolicy, ThreatEvent")
print("  - RiskLevel, AccessDecision")

Use Cases และ Repository Interfaces

# === Layer 2: Use Cases ===
from abc import ABC, abstractmethod
from typing import List, Optional

# --- Interfaces (Ports) ---

class PolicyRepository(ABC):
    """Interface สำหรับ Policy Storage"""
    @abstractmethod
    def get_policies(self, destination: str) -> List[SecurityPolicy]:
        pass

    @abstractmethod
    def save_policy(self, policy: SecurityPolicy) -> bool:
        pass

class ThreatDetector(ABC):
    """Interface สำหรับ Threat Detection"""
    @abstractmethod
    def analyze(self, request: AccessRequest) -> Optional[ThreatEvent]:
        pass

class AccessGateway(ABC):
    """Interface สำหรับ Access Enforcement"""
    @abstractmethod
    def enforce(self, request: AccessRequest, decision: AccessDecision) -> bool:
        pass

class AuditLogger(ABC):
    """Interface สำหรับ Audit Logging"""
    @abstractmethod
    def log_access(self, request: AccessRequest, decision: AccessDecision, reason: str):
        pass

# --- Use Cases ---

class EvaluateAccessUseCase:
    """Use Case: ประเมิน Access Request"""

    def __init__(self, policy_repo: PolicyRepository,
                 threat_detector: ThreatDetector,
                 access_gateway: AccessGateway,
                 audit_logger: AuditLogger):
        self.policy_repo = policy_repo
        self.threat_detector = threat_detector
        self.gateway = access_gateway
        self.audit = audit_logger

    def execute(self, request: AccessRequest) -> AccessDecision:
        """ประเมินและตัดสินใจ Access"""

        # 1. ตรวจ Threat
        threat = self.threat_detector.analyze(request)
        if threat and threat.risk_level.value >= RiskLevel.HIGH.value:
            self.audit.log_access(request, AccessDecision.DENY,
                                  f"Threat detected: {threat.event_type}")
            self.gateway.enforce(request, AccessDecision.DENY)
            return AccessDecision.DENY

        # 2. ตรวจ Device Compliance
        if not request.device.compliant:
            self.audit.log_access(request, AccessDecision.DENY,
                                  "Device not compliant")
            self.gateway.enforce(request, AccessDecision.DENY)
            return AccessDecision.DENY

        # 3. ตรวจ Policies
        policies = self.policy_repo.get_policies(request.destination)
        for policy in sorted(policies, key=lambda p: p.priority):
            if not policy.enabled:
                continue
            if self._matches(request, policy):
                decision = policy.action
                self.audit.log_access(request, decision,
                                      f"Policy: {policy.name}")
                self.gateway.enforce(request, decision)
                return decision

        # 4. Default: ต้องมี MFA
        if not request.user.mfa_enabled:
            self.audit.log_access(request, AccessDecision.CHALLENGE,
                                  "MFA required")
            return AccessDecision.CHALLENGE

        # 5. Default Allow
        self.audit.log_access(request, AccessDecision.ALLOW, "Default allow")
        self.gateway.enforce(request, AccessDecision.ALLOW)
        return AccessDecision.ALLOW

    def _matches(self, request, policy):
        """ตรวจสอบว่า Request ตรงกับ Policy Conditions"""
        conditions = policy.conditions
        if "department" in conditions:
            if request.user.department not in conditions["department"]:
                return False
        if "managed_device" in conditions:
            if request.device.managed != conditions["managed_device"]:
                return False
        if "min_risk_score" in conditions:
            if request.user.risk_score < conditions["min_risk_score"]:
                return False
        return True

# === Unit Test ===
class MockPolicyRepo(PolicyRepository):
    def __init__(self, policies=None):
        self.policies = policies or []
    def get_policies(self, destination):
        return self.policies
    def save_policy(self, policy):
        self.policies.append(policy)
        return True

class MockThreatDetector(ThreatDetector):
    def __init__(self, threat=None):
        self.threat = threat
    def analyze(self, request):
        return self.threat

class MockGateway(AccessGateway):
    def __init__(self):
        self.last_decision = None
    def enforce(self, request, decision):
        self.last_decision = decision
        return True

class MockAuditLogger(AuditLogger):
    def __init__(self):
        self.logs = []
    def log_access(self, request, decision, reason):
        self.logs.append({"decision": decision, "reason": reason})

# Test
user = User("u1", "alice@example.com", "engineering", "developer", mfa_enabled=True)
device = Device("d1", "macOS", "14.0", compliant=True, managed=True)
req = AccessRequest(user, device, "10.0.0.1", "api.internal", datetime.now())

policy = SecurityPolicy("p1", "Allow Engineering",
    conditions={"department": ["engineering"]},
    action=AccessDecision.ALLOW, priority=1)

repo = MockPolicyRepo([policy])
detector = MockThreatDetector()
gateway = MockGateway()
logger = MockAuditLogger()

uc = EvaluateAccessUseCase(repo, detector, gateway, logger)
result = uc.execute(req)
print(f"\nAccess Decision: {result.value}")
print(f"Audit Logs: {len(logger.logs)}")

Adapters และ Infrastructure

# === Layer 3 & 4: Adapters + Infrastructure ===

# --- Cloudflare Gateway Adapter ---
class CloudflareAccessGateway(AccessGateway):
    """Adapter สำหรับ Cloudflare Zero Trust"""

    def __init__(self, api_token, account_id):
        self.api_token = api_token
        self.account_id = account_id
        self.base_url = f"https://api.cloudflare.com/client/v4/accounts/{account_id}"

    def enforce(self, request, decision):
        """Enforce ผ่าน Cloudflare Access"""
        import requests
        headers = {"Authorization": f"Bearer {self.api_token}"}

        if decision == AccessDecision.DENY:
            # Block IP
            payload = {
                "mode": "block",
                "configuration": {
                    "target": "ip",
                    "value": request.source_ip,
                },
                "notes": f"Blocked by SASE policy for {request.user.email}",
            }
            resp = requests.post(
                f"{self.base_url}/firewall/access_rules/rules",
                headers=headers, json=payload,
            )
            return resp.status_code == 200
        return True

# --- PostgreSQL Policy Repository ---
class PostgresPolicyRepository(PolicyRepository):
    """Adapter สำหรับ PostgreSQL"""

    def __init__(self, connection_string):
        import psycopg2
        self.conn = psycopg2.connect(connection_string)

    def get_policies(self, destination):
        cur = self.conn.cursor()
        cur.execute(
            "SELECT * FROM security_policies WHERE destination = %s AND enabled = true ORDER BY priority",
            (destination,)
        )
        rows = cur.fetchall()
        return [self._row_to_policy(r) for r in rows]

    def save_policy(self, policy):
        cur = self.conn.cursor()
        cur.execute(
            "INSERT INTO security_policies (policy_id, name, conditions, action, priority, enabled) VALUES (%s, %s, %s, %s, %s, %s)",
            (policy.policy_id, policy.name, str(policy.conditions), policy.action.value, policy.priority, policy.enabled)
        )
        self.conn.commit()
        return True

    def _row_to_policy(self, row):
        return SecurityPolicy(
            policy_id=row[0], name=row[1],
            conditions=eval(row[2]), action=AccessDecision(row[3]),
            priority=row[4], enabled=row[5],
        )

# --- FastAPI Controller ---
# from fastapi import FastAPI, HTTPException
# app = FastAPI()
#
# @app.post("/evaluate")
# async def evaluate_access(request_data: dict):
#     # Build AccessRequest from request_data
#     # Call EvaluateAccessUseCase
#     # Return decision
#     pass
#
# @app.get("/policies")
# async def list_policies(destination: str):
#     policies = policy_repo.get_policies(destination)
#     return {"policies": [p.__dict__ for p in policies]}

print("\nClean Architecture Layers:")
print("  Layer 1 (Entities): User, Device, Policy, Threat")
print("  Layer 2 (Use Cases): EvaluateAccess, DetectThreat")
print("  Layer 3 (Adapters): Cloudflare, Zscaler, PostgreSQL")
print("  Layer 4 (Frameworks): FastAPI, PostgreSQL Driver")

Best Practices

Clean Architecture คืออะไร

หลักการออกแบบซอฟต์แวร์ของ Uncle Bob แบ่งเป็น Layers แยก Concerns Entities Use Cases Interface Adapters Frameworks Dependencies ชี้เข้าด้านในเสมอ

SASE กับ Clean Architecture เกี่ยวกันอย่างไร

Clean Architecture ช่วยออกแบบ SASE ให้ Maintainable Testable แยก Security Logic ออกจาก Infrastructure เปลี่ยน Provider ได้ไม่กระทบ Business Logic ทดสอบ Policies ไม่ต้องมี Infrastructure จริง

Dependency Rule คืออะไร

กฎสำคัญกำหนดว่า Dependencies ชี้จาก Layer นอกเข้า Layer ใน Frameworks ขึ้นกับ Adapters, Adapters ขึ้นกับ Use Cases, Use Cases ขึ้นกับ Entities Layer ในไม่รู้จัก Layer นอก

วิธีทดสอบ Security Policies ทำอย่างไร

แยก Security Logic เป็น Use Cases ทดสอบด้วย Unit Tests Mock Repository Interfaces ทดสอบ Policy Evaluation Access Control Threat Detection แยกจาก Network Database External APIs

สรุป

SASE Security ร่วมกับ Clean Architecture ให้ระบบที่ Maintainable Testable และ Provider Agnostic แบ่งเป็น 4 Layers ตาม Dependency Rule ใช้ Interfaces เชื่อม Layers ทดสอบ Use Cases ด้วย Mock เปลี่ยน SASE Provider โดยแก้แค่ Adapter Layer ใช้ Domain Events สื่อสารระหว่าง Use Cases

📖 บทความที่เกี่ยวข้อง

AWS Glue ETL อ่านบทความ → Elasticsearch OpenSearch อ่านบทความ → Helm Chart Template อ่านบทความ → Libvirt KVM อ่านบทความ → Embedding Model อ่านบทความ →

📚 ดูบทความทั้งหมด →