SiamCafe.net Blog
Technology

Supabase Realtime Data Pipeline ETL

supabase realtime data pipeline etl
Supabase Realtime Data Pipeline ETL | SiamCafe Blog
2025-06-22· อ. บอม — SiamCafe.net· 9,936 คำ

Supabase Realtime ETL

Supabase Realtime Data Pipeline ETL CDC PostgreSQL WebSocket Edge Functions BigQuery Elasticsearch Transform Load Production

Component | Tool | Purpose | Latency
Source | Supabase PostgreSQL | Origin Data (Tables) | -
CDC | Supabase Realtime (WAL) | Capture Changes Real-time | < 100ms
Transform | Edge Functions / External | Business Logic Enrichment | 50-500ms
Load | BigQuery ES Kafka Redis | Destination Storage | 100ms-5s
Monitor | Dashboard Prometheus | CDC Lag Error Rate | -

CDC Setup

# === Supabase Realtime CDC Pipeline ===

# SQL: Enable Realtime for tables
# ALTER PUBLICATION supabase_realtime ADD TABLE orders;
# ALTER PUBLICATION supabase_realtime ADD TABLE customers;
# ALTER PUBLICATION supabase_realtime ADD TABLE products;

# JavaScript Client (Node.js)
# import { createClient } from '@supabase/supabase-js'
#
# const supabase = createClient(SUPABASE_URL, SUPABASE_SERVICE_KEY)
#
# // Listen to INSERT on orders
# const channel = supabase
#   .channel('orders-pipeline')
#   .on('postgres_changes',
#     { event: 'INSERT', schema: 'public', table: 'orders' },
#     async (payload) => {
#       const order = payload.new
#       // Transform
#       const enriched = {
#         order_id: order.id,
#         customer_name: await getCustomerName(order.customer_id),
#         total_revenue: order.quantity * order.price,
#         created_at: order.created_at,
#       }
#       // Load to BigQuery
#       await loadToBigQuery('orders_analytics', enriched)
#       // Invalidate Redis cache
#       await redis.del(`customer:${order.customer_id}:orders`)
#     }
#   )
#   .subscribe()

from dataclasses import dataclass

@dataclass
class CDCConfig:
    """One CDC (change data capture) pipeline entry.

    Describes which source table is watched, which events trigger the
    pipeline, how rows are transformed, and where results are loaded.
    All fields are free-form display strings used by the report below.
    """
    table: str         # source table name in Supabase/PostgreSQL
    events: str        # space-separated event list, e.g. "INSERT UPDATE"
    transform: str     # human-readable description of the transform step
    destinations: str  # human-readable description of the load targets

# (table, events, transform, destinations) rows, one per pipeline.
_CDC_ROWS = [
    ("orders",
     "INSERT UPDATE",
     "Calculate revenue, enrich customer name, add geo",
     "BigQuery (analytics) + Redis (cache invalidate)"),
    ("customers",
     "INSERT UPDATE DELETE",
     "Normalize name, validate email, calculate LTV",
     "Elasticsearch (search) + CRM API (sync)"),
    ("products",
     "INSERT UPDATE",
     "Calculate margin, update category stats",
     "Elasticsearch (catalog) + Redis (cache)"),
    ("payments",
     "INSERT",
     "Validate amount, calculate fees, reconcile",
     "BigQuery (finance) + Webhook (accounting)"),
    ("user_events",
     "INSERT",
     "Sessionize, calculate engagement score",
     "BigQuery (analytics) + Kafka (streaming)"),
]

configs = [CDCConfig(*row) for row in _CDC_ROWS]

# Render the pipeline configuration report (one 3-line entry per table).
print("=== CDC Pipeline Config ===")
for cfg in configs:
    print(f"  [{cfg.table}] Events: {cfg.events}")
    print(f"    Transform: {cfg.transform}")
    print(f"    Destinations: {cfg.destinations}")

Edge Function Transform

# === Supabase Edge Function for Transform ===

# // supabase/functions/order-pipeline/index.ts
# import { serve } from "https://deno.land/std/http/server.ts"
# import { createClient } from "https://esm.sh/@supabase/supabase-js"
#
# serve(async (req) => {
#   const { record, type, table } = await req.json()
#
#   if (table === 'orders' && type === 'INSERT') {
#     // Transform
#     const enriched = {
#       order_id: record.id,
#       revenue: record.quantity * record.price,
#       tax: record.quantity * record.price * 0.07,
#       region: getRegion(record.shipping_address),
#       processed_at: new Date().toISOString(),
#     }
#
#     // Load to BigQuery
#     await fetch('https://bigquery.googleapis.com/...', {
#       method: 'POST',
#       headers: { Authorization: `Bearer ${accessToken}` },
#       body: JSON.stringify({ rows: [enriched] }),
#     })
#
#     return new Response(JSON.stringify({ status: 'ok' }))
#   }
# })

# Database Trigger to call Edge Function
# CREATE OR REPLACE FUNCTION notify_order_pipeline()
# RETURNS trigger AS $$
# BEGIN
#   PERFORM net.http_post(
#     url := 'https://project.supabase.co/functions/v1/order-pipeline',
#     headers := '{"Authorization": "Bearer SERVICE_KEY"}'::jsonb,
#     body := jsonb_build_object('record', NEW, 'type', TG_OP, 'table', TG_TABLE_NAME)
#   );
#   RETURN NEW;
# END;
# $$ LANGUAGE plpgsql;
#
# CREATE TRIGGER order_pipeline_trigger
# AFTER INSERT ON orders
# FOR EACH ROW EXECUTE FUNCTION notify_order_pipeline();

@dataclass
class TransformStep:
    """A single stage of the transform pipeline.

    Maps a set of input fields to the output fields it produces, with a
    short description of the logic applied. All fields are display
    strings consumed only by the report loop below.
    """
    step: str          # stage name, e.g. "Calculate Revenue"
    input_field: str   # comma-separated input field names
    output_field: str  # comma-separated output field names
    logic: str         # one-line description of the transformation

# (step, inputs, outputs, logic) rows, in pipeline order.
_STEP_ROWS = [
    ("Calculate Revenue",
     "quantity, price",
     "revenue, tax",
     "revenue = qty * price | tax = revenue * 0.07"),
    ("Enrich Customer",
     "customer_id",
     "customer_name, customer_tier",
     "Lookup customers table JOIN"),
    ("Geo Enrichment",
     "shipping_address",
     "region, country, city",
     "Parse address → geo lookup"),
    ("Deduplication",
     "order_id",
     "is_duplicate",
     "Check idempotency_key ใน Redis"),
    ("Validation",
     "all fields",
     "is_valid, errors[]",
     "Check NOT NULL range format"),
]

steps = [TransformStep(*row) for row in _STEP_ROWS]

# Render the transform pipeline report (one 2-line entry per step).
print("=== Transform Pipeline ===")
for st in steps:
    print(f"  [{st.step}] {st.input_field} → {st.output_field}")
    print(f"    Logic: {st.logic}")

Monitoring & Scaling

# === Production Monitoring ===

@dataclass
class PipeMetric:
    """A production monitoring metric for the pipeline.

    Records where the metric is read from, its target value, and the
    alert rule that fires when the target is violated. All fields are
    display strings used only by the report loop below.
    """
    metric: str  # metric name, e.g. "CDC Lag"
    source: str  # where the metric is observed/collected
    target: str  # healthy target value or range
    alert: str   # threshold, priority, and suggested action

# (metric, source, target, alert) rows, one per monitored signal.
_METRIC_ROWS = [
    ("CDC Lag",
     "Supabase Dashboard / Custom metric",
     "< 1 second",
     "> 5s → P2 Check DB Load Replication"),
    ("Events per Second",
     "Custom counter in Transform",
     "ตาม Baseline (10-1000 eps)",
     "Drop > 50% → P2 Check Source CDC"),
    ("Transform Error Rate",
     "Edge Function Logs / Custom",
     "< 0.1%",
     "> 1% → P1 Check Transform Logic Data"),
    ("Load Latency",
     "Custom timer Source → Destination",
     "< 5 seconds end-to-end",
     "> 30s → P2 Check Destination Network"),
    ("Realtime Connections",
     "Supabase Dashboard",
     "< 80% of Plan Limit",
     "> 90% → P3 Upgrade Plan Scale"),
    ("Dead Letter Queue",
     "DLQ count (Redis/Table)",
     "0 (empty)",
     "> 0 → P2 Process Failed Events"),
]

metrics = [PipeMetric(*row) for row in _METRIC_ROWS]

# Render the monitoring report (one 3-line entry per metric).
print("=== Pipeline Monitoring ===")
for met in metrics:
    print(f"  [{met.metric}] Target: {met.target}")
    print(f"    Source: {met.source}")
    print(f"    Alert: {met.alert}")

เคล็ดลับ

Supabase คืออะไร

Open Source Firebase Alternative PostgreSQL Auth Storage Edge Functions Realtime WebSocket CDC WAL Row Level Security Managed Cloud

CDC Pipeline ทำอย่างไร

postgres_changes WAL Logical Replication ALTER PUBLICATION WebSocket Client Transform Load BigQuery Elasticsearch Redis Kafka Webhook

Transform ทำอย่างไร

Edge Functions Deno Database Trigger pg_net Revenue Calculate Enrich Geo Deduplication Validation Error Handling DLQ Retry Idempotency

Production ตั้งอย่างไร

Supabase Cloud Pro Team Self-hosted Kubernetes Read Replica Supavisor Prometheus Grafana CDC Lag Events/s Error Rate RLS API Key TLS

สรุป

Supabase Realtime Data Pipeline ETL CDC PostgreSQL WAL Edge Functions Transform BigQuery Elasticsearch Redis Monitoring Production

📖 บทความที่เกี่ยวข้อง

Supabase Realtime Monitoring และ Alerting — อ่านบทความ →
Supabase Realtime Real-time Processing — อ่านบทความ →
PlanetScale Vitess Data Pipeline ETL — อ่านบทความ →
React Server Components Data Pipeline ETL — อ่านบทความ →
ONNX Runtime Data Pipeline ETL — อ่านบทความ →

📚 ดูบทความทั้งหมด →