Technology

Ceph Storage Cluster Hexagonal Architecture

ceph storage cluster hexagonal architecture
Ceph Storage Cluster Hexagonal Architecture | SiamCafe Blog
2026-03-22· อ. บอม — SiamCafe.net· 10,593 คำ

Hexagonal + Ceph

Ceph Storage Cluster Hexagonal Architecture Ports Adapters Pattern Domain Business Logic Storage Port Ceph Adapter S3 Local Testing Mock Production

| Layer | Component | Responsibility | Dependencies | Testability |
|---|---|---|---|---|
| Domain | Business Logic | Storage Policy Validation | None (pure) | Unit Test easy |
| Port | StoragePort | Interface Contract | None (abstract) | Contract Test |
| Adapter | CephAdapter | Ceph RADOS/S3 access | librados/boto3 | Integration Test |
| Adapter | S3Adapter | AWS S3 access | boto3 | Integration Test |
| Adapter | LocalAdapter | Local filesystem | os module | Unit Test |
| Adapter | InMemoryAdapter | Testing only | dict | Unit Test fast |

Domain and Ports

# === Hexagonal Architecture — Storage Domain ===

# from abc import ABC, abstractmethod
# from dataclasses import dataclass
# from typing import Optional, List
# from datetime import datetime
#
# # --- Value Objects ---
# @dataclass(frozen=True)
# class StorageObject:
#     key: str
#     data: bytes
#     content_type: str
#     size: int
#     checksum: str
#     created_at: datetime
#
# @dataclass(frozen=True)
# class StorageMetadata:
#     key: str
#     size: int
#     content_type: str
#     last_modified: datetime
#
# # --- Port (Interface) ---
# class StoragePort(ABC):
#     @abstractmethod
#     def store(self, key: str, data: bytes, content_type: str) -> StorageObject:
#         pass
#
#     @abstractmethod
#     def retrieve(self, key: str) -> Optional[StorageObject]:
#         pass
#
#     @abstractmethod
#     def delete(self, key: str) -> bool:
#         pass
#
#     @abstractmethod
#     def list_objects(self, prefix: str = "") -> List[StorageMetadata]:
#         pass
#
#     @abstractmethod
#     def exists(self, key: str) -> bool:
#         pass
#
# # --- Domain Service ---
# class StorageService:
#     def __init__(self, storage: StoragePort, max_size: int = 100_000_000):
#         self._storage = storage
#         self._max_size = max_size
#
#     def upload(self, key: str, data: bytes, content_type: str) -> StorageObject:
#         if len(data) > self._max_size:
#             raise ValueError(f"File too large: {len(data)} > {self._max_size}")
#         if not key or "/" not in key:
#             raise ValueError("Key must include bucket prefix: bucket/filename")
#         return self._storage.store(key, data, content_type)
#
#     def download(self, key: str) -> Optional[StorageObject]:
#         return self._storage.retrieve(key)
#
#     def remove(self, key: str) -> bool:
#         if not self._storage.exists(key):
#             raise FileNotFoundError(f"Object not found: {key}")
#         return self._storage.delete(key)

from dataclasses import dataclass

@dataclass
class PortMethod:
    """One row of the StoragePort interface summary.

    All four fields are plain display text; nothing is validated or
    executed, the values are only interpolated into printed lines.
    """

    method: str  # method name, shown before the parentheses
    params: str  # comma-separated parameter list, shown inside the parentheses
    returns: str  # return type text, shown after "->"
    description: str  # short explanation, shown on the "Description:" line

# Rows of the StoragePort summary, in the order they are printed.
# NOTE(review): "get_metadata" has no counterpart in the commented-out
# StoragePort sketch earlier in this file — confirm which one is current.
methods = [
    PortMethod(
        method="store",
        params="key, data, content_type",
        returns="StorageObject",
        description="Upload object to storage",
    ),
    PortMethod(
        method="retrieve",
        params="key",
        returns="Optional[StorageObject]",
        description="Download object by key",
    ),
    PortMethod(
        method="delete",
        params="key",
        returns="bool",
        description="Remove object from storage",
    ),
    PortMethod(
        method="list_objects",
        params="prefix",
        returns="List[StorageMetadata]",
        description="List objects by prefix",
    ),
    PortMethod(
        method="exists",
        params="key",
        returns="bool",
        description="Check if object exists",
    ),
    PortMethod(
        method="get_metadata",
        params="key",
        returns="StorageMetadata",
        description="Get object metadata only",
    ),
]

# Header, then for every row a signature line followed by its description.
print("=== StoragePort Interface ===")
for m in methods:
    print(
        f"  {m.method}({m.params}) -> {m.returns}",
        f"    Description: {m.description}",
        sep="\n",
    )

Adapters

# === Storage Adapters ===

# Ceph RADOS Adapter
# import rados
# class CephRadosAdapter(StoragePort):
#     def __init__(self, conf_file="/etc/ceph/ceph.conf", pool="data"):
#         self.cluster = rados.Rados(conffile=conf_file)
#         self.cluster.connect()
#         self.ioctx = self.cluster.open_ioctx(pool)
#
#     def store(self, key, data, content_type):
#         self.ioctx.write_full(key, data)
#         self.ioctx.set_xattr(key, "content-type", content_type.encode())
#         return StorageObject(key=key, data=data, ...)
#
#     def retrieve(self, key):
#         try:
#             size, _ = self.ioctx.stat(key)
#             data = self.ioctx.read(key, size)
#             ct = self.ioctx.get_xattr(key, "content-type").decode()
#             return StorageObject(key=key, data=data, content_type=ct, ...)
#         except rados.ObjectNotFound:
#             return None

# S3 Adapter (Ceph RGW or AWS S3)
# import boto3
# class S3Adapter(StoragePort):
#     def __init__(self, endpoint, access_key, secret_key, bucket):
#         self.s3 = boto3.client("s3",
#             endpoint_url=endpoint,
#             aws_access_key_id=access_key,
#             aws_secret_access_key=secret_key)
#         self.bucket = bucket
#
#     def store(self, key, data, content_type):
#         self.s3.put_object(Bucket=self.bucket, Key=key,
#             Body=data, ContentType=content_type)
#         return StorageObject(...)

# In-Memory Adapter (for testing)
# class InMemoryAdapter(StoragePort):
#     def __init__(self):
#         self._store = {}
#
#     def store(self, key, data, content_type):
#         obj = StorageObject(key=key, data=data, content_type=content_type, ...)
#         self._store[key] = obj
#         return obj
#
#     def retrieve(self, key):
#         return self._store.get(key)

@dataclass
class AdapterComparison:
    """One row of the adapter comparison summary.

    Every field is descriptive text that is only interpolated into the
    printed comparison; none of it is parsed or used to build an adapter.
    """

    adapter: str  # adapter class name, shown in square brackets
    backend: str  # storage backend label, shown after "Backend:"
    performance: str  # qualitative performance note, shown after "Perf:"
    use_case: str  # intended usage, shown after "Use:"
    config: str  # configuration the adapter needs, shown after "Config:"

# Rows of the adapter comparison, in the order they are printed.
adapters = [
    AdapterComparison(
        adapter="CephRadosAdapter",
        backend="Ceph RADOS",
        performance="Very high (native)",
        use_case="Production high-perf",
        config="ceph.conf + pool name",
    ),
    AdapterComparison(
        adapter="CephS3Adapter",
        backend="Ceph RGW (S3)",
        performance="High",
        use_case="Production S3-compatible",
        config="endpoint + keys + bucket",
    ),
    AdapterComparison(
        adapter="AWSS3Adapter",
        backend="AWS S3",
        performance="High (network)",
        use_case="Cloud production",
        config="region + keys + bucket",
    ),
    AdapterComparison(
        adapter="LocalAdapter",
        backend="Local filesystem",
        performance="Fast (disk)",
        use_case="Development",
        config="base_path directory",
    ),
    AdapterComparison(
        adapter="InMemoryAdapter",
        backend="Python dict",
        performance="Very fast",
        use_case="Unit testing",
        config="None",
    ),
    AdapterComparison(
        adapter="MinioAdapter",
        backend="MinIO S3",
        performance="High",
        use_case="Self-hosted S3",
        config="endpoint + keys + bucket",
    ),
]

# The leading "\n" in the header puts a blank line before this section.
print("\n=== Adapter Comparison ===")
for a in adapters:
    print(
        f"  [{a.adapter}] Backend: {a.backend}",
        f"    Perf: {a.performance} | Use: {a.use_case}",
        f"    Config: {a.config}",
        sep="\n",
    )

Testing Strategy

# === Testing Hexagonal Storage ===

# Unit Test — Domain with Mock
# def test_upload_validates_size():
#     storage = InMemoryAdapter()
#     service = StorageService(storage, max_size=1000)
#     with pytest.raises(ValueError):
#         service.upload("bucket/file.txt", b"x" * 1001, "text/plain")
#
# def test_upload_validates_key():
#     storage = InMemoryAdapter()
#     service = StorageService(storage)
#     with pytest.raises(ValueError):
#         service.upload("no-bucket", b"data", "text/plain")
#
# def test_upload_stores_object():
#     storage = InMemoryAdapter()
#     service = StorageService(storage)
#     result = service.upload("bucket/test.txt", b"hello", "text/plain")
#     assert result.key == "bucket/test.txt"
#     assert storage.exists("bucket/test.txt")

# Integration Test — Real Ceph
# @pytest.fixture
# def ceph_adapter():
#     adapter = CephS3Adapter(
#         endpoint="http://ceph-rgw:7480",
#         access_key="test", secret_key="test", bucket="test-bucket")
#     yield adapter
#     # Cleanup
#
# def test_ceph_store_retrieve(ceph_adapter):
#     ceph_adapter.store("test/file.txt", b"hello", "text/plain")
#     obj = ceph_adapter.retrieve("test/file.txt")
#     assert obj.data == b"hello"

@dataclass
class TestType:
    """One row of the testing-strategy summary.

    The string fields are display text only. ``ci_friendly`` decides
    which of two labels the summary prints for the row.
    """

    # The "Test" prefix matches pytest's default class-collection pattern.
    # Because this dataclass has a generated __init__, pytest would emit a
    # PytestCollectionWarning whenever the class is visible in a test
    # module. Opting out explicitly keeps it from being treated as a test.
    # This is un-annotated, so it is a class attribute, not a dataclass field.
    __test__ = False

    test_type: str  # name of the kind of test, shown in square brackets
    adapter_used: str  # adapter the test runs against, shown after "Adapter:"
    speed: str  # rough duration text, shown after "Speed:"
    coverage: str  # what the test exercises, shown after "Coverage:"
    ci_friendly: bool  # True -> "CI-friendly", False -> "Requires infra"

# Rows of the testing-strategy summary, in the order they are printed.
test_types = [
    TestType(
        test_type="Unit Test (Domain)",
        adapter_used="InMemoryAdapter",
        speed="< 1ms",
        coverage="Business logic",
        ci_friendly=True,
    ),
    TestType(
        test_type="Unit Test (Adapter)",
        adapter_used="InMemoryAdapter",
        speed="< 1ms",
        coverage="Adapter logic",
        ci_friendly=True,
    ),
    TestType(
        test_type="Contract Test",
        adapter_used="All Adapters",
        speed="Varies",
        coverage="Port compliance",
        ci_friendly=True,
    ),
    TestType(
        test_type="Integration Test",
        adapter_used="CephS3Adapter",
        speed="100-500ms",
        coverage="Real Ceph access",
        ci_friendly=False,
    ),
    TestType(
        test_type="E2E Test",
        adapter_used="Production Adapter",
        speed="1-5s",
        coverage="Full flow",
        ci_friendly=False,
    ),
    TestType(
        test_type="Performance Test",
        adapter_used="CephRadosAdapter",
        speed="Minutes",
        coverage="Throughput latency",
        ci_friendly=False,
    ),
]

print("Testing Strategy:")
for t in test_types:
    # Translate the boolean flag into the label shown at the end of the row.
    if t.ci_friendly:
        ci = "CI-friendly"
    else:
        ci = "Requires infra"
    print(
        f"  [{t.test_type}] Adapter: {t.adapter_used}",
        f"    Speed: {t.speed} | Coverage: {t.coverage} | {ci}",
        sep="\n",
    )

เคล็ดลับ

Hexagonal Architecture คืออะไร

Hexagonal Architecture หรือ Ports and Adapters Pattern คือการแยก Domain ออกจากระบบภายนอก (External) โดย Port ทำหน้าที่เป็น Interface และ Adapter เป็น Implementation การเปลี่ยน Adapter จึงไม่กระทบ Domain และทดสอบได้ง่ายด้วย Mock

ใช้กับ Ceph Storage อย่างไร

กำหนด StoragePort เป็น Interface ที่มี method เช่น store, retrieve และ delete จากนั้นให้ CephAdapter เชื่อมต่อผ่าน librados หรือ S3 API ใช้ LocalAdapter สำหรับ Development และ InMemoryAdapter สำหรับ Testing โดยเปลี่ยน Adapter ได้ผ่าน Config

Port กับ Adapter คืออะไร

Port คือ Interface (Abstract Method) ที่ Domain ต้องการ ส่วน Adapter คือสิ่งที่ Implement Port เพื่อเชื่อมต่อกับระบบภายนอก เช่น CephAdapter และ S3Adapter โดยฝั่ง Primary คือ Input และฝั่ง Secondary คือ Output

ทดสอบ Hexagonal Architecture อย่างไร

ใช้ Unit Test ทดสอบ Domain ด้วย Mock Adapter, ใช้ Integration Test กับ Ceph Cluster, ใช้ E2E Test ทดสอบ Full Flow และใช้ Contract Test ตรวจว่า Adapter ทำตาม Port Interface โดยใช้ InMemoryAdapter และ Testcontainers ใน CI/CD

สรุป

Ceph Storage Cluster Hexagonal Architecture Ports Adapters Domain StoragePort CephAdapter S3Adapter InMemoryAdapter Testing Contract Integration Production

📖 บทความที่เกี่ยวข้อง

Ceph Storage Cluster Platform Engineering — อ่านบทความ →
Ceph Storage Cluster Freelance IT Career — อ่านบทความ →
Ceph Storage Cluster Blue Green Canary Deploy — อ่านบทความ →
Ceph Storage Cluster Domain Driven Design DDD — อ่านบทความ →
Ceph Storage Cluster Agile Scrum Kanban — อ่านบทความ →

📚 ดูบทความทั้งหมด →