SiamCafe · Blog
Vector Database Pinecone Log Management ELK
บทความ

Vector Database Pinecone Log Management ELK

เผยแพร่ 28 พฤษภาคม 2569

Vector DB + ELK

Vector Database Pinecone Log Management ELK Elasticsearch Logstash Kibana Embedding Semantic Search ANN Hybrid Search RAG Anomaly Detection Production

| Feature | Pinecone | Elasticsearch | Weaviate | Qdrant |
|---|---|---|---|---|
| Type | Managed Vector DB | Search Engine | Open Source Vector | Open Source Vector |
| Search | ANN Vector | BM25 + Vector | ANN + BM25 | ANN Vector |
| Scale | Serverless auto | Manual cluster | K8s cluster | K8s/Docker |
| Pricing | Pay per query | Per node | Free OSS | Free OSS |
| Metadata | JSON filter | Full JSON | JSON filter | JSON filter |
| เหมาะกับ | Managed RAG | Full-text + vector | Self-hosted RAG | Self-hosted perf |

Pinecone Setup

=== Pinecone Vector Database for Log Search ===

pip install pinecone-client sentence-transformers

from pinecone import Pinecone, ServerlessSpec

from sentence_transformers import SentenceTransformer

import datetime

# Initialize

pc = Pinecone(api_key="YOUR_API_KEY")  # placeholder: supply a real API key
model = SentenceTransformer("all-MiniLM-L6-v2")  # 384 dimensions

# Create Index
# Only create the index when it is missing: creating an index whose name is
# already taken is rejected, which would break every re-run of this script.
if "log-vectors" not in pc.list_indexes().names():
    pc.create_index(
        name="log-vectors",
        dimension=384,  # must equal the embedding size of the model above
        metric="cosine",
        spec=ServerlessSpec(cloud="aws", region="us-east-1"),
    )

index = pc.Index("log-vectors")

# Ingest Logs as Vectors

def ingest_log(log_id, message, source, level, timestamp):
    """Embed one log message and upsert it into the vector index.

    Args:
        log_id: Used as the vector id.
        message: Log text; it is embedded and also stored in the metadata.
        source: Stored in the metadata under "source".
        level: Stored in the metadata under "level".
        timestamp: Stored in the metadata under "timestamp".
    """
    embedding = model.encode(message).tolist()
    record = {
        "id": log_id,
        "values": embedding,
        "metadata": {
            "message": message,
            "source": source,
            "level": level,
            "timestamp": timestamp,
        },
    }
    # One upsert call per log line, carrying a single-record batch.
    index.upsert(vectors=[record])

# Semantic Search

def search_logs(query, top_k=10, level_filter=None):
    """Query the vector index with the embedding of *query*.

    Args:
        query: Free-text search phrase; it is embedded before querying.
        top_k: Maximum number of matches requested from the index.
        level_filter: When truthy, restrict matches to records whose
            "level" metadata equals this value.

    Returns:
        The ``matches`` attribute of the index query response.
    """
    query_vec = model.encode(query).tolist()
    # Send no filter at all (None) when no level was requested.
    metadata_filter = {"level": {"$eq": level_filter}} if level_filter else None
    results = index.query(
        vector=query_vec,
        top_k=top_k,
        include_metadata=True,
        filter=metadata_filter,
    )
    return results.matches

from dataclasses import dataclass

@dataclass
class LogVector:
    """A log record paired with the score it received in a search."""

    log_id: str
    message: str
    source: str
    level: str
    score: float

# Hard-coded example hits (not the output of a live query), listed in
# descending score order.
sample_results = [
    LogVector("log-001", "Out of memory: Kill process nginx", "web-01", "ERROR", 0.95),
    LogVector("log-042", "Memory usage exceeded 90% threshold", "web-02", "WARN", 0.88),
    LogVector("log-103", "OOM killer invoked for process java", "app-01", "ERROR", 0.85),
    LogVector("log-205", "High memory pressure detected on node", "k8s-03", "WARN", 0.82),
    LogVector("log-310", "Swapping heavily due to memory exhaustion", "db-01", "WARN", 0.78),
]

print("=== Semantic Search: 'memory leak' ===")
for r in sample_results:
    # Two lines per hit: score/level/source, then the message itself.
    print(f" [{r.score:.2f}] [{r.level}] {r.source}")
    print(f" {r.message}")

ELK Pipeline

=== ELK Stack Log Pipeline ===

Filebeat → Logstash → Elasticsearch → Kibana

+ Logstash → Python Script → Pinecone (vector enrichment)

filebeat.yml:

filebeat.inputs:
  - type: log
    paths: ["/var/log/app/*.log"]
    fields:
      source: "app-server"

output.logstash:
  hosts: ["logstash:5044"]

logstash.conf:

input {

beats { port => 5044 }

}

filter {

grok {

match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:log_message}" }

}

date { match => ["timestamp", "ISO8601"] }

}

output {

elasticsearch {

hosts => ["elasticsearch:9200"]

index => "logs-%{+YYYY.MM.dd}"

}

http {

url => "http://vector-ingest:8080/ingest"

http_method => "post"

format => "json"

}

}

Kibana Dashboard queries

KQL: level:ERROR AND source:web-*

Lucene: message:"out of memory" AND level:ERROR

Aggregation: Terms on source, Date Histogram on @timestamp

@dataclass
class PipelineComponent:
    """One stage of the log pipeline, described with free-text fields."""

    component: str
    role: str
    input_source: str
    output: str
    throughput: str

# Pipeline stages in the order they are printed below.
pipeline = [
    PipelineComponent("Filebeat", "Agent collect logs", "Log files /var/log/", "Logstash:5044", "10K events/s"),
    PipelineComponent("Logstash", "Parse transform", "Filebeat", "ES + Vector API", "5K events/s"),
    PipelineComponent("Elasticsearch", "Store + keyword search", "Logstash", "Kibana queries", "50K docs/s index"),
    PipelineComponent("Kibana", "Dashboard visualization", "Elasticsearch", "User browser", "N/A"),
    PipelineComponent("Vector Ingest", "Embed + upsert", "Logstash HTTP", "Pinecone", "500 vectors/s"),
    PipelineComponent("Pinecone", "Semantic search", "Vector Ingest", "Search API", "1000 queries/s"),
]

print("\n=== Log Pipeline ===")
for p in pipeline:
    # Three lines per stage: name/role, data flow, throughput.
    print(f" [{p.component}] {p.role}")
    print(f" In: {p.input_source} → Out: {p.output}")
    print(f" Throughput: {p.throughput}")

Hybrid Search and Ops

# === Hybrid Search: ELK + Pinecone ===

# def hybrid_search(query, keyword_query=None, top_k=20):
#     # Semantic search from Pinecone
#     vector_results = search_logs(query, top_k=top_k)
#
#     # Keyword search from Elasticsearch
#     es_query = keyword_query or query
#     es_results = es.search(index="logs-*", body={
#         "query": {"match": {"message": es_query}},
#         "size": top_k
#     })
#
#     # Reciprocal Rank Fusion
#     scores = {}
#     k = 60
#     for rank, hit in enumerate(vector_results):
#         scores[hit.id] = scores.get(hit.id, 0) + 1 / (k + rank + 1)
#     for rank, hit in enumerate(es_results["hits"]["hits"]):
#         doc_id = hit["_id"]
#         scores[doc_id] = scores.get(doc_id, 0) + 1 / (k + rank + 1)
#
#     sorted_results = sorted(scores.items(), key=lambda x: x[1], reverse=True)
#     return sorted_results[:top_k]

@dataclass
class OpsMetric:
    """One row of the ELK / Pinecone operational comparison."""

    metric: str          # name of the figure being compared
    elk_value: str       # text printed on the "ELK:" line
    pinecone_value: str  # text printed on the "Pinecone:" line
    combined: str        # text printed on the "Combined:" line

# Example comparison figures, one OpsMetric per row.
metrics = [
    OpsMetric("Daily Log Volume", "50GB / 100M events", "2M vectors", "Full coverage"),
    OpsMetric("Search Latency (p95)", "120ms keyword", "45ms semantic", "150ms hybrid"),
    OpsMetric("Storage Cost/month", "$500 (3-node ES)", "$200 (serverless)", "$700 total"),
    OpsMetric("Retention", "30 days hot, 90 cold", "90 days", "90 days both"),
    OpsMetric("Query Precision", "High (exact match)", "High (semantic)", "Very high (hybrid)"),
    OpsMetric("Uptime (30d)", "99.9%", "99.99%", "99.9%"),
]

print("Operational Metrics:")
for m in metrics:
    # Emit the four lines of one row with a single print call.
    print(
        f"  [{m.metric}]",
        f"    ELK: {m.elk_value}",
        f"    Pinecone: {m.pinecone_value}",
        f"    Combined: {m.combined}",
        sep="\n",
    )

# Alert rule name -> description of its trigger condition and the reaction.
alerts = {
    "Error Spike": "ES query: level:ERROR count > 100 in 5min → PagerDuty",
    "Anomaly Detection": "Pinecone: unusual log pattern similarity < 0.5 → Slack",
    "Disk Usage": "ES cluster disk > 80% → scale or rotate",
    "Ingest Lag": "Logstash pipeline lag > 30s → investigate",
    "Vector Ingest Fail": "HTTP 5xx from vector API > 1% → alert",
}

# The header has no placeholders, so a plain string is enough here.
print("\n\nAlert Rules:")
for k, v in alerts.items():
    line = f"  [{k}]: {v}"
    print(line)

เคล็ดลับ

  • Hybrid: ใช้ ELK + Pinecone Hybrid Search ครอบคลุมทั้ง Keyword และ Semantic
  • Batch: Ingest Vectors เป็น Batch ไม่ใช่ทีละตัว ประหยัด API Call
  • Namespace: ใช้ Pinecone Namespace แยกตาม Environment หรือ Source
  • Retention: ลบ Vectors เก่าเกิน 90 วัน ประหยัด Storage
  • Model: ใช้ all-MiniLM-L6-v2 สำหรับ Log เร็วและดีพอ

Vector Database คืออะไร

เก็บค้นหา Vector Embedding ANN Search Pinecone Managed Serverless Metadata Filtering Namespace Hybrid LLM RAG Semantic Recommendation