Certificate Manager Log Management ELK

What Is Certificate Manager Log Management with ELK?

A certificate manager handles SSL/TLS certificates across their whole lifecycle: issuance, installation, renewal, and revocation. The ELK Stack (Elasticsearch, Logstash, Kibana) is an open-source log management platform for collecting, analyzing, and visualizing logs from many systems. Feeding certificate manager events into ELK lets you monitor certificate events in near real time, detect certificates that are close to expiry, investigate security incidents that stem from certificate errors, and generate compliance reports automatically.

ELK Stack Architecture

# elk_arch.py — ELK Stack architecture for cert management

import json



class ELKArchitecture:

    COMPONENTS = {

        "elasticsearch": {

            "name": "Elasticsearch",

            "role": "Search & Analytics Engine: stores and searches logs",

            "description": "Distributed search engine built on Apache Lucene; indexes, searches, and aggregates logs",

            "port": 9200,

        },

        "logstash": {

            "name": "Logstash",

            "role": "Data Pipeline: receives, transforms, and forwards logs",

            "description": "Input → Filter → Output pipeline that processes logs before they reach Elasticsearch",

            "port": 5044,

        },

        "kibana": {

            "name": "Kibana",

            "role": "Visualization: dashboards and analytics",

            "description": "Web UI for visualizing Elasticsearch data: charts, maps, alerts",

            "port": 5601,

        },

        "filebeat": {

            "name": "Filebeat",

            "role": "Log Shipper: sends log files to Logstash/Elasticsearch",

            "description": "Lightweight agent installed on servers; reads log files and forwards them",

        },

    }



    DOCKER_COMPOSE = """

# docker-compose.yml — ELK Stack

version: '3.8'

services:

  elasticsearch:

    image: docker.elastic.co/elasticsearch/elasticsearch:8.12.0

    environment:

      - discovery.type=single-node

      # Development only: this disables authentication and TLS; keep security enabled in production
      - xpack.security.enabled=false

      - "ES_JAVA_OPTS=-Xms1g -Xmx1g"

    ports:

      - "9200:9200"

    volumes:

      - esdata:/usr/share/elasticsearch/data



  logstash:

    image: docker.elastic.co/logstash/logstash:8.12.0

    ports:

      - "5044:5044"

    volumes:

      - ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf

    depends_on:

      - elasticsearch



  kibana:

    image: docker.elastic.co/kibana/kibana:8.12.0

    ports:

      - "5601:5601"

    depends_on:

      - elasticsearch



volumes:

  esdata:

"""



    def show_components(self):

        print("=== ELK Stack Components ===\n")

        for key, comp in self.COMPONENTS.items():

            print(f"[{comp['name']}] — {comp['role']}")

            print(f"  {comp['description']}")

            print()



    def show_docker(self):

        print("=== Docker Compose ===")

        print(self.DOCKER_COMPOSE[:500])



elk = ELKArchitecture()

elk.show_components()

elk.show_docker()

Certificate Log Pipeline

# cert_pipeline.py — Logstash pipeline for certificate logs

import json



class CertLogPipeline:

    LOGSTASH_CONFIG = """

# logstash.conf — Certificate log processing pipeline

input {

  # Filebeat input for cert logs

  beats {

    port => 5044

    tags => ["certificate"]

  }

  

  # HTTP input for cert manager webhooks

  http {

    port => 8080

    codec => json

    tags => ["cert-webhook"]

  }

  

  # File input for certbot logs

  file {

    path => "/var/log/letsencrypt/letsencrypt.log"

    start_position => "beginning"

    tags => ["certbot"]

  }

}



filter {

  # Parse certificate events

  if "certificate" in [tags] {

    grok {

      match => {

        "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:event_message}"

      }

    }

    

    # Extract certificate details

    if "certificate" in [event_message] {

      grok {

        match => {

          "event_message" => "domain=%{HOSTNAME:domain} serial=%{DATA:serial} expires=%{TIMESTAMP_ISO8601:expiry_date}"

        }

        tag_on_failure => ["_cert_parse_failed"]

      }

    }

    

    # Calculate days until expiry

    ruby {

      # Time.parse comes from Ruby's "time" library, so load it explicitly

      init => 'require "time"'

      code => '

        if event.get("expiry_date")

          expiry = Time.parse(event.get("expiry_date"))

          days_left = ((expiry - Time.now) / 86400).to_i

          event.set("days_until_expiry", days_left)

          

          if days_left <= 7

            event.set("cert_status", "critical")

          elsif days_left <= 30

            event.set("cert_status", "warning")

          else

            event.set("cert_status", "ok")

          end

        end

      '

    }

  }

  

  # Parse certbot logs

  if "certbot" in [tags] {

    grok {

      match => {

        "message" => "%{TIMESTAMP_ISO8601:timestamp} - certbot - %{LOGLEVEL:level} - %{GREEDYDATA:certbot_message}"

      }

    }

    

    if "renewed" in [certbot_message] {

      mutate { add_field => { "cert_event" => "renewal_success" } }

    }

    if "failed" in [certbot_message] {

      mutate { add_field => { "cert_event" => "renewal_failed" } }

    }

  }

  

  # Add metadata

  mutate {

    add_field => { "[@metadata][index]" => "certificates-%{+YYYY.MM}" }

  }

}



output {

  elasticsearch {

    hosts => ["elasticsearch:9200"]

    index => "%{[@metadata][index]}"

  }

  

  # Alert on critical certificates

  if [cert_status] == "critical" {

    http {

      url => "https://hooks.slack.com/services/xxx"

      http_method => "post"

      format => "json"

      mapping => {

        "text" => "CERT CRITICAL: %{domain} expires in %{days_until_expiry} days!"

      }

    }

  }

}

"""



    def show_pipeline(self):

        print("=== Logstash Pipeline ===")

        print(self.LOGSTASH_CONFIG[:600])



pipeline = CertLogPipeline()

pipeline.show_pipeline()
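The filter stage above extracts `domain`, `serial`, and `expiry_date` from a log line and then classifies the certificate by days remaining. The following is a minimal Python sketch of that same logic, useful for checking sample log lines before deploying the pipeline. The regular expression only approximates the grok pattern, and the sample log line and the function names `parse_cert_line` and `classify` are hypothetical, not part of Logstash.

```python
import re
from datetime import datetime, timezone

# Approximation of the grok pattern
# "domain=%{HOSTNAME:domain} serial=%{DATA:serial} expires=%{TIMESTAMP_ISO8601:expiry_date}"
CERT_LINE = re.compile(
    r"domain=(?P<domain>\S+) serial=(?P<serial>\S+) expires=(?P<expires>\S+)"
)


def classify(days_left):
    """Apply the same thresholds as the ruby filter: <=7 critical, <=30 warning."""
    if days_left <= 7:
        return "critical"
    if days_left <= 30:
        return "warning"
    return "ok"


def parse_cert_line(line, now):
    """Return the extracted fields as a dict, or None if the line does not match."""
    match = CERT_LINE.search(line)
    if match is None:
        return None
    # Older Python versions do not accept a trailing "Z" in fromisoformat()
    expiry = datetime.fromisoformat(match["expires"].replace("Z", "+00:00"))
    if expiry.tzinfo is None:
        expiry = expiry.replace(tzinfo=timezone.utc)  # assume UTC when no offset is given
    # int() truncates toward zero, like Ruby's to_i in the filter
    days_left = int((expiry - now).total_seconds() / 86400)
    return {
        "domain": match["domain"],
        "serial": match["serial"],
        "expiry_date": expiry.isoformat(),
        "days_until_expiry": days_left,
        "cert_status": classify(days_left),
    }


if __name__ == "__main__":
    # Hypothetical log line in the format the pipeline expects
    sample = (
        "2025-01-01T00:00:00Z INFO certificate issued "
        "domain=example.com serial=0A1B expires=2025-01-06T00:00:00Z"
    )
    print(parse_cert_line(sample, datetime(2025, 1, 1, tzinfo=timezone.utc)))
```

Passing `now` explicitly keeps the function deterministic, which makes it easy to test threshold edges such as exactly 7 or 30 days.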

Python Certificate Monitor

# cert_monitor.py — Python certificate monitoring with ELK

import json



class CertMonitor:

    CODE = """

# cert_elk_monitor.py — Monitor certificates and send to ELK

import ssl

import socket

import json

import requests

from datetime import datetime

from elasticsearch import Elasticsearch



class CertificateELKMonitor:

    # The elasticsearch-py 8.x client requires a full URL, including the scheme
    def __init__(self, es_host="http://localhost:9200"):

        self.es = Elasticsearch([es_host])

        self.index = f"certificates-{datetime.utcnow().strftime('%Y.%m')}"

    

    def check_certificate(self, domain, port=443):

        '''Check SSL certificate and return details'''

        try:

            context = ssl.create_default_context()

            with socket.create_connection((domain, port), timeout=10) as sock:

                with context.wrap_socket(sock, server_hostname=domain) as ssock:

                    cert = ssock.getpeercert()

                    

                    expire_str = cert['notAfter']

                    expire_date = datetime.strptime(expire_str, '%b %d %H:%M:%S %Y %Z')

                    days_left = (expire_date - datetime.utcnow()).days

                    

                    issuer = dict(x[0] for x in cert['issuer'])

                    subject = dict(x[0] for x in cert['subject'])

                    san = [entry[1] for entry in cert.get('subjectAltName', [])]

                    

                    status = 'ok' if days_left > 30 else 'warning' if days_left > 7 else 'critical'

                    

                    return {

                        'domain': domain,

                        'issuer': issuer.get('organizationName', ''),

                        'subject_cn': subject.get('commonName', ''),

                        'san': san,

                        'not_after': expire_date.isoformat(),

                        'days_until_expiry': days_left,

                        'cert_status': status,

                        'serial_number': cert.get('serialNumber', ''),

                        'version': cert.get('version', ''),

                        'protocol': ssock.version(),

                        'cipher': ssock.cipher()[0],

                        'checked_at': datetime.utcnow().isoformat(),

                    }

        except Exception as e:

            return {

                'domain': domain,

                'cert_status': 'error',

                'error': str(e),

                'checked_at': datetime.utcnow().isoformat(),

            }

    

    def send_to_elk(self, cert_data):

        '''Send certificate data to Elasticsearch'''

        self.es.index(index=self.index, document=cert_data)

    

    def monitor_domains(self, domains):

        '''Monitor multiple domains'''

        results = []

        for domain in domains:

            data = self.check_certificate(domain)

            self.send_to_elk(data)

            results.append(data)

        return results

    

    def get_expiring_certs(self, days=30):

        '''Query ELK for expiring certificates'''

        query = {

            "query": {

                "range": {

                    "days_until_expiry": {"lte": days}

                }

            },

            "sort": [{"days_until_expiry": "asc"}]

        }

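        # Pass query and sort as keyword arguments; the body= parameter is deprecated in the 8.x client
        result = self.es.search(index="certificates-*", query=query["query"], sort=query["sort"])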

        return [hit['_source'] for hit in result['hits']['hits']]



# monitor = CertificateELKMonitor("http://localhost:9200")

# domains = ["example.com", "api.example.com", "cdn.example.com"]

# results = monitor.monitor_domains(domains)

"""



    def show_code(self):

        print("=== Certificate ELK Monitor ===")

        print(self.CODE[:600])



monitor = CertMonitor()

monitor.show_code()
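The monitor above parses the `notAfter` string with `strptime` and compares it against a naive UTC timestamp. As an alternative sketch, the standard library's `ssl.cert_time_to_seconds` converts the same string to an epoch timestamp, which makes timezone-aware arithmetic straightforward. The helper names `parse_not_after` and `days_until_expiry` are hypothetical, and the sample date is only an illustration.

```python
import ssl
from datetime import datetime, timezone


def parse_not_after(not_after):
    """Convert a getpeercert() 'notAfter' string into an aware UTC datetime."""
    # cert_time_to_seconds expects the "%b %d %H:%M:%S %Y GMT" form used by getpeercert()
    seconds = ssl.cert_time_to_seconds(not_after)
    return datetime.fromtimestamp(seconds, tz=timezone.utc)


def days_until_expiry(not_after, now=None):
    """Whole days between now and the certificate's expiry."""
    if now is None:
        now = datetime.now(timezone.utc)
    return (parse_not_after(not_after) - now).days


if __name__ == "__main__":
    sample = "Jan  5 09:34:43 2018 GMT"
    print(parse_not_after(sample).isoformat())
    print(days_until_expiry(sample, now=datetime(2018, 1, 1, tzinfo=timezone.utc)))
```

Note that `timedelta.days` rounds toward negative infinity, so a certificate that expired a few hours ago reports -1 rather than 0.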

Kibana Dashboard

# kibana_dashboard.py — Kibana dashboard for certificate monitoring

import json

import random



class KibanaDashboard:

    VISUALIZATIONS = {

        "cert_status_pie": {

            "name": "Certificate Status Distribution",

            "type": "Pie Chart",

            "query": "agg: terms on cert_status field",

            "description": "Shows the share of OK / Warning / Critical / Error certificates",

        },

        "expiry_timeline": {

            "name": "Certificate Expiry Timeline",

            "type": "Timeline / Bar Chart",

            "query": "date_histogram on not_after field",

            "description": "Shows when certificates expire, to help plan renewals",

        },

        "renewal_events": {

            "name": "Renewal Events Over Time",

            "type": "Line Chart",

            "query": "date_histogram on @timestamp, split by cert_event",

            "description": "Shows renewal success and failure trends",

        },

        "issuer_breakdown": {

            "name": "Certificates by Issuer",

            "type": "Bar Chart",

            "query": "terms aggregation on issuer field",

            "description": "Shows certificate counts per CA (Let's Encrypt, DigiCert, etc.)",

        },

        "domain_table": {

            "name": "Domain Certificate Details",

            "type": "Data Table",

            "query": "top_hits, sorted by days_until_expiry ASC",

            "description": "Sortable table of domains with expiry date and status",

        },

    }



    ALERTS = {

        "cert_expiring_30d": {

            "name": "Certificate Expiring in 30 Days",

            "condition": "days_until_expiry <= 30",

            "action": "Email + Slack notification",

        },

        "cert_expiring_7d": {

            "name": "Certificate Critical — 7 Days",

            "condition": "days_until_expiry <= 7",

            "action": "PagerDuty + Slack #incidents",

        },

        "renewal_failed": {

            "name": "Certificate Renewal Failed",

            "condition": "cert_event == 'renewal_failed'",

            "action": "Slack + Email to security team",

        },

    }



    def show_visualizations(self):

        print("=== Kibana Visualizations ===\n")

        for key, viz in self.VISUALIZATIONS.items():

            print(f"[{viz['name']}] ({viz['type']})")

            print(f"  {viz['description']}")

            print()



    def show_alerts(self):

        print("=== Kibana Alerts ===")

        for key, alert in self.ALERTS.items():

            print(f"  [{alert['name']}]")

            print(f"    Condition: {alert['condition']}")

            print(f"    Action: {alert['action']}")



    def sample_dashboard(self):

        print(f"\n=== Certificate Dashboard ===")

        print(f"  Total Certificates: {random.randint(50, 200)}")

        print(f"  OK: {random.randint(40, 180)}")

        print(f"  Warning (< 30d): {random.randint(3, 15)}")

        print(f"  Critical (< 7d): {random.randint(0, 3)}")

        print(f"  Errors: {random.randint(0, 5)}")

        print(f"  Renewals (24h): {random.randint(0, 10)} success, {random.randint(0, 2)} failed")



dash = KibanaDashboard()

dash.show_visualizations()

dash.show_alerts()

dash.sample_dashboard()
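The "Certificate Status Distribution" visualization is described as a terms aggregation on `cert_status`. A minimal sketch of the equivalent Elasticsearch request body, plus a helper that flattens the response into counts, might look like this. The aggregation name `by_status` and both function names are hypothetical. The `.keyword` suffix assumes the index uses default dynamic mapping, where string fields get a `keyword` sub-field; with an explicit `keyword` mapping, use `cert_status` directly.

```python
def status_breakdown_request(field="cert_status.keyword"):
    """Build a search body that counts documents per certificate status."""
    return {
        "size": 0,  # only the aggregation is needed, not the matching documents
        "aggs": {
            "by_status": {"terms": {"field": field}},
        },
    }


def parse_status_buckets(response):
    """Turn the terms aggregation buckets into a {status: count} dict."""
    buckets = response["aggregations"]["by_status"]["buckets"]
    return {bucket["key"]: bucket["doc_count"] for bucket in buckets}


if __name__ == "__main__":
    # Hand-written response fragment in the shape Elasticsearch returns for a terms aggregation
    fake_response = {
        "aggregations": {
            "by_status": {
                "buckets": [
                    {"key": "ok", "doc_count": 120},
                    {"key": "warning", "doc_count": 8},
                    {"key": "critical", "doc_count": 1},
                ]
            }
        }
    }
    print(status_breakdown_request())
    print(parse_status_buckets(fake_response))
```

The request body can be pasted into Kibana Dev Tools under `GET certificates-*/_search` to preview the numbers behind the pie chart.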

Automation & Compliance

# compliance.py — Certificate compliance automation

import json



class CertCompliance:

    COMPLIANCE_CHECKS = {

        "expiry": {

            "name": "Certificate Expiry Check",

            "rule": "Every certificate must have more than 30 days of validity left",

            "frequency": "Daily",

        },

        "key_strength": {

            "name": "Key Strength Validation",

            "rule": "RSA >= 2048 bits, ECDSA >= 256 bits",

            "frequency": "Weekly",

        },

        "protocol_version": {

            "name": "TLS Version Check",

            "rule": "TLS 1.2+ only; disable TLS 1.0/1.1",

            "frequency": "Weekly",

        },

        "cert_chain": {

            "name": "Certificate Chain Validation",

            "rule": "The chain must be complete: root CA → intermediate → leaf",

            "frequency": "Daily",

        },

        "san_coverage": {

            "name": "SAN Coverage Check",

            "rule": "Every subdomain in use must be listed in the SAN",

            "frequency": "Whenever a new subdomain is added",

        },

    }



    AUTOMATION_SCRIPT = """

# cert_compliance.py — Automated compliance report

from elasticsearch import Elasticsearch

from datetime import datetime

import json



class CertComplianceReport:

    # The elasticsearch-py 8.x client requires a full URL, including the scheme
    def __init__(self, es_host="http://localhost:9200"):

        self.es = Elasticsearch([es_host])

    

    def generate_report(self):

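        # Query certificate check documents (up to 1000). Each monitoring run indexes
        # a new document, so a domain can appear more than once in the results.

        result = self.es.search(

            index="certificates-*",

            size=1000,

            query={"match_all": {}},

            sort=[{"days_until_expiry": "asc"}],

        )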

        

        certs = [hit['_source'] for hit in result['hits']['hits']]

        

        report = {

            "generated_at": datetime.utcnow().isoformat(),

            "total_certificates": len(certs),

            "status_summary": {

                "ok": sum(1 for c in certs if c.get('cert_status') == 'ok'),

                "warning": sum(1 for c in certs if c.get('cert_status') == 'warning'),

                "critical": sum(1 for c in certs if c.get('cert_status') == 'critical'),

                "error": sum(1 for c in certs if c.get('cert_status') == 'error'),

            },

            "expiring_soon": [c for c in certs if c.get('days_until_expiry', 999) <= 30],

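            # Fails only when a certificate is in the critical window (7 days or fewer);
            # raise the threshold to 30 to enforce the 30-day rule in COMPLIANCE_CHECKS.
            "compliance_pass": all(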

                c.get('days_until_expiry', 0) > 7 for c in certs

                if c.get('cert_status') != 'error'

            ),

        }

        return report



# reporter = CertComplianceReport()

# report = reporter.generate_report()

# print(json.dumps(report, indent=2))

"""



    def show_checks(self):

        print("=== Compliance Checks ===\n")

        for key, check in self.COMPLIANCE_CHECKS.items():

            print(f"[{check['name']}]")

            print(f"  Rule: {check['rule']}")

            print(f"  Frequency: {check['frequency']}")

            print()



    def show_script(self):

        print("=== Compliance Script ===")

        print(self.AUTOMATION_SCRIPT[:500])



compliance = CertCompliance()

compliance.show_checks()

compliance.show_script()
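Because the monitor indexes a new document on every check, a report built from raw search hits counts check runs rather than certificates. One way to address this, sketched below under stated assumptions, is to keep only the most recent document per domain before evaluating the expiry rule. The function names `latest_per_domain` and `is_compliant` are hypothetical. The sketch assumes every document carries `domain` and an ISO 8601 `checked_at` string in a single timezone, so that string comparison matches chronological order.

```python
def latest_per_domain(docs):
    """Keep the most recently checked document for each domain."""
    latest = {}
    for doc in docs:
        domain = doc.get("domain")
        if domain is None:
            continue  # skip documents that did not come from the monitor
        current = latest.get(domain)
        # ISO 8601 timestamps in the same timezone sort lexicographically
        if current is None or doc.get("checked_at", "") > current.get("checked_at", ""):
            latest[domain] = doc
    return latest


def is_compliant(docs, min_days=30):
    """True when every domain's latest check has more than min_days remaining.

    Documents with cert_status == 'error' are skipped, as in the report script.
    """
    return all(
        doc.get("days_until_expiry", 0) > min_days
        for doc in latest_per_domain(docs).values()
        if doc.get("cert_status") != "error"
    )


if __name__ == "__main__":
    history = [
        {"domain": "example.com", "checked_at": "2025-01-01T00:00:00",
         "days_until_expiry": 5, "cert_status": "critical"},
        {"domain": "example.com", "checked_at": "2025-01-02T00:00:00",
         "days_until_expiry": 89, "cert_status": "ok"},  # after renewal
    ]
    print(len(latest_per_domain(history)), is_compliant(history))
```

With this step, a certificate that was critical yesterday but renewed today no longer fails the report. For large indices, the same deduplication could be pushed into Elasticsearch instead of done client-side.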

FAQ - Frequently Asked Questions

Q: Is the ELK Stack or Grafana/Loki better for certificate logs?

A: ELK offers stronger full-text search, supports complex queries, and has a mature ecosystem with polished Kibana dashboards. Grafana/Loki is lighter and cheaper to run because it does not index every field, and it integrates well with Prometheus. Choose ELK if you need full-text search, complex analytics, and compliance reporting. Choose Loki if you already run Grafana and want cost-efficient log storage.

Q: How long should certificate logs be retained?

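A: It depends on your compliance requirements. As rough guides: PCI DSS calls for 1 year, SOC 2 for 1 year or more, and HIPAA for 6 years; 1-2 years is common otherwise. In ELK, use ILM (Index Lifecycle Management) to move indices through hot → warm → cold → delete automatically. To save storage, compress old indices and move them to cold storage such as S3.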
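The hot → warm → cold → delete progression can be expressed as an ILM policy document. Below is a minimal sketch that builds such a policy body as a Python dict, suitable for sending to the `PUT _ilm/policy/<name>` REST endpoint. The function name `build_ilm_policy`, the 30-day and 90-day phase boundaries, and the priority values are illustrative assumptions, not recommendations from the text above.

```python
import json


def build_ilm_policy(retention_days, warm_after_days=30, cold_after_days=90):
    """Build an ILM policy body: hot -> warm -> cold -> delete.

    Phase ages are measured from index creation, because this sketch does not
    use rollover (the pipeline writes to monthly certificates-YYYY.MM indices).
    """
    if not 0 < warm_after_days < cold_after_days < retention_days:
        raise ValueError("expected 0 < warm_after_days < cold_after_days < retention_days")
    return {
        "policy": {
            "phases": {
                "hot": {
                    "actions": {"set_priority": {"priority": 100}},
                },
                "warm": {
                    "min_age": f"{warm_after_days}d",
                    "actions": {
                        # Merge to one segment so old indices take less space
                        "forcemerge": {"max_num_segments": 1},
                        "set_priority": {"priority": 50},
                    },
                },
                "cold": {
                    "min_age": f"{cold_after_days}d",
                    "actions": {"set_priority": {"priority": 0}},
                },
                "delete": {
                    "min_age": f"{retention_days}d",
                    "actions": {"delete": {}},
                },
            }
        }
    }


if __name__ == "__main__":
    # One year of retention, matching the PCI DSS guide figure above
    print(json.dumps(build_ilm_policy(365), indent=2))
```

The policy still has to be attached to the `certificates-*` indices, typically through an index template that sets `index.lifecycle.name`.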

Q: What is the difference between Filebeat and Logstash?

A: Filebeat is a lightweight log shipper that reads files and forwards them, using little CPU and RAM. Logstash is a full data pipeline that parses, transforms, and enriches logs, and it needs more CPU and RAM. They are often combined: Filebeat (on each server) → Logstash (centralized) → Elasticsearch. If you do not need complex parsing, Filebeat can also ship directly to Elasticsearch.

Q: How many domains can ELK monitor certificates for?

A: There is no fixed limit; capacity depends on the size of the Elasticsearch cluster. As a rough guide, a single node is enough for 100-500 domains, a 3-node cluster is recommended for 500-5,000 domains, and beyond 5,000 domains you should use a multi-node cluster with dedicated master nodes. With hourly checks, 500 domains × 24 checks = 12,000 documents/day, which is a very small load for Elasticsearch.
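The sizing arithmetic above generalizes to other domain counts and check intervals. The sketch below reproduces the 12,000 documents/day figure and adds a rough storage estimate. The function names are hypothetical, and the 1 KiB average document size is an assumption for illustration only; measure real documents in your own index before planning capacity.

```python
def docs_per_day(domains, check_interval_hours=1.0):
    """Documents indexed per day: one document per domain per check."""
    checks_per_day = 24 / check_interval_hours
    return int(domains * checks_per_day)


def estimate_storage_mib(domains, retention_days, check_interval_hours=1.0,
                         avg_doc_bytes=1024):
    """Rough primary-shard storage in MiB, before replicas and compression.

    avg_doc_bytes is an assumed figure, not a measured one.
    """
    total_docs = docs_per_day(domains, check_interval_hours) * retention_days
    return total_docs * avg_doc_bytes / (1024 * 1024)


if __name__ == "__main__":
    print(docs_per_day(500))                          # 500 domains, hourly checks
    print(round(estimate_storage_mib(500, 365), 1))   # one year of retention
```

Under these assumptions a year of hourly checks for 500 domains stays in the low gigabytes, consistent with the point that a single node is enough at this scale.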

XM Legend · Forex trader and instructor for 13 years

Founder of SiamCafe since 1997 · Forex trader for more than 13 years, recognized as an XM Legend · Shares knowledge on Forex, IT, AI, and trading from hands-on market experience