Ceph Storage Cluster Hexagonal Architecture —
Hexagonal + Ceph

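This article applies Hexagonal Architecture (the Ports and Adapters pattern) to a Ceph storage cluster: pure domain business logic talks only to a storage port, while interchangeable adapters (Ceph, S3, local filesystem, in-memory mock) serve production and testing.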
| Layer | Component | Responsibility | Dependencies | Testability |
|---|---|---|---|---|
| Domain | Business Logic | Storage Policy Validation | None (pure) | Unit Test easy |
| Port | StoragePort | Interface Contract | None (abstract) | Contract Test |
| Adapter | CephAdapter | Ceph RADOS/S3 access | librados/boto3 | Integration Test |
| Adapter | S3Adapter | AWS S3 access | boto3 | Integration Test |
| Adapter | LocalAdapter | Local filesystem | os module | Unit Test |
| Adapter | InMemoryAdapter | Testing only | dict | Unit Test fast |
Domain and Ports
=== Hexagonal Architecture — Storage Domain ===
import hashlib
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional, List
# --- Value Objects ---
@dataclass(frozen=True)
class StorageObject:
    """Immutable value object for a stored object: payload plus descriptive fields."""

    key: str  # identifier the object is stored under
    data: bytes  # raw payload
    content_type: str
    size: int
    checksum: str
    created_at: datetime
@dataclass(frozen=True)
class StorageMetadata:
    """Immutable value object describing a stored object without its payload."""

    key: str
    size: int
    content_type: str
    last_modified: datetime
# --- Port (Interface) ---
class StoragePort(ABC):
    """Abstract port through which the domain talks to any storage backend.

    Concrete adapters must implement every method below; the class cannot
    be instantiated until they do.
    """

    @abstractmethod
    def store(self, key: str, data: bytes, content_type: str) -> StorageObject:
        """Persist ``data`` under ``key`` and return the stored object."""

    @abstractmethod
    def retrieve(self, key: str) -> Optional[StorageObject]:
        """Return the object stored under ``key``, or ``None`` if there is none."""

    @abstractmethod
    def delete(self, key: str) -> bool:
        """Delete the object stored under ``key`` and report the outcome as a bool."""

    @abstractmethod
    def list_objects(self, prefix: str = "") -> List[StorageMetadata]:
        """Return metadata for objects selected by ``prefix`` (default: empty prefix)."""

    @abstractmethod
    def exists(self, key: str) -> bool:
        """Return whether an object is stored under ``key``."""
เนื้อหาเกี่ยวข้อง — ทำความเข้าใจ Prometheus Federation Stream Processing
# --- Domain Service ---
class StorageService:
    """Domain service that validates requests before delegating to a storage port.

    Args:
        storage: Backend used for all storage operations.
        max_size: Largest payload, in bytes, that ``upload`` accepts.
    """

    def __init__(self, storage: StoragePort, max_size: int = 100_000_000):
        self._storage = storage
        self._max_size = max_size

    def upload(self, key: str, data: bytes, content_type: str) -> StorageObject:
        """Validate and store ``data`` under ``key``.

        Raises:
            ValueError: If ``data`` is larger than ``max_size``, or if ``key``
                is empty or contains no ``/`` separator.
        """
        if len(data) > self._max_size:
            raise ValueError(f"File too large: {len(data)} > {self._max_size}")
        if not key or "/" not in key:
            raise ValueError("Key must include bucket prefix: bucket/filename")
        return self._storage.store(key, data, content_type)

    def download(self, key: str) -> Optional[StorageObject]:
        """Return whatever the backend's ``retrieve`` yields for ``key``."""
        return self._storage.retrieve(key)

    def remove(self, key: str) -> bool:
        """Delete the object under ``key`` and return the backend's result.

        Raises:
            FileNotFoundError: If the backend reports that ``key`` does not exist.
        """
        if not self._storage.exists(key):
            raise FileNotFoundError(f"Object not found: {key}")
        return self._storage.delete(key)
from dataclasses import dataclass
@dataclass
class PortMethod:
    """One row of the port-interface summary table; every field is display text."""

    method: str
    params: str
    returns: str
    description: str
# Summary table of the storage port's operations, printed as a quick reference.
# NOTE(review): "get_metadata" is listed below, but StoragePort as declared in
# this file has no such method -- confirm which of the two is intended.
methods = [
    PortMethod("store", "key, data, content_type", "StorageObject", "Upload object to storage"),
    PortMethod("retrieve", "key", "Optional[StorageObject]", "Download object by key"),
    PortMethod("delete", "key", "bool", "Remove object from storage"),
    PortMethod("list_objects", "prefix", "List[StorageMetadata]", "List objects by prefix"),
    PortMethod("exists", "key", "bool", "Check if object exists"),
    PortMethod("get_metadata", "key", "StorageMetadata", "Get object metadata only"),
]
print("=== StoragePort Interface ===")
for m in methods:
    print(f" {m.method}({m.params}) -> {m.returns}")
    print(f" Description: {m.description}")
Adapters
=== Storage Adapters ===
Ceph RADOS Adapter
import rados
เนื้อหาเกี่ยวข้อง — บทความที่เกี่ยวข้อง: Agile คืออะไร — คู่มือฉบับสมบูรณ์ 2026
class CephRadosAdapter(StoragePort):
    """StoragePort adapter that talks to a Ceph pool through librados.

    The content type is kept in the object's ``content-type`` xattr.
    Checksums are the SHA-256 hex digest of the payload, computed locally.

    Args:
        conf_file: Path to the Ceph configuration file.
        pool: Name of the pool to open an I/O context on.
    """

    _CONTENT_TYPE_XATTR = "content-type"

    def __init__(self, conf_file="/etc/ceph/ceph.conf", pool="data"):
        self.cluster = rados.Rados(conffile=conf_file)
        self.cluster.connect()
        self.ioctx = self.cluster.open_ioctx(pool)

    def close(self):
        """Release the I/O context and the cluster connection."""
        self.ioctx.close()
        self.cluster.shutdown()

    @staticmethod
    def _mtime_to_datetime(mtime):
        """Convert the local struct_time reported by ``stat`` to an aware UTC datetime."""
        # A naive datetime is treated as local time by astimezone().
        return datetime(*mtime[:6]).astimezone(timezone.utc)

    def store(self, key, data, content_type):
        """Write ``data`` as the full content of ``key`` and record its content type."""
        self.ioctx.write_full(key, data)
        self.ioctx.set_xattr(key, self._CONTENT_TYPE_XATTR, content_type.encode())
        return StorageObject(
            key=key,
            data=data,
            content_type=content_type,
            size=len(data),
            checksum=hashlib.sha256(data).hexdigest(),
            created_at=datetime.now(timezone.utc),
        )

    def retrieve(self, key):
        """Return the object stored under ``key``, or ``None`` if it does not exist."""
        try:
            size, mtime = self.ioctx.stat(key)
            data = self.ioctx.read(key, size)
            content_type = self.ioctx.get_xattr(key, self._CONTENT_TYPE_XATTR).decode()
        except rados.ObjectNotFound:
            return None
        return StorageObject(
            key=key,
            data=data,
            content_type=content_type,
            size=len(data),
            checksum=hashlib.sha256(data).hexdigest(),
            created_at=self._mtime_to_datetime(mtime),
        )

    def delete(self, key):
        """Remove ``key``; return ``False`` if it did not exist."""
        try:
            self.ioctx.remove_object(key)
        except rados.ObjectNotFound:
            return False
        return True

    def list_objects(self, prefix=""):
        """Return metadata for every object in the pool whose key starts with ``prefix``.

        The pool listing is filtered client-side, and each match costs an
        extra stat and xattr read.
        """
        listing = []
        for entry in self.ioctx.list_objects():
            if not entry.key.startswith(prefix):
                continue
            size, mtime = self.ioctx.stat(entry.key)
            content_type = self.ioctx.get_xattr(
                entry.key, self._CONTENT_TYPE_XATTR
            ).decode()
            listing.append(
                StorageMetadata(
                    key=entry.key,
                    size=size,
                    content_type=content_type,
                    last_modified=self._mtime_to_datetime(mtime),
                )
            )
        return listing

    def exists(self, key):
        """Return whether an object is stored under ``key``."""
        try:
            self.ioctx.stat(key)
        except rados.ObjectNotFound:
            return False
        return True
S3 Adapter (Ceph RGW or AWS S3)
import boto3
class S3Adapter(StoragePort):
    """StoragePort adapter for any S3-compatible endpoint, using a boto3 client.

    Checksums are the SHA-256 hex digest of the payload, computed locally.

    Args:
        endpoint: URL of the S3 endpoint.
        access_key: Access key id.
        secret_key: Secret access key.
        bucket: Bucket that every operation targets.
    """

    # Error codes that a HEAD request can report for a missing key.
    _MISSING_CODES = ("404", "NoSuchKey", "NotFound")

    def __init__(self, endpoint, access_key, secret_key, bucket):
        self.s3 = boto3.client(
            "s3",
            endpoint_url=endpoint,
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
        )
        self.bucket = bucket

    def store(self, key, data, content_type):
        """Upload ``data`` under ``key`` and return the stored object."""
        self.s3.put_object(
            Bucket=self.bucket, Key=key, Body=data, ContentType=content_type
        )
        return StorageObject(
            key=key,
            data=data,
            content_type=content_type,
            size=len(data),
            checksum=hashlib.sha256(data).hexdigest(),
            created_at=datetime.now(timezone.utc),
        )

    def retrieve(self, key):
        """Return the object stored under ``key``, or ``None`` if it does not exist."""
        try:
            response = self.s3.get_object(Bucket=self.bucket, Key=key)
        except self.s3.exceptions.NoSuchKey:
            return None
        data = response["Body"].read()
        return StorageObject(
            key=key,
            data=data,
            content_type=response.get("ContentType", ""),
            size=len(data),
            checksum=hashlib.sha256(data).hexdigest(),
            created_at=response["LastModified"],
        )

    def delete(self, key):
        """Remove ``key``; return ``False`` if it did not exist."""
        # DeleteObject succeeds for missing keys too, so check existence first.
        if not self.exists(key):
            return False
        self.s3.delete_object(Bucket=self.bucket, Key=key)
        return True

    def list_objects(self, prefix=""):
        """Return metadata for every object in the bucket whose key starts with ``prefix``.

        The listing does not carry content types, so each entry costs one
        extra HEAD request.
        """
        paginator = self.s3.get_paginator("list_objects_v2")
        listing = []
        for page in paginator.paginate(Bucket=self.bucket, Prefix=prefix):
            for entry in page.get("Contents", []):
                head = self.s3.head_object(Bucket=self.bucket, Key=entry["Key"])
                listing.append(
                    StorageMetadata(
                        key=entry["Key"],
                        size=entry["Size"],
                        content_type=head.get("ContentType", ""),
                        last_modified=entry["LastModified"],
                    )
                )
        return listing

    def exists(self, key):
        """Return whether an object is stored under ``key``.

        Errors other than "not found" are re-raised.
        """
        try:
            self.s3.head_object(Bucket=self.bucket, Key=key)
        except self.s3.exceptions.ClientError as err:
            if err.response["Error"]["Code"] in self._MISSING_CODES:
                return False
            raise
        return True
In-Memory Adapter (for testing)
class InMemoryAdapter(StoragePort):
    """StoragePort adapter backed by a plain dict, intended for tests.

    Checksums are the SHA-256 hex digest of the payload.
    """

    def __init__(self):
        self._store = {}

    def store(self, key, data, content_type):
        """Keep ``data`` under ``key``, replacing any previous object."""
        obj = StorageObject(
            key=key,
            data=data,
            content_type=content_type,
            size=len(data),
            checksum=hashlib.sha256(data).hexdigest(),
            created_at=datetime.now(timezone.utc),
        )
        self._store[key] = obj
        return obj

    def retrieve(self, key):
        """Return the object stored under ``key``, or ``None`` if there is none."""
        return self._store.get(key)

    def delete(self, key):
        """Remove ``key``; return ``False`` if it was not stored."""
        return self._store.pop(key, None) is not None

    def list_objects(self, prefix=""):
        """Return metadata for every stored object whose key starts with ``prefix``."""
        return [
            StorageMetadata(
                key=obj.key,
                size=obj.size,
                content_type=obj.content_type,
                last_modified=obj.created_at,
            )
            for obj in self._store.values()
            if obj.key.startswith(prefix)
        ]

    def exists(self, key):
        """Return whether an object is stored under ``key``."""
        return key in self._store
@dataclass
class AdapterComparison:
    """One row of the adapter comparison table; every field is display text."""

    adapter: str
    backend: str
    performance: str
    use_case: str
    config: str
# Comparison table of storage adapters, printed as a quick reference.
adapters = [
    AdapterComparison("CephRadosAdapter", "Ceph RADOS", "Very high (native)", "Production high-perf", "ceph.conf + pool name"),
    AdapterComparison("CephS3Adapter", "Ceph RGW (S3)", "High", "Production S3-compatible", "endpoint + keys + bucket"),
    AdapterComparison("AWSS3Adapter", "AWS S3", "High (network)", "Cloud production", "region + keys + bucket"),
    AdapterComparison("LocalAdapter", "Local filesystem", "Fast (disk)", "Development", "base_path directory"),
    AdapterComparison("InMemoryAdapter", "Python dict", "Very fast", "Unit testing", "None"),
    AdapterComparison("MinioAdapter", "MinIO S3", "High", "Self-hosted S3", "endpoint + keys + bucket"),
]
print("\n=== Adapter Comparison ===")
for a in adapters:
    print(f" [{a.adapter}] Backend: {a.backend}")
    print(f" Perf: {a.performance} | Use: {a.use_case}")
    print(f" Config: {a.config}")
Testing Strategy
=== Testing Hexagonal Storage ===
Unit Test — Domain with Mock
def test_upload_validates_size():
    """A payload one byte over ``max_size`` must be rejected with ValueError."""
    storage = InMemoryAdapter()
    service = StorageService(storage, max_size=1000)
    with pytest.raises(ValueError):
        service.upload("bucket/file.txt", b"x" * 1001, "text/plain")
def test_upload_validates_key():
    """A key without a ``/`` separator must be rejected with ValueError."""
    storage = InMemoryAdapter()
    service = StorageService(storage)
    with pytest.raises(ValueError):
        service.upload("no-bucket", b"data", "text/plain")
def test_upload_stores_object():
    """A valid upload returns the object and leaves it present in the backend."""
    storage = InMemoryAdapter()
    service = StorageService(storage)
    result = service.upload("bucket/test.txt", b"hello", "text/plain")
    assert result.key == "bucket/test.txt"
    assert storage.exists("bucket/test.txt")
Integration Test — Real Ceph
เนื้อหาเกี่ยวข้อง — ทำความเข้าใจ HTTP/3 QUIC GreenOps Sustainability —
@pytest.fixture
def ceph_adapter():
    """Yield an S3-style adapter pointed at the test Ceph RGW endpoint."""
    # NOTE(review): CephS3Adapter is not defined in this file (the S3-compatible
    # adapter shown here is named S3Adapter) -- confirm where it comes from.
    adapter = CephS3Adapter(
        endpoint="http://ceph-rgw:7480",
        access_key="test",
        secret_key="test",
        bucket="test-bucket",
    )
    yield adapter
    # Cleanup
    # TODO: no cleanup is performed yet; objects written by tests stay in the bucket.
def test_ceph_store_retrieve(ceph_adapter):
    """An object stored through the adapter can be read back unchanged."""
    ceph_adapter.store("test/file.txt", b"hello", "text/plain")
    obj = ceph_adapter.retrieve("test/file.txt")
    # Fail with a clear assertion rather than AttributeError if nothing came back.
    assert obj is not None
    assert obj.data == b"hello"
@dataclass
class TestType:
    """One row of the testing-strategy table."""

    # The name matches pytest's default Test* class pattern; opt out of
    # collection so pytest does not treat this data class as a test case.
    __test__ = False

    test_type: str
    adapter_used: str
    speed: str
    coverage: str
    ci_friendly: bool
# Testing-strategy table: which adapter each kind of test uses, printed as a quick reference.
test_types = [
    TestType("Unit Test (Domain)", "InMemoryAdapter", "< 1ms", "Business logic", True),
    TestType("Unit Test (Adapter)", "InMemoryAdapter", "< 1ms", "Adapter logic", True),
    TestType("Contract Test", "All Adapters", "Varies", "Port compliance", True),
    TestType("Integration Test", "CephS3Adapter", "100-500ms", "Real Ceph access", False),
    TestType("E2E Test", "Production Adapter", "1-5s", "Full flow", False),
    TestType("Performance Test", "CephRadosAdapter", "Minutes", "Throughput latency", False),
]
print("Testing Strategy:")
for t in test_types:
    ci = "CI-friendly" if t.ci_friendly else "Requires infra"
    print(f" [{t.test_type}] Adapter: {t.adapter_used}")
    print(f" Speed: {t.speed} | Coverage: {t.coverage} | {ci}")
เคล็ดลับ
- Port First: ออกแบบ Port ก่อน แล้วค่อยสร้าง Adapter
- InMemory: ใช้ InMemoryAdapter สำหรับ Unit Test เร็วมาก
- Config: เปลี่ยน Adapter ผ่าน Config ไม่แก้ Code
- Contract: ทดสอบ Contract ทุก Adapter ต้อง Pass เหมือนกัน
- Domain Pure: Domain ไม่ Import External Library เด็ดขาด
Hexagonal Architecture คืออะไร
Hexagonal Architecture หรือ Ports and Adapters Pattern คือการแยก Domain ออกจากระบบภายนอก โดย Port ทำหน้าที่เป็น Interface และ Adapter เป็น Implementation ทำให้เปลี่ยน Adapter ได้โดยไม่กระทบ Domain และทดสอบได้ง่ายด้วย Mock





