Technology

Fivetran Connector Automation Script

fivetran connector automation script
Fivetran Connector Automation Script | SiamCafe Blog
2026-01-12· อ. บอม — SiamCafe.net· 8,556 คำ

Fivetran Automation

Fivetran Connector Automation REST API ELT Pipeline Snowflake BigQuery Sync Schedule Monitor Alert Python Terraform CI/CD

Operation | API Endpoint | Method | Use Case
List Connectors | /v1/connectors | GET | ดูรายการ Connector ทั้งหมด
Create Connector | /v1/connectors | POST | สร้าง Connector ใหม่อัตโนมัติ
Update Config | /v1/connectors/{id} | PATCH | แก้ไข Schedule Schema Config
Force Sync | /v1/connectors/{id}/force-sync | POST | บังคับ Sync ทันที
Pause/Resume | /v1/connectors/{id} | PATCH | หยุด/เริ่ม Connector
Get Status | /v1/connectors/{id} | GET | ตรวจ Sync Status

Python Automation Script

# === Fivetran Connector Automation ===

# import requests
# import os
# from datetime import datetime, timedelta
#
# FIVETRAN_API_KEY = os.environ['FIVETRAN_API_KEY']
# FIVETRAN_API_SECRET = os.environ['FIVETRAN_API_SECRET']
# BASE_URL = 'https://api.fivetran.com/v1'
#
# session = requests.Session()
# session.auth = (FIVETRAN_API_KEY, FIVETRAN_API_SECRET)
# session.headers.update({'Content-Type': 'application/json'})
#
# # List all connectors
# def list_connectors(group_id):
#     resp = session.get(f'{BASE_URL}/groups/{group_id}/connectors')
#     return resp.json()['data']['items']
#
# # Force sync a connector
# def force_sync(connector_id):
#     resp = session.post(f'{BASE_URL}/connectors/{connector_id}/force-sync')
#     return resp.json()
#
# # Pause/Resume connector
# def set_paused(connector_id, paused: bool):
#     resp = session.patch(f'{BASE_URL}/connectors/{connector_id}',
#         json={'paused': paused})
#     return resp.json()
#
# # Check connector health
# def check_health(connector_id):
#     resp = session.get(f'{BASE_URL}/connectors/{connector_id}')
#     data = resp.json()['data']
#     status = data['status']
#     return {
#         'id': connector_id,
#         'name': data['schema'],
#         'sync_state': status['sync_state'],
#         'setup_state': status['setup_state'],
#         'is_historical_sync': status.get('is_historical_sync', False),
#         'succeeded_at': data.get('succeeded_at'),
#         'failed_at': data.get('failed_at'),
#     }

from dataclasses import dataclass

@dataclass
class ConnectorConfig:
    """Plain record describing one connector entry in the demo inventory.

    The script only reads these fields to print a two-line summary per
    connector; nothing here talks to the Fivetran API.
    """

    name: str  # label printed in brackets at the start of the summary line
    service: str  # source-side identifier, printed on the left of the arrow
    destination: str  # target name, printed on the right of the arrow
    sync_frequency: int  # printed as "every N min", i.e. treated as minutes
    schema_prefix: str  # printed after "Schema:" in the summary
    paused: bool  # True renders as "PAUSED", False as "ACTIVE"
# Demo inventory: one ConnectorConfig per source system. Keyword arguments
# are used so each value is labelled at the call site.
connectors = [
    ConnectorConfig(
        name="production_mysql",
        service="mysql",
        destination="Snowflake",
        sync_frequency=60,
        schema_prefix="prod_mysql",
        paused=False,
    ),
    ConnectorConfig(
        name="salesforce_crm",
        service="salesforce",
        destination="BigQuery",
        sync_frequency=360,
        schema_prefix="sf_crm",
        paused=False,
    ),
    ConnectorConfig(
        name="stripe_payments",
        service="stripe",
        destination="Snowflake",
        sync_frequency=60,
        schema_prefix="stripe",
        paused=False,
    ),
    ConnectorConfig(
        name="google_analytics",
        service="google_analytics_4",
        destination="BigQuery",
        sync_frequency=720,
        schema_prefix="ga4",
        paused=False,
    ),
    ConnectorConfig(
        name="shopify_store",
        service="shopify",
        destination="Snowflake",
        sync_frequency=120,
        schema_prefix="shopify",
        paused=False,
    ),
    ConnectorConfig(
        name="hubspot_marketing",
        service="hubspot",
        destination="BigQuery",
        sync_frequency=360,
        schema_prefix="hubspot",
        paused=True,
    ),
]

# Print a header, then two lines per connector: route/state and schedule/schema.
print("=== Connectors ===")
for c in connectors:
    if c.paused:
        status = "PAUSED"
    else:
        status = "ACTIVE"
    print(
        f"  [{c.name}] {c.service} → {c.destination} [{status}]",
        f"    Sync: every {c.sync_frequency} min | Schema: {c.schema_prefix}",
        sep="\n",
    )

Monitoring & Alerting

# === Connector Monitoring Script ===

# def monitor_all_connectors(group_id):
#     connectors = list_connectors(group_id)
#     alerts = []
#     for conn in connectors:
#         health = check_health(conn['id'])
#         # Check if sync failed
#         if health['sync_state'] == 'failure':
#             alerts.append({
#                 'connector': health['name'],
#                 'issue': 'Sync Failed',
#                 'failed_at': health['failed_at'],
#                 'severity': 'critical'
#             })
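#         # Check if sync is stale (no success within the last 4 hours)
#         if health['succeeded_at']:
#             # Fivetran timestamps are UTC with a trailing 'Z'; normalise it so
#             # fromisoformat() yields a timezone-aware value on every Python 3.7+
#             last_sync = datetime.fromisoformat(health['succeeded_at'].replace('Z', '+00:00'))
#             if datetime.now(last_sync.tzinfo) - last_sync > timedelta(hours=4):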
#                 alerts.append({
#                     'connector': health['name'],
#                     'issue': 'Stale Data',
#                     'last_sync': str(last_sync),
#                     'severity': 'warning'
#                 })
#     return alerts
#
# def send_slack_alert(alerts):
#     webhook = os.environ['SLACK_WEBHOOK']
#     for alert in alerts:
#         requests.post(webhook, json={
#             'text': f"[{alert['severity'].upper()}] {alert['connector']}: {alert['issue']}"
#         })

@dataclass
class AlertRule:
    """Plain record describing one alerting rule in the demo rule table.

    All fields are free-form text; the script prints them and does not
    evaluate the condition or perform the action.
    """

    rule: str  # rule title, printed in brackets
    condition: str  # human-readable trigger description, printed after "Condition:"
    severity: str  # severity label printed next to the rule title
    action: str  # human-readable response, printed after "Action:"
# Demo rule table: each entry pairs a textual condition with a severity label
# and the response to take. Nothing here is evaluated; it is printed as-is.
rules = [
    AlertRule(
        rule="Sync Failure",
        condition="sync_state == 'failure'",
        severity="CRITICAL",
        action="Slack + PagerDuty + Auto-retry 1x",
    ),
    AlertRule(
        rule="Stale Data",
        condition="last_sync > 2x sync_frequency",
        severity="WARNING",
        action="Slack + Email + Force Sync",
    ),
    AlertRule(
        rule="Schema Change",
        condition="schema_change_detected == true",
        severity="INFO",
        action="Slack + Log + Review Required",
    ),
    AlertRule(
        rule="Row Count Drop",
        condition="row_count < 50% of average",
        severity="WARNING",
        action="Slack + Data Quality Check",
    ),
    AlertRule(
        rule="Historical Sync Stuck",
        condition="is_historical_sync > 24h",
        severity="WARNING",
        action="Slack + Review Connector Config",
    ),
    AlertRule(
        rule="API Rate Limit",
        condition="429 response from Source API",
        severity="INFO",
        action="Log + Adjust Sync Frequency",
    ),
]

# Print a header, then three lines per rule: title/severity, condition, action.
print("=== Alert Rules ===")
for r in rules:
    print(
        f"  [{r.rule}] {r.severity}",
        f"    Condition: {r.condition}",
        f"    Action: {r.action}",
        sep="\n",
    )

Terraform IaC

# === Fivetran Terraform Provider ===

# terraform {
#   required_providers {
#     fivetran = {
#       source  = "fivetran/fivetran"
#       version = "~> 1.0"
#     }
#   }
# }
#
# provider "fivetran" {
#   api_key    = var.fivetran_api_key
#   api_secret = var.fivetran_api_secret
# }
#
# resource "fivetran_connector" "mysql_production" {
#   group_id         = fivetran_group.data_team.id
#   service          = "mysql"
#   sync_frequency   = 60
#   paused           = false
#   trust_certificates = true
#
#   destination_schema {
#     prefix = "prod_mysql"
#   }
#
#   config {
#     host     = var.mysql_host
#     port     = 3306
#     user     = var.mysql_user
#     password = var.mysql_password
#     database = "production"
#   }
# }

@dataclass
class TerraformResource:
    """Plain record describing one Terraform resource type in the demo table.

    All fields are descriptive text that the script prints; no Terraform
    is generated or executed from them.
    """

    resource: str  # resource type name, printed in brackets
    purpose: str  # short description printed next to the resource name
    key_config: str  # comma-separated list of notable arguments, printed after "Config:"
    lifecycle: str  # ordering/dependency note, printed after "Lifecycle:"
# Reference table of Fivetran Terraform resource types, printed below.
#
# Two entries were corrected to match the provider's dependency chain
# (Group -> Destination -> Connector):
#   * fivetran_group takes only `name`; it has no `destination_id` argument.
#   * fivetran_destination requires `group_id`, so it is created AFTER the
#     group, not before it.
tf_resources = [
    TerraformResource("fivetran_group",
        "Group สำหรับจัดกลุ่ม Connector + Destination",
        "name",
        "สร้างก่อน Destination และ Connector"),
    TerraformResource("fivetran_connector",
        "Connector ดึงข้อมูลจาก Source",
        "service, sync_frequency, config (host/user/pass)",
        "Depends on Group + Destination"),
    TerraformResource("fivetran_connector_schema_config",
        "กำหนด Table/Column ที่ต้องการ Sync",
        "schema_change_handling, schemas",
        "Apply หลัง Connector สร้างเสร็จ"),
    TerraformResource("fivetran_destination",
        "Destination (Snowflake/BigQuery)",
        "group_id, service, config (host/database/credentials)",
        "สร้างหลัง Group (อ้างอิง group_id)"),
    TerraformResource("fivetran_dbt_transformation",
        "dbt Transformation หลัง Sync",
        "dbt_project_id, run_tests",
        "Optional สำหรับ Transform"),
]

# Print a header, then three lines per resource: name/purpose, config, lifecycle.
print("=== Terraform Resources ===")
for t in tf_resources:
    print(f"  [{t.resource}] {t.purpose}")
    print(f"    Config: {t.key_config}")
    print(f"    Lifecycle: {t.lifecycle}")

เคล็ดลับ

Fivetran คืออะไร

Cloud ELT Platform 300+ Connectors MySQL PostgreSQL Salesforce Snowflake BigQuery Auto Schema Migration dbt REST API Schedule Sync

Automation Script ทำอะไรได้

Create Update Pause Resume Force Sync Status History Monitor Alert Batch Terraform IaC Python REST API GitHub Actions CI/CD

ใช้ REST API อย่างไร

API Key Secret Basic Auth GET POST PATCH DELETE /v1/connectors force-sync schemas state Python requests JSON Rate Limit 100/min

Monitor อย่างไร

Sync Status succeeded_at failed_at Cron Slack PagerDuty Webhook Row Count Schema Change SLA Data Freshness Prometheus Grafana Dashboard

สรุป

Fivetran Connector Automation REST API Python Terraform IaC Monitor Alert Slack Sync Schedule Schema Force Sync dbt Production Pipeline

📖 บทความที่เกี่ยวข้อง

- Fivetran Connector Scaling Strategy วิธี Scale — อ่านบทความ →
- Fivetran Connector GitOps Workflow — อ่านบทความ →
- Fivetran Connector Freelance IT Career — อ่านบทความ →
- Fivetran Connector Service Mesh Setup — อ่านบทความ →
- Fivetran Connector Container Orchestration — อ่านบทความ →

📚 ดูบทความทั้งหมด →