SiamCafe.net Blog
Cybersecurity

Tailscale Mesh Audit Trail Logging

2026-01-18 · Ajarn Bom — SiamCafe.net · 1,597 words

What Is Tailscale Mesh Audit Trail Logging?

Tailscale is a WireGuard-based mesh VPN that builds a secure network between devices of any kind, with no firewall configuration or port forwarding required. Audit trail logging records every event that happens in the system, for security monitoring, compliance, and forensics. Combining Tailscale with audit trail logging lets you track who connected when, which resources they accessed, and whether any unusual activity occurred — essential for compliance frameworks such as SOC 2, ISO 27001, and HIPAA.
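Before diving into Tailscale specifics, the core idea of an audit trail can be shown with a minimal append-only writer: each event becomes one JSON line with a timestamp, actor, and action. The field names and file path here are illustrative, not Tailscale's actual log schema.

```python
# audit_writer.py — minimal append-only audit trail (illustrative schema)
import json
import os
import tempfile
from datetime import datetime, timezone

def write_event(log_path, actor, action, details):
    """Append one audit event as a JSON line (JSONL keeps the trail append-only)."""
    event = {
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "actor": actor,      # who did it (user or device identity)
        "action": action,    # what happened, e.g. "NodeKeyRegistered"
        "details": details,  # free-form context
    }
    with open(log_path, "a", encoding="utf-8") as f:
        f.write(json.dumps(event) + "\n")
    return event

path = os.path.join(tempfile.gettempdir(), "tailscale-audit-demo.jsonl")
event = write_event(path, "alice@example.com", "NodeKeyRegistered",
                    {"device": "laptop-01"})
print(event["action"])  # prints: NodeKeyRegistered
```

Appending one self-describing line per event keeps the trail tamper-evident in ordering and trivial to ship to any log destination later.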

Tailscale Audit Events

# audit_events.py — Tailscale audit event types
import json

class TailscaleAuditEvents:
    EVENTS = {
        "auth": {
            "name": "Authentication Events",
            "events": [
                "NodeKeyRegistered — new device registered",
                "NodeKeyExpired — node key expired",
                "NodeKeyReauth — device reauthenticated",
                "UserLogin — user logged in to the admin console",
                "UserLogout — user logged out",
            ],
        },
        "acl": {
            "name": "ACL & Policy Events",
            "events": [
                "ACLUpdated — Access Control List updated",
                "ACLTestRun — ACL policy test run",
                "TagCreated — new tag created",
                "TagDeleted — tag deleted",
            ],
        },
        "network": {
            "name": "Network Events",
            "events": [
                "SubnetRouteEnabled — subnet route enabled",
                "SubnetRouteDisabled — subnet route disabled",
                "ExitNodeEnabled — exit node enabled",
                "DNSConfigUpdated — DNS config updated",
                "MagicDNSEnabled — MagicDNS enabled",
            ],
        },
        "device": {
            "name": "Device Events",
            "events": [
                "DeviceApproved — new device approved",
                "DeviceRemoved — device removed",
                "DeviceRenamed — device renamed",
                "DeviceTagged — tag assigned to device",
                "DeviceIPAssigned — IP assigned to device",
            ],
        },
        "admin": {
            "name": "Admin Events",
            "events": [
                "AdminRoleChanged — admin role changed",
                "InviteSent — tailnet invite sent",
                "InviteAccepted — invite accepted",
                "BillingUpdated — billing information updated",
            ],
        },
    }

    def show_events(self):
        print("=== Tailscale Audit Events ===\n")
        for key, category in self.EVENTS.items():
            print(f"[{category['name']}]")
            for event in category['events'][:3]:
                print(f"  • {event}")
            print()

events = TailscaleAuditEvents()
events.show_events()

Audit Log Collection

# log_collection.py — Collect Tailscale audit logs
import json

class AuditLogCollection:
    API_EXAMPLE = """
# Tailscale API — Fetch audit logs
# GET https://api.tailscale.com/api/v2/tailnet/{tailnet}/logging/configuration
# GET https://api.tailscale.com/api/v2/tailnet/{tailnet}/logging/stream

# Network flow logs (requires Tailscale Business/Enterprise)
# Configuration:
{
  "logType": "network",
  "destinationType": "s3",
  "destination": {
    "bucket": "tailscale-logs",
    "region": "ap-southeast-1",
    "prefix": "audit/"
  }
}
"""

    LOG_DESTINATIONS = {
        "s3": {
            "name": "AWS S3",
            "description": "Ship logs to an S3 bucket — cheap, queryable with Athena",
        },
        "splunk": {
            "name": "Splunk",
            "description": "Ship logs to Splunk HEC — search + alerting + dashboards",
        },
        "elasticsearch": {
            "name": "Elasticsearch (ELK)",
            "description": "Ship logs to the ELK Stack — Kibana dashboards + alerting",
        },
        "datadog": {
            "name": "Datadog",
            "description": "Ship logs to Datadog — unified monitoring + security",
        },
        "siem": {
            "name": "SIEM (Generic)",
            "description": "Ship logs to a SIEM — correlation, threat detection",
        },
    }

    def show_api(self):
        print("=== Tailscale API ===")
        print(self.API_EXAMPLE[:400])

    def show_destinations(self):
        print("\n=== Log Destinations ===")
        for key, dest in self.LOG_DESTINATIONS.items():
            print(f"  [{dest['name']}] {dest['description']}")

collection = AuditLogCollection()
collection.show_api()
collection.show_destinations()
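If your plan supports log streaming, a configuration like the S3 example above would be pushed to the Tailscale API. The sketch below only builds and prints the payload; the write endpoint and exact payload shape are assumptions based on the GET endpoints shown earlier, so verify against the current Tailscale API docs before relying on it.

```python
# stream_config.py — build a log-streaming configuration payload (sketch)
# NOTE: the endpoint path in the comment below is an assumption, not verified docs.
import json

def build_stream_config(bucket, region, prefix="audit/"):
    """Build the S3 streaming configuration shown in the article."""
    return {
        "logType": "network",
        "destinationType": "s3",
        "destination": {"bucket": bucket, "region": region, "prefix": prefix},
    }

config = build_stream_config("tailscale-logs", "ap-southeast-1")
print(json.dumps(config, indent=2))
# To apply it, you would send this payload with an authenticated request, e.g.:
# requests.put(f"{base_url}/logging/configuration", headers=headers, json=config)
```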

Python Audit Logger

# logger.py — Python Tailscale audit logger
import json

class TailscaleAuditLogger:
    CODE = """
# tailscale_audit.py — Tailscale audit log collector
import requests
import json
import logging
from datetime import datetime, timedelta
from typing import List, Dict

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class TailscaleAuditCollector:
    def __init__(self, api_key, tailnet):
        self.api_key = api_key
        self.tailnet = tailnet
        self.base_url = f"https://api.tailscale.com/api/v2/tailnet/{tailnet}"
        self.headers = {"Authorization": f"Bearer {api_key}"}
    
    def get_devices(self):
        '''Get all devices in tailnet'''
        resp = requests.get(f"{self.base_url}/devices", headers=self.headers)
        return resp.json().get('devices', [])
    
    def get_acl(self):
        '''Get current ACL policy'''
        resp = requests.get(f"{self.base_url}/acl", headers=self.headers)
        return resp.json()
    
    def get_dns(self):
        '''Get DNS configuration'''
        resp = requests.get(f"{self.base_url}/dns/nameservers", headers=self.headers)
        return resp.json()
    
    def audit_snapshot(self):
        '''Take audit snapshot of current state'''
        devices = self.get_devices()
        acl = self.get_acl()
        
        snapshot = {
            'timestamp': datetime.utcnow().isoformat(),
            'tailnet': self.tailnet,
            'device_count': len(devices),
            'devices': [{
                'name': d.get('name', ''),
                'hostname': d.get('hostname', ''),
                'os': d.get('os', ''),
                'last_seen': d.get('lastSeen', ''),
                'authorized': d.get('authorized', False),
                'tags': d.get('tags', []),
                'ip': d.get('addresses', [None])[0],
                'key_expiry': d.get('keyExpiryDisabled', False),  # True when key expiry is disabled
            } for d in devices],
            # Canonical JSON string so ACL changes can be detected across runs
            # (built-in hash() is salted per process and is not stable between runs)
            'acl_hash': json.dumps(acl, sort_keys=True),
        }
        return snapshot
    
    def detect_anomalies(self, current, previous):
        '''Detect changes between snapshots'''
        anomalies = []
        
        # New devices
        current_names = {d['name'] for d in current['devices']}
        previous_names = {d['name'] for d in previous['devices']}
        
        new_devices = current_names - previous_names
        removed_devices = previous_names - current_names
        
        if new_devices:
            anomalies.append({
                'type': 'new_device',
                'severity': 'medium',
                'details': f"New devices: {', '.join(new_devices)}",
            })
        
        if removed_devices:
            anomalies.append({
                'type': 'removed_device',
                'severity': 'low',
                'details': f"Removed devices: {', '.join(removed_devices)}",
            })
        
        # ACL changes
        if current['acl_hash'] != previous['acl_hash']:
            anomalies.append({
                'type': 'acl_changed',
                'severity': 'high',
                'details': 'ACL policy was modified',
            })
        
        # Unauthorized devices
        unauth = [d['name'] for d in current['devices'] if not d['authorized']]
        if unauth:
            anomalies.append({
                'type': 'unauthorized_device',
                'severity': 'high',
                'details': f"Unauthorized devices: {', '.join(unauth)}",
            })
        
        # Expired keys
        for d in current['devices']:
            if d.get('key_expiry') and d['last_seen']:
                last = datetime.fromisoformat(d['last_seen'].replace('Z', '+00:00'))
                if datetime.now(last.tzinfo) - last > timedelta(days=90):
                    anomalies.append({
                        'type': 'stale_device',
                        'severity': 'low',
                        'details': f"Device {d['name']} not seen in 90+ days",
                    })
        
        return anomalies
    
    def generate_compliance_report(self):
        '''Generate compliance report'''
        snapshot = self.audit_snapshot()
        devices = snapshot['devices']
        
        return {
            'report_date': datetime.utcnow().isoformat(),
            'tailnet': self.tailnet,
            'total_devices': len(devices),
            'authorized': sum(1 for d in devices if d['authorized']),
            'unauthorized': sum(1 for d in devices if not d['authorized']),
            'tagged': sum(1 for d in devices if d['tags']),
            'untagged': sum(1 for d in devices if not d['tags']),
            'os_distribution': {},
            'compliance_checks': {
                'all_authorized': all(d['authorized'] for d in devices),
                'all_tagged': all(d['tags'] for d in devices),
                'key_rotation': all(not d['key_expiry'] for d in devices),
            },
        }

# collector = TailscaleAuditCollector("tskey-api-xxx", "example.com")
# report = collector.generate_compliance_report()
"""

    def show_code(self):
        print("=== Audit Logger ===")
        print(self.CODE[:600])

logger_cls = TailscaleAuditLogger()
logger_cls.show_code()
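The collector above diffs a current snapshot against a previous one, so snapshots have to survive between runs. A minimal persistence sketch, assuming a local JSON file (the path and structure are illustrative):

```python
# snapshot_store.py — persist audit snapshots between runs (sketch)
import json
import os

def load_previous(path):
    """Load the last saved snapshot, or None on the first run."""
    if not os.path.exists(path):
        return None
    with open(path, encoding="utf-8") as f:
        return json.load(f)

def save_current(path, snapshot):
    """Overwrite the stored snapshot with the current one."""
    with open(path, "w", encoding="utf-8") as f:
        json.dump(snapshot, f, indent=2)

# Usage with the collector sketched above:
# previous = load_previous("snapshot.json")
# current = collector.audit_snapshot()
# if previous:
#     for a in collector.detect_anomalies(current, previous):
#         print(a["severity"], a["details"])
# save_current("snapshot.json", current)
```

In production you would likely store snapshots in S3 or a database instead of a local file, but the load/compare/save cycle stays the same.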

ELK Stack Integration

# elk.py — ELK Stack for Tailscale audit logs
import json

class ELKIntegration:
    LOGSTASH_CONFIG = """
# logstash-tailscale.conf
input {
  http {
    port => 5044
    codec => json
  }
  
  s3 {
    bucket => "tailscale-logs"
    prefix => "audit/"
    region => "ap-southeast-1"
    codec => json_lines
  }
}

filter {
  date {
    match => ["timestamp", "ISO8601"]
    target => "@timestamp"
  }
  
  mutate {
    add_field => { "source" => "tailscale" }
  }
  
  if [type] == "acl_changed" {
    mutate { add_tag => ["security_alert"] }
  }
  
  if [type] == "unauthorized_device" {
    mutate { add_tag => ["critical_alert"] }
  }
}

output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "tailscale-audit-%{+YYYY.MM}"
  }
  
  if "critical_alert" in [tags] {
    slack {
      url => "https://hooks.slack.com/services/xxx"
      format => "CRITICAL: Tailscale %{type} - %{details}"
    }
  }
}
"""

    KIBANA_QUERIES = {
        "new_devices": 'type: "new_device" AND @timestamp >= now-24h',
        "acl_changes": 'type: "acl_changed" AND severity: "high"',
        "unauthorized": 'type: "unauthorized_device"',
        "all_anomalies": 'severity: ("high" OR "critical") AND @timestamp >= now-7d',
    }

    def show_logstash(self):
        print("=== Logstash Config ===")
        print(self.LOGSTASH_CONFIG[:500])

    def show_queries(self):
        print("\n=== Kibana Queries ===")
        for name, query in self.KIBANA_QUERIES.items():
            print(f"  [{name}] {query}")

elk = ELKIntegration()
elk.show_logstash()
elk.show_queries()
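The Kibana queries above can also be issued programmatically as Elasticsearch query DSL. A sketch of the `new_devices` query against the index pattern from the Logstash config (the helper name is ours, not part of any library):

```python
# es_query.py — Elasticsearch DSL version of the "new_devices" Kibana query
import json

def new_devices_query(hours=24):
    """Match new_device events within the last N hours, newest first."""
    return {
        "query": {
            "bool": {
                "filter": [
                    {"term": {"type": "new_device"}},
                    {"range": {"@timestamp": {"gte": f"now-{hours}h"}}},
                ]
            }
        },
        "sort": [{"@timestamp": {"order": "desc"}}],
    }

print(json.dumps(new_devices_query(), indent=2))
# Would be POSTed to http://elasticsearch:9200/tailscale-audit-*/_search
```

Using `filter` instead of `must` skips relevance scoring, which is the idiomatic choice for exact-match audit queries.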

Compliance & Alerting

# compliance.py — Compliance and alerting
import json
import random

class ComplianceAlerting:
    FRAMEWORKS = {
        "soc2": {
            "name": "SOC 2",
            "requirements": "Access logging, change management, incident detection",
            "tailscale_coverage": "Device auth logs, ACL change logs, anomaly detection",
        },
        "iso27001": {
            "name": "ISO 27001",
            "requirements": "Access control, audit trails, information security events",
            "tailscale_coverage": "Network access logs, device management, policy changes",
        },
        "hipaa": {
            "name": "HIPAA",
            "requirements": "Access to PHI tracking, audit controls, security incidents",
            "tailscale_coverage": "Network flow logs, device authorization, audit snapshots",
        },
    }

    ALERT_RULES = {
        "critical": [
            "Unauthorized device connected to tailnet",
            "ACL policy modified outside change window",
            "Admin role escalation detected",
            "Multiple failed auth attempts (> 5 in 10 min)",
        ],
        "high": [
            "New device registered without approval",
            "Subnet route enabled/disabled",
            "Exit node configuration changed",
            "Key expired but device still active",
        ],
        "medium": [
            "Device not seen in 30+ days",
            "DNS configuration changed",
            "New user invited to tailnet",
        ],
    }

    def show_frameworks(self):
        print("=== Compliance Frameworks ===\n")
        for key, fw in self.FRAMEWORKS.items():
            print(f"[{fw['name']}]")
            print(f"  Requirements: {fw['requirements']}")
            print(f"  Coverage: {fw['tailscale_coverage']}")
            print()

    def show_alerts(self):
        print("=== Alert Rules ===")
        for severity, rules in self.ALERT_RULES.items():
            print(f"\n  [{severity.upper()}]")
            for rule in rules:
                print(f"    • {rule}")

comp = ComplianceAlerting()
comp.show_frameworks()
comp.show_alerts()
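The "> 5 failed auth attempts in 10 min" rule in the critical tier is a classic sliding-window check. A minimal sketch (class name and event shape are ours):

```python
# auth_window.py — sliding-window rule: more than 5 failed auths in 10 minutes
from collections import deque

class FailedAuthDetector:
    def __init__(self, threshold=5, window_seconds=600):
        self.threshold = threshold
        self.window = window_seconds
        self.attempts = {}  # user -> deque of failure timestamps (seconds)

    def record_failure(self, user, ts):
        """Record one failed attempt; return True if the rule fires."""
        q = self.attempts.setdefault(user, deque())
        q.append(ts)
        # Drop attempts that have slid out of the window
        while q and ts - q[0] > self.window:
            q.popleft()
        return len(q) > self.threshold

det = FailedAuthDetector()
alerts = [det.record_failure("alice", t) for t in [0, 60, 120, 180, 240, 300]]
print(alerts)  # prints: [False, False, False, False, False, True]
```

The deque keeps only in-window timestamps per user, so memory stays bounded even under a flood of failures.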

FAQ: Frequently Asked Questions

Q: Are Tailscale audit logs available on every plan?

A: No — audit logs are available only on the Business and Enterprise plans.
Free/Personal/Starter: no audit log API.
Business: configuration audit logs + device events.
Enterprise: network flow logs + full audit trail.
Alternative: build your own audit system with API polling (get devices, get ACL) — this works on every plan.

Q: What do network flow logs record?

A: Flow logs record: source device, destination device, timestamp, bytes transferred, protocol, and port.
They do not record packet content (payload) — metadata only.
Destinations: S3, Datadog, Splunk, Elasticsearch.
Limitation: requires the Enterprise plan, and must be enabled in the admin console.
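With just those metadata fields, flow logs can already answer useful questions, such as how much traffic moved between each device pair. The record shape below is illustrative, not Tailscale's exact flow-log schema:

```python
# flow_aggregate.py — sum bytes per (src, dst) pair (illustrative record shape)
from collections import defaultdict

flows = [  # hypothetical flow-log records
    {"src": "laptop-01", "dst": "server-01", "bytes": 1200, "proto": "tcp", "port": 443},
    {"src": "laptop-01", "dst": "server-01", "bytes": 800,  "proto": "tcp", "port": 443},
    {"src": "phone-02",  "dst": "server-01", "bytes": 150,  "proto": "udp", "port": 53},
]

totals = defaultdict(int)
for f in flows:
    totals[(f["src"], f["dst"])] += f["bytes"]

for (src, dst), total in sorted(totals.items()):
    print(f"{src} -> {dst}: {total} bytes")
```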

Q: How long should audit logs be retained?

A: It depends on the compliance framework:
SOC 2: at least 1 year.
ISO 27001: at least 3 years.
HIPAA: at least 6 years.
PCI DSS: at least 1 year.
Recommended: 90 days in hot storage (ELK/Splunk) + 3-7 years in cold storage (S3 Glacier).
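That hot/cold split maps directly to an S3 lifecycle policy: objects transition to Glacier after the hot-retention window and expire at the end of the compliance period. A sketch (prefix and day counts are illustrative; 2555 days is roughly 7 years):

```json
{
  "Rules": [
    {
      "ID": "tailscale-audit-retention",
      "Filter": { "Prefix": "audit/" },
      "Status": "Enabled",
      "Transitions": [
        { "Days": 90, "StorageClass": "GLACIER" }
      ],
      "Expiration": { "Days": 2555 }
    }
  ]
}
```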

Q: Which is easier to audit — Tailscale or a traditional VPN?

A: Tailscale is much easier — it is identity-based (tied to a user account), not IP-based.
Traditional VPN: shared credentials, IP-based, no visibility into which user did what.
Tailscale: every device has a clear identity, ACLs follow users/groups, and there is an API for automation.
Bonus: the Zero Trust model — every connection is authenticated + logged.

📖 Related Articles

Apache Kafka Streams Audit Trail Logging — Read article →
BigQuery Scheduled Query Audit Trail Logging — Read article →
DNSSEC Implementation Audit Trail Logging — Read article →
API Rate Limiting Audit Trail Logging — Read article →
Java Virtual Threads Audit Trail Logging — Read article →

📚 View all articles →