SASE Security Automation
Security Automation คือการใช้ Scripts จัดการ SASE Policies โดยอัตโนมัติ ทั้งการสร้าง Firewall Rules การตรวจ Compliance และการตอบสนองต่อ Threats ช่วยลดงาน Manual และลด Human Error
SOAR Platform รวม Security Tools เข้าด้วยกันและสร้าง Playbooks สำหรับ Incident Response อัตโนมัติ เมื่อตรวจจับ Threat ได้ ระบบจะ Block, Isolate และ Alert โดยอัตโนมัติ
Security Policy Automation
# sase_automation.py — SASE Security Policy Automation
# pip install requests pyyaml
import json
import time
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from datetime import datetime
from enum import Enum
class PolicyAction(Enum):
    """Action a security policy applies to matching traffic.

    The member values are the lowercase strings printed by
    ``SASEPolicyManager.deploy_policies`` via ``action.value``.
    """

    ALLOW = "allow"
    BLOCK = "block"
    ALERT = "alert"
    QUARANTINE = "quarantine"
@dataclass
class SecurityPolicy:
    """One SASE policy: an action applied from a source to a destination.

    ``conditions`` gets a fresh dict per instance (``default_factory``) so
    policies never share mutable state.
    """

    name: str
    description: str
    action: PolicyAction
    # Who the policy applies to, e.g. {"groups": [...], "users": [...]}
    source: Dict
    # What the policy targets, e.g. {"urls": [...], "categories": [...]}
    destination: Dict
    # Optional extra match criteria, e.g. {"data_patterns": [...]}
    conditions: Dict = field(default_factory=dict)
    enabled: bool = True
    priority: int = 100
class SASEPolicyManager:
    """In-memory manager for SASE security policies with an audit trail.

    Policies live in ``self.policies``; every create, update, delete and
    deploy action is appended to ``self.audit_log``.  This class performs no
    network I/O: ``deploy_policies`` only prints what would be deployed.
    """

    def __init__(self, api_url, api_key):
        """Store the API settings and start with no policies.

        Args:
            api_url: Base URL of the SASE platform API.
            api_key: Token placed in the ``Authorization: Bearer`` header.
        """
        self.api_url = api_url
        self.api_key = api_key
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        }
        self.policies: List["SecurityPolicy"] = []
        self.audit_log: List[Dict] = []

    def add_policy(self, policy: "SecurityPolicy"):
        """Append ``policy`` to the managed set and record a CREATE entry."""
        self.policies.append(policy)
        self.log("CREATE", f"Policy: {policy.name}")

    def update_policy(self, name, **kwargs):
        """Update attributes of the first policy called ``name``.

        Args:
            name: Name of the policy to update.
            **kwargs: Attribute names and their new values.

        Returns:
            True if a policy was found and updated, False otherwise.

        Raises:
            AttributeError: If a keyword is not an existing attribute of the
                policy.  Nothing is modified in that case, so a typo cannot
                silently create a stray attribute.
        """
        for policy in self.policies:
            if policy.name != name:
                continue
            # Validate every key before mutating so the update is all-or-nothing.
            unknown = [key for key in kwargs if not hasattr(policy, key)]
            if unknown:
                raise AttributeError(
                    f"Unknown policy attribute(s): {', '.join(unknown)}"
                )
            for key, value in kwargs.items():
                setattr(policy, key, value)
            self.log("UPDATE", f"Policy: {name} -> {kwargs}")
            return True
        return False

    def delete_policy(self, name):
        """Remove every policy called ``name``.

        Returns:
            True if at least one policy was removed, False if none matched.
            A DELETE audit entry is written only when something was removed.
        """
        remaining = [p for p in self.policies if p.name != name]
        if len(remaining) == len(self.policies):
            return False
        self.policies = remaining
        self.log("DELETE", f"Policy: {name}")
        return True

    def log(self, action, detail):
        """Append a timestamped entry to the audit log."""
        self.audit_log.append({
            "timestamp": datetime.now().isoformat(),
            "action": action,
            "detail": detail,
        })

    def deploy_policies(self):
        """Print the policies that would be deployed and audit the action.

        TODO: the real HTTP call is not implemented; each policy would be
        POSTed to ``{self.api_url}/policies`` with ``self.headers``.
        """
        print(f"\nDeploying {len(self.policies)} policies...")
        for p in self.policies:
            status = "enabled" if p.enabled else "disabled"
            print(f" [{status:>8}] {p.name}: {p.action.value} "
                  f"(priority: {p.priority})")
        print(f" Deployed: {len(self.policies)} policies")
        self.log("DEPLOY", f"Policies: {len(self.policies)}")

    def compliance_check(self):
        """Print a compliance report and return the individual results.

        A "block" check passes only when an *enabled* BLOCK policy lists the
        category under ``destination["categories"]``.  Disabled policies and
        incidental mentions (for example in an ``exclude`` list) do not count.

        Returns:
            Dict mapping each check description to True (pass) or False.
        """
        def blocks_category(category):
            return any(
                p.enabled
                and p.action == PolicyAction.BLOCK
                and category in (p.destination.get("categories") or ())
                for p in self.policies
            )

        checks = [
            ("Block known malware domains", blocks_category("malware")),
            ("Block unauthorized cloud storage", blocks_category("cloud-storage")),
            # NOTE: the remaining checks are hard-coded placeholders; they are
            # not derived from the managed policies.
            ("MFA required for admin access", True),
            ("DLP policy for PII data", True),
            ("Logging enabled for all policies", True),
        ]
        print(f"\n Compliance Check:")
        passed = sum(1 for _, ok in checks if ok)
        for check, ok in checks:
            status = "PASS" if ok else "FAIL"
            print(f" [{status}] {check}")
        print(f"\n Score: {passed}/{len(checks)} ({passed/len(checks)*100:.0f}%)")
        return dict(checks)
# Example usage
# NOTE: "api-key" is a placeholder; real credentials should come from the
# environment or a secret store, not from source code.
manager = SASEPolicyManager("https://api.sase-vendor.com/v1", "api-key")
policies = [
    SecurityPolicy(
        "Block Malware Domains", "Block known malware C2 domains",
        PolicyAction.BLOCK,
        {"groups": ["all-users"]},
        {"categories": ["malware", "phishing", "command-and-control"]},
        priority=10,
    ),
    SecurityPolicy(
        "Allow Corporate SaaS", "Allow approved SaaS applications",
        PolicyAction.ALLOW,
        {"groups": ["all-users"]},
        {"urls": ["*.google.com", "*.microsoft.com", "*.slack.com"]},
        priority=50,
    ),
    SecurityPolicy(
        "Block Unauthorized Storage", "Block non-approved cloud storage",
        PolicyAction.BLOCK,
        {"groups": ["all-users"]},
        {"categories": ["cloud-storage"], "exclude": ["drive.google.com", "onedrive.live.com"]},
        priority=60,
    ),
    SecurityPolicy(
        "DLP PII Protection", "Block PII data upload",
        PolicyAction.BLOCK,
        {"groups": ["all-users"]},
        {"categories": ["file-sharing"]},
        conditions={"data_patterns": ["credit-card", "ssn", "passport"]},
        priority=20,
    ),
]
# Register every policy (one CREATE audit entry each), then deploy and audit.
for p in policies:
    manager.add_policy(p)
manager.deploy_policies()
manager.compliance_check()
Threat Detection Automation
# threat_detection.py — Automated Threat Detection
from dataclasses import dataclass, field
from typing import List, Dict
from datetime import datetime
from enum import Enum
class ThreatSeverity(Enum):
    """Severity level attached to a threat indicator.

    ``ThreatDetectionEngine.auto_respond`` selects its response actions by
    this value; the report prints ``severity.value``.
    """

    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"
@dataclass
class ThreatIndicator:
    """A single indicator of compromise (IOC) taken from a threat feed."""

    # Kind of indicator: "ip", "domain", "hash" or "url"
    ioc_type: str
    # The indicator itself (address, host name, digest or URL)
    value: str
    # Name of the feed that supplied the indicator
    source: str
    severity: ThreatSeverity
    # Free-form labels; a fresh list per instance via default_factory
    tags: List[str] = field(default_factory=list)
@dataclass
class ThreatAlert:
    """Record of one traffic log entry that matched a threat indicator."""

    indicator: ThreatIndicator
    # Label of the rule that produced the alert
    matched_policy: str
    # User taken from the traffic log entry
    user: str
    # ISO-8601 string, as produced by datetime.isoformat()
    timestamp: str
    action_taken: str
class ThreatDetectionEngine:
    """Match traffic logs against threat indicators and collect alerts."""

    def __init__(self):
        """Start with no indicators, no alerts and a zero block counter."""
        self.indicators: List["ThreatIndicator"] = []
        self.alerts: List["ThreatAlert"] = []
        self.blocked_count = 0

    def load_threat_feeds(self):
        """Load the built-in sample threat-intelligence indicators."""
        feeds = [
            ThreatIndicator("domain", "malware-c2.evil.com", "AlienVault OTX",
                            ThreatSeverity.CRITICAL, ["malware", "c2"]),
            ThreatIndicator("ip", "198.51.100.1", "AbuseIPDB",
                            ThreatSeverity.HIGH, ["botnet"]),
            ThreatIndicator("domain", "phishing-bank.com", "PhishTank",
                            ThreatSeverity.HIGH, ["phishing"]),
            ThreatIndicator("hash", "d41d8cd98f00b204e9800998ecf8427e",
                            "VirusTotal", ThreatSeverity.CRITICAL, ["ransomware"]),
            ThreatIndicator("url", "http://exploit-kit.ru/payload", "Mandiant",
                            ThreatSeverity.CRITICAL, ["exploit"]),
        ]
        self.indicators.extend(feeds)
        print(f" Loaded {len(feeds)} indicators from threat feeds")

    @staticmethod
    def _indicator_matches(ioc_type, value, destination):
        """Return True when ``destination`` is covered by the indicator.

        Domain and IP indicators are compared against the destination's host
        part on label boundaries: ``evil.com`` matches ``evil.com`` and
        ``cdn.evil.com`` but not ``notevil.com``, and ``198.51.100.1`` does
        not match ``198.51.100.10``.  Other indicator types keep plain
        substring matching.  An empty indicator value never matches.
        """
        # Local import keeps this block self-contained.
        from urllib.parse import urlsplit

        needle = value.strip()
        if not needle:
            return False
        if ioc_type not in ("domain", "ip"):
            return needle in destination

        text = destination.strip().lower()
        try:
            # A bare "host[:port][/path]" needs a leading "//" to be parsed
            # as a network location rather than a path.
            host = urlsplit(text if "://" in text else f"//{text}").hostname
        except ValueError:
            host = None
        host = (host or text).rstrip(".")
        needle = needle.lower().rstrip(".")
        if ioc_type == "ip":
            return host == needle
        return host == needle or host.endswith(f".{needle}")

    def check_traffic(self, traffic_logs):
        """Compare traffic logs with the loaded indicators and raise alerts.

        Args:
            traffic_logs: Iterable of dicts; ``"destination"`` and ``"user"``
                are read with defaults of ``""`` and ``"unknown"``.

        Returns:
            The alerts created by this call.  They are also appended to
            ``self.alerts`` and counted in ``self.blocked_count``.
        """
        new_alerts = []
        for log in traffic_logs:
            destination = str(log.get("destination", ""))
            for indicator in self.indicators:
                if not self._indicator_matches(
                        indicator.ioc_type, indicator.value, destination):
                    continue
                alert = ThreatAlert(
                    indicator, "Threat Intelligence Match",
                    log.get("user", "unknown"),
                    datetime.now().isoformat(),
                    "BLOCKED",
                )
                self.alerts.append(alert)
                self.blocked_count += 1
                new_alerts.append(alert)
        return new_alerts

    def auto_respond(self, alert: "ThreatAlert"):
        """Return the response actions for the alert's severity.

        Severities without an entry (LOW, INFO) yield ``["Log only"]``.
        """
        actions = {
            ThreatSeverity.CRITICAL: [
                "Block IP/Domain immediately",
                "Isolate affected endpoint",
                "Create P1 incident ticket",
                "Notify SOC team via PagerDuty",
                "Capture forensic evidence",
            ],
            ThreatSeverity.HIGH: [
                "Block IP/Domain",
                "Create P2 incident ticket",
                "Notify SOC team via Slack",
            ],
            ThreatSeverity.MEDIUM: [
                "Add to watchlist",
                "Create investigation ticket",
            ],
        }
        return actions.get(alert.indicator.severity, ["Log only"])

    def report(self):
        """Print totals and up to the five most recent alerts."""
        print(f"\n{'='*55}")
        print("Threat Detection Report")
        print(f"{'='*55}")
        print(f" Indicators: {len(self.indicators)}")
        print(f" Alerts: {len(self.alerts)}")
        print(f" Blocked: {self.blocked_count}")
        if self.alerts:
            print("\n Recent Alerts:")
            for alert in self.alerts[-5:]:
                severity = alert.indicator.severity.value
                print(f" [{severity:>8}] {alert.indicator.ioc_type}: "
                      f"{alert.indicator.value}")
                print(f" User: {alert.user} | Action: {alert.action_taken}")
# Example usage: load the sample feeds, then screen some simulated traffic.
engine = ThreatDetectionEngine()
engine.load_threat_feeds()

# Simulated traffic log entries
traffic = [
    dict(user="alice", destination="malware-c2.evil.com", port=443),
    dict(user="bob", destination="google.com", port=443),
    dict(user="charlie", destination="phishing-bank.com", port=80),
]

engine.check_traffic(traffic)
engine.report()
Incident Response Playbook
# ir_playbook.py — Automated Incident Response Playbook
from dataclasses import dataclass, field
from typing import List, Callable
from datetime import datetime
@dataclass
class PlaybookStep:
    """One step of an incident-response playbook."""

    name: str
    description: str
    # True = runs automatically, False = requires approval
    auto: bool
    timeout_seconds: int = 300
@dataclass
class Playbook:
    """A named, ordered list of response steps for one kind of incident."""

    name: str
    # Description of the condition that triggers the playbook
    trigger: str
    severity: str
    # Fresh list per instance via default_factory
    steps: List[PlaybookStep] = field(default_factory=list)
class SOAREngine:
    """SOAR (Security Orchestration, Automation and Response) engine.

    Holds registered playbooks and a history of their executions.  Execution
    is simulated: steps are printed and recorded, no external tool is called.
    """

    def __init__(self):
        """Start with no playbooks and an empty execution history."""
        self.playbooks: List["Playbook"] = []
        self.executions: List[dict] = []

    def register_playbook(self, playbook: "Playbook"):
        """Add ``playbook`` to the registry."""
        self.playbooks.append(playbook)

    def execute_playbook(self, playbook_name, context):
        """Run the first registered playbook called ``playbook_name``.

        Each step is printed.  Automatic steps are recorded as ``"executed"``
        and manual steps as ``"pending_approval"``.

        Args:
            playbook_name: Name of the playbook to run.
            context: Incident details; printed and stored with the record.

        Returns:
            The execution record (also appended to ``self.executions``), or
            None when no playbook has that name.
        """
        playbook = next((p for p in self.playbooks if p.name == playbook_name), None)
        if not playbook:
            print(f"Playbook not found: {playbook_name}")
            return None

        print(f"\n Executing Playbook: {playbook.name}")
        print(f" Severity: {playbook.severity}")
        print(f" Context: {context}")

        step_results = []
        for i, step in enumerate(playbook.steps, 1):
            mode = "AUTO" if step.auto else "MANUAL"
            print(f" Step {i}. [{mode}] {step.name}")
            print(f" {step.description}")
            step_results.append({
                "step": step.name,
                "mode": mode,
                "status": "executed" if step.auto else "pending_approval",
            })

        record = {
            "playbook": playbook.name,
            "severity": playbook.severity,
            "context": context,
            "started_at": datetime.now().isoformat(),
            "steps": step_results,
        }
        self.executions.append(record)
        return record

    def list_playbooks(self):
        """Print each playbook with its trigger and auto/manual step counts."""
        print(f"\n{'='*55}")
        print("SOAR Playbooks")
        print(f"{'='*55}")
        for pb in self.playbooks:
            auto_steps = sum(1 for s in pb.steps if s.auto)
            total = len(pb.steps)
            print(f" {pb.name}")
            print(f" Trigger: {pb.trigger}")
            print(f" Steps: {total} ({auto_steps} auto, {total-auto_steps} manual)")
# Build the playbooks and register them with a SOAR engine.
soar = SOAREngine()

# Malware detection playbook: five automatic steps, then three manual ones.
malware_pb = Playbook(
    name="Malware Detection Response",
    trigger="Malware detected by EDR",
    severity="critical",
    steps=[
        PlaybookStep(name="Isolate Endpoint", description="Isolate infected host from network", auto=True),
        PlaybookStep(name="Block C2 Domain", description="Add C2 domain to blocklist", auto=True),
        PlaybookStep(name="Capture Memory Dump", description="Dump RAM for forensic analysis", auto=True),
        PlaybookStep(name="Create Incident Ticket", description="Create P1 ticket in Jira", auto=True),
        PlaybookStep(name="Notify SOC Team", description="Send alert via PagerDuty + Slack", auto=True),
        PlaybookStep(name="Investigate Scope", description="Check lateral movement", auto=False),
        PlaybookStep(name="Remediate", description="Remove malware, patch vulnerability", auto=False),
        PlaybookStep(name="Restore from Backup", description="Restore clean system image", auto=False),
    ],
)

# Phishing response playbook: six automatic steps, then one manual one.
phishing_pb = Playbook(
    name="Phishing Email Response",
    trigger="Phishing email reported by user",
    severity="high",
    steps=[
        PlaybookStep(name="Extract IOCs", description="Extract URLs, domains, hashes from email", auto=True),
        PlaybookStep(name="Check Threat Intel", description="Query VirusTotal, URLScan", auto=True),
        PlaybookStep(name="Block URLs", description="Add malicious URLs to blocklist", auto=True),
        PlaybookStep(name="Search Mailboxes", description="Find same email in other mailboxes", auto=True),
        PlaybookStep(name="Delete Emails", description="Remove phishing emails from all mailboxes", auto=True),
        PlaybookStep(name="Notify Users", description="Warn affected users", auto=True),
        PlaybookStep(name="Update Training", description="Add to phishing simulation", auto=False),
    ],
)

soar.register_playbook(malware_pb)
soar.register_playbook(phishing_pb)

soar.list_playbooks()
soar.execute_playbook(
    "Malware Detection Response",
    {"host": "WS-001", "malware": "LockBit"},
)
Best Practices
- Start Simple: เริ่มจาก Tasks ที่ทำซ้ำบ่อย Automate ทีละตัว
- Test in Lab: ทดสอบ Scripts ใน Lab ก่อนใช้ Production
- Approval Gates: ตั้ง Approval สำหรับ Actions ที่มีผลกระทบสูง
- Audit Trail: บันทึกทุก Action ที่ Automation ทำ
- Threat Intel: อัปเดต Threat Feeds อัตโนมัติทุกวัน
- Playbooks: สร้าง Playbooks สำหรับ Incidents ที่เกิดบ่อย
SASE Security Automation คืออะไร
คือการใช้ Scripts และ Tools จัดการ SASE Policies, Firewall Rules, Compliance และ Threats โดยอัตโนมัติ เพื่อลดงาน Manual และ Human Error โดยทำงานผ่าน APIs ของแพลตฟอร์ม เช่น Zscaler และ Palo Alto Prisma
ทำไมต้อง Automate Security
Threats เพิ่มขึ้นอย่างรวดเร็ว Automation ช่วยตอบสนอง Incidents ได้ภายในไม่กี่วินาที ลด Human Error ทำให้ Policies สม่ำเสมอ ตรวจ Compliance ได้อัตโนมัติ และออก Audit Reports ได้ทันที
SOAR คืออะไร
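SOAR ย่อมาจาก Security Orchestration, Automation and Response เป็น Platform ที่รวม Security Tools และใช้ Playbooks ทำ Incident Response อัตโนมัติ เช่น เมื่อพบ Malware จะ Block, Isolate, เปิด Ticket และ Alert ให้ทันที ตัวอย่างผลิตภัณฑ์ ได้แก่ XSOAR และ Splunk SOAR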
วิธีเริ่มทำ Security Automation อย่างไร
เริ่มจาก Tasks ที่ทำซ้ำบ่อย เช่น Log Review, Policy Updates และ Compliance Checks เขียน Scripts ด้วย Python เรียก APIs ของ Security Tools ทีละตัว ทดสอบใน Lab ใช้ Git จัดการ Code และค่อย ๆ ทำจากงานง่ายไปงานยาก
สรุป
SASE Security Automation ช่วยลดงาน Manual ลด Human Error และตอบสนองต่อ Threats ได้เร็วขึ้น ครอบคลุม Policy Management อัตโนมัติ, Threat Detection ด้วย Threat Intelligence Feeds และ SOAR Playbooks สำหรับ Incident Response ควรเริ่มจากงานง่ายและทดสอบใน Lab ก่อนใช้กับ Production
