ทำไม Python ถึงเหมาะกับ SysAdmin Automation
ผมเริ่มเขียน automation scripts ด้วย Bash มาก่อน แต่พอ scripts ซับซ้อนขึ้น Bash ก็จัดการยากขึ้นเรื่อยๆ: error handling แย่, string manipulation ลำบาก, JSON/YAML parsing ต้องพึ่ง external tools — Python แก้ปัญหาทั้งหมดนี้ มี libraries ครบทุกอย่าง error handling ดี อ่านง่าย maintain ง่าย
ผมจะแสดง scripts จริงที่ใช้ในการทำงานประจำวันทุก script ทดสอบแล้วบน Ubuntu 24.04 และ Python 3.12+ สามารถนำไปใช้งานได้ทันที
Libraries ที่ SysAdmin ต้องรู้
# Install the commonly used libraries
pip install psutil requests paramiko jinja2 pyyaml rich
# psutil — system monitoring (CPU, RAM, disk, network)
# requests — HTTP API calls
# paramiko — SSH connections
# jinja2 — template rendering
# pyyaml — YAML parsing
# rich — beautiful terminal output
ตรวจสอบ Server Health แบบครบวงจร
#!/usr/bin/env python3
"""server_health.py — check server health and send an alert email."""
import psutil
import socket
import smtplib
from email.mime.text import MIMEText
from datetime import datetime
from dataclasses import dataclass
@dataclass
class HealthCheck:
    """One measured system metric compared against an alert threshold."""

    name: str
    value: float
    threshold: float
    unit: str

    @property
    def is_critical(self) -> bool:
        """True when the measured value exceeds the threshold."""
        return self.value > self.threshold

    def __str__(self) -> str:
        label = "CRITICAL" if self.is_critical else "OK"
        return f"[{label}] {self.name}: {self.value:.1f}{self.unit} (threshold: {self.threshold}{self.unit})"
def check_server_health() -> list[HealthCheck]:
    """Collect system health metrics and return them as HealthCheck objects.

    Thresholds are percentages except the PID-1 open-file count.
    cpu_percent(interval=3) blocks for 3 seconds to take a real sample.
    """
    checks: list[HealthCheck] = []

    # CPU — sampled over a 3-second window
    cpu = psutil.cpu_percent(interval=3)
    checks.append(HealthCheck("CPU Usage", cpu, 85.0, "%"))

    # Memory
    mem = psutil.virtual_memory()
    checks.append(HealthCheck("Memory Usage", mem.percent, 90.0, "%"))

    # Disk (root partition)
    disk = psutil.disk_usage('/')
    checks.append(HealthCheck("Disk Usage /", disk.percent, 85.0, "%"))

    # Swap
    swap = psutil.swap_memory()
    checks.append(HealthCheck("Swap Usage", swap.percent, 50.0, "%"))

    # Load average normalised per core; cpu_count() can return None,
    # which would raise TypeError on division — fall back to 1.
    load1 = psutil.getloadavg()[0]
    cores = psutil.cpu_count() or 1
    load_per_core = (load1 / cores) * 100
    checks.append(HealthCheck("Load/Core", load_per_core, 80.0, "%"))

    # Open files of PID 1. The original `if psutil.Process(1)` guard was
    # dead code (a Process object is always truthy); what can actually fail
    # here is an exception (e.g. insufficient privileges when not root),
    # so treat any psutil/OS error as "0 open files".
    try:
        open_files = len(psutil.Process(1).open_files())
    except (psutil.Error, OSError):
        open_files = 0
    checks.append(HealthCheck("Open Files (PID 1)", float(open_files), 1000.0, ""))

    return checks
def send_alert(checks: list[HealthCheck], hostname: str):
    """Email a summary via the local SMTP server when any check is critical.

    Does nothing when all checks are within their thresholds.
    """
    critical = [c for c in checks if c.is_critical]
    if not critical:
        return

    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    lines = [f"Server Health Alert — {hostname}\n", f"Time: {timestamp}\n\n"]
    lines.extend(str(c) + "\n" for c in checks)

    msg = MIMEText("".join(lines))
    msg['Subject'] = f"[ALERT] Server {hostname} — {len(critical)} critical checks"
    msg['From'] = 'alerts@example.com'
    msg['To'] = 'admin@example.com'

    # Assumes a local MTA is listening on port 25.
    with smtplib.SMTP('localhost', 25) as smtp:
        smtp.send_message(msg)
if __name__ == '__main__':
    # Print every check to stdout, then email only if something is critical.
    host = socket.gethostname()
    results = check_server_health()
    for check in results:
        print(check)
    send_alert(results, host)
Nginx Access Log Analyzer
#!/usr/bin/env python3
"""nginx_log_analyzer.py — analyze Nginx access logs."""
import re
# NOTE(review): defaultdict, Path and datetime appear unused in this
# script — confirm before removing.
from collections import Counter, defaultdict
from pathlib import Path
from datetime import datetime
# Matches the common nginx access-log prefix: ip, ident, user, [time],
# "METHOD URL PROTO", status, size.
# NOTE(review): the size group is \d+, so lines where nginx logs '-'
# (zero bytes sent) will not match and are dropped by parse_log —
# confirm whether those requests should be counted.
LOG_PATTERN = re.compile(
    r'(?P<ip>\S+) \S+ \S+ \[(?P<time>[^\]]+)\] '
    r'"(?P<method>\S+) (?P<url>\S+) \S+" (?P<status>\d+) (?P<size>\d+)'
)
def parse_log(filepath: str) -> list[dict]:
    """Parse an nginx access log into a list of dicts (one per matched line).

    Lines that do not match LOG_PATTERN are silently skipped. The file is
    read as UTF-8 with undecodable bytes replaced — access logs frequently
    contain raw, non-UTF-8 request bytes, which would otherwise raise
    UnicodeDecodeError and abort the whole analysis.
    """
    with open(filepath, 'r', encoding='utf-8', errors='replace') as f:
        return [m.groupdict() for line in f if (m := LOG_PATTERN.match(line))]
def analyze(entries: list[dict]):
    """Print a traffic summary: totals, top IPs, status distribution, top URLs.

    Guards against an empty entry list, which previously raised
    ZeroDivisionError when computing the error rate.
    """
    if not entries:
        print("No log entries found")
        return

    total = len(entries)
    ip_counter = Counter(e['ip'] for e in entries)
    status_counter = Counter(e['status'] for e in entries)
    url_counter = Counter(e['url'] for e in entries)
    total_bytes = sum(int(e['size']) for e in entries)
    # 4xx and 5xx responses count as errors
    errors = [e for e in entries if e['status'].startswith(('4', '5'))]

    print(f"Total Requests: {total:,}")
    print(f"Total Transfer: {total_bytes / (1024**3):.2f} GB")
    print(f"Error Rate: {len(errors) / total * 100:.1f}%")
    print(f"\nTop 10 IPs:")
    for ip, count in ip_counter.most_common(10):
        print(f" {ip}: {count:,} requests")
    print(f"\nStatus Code Distribution:")
    for status, count in sorted(status_counter.items()):
        pct = count / total * 100
        print(f" {status}: {count:,} ({pct:.1f}%)")
    print(f"\nTop 10 URLs:")
    for url, count in url_counter.most_common(10):
        print(f" {url}: {count:,}")
if __name__ == '__main__':
    log_entries = parse_log('/var/log/nginx/access.log')
    if not log_entries:
        print("No log entries found")
    else:
        analyze(log_entries)
Smart Backup Script
#!/usr/bin/env python3
"""smart_backup.py — Backup directories with rotation and compression"""
import tarfile
# NOTE(review): shutil appears unused in this script — confirm before removing.
import shutil
from pathlib import Path
from datetime import datetime, timedelta
import logging
import hashlib
# Timestamped INFO-level logging for the whole script
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
logger = logging.getLogger(__name__)
class BackupManager:
    """Create, rotate and list tar.gz backups with MD5 checksum sidecars."""

    def __init__(self, backup_dir: str, retention_days: int = 30):
        """Ensure the backup directory exists and remember the retention window."""
        self.backup_dir = Path(backup_dir)
        self.backup_dir.mkdir(parents=True, exist_ok=True)
        self.retention_days = retention_days

    @staticmethod
    def _file_md5(path: Path, chunk_size: int = 1024 * 1024) -> str:
        """MD5 of a file, hashed in chunks so large archives never have to
        fit in memory (the original read_bytes() loaded the whole file)."""
        digest = hashlib.md5()
        with open(path, 'rb') as fh:
            while chunk := fh.read(chunk_size):
                digest.update(chunk)
        return digest.hexdigest()

    def create_backup(self, source: str, name: str) -> Path:
        """Archive *source* into <backup_dir>/<name>_<timestamp>.tar.gz.

        Writes a sibling .md5 checksum file and returns the archive path.
        Raises FileNotFoundError when the source path does not exist.
        """
        source_path = Path(source)
        if not source_path.exists():
            raise FileNotFoundError(f"Source not found: {source}")

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        archive_name = f"{name}_{timestamp}.tar.gz"
        archive_path = self.backup_dir / archive_name
        logger.info(f"Creating backup: {archive_path}")

        with tarfile.open(archive_path, 'w:gz') as tar:
            tar.add(source_path, arcname=source_path.name)

        size_mb = archive_path.stat().st_size / (1024 * 1024)
        md5 = self._file_md5(archive_path)
        logger.info(f"Backup complete: {size_mb:.1f} MB, MD5: {md5}")

        # Checksum sidecar. Two spaces between hash and name: `md5sum -c`
        # requires a two-character separator, so the original single space
        # produced a file it could not verify. Note with_suffix() replaces
        # only the final '.gz', yielding '<name>_<ts>.tar.md5'.
        checksum_path = archive_path.with_suffix('.md5')
        checksum_path.write_text(f"{md5}  {archive_name}\n")
        return archive_path

    def rotate_backups(self, name: str):
        """Delete backups of *name* (and their .md5 sidecars) older than
        retention_days, based on file mtime."""
        cutoff = datetime.now() - timedelta(days=self.retention_days)
        pattern = f"{name}_*.tar.gz"
        removed = 0
        for f in sorted(self.backup_dir.glob(pattern)):
            mtime = datetime.fromtimestamp(f.stat().st_mtime)
            if mtime < cutoff:
                f.unlink()
                md5_file = f.with_suffix('.md5')
                if md5_file.exists():
                    md5_file.unlink()
                removed += 1
                logger.info(f"Removed old backup: {f.name}")
        logger.info(f"Rotation complete: removed {removed} old backups")

    def list_backups(self, name: str) -> list[dict]:
        """Return newest-first metadata dicts (file, size_mb, date) for *name*."""
        pattern = f"{name}_*.tar.gz"
        backups = []
        for f in sorted(self.backup_dir.glob(pattern), reverse=True):
            stat = f.stat()  # single stat() call per file
            backups.append({
                'file': f.name,
                'size_mb': stat.st_size / (1024 * 1024),
                'date': datetime.fromtimestamp(stat.st_mtime),
            })
        return backups
# Usage example
if __name__ == '__main__':
    manager = BackupManager('/backup/daily', retention_days=30)
    # Directories to back up, keyed by backup name
    targets = {
        'nginx-config': '/etc/nginx',
        'webapp': '/var/www/html',
        'crontabs': '/var/spool/cron',
    }
    for backup_name, src in targets.items():
        try:
            manager.create_backup(src, backup_name)
            manager.rotate_backups(backup_name)
        except Exception as e:
            # One failed target must not stop the remaining backups
            logger.error(f"Backup failed for {backup_name}: {e}")
Port Scanner สำหรับ Service Monitoring
#!/usr/bin/env python3
"""port_checker.py — check that required services are reachable."""
import socket
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
@dataclass
class ServiceCheck:
    """Result of probing a single TCP service endpoint."""
    host: str
    port: int
    service: str
    is_up: bool
    # Connect time in milliseconds; also populated on failure for diagnostics
    response_ms: float = 0.0
def check_port(host: str, port: int, service: str, timeout: float = 3.0) -> ServiceCheck:
    """TCP-connect to host:port and report reachability plus latency in ms.

    A single `except OSError` replaces the original redundant tuple:
    socket.timeout and ConnectionRefusedError are both OSError subclasses,
    so any connection failure (refused, timed out, unreachable, DNS error)
    is reported as the service being down.
    """
    import time  # local import keeps the snippet self-contained
    start = time.monotonic()
    try:
        with socket.create_connection((host, port), timeout=timeout):
            elapsed = (time.monotonic() - start) * 1000
            return ServiceCheck(host, port, service, True, elapsed)
    except OSError:
        elapsed = (time.monotonic() - start) * 1000
        return ServiceCheck(host, port, service, False, elapsed)
# Services that must be monitored
SERVICES = [
    ('10.10.10.11', 80, 'Nginx'),
    ('10.10.10.11', 443, 'Nginx SSL'),
    ('10.10.10.12', 3306, 'MySQL'),
    ('10.10.10.12', 6379, 'Redis'),
    ('10.10.10.13', 9090, 'Prometheus'),
    ('10.10.10.13', 3000, 'Grafana'),
    ('10.10.10.14', 8200, 'Vault'),
    ('10.10.10.15', 51820, 'WireGuard'),
]

# Probe every endpoint concurrently; Executor.map preserves input order,
# exactly like the original submit-then-result loop.
with ThreadPoolExecutor(max_workers=20) as executor:
    results = list(executor.map(lambda svc: check_port(*svc), SERVICES))

print(f"{'Service':<20} {'Host:Port':<25} {'Status':<10} {'Response'}")
print("-" * 70)
for result in results:
    state = "UP" if result.is_up else "DOWN"
    marker = "" if result.is_up else "*** "
    print(f"{marker}{result.service:<20} {result.host}:{result.port:<15} {state:<10} {result.response_ms:.0f}ms")
ตรวจสอบ SSL Certificate Expiry
#!/usr/bin/env python3
"""ssl_checker.py — check SSL certificate expiry dates."""
import ssl
import socket
from datetime import datetime
from dataclasses import dataclass
@dataclass
class CertInfo:
    """SSL certificate summary for one domain.

    On failure check_ssl() stores the error message in `issuer`, sets
    `days_left` to -1 and `is_valid` to False.
    """
    domain: str
    issuer: str
    expires: datetime
    days_left: int
    is_valid: bool
def check_ssl(domain: str, port: int = 443) -> CertInfo:
    """Fetch the peer certificate of *domain* and summarise its expiry.

    Returns an invalid CertInfo (issuer = error message, days_left = -1)
    on any connection/TLS/parsing failure instead of raising.
    """
    from datetime import timezone  # local: module-level only imports `datetime`

    context = ssl.create_default_context()
    try:
        with socket.create_connection((domain, port), timeout=5) as sock:
            with context.wrap_socket(sock, server_hostname=domain) as ssock:
                cert = ssock.getpeercert()
                # ssl.cert_time_to_seconds understands the 'notAfter' GMT
                # format; comparing timezone-aware datetimes replaces the
                # deprecated (since 3.12) datetime.utcnow().
                expires = datetime.fromtimestamp(
                    ssl.cert_time_to_seconds(cert['notAfter']), tz=timezone.utc)
                days_left = (expires - datetime.now(timezone.utc)).days
                issuer = dict(x[0] for x in cert['issuer']).get('organizationName', 'Unknown')
                return CertInfo(domain, issuer, expires, days_left, True)
    except Exception as e:
        # Broad catch is deliberate: this is a best-effort monitoring probe.
        return CertInfo(domain, str(e), datetime.min, -1, False)
domains = [
    'siamcafe.net',
    'icafeforex.com',
    'siamlancard.com',
    'example.com',
]

print(f"{'Domain':<30} {'Issuer':<25} {'Expires':<12} {'Days Left'}")
print("-" * 80)
for name in domains:
    cert = check_ssl(name)
    if cert.is_valid:
        expiry_text = cert.expires.strftime('%Y-%m-%d')
        status = f"{cert.days_left}d"
    else:
        expiry_text = 'N/A'
        status = "INVALID"
    # Flag certificates expiring within 30 days
    warning = " !!!" if cert.days_left < 30 and cert.is_valid else ""
    print(f"{cert.domain:<30} {cert.issuer:<25} {expiry_text:<12} {status}{warning}")
ลบไฟล์เก่าอัตโนมัติ
#!/usr/bin/env python3
"""disk_cleanup.py — delete old files and manage disk space."""
from pathlib import Path
from datetime import datetime, timedelta
import logging
# Timestamp + message only (no level name) for this script's log output
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')
logger = logging.getLogger(__name__)
class DiskCleaner:
    """Delete old files / empty directories, with a dry-run safety mode."""

    def __init__(self, dry_run: bool = True):
        # dry_run=True only reports what would be deleted
        self.dry_run = dry_run
        # Bytes (to be) freed, accumulated across all clean_old_files calls
        self.total_freed = 0

    def clean_old_files(self, directory: str, pattern: str, max_age_days: int):
        """Remove files matching *pattern* whose mtime is older than *max_age_days*.

        Each file is stat()ed once (the original stat()ed twice, racing with
        concurrent deletion), and per-file OSErrors are logged and skipped so
        one unreadable file cannot abort the whole sweep.
        """
        path = Path(directory)
        if not path.exists():
            logger.warning(f"Directory not found: {directory}")
            return
        cutoff = datetime.now() - timedelta(days=max_age_days)
        for f in path.glob(pattern):
            if not f.is_file():
                continue
            try:
                stat = f.stat()
                if datetime.fromtimestamp(stat.st_mtime) >= cutoff:
                    continue
                if self.dry_run:
                    logger.info(f"[DRY RUN] Would delete: {f} ({stat.st_size / 1024:.1f} KB)")
                else:
                    f.unlink()
                    logger.info(f"Deleted: {f} ({stat.st_size / 1024:.1f} KB)")
                self.total_freed += stat.st_size
            except OSError as e:
                # Best-effort cleanup: permissions or races must not abort
                logger.warning(f"Skipping {f}: {e}")

    def clean_empty_dirs(self, directory: str):
        """Remove empty directories, deepest first (reverse-sorted snapshot,
        so deletion does not disturb the iteration)."""
        path = Path(directory)
        for d in sorted(path.rglob('*'), reverse=True):
            try:
                if d.is_dir() and not any(d.iterdir()):
                    if not self.dry_run:
                        d.rmdir()
                    logger.info(f"{'[DRY RUN] ' if self.dry_run else ''}Removed empty dir: {d}")
            except OSError as e:
                logger.warning(f"Skipping {d}: {e}")
# Define the cleanup rules: (directory spec, file pattern, max age in days)
cleaner = DiskCleaner(dry_run=True)  # switch to False when ready to delete for real
cleanup_rules = [
    ('/var/log', '*.gz', 30),
    ('/var/log', '*.old', 7),
    ('/tmp', '*', 7),
    ('/var/cache/apt/archives', '*.deb', 14),
    ('/home/*/Downloads', '*.tmp', 30),
]
import glob
for directory, pattern, max_age in cleanup_rules:
    # Expand shell wildcards in the directory spec (e.g. /home/*/Downloads):
    # Path('/home/*/Downloads').exists() treats '*' literally and is always
    # False, so without this expansion that rule silently never matched.
    # `or [directory]` keeps the original "Directory not found" warning
    # when nothing matches.
    for expanded_dir in glob.glob(directory) or [directory]:
        cleaner.clean_old_files(expanded_dir, pattern, max_age)
freed_mb = cleaner.total_freed / (1024 * 1024)
# 'would be ' carries its own trailing space: the original template produced
# a double space ("Total space  freed") when dry_run was False.
logger.info(f"\nTotal space {'would be ' if cleaner.dry_run else ''}freed: {freed_mb:.1f} MB")
ควรใช้ Python หรือ Bash สำหรับ automation scripts?
กฎง่ายๆของผมคือถ้า script ไม่เกิน 20 บรรทัดและเป็นแค่ chain of commands ใช้ Bash ถ้าต้องมี logic, loops, error handling, API calls, parsing JSON/YAML ใช้ Python ผมใช้ Python 80% และ Bash 20% สำหรับ simple wrappers
Python scripts ควรรันด้วย cron หรือ systemd timer?
systemd timer ดีกว่า cron ในหลายด้าน: มี logging ดีกว่า (journalctl), dependency management, resource limits, randomized delays ผมใช้ systemd timer สำหรับ production scripts ทั้งหมดแล้ว — ดูบทความ Python Trends 2026 สำหรับ modern Python practices เพิ่มเติม
# /etc/systemd/system/health-check.service
[Unit]
Description=Server Health Check
[Service]
# oneshot: the unit is considered finished when the script exits
Type=oneshot
ExecStart=/usr/bin/python3 /opt/scripts/server_health.py
# Run as an unprivileged user; stdout/stderr are captured in the journal
User=monitor
# /etc/systemd/system/health-check.timer
[Unit]
Description=Run health check every 5 minutes
[Timer]
# Fire at minute 0, 5, 10, ... of every hour
OnCalendar=*:0/5
# Spread start times by up to 30 s to avoid synchronized load across hosts
RandomizedDelaySec=30
[Install]
WantedBy=timers.target
จะจัดการ secrets ใน Python scripts ยังไง?
อย่า hardcode passwords ใน scripts เด็ดขาดใช้ environment variables, config files ที่มี permissions จำกัด, หรือ secrets manager เช่น HashiCorp Vault ผมใช้ python-dotenv สำหรับ development และ Vault สำหรับ production
มี framework สำหรับ SysAdmin scripts ไหม?
Fabric เป็น library ยอดนิยมสำหรับรัน commands บน remote servers ผ่าน SSH Invoke สำหรับ local task runner Click สำหรับสร้าง CLI tools ที่มี arguments/options Typer เป็น modern alternative ที่ใช้ type hints ผมใช้ Typer + Rich สำหรับ CLI tools ใหม่ทั้งหมด
สรุป
Python เป็นเครื่องมือที่ทรงพลังที่สุดสำหรับ SysAdmin automation ในปี 2026 Scripts ที่แสดงในบทความนี้ใช้ได้จริงทุกตัวตั้งแต่ server health monitoring, log analysis, automated backup, network scanning จนถึง SSL certificate monitoring
สิ่งสำคัญคือ: เริ่มจาก script ง่ายๆ ก่อน ทดสอบให้ดี ใช้ dry_run mode ก่อนรันจริง จัดการ secrets อย่างปลอดภัย และใช้ systemd timer แทน cron สำหรับ scheduling
