CDK Construct Distributed System คืออะไร
AWS CDK (Cloud Development Kit) เป็น Infrastructure as Code framework ที่ใช้ programming languages เช่น Python, TypeScript, Java เขียน cloud infrastructure แทน YAML/JSON
CDK Constructs คือ building blocks ที่ encapsulate AWS resources และ best practices เข้าด้วยกัน เป็น reusable components
Distributed System คือระบบที่ประกอบด้วยหลาย services ทำงานร่วมกันบน multiple machines และสื่อสารกันผ่าน network
การรวม CDK Constructs กับ Distributed System patterns ช่วยให้สร้าง scalable, resilient cloud architectures ได้อย่างเป็นระบบ
CDK Construct Levels
# cdk_levels.py — CDK Construct levels
import json
class CDKConstructLevels:
    """Catalogue of the AWS CDK construct abstraction levels.

    ``LEVELS`` maps a short key to a record holding ``name``,
    ``description``, ``example`` and ``use_case`` strings.
    ``show_levels`` prints a summary of every record.
    """

    # Level records keyed by a short identifier. Dict insertion order is the
    # order in which show_levels() prints them.
    LEVELS = {
        "l1": {
            "name": "L1 — CFN Resources",
            "description": "1:1 mapping กับ CloudFormation resources — low-level, verbose",
            "example": "CfnBucket, CfnFunction, CfnTable",
            "use_case": "เมื่อต้องการ control ทุก property",
        },
        "l2": {
            "name": "L2 — Curated Constructs",
            "description": "AWS-maintained constructs ที่มี sensible defaults + helper methods",
            "example": "s3.Bucket, lambda_.Function, dynamodb.Table",
            "use_case": "ใช้เป็นหลัก — 90% ของงาน",
        },
        "l3": {
            "name": "L3 — Patterns",
            "description": "High-level patterns ที่รวมหลาย resources — opinionated architecture",
            "example": "LambdaRestApi, ApplicationLoadBalancedFargateService",
            "use_case": "Common architectures — API + Lambda, ECS + ALB",
        },
        "custom": {
            "name": "Custom Constructs",
            "description": "สร้าง constructs เอง — encapsulate company best practices",
            "example": "MicroserviceConstruct, EventDrivenPipeline",
            "use_case": "Internal platform engineering — reuse across teams",
        },
    }

    def show_levels(self) -> None:
        """Print the name, description and example of every level.

        The ``use_case`` field is stored in ``LEVELS`` but is not printed.
        """
        print("=== CDK Construct Levels ===\n")
        # Only the records are used, so iterate the values rather than
        # unpacking a key that is never read.
        for level in self.LEVELS.values():
            print(f"[{level['name']}]")
            print(f" {level['description']}")
            print(f" Example: {level['example']}")
            print()
# Demo driver: instantiate the catalogue and print it when the snippet runs.
levels = CDKConstructLevels()
levels.show_levels()
Distributed System Patterns
# patterns.py — Distributed system patterns with CDK
import json
class DistributedPatterns:
    """Catalogue of distributed-system patterns and how each maps onto AWS.

    ``PATTERNS`` maps a short key to a record holding ``name``,
    ``description``, ``aws_services`` and ``cdk`` strings.
    ``show_patterns`` prints every field of every record.
    """

    # Pattern records keyed by a short identifier. Dict insertion order is
    # the order in which show_patterns() prints them.
    PATTERNS = {
        "event_driven": {
            "name": "Event-Driven Architecture",
            "description": "Services สื่อสารผ่าน events — loose coupling, async processing",
            "aws_services": "EventBridge, SQS, SNS, Kinesis",
            "cdk": "EventBridge Rule → SQS Queue → Lambda Consumer",
        },
        "saga": {
            "name": "Saga Pattern",
            "description": "Distributed transactions — แต่ละ step มี compensating action",
            "aws_services": "Step Functions, Lambda, DynamoDB",
            "cdk": "Step Functions state machine กับ error handling + compensation",
        },
        "cqrs": {
            "name": "CQRS (Command Query Responsibility Segregation)",
            "description": "แยก write model (Command) กับ read model (Query)",
            # Uses the same service name as the "cdk" field below; the
            # original text mixed "ElasticSearch" and "OpenSearch".
            "aws_services": "DynamoDB (write) + DynamoDB Streams + OpenSearch (read)",
            "cdk": "DynamoDB Table + Stream + Lambda + OpenSearch",
        },
        "circuit_breaker": {
            "name": "Circuit Breaker",
            "description": "ป้องกัน cascading failures — หยุดเรียก service ที่ fail",
            "aws_services": "Step Functions, Lambda with retry logic",
            "cdk": "Step Functions with Retry + Catch + fallback Lambda",
        },
        "outbox": {
            "name": "Transactional Outbox",
            "description": "Reliable event publishing — write to DB + outbox table atomically",
            "aws_services": "DynamoDB + Streams + EventBridge",
            "cdk": "DynamoDB Table + Stream → Lambda → EventBridge",
        },
    }

    def show_patterns(self) -> None:
        """Print the name, description, AWS services and CDK wiring of each pattern."""
        print("=== Distributed System Patterns ===\n")
        # Only the records are used, so iterate the values rather than
        # unpacking a key that is never read.
        for pattern in self.PATTERNS.values():
            print(f"[{pattern['name']}]")
            print(f" {pattern['description']}")
            print(f" AWS: {pattern['aws_services']}")
            print(f" CDK: {pattern['cdk']}")
            print()
# Demo driver: instantiate the catalogue and print it when the snippet runs.
patterns = DistributedPatterns()
patterns.show_patterns()
CDK Microservice Construct
# microservice.py — CDK Microservice construct
import json
class MicroserviceConstruct:
    """Holds the source text of a sample CDK construct and prints a preview.

    ``CODE`` is plain text, not executed by this class. The sample it
    contains defines a CDK construct that creates a DynamoDB table, an SQS
    dead-letter queue, a Lambda handler behind an API Gateway REST API and,
    when ``enable_events`` is true, an SNS topic fed by a stream-handler
    Lambda attached to the table's stream.
    """

    # The sample is kept syntactically valid Python so it can be copied out
    # and checked with compile(). DynamoEventSource is referenced through
    # aws_lambda_event_sources, the module that provides it; aws_lambda has
    # no such class.
    CODE = """
# microservice_construct.py — Reusable CDK construct
from aws_cdk import (
    Stack, Duration, RemovalPolicy,
    aws_lambda as lambda_,
    aws_lambda_event_sources as event_sources,
    aws_dynamodb as dynamodb,
    aws_sqs as sqs,
    aws_sns as sns,
    aws_sns_subscriptions as sns_sub,
    aws_apigateway as apigw,
    aws_iam as iam,
    aws_logs as logs,
)
from constructs import Construct
class MicroserviceConstruct(Construct):
    '''Reusable microservice pattern: API → Lambda → DynamoDB + Events'''
    def __init__(self, scope: Construct, id: str, *,
                 service_name: str,
                 table_partition_key: str = "id",
                 enable_events: bool = True,
                 memory_size: int = 256,
                 timeout_seconds: int = 30,
                 **kwargs):
        super().__init__(scope, id, **kwargs)
        # DynamoDB Table
        self.table = dynamodb.Table(
            self, "Table",
            table_name=f"{service_name}-table",
            partition_key=dynamodb.Attribute(
                name=table_partition_key,
                type=dynamodb.AttributeType.STRING
            ),
            billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,
            removal_policy=RemovalPolicy.RETAIN,
            point_in_time_recovery=True,
            stream=dynamodb.StreamViewType.NEW_AND_OLD_IMAGES if enable_events else None,
        )
        # Dead Letter Queue
        self.dlq = sqs.Queue(
            self, "DLQ",
            queue_name=f"{service_name}-dlq",
            retention_period=Duration.days(14),
        )
        # Lambda Function
        self.handler = lambda_.Function(
            self, "Handler",
            function_name=f"{service_name}-handler",
            runtime=lambda_.Runtime.PYTHON_3_12,
            handler="index.handler",
            code=lambda_.Code.from_asset(f"lambda/{service_name}"),
            memory_size=memory_size,
            timeout=Duration.seconds(timeout_seconds),
            environment={
                "TABLE_NAME": self.table.table_name,
                "SERVICE_NAME": service_name,
            },
            dead_letter_queue=self.dlq,
            tracing=lambda_.Tracing.ACTIVE,
            log_retention=logs.RetentionDays.ONE_MONTH,
        )
        # Grant permissions
        self.table.grant_read_write_data(self.handler)
        # API Gateway
        self.api = apigw.RestApi(
            self, "Api",
            rest_api_name=f"{service_name}-api",
            default_cors_preflight_options=apigw.CorsOptions(
                allow_origins=apigw.Cors.ALL_ORIGINS,
                allow_methods=apigw.Cors.ALL_METHODS,
            ),
        )
        resource = self.api.root.add_resource(service_name)
        resource.add_method("GET", apigw.LambdaIntegration(self.handler))
        resource.add_method("POST", apigw.LambdaIntegration(self.handler))
        item = resource.add_resource("{id}")
        item.add_method("GET", apigw.LambdaIntegration(self.handler))
        item.add_method("PUT", apigw.LambdaIntegration(self.handler))
        item.add_method("DELETE", apigw.LambdaIntegration(self.handler))
        # Event publishing (optional)
        if enable_events:
            self.event_topic = sns.Topic(
                self, "EventTopic",
                topic_name=f"{service_name}-events",
            )
            # DynamoDB Stream → Lambda → SNS
            self.stream_handler = lambda_.Function(
                self, "StreamHandler",
                function_name=f"{service_name}-stream",
                runtime=lambda_.Runtime.PYTHON_3_12,
                handler="stream.handler",
                code=lambda_.Code.from_asset(f"lambda/{service_name}"),
                environment={
                    "EVENT_TOPIC_ARN": self.event_topic.topic_arn,
                },
                timeout=Duration.seconds(60),
            )
            self.event_topic.grant_publish(self.stream_handler)
            self.stream_handler.add_event_source(
                event_sources.DynamoEventSource(
                    self.table,
                    starting_position=lambda_.StartingPosition.TRIM_HORIZON,
                    batch_size=10,
                    retry_attempts=3,
                )
            )
"""

    def show_code(self) -> None:
        """Print a banner followed by the first 600 characters of ``CODE``.

        The slice is a fixed-length preview, so the output may end in the
        middle of a line.
        """
        print("=== Microservice Construct ===")
        print(self.CODE[:600])
# Demo driver: print the preview of the sample construct when the snippet runs.
construct = MicroserviceConstruct()
construct.show_code()
Event-Driven Pipeline Construct
# event_pipeline.py — Event-driven pipeline construct
import json
class EventPipelineConstruct:
    """Holds the source text of a sample event-pipeline construct and prints a preview.

    ``CODE`` is plain text, not executed by this class. The sample it
    contains defines a CDK construct with an SQS queue backed by a
    dead-letter queue, an EventBridge rule targeting that queue, and a Step
    Functions state machine chaining validate, process (with retry and a
    catch handler) and notify Lambda tasks.
    """

    # The sample is kept syntactically valid Python so it can be copied out
    # and checked with compile().
    # NOTE(review): the sample passes event_pattern=None to events.Rule when
    # the caller supplies no pattern — confirm a rule with neither a pattern
    # nor a schedule synthesizes.
    # NOTE(review): as written, the sample does not connect the queue to the
    # state machine; confirm whether that wiring is intentionally omitted.
    CODE = """
# event_pipeline_construct.py — CDK construct for event-driven pipeline
from aws_cdk import (
    Stack, Duration,
    aws_events as events,
    aws_events_targets as targets,
    aws_sqs as sqs,
    aws_lambda as lambda_,
    aws_stepfunctions as sfn,
    aws_stepfunctions_tasks as tasks,
)
from constructs import Construct
class EventDrivenPipeline(Construct):
    '''Event-driven processing pipeline with retry and DLQ'''
    def __init__(self, scope: Construct, id: str, *,
                 pipeline_name: str,
                 source_event_bus: str = "default",
                 event_pattern: dict = None,
                 **kwargs):
        super().__init__(scope, id, **kwargs)
        # DLQ for failed events
        self.dlq = sqs.Queue(
            self, "DLQ",
            queue_name=f"{pipeline_name}-dlq",
            retention_period=Duration.days(14),
        )
        # Processing queue with DLQ
        self.queue = sqs.Queue(
            self, "Queue",
            queue_name=f"{pipeline_name}-queue",
            visibility_timeout=Duration.minutes(5),
            dead_letter_queue=sqs.DeadLetterQueue(
                max_receive_count=3,
                queue=self.dlq,
            ),
        )
        # EventBridge Rule
        self.rule = events.Rule(
            self, "Rule",
            rule_name=f"{pipeline_name}-rule",
            event_bus=events.EventBus.from_event_bus_name(
                self, "Bus", source_event_bus
            ) if source_event_bus != "default" else None,
            event_pattern=events.EventPattern(**event_pattern) if event_pattern else None,
        )
        self.rule.add_target(targets.SqsQueue(self.queue))
        # Step Functions for orchestration
        # Step 1: Validate
        validate = tasks.LambdaInvoke(
            self, "Validate",
            lambda_function=lambda_.Function(
                self, "ValidateFn",
                function_name=f"{pipeline_name}-validate",
                runtime=lambda_.Runtime.PYTHON_3_12,
                handler="validate.handler",
                code=lambda_.Code.from_asset(f"lambda/{pipeline_name}"),
            ),
            result_path="$.validation",
        )
        # Step 2: Process
        process = tasks.LambdaInvoke(
            self, "Process",
            lambda_function=lambda_.Function(
                self, "ProcessFn",
                function_name=f"{pipeline_name}-process",
                runtime=lambda_.Runtime.PYTHON_3_12,
                handler="process.handler",
                code=lambda_.Code.from_asset(f"lambda/{pipeline_name}"),
            ),
            result_path="$.result",
        ).add_retry(
            max_attempts=3,
            interval=Duration.seconds(5),
            backoff_rate=2,
        )
        # Step 3: Notify
        notify = tasks.LambdaInvoke(
            self, "Notify",
            lambda_function=lambda_.Function(
                self, "NotifyFn",
                function_name=f"{pipeline_name}-notify",
                runtime=lambda_.Runtime.PYTHON_3_12,
                handler="notify.handler",
                code=lambda_.Code.from_asset(f"lambda/{pipeline_name}"),
            ),
        )
        # Error handling
        handle_error = tasks.LambdaInvoke(
            self, "HandleError",
            lambda_function=lambda_.Function(
                self, "ErrorFn",
                function_name=f"{pipeline_name}-error",
                runtime=lambda_.Runtime.PYTHON_3_12,
                handler="error.handler",
                code=lambda_.Code.from_asset(f"lambda/{pipeline_name}"),
            ),
        )
        # Chain
        definition = validate.next(
            process.add_catch(handle_error, result_path="$.error")
        ).next(notify)
        self.state_machine = sfn.StateMachine(
            self, "Pipeline",
            state_machine_name=f"{pipeline_name}-pipeline",
            definition_body=sfn.DefinitionBody.from_chainable(definition),
            timeout=Duration.minutes(30),
        )
"""

    def show_code(self) -> None:
        """Print a banner followed by the first 600 characters of ``CODE``.

        The slice is a fixed-length preview, so the output may end in the
        middle of a line.
        """
        print("=== Event Pipeline Construct ===")
        print(self.CODE[:600])
# Demo driver: print the preview of the sample pipeline when the snippet runs.
pipeline = EventPipelineConstruct()
pipeline.show_code()
Testing & Deployment
# testing.py — CDK testing and deployment
import json
class CDKTesting:
    """Holds sample CDK unit tests and deployment commands as text and prints previews.

    ``TEST_CODE`` is the source of a pytest-style test class that builds a
    stack containing ``MicroserviceConstruct`` and asserts on the synthesized
    template. ``DEPLOY_COMMANDS`` is a list of shell commands for the CDK
    CLI. Neither is executed by this class.
    """

    # The sample is kept syntactically valid Python so it can be copied out
    # and checked with compile().
    # NOTE(review): the sample imports from a local package named
    # `constructs`; that name looks like it collides with the `constructs`
    # library the CDK samples import `Construct` from — confirm the layout.
    TEST_CODE = """
# test_constructs.py — CDK construct tests
import aws_cdk as cdk
from aws_cdk import assertions
import pytest
class TestMicroserviceConstruct:
    def setup_method(self):
        self.app = cdk.App()
        self.stack = cdk.Stack(self.app, "TestStack")
    def test_creates_dynamodb_table(self):
        '''Test DynamoDB table is created'''
        from constructs.microservice import MicroserviceConstruct
        MicroserviceConstruct(self.stack, "Test",
            service_name="orders",
        )
        template = assertions.Template.from_stack(self.stack)
        template.has_resource_properties("AWS::DynamoDB::Table", {
            "TableName": "orders-table",
            "BillingMode": "PAY_PER_REQUEST",
        })
    def test_creates_lambda_function(self):
        '''Test Lambda function is created'''
        from constructs.microservice import MicroserviceConstruct
        MicroserviceConstruct(self.stack, "Test",
            service_name="orders",
            memory_size=512,
        )
        template = assertions.Template.from_stack(self.stack)
        template.has_resource_properties("AWS::Lambda::Function", {
            "FunctionName": "orders-handler",
            "MemorySize": 512,
            "Runtime": "python3.12",
        })
    def test_creates_api_gateway(self):
        '''Test API Gateway is created'''
        from constructs.microservice import MicroserviceConstruct
        MicroserviceConstruct(self.stack, "Test",
            service_name="orders",
        )
        template = assertions.Template.from_stack(self.stack)
        template.resource_count_is("AWS::ApiGateway::RestApi", 1)
    def test_event_topic_when_enabled(self):
        '''Test SNS topic created when events enabled'''
        from constructs.microservice import MicroserviceConstruct
        MicroserviceConstruct(self.stack, "Test",
            service_name="orders",
            enable_events=True,
        )
        template = assertions.Template.from_stack(self.stack)
        template.has_resource_properties("AWS::SNS::Topic", {
            "TopicName": "orders-events",
        })
# pytest test_constructs.py -v
"""

    # Shell commands for the CDK CLI, shown as reference text only.
    DEPLOY_COMMANDS = """
# CDK deployment commands
# Install CDK
npm install -g aws-cdk
# Bootstrap (first time per account/region)
cdk bootstrap aws://ACCOUNT_ID/REGION
# Synthesize (generate CloudFormation)
cdk synth
# Diff (compare with deployed)
cdk diff
# Deploy
cdk deploy --all
# Deploy specific stack
cdk deploy OrdersServiceStack
# Destroy
cdk destroy --all
"""

    def show_tests(self) -> None:
        """Print a banner followed by the first 500 characters of ``TEST_CODE``."""
        print("=== CDK Tests ===")
        print(self.TEST_CODE[:500])

    def show_deploy(self) -> None:
        """Print a banner followed by the first 400 characters of ``DEPLOY_COMMANDS``."""
        # Plain literal: the banner has no placeholders, so no f-string.
        print("\n=== Deployment ===")
        print(self.DEPLOY_COMMANDS[:400])
# Demo driver: print the test-sample preview, then the deployment commands.
testing = CDKTesting()
testing.show_tests()
testing.show_deploy()
FAQ - คำถามที่พบบ่อย
Q: CDK กับ Terraform อันไหนดีกว่า?
A:
- CDK: ใช้ programming languages (Python, TS), AWS-native, constructs ดี, testing ง่าย
- Terraform: multi-cloud, mature ecosystem, HCL language, huge community
- เลือก CDK: ถ้า AWS only + ทีมเป็น developers + ต้องการ reusable constructs
- เลือก Terraform: ถ้า multi-cloud + ทีมมี Terraform experience + ต้องการ state management ที่แข็งแรง
Q: Custom Construct ควรมีขนาดใหญ่แค่ไหน?
A:
- Single Responsibility: construct ควรทำหน้าที่เดียว เช่น MicroserviceConstruct, DataPipelineConstruct
- ไม่ใหญ่เกินไป: ถ้า construct มี 20+ resources → แยกเป็น sub-constructs
- Composable: constructs เล็กรวมกันเป็นใหญ่ได้ — เหมือน LEGO
- ทดสอบได้: ต้อง unit test ได้ง่าย
Q: Distributed System ยากตรงไหน?
A:
- Network: latency, partitions, failures — ใช้ retry + circuit breaker
- Consistency: eventual consistency — ใช้ saga pattern, CQRS
- Observability: ดูยากว่า request ไปถึงไหน — ใช้ X-Ray distributed tracing
- Testing: integration tests ซับซ้อน — ใช้ LocalStack หรือ CDK assertions
- CDK ช่วย: encapsulate patterns เป็น constructs → ทีมใช้ได้โดยไม่ต้องรู้ทุก detail
Q: Event-Driven กับ Synchronous อันไหนดีกว่า?
A:
- Event-Driven: loose coupling, scalable, resilient — แต่ eventual consistency, debug ยาก
- Synchronous: simple, immediate response, strong consistency — แต่ tight coupling, cascading failures
- ใช้ Event-Driven: background processing, cross-service communication, high throughput
- ใช้ Synchronous: user-facing APIs ที่ต้อง immediate response
- ส่วนใหญ่: ผสมกัน — sync สำหรับ queries, async สำหรับ commands
