What Are CircleCI Orbs and How Do You Use Them to Build an ETL Pipeline?
CircleCI Orbs are reusable packages of CircleCI configuration that bundle jobs, commands, and executors together, much like libraries for CI/CD pipelines. They can be shared and reused across projects, cutting down on duplicated config.
An ETL (Extract, Transform, Load) pipeline pulls data from various sources, transforms it into the desired shape, and loads it into a destination (a data warehouse or data lake). Using CircleCI for ETL brings scheduled pipelines for batch ETL, version control for ETL code, automated testing for data quality, and monitoring/alerting when a pipeline fails.
Orbs useful for ETL include circleci/python for Python-based ETL, circleci/aws-cli for AWS services (S3, Redshift), circleci/gcp-cli for Google Cloud (BigQuery), circleci/slack for notifications, and custom orbs you build for ETL-specific tasks.
Compared with Airflow for simple ETL, CircleCI's advantages are that there is no infrastructure to manage, scheduling is built in, Docker support is strong, and parallel execution and caching help speed up the pipeline.
Installing and Configuring CircleCI for a Data Pipeline
Getting started with CircleCI for ETL
# === CircleCI Configuration ===
# .circleci/config.yml
version: 2.1

# Orbs used by this pipeline
orbs:
  python: circleci/python@2.1
  aws-cli: circleci/aws-cli@4.1
  slack: circleci/slack@4.12

# Parameters for scheduled runs
parameters:
  run-schedule:
    type: boolean
    default: false
  etl-type:
    type: string
    default: "full"

# Executors
executors:
  etl-executor:
    docker:
      - image: cimg/python:3.11
    resource_class: large
    environment:
      PYTHONUNBUFFERED: "1"
      ETL_ENV: "production"

# Commands (reusable steps)
commands:
  setup-etl:
    description: "Setup ETL environment"
    steps:
      - checkout
      - python/install-packages:
          pkg-manager: pip
          pip-dependency-file: requirements.txt
      - run:
          name: Install additional tools
          command: |
            pip install awscli boto3 pandas pyarrow sqlalchemy
            pip install great-expectations dbt-core
  validate-data:
    description: "Validate data quality"
    parameters:
      source:
        type: string
    steps:
      - run:
          name: "Validate << parameters.source >> data"
          command: |
            python scripts/validate.py --source << parameters.source >>
  notify-status:
    description: "Send notification"
    parameters:
      status:
        type: string
    steps:
      - slack/notify:
          event: << parameters.status >>
          template: basic_success_1

# Jobs
jobs:
  extract:
    executor: etl-executor
    steps:
      - setup-etl
      - aws-cli/setup
      - run:
          name: Extract data from sources
          command: |
            python etl/extract.py \
              --date $(date +%Y-%m-%d) \
              --type << pipeline.parameters.etl-type >>
          no_output_timeout: 30m
      - persist_to_workspace:
          root: data
          paths: ["raw/*"]
  transform:
    executor: etl-executor
    steps:
      - setup-etl
      - attach_workspace:
          at: data
      - run:
          name: Transform data
          command: |
            python etl/transform.py --input data/raw/ --output data/transformed/
      - validate-data:
          source: "transformed"
      - persist_to_workspace:
          root: data
          paths: ["transformed/*"]
  load:
    executor: etl-executor
    steps:
      - setup-etl
      - aws-cli/setup
      - attach_workspace:
          at: data
      - run:
          name: Load data to warehouse
          command: |
            python etl/load.py --input data/transformed/ --target warehouse
      - notify-status:
          status: pass

# Workflows
workflows:
  daily-etl:
    when: << pipeline.parameters.run-schedule >>
    jobs:
      - extract
      - transform:
          requires: [extract]
      - load:
          requires: [transform]
  manual-etl:
    unless: << pipeline.parameters.run-schedule >>
    jobs:
      - extract
      - transform:
          requires: [extract]
      - load:
          requires: [transform]
          filters:
            branches:
              only: main
Building a Custom Orb for ETL
Creating a reusable ETL orb
# === Custom ETL Orb ===
# orb.yml — etl-toolkit orb
version: 2.1
description: "ETL Toolkit Orb for data pipeline operations"

executors:
  python-etl:
    parameters:
      resource_class:
        type: string
        default: medium
    docker:
      - image: cimg/python:3.11
    resource_class: << parameters.resource_class >>

commands:
  extract-csv:
    description: "Extract data from CSV files"
    parameters:
      source_path:
        type: string
      output_path:
        type: string
        default: "/tmp/extracted"
    steps:
      - run:
          name: "Extract CSV data"
          # <<include(...)>> inlines the script file at `circleci orb pack`
          # time; running it with a Python shell lets the orb ship a .py
          # file. Parameters are passed via environment variables, which
          # scripts/extract_csv.py is expected to read (it should also
          # create EXTRACT_OUTPUT itself).
          shell: /usr/bin/env python3
          environment:
            EXTRACT_SOURCE: << parameters.source_path >>
            EXTRACT_OUTPUT: << parameters.output_path >>
          command: <<include(scripts/extract_csv.py)>>
  extract-api:
    description: "Extract data from REST API"
    parameters:
      api_url:
        type: string
      auth_token_env:
        type: env_var_name
        default: API_TOKEN
      output_path:
        type: string
    steps:
      - run:
          name: "Extract API data"
          shell: /usr/bin/env python3
          command: |
            import requests, json, os
            url = '<< parameters.api_url >>'
            token = os.environ.get('<< parameters.auth_token_env >>')
            headers = {'Authorization': f'Bearer {token}'} if token else {}
            resp = requests.get(url, headers=headers, timeout=60)
            resp.raise_for_status()
            output = '<< parameters.output_path >>'
            os.makedirs(os.path.dirname(output) or '.', exist_ok=True)
            data = resp.json()
            with open(output, 'w') as f:
                json.dump(data, f)
            print(f'Extracted {len(data) if isinstance(data, list) else 1} records to {output}')
  extract-s3:
    description: "Extract data from S3"
    parameters:
      bucket:
        type: string
      prefix:
        type: string
      output_path:
        type: string
    steps:
      - run:
          name: "Extract S3 data"
          command: |
            aws s3 sync \
              s3://<< parameters.bucket >>/<< parameters.prefix >> \
              << parameters.output_path >> \
              --exclude "*.tmp"
            echo "Downloaded from s3://<< parameters.bucket >>/<< parameters.prefix >>"
  transform-pandas:
    description: "Transform data with pandas"
    parameters:
      script:
        type: string
      input_path:
        type: string
      output_path:
        type: string
    steps:
      - run:
          name: "Transform with pandas"
          command: |
            python3 << parameters.script >> \
              --input << parameters.input_path >> \
              --output << parameters.output_path >>
  load-warehouse:
    description: "Load data to a data warehouse"
    parameters:
      target:
        type: enum
        enum: ["redshift", "bigquery", "snowflake", "postgres"]
      connection_string_env:
        type: env_var_name
      table_name:
        type: string
      input_path:
        type: string
    steps:
      - run:
          name: "Load to << parameters.target >>"
          shell: /usr/bin/env python3
          command: |
            import os
            import pandas as pd
            from sqlalchemy import create_engine
            conn = os.environ['<< parameters.connection_string_env >>']
            engine = create_engine(conn)
            df = pd.read_parquet('<< parameters.input_path >>')
            df.to_sql('<< parameters.table_name >>', engine, if_exists='append', index=False)
            print(f'Loaded {len(df)} rows to << parameters.table_name >>')
  validate-quality:
    description: "Validate data quality"
    parameters:
      expectations_path:
        type: string
      data_path:
        type: string
    steps:
      - run:
          name: "Validate data quality"
          command: |
            python3 scripts/validate_quality.py \
              --expectations << parameters.expectations_path >> \
              --data << parameters.data_path >>

jobs:
  etl-pipeline:
    executor: python-etl
    parameters:
      extract_script:
        type: string
      transform_script:
        type: string
      load_script:
        type: string
    steps:
      - checkout
      - run:
          name: Install dependencies
          command: pip install -r requirements.txt
      - run:
          name: Extract
          command: python << parameters.extract_script >>
      - run:
          name: Transform
          command: python << parameters.transform_script >>
      - run:
          name: Load
          command: python << parameters.load_script >>

# Publish: circleci orb publish orb.yml myorg/etl-toolkit@1.0.0
Designing the ETL Pipeline with CircleCI Workflows
Python ETL scripts for the pipeline
#!/usr/bin/env python3
# etl/extract.py — Data Extraction Module
import argparse
import json
import logging
import os
from datetime import datetime
from pathlib import Path

import requests
import boto3
import pandas as pd

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("etl.extract")


class DataExtractor:
    def __init__(self, output_dir="data/raw"):
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def extract_api(self, url, headers=None, params=None):
        logger.info(f"Extracting from API: {url}")
        resp = requests.get(url, headers=headers, params=params, timeout=120)
        resp.raise_for_status()
        data = resp.json()
        output_path = self.output_dir / f"api_{datetime.now().strftime('%Y%m%d_%H%M')}.json"
        with open(output_path, "w") as f:
            json.dump(data, f)
        logger.info(f"Extracted {len(data) if isinstance(data, list) else 1} records -> {output_path}")
        return output_path

    def extract_s3(self, bucket, prefix, date_str=None):
        logger.info(f"Extracting from S3: s3://{bucket}/{prefix}")
        s3 = boto3.client("s3")
        if date_str:
            prefix = f"{prefix}/{date_str}"
        paginator = s3.get_paginator("list_objects_v2")
        files_downloaded = 0
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
            for obj in page.get("Contents", []):
                key = obj["Key"]
                local_path = self.output_dir / Path(key).name
                s3.download_file(bucket, key, str(local_path))
                files_downloaded += 1
        logger.info(f"Downloaded {files_downloaded} files from S3")
        return files_downloaded

    def extract_database(self, connection_string, query, output_name="db_extract"):
        from sqlalchemy import create_engine
        logger.info("Extracting from database...")
        engine = create_engine(connection_string)
        df = pd.read_sql(query, engine)
        output_path = self.output_dir / f"{output_name}_{datetime.now().strftime('%Y%m%d')}.parquet"
        df.to_parquet(output_path, index=False)
        logger.info(f"Extracted {len(df)} rows -> {output_path}")
        return output_path


# etl/transform.py — Data Transformation Module
class DataTransformer:
    def __init__(self, input_dir="data/raw", output_dir="data/transformed"):
        self.input_dir = Path(input_dir)
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def transform(self, input_file, transformations=None):
        logger.info(f"Transforming: {input_file}")
        if str(input_file).endswith(".json"):
            df = pd.read_json(input_file)
        elif str(input_file).endswith(".parquet"):
            df = pd.read_parquet(input_file)
        elif str(input_file).endswith(".csv"):
            df = pd.read_csv(input_file)
        else:
            raise ValueError(f"Unsupported format: {input_file}")
        initial_rows = len(df)
        # Standard transformations
        df = df.drop_duplicates()
        df = df.dropna(subset=list(df.columns[:3]))
        for col in df.select_dtypes(include=["object"]).columns:
            df[col] = df[col].str.strip()
        # Custom transformations (callables that take and return a DataFrame)
        if transformations:
            for t in transformations:
                df = t(df)
        output_path = self.output_dir / f"transformed_{Path(input_file).stem}.parquet"
        df.to_parquet(output_path, index=False)
        logger.info(f"Transformed: {initial_rows} -> {len(df)} rows -> {output_path}")
        return output_path


# etl/load.py — Data Loading Module
class DataLoader:
    def __init__(self, target_type="postgres"):
        self.target_type = target_type

    def load(self, input_path, table_name, connection_string=None):
        logger.info(f"Loading {input_path} to {table_name}")
        df = pd.read_parquet(input_path)
        if self.target_type in ("postgres", "redshift", "snowflake"):
            from sqlalchemy import create_engine
            conn = connection_string or os.environ.get("DATABASE_URL")
            engine = create_engine(conn)
            df.to_sql(table_name, engine, if_exists="append", index=False, method="multi")
        elif self.target_type == "s3":
            s3_path = connection_string or os.environ.get("S3_OUTPUT_PATH")
            df.to_parquet(f"{s3_path}/{table_name}/{datetime.now().strftime('%Y%m%d')}.parquet")
        logger.info(f"Loaded {len(df)} rows to {table_name}")
        return len(df)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--date", default=datetime.now().strftime("%Y-%m-%d"))
    parser.add_argument("--type", default="full")
    args = parser.parse_args()
    extractor = DataExtractor()
    logger.info(f"Starting extraction for {args.date} (type: {args.type})")
    # Dispatch to extract_api / extract_s3 / extract_database here,
    # depending on --type and your configured sources.
Testing and Data Validation in the Pipeline
Testing the ETL pipeline and validating data quality
#!/usr/bin/env python3
# tests/test_etl.py — ETL Pipeline Tests
import pytest
import pandas as pd
from io import StringIO


class TestExtraction:
    def test_api_extraction_returns_data(self):
        # Stand-in for a mocked API response
        sample_data = [
            {"id": 1, "name": "Product A", "price": 100},
            {"id": 2, "name": "Product B", "price": 200},
        ]
        df = pd.DataFrame(sample_data)
        assert len(df) > 0
        assert "id" in df.columns

    def test_csv_extraction_handles_encoding(self):
        test_csv = "id,name,value\n1,test,100\n2,ทดสอบ,200\n"
        df = pd.read_csv(StringIO(test_csv))
        assert len(df) == 2
        assert df.iloc[1]["name"] == "ทดสอบ"

    def test_extraction_handles_empty_response(self):
        df = pd.DataFrame([])
        assert len(df) == 0


class TestTransformation:
    def test_dedup_removes_duplicates(self):
        data = [
            {"id": 1, "value": "a"},
            {"id": 1, "value": "a"},
            {"id": 2, "value": "b"},
        ]
        df = pd.DataFrame(data).drop_duplicates()
        assert len(df) == 2

    def test_null_handling(self):
        data = [
            {"id": 1, "name": "A", "value": 100},
            {"id": 2, "name": None, "value": 200},
            {"id": 3, "name": "C", "value": None},
        ]
        df = pd.DataFrame(data)
        df_clean = df.dropna(subset=["name"])
        assert len(df_clean) == 2

    def test_string_trimming(self):
        data = [{"name": " hello "}, {"name": "world "}]
        df = pd.DataFrame(data)
        df["name"] = df["name"].str.strip()
        assert df.iloc[0]["name"] == "hello"
        assert df.iloc[1]["name"] == "world"

    def test_type_conversion(self):
        data = [{"date": "2024-01-15", "amount": "100.50"}]
        df = pd.DataFrame(data)
        df["date"] = pd.to_datetime(df["date"])
        df["amount"] = df["amount"].astype(float)
        assert df["amount"].dtype == float


class TestDataQuality:
    def test_no_null_primary_keys(self):
        df = pd.DataFrame([{"id": 1}, {"id": 2}, {"id": 3}])
        assert df["id"].notna().all()

    def test_unique_primary_keys(self):
        df = pd.DataFrame([{"id": 1}, {"id": 2}, {"id": 3}])
        assert df["id"].is_unique

    def test_values_in_range(self):
        df = pd.DataFrame([{"price": 10}, {"price": 50}, {"price": 100}])
        assert (df["price"] >= 0).all()
        assert (df["price"] <= 10000).all()

    def test_date_not_future(self):
        df = pd.DataFrame([{"date": "2024-01-15"}, {"date": "2024-06-01"}])
        df["date"] = pd.to_datetime(df["date"])
        assert (df["date"] <= pd.Timestamp.now()).all()

    def test_row_count_within_expected(self):
        df = pd.DataFrame({"id": list(range(100))})
        min_expected = 50
        max_expected = 200
        assert min_expected <= len(df) <= max_expected


# scripts/validate_quality.py — Great Expectations validation
# import great_expectations as gx
#
# context = gx.get_context()
# validator = context.sources.pandas_default.read_parquet(
#     "data/transformed/output.parquet"
# )
#
# validator.expect_column_values_to_not_be_null("id")
# validator.expect_column_values_to_be_unique("id")
# validator.expect_column_values_to_be_between("price", min_value=0, max_value=10000)
# validator.expect_table_row_count_to_be_between(min_value=100, max_value=1000000)
#
# results = validator.validate()
# if not results.success:
#     print("Data quality check FAILED!")
#     for r in results.results:
#         if not r.success:
#             print(f"  FAIL: {r.expectation_config.expectation_type}")
#     exit(1)
# print("Data quality check PASSED!")

# Run tests: pytest tests/test_etl.py -v
Monitoring and Alerting for ETL Jobs
A monitoring setup for the ETL pipeline
#!/usr/bin/env python3
# monitoring/etl_monitor.py — ETL Pipeline Monitoring
import json
import os
import logging
from datetime import datetime, timedelta

import requests

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("etl_monitor")


class CircleCIMonitor:
    def __init__(self, token=None, org_slug=None):
        self.token = token or os.environ.get("CIRCLECI_TOKEN")
        self.org_slug = org_slug or os.environ.get("CIRCLECI_ORG")
        self.base_url = "https://circleci.com/api/v2"
        self.headers = {"Circle-Token": self.token}

    def get_pipeline_runs(self, project_slug, branch="main", limit=20):
        resp = requests.get(
            f"{self.base_url}/project/{project_slug}/pipeline",
            headers=self.headers,
            params={"branch": branch},
            timeout=30,
        )
        resp.raise_for_status()
        return resp.json().get("items", [])[:limit]

    def get_workflow_status(self, pipeline_id):
        resp = requests.get(
            f"{self.base_url}/pipeline/{pipeline_id}/workflow",
            headers=self.headers,
            timeout=30,
        )
        resp.raise_for_status()
        return resp.json().get("items", [])

    def get_job_details(self, workflow_id):
        resp = requests.get(
            f"{self.base_url}/workflow/{workflow_id}/job",
            headers=self.headers,
            timeout=30,
        )
        resp.raise_for_status()
        return resp.json().get("items", [])

    def check_etl_health(self, project_slug):
        pipelines = self.get_pipeline_runs(project_slug, limit=5)
        results = []
        for pipeline in pipelines:
            workflows = self.get_workflow_status(pipeline["id"])
            for wf in workflows:
                results.append({
                    "pipeline_id": pipeline["id"],
                    "workflow": wf["name"],
                    "status": wf["status"],
                    "created_at": wf["created_at"],
                    "stopped_at": wf.get("stopped_at"),
                })
        failed = [r for r in results if r["status"] == "failed"]
        success_rate = (len(results) - len(failed)) / len(results) * 100 if results else 0
        health = {
            "total_runs": len(results),
            "failed": len(failed),
            "success_rate": round(success_rate, 1),
            "last_status": results[0]["status"] if results else "unknown",
            "last_run": results[0]["created_at"] if results else None,
            "healthy": len(failed) == 0,
        }
        if not health["healthy"]:
            self._send_alert(health, failed)
        return health

    def _send_alert(self, health, failed_runs):
        webhook = os.environ.get("SLACK_WEBHOOK")
        if not webhook:
            return
        message = (
            f"*ETL Pipeline Alert*\n"
            f"Success Rate: {health['success_rate']}%\n"
            f"Failed Runs: {len(failed_runs)}\n"
            f"Last Status: {health['last_status']}\n"
        )
        for run in failed_runs[:3]:
            message += f"\n- {run['workflow']} failed at {run['created_at']}"
        requests.post(webhook, json={"text": message}, timeout=10)
        logger.warning(f"Alert sent: {len(failed_runs)} failed ETL runs")

    def generate_report(self, project_slug, days=7):
        pipelines = self.get_pipeline_runs(project_slug, limit=50)
        cutoff = (datetime.utcnow() - timedelta(days=days)).strftime("%Y-%m-%d")
        daily_stats = {}
        for pipeline in pipelines:
            date = pipeline["created_at"][:10]
            if date < cutoff:
                continue  # only keep runs inside the reporting window
            if date not in daily_stats:
                daily_stats[date] = {"total": 0, "success": 0, "failed": 0}
            workflows = self.get_workflow_status(pipeline["id"])
            for wf in workflows:
                daily_stats[date]["total"] += 1
                if wf["status"] == "success":
                    daily_stats[date]["success"] += 1
                elif wf["status"] == "failed":
                    daily_stats[date]["failed"] += 1
        return {
            "period_days": days,
            "daily_stats": daily_stats,
            "generated_at": datetime.utcnow().isoformat(),
        }


# monitor = CircleCIMonitor()
# health = monitor.check_etl_health("gh/myorg/etl-pipeline")
# print(json.dumps(health, indent=2))
Frequently Asked Questions (FAQ)
Q: How do CircleCI Orbs differ from GitHub Actions reusable workflows?
A: CircleCI Orbs are full packages bundling executors, commands, and jobs, published through the Orb Registry with clear semantic versioning. GitHub Actions reusable workflows are workflow files that can be referenced across repos, with a Marketplace for individual actions. Orbs have a more rigorous structure, which suits complex reusable components; Actions have a larger ecosystem and a lower barrier to entry.
Q: What scale of ETL is CircleCI suited to?
A: CircleCI fits small-to-medium batch ETL (GB-level data) that runs on a schedule (hourly, daily). The largest resource classes offer up to 128 GB RAM and 64 vCPUs, enough to process multi-GB datasets. The main constraint is the maximum job runtime of 5 hours (on the Performance plan). For large-scale ETL (TB-level) or real-time streaming, use dedicated tools such as Airflow, Spark, or Flink.
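For heavier batch jobs, the resource class and step timeout can be raised directly in the job definition. A config sketch (available class names and limits depend on your plan):

```yaml
jobs:
  transform:
    docker:
      - image: cimg/python:3.11
    resource_class: xlarge        # more CPU/RAM for large pandas workloads
    steps:
      - run:
          name: Transform large dataset
          command: python etl/transform.py
          no_output_timeout: 45m  # fail if the step produces no output for 45m
```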
Q: How do you schedule an ETL pipeline on CircleCI?
A: Use Scheduled Pipelines via the CircleCI UI or API. Define when the pipeline runs, e.g. the equivalent of cron 0 2 * * * for 02:00 UTC daily, and use pipeline parameters to separate scheduled runs from manual ones. Schedules can trigger pipelines with specific parameters; you can see schedule status in the dashboard and manage schedules through the API.
Q: How should secrets be handled in an ETL pipeline?
A: Use CircleCI environment variables (Project Settings -> Environment Variables) for database credentials, API keys, and cloud credentials, and use Contexts to share secrets across projects. Never hardcode secrets in config.yml. Prefer OIDC tokens over static credentials for AWS/GCP where possible, rotate secrets regularly, and audit access logs.
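Contexts are attached per job in the workflow, so only the jobs that actually need the secrets receive them. A sketch (`etl-secrets` is a hypothetical context name):

```yaml
workflows:
  daily-etl:
    jobs:
      - extract:
          context: [etl-secrets]   # provides DATABASE_URL, API_TOKEN, ...
      - load:
          context: [etl-secrets]
          requires: [extract]
```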
