Technology

BigQuery Scheduled Query Pub Sub Architecture

bigquery scheduled query pub sub architecture
BigQuery Scheduled Query Pub Sub Architecture | SiamCafe Blog
2026-03-15· อ. บอม — SiamCafe.net· 9,423 คำ

BigQuery Scheduled Query Pub/Sub

BigQuery Scheduled Query Google Pub/Sub Data Pipeline Automation SQL Scheduling Event Notification Cloud Function Trigger Materialized View ETL Summary Table

FeatureScheduled QueryAirflowCloud Workflows
Complexityต่ำ (SQL only)สูง (Python DAG)ปานกลาง (YAML)
Multi-stepไม่ได้ได้ได้
Costฟรี (จ่ายแค่ Query)$200+/mo (Composer)ตาม Execution
Setup5 นาที30+ นาที15 นาที
Monitoringพื้นฐานดีมากดี

Scheduled Query Setup

# === BigQuery Scheduled Query ===

# bq CLI — Create Scheduled Query
# bq query \
#   --use_legacy_sql=false \
#   --destination_table=analytics.daily_summary \
#   --schedule="every 24 hours" \
#   --display_name="Daily Sales Summary" \
#   --replace=true \
#   'SELECT
#     DATE(order_date) as date,
#     COUNT(*) as total_orders,
#     SUM(amount) as total_revenue,
#     AVG(amount) as avg_order,
#     COUNT(DISTINCT customer_id) as unique_customers
#   FROM `project.sales.orders`
#   WHERE DATE(order_date) = @run_date
#   GROUP BY 1'

# Terraform — Scheduled Query
# resource "google_bigquery_data_transfer_config" "daily_summary" {
#   display_name   = "Daily Sales Summary"
#   data_source_id = "scheduled_query"
#   schedule       = "every 24 hours"
#   location       = "US"
#
#   destination_dataset_id = google_bigquery_dataset.analytics.dataset_id
#
#   params = {
#     destination_table_name_template = "daily_summary"
#     write_disposition               = "WRITE_TRUNCATE"
#     query                          = <<-SQL
#       SELECT DATE(order_date) as date,
#              COUNT(*) as orders,
#              SUM(amount) as revenue
#       FROM `project.sales.orders`
#       WHERE DATE(order_date) = @run_date
#       GROUP BY 1
#     SQL
#   }
#
#   service_account_name = google_service_account.bq_scheduler.email
# }

from dataclasses import dataclass
from typing import List

@dataclass
class ScheduledQuery:
    name: str
    schedule: str
    destination: str
    status: str
    last_run: str
    duration: str

queries = [
    ScheduledQuery("Daily Sales Summary", "Every day 02:00", "analytics.daily_summary", "Success", "2024-03-05 02:00", "45s"),
    ScheduledQuery("Hourly Active Users", "Every hour", "analytics.hourly_users", "Success", "2024-03-05 10:00", "12s"),
    ScheduledQuery("Weekly Cohort", "Every Monday 03:00", "analytics.weekly_cohort", "Success", "2024-03-04 03:00", "2m"),
    ScheduledQuery("Monthly Revenue", "1st of month 04:00", "analytics.monthly_revenue", "Success", "2024-03-01 04:00", "5m"),
    ScheduledQuery("Product Recommendations", "Every 6 hours", "ml.recommendations", "Failed", "2024-03-05 06:00", "—"),
]

print("=== Scheduled Queries ===")
for q in queries:
    print(f"  [{q.status}] {q.name}")
    print(f"    Schedule: {q.schedule} | Last: {q.last_run} | Duration: {q.duration}")

Pub/Sub Integration

# === Pub/Sub Event Pipeline ===

# Create Topic & Subscription
# gcloud pubsub topics create bq-query-complete
# gcloud pubsub subscriptions create bq-notify \
#   --topic=bq-query-complete \
#   --push-endpoint=https://api.example.com/webhook/bq

# Cloud Function — Triggered by Pub/Sub
# import functions_framework
# from google.cloud import bigquery
# import json
# import requests
#
# @functions_framework.cloud_event
# def on_query_complete(cloud_event):
#     data = json.loads(cloud_event.data["message"]["data"])
#     query_name = data.get("query_name")
#     status = data.get("status")
#     run_time = data.get("run_time")
#
#     if status == "SUCCEEDED":
#         # Trigger downstream actions
#         if query_name == "daily_summary":
#             generate_report(data)
#             notify_slack(f"Daily Summary updated: {run_time}")
#             invalidate_cache("daily-dashboard")
#
#     elif status == "FAILED":
#         alert_oncall(f"Query failed: {query_name}")
#
# def notify_slack(message):
#     webhook = os.environ["SLACK_WEBHOOK"]
#     requests.post(webhook, json={"text": message})

# Python — Publish Event after Query
# from google.cloud import pubsub_v1
# import json
#
# publisher = pubsub_v1.PublisherClient()
# topic_path = publisher.topic_path("project-id", "bq-query-complete")
#
# def publish_query_event(query_name, status, rows_affected):
#     event = {
#         "query_name": query_name,
#         "status": status,
#         "rows_affected": rows_affected,
#         "timestamp": datetime.utcnow().isoformat(),
#     }
#     future = publisher.publish(
#         topic_path,
#         json.dumps(event).encode("utf-8"),
#         query_name=query_name,
#     )
#     return future.result()

@dataclass
class PipelineEvent:
    event: str
    source: str
    action: str
    latency: str

events = [
    PipelineEvent("query.completed", "Scheduled Query", "Publish to Pub/Sub", "< 1s"),
    PipelineEvent("report.generate", "Cloud Function", "Create PDF Report", "< 30s"),
    PipelineEvent("slack.notify", "Cloud Function", "Send Slack Message", "< 2s"),
    PipelineEvent("cache.invalidate", "Cloud Function", "Clear CDN Cache", "< 5s"),
    PipelineEvent("ml.trigger", "Cloud Function", "Start ML Pipeline", "< 10s"),
    PipelineEvent("dashboard.refresh", "Cloud Function", "Refresh Looker", "< 15s"),
]

print("\n=== Event Pipeline ===")
for e in events:
    print(f"  [{e.source}] {e.event}")
    print(f"    Action: {e.action} | Latency: {e.latency}")

Monitoring และ Error Handling

# === Monitoring & Alerting ===

# Cloud Monitoring Alert
# gcloud alpha monitoring policies create \
#   --display-name="BQ Scheduled Query Failed" \
#   --condition-filter='resource.type="bigquery_dts_config"
#     AND metric.type="bigquery.googleapis.com/transfer/run_count"
#     AND metric.label.completion_state="FAILED"' \
#   --notification-channels=projects/my-project/notificationChannels/123

# Error Handling Pattern
# def retry_failed_query(query_config_id, max_retries=3):
#     client = bigquery_datatransfer.DataTransferServiceClient()
#     for attempt in range(max_retries):
#         try:
#             response = client.start_manual_transfer_runs(
#                 parent=query_config_id,
#                 requested_run_time=timestamp_pb2.Timestamp(
#                     seconds=int(time.time())
#                 ),
#             )
#             return response
#         except Exception as e:
#             if attempt < max_retries - 1:
#                 time.sleep(60 * (attempt + 1))
#             else:
#                 raise

monitoring = {
    "Query Success Rate": "98.5% (last 30 days)",
    "Avg Query Duration": "35 seconds",
    "Failed Queries (24h)": "1",
    "Pub/Sub Delivery Rate": "99.99%",
    "Unacked Messages": "0",
    "Dead Letter Messages": "2 (last 7 days)",
}

print("Monitoring Dashboard:")
for k, v in monitoring.items():
    print(f"  {k}: {v}")

# Cost Optimization
costs = {
    "Scheduled Query (daily)": "~$5/day (1TB scanned)",
    "Pub/Sub (10K msgs/day)": "~$0.04/day",
    "Cloud Functions (100 invocations)": "~$0.01/day",
    "Total Monthly": "~$155/month",
}

print(f"\n\nCost Breakdown:")
for item, cost in costs.items():
    print(f"  {item}: {cost}")

เคล็ดลับ

การนำไปใช้งานจริงในองค์กร

สำหรับองค์กรขนาดกลางถึงใหญ่ แนะนำให้ใช้หลัก Three-Tier Architecture คือ Core Layer ที่เป็นแกนกลางของระบบ Distribution Layer ที่ทำหน้าที่กระจาย Traffic และ Access Layer ที่เชื่อมต่อกับผู้ใช้โดยตรง การแบ่ง Layer ชัดเจนช่วยให้การ Troubleshoot ง่ายขึ้นและสามารถ Scale ระบบได้ตามความต้องการ

เรื่อง Network Security ก็สำคัญไม่แพ้กัน ควรติดตั้ง Next-Generation Firewall ที่สามารถ Deep Packet Inspection ได้ ใช้ Network Segmentation แยก VLAN สำหรับแต่ละแผนก ติดตั้ง IDS/IPS เพื่อตรวจจับการโจมตี และทำ Regular Security Audit อย่างน้อยปีละ 2 ครั้ง

เปรียบเทียบข้อดีและข้อเสีย

ข้อดีข้อเสีย
ประสิทธิภาพสูง ทำงานได้เร็วและแม่นยำ ลดเวลาทำงานซ้ำซ้อนต้องใช้เวลาเรียนรู้เบื้องต้นพอสมควร มี Learning Curve สูง
มี Community ขนาดใหญ่ มีคนช่วยเหลือและแหล่งเรียนรู้มากมายบางฟีเจอร์อาจยังไม่เสถียร หรือมีการเปลี่ยนแปลงบ่อยในเวอร์ชันใหม่
รองรับ Integration กับเครื่องมือและบริการอื่นได้หลากหลายต้นทุนอาจสูงสำหรับ Enterprise License หรือ Cloud Service
เป็น Open Source หรือมีเวอร์ชันฟรีให้เริ่มต้นใช้งานต้องการ Hardware หรือ Infrastructure ที่เพียงพอ

จากตารางเปรียบเทียบจะเห็นว่าข้อดีมีมากกว่าข้อเสียอย่างชัดเจน โดยเฉพาะในแง่ของประสิทธิภาพและความสามารถในการ Scale สำหรับข้อเสียส่วนใหญ่สามารถแก้ไขได้ด้วยการเรียนรู้อย่างเป็นระบบและวางแผนทรัพยากรให้เหมาะสม

BigQuery Scheduled Query คืออะไร

รัน SQL อัตโนมัติตามเวลา Materialized View Summary Table ETL Console CLI Parameter @run_date Service Account

Google Pub/Sub คืออะไร

Managed Messaging Publisher Topic Subscriber Push Pull At-least-once Ordering Retention Dead Letter Scale อัตโนมัติ

BigQuery กับ Pub/Sub ใช้ร่วมกันอย่างไร

Query เสร็จ Publish Event Subscriber Cloud Function Report Slack Dashboard Cache ML Pipeline Event-Driven

Scheduled Query กับ Airflow ต่างกันอย่างไร

Scheduled Query SQL ง่าย ฟรี ตั้ง 5 นาที Airflow Complex Pipeline DAG Dependency Retry ใช้ร่วมกันได้

สรุป

BigQuery Scheduled Query Google Pub/Sub Data Pipeline SQL Automation Cloud Function Event-Driven Report Dashboard Monitoring Dead Letter Retry Cost Terraform Airflow

📖 บทความที่เกี่ยวข้อง

BigQuery Scheduled Query Zero Downtime Deploymentอ่านบทความ → BigQuery Scheduled Query Distributed Systemอ่านบทความ → BigQuery Scheduled Query Technical Debt Managementอ่านบทความ → BigQuery Scheduled Query SaaS Architectureอ่านบทความ → BigQuery Scheduled Query Certification Pathอ่านบทความ →

📚 ดูบทความทั้งหมด →