ai

Better Uptime การใช้งาน 2026 — ระบบ Monitoring

Better Uptime การใช้งาน 2026 — ระบบ Monitoring
SASE Security กับ Clean Architecture — วิธีออกแบบระบบ Security ด้วย SASE และ Clean Architecture | SiamCafe Blog เรียนรู้การออกแบบระบบ Security ด้วย SASE Framework ร่วมกับ Clean Architecture Principles ตั้งแต่ Layer Design, Security Policies, Use Cases ไปจนถึง Implementation พร้อม Code จริง FAQ_Q:Clean Architecture คืออะไร FAQ_A:Clean Architecture เป็นหลักการออกแบบซอฟต์แวร์ของ Robert C. Martin (Uncle Bob) แบ่งระบบเป็น Layers ที่แยก Concerns ชัดเจน Entities (Business Rules), Use Cases (Application Logic), Interface Adapters (Controllers, Presenters), Frameworks (UI, Database) Dependencies ชี้เข้าด้านในเสมอ FAQ_Q:SASE กับ Clean Architecture เกี่ยวกันอย่างไร FAQ_A:Clean Architecture ช่วยออกแบบระบบ SASE ให้ Maintainable และ Testable แยก Security Logic (Use Cases) ออกจาก Infrastructure (Frameworks) เปลี่ยน SASE Provider ได้โดยไม่กระทบ Business Logic ทดสอบ Security Policies ได้โดยไม่ต้องมี Infrastructure จริง FAQ_Q:Dependency Rule คืออะไร FAQ_A:Dependency Rule เป็นกฎสำคัญของ Clean Architecture กำหนดว่า Dependencies ต้องชี้จาก Layer นอกเข้า Layer ใน เช่น Frameworks ขึ้นกับ Interface Adapters, Interface Adapters ขึ้นกับ Use Cases, Use Cases ขึ้นกับ Entities Layer ในไม่รู้จัก Layer นอก FAQ_Q: วิธีทดสอบ Security Policies ทำอย่างไร FAQ_A: ใช้ Clean Architecture แยก Security Logic เป็น Use Cases ทดสอบด้วย Unit Tests โดยไม่ต้องมี Infrastructure จริง Mock Repository Interfaces ทดสอบ Policy Evaluation, Access Control, Threat Detection แยกจาก Network, Database, External APIs BODY_START

SASE Security และ Clean Architecture

Better Uptime การใช้งาน 2026 — ระบบ Monitoring

SASE (Secure Access Service Edge) เป็น Framework ที่รวม Network และ Security Services เมื่อออกแบบด้วย Clean Architecture ได้ระบบที่ Maintainable, Testable และเปลี่ยน Provider ได้ง่าย แยก Business Logic ออกจาก Infrastructure ชัดเจน

เนื้อหาเกี่ยวข้อง — บทความที่เกี่ยวข้อง: Voice Cloning Service Mesh Setup

Clean Architecture แบ่งระบบเป็น 4 Layers หลัก โดย Dependencies ชี้เข้าด้านในเสมอ (Dependency Rule) ทำให้ทดสอบ Security Policies ได้โดยไม่ต้องมี Infrastructure จริง

เนื้อหาเกี่ยวข้อง — ทำความเข้าใจ LocalAI Self-hosted Business Continuity

Clean Architecture Layers สำหรับ SASE

# === Clean Architecture สำหรับ SASE Security ===

# โครงสร้าง Project



# sase_security/

# ├── domain/                    # Layer 1: Entities

# │   ├── entities/

# │   │   ├── user.py

# │   │   ├── device.py

# │   │   ├── policy.py

# │   │   └── threat.py

# │   └── value_objects/

# │       ├── ip_address.py

# │       └── risk_score.py

# │

# ├── use_cases/                 # Layer 2: Use Cases

# │   ├── evaluate_access.py

# │   ├── detect_threat.py

# │   ├── enforce_policy.py

# │   └── interfaces/

# │       ├── policy_repository.py

# │       ├── threat_detector.py

# │       └── access_gateway.py

# │

# ├── adapters/                  # Layer 3: Interface Adapters

# │   ├── controllers/

# │   │   ├── access_controller.py

# │   │   └── policy_controller.py

# │   ├── presenters/

# │   │   └── security_dashboard.py

# │   └── gateways/

# │       ├── cloudflare_gateway.py

# │       └── zscaler_gateway.py

# │

# └── infrastructure/            # Layer 4: Frameworks

#     ├── api/

#     │   └── fastapi_app.py

#     ├── database/

#     │   └── postgres_repository.py

#     └── external/

#         ├── cloudflare_client.py

#         └── zscaler_client.py



# === Layer 1: Entities (Domain) ===



from dataclasses import dataclass, field

from typing import List, Optional

from enum import Enum

from datetime import datetime



class RiskLevel(Enum):

    LOW = 1

    MEDIUM = 2

    HIGH = 3

    CRITICAL = 4



class AccessDecision(Enum):

    ALLOW = "allow"

    DENY = "deny"

    CHALLENGE = "challenge"  # MFA Required



@dataclass

class User:

    user_id: str

    email: str

    department: str

    role: str

    mfa_enabled: bool = False

    risk_score: float = 0.0



@dataclass

class Device:

    device_id: str

    os: str

    os_version: str

    compliant: bool = True

    managed: bool = True

    last_seen: Optional[datetime] = None



@dataclass

class AccessRequest:

    user: User

    device: Device

    source_ip: str

    destination: str

    timestamp: datetime

    location: str = ""

    protocol: str = "HTTPS"



@dataclass

class SecurityPolicy:

    policy_id: str

    name: str

    conditions: dict = field(default_factory=dict)

    action: AccessDecision = AccessDecision.DENY

    priority: int = 100

    enabled: bool = True



@dataclass

class ThreatEvent:

    event_id: str

    source_ip: str

    event_type: str

    risk_level: RiskLevel

    description: str

    timestamp: datetime

    mitigated: bool = False



print("Layer 1: Entities defined")

print("  - User, Device, AccessRequest")

print("  - SecurityPolicy, ThreatEvent")

print("  - RiskLevel, AccessDecision")

Use Cases และ Repository Interfaces

Better Uptime การใช้งาน 2026 — ระบบ Monitoring
# === Layer 2: Use Cases ===

from abc import ABC, abstractmethod

from typing import List, Optional



# --- Interfaces (Ports) ---



class PolicyRepository(ABC):

    """Interface สำหรับ Policy Storage"""

    @abstractmethod

    def get_policies(self, destination: str) -> List[SecurityPolicy]:

        pass



    @abstractmethod

    def save_policy(self, policy: SecurityPolicy) -> bool:

        pass



class ThreatDetector(ABC):

    """Interface สำหรับ Threat Detection"""

    @abstractmethod

    def analyze(self, request: AccessRequest) -> Optional[ThreatEvent]:

        pass



class AccessGateway(ABC):

    """Interface สำหรับ Access Enforcement"""

    @abstractmethod

    def enforce(self, request: AccessRequest, decision: AccessDecision) -> bool:

        pass



class AuditLogger(ABC):

    """Interface สำหรับ Audit Logging"""

    @abstractmethod

    def log_access(self, request: AccessRequest, decision: AccessDecision, reason: str):

        pass



# --- Use Cases ---



class EvaluateAccessUseCase:

    """Use Case: ประเมิน Access Request"""



    def __init__(self, policy_repo: PolicyRepository,

                 threat_detector: ThreatDetector,

                 access_gateway: AccessGateway,

                 audit_logger: AuditLogger):

        self.policy_repo = policy_repo

        self.threat_detector = threat_detector

        self.gateway = access_gateway

        self.audit = audit_logger



    def execute(self, request: AccessRequest) -> AccessDecision:

        """ประเมินและตัดสินใจ Access"""



        # 1. ตรวจ Threat

        threat = self.threat_detector.analyze(request)

        if threat and threat.risk_level.value >= RiskLevel.HIGH.value:

            self.audit.log_access(request, AccessDecision.DENY,

                                  f"Threat detected: {threat.event_type}")

            self.gateway.enforce(request, AccessDecision.DENY)

            return AccessDecision.DENY



        # 2. ตรวจ Device Compliance

        if not request.device.compliant:

            self.audit.log_access(request, AccessDecision.DENY,

                                  "Device not compliant")

            self.gateway.enforce(request, AccessDecision.DENY)

            return AccessDecision.DENY



        # 3. ตรวจ Policies

        policies = self.policy_repo.get_policies(request.destination)

        for policy in sorted(policies, key=lambda p: p.priority):

            if not policy.enabled:

                continue

            if self._matches(request, policy):

                decision = policy.action

                self.audit.log_access(request, decision,

                                      f"Policy: {policy.name}")

                self.gateway.enforce(request, decision)

                return decision



        # 4. Default: ต้องมี MFA

        if not request.user.mfa_enabled:

            self.audit.log_access(request, AccessDecision.CHALLENGE,

                                  "MFA required")

            return AccessDecision.CHALLENGE



        # 5. Default Allow

        self.audit.log_access(request, AccessDecision.ALLOW, "Default allow")

        self.gateway.enforce(request, AccessDecision.ALLOW)

        return AccessDecision.ALLOW



    def _matches(self, request, policy):

        """ตรวจสอบว่า Request ตรงกับ Policy Conditions"""

        conditions = policy.conditions

        if "department" in conditions:

            if request.user.department not in conditions["department"]:

                return False

        if "managed_device" in conditions:

            if request.device.managed != conditions["managed_device"]:

                return False

        if "min_risk_score" in conditions:

            if request.user.risk_score < conditions["min_risk_score"]:

                return False

        return True



# === Unit Test ===

class MockPolicyRepo(PolicyRepository):

    def __init__(self, policies=None):

        self.policies = policies or []

    def get_policies(self, destination):

        return self.policies

    def save_policy(self, policy):

        self.policies.append(policy)

        return True



class MockThreatDetector(ThreatDetector):

    def __init__(self, threat=None):

        self.threat = threat

    def analyze(self, request):

        return self.threat



class MockGateway(AccessGateway):

    def __init__(self):

        self.last_decision = None

    def enforce(self, request, decision):

        self.last_decision = decision

        return True



class MockAuditLogger(AuditLogger):

    def __init__(self):

        self.logs = []

    def log_access(self, request, decision, reason):

        self.logs.append({"decision": decision, "reason": reason})



# Test

user = User("u1", "alice@example.com", "engineering", "developer", mfa_enabled=True)

device = Device("d1", "macOS", "14.0", compliant=True, managed=True)

req = AccessRequest(user, device, "10.0.0.1", "api.internal", datetime.now())



policy = SecurityPolicy("p1", "Allow Engineering",

    conditions={"department": ["engineering"]},

    action=AccessDecision.ALLOW, priority=1)



repo = MockPolicyRepo([policy])

detector = MockThreatDetector()

gateway = MockGateway()

logger = MockAuditLogger()



uc = EvaluateAccessUseCase(repo, detector, gateway, logger)

result = uc.execute(req)

print(f"\nAccess Decision: {result.value}")

print(f"Audit Logs: {len(logger.logs)}")

Adapters และ Infrastructure

# === Layer 3 & 4: Adapters + Infrastructure ===



# --- Cloudflare Gateway Adapter ---

class CloudflareAccessGateway(AccessGateway):

    """Adapter สำหรับ Cloudflare Zero Trust"""



    def __init__(self, api_token, account_id):

        self.api_token = api_token

        self.account_id = account_id

        self.base_url = f"https://api.cloudflare.com/client/v4/accounts/{account_id}"



    def enforce(self, request, decision):

        """Enforce ผ่าน Cloudflare Access"""

        import requests

        headers = {"Authorization": f"Bearer {self.api_token}"}



        if decision == AccessDecision.DENY:

            # Block IP

            payload = {

                "mode": "block",

                "configuration": {

                    "target": "ip",

                    "value": request.source_ip,

                },

                "notes": f"Blocked by SASE policy for {request.user.email}",

            }

            resp = requests.post(

                f"{self.base_url}/firewall/access_rules/rules",

                headers=headers, json=payload,

            )

            return resp.status_code == 200

        return True



# --- PostgreSQL Policy Repository ---

class PostgresPolicyRepository(PolicyRepository):

    """Adapter สำหรับ PostgreSQL"""



    def __init__(self, connection_string):

        import psycopg2

        self.conn = psycopg2.connect(connection_string)



    def get_policies(self, destination):

        cur = self.conn.cursor()

        cur.execute(

            "SELECT * FROM security_policies WHERE destination = %s AND enabled = true ORDER BY priority",

            (destination,)

        )

        rows = cur.fetchall()

        return [self._row_to_policy(r) for r in rows]



    def save_policy(self, policy):

        cur = self.conn.cursor()

        cur.execute(

            "INSERT INTO security_policies (policy_id, name, conditions, action, priority, enabled) VALUES (%s, %s, %s, %s, %s, %s)",

            (policy.policy_id, policy.name, str(policy.conditions), policy.action.value, policy.priority, policy.enabled)

        )

        self.conn.commit()

        return True



    def _row_to_policy(self, row):

        return SecurityPolicy(

            policy_id=row[0], name=row[1],

            conditions=eval(row[2]), action=AccessDecision(row[3]),

            priority=row[4], enabled=row[5],

        )



# --- FastAPI Controller ---

# from fastapi import FastAPI, HTTPException

# app = FastAPI()

#

# @app.post("/evaluate")

# async def evaluate_access(request_data: dict):

#     # Build AccessRequest from request_data

#     # Call EvaluateAccessUseCase

#     # Return decision

#     pass

#

# @app.get("/policies")

# async def list_policies(destination: str):

#     policies = policy_repo.get_policies(destination)

#     return {"policies": [p.__dict__ for p in policies]}



print("\nClean Architecture Layers:")

print("  Layer 1 (Entities): User, Device, Policy, Threat")

print("  Layer 2 (Use Cases): EvaluateAccess, DetectThreat")

print("  Layer 3 (Adapters): Cloudflare, Zscaler, PostgreSQL")

print("  Layer 4 (Frameworks): FastAPI, PostgreSQL Driver")

Best Practices

  • Dependency Rule: Dependencies ชี้เข้าด้านในเสมอ Layer ในไม่รู้จัก Layer นอก
  • Interfaces: ใช้ Abstract Classes (Interfaces) เชื่อม Layers ทำให้เปลี่ยน Implementation ได้
  • Unit Testing: ทดสอบ Use Cases ด้วย Mock Repositories ไม่ต้องมี Infrastructure จริง
  • Provider Agnostic: เปลี่ยน SASE Provider (Cloudflare → Zscaler) โดยแก้แค่ Adapter Layer
  • Domain Events: ใช้ Events สื่อสารระหว่าง Use Cases แทน Direct Coupling
  • Separation of Concerns: แยก Security Logic ออกจาก Framework Code ชัดเจน

Clean Architecture คืออะไร

หลักการออกแบบซอฟต์แวร์ของ Uncle Bob แบ่งเป็น Layers แยก Concerns Entities Use Cases Interface Adapters Frameworks Dependencies ชี้เข้าด้านในเสมอ

แนะนำเพิ่มเติม — iCafeForex

เนื้อหาเกี่ยวข้อง — ดูเพิ่มเติมเรื่อง TensorFlow Serving SSL TLS Certificate

XM Legend · เทรดเดอร์ & ผู้สอน Forex 13 ปี

ผู้ก่อตั้ง SiamCafe ตั้งแต่ปี 1997 · เทรดเดอร์สาย Forex มากกว่า 13 ปี ได้รับการยกย่องเป็น XM Legend · แบ่งปันความรู้ Forex, ไอที, AI และการเทรด จากประสบการณ์จริงในตลาดจริง