ClickHouse Analytics Message Queue Design คืออะไร
ClickHouse เป็น open source columnar database ประสิทธิภาพสูงสำหรับ real-time analytics queries ออกแบบมาเพื่อประมวลผลข้อมูลหลายพันล้าน rows ในเสี้ยววินาที Message Queue เช่น Apache Kafka, RabbitMQ ทำหน้าที่เป็น buffer ระหว่าง data producers กับ ClickHouse การออกแบบระบบที่รวม ClickHouse กับ Message Queue ช่วยสร้าง analytics pipeline ที่ real-time, scalable และ fault-tolerant สำหรับ use cases เช่น web analytics, log analysis, IoT telemetry และ business intelligence
ClickHouse Architecture
# clickhouse_arch.py — ClickHouse architecture overview
import json
class ClickHouseArch:
    """Summarize ClickHouse's core architecture features and headline performance figures."""

    # Feature catalog: key -> display name, what it is, and why it matters.
    # NOTE(review): the 'benefit' field is stored but not displayed by show_features().
    FEATURES = {
        "columnar": {
            "name": "Columnar Storage",
            "description": "เก็บข้อมูลเป็น column แทน row — อ่านเฉพาะ columns ที่ต้องการ",
            "benefit": "Query analytics เร็วมาก — scan เฉพาะ columns ที่ query ใช้",
        },
        "compression": {
            "name": "Data Compression",
            "description": "Compress ข้อมูลได้ 10-50x — เพราะ same column = same data type",
            "benefit": "ใช้ storage น้อย, disk I/O ลด → query เร็วขึ้น",
        },
        "vectorized": {
            "name": "Vectorized Query Execution",
            "description": "ประมวลผลทีละ batch (vector) ไม่ใช่ทีละ row — ใช้ CPU SIMD",
            "benefit": "CPU utilization สูง → throughput สูงมาก",
        },
        "mergetree": {
            "name": "MergeTree Engine",
            "description": "Default storage engine — sort data by primary key, merge in background",
            "benefit": "Fast inserts + fast range queries + automatic data merging",
        },
        "distributed": {
            "name": "Distributed Queries",
            "description": "Query ข้าม multiple shards/replicas อัตโนมัติ",
            "benefit": "Horizontal scaling — เพิ่ม nodes = เพิ่ม capacity",
        },
    }

    # Ballpark single-node performance numbers used for the printed summary.
    PERFORMANCE = {
        "insert": "~1M rows/sec per node (batch insert)",
        "query": "Billions of rows in < 1 second (aggregation queries)",
        "compression": "10-50x compression ratio",
        "concurrent": "100+ concurrent queries",
    }

    def show_features(self):
        """Print each feature's display name and description to stdout."""
        print("=== ClickHouse Features ===\n")
        # Iterate values only — the dict keys were fetched but never used (PERF102).
        for feature in self.FEATURES.values():
            print(f"[{feature['name']}]")
            print(f" {feature['description']}")
            print()

    def show_performance(self):
        """Print the headline performance metrics, one per line."""
        print("=== Performance ===")
        for metric, value in self.PERFORMANCE.items():
            print(f" [{metric}] {value}")
# Demo: render the architecture summary to stdout.
overview = ClickHouseArch()
overview.show_features()
overview.show_performance()
Message Queue Integration
# mq_integration.py — ClickHouse + Kafka integration
import json
class KafkaIntegration:
    """ClickHouse + Kafka integration: example DDL for the Kafka-engine pipeline
    and an ASCII diagram of the data flow."""

    # DDL pipeline: Kafka engine source table -> MergeTree target table
    # -> materialized views (raw forwarding + hourly pre-aggregation).
    KAFKA_ENGINE = """
-- ClickHouse Kafka Engine — consume directly from Kafka
CREATE TABLE events_queue (
event_id String,
user_id String,
event_type String,
properties String,
timestamp DateTime64(3)
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka:9092',
kafka_topic_list = 'events',
kafka_group_name = 'clickhouse-consumer',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 4,
kafka_max_block_size = 65536;
-- Target MergeTree table
CREATE TABLE events (
event_id String,
user_id String,
event_type LowCardinality(String),
properties String,
timestamp DateTime64(3),
date Date DEFAULT toDate(timestamp)
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (event_type, user_id, timestamp)
TTL date + INTERVAL 90 DAY;
-- Materialized View — auto-insert from Kafka to MergeTree
CREATE MATERIALIZED VIEW events_mv TO events AS
SELECT
event_id,
user_id,
event_type,
properties,
timestamp
FROM events_queue;
-- Pre-aggregated table for fast dashboard queries
CREATE MATERIALIZED VIEW events_hourly_mv
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(hour)
ORDER BY (event_type, hour)
AS SELECT
event_type,
toStartOfHour(timestamp) AS hour,
count() AS event_count,
uniqExact(user_id) AS unique_users
FROM events_queue
GROUP BY event_type, hour;
"""

    # High-level topology of the ingestion pipeline.
    ARCHITECTURE_FLOW = """
Data Flow:
[Producers] → [Kafka Topics] → [ClickHouse Kafka Engine] → [MergeTree Tables]
→ [Materialized Views (aggregations)]
Web App → topic: events → events table + hourly aggregation
Logs → topic: logs → logs table + error aggregation
IoT → topic: telemetry → metrics table + minute aggregation
"""

    def show_kafka_engine(self):
        """Print the first 600 characters of the Kafka-engine DDL."""
        print("=== Kafka Engine SQL ===")
        print(self.KAFKA_ENGINE[:600])

    def show_flow(self):
        """Print the pipeline data-flow diagram."""
        # Plain string: the original used an f-string with no placeholders (F541).
        print("\n=== Architecture Flow ===")
        print(self.ARCHITECTURE_FLOW)
# Demo: print the Kafka DDL excerpt and the pipeline diagram.
integration = KafkaIntegration()
integration.show_kafka_engine()
integration.show_flow()
Analytics Queries
# analytics.py — ClickHouse analytics queries
import json
class AnalyticsQueries:
    """Catalog of example ClickHouse analytics queries, with a preview printer."""

    # Query catalog: key -> {name: display title, sql: full query text}.
    QUERIES = {
        "realtime_dashboard": {
            "name": "Real-time Dashboard",
            "sql": """
-- Events per minute (last hour)
SELECT
toStartOfMinute(timestamp) AS minute,
count() AS events,
uniqExact(user_id) AS users
FROM events
WHERE timestamp >= now() - INTERVAL 1 HOUR
GROUP BY minute
ORDER BY minute;
""",
        },
        "funnel": {
            "name": "Conversion Funnel",
            "sql": """
-- Funnel: page_view → add_to_cart → purchase
SELECT
level,
users,
round(users / first_value(users) OVER (ORDER BY level) * 100, 1) AS conversion_pct
FROM (
SELECT 1 AS level, uniqExact(user_id) AS users
FROM events WHERE event_type = 'page_view' AND date = today()
UNION ALL
SELECT 2, uniqExact(user_id)
FROM events WHERE event_type = 'add_to_cart' AND date = today()
UNION ALL
SELECT 3, uniqExact(user_id)
FROM events WHERE event_type = 'purchase' AND date = today()
);
""",
        },
        "retention": {
            "name": "User Retention (Cohort)",
            "sql": """
-- Weekly retention cohort
WITH cohorts AS (
SELECT
user_id,
toMonday(min(date)) AS cohort_week
FROM events
GROUP BY user_id
)
SELECT
c.cohort_week,
dateDiff('week', c.cohort_week, toMonday(e.date)) AS weeks_later,
uniqExact(e.user_id) AS users
FROM events e
JOIN cohorts c ON e.user_id = c.user_id
WHERE c.cohort_week >= today() - INTERVAL 8 WEEK
GROUP BY c.cohort_week, weeks_later
ORDER BY c.cohort_week, weeks_later;
""",
        },
        "top_events": {
            "name": "Top Events (Last 24h)",
            "sql": """
SELECT
event_type,
count() AS total,
uniqExact(user_id) AS unique_users,
round(total / sum(total) OVER () * 100, 1) AS pct
FROM events
WHERE timestamp >= now() - INTERVAL 24 HOUR
GROUP BY event_type
ORDER BY total DESC
LIMIT 20;
""",
        },
    }

    def show_queries(self):
        """Print each query's title and the first 200 characters of its SQL."""
        print("=== Analytics Queries ===\n")
        # Iterate values only — the dict keys were fetched but never used (PERF102).
        for query in self.QUERIES.values():
            print(f"[{query['name']}]")
            print(query["sql"][:200].strip())
            print("...\n")
# Demo: print a preview of every analytics query.
analytics = AnalyticsQueries()
analytics.show_queries()
Python Client & Monitoring
# python_client.py — ClickHouse Python client
import json
import random
class ClickHousePython:
    """Show an example clickhouse-driver client (as text) and simulated
    monitoring metrics."""

    # Example client source displayed verbatim (never executed here).
    # NOTE(review): the embedded query_funnel interpolates caller-supplied step
    # names straight into SQL — acceptable for a demo, unsafe for untrusted input.
    CODE = """
# clickhouse_client.py — Python client for ClickHouse
from clickhouse_driver import Client
import pandas as pd
from datetime import datetime
class AnalyticsClient:
def __init__(self, host='localhost', port=9000):
self.client = Client(host=host, port=port)
def insert_events(self, events):
'''Batch insert events'''
self.client.execute(
'INSERT INTO events (event_id, user_id, event_type, properties, timestamp) VALUES',
events,
)
return len(events)
def query_dashboard(self, interval='1 HOUR'):
'''Get dashboard metrics'''
result = self.client.execute(f'''
SELECT
toStartOfMinute(timestamp) AS minute,
count() AS events,
uniqExact(user_id) AS users
FROM events
WHERE timestamp >= now() - INTERVAL {interval}
GROUP BY minute
ORDER BY minute
''')
return pd.DataFrame(result, columns=['minute', 'events', 'users'])
def query_funnel(self, steps, date='today()'):
'''Calculate conversion funnel'''
queries = []
for i, step in enumerate(steps):
queries.append(f"SELECT {i+1} AS level, uniqExact(user_id) AS users FROM events WHERE event_type = '{step}' AND date = {date}")
sql = ' UNION ALL '.join(queries)
return self.client.execute(sql)
def health_check(self):
'''Check ClickHouse health'''
result = self.client.execute('SELECT version(), uptime()')
parts = self.client.execute('''
SELECT database, table, sum(rows) AS rows,
formatReadableSize(sum(bytes_on_disk)) AS size
FROM system.parts WHERE active
GROUP BY database, table
ORDER BY sum(bytes_on_disk) DESC LIMIT 10
''')
return {"version": result[0][0], "uptime": result[0][1], "top_tables": parts}
client = AnalyticsClient()
# client.insert_events(batch)
# df = client.query_dashboard('24 HOUR')
"""

    def show_code(self):
        """Print the first 600 characters of the example client code."""
        print("=== Python Client ===")
        print(self.CODE[:600])

    def monitoring(self):
        """Print simulated monitoring metrics (random values; illustrative only)."""
        # Plain string: the original used an f-string with no placeholders (F541).
        print("\n=== ClickHouse Monitoring ===")
        metrics = {
            "Queries/sec": random.randint(50, 500),
            "Insert rows/sec": f"{random.randint(100, 1000):,}K",
            "Active parts": random.randint(50, 300),
            "Memory usage": f"{random.uniform(2, 16):.1f} GB",
            "Disk usage": f"{random.uniform(50, 500):.0f} GB",
            "Kafka lag": f"{random.randint(0, 10000):,} messages",
            "Merge rate": f"{random.randint(1, 20)} merges/min",
        }
        for name, value in metrics.items():
            print(f" {name}: {value}")
# Demo: show the client code excerpt, then the simulated metrics.
demo = ClickHousePython()
demo.show_code()
demo.monitoring()
Infrastructure & Scaling
# infra.py — ClickHouse infrastructure
import json
class Infrastructure:
    """Deployment reference: docker-compose example, scaling strategies, and
    a rough hardware sizing guide."""

    # docker-compose example displayed verbatim (never parsed here).
    DOCKER = """
# docker-compose.yml — ClickHouse + Kafka
version: '3.8'
services:
clickhouse:
image: clickhouse/clickhouse-server:24.1
ports:
- "8123:8123" # HTTP
- "9000:9000" # Native
volumes:
- ch-data:/var/lib/clickhouse
ulimits:
nofile:
soft: 262144
hard: 262144
kafka:
image: confluentinc/cp-kafka:7.5.0
environment:
KAFKA_NUM_PARTITIONS: 12
KAFKA_DEFAULT_REPLICATION_FACTOR: 1
grafana:
image: grafana/grafana:latest
ports: ["3000:3000"]
environment:
GF_INSTALL_PLUGINS: grafana-clickhouse-datasource
volumes:
ch-data:
"""

    # Scaling strategies: strategy name -> short description.
    SCALING = {
        "vertical": "เพิ่ม CPU/RAM — ClickHouse ใช้ CPU เก่งมาก (vectorized execution)",
        "sharding": "แบ่งข้อมูลตาม shard key → distribute across nodes",
        "replication": "ReplicatedMergeTree — auto-replicate across nodes",
        "tiered": "Hot (SSD) → Warm (HDD) → Cold (S3) storage tiering",
    }

    def show_docker(self):
        """Print the first 400 characters of the docker-compose example."""
        print("=== Docker Compose ===")
        print(self.DOCKER[:400])

    def show_scaling(self):
        """Print the available scaling strategies."""
        # Plain string: the original used an f-string with no placeholders (F541).
        print("\n=== Scaling Strategies ===")
        for strategy, desc in self.SCALING.items():
            print(f" [{strategy}] {desc}")

    def sizing_guide(self):
        """Print rough hardware tiers by data volume and query rate."""
        # Plain string: the original used an f-string with no placeholders (F541).
        print("\n=== Sizing Guide ===")
        tiers = [
            {"name": "Small", "data": "< 1TB", "queries": "< 100/sec", "spec": "8 CPU, 32GB RAM, SSD"},
            {"name": "Medium", "data": "1-10TB", "queries": "100-500/sec", "spec": "32 CPU, 128GB RAM, NVMe SSD"},
            {"name": "Large", "data": "10-100TB", "queries": "500+/sec", "spec": "Cluster: 3-10 nodes, 64+ CPU each"},
        ]
        for tier in tiers:
            print(f" [{tier['name']}] Data: {tier['data']} | QPS: {tier['queries']} → {tier['spec']}")
# Demo: print deployment, scaling, and sizing information.
deployment = Infrastructure()
deployment.show_docker()
deployment.show_scaling()
deployment.sizing_guide()
FAQ - คำถามที่พบบ่อย
Q: ClickHouse กับ PostgreSQL ต่างกัน?
A: ClickHouse: columnar OLAP — เร็วมากสำหรับ analytics (aggregation, scan billions of rows) PostgreSQL: row-based OLTP — เร็วสำหรับ transactional workloads (insert/update/select by PK) ใช้ ClickHouse: analytics, dashboards, log analysis, time-series ใช้ PostgreSQL: application data, CRUD, transactions ใช้ทั้งคู่: PostgreSQL สำหรับ app → Kafka → ClickHouse สำหรับ analytics
Q: Kafka Engine กับ INSERT จาก application อันไหนดี?
A: Kafka Engine: ดีกว่า — ClickHouse consume โดยตรง, at-least-once semantics (อาจมี duplicate เมื่อ consumer restart — ไม่ใช่ exactly-once), auto-resume INSERT: ง่ายกว่า — application insert ตรง แต่ต้อง handle retry, buffering เอง แนะนำ: ใช้ Kafka Engine สำหรับ production — reliable, decoupled, scalable
Q: MergeTree กับ ReplacingMergeTree ต่างกัน?
A: MergeTree: default — fast insert, allow duplicates ReplacingMergeTree: deduplicate by version column (eventually, during merges) CollapsingMergeTree: handle updates/deletes with sign column AggregatingMergeTree: pre-aggregate during merge เลือกตาม use case: append-only → MergeTree, need dedup → Replacing, need updates → Collapsing
Q: ClickHouse เหมาะกับงานอะไร?
A: เหมาะมาก: Web analytics (เหมือน Google Analytics), Log analysis, Real-time dashboards, Time-series, A/B testing ไม่เหมาะ: OLTP (frequent updates/deletes), Small datasets (< 1M rows), Key-value lookups, Full-text search (ใช้ Elasticsearch แทน)
