ACME Protocol Batch Processing Pipeline

2025-10-30 · อ. บอม, SiamCafe.net · 1,166 words

What Is an ACME Protocol Batch Processing Pipeline?

ACME (Automatic Certificate Management Environment) is the standard protocol for automating the issuance, renewal, and management of SSL/TLS certificates, and Let's Encrypt is its best-known adopter. A batch processing pipeline runs certificate operations in sets, which suits organizations with many domains. Combining the two lets you manage certificates for hundreds or thousands of domains automatically, lowers the risk of expired certificates, and saves the operations team time.

ACME Protocol Basics

# acme_basics.py: ACME protocol fundamentals

class ACMEBasics:
    PROTOCOL = {
        "name": "ACME (RFC 8555)",
        "purpose": "Automate SSL/TLS certificate lifecycle",
        "providers": ["Let's Encrypt (free)", "ZeroSSL", "Buypass", "Google Trust Services"],
        "cert_types": ["DV (Domain Validation)", "wildcard (*.example.com)"],
        "validity": "90 days (Let's Encrypt) — renew every 60 days",
    }

    FLOW = {
        "step1": {"name": "Account Registration", "description": "Create an ACME account with the CA (key pair)"},
        "step2": {"name": "Order Certificate", "description": "Request a certificate for the domain(s)"},
        "step3": {"name": "Authorization (Challenge)", "description": "Prove control of the domain"},
        "step4": {"name": "Finalize Order", "description": "Submit the CSR (Certificate Signing Request)"},
        "step5": {"name": "Download Certificate", "description": "Download the certificate and chain"},
    }

    CHALLENGE_TYPES = {
        "http01": {
            "name": "HTTP-01",
            "method": "Serve a file at http://domain/.well-known/acme-challenge/TOKEN",
            "use": "Web servers reachable on port 80",
            "limitation": "No wildcard support",
        },
        "dns01": {
            "name": "DNS-01",
            "method": "Create a TXT record at _acme-challenge.domain",
            "use": "Wildcard certificates, internal domains",
            "limitation": "Requires DNS API access",
        },
        "tls_alpn01": {
            "name": "TLS-ALPN-01",
            "method": "Answer the challenge over a TLS connection on port 443",
            "use": "When port 80 is closed",
            "limitation": "No wildcard support",
        },
    }

    def show_protocol(self):
        print("=== ACME Protocol ===\n")
        for key, value in self.PROTOCOL.items():
            if isinstance(value, list):
                print(f"  {key}: {', '.join(value)}")
            else:
                print(f"  {key}: {value}")

    def show_flow(self):
        print(f"\n=== Certificate Flow ===")
        for key, step in self.FLOW.items():
            print(f"  [{step['name']}] {step['description']}")

    def show_challenges(self):
        print("\n=== Challenge Types ===")
        for challenge in self.CHALLENGE_TYPES.values():
            print(f"  [{challenge['name']}] {challenge['method']}")
            print(f"    Use: {challenge['use']}")
            print(f"    Limitation: {challenge['limitation']}")
            print()

acme = ACMEBasics()
acme.show_protocol()
acme.show_flow()
acme.show_challenges()
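
The HTTP-01 and DNS-01 entries above describe where the challenge response goes, but not what it contains. The sketch below follows RFC 8555 as I understand it: the key authorization is the token joined to the account key thumbprint, HTTP-01 serves that string as the file body, and DNS-01 publishes the base64url-encoded SHA-256 digest of it. The function names and the placeholder token and thumbprint are assumptions made for illustration; a real client receives the token from the CA and derives the thumbprint from its account key.

```python
import base64
import hashlib


def b64url(data: bytes) -> str:
    """Base64url without padding, the encoding ACME uses."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def key_authorization(token: str, account_thumbprint: str) -> str:
    """Token joined to the account key thumbprint with a dot."""
    return f"{token}.{account_thumbprint}"


def http01_response(domain: str, token: str, account_thumbprint: str) -> tuple[str, str]:
    """Return the URL the CA fetches and the body it expects there."""
    url = f"http://{domain}/.well-known/acme-challenge/{token}"
    return url, key_authorization(token, account_thumbprint)


def dns01_record(domain: str, token: str, account_thumbprint: str) -> tuple[str, str]:
    """Return the TXT record name and value for a DNS-01 challenge."""
    # A wildcard is validated on its base domain, so drop the leading "*."
    base = domain[2:] if domain.startswith("*.") else domain
    key_auth = key_authorization(token, account_thumbprint)
    digest = hashlib.sha256(key_auth.encode("ascii")).digest()
    return f"_acme-challenge.{base}", b64url(digest)


if __name__ == "__main__":
    # Placeholder values, not real ACME data
    token = "example-token"
    thumbprint = "example-thumbprint"
    print(http01_response("example.com", token, thumbprint))
    print(dns01_record("*.example.com", token, thumbprint))
```

In practice certbot, acme.sh, or lego compute these values for you; the sketch is only meant to make the two challenge formats concrete.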

Batch Certificate Manager

# batch_manager.py: batch ACME certificate management

class BatchCertManager:
    PYTHON_MANAGER = """
# cert_manager.py: batch certificate manager built on the certbot CLI
import json
import subprocess
from datetime import datetime, timedelta, timezone
from pathlib import Path

class CertManager:
    def __init__(self, email, cert_dir="/etc/letsencrypt", domains_file="domains.json"):
        self.email = email
        self.cert_dir = Path(cert_dir)
        self.domains_file = domains_file

    def load_domains(self):
        with open(self.domains_file) as f:
            return json.load(f)

    @staticmethod
    def lineage_name(domain):
        # Directory name under live/: a leading "*." is dropped for wildcards
        return domain[2:] if domain.startswith("*.") else domain

    def check_expiry(self, domain):
        cert_path = self.cert_dir / "live" / self.lineage_name(domain) / "cert.pem"
        if not cert_path.exists():
            return None

        result = subprocess.run(
            ["openssl", "x509", "-enddate", "-noout", "-in", str(cert_path)],
            capture_output=True, text=True
        )
        if result.returncode != 0:
            return None
        # Output looks like "notAfter=Jan  1 12:00:00 2026 GMT"
        date_str = result.stdout.strip().split("=", 1)[1]
        expiry = datetime.strptime(date_str, "%b %d %H:%M:%S %Y %Z")
        return expiry.replace(tzinfo=timezone.utc)

    def needs_renewal(self, domain, days_before=30):
        expiry = self.check_expiry(domain)
        if expiry is None:
            return True  # No readable cert = needs issuance
        # Compare in UTC; the certificate's notAfter is not local time
        return expiry - datetime.now(timezone.utc) < timedelta(days=days_before)

    def issue_cert(self, domain, sans=(), challenge="dns-01", dns_plugin="cloudflare"):
        cmd = [
            "certbot", "certonly",
            "--non-interactive",
            "--agree-tos",
            "--email", self.email,
            # Pin the lineage name so check_expiry looks in the right directory
            "--cert-name", self.lineage_name(domain),
        ]

        if challenge == "dns-01":
            cmd.append(f"--dns-{dns_plugin}")
            # The route53 plugin reads AWS credentials from the environment
            if dns_plugin != "route53":
                cmd.extend([
                    f"--dns-{dns_plugin}-credentials", f"/etc/letsencrypt/{dns_plugin}.ini",
                ])
        else:
            cmd.extend(["--webroot", "-w", "/var/www/html"])

        for name in [domain, *sans]:
            cmd.extend(["-d", name])

        result = subprocess.run(cmd, capture_output=True, text=True)
        return result.returncode == 0, result.stdout + result.stderr

    def batch_renew(self):
        results = {"renewed": [], "skipped": [], "failed": []}

        for entry in self.load_domains():
            domain = entry["domain"]
            if not self.needs_renewal(domain):
                results["skipped"].append(domain)
                continue
            # Honor the per-domain settings from domains.json
            success, output = self.issue_cert(
                domain,
                sans=entry.get("sans", []),
                challenge=entry.get("challenge", "dns-01"),
                dns_plugin=entry.get("dns_plugin", "cloudflare"),
            )
            if success:
                results["renewed"].append(domain)
            else:
                results["failed"].append({"domain": domain, "error": output[:200]})

        return results

if __name__ == "__main__":
    manager = CertManager("admin@example.com")
    results = manager.batch_renew()
    print(f"Renewed: {len(results['renewed'])}")
    print(f"Skipped: {len(results['skipped'])}")
    print(f"Failed: {len(results['failed'])}")
"""

    DOMAINS_CONFIG = """
# domains.json — Domain configuration
[
    {"domain": "example.com", "sans": ["www.example.com"], "challenge": "http-01"},
    {"domain": "*.api.example.com", "challenge": "dns-01", "dns_plugin": "cloudflare"},
    {"domain": "app.example.com", "sans": ["cdn.example.com"], "challenge": "http-01"},
    {"domain": "*.internal.example.com", "challenge": "dns-01", "dns_plugin": "route53"}
]
"""

    def show_manager(self):
        print("=== Batch Certificate Manager ===")
        print(self.PYTHON_MANAGER[:600])

    def show_config(self):
        print(f"\n=== Domains Config ===")
        print(self.DOMAINS_CONFIG[:400])

mgr = BatchCertManager()
mgr.show_manager()
mgr.show_config()
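
The check_expiry method above parses the notAfter line printed by openssl. As a hedged alternative, the sketch below does the same parsing with ssl.cert_time_to_seconds from the Python standard library, which reads the timestamp as UTC, and keeps the renewal decision as a pure function so it can be tested without touching any certificate files. The names parse_not_after and is_due are illustrative and not part of the manager above.

```python
import ssl
from datetime import datetime, timedelta, timezone


def parse_not_after(openssl_line: str) -> datetime:
    """Turn 'notAfter=Jan  5 09:34:43 2018 GMT' into an aware UTC datetime."""
    date_str = openssl_line.strip().split("=", 1)[1]
    seconds = ssl.cert_time_to_seconds(date_str)
    return datetime.fromtimestamp(seconds, tz=timezone.utc)


def is_due(expiry: datetime, now: datetime, days_before: int = 30) -> bool:
    """True when fewer than days_before days remain before expiry."""
    return expiry - now < timedelta(days=days_before)


if __name__ == "__main__":
    expiry = parse_not_after("notAfter=Jan  5 09:34:43 2018 GMT")
    now = datetime(2017, 12, 20, tzinfo=timezone.utc)
    print(expiry.isoformat(), is_due(expiry, now))
```

Passing the current time in as an argument, instead of calling datetime.now inside the function, is a deliberate choice here: it makes the threshold logic easy to unit test.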

Pipeline Architecture

# pipeline.py: ACME batch processing pipeline
import random

class ACMEPipeline:
    STAGES = {
        "inventory": {
            "name": "1. Inventory",
            "description": "Collect every domain whose certificates need to be managed",
            "sources": "DNS records, load balancers, CDN configs, domain registrar",
        },
        "check": {
            "name": "2. Check Expiry",
            "description": "Check the expiry date of every certificate",
            "threshold": "Renew when fewer than 30 days remain",
        },
        "issue_renew": {
            "name": "3. Issue/Renew",
            "description": "Request or renew certificates through the ACME protocol",
            "method": "certbot, acme.sh, lego",
        },
        "deploy": {
            "name": "4. Deploy",
            "description": "Deploy certificates to web servers, load balancers, and CDNs",
            "targets": "Nginx, Apache, AWS ALB/CloudFront, Kubernetes",
        },
        "verify": {
            "name": "5. Verify",
            "description": "Confirm that certificates work correctly on the endpoints",
            "checks": "SSL Labs grade, chain validation, OCSP stapling",
        },
        "report": {
            "name": "6. Report",
            "description": "Summarize the results and notify the team",
            "channels": "Slack, email, dashboard",
        },
    }

    def show_pipeline(self):
        print("=== Pipeline Stages ===\n")
        for key, stage in self.STAGES.items():
            print(f"[{stage['name']}]")
            print(f"  {stage['description']}")
            print()

    def simulate_batch(self):
        print("=== Batch Processing Results ===")
        total = random.randint(50, 200)
        renewed = random.randint(5, 20)
        failed = random.randint(0, 3)
        skipped = total - renewed - failed
        print(f"  Total domains: {total}")
        print(f"  Renewed: {renewed}")
        print(f"  Skipped (not due): {skipped}")
        print(f"  Failed: {failed}")
        print(f"  Next run: {random.randint(1, 24)} hours")

pipeline = ACMEPipeline()
pipeline.show_pipeline()
pipeline.simulate_batch()
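
The listing above only describes the stages and prints simulated numbers. One way the stages could actually be chained is sketched below: each stage is a function that reads a shared state dict and returns the keys it wants to add, and the runner stops at the first stage that raises. The run_pipeline function, the stage functions, and the sample inventory are all hypothetical; a real renew stage would call an ACME client.

```python
from typing import Callable

Stage = Callable[[dict], dict]


def run_pipeline(stages: list[tuple[str, Stage]], context: dict) -> dict:
    """Run stages in order; stop at the first failure and record where it happened."""
    state = dict(context)
    completed = []
    for name, stage in stages:
        try:
            state.update(stage(state))
        except Exception as exc:
            return {**state, "completed": completed, "failed_stage": name, "error": str(exc)}
        completed.append(name)
    return {**state, "completed": completed, "failed_stage": None, "error": None}


def inventory(state: dict) -> dict:
    # Hypothetical inventory: domain -> days until expiry
    return {"domains": {"example.com": 62, "app.example.com": 12, "cdn.example.com": 29}}


def check_expiry(state: dict) -> dict:
    threshold = state["threshold_days"]
    due = sorted(d for d, days in state["domains"].items() if days < threshold)
    return {"due": due}


def renew(state: dict) -> dict:
    # Placeholder: pretend every due domain renews successfully
    return {"renewed": list(state["due"])}


if __name__ == "__main__":
    stages = [("inventory", inventory), ("check", check_expiry), ("renew", renew)]
    result = run_pipeline(stages, {"threshold_days": 30})
    print(result["completed"], result["renewed"])
```

Stopping at the first failed stage mirrors the linear order of the six stages: deploying or verifying makes little sense if issuance itself did not finish.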

Automation with Cron & Airflow

# automation.py: automated certificate pipeline

class CertAutomation:
    CRON = """
# /etc/cron.d/cert-renewal: daily certificate check
# Run daily at 3 AM
0 3 * * * root /opt/cert-manager/batch_renew.sh >> /var/log/cert-renewal.log 2>&1

# --- /opt/cert-manager/batch_renew.sh (separate file) ---
#!/bin/bash
set -e

cd /opt/cert-manager

# Keep going on failure so the notification below is always sent.
# The --action and --config options are assumed; cert_manager.py must parse them.
status=0
python3 cert_manager.py --action batch-renew --config domains.json || status=$?

# Reload nginx so it picks up any renewed certificates
if [ -f /var/run/nginx.pid ]; then
    nginx -s reload
fi

# Send notification (notify.py is a script you provide)
python3 notify.py --channel slack --results /tmp/cert-results.json
exit "$status"
"""

    AIRFLOW_DAG = """
# dags/cert_pipeline.py — Airflow DAG
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'infra-team',
    'retries': 2,
    'retry_delay': timedelta(minutes=10),
}

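dag = DAG(
    'cert_batch_pipeline',
    default_args=default_args,
    # Airflow 2.4+ prefers schedule=...; schedule_interval was removed in Airflow 3
    schedule_interval='0 3 * * *',
    start_date=datetime(2025, 1, 1),
    catchup=False,
)

# The cert_manager, deployer, verifier, and reporter modules called below are
# placeholders for your own code; each must expose the function named here.
inventory = PythonOperator(
    task_id='inventory_domains',
    python_callable=lambda: __import__('cert_manager').inventory(),
    dag=dag,
)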

check = PythonOperator(
    task_id='check_expiry',
    python_callable=lambda: __import__('cert_manager').check_all(),
    dag=dag,
)

renew = PythonOperator(
    task_id='batch_renew',
    python_callable=lambda: __import__('cert_manager').batch_renew(),
    dag=dag,
)

deploy = PythonOperator(
    task_id='deploy_certs',
    python_callable=lambda: __import__('deployer').deploy_all(),
    dag=dag,
)

verify = PythonOperator(
    task_id='verify_ssl',
    python_callable=lambda: __import__('verifier').check_endpoints(),
    dag=dag,
)

report = PythonOperator(
    task_id='send_report',
    python_callable=lambda: __import__('reporter').send_report(),
    dag=dag,
)

inventory >> check >> renew >> deploy >> verify >> report
"""

    def show_cron(self):
        print("=== Cron Setup ===")
        print(self.CRON[:400])

    def show_airflow(self):
        print(f"\n=== Airflow DAG ===")
        print(self.AIRFLOW_DAG[:500])

auto = CertAutomation()
auto.show_cron()
auto.show_airflow()
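
The cron script hands /tmp/cert-results.json to notify.py, but nothing shown here writes that file. A minimal sketch of that missing step follows, assuming the results dict has the renewed, skipped, and failed keys returned by batch_renew. The write_results and exit_code helpers are hypothetical names. The file is written to a temporary path first and then renamed, so a reader never sees a half-written report.

```python
import json
import os
import tempfile


def write_results(results: dict, path: str) -> None:
    """Write results as JSON through a temp file followed by an atomic rename."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(results, f, indent=2)
        os.replace(tmp_path, path)
    except BaseException:
        # Clean up the temp file if writing or renaming failed
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise


def exit_code(results: dict) -> int:
    """Return 0 when nothing failed and 1 otherwise, so the scheduler sees failures."""
    return 1 if results.get("failed") else 0


if __name__ == "__main__":
    sample = {"renewed": ["app.example.com"], "skipped": ["example.com"], "failed": []}
    with tempfile.TemporaryDirectory() as tmp:
        target = os.path.join(tmp, "cert-results.json")
        write_results(sample, target)
        print(open(target, encoding="utf-8").read())
    print("exit code:", exit_code(sample))
```

A non-zero exit code is what lets cron wrappers or Airflow retries react to a failed renewal instead of treating every run as a success.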

Monitoring & Alerting

# monitoring.py: certificate monitoring
import random

class CertMonitoring:
    def cert_dashboard(self):
        print("=== Certificate Dashboard ===\n")
        certs = [
            {"domain": "example.com", "expiry_days": random.randint(30, 80), "grade": "A+"},
            {"domain": "*.api.example.com", "expiry_days": random.randint(5, 60), "grade": "A"},
            {"domain": "app.example.com", "expiry_days": random.randint(20, 70), "grade": "A+"},
            {"domain": "cdn.example.com", "expiry_days": random.randint(40, 85), "grade": "A"},
            {"domain": "internal.example.com", "expiry_days": random.randint(1, 30), "grade": "B"},
        ]
        for cert in sorted(certs, key=lambda x: x["expiry_days"]):
            status = "URGENT" if cert["expiry_days"] < 14 else "WARN" if cert["expiry_days"] < 30 else "OK"
            print(f"  [{status:>6}] {cert['domain']:<30} Expires: {cert['expiry_days']}d | Grade: {cert['grade']}")

    def alert_rules(self):
        print(f"\n=== Alert Rules ===")
        rules = [
            {"condition": "Expiry < 7 days", "action": "CRITICAL alert → PagerDuty + Slack", "priority": "P1"},
            {"condition": "Expiry < 14 days", "action": "WARNING alert → Slack", "priority": "P2"},
            {"condition": "Expiry < 30 days", "action": "INFO → auto-renew triggered", "priority": "P3"},
            {"condition": "Renewal failed", "action": "CRITICAL alert → PagerDuty", "priority": "P1"},
            {"condition": "SSL Grade < A", "action": "WARNING → review config", "priority": "P2"},
        ]
        for rule in rules:
            print(f"  [{rule['priority']}] {rule['condition']} → {rule['action']}")

    def metrics(self):
        print(f"\n=== Metrics ===")
        metrics = {
            "Total certificates": random.randint(50, 200),
            "Expiring < 30 days": random.randint(2, 10),
            "Renewed today": random.randint(0, 5),
            "Failed renewals": random.randint(0, 2),
            "Average SSL Grade": "A",
            "Auto-renewal success rate": f"{random.randint(95, 100)}%",
        }
        for m, v in metrics.items():
            print(f"  {m}: {v}")

mon = CertMonitoring()
mon.cert_dashboard()
mon.alert_rules()
mon.metrics()
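
The alert rules above are only printed as text. A small sketch of how they might be applied to a single certificate is shown below, using the same 7, 14, and 30 day thresholds and the same priorities. The classify function and its return shape are assumptions for illustration.

```python
def classify(days_left: int, renewal_failed: bool = False) -> tuple[str, str]:
    """Map a certificate's state to (priority, action) using the thresholds above."""
    if renewal_failed:
        return "P1", "CRITICAL alert: PagerDuty"
    if days_left < 7:
        return "P1", "CRITICAL alert: PagerDuty + Slack"
    if days_left < 14:
        return "P2", "WARNING alert: Slack"
    if days_left < 30:
        return "P3", "INFO: auto-renew triggered"
    return "OK", "no action"


if __name__ == "__main__":
    for days in (3, 10, 25, 60):
        print(days, classify(days))
    print("failed renewal:", classify(45, renewal_failed=True))
```

Checking the failed-renewal case first means a broken renewal pages someone even when the certificate still has weeks left.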

Real-World Use in an Organization

For medium to large organizations, a Three-Tier Architecture is recommended: a Core Layer that forms the backbone of the system, a Distribution Layer that distributes traffic, and an Access Layer that connects directly to users. Clear layer boundaries make troubleshooting easier and let the system scale as needed.

Network security matters just as much. Deploy a Next-Generation Firewall capable of Deep Packet Inspection, use network segmentation with a separate VLAN for each department, install IDS/IPS to detect attacks, and run a security audit at least twice a year.

FAQ - Frequently Asked Questions

Q: Which is better, certbot or acme.sh?

A: certbot is the EFF's ACME client and the one Let's Encrypt recommends; it is written in Python and has many plugins and a large community. acme.sh is a pure shell script: lightweight, with minimal dependencies and many DNS plugins. Use certbot for a standard setup where the Python ecosystem is already available. Use acme.sh for lightweight environments, embedded systems, Docker, or when you do not want to install Python.

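Q: Is a wildcard certificate better than individual certificates?

A: A wildcard certificate (*.example.com) is easy to manage: one certificate covers every subdomain one level below example.com. Individual certificates give granular control and can be revoked one domain at a time. Use a wildcard when you have many subdomains or they change often. Use individual certificates when you need security isolation or must meet compliance requirements. A wildcard requires the DNS-01 challenge; HTTP-01 cannot be used.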
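
When deciding between a wildcard and individual certificates, it helps to remember that the asterisk stands for exactly one DNS label, so *.example.com matches neither example.com itself nor a.b.example.com. The helper below is an illustrative sketch of that rule, useful for checking an inventory against a planned wildcard; wildcard_covers is a hypothetical name.

```python
def wildcard_covers(pattern: str, hostname: str) -> bool:
    """True if a certificate name (possibly a wildcard) covers the hostname."""
    pattern = pattern.lower().rstrip(".")
    hostname = hostname.lower().rstrip(".")
    if not pattern.startswith("*."):
        return pattern == hostname
    suffix = pattern[1:]  # for example ".example.com"
    if not hostname.endswith(suffix):
        return False
    label = hostname[: -len(suffix)]
    # The asterisk must match exactly one non-empty label
    return bool(label) and "." not in label


if __name__ == "__main__":
    for host in ("www.example.com", "a.b.example.com", "example.com"):
        print(host, wildcard_covers("*.example.com", host))
```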

Q: What should I do when a certificate has already expired?

A:
1. Renew immediately: certbot renew --force-renewal
2. Reload the web server: nginx -s reload or systemctl reload apache2
3. Verify: openssl s_client -connect domain:443
4. Prevent a repeat: set up auto-renewal and monitoring alerts
An expired certificate makes the site untrusted, browsers block access, and you lose customers.

Q: What are the Let's Encrypt rate limits?

A:
50 certificates per registered domain per week
5 duplicate certificates (same set of names) per week
300 new orders per account per 3 hours
100 names (SANs) per certificate
For batch processing, build rate limiting into the plan and test against the staging endpoint first.
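
To make the rate-limit planning concrete, here is a minimal sketch of two client-side guards: splitting a long name list so that no certificate carries more than 100 names, and a sliding-window counter that refuses to start more than 300 orders in any 3-hour span. The sliding window is a conservative approximation chosen for simplicity, and the CA's own accounting may work differently. The chunk_names function and OrderLimiter class are hypothetical names.

```python
from collections import deque


def chunk_names(names: list[str], max_names: int = 100) -> list[list[str]]:
    """Split hostnames into groups small enough for one certificate each."""
    return [names[i:i + max_names] for i in range(0, len(names), max_names)]


class OrderLimiter:
    """Allow at most max_orders within any window_seconds span."""

    def __init__(self, max_orders: int = 300, window_seconds: float = 3 * 3600):
        self.max_orders = max_orders
        self.window_seconds = window_seconds
        self._sent = deque()  # timestamps of orders already placed

    def try_acquire(self, now: float) -> bool:
        """Record an order at time `now` if the window still has room."""
        # Forget orders that have aged out of the window
        while self._sent and now - self._sent[0] >= self.window_seconds:
            self._sent.popleft()
        if len(self._sent) >= self.max_orders:
            return False
        self._sent.append(now)
        return True


if __name__ == "__main__":
    names = [f"host{i}.example.com" for i in range(250)]
    print([len(group) for group in chunk_names(names)])

    limiter = OrderLimiter(max_orders=2, window_seconds=10)
    print([limiter.try_acquire(t) for t in (0, 1, 2, 10)])
```

The limiter takes the current time as an argument so it can be driven by time.monotonic() in production and by fixed numbers in tests.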
