Vector Database Pinecone Log Management ELK —
Vector DB + ELK
Vector Database Pinecone Log Management ELK Elasticsearch Logstash Kibana Embedding Semantic Search ANN Hybrid Search RAG Anomaly Detection Production
| Feature | Pinecone | Elasticsearch | Weaviate | Qdrant |
|---|---|---|---|---|
| Type | Managed Vector DB | Search Engine | Open Source Vector | Open Source Vector |
| Search | ANN Vector | BM25 + Vector | ANN + BM25 | ANN Vector |
| Scale | Serverless auto | Manual cluster | K8s cluster | K8s/Docker |
| Pricing | Pay per query | Per node | Free OSS | Free OSS |
| Metadata | JSON filter | Full JSON | JSON filter | JSON filter |
| เหมาะกับ | Managed RAG | Full-text + vector | Self-hosted RAG | Self-hosted perf |
Pinecone Setup
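# === Pinecone Vector Database for Log Search ===
# Install dependencies (shell command, not Python):
#   pip install pinecone-client sentence-transformers
# Note: newer releases of the Pinecone SDK are published on PyPI as `pinecone`.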
from pinecone import Pinecone, ServerlessSpec
from sentence_transformers import SentenceTransformer
import datetime
# Initialize the Pinecone client and the sentence-embedding model.
INDEX_NAME = "log-vectors"
EMBEDDING_DIM = 384  # output size of all-MiniLM-L6-v2; must match the index

pc = Pinecone(api_key="YOUR_API_KEY")  # placeholder: supply a real API key
model = SentenceTransformer("all-MiniLM-L6-v2")  # 384 dimensions

# Create the index only when it does not exist yet, so that re-running this
# script does not fail with an "index already exists" error.
if INDEX_NAME not in pc.list_indexes().names():
    pc.create_index(
        name=INDEX_NAME,
        dimension=EMBEDDING_DIM,
        metric="cosine",
        spec=ServerlessSpec(cloud="aws", region="us-east-1"),
    )
index = pc.Index(INDEX_NAME)
# Ingest Logs as Vectors
def ingest_log(log_id: str, message: str, source: str, level: str, timestamp) -> None:
    """Embed a single log message and upsert it into the Pinecone index.

    The message text is encoded with the module-level ``model`` and stored
    in the module-level ``index`` under ``log_id``. The remaining arguments
    are stored verbatim as metadata next to the vector.

    Args:
        log_id: Vector id used for the upsert.
        message: Log text; this is the only field that is embedded.
        source: Stored as the ``source`` metadata field.
        level: Stored as the ``level`` metadata field (``search_logs``
            filters on it).
        timestamp: Stored as the ``timestamp`` metadata field.
            NOTE(review): the expected type is not visible here -- confirm
            it is a value Pinecone accepts as metadata (string or number).
    """
    embedding = model.encode(message).tolist()
    index.upsert(
        vectors=[
            {
                "id": log_id,
                "values": embedding,
                "metadata": {
                    "message": message,
                    "source": source,
                    "level": level,
                    "timestamp": timestamp,
                },
            }
        ]
    )
# Semantic Search
def search_logs(query: str, top_k: int = 10, level_filter=None):
    """Return the logs most similar in meaning to ``query``.

    The query text is embedded with the module-level ``model`` and sent to
    the module-level Pinecone ``index``.

    Args:
        query: Free-text description of what to look for.
        top_k: Maximum number of matches requested from the index.
        level_filter: When truthy, only vectors whose ``level`` metadata
            equals this value are considered; otherwise no filter is sent.

    Returns:
        The ``matches`` attribute of the query response, with metadata
        included.
    """
    query_vec = model.encode(query).tolist()
    # Pinecone expects ``None`` (not an empty dict) when no filter applies.
    metadata_filter = {"level": {"$eq": level_filter}} if level_filter else None
    results = index.query(
        vector=query_vec,
        top_k=top_k,
        include_metadata=True,
        filter=metadata_filter,
    )
    return results.matches
from dataclasses import dataclass
@dataclass
class LogVector:
    """One log entry as shown in a semantic-search result listing.

    Attributes:
        log_id: Identifier of the log entry (e.g. ``"log-001"``).
        message: The log text.
        source: Where the log came from (e.g. ``"web-01"``).
        level: Log level label (e.g. ``"ERROR"``, ``"WARN"``).
        score: Relevance score of the hit; printed with two decimals.
    """

    log_id: str
    message: str
    source: str
    level: str
    score: float
# Hard-coded example hits illustrating what a semantic search for
# "memory leak" might return; nothing here queries Pinecone.
sample_results = [
    LogVector("log-001", "Out of memory: Kill process nginx", "web-01", "ERROR", 0.95),
    LogVector("log-042", "Memory usage exceeded 90% threshold", "web-02", "WARN", 0.88),
    LogVector("log-103", "OOM killer invoked for process java", "app-01", "ERROR", 0.85),
    LogVector("log-205", "High memory pressure detected on node", "k8s-03", "WARN", 0.82),
    LogVector("log-310", "Swapping heavily due to memory exhaustion", "db-01", "WARN", 0.78),
]

print("=== Semantic Search: 'memory leak' ===")
for r in sample_results:
    # Two lines per hit: score/level/source header, then the message.
    print(f" [{r.score:.2f}] [{r.level}] {r.source}")
    print(f" {r.message}")
ELK Pipeline
=== ELK Stack Log Pipeline ===
Filebeat → Logstash → Elasticsearch → Kibana
+ Logstash → Python Script → Pinecone (vector enrichment)
filebeat.yml:
# Tail the application log files and ship every line to Logstash.
filebeat.inputs:
  - type: log
    paths: ["/var/log/app/*.log"]
    # Custom fields are attached to every event from this input. By default
    # Filebeat nests them under `fields` (i.e. `fields.source`).
    fields:
      source: "app-server"

# Must match the port of the `beats` input in logstash.conf.
output.logstash:
  hosts: ["logstash:5044"]
logstash.conf:
# Receive events shipped by Filebeat.
input {
  beats {
    port => 5044
  }
}

# Split each raw line into timestamp / level / log_message, then parse the
# extracted timestamp as an ISO8601 date.
filter {
  grok {
    match => {
      "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:log_message}"
    }
  }
  date {
    match => ["timestamp", "ISO8601"]
  }
}

# Fan out every event to two destinations:
#   1. Elasticsearch, into one index per day, for keyword search and Kibana.
#   2. The vector-ingest HTTP service, as a JSON POST.
output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "logs-%{+YYYY.MM.dd}"
  }
  http {
    url => "http://vector-ingest:8080/ingest"
    http_method => "post"
    format => "json"
  }
}
Kibana Dashboard queries
KQL: level:ERROR AND source:web-*
Lucene: message:"out of memory" AND level:ERROR
Aggregation: Terms on source, Date Histogram on @timestamp
@dataclass
class PipelineComponent:
    """Descriptive record for one stage of the log pipeline.

    All fields are free-form display strings; nothing is parsed from them.

    Attributes:
        component: Name of the stage (e.g. ``"Filebeat"``).
        role: Short description of what the stage does.
        input_source: Where the stage receives its data from.
        output: Where the stage sends its data to.
        throughput: Human-readable throughput figure, or ``"N/A"``.
    """

    component: str
    role: str
    input_source: str
    output: str
    throughput: str
# Static description of each pipeline stage, in data-flow order. The
# throughput strings are illustrative figures, not measurements taken here.
pipeline = [
    PipelineComponent("Filebeat", "Agent collect logs", "Log files /var/log/", "Logstash:5044", "10K events/s"),
    PipelineComponent("Logstash", "Parse transform", "Filebeat", "ES + Vector API", "5K events/s"),
    PipelineComponent("Elasticsearch", "Store + keyword search", "Logstash", "Kibana queries", "50K docs/s index"),
    PipelineComponent("Kibana", "Dashboard visualization", "Elasticsearch", "User browser", "N/A"),
    PipelineComponent("Vector Ingest", "Embed + upsert", "Logstash HTTP", "Pinecone", "500 vectors/s"),
    PipelineComponent("Pinecone", "Semantic search", "Vector Ingest", "Search API", "1000 queries/s"),
]

print("\n=== Log Pipeline ===")
for p in pipeline:
    # Three lines per stage: name/role, data flow, throughput.
    print(f" [{p.component}] {p.role}")
    print(f" In: {p.input_source} → Out: {p.output}")
    print(f" Throughput: {p.throughput}")
Hybrid Search and Ops
# === Hybrid Search: ELK + Pinecone ===
# Illustrative sketch, intentionally left commented out: `es` (presumably an
# Elasticsearch client) is not defined anywhere in the visible file.
# NOTE(review): the fusion below only merges a log found by both engines when
# the Pinecone vector id equals the Elasticsearch `_id` -- confirm that the
# ingest path guarantees matching ids.
#
# def hybrid_search(query, keyword_query=None, top_k=20):
#     # Semantic search from Pinecone
#     vector_results = search_logs(query, top_k=top_k)
#
#     # Keyword search from Elasticsearch
#     es_query = keyword_query or query
#     es_results = es.search(index="logs-*", body={
#         "query": {"match": {"message": es_query}},
#         "size": top_k
#     })
#
#     # Reciprocal Rank Fusion: each list contributes 1 / (k + rank + 1)
#     # per document; contributions for the same id are summed.
#     scores = {}
#     k = 60
#     for rank, hit in enumerate(vector_results):
#         scores[hit.id] = scores.get(hit.id, 0) + 1 / (k + rank + 1)
#     for rank, hit in enumerate(es_results["hits"]["hits"]):
#         doc_id = hit["_id"]
#         scores[doc_id] = scores.get(doc_id, 0) + 1 / (k + rank + 1)
#
#     # Highest fused score first; returns (id, score) pairs.
#     sorted_results = sorted(scores.items(), key=lambda x: x[1], reverse=True)
#     return sorted_results[:top_k]
@dataclass
class OpsMetric:
    """One row of the ELK vs. Pinecone operational comparison.

    All fields are free-form display strings.

    Attributes:
        metric: Name of the metric being compared.
        elk_value: Figure for the ELK stack.
        pinecone_value: Figure for Pinecone.
        combined: Figure for the combined (hybrid) setup.
    """

    metric: str
    elk_value: str
    pinecone_value: str
    combined: str
# Example operational figures for the two systems and the combined setup;
# these are static sample values, not collected at runtime.
metrics = [
    OpsMetric("Daily Log Volume", "50GB / 100M events", "2M vectors", "Full coverage"),
    OpsMetric("Search Latency (p95)", "120ms keyword", "45ms semantic", "150ms hybrid"),
    OpsMetric("Storage Cost/month", "$500 (3-node ES)", "$200 (serverless)", "$700 total"),
    OpsMetric("Retention", "30 days hot, 90 cold", "90 days", "90 days both"),
    OpsMetric("Query Precision", "High (exact match)", "High (semantic)", "Very high (hybrid)"),
    OpsMetric("Uptime (30d)", "99.9%", "99.99%", "99.9%"),
]

print("Operational Metrics:")
for m in metrics:
    # Four lines per metric: name, then one line per system.
    print(f" [{m.metric}]")
    print(f" ELK: {m.elk_value}")
    print(f" Pinecone: {m.pinecone_value}")
    print(f" Combined: {m.combined}")
# Alert rule name -> human-readable description of the condition and the
# action to take. These are documentation strings only; nothing evaluates them.
alerts = {
    "Error Spike": "ES query: level:ERROR count > 100 in 5min → PagerDuty",
    "Anomaly Detection": "Pinecone: unusual log pattern similarity < 0.5 → Slack",
    "Disk Usage": "ES cluster disk > 80% → scale or rotate",
    "Ingest Lag": "Logstash pipeline lag > 30s → investigate",
    "Vector Ingest Fail": "HTTP 5xx from vector API > 1% → alert",
}

print("\n\nAlert Rules:")
for k, v in alerts.items():
    print(f" [{k}]: {v}")
เคล็ดลับ
- Hybrid: ใช้ ELK + Pinecone Hybrid Search ครอบคลุมทั้ง Keyword และ Semantic
- Batch: Ingest Vectors เป็น Batch ไม่ใช่ทีละตัว ประหยัด API Call
- Namespace: ใช้ Pinecone Namespace แยกตาม Environment หรือ Source
- Retention: ลบ Vectors เก่าเกิน 90 วัน ประหยัด Storage
- Model: ใช้ all-MiniLM-L6-v2 สำหรับ Log เร็วและดีพอ
Vector Database คืออะไร
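Vector Database คือฐานข้อมูลที่ใช้เก็บและค้นหา Vector Embedding ด้วย ANN Search โดย Pinecone เป็นบริการแบบ Managed Serverless ที่รองรับ Metadata Filtering, Namespace และ Hybrid Search เหมาะกับงาน LLM RAG, Semantic Search และ Recommendation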