SiamCafe · Blog
Certificate Manager CQRS Event Sourcing — จัดการ
บทความ

Certificate Manager CQRS Event Sourcing — จัดการ

เผยแพร่ 28 พฤษภาคม 2569

Certificate Manager CQRS Event Sourcing

Certificate Manager CQRS Event Sourcing — จัดการ

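cert-manager จัดการ SSL/TLS Certificate บน Kubernetes ผ่าน Let's Encrypt (ACME) ส่วน CQRS (Command Query Responsibility Segregation) แยกฝั่ง Command ออกจากฝั่ง Query และ Event Sourcing เก็บ Events เพื่อใช้เป็น Audit Trail และ Replay ได้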

| Pattern | หลักการ | ใช้เมื่อ |
|---|---|---|
| CQRS | แยก Read/Write Model | Read/Write Load ต่างกันมาก |
| Event Sourcing | เก็บ Events แทน State | ต้องการ Audit Trail ครบ |
| cert-manager | Auto SSL Certificate | Kubernetes HTTPS |
| ACME | Automated Certificate | Let's Encrypt Cert |

cert-manager Setup

# === cert-manager Installation & Config ===

# Install cert-manager
# helm repo add jetstack https://charts.jetstack.io
# helm repo update
# helm install cert-manager jetstack/cert-manager \
#   --namespace cert-manager --create-namespace \
#   --set crds.enabled=true   # cert-manager >= v1.15; older charts use installCRDs=true

# ClusterIssuer — Let's Encrypt Production
# apiVersion: cert-manager.io/v1
# kind: ClusterIssuer
# metadata:
#   name: letsencrypt-prod
# spec:
#   acme:
#     server: https://acme-v02.api.letsencrypt.org/directory
#     email: admin@example.com
#     privateKeySecretRef:
#       name: letsencrypt-prod-key
#     solvers:
#     - http01:
#         ingress:
#           class: nginx
#     - dns01:
#         cloudflare:
#           email: admin@example.com
#           apiTokenSecretRef:
#             name: cloudflare-api-token
#             key: api-token
#       selector:
#         dnsZones:
#         - "example.com"

# Certificate Resource
# apiVersion: cert-manager.io/v1
# kind: Certificate
# metadata:
#   name: myapp-tls
#   namespace: production
# spec:
#   secretName: myapp-tls-secret
#   issuerRef:
#     name: letsencrypt-prod
#     kind: ClusterIssuer
#   commonName: myapp.example.com
#   dnsNames:
#   - myapp.example.com
#   - www.myapp.example.com
#   - api.myapp.example.com
#   renewBefore: 720h  # 30 days before expiry

# Ingress with TLS
# apiVersion: networking.k8s.io/v1
# kind: Ingress
# metadata:
#   name: myapp-ingress
#   annotations:
#     cert-manager.io/cluster-issuer: letsencrypt-prod
# spec:
#   tls:
#   - hosts:
#     - myapp.example.com
#     secretName: myapp-tls-secret
#   rules:
#   - host: myapp.example.com
#     http:
#       paths:
#       - path: /
#         pathType: Prefix
#         backend:
#           service:
#             name: myapp
#             port:
#               number: 80

# kubectl get certificates -A
# kubectl describe certificate myapp-tls
# kubectl get certificaterequests -A

from dataclasses import dataclass
from typing import List

@dataclass
class CertConfig:
    """One row of the certificate status listing printed below."""

    # Fields are positional in the generated __init__, so their order matters.
    domain: str  # domain name of the entry, e.g. "myapp.example.com" or "*.example.com"
    issuer: str  # issuer name, e.g. "letsencrypt-prod"
    challenge: str  # validation method label, e.g. "HTTP-01", "DNS-01"
    renew_before_days: int  # printed as "Renew: <n>d"
    status: str  # status label, e.g. "Ready"

# Sample inventory: one entry per certificate, built with explicit field names.
certs = [
    CertConfig(
        domain="myapp.example.com",
        issuer="letsencrypt-prod",
        challenge="HTTP-01",
        renew_before_days=30,
        status="Ready",
    ),
    CertConfig(
        domain="*.example.com",
        issuer="letsencrypt-prod",
        challenge="DNS-01",
        renew_before_days=30,
        status="Ready",
    ),
    CertConfig(
        domain="api.example.com",
        issuer="vault-issuer",
        challenge="Internal",
        renew_before_days=15,
        status="Ready",
    ),
    CertConfig(
        domain="internal.corp",
        issuer="self-signed",
        challenge="None",
        renew_before_days=365,
        status="Ready",
    ),
]

# Print a header followed by one pipe-separated summary line per certificate.
print("Certificate Status:")
for c in certs:
    summary = f"  {c.domain} | {c.issuer} | {c.challenge} | Renew: {c.renew_before_days}d | {c.status}"
    print(summary)

CQRS Pattern

Certificate Manager CQRS Event Sourcing — จัดการ
# cqrs.py — CQRS Pattern Implementation
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from datetime import datetime
from abc import ABC, abstractmethod

# Commands (Write Side)
def _timestamp_now() -> str:
    """Return the current local time formatted with ``datetime.isoformat()``."""
    return datetime.now().isoformat()


@dataclass
class Command(ABC):
    """Base class for write-side commands.

    Every command carries a ``timestamp`` string; when the caller does not
    supply one it is filled in at construction time.
    """

    timestamp: str = field(default_factory=_timestamp_now)

@dataclass
class RequestCertificate(Command):
    """Command asking for a certificate to be issued for a domain."""

    domain: str = ""  # domain name the certificate is requested for
    issuer: str = "letsencrypt"  # issuer name copied into the resulting event
    requested_by: str = ""  # requester label copied into the resulting event

@dataclass
class RenewCertificate(Command):
    """Command asking for the certificate of a domain to be renewed."""

    domain: str = ""  # domain name whose certificate is renewed
    reason: str = ""  # free-text reason copied into the resulting event

@dataclass
class RevokeCertificate(Command):
    """Command asking for the certificate of a domain to be revoked."""

    domain: str = ""  # domain name whose certificate is revoked
    reason: str = ""  # free-text reason copied into the resulting event
    revoked_by: str = ""  # label of who revoked it, copied into the resulting event

# Events (Event Sourcing)
@dataclass
class Event:
    """Single record stored in the event log."""

    event_type: str  # e.g. "CertificateIssued"; used for filtering and projections
    timestamp: str  # string timestamp; the handlers in this file pass datetime.isoformat() output
    data: Dict  # event payload; the handlers in this file always include a "domain" key

class EventStore:
    """In-memory, append-only log of events.

    Events are kept in the order in which they were appended.
    """

    def __init__(self):
        self.events: List[Event] = []

    def append(self, event: Event):
        """Add ``event`` to the end of the log."""
        self.events.append(event)

    def get_events(self, filter_type: Optional[str] = None) -> List[Event]:
        """Return events in append order, optionally restricted to one type.

        Args:
            filter_type: When truthy, only events whose ``event_type`` equals
                this value are returned. ``None`` (or an empty string)
                returns every event.

        Returns:
            A new list on every call; mutating it does not affect the store.
        """
        if filter_type:
            return [e for e in self.events if e.event_type == filter_type]
        # Return a copy so callers cannot alter the append-only log through
        # the result (the filtered branch already returns a fresh list).
        return list(self.events)

# Command Handler (Write Side)
class CertCommandHandler:
    """Write side: turns certificate commands into events in the store."""

    def __init__(self, store: EventStore):
        self.store = store

    def _emit(self, event_type: str, timestamp: str, payload: Dict) -> None:
        """Build an event from the given parts and append it to the store."""
        self.store.append(Event(event_type, timestamp, payload))

    def handle_request(self, cmd: RequestCertificate):
        """Record the request, then immediately record a simulated issuance."""
        self._emit(
            "CertificateRequested",
            cmd.timestamp,
            {
                "domain": cmd.domain,
                "issuer": cmd.issuer,
                "requested_by": cmd.requested_by,
            },
        )

        # Simulated ACME challenge: the issuance is stamped with the current
        # time and carries fixed placeholder serial/expiry values.
        self._emit(
            "CertificateIssued",
            datetime.now().isoformat(),
            {
                "domain": cmd.domain,
                "serial": "ABC123",
                "expires": "2024-04-15",
            },
        )

    def handle_renew(self, cmd: RenewCertificate):
        """Record a renewal event carrying the command's domain and reason."""
        self._emit(
            "CertificateRenewed",
            cmd.timestamp,
            {"domain": cmd.domain, "reason": cmd.reason},
        )

    def handle_revoke(self, cmd: RevokeCertificate):
        """Record a revocation event with its reason and who revoked it."""
        self._emit(
            "CertificateRevoked",
            cmd.timestamp,
            {
                "domain": cmd.domain,
                "reason": cmd.reason,
                "revoked_by": cmd.revoked_by,
            },
        )

# Query Handler (Read Side)
class CertQueryHandler:
    """Read side: answers queries by scanning the events in the store."""

    def __init__(self, store: EventStore):
        self.store = store

    def get_active_certs(self) -> List[Dict]:
        """Return the certificates that are currently active.

        Events are replayed in order and keyed by ``data["domain"]`` (events
        without a domain share the key ``""``):

        * ``CertificateIssued`` (re)creates the entry as active, copying the
          event payload.
        * ``CertificateRevoked`` marks an existing entry as revoked; it is
          ignored for domains that were never issued.

        Other event types do not affect the result.

        Returns:
            One dict per active domain: the issuing event's payload plus
            ``"status": "active"``.
        """
        projection: Dict[str, Dict] = {}
        for event in self.store.events:
            domain = event.data.get("domain", "")
            if event.event_type == "CertificateIssued":
                # The derived status is written last so that a "status" key
                # inside the event payload cannot override it.
                projection[domain] = {**event.data, "status": "active"}
            elif event.event_type == "CertificateRevoked" and domain in projection:
                projection[domain]["status"] = "revoked"
        return [entry for entry in projection.values() if entry["status"] == "active"]

    def get_audit_trail(self, domain: str) -> List[Event]:
        """Return every event whose payload ``"domain"`` equals ``domain``.

        Events are returned in store order; events without a ``"domain"``
        key never match.
        """
        return [e for e in self.store.events if e.data.get("domain") == domain]

# Demo
# Both handlers share one in-memory event store.
store = EventStore()
cmd_handler = CertCommandHandler(store)
query_handler = CertQueryHandler(store)

# Write side: two certificate requests followed by one renewal. Each command
# is created right before it is handled so its timestamp reflects that moment.
myapp_request = RequestCertificate(
    domain="myapp.example.com",
    issuer="letsencrypt",
    requested_by="DevOps",
)
cmd_handler.handle_request(myapp_request)

api_request = RequestCertificate(
    domain="api.example.com",
    issuer="vault",
    requested_by="Backend",
)
cmd_handler.handle_request(api_request)

myapp_renewal = RenewCertificate(
    domain="myapp.example.com",
    reason="Auto-renewal 30 days before expiry",
)
cmd_handler.handle_renew(myapp_renewal)

# Read side: list the active certificates, then one domain's history.
print("=== CQRS Demo ===")
print("\nActive Certificates:")
for cert in query_handler.get_active_certs():
    domain_name = cert['domain']
    status = cert['status']
    print(f"  {domain_name} | Status: {status}")

print("\nAudit Trail (myapp.example.com):")
for event in query_handler.get_audit_trail("myapp.example.com"):
    # Keep only "YYYY-MM-DDTHH:MM:SS" from the ISO timestamp.
    stamp = event.timestamp[:19]
    print(f"  [{stamp}] {event.event_type}: {event.data}")

Event Store

# event_store.py — Persistent Event Store
# PostgreSQL Event Store Schema
# CREATE TABLE events (
#   id BIGSERIAL PRIMARY KEY,
#   stream_id VARCHAR(255) NOT NULL,
#   event_type VARCHAR(100) NOT NULL,
#   data JSONB NOT NULL,
#   metadata JSONB DEFAULT '{}',
#   version INT NOT NULL,
#   created_at TIMESTAMP DEFAULT NOW(),
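#   -- PostgreSQL has no inline INDEX clause in CREATE TABLE; the UNIQUE
#   -- constraint below already creates an index on (stream_id, version).
#   UNIQUE (stream_id, version)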
# );
#
# CREATE TABLE snapshots (
#   stream_id VARCHAR(255) PRIMARY KEY,
#   data JSONB NOT NULL,
#   version INT NOT NULL,
#   created_at TIMESTAMP DEFAULT NOW()
# );

# Event Store with Snapshots
event_store_features = {
    # feature name -> short description (shown to the reader in Thai)
    "Append Only": "Events เพิ่มเท่านั้น ไม่แก้ไข ไม่ลบ",
    "Stream": "จัดกลุ่ม Events ตาม Aggregate (เช่น Domain)",
    "Versioning": "ทุก Event มี Version ป้องกัน Concurrency",
    "Snapshot": "บันทึก State ทุก N Events ลดเวลา Replay",
    "Projection": "สร้าง Read Model จาก Events",
    "Replay": "สร้าง State ใหม่จาก Events ได้ตลอดเวลา",
}

# Header, then one "[feature]: description" line per entry.
print("Event Store Features:")
feature_lines = [
    f"  [{name}]: {text}" for name, text in event_store_features.items()
]
print("\n".join(feature_lines))

# Tools
tools = {
    # tool name -> one-line description
    "EventStoreDB": "Open Source Event Store Database",
    "Apache Kafka": "Event Streaming Platform",
    "Marten": ".NET Event Sourcing Library",
    "Axon Framework": "Java CQRS/ES Framework",
    "PostgreSQL + JSONB": "DIY Event Store",
}

# Two blank lines separate this section from the previous output.
print("\n\nEvent Sourcing Tools:")
tool_lines = [f"  {name}: {summary}" for name, summary in tools.items()]
print("\n".join(tool_lines))

# When to Use
use_cases = {
    # category heading -> bullet points printed under it
    "ใช้ CQRS": [
        "Read/Write Load ต่างกันมาก",
        "ต้องการ Read Model หลายแบบ",
        "Complex Domain Logic",
    ],
    "ใช้ Event Sourcing": [
        "ต้องการ Audit Trail ครบ",
        "ต้อง Replay/Debug ได้",
        "Temporal Query (ดูข้อมูล ณ เวลาใดก็ได้)",
    ],
    "ไม่ควรใช้": [
        "CRUD ง่ายๆ",
        "ทีมเล็ก ไม่คุ้น Pattern",
        "ไม่ต้องการ Audit Trail",
    ],
}

# Each category gets a bracketed heading preceded by a blank line, followed
# by its items as an indented dash list.
print("\n\nWhen to Use:")
for category, items in use_cases.items():
    heading = f"\n  [{category}]"
    bullets = "\n".join(f"    - {item}" for item in items)
    print(heading)
    print(bullets)

เคล็ดลับ

  • cert-manager: ใช้ DNS-01 Challenge สำหรับ Wildcard Certificate
  • CQRS: เริ่มจากแยก Read/Write Model ไม่ต้องแยก Database ทันที
  • Event Sourcing: ใช้ Snapshot ทุก 100 Events ลดเวลา Replay
  • Audit: Event Store เป็น Audit Trail อัตโนมัติ
  • Renewal: ตั้ง renewBefore 30 วัน ให้เวลาแก้ไขถ้ามีปัญหา

Certificate Manager คืออะไร

cert-manager คือเครื่องมือจัดการ SSL/TLS Certificate บน Kubernetes แบบอัตโนมัติ รองรับ Let's Encrypt (ACME) และ Vault ผ่าน Issuer/ClusterIssuer พร้อมต่ออายุอัตโนมัติ