Vector DB + ELK
Vector Database Pinecone Log Management ELK Elasticsearch Logstash Kibana Embedding Semantic Search ANN Hybrid Search RAG Anomaly Detection Production
| Feature | Pinecone | Elasticsearch | Weaviate | Qdrant |
|---|---|---|---|---|
| Type | Managed Vector DB | Search Engine | Open Source Vector | Open Source Vector |
| Search | ANN Vector | BM25 + Vector | ANN + BM25 | ANN Vector |
| Scale | Serverless auto | Manual cluster | K8s cluster | K8s/Docker |
| Pricing | Pay per query | Per node | Free OSS | Free OSS |
| Metadata | JSON filter | Full JSON | JSON filter | JSON filter |
| เหมาะกับ | Managed RAG | Full-text + vector | Self-hosted RAG | Self-hosted perf |
Pinecone Setup
# === Pinecone Vector Database for Log Search ===
# pip install pinecone sentence-transformers   # NOTE: the old "pinecone-client" package is deprecated — the SDK now ships as "pinecone" (same import path)
# from pinecone import Pinecone, ServerlessSpec
# from sentence_transformers import SentenceTransformer
# import datetime
#
# # Initialize
# pc = Pinecone(api_key="YOUR_API_KEY")
# model = SentenceTransformer("all-MiniLM-L6-v2") # 384 dimensions
#
# # Create Index
# pc.create_index(
# name="log-vectors",
# dimension=384,
# metric="cosine",
# spec=ServerlessSpec(cloud="aws", region="us-east-1")
# )
# index = pc.Index("log-vectors")
#
# # Ingest Logs as Vectors
# def ingest_log(log_id, message, source, level, timestamp):
# embedding = model.encode(message).tolist()
# index.upsert(vectors=[{
# "id": log_id,
# "values": embedding,
# "metadata": {
# "message": message,
# "source": source,
# "level": level,
# "timestamp": timestamp,
# }
# }])
#
# # Semantic Search
# def search_logs(query, top_k=10, level_filter=None):
# query_vec = model.encode(query).tolist()
# filter_dict = {}
# if level_filter:
# filter_dict["level"] = {"$eq": level_filter}
# results = index.query(
# vector=query_vec,
# top_k=top_k,
# include_metadata=True,
# filter=filter_dict if filter_dict else None
# )
# return results.matches
from dataclasses import dataclass


@dataclass
class LogVector:
    """A single semantic-search hit: a log entry plus its similarity score."""

    # Unique identifier of the log entry in the vector index.
    log_id: str
    # Raw log message text.
    message: str
    # Host / service that emitted the log.
    source: str
    # Severity label (e.g. ERROR, WARN).
    level: str
    # Similarity score returned by the vector search (higher = closer).
    score: float


# Illustrative hits for the semantic query "memory leak". None of the
# messages contain that literal phrase — which is exactly the point of
# embedding-based search.
_hits = [
    ("log-001", "Out of memory: Kill process nginx", "web-01", "ERROR", 0.95),
    ("log-042", "Memory usage exceeded 90% threshold", "web-02", "WARN", 0.88),
    ("log-103", "OOM killer invoked for process java", "app-01", "ERROR", 0.85),
    ("log-205", "High memory pressure detected on node", "k8s-03", "WARN", 0.82),
    ("log-310", "Swapping heavily due to memory exhaustion", "db-01", "WARN", 0.78),
]
sample_results = [LogVector(*row) for row in _hits]

print("=== Semantic Search: 'memory leak' ===")
for hit in sample_results:
    print(f" [{hit.score:.2f}] [{hit.level}] {hit.source}")
    print(f" {hit.message}")
ELK Pipeline
# === ELK Stack Log Pipeline ===
# Filebeat → Logstash → Elasticsearch → Kibana
# + Logstash → Python Script → Pinecone (vector enrichment)
# filebeat.yml:
# filebeat.inputs:
# - type: log
# paths: ["/var/log/app/*.log"]
# fields:
# source: "app-server"
# output.logstash:
# hosts: ["logstash:5044"]
# logstash.conf:
# input {
# beats { port => 5044 }
# }
# filter {
# grok {
# match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:log_message}" }
# }
# date { match => ["timestamp", "ISO8601"] }
# }
# output {
# elasticsearch {
# hosts => ["elasticsearch:9200"]
# index => "logs-%{+YYYY.MM.dd}"
# }
# http {
# url => "http://vector-ingest:8080/ingest"
# http_method => "post"
# format => "json"
# }
# }
# Kibana Dashboard queries
# KQL: level:ERROR AND source:web-*
# Lucene: message:"out of memory" AND level:ERROR
# Aggregation: Terms on source, Date Histogram on @timestamp
@dataclass
class PipelineComponent:
    """One stage of the log pipeline, with its role and I/O endpoints."""

    # Component name (Filebeat, Logstash, ...).
    component: str
    # What the stage is responsible for.
    role: str
    # Where this stage reads its data from.
    input_source: str
    # Where this stage writes its data to.
    output: str
    # Rough sustained-throughput figure for capacity planning.
    throughput: str


# End-to-end flow: Filebeat -> Logstash -> (Elasticsearch + vector ingest -> Pinecone).
_stages = [
    ("Filebeat", "Agent collect logs", "Log files /var/log/", "Logstash:5044", "10K events/s"),
    ("Logstash", "Parse transform", "Filebeat", "ES + Vector API", "5K events/s"),
    ("Elasticsearch", "Store + keyword search", "Logstash", "Kibana queries", "50K docs/s index"),
    ("Kibana", "Dashboard visualization", "Elasticsearch", "User browser", "N/A"),
    ("Vector Ingest", "Embed + upsert", "Logstash HTTP", "Pinecone", "500 vectors/s"),
    ("Pinecone", "Semantic search", "Vector Ingest", "Search API", "1000 queries/s"),
]
pipeline = [PipelineComponent(*row) for row in _stages]

print("\n=== Log Pipeline ===")
for stage in pipeline:
    print(f" [{stage.component}] {stage.role}")
    print(f" In: {stage.input_source} → Out: {stage.output}")
    print(f" Throughput: {stage.throughput}")
Hybrid Search and Ops
# === Hybrid Search: ELK + Pinecone ===
# def hybrid_search(query, keyword_query=None, top_k=20):
# # Semantic search from Pinecone
# vector_results = search_logs(query, top_k=top_k)
#
# # Keyword search from Elasticsearch
# es_query = keyword_query or query
# es_results = es.search(index="logs-*", body={
# "query": {"match": {"message": es_query}},
# "size": top_k
# })
#
# # Reciprocal Rank Fusion
# scores = {}
# k = 60
# for rank, hit in enumerate(vector_results):
# scores[hit.id] = scores.get(hit.id, 0) + 1 / (k + rank + 1)
# for rank, hit in enumerate(es_results["hits"]["hits"]):
# doc_id = hit["_id"]
# scores[doc_id] = scores.get(doc_id, 0) + 1 / (k + rank + 1)
#
# sorted_results = sorted(scores.items(), key=lambda x: x[1], reverse=True)
# return sorted_results[:top_k]
@dataclass
class OpsMetric:
    """Side-by-side operational metric for ELK, Pinecone, and the hybrid setup."""

    # Metric name (e.g. latency, storage cost).
    metric: str
    # Value observed on the ELK side.
    elk_value: str
    # Value observed on the Pinecone side.
    pinecone_value: str
    # Value for the combined (hybrid) deployment.
    combined: str


# Illustrative production figures comparing the two backends and the hybrid result.
metrics = [
    OpsMetric("Daily Log Volume", "50GB / 100M events", "2M vectors", "Full coverage"),
    OpsMetric("Search Latency (p95)", "120ms keyword", "45ms semantic", "150ms hybrid"),
    OpsMetric("Storage Cost/month", "$500 (3-node ES)", "$200 (serverless)", "$700 total"),
    OpsMetric("Retention", "30 days hot, 90 cold", "90 days", "90 days both"),
    OpsMetric("Query Precision", "High (exact match)", "High (semantic)", "Very high (hybrid)"),
    OpsMetric("Uptime (30d)", "99.9%", "99.99%", "99.9%"),
]

print("Operational Metrics:")
for m in metrics:
    print(f" [{m.metric}]")
    print(f" ELK: {m.elk_value}")
    print(f" Pinecone: {m.pinecone_value}")
    print(f" Combined: {m.combined}")

# Alert rules spanning both systems; thresholds/targets are illustrative.
alerts = {
    "Error Spike": "ES query: level:ERROR count > 100 in 5min → PagerDuty",
    "Anomaly Detection": "Pinecone: unusual log pattern similarity < 0.5 → Slack",
    "Disk Usage": "ES cluster disk > 80% → scale or rotate",
    "Ingest Lag": "Logstash pipeline lag > 30s → investigate",
    "Vector Ingest Fail": "HTTP 5xx from vector API > 1% → alert",
}

# Plain literal instead of a placeholder-less f-string (lint F541); output unchanged.
print("\n\nAlert Rules:")
for k, v in alerts.items():
    print(f" [{k}]: {v}")
เคล็ดลับ
- Hybrid: ใช้ ELK + Pinecone Hybrid Search ครอบคลุมทั้ง Keyword และ Semantic
- Batch: Ingest Vectors เป็น Batch ไม่ใช่ทีละตัว ประหยัด API Call
- Namespace: ใช้ Pinecone Namespace แยกตาม Environment หรือ Source
- Retention: ลบ Vectors เก่าเกิน 90 วัน ประหยัด Storage
- Model: ใช้ all-MiniLM-L6-v2 สำหรับ Log เร็วและดีพอ
Vector Database คืออะไร
เก็บค้นหา Vector Embedding ANN Search Pinecone Managed Serverless Metadata Filtering Namespace Hybrid LLM RAG Semantic Recommendation
ใช้ Pinecone กับ Log Management อย่างไร
Log Message เป็น Vector Embedding Sentence Transformer Metadata Timestamp Source Level Semantic Search ไม่ต้อง Exact Match Hybrid ELK
ELK Stack คืออะไร
Elasticsearch เก็บค้นหา Logstash รับแปลงส่ง Kibana Dashboard Beats Agent Filebeat Metricbeat Centralized Log Security APM Scale
Hybrid Search ทำอย่างไร
Elasticsearch Keyword Exact Match Regex Pinecone Semantic Similarity Score Fusion Reciprocal Rank Fusion ครอบคลุม Precision Recall Log Analysis
สรุป
Vector Database Pinecone Log Management ELK Elasticsearch Logstash Kibana Embedding Semantic Search Hybrid ANN Anomaly Detection Pipeline Production
