SiamCafe · Blog
ClickHouse Analytics Message Queue Design
บทความทั่วไป

ClickHouse Analytics Message Queue Design

เผยแพร่ May 28, 2026

ClickHouse Analytics Message Queue Design คืออะไร

ClickHouse Analytics Message Queue Design

ClickHouse เป็น open source columnar database ที่เร็วที่สุดสำหรับ real-time analytics queries ออกแบบมาเพื่อประมวลผลข้อมูลหลายพันล้าน rows ในเสี้ยววินาที Message Queue เช่น Apache Kafka, RabbitMQ ทำหน้าที่เป็น buffer ระหว่าง data producers กับ ClickHouse การออกแบบระบบที่รวม ClickHouse กับ Message Queue ช่วยสร้าง analytics pipeline ที่ real-time, scalable และ fault-tolerant สำหรับ use cases เช่น web analytics, log analysis, IoT telemetry และ business intelligence

ClickHouse Architecture

# clickhouse_arch.py — ClickHouse architecture overview
import json

class ClickHouseArch:
    """Static catalogue of ClickHouse features and headline performance figures.

    Both tables are class-level constants; the ``show_*`` methods only
    format them and write to stdout.
    """

    # Feature slug -> display name, description and benefit text.
    FEATURES = {
        "columnar": {
            "name": "Columnar Storage",
            "description": "เก็บข้อมูลเป็น column แทน row — อ่านเฉพาะ columns ที่ต้องการ",
            "benefit": "Query analytics เร็วมาก — scan เฉพาะ columns ที่ query ใช้",
        },
        "compression": {
            "name": "Data Compression",
            "description": "Compress ข้อมูลได้ 10-50x — เพราะ same column = same data type",
            "benefit": "ใช้ storage น้อย, disk I/O ลด → query เร็วขึ้น",
        },
        "vectorized": {
            "name": "Vectorized Query Execution",
            "description": "ประมวลผลทีละ batch (vector) ไม่ใช่ทีละ row — ใช้ CPU SIMD",
            "benefit": "CPU utilization สูง → throughput สูงมาก",
        },
        "mergetree": {
            "name": "MergeTree Engine",
            "description": "Default storage engine — sort data by primary key, merge in background",
            "benefit": "Fast inserts + fast range queries + automatic data merging",
        },
        "distributed": {
            "name": "Distributed Queries",
            "description": "Query ข้าม multiple shards/replicas อัตโนมัติ",
            "benefit": "Horizontal scaling — เพิ่ม nodes = เพิ่ม capacity",
        },
    }

    # Metric label -> human-readable figure.
    PERFORMANCE = {
        "insert": "~1M rows/sec per node (batch insert)",
        "query": "Billions of rows in < 1 second (aggregation queries)",
        "compression": "10-50x compression ratio",
        "concurrent": "100+ concurrent queries",
    }

    def show_features(self):
        """Print every feature as ``[name]`` plus an indented description.

        Each entry is followed by one blank line; the ``benefit`` text is
        not printed.
        """
        print("=== ClickHouse Features ===\n")
        for feature in self.FEATURES.values():
            heading = f"[{feature['name']}]"
            detail = f"  {feature['description']}"
            # A trailing empty field yields the blank separator line.
            print(heading, detail, "", sep="\n")

    def show_performance(self):
        """Print the performance table, one ``[metric] value`` line per entry."""
        rows = [f"  [{label}] {figure}" for label, figure in self.PERFORMANCE.items()]
        print("=== Performance ===", *rows, sep="\n")

# Demo driver: build the overview object and print both sections to stdout.
arch = ClickHouseArch()
arch.show_features()
arch.show_performance()

Message Queue Integration

# mq_integration.py — ClickHouse + Kafka integration
import json

class KafkaIntegration:
    """Sample DDL and a data-flow sketch for feeding ClickHouse from Kafka.

    The SQL and the diagram are plain strings for display; nothing in this
    class connects to Kafka or ClickHouse.
    """

    # Sample DDL: Kafka engine table -> MergeTree target via a materialized
    # view, plus an hourly pre-aggregation.
    KAFKA_ENGINE = """
-- ClickHouse Kafka Engine — consume directly from Kafka
CREATE TABLE events_queue (
    event_id String,
    user_id String,
    event_type String,
    properties String,
    timestamp DateTime64(3)
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'kafka:9092',
    kafka_topic_list = 'events',
    kafka_group_name = 'clickhouse-consumer',
    kafka_format = 'JSONEachRow',
    kafka_num_consumers = 4,
    kafka_max_block_size = 65536;

-- Target MergeTree table
CREATE TABLE events (
    event_id String,
    user_id String,
    event_type LowCardinality(String),
    properties String,
    timestamp DateTime64(3),
    date Date DEFAULT toDate(timestamp)
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (event_type, user_id, timestamp)
TTL date + INTERVAL 90 DAY;

-- Materialized View — auto-insert from Kafka to MergeTree
CREATE MATERIALIZED VIEW events_mv TO events AS
SELECT
    event_id,
    user_id,
    event_type,
    properties,
    timestamp
FROM events_queue;

-- Pre-aggregated table for fast dashboard queries
-- unique_users is kept as an aggregate state: plain uniqExact() results are not
-- additive, so summing them across insert blocks would over-count users.
-- Query it with sum(event_count) and uniqExactMerge(unique_users),
-- grouped by event_type, hour.
CREATE MATERIALIZED VIEW events_hourly_mv
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(hour)
ORDER BY (event_type, hour)
AS SELECT
    event_type,
    toStartOfHour(timestamp) AS hour,
    count() AS event_count,
    uniqExactState(user_id) AS unique_users
FROM events_queue
GROUP BY event_type, hour;
"""

    # ASCII sketch of the producer -> Kafka -> ClickHouse pipeline.
    ARCHITECTURE_FLOW = """
    Data Flow:
    
    [Producers] → [Kafka Topics] → [ClickHouse Kafka Engine] → [MergeTree Tables]
                                                              → [Materialized Views (aggregations)]
    
    Web App → topic: events      → events table + hourly aggregation
    Logs    → topic: logs        → logs table + error aggregation  
    IoT     → topic: telemetry   → metrics table + minute aggregation
    """

    def show_kafka_engine(self):
        """Print a header and the first 600 characters of the sample DDL."""
        print("=== Kafka Engine SQL ===")
        # Deliberate preview: the slice cuts the DDL short to keep output brief.
        print(self.KAFKA_ENGINE[:600])

    def show_flow(self):
        """Print the data-flow diagram, preceded by a blank line and a header."""
        print("\n=== Architecture Flow ===")
        print(self.ARCHITECTURE_FLOW)

# Demo driver: print the DDL preview, then the data-flow diagram.
kafka = KafkaIntegration()
kafka.show_kafka_engine()
kafka.show_flow()

Analytics Queries

ClickHouse Analytics Message Queue Design
# analytics.py — ClickHouse analytics queries
import json

class AnalyticsQueries:
    """Named sample analytics queries against the ``events`` table.

    The SQL is stored as text for display; this class never executes it.
    """

    # Query slug -> display name and SQL text.
    QUERIES = {
        "realtime_dashboard": {
            "name": "Real-time Dashboard",
            "sql": """
-- Events per minute (last hour)
SELECT
    toStartOfMinute(timestamp) AS minute,
    count() AS events,
    uniqExact(user_id) AS users
FROM events
WHERE timestamp >= now() - INTERVAL 1 HOUR
GROUP BY minute
ORDER BY minute;
""",
        },
        "funnel": {
            "name": "Conversion Funnel",
            "sql": """
-- Funnel: page_view → add_to_cart → purchase
SELECT
    level,
    users,
    round(users / first_value(users) OVER (ORDER BY level) * 100, 1) AS conversion_pct
FROM (
    SELECT 1 AS level, uniqExact(user_id) AS users
    FROM events WHERE event_type = 'page_view' AND date = today()
    UNION ALL
    SELECT 2, uniqExact(user_id)
    FROM events WHERE event_type = 'add_to_cart' AND date = today()
    UNION ALL
    SELECT 3, uniqExact(user_id)
    FROM events WHERE event_type = 'purchase' AND date = today()
);
""",
        },
        "retention": {
            "name": "User Retention (Cohort)",
            "sql": """
-- Weekly retention cohort
WITH cohorts AS (
    SELECT
        user_id,
        toMonday(min(date)) AS cohort_week
    FROM events
    GROUP BY user_id
)
SELECT
    c.cohort_week,
    dateDiff('week', c.cohort_week, toMonday(e.date)) AS weeks_later,
    uniqExact(e.user_id) AS users
FROM events e
JOIN cohorts c ON e.user_id = c.user_id
WHERE c.cohort_week >= today() - INTERVAL 8 WEEK
GROUP BY c.cohort_week, weeks_later
ORDER BY c.cohort_week, weeks_later;
""",
        },
        "top_events": {
            "name": "Top Events (Last 24h)",
            "sql": """
SELECT
    event_type,
    count() AS total,
    uniqExact(user_id) AS unique_users,
    round(total / sum(total) OVER () * 100, 1) AS pct
FROM events
WHERE timestamp >= now() - INTERVAL 24 HOUR
GROUP BY event_type
ORDER BY total DESC
LIMIT 20;
""",
        },
    }

    def show_queries(self):
        """Print each query's name and a 200-character preview of its SQL.

        The preview is stripped of surrounding whitespace and followed by
        a ``...`` marker and a blank line.
        """
        print("=== Analytics Queries ===\n")
        for entry in self.QUERIES.values():
            preview = entry["sql"][:200].strip()
            # "...\n" plus print's own newline gives the blank separator line.
            print(f"[{entry['name']}]", preview, "...\n", sep="\n")

# Demo driver: print a preview of every sample query.
queries = AnalyticsQueries()
queries.show_queries()

Python Client & Monitoring

# python_client.py — ClickHouse Python client
import json
import random

class ClickHousePython:
    """Sample clickhouse-driver client (as text) plus a simulated metrics printout.

    ``CODE`` is shown to the reader and never executed here. ``monitoring``
    prints randomly generated numbers, not values read from a server.
    """

    # Sample client source. Event names are bound as query parameters and the
    # interval is validated, so copying the sample does not spread SQL built
    # from unchecked input.
    CODE = """
# clickhouse_client.py — Python client for ClickHouse
from clickhouse_driver import Client
import pandas as pd
from datetime import datetime

class AnalyticsClient:
    def __init__(self, host='localhost', port=9000):
        self.client = Client(host=host, port=port)
    
    def insert_events(self, events):
        '''Batch insert events'''
        self.client.execute(
            'INSERT INTO events (event_id, user_id, event_type, properties, timestamp) VALUES',
            events,
        )
        return len(events)
    
    def query_dashboard(self, interval='1 HOUR'):
        '''Get dashboard metrics'''
        # interval is spliced into the SQL text, so accept only '<digits> <UNIT>'
        amount, _, unit = interval.partition(' ')
        units = ('SECOND', 'MINUTE', 'HOUR', 'DAY', 'WEEK', 'MONTH')
        if not (amount.isascii() and amount.isdigit()) or unit.upper() not in units:
            raise ValueError(f'invalid interval: {interval!r}')
        result = self.client.execute(f'''
            SELECT
                toStartOfMinute(timestamp) AS minute,
                count() AS events,
                uniqExact(user_id) AS users
            FROM events
            WHERE timestamp >= now() - INTERVAL {interval}
            GROUP BY minute
            ORDER BY minute
        ''')
        return pd.DataFrame(result, columns=['minute', 'events', 'users'])
    
    def query_funnel(self, steps, date='today()'):
        '''Calculate conversion funnel'''
        if not steps:
            return []
        # Event names are bound as query parameters, never spliced into the SQL text.
        # NOTE: date is a raw SQL expression (e.g. today()) — pass trusted values only.
        queries = []
        params = {}
        for i, step in enumerate(steps, start=1):
            params[f'step_{i}'] = step
            queries.append(f"SELECT {i} AS level, uniqExact(user_id) AS users FROM events WHERE event_type = %(step_{i})s AND date = {date}")
        
        sql = ' UNION ALL '.join(queries)
        return self.client.execute(sql, params)
    
    def health_check(self):
        '''Check ClickHouse health'''
        result = self.client.execute('SELECT version(), uptime()')
        parts = self.client.execute('''
            SELECT database, table, sum(rows) AS rows, 
                   formatReadableSize(sum(bytes_on_disk)) AS size
            FROM system.parts WHERE active
            GROUP BY database, table
            ORDER BY sum(bytes_on_disk) DESC LIMIT 10
        ''')
        return {"version": result[0][0], "uptime": result[0][1], "top_tables": parts}

client = AnalyticsClient()
# client.insert_events(batch)
# df = client.query_dashboard('24 HOUR')
"""

    def show_code(self):
        """Print a header and the first 600 characters of the sample client."""
        print("=== Python Client ===")
        # Deliberate preview: only the beginning of the sample is shown.
        print(self.CODE[:600])

    def monitoring(self):
        """Print a block of simulated monitoring metrics.

        Every value comes from ``random``, so output differs between calls
        unless the caller seeds the generator.
        """
        print("\n=== ClickHouse Monitoring ===")
        metrics = {
            "Queries/sec": random.randint(50, 500),
            "Insert rows/sec": f"{random.randint(100, 1000):,}K",
            "Active parts": random.randint(50, 300),
            "Memory usage": f"{random.uniform(2, 16):.1f} GB",
            "Disk usage": f"{random.uniform(50, 500):.0f} GB",
            "Kafka lag": f"{random.randint(0, 10000):,} messages",
            "Merge rate": f"{random.randint(1, 20)} merges/min",
        }
        for label, value in metrics.items():
            print(f"  {label}: {value}")

# Demo driver: print the client preview, then the simulated metrics.
py = ClickHousePython()
py.show_code()
py.monitoring()

Infrastructure & Scaling

# infra.py — ClickHouse infrastructure
import json

class Infrastructure:
    """Deployment notes: a sample Compose file, scaling strategies and sizing tiers.

    All content is static text; the methods only print it.
    """

    # Sample docker-compose file, kept as text for display.
    DOCKER = """
# docker-compose.yml — ClickHouse + Kafka
version: '3.8'
services:
  clickhouse:
    image: clickhouse/clickhouse-server:24.1
    ports:
      - "8123:8123"  # HTTP
      - "9000:9000"  # Native
    volumes:
      - ch-data:/var/lib/clickhouse
    ulimits:
      nofile:
        soft: 262144
        hard: 262144

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    environment:
      KAFKA_NUM_PARTITIONS: 12
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1

  grafana:
    image: grafana/grafana:latest
    ports: ["3000:3000"]
    environment:
      GF_INSTALL_PLUGINS: grafana-clickhouse-datasource

volumes:
  ch-data:
"""

    # Strategy slug -> one-line description.
    SCALING = {
        "vertical": "เพิ่ม CPU/RAM — ClickHouse ใช้ CPU เก่งมาก (vectorized execution)",
        "sharding": "แบ่งข้อมูลตาม shard key → distribute across nodes",
        "replication": "ReplicatedMergeTree — auto-replicate across nodes",
        "tiered": "Hot (SSD) → Warm (HDD) → Cold (S3) storage tiering",
    }

    def show_docker(self):
        """Print a header and the first 400 characters of the Compose sample."""
        preview = self.DOCKER[:400]
        print("=== Docker Compose ===", preview, sep="\n")

    def show_scaling(self):
        """Print each scaling strategy as ``[slug] description``."""
        rows = [f"  [{slug}] {text}" for slug, text in self.SCALING.items()]
        print("\n=== Scaling Strategies ===", *rows, sep="\n")

    def sizing_guide(self):
        """Print the three sizing tiers with data volume, QPS and hardware spec."""
        print("\n=== Sizing Guide ===")
        # (tier, data volume, query rate, hardware)
        tiers = (
            ("Small", "< 1TB", "< 100/sec", "8 CPU, 32GB RAM, SSD"),
            ("Medium", "1-10TB", "100-500/sec", "32 CPU, 128GB RAM, NVMe SSD"),
            ("Large", "10-100TB", "500+/sec", "Cluster: 3-10 nodes, 64+ CPU each"),
        )
        for tier, volume, qps, hardware in tiers:
            print(f"  [{tier}] Data: {volume} | QPS: {qps} → {hardware}")

# Demo driver: print the Compose preview, scaling strategies and sizing tiers.
infra = Infrastructure()
infra.show_docker()
infra.show_scaling()
infra.sizing_guide()

FAQ - คำถามที่พบบ่อย

Q: ClickHouse กับ PostgreSQL ต่างกันอย่างไร?

A: ClickHouse: columnar OLAP — เร็วมากสำหรับ analytics (aggregation, scan billions of rows); PostgreSQL: row-based OLTP — เร็วสำหรับ transactional workloads (insert/update/select by PK). ใช้ ClickHouse สำหรับ: analytics, dashboards, log analysis, time-series; ใช้ PostgreSQL สำหรับ: application data, CRUD, transactions; ใช้ทั้งคู่: PostgreSQL สำหรับ app → Kafka → ClickHouse สำหรับ analytics

Q: Kafka Engine กับ INSERT จาก application อันไหนดี?

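A: Kafka Engine: ดีกว่า — ClickHouse consume จาก Kafka โดยตรง และ resume จาก committed offset อัตโนมัติ แต่เป็น at-least-once delivery (ไม่ใช่ exactly-once — อาจมี duplicate ได้เมื่อ consumer restart หรือ rebalance); INSERT จาก application: ง่ายกว่า — application insert ตรง แต่ต้อง handle retry, buffering เอง. แนะนำ: ใช้ Kafka Engine สำหรับ production — reliable, decoupled, scalable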

Q: MergeTree กับ ReplacingMergeTree ต่างกันอย่างไร?

A: MergeTree: default — fast insert, allow duplicates; ReplacingMergeTree: deduplicate by version column (eventually, during merges); CollapsingMergeTree: handle updates/deletes with sign column; AggregatingMergeTree: pre-aggregate during merge. เลือกตาม use case: append-only → MergeTree, need dedup → Replacing, need updates → Collapsing

Q: ClickHouse เหมาะกับงานอะไร?

A: เหมาะมาก: Web analytics (เหมือน Google Analytics), Log analysis, Real-time dashboards, Time-series, A/B testing; ไม่เหมาะ: OLTP (frequent updates/deletes), Small datasets (< 1M rows), Key-value lookups, Full-text search (ใช้ Elasticsearch แทน)