QuestDB for Time Series Data
QuestDB is a very fast time series database that you query with plain SQL. It supports multiple ingestion protocols, which makes migrating from other databases easy, and it is a good fit for monitoring, IoT, financial data, and log analytics.
Shift Left Security moves security testing into the development phase: run SAST, SCA, and secret scanning in CI/CD to catch problems before deployment and reduce the cost of fixing security issues.
QuestDB Setup and SQL Queries
# === QuestDB Installation and Configuration ===
# 1. Docker
docker run -d --name questdb \
-p 9000:9000 \
-p 9009:9009 \
-p 8812:8812 \
-p 9003:9003 \
-v questdb-data:/var/lib/questdb \
questdb/questdb:latest
# Ports:
# 9000 — REST API / Web Console
# 9009 — InfluxDB Line Protocol (ILP)
# 8812 — PostgreSQL Wire Protocol
# 9003 — Health/Metrics
# 2. Create tables for security events
# เปิด Web Console: http://localhost:9000
# CREATE TABLE security_events (
# timestamp TIMESTAMP,
# event_type SYMBOL,
# severity SYMBOL,
# source_ip STRING,
# destination_ip STRING,
# port INT,
# protocol SYMBOL,
# message STRING,
# user_id STRING,
# risk_score DOUBLE
# ) timestamp(timestamp) PARTITION BY DAY WAL;
# CREATE TABLE metrics (
# timestamp TIMESTAMP,
# host SYMBOL,
# metric_name SYMBOL,
# value DOUBLE,
# tags STRING
# ) timestamp(timestamp) PARTITION BY HOUR WAL;
# 3. SQL queries for security analytics
# -- Top 10 Source IPs with most events
# SELECT source_ip, count() as event_count,
# avg(risk_score) as avg_risk
# FROM security_events
# WHERE timestamp > dateadd('h', -24, now())
# GROUP BY source_ip
# ORDER BY event_count DESC
# LIMIT 10;
# -- Security Events per Hour
# SELECT timestamp, event_type, count() as cnt
# FROM security_events
# WHERE timestamp > dateadd('d', -7, now())
# SAMPLE BY 1h
# ALIGN TO CALENDAR;
# -- High Risk Events
# SELECT * FROM security_events
# WHERE risk_score > 8.0
# AND timestamp > dateadd('h', -1, now())
# ORDER BY timestamp DESC;
# -- Anomaly Detection: hours exceeding 3x the 24h rolling average
# -- (window functions are not allowed in WHERE, so filter in an outer query)
# WITH hourly AS (
#   SELECT timestamp, count() as cnt
#   FROM security_events
#   SAMPLE BY 1h
# ), rolling AS (
#   SELECT timestamp, cnt,
#     avg(cnt) OVER (ORDER BY timestamp ROWS 24 PRECEDING) as avg_24h
#   FROM hourly
# )
# SELECT timestamp, cnt, avg_24h
# FROM rolling
# WHERE cnt > 3 * avg_24h;
echo "QuestDB running at http://localhost:9000"
echo " PostgreSQL: localhost:8812"
echo " ILP: localhost:9009"
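Besides the PostgreSQL wire protocol, the REST API on port 9000 answers queries directly. A minimal stdlib-only sketch (the `/exec` endpoint and its `columns`/`dataset` JSON shape are QuestDB's documented REST API; `rest_query` assumes a server is running locally):

```python
import json
import urllib.parse
import urllib.request


def build_exec_url(base: str, sql: str) -> str:
    """Build the URL for QuestDB's /exec REST endpoint."""
    return f"{base}/exec?{urllib.parse.urlencode({'query': sql})}"


def rest_query(sql: str, base: str = "http://localhost:9000"):
    """Run a query and return rows as dicts (requires a running QuestDB)."""
    with urllib.request.urlopen(build_exec_url(base, sql)) as resp:
        payload = json.load(resp)
    # /exec returns {"columns": [{"name": ...}, ...], "dataset": [[...], ...]}
    cols = [c["name"] for c in payload["columns"]]
    return [dict(zip(cols, row)) for row in payload["dataset"]]


url = build_exec_url("http://localhost:9000",
                     "SELECT count() FROM security_events")
print(url)
```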
Python Client and Data Ingestion
# questdb_client.py — QuestDB Python Client
# pip install questdb psycopg2-binary pandas
from questdb.ingress import Sender, IngressError, TimestampNanos
import psycopg2
import time
import random
from datetime import datetime, timedelta
from typing import List, Dict
class QuestDBClient:
"""QuestDB Client สำหรับ Security Data"""
def __init__(self, host="localhost", ilp_port=9009, pg_port=8812):
self.host = host
self.ilp_port = ilp_port
self.pg_port = pg_port
def ingest_security_events(self, events: List[Dict]):
"""Ingest Security Events ผ่าน ILP (เร็วที่สุด)"""
try:
            # questdb-client >= 2.0 takes a config string; older releases
            # accepted Sender(host, port) positionally.
            with Sender.from_conf(f"tcp::addr={self.host}:{self.ilp_port};") as sender:
for event in events:
sender.row(
"security_events",
symbols={
"event_type": event["event_type"],
"severity": event["severity"],
"protocol": event.get("protocol", "TCP"),
},
columns={
"source_ip": event["source_ip"],
"destination_ip": event["destination_ip"],
"port": event["port"],
"message": event["message"],
"user_id": event.get("user_id", ""),
"risk_score": event["risk_score"],
},
at=TimestampNanos.now(),
)
sender.flush()
return len(events)
except IngressError as e:
print(f"Ingest error: {e}")
return 0
def query(self, sql: str):
"""Query ผ่าน PostgreSQL Protocol"""
conn = psycopg2.connect(
host=self.host, port=self.pg_port,
user="admin", password="quest",
database="qdb",
)
cur = conn.cursor()
cur.execute(sql)
columns = [desc[0] for desc in cur.description]
rows = cur.fetchall()
conn.close()
return [dict(zip(columns, row)) for row in rows]
def security_dashboard(self):
"""Security Dashboard Queries"""
print(f"\n{'='*55}")
print(f"Security Dashboard — {datetime.now().strftime('%Y-%m-%d %H:%M')}")
print(f"{'='*55}")
# Total Events (24h)
result = self.query("""
SELECT count() as total,
avg(risk_score) as avg_risk,
max(risk_score) as max_risk
FROM security_events
WHERE timestamp > dateadd('h', -24, now())
""")
if result:
r = result[0]
print(f"\n Last 24h: {r['total']} events")
print(f" Avg Risk: {r['avg_risk']:.1f} | Max Risk: {r['max_risk']:.1f}")
# By Severity
result = self.query("""
SELECT severity, count() as cnt
FROM security_events
WHERE timestamp > dateadd('h', -24, now())
GROUP BY severity
ORDER BY cnt DESC
""")
print(f"\n By Severity:")
for r in result:
print(f" {r['severity']:>10}: {r['cnt']}")
# Top Attackers
result = self.query("""
SELECT source_ip, count() as cnt, avg(risk_score) as risk
FROM security_events
WHERE timestamp > dateadd('h', -24, now())
AND risk_score > 5
GROUP BY source_ip
ORDER BY cnt DESC
LIMIT 5
""")
print(f"\n Top Suspicious IPs:")
for r in result:
print(f" {r['source_ip']}: {r['cnt']} events (risk: {r['risk']:.1f})")
def benchmark(self, n=100000):
"""Benchmark Ingestion Speed"""
events = []
event_types = ["auth_failure", "port_scan", "brute_force", "sql_injection", "xss"]
severities = ["low", "medium", "high", "critical"]
for _ in range(n):
events.append({
"event_type": random.choice(event_types),
"severity": random.choice(severities),
"source_ip": f"10.0.{random.randint(1,255)}.{random.randint(1,255)}",
"destination_ip": f"192.168.1.{random.randint(1,50)}",
"port": random.choice([22, 80, 443, 3306, 5432, 8080]),
"protocol": random.choice(["TCP", "UDP"]),
"message": f"Security event detected",
"risk_score": round(random.uniform(0, 10), 1),
})
start = time.perf_counter()
ingested = self.ingest_security_events(events)
elapsed = time.perf_counter() - start
rate = ingested / elapsed
print(f"\nBenchmark: {ingested:,} events in {elapsed:.2f}s")
print(f" Rate: {rate:,.0f} events/second")
# client = QuestDBClient()
# client.benchmark(100000)
# client.security_dashboard()
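Under the hood, the Sender serializes each row into InfluxDB Line Protocol text before it reaches port 9009. A hand-rolled sketch of that wire format (escaping is simplified here; the real client handles quoting and special characters):

```python
def ilp_line(table: str, symbols: dict, columns: dict, ts_ns: int) -> str:
    """Build one ILP line: table,sym=v,... field=v,... timestamp_ns"""
    syms = ",".join(f"{k}={v}" for k, v in symbols.items())
    fields = []
    for k, v in columns.items():
        if isinstance(v, str):
            fields.append(f'{k}="{v}"')      # strings are double-quoted
        elif isinstance(v, bool):
            fields.append(f"{k}={str(v).lower()}")
        elif isinstance(v, int):
            fields.append(f"{k}={v}i")       # integers carry an 'i' suffix
        else:
            fields.append(f"{k}={v}")        # floats are bare
    return f"{table},{syms} {','.join(fields)} {ts_ns}"


line = ilp_line("security_events",
                {"event_type": "port_scan", "severity": "high"},
                {"port": 22, "risk_score": 7.5},
                1700000000000000000)
print(line)
# security_events,event_type=port_scan,severity=high port=22i,risk_score=7.5 1700000000000000000
```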
Shift Left Security Pipeline
# === GitHub Actions — Shift Left Security Pipeline ===
# .github/workflows/shift-left-security.yml
name: Shift Left Security
on:
push:
branches: [main, develop]
pull_request:
branches: [main]
jobs:
sast:
name: Static Analysis (SAST)
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Semgrep SAST
uses: returntocorp/semgrep-action@v1
with:
config: >-
p/security-audit
p/secrets
p/owasp-top-ten
p/python
- name: Bandit (Python Security)
run: |
pip install bandit
bandit -r src/ -f json -o bandit-report.json || true
          bandit -r src/ -ll  # report issues of medium severity and above
- name: Upload SAST Results
uses: actions/upload-artifact@v4
with:
name: sast-results
path: bandit-report.json
sca:
name: Software Composition Analysis
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Trivy Vulnerability Scan
uses: aquasecurity/trivy-action@master
with:
scan-type: fs
scan-ref: .
format: table
          severity: CRITICAL,HIGH
- name: pip-audit
run: |
pip install pip-audit
pip-audit -r requirements.txt --desc || true
secrets:
name: Secret Scanning
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Gitleaks Secret Scan
uses: gitleaks/gitleaks-action@v2
env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
      - name: TruffleHog
        run: |
          docker run --rm -v "$PWD:/repo" \
            trufflesecurity/trufflehog:latest filesystem /repo --only-verified || true
container:
name: Container Security
runs-on: ubuntu-latest
needs: [sast, sca, secrets]
steps:
- uses: actions/checkout@v4
- name: Build Image
run: docker build -t myapp:test .
- name: Trivy Container Scan
uses: aquasecurity/trivy-action@master
with:
image-ref: myapp:test
format: table
          severity: CRITICAL,HIGH
exit-code: 1
- name: Dockle Lint
run: |
docker run --rm -v /var/run/docker.sock:/var/run/docker.sock \
goodwithtech/dockle:latest myapp:test
dast:
name: Dynamic Testing (DAST)
runs-on: ubuntu-latest
needs: container
steps:
- uses: actions/checkout@v4
- name: Start Application
run: |
docker compose up -d
sleep 10
- name: OWASP ZAP Scan
uses: zaproxy/action-baseline@v0.10.0
with:
target: http://localhost:8080
- name: Stop Application
run: docker compose down
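The SAST job above uploads `bandit-report.json` but does not gate the build on it (Bandit runs with `|| true`). A small script could parse the report and fail the job on HIGH findings; a sketch (the report structure follows Bandit's JSON output; the inline sample data is illustrative):

```python
# check_bandit.py — fail CI when the Bandit JSON report contains HIGH findings
import json
import sys


def count_high(report: dict) -> int:
    """Count results that Bandit rated HIGH severity."""
    return sum(1 for r in report.get("results", [])
               if r.get("issue_severity") == "HIGH")


# Illustrative in-memory report; in CI you would json.load("bandit-report.json")
report = {"results": [
    {"issue_severity": "HIGH", "test_id": "B602"},
    {"issue_severity": "LOW", "test_id": "B101"},
]}

high = count_high(report)
print(f"high-severity findings: {high}")
# In CI: sys.exit(1) when high > 0 to block the merge.
```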
Best Practices
- Partitioning: choose PARTITION BY DAY or HOUR based on data volume to keep query performance high
- WAL mode: enable WAL (Write-Ahead Log) for concurrent writes
- ILP ingestion: use the InfluxDB Line Protocol for high-throughput ingestion; it is faster than REST
- SAST on every PR: run static analysis on every pull request before merge
- SCA weekly: scan dependencies at least once a week
- Secret scanning: block commits that contain secrets with pre-commit hooks
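The last bullet can be enforced locally with a pre-commit hook; a sketch of a `.pre-commit-config.yaml` using the Gitleaks hook (the pinned `rev` is illustrative, pick a current release):

```yaml
# .pre-commit-config.yaml — reject commits that contain secrets
repos:
  - repo: https://github.com/gitleaks/gitleaks
    rev: v8.18.4   # illustrative pin; use the latest release
    hooks:
      - id: gitleaks
```

After `pip install pre-commit` and `pre-commit install`, every `git commit` runs Gitleaks against the staged changes.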
What is QuestDB?
An open-source time series database built for speed. It supports standard SQL alongside the InfluxDB Line Protocol, the PostgreSQL wire protocol, and a REST API, making it well suited to IoT, monitoring, financial data, and log analytics.
What is Shift Left Security?
Moving security testing into development instead of leaving it until the end. Run SAST, SCA, and secret scanning in CI/CD to catch issues before deployment and cut the cost of fixing them.
How much faster is QuestDB than other databases?
Roughly 2-10x faster than InfluxDB, thanks to column-oriented storage, SIMD instructions, and memory-mapped files. A single node can ingest 1.4M+ rows per second while still being queried with ordinary SQL.
How do you secure a time series database?
Enable authentication on every endpoint, use TLS encryption, restrict access with a firewall, enforce RBAC permissions, encrypt data at rest, audit-log every query, run regular security scanning, and never expose the API directly to the internet (e.g. bind Docker ports to 127.0.0.1).
Summary
QuestDB is a fast time series database that speaks SQL. Combined with Shift Left Security, you catch security issues during development by running SAST, SCA, secret scanning, and container scanning in the CI/CD pipeline, and you secure the database itself with authentication, TLS, RBAC, and audit logging.
