SiamCafe · Blog
Fivetran Connector Automation Script — สร้าง
บทความ

Fivetran Connector Automation Script — สร้าง

เผยแพร่ 28 พฤษภาคม 2569

Fivetran Automation

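บทความนี้สอนทำ Fivetran Connector Automation ด้วย REST API สำหรับ ELT Pipeline ที่โหลดข้อมูลเข้า Snowflake และ BigQuery ครอบคลุมการตั้ง Sync Schedule, การ Monitor และ Alert ด้วย Python รวมถึงการจัดการแบบ IaC ด้วย Terraform และ CI/CD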

| Operation | API Endpoint | Method | Use Case |
|---|---|---|---|
| List Connectors | /v1/groups/{group_id}/connectors | GET | ดูรายการ Connector ทั้งหมดใน Group |
| Create Connector | /v1/connectors | POST | สร้าง Connector ใหม่อัตโนมัติ |
| Update Config | /v1/connectors/{id} | PATCH | แก้ไข Schedule, Schema, Config |
| Force Sync | /v1/connectors/{id}/sync | POST | บังคับ Sync ทันที (ส่ง body `{"force": true}`) |
| Pause/Resume | /v1/connectors/{id} | PATCH | หยุด/เริ่ม Connector (ตั้งค่า `paused`) |
| Get Status | /v1/connectors/{id} | GET | ตรวจ Sync Status |

Python Automation Script

# === Fivetran Connector Automation ===

# import requests
# import os
# from datetime import datetime, timedelta
#
# FIVETRAN_API_KEY = os.environ['FIVETRAN_API_KEY']
# FIVETRAN_API_SECRET = os.environ['FIVETRAN_API_SECRET']
# BASE_URL = 'https://api.fivetran.com/v1'
#
# session = requests.Session()
# session.auth = (FIVETRAN_API_KEY, FIVETRAN_API_SECRET)
# session.headers.update({'Content-Type': 'application/json'})
#
# # List all connectors
# def list_connectors(group_id):
#     resp = session.get(f'{BASE_URL}/groups/{group_id}/connectors')
#     return resp.json()['data']['items']
#
# # Force sync a connector
# def force_sync(connector_id):
#     resp = session.post(f'{BASE_URL}/connectors/{connector_id}/sync',
#         json={'force': True})
#     return resp.json()
#
# # Pause/Resume connector
# def set_paused(connector_id, paused: bool):
#     resp = session.patch(f'{BASE_URL}/connectors/{connector_id}',
#         json={'paused': paused})
#     return resp.json()
#
# # Check connector health
# def check_health(connector_id):
#     resp = session.get(f'{BASE_URL}/connectors/{connector_id}')
#     data = resp.json()['data']
#     status = data['status']
#     return {
#         'id': connector_id,
#         'name': data['schema'],
#         'sync_state': status['sync_state'],
#         'setup_state': status['setup_state'],
#         'is_historical_sync': status.get('is_historical_sync', False),
#         'succeeded_at': data.get('succeeded_at'),
#         'failed_at': data.get('failed_at'),
#     }

from dataclasses import dataclass

@dataclass()
class ConnectorConfig:
    """Desired settings for one Fivetran connector in this demo inventory.

    Positional field order: name, service, destination, sync_frequency,
    schema_prefix, paused.
    """

    name: str  # connector label shown in the report
    service: str  # Fivetran service identifier, e.g. "mysql"
    destination: str  # target warehouse name, e.g. "Snowflake"
    sync_frequency: int  # minutes between syncs (the report prints "min")
    schema_prefix: str  # destination schema prefix
    paused: bool  # truthy -> reported as PAUSED, otherwise ACTIVE


connectors = [
    ConnectorConfig(
        name="production_mysql",
        service="mysql",
        destination="Snowflake",
        sync_frequency=60,
        schema_prefix="prod_mysql",
        paused=False,
    ),
    ConnectorConfig(
        name="salesforce_crm",
        service="salesforce",
        destination="BigQuery",
        sync_frequency=360,
        schema_prefix="sf_crm",
        paused=False,
    ),
    ConnectorConfig(
        name="stripe_payments",
        service="stripe",
        destination="Snowflake",
        sync_frequency=60,
        schema_prefix="stripe",
        paused=False,
    ),
    ConnectorConfig(
        name="google_analytics",
        service="google_analytics_4",
        destination="BigQuery",
        sync_frequency=720,
        schema_prefix="ga4",
        paused=False,
    ),
    ConnectorConfig(
        name="shopify_store",
        service="shopify",
        destination="Snowflake",
        sync_frequency=120,
        schema_prefix="shopify",
        paused=False,
    ),
    ConnectorConfig(
        name="hubspot_marketing",
        service="hubspot",
        destination="BigQuery",
        sync_frequency=360,
        schema_prefix="hubspot",
        paused=True,
    ),
]

# Report: two lines per connector (route + state, then schedule + schema).
print("=== Connectors ===")
for c in connectors:
    if c.paused:
        status = "PAUSED"
    else:
        status = "ACTIVE"
    header = f"  [{c.name}] {c.service} → {c.destination} [{status}]"
    detail = f"    Sync: every {c.sync_frequency} min | Schema: {c.schema_prefix}"
    print(header, detail, sep="\n")

Monitoring & Alerting

# === Connector Monitoring Script ===

# def monitor_all_connectors(group_id):
#     connectors = list_connectors(group_id)
#     alerts = []
#     for conn in connectors:
#         health = check_health(conn['id'])
#         # Check if sync failed
#         if health['sync_state'] == 'failure':
#             alerts.append({
#                 'connector': health['name'],
#                 'issue': 'Sync Failed',
#                 'failed_at': health['failed_at'],
#                 'severity': 'critical'
#             })
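#         # Check if sync is stale (no success in the last 4 hours;
#         # ideally use 2x the connector's sync_frequency instead)
#         if health['succeeded_at']:
#             # ISO timestamps ending in 'Z' must be normalised for fromisoformat
#             # (< 3.11), and "now" must be tz-aware to subtract an aware value
#             last_sync = datetime.fromisoformat(health['succeeded_at'].replace('Z', '+00:00'))
#             if datetime.now(last_sync.tzinfo) - last_sync > timedelta(hours=4):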
#                 alerts.append({
#                     'connector': health['name'],
#                     'issue': 'Stale Data',
#                     'last_sync': str(last_sync),
#                     'severity': 'warning'
#                 })
#     return alerts
#
# def send_slack_alert(alerts):
#     webhook = os.environ['SLACK_WEBHOOK']
#     for alert in alerts:
#         requests.post(webhook, json={
#             'text': f"[{alert['severity'].upper()}] {alert['connector']}: {alert['issue']}"
#         })

@dataclass
class AlertRule:
    """One row of the connector alerting policy printed below.

    ``condition`` is a human-readable trigger (it is only printed, never
    evaluated). Where possible it is written in terms of the connector status
    fields the monitoring snippet reads: ``sync_state``, ``succeeded_at`` and
    ``is_historical_sync``.
    """

    rule: str  # short rule name, printed in brackets
    condition: str  # human-readable trigger condition (display only)
    severity: str  # CRITICAL / WARNING / INFO label
    action: str  # notification channels and follow-up steps


rules = [
    AlertRule("Sync Failure",
        "sync_state == 'failure'",
        "CRITICAL",
        "Slack + PagerDuty + Auto-retry 1x"),
    # Staleness is the time elapsed since the last successful sync, so compare
    # (now - succeeded_at) with the frequency, not the timestamp itself.
    AlertRule("Stale Data",
        "now - succeeded_at > 2x sync_frequency",
        "WARNING",
        "Slack + Email + Force Sync"),
    AlertRule("Schema Change",
        "schema_change_detected == true",
        "INFO",
        "Slack + Log + Review Required"),
    AlertRule("Row Count Drop",
        "row_count < 50% of average",
        "WARNING",
        "Slack + Data Quality Check"),
    # is_historical_sync is a boolean flag; the 24h limit applies to how long
    # it has stayed true, not to the flag's value.
    AlertRule("Historical Sync Stuck",
        "is_historical_sync == true for > 24h",
        "WARNING",
        "Slack + Review Connector Config"),
    AlertRule("API Rate Limit",
        "429 response from Source API",
        "INFO",
        "Log + Adjust Sync Frequency"),
]

# Report: one header line per rule, followed by its condition and action.
print("=== Alert Rules ===")
for r in rules:
    print(f"  [{r.rule}] {r.severity}")
    print(f"    Condition: {r.condition}")
    print(f"    Action: {r.action}")

Terraform IaC

# === Fivetran Terraform Provider ===

# terraform {
#   required_providers {
#     fivetran = {
#       source  = "fivetran/fivetran"
#       version = "~> 1.0"
#     }
#   }
# }
#
# provider "fivetran" {
#   api_key    = var.fivetran_api_key
#   api_secret = var.fivetran_api_secret
# }
#
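# resource "fivetran_connector" "mysql_production" {
#   group_id           = fivetran_group.data_team.id
#   service            = "mysql"
#   trust_certificates = true
#
#   destination_schema {
#     prefix = "prod_mysql"
#   }
#
#   config {
#     host     = var.mysql_host
#     port     = 3306
#     user     = var.mysql_user
#     password = var.mysql_password
#     database = "production"
#   }
# }
#
# # Provider v1.x: schedule settings live in a separate resource
# resource "fivetran_connector_schedule" "mysql_production" {
#   connector_id   = fivetran_connector.mysql_production.id
#   sync_frequency = "60"
#   paused         = false
#   schedule_type  = "auto"
# }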

@dataclass
class TerraformResource:
    """One Fivetran Terraform provider resource type and how it is used.

    All fields are free-form display text; they are only printed below.
    """

    resource: str  # Terraform resource type name
    purpose: str  # what the resource manages
    key_config: str  # most important arguments to set
    lifecycle: str  # ordering / dependency notes relative to other resources


tf_resources = [
    # fivetran_group takes only a name; destination and connectors attach to
    # it through their own group_id argument.
    TerraformResource("fivetran_group",
        "Group สำหรับจัดกลุ่ม Connector + Destination",
        "name",
        "สร้างก่อน Destination + Connector"),
    # In provider v1.x the schedule (sync_frequency, paused) moved out of the
    # connector into fivetran_connector_schedule.
    TerraformResource("fivetran_connector",
        "Connector ดึงข้อมูลจาก Source",
        "group_id, service, destination_schema, config (host/user/pass)",
        "Depends on Group + Destination"),
    TerraformResource("fivetran_connector_schedule",
        "กำหนด Sync Frequency / Pause ของ Connector (provider v1.x)",
        "connector_id, sync_frequency, paused, schedule_type",
        "Apply หลัง Connector สร้างเสร็จ"),
    TerraformResource("fivetran_connector_schema_config",
        "กำหนด Table/Column ที่ต้องการ Sync",
        "schema_change_handling, schemas",
        "Apply หลัง Connector สร้างเสร็จ"),
    # A destination is bound to an existing group via group_id, so the group
    # must exist first.
    TerraformResource("fivetran_destination",
        "Destination (Snowflake/BigQuery)",
        "group_id, service, region, time_zone_offset, config (host/database/credentials)",
        "สร้างหลัง Group (ผูกกับ group_id)"),
    TerraformResource("fivetran_dbt_transformation",
        "dbt Transformation หลัง Sync",
        "dbt_project_id, run_tests",
        "Optional สำหรับ Transform"),
]

# Report: one header line per resource, followed by its config and lifecycle.
print("=== Terraform Resources ===")
for t in tf_resources:
    print(f"  [{t.resource}] {t.purpose}")
    print(f"    Config: {t.key_config}")
    print(f"    Lifecycle: {t.lifecycle}")

เคล็ดลับ

  • IaC: ใช้ Terraform จัดการ Connector เป็น Code Version Control ได้
  • Monitor: ตรวจ Sync Status ทุก 15 นาที แจ้ง Slack ทันทีถ้า Fail
  • Schema: ตั้ง schema_change_handling เป็น ALLOW_ALL สำหรับ Source ที่เปลี่ยนบ่อย
  • Rate Limit: API 100 req/min ใช้ Batch Operations ลด Request
  • dbt: เชื่อม dbt Transform หลัง Sync อัตโนมัติ

Fivetran คืออะไร

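Fivetran คือ Cloud ELT Platform ที่มี Connector สำเร็จรูปกว่า 300 ตัว เช่น MySQL, PostgreSQL และ Salesforce สำหรับดึงข้อมูลเข้า Warehouse อย่าง Snowflake หรือ BigQuery รองรับ Auto Schema Migration, การตั้งเวลา Sync (Schedule Sync), การเชื่อมต่อ dbt และควบคุมทุกอย่างผ่าน REST API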

อ่านเพิ่ม: Rate Limiting คืออะไร? สอนป้องกัน API ด้วย Rate Limit, Throt · อ่านเพิ่ม: Cloud Computing สำหรับ Developer สอน AWS เบื้องต้น EC2 S3 La · อ่านเพิ่ม: Kubernetes ขั้นสูง สอน Helm Charts, Operators, Service Mesh