Snyk Hexagonal Security
Snyk Code Security Hexagonal Architecture Ports Adapters SAST SCA Container IaC Domain Application Infrastructure CI/CD Production
| Snyk Product | Hex Layer | What It Checks | Example Finding |
|---|---|---|---|
| Snyk Code (SAST) | All Layers | Source Code Vulnerabilities | SQL Injection in DB Adapter |
| Snyk Open Source (SCA) | Infrastructure | Dependency CVEs | CVE in express lodash |
| Snyk Container | Infrastructure | Docker Image Vulnerabilities | CVE in base image OS package |
| Snyk IaC | Infrastructure | Terraform K8s Misconfig | Open Security Group S3 Public |
Hexagonal Architecture
# === Hexagonal Architecture Structure ===
# Project Structure:
# src/
# ├── domain/ # Core Business Logic (NO dependencies)
# │ ├── entities/ # User Order Product
# │ ├── value-objects/ # Email Password Money
# │ ├── services/ # DomainService
# │ └── ports/ # Interfaces (Repository, Gateway)
# ├── application/ # Use Cases (Orchestration)
# │ ├── use-cases/ # CreateOrder, AuthenticateUser
# │ ├── dto/ # Request/Response DTOs
# │ └── services/ # ApplicationService
# └── infrastructure/ # External Adapters
# ├── adapters/
# │ ├── http/ # Express/Fastify Controllers
# │ ├── database/ # PostgreSQL/MongoDB Repositories
# │ ├── queue/ # RabbitMQ/Kafka Producers
# │ └── external/ # Payment/Email API Clients
# └── config/ # Database Config, Env Variables
from dataclasses import dataclass
@dataclass
class HexLayer:
layer: str
responsibility: str
dependencies: str
security_concern: str
snyk_product: str
layers = [
HexLayer("Domain (Core)",
"Business Logic Entities Rules Validation",
"ไม่มี Dependencies ภายนอก (Pure Code)",
"Input Validation Business Rule Enforcement",
"Snyk Code: ตรวจ Logic Flaw Injection ใน Validation"),
HexLayer("Application (Use Cases)",
"Orchestrate Domain Logic Auth Authorization",
"Domain Layer Only",
"Authentication Authorization Audit Logging",
"Snyk Code: ตรวจ Auth Bypass Missing Authorization"),
HexLayer("Infrastructure (Adapters)",
"Database HTTP Queue External API Implementation",
"Application + Domain + External Libraries",
"SQL Injection XSS SSRF Dependency CVE",
"Snyk Code + Open Source + Container + IaC"),
HexLayer("Ports (Interfaces)",
"Define Contract ระหว่าง Layer",
"ไม่มี (Interface Only)",
"Input/Output Validation Contract Enforcement",
"Snyk Code: ตรวจ Type Safety Contract Violation"),
]
print("=== Hexagonal Layers ===")
for l in layers:
print(f"\n [{l.layer}] {l.responsibility}")
print(f" Deps: {l.dependencies}")
print(f" Security: {l.security_concern}")
print(f" Snyk: {l.snyk_product}")
Security per Layer
# === Security Implementation per Layer ===
# Domain Layer Security (Pure - No Framework)
# class Email:
# def __init__(self, value: str):
# if not re.match(r'^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$', value):
# raise ValueError("Invalid email format")
# self.value = value
#
# class Password:
# MIN_LENGTH = 12
# def __init__(self, raw: str):
# if len(raw) < self.MIN_LENGTH:
# raise ValueError(f"Password must be >= {self.MIN_LENGTH} chars")
# if not re.search(r'[A-Z]', raw):
# raise ValueError("Password must contain uppercase")
# self.hashed = bcrypt.hashpw(raw.encode(), bcrypt.gensalt())
# Application Layer Security
# class AuthenticateUser:
# def __init__(self, user_repo: UserRepository, token_service: TokenService):
# self.user_repo = user_repo
# self.token_service = token_service
#
# async def execute(self, email: str, password: str) -> AuthResult:
# user = await self.user_repo.find_by_email(Email(email))
# if not user or not user.verify_password(password):
# audit_log("AUTH_FAILED", email) # Audit
# raise AuthenticationError("Invalid credentials")
# audit_log("AUTH_SUCCESS", email)
# return AuthResult(token=self.token_service.generate(user))
@dataclass
class SecurityCheck:
layer: str
check: str
implementation: str
snyk_rule: str
checks = [
SecurityCheck("Domain",
"Input Validation (Email Password Amount)",
"Value Object Constructor Validation",
"Snyk Code: Improper Input Validation (CWE-20)"),
SecurityCheck("Domain",
"Business Rule Enforcement",
"Domain Service Invariant Check",
"Snyk Code: Business Logic Bypass"),
SecurityCheck("Application",
"Authentication (Login MFA Token)",
"Use Case + Token Service + User Repository",
"Snyk Code: Broken Authentication (CWE-287)"),
SecurityCheck("Application",
"Authorization (RBAC ABAC)",
"Use Case Decorator/Middleware",
"Snyk Code: Missing Authorization (CWE-862)"),
SecurityCheck("Infrastructure",
"SQL Injection Prevention",
"Parameterized Query ใน Database Adapter",
"Snyk Code: SQL Injection (CWE-89)"),
SecurityCheck("Infrastructure",
"XSS Prevention",
"Output Encoding ใน HTTP Adapter",
"Snyk Code: Cross-site Scripting (CWE-79)"),
SecurityCheck("Infrastructure",
"Dependency Vulnerabilities",
"Lock File + Version Pinning",
"Snyk Open Source: Known CVE Detection"),
]
print("=== Security Checks per Layer ===")
for c in checks:
print(f" [{c.layer}] {c.check}")
print(f" Impl: {c.implementation}")
print(f" Snyk: {c.snyk_rule}")
CI/CD Pipeline
# === Snyk Security Pipeline ===
# GitHub Actions
# name: Security Pipeline
# on: [push, pull_request]
# jobs:
# snyk-code:
# runs-on: ubuntu-latest
# steps:
# - uses: actions/checkout@v4
# - name: Snyk Code (SAST)
# uses: snyk/actions/node@master
# env:
# SNYK_TOKEN: }
# with:
# command: code test
# args: --severity-threshold=high
# snyk-oss:
# runs-on: ubuntu-latest
# steps:
# - uses: actions/checkout@v4
# - name: Snyk Open Source (SCA)
# uses: snyk/actions/node@master
# env:
# SNYK_TOKEN: }
# with:
# command: test
# args: --severity-threshold=high
# snyk-container:
# runs-on: ubuntu-latest
# steps:
# - name: Snyk Container
# uses: snyk/actions/docker@master
# env:
# SNYK_TOKEN: }
# with:
# image: my-app:latest
# args: --severity-threshold=high
@dataclass
class PipelineStage:
stage: str
snyk_command: str
hex_layer: str
fail_criteria: str
stages = [
PipelineStage("Snyk Code (SAST)",
"snyk code test --severity-threshold=high",
"Domain + Application + Infrastructure",
"Any High/Critical → Block PR"),
PipelineStage("Snyk Open Source (SCA)",
"snyk test --severity-threshold=high",
"Infrastructure (Dependencies)",
"High/Critical CVE → Block + Auto-fix PR"),
PipelineStage("Unit Tests",
"npm test -- --coverage",
"Domain + Application (90%+ Coverage)",
"Test Fail or Coverage < 90% → Block"),
PipelineStage("Snyk Container",
"snyk container test image:tag",
"Infrastructure (Docker)",
"Critical CVE in Base Image → Block"),
PipelineStage("Snyk IaC",
"snyk iac test terraform/",
"Infrastructure (Cloud Config)",
"High Severity Misconfig → Block"),
PipelineStage("DAST (Staging)",
"OWASP ZAP / Burp Suite scan staging URL",
"All Layers (Runtime)",
"Critical Finding → Block Release"),
]
print("=== Security Pipeline ===")
for s in stages:
print(f" [{s.stage}]")
print(f" Command: {s.snyk_command}")
print(f" Layer: {s.hex_layer}")
print(f" Fail: {s.fail_criteria}")
เคล็ดลับ
- Domain Pure: Domain Layer ต้องไม่มี External Dependency ลด Attack Surface
- Validation: Validate ใน Domain (Value Object) ไม่ใช่แค่ Controller
- Snyk Monitor: ใช้ snyk monitor ตรวจ Production Dependencies ต่อเนื่อง
- Auto-fix: เปิด Snyk Auto-fix PR สำหรับ Dependency Upgrade
- Port Security: กำหนด Security Contract ใน Port Interface
Snyk คืออะไร
Developer Security Platform SAST SCA Container IaC Code Dependencies Docker Terraform CVE Fix Recommendation CI/CD GitHub Free Team
Hexagonal Architecture คืออะไร
Ports Adapters Domain Application Infrastructure Core Business Logic Testable Flexible Maintainable Inbound Outbound Interface Contract
Security Layers ออกแบบอย่างไร
Domain Validation Value Object Application Auth RBAC Infrastructure SQL Injection XSS CSRF Port Contract Adapter Protocol Security
CI/CD Security Pipeline ทำอย่างไร
Snyk Code SAST Snyk OSS SCA Snyk Container IaC GitHub Actions Block PR Quality Gate Auto-fix Monitor DAST ZAP Burp Pipeline
สรุป
Snyk Code Security Hexagonal Architecture Ports Adapters SAST SCA Container Domain Application Infrastructure CI/CD Pipeline Production
