
ClickHouse Analytics 12 Factor App — สร้างแอป
ClickHouse 12-Factor

ClickHouse Analytics 12-Factor App Columnar Database Config Dependencies Stateless Logging Concurrency Disposability Scale Backing Service Production
| Factor | ClickHouse Analytics | Implementation | Priority |
|---|---|---|---|
| I. Codebase | Git Repository | GitHub/GitLab mono-repo | Must |
| II. Dependencies | requirements.txt | clickhouse-driver pandas | Must |
| III. Config | Environment Variables | CH_HOST CH_PORT CH_DB | Must |
| IV. Backing Services | ClickHouse as Service | Connection String ENV | Must |
| V. Build/Release/Run | Docker + CI/CD | Build → Tag → Deploy | Must |
| VI. Processes | Stateless App | No local state | Must |
ClickHouse Setup
=== ClickHouse 12-Factor Configuration ===
Docker Compose — ClickHouse
version: '3.8'
services:
clickhouse:
image: clickhouse/clickhouse-server:latest
ports:
- "8123:8123" # HTTP
- "9000:9000" # Native
volumes:
- ch_data:/var/lib/clickhouse
- ./config/users.xml:/etc/clickhouse-server/users.d/users.xml
environment:
- CLICKHOUSE_DB=analytics
- CLICKHOUSE_USER=app
- CLICKHOUSE_PASSWORD=
ulimits:
nofile:
soft: 262144
hard: 262144
Factor III: Config via Environment Variables
.env
CH_HOST=clickhouse.example.com
CH_PORT=9000
CH_DB=analytics
CH_USER=app
CH_PASSWORD=secure-password
CH_POOL_SIZE=10
LOG_LEVEL=info
APP_PORT=8080
import os
from dataclasses import dataclass
@dataclass
class AppConfig:
ch_host: str
ch_port: int
ch_db: str
ch_user: str
ch_pool_size: int
log_level: str
app_port: int
@classmethod
def from_env(cls):
return cls(
ch_host=os.getenv("CH_HOST", "localhost"),
ch_port=int(os.getenv("CH_PORT", "9000")),
ch_db=os.getenv("CH_DB", "analytics"),
ch_user=os.getenv("CH_USER", "default"),
ch_pool_size=int(os.getenv("CH_POOL_SIZE", "10")),
log_level=os.getenv("LOG_LEVEL", "info"),
app_port=int(os.getenv("APP_PORT", "8080")),
)
config = AppConfig.from_env()
print("=== App Config (from ENV) ===")
print(f" ClickHouse: {config.ch_host}:{config.ch_port}/{config.ch_db}")
print(f" Pool Size: {config.ch_pool_size}")
print(f" Log Level: {config.log_level}")
print(f" App Port: {config.app_port}")
Table Design — MergeTree
CREATE TABLE analytics.events (
event_id UUID DEFAULT generateUUIDv4(),
event_time DateTime64(3) DEFAULT now64(3),
event_date Date DEFAULT toDate(event_time),
user_id UInt64,
event_type LowCardinality(String),
page_url String,
referrer String DEFAULT '',
country LowCardinality(String) DEFAULT '',
device LowCardinality(String) DEFAULT '',
duration_ms UInt32 DEFAULT 0,
properties String DEFAULT '{}'
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_type, user_id, event_time)
TTL event_date + INTERVAL 90 DAY
SETTINGS index_granularity = 8192;
Materialized View — Pre-aggregation
CREATE MATERIALIZED VIEW analytics.daily_stats
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (date, event_type, country)
AS SELECT
toDate(event_time) AS date,
event_type,
country,
count() AS event_count,
uniqExact(user_id) AS unique_users,
avg(duration_ms) AS avg_duration
FROM analytics.events
GROUP BY date, event_type, country;
print("\n=== ClickHouse Schema ===")
print(" Table: analytics.events (MergeTree)")
print(" Partition: Monthly (toYYYYMM)")
print(" Order: event_type, user_id, event_time")
print(" TTL: 90 days")
print(" MV: daily_stats (SummingMergeTree)")
Application Layer
=== Stateless Analytics API ===
Factor VI: Stateless Processes
from fastapi import FastAPI
from clickhouse_driver import Client
import os
import logging
app = FastAPI()
logger = logging.getLogger("analytics")
def get_ch_client():
return Client(
host=os.getenv("CH_HOST"),
port=int(os.getenv("CH_PORT", "9000")),
database=os.getenv("CH_DB"),
user=os.getenv("CH_USER"),
password=os.getenv("CH_PASSWORD"),
)
@app.get("/api/v1/stats/daily")
async def daily_stats(date_from: str, date_to: str):

client = get_ch_client()
result = client.execute("""
SELECT date, event_type,
sum(event_count) as events,
sum(unique_users) as users
FROM analytics.daily_stats
WHERE date BETWEEN %(from)s AND %(to)s
GROUP BY date, event_type
ORDER BY date
""", {"from": date_from, "to": date_to})
return {"data": result}
@app.post("/api/v1/events")
async def ingest_events(events: list):
client = get_ch_client()
client.execute(
"INSERT INTO analytics.events (user_id, event_type, page_url) VALUES",
events
)
logger.info(f"Ingested {len(events)} events") # Factor XI: Logs
return {"inserted": len(events)}
Factor IX: Disposability — Graceful Shutdown
import signal
import sys
def graceful_shutdown(signum, frame):
logger.info("Shutting down gracefully...")
# Close connections, flush buffers
sys.exit(0)
signal.signal(signal.SIGTERM, graceful_shutdown)
Factor VIII: Concurrency — Scale with Processes
Dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8080", "--workers", "4"]
Kubernetes Deployment
apiVersion: apps/v1
kind: Deployment
spec:
replicas: 3 # Factor VIII: Concurrency
template:
spec:
containers:
- name: analytics
image: analytics-api:v1.0
env: # Factor III: Config
- name: CH_HOST
valueFrom:
configMapKeyRef:
name: analytics-config
key: CH_HOST
livenessProbe: # Factor IX: Disposability
httpGet:
path: /health
port: 8080
readinessProbe:
httpGet:
path: /ready
port: 8080
@dataclass
class FactorCheck:
factor: str
name: str
implementation: str
status: str
factors = [
FactorCheck("I", "Codebase", "Git mono-repo GitHub", "Done"),
FactorCheck("II", "Dependencies", "requirements.txt pip freeze", "Done"),
FactorCheck("III", "Config", "ENV vars ConfigMap Secret", "Done"),
FactorCheck("IV", "Backing Services", "ClickHouse Redis as ENV URL", "Done"),
FactorCheck("V", "Build/Release/Run", "Docker CI/CD GitHub Actions", "Done"),
FactorCheck("VI", "Processes", "Stateless FastAPI no local state", "Done"),
FactorCheck("VII", "Port Binding", "Uvicorn self-contained port 8080", "Done"),
FactorCheck("VIII", "Concurrency", "K8s replicas 3 workers 4", "Done"),
FactorCheck("IX", "Disposability", "Graceful shutdown health check", "Done"),
FactorCheck("X", "Dev/Prod Parity", "Docker Compose = K8s same images", "Done"),
FactorCheck("XI", "Logs", "JSON stdout Loki collection", "Done"),
FactorCheck("XII", "Admin Processes", "Management commands K8s Jobs", "Done"),
]
print("\n=== 12-Factor Checklist ===")
for f in factors:
print(f" [{f.status}] Factor {f.factor}: {f.name}")
print(f" Implementation: {f.implementation}")
Production Scaling
# === ClickHouse Cluster Scaling ===
# ClickHouse Cluster — 3 Shards 2 Replicas
#
#
#
#
# ch-s1-r1 9000
# ch-s1-r2 9000
#
#
# ch-s2-r1 9000
# ch-s2-r2 9000
#
#
# ch-s3-r1 9000
# ch-s3-r2 9000
#
#
#
#
# Distributed Table
# CREATE TABLE analytics.events_distributed AS analytics.events
# ENGINE = Distributed(analytics_cluster, analytics, events, rand());
@dataclass
class ScaleMetric:
component: str
current: str
max_tested: str
bottleneck: str
action: str
scale_metrics = [
ScaleMetric("API Pods", "3 pods", "20 pods", "CPU", "HPA auto-scale"),
ScaleMetric("ClickHouse Shards", "3 shards", "10 shards", "Disk I/O", "Add shard"),
ScaleMetric("ClickHouse Replicas", "2 per shard", "3 per shard", "Read load", "Add replica"),
ScaleMetric("Ingestion Rate", "100K events/sec", "1M events/sec", "Network", "Batch + Buffer"),
ScaleMetric("Query Latency (p99)", "120ms", "500ms", "Complex queries", "Materialized Views"),
ScaleMetric("Storage", "2TB", "50TB", "Disk space", "TTL + Tiered Storage"),
]
print("Production Scale Metrics:")
for s in scale_metrics:
print(f" [{s.component}] Current: {s.current} | Max: {s.max_tested}")
print(f" Bottleneck: {s.bottleneck} | Action: {s.action}")
เคล็ดลับ
- ENV: ทุก Config ต้องมาจาก Environment Variables ไม่ Hardcode
- Stateless: App Layer ต้อง Stateless Scale ได้ทันที
- Batch Insert: Insert ClickHouse เป็น Batch ไม่ทีละ Row
- MV: ใช้ Materialized View สำหรับ Query ที่ใช้บ่อย
- TTL: ตั้ง TTL ลบข้อมูลเก่าอัตโนมัติ ประหยัด Disk
12-Factor App คืออะไร
หลักการ 12 ข้อ Codebase Dependencies Config Backing Services Build/Release Processes Port Binding Concurrency Disposability Parity Logs Admin