SiamCafe · Blog
Airbyte ETL Hexagonal Architecture — ออกแบบ Data Pipeline ที่ยืดหยุ่น
บทความ

Airbyte ETL Hexagonal Architecture — ออกแบบ Data Pipeline ที่ยืดหยุ่น

เผยแพร่ 28 พฤษภาคม 2569

Airbyte Hexagonal Architecture

บทความนี้อธิบายการนำ Hexagonal Architecture (Ports and Adapters) มาใช้ออกแบบ Airbyte ETL ครอบคลุมการออกแบบ Connector, กลยุทธ์การทดสอบ (Testing) และการ Deploy Data Pipeline สำหรับ Production

| Layer | Role | Contains | Depends On | Tested With |
|---|---|---|---|---|
| Core (Domain) | Business Logic | Validation, Transform, Rules | Nothing (pure) | Unit Test + Mock |
| Ports (Interface) | Define contracts | ISourceReader, IDestinationWriter | Core | Contract Test |
| Adapters (Infra) | External connections | API Client, DB Client | Ports | Integration Test |
| Airbyte CDK | Connector framework | Source, Destination base | Adapters | CAT (Acceptance) |

Connector Design

# === Hexagonal Connector Design ===

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Iterator, List, Optional

# Port (Interface)
class ISourceReader(ABC):
    """Port: the contract every source adapter must fulfil.

    ETLCore is written against this interface rather than a concrete
    client, so any implementation (or a test double) can be injected.
    """

    @abstractmethod
    def connect(self, config: dict) -> bool:
        """Check connectivity using *config*; return True on success."""

    @abstractmethod
    def read_records(self, stream: str) -> Iterator[dict]:
        """Yield the records of *stream* one at a time."""

    @abstractmethod
    def get_streams(self) -> List[str]:
        """Return the names of the streams this source can read."""

class ITransformer(ABC):
    """Port: per-record validation and schema-driven transformation.

    ETLCore calls :meth:`validate` first and only transforms records
    that pass it.
    """

    @abstractmethod
    def transform(self, record: dict, schema: dict) -> dict:
        """Return the transformed form of *record* for the given *schema*."""

    @abstractmethod
    def validate(self, record: dict) -> bool:
        """Return True when *record* is acceptable for transformation."""

class IDestinationWriter(ABC):
    """Port: the contract every destination adapter must fulfil.

    Also implements the context-manager protocol, so callers can write
    ``with writer: ...`` and have :meth:`close` called even when a write
    raises. Existing subclasses inherit this without any change.
    """

    @abstractmethod
    def write_records(self, records: List[dict], stream: str) -> int:
        """Persist *records* into *stream*; return how many were written."""

    @abstractmethod
    def close(self) -> None:
        """Release any resources held by the writer."""

    def __enter__(self) -> "IDestinationWriter":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        # Always release resources; returning None lets any exception propagate.
        self.close()

# Adapter (Implementation)
class RestApiSourceAdapter(ISourceReader):
    """Adapter: reads paginated JSON records from a REST API via httpx.

    Args:
        base_url: Root URL of the API; stream names are appended as paths.
        auth_token: Sent as ``Authorization: Bearer <token>`` on every call.
        streams: Stream (endpoint) names reported by :meth:`get_streams`.
            Optional, so existing two-argument callers keep working.
    """

    def __init__(self, base_url: str, auth_token: str,
                 streams: Optional[List[str]] = None):
        self.client = httpx.Client(
            base_url=base_url,
            headers={"Authorization": f"Bearer {auth_token}"},
        )
        # Copy so later mutation of the caller's list cannot leak in.
        self.streams = list(streams) if streams is not None else []

    def connect(self, config):
        """Return True when ``GET /health`` answers 200 (*config* is unused)."""
        resp = self.client.get("/health")
        return resp.status_code == 200

    def read_records(self, stream):
        """Yield every item of ``/<stream>``, requesting ``page=1, 2, ...``.

        Stops at the first page whose ``items`` list is empty.

        Raises:
            httpx.HTTPStatusError: if a page responds with a 4xx/5xx status.
                Without this check, an error body would be misread as data
                or as the end of the stream.
            KeyError: if a successful page has no ``items`` key.
        """
        page = 1
        while True:
            resp = self.client.get(f"/{stream}", params={"page": page})
            resp.raise_for_status()
            items = resp.json()["items"]
            if not items:
                break
            yield from items
            page += 1

    def get_streams(self):
        """Return the configured stream names.

        Required by ISourceReader. Without it the class stays abstract and
        ``RestApiSourceAdapter(...)`` raises TypeError.
        """
        return list(self.streams)

    def close(self) -> None:
        """Close the underlying httpx client."""
        self.client.close()

# Core (Business Logic)
class ETLCore:
    """Core: orchestrates read -> validate/transform -> write for one stream.

    Depends only on the port interfaces, so any adapter (or mock) can be
    injected through the constructor.
    """

    def __init__(self, reader: ISourceReader,
                 transformer: ITransformer,
                 writer: IDestinationWriter):
        self.reader = reader
        self.transformer = transformer
        self.writer = writer

    def run_sync(self, stream: str, schema: dict) -> SyncResult:
        """Sync *stream*.

        Keeps the records that pass ``validate``, transforms them against
        *schema*, and writes them in a single ``write_records`` call.

        Returns:
            SyncResult where ``read`` is the number of records pulled from the
            source, valid or not, and ``written`` is the count the writer
            reports. Previously ``read`` counted only the valid records, so
            rejected ones silently disappeared. They now show up as a gap
            between ``read`` and ``written``.
        """
        read_count = 0
        records = []
        for record in self.reader.read_records(stream):
            read_count += 1
            if self.transformer.validate(record):
                records.append(self.transformer.transform(record, schema))
        written = self.writer.write_records(records, stream)
        return SyncResult(read=read_count, written=written)

@dataclass
class ConnectorComponent:
    """One row of the component overview: which layer a piece lives in and what it does."""

    component: str  # class / interface name, e.g. "ISourceReader"
    layer: str  # hexagonal layer: "Port", "Adapter" or "Core"
    responsibility: str  # one-line description of its role
    example: str  # representative methods or concerns

# Overview of the hexagonal pieces that make up the connector.
# The "example" column for each Port lists exactly the methods that port
# declares. The IDestinationWriter row previously advertised a
# create_table() method the interface does not have.
# NOTE(review): DatabaseSourceAdapter and BigQueryWriterAdapter are described
# here but not defined in the code shown.
components = [
    ConnectorComponent("ISourceReader", "Port",
                       "Define how to read from any source",
                       "connect(), read_records(), get_streams()"),
    ConnectorComponent("RestApiSourceAdapter", "Adapter",
                       "Implement REST API reading with pagination",
                       "HTTP client, auth, pagination logic"),
    ConnectorComponent("DatabaseSourceAdapter", "Adapter",
                       "Implement database reading with cursor",
                       "SQL query, cursor-based incremental"),
    ConnectorComponent("ETLCore", "Core",
                       "Orchestrate read → transform → write",
                       "Validation, transformation rules, sync logic"),
    ConnectorComponent("IDestinationWriter", "Port",
                       "Define how to write to any destination",
                       "write_records(), close()"),
    ConnectorComponent("BigQueryWriterAdapter", "Adapter",
                       "Implement BigQuery writing",
                       "BigQuery client, batch insert, schema mapping"),
]

print("=== Connector Components ===")
for c in components:
    print(f" [{c.layer}] {c.component}")
    print(f" Role: {c.responsibility}")
    print(f" Example: {c.example}")

Testing Strategy

# === Testing Pyramid ===

# Unit Test (Core Logic)
def test_transform_valid_record():
    """A record that already matches the schema keeps its field values."""
    record = {"name": "test", "value": 42}
    schema = {"name": "string", "value": "integer"}

    result = DefaultTransformer().transform(record, schema)

    assert result["name"] == "test"
    assert result["value"] == 42

def test_validate_rejects_invalid():
    """validate() must return False for a null name and a non-numeric value."""
    transformer = DefaultTransformer()
    record = {"name": None, "value": "not_a_number"}
    # `is False` rather than `== False` (flake8 E712). This also pins the
    # declared bool return type: a falsy non-bool such as 0 no longer passes.
    assert transformer.validate(record) is False

# Integration Test (Adapter)
@pytest.fixture
def postgres_container():
    """Provide a running postgres:15 container.

    The ``with`` block around the yield handles teardown once the test ends.
    """
    container = PostgresContainer("postgres:15")
    with container as pg:
        yield pg

def test_database_adapter_reads(postgres_container):
    """The database adapter connects to a real Postgres and reads ``users``."""
    adapter = DatabaseSourceAdapter(postgres_container.get_connection_url())
    # connect() returns a bool per ISourceReader. Fail here with a clear
    # message instead of later with a confusing empty read.
    assert adapter.connect({}), "DatabaseSourceAdapter.connect() reported failure"
    # NOTE(review): a fresh postgres:15 container has no ``users`` table;
    # seed it (CREATE TABLE + INSERT) before reading, or the check below fails.
    records = list(adapter.read_records("users"))
    assert len(records) > 0

# E2E Test (Full Pipeline)
def test_full_sync():
    """Every valid record read from the source reaches the destination.

    The original version referenced an undefined ``schema`` (NameError) and
    passed the ``[...]`` placeholder (a list holding Ellipsis) as records.
    """
    # Same schema and record shape as the unit test above.
    schema = {"name": "string", "value": "integer"}
    records = [
        {"name": "alice", "value": 1},
        {"name": "bob", "value": 2},
    ]
    source = MockSourceAdapter(records=records)
    transformer = DefaultTransformer()
    dest = MockDestinationAdapter()
    core = ETLCore(source, transformer, dest)

    result = core.run_sync("users", schema)

    assert result.read == len(records)
    assert result.written == result.read

@dataclass
class TestStrategy:
    """One layer of the testing pyramid: what it covers, with what tools, and when it runs.

    Not a pytest test class. Its ``Test`` prefix would otherwise make pytest
    try to collect it, and because @dataclass gives it an ``__init__``,
    pytest emits PytestCollectionWarning.
    """

    # Plain class attribute (no annotation), so @dataclass does not make it a field.
    __test__ = False

    test_type: str  # e.g. "Unit Test", "Integration Test"
    scope: str  # what the test exercises
    tools: str  # frameworks used
    speed: str  # rough runtime
    when: str  # stage at which it runs, e.g. "Every commit"

# Rows of the testing pyramid, fastest first:
# (test_type, scope, tools, speed, when)
strategies = [
    TestStrategy(*row)
    for row in (
        ("Unit Test", "Core logic only, mocked ports",
         "pytest + Mock/MagicMock", "< 1 sec", "Every commit"),
        ("Contract Test", "Adapter implements Port correctly",
         "pytest + ABC enforcement", "< 5 sec", "Every commit"),
        ("Integration Test", "Adapter + real external system",
         "pytest + Testcontainers", "30-60 sec", "Every PR"),
        ("Acceptance Test (CAT)", "Full Airbyte connector standard",
         "Airbyte CAT framework", "5-10 min", "Before release"),
        ("E2E Test", "Full pipeline source → destination",
         "pytest + Docker Compose", "5-15 min", "Before release"),
    )
]

print("=== Testing Strategy ===")
for t in strategies:
    # One print with sep="\n" emits the same three lines as three prints.
    print(
        f" [{t.test_type}] {t.scope}",
        f" Tools: {t.tools}",
        f" Speed: {t.speed} | When: {t.when}",
        sep="\n",
    )

Deployment and Operations

# === Production Operations ===

@dataclass
class OperationTask:
    """A recurring production-operations duty for the pipeline.

    Fields: what to watch (``task``), with which tool (``tool``), how often
    (``frequency``), the alert condition (``alert``), and the response
    (``action``).
    """

    task: str
    tool: str
    frequency: str
    alert: str
    action: str

# Each row: (task, tool, frequency, alert, action)
operations = [
    OperationTask(*row)
    for row in (
        ("Sync Monitoring", "Airbyte UI + Prometheus",
         "Real-time", "Sync failed or duration > 2x average",
         "Check logs, fix connector, re-sync"),
        ("Data Quality", "Great Expectations + dbt tests",
         "After each sync", "Schema drift, null spikes, row count anomaly",
         "Alert data team, pause downstream"),
        ("Connector Updates", "Airbyte releases",
         "Monthly", "New version available with fixes",
         "Test in staging, update production"),
        ("Resource Monitoring", "Grafana + cAdvisor",
         "Real-time", "Memory > 80%, CPU > 90%",
         "Scale workers, optimize connector"),
        ("Cost Tracking", "Cloud billing + labels",
         "Weekly", "Cost > 150% of budget",
         "Review sync frequency, optimize queries"),
    )
]

print("=== Operations ===")
for o in operations:
    # Joining with "\n" and printing once yields the same four output lines.
    print("\n".join((
        f" [{o.task}] Tool: {o.tool}",
        f" Frequency: {o.frequency}",
        f" Alert: {o.alert}",
        f" Action: {o.action}",
    )))

เคล็ดลับ

  • Ports: ออกแบบ Port ให้เป็น Interface ก่อน แล้วค่อย Implement Adapter
  • Test: Core Logic ต้องมี Unit Test 100% Coverage ไม่พึ่ง External
  • CDK: ใช้ Airbyte CDK สร้าง Custom Connector ตาม Standard
  • Incremental: ใช้ Incremental Sync เมื่อ Source รองรับ (มี Cursor Field) เพื่อลด Cost และ Duration เมื่อเทียบกับ Full Refresh
  • Monitor: ตั้ง Alert สำหรับ Sync Failure, Data Quality และ Cost Spike

Hexagonal Architecture คืออะไร

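Hexagonal Architecture (หรือ Ports and Adapters) คือการแยก Business Logic ออกจาก Infrastructure โดย Core สื่อสารกับโลกภายนอกผ่าน Interface (Port) และมี Adapter เป็นตัว Implement จริง ผลคือระบบ Testable, Flexible และ Maintainable เพราะสามารถ Unit Test ตัว Core ด้วย Mock ได้โดยไม่ต้องพึ่งระบบภายนอก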

Airbyte คืออะไร

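Airbyte คือ Open Source ELT Platform ที่มี Connector สำเร็จรูปมากกว่า 300 ตัว และมี CDK สำหรับสร้าง Custom Connector รองรับการ Sync แบบ Full Refresh, Incremental และ CDC รันได้ทั้งบน Docker, Kubernetes หรือ Airbyte Cloud (หรือแบบ Self-hosted) และทำงานร่วมกับ dbt ได้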

รวม Airbyte กับ Hexagonal Architecture อย่างไร

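เมื่อสร้าง Custom Connector ให้แยก Core Logic ออกจาก Adapter (เช่น API Client หรือ Database Client) และกำหนด Port เป็น Interface เช่น ISourceReader และ IDestinationWriter จากนั้นทดสอบ Core ด้วย Mock ได้ และเปลี่ยน Adapter ได้โดยไม่ต้องแก้ Core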

Testing Strategy ทำอย่างไร

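ใช้ Unit Test ทดสอบ Core ด้วย Mock, Integration Test ทดสอบ Adapter กับระบบจริงผ่าน Testcontainers, Contract Test ตรวจว่า Adapter ทำตาม Interface, CAT (Acceptance Test) ตรวจตามมาตรฐาน Connector ของ Airbyte และ E2E Test ทดสอบ Pipeline ทั้งเส้น โดยรันอัตโนมัติใน CI/CD (Unit/Contract ทุก Commit, Integration ทุก PR, CAT/E2E ก่อน Release)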

สรุป

การออกแบบ Airbyte ETL ด้วย Hexagonal Architecture (Ports and Adapters) ช่วยให้สร้าง Connector ด้วย CDK ได้อย่างยืดหยุ่น ทดสอบได้ครบทั้งระดับ Unit, Integration และ E2E และพร้อม Deploy บน Kubernetes สำหรับ Production