Tailscale Mesh Audit Trail Logging คืออะไร
Tailscale เป็น WireGuard-based mesh VPN ที่สร้าง secure network ระหว่างอุปกรณ์ทุกชนิด โดยไม่ต้อง configure firewall หรือ port forwarding Audit Trail Logging คือการบันทึกเหตุการณ์ทั้งหมดที่เกิดขึ้นในระบบ เพื่อ security monitoring, compliance และ forensics การรวม Tailscale กับ audit trail logging ช่วยให้ track ได้ว่าใครเชื่อมต่อเมื่อไหร่ เข้าถึง resource อะไร และมีกิจกรรมผิดปกติหรือไม่ สำคัญสำหรับ compliance frameworks เช่น SOC 2, ISO 27001 และ HIPAA
Tailscale Audit Events
# audit_events.py — Tailscale audit event types
import json
class TailscaleAuditEvents:
EVENTS = {
"auth": {
"name": "Authentication Events",
"events": [
"NodeKeyRegistered — อุปกรณ์ใหม่ลงทะเบียน",
"NodeKeyExpired — key หมดอายุ",
"NodeKeyReauth — reauthenticate อุปกรณ์",
"UserLogin — ผู้ใช้ login เข้า admin console",
"UserLogout — ผู้ใช้ logout",
],
},
"acl": {
"name": "ACL & Policy Events",
"events": [
"ACLUpdated — อัพเดท Access Control List",
"ACLTestRun — ทดสอบ ACL policy",
"TagCreated — สร้าง tag ใหม่",
"TagDeleted — ลบ tag",
],
},
"network": {
"name": "Network Events",
"events": [
"SubnetRouteEnabled — เปิด subnet route",
"SubnetRouteDisabled — ปิด subnet route",
"ExitNodeEnabled — เปิด exit node",
"DNSConfigUpdated — อัพเดท DNS config",
"MagicDNSEnabled — เปิด Magic DNS",
],
},
"device": {
"name": "Device Events",
"events": [
"DeviceApproved — อนุมัติอุปกรณ์ใหม่",
"DeviceRemoved — ลบอุปกรณ์",
"DeviceRenamed — เปลี่ยนชื่ออุปกรณ์",
"DeviceTagged — assign tag ให้อุปกรณ์",
"DeviceIPAssigned — assign IP ให้อุปกรณ์",
],
},
"admin": {
"name": "Admin Events",
"events": [
"AdminRoleChanged — เปลี่ยน admin role",
"InviteSent — ส่ง invite เข้า tailnet",
"InviteAccepted — ยอมรับ invite",
"BillingUpdated — อัพเดทข้อมูล billing",
],
},
}
def show_events(self):
print("=== Tailscale Audit Events ===\n")
for key, category in self.EVENTS.items():
print(f"[{category['name']}]")
for event in category['events'][:3]:
print(f" • {event}")
print()
events = TailscaleAuditEvents()
events.show_events()
Audit Log Collection
# log_collection.py — Collect Tailscale audit logs
import json
class AuditLogCollection:
API_EXAMPLE = """
# Tailscale API — Fetch audit logs
# GET https://api.tailscale.com/api/v2/tailnet/{tailnet}/logging/configuration
# GET https://api.tailscale.com/api/v2/tailnet/{tailnet}/logging/stream
# Network flow logs (requires Tailscale Business/Enterprise)
# Configuration:
{
"logType": "network",
"destinationType": "s3",
"destination": {
"bucket": "tailscale-logs",
"region": "ap-southeast-1",
"prefix": "audit/"
}
}
"""
LOG_DESTINATIONS = {
"s3": {
"name": "AWS S3",
"description": "ส่ง logs ไป S3 bucket — ราคาถูก, ใช้ Athena query ได้",
},
"splunk": {
"name": "Splunk",
"description": "ส่ง logs ไป Splunk HEC — search + alerting + dashboards",
},
"elasticsearch": {
"name": "Elasticsearch (ELK)",
"description": "ส่ง logs ไป ELK Stack — Kibana dashboards + alerting",
},
"datadog": {
"name": "Datadog",
"description": "ส่ง logs ไป Datadog — unified monitoring + security",
},
"siem": {
"name": "SIEM (Generic)",
"description": "ส่ง logs ไป SIEM — correlation, threat detection",
},
}
def show_api(self):
print("=== Tailscale API ===")
print(self.API_EXAMPLE[:400])
def show_destinations(self):
print(f"\n=== Log Destinations ===")
for key, dest in self.LOG_DESTINATIONS.items():
print(f" [{dest['name']}] {dest['description']}")
collection = AuditLogCollection()
collection.show_api()
collection.show_destinations()
Python Audit Logger
# logger.py — Python Tailscale audit logger
import json
class TailscaleAuditLogger:
CODE = """
# tailscale_audit.py — Tailscale audit log collector
import requests
import json
import logging
from datetime import datetime, timedelta
from typing import List, Dict
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class TailscaleAuditCollector:
def __init__(self, api_key, tailnet):
self.api_key = api_key
self.tailnet = tailnet
self.base_url = f"https://api.tailscale.com/api/v2/tailnet/{tailnet}"
self.headers = {"Authorization": f"Bearer {api_key}"}
def get_devices(self):
'''Get all devices in tailnet'''
resp = requests.get(f"{self.base_url}/devices", headers=self.headers)
return resp.json().get('devices', [])
def get_acl(self):
'''Get current ACL policy'''
resp = requests.get(f"{self.base_url}/acl", headers=self.headers)
return resp.json()
def get_dns(self):
'''Get DNS configuration'''
resp = requests.get(f"{self.base_url}/dns/nameservers", headers=self.headers)
return resp.json()
def audit_snapshot(self):
'''Take audit snapshot of current state'''
devices = self.get_devices()
acl = self.get_acl()
snapshot = {
'timestamp': datetime.utcnow().isoformat(),
'tailnet': self.tailnet,
'device_count': len(devices),
'devices': [{
'name': d.get('name', ''),
'hostname': d.get('hostname', ''),
'os': d.get('os', ''),
'last_seen': d.get('lastSeen', ''),
'authorized': d.get('authorized', False),
'tags': d.get('tags', []),
'ip': d.get('addresses', [None])[0],
'key_expiry': d.get('keyExpiryDisabled', False),
} for d in devices],
'acl_hash': hash(json.dumps(acl, sort_keys=True)),
}
return snapshot
def detect_anomalies(self, current, previous):
'''Detect changes between snapshots'''
anomalies = []
# New devices
current_names = {d['name'] for d in current['devices']}
previous_names = {d['name'] for d in previous['devices']}
new_devices = current_names - previous_names
removed_devices = previous_names - current_names
if new_devices:
anomalies.append({
'type': 'new_device',
'severity': 'medium',
'details': f"New devices: {', '.join(new_devices)}",
})
if removed_devices:
anomalies.append({
'type': 'removed_device',
'severity': 'low',
'details': f"Removed devices: {', '.join(removed_devices)}",
})
# ACL changes
if current['acl_hash'] != previous['acl_hash']:
anomalies.append({
'type': 'acl_changed',
'severity': 'high',
'details': 'ACL policy was modified',
})
# Unauthorized devices
unauth = [d['name'] for d in current['devices'] if not d['authorized']]
if unauth:
anomalies.append({
'type': 'unauthorized_device',
'severity': 'high',
'details': f"Unauthorized devices: {', '.join(unauth)}",
})
# Expired keys
for d in current['devices']:
if d.get('key_expiry') and d['last_seen']:
last = datetime.fromisoformat(d['last_seen'].replace('Z', '+00:00'))
if datetime.now(last.tzinfo) - last > timedelta(days=90):
anomalies.append({
'type': 'stale_device',
'severity': 'low',
'details': f"Device {d['name']} not seen in 90+ days",
})
return anomalies
def generate_compliance_report(self):
'''Generate compliance report'''
snapshot = self.audit_snapshot()
devices = snapshot['devices']
return {
'report_date': datetime.utcnow().isoformat(),
'tailnet': self.tailnet,
'total_devices': len(devices),
'authorized': sum(1 for d in devices if d['authorized']),
'unauthorized': sum(1 for d in devices if not d['authorized']),
'tagged': sum(1 for d in devices if d['tags']),
'untagged': sum(1 for d in devices if not d['tags']),
'os_distribution': {},
'compliance_checks': {
'all_authorized': all(d['authorized'] for d in devices),
'all_tagged': all(d['tags'] for d in devices),
'key_rotation': all(not d['key_expiry'] for d in devices),
},
}
# collector = TailscaleAuditCollector("tskey-api-xxx", "example.com")
# report = collector.generate_compliance_report()
"""
def show_code(self):
print("=== Audit Logger ===")
print(self.CODE[:600])
logger_cls = TailscaleAuditLogger()
logger_cls.show_code()
ELK Stack Integration
# elk.py — ELK Stack for Tailscale audit logs
import json
class ELKIntegration:
LOGSTASH_CONFIG = """
# logstash-tailscale.conf
input {
http {
port => 5044
codec => json
}
s3 {
bucket => "tailscale-logs"
prefix => "audit/"
region => "ap-southeast-1"
codec => json_lines
}
}
filter {
date {
match => ["timestamp", "ISO8601"]
target => "@timestamp"
}
mutate {
add_field => { "source" => "tailscale" }
}
if [type] == "acl_changed" {
mutate { add_tag => ["security_alert"] }
}
if [type] == "unauthorized_device" {
mutate { add_tag => ["critical_alert"] }
}
}
output {
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "tailscale-audit-%{+YYYY.MM}"
}
if "critical_alert" in [tags] {
slack {
url => "https://hooks.slack.com/services/xxx"
format => "CRITICAL: Tailscale %{type} - %{details}"
}
}
}
"""
KIBANA_QUERIES = {
"new_devices": 'type: "new_device" AND @timestamp >= now-24h',
"acl_changes": 'type: "acl_changed" AND severity: "high"',
"unauthorized": 'type: "unauthorized_device"',
"all_anomalies": 'severity: ("high" OR "critical") AND @timestamp >= now-7d',
}
def show_logstash(self):
print("=== Logstash Config ===")
print(self.LOGSTASH_CONFIG[:500])
def show_queries(self):
print(f"\n=== Kibana Queries ===")
for name, query in self.KIBANA_QUERIES.items():
print(f" [{name}] {query}")
elk = ELKIntegration()
elk.show_logstash()
elk.show_queries()
Compliance & Alerting
# compliance.py — Compliance and alerting
import json
import random
class ComplianceAlerting:
FRAMEWORKS = {
"soc2": {
"name": "SOC 2",
"requirements": "Access logging, change management, incident detection",
"tailscale_coverage": "Device auth logs, ACL change logs, anomaly detection",
},
"iso27001": {
"name": "ISO 27001",
"requirements": "Access control, audit trails, information security events",
"tailscale_coverage": "Network access logs, device management, policy changes",
},
"hipaa": {
"name": "HIPAA",
"requirements": "Access to PHI tracking, audit controls, security incidents",
"tailscale_coverage": "Network flow logs, device authorization, audit snapshots",
},
}
ALERT_RULES = {
"critical": [
"Unauthorized device connected to tailnet",
"ACL policy modified outside change window",
"Admin role escalation detected",
"Multiple failed auth attempts (> 5 in 10 min)",
],
"high": [
"New device registered without approval",
"Subnet route enabled/disabled",
"Exit node configuration changed",
"Key expired but device still active",
],
"medium": [
"Device not seen in 30+ days",
"DNS configuration changed",
"New user invited to tailnet",
],
}
def show_frameworks(self):
print("=== Compliance Frameworks ===\n")
for key, fw in self.FRAMEWORKS.items():
print(f"[{fw['name']}]")
print(f" Requirements: {fw['requirements']}")
print(f" Coverage: {fw['tailscale_coverage']}")
print()
def show_alerts(self):
print("=== Alert Rules ===")
for severity, rules in self.ALERT_RULES.items():
print(f"\n [{severity.upper()}]")
for rule in rules:
print(f" • {rule}")
comp = ComplianceAlerting()
comp.show_frameworks()
comp.show_alerts()
FAQ - คำถามที่พบบ่อย
Q: Tailscale audit logs มีให้ใช้ทุก plan ไหม?
A: ไม่ — audit logs มีให้เฉพาะ Business และ Enterprise plans Free/Personal/Starter: ไม่มี audit log API Business: มี configuration audit logs + device events Enterprise: มี network flow logs + full audit trail ทางเลือก: สร้าง audit system เองด้วย API polling (get devices, get ACL) — ใช้ได้ทุก plan
Q: Network flow logs เก็บอะไรบ้าง?
A: Flow logs เก็บ: source device, destination device, timestamp, bytes transferred, protocol, port ไม่เก็บ: packet content (payload) — เฉพาะ metadata เท่านั้น ส่งไป: S3, Datadog, Splunk, Elasticsearch ข้อจำกัด: ต้องเป็น Enterprise plan + enable ใน admin console
Q: ต้องเก็บ audit logs นานแค่ไหน?
A: ขึ้นกับ compliance framework: SOC 2: อย่างน้อย 1 ปี ISO 27001: อย่างน้อย 3 ปี HIPAA: อย่างน้อย 6 ปี PCI DSS: อย่างน้อย 1 ปี แนะนำ: เก็บ hot storage 90 วัน (ELK/Splunk) + cold storage 3-7 ปี (S3 Glacier)
Q: Tailscale กับ traditional VPN อันไหน audit ง่ายกว่า?
A: Tailscale ง่ายกว่ามาก — identity-based (ผูกกับ user account) ไม่ใช่ IP-based Traditional VPN: shared credentials, IP-based, ไม่รู้ว่า user ไหนทำอะไร Tailscale: ทุก device มี identity ชัดเจน, ACL ตาม user/group, API สำหรับ automation ข้อดี: Zero Trust model — ทุก connection authenticated + logged
