
BigQuery Scheduled Query 12 Factor App

2025-06-25 · Ajarn Bom, SiamCafe.net · 1,822 words

BigQuery Scheduled Query 12 Factor App: What Is It?

BigQuery is Google Cloud's serverless data warehouse. It runs SQL over datasets up to petabyte scale, often returning results within seconds. Scheduled Queries run SQL automatically on a schedule, for example ETL jobs, reports, and aggregations. The 12-Factor App is a set of twelve design principles for cloud-native applications that originated at Heroku. Combining the three ideas helps you build BigQuery data pipelines that are maintainable, scalable, and production-ready.

12-Factor Principles for Data Pipelines

# twelve_factor.py: 12-Factor principles mapped to BigQuery pipelines

class TwelveFactorBQ:
    FACTORS = {
        "1_codebase": {
            "name": "I. Codebase",
            "principle": "One codebase tracked in version control, many deploys",
            "bq_application": "SQL queries live in a Git repo; dev, staging, and prod deploy from the same codebase",
            "example": "queries/ folder in Git, with CI/CD deploying the scheduled queries",
        },
        "2_dependencies": {
            "name": "II. Dependencies",
            "principle": "Explicitly declare and isolate dependencies",
            "bq_application": "Declare the datasets, tables, and UDFs each query needs; never hardcode project IDs",
            "example": "Deploy-time placeholders such as ${project} and ${dataset}; query parameters like @run_date can replace values, not table or dataset names",
        },
        "3_config": {
            "name": "III. Config",
            "principle": "Store config in the environment",
            "bq_application": "Project ID, dataset names, and schedules come from environment config, not from the SQL",
            "example": "Terraform variables with one .tfvars file per environment",
        },
        "4_backing": {
            "name": "IV. Backing Services",
            "principle": "Treat backing services as attached resources",
            "bq_application": "BigQuery, Cloud Storage, and Pub/Sub are attached resources that can be swapped through config without code changes",
        },
        "5_build_release_run": {
            "name": "V. Build, Release, Run",
            "principle": "Strictly separate build and run stages",
            "bq_application": "Build: validate SQL + dry run → Release: deploy to BQ → Run: scheduled execution",
        },
        "6_processes": {
            "name": "VI. Processes",
            "principle": "Execute the app as stateless processes",
            "bq_application": "Scheduled queries are stateless: nothing is kept between runs, and any state lives in tables",
        },
        "7_port_binding": {
            "name": "VII. Port Binding",
            "principle": "Export services via port binding",
            "bq_application": "Expose results through the BigQuery API, Connected Sheets, and Looker",
        },
        "8_concurrency": {
            "name": "VIII. Concurrency",
            "principle": "Scale out via the process model",
            "bq_application": "BigQuery scales automatically with no capacity to manage; concurrency is governed by slots",
        },
        "9_disposability": {
            "name": "IX. Disposability",
            "principle": "Maximize robustness with fast startup and graceful shutdown",
            "bq_application": "Queries are idempotent, so a rerun does no harm: use MERGE, CREATE OR REPLACE TABLE, or WRITE_TRUNCATE on the target partition",
        },
        "10_dev_prod_parity": {
            "name": "X. Dev/Prod Parity",
            "principle": "Keep development, staging, and production as similar as possible",
            "bq_application": "Separate datasets per environment (dev_dataset, staging_dataset, prod_dataset) running the same SQL",
        },
        "11_logs": {
            "name": "XI. Logs",
            "principle": "Treat logs as event streams",
            "bq_application": "BigQuery audit logs → Cloud Logging → monitoring/alerting",
        },
        "12_admin": {
            "name": "XII. Admin Processes",
            "principle": "Run admin/management tasks as one-off processes",
            "bq_application": "Backfill queries and migration scripts run as one-off jobs, not as scheduled queries",
        },
    }

    def show_factors(self):
        """Print all twelve factors with their BigQuery interpretation."""
        print("=== 12-Factor for BigQuery ===\n")
        for factor in self.FACTORS.values():
            print(f"[{factor['name']}] {factor['principle']}")
            print(f"  BQ: {factor['bq_application']}")
            if "example" in factor:
                print(f"  Example: {factor['example']}")
            print()

tf = TwelveFactorBQ()
tf.show_factors()
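Factors III (Config) and X (Dev/Prod Parity) come down to keeping environment-specific values out of the SQL and the code. A minimal sketch of that idea in Python follows. The environment variable names BQ_PROJECT_ID, BQ_DATASET_ID, and BQ_REGION and the PipelineConfig/load_config helpers are hypothetical; the defaults simply mirror the dataset and region defaults in this article's Terraform variables.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class PipelineConfig:
    """Per-environment settings; the SQL text is identical in every environment."""
    project_id: str
    dataset_id: str
    region: str

    def table(self, name: str) -> str:
        """Fully qualified, backticked table reference for use in SQL text."""
        return f"`{self.project_id}.{self.dataset_id}.{name}`"


def load_config(env=None) -> PipelineConfig:
    """Build the config from environment variables (Factor III).

    BQ_PROJECT_ID is required; BQ_DATASET_ID and BQ_REGION fall back to defaults.
    `env` defaults to os.environ; passing a dict makes the function easy to test.
    """
    env = os.environ if env is None else env
    if "BQ_PROJECT_ID" not in env:
        raise RuntimeError("BQ_PROJECT_ID is not set")
    return PipelineConfig(
        project_id=env["BQ_PROJECT_ID"],
        dataset_id=env.get("BQ_DATASET_ID", "analytics"),
        region=env.get("BQ_REGION", "asia-southeast1"),
    )


if __name__ == "__main__":
    # Same code, two environments: only the variables differ (Factor X)
    for env in (
        {"BQ_PROJECT_ID": "my-dev-project", "BQ_DATASET_ID": "dev_analytics"},
        {"BQ_PROJECT_ID": "my-prod-project", "BQ_DATASET_ID": "prod_analytics"},
    ):
        print(load_config(env).table("user_metrics_daily"))
```

Failing fast when the project ID is missing is deliberate: a silent default for the project is exactly the kind of hardcoded value Factor III warns against.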

Scheduled Query Setup

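# scheduled_query.py: BigQuery scheduled query examples
# Project/dataset placeholders are filled in at deploy time (e.g. by Terraform
# templatefile()); @run_date and @run_time are supplied by the scheduler on each run.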

class ScheduledQueryExamples:
    DAILY_AGGREGATION = """
-- daily_user_metrics.sql: scheduled daily at 02:00 UTC
-- Aggregates the previous, complete day: @run_date is the date of the run itself,
-- so filtering on it directly at 02:00 would cover only the first two hours of today.
-- Idempotent: MERGE pattern (Factor IX: Disposability)
MERGE `${project}.${dataset}.user_metrics_daily` AS target
USING (
    SELECT
        DATE(event_timestamp) AS date,
        user_id,
        COUNT(*) AS total_events,
        COUNT(DISTINCT session_id) AS sessions,
        SUM(CASE WHEN event_name = 'purchase' THEN 1 ELSE 0 END) AS purchases,
        SUM(CASE WHEN event_name = 'purchase' THEN revenue ELSE 0 END) AS revenue,
        MIN(event_timestamp) AS first_event,
        MAX(event_timestamp) AS last_event
    FROM `${project}.${dataset}.raw_events`
    WHERE DATE(event_timestamp) = DATE_SUB(@run_date, INTERVAL 1 DAY)
    GROUP BY date, user_id
) AS source
ON target.date = source.date AND target.user_id = source.user_id
WHEN MATCHED THEN
    UPDATE SET
        total_events = source.total_events,
        sessions = source.sessions,
        purchases = source.purchases,
        revenue = source.revenue,
        first_event = source.first_event,
        last_event = source.last_event
WHEN NOT MATCHED THEN
    INSERT (date, user_id, total_events, sessions, purchases, revenue, first_event, last_event)
    VALUES (source.date, source.user_id, source.total_events, source.sessions,
            source.purchases, source.revenue, source.first_event, source.last_event);
"""

    HOURLY_REPORT = """
-- hourly_kpi.sql: scheduled hourly; covers the two most recent complete hours
-- CREATE OR REPLACE rebuilds the table on every run, so it only ever holds this window.
CREATE OR REPLACE TABLE `${project}.${dataset}.kpi_hourly`
PARTITION BY DATE(hour)
CLUSTER BY metric_name
AS
SELECT
    TIMESTAMP_TRUNC(event_timestamp, HOUR) AS hour,
    'active_users' AS metric_name,
    COUNT(DISTINCT user_id) AS metric_value
FROM `${project}.${dataset}.raw_events`
-- Truncate before subtracting so the oldest hour bucket is not partial
WHERE event_timestamp >= TIMESTAMP_SUB(TIMESTAMP_TRUNC(@run_time, HOUR), INTERVAL 2 HOUR)
    AND event_timestamp < TIMESTAMP_TRUNC(@run_time, HOUR)
GROUP BY hour

UNION ALL

SELECT
    TIMESTAMP_TRUNC(event_timestamp, HOUR),
    'revenue',
    SUM(revenue)
FROM `${project}.${dataset}.raw_events`
WHERE event_name = 'purchase'
    AND event_timestamp >= TIMESTAMP_SUB(TIMESTAMP_TRUNC(@run_time, HOUR), INTERVAL 2 HOUR)
    AND event_timestamp < TIMESTAMP_TRUNC(@run_time, HOUR)
GROUP BY 1;
"""

    def show_daily(self):
        print("=== Daily Aggregation (MERGE) ===")
        print(self.DAILY_AGGREGATION)

    def show_hourly(self):
        print("\n=== Hourly KPI Report ===")
        print(self.HOURLY_REPORT)

sq = ScheduledQueryExamples()
sq.show_daily()
sq.show_hourly()
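When writing backfill scripts or tests, it helps to compute in Python the exact window a run is meant to cover. A minimal sketch, assuming run_time is the timezone-aware UTC timestamp the scheduler provides, a daily job that should aggregate the previous complete day, and an hourly job that should cover the last two complete hours; the helper names previous_day and last_complete_hours are hypothetical. The functions mirror DATE_SUB, TIMESTAMP_TRUNC, and TIMESTAMP_SUB in BigQuery SQL and return a half-open [start, end) range, matching `>=` / `<` filters.

```python
from datetime import date, datetime, timedelta, timezone


def previous_day(run_time: datetime) -> date:
    """The complete day a daily run should aggregate.

    Same as DATE_SUB(@run_date, INTERVAL 1 DAY) when @run_date is the run's UTC date.
    run_time must be timezone-aware.
    """
    return run_time.astimezone(timezone.utc).date() - timedelta(days=1)


def last_complete_hours(run_time: datetime, hours: int = 2) -> tuple:
    """Half-open [start, end) window covering the last `hours` complete hours.

    end   = TIMESTAMP_TRUNC(@run_time, HOUR)
    start = TIMESTAMP_SUB(end, INTERVAL `hours` HOUR)
    """
    end = run_time.astimezone(timezone.utc).replace(minute=0, second=0, microsecond=0)
    return end - timedelta(hours=hours), end


run = datetime(2025, 6, 25, 10, 5, 30, tzinfo=timezone.utc)
print(previous_day(run))  # 2025-06-24
start, end = last_complete_hours(run)
print(start.isoformat(), end.isoformat())
# 2025-06-25T08:00:00+00:00 2025-06-25T10:00:00+00:00
```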

Terraform Infrastructure as Code

# terraform.tf: Terraform config for BigQuery scheduled queries
# Factor III (Config) + Factor V (Build, Release, Run)

# main.tf
# Assumes google_service_account.bq_scheduler is defined elsewhere in this module.
resource "google_bigquery_data_transfer_config" "daily_metrics" {
  display_name   = "Daily User Metrics"
  location       = var.region
  data_source_id = "scheduled_query"
  schedule       = "every day 02:00" # UTC

  # MERGE is a DML statement, so no destination table or write disposition is set
  # (BigQuery rejects a destination table for DML jobs); the target is named in the SQL.
  params = {
    # templatefile() fills the ${project} and ${dataset} placeholders in the SQL file
    query = templatefile("${path.module}/../queries/daily_user_metrics.sql", {
      project = var.project_id
      dataset = var.dataset_id
    })
  }

  service_account_name = google_service_account.bq_scheduler.email

  email_preferences {
    enable_failure_email = true
  }
}

resource "google_bigquery_data_transfer_config" "hourly_kpi" {
  display_name   = "Hourly KPI Report"
  location       = var.region
  data_source_id = "scheduled_query"
  schedule       = "every 1 hours"

  # CREATE OR REPLACE TABLE is DDL, so the target table is also named in the SQL itself
  params = {
    query = templatefile("${path.module}/../queries/hourly_kpi.sql", {
      project = var.project_id
      dataset = var.dataset_id
    })
  }

  service_account_name = google_service_account.bq_scheduler.email

  email_preferences {
    enable_failure_email = true
  }
}

# variables.tf
variable "project_id" {
  description = "GCP Project ID"
  type        = string
}

variable "dataset_id" {
  description = "BigQuery Dataset ID"
  type        = string
  default     = "analytics"
}

variable "region" {
  description = "BigQuery region"
  type        = string
  default     = "asia-southeast1"
}

# environments/prod.tfvars
# project_id = "my-prod-project"
# dataset_id = "prod_analytics"

# environments/dev.tfvars
# project_id = "my-dev-project"
# dataset_id = "dev_analytics"
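Terraform's templatefile() replaces ${name} placeholders when the config is applied. This has to happen before the query reaches BigQuery, because query parameters such as @run_date can only stand in for values, never for project, dataset, or table names. For local tests or dry runs outside Terraform, a small Python stand-in can do the same substitution. A minimal sketch: render_sql is a hypothetical helper, it supports only plain ${name} placeholders (not Terraform's %{...} directives or $${ escapes), and the validation patterns are a simplified reading of Google Cloud's project and dataset naming rules.

```python
import re

# Simplified naming rules: project IDs are 6-30 lowercase letters, digits, or
# hyphens, start with a letter, and do not end with a hyphen; dataset IDs use
# letters, digits, and underscores.
VALIDATORS = {
    "project": re.compile(r"[a-z][a-z0-9-]{4,28}[a-z0-9]"),
    "dataset": re.compile(r"[A-Za-z0-9_]{1,1024}"),
}
PLACEHOLDER = re.compile(r"\$\{(\w+)\}")


def render_sql(template: str, **values: str) -> str:
    """Replace ${name} placeholders, like a small subset of Terraform templatefile()."""
    for key, value in values.items():
        pattern = VALIDATORS.get(key)
        if pattern is not None and not pattern.fullmatch(value):
            raise ValueError(f"invalid {key} identifier: {value!r}")

    def substitute(match):
        name = match.group(1)
        if name not in values:
            # templatefile() also refuses to render an undefined variable
            raise KeyError(f"no value for placeholder ${{{name}}}")
        return values[name]

    return PLACEHOLDER.sub(substitute, template)


sql = "SELECT COUNT(*) FROM `${project}.${dataset}.raw_events` WHERE DATE(event_timestamp) = @run_date"
print(render_sql(sql, project="my-prod-project", dataset="prod_analytics"))
# SELECT COUNT(*) FROM `my-prod-project.prod_analytics.raw_events` WHERE DATE(event_timestamp) = @run_date
```

Validating identifiers before substitution matters because the values are pasted into SQL text, where a stray character would change the query rather than be treated as data.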

Python Automation & Monitoring

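# automation.py: Python management of BigQuery scheduled queries
import random

class BQAutomation:
    CODE = """
# bq_manager.py: manage BigQuery scheduled queries
# Requires: pip install google-cloud-bigquery google-cloud-bigquery-datatransfer
import datetime
import itertools

from google.api_core import exceptions
from google.cloud import bigquery
from google.cloud import bigquery_datatransfer_v1

# On-demand list price per TiB scanned (US multi-region); regional prices differ.
PRICE_PER_TIB_USD = 6.25

class BQScheduledQueryManager:
    def __init__(self, project_id, location='asia-southeast1'):
        self.project_id = project_id
        self.location = location
        self.transfer_client = bigquery_datatransfer_v1.DataTransferServiceClient()
        self.bq_client = bigquery.Client(project=project_id)
        self.parent = f"projects/{project_id}/locations/{location}"

    def list_scheduled_queries(self):
        '''List transfer configs whose data source is "scheduled_query"'''
        configs = self.transfer_client.list_transfer_configs(
            request={"parent": self.parent, "data_source_ids": ["scheduled_query"]}
        )
        results = []
        for config in configs:
            results.append({
                "config_name": config.name,  # full resource name, for get_run_history()
                "name": config.display_name,
                "schedule": config.schedule,
                "state": config.state.name,
                "next_run": str(config.next_run_time),
            })
        return results

    def get_run_history(self, config_name, limit=10):
        '''Get up to `limit` runs for one scheduled query'''
        runs = self.transfer_client.list_transfer_runs(
            request={"parent": config_name}
        )
        results = []
        # islice stops paging once `limit` runs are read, instead of listing them all
        for run in itertools.islice(runs, limit):
            results.append({
                "run_time": str(run.run_time),
                "end_time": str(run.end_time),
                "state": run.state.name,
                # error_status is a google.rpc.Status; its message is empty on success
                "error": run.error_status.message or None,
            })
        # Bytes processed is not a TransferRun field; read it from
        # INFORMATION_SCHEMA.JOBS (total_bytes_processed) if needed.
        return results

    def dry_run(self, sql):
        '''Dry run a query to estimate bytes scanned and on-demand cost'''
        # The scheduler supplies @run_date/@run_time at run time; a dry run needs
        # sample values for them, otherwise BigQuery rejects the query.
        now = datetime.datetime.now(datetime.timezone.utc)
        params = []
        if "@run_date" in sql:
            params.append(bigquery.ScalarQueryParameter("run_date", "DATE", now.date()))
        if "@run_time" in sql:
            params.append(bigquery.ScalarQueryParameter("run_time", "TIMESTAMP", now))
        job_config = bigquery.QueryJobConfig(
            dry_run=True, use_query_cache=False, query_parameters=params
        )
        query_job = self.bq_client.query(sql, job_config=job_config)

        bytes_processed = query_job.total_bytes_processed
        cost_usd = bytes_processed / 2**40 * PRICE_PER_TIB_USD  # billed per TiB

        return {
            "bytes_processed": bytes_processed,
            "gib_processed": round(bytes_processed / 2**30, 2),
            "estimated_cost_usd": round(cost_usd, 4),
        }

    def validate_sql(self, sql_file, dataset):
        '''Dry-run a .sql file after filling its ${project}/${dataset} placeholders'''
        with open(sql_file) as f:
            sql = f.read()
        sql = sql.replace("${project}", self.project_id).replace("${dataset}", dataset)
        try:
            self.dry_run(sql)
            return {"valid": True, "file": sql_file}
        except exceptions.GoogleAPICallError as e:  # e.g. BadRequest for invalid SQL
            return {"valid": False, "file": sql_file, "error": str(e)}

manager = BQScheduledQueryManager("my-project")
for q in manager.list_scheduled_queries():
    print(f"  [{q['state']}] {q['name']}: {q['schedule']}")
"""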

    def show_code(self):
        print("=== BQ Manager ===")
        print(self.CODE)

    def cost_dashboard(self, price_per_tib=6.25):
        print("\n=== Cost Dashboard (Monthly, simulated) ===")
        # Illustrative only: scan sizes per run are random, not measured from real jobs.
        queries = [
            {"name": "Daily User Metrics", "runs": 30, "gib": random.uniform(10, 100)},
            {"name": "Hourly KPI", "runs": 720, "gib": random.uniform(5, 50)},
            {"name": "Weekly Cohort", "runs": 4, "gib": random.uniform(50, 200)},
            {"name": "Monthly Report", "runs": 1, "gib": random.uniform(100, 500)},
        ]
        total_cost = 0.0
        for q in queries:
            total_gib = q["gib"] * q["runs"]
            # On-demand is billed per TiB (1024 GiB); the monthly free tier is ignored here
            q["cost"] = total_gib / 1024 * price_per_tib
            total_cost += q["cost"]
            print(f"  {q['name']:<25} {q['runs']:>4} runs | {total_gib:>8.1f} GiB | ${q['cost']:>8.2f}")
        print(f"  {'TOTAL':<25} {'':>4}      | {'':>8}     | ${total_cost:>8.2f}")

auto = BQAutomation()
auto.show_code()
auto.cost_dashboard()
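Factor XI treats logs as an event stream to act on. One way to turn the run records returned by get_run_history() into an alerting signal is sketched below. It assumes records shaped like that method's dicts, with state values taken from the Data Transfer API's TransferState names (SUCCEEDED, FAILED, PENDING, RUNNING, CANCELLED); summarize_runs, the threshold of two consecutive failures, and the sample records are hypothetical.

```python
from datetime import datetime

IN_PROGRESS = {"PENDING", "RUNNING"}


def summarize_runs(runs: list, max_consecutive_failures: int = 2) -> dict:
    """Summarize scheduled-query runs and decide whether to raise an alert.

    Each run is a dict like {"run_time": "2025-06-25 02:00:00+00:00",
    "state": "FAILED", "error": "..."}, as returned by get_run_history().
    """
    finished = [r for r in runs if r["state"] not in IN_PROGRESS]
    # Newest first, so the streak below counts the most recent failures
    finished.sort(key=lambda r: datetime.fromisoformat(r["run_time"]), reverse=True)

    streak = 0
    for run in finished:
        if run["state"] != "FAILED":
            break
        streak += 1

    failed = sum(1 for r in finished if r["state"] == "FAILED")
    return {
        "finished": len(finished),
        "failed": failed,
        "failure_rate": failed / len(finished) if finished else 0.0,
        "consecutive_failures": streak,
        "alert": streak >= max_consecutive_failures,
        "last_error": next((r["error"] for r in finished if r.get("error")), None),
    }


history = [  # made-up sample records
    {"run_time": "2025-06-23 02:00:00+00:00", "state": "SUCCEEDED", "error": None},
    {"run_time": "2025-06-24 02:00:00+00:00", "state": "FAILED", "error": "example error"},
    {"run_time": "2025-06-25 02:00:00+00:00", "state": "FAILED", "error": "example error"},
]
summary = summarize_runs(history)
print(summary["alert"], summary["consecutive_failures"])  # True 2
```

Alerting on consecutive failures rather than on any single failure avoids paging for one transient error that the next run already fixed.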

CI/CD Pipeline

# cicd.py: CI/CD for BigQuery scheduled queries

class CICD:
    GITHUB_ACTIONS = """
# .github/workflows/bq-deploy.yml
name: Deploy BigQuery Scheduled Queries
on:
  push:
    branches: [main]
    paths: ['queries/**', 'terraform/**']
  pull_request:
    paths: ['queries/**', 'terraform/**']

jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      
      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      
      - name: Install dependencies
        run: pip install google-cloud-bigquery sqlfluff
      
      - name: Lint SQL
        run: sqlfluff lint queries/ --dialect bigquery
      
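      # Authenticates with a service account key and sets GOOGLE_APPLICATION_CREDENTIALS
      # for later steps. GCP_SA_KEY is a hypothetical secret name.
      - name: Authenticate to Google Cloud
        uses: google-github-actions/auth@v2
        with:
          credentials_json: ${{ secrets.GCP_SA_KEY }}

      - name: Dry Run Queries
        run: python scripts/validate_queries.py queries/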

  deploy:
    needs: validate
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Authenticate to Google Cloud
        uses: google-github-actions/auth@v2
        with:
          credentials_json: ${{ secrets.GCP_SA_KEY }}  # hypothetical secret name

      - name: Setup Terraform
        uses: hashicorp/setup-terraform@v3

      - name: Terraform Init
        run: terraform init
        working-directory: terraform/

      # Save the plan so that apply deploys exactly what was planned (Factor V)
      - name: Terraform Plan
        run: terraform plan -var-file=environments/prod.tfvars -out=tfplan
        working-directory: terraform/

      - name: Terraform Apply
        run: terraform apply tfplan
        working-directory: terraform/
"""

    PROJECT_STRUCTURE = """
    bigquery-pipelines/
    ├── queries/
    │   ├── daily_user_metrics.sql
    │   ├── hourly_kpi.sql
    │   └── weekly_cohort.sql
    ├── terraform/
    │   ├── main.tf
    │   ├── variables.tf
    │   ├── outputs.tf
    │   └── environments/
    │       ├── dev.tfvars
    │       ├── staging.tfvars
    │       └── prod.tfvars
    ├── scripts/
    │   ├── validate_queries.py
    │   └── backfill.py
    ├── tests/
    │   └── test_queries.py
    ├── .sqlfluff
    └── .github/workflows/bq-deploy.yml
    """

    def show_pipeline(self):
        print("=== CI/CD Pipeline ===")
        print(self.GITHUB_ACTIONS)

    def show_structure(self):
        print("\n=== Project Structure ===")
        print(self.PROJECT_STRUCTURE)

cicd = CICD()
cicd.show_pipeline()
cicd.show_structure()
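The workflow runs scripts/validate_queries.py, whose contents the article does not show. Below is a hypothetical sketch of an offline pre-check such a script could run before the credentialed dry run: it flags backticked table references whose project or dataset part is hardcoded instead of a ${...} placeholder (Factor III), and plain INSERT INTO statements, which are not idempotent. It relies on regular-expression heuristics, not a SQL parser, so its findings are hints rather than proof.

```python
import re
import sys
from pathlib import Path

# Backticked references such as `proj.dataset.table` (names without dots are ignored)
_TABLE_REF = re.compile(r"`([^`]+)`")
# A statement that starts with INSERT INTO; MERGE's "THEN INSERT (...)" does not match
_INSERT_INTO = re.compile(r"^\s*INSERT\s+INTO\b", re.IGNORECASE | re.MULTILINE)


def check_sql(text: str) -> list[str]:
    """Return the problems found in the text of one .sql file."""
    problems = []
    for ref in _TABLE_REF.findall(text):
        *qualifiers, _table = ref.split(".")
        # Project and dataset parts should be deploy-time placeholders like ${project}
        if any(not part.startswith("${") for part in qualifiers):
            problems.append(f"hardcoded project/dataset in `{ref}`")
    if _INSERT_INTO.search(text):
        problems.append("plain INSERT INTO is not idempotent; prefer MERGE")
    return problems


def main(folder: str) -> int:
    """Check every .sql file under `folder`; return 1 if any problem was found."""
    found = False
    for path in sorted(Path(folder).rglob("*.sql")):
        for problem in check_sql(path.read_text()):
            print(f"{path}: {problem}")
            found = True
    return 1 if found else 0


if __name__ == "__main__" and len(sys.argv) > 1:
    sys.exit(main(sys.argv[1]))
```

Invoked as `python scripts/validate_queries.py queries/`, it exits non-zero when it finds a problem, which fails the validate job before anything is deployed.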

FAQ: Frequently Asked Questions

Q: BigQuery Scheduled Query or Cloud Composer (Airflow): which should I use?

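A: Scheduled Query is simple and has no scheduling fee (you pay only for the queries it runs), so it suits simple ETL and aggregation. Cloud Composer is more complex but supports task dependencies, branching, retry logic, and multi-step pipelines; a Composer environment is also billed for as long as it runs. Use Scheduled Query for SQL-only jobs on a simple schedule. Use Composer for complex DAGs, cross-service orchestration, and conditional logic.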

Q: How much does a BigQuery Scheduled Query cost?

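A: Scheduling itself is free; there is no scheduling fee. Each run is billed like any other query: on-demand at $6.25 per TiB scanned (US multi-region list price; the first 1 TiB per month is free), or through capacity pricing with slot reservations (BigQuery editions, which replaced flat-rate pricing). To cut cost, use partitioned tables, clustering, and materialized views. The results cache is free, but it is not used for DML/DDL statements or for queries that write to a destination table, so most scheduled queries do not benefit from it. Rough estimate: a daily query that scans 100 GiB costs about $0.61/day, or roughly $18/month at list price before the free tier.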
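The same kind of estimate can be written as a small helper that counts scanned bytes in binary units (on-demand queries are billed per TiB) and optionally subtracts the monthly free tier. A minimal sketch: monthly_on_demand_cost is a hypothetical helper, the $6.25/TiB default is the US multi-region list price assumed here, and the free tier applies once to your total usage rather than per scheduled query, so check current regional pricing before relying on the numbers.

```python
TIB = 2**40  # bytes per TiB


def monthly_on_demand_cost(bytes_per_run: int, runs_per_month: int,
                           price_per_tib: float = 6.25, free_tib: float = 1.0) -> float:
    """Estimated monthly on-demand cost in USD for one scheduled query.

    price_per_tib: US multi-region list price assumed here; check your region.
    free_tib: monthly free tier; pass 0 if other queries already use it up.
    """
    scanned_tib = bytes_per_run * runs_per_month / TIB
    billable_tib = max(0.0, scanned_tib - free_tib)
    return billable_tib * price_per_tib


scan_per_run = 100 * 2**30  # 100 GiB per daily run
print(round(monthly_on_demand_cost(scan_per_run, 30, free_tib=0), 2))  # 18.31
print(round(monthly_on_demand_cost(scan_per_run, 30), 2))              # 12.06
```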

Q: Why do idempotent queries matter?

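A: An idempotent query leaves the same result no matter how many times it runs. That matters for scheduled queries because a failed or repeated run can be retried without creating duplicate data, and backfills become straightforward. Patterns: MERGE (upsert), CREATE OR REPLACE TABLE, overwriting a single date partition with WRITE_TRUNCATE, or DELETE followed by INSERT inside one multi-statement transaction (BigQuery has no INSERT OVERWRITE statement). Avoid: plain INSERT INTO or WRITE_APPEND, which add duplicate rows every time a run is repeated.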
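The difference can be shown with an in-memory simulation: the same run is applied three times, once with MERGE-style upserts keyed on (date, user_id) and once with INSERT INTO-style appends. This is a toy model of the semantics, not BigQuery code; the helper names merge_upsert and insert_into and the sample rows are made up.

```python
def merge_upsert(target: dict, source_rows: list) -> None:
    """MERGE-style: a matching key (date, user_id) is updated, otherwise inserted."""
    for row in source_rows:
        target[(row["date"], row["user_id"])] = dict(row)


def insert_into(target: list, source_rows: list) -> None:
    """INSERT INTO-style: rows are always appended."""
    target.extend(dict(row) for row in source_rows)


# Made-up output of one daily aggregation run
day = [
    {"date": "2025-06-24", "user_id": "u1", "total_events": 5},
    {"date": "2025-06-24", "user_id": "u2", "total_events": 3},
]

merged, appended = {}, []
for _ in range(3):  # the same run retried three times
    merge_upsert(merged, day)
    insert_into(appended, day)

print(len(merged))    # 2: still one row per (date, user_id)
print(len(appended))  # 6: every retry added duplicates
```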

Q: Is the 12-Factor App a good fit for data pipelines?

A: Yes, a very good one, especially: config in the environment (no hardcoded project IDs), one codebase in Git, dev/prod parity (separate datasets per environment), logs as event streams, and disposability (idempotent queries). Together these make pipelines maintainable, testable, and deployable across environments.
