Supabase Realtime ETL
Build a production data pipeline on Supabase Realtime: capture changes (CDC) from PostgreSQL over WebSocket, transform them with Edge Functions, and load the results into BigQuery, Elasticsearch, and other destinations.
| Component | Tool | Purpose | Latency |
|---|---|---|---|
| Source | Supabase PostgreSQL | Origin Data (Tables) | - |
| CDC | Supabase Realtime (WAL) | Capture Changes Real-time | < 100ms |
| Transform | Edge Functions / External | Business Logic Enrichment | 50-500ms |
| Load | BigQuery ES Kafka Redis | Destination Storage | 100ms-5s |
| Monitor | Dashboard Prometheus | CDC Lag Error Rate | - |
CDC Setup
# === Supabase Realtime CDC Pipeline ===
# SQL: Enable Realtime for tables
# ALTER PUBLICATION supabase_realtime ADD TABLE orders;
# ALTER PUBLICATION supabase_realtime ADD TABLE customers;
# ALTER PUBLICATION supabase_realtime ADD TABLE products;
# JavaScript Client (Node.js)
# import { createClient } from '@supabase/supabase-js'
#
# const supabase = createClient(SUPABASE_URL, SUPABASE_SERVICE_KEY)
#
# // Listen to INSERT on orders
# const channel = supabase
# .channel('orders-pipeline')
# .on('postgres_changes',
# { event: 'INSERT', schema: 'public', table: 'orders' },
# async (payload) => {
# const order = payload.new
# // Transform
# const enriched = {
# order_id: order.id,
# customer_name: await getCustomerName(order.customer_id),
# total_revenue: order.quantity * order.price,
# created_at: order.created_at,
# }
# // Load to BigQuery
# await loadToBigQuery('orders_analytics', enriched)
# // Invalidate Redis cache
#       // NOTE(review): the cache key lost its interpolation — presumably
#       // `customer:${order.customer_id}:orders`; confirm against the caller.
#       await redis.del(`customer:${order.customer_id}:orders`)
# }
# )
# .subscribe()
from dataclasses import dataclass


@dataclass
class CDCConfig:
    """One CDC pipeline entry: the table being watched, the events
    captured, a description of the transform, and the load targets."""

    table: str         # source PostgreSQL table name
    events: str        # space-separated event types (INSERT/UPDATE/DELETE)
    transform: str     # human-readable summary of the transform logic
    destinations: str  # human-readable summary of the load destinations


# Pipeline catalogue: one entry per table replicated through Realtime.
configs = [
    CDCConfig(
        table="orders",
        events="INSERT UPDATE",
        transform="Calculate revenue, enrich customer name, add geo",
        destinations="BigQuery (analytics) + Redis (cache invalidate)",
    ),
    CDCConfig(
        table="customers",
        events="INSERT UPDATE DELETE",
        transform="Normalize name, validate email, calculate LTV",
        destinations="Elasticsearch (search) + CRM API (sync)",
    ),
    CDCConfig(
        table="products",
        events="INSERT UPDATE",
        transform="Calculate margin, update category stats",
        destinations="Elasticsearch (catalog) + Redis (cache)",
    ),
    CDCConfig(
        table="payments",
        events="INSERT",
        transform="Validate amount, calculate fees, reconcile",
        destinations="BigQuery (finance) + Webhook (accounting)",
    ),
    CDCConfig(
        table="user_events",
        events="INSERT",
        transform="Sessionize, calculate engagement score",
        destinations="BigQuery (analytics) + Kafka (streaming)",
    ),
]

# Print the catalogue, one table per entry.
print("=== CDC Pipeline Config ===")
for cfg in configs:
    print(f" [{cfg.table}] Events: {cfg.events}")
    print(f" Transform: {cfg.transform}")
    print(f" Destinations: {cfg.destinations}")
Edge Function Transform
# === Supabase Edge Function for Transform ===
# // supabase/functions/order-pipeline/index.ts
# import { serve } from "https://deno.land/std/http/server.ts"
# import { createClient } from "https://esm.sh/@supabase/supabase-js"
#
# serve(async (req) => {
# const { record, type, table } = await req.json()
#
# if (table === 'orders' && type === 'INSERT') {
# // Transform
# const enriched = {
# order_id: record.id,
# revenue: record.quantity * record.price,
# tax: record.quantity * record.price * 0.07,
# region: getRegion(record.shipping_address),
# processed_at: new Date().toISOString(),
# }
#
# // Load to BigQuery
# await fetch('https://bigquery.googleapis.com/...', {
# method: 'POST',
#       headers: { Authorization: `Bearer ${BQ_ACCESS_TOKEN}` },  // token placeholder — supply a real OAuth token
# body: JSON.stringify({ rows: [enriched] }),
# })
#
# return new Response(JSON.stringify({ status: 'ok' }))
# }
# })
# Database Trigger to call Edge Function
# CREATE OR REPLACE FUNCTION notify_order_pipeline()
# RETURNS trigger AS $$
# BEGIN
# PERFORM net.http_post(
# url := 'https://project.supabase.co/functions/v1/order-pipeline',
# headers := '{"Authorization": "Bearer SERVICE_KEY"}'::jsonb,
# body := jsonb_build_object('record', NEW, 'type', TG_OP, 'table', TG_TABLE_NAME)
# );
# RETURN NEW;
# END;
# $$ LANGUAGE plpgsql;
#
# CREATE TRIGGER order_pipeline_trigger
# AFTER INSERT ON orders
# FOR EACH ROW EXECUTE FUNCTION notify_order_pipeline();
@dataclass
class TransformStep:
    """A single stage of the transform pipeline, described declaratively."""

    step: str          # stage name
    input_field: str   # field(s) the stage consumes
    output_field: str  # field(s) the stage produces
    logic: str         # one-line summary of the rule applied


# Ordered stages applied to every captured change event.
steps = [
    TransformStep(
        step="Calculate Revenue",
        input_field="quantity, price",
        output_field="revenue, tax",
        logic="revenue = qty * price | tax = revenue * 0.07",
    ),
    TransformStep(
        step="Enrich Customer",
        input_field="customer_id",
        output_field="customer_name, customer_tier",
        logic="Lookup customers table JOIN",
    ),
    TransformStep(
        step="Geo Enrichment",
        input_field="shipping_address",
        output_field="region, country, city",
        logic="Parse address → geo lookup",
    ),
    TransformStep(
        step="Deduplication",
        input_field="order_id",
        output_field="is_duplicate",
        logic="Check idempotency_key ใน Redis",
    ),
    TransformStep(
        step="Validation",
        input_field="all fields",
        output_field="is_valid, errors[]",
        logic="Check NOT NULL range format",
    ),
]

# Print each stage with its inputs, outputs, and logic summary.
print("=== Transform Pipeline ===")
for stage in steps:
    print(f" [{stage.step}] {stage.input_field} → {stage.output_field}")
    print(f" Logic: {stage.logic}")
Monitoring & Scaling
# === Production Monitoring ===
@dataclass
class PipeMetric:
    """A production health metric for the pipeline: where it is measured,
    its target value, and the alert rule when it goes out of range."""

    metric: str  # metric name
    source: str  # where the metric is collected
    target: str  # healthy target value
    alert: str   # threshold and escalation action


# Health metrics watched in production, each with target and alert rule.
metrics = [
    PipeMetric(
        metric="CDC Lag",
        source="Supabase Dashboard / Custom metric",
        target="< 1 second",
        alert="> 5s → P2 Check DB Load Replication",
    ),
    PipeMetric(
        metric="Events per Second",
        source="Custom counter in Transform",
        target="ตาม Baseline (10-1000 eps)",
        alert="Drop > 50% → P2 Check Source CDC",
    ),
    PipeMetric(
        metric="Transform Error Rate",
        source="Edge Function Logs / Custom",
        target="< 0.1%",
        alert="> 1% → P1 Check Transform Logic Data",
    ),
    PipeMetric(
        metric="Load Latency",
        source="Custom timer Source → Destination",
        target="< 5 seconds end-to-end",
        alert="> 30s → P2 Check Destination Network",
    ),
    PipeMetric(
        metric="Realtime Connections",
        source="Supabase Dashboard",
        target="< 80% of Plan Limit",
        alert="> 90% → P3 Upgrade Plan Scale",
    ),
    PipeMetric(
        metric="Dead Letter Queue",
        source="DLQ count (Redis/Table)",
        target="0 (empty)",
        alert="> 0 → P2 Process Failed Events",
    ),
]

# Print each metric with its target, source, and alert rule.
print("=== Pipeline Monitoring ===")
for item in metrics:
    print(f" [{item.metric}] Target: {item.target}")
    print(f" Source: {item.source}")
    print(f" Alert: {item.alert}")
เคล็ดลับ
- Publication: เปิด Realtime เฉพาะ Table ที่ต้องการ ลด Load
- Read Replica: ใช้ Read Replica สำหรับ CDC ไม่กระทบ Primary
- Idempotency: ใช้ Idempotency Key ป้องกัน Duplicate Processing
- DLQ: ตั้ง Dead Letter Queue สำหรับ Failed Events
- Batch: Batch Load ไป Destination ลด API Call
Supabase คืออะไร
Supabase is an open-source Firebase alternative built on PostgreSQL. It provides Auth, Storage, Edge Functions, and Realtime (WebSocket-based CDC via the WAL), with Row Level Security, available as a managed cloud service or self-hosted.
CDC Pipeline ทำอย่างไร
postgres_changes WAL Logical Replication ALTER PUBLICATION WebSocket Client Transform Load BigQuery Elasticsearch Redis Kafka Webhook
Transform ทำอย่างไร
Edge Functions Deno Database Trigger pg_net Revenue Calculate Enrich Geo Deduplication Validation Error Handling DLQ Retry Idempotency
Production ตั้งอย่างไร
Supabase Cloud Pro Team Self-hosted Kubernetes Read Replica Supavisor Prometheus Grafana CDC Lag Events/s Error Rate RLS API Key TLS
สรุป
Supabase Realtime Data Pipeline ETL CDC PostgreSQL WAL Edge Functions Transform BigQuery Elasticsearch Redis Monitoring Production
