ACME Protocol Batch Processing Pipeline คืออะไร
ACME (Automatic Certificate Management Environment) เป็น protocol มาตรฐานสำหรับ automate การออก, ต่ออายุ และจัดการ SSL/TLS certificates โดย Let's Encrypt เป็นผู้ใช้หลัก Batch Processing Pipeline คือการประมวลผล certificate operations เป็นชุดสำหรับองค์กรที่มี domains จำนวนมาก การรวมสองแนวคิดนี้ช่วยให้จัดการ certificates หลายร้อยหรือหลายพัน domains อัตโนมัติ ลดความเสี่ยงจาก expired certificates และประหยัดเวลา operations team
ACME Protocol พื้นฐาน
# acme_basics.py — ACME protocol fundamentals
import json
class ACMEBasics:
PROTOCOL = {
"name": "ACME (RFC 8555)",
"purpose": "Automate SSL/TLS certificate lifecycle",
"providers": ["Let's Encrypt (free)", "ZeroSSL", "Buypass", "Google Trust Services"],
"cert_types": ["DV (Domain Validation)", "wildcard (*.example.com)"],
"validity": "90 days (Let's Encrypt) — renew every 60 days",
}
FLOW = {
"step1": {"name": "Account Registration", "description": "สร้าง ACME account กับ CA (key pair)"},
"step2": {"name": "Order Certificate", "description": "ขอ certificate สำหรับ domain(s)"},
"step3": {"name": "Authorization (Challenge)", "description": "พิสูจน์ว่าเป็นเจ้าของ domain"},
"step4": {"name": "Finalize Order", "description": "ส่ง CSR (Certificate Signing Request)"},
"step5": {"name": "Download Certificate", "description": "ดาวน์โหลด certificate + chain"},
}
CHALLENGE_TYPES = {
"http01": {
"name": "HTTP-01",
"method": "วางไฟล์ที่ http://domain/.well-known/acme-challenge/TOKEN",
"use": "Web servers ที่เข้าถึง port 80 ได้",
"limitation": "ไม่รองรับ wildcard",
},
"dns01": {
"name": "DNS-01",
"method": "สร้าง TXT record _acme-challenge.domain",
"use": "Wildcard certificates, internal domains",
"limitation": "ต้องมี DNS API access",
},
"tls_alpn01": {
"name": "TLS-ALPN-01",
"method": "ตอบ challenge ผ่าน TLS connection บน port 443",
"use": "เมื่อ port 80 ไม่เปิด",
"limitation": "ไม่รองรับ wildcard",
},
}
def show_protocol(self):
print("=== ACME Protocol ===\n")
for key, value in self.PROTOCOL.items():
if isinstance(value, list):
print(f" {key}: {', '.join(value)}")
else:
print(f" {key}: {value}")
def show_flow(self):
print(f"\n=== Certificate Flow ===")
for key, step in self.FLOW.items():
print(f" [{step['name']}] {step['description']}")
def show_challenges(self):
print(f"\n=== Challenge Types ===")
for key, challenge in self.CHALLENGE_TYPES.items():
print(f" [{challenge['name']}] {challenge['method']}")
print(f" Use: {challenge['use']}")
print()
acme = ACMEBasics()
acme.show_protocol()
acme.show_flow()
acme.show_challenges()
Batch Certificate Manager
# batch_manager.py — Batch ACME certificate management
import json
import os
from datetime import datetime, timedelta
class BatchCertManager:
PYTHON_MANAGER = """
# cert_manager.py — Batch certificate manager
import subprocess
import json
import os
from datetime import datetime, timedelta
from pathlib import Path
class CertManager:
def __init__(self, email, cert_dir="/etc/letsencrypt"):
self.email = email
self.cert_dir = Path(cert_dir)
self.domains_file = "domains.json"
def load_domains(self):
with open(self.domains_file) as f:
return json.load(f)
def check_expiry(self, domain):
cert_path = self.cert_dir / "live" / domain / "cert.pem"
if not cert_path.exists():
return None
result = subprocess.run(
["openssl", "x509", "-enddate", "-noout", "-in", str(cert_path)],
capture_output=True, text=True
)
if result.returncode == 0:
date_str = result.stdout.strip().split("=")[1]
expiry = datetime.strptime(date_str, "%b %d %H:%M:%S %Y %Z")
return expiry
return None
def needs_renewal(self, domain, days_before=30):
expiry = self.check_expiry(domain)
if expiry is None:
return True # No cert = needs issuance
return expiry - datetime.now() < timedelta(days=days_before)
def issue_cert(self, domain, challenge="dns-01", dns_plugin="cloudflare"):
cmd = [
"certbot", "certonly",
"--non-interactive",
"--agree-tos",
"--email", self.email,
]
if challenge == "dns-01":
cmd.extend([
f"--dns-{dns_plugin}",
f"--dns-{dns_plugin}-credentials", f"/etc/letsencrypt/{dns_plugin}.ini",
])
else:
cmd.extend(["--webroot", "-w", "/var/www/html"])
if isinstance(domain, list):
for d in domain:
cmd.extend(["-d", d])
else:
cmd.extend(["-d", domain])
result = subprocess.run(cmd, capture_output=True, text=True)
return result.returncode == 0, result.stdout + result.stderr
def batch_renew(self):
domains = self.load_domains()
results = {"renewed": [], "skipped": [], "failed": []}
for entry in domains:
domain = entry["domain"]
if self.needs_renewal(domain):
success, output = self.issue_cert(domain)
if success:
results["renewed"].append(domain)
else:
results["failed"].append({"domain": domain, "error": output[:200]})
else:
results["skipped"].append(domain)
return results
manager = CertManager("admin@example.com")
results = manager.batch_renew()
print(f"Renewed: {len(results['renewed'])}")
print(f"Skipped: {len(results['skipped'])}")
print(f"Failed: {len(results['failed'])}")
"""
DOMAINS_CONFIG = """
# domains.json — Domain configuration
[
{"domain": "example.com", "sans": ["www.example.com"], "challenge": "http-01"},
{"domain": "*.api.example.com", "challenge": "dns-01", "dns_plugin": "cloudflare"},
{"domain": "app.example.com", "sans": ["cdn.example.com"], "challenge": "http-01"},
{"domain": "*.internal.example.com", "challenge": "dns-01", "dns_plugin": "route53"}
]
"""
def show_manager(self):
print("=== Batch Certificate Manager ===")
print(self.PYTHON_MANAGER[:600])
def show_config(self):
print(f"\n=== Domains Config ===")
print(self.DOMAINS_CONFIG[:400])
mgr = BatchCertManager()
mgr.show_manager()
mgr.show_config()
Pipeline Architecture
# pipeline.py — ACME batch processing pipeline
import json
import random
class ACMEPipeline:
STAGES = {
"inventory": {
"name": "1. Inventory",
"description": "รวบรวมรายการ domains ทั้งหมดที่ต้องจัดการ certificates",
"sources": "DNS records, load balancers, CDN configs, domain registrar",
},
"check": {
"name": "2. Check Expiry",
"description": "ตรวจสอบ expiry date ของ certificates ทั้งหมด",
"threshold": "Renew ถ้าเหลือ < 30 วัน",
},
"issue_renew": {
"name": "3. Issue/Renew",
"description": "ขอหรือต่ออายุ certificates ผ่าน ACME protocol",
"method": "certbot, acme.sh, lego",
},
"deploy": {
"name": "4. Deploy",
"description": "Deploy certificates ไปยัง web servers, load balancers, CDN",
"targets": "Nginx, Apache, AWS ALB/CloudFront, Kubernetes",
},
"verify": {
"name": "5. Verify",
"description": "ตรวจสอบว่า certificates ทำงานถูกต้องบน endpoints",
"checks": "SSL Labs grade, chain validation, OCSP stapling",
},
"report": {
"name": "6. Report",
"description": "สรุปผลและแจ้งเตือนทีม",
"channels": "Slack, email, dashboard",
},
}
def show_pipeline(self):
print("=== Pipeline Stages ===\n")
for key, stage in self.STAGES.items():
print(f"[{stage['name']}]")
print(f" {stage['description']}")
print()
def simulate_batch(self):
print("=== Batch Processing Results ===")
total = random.randint(50, 200)
renewed = random.randint(5, 20)
skipped = total - renewed - random.randint(0, 3)
failed = total - renewed - skipped
print(f" Total domains: {total}")
print(f" Renewed: {renewed}")
print(f" Skipped (not due): {skipped}")
print(f" Failed: {failed}")
print(f" Next run: {random.randint(1, 24)} hours")
pipeline = ACMEPipeline()
pipeline.show_pipeline()
pipeline.simulate_batch()
Automation with Cron & Airflow
# automation.py — Automated certificate pipeline
import json
class CertAutomation:
CRON = """
# /etc/cron.d/cert-renewal — Daily certificate check
# Run daily at 3 AM
0 3 * * * root /opt/cert-manager/batch_renew.sh >> /var/log/cert-renewal.log 2>&1
# batch_renew.sh
#!/bin/bash
set -e
cd /opt/cert-manager
python3 cert_manager.py --action batch-renew --config domains.json
# Reload services after renewal
if [ -f /var/run/nginx.pid ]; then
nginx -s reload
fi
# Send notification
python3 notify.py --channel slack --results /tmp/cert-results.json
"""
AIRFLOW_DAG = """
# dags/cert_pipeline.py — Airflow DAG
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'infra-team',
'retries': 2,
'retry_delay': timedelta(minutes=10),
}
dag = DAG(
'cert_batch_pipeline',
default_args=default_args,
schedule_interval='0 3 * * *',
start_date=datetime(2025, 1, 1),
catchup=False,
)
inventory = PythonOperator(
task_id='inventory_domains',
python_callable=lambda: __import__('cert_manager').inventory(),
dag=dag,
)
check = PythonOperator(
task_id='check_expiry',
python_callable=lambda: __import__('cert_manager').check_all(),
dag=dag,
)
renew = PythonOperator(
task_id='batch_renew',
python_callable=lambda: __import__('cert_manager').batch_renew(),
dag=dag,
)
deploy = PythonOperator(
task_id='deploy_certs',
python_callable=lambda: __import__('deployer').deploy_all(),
dag=dag,
)
verify = PythonOperator(
task_id='verify_ssl',
python_callable=lambda: __import__('verifier').check_endpoints(),
dag=dag,
)
report = PythonOperator(
task_id='send_report',
python_callable=lambda: __import__('reporter').send_report(),
dag=dag,
)
inventory >> check >> renew >> deploy >> verify >> report
"""
def show_cron(self):
print("=== Cron Setup ===")
print(self.CRON[:400])
def show_airflow(self):
print(f"\n=== Airflow DAG ===")
print(self.AIRFLOW_DAG[:500])
auto = CertAutomation()
auto.show_cron()
auto.show_airflow()
Monitoring & Alerting
# monitoring.py — Certificate monitoring
import json
import random
from datetime import datetime, timedelta
class CertMonitoring:
def cert_dashboard(self):
print("=== Certificate Dashboard ===\n")
certs = [
{"domain": "example.com", "expiry_days": random.randint(30, 80), "grade": "A+"},
{"domain": "*.api.example.com", "expiry_days": random.randint(5, 60), "grade": "A"},
{"domain": "app.example.com", "expiry_days": random.randint(20, 70), "grade": "A+"},
{"domain": "cdn.example.com", "expiry_days": random.randint(40, 85), "grade": "A"},
{"domain": "internal.example.com", "expiry_days": random.randint(1, 30), "grade": "B"},
]
for cert in sorted(certs, key=lambda x: x["expiry_days"]):
status = "URGENT" if cert["expiry_days"] < 14 else "WARN" if cert["expiry_days"] < 30 else "OK"
print(f" [{status:>6}] {cert['domain']:<30} Expires: {cert['expiry_days']}d | Grade: {cert['grade']}")
def alert_rules(self):
print(f"\n=== Alert Rules ===")
rules = [
{"condition": "Expiry < 7 days", "action": "CRITICAL alert → PagerDuty + Slack", "priority": "P1"},
{"condition": "Expiry < 14 days", "action": "WARNING alert → Slack", "priority": "P2"},
{"condition": "Expiry < 30 days", "action": "INFO → auto-renew triggered", "priority": "P3"},
{"condition": "Renewal failed", "action": "CRITICAL alert → PagerDuty", "priority": "P1"},
{"condition": "SSL Grade < A", "action": "WARNING → review config", "priority": "P2"},
]
for rule in rules:
print(f" [{rule['priority']}] {rule['condition']} → {rule['action']}")
def metrics(self):
print(f"\n=== Metrics ===")
metrics = {
"Total certificates": random.randint(50, 200),
"Expiring < 30 days": random.randint(2, 10),
"Renewed today": random.randint(0, 5),
"Failed renewals": random.randint(0, 2),
"Average SSL Grade": "A",
"Auto-renewal success rate": f"{random.randint(95, 100)}%",
}
for m, v in metrics.items():
print(f" {m}: {v}")
mon = CertMonitoring()
mon.cert_dashboard()
mon.alert_rules()
mon.metrics()
การนำไปใช้งานจริงในองค์กร
สำหรับองค์กรขนาดกลางถึงใหญ่ แนะนำให้ใช้หลัก Three-Tier Architecture คือ Core Layer ที่เป็นแกนกลางของระบบ Distribution Layer ที่ทำหน้าที่กระจาย Traffic และ Access Layer ที่เชื่อมต่อกับผู้ใช้โดยตรง การแบ่ง Layer ชัดเจนช่วยให้การ Troubleshoot ง่ายขึ้นและสามารถ Scale ระบบได้ตามความต้องการ
เรื่อง Network Security ก็สำคัญไม่แพ้กัน ควรติดตั้ง Next-Generation Firewall ที่สามารถ Deep Packet Inspection ได้ ใช้ Network Segmentation แยก VLAN สำหรับแต่ละแผนก ติดตั้ง IDS/IPS เพื่อตรวจจับการโจมตี และทำ Regular Security Audit อย่างน้อยปีละ 2 ครั้ง
FAQ - คำถามที่พบบ่อย
Q: certbot กับ acme.sh อันไหนดี?
A: certbot: official Let's Encrypt client, Python, plugins เยอะ, community ใหญ่ acme.sh: pure shell script, lightweight, ไม่ต้อง dependencies, DNS plugins เยอะ ใช้ certbot: standard setup, มี Python ecosystem ใช้ acme.sh: lightweight, embedded systems, Docker, ไม่อยาก install Python
Q: Wildcard certificate ดีกว่า individual ไหม?
A: Wildcard (*.example.com): จัดการง่าย 1 cert ครอบคลุมทุก subdomain Individual: granular control, revoke ได้ทีละ domain ใช้ Wildcard: subdomains เยอะ, เปลี่ยนบ่อย ใช้ Individual: ต้องการ security isolation, compliance requirements Wildcard ต้องใช้ DNS-01 challenge (ไม่ใช้ HTTP-01 ได้)
Q: Certificate หมดอายุแล้วทำอย่างไร?
A: 1. Renew ทันที: certbot renew --force-renewal 2. Reload web server: nginx -s reload หรือ systemctl reload apache2 3. ตรวจสอบ: openssl s_client -connect domain:443 4. ป้องกัน: ตั้ง auto-renewal + monitoring alerts Certificate หมดอายุ = เว็บไม่ปลอดภัย browsers จะ block = เสียลูกค้า
Q: Rate limits ของ Let's Encrypt เท่าไหร่?
A: 50 certificates per domain per week 5 duplicate certificates per week 300 new orders per account per 3 hours 100 names (SANs) per certificate สำหรับ batch processing: วางแผน rate limiting, ใช้ staging endpoint ทดสอบก่อน
