ai

ClickHouse Analytics Automation Script —

ClickHouse Analytics Automation Script —

ClickHouse คืออะไรและทำไมถึงเร็ว

ClickHouse Analytics Automation Script —

ClickHouse เป็น open source columnar OLAP database ที่พัฒนาโดย Yandex ออกแบบมาสำหรับ real-time analytical queries บน datasets ขนาดใหญ่ (petabyte-scale) ให้ query speed ที่เร็วกว่า traditional databases หลายร้อยเท่า

เหตุผลที่ ClickHouse เร็วคือ columnar storage อ่านเฉพาะ columns ที่ต้องการ, data compression ที่ลดขนาดข้อมูล 5-10 เท่า, vectorized query execution ที่ process data เป็น batches, parallel processing ใช้ CPU cores ทั้งหมด, primary key index ที่ skip data ไม่ต้องการ และ specialized data structures สำหรับแต่ละ data type

เนื้อหาเกี่ยวข้อง — ทำความเข้าใจ Go Chi Router Internal Developer Platform

ClickHouse เหมาะสำหรับ log analytics, clickstream analysis, real-time dashboards, time series data, ad-tech analytics, financial data analysis และ IoT data processing ไม่เหมาะสำหรับ transactional workloads (OLTP) ที่ต้อง UPDATE/DELETE บ่อย

เมื่อเทียบกับ alternatives ClickHouse เร็วกว่า PostgreSQL สำหรับ analytical queries 100-1000 เท่า เร็วกว่า Elasticsearch สำหรับ structured data analytics 5-10 เท่า และมี operational cost ต่ำกว่า cloud data warehouses เช่น BigQuery หรือ Snowflake

แนะนำเพิ่มเติม — ดูสัญญาณเทรดที่ XM Signal

เนื้อหาเกี่ยวข้อง — แนะนำให้อ่าน Uptime Kuma Monitoring Load Testing Strategy —

ติดตั้ง ClickHouse และตั้งค่าเบื้องต้น

วิธีติดตั้งและ configure ClickHouse

# === ติดตั้ง ClickHouse ===



# Ubuntu/Debian

sudo apt-get install -y apt-transport-https ca-certificates curl gnupg

curl -fsSL 'https://packages.clickhouse.com/rpm/lts/repodata/repomd.xml.key' | \

    sudo gpg --dearmor -o /usr/share/keyrings/clickhouse-keyring.gpg

echo "deb [signed-by=/usr/share/keyrings/clickhouse-keyring.gpg] \

    https://packages.clickhouse.com/deb stable main" | \

    sudo tee /etc/apt/sources.list.d/clickhouse.list

sudo apt-get update

sudo apt-get install -y clickhouse-server clickhouse-client



# Start service

sudo systemctl start clickhouse-server

sudo systemctl enable clickhouse-server



# Docker

docker run -d --name clickhouse \

    -p 8123:8123 -p 9000:9000 \

    -v clickhouse-data:/var/lib/clickhouse \

    -v clickhouse-logs:/var/log/clickhouse-server \

    -e CLICKHOUSE_DB=analytics \

    -e CLICKHOUSE_USER=admin \

    -e CLICKHOUSE_PASSWORD=ch_password \

    clickhouse/clickhouse-server:latest



# เชื่อมต่อ

clickhouse-client --user admin --password ch_password



# === สร้าง Database และ Tables ===

CREATE DATABASE IF NOT EXISTS analytics;



-- Web Analytics Events

CREATE TABLE analytics.events (

    event_date Date,

    event_time DateTime64(3),

    user_id UInt64,

    session_id String,

    event_type LowCardinality(String),

    page_url String,

    referrer String,

    country LowCardinality(String),

    device LowCardinality(String),

    browser LowCardinality(String),

    duration_ms UInt32,

    properties Map(String, String)

) ENGINE = MergeTree()

PARTITION BY toYYYYMM(event_date)

ORDER BY (event_date, user_id, event_time)

TTL event_date + INTERVAL 1 YEAR

SETTINGS index_granularity = 8192;



-- Server Metrics

CREATE TABLE analytics.server_metrics (

    timestamp DateTime64(3),

    host LowCardinality(String),

    metric LowCardinality(String),

    value Float64,

    tags Map(String, String)

) ENGINE = MergeTree()

PARTITION BY toYYYYMMDD(timestamp)

ORDER BY (host, metric, timestamp)

TTL timestamp + INTERVAL 90 DAY;



-- Insert sample data

INSERT INTO analytics.events

SELECT

    today() - (rand() % 365) AS event_date,

    toDateTime64(event_date, 3) + rand() % 86400 AS event_time,

    rand() % 100000 AS user_id,

    generateUUIDv4() AS session_id,

    arrayElement(['pageview','click','scroll','purchase'], 1 + rand() % 4) AS event_type,

    concat('/page/', toString(rand() % 50)) AS page_url,

    arrayElement(['google.com','facebook.com','direct','twitter.com'], 1 + rand() % 4) AS referrer,

    arrayElement(['TH','US','JP','SG','DE'], 1 + rand() % 5) AS country,

    arrayElement(['mobile','desktop','tablet'], 1 + rand() % 3) AS device,

    arrayElement(['Chrome','Firefox','Safari','Edge'], 1 + rand() % 4) AS browser,

    rand() % 30000 AS duration_ms,

    map('version', '1.0') AS properties

FROM numbers(10000000);



SELECT count() FROM analytics.events;

SELECT formatReadableSize(total_bytes) FROM system.tables WHERE name = 'events';

สร้าง Analytics Queries และ Materialized Views

ClickHouse Analytics Automation Script —

Analytical queries และ automated aggregation

แนะนำเพิ่มเติม — เรียนเทรดกับ iCafeForex

เนื้อหาเกี่ยวข้อง — แนะนำให้อ่าน Elasticsearch OpenSearch Cloud Migration Strategy

# === ClickHouse Analytics Queries ===



-- Daily active users

SELECT

    event_date,

    uniqExact(user_id) AS dau,

    count() AS total_events,

    countIf(event_type = 'purchase') AS purchases,

    round(countIf(event_type = 'purchase') / uniqExact(user_id) * 100, 2) AS conversion_rate

FROM analytics.events

WHERE event_date >= today() - 30

GROUP BY event_date

ORDER BY event_date;



-- Top pages with engagement metrics

SELECT

    page_url,

    count() AS views,

    uniqExact(user_id) AS unique_visitors,

    avg(duration_ms) AS avg_duration_ms,

    quantile(0.50)(duration_ms) AS p50_duration,

    quantile(0.95)(duration_ms) AS p95_duration

FROM analytics.events

WHERE event_type = 'pageview'

  AND event_date >= today() - 7

GROUP BY page_url

ORDER BY views DESC

LIMIT 20;



-- Funnel analysis

SELECT

    level,

    users,

    round(users / first_value(users) OVER (ORDER BY level) * 100, 1) AS pct_of_start

FROM (

    SELECT 1 AS level, uniqExact(user_id) AS users

    FROM analytics.events WHERE event_type = 'pageview' AND event_date = today()

    UNION ALL

    SELECT 2, uniqExact(user_id)

    FROM analytics.events WHERE event_type = 'click' AND event_date = today()

    UNION ALL

    SELECT 3, uniqExact(user_id)

    FROM analytics.events WHERE event_type = 'purchase' AND event_date = today()

)

ORDER BY level;



-- === Materialized Views (auto-aggregation) ===



-- Daily stats materialized view

CREATE MATERIALIZED VIEW analytics.daily_stats_mv

ENGINE = SummingMergeTree()

PARTITION BY toYYYYMM(event_date)

ORDER BY (event_date, country, device)

AS SELECT

    event_date,

    country,

    device,

    count() AS event_count,

    uniqState(user_id) AS unique_users_state,

    sumIf(1, event_type = 'purchase') AS purchase_count,

    avgState(duration_ms) AS avg_duration_state

FROM analytics.events

GROUP BY event_date, country, device;



-- Query materialized view

SELECT

    event_date,

    country,

    sum(event_count) AS events,

    uniqMerge(unique_users_state) AS unique_users,

    sum(purchase_count) AS purchases,

    avgMerge(avg_duration_state) AS avg_duration

FROM analytics.daily_stats_mv

WHERE event_date >= today() - 7

GROUP BY event_date, country

ORDER BY event_date, events DESC;



-- Hourly metrics materialized view

CREATE MATERIALIZED VIEW analytics.hourly_metrics_mv

ENGINE = SummingMergeTree()

ORDER BY (hour, host, metric)

AS SELECT

    toStartOfHour(timestamp) AS hour,

    host,

    metric,

    avg(value) AS avg_value,

    max(value) AS max_value,

    min(value) AS min_value,

    count() AS sample_count

FROM analytics.server_metrics

GROUP BY hour, host, metric;

Automation Scripts ด้วย Python

Python scripts สำหรับ automate ClickHouse tasks

#!/usr/bin/env python3

# clickhouse_automation.py — ClickHouse Analytics Automation

import clickhouse_connect

import json

import logging

import csv

from datetime import datetime, timedelta

from pathlib import Path

from typing import List, Dict, Optional



logging.basicConfig(level=logging.INFO)

logger = logging.getLogger("ch_automation")



class ClickHouseAutomation:

    def __init__(self, host="localhost", port=8123,

                 user="admin", password="ch_password",

                 database="analytics"):

        self.client = clickhouse_connect.get_client(

            host=host, port=port,

            username=user, password=password,

            database=database,

        )

        logger.info(f"Connected to ClickHouse {host}:{port}/{database}")

    

    def run_query(self, sql, params=None):

        result = self.client.query(sql, parameters=params)

        return result.result_rows, result.column_names

    

    def run_command(self, sql):

        self.client.command(sql)

    

    def daily_report(self, date=None):

        if date is None:

            date = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")

        

        queries = {

            "overview": f"""

                SELECT

                    count() AS total_events,

                    uniqExact(user_id) AS unique_users,

                    countIf(event_type = 'purchase') AS purchases,

                    round(avg(duration_ms), 0) AS avg_duration_ms

                FROM events WHERE event_date = '{date}'

            """,

            "top_pages": f"""

                SELECT page_url, count() AS views, uniqExact(user_id) AS users

                FROM events

                WHERE event_date = '{date}' AND event_type = 'pageview'

                GROUP BY page_url ORDER BY views DESC LIMIT 10

            """,

            "country_breakdown": f"""

                SELECT country, uniqExact(user_id) AS users, count() AS events

                FROM events WHERE event_date = '{date}'

                GROUP BY country ORDER BY users DESC

            """,

        }

        

        report = {"date": date, "generated_at": datetime.now().isoformat()}

        

        for name, sql in queries.items():

            rows, cols = self.run_query(sql)

            report[name] = [dict(zip(cols, row)) for row in rows]

        

        return report

    

    def optimize_tables(self, database="analytics"):

        rows, _ = self.run_query(

            "SELECT name FROM system.tables WHERE database = %(db)s",

            params={"db": database},

        )

        

        for (table_name,) in rows:

            logger.info(f"Optimizing {database}.{table_name}")

            self.run_command(f"OPTIMIZE TABLE {database}.{table_name} FINAL")

    

    def cleanup_old_partitions(self, table, retention_days=365):

        cutoff = (datetime.now() - timedelta(days=retention_days)).strftime("%Y%m%d")

        

        rows, _ = self.run_query(f"""

            SELECT partition, sum(rows) AS total_rows,

                   formatReadableSize(sum(bytes_on_disk)) AS size

            FROM system.parts

            WHERE table = '{table}' AND partition < '{cutoff}'

            GROUP BY partition ORDER BY partition

        """)

        

        for partition, total_rows, size in rows:

            logger.info(f"Dropping partition {partition} ({total_rows} rows, {size})")

            self.run_command(f"ALTER TABLE {table} DROP PARTITION '{partition}'")

    

    def export_csv(self, sql, output_file):

        rows, cols = self.run_query(sql)

        

        with open(output_file, "w", newline="") as f:

            writer = csv.writer(f)

            writer.writerow(cols)

            writer.writerows(rows)

        

        logger.info(f"Exported {len(rows)} rows to {output_file}")

    

    def get_table_stats(self):

        rows, cols = self.run_query("""

            SELECT

                database, name AS table_name,

                formatReadableSize(total_bytes) AS size,

                formatReadableQuantity(total_rows) AS rows,

                partition_count,

                engine

            FROM (

                SELECT database, name, total_bytes, total_rows, engine,

                       count() AS partition_count

                FROM system.tables t

                LEFT JOIN system.parts p ON t.name = p.table AND t.database = p.database

                WHERE t.database NOT IN ('system', 'INFORMATION_SCHEMA', 'information_schema')

                GROUP BY database, name, total_bytes, total_rows, engine

            )

            ORDER BY total_bytes DESC

        """)

        

        return [dict(zip(cols, row)) for row in rows]

    

    def schedule_backup(self, backup_name=None):

        if not backup_name:

            backup_name = f"backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

        

        self.run_command(f"BACKUP DATABASE analytics TO Disk('backups', '{backup_name}')")

        logger.info(f"Backup created: {backup_name}")



# automation = ClickHouseAutomation()

# report = automation.daily_report()

# print(json.dumps(report, indent=2, default=str))

# automation.cleanup_old_partitions("events", retention_days=365)

# stats = automation.get_table_stats()

# for s in stats:

#     print(f"{s['table_name']}: {s['size']} ({s['rows']} rows)")

ETL Pipeline และ Data Ingestion

สร้าง ETL pipeline สำหรับ ClickHouse

เนื้อหาเกี่ยวข้อง — ดูเพิ่มเติมเรื่อง เหรียญ luna วันนี้ — คู่มือฉบับสมบูรณ์ 2026

#!/usr/bin/env python3 # etl_pipeline.py — ClickHouse ETL Pipeline import clickhouse_connect import json import csv import gzip import logging from datetime import datetime from pathlib import Path from typing import List, Dict logging.basicConfig(level=logging.INFO) logger = logging.getLogger("etl") class ClickHouseETL: def __init__(self, client): self.client = client def ingest_csv(self, table, csv_file, batch_size=100000): logger.info(f"Ingesting {csv_file} into {table}") opener = gzip.open if str(csv_file).endswith(".gz") else open with opener(csv_file, "rt") as f: reader = csv.DictReader(f) batch = [] total = 0 columns = None for row in reader: if columns is None: columns = list(row.keys()) batch.append(list(row.values())) if len(batch) >= batch_size: self.client.insert(table, batch, column_names=columns) total += len(batch) batch = [] logger.info(f" Inserted {total} rows") if batch: self.client.insert(table, batch, column_names=columns) total += len(batch) logger.info(f"Total ingested: {total} rows") return total def ingest_json_lines(self, table, jsonl_file, batch_size=50000): logger.info(f"Ingesting {jsonl_file} into {table}") opener = gzip.open if str(jsonl_file).endswith(".gz") else open with opener(jsonl_file, "rt") as f: batch = [] columns = None total = 0 for line in f: record = json.loads(line.strip()) if columns is None: columns = list(record.keys()) batch.append([record.get(c) for c in columns]) if len(batch) >= batch_size: self.client.insert(table, batch, column_names=columns) total += len(batch) batch = [] if batch: self.client.insert(table, batch, column_names=columns) total += len(batch) logger.info(f"Total ingested: {total} rows") return total def transform_and_load(self, source_table, target_table, transform_sql): logger.info(f"Transform: {source_table} -> {target_table}") self.client.command(f""" INSERT INTO {target_table} {transform_sql} """) rows, _ = self.client.query(f"SELECT count() FROM {target_table}") logger.info(f"Target table rows: {rows.result_rows[0][0]}") def run_pipeline(self, steps): logger.info(f"Running pipeline with {len(steps)} steps") start = datetime.now() results = [] for i, step in enumerate(steps): step_start = datetime.now() logger.info(f"Step {i+1}/{len(steps)}: {step['name']}") try: if step["type"] == "ingest_csv": count = self.ingest_csv(step["table"], step["file"]) elif step["type"] == "ingest_jsonl": count = self.ingest_json_lines(step["table"], step["file"]) elif step["type"] == "transform": self.transform_and_load( step["source"], step["target"], step["sql"] ) count = 0 elif step["type"] == "query": self.client.command(step["sql"]) count = 0 results.append({ "step": step["name"], "status": "success", "duration_s": (datetime.now() - step_start).total_seconds(), }) except Exception as e: results.append({ "step": step["name"], "status": "error", "error": str(e), }) logger.error(f"Step failed: {e}") if step.get("required", True): break total_time = (datetime.now() - start).total_seconds() logger.info(f"Pipeline completed in {total_time:.1f}s") return results # client = clickhouse_connect.get_client(host="localhost", username="admin", password="ch_password") # etl = ClickHouseETL(client) # # pipeline = [ # {"name": "Ingest events", "type": "ingest_csv", "table": "analytics.events_raw", "file": "data/events.csv.gz"}, # {"name": "Transform", "type": "transform", "source": "events_raw", "target": "events", # "sql": "SELECT *, toDate(event_time) AS event_date FROM analytics.events_raw"}, # {"name": "Optimize", "type": "query", "sql": "OPTIMI
XM Legend · เทรดเดอร์ & ผู้สอน Forex 13 ปี

ผู้ก่อตั้ง SiamCafe ตั้งแต่ปี 1997 · เทรดเดอร์สาย Forex มากกว่า 13 ปี ได้รับการยกย่องเป็น XM Legend · แบ่งปันความรู้ Forex, ไอที, AI และการเทรด จากประสบการณ์จริงในตลาดจริง