ai

<h1>ClickHouse Analytics Automation Script</h1>

ClickHouse คืออะไรและทำไมถึงเร็ว

ClickHouse เป็น open source columnar OLAP database ที่พัฒนาโดย Yandex ออกแบบมาสำหรับ real-time analytical queries บน datasets ขนาดใหญ่ (petabyte-scale) ให้ query speed ที่เร็วกว่า traditional databases หลายร้อยเท่า

เหตุผลที่ ClickHouse เร็วคือ columnar storage อ่านเฉพาะ columns ที่ต้องการ, data compression ที่ลดขนาดข้อมูล 5-10 เท่า, vectorized query execution ที่ process data เป็น batches, parallel processing ใช้ CPU cores ทั้งหมด, primary key index ที่ skip data ไม่ต้องการ และ specialized data structures สำหรับแต่ละ data type

เนื้อหาเกี่ยวข้อง — eBPF Networking MLOps Workflow

ClickHouse เหมาะสำหรับ log analytics, clickstream analysis, real-time dashboards, time series data, ad-tech analytics, financial data analysis และ IoT data processing ไม่เหมาะสำหรับ transactional workloads (OLTP) ที่ต้อง UPDATE/DELETE บ่อย

เมื่อเทียบกับ alternatives ClickHouse เร็วกว่า PostgreSQL สำหรับ analytical queries 100-1000 เท่า เร็วกว่า Elasticsearch สำหรับ structured data analytics 5-10 เท่า และมี operational cost ต่ำกว่า cloud data warehouses เช่น BigQuery หรือ Snowflake

แนะนำเพิ่มเติม — ระบบเทรดของ iCafeForex

เนื้อหาเกี่ยวข้อง — แนะนำให้อ่าน OWASP ZAP Agile Scrum Kanban

ติดตั้ง ClickHouse และตั้งค่าเบื้องต้น

วิธีติดตั้งและ configure ClickHouse

# === Install ClickHouse ===
# Two alternative setups follow: native packages (Ubuntu/Debian) or Docker.

# Ubuntu/Debian: register the ClickHouse apt repository and its signing key
sudo apt-get install -y apt-transport-https ca-certificates curl gnupg
curl -fsSL 'https://packages.clickhouse.com/rpm/lts/repodata/repomd.xml.key' | \
 sudo gpg --dearmor -o /usr/share/keyrings/clickhouse-keyring.gpg
# Pin the repository entry to this machine's architecture so apt does not
# try to fetch package indexes for architectures the host cannot install.
ARCH=$(dpkg --print-architecture)
echo "deb [signed-by=/usr/share/keyrings/clickhouse-keyring.gpg arch=${ARCH}] https://packages.clickhouse.com/deb stable main" | \
 sudo tee /etc/apt/sources.list.d/clickhouse.list
sudo apt-get update
sudo apt-get install -y clickhouse-server clickhouse-client

# Start the service now and on every boot
sudo systemctl start clickhouse-server
sudo systemctl enable clickhouse-server

# Docker: HTTP interface on 8123, native protocol on 9000, data and logs on
# named volumes. The raised open-file limit is needed because the server keeps
# many part files open at once.
docker run -d --name clickhouse \
 --ulimit nofile=262144:262144 \
 -p 8123:8123 -p 9000:9000 \
 -v clickhouse-data:/var/lib/clickhouse \
 -v clickhouse-logs:/var/log/clickhouse-server \
 -e CLICKHOUSE_DB=analytics \
 -e CLICKHOUSE_USER=admin \
 -e CLICKHOUSE_PASSWORD=ch_password \
 clickhouse/clickhouse-server:latest

# Connect (these credentials are the ones passed to the Docker container above)
clickhouse-client --user admin --password ch_password

-- === Create database and tables ===
-- Every statement uses IF NOT EXISTS so the script can be re-run safely.
CREATE DATABASE IF NOT EXISTS analytics;

-- Web analytics events: one row per tracked user interaction.
CREATE TABLE IF NOT EXISTS analytics.events (
    event_date Date,
    event_time DateTime64(3),
    user_id UInt64,
    session_id String,
    event_type LowCardinality(String),
    page_url String,
    referrer String,
    country LowCardinality(String),
    device LowCardinality(String),
    browser LowCardinality(String),
    duration_ms UInt32,
    properties Map(String, String)
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_date)          -- monthly partitions
ORDER BY (event_date, user_id, event_time)
TTL event_date + INTERVAL 1 YEAR           -- rows expire after one year
SETTINGS index_granularity = 8192;

-- Server metrics: one row per (host, metric) sample.
CREATE TABLE IF NOT EXISTS analytics.server_metrics (
    timestamp DateTime64(3),
    host LowCardinality(String),
    metric LowCardinality(String),
    value Float64,
    tags Map(String, String)
) ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(timestamp)         -- daily partitions
ORDER BY (host, metric, timestamp)
-- The TTL expression is cast to DateTime because not every server version
-- accepts a DateTime64 result here; second precision is enough for expiry.
TTL toDateTime(timestamp) + INTERVAL 90 DAY;

-- Insert 10 million synthetic events spread over the last 365 days.
--
-- Each rand(N) call carries a distinct constant argument. Identical rand()
-- calls in one query are collapsed by common-subexpression elimination, which
-- would make every generated column a function of the same random number
-- (e.g. event_type, referrer and browser would always move together).
INSERT INTO analytics.events (
    event_date,
    event_time,
    user_id,
    session_id,
    event_type,
    page_url,
    referrer,
    country,
    device,
    browser,
    duration_ms,
    properties
)
SELECT
    today() - (rand(1) % 365) AS event_date,
    -- random second within the chosen day
    toDateTime64(event_date, 3) + toIntervalSecond(rand(2) % 86400) AS event_time,
    rand(3) % 100000 AS user_id,
    toString(generateUUIDv4()) AS session_id,
    arrayElement(['pageview','click','scroll','purchase'], 1 + rand(4) % 4) AS event_type,
    concat('/page/', toString(rand(5) % 50)) AS page_url,
    arrayElement(['google.com','facebook.com','direct','twitter.com'], 1 + rand(6) % 4) AS referrer,
    arrayElement(['TH','US','JP','SG','DE'], 1 + rand(7) % 5) AS country,
    arrayElement(['mobile','desktop','tablet'], 1 + rand(8) % 3) AS device,
    arrayElement(['Chrome','Firefox','Safari','Edge'], 1 + rand(9) % 4) AS browser,
    rand(10) % 30000 AS duration_ms,
    map('version', '1.0') AS properties
FROM numbers(10000000);

-- Sanity checks: row count and on-disk size of the events table.
SELECT count() FROM analytics.events;

-- Filter on database as well, so a same-named table elsewhere is not reported.
SELECT formatReadableSize(total_bytes)
FROM system.tables
WHERE database = 'analytics'
  AND name = 'events';

สร้าง Analytics Queries และ Materialized Views

Analytical queries และ automated aggregation

แนะนำเพิ่มเติม — อ่านเพิ่มเติมที่ SiamCafeBook

เนื้อหาเกี่ยวข้อง — อ่านต่อ: Tra Cứu Giấy Nộp Tiền Trên Trang Thuế Điện Tử —

-- === ClickHouse analytics queries ===

-- Daily active users from 30 days ago through today, with total events,
-- purchase count, and purchases per 100 active users.
SELECT
    event_date,
    uniqExact(user_id) AS dau,
    count() AS total_events,
    countIf(event_type = 'purchase') AS purchases,
    round(purchases / dau * 100, 2) AS conversion_rate
FROM analytics.events
WHERE event_date >= today() - 30
GROUP BY event_date
ORDER BY event_date ASC;

-- 20 most viewed pages from 7 days ago through today, with unique visitors
-- and view-duration statistics (mean, median, 95th percentile).
SELECT
    page_url,
    count() AS views,
    uniqExact(user_id) AS unique_visitors,
    avg(duration_ms) AS avg_duration_ms,
    quantile(0.50)(duration_ms) AS p50_duration,
    quantile(0.95)(duration_ms) AS p95_duration
FROM analytics.events
WHERE event_date >= today() - 7
  AND event_type = 'pageview'
GROUP BY page_url
ORDER BY views DESC
LIMIT 20;

-- Funnel analysis for today: distinct users per stage
-- (1 = pageview, 2 = click, 3 = purchase) and each stage as a percentage
-- of stage 1. Stages are counted independently of one another.
WITH funnel AS (
    SELECT 1 AS level, uniqExact(user_id) AS users
    FROM analytics.events
    WHERE event_date = today() AND event_type = 'pageview'

    UNION ALL

    SELECT 2, uniqExact(user_id)
    FROM analytics.events
    WHERE event_date = today() AND event_type = 'click'

    UNION ALL

    SELECT 3, uniqExact(user_id)
    FROM analytics.events
    WHERE event_date = today() AND event_type = 'purchase'
)
SELECT
    level,
    users,
    round(users / first_value(users) OVER (ORDER BY level) * 100, 1) AS pct_of_start
FROM funnel
ORDER BY level;

-- === Materialized views (automatic aggregation) ===

-- Daily stats per (event_date, country, device).
-- event_count and purchase_count are plain counters summed on merge; the
-- *_state columns hold aggregate-function states and must be read with the
-- matching -Merge combinator (uniqMerge / avgMerge).
--
-- POPULATE backfills the view from rows already in analytics.events. A
-- materialized view otherwise only sees rows inserted after it is created,
-- so the sample data loaded earlier in this script would be missing.
-- Rows inserted while the backfill is running are not captured.
CREATE MATERIALIZED VIEW IF NOT EXISTS analytics.daily_stats_mv
ENGINE = SummingMergeTree()
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_date, country, device)
POPULATE
AS SELECT
    event_date,
    country,
    device,
    count() AS event_count,
    uniqState(user_id) AS unique_users_state,
    countIf(event_type = 'purchase') AS purchase_count,
    avgState(duration_ms) AS avg_duration_state
FROM analytics.events
GROUP BY event_date, country, device;

-- Read the daily stats view for the last 7 days, rolled up to
-- (event_date, country). Counters are re-summed and aggregate states are
-- finalized with their -Merge combinators.
SELECT
    event_date,
    country,
    sum(event_count) AS events,
    uniqMerge(unique_users_state) AS unique_users,
    sum(purchase_count) AS purchases,
    avgMerge(avg_duration_state) AS avg_duration
FROM analytics.daily_stats_mv
WHERE event_date >= today() - 7
GROUP BY
    event_date,
    country
ORDER BY
    event_date ASC,
    events DESC;

-- Hourly metrics per (hour, host, metric).
--
-- SummingMergeTree adds up every plain numeric column when it merges rows
-- with the same key, so storing avg()/max()/min() results directly would
-- turn them into meaningless sums as soon as two insert blocks hit the same
-- hour. They are stored as aggregate-function states instead, which the
-- engine merges correctly; sample_count stays a plain counter to be summed.
--
-- Read with:
--   avgMerge(avg_value_state), maxMerge(max_value_state),
--   minMerge(min_value_state), sum(sample_count)
--   ... GROUP BY hour, host, metric
CREATE MATERIALIZED VIEW IF NOT EXISTS analytics.hourly_metrics_mv
ENGINE = SummingMergeTree()
ORDER BY (hour, host, metric)
AS SELECT
    toStartOfHour(timestamp) AS hour,
    host,
    metric,
    avgState(value) AS avg_value_state,
    maxState(value) AS max_value_state,
    minState(value) AS min_value_state,
    count() AS sample_count
FROM analytics.server_metrics
GROUP BY hour, host, metric;

Automation Scripts ด้วย Python

Python scripts สำหรับ automate ClickHouse tasks

#!/usr/bin/env python3
# clickhouse_automation.py — ClickHouse Analytics Automation
import clickhouse_connect
import json
import logging
import csv
from datetime import datetime, timedelta
from pathlib import Path
from typing import List, Dict, Optional

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("ch_automation")


class ClickHouseAutomation:
    """Reporting and maintenance helpers that run over one ClickHouse client."""

    def __init__(self, host="localhost", port=8123,
                 user="admin", password="ch_password",
                 database="analytics"):
        """Open a clickhouse_connect client and keep it on ``self.client``."""
        self.client = clickhouse_connect.get_client(
            host=host, port=port,
            username=user, password=password,
            database=database,
        )
        logger.info(f"Connected to ClickHouse {host}:{port}/{database}")

    @staticmethod
    def _quote_identifier(name):
        """Return ``name`` as a backtick-quoted identifier.

        Identifiers cannot be sent as bound parameters, so they are quoted
        and any backslash or backtick inside them is escaped.
        """
        escaped = str(name).replace("\\", "\\\\").replace("`", "\\`")
        return f"`{escaped}`"

    def run_query(self, sql, params=None):
        """Run a SELECT-style statement.

        Returns:
            A ``(rows, column_names)`` tuple taken from the client's result.
        """
        result = self.client.query(sql, parameters=params)
        return result.result_rows, result.column_names

    def run_command(self, sql):
        """Run a statement whose result is not needed (DDL, OPTIMIZE, ...)."""
        self.client.command(sql)

    def daily_report(self, date=None):
        """Build the overview / top pages / country breakdown for one day.

        Args:
            date: Day to report on, as a ``YYYY-MM-DD`` string. Defaults to
                yesterday.

        Returns:
            A dict with ``date``, ``generated_at`` and one list of row dicts
            per report section.
        """
        if date is None:
            date = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")

        # The date is passed as a bound parameter instead of being pasted
        # into the SQL text, so a caller-supplied value cannot alter the query.
        params = {"date": date}
        queries = {
            "overview": """
                SELECT
                    count() AS total_events,
                    uniqExact(user_id) AS unique_users,
                    countIf(event_type = 'purchase') AS purchases,
                    round(avg(duration_ms), 0) AS avg_duration_ms
                FROM events WHERE event_date = %(date)s
            """,
            "top_pages": """
                SELECT page_url, count() AS views, uniqExact(user_id) AS users
                FROM events
                WHERE event_date = %(date)s AND event_type = 'pageview'
                GROUP BY page_url ORDER BY views DESC LIMIT 10
            """,
            "country_breakdown": """
                SELECT country, uniqExact(user_id) AS users, count() AS events
                FROM events WHERE event_date = %(date)s
                GROUP BY country ORDER BY users DESC
            """,
        }

        report = {"date": date, "generated_at": datetime.now().isoformat()}

        for name, sql in queries.items():
            rows, cols = self.run_query(sql, params=params)
            report[name] = [dict(zip(cols, row)) for row in rows]

        return report

    def optimize_tables(self, database="analytics"):
        """Run ``OPTIMIZE TABLE ... FINAL`` on every MergeTree-family table.

        Only engines whose name ends in ``MergeTree`` are selected; OPTIMIZE
        is not applicable to plain views and similar objects. Names are
        backtick-quoted because stored table names may contain dots.
        """
        rows, _ = self.run_query(
            "SELECT name FROM system.tables "
            "WHERE database = %(db)s AND endsWith(engine, 'MergeTree')",
            params={"db": database},
        )

        quoted_db = self._quote_identifier(database)
        for (table_name,) in rows:
            logger.info(f"Optimizing {database}.{table_name}")
            quoted_table = self._quote_identifier(table_name)
            self.run_command(f"OPTIMIZE TABLE {quoted_db}.{quoted_table} FINAL")

    def cleanup_old_partitions(self, table, retention_days=365):
        """Drop partitions that lie entirely before the retention cutoff.

        The cutoff is compared in the partition's own granularity: 6-digit
        partition ids are treated as ``YYYYMM`` and 8-digit ids as
        ``YYYYMMDD``. A monthly partition is therefore only dropped when its
        month is strictly before the cutoff month, never when the cutoff day
        falls inside it. Partition ids of any other shape are skipped.

        NOTE(review): this assumes numeric 6/8-digit partition ids are dates
        (as produced by toYYYYMM / toYYYYMMDD); confirm for other tables.

        Args:
            table: Table name, optionally qualified as ``database.table``.
                Unqualified names are looked up in the current database.
            retention_days: Age in days beyond which partitions are removed.

        Returns:
            The list of partition ids that were dropped.
        """
        boundary = datetime.now() - timedelta(days=retention_days)
        cutoff_by_length = {
            6: boundary.strftime("%Y%m"),
            8: boundary.strftime("%Y%m%d"),
        }

        database, _, table_name = table.rpartition(".")
        params = {"table": table_name}
        if database:
            params["db"] = database
            db_filter = "database = %(db)s"
            target = (f"{self._quote_identifier(database)}."
                      f"{self._quote_identifier(table_name)}")
        else:
            db_filter = "database = currentDatabase()"
            target = self._quote_identifier(table_name)

        # Only active parts are counted; inactive parts are leftovers of
        # merges and would inflate the row and size figures.
        rows, _ = self.run_query(f"""
            SELECT partition_id, sum(rows) AS total_rows,
                   formatReadableSize(sum(bytes_on_disk)) AS size
            FROM system.parts
            WHERE {db_filter} AND table = %(table)s AND active
            GROUP BY partition_id ORDER BY partition_id
        """, params=params)

        dropped = []
        for partition_id, total_rows, size in rows:
            cutoff = cutoff_by_length.get(len(partition_id))
            if cutoff is None or not partition_id.isdigit():
                logger.warning(
                    f"Skipping partition {partition_id}: not a YYYYMM/YYYYMMDD id")
                continue
            if partition_id >= cutoff:
                continue

            logger.info(f"Dropping partition {partition_id} ({total_rows} rows, {size})")
            # partition_id is digits-only at this point, so it is safe to embed.
            self.run_command(
                f"ALTER TABLE {target} DROP PARTITION ID '{partition_id}'")
            dropped.append(partition_id)

        return dropped

    def export_csv(self, sql, output_file):
        """Write the result of ``sql`` to ``output_file`` as CSV with a header row."""
        rows, cols = self.run_query(sql)

        # Explicit UTF-8 so non-ASCII values do not depend on the OS locale.
        with open(output_file, "w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            writer.writerow(cols)
            writer.writerows(rows)

        logger.info(f"Exported {len(rows)} rows to {output_file}")

    def get_table_stats(self):
        """Return size, row count, partition count and engine per user table.

        Returns:
            A list of dicts with keys ``database``, ``table_name``, ``size``,
            ``rows``, ``partition_count`` and ``engine``, largest table first.
        """
        # Columns are qualified because system.tables and system.parts both
        # expose `database` and `name`. partition_count counts distinct
        # partitions of active parts, so a table without parts reports 0.
        rows, cols = self.run_query("""
            SELECT
                database,
                table_name,
                formatReadableSize(total_bytes) AS size,
                formatReadableQuantity(total_rows) AS rows,
                partition_count,
                engine
            FROM (
                SELECT
                    t.database AS database,
                    t.name AS table_name,
                    t.total_bytes AS total_bytes,
                    t.total_rows AS total_rows,
                    t.engine AS engine,
                    uniqExactIf(p.partition_id, p.active) AS partition_count
                FROM system.tables AS t
                LEFT JOIN system.parts AS p
                    ON t.name = p.table AND t.database = p.database
                WHERE t.database NOT IN ('system', 'INFORMATION_SCHEMA', 'information_schema')
                GROUP BY t.database, t.name, t.total_bytes, t.total_rows, t.engine
            )
            ORDER BY total_bytes DESC
        """)

        return [dict(zip(cols, row)) for row in rows]

    def schedule_backup(self, backup_name=None):
        """Back up the ``analytics`` database to the ``backups`` disk.

        Args:
            backup_name: Name of the backup; defaults to a timestamped name.

        Returns:
            The backup name that was used.
        """
        if not backup_name:
            backup_name = f"backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

        # Escape the name so a quote in it cannot end the string literal.
        literal = backup_name.replace("\\", "\\\\").replace("'", "\\'")
        self.run_command(f"BACKUP DATABASE analytics TO Disk('backups', '{literal}')")
        logger.info(f"Backup created: {backup_name}")
        return backup_name

# automation = ClickHouseAutomation()
# report = automation.daily_report()
# print(json.dumps(report, indent=2, default=str))
# automation.cleanup_old_partitions("events", retention_days=365)
# stats = automation.get_table_stats()
# for s in stats:
# print(f"{s['table_name']}: {s['size']} ({s['rows']} rows)")

ETL Pipeline และ Data Ingestion

สร้าง ETL pipeline สำหรับ ClickHouse

เนื้อหาเกี่ยวข้อง — อ่านต่อ: smart contract แปลว่า

#!/usr/bin/env python3
# etl_pipeline.py — ClickHouse ETL Pipeline
import clickhouse_connect
import json
import csv
import gzip
import logging
from datetime import datetime
from pathlib import Path
from typing import List, Dict

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("etl")


class ClickHouseETL:
    """Batch loaders and a small step runner on top of an injected client."""

    def __init__(self, client):
        """Keep the client used for every insert, command and query."""
        self.client = client

    def ingest_csv(self, table, csv_file, batch_size=100000):
        """Insert a CSV file (optionally ``.gz``) into ``table`` in batches.

        The header row supplies the column names. Values are passed to the
        client as read from the file, i.e. as strings.

        Returns:
            The number of rows inserted.
        """
        logger.info(f"Ingesting {csv_file} into {table}")
        opener = gzip.open if str(csv_file).endswith(".gz") else open
        total = 0

        # newline="" lets the csv module handle quoted embedded newlines.
        with opener(csv_file, "rt", newline="", encoding="utf-8") as f:
            reader = csv.DictReader(f)
            columns = None
            batch = []
            for row in reader:
                if columns is None:
                    columns = list(reader.fieldnames)
                # Index by header so every row has exactly len(columns) values,
                # even when a line is short or carries extra fields.
                batch.append([row.get(c) for c in columns])
                if len(batch) >= batch_size:
                    self.client.insert(table, batch, column_names=columns)
                    total += len(batch)
                    batch = []
                    logger.info(f" Inserted {total} rows")
            if batch:
                self.client.insert(table, batch, column_names=columns)
                total += len(batch)

        logger.info(f"Total ingested: {total} rows")
        return total

    def ingest_json_lines(self, table, jsonl_file, batch_size=50000):
        """Insert a JSON Lines file (optionally ``.gz``) into ``table`` in batches.

        Column names come from the keys of the first record; later records
        contribute ``None`` for keys they lack. Blank lines are skipped.

        Returns:
            The number of rows inserted.
        """
        logger.info(f"Ingesting {jsonl_file} into {table}")
        opener = gzip.open if str(jsonl_file).endswith(".gz") else open
        total = 0

        with opener(jsonl_file, "rt", encoding="utf-8") as f:
            columns = None
            batch = []
            for line in f:
                line = line.strip()
                if not line:
                    # A trailing or stray empty line is not a record.
                    continue
                record = json.loads(line)
                if columns is None:
                    columns = list(record.keys())
                batch.append([record.get(c) for c in columns])
                if len(batch) >= batch_size:
                    self.client.insert(table, batch, column_names=columns)
                    total += len(batch)
                    batch = []
            if batch:
                self.client.insert(table, batch, column_names=columns)
                total += len(batch)

        logger.info(f"Total ingested: {total} rows")
        return total

    def transform_and_load(self, source_table, target_table, transform_sql):
        """Run ``INSERT INTO target_table <transform_sql>`` and log the row count.

        ``source_table`` is used for logging only; the SELECT in
        ``transform_sql`` decides what is actually read.
        """
        logger.info(f"Transform: {source_table} -> {target_table}")
        self.client.command(f"""
            INSERT INTO {target_table}
            {transform_sql}
        """)
        # query() returns a single result object, not a (rows, columns) pair.
        result = self.client.query(f"SELECT count() FROM {target_table}")
        logger.info(f"Target table rows: {result.result_rows[0][0]}")

    def run_pipeline(self, steps):
        """Run the steps in order and report the outcome of each.

        Each step is a dict with ``name`` and ``type`` (``ingest_csv``,
        ``ingest_jsonl``, ``transform`` or ``query``) plus the keys that type
        needs. A failing step stops the pipeline unless it sets
        ``"required": False``. An unknown type is reported as a failed step.

        Returns:
            A list with one dict per executed step: ``step``, ``status``,
            ``duration_s`` and either ``rows`` or ``error``.
        """
        logger.info(f"Running pipeline with {len(steps)} steps")
        start = datetime.now()
        results = []

        for i, step in enumerate(steps):
            step_start = datetime.now()
            logger.info(f"Step {i+1}/{len(steps)}: {step['name']}")
            try:
                kind = step["type"]
                if kind == "ingest_csv":
                    count = self.ingest_csv(step["table"], step["file"])
                elif kind == "ingest_jsonl":
                    count = self.ingest_json_lines(step["table"], step["file"])
                elif kind == "transform":
                    self.transform_and_load(
                        step["source"], step["target"], step["sql"]
                    )
                    count = 0
                elif kind == "query":
                    self.client.command(step["sql"])
                    count = 0
                else:
                    # Without this, a typo in "type" would be logged as success.
                    raise ValueError(f"Unknown step type: {kind!r}")
                results.append({
                    "step": step["name"],
                    "status": "success",
                    "rows": count,
                    "duration_s": (datetime.now() - step_start).total_seconds(),
                })
            except Exception as e:
                results.append({
                    "step": step["name"],
                    "status": "error",
                    "error": str(e),
                    "duration_s": (datetime.now() - step_start).total_seconds(),
                })
                logger.error(f"Step failed: {e}")
                if step.get("required", True):
                    break

        total_time = (datetime.now() - start).total_seconds()
        logger.info(f"Pipeline completed in {total_time:.1f}s")
        return results


# Usage example (the last entry is cut off in the source this was taken from):
# client = clickhouse_connect.get_client(host="localhost", username="admin", password="ch_password")
# etl = ClickHouseETL(client)
#
# pipeline = [
#     {"name": "Ingest events", "type": "ingest_csv", "table": "analytics.events_raw", "file": "data/events.csv.gz"},
#     {"name": "Transform", "type": "transform", "source": "events_raw", "target": "events",
#      "sql": "SELECT *, toDate(event_time) AS event_date FROM analytics.events_raw"},
#     {"name": "Optimize", "type": "query", "sql": "OPTIMI
XM Legend · เทรดเดอร์ & ผู้สอน Forex 13 ปี

ผู้ก่อตั้ง SiamCafe ตั้งแต่ปี 1997 · เทรดเดอร์สาย Forex มากกว่า 13 ปี ได้รับการยกย่องเป็น XM Legend · แบ่งปันความรู้ Forex, ไอที, AI และการเทรด จากประสบการณ์จริงในตลาดจริง