Certificate Manager Log Management ELK คืออะไร
Certificate Manager คือระบบจัดการ SSL/TLS certificates ตลอด lifecycle ตั้งแต่ออก, ติดตั้ง, renew จนถึง revoke ส่วน ELK Stack (Elasticsearch, Logstash, Kibana) เป็น open-source log management platform ที่ใช้รวบรวม วิเคราะห์ และแสดงผล logs จากทุกระบบ การรวม Certificate Manager กับ ELK ช่วยให้ monitor certificate events ได้แบบ real-time, ตรวจจับ certificates ที่ใกล้หมดอายุ, วิเคราะห์ security incidents จาก certificate errors และสร้าง compliance reports อัตโนมัติ
ELK Stack Architecture
# elk_arch.py — ELK Stack architecture for cert management
import json
class ELKArchitecture:
    """Static catalogue of the ELK Stack pieces used for certificate logging.

    ``COMPONENTS`` maps a component key to its display metadata and
    ``DOCKER_COMPOSE`` holds a single-node compose file for the stack.
    The ``show_*`` methods only print to stdout; they return nothing.
    """

    # Component key -> display metadata. "port" is present only for the
    # entries that declare one here (Filebeat has none).
    COMPONENTS = {
        "elasticsearch": {
            "name": "Elasticsearch",
            "role": "Search & Analytics Engine — เก็บและค้นหา logs",
            "description": "Distributed search engine บน Apache Lucene — index, search, aggregate logs",
            "port": 9200,
        },
        "logstash": {
            "name": "Logstash",
            "role": "Data Pipeline — รับ, แปลง, ส่ง logs",
            "description": "Input → Filter → Output pipeline สำหรับ process logs ก่อนส่ง Elasticsearch",
            "port": 5044,
        },
        "kibana": {
            "name": "Kibana",
            "role": "Visualization — dashboard และ analytics",
            "description": "Web UI สำหรับ visualize data จาก Elasticsearch — charts, maps, alerts",
            "port": 5601,
        },
        "filebeat": {
            "name": "Filebeat",
            "role": "Log Shipper — ส่ง log files ไป Logstash/Elasticsearch",
            "description": "Lightweight agent ติดตั้งบน servers — อ่าน log files แล้วส่งต่อ",
        },
    }

    # The YAML is kept flush-left inside the literal because its nesting is
    # significant: without the indentation the compose file does not parse.
    # NOTE(review): xpack security is switched off below — looks intended for
    # local/dev use only; confirm before reusing in production.
    DOCKER_COMPOSE = """
# docker-compose.yml — ELK Stack
version: '3.8'
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.12.0
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
      - "ES_JAVA_OPTS=-Xms1g -Xmx1g"
    ports:
      - "9200:9200"
    volumes:
      - esdata:/usr/share/elasticsearch/data
  logstash:
    image: docker.elastic.co/logstash/logstash:8.12.0
    ports:
      - "5044:5044"
    volumes:
      - ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf
    depends_on:
      - elasticsearch
  kibana:
    image: docker.elastic.co/kibana/kibana:8.12.0
    ports:
      - "5601:5601"
    depends_on:
      - elasticsearch
volumes:
  esdata:
"""

    def show_components(self):
        """Print the name, role and description of every component."""
        print("=== ELK Stack Components ===\n")
        # Only the metadata is displayed; the dictionary keys are not needed.
        for comp in self.COMPONENTS.values():
            print(f"[{comp['name']}] — {comp['role']}")
            print(f" {comp['description']}")
            print()

    def show_docker(self, limit=500):
        """Print a preview of the docker-compose file.

        Args:
            limit: Maximum number of characters to print. Defaults to 500
                (the original fixed preview size); pass ``None`` to print
                the whole file.
        """
        print("=== Docker Compose ===")
        print(self.DOCKER_COMPOSE[:limit])
# Demo driver: print the component catalogue, then the compose preview.
elk = ELKArchitecture()
for _show in (elk.show_components, elk.show_docker):
    _show()
Certificate Log Pipeline
# cert_pipeline.py — Logstash pipeline for certificate logs
import json
class CertLogPipeline:
    """Holds the Logstash pipeline definition used for certificate logs.

    ``LOGSTASH_CONFIG`` is the text of a ``logstash.conf`` with three inputs
    (beats, http, file), grok/ruby/mutate filters, and Elasticsearch plus
    Slack-webhook outputs. ``show_pipeline`` prints a preview of it.
    """

    # Kept flush-left inside the literal so the nesting of the config is
    # readable when printed or copied out.
    LOGSTASH_CONFIG = """
# logstash.conf — Certificate log processing pipeline
input {
  # Filebeat input for cert logs
  beats {
    port => 5044
    tags => ["certificate"]
  }
  # HTTP input for cert manager webhooks
  http {
    port => 8080
    codec => json
    tags => ["cert-webhook"]
  }
  # File input for certbot logs
  file {
    path => "/var/log/letsencrypt/letsencrypt.log"
    start_position => "beginning"
    tags => ["certbot"]
  }
}
filter {
  # Parse certificate events
  if "certificate" in [tags] {
    grok {
      match => {
        "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:event_message}"
      }
    }
    # Extract certificate details
    if "certificate" in [event_message] {
      grok {
        match => {
          "event_message" => "domain=%{HOSTNAME:domain} serial=%{DATA:serial} expires=%{TIMESTAMP_ISO8601:expiry_date}"
        }
        tag_on_failure => ["_cert_parse_failed"]
      }
    }
    # Calculate days until expiry
    ruby {
      # Time.parse is provided by Ruby's "time" library; load it explicitly
      init => 'require "time"'
      code => '
        if event.get("expiry_date")
          expiry = Time.parse(event.get("expiry_date"))
          days_left = ((expiry - Time.now) / 86400).to_i
          event.set("days_until_expiry", days_left)
          if days_left <= 7
            event.set("cert_status", "critical")
          elsif days_left <= 30
            event.set("cert_status", "warning")
          else
            event.set("cert_status", "ok")
          end
        end
      '
    }
  }
  # Parse certbot logs
  if "certbot" in [tags] {
    grok {
      match => {
        "message" => "%{TIMESTAMP_ISO8601:timestamp} - certbot - %{LOGLEVEL:level} - %{GREEDYDATA:certbot_message}"
      }
    }
    if "renewed" in [certbot_message] {
      mutate { add_field => { "cert_event" => "renewal_success" } }
    }
    if "failed" in [certbot_message] {
      mutate { add_field => { "cert_event" => "renewal_failed" } }
    }
  }
  # Add metadata
  mutate {
    add_field => { "[@metadata][index]" => "certificates-%{+YYYY.MM}" }
  }
}
output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "%{[@metadata][index]}"
  }
  # Alert on critical certificates
  if [cert_status] == "critical" {
    http {
      url => "https://hooks.slack.com/services/xxx"
      http_method => "post"
      format => "json"
      mapping => {
        "text" => "CERT CRITICAL: %{domain} expires in %{days_until_expiry} days!"
      }
    }
  }
}
"""

    def show_pipeline(self, limit=600):
        """Print a preview of the Logstash configuration.

        Args:
            limit: Maximum number of characters to print. Defaults to 600
                (the original fixed preview size); pass ``None`` to print
                the whole configuration.
        """
        print("=== Logstash Pipeline ===")
        print(self.LOGSTASH_CONFIG[:limit])
# Demo driver: print the Logstash pipeline preview.
pipeline = CertLogPipeline()
CertLogPipeline.show_pipeline(pipeline)
Python Certificate Monitor
# cert_monitor.py — Python certificate monitoring with ELK
import json
class CertMonitor:
    """Holds the source of a certificate monitor script that feeds ELK.

    ``CODE`` is the text of ``cert_elk_monitor.py``; it is only displayed by
    this class, never executed. ``show_code`` prints a preview of it.
    """

    # Kept flush-left inside the literal: the embedded script is Python, so
    # its indentation must survive for the text to be runnable when copied.
    # The default Elasticsearch host carries an explicit "http://" scheme
    # because the 8.x Python client rejects scheme-less host strings.
    CODE = """
# cert_elk_monitor.py — Monitor certificates and send to ELK
import ssl
import socket
import json
import requests
from datetime import datetime
from elasticsearch import Elasticsearch
class CertificateELKMonitor:
    def __init__(self, es_host="http://localhost:9200"):
        self.es = Elasticsearch([es_host])
        self.index = f"certificates-{datetime.utcnow().strftime('%Y.%m')}"
    def check_certificate(self, domain, port=443):
        '''Check SSL certificate and return details'''
        try:
            context = ssl.create_default_context()
            with socket.create_connection((domain, port), timeout=10) as sock:
                with context.wrap_socket(sock, server_hostname=domain) as ssock:
                    cert = ssock.getpeercert()
                    expire_str = cert['notAfter']
                    expire_date = datetime.strptime(expire_str, '%b %d %H:%M:%S %Y %Z')
                    days_left = (expire_date - datetime.utcnow()).days
                    issuer = dict(x[0] for x in cert['issuer'])
                    subject = dict(x[0] for x in cert['subject'])
                    san = [entry[1] for entry in cert.get('subjectAltName', [])]
                    status = 'ok' if days_left > 30 else 'warning' if days_left > 7 else 'critical'
                    return {
                        'domain': domain,
                        'issuer': issuer.get('organizationName', ''),
                        'subject_cn': subject.get('commonName', ''),
                        'san': san,
                        'not_after': expire_date.isoformat(),
                        'days_until_expiry': days_left,
                        'cert_status': status,
                        'serial_number': cert.get('serialNumber', ''),
                        'version': cert.get('version', ''),
                        'protocol': ssock.version(),
                        'cipher': ssock.cipher()[0],
                        'checked_at': datetime.utcnow().isoformat(),
                    }
        except Exception as e:
            return {
                'domain': domain,
                'cert_status': 'error',
                'error': str(e),
                'checked_at': datetime.utcnow().isoformat(),
            }
    def send_to_elk(self, cert_data):
        '''Send certificate data to Elasticsearch'''
        self.es.index(index=self.index, document=cert_data)
    def monitor_domains(self, domains):
        '''Monitor multiple domains'''
        results = []
        for domain in domains:
            data = self.check_certificate(domain)
            self.send_to_elk(data)
            results.append(data)
        return results
    def get_expiring_certs(self, days=30):
        '''Query ELK for expiring certificates'''
        query = {
            "query": {
                "range": {
                    "days_until_expiry": {"lte": days}
                }
            },
            "sort": [{"days_until_expiry": "asc"}]
        }
        result = self.es.search(index="certificates-*", body=query)
        return [hit['_source'] for hit in result['hits']['hits']]
# monitor = CertificateELKMonitor("http://localhost:9200")
# domains = ["example.com", "api.example.com", "cdn.example.com"]
# results = monitor.monitor_domains(domains)
"""

    def show_code(self, limit=600):
        """Print a preview of the embedded monitor script.

        Args:
            limit: Maximum number of characters to print. Defaults to 600
                (the original fixed preview size); pass ``None`` to print
                the whole script.
        """
        print("=== Certificate ELK Monitor ===")
        print(self.CODE[:limit])
# Demo driver: print the certificate monitor script preview.
monitor = CertMonitor()
CertMonitor.show_code(monitor)
Kibana Dashboard
# kibana_dashboard.py — Kibana dashboard for certificate monitoring
import json
import random
class KibanaDashboard:
    """Describes the Kibana visualizations and alerts for certificate data.

    ``VISUALIZATIONS`` and ``ALERTS`` are static descriptions; the ``show_*``
    methods print them, and ``sample_dashboard`` prints a randomly generated
    example of the summary numbers.
    """

    # Visualization key -> display metadata (name, chart type, query sketch,
    # description).
    VISUALIZATIONS = {
        "cert_status_pie": {
            "name": "Certificate Status Distribution",
            "type": "Pie Chart",
            "query": "agg: terms on cert_status field",
            "description": "แสดงสัดส่วน OK / Warning / Critical / Error",
        },
        "expiry_timeline": {
            "name": "Certificate Expiry Timeline",
            "type": "Timeline / Bar Chart",
            "query": "date_histogram on not_after field",
            "description": "แสดงว่า certificates จะหมดอายุเมื่อไหร่ — วางแผน renewal",
        },
        "renewal_events": {
            "name": "Renewal Events Over Time",
            "type": "Line Chart",
            "query": "date_histogram on checked_at, split by cert_event",
            "description": "แสดง renewal success/failure trends",
        },
        "issuer_breakdown": {
            "name": "Certificates by Issuer",
            "type": "Bar Chart",
            "query": "terms aggregation on issuer field",
            "description": "แสดงจำนวน certs แยกตาม CA (Let's Encrypt, DigiCert, etc.)",
        },
        "domain_table": {
            "name": "Domain Certificate Details",
            "type": "Data Table",
            "query": "top_hits, sorted by days_until_expiry ASC",
            "description": "ตาราง domains + expiry date + status — sortable",
        },
    }

    # Alert key -> display metadata (name, trigger condition, action).
    ALERTS = {
        "cert_expiring_30d": {
            "name": "Certificate Expiring in 30 Days",
            "condition": "days_until_expiry <= 30",
            "action": "Email + Slack notification",
        },
        "cert_expiring_7d": {
            "name": "Certificate Critical — 7 Days",
            "condition": "days_until_expiry <= 7",
            "action": "PagerDuty + Slack #incidents",
        },
        "renewal_failed": {
            "name": "Certificate Renewal Failed",
            "condition": "cert_event == 'renewal_failed'",
            "action": "Slack + Email to security team",
        },
    }

    def show_visualizations(self):
        """Print the name, chart type and description of each visualization."""
        print("=== Kibana Visualizations ===\n")
        # Only the metadata is displayed; the dictionary keys are not needed.
        for viz in self.VISUALIZATIONS.values():
            print(f"[{viz['name']}] ({viz['type']})")
            print(f" {viz['description']}")
            print()

    def show_alerts(self):
        """Print the name, condition and action of each alert."""
        print("=== Kibana Alerts ===")
        for alert in self.ALERTS.values():
            print(f" [{alert['name']}]")
            print(f" Condition: {alert['condition']}")
            print(f" Action: {alert['action']}")

    def sample_dashboard(self, rng=None):
        """Print a randomly generated example of the dashboard summary.

        The total is derived from the per-status counts so the printed
        numbers always add up.

        Args:
            rng: Optional object exposing ``randint(a, b)`` (for example a
                seeded ``random.Random``) for reproducible output. Defaults
                to the module-level ``random`` generator.
        """
        if rng is None:
            rng = random
        ok = rng.randint(40, 180)
        warning = rng.randint(3, 15)
        critical = rng.randint(0, 3)
        errors = rng.randint(0, 5)
        # Summing the parts avoids contradictory output such as
        # "Total 50 / OK 180", which independent draws could produce.
        total = ok + warning + critical + errors
        renewed = rng.randint(0, 10)
        failed = rng.randint(0, 2)
        print("\n=== Certificate Dashboard ===")
        print(f" Total Certificates: {total}")
        print(f" OK: {ok}")
        # Labels use "<=" to match the thresholds declared in ALERTS.
        print(f" Warning (<= 30d): {warning}")
        print(f" Critical (<= 7d): {critical}")
        print(f" Errors: {errors}")
        print(f" Renewals (24h): {renewed} success, {failed} failed")
# Demo driver: print visualizations, alerts, then a sample summary.
dash = KibanaDashboard()
for _show in (dash.show_visualizations, dash.show_alerts, dash.sample_dashboard):
    _show()
Automation & Compliance
# compliance.py — Certificate compliance automation
import json
class CertCompliance:
    """Describes certificate compliance checks and a report script.

    ``COMPLIANCE_CHECKS`` is a static list of rules; ``AUTOMATION_SCRIPT`` is
    the text of ``cert_compliance.py``, which is only displayed by this
    class, never executed. The ``show_*`` methods print them to stdout.
    """

    # Check key -> display metadata (name, rule, how often it runs).
    COMPLIANCE_CHECKS = {
        "expiry": {
            "name": "Certificate Expiry Check",
            "rule": "ทุก certificate ต้องมีอายุเหลือ > 30 วัน",
            "frequency": "ทุกวัน",
        },
        "key_strength": {
            "name": "Key Strength Validation",
            "rule": "RSA >= 2048 bits, ECDSA >= 256 bits",
            "frequency": "ทุกสัปดาห์",
        },
        "protocol_version": {
            "name": "TLS Version Check",
            "rule": "TLS 1.2+ only, ปิด TLS 1.0/1.1",
            "frequency": "ทุกสัปดาห์",
        },
        "cert_chain": {
            "name": "Certificate Chain Validation",
            "rule": "Chain ต้องครบ — root CA → intermediate → leaf",
            "frequency": "ทุกวัน",
        },
        "san_coverage": {
            "name": "SAN Coverage Check",
            "rule": "ทุก subdomain ที่ใช้ต้องอยู่ใน SAN",
            "frequency": "เมื่อเพิ่ม subdomain ใหม่",
        },
    }

    # Kept flush-left inside the literal: the embedded script is Python, so
    # its indentation must survive for the text to be runnable when copied.
    # Two fixes live in the script text itself:
    #   * the default host carries "http://" because the 8.x Python client
    #     rejects scheme-less host strings;
    #   * compliance_pass uses min_days (default 30) so it agrees with the
    #     "> 30 days" expiry rule declared in COMPLIANCE_CHECKS above.
    AUTOMATION_SCRIPT = """
# cert_compliance.py — Automated compliance report
from elasticsearch import Elasticsearch
from datetime import datetime
import json
class CertComplianceReport:
    def __init__(self, es_host="http://localhost:9200"):
        self.es = Elasticsearch([es_host])
    def generate_report(self, min_days=30):
        # Query all certificates
        result = self.es.search(
            index="certificates-*",
            body={
                "size": 1000,
                "query": {"match_all": {}},
                "sort": [{"days_until_expiry": "asc"}],
            }
        )
        certs = [hit['_source'] for hit in result['hits']['hits']]
        report = {
            "generated_at": datetime.utcnow().isoformat(),
            "total_certificates": len(certs),
            "status_summary": {
                "ok": sum(1 for c in certs if c.get('cert_status') == 'ok'),
                "warning": sum(1 for c in certs if c.get('cert_status') == 'warning'),
                "critical": sum(1 for c in certs if c.get('cert_status') == 'critical'),
                "error": sum(1 for c in certs if c.get('cert_status') == 'error'),
            },
            "expiring_soon": [c for c in certs if c.get('days_until_expiry', 999) <= min_days],
            # Expiry rule: every checked certificate needs more than min_days left
            "compliance_pass": all(
                c.get('days_until_expiry', 0) > min_days for c in certs
                if c.get('cert_status') != 'error'
            ),
        }
        return report
# reporter = CertComplianceReport()
# report = reporter.generate_report()
# print(json.dumps(report, indent=2))
"""

    def show_checks(self):
        """Print the name, rule and frequency of every compliance check."""
        print("=== Compliance Checks ===\n")
        # Only the metadata is displayed; the dictionary keys are not needed.
        for check in self.COMPLIANCE_CHECKS.values():
            print(f"[{check['name']}]")
            print(f" Rule: {check['rule']}")
            print(f" Frequency: {check['frequency']}")
            print()

    def show_script(self, limit=500):
        """Print a preview of the embedded compliance report script.

        Args:
            limit: Maximum number of characters to print. Defaults to 500
                (the original fixed preview size); pass ``None`` to print
                the whole script.
        """
        print("=== Compliance Script ===")
        print(self.AUTOMATION_SCRIPT[:limit])
# Demo driver: print the compliance rules, then the report script preview.
compliance = CertCompliance()
for _show in (compliance.show_checks, compliance.show_script):
    _show()
FAQ - คำถามที่พบบ่อย
Q: ELK Stack กับ Grafana/Loki อันไหนดีกว่าสำหรับ cert logs?
A: ELK: full-text search ดีกว่า, รองรับ complex queries, mature ecosystem, Kibana dashboards สวย • Grafana/Loki: lightweight กว่า, ถูกกว่า (ไม่ต้อง index ทุก field), integrate กับ Prometheus ดี • เลือก ELK: ถ้าต้องการ full-text search + complex analytics + compliance reporting • เลือก Loki: ถ้ามี Grafana อยู่แล้ว + ต้องการ cost-efficient log storage
Q: Certificate logs เก็บนานแค่ไหน?
A: ขึ้นกับ compliance — PCI DSS: 1 ปี, SOC 2: 1 ปี+, HIPAA: 6 ปี, ทั่วไป: 1-2 ปี • ELK: ใช้ ILM (Index Lifecycle Management) — hot → warm → cold → delete อัตโนมัติ • ประหยัด storage: compress old indices, move to cold storage (S3)
Q: Filebeat กับ Logstash ต่างกันอย่างไร?
A: Filebeat: lightweight log shipper — อ่าน files แล้วส่งต่อ (CPU/RAM น้อย) • Logstash: full data pipeline — parse, transform, enrich logs (CPU/RAM มากกว่า) • ใช้ร่วมกัน: Filebeat (บน servers) → Logstash (centralized) → Elasticsearch • หรือ: Filebeat → Elasticsearch โดยตรง (ถ้าไม่ต้อง complex parsing)
Q: Monitor certificates กี่ domains ได้ใน ELK?
A: ไม่จำกัด — ขึ้นกับ Elasticsearch cluster size • 100-500 domains: single node เพียงพอ • 500-5000 domains: แนะนำ 3-node cluster • 5000+ domains: multi-node cluster + dedicated master nodes • ตัวอย่าง: check ทุก 1 ชั่วโมง, 500 domains × 24 checks = 12,000 documents/day — เล็กมากสำหรับ Elasticsearch
