SiamCafe.net Blog
Cybersecurity

SASE Security Automation Script

sase security automation script
SASE Security Automation Script | SiamCafe Blog
2025-10-06· อ. บอม — SiamCafe.net· 9,620 คำ

SASE Security Automation

Security Automation ใช้ Scripts จัดการ SASE Policies อัตโนมัติ สร้าง Firewall Rules ตรวจ Compliance ตอบสนอง Threats ลดงาน Manual ลด Human Error

SOAR Platform รวม Security Tools สร้าง Playbooks อัตโนมัติ Incident Response เมื่อตรวจจับ Threat ให้ Block Isolate Alert อัตโนมัติ

Security Policy Automation

# sase_automation.py — SASE Security Policy Automation
# pip install requests pyyaml

import json
import time
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from datetime import datetime
from enum import Enum

class PolicyAction(Enum):
    ALLOW = "allow"
    BLOCK = "block"
    ALERT = "alert"
    QUARANTINE = "quarantine"

@dataclass
class SecurityPolicy:
    name: str
    description: str
    action: PolicyAction
    source: Dict  # {"groups": [...], "users": [...]}
    destination: Dict  # {"urls": [...], "categories": [...]}
    conditions: Dict = field(default_factory=dict)
    enabled: bool = True
    priority: int = 100

class SASEPolicyManager:
    """SASE Security Policy Manager"""

    def __init__(self, api_url, api_key):
        self.api_url = api_url
        self.api_key = api_key
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        }
        self.policies: List[SecurityPolicy] = []
        self.audit_log: List[Dict] = []

    def add_policy(self, policy: SecurityPolicy):
        self.policies.append(policy)
        self.log("CREATE", f"Policy: {policy.name}")

    def update_policy(self, name, **kwargs):
        for p in self.policies:
            if p.name == name:
                for k, v in kwargs.items():
                    setattr(p, k, v)
                self.log("UPDATE", f"Policy: {name} -> {kwargs}")
                return True
        return False

    def delete_policy(self, name):
        self.policies = [p for p in self.policies if p.name != name]
        self.log("DELETE", f"Policy: {name}")

    def log(self, action, detail):
        self.audit_log.append({
            "timestamp": datetime.now().isoformat(),
            "action": action,
            "detail": detail,
        })

    def deploy_policies(self):
        """Deploy Policies ไป SASE Platform"""
        print(f"\nDeploying {len(self.policies)} policies...")
        for p in self.policies:
            status = "enabled" if p.enabled else "disabled"
            print(f"  [{status:>8}] {p.name}: {p.action.value} "
                  f"(priority: {p.priority})")

        # API Call
        # for policy in self.policies:
        #     resp = requests.post(
        #         f"{self.api_url}/policies",
        #         headers=self.headers,
        #         json=policy_to_dict(policy),
        #     )
        print(f"  Deployed: {len(self.policies)} policies")

    def compliance_check(self):
        """ตรวจสอบ Compliance"""
        checks = [
            ("Block known malware domains", any(
                p.action == PolicyAction.BLOCK and "malware" in str(p.destination)
                for p in self.policies)),
            ("Block unauthorized cloud storage", any(
                p.action == PolicyAction.BLOCK and "cloud-storage" in str(p.destination)
                for p in self.policies)),
            ("MFA required for admin access", True),
            ("DLP policy for PII data", True),
            ("Logging enabled for all policies", True),
        ]

        print(f"\n  Compliance Check:")
        passed = sum(1 for _, ok in checks if ok)
        for check, ok in checks:
            status = "PASS" if ok else "FAIL"
            print(f"    [{status}] {check}")
        print(f"\n  Score: {passed}/{len(checks)} ({passed/len(checks)*100:.0f}%)")

# ตัวอย่าง
manager = SASEPolicyManager("https://api.sase-vendor.com/v1", "api-key")

policies = [
    SecurityPolicy(
        "Block Malware Domains", "Block known malware C2 domains",
        PolicyAction.BLOCK,
        {"groups": ["all-users"]},
        {"categories": ["malware", "phishing", "command-and-control"]},
        priority=10,
    ),
    SecurityPolicy(
        "Allow Corporate SaaS", "Allow approved SaaS applications",
        PolicyAction.ALLOW,
        {"groups": ["all-users"]},
        {"urls": ["*.google.com", "*.microsoft.com", "*.slack.com"]},
        priority=50,
    ),
    SecurityPolicy(
        "Block Unauthorized Storage", "Block non-approved cloud storage",
        PolicyAction.BLOCK,
        {"groups": ["all-users"]},
        {"categories": ["cloud-storage"], "exclude": ["drive.google.com", "onedrive.live.com"]},
        priority=60,
    ),
    SecurityPolicy(
        "DLP PII Protection", "Block PII data upload",
        PolicyAction.BLOCK,
        {"groups": ["all-users"]},
        {"categories": ["file-sharing"]},
        conditions={"data_patterns": ["credit-card", "ssn", "passport"]},
        priority=20,
    ),
]

for p in policies:
    manager.add_policy(p)

manager.deploy_policies()
manager.compliance_check()

Threat Detection Automation

# threat_detection.py — Automated Threat Detection
from dataclasses import dataclass, field
from typing import List, Dict
from datetime import datetime
from enum import Enum

class ThreatSeverity(Enum):
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"

@dataclass
class ThreatIndicator:
    ioc_type: str  # ip, domain, hash, url
    value: str
    source: str
    severity: ThreatSeverity
    tags: List[str] = field(default_factory=list)

@dataclass
class ThreatAlert:
    indicator: ThreatIndicator
    matched_policy: str
    user: str
    timestamp: str
    action_taken: str

class ThreatDetectionEngine:
    """Automated Threat Detection Engine"""

    def __init__(self):
        self.indicators: List[ThreatIndicator] = []
        self.alerts: List[ThreatAlert] = []
        self.blocked_count = 0

    def load_threat_feeds(self):
        """โหลด Threat Intelligence Feeds"""
        feeds = [
            ThreatIndicator("domain", "malware-c2.evil.com", "AlienVault OTX",
                           ThreatSeverity.CRITICAL, ["malware", "c2"]),
            ThreatIndicator("ip", "198.51.100.1", "AbuseIPDB",
                           ThreatSeverity.HIGH, ["botnet"]),
            ThreatIndicator("domain", "phishing-bank.com", "PhishTank",
                           ThreatSeverity.HIGH, ["phishing"]),
            ThreatIndicator("hash", "d41d8cd98f00b204e9800998ecf8427e",
                           "VirusTotal", ThreatSeverity.CRITICAL, ["ransomware"]),
            ThreatIndicator("url", "http://exploit-kit.ru/payload", "Mandiant",
                           ThreatSeverity.CRITICAL, ["exploit"]),
        ]
        self.indicators.extend(feeds)
        print(f"  Loaded {len(feeds)} indicators from threat feeds")

    def check_traffic(self, traffic_logs):
        """ตรวจสอบ Traffic กับ Threat Indicators"""
        for log in traffic_logs:
            for indicator in self.indicators:
                if indicator.value in str(log.get("destination", "")):
                    alert = ThreatAlert(
                        indicator, "Threat Intelligence Match",
                        log.get("user", "unknown"),
                        datetime.now().isoformat(),
                        "BLOCKED",
                    )
                    self.alerts.append(alert)
                    self.blocked_count += 1

    def auto_respond(self, alert: ThreatAlert):
        """Auto-respond ตาม Severity"""
        actions = {
            ThreatSeverity.CRITICAL: [
                "Block IP/Domain immediately",
                "Isolate affected endpoint",
                "Create P1 incident ticket",
                "Notify SOC team via PagerDuty",
                "Capture forensic evidence",
            ],
            ThreatSeverity.HIGH: [
                "Block IP/Domain",
                "Create P2 incident ticket",
                "Notify SOC team via Slack",
            ],
            ThreatSeverity.MEDIUM: [
                "Add to watchlist",
                "Create investigation ticket",
            ],
        }

        severity_actions = actions.get(alert.indicator.severity, ["Log only"])
        return severity_actions

    def report(self):
        """Threat Detection Report"""
        print(f"\n{'='*55}")
        print(f"Threat Detection Report")
        print(f"{'='*55}")
        print(f"  Indicators: {len(self.indicators)}")
        print(f"  Alerts: {len(self.alerts)}")
        print(f"  Blocked: {self.blocked_count}")

        if self.alerts:
            print(f"\n  Recent Alerts:")
            for alert in self.alerts[-5:]:
                severity = alert.indicator.severity.value
                print(f"    [{severity:>8}] {alert.indicator.ioc_type}: "
                      f"{alert.indicator.value}")
                print(f"             User: {alert.user} | Action: {alert.action_taken}")

# ตัวอย่าง
engine = ThreatDetectionEngine()
engine.load_threat_feeds()

# Simulated traffic
traffic = [
    {"user": "alice", "destination": "malware-c2.evil.com", "port": 443},
    {"user": "bob", "destination": "google.com", "port": 443},
    {"user": "charlie", "destination": "phishing-bank.com", "port": 80},
]

engine.check_traffic(traffic)
engine.report()

Incident Response Playbook

# ir_playbook.py — Automated Incident Response Playbook
from dataclasses import dataclass, field
from typing import List, Callable
from datetime import datetime

@dataclass
class PlaybookStep:
    name: str
    description: str
    auto: bool  # True = อัตโนมัติ, False = ต้อง Approve
    timeout_seconds: int = 300

@dataclass
class Playbook:
    name: str
    trigger: str  # เงื่อนไข Trigger
    severity: str
    steps: List[PlaybookStep] = field(default_factory=list)

class SOAREngine:
    """SOAR (Security Orchestration, Automation and Response)"""

    def __init__(self):
        self.playbooks: List[Playbook] = []
        self.executions: List[Dict] = []

    def register_playbook(self, playbook: Playbook):
        self.playbooks.append(playbook)

    def execute_playbook(self, playbook_name, context):
        """Execute Playbook"""
        playbook = next((p for p in self.playbooks if p.name == playbook_name), None)
        if not playbook:
            print(f"Playbook not found: {playbook_name}")
            return

        print(f"\n  Executing Playbook: {playbook.name}")
        print(f"  Severity: {playbook.severity}")
        print(f"  Context: {context}")

        for i, step in enumerate(playbook.steps, 1):
            mode = "AUTO" if step.auto else "MANUAL"
            print(f"    Step {i}. [{mode}] {step.name}")
            print(f"       {step.description}")

    def list_playbooks(self):
        """List Playbooks"""
        print(f"\n{'='*55}")
        print(f"SOAR Playbooks")
        print(f"{'='*55}")
        for pb in self.playbooks:
            auto_steps = sum(1 for s in pb.steps if s.auto)
            total = len(pb.steps)
            print(f"  {pb.name}")
            print(f"    Trigger: {pb.trigger}")
            print(f"    Steps: {total} ({auto_steps} auto, {total-auto_steps} manual)")

# สร้าง Playbooks
soar = SOAREngine()

# Malware Detection Playbook
malware_pb = Playbook(
    "Malware Detection Response", "Malware detected by EDR", "critical",
    steps=[
        PlaybookStep("Isolate Endpoint", "Isolate infected host from network", True),
        PlaybookStep("Block C2 Domain", "Add C2 domain to blocklist", True),
        PlaybookStep("Capture Memory Dump", "Dump RAM for forensic analysis", True),
        PlaybookStep("Create Incident Ticket", "Create P1 ticket in Jira", True),
        PlaybookStep("Notify SOC Team", "Send alert via PagerDuty + Slack", True),
        PlaybookStep("Investigate Scope", "Check lateral movement", False),
        PlaybookStep("Remediate", "Remove malware, patch vulnerability", False),
        PlaybookStep("Restore from Backup", "Restore clean system image", False),
    ],
)

# Phishing Response Playbook
phishing_pb = Playbook(
    "Phishing Email Response", "Phishing email reported by user", "high",
    steps=[
        PlaybookStep("Extract IOCs", "Extract URLs, domains, hashes from email", True),
        PlaybookStep("Check Threat Intel", "Query VirusTotal, URLScan", True),
        PlaybookStep("Block URLs", "Add malicious URLs to blocklist", True),
        PlaybookStep("Search Mailboxes", "Find same email in other mailboxes", True),
        PlaybookStep("Delete Emails", "Remove phishing emails from all mailboxes", True),
        PlaybookStep("Notify Users", "Warn affected users", True),
        PlaybookStep("Update Training", "Add to phishing simulation", False),
    ],
)

soar.register_playbook(malware_pb)
soar.register_playbook(phishing_pb)
soar.list_playbooks()
soar.execute_playbook("Malware Detection Response", {"host": "WS-001", "malware": "LockBit"})

Best Practices

SASE Security Automation คืออะไร

ใช้ Scripts Tools อัตโนมัติจัดการ SASE Policies Firewall Rules Compliance Threats ลดงาน Manual Human Error APIs Zscaler Palo Alto Prisma

ทำไมต้อง Automate Security

Threats เพิ่มเร็ว Automation ตอบสนอง Incidents วินาที ลด Human Error Policies สม่ำเสมอ Compliance Checks อัตโนมัติ Audit Reports ทันที

SOAR คืออะไร

Security Orchestration Automation Response Platform รวม Security Tools Playbooks อัตโนมัติ Incident Response Malware Block Isolate Ticket Alert XSOAR Splunk SOAR

วิธีเริ่มทำ Security Automation อย่างไร

เริ่มจาก Tasks ซ้ำบ่อย Log Review Policy Updates Compliance Python APIs Security Tools Scripts ทีละตัว Test Lab Git จัดการ Code ง่ายไปยาก

สรุป

SASE Security Automation ลดงาน Manual ลด Human Error ตอบสนอง Threats เร็ว Policy Management อัตโนมัติ Threat Detection ด้วย Threat Intelligence Feeds SOAR Playbooks สำหรับ Incident Response เริ่มจากง่ายทดสอบ Lab ก่อน Production

📖 บทความที่เกี่ยวข้อง

Prometheus Alertmanager Automation Scriptอ่านบทความ → SASE Security Distributed Systemอ่านบทความ → Great Expectations Automation Scriptอ่านบทความ → SASE Security Low Code No Codeอ่านบทความ → Embedding Model Compliance Automationอ่านบทความ →

📚 ดูบทความทั้งหมด →