Fivetran Automation
Fivetran Connector Automation REST API ELT Pipeline Snowflake BigQuery Sync Schedule Monitor Alert Python Terraform CI/CD
| Operation | API Endpoint | Method | Use Case |
|---|---|---|---|
| List Connectors | /v1/connectors | GET | ดูรายการ Connector ทั้งหมด |
| Create Connector | /v1/connectors | POST | สร้าง Connector ใหม่อัตโนมัติ |
| Update Config | /v1/connectors/{id} | PATCH | แก้ไข Schedule Schema Config |
| Force Sync | /v1/connectors/{id}/force-sync | POST | บังคับ Sync ทันที |
| Pause/Resume | /v1/connectors/{id} | PATCH | หยุด/เริ่ม Connector |
| Get Status | /v1/connectors/{id} | GET | ตรวจ Sync Status |
Python Automation Script
# === Fivetran Connector Automation ===
# import requests
# import os
# from datetime import datetime, timedelta
#
# FIVETRAN_API_KEY = os.environ['FIVETRAN_API_KEY']
# FIVETRAN_API_SECRET = os.environ['FIVETRAN_API_SECRET']
# BASE_URL = 'https://api.fivetran.com/v1'
#
# session = requests.Session()
# session.auth = (FIVETRAN_API_KEY, FIVETRAN_API_SECRET)
# session.headers.update({'Content-Type': 'application/json'})
#
# # List all connectors
# def list_connectors(group_id):
# resp = session.get(f'{BASE_URL}/groups/{group_id}/connectors')
# return resp.json()['data']['items']
#
# # Force sync a connector
# def force_sync(connector_id):
# resp = session.post(f'{BASE_URL}/connectors/{connector_id}/force-sync')
# return resp.json()
#
# # Pause/Resume connector
# def set_paused(connector_id, paused: bool):
# resp = session.patch(f'{BASE_URL}/connectors/{connector_id}',
# json={'paused': paused})
# return resp.json()
#
# # Check connector health
# def check_health(connector_id):
# resp = session.get(f'{BASE_URL}/connectors/{connector_id}')
# data = resp.json()['data']
# status = data['status']
# return {
# 'id': connector_id,
# 'name': data['schema'],
# 'sync_state': status['sync_state'],
# 'setup_state': status['setup_state'],
# 'is_historical_sync': status.get('is_historical_sync', False),
# 'succeeded_at': data.get('succeeded_at'),
# 'failed_at': data.get('failed_at'),
# }
from dataclasses import dataclass


@dataclass
class ConnectorConfig:
    """Desired configuration for a single Fivetran connector.

    Plain value object used by the inventory list and report below; it does
    not talk to the Fivetran API itself.
    """

    name: str            # human-readable connector name
    service: str         # Fivetran source service id (e.g. "mysql", "stripe")
    destination: str     # target warehouse label ("Snowflake" / "BigQuery")
    sync_frequency: int  # sync interval in minutes
    schema_prefix: str   # destination schema prefix for synced tables
    paused: bool         # True = connector should not sync

    @property
    def status(self) -> str:
        """Return "PAUSED" or "ACTIVE" derived from the paused flag."""
        return "PAUSED" if self.paused else "ACTIVE"
# Declarative inventory of the connectors this script manages.
connectors = [
    ConnectorConfig(name="production_mysql", service="mysql",
                    destination="Snowflake", sync_frequency=60,
                    schema_prefix="prod_mysql", paused=False),
    ConnectorConfig(name="salesforce_crm", service="salesforce",
                    destination="BigQuery", sync_frequency=360,
                    schema_prefix="sf_crm", paused=False),
    ConnectorConfig(name="stripe_payments", service="stripe",
                    destination="Snowflake", sync_frequency=60,
                    schema_prefix="stripe", paused=False),
    ConnectorConfig(name="google_analytics", service="google_analytics_4",
                    destination="BigQuery", sync_frequency=720,
                    schema_prefix="ga4", paused=False),
    ConnectorConfig(name="shopify_store", service="shopify",
                    destination="Snowflake", sync_frequency=120,
                    schema_prefix="shopify", paused=False),
    ConnectorConfig(name="hubspot_marketing", service="hubspot",
                    destination="BigQuery", sync_frequency=360,
                    schema_prefix="hubspot", paused=True),
]
# Print a short status report for every configured connector.
print("=== Connectors ===")
for cfg in connectors:
    state = ("ACTIVE", "PAUSED")[cfg.paused]
    print(f" [{cfg.name}] {cfg.service} → {cfg.destination} [{state}]")
    print(f" Sync: every {cfg.sync_frequency} min | Schema: {cfg.schema_prefix}")
Monitoring & Alerting
# === Connector Monitoring Script ===
# def monitor_all_connectors(group_id):
# connectors = list_connectors(group_id)
# alerts = []
# for conn in connectors:
# health = check_health(conn['id'])
# # Check if sync failed
# if health['sync_state'] == 'failure':
# alerts.append({
# 'connector': health['name'],
# 'issue': 'Sync Failed',
# 'failed_at': health['failed_at'],
# 'severity': 'critical'
# })
# # Check if sync is stale (no successful sync within the last 4 hours)
# if health['succeeded_at']:
# last_sync = datetime.fromisoformat(health['succeeded_at'])
# # NOTE(review): Fivetran timestamps end in 'Z'; fromisoformat() accepts that
# # suffix only on Python 3.11+, and a tz-aware result cannot be subtracted
# # from naive datetime.utcnow() — normalize both to UTC before comparing.
# if datetime.utcnow() - last_sync > timedelta(hours=4):
# alerts.append({
# 'connector': health['name'],
# 'issue': 'Stale Data',
# 'last_sync': str(last_sync),
# 'severity': 'warning'
# })
# return alerts
#
# def send_slack_alert(alerts):
# webhook = os.environ['SLACK_WEBHOOK']
# for alert in alerts:
# requests.post(webhook, json={
# 'text': f"[{alert['severity'].upper()}] {alert['connector']}: {alert['issue']}"
# })
@dataclass
class AlertRule:
    """A monitoring rule: when *condition* holds, perform *action* at *severity*."""

    rule: str       # short rule name
    condition: str  # human-readable trigger condition
    severity: str   # "CRITICAL" / "WARNING" / "INFO" (per the rules list below)
    action: str     # notification / remediation steps to take
# Alert rule catalogue rendered by the report below.
rules = [
    AlertRule(rule="Sync Failure",
              condition="sync_state == 'failure'",
              severity="CRITICAL",
              action="Slack + PagerDuty + Auto-retry 1x"),
    AlertRule(rule="Stale Data",
              condition="last_sync > 2x sync_frequency",
              severity="WARNING",
              action="Slack + Email + Force Sync"),
    AlertRule(rule="Schema Change",
              condition="schema_change_detected == true",
              severity="INFO",
              action="Slack + Log + Review Required"),
    AlertRule(rule="Row Count Drop",
              condition="row_count < 50% of average",
              severity="WARNING",
              action="Slack + Data Quality Check"),
    AlertRule(rule="Historical Sync Stuck",
              condition="is_historical_sync > 24h",
              severity="WARNING",
              action="Slack + Review Connector Config"),
    AlertRule(rule="API Rate Limit",
              condition="429 response from Source API",
              severity="INFO",
              action="Log + Adjust Sync Frequency"),
]
# Print every alert rule; one print call per rule, joined with newlines,
# produces the same output as three separate print calls.
print("=== Alert Rules ===")
for ar in rules:
    print(f" [{ar.rule}] {ar.severity}\n"
          f" Condition: {ar.condition}\n"
          f" Action: {ar.action}")
Terraform IaC
# === Fivetran Terraform Provider ===
# terraform {
# required_providers {
# fivetran = {
# source = "fivetran/fivetran"
# version = "~> 1.0"
# }
# }
# }
#
# provider "fivetran" {
# api_key = var.fivetran_api_key
# api_secret = var.fivetran_api_secret
# }
#
# resource "fivetran_connector" "mysql_production" {
# group_id = fivetran_group.data_team.id
# service = "mysql"
# sync_frequency = 60
# paused = false
# trust_certificates = true
#
# destination_schema {
# prefix = "prod_mysql"
# }
#
# config {
# host = var.mysql_host
# port = 3306
# user = var.mysql_user
# password = var.mysql_password
# database = "production"
# }
# }
@dataclass
class TerraformResource:
    """Cheat-sheet entry describing one Fivetran Terraform resource type."""

    resource: str    # Terraform resource name (e.g. "fivetran_connector")
    purpose: str     # what the resource manages
    key_config: str  # the most important arguments to set
    lifecycle: str   # ordering / dependency notes for applying it
# Reference table of the Fivetran Terraform resources used in this guide.
tf_resources = [
    TerraformResource(resource="fivetran_group",
                      purpose="Group สำหรับจัดกลุ่ม Connector + Destination",
                      key_config="name, destination_id",
                      lifecycle="สร้างก่อน Connector"),
    TerraformResource(resource="fivetran_connector",
                      purpose="Connector ดึงข้อมูลจาก Source",
                      key_config="service, sync_frequency, config (host/user/pass)",
                      lifecycle="Depends on Group + Destination"),
    TerraformResource(resource="fivetran_connector_schema_config",
                      purpose="กำหนด Table/Column ที่ต้องการ Sync",
                      key_config="schema_change_handling, schemas",
                      lifecycle="Apply หลัง Connector สร้างเสร็จ"),
    TerraformResource(resource="fivetran_destination",
                      purpose="Destination (Snowflake/BigQuery)",
                      key_config="service, config (host/database/credentials)",
                      lifecycle="สร้างก่อน Group"),
    TerraformResource(resource="fivetran_dbt_transformation",
                      purpose="dbt Transformation หลัง Sync",
                      key_config="dbt_project_id, run_tests",
                      lifecycle="Optional สำหรับ Transform"),
]
# Print every Terraform resource entry; a single newline-joined print call
# emits the same bytes as the three separate calls it replaces.
print("=== Terraform Resources ===")
for res in tf_resources:
    print(f" [{res.resource}] {res.purpose}\n"
          f" Config: {res.key_config}\n"
          f" Lifecycle: {res.lifecycle}")
เคล็ดลับ
- IaC: ใช้ Terraform จัดการ Connector เป็น Code Version Control ได้
- Monitor: ตรวจ Sync Status ทุก 15 นาที แจ้ง Slack ทันทีถ้า Fail
- Schema: ตั้ง schema_change_handling เป็น ALLOW_ALL สำหรับ Source ที่เปลี่ยนบ่อย
- Rate Limit: API 100 req/min ใช้ Batch Operations ลด Request
- dbt: เชื่อม dbt Transform หลัง Sync อัตโนมัติ
Fivetran คืออะไร
Cloud ELT Platform 300+ Connectors MySQL PostgreSQL Salesforce Snowflake BigQuery Auto Schema Migration dbt REST API Schedule Sync
Automation Script ทำอะไรได้
Create Update Pause Resume Force Sync Status History Monitor Alert Batch Terraform IaC Python REST API GitHub Actions CI/CD
ใช้ REST API อย่างไร
API Key Secret Basic Auth GET POST PATCH DELETE /v1/connectors force-sync schemas state Python requests JSON Rate Limit 100/min
Monitor อย่างไร
Sync Status succeeded_at failed_at Cron Slack PagerDuty Webhook Row Count Schema Change SLA Data Freshness Prometheus Grafana Dashboard
สรุป
Fivetran Connector Automation REST API Python Terraform IaC Monitor Alert Slack Sync Schedule Schema Force Sync dbt Production Pipeline
