SiamCafe.net Blog
Technology
ClickHouse Real-time Analytics | SiamCafe Blog
2026-04-04· อ. บอม — SiamCafe.net· 9,803 คำ

ClickHouse คืออะไรและทำไมถึงเร็ว

ClickHouse เป็น open source columnar OLAP database ที่พัฒนาโดย Yandex ออกแบบมาสำหรับ real-time analytical queries บน datasets ขนาดใหญ่ (petabyte-scale) ให้ query speed ที่เร็วกว่า traditional databases หลายร้อยเท่า

เหตุผลที่ ClickHouse เร็วคือ columnar storage อ่านเฉพาะ columns ที่ต้องการ, data compression ที่ลดขนาดข้อมูล 5-10 เท่า, vectorized query execution ที่ process data เป็น batches, parallel processing ใช้ CPU cores ทั้งหมด, primary key index ที่ skip data ไม่ต้องการ และ specialized data structures สำหรับแต่ละ data type

ClickHouse เหมาะสำหรับ log analytics, clickstream analysis, real-time dashboards, time series data, ad-tech analytics, financial data analysis และ IoT data processing ไม่เหมาะสำหรับ transactional workloads (OLTP) ที่ต้อง UPDATE/DELETE บ่อย

เมื่อเทียบกับ alternatives ClickHouse เร็วกว่า PostgreSQL สำหรับ analytical queries 100-1000 เท่า เร็วกว่า Elasticsearch สำหรับ structured data analytics 5-10 เท่า และมี operational cost ต่ำกว่า cloud data warehouses เช่น BigQuery หรือ Snowflake

ติดตั้ง ClickHouse และตั้งค่าเบื้องต้น

วิธีติดตั้งและ configure ClickHouse

# === Install ClickHouse ===

# Ubuntu/Debian — add the official ClickHouse APT repository.
# NOTE: the GPG key URL under /rpm/ is intentional; ClickHouse publishes a
# single signing key shared by the deb and rpm repositories.
sudo apt-get install -y apt-transport-https ca-certificates curl gnupg
curl -fsSL 'https://packages.clickhouse.com/rpm/lts/repodata/repomd.xml.key' | \
 sudo gpg --dearmor -o /usr/share/keyrings/clickhouse-keyring.gpg
echo "deb [signed-by=/usr/share/keyrings/clickhouse-keyring.gpg] \
 https://packages.clickhouse.com/deb stable main" | \
 sudo tee /etc/apt/sources.list.d/clickhouse.list
sudo apt-get update
sudo apt-get install -y clickhouse-server clickhouse-client

# Start the service now and enable it at boot
sudo systemctl start clickhouse-server
sudo systemctl enable clickhouse-server

# Docker alternative — 8123 is the HTTP interface, 9000 the native protocol.
# WARNING: replace the hard-coded credentials before any real deployment.
docker run -d --name clickhouse \
 -p 8123:8123 -p 9000:9000 \
 -v clickhouse-data:/var/lib/clickhouse \
 -v clickhouse-logs:/var/log/clickhouse-server \
 -e CLICKHOUSE_DB=analytics \
 -e CLICKHOUSE_USER=admin \
 -e CLICKHOUSE_PASSWORD=ch_password \
 clickhouse/clickhouse-server:latest

# Connect with the CLI client (native protocol)
clickhouse-client --user admin --password ch_password

-- === Create database and tables ===
-- (Header comments changed from '#' to '--': '--' and /* */ are the
-- portable SQL comment forms.)
CREATE DATABASE IF NOT EXISTS analytics;

-- Web analytics events.
-- Monthly partitions plus a 1-year TTL keep the partition count small
-- while expiring old data automatically. LowCardinality(String)
-- dictionary-encodes columns with few distinct values.
CREATE TABLE analytics.events (
    event_date Date,
    event_time DateTime64(3),          -- millisecond precision
    user_id UInt64,
    session_id String,
    event_type LowCardinality(String),
    page_url String,
    referrer String,
    country LowCardinality(String),
    device LowCardinality(String),
    browser LowCardinality(String),
    duration_ms UInt32,
    properties Map(String, String)     -- free-form event attributes
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_date, user_id, event_time)
TTL event_date + INTERVAL 1 YEAR
SETTINGS index_granularity = 8192;

-- Server metrics: daily partitions, 90-day retention.
CREATE TABLE analytics.server_metrics (
    timestamp DateTime64(3),
    host LowCardinality(String),
    metric LowCardinality(String),
    value Float64,
    tags Map(String, String)
) ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY (host, metric, timestamp)
TTL timestamp + INTERVAL 90 DAY;

-- Insert 10M rows of random sample data.
-- Reusing an alias defined earlier in the same SELECT (event_date inside
-- event_time) is a ClickHouse extension.
INSERT INTO analytics.events
SELECT
    today() - (rand() % 365) AS event_date,
    toDateTime64(event_date, 3) + rand() % 86400 AS event_time,
    rand() % 100000 AS user_id,
    toString(generateUUIDv4()) AS session_id,  -- explicit UUID -> String cast
    arrayElement(['pageview','click','scroll','purchase'], 1 + rand() % 4) AS event_type,
    concat('/page/', toString(rand() % 50)) AS page_url,
    arrayElement(['google.com','facebook.com','direct','twitter.com'], 1 + rand() % 4) AS referrer,
    arrayElement(['TH','US','JP','SG','DE'], 1 + rand() % 5) AS country,
    arrayElement(['mobile','desktop','tablet'], 1 + rand() % 3) AS device,
    arrayElement(['Chrome','Firefox','Safari','Edge'], 1 + rand() % 4) AS browser,
    rand() % 30000 AS duration_ms,
    map('version', '1.0') AS properties
FROM numbers(10000000);

-- Sanity checks. Filter system.tables by database as well as name so an
-- 'events' table in another database cannot shadow the result.
SELECT count() FROM analytics.events;
SELECT formatReadableSize(total_bytes)
FROM system.tables
WHERE database = 'analytics' AND name = 'events';

สร้าง Analytics Queries และ Materialized Views

Analytical queries และ automated aggregation

-- === ClickHouse Analytics Queries ===

-- Daily active users over the last 30 days, with purchase conversion.
-- uniqExact gives exact distinct counts; swap in uniq() if an approximate
-- (faster, lower-memory) count is acceptable.
SELECT
 event_date,
 uniqExact(user_id) AS dau,
 count() AS total_events,
 countIf(event_type = 'purchase') AS purchases,
 round(countIf(event_type = 'purchase') / uniqExact(user_id) * 100, 2) AS conversion_rate
FROM analytics.events
WHERE event_date >= today() - 30
GROUP BY event_date
ORDER BY event_date;

-- Top pages with engagement metrics (last 7 days).
-- quantile() is an approximate estimator; use quantileExact for exact values.
SELECT
 page_url,
 count() AS views,
 uniqExact(user_id) AS unique_visitors,
 avg(duration_ms) AS avg_duration_ms,
 quantile(0.50)(duration_ms) AS p50_duration,
 quantile(0.95)(duration_ms) AS p95_duration
FROM analytics.events
WHERE event_type = 'pageview'
 AND event_date >= today() - 7
GROUP BY page_url
ORDER BY views DESC
LIMIT 20;

-- Funnel analysis for today (pageview -> click -> purchase).
-- first_value(users) OVER (ORDER BY level) with the default frame returns
-- the level-1 count, so pct_of_start is relative to the top of the funnel.
-- UNION ALL is correct here: the three rows are distinct by construction,
-- so no dedup pass is needed.
SELECT
 level,
 users,
 round(users / first_value(users) OVER (ORDER BY level) * 100, 1) AS pct_of_start
FROM (
 SELECT 1 AS level, uniqExact(user_id) AS users
 FROM analytics.events WHERE event_type = 'pageview' AND event_date = today()
 UNION ALL
 SELECT 2, uniqExact(user_id)
 FROM analytics.events WHERE event_type = 'click' AND event_date = today()
 UNION ALL
 SELECT 3, uniqExact(user_id)
 FROM analytics.events WHERE event_type = 'purchase' AND event_date = today()
)
ORDER BY level;

-- === Materialized Views (auto-aggregation) ===

-- Daily stats materialized view.
-- SummingMergeTree sums the plain numeric columns (event_count,
-- purchase_count) on merge, and for AggregateFunction columns
-- (uniqState/avgState) it behaves like AggregatingMergeTree, so mixing
-- both column kinds here is safe (documented engine behavior).
-- POPULATE backfills the view from rows already in analytics.events
-- (inserted above); without it the view would only see inserts made
-- after its creation and the example query below would return nothing.
CREATE MATERIALIZED VIEW analytics.daily_stats_mv
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_date, country, device)
POPULATE
AS SELECT
    event_date,
    country,
    device,
    count() AS event_count,
    uniqState(user_id) AS unique_users_state,
    sumIf(1, event_type = 'purchase') AS purchase_count,
    avgState(duration_ms) AS avg_duration_state
FROM analytics.events
GROUP BY event_date, country, device;

-- Query the materialized view (last 7 days by country).
-- Parts are merged lazily in the background, so partially-merged rows can
-- coexist: always re-aggregate on read — sum() for the plain counters and
-- the matching -Merge combinators (uniqMerge/avgMerge) for the
-- AggregateFunction state columns.
SELECT
 event_date,
 country,
 sum(event_count) AS events,
 uniqMerge(unique_users_state) AS unique_users,
 sum(purchase_count) AS purchases,
 avgMerge(avg_duration_state) AS avg_duration
FROM analytics.daily_stats_mv
WHERE event_date >= today() - 7
GROUP BY event_date, country
ORDER BY event_date, events DESC;

-- Hourly server-metric rollup.
-- BUG FIX: the original used SummingMergeTree with plain avg()/max()/min()
-- columns. SummingMergeTree replaces every non-key numeric column with its
-- SUM during background merges, silently corrupting the avg/max/min values.
-- AggregatingMergeTree with -State combinators keeps them correct across
-- merges. Read the view with the matching -Merge combinators, e.g.:
--   SELECT hour, host, metric,
--          avgMerge(avg_value), maxMerge(max_value),
--          minMerge(min_value), countMerge(sample_count)
--   FROM analytics.hourly_metrics_mv
--   GROUP BY hour, host, metric;
CREATE MATERIALIZED VIEW analytics.hourly_metrics_mv
ENGINE = AggregatingMergeTree()
ORDER BY (hour, host, metric)
AS SELECT
    toStartOfHour(timestamp) AS hour,
    host,
    metric,
    avgState(value) AS avg_value,
    maxState(value) AS max_value,
    minState(value) AS min_value,
    countState() AS sample_count
FROM analytics.server_metrics
GROUP BY hour, host, metric;

Automation Scripts ด้วย Python

Python scripts สำหรับ automate ClickHouse tasks

#!/usr/bin/env python3
# clickhouse_automation.py — ClickHouse Analytics Automation
import clickhouse_connect
import json
import logging
import csv
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Dict, Optional

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("ch_automation")

class ClickHouseAutomation:
    """Operational helpers for a ClickHouse analytics database:
    daily reporting, table maintenance, partition cleanup, CSV export,
    table statistics, and backups. Built on clickhouse_connect.
    """

    # getLogger returns a per-name singleton, so this is the same logger
    # object as the module-level `logger`; kept on the class so methods are
    # self-contained.
    _logger = logging.getLogger("ch_automation")

    def __init__(self, host="localhost", port=8123,
                 user="admin", password="ch_password",
                 database="analytics"):
        """Open an HTTP connection (default port 8123) to ClickHouse."""
        self.client = clickhouse_connect.get_client(
            host=host, port=port,
            username=user, password=password,
            database=database,
        )
        self._logger.info(f"Connected to ClickHouse {host}:{port}/{database}")

    def run_query(self, sql, params=None):
        """Run a SELECT; return (rows, column_names).

        `params` are bound server-side by clickhouse_connect — always pass
        user-supplied values here, never interpolated into `sql`.
        """
        result = self.client.query(sql, parameters=params)
        return result.result_rows, result.column_names

    def run_command(self, sql):
        """Run a statement with no result set (DDL, ALTER, OPTIMIZE, ...)."""
        self.client.command(sql)

    def daily_report(self, date=None):
        """Build a report dict (overview, top pages, country breakdown) for
        one day; defaults to yesterday (YYYY-MM-DD).

        The date is bound as a query parameter instead of being f-string
        interpolated into the SQL (the original pattern was injectable if a
        caller ever passed untrusted input).
        """
        if date is None:
            date = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")

        queries = {
            "overview": """
                SELECT
                    count() AS total_events,
                    uniqExact(user_id) AS unique_users,
                    countIf(event_type = 'purchase') AS purchases,
                    round(avg(duration_ms), 0) AS avg_duration_ms
                FROM events WHERE event_date = %(date)s
            """,
            "top_pages": """
                SELECT page_url, count() AS views, uniqExact(user_id) AS users
                FROM events
                WHERE event_date = %(date)s AND event_type = 'pageview'
                GROUP BY page_url ORDER BY views DESC LIMIT 10
            """,
            "country_breakdown": """
                SELECT country, uniqExact(user_id) AS users, count() AS events
                FROM events WHERE event_date = %(date)s
                GROUP BY country ORDER BY users DESC
            """,
        }

        report = {"date": date, "generated_at": datetime.now().isoformat()}
        for name, sql in queries.items():
            rows, cols = self.run_query(sql, params={"date": date})
            report[name] = [dict(zip(cols, row)) for row in rows]
        return report

    def optimize_tables(self, database="analytics"):
        """OPTIMIZE ... FINAL every table in `database`.

        OPTIMIZE FINAL forces a full merge and is expensive on large
        tables — intended for off-peak maintenance windows.
        """
        rows, _ = self.run_query(
            "SELECT name FROM system.tables WHERE database = %(db)s",
            params={"db": database},
        )
        for (table_name,) in rows:
            self._logger.info(f"Optimizing {database}.{table_name}")
            # Identifiers cannot be bound as parameters; these names come
            # from system.tables, not user input. Backticks guard odd names.
            self.run_command(f"OPTIMIZE TABLE `{database}`.`{table_name}` FINAL")

    def cleanup_old_partitions(self, table, retention_days=365, database="analytics"):
        """Drop partitions of `database`.`table` older than the cutoff.

        NOTE(review): the cutoff is formatted as YYYYMMDD, matching
        toYYYYMMDD() partitioning (e.g. server_metrics). For toYYYYMM()
        partitions (e.g. events) the lexicographic comparison makes the
        entire cutoff month eligible — confirm before pointing this at
        monthly-partitioned tables.
        """
        cutoff = (datetime.now() - timedelta(days=retention_days)).strftime("%Y%m%d")

        # Fixes vs. original: values bound as parameters; filter by
        # `database` (same-named tables can exist in other databases) and
        # by `active` (inactive parts would double-count rows/bytes).
        rows, _ = self.run_query(
            """
            SELECT partition, sum(rows) AS total_rows,
                   formatReadableSize(sum(bytes_on_disk)) AS size
            FROM system.parts
            WHERE database = %(db)s AND table = %(table)s
              AND active AND partition < %(cutoff)s
            GROUP BY partition ORDER BY partition
            """,
            params={"db": database, "table": table, "cutoff": cutoff},
        )

        for partition, total_rows, size in rows:
            self._logger.info(f"Dropping partition {partition} ({total_rows} rows, {size})")
            # Partition values come from system.parts, not user input.
            self.run_command(f"ALTER TABLE `{database}`.`{table}` DROP PARTITION '{partition}'")

    def export_csv(self, sql, output_file):
        """Run `sql` and write the result set (with a header row) to CSV."""
        rows, cols = self.run_query(sql)

        with open(output_file, "w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(cols)
            writer.writerows(rows)

        self._logger.info(f"Exported {len(rows)} rows to {output_file}")

    def get_table_stats(self):
        """Return size/row/engine stats for all non-system tables.

        NOTE(review): `partition_count` is count() over the system.parts
        join, so it actually counts data *parts* (and is 1 for a table with
        no parts, because of the LEFT JOIN) — treat as approximate.
        """
        rows, cols = self.run_query("""
            SELECT
                database, name AS table_name,
                formatReadableSize(total_bytes) AS size,
                formatReadableQuantity(total_rows) AS rows,
                partition_count,
                engine
            FROM (
                SELECT database, name, total_bytes, total_rows, engine,
                       count() AS partition_count
                FROM system.tables t
                LEFT JOIN system.parts p ON t.name = p.table AND t.database = p.database
                WHERE t.database NOT IN ('system', 'INFORMATION_SCHEMA', 'information_schema')
                GROUP BY database, name, total_bytes, total_rows, engine
            )
            ORDER BY total_bytes DESC
        """)

        return [dict(zip(cols, row)) for row in rows]

    def schedule_backup(self, backup_name=None):
        """Back up the analytics database to the 'backups' disk.

        Requires a disk named 'backups' in the server's storage
        configuration; the name defaults to a timestamped identifier.
        """
        if not backup_name:
            backup_name = f"backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

        self.run_command(f"BACKUP DATABASE analytics TO Disk('backups', '{backup_name}')")
        self._logger.info(f"Backup created: {backup_name}")

# automation = ClickHouseAutomation()
# report = automation.daily_report()
# print(json.dumps(report, indent=2, default=str))
# automation.cleanup_old_partitions("events", retention_days=365)
# stats = automation.get_table_stats()
# for s in stats:
# print(f"{s['table_name']}: {s['size']} ({s['rows']} rows)")

ETL Pipeline และ Data Ingestion

สร้าง ETL pipeline สำหรับ ClickHouse

#!/usr/bin/env python3
# etl_pipeline.py — ClickHouse ETL Pipeline
import clickhouse_connect
import json
import csv
import gzip
import logging
from datetime import datetime
from pathlib import Path
from typing import List, Dict

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("etl")

class ClickHouseETL:
    """Batch ETL helpers for ClickHouse: CSV / JSON-lines ingestion,
    server-side SQL transforms, and a simple sequential pipeline runner.
    """

    # getLogger returns a per-name singleton, so this is the same logger
    # object as the module-level `logger`; kept on the class so methods are
    # self-contained.
    _logger = logging.getLogger("etl")

    def __init__(self, client):
        # `client` is a clickhouse_connect client (must provide
        # .insert / .command / .query).
        self.client = client

    def ingest_csv(self, table, csv_file, batch_size=100000):
        """Stream a CSV (gzip-compressed if the name ends in .gz) into
        `table` in batches of `batch_size`; return total rows inserted.

        Column names come from the CSV header; values are sent as strings
        and converted server-side.
        """
        self._logger.info(f"Ingesting {csv_file} into {table}")

        opener = gzip.open if str(csv_file).endswith(".gz") else open

        with opener(csv_file, "rt") as f:
            reader = csv.DictReader(f)
            batch = []
            total = 0
            columns = None

            for row in reader:
                if columns is None:
                    columns = list(row.keys())

                batch.append(list(row.values()))

                if len(batch) >= batch_size:
                    self.client.insert(table, batch, column_names=columns)
                    total += len(batch)
                    batch = []
                    self._logger.info(f" Inserted {total} rows")

            # Flush the final partial batch.
            if batch:
                self.client.insert(table, batch, column_names=columns)
                total += len(batch)

        self._logger.info(f"Total ingested: {total} rows")
        return total

    def ingest_json_lines(self, table, jsonl_file, batch_size=50000):
        """Stream a JSON-lines file (optionally .gz) into `table` in
        batches; return total rows inserted.

        The column set is fixed from the FIRST record; later records
        missing a key insert None, and extra keys are silently dropped —
        assumes a homogeneous file (TODO confirm for your sources).
        """
        self._logger.info(f"Ingesting {jsonl_file} into {table}")

        opener = gzip.open if str(jsonl_file).endswith(".gz") else open

        with opener(jsonl_file, "rt") as f:
            batch = []
            columns = None
            total = 0

            for line in f:
                record = json.loads(line.strip())

                if columns is None:
                    columns = list(record.keys())

                batch.append([record.get(c) for c in columns])

                if len(batch) >= batch_size:
                    self.client.insert(table, batch, column_names=columns)
                    total += len(batch)
                    batch = []

            if batch:
                self.client.insert(table, batch, column_names=columns)
                total += len(batch)

        self._logger.info(f"Total ingested: {total} rows")
        return total

    def transform_and_load(self, source_table, target_table, transform_sql):
        """Run `INSERT INTO target_table <transform_sql>` server-side and
        log the target's row count.

        BUG FIX: the original did `rows, _ = self.client.query(...)` —
        clickhouse_connect returns a QueryResult object, not a tuple, so
        that line raised at runtime; use the result object directly.
        """
        self._logger.info(f"Transform: {source_table} -> {target_table}")

        self.client.command(f"""
        INSERT INTO {target_table}
        {transform_sql}
        """)

        result = self.client.query(f"SELECT count() FROM {target_table}")
        self._logger.info(f"Target table rows: {result.result_rows[0][0]}")

    def run_pipeline(self, steps):
        """Execute `steps` sequentially; return a list of per-step results.

        Each step is a dict with 'name', 'type' (one of ingest_csv,
        ingest_jsonl, transform, query) and the type-specific keys. A
        failing step is recorded with status 'error' and stops the
        pipeline unless the step sets required=False.
        """
        self._logger.info(f"Running pipeline with {len(steps)} steps")
        start = datetime.now()

        results = []
        for i, step in enumerate(steps):
            step_start = datetime.now()
            self._logger.info(f"Step {i+1}/{len(steps)}: {step['name']}")

            try:
                step_type = step["type"]
                if step_type == "ingest_csv":
                    self.ingest_csv(step["table"], step["file"])
                elif step_type == "ingest_jsonl":
                    self.ingest_json_lines(step["table"], step["file"])
                elif step_type == "transform":
                    self.transform_and_load(
                        step["source"], step["target"], step["sql"]
                    )
                elif step_type == "query":
                    self.client.command(step["sql"])
                else:
                    # The original fell through silently and reported an
                    # unknown type as success; surface it as an error.
                    raise ValueError(f"Unknown step type: {step_type}")

                results.append({
                    "step": step["name"],
                    "status": "success",
                    "duration_s": (datetime.now() - step_start).total_seconds(),
                })
            except Exception as e:
                results.append({
                    "step": step["name"],
                    "status": "error",
                    "error": str(e),
                })
                self._logger.error(f"Step failed: {e}")
                if step.get("required", True):
                    break

        total_time = (datetime.now() - start).total_seconds()
        self._logger.info(f"Pipeline completed in {total_time:.1f}s")
        return results

# client = clickhouse_connect.get_client(host="localhost", username="admin", password="ch_password")
# etl = ClickHouseETL(client)
# 
# pipeline = [
# {"name": "Ingest events", "type": "ingest_csv", "table": "analytics.events_raw", "file": "data/events.csv.gz"},
# {"name": "Transform", "type": "transform", "source": "events_raw", "target": "events",
# "sql": "SELECT *, toDate(event_time) AS event_date FROM analytics.events_raw"},
 # {"name": "Optimize", "type": "query", "sql": "OPTIMIZE TABLE analytics.events FINAL"},
 # ]
 # results = etl.run_pipeline(pipeline)

📖 บทความที่เกี่ยวข้อง

Strapi CMS Post-mortem Analysisอ่านบทความ → PagerDuty Incident Chaos Engineeringอ่านบทความ → Whisper Speech Post-mortem Analysisอ่านบทความ → React Query TanStack Post-mortem Analysisอ่านบทความ → PagerDuty Incident API Gateway Patternอ่านบทความ →

📚 ดูบทความทั้งหมด →