BigQuery Scheduled Query Pub/Sub
BigQuery Scheduled Query Google Pub/Sub Data Pipeline Automation SQL Scheduling Event Notification Cloud Function Trigger Materialized View ETL Summary Table
| Feature | Scheduled Query | Airflow | Cloud Workflows |
|---|---|---|---|
| Complexity | ต่ำ (SQL only) | สูง (Python DAG) | ปานกลาง (YAML) |
| Multi-step | ไม่ได้ | ได้ | ได้ |
| Cost | ฟรี (จ่ายแค่ Query) | $200+/mo (Composer) | ตาม Execution |
| Setup | 5 นาที | 30+ นาที | 15 นาที |
| Monitoring | พื้นฐาน | ดีมาก | ดี |
Scheduled Query Setup
# === BigQuery Scheduled Query ===
# bq CLI — Create Scheduled Query
# bq query \
# --use_legacy_sql=false \
# --destination_table=analytics.daily_summary \
# --schedule="every 24 hours" \
# --display_name="Daily Sales Summary" \
# --replace=true \
# 'SELECT
# DATE(order_date) as date,
# COUNT(*) as total_orders,
# SUM(amount) as total_revenue,
# AVG(amount) as avg_order,
# COUNT(DISTINCT customer_id) as unique_customers
# FROM `project.sales.orders`
# WHERE DATE(order_date) = @run_date
# GROUP BY 1'
# Terraform — Scheduled Query
# resource "google_bigquery_data_transfer_config" "daily_summary" {
# display_name = "Daily Sales Summary"
# data_source_id = "scheduled_query"
# schedule = "every 24 hours"
# location = "US"
#
# destination_dataset_id = google_bigquery_dataset.analytics.dataset_id
#
# params = {
# destination_table_name_template = "daily_summary"
# write_disposition = "WRITE_TRUNCATE"
# query = <<-SQL
# SELECT DATE(order_date) as date,
# COUNT(*) as orders,
# SUM(amount) as revenue
# FROM `project.sales.orders`
# WHERE DATE(order_date) = @run_date
# GROUP BY 1
# SQL
# }
#
# service_account_name = google_service_account.bq_scheduler.email
# }
from dataclasses import dataclass
from typing import List
@dataclass
class ScheduledQuery:
name: str
schedule: str
destination: str
status: str
last_run: str
duration: str
queries = [
ScheduledQuery("Daily Sales Summary", "Every day 02:00", "analytics.daily_summary", "Success", "2024-03-05 02:00", "45s"),
ScheduledQuery("Hourly Active Users", "Every hour", "analytics.hourly_users", "Success", "2024-03-05 10:00", "12s"),
ScheduledQuery("Weekly Cohort", "Every Monday 03:00", "analytics.weekly_cohort", "Success", "2024-03-04 03:00", "2m"),
ScheduledQuery("Monthly Revenue", "1st of month 04:00", "analytics.monthly_revenue", "Success", "2024-03-01 04:00", "5m"),
ScheduledQuery("Product Recommendations", "Every 6 hours", "ml.recommendations", "Failed", "2024-03-05 06:00", "—"),
]
print("=== Scheduled Queries ===")
for q in queries:
print(f" [{q.status}] {q.name}")
print(f" Schedule: {q.schedule} | Last: {q.last_run} | Duration: {q.duration}")
Pub/Sub Integration
# === Pub/Sub Event Pipeline ===
# Create Topic & Subscription
# gcloud pubsub topics create bq-query-complete
# gcloud pubsub subscriptions create bq-notify \
# --topic=bq-query-complete \
# --push-endpoint=https://api.example.com/webhook/bq
# Cloud Function — Triggered by Pub/Sub
# import functions_framework
# from google.cloud import bigquery
# import json
# import requests
#
# @functions_framework.cloud_event
# def on_query_complete(cloud_event):
# data = json.loads(cloud_event.data["message"]["data"])
# query_name = data.get("query_name")
# status = data.get("status")
# run_time = data.get("run_time")
#
# if status == "SUCCEEDED":
# # Trigger downstream actions
# if query_name == "daily_summary":
# generate_report(data)
# notify_slack(f"Daily Summary updated: {run_time}")
# invalidate_cache("daily-dashboard")
#
# elif status == "FAILED":
# alert_oncall(f"Query failed: {query_name}")
#
# def notify_slack(message):
# webhook = os.environ["SLACK_WEBHOOK"]
# requests.post(webhook, json={"text": message})
# Python — Publish Event after Query
# from google.cloud import pubsub_v1
# import json
#
# publisher = pubsub_v1.PublisherClient()
# topic_path = publisher.topic_path("project-id", "bq-query-complete")
#
# def publish_query_event(query_name, status, rows_affected):
# event = {
# "query_name": query_name,
# "status": status,
# "rows_affected": rows_affected,
# "timestamp": datetime.utcnow().isoformat(),
# }
# future = publisher.publish(
# topic_path,
# json.dumps(event).encode("utf-8"),
# query_name=query_name,
# )
# return future.result()
@dataclass
class PipelineEvent:
event: str
source: str
action: str
latency: str
events = [
PipelineEvent("query.completed", "Scheduled Query", "Publish to Pub/Sub", "< 1s"),
PipelineEvent("report.generate", "Cloud Function", "Create PDF Report", "< 30s"),
PipelineEvent("slack.notify", "Cloud Function", "Send Slack Message", "< 2s"),
PipelineEvent("cache.invalidate", "Cloud Function", "Clear CDN Cache", "< 5s"),
PipelineEvent("ml.trigger", "Cloud Function", "Start ML Pipeline", "< 10s"),
PipelineEvent("dashboard.refresh", "Cloud Function", "Refresh Looker", "< 15s"),
]
print("\n=== Event Pipeline ===")
for e in events:
print(f" [{e.source}] {e.event}")
print(f" Action: {e.action} | Latency: {e.latency}")
Monitoring และ Error Handling
# === Monitoring & Alerting ===
# Cloud Monitoring Alert
# gcloud alpha monitoring policies create \
# --display-name="BQ Scheduled Query Failed" \
# --condition-filter='resource.type="bigquery_dts_config"
# AND metric.type="bigquery.googleapis.com/transfer/run_count"
# AND metric.label.completion_state="FAILED"' \
# --notification-channels=projects/my-project/notificationChannels/123
# Error Handling Pattern
# def retry_failed_query(query_config_id, max_retries=3):
# client = bigquery_datatransfer.DataTransferServiceClient()
# for attempt in range(max_retries):
# try:
# response = client.start_manual_transfer_runs(
# parent=query_config_id,
# requested_run_time=timestamp_pb2.Timestamp(
# seconds=int(time.time())
# ),
# )
# return response
# except Exception as e:
# if attempt < max_retries - 1:
# time.sleep(60 * (attempt + 1))
# else:
# raise
monitoring = {
"Query Success Rate": "98.5% (last 30 days)",
"Avg Query Duration": "35 seconds",
"Failed Queries (24h)": "1",
"Pub/Sub Delivery Rate": "99.99%",
"Unacked Messages": "0",
"Dead Letter Messages": "2 (last 7 days)",
}
print("Monitoring Dashboard:")
for k, v in monitoring.items():
print(f" {k}: {v}")
# Cost Optimization
costs = {
"Scheduled Query (daily)": "~$5/day (1TB scanned)",
"Pub/Sub (10K msgs/day)": "~$0.04/day",
"Cloud Functions (100 invocations)": "~$0.01/day",
"Total Monthly": "~$155/month",
}
print(f"\n\nCost Breakdown:")
for item, cost in costs.items():
print(f" {item}: {cost}")
เคล็ดลับ
- Partition: ใช้ @run_date Parameter Query เฉพาะ Partition ที่ต้องการ
- Dead Letter: ตั้ง Dead Letter Topic สำหรับ Message ที่ Process ไม่ได้
- Retry: ตั้ง Retry Policy สำหรับ Subscription
- Cost: ใช้ Scheduled Query แทน Airflow ถ้าเป็น SQL อย่างเดียว
- Monitor: Alert ทุก Failed Query ไม่รอดู Console
การนำไปใช้งานจริงในองค์กร
สำหรับองค์กรขนาดกลางถึงใหญ่ แนะนำให้ใช้หลัก Three-Tier Architecture คือ Core Layer ที่เป็นแกนกลางของระบบ Distribution Layer ที่ทำหน้าที่กระจาย Traffic และ Access Layer ที่เชื่อมต่อกับผู้ใช้โดยตรง การแบ่ง Layer ชัดเจนช่วยให้การ Troubleshoot ง่ายขึ้นและสามารถ Scale ระบบได้ตามความต้องการ
เรื่อง Network Security ก็สำคัญไม่แพ้กัน ควรติดตั้ง Next-Generation Firewall ที่สามารถ Deep Packet Inspection ได้ ใช้ Network Segmentation แยก VLAN สำหรับแต่ละแผนก ติดตั้ง IDS/IPS เพื่อตรวจจับการโจมตี และทำ Regular Security Audit อย่างน้อยปีละ 2 ครั้ง
เปรียบเทียบข้อดีและข้อเสีย
จากตารางเปรียบเทียบจะเห็นว่าข้อดีมีมากกว่าข้อเสียอย่างชัดเจน โดยเฉพาะในแง่ของประสิทธิภาพและความสามารถในการ Scale สำหรับข้อเสียส่วนใหญ่สามารถแก้ไขได้ด้วยการเรียนรู้อย่างเป็นระบบและวางแผนทรัพยากรให้เหมาะสม
BigQuery Scheduled Query คืออะไร
รัน SQL อัตโนมัติตามเวลา Materialized View Summary Table ETL Console CLI Parameter @run_date Service Account
Google Pub/Sub คืออะไร
Managed Messaging Publisher Topic Subscriber Push Pull At-least-once Ordering Retention Dead Letter Scale อัตโนมัติ
BigQuery กับ Pub/Sub ใช้ร่วมกันอย่างไร
Query เสร็จ Publish Event Subscriber Cloud Function Report Slack Dashboard Cache ML Pipeline Event-Driven
Scheduled Query กับ Airflow ต่างกันอย่างไร
Scheduled Query SQL ง่าย ฟรี ตั้ง 5 นาที Airflow Complex Pipeline DAG Dependency Retry ใช้ร่วมกันได้
สรุป
BigQuery Scheduled Query Google Pub/Sub Data Pipeline SQL Automation Cloud Function Event-Driven Report Dashboard Monitoring Dead Letter Retry Cost Terraform Airflow
