Hexagonal + Ceph
Ceph Storage Cluster Hexagonal Architecture Ports Adapters Pattern Domain Business Logic Storage Port Ceph Adapter S3 Local Testing Mock Production
| Layer | Component | Responsibility | Dependencies | Testability |
|---|---|---|---|---|
| Domain | Business Logic | Storage Policy Validation | None (pure) | Unit Test easy |
| Port | StoragePort | Interface Contract | None (abstract) | Contract Test |
| Adapter | CephAdapter | Ceph RADOS/S3 access | librados/boto3 | Integration Test |
| Adapter | S3Adapter | AWS S3 access | boto3 | Integration Test |
| Adapter | LocalAdapter | Local filesystem | os module | Unit Test |
| Adapter | InMemoryAdapter | Testing only | dict | Unit Test fast |
Domain and Ports
# === Hexagonal Architecture — Storage Domain ===
# from abc import ABC, abstractmethod
# from dataclasses import dataclass
# from typing import Optional, List
# from datetime import datetime
#
# # --- Value Objects ---
# @dataclass(frozen=True)
# class StorageObject:
#     key: str
#     data: bytes
#     content_type: str
#     size: int
#     checksum: str
#     created_at: datetime
#
# @dataclass(frozen=True)
# class StorageMetadata:
#     key: str
#     size: int
#     content_type: str
#     last_modified: datetime
#
# # --- Port (Interface) ---
# class StoragePort(ABC):
#     @abstractmethod
#     def store(self, key: str, data: bytes, content_type: str) -> StorageObject:
#         pass
#
#     @abstractmethod
#     def retrieve(self, key: str) -> Optional[StorageObject]:
#         pass
#
#     @abstractmethod
#     def delete(self, key: str) -> bool:
#         pass
#
#     @abstractmethod
#     def list_objects(self, prefix: str = "") -> List[StorageMetadata]:
#         pass
#
#     @abstractmethod
#     def exists(self, key: str) -> bool:
#         pass
#
# # --- Domain Service ---
# class StorageService:
#     def __init__(self, storage: StoragePort, max_size: int = 100_000_000):
#         self._storage = storage
#         self._max_size = max_size
#
#     def upload(self, key: str, data: bytes, content_type: str) -> StorageObject:
#         if len(data) > self._max_size:
#             raise ValueError(f"File too large: {len(data)} > {self._max_size}")
#         if not key or "/" not in key:
#             raise ValueError("Key must include bucket prefix: bucket/filename")
#         return self._storage.store(key, data, content_type)
#
#     def download(self, key: str) -> Optional[StorageObject]:
#         return self._storage.retrieve(key)
#
#     def remove(self, key: str) -> bool:
#         if not self._storage.exists(key):
#             raise FileNotFoundError(f"Object not found: {key}")
#         return self._storage.delete(key)
from dataclasses import dataclass
@dataclass
class PortMethod:
    """One row of the StoragePort method summary printed by this script.

    Every field is a plain display string; nothing here is executed or
    type-checked against a real interface.
    """

    method: str  # method name, e.g. "store"
    params: str  # comma-separated parameter names as shown in the signature
    returns: str  # return type rendered as text
    description: str  # one-line summary of what the method does
# Catalogue of the methods shown under the "StoragePort Interface" heading.
# NOTE(review): "get_metadata" is listed here, but the commented StoragePort
# sketch earlier in this file declares only store/retrieve/delete/
# list_objects/exists — confirm which of the two is intended.
methods = [
    PortMethod("store", "key, data, content_type", "StorageObject", "Upload object to storage"),
    PortMethod("retrieve", "key", "Optional[StorageObject]", "Download object by key"),
    PortMethod("delete", "key", "bool", "Remove object from storage"),
    PortMethod("list_objects", "prefix", "List[StorageMetadata]", "List objects by prefix"),
    PortMethod("exists", "key", "bool", "Check if object exists"),
    PortMethod("get_metadata", "key", "StorageMetadata", "Get object metadata only"),
]

# Print each method as a pseudo-signature followed by its description.
print("=== StoragePort Interface ===")
for m in methods:
    print(f" {m.method}({m.params}) -> {m.returns}")
    print(f" Description: {m.description}")
Adapters
# === Storage Adapters ===
# Ceph RADOS Adapter
# import rados
# class CephRadosAdapter(StoragePort):
#     def __init__(self, conf_file="/etc/ceph/ceph.conf", pool="data"):
#         self.cluster = rados.Rados(conffile=conf_file)
#         self.cluster.connect()
#         self.ioctx = self.cluster.open_ioctx(pool)
#
#     def store(self, key, data, content_type):
#         self.ioctx.write_full(key, data)
#         self.ioctx.set_xattr(key, "content-type", content_type.encode())
#         return StorageObject(key=key, data=data, ...)
#
#     def retrieve(self, key):
#         try:
#             size, _ = self.ioctx.stat(key)
#             data = self.ioctx.read(key, size)
#             ct = self.ioctx.get_xattr(key, "content-type").decode()
#             return StorageObject(key=key, data=data, content_type=ct, ...)
#         except rados.ObjectNotFound:
#             return None
# S3 Adapter (Ceph RGW or AWS S3)
# import boto3
# class S3Adapter(StoragePort):
#     def __init__(self, endpoint, access_key, secret_key, bucket):
#         self.s3 = boto3.client("s3",
#                                endpoint_url=endpoint,
#                                aws_access_key_id=access_key,
#                                aws_secret_access_key=secret_key)
#         self.bucket = bucket
#
#     def store(self, key, data, content_type):
#         self.s3.put_object(Bucket=self.bucket, Key=key,
#                            Body=data, ContentType=content_type)
#         return StorageObject(...)
# In-Memory Adapter (for testing)
# class InMemoryAdapter(StoragePort):
#     def __init__(self):
#         self._store = {}
#
#     def store(self, key, data, content_type):
#         obj = StorageObject(key=key, data=data, content_type=content_type, ...)
#         self._store[key] = obj
#         return obj
#
#     def retrieve(self, key):
#         return self._store.get(key)
@dataclass
class AdapterComparison:
    """One row of the adapter comparison printed by this script.

    Every field is free-form descriptive text used only for display.
    """

    adapter: str  # adapter class name as shown in the listing
    backend: str  # storage backend the adapter talks to
    performance: str  # qualitative performance label
    use_case: str  # where the adapter is meant to be used
    config: str  # configuration the adapter needs, described in words
# Comparison table of the storage adapters discussed in this article.
adapters = [
    AdapterComparison("CephRadosAdapter", "Ceph RADOS", "Very high (native)", "Production high-perf", "ceph.conf + pool name"),
    AdapterComparison("CephS3Adapter", "Ceph RGW (S3)", "High", "Production S3-compatible", "endpoint + keys + bucket"),
    AdapterComparison("AWSS3Adapter", "AWS S3", "High (network)", "Cloud production", "region + keys + bucket"),
    AdapterComparison("LocalAdapter", "Local filesystem", "Fast (disk)", "Development", "base_path directory"),
    AdapterComparison("InMemoryAdapter", "Python dict", "Very fast", "Unit testing", "None"),
    AdapterComparison("MinioAdapter", "MinIO S3", "High", "Self-hosted S3", "endpoint + keys + bucket"),
]

# Print three lines per adapter: name/backend, performance/use case, config.
print("\n=== Adapter Comparison ===")
for a in adapters:
    print(f" [{a.adapter}] Backend: {a.backend}")
    print(f" Perf: {a.performance} | Use: {a.use_case}")
    print(f" Config: {a.config}")
Testing Strategy
# === Testing Hexagonal Storage ===
# Unit Test — Domain with Mock
# def test_upload_validates_size():
#     storage = InMemoryAdapter()
#     service = StorageService(storage, max_size=1000)
#     with pytest.raises(ValueError):
#         service.upload("bucket/file.txt", b"x" * 1001, "text/plain")
#
# def test_upload_validates_key():
#     storage = InMemoryAdapter()
#     service = StorageService(storage)
#     with pytest.raises(ValueError):
#         service.upload("no-bucket", b"data", "text/plain")
#
# def test_upload_stores_object():
#     storage = InMemoryAdapter()
#     service = StorageService(storage)
#     result = service.upload("bucket/test.txt", b"hello", "text/plain")
#     assert result.key == "bucket/test.txt"
#     assert storage.exists("bucket/test.txt")
# Integration Test — Real Ceph
# @pytest.fixture
# def ceph_adapter():
#     adapter = CephS3Adapter(
#         endpoint="http://ceph-rgw:7480",
#         access_key="test", secret_key="test", bucket="test-bucket")
#     yield adapter
#     # Cleanup
#
# def test_ceph_store_retrieve(ceph_adapter):
#     ceph_adapter.store("test/file.txt", b"hello", "text/plain")
#     obj = ceph_adapter.retrieve("test/file.txt")
#     assert obj.data == b"hello"
@dataclass
class TestType:
    """One row of the testing-strategy summary printed by this script.

    Describes a category of test, not an executable test case.
    """

    # The class name starts with "Test", so pytest would treat it as a
    # collection candidate wherever it is visible in a test module and warn
    # about its generated __init__. Opt out explicitly. The attribute is
    # unannotated, so @dataclass does not turn it into a field.
    __test__ = False

    test_type: str  # label of the test category
    adapter_used: str  # adapter the category runs against
    speed: str  # rough duration, as display text
    coverage: str  # what the category exercises
    ci_friendly: bool  # True when it can run without extra infrastructure
# Summary of the test categories discussed in this article.
test_types = [
    TestType("Unit Test (Domain)", "InMemoryAdapter", "< 1ms", "Business logic", True),
    TestType("Unit Test (Adapter)", "InMemoryAdapter", "< 1ms", "Adapter logic", True),
    TestType("Contract Test", "All Adapters", "Varies", "Port compliance", True),
    TestType("Integration Test", "CephS3Adapter", "100-500ms", "Real Ceph access", False),
    TestType("E2E Test", "Production Adapter", "1-5s", "Full flow", False),
    TestType("Performance Test", "CephRadosAdapter", "Minutes", "Throughput latency", False),
]

# Print two lines per category; the boolean flag becomes a readable label.
print("Testing Strategy:")
for t in test_types:
    ci = "CI-friendly" if t.ci_friendly else "Requires infra"
    print(f" [{t.test_type}] Adapter: {t.adapter_used}")
    print(f" Speed: {t.speed} | Coverage: {t.coverage} | {ci}")
เคล็ดลับ
- Port First: ออกแบบ Port ก่อน แล้วค่อยสร้าง Adapter
- InMemory: ใช้ InMemoryAdapter สำหรับ Unit Test เร็วมาก
- Config: เปลี่ยน Adapter ผ่าน Config ไม่แก้ Code
- Contract: ทดสอบ Contract ทุก Adapter ต้อง Pass เหมือนกัน
- Domain Pure: Domain ไม่ Import External Library เด็ดขาด
Hexagonal Architecture คืออะไร
Ports Adapters Pattern แยก Domain จาก External Port Interface Adapter Implementation เปลี่ยน Adapter ไม่กระทบ Domain ทดสอบง่าย Mock
ใช้กับ Ceph Storage อย่างไร
StoragePort Interface store retrieve delete CephAdapter librados S3 API LocalAdapter Development InMemoryAdapter Testing Config เปลี่ยน Adapter
Port กับ Adapter คืออะไร
Port Interface Abstract Method Domain ต้องการ Adapter Implement Port เชื่อม External CephAdapter S3Adapter Primary Input Secondary Output
ทดสอบ Hexagonal Architecture อย่างไร
Unit Test Domain Mock Adapter Integration Test Ceph Cluster E2E Full Flow Contract Test Port Interface InMemoryAdapter CI/CD Testcontainers
สรุป
Ceph Storage Cluster Hexagonal Architecture Ports Adapters Domain StoragePort CephAdapter S3Adapter InMemoryAdapter Testing Contract Integration Production
