
BigQuery Scheduled Query Micro-segmentation

2025-12-26 · อ. บอม — SiamCafe.net · 8,642 words

BigQuery Segmentation

This guide covers customer micro-segmentation with BigQuery scheduled queries: RFM analysis in SQL, automated segment refresh, dashboard and CRM integration, and running personalized marketing campaigns as a production pipeline.

Segment   | RFM Score | Description                          | Size   | Strategy
Champion  | 544-555   | Buys often, recently, spends a lot   | 5-10%  | Loyalty Reward VIP
Loyal     | 434-455   | Buys often, good spend               | 10-15% | Upsell Cross-sell
Potential | 334-345   | Moderate buyer, room to grow         | 15-20% | Engagement Campaign
At Risk   | 244-255   | Used to buy often, has gone quiet    | 10-15% | Win-back Offer
Lost      | 111-155   | No purchase in a long time, low spend| 20-30% | Re-activation

RFM Analysis SQL

-- === BigQuery RFM Analysis ===

-- Step 1: Calculate RFM metrics over the last 12 months
WITH rfm_base AS (
  SELECT
    customer_id,
    DATE_DIFF(CURRENT_DATE(), MAX(order_date), DAY) AS recency_days,
    COUNT(DISTINCT order_id) AS frequency,
    SUM(total_amount) AS monetary
  FROM `project.dataset.orders`
  WHERE order_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 365 DAY)
    AND status = 'completed'
  GROUP BY customer_id
),

-- Step 2: Assign RFM scores (1-5); lower recency is better, so rank it descending
rfm_scored AS (
  SELECT
    customer_id,
    recency_days,
    frequency,
    monetary,
    NTILE(5) OVER (ORDER BY recency_days DESC) AS r_score,
    NTILE(5) OVER (ORDER BY frequency) AS f_score,
    NTILE(5) OVER (ORDER BY monetary) AS m_score
  FROM rfm_base
),

-- Step 3: Map score combinations to segments
rfm_segments AS (
  SELECT
    *,
    CONCAT(CAST(r_score AS STRING), CAST(f_score AS STRING), CAST(m_score AS STRING)) AS rfm_score,
    CASE
      WHEN r_score >= 4 AND f_score >= 4 AND m_score >= 4 THEN 'Champion'
      WHEN r_score >= 3 AND f_score >= 3 AND m_score >= 3 THEN 'Loyal'
      WHEN r_score >= 3 AND f_score >= 2 THEN 'Potential'
      WHEN r_score <= 2 AND f_score >= 3 THEN 'At Risk'
      WHEN r_score <= 2 AND f_score <= 2 THEN 'Lost'
      ELSE 'Other'
    END AS segment
  FROM rfm_scored
)

SELECT
  segment,
  COUNT(*) AS customers,
  ROUND(AVG(recency_days), 0) AS avg_recency,
  ROUND(AVG(frequency), 1) AS avg_frequency,
  ROUND(AVG(monetary), 2) AS avg_monetary,
  ROUND(SUM(monetary), 2) AS total_revenue
FROM rfm_segments
GROUP BY segment
ORDER BY total_revenue DESC;

from dataclasses import dataclass

@dataclass
class RFMSegment:
    segment: str
    customers: int
    avg_recency: int      # days since last order
    avg_frequency: float  # orders per customer
    avg_monetary: float   # average spend per customer
    total_revenue: float

segments = [
    RFMSegment("Champion", 1250, 8, 12.5, 8500.00, 10625000.00),
    RFMSegment("Loyal", 2800, 25, 8.2, 4200.00, 11760000.00),
    RFMSegment("Potential", 3500, 45, 4.1, 1800.00, 6300000.00),
    RFMSegment("At Risk", 1800, 120, 6.5, 3200.00, 5760000.00),
    RFMSegment("Lost", 5650, 250, 1.5, 450.00, 2542500.00),
]

print("=== RFM Segment Results ===")
total_customers = sum(s.customers for s in segments)
total_revenue = sum(s.total_revenue for s in segments)
for s in segments:
    pct_cust = s.customers / total_customers * 100
    pct_rev = s.total_revenue / total_revenue * 100
    print(f"  [{s.segment}] Customers: {s.customers} ({pct_cust:.1f}%)")
    print(f"    Recency: {s.avg_recency}d | Freq: {s.avg_frequency} | AOV: {s.avg_monetary:,.2f}")
    print(f"    Revenue: {s.total_revenue:,.2f} ({pct_rev:.1f}%)")
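Before wiring the segment rules into a pipeline, it helps to unit-test them outside BigQuery. A minimal Python sketch mirroring the SQL CASE expression (same thresholds as the query above; the function name is illustrative):

```python
def assign_segment(r_score: int, f_score: int, m_score: int) -> str:
    """Mirror of the SQL CASE mapping RFM scores (1-5) to a segment name."""
    if r_score >= 4 and f_score >= 4 and m_score >= 4:
        return "Champion"
    if r_score >= 3 and f_score >= 3 and m_score >= 3:
        return "Loyal"
    if r_score >= 3 and f_score >= 2:
        return "Potential"
    if r_score <= 2 and f_score >= 3:
        return "At Risk"
    if r_score <= 2 and f_score <= 2:
        return "Lost"
    return "Other"

print(assign_segment(5, 5, 5))  # Champion
print(assign_segment(2, 4, 3))  # At Risk
```

Because the branches are order-dependent (a 333 customer is Loyal, not Potential), keeping the Python mirror and the SQL CASE in the same branch order is what makes the test meaningful.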

Scheduled Query Setup

# === BigQuery Scheduled Query Configuration ===

# Console: BigQuery > Scheduled Queries > Create
# CLI (pair a SELECT query with destination_table_name_template;
# DML such as MERGE runs without a destination template instead):
bq mk --transfer_config \
  --project_id=my-project \
  --data_source=scheduled_query \
  --target_dataset=analytics \
  --display_name="Daily RFM Segmentation" \
  --schedule="every day 02:00" \
  --params='{
    "query": "SELECT ... FROM ...",
    "destination_table_name_template": "rfm_daily_{run_date}",
    "write_disposition": "WRITE_TRUNCATE"
  }'
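Hand-writing the `--params` JSON inline is error-prone (quoting, escaping). A small sketch that builds it with `json.dumps` instead, using the same keys as the `bq mk` command above (the helper name is illustrative):

```python
import json

def build_params(query: str, table_template: str,
                 write_disposition: str = "WRITE_TRUNCATE") -> str:
    """Serialize the --params argument for `bq mk --transfer_config`
    as compact, correctly escaped JSON."""
    return json.dumps({
        "query": query,
        "destination_table_name_template": table_template,
        "write_disposition": write_disposition,
    })

params = build_params("SELECT * FROM analytics.rfm_segments", "rfm_daily_{run_date}")
print(params)
```

The resulting string can be passed directly as the value of `--params` in a deploy script.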

# Terraform:
resource "google_bigquery_data_transfer_config" "rfm_daily" {
  display_name   = "Daily RFM Segmentation"
  data_source_id = "scheduled_query"
  schedule       = "every day 02:00"
  location       = "asia-southeast1"

  destination_dataset_id = google_bigquery_dataset.analytics.dataset_id

  params = {
    query                           = file("sql/rfm_daily.sql")
    destination_table_name_template = "rfm_daily"
    write_disposition               = "WRITE_TRUNCATE"
  }

  email_preferences {
    enable_failure_email = true
  }
}

-- Incremental query with @run_date: daily upsert into the segments table
MERGE `analytics.customer_segments` AS target
USING (
  SELECT customer_id, segment, rfm_score, updated_at
  FROM rfm_analysis
  WHERE DATE(updated_at) = @run_date
) AS source
ON target.customer_id = source.customer_id
WHEN MATCHED THEN
  UPDATE SET segment = source.segment, rfm_score = source.rfm_score, updated_at = source.updated_at
WHEN NOT MATCHED THEN
  INSERT (customer_id, segment, rfm_score, updated_at)
  VALUES (source.customer_id, source.segment, source.rfm_score, source.updated_at);
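The MERGE's upsert semantics can be illustrated with an in-memory sketch, where a dict keyed by customer_id stands in for the target table (names and sample rows are illustrative):

```python
def merge_segments(target: dict, source_rows: list) -> dict:
    """Upsert rows into target keyed by customer_id, like the MERGE above:
    matched rows are updated, unmatched rows are inserted."""
    for row in source_rows:
        target[row["customer_id"]] = {
            "segment": row["segment"],
            "rfm_score": row["rfm_score"],
            "updated_at": row["updated_at"],
        }
    return target

target = {"c1": {"segment": "Loyal", "rfm_score": "444", "updated_at": "2025-12-25"}}
source = [
    {"customer_id": "c1", "segment": "Champion", "rfm_score": "555", "updated_at": "2025-12-26"},
    {"customer_id": "c2", "segment": "Lost", "rfm_score": "111", "updated_at": "2025-12-26"},
]
merged = merge_segments(target, source)
print(merged["c1"]["segment"], len(merged))  # Champion 2
```

The key property, which MERGE shares, is idempotency: re-running the same @run_date batch leaves the target in the same state, so a failed scheduled run can simply be retried.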

@dataclass
class ScheduledQuery:
    name: str
    schedule: str
    query_type: str
    destination: str
    write_mode: str
    cost_estimate: str

queries = [
    ScheduledQuery("Daily RFM", "every day 02:00", "Full refresh", "analytics.rfm_daily", "WRITE_TRUNCATE", "$2.50/day"),
    ScheduledQuery("Hourly Engagement", "every 1 hours", "Incremental", "analytics.engagement_hourly", "WRITE_APPEND", "$0.50/run"),
    ScheduledQuery("Weekly Cohort", "every sunday 03:00", "Full refresh", "analytics.cohort_weekly", "WRITE_TRUNCATE", "$5.00/week"),
    ScheduledQuery("Monthly LTV", "1 of month 04:00", "Full refresh", "analytics.ltv_monthly", "WRITE_TRUNCATE", "$8.00/month"),
    ScheduledQuery("Real-time Alerts", "every 15 minutes", "Incremental", "analytics.alerts", "WRITE_APPEND", "$0.10/run"),
]

print("\n=== Scheduled Queries ===")
for q in queries:
    print(f"  [{q.name}] Schedule: {q.schedule}")
    print(f"    Type: {q.query_type} | Dest: {q.destination}")
    print(f"    Write: {q.write_mode} | Cost: {q.cost_estimate}")
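The per-run estimates above are easier to compare once rolled up to a common period. A back-of-envelope sketch, assuming a 30-day month (the run counts are derived from the schedules listed above; costs are the sample estimates, not measured figures):

```python
# Rough monthly cost rollup for the schedules above (30-day month assumed)
runs_per_month = {
    "Daily RFM": 30,               # every day 02:00
    "Hourly Engagement": 24 * 30,  # every 1 hours
    "Weekly Cohort": 4,            # every sunday 03:00
    "Monthly LTV": 1,              # 1 of month 04:00
    "Real-time Alerts": 4 * 24 * 30,  # every 15 minutes
}
cost_per_run = {
    "Daily RFM": 2.50,
    "Hourly Engagement": 0.50,
    "Weekly Cohort": 5.00,
    "Monthly LTV": 8.00,
    "Real-time Alerts": 0.10,
}
monthly = {name: runs_per_month[name] * cost_per_run[name] for name in runs_per_month}
for name, cost in sorted(monthly.items(), key=lambda kv: -kv[1]):
    print(f"  {name}: ${cost:,.2f}/month")
print(f"  Total: ${sum(monthly.values()):,.2f}/month")
```

A rollup like this makes it obvious that the cheap-per-run 15-minute alert query dominates the bill, which is exactly where incremental queries and partition pruning pay off.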

Dashboard Integration

# === Dashboard and Marketing Integration ===

# Looker Studio connection:
# 1. Create a BigQuery data source in Looker Studio
# 2. Select the analytics.rfm_daily table
# 3. Build the dashboard with:
#    - Segment distribution pie chart
#    - Revenue by segment bar chart
#    - RFM score heatmap
#    - Trend over time line chart

# Export to CRM (via Cloud Functions)
import functions_framework
from google.cloud import bigquery
import requests

@functions_framework.http
def export_segments(request):
    client = bigquery.Client()
    query = """
        SELECT customer_id, email, segment, rfm_score
        FROM analytics.rfm_daily
        WHERE segment IN ('At Risk', 'Lost')
    """
    results = client.query(query).result()
    exported = 0
    for row in results:
        # Push each customer to the CRM API
        requests.post("https://crm.example.com/api/segments", json={
            "email": row.email,
            "segment": row.segment,
            "tags": [f"rfm_{row.rfm_score}"],
        }, timeout=10)
        exported += 1
    # HTTP Cloud Functions must return a response
    return {"exported": exported}, 200
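One POST per customer, as in the export function above, won't scale to tens of thousands of rows. If the CRM accepts bulk payloads, the rows can be chunked first; a minimal sketch (the batch size and the assumption of a bulk endpoint are illustrative, not part of the original function):

```python
def batch(rows: list, size: int):
    """Split rows into chunks of at most `size` for per-batch API calls."""
    for i in range(0, len(rows), size):
        yield rows[i:i + size]

rows = [{"email": f"user{i}@example.com"} for i in range(7)]
batches = list(batch(rows, 3))
print([len(b) for b in batches])  # [3, 3, 1]
```

Each chunk then becomes one `requests.post(...)` call, cutting request count (and Cloud Functions runtime) by the batch size.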

@dataclass
class Integration:
    destination: str
    method: str
    frequency: str
    data_sent: str
    use_case: str

integrations = [
    Integration("Looker Studio", "BigQuery connector", "Real-time", "All segments", "Dashboard reporting"),
    Integration("Google Ads", "Customer Match", "Daily", "High-value segments", "Targeted ads"),
    Integration("Mailchimp", "Cloud Functions API", "Daily", "At Risk + Lost", "Win-back email"),
    Integration("Slack", "Pub/Sub + Webhook", "On change", "Alert segments", "Team notification"),
    Integration("CRM (HubSpot)", "API push", "Hourly", "All segments", "Sales prioritization"),
    Integration("Data Studio", "BigQuery connector", "Real-time", "Metrics summary", "Executive dashboard"),
]

print("Integrations:")
for i in integrations:
    print(f"  [{i.destination}] Method: {i.method}")
    print(f"    Frequency: {i.frequency} | Data: {i.data_sent}")
    print(f"    Use Case: {i.use_case}")

cost_optimization = {
    "Partitioning": "Partition by date to cut bytes scanned and cost",
    "Clustering": "Cluster by customer_id and segment",
    "Materialized View": "Can replace a scheduled query in some cases",
    "Slot Reservation": "Flat-rate pricing when query volume is high",
    "Query Optimization": "Use APPROX_COUNT_DISTINCT to reduce cost",
}

print("\nCost Optimization:")
for k, v in cost_optimization.items():
    print(f"  [{k}]: {v}")
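To see why partitioning tops the list, compare the scan cost of a full table against a single daily partition. A back-of-envelope sketch, assuming on-demand pricing of roughly $6.25 per TiB scanned and the sample table size below (check current BigQuery pricing before relying on the number):

```python
# Back-of-envelope scan-cost comparison: full scan vs one daily partition.
# PRICE_PER_TIB is an assumed on-demand rate, not an authoritative figure.
PRICE_PER_TIB = 6.25
TIB = 1024 ** 4

def scan_cost(bytes_scanned: int) -> float:
    """Dollar cost of scanning the given number of bytes at on-demand rates."""
    return bytes_scanned / TIB * PRICE_PER_TIB

table_bytes = 500 * 1024 ** 3          # hypothetical 500 GiB unpartitioned table
partition_bytes = table_bytes // 365   # roughly one day's partition
print(f"Full scan:     ${scan_cost(table_bytes):.2f}")
print(f"One partition: ${scan_cost(partition_bytes):.2f}")
```

For a daily scheduled query that only needs yesterday's rows, pruning to one partition turns a dollars-per-run scan into a fraction of a cent, multiplied across every scheduled run.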

Tips

What is a BigQuery scheduled query?

A scheduled query runs SQL automatically on a schedule (daily, hourly, weekly) and writes the results to a destination table, which makes it a lightweight ETL pipeline for reports. Queries can be parameterized with @run_date and configured via the Console, the CLI, or Terraform.

What is customer micro-segmentation?

Micro-segmentation splits customers into fine-grained groups using signals such as RFM (Recency, Frequency, Monetary), purchase behavior, engagement, lifecycle stage, and geography. Computing the segments in BigQuery and syncing them to CRM and marketing tools enables personalized campaigns.

How do you set up a scheduled query?

In the Console, choose a schedule frequency, a destination table, and a write disposition (truncate or append), and parameterize with @run_date where needed; the same setup works via the CLI (bq mk) or Terraform. Enable failure notifications by email or Pub/Sub so broken runs don't go unnoticed.

How do you run an RFM analysis?

Recency is days since the last purchase, Frequency is the number of orders, and Monetary is total spend. Score each dimension 1-5 with NTILE, then map score combinations to segments (Champion, Loyal, At Risk, Lost), each of which gets its own marketing strategy.

Summary

BigQuery scheduled queries turn RFM-based customer micro-segmentation into an automated SQL analytics pipeline: segments refresh on a schedule, feed Looker Studio dashboards and CRM integrations, and drive personalized marketing campaigns in production.
