BigQuery Scheduled Query 12 Factor App

What Is a BigQuery Scheduled Query 12-Factor App?

BigQuery is Google Cloud's serverless data warehouse. It runs SQL over datasets up to petabyte scale, typically returning results within seconds to minutes. Scheduled Queries run SQL automatically on a timer, for example for ETL jobs, reports, and aggregations. The Twelve-Factor App is a set of 12 design principles for cloud-native applications, originally published by engineers at Heroku. Combining the three gives you BigQuery data pipelines that are maintainable, scalable, and production-ready.

12-Factor Principles for Data Pipelines

# twelve_factor.py — 12-Factor principles for BigQuery pipelines

import json



class TwelveFactorBQ:

    FACTORS = {

        "1_codebase": {

            "name": "I. Codebase",

            "principle": "One codebase tracked in version control, many deploys",

            "bq_application": "SQL queries live in a Git repo; dev, staging, and prod share one codebase",

            "example": "queries/ folder in Git + CI/CD deploys the scheduled queries",

        },

        "2_dependencies": {

            "name": "II. Dependencies",

            "principle": "Explicitly declare and isolate dependencies",

            "bq_application": "Declare the datasets, tables, and UDFs each query needs; never hardcode project IDs",

            # Query parameters cannot stand in for table identifiers, so use template placeholders
            "example": "Use placeholders such as {project} and {dataset}, rendered at deploy time, instead of hardcoded values",

        },

        "3_config": {

            "name": "III. Config",

            "principle": "Store config in the environment",

            "bq_application": "Project ID, dataset names, and schedule come from environment variables, not from the SQL",

            "example": "Terraform variables for the scheduled query config",

        },

        "4_backing": {

            "name": "IV. Backing Services",

            "principle": "Treat backing services as attached resources",

            "bq_application": "BigQuery, Cloud Storage, and Pub/Sub are attached resources; swap them without changing code",

        },

        "5_build_release_run": {

            "name": "V. Build, Release, Run",

            "principle": "Strictly separate build and run stages",

            "bq_application": "Build: validate SQL + dry run → Release: deploy to BQ → Run: scheduled execution",

        },

        "6_processes": {

            "name": "VI. Processes",

            "principle": "Execute the app as stateless processes",

            "bq_application": "Scheduled queries are stateless: nothing is kept between runs, and tables hold the state",

        },

        "7_port_binding": {

            "name": "VII. Port Binding",

            "principle": "Export services via port binding",

            "bq_application": "Expose results through the BigQuery API, Connected Sheets, and Looker",

        },

        "8_concurrency": {

            "name": "VIII. Concurrency",

            "principle": "Scale out via the process model",

            "bq_application": "BigQuery scales automatically with no servers to manage; slots govern concurrency",

        },

        "9_disposability": {

            "name": "IX. Disposability",

            "principle": "Maximize robustness with fast startup and graceful shutdown",

            # BigQuery has no INSERT OVERWRITE statement; MERGE or CREATE OR REPLACE TABLE give the same effect
            "bq_application": "Queries are idempotent, so reruns are safe; use MERGE or CREATE OR REPLACE TABLE",

        },

        "10_dev_prod_parity": {

            "name": "X. Dev/Prod Parity",

            "principle": "Keep development, staging, and production as similar as possible",

            "bq_application": "Use separate datasets (dev_dataset, staging_dataset, prod_dataset) with the same SQL",

        },

        "11_logs": {

            "name": "XI. Logs",

            "principle": "Treat logs as event streams",

            "bq_application": "BigQuery audit logs → Cloud Logging → monitoring/alerting",

        },

        "12_admin": {

            "name": "XII. Admin Processes",

            "principle": "Run admin/management tasks as one-off processes",

            "bq_application": "Backfill queries and migration scripts run as one-off jobs, not as scheduled queries",

        },

    }



    def show_factors(self):

        print("=== 12-Factor for BigQuery ===\n")

        for factor in self.FACTORS.values():  # show all twelve factors, not only the first six

            print(f"[{factor['name']}]")

            print(f"  BQ: {factor['bq_application']}")

            print()



tf = TwelveFactorBQ()

tf.show_factors()
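
Factor III can be illustrated with a small helper that renders a SQL template from environment variables. This is a minimal sketch, not part of the BigQuery API: the variable names `BQ_PROJECT` and `BQ_DATASET` and the function `render_query` are assumptions made for the example, and the `{project}`/`{dataset}` placeholders follow the style used by the queries in this article.

```python
import os
import re

# Hypothetical environment variable names; use whatever your deploy tooling sets.
REQUIRED_VARS = {"project": "BQ_PROJECT", "dataset": "BQ_DATASET"}

# Conservative check (letters, digits, underscore, hyphen) so that arbitrary
# text cannot be injected into the SQL through the environment.
_SAFE_ID = re.compile(r"^[A-Za-z0-9_\-]+$")


def render_query(template: str, env=None) -> str:
    """Fill {project} and {dataset} placeholders from the environment."""
    env = os.environ if env is None else env
    values = {}
    for placeholder, var_name in REQUIRED_VARS.items():
        value = env.get(var_name)
        if not value:
            raise KeyError(f"missing environment variable: {var_name}")
        if not _SAFE_ID.match(value):
            raise ValueError(f"unsafe identifier in {var_name}: {value!r}")
        values[placeholder] = value
    # str.replace is used instead of str.format so other braces in the SQL are left alone.
    rendered = template
    for placeholder, value in values.items():
        rendered = rendered.replace("{" + placeholder + "}", value)
    return rendered


if __name__ == "__main__":
    sql = "SELECT COUNT(*) FROM `{project}.{dataset}.raw_events`"
    dev = {"BQ_PROJECT": "my-dev-project", "BQ_DATASET": "dev_analytics"}
    print(render_query(sql, dev))
```

The same SQL file can then be rendered for dev, staging, and prod by changing only the environment, which is also what Factor X asks for.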

Scheduled Query Setup

# scheduled_query.py — BigQuery Scheduled Query examples

import json



class ScheduledQueryExamples:

    DAILY_AGGREGATION = """

-- daily_user_metrics.sql — Scheduled daily at 02:00 UTC

-- Idempotent: MERGE pattern (Factor IX: Disposability)

MERGE `{project}.{dataset}.user_metrics_daily` AS target

USING (

    SELECT

        DATE(event_timestamp) AS date,

        user_id,

        COUNT(*) AS total_events,

        COUNT(DISTINCT session_id) AS sessions,

        SUM(CASE WHEN event_name = 'purchase' THEN 1 ELSE 0 END) AS purchases,

        SUM(CASE WHEN event_name = 'purchase' THEN revenue ELSE 0 END) AS revenue,

        MIN(event_timestamp) AS first_event,

        MAX(event_timestamp) AS last_event

    FROM `{project}.{dataset}.raw_events`

    WHERE DATE(event_timestamp) = DATE_SUB(@run_date, INTERVAL 1 DAY)  -- previous full day; a 02:00 UTC run sees only two hours of @run_date

    GROUP BY date, user_id

) AS source

ON target.date = source.date AND target.user_id = source.user_id

WHEN MATCHED THEN

    UPDATE SET

        total_events = source.total_events,

        sessions = source.sessions,

        purchases = source.purchases,

        revenue = source.revenue,

        first_event = source.first_event,

        last_event = source.last_event

WHEN NOT MATCHED THEN

    INSERT (date, user_id, total_events, sessions, purchases, revenue, first_event, last_event)

    VALUES (source.date, source.user_id, source.total_events, source.sessions,

            source.purchases, source.revenue, source.first_event, source.last_event);

"""



    HOURLY_REPORT = """

-- hourly_kpi_report.sql — Scheduled hourly

CREATE OR REPLACE TABLE `{project}.{dataset}.kpi_hourly`

PARTITION BY DATE(hour)

CLUSTER BY metric_name

AS

SELECT

    TIMESTAMP_TRUNC(event_timestamp, HOUR) AS hour,

    'active_users' AS metric_name,

    COUNT(DISTINCT user_id) AS metric_value

FROM `{project}.{dataset}.raw_events`

WHERE event_timestamp >= TIMESTAMP_SUB(TIMESTAMP_TRUNC(@run_time, HOUR), INTERVAL 2 HOUR)  -- align to the hour so no bucket is partial

    AND event_timestamp < TIMESTAMP_TRUNC(@run_time, HOUR)

GROUP BY hour



UNION ALL



SELECT

    TIMESTAMP_TRUNC(event_timestamp, HOUR),

    'revenue',

    SUM(revenue)

FROM `{project}.{dataset}.raw_events`

WHERE event_name = 'purchase'

    AND event_timestamp >= TIMESTAMP_SUB(TIMESTAMP_TRUNC(@run_time, HOUR), INTERVAL 2 HOUR)  -- align to the hour so no bucket is partial

    AND event_timestamp < TIMESTAMP_TRUNC(@run_time, HOUR)

GROUP BY 1;

"""



    def show_daily(self):

        print("=== Daily Aggregation (MERGE) ===")

        print(self.DAILY_AGGREGATION[:500])



    def show_hourly(self):

        print(f"\n=== Hourly KPI Report ===")

        print(self.HOURLY_REPORT[:400])



sq = ScheduledQueryExamples()

sq.show_daily()

sq.show_hourly()
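
Because the daily query is keyed on `@run_date`, a backfill amounts to replaying it once per missing date. The sketch below only generates the list of run dates. How each date is submitted (for example as a manual run of the scheduled query) is left out, and `backfill_dates` is a hypothetical helper name.

```python
from datetime import date, timedelta
from typing import Iterator


def backfill_dates(start: date, end: date) -> Iterator[date]:
    """Yield every date from start to end, inclusive, in ascending order."""
    if end < start:
        raise ValueError("end must not be earlier than start")
    current = start
    while current <= end:
        yield current
        current += timedelta(days=1)


if __name__ == "__main__":
    for run_date in backfill_dates(date(2024, 1, 30), date(2024, 2, 2)):
        # Each value would be supplied as @run_date for one idempotent rerun.
        print(run_date.isoformat())
```

This only works safely because the MERGE pattern is idempotent: replaying a date that was already processed rewrites the same rows instead of duplicating them.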

Terraform Infrastructure as Code

# terraform.tf — Terraform config for BigQuery scheduled queries

# Factor III (Config) + Factor V (Build, Release, Run)



# main.tf

resource "google_bigquery_data_transfer_config" "daily_metrics" {

  display_name           = "Daily User Metrics"

  location               = var.region

  data_source_id         = "scheduled_query"

  schedule               = "every day 02:00"

  # The query is a MERGE (DML) statement that names its own target table,
  # so no destination dataset, table template, or write disposition is set.

  params = {

    query = file("/queries/daily_user_metrics.sql")

  }



  service_account_name = google_service_account.bq_scheduler.email



  email_preferences {

    enable_failure_email = true

  }

}



resource "google_bigquery_data_transfer_config" "hourly_kpi" {

  display_name           = "Hourly KPI Report"

  location               = var.region

  data_source_id         = "scheduled_query"

  schedule               = "every 1 hours"

  destination_dataset_id = google_bigquery_dataset.analytics.dataset_id



  params = {

    query = templatefile("/queries/hourly_kpi.sql", {

      project = var.project_id

      dataset = var.dataset_id

    })

  }



  service_account_name = google_service_account.bq_scheduler.email

}



# variables.tf

variable "project_id" {

  description = "GCP Project ID"

  type        = string

}



variable "dataset_id" {

  description = "BigQuery Dataset ID"

  type        = string

  default     = "analytics"

}



variable "region" {

  description = "BigQuery region"

  type        = string

  default     = "asia-southeast1"

}



# environments/prod.tfvars

# project_id = "my-prod-project"

# dataset_id = "prod_analytics"



# environments/dev.tfvars

# project_id = "my-dev-project"

# dataset_id = "dev_analytics"

Python Automation & Monitoring

# automation.py — Python BQ scheduled query management

import json

import random



class BQAutomation:

    CODE = """

# bq_manager.py — Manage BigQuery scheduled queries

from google.cloud import bigquery_datatransfer_v1

from google.cloud import bigquery

import json



class BQScheduledQueryManager:

    def __init__(self, project_id, location='asia-southeast1'):

        self.project_id = project_id

        self.location = location

        self.transfer_client = bigquery_datatransfer_v1.DataTransferServiceClient()

        self.bq_client = bigquery.Client(project=project_id)

        self.parent = f"projects/{project_id}/locations/{location}"

    

    def list_scheduled_queries(self):

        '''List all scheduled queries'''

        configs = self.transfer_client.list_transfer_configs(

            request={"parent": self.parent}

        )

        results = []

        for config in configs:

            results.append({

                "name": config.display_name,

                "schedule": config.schedule,

                "state": config.state.name,

                "next_run": str(config.next_run_time),

            })

        return results

    

    def get_run_history(self, config_name, limit=10):

        '''Get run history for a scheduled query'''

        runs = self.transfer_client.list_transfer_runs(

            request={"parent": config_name}

        )

        results = []

        for run in list(runs)[:limit]:

            results.append({

                "run_time": str(run.run_time),

                "state": run.state.name,

                "error": run.error_status.message if run.error_status else None,

                "bytes_processed": run.params.get("bytes_processed"),

            })

        return results

    

    def dry_run(self, sql):

        '''Dry run a query to estimate cost'''

        job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)

        query_job = self.bq_client.query(sql, job_config=job_config)

        

        bytes_processed = query_job.total_bytes_processed

        cost_usd = (bytes_processed / 2**40) * 6.25  # on-demand is billed per TiB; $6.25 is the US list price and varies by region

        

        return {

            "bytes_processed": bytes_processed,

            "gb_processed": round(bytes_processed / 1e9, 2),

            "estimated_cost_usd": round(cost_usd, 4),

        }

    

    def validate_sql(self, sql_file):

        '''Validate SQL file syntax'''

        with open(sql_file) as f:

            sql = f.read()

        try:

            self.dry_run(sql)

            return {"valid": True, "file": sql_file}

        except Exception as e:

            return {"valid": False, "file": sql_file, "error": str(e)}



manager = BQScheduledQueryManager("my-project")

queries = manager.list_scheduled_queries()

for q in queries:

    print(f"  [{q['state']}] {q['name']} — {q['schedule']}")

"""



    def show_code(self):

        print("=== BQ Manager ===")

        print(self.CODE[:600])



    def cost_dashboard(self):

        print(f"\n=== Cost Dashboard (Monthly) ===")

        queries = [

            {"name": "Daily User Metrics", "runs": 30, "gb": random.uniform(10, 100), "cost": 0},

            {"name": "Hourly KPI", "runs": 720, "gb": random.uniform(5, 50), "cost": 0},

            {"name": "Weekly Cohort", "runs": 4, "gb": random.uniform(50, 200), "cost": 0},

            {"name": "Monthly Report", "runs": 1, "gb": random.uniform(100, 500), "cost": 0},

        ]

        total_cost = 0

        for q in queries:

            total_gb = q["gb"] * q["runs"]

            q["cost"] = total_gb * 6.25 / 1000

            total_cost += q["cost"]

            print(f"  {q['name']:<25} {q['runs']:>4} runs | {total_gb:>8.1f} GB | ${q['cost']:.2f}")

        print(f"  {'TOTAL':<25} {'':>4}      | {'':>8}    | ${total_cost:.2f}")



auto = BQAutomation()

auto.show_code()

auto.cost_dashboard()
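
The run history returned by a helper like `get_run_history` can feed a simple health check. The following sketch assumes each run is a dict with `state` and `error` keys, as in the code above, and that failed runs carry the state name `FAILED`. The function name `summarize_runs` and the alert threshold are made up for illustration.

```python
from collections import Counter


def summarize_runs(runs, max_failure_rate=0.2):
    """Count run states and flag the query when too many runs failed."""
    states = Counter(run["state"] for run in runs)
    total = sum(states.values())
    failed = states.get("FAILED", 0)
    failure_rate = failed / total if total else 0.0
    return {
        "total": total,
        "by_state": dict(states),
        "failure_rate": round(failure_rate, 3),
        "alert": failure_rate > max_failure_rate,
        # Keep the error messages so an alert can say what went wrong.
        "errors": [run["error"] for run in runs if run.get("error")],
    }


if __name__ == "__main__":
    history = [
        {"state": "SUCCEEDED", "error": None},
        {"state": "FAILED", "error": "Not found: Table raw_events"},
        {"state": "SUCCEEDED", "error": None},
    ]
    print(summarize_runs(history))
```

A summary like this could be written to a log line or a monitoring table, in keeping with Factor XI.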

CI/CD Pipeline

# cicd.py — CI/CD for BigQuery scheduled queries

import json



class CICD:

    GITHUB_ACTIONS = """

# .github/workflows/bq-deploy.yml

name: Deploy BigQuery Scheduled Queries

on:

  push:

    branches: [main]

    paths: ['queries/**', 'terraform/**']

  pull_request:

    paths: ['queries/**', 'terraform/**']



jobs:

  validate:

    runs-on: ubuntu-latest

    steps:

      - uses: actions/checkout@v4

      

      - name: Setup Python

        uses: actions/setup-python@v5

        with:

          python-version: '3.11'

      

      - name: Install dependencies

        run: pip install google-cloud-bigquery sqlfluff

      

      - name: Lint SQL

        run: sqlfluff lint queries/ --dialect bigquery

      

      - name: Dry Run Queries

        env:

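          GOOGLE_APPLICATION_CREDENTIALS: ${{ secrets.GCP_CREDENTIALS_FILE }}  # placeholder: path to a service-account key file; the secret name is hypothetical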

        run: python scripts/validate_queries.py queries/



  deploy:

    needs: validate

    if: github.ref == 'refs/heads/main'

    runs-on: ubuntu-latest

    steps:

      - uses: actions/checkout@v4

      

      - name: Setup Terraform

        uses: hashicorp/setup-terraform@v3

      

      - name: Terraform Init

        run: terraform init

        working-directory: terraform/

      

      - name: Terraform Plan

        run: terraform plan -var-file=environments/prod.tfvars

        working-directory: terraform/

      

      - name: Terraform Apply

        run: terraform apply -auto-approve -var-file=environments/prod.tfvars

        working-directory: terraform/

"""



    PROJECT_STRUCTURE = """

    bigquery-pipelines/

    ├── queries/

    │   ├── daily_user_metrics.sql

    │   ├── hourly_kpi.sql

    │   └── weekly_cohort.sql

    ├── terraform/

    │   ├── main.tf

    │   ├── variables.tf

    │   ├── outputs.tf

    │   └── environments/

    │       ├── dev.tfvars

    │       ├── staging.tfvars

    │       └── prod.tfvars

    ├── scripts/

    │   ├── validate_queries.py

    │   └── backfill.py

    ├── tests/

    │   └── test_queries.py

    ├── .sqlfluff

    └── .github/workflows/bq-deploy.yml

    """



    def show_pipeline(self):

        print("=== CI/CD Pipeline ===")

        print(self.GITHUB_ACTIONS[:500])



    def show_structure(self):

        print(f"\n=== Project Structure ===")

        print(self.PROJECT_STRUCTURE[:500])



cicd = CICD()

cicd.show_pipeline()

cicd.show_structure()
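
The workflow calls `scripts/validate_queries.py`, which is not shown in this article. One possible shape for it is sketched below under stated assumptions: every `.sql` file under the given folder is dry-run with the `google-cloud-bigquery` client, placeholders are assumed to be rendered already, and the check function is injectable so the file-walking logic can be tested without credentials. Queries that use `@run_date` or `@run_time` would also need those parameters supplied in the job config, which is omitted here.

```python
from pathlib import Path


def bigquery_dry_run(sql: str) -> int:
    """Dry-run a query and return the bytes it would process (needs credentials)."""
    from google.cloud import bigquery  # imported lazily so the rest works without the SDK

    client = bigquery.Client()
    job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
    return client.query(sql, job_config=job_config).total_bytes_processed


def validate_folder(folder, check=bigquery_dry_run):
    """Run `check` on every .sql file under `folder`; return (path, error) pairs."""
    failures = []
    for path in sorted(Path(folder).rglob("*.sql")):
        try:
            check(path.read_text(encoding="utf-8"))
        except Exception as exc:  # report each failing file instead of stopping at the first
            failures.append((str(path), str(exc)))
    return failures


def main(argv):
    """Return a process exit code: 0 if all queries pass, 1 on failures, 2 on bad usage."""
    if len(argv) != 2:
        print("usage: validate_queries.py <queries-folder>")
        return 2
    failures = validate_folder(argv[1])
    for path, error in failures:
        print(f"FAIL {path}: {error}")
    return 1 if failures else 0
```

As a script, this would end with `sys.exit(main(sys.argv))` under an `if __name__ == "__main__":` guard, so that a failing dry run fails the `validate` job and blocks the deploy.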

FAQ - Frequently Asked Questions

Q: BigQuery Scheduled Queries or Cloud Composer (Airflow): which should I use?

A: Scheduled Queries are simple and carry no scheduling fee (you pay only for the queries), which suits simple ETL and aggregation. Cloud Composer is more complex but handles task dependencies, branching, retry logic, and multi-step pipelines. Use Scheduled Queries for SQL-only jobs with simple scheduling; use Composer for complex DAGs, cross-service orchestration, and conditional logic.

Q: How much does a BigQuery Scheduled Query cost?

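A: Scheduling itself is free; there is no scheduling fee. Query cost is about $6.25 per TiB processed on-demand (US list price; check current regional pricing), or slot-based capacity pricing. To cut costs, use partitioned tables, clustering, and materialized views. Repeated queries served from the result cache are not billed. As a rough estimate, a daily query that scans 100 GB costs about $0.6 per day, or under $20 per month.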
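
The estimate above can be reproduced with a few lines of arithmetic. This is a sketch that assumes a flat per-TiB on-demand price and ignores the free tier, caching, and regional differences. The default of 6.25 is the figure quoted in this FAQ, and the function names are illustrative.

```python
TIB = 2**40  # bytes in one tebibyte


def on_demand_cost_usd(bytes_scanned: int, price_per_tib: float = 6.25) -> float:
    """Cost of a single query run at a flat per-TiB on-demand price."""
    return bytes_scanned / TIB * price_per_tib


def monthly_cost_usd(bytes_per_run: int, runs_per_month: int, price_per_tib: float = 6.25) -> float:
    """Cost of a scheduled query over a month, ignoring free tier and caching."""
    return on_demand_cost_usd(bytes_per_run, price_per_tib) * runs_per_month


if __name__ == "__main__":
    per_run = 100 * 2**30  # 100 GiB scanned on each daily run
    print(f"per run:   ${on_demand_cost_usd(per_run):.3f}")
    print(f"per month: ${monthly_cost_usd(per_run, 30):.2f}")
```

For 100 GiB per run and 30 runs this prints about $0.610 per run and $18.31 per month. An hourly query scanning the same volume would cost 24 times as much, which is why partition filters matter more as the schedule gets tighter.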

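Q: Why do idempotent queries matter?

A: Idempotent means that running a query any number of times produces the same result. This matters a great deal for scheduled queries because a run can be retried without creating duplicate data, and backfills become easy. Patterns: MERGE (upsert), CREATE OR REPLACE TABLE, or overwriting a single partition (for example with the WRITE_TRUNCATE write disposition). Avoid a plain INSERT INTO, which creates duplicates when rerun.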
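
The difference between an append and an upsert is easy to see in a local experiment. The sketch below uses Python's built-in `sqlite3` as a stand-in for BigQuery, an assumption made purely so the example runs anywhere. SQLite's `INSERT ... ON CONFLICT DO UPDATE` plays the role of `MERGE`, and the table and column names are illustrative.

```python
import sqlite3

ROWS = [("2024-01-01", "u1", 5), ("2024-01-01", "u2", 3)]


def run_append(conn):
    """Non-idempotent load: every run adds the rows again."""
    conn.executemany("INSERT INTO metrics_append VALUES (?, ?, ?)", ROWS)


def run_upsert(conn):
    """Idempotent load: rows are keyed on (date, user_id), like a MERGE."""
    conn.executemany(
        """
        INSERT INTO metrics_upsert (date, user_id, total_events)
        VALUES (?, ?, ?)
        ON CONFLICT (date, user_id)
        DO UPDATE SET total_events = excluded.total_events
        """,
        ROWS,
    )


def demo():
    """Run both loads twice and return (appended_rows, upserted_rows)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE metrics_append (date TEXT, user_id TEXT, total_events INTEGER)")
    conn.execute(
        "CREATE TABLE metrics_upsert (date TEXT, user_id TEXT, total_events INTEGER, "
        "PRIMARY KEY (date, user_id))"
    )
    for _ in range(2):  # simulate a retry of the same scheduled run
        run_append(conn)
        run_upsert(conn)
    appended = conn.execute("SELECT COUNT(*) FROM metrics_append").fetchone()[0]
    upserted = conn.execute("SELECT COUNT(*) FROM metrics_upsert").fetchone()[0]
    conn.close()
    return appended, upserted


if __name__ == "__main__":
    print(demo())
```

After two runs the append table holds four rows while the upsert table still holds two, which is the behavior a retried scheduled query needs.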

Q: Do the 12-Factor principles suit data pipelines?

A: Very well, especially: config in the environment (no hardcoded project IDs), the codebase in Git, dev/prod parity (separate datasets), logs as event streams, and disposability (idempotent queries). Together these make pipelines maintainable, testable, and deployable across environments.

XM Legend · Forex trader and instructor for 13 years

Founder of SiamCafe since 1997 · Forex trader for more than 13 years, recognized as an XM Legend · Shares knowledge on Forex, IT, AI, and trading from real experience in live markets