it

Fivetran Connector Automation Script — สร้าง

Fivetran Connector Automation Script — สร้าง

Fivetran Automation

Fivetran Connector Automation Script — สร้าง

Fivetran Connector Automation REST API ELT Pipeline Snowflake BigQuery Sync Schedule Monitor Alert Python Terraform CI/CD

เนื้อหาเกี่ยวข้อง — อ่านต่อ: Kustomize Overlay Production Setup Guide

| Operation | API Endpoint | Method | Use Case |
| --- | --- | --- | --- |
| List Connectors | /v1/connectors | GET | ดูรายการ Connector ทั้งหมด |
| Create Connector | /v1/connectors | POST | สร้าง Connector ใหม่อัตโนมัติ |
| Update Config | /v1/connectors/{id} | PATCH | แก้ไข Schedule Schema Config |
| Force Sync | /v1/connectors/{id}/force-sync | POST | บังคับ Sync ทันที |
| Pause/Resume | /v1/connectors/{id} | PATCH | หยุด/เริ่ม Connector |
| Get Status | /v1/connectors/{id} | GET | ตรวจ Sync Status |

Python Automation Script

# === Fivetran Connector Automation ===



# import requests

# import os

# from datetime import datetime, timedelta

#

# FIVETRAN_API_KEY = os.environ['FIVETRAN_API_KEY']

# FIVETRAN_API_SECRET = os.environ['FIVETRAN_API_SECRET']

# BASE_URL = 'https://api.fivetran.com/v1'

#

# session = requests.Session()

# session.auth = (FIVETRAN_API_KEY, FIVETRAN_API_SECRET)

# session.headers.update({'Content-Type': 'application/json'})

#

# # List all connectors

# def list_connectors(group_id):

#     resp = session.get(f'{BASE_URL}/groups/{group_id}/connectors')

#     return resp.json()['data']['items']

#

# # Force sync a connector

# def force_sync(connector_id):

#     resp = session.post(f'{BASE_URL}/connectors/{connector_id}/force-sync')

#     return resp.json()

#

# # Pause/Resume connector

# def set_paused(connector_id, paused: bool):

#     resp = session.patch(f'{BASE_URL}/connectors/{connector_id}',

#         json={'paused': paused})

#     return resp.json()

#

# # Check connector health

# def check_health(connector_id):

#     resp = session.get(f'{BASE_URL}/connectors/{connector_id}')

#     data = resp.json()['data']

#     status = data['status']

#     return {

#         'id': connector_id,

#         'name': data['schema'],

#         'sync_state': status['sync_state'],

#         'setup_state': status['setup_state'],

#         'is_historical_sync': status.get('is_historical_sync', False),

#         'succeeded_at': data.get('succeeded_at'),

#         'failed_at': data.get('failed_at'),

#     }



from dataclasses import dataclass



@dataclass
class ConnectorConfig:
    """Static description of one connector entry used by the demo listing."""

    name: str  # label printed inside square brackets
    service: str  # source service identifier, e.g. "mysql"
    destination: str  # destination name, e.g. "Snowflake"
    sync_frequency: int  # printed as "every N min"
    schema_prefix: str  # printed after "Schema:"
    paused: bool  # truthy is rendered as PAUSED, otherwise ACTIVE



# Sample connector inventory; every entry spells out its fields by keyword
# so the meaning of each value is visible at the call site.
connectors = [
    ConnectorConfig(
        name="production_mysql",
        service="mysql",
        destination="Snowflake",
        sync_frequency=60,
        schema_prefix="prod_mysql",
        paused=False,
    ),
    ConnectorConfig(
        name="salesforce_crm",
        service="salesforce",
        destination="BigQuery",
        sync_frequency=360,
        schema_prefix="sf_crm",
        paused=False,
    ),
    ConnectorConfig(
        name="stripe_payments",
        service="stripe",
        destination="Snowflake",
        sync_frequency=60,
        schema_prefix="stripe",
        paused=False,
    ),
    ConnectorConfig(
        name="google_analytics",
        service="google_analytics_4",
        destination="BigQuery",
        sync_frequency=720,
        schema_prefix="ga4",
        paused=False,
    ),
    ConnectorConfig(
        name="shopify_store",
        service="shopify",
        destination="Snowflake",
        sync_frequency=120,
        schema_prefix="shopify",
        paused=False,
    ),
    ConnectorConfig(
        name="hubspot_marketing",
        service="hubspot",
        destination="BigQuery",
        sync_frequency=360,
        schema_prefix="hubspot",
        paused=True,
    ),
]

# Print a two-line summary per connector: route and state, then schedule.
print("=== Connectors ===")
for c in connectors:
    if c.paused:
        status = "PAUSED"
    else:
        status = "ACTIVE"
    print(f"  [{c.name}] {c.service} → {c.destination} [{status}]")
    print(f"    Sync: every {c.sync_frequency} min | Schema: {c.schema_prefix}")

Monitoring & Alerting

Fivetran Connector Automation Script — สร้าง
# === Connector Monitoring Script ===



# def monitor_all_connectors(group_id):

#     connectors = list_connectors(group_id)

#     alerts = []

#     for conn in connectors:

#         health = check_health(conn['id'])

#         # Check if sync failed

#         if health['sync_state'] == 'failure':

#             alerts.append({

#                 'connector': health['name'],

#                 'issue': 'Sync Failed',

#                 'failed_at': health['failed_at'],

#                 'severity': 'critical'

#             })

#         # Check if sync is stale (no success within a fixed 4-hour window)

#         if health['succeeded_at']:

#             last_sync = datetime.fromisoformat(health['succeeded_at'])

#             if datetime.utcnow() - last_sync > timedelta(hours=4):

#                 alerts.append({

#                     'connector': health['name'],

#                     'issue': 'Stale Data',

#                     'last_sync': str(last_sync),

#                     'severity': 'warning'

#                 })

#     return alerts

#

# def send_slack_alert(alerts):

#     webhook = os.environ['SLACK_WEBHOOK']

#     for alert in alerts:

#         requests.post(webhook, json={

#             'text': f"[{alert['severity'].upper()}] {alert['connector']}: {alert['issue']}"

#         })



@dataclass
class AlertRule:
    """One row of the alert-rule table printed by the demo."""

    rule: str  # rule title printed inside square brackets
    condition: str  # human-readable trigger text (not evaluated by this code)
    severity: str  # severity label, e.g. "CRITICAL", "WARNING", "INFO"
    action: str  # description of the response printed after "Action:"



# Alert-rule table: each tuple is (rule, condition, severity, action) and is
# unpacked positionally into an AlertRule.
rules = [
    AlertRule(*row)
    for row in (
        ("Sync Failure",
         "sync_state == 'failure'",
         "CRITICAL",
         "Slack + PagerDuty + Auto-retry 1x"),
        ("Stale Data",
         "last_sync > 2x sync_frequency",
         "WARNING",
         "Slack + Email + Force Sync"),
        ("Schema Change",
         "schema_change_detected == true",
         "INFO",
         "Slack + Log + Review Required"),
        ("Row Count Drop",
         "row_count < 50% of average",
         "WARNING",
         "Slack + Data Quality Check"),
        ("Historical Sync Stuck",
         "is_historical_sync > 24h",
         "WARNING",
         "Slack + Review Connector Config"),
        ("API Rate Limit",
         "429 response from Source API",
         "INFO",
         "Log + Adjust Sync Frequency"),
    )
]

# Print three lines per rule: title with severity, condition, then action.
print("=== Alert Rules ===")
for r in rules:
    print(
        f"  [{r.rule}] {r.severity}",
        f"    Condition: {r.condition}",
        f"    Action: {r.action}",
        sep="\n",
    )

Terraform IaC

# === Fivetran Terraform Provider ===



# terraform {

#   required_providers {

#     fivetran = {

#       source  = "fivetran/fivetran"

#       version = "~> 1.0"

#     }

#   }

# }

#

# provider "fivetran" {

#   api_key    = var.fivetran_api_key

#   api_secret = var.fivetran_api_secret

# }

#

# resource "fivetran_connector" "mysql_production" {

#   group_id         = fivetran_group.data_team.id

#   service          = "mysql"

#   sync_frequency   = 60

#   paused           = false

#   trust_certificates = true

#

#   destination_schema {

#     prefix = "prod_mysql"

#   }

#

#   config {

#     host     = var.mysql_host

#     port     = 3306

#     user     = var.mysql_user

#     password = var.mysql_password

#     database = "production"

#   }

# }



@dataclass
class TerraformResource:
    """One row of the Terraform resource summary printed by the demo."""

    resource: str  # Terraform resource type name, e.g. "fivetran_connector"
    purpose: str  # short description printed next to the resource name
    key_config: str  # comma-separated list of notable arguments, printed after "Config:"
    lifecycle: str  # ordering/dependency note printed after "Lifecycle:"



# Summary of the Terraform resources discussed in the article; fields are
# passed by keyword so each value's role is explicit.
tf_resources = [
    TerraformResource(
        resource="fivetran_group",
        purpose="Group สำหรับจัดกลุ่ม Connector + Destination",
        key_config="name, destination_id",
        lifecycle="สร้างก่อน Connector",
    ),
    TerraformResource(
        resource="fivetran_connector",
        purpose="Connector ดึงข้อมูลจาก Source",
        key_config="service, sync_frequency, config (host/user/pass)",
        lifecycle="Depends on Group + Destination",
    ),
    TerraformResource(
        resource="fivetran_connector_schema_config",
        purpose="กำหนด Table/Column ที่ต้องการ Sync",
        key_config="schema_change_handling, schemas",
        lifecycle="Apply หลัง Connector สร้างเสร็จ",
    ),
    TerraformResource(
        resource="fivetran_destination",
        purpose="Destination (Snowflake/BigQuery)",
        key_config="service, config (host/database/credentials)",
        lifecycle="สร้างก่อน Group",
    ),
    TerraformResource(
        resource="fivetran_dbt_transformation",
        purpose="dbt Transformation หลัง Sync",
        key_config="dbt_project_id, run_tests",
        lifecycle="Optional สำหรับ Transform",
    ),
]

# Print three lines per resource: name with purpose, key config, lifecycle.
print("=== Terraform Resources ===")
for t in tf_resources:
    print(
        f"  [{t.resource}] {t.purpose}",
        f"    Config: {t.key_config}",
        f"    Lifecycle: {t.lifecycle}",
        sep="\n",
    )

เคล็ดลับ

  • IaC: ใช้ Terraform จัดการ Connector เป็น Code Version Control ได้
  • Monitor: ตรวจ Sync Status ทุก 15 นาที แจ้ง Slack ทันทีถ้า Fail
  • Schema: ตั้ง schema_change_handling เป็น ALLOW_ALL สำหรับ Source ที่เปลี่ยนบ่อย
  • Rate Limit: API 100 req/min ใช้ Batch Operations ลด Request
  • dbt: เชื่อม dbt Transform หลัง Sync อัตโนมัติ

Fivetran คืออะไร

Fivetran คือ Cloud ELT Platform ที่มี Connector มากกว่า 300 ตัว (เช่น MySQL, PostgreSQL, Salesforce) สำหรับดึงข้อมูลเข้า Snowflake หรือ BigQuery รองรับ Auto Schema Migration, dbt, REST API และการตั้ง Schedule Sync

เนื้อหาเกี่ยวข้อง — บทความที่เกี่ยวข้อง: CSS Container Queries

อ่านเพิ่ม: Rate Limiting คืออะไร? สอนป้องกัน API ด้วย Rate Limit, Throt · อ่านเพิ่ม: Cloud Computing สำหรับ Developer สอน AWS เบื้องต้น EC2 S3 La · อ่านเพิ่ม: Kubernetes ขั้นสูง สอน Helm Charts, Operators, Service Mesh

แนะนำเพิ่มเติม — ติดตาม XM Signal

เนื้อหาเกี่ยวข้อง — ทำความเข้าใจ eBPF Networking Pub Sub Architecture

XM Legend · เทรดเดอร์ & ผู้สอน Forex 13 ปี

ผู้ก่อตั้ง SiamCafe ตั้งแต่ปี 1997 · เทรดเดอร์สาย Forex มากกว่า 13 ปี ได้รับการยกย่องเป็น XM Legend · แบ่งปันความรู้ Forex, ไอที, AI และการเทรด จากประสบการณ์จริงในตลาดจริง