
QuestDB Time Series Shift Left Security

2026-03-31 · อ. บอม, SiamCafe.net · 9,293 words

QuestDB for Time Series Data

QuestDB is a very fast time series database that you query directly with SQL. It supports several protocols, which makes migrating from other databases easier. It is a good fit for monitoring, IoT, financial data, and log analytics.
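One of those protocols is InfluxDB Line Protocol (ILP), a plain-text format with one row per line. The sketch below shows roughly what such a line looks like for a security event. The function names are hypothetical, the escaping is simplified, and table names are assumed to be plain identifiers; in practice the official client library builds these lines for you.

```python
def escape_tag(value: str) -> str:
    """Escape characters that are special in ILP tag keys and values (simplified)."""
    return (value.replace("\\", "\\\\")
                 .replace(",", "\\,")
                 .replace("=", "\\=")
                 .replace(" ", "\\ "))


def escape_string_field(value: str) -> str:
    """Escape backslashes and double quotes inside a quoted string field."""
    return value.replace("\\", "\\\\").replace('"', '\\"')


def to_ilp_line(table: str, symbols: dict, columns: dict, ts_nanos: int) -> str:
    """Render one row as an ILP line: table,tags fields timestamp."""
    tags = "".join(f",{escape_tag(k)}={escape_tag(str(v))}" for k, v in symbols.items())
    fields = []
    for key, value in columns.items():
        if isinstance(value, bool):      # check bool first: bool is a subclass of int
            rendered = "t" if value else "f"
        elif isinstance(value, int):     # integers carry an "i" suffix
            rendered = f"{value}i"
        elif isinstance(value, float):   # floats are written bare
            rendered = repr(value)
        else:                            # everything else becomes a quoted string
            rendered = f'"{escape_string_field(str(value))}"'
        fields.append(f"{escape_tag(key)}={rendered}")
    return f"{table}{tags} {','.join(fields)} {ts_nanos}"


line = to_ilp_line(
    "security_events",
    {"event_type": "port_scan", "severity": "high"},
    {"source_ip": "10.0.0.5", "port": 22, "risk_score": 7.5},
    1700000000000000000,
)
print(line)
```

Tags map to SYMBOL columns and fields map to the remaining columns, which is the same split the Python client later in this article makes between `symbols` and `columns`.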

Shift Left Security moves security testing into the development phase. Running SAST, SCA, and secret scanning in CI/CD catches problems before deployment and lowers the cost of fixing security issues.
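To make the idea concrete, here is a minimal sketch of the kind of check a secret scanner performs before code is merged. The three patterns and the `scan_text` helper are illustrative assumptions, not the rule set of any real tool; dedicated scanners ship far larger rule sets and verify matches against live services.

```python
import re

# Illustrative patterns only (assumptions for this sketch).
PATTERNS = {
    "aws_access_key_id": re.compile(r"\bAKIA[0-9A-Z]{16}\b"),
    "private_key_header": re.compile(r"-----BEGIN (?:RSA |EC |OPENSSH )?PRIVATE KEY-----"),
    "hardcoded_password": re.compile(r"(?i)\bpassword\s*=\s*['\"][^'\"]{4,}['\"]"),
}


def scan_text(text: str):
    """Return (line_number, rule_name) for every line that matches a pattern."""
    findings = []
    for lineno, line in enumerate(text.splitlines(), start=1):
        for name, pattern in PATTERNS.items():
            if pattern.search(line):
                findings.append((lineno, name))
    return findings


sample = 'user = "admin"\npassword = "quest"\nkey = "AKIAABCDEFGHIJKLMNOP"\n'
findings = scan_text(sample)
for lineno, rule in findings:
    print(f"line {lineno}: {rule}")
```

Running a check like this on every commit is what "shifting left" means in practice: the finding appears while the change is still on the developer's machine or in a pull request.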

QuestDB Setup and SQL Queries

# === QuestDB Installation and Configuration ===

# 1. Docker
docker run -d --name questdb \
  -p 9000:9000 \
  -p 9009:9009 \
  -p 8812:8812 \
  -p 9003:9003 \
  -v questdb-data:/var/lib/questdb \
  questdb/questdb:latest

# Ports:
# 9000 — REST API / Web Console
# 9009 — InfluxDB Line Protocol (ILP)
# 8812 — PostgreSQL Wire Protocol
# 9003 — Health/Metrics

# 2. Create tables for security events
# Open the Web Console: http://localhost:9000

# CREATE TABLE security_events (
#   timestamp TIMESTAMP,
#   event_type SYMBOL,
#   severity SYMBOL,
#   source_ip STRING,
#   destination_ip STRING,
#   port INT,
#   protocol SYMBOL,
#   message STRING,
#   user_id STRING,
#   risk_score DOUBLE
# ) timestamp(timestamp) PARTITION BY DAY WAL;

# CREATE TABLE metrics (
#   timestamp TIMESTAMP,
#   host SYMBOL,
#   metric_name SYMBOL,
#   value DOUBLE,
#   tags STRING
# ) timestamp(timestamp) PARTITION BY HOUR WAL;

# 3. SQL queries for security analytics

# -- Top 10 Source IPs with most events
# SELECT source_ip, count() as event_count,
#        avg(risk_score) as avg_risk
# FROM security_events
# WHERE timestamp > dateadd('h', -24, now())
# GROUP BY source_ip
# ORDER BY event_count DESC
# LIMIT 10;

# -- Security Events per Hour
# SELECT timestamp, event_type, count() as cnt
# FROM security_events
# WHERE timestamp > dateadd('d', -7, now())
# SAMPLE BY 1h
# ALIGN TO CALENDAR;

# -- High Risk Events
# SELECT * FROM security_events
# WHERE risk_score > 8.0
#   AND timestamp > dateadd('h', -1, now())
# ORDER BY timestamp DESC;

# -- Anomaly Detection: hours whose event count exceeds 3x the rolling average
# -- (a window function cannot be used in WHERE, so compute it in a CTE first)
# WITH hourly AS (
#   SELECT timestamp, count() AS cnt
#   FROM security_events
#   SAMPLE BY 1h
# ), with_avg AS (
#   SELECT timestamp, cnt,
#          avg(cnt) OVER (ORDER BY timestamp
#                         ROWS BETWEEN 24 PRECEDING AND CURRENT ROW) AS avg_24h
#   FROM hourly
# )
# SELECT timestamp, cnt, avg_24h
# FROM with_avg
# WHERE cnt > 3 * avg_24h;

echo "QuestDB running at http://localhost:9000"
echo "  PostgreSQL: localhost:8812"
echo "  ILP: localhost:9009"
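The anomaly query above flags hours whose event count is more than three times a rolling average. The same logic can be sketched in plain Python to make the arithmetic easier to follow. Note one deliberate difference, stated as an assumption of this sketch: the average here covers only the preceding buckets and excludes the current one, so a spike does not dilute its own baseline. `find_spikes` and its parameters are hypothetical names.

```python
from collections import deque


def find_spikes(hourly_counts, window=24, factor=3.0):
    """Return (index, count, trailing_avg) for buckets above factor x the trailing average."""
    recent = deque(maxlen=window)   # holds at most `window` previous buckets
    spikes = []
    for i, cnt in enumerate(hourly_counts):
        if recent:                  # the first bucket has no history to compare against
            avg = sum(recent) / len(recent)
            if cnt > factor * avg:
                spikes.append((i, cnt, avg))
        recent.append(cnt)
    return spikes


counts = [10, 12, 11, 9, 50, 10]
spikes = find_spikes(counts)
print(spikes)
```

With the sample counts, only the fifth bucket is reported: its trailing average is (10 + 12 + 11 + 9) / 4 = 10.5, and 50 exceeds 3 x 10.5 = 31.5.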

Python Client and Data Ingestion

# questdb_client.py — QuestDB Python Client
# pip install questdb psycopg2-binary pandas

from questdb.ingress import Sender, IngressError, TimestampNanos
import psycopg2
import time
import random
from datetime import datetime, timedelta
from typing import List, Dict

class QuestDBClient:
    """QuestDB client for security data."""

    def __init__(self, host="localhost", ilp_port=9009, pg_port=8812):
        self.host = host
        self.ilp_port = ilp_port
        self.pg_port = pg_port

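    def ingest_security_events(self, events: List[Dict]):
        """Ingest security events over ILP (the fastest ingestion path)."""
        try:
            # questdb>=2.0 builds the sender from a configuration string;
            # the older Sender(host, port) constructor no longer works there.
            conf = f"tcp::addr={self.host}:{self.ilp_port};"
            with Sender.from_conf(conf) as sender: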
                for event in events:
                    sender.row(
                        "security_events",
                        symbols={
                            "event_type": event["event_type"],
                            "severity": event["severity"],
                            "protocol": event.get("protocol", "TCP"),
                        },
                        columns={
                            "source_ip": event["source_ip"],
                            "destination_ip": event["destination_ip"],
                            "port": event["port"],
                            "message": event["message"],
                            "user_id": event.get("user_id", ""),
                            "risk_score": event["risk_score"],
                        },
                        at=TimestampNanos.now(),
                    )
                sender.flush()
            return len(events)
        except IngressError as e:
            print(f"Ingest error: {e}")
            return 0

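    def query(self, sql: str):
        """Run a query over the PostgreSQL wire protocol and return rows as dicts."""
        # admin/quest are QuestDB's default credentials; change them outside local testing.
        conn = psycopg2.connect(
            host=self.host, port=self.pg_port,
            user="admin", password="quest",
            database="qdb",
        )
        try:
            with conn.cursor() as cur:
                cur.execute(sql)
                columns = [desc[0] for desc in cur.description]
                rows = cur.fetchall()
        finally:
            # Close the connection even if the query raises.
            conn.close()
        return [dict(zip(columns, row)) for row in rows]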

    def security_dashboard(self):
        """Security Dashboard Queries"""
        print(f"\n{'='*55}")
        print(f"Security Dashboard — {datetime.now().strftime('%Y-%m-%d %H:%M')}")
        print(f"{'='*55}")

        # Total Events (24h)
        result = self.query("""
            SELECT count() as total,
                   avg(risk_score) as avg_risk,
                   max(risk_score) as max_risk
            FROM security_events
            WHERE timestamp > dateadd('h', -24, now())
        """)
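        if result and result[0]["total"]:
            r = result[0]
            print(f"\n  Last 24h: {r['total']} events")
            print(f"  Avg Risk: {r['avg_risk']:.1f} | Max Risk: {r['max_risk']:.1f}")
        else:
            # With no rows in the window the aggregates may come back as NULL,
            # which cannot be formatted with :.1f.
            print("\n  Last 24h: 0 events")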

        # By Severity
        result = self.query("""
            SELECT severity, count() as cnt
            FROM security_events
            WHERE timestamp > dateadd('h', -24, now())
            GROUP BY severity
            ORDER BY cnt DESC
        """)
        print(f"\n  By Severity:")
        for r in result:
            print(f"    {r['severity']:>10}: {r['cnt']}")

        # Top Attackers
        result = self.query("""
            SELECT source_ip, count() as cnt, avg(risk_score) as risk
            FROM security_events
            WHERE timestamp > dateadd('h', -24, now())
              AND risk_score > 5
            GROUP BY source_ip
            ORDER BY cnt DESC
            LIMIT 5
        """)
        print(f"\n  Top Suspicious IPs:")
        for r in result:
            print(f"    {r['source_ip']}: {r['cnt']} events (risk: {r['risk']:.1f})")

    def benchmark(self, n=100000):
        """Benchmark Ingestion Speed"""
        events = []
        event_types = ["auth_failure", "port_scan", "brute_force", "sql_injection", "xss"]
        severities = ["low", "medium", "high", "critical"]

        for _ in range(n):
            events.append({
                "event_type": random.choice(event_types),
                "severity": random.choice(severities),
                "source_ip": f"10.0.{random.randint(1,255)}.{random.randint(1,255)}",
                "destination_ip": f"192.168.1.{random.randint(1,50)}",
                "port": random.choice([22, 80, 443, 3306, 5432, 8080]),
                "protocol": random.choice(["TCP", "UDP"]),
                "message": f"Security event detected",
                "risk_score": round(random.uniform(0, 10), 1),
            })

        start = time.perf_counter()
        ingested = self.ingest_security_events(events)
        elapsed = time.perf_counter() - start

        rate = ingested / elapsed
        print(f"\nBenchmark: {ingested:,} events in {elapsed:.2f}s")
        print(f"  Rate: {rate:,.0f} events/second")

# client = QuestDBClient()
# client.benchmark(100000)
# client.security_dashboard()
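The client above queries over the PostgreSQL wire protocol, but the REST API on port 9000 can serve the same purpose when no database driver is available. The following is a minimal sketch, assuming the `/exec` endpoint and its JSON response layout with `columns` and `dataset` keys. `build_exec_url` and `rows_from_exec_response` are hypothetical helper names, and the sample payload is hand-written for illustration rather than captured from a server.

```python
from urllib.parse import urlencode


def build_exec_url(sql: str, host: str = "localhost", port: int = 9000) -> str:
    """Build the URL for a query against QuestDB's REST /exec endpoint."""
    return f"http://{host}:{port}/exec?" + urlencode({"query": sql})


def rows_from_exec_response(payload: dict):
    """Turn the /exec JSON payload into a list of dicts keyed by column name."""
    names = [col["name"] for col in payload.get("columns", [])]
    return [dict(zip(names, row)) for row in payload.get("dataset", [])]


# Hand-written payload in the assumed response shape.
sample_payload = {
    "query": "SELECT severity, count() AS cnt FROM security_events",
    "columns": [{"name": "severity", "type": "SYMBOL"}, {"name": "cnt", "type": "LONG"}],
    "dataset": [["high", 12], ["low", 40]],
    "count": 2,
}

url = build_exec_url("SELECT 1")
rows = rows_from_exec_response(sample_payload)
print(url)
print(rows)

# Against a running instance the two helpers would be combined like this:
#   import json, urllib.request
#   with urllib.request.urlopen(build_exec_url("SELECT 1")) as resp:
#       rows = rows_from_exec_response(json.load(resp))
```

Returning dicts keyed by column name keeps the result shape identical to `QuestDBClient.query`, so dashboard code could use either transport.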

Shift Left Security Pipeline

# === GitHub Actions — Shift Left Security Pipeline ===
# .github/workflows/shift-left-security.yml

name: Shift Left Security
on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

jobs:
  sast:
    name: Static Analysis (SAST)
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Semgrep SAST
        uses: returntocorp/semgrep-action@v1
        with:
          config: >-
            p/security-audit
            p/secrets
            p/owasp-top-ten
            p/python

      - name: Bandit (Python Security)
        run: |
          pip install bandit
          bandit -r src/ -f json -o bandit-report.json || true
          bandit -r src/ -ll  # -ll reports MEDIUM severity and above

      - name: Upload SAST Results
        uses: actions/upload-artifact@v4
        with:
          name: sast-results
          path: bandit-report.json

  sca:
    name: Software Composition Analysis
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Trivy Vulnerability Scan
        uses: aquasecurity/trivy-action@master
        with:
          scan-type: fs
          scan-ref: .
          format: table
          severity: CRITICAL,HIGH

      - name: pip-audit
        run: |
          pip install pip-audit
          pip-audit -r requirements.txt --desc || true

  secrets:
    name: Secret Scanning
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
        with:
          fetch-depth: 0

      - name: Gitleaks Secret Scan
        uses: gitleaks/gitleaks-action@v2
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

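      - name: TruffleHog
        run: |
          # TruffleHog v3 is a Go binary; the PyPI package is the older v2 CLI
          # and has no `filesystem` subcommand, so run the official image instead.
          docker run --rm -v "$PWD:/pwd" trufflesecurity/trufflehog:latest \
            filesystem /pwd --only-verified || true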

  container:
    name: Container Security
    runs-on: ubuntu-latest
    needs: [sast, sca, secrets]
    steps:
      - uses: actions/checkout@v4

      - name: Build Image
        run: docker build -t myapp:test .

      - name: Trivy Container Scan
        uses: aquasecurity/trivy-action@master
        with:
          image-ref: myapp:test
          format: table
          severity: CRITICAL,HIGH
          exit-code: 1

      - name: Dockle Lint
        run: |
          docker run --rm -v /var/run/docker.sock:/var/run/docker.sock \
            goodwithtech/dockle:latest myapp:test

  dast:
    name: Dynamic Testing (DAST)
    runs-on: ubuntu-latest
    needs: container
    steps:
      - uses: actions/checkout@v4

      - name: Start Application
        run: |
          docker compose up -d
          sleep 10

      - name: OWASP ZAP Scan
        uses: zaproxy/action-baseline@v0.10.0
        with:
          target: http://localhost:8080

      - name: Stop Application
        run: docker compose down
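Several steps in the workflow end with `|| true`, so their findings never fail the build. One way to turn a saved report into an explicit gate is a small script that reads `bandit-report.json` and decides the exit code. This is a sketch under the assumption that the report follows Bandit's JSON layout, with a `results` list whose items carry an `issue_severity` field. `severity_counts`, `gate`, and `main` are hypothetical names.

```python
import json
from collections import Counter


def severity_counts(report: dict) -> Counter:
    """Count findings per severity level in a Bandit JSON report."""
    return Counter(
        item.get("issue_severity", "UNDEFINED").upper()
        for item in report.get("results", [])
    )


def gate(report: dict, blocking=("HIGH",)) -> int:
    """Return 1 (fail the job) if any blocking severity is present, else 0."""
    counts = severity_counts(report)
    return 1 if any(counts[level] for level in blocking) else 0


def main(path: str = "bandit-report.json") -> int:
    """Load a report from disk, print a summary, and return the exit code."""
    with open(path, encoding="utf-8") as fh:
        report = json.load(fh)
    for level, n in sorted(severity_counts(report).items()):
        print(f"{level}: {n}")
    return gate(report)


# Hand-written report fragment for illustration.
sample_report = {
    "results": [
        {"issue_severity": "LOW", "test_id": "B101"},
        {"issue_severity": "HIGH", "test_id": "B602"},
        {"issue_severity": "HIGH", "test_id": "B608"},
    ]
}
print(severity_counts(sample_report), gate(sample_report))
```

In a workflow step, calling `sys.exit(main())` after the Bandit run would fail the job only when a blocking severity appears, while the full report is still uploaded as an artifact.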

Best Practices

What is QuestDB?

QuestDB is a very fast open-source time series database that you query with SQL. It supports InfluxDB Line Protocol, the PostgreSQL wire protocol, and a REST API, and it suits IoT, monitoring, financial data, and log analytics.

What is Shift Left Security?

Shift Left Security moves security testing to the start of development instead of waiting until the end. SAST, SCA, and secret scanning run in CI/CD to catch problems before deployment, which lowers the cost of fixing them.

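How much faster is QuestDB than other databases?

QuestDB is reported to be 2 to 10 times faster than InfluxDB, thanks to column-oriented storage, SIMD instructions, and memory-mapped files. It can ingest more than 1.4 million rows per second on a single node while still using ordinary SQL.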

How do you secure a time series database?

Enable authentication on every endpoint, encrypt traffic with TLS, restrict access with a firewall, assign permissions through RBAC, encrypt data at rest, keep audit logs of every query, run security scans, and do not expose the API to the internet.
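As a small first step toward the authentication advice, connection credentials can be read from the environment instead of being hard-coded as `admin` / `quest`. The sketch below assumes hypothetical variable names (`QUESTDB_HOST`, `QUESTDB_PG_PORT`, `QUESTDB_PG_USER`, `QUESTDB_PG_PASSWORD`) and refuses to start when the secrets are missing.

```python
import os


def pg_connection_settings(env=None) -> dict:
    """Build psycopg2 connection kwargs from environment variables (names are assumptions)."""
    env = os.environ if env is None else env
    required = ("QUESTDB_PG_USER", "QUESTDB_PG_PASSWORD")
    missing = [name for name in required if not env.get(name)]
    if missing:
        # Fail loudly rather than silently falling back to default credentials.
        raise RuntimeError("missing required environment variables: " + ", ".join(missing))
    return {
        "host": env.get("QUESTDB_HOST", "localhost"),
        "port": int(env.get("QUESTDB_PG_PORT", "8812")),
        "user": env["QUESTDB_PG_USER"],
        "password": env["QUESTDB_PG_PASSWORD"],
        "dbname": "qdb",
    }


settings = pg_connection_settings(
    {"QUESTDB_PG_USER": "dashboard_reader", "QUESTDB_PG_PASSWORD": "example-only"}
)
print({k: v for k, v in settings.items() if k != "password"})

# Usage with the driver used earlier in this article:
#   conn = psycopg2.connect(**pg_connection_settings())
```

Keeping the password out of the source also means the secret scanners in the pipeline have nothing to flag in the client code.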

Summary

QuestDB is a fast time series database that supports SQL. Pair it with Shift Left Security to catch security problems from the development stage onward, using SAST, SCA, secret scanning, and container scanning in the CI/CD pipeline. Secure the database itself with authentication, TLS, RBAC, and audit logging.
