ClickHouse Analytics Message Queue Design

2025-08-22 · อ. บอม — SiamCafe.net · 1,460 words

What Is ClickHouse Analytics Message Queue Design?

ClickHouse is one of the fastest open-source columnar databases for real-time analytics queries, designed to process billions of rows in a fraction of a second. A message queue such as Apache Kafka or RabbitMQ acts as a buffer between data producers and ClickHouse. Combining ClickHouse with a message queue produces an analytics pipeline that is real-time, scalable, and fault-tolerant, for use cases such as web analytics, log analysis, IoT telemetry, and business intelligence.
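The buffering role described above can be sketched in a few lines of plain Python. This is a toy model only: `BufferedPipeline`, `produce`, and `flush` are hypothetical names, with a `deque` standing in for the Kafka topic and a list of batches standing in for ClickHouse batch INSERTs.

```python
from collections import deque

class BufferedPipeline:
    """Toy model of producer -> queue -> batched sink, the pattern a
    message queue provides in front of ClickHouse."""

    def __init__(self, batch_size=3):
        self.queue = deque()          # stands in for a Kafka topic
        self.batch_size = batch_size  # ClickHouse prefers large batch inserts
        self.flushed = []             # batches "inserted" into the sink

    def produce(self, event):
        self.queue.append(event)      # producers never touch the DB directly
        if len(self.queue) >= self.batch_size:
            self.flush()

    def flush(self):
        if self.queue:
            batch = [self.queue.popleft() for _ in range(len(self.queue))]
            self.flushed.append(batch)  # one batch INSERT instead of many rows

pipe = BufferedPipeline(batch_size=3)
for i in range(7):
    pipe.produce({"event_id": i, "event_type": "page_view"})
pipe.flush()  # drain the tail
print([len(b) for b in pipe.flushed])  # → [3, 3, 1]
```

The point of the sketch: producers stay decoupled from the database, and rows reach the sink in batches, which is exactly what ClickHouse's insert path wants.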

ClickHouse Architecture

# clickhouse_arch.py — ClickHouse architecture overview

class ClickHouseArch:
    FEATURES = {
        "columnar": {
            "name": "Columnar Storage",
            "description": "เก็บข้อมูลเป็น column แทน row — อ่านเฉพาะ columns ที่ต้องการ",
            "benefit": "Query analytics เร็วมาก — scan เฉพาะ columns ที่ query ใช้",
        },
        "compression": {
            "name": "Data Compression",
            "description": "Compress ข้อมูลได้ 10-50x — เพราะ same column = same data type",
            "benefit": "ใช้ storage น้อย, disk I/O ลด → query เร็วขึ้น",
        },
        "vectorized": {
            "name": "Vectorized Query Execution",
            "description": "ประมวลผลทีละ batch (vector) ไม่ใช่ทีละ row — ใช้ CPU SIMD",
            "benefit": "CPU utilization สูง → throughput สูงมาก",
        },
        "mergetree": {
            "name": "MergeTree Engine",
            "description": "Default storage engine — sort data by primary key, merge in background",
            "benefit": "Fast inserts + fast range queries + automatic data merging",
        },
        "distributed": {
            "name": "Distributed Queries",
            "description": "Query ข้าม multiple shards/replicas อัตโนมัติ",
            "benefit": "Horizontal scaling — เพิ่ม nodes = เพิ่ม capacity",
        },
    }

    PERFORMANCE = {
        "insert": "~1M rows/sec per node (batch insert)",
        "query": "Billions of rows in < 1 second (aggregation queries)",
        "compression": "10-50x compression ratio",
        "concurrent": "100+ concurrent queries",
    }

    def show_features(self):
        print("=== ClickHouse Features ===\n")
        for key, f in self.FEATURES.items():
            print(f"[{f['name']}]")
            print(f"  {f['description']}")
            print(f"  Benefit: {f['benefit']}")
            print()

    def show_performance(self):
        print("=== Performance ===")
        for metric, value in self.PERFORMANCE.items():
            print(f"  [{metric}] {value}")

arch = ClickHouseArch()
arch.show_features()
arch.show_performance()

Message Queue Integration

# mq_integration.py — ClickHouse + Kafka integration

class KafkaIntegration:
    KAFKA_ENGINE = """
-- ClickHouse Kafka Engine — consume directly from Kafka
CREATE TABLE events_queue (
    event_id String,
    user_id String,
    event_type String,
    properties String,
    timestamp DateTime64(3)
) ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'kafka:9092',
    kafka_topic_list = 'events',
    kafka_group_name = 'clickhouse-consumer',
    kafka_format = 'JSONEachRow',
    kafka_num_consumers = 4,
    kafka_max_block_size = 65536;

-- Target MergeTree table
CREATE TABLE events (
    event_id String,
    user_id String,
    event_type LowCardinality(String),
    properties String,
    timestamp DateTime64(3),
    date Date DEFAULT toDate(timestamp)
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (event_type, user_id, timestamp)
TTL date + INTERVAL 90 DAY;

-- Materialized View — auto-insert from Kafka to MergeTree
CREATE MATERIALIZED VIEW events_mv TO events AS
SELECT
    event_id,
    user_id,
    event_type,
    properties,
    timestamp
FROM events_queue;

-- Pre-aggregated table for fast dashboard queries
CREATE MATERIALIZED VIEW events_hourly_mv
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(hour)
ORDER BY (event_type, hour)
AS SELECT
    event_type,
    toStartOfHour(timestamp) AS hour,
    count() AS event_count,
    uniqExact(user_id) AS unique_users
FROM events_queue
GROUP BY event_type, hour;
"""

    ARCHITECTURE_FLOW = """
    Data Flow:
    
    [Producers] → [Kafka Topics] → [ClickHouse Kafka Engine] → [MergeTree Tables]
                                                              → [Materialized Views (aggregations)]
    
    Web App → topic: events      → events table + hourly aggregation
    Logs    → topic: logs        → logs table + error aggregation  
    IoT     → topic: telemetry   → metrics table + minute aggregation
    """

    def show_kafka_engine(self):
        print("=== Kafka Engine SQL ===")
        print(self.KAFKA_ENGINE[:600])

    def show_flow(self):
        print("\n=== Architecture Flow ===")
        print(self.ARCHITECTURE_FLOW)

kafka = KafkaIntegration()
kafka.show_kafka_engine()
kafka.show_flow()

Analytics Queries

# analytics.py — ClickHouse analytics queries

class AnalyticsQueries:
    QUERIES = {
        "realtime_dashboard": {
            "name": "Real-time Dashboard",
            "sql": """
-- Events per minute (last hour)
SELECT
    toStartOfMinute(timestamp) AS minute,
    count() AS events,
    uniqExact(user_id) AS users
FROM events
WHERE timestamp >= now() - INTERVAL 1 HOUR
GROUP BY minute
ORDER BY minute;
""",
        },
        "funnel": {
            "name": "Conversion Funnel",
            "sql": """
-- Funnel: page_view → add_to_cart → purchase
SELECT
    level,
    users,
    round(users / first_value(users) OVER (ORDER BY level) * 100, 1) AS conversion_pct
FROM (
    SELECT 1 AS level, uniqExact(user_id) AS users
    FROM events WHERE event_type = 'page_view' AND date = today()
    UNION ALL
    SELECT 2, uniqExact(user_id)
    FROM events WHERE event_type = 'add_to_cart' AND date = today()
    UNION ALL
    SELECT 3, uniqExact(user_id)
    FROM events WHERE event_type = 'purchase' AND date = today()
);
""",
        },
        "retention": {
            "name": "User Retention (Cohort)",
            "sql": """
-- Weekly retention cohort
WITH cohorts AS (
    SELECT
        user_id,
        toMonday(min(date)) AS cohort_week
    FROM events
    GROUP BY user_id
)
SELECT
    c.cohort_week,
    dateDiff('week', c.cohort_week, toMonday(e.date)) AS weeks_later,
    uniqExact(e.user_id) AS users
FROM events e
JOIN cohorts c ON e.user_id = c.user_id
WHERE c.cohort_week >= today() - INTERVAL 8 WEEK
GROUP BY c.cohort_week, weeks_later
ORDER BY c.cohort_week, weeks_later;
""",
        },
        "top_events": {
            "name": "Top Events (Last 24h)",
            "sql": """
SELECT
    event_type,
    count() AS total,
    uniqExact(user_id) AS unique_users,
    round(total / sum(total) OVER () * 100, 1) AS pct
FROM events
WHERE timestamp >= now() - INTERVAL 24 HOUR
GROUP BY event_type
ORDER BY total DESC
LIMIT 20;
""",
        },
    }

    def show_queries(self):
        print("=== Analytics Queries ===\n")
        for key, q in self.QUERIES.items():
            print(f"[{q['name']}]")
            print(q["sql"][:200].strip())
            print("...\n")

queries = AnalyticsQueries()
queries.show_queries()

Python Client & Monitoring

# python_client.py — ClickHouse Python client
import random

class ClickHousePython:
    CODE = """
# clickhouse_client.py — Python client for ClickHouse
from clickhouse_driver import Client
import pandas as pd
from datetime import datetime

class AnalyticsClient:
    def __init__(self, host='localhost', port=9000):
        self.client = Client(host=host, port=port)
    
    def insert_events(self, events):
        '''Batch insert events'''
        self.client.execute(
            'INSERT INTO events (event_id, user_id, event_type, properties, timestamp) VALUES',
            events,
        )
        return len(events)
    
    def query_dashboard(self, interval='1 HOUR'):
        '''Get dashboard metrics'''
        result = self.client.execute(f'''
            SELECT
                toStartOfMinute(timestamp) AS minute,
                count() AS events,
                uniqExact(user_id) AS users
            FROM events
            WHERE timestamp >= now() - INTERVAL {interval}
            GROUP BY minute
            ORDER BY minute
        ''')
        return pd.DataFrame(result, columns=['minute', 'events', 'users'])
    
    def query_funnel(self, steps, date='today()'):
        '''Calculate conversion funnel'''
        queries = []
        for i, step in enumerate(steps):
            queries.append(f"SELECT {i+1} AS level, uniqExact(user_id) AS users FROM events WHERE event_type = '{step}' AND date = {date}")
        
        sql = ' UNION ALL '.join(queries)
        return self.client.execute(sql)
    
    def health_check(self):
        '''Check ClickHouse health'''
        result = self.client.execute('SELECT version(), uptime()')
        parts = self.client.execute('''
            SELECT database, table, sum(rows) AS rows, 
                   formatReadableSize(sum(bytes_on_disk)) AS size
            FROM system.parts WHERE active
            GROUP BY database, table
            ORDER BY sum(bytes_on_disk) DESC LIMIT 10
        ''')
        return {"version": result[0][0], "uptime": result[0][1], "top_tables": parts}

client = AnalyticsClient()
# client.insert_events(batch)
# df = client.query_dashboard('24 HOUR')
"""

    def show_code(self):
        print("=== Python Client ===")
        print(self.CODE[:600])

    def monitoring(self):
        print("\n=== ClickHouse Monitoring ===")
        metrics = {
            "Queries/sec": random.randint(50, 500),
            "Insert rows/sec": f"{random.randint(100, 1000):,}K",
            "Active parts": random.randint(50, 300),
            "Memory usage": f"{random.uniform(2, 16):.1f} GB",
            "Disk usage": f"{random.uniform(50, 500):.0f} GB",
            "Kafka lag": f"{random.randint(0, 10000):,} messages",
            "Merge rate": f"{random.randint(1, 20)} merges/min",
        }
        for m, v in metrics.items():
            print(f"  {m}: {v}")

py = ClickHousePython()
py.show_code()
py.monitoring()

Infrastructure & Scaling

# infra.py — ClickHouse infrastructure

class Infrastructure:
    DOCKER = """
# docker-compose.yml — ClickHouse + Kafka
version: '3.8'
services:
  clickhouse:
    image: clickhouse/clickhouse-server:24.1
    ports:
      - "8123:8123"  # HTTP
      - "9000:9000"  # Native
    volumes:
      - ch-data:/var/lib/clickhouse
    ulimits:
      nofile:
        soft: 262144
        hard: 262144

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    environment:
      KAFKA_NUM_PARTITIONS: 12
      KAFKA_DEFAULT_REPLICATION_FACTOR: 1

  grafana:
    image: grafana/grafana:latest
    ports: ["3000:3000"]
    environment:
      GF_INSTALL_PLUGINS: grafana-clickhouse-datasource

volumes:
  ch-data:
"""

    SCALING = {
        "vertical": "Add CPU/RAM; ClickHouse uses CPU very efficiently (vectorized execution)",
        "sharding": "Split data by shard key and distribute it across nodes",
        "replication": "ReplicatedMergeTree auto-replicates data across nodes",
        "tiered": "Hot (SSD) → Warm (HDD) → Cold (S3) storage tiering",
    }

    def show_docker(self):
        print("=== Docker Compose ===")
        print(self.DOCKER[:400])

    def show_scaling(self):
        print("\n=== Scaling Strategies ===")
        for strategy, desc in self.SCALING.items():
            print(f"  [{strategy}] {desc}")

    def sizing_guide(self):
        print("\n=== Sizing Guide ===")
        tiers = [
            {"name": "Small", "data": "< 1TB", "queries": "< 100/sec", "spec": "8 CPU, 32GB RAM, SSD"},
            {"name": "Medium", "data": "1-10TB", "queries": "100-500/sec", "spec": "32 CPU, 128GB RAM, NVMe SSD"},
            {"name": "Large", "data": "10-100TB", "queries": "500+/sec", "spec": "Cluster: 3-10 nodes, 64+ CPU each"},
        ]
        for t in tiers:
            print(f"  [{t['name']}] Data: {t['data']} | QPS: {t['queries']} → {t['spec']}")

infra = Infrastructure()
infra.show_docker()
infra.show_scaling()
infra.sizing_guide()

FAQ - Frequently Asked Questions

Q: How do ClickHouse and PostgreSQL differ?

A: ClickHouse is a columnar OLAP database, very fast for analytics (aggregations, scans over billions of rows). PostgreSQL is a row-based OLTP database, fast for transactional workloads (insert/update/select by primary key). Use ClickHouse for analytics, dashboards, log analysis, and time-series data; use PostgreSQL for application data, CRUD, and transactions. Many systems use both: PostgreSQL for the app, with Kafka feeding ClickHouse for analytics.
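The "use both" pattern can be sketched like this. Everything here is a stand-in: `save_order`, `oltp_rows`, and `event_topic` are hypothetical names, with plain lists playing the roles of the PostgreSQL table and the Kafka topic that feeds ClickHouse.

```python
import json
from datetime import datetime, timezone

oltp_rows = []    # stands in for a PostgreSQL table (source of truth)
event_topic = []  # stands in for a Kafka topic consumed by ClickHouse

def save_order(order_id, user_id, amount):
    """Write the transactional record, then emit an analytics event."""
    oltp_rows.append({"order_id": order_id, "user_id": user_id, "amount": amount})
    event_topic.append(json.dumps({     # JSONEachRow-style payload
        "event_type": "purchase",
        "user_id": user_id,
        "properties": json.dumps({"order_id": order_id, "amount": amount}),
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }))

save_order("o-1", "u-42", 19.99)
print(len(oltp_rows), len(event_topic))  # → 1 1
```

The transactional row stays authoritative in the OLTP store, while the event copy flows through the queue into the analytics tables defined earlier.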

Q: Kafka Engine or direct INSERT from the application?

A: The Kafka Engine is the stronger option: ClickHouse consumes directly from Kafka with at-least-once delivery and automatic resume after restarts. Direct INSERT is simpler, since the application writes straight to ClickHouse, but it must handle retries and buffering itself. Recommendation: use the Kafka Engine for production; it is reliable, decoupled, and scalable.
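The retry and buffering burden that direct INSERT puts on the application can be sketched as follows. `DirectInserter` and `send_batch` are hypothetical names, and the flaky sink simulates a transient connection error rather than a real clickhouse_driver call.

```python
import time

class DirectInserter:
    """Sketch of the buffering/retry an application must own when it
    INSERTs into ClickHouse directly instead of using the Kafka engine."""

    def __init__(self, send_batch, batch_size=1000, max_retries=3):
        self.send_batch = send_batch    # stand-in for client.execute(...)
        self.batch_size = batch_size
        self.max_retries = max_retries
        self.buffer = []

    def add(self, row):
        self.buffer.append(row)
        if len(self.buffer) >= self.batch_size:
            self.flush()

    def flush(self):
        if not self.buffer:
            return
        batch, self.buffer = self.buffer, []
        for attempt in range(self.max_retries):
            try:
                self.send_batch(batch)
                return
            except ConnectionError:
                time.sleep(0.01 * 2 ** attempt)  # exponential backoff
        self.buffer = batch + self.buffer  # give up for now, keep the rows

# Simulate a sink that fails once, then succeeds.
calls = {"n": 0}
received = []
def flaky_sink(batch):
    calls["n"] += 1
    if calls["n"] == 1:
        raise ConnectionError("broken pipe")
    received.extend(batch)

ins = DirectInserter(flaky_sink, batch_size=2)
for i in range(4):
    ins.add(i)
print(received)  # → [0, 1, 2, 3]
```

With the Kafka Engine, all of this logic lives inside Kafka and ClickHouse instead of in every producer, which is why the engine is preferred in production.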

Q: How does MergeTree differ from ReplacingMergeTree?

A: MergeTree is the default: fast inserts, duplicates allowed. ReplacingMergeTree deduplicates rows that share an ORDER BY key, keeping the latest value of a version column (eventually, during background merges). CollapsingMergeTree handles updates/deletes with a sign column, and AggregatingMergeTree pre-aggregates during merges. Choose by use case: append-only → MergeTree, need dedup → ReplacingMergeTree, need updates → CollapsingMergeTree.
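What ReplacingMergeTree does during a background merge can be modeled in a few lines of Python. This is a simulation only: in the real engine deduplication is lazy, so a query may still see duplicates until a merge runs (or the query uses `FINAL`).

```python
def replacing_merge(rows, key_cols, version_col):
    """Simulate ReplacingMergeTree: for each ORDER BY key, keep only the
    row with the highest version, as a background merge eventually would."""
    latest = {}
    for row in rows:
        key = tuple(row[c] for c in key_cols)
        if key not in latest or row[version_col] > latest[key][version_col]:
            latest[key] = row
    return list(latest.values())

rows = [
    {"user_id": "u1", "plan": "free", "version": 1},
    {"user_id": "u1", "plan": "pro",  "version": 2},  # later update wins
    {"user_id": "u2", "plan": "free", "version": 1},
]
merged = replacing_merge(rows, key_cols=["user_id"], version_col="version")
print(sorted((r["user_id"], r["plan"]) for r in merged))
# → [('u1', 'pro'), ('u2', 'free')]
```

This is why ReplacingMergeTree suits "latest state" tables: updates are written as new rows, and the merge keeps only the newest version per key.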

Q: What workloads suit ClickHouse?

A: A strong fit: web analytics (Google Analytics-style), log analysis, real-time dashboards, time-series data, and A/B testing. A poor fit: OLTP workloads with frequent updates/deletes, small datasets (under 1M rows), key-value lookups, and full-text search (use Elasticsearch instead).
