Hexagonal + Ceph

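Applying Hexagonal Architecture (the Ports and Adapters pattern) to a Ceph storage cluster: the domain's business logic depends only on a Storage Port, while Ceph, S3, and Local adapters implement that port — mock/in-memory adapters for testing, real backends in production.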

| Layer | Component | Responsibility | Dependencies | Testability |
|---|---|---|---|---|
| Domain | Business Logic | Storage Policy Validation | None (pure) | Unit Test (easy) |
| Port | StoragePort | Interface Contract | None (abstract) | Contract Test |
| Adapter | CephAdapter | Ceph RADOS/S3 access | librados/boto3 | Integration Test |
| Adapter | S3Adapter | AWS S3 access | boto3 | Integration Test |
| Adapter | LocalAdapter | Local filesystem | os module | Unit Test |
| Adapter | InMemoryAdapter | Testing only | dict | Unit Test (fast) |

Domain and Ports

# === Hexagonal Architecture — Storage Domain ===

# from abc import ABC, abstractmethod
# from dataclasses import dataclass
# from typing import Optional, List
# from datetime import datetime
#
# # --- Value Objects ---
# @dataclass(frozen=True)
# class StorageObject:
#     key: str
#     data: bytes
#     content_type: str
#     size: int
#     checksum: str
#     created_at: datetime
#
# @dataclass(frozen=True)
# class StorageMetadata:
#     key: str
#     size: int
#     content_type: str
#     last_modified: datetime
#
# # --- Port (Interface) ---
# class StoragePort(ABC):
#     @abstractmethod
#     def store(self, key: str, data: bytes, content_type: str) -> StorageObject:
#         pass
#
#     @abstractmethod
#     def retrieve(self, key: str) -> Optional[StorageObject]:
#         pass
#
#     @abstractmethod
#     def delete(self, key: str) -> bool:
#         pass
#
#     @abstractmethod
#     def list_objects(self, prefix: str = "") -> List[StorageMetadata]:
#         pass
#
#     @abstractmethod
#     def exists(self, key: str) -> bool:
#         pass
#
#     @abstractmethod
#     def get_metadata(self, key: str) -> StorageMetadata:
#         pass
#
# # --- Domain Service ---
# class StorageService:
#     def __init__(self, storage: StoragePort, max_size: int = 100_000_000):
#         self._storage = storage
#         self._max_size = max_size
#
#     def upload(self, key: str, data: bytes, content_type: str) -> StorageObject:
#         if len(data) > self._max_size:
#             raise ValueError(f"File too large: {len(data)} > {self._max_size}")
#         if not key or "/" not in key:
#             raise ValueError("Key must include bucket prefix: bucket/filename")
#         return self._storage.store(key, data, content_type)
#
#     def download(self, key: str) -> Optional[StorageObject]:
#         return self._storage.retrieve(key)
#
#     def remove(self, key: str) -> bool:
#         if not self._storage.exists(key):
#             raise FileNotFoundError(f"Object not found: {key}")
#         return self._storage.delete(key)

from dataclasses import dataclass

@dataclass
class PortMethod:
    """Describe one operation of the StoragePort interface for display.

    Every field holds plain display text; nothing is introspected from the
    real port class.
    """

    method: str  # method name, e.g. "store"
    params: str  # comma-separated parameter names as written in the signature
    returns: str  # return type rendered as text
    description: str  # one-line human-readable summary of the operation

# Operations exposed by StoragePort, in the order they are reported below.
methods = [
    PortMethod(method="store", params="key, data, content_type",
               returns="StorageObject", description="Upload object to storage"),
    PortMethod(method="retrieve", params="key",
               returns="Optional[StorageObject]", description="Download object by key"),
    PortMethod(method="delete", params="key",
               returns="bool", description="Remove object from storage"),
    PortMethod(method="list_objects", params="prefix",
               returns="List[StorageMetadata]", description="List objects by prefix"),
    PortMethod(method="exists", params="key",
               returns="bool", description="Check if object exists"),
    PortMethod(method="get_metadata", params="key",
               returns="StorageMetadata", description="Get object metadata only"),
]

# Render each operation as an indented signature line plus a description line.
print("=== StoragePort Interface ===")
for entry in methods:
    signature = f"{entry.method}({entry.params}) -> {entry.returns}"
    print("  " + signature)
    print("    Description: " + entry.description)

Adapters

# === Storage Adapters ===

# Ceph RADOS Adapter
# import rados
# class CephRadosAdapter(StoragePort):
#     def __init__(self, conf_file="/etc/ceph/ceph.conf", pool="data"):
#         self.cluster = rados.Rados(conffile=conf_file)
#         self.cluster.connect()
#         self.ioctx = self.cluster.open_ioctx(pool)
#
#     def store(self, key, data, content_type):
#         self.ioctx.write_full(key, data)
#         self.ioctx.set_xattr(key, "content-type", content_type.encode())
#         return StorageObject(key=key, data=data, ...)
#
#     def retrieve(self, key):
#         try:
#             size, _ = self.ioctx.stat(key)
#             data = self.ioctx.read(key, size)
#             ct = self.ioctx.get_xattr(key, "content-type").decode()
#             return StorageObject(key=key, data=data, content_type=ct, ...)
#         except rados.ObjectNotFound:
#             return None

# S3 Adapter (Ceph RGW or AWS S3)
# import boto3
# class S3Adapter(StoragePort):
#     def __init__(self, endpoint, access_key, secret_key, bucket):
#         self.s3 = boto3.client("s3",
#             endpoint_url=endpoint,
#             aws_access_key_id=access_key,
#             aws_secret_access_key=secret_key)
#         self.bucket = bucket
#
#     def store(self, key, data, content_type):
#         self.s3.put_object(Bucket=self.bucket, Key=key,
#             Body=data, ContentType=content_type)
#         return StorageObject(...)

# In-Memory Adapter (for testing)
# class InMemoryAdapter(StoragePort):
#     def __init__(self):
#         self._store = {}
#
#     def store(self, key, data, content_type):
#         obj = StorageObject(key=key, data=data, content_type=content_type, ...)
#         self._store[key] = obj
#         return obj
#
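#     def retrieve(self, key):
#         return self._store.get(key)
#
#     def exists(self, key):
#         return key in self._store
#
#     def delete(self, key):
#         return self._store.pop(key, None) is not None
#
#     # The remaining StoragePort methods (e.g. list_objects) are omitted here
#     # for brevity, but they must be implemented too: Python refuses to
#     # instantiate a subclass of an ABC that leaves any @abstractmethod
#     # unimplemented (TypeError).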

@dataclass
class AdapterComparison:
    """Summarize one StoragePort adapter for side-by-side comparison.

    All fields are free-form display strings.
    """

    adapter: str  # adapter class name, e.g. "CephRadosAdapter"
    backend: str  # storage system the adapter talks to
    performance: str  # qualitative performance note
    use_case: str  # environment the adapter is intended for
    config: str  # settings needed to construct the adapter

# One row per StoragePort implementation, reported in this order below.
adapters = [
    AdapterComparison(adapter="CephRadosAdapter", backend="Ceph RADOS",
                      performance="Very high (native)", use_case="Production high-perf",
                      config="ceph.conf + pool name"),
    AdapterComparison(adapter="CephS3Adapter", backend="Ceph RGW (S3)",
                      performance="High", use_case="Production S3-compatible",
                      config="endpoint + keys + bucket"),
    AdapterComparison(adapter="AWSS3Adapter", backend="AWS S3",
                      performance="High (network)", use_case="Cloud production",
                      config="region + keys + bucket"),
    AdapterComparison(adapter="LocalAdapter", backend="Local filesystem",
                      performance="Fast (disk)", use_case="Development",
                      config="base_path directory"),
    AdapterComparison(adapter="InMemoryAdapter", backend="Python dict",
                      performance="Very fast", use_case="Unit testing",
                      config="None"),
    AdapterComparison(adapter="MinioAdapter", backend="MinIO S3",
                      performance="High", use_case="Self-hosted S3",
                      config="endpoint + keys + bucket"),
]

# Each adapter is rendered as a three-line entry in a single print call.
print("\n=== Adapter Comparison ===")
for row in adapters:
    report = (
        f"  [{row.adapter}] Backend: {row.backend}\n"
        f"    Perf: {row.performance} | Use: {row.use_case}\n"
        f"    Config: {row.config}"
    )
    print(report)

Testing Strategy

# === Testing Hexagonal Storage ===

# Unit Test — Domain with Mock
# def test_upload_validates_size():
#     storage = InMemoryAdapter()
#     service = StorageService(storage, max_size=1000)
#     with pytest.raises(ValueError):
#         service.upload("bucket/file.txt", b"x" * 1001, "text/plain")
#
# def test_upload_validates_key():
#     storage = InMemoryAdapter()
#     service = StorageService(storage)
#     with pytest.raises(ValueError):
#         service.upload("no-bucket", b"data", "text/plain")
#
# def test_upload_stores_object():
#     storage = InMemoryAdapter()
#     service = StorageService(storage)
#     result = service.upload("bucket/test.txt", b"hello", "text/plain")
#     assert result.key == "bucket/test.txt"
#     assert storage.exists("bucket/test.txt")

# Integration Test — Real Ceph
# @pytest.fixture
# def ceph_adapter():
#     adapter = CephS3Adapter(
#         endpoint="http://ceph-rgw:7480",
#         access_key="test", secret_key="test", bucket="test-bucket")
#     yield adapter
#     # Cleanup
#
# def test_ceph_store_retrieve(ceph_adapter):
#     ceph_adapter.store("test/file.txt", b"hello", "text/plain")
#     obj = ceph_adapter.retrieve("test/file.txt")
#     assert obj.data == b"hello"

@dataclass
class TestType:
    """Describe one level of the testing strategy for display.

    The class name starts with ``Test`` only because it describes a kind of
    test. ``__test__ = False`` stops pytest from trying to collect it as a
    test class whenever it ends up in a test module's namespace. Without it,
    pytest warns, because the dataclass-generated ``__init__`` makes the
    class uncollectable.
    """

    # Unannotated, so @dataclass treats it as a plain class attribute, not a field.
    __test__ = False

    test_type: str  # label shown in the report, e.g. "Unit Test (Domain)"
    adapter_used: str  # adapter the test runs against
    speed: str  # rough duration, as display text
    coverage: str  # what the test exercises
    ci_friendly: bool  # True -> reported as "CI-friendly", False -> "Requires infra"

# Each row pairs a test level with the adapter it runs against; rows with
# ci_friendly=False need real infrastructure (e.g. a reachable Ceph cluster).
test_types = [
    TestType("Unit Test (Domain)", "InMemoryAdapter", "< 1ms", "Business logic", True),
    TestType("Unit Test (Adapter)", "InMemoryAdapter", "< 1ms", "Adapter logic", True),
    TestType("Contract Test", "All Adapters", "Varies", "Port compliance", True),
    TestType("Integration Test", "CephS3Adapter", "100-500ms", "Real Ceph access", False),
    TestType("E2E Test", "Production Adapter", "1-5s", "Full flow", False),
    TestType("Performance Test", "CephRadosAdapter", "Minutes", "Throughput latency", False),
]

# Header uses the same "\n=== ... ===" format as the other summary sections,
# so consecutive reports stay visually separated.
print("\n=== Testing Strategy ===")
for t in test_types:
    ci = "CI-friendly" if t.ci_friendly else "Requires infra"
    print(f"  [{t.test_type}] Adapter: {t.adapter_used}")
    print(f"    Speed: {t.speed} | Coverage: {t.coverage} | {ci}")

เคล็ดลับ

  • Port First: ออกแบบ Port ก่อน แล้วค่อยสร้าง Adapter
  • InMemory: ใช้ InMemoryAdapter สำหรับ Unit Test เร็วมาก
  • Config: เปลี่ยน Adapter ผ่าน Config ไม่แก้ Code
  • Contract: ทดสอบ Contract ทุก Adapter ต้อง Pass เหมือนกัน
  • Domain Pure: Domain ไม่ Import External Library เด็ดขาด

Hexagonal Architecture คืออะไร

Hexagonal Architecture (Ports and Adapters Pattern) แยก Domain ออกจากระบบภายนอก โดย Port คือ Interface และ Adapter คือ Implementation ของ Port นั้น จึงเปลี่ยน Adapter ได้โดยไม่กระทบ Domain และทดสอบได้ง่ายด้วย Mock