SiamCafe · Blog
BigQuery Scheduled Query Micro-segmentation —
บทความ

BigQuery Scheduled Query Micro-segmentation —

เผยแพร่ 28 พฤษภาคม 2569

BigQuery Segmentation

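บทความนี้อธิบายการใช้ BigQuery Scheduled Query ทำ Micro-segmentation ลูกค้าด้วย RFM Analysis และ SQL Analytics แบบอัตโนมัติ (Automation) ตั้งแต่การทำ Customer Segmentation การส่งผลเข้า Dashboard และ CRM ไปจนถึงการทำ Marketing แบบ Personalized Campaign บน Production Pipeline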

| Segment | RFM Score | Description | Size | Strategy |
|---|---|---|---|---|
| Champion | 544-555 | ซื้อบ่อย ล่าสุด มาก | 5-10% | Loyalty Reward VIP |
| Loyal | 434-455 | ซื้อบ่อย ยอดดี | 10-15% | Upsell Cross-sell |
| Potential | 334-345 | ซื้อปานกลาง โตได้ | 15-20% | Engagement Campaign |
| At Risk | 244-255 | เคยซื้อบ่อย หายไป | 10-15% | Win-back Offer |
| Lost | 111-155 | นานไม่ซื้อ น้อย | 20-30% | Re-activation |

RFM Analysis SQL

-- === BigQuery RFM Analysis ===
-- Scores every customer on Recency / Frequency / Monetary over the last
-- 365 days of completed orders, maps the scores to named segments, and
-- returns one summary row per segment ordered by revenue.

-- Step 1: Calculate RFM metrics per customer
WITH rfm_base AS (
  SELECT
    customer_id,
    DATE_DIFF(CURRENT_DATE(), MAX(order_date), DAY) AS recency_days,
    COUNT(DISTINCT order_id) AS frequency,
    SUM(total_amount) AS monetary
  FROM `project.dataset.orders`
  WHERE order_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 365 DAY)
    AND status = 'completed'
  GROUP BY customer_id
),

-- Step 2: Assign RFM scores (1-5, higher is better).
-- customer_id is appended to every window ORDER BY as a unique tie-breaker:
-- NTILE splits rows with equal values across buckets, and without a unique
-- sort key which customer lands in which bucket can change from run to run.
rfm_scored AS (
  SELECT
    customer_id,
    recency_days,
    frequency,
    monetary,
    -- Fewer days since the last order is better, so the quintile is inverted:
    -- quintile 1 (most recent) -> score 5, quintile 5 -> score 1.
    6 - NTILE(5) OVER (ORDER BY recency_days, customer_id) AS r_score,
    NTILE(5) OVER (ORDER BY frequency, customer_id) AS f_score,
    NTILE(5) OVER (ORDER BY monetary, customer_id) AS m_score
  FROM rfm_base
),

-- Step 3: Create segments. CASE takes the first matching branch, so the
-- stricter Champion rule must stay ahead of Loyal/Potential.
rfm_segments AS (
  SELECT
    *,
    CONCAT(CAST(r_score AS STRING), CAST(f_score AS STRING), CAST(m_score AS STRING)) AS rfm_score,
    CASE
      WHEN r_score >= 4 AND f_score >= 4 AND m_score >= 4 THEN 'Champion'
      WHEN r_score >= 3 AND f_score >= 3 AND m_score >= 3 THEN 'Loyal'
      WHEN r_score >= 3 AND f_score >= 2 THEN 'Potential'
      WHEN r_score <= 2 AND f_score >= 3 THEN 'At Risk'
      WHEN r_score <= 2 AND f_score <= 2 THEN 'Lost'
      ELSE 'Other'
    END AS segment
  FROM rfm_scored
)

-- Step 4: One summary row per segment, highest revenue first.
SELECT
  segment,
  COUNT(*) AS customers,
  ROUND(AVG(recency_days), 0) AS avg_recency,
  ROUND(AVG(frequency), 1) AS avg_frequency,
  ROUND(AVG(monetary), 2) AS avg_monetary,
  ROUND(SUM(monetary), 2) AS total_revenue
FROM rfm_segments
GROUP BY segment
ORDER BY total_revenue DESC;

from dataclasses import dataclass

@dataclass
class RFMSegment:
    """One row of the per-segment RFM summary.

    Field names mirror the column aliases of the RFM summary SQL; the
    positional order below is the order used to build ``segments``.
    """

    segment: str  # segment label, e.g. "Champion"
    customers: int  # number of customers in the segment
    avg_recency: int  # average days since each customer's latest order
    avg_frequency: float  # average number of distinct orders per customer
    avg_monetary: float  # average total spend per customer (not per order)
    total_revenue: float  # summed spend of all customers in the segment
    pct_revenue: str  # this segment's share of all segments' revenue, "NN.N%"


# Sample results. customers * avg_monetary == total_revenue for every row,
# and pct_revenue == total_revenue / sum(total_revenue), rounded to 0.1%.
segments = [
    RFMSegment("Champion", 1250, 8, 12.5, 8500.00, 10625000.00, "28.7%"),
    RFMSegment("Loyal", 2800, 25, 8.2, 4200.00, 11760000.00, "31.8%"),
    RFMSegment("Potential", 3500, 45, 4.1, 1800.00, 6300000.00, "17.0%"),
    RFMSegment("At Risk", 1800, 120, 6.5, 3200.00, 5760000.00, "15.6%"),
    RFMSegment("Lost", 5650, 250, 1.5, 450.00, 2542500.00, "6.9%"),
]

# Print a per-segment report: customer share, recency/frequency, average
# spend per customer, and revenue with its share of the grand total.
print("=== RFM Segment Results ===")
total_customers = sum(s.customers for s in segments)
# Revenue share is derived from total_revenue rather than the pct_revenue
# string literals, so the printed share always matches the printed revenue.
grand_revenue = sum(s.total_revenue for s in segments)
for s in segments:
    pct_cust = s.customers / total_customers * 100 if total_customers else 0.0
    pct_rev = s.total_revenue / grand_revenue * 100 if grand_revenue else 0.0
    print(f" [{s.segment}] Customers: {s.customers} ({pct_cust:.1f}%)")
    # avg_monetary is spend per customer, not per order, so it is labelled
    # "Avg Spend" rather than AOV.
    print(f" Recency: {s.avg_recency}d | Freq: {s.avg_frequency} | Avg Spend: ${s.avg_monetary:,.2f}")
    print(f" Revenue: ${s.total_revenue:,.2f} ({pct_rev:.1f}%)")

Scheduled Query Setup

=== BigQuery Scheduled Query Configuration ===

Console: BigQuery > Scheduled Queries > Create

CLI:

# Create a daily scheduled query that writes its SELECT result into
# analytics.rfm_daily_<run_date>, overwriting that table on each run.
# destination_table_name_template / write_disposition only apply to SELECT
# queries; a DML statement (INSERT/MERGE) must be scheduled without them.
# Keep the continuation lines contiguous: a blank line after "\" ends the command.
bq mk --transfer_config \
  --project_id=my-project \
  --data_source=scheduled_query \
  --target_dataset=analytics \
  --display_name="Daily RFM Segmentation" \
  --schedule="every day 02:00" \
  --params='{
    "query": "SELECT ... FROM ...",
    "destination_table_name_template": "rfm_daily_{run_date}",
    "write_disposition": "WRITE_TRUNCATE"
  }'

Terraform:

# Scheduled query that runs the SQL in sql/rfm_daily.sql every day at 02:00
# and writes the result to table "rfm_daily" in the dataset managed by
# google_bigquery_dataset.analytics, replacing the table on each run.
resource "google_bigquery_data_transfer_config" "rfm_daily" {
  display_name = "Daily RFM Segmentation"
  # "scheduled_query" selects the BigQuery scheduled-query data source.
  data_source_id = "scheduled_query"
  schedule = "every day 02:00"
  # NOTE(review): presumably must match the destination dataset's location — confirm.
  location = "asia-southeast1"
  destination_dataset_id = google_bigquery_dataset.analytics.dataset_id
  params = {
    # Query text is read from a file kept alongside the Terraform config.
    # NOTE(review): destination settings below assume this file holds a SELECT, not DML.
    query = file("sql/rfm_daily.sql")
    destination_table_name_template = "rfm_daily"
    # WRITE_TRUNCATE replaces the table contents instead of appending.
    write_disposition = "WRITE_TRUNCATE"
  }
  # Send an email notification when a run fails.
  email_preferences {
    enable_failure_email = true
  }
}

Incremental Query with @run_date

-- Daily incremental update: upsert the segments recomputed on @run_date
-- (supplied by the scheduled query) into the long-lived customer_segments table.
MERGE `analytics.customer_segments` AS target
USING (
  SELECT customer_id, segment, rfm_score, updated_at
  -- Unqualified table names fail unless the request sets a default dataset,
  -- so the source is fully qualified.
  -- NOTE(review): dataset assumed to be `analytics` — confirm.
  FROM `analytics.rfm_analysis`
  WHERE DATE(updated_at) = @run_date
  -- MERGE errors when more than one source row matches the same target row;
  -- keep only the latest row per customer for the day.
  QUALIFY ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY updated_at DESC) = 1
) AS source
ON target.customer_id = source.customer_id
WHEN MATCHED THEN
  UPDATE SET segment = source.segment, rfm_score = source.rfm_score, updated_at = source.updated_at
WHEN NOT MATCHED THEN
  INSERT (customer_id, segment, rfm_score, updated_at)
  VALUES (source.customer_id, source.segment, source.rfm_score, source.updated_at);

@dataclass
class ScheduledQuery:
    """Catalogue entry describing one BigQuery scheduled query."""

    name: str  # display name of the scheduled query
    schedule: str  # schedule string, e.g. "every day 02:00"
    query_type: str  # refresh style, e.g. "Full refresh" or "Incremental"
    destination: str  # destination table as "dataset.table"
    write_mode: str  # write disposition, e.g. WRITE_TRUNCATE or WRITE_APPEND
    cost_estimate: str  # rough cost; note each entry carries its own unit


# Full refreshes overwrite their table (WRITE_TRUNCATE); incremental jobs
# append new rows (WRITE_APPEND).
queries = [
    ScheduledQuery("Daily RFM", "every day 02:00", "Full refresh", "analytics.rfm_daily", "WRITE_TRUNCATE", "$2.50/day"),
    ScheduledQuery("Hourly Engagement", "every 1 hours", "Incremental", "analytics.engagement_hourly", "WRITE_APPEND", "$0.50/run"),
    ScheduledQuery("Weekly Cohort", "every sunday 03:00", "Full refresh", "analytics.cohort_weekly", "WRITE_TRUNCATE", "$5.00/week"),
    ScheduledQuery("Monthly LTV", "1 of month 04:00", "Full refresh", "analytics.ltv_monthly", "WRITE_TRUNCATE", "$8.00/month"),
    ScheduledQuery("Real-time Alerts", "every 15 minutes", "Incremental", "analytics.alerts", "WRITE_APPEND", "$0.10/run"),
]

# Print the scheduled-query catalogue: schedule, refresh type, destination,
# write mode and cost estimate for each entry.
print("\n=== Scheduled Queries ===")
for q in queries:
    print(f" [{q.name}] Schedule: {q.schedule}")
    print(f" Type: {q.query_type} | Dest: {q.destination}")
    print(f" Write: {q.write_mode} | Cost: {q.cost_estimate}")

Dashboard Integration

# === Dashboard and Marketing Integration ===

# Looker Studio Connection
# 1. Create BigQuery data source in Looker Studio
# 2. Select analytics.rfm_daily table
# 3. Build dashboard with:
#    - Segment distribution pie chart
#    - Revenue by segment bar chart
#    - RFM score heatmap
#    - Trend over time line chart

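# Export to CRM (via Cloud Functions)
# import functions_framework
# from google.cloud import bigquery
# import requests
#
# @functions_framework.http
# def export_segments(request):
#     client = bigquery.Client()
#     # NOTE: this needs per-customer rows with an email column; join your
#     # customers table if rfm_daily only holds per-segment aggregates.
#     query = """
#         SELECT customer_id, email, segment, rfm_score
#         FROM analytics.rfm_daily
#         WHERE segment IN ('At Risk', 'Lost')
#     """
#     results = client.query(query).result()
#     exported = 0
#     for row in results:
#         # Push to CRM API; the timeout keeps one hung request from stalling the function
#         resp = requests.post("https://crm.example.com/api/segments", json={
#             "email": row.email,
#             "segment": row.segment,
#             "tags": [f"rfm_{row.rfm_score}"],
#         }, timeout=10)
#         resp.raise_for_status()
#         exported += 1
#     # An HTTP function must return a response; returning None is an error
#     return {"exported": exported}, 200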

@dataclass
class Integration:
    """A downstream system that consumes the RFM segment data."""

    destination: str  # receiving system, e.g. "Google Ads"
    method: str  # connector or transport used to deliver the data
    frequency: str  # how often the data is delivered or refreshed
    data_sent: str  # which segments / metrics are delivered
    use_case: str  # what the receiving team uses the data for


# Each tuple follows the Integration field order.
integrations = [
    Integration(*spec)
    for spec in (
        ("Looker Studio", "BigQuery connector", "Real-time", "All segments", "Dashboard reporting"),
        ("Google Ads", "Customer Match", "Daily", "High-value segments", "Targeted ads"),
        ("Mailchimp", "Cloud Functions API", "Daily", "At Risk + Lost", "Win-back email"),
        ("Slack", "Pub/Sub + Webhook", "On change", "Alert segments", "Team notification"),
        ("CRM (HubSpot)", "API push", "Hourly", "All segments", "Sales prioritization"),
        ("Data Studio", "BigQuery connector", "Real-time", "Metrics summary", "Executive dashboard"),
    )
]

print("Integrations:")
for target in integrations:
    # One print per integration; the embedded newlines yield the same three lines.
    print(
        f"  [{target.destination}] Method: {target.method}\n"
        f"    Frequency: {target.frequency} | Data: {target.data_sent}\n"
        f"    Use Case: {target.use_case}"
    )

# Cost-reduction techniques for the scheduled queries, keyed by technique
# name; insertion order is the display order.
cost_optimization = dict(
    [
        ("Partitioning", "Partition by date ลด Scan ลด Cost"),
        ("Clustering", "Cluster by customer_id segment"),
        ("Materialized View", "ใช้แทน Scheduled Query บาง Case"),
        ("Slot Reservation", "Flat-rate pricing ถ้า Query เยอะ"),
        ("Query Optimization", "ใช้ APPROX_COUNT_DISTINCT ลด Cost"),
    ]
)

print("\n\nCost Optimization:")
print("\n".join(f"  [{technique}]: {tip}" for technique, tip in cost_optimization.items()))

เคล็ดลับ

  • Partition: Partition Table by date ลด Query Cost
  • MERGE: ใช้ MERGE สำหรับ Incremental Update
  • @run_date: ใช้ Parameter @run_date สำหรับ Incremental
  • Alert: ตั้ง Failure Email Notification ทุก Scheduled Query
  • Cost: Monitor Query Cost ทุกเดือน ปรับ Schedule ตามความจำเป็น

BigQuery Scheduled Query คืออะไร

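BigQuery Scheduled Query คือฟีเจอร์ที่ตั้งเวลาให้ SQL รันอัตโนมัติ เช่น Daily, Hourly หรือ Weekly แล้วเขียนผลลง Destination Table ใช้สร้าง ETL Pipeline และ Report ได้ รองรับ Query แบบ Parameterized เช่น @run_date และสร้างได้ทั้งผ่าน Console, CLI และ Terraform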