
BigQuery Scheduled Query Distributed System — Building Automated Data Pipelines

2025-06-25 · Ajarn Bom — SiamCafe.net · 1,905 words

What Are BigQuery Scheduled Queries

BigQuery Scheduled Queries is a feature that runs SQL queries automatically on a schedule you define, such as hourly, daily, or weekly. It is used for ETL processes, data aggregation, reporting tables, and data warehouse maintenance, without provisioning any additional infrastructure.

Key advantages of BigQuery Scheduled Queries include: serverless operation with no infrastructure to manage, built-in retry logic for failed queries, parameterized queries via @run_time and @run_date, email notifications on query failure, integration with the BigQuery Data Transfer Service, and IAM-based access control.
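The @run_time and @run_date parameters resolve per run. A small sketch of the values a given run would see (illustrative only: BigQuery binds these as real query parameters, not via string substitution, and `render_scheduled_query` is a hypothetical helper):

```python
from datetime import datetime, timezone

def render_scheduled_query(query: str, run_time: datetime) -> str:
    """Illustrative only: @run_time is the scheduled run's UTC timestamp,
    @run_date is its calendar date. BigQuery binds these as parameters;
    this substitution just shows the values a run observes."""
    run_date = run_time.date().isoformat()
    return (query
            .replace("@run_time", f"TIMESTAMP '{run_time.isoformat()}'")
            .replace("@run_date", f"DATE '{run_date}'"))

sql = "SELECT * FROM t WHERE d = DATE_SUB(@run_date, INTERVAL 1 DAY)"
rendered = render_scheduled_query(sql, datetime(2025, 6, 25, 6, 0, tzinfo=timezone.utc))
print(rendered)
```

A daily run scheduled for 06:00 UTC on 2025-06-25 thus processes the partition for the previous day, which is the incremental pattern used throughout this article.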

A distributed system in this context means a data pipeline design that spreads the workload across multiple scheduled queries working in concert, instead of a single massive query that runs for a long time and risks timing out. This makes the pipeline more reliable, maintainable, and cost-efficient.

Use cases include daily/hourly data aggregation, materialized view refreshes, data quality checks, cross-dataset data synchronization, reporting table preparation, and data retention/cleanup.

Setting Up Scheduled Queries in BigQuery

Creating and managing Scheduled Queries

# === BigQuery Scheduled Queries Setup ===

# 1. Using gcloud CLI
# ===================================

# Install gcloud
# curl https://sdk.cloud.google.com | bash
# gcloud init

# Create a scheduled query
bq mk --transfer_config \
    --project_id=my-project \
    --data_source=scheduled_query \
    --target_dataset=analytics \
    --display_name="Daily User Aggregation" \
    --schedule="every 24 hours" \
    --params='{
        "query": "INSERT INTO `my-project.analytics.daily_users` SELECT DATE(event_timestamp) AS event_date, COUNT(DISTINCT user_id) AS unique_users, COUNT(*) AS total_events, COUNTIF(event_name = \"purchase\") AS purchases FROM `my-project.raw.events` WHERE DATE(event_timestamp) = DATE_SUB(@run_date, INTERVAL 1 DAY) GROUP BY 1",
        "destination_table_name_template": "",
        "write_disposition": "WRITE_APPEND"
    }'

# List scheduled queries
bq ls --transfer_config --transfer_location=us

# Check run history
bq ls --transfer_run --transfer_location=us \
    --run_attempt=LATEST \
    projects/my-project/locations/us/transferConfigs/TRANSFER_ID

# 2. Using SQL directly in BigQuery Console
# ===================================

-- Daily Sales Aggregation
-- Schedule: Every day at 06:00 UTC
-- Destination: analytics.daily_sales

SELECT
    DATE(order_timestamp) AS order_date,
    product_category,
    COUNT(*) AS order_count,
    SUM(amount) AS total_revenue,
    AVG(amount) AS avg_order_value,
    COUNT(DISTINCT customer_id) AS unique_customers
FROM `my-project.raw.orders`
WHERE DATE(order_timestamp) = DATE_SUB(@run_date, INTERVAL 1 DAY)
GROUP BY 1, 2;

-- Hourly Metrics Rollup
-- Schedule: Every hour
SELECT
    TIMESTAMP_TRUNC(@run_time, HOUR) AS metric_hour,
    service_name,
    COUNT(*) AS request_count,
    AVG(latency_ms) AS avg_latency,
    APPROX_QUANTILES(latency_ms, 100)[OFFSET(95)] AS p95_latency,
    COUNTIF(status_code >= 500) AS error_count,
    SAFE_DIVIDE(COUNTIF(status_code >= 500), COUNT(*)) * 100 AS error_rate_pct
FROM `my-project.logs.requests`
WHERE timestamp >= TIMESTAMP_SUB(@run_time, INTERVAL 1 HOUR)
  AND timestamp < @run_time
GROUP BY 1, 2;
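What `APPROX_QUANTILES(latency_ms, 100)[OFFSET(95)]` computes, sketched with exact quantiles in Python (BigQuery's version is approximate; `quantile_boundaries` is a hypothetical helper, not a BigQuery API):

```python
def quantile_boundaries(values, n):
    """Exact version of what APPROX_QUANTILES approximates: n + 1 boundary
    values from MIN to MAX; index 95 of the 101 boundaries ~ the p95."""
    s = sorted(values)
    return [s[round(i * (len(s) - 1) / n)] for i in range(n + 1)]

latencies = list(range(1, 1001))              # 1..1000 ms
p95 = quantile_boundaries(latencies, 100)[95]
print(p95)  # 950
```

On real traffic the approximate version trades a small error for far less memory, which is why it is preferred in scheduled rollups over exact percentile computation.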

-- Weekly Data Quality Check
-- Schedule: Every Monday at 00:00 UTC
SELECT
    'orders' AS table_name,
    COUNT(*) AS total_rows,
    COUNTIF(customer_id IS NULL) AS null_customer_ids,
    COUNTIF(amount <= 0) AS invalid_amounts,
    COUNT(DISTINCT DATE(order_timestamp)) AS distinct_dates,
    MIN(order_timestamp) AS earliest_record,
    MAX(order_timestamp) AS latest_record,
    CURRENT_TIMESTAMP() AS check_timestamp
FROM `my-project.raw.orders`
WHERE DATE(order_timestamp) >= DATE_SUB(@run_date, INTERVAL 7 DAY);

Building a Distributed Data Pipeline

Designing the pipeline as distributed stages

# === Distributed Pipeline Architecture ===

# Pipeline Stages (each is a separate scheduled query):
#
# Stage 1: Raw Data Ingestion (every hour)
#   └── Extract from source tables, light transformation
#
# Stage 2: Data Cleansing (every hour, 15 min after Stage 1)
#   └── Remove duplicates, fix data types, handle nulls
#
# Stage 3: Aggregation (every hour, 30 min after Stage 1)
#   └── Pre-aggregate for common query patterns
#
# Stage 4: Reporting Tables (daily at 07:00)
#   └── Build final reporting tables from aggregations
#
# Stage 5: Data Quality (daily at 08:00)
#   └── Validate data quality, alert on issues

# === Stage 1: Raw Data Extraction ===
-- Schedule: every 1 hour
-- Name: stage1_extract_events

CREATE OR REPLACE TABLE `project.staging.events_hourly`
PARTITION BY DATE(event_timestamp)
CLUSTER BY user_id, event_name
AS
SELECT
    event_id,
    user_id,
    event_name,
    event_timestamp,
    PARSE_JSON(event_params) AS params,
    device_type,
    country,
    session_id,
    CURRENT_TIMESTAMP() AS processed_at
FROM `project.raw.events_stream`
WHERE event_timestamp >= TIMESTAMP_SUB(@run_time, INTERVAL 2 HOUR)
  AND event_timestamp < @run_time;

# === Stage 2: Data Cleansing ===
-- Schedule: every 1 hour (offset 15 min)
-- Name: stage2_cleanse

INSERT INTO `project.clean.events`
SELECT DISTINCT
    s.event_id,
    s.user_id,
    LOWER(TRIM(s.event_name)) AS event_name,
    s.event_timestamp,
    s.params,
    COALESCE(s.device_type, 'unknown') AS device_type,
    COALESCE(s.country, 'unknown') AS country,
    s.session_id,
    s.processed_at
FROM `project.staging.events_hourly` AS s
-- NOT EXISTS instead of NOT IN: a single NULL event_id in the subquery
-- would make NOT IN match nothing and silently drop the whole batch
WHERE NOT EXISTS (
    SELECT 1
    FROM `project.clean.events` AS c
    WHERE c.event_id = s.event_id
      AND DATE(c.event_timestamp) >= DATE_SUB(CURRENT_DATE(), INTERVAL 2 DAY)
);
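The 2-hour extraction window in Stage 1 overlaps consecutive hourly runs on purpose: late-arriving events get a second chance to be picked up, and Stage 2's dedup keeps the overlap from double-loading. A toy sketch of that interaction (the names and dict shapes are illustrative):

```python
def dedup_insert(clean: list, batch: list) -> list:
    """Insert only events whose id is not already present, mirroring the
    dedup filter Stage 2 applies before loading into the clean table."""
    seen = {e["id"] for e in clean}
    for e in batch:
        if e["id"] not in seen:
            clean.append(e)
            seen.add(e["id"])
    return clean

# Two hourly runs with overlapping 2-hour windows both see event "b"
clean: list = []
dedup_insert(clean, [{"id": "a"}, {"id": "b"}])   # run 1: hours 0-2
dedup_insert(clean, [{"id": "b"}, {"id": "c"}])   # run 2: hours 1-3
print([e["id"] for e in clean])  # ['a', 'b', 'c']
```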

# === Stage 3: Hourly Aggregation ===
-- Schedule: every 1 hour (offset 30 min)
-- Name: stage3_aggregate

MERGE `project.analytics.hourly_metrics` AS target
USING (
    SELECT
        TIMESTAMP_TRUNC(event_timestamp, HOUR) AS hour,
        event_name,
        country,
        device_type,
        COUNT(*) AS event_count,
        COUNT(DISTINCT user_id) AS unique_users,
        COUNT(DISTINCT session_id) AS sessions
    FROM `project.clean.events`
    WHERE event_timestamp >= TIMESTAMP_SUB(@run_time, INTERVAL 2 HOUR)
      AND event_timestamp < @run_time
    GROUP BY 1, 2, 3, 4
) AS source
ON target.hour = source.hour
    AND target.event_name = source.event_name
    AND target.country = source.country
    AND target.device_type = source.device_type
WHEN MATCHED THEN
    UPDATE SET
        event_count = source.event_count,
        unique_users = source.unique_users,
        sessions = source.sessions
WHEN NOT MATCHED THEN
    INSERT (hour, event_name, country, device_type, event_count, unique_users, sessions)
    VALUES (source.hour, source.event_name, source.country, source.device_type,
            source.event_count, source.unique_users, source.sessions);
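The MERGE makes Stage 3 idempotent: re-running the same window updates matched rows instead of appending duplicates, so a retry after a partial failure is safe. The same semantics in a toy in-memory model (keys and values are illustrative):

```python
def merge_upsert(target: dict, source: dict) -> dict:
    """WHEN MATCHED -> UPDATE, WHEN NOT MATCHED -> INSERT, keyed the same
    way as the MERGE above: (hour, event_name, country, device_type)."""
    for key, metrics in source.items():
        target[key] = metrics
    return target

target = {("09:00", "click", "TH", "mobile"): {"event_count": 10}}
source = {("09:00", "click", "TH", "mobile"): {"event_count": 12},
          ("09:00", "purchase", "TH", "mobile"): {"event_count": 3}}
merge_upsert(target, source)
merge_upsert(target, source)  # second run: same final state, no duplicates
print(len(target))  # 2
```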

# === Stage 4: Daily Reporting ===
-- Schedule: daily at 07:00 UTC
-- Name: stage4_daily_report

CREATE OR REPLACE TABLE `project.reporting.daily_dashboard` AS
WITH daily AS (
    -- Pre-aggregate per day and country so the ARRAY_AGG below does not
    -- need a nested aggregate, which BigQuery rejects
    SELECT
        DATE(hour) AS report_date,
        country,
        SUM(event_count) AS event_count,
        -- note: summing hourly unique_users over-counts users active in
        -- more than one hour; acceptable here as a dashboard approximation
        SUM(unique_users) AS unique_users,
        SUM(sessions) AS sessions,
        SUM(IF(event_name = 'purchase', event_count, 0)) AS purchases,
        SUM(IF(event_name = 'purchase', unique_users, 0)) AS purchase_users
    FROM `project.analytics.hourly_metrics`
    WHERE DATE(hour) >= DATE_SUB(@run_date, INTERVAL 30 DAY)
    GROUP BY 1, 2
)
SELECT
    report_date,
    SUM(event_count) AS total_events,
    SUM(unique_users) AS total_users,
    SUM(sessions) AS total_sessions,
    SUM(purchases) AS purchases,
    SAFE_DIVIDE(SUM(purchase_users), SUM(unique_users)) * 100 AS conversion_rate_pct,
    ARRAY_AGG(
        STRUCT(country, unique_users AS users)
        ORDER BY unique_users DESC LIMIT 10
    ) AS top_countries
FROM daily
GROUP BY 1
ORDER BY 1 DESC;

Orchestration with Python and Cloud Functions

Managing scheduled queries with Python

#!/usr/bin/env python3
# bq_orchestrator.py — BigQuery Pipeline Orchestration
from google.cloud import bigquery
from google.cloud import bigquery_datatransfer
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("bq_orchestrator")

class BigQueryOrchestrator:
    def __init__(self, project_id):
        self.project_id = project_id
        self.bq_client = bigquery.Client(project=project_id)
        self.transfer_client = bigquery_datatransfer.DataTransferServiceClient()
        self.parent = f"projects/{project_id}/locations/us"
    
    def create_scheduled_query(self, name, query, schedule,
                                dataset="analytics",
                                destination_table=None,
                                write_disposition="WRITE_APPEND"):
        config = bigquery_datatransfer.TransferConfig(
            display_name=name,
            data_source_id="scheduled_query",
            destination_dataset_id=dataset,
            schedule=schedule,
            params={
                "query": query,
                "write_disposition": write_disposition,
            },
        )
        
        if destination_table:
            config.params["destination_table_name_template"] = destination_table
        
        result = self.transfer_client.create_transfer_config(
            parent=self.parent,
            transfer_config=config,
        )
        
        logger.info(f"Created scheduled query: {name} -> {result.name}")
        return result.name
    
    def list_scheduled_queries(self):
        configs = self.transfer_client.list_transfer_configs(parent=self.parent)
        
        queries = []
        for config in configs:
            if config.data_source_id == "scheduled_query":
                queries.append({
                    "name": config.display_name,
                    "config_id": config.name,
                    "schedule": config.schedule,
                    "state": config.state.name,
                    "dataset": config.destination_dataset_id,
                    "update_time": config.update_time.isoformat() if config.update_time else None,
                })
        
        return queries
    
    def get_run_history(self, config_id, limit=10):
        runs = self.transfer_client.list_transfer_runs(parent=config_id)
        
        history = []
        for i, run in enumerate(runs):
            if i >= limit:
                break
            history.append({
                "run_id": run.name,
                "state": run.state.name,
                "start_time": run.start_time.isoformat() if run.start_time else None,
                "end_time": run.end_time.isoformat() if run.end_time else None,
                "error": run.error_status.message if run.error_status else None,
            })
        
        return history
    
    def run_query(self, query, destination_table=None, dry_run=False):
        job_config = bigquery.QueryJobConfig(dry_run=dry_run)
        
        if destination_table:
            job_config.destination = destination_table
            job_config.write_disposition = "WRITE_TRUNCATE"
        
        job = self.bq_client.query(query, job_config=job_config)
        
        if dry_run:
            return {
                "bytes_processed": job.total_bytes_processed,
                "estimated_cost_usd": job.total_bytes_processed / 1e12 * 6.25,
            }
        
        result = job.result()
        
        return {
            "job_id": job.job_id,
            "rows_affected": job.num_dml_affected_rows or result.total_rows,
            "bytes_processed": job.total_bytes_processed,
            "slot_ms": job.slot_millis,
            "duration_s": (job.ended - job.started).total_seconds(),
        }
    
    def check_table_freshness(self, dataset, table):
        table_ref = f"{self.project_id}.{dataset}.{table}"
        tbl = self.bq_client.get_table(table_ref)
        
        modified = tbl.modified
        age_hours = (datetime.utcnow() - modified.replace(tzinfo=None)).total_seconds() / 3600
        
        return {
            "table": table_ref,
            "last_modified": modified.isoformat(),
            "age_hours": round(age_hours, 1),
            "num_rows": tbl.num_rows,
            "size_bytes": tbl.num_bytes,
            "is_stale": age_hours > 25,
        }
    
    def pipeline_health_check(self, pipeline_queries):
        health = {"timestamp": datetime.utcnow().isoformat(), "queries": []}
        
        for q in pipeline_queries:
            runs = self.get_run_history(q["config_id"], limit=5)
            
            recent_failures = sum(1 for r in runs if r["state"] == "FAILED")
            last_success = next(
                (r for r in runs if r["state"] == "SUCCEEDED"), None
            )
            
            health["queries"].append({
                "name": q["name"],
                "recent_failures": recent_failures,
                "last_success": last_success["end_time"] if last_success else None,
                "status": "healthy" if recent_failures == 0 else "degraded",
            })
        
        health["overall"] = (
            "healthy" if all(q["status"] == "healthy" for q in health["queries"])
            else "degraded"
        )
        
        return health

# orchestrator = BigQueryOrchestrator("my-project")
# queries = orchestrator.list_scheduled_queries()
# for q in queries:
#     print(f"{q['name']}: {q['schedule']} ({q['state']})")
# health = orchestrator.pipeline_health_check(queries)

Monitoring and Error Handling

Monitoring the pipeline and handling errors

#!/usr/bin/env python3
# pipeline_monitor.py — BigQuery Pipeline Monitoring
from google.cloud import bigquery
from google.cloud import monitoring_v3
import json
import logging
from datetime import datetime, timedelta

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("monitor")

class PipelineMonitor:
    def __init__(self, project_id):
        self.project_id = project_id
        self.bq_client = bigquery.Client(project=project_id)
    
    def check_job_status(self, hours=24):
        query = f"""
        SELECT
            job_id,
            creation_time,
            end_time,
            state,
            total_bytes_processed,
            total_slot_ms,
            error_result.reason AS error_reason,
            error_result.message AS error_message,
            TIMESTAMP_DIFF(end_time, creation_time, SECOND) AS duration_s
        FROM `{self.project_id}.region-us.INFORMATION_SCHEMA.JOBS`
        WHERE creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {hours} HOUR)
            AND job_type = 'QUERY'
        ORDER BY creation_time DESC
        LIMIT 100
        """
        
        results = self.bq_client.query(query).result()
        
        jobs = []
        for row in results:
            jobs.append({
                "job_id": row.job_id,
                "state": row.state,
                "duration_s": row.duration_s,
                "bytes_processed": row.total_bytes_processed,
                "error": row.error_message,
            })
        
        return jobs
    
    def get_cost_summary(self, days=30):
        query = f"""
        SELECT
            DATE(creation_time) AS query_date,
            COUNT(*) AS query_count,
            SUM(total_bytes_processed) AS total_bytes,
            SUM(total_bytes_processed) / POW(10, 12) * 6.25 AS estimated_cost_usd,
            AVG(TIMESTAMP_DIFF(end_time, creation_time, SECOND)) AS avg_duration_s,
            SUM(total_slot_ms) / 1000 AS total_slot_seconds
        FROM `{self.project_id}.region-us.INFORMATION_SCHEMA.JOBS`
        WHERE creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {days} DAY)
            AND job_type = 'QUERY'
            AND state = 'DONE'
        GROUP BY 1
        ORDER BY 1 DESC
        """
        
        results = self.bq_client.query(query).result()
        
        daily_costs = []
        for row in results:
            daily_costs.append({
                "date": row.query_date.isoformat(),
                "queries": row.query_count,
                "bytes_processed_tb": round(row.total_bytes / 1e12, 3),
                "cost_usd": round(row.estimated_cost_usd, 2),
                "avg_duration_s": round(row.avg_duration_s, 1),
            })
        
        total_cost = sum(d["cost_usd"] for d in daily_costs)
        
        return {
            "period_days": days,
            "total_cost_usd": round(total_cost, 2),
            "avg_daily_cost_usd": round(total_cost / max(len(daily_costs), 1), 2),
            "daily_breakdown": daily_costs[:7],
        }
    
    def find_expensive_queries(self, days=7, limit=10):
        query = f"""
        SELECT
            job_id,
            user_email,
            creation_time,
            total_bytes_processed,
            total_bytes_processed / POW(10, 12) * 6.25 AS cost_usd,
            total_slot_ms / 1000 AS slot_seconds,
            TIMESTAMP_DIFF(end_time, creation_time, SECOND) AS duration_s,
            SUBSTR(query, 1, 200) AS query_preview
        FROM `{self.project_id}.region-us.INFORMATION_SCHEMA.JOBS`
        WHERE creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {days} DAY)
            AND job_type = 'QUERY'
            AND state = 'DONE'
        ORDER BY total_bytes_processed DESC
        LIMIT {limit}
        """
        
        results = self.bq_client.query(query).result()
        
        expensive = []
        for row in results:
            expensive.append({
                "job_id": row.job_id,
                "user": row.user_email,
                "cost_usd": round(row.cost_usd, 4),
                "duration_s": row.duration_s,
                "query_preview": row.query_preview,
            })
        
        return expensive
    
    def generate_alert(self, check_type, details):
        alert = {
            "timestamp": datetime.utcnow().isoformat(),
            "project": self.project_id,
            "type": check_type,
            "details": details,
        }
        
        logger.warning(f"ALERT: {check_type} - {json.dumps(details)}")
        return alert

# monitor = PipelineMonitor("my-project")
# jobs = monitor.check_job_status(hours=24)
# failed = [j for j in jobs if j["error"]]  # failed jobs finish in state DONE with error_result set
# if failed:
#     print(f"Failed jobs: {len(failed)}")
# costs = monitor.get_cost_summary(days=30)
# print(f"Monthly cost: ${costs['total_cost_usd']}")

Cost Optimization and Best Practices

Reducing costs and applying best practices

# === BigQuery Cost Optimization ===

# 1. Partitioning and Clustering
# ===================================
-- Always partition time-series tables
CREATE TABLE `project.analytics.events` (
    event_id STRING,
    user_id STRING,
    event_name STRING,
    event_timestamp TIMESTAMP,
    properties JSON
)
PARTITION BY DATE(event_timestamp)
CLUSTER BY user_id, event_name
OPTIONS (
    partition_expiration_days = 365,
    require_partition_filter = true  -- Force partition filter in queries
);

# 2. Materialized Views (auto-refreshed)
# ===================================
CREATE MATERIALIZED VIEW `project.analytics.hourly_summary`
PARTITION BY DATE(event_hour)
CLUSTER BY event_name
AS
SELECT
    TIMESTAMP_TRUNC(event_timestamp, HOUR) AS event_hour,
    event_name,
    COUNT(*) AS event_count,
    COUNT(DISTINCT user_id) AS unique_users
FROM `project.analytics.events`
GROUP BY 1, 2;

-- BigQuery automatically uses materialized views
-- when it can answer queries from them (query rewrite)

# 3. Cost Control Best Practices
# ===================================

# Set cost controls
# Default table expiration on a dataset (seconds; 7776000 = 90 days):
# bq update --default_table_expiration 7776000 my_dataset
# Cap bytes billed per query (the query fails instead of overspending):
# bq query --maximum_bytes_billed=1000000000000 --nouse_legacy_sql 'SELECT ...'  # 1 TB cap

# Use LIMIT with caution (still scans all data)
# Use SELECT specific columns instead of SELECT *
# Use approximate functions: APPROX_COUNT_DISTINCT, APPROX_QUANTILES
# Use BI Engine for repeated dashboard queries

# 4. Scheduled Query Optimization
# ===================================
# - Use incremental processing (WHERE date = @run_date)
# - Avoid full table scans in scheduled queries
# - Use MERGE for upsert instead of DELETE + INSERT
# - Set appropriate schedule (don't run more often than needed)
# - Use dry_run to estimate costs before scheduling

# 5. Slot Reservations for Predictable Cost
# ===================================
# On-demand: $6.25/TB scanned (unpredictable)
# Flat-rate (legacy, replaced by Editions): $2,000/month for 100 slots
# Editions: autoscaling slots with commitment discounts

# Break-even: ~320TB/month
# If scanning > 320TB/month, flat-rate is cheaper
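The break-even arithmetic above, spelled out (using the prices as quoted in this section):

```python
# Figures quoted in the section above
ON_DEMAND_USD_PER_TB = 6.25
FLAT_RATE_USD_PER_MONTH = 2000.0   # 100 slots

# Monthly TB scanned at which on-demand spend equals the flat rate
break_even_tb = FLAT_RATE_USD_PER_MONTH / ON_DEMAND_USD_PER_TB
print(break_even_tb)  # 320.0 TB/month; above this, flat capacity is cheaper
```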

# 6. Data Lifecycle Management
# ===================================
# Set partition expiration for auto-cleanup
ALTER TABLE `project.analytics.events`
SET OPTIONS (partition_expiration_days = 180);

# Archive old data to Cloud Storage
EXPORT DATA OPTIONS (
    uri = 'gs://my-bucket/archive/events_*.parquet',
    format = 'PARQUET',
    overwrite = true
) AS
SELECT * FROM `project.analytics.events`
WHERE DATE(event_timestamp) < DATE_SUB(CURRENT_DATE(), INTERVAL 180 DAY);

# Delete archived data
DELETE FROM `project.analytics.events`
WHERE DATE(event_timestamp) < DATE_SUB(CURRENT_DATE(), INTERVAL 180 DAY);


FAQ: Frequently Asked Questions

Q: Do Scheduled Queries have a timeout?

A: BigQuery queries have a default timeout of 6 hours, and scheduled queries are no exception; a query that runs longer than 6 hours fails. The remedies are to split the query into smaller stages, use incremental processing instead of full table scans, use partitioning and clustering to reduce the data scanned, and optimize the query based on its execution plan.
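One common way to stay under the limit is to process one day per run instead of the full history. A sketch of the chunking (the helper name is illustrative):

```python
from datetime import date, timedelta

def daily_chunks(start: date, end: date):
    """Split [start, end) into one-day windows, each small enough for a
    single scheduled run to finish well under the 6-hour limit."""
    d = start
    while d < end:
        yield d, d + timedelta(days=1)
        d += timedelta(days=1)

chunks = list(daily_chunks(date(2025, 6, 1), date(2025, 6, 4)))
print(len(chunks))  # 3 one-day windows
```

Each chunk then maps to one scheduled (or backfill) run with its own @run_date.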

Q: What happens when a Scheduled Query run fails?

A: BigQuery retries automatically according to the configured retry policy, sends an email notification to the scheduled query's owner, and records the error in the run history, which you can inspect via the Console or the API. If it fails repeatedly, BigQuery may disable the scheduled query. You can also set up Cloud Monitoring alerts on transfer failures to notify you via Slack or PagerDuty.

Q: Can Scheduled Queries replace Airflow?

A: Yes, for simple SQL-based pipelines that do not need complex dependencies. Scheduled Queries suit SQL transformations that are independent or strictly sequential. They are not a fit when you need complex DAGs, conditional logic, non-SQL tasks (such as API calls or file processing), or cross-service orchestration. For complex pipelines, use Airflow, Prefect, or Cloud Composer and invoke BigQuery as a task.

Q: How are Scheduled Queries billed?

A: There is no extra charge for the scheduling itself; you pay for the data scanned at the usual BigQuery rates ($6.25/TB on-demand). A query that scans 100 GB per run, once a day, therefore costs 3 TB/month = $18.75/month. To reduce cost, use partitioning (scan only the partitions you need), clustering, materialized views, and incremental processing.
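The cost example above, as plain arithmetic:

```python
# 100 GB scanned per run, one run per day, on-demand pricing
gb_per_run = 100
runs_per_month = 30
price_usd_per_tb = 6.25

tb_per_month = gb_per_run * runs_per_month / 1000   # GB -> TB
monthly_cost_usd = tb_per_month * price_usd_per_tb
print(tb_per_month, monthly_cost_usd)  # 3.0 18.75
```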

📖 Related Articles

- BigQuery Scheduled Query Zero Downtime Deployment
- BigQuery Scheduled Query Technical Debt Management
- HTTP/3 QUIC Distributed System
- BigQuery Scheduled Query Certification Path
- BigQuery Scheduled Query Learning Path Roadmap