Airbyte ETL Hexagonal Architecture

2025-08-14 · อ. บอม, SiamCafe.net · 11,681 words

Airbyte Hexagonal Architecture

This article applies Hexagonal Architecture (Ports and Adapters) to Airbyte ETL. It covers connector design, testing, deployment, and operating production data pipelines.

| Layer | Role | Contains | Depends On | Tested With |
|---|---|---|---|---|
| Core (Domain) | Business Logic | Validation, Transform, Rules | Nothing (pure) | Unit Test + Mock |
| Ports (Interface) | Define contracts | ISourceReader, IWriter | Core | Contract Test |
| Adapters (Infra) | External connections | API Client, DB Client | Ports | Integration Test |
| Airbyte CDK | Connector framework | Source, Destination base | Adapters | CAT (Acceptance) |

Connector Design

# === Hexagonal Connector Design ===

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Iterator, List, Optional

import httpx  # third-party HTTP client (pip install httpx)


# Port (Interface): contracts the core depends on
class ISourceReader(ABC):
    @abstractmethod
    def connect(self, config: dict) -> bool: ...
    @abstractmethod
    def read_records(self, stream: str) -> Iterator[dict]: ...
    @abstractmethod
    def get_streams(self) -> List[str]: ...


class ITransformer(ABC):
    @abstractmethod
    def transform(self, record: dict, schema: dict) -> dict: ...
    @abstractmethod
    def validate(self, record: dict) -> bool: ...


class IDestinationWriter(ABC):
    @abstractmethod
    def write_records(self, records: List[dict], stream: str) -> int: ...
    @abstractmethod
    def close(self) -> None: ...


# Adapter (Implementation): REST API source with page-number pagination
class RestApiSourceAdapter(ISourceReader):
    def __init__(self, base_url: str, auth_token: str,
                 streams: Optional[List[str]] = None):
        self.client = httpx.Client(base_url=base_url,
            headers={"Authorization": f"Bearer {auth_token}"})
        # Assumption: stream names are supplied by the caller
        self.streams = streams or []

    def connect(self, config: dict) -> bool:
        resp = self.client.get("/health")
        return resp.status_code == 200

    def read_records(self, stream: str) -> Iterator[dict]:
        page = 1
        while True:
            resp = self.client.get(f"/{stream}", params={"page": page})
            resp.raise_for_status()  # fail on HTTP errors instead of parsing an error body
            data = resp.json()
            if not data["items"]:
                break
            yield from data["items"]
            page += 1

    def get_streams(self) -> List[str]:
        return list(self.streams)


# Core (Business Logic): depends only on the ports
@dataclass
class SyncResult:
    read: int      # records pulled from the source
    written: int   # records the writer reported as written


class ETLCore:
    def __init__(self, reader: ISourceReader,
                 transformer: ITransformer,
                 writer: IDestinationWriter):
        self.reader = reader
        self.transformer = transformer
        self.writer = writer

    def run_sync(self, stream: str, schema: dict) -> SyncResult:
        read = 0
        records = []
        for record in self.reader.read_records(stream):
            read += 1  # count every record, including ones that fail validation
            if self.transformer.validate(record):
                records.append(self.transformer.transform(record, schema))
        written = self.writer.write_records(records, stream)
        return SyncResult(read=read, written=written)

@dataclass
class ConnectorComponent:
    component: str
    layer: str
    responsibility: str
    example: str

components = [
    ConnectorComponent("ISourceReader", "Port",
        "Define how to read from any source",
        "connect(), read_records(), get_streams()"),
    ConnectorComponent("RestApiSourceAdapter", "Adapter",
        "Implement REST API reading with pagination",
        "HTTP client, auth, pagination logic"),
    ConnectorComponent("DatabaseSourceAdapter", "Adapter",
        "Implement database reading with cursor",
        "SQL query, cursor-based incremental"),
    ConnectorComponent("ETLCore", "Core",
        "Orchestrate read → transform → write",
        "Validation, transformation rules, sync logic"),
    ConnectorComponent("IDestinationWriter", "Port",
        "Define how to write to any destination",
        "write_records(), close()"),
    ConnectorComponent("BigQueryWriterAdapter", "Adapter",
        "Implement BigQuery writing",
        "BigQuery client, batch insert, schema mapping"),
]

print("=== Connector Components ===")
for c in components:
    print(f"  [{c.layer}] {c.component}")
    print(f"    Role: {c.responsibility}")
    print(f"    Example: {c.example}")
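To make the dependency direction concrete, the following is a minimal sketch of a core function wired to in-memory adapters. The names `InMemorySource`, `ListWriter`, and `run_sync`, and the "record must have an id" rule, are hypothetical stand-ins chosen for illustration; the ports are trimmed to one method each so the block stays self-contained.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, Iterator, List


class ISourceReader(ABC):
    @abstractmethod
    def read_records(self, stream: str) -> Iterator[dict]: ...


class IDestinationWriter(ABC):
    @abstractmethod
    def write_records(self, records: List[dict], stream: str) -> int: ...


class InMemorySource(ISourceReader):  # hypothetical adapter
    def __init__(self, data: Dict[str, List[dict]]):
        self.data = data

    def read_records(self, stream: str) -> Iterator[dict]:
        yield from self.data.get(stream, [])


class ListWriter(IDestinationWriter):  # hypothetical adapter
    def __init__(self) -> None:
        self.rows: Dict[str, List[dict]] = {}

    def write_records(self, records: List[dict], stream: str) -> int:
        self.rows.setdefault(stream, []).extend(records)
        return len(records)


@dataclass
class SyncResult:
    read: int
    written: int


def run_sync(reader: ISourceReader, writer: IDestinationWriter, stream: str) -> SyncResult:
    """Core logic: drop records without an id, hand the rest to the writer."""
    read, valid = 0, []
    for record in reader.read_records(stream):
        read += 1
        if record.get("id") is not None:  # assumed validation rule
            valid.append(record)
    return SyncResult(read=read, written=writer.write_records(valid, stream))


source = InMemorySource({"users": [{"id": 1}, {"id": None}, {"id": 3}]})
writer = ListWriter()
result = run_sync(source, writer, "users")
print(result)  # SyncResult(read=3, written=2)
```

Because `run_sync` only sees the two ports, replacing `InMemorySource` with a REST or database adapter should not require touching the core.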

Testing Strategy

# === Testing Pyramid ===
# The tests below are sketches. DefaultTransformer, DatabaseSourceAdapter,
# MockSourceAdapter and MockDestinationAdapter are not defined in this article.

from dataclasses import dataclass

# Unit Test (Core Logic)
# def test_transform_valid_record():
#     transformer = DefaultTransformer()
#     record = {"name": "test", "value": 42}
#     schema = {"name": "string", "value": "integer"}
#     result = transformer.transform(record, schema)
#     assert result["name"] == "test"
#     assert result["value"] == 42
#
# def test_validate_rejects_invalid():
#     transformer = DefaultTransformer()
#     record = {"name": None, "value": "not_a_number"}
#     assert not transformer.validate(record)

# Integration Test (Adapter)
# import pytest
# from testcontainers.postgres import PostgresContainer
#
# @pytest.fixture
# def postgres_container():
#     with PostgresContainer("postgres:15") as pg:
#         # a fresh container is empty: create and seed a "users" table here
#         yield pg
#
# def test_database_adapter_reads(postgres_container):
#     adapter = DatabaseSourceAdapter(postgres_container.get_connection_url())
#     adapter.connect({})
#     records = list(adapter.read_records("users"))
#     assert len(records) > 0

# E2E Test (Full Pipeline)
# def test_full_sync():
#     source = MockSourceAdapter(records=[...])  # all records assumed valid
#     transformer = DefaultTransformer()
#     dest = MockDestinationAdapter()
#     schema = {"name": "string", "value": "integer"}
#     core = ETLCore(source, transformer, dest)
#     result = core.run_sync("users", schema)
#     assert result.written == result.read

@dataclass
class TestStrategy:
    test_type: str
    scope: str
    tools: str
    speed: str
    when: str

strategies = [
    TestStrategy("Unit Test", "Core logic only, mocked ports",
        "pytest + Mock/MagicMock", "< 1 sec", "Every commit"),
    TestStrategy("Contract Test", "Adapter implements Port correctly",
        "pytest + ABC enforcement", "< 5 sec", "Every commit"),
    TestStrategy("Integration Test", "Adapter + real external system",
        "pytest + Testcontainers", "30-60 sec", "Every PR"),
    TestStrategy("Acceptance Test (CAT)", "Full Airbyte connector standard",
        "Airbyte CAT framework", "5-10 min", "Before release"),
    TestStrategy("E2E Test", "Full pipeline source → destination",
        "pytest + Docker Compose", "5-15 min", "Before release"),
]

print("=== Testing Strategy ===")
for t in strategies:
    print(f"  [{t.test_type}] {t.scope}")
    print(f"    Tools: {t.tools}")
    print(f"    Speed: {t.speed} | When: {t.when}")
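The "Contract Test" row relies on ABC enforcement, which can be sketched as follows. This is one possible way to check that an adapter fulfils a port, not a prescribed Airbyte mechanism; `IncompleteAdapter`, `CompleteAdapter`, and `missing_port_methods` are hypothetical names introduced for the example.

```python
from abc import ABC, abstractmethod
from typing import Iterator, List


class ISourceReader(ABC):
    @abstractmethod
    def connect(self, config: dict) -> bool: ...
    @abstractmethod
    def read_records(self, stream: str) -> Iterator[dict]: ...
    @abstractmethod
    def get_streams(self) -> List[str]: ...


class IncompleteAdapter(ISourceReader):  # hypothetical: forgets get_streams()
    def connect(self, config: dict) -> bool:
        return True

    def read_records(self, stream: str) -> Iterator[dict]:
        yield from ()


class CompleteAdapter(IncompleteAdapter):  # hypothetical: fulfils the port
    def get_streams(self) -> List[str]:
        return ["users"]


def missing_port_methods(adapter_cls: type) -> List[str]:
    """List the abstract port methods the adapter class has not implemented."""
    return sorted(adapter_cls.__abstractmethods__)


def test_incomplete_adapter_is_rejected():
    assert missing_port_methods(IncompleteAdapter) == ["get_streams"]
    try:
        IncompleteAdapter()  # ABCs refuse to instantiate with abstract methods left
    except TypeError:
        pass
    else:
        raise AssertionError("expected TypeError")


def test_complete_adapter_satisfies_port():
    assert missing_port_methods(CompleteAdapter) == []
    assert isinstance(CompleteAdapter(), ISourceReader)


test_incomplete_adapter_is_rejected()
test_complete_adapter_satisfies_port()
```

A check like this only verifies that the methods exist. Whether they behave as the port promises still needs the integration tests.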

Deployment and Operations

# === Production Operations ===

from dataclasses import dataclass

@dataclass
class OperationTask:
    task: str
    tool: str
    frequency: str
    alert: str
    action: str

operations = [
    OperationTask("Sync Monitoring", "Airbyte UI + Prometheus",
        "Real-time", "Sync failed or duration > 2x average",
        "Check logs, fix connector, re-sync"),
    OperationTask("Data Quality", "Great Expectations + dbt tests",
        "After each sync", "Schema drift, null spikes, row count anomaly",
        "Alert data team, pause downstream"),
    OperationTask("Connector Updates", "Airbyte releases",
        "Monthly", "New version available with fixes",
        "Test in staging, update production"),
    OperationTask("Resource Monitoring", "Grafana + cAdvisor",
        "Real-time", "Memory > 80%, CPU > 90%",
        "Scale workers, optimize connector"),
    OperationTask("Cost Tracking", "Cloud billing + labels",
        "Weekly", "Cost > 150% of budget",
        "Review sync frequency, optimize queries"),
]

print("=== Operations ===")
for o in operations:
    print(f"  [{o.task}] Tool: {o.tool}")
    print(f"    Frequency: {o.frequency}")
    print(f"    Alert: {o.alert}")
    print(f"    Action: {o.action}")
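The "duration > 2x average" alert in the Sync Monitoring row can be expressed as a small check. This is a sketch under stated assumptions: the function name, the use of a plain arithmetic mean as the baseline, and the sample durations are all made up for illustration, and a real setup would more likely express this as a Prometheus alerting rule.

```python
from statistics import mean
from typing import List


def sync_duration_alert(history_sec: List[float], latest_sec: float,
                        factor: float = 2.0) -> bool:
    """Return True when the latest sync took more than `factor` times the mean."""
    if not history_sec:
        return False  # no baseline yet, so nothing to compare against
    return latest_sec > factor * mean(history_sec)


history = [120.0, 130.0, 110.0]  # made-up durations in seconds, mean = 120
print(sync_duration_alert(history, 250.0))  # True: 250 > 2 * 120
print(sync_duration_alert(history, 200.0))  # False: 200 <= 240
```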

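Tips

What is Hexagonal Architecture?

Hexagonal Architecture (Ports and Adapters) separates business logic from infrastructure. The core talks to the outside world only through interfaces (ports), and adapters implement those interfaces. The result is testable, flexible, and maintainable code whose core can be unit tested with mocks.

What is Airbyte?

Airbyte is an open-source ELT platform with 300+ connectors and a CDK for building custom connectors. It supports Full Refresh, Incremental, and CDC sync modes, runs on Docker or Kubernetes, is available as Cloud or self-hosted, and integrates with dbt.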
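The difference between Full Refresh and Incremental can be illustrated with a short sketch. This is a conceptual example only and does not use the Airbyte CDK API; the function names, the `updated_at` cursor field, and the sample rows are assumptions made for illustration.

```python
from typing import Any, Dict, Iterable, List, Optional, Tuple

Record = Dict[str, Any]


def full_refresh(rows: Iterable[Record]) -> List[Record]:
    """Full Refresh: return every row on every sync."""
    return list(rows)


def incremental(rows: Iterable[Record], cursor_field: str,
                state: Optional[str]) -> Tuple[List[Record], Optional[str]]:
    """Incremental: return only rows past the saved cursor, plus the new cursor."""
    new_rows = [r for r in rows if state is None or r[cursor_field] > state]
    new_state = max((r[cursor_field] for r in new_rows), default=state)
    return new_rows, new_state


rows = [
    {"id": 1, "updated_at": "2025-01-01"},
    {"id": 2, "updated_at": "2025-01-05"},
]
first, state = incremental(rows, "updated_at", None)    # first sync reads both rows
rows.append({"id": 3, "updated_at": "2025-01-09"})
second, state = incremental(rows, "updated_at", state)  # second sync reads only id 3

print([r["id"] for r in first], [r["id"] for r in second], state)
# [1, 2] [3] 2025-01-09
```

The sketch compares ISO-formatted date strings, which sort correctly as text. A real connector would also need to persist `state` between runs.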

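How do you combine Airbyte with Hexagonal Architecture?

Build the custom connector so that the core logic is separate from the adapters (API client, database). Define ports such as ISourceReader and IDestinationWriter, test the core against mocks, and swap adapters without changing the core.

How do you approach the testing strategy?

Unit test the core with mocks. Run integration tests on the adapters with Testcontainers, and contract tests to confirm each adapter implements its interface. Run CAT acceptance tests and E2E tests on the full pipeline, and wire all of this into CI/CD so it runs on every PR.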
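As a sketch of unit testing the core with mocks, the example below drives a trimmed-down core with `unittest.mock.MagicMock` objects in place of the three ports. The `ETLCore` here is a simplified stand-in that returns only the written count, and the validation and transformation rules are made up for the test.

```python
from unittest.mock import MagicMock


class ETLCore:
    """Simplified core: validate, transform, then write. Knows nothing about I/O."""

    def __init__(self, reader, transformer, writer):
        self.reader = reader
        self.transformer = transformer
        self.writer = writer

    def run_sync(self, stream: str, schema: dict) -> int:
        valid = [
            self.transformer.transform(record, schema)
            for record in self.reader.read_records(stream)
            if self.transformer.validate(record)
        ]
        return self.writer.write_records(valid, stream)


# Mocks stand in for the ports, so no network or database is involved.
reader = MagicMock()
reader.read_records.return_value = iter([{"name": "a"}, {"name": None}])

transformer = MagicMock()
transformer.validate.side_effect = lambda r: r["name"] is not None
transformer.transform.side_effect = lambda r, schema: {**r, "name": r["name"].upper()}

writer = MagicMock()
writer.write_records.return_value = 1

written = ETLCore(reader, transformer, writer).run_sync("users", {"name": "string"})

# The invalid record is dropped and the valid one is transformed before writing.
writer.write_records.assert_called_once_with([{"name": "A"}], "users")
reader.read_records.assert_called_once_with("users")
```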

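Summary

Applying Hexagonal Architecture (Ports and Adapters) to Airbyte ETL means building connectors on the CDK with a pure core, covering them with unit, integration, and E2E tests, and deploying them to production on Kubernetes.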
