Certificate Manager Log Management ELK

Published May 28, 2026

What is Certificate Manager Log Management ELK?

Certificate Manager is a system that manages SSL/TLS certificates across their whole lifecycle: issuance, installation, renewal, and revocation. The ELK Stack (Elasticsearch, Logstash, Kibana) is an open-source log management platform for collecting, analyzing, and visualizing logs from every system. Combining a certificate manager with ELK lets you monitor certificate events in real time, detect certificates that are close to expiry, analyze security incidents caused by certificate errors, and generate compliance reports automatically.

ELK Stack Architecture

# elk_arch.py — ELK Stack architecture for cert management
import json

class ELKArchitecture:
    COMPONENTS = {
        "elasticsearch": {
            "name": "Elasticsearch",
            "role": "Search & Analytics Engine: stores and searches logs",
            "description": "Distributed search engine built on Apache Lucene: indexes, searches, and aggregates logs",
            "port": 9200,
        },
        "logstash": {
            "name": "Logstash",
            "role": "Data Pipeline: receives, transforms, and forwards logs",
            "description": "Input → Filter → Output pipeline that processes logs before they reach Elasticsearch",
            "port": 5044,
        },
        "kibana": {
            "name": "Kibana",
            "role": "Visualization: dashboards and analytics",
            "description": "Web UI for visualizing Elasticsearch data: charts, maps, alerts",
            "port": 5601,
        },
        "filebeat": {
            "name": "Filebeat",
            "role": "Log Shipper: sends log files to Logstash/Elasticsearch",
            "description": "Lightweight agent installed on servers: reads log files and forwards them",
        },
    }

    DOCKER_COMPOSE = """
# docker-compose.yml — ELK Stack
version: '3.8'
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.12.0
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
      - "ES_JAVA_OPTS=-Xms1g -Xmx1g"
    ports:
      - "9200:9200"
    volumes:
      - esdata:/usr/share/elasticsearch/data

  logstash:
    image: docker.elastic.co/logstash/logstash:8.12.0
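    ports:
      - "5044:5044"   # Beats input
      - "8080:8080"   # HTTP webhook input defined in logstash.conf
    volumes:
      - ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf
      - /var/log/letsencrypt:/var/log/letsencrypt:ro   # certbot logs read by the file input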
    depends_on:
      - elasticsearch

  kibana:
    image: docker.elastic.co/kibana/kibana:8.12.0
    ports:
      - "5601:5601"
    depends_on:
      - elasticsearch

volumes:
  esdata:
"""

    def show_components(self):
        print("=== ELK Stack Components ===\n")
        for key, comp in self.COMPONENTS.items():
            print(f"[{comp['name']}] — {comp['role']}")
            print(f"  {comp['description']}")
            print()

    def show_docker(self):
        print("=== Docker Compose ===")
        print(self.DOCKER_COMPOSE[:500])

elk = ELKArchitecture()
elk.show_components()
elk.show_docker()

Certificate Log Pipeline

# cert_pipeline.py — Logstash pipeline for certificate logs
import json

class CertLogPipeline:
    LOGSTASH_CONFIG = """
# logstash.conf — Certificate log processing pipeline
input {
  # Filebeat input for cert logs
  beats {
    port => 5044
    tags => ["certificate"]
  }
  
  # HTTP input for cert manager webhooks
  http {
    port => 8080
    codec => json
    tags => ["cert-webhook"]
  }
  
  # File input for certbot logs
  file {
    path => "/var/log/letsencrypt/letsencrypt.log"
    start_position => "beginning"
    tags => ["certbot"]
  }
}

filter {
  # Parse certificate events
  if "certificate" in [tags] {
    grok {
      match => {
        "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:event_message}"
      }
    }
    
    # Extract certificate details
    if "certificate" in [event_message] {
      grok {
        match => {
          "event_message" => "domain=%{HOSTNAME:domain} serial=%{DATA:serial} expires=%{TIMESTAMP_ISO8601:expiry_date}"
        }
        tag_on_failure => ["_cert_parse_failed"]
      }
    }
    
    # Calculate days until expiry
    ruby {
      # Time.parse lives in Ruby's "time" stdlib; load it explicitly at startup
      init => 'require "time"'
      code => '
        if event.get("expiry_date")
          expiry = Time.parse(event.get("expiry_date"))
          days_left = ((expiry - Time.now) / 86400).to_i
          event.set("days_until_expiry", days_left)
          
          if days_left <= 7
            event.set("cert_status", "critical")
          elsif days_left <= 30
            event.set("cert_status", "warning")
          else
            event.set("cert_status", "ok")
          end
        end
      '
    }
  }
  
  # Parse certbot logs
  if "certbot" in [tags] {
    # Assumes certbot's default file log format: <timestamp>:<LEVEL>:<logger>:<message>
    grok {
      match => {
        "message" => "%{TIMESTAMP_ISO8601:timestamp}:%{LOGLEVEL:level}:(?<logger>[^:]+):%{GREEDYDATA:certbot_message}"
      }
    }
    
    if "renewed" in [certbot_message] {
      mutate { add_field => { "cert_event" => "renewal_success" } }
    }
    if "failed" in [certbot_message] {
      mutate { add_field => { "cert_event" => "renewal_failed" } }
    }
  }
  
  # Add metadata
  mutate {
    add_field => { "[@metadata][index]" => "certificates-%{+YYYY.MM}" }
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "%{[@metadata][index]}"
  }
  
  # Alert on critical certificates
  if [cert_status] == "critical" {
    http {
      url => "https://hooks.slack.com/services/xxx"
      http_method => "post"
      format => "json"
      mapping => {
        "text" => "CERT CRITICAL: %{domain} expires in %{days_until_expiry} days!"
      }
    }
  }
}
"""

    def show_pipeline(self):
        print("=== Logstash Pipeline ===")
        print(self.LOGSTASH_CONFIG[:600])

pipeline = CertLogPipeline()
pipeline.show_pipeline()
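
To sanity-check the filter thresholds without starting Logstash, the parsing and classification steps can be mirrored in a few lines of Python. This is a minimal sketch and not part of the pipeline itself: the regular expression only approximates the second grok pattern, and `CERT_EVENT`, `classify`, and `parse_cert_event` are hypothetical names introduced here for illustration. The sample log line is made up as well.

```python
import re
from datetime import datetime, timezone

# Approximates the grok pattern: domain=<host> serial=<id> expires=<ISO 8601>
CERT_EVENT = re.compile(
    r"domain=(?P<domain>[A-Za-z0-9.-]+) "
    r"serial=(?P<serial>\S+) "
    r"expires=(?P<expiry_date>\S+)"
)


def classify(days_left):
    """Same thresholds as the ruby filter: <= 7 critical, <= 30 warning."""
    if days_left <= 7:
        return "critical"
    if days_left <= 30:
        return "warning"
    return "ok"


def parse_cert_event(message, now=None):
    """Return the fields the filter would add, or None when nothing matches."""
    match = CERT_EVENT.search(message)
    if match is None:
        return None
    # fromisoformat() on older Pythons rejects a trailing "Z", so normalize it
    expiry = datetime.fromisoformat(match["expiry_date"].replace("Z", "+00:00"))
    if expiry.tzinfo is None:
        expiry = expiry.replace(tzinfo=timezone.utc)  # assumption: naive means UTC
    now = now or datetime.now(timezone.utc)
    # int() truncates toward zero, like Ruby's to_i
    days_left = int((expiry - now).total_seconds() / 86400)
    return {
        "domain": match["domain"],
        "serial": match["serial"],
        "days_until_expiry": days_left,
        "cert_status": classify(days_left),
    }


# Made-up log line and a fixed "now" so the result is reproducible
sample = "certificate issued domain=example.com serial=0A1B expires=2026-06-05T00:00:00Z"
event = parse_cert_event(sample, now=datetime(2026, 6, 1, tzinfo=timezone.utc))
print(event["days_until_expiry"], event["cert_status"])  # 4 critical
```

Passing `now` explicitly keeps the check deterministic, which makes it easy to confirm the boundary cases (exactly 7 and exactly 30 days) before changing the thresholds in the Logstash config.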

Python Certificate Monitor

# cert_monitor.py — Python certificate monitoring with ELK
import json

class CertMonitor:
    CODE = """
# cert_elk_monitor.py — Monitor certificates and send to ELK
import ssl
import socket
from datetime import datetime
from elasticsearch import Elasticsearch

class CertificateELKMonitor:
    def __init__(self, es_host="http://localhost:9200"):
        # elasticsearch-py 8.x needs the scheme (http:// or https://) in the host URL
        self.es = Elasticsearch([es_host])
        self.index = f"certificates-{datetime.utcnow().strftime('%Y.%m')}"
    
    def check_certificate(self, domain, port=443):
        '''Check SSL certificate and return details'''
        try:
            context = ssl.create_default_context()
            with socket.create_connection((domain, port), timeout=10) as sock:
                with context.wrap_socket(sock, server_hostname=domain) as ssock:
                    cert = ssock.getpeercert()
                    
                    expire_str = cert['notAfter']
                    expire_date = datetime.strptime(expire_str, '%b %d %H:%M:%S %Y %Z')
                    days_left = (expire_date - datetime.utcnow()).days
                    
                    issuer = dict(x[0] for x in cert['issuer'])
                    subject = dict(x[0] for x in cert['subject'])
                    san = [entry[1] for entry in cert.get('subjectAltName', [])]
                    
                    status = 'ok' if days_left > 30 else 'warning' if days_left > 7 else 'critical'
                    
                    return {
                        'domain': domain,
                        'issuer': issuer.get('organizationName', ''),
                        'subject_cn': subject.get('commonName', ''),
                        'san': san,
                        'not_after': expire_date.isoformat(),
                        'days_until_expiry': days_left,
                        'cert_status': status,
                        'serial_number': cert.get('serialNumber', ''),
                        'version': cert.get('version', ''),
                        'protocol': ssock.version(),
                        'cipher': ssock.cipher()[0],
                        'checked_at': datetime.utcnow().isoformat(),
                    }
        except Exception as e:
            return {
                'domain': domain,
                'cert_status': 'error',
                'error': str(e),
                'checked_at': datetime.utcnow().isoformat(),
            }
    
    def send_to_elk(self, cert_data):
        '''Send certificate data to Elasticsearch'''
        self.es.index(index=self.index, document=cert_data)
    
    def monitor_domains(self, domains):
        '''Monitor multiple domains'''
        results = []
        for domain in domains:
            data = self.check_certificate(domain)
            self.send_to_elk(data)
            results.append(data)
        return results
    
    def get_expiring_certs(self, days=30):
        '''Query ELK for expiring certificates'''
        # elasticsearch-py 8.x accepts the parts of the search as keyword arguments
        result = self.es.search(
            index="certificates-*",
            query={"range": {"days_until_expiry": {"lte": days}}},
            sort=[{"days_until_expiry": "asc"}],
        )
        return [hit['_source'] for hit in result['hits']['hits']]

# monitor = CertificateELKMonitor("http://localhost:9200")
# domains = ["example.com", "api.example.com", "cdn.example.com"]
# results = monitor.monitor_domains(domains)
"""

    def show_code(self):
        print("=== Certificate ELK Monitor ===")
        print(self.CODE[:600])

monitor = CertMonitor()
monitor.show_code()
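
The expiry arithmetic in `check_certificate` relies on `datetime.utcnow()`, which is deprecated as of Python 3.12. One possible alternative, sketched here with timezone-aware datetimes and the standard library's `ssl.cert_time_to_seconds`, is shown below. The helper names `days_until_expiry` and `status_for` are hypothetical, and the `notAfter` value is a made-up sample in the format that `getpeercert()` returns.

```python
import ssl
from datetime import datetime, timezone


def days_until_expiry(not_after, now=None):
    """Whole days from `now` until a certificate's notAfter timestamp."""
    # cert_time_to_seconds() parses the "Jun 11 00:00:00 2026 GMT" format
    # and yields seconds since the epoch (UTC).
    expiry = datetime.fromtimestamp(ssl.cert_time_to_seconds(not_after), tz=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return (expiry - now).days


def status_for(days_left):
    """Same thresholds as check_certificate: > 30 ok, > 7 warning, else critical."""
    return "ok" if days_left > 30 else "warning" if days_left > 7 else "critical"


# Fixed "now" so the example is reproducible
fixed_now = datetime(2026, 6, 1, tzinfo=timezone.utc)
left = days_until_expiry("Jun 11 00:00:00 2026 GMT", now=fixed_now)
print(left, status_for(left))  # 10 warning
```

Keeping the date math in a small pure function also makes it testable without opening a socket.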

Kibana Dashboard

# kibana_dashboard.py — Kibana dashboard for certificate monitoring
import json
import random

class KibanaDashboard:
    VISUALIZATIONS = {
        "cert_status_pie": {
            "name": "Certificate Status Distribution",
            "type": "Pie Chart",
            "query": "agg: terms on cert_status field",
            "description": "Shows the share of OK / Warning / Critical / Error certificates",
        },
        "expiry_timeline": {
            "name": "Certificate Expiry Timeline",
            "type": "Timeline / Bar Chart",
            "query": "date_histogram on not_after field",
            "description": "Shows when certificates will expire, to help plan renewals",
        },
        "renewal_events": {
            "name": "Renewal Events Over Time",
            "type": "Line Chart",
            "query": "date_histogram on checked_at, split by cert_event",
            "description": "Shows renewal success/failure trends",
        },
        "issuer_breakdown": {
            "name": "Certificates by Issuer",
            "type": "Bar Chart",
            "query": "terms aggregation on issuer field",
            "description": "Shows the number of certs per CA (Let's Encrypt, DigiCert, etc.)",
        },
        "domain_table": {
            "name": "Domain Certificate Details",
            "type": "Data Table",
            "query": "top_hits, sorted by days_until_expiry ASC",
            "description": "Sortable table of domains with expiry date and status",
        },
    }

    ALERTS = {
        "cert_expiring_30d": {
            "name": "Certificate Expiring in 30 Days",
            "condition": "days_until_expiry <= 30",
            "action": "Email + Slack notification",
        },
        "cert_expiring_7d": {
            "name": "Certificate Critical — 7 Days",
            "condition": "days_until_expiry <= 7",
            "action": "PagerDuty + Slack #incidents",
        },
        "renewal_failed": {
            "name": "Certificate Renewal Failed",
            "condition": "cert_event == 'renewal_failed'",
            "action": "Slack + Email to security team",
        },
    }

    def show_visualizations(self):
        print("=== Kibana Visualizations ===\n")
        for key, viz in self.VISUALIZATIONS.items():
            print(f"[{viz['name']}] ({viz['type']})")
            print(f"  {viz['description']}")
            print()

    def show_alerts(self):
        print("=== Kibana Alerts ===")
        for key, alert in self.ALERTS.items():
            print(f"  [{alert['name']}]")
            print(f"    Condition: {alert['condition']}")
            print(f"    Action: {alert['action']}")

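    def sample_dashboard(self):
        # Placeholder numbers for illustration only, not real measurements
        ok = random.randint(40, 180)
        warning = random.randint(3, 15)
        critical = random.randint(0, 3)
        errors = random.randint(0, 5)
        print("\n=== Certificate Dashboard ===")
        # Derive the total from the parts so the figures always add up
        print(f"  Total Certificates: {ok + warning + critical + errors}")
        print(f"  OK: {ok}")
        print(f"  Warning (<= 30d): {warning}")
        print(f"  Critical (<= 7d): {critical}")
        print(f"  Errors: {errors}")
        print(f"  Renewals (24h): {random.randint(0, 10)} success, {random.randint(0, 2)} failed")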

dash = KibanaDashboard()
dash.show_visualizations()
dash.show_alerts()
dash.sample_dashboard()
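
The "query" entries above are shorthand. As a rough sketch, the first two could translate into Elasticsearch search bodies like the ones below. The function names are hypothetical, the `cert_status.keyword` field assumes Elasticsearch's default dynamic mapping for strings, and the response is hand-written in the shape a terms aggregation returns, not real output.

```python
def status_distribution_request(field="cert_status.keyword"):
    """Search body for the status pie chart: one bucket per cert_status value."""
    return {"size": 0, "aggs": {"by_status": {"terms": {"field": field}}}}


def expiry_timeline_request(interval="month"):
    """Search body for the expiry timeline: certificates per calendar interval."""
    return {
        "size": 0,
        "aggs": {
            "by_expiry": {
                "date_histogram": {"field": "not_after", "calendar_interval": interval}
            }
        },
    }


def buckets_to_counts(response, agg_name):
    """Flatten an aggregation response into {bucket key: document count}."""
    buckets = response["aggregations"][agg_name]["buckets"]
    return {bucket["key"]: bucket["doc_count"] for bucket in buckets}


# Hand-written response in the shape Elasticsearch returns for a terms aggregation
fake_response = {
    "aggregations": {
        "by_status": {"buckets": [
            {"key": "ok", "doc_count": 42},
            {"key": "warning", "doc_count": 5},
        ]}
    }
}
print(buckets_to_counts(fake_response, "by_status"))  # {'ok': 42, 'warning': 5}
```

Setting `"size": 0` skips the document hits and returns only the aggregation buckets, which is all a chart needs.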

Automation & Compliance

# compliance.py — Certificate compliance automation
import json

class CertCompliance:
    COMPLIANCE_CHECKS = {
        "expiry": {
            "name": "Certificate Expiry Check",
            "rule": "Every certificate must have more than 30 days of validity remaining",
            "frequency": "Daily",
        },
        "key_strength": {
            "name": "Key Strength Validation",
            "rule": "RSA >= 2048 bits, ECDSA >= 256 bits",
            "frequency": "Weekly",
        },
        "protocol_version": {
            "name": "TLS Version Check",
            "rule": "TLS 1.2+ only; disable TLS 1.0/1.1",
            "frequency": "Weekly",
        },
        "cert_chain": {
            "name": "Certificate Chain Validation",
            "rule": "Chain must be complete: root CA → intermediate → leaf",
            "frequency": "Daily",
        },
        "san_coverage": {
            "name": "SAN Coverage Check",
            "rule": "Every subdomain in use must appear in the SAN",
            "frequency": "Whenever a new subdomain is added",
        },
    }

    AUTOMATION_SCRIPT = """
# cert_compliance.py — Automated compliance report
from elasticsearch import Elasticsearch
from datetime import datetime
import json

class CertComplianceReport:
    def __init__(self, es_host="http://localhost:9200"):
        # elasticsearch-py 8.x needs the scheme (http:// or https://) in the host URL
        self.es = Elasticsearch([es_host])
    
    def generate_report(self):
        # Query all certificates
        result = self.es.search(
            index="certificates-*",
            size=1000,
            query={"match_all": {}},
            sort=[{"days_until_expiry": "asc"}],
        )
        
        certs = [hit['_source'] for hit in result['hits']['hits']]
        
        report = {
            "generated_at": datetime.utcnow().isoformat(),
            "total_certificates": len(certs),
            "status_summary": {
                "ok": sum(1 for c in certs if c.get('cert_status') == 'ok'),
                "warning": sum(1 for c in certs if c.get('cert_status') == 'warning'),
                "critical": sum(1 for c in certs if c.get('cert_status') == 'critical'),
                "error": sum(1 for c in certs if c.get('cert_status') == 'error'),
            },
            "expiring_soon": [c for c in certs if c.get('days_until_expiry', 999) <= 30],
            # Pass only if every reachable certificate meets the expiry rule (> 30 days left)
            "compliance_pass": all(
                c.get('days_until_expiry', 0) > 30 for c in certs
                if c.get('cert_status') != 'error'
            ),
        }
        return report

# reporter = CertComplianceReport()
# report = reporter.generate_report()
# print(json.dumps(report, indent=2))
"""

    def show_checks(self):
        print("=== Compliance Checks ===\n")
        for key, check in self.COMPLIANCE_CHECKS.items():
            print(f"[{check['name']}]")
            print(f"  Rule: {check['rule']}")
            print(f"  Frequency: {check['frequency']}")
            print()

    def show_script(self):
        print("=== Compliance Script ===")
        print(self.AUTOMATION_SCRIPT[:500])

compliance = CertCompliance()
compliance.show_checks()
compliance.show_script()
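
Because the monitor indexes a new document on every check, a `match_all` query returns many documents per domain, so counting hits counts checks rather than certificates. One way to handle this, sketched below in plain Python, is to keep only the newest document per domain before summarizing. The helper names `latest_per_domain` and `status_summary` are hypothetical, and the sample history is invented for the example.

```python
from collections import Counter


def latest_per_domain(docs):
    """Keep only the most recent document for each domain."""
    latest = {}
    for doc in docs:
        domain = doc.get("domain")
        if domain is None:
            continue
        seen = latest.get(domain)
        # ISO 8601 strings in the same format sort chronologically
        if seen is None or doc.get("checked_at", "") > seen.get("checked_at", ""):
            latest[domain] = doc
    return list(latest.values())


def status_summary(docs):
    """Count certificates by status after de-duplicating by domain."""
    return dict(Counter(d.get("cert_status", "unknown") for d in latest_per_domain(docs)))


# Invented check history: two checks for one domain, one for another
history = [
    {"domain": "example.com", "cert_status": "warning", "checked_at": "2026-05-01T00:00:00"},
    {"domain": "example.com", "cert_status": "ok", "checked_at": "2026-05-02T00:00:00"},
    {"domain": "api.example.com", "cert_status": "critical", "checked_at": "2026-05-02T00:00:00"},
]
print(status_summary(history))  # {'ok': 1, 'critical': 1}
```

For large indices the same de-duplication is better pushed into Elasticsearch, but the client-side version is enough to show why the raw hit count overstates the number of certificates.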

FAQ - Frequently Asked Questions

Q: ELK Stack or Grafana/Loki: which is better for cert logs?

A: ELK offers stronger full-text search, complex queries, a mature ecosystem, and polished Kibana dashboards. Grafana/Loki is lighter and cheaper to run because it does not index every field, and it integrates well with Prometheus. Choose ELK if you need full-text search, complex analytics, and compliance reporting. Choose Loki if you already run Grafana and want cost-efficient log storage.

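Q: How long should certificate logs be kept?

A: It depends on your compliance requirements. PCI DSS: 1 year. SOC 2: commonly 1 year or more. HIPAA: 6 years. General practice: 1-2 years. In ELK, use ILM (Index Lifecycle Management) to move indices through the hot → warm → cold → delete phases automatically. To save storage, compress old indices and move them to cold storage such as S3.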
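
As an illustration of the hot → warm → cold → delete flow, the sketch below builds an ILM policy body as a Python dictionary. It is a starting point under stated assumptions, not a recommended production policy: `RETENTION_DAYS` and `build_ilm_policy` are hypothetical names, the 30-day and 90-day phase boundaries are arbitrary defaults, and the "general" entry uses the upper end of the 1-2 year range.

```python
# Retention periods from the answer above, in days (HIPAA: 6 years)
RETENTION_DAYS = {"PCI DSS": 365, "HIPAA": 6 * 365, "general": 2 * 365}


def build_ilm_policy(delete_after_days, warm_after_days=30, cold_after_days=90):
    """Build an ILM policy body with hot, warm, cold, and delete phases."""
    return {
        "policy": {
            "phases": {
                "hot": {"min_age": "0ms", "actions": {"set_priority": {"priority": 100}}},
                "warm": {
                    "min_age": f"{warm_after_days}d",
                    # Merge each shard down to one segment to save disk space
                    "actions": {"forcemerge": {"max_num_segments": 1}},
                },
                "cold": {
                    "min_age": f"{cold_after_days}d",
                    "actions": {"set_priority": {"priority": 0}},
                },
                "delete": {"min_age": f"{delete_after_days}d", "actions": {"delete": {}}},
            }
        }
    }


policy = build_ilm_policy(RETENTION_DAYS["PCI DSS"])
print(policy["policy"]["phases"]["delete"]["min_age"])  # 365d
```

The resulting dictionary is the request body for `PUT _ilm/policy/<name>`. The policy only takes effect once an index template references it through the `index.lifecycle.name` setting.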

Q: What is the difference between Filebeat and Logstash?

A: Filebeat is a lightweight log shipper that reads files and forwards them, using little CPU and RAM. Logstash is a full data pipeline that parses, transforms, and enriches logs, and it needs more CPU and RAM. They are often used together: Filebeat (on servers) → Logstash (centralized) → Elasticsearch. Alternatively, Filebeat can send directly to Elasticsearch when no complex parsing is needed.

Q: How many domains can ELK monitor certificates for?

A: There is no fixed limit; capacity depends on the size of the Elasticsearch cluster. As a rough guide: for 100-500 domains a single node is enough; for 500-5,000 domains a 3-node cluster is recommended; for 5,000+ domains use a multi-node cluster with dedicated master nodes. With one check per hour, 500 domains × 24 checks = 12,000 documents/day, which is very small for Elasticsearch.
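
The volume estimate above is easy to recompute for other fleet sizes. The sketch below does the arithmetic; the function names are hypothetical, and the 1 KB average document size is an assumption made purely for illustration, so measure your own index before relying on the storage figure.

```python
def daily_documents(domains, checks_per_day=24):
    """Documents indexed per day when every domain is checked on a schedule."""
    return domains * checks_per_day


def estimated_storage_mb(domains, retention_days, checks_per_day=24, bytes_per_doc=1024):
    """Rough raw storage in MiB; bytes_per_doc is an assumed average size."""
    total_docs = daily_documents(domains, checks_per_day) * retention_days
    return total_docs * bytes_per_doc / (1024 * 1024)


print(daily_documents(500))  # 12000
print(round(estimated_storage_mb(500, retention_days=365), 1))  # 4277.3
```

Even with a year of retention, the raw volume under these assumptions stays in the low gigabytes, which is consistent with the single-node guidance for small fleets.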