
ClickHouse Analytics 12 Factor App

2025-10-12 · อ. บอม — SiamCafe.net · 11,031 words

ClickHouse 12-Factor


Factor | ClickHouse Analytics | Implementation | Priority
I. Codebase | Git Repository | GitHub/GitLab mono-repo | Must
II. Dependencies | requirements.txt | clickhouse-driver, pandas | Must
III. Config | Environment Variables | CH_HOST, CH_PORT, CH_DB | Must
IV. Backing Services | ClickHouse as a Service | Connection string in ENV | Must
V. Build/Release/Run | Docker + CI/CD | Build → Tag → Deploy | Must
VI. Processes | Stateless App | No local state | Must

ClickHouse Setup

# === ClickHouse 12-Factor Configuration ===

# Docker Compose — ClickHouse
# version: '3.8'
# services:
#   clickhouse:
#     image: clickhouse/clickhouse-server:latest
#     ports:
#       - "8123:8123"   # HTTP
#       - "9000:9000"   # Native
#     volumes:
#       - ch_data:/var/lib/clickhouse
#       - ./config/users.xml:/etc/clickhouse-server/users.d/users.xml
#     environment:
#       - CLICKHOUSE_DB=analytics
#       - CLICKHOUSE_USER=app
#       - CLICKHOUSE_PASSWORD=
#     ulimits:
#       nofile:
#         soft: 262144
#         hard: 262144
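Once the container is up, the HTTP interface on port 8123 can be verified with a quick ping. A minimal stdlib-only sketch (host and port match the defaults in the compose file above):

```python
import urllib.request

def clickhouse_ping(host: str = "localhost", port: int = 8123, timeout: float = 2.0) -> bool:
    """Return True if the ClickHouse HTTP endpoint answers its /ping route with "Ok."."""
    try:
        with urllib.request.urlopen(f"http://{host}:{port}/ping", timeout=timeout) as resp:
            return resp.status == 200 and resp.read().strip() == b"Ok."
    except OSError:
        # Connection refused, DNS failure, or timeout: the server is not reachable.
        return False

if __name__ == "__main__":
    print("ClickHouse reachable:", clickhouse_ping())
```

This is handy both as a smoke test after `docker compose up` and as the basis for a readiness check later.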

# Factor III: Config via Environment Variables
# .env
# CH_HOST=clickhouse.example.com
# CH_PORT=9000
# CH_DB=analytics
# CH_USER=app
# CH_PASSWORD=secure-password
# CH_POOL_SIZE=10
# LOG_LEVEL=info
# APP_PORT=8080

import os
from dataclasses import dataclass

@dataclass
class AppConfig:
    ch_host: str
    ch_port: int
    ch_db: str
    ch_user: str
    ch_password: str
    ch_pool_size: int
    log_level: str
    app_port: int

    @classmethod
    def from_env(cls):
        return cls(
            ch_host=os.getenv("CH_HOST", "localhost"),
            ch_port=int(os.getenv("CH_PORT", "9000")),
            ch_db=os.getenv("CH_DB", "analytics"),
            ch_user=os.getenv("CH_USER", "default"),
            ch_password=os.getenv("CH_PASSWORD", ""),
            ch_pool_size=int(os.getenv("CH_POOL_SIZE", "10")),
            log_level=os.getenv("LOG_LEVEL", "info"),
            app_port=int(os.getenv("APP_PORT", "8080")),
        )

config = AppConfig.from_env()
print("=== App Config (from ENV) ===")
print(f"  ClickHouse: {config.ch_host}:{config.ch_port}/{config.ch_db}")
print(f"  Pool Size: {config.ch_pool_size}")
print(f"  Log Level: {config.log_level}")
print(f"  App Port: {config.app_port}")
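Factor III also implies failing fast: if a required variable is missing, the app should refuse to boot rather than limp along with defaults. A small helper (hypothetical, not part of the config class above; the required-variable set is an assumption):

```python
import os

def validate_env(required: list[str]) -> list[str]:
    """Return the required variables that are missing or empty in the environment."""
    return [name for name in required if not os.getenv(name)]

def assert_config_or_exit(required: list[str]) -> None:
    """Fail fast at startup (Factor III): refuse to boot with incomplete config."""
    missing = validate_env(required)
    if missing:
        raise SystemExit(f"Missing required config: {', '.join(missing)}")
```

Calling `assert_config_or_exit(["CH_HOST", "CH_DB", "CH_USER", "CH_PASSWORD"])` before constructing `AppConfig` turns a silent misconfiguration into an immediate, loggable crash.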

# Table Design — MergeTree
# CREATE TABLE analytics.events (
#     event_id UUID DEFAULT generateUUIDv4(),
#     event_time DateTime64(3) DEFAULT now64(3),
#     event_date Date DEFAULT toDate(event_time),
#     user_id UInt64,
#     event_type LowCardinality(String),
#     page_url String,
#     referrer String DEFAULT '',
#     country LowCardinality(String) DEFAULT '',
#     device LowCardinality(String) DEFAULT '',
#     duration_ms UInt32 DEFAULT 0,
#     properties String DEFAULT '{}'
# ) ENGINE = MergeTree()
# PARTITION BY toYYYYMM(event_date)
# ORDER BY (event_type, user_id, event_time)
# TTL event_date + INTERVAL 90 DAY
# SETTINGS index_granularity = 8192;
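To see what `PARTITION BY toYYYYMM(event_date)` does, here is the same month-bucketing rule expressed in plain Python (an illustration of the partitioning logic, not ClickHouse internals):

```python
from datetime import date
from collections import defaultdict

def to_yyyymm(d: date) -> int:
    """Mirror ClickHouse's toYYYYMM(): date(2025, 10, 12) -> 202510."""
    return d.year * 100 + d.month

# Each event lands in the partition for its month; TTL and drops operate per partition.
events = [date(2025, 10, 12), date(2025, 10, 30), date(2025, 11, 1)]
partitions = defaultdict(list)
for d in events:
    partitions[to_yyyymm(d)].append(d)

print(sorted(partitions))  # → [202510, 202511]
```

Monthly partitions keep the partition count low while letting the 90-day TTL expire whole months cheaply.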

# Materialized View — Pre-aggregation
# NOTE: SummingMergeTree only sums plain numeric columns correctly (event_count);
# uniqExact/avg results are not mergeable across parts — for exact figures use
# AggregatingMergeTree with -State combinators instead.
# CREATE MATERIALIZED VIEW analytics.daily_stats
# ENGINE = SummingMergeTree()
# PARTITION BY toYYYYMM(date)
# ORDER BY (date, event_type, country)
# AS SELECT
#     toDate(event_time) AS date,
#     event_type,
#     country,
#     count() AS event_count,
#     uniqExact(user_id) AS unique_users,
#     avg(duration_ms) AS avg_duration
# FROM analytics.events
# GROUP BY date, event_type, country;
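What the materialized view precomputes can be sketched in plain Python: group raw events by (date, event_type, country) and roll up the count, distinct users, and mean duration (field names follow the schema above; the sample rows are illustrative):

```python
from collections import defaultdict

events = [
    {"date": "2025-10-12", "event_type": "pageview", "country": "TH", "user_id": 1, "duration_ms": 120},
    {"date": "2025-10-12", "event_type": "pageview", "country": "TH", "user_id": 2, "duration_ms": 80},
    {"date": "2025-10-12", "event_type": "click",    "country": "TH", "user_id": 1, "duration_ms": 10},
]

# Group by the MV's ORDER BY key: (date, event_type, country).
groups = defaultdict(lambda: {"users": set(), "durations": []})
for e in events:
    key = (e["date"], e["event_type"], e["country"])
    groups[key]["users"].add(e["user_id"])
    groups[key]["durations"].append(e["duration_ms"])

daily_stats = {
    key: {
        "event_count": len(g["durations"]),
        "unique_users": len(g["users"]),
        "avg_duration": sum(g["durations"]) / len(g["durations"]),
    }
    for key, g in groups.items()
}
print(daily_stats[("2025-10-12", "pageview", "TH")])
# → {'event_count': 2, 'unique_users': 2, 'avg_duration': 100.0}
```

The MV does this incrementally at insert time, so dashboard queries read a few pre-aggregated rows instead of scanning the raw events table.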

print("\n=== ClickHouse Schema ===")
print("  Table: analytics.events (MergeTree)")
print("  Partition: Monthly (toYYYYMM)")
print("  Order: event_type, user_id, event_time")
print("  TTL: 90 days")
print("  MV: daily_stats (SummingMergeTree)")

Application Layer

# === Stateless Analytics API ===

# Factor VI: Stateless Processes
# from fastapi import FastAPI
# from clickhouse_driver import Client
# import os
# import logging
#
# app = FastAPI()
# logger = logging.getLogger("analytics")
#
# def get_ch_client():
#     return Client(
#         host=os.getenv("CH_HOST"),
#         port=int(os.getenv("CH_PORT", "9000")),
#         database=os.getenv("CH_DB"),
#         user=os.getenv("CH_USER"),
#         password=os.getenv("CH_PASSWORD"),
#     )
#
# @app.get("/api/v1/stats/daily")
# async def daily_stats(date_from: str, date_to: str):
#     client = get_ch_client()
#     result = client.execute("""
#         SELECT date, event_type,
#             sum(event_count) as events,
#             sum(unique_users) as users
#         FROM analytics.daily_stats
#         WHERE date BETWEEN %(from)s AND %(to)s
#         GROUP BY date, event_type
#         ORDER BY date
#     """, {"from": date_from, "to": date_to})
#     return {"data": result}
#
# @app.post("/api/v1/events")
# async def ingest_events(events: list[dict]):
#     client = get_ch_client()
#     client.execute(
#         "INSERT INTO analytics.events (user_id, event_type, page_url) VALUES",
#         events,  # list of dicts keyed by column name
#     )
#     logger.info(f"Ingested {len(events)} events")  # Factor XI: Logs
#     return {"inserted": len(events)}
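Row-by-row inserts are an anti-pattern in ClickHouse; production ingest should buffer events and flush in batches. A minimal buffer sketch (in the real app, the flush callback would wrap `client.execute` on the INSERT above; names here are illustrative):

```python
from typing import Callable

class EventBuffer:
    """Accumulate events and flush them in batches via a user-supplied callback."""

    def __init__(self, flush_fn: Callable[[list], None], max_size: int = 1000):
        self.flush_fn = flush_fn
        self.max_size = max_size
        self.buffer: list = []

    def add(self, event: dict) -> None:
        self.buffer.append(event)
        if len(self.buffer) >= self.max_size:
            self.flush()

    def flush(self) -> None:
        # Hand the whole batch to the callback, then start fresh.
        if self.buffer:
            self.flush_fn(self.buffer)
            self.buffer = []

batches = []
buf = EventBuffer(batches.append, max_size=2)
for i in range(5):
    buf.add({"user_id": i, "event_type": "pageview"})
buf.flush()  # flush the remainder, e.g. on shutdown (Factor IX)
print([len(b) for b in batches])  # → [2, 2, 1]
```

A time-based flush (e.g. every second) would normally accompany the size trigger so low-traffic periods still drain the buffer.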

# Factor IX: Disposability — Graceful Shutdown
# import signal
# import sys
#
# def graceful_shutdown(signum, frame):
#     logger.info("Shutting down gracefully...")
#     # Close connections, flush buffers
#     sys.exit(0)
#
# signal.signal(signal.SIGTERM, graceful_shutdown)

# Factor VIII: Concurrency — Scale with Processes
# Dockerfile
# FROM python:3.12-slim
# WORKDIR /app
# COPY requirements.txt .
# RUN pip install --no-cache-dir -r requirements.txt
# COPY . .
# CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8080", "--workers", "4"]

# Kubernetes Deployment
# apiVersion: apps/v1
# kind: Deployment
# spec:
#   replicas: 3  # Factor VIII: Concurrency
#   template:
#     spec:
#       containers:
#         - name: analytics
#           image: analytics-api:v1.0
#           env:  # Factor III: Config
#             - name: CH_HOST
#               valueFrom:
#                 configMapKeyRef:
#                   name: analytics-config
#                   key: CH_HOST
#           livenessProbe:  # Factor IX: Disposability
#             httpGet:
#               path: /health
#               port: 8080
#           readinessProbe:
#             httpGet:
#               path: /ready
#               port: 8080
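The two probes should check different things: /health only proves the process is alive, while /ready should verify the backing service answers before the pod receives traffic. A sketch of the distinction (the FastAPI routing is omitted; `ch_reachable` would come from a quick ping of ClickHouse):

```python
def health() -> dict:
    """Liveness: only proves the process is up — keep it dependency-free."""
    return {"status": "ok"}

def ready(ch_reachable: bool) -> tuple[int, dict]:
    """Readiness: accept traffic only when the backing service answers."""
    if ch_reachable:
        return 200, {"status": "ready"}
    return 503, {"status": "not ready"}

print(health(), ready(True)[0], ready(False)[0])  # → {'status': 'ok'} 200 503
```

Wiring ClickHouse checks into the liveness probe is a common mistake: a ClickHouse outage would then restart every app pod instead of merely removing them from the load balancer.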

@dataclass
class FactorCheck:
    factor: str
    name: str
    implementation: str
    status: str

factors = [
    FactorCheck("I", "Codebase", "Git mono-repo (GitHub)", "Done"),
    FactorCheck("II", "Dependencies", "requirements.txt + pip freeze", "Done"),
    FactorCheck("III", "Config", "ENV vars via ConfigMap/Secret", "Done"),
    FactorCheck("IV", "Backing Services", "ClickHouse/Redis as ENV URL", "Done"),
    FactorCheck("V", "Build/Release/Run", "Docker + CI/CD (GitHub Actions)", "Done"),
    FactorCheck("VI", "Processes", "Stateless FastAPI, no local state", "Done"),
    FactorCheck("VII", "Port Binding", "Uvicorn self-contained on port 8080", "Done"),
    FactorCheck("VIII", "Concurrency", "K8s replicas=3, workers=4", "Done"),
    FactorCheck("IX", "Disposability", "Graceful shutdown + health check", "Done"),
    FactorCheck("X", "Dev/Prod Parity", "Docker Compose = K8s (same images)", "Done"),
    FactorCheck("XI", "Logs", "JSON to stdout, Loki collection", "Done"),
    FactorCheck("XII", "Admin Processes", "Management commands as K8s Jobs", "Done"),
]

print("\n=== 12-Factor Checklist ===")
for f in factors:
    print(f"  [{f.status}] Factor {f.factor}: {f.name}")
    print(f"    Implementation: {f.implementation}")

Production Scaling

# === ClickHouse Cluster Scaling ===

# ClickHouse Cluster — 3 Shards × 2 Replicas (remote_servers config)
# <remote_servers>
#   <analytics_cluster>
#     <shard>
#       <replica><host>ch-s1-r1</host><port>9000</port></replica>
#       <replica><host>ch-s1-r2</host><port>9000</port></replica>
#     </shard>
#     <shard>
#       <replica><host>ch-s2-r1</host><port>9000</port></replica>
#       <replica><host>ch-s2-r2</host><port>9000</port></replica>
#     </shard>
#     <shard>
#       <replica><host>ch-s3-r1</host><port>9000</port></replica>
#       <replica><host>ch-s3-r2</host><port>9000</port></replica>
#     </shard>
#   </analytics_cluster>
# </remote_servers>

# Distributed Table
# CREATE TABLE analytics.events_distributed AS analytics.events
# ENGINE = Distributed(analytics_cluster, analytics, events, rand());
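The Distributed engine above routes each insert to a shard via rand(); a deterministic alternative is to shard on a key such as user_id, so one user's rows stay on one shard. A plain-Python illustration of that routing rule (not ClickHouse internals):

```python
NUM_SHARDS = 3  # matches the 3-shard cluster above

def shard_for(user_id: int, num_shards: int = NUM_SHARDS) -> int:
    """Deterministic shard routing: the same user always maps to the same shard."""
    return user_id % num_shards

assignments = {uid: shard_for(uid) for uid in range(6)}
print(assignments)  # → {0: 0, 1: 1, 2: 2, 3: 0, 4: 1, 5: 2}
```

In ClickHouse terms that would be `Distributed(analytics_cluster, analytics, events, user_id)` instead of `rand()`; it co-locates per-user data at the cost of potentially uneven shard load.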

@dataclass
class ScaleMetric:
    component: str
    current: str
    max_tested: str
    bottleneck: str
    action: str

scale_metrics = [
    ScaleMetric("API Pods", "3 pods", "20 pods", "CPU", "HPA auto-scale"),
    ScaleMetric("ClickHouse Shards", "3 shards", "10 shards", "Disk I/O", "Add shard"),
    ScaleMetric("ClickHouse Replicas", "2 per shard", "3 per shard", "Read load", "Add replica"),
    ScaleMetric("Ingestion Rate", "100K events/sec", "1M events/sec", "Network", "Batch + Buffer"),
    ScaleMetric("Query Latency (p99)", "120ms", "500ms", "Complex queries", "Materialized Views"),
    ScaleMetric("Storage", "2TB", "50TB", "Disk space", "TTL + Tiered Storage"),
]

print("Production Scale Metrics:")
for s in scale_metrics:
    print(f"  [{s.component}] Current: {s.current} | Max: {s.max_tested}")
    print(f"    Bottleneck: {s.bottleneck} | Action: {s.action}")

Tips

What is a 12-Factor App?

Twelve principles for building deployable services: Codebase, Dependencies, Config, Backing Services, Build/Release/Run, Processes, Port Binding, Concurrency, Disposability, Dev/Prod Parity, Logs, and Admin Processes.

How does ClickHouse fit the 12-Factor model?

Treat ClickHouse as a backing service reached through ENV config; keep the app stateless; write logs to stdout; use a connection pool so processes stay disposable; and scale the app separately from the ClickHouse cluster.

How do you set up ClickHouse for analytics?

Run it via Docker or ClickHouse Cloud; use a MergeTree table partitioned by date with an order key matching your queries; batch inserts; pre-aggregate with materialized views; enforce data retention with TTL; and watch system.query_log.

How do you scale a ClickHouse analytics app?

Scale the app horizontally with pods; scale ClickHouse with sharding and replication coordinated by ClickHouse Keeper; put a load balancer in front; separate read and write scaling; and use connection pooling with Distributed tables.

Summary

A ClickHouse analytics app maps cleanly onto 12-Factor: config in ENV, stateless processes, ClickHouse as a backing service, logs on stdout, and concurrency by adding processes. MergeTree tables, materialized views, and sharding with replication take it from development to production scale.
