What Are BigQuery Scheduled Queries and the 12-Factor App?
BigQuery is Google Cloud's serverless data warehouse, built to run SQL over terabyte- to petabyte-scale data in seconds to minutes. Scheduled Queries run SQL automatically on a schedule for jobs such as ETL, reports, and aggregations. The 12-Factor App is a set of twelve design principles for cloud-native applications, originally published by Heroku. Applying the twelve factors to scheduled queries produces BigQuery data pipelines that are maintainable, scalable, and production-ready.
12-Factor Principles for Data Pipelines
# twelve_factor.py: 12-Factor principles applied to BigQuery pipelines


class TwelveFactorBQ:
    # Each entry maps one 12-Factor principle to its BigQuery counterpart.
    FACTORS = {
        "1_codebase": {
            "name": "I. Codebase",
            "principle": "One codebase tracked in version control, many deploys",
            "bq_application": "SQL queries live in one Git repo; dev, staging, and prod all deploy from that codebase",
            "example": "queries/ folder in Git + CI/CD that deploys the scheduled queries",
        },
        "2_dependencies": {
            "name": "II. Dependencies",
            "principle": "Explicitly declare and isolate dependencies",
            "bq_application": "Declare the datasets, tables, and UDFs each query needs; never hardcode project IDs",
            "example": "Template variables such as ${project} and ${dataset} rendered at deploy time "
                       "(query parameters like @run_date cannot stand in for table or dataset names)",
        },
        "3_config": {
            "name": "III. Config",
            "principle": "Store config in the environment",
            "bq_application": "Project ID, dataset names, and schedules come from environment variables, not from the SQL",
            "example": "Terraform variables (one .tfvars file per environment) for scheduled query config",
        },
        "4_backing": {
            "name": "IV. Backing Services",
            "principle": "Treat backing services as attached resources",
            "bq_application": "BigQuery, Cloud Storage, and Pub/Sub are attached resources that config can swap without code changes",
        },
        "5_build_release_run": {
            "name": "V. Build, Release, Run",
            "principle": "Strictly separate build and run stages",
            "bq_application": "Build: validate SQL + dry run → Release: deploy to BQ → Run: scheduled execution",
        },
        "6_processes": {
            "name": "VI. Processes",
            "principle": "Execute the app as stateless processes",
            "bq_application": "Scheduled queries are stateless; nothing persists between runs except what is written to tables",
        },
        "7_port_binding": {
            "name": "VII. Port Binding",
            "principle": "Export services via port binding",
            "bq_application": "Expose results through the BigQuery API, Connected Sheets, and Looker",
        },
        "8_concurrency": {
            "name": "VIII. Concurrency",
            "principle": "Scale out via the process model",
            "bq_application": "BigQuery scales automatically; concurrency is governed by slots rather than servers you manage",
        },
        "9_disposability": {
            "name": "IX. Disposability",
            "principle": "Maximize robustness with fast startup and graceful shutdown",
            "bq_application": "Queries are idempotent so reruns are safe: use MERGE, CREATE OR REPLACE, "
                              "or overwrite a single partition (WRITE_TRUNCATE)",
        },
        "10_dev_prod_parity": {
            "name": "X. Dev/Prod Parity",
            "principle": "Keep development, staging, and production as similar as possible",
            "bq_application": "Separate datasets (dev_dataset, staging_dataset, prod_dataset) run the same SQL",
        },
        "11_logs": {
            "name": "XI. Logs",
            "principle": "Treat logs as event streams",
            "bq_application": "BigQuery audit logs → Cloud Logging → monitoring/alerting",
        },
        "12_admin": {
            "name": "XII. Admin Processes",
            "principle": "Run admin/management tasks as one-off processes",
            "bq_application": "Backfills and migration scripts run as one-off jobs, not as scheduled queries",
        },
    }

    def show_factors(self):
        print("=== 12-Factor for BigQuery ===\n")
        # Print every factor with its BigQuery mapping.
        for factor in self.FACTORS.values():
            print(f"[{factor['name']}]")
            print(f"  BQ: {factor['bq_application']}")
            print()


tf = TwelveFactorBQ()
tf.show_factors()
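Factors II and III both come down to keeping project and dataset names out of the SQL text. The sketch below is one minimal way to do that. It assumes two hypothetical environment variables, `BQ_PROJECT` and `BQ_DATASET`. It renders a query template with the standard library's `string.Template`, using the same `${name}` placeholder syntax that Terraform's `templatefile` expects.

```python
import os
from dataclasses import dataclass
from string import Template


@dataclass(frozen=True)
class PipelineConfig:
    """Deploy-time settings read from the environment (Factor III)."""
    project: str
    dataset: str

    @classmethod
    def from_env(cls, env=None):
        # BQ_PROJECT / BQ_DATASET are hypothetical variable names for this sketch.
        env = os.environ if env is None else env
        try:
            return cls(project=env["BQ_PROJECT"], dataset=env.get("BQ_DATASET", "analytics"))
        except KeyError as missing:
            raise RuntimeError(f"missing required environment variable {missing}") from None


def render_query(template_sql: str, config: PipelineConfig) -> str:
    """Fill ${project} and ${dataset}; substitute() raises KeyError on unknown placeholders.

    BigQuery parameters such as @run_date are left untouched for the scheduler to supply.
    """
    return Template(template_sql).substitute(project=config.project, dataset=config.dataset)


if __name__ == "__main__":
    sql = "SELECT COUNT(*) FROM `${project}.${dataset}.raw_events` WHERE DATE(event_timestamp) = @run_date"
    cfg = PipelineConfig.from_env({"BQ_PROJECT": "my-dev-project", "BQ_DATASET": "dev_analytics"})
    print(render_query(sql, cfg))
```

Here only the project is mandatory and the dataset falls back to a default. That split is a judgment call; a stricter setup would require both variables.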
Scheduled Query Setup
# scheduled_query.py: BigQuery scheduled query examples
# ${project} / ${dataset} are filled in at deploy time (Terraform templatefile syntax);
# @run_date (DATE) and @run_time (TIMESTAMP) are supplied by BigQuery on every scheduled run.


class ScheduledQueryExamples:
    DAILY_AGGREGATION = """
-- daily_user_metrics.sql: scheduled daily at 02:00 UTC
-- Idempotent MERGE pattern (Factor IX: Disposability)
MERGE `${project}.${dataset}.user_metrics_daily` AS target
USING (
  SELECT
    DATE(event_timestamp) AS date,
    user_id,
    COUNT(*) AS total_events,
    COUNT(DISTINCT session_id) AS sessions,
    SUM(CASE WHEN event_name = 'purchase' THEN 1 ELSE 0 END) AS purchases,
    SUM(CASE WHEN event_name = 'purchase' THEN revenue ELSE 0 END) AS revenue,
    MIN(event_timestamp) AS first_event,
    MAX(event_timestamp) AS last_event
  FROM `${project}.${dataset}.raw_events`
  -- The 02:00 run has @run_date = the same calendar day, so aggregate the previous full day
  WHERE DATE(event_timestamp) = DATE_SUB(@run_date, INTERVAL 1 DAY)
  GROUP BY date, user_id
) AS source
ON target.date = source.date AND target.user_id = source.user_id
WHEN MATCHED THEN
  UPDATE SET
    total_events = source.total_events,
    sessions = source.sessions,
    purchases = source.purchases,
    revenue = source.revenue,
    first_event = source.first_event,
    last_event = source.last_event
WHEN NOT MATCHED THEN
  INSERT (date, user_id, total_events, sessions, purchases, revenue, first_event, last_event)
  VALUES (source.date, source.user_id, source.total_events, source.sessions,
          source.purchases, source.revenue, source.first_event, source.last_event);
"""

    HOURLY_REPORT = """
-- hourly_kpi.sql: scheduled hourly
-- Rebuilds the table on every run from the last two complete hours (a rolling snapshot)
CREATE OR REPLACE TABLE `${project}.${dataset}.kpi_hourly`
PARTITION BY DATE(hour)
CLUSTER BY metric_name
AS
SELECT
  TIMESTAMP_TRUNC(event_timestamp, HOUR) AS hour,
  'active_users' AS metric_name,
  COUNT(DISTINCT user_id) AS metric_value
FROM `${project}.${dataset}.raw_events`
WHERE event_timestamp >= TIMESTAMP_SUB(TIMESTAMP_TRUNC(@run_time, HOUR), INTERVAL 2 HOUR)
  AND event_timestamp < TIMESTAMP_TRUNC(@run_time, HOUR)
GROUP BY 1
UNION ALL
SELECT
  TIMESTAMP_TRUNC(event_timestamp, HOUR),
  'revenue',
  SUM(revenue)
FROM `${project}.${dataset}.raw_events`
WHERE event_name = 'purchase'
  AND event_timestamp >= TIMESTAMP_SUB(TIMESTAMP_TRUNC(@run_time, HOUR), INTERVAL 2 HOUR)
  AND event_timestamp < TIMESTAMP_TRUNC(@run_time, HOUR)
GROUP BY 1;
"""

    def show_daily(self):
        print("=== Daily Aggregation (MERGE) ===")
        print(self.DAILY_AGGREGATION)

    def show_hourly(self):
        print("\n=== Hourly KPI Report ===")
        print(self.HOURLY_REPORT)


sq = ScheduledQueryExamples()
sq.show_daily()
sq.show_hourly()
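The time filters in these queries are easy to get subtly wrong. The plain-Python sketch below is independent of BigQuery and uses hypothetical function names. It computes two things:

- the half-open window `[truncate(@run_time) - 2h, truncate(@run_time))` that the hourly report should cover;
- the previous-day date that a daily 02:00 UTC job would normally aggregate, given that `@run_date` is the UTC date of `@run_time`.

```python
from datetime import date, datetime, timedelta, timezone


def truncate_to_hour(ts: datetime) -> datetime:
    """Python equivalent of TIMESTAMP_TRUNC(ts, HOUR)."""
    return ts.replace(minute=0, second=0, microsecond=0)


def hourly_window(run_time: datetime, hours: int = 2) -> tuple[datetime, datetime]:
    """Half-open window covering the last `hours` complete hours before run_time."""
    end = truncate_to_hour(run_time)
    return end - timedelta(hours=hours), end


def previous_run_date(run_time: datetime) -> date:
    """The day before @run_date, where @run_date is the UTC date of @run_time."""
    return run_time.astimezone(timezone.utc).date() - timedelta(days=1)


if __name__ == "__main__":
    rt = datetime(2024, 5, 1, 10, 5, tzinfo=timezone.utc)  # a run that fired 5 minutes late
    start, end = hourly_window(rt)
    print(start.isoformat(), end.isoformat())  # → 2024-05-01T08:00:00+00:00 2024-05-01T10:00:00+00:00
    print(previous_run_date(rt))               # → 2024-04-30
```

Truncating both bounds keeps every bucket a full hour. An untruncated lower bound such as `TIMESTAMP_SUB(@run_time, INTERVAL 2 HOUR)` would start mid-hour whenever a run fires after :00.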
Terraform Infrastructure as Code
# terraform.tf: Terraform config for BigQuery scheduled queries
# Factor III (Config) + Factor V (Build, Release, Run)

# main.tf
provider "google" {
  project = var.project_id
  region  = var.region
}

resource "google_bigquery_dataset" "analytics" {
  dataset_id = var.dataset_id
  location   = var.region
}

# Hypothetical account ID; the account also needs BigQuery roles
# (e.g. roles/bigquery.jobUser, roles/bigquery.dataEditor), not shown here.
resource "google_service_account" "bq_scheduler" {
  account_id   = "bq-scheduler"
  display_name = "BigQuery scheduled query runner"
}

resource "google_bigquery_data_transfer_config" "daily_metrics" {
  display_name           = "Daily User Metrics"
  location               = var.region
  data_source_id         = "scheduled_query"
  schedule               = "every day 02:00" # UTC
  destination_dataset_id = google_bigquery_dataset.analytics.dataset_id
  service_account_name   = google_service_account.bq_scheduler.email

  # MERGE is DML and names its own target table, so no
  # destination_table_name_template / write_disposition here.
  params = {
    query = templatefile("${path.module}/../queries/daily_user_metrics.sql", {
      project = var.project_id
      dataset = var.dataset_id
    })
  }

  email_preferences {
    enable_failure_email = true
  }
}

resource "google_bigquery_data_transfer_config" "hourly_kpi" {
  display_name           = "Hourly KPI Report"
  location               = var.region
  data_source_id         = "scheduled_query"
  schedule               = "every 1 hours"
  destination_dataset_id = google_bigquery_dataset.analytics.dataset_id
  service_account_name   = google_service_account.bq_scheduler.email

  params = {
    query = templatefile("${path.module}/../queries/hourly_kpi.sql", {
      project = var.project_id
      dataset = var.dataset_id
    })
  }
}

# variables.tf
variable "project_id" {
  description = "GCP Project ID"
  type        = string
}

variable "dataset_id" {
  description = "BigQuery Dataset ID"
  type        = string
  default     = "analytics"
}

variable "region" {
  description = "BigQuery region"
  type        = string
  default     = "asia-southeast1"
}

# environments/prod.tfvars
# project_id = "my-prod-project"
# dataset_id = "prod_analytics"

# environments/dev.tfvars
# project_id = "my-dev-project"
# dataset_id = "dev_analytics"
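Factor X suggests that every environment file should define the same variables, with only the values differing. The sketch below is a minimal check of that kind. It assumes the simple `key = "value"` lines used in the `.tfvars` files above and is not a full HCL parser. It reports which variable names each environment is missing compared with the others.

```python
import re

# Matches simple assignments like: project_id = "my-dev-project"
ASSIGNMENT = re.compile(r'^\s*([A-Za-z_][A-Za-z0-9_]*)\s*=\s*"([^"]*)"\s*$')


def parse_tfvars(text: str) -> dict[str, str]:
    """Parse flat string assignments; comments and any other lines are ignored."""
    values = {}
    for line in text.splitlines():
        match = ASSIGNMENT.match(line)
        if match:
            values[match.group(1)] = match.group(2)
    return values


def parity_report(envs: dict[str, str]) -> dict[str, set[str]]:
    """Map each environment to the variable names other environments define but it lacks."""
    parsed = {name: set(parse_tfvars(text)) for name, text in envs.items()}
    all_keys = set().union(*parsed.values())
    return {name: all_keys - keys for name, keys in parsed.items() if all_keys - keys}


if __name__ == "__main__":
    envs = {
        "dev": 'project_id = "my-dev-project"\ndataset_id = "dev_analytics"\n',
        "prod": 'project_id = "my-prod-project"\n',
    }
    print(parity_report(envs))  # → {'prod': {'dataset_id'}}
```

`dataset_id` has a default in variables.tf, so a missing key is not an error for Terraform. The check only flags drift for a human to review.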
Python Automation & Monitoring
# automation.py: Python management of BigQuery scheduled queries
import random


class BQAutomation:
    CODE = """
# bq_manager.py: manage BigQuery scheduled queries
# Requires: pip install google-cloud-bigquery google-cloud-bigquery-datatransfer
import itertools

from google.cloud import bigquery
from google.cloud import bigquery_datatransfer_v1

# Assumed on-demand list price in USD per TiB scanned; check the pricing page for your region.
PRICE_PER_TIB_USD = 6.25


class BQScheduledQueryManager:
    def __init__(self, project_id, location="asia-southeast1"):
        self.project_id = project_id
        self.location = location
        self.transfer_client = bigquery_datatransfer_v1.DataTransferServiceClient()
        self.bq_client = bigquery.Client(project=project_id)
        self.parent = f"projects/{project_id}/locations/{location}"

    def list_scheduled_queries(self):
        '''List transfer configs whose data source is "scheduled_query".'''
        configs = self.transfer_client.list_transfer_configs(
            request={"parent": self.parent, "data_source_ids": ["scheduled_query"]}
        )
        results = []
        for config in configs:
            results.append({
                "resource_name": config.name,  # full name, needed by get_run_history()
                "name": config.display_name,
                "schedule": config.schedule,
                "state": config.state.name,
                "next_run": str(config.next_run_time),
            })
        return results

    def get_run_history(self, config_name, limit=10):
        '''Return up to `limit` runs; config_name is the full transferConfigs resource name.'''
        runs = self.transfer_client.list_transfer_runs(request={"parent": config_name})
        results = []
        for run in itertools.islice(runs, limit):  # stop paging after `limit` runs
            results.append({
                "run_time": str(run.run_time),
                "state": run.state.name,
                "error": run.error_status.message or None,
            })
        return results

    def dry_run(self, sql, query_parameters=None):
        '''Dry-run a query to estimate bytes scanned and on-demand cost.'''
        job_config = bigquery.QueryJobConfig(
            dry_run=True,
            use_query_cache=False,
            query_parameters=query_parameters or [],  # e.g. a DATE param for @run_date
        )
        query_job = self.bq_client.query(sql, job_config=job_config)
        bytes_processed = query_job.total_bytes_processed
        cost_usd = bytes_processed / 2**40 * PRICE_PER_TIB_USD  # on-demand is billed per TiB
        return {
            "bytes_processed": bytes_processed,
            "gb_processed": round(bytes_processed / 1e9, 2),
            "estimated_cost_usd": round(cost_usd, 4),
        }

    def validate_sql(self, sql_file, query_parameters=None):
        '''Validate a SQL file via dry run; render ${...} placeholders before calling this.'''
        with open(sql_file) as f:
            sql = f.read()
        try:
            self.dry_run(sql, query_parameters)
            return {"valid": True, "file": sql_file}
        except Exception as e:
            return {"valid": False, "file": sql_file, "error": str(e)}


if __name__ == "__main__":
    manager = BQScheduledQueryManager("my-project")
    for q in manager.list_scheduled_queries():
        print(f"  [{q['state']}] {q['name']} | {q['schedule']}")
"""

    def show_code(self):
        print("=== BQ Manager ===")
        print(self.CODE)

    def cost_dashboard(self):
        # Simulated usage: the GiB-per-run figures are random, not measurements.
        print("\n=== Cost Dashboard (Monthly, simulated) ===")
        queries = [
            {"name": "Daily User Metrics", "runs": 30, "gib": random.uniform(10, 100)},
            {"name": "Hourly KPI", "runs": 720, "gib": random.uniform(5, 50)},
            {"name": "Weekly Cohort", "runs": 4, "gib": random.uniform(50, 200)},
            {"name": "Monthly Report", "runs": 1, "gib": random.uniform(100, 500)},
        ]
        price_per_tib = 6.25  # assumed on-demand USD per TiB
        total_cost = 0.0
        for q in queries:
            total_gib = q["gib"] * q["runs"]
            cost = total_gib / 1024 * price_per_tib
            total_cost += cost
            print(f"  {q['name']:<25} {q['runs']:>4} runs | {total_gib:>8.1f} GiB | ${cost:>8.2f}")
        print(f"  {'TOTAL':<25} {'':>9} | {'':>12} | ${total_cost:>8.2f}")


auto = BQAutomation()
auto.show_code()
auto.cost_dashboard()
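For monitoring (Factor XI), the dictionaries returned by `get_run_history()` can be reduced to a small health summary. The sketch below rests on two assumptions:

- `state` holds `TransferState` names such as `SUCCEEDED`, `FAILED`, `RUNNING`, and `CANCELLED`;
- the list is ordered newest first.

The sample history is made up, and `summarize_runs` and its alert threshold are hypothetical.

```python
from collections import Counter

IN_PROGRESS = {"PENDING", "RUNNING"}


def summarize_runs(runs: list[dict], max_consecutive_failures: int = 2) -> dict:
    """Summarize run dicts (assumed newest first) and flag a streak of recent failures."""
    states = Counter(run["state"] for run in runs)
    finished = states["SUCCEEDED"] + states["FAILED"]  # success rate excludes CANCELLED runs
    streak = 0
    for run in runs:  # count the most recent consecutive FAILED runs
        if run["state"] in IN_PROGRESS:
            continue  # a run still in flight neither breaks nor extends the streak
        if run["state"] != "FAILED":
            break
        streak += 1
    return {
        "total": len(runs),
        "success_rate": round(states["SUCCEEDED"] / finished, 3) if finished else None,
        "failure_streak": streak,
        "alert": streak >= max_consecutive_failures,
        "last_error": next((run["error"] for run in runs if run.get("error")), None),
    }


if __name__ == "__main__":
    sample = [  # hypothetical history, newest first
        {"state": "RUNNING", "error": None},
        {"state": "FAILED", "error": "example error"},
        {"state": "FAILED", "error": "example error"},
        {"state": "SUCCEEDED", "error": None},
    ]
    print(summarize_runs(sample))
```

The `alert` flag could feed whatever alerting channel the pipeline already uses. The enable_failure_email setting in the Terraform config only covers individual failures, not streaks.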
CI/CD Pipeline
# cicd.py: CI/CD for BigQuery scheduled queries


class CICD:
    GITHUB_ACTIONS = """
# .github/workflows/bq-deploy.yml
name: Deploy BigQuery Scheduled Queries

on:
  push:
    branches: [main]
    paths: ['queries/**', 'terraform/**']
  pull_request:
    paths: ['queries/**', 'terraform/**']

jobs:
  validate:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Setup Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: pip install google-cloud-bigquery sqlfluff
      - name: Lint SQL
        run: sqlfluff lint queries/ --dialect bigquery
      - name: Authenticate to Google Cloud
        uses: google-github-actions/auth@v2
        with:
          credentials_json: ${{ secrets.GCP_SA_KEY }}  # hypothetical secret name
      - name: Dry Run Queries
        run: python scripts/validate_queries.py queries/

  deploy:
    needs: validate
    if: github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Authenticate to Google Cloud
        uses: google-github-actions/auth@v2
        with:
          credentials_json: ${{ secrets.GCP_SA_KEY }}  # hypothetical secret name
      - name: Setup Terraform
        uses: hashicorp/setup-terraform@v3
      - name: Terraform Init
        run: terraform init
        working-directory: terraform/
      - name: Terraform Plan
        run: terraform plan -var-file=environments/prod.tfvars -out=tfplan
        working-directory: terraform/
      - name: Terraform Apply
        run: terraform apply tfplan
        working-directory: terraform/
"""

    PROJECT_STRUCTURE = """
bigquery-pipelines/
├── queries/
│   ├── daily_user_metrics.sql
│   ├── hourly_kpi.sql
│   └── weekly_cohort.sql
├── terraform/
│   ├── main.tf
│   ├── variables.tf
│   ├── outputs.tf
│   └── environments/
│       ├── dev.tfvars
│       ├── staging.tfvars
│       └── prod.tfvars
├── scripts/
│   ├── validate_queries.py
│   └── backfill.py
├── tests/
│   └── test_queries.py
├── .sqlfluff
└── .github/workflows/bq-deploy.yml
"""

    def show_pipeline(self):
        print("=== CI/CD Pipeline ===")
        print(self.GITHUB_ACTIONS)

    def show_structure(self):
        print("\n=== Project Structure ===")
        print(self.PROJECT_STRUCTURE)


cicd = CICD()
cicd.show_pipeline()
cicd.show_structure()
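The workflow calls `scripts/validate_queries.py`, which is not shown. The sketch below is a hypothetical version of that script's offline checks, which run before any dry run. It flags two problems in each `.sql` file:

- table paths that do not start with a `${project}` placeholder (Factor III);
- a statement that begins with plain `INSERT INTO`, which is a rerun hazard (Factor IX).

The rules are assumptions. A real script would also dry-run each rendered query, for example with a method like `validate_sql`.

```python
import re
import sys
from pathlib import Path

# Backtick-quoted table paths such as `${project}.${dataset}.raw_events`.
TABLE_REF = re.compile(r"`([^`]+\.[^`]+)`")
# A statement starting with INSERT INTO (MERGE's "INSERT (cols)" branch does not match).
INSERT_INTO = re.compile(r"^\s*INSERT\s+INTO\b", re.IGNORECASE | re.MULTILINE)


def check_sql(sql: str) -> list[str]:
    """Return the problems found in one query; an empty list means it passed."""
    problems = []
    for ref in TABLE_REF.findall(sql):
        if not ref.startswith("${project}."):
            problems.append(f"hardcoded table path: {ref}")
    if INSERT_INTO.search(sql):
        problems.append("plain INSERT INTO is not idempotent; prefer MERGE or CREATE OR REPLACE")
    return problems


def check_dir(query_dir: str) -> dict[str, list[str]]:
    """Check every .sql file under query_dir and map each failing path to its problems."""
    results = {}
    for path in sorted(Path(query_dir).rglob("*.sql")):
        problems = check_sql(path.read_text())
        if problems:
            results[str(path)] = problems
    return results


def main(argv: list[str]) -> int:
    failures = check_dir(argv[0])
    for file, problems in failures.items():
        for problem in problems:
            print(f"{file}: {problem}")
    return 1 if failures else 0  # non-zero exit fails the CI step


if __name__ == "__main__":
    if len(sys.argv) > 1:
        sys.exit(main(sys.argv[1:]))
    print("usage: validate_queries.py QUERY_DIR")
```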
FAQ - Frequently Asked Questions
Q: BigQuery Scheduled Queries or Cloud Composer (Airflow): which should I use?
A: Scheduled Queries are simple and have no scheduling fee (you pay only for the queries they run), which suits simple ETL and aggregations. Cloud Composer is more complex but supports dependencies, branching, retry logic, and multi-step pipelines, and you also pay for the Composer environment itself. Use Scheduled Queries for SQL-only jobs with simple schedules; use Composer for complex DAGs, cross-service orchestration, and conditional logic.
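Q: How much do BigQuery Scheduled Queries cost?
A: Scheduling itself is free; there is no scheduling fee. Queries are billed either on demand at $6.25 per TiB processed (US list price; other regions differ) or through capacity-based slot pricing (BigQuery editions, which replaced flat-rate). To cut costs, use partitioned tables, clustering, and materialized views. Results served from the query cache are free, but scheduled queries that write to a table (DML, DDL, or a destination table) do not use the cache. As a rough estimate, a daily query that scans 100 GiB costs about $0.61 per day, or roughly $18 per month, before the monthly free tier (the first 1 TiB of query processing).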
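The estimate in this answer is simple arithmetic, and a small helper makes the unit explicit: on-demand usage is billed per TiB (2^40 bytes). The $6.25 default mirrors the rate quoted in the answer; treat it as an assumption for regions outside the US. The optional `free_tib` argument subtracts a free allowance. Because that allowance is not per query, the conservative default is 0.

```python
TIB = 2**40  # bytes in one tebibyte


def monthly_on_demand_cost(bytes_per_run: float, runs_per_month: int,
                           price_per_tib: float = 6.25, free_tib: float = 0.0) -> float:
    """Estimated monthly on-demand cost in USD for one scheduled query."""
    billed_tib = max(bytes_per_run * runs_per_month / TIB - free_tib, 0.0)
    return billed_tib * price_per_tib


if __name__ == "__main__":
    daily = monthly_on_demand_cost(100 * 2**30, 30)  # 100 GiB per run, once a day for 30 days
    print(f"${daily:.2f} per month")  # → $18.31 per month
```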
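Q: Why do idempotent queries matter?
A: An idempotent query gives the same result no matter how many times it runs. That is essential for scheduled queries: failed runs can be retried without creating duplicate rows, and backfills become straightforward. Idempotent patterns include MERGE (upsert), CREATE OR REPLACE TABLE, and replacing one partition at a time, either with a query whose destination is a single partition (such as table$20240101) using WRITE_TRUNCATE, or with DELETE followed by INSERT inside a transaction. BigQuery has no INSERT OVERWRITE statement. Avoid plain INSERT INTO, which creates duplicates whenever it is rerun.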
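The difference is easy to see with an in-memory model. The sketch below is plain Python, not BigQuery:

- a dict keyed by `(date, user_id)` stands in for the target table;
- the hypothetical `merge_rows` mimics MERGE's update-or-insert on that key;
- the hypothetical `insert_rows` mimics a plain INSERT INTO.

Running each one twice on the same batch, as a retry would, shows which one duplicates rows.

```python
def merge_rows(table: dict, rows: list[dict], key=("date", "user_id")) -> None:
    """Upsert: replace the row if its key exists, insert it otherwise (like MERGE)."""
    for row in rows:
        table[tuple(row[k] for k in key)] = dict(row)


def insert_rows(table: list, rows: list[dict]) -> None:
    """Append-only, like INSERT INTO: every run adds the rows again."""
    table.extend(dict(row) for row in rows)


if __name__ == "__main__":
    batch = [
        {"date": "2024-05-01", "user_id": "u1", "total_events": 5},
        {"date": "2024-05-01", "user_id": "u2", "total_events": 3},
    ]
    merged, appended = {}, []
    for _ in range(2):  # simulate a retry of the same scheduled run
        merge_rows(merged, batch)
        insert_rows(appended, batch)
    print(len(merged), len(appended))  # → 2 4
```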
Q: Is the 12-Factor App a good fit for data pipelines?
A: Yes, very much so. The most useful factors are config in the environment (no hardcoded project IDs), codebase in Git, dev/prod parity (separate datasets), logs as event streams, and disposability (idempotent queries). Together they make pipelines maintainable, testable, and deployable across environments.
