Airbyte Hexagonal Architecture
Airbyte ETL, Hexagonal Architecture, Ports and Adapters, Connector Design, Testing, Deployment, Production Data Pipeline
| Layer | Role | Contains | Depends On | Tested With |
|---|---|---|---|---|
| Core (Domain) | Business Logic | Validation, Transform, Rules | Nothing (pure) | Unit Test + Mock |
| Ports (Interface) | Define contracts | ISourceReader, IWriter | Core | Contract Test |
| Adapters (Infra) | External connections | API Client, DB Client | Ports | Integration Test |
| Airbyte CDK | Connector framework | Source, Destination base | Adapters | CAT (Acceptance) |
Connector Design
# === Hexagonal Connector Design ===
from dataclasses import dataclass
# Port (Interface)
# class ISourceReader(ABC):
# @abstractmethod
# def connect(self, config: dict) -> bool: ...
# @abstractmethod
# def read_records(self, stream: str) -> Iterator[dict]: ...
# @abstractmethod
# def get_streams(self) -> List[str]: ...
#
# class ITransformer(ABC):
# @abstractmethod
# def transform(self, record: dict, schema: dict) -> dict: ...
# @abstractmethod
# def validate(self, record: dict) -> bool: ...
#
# class IDestinationWriter(ABC):
# @abstractmethod
# def write_records(self, records: List[dict], stream: str) -> int: ...
# @abstractmethod
# def close(self) -> None: ...
# Adapter (Implementation)
# class RestApiSourceAdapter(ISourceReader):
# def __init__(self, base_url: str, auth_token: str):
# self.client = httpx.Client(base_url=base_url,
# headers={"Authorization": f"Bearer {auth_token}"})
#
# def connect(self, config):
# resp = self.client.get("/health")
# return resp.status_code == 200
#
# def read_records(self, stream):
#     page = 1
#     while True:
#         resp = self.client.get(f"/{stream}", params={"page": page})
#         resp.raise_for_status()  # fail on 4xx/5xx instead of parsing an error body as data
#         data = resp.json()
#         if not data["items"]:
#             break
#         yield from data["items"]
#         page += 1
# Core (Business Logic)
# class ETLCore:
# def __init__(self, reader: ISourceReader,
# transformer: ITransformer,
# writer: IDestinationWriter):
# self.reader = reader
# self.transformer = transformer
# self.writer = writer
#
# def run_sync(self, stream: str, schema: dict) -> SyncResult:
#     read_count = 0
#     records = []
#     for record in self.reader.read_records(stream):
#         read_count += 1  # count every record read, including ones rejected below
#         if self.transformer.validate(record):
#             transformed = self.transformer.transform(record, schema)
#             records.append(transformed)
#     written = self.writer.write_records(records, stream)
#     # len(records) counts only valid records; reporting it as "read" would hide rejects
#     return SyncResult(read=read_count, written=written)
@dataclass
class ConnectorComponent:
    """Describe one building block of a hexagonal Airbyte connector.

    Each instance is a plain record of four descriptive strings: the
    component's name, the hexagonal layer it belongs to, what it is
    responsible for, and example methods/concerns it covers.
    """

    # Class or interface name, e.g. "ISourceReader".
    component: str
    # Hexagonal layer label, e.g. "Port", "Adapter" or "Core".
    layer: str
    # One-line description of the component's job.
    responsibility: str
    # Example methods or concerns, as free-form text.
    example: str
# Catalogue of the ports, adapters and core used by the example connector.
# Keyword arguments keep the four same-typed string fields from being mixed up.
components = [
    ConnectorComponent(
        component="ISourceReader",
        layer="Port",
        responsibility="Define how to read from any source",
        example="connect(), read_records(), get_streams()",
    ),
    ConnectorComponent(
        component="RestApiSourceAdapter",
        layer="Adapter",
        responsibility="Implement REST API reading with pagination",
        example="HTTP client, auth, pagination logic",
    ),
    ConnectorComponent(
        component="DatabaseSourceAdapter",
        layer="Adapter",
        responsibility="Implement database reading with cursor",
        example="SQL query, cursor-based incremental",
    ),
    ConnectorComponent(
        component="ETLCore",
        layer="Core",
        responsibility="Orchestrate read → transform → write",
        example="Validation, transformation rules, sync logic",
    ),
    ConnectorComponent(
        component="IDestinationWriter",
        layer="Port",
        responsibility="Define how to write to any destination",
        example="write_records(), close(), create_table()",
    ),
    ConnectorComponent(
        component="BigQueryWriterAdapter",
        layer="Adapter",
        responsibility="Implement BigQuery writing",
        example="BigQuery client, batch insert, schema mapping",
    ),
]

# Print the catalogue: one header line, then three lines per component.
print("=== Connector Components ===")
for c in components:
    print(f" [{c.layer}] {c.component}")
    print(f" Role: {c.responsibility}")
    print(f" Example: {c.example}")
Testing Strategy
# === Testing Pyramid ===
# Unit Test (Core Logic)
# def test_transform_valid_record():
# transformer = DefaultTransformer()
# record = {"name": "test", "value": 42}
# schema = {"name": "string", "value": "integer"}
# result = transformer.transform(record, schema)
# assert result["name"] == "test"
# assert result["value"] == 42
#
# def test_validate_rejects_invalid():
# transformer = DefaultTransformer()
# record = {"name": None, "value": "not_a_number"}
# assert transformer.validate(record) == False
# Integration Test (Adapter)
# @pytest.fixture
# def postgres_container():
# with PostgresContainer("postgres:15") as pg:
# yield pg
#
# def test_database_adapter_reads(postgres_container):
# adapter = DatabaseSourceAdapter(postgres_container.get_connection_url())
# adapter.connect({})
# records = list(adapter.read_records("users"))
# assert len(records) > 0
# E2E Test (Full Pipeline)
# def test_full_sync():
# source = MockSourceAdapter(records=[...])
# transformer = DefaultTransformer()
# dest = MockDestinationAdapter()
# core = ETLCore(source, transformer, dest)
# result = core.run_sync("users", schema)
# assert result.written == result.read
@dataclass
class TestStrategy:
    """Describe one level of the connector testing pyramid.

    Each instance is a plain record of five descriptive strings: the test
    type, its scope, the tools used, its typical speed, and when it runs.
    """

    # The class name matches pytest's default ``Test*`` collection pattern,
    # so any test module that imports it would make pytest try to collect it
    # and emit a PytestCollectionWarning. This opts out of collection.
    # It is not annotated, so it is a plain class attribute, not a field.
    __test__ = False

    # Name of the test level, e.g. "Unit Test".
    test_type: str
    # What the test level exercises.
    scope: str
    # Frameworks/tools used, as free-form text.
    tools: str
    # Typical run time as free-form text, e.g. "< 1 sec".
    speed: str
    # When it runs in the workflow, e.g. "Every commit".
    when: str
# Testing pyramid for the connector, from fastest/cheapest to slowest.
# Keyword arguments keep the five same-typed string fields from being mixed up.
strategies = [
    TestStrategy(
        test_type="Unit Test",
        scope="Core logic only, mocked ports",
        tools="pytest + Mock/MagicMock",
        speed="< 1 sec",
        when="Every commit",
    ),
    TestStrategy(
        test_type="Contract Test",
        scope="Adapter implements Port correctly",
        tools="pytest + ABC enforcement",
        speed="< 5 sec",
        when="Every commit",
    ),
    TestStrategy(
        test_type="Integration Test",
        scope="Adapter + real external system",
        tools="pytest + Testcontainers",
        speed="30-60 sec",
        when="Every PR",
    ),
    TestStrategy(
        test_type="Acceptance Test (CAT)",
        scope="Full Airbyte connector standard",
        tools="Airbyte CAT framework",
        speed="5-10 min",
        when="Before release",
    ),
    TestStrategy(
        test_type="E2E Test",
        scope="Full pipeline source → destination",
        tools="pytest + Docker Compose",
        speed="5-15 min",
        when="Before release",
    ),
]

# Print the strategy table: one header line, then three lines per entry.
print("=== Testing Strategy ===")
for t in strategies:
    print(f" [{t.test_type}] {t.scope}")
    print(f" Tools: {t.tools}")
    print(f" Speed: {t.speed} | When: {t.when}")
Deployment and Operations
# === Production Operations ===
@dataclass
class OperationTask:
    """Describe one recurring production-operations task for the pipeline.

    Each instance is a plain record of five descriptive strings: the task,
    the tool used, how often it runs, the alert condition, and the response.
    """

    # Name of the operational task, e.g. "Sync Monitoring".
    task: str
    # Tool(s) used to perform it, as free-form text.
    tool: str
    # How often it is performed, e.g. "Real-time" or "Weekly".
    frequency: str
    # Condition that should raise an alert, as free-form text.
    alert: str
    # Response to take when the alert fires.
    action: str
# Production runbook: what to watch, how often, when to alert and how to react.
# Keyword arguments keep the five same-typed string fields from being mixed up.
operations = [
    OperationTask(
        task="Sync Monitoring",
        tool="Airbyte UI + Prometheus",
        frequency="Real-time",
        alert="Sync failed or duration > 2x average",
        action="Check logs, fix connector, re-sync",
    ),
    OperationTask(
        task="Data Quality",
        tool="Great Expectations + dbt tests",
        frequency="After each sync",
        alert="Schema drift, null spikes, row count anomaly",
        action="Alert data team, pause downstream",
    ),
    OperationTask(
        task="Connector Updates",
        tool="Airbyte releases",
        frequency="Monthly",
        alert="New version available with fixes",
        action="Test in staging, update production",
    ),
    OperationTask(
        task="Resource Monitoring",
        tool="Grafana + cAdvisor",
        frequency="Real-time",
        alert="Memory > 80%, CPU > 90%",
        action="Scale workers, optimize connector",
    ),
    OperationTask(
        task="Cost Tracking",
        tool="Cloud billing + labels",
        frequency="Weekly",
        alert="Cost > 150% of budget",
        action="Review sync frequency, optimize queries",
    ),
]

# Print the runbook: one header line, then four lines per task.
print("=== Operations ===")
for o in operations:
    print(f" [{o.task}] Tool: {o.tool}")
    print(f" Frequency: {o.frequency}")
    print(f" Alert: {o.alert}")
    print(f" Action: {o.action}")
เคล็ดลับ
- Ports: ออกแบบ Port ให้เป็น Interface ก่อน แล้วค่อย Implement Adapter
- Test: Core Logic ต้องมี Unit Test 100% Coverage ไม่พึ่ง External
- CDK: ใช้ Airbyte CDK สร้าง Custom Connector ตาม Standard
- Incremental: ใช้ Incremental Sync เมื่อ Source รองรับ (มี Cursor Field หรือ CDC) เพื่อลด Cost และ Duration
- Monitor: ตั้ง Alert สำหรับ Sync Failure, Data Quality และ Cost Spike
Hexagonal Architecture คืออะไร
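Hexagonal Architecture หรือ Ports and Adapters คือแนวทางที่แยก Business Logic ออกจาก Infrastructure โดยให้ Core สื่อสารกับภายนอกผ่าน Port (Interface) และมี Adapter เป็นตัว Implement การเชื่อมต่อจริง ทำให้ระบบ Testable, Flexible และ Maintainable เพราะสามารถ Unit Test Core ได้ด้วย Mock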
Airbyte คืออะไร
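Airbyte คือ Open Source ELT Platform ที่มี Connector สำเร็จรูปกว่า 300 ตัว และมี CDK สำหรับสร้าง Custom Connector รองรับ Sync Mode แบบ Full Refresh, Incremental และ CDC, Deploy ได้ด้วย Docker หรือ Kubernetes ทั้งแบบ Cloud และ Self-hosted และทำงานร่วมกับ dbt ได้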
รวม Airbyte กับ Hexagonal Architecture อย่างไร
สร้าง Custom Connector โดยแยก Core Logic ออกจาก Adapter (เช่น API Client หรือ Database Client) ผ่าน Port Interface อย่าง ISourceReader และ IDestinationWriter ทำให้ทดสอบ Core ด้วย Mock ได้ และเปลี่ยน Adapter ได้โดยไม่ต้องแก้ Core
Testing Strategy ทำอย่างไร
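ใช้ Unit Test ทดสอบ Core ด้วย Mock, Contract Test ตรวจว่า Adapter ทำตาม Interface, Integration Test ทดสอบ Adapter กับระบบจริงผ่าน Testcontainers, Acceptance Test (CAT) ตามมาตรฐาน Airbyte และ E2E Test สำหรับ Full Pipeline โดยรันผ่าน CI/CD: Unit/Contract Test ทุก Commit, Integration Test ทุก PR และ CAT/E2E ก่อน Release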
สรุป
Airbyte ETL ที่ออกแบบด้วย Hexagonal Architecture (Ports and Adapters) ช่วยให้สร้าง Connector ด้วย CDK ได้อย่างเป็นระบบ ทดสอบได้หลายระดับทั้ง Unit, Integration และ E2E และ Deploy บน Kubernetes สำหรับ Production ได้
