Python Automation SysAdmin เขียน Script จัดการเซิร์ฟเวอร์ 2026 SiamCafe.net | IT Expert Since 1997

Python Automation สำหรับ SysAdmin — คู่มือเขียน Script จัดการเซิร์ฟเวอร์ 2026

python automation sysadmin 0041
โดยอ. บอม (SiamCafe Admin) | 28/02/2026 | Automation | 1,850+ คำ

ทำไม Python ถึงเหมาะกับ SysAdmin Automation

ผมเริ่มเขียน automation scripts ด้วย Bash มาก่อนแต่พอ scripts ซับซ้อนขึ้น Bash ก็จัดการยากขึ้นเรื่อยๆ error handling แย่ string manipulation ลำบาก JSON/YAML parsing ต้องพึ่ง external tools Python แก้ปัญหาทั้งหมดนี้มี libraries ครบทุกอย่าง error handling ดีอ่านง่าย maintain ง่าย

ผมจะแสดง scripts จริงที่ใช้ในการทำงานประจำวันทุก script ทดสอบแล้วบน Ubuntu 24.04 และ Python 3.12+ สามารถนำไปใช้งานได้ทันที

Libraries ที่ SysAdmin ต้องรู้

# ติดตั้ง libraries ที่ใช้บ่อย
pip install psutil requests paramiko jinja2 pyyaml rich

# psutil — system monitoring (CPU, RAM, disk, network)
# requests — HTTP API calls
# paramiko — SSH connections
# jinja2 — template rendering
# pyyaml — YAML parsing
# rich — beautiful terminal output

ตรวจสอบ Server Health แบบครบวงจร

#!/usr/bin/env python3
"""server_health.py — ตรวจสอบสุขภาพเซิร์ฟเวอร์และส่ง alert"""

import psutil
import socket
import smtplib
from email.mime.text import MIMEText
from datetime import datetime
from dataclasses import dataclass

@dataclass
class HealthCheck:
 name: str
 value: float
 threshold: float
 unit: str

 @property
 def is_critical(self) -> bool:
 return self.value > self.threshold

 def __str__(self) -> str:
 status = "CRITICAL" if self.is_critical else "OK"
 return f"[{status}] {self.name}: {self.value:.1f}{self.unit} (threshold: {self.threshold}{self.unit})"

def check_server_health() -> list[HealthCheck]:
 checks = []

 # CPU
 cpu = psutil.cpu_percent(interval=3)
 checks.append(HealthCheck("CPU Usage", cpu, 85.0, "%"))

 # Memory
 mem = psutil.virtual_memory()
 checks.append(HealthCheck("Memory Usage", mem.percent, 90.0, "%"))

 # Disk (root partition)
 disk = psutil.disk_usage('/')
 checks.append(HealthCheck("Disk Usage /", disk.percent, 85.0, "%"))

 # Swap
 swap = psutil.swap_memory()
 checks.append(HealthCheck("Swap Usage", swap.percent, 50.0, "%"))

 # Load Average (per core)
 load1 = psutil.getloadavg()[0]
 cores = psutil.cpu_count()
 load_per_core = (load1 / cores) * 100
 checks.append(HealthCheck("Load/Core", load_per_core, 80.0, "%"))

 # Open files
 open_files = len(psutil.Process(1).open_files()) if psutil.Process(1) else 0
 checks.append(HealthCheck("Open Files (PID 1)", float(open_files), 1000.0, ""))

 return checks

def send_alert(checks: list[HealthCheck], hostname: str):
 critical = [c for c in checks if c.is_critical]
 if not critical:
 return

 body = f"Server Health Alert — {hostname}\n"
 body += f"Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
 for c in checks:
 body += str(c) + "\n"

 msg = MIMEText(body)
 msg['Subject'] = f"[ALERT] Server {hostname} — {len(critical)} critical checks"
 msg['From'] = 'alerts@example.com'
 msg['To'] = 'admin@example.com'

 with smtplib.SMTP('localhost', 25) as smtp:
 smtp.send_message(msg)

if __name__ == '__main__':
 hostname = socket.gethostname()
 checks = check_server_health()
 for c in checks:
 print(c)
 send_alert(checks, hostname)

Nginx Access Log Analyzer

#!/usr/bin/env python3
"""nginx_log_analyzer.py — วิเคราะห์ Nginx access logs"""

import re
from collections import Counter, defaultdict
from pathlib import Path
from datetime import datetime

LOG_PATTERN = re.compile(
 r'(?P<ip>\S+) \S+ \S+ \[(?P<time>[^\]]+)\] '
 r'"(?P<method>\S+) (?P<url>\S+) \S+" (?P<status>\d+) (?P<size>\d+)'
)

def parse_log(filepath: str) -> list[dict]:
 entries = []
 with open(filepath, 'r') as f:
 for line in f:
 match = LOG_PATTERN.match(line)
 if match:
 entries.append(match.groupdict())
 return entries

def analyze(entries: list[dict]):
 ip_counter = Counter(e['ip'] for e in entries)
 status_counter = Counter(e['status'] for e in entries)
 url_counter = Counter(e['url'] for e in entries)
 total_bytes = sum(int(e['size']) for e in entries)
 errors = [e for e in entries if e['status'].startswith(('4', '5'))]

 print(f"Total Requests: {len(entries):,}")
 print(f"Total Transfer: {total_bytes / (1024**3):.2f} GB")
 print(f"Error Rate: {len(errors) / len(entries) * 100:.1f}%")

 print(f"\nTop 10 IPs:")
 for ip, count in ip_counter.most_common(10):
 print(f" {ip}: {count:,} requests")

 print(f"\nStatus Code Distribution:")
 for status, count in sorted(status_counter.items()):
 pct = count / len(entries) * 100
 print(f" {status}: {count:,} ({pct:.1f}%)")

 print(f"\nTop 10 URLs:")
 for url, count in url_counter.most_common(10):
 print(f" {url}: {count:,}")

if __name__ == '__main__':
 entries = parse_log('/var/log/nginx/access.log')
 if entries:
 analyze(entries)
 else:
 print("No log entries found")

Smart Backup Script

#!/usr/bin/env python3
"""smart_backup.py — Backup directories with rotation and compression"""

import tarfile
import shutil
from pathlib import Path
from datetime import datetime, timedelta
import logging
import hashlib

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
logger = logging.getLogger(__name__)

class BackupManager:
 def __init__(self, backup_dir: str, retention_days: int = 30):
 self.backup_dir = Path(backup_dir)
 self.backup_dir.mkdir(parents=True, exist_ok=True)
 self.retention_days = retention_days

 def create_backup(self, source: str, name: str) -> Path:
 source_path = Path(source)
 if not source_path.exists():
 raise FileNotFoundError(f"Source not found: {source}")

 timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
 archive_name = f"{name}_{timestamp}.tar.gz"
 archive_path = self.backup_dir / archive_name

 logger.info(f"Creating backup: {archive_path}")
 with tarfile.open(archive_path, 'w:gz') as tar:
 tar.add(source, arcname=source_path.name)

 size_mb = archive_path.stat().st_size / (1024 * 1024)
 md5 = hashlib.md5(archive_path.read_bytes()).hexdigest()
 logger.info(f"Backup complete: {size_mb:.1f} MB, MD5: {md5}")

 # Write checksum file
 checksum_path = archive_path.with_suffix('.md5')
 checksum_path.write_text(f"{md5} {archive_name}\n")

 return archive_path

 def rotate_backups(self, name: str):
 cutoff = datetime.now() - timedelta(days=self.retention_days)
 pattern = f"{name}_*.tar.gz"
 removed = 0

 for f in sorted(self.backup_dir.glob(pattern)):
 mtime = datetime.fromtimestamp(f.stat().st_mtime)
 if mtime < cutoff:
 f.unlink()
 md5_file = f.with_suffix('.md5')
 if md5_file.exists():
 md5_file.unlink()
 removed += 1
 logger.info(f"Removed old backup: {f.name}")

 logger.info(f"Rotation complete: removed {removed} old backups")

 def list_backups(self, name: str) -> list[dict]:
 pattern = f"{name}_*.tar.gz"
 backups = []
 for f in sorted(self.backup_dir.glob(pattern), reverse=True):
 backups.append({
 'file': f.name,
 'size_mb': f.stat().st_size / (1024 * 1024),
 'date': datetime.fromtimestamp(f.stat().st_mtime),
 })
 return backups

# ใช้งาน
if __name__ == '__main__':
 bm = BackupManager('/backup/daily', retention_days=30)

 # Backup directories
 targets = {
 'nginx-config': '/etc/nginx',
 'webapp': '/var/www/html',
 'crontabs': '/var/spool/cron',
 }

 for name, source in targets.items():
 try:
 bm.create_backup(source, name)
 bm.rotate_backups(name)
 except Exception as e:
 logger.error(f"Backup failed for {name}: {e}")

Port Scanner สำหรับ Service Monitoring

#!/usr/bin/env python3
"""port_checker.py — ตรวจสอบ services ที่ต้องทำงาน"""

import socket
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass

@dataclass
class ServiceCheck:
 host: str
 port: int
 service: str
 is_up: bool
 response_ms: float = 0.0

def check_port(host: str, port: int, service: str, timeout: float = 3.0) -> ServiceCheck:
 import time
 start = time.monotonic()
 try:
 with socket.create_connection((host, port), timeout=timeout):
 elapsed = (time.monotonic() - start) * 1000
 return ServiceCheck(host, port, service, True, elapsed)
 except (socket.timeout, ConnectionRefusedError, OSError):
 elapsed = (time.monotonic() - start) * 1000
 return ServiceCheck(host, port, service, False, elapsed)

# รายการ services ที่ต้อง monitor
SERVICES = [
 ('10.10.10.11', 80, 'Nginx'),
 ('10.10.10.11', 443, 'Nginx SSL'),
 ('10.10.10.12', 3306, 'MySQL'),
 ('10.10.10.12', 6379, 'Redis'),
 ('10.10.10.13', 9090, 'Prometheus'),
 ('10.10.10.13', 3000, 'Grafana'),
 ('10.10.10.14', 8200, 'Vault'),
 ('10.10.10.15', 51820, 'WireGuard'),
]

with ThreadPoolExecutor(max_workers=20) as executor:
 futures = [executor.submit(check_port, h, p, s) for h, p, s in SERVICES]
 results = [f.result() for f in futures]

print(f"{'Service':<20} {'Host:Port':<25} {'Status':<10} {'Response'}")
print("-" * 70)
for r in results:
 status = "UP" if r.is_up else "DOWN"
 color = "" if r.is_up else "*** "
 print(f"{color}{r.service:<20} {r.host}:{r.port:<15} {status:<10} {r.response_ms:.0f}ms")

ตรวจสอบ SSL Certificate Expiry

#!/usr/bin/env python3
"""ssl_checker.py — ตรวจสอบวันหมดอายุ SSL certificates"""

import ssl
import socket
from datetime import datetime
from dataclasses import dataclass

@dataclass
class CertInfo:
 domain: str
 issuer: str
 expires: datetime
 days_left: int
 is_valid: bool

def check_ssl(domain: str, port: int = 443) -> CertInfo:
 context = ssl.create_default_context()
 try:
 with socket.create_connection((domain, port), timeout=5) as sock:
 with context.wrap_socket(sock, server_hostname=domain) as ssock:
 cert = ssock.getpeercert()
 expires = datetime.strptime(cert['notAfter'], '%b %d %H:%M:%S %Y %Z')
 days_left = (expires - datetime.utcnow()).days
 issuer = dict(x[0] for x in cert['issuer']).get('organizationName', 'Unknown')
 return CertInfo(domain, issuer, expires, days_left, True)
 except Exception as e:
 return CertInfo(domain, str(e), datetime.min, -1, False)

domains = [
 'siamcafe.net',
 'icafeforex.com',
 'siamlancard.com',
 'example.com',
]

print(f"{'Domain':<30} {'Issuer':<25} {'Expires':<12} {'Days Left'}")
print("-" * 80)
for domain in domains:
 info = check_ssl(domain)
 status = f"{info.days_left}d" if info.is_valid else "INVALID"
 warning = " !!!" if info.days_left < 30 and info.is_valid else ""
 print(f"{info.domain:<30} {info.issuer:<25} {info.expires.strftime('%Y-%m-%d') if info.is_valid else 'N/A':<12} {status}{warning}")

ลบไฟล์เก่าอัตโนมัติ

#!/usr/bin/env python3
"""disk_cleanup.py — ลบไฟล์เก่าและจัดการ disk space"""

from pathlib import Path
from datetime import datetime, timedelta
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(message)s')
logger = logging.getLogger(__name__)

class DiskCleaner:
 def __init__(self, dry_run: bool = True):
 self.dry_run = dry_run
 self.total_freed = 0

 def clean_old_files(self, directory: str, pattern: str, max_age_days: int):
 path = Path(directory)
 if not path.exists():
 logger.warning(f"Directory not found: {directory}")
 return

 cutoff = datetime.now() - timedelta(days=max_age_days)
 for f in path.glob(pattern):
 if f.is_file() and datetime.fromtimestamp(f.stat().st_mtime) < cutoff:
 size = f.stat().st_size
 if self.dry_run:
 logger.info(f"[DRY RUN] Would delete: {f} ({size / 1024:.1f} KB)")
 else:
 f.unlink()
 logger.info(f"Deleted: {f} ({size / 1024:.1f} KB)")
 self.total_freed += size

 def clean_empty_dirs(self, directory: str):
 path = Path(directory)
 for d in sorted(path.rglob('*'), reverse=True):
 if d.is_dir() and not any(d.iterdir()):
 if not self.dry_run:
 d.rmdir()
 logger.info(f"{'[DRY RUN] ' if self.dry_run else ''}Removed empty dir: {d}")

# กำหนดกฎการ cleanup
cleaner = DiskCleaner(dry_run=True) # เปลี่ยนเป็น False เมื่อพร้อมลบจริง

cleanup_rules = [
 ('/var/log', '*.gz', 30),
 ('/var/log', '*.old', 7),
 ('/tmp', '*', 7),
 ('/var/cache/apt/archives', '*.deb', 14),
 ('/home/*/Downloads', '*.tmp', 30),
]

for directory, pattern, max_age in cleanup_rules:
 cleaner.clean_old_files(directory, pattern, max_age)

freed_mb = cleaner.total_freed / (1024 * 1024)
logger.info(f"\nTotal space {'would be' if cleaner.dry_run else ''} freed: {freed_mb:.1f} MB")

ควรใช้ Python หรือ Bash สำหรับ automation scripts?

กฎง่ายๆของผมคือถ้า script ไม่เกิน 20 บรรทัดและเป็นแค่ chain of commands ใช้ Bash ถ้าต้องมี logic, loops, error handling, API calls, parsing JSON/YAML ใช้ Python ผมใช้ Python 80% และ Bash 20% สำหรับ simple wrappers

Python scripts ควรรันด้วย cron หรือ systemd timer?

systemd timer ดีกว่า cron ในหลายด้านมี logging ดีกว่า (journalctl), dependency management, resource limits, randomized delays ผมใช้ systemd timer สำหรับ production scripts ทั้งหมดแล้วดูบทความ Python Trends 2026 สำหรับ modern Python practices เพิ่มเติม

# /etc/systemd/system/health-check.service
[Unit]
Description=Server Health Check
[Service]
Type=oneshot
ExecStart=/usr/bin/python3 /opt/scripts/server_health.py
User=monitor

# /etc/systemd/system/health-check.timer
[Unit]
Description=Run health check every 5 minutes
[Timer]
OnCalendar=*:0/5
RandomizedDelaySec=30
[Install]
WantedBy=timers.target

จะจัดการ secrets ใน Python scripts ยังไง?

อย่า hardcode passwords ใน scripts เด็ดขาดใช้ environment variables, config files ที่มี permissions จำกัด, หรือ secrets manager เช่น HashiCorp Vault ผมใช้ python-dotenv สำหรับ development และ Vault สำหรับ production

มี framework สำหรับ SysAdmin scripts ไหม?

Fabric เป็น library ยอดนิยมสำหรับรัน commands บน remote servers ผ่าน SSH Invoke สำหรับ local task runner Click สำหรับสร้าง CLI tools ที่มี arguments/options Typer เป็น modern alternative ที่ใช้ type hints ผมใช้ Typer + Rich สำหรับ CLI tools ใหม่ทั้งหมด

สรุป

Python เป็นเครื่องมือที่ทรงพลังที่สุดสำหรับ SysAdmin automation ในปี 2026 Scripts ที่แสดงในบทความนี้ใช้ได้จริงทุกตัวตั้งแต่ server health monitoring, log analysis, automated backup, network scanning จนถึง SSL certificate monitoring

สิ่งสำคัญคือเริ่มจาก script ง่ายๆก่อนทดสอบให้ดีใช้ dry_run mode ก่อนรันจริงจัดการ secrets อย่างปลอดภัยและใช้ systemd timer แทน cron สำหรับ scheduling

แนะนำโดยผู้เชี่ยวชาญ

iCafeForex สอนเทรด Forex ฟรี SiamLancard IT Solutions

🎬 ดูวิดีโอเพิ่มเติม

เรียนรู้ IT, Forex Trading จากประสบการณ์จริง 30 ปี

▶ YouTube @icafefx
👨‍💻

อ. บอมกิตติทัศน์เจริญพนาสิทธิ์

ผู้ก่อตั้ง SiamCafe.net (1997) | IT Expert 30+ ปี | ประสบการณ์ Network, Server, Security, DevOps