SiamCafe · Blog
BigQuery Scheduled Query Pub Sub Architecture —
บทความ

BigQuery Scheduled Query Pub Sub Architecture —

เผยแพร่ 28 พฤษภาคม 2569

BigQuery Scheduled Query Pub/Sub

BigQuery Scheduled Query Pub Sub Architecture —

BigQuery Scheduled Query Google Pub/Sub Data Pipeline Automation SQL Scheduling Event Notification Cloud Function Trigger Materialized View ETL Summary Table

FeatureScheduled QueryAirflowCloud Workflows
Complexityต่ำ (SQL only)สูง (Python DAG)ปานกลาง (YAML)
Multi-stepไม่ได้ได้ได้
Costฟรี (จ่ายแค่ Query)$200+/mo (Composer)ตาม Execution
Setup5 นาที30+ นาที15 นาที
Monitoringพื้นฐานดีมากดี

Scheduled Query Setup

=== BigQuery Scheduled Query ===

bq CLI — Create Scheduled Query

bq query \

--use_legacy_sql=false \

--destination_table=analytics.daily_summary \

--schedule="every 24 hours" \

--display_name="Daily Sales Summary" \

--replace=true \

'SELECT

DATE(order_date) as date,

COUNT(*) as total_orders,

SUM(amount) as total_revenue,

AVG(amount) as avg_order,

COUNT(DISTINCT customer_id) as unique_customers

FROM `project.sales.orders`

WHERE DATE(order_date) = @run_date

GROUP BY 1'

Terraform — Scheduled Query

resource "google_bigquery_data_transfer_config" "daily_summary" {

display_name = "Daily Sales Summary"

data_source_id = "scheduled_query"

schedule = "every 24 hours"

location = "US"

destination_dataset_id = google_bigquery_dataset.analytics.dataset_id

params = {

destination_table_name_template = "daily_summary"

write_disposition = "WRITE_TRUNCATE"

query = <<-SQL

SELECT DATE(order_date) as date,

COUNT(*) as orders,

SUM(amount) as revenue

FROM `project.sales.orders`

WHERE DATE(order_date) = @run_date

GROUP BY 1

SQL

}

service_account_name = google_service_account.bq_scheduler.email

}

from dataclasses import dataclass

from typing import List

@dataclass

class ScheduledQuery:

name: str

schedule: str

destination: str

status: str

last_run: str

duration: str

queries = [

ScheduledQuery("Daily Sales Summary", "Every day 02:00", "analytics.daily_summary", "Success", "2024-03-05 02:00", "45s"),

ScheduledQuery("Hourly Active Users", "Every hour", "analytics.hourly_users", "Success", "2024-03-05 10:00", "12s"),

ScheduledQuery("Weekly Cohort", "Every Monday 03:00", "analytics.weekly_cohort", "Success", "2024-03-04 03:00", "2m"),

ScheduledQuery("Monthly Revenue", "1st of month 04:00", "analytics.monthly_revenue", "Success", "2024-03-01 04:00", "5m"),

ScheduledQuery("Product Recommendations", "Every 6 hours", "ml.recommendations", "Failed", "2024-03-05 06:00", "—"),

]

print("=== Scheduled Queries ===")

for q in queries:

print(f" [{q.status}] {q.name}")

print(f" Schedule: {q.schedule} | Last: {q.last_run} | Duration: {q.duration}")

Pub/Sub Integration

=== Pub/Sub Event Pipeline ===

Create Topic & Subscription

gcloud pubsub topics create bq-query-complete

gcloud pubsub subscriptions create bq-notify \

--topic=bq-query-complete \

--push-endpoint=https://api.example.com/webhook/bq

Cloud Function — Triggered by Pub/Sub

import functions_framework

from google.cloud import bigquery

import json

import requests

@functions_framework.cloud_event

def on_query_complete(cloud_event):

data = json.loads(cloud_event.data["message"]["data"])

query_name = data.get("query_name")

status = data.get("status")

run_time = data.get("run_time")

if status == "SUCCEEDED":

# Trigger downstream actions

if query_name == "daily_summary":

generate_report(data)

notify_slack(f"Daily Summary updated: {run_time}")

invalidate_cache("daily-dashboard")

elif status == "FAILED":

alert_oncall(f"Query failed: {query_name}")

def notify_slack(message):

webhook = os.environ["SLACK_WEBHOOK"]

requests.post(webhook, json={"text": message})

Python — Publish Event after Query

from google.cloud import pubsub_v1

import json

publisher = pubsub_v1.PublisherClient()

topic_path = publisher.topic_path("project-id", "bq-query-complete")

def publish_query_event(query_name, status, rows_affected):

event = {

"query_name": query_name,

"status": status,

"rows_affected": rows_affected,

"timestamp": datetime.utcnow().isoformat(),

}

future = publisher.publish(

topic_path,

json.dumps(event).encode("utf-8"),

query_name=query_name,

)

return future.result()

@dataclass

class PipelineEvent:

event: str

source: str

action: str

latency: str

events = [

PipelineEvent("query.completed", "Scheduled Query", "Publish to Pub/Sub", "< 1s"),

PipelineEvent("report.generate", "Cloud Function", "Create PDF Report", "< 30s"),

PipelineEvent("slack.notify", "Cloud Function", "Send Slack Message", "< 2s"),

PipelineEvent("cache.invalidate", "Cloud Function", "Clear CDN Cache", "< 5s"),

PipelineEvent("ml.trigger", "Cloud Function", "Start ML Pipeline", "< 10s"),

PipelineEvent("dashboard.refresh", "Cloud Function", "Refresh Looker", "< 15s"),

]

print("\n=== Event Pipeline ===")

for e in events:

print(f" [{e.source}] {e.event}")

print(f" Action: {e.action} | Latency: {e.latency}")

Monitoring และ Error Handling

=== Monitoring & Alerting ===

Cloud Monitoring Alert

gcloud alpha monitoring policies create \

--display-name="BQ Scheduled Query Failed" \

--condition-filter='resource.type="bigquery_dts_config"

AND metric.type="bigquery.googleapis.com/transfer/run_count"

AND metric.label.completion_state="FAILED"' \

--notification-channels=projects/my-project/notificationChannels/123

Error Handling Pattern

def retry_failed_query(query_config_id, max_retries=3):

BigQuery Scheduled Query Pub Sub Architecture —

client = bigquery_datatransfer.DataTransferServiceClient()

for attempt in range(max_retries):

try:

response = client.start_manual_transfer_runs(

parent=query_config_id,

requested_run_time=timestamp_pb2.Timestamp(

seconds=int(time.time())

),

)

return response

except Exception as e:

if attempt < max_retries - 1:

time.sleep(60 * (attempt + 1))

else:

raise

monitoring = {

"Query Success Rate": "98.5% (last 30 days)",

"Avg Query Duration": "35 seconds",

"Failed Queries (24h)": "1",

"Pub/Sub Delivery Rate": "99.99%",

"Unacked Messages": "0",

"Dead Letter Messages": "2 (last 7 days)",

}

print("Monitoring Dashboard:")

for k, v in monitoring.items():

print(f" {k}: {v}")

Cost Optimization

costs = {

"Scheduled Query (daily)": "~$5/day (1TB scanned)",

"Pub/Sub (10K msgs/day)": "~$0.04/day",

"Cloud Functions (100 invocations)": "~$0.01/day",

"Total Monthly": "~$155/month",

}

print(f"\n\nCost Breakdown:")

for item, cost in costs.items():

print(f" {item}: {cost}")

เคล็ดลับ

  • Partition: ใช้ @run_date Parameter Query เฉพาะ Partition ที่ต้องการ
  • Dead Letter: ตั้ง Dead Letter Topic สำหรับ Message ที่ Process ไม่ได้
  • Retry: ตั้ง Retry Policy สำหรับ Subscription
  • Cost: ใช้ Scheduled Query แทน Airflow ถ้าเป็น SQL อย่างเดียว
  • Monitor: Alert ทุก Failed Query ไม่รอดู Console

การนำไปใช้งานจริงในองค์กร

สำหรับองค์กรขนาดกลางถึงใหญ่ แนะนำให้ใช้หลัก Three-Tier Architecture คือ Core Layer ที่เป็นแกนกลางของระบบ Distribution Layer ที่ทำหน้าที่กระจาย Traffic และ Access Layer ที่เชื่อมต่อกับผู้ใช้โดยตรง การแบ่ง Layer ชัดเจนช่วยให้การ Troubleshoot ง่ายขึ้นและสามารถ Scale ระบบได้ตามความต้องการ

เรื่อง Network Security ก็สำคัญไม่แพ้กัน ควรติดตั้ง Next-Generation Firewall ที่สามารถ Deep Packet Inspection ได้ ใช้ Network Segmentation แยก VLAN สำหรับแต่ละแผนก ติดตั้ง IDS/IPS เพื่อตรวจจับการโจมตี และทำ Regular Security Audit อย่างน้อยปีละ 2 ครั้ง

เปรียบเทียบข้อดีและข้อเสีย

ข้อดีข้อเสีย
ประสิทธิภาพสูง ทำงานได้เร็วและแม่นยำ ลดเวลาทำงานซ้ำซ้อนต้องใช้เวลาเรียนรู้เบื้องต้นพอสมควร มี Learning Curve สูง
มี Community ขนาดใหญ่ มีคนช่วยเหลือและแหล่งเรียนรู้มากมายบางฟีเจอร์อาจยังไม่เสถียร หรือมีการเปลี่ยนแปลงบ่อยในเวอร์ชันใหม่
รองรับ Integration กับเครื่องมือและบริการอื่นได้หลากหลายต้นทุนอาจสูงสำหรับ Enterprise License หรือ Cloud Service
เป็น Open Source หรือมีเวอร์ชันฟรีให้เริ่มต้นใช้งานต้องการ Hardware หรือ Infrastructure ที่เพียงพอ

จากตารางเปรียบเทียบจะเห็นว่าข้อดีมีมากกว่าข้อเสียอย่างชัดเจน โดยเฉพาะในแง่ของประสิทธิภาพและความสามารถในการ Scale สำหรับข้อเสียส่วนใหญ่สามารถแก้ไขได้ด้วยการเรียนรู้อย่างเป็นระบบและวางแผนทรัพยากรให้เหมาะสม

BigQuery Scheduled Query คืออะไร

รัน SQL อัตโนมัติตามเวลา Materialized View Summary Table ETL Console CLI Parameter @run_date Service Account

Google Pub/Sub คืออะไร

Managed Messaging Publisher Topic Subscriber Push Pull At-least-once Ordering Retention Dead Letter Scale อัตโนมัติ

BigQuery กับ Pub/Sub ใช้ร่วมกันอย่างไร

Query เสร็จ Publish Event Subscriber Cloud Function Report Slack Dashboard Cache ML Pipeline Event-Driven

Scheduled Query กับ Airflow ต่างกันอย่างไร

Scheduled Query SQL ง่าย ฟรี ตั้ง 5 นาที Airflow Complex Pipeline DAG Dependency Retry ใช้ร่วมกันได้

สรุป

BigQuery Scheduled Query Google Pub/Sub Data Pipeline SQL Automation Cloud Function Event-Driven Report Dashboard Monitoring Dead Letter Retry Cost Terraform Airflow