
CircleCI Orbs Data Pipeline ETL — Building an ETL Pipeline with CircleCI Orbs

2025-12-06 · อ. บอม — SiamCafe.net · 1,692 words

What CircleCI Orbs Are and How to Use Them to Build an ETL Pipeline

CircleCI Orbs are reusable packages of CircleCI configuration that bundle jobs, commands, and executors together, much like libraries for CI/CD pipelines. They can be shared and reused across projects, cutting down on duplicated config.

An ETL (Extract, Transform, Load) pipeline is the process of pulling data from various sources, transforming it into the desired shape, and loading it into a destination (a data warehouse or data lake). Running ETL on CircleCI brings several advantages: scheduled pipelines for batch ETL, version control for ETL code, automated testing for data quality, and monitoring/alerting when a pipeline fails.
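The three stages can be sketched in a few lines of Python. This toy example (table and field names are illustrative) extracts records from JSON, deduplicates and trims them, and loads them into an in-memory SQLite table standing in for the warehouse:

```python
import json
import sqlite3

# --- Extract: parse raw records from a source (here, an inline JSON string) ---
raw = '[{"id": 1, "name": " Product A "}, {"id": 1, "name": " Product A "}, {"id": 2, "name": "Product B"}]'
records = json.loads(raw)

# --- Transform: dedupe on the primary key and trim stray whitespace ---
seen, cleaned = set(), []
for rec in records:
    if rec["id"] in seen:
        continue
    seen.add(rec["id"])
    cleaned.append({"id": rec["id"], "name": rec["name"].strip()})

# --- Load: write into the destination table (in-memory SQLite as a stand-in) ---
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany("INSERT INTO products VALUES (:id, :name)", cleaned)
rows = conn.execute("SELECT id, name FROM products ORDER BY id").fetchall()
print(rows)  # [(1, 'Product A'), (2, 'Product B')]
```

In the real pipeline each stage becomes a separate CircleCI job, with the intermediate files passed through a workspace.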

Orbs that are useful for ETL include circleci/python for Python-based ETL, circleci/aws-cli for AWS services (S3, Redshift), circleci/gcp-cli for Google Cloud (BigQuery), circleci/slack for notifications, and custom orbs you build yourself for ETL-specific tasks.

Compared with Airflow for simple ETL, CircleCI's advantages are that there is no infrastructure to manage, scheduling is built in, Docker support is solid, and parallel execution plus caching help speed up the pipeline.

Installing and Configuring CircleCI for a Data Pipeline

Getting started with CircleCI for ETL

# === CircleCI Configuration ===
# .circleci/config.yml

version: 2.1

# Orbs in use
orbs:
  python: circleci/python@2.1
  aws-cli: circleci/aws-cli@4.1
  slack: circleci/slack@4.12

# Parameters for scheduled runs
parameters:
  run-schedule:
    type: boolean
    default: false
  etl-type:
    type: string
    default: "full"

# Executors
executors:
  etl-executor:
    docker:
      - image: cimg/python:3.11
    resource_class: large
    environment:
      PYTHONUNBUFFERED: "1"
      ETL_ENV: "production"

# Commands (reusable steps)
commands:
  setup-etl:
    description: "Setup ETL environment"
    steps:
      - checkout
      - python/install-packages:
          pkg-manager: pip
          pip-dependency-file: requirements.txt
      - run:
          name: Install additional tools
          command: |
            pip install awscli boto3 pandas pyarrow sqlalchemy
            pip install great-expectations dbt-core

  validate-data:
    description: "Validate data quality"
    parameters:
      source:
        type: string
    steps:
      - run:
          name: "Validate << parameters.source >> data"
          command: |
            python scripts/validate.py --source << parameters.source >>

  notify-status:
    description: "Send notification"
    parameters:
      status:
        type: string
    steps:
      - slack/notify:
          event: << parameters.status >>
          template: basic_success_1

# Jobs
jobs:
  extract:
    executor: etl-executor
    steps:
      - setup-etl
      - aws-cli/setup
      - run:
          name: Extract data from sources
          command: |
            python etl/extract.py \
              --date $(date +%Y-%m-%d) \
              --type << pipeline.parameters.etl-type >>
          no_output_timeout: 30m
      - persist_to_workspace:
          root: data
          paths: ["raw/*"]

  transform:
    executor: etl-executor
    steps:
      - setup-etl
      - attach_workspace:
          at: data
      - run:
          name: Transform data
          command: |
            python etl/transform.py --input data/raw/ --output data/transformed/
      - validate-data:
          source: "transformed"
      - persist_to_workspace:
          root: data
          paths: ["transformed/*"]

  load:
    executor: etl-executor
    steps:
      - setup-etl
      - aws-cli/setup
      - attach_workspace:
          at: data
      - run:
          name: Load data to warehouse
          command: |
            python etl/load.py --input data/transformed/ --target warehouse
      - notify-status:
          status: pass

# Workflows
workflows:
  daily-etl:
    when: << pipeline.parameters.run-schedule >>
    jobs:
      - extract
      - transform:
          requires: [extract]
      - load:
          requires: [transform]

  manual-etl:
    unless: << pipeline.parameters.run-schedule >>
    jobs:
      - extract
      - transform:
          requires: [extract]
      - load:
          requires: [transform]
          filters:
            branches:
              only: main
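The `run-schedule` and `etl-type` parameters above can also be set when triggering the pipeline by hand through the CircleCI API v2 (`POST /project/{project-slug}/pipeline`). A minimal sketch, assuming a hypothetical `gh/myorg/etl-pipeline` project slug and a `CIRCLECI_TOKEN` environment variable:

```python
import json
import os
import urllib.request

def build_trigger_request(project_slug, branch="main", etl_type="incremental"):
    """Build the API v2 request that triggers a pipeline run with parameters."""
    body = {
        "branch": branch,
        "parameters": {"run-schedule": False, "etl-type": etl_type},
    }
    return urllib.request.Request(
        f"https://circleci.com/api/v2/project/{project_slug}/pipeline",
        data=json.dumps(body).encode(),
        headers={
            "Circle-Token": os.environ.get("CIRCLECI_TOKEN", ""),
            "Content-Type": "application/json",
        },
        method="POST",
    )

req = build_trigger_request("gh/myorg/etl-pipeline", etl_type="full")
print(req.full_url)          # the trigger endpoint
print(json.loads(req.data))  # {'branch': 'main', 'parameters': {...}}
# urllib.request.urlopen(req)  # actually fire it (needs a valid token)
```

Because `run-schedule` is sent as `false`, a run triggered this way lands in the `manual-etl` workflow.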

Building a Custom Orb for ETL

Create a reusable ETL orb

# === Custom ETL Orb ===
# orb.yml — etl-toolkit orb

version: 2.1

description: "ETL Toolkit Orb for data pipeline operations"

executors:
  python-etl:
    docker:
      - image: cimg/python:3.11
    resource_class: << parameters.resource_class >>
    parameters:
      resource_class:
        type: string
        default: medium

commands:
  extract-csv:
    description: "Extract data from CSV files"
    parameters:
      source_path:
        type: string
      output_path:
        type: string
        default: "/tmp/extracted"
    steps:
      - run:
          name: "Extract CSV data"
          # << include(...) >> splices the file's *contents* in at pack time
          # (it is not a file path), so run it as the command body with a
          # Python shell and pass the parameters through environment variables.
          environment:
            SOURCE_PATH: << parameters.source_path >>
            OUTPUT_PATH: << parameters.output_path >>
          shell: /usr/bin/env python3
          command: << include(scripts/extract_csv.py) >>

  extract-api:
    description: "Extract data from REST API"
    parameters:
      api_url:
        type: string
      auth_token_env:
        type: env_var_name
        default: API_TOKEN
      output_path:
        type: string
    steps:
      - run:
          name: "Extract API data"
          command: |
            python3 -c "
            import requests, json, os
            url = '<< parameters.api_url >>'
            token = os.environ.get('<< parameters.auth_token_env >>')
            headers = {'Authorization': f'Bearer {token}'} if token else {}
            
            resp = requests.get(url, headers=headers, timeout=60)
            resp.raise_for_status()
            
            output = '<< parameters.output_path >>'
            os.makedirs(os.path.dirname(output), exist_ok=True)
            with open(output, 'w') as f:
                json.dump(resp.json(), f)
            print(f'Extracted {len(resp.json())} records to {output}')
            "

  extract-s3:
    description: "Extract data from S3"
    parameters:
      bucket:
        type: string
      prefix:
        type: string
      output_path:
        type: string
    steps:
      - run:
          name: "Extract S3 data"
          command: |
            aws s3 sync \
              s3://<< parameters.bucket >>/<< parameters.prefix >> \
              << parameters.output_path >> \
              --exclude "*.tmp"
            echo "Downloaded from s3://<< parameters.bucket >>/<< parameters.prefix >>"

  transform-pandas:
    description: "Transform data with pandas"
    parameters:
      script:
        type: string
      input_path:
        type: string
      output_path:
        type: string
    steps:
      - run:
          name: "Transform with pandas"
          command: |
            python3 << parameters.script >> \
              --input << parameters.input_path >> \
              --output << parameters.output_path >>

  load-warehouse:
    description: "Load data to data warehouse"
    parameters:
      target:
        type: enum
        enum: ["redshift", "bigquery", "snowflake", "postgres"]
      connection_string_env:
        type: env_var_name
      table_name:
        type: string
      input_path:
        type: string
    steps:
      - run:
          name: "Load to << parameters.target >>"
          command: |
            python3 -c "
            import pandas as pd
            from sqlalchemy import create_engine
            import os
            
            conn = os.environ['<< parameters.connection_string_env >>']
            engine = create_engine(conn)
            
            df = pd.read_parquet('<< parameters.input_path >>')
            df.to_sql('<< parameters.table_name >>', engine, if_exists='append', index=False)
            print(f'Loaded {len(df)} rows to << parameters.table_name >>')
            "

  validate-quality:
    description: "Validate data quality"
    parameters:
      expectations_path:
        type: string
      data_path:
        type: string
    steps:
      - run:
          name: "Validate data quality"
          command: |
            python3 scripts/validate_quality.py \
              --expectations << parameters.expectations_path >> \
              --data << parameters.data_path >>

jobs:
  etl-pipeline:
    executor: python-etl
    parameters:
      extract_script:
        type: string
      transform_script:
        type: string
      load_script:
        type: string
    steps:
      - checkout
      - run:
          name: Install dependencies
          command: pip install -r requirements.txt
      - run:
          name: Extract
          command: python << parameters.extract_script >>
      - run:
          name: Transform
          command: python << parameters.transform_script >>
      - run:
          name: Load
          command: python << parameters.load_script >>

# Publish: circleci orb publish orb.yml myorg/etl-toolkit@1.0.0
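The `extract-csv` command assumes a `scripts/extract_csv.py` packed into the orb via `<< include(...) >>`. A minimal sketch of what such a script might look like (names and flags are illustrative), accepting its paths from either CLI flags or environment variables so it works under both invocation styles:

```python
#!/usr/bin/env python3
import argparse
import csv
import os
import tempfile
from pathlib import Path

def extract_csv(source, output_dir):
    """Copy rows from a source CSV into the output dir, counting data rows."""
    out = Path(output_dir)
    out.mkdir(parents=True, exist_ok=True)
    dest = out / Path(source).name
    with open(source, newline="") as fin, open(dest, "w", newline="") as fout:
        rows = list(csv.reader(fin))
        csv.writer(fout).writerows(rows)
    return dest, max(len(rows) - 1, 0)  # data rows, excluding the header

def main(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument("--source", default=os.environ.get("SOURCE_PATH"))
    parser.add_argument("--output", default=os.environ.get("OUTPUT_PATH", "/tmp/extracted"))
    args = parser.parse_args(argv)
    dest, n = extract_csv(args.source, args.output)
    print(f"Extracted {n} rows -> {dest}")

# Demo with a temporary file (in CI the orb supplies the real paths):
tmp = Path(tempfile.mkdtemp())
(tmp / "orders.csv").write_text("id,amount\n1,100\n2,200\n")
main(["--source", str(tmp / "orders.csv"), "--output", str(tmp / "out")])
```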

Designing an ETL Pipeline with CircleCI Workflows

Python ETL scripts for the pipeline

#!/usr/bin/env python3
# etl/extract.py — Data Extraction Module
import argparse
import json
import logging
import os
from datetime import datetime, timedelta
from pathlib import Path
import requests
import boto3
import pandas as pd

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("etl.extract")

class DataExtractor:
    def __init__(self, output_dir="data/raw"):
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
    
    def extract_api(self, url, headers=None, params=None):
        logger.info(f"Extracting from API: {url}")
        resp = requests.get(url, headers=headers, params=params, timeout=120)
        resp.raise_for_status()
        
        data = resp.json()
        output_path = self.output_dir / f"api_{datetime.now().strftime('%Y%m%d_%H%M')}.json"
        
        with open(output_path, "w") as f:
            json.dump(data, f)
        
        logger.info(f"Extracted {len(data) if isinstance(data, list) else 1} records -> {output_path}")
        return output_path
    
    def extract_s3(self, bucket, prefix, date_str=None):
        logger.info(f"Extracting from S3: s3://{bucket}/{prefix}")
        s3 = boto3.client("s3")
        
        if date_str:
            prefix = f"{prefix}/{date_str}"
        
        paginator = s3.get_paginator("list_objects_v2")
        files_downloaded = 0
        
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
            for obj in page.get("Contents", []):
                key = obj["Key"]
                local_path = self.output_dir / Path(key).name
                s3.download_file(bucket, key, str(local_path))
                files_downloaded += 1
        
        logger.info(f"Downloaded {files_downloaded} files from S3")
        return files_downloaded
    
    def extract_database(self, connection_string, query, output_name="db_extract"):
        from sqlalchemy import create_engine
        
        logger.info("Extracting from database...")
        engine = create_engine(connection_string)
        
        df = pd.read_sql(query, engine)
        output_path = self.output_dir / f"{output_name}_{datetime.now().strftime('%Y%m%d')}.parquet"
        df.to_parquet(output_path, index=False)
        
        logger.info(f"Extracted {len(df)} rows -> {output_path}")
        return output_path

# etl/transform.py — Data Transformation Module
class DataTransformer:
    def __init__(self, input_dir="data/raw", output_dir="data/transformed"):
        self.input_dir = Path(input_dir)
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
    
    def transform(self, input_file, transformations=None):
        logger.info(f"Transforming: {input_file}")
        
        if str(input_file).endswith(".json"):
            df = pd.read_json(input_file)
        elif str(input_file).endswith(".parquet"):
            df = pd.read_parquet(input_file)
        elif str(input_file).endswith(".csv"):
            df = pd.read_csv(input_file)
        else:
            raise ValueError(f"Unsupported format: {input_file}")
        
        initial_rows = len(df)
        
        # Standard transformations
        df = df.drop_duplicates()
        df = df.dropna(subset=df.columns[:3])
        
        for col in df.select_dtypes(include=["object"]).columns:
            df[col] = df[col].str.strip()
        
        # Custom transformations
        if transformations:
            for t in transformations:
                df = t(df)
        
        output_path = self.output_dir / f"transformed_{Path(input_file).stem}.parquet"
        df.to_parquet(output_path, index=False)
        
        logger.info(f"Transformed: {initial_rows} -> {len(df)} rows -> {output_path}")
        return output_path

# etl/load.py — Data Loading Module  
class DataLoader:
    def __init__(self, target_type="postgres"):
        self.target_type = target_type
    
    def load(self, input_path, table_name, connection_string=None):
        logger.info(f"Loading {input_path} to {table_name}")
        
        df = pd.read_parquet(input_path)
        
        if self.target_type in ("postgres", "redshift", "snowflake"):
            from sqlalchemy import create_engine
            conn = connection_string or os.environ.get("DATABASE_URL")
            engine = create_engine(conn)
            df.to_sql(table_name, engine, if_exists="append", index=False, method="multi")
        
        elif self.target_type == "s3":
            s3_path = connection_string or os.environ.get("S3_OUTPUT_PATH")
            df.to_parquet(f"{s3_path}/{table_name}/{datetime.now().strftime('%Y%m%d')}.parquet")
        
        logger.info(f"Loaded {len(df)} rows to {table_name}")
        return len(df)

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--date", default=datetime.now().strftime("%Y-%m-%d"))
    parser.add_argument("--type", default="full")
    args = parser.parse_args()
    
    extractor = DataExtractor()
    logger.info(f"Starting extraction for {args.date} (type: {args.type})")
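Chained together, the three modules above form the full pipeline. The transform stage's standard cleanup (dedupe, drop rows with null key fields, trim strings) can be exercised standalone; a minimal sketch with pandas and made-up product rows:

```python
import pandas as pd

raw = pd.DataFrame([
    {"id": 1, "name": "  Product A  ", "price": 100},
    {"id": 1, "name": "  Product A  ", "price": 100},  # exact duplicate
    {"id": 2, "name": None,            "price": 200},  # null key field
    {"id": 3, "name": "Product C  ",   "price": 300},
])

# Same standard cleanup as DataTransformer.transform:
df = raw.drop_duplicates()
df = df.dropna(subset=["name"])                      # drop rows missing key fields
for col in df.select_dtypes(include=["object"]).columns:
    df[col] = df[col].str.strip()                    # trim stray whitespace

print(len(raw), "->", len(df))  # 4 -> 2
print(df["name"].tolist())      # ['Product A', 'Product C']
```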

Testing and Data Validation in the Pipeline

Testing the ETL pipeline and validating data quality

#!/usr/bin/env python3
# tests/test_etl.py — ETL Pipeline Tests
import pytest
import pandas as pd
import json
from pathlib import Path
from datetime import datetime

class TestExtraction:
    def test_api_extraction_returns_data(self):
        # Mock API response
        sample_data = [
            {"id": 1, "name": "Product A", "price": 100},
            {"id": 2, "name": "Product B", "price": 200},
        ]
        df = pd.DataFrame(sample_data)
        assert len(df) > 0
        assert "id" in df.columns
    
    def test_csv_extraction_handles_encoding(self):
        test_csv = "id,name,value\n1,test,100\n2,ทดสอบ,200\n"
        from io import StringIO
        df = pd.read_csv(StringIO(test_csv))
        assert len(df) == 2
        assert df.iloc[1]["name"] == "ทดสอบ"
    
    def test_extraction_handles_empty_response(self):
        df = pd.DataFrame([])
        assert len(df) == 0

class TestTransformation:
    def test_dedup_removes_duplicates(self):
        data = [
            {"id": 1, "value": "a"},
            {"id": 1, "value": "a"},
            {"id": 2, "value": "b"},
        ]
        df = pd.DataFrame(data).drop_duplicates()
        assert len(df) == 2
    
    def test_null_handling(self):
        data = [
            {"id": 1, "name": "A", "value": 100},
            {"id": 2, "name": None, "value": 200},
            {"id": 3, "name": "C", "value": None},
        ]
        df = pd.DataFrame(data)
        df_clean = df.dropna(subset=["name"])
        assert len(df_clean) == 2
    
    def test_string_trimming(self):
        data = [{"name": "  hello  "}, {"name": "world  "}]
        df = pd.DataFrame(data)
        df["name"] = df["name"].str.strip()
        assert df.iloc[0]["name"] == "hello"
        assert df.iloc[1]["name"] == "world"
    
    def test_type_conversion(self):
        data = [{"date": "2024-01-15", "amount": "100.50"}]
        df = pd.DataFrame(data)
        df["date"] = pd.to_datetime(df["date"])
        df["amount"] = df["amount"].astype(float)
        assert df["amount"].dtype == float

class TestDataQuality:
    def test_no_null_primary_keys(self):
        data = [{"id": 1}, {"id": 2}, {"id": 3}]
        df = pd.DataFrame(data)
        assert df["id"].notna().all()
    
    def test_unique_primary_keys(self):
        data = [{"id": 1}, {"id": 2}, {"id": 3}]
        df = pd.DataFrame(data)
        assert df["id"].is_unique
    
    def test_values_in_range(self):
        data = [{"price": 10}, {"price": 50}, {"price": 100}]
        df = pd.DataFrame(data)
        assert (df["price"] >= 0).all()
        assert (df["price"] <= 10000).all()
    
    def test_date_not_future(self):
        data = [{"date": "2024-01-15"}, {"date": "2024-06-01"}]
        df = pd.DataFrame(data)
        df["date"] = pd.to_datetime(df["date"])
        assert (df["date"] <= pd.Timestamp.now()).all()
    
    def test_row_count_within_expected(self):
        data = list(range(100))
        df = pd.DataFrame({"id": data})
        min_expected = 50
        max_expected = 200
        assert min_expected <= len(df) <= max_expected

# scripts/validate_quality.py — Great Expectations validation
# import great_expectations as gx
# 
# context = gx.get_context()
# 
# validator = context.sources.pandas_default.read_parquet(
#     "data/transformed/output.parquet"
# )
# 
# validator.expect_column_values_to_not_be_null("id")
# validator.expect_column_values_to_be_unique("id")
# validator.expect_column_values_to_be_between("price", min_value=0, max_value=10000)
# validator.expect_table_row_count_to_be_between(min_value=100, max_value=1000000)
# 
# results = validator.validate()
# if not results.success:
#     print("Data quality check FAILED!")
#     for r in results.results:
#         if not r.success:
#             print(f"  FAIL: {r.expectation_config.expectation_type}")
#     exit(1)
# print("Data quality check PASSED!")

# Run tests: pytest tests/test_etl.py -v
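If Great Expectations is too heavy a dependency, the checks in the commented sketch above can be approximated with a plain function that returns the list of failed rules. A minimal stdlib sketch (rule names and thresholds are illustrative):

```python
def run_quality_checks(rows, key="id", price_field="price", max_price=10_000):
    """Return the names of failed checks for a list of record dicts."""
    failures = []
    keys = [r.get(key) for r in rows]
    if any(k is None for k in keys):
        failures.append("not_null:" + key)           # primary key never null
    if len(keys) != len(set(keys)):
        failures.append("unique:" + key)             # primary key unique
    prices = [r[price_field] for r in rows if r.get(price_field) is not None]
    if any(p < 0 or p > max_price for p in prices):
        failures.append(f"between:{price_field}")    # values within range
    return failures

good = [{"id": 1, "price": 100}, {"id": 2, "price": 250}]
bad  = [{"id": 1, "price": 100}, {"id": 1, "price": -5}]
print(run_quality_checks(good))  # []
print(run_quality_checks(bad))   # ['unique:id', 'between:price']
```

In CI, a non-empty failure list maps to a non-zero exit code so the workflow stops before the load job.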

Monitoring and Alerting for ETL Jobs

A monitoring system for the ETL pipeline

#!/usr/bin/env python3
# monitoring/etl_monitor.py — ETL Pipeline Monitoring
import requests
import json
import os
import logging
from datetime import datetime, timedelta
from typing import Dict, List

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("etl_monitor")

class CircleCIMonitor:
    def __init__(self, token=None, org_slug=None):
        self.token = token or os.environ.get("CIRCLECI_TOKEN")
        self.org_slug = org_slug or os.environ.get("CIRCLECI_ORG")
        self.base_url = "https://circleci.com/api/v2"
        self.headers = {"Circle-Token": self.token}
    
    def get_pipeline_runs(self, project_slug, branch="main", limit=20):
        resp = requests.get(
            f"{self.base_url}/project/{project_slug}/pipeline",
            headers=self.headers,
            params={"branch": branch},
        )
        resp.raise_for_status()
        return resp.json().get("items", [])[:limit]
    
    def get_workflow_status(self, pipeline_id):
        resp = requests.get(
            f"{self.base_url}/pipeline/{pipeline_id}/workflow",
            headers=self.headers,
        )
        resp.raise_for_status()
        return resp.json().get("items", [])
    
    def get_job_details(self, workflow_id):
        resp = requests.get(
            f"{self.base_url}/workflow/{workflow_id}/job",
            headers=self.headers,
        )
        resp.raise_for_status()
        return resp.json().get("items", [])
    
    def check_etl_health(self, project_slug):
        pipelines = self.get_pipeline_runs(project_slug, limit=5)
        
        results = []
        for pipeline in pipelines:
            workflows = self.get_workflow_status(pipeline["id"])
            
            for wf in workflows:
                results.append({
                    "pipeline_id": pipeline["id"],
                    "workflow": wf["name"],
                    "status": wf["status"],
                    "created_at": wf["created_at"],
                    "stopped_at": wf.get("stopped_at"),
                })
        
        failed = [r for r in results if r["status"] == "failed"]
        success_rate = (len(results) - len(failed)) / len(results) * 100 if results else 0
        
        health = {
            "total_runs": len(results),
            "failed": len(failed),
            "success_rate": round(success_rate, 1),
            "last_status": results[0]["status"] if results else "unknown",
            "last_run": results[0]["created_at"] if results else None,
            "healthy": len(failed) == 0,
        }
        
        if not health["healthy"]:
            self._send_alert(health, failed)
        
        return health
    
    def _send_alert(self, health, failed_runs):
        webhook = os.environ.get("SLACK_WEBHOOK")
        if not webhook:
            return
        
        message = (
            f"*ETL Pipeline Alert*\n"
            f"Success Rate: {health['success_rate']}%\n"
            f"Failed Runs: {len(failed_runs)}\n"
            f"Last Status: {health['last_status']}\n"
        )
        
        for run in failed_runs[:3]:
            message += f"\n- {run['workflow']} failed at {run['created_at']}"
        
        requests.post(webhook, json={"text": message}, timeout=10)
        logger.warning(f"Alert sent: {len(failed_runs)} failed ETL runs")
    
    def generate_report(self, project_slug, days=7):
        pipelines = self.get_pipeline_runs(project_slug, limit=50)
        # Only count runs inside the reporting window
        cutoff = (datetime.utcnow() - timedelta(days=days)).strftime("%Y-%m-%d")
        
        daily_stats = {}
        
        for pipeline in pipelines:
            date = pipeline["created_at"][:10]
            if date < cutoff:
                continue
            if date not in daily_stats:
                daily_stats[date] = {"total": 0, "success": 0, "failed": 0}
            
            workflows = self.get_workflow_status(pipeline["id"])
            for wf in workflows:
                daily_stats[date]["total"] += 1
                if wf["status"] == "success":
                    daily_stats[date]["success"] += 1
                elif wf["status"] == "failed":
                    daily_stats[date]["failed"] += 1
        
        return {
            "period_days": days,
            "daily_stats": daily_stats,
            "generated_at": datetime.utcnow().isoformat(),
        }

# monitor = CircleCIMonitor()
# health = monitor.check_etl_health("gh/myorg/etl-pipeline")
# print(json.dumps(health, indent=2))
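The success-rate math inside `check_etl_health` can be pulled out and unit-tested without hitting the API. A small sketch over the same record shape the monitor collects:

```python
def compute_health(results):
    """Summarize workflow records of the shape check_etl_health collects."""
    failed = [r for r in results if r["status"] == "failed"]
    total = len(results)
    rate = (total - len(failed)) / total * 100 if total else 0
    return {
        "total_runs": total,
        "failed": len(failed),
        "success_rate": round(rate, 1),
        "last_status": results[0]["status"] if results else "unknown",
        "healthy": not failed,
    }

runs = [
    {"status": "success"},
    {"status": "failed"},
    {"status": "success"},
]
print(compute_health(runs))
# {'total_runs': 3, 'failed': 1, 'success_rate': 66.7, 'last_status': 'success', 'healthy': False}
```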

FAQ: Frequently Asked Questions

Q: How do CircleCI Orbs differ from GitHub Actions reusable workflows?

A: CircleCI Orbs are full packages that bundle executors, commands, and jobs, published through the Orb Registry with clear semver versioning. GitHub Actions reusable workflows are workflow files that can be referenced across repos, with a Marketplace for individual actions. Orbs have a more rigorous structure and suit complex reusable components; Actions have a larger ecosystem and a lower barrier to entry.

Q: What scale of ETL is CircleCI suited for?

A: CircleCI suits small-to-medium batch ETL (GB-level data) run on a schedule (hourly, daily). The largest resource classes offer 128 GB RAM and 64 vCPUs, enough to process multi-GB datasets. The main constraint is the maximum job runtime of 5 hours (on the Performance plan). For large-scale ETL (TB-level) or real-time streaming, dedicated tools such as Airflow, Spark, or Flink are a better fit.

Q: How do you schedule an ETL pipeline on CircleCI?

A: Use Scheduled Pipelines via the CircleCI UI or API. Define a cron-style cadence, e.g. 0 2 * * * for 02:00 UTC daily, and use pipeline parameters to tell scheduled runs apart from manual ones. The benefits: you can trigger the pipeline with specific parameters, view schedule status in the dashboard, and manage schedules through the API.
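Note that under the hood the API v2 schedules endpoint describes the cadence as a timetable object rather than a raw cron string. A hedged sketch of such a payload for the 02:00 UTC daily example (field names follow the CircleCI API v2 schedule endpoint as of this writing; verify against the current docs):

```python
def daily_schedule_payload(name="daily-etl", hour_utc=2):
    """Payload for POST /project/{project-slug}/schedule (CircleCI API v2)."""
    return {
        "name": name,
        "description": "Nightly batch ETL",
        "attribution-actor": "system",
        # Pipeline parameters the scheduled run is triggered with:
        "parameters": {"branch": "main", "run-schedule": True},
        # Timetable instead of cron: once per hour slot, at 02:00, every day.
        "timetable": {
            "per-hour": 1,
            "hours-of-day": [hour_utc],
            "days-of-week": ["MON", "TUE", "WED", "THU", "FRI", "SAT", "SUN"],
        },
    }

payload = daily_schedule_payload()
print(payload["timetable"]["hours-of-day"])  # [2]
```

Setting `run-schedule: True` here is what routes these runs into the `daily-etl` workflow from the earlier config.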

Q: How should secrets be handled in an ETL pipeline?

A: Use CircleCI Environment Variables (Project Settings -> Environment Variables) for database credentials, API keys, and cloud credentials, and use Contexts to share secrets across projects. Never hardcode secrets in config.yml. Prefer OIDC tokens over static credentials for AWS/GCP where possible, rotate secrets regularly, and audit access logs.
