Certificate Manager Log Management ELK

2026-02-13 · อ. บอม, SiamCafe.net · 1,616 words

What Is Certificate Manager Log Management with ELK?

A certificate manager handles SSL/TLS certificates across their whole lifecycle: issuance, installation, renewal, and revocation. The ELK Stack (Elasticsearch, Logstash, Kibana) is a widely used log management platform for collecting, analyzing, and visualizing logs from many systems. Feeding certificate manager events into ELK lets you monitor certificate events in near real time, detect certificates that are close to expiry, investigate security incidents that surface as certificate errors, and generate compliance reports automatically.

ELK Stack Architecture

# elk_arch.py — ELK Stack architecture for cert management
import json

class ELKArchitecture:
    COMPONENTS = {
        "elasticsearch": {
            "name": "Elasticsearch",
            "role": "Search & Analytics Engine: stores and searches logs",
            "description": "Distributed search engine built on Apache Lucene: index, search, aggregate logs",
            "port": 9200,
        },
        "logstash": {
            "name": "Logstash",
            "role": "Data Pipeline: receives, transforms, and forwards logs",
            "description": "Input → Filter → Output pipeline that processes logs before they reach Elasticsearch",
            "port": 5044,
        },
        "kibana": {
            "name": "Kibana",
            "role": "Visualization: dashboards and analytics",
            "description": "Web UI for visualizing Elasticsearch data: charts, maps, alerts",
            "port": 5601,
        },
        "filebeat": {
            "name": "Filebeat",
            "role": "Log Shipper: sends log files to Logstash/Elasticsearch",
            "description": "Lightweight agent installed on servers: reads log files and forwards them",
        },
    }

    DOCKER_COMPOSE = """
# docker-compose.yml — ELK Stack
version: '3.8'
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.12.0
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false  # local development only; keep security enabled in production
      - "ES_JAVA_OPTS=-Xms1g -Xmx1g"
    ports:
      - "9200:9200"
    volumes:
      - esdata:/usr/share/elasticsearch/data

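  logstash:
    image: docker.elastic.co/logstash/logstash:8.12.0
    ports:
      - "5044:5044"   # Beats input
      - "8080:8080"   # HTTP input used by the pipeline for cert manager webhooks
    volumes:
      - ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf
      # Needed only if the pipeline's file input reads certbot logs from the host;
      # the user inside the Logstash container must be allowed to read this directory
      - /var/log/letsencrypt:/var/log/letsencrypt:ro
    depends_on:
      - elasticsearch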

  kibana:
    image: docker.elastic.co/kibana/kibana:8.12.0
    ports:
      - "5601:5601"
    depends_on:
      - elasticsearch

volumes:
  esdata:
"""

    def show_components(self):
        print("=== ELK Stack Components ===\n")
        for key, comp in self.COMPONENTS.items():
            print(f"[{comp['name']}] — {comp['role']}")
            print(f"  {comp['description']}")
            print()

    def show_docker(self):
        print("=== Docker Compose ===")
        print(self.DOCKER_COMPOSE[:500])

elk = ELKArchitecture()
elk.show_components()
elk.show_docker()

Certificate Log Pipeline

# cert_pipeline.py — Logstash pipeline for certificate logs
import json

class CertLogPipeline:
    LOGSTASH_CONFIG = """
# logstash.conf — Certificate log processing pipeline
input {
  # Filebeat input for cert logs
  beats {
    port => 5044
    tags => ["certificate"]
  }
  
  # HTTP input for cert manager webhooks
  http {
    port => 8080
    codec => json
    tags => ["cert-webhook"]
  }
  
  # File input for certbot logs
  file {
    path => "/var/log/letsencrypt/letsencrypt.log"
    start_position => "beginning"
    tags => ["certbot"]
  }
}

filter {
  # Parse certificate events
  if "certificate" in [tags] {
    grok {
      match => {
        "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:event_message}"
      }
    }
    
    # Extract certificate details
    if "certificate" in [event_message] {
      grok {
        match => {
          "event_message" => "domain=%{HOSTNAME:domain} serial=%{DATA:serial} expires=%{TIMESTAMP_ISO8601:expiry_date}"
        }
        tag_on_failure => ["_cert_parse_failed"]
      }
    }
    
    # Calculate days until expiry
    ruby {
      # Time.parse comes from Ruby's "time" library, so load it explicitly
      init => 'require "time"'
      code => '
        if event.get("expiry_date")
          expiry = Time.parse(event.get("expiry_date"))
          days_left = ((expiry - Time.now) / 86400).to_i
          event.set("days_until_expiry", days_left)
          
          if days_left <= 7
            event.set("cert_status", "critical")
          elsif days_left <= 30
            event.set("cert_status", "warning")
          else
            event.set("cert_status", "ok")
          end
        end
      '
    }
  }
  
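  # Parse certbot logs (the pattern assumes this exact line layout; adjust it to
  # match the lines your certbot version actually writes)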
  if "certbot" in [tags] {
    grok {
      match => {
        "message" => "%{TIMESTAMP_ISO8601:timestamp} - certbot - %{LOGLEVEL:level} - %{GREEDYDATA:certbot_message}"
      }
    }
    
    if "renewed" in [certbot_message] {
      mutate { add_field => { "cert_event" => "renewal_success" } }
    }
    if "failed" in [certbot_message] {
      mutate { add_field => { "cert_event" => "renewal_failed" } }
    }
  }
  
  # Add metadata
  mutate {
    add_field => { "[@metadata][index]" => "certificates-%{+YYYY.MM}" }
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "%{[@metadata][index]}"
  }
  
  # Alert on critical certificates
  if [cert_status] == "critical" {
    http {
      url => "https://hooks.slack.com/services/xxx"
      http_method => "post"
      format => "json"
      mapping => {
        "text" => "CERT CRITICAL: %{domain} expires in %{days_until_expiry} days!"
      }
    }
  }
}
"""

    def show_pipeline(self):
        print("=== Logstash Pipeline ===")
        print(self.LOGSTASH_CONFIG[:600])

pipeline = CertLogPipeline()
pipeline.show_pipeline()
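
Before deploying the pipeline, it can help to check the parsing and threshold logic against a sample line in plain Python. The sketch below mirrors the two grok patterns and the ruby filter above with regular expressions. The log line layout, the regexes, and the function names are assumptions made for illustration (real grok patterns are more permissive), and the expiry timestamp is assumed to carry an explicit UTC offset.

```python
import re
from datetime import datetime, timezone

# Simplified stand-ins for the grok patterns in logstash.conf (assumed line layout):
#   <ISO8601 timestamp> <LEVEL> <message containing domain=... serial=... expires=...>
LINE_RE = re.compile(r"^(?P<timestamp>\S+)\s+(?P<level>[A-Z]+)\s+(?P<event_message>.*)$")
DETAIL_RE = re.compile(
    r"domain=(?P<domain>[\w.-]+)\s+serial=(?P<serial>\S+)\s+expires=(?P<expiry_date>\S+)"
)


def classify(days_left):
    """Same thresholds as the ruby filter: <= 7 is critical, <= 30 is warning."""
    if days_left <= 7:
        return "critical"
    if days_left <= 30:
        return "warning"
    return "ok"


def parse_cert_line(line, now):
    """Return the fields the pipeline would extract, or None if the line does not match."""
    outer = LINE_RE.match(line)
    if outer is None:
        return None
    event = outer.groupdict()
    # The pipeline only tries the detail pattern when the message mentions "certificate"
    if "certificate" not in event["event_message"]:
        return event
    detail = DETAIL_RE.search(event["event_message"])
    if detail is None:
        event["tags"] = ["_cert_parse_failed"]
        return event
    event.update(detail.groupdict())
    expiry = datetime.fromisoformat(event["expiry_date"])
    # int() truncates toward zero, like Ruby's to_i in the filter
    days_left = int((expiry - now).total_seconds() / 86400)
    event["days_until_expiry"] = days_left
    event["cert_status"] = classify(days_left)
    return event


sample = (
    "2025-02-01T10:00:00+00:00 INFO certificate issued "
    "domain=example.com serial=0A1B expires=2025-02-06T10:00:00+00:00"
)
fixed_now = datetime(2025, 2, 1, 10, 0, tzinfo=timezone.utc)
print(parse_cert_line(sample, fixed_now))
```

Passing `now` explicitly keeps the result reproducible, which makes the thresholds easy to unit test.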

Python Certificate Monitor

# cert_monitor.py — Python certificate monitoring with ELK
import json

class CertMonitor:
    CODE = """
# cert_elk_monitor.py — Monitor certificates and send to ELK
import ssl
import socket
from datetime import datetime, timezone
from elasticsearch import Elasticsearch

class CertificateELKMonitor:
    def __init__(self, es_host="http://localhost:9200"):
        # elasticsearch-py 8.x requires the scheme (http:// or https://) in the URL
        self.es = Elasticsearch([es_host])
        self.index = f"certificates-{datetime.now(timezone.utc).strftime('%Y.%m')}"
    
    def check_certificate(self, domain, port=443):
        '''Check SSL certificate and return details'''
        now = datetime.now(timezone.utc)
        try:
            # The default context verifies the chain and hostname, so an expired or
            # untrusted certificate raises here and is reported as 'error' below
            context = ssl.create_default_context()
            with socket.create_connection((domain, port), timeout=10) as sock:
                with context.wrap_socket(sock, server_hostname=domain) as ssock:
                    cert = ssock.getpeercert()
                    
                    # notAfter looks like 'Jan  5 09:34:43 2018 GMT'
                    expire_ts = ssl.cert_time_to_seconds(cert['notAfter'])
                    expire_date = datetime.fromtimestamp(expire_ts, tz=timezone.utc)
                    days_left = (expire_date - now).days
                    
                    issuer = dict(x[0] for x in cert['issuer'])
                    subject = dict(x[0] for x in cert['subject'])
                    san = [entry[1] for entry in cert.get('subjectAltName', [])]
                    
                    status = 'ok' if days_left > 30 else 'warning' if days_left > 7 else 'critical'
                    
                    return {
                        'domain': domain,
                        'issuer': issuer.get('organizationName', ''),
                        'subject_cn': subject.get('commonName', ''),
                        'san': san,
                        'not_after': expire_date.isoformat(),
                        'days_until_expiry': days_left,
                        'cert_status': status,
                        'serial_number': cert.get('serialNumber', ''),
                        'version': cert.get('version', ''),
                        'protocol': ssock.version(),
                        'cipher': ssock.cipher()[0],
                        'checked_at': now.isoformat(),
                    }
        except Exception as e:
            return {
                'domain': domain,
                'cert_status': 'error',
                'error': str(e),
                'checked_at': now.isoformat(),
            }
    
    def send_to_elk(self, cert_data):
        '''Send certificate data to Elasticsearch'''
        self.es.index(index=self.index, document=cert_data)
    
    def monitor_domains(self, domains):
        '''Monitor multiple domains'''
        results = []
        for domain in domains:
            data = self.check_certificate(domain)
            self.send_to_elk(data)
            results.append(data)
        return results
    
    def get_expiring_certs(self, days=30):
        '''Query ELK for expiring certificates'''
        # elasticsearch-py 8.x takes query and sort as keyword arguments (body= is deprecated)
        result = self.es.search(
            index="certificates-*",
            query={"range": {"days_until_expiry": {"lte": days}}},
            sort=[{"days_until_expiry": "asc"}],
        )
        return [hit['_source'] for hit in result['hits']['hits']]

# monitor = CertificateELKMonitor("http://localhost:9200")
# domains = ["example.com", "api.example.com", "cdn.example.com"]
# results = monitor.monitor_domains(domains)
"""

    def show_code(self):
        print("=== Certificate ELK Monitor ===")
        print(self.CODE[:600])

monitor = CertMonitor()
monitor.show_code()
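
The expiry arithmetic in check_certificate can be exercised without a network connection. The sketch below converts a `notAfter` string (the format returned by `getpeercert()`) into a timezone-aware datetime with the standard library's `ssl.cert_time_to_seconds`, then computes the remaining days. The helper names are illustrative and not part of the monitor class.

```python
import ssl
from datetime import datetime, timezone


def not_after_to_datetime(not_after):
    """Convert a getpeercert() 'notAfter' string to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(ssl.cert_time_to_seconds(not_after), tz=timezone.utc)


def days_until_expiry(not_after, now=None):
    """Whole days left until expiry; pass `now` explicitly to make tests reproducible."""
    if now is None:
        now = datetime.now(timezone.utc)
    return (not_after_to_datetime(not_after) - now).days


# Fixed reference time so the printed values do not depend on when this runs
fixed_now = datetime(2018, 1, 1, tzinfo=timezone.utc)
expiry = not_after_to_datetime("Jan  5 09:34:43 2018 GMT")
print(expiry.isoformat())
print(days_until_expiry("Jan  5 09:34:43 2018 GMT", fixed_now))
```

Note that `timedelta.days` rounds toward negative infinity, so a certificate that expired one hour ago reports -1 rather than 0.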

Kibana Dashboard

# kibana_dashboard.py — Kibana dashboard for certificate monitoring
import json
import random

class KibanaDashboard:
    VISUALIZATIONS = {
        "cert_status_pie": {
            "name": "Certificate Status Distribution",
            "type": "Pie Chart",
            "query": "agg: terms on cert_status field",
            "description": "Shows the share of OK / Warning / Critical / Error certificates",
        },
        "expiry_timeline": {
            "name": "Certificate Expiry Timeline",
            "type": "Timeline / Bar Chart",
            "query": "date_histogram on not_after field",
            "description": "Shows when certificates will expire, to help plan renewals",
        },
        "renewal_events": {
            "name": "Renewal Events Over Time",
            "type": "Line Chart",
            "query": "date_histogram on checked_at, split by cert_event",
            "description": "Shows renewal success/failure trends",
        },
        "issuer_breakdown": {
            "name": "Certificates by Issuer",
            "type": "Bar Chart",
            "query": "terms aggregation on issuer field",
            "description": "Shows the number of certificates per CA (Let's Encrypt, DigiCert, etc.)",
        },
        "domain_table": {
            "name": "Domain Certificate Details",
            "type": "Data Table",
            "query": "top_hits, sorted by days_until_expiry ASC",
            "description": "Sortable table of domains with expiry date and status",
        },
    }

    ALERTS = {
        "cert_expiring_30d": {
            "name": "Certificate Expiring in 30 Days",
            "condition": "days_until_expiry <= 30",
            "action": "Email + Slack notification",
        },
        "cert_expiring_7d": {
            "name": "Certificate Critical — 7 Days",
            "condition": "days_until_expiry <= 7",
            "action": "PagerDuty + Slack #incidents",
        },
        "renewal_failed": {
            "name": "Certificate Renewal Failed",
            "condition": "cert_event == 'renewal_failed'",
            "action": "Slack + Email to security team",
        },
    }

    def show_visualizations(self):
        print("=== Kibana Visualizations ===\n")
        for key, viz in self.VISUALIZATIONS.items():
            print(f"[{viz['name']}] ({viz['type']})")
            print(f"  {viz['description']}")
            print()

    def show_alerts(self):
        print("=== Kibana Alerts ===")
        for key, alert in self.ALERTS.items():
            print(f"  [{alert['name']}]")
            print(f"    Condition: {alert['condition']}")
            print(f"    Action: {alert['action']}")

    def sample_dashboard(self):
        # Mock output only: the numbers are random placeholders, not real query
        # results, so the individual counts will not add up to the total
        print("\n=== Certificate Dashboard (sample data) ===")
        print(f"  Total Certificates: {random.randint(50, 200)}")
        print(f"  OK: {random.randint(40, 180)}")
        print(f"  Warning (<= 30d): {random.randint(3, 15)}")
        print(f"  Critical (<= 7d): {random.randint(0, 3)}")
        print(f"  Errors: {random.randint(0, 5)}")
        print(f"  Renewals (24h): {random.randint(0, 10)} success, {random.randint(0, 2)} failed")

dash = KibanaDashboard()
dash.show_visualizations()
dash.show_alerts()
dash.sample_dashboard()
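
The "query" descriptions above correspond to ordinary Elasticsearch aggregations. As a rough sketch, the request bodies behind the status pie chart and the expiry timeline could be built like this. It assumes dynamic mapping, so the status is aggregated on a `cert_status.keyword` subfield, and it assumes `not_after` is mapped as a date. The function and aggregation names are hypothetical.

```python
import json


def status_pie_request():
    """Terms aggregation behind the 'Certificate Status Distribution' pie chart."""
    return {
        "size": 0,  # only the aggregation buckets are needed, not the hits
        "aggs": {
            "by_status": {"terms": {"field": "cert_status.keyword"}},
        },
    }


def expiry_timeline_request(interval="month"):
    """Date histogram behind the 'Certificate Expiry Timeline' chart."""
    return {
        "size": 0,
        "aggs": {
            "expiring_per_interval": {
                "date_histogram": {"field": "not_after", "calendar_interval": interval},
            },
        },
    }


print(json.dumps(status_pie_request(), indent=2))
print(json.dumps(expiry_timeline_request("week"), indent=2))
```

Either body can be sent to the `_search` endpoint of the `certificates-*` indices. If the index uses an explicit mapping with `cert_status` as a keyword field, drop the `.keyword` suffix.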

Automation & Compliance

# compliance.py — Certificate compliance automation
import json

class CertCompliance:
    COMPLIANCE_CHECKS = {
        "expiry": {
            "name": "Certificate Expiry Check",
            "rule": "Every certificate must have more than 30 days of validity left",
            "frequency": "Daily",
        },
        "key_strength": {
            "name": "Key Strength Validation",
            "rule": "RSA >= 2048 bits, ECDSA >= 256 bits",
            "frequency": "Weekly",
        },
        "protocol_version": {
            "name": "TLS Version Check",
            "rule": "TLS 1.2+ only, TLS 1.0/1.1 disabled",
            "frequency": "Weekly",
        },
        "cert_chain": {
            "name": "Certificate Chain Validation",
            "rule": "Chain must be complete: root CA → intermediate → leaf",
            "frequency": "Daily",
        },
        "san_coverage": {
            "name": "SAN Coverage Check",
            "rule": "Every subdomain in use must be listed in the SAN",
            "frequency": "Whenever a new subdomain is added",
        },
    }

    AUTOMATION_SCRIPT = """
# cert_compliance.py — Automated compliance report
from elasticsearch import Elasticsearch
from datetime import datetime, timezone
import json

class CertComplianceReport:
    def __init__(self, es_host="http://localhost:9200"):
        # elasticsearch-py 8.x requires the scheme (http:// or https://) in the URL
        self.es = Elasticsearch([es_host])
    
    def generate_report(self):
        # Query certificate documents (at most 1000). Each monitoring run indexes
        # one document per domain, so a domain can appear more than once here.
        result = self.es.search(
            index="certificates-*",
            size=1000,
            query={"match_all": {}},
            sort=[{"days_until_expiry": "asc"}],
        )
        
        certs = [hit['_source'] for hit in result['hits']['hits']]
        
        report = {
            "generated_at": datetime.now(timezone.utc).isoformat(),
            "total_certificates": len(certs),
            "status_summary": {
                "ok": sum(1 for c in certs if c.get('cert_status') == 'ok'),
                "warning": sum(1 for c in certs if c.get('cert_status') == 'warning'),
                "critical": sum(1 for c in certs if c.get('cert_status') == 'critical'),
                "error": sum(1 for c in certs if c.get('cert_status') == 'error'),
            },
            "expiring_soon": [c for c in certs if c.get('days_until_expiry', 999) <= 30],
            # Passes only when no reachable certificate is in the critical window
            # (7 days or less); documents with status 'error' are skipped
            "compliance_pass": all(
                c.get('days_until_expiry', 0) > 7 for c in certs
                if c.get('cert_status') != 'error'
            ),
        }
        return report

# reporter = CertComplianceReport()
# report = reporter.generate_report()
# print(json.dumps(report, indent=2))
"""

    def show_checks(self):
        print("=== Compliance Checks ===\n")
        for key, check in self.COMPLIANCE_CHECKS.items():
            print(f"[{check['name']}]")
            print(f"  Rule: {check['rule']}")
            print(f"  Frequency: {check['frequency']}")
            print()

    def show_script(self):
        print("=== Compliance Script ===")
        print(self.AUTOMATION_SCRIPT[:500])

compliance = CertCompliance()
compliance.show_checks()
compliance.show_script()
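
Because every monitoring run indexes a new document per domain, a report built from raw search hits can count the same certificate several times. One possible way to handle this, sketched below in plain Python, is to keep only the most recent document per domain before applying the 30-day expiry rule from the compliance checks. The function names are hypothetical, `checked_at` values are assumed to be ISO 8601 strings in the same timezone (so string comparison orders them correctly), and counting unreachable domains as failures is an assumption of this sketch rather than the behavior of the script above.

```python
def latest_per_domain(docs):
    """Keep only the most recently checked document for each domain."""
    latest = {}
    for doc in docs:
        current = latest.get(doc["domain"])
        if current is None or doc["checked_at"] > current["checked_at"]:
            latest[doc["domain"]] = doc
    return list(latest.values())


def evaluate_expiry_rule(docs, min_days=30):
    """Apply the 'more than min_days left' rule to the latest document per domain."""
    certs = latest_per_domain(docs)
    failing = sorted(
        c["domain"]
        for c in certs
        # Assumption: a domain that could not be checked is treated as failing
        if c.get("cert_status") == "error" or c.get("days_until_expiry", 0) <= min_days
    )
    return {
        "total_certificates": len(certs),
        "failing_domains": failing,
        "compliance_pass": not failing,
    }


sample_docs = [
    {"domain": "a.example.com", "checked_at": "2025-01-01T00:00:00+00:00",
     "days_until_expiry": 40, "cert_status": "ok"},
    {"domain": "a.example.com", "checked_at": "2025-01-20T00:00:00+00:00",
     "days_until_expiry": 21, "cert_status": "warning"},
    {"domain": "b.example.com", "checked_at": "2025-01-20T00:00:00+00:00",
     "days_until_expiry": 90, "cert_status": "ok"},
]
print(evaluate_expiry_rule(sample_docs))
```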

FAQ - Frequently Asked Questions

Q: ELK Stack or Grafana/Loki: which is better for certificate logs?

A: ELK offers stronger full-text search, supports complex queries, has a mature ecosystem, and comes with polished Kibana dashboards. Grafana/Loki is lighter and cheaper to run because it does not index every field, and it integrates well with Prometheus. Choose ELK if you need full-text search, complex analytics, and compliance reporting. Choose Loki if you already run Grafana and want cost-efficient log storage.

Q: How long should certificate logs be kept?

A: It depends on your compliance requirements. PCI DSS: 1 year. SOC 2: 1 year or more. HIPAA: 6 years. General practice: 1-2 years. In ELK, use ILM (Index Lifecycle Management) to move indices through hot → warm → cold → delete automatically. To save storage, compress old indices and move them to cold storage (S3).
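
As an illustration of the hot → warm → cold → delete flow, the sketch below builds an ILM policy body of the kind accepted by the `PUT _ilm/policy/<name>` endpoint. The phase ages and priorities are placeholder assumptions, not recommendations, and the function name is hypothetical. Set the delete age from whichever retention requirement applies to you.

```python
import json


def build_ilm_policy(warm_after_days=30, cold_after_days=90, delete_after_days=365):
    """Build an ILM policy body with hot, warm, cold, and delete phases."""
    if not (0 < warm_after_days < cold_after_days < delete_after_days):
        raise ValueError("phase ages must increase: warm < cold < delete")
    return {
        "policy": {
            "phases": {
                # Hot: newly created indices that are still being written to
                "hot": {"actions": {"set_priority": {"priority": 100}}},
                # Warm: no longer written to, so merge down to one segment
                "warm": {
                    "min_age": f"{warm_after_days}d",
                    "actions": {
                        "forcemerge": {"max_num_segments": 1},
                        "set_priority": {"priority": 50},
                    },
                },
                # Cold: rarely queried, lowest recovery priority
                "cold": {
                    "min_age": f"{cold_after_days}d",
                    "actions": {"set_priority": {"priority": 0}},
                },
                # Delete: remove the index once the retention period has passed
                "delete": {
                    "min_age": f"{delete_after_days}d",
                    "actions": {"delete": {}},
                },
            }
        }
    }


# Example: roughly six years of retention
print(json.dumps(build_ilm_policy(delete_after_days=6 * 365), indent=2))
```

A policy only takes effect once indices reference it, typically through the `index.lifecycle.name` setting in an index template that matches `certificates-*`.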

Q: What is the difference between Filebeat and Logstash?

A: Filebeat is a lightweight log shipper: it reads files and forwards them, using little CPU and RAM. Logstash is a full data pipeline that parses, transforms, and enriches logs, and it needs more CPU and RAM. They are often used together: Filebeat (on the servers) → Logstash (centralized) → Elasticsearch. Alternatively, Filebeat can send directly to Elasticsearch when no complex parsing is needed.

Q: How many domains can ELK monitor certificates for?

A: There is no hard limit; it depends on the size of the Elasticsearch cluster. As a rough guide, a single node is enough for 100-500 domains, a 3-node cluster is recommended for 500-5,000 domains, and 5,000+ domains call for a multi-node cluster with dedicated master nodes. With hourly checks, 500 domains × 24 checks = 12,000 documents/day, which is very small for Elasticsearch.
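
The volume arithmetic above is easy to adapt to other fleet sizes. In the sketch below, the average document size of 1 KiB is an assumption used only for illustration. It refers to the raw JSON size, and the on-disk index size will differ.

```python
def docs_per_day(domains, checks_per_day=24):
    """Documents indexed per day when every domain is checked checks_per_day times."""
    return domains * checks_per_day


def estimated_raw_mib(domains, retention_days, checks_per_day=24, avg_doc_bytes=1024):
    """Rough raw data volume in MiB over the retention period (avg_doc_bytes is assumed)."""
    total_docs = docs_per_day(domains, checks_per_day) * retention_days
    return total_docs * avg_doc_bytes / (1024 * 1024)


print(docs_per_day(500))                    # 12000
print(round(estimated_raw_mib(500, 365)))   # 4277
```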
