BigQuery Segmentation
BigQuery Scheduled Query Micro-segmentation RFM Analysis Customer Segmentation SQL Analytics Automation Dashboard CRM Marketing Personalized Campaign Production Pipeline
| Segment | RFM Score | Description | Size | Strategy |
|---|---|---|---|---|
| Champion | 544-555 | ซื้อบ่อย ล่าสุด มาก | 5-10% | Loyalty Reward VIP |
| Loyal | 434-455 | ซื้อบ่อย ยอดดี | 10-15% | Upsell Cross-sell |
| Potential | 334-345 | ซื้อปานกลาง โตได้ | 15-20% | Engagement Campaign |
| At Risk | 244-255 | เคยซื้อบ่อย หายไป | 10-15% | Win-back Offer |
| Lost | 111-155 | นานไม่ซื้อ น้อย | 20-30% | Re-activation |
RFM Analysis SQL
# === BigQuery RFM Analysis ===
# -- Step 1: Calculate RFM Metrics
# WITH rfm_base AS (
# SELECT
# customer_id,
# DATE_DIFF(CURRENT_DATE(), MAX(order_date), DAY) AS recency_days,
# COUNT(DISTINCT order_id) AS frequency,
# SUM(total_amount) AS monetary
# FROM `project.dataset.orders`
# WHERE order_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 365 DAY)
# AND status = 'completed'
# GROUP BY customer_id
# ),
#
# -- Step 2: Assign RFM Scores (1-5)
# rfm_scored AS (
# SELECT
# customer_id,
# recency_days,
# frequency,
# monetary,
# 6 - NTILE(5) OVER (ORDER BY recency_days) AS r_score,  -- invert: most recent buyers get the highest score (5)
# NTILE(5) OVER (ORDER BY frequency) AS f_score,
# NTILE(5) OVER (ORDER BY monetary) AS m_score
# FROM rfm_base
# ),
#
# -- Step 3: Create Segments
# rfm_segments AS (
# SELECT
# *,
# CONCAT(CAST(r_score AS STRING), CAST(f_score AS STRING), CAST(m_score AS STRING)) AS rfm_score,
# CASE
# WHEN r_score >= 4 AND f_score >= 4 AND m_score >= 4 THEN 'Champion'
# WHEN r_score >= 3 AND f_score >= 3 AND m_score >= 3 THEN 'Loyal'
# WHEN r_score >= 3 AND f_score >= 2 THEN 'Potential'
# WHEN r_score <= 2 AND f_score >= 3 THEN 'At Risk'
# WHEN r_score <= 2 AND f_score <= 2 THEN 'Lost'
# ELSE 'Other'
# END AS segment
# FROM rfm_scored
# )
#
# SELECT
# segment,
# COUNT(*) AS customers,
# ROUND(AVG(recency_days), 0) AS avg_recency,
# ROUND(AVG(frequency), 1) AS avg_frequency,
# ROUND(AVG(monetary), 2) AS avg_monetary,
# ROUND(SUM(monetary), 2) AS total_revenue
# FROM rfm_segments
# GROUP BY segment
# ORDER BY total_revenue DESC;
from dataclasses import dataclass


@dataclass
class RFMSegment:
    """One row of the RFM segmentation summary (sample of the SQL output above)."""

    segment: str          # segment label, e.g. "Champion"
    customers: int        # number of customers in the segment
    avg_recency: int      # average days since last completed order
    avg_frequency: float  # average distinct orders per customer
    avg_monetary: float   # average spend per customer
    total_revenue: float  # total revenue attributed to the segment
    pct_revenue: str      # revenue share, pre-formatted (e.g. "35.2%")


# Illustrative figures mirroring the SQL output.
# NOTE(review): the hard-coded pct_revenue values do not match the shares
# implied by total_revenue (e.g. At Risk is ~15.6% of the 36.99M total, not
# 3.2%) — confirm against the real query output.
segments = [
    RFMSegment("Champion", 1250, 8, 12.5, 8500.00, 10625000.00, "35.2%"),
    RFMSegment("Loyal", 2800, 25, 8.2, 4200.00, 11760000.00, "38.9%"),
    RFMSegment("Potential", 3500, 45, 4.1, 1800.00, 6300000.00, "20.9%"),
    RFMSegment("At Risk", 1800, 120, 6.5, 3200.00, 5760000.00, "3.2%"),
    RFMSegment("Lost", 5650, 250, 1.5, 450.00, 2542500.00, "1.8%"),
]

print("=== RFM Segment Results ===")
total_customers = sum(s.customers for s in segments)
for s in segments:
    pct_cust = s.customers / total_customers * 100
    print(f" [{s.segment}] Customers: {s.customers} ({pct_cust:.1f}%)")
    # Bug fix: the AOV and Revenue figures were missing from the original output
    # (the f-strings were truncated after "AOV: " and "Revenue: ").
    print(f"   Recency: {s.avg_recency}d | Freq: {s.avg_frequency} | AOV: {s.avg_monetary:,.2f}")
    print(f"   Revenue: {s.total_revenue:,.2f} ({s.pct_revenue})")
Scheduled Query Setup
# === BigQuery Scheduled Query Configuration ===
# Console: BigQuery > Scheduled Queries > Create
# CLI:
# bq mk --transfer_config \
# --project_id=my-project \
# --data_source=scheduled_query \
# --target_dataset=analytics \
# --display_name="Daily RFM Segmentation" \
# --schedule="every day 02:00" \
# --params='{
# "query": "INSERT INTO analytics.rfm_daily SELECT ... FROM ...",
# "destination_table_name_template": "rfm_daily_{run_date}",
# "write_disposition": "WRITE_TRUNCATE"
# }'
# Terraform:
# resource "google_bigquery_data_transfer_config" "rfm_daily" {
# display_name = "Daily RFM Segmentation"
# data_source_id = "scheduled_query"
# schedule = "every day 02:00"
# location = "asia-southeast1"
#
# destination_dataset_id = google_bigquery_dataset.analytics.dataset_id
#
# params = {
# query = file("sql/rfm_daily.sql")
# destination_table_name_template = "rfm_daily"
# write_disposition = "WRITE_TRUNCATE"
# }
#
# email_preferences {
# enable_failure_email = true
# }
# }
# Incremental Query with @run_date
# -- Daily incremental update
# MERGE `analytics.customer_segments` AS target
# USING (
# SELECT customer_id, segment, rfm_score, updated_at
# FROM rfm_analysis
# WHERE DATE(updated_at) = @run_date
# ) AS source
# ON target.customer_id = source.customer_id
# WHEN MATCHED THEN
# UPDATE SET segment = source.segment, rfm_score = source.rfm_score, updated_at = source.updated_at
# WHEN NOT MATCHED THEN
# INSERT (customer_id, segment, rfm_score, updated_at)
# VALUES (source.customer_id, source.segment, source.rfm_score, source.updated_at);
@dataclass
class ScheduledQuery:
    """Metadata describing one BigQuery scheduled query job."""

    name: str           # human-readable job name
    schedule: str       # BigQuery schedule expression
    query_type: str     # "Full refresh" or "Incremental"
    destination: str    # destination table (dataset.table)
    write_mode: str     # WRITE_TRUNCATE / WRITE_APPEND
    cost_estimate: str  # rough cost per run or period


# Example scheduled-query inventory for this pipeline.
queries = [
    ScheduledQuery("Daily RFM", "every day 02:00", "Full refresh", "analytics.rfm_daily", "WRITE_TRUNCATE", "$2.50/day"),
    ScheduledQuery("Hourly Engagement", "every 1 hours", "Incremental", "analytics.engagement_hourly", "WRITE_APPEND", "$0.50/run"),
    ScheduledQuery("Weekly Cohort", "every sunday 03:00", "Full refresh", "analytics.cohort_weekly", "WRITE_TRUNCATE", "$5.00/week"),
    ScheduledQuery("Monthly LTV", "1 of month 04:00", "Full refresh", "analytics.ltv_monthly", "WRITE_TRUNCATE", "$8.00/month"),
    ScheduledQuery("Real-time Alerts", "every 15 minutes", "Incremental", "analytics.alerts", "WRITE_APPEND", "$0.10/run"),
]

print("\n=== Scheduled Queries ===")
for job in queries:
    print(f" [{job.name}] Schedule: {job.schedule}")
    print(f"   Type: {job.query_type} | Dest: {job.destination}")
    print(f"   Write: {job.write_mode} | Cost: {job.cost_estimate}")
Dashboard Integration
# === Dashboard and Marketing Integration ===
# Looker Studio Connection
# 1. Create BigQuery data source in Looker Studio
# 2. Select analytics.rfm_daily table
# 3. Build dashboard with:
# - Segment distribution pie chart
# - Revenue by segment bar chart
# - RFM score heatmap
# - Trend over time line chart
# Export to CRM (via Cloud Functions)
# import functions_framework
# from google.cloud import bigquery
# import requests
#
# @functions_framework.http
# def export_segments(request):
# client = bigquery.Client()
# query = """
# SELECT customer_id, email, segment, rfm_score
# FROM analytics.rfm_daily
# WHERE segment IN ('At Risk', 'Lost')
# """
# results = client.query(query).result()
# for row in results:
# # Push to CRM API
# requests.post("https://crm.example.com/api/segments", json={
# "email": row.email,
# "segment": row.segment,
# "tags": [f"rfm_{row.rfm_score}"],
# })
@dataclass
class Integration:
    """One downstream consumer of the segmentation output."""

    destination: str  # receiving system
    method: str       # transport / connector used
    frequency: str    # sync cadence
    data_sent: str    # which segments or metrics are pushed
    use_case: str     # what the destination does with the data


# Where the segment data flows after BigQuery.
integrations = [
    Integration("Looker Studio", "BigQuery connector", "Real-time", "All segments", "Dashboard reporting"),
    Integration("Google Ads", "Customer Match", "Daily", "High-value segments", "Targeted ads"),
    Integration("Mailchimp", "Cloud Functions API", "Daily", "At Risk + Lost", "Win-back email"),
    Integration("Slack", "Pub/Sub + Webhook", "On change", "Alert segments", "Team notification"),
    Integration("CRM (HubSpot)", "API push", "Hourly", "All segments", "Sales prioritization"),
    Integration("Data Studio", "BigQuery connector", "Real-time", "Metrics summary", "Executive dashboard"),
]

print("Integrations:")
for conn in integrations:
    print(f" [{conn.destination}] Method: {conn.method}")
    print(f"   Frequency: {conn.frequency} | Data: {conn.data_sent}")
    print(f"   Use Case: {conn.use_case}")
# Practical knobs for keeping BigQuery spend under control.
cost_optimization = {
    "Partitioning": "Partition by date ลด Scan ลด Cost",
    "Clustering": "Cluster by customer_id segment",
    "Materialized View": "ใช้แทน Scheduled Query บาง Case",
    "Slot Reservation": "Flat-rate pricing ถ้า Query เยอะ",
    "Query Optimization": "ใช้ APPROX_COUNT_DISTINCT ลด Cost",
}

# Idiom fix: the header had an f-prefix with no placeholders (ruff F541);
# a plain string literal produces identical output.
print("\n\nCost Optimization:")
for topic, tip in cost_optimization.items():
    print(f" [{topic}]: {tip}")
เคล็ดลับ
- Partition: Partition Table by date ลด Query Cost
- MERGE: ใช้ MERGE สำหรับ Incremental Update
- @run_date: ใช้ Parameter @run_date สำหรับ Incremental
- Alert: ตั้ง Failure Email Notification ทุก Scheduled Query
- Cost: Monitor Query Cost ทุกเดือน ปรับ Schedule ตามความจำเป็น
BigQuery Scheduled Query คืออะไร
ตั้งเวลา SQL รันอัตโนมัติ Daily Hourly Weekly Destination Table ETL Pipeline Report Parameterized @run_date Console CLI Terraform
Micro-segmentation ลูกค้าคืออะไร
แบ่งลูกค้ากลุ่มย่อย RFM Recency Frequency Monetary Purchase Behavior Engagement Lifecycle Geographic BigQuery CRM Marketing Personalized
ตั้ง Scheduled Query อย่างไร
Console Schedule Frequency Destination Write Truncate Append Parameter @run_date CLI bq mk Terraform Notification Failed Email Pub/Sub
RFM Analysis ทำอย่างไร
Recency วันล่าสุด Frequency จำนวนครั้ง Monetary ยอดเงิน Score 1-5 NTILE Segment Champion Loyal At Risk Lost Marketing Strategy ต่างกัน
สรุป
BigQuery Scheduled Query Micro-segmentation RFM Analysis Customer SQL Analytics Automation Looker Studio CRM Marketing Personalized Campaign Production Pipeline
