Fivetran Connector Automation Script — สร้าง
Fivetran Automation
Fivetran Connector Automation REST API ELT Pipeline Snowflake BigQuery Sync Schedule Monitor Alert Python Terraform CI/CD
| Operation | API Endpoint | Method | Use Case |
|---|---|---|---|
| List Connectors | /v1/connectors | GET | ดูรายการ Connector ทั้งหมด |
| Create Connector | /v1/connectors | POST | สร้าง Connector ใหม่อัตโนมัติ |
| Update Config | /v1/connectors/{id} | PATCH | แก้ไข Schedule Schema Config |
| Force Sync | /v1/connectors/{id}/sync | POST | บังคับ Sync ทันที |
| Pause/Resume | /v1/connectors/{id} | PATCH | หยุด/เริ่ม Connector |
| Get Status | /v1/connectors/{id} | GET | ตรวจ Sync Status |
Python Automation Script
# === Fivetran Connector Automation ===
# import requests
# import os
# from datetime import datetime, timedelta
#
# FIVETRAN_API_KEY = os.environ['FIVETRAN_API_KEY']
# FIVETRAN_API_SECRET = os.environ['FIVETRAN_API_SECRET']
# BASE_URL = 'https://api.fivetran.com/v1'
#
# session = requests.Session()
# session.auth = (FIVETRAN_API_KEY, FIVETRAN_API_SECRET)
# session.headers.update({'Content-Type': 'application/json'})
#
# # List all connectors
# def list_connectors(group_id):
# resp = session.get(f'{BASE_URL}/groups/{group_id}/connectors')
# return resp.json()['data']['items']
#
# # Force sync a connector (POST /connectors/{id}/sync; force=True restarts a sync already in progress)
# def force_sync(connector_id):
# resp = session.post(f'{BASE_URL}/connectors/{connector_id}/sync',
# json={'force': True})
# return resp.json()
#
# # Pause/Resume connector
# def set_paused(connector_id, paused: bool):
# resp = session.patch(f'{BASE_URL}/connectors/{connector_id}',
# json={'paused': paused})
# return resp.json()
#
# # Check connector health
# def check_health(connector_id):
# resp = session.get(f'{BASE_URL}/connectors/{connector_id}')
# data = resp.json()['data']
# status = data['status']
# return {
# 'id': connector_id,
# 'name': data['schema'],
# 'sync_state': status['sync_state'],
# 'setup_state': status['setup_state'],
# 'is_historical_sync': status.get('is_historical_sync', False),
# 'succeeded_at': data.get('succeeded_at'),
# 'failed_at': data.get('failed_at'),
# }
from dataclasses import dataclass
@dataclass
class ConnectorConfig:
    """Describe one Fivetran connector entry for the summary listing.

    Plain data holder: the generated ``__init__`` takes the fields
    positionally in declaration order and performs no validation.
    """

    name: str  # connector label, printed in brackets
    service: str  # source service identifier, e.g. "mysql"
    destination: str  # destination label, e.g. "Snowflake"
    sync_frequency: int  # printed as "every N min"
    schema_prefix: str  # printed as the schema
    paused: bool  # True is listed as PAUSED, False as ACTIVE
# Sample connector inventory; argument order follows ConnectorConfig's fields:
# (name, service, destination, sync_frequency, schema_prefix, paused).
connectors = [
    ConnectorConfig(
        "production_mysql", "mysql", "Snowflake", 60, "prod_mysql", False
    ),
    ConnectorConfig(
        "salesforce_crm", "salesforce", "BigQuery", 360, "sf_crm", False
    ),
    ConnectorConfig(
        "stripe_payments", "stripe", "Snowflake", 60, "stripe", False
    ),
    ConnectorConfig(
        "google_analytics", "google_analytics_4", "BigQuery", 720, "ga4", False
    ),
    ConnectorConfig(
        "shopify_store", "shopify", "Snowflake", 120, "shopify", False
    ),
    ConnectorConfig(
        "hubspot_marketing", "hubspot", "BigQuery", 360, "hubspot", True
    ),
]

# Print a two-line summary per connector: route and state, then schedule.
print("=== Connectors ===")
for c in connectors:
    status = "PAUSED" if c.paused else "ACTIVE"
    print(f" [{c.name}] {c.service} → {c.destination} [{status}]")
    print(f" Sync: every {c.sync_frequency} min | Schema: {c.schema_prefix}")
Monitoring & Alerting
# === Connector Monitoring Script ===
# def monitor_all_connectors(group_id):
# connectors = list_connectors(group_id)
# alerts = []
# for conn in connectors:
# health = check_health(conn['id'])
# # Check if sync failed
# if health['sync_state'] == 'failure':
# alerts.append({
# 'connector': health['name'],
# 'issue': 'Sync Failed',
# 'failed_at': health['failed_at'],
# 'severity': 'critical'
# })
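# # Check if sync is stale (no success in the last 4 hours; use 2x the connector's sync_frequency for a per-connector threshold)
# if health['succeeded_at']:
# # API timestamps are ISO-8601 with a trailing 'Z'; normalize it so fromisoformat() also works before Python 3.11
# last_sync = datetime.fromisoformat(health['succeeded_at'].replace('Z', '+00:00'))
# # Compare aware-to-aware: subtracting an aware datetime from naive utcnow() raises TypeError
# if datetime.now(last_sync.tzinfo) - last_sync > timedelta(hours=4):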
# alerts.append({
# 'connector': health['name'],
# 'issue': 'Stale Data',
# 'last_sync': str(last_sync),
# 'severity': 'warning'
# })
# return alerts
#
# def send_slack_alert(alerts):
# webhook = os.environ['SLACK_WEBHOOK']
# for alert in alerts:
# requests.post(webhook, json={
# 'text': f"[{alert['severity'].upper()}] {alert['connector']}: {alert['issue']}"
# })
@dataclass
class AlertRule:
    """Describe one monitoring alert rule for the summary listing.

    Plain data holder: all four fields are free-form strings and are
    only printed, never evaluated.
    """

    rule: str  # short rule title, printed in brackets
    condition: str  # human-readable trigger expression
    severity: str  # severity label, e.g. "CRITICAL", "WARNING", "INFO"
    action: str  # human-readable description of the response
# Alert rule catalogue; argument order follows AlertRule's fields:
# (rule, condition, severity, action).
rules = [
    AlertRule(
        "Sync Failure",
        "sync_state == 'failure'",
        "CRITICAL",
        "Slack + PagerDuty + Auto-retry 1x",
    ),
    AlertRule(
        "Stale Data",
        "last_sync > 2x sync_frequency",
        "WARNING",
        "Slack + Email + Force Sync",
    ),
    AlertRule(
        "Schema Change",
        "schema_change_detected == true",
        "INFO",
        "Slack + Log + Review Required",
    ),
    AlertRule(
        "Row Count Drop",
        "row_count < 50% of average",
        "WARNING",
        "Slack + Data Quality Check",
    ),
    AlertRule(
        "Historical Sync Stuck",
        "is_historical_sync > 24h",
        "WARNING",
        "Slack + Review Connector Config",
    ),
    AlertRule(
        "API Rate Limit",
        "429 response from Source API",
        "INFO",
        "Log + Adjust Sync Frequency",
    ),
]

# Print a three-line summary per rule: title/severity, condition, action.
print("=== Alert Rules ===")
for r in rules:
    print(f" [{r.rule}] {r.severity}")
    print(f" Condition: {r.condition}")
    print(f" Action: {r.action}")
Terraform IaC
# === Fivetran Terraform Provider ===
# terraform {
# required_providers {
# fivetran = {
# source = "fivetran/fivetran"
# version = "~> 1.0"
# }
# }
# }
#
# provider "fivetran" {
# api_key = var.fivetran_api_key
# api_secret = var.fivetran_api_secret
# }
#
# resource "fivetran_connector" "mysql_production" {
# group_id = fivetran_group.data_team.id
# service = "mysql"
# sync_frequency = 60
# paused = false
# trust_certificates = true
#
# destination_schema {
# prefix = "prod_mysql"
# }
#
# config {
# host = var.mysql_host
# port = 3306
# user = var.mysql_user
# password = var.mysql_password
# database = "production"
# }
# }
@dataclass
class TerraformResource:
    """Describe one Fivetran Terraform resource type for the summary listing.

    Plain data holder: all four fields are free-form strings and are
    only printed.
    """

    resource: str  # Terraform resource type name, printed in brackets
    purpose: str  # what the resource is for
    key_config: str  # comma-separated list of notable arguments
    lifecycle: str  # ordering / dependency note
# Terraform resource catalogue; argument order follows TerraformResource's
# fields: (resource, purpose, key_config, lifecycle).
tf_resources = [
    TerraformResource(
        "fivetran_group",
        "Group สำหรับจัดกลุ่ม Connector + Destination",
        "name, destination_id",
        "สร้างก่อน Connector",
    ),
    TerraformResource(
        "fivetran_connector",
        "Connector ดึงข้อมูลจาก Source",
        "service, sync_frequency, config (host/user/pass)",
        "Depends on Group + Destination",
    ),
    TerraformResource(
        "fivetran_connector_schema_config",
        "กำหนด Table/Column ที่ต้องการ Sync",
        "schema_change_handling, schemas",
        "Apply หลัง Connector สร้างเสร็จ",
    ),
    TerraformResource(
        "fivetran_destination",
        "Destination (Snowflake/BigQuery)",
        "service, config (host/database/credentials)",
        "สร้างก่อน Group",
    ),
    TerraformResource(
        "fivetran_dbt_transformation",
        "dbt Transformation หลัง Sync",
        "dbt_project_id, run_tests",
        "Optional สำหรับ Transform",
    ),
]

# Print a three-line summary per resource: name/purpose, config, lifecycle.
print("=== Terraform Resources ===")
for t in tf_resources:
    print(f" [{t.resource}] {t.purpose}")
    print(f" Config: {t.key_config}")
    print(f" Lifecycle: {t.lifecycle}")
เคล็ดลับ
- IaC: ใช้ Terraform จัดการ Connector เป็น Code Version Control ได้
- Monitor: ตรวจ Sync Status ทุก 15 นาที แจ้ง Slack ทันทีถ้า Fail
- Schema: ตั้ง schema_change_handling เป็น ALLOW_ALL สำหรับ Source ที่เปลี่ยนบ่อย
- Rate Limit: API 100 req/min ใช้ Batch Operations ลด Request
- dbt: เชื่อม dbt Transform หลัง Sync อัตโนมัติ
Fivetran คืออะไร
Cloud ELT Platform 300+ Connectors MySQL PostgreSQL Salesforce Snowflake BigQuery Auto Schema Migration dbt REST API Schedule Sync
อ่านเพิ่ม: Rate Limiting คืออะไร? สอนป้องกัน API ด้วย Rate Limit, Throt · อ่านเพิ่ม: Cloud Computing สำหรับ Developer สอน AWS เบื้องต้น EC2 S3 La · อ่านเพิ่ม: Kubernetes ขั้นสูง สอน Helm Charts, Operators, Service Mesh