
CDK Construct Distributed System

2025-07-08 · อ. บอม — SiamCafe.net · 1,862 words

What Is a CDK Construct Distributed System?

AWS CDK (Cloud Development Kit) is an Infrastructure as Code framework that lets you define cloud infrastructure in programming languages such as Python, TypeScript, and Java instead of YAML/JSON. CDK Constructs are building blocks that encapsulate AWS resources together with best practices as reusable components. A distributed system is a system composed of multiple services running on multiple machines and communicating over a network. Combining CDK Constructs with distributed system patterns lets you build scalable, resilient cloud architectures in a systematic way.

CDK Construct Levels

# cdk_levels.py — CDK Construct levels

class CDKConstructLevels:
    LEVELS = {
        "l1": {
            "name": "L1 — CFN Resources",
            "description": "1:1 mapping to CloudFormation resources; low-level and verbose",
            "example": "CfnBucket, CfnFunction, CfnTable",
            "use_case": "When you need control over every property",
        },
        "l2": {
            "name": "L2 — Curated Constructs",
            "description": "AWS-maintained constructs with sensible defaults and helper methods",
            "example": "s3.Bucket, lambda_.Function, dynamodb.Table",
            "use_case": "The default choice; covers roughly 90% of day-to-day work",
        },
        "l3": {
            "name": "L3 — Patterns",
            "description": "High-level patterns combining multiple resources; opinionated architecture",
            "example": "LambdaRestApi, ApplicationLoadBalancedFargateService",
            "use_case": "Common architectures such as API + Lambda or ECS + ALB",
        },
        "custom": {
            "name": "Custom Constructs",
            "description": "Constructs you build yourself to encapsulate company best practices",
            "example": "MicroserviceConstruct, EventDrivenPipeline",
            "use_case": "Internal platform engineering; reuse across teams",
        },
    }

    def show_levels(self):
        print("=== CDK Construct Levels ===\n")
        for level in self.LEVELS.values():
            print(f"[{level['name']}]")
            print(f"  {level['description']}")
            print(f"  Example: {level['example']}")
            print(f"  Use case: {level['use_case']}")
            print()

levels = CDKConstructLevels()
levels.show_levels()

Distributed System Patterns

# patterns.py — Distributed system patterns with CDK

class DistributedPatterns:
    PATTERNS = {
        "event_driven": {
            "name": "Event-Driven Architecture",
            "description": "Services communicate via events; loose coupling, async processing",
            "aws_services": "EventBridge, SQS, SNS, Kinesis",
            "cdk": "EventBridge Rule → SQS Queue → Lambda Consumer",
        },
        "saga": {
            "name": "Saga Pattern",
            "description": "Distributed transactions; each step has a compensating action",
            "aws_services": "Step Functions, Lambda, DynamoDB",
            "cdk": "Step Functions state machine with error handling + compensation",
        },
        "cqrs": {
            "name": "CQRS (Command Query Responsibility Segregation)",
            "description": "Separate the write model (Command) from the read model (Query)",
            "aws_services": "DynamoDB (write) + DynamoDB Streams + OpenSearch (read)",
            "cdk": "DynamoDB Table + Stream + Lambda + OpenSearch",
        },
        "circuit_breaker": {
            "name": "Circuit Breaker",
            "description": "Prevents cascading failures; stop calling a service that keeps failing",
            "aws_services": "Step Functions, Lambda with retry logic",
            "cdk": "Step Functions with Retry + Catch + fallback Lambda",
        },
        "outbox": {
            "name": "Transactional Outbox",
            "description": "Reliable event publishing; write to the DB and an outbox table atomically",
            "aws_services": "DynamoDB + Streams + EventBridge",
            "cdk": "DynamoDB Table + Stream → Lambda → EventBridge",
        },
    }

    def show_patterns(self):
        print("=== Distributed System Patterns ===\n")
        for p in self.PATTERNS.values():
            print(f"[{p['name']}]")
            print(f"  {p['description']}")
            print(f"  AWS: {p['aws_services']}")
            print(f"  CDK: {p['cdk']}")
            print()

patterns = DistributedPatterns()
patterns.show_patterns()
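The table above maps Circuit Breaker to Step Functions, but the core state machine is easy to see in plain Python. The sketch below is illustrative only (the `CircuitBreaker` class, its thresholds, and method names are ours, not an AWS API): after a configured number of failures the breaker opens and calls short-circuit to a fallback until a cooldown elapses.

```python
# circuit_breaker.py — plain-Python sketch of the circuit breaker pattern
import time

class CircuitBreaker:
    """Stops calling a failing dependency until a cooldown elapses."""

    def __init__(self, failure_threshold=3, reset_timeout=30.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.opened_at = None  # None means the circuit is closed

    def call(self, fn, *args, fallback=None, **kwargs):
        if self.opened_at is not None:
            # Circuit is open: short-circuit to the fallback until cooldown passes
            if time.monotonic() - self.opened_at < self.reset_timeout:
                return fallback
            self.opened_at = None  # half-open: allow one trial call
        try:
            result = fn(*args, **kwargs)
            self.failures = 0  # a success closes the circuit
            return result
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.monotonic()  # trip the breaker
            return fallback

breaker = CircuitBreaker(failure_threshold=2, reset_timeout=60)

def flaky():
    raise RuntimeError("downstream unavailable")

print(breaker.call(flaky, fallback="cached"))   # → cached (failure 1)
print(breaker.call(flaky, fallback="cached"))   # → cached (failure 2, breaker trips)
print(breaker.opened_at is not None)            # → True (circuit is now open)
```

In Step Functions the same roles are played by Retry (the trial calls), Catch (detecting failure), and a fallback state (the cached response).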

CDK Microservice Construct

# microservice.py — CDK Microservice construct

class MicroserviceConstruct:
    CODE = """
# microservice_construct.py — Reusable CDK construct
from aws_cdk import (
    Duration, RemovalPolicy,
    aws_lambda as lambda_,
    aws_lambda_event_sources as event_sources,
    aws_dynamodb as dynamodb,
    aws_sqs as sqs,
    aws_sns as sns,
    aws_apigateway as apigw,
    aws_logs as logs,
)
from constructs import Construct

class MicroserviceConstruct(Construct):
    '''Reusable microservice pattern: API → Lambda → DynamoDB + Events'''
    
    def __init__(self, scope: Construct, id: str, *,
                 service_name: str,
                 table_partition_key: str = "id",
                 enable_events: bool = True,
                 memory_size: int = 256,
                 timeout_seconds: int = 30,
                 **kwargs):
        super().__init__(scope, id, **kwargs)
        
        # DynamoDB Table
        self.table = dynamodb.Table(
            self, "Table",
            table_name=f"{service_name}-table",
            partition_key=dynamodb.Attribute(
                name=table_partition_key,
                type=dynamodb.AttributeType.STRING
            ),
            billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,
            removal_policy=RemovalPolicy.RETAIN,
            point_in_time_recovery=True,
            stream=dynamodb.StreamViewType.NEW_AND_OLD_IMAGES if enable_events else None,
        )
        
        # Dead Letter Queue
        self.dlq = sqs.Queue(
            self, "DLQ",
            queue_name=f"{service_name}-dlq",
            retention_period=Duration.days(14),
        )
        
        # Lambda Function
        self.handler = lambda_.Function(
            self, "Handler",
            function_name=f"{service_name}-handler",
            runtime=lambda_.Runtime.PYTHON_3_12,
            handler="index.handler",
            code=lambda_.Code.from_asset(f"lambda/{service_name}"),
            memory_size=memory_size,
            timeout=Duration.seconds(timeout_seconds),
            environment={
                "TABLE_NAME": self.table.table_name,
                "SERVICE_NAME": service_name,
            },
            dead_letter_queue=self.dlq,
            tracing=lambda_.Tracing.ACTIVE,
            log_retention=logs.RetentionDays.ONE_MONTH,
        )
        
        # Grant permissions
        self.table.grant_read_write_data(self.handler)
        
        # API Gateway
        self.api = apigw.RestApi(
            self, "Api",
            rest_api_name=f"{service_name}-api",
            default_cors_preflight_options=apigw.CorsOptions(
                allow_origins=apigw.Cors.ALL_ORIGINS,
                allow_methods=apigw.Cors.ALL_METHODS,
            ),
        )
        
        resource = self.api.root.add_resource(service_name)
        resource.add_method("GET", apigw.LambdaIntegration(self.handler))
        resource.add_method("POST", apigw.LambdaIntegration(self.handler))
        
        item = resource.add_resource("{id}")
        item.add_method("GET", apigw.LambdaIntegration(self.handler))
        item.add_method("PUT", apigw.LambdaIntegration(self.handler))
        item.add_method("DELETE", apigw.LambdaIntegration(self.handler))
        
        # Event publishing (optional)
        if enable_events:
            self.event_topic = sns.Topic(
                self, "EventTopic",
                topic_name=f"{service_name}-events",
            )
            
            # DynamoDB Stream → Lambda → SNS
            self.stream_handler = lambda_.Function(
                self, "StreamHandler",
                function_name=f"{service_name}-stream",
                runtime=lambda_.Runtime.PYTHON_3_12,
                handler="stream.handler",
                code=lambda_.Code.from_asset(f"lambda/{service_name}"),
                environment={
                    "EVENT_TOPIC_ARN": self.event_topic.topic_arn,
                },
                timeout=Duration.seconds(60),
            )
            
            self.event_topic.grant_publish(self.stream_handler)
            
            # DynamoEventSource lives in aws_lambda_event_sources, not aws_lambda
            self.stream_handler.add_event_source(
                event_sources.DynamoEventSource(
                    self.table,
                    starting_position=lambda_.StartingPosition.TRIM_HORIZON,
                    batch_size=10,
                    retry_attempts=3,
                )
            )
"""

    def show_code(self):
        print("=== Microservice Construct ===")
        print(self.CODE[:600])

construct = MicroserviceConstruct()
construct.show_code()
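The construct wires the table's stream into a `stream.handler` Lambda whose body is not shown. As an illustration, the core transformation such a handler performs can be sketched in plain Python: deserializing a DynamoDB Streams record's typed attribute map into a plain event payload. The record shape follows the DynamoDB Streams event format; the boto3 SNS publish call is omitted, and `record_to_event` is our hypothetical helper name.

```python
# stream_transform.py — deserialize a DynamoDB Streams record into an event payload
# (illustrative helper for the stream handler; the SNS publish step is omitted)

def from_dynamodb(attr):
    """Convert one DynamoDB-typed attribute value to a plain Python value."""
    (type_tag, value), = attr.items()
    if type_tag == "S":
        return value
    if type_tag == "N":
        return float(value) if "." in value else int(value)
    if type_tag == "BOOL":
        return value
    if type_tag == "NULL":
        return None
    if type_tag == "L":
        return [from_dynamodb(v) for v in value]
    if type_tag == "M":
        return {k: from_dynamodb(v) for k, v in value.items()}
    raise ValueError(f"unsupported attribute type: {type_tag}")

def record_to_event(record, service_name):
    """Map one stream record to an event envelope for downstream consumers."""
    event_name = record["eventName"]  # INSERT | MODIFY | REMOVE
    image = record["dynamodb"].get("NewImage") or record["dynamodb"].get("OldImage", {})
    return {
        "source": service_name,
        "detail_type": f"item.{event_name.lower()}",
        "detail": {k: from_dynamodb(v) for k, v in image.items()},
    }

sample = {
    "eventName": "INSERT",
    "dynamodb": {"NewImage": {"id": {"S": "order-1"}, "total": {"N": "99.5"}}},
}
print(record_to_event(sample, "orders"))
# → {'source': 'orders', 'detail_type': 'item.insert', 'detail': {'id': 'order-1', 'total': 99.5}}
```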

Event-Driven Pipeline Construct

# event_pipeline.py — Event-driven pipeline construct

class EventPipelineConstruct:
    CODE = """
# event_pipeline_construct.py — CDK construct for event-driven pipeline
from aws_cdk import (
    Duration,
    aws_events as events,
    aws_events_targets as targets,
    aws_sqs as sqs,
    aws_lambda as lambda_,
    aws_stepfunctions as sfn,
    aws_stepfunctions_tasks as tasks,
)
from constructs import Construct

class EventDrivenPipeline(Construct):
    '''Event-driven processing pipeline with retry and DLQ'''
    
    def __init__(self, scope: Construct, id: str, *,
                 pipeline_name: str,
                 source_event_bus: str = "default",
                 event_pattern: dict = None,
                 **kwargs):
        super().__init__(scope, id, **kwargs)
        
        # DLQ for failed events
        self.dlq = sqs.Queue(
            self, "DLQ",
            queue_name=f"{pipeline_name}-dlq",
            retention_period=Duration.days(14),
        )
        
        # Processing queue with DLQ
        self.queue = sqs.Queue(
            self, "Queue",
            queue_name=f"{pipeline_name}-queue",
            visibility_timeout=Duration.minutes(5),
            dead_letter_queue=sqs.DeadLetterQueue(
                max_receive_count=3,
                queue=self.dlq,
            ),
        )
        
        # EventBridge Rule (custom bus only when one is named; default bus otherwise)
        bus = None
        if source_event_bus != "default":
            bus = events.EventBus.from_event_bus_name(self, "Bus", source_event_bus)
        self.rule = events.Rule(
            self, "Rule",
            rule_name=f"{pipeline_name}-rule",
            event_bus=bus,
            event_pattern=events.EventPattern(**event_pattern) if event_pattern else None,
        )
        self.rule.add_target(targets.SqsQueue(self.queue))
        
        # Step Functions for orchestration
        # Step 1: Validate
        validate = tasks.LambdaInvoke(
            self, "Validate",
            lambda_function=lambda_.Function(
                self, "ValidateFn",
                function_name=f"{pipeline_name}-validate",
                runtime=lambda_.Runtime.PYTHON_3_12,
                handler="validate.handler",
                code=lambda_.Code.from_asset(f"lambda/{pipeline_name}"),
            ),
            result_path="$.validation",
        )
        
        # Step 2: Process
        process = tasks.LambdaInvoke(
            self, "Process",
            lambda_function=lambda_.Function(
                self, "ProcessFn",
                function_name=f"{pipeline_name}-process",
                runtime=lambda_.Runtime.PYTHON_3_12,
                handler="process.handler",
                code=lambda_.Code.from_asset(f"lambda/{pipeline_name}"),
            ),
            result_path="$.result",
        ).add_retry(
            max_attempts=3,
            interval=Duration.seconds(5),
            backoff_rate=2,
        )
        
        # Step 3: Notify
        notify = tasks.LambdaInvoke(
            self, "Notify",
            lambda_function=lambda_.Function(
                self, "NotifyFn",
                function_name=f"{pipeline_name}-notify",
                runtime=lambda_.Runtime.PYTHON_3_12,
                handler="notify.handler",
                code=lambda_.Code.from_asset(f"lambda/{pipeline_name}"),
            ),
        )
        
        # Error handling
        handle_error = tasks.LambdaInvoke(
            self, "HandleError",
            lambda_function=lambda_.Function(
                self, "ErrorFn",
                function_name=f"{pipeline_name}-error",
                runtime=lambda_.Runtime.PYTHON_3_12,
                handler="error.handler",
                code=lambda_.Code.from_asset(f"lambda/{pipeline_name}"),
            ),
        )
        
        # Chain
        definition = validate.next(
            process.add_catch(handle_error, result_path="$.error")
        ).next(notify)
        
        self.state_machine = sfn.StateMachine(
            self, "Pipeline",
            state_machine_name=f"{pipeline_name}-pipeline",
            definition_body=sfn.DefinitionBody.from_chainable(definition),
            timeout=Duration.minutes(30),
        )
"""

    def show_code(self):
        print("=== Event Pipeline Construct ===")
        print(self.CODE[:600])

pipeline = EventPipelineConstruct()
pipeline.show_code()
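The `add_retry` settings in the Process step (max_attempts=3, interval of 5 seconds, backoff_rate=2) follow Step Functions semantics: the wait before attempt n is interval * backoff_rate**(n - 1). A quick plain-Python check of the resulting schedule:

```python
# retry_schedule.py — compute the wait schedule implied by Step Functions retry settings

def retry_schedule(max_attempts, interval, backoff_rate):
    """Seconds to wait before each retry attempt (exponential backoff)."""
    return [interval * backoff_rate ** n for n in range(max_attempts)]

# Matches the pipeline's settings: max_attempts=3, interval=5s, backoff_rate=2
print(retry_schedule(3, 5, 2))  # → [5, 10, 20]
```

With these defaults a failing task is retried after 5, 10, and 20 seconds before the Catch route sends it to the error handler.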

Testing & Deployment

# testing.py — CDK testing and deployment

class CDKTesting:
    TEST_CODE = """
# test_constructs.py — CDK construct tests
import aws_cdk as cdk
from aws_cdk import assertions

class TestMicroserviceConstruct:
    def setup_method(self):
        self.app = cdk.App()
        self.stack = cdk.Stack(self.app, "TestStack")
    
    def test_creates_dynamodb_table(self):
        '''Test DynamoDB table is created'''
        from constructs.microservice import MicroserviceConstruct
        
        MicroserviceConstruct(self.stack, "Test",
            service_name="orders",
        )
        
        template = assertions.Template.from_stack(self.stack)
        template.has_resource_properties("AWS::DynamoDB::Table", {
            "TableName": "orders-table",
            "BillingMode": "PAY_PER_REQUEST",
        })
    
    def test_creates_lambda_function(self):
        '''Test Lambda function is created'''
        from constructs.microservice import MicroserviceConstruct
        
        MicroserviceConstruct(self.stack, "Test",
            service_name="orders",
            memory_size=512,
        )
        
        template = assertions.Template.from_stack(self.stack)
        template.has_resource_properties("AWS::Lambda::Function", {
            "FunctionName": "orders-handler",
            "MemorySize": 512,
            "Runtime": "python3.12",
        })
    
    def test_creates_api_gateway(self):
        '''Test API Gateway is created'''
        from constructs.microservice import MicroserviceConstruct
        
        MicroserviceConstruct(self.stack, "Test",
            service_name="orders",
        )
        
        template = assertions.Template.from_stack(self.stack)
        template.resource_count_is("AWS::ApiGateway::RestApi", 1)
    
    def test_event_topic_when_enabled(self):
        '''Test SNS topic created when events enabled'''
        from constructs.microservice import MicroserviceConstruct
        
        MicroserviceConstruct(self.stack, "Test",
            service_name="orders",
            enable_events=True,
        )
        
        template = assertions.Template.from_stack(self.stack)
        template.has_resource_properties("AWS::SNS::Topic", {
            "TopicName": "orders-events",
        })

# pytest test_constructs.py -v
"""

    DEPLOY_COMMANDS = """
# CDK deployment commands

# Install CDK
npm install -g aws-cdk

# Bootstrap (first time per account/region)
cdk bootstrap aws://ACCOUNT_ID/REGION

# Synthesize (generate CloudFormation)
cdk synth

# Diff (compare with deployed)
cdk diff

# Deploy
cdk deploy --all

# Deploy specific stack
cdk deploy OrdersServiceStack

# Destroy
cdk destroy --all
"""

    def show_tests(self):
        print("=== CDK Tests ===")
        print(self.TEST_CODE[:500])

    def show_deploy(self):
        print(f"\n=== Deployment ===")
        print(self.DEPLOY_COMMANDS[:400])

testing = CDKTesting()
testing.show_tests()
testing.show_deploy()

FAQ: Frequently Asked Questions

Q: CDK or Terraform: which is better?

A: CDK uses real programming languages (Python, TypeScript), is AWS-native, has strong constructs, and is easy to test. Terraform is multi-cloud, with a mature ecosystem, the HCL language, and a huge community. Choose CDK if you are AWS-only, your team are developers, and you want reusable constructs. Choose Terraform if you are multi-cloud, your team already has Terraform experience, and you want robust state management.

Q: How large should a custom construct be?

A: Single responsibility: a construct should do one job, e.g. MicroserviceConstruct or DataPipelineConstruct. Not too big: if a construct contains 20+ resources, split it into sub-constructs. Composable: small constructs should combine into larger ones, like LEGO bricks. Testable: it must be easy to unit test.

Q: What makes distributed systems hard?

A: Network: latency, partitions, and failures; mitigate with retries and circuit breakers. Consistency: you get eventual consistency; use the saga pattern or CQRS. Observability: it is hard to see how far a request got; use X-Ray distributed tracing. Testing: integration tests are complex; use LocalStack or CDK assertions. CDK helps by encapsulating these patterns as constructs, so teams can use them without knowing every detail.
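The saga pattern mentioned here can be sketched in a few lines of plain Python: each step registers a compensating action, and when a step fails the completed steps are undone in reverse order. This is a simplified illustration of the idea (the `Saga` class and step names are ours), not the Step Functions implementation.

```python
# saga_sketch.py — plain-Python sketch of the saga pattern with compensation

class Saga:
    def __init__(self):
        self.steps = []  # list of (action, compensation) pairs

    def add_step(self, action, compensation):
        self.steps.append((action, compensation))
        return self

    def run(self, log):
        completed = []
        for action, compensation in self.steps:
            try:
                action(log)
                completed.append(compensation)
            except Exception:
                # Undo the completed steps in reverse order, then report failure
                for undo in reversed(completed):
                    undo(log)
                return False
        return True

def fail_shipping(l):
    raise RuntimeError("ship failed")

log = []
saga = (
    Saga()
    .add_step(lambda l: l.append("reserve-stock"), lambda l: l.append("release-stock"))
    .add_step(lambda l: l.append("charge-card"), lambda l: l.append("refund-card"))
    .add_step(fail_shipping, lambda l: None)
)
print(saga.run(log))  # → False
print(log)            # → ['reserve-stock', 'charge-card', 'refund-card', 'release-stock']
```

In the Step Functions version, each step's Catch route would invoke the compensating Lambdas in the same reverse order.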

Q: Event-driven or synchronous: which is better?

A: Event-driven gives loose coupling, scalability, and resilience, at the cost of eventual consistency and harder debugging. Synchronous is simple with immediate responses and strong consistency, but brings tight coupling and cascading failures. Use event-driven for background processing, cross-service communication, and high throughput. Use synchronous for user-facing APIs that need an immediate response. Most systems mix both: sync for queries, async for commands.
