BigQuery Scheduled Query ?????????????????????
BigQuery Scheduled Query ?????????????????????????????????????????? Google BigQuery ?????????????????????????????? SQL queries ????????????????????????????????????????????? (cron schedule) ???????????????????????????????????? ???????????????????????????????????? code ???????????????????????? external scheduler ????????????????????????????????? ETL jobs, report generation, data aggregation ????????? data cleanup
Stream Processing ?????? BigQuery ????????????????????????????????????????????????????????????????????????????????????????????????????????????????????? near-real-time ???????????? BigQuery Streaming API ???????????? Pub/Sub + Dataflow ????????????????????????????????? Scheduled Queries ?????????????????????????????? data pipeline ????????????????????? batch ????????? stream processing ??????????????? platform ???????????????
???????????????????????? BigQuery Scheduled Queries Serverless ??????????????????????????????????????? infrastructure, Built-in ???????????????????????????????????? BigQuery Console, Parameterized ????????? parameters ???????????? @run_time @run_date, Error notification ?????????????????????????????????????????? query ?????????????????????, Backfill ????????? query ?????????????????????????????????
????????????????????? Scheduled Queries
??????????????? scheduled queries ???????????? CLI ????????? Terraform
# === BigQuery Scheduled Query Setup ===

# 1. Create via bq CLI.
#    Adding --schedule registers the query with the BigQuery Data Transfer
#    Service; @run_date is resolved by the scheduler at each run.
bq query \
--use_legacy_sql=false \
--destination_table='project:dataset.daily_summary' \
--schedule='every 24 hours' \
--display_name='Daily Sales Summary' \
--replace=true \
'
SELECT
DATE(created_at) AS date,
COUNT(*) AS total_orders,
SUM(amount) AS total_revenue,
AVG(amount) AS avg_order_value,
COUNT(DISTINCT user_id) AS unique_customers
FROM `project.dataset.orders`
WHERE DATE(created_at) = DATE_SUB(@run_date, INTERVAL 1 DAY)
GROUP BY date
'

# 2. Create via Terraform.
#    The heredoc delimiter is quoted ('EOF') so the shell leaves ${...}
#    untouched for Terraform to interpolate.
cat > bigquery_scheduled.tf << 'EOF'
# FIX: the queries below referenced `.raw.orders` / `.raw.events` with an
# empty project part (invalid table references); qualify them via
# var.project_id instead.
variable "project_id" {
  type    = string
  default = "my-project"
}

resource "google_bigquery_data_transfer_config" "daily_summary" {
  display_name           = "Daily Sales Summary"
  location               = "asia-southeast1"
  data_source_id         = "scheduled_query"
  schedule               = "every day 02:00"
  destination_dataset_id = google_bigquery_dataset.analytics.dataset_id
  params = {
    destination_table_name_template = "daily_summary"
    write_disposition               = "WRITE_TRUNCATE"
    query                           = <<-SQL
      SELECT
        DATE(created_at) AS date,
        product_category,
        COUNT(*) AS orders,
        SUM(amount) AS revenue,
        AVG(amount) AS avg_value
      FROM `${var.project_id}.raw.orders`
      WHERE DATE(created_at) = DATE_SUB(@run_date, INTERVAL 1 DAY)
      GROUP BY date, product_category
    SQL
  }
  email_preferences {
    enable_failure_email = true
  }
}

resource "google_bigquery_data_transfer_config" "hourly_metrics" {
  display_name           = "Hourly Active Users"
  location               = "asia-southeast1"
  data_source_id         = "scheduled_query"
  schedule               = "every 1 hours"
  destination_dataset_id = google_bigquery_dataset.analytics.dataset_id
  params = {
    # {run_date} is expanded by the transfer service, so the hourly runs
    # append into one table per day.
    destination_table_name_template = "hourly_active_users_{run_date}"
    write_disposition               = "WRITE_APPEND"
    query                           = <<-SQL
      SELECT
        TIMESTAMP_TRUNC(@run_time, HOUR) AS hour,
        COUNT(DISTINCT user_id) AS active_users,
        COUNT(*) AS total_events
      FROM `${var.project_id}.raw.events`
      WHERE timestamp >= TIMESTAMP_SUB(@run_time, INTERVAL 1 HOUR)
        AND timestamp < @run_time
    SQL
  }
}

# Destination dataset (90-day default table expiration).
resource "google_bigquery_dataset" "analytics" {
  dataset_id                  = "analytics"
  location                    = "asia-southeast1"
  default_table_expiration_ms = 7776000000 # 90 days
  labels = {
    environment = "production"
    team        = "data-engineering"
  }
}
EOF
terraform apply
echo "Scheduled queries configured"
Stream Processing ???????????? BigQuery
???????????????????????????????????????????????????????????? streaming
#!/usr/bin/env python3
# stream_processing.py ??? BigQuery Stream Processing
import json
import logging
from typing import Dict, List
# Root logging config for this script: INFO and above to stderr.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("stream")  # module logger (unused in this chunk)
class BigQueryStreamProcessor:
    """Reference material for streaming data into BigQuery.

    The methods return sample code (as strings) and architecture notes;
    nothing here calls GCP directly.

    NOTE(review): the original description strings were mojibake-corrupted
    ('?' runs where Thai text was lost in encoding); they have been
    rewritten in English with the same intent.
    """

    def __init__(self):
        # Placeholder for future configuration; not read by the examples.
        self.config = {}

    def streaming_insert_example(self):
        """Return a sample snippet (as a string) showing a streaming insert
        into BigQuery via ``Client.insert_rows_json``.

        The snippet is meant to be printed/embedded, not executed here.
        """
        return """
from google.cloud import bigquery
import json
import time
client = bigquery.Client()
table_id = "project.dataset.events"
# Streaming insert
rows = [
{
"user_id": "user_123",
"event_type": "page_view",
"page": "/products",
"timestamp": "2024-06-15T10:30:00Z",
"metadata": json.dumps({"browser": "Chrome", "device": "mobile"}),
},
{
"user_id": "user_456",
"event_type": "purchase",
"page": "/checkout",
"timestamp": "2024-06-15T10:30:05Z",
"metadata": json.dumps({"amount": 1500, "currency": "THB"}),
},
]
errors = client.insert_rows_json(table_id, rows)
if errors:
print(f"Errors: {errors}")
else:
print(f"Inserted {len(rows)} rows")
"""

    def pubsub_to_bigquery(self):
        """Describe a Pub/Sub -> Dataflow -> BigQuery streaming pipeline.

        Returns a dict with architecture notes, a runnable gcloud command
        for the Google-provided Dataflow template, and rough latency/cost
        figures.
        """
        return {
            "architecture": "Pub/Sub -> Dataflow (Apache Beam) -> BigQuery",
            "components": {
                "pubsub": "Message queue that ingests events in real time",
                "dataflow": "Stream processing (transform, enrich, filter)",
                "bigquery": "Data warehouse for storage + analytics",
            },
            "dataflow_template": "gs://dataflow-templates/latest/PubSub_to_BigQuery",
            "gcloud_command": """
gcloud dataflow jobs run pubsub-to-bq \\
--gcs-location gs://dataflow-templates/latest/PubSub_to_BigQuery \\
--region asia-southeast1 \\
--staging-location gs://my-bucket/staging \\
--parameters \\
inputTopic=projects/my-project/topics/events,\\
outputTableSpec=my-project:dataset.events
""",
            "latency": "1-5 seconds end-to-end",
            "cost": "Dataflow ~$0.056/vCPU-hour + BigQuery streaming $0.01/200MB",
        }

    def materialized_views(self):
        """Use materialized views for near-real-time aggregations.

        Returns a dict with a description, the CREATE MATERIALIZED VIEW
        SQL (auto-refresh every 5 minutes), and the main benefits.
        """
        return {
            "description": "Precomputed aggregations that BigQuery refreshes automatically as the base table changes",
            "sql": """
CREATE MATERIALIZED VIEW `project.analytics.realtime_metrics`
OPTIONS (
enable_refresh = true,
refresh_interval_minutes = 5
) AS
SELECT
TIMESTAMP_TRUNC(timestamp, MINUTE) AS minute,
COUNT(*) AS event_count,
COUNT(DISTINCT user_id) AS unique_users,
COUNTIF(event_type = 'purchase') AS purchases,
SUM(IF(event_type = 'purchase', CAST(JSON_VALUE(metadata, '$.amount') AS FLOAT64), 0)) AS revenue
FROM `project.raw.events`
WHERE timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)
GROUP BY minute
""",
            "benefits": ["Auto-refresh", "Query cost reduction (up to 100%)", "Near-real-time"],
        }
# Demo: print a short summary of the streaming pipeline options.
proc = BigQueryStreamProcessor()
pipeline_info = proc.pubsub_to_bigquery()
for key in ("architecture", "latency", "cost"):
    print(f"{key.capitalize()}: {pipeline_info[key]}")

mv_info = proc.materialized_views()
print(f"\nMaterialized Views: {mv_info['description']}")
for item in mv_info["benefits"]:
    print(f" - {item}")
??????????????? Data Pipeline ???????????????????????????
End-to-end data pipeline
# === Automated Data Pipeline ===
# 1. Multi-stage Scheduled Queries Pipeline

# Stage 1: Raw -> Cleaned (every hour)
cat > stage1_clean.sql << 'EOF'
-- Stage 1: Clean raw data
CREATE OR REPLACE TABLE `project.staging.orders_cleaned` AS
SELECT
id,
LOWER(TRIM(email)) AS email,
SAFE_CAST(amount AS FLOAT64) AS amount,
IFNULL(currency, 'THB') AS currency,
PARSE_TIMESTAMP('%Y-%m-%dT%H:%M:%SZ', created_at) AS created_at,
CURRENT_TIMESTAMP() AS processed_at
FROM `project.raw.orders`
WHERE amount IS NOT NULL
AND amount > 0
AND created_at >= TIMESTAMP_SUB(@run_time, INTERVAL 2 HOUR);
EOF

# Stage 2: Cleaned -> Aggregated (every day)
cat > stage2_aggregate.sql << 'EOF'
-- Stage 2: Daily aggregation
INSERT INTO `project.analytics.daily_metrics`
SELECT
DATE(created_at) AS date,
COUNT(*) AS total_orders,
SUM(amount) AS total_revenue,
AVG(amount) AS avg_order_value,
APPROX_COUNT_DISTINCT(email) AS unique_customers,
COUNTIF(amount > 1000) AS high_value_orders,
-- FIX: PERCENTILE_CONT(...) OVER() is an analytic function and cannot sit
-- in a GROUP BY select list (`amount` is neither grouped nor aggregated).
-- APPROX_QUANTILES is a true aggregate; element 50 of 100 is the median.
APPROX_QUANTILES(amount, 100)[OFFSET(50)] AS median_order_value,
CURRENT_TIMESTAMP() AS aggregated_at
FROM `project.staging.orders_cleaned`
WHERE DATE(created_at) = DATE_SUB(@run_date, INTERVAL 1 DAY)
GROUP BY date;
EOF

# Stage 3: Aggregated -> Dashboard Tables (every day after stage 2)
cat > stage3_dashboard.sql << 'EOF'
-- Stage 3: Dashboard-ready tables
CREATE OR REPLACE TABLE `project.dashboard.kpi_summary` AS
WITH daily AS (
SELECT * FROM `project.analytics.daily_metrics`
WHERE date >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
),
weekly AS (
SELECT
DATE_TRUNC(date, WEEK) AS week,
SUM(total_orders) AS weekly_orders,
SUM(total_revenue) AS weekly_revenue
FROM daily
GROUP BY week
)
SELECT
d.date,
d.total_orders,
d.total_revenue,
d.avg_order_value,
d.unique_customers,
w.weekly_orders,
w.weekly_revenue,
SAFE_DIVIDE(d.total_revenue - LAG(d.total_revenue) OVER(ORDER BY d.date),
LAG(d.total_revenue) OVER(ORDER BY d.date)) * 100 AS revenue_growth_pct
FROM daily d
LEFT JOIN weekly w ON DATE_TRUNC(d.date, WEEK) = w.week
ORDER BY d.date DESC;
EOF

# Create all scheduled queries.
# NOTE(review): this loop is a placeholder — it only echoes. Wire in
# `bq query --schedule=...` (or Terraform) per stage to actually register
# the scheduled queries.
for stage in stage1_clean stage2_aggregate stage3_dashboard; do
echo "Creating scheduled query: $stage"
done
echo "Pipeline configured"
Optimization ????????? Cost Management
?????????????????????????????????????????????????????????????????????????????????????????????
#!/usr/bin/env python3
# bq_optimization.py ??? BigQuery Cost Optimization
import json
import logging
from typing import Dict, List
# Root logging config for this script: INFO and above to stderr.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("optimize")  # module logger (unused in this chunk)
class BigQueryOptimizer:
    """Static reference data for BigQuery cost optimization.

    Both methods return plain dicts of tips and pricing figures; nothing
    here talks to GCP.

    NOTE(review): the original tip strings were mojibake-corrupted ('?'
    runs where Thai text was lost in encoding); they have been rewritten
    in English with the same intent.
    """

    def __init__(self):
        # Stateless helper; nothing to initialize.
        pass

    def cost_optimization(self):
        """Return cost-optimization techniques keyed by name.

        Each entry has a ``description`` and, where applicable, an
        ``impact`` estimate and an example ``sql`` statement.
        """
        return {
            "partitioning": {
                "description": "Partition tables by date/timestamp",
                "impact": "Cuts scanned data by 80-99% for time-based queries",
                "sql": "CREATE TABLE t (...) PARTITION BY DATE(created_at)",
            },
            "clustering": {
                "description": "Cluster by frequently filtered columns",
                "impact": "Cuts scanned data by 50-80%",
                "sql": "CREATE TABLE t (...) PARTITION BY DATE(created_at) CLUSTER BY user_id, product_id",
            },
            "materialized_views": {
                "description": "Pre-computed aggregations",
                "impact": "Query cost reduced up to 100% (serves cached results)",
            },
            "scheduled_query_tips": {
                "use_run_date": "Filter on @run_date so each run avoids scanning the whole table",
                "write_disposition": "Prefer WRITE_TRUNCATE over append for safe re-runs",
                "destination_table": "Write to a fixed destination table rather than a temp table",
                "dry_run": "Validate with --dry_run before scheduling",
            },
            "slot_reservations": {
                "description": "Use flat-rate pricing for predictable workloads",
                "when": "Worth considering once on-demand spend > $10,000/month",
                "pricing": "Flex slots $0.04/slot/hour (100 slot minimum)",
            },
        }

    def cost_estimate(self):
        """Return on-demand price points plus a worked monthly example."""
        return {
            "on_demand_pricing": {
                "query": "$6.25 per TB scanned",
                "streaming_insert": "$0.01 per 200 MB",
                "storage": "$0.02 per GB/month (active), $0.01 (long-term)",
            },
            "example_monthly": {
                # 10 TB scanned at $6.25/TB, 50 GB streamed, 500 GB stored.
                "queries_tb_scanned": 10,
                "query_cost": 62.50,
                "streaming_gb": 50,
                "streaming_cost": 2.50,
                "storage_gb": 500,
                "storage_cost": 10.00,
                "scheduled_queries": "Free (pay for query cost only)",
                "total": 75.00,
            },
        }
# Demo: print the optimization tips and an example monthly cost estimate.
optimizer = BigQueryOptimizer()
tips = optimizer.cost_optimization()
print("Cost Optimization Tips:")
for name, info in tips.items():
    if isinstance(info, dict) and "description" in info:
        print(f" {name}: {info['description']}")
        if "impact" in info:
            print(f" Impact: {info['impact']}")
estimate = optimizer.cost_estimate()
monthly = estimate["example_monthly"]
# FIX: the original f-strings contained no placeholders (the values were
# lost in extraction), so these lines printed empty labels. Pull the
# figures from cost_estimate() instead.
print(f"\nExample Monthly Cost: ${monthly['total']:.2f}")
print(f" Queries: ${monthly['query_cost']:.2f} ({monthly['queries_tb_scanned']} TB scanned)")
print(f" Streaming: ${monthly['streaming_cost']:.2f} ({monthly['streaming_gb']} GB)")
print(f" Storage: ${monthly['storage_cost']:.2f} ({monthly['storage_gb']} GB)")
Monitoring ????????? Alerting
?????????????????? pipeline health
# === BigQuery Pipeline Monitoring ===

# 1. Query to check scheduled query status.
# NOTE(review): verify the view name `region-...`.INFORMATION_SCHEMA.TRANSFER_RUNS
# against current BigQuery docs — transfer-run history is commonly read via
# `bq ls --transfer_run ...` or the Data Transfer API instead.
cat > check_status.sql << 'EOF'
-- Check scheduled query run history
SELECT
transfer_config_id,
display_name,
state,
start_time,
end_time,
TIMESTAMP_DIFF(end_time, start_time, SECOND) AS duration_sec,
error_status.message AS error_message
FROM `region-asia-southeast1`.INFORMATION_SCHEMA.TRANSFER_RUNS
WHERE start_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)
ORDER BY start_time DESC
LIMIT 50;
EOF

# 2. Cloud Monitoring Alert.
# Fires when any transfer run reports state=FAILED within a 5-minute
# alignment window; the incident auto-closes after 7 days (604800s).
cat > alert_policy.json << 'EOF'
{
"displayName": "BigQuery Scheduled Query Failure",
"conditions": [{
"displayName": "Transfer run failed",
"conditionThreshold": {
"filter": "resource.type=\"bigquery_dts_config\" AND metric.type=\"bigquerydatatransfer.googleapis.com/transfer_run_count\" AND metric.labels.state=\"FAILED\"",
"comparison": "COMPARISON_GT",
"thresholdValue": 0,
"duration": "0s",
"aggregations": [{
"alignmentPeriod": "300s",
"perSeriesAligner": "ALIGN_SUM"
}]
}
}],
"notificationChannels": ["projects/my-project/notificationChannels/123"],
"alertStrategy": {
"autoClose": "604800s"
}
}
EOF

# 3. Cost Alert.
# Budget of 100 USD on one service (presumably BigQuery's service id —
# verify 24E6-581D-38E5), alerting at 50%/80%/100% of the budget.
cat > cost_alert.json << 'EOF'
{
"displayName": "BigQuery Daily Cost Alert",
"budgetFilter": {
"projects": ["projects/my-project"],
"services": ["services/24E6-581D-38E5"]
},
"amount": {
"specifiedAmount": {
"currencyCode": "USD",
"units": "100"
}
},
"thresholdRules": [
{"thresholdPercent": 0.5},
{"thresholdPercent": 0.8},
{"thresholdPercent": 1.0}
]
}
EOF

# 4. Data Freshness Check.
# Classifies each pipeline table by minutes since its last update:
# STALE (>120), WARNING (>60), otherwise FRESH.
cat > freshness_check.sql << 'EOF'
SELECT
table_name,
MAX(processed_at) AS last_update,
TIMESTAMP_DIFF(CURRENT_TIMESTAMP(), MAX(processed_at), MINUTE) AS minutes_since_update,
CASE
WHEN TIMESTAMP_DIFF(CURRENT_TIMESTAMP(), MAX(processed_at), MINUTE) > 120 THEN 'STALE'
WHEN TIMESTAMP_DIFF(CURRENT_TIMESTAMP(), MAX(processed_at), MINUTE) > 60 THEN 'WARNING'
ELSE 'FRESH'
END AS freshness_status
FROM (
SELECT 'orders_cleaned' AS table_name, MAX(processed_at) AS processed_at FROM `project.staging.orders_cleaned`
UNION ALL
SELECT 'daily_metrics', MAX(aggregated_at) FROM `project.analytics.daily_metrics`
)
GROUP BY table_name;
EOF
echo "Monitoring configured"
FAQ ??????????????????????????????????????????
Q: Scheduled Query ????????? Airflow/Cloud Composer ???????????????????????????????????????????
A: BigQuery Scheduled Query ????????????????????????????????? SQL-only pipelines ???????????? ????????????????????? setup infrastructure ????????? (????????????????????? query cost) ?????????????????? parameterized queries (@run_date, @run_time) ???????????????????????????????????? dependencies ????????????????????? queries (????????????????????? time-based ordering) Airflow/Cloud Composer ????????????????????????????????? complex pipelines ??????????????? dependencies, branching, multiple data sources, non-SQL tasks (Python, API calls) ?????????????????? DAG dependencies ??????????????? ?????? UI ?????????????????? monitoring ????????????????????? setup ?????????????????????????????? Composer ($300+/month) ??????????????? ????????? Scheduled Query ?????????????????? simple SQL pipelines, ????????? Composer ??????????????? pipeline ?????????????????????????????????
Q: Streaming Insert ????????? Batch Load ?????????????????????????????????????
A: ????????????????????? use case Streaming Insert ??????????????? data available ?????? seconds, ??????????????? real-time analytics ????????????????????? $0.01/200MB (????????????????????? batch), ???????????? handle errors, 1 TB/table/day limit Batch Load ??????????????? ????????? (??????????????? ingestion cost), load ??????????????????????????????????????? large files, AVRO/Parquet ?????????????????? CSV ????????????????????? ?????????????????? load ??????????????? (seconds-minutes), 1,500 load jobs/table/day ??????????????? ????????? Streaming ?????????????????? real-time requirements (dashboards, alerts), ????????? Batch ?????????????????? ETL pipelines ????????? latency minutes-hours ??????????????????????????? ????????????????????????????????????????????????????????????
Q: BigQuery Scheduled Query ????????????????????????????????? ???????????????????????????????
A: ??????????????????????????????????????????????????????????????????????????? Query timeout ????????????????????????????????? scan ???????????? partition filter, ????????? LIMIT ????????????????????? testing, Quota exceeded ????????? slot reservations ????????? on-demand, ?????? concurrent scheduled queries, Schema change source table schema ????????????????????? ????????? SAFE_CAST ????????????????????? type errors, Permission error ????????????????????? service account ?????? bigquery.dataEditor role, Data not ready upstream data ???????????????????????? ???????????? schedule ???????????? upstream pipeline ??????????????? ???????????? email notification ?????????????????? failures ????????? scheduled query ???????????? monitor ???????????? Cloud Monitoring ????????????
Q: ?????????????????????????????? BigQuery ?????????????????????????????????????
A: ????????????????????????????????????????????????????????????????????????????????? Partition ????????? table ????????? date/timestamp ?????? scan 80-99%, Cluster by columns ?????????????????? filter ????????????, ????????? SELECT ??????????????? columns ?????????????????????????????? ?????????????????? SELECT *, ????????? Materialized Views ?????????????????? aggregations ????????? query ????????????, ???????????? table expiration ?????????????????? temporary/staging tables, ????????? long-term storage pricing (auto ???????????? 90 ????????????????????????????????? ?????? 50%), ????????? --dry_run ????????????????????? query ???????????? ????????????????????? bytes ??????????????? scan, ???????????? cost controls (custom quota per user/project) ?????????????????? workloads > $10,000/month ????????????????????? flat-rate pricing (slot reservations)
