ClickHouse Analytics Automation Script —
ClickHouse คืออะไรและทำไมถึงเร็ว

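ClickHouse เป็น open source columnar OLAP database ที่พัฒนาขึ้นครั้งแรกที่ Yandex (ปัจจุบันดูแลโดย ClickHouse, Inc.) ออกแบบมาสำหรับ real-time analytical queries บน datasets ขนาดใหญ่ (ระดับ petabyte) และสำหรับงาน analytical มักให้ query speed สูงกว่า row-oriented databases แบบดั้งเดิมได้หลายสิบถึงหลายร้อยเท่า ขึ้นอยู่กับ workload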
เหตุผลที่ ClickHouse เร็วคือ columnar storage ที่อ่านเฉพาะ columns ที่ query ต้องการ, data compression ที่ลดขนาดข้อมูลได้ราว 5-10 เท่า (ขึ้นกับลักษณะข้อมูล), vectorized query execution ที่ประมวลผลข้อมูลเป็น batches, parallel processing ที่ใช้ CPU cores ทั้งหมด, sparse primary index ที่ช่วยข้ามช่วงข้อมูล (granules) ที่ไม่เกี่ยวข้องกับ query และ specialized data structures สำหรับแต่ละ data type
เนื้อหาเกี่ยวข้อง — ทำความเข้าใจ Go Chi Router Internal Developer Platform
ClickHouse เหมาะสำหรับ log analytics, clickstream analysis, real-time dashboards, time series data, ad-tech analytics, financial data analysis และ IoT data processing ไม่เหมาะสำหรับ transactional workloads (OLTP) ที่ต้อง UPDATE/DELETE บ่อย
เมื่อเทียบกับทางเลือกอื่น ในหลาย benchmark ClickHouse เร็วกว่า PostgreSQL สำหรับ analytical queries ราว 100-1000 เท่า เร็วกว่า Elasticsearch สำหรับ structured data analytics ราว 5-10 เท่า และอาจมี operational cost ต่ำกว่า cloud data warehouses เช่น BigQuery หรือ Snowflake — ตัวเลขจริงขึ้นอยู่กับ schema, ลักษณะ query และ hardware
แนะนำเพิ่มเติม — ดูสัญญาณเทรดที่ XM Signal
เนื้อหาเกี่ยวข้อง — แนะนำให้อ่าน Uptime Kuma Monitoring Load Testing Strategy —
ติดตั้ง ClickHouse และตั้งค่าเบื้องต้น
วิธีติดตั้งและ configure ClickHouse
# === Install ClickHouse ===
# Ubuntu/Debian: add the official ClickHouse apt repository.
# The signing-key URL below is the one used by ClickHouse's official apt instructions.
sudo apt-get install -y apt-transport-https ca-certificates curl gnupg
curl -fsSL 'https://packages.clickhouse.com/rpm/lts/repodata/repomd.xml.key' | \
    sudo gpg --dearmor -o /usr/share/keyrings/clickhouse-keyring.gpg
# Pin the repository to this machine's architecture so apt does not try other arches.
ARCH=$(dpkg --print-architecture)
echo "deb [signed-by=/usr/share/keyrings/clickhouse-keyring.gpg arch=${ARCH}] https://packages.clickhouse.com/deb stable main" | \
    sudo tee /etc/apt/sources.list.d/clickhouse.list
sudo apt-get update
sudo apt-get install -y clickhouse-server clickhouse-client
# Start the service now and enable it at boot (equivalent to start + enable).
sudo systemctl enable --now clickhouse-server

# Docker alternative.
# The password is read from the CLICKHOUSE_PASSWORD shell variable instead of being
# hard-coded here; export it first (e.g. CLICKHOUSE_PASSWORD=ch_password to match the
# Python examples' default). The command aborts with a message if it is unset.
docker run -d --name clickhouse \
    -p 8123:8123 -p 9000:9000 \
    -v clickhouse-data:/var/lib/clickhouse \
    -v clickhouse-logs:/var/log/clickhouse-server \
    -e CLICKHOUSE_DB=analytics \
    -e CLICKHOUSE_USER=admin \
    -e CLICKHOUSE_PASSWORD="${CLICKHOUSE_PASSWORD:?set CLICKHOUSE_PASSWORD first}" \
    clickhouse/clickhouse-server:latest

# Connect. --password without a value makes clickhouse-client prompt for it, so the
# password does not end up in shell history or the process list.
clickhouse-client --user admin --password
-- === Create database and tables ===
CREATE DATABASE IF NOT EXISTS analytics;
-- Web analytics events: one row per tracked interaction.
-- IF NOT EXISTS keeps this script re-runnable, matching CREATE DATABASE above.
-- Monthly partitions (toYYYYMM); rows expire one year after event_date via TTL.
-- The sort key starts with event_date, so date-range filters can use the primary index.
CREATE TABLE IF NOT EXISTS analytics.events (
    event_date   Date,
    event_time   DateTime64(3),
    user_id      UInt64,
    session_id   String,
    event_type   LowCardinality(String),
    page_url     String,
    referrer     String,
    country      LowCardinality(String),
    device       LowCardinality(String),
    browser      LowCardinality(String),
    duration_ms  UInt32,                   -- milliseconds
    properties   Map(String, String)
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_date, user_id, event_time)
TTL event_date + INTERVAL 1 YEAR
SETTINGS index_granularity = 8192;
-- Server metrics: one sample per (timestamp, host, metric).
-- IF NOT EXISTS keeps the script re-runnable, like the other DDL in this file.
-- Daily partitions; samples expire 90 days after their timestamp via TTL.
CREATE TABLE IF NOT EXISTS analytics.server_metrics (
    timestamp  DateTime64(3),
    host       LowCardinality(String),
    metric     LowCardinality(String),
    value      Float64,
    tags       Map(String, String)
) ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY (host, metric, timestamp)
TTL timestamp + INTERVAL 90 DAY;
-- Insert 10M rows of synthetic sample data.
-- Each rand(N) gets a distinct dummy argument on purpose: ClickHouse merges identical
-- expressions, so repeated bare rand() calls would return the SAME number within a row
-- and every generated column (date, user, event_type, referrer, ...) would be correlated.
INSERT INTO analytics.events
    (event_date, event_time, user_id, session_id, event_type, page_url,
     referrer, country, device, browser, duration_ms, properties)
SELECT
    today() - (rand(1) % 365) AS event_date,
    -- random second within that day
    addSeconds(toDateTime64(event_date, 3), rand(2) % 86400) AS event_time,
    rand(3) % 100000 AS user_id,
    generateUUIDv4() AS session_id,
    arrayElement(['pageview', 'click', 'scroll', 'purchase'], 1 + rand(4) % 4) AS event_type,
    concat('/page/', toString(rand(5) % 50)) AS page_url,
    arrayElement(['google.com', 'facebook.com', 'direct', 'twitter.com'], 1 + rand(6) % 4) AS referrer,
    arrayElement(['TH', 'US', 'JP', 'SG', 'DE'], 1 + rand(7) % 5) AS country,
    arrayElement(['mobile', 'desktop', 'tablet'], 1 + rand(8) % 3) AS device,
    arrayElement(['Chrome', 'Firefox', 'Safari', 'Edge'], 1 + rand(9) % 4) AS browser,
    rand(10) % 30000 AS duration_ms,
    map('version', '1.0') AS properties
FROM numbers(10000000);
-- Sanity checks: row count and on-disk size of analytics.events.
-- Filter on database too; other databases may also contain a table named 'events'.
SELECT count() FROM analytics.events;
SELECT formatReadableSize(total_bytes) FROM system.tables WHERE database = 'analytics' AND name = 'events';
สร้าง Analytics Queries และ Materialized Views

Analytical queries และ automated aggregation
แนะนำเพิ่มเติม — เรียนเทรดกับ iCafeForex
เนื้อหาเกี่ยวข้อง — แนะนำให้อ่าน Elasticsearch OpenSearch Cloud Migration Strategy
-- === ClickHouse analytics queries ===
-- Daily active users from 30 days ago through today.
-- conversion_rate = % of the day's active users who made at least one purchase
-- (distinct purchasers / DAU), so it is always between 0 and 100. Dividing
-- purchase *events* by DAU, as before, could exceed 100%.
SELECT
    event_date,
    uniqExact(user_id) AS dau,
    count() AS total_events,
    countIf(event_type = 'purchase') AS purchases,
    round(uniqExactIf(user_id, event_type = 'purchase') / dau * 100, 2) AS conversion_rate
FROM analytics.events
WHERE event_date >= today() - 30
GROUP BY event_date
ORDER BY event_date;
-- Top 20 pageview URLs over the last 7 days with visit-duration statistics.
-- median() is an alias of quantile(0.5); both are approximate (sampling-based) quantiles.
SELECT
    page_url,
    count(*)                     AS views,
    uniqExact(user_id)           AS unique_visitors,
    avg(duration_ms)             AS avg_duration_ms,
    median(duration_ms)          AS p50_duration,
    quantile(0.95)(duration_ms)  AS p95_duration
FROM analytics.events
WHERE event_date >= today() - INTERVAL 7 DAY
  AND event_type = 'pageview'
GROUP BY page_url
ORDER BY views DESC
LIMIT 20;
-- Funnel analysis for today: pageview -> click -> purchase.
-- windowFunnel returns, per user, how many steps were completed IN ORDER within the
-- window (86400 s), so each level only counts users who also completed every earlier
-- step; a later level can never exceed an earlier one (counting each event type
-- independently, as before, could).
WITH per_user AS (
    SELECT
        user_id,
        windowFunnel(86400)(
            toDateTime(event_time),
            event_type = 'pageview',
            event_type = 'click',
            event_type = 'purchase'
        ) AS steps_reached
    FROM analytics.events
    WHERE event_date = today()
    GROUP BY user_id
)
SELECT
    level,
    users,
    -- nullIf avoids dividing by zero on a day without any pageviews
    round(users / nullIf(first_value(users) OVER (ORDER BY level), 0) * 100, 1) AS pct_of_start
FROM (
    SELECT 1 AS level, countIf(steps_reached >= 1) AS users FROM per_user
    UNION ALL
    SELECT 2, countIf(steps_reached >= 2) FROM per_user
    UNION ALL
    SELECT 3, countIf(steps_reached >= 3) FROM per_user
)
ORDER BY level;
-- === Materialized views (incremental aggregation on insert) ===
-- Daily rollup by (event_date, country, device).
-- SummingMergeTree sums the plain numeric columns when parts merge and merges the
-- AggregateFunction columns (uniqState / avgState) by their function, so readers must
-- re-aggregate with GROUP BY and finalize the states with uniqMerge / avgMerge.
-- POPULATE backfills rows that are already in analytics.events (e.g. the sample data
-- inserted earlier); without it the view only sees rows inserted after creation.
-- Rows inserted while POPULATE is running are not captured, so create the view before
-- live ingestion starts.
CREATE MATERIALIZED VIEW IF NOT EXISTS analytics.daily_stats_mv
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_date, country, device)
POPULATE
AS SELECT
    event_date,
    country,
    device,
    count() AS event_count,
    uniqState(user_id) AS unique_users_state,
    countIf(event_type = 'purchase') AS purchase_count,
    avgState(duration_ms) AS avg_duration_state
FROM analytics.events
GROUP BY event_date, country, device;
-- Read the daily rollup for the last 7 days, per date and country.
-- Rows are re-aggregated because background merges may not have combined them yet;
-- the *Merge functions finalize the stored aggregate states.
SELECT
    event_date,
    country,
    sum(event_count)               AS events,
    uniqMerge(unique_users_state)  AS unique_users,
    sum(purchase_count)            AS purchases,
    avgMerge(avg_duration_state)   AS avg_duration
FROM analytics.daily_stats_mv
WHERE event_date >= today() - INTERVAL 7 DAY
GROUP BY
    event_date,
    country
ORDER BY
    event_date ASC,
    events DESC;
-- Hourly metrics rollup per (hour, host, metric).
-- AggregatingMergeTree with -State columns: under SummingMergeTree the per-insert-block
-- avg/max/min values would be ADDED together when parts merge, giving wrong results.
-- Read it back with the matching -Merge functions, e.g.:
--   SELECT hour, host, metric,
--          avgMerge(avg_value_state)      AS avg_value,
--          maxMerge(max_value_state)      AS max_value,
--          minMerge(min_value_state)      AS min_value,
--          countMerge(sample_count_state) AS sample_count
--   FROM analytics.hourly_metrics_mv
--   GROUP BY hour, host, metric;
CREATE MATERIALIZED VIEW IF NOT EXISTS analytics.hourly_metrics_mv
ENGINE = AggregatingMergeTree()
ORDER BY (hour, host, metric)
AS SELECT
    toStartOfHour(timestamp) AS hour,
    host,
    metric,
    avgState(value) AS avg_value_state,
    maxState(value) AS max_value_state,
    minState(value) AS min_value_state,
    countState() AS sample_count_state
FROM analytics.server_metrics
GROUP BY hour, host, metric;
Automation Scripts ด้วย Python
Python scripts สำหรับ automate ClickHouse tasks
#!/usr/bin/env python3
# clickhouse_automation.py — ClickHouse Analytics Automation
import clickhouse_connect
import json
import logging
import csv
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Dict, Optional
# Configure root logging at INFO level so the logger.info progress messages are shown.
logging.basicConfig(level=logging.INFO)
# Module-level logger named "ch_automation", used by ClickHouseAutomation.
logger = logging.getLogger("ch_automation")
class ClickHouseAutomation:
    """Routine ClickHouse analytics chores over a single clickhouse_connect client.

    Covers daily reporting, OPTIMIZE, partition retention, CSV export, table
    statistics and backups. Values go through query parameters; identifiers
    (database/table names), which cannot be bound as parameters, are back-quoted
    and escaped instead of being interpolated raw.
    """

    def __init__(self, host="localhost", port=8123,
                 user="admin", password="ch_password",
                 database="analytics"):
        # NOTE(review): the defaults mirror the demo Docker container; pass real
        # credentials (e.g. read from the environment) outside of local testing.
        self.client = clickhouse_connect.get_client(
            host=host, port=port,
            username=user, password=password,
            database=database,
        )
        logger.info(f"Connected to ClickHouse {host}:{port}/{database}")

    @staticmethod
    def _quote_identifier(name):
        """Return ``name`` as a back-quoted ClickHouse identifier (backslash-escaped)."""
        escaped = str(name).replace("\\", "\\\\").replace("`", "\\`")
        return f"`{escaped}`"

    @staticmethod
    def _quote_string(value):
        """Return ``value`` as a single-quoted ClickHouse string literal (backslash-escaped)."""
        escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
        return f"'{escaped}'"

    def run_query(self, sql, params=None):
        """Run a SELECT and return ``(rows, column_names)``.

        ``params`` is forwarded to the client as ``parameters``; reference the
        values in ``sql`` with ``%(name)s`` placeholders.
        """
        result = self.client.query(sql, parameters=params)
        return result.result_rows, result.column_names

    def run_command(self, sql):
        """Execute a statement that returns no rows (DDL, ALTER, OPTIMIZE, BACKUP...)."""
        self.client.command(sql)

    def daily_report(self, date=None):
        """Build a summary report for one day of the ``events`` table.

        Args:
            date: day to report on as ``"YYYY-MM-DD"``; defaults to yesterday
                (local time).
        Returns:
            dict with ``date``, ``generated_at`` and, for each of ``overview``,
            ``top_pages`` and ``country_breakdown``, a list of row dicts.
        """
        if date is None:
            date = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
        # The date is bound as a query parameter instead of being formatted into
        # the SQL text, so a caller-supplied value cannot inject SQL.
        params = {"date": date}
        queries = {
            "overview": """
                SELECT
                    count() AS total_events,
                    uniqExact(user_id) AS unique_users,
                    countIf(event_type = 'purchase') AS purchases,
                    round(avg(duration_ms), 0) AS avg_duration_ms
                FROM events WHERE event_date = %(date)s
            """,
            "top_pages": """
                SELECT page_url, count() AS views, uniqExact(user_id) AS users
                FROM events
                WHERE event_date = %(date)s AND event_type = 'pageview'
                GROUP BY page_url ORDER BY views DESC LIMIT 10
            """,
            "country_breakdown": """
                SELECT country, uniqExact(user_id) AS users, count() AS events
                FROM events WHERE event_date = %(date)s
                GROUP BY country ORDER BY users DESC
            """,
        }
        report = {"date": date, "generated_at": datetime.now().isoformat()}
        for name, sql in queries.items():
            rows, cols = self.run_query(sql, params=params)
            report[name] = [dict(zip(cols, row)) for row in rows]
        return report

    def optimize_tables(self, database="analytics"):
        """Run ``OPTIMIZE TABLE ... FINAL`` on every MergeTree-family table in ``database``.

        Tables with other engines (views, Buffer, Log...) are skipped. FINAL forces
        a full merge of each partition and can be expensive on large tables.
        """
        rows, _ = self.run_query(
            "SELECT name FROM system.tables "
            "WHERE database = %(db)s AND endsWith(engine, 'MergeTree')",
            params={"db": database},
        )
        quoted_db = self._quote_identifier(database)
        for (table_name,) in rows:
            logger.info(f"Optimizing {database}.{table_name}")
            self.run_command(
                f"OPTIMIZE TABLE {quoted_db}.{self._quote_identifier(table_name)} FINAL"
            )

    def cleanup_old_partitions(self, table, retention_days=365):
        """Drop partitions of ``table`` (in the current database) older than the retention window.

        Partition values must be date-like integers: YYYY, YYYYMM or YYYYMMDD
        (as produced by toYear / toYYYYMM / toYYYYMMDD). A partition is dropped
        only when it sorts strictly before the cutoff date truncated to the same
        width. A month or year that still contains in-retention days is
        therefore kept. Partitions in any other format are skipped with a warning.
        """
        cutoff = (datetime.now() - timedelta(days=retention_days)).strftime("%Y%m%d")
        rows, _ = self.run_query(
            """
            SELECT partition, partition_id, sum(rows) AS total_rows,
                   formatReadableSize(sum(bytes_on_disk)) AS size
            FROM system.parts
            WHERE database = currentDatabase() AND table = %(table)s AND active
            GROUP BY partition, partition_id
            ORDER BY partition
            """,
            params={"table": table},
        )
        quoted_table = self._quote_identifier(table)
        for partition, partition_id, total_rows, size in rows:
            if not (partition.isdigit() and len(partition) in (4, 6, 8)):
                logger.warning(
                    f"Skipping partition {partition}: not a YYYY/YYYYMM/YYYYMMDD value"
                )
                continue
            # Compare at the partition's own granularity: '202501' vs '202501' keeps
            # January even if the cutoff is 2025-01-15.
            if partition >= cutoff[:len(partition)]:
                continue
            logger.info(f"Dropping partition {partition} ({total_rows} rows, {size})")
            self.run_command(
                f"ALTER TABLE {quoted_table} DROP PARTITION ID {self._quote_string(partition_id)}"
            )

    def export_csv(self, sql, output_file):
        """Run ``sql`` and write the result (header row first) to ``output_file`` as UTF-8 CSV."""
        rows, cols = self.run_query(sql)
        # Explicit UTF-8 so non-ASCII values do not depend on the platform's default encoding.
        with open(output_file, "w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            writer.writerow(cols)
            writer.writerows(rows)
        logger.info(f"Exported {len(rows)} rows to {output_file}")

    def get_table_stats(self):
        """Return size, row count, active-partition count and engine for every user table.

        Result dict keys: database, table_name, size, rows, partition_count, engine.
        Sorted by total_bytes, largest first.
        """
        rows, cols = self.run_query("""
            SELECT
                t.database AS database,
                t.name AS table_name,
                formatReadableSize(t.total_bytes) AS size,
                formatReadableQuantity(t.total_rows) AS rows,
                -- distinct active partitions; tables with no parts get 0
                uniqExactIf(p.partition_id, p.active) AS partition_count,
                t.engine AS engine
            FROM system.tables AS t
            LEFT JOIN system.parts AS p
                ON p.database = t.database AND p.table = t.name
            WHERE t.database NOT IN ('system', 'INFORMATION_SCHEMA', 'information_schema')
            GROUP BY t.database, t.name, t.total_bytes, t.total_rows, t.engine
            ORDER BY t.total_bytes DESC
        """)
        return [dict(zip(cols, row)) for row in rows]

    def schedule_backup(self, backup_name=None, database="analytics"):
        """Back up ``database`` right now with ``BACKUP DATABASE ... TO Disk('backups', name)``.

        Nothing is scheduled here; call this from cron or another scheduler.
        NOTE(review): assumes a disk named ``backups`` is configured on the server
        and allowed as a backup destination.

        Args:
            backup_name: backup name on that disk; defaults to ``backup_YYYYmmdd_HHMMSS``.
            database: database to back up (defaults to ``analytics``).
        """
        if not backup_name:
            backup_name = f"backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        self.run_command(
            f"BACKUP DATABASE {self._quote_identifier(database)} "
            f"TO Disk('backups', {self._quote_string(backup_name)})"
        )
        logger.info(f"Backup created: {backup_name}")
# automation = ClickHouseAutomation()
# report = automation.daily_report()
# print(json.dumps(report, indent=2, default=str))
# automation.cleanup_old_partitions("events", retention_days=365)
# stats = automation.get_table_stats()
# for s in stats:
# print(f"{s['table_name']}: {s['size']} ({s['rows']} rows)")
ETL Pipeline และ Data Ingestion
สร้าง ETL pipeline สำหรับ ClickHouse
เนื้อหาเกี่ยวข้อง — ดูเพิ่มเติมเรื่อง เหรียญ luna วันนี้ — คู่มือฉบับสมบูรณ์ 2026
#!/usr/bin/env python3
# etl_pipeline.py — ClickHouse ETL Pipeline
import clickhouse_connect
import json
import csv
import gzip
import logging
from datetime import datetime
from pathlib import Path
from typing import List, Dict

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("etl")


class ClickHouseETL:
    """Batch ETL helper on top of an injected ClickHouse client.

    ``client`` is used via ``insert(table, rows, column_names=...)``,
    ``command(sql)`` and ``query(sql)`` (e.g. a clickhouse_connect client).
    """

    def __init__(self, client):
        self.client = client

    def _insert_batch(self, table, batch, columns):
        """Insert one batch of row lists into ``table`` and return the number of rows sent."""
        self.client.insert(table, batch, column_names=columns)
        return len(batch)

    def ingest_csv(self, table, csv_file, batch_size=100000, converters=None):
        """Load a CSV file (optionally ``.gz``-compressed) with a header row into ``table``.

        Args:
            table: target table, passed to ``client.insert`` as-is.
            csv_file: path to a ``.csv`` or ``.csv.gz`` file; the header row
                supplies the column names.
            batch_size: rows per insert call.
            converters: optional ``{column_name: callable}`` applied to each raw
                value. The csv module yields every value as ``str``, so columns
                that are not String in ClickHouse typically need a converter
                (e.g. ``{"user_id": int}``).
        Returns:
            Number of data rows inserted.
        """
        logger.info(f"Ingesting {csv_file} into {table}")
        opener = gzip.open if str(csv_file).endswith(".gz") else open
        converters = converters or {}
        total = 0
        # newline="" is what the csv module expects, so quoted fields that contain
        # line breaks are parsed correctly.
        with opener(csv_file, "rt", encoding="utf-8", newline="") as f:
            reader = csv.DictReader(f)
            # Column names come from the header, not from the first row's keys
            # (which could include an overflow key for over-long rows).
            columns = reader.fieldnames
            if columns:
                casts = [converters.get(c) for c in columns]
                batch = []
                for row in reader:
                    batch.append([
                        cast(row[c]) if cast else row[c]
                        for c, cast in zip(columns, casts)
                    ])
                    if len(batch) >= batch_size:
                        total += self._insert_batch(table, batch, columns)
                        batch = []
                        logger.info(f" Inserted {total} rows")
                if batch:
                    total += self._insert_batch(table, batch, columns)
        logger.info(f"Total ingested: {total} rows")
        return total

    def ingest_json_lines(self, table, jsonl_file, batch_size=50000):
        """Load a JSON Lines file (optionally ``.gz``-compressed) into ``table``.

        Column names come from the keys of the first record; later records are
        mapped onto those columns (missing keys become None, extra keys are
        ignored). Blank lines are skipped.

        Returns:
            Number of records inserted.
        """
        logger.info(f"Ingesting {jsonl_file} into {table}")
        opener = gzip.open if str(jsonl_file).endswith(".gz") else open
        batch = []
        columns = None
        total = 0
        with opener(jsonl_file, "rt", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    # e.g. a trailing newline at end of file; json.loads("") would raise
                    continue
                record = json.loads(line)
                if columns is None:
                    columns = list(record.keys())
                batch.append([record.get(c) for c in columns])
                if len(batch) >= batch_size:
                    total += self._insert_batch(table, batch, columns)
                    batch = []
            if batch:
                total += self._insert_batch(table, batch, columns)
        logger.info(f"Total ingested: {total} rows")
        return total

    def transform_and_load(self, source_table, target_table, transform_sql):
        """Run ``INSERT INTO <target_table> <transform_sql>`` and log the target's row count.

        ``source_table`` is only used for logging; ``transform_sql`` is expected to
        be a SELECT reading from it. INSERT ... SELECT maps columns by position,
        so the SELECT must list columns in target-table order. Both SQL pieces are
        interpolated verbatim and must come from trusted pipeline configuration.

        Returns:
            Row count of ``target_table`` after the insert.
        """
        logger.info(f"Transform: {source_table} -> {target_table}")
        self.client.command(f"INSERT INTO {target_table} {transform_sql}")
        # client.query() returns one result object (read via .result_rows, as in
        # ClickHouseAutomation.run_query); it cannot be unpacked into a tuple.
        result = self.client.query(f"SELECT count() FROM {target_table}")
        target_rows = result.result_rows[0][0]
        logger.info(f"Target table rows: {target_rows}")
        return target_rows

    def run_pipeline(self, steps):
        """Execute ``steps`` in order and return one result dict per attempted step.

        Each step is a dict with ``name`` and ``type`` plus type-specific keys:
          - ``ingest_csv`` / ``ingest_jsonl``: ``table``, ``file``
          - ``transform``: ``source``, ``target``, ``sql``
          - ``query``: ``sql`` (executed with ``client.command``)
        Result dicts contain ``step``, ``status`` and ``duration_s``. Success
        results also have ``rows_ingested``; errors have ``error``.
        A failing step stops the pipeline unless it sets ``"required": False``.
        An unknown ``type`` is reported as an error rather than as a success.
        """
        logger.info(f"Running pipeline with {len(steps)} steps")
        start = datetime.now()
        results = []
        for i, step in enumerate(steps):
            step_start = datetime.now()
            logger.info(f"Step {i+1}/{len(steps)}: {step['name']}")
            try:
                step_type = step["type"]
                if step_type == "ingest_csv":
                    count = self.ingest_csv(step["table"], step["file"])
                elif step_type == "ingest_jsonl":
                    count = self.ingest_json_lines(step["table"], step["file"])
                elif step_type == "transform":
                    self.transform_and_load(step["source"], step["target"], step["sql"])
                    count = 0
                elif step_type == "query":
                    self.client.command(step["sql"])
                    count = 0
                else:
                    raise ValueError(f"Unknown step type: {step_type!r}")
                results.append({
                    "step": step["name"],
                    "status": "success",
                    "rows_ingested": count,
                    "duration_s": (datetime.now() - step_start).total_seconds(),
                })
            except Exception as e:
                results.append({
                    "step": step["name"],
                    "status": "error",
                    "error": str(e),
                    "duration_s": (datetime.now() - step_start).total_seconds(),
                })
                logger.error(f"Step failed: {e}")
                if step.get("required", True):
                    break
        total_time = (datetime.now() - start).total_seconds()
        logger.info(f"Pipeline completed in {total_time:.1f}s")
        return results


# client = clickhouse_connect.get_client(host="localhost", username="admin", password="ch_password")
# etl = ClickHouseETL(client)
#
# pipeline = [
#     {"name": "Ingest events", "type": "ingest_csv", "table": "analytics.events_raw", "file": "data/events.csv.gz"},
#     # INSERT ... SELECT is positional: list the columns in analytics.events order.
#     {"name": "Transform", "type": "transform", "source": "analytics.events_raw", "target": "analytics.events",
#      "sql": "SELECT *, toDate(event_time) AS event_date FROM analytics.events_raw"},
#     {"name": "Optimize", "type": "query", "sql": "OPTIMIZE TABLE analytics.events FINAL"},
# ]
# results = etl.run_pipeline(pipeline)




