ai
Supabase Realtime Data Pipeline ETL — สร้าง ETL
Supabase Realtime ETL

สร้าง Realtime Data Pipeline แบบ ETL บน Supabase: จับการเปลี่ยนแปลงของ PostgreSQL ด้วย CDC ผ่าน WebSocket, Transform ด้วย Edge Functions แล้ว Load ไปยัง BigQuery และ Elasticsearch ให้พร้อมใช้งานระดับ Production
เนื้อหาเกี่ยวข้อง — เงินฝืดผลกระทบ — ข้อมูลครบถ้วน 2026
| Component | Tool | Purpose | Latency |
|---|---|---|---|
| Source | Supabase PostgreSQL | Origin Data (Tables) | - |
| CDC | Supabase Realtime (WAL) | Capture Changes Real-time | < 100ms |
| Transform | Edge Functions / External | Business Logic, Enrichment | 50-500ms |
| Load | BigQuery / Elasticsearch / Kafka / Redis | Destination Storage | 100ms-5s |
| Monitor | Dashboard / Prometheus | CDC Lag, Error Rate | - |
CDC Setup
# === Supabase Realtime CDC Pipeline ===
# SQL: Enable Realtime for tables
# ALTER PUBLICATION supabase_realtime ADD TABLE orders;
# ALTER PUBLICATION supabase_realtime ADD TABLE customers;
# ALTER PUBLICATION supabase_realtime ADD TABLE products;
# JavaScript Client (Node.js)
# import { createClient } from '@supabase/supabase-js'
#
# const supabase = createClient(SUPABASE_URL, SUPABASE_SERVICE_KEY)
#
# // Listen to INSERT on orders
# const channel = supabase
# .channel('orders-pipeline')
# .on('postgres_changes',
# { event: 'INSERT', schema: 'public', table: 'orders' },
# async (payload) => {
# const order = payload.new
# // Transform
# const enriched = {
# order_id: order.id,
# customer_name: await getCustomerName(order.customer_id),
# total_revenue: order.quantity * order.price,
# created_at: order.created_at,
# }
# // Load to BigQuery
# await loadToBigQuery('orders_analytics', enriched)
# // Invalidate Redis cache
# await redis.del(`customer:${order.customer_id}:orders`)
# }
# )
# .subscribe()
from dataclasses import dataclass
@dataclass
class CDCConfig:
    """Describe how one source table's change events flow through the pipeline.

    All fields are free-form, human-readable strings; in this script they are
    only used to print a summary of the pipeline configuration.
    """

    # Name of the source table whose changes are captured, e.g. "orders".
    table: str
    # Space-separated list of captured operations, e.g. "INSERT UPDATE".
    events: str
    # Short description of the transform applied to each change event.
    transform: str
    # Description of the destination system(s) the result is loaded into.
    destinations: str
# One entry per source table handled by the CDC pipeline: which operations are
# captured, how they are transformed, and where the results are loaded.
configs = [
    CDCConfig(
        "orders",
        "INSERT UPDATE",
        "Calculate revenue, enrich customer name, add geo",
        "BigQuery (analytics) + Redis (cache invalidate)",
    ),
    CDCConfig(
        "customers",
        "INSERT UPDATE DELETE",
        "Normalize name, validate email, calculate LTV",
        "Elasticsearch (search) + CRM API (sync)",
    ),
    CDCConfig(
        "products",
        "INSERT UPDATE",
        "Calculate margin, update category stats",
        "Elasticsearch (catalog) + Redis (cache)",
    ),
    CDCConfig(
        "payments",
        "INSERT",
        "Validate amount, calculate fees, reconcile",
        "BigQuery (finance) + Webhook (accounting)",
    ),
    CDCConfig(
        "user_events",
        "INSERT",
        "Sessionize, calculate engagement score",
        "BigQuery (analytics) + Kafka (streaming)",
    ),
]

# Print a human-readable summary: one header, then three lines per table.
print("=== CDC Pipeline Config ===")
for c in configs:
    print(f" [{c.table}] Events: {c.events}")
    print(f" Transform: {c.transform}")
    print(f" Destinations: {c.destinations}")
Edge Function Transform

# === Supabase Edge Function for Transform ===
# // supabase/functions/order-pipeline/index.ts
# import { serve } from "https://deno.land/std/http/server.ts"
# import { createClient } from "https://esm.sh/@supabase/supabase-js"
#
# serve(async (req) => {
# const { record, type, table } = await req.json()
#
# if (table === 'orders' && type === 'INSERT') {
# // Transform
# const enriched = {
# order_id: record.id,
# revenue: record.quantity * record.price,
# tax: record.quantity * record.price * 0.07,
# region: getRegion(record.shipping_address),
# processed_at: new Date().toISOString(),
# }
#
# // Load to BigQuery
# await fetch('https://bigquery.googleapis.com/...', {
# method: 'POST',
# headers: { Authorization: `Bearer ${Deno.env.get('GCP_ACCESS_TOKEN')}` },
# body: JSON.stringify({ rows: [enriched] }),
# })
#
# return new Response(JSON.stringify({ status: 'ok' }))
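# }
#
# // Acknowledge events this function does not handle (avoids an empty response)
# return new Response(JSON.stringify({ status: 'ignored' }))
# })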
# Database Trigger to call Edge Function
# CREATE OR REPLACE FUNCTION notify_order_pipeline()
# RETURNS trigger AS $$
# BEGIN
# PERFORM net.http_post(
# url := 'https://project.supabase.co/functions/v1/order-pipeline',
# headers := '{"Authorization": "Bearer SERVICE_KEY"}'::jsonb,
# body := jsonb_build_object('record', NEW, 'type', TG_OP, 'table', TG_TABLE_NAME)
# );
# RETURN NEW;
# END;
# $$ LANGUAGE plpgsql;
#
# CREATE TRIGGER order_pipeline_trigger
# AFTER INSERT ON orders
# FOR EACH ROW EXECUTE FUNCTION notify_order_pipeline();
@dataclass
class TransformStep:
    """Describe one step of the transform stage of the pipeline.

    All fields are free-form, human-readable strings; in this script they are
    only used to print a summary of the transform pipeline.
    """

    # Human-readable step name, e.g. "Calculate Revenue".
    step: str
    # Comma-separated input field(s) the step reads, e.g. "quantity, price".
    input_field: str
    # Comma-separated output field(s) the step produces, e.g. "revenue, tax".
    output_field: str
    # Short description of the step's logic.
    logic: str
# Ordered transform steps applied to each captured change event.
steps = [
    TransformStep(
        "Calculate Revenue",
        "quantity, price",
        "revenue, tax",
        "revenue = qty * price | tax = revenue * 0.07",
    ),
    TransformStep(
        "Enrich Customer",
        "customer_id",
        "customer_name, customer_tier",
        "Lookup customers table JOIN",
    ),
    TransformStep(
        "Geo Enrichment",
        "shipping_address",
        "region, country, city",
        "Parse address → geo lookup",
    ),
    TransformStep(
        "Deduplication",
        "order_id",
        "is_duplicate",
        "Check idempotency_key ใน Redis",
    ),
    TransformStep(
        "Validation",
        "all fields",
        "is_valid, errors[]",
        "Check NOT NULL range format",
    ),
]

# Print a human-readable summary: one header, then two lines per step.
print("=== Transform Pipeline ===")
for s in steps:
    print(f" [{s.step}] {s.input_field} → {s.output_field}")
    print(f" Logic: {s.logic}")
Monitoring & Scaling
# === Production Monitoring ===
@dataclass
class PipeMetric:
    """Describe one production monitoring metric for the pipeline.

    All fields are free-form, human-readable strings; in this script they are
    only used to print a summary of the monitoring plan.
    """

    # Metric name, e.g. "CDC Lag".
    metric: str
    # Where the metric is collected from, e.g. "Supabase Dashboard".
    source: str
    # Healthy target value, e.g. "< 1 second".
    target: str
    # Alert threshold, priority, and suggested action, e.g. "> 5s → P2 ...".
    alert: str
# Metrics to watch in production, each with a target and an alert rule.
metrics = [
    PipeMetric(
        "CDC Lag",
        "Supabase Dashboard / Custom metric",
        "< 1 second",
        "> 5s → P2 Check DB Load Replication",
    ),
    PipeMetric(
        "Events per Second",
        "Custom counter in Transform",
        "ตาม Baseline (10-1000 eps)",
        "Drop > 50% → P2 Check Source CDC",
    ),
    PipeMetric(
        "Transform Error Rate",
        "Edge Function Logs / Custom",
        "< 0.1%",
        "> 1% → P1 Check Transform Logic Data",
    ),
    PipeMetric(
        "Load Latency",
        "Custom timer Source → Destination",
        "< 5 seconds end-to-end",
        "> 30s → P2 Check Destination Network",
    ),
    PipeMetric(
        "Realtime Connections",
        "Supabase Dashboard",
        "< 80% of Plan Limit",
        "> 90% → P3 Upgrade Plan Scale",
    ),
    PipeMetric(
        "Dead Letter Queue",
        "DLQ count (Redis/Table)",
        "0 (empty)",
        "> 0 → P2 Process Failed Events",
    ),
]

# Print a human-readable summary: one header, then three lines per metric.
print("=== Pipeline Monitoring ===")
for m in metrics:
    print(f" [{m.metric}] Target: {m.target}")
    print(f" Source: {m.source}")
    print(f" Alert: {m.alert}")
เคล็ดลับ
- Publication: เปิด Realtime เฉพาะ Table ที่ต้องการ ลด Load
- Read Replica: ใช้ Read Replica สำหรับ CDC ไม่กระทบ Primary
- Idempotency: ใช้ Idempotency Key ป้องกัน Duplicate Processing
- DLQ: ตั้ง Dead Letter Queue สำหรับ Failed Events
- Batch: Batch Load ไป Destination ลด API Call
Supabase คืออะไร
Supabase คือ Open Source Firebase Alternative ที่สร้างบน PostgreSQL มีทั้ง Auth, Storage, Edge Functions, Realtime (WebSocket และ CDC จาก WAL) และ Row Level Security ให้บริการแบบ Managed Cloud
แนะนำเพิ่มเติม — ระบบเทรดของ iCafeForex
เนื้อหาเกี่ยวข้อง — อ่านต่อ: Databricks Unity Catalog Disaster Recovery Plan
เนื้อหาเกี่ยวข้อง — แนะนำให้อ่าน DuckDB Analytics Zero Downtime Deployment





