Airbyte ETL Hexagonal Architecture: Designing a Flexible Data Pipeline

Airbyte Hexagonal Architecture

This article applies Hexagonal Architecture (Ports and Adapters) to Airbyte ETL connectors. It covers connector design, testing, deployment, and operating the data pipeline in production.

| Layer | Role | Contains | Depends On | Tested With |
|---|---|---|---|---|
| Core (Domain) | Business Logic | Validation, Transform, Rules | Nothing (pure) | Unit Test + Mock |
| Ports (Interface) | Define contracts | ISourceReader, IDestinationWriter | Core | Contract Test |
| Adapters (Infra) | External connections | API Client, DB Client | Ports | Integration Test |
| Airbyte CDK | Connector framework | Source, Destination base | Adapters | CAT (Acceptance) |

Connector Design

# === Hexagonal Connector Design ===
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Iterator, List

import httpx


# Port (Interface)
class ISourceReader(ABC):
    @abstractmethod
    def connect(self, config: dict) -> bool: ...

    @abstractmethod
    def read_records(self, stream: str) -> Iterator[dict]: ...

    @abstractmethod
    def get_streams(self) -> List[str]: ...


class ITransformer(ABC):
    @abstractmethod
    def transform(self, record: dict, schema: dict) -> dict: ...

    @abstractmethod
    def validate(self, record: dict) -> bool: ...


class IDestinationWriter(ABC):
    @abstractmethod
    def write_records(self, records: List[dict], stream: str) -> int: ...

    @abstractmethod
    def close(self) -> None: ...

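# Adapter (Implementation)
class RestApiSourceAdapter(ISourceReader):
    def __init__(self, base_url: str, auth_token: str):
        self.client = httpx.Client(base_url=base_url,
                                   headers={"Authorization": f"Bearer {auth_token}"})

    def connect(self, config):
        # A 200 from the health endpoint counts as a successful connection
        resp = self.client.get("/health")
        return resp.status_code == 200

    def read_records(self, stream):
        # Request pages until the API returns an empty "items" list
        page = 1
        while True:
            resp = self.client.get(f"/{stream}", params={"page": page})
            resp.raise_for_status()  # fail loudly instead of parsing an error body
            data = resp.json()
            if not data["items"]:
                break
            yield from data["items"]
            page += 1

    def get_streams(self):
        # Not shown in the original; needed because ISourceReader declares it
        # abstract, otherwise the adapter cannot be instantiated.
        raise NotImplementedError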

# Core (Business Logic)
@dataclass
class SyncResult:
    # Not defined in the original; assumed to hold the two counters run_sync returns
    read: int
    written: int


class ETLCore:
    def __init__(self, reader: ISourceReader,
                 transformer: ITransformer,
                 writer: IDestinationWriter):
        # The core depends only on the ports, never on a concrete adapter
        self.reader = reader
        self.transformer = transformer
        self.writer = writer

    def run_sync(self, stream: str, schema: dict) -> SyncResult:
        records = []
        for record in self.reader.read_records(stream):
            # Records that fail validation are skipped
            if self.transformer.validate(record):
                transformed = self.transformer.transform(record, schema)
                records.append(transformed)
        written = self.writer.write_records(records, stream)
        # Note: "read" counts only the records that passed validation
        return SyncResult(read=len(records), written=written)

@dataclass
class ConnectorComponent:
    component: str
    layer: str
    responsibility: str
    example: str


components = [
    ConnectorComponent("ISourceReader", "Port",
                       "Define how to read from any source",
                       "connect(), read_records(), get_streams()"),
    ConnectorComponent("RestApiSourceAdapter", "Adapter",
                       "Implement REST API reading with pagination",
                       "HTTP client, auth, pagination logic"),
    ConnectorComponent("DatabaseSourceAdapter", "Adapter",
                       "Implement database reading with cursor",
                       "SQL query, cursor-based incremental"),
    ConnectorComponent("ETLCore", "Core",
                       "Orchestrate read → transform → write",
                       "Validation, transformation rules, sync logic"),
    ConnectorComponent("IDestinationWriter", "Port",
                       "Define how to write to any destination",
                       "write_records(), close(), create_table()"),
    ConnectorComponent("BigQueryWriterAdapter", "Adapter",
                       "Implement BigQuery writing",
                       "BigQuery client, batch insert, schema mapping"),
]

print("=== Connector Components ===")
for c in components:
    print(f"  [{c.layer}] {c.component}")
    print(f"    Role: {c.responsibility}")
    print(f"    Example: {c.example}")
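The tests in the next section call a `DefaultTransformer` that the article never defines. The following is a minimal sketch of what such a core-layer transformer might look like, assuming the schema is a flat mapping of field name to type name (as in `{"name": "string", "value": "integer"}`). The validation rule and the `_CASTERS` table are illustrative assumptions, not the author's implementation.

```python
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict


class ITransformer(ABC):
    """Port with the same two methods as the ITransformer interface above."""

    @abstractmethod
    def transform(self, record: dict, schema: dict) -> dict: ...

    @abstractmethod
    def validate(self, record: dict) -> bool: ...


# Assumption: schema values are one of these type names.
# An unknown type name raises KeyError in transform().
_CASTERS: Dict[str, Callable[[Any], Any]] = {
    "string": str,
    "integer": int,
    "number": float,
}


class DefaultTransformer(ITransformer):
    """Hypothetical transformer: pure logic with no I/O, so tests need no mocks."""

    def validate(self, record: dict) -> bool:
        # Illustrative rule: reject empty records and records containing None.
        return bool(record) and all(v is not None for v in record.values())

    def transform(self, record: dict, schema: dict) -> dict:
        # Keep only fields named in the schema and cast each to its declared type.
        out = {}
        for field, type_name in schema.items():
            if field in record:
                out[field] = _CASTERS[type_name](record[field])
        return out


transformer = DefaultTransformer()
schema = {"name": "string", "value": "integer"}
print(transformer.transform({"name": "test", "value": "42", "extra": 1}, schema))
# → {'name': 'test', 'value': 42}
print(transformer.validate({"name": None, "value": "not_a_number"}))
# → False
```

Because the class touches no network or database, it sits in the Core layer and can be unit tested directly.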

Testing Strategy

# === Testing Pyramid ===
import pytest
from testcontainers.postgres import PostgresContainer


# Unit Test (Core Logic)
def test_transform_valid_record():
    transformer = DefaultTransformer()
    record = {"name": "test", "value": 42}
    schema = {"name": "string", "value": "integer"}
    result = transformer.transform(record, schema)
    assert result["name"] == "test"
    assert result["value"] == 42


def test_validate_rejects_invalid():
    transformer = DefaultTransformer()
    record = {"name": None, "value": "not_a_number"}
    assert not transformer.validate(record)


# Integration Test (Adapter)
@pytest.fixture
def postgres_container():
    # Start a throwaway PostgreSQL container for the duration of the test
    with PostgresContainer("postgres:15") as pg:
        yield pg


def test_database_adapter_reads(postgres_container):
    # Assumes the container has been seeded with a populated "users" table
    adapter = DatabaseSourceAdapter(postgres_container.get_connection_url())
    adapter.connect({})
    records = list(adapter.read_records("users"))
    assert len(records) > 0


# E2E Test (Full Pipeline)
def test_full_sync():
    source = MockSourceAdapter(records=[...])
    transformer = DefaultTransformer()
    dest = MockDestinationAdapter()
    schema = {"name": "string", "value": "integer"}  # same schema as the unit test above
    core = ETLCore(source, transformer, dest)
    result = core.run_sync("users", schema)
    assert result.written == result.read

from dataclasses import dataclass


@dataclass
class TestStrategy:
    test_type: str
    scope: str
    tools: str
    speed: str
    when: str


strategies = [
    TestStrategy("Unit Test", "Core logic only, mocked ports",
                 "pytest + Mock/MagicMock", "< 1 sec", "Every commit"),
    TestStrategy("Contract Test", "Adapter implements Port correctly",
                 "pytest + ABC enforcement", "< 5 sec", "Every commit"),
    TestStrategy("Integration Test", "Adapter + real external system",
                 "pytest + Testcontainers", "30-60 sec", "Every PR"),
    TestStrategy("Acceptance Test (CAT)", "Full Airbyte connector standard",
                 "Airbyte CAT framework", "5-10 min", "Before release"),
    TestStrategy("E2E Test", "Full pipeline source → destination",
                 "pytest + Docker Compose", "5-15 min", "Before release"),
]

print("=== Testing Strategy ===")
for t in strategies:
    print(f"  [{t.test_type}] {t.scope}")
    print(f"    Tools: {t.tools}")
    print(f"    Speed: {t.speed} | When: {t.when}")
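The E2E test earlier in this section uses `MockSourceAdapter` and `MockDestinationAdapter` without showing them. Below is a rough sketch of how such in-memory adapters could be written against the ports. The class bodies and the `sync` helper (a stand-in for `ETLCore.run_sync` without the transformer step) are assumptions made for illustration.

```python
from abc import ABC, abstractmethod
from typing import Dict, Iterator, List


class ISourceReader(ABC):
    @abstractmethod
    def connect(self, config: dict) -> bool: ...

    @abstractmethod
    def read_records(self, stream: str) -> Iterator[dict]: ...

    @abstractmethod
    def get_streams(self) -> List[str]: ...


class IDestinationWriter(ABC):
    @abstractmethod
    def write_records(self, records: List[dict], stream: str) -> int: ...

    @abstractmethod
    def close(self) -> None: ...


class MockSourceAdapter(ISourceReader):
    """Hypothetical in-memory source: serves the same records for every stream."""

    def __init__(self, records: List[dict]):
        self._records = records

    def connect(self, config: dict) -> bool:
        return True

    def read_records(self, stream: str) -> Iterator[dict]:
        yield from self._records

    def get_streams(self) -> List[str]:
        return ["users"]


class MockDestinationAdapter(IDestinationWriter):
    """Hypothetical in-memory destination: remembers everything written to it."""

    def __init__(self) -> None:
        self.written: Dict[str, List[dict]] = {}
        self.closed = False

    def write_records(self, records: List[dict], stream: str) -> int:
        self.written.setdefault(stream, []).extend(records)
        return len(records)

    def close(self) -> None:
        self.closed = True


def sync(reader: ISourceReader, writer: IDestinationWriter, stream: str) -> int:
    """Simplified core: drop records without a name, write the rest, close."""
    records = [r for r in reader.read_records(stream) if r.get("name") is not None]
    written = writer.write_records(records, stream)
    writer.close()
    return written


source = MockSourceAdapter([{"name": "a"}, {"name": None}, {"name": "b"}])
dest = MockDestinationAdapter()
count = sync(source, dest, "users")
print(count, dest.written["users"])
# → 2 [{'name': 'a'}, {'name': 'b'}]
```

Since `sync` only sees the two interfaces, the same function would run unchanged against a real REST or BigQuery adapter.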

Deployment and Operations

# === Production Operations ===
from dataclasses import dataclass


@dataclass
class OperationTask:
    task: str
    tool: str
    frequency: str
    alert: str
    action: str


operations = [
    OperationTask("Sync Monitoring", "Airbyte UI + Prometheus",
                  "Real-time", "Sync failed or duration > 2x average",
                  "Check logs, fix connector, re-sync"),
    OperationTask("Data Quality", "Great Expectations + dbt tests",
                  "After each sync", "Schema drift, null spikes, row count anomaly",
                  "Alert data team, pause downstream"),
    OperationTask("Connector Updates", "Airbyte releases",
                  "Monthly", "New version available with fixes",
                  "Test in staging, update production"),
    OperationTask("Resource Monitoring", "Grafana + cAdvisor",
                  "Real-time", "Memory > 80%, CPU > 90%",
                  "Scale workers, optimize connector"),
    OperationTask("Cost Tracking", "Cloud billing + labels",
                  "Weekly", "Cost > 150% of budget",
                  "Review sync frequency, optimize queries"),
]

print("=== Operations ===")
for o in operations:
    print(f"  [{o.task}] Tool: {o.tool}")
    print(f"    Frequency: {o.frequency}")
    print(f"    Alert: {o.alert}")
    print(f"    Action: {o.action}")
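The alert conditions listed above ("duration > 2x average", "Memory > 80%, CPU > 90%") are simple threshold checks. The sketch below shows one way they might be evaluated in plain Python. The function names and the idea of passing in a list of past durations are assumptions; in practice these rules would more likely live in Prometheus or Grafana alert definitions.

```python
from statistics import mean
from typing import List, Sequence


def sync_is_slow(history_sec: Sequence[float], latest_sec: float,
                 factor: float = 2.0) -> bool:
    """True when the latest sync took longer than factor x the historical mean."""
    if not history_sec:
        return False  # no baseline yet, so there is nothing to compare against
    return latest_sec > factor * mean(history_sec)


def resource_alerts(memory_pct: float, cpu_pct: float) -> List[str]:
    """Return the names of resources above the thresholds from the table."""
    alerts = []
    if memory_pct > 80:
        alerts.append("memory")
    if cpu_pct > 90:
        alerts.append("cpu")
    return alerts


history = [120.0, 110.0, 130.0]          # hypothetical past sync durations (mean 120 s)
print(sync_is_slow(history, 250.0))      # → True  (250 > 240)
print(sync_is_slow(history, 200.0))      # → False (200 <= 240)
print(resource_alerts(85.0, 50.0))       # → ['memory']
```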

Tips

  • Ports: Design each Port as an interface first, then implement the Adapter.
  • Test: Core logic should have 100% unit test coverage with no dependency on external systems.
  • CDK: Use the Airbyte CDK to build custom connectors that follow the Airbyte standard.
  • Incremental: Use Incremental Sync whenever the source supports it, to reduce cost and sync duration.
  • Monitor: Set up alerts for sync failures, data quality issues, and cost spikes.
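The Incremental tip can be illustrated with cursor-based filtering: each sync emits only rows newer than a saved cursor value, then advances the cursor. This is a minimal sketch in plain Python and does not use the Airbyte CDK's state API. The `updated_at` field, the `sync_once` helper, and the in-memory table are assumptions for the example.

```python
from typing import Iterable, Iterator, List, Optional, Tuple


def read_incremental(rows: Iterable[dict], cursor_field: str,
                     state: Optional[str]) -> Iterator[dict]:
    """Yield only rows whose cursor value is newer than the saved state."""
    for row in rows:
        if state is None or row[cursor_field] > state:
            yield row


def sync_once(rows: Iterable[dict], cursor_field: str,
              state: Optional[str]) -> Tuple[List[dict], Optional[str]]:
    """Run one sync and return the emitted batch plus the state to save."""
    batch = list(read_incremental(rows, cursor_field, state))
    # Advance the cursor to the newest value seen; keep the old state if nothing is new.
    new_state = max((r[cursor_field] for r in batch), default=state)
    return batch, new_state


# Hypothetical source table; ISO dates compare correctly as plain strings.
table = [
    {"id": 1, "updated_at": "2024-01-01"},
    {"id": 2, "updated_at": "2024-01-02"},
]

first, state = sync_once(table, "updated_at", None)     # first run reads everything
table.append({"id": 3, "updated_at": "2024-01-03"})
second, state = sync_once(table, "updated_at", state)   # only the new row
third, state = sync_once(table, "updated_at", state)    # nothing new

print(len(first), len(second), len(third), state)
# → 2 1 0 2024-01-03
```

The second and third runs touch far fewer rows than a full refresh would, which is where the cost and duration savings come from.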

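What is Hexagonal Architecture?

Hexagonal Architecture, also known as Ports and Adapters, separates business logic from infrastructure. The core talks to the outside world only through interfaces (ports), and adapters implement those interfaces. The result is testable, flexible, and maintainable: the core can be unit tested with mocks.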

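What is Airbyte?

Airbyte is an open-source ELT platform with 300+ connectors and a CDK for building custom connectors. It supports Full Refresh, Incremental, and CDC sync modes, runs on Docker or Kubernetes, is available as a cloud service or self-hosted, and integrates with dbt.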

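How do you combine Airbyte with Hexagonal Architecture?

Build the custom connector so that the core logic depends only on port interfaces such as ISourceReader and IDestinationWriter, while adapters wrap the API client or database. The core can then be tested with mocks, and swapping an adapter requires no change to the core.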

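How should the testing strategy work?

Unit test the core with mocks, integration test the adapters with Testcontainers, and contract test that each adapter honors its port interface. Add Airbyte CAT acceptance tests and full-pipeline E2E tests, and run the suite in CI/CD on every PR.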
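A contract test checks that an adapter honors its port. One possible shape, sketched below, combines a reusable set of assertions with Python's ABC enforcement, which refuses to instantiate a class that leaves an abstract method unimplemented. `ListWriter`, `IncompleteWriter`, and the specific contract rules are hypothetical and only illustrate the idea.

```python
from abc import ABC, abstractmethod
from typing import List


class IDestinationWriter(ABC):
    @abstractmethod
    def write_records(self, records: List[dict], stream: str) -> int: ...

    @abstractmethod
    def close(self) -> None: ...


class ListWriter(IDestinationWriter):
    """Hypothetical complete adapter used as the subject of the contract."""

    def __init__(self) -> None:
        self.rows: List[dict] = []

    def write_records(self, records: List[dict], stream: str) -> int:
        self.rows.extend(records)
        return len(records)

    def close(self) -> None:
        pass


class IncompleteWriter(IDestinationWriter):
    """Hypothetical broken adapter: close() is missing."""

    def write_records(self, records: List[dict], stream: str) -> int:
        return 0


def check_writer_contract(writer: IDestinationWriter) -> None:
    """Assertions that every IDestinationWriter adapter is expected to satisfy."""
    batch = [{"id": 1}, {"id": 2}]
    assert writer.write_records(batch, "users") == len(batch)
    assert writer.write_records([], "users") == 0
    assert writer.close() is None


def can_instantiate(cls) -> bool:
    """ABC enforcement: unimplemented abstract methods raise TypeError."""
    try:
        cls()
    except TypeError:
        return False
    return True


check_writer_contract(ListWriter())
print(can_instantiate(ListWriter), can_instantiate(IncompleteWriter))
# → True False
```

In a pytest suite, `check_writer_contract` could be parametrized over every writer adapter so that a new adapter is held to the same rules automatically.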

Summary

Applying Hexagonal Architecture to Airbyte ETL means building connectors with the CDK around ports and adapters, testing at the unit, integration, and E2E levels, and deploying to production on Kubernetes.

XM Legend · Forex trader and instructor for 13 years

Founder of SiamCafe since 1997 · Forex trader for more than 13 years, recognized as an XM Legend · Sharing knowledge on Forex, IT, AI, and trading from real experience in real markets