BigQuery Scheduled Query: Automated Data Pipelines Without Application Code
BigQuery Scheduled Query is a built-in Google BigQuery feature that runs SQL queries automatically on a schedule you define. It is well suited to ETL, data aggregation, and reporting workloads — with no application code to write and no infrastructure to manage.
The Low-Code/No-Code approach to data pipelines replaces heavier tooling at several layers: BigQuery SQL + Scheduled Queries instead of Airflow/Dagster; Google Sheets + Connected Sheets instead of custom dashboards; Looker Studio for visualization without coding; and Cloud Functions + Pub/Sub for event-driven triggers.
Advantages of this approach: no servers to manage (serverless); no Python/Java required (SQL only); cost-effective pay-per-usage pricing; business users can understand and maintain the pipelines themselves; built-in monitoring and error handling.
Getting Started with Scheduled Queries
Setting up BigQuery Scheduled Queries
# === BigQuery Scheduled Query Setup ===
# 1. Create scheduled query via bq CLI.
#    The filter uses the scheduled-query parameter @run_date (the date of the
#    scheduled run) instead of CURRENT_DATE(): for a normal daily run they are
#    equivalent, but @run_date also makes manual backfill runs ("Run transfer
#    now" with a historical date range) aggregate the correct past day.
bq query --use_legacy_sql=false \
  --schedule="every 24 hours" \
  --display_name="daily_sales_aggregation" \
  --destination_table="analytics.daily_sales" \
  --replace=true \
  '
  SELECT
    DATE(order_date) AS sale_date,
    product_category,
    COUNT(*) AS order_count,
    SUM(amount) AS total_revenue,
    AVG(amount) AS avg_order_value,
    COUNT(DISTINCT customer_id) AS unique_customers
  FROM `project.raw.orders`
  WHERE DATE(order_date) = DATE_SUB(@run_date, INTERVAL 1 DAY)
  GROUP BY sale_date, product_category
  '
# 2. Terraform configuration
#    The heredoc delimiter is quoted ('EOF'), so ${var.project_id} reaches
#    Terraform untouched by the shell; Terraform then interpolates it inside
#    the SQL heredocs. This fixes the previously broken table references
#    (`.raw.orders` had no project id) and puts the declared project_id
#    variable to use.
cat > bigquery_scheduled.tf << 'EOF'
variable "project_id" {
  default = "my-project"
}

# Daily sales rollup, rewritten atomically (WRITE_TRUNCATE) at 02:00.
# @run_date is supplied by the scheduled-query service, so backfill runs
# recompute the correct historical day.
resource "google_bigquery_data_transfer_config" "daily_sales" {
  display_name           = "Daily Sales Aggregation"
  location               = "asia-southeast1"
  data_source_id         = "scheduled_query"
  schedule               = "every day 02:00"
  destination_dataset_id = "analytics"

  params = {
    destination_table_name_template = "daily_sales"
    write_disposition               = "WRITE_TRUNCATE"
    query                           = <<-SQL
      SELECT
        DATE(order_date) AS sale_date,
        product_category,
        COUNT(*) AS order_count,
        SUM(amount) AS total_revenue,
        AVG(amount) AS avg_order_value,
        COUNT(DISTINCT customer_id) AS unique_customers,
        CURRENT_TIMESTAMP() AS processed_at
      FROM `${var.project_id}.raw.orders`
      WHERE DATE(order_date) = @run_date
      GROUP BY sale_date, product_category
    SQL
  }

  email_preferences {
    enable_failure_email = true
  }
}

# Hourly web-analytics rollup appended into a per-run-date sharded table.
# The wildcard table + _TABLE_SUFFIX filter limits the scan to one day's
# export shard.
resource "google_bigquery_data_transfer_config" "hourly_metrics" {
  display_name           = "Hourly Website Metrics"
  location               = "asia-southeast1"
  data_source_id         = "scheduled_query"
  schedule               = "every 1 hours"
  destination_dataset_id = "analytics"

  params = {
    destination_table_name_template = "hourly_metrics_{run_date}"
    write_disposition               = "WRITE_APPEND"
    query                           = <<-SQL
      SELECT
        TIMESTAMP_TRUNC(event_timestamp, HOUR) AS hour,
        event_name,
        COUNT(*) AS event_count,
        COUNT(DISTINCT user_pseudo_id) AS unique_users,
        COUNTIF(event_name = 'purchase') AS purchases
      FROM `${var.project_id}.analytics_raw.events_*`
      WHERE _TABLE_SUFFIX = FORMAT_DATE('%Y%m%d', @run_date)
      GROUP BY hour, event_name
    SQL
  }
}
EOF
# 3. Multiple dependent queries (chain)
# Generates a helper script documenting orchestration options, since the
# BigQuery scheduled-query service has no built-in inter-query dependencies.
cat > chain_queries.sh << 'BASH'
#!/bin/bash
# Chain of scheduled queries (dependency management)
# Query 1: Raw -> Staging (every hour)
# Query 2: Staging -> Aggregated (every hour, after Query 1)
# Query 3: Aggregated -> Dashboard (every day)
echo "BigQuery does not natively support query dependencies."
echo "Alternatives:"
echo " 1. Use time-based offsets (Query 2 runs 15min after Query 1)"
echo " 2. Use Cloud Composer (Airflow) for complex DAGs"
echo " 3. Use Cloud Workflows for serverless orchestration"
echo " 4. Use Dataform for SQL-based dependency management"
BASH
echo "Scheduled queries configured"
Low-Code Data Pipeline with BigQuery
Build a complete data pipeline using SQL only
# === Low-Code Data Pipeline ===
# 1. Medallion Architecture with SQL only
# Writes a three-layer (bronze/silver/gold) pipeline plus data-quality checks
# to medallion_pipeline.sql; each statement is intended to run as its own
# scheduled query at the cadence noted in the comment above it.
cat > medallion_pipeline.sql << 'SQL'
-- === Bronze Layer: Raw Data ===
-- Scheduled: every 1 hour
-- Snapshots the last ~2 hours from the external table; the 1-hour overlap
-- presumably guards against late-arriving rows between runs.
CREATE OR REPLACE TABLE `analytics.bronze_orders` AS
SELECT
*,
CURRENT_TIMESTAMP() AS _ingested_at,
_FILE_NAME AS _source_file
FROM `raw.orders_external`
WHERE _PARTITIONTIME >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 2 HOUR);
-- === Silver Layer: Cleaned Data ===
-- Scheduled: every 1 hour (15 min after bronze)
-- Types are normalized (INT64 ids, trimmed/lowercased strings, parsed
-- timestamps) and obviously bad rows are dropped.
-- NOTE(review): CREATE OR REPLACE rebuilds silver from bronze on every run,
-- and bronze only ever holds ~2 hours of data -- so silver never accumulates
-- history, yet the gold query below reads "yesterday" from silver. Consider
-- an incremental MERGE / partition-scoped INSERT for silver; confirm the
-- intended retention semantics.
CREATE OR REPLACE TABLE `analytics.silver_orders`
PARTITION BY DATE(order_date)
CLUSTER BY customer_id AS
SELECT
CAST(order_id AS INT64) AS order_id,
TRIM(customer_id) AS customer_id,
LOWER(TRIM(product_category)) AS product_category,
CAST(amount AS FLOAT64) AS amount,
PARSE_TIMESTAMP('%Y-%m-%d %H:%M:%S', order_date) AS order_date,
CURRENT_TIMESTAMP() AS _processed_at
FROM `analytics.bronze_orders`
WHERE order_id IS NOT NULL
AND amount > 0
AND amount < 1000000; -- Remove outliers
-- === Gold Layer: Business Metrics ===
-- Scheduled: every day at 03:00
-- One row per report_date with daily KPIs; median via APPROX_QUANTILES
-- (approximate, cheaper than exact percentile).
CREATE OR REPLACE TABLE `analytics.gold_daily_kpis` AS
SELECT
DATE(order_date) AS report_date,
COUNT(DISTINCT customer_id) AS daily_active_customers,
COUNT(*) AS total_orders,
SUM(amount) AS total_revenue,
AVG(amount) AS avg_order_value,
APPROX_QUANTILES(amount, 100)[OFFSET(50)] AS median_order_value,
SUM(CASE WHEN product_category = 'electronics' THEN amount ELSE 0 END) AS electronics_revenue,
SUM(CASE WHEN product_category = 'clothing' THEN amount ELSE 0 END) AS clothing_revenue
FROM `analytics.silver_orders`
WHERE DATE(order_date) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
GROUP BY report_date;
-- === Data Quality Checks ===
-- Scheduled: after gold layer
-- PASS/FAIL rows for yesterday's load: minimum row count and no null
-- customer ids.
CREATE OR REPLACE TABLE `analytics.dq_checks` AS
SELECT
CURRENT_DATE() AS check_date,
'orders_count' AS check_name,
CASE WHEN cnt > 100 THEN 'PASS' ELSE 'FAIL' END AS status,
cnt AS value
FROM (SELECT COUNT(*) AS cnt FROM `analytics.silver_orders` WHERE DATE(order_date) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY))
UNION ALL
SELECT
CURRENT_DATE(),
'null_customer_id',
CASE WHEN cnt = 0 THEN 'PASS' ELSE 'FAIL' END,
cnt
FROM (SELECT COUNT(*) AS cnt FROM `analytics.silver_orders` WHERE customer_id IS NULL AND DATE(order_date) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY));
SQL
echo "Medallion pipeline SQL ready"
No-Code Integration with Business Tools
Connect BigQuery to business tools without writing code
#!/usr/bin/env python3
# nocode_integrations.py - No-Code Integration Options
import json
import logging
from typing import Dict, List

# Module-level logger; INFO level so usage can be traced if needed.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("nocode")
class NoCodeIntegrations:
    """Catalog of no-code / low-code integration options for BigQuery.

    Pure data holder: ``integration_catalog()`` returns a static dict keyed
    by integration name, describing each tool's type, setup path, features,
    limitations, and cost.
    """

    def __init__(self) -> None:
        # Stateless; the catalog is constructed on demand.
        pass

    def integration_catalog(self) -> Dict[str, Dict[str, object]]:
        """Return the integration catalog.

        Returns:
            Mapping of integration name -> metadata dict. Every entry carries
            the keys 'type', 'description', 'setup', 'features',
            'limitations', and 'cost' (made consistent across all entries so
            consumers can rely on the schema).
        """
        return {
            "google_sheets": {
                "type": "Visualization / Reporting",
                "description": "Analyze BigQuery data directly in Google Sheets via Connected Sheets",
                "setup": "Sheets -> Data -> Data connectors -> BigQuery",
                "features": [
                    "Query billions of rows from Sheets",
                    "Auto-refresh schedules",
                    "Pivot tables on BigQuery data",
                    "Charts and formatting",
                ],
                "limitations": "10,000 rows extract limit, read-only",
                "cost": "Free (BigQuery query costs apply)",
            },
            "looker_studio": {
                "type": "Dashboard / BI",
                "description": "Free dashboard tool from Google",
                "setup": "Create report -> Add BigQuery data source",
                "features": [
                    "Drag-and-drop charts",
                    "Real-time BigQuery queries",
                    "Scheduled email reports",
                    "Embedding and sharing",
                    "Custom calculated fields",
                ],
                "limitations": "Complex visualizations limited",
                "cost": "Free",
            },
            "dataform": {
                "type": "SQL-based Data Pipeline",
                "description": "SQL workflow tool (built into BigQuery)",
                "setup": "BigQuery Studio -> Dataform -> Create repository",
                "features": [
                    "SQL-based transformations with dependencies",
                    "Version control (Git)",
                    "Data quality assertions",
                    "Scheduling and orchestration",
                    "Incremental processing",
                ],
                "limitations": "SQL only (no Python)",
                "cost": "Free (BigQuery query costs apply)",
            },
            "appsheet": {
                "type": "No-Code App Builder",
                "description": "Build mobile/web apps on top of BigQuery data",
                "setup": "AppSheet -> New app -> Connect BigQuery",
                "features": [
                    "Auto-generated CRUD app",
                    "Barcode scanning",
                    "Offline support",
                    "Workflow automation",
                ],
                # Added for schema consistency with the other entries.
                "limitations": "Advanced features require paid plans",
                "cost": "Free tier available, $5/user/month (Core)",
            },
            "cloud_workflows": {
                "type": "Serverless Orchestration",
                "description": "YAML-based workflow orchestration",
                "setup": "Cloud Console -> Workflows -> Create",
                "features": [
                    "Chain BigQuery jobs",
                    "Conditional logic",
                    "Error handling and retry",
                    "HTTP API calls",
                    "Pub/Sub integration",
                ],
                # Added for schema consistency with the other entries.
                "limitations": "Orchestration only (no data transformations)",
                "cost": "5,000 steps/month free",
            },
        }
# Render the catalog as an indented text summary on stdout.
catalog = NoCodeIntegrations().integration_catalog()
print("No-Code BigQuery Integrations:")
for tool_name, meta in catalog.items():
    summary_lines = [
        f"\n {tool_name} ({meta['type']}):",
        f" {meta['description']}",
        f" Cost: {meta['cost']}",
        f" Setup: {meta['setup']}",
    ]
    for line in summary_lines:
        print(line)
Cost Optimization and Best Practices
Keep query costs under control with these best practices
# === Cost Optimization ===
# Writes a reference guide (cost_optimization.yaml) covering query tuning,
# scheduled-query tips, and pricing estimates, then prints a short summary.
cat > cost_optimization.yaml << 'EOF'
bigquery_cost_optimization:
  query_optimization:
    partition_tables:
      description: "Partition by date so queries scan only the partitions they need"
      example: "PARTITION BY DATE(created_at)"
      savings: "80-95% reduction in scanned data"
    cluster_tables:
      description: "Cluster by frequently filtered columns"
      example: "CLUSTER BY customer_id, product_category"
      savings: "50-80% reduction"
    select_specific_columns:
      description: "Select only the columns you need; avoid SELECT *"
      bad: "SELECT * FROM orders"
      good: "SELECT order_id, amount, customer_id FROM orders"
      savings: "Proportional to columns excluded"
    use_approximate_functions:
      description: "Use APPROX_COUNT_DISTINCT instead of COUNT(DISTINCT)"
      savings: "2-5x faster, less data processed"
    materialize_cte:
      description: "Use temp tables instead of repeated CTEs"
      savings: "Avoid re-scanning same data multiple times"
  scheduled_query_tips:
    - "Use the @run_date parameter instead of CURRENT_DATE() so backfills work"
    - "Keep destination tables partitioned to reduce scan costs"
    - "Use WRITE_APPEND + partition filter instead of WRITE_TRUNCATE where possible"
    - "Set query priority to BATCH for non-urgent queries (cheaper)"
    - "Monitor query costs via INFORMATION_SCHEMA.JOBS"
  pricing_estimate:
    on_demand:
      query: "$6.25 per TB scanned"
      storage: "$0.02 per GB/month (active)"
      scheduled_query: "Free (only query costs)"
    flat_rate:
      slots: "$2,000/month per 100 slots"
      best_for: "Predictable workloads > $10K/month on-demand"
  monitoring_query: |
    SELECT
      user_email,
      COUNT(*) AS query_count,
      SUM(total_bytes_processed) / POW(1024, 4) AS tb_processed,
      SUM(total_bytes_processed) / POW(1024, 4) * 6.25 AS estimated_cost_usd
    FROM `region-asia-southeast1`.INFORMATION_SCHEMA.JOBS
    WHERE creation_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 30 DAY)
      AND job_type = 'QUERY'
    GROUP BY user_email
    ORDER BY tb_processed DESC
    LIMIT 20
EOF
# Summarize the guide on stdout.
# NOTE(review): this imports `yaml` (PyYAML), a third-party package --
# confirm it is installed wherever this script runs.
python3 -c "
import yaml
with open('cost_optimization.yaml') as f:
    data = yaml.safe_load(f)
opt = data['bigquery_cost_optimization']
print('BigQuery Cost Optimization:')
for name, info in opt['query_optimization'].items():
    print(f' {name}: {info[\"description\"]} (Savings: {info[\"savings\"]})')
print(f'\nPricing:')
print(f' On-demand: {opt[\"pricing_estimate\"][\"on_demand\"][\"query\"]}')
print(f' Flat-rate: {opt[\"pricing_estimate\"][\"flat_rate\"][\"slots\"]}')
"
echo "Cost optimization guide ready"
Monitoring and Alerting
Track the health and cost of your scheduled queries
#!/usr/bin/env python3
# bq_monitor.py - BigQuery Scheduled Query Monitoring
import json
import logging
from typing import Dict, List

# Module-level logger; INFO level for operational visibility.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("monitor")
class BQScheduledMonitor:
    """Static dashboard snapshot for BigQuery scheduled-query monitoring."""

    def __init__(self):
        # No runtime state; dashboard() returns canned sample data.
        pass

    def dashboard(self):
        """Return the dashboard payload.

        Contains per-query run status, a 30-day cost summary, active alerts,
        and cost/reliability recommendations.
        """
        query_runs = [
            {"name": "daily_sales_aggregation", "schedule": "daily 02:00", "last_run": "success", "duration": "45s", "bytes_processed": "2.1 GB", "cost": "$0.013"},
            {"name": "hourly_website_metrics", "schedule": "hourly", "last_run": "success", "duration": "12s", "bytes_processed": "500 MB", "cost": "$0.003"},
            {"name": "weekly_cohort_analysis", "schedule": "weekly Mon 04:00", "last_run": "success", "duration": "3m 20s", "bytes_processed": "15 GB", "cost": "$0.094"},
            {"name": "daily_dq_checks", "schedule": "daily 03:30", "last_run": "failed", "duration": "5s", "bytes_processed": "0", "cost": "$0.000"},
        ]
        cost_summary = {
            "total_queries": 1250,
            "total_tb_processed": 0.85,
            "total_cost": "$5.31",
            "avg_cost_per_query": "$0.004",
            "most_expensive": "weekly_cohort_analysis ($0.094/run)",
        }
        active_alerts = [
            {"severity": "ERROR", "message": "daily_dq_checks failed: Table not found", "time": "03:30"},
            {"severity": "WARNING", "message": "hourly_website_metrics took 2x longer than usual", "time": "14:00"},
        ]
        tuning_tips = [
            "Partition daily_sales source table by order_date (save 80% scan)",
            "Add clustering to hourly_metrics on event_name",
            "Fix daily_dq_checks: update table reference after schema change",
            "Consider flat-rate pricing if monthly cost exceeds $500",
        ]
        return {
            "scheduled_queries": query_runs,
            "cost_summary_30d": cost_summary,
            "alerts": active_alerts,
            "recommendations": tuning_tips,
        }
# Print a plain-text dashboard: per-query status, 30-day cost, alerts, tips.
dash = BQScheduledMonitor().dashboard()
print("BigQuery Scheduled Query Dashboard:")
for entry in dash["scheduled_queries"]:
    if entry["last_run"] == "success":
        status = "OK"
    else:
        status = "FAIL"
    print(f" [{status}] {entry['name']}: {entry['schedule']}, {entry['duration']}, {entry['cost']}")
summary = dash["cost_summary_30d"]
print(f"\nCost (30d): {summary['total_cost']} ({summary['total_queries']} queries, {summary['total_tb_processed']} TB)")
print(f"\nAlerts:")
for alert in dash["alerts"]:
    print(f" [{alert['severity']}] {alert['message']}")
print(f"\nRecommendations:")
for tip in dash["recommendations"]:
    print(f" - {tip}")
FAQ — Frequently Asked Questions
Q: When should I use BigQuery Scheduled Query versus Airflow?
A: Use BigQuery Scheduled Query when your transformations are SQL-only with few or no dependencies, you don't want to manage infrastructure (you pay only query costs), and business analysts need to maintain the pipeline themselves. Use Airflow (Cloud Composer) when you have complex DAGs with dependencies between tasks, need to mix SQL + Python + API calls, need sophisticated error handling, or need backfill, retry, and SLA monitoring. Keep in mind that Cloud Composer costs roughly $300–500/month, so start with Scheduled Queries; as the pipeline grows more complex, move to Dataform before reaching for Airflow, or consider Cloud Workflows as a middle ground.
Q: Should I use Dataform or dbt with BigQuery?
A: Dataform (Google) is built into BigQuery Studio, requires no extra setup, is SQL-based with JavaScript for macros, and has Git integration and scheduling built in — a good fit for teams that use only BigQuery. dbt (dbt Labs) supports multiple warehouses (BigQuery, Snowflake, Redshift, Databricks), has a large community and package ecosystem, and dbt Cloud adds an IDE, scheduling, and docs; dbt Cloud starts around $100/month (Developer tier). If you are BigQuery-only, Dataform is the simpler choice; if you need multi-warehouse support or the dbt ecosystem, choose dbt.
Q: Is a low-code approach enough for a real data pipeline?
A: Low-code (BigQuery Scheduled Queries + Dataform + Looker Studio) is enough when the pipeline has roughly 20–30 tables or fewer, transformations are straightforward SQL, the team is 1–5 people, there is no real-time processing requirement, and budget is limited. Move beyond it when you need Python/Spark for complex transformations, real-time streaming (use Dataflow), orchestration of non-SQL tasks (API calls, file processing), or the team exceeds ~10 people and needs stronger governance. Rule of thumb: start low-code and adopt heavier tools only when you actually hit its limitations — don't over-engineer from day one.
Q: Can BigQuery Scheduled Query do backfills?
A: Yes — via a manual trigger: in the Console, open the scheduled query, choose "Run transfer now", and specify the date range to backfill. In your SQL, use the @run_date parameter instead of CURRENT_DATE() — e.g. WHERE DATE(created_at) = @run_date — so each backfill run processes the correct historical date; with CURRENT_DATE(), every backfill run would process today's data instead. From the CLI, use bq mk --transfer_run --run_time to trigger scheduled-query runs for specific dates. For heavy backfill needs, Dataform's incremental models are easier to manage.