What Are BigQuery Scheduled Queries?
BigQuery Scheduled Queries is a feature that runs SQL queries automatically on a defined schedule, such as hourly, daily, or weekly. It is used for ETL processes, data aggregation, reporting tables, and data warehouse maintenance without provisioning any additional infrastructure.
Advantages of BigQuery Scheduled Queries include: serverless operation with no infrastructure to manage, built-in retry logic for failed queries, parameterized queries via @run_time and @run_date, email notifications when a query fails, integration with the BigQuery Data Transfer Service, and IAM-based access control.
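The @run_time and @run_date parameters resolve to the scheduled execution time, and the common incremental pattern subtracts one interval so each run processes the previous, complete period. A minimal Python sketch of that date arithmetic, useful for testing the incremental window locally (render_for_run_date and the @target_date placeholder are illustrative helpers, not BigQuery features):

```python
from datetime import date, timedelta

def render_for_run_date(sql_template: str, run_date: date) -> str:
    """Substitute the previous day, mirroring DATE_SUB(@run_date, INTERVAL 1 DAY)."""
    target = run_date - timedelta(days=1)
    return sql_template.replace("@target_date", f"'{target.isoformat()}'")

# Hypothetical template mirroring the incremental pattern used in this article
template = "SELECT COUNT(*) FROM events WHERE DATE(event_timestamp) = @target_date"
print(render_for_run_date(template, date(2024, 6, 2)))
# The rendered filter targets 2024-06-01, the day before the run
```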
A distributed system in this context means designing the data pipeline so the workload is spread across multiple scheduled queries that work in concert, instead of running a single massive query that takes a long time and risks timing out. This makes the pipeline more reliable, maintainable, and cost-efficient.
Use cases include daily/hourly data aggregation, materialized view refreshes, data quality checks, cross-dataset data synchronization, reporting table preparation, and data retention/cleanup.
Setting Up Scheduled Queries in BigQuery
How to create and manage scheduled queries
# === BigQuery Scheduled Queries Setup ===
# 1. Using gcloud CLI
# ===================================
# Install the gcloud CLI
# curl https://sdk.cloud.google.com | bash
# gcloud init
# Create a scheduled query
bq mk --transfer_config \
--project_id=my-project \
--data_source=scheduled_query \
--target_dataset=analytics \
--display_name="Daily User Aggregation" \
--schedule="every 24 hours" \
--params='{
"query": "INSERT INTO `my-project.analytics.daily_users` SELECT DATE(event_timestamp) AS event_date, COUNT(DISTINCT user_id) AS unique_users, COUNT(*) AS total_events, COUNTIF(event_name = \"purchase\") AS purchases FROM `my-project.raw.events` WHERE DATE(event_timestamp) = DATE_SUB(@run_date, INTERVAL 1 DAY) GROUP BY 1",
"destination_table_name_template": "",
"write_disposition": "WRITE_APPEND"
}'
# List scheduled queries
bq ls --transfer_config --transfer_location=us
# Check run history
bq ls --transfer_run --transfer_location=us \
--run_attempt=LATEST \
projects/my-project/locations/us/transferConfigs/TRANSFER_ID
# 2. Using SQL directly in BigQuery Console
# ===================================
-- Daily Sales Aggregation
-- Schedule: Every day at 06:00 UTC
-- Destination: analytics.daily_sales
SELECT
  DATE(order_timestamp) AS order_date,
  product_category,
  COUNT(*) AS order_count,
  SUM(amount) AS total_revenue,
  AVG(amount) AS avg_order_value,
  COUNT(DISTINCT customer_id) AS unique_customers
FROM `my-project.raw.orders`
WHERE DATE(order_timestamp) = DATE_SUB(@run_date, INTERVAL 1 DAY)
GROUP BY 1, 2;
-- Hourly Metrics Rollup
-- Schedule: Every hour
SELECT
  -- Bucket by the hour being summarized (the hour before the run)
  TIMESTAMP_TRUNC(TIMESTAMP_SUB(@run_time, INTERVAL 1 HOUR), HOUR) AS metric_hour,
  service_name,
  COUNT(*) AS request_count,
  AVG(latency_ms) AS avg_latency,
  APPROX_QUANTILES(latency_ms, 100)[OFFSET(95)] AS p95_latency,
  COUNTIF(status_code >= 500) AS error_count,
  SAFE_DIVIDE(COUNTIF(status_code >= 500), COUNT(*)) * 100 AS error_rate_pct
FROM `my-project.logs.requests`
WHERE timestamp >= TIMESTAMP_SUB(@run_time, INTERVAL 1 HOUR)
  AND timestamp < @run_time
GROUP BY 1, 2;
-- Weekly Data Quality Check
-- Schedule: Every Monday at 00:00 UTC
SELECT
  'orders' AS table_name,
  COUNT(*) AS total_rows,
  COUNTIF(customer_id IS NULL) AS null_customer_ids,
  COUNTIF(amount <= 0) AS invalid_amounts,
  COUNT(DISTINCT DATE(order_timestamp)) AS distinct_dates,
  MIN(order_timestamp) AS earliest_record,
  MAX(order_timestamp) AS latest_record,
  CURRENT_TIMESTAMP() AS check_timestamp
FROM `my-project.raw.orders`
WHERE DATE(order_timestamp) >= DATE_SUB(@run_date, INTERVAL 7 DAY);
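The quality-check query above only produces numbers; something still has to act on them. A minimal Python sketch of evaluating the returned row against thresholds, assuming the result is fetched with the google-cloud-bigquery client (evaluate_quality and the 1% null threshold are illustrative, not part of BigQuery):

```python
def evaluate_quality(row: dict, max_null_pct: float = 1.0) -> list:
    """Turn one row of the weekly quality-check query into a list of issues."""
    issues = []
    total = row["total_rows"] or 0
    if total == 0:
        return ["table is empty"]
    null_pct = 100.0 * row["null_customer_ids"] / total
    if null_pct > max_null_pct:
        issues.append(f"null customer_id rate {null_pct:.2f}% exceeds {max_null_pct}%")
    if row["invalid_amounts"] > 0:
        issues.append(f"{row['invalid_amounts']} rows with amount <= 0")
    return issues

# Requires credentials; QUALITY_CHECK_SQL is the query shown above:
# from google.cloud import bigquery
# row = dict(next(iter(bigquery.Client().query(QUALITY_CHECK_SQL).result())))
# for issue in evaluate_quality(row):
#     print("QUALITY ALERT:", issue)
```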
Building a Distributed Data Pipeline
Designing a distributed pipeline
# === Distributed Pipeline Architecture ===
# Pipeline Stages (each is a separate scheduled query):
#
# Stage 1: Raw Data Ingestion (every hour)
# └── Extract from source tables, light transformation
#
# Stage 2: Data Cleansing (every hour, 15 min after Stage 1)
# └── Remove duplicates, fix data types, handle nulls
#
# Stage 3: Aggregation (every hour, 30 min after Stage 1)
# └── Pre-aggregate for common query patterns
#
# Stage 4: Reporting Tables (daily at 07:00)
# └── Build final reporting tables from aggregations
#
# Stage 5: Data Quality (daily at 08:00)
# └── Validate data quality, alert on issues
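The staged plan above can also be captured as data, so a deployment script can create or audit all five transfer configs in one loop. A sketch with illustrative stage names and offsets mirroring the diagram (the schedule strings follow the Data Transfer Service format used elsewhere in this article, e.g. "every 24 hours"):

```python
# Five pipeline stages as a config a deployment script could iterate over.
PIPELINE_STAGES = [
    {"name": "stage1_extract_events", "schedule": "every 1 hours", "offset_min": 0},
    {"name": "stage2_cleanse",        "schedule": "every 1 hours", "offset_min": 15},
    {"name": "stage3_aggregate",      "schedule": "every 1 hours", "offset_min": 30},
    {"name": "stage4_daily_report",   "schedule": "every day 07:00", "offset_min": None},
    {"name": "stage5_data_quality",   "schedule": "every day 08:00", "offset_min": None},
]

def hourly_stage_order(stages):
    """Return the hourly stages in the order they fire within each hour."""
    hourly = [s for s in stages if s["offset_min"] is not None]
    return [s["name"] for s in sorted(hourly, key=lambda s: s["offset_min"])]
```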
# === Stage 1: Raw Data Extraction ===
-- Schedule: every 1 hour
-- Name: stage1_extract_events
CREATE OR REPLACE TABLE `project.staging.events_hourly`
PARTITION BY DATE(event_timestamp)
CLUSTER BY user_id, event_name
AS
SELECT
  event_id,
  user_id,
  event_name,
  event_timestamp,
  PARSE_JSON(event_params) AS params,
  device_type,
  country,
  session_id,
  CURRENT_TIMESTAMP() AS processed_at
FROM `project.raw.events_stream`
WHERE event_timestamp >= TIMESTAMP_SUB(@run_time, INTERVAL 2 HOUR)
  AND event_timestamp < @run_time;
# === Stage 2: Data Cleansing ===
-- Schedule: every 1 hour (offset 15 min)
-- Name: stage2_cleanse
INSERT INTO `project.clean.events`
SELECT DISTINCT
  event_id,
  user_id,
  LOWER(TRIM(event_name)) AS event_name,
  event_timestamp,
  params,
  COALESCE(device_type, 'unknown') AS device_type,
  COALESCE(country, 'unknown') AS country,
  session_id,
  processed_at
FROM `project.staging.events_hourly`
WHERE event_id NOT IN (
  -- NOT IN matches nothing if the subquery returns any NULL, so filter NULLs out
  SELECT event_id FROM `project.clean.events`
  WHERE event_id IS NOT NULL
    AND DATE(event_timestamp) >= DATE_SUB(CURRENT_DATE(), INTERVAL 2 DAY)
);
# === Stage 3: Hourly Aggregation ===
-- Schedule: every 1 hour (offset 30 min)
-- Name: stage3_aggregate
MERGE `project.analytics.hourly_metrics` AS target
USING (
  SELECT
    TIMESTAMP_TRUNC(event_timestamp, HOUR) AS hour,
    event_name,
    country,
    device_type,
    COUNT(*) AS event_count,
    COUNT(DISTINCT user_id) AS unique_users,
    COUNT(DISTINCT session_id) AS sessions
  FROM `project.clean.events`
  WHERE event_timestamp >= TIMESTAMP_SUB(@run_time, INTERVAL 2 HOUR)
    AND event_timestamp < @run_time
  GROUP BY 1, 2, 3, 4
) AS source
ON target.hour = source.hour
  AND target.event_name = source.event_name
  AND target.country = source.country
  AND target.device_type = source.device_type
WHEN MATCHED THEN
  UPDATE SET
    event_count = source.event_count,
    unique_users = source.unique_users,
    sessions = source.sessions
WHEN NOT MATCHED THEN
  INSERT (hour, event_name, country, device_type, event_count, unique_users, sessions)
  VALUES (source.hour, source.event_name, source.country, source.device_type,
          source.event_count, source.unique_users, source.sessions);
# === Stage 4: Daily Reporting ===
-- Schedule: daily at 07:00 UTC
-- Name: stage4_daily_report
CREATE OR REPLACE TABLE `project.reporting.daily_dashboard` AS
WITH daily AS (
  SELECT
    DATE(hour) AS report_date,
    SUM(event_count) AS total_events,
    SUM(unique_users) AS total_users,  -- sums hourly distincts, so an approximation
    SUM(sessions) AS total_sessions,
    SUM(IF(event_name = 'purchase', event_count, 0)) AS purchases,
    SAFE_DIVIDE(
      SUM(IF(event_name = 'purchase', unique_users, 0)),
      SUM(unique_users)
    ) * 100 AS conversion_rate_pct
  FROM `project.analytics.hourly_metrics`
  WHERE DATE(hour) >= DATE_SUB(@run_date, INTERVAL 30 DAY)
  GROUP BY 1
),
top_countries AS (
  -- Aggregates cannot be nested inside ARRAY_AGG, so pre-aggregate per country first
  SELECT
    report_date,
    ARRAY_AGG(STRUCT(country, users) ORDER BY users DESC LIMIT 10) AS top_countries
  FROM (
    SELECT DATE(hour) AS report_date, country, SUM(unique_users) AS users
    FROM `project.analytics.hourly_metrics`
    WHERE DATE(hour) >= DATE_SUB(@run_date, INTERVAL 30 DAY)
    GROUP BY 1, 2
  )
  GROUP BY report_date
)
SELECT d.*, c.top_countries
FROM daily AS d
LEFT JOIN top_countries AS c USING (report_date)
ORDER BY report_date DESC;
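When a stage misses a window or its logic changes, scheduled queries can be re-run for past dates via a backfill. A sketch of computing the time range and triggering it through the Data Transfer API; the config ID is a placeholder, and the API call is shown commented out because it needs credentials. The pure range helper is testable offline:

```python
from datetime import datetime, timezone

def backfill_range(start_day: str, end_day: str):
    """Return (start, end) UTC datetimes covering whole days, end exclusive."""
    start = datetime.fromisoformat(start_day).replace(tzinfo=timezone.utc)
    end = datetime.fromisoformat(end_day).replace(tzinfo=timezone.utc)
    if end <= start:
        raise ValueError("end_day must be after start_day")
    return start, end

# from google.cloud import bigquery_datatransfer
# client = bigquery_datatransfer.DataTransferServiceClient()
# start, end = backfill_range("2024-06-01", "2024-06-08")
# client.start_manual_transfer_runs(request={
#     "parent": "projects/my-project/locations/us/transferConfigs/CONFIG_ID",
#     "requested_time_range": {"start_time": start, "end_time": end},
# })
```

This creates one manual run for every scheduled time that falls inside the range, so each daily stage re-processes its own day.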
Orchestration with Python and Cloud Functions
Managing scheduled queries with Python
#!/usr/bin/env python3
# bq_orchestrator.py — BigQuery Pipeline Orchestration
from google.cloud import bigquery
from google.cloud import bigquery_datatransfer
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("bq_orchestrator")
class BigQueryOrchestrator:
    def __init__(self, project_id):
        self.project_id = project_id
        self.bq_client = bigquery.Client(project=project_id)
        self.transfer_client = bigquery_datatransfer.DataTransferServiceClient()
        self.parent = f"projects/{project_id}/locations/us"

    def create_scheduled_query(self, name, query, schedule,
                               dataset="analytics",
                               destination_table=None,
                               write_disposition="WRITE_APPEND"):
        config = bigquery_datatransfer.TransferConfig(
            display_name=name,
            data_source_id="scheduled_query",
            destination_dataset_id=dataset,
            schedule=schedule,
            params={
                "query": query,
                "write_disposition": write_disposition,
            },
        )
        if destination_table:
            config.params["destination_table_name_template"] = destination_table
        result = self.transfer_client.create_transfer_config(
            parent=self.parent,
            transfer_config=config,
        )
        logger.info(f"Created scheduled query: {name} -> {result.name}")
        return result.name
    def list_scheduled_queries(self):
        configs = self.transfer_client.list_transfer_configs(parent=self.parent)
        queries = []
        for config in configs:
            if config.data_source_id == "scheduled_query":
                queries.append({
                    "name": config.display_name,
                    "config_id": config.name,
                    "schedule": config.schedule,
                    "state": config.state.name,
                    "dataset": config.destination_dataset_id,
                    "update_time": config.update_time.isoformat() if config.update_time else None,
                })
        return queries

    def get_run_history(self, config_id, limit=10):
        runs = self.transfer_client.list_transfer_runs(parent=config_id)
        history = []
        for i, run in enumerate(runs):
            if i >= limit:
                break
            history.append({
                "run_id": run.name,
                "state": run.state.name,
                "start_time": run.start_time.isoformat() if run.start_time else None,
                "end_time": run.end_time.isoformat() if run.end_time else None,
                "error": run.error_status.message if run.error_status else None,
            })
        return history
    def run_query(self, query, destination_table=None, dry_run=False):
        job_config = bigquery.QueryJobConfig(dry_run=dry_run)
        if destination_table:
            job_config.destination = destination_table
            job_config.write_disposition = "WRITE_TRUNCATE"
        job = self.bq_client.query(query, job_config=job_config)
        if dry_run:
            return {
                "bytes_processed": job.total_bytes_processed,
                # On-demand pricing: $6.25 per TB scanned
                "estimated_cost_usd": job.total_bytes_processed / 1e12 * 6.25,
            }
        result = job.result()
        return {
            "job_id": job.job_id,
            "rows_affected": job.num_dml_affected_rows or result.total_rows,
            "bytes_processed": job.total_bytes_processed,
            "slot_ms": job.slot_millis,
            "duration_s": (job.ended - job.started).total_seconds(),
        }
    def check_table_freshness(self, dataset, table):
        table_ref = f"{self.project_id}.{dataset}.{table}"
        tbl = self.bq_client.get_table(table_ref)
        modified = tbl.modified
        age_hours = (datetime.utcnow() - modified.replace(tzinfo=None)).total_seconds() / 3600
        return {
            "table": table_ref,
            "last_modified": modified.isoformat(),
            "age_hours": round(age_hours, 1),
            "num_rows": tbl.num_rows,
            "size_bytes": tbl.num_bytes,
            "is_stale": age_hours > 25,  # allow a daily table one late run
        }
    def pipeline_health_check(self, pipeline_queries):
        health = {"timestamp": datetime.utcnow().isoformat(), "queries": []}
        for q in pipeline_queries:
            runs = self.get_run_history(q["config_id"], limit=5)
            recent_failures = sum(1 for r in runs if r["state"] == "FAILED")
            last_success = next(
                (r for r in runs if r["state"] == "SUCCEEDED"), None
            )
            health["queries"].append({
                "name": q["name"],
                "recent_failures": recent_failures,
                "last_success": last_success["end_time"] if last_success else None,
                "status": "healthy" if recent_failures == 0 else "degraded",
            })
        health["overall"] = (
            "healthy" if all(q["status"] == "healthy" for q in health["queries"])
            else "degraded"
        )
        return health
# orchestrator = BigQueryOrchestrator("my-project")
# queries = orchestrator.list_scheduled_queries()
# for q in queries:
#     print(f"{q['name']}: {q['schedule']} ({q['state']})")
# health = orchestrator.pipeline_health_check(queries)
Monitoring and Error Handling
Monitoring the pipeline and handling errors
#!/usr/bin/env python3
# pipeline_monitor.py — BigQuery Pipeline Monitoring
from google.cloud import bigquery
from google.cloud import monitoring_v3
import json
import logging
from datetime import datetime, timedelta
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("monitor")
class PipelineMonitor:
    def __init__(self, project_id):
        self.project_id = project_id
        self.bq_client = bigquery.Client(project=project_id)

    def check_job_status(self, hours=24):
        query = f"""
        SELECT
          job_id,
          creation_time,
          end_time,
          state,
          total_bytes_processed,
          total_slot_ms,
          error_result.reason AS error_reason,
          error_result.message AS error_message,
          TIMESTAMP_DIFF(end_time, creation_time, SECOND) AS duration_s
        FROM `{self.project_id}.region-us.INFORMATION_SCHEMA.JOBS`
        WHERE creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {hours} HOUR)
          AND job_type = 'QUERY'
        ORDER BY creation_time DESC
        LIMIT 100
        """
        results = self.bq_client.query(query).result()
        jobs = []
        for row in results:
            jobs.append({
                "job_id": row.job_id,
                "state": row.state,
                "duration_s": row.duration_s,
                "bytes_processed": row.total_bytes_processed,
                "error": row.error_message,
            })
        return jobs
    def get_cost_summary(self, days=30):
        query = f"""
        SELECT
          DATE(creation_time) AS query_date,
          COUNT(*) AS query_count,
          SUM(total_bytes_processed) AS total_bytes,
          SUM(total_bytes_processed) / POW(10, 12) * 6.25 AS estimated_cost_usd,
          AVG(TIMESTAMP_DIFF(end_time, creation_time, SECOND)) AS avg_duration_s,
          SUM(total_slot_ms) / 1000 AS total_slot_seconds
        FROM `{self.project_id}.region-us.INFORMATION_SCHEMA.JOBS`
        WHERE creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {days} DAY)
          AND job_type = 'QUERY'
          AND state = 'DONE'
        GROUP BY 1
        ORDER BY 1 DESC
        """
        results = self.bq_client.query(query).result()
        daily_costs = []
        for row in results:
            daily_costs.append({
                "date": row.query_date.isoformat(),
                "queries": row.query_count,
                "bytes_processed_tb": round(row.total_bytes / 1e12, 3),
                "cost_usd": round(row.estimated_cost_usd, 2),
                "avg_duration_s": round(row.avg_duration_s, 1),
            })
        total_cost = sum(d["cost_usd"] for d in daily_costs)
        return {
            "period_days": days,
            "total_cost_usd": round(total_cost, 2),
            "avg_daily_cost_usd": round(total_cost / max(len(daily_costs), 1), 2),
            "daily_breakdown": daily_costs[:7],
        }
    def find_expensive_queries(self, days=7, limit=10):
        query = f"""
        SELECT
          job_id,
          user_email,
          creation_time,
          total_bytes_processed,
          total_bytes_processed / POW(10, 12) * 6.25 AS cost_usd,
          total_slot_ms / 1000 AS slot_seconds,
          TIMESTAMP_DIFF(end_time, creation_time, SECOND) AS duration_s,
          SUBSTR(query, 1, 200) AS query_preview
        FROM `{self.project_id}.region-us.INFORMATION_SCHEMA.JOBS`
        WHERE creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {days} DAY)
          AND job_type = 'QUERY'
          AND state = 'DONE'
        ORDER BY total_bytes_processed DESC
        LIMIT {limit}
        """
        results = self.bq_client.query(query).result()
        expensive = []
        for row in results:
            expensive.append({
                "job_id": row.job_id,
                "user": row.user_email,
                "cost_usd": round(row.cost_usd, 4),
                "duration_s": row.duration_s,
                "query_preview": row.query_preview,
            })
        return expensive
    def generate_alert(self, check_type, details):
        alert = {
            "timestamp": datetime.utcnow().isoformat(),
            "project": self.project_id,
            "type": check_type,
            "details": details,
        }
        logger.warning(f"ALERT: {check_type} - {json.dumps(details)}")
        return alert
# monitor = PipelineMonitor("my-project")
# jobs = monitor.check_job_status(hours=24)
# failed = [j for j in jobs if j["state"] != "DONE"]
# if failed:
#     print(f"Failed jobs: {len(failed)}")
# costs = monitor.get_cost_summary(days=30)
# print(f"Monthly cost: ${costs['total_cost_usd']}")
Cost Optimization and Best Practices
Reducing costs and best practices
# === BigQuery Cost Optimization ===
# 1. Partitioning and Clustering
# ===================================
-- Always partition time-series tables
CREATE TABLE `project.analytics.events` (
  event_id STRING,
  user_id STRING,
  event_name STRING,
  event_timestamp TIMESTAMP,
  properties JSON
)
PARTITION BY DATE(event_timestamp)
CLUSTER BY user_id, event_name
OPTIONS (
  partition_expiration_days = 365,
  require_partition_filter = true  -- Force queries to filter on the partition column
);
# 2. Materialized Views (auto-refreshed)
# ===================================
CREATE MATERIALIZED VIEW `project.analytics.hourly_summary`
PARTITION BY DATE(event_hour)
CLUSTER BY event_name
AS
SELECT
  TIMESTAMP_TRUNC(event_timestamp, HOUR) AS event_hour,
  event_name,
  COUNT(*) AS event_count,
  COUNT(DISTINCT user_id) AS unique_users
FROM `project.analytics.events`
GROUP BY 1, 2;
-- BigQuery automatically uses materialized views
-- when it can answer queries from them (query rewrite)
# 3. Cost Control Best Practices
# ===================================
# Set cost controls
# bq update --default_table_expiration 7776000 my_dataset     # 90-day default table TTL
# bq query --maximum_bytes_billed=1000000000000 'SELECT ...'  # fail queries scanning over 1TB
# Use LIMIT with caution (still scans all data)
# Use SELECT specific columns instead of SELECT *
# Use approximate functions: APPROX_COUNT_DISTINCT, APPROX_QUANTILES
# Use BI Engine for repeated dashboard queries
# 4. Scheduled Query Optimization
# ===================================
# - Use incremental processing (WHERE date = @run_date)
# - Avoid full table scans in scheduled queries
# - Use MERGE for upsert instead of DELETE + INSERT
# - Set appropriate schedule (don't run more often than needed)
# - Use dry_run to estimate costs before scheduling
# 5. Slot Reservations for Predictable Cost
# ===================================
# On-demand: $6.25/TB scanned (cost varies with usage)
# Flat-rate (legacy): $2,000/month for 100 slots (predictable)
# Editions: autoscaling slots with commitment discounts
# Break-even: ~320TB/month (320 TB x $6.25/TB = $2,000)
# If scanning > 320TB/month, capacity-based pricing is cheaper
# 6. Data Lifecycle Management
# ===================================
# Set partition expiration for auto-cleanup
ALTER TABLE `project.analytics.events`
SET OPTIONS (partition_expiration_days = 180);
# Archive old data to Cloud Storage
EXPORT DATA OPTIONS (
  uri = 'gs://my-bucket/archive/events_*.parquet',
  format = 'PARQUET',
  overwrite = true
) AS
SELECT * FROM `project.analytics.events`
WHERE DATE(event_timestamp) < DATE_SUB(CURRENT_DATE(), INTERVAL 180 DAY);
# Delete the archived rows (only after verifying the export succeeded)
DELETE FROM `project.analytics.events`
WHERE DATE(event_timestamp) < DATE_SUB(CURRENT_DATE(), INTERVAL 180 DAY);
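Before scheduling a query, its on-demand cost can be estimated from a dry run's total_bytes_processed using the $6.25/TB rate quoted above, and the same rate gives the flat-rate break-even point. A small sketch; the dry-run call is commented out since it requires credentials:

```python
def on_demand_cost_usd(bytes_processed: int, usd_per_tb: float = 6.25) -> float:
    """Estimated on-demand cost for a given number of bytes scanned."""
    return bytes_processed / 1e12 * usd_per_tb

def breakeven_tb_per_month(flat_monthly_usd: float = 2000.0,
                           usd_per_tb: float = 6.25) -> float:
    """TB/month at which flat-rate capacity matches on-demand spend."""
    return flat_monthly_usd / usd_per_tb

# from google.cloud import bigquery
# client = bigquery.Client(project="my-project")
# job = client.query(sql, job_config=bigquery.QueryJobConfig(dry_run=True))
# print(f"~${on_demand_cost_usd(job.total_bytes_processed):.2f} per run")
```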
FAQ: Frequently Asked Questions
Q: Do scheduled queries have a timeout?
A: BigQuery queries have a default timeout of six hours, and this applies to scheduled queries as well; a query running longer than six hours fails. The fixes are to split the query into smaller stages, use incremental processing instead of full table scans, use partitioning and clustering to reduce the data scanned, and tune the query with its execution plan.
Q: What happens when a scheduled query run fails?
A: BigQuery retries automatically according to its retry policy, sends an email notification to the owner of the scheduled query, and records the error in the run history, which you can inspect in the Console or via the API. After repeated failures, BigQuery may disable the scheduled query. You can also set up Cloud Monitoring alerts on transfer failures to notify through Slack or PagerDuty.
Q: Can scheduled queries replace Airflow?
A: Yes, for simple SQL-based pipelines without complex dependencies. Scheduled queries suit SQL transformations that are independent or strictly sequential. They are not a fit when you need complex DAGs, conditional logic, non-SQL tasks (such as API calls or file processing), or cross-service orchestration; for complex pipelines, use Airflow, Prefect, or Cloud Composer and invoke BigQuery as a task.
Q: How much do scheduled queries cost?
A: There is no extra charge for the scheduling itself; you pay for data scanned under normal BigQuery pricing ($6.25/TB on-demand). A query that scans 100GB per run, once a day, scans about 3TB/month, or roughly $18.75/month. To reduce costs, use partitioning (scan only the partitions you need), clustering, materialized views, and incremental processing.
