Airbyte Hexagonal Architecture

Keywords: Airbyte, ETL, Hexagonal Architecture, Ports and Adapters, Connector Design, Testing, Deployment, Production Data Pipeline

| Layer | Role | Contains | Depends On | Tested With |
|---|---|---|---|---|
| Core (Domain) | Business Logic | Validation, Transform, Rules | Nothing (pure) | Unit Test + Mock |
| Ports (Interface) | Define contracts | ISourceReader, ITransformer, IDestinationWriter | Core | Contract Test |
| Adapters (Infra) | External connections | API Client, DB Client | Ports | Integration Test |
| Airbyte CDK | Connector framework | Source, Destination base | Adapters | CAT (Acceptance) |

Connector Design

# === Hexagonal Connector Design ===

from dataclasses import dataclass

# Port (Interface)
# from abc import ABC, abstractmethod
# from typing import Iterator, List
#
# class ISourceReader(ABC):
#     @abstractmethod
#     def connect(self, config: dict) -> bool: ...
#     @abstractmethod
#     def read_records(self, stream: str) -> Iterator[dict]: ...
#     @abstractmethod
#     def get_streams(self) -> List[str]: ...
#
# class ITransformer(ABC):
#     @abstractmethod
#     def transform(self, record: dict, schema: dict) -> dict: ...
#     @abstractmethod
#     def validate(self, record: dict) -> bool: ...
#
# class IDestinationWriter(ABC):
#     @abstractmethod
#     def write_records(self, records: List[dict], stream: str) -> int: ...
#     @abstractmethod
#     def close(self) -> None: ...

# Adapter (Implementation) -- requires `import httpx`
# class RestApiSourceAdapter(ISourceReader):
#     def __init__(self, base_url: str, auth_token: str):
#         self.client = httpx.Client(base_url=base_url,
#                                    headers={"Authorization": f"Bearer {auth_token}"})
#
#     def connect(self, config):
#         resp = self.client.get("/health")
#         return resp.status_code == 200
#
#     def read_records(self, stream):
#         page = 1
#         while True:
#             resp = self.client.get(f"/{stream}", params={"page": page})
#             resp.raise_for_status()  # fail loudly on HTTP errors instead of parsing an error body
#             data = resp.json()
#             if not data["items"]:
#                 break
#             yield from data["items"]
#             page += 1
#
#     # NOTE: ISourceReader also declares get_streams() as abstract; it must be
#     # implemented here too, otherwise the class cannot be instantiated.

# Core (Business Logic)
# (SyncResult: a small result record with `read` and `written` counts, not shown)
# class ETLCore:
#     def __init__(self, reader: ISourceReader,
#                  transformer: ITransformer,
#                  writer: IDestinationWriter):
#         self.reader = reader
#         self.transformer = transformer
#         self.writer = writer
#
#     def run_sync(self, stream: str, schema: dict) -> SyncResult:
#         read = 0
#         records = []
#         for record in self.reader.read_records(stream):
#             read += 1  # count every record pulled, including ones rejected below
#             if self.transformer.validate(record):
#                 transformed = self.transformer.transform(record, schema)
#                 records.append(transformed)
#         written = self.writer.write_records(records, stream)
#         return SyncResult(read=read, written=written)

@dataclass()
class ConnectorComponent:
    """One row of the connector component catalog.

    Ties a named piece of the connector to its hexagonal layer (values such
    as "Port", "Adapter" or "Core" are used in the catalog), a short
    statement of its responsibility, and an illustrative example of what
    it contains.
    """

    # Name of the interface or class, e.g. "ISourceReader".
    component: str
    # Hexagonal layer the component belongs to.
    layer: str
    # One-line description of the component's job.
    responsibility: str
    # Representative methods or concerns handled by the component.
    example: str

# Catalog of the connector's building blocks, one entry per port/adapter/core.
# NOTE(review): the IDestinationWriter port sketched in the comments above
# declares only write_records() and close(); create_table() is not on it.
# The ITransformer port from that sketch is also absent from this catalog.
components = [
    ConnectorComponent(
        component="ISourceReader",
        layer="Port",
        responsibility="Define how to read from any source",
        example="connect(), read_records(), get_streams()",
    ),
    ConnectorComponent(
        component="RestApiSourceAdapter",
        layer="Adapter",
        responsibility="Implement REST API reading with pagination",
        example="HTTP client, auth, pagination logic",
    ),
    ConnectorComponent(
        component="DatabaseSourceAdapter",
        layer="Adapter",
        responsibility="Implement database reading with cursor",
        example="SQL query, cursor-based incremental",
    ),
    ConnectorComponent(
        component="ETLCore",
        layer="Core",
        responsibility="Orchestrate read → transform → write",
        example="Validation, transformation rules, sync logic",
    ),
    ConnectorComponent(
        component="IDestinationWriter",
        layer="Port",
        responsibility="Define how to write to any destination",
        example="write_records(), close(), create_table()",
    ),
    ConnectorComponent(
        component="BigQueryWriterAdapter",
        layer="Adapter",
        responsibility="Implement BigQuery writing",
        example="BigQuery client, batch insert, schema mapping",
    ),
]

print("=== Connector Components ===")
for entry in components:
    # One print with newline separators emits the same three lines as
    # three consecutive print() calls.
    print(
        f" [{entry.layer}] {entry.component}",
        f" Role: {entry.responsibility}",
        f" Example: {entry.example}",
        sep="\n",
    )

Testing Strategy

# === Testing Pyramid ===

# Unit Test (Core Logic)
# def test_transform_valid_record():
#     transformer = DefaultTransformer()
#     record = {"name": "test", "value": 42}
#     schema = {"name": "string", "value": "integer"}
#     result = transformer.transform(record, schema)
#     assert result["name"] == "test"
#     assert result["value"] == 42
#
# def test_validate_rejects_invalid():
#     transformer = DefaultTransformer()
#     record = {"name": None, "value": "not_a_number"}
#     assert not transformer.validate(record)

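# Integration Test (Adapter)
# (requires: import pytest; from testcontainers.postgres import PostgresContainer)
# @pytest.fixture
# def postgres_container():
#     with PostgresContainer("postgres:15") as pg:
#         yield pg
#
# def test_database_adapter_reads(postgres_container):
#     # A fresh postgres:15 container has no tables: create and seed a
#     # `users` table first, otherwise there is nothing to read.
#     adapter = DatabaseSourceAdapter(postgres_container.get_connection_url())
#     adapter.connect({})
#     records = list(adapter.read_records("users"))
#     assert len(records) > 0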

# E2E Test (Full Pipeline)
# def test_full_sync():
#     source = MockSourceAdapter(records=[...])
#     transformer = DefaultTransformer()
#     dest = MockDestinationAdapter()
#     core = ETLCore(source, transformer, dest)
#     result = core.run_sync("users", schema)
#     # assumes every mocked record passes validation and the mock writer
#     # returns the number of records it received
#     assert result.written == result.read

@dataclass
class TestStrategy:
    """One layer of the connector testing pyramid.

    Records what a kind of test covers, which tools run it, roughly how
    long it takes, and at which point in the workflow it is executed.

    The name starts with ``Test``, so pytest tries to collect this class
    whenever it is imported into a test module. Because the dataclass
    defines ``__init__``, pytest then emits ``PytestCollectionWarning``.
    ``__test__ = False`` opts the class out of collection.
    """

    # Opt out of pytest collection. It is unannotated, so @dataclass does
    # not treat it as a field and the constructor signature is unchanged.
    __test__ = False

    # Kind of test, e.g. "Unit Test" or "Integration Test".
    test_type: str
    # What the test exercises.
    scope: str
    # Frameworks/tooling used to run it.
    tools: str
    # Rough wall-clock duration, e.g. "< 1 sec".
    speed: str
    # When in the workflow the test runs, e.g. "Every commit".
    when: str

# Testing pyramid for the connector, ordered from fastest/narrowest to
# slowest/broadest.
strategies = [
    TestStrategy(
        test_type="Unit Test",
        scope="Core logic only, mocked ports",
        tools="pytest + Mock/MagicMock",
        speed="< 1 sec",
        when="Every commit",
    ),
    TestStrategy(
        test_type="Contract Test",
        scope="Adapter implements Port correctly",
        tools="pytest + ABC enforcement",
        speed="< 5 sec",
        when="Every commit",
    ),
    TestStrategy(
        test_type="Integration Test",
        scope="Adapter + real external system",
        tools="pytest + Testcontainers",
        speed="30-60 sec",
        when="Every PR",
    ),
    TestStrategy(
        test_type="Acceptance Test (CAT)",
        scope="Full Airbyte connector standard",
        tools="Airbyte CAT framework",
        speed="5-10 min",
        when="Before release",
    ),
    TestStrategy(
        test_type="E2E Test",
        scope="Full pipeline source → destination",
        tools="pytest + Docker Compose",
        speed="5-15 min",
        when="Before release",
    ),
]

print("=== Testing Strategy ===")
for level in strategies:
    # Same three output lines as the separate print() calls, emitted at once.
    print(
        f" [{level.test_type}] {level.scope}",
        f" Tools: {level.tools}",
        f" Speed: {level.speed} | When: {level.when}",
        sep="\n",
    )

Deployment and Operations

# === Production Operations ===

@dataclass()
class OperationTask:
    """A recurring production-operations duty for the data pipeline.

    Pairs the duty with the tooling used for it, how often it happens,
    the condition that should raise an alert, and the response to take
    when that alert fires.
    """

    # Short name of the duty, e.g. "Sync Monitoring".
    task: str
    # Tool(s) used to carry out or observe the task.
    tool: str
    # Cadence, e.g. "Real-time" or "Weekly".
    frequency: str
    # Condition that should trigger an alert.
    alert: str
    # Remediation steps when the alert fires.
    action: str

# Production runbook: what to watch, with which tool, how often, when to
# alert, and what to do about it.
operations = [
    OperationTask(
        task="Sync Monitoring",
        tool="Airbyte UI + Prometheus",
        frequency="Real-time",
        alert="Sync failed or duration > 2x average",
        action="Check logs, fix connector, re-sync",
    ),
    OperationTask(
        task="Data Quality",
        tool="Great Expectations + dbt tests",
        frequency="After each sync",
        alert="Schema drift, null spikes, row count anomaly",
        action="Alert data team, pause downstream",
    ),
    OperationTask(
        task="Connector Updates",
        tool="Airbyte releases",
        frequency="Monthly",
        alert="New version available with fixes",
        action="Test in staging, update production",
    ),
    OperationTask(
        task="Resource Monitoring",
        tool="Grafana + cAdvisor",
        frequency="Real-time",
        alert="Memory > 80%, CPU > 90%",
        action="Scale workers, optimize connector",
    ),
    OperationTask(
        task="Cost Tracking",
        tool="Cloud billing + labels",
        frequency="Weekly",
        alert="Cost > 150% of budget",
        action="Review sync frequency, optimize queries",
    ),
]

print("=== Operations ===")
for duty in operations:
    # Four lines per task, emitted by one print() with newline separators.
    print(
        f" [{duty.task}] Tool: {duty.tool}",
        f" Frequency: {duty.frequency}",
        f" Alert: {duty.alert}",
        f" Action: {duty.action}",
        sep="\n",
    )

เคล็ดลับ

  • Ports: ออกแบบ Port ให้เป็น Interface ก่อน แล้วค่อย Implement Adapter
  • Test: Core Logic ควรมี Unit Test ครอบคลุม 100% โดยไม่พึ่งระบบภายนอก (ใช้ Mock แทน Port)
  • CDK: ใช้ Airbyte CDK สร้าง Custom Connector ตาม Standard
  • Incremental: ใช้ Incremental Sync เมื่อ Source รองรับ Cursor เพื่อลด Cost และ Duration
  • Monitor: ตั้ง Alert สำหรับ Sync Failure, Data Quality และ Cost Spike

Hexagonal Architecture คืออะไร

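Hexagonal Architecture (Ports and Adapters) แยก Business Logic ออกจาก Infrastructure โดยให้ Core สื่อสารผ่าน Interface (Port) และให้ Adapter เป็นผู้ Implement การเชื่อมต่อภายนอก ทำให้ระบบ Testable, Flexible และ Maintainable เพราะสามารถทำ Unit Test ของ Core ได้ด้วย Mock แทนระบบจริง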