Weights & Biases Hexagonal Architecture คืออะไร
Weights & Biases (W&B) เป็น MLOps platform สำหรับ experiment tracking, model versioning, dataset management และ collaboration ในทีม ML ส่วน Hexagonal Architecture (Ports and Adapters) เป็นรูปแบบการออกแบบซอฟต์แวร์ที่แยก business logic ออกจาก external dependencies ผ่าน ports (interfaces) และ adapters (implementations) การรวมสองแนวคิดนี้ช่วยสร้าง ML systems ที่ testable, maintainable และเปลี่ยน infrastructure ได้ง่าย เช่น สลับจาก W&B ไป MLflow โดยไม่แก้ core logic
Hexagonal Architecture Fundamentals
# hexagonal.py — Hexagonal Architecture for ML
import json
class HexagonalArchitecture:
    """Catalogue of the three layers of a hexagonal (ports-and-adapters)
    architecture applied to ML systems, with a pretty-printer."""

    # Static layer descriptions; values are display text shown as-is.
    LAYERS = {
        "domain": {
            "name": "Domain (Core Business Logic)",
            "description": "ML training logic, model evaluation, data processing — ไม่ depend บน framework ใดๆ",
            "examples": "TrainingService, ModelEvaluator, DataProcessor",
            "rule": "ไม่ import external libraries โดยตรง — ใช้ interfaces",
        },
        "ports": {
            "name": "Ports (Interfaces)",
            "description": "Contracts ที่ domain กำหนด — external world ต้อง implement",
            "types": {
                "inbound": "Driving ports — UI, API, CLI เรียกเข้ามา (e.g., TrainModelUseCase)",
                "outbound": "Driven ports — domain เรียกออกไป (e.g., ExperimentTracker, ModelRegistry)",
            },
        },
        "adapters": {
            "name": "Adapters (Implementations)",
            "description": "Concrete implementations ของ ports — connect กับ external systems",
            "types": {
                "inbound": "FastAPI controller, CLI handler, Airflow DAG",
                "outbound": "WandbTracker, MLflowTracker, S3ModelStore, LocalFileStore",
            },
        },
    }

    def show_layers(self):
        """Print each layer's name and description, plus its inbound/outbound
        port types when the layer defines them."""
        print("=== Hexagonal Architecture ===\n")
        for spec in self.LAYERS.values():
            print(f"[{spec['name']}]")
            print(f" {spec['description']}")
            # Only "ports" and "adapters" carry an inbound/outbound breakdown.
            type_map = spec.get('types')
            if type_map:
                for direction, detail in type_map.items():
                    print(f" {direction}: {detail}")
            print()
# Demo: instantiate the architecture description and print its layers.
hex_arch = HexagonalArchitecture()
HexagonalArchitecture.show_layers(hex_arch)
Ports — Interfaces
# ports.py — Port definitions (interfaces)
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict, List, Optional
import numpy as np
# Domain Models
@dataclass
class Experiment:
    """A single experiment run: display name, configuration, optional tags."""

    name: str     # human-readable run name shown in the tracking UI
    config: Dict  # hyperparameters / run configuration
    # Fix: default is None, so the annotation must be Optional — the original
    # `Dict[str, str] = None` annotation was inaccurate.
    tags: Optional[Dict[str, str]] = None
@dataclass
class MetricLog:
    """One scalar metric observation recorded at a specific training step."""

    name: str     # metric identifier, e.g. 'train_loss'
    value: float  # observed scalar value
    step: int     # training step / epoch index the value belongs to
@dataclass
class ModelArtifact:
    """A versioned, stored model plus the metrics it achieved."""

    name: str                  # registry name of the model
    version: str               # version label, e.g. 'latest' or 'v3'
    path: str                  # filesystem path where the model file lives
    metrics: Dict[str, float]  # evaluation metrics attached to this version
# Outbound Ports (Driven)
class ExperimentTrackerPort(ABC):
    """Outbound (driven) port for experiment-tracking backends — W&B, MLflow, etc."""

    @abstractmethod
    def init_experiment(self, experiment: Experiment) -> str:
        """Start a tracking run for *experiment* and return its run id."""
        ...

    @abstractmethod
    def log_metrics(self, metrics: Dict[str, float], step: int) -> None:
        """Record a batch of scalar metrics at the given training step."""
        ...

    @abstractmethod
    def log_params(self, params: Dict) -> None:
        """Record hyperparameters / configuration values for the run."""
        ...

    @abstractmethod
    def log_artifact(self, path: str, name: str) -> None:
        """Attach the file at *path* to the run under *name*."""
        ...

    @abstractmethod
    def finish(self) -> None:
        """Close the tracking run and flush any pending data."""
        ...
class ModelRegistryPort(ABC):
    """Outbound (driven) port for model versioning / registry backends."""

    @abstractmethod
    def register_model(self, artifact: ModelArtifact) -> str:
        """Store *artifact* in the registry and return its versioned identifier."""
        ...

    @abstractmethod
    def get_model(self, name: str, version: str) -> ModelArtifact:
        """Fetch a previously registered model by name and version."""
        ...

    @abstractmethod
    def promote_model(self, name: str, version: str, stage: str) -> None:
        """Move a model version to a lifecycle stage (e.g. staging, production)."""
        ...
class DataLoaderPort(ABC):
    """Outbound (driven) port for supplying training and test datasets."""

    @abstractmethod
    def load_train_data(self) -> tuple:
        """Return the training split, typically as (features, labels)."""
        ...

    @abstractmethod
    def load_test_data(self) -> tuple:
        """Return the held-out test split, typically as (features, labels)."""
        ...
# Inbound Ports (Driving)
class TrainModelUseCase(ABC):
    """Inbound (driving) port: train a model from a configuration dict."""

    @abstractmethod
    def execute(self, config: Dict) -> ModelArtifact:
        """Run a full training job and return the resulting model artifact."""
        ...
class EvaluateModelUseCase(ABC):
    """Inbound (driving) port: evaluate a registered model version."""

    @abstractmethod
    def execute(self, model_name: str, version: str) -> Dict[str, float]:
        """Evaluate the given model version and return its metric values."""
        ...
# Marker so running this module confirms the port definitions loaded.
print("Ports defined — ready for adapter implementations")
W&B Adapter
# wandb_adapter.py — Weights & Biases adapter
import json
class WandbAdapter:
    """Holds reference adapter source for the tracking ports as string
    constants and prints excerpts of them (tutorial material — the listings
    are displayed, not executed)."""

    # W&B implementations of ExperimentTrackerPort and ModelRegistryPort,
    # kept as a printable code listing. NOTE: this text is runtime output.
    CODE = """
# adapters/wandb_tracker.py
import wandb
from ports import ExperimentTrackerPort, Experiment
class WandbExperimentTracker(ExperimentTrackerPort):
    def __init__(self, project: str, entity: str = None):
        self.project = project
        self.entity = entity
        self.run = None
    def init_experiment(self, experiment: Experiment) -> str:
        self.run = wandb.init(
            project=self.project,
            entity=self.entity,
            name=experiment.name,
            config=experiment.config,
            tags=list(experiment.tags.values()) if experiment.tags else None,
        )
        return self.run.id
    def log_metrics(self, metrics: dict, step: int) -> None:
        wandb.log(metrics, step=step)
    def log_params(self, params: dict) -> None:
        wandb.config.update(params)
    def log_artifact(self, path: str, name: str) -> None:
        artifact = wandb.Artifact(name, type='model')
        artifact.add_file(path)
        self.run.log_artifact(artifact)
    def finish(self) -> None:
        if self.run:
            wandb.finish()
# adapters/wandb_registry.py
class WandbModelRegistry(ModelRegistryPort):
    def __init__(self, project: str, entity: str = None):
        self.api = wandb.Api()
        self.project = project
        self.entity = entity
    def register_model(self, artifact: ModelArtifact) -> str:
        run = wandb.init(project=self.project, job_type='register')
        art = wandb.Artifact(artifact.name, type='model')
        art.add_file(artifact.path)
        art.metadata = artifact.metrics
        run.log_artifact(art)
        wandb.finish()
        return f"{artifact.name}:latest"
    def get_model(self, name: str, version: str) -> ModelArtifact:
        artifact = self.api.artifact(f"{self.entity}/{self.project}/{name}:{version}")
        path = artifact.download()
        return ModelArtifact(name=name, version=version, path=path, metrics=artifact.metadata)
    def promote_model(self, name: str, version: str, stage: str) -> None:
        artifact = self.api.artifact(f"{self.entity}/{self.project}/{name}:{version}")
        artifact.aliases.append(stage)
        artifact.save()
"""

    # Drop-in MLflow alternative for the same ExperimentTrackerPort — shows
    # that only the adapter changes, never the domain code.
    MLFLOW_ADAPTER = """
# adapters/mlflow_tracker.py — Alternative adapter
import mlflow
class MLflowExperimentTracker(ExperimentTrackerPort):
    def __init__(self, tracking_uri: str):
        mlflow.set_tracking_uri(tracking_uri)
    def init_experiment(self, experiment: Experiment) -> str:
        mlflow.set_experiment(experiment.name)
        run = mlflow.start_run()
        mlflow.log_params(experiment.config)
        return run.info.run_id
    def log_metrics(self, metrics: dict, step: int) -> None:
        mlflow.log_metrics(metrics, step=step)
    def log_params(self, params: dict) -> None:
        mlflow.log_params(params)
    def log_artifact(self, path: str, name: str) -> None:
        mlflow.log_artifact(path)
    def finish(self) -> None:
        mlflow.end_run()
"""

    def show_wandb(self):
        """Print the first 600 characters of the W&B adapter listing."""
        print("=== W&B Adapter ===")
        print(self.CODE[:600])

    def show_mlflow(self):
        """Print the first 400 characters of the MLflow adapter listing."""
        print(f"\n=== MLflow Adapter (swap!) ===")
        print(self.MLFLOW_ADAPTER[:400])
# Demo: print excerpts of the W&B and MLflow adapter listings.
adapter_demo = WandbAdapter()
adapter_demo.show_wandb()
adapter_demo.show_mlflow()
Domain Service — Training
# domain_service.py — Core training service
import json
class DomainService:
    """Holds the core training-service listing and its composition root as
    string constants, with methods to print excerpts (tutorial material)."""

    # Framework-agnostic training service: depends only on ports, never on
    # W&B/MLflow directly. NOTE: this text is runtime output, not executed.
    CODE = """
# domain/training_service.py — Core ML training logic
from ports import (
    ExperimentTrackerPort, ModelRegistryPort,
    DataLoaderPort, TrainModelUseCase,
    Experiment, ModelArtifact
)
class TrainingService(TrainModelUseCase):
    '''Core training logic — ไม่ depend บน W&B, MLflow, etc.'''
    def __init__(
        self,
        tracker: ExperimentTrackerPort,
        registry: ModelRegistryPort,
        data_loader: DataLoaderPort,
    ):
        self.tracker = tracker
        self.registry = registry
        self.data_loader = data_loader
    def execute(self, config: dict) -> ModelArtifact:
        # 1. Init experiment
        experiment = Experiment(
            name=config.get('experiment_name', 'default'),
            config=config,
            tags={'type': 'training'}
        )
        run_id = self.tracker.init_experiment(experiment)
        self.tracker.log_params(config)
        # 2. Load data
        X_train, y_train = self.data_loader.load_train_data()
        X_test, y_test = self.data_loader.load_test_data()
        # 3. Train model (framework-agnostic logic)
        model = self._build_model(config)
        for epoch in range(config.get('epochs', 10)):
            loss = self._train_epoch(model, X_train, y_train)
            val_metrics = self._evaluate(model, X_test, y_test)
            # Log via port — ไม่สนว่าเป็น W&B หรือ MLflow
            self.tracker.log_metrics({
                'train_loss': loss,
                **val_metrics
            }, step=epoch)
        # 4. Save & register model
        model_path = self._save_model(model, config)
        self.tracker.log_artifact(model_path, 'trained_model')
        artifact = ModelArtifact(
            name=config['model_name'],
            version='latest',
            path=model_path,
            metrics=val_metrics,
        )
        self.registry.register_model(artifact)
        # 5. Cleanup
        self.tracker.finish()
        return artifact
    def _build_model(self, config):
        # Pure domain logic — no external dependencies
        pass
    def _train_epoch(self, model, X, y):
        # Training logic
        return 0.5 # loss
    def _evaluate(self, model, X, y):
        return {'accuracy': 0.95, 'f1': 0.93}
    def _save_model(self, model, config):
        return f"/tmp/{config['model_name']}.pkl"
"""

    # Composition root: the single place where concrete adapters are wired
    # into the domain service (constructor injection).
    COMPOSITION_ROOT = """
# main.py — Composition root (dependency injection)
from domain.training_service import TrainingService
from adapters.wandb_tracker import WandbExperimentTracker, WandbModelRegistry
from adapters.csv_data_loader import CSVDataLoader
# Wire dependencies — เปลี่ยน adapter ที่นี่จุดเดียว
tracker = WandbExperimentTracker(project='my-ml-project')
registry = WandbModelRegistry(project='my-ml-project')
data_loader = CSVDataLoader(train_path='data/train.csv', test_path='data/test.csv')
# Inject into domain service
training_service = TrainingService(
    tracker=tracker,
    registry=registry,
    data_loader=data_loader,
)
# Execute — domain ไม่รู้เลยว่าใช้ W&B
result = training_service.execute({
    'model_name': 'classifier_v1',
    'experiment_name': 'baseline',
    'epochs': 50,
    'learning_rate': 0.001,
    'batch_size': 32,
})
# เปลี่ยนเป็น MLflow? แก้แค่ composition root!
# tracker = MLflowExperimentTracker(tracking_uri='http://mlflow:5000')
# registry = MLflowModelRegistry(tracking_uri='http://mlflow:5000')
"""

    def show_code(self):
        """Print the first 600 characters of the training-service listing."""
        print("=== Training Service ===")
        print(self.CODE[:600])

    def show_composition(self):
        """Print the first 500 characters of the composition-root listing."""
        print(f"\n=== Composition Root ===")
        print(self.COMPOSITION_ROOT[:500])
# Demo: print excerpts of the training service and its composition root.
service_demo = DomainService()
service_demo.show_code()
service_demo.show_composition()
Testing Strategy
# testing.py — Testing hexagonal ML system
import json
class TestingStrategy:
    """Holds a pytest listing (fake adapters + tests for TrainingService) as
    a string constant, plus helpers to print it and list the benefits."""

    # Fake in-memory adapters implement the same ports, so domain tests run
    # without W&B/MLflow. NOTE: this text is runtime output, not executed.
    CODE = """
# tests/test_training_service.py
import pytest
from domain.training_service import TrainingService
from ports import ExperimentTrackerPort, ModelRegistryPort, DataLoaderPort
# Fake adapters for testing — ไม่ต้อง connect W&B จริง!
class FakeTracker(ExperimentTrackerPort):
    def __init__(self):
        self.logged_metrics = []
        self.logged_params = {}
        self.artifacts = []
    def init_experiment(self, experiment):
        return 'fake-run-id'
    def log_metrics(self, metrics, step):
        self.logged_metrics.append({'metrics': metrics, 'step': step})
    def log_params(self, params):
        self.logged_params.update(params)
    def log_artifact(self, path, name):
        self.artifacts.append({'path': path, 'name': name})
    def finish(self):
        pass
class FakeRegistry(ModelRegistryPort):
    def __init__(self):
        self.models = {}
    def register_model(self, artifact):
        self.models[artifact.name] = artifact
        return f"{artifact.name}:v1"
    def get_model(self, name, version):
        return self.models.get(name)
    def promote_model(self, name, version, stage):
        pass
class FakeDataLoader(DataLoaderPort):
    def load_train_data(self):
        import numpy as np
        return np.random.randn(100, 10), np.random.randint(0, 2, 100)
    def load_test_data(self):
        import numpy as np
        return np.random.randn(20, 10), np.random.randint(0, 2, 20)
# Tests — fast, no external dependencies!
class TestTrainingService:
    def setup_method(self):
        self.tracker = FakeTracker()
        self.registry = FakeRegistry()
        self.data_loader = FakeDataLoader()
        self.service = TrainingService(
            tracker=self.tracker,
            registry=self.registry,
            data_loader=self.data_loader,
        )
    def test_training_logs_metrics(self):
        config = {'model_name': 'test_model', 'epochs': 5, 'learning_rate': 0.01}
        result = self.service.execute(config)
        assert len(self.tracker.logged_metrics) == 5 # 5 epochs
        assert 'learning_rate' in self.tracker.logged_params
    def test_training_registers_model(self):
        config = {'model_name': 'test_model', 'epochs': 3}
        result = self.service.execute(config)
        assert 'test_model' in self.registry.models
        assert result.name == 'test_model'
    def test_training_logs_artifact(self):
        config = {'model_name': 'test_model', 'epochs': 1}
        self.service.execute(config)
        assert len(self.tracker.artifacts) == 1
"""

    def show_tests(self):
        """Print the first 600 characters of the test listing."""
        print("=== Testing Strategy ===")
        print(self.CODE[:600])

    def benefits(self):
        """Print the bullet list of benefits hexagonal design gives tests."""
        print(f"\n=== Hexagonal Testing Benefits ===")
        # Local list shadows the method name — intentional, scoped to this call.
        benefits = [
            "Fast tests — ไม่ต้อง connect W&B/MLflow จริง (fake adapters)",
            "Isolated — test domain logic โดยไม่มี external dependencies",
            "Deterministic — fake data ให้ผลเหมือนกันทุกรอบ",
            "CI/CD friendly — ไม่ต้อง API keys, ไม่ต้อง network",
            "Swappable — เปลี่ยน W&B → MLflow → test ผ่านหมด (same ports)",
        ]
        for b in benefits:
            print(f" • {b}")
# Demo: print the testing-strategy listing and the benefits summary.
strategy_demo = TestingStrategy()
strategy_demo.show_tests()
strategy_demo.benefits()
FAQ - คำถามที่พบบ่อย
Q: Hexagonal Architecture จำเป็นสำหรับ ML projects ไหม?
A: ไม่จำเป็นทุก project: Small/POC: ไม่จำเป็น — เขียนตรงๆ เร็วกว่า Production ML systems: แนะนำมาก — เปลี่ยน tracking tool, data source, model store ได้ง่าย Team projects: จำเป็น — แบ่ง responsibilities ชัดเจน, test ง่าย กฎ: ถ้า project มีแนวโน้มจะเปลี่ยน infrastructure → ใช้ Hexagonal
Q: W&B กับ MLflow อันไหนดี?
A: W&B: UI สวย, collaboration ดีมาก, hosted (ไม่ต้อง manage), team features MLflow: Open source, self-hosted, free, customizable, community ใหญ่ W&B: เหมาะ teams ที่ต้องการ managed solution ($) MLflow: เหมาะ teams ที่ต้องการ control + ไม่มี budget ด้วย Hexagonal: เปลี่ยนได้ตลอด — ไม่ต้อง commit วันนี้
Q: Ports and Adapters ทำให้ code ซับซ้อนขึ้นไหม?
A: เพิ่ม boilerplate เล็กน้อย (interfaces + adapters) แต่: Tests เร็วขึ้น 10x (fake adapters), เปลี่ยน infrastructure ง่าย, team เข้าใจ boundaries ชัด, debug ง่ายกว่า Trade-off คุ้ม: เพิ่ม code 20% แต่ลด maintenance cost 50%+
Q: Dependency Injection ใน Python ทำยังไง?
A: Simple: Constructor injection (ส่ง dependencies ผ่าน __init__) Libraries: dependency-injector, inject, python-inject Recommended: Constructor injection สำหรับ ML projects — simple, explicit, ไม่ต้อง framework เพิ่ม ใน Composition Root (main.py): wire ทุก dependencies จุดเดียว
