What DuckDB Is and Why It Fits a Home Lab
DuckDB is an in-process analytical database engine designed for OLAP (Online Analytical Processing) workloads. It runs embedded, with no separate server to install, much like SQLite, but it is optimized for analytics rather than transactions.
DuckDB's advantages for a home lab: zero configuration with no server to set up, a small memory footprint (just a few MB to start), the ability to read Parquet, CSV, and JSON files directly without importing them, very fast queries thanks to a vectorized execution engine, broad standard SQL support, and easy integration with Python, R, and Node.js.
Home lab use cases include analyzing server log files, network traffic analysis, IoT sensor data analytics, personal finance tracking, media library metadata analysis, and system performance monitoring.
For analytical queries, DuckDB is often 10-100x faster than pandas, especially for GROUP BY, JOIN, and window functions on large datasets, because it uses columnar storage, vectorized processing, and parallel execution.
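To make that concrete, here is a minimal sketch (synthetic data, made-up column names) that runs the same hourly aggregation once with a pandas groupby and once with DuckDB querying the DataFrame in place; exact timings depend on hardware, but the gap tends to widen as the data grows.
#!/usr/bin/env python3
# pandas_vs_duckdb.py - the same aggregation via pandas and via DuckDB (illustrative sketch)
import time
import duckdb
import numpy as np
import pandas as pd
n = 2_000_000  # synthetic sample size
df = pd.DataFrame({
    "ts": pd.date_range("2024-01-01", periods=n, freq="s"),
    "server": np.random.choice(["web1", "web2", "db1"], n),
    "cpu": np.random.uniform(5, 95, n),
})
t0 = time.time()
pd_result = df.groupby([df["ts"].dt.floor("h"), "server"])["cpu"].mean()
print(f"pandas groupby: {time.time() - t0:.2f}s ({len(pd_result)} groups)")
t0 = time.time()
duck_result = duckdb.sql("""
    SELECT date_trunc('hour', ts) AS hour, server, AVG(cpu) AS avg_cpu
    FROM df
    GROUP BY 1, 2
""").df()
print(f"DuckDB on the same DataFrame: {time.time() - t0:.2f}s ({len(duck_result)} rows)")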
Installing DuckDB and Getting Started
How to install and use DuckDB
# === Install DuckDB ===
# Python
pip install duckdb
# Node.js
npm install duckdb
# CLI (standalone)
# Linux
wget https://github.com/duckdb/duckdb/releases/latest/download/duckdb_cli-linux-amd64.zip
unzip duckdb_cli-linux-amd64.zip
sudo mv duckdb /usr/local/bin/
# macOS
brew install duckdb
# === Getting started with the CLI ===
duckdb homelab.db
-- Create a table
CREATE TABLE server_logs (
timestamp TIMESTAMP,
server_name VARCHAR,
cpu_percent DOUBLE,
memory_percent DOUBLE,
disk_io_read BIGINT,
disk_io_write BIGINT,
network_rx BIGINT,
network_tx BIGINT,
request_count INTEGER,
error_count INTEGER
);
-- Import CSV
COPY server_logs FROM 'logs/server_metrics_*.csv' (HEADER, DELIMITER ',');
-- Or read CSV files directly (no import needed)
SELECT * FROM read_csv_auto('logs/server_metrics_2024*.csv') LIMIT 10;
-- Read Parquet
SELECT * FROM read_parquet('data/events/*.parquet') LIMIT 10;
-- Read JSON
SELECT * FROM read_json_auto('logs/app_logs.json') LIMIT 10;
-- === Basic analytics queries ===
-- Server CPU usage per hour
SELECT
date_trunc('hour', timestamp) AS hour,
server_name,
AVG(cpu_percent) AS avg_cpu,
MAX(cpu_percent) AS max_cpu,
PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY cpu_percent) AS p95_cpu
FROM server_logs
WHERE timestamp >= CURRENT_DATE - INTERVAL '7 days'
GROUP BY 1, 2
ORDER BY 1, 2;
-- Top memory consumers
SELECT
server_name,
AVG(memory_percent) AS avg_memory,
MAX(memory_percent) AS max_memory,
COUNT(*) AS sample_count
FROM server_logs
GROUP BY server_name
ORDER BY avg_memory DESC
LIMIT 10;
-- Anomaly detection (values more than 2 standard deviations above the mean)
WITH stats AS (
SELECT
server_name,
AVG(cpu_percent) AS mean_cpu,
STDDEV(cpu_percent) AS std_cpu
FROM server_logs
GROUP BY server_name
)
SELECT l.timestamp, l.server_name, l.cpu_percent,
s.mean_cpu, s.std_cpu
FROM server_logs l
JOIN stats s ON l.server_name = s.server_name
WHERE l.cpu_percent > s.mean_cpu + 2 * s.std_cpu
ORDER BY l.timestamp DESC
LIMIT 20;
-- Export results
COPY (
SELECT date_trunc('day', timestamp) AS day,
COUNT(*) AS total_requests,
SUM(error_count) AS total_errors
FROM server_logs
GROUP BY 1
ORDER BY 1
) TO 'reports/daily_summary.parquet' (FORMAT PARQUET);
.quit
Building an Analytics Pipeline with DuckDB
A pipeline for automated analytics
#!/usr/bin/env python3
# analytics_pipeline.py — DuckDB Analytics Pipeline for Home Lab
import duckdb
import json
import logging
from datetime import datetime, timedelta
from pathlib import Path
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("analytics")
class HomeLabAnalytics:
def __init__(self, db_path="homelab.db"):
self.con = duckdb.connect(db_path)
self._setup_tables()
def _setup_tables(self):
self.con.execute("""
CREATE TABLE IF NOT EXISTS server_metrics (
timestamp TIMESTAMP,
server VARCHAR,
cpu DOUBLE,
memory DOUBLE,
disk_read BIGINT,
disk_write BIGINT,
net_rx BIGINT,
net_tx BIGINT
);
CREATE TABLE IF NOT EXISTS docker_stats (
timestamp TIMESTAMP,
container VARCHAR,
cpu_percent DOUBLE,
memory_mb DOUBLE,
net_rx_mb DOUBLE,
net_tx_mb DOUBLE,
block_read_mb DOUBLE,
block_write_mb DOUBLE
);
CREATE TABLE IF NOT EXISTS network_logs (
timestamp TIMESTAMP,
source_ip VARCHAR,
dest_ip VARCHAR,
dest_port INTEGER,
protocol VARCHAR,
bytes_transferred BIGINT,
action VARCHAR
);
""")
def ingest_csv(self, table, csv_pattern):
count_before = self.con.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
self.con.execute(f"""
INSERT INTO {table}
SELECT * FROM read_csv_auto('{csv_pattern}')
""")
count_after = self.con.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
added = count_after - count_before
logger.info(f"Ingested {added} rows into {table}")
return added
def ingest_parquet(self, table, parquet_pattern):
self.con.execute(f"""
INSERT INTO {table}
SELECT * FROM read_parquet('{parquet_pattern}')
""")
logger.info(f"Ingested parquet data into {table}")
def server_health_report(self, hours=24):
result = self.con.execute(f"""
SELECT
server,
COUNT(*) AS samples,
ROUND(AVG(cpu), 1) AS avg_cpu,
ROUND(MAX(cpu), 1) AS max_cpu,
ROUND(PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY cpu), 1) AS p95_cpu,
ROUND(AVG(memory), 1) AS avg_memory,
ROUND(MAX(memory), 1) AS max_memory,
ROUND(SUM(disk_read) / 1024.0 / 1024.0, 1) AS total_read_mb,
ROUND(SUM(disk_write) / 1024.0 / 1024.0, 1) AS total_write_mb
FROM server_metrics
WHERE timestamp >= NOW() - INTERVAL '{hours} hours'
GROUP BY server
ORDER BY avg_cpu DESC
""").fetchdf()
return result
def top_containers(self, hours=24, limit=10):
result = self.con.execute(f"""
SELECT
container,
ROUND(AVG(cpu_percent), 2) AS avg_cpu,
ROUND(MAX(cpu_percent), 2) AS max_cpu,
ROUND(AVG(memory_mb), 1) AS avg_memory_mb,
ROUND(MAX(memory_mb), 1) AS max_memory_mb,
ROUND(SUM(net_rx_mb), 1) AS total_net_rx_mb,
ROUND(SUM(net_tx_mb), 1) AS total_net_tx_mb
FROM docker_stats
WHERE timestamp >= NOW() - INTERVAL '{hours} hours'
GROUP BY container
ORDER BY avg_cpu DESC
LIMIT {limit}
""").fetchdf()
return result
def network_analysis(self, hours=24):
result = self.con.execute(f"""
SELECT
dest_port,
protocol,
COUNT(*) AS connection_count,
COUNT(DISTINCT source_ip) AS unique_sources,
ROUND(SUM(bytes_transferred) / 1024.0 / 1024.0, 1) AS total_mb,
SUM(CASE WHEN action = 'block' THEN 1 ELSE 0 END) AS blocked_count
FROM network_logs
WHERE timestamp >= NOW() - INTERVAL '{hours} hours'
GROUP BY dest_port, protocol
ORDER BY connection_count DESC
LIMIT 20
""").fetchdf()
return result
def detect_anomalies(self, metric="cpu", threshold_stddev=2):
result = self.con.execute(f"""
WITH stats AS (
SELECT
server,
AVG({metric}) AS mean_val,
STDDEV({metric}) AS std_val
FROM server_metrics
WHERE timestamp >= NOW() - INTERVAL '7 days'
GROUP BY server
)
SELECT
m.timestamp, m.server, m.{metric} AS value,
ROUND(s.mean_val, 1) AS mean,
ROUND(s.std_val, 1) AS stddev,
ROUND((m.{metric} - s.mean_val) / NULLIF(s.std_val, 0), 2) AS z_score
FROM server_metrics m
JOIN stats s ON m.server = s.server
WHERE m.{metric} > s.mean_val + {threshold_stddev} * s.std_val
AND m.timestamp >= NOW() - INTERVAL '24 hours'
ORDER BY m.timestamp DESC
LIMIT 50
""").fetchdf()
return result
def export_report(self, output_dir="reports"):
Path(output_dir).mkdir(exist_ok=True)
date_str = datetime.now().strftime("%Y%m%d")
health = self.server_health_report()
health.to_csv(f"{output_dir}/server_health_{date_str}.csv", index=False)
containers = self.top_containers()
containers.to_csv(f"{output_dir}/top_containers_{date_str}.csv", index=False)
anomalies = self.detect_anomalies()
anomalies.to_csv(f"{output_dir}/anomalies_{date_str}.csv", index=False)
logger.info(f"Reports exported to {output_dir}/")
def close(self):
self.con.close()
# analytics = HomeLabAnalytics()
# analytics.ingest_csv("server_metrics", "logs/metrics_*.csv")
# print(analytics.server_health_report())
# print(analytics.top_containers())
# analytics.export_report()
# analytics.close()
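To automate the pipeline, schedule a small entrypoint from cron or a systemd timer. A minimal sketch, assuming the class above is saved as analytics_pipeline.py and that collectors drop CSVs matching logs/metrics_*.csv (both names are placeholders):
#!/usr/bin/env python3
# daily_run.py - scheduled entrypoint for the analytics pipeline (paths and globs are placeholders)
from analytics_pipeline import HomeLabAnalytics
def main():
    analytics = HomeLabAnalytics("homelab.db")
    try:
        # Ingest whatever new metrics the collectors wrote, then refresh the daily reports
        analytics.ingest_csv("server_metrics", "logs/metrics_*.csv")
        analytics.export_report("reports")
    finally:
        analytics.close()
if __name__ == "__main__":
    main()
A crontab entry such as 0 6 * * * /usr/bin/python3 /opt/homelab/daily_run.py then refreshes the CSV reports every morning.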
DuckDB with the Python Data Stack
Using DuckDB alongside pandas, Polars, and Arrow
#!/usr/bin/env python3
# duckdb_python.py — DuckDB with Python Data Stack
import duckdb
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
# === DuckDB + Pandas ===
def pandas_integration():
# Create a sample DataFrame
df = pd.DataFrame({
"date": pd.date_range("2024-01-01", periods=1000000, freq="s"),
"server": np.random.choice(["web1", "web2", "db1", "cache1"], 1000000),
"cpu": np.random.uniform(5, 95, 1000000),
"memory": np.random.uniform(20, 90, 1000000),
"requests": np.random.poisson(100, 1000000),
})
# Query the DataFrame with DuckDB SQL (much faster than a pandas groupby)
result = duckdb.sql("""
SELECT
server,
date_trunc('hour', date) AS hour,
AVG(cpu) AS avg_cpu,
PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY cpu) AS p99_cpu,
SUM(requests) AS total_requests,
COUNT(*) AS samples
FROM df
GROUP BY server, date_trunc('hour', date)
ORDER BY hour, server
""").df()
print(f"Aggregated to {len(result)} rows")
return result
# === DuckDB + Parquet Files ===
def parquet_analytics():
con = duckdb.connect()
# Query Parquet files directly (no import needed)
result = con.execute("""
SELECT
date_trunc('day', timestamp) AS day,
COUNT(*) AS events,
COUNT(DISTINCT user_id) AS unique_users,
AVG(duration_ms) AS avg_duration
FROM read_parquet('data/events/**/*.parquet')
WHERE timestamp >= '2024-01-01'
GROUP BY 1
ORDER BY 1
""").fetchdf()
return result
# === DuckDB Window Functions ===
def window_functions_demo():
con = duckdb.connect()
# Create sample data
con.execute("""
CREATE TABLE metrics AS
SELECT
'2024-01-01'::DATE + (i // 24)::INTEGER AS date,
i % 24 AS hour,
CASE WHEN i % 3 = 0 THEN 'web1'
WHEN i % 3 = 1 THEN 'web2'
ELSE 'db1' END AS server,
50 + random() * 40 AS cpu,
30 + random() * 50 AS memory
FROM generate_series(0, 10000) AS t(i)
""")
# Moving average
result = con.execute("""
SELECT
date,
hour,
server,
cpu,
ROUND(AVG(cpu) OVER (
PARTITION BY server
ORDER BY date, hour
ROWS BETWEEN 23 PRECEDING AND CURRENT ROW
), 1) AS cpu_24h_avg,
ROUND(cpu - LAG(cpu) OVER (
PARTITION BY server
ORDER BY date, hour
), 1) AS cpu_change,
RANK() OVER (
PARTITION BY date
ORDER BY cpu DESC
) AS cpu_rank
FROM metrics
ORDER BY date DESC, hour DESC
LIMIT 20
""").fetchdf()
return result
# === DuckDB Extensions ===
def extensions_demo():
con = duckdb.connect()
# Install and load extensions
con.execute("INSTALL httpfs; LOAD httpfs;")
con.execute("INSTALL json; LOAD json;")
# Query remote Parquet file (S3 or HTTP)
# result = con.execute("""
# SELECT * FROM read_parquet('https://example.com/data/file.parquet')
# LIMIT 10
# """).fetchdf()
# Query JSON files
# result = con.execute("""
# SELECT
# json_extract_string(data, '$.name') AS name,
# json_extract(data, '$.metrics.cpu')::DOUBLE AS cpu
# FROM read_json_auto('logs/*.json')
# """).fetchdf()
# Full-text search
con.execute("INSTALL fts; LOAD fts;")
con.execute("""
CREATE TABLE logs (id INTEGER, message VARCHAR, level VARCHAR);
INSERT INTO logs VALUES
(1, 'Connection timeout to database server', 'ERROR'),
(2, 'User login successful from 192.168.1.100', 'INFO'),
(3, 'Disk usage exceeded 90 percent threshold', 'WARN'),
(4, 'Database query took 5 seconds to complete', 'WARN');
""")
con.execute("PRAGMA create_fts_index('logs', 'id', 'message');")
result = con.execute("""
SELECT id, message, level, fts_main_logs.match_bm25(id, 'database') AS score
FROM logs
WHERE score IS NOT NULL
ORDER BY score DESC
""").fetchdf()
print(result)
return result
pandas_integration()
# window_functions_demo()
Home Lab Analytics Dashboard
Building a dashboard for home lab monitoring
#!/usr/bin/env python3
# homelab_dashboard.py — Home Lab Analytics Dashboard
import duckdb
import json
from datetime import datetime
from pathlib import Path
class HomeLabDashboard:
def __init__(self, db_path="homelab.db"):
self.con = duckdb.connect(db_path, read_only=True)
def get_overview(self):
try:
result = self.con.execute("""
SELECT
COUNT(DISTINCT server) AS total_servers,
COUNT(*) AS total_samples,
ROUND(AVG(cpu), 1) AS avg_cpu,
ROUND(AVG(memory), 1) AS avg_memory,
MIN(timestamp) AS oldest_data,
MAX(timestamp) AS newest_data
FROM server_metrics
""").fetchone()
return {
"total_servers": result[0],
"total_samples": result[1],
"avg_cpu": result[2],
"avg_memory": result[3],
"data_range": f"{result[4]} to {result[5]}",
}
except Exception:
return {"total_servers": 0, "total_samples": 0, "avg_cpu": 0, "avg_memory": 0}
def generate_html(self, output="homelab_dashboard.html"):
overview = self.get_overview()
html = f"""<!DOCTYPE html>
<html>
<head><meta charset="utf-8"><title>DuckDB Analytics Home Lab Dashboard</title></head>
<body>
<h1>DuckDB Analytics Home Lab Dashboard</h1>
<p>Powered by DuckDB | Updated: {datetime.now().strftime('%Y-%m-%d %H:%M')}</p>
<ul>
<li>Servers: {overview['total_servers']}</li>
<li>Data Points: {overview['total_samples']:,}</li>
<li>Avg CPU: {overview['avg_cpu']}%</li>
<li>Avg Memory: {overview['avg_memory']}%</li>
</ul>
<h2>DuckDB Query Examples</h2>
<table border="1">
<tr><th>Query</th><th>Description</th></tr>
<tr><td><code>SELECT * FROM server_metrics LIMIT 10;</code></td><td>View recent metrics</td></tr>
<tr><td><code>SELECT server, AVG(cpu) FROM server_metrics GROUP BY 1;</code></td><td>Average CPU per server</td></tr>
<tr><td><code>SELECT * FROM read_csv_auto('logs/*.csv');</code></td><td>Query CSV files directly</td></tr>
<tr><td><code>SELECT * FROM read_parquet('data/*.parquet');</code></td><td>Query Parquet files</td></tr>
<tr><td><code>COPY (...) TO 'report.parquet' (FORMAT PARQUET);</code></td><td>Export to Parquet</td></tr>
</table>
<h2>Quick Start Commands</h2>
<table border="1">
<tr><th>Command</th><th>Description</th></tr>
<tr><td><code>duckdb homelab.db</code></td><td>Open database</td></tr>
<tr><td><code>.tables</code></td><td>List tables</td></tr>
<tr><td><code>.schema table_name</code></td><td>Show table schema</td></tr>
<tr><td><code>DESCRIBE table_name;</code></td><td>Describe columns</td></tr>
<tr><td><code>SUMMARIZE table_name;</code></td><td>Statistics summary</td></tr>
</table>
</body>
</html>
"""
Path(output).write_text(html)
return output
# dashboard = HomeLabDashboard()
# dashboard.generate_html()
Performance Tuning and Advanced Features
Techniques for optimizing DuckDB performance
#!/usr/bin/env python3
# duckdb_advanced.py — DuckDB Advanced Features and Tuning
import duckdb
import time
# === Performance Tuning ===
con = duckdb.connect("homelab.db")
# Set memory limit
con.execute("SET memory_limit = '4GB';")
# Set threads
con.execute("SET threads = 4;")
# Enable progress bar
con.execute("SET enable_progress_bar = true;")
# Preserve insertion order (disable for speed)
con.execute("SET preserve_insertion_order = false;")
# === Benchmark Helper ===
def benchmark(name, query, con):
start = time.time()
result = con.execute(query).fetchdf()
elapsed = time.time() - start
print(f"{name}: {elapsed:.3f}s ({len(result)} rows)")
return result
# === Indexing ===
# DuckDB uses ART indexes for point queries
# con.execute("CREATE INDEX idx_server ON server_metrics(server);")
# con.execute("CREATE INDEX idx_timestamp ON server_metrics(timestamp);")
# === Materialized Views (using tables) ===
con.execute("""
CREATE OR REPLACE TABLE daily_summary AS
SELECT
date_trunc('day', timestamp)::DATE AS date,
server,
COUNT(*) AS samples,
ROUND(AVG(cpu), 2) AS avg_cpu,
ROUND(MAX(cpu), 2) AS max_cpu,
ROUND(AVG(memory), 2) AS avg_memory,
ROUND(MAX(memory), 2) AS max_memory,
SUM(disk_read) AS total_disk_read,
SUM(disk_write) AS total_disk_write
FROM server_metrics
GROUP BY 1, 2;
""")
# === Recursive CTEs ===
con.execute("""
-- Generate date series for gap filling
CREATE OR REPLACE VIEW date_range AS
WITH RECURSIVE dates AS (
SELECT '2024-01-01'::DATE AS d
UNION ALL
SELECT d + INTERVAL '1 day' FROM dates WHERE d < '2024-12-31'
)
SELECT d AS date FROM dates;
""")
# === Window Functions Advanced ===
# Exponential moving average approximation
con.execute("""
CREATE OR REPLACE VIEW cpu_trends AS
SELECT
timestamp,
server,
cpu,
AVG(cpu) OVER w AS rolling_avg,
STDDEV(cpu) OVER w AS rolling_std,
cpu - AVG(cpu) OVER w AS deviation,
NTILE(10) OVER (PARTITION BY server ORDER BY cpu) AS decile
FROM server_metrics
WINDOW w AS (
PARTITION BY server
ORDER BY timestamp
ROWS BETWEEN 99 PRECEDING AND CURRENT ROW
);
""")
# === PIVOT / UNPIVOT ===
# Pivot: rows to columns
con.execute("""
CREATE OR REPLACE VIEW server_pivot AS
PIVOT (
SELECT date_trunc('day', timestamp)::DATE AS date,
server, ROUND(AVG(cpu), 1) AS avg_cpu
FROM server_metrics
GROUP BY 1, 2
)
ON server
USING AVG(avg_cpu)
ORDER BY date;
""")
# === User-Defined Functions ===
con.create_function(
"classify_cpu",
lambda x: "critical" if x > 90 else "warning" if x > 70 else "normal",
[duckdb.typing.DOUBLE],
duckdb.typing.VARCHAR,
)
# Use the UDF
# result = con.execute("""
# SELECT server, cpu, classify_cpu(cpu) AS status
# FROM server_metrics
# ORDER BY cpu DESC LIMIT 10
# """).fetchdf()
# === Exporting Data ===
# Parquet (recommended for analytics)
# con.execute("COPY server_metrics TO 'export/metrics.parquet' (FORMAT PARQUET, COMPRESSION ZSTD);")
# CSV
# con.execute("COPY server_metrics TO 'export/metrics.csv' (HEADER, DELIMITER ',');")
# JSON
# con.execute("COPY server_metrics TO 'export/metrics.json' (FORMAT JSON, ARRAY true);")
# Partitioned Parquet
# con.execute("""
# COPY server_metrics TO 'export/partitioned'
# (FORMAT PARQUET, PARTITION_BY (server), OVERWRITE_OR_IGNORE);
# """)
print("DuckDB advanced features configured")
con.close()
FAQ: Frequently Asked Questions
Q: How is DuckDB different from SQLite?
A: SQLite is a row-oriented database designed for OLTP (transactions, point queries); DuckDB is a column-oriented database designed for OLAP (analytics, aggregations). For queries that scan a lot of data (GROUP BY, SUM, AVG), DuckDB is typically 10-100x faster, while SQLite is faster for row-at-a-time INSERTs. Use each for its use case: SQLite for application data, DuckDB for analytics, and they can even work together, as shown below.
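DuckDB's sqlite extension can attach an existing SQLite file, so analytical SQL runs directly over application data with no export step. A minimal sketch, where app.db and its orders table are hypothetical application data:
#!/usr/bin/env python3
# duckdb_over_sqlite.py - analytics over an existing SQLite app database (sketch)
import duckdb
con = duckdb.connect()
con.execute("INSTALL sqlite;")
con.execute("LOAD sqlite;")
con.execute("ATTACH 'app.db' AS app (TYPE sqlite);")  # app.db is a hypothetical SQLite file
top_customers = con.execute("""
    SELECT customer_id, COUNT(*) AS orders, SUM(total) AS revenue
    FROM app.orders
    GROUP BY customer_id
    ORDER BY revenue DESC
    LIMIT 10
""").fetchdf()
print(top_customers)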
Q: How much data can DuckDB handle?
A: DuckDB can work with data larger than RAM via out-of-core processing (spilling to disk); in practice, queries over 100GB+ datasets run on machines with 16GB of RAM. For a home lab holding 10-50GB of data, DuckDB copes comfortably; past roughly 100GB, consider ClickHouse or Apache Spark instead. The sketch below shows the relevant settings.
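When a query does approach the memory limit, setting an explicit cap and a spill directory lets DuckDB page intermediate results to disk instead of failing. A minimal sketch (the Parquet path and column names are placeholders):
#!/usr/bin/env python3
# larger_than_ram.py - out-of-core aggregation settings (sketch)
import duckdb
con = duckdb.connect("homelab.db")
con.execute("SET memory_limit = '8GB';")                  # cap DuckDB's memory usage
con.execute("SET temp_directory = '/tmp/duckdb_spill';")  # where oversized operators spill to disk
daily = con.execute("""
    SELECT server, date_trunc('day', timestamp) AS day, AVG(cpu) AS avg_cpu
    FROM read_parquet('data/metrics/**/*.parquet')
    GROUP BY 1, 2
""").fetchdf()
con.close()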
Q: Is DuckDB suitable for production?
A: DuckDB suits single-user analytical workloads; it is not designed for multi-user concurrent access or high-throughput write workloads. It works well for embedded analytics, data science notebooks, ETL pipelines, reporting, and home lab analytics. For production multi-user analytics, look at ClickHouse, Snowflake, or BigQuery.
Q: How do I back up a DuckDB database?
A: A DuckDB database is a single file (.db), so you can simply copy the file (while the database is closed) or export it as Parquet files with EXPORT DATABASE 'backup_dir' (FORMAT PARQUET). For a home lab, a daily cron job that exports to Parquet is recommended, because Parquet is a standard format readable by practically any tool.
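A minimal backup sketch along those lines, assuming the database file is homelab.db and backups/ is the destination directory:
#!/usr/bin/env python3
# backup_duckdb.py - daily Parquet export of the whole database (sketch)
import duckdb
from datetime import datetime
backup_dir = f"backups/homelab_{datetime.now():%Y%m%d}"
con = duckdb.connect("homelab.db")
# Writes the schema plus one Parquet file per table into backup_dir
con.execute(f"EXPORT DATABASE '{backup_dir}' (FORMAT PARQUET)")
con.close()
# Restore into a fresh database later with: IMPORT DATABASE 'backups/homelab_YYYYMMDD'
Scheduling this from cron (for example 0 2 * * * /usr/bin/python3 /opt/homelab/backup_duckdb.py) provides the daily Parquet export described above.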
