QuestDB คืออะไรและเหมาะกับ Time Series อย่างไร
QuestDB เป็น open source time series database ที่เขียนด้วย Java และ C++ ออกแบบมาสำหรับ high-performance time series workloads รองรับ SQL syntax มาตรฐาน มี built-in web console สำหรับ query และ visualize data รองรับ ingestion ผ่าน InfluxDB Line Protocol, PostgreSQL wire protocol และ REST API
ข้อดีของ QuestDB เทียบกับ time series databases อื่นๆ คือ ingestion speed ที่เร็วมาก (หลายล้าน rows/second), SQL support เต็มรูปแบบ (ไม่ต้องเรียนภาษาใหม่), column-oriented storage สำหรับ analytical queries ที่เร็ว, time-based partitioning อัตโนมัติ, built-in functions สำหรับ time series (SAMPLE BY, LATEST ON, ASOF JOIN) และ zero-dependency deployment
Use cases ที่เหมาะกับ QuestDB ได้แก่ IoT sensor data collection, application performance monitoring (APM), financial market data (tick data), DevOps metrics และ logs, real-time analytics dashboards และ fleet/vehicle tracking
Low-Code/No-Code approach สำหรับ time series หมายถึงการใช้ visual tools, pre-built connectors และ drag-and-drop interfaces เพื่อสร้าง data pipelines และ dashboards โดยเขียน code น้อยที่สุด QuestDB เหมาะเพราะ SQL-based (ไม่ต้องเรียนภาษาเฉพาะ), มี web console built-in และ integrate กับ Grafana, Telegraf, Kafka Connect ได้ง่าย
ติดตั้ง QuestDB และเริ่มต้นใช้งาน
วิธีติดตั้งและใช้งาน QuestDB
# === Install QuestDB ===
# Docker (recommended)
docker run -d --name questdb \
-p 9000:9000 \
-p 9009:9009 \
-p 8812:8812 \
-p 9003:9003 \
-v questdb-data:/var/lib/questdb \
questdb/questdb:latest
# Ports:
# 9000 = Web Console + REST API
# 9009 = InfluxDB Line Protocol (ILP)
# 8812 = PostgreSQL wire protocol
# 9003 = Health check
# Docker Compose
# docker-compose.yml
# services:
# questdb:
# image: questdb/questdb:latest
# restart: always
# ports:
# - "9000:9000"
# - "9009:9009"
# - "8812:8812"
# volumes:
# - questdb-data:/var/lib/questdb
# environment:
# QDB_LOG_W_STDOUT_LEVEL: ERROR
# QDB_SHARED_WORKER_COUNT: 4
# QDB_PG_USER: admin
# QDB_PG_PASSWORD: quest_password
# QDB_TELEMETRY_ENABLED: "false"
#
# volumes:
# questdb-data:
# Linux (binary)
curl -LO https://github.com/questdb/questdb/releases/latest/download/questdb-bin.tar.gz
tar xzf questdb-bin.tar.gz
cd questdb-*
./questdb.sh start
# Open the Web Console: http://localhost:9000
# === Create table and insert data ===
# Create the table (via Web Console or psql).
# TIMESTAMP(timestamp) marks the designated timestamp column; WAL enables
# out-of-order-safe writes; DEDUP makes repeated (timestamp, sensor_id)
# pairs upsert instead of duplicating rows.
CREATE TABLE sensors (
timestamp TIMESTAMP,
sensor_id SYMBOL,
location SYMBOL,
temperature DOUBLE,
humidity DOUBLE,
pressure DOUBLE,
battery_level DOUBLE
) TIMESTAMP(timestamp) PARTITION BY DAY
WAL
DEDUP UPSERT KEYS(timestamp, sensor_id);
# Insert a single row (now() = current server timestamp)
INSERT INTO sensors VALUES(
now(), 'sensor_001', 'building_a',
25.5, 60.2, 1013.25, 85.0
);
# Bulk insert: long_sequence(N) generates N rows with a column named x;
# this seeds 10M rows of random data spread across 100 sensors, 1 second apart
INSERT INTO sensors
SELECT
dateadd('s', x::int, '2024-01-01T00:00:00') AS timestamp,
'sensor_' || lpad(cast((x % 100) as string), 3, '0') AS sensor_id,
CASE WHEN x % 3 = 0 THEN 'building_a'
WHEN x % 3 = 1 THEN 'building_b'
ELSE 'building_c' END AS location,
20 + rnd_double() * 15 AS temperature,
40 + rnd_double() * 40 AS humidity,
1000 + rnd_double() * 30 AS pressure,
50 + rnd_double() * 50 AS battery_level
FROM long_sequence(10000000) x;
# Verify
SELECT count() FROM sensors;
SELECT * FROM sensors LIMIT 10;
สร้าง Time Series Pipeline ด้วย SQL
ใช้ QuestDB SQL functions สำหรับ time series analytics
# === QuestDB Time Series SQL ===
# 1. SAMPLE BY — Time-based aggregation
# Average temperature per hour
# (ALIGN TO CALENDAR snaps buckets to wall-clock hour boundaries)
SELECT
timestamp,
sensor_id,
avg(temperature) AS avg_temp,
min(temperature) AS min_temp,
max(temperature) AS max_temp,
count() AS samples
FROM sensors
WHERE timestamp IN '2024-01-01'
SAMPLE BY 1h
ALIGN TO CALENDAR;
# 5-minute intervals with fill
# (FILL(LINEAR) interpolates values for empty buckets)
SELECT
timestamp,
avg(temperature) AS avg_temp,
avg(humidity) AS avg_humidity
FROM sensors
WHERE sensor_id = 'sensor_001'
AND timestamp > dateadd('d', -1, now())
SAMPLE BY 5m
FILL(LINEAR);
# 2. LATEST ON — Get latest value per group
SELECT * FROM sensors
LATEST ON timestamp
PARTITION BY sensor_id;
# 3. ASOF JOIN — Time-based join (nearest earlier timestamp per key)
CREATE TABLE alerts (
timestamp TIMESTAMP,
sensor_id SYMBOL,
alert_type SYMBOL,
severity INT
) TIMESTAMP(timestamp) PARTITION BY DAY WAL;
SELECT s.timestamp, s.sensor_id, s.temperature,
a.alert_type, a.severity
FROM sensors s
ASOF JOIN alerts a ON (sensor_id);
# 4. Window functions for time series
# moving_avg_60 = rolling mean over the current row and 59 preceding rows;
# temp_change = delta vs the previous reading of the same sensor
SELECT
timestamp,
sensor_id,
temperature,
avg(temperature) OVER (
PARTITION BY sensor_id
ORDER BY timestamp
ROWS BETWEEN 59 PRECEDING AND CURRENT ROW
) AS moving_avg_60,
temperature - lag(temperature) OVER (
PARTITION BY sensor_id
ORDER BY timestamp
) AS temp_change
FROM sensors
WHERE sensor_id = 'sensor_001'
AND timestamp > dateadd('h', -6, now())
ORDER BY timestamp;
# 5. Anomaly detection: flag readings more than 3 standard deviations
# from each sensor's 7-day mean (z-score method)
WITH stats AS (
SELECT
sensor_id,
avg(temperature) AS mean_temp,
stddev(temperature) AS std_temp
FROM sensors
WHERE timestamp > dateadd('d', -7, now())
GROUP BY sensor_id
)
SELECT s.timestamp, s.sensor_id, s.temperature,
st.mean_temp,
(s.temperature - st.mean_temp) / st.std_temp AS z_score
FROM sensors s
JOIN stats st ON s.sensor_id = st.sensor_id
WHERE abs((s.temperature - st.mean_temp) / st.std_temp) > 3
AND s.timestamp > dateadd('d', -1, now())
ORDER BY abs((s.temperature - st.mean_temp) / st.std_temp) DESC;
# 6. Downsampling (compact old data)
# NOTE: sensors_daily must be created beforehand with a matching schema;
# non-aggregated columns (sensor_id, location) act as SAMPLE BY group keys
INSERT INTO sensors_daily
SELECT
timestamp,
sensor_id,
location,
avg(temperature) AS temperature,
avg(humidity) AS humidity,
avg(pressure) AS pressure,
min(battery_level) AS battery_level
FROM sensors
WHERE timestamp < dateadd('M', -1, now())
SAMPLE BY 1d;
Low-Code Integration ด้วย Python และ REST API
เชื่อมต่อ QuestDB ด้วย Python
#!/usr/bin/env python3
# questdb_lowcode.py — Low-Code QuestDB Integration
import requests
import json
import socket
import time
import logging
from datetime import datetime, timedelta
from typing import List, Dict, Optional
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("questdb")
class QuestDBClient:
    """Thin client for QuestDB's three network interfaces.

    - REST API (http_port, default 9000): SQL via /exec, CSV via /exp and /imp
    - ILP (ilp_port, default 9009): high-throughput InfluxDB Line Protocol ingestion
    - PostgreSQL wire protocol (pg_port, default 8812): port stored for callers
      that open their own psycopg/asyncpg connection
    """

    def __init__(self, host="localhost", http_port=9000,
                 ilp_port=9009, pg_port=8812, timeout=30):
        self.http_url = f"http://{host}:{http_port}"
        self.ilp_host = host
        self.ilp_port = ilp_port
        self.pg_port = pg_port
        # Per-request timeout (seconds) so a hung server cannot block callers
        # forever. Backward-compatible addition with a default value.
        self.timeout = timeout

    # === REST API (query) ===
    def query(self, sql):
        """Run SQL via /exec and return rows as a list of dicts.

        Returns [] for statements that produce no dataset (DDL/DML).
        Raises requests.HTTPError on a non-2xx response.
        """
        resp = requests.get(
            f"{self.http_url}/exec",
            params={"query": sql, "fmt": "json"},
            timeout=self.timeout,
        )
        resp.raise_for_status()
        data = resp.json()
        if "dataset" in data:
            columns = [c["name"] for c in data["columns"]]
            return [dict(zip(columns, row)) for row in data["dataset"]]
        return []

    def query_csv(self, sql):
        """Run SQL via /exp and return the raw CSV response text."""
        resp = requests.get(
            f"{self.http_url}/exp",
            params={"query": sql},
            timeout=self.timeout,
        )
        resp.raise_for_status()
        return resp.text

    # === InfluxDB Line Protocol (fast ingestion) ===
    def send_ilp(self, lines):
        """Send one ILP line (str) or many (list of str) over TCP.

        Each line is newline-terminated if it is not already. Note that ILP
        over TCP is fire-and-forget: QuestDB reports malformed lines only in
        its server log, not to this client.
        """
        if isinstance(lines, str):
            lines = [lines]
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect((self.ilp_host, self.ilp_port))
            for line in lines:
                if not line.endswith("\n"):
                    line += "\n"
                sock.sendall(line.encode())
        finally:
            # Close even when connect()/sendall() raises, so sockets never leak.
            sock.close()

    def send_metrics(self, table, tags, fields, timestamp=None):
        """Format and send a single ILP line: table,tag=v,... field=v,... ts

        Integer field values get the ILP 'i' suffix; other numbers are sent
        as-is. `timestamp` is nanoseconds since epoch (defaults to now).
        NOTE(review): tag/field values are not escaped — avoid spaces, commas
        and '=' in them; bools are ints in Python and would serialize as
        'Truei'/'Falsei', so pass ints/floats only.
        """
        tag_str = ",".join(f"{k}={v}" for k, v in tags.items())
        field_str = ",".join(
            f"{k}={v}i" if isinstance(v, int) else f"{k}={v}"
            for k, v in fields.items()
        )
        ts = timestamp or int(time.time() * 1e9)
        line = f"{table},{tag_str} {field_str} {ts}"
        self.send_ilp(line)

    # === REST API (import CSV) ===
    def import_csv(self, table, csv_data, overwrite=False):
        """Upload CSV via /imp, partitioning by DAY on the 'timestamp' column.

        `csv_data` may be a str, bytes, or file-like object accepted by
        requests' multipart upload.
        """
        resp = requests.post(
            f"{self.http_url}/imp",
            params={
                "name": table,
                "overwrite": str(overwrite).lower(),
                "timestamp": "timestamp",
                "partitionBy": "DAY",
            },
            files={"data": ("data.csv", csv_data)},
            timeout=self.timeout,
        )
        resp.raise_for_status()
        return resp.json()

    # === Health Check ===
    def health(self):
        """Return True when GET /health answers 200 within 5 seconds."""
        try:
            resp = requests.get(f"{self.http_url}/health", timeout=5)
            return resp.status_code == 200
        except Exception:
            # Any network failure counts as "unhealthy" rather than raising.
            return False
class TimeSeriesPipeline:
    """Builds and runs common time-series SQL through a QuestDBClient.

    NOTE(review): table/column names are interpolated directly into SQL
    strings; they must come from trusted code, never from user input.
    """

    def __init__(self, client: "QuestDBClient"):
        # Annotation is quoted so this class does not require QuestDBClient
        # to be defined at class-creation time.
        self.client = client

    def create_table(self, name, columns, partition_by="DAY"):
        """Create a WAL table if absent.

        `columns` is a list of {'name': ..., 'type': ...} dicts. The
        designated timestamp is the first column of type TIMESTAMP, falling
        back to a column literally named 'timestamp'.
        """
        col_defs = ", ".join(f"{c['name']} {c['type']}" for c in columns)
        ts_col = next((c["name"] for c in columns if c["type"] == "TIMESTAMP"), "timestamp")
        sql = f"""
        CREATE TABLE IF NOT EXISTS {name} (
            {col_defs}
        ) TIMESTAMP({ts_col}) PARTITION BY {partition_by} WAL;
        """
        self.client.query(sql)
        logger.info(f"Table created: {name}")

    def get_latest(self, table, group_by=None):
        """Latest row per `group_by` key (LATEST ON), or the newest single row."""
        if group_by:
            sql = f"SELECT * FROM {table} LATEST ON timestamp PARTITION BY {group_by}"
        else:
            sql = f"SELECT * FROM {table} ORDER BY timestamp DESC LIMIT 1"
        return self.client.query(sql)

    def get_aggregated(self, table, interval="1h", metrics=None,
                       where=None, fill="LINEAR"):
        """SAMPLE BY aggregation; `metrics` are SQL snippets like 'avg(x) AS x'.

        `metrics` defaults to None (not a mutable list) and falls back to a
        single average-temperature metric.
        """
        if metrics is None:
            metrics = ["avg(temperature) AS avg_temp"]
        metrics_str = ", ".join(metrics)
        sql = f"SELECT timestamp, {metrics_str} FROM {table}"
        if where:
            sql += f" WHERE {where}"
        sql += f" SAMPLE BY {interval} FILL({fill})"
        return self.client.query(sql)

    def detect_anomalies(self, table, column, threshold_stddev=3, hours=24):
        """Rows whose z-score vs the trailing `hours` window exceeds the threshold.

        Uses an explicit CROSS JOIN against the single-row stats CTE instead
        of an implicit comma join (clearer intent, same result).
        """
        sql = f"""
        WITH stats AS (
            SELECT avg({column}) AS mean_val, stddev({column}) AS std_val
            FROM {table}
            WHERE timestamp > dateadd('h', -{hours}, now())
        )
        SELECT timestamp, {column},
               ({column} - stats.mean_val) / stats.std_val AS z_score
        FROM {table} CROSS JOIN stats
        WHERE timestamp > dateadd('h', -{hours}, now())
          AND abs(({column} - stats.mean_val) / stats.std_val) > {threshold_stddev}
        ORDER BY timestamp DESC
        """
        return self.client.query(sql)

    def export_csv(self, query, output_file):
        """Run `query` through /exp and write the CSV text to `output_file`."""
        csv_data = self.client.query_csv(query)
        with open(output_file, "w") as f:
            f.write(csv_data)
        logger.info(f"Exported to {output_file}")
# client = QuestDBClient()
# pipeline = TimeSeriesPipeline(client)
#
# # Send metrics
# client.send_metrics("sensors", {"sensor_id": "s001", "location": "lab"},
# {"temperature": 25.5, "humidity": 60.2})
#
# # Query
# data = pipeline.get_aggregated("sensors", interval="5m",
# metrics=["avg(temperature) AS temp", "avg(humidity) AS hum"])
# print(data)
No-Code Dashboard ด้วย Grafana
สร้าง dashboard โดยไม่ต้องเขียน code
# === Grafana + QuestDB Setup ===
# Docker Compose
# docker-compose.yml
# services:
# questdb:
# image: questdb/questdb:latest
# ports: ["9000:9000", "9009:9009", "8812:8812"]
# volumes: ["questdb-data:/var/lib/questdb"]
#
# grafana:
# image: grafana/grafana:latest
# ports: ["3000:3000"]
# volumes: ["grafana-data:/var/lib/grafana"]
# environment:
# GF_SECURITY_ADMIN_PASSWORD: admin
# depends_on: [questdb]
#
# telegraf:
# image: telegraf:latest
# volumes:
# - ./telegraf.conf:/etc/telegraf/telegraf.conf:ro
# depends_on: [questdb]
#
# volumes:
# questdb-data:
# grafana-data:
# === Grafana Data Source Configuration ===
# 1. Open Grafana: http://localhost:3000
# 2. Configuration > Data Sources > Add
# 3. Select "PostgreSQL" (QuestDB speaks the PostgreSQL wire protocol)
# 4. Settings:
# Host: questdb:8812
# Database: qdb
# User: admin
# Password: quest
#   (admin/quest are QuestDB's defaults; if QDB_PG_USER/QDB_PG_PASSWORD are
#    overridden — as in the standalone compose example earlier — use those)
# TLS/SSL Mode: disable
# Version: 12+
# 5. Save & Test
# === Grafana Dashboard JSON (provisioning) ===
# /etc/grafana/provisioning/dashboards/questdb.json
# {
# "dashboard": {
# "title": "QuestDB IoT Dashboard",
# "panels": [
# {
# "title": "Temperature Over Time",
# "type": "timeseries",
# "datasource": "QuestDB",
# "targets": [{
# "rawSql": "SELECT timestamp AS time, avg(temperature) AS temperature FROM sensors WHERE $__timeFilter(timestamp) SAMPLE BY $__interval FILL(LINEAR)",
# "format": "time_series"
# }]
# },
# {
# "title": "Current Sensor Status",
# "type": "stat",
# "datasource": "QuestDB",
# "targets": [{
# "rawSql": "SELECT count(DISTINCT sensor_id) AS sensors, avg(temperature) AS avg_temp, avg(humidity) AS avg_humidity FROM sensors WHERE timestamp > dateadd('m', -5, now())",
# "format": "table"
# }]
# }
# ]
# }
# }
# === Telegraf Configuration ===
# telegraf.conf — ships host metrics to QuestDB over ILP (port 9009)
# [agent]
# interval = "10s"
# flush_interval = "10s"
#
# [[inputs.cpu]]
# percpu = true
# totalcpu = true
#
# [[inputs.mem]]
#
# [[inputs.disk]]
# ignore_fs = ["tmpfs", "devtmpfs"]
#
# [[inputs.net]]
#
# [[inputs.docker]]
# endpoint = "unix:///var/run/docker.sock"
#
# [[outputs.socket_writer]]
# address = "tcp://questdb:9009"
# data_format = "influx"
# === Useful Grafana Queries for QuestDB ===
# Real-time sensor overview
# SELECT timestamp AS time, sensor_id, temperature, humidity
# FROM sensors
# WHERE $__timeFilter(timestamp)
# AND sensor_id IN ($sensor)
# ORDER BY timestamp
# Hourly aggregation
# SELECT timestamp AS time,
# avg(temperature) AS avg_temp,
# max(temperature) AS max_temp,
# min(temperature) AS min_temp
# FROM sensors
# WHERE $__timeFilter(timestamp)
# SAMPLE BY 1h
# FILL(PREV)
# Alert query (anomaly)
# SELECT timestamp AS time, sensor_id, temperature
# FROM sensors
# WHERE timestamp > dateadd('h', -1, now())
# AND temperature > 35
# ORDER BY timestamp DESC
echo "Grafana + QuestDB dashboard configured"
Performance Tuning และ Production Setup
Optimize QuestDB สำหรับ production
# === QuestDB Performance Tuning ===
# server.conf (/var/lib/questdb/conf/server.conf)
# === Memory Settings ===
# Shared worker count (= CPU cores - 1)
# shared.worker.count=7
# Page frame memory for queries
# cairo.sql.page.frame.max.rows=1000000
# === Writer Settings ===
# WAL (Write-Ahead Log) settings
# cairo.wal.enabled.default=true
# cairo.wal.segment.rollover.size=2097152
# Commit lag (batch writes)
# cairo.max.uncommitted.rows=500000
# cairo.commit.lag=10000000 # NOTE(review): intended as ~10 s; confirm the unit (ms vs us) for your QuestDB version — newer versions replace this with cairo.o3.max.lag
# === ILP (InfluxDB Line Protocol) Settings ===
# line.tcp.worker.count=4
# line.tcp.msg.buffer.size=32768
# line.tcp.maintenance.job.interval=1000
# === HTTP Settings ===
# http.worker.count=4
# http.min.worker.count=2
# === Partition Settings ===
# ใช้ partition size ที่เหมาะกับ data volume
# - DAY: สำหรับ high-frequency data (>1M rows/day)
# - MONTH: สำหรับ medium data
# - YEAR: สำหรับ low-frequency data
# เปลี่ยน partition strategy
# ALTER TABLE sensors SET PARAM maxUncommittedRows = 500000;
# ALTER TABLE sensors SET PARAM o3MaxLag = 10000000;
# === Benchmark Script ===
#!/usr/bin/env python3
# benchmark_questdb.py
import socket
import time
import random
def benchmark_ingestion(host="localhost", port=9009, count=1000000):
    """Stream `count` synthetic sensor rows to QuestDB over ILP and print the rate.

    Bug fix vs. the original: InfluxDB Line Protocol is whitespace- and
    comma-sensitive. The required shape is
        measurement,tag=v[,tag=v] field=v[,field=v] timestamp\\n
    with NO space after the measurement's comma and NO space between fields.
    The old format ("bench, sensor_id=..." / "temperature=..., humidity=...")
    produced lines the server would reject.
    """
    start = time.time()  # timed before connect so a failure still leaves it defined
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.connect((host, port))
        batch = []
        batch_size = 10000  # flush every 10k lines to amortize syscall cost
        for i in range(count):
            sensor_id = f"sensor_{i % 100:03d}"
            temp = 20 + random.random() * 15
            humidity = 40 + random.random() * 40
            # Nanosecond timestamps, 1 ms apart, so rows arrive time-ordered.
            ts = int((time.time() + i * 0.001) * 1e9)
            line = f"bench,sensor_id={sensor_id} temperature={temp:.2f},humidity={humidity:.2f} {ts}\n"
            batch.append(line)
            if len(batch) >= batch_size:
                sock.sendall("".join(batch).encode())
                batch = []
        if batch:
            sock.sendall("".join(batch).encode())
    finally:
        # Release the socket even if the server drops the connection mid-run.
        sock.close()
    elapsed = time.time() - start
    rate = count / elapsed
    print(f"Ingested {count:,} rows in {elapsed:.1f}s ({rate:,.0f} rows/sec)")
# benchmark_ingestion(count=5000000)
# === Production Checklist ===
# 1. Set appropriate partition size for data volume
# 2. Configure WAL commit lag for write batching
# 3. Set worker counts based on CPU cores
# 4. Enable data deduplication if needed
# 5. Set up retention policy (DROP PARTITION)
# 6. Monitor disk space (QuestDB pre-allocates)
# 7. Use ILP for high-throughput ingestion
# 8. Enable SSL for PostgreSQL wire protocol
# 9. Set up backup schedule
# 10. Monitor with Prometheus metrics (:9003/metrics)
# === Retention Policy ===
# Delete old partitions
# ALTER TABLE sensors DROP PARTITION LIST '2023-01-01', '2023-01-02';
# Automated retention (run daily via cron)
# NOTE: QuestDB does not support row-level DELETE — drop whole partitions instead:
# ALTER TABLE sensors DROP PARTITION WHERE timestamp < dateadd('M', -6, now());
echo "QuestDB production tuning complete"
FAQ คำถามที่พบบ่อย
Q: QuestDB กับ InfluxDB ต่างกันอย่างไร?
A: QuestDB ใช้ SQL มาตรฐาน (PostgreSQL compatible) ส่วน InfluxDB ใช้ Flux หรือ InfluxQL ด้าน performance QuestDB มี ingestion speed ที่เร็วกว่า (ทดสอบได้หลายล้าน rows/sec) ขณะที่ InfluxDB มี ecosystem ที่ใหญ่กว่า (Telegraf, Kapacitor) และมี cloud service (InfluxDB Cloud) ให้ใช้ ส่วน QuestDB เป็น single binary จึง deploy ง่ายกว่า และสำหรับ developers ที่คุ้นเคย SQL อยู่แล้ว QuestDB เรียนรู้ง่ายกว่ามาก
Q: QuestDB รองรับ data retention อัตโนมัติไหม?
A: QuestDB ไม่มี built-in automatic retention policy เหมือน InfluxDB ต้องจัดการเองด้วย cron job ที่รัน ALTER TABLE DROP PARTITION เป็นประจำ (QuestDB ไม่รองรับ row-level DELETE จึงต้องลบเป็นรายพาร์ทิชัน) ข้อดีคือ flexible กว่า กำหนด retention ต่าง table ได้ ข้อเสียคือต้อง setup เอง แนะนำสร้าง script ที่รันทุกวันผ่าน systemd timer หรือ cron
Q: ใช้ QuestDB กับ Grafana ได้ดีแค่ไหน?
A: ดีมาก QuestDB รองรับ PostgreSQL wire protocol ทำให้ใช้ PostgreSQL data source ใน Grafana ได้เลย รองรับ Grafana variables ($__timeFilter, $__interval), time series format และ table format ข้อจำกัดคือ macros บางตัวของ PostgreSQL plugin อาจไม่ทำงาน 100% กับ QuestDB SQL extensions (เช่น SAMPLE BY) แต่ใช้ raw SQL query แก้ปัญหาได้
Q: QuestDB เหมาะกับ production scale ไหม?
A: เหมาะสำหรับ single-node deployment ที่มี data ไม่เกิน 10-50TB QuestDB ไม่มี built-in clustering (ณ ปัจจุบัน) ถ้าต้องการ horizontal scaling ต้อง shard ที่ application layer ข้อดีคือ single node performance สูงมาก รองรับ millions of rows/sec ingestion สำหรับ enterprise scale ที่ต้องการ HA และ clustering ตัวเลือกอื่นคือ TimescaleDB (PostgreSQL extension) หรือ ClickHouse
