QuestDB คืออะไร
QuestDB เป็น Time Series Database แบบ Open-source ที่ออกแบบมาเพื่อประสิทธิภาพสูงสุด เขียนด้วย Java และ C++ ใช้ Memory-mapped Files และ SIMD Instructions สำหรับ Query ที่เร็วมาก รองรับ SQL ทำให้เรียนรู้ง่ายไม่ต้องเรียนภาษาใหม่
จุดเด่นคือ Ingestion Speed ที่สูงมาก (หลายล้าน rows/second), SQL Query ที่คุ้นเคย, Built-in Web Console สำหรับ Query และ Visualization, รองรับหลาย Protocol (InfluxDB Line Protocol, PostgreSQL Wire, REST API) และ Partitioning อัตโนมัติตามเวลา
Setup และ SQL Queries
# === QuestDB Setup ===
# Docker
docker run -d --name questdb \
-p 9000:9000 \
-p 9009:9009 \
-p 8812:8812 \
-v questdb_data:/var/lib/questdb \
questdb/questdb:latest
# เข้า Web Console: http://localhost:9000
# === SQL — สร้าง Table ===
CREATE TABLE metrics (
host SYMBOL CAPACITY 256 CACHE,
metric SYMBOL CAPACITY 1024 CACHE,
value DOUBLE,
tags STRING,
timestamp TIMESTAMP
) TIMESTAMP(timestamp) PARTITION BY DAY
WAL
DEDUP UPSERT KEYS(timestamp, host, metric);
CREATE TABLE trades (
symbol SYMBOL CAPACITY 256 CACHE,
side SYMBOL CAPACITY 2 CACHE,
price DOUBLE,
quantity DOUBLE,
timestamp TIMESTAMP
) TIMESTAMP(timestamp) PARTITION BY HOUR;
CREATE TABLE iot_sensors (
device_id SYMBOL CAPACITY 10000 CACHE,
sensor_type SYMBOL CAPACITY 100 CACHE,
value DOUBLE,
battery_pct DOUBLE,
location_lat DOUBLE,
location_lon DOUBLE,
timestamp TIMESTAMP
) TIMESTAMP(timestamp) PARTITION BY DAY;
# === SQL — Query Examples ===
# 1. ค่าเฉลี่ย CPU ทุก 5 นาที
SELECT
host,
avg(value) AS avg_cpu,
max(value) AS max_cpu,
min(value) AS min_cpu,
count() AS samples
FROM metrics
WHERE metric = 'cpu_usage'
AND timestamp > dateadd('h', -1, now())
SAMPLE BY 5m
ALIGN TO CALENDAR;
# 2. Moving Average สำหรับ Stock Price
SELECT
symbol,
timestamp,
price,
avg(price) OVER (
PARTITION BY symbol
ORDER BY timestamp
ROWS BETWEEN 19 PRECEDING AND CURRENT ROW
) AS ma20,
avg(price) OVER (
PARTITION BY symbol
ORDER BY timestamp
ROWS BETWEEN 49 PRECEDING AND CURRENT ROW
) AS ma50
FROM trades
WHERE symbol = 'BTCUSDT'
AND timestamp > dateadd('d', -7, now());
# 3. ASOF JOIN — จับคู่ข้อมูลตามเวลาใกล้สุด
SELECT
t.timestamp, t.symbol, t.price,
m.value AS cpu_at_trade_time
FROM trades t
ASOF JOIN metrics m ON (m.host = 'trading-server-1'
AND m.metric = 'cpu_usage');
# 4. LATEST ON — ค่าล่าสุดของแต่ละ Device
SELECT * FROM iot_sensors
LATEST ON timestamp
PARTITION BY device_id;
# 5. Data Retention — ลบข้อมูลเก่ากว่า 30 วัน
ALTER TABLE metrics DROP PARTITION
WHERE timestamp < dateadd('d', -30, now());
Python — Ingestion Pipeline
# questdb_pipeline.py — Data Ingestion Pipeline สำหรับ QuestDB
# pip install questdb psycopg2-binary
from questdb.ingress import Sender, IngressError, TimestampNanos
import psycopg2
import time
import random
import json
from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import List, Optional
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class QuestDBPipeline:
"""Data Ingestion Pipeline สำหรับ QuestDB"""
def __init__(self, host="localhost", ilp_port=9009, pg_port=8812):
self.host = host
self.ilp_port = ilp_port
self.pg_port = pg_port
self.stats = {"sent": 0, "errors": 0, "start_time": time.time()}
def ingest_metrics(self, metrics: List[dict], batch_size=1000):
"""Ingest Metrics ผ่าน ILP (InfluxDB Line Protocol)"""
try:
with Sender(self.host, self.ilp_port) as sender:
for i, m in enumerate(metrics):
sender.row(
"metrics",
symbols={"host": m["host"], "metric": m["metric"]},
columns={"value": m["value"]},
at=TimestampNanos(int(m["timestamp"] * 1e9)),
)
if (i + 1) % batch_size == 0:
sender.flush()
self.stats["sent"] += batch_size
sender.flush()
self.stats["sent"] += len(metrics) % batch_size
logger.info(f"Ingested {len(metrics)} metrics")
except IngressError as e:
self.stats["errors"] += 1
logger.error(f"Ingestion error: {e}")
raise
def ingest_trades(self, trades: List[dict]):
"""Ingest Trade Data"""
try:
with Sender(self.host, self.ilp_port) as sender:
for trade in trades:
sender.row(
"trades",
symbols={
"symbol": trade["symbol"],
"side": trade["side"],
},
columns={
"price": trade["price"],
"quantity": trade["quantity"],
},
at=TimestampNanos(int(trade["timestamp"] * 1e9)),
)
sender.flush()
self.stats["sent"] += len(trades)
except IngressError as e:
self.stats["errors"] += 1
logger.error(f"Trade ingestion error: {e}")
def query(self, sql):
"""Query ผ่าน PostgreSQL Wire Protocol"""
conn = psycopg2.connect(
host=self.host, port=self.pg_port,
user="admin", password="quest",
database="qdb",
)
conn.autocommit = True
with conn.cursor() as cur:
cur.execute(sql)
if cur.description:
columns = [desc[0] for desc in cur.description]
rows = cur.fetchall()
return [dict(zip(columns, row)) for row in rows]
conn.close()
return []
def health_check(self):
"""ตรวจสอบสถานะ QuestDB"""
try:
result = self.query("SELECT count() FROM metrics")
metrics_count = result[0]["count"] if result else 0
result = self.query("SELECT count() FROM trades")
trades_count = result[0]["count"] if result else 0
elapsed = time.time() - self.stats["start_time"]
rate = self.stats["sent"] / elapsed if elapsed > 0 else 0
print(f"\n=== QuestDB Health ===")
print(f" Metrics rows: {metrics_count:,}")
print(f" Trades rows: {trades_count:,}")
print(f" Ingested: {self.stats['sent']:,}")
print(f" Errors: {self.stats['errors']}")
print(f" Rate: {rate:,.0f} rows/sec")
return True
except Exception as e:
logger.error(f"Health check failed: {e}")
return False
def run_retention(self, table, days=30):
"""Data Retention — ลบข้อมูลเก่า"""
sql = f"""
ALTER TABLE {table} DROP PARTITION
WHERE timestamp < dateadd('d', -{days}, now())
"""
try:
self.query(sql)
logger.info(f"Retention: dropped partitions older than {days} days from {table}")
except Exception as e:
logger.warning(f"Retention failed: {e}")
# === Demo: Generate Sample Data ===
def generate_sample_metrics(n=10000):
"""สร้างข้อมูลตัวอย่าง"""
hosts = [f"server-{i}" for i in range(1, 6)]
metric_types = ["cpu_usage", "memory_usage", "disk_io", "network_rx"]
now = time.time()
metrics = []
for i in range(n):
metrics.append({
"host": random.choice(hosts),
"metric": random.choice(metric_types),
"value": random.uniform(0, 100),
"timestamp": now - (n - i),
})
return metrics
# pipeline = QuestDBPipeline()
# metrics = generate_sample_metrics(10000)
# pipeline.ingest_metrics(metrics)
# pipeline.health_check()
Technical Debt Checklist สำหรับ Time Series DB
# ts_debt_audit.py — Technical Debt Audit สำหรับ Time Series Systems
debt_checklist = {
"Schema Design": [
("ใช้ SYMBOL type สำหรับ Low-cardinality Columns", True),
("มี Timestamp Column และ PARTITION BY ที่เหมาะสม", True),
("ไม่มี Wide Tables (> 50 columns)", False),
("มี Deduplication Strategy", True),
],
"Data Retention": [
("มี Retention Policy อัตโนมัติ", False),
("กำหนดระยะเวลาเก็บข้อมูลสำหรับแต่ละ Table", False),
("มี Downsampling สำหรับข้อมูลเก่า", False),
],
"Query Optimization": [
("ใช้ SAMPLE BY แทน GROUP BY สำหรับ Time Aggregation", True),
("ใช้ LATEST ON สำหรับดึงค่าล่าสุด", True),
("มี Index สำหรับ Columns ที่ Filter บ่อย", False),
("ไม่มี SELECT * ใน Production Code", True),
],
"Ingestion Pipeline": [
("ใช้ ILP (InfluxDB Line Protocol) สำหรับ High-throughput", True),
("มี Batch Ingestion (ไม่ส่งทีละ Row)", True),
("มี Error Handling และ Retry Logic", False),
("มี Backpressure Handling", False),
("มี Dead Letter Queue สำหรับ Failed Records", False),
],
"Monitoring": [
("ติดตาม Ingestion Rate", True),
("ติดตาม Query Latency", False),
("ติดตาม Disk Usage", True),
("มี Alert สำหรับ Ingestion Failures", False),
("มี Dashboard สำหรับ DB Performance", False),
],
"Operations": [
("มี Backup Strategy", False),
("มี Disaster Recovery Plan", False),
("มี Capacity Planning", False),
("Documentation ครบถ้วน", False),
],
}
print("Time Series DB — Technical Debt Audit")
print("=" * 55)
total = 0
done = 0
for category, items in debt_checklist.items():
ready = sum(1 for _, ok in items if ok)
total += len(items)
done += ready
pct = ready / len(items) * 100
print(f"\n[{category}] {ready}/{len(items)} ({pct:.0f}%)")
for desc, ok in items:
mark = "v" if ok else "x"
print(f" [{mark}] {desc}")
overall = done / total * 100
print(f"\nOverall: {done}/{total} ({overall:.0f}%)")
print(f"Status: {'Good' if overall >= 80 else 'Needs Work' if overall >= 50 else 'Critical'}")
Best Practices
- SYMBOL สำหรับ Tags: ใช้ SYMBOL type สำหรับ Columns ที่มี Low Cardinality เช่น host, metric_name เร็วกว่า STRING มาก
- Partitioning: เลือก Partition Size ที่เหมาะสม DAY สำหรับข้อมูลไม่มาก HOUR สำหรับ High-frequency Data
- SAMPLE BY: ใช้ SAMPLE BY แทน GROUP BY + date_trunc สำหรับ Time-based Aggregation เร็วกว่ามาก
- Data Retention: ตั้ง Retention Policy ลบ Partition เก่าอัตโนมัติ ป้องกัน Disk Full
- Batch Ingestion: ส่งข้อมูลเป็น Batch ไม่ใช่ทีละ Row ใช้ ILP สำหรับ High-throughput
- Monitoring: ติดตาม Ingestion Rate, Query Latency, Disk Usage ตั้ง Alert
QuestDB คืออะไร
Time Series Database แบบ Open-source ที่เร็วมาก ใช้ SQL Query ได้ รองรับ InfluxDB Line Protocol, PostgreSQL Wire Protocol, REST API ประสิทธิภาพสูงกว่า InfluxDB และ TimescaleDB ในหลาย Benchmark
Time Series Data คืออะไร
ข้อมูลที่เรียงตามเวลา แต่ละ Record มี Timestamp กำกับ เช่น Server Metrics, Stock Prices, IoT Sensor Data, Application Logs มีลักษณะ Append-only และ Query ตาม Time Range เป็นหลัก
Technical Debt ในระบบ Time Series มีอะไรบ้าง
Schema ไม่เหมาะ ไม่มี Retention Policy ไม่มี Partitioning Query ไม่ Optimize ไม่มี Monitoring Ingestion Pipeline ไม่มี Error Handling ไม่มี Backup และ Documentation ไม่ครบ
QuestDB ติดตั้งอย่างไร
Docker: docker run -p 9000:9000 -p 9009:9009 -p 8812:8812 questdb/questdb Port 9000 Web Console, 9009 ILP, 8812 PostgreSQL Wire เข้า Web Console ที่ localhost:9000 พร้อมใช้งานทันที
สรุป
QuestDB เป็น Time Series Database ที่เร็วและใช้ SQL ได้ เหมาะกับ Metrics, IoT, Financial Data ใช้ SYMBOL type สำหรับ Tags, SAMPLE BY สำหรับ Time Aggregation, LATEST ON สำหรับค่าล่าสุด สำหรับ Technical Debt ต้องมี Retention Policy, Partitioning Strategy, Error Handling ใน Ingestion Pipeline, Monitoring และ Documentation
