ai

Ceph Storage Cluster Hexagonal Architecture —

Ceph Storage Cluster Hexagonal Architecture —

Hexagonal + Ceph

Ceph Storage Cluster Hexagonal Architecture —

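This article applies Hexagonal Architecture (the Ports and Adapters pattern) to a Ceph storage cluster: the domain's business logic depends only on a storage port, while Ceph, S3 and local adapters implement that port — mocks for testing, real adapters in production.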

| Layer | Component | Responsibility | Dependencies | Testability |
|---|---|---|---|---|
| Domain | Business Logic | Storage Policy Validation | None (pure) | Unit Test easy |
| Port | StoragePort | Interface Contract | None (abstract) | Contract Test |
| Adapter | CephAdapter | Ceph RADOS/S3 access | librados/boto3 | Integration Test |
| Adapter | S3Adapter | AWS S3 access | boto3 | Integration Test |
| Adapter | LocalAdapter | Local filesystem | os module | Unit Test |
| Adapter | InMemoryAdapter | Testing only | dict | Unit Test fast |

Domain and Ports

# === Hexagonal Architecture — Storage Domain ===

import hashlib
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional, List

# --- Value Objects ---

@dataclass(frozen=True)
class StorageObject:
    """Immutable value object: a stored payload plus its descriptive fields."""

    key: str  # key the payload is stored under
    data: bytes  # raw payload
    content_type: str  # content type recorded alongside the payload
    size: int  # size value supplied by whoever builds the object
    checksum: str  # checksum string supplied by whoever builds the object
    created_at: datetime  # timestamp supplied by whoever builds the object

@dataclass(frozen=True)
class StorageMetadata:
    """Immutable value object describing a stored object without its payload."""

    key: str  # key of the described object
    size: int  # size value reported for the object
    content_type: str  # content type reported for the object
    last_modified: datetime  # modification timestamp reported for the object

# --- Port (Interface) ---

class StoragePort(ABC):
    """Port (interface) that every storage adapter has to implement.

    All five methods are abstract, so a subclass can only be instantiated
    once it overrides each of them.
    """

    @abstractmethod
    def store(self, key: str, data: bytes, content_type: str) -> StorageObject:
        """Upload ``data`` under ``key`` and return the stored object."""

    @abstractmethod
    def retrieve(self, key: str) -> Optional[StorageObject]:
        """Download the object stored under ``key`` (``Optional`` result)."""

    @abstractmethod
    def delete(self, key: str) -> bool:
        """Remove the object stored under ``key``; the result is a bool."""

    @abstractmethod
    def list_objects(self, prefix: str = "") -> List[StorageMetadata]:
        """List metadata for objects selected by ``prefix``."""

    @abstractmethod
    def exists(self, key: str) -> bool:
        """Report whether an object is stored under ``key``."""

เนื้อหาเกี่ยวข้อง — ทำความเข้าใจ Prometheus Federation Stream Processing

# --- Domain Service ---

class StorageService:
    """Domain service that validates requests before delegating to a storage port."""

    def __init__(self, storage: StoragePort, max_size: int = 100_000_000):
        """Keep the port to delegate to and the largest accepted payload length.

        Args:
            storage: Port implementation that performs the actual storage work.
            max_size: Upper bound for ``len(data)`` accepted by ``upload``.
        """
        self._storage = storage
        self._max_size = max_size

    def upload(self, key: str, data: bytes, content_type: str) -> StorageObject:
        """Validate the request and store ``data`` through the port.

        Raises:
            ValueError: If ``data`` is longer than ``max_size``, or ``key`` is
                not of the form ``bucket/filename`` with both parts non-empty.
        """
        if len(data) > self._max_size:
            raise ValueError(f"File too large: {len(data)} > {self._max_size}")
        # Split on the first "/" only, so filenames may themselves contain "/".
        # Requiring both halves rejects "/file" and "bucket/", which merely
        # contain a slash but carry no bucket prefix or no filename.
        bucket, separator, filename = (key or "").partition("/")
        if not (separator and bucket and filename):
            raise ValueError("Key must include bucket prefix: bucket/filename")
        return self._storage.store(key, data, content_type)

    def download(self, key: str) -> Optional[StorageObject]:
        """Return whatever the port's ``retrieve`` yields for ``key``."""
        return self._storage.retrieve(key)

    def remove(self, key: str) -> bool:
        """Delete ``key`` through the port.

        Raises:
            FileNotFoundError: If the port reports that ``key`` does not exist.
        """
        if not self._storage.exists(key):
            raise FileNotFoundError(f"Object not found: {key}")
        return self._storage.delete(key)

from dataclasses import dataclass

@dataclass
class PortMethod:
    """One row of the printed StoragePort method overview."""

    method: str  # method name
    params: str  # parameter list as display text
    returns: str  # return type as display text
    description: str  # one-line description

# Display-only overview of the storage port's operations, printed below.
# NOTE(review): "get_metadata" is listed here, but the StoragePort class in
# this file declares no such method — confirm which of the two is intended.
methods = [
    PortMethod("store", "key, data, content_type", "StorageObject", "Upload object to storage"),
    PortMethod("retrieve", "key", "Optional[StorageObject]", "Download object by key"),
    PortMethod("delete", "key", "bool", "Remove object from storage"),
    PortMethod("list_objects", "prefix", "List[StorageMetadata]", "List objects by prefix"),
    PortMethod("exists", "key", "bool", "Check if object exists"),
    PortMethod("get_metadata", "key", "StorageMetadata", "Get object metadata only"),
]

print("=== StoragePort Interface ===")
for m in methods:
    print(f" {m.method}({m.params}) -> {m.returns}")
    print(f" Description: {m.description}")

Adapters

# === Storage Adapters ===

# Ceph RADOS Adapter

import rados

เนื้อหาเกี่ยวข้อง — บทความที่เกี่ยวข้อง: Agile คืออะไร — คู่มือฉบับสมบูรณ์ 2026

class CephRadosAdapter(StoragePort):
    """StoragePort adapter that talks to a Ceph pool through python-rados.

    The payload is written as the RADOS object body; the content type is kept
    in the ``content-type`` extended attribute of that object.
    """

    def __init__(self, conf_file="/etc/ceph/ceph.conf", pool="data"):
        """Connect to the cluster described by ``conf_file`` and open ``pool``."""
        self.cluster = rados.Rados(conffile=conf_file)
        self.cluster.connect()
        try:
            self.ioctx = self.cluster.open_ioctx(pool)
        except Exception:
            # Do not leave a connected cluster handle behind when the pool
            # cannot be opened.
            self.cluster.shutdown()
            raise

    def close(self):
        """Release the pool I/O context and disconnect from the cluster."""
        self.ioctx.close()
        self.cluster.shutdown()

    def store(self, key, data, content_type):
        """Write ``data`` as the full object body and record its content type."""
        self.ioctx.write_full(key, data)
        self.ioctx.set_xattr(key, "content-type", content_type.encode())
        return StorageObject(
            key=key,
            data=data,
            content_type=content_type,
            size=len(data),
            checksum=hashlib.sha256(data).hexdigest(),
            created_at=datetime.now(timezone.utc),
        )

    def retrieve(self, key):
        """Return the stored object, or ``None`` when RADOS reports it missing."""
        try:
            size, mtime = self.ioctx.stat(key)
            data = self.ioctx.read(key, size)
            ct = self.ioctx.get_xattr(key, "content-type").decode()
        except rados.ObjectNotFound:
            return None
        return StorageObject(
            key=key,
            data=data,
            content_type=ct,
            size=len(data),
            checksum=hashlib.sha256(data).hexdigest(),
            created_at=self._to_datetime(mtime),
        )

    def delete(self, key):
        """Remove ``key``; return ``False`` when RADOS reports it missing."""
        try:
            self.ioctx.remove_object(key)
        except rados.ObjectNotFound:
            return False
        return True

    def list_objects(self, prefix=""):
        """Return metadata for every object in the pool whose key starts with ``prefix``."""
        listing = []
        for entry in self.ioctx.list_objects():
            if not entry.key.startswith(prefix):
                continue
            try:
                size, mtime = self.ioctx.stat(entry.key)
                ct = self.ioctx.get_xattr(entry.key, "content-type").decode()
            except rados.ObjectNotFound:
                # The object went away between listing and stat; skip it.
                continue
            listing.append(
                StorageMetadata(
                    key=entry.key,
                    size=size,
                    content_type=ct,
                    last_modified=self._to_datetime(mtime),
                )
            )
        return listing

    def exists(self, key):
        """Return ``True`` when ``stat`` succeeds for ``key``."""
        try:
            self.ioctx.stat(key)
        except rados.ObjectNotFound:
            return False
        return True

    @staticmethod
    def _to_datetime(mtime):
        """Convert the modification time returned by ``Ioctx.stat`` to an aware datetime."""
        # NOTE(review): assumes stat() yields a local-time time.struct_time,
        # as the python-rados binding documents — confirm for the deployed version.
        return datetime.fromtimestamp(time.mktime(mtime), tz=timezone.utc)

# S3 Adapter (Ceph RGW or AWS S3)

import boto3

class S3Adapter(StoragePort):
    """StoragePort adapter for an S3-compatible endpoint, accessed through boto3."""

    def __init__(self, endpoint, access_key, secret_key, bucket):
        """Create the boto3 S3 client for ``endpoint`` and remember the bucket name."""
        self.s3 = boto3.client(
            "s3",
            endpoint_url=endpoint,
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
        )
        self.bucket = bucket

    def store(self, key, data, content_type):
        """Upload ``data`` with ``put_object`` and return the stored object."""
        self.s3.put_object(
            Bucket=self.bucket, Key=key, Body=data, ContentType=content_type
        )
        return StorageObject(
            key=key,
            data=data,
            content_type=content_type,
            size=len(data),
            checksum=hashlib.sha256(data).hexdigest(),
            created_at=datetime.now(timezone.utc),
        )

    def retrieve(self, key):
        """Download ``key``; return ``None`` when the service reports NoSuchKey."""
        try:
            response = self.s3.get_object(Bucket=self.bucket, Key=key)
        except self.s3.exceptions.NoSuchKey:
            return None
        data = response["Body"].read()
        return StorageObject(
            key=key,
            data=data,
            content_type=response["ContentType"],
            size=len(data),
            checksum=hashlib.sha256(data).hexdigest(),
            created_at=response["LastModified"],
        )

    def delete(self, key):
        """Delete ``key``; return ``False`` when it was not there to begin with."""
        # delete_object does not signal a missing key, so probe first to be
        # able to return a meaningful bool.
        if not self.exists(key):
            return False
        self.s3.delete_object(Bucket=self.bucket, Key=key)
        return True

    def list_objects(self, prefix=""):
        """Return metadata for every key in the bucket that starts with ``prefix``."""
        listing = []
        paginator = self.s3.get_paginator("list_objects_v2")
        for page in paginator.paginate(Bucket=self.bucket, Prefix=prefix):
            for entry in page.get("Contents", []):
                # The listing carries no content type, hence one HEAD per key.
                head = self.s3.head_object(Bucket=self.bucket, Key=entry["Key"])
                listing.append(
                    StorageMetadata(
                        key=entry["Key"],
                        size=entry["Size"],
                        content_type=head["ContentType"],
                        last_modified=entry["LastModified"],
                    )
                )
        return listing

    def exists(self, key):
        """Return ``True`` when ``head_object`` succeeds for ``key``."""
        try:
            self.s3.head_object(Bucket=self.bucket, Key=key)
        except self.s3.exceptions.ClientError as err:
            # NOTE(review): a missing key on HEAD is expected to surface as
            # error code "404"; the other spellings are accepted defensively.
            if err.response["Error"]["Code"] in ("404", "NoSuchKey", "NotFound"):
                return False
            raise
        return True

# In-Memory Adapter (for testing)

class InMemoryAdapter(StoragePort):
    """StoragePort adapter that keeps every object in a plain dict."""

    def __init__(self):
        """Start with an empty key -> StorageObject mapping."""
        self._store = {}

    def store(self, key, data, content_type):
        """Build a StorageObject for ``data``, keep it under ``key`` and return it."""
        obj = StorageObject(
            key=key,
            data=data,
            content_type=content_type,
            size=len(data),
            checksum=hashlib.sha256(data).hexdigest(),
            created_at=datetime.now(timezone.utc),
        )
        self._store[key] = obj
        return obj

    def retrieve(self, key):
        """Return the object kept under ``key``, or ``None`` when there is none."""
        return self._store.get(key)

    def delete(self, key):
        """Drop ``key``; return ``True`` only when something was actually removed."""
        return self._store.pop(key, None) is not None

    def list_objects(self, prefix=""):
        """Return metadata, ordered by key, for every key starting with ``prefix``."""
        return [
            StorageMetadata(
                key=obj.key,
                size=obj.size,
                content_type=obj.content_type,
                last_modified=obj.created_at,
            )
            for key, obj in sorted(self._store.items())
            if key.startswith(prefix)
        ]

    def exists(self, key):
        """Return ``True`` when an object is kept under ``key``."""
        return key in self._store

@dataclass
class AdapterComparison:
    """One row of the printed adapter comparison."""

    adapter: str  # adapter name
    backend: str  # backend the adapter targets
    performance: str  # performance note as display text
    use_case: str  # intended use as display text
    config: str  # configuration the adapter needs, as display text

# Display-only comparison of adapter options, printed below.
adapters = [
    AdapterComparison("CephRadosAdapter", "Ceph RADOS", "Very high (native)", "Production high-perf", "ceph.conf + pool name"),
    AdapterComparison("CephS3Adapter", "Ceph RGW (S3)", "High", "Production S3-compatible", "endpoint + keys + bucket"),
    AdapterComparison("AWSS3Adapter", "AWS S3", "High (network)", "Cloud production", "region + keys + bucket"),
    AdapterComparison("LocalAdapter", "Local filesystem", "Fast (disk)", "Development", "base_path directory"),
    AdapterComparison("InMemoryAdapter", "Python dict", "Very fast", "Unit testing", "None"),
    AdapterComparison("MinioAdapter", "MinIO S3", "High", "Self-hosted S3", "endpoint + keys + bucket"),
]

print("\n=== Adapter Comparison ===")
for a in adapters:
    print(f" [{a.adapter}] Backend: {a.backend}")
    print(f" Perf: {a.performance} | Use: {a.use_case}")
    print(f" Config: {a.config}")

Testing Strategy

# === Testing Hexagonal Storage ===

# Unit Test — Domain with Mock

def test_upload_validates_size():
    """A payload longer than ``max_size`` is rejected by the size check."""
    storage = InMemoryAdapter()
    service = StorageService(storage, max_size=1000)
    # ``match`` pins the failure to the size check; a bare ValueError would
    # also be satisfied by the key-format check.
    with pytest.raises(ValueError, match="File too large"):
        service.upload("bucket/file.txt", b"x" * 1001, "text/plain")

def test_upload_validates_key():
    """A key without a ``bucket/`` prefix is rejected by the key check."""
    storage = InMemoryAdapter()
    service = StorageService(storage)
    # ``match`` pins the failure to the key check rather than any ValueError.
    with pytest.raises(ValueError, match="Key must include bucket prefix"):
        service.upload("no-bucket", b"data", "text/plain")

def test_upload_stores_object():
    """A valid upload returns the stored object and leaves it in the adapter."""
    storage = InMemoryAdapter()
    service = StorageService(storage)
    result = service.upload("bucket/test.txt", b"hello", "text/plain")
    assert result.key == "bucket/test.txt"
    assert storage.exists("bucket/test.txt")

# Integration Test — Real Ceph

เนื้อหาเกี่ยวข้อง — ทำความเข้าใจ HTTP/3 QUIC GreenOps Sustainability —

@pytest.fixture
def ceph_adapter():
    """Yield an S3Adapter pointed at the ``ceph-rgw`` test endpoint."""
    # S3Adapter is the class in this file that takes exactly these four
    # arguments; no class named CephS3Adapter is defined here.
    adapter = S3Adapter(
        endpoint="http://ceph-rgw:7480",
        access_key="test",
        secret_key="test",
        bucket="test-bucket",
    )
    yield adapter
    # Cleanup: nothing is removed yet; objects written by a test stay in the bucket.

def test_ceph_store_retrieve(ceph_adapter):
    """An object stored through the adapter can be read back unchanged."""
    ceph_adapter.store("test/file.txt", b"hello", "text/plain")
    obj = ceph_adapter.retrieve("test/file.txt")
    # retrieve() is declared to return Optional; fail with a clear assertion
    # instead of an AttributeError on None.
    assert obj is not None
    assert obj.data == b"hello"

@dataclass
class TestType:
    """One row of the printed testing-strategy overview."""

    # The class name starts with "Test", so pytest would try to collect it;
    # this un-annotated attribute opts out without becoming a dataclass field.
    __test__ = False

    test_type: str  # name of the kind of test
    adapter_used: str  # adapter the test runs against
    speed: str  # typical duration as display text
    coverage: str  # what the test covers, as display text
    ci_friendly: bool  # printed as "CI-friendly" when true, "Requires infra" otherwise

# Display-only overview of the testing strategy, printed below.
test_types = [
    TestType("Unit Test (Domain)", "InMemoryAdapter", "< 1ms", "Business logic", True),
    TestType("Unit Test (Adapter)", "InMemoryAdapter", "< 1ms", "Adapter logic", True),
    TestType("Contract Test", "All Adapters", "Varies", "Port compliance", True),
    TestType("Integration Test", "CephS3Adapter", "100-500ms", "Real Ceph access", False),
    TestType("E2E Test", "Production Adapter", "1-5s", "Full flow", False),
    TestType("Performance Test", "CephRadosAdapter", "Minutes", "Throughput latency", False),
]

print("Testing Strategy:")
for t in test_types:
    ci = "CI-friendly" if t.ci_friendly else "Requires infra"
    print(f" [{t.test_type}] Adapter: {t.adapter_used}")
    print(f" Speed: {t.speed} | Coverage: {t.coverage} | {ci}")

เคล็ดลับ

  • Port First: ออกแบบ Port ก่อน แล้วค่อยสร้าง Adapter
  • InMemory: ใช้ InMemoryAdapter สำหรับ Unit Test เร็วมาก
  • Config: เปลี่ยน Adapter ผ่าน Config ไม่แก้ Code
  • Contract: ทดสอบ Contract ทุก Adapter ต้อง Pass เหมือนกัน
  • Domain Pure: Domain ไม่ Import External Library เด็ดขาด

Hexagonal Architecture คืออะไร

Hexagonal Architecture (Ports and Adapters Pattern) คือการแยก Domain ออกจากระบบภายนอก โดย Port เป็น Interface และ Adapter เป็น Implementation การเปลี่ยน Adapter จึงไม่กระทบ Domain และทดสอบได้ง่ายด้วย Mock

XM Legend · เทรดเดอร์ & ผู้สอน Forex 13 ปี

ผู้ก่อตั้ง SiamCafe ตั้งแต่ปี 1997 · เทรดเดอร์สาย Forex มากกว่า 13 ปี ได้รับการยกย่องเป็น XM Legend · แบ่งปันความรู้ Forex, ไอที, AI และการเทรด จากประสบการณ์จริงในตลาดจริง