CircleCI Orbs Team Productivity | SiamCafe Blog
2025-11-02 · อ. บอม — SiamCafe.net · 11,036 words

What Is BigQuery Scheduled Query?

BigQuery Scheduled Query is a Google BigQuery feature that runs SQL queries automatically on a schedule you define and writes the results to a destination table. It suits ETL, data aggregation, and reporting workloads, with no code to write and no infrastructure to manage.

The Low-Code/No-Code approach replaces custom-built pipeline components with managed tools: BigQuery SQL + Scheduled Queries instead of Airflow/Dagster, Google Sheets + Connected Sheets instead of custom dashboards, Looker Studio instead of hand-coded visualizations, and Cloud Functions + Pub/Sub for event-driven triggers.

Advantages of this approach: no servers to manage (serverless), no Python/Java required (SQL only), cost-effective pay-per-use pricing, business users can build pipelines themselves, and built-in monitoring and error handling.

Setting Up Scheduled Queries

Set up BigQuery Scheduled Queries with the bq CLI and Terraform:

# === BigQuery Scheduled Query Setup ===

# 1. Create scheduled query via bq CLI
bq query --use_legacy_sql=false \
  --schedule="every 24 hours" \
  --display_name="daily_sales_aggregation" \
  --destination_table="analytics.daily_sales" \
  --replace=true \
  '
  SELECT
    DATE(order_date) AS sale_date,
    product_category,
    COUNT(*) AS order_count,
    SUM(amount) AS total_revenue,
    AVG(amount) AS avg_order_value,
    COUNT(DISTINCT customer_id) AS unique_customers
  FROM `project.raw.orders`
  WHERE DATE(order_date) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
  GROUP BY sale_date, product_category
  '

# 2. Terraform configuration
cat > bigquery_scheduled.tf << 'EOF'
resource "google_bigquery_data_transfer_config" "daily_sales" {
  display_name           = "Daily Sales Aggregation"
  location               = "asia-southeast1"
  data_source_id         = "scheduled_query"
  schedule               = "every day 02:00"
  destination_dataset_id = "analytics"

  params = {
    destination_table_name_template = "daily_sales"
    write_disposition               = "WRITE_TRUNCATE"
    query                           = <<-SQL
      SELECT
        DATE(order_date) AS sale_date,
        product_category,
        COUNT(*) AS order_count,
        SUM(amount) AS total_revenue,
        AVG(amount) AS avg_order_value,
        COUNT(DISTINCT customer_id) AS unique_customers,
        CURRENT_TIMESTAMP() AS processed_at
      FROM `${var.project_id}.raw.orders`
      WHERE DATE(order_date) = @run_date
      GROUP BY sale_date, product_category
    SQL
  }

  email_preferences {
    enable_failure_email = true
  }
}

resource "google_bigquery_data_transfer_config" "hourly_metrics" {
  display_name           = "Hourly Website Metrics"
  location               = "asia-southeast1"
  data_source_id         = "scheduled_query"
  schedule               = "every 1 hours"
  destination_dataset_id = "analytics"

  params = {
    destination_table_name_template = "hourly_metrics_{run_date}"
    write_disposition               = "WRITE_APPEND"
    query                           = <<-SQL
      SELECT
        TIMESTAMP_TRUNC(event_timestamp, HOUR) AS hour,
        event_name,
        COUNT(*) AS event_count,
        COUNT(DISTINCT user_pseudo_id) AS unique_users,
        COUNTIF(event_name = 'purchase') AS purchases
      FROM `${var.project_id}.analytics_raw.events_*`
      WHERE _TABLE_SUFFIX = FORMAT_DATE('%Y%m%d', @run_date)
      GROUP BY hour, event_name
    SQL
  }
}

variable "project_id" {
  default = "my-project"
}
EOF

# 3. Multiple dependent queries (chain)
cat > chain_queries.sh << 'BASH'
#!/bin/bash
# Chain of scheduled queries (dependency management)
# Query 1: Raw → Staging (every hour)
# Query 2: Staging → Aggregated (every hour, after Query 1)
# Query 3: Aggregated → Dashboard (every day)

echo "BigQuery does not natively support query dependencies."
echo "Alternatives:"
echo "  1. Use time-based offsets (Query 2 runs 15min after Query 1)"
echo "  2. Use Cloud Composer (Airflow) for complex DAGs"
echo "  3. Use Cloud Workflows for serverless orchestration"
echo "  4. Use Dataform for SQL-based dependency management"
BASH

echo "Scheduled queries configured"
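BigQuery has no native dependency support, so the time-based-offset alternative above is the simplest workaround. A minimal pure-Python sketch (the `staggered_schedules` helper is invented for illustration) that derives the staggered schedule strings for a bronze/silver/gold chain:

```python
from datetime import datetime, timedelta

def staggered_schedules(base, offsets_min):
    """Return BigQuery schedule strings ("every day HH:MM") offset from a
    daily base time by the given numbers of minutes."""
    start = datetime.strptime(base, "%H:%M")
    return [
        "every day " + (start + timedelta(minutes=m)).strftime("%H:%M")
        for m in offsets_min
    ]

# Bronze at 02:00, silver 15 minutes later, gold 30 minutes later:
print(staggered_schedules("02:00", [0, 15, 30]))
# → ['every day 02:00', 'every day 02:15', 'every day 02:30']
```

Pick offsets larger than the worst-case runtime of the upstream query; if runtimes are unpredictable, that is the signal to move to Dataform or Cloud Workflows instead.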

Low-Code Data Pipeline with BigQuery

Build a complete data pipeline using only SQL:

# === Low-Code Data Pipeline ===

# 1. Medallion Architecture with SQL only
cat > medallion_pipeline.sql << 'SQL'
-- === Bronze Layer: Raw Data ===
-- Scheduled: every 1 hour
CREATE OR REPLACE TABLE `analytics.bronze_orders` AS
SELECT
  *,
  CURRENT_TIMESTAMP() AS _ingested_at,
  _FILE_NAME AS _source_file
FROM `raw.orders_external`
WHERE _PARTITIONTIME >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 2 HOUR);

-- === Silver Layer: Cleaned Data ===
-- Scheduled: every 1 hour (15 min after bronze)
CREATE OR REPLACE TABLE `analytics.silver_orders`
PARTITION BY DATE(order_date)
CLUSTER BY customer_id AS
SELECT
  CAST(order_id AS INT64) AS order_id,
  TRIM(customer_id) AS customer_id,
  LOWER(TRIM(product_category)) AS product_category,
  CAST(amount AS FLOAT64) AS amount,
  PARSE_TIMESTAMP('%Y-%m-%d %H:%M:%S', order_date) AS order_date,
  CURRENT_TIMESTAMP() AS _processed_at
FROM `analytics.bronze_orders`
WHERE order_id IS NOT NULL
  AND amount > 0
  AND amount < 1000000;  -- Remove outliers

-- === Gold Layer: Business Metrics ===
-- Scheduled: every day at 03:00
CREATE OR REPLACE TABLE `analytics.gold_daily_kpis` AS
SELECT
  DATE(order_date) AS report_date,
  COUNT(DISTINCT customer_id) AS daily_active_customers,
  COUNT(*) AS total_orders,
  SUM(amount) AS total_revenue,
  AVG(amount) AS avg_order_value,
  APPROX_QUANTILES(amount, 100)[OFFSET(50)] AS median_order_value,
  SUM(CASE WHEN product_category = 'electronics' THEN amount ELSE 0 END) AS electronics_revenue,
  SUM(CASE WHEN product_category = 'clothing' THEN amount ELSE 0 END) AS clothing_revenue
FROM `analytics.silver_orders`
WHERE DATE(order_date) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
GROUP BY report_date;

-- === Data Quality Checks ===
-- Scheduled: after gold layer
CREATE OR REPLACE TABLE `analytics.dq_checks` AS
SELECT
  CURRENT_DATE() AS check_date,
  'orders_count' AS check_name,
  CASE WHEN cnt > 100 THEN 'PASS' ELSE 'FAIL' END AS status,
  cnt AS value
FROM (SELECT COUNT(*) AS cnt FROM `analytics.silver_orders` WHERE DATE(order_date) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY))
UNION ALL
SELECT
  CURRENT_DATE(),
  'null_customer_id',
  CASE WHEN cnt = 0 THEN 'PASS' ELSE 'FAIL' END,
  cnt
FROM (SELECT COUNT(*) AS cnt FROM `analytics.silver_orders` WHERE customer_id IS NULL AND DATE(order_date) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY));
SQL

echo "Medallion pipeline SQL ready"
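The bronze → silver → gold rules above can be mirrored in plain Python to make the cleaning logic concrete. This is an illustrative in-memory simulation, not BigQuery code; the sample rows and the `to_silver` helper are invented for the example:

```python
from datetime import datetime

bronze = [
    {"order_id": "1", "customer_id": " c1 ", "product_category": "Electronics",
     "amount": "120.50", "order_date": "2025-11-01 10:30:00"},
    {"order_id": "2", "customer_id": "c2", "product_category": "clothing",
     "amount": "-5", "order_date": "2025-11-01 11:00:00"},   # dropped: amount <= 0
    {"order_id": None, "customer_id": "c3", "product_category": "clothing",
     "amount": "40", "order_date": "2025-11-01 12:00:00"},   # dropped: null order_id
]

def to_silver(rows):
    """Apply the silver-layer rules: cast types, trim/lowercase, drop bad rows."""
    out = []
    for r in rows:
        if r["order_id"] is None:
            continue
        amount = float(r["amount"])
        if not (0 < amount < 1_000_000):  # outlier filter, as in the SQL
            continue
        out.append({
            "order_id": int(r["order_id"]),
            "customer_id": r["customer_id"].strip(),
            "product_category": r["product_category"].strip().lower(),
            "amount": amount,
            "order_date": datetime.strptime(r["order_date"], "%Y-%m-%d %H:%M:%S"),
        })
    return out

silver = to_silver(bronze)
gold = {
    "total_orders": len(silver),
    "total_revenue": sum(r["amount"] for r in silver),
}
print(silver[0]["product_category"], gold)
# → electronics {'total_orders': 1, 'total_revenue': 120.5}
```

The point of the medallion split is that each layer's rules stay small and testable like this, whether they run as SQL or anything else.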

No-Code Integration with Business Tools

Connect BigQuery to business tools without writing code:

#!/usr/bin/env python3
# nocode_integrations.py - No-Code Integration Options
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("nocode")

class NoCodeIntegrations:
    """No-code integration options for BigQuery"""
    
    def __init__(self):
        pass
    
    def integration_catalog(self):
        return {
            "google_sheets": {
                "type": "Visualization / Reporting",
                "description": "Connected Sheets lets you analyze BigQuery data directly in Google Sheets",
                "setup": "Sheets → Data → Data connectors → BigQuery",
                "features": [
                    "Query billions of rows from Sheets",
                    "Auto-refresh schedules",
                    "Pivot tables on BigQuery data",
                    "Charts and formatting",
                ],
                "limitations": "10,000 rows extract limit, read-only",
                "cost": "Free (BigQuery query costs apply)",
            },
            "looker_studio": {
                "type": "Dashboard / BI",
                "description": "Free dashboard tool from Google",
                "setup": "Create report → Add BigQuery data source",
                "features": [
                    "Drag-and-drop charts",
                    "Real-time BigQuery queries",
                    "Scheduled email reports",
                    "Embedding and sharing",
                    "Custom calculated fields",
                ],
                "limitations": "Complex visualizations limited",
                "cost": "Free",
            },
            "dataform": {
                "type": "SQL-based Data Pipeline",
                "description": "SQL workflow tool (built into BigQuery)",
                "setup": "BigQuery Studio → Dataform → Create repository",
                "features": [
                    "SQL-based transformations with dependencies",
                    "Version control (Git)",
                    "Data quality assertions",
                    "Scheduling and orchestration",
                    "Incremental processing",
                ],
                "limitations": "SQL only (no Python)",
                "cost": "Free (BigQuery query costs apply)",
            },
            "appsheet": {
                "type": "No-Code App Builder",
                "description": "Build mobile/web apps on top of BigQuery data",
                "setup": "AppSheet → New app → Connect BigQuery",
                "features": [
                    "Auto-generated CRUD app",
                    "Barcode scanning",
                    "Offline support",
                    "Workflow automation",
                ],
                "cost": "Free tier available, $5/user/month (Core)",
            },
            "cloud_workflows": {
                "type": "Serverless Orchestration",
                "description": "YAML-based workflow orchestration",
                "setup": "Cloud Console → Workflows → Create",
                "features": [
                    "Chain BigQuery jobs",
                    "Conditional logic",
                    "Error handling and retry",
                    "HTTP API calls",
                    "Pub/Sub integration",
                ],
                "cost": "5,000 steps/month free",
            },
        }

integrations = NoCodeIntegrations()
catalog = integrations.integration_catalog()
print("No-Code BigQuery Integrations:")
for name, info in catalog.items():
    print(f"\n  {name} ({info['type']}):")
    print(f"    {info['description']}")
    print(f"    Cost: {info['cost']}")
    print(f"    Setup: {info['setup']}")
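One simple way to turn the catalog above into a decision aid is a requirement-to-tool lookup. This is an illustrative heuristic only; the `suggest_tool` function and its keywords are invented for this example:

```python
def suggest_tool(need):
    """Map a requirement keyword to a no-code option from the catalog above.
    Unknown needs default to Dataform, the general-purpose SQL pipeline tool."""
    rules = {
        "spreadsheet": "google_sheets",
        "dashboard": "looker_studio",
        "pipeline": "dataform",
        "mobile_app": "appsheet",
        "orchestration": "cloud_workflows",
    }
    return rules.get(need, "dataform")

print(suggest_tool("dashboard"))      # → looker_studio
print(suggest_tool("orchestration"))  # → cloud_workflows
```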

Cost Optimization and Best Practices

Reduce BigQuery costs with these best practices:

# === Cost Optimization ===

cat > cost_optimization.yaml << 'EOF'
bigquery_cost_optimization:
  query_optimization:
    partition_tables:
      description: "Partition by date so queries scan only the data they need"
      example: "PARTITION BY DATE(created_at)"
      savings: "80-95% reduction in scanned data"
      
    cluster_tables:
      description: "Cluster by frequently filtered columns"
      example: "CLUSTER BY customer_id, product_category"
      savings: "50-80% reduction"
      
    select_specific_columns:
      description: "?????????????????????????????? columns ????????????????????? ?????????????????? SELECT *"
      bad: "SELECT * FROM orders"
      good: "SELECT order_id, amount, customer_id FROM orders"
      savings: "Proportional to columns excluded"
      
    use_approximate_functions:
      description: "Use APPROX_COUNT_DISTINCT instead of COUNT(DISTINCT)"
      savings: "2-5x faster, less data processed"
      
    materialize_cte:
      description: "Materialize repeated CTEs into temp tables"
      savings: "Avoid re-scanning same data multiple times"

  scheduled_query_tips:
    - "Use the @run_date parameter instead of CURRENT_DATE() so backfills work correctly"
    - "Partition the destination table to reduce downstream scan costs"
    - "Prefer WRITE_APPEND with a partition filter over WRITE_TRUNCATE where possible"
    - "Set query priority to BATCH for non-urgent queries (same price, but they do not count toward concurrency limits)"
    - "Monitor query costs via INFORMATION_SCHEMA.JOBS"

  pricing_estimate:
    on_demand:
      query: "$6.25 per TB scanned"
      storage: "$0.02 per GB/month (active)"
      scheduled_query: "Free (only query costs)"
    flat_rate:
      slots: "$2,000/month per 100 slots"
      best_for: "Predictable workloads > $10K/month on-demand"
    
  monitoring_query: |
    SELECT
      user_email,
      COUNT(*) AS query_count,
      SUM(total_bytes_processed) / POW(1024, 4) AS tb_processed,
      SUM(total_bytes_processed) / POW(1024, 4) * 6.25 AS estimated_cost_usd
    FROM `region-asia-southeast1`.INFORMATION_SCHEMA.JOBS
    WHERE creation_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 30 DAY)
      AND job_type = 'QUERY'
    GROUP BY user_email
    ORDER BY tb_processed DESC
    LIMIT 20
EOF

python3 -c "
import yaml
with open('cost_optimization.yaml') as f:
    data = yaml.safe_load(f)
opt = data['bigquery_cost_optimization']
print('BigQuery Cost Optimization:')
for name, info in opt['query_optimization'].items():
    print(f'  {name}: {info[\"description\"]} (Savings: {info[\"savings\"]})')
print(f'\nPricing:')
print(f'  On-demand: {opt[\"pricing_estimate\"][\"on_demand\"][\"query\"]}')
print(f'  Flat-rate: {opt[\"pricing_estimate\"][\"flat_rate\"][\"slots\"]}')
"

echo "Cost optimization guide ready"
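The on-demand arithmetic in the monitoring query above is easy to check by hand. A small sketch, assuming the $6.25/TiB on-demand rate quoted in the guide (the actual rate varies by region):

```python
def estimate_on_demand_cost(bytes_processed, usd_per_tib=6.25):
    """Estimate on-demand query cost from bytes scanned.
    BigQuery bills per TiB (2**40 bytes) of data processed."""
    return round(bytes_processed / 2**40 * usd_per_tib, 4)

# A query scanning ~2.1 GB, like daily_sales in the monitoring section:
print(estimate_on_demand_cost(int(2.1 * 2**30)))  # → 0.0128
```

This matches the per-run costs shown in the dashboard section below; multiply by runs per month to sanity-check whether flat-rate pricing is worth considering.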

Monitoring ????????? Alerting

Track the health and cost of scheduled queries:

#!/usr/bin/env python3
# bq_monitor.py - BigQuery Scheduled Query Monitoring
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("monitor")

class BQScheduledMonitor:
    def __init__(self):
        pass
    
    def dashboard(self):
        return {
            "scheduled_queries": [
                {"name": "daily_sales_aggregation", "schedule": "daily 02:00", "last_run": "success", "duration": "45s", "bytes_processed": "2.1 GB", "cost": "$0.013"},
                {"name": "hourly_website_metrics", "schedule": "hourly", "last_run": "success", "duration": "12s", "bytes_processed": "500 MB", "cost": "$0.003"},
                {"name": "weekly_cohort_analysis", "schedule": "weekly Mon 04:00", "last_run": "success", "duration": "3m 20s", "bytes_processed": "15 GB", "cost": "$0.094"},
                {"name": "daily_dq_checks", "schedule": "daily 03:30", "last_run": "failed", "duration": "5s", "bytes_processed": "0", "cost": "$0.000"},
            ],
            "cost_summary_30d": {
                "total_queries": 1250,
                "total_tb_processed": 0.85,
                "total_cost": "$5.31",
                "avg_cost_per_query": "$0.004",
                "most_expensive": "weekly_cohort_analysis ($0.094/run)",
            },
            "alerts": [
                {"severity": "ERROR", "message": "daily_dq_checks failed: Table not found", "time": "03:30"},
                {"severity": "WARNING", "message": "hourly_website_metrics took 2x longer than usual", "time": "14:00"},
            ],
            "recommendations": [
                "Partition daily_sales source table by order_date (save 80% scan)",
                "Add clustering to hourly_metrics on event_name",
                "Fix daily_dq_checks: update table reference after schema change",
                "Consider flat-rate pricing if monthly cost exceeds $500",
            ],
        }

monitor = BQScheduledMonitor()
dash = monitor.dashboard()
print("BigQuery Scheduled Query Dashboard:")
for q in dash["scheduled_queries"]:
    status = "OK" if q["last_run"] == "success" else "FAIL"
    print(f"  [{status}] {q['name']}: {q['schedule']}, {q['duration']}, {q['cost']}")

cost = dash["cost_summary_30d"]
print(f"\nCost (30d): {cost['total_cost']} ({cost['total_queries']} queries, {cost['total_tb_processed']} TB)")

print(f"\nAlerts:")
for a in dash["alerts"]:
    print(f"  [{a['severity']}] {a['message']}")

print(f"\nRecommendations:")
for r in dash["recommendations"]:
    print(f"  - {r}")
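The alert logic implied by the dashboard above (flag failures, and flag runs that take about 2x longer than usual) can be written as a pure function. The record shapes and the `find_alerts` helper are invented for illustration; in practice you would pull run history from INFORMATION_SCHEMA.JOBS or the Data Transfer API:

```python
def find_alerts(runs, slow_factor=2.0):
    """Return (severity, message) pairs: ERROR for failed runs, WARNING for
    runs slower than slow_factor times their typical duration."""
    alerts = []
    for r in runs:
        if r["state"] == "FAILED":
            alerts.append(("ERROR", f"{r['name']} failed"))
        elif r["duration_s"] > slow_factor * r["typical_s"]:
            alerts.append(
                ("WARNING",
                 f"{r['name']} took {r['duration_s']}s (typical {r['typical_s']}s)")
            )
    return alerts

runs = [
    {"name": "daily_sales", "state": "SUCCEEDED", "duration_s": 45, "typical_s": 40},
    {"name": "daily_dq_checks", "state": "FAILED", "duration_s": 5, "typical_s": 5},
    {"name": "hourly_metrics", "state": "SUCCEEDED", "duration_s": 30, "typical_s": 12},
]
for sev, msg in find_alerts(runs):
    print(f"[{sev}] {msg}")
```

Wire the output into an email or Chat webhook and you have the equivalent of the failure emails that `enable_failure_email` gives you, plus slowness detection.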

FAQ: Frequently Asked Questions

Q: When should I use BigQuery Scheduled Query versus Airflow?

A: Choose BigQuery Scheduled Query for SQL-only transformations with few or no dependencies: there is no infrastructure to manage, it is free (you pay only query costs), and business analysts can use it directly. Choose Airflow (Cloud Composer) when you need complex DAGs with dependencies between tasks, a mix of SQL + Python + API calls, advanced error handling, or backfill, retry, and SLA monitoring. Note that Cloud Composer costs roughly $300-500/month. A sensible path: start with Scheduled Queries, and when the pipeline grows more complex, try Dataform before Airflow, with Cloud Workflows as a middle ground.

Q: Dataform ????????? dbt ??????????????????????????????????????????????????? BigQuery?

A: Dataform (Google) is built into BigQuery Studio, so there is no setup; it is SQL-based with JavaScript for macros, and Git integration and scheduling come built in. It is free and suits teams that work entirely on BigQuery. dbt (dbt Labs) supports multiple warehouses (BigQuery, Snowflake, Redshift, Databricks), has a larger community and more packages, and dbt Cloud adds an IDE, scheduling, and docs; dbt Cloud starts around $100/month (Developer). If you are all-in on BigQuery and want simplicity, pick Dataform; if you are multi-warehouse or want the dbt ecosystem, pick dbt.

Q: What kinds of data pipelines suit the Low-Code approach?

A: Low-Code (BigQuery Scheduled Queries + Dataform + Looker Studio) fits pipelines of up to roughly 20-30 tables, straightforward SQL transformations, teams of 1-5 people, no real-time processing requirement, and a limited budget. It is not a fit when you need Python/Spark for complex transformations, real-time streaming (use Dataflow), orchestration of non-SQL tasks (API calls, file processing), or a team of more than 10 with strict governance needs. Recommendation: start low-code and adopt heavier tools when you hit its limitations; do not over-engineer from day one.

Q: BigQuery Scheduled Query ?????? backfill ??????????????????????

A: Yes. BigQuery Scheduled Query supports backfill via a manual trigger: in the Console, open the scheduled query → Run transfer now → choose a date range to backfill. In your SQL, use the @run_date parameter instead of CURRENT_DATE(), e.g. WHERE DATE(created_at) = @run_date, so each backfill run processes the correct day; with CURRENT_DATE(), every backfill run would reprocess today's data instead. From the CLI, use bq mk --transfer_run --run_time to trigger runs for a scheduled query. If you backfill often, consider Dataform, whose incremental models handle this more cleanly.
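The Console's date-range backfill described above amounts to one run per day, each receiving its own @run_date. A small sketch (the `backfill_dates` helper is invented for illustration) of which run dates a given range covers:

```python
from datetime import date, timedelta

def backfill_dates(start, end):
    """Enumerate the @run_date values a backfill over [start, end] would
    cover, one scheduled-query run per day (inclusive of both endpoints)."""
    days = (end - start).days
    return [start + timedelta(days=i) for i in range(days + 1)]

dates = backfill_dates(date(2025, 10, 28), date(2025, 10, 31))
print([d.isoformat() for d in dates])
# → ['2025-10-28', '2025-10-29', '2025-10-30', '2025-10-31']
```

This is why the @run_date-parameterized WHERE clause matters: each of these runs filters to exactly one day, so re-running a range is idempotent.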
