
CDK Construct Batch Processing Pipeline: Building Reusable Infrastructure on AWS

2026-03-01 · อ. บอม — SiamCafe.net · 1,224 words

What Is an AWS CDK Construct for Batch Processing?

AWS CDK (Cloud Development Kit) is an Infrastructure as Code framework that lets you define cloud resources in familiar programming languages (TypeScript, Python, Java, C#) instead of raw YAML/JSON, and package them as reusable infrastructure components. A CDK Construct is the basic building block: it encapsulates one or more AWS resources, together with their configuration, into a single component.

A Batch Processing Pipeline handles work that is processed in batches rather than in real time, such as ETL jobs, report generation, image/video processing, and machine learning training. AWS offers several services for this: AWS Batch for containerized batch workloads, AWS Step Functions for orchestration, AWS Lambda for lightweight processing, Amazon SQS for message queuing, and Amazon S3 for data storage.

A CDK Construct for batch processing combines these services into a single reusable component, so teams can stand up the same infrastructure without repeating boilerplate code, while the construct enforces best practices (encryption, logging, monitoring) by default.

Setting Up the CDK Project

Set up a CDK project for batch processing:

# === CDK Project Setup ===

# 1. Install CDK CLI
npm install -g aws-cdk

# 2. Create CDK Project (TypeScript)
mkdir batch-pipeline && cd batch-pipeline
cdk init app --language typescript

# 3. Install Dependencies (the aws-batch module is stable in aws-cdk-lib v2,
# so no alpha packages are needed; the code below imports from aws-cdk-lib only)
npm install aws-cdk-lib constructs

# 4. Project Structure
cat > project-structure.txt << 'EOF'
batch-pipeline/
  lib/
    constructs/
      batch-job.ts          # Reusable Batch Job construct
      processing-pipeline.ts # Full pipeline construct
      monitoring.ts          # Monitoring construct
    stacks/
      batch-stack.ts         # Main stack
      monitoring-stack.ts    # Monitoring stack
  bin/
    app.ts                   # CDK app entry point
  test/
    batch-job.test.ts        # Unit tests
  cdk.json
  tsconfig.json
  package.json
EOF

# 5. CDK Configuration
cat > cdk.json << 'EOF'
{
  "app": "npx ts-node --prefer-ts-exts bin/app.ts",
  "context": {
    "@aws-cdk/core:stackRelativeExports": true,
    "@aws-cdk/aws-lambda:recognizeVersionProps": true
  }
}
EOF

# 6. Verify
cdk --version
cdk ls

echo "CDK project initialized"

Creating a Batch Processing Construct

A reusable CDK Construct for batch jobs:

// === CDK Batch Processing Construct ===

// File: lib/constructs/batch-job.ts
import * as cdk from 'aws-cdk-lib';
import * as batch from 'aws-cdk-lib/aws-batch';
import * as ec2 from 'aws-cdk-lib/aws-ec2';
import * as ecs from 'aws-cdk-lib/aws-ecs';
import * as iam from 'aws-cdk-lib/aws-iam';
import * as s3 from 'aws-cdk-lib/aws-s3';
import * as logs from 'aws-cdk-lib/aws-logs';
import { Construct } from 'constructs';

export interface BatchJobProps {
  readonly jobName: string;
  readonly containerImage: string;
  readonly vcpus: number;
  readonly memoryMiB: number;
  readonly environment?: Record<string, string>;
  readonly inputBucket: s3.IBucket;
  readonly outputBucket: s3.IBucket;
  readonly maxRetries?: number;
  readonly timeoutMinutes?: number;
  readonly useSpot?: boolean;
}

export class BatchJob extends Construct {
  public readonly jobDefinition: batch.EcsJobDefinition;
  public readonly jobQueue: batch.JobQueue;
  public readonly computeEnvironment: batch.ManagedEc2EcsComputeEnvironment;

  constructor(scope: Construct, id: string, props: BatchJobProps) {
    super(scope, id);

    // VPC
    const vpc = new ec2.Vpc(this, 'BatchVpc', {
      maxAzs: 2,
      natGateways: 1,
    });

    // Log Group
    const logGroup = new logs.LogGroup(this, 'BatchLogs', {
      logGroupName: `/batch/${props.jobName}`,
      retention: logs.RetentionDays.ONE_MONTH,
      removalPolicy: cdk.RemovalPolicy.DESTROY,
    });

    // IAM Role for Batch Jobs
    const jobRole = new iam.Role(this, 'JobRole', {
      assumedBy: new iam.ServicePrincipal('ecs-tasks.amazonaws.com'),
    });
    props.inputBucket.grantRead(jobRole);
    props.outputBucket.grantWrite(jobRole);

    // Compute Environment
    this.computeEnvironment = new batch.ManagedEc2EcsComputeEnvironment(this, 'ComputeEnv', {
      vpc,
      instanceTypes: [
        ec2.InstanceType.of(ec2.InstanceClass.C6I, ec2.InstanceSize.XLARGE),
        ec2.InstanceType.of(ec2.InstanceClass.M6I, ec2.InstanceSize.XLARGE),
      ],
      maxvCpus: 256,
      minvCpus: 0,
      spot: props.useSpot ?? true,
      // spotBidPercentage is only valid when Spot purchasing is enabled
      spotBidPercentage: (props.useSpot ?? true) ? 80 : undefined,
    });

    // Job Queue
    this.jobQueue = new batch.JobQueue(this, 'JobQueue', {
      computeEnvironments: [{
        computeEnvironment: this.computeEnvironment,
        order: 1,
      }],
      priority: 10,
    });

    // Container Definition
    const container = new batch.EcsEc2ContainerDefinition(this, 'Container', {
      image: ecs.ContainerImage.fromRegistry(props.containerImage),
      cpu: props.vcpus,
      memory: cdk.Size.mebibytes(props.memoryMiB),
      jobRole,
      environment: {
        INPUT_BUCKET: props.inputBucket.bucketName,
        OUTPUT_BUCKET: props.outputBucket.bucketName,
        ...props.environment,
      },
      logging: ecs.LogDrivers.awsLogs({
        logGroup,
        streamPrefix: props.jobName,
      }),
    });

    // Job Definition
    this.jobDefinition = new batch.EcsJobDefinition(this, 'JobDef', {
      container,
      retryAttempts: props.maxRetries ?? 3,
      timeout: cdk.Duration.minutes(props.timeoutMinutes ?? 60),
    });

    // Outputs
    new cdk.CfnOutput(this, 'JobQueueArn', { value: this.jobQueue.jobQueueArn });
    new cdk.CfnOutput(this, 'JobDefArn', { value: this.jobDefinition.jobDefinitionArn });
  }
}
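
Once the construct is deployed, a job can be submitted against the queue and job definition it exposes via CfnOutput. The sketch below builds the `submit_job` request shape AWS Batch expects; the ARNs and names are hypothetical placeholders, and the actual boto3 call is isolated so the rest runs without AWS credentials:

```python
import json

def build_submit_job_request(job_name, job_queue, job_definition, env):
    """Build kwargs for batch.submit_job with per-job environment overrides."""
    return {
        "jobName": job_name,
        "jobQueue": job_queue,
        "jobDefinition": job_definition,
        "containerOverrides": {
            # AWS Batch expects environment as a list of name/value pairs
            "environment": [{"name": k, "value": v} for k, v in env.items()],
        },
    }

def submit(request):
    """Submit the job (requires AWS credentials; not called in this sketch)."""
    import boto3  # imported lazily so the sketch runs without boto3 installed
    return boto3.client("batch").submit_job(**request)

request = build_submit_job_request(
    job_name="report-gen-2026-03-01",
    job_queue="arn:aws:batch:ap-southeast-1:123456789012:job-queue/JobQueue",  # hypothetical
    job_definition="arn:aws:batch:ap-southeast-1:123456789012:job-definition/JobDef:1",  # hypothetical
    env={"INPUT_KEY": "input/2026-03-01.csv"},
)
print(json.dumps(request, indent=2))
```

In production the queue and definition values would be read from the stack outputs rather than hardcoded.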

Step Functions Orchestration

Orchestrating the batch pipeline with Step Functions:

// === Step Functions Pipeline Construct ===

// File: lib/constructs/processing-pipeline.ts
import * as cdk from 'aws-cdk-lib';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as tasks from 'aws-cdk-lib/aws-stepfunctions-tasks';
import * as batch from 'aws-cdk-lib/aws-batch';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as sns from 'aws-cdk-lib/aws-sns';
import * as s3 from 'aws-cdk-lib/aws-s3';
import { Construct } from 'constructs';

export interface ProcessingPipelineProps {
  readonly pipelineName: string;
  readonly inputBucket: s3.IBucket;
  readonly outputBucket: s3.IBucket;
  readonly jobDefinition: batch.IJobDefinition;
  readonly jobQueue: batch.IJobQueue;
  readonly notificationTopic?: sns.ITopic;
}

export class ProcessingPipeline extends Construct {
  public readonly stateMachine: sfn.StateMachine;

  constructor(scope: Construct, id: string, props: ProcessingPipelineProps) {
    super(scope, id);

    // Validation Lambda
    const validateFn = new lambda.Function(this, 'ValidateFn', {
      runtime: lambda.Runtime.PYTHON_3_12,
      handler: 'index.handler',
      code: lambda.Code.fromInline(`
import json
from datetime import datetime, timezone
def handler(event, context):
    input_key = event.get('inputKey', '')
    if not input_key:
        raise ValueError('inputKey is required')
    return {
        'status': 'validated',
        'inputKey': input_key,
        'timestamp': datetime.now(timezone.utc).isoformat(),
    }
      `),
      timeout: cdk.Duration.seconds(30),
    });

    // Post-processing Lambda
    const postProcessFn = new lambda.Function(this, 'PostProcessFn', {
      runtime: lambda.Runtime.PYTHON_3_12,
      handler: 'index.handler',
      code: lambda.Code.fromInline(`
import json
def handler(event, context):
    return {
        'status': 'completed',
        'outputKey': event.get('outputKey', 'output/result.json'),
        'processingTime': 120,
    }
      `),
      timeout: cdk.Duration.seconds(30),
    });

    // Step Function Tasks
    const validateStep = new tasks.LambdaInvoke(this, 'Validate Input', {
      lambdaFunction: validateFn,
      outputPath: '$.Payload',
    });

    const batchStep = new tasks.BatchSubmitJob(this, 'Run Batch Job', {
      jobDefinitionArn: props.jobDefinition.jobDefinitionArn,
      jobQueueArn: props.jobQueue.jobQueueArn,
      jobName: sfn.JsonPath.format('batch-{}', sfn.JsonPath.stringAt('$.inputKey')),
      containerOverrides: {
        environment: {
          INPUT_KEY: sfn.JsonPath.stringAt('$.inputKey'),
        },
      },
      integrationPattern: sfn.IntegrationPattern.RUN_JOB,
    });

    const postProcessStep = new tasks.LambdaInvoke(this, 'Post Process', {
      lambdaFunction: postProcessFn,
      outputPath: '$.Payload',
    });

    const successState = new sfn.Succeed(this, 'Pipeline Complete');
    const failState = new sfn.Fail(this, 'Pipeline Failed', {
      cause: 'Batch job failed after retries',
    });

    // Error handling
    batchStep.addCatch(failState, { resultPath: '$.error' });

    // Pipeline definition
    const definition = validateStep
      .next(batchStep)
      .next(postProcessStep)
      .next(successState);

    // State Machine
    this.stateMachine = new sfn.StateMachine(this, 'Pipeline', {
      stateMachineName: props.pipelineName,
      definitionBody: sfn.DefinitionBody.fromChainable(definition),
      timeout: cdk.Duration.hours(4),
      tracingEnabled: true,
    });

    new cdk.CfnOutput(this, 'StateMachineArn', {
      value: this.stateMachine.stateMachineArn,
    });
  }
}
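
A caller triggers the pipeline by starting an execution of the state machine with a payload containing `inputKey`, which the Validate Input step checks first. A minimal sketch; the state-machine ARN would come from the StateMachineArn output, and the boto3 call is kept in a separate function so the input-building logic runs standalone:

```python
import json

def build_execution_input(input_key):
    """Build the JSON input the 'Validate Input' step expects."""
    if not input_key:
        raise ValueError("inputKey is required")
    return json.dumps({"inputKey": input_key})

def start_pipeline(state_machine_arn, input_key):
    """Start the Step Functions execution (requires AWS credentials; not called here)."""
    import boto3  # imported lazily so the sketch runs without boto3 installed
    sfn = boto3.client("stepfunctions")
    return sfn.start_execution(
        stateMachineArn=state_machine_arn,
        input=build_execution_input(input_key),
    )

payload = build_execution_input("input/2026-03-01.csv")
print(payload)
```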

Error Handling and Retry Logic

Handling errors and retries:

#!/usr/bin/env python3
# error_handling.py - Batch Pipeline Error Handling
import json
import logging
from typing import Dict, List

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("errors")

class BatchErrorHandler:
    def __init__(self):
        self.strategies = {}
    
    def error_handling_patterns(self):
        return {
            "step_functions_retry": {
                "description": "Built-in retry in Step Functions",
                "config": {
                    "Retry": [{
                        "ErrorEquals": ["States.TaskFailed", "Batch.JobFailed"],
                        "IntervalSeconds": 30,
                        "MaxAttempts": 3,
                        "BackoffRate": 2.0,
                    }],
                    "Catch": [{
                        "ErrorEquals": ["States.ALL"],
                        "ResultPath": "$.error",
                        "Next": "HandleError",
                    }],
                },
            },
            "dead_letter_queue": {
                "description": "Send failed messages to a DLQ for manual review",
                "sqs_config": {
                    "maxReceiveCount": 3,
                    "deadLetterTargetArn": "arn:aws:sqs:ap-southeast-1:123456:batch-dlq",
                },
            },
            "circuit_breaker": {
                "description": "Stop submitting jobs when the failure rate gets too high",
                "thresholds": {
                    "failure_rate": 50,
                    "consecutive_failures": 5,
                    "cooldown_minutes": 15,
                },
            },
            "partial_retry": {
                "description": "Retry only the failed items instead of reprocessing the whole batch",
                "implementation": "Track processed items in a DynamoDB checkpoint table",
            },
        }
    
    def categorize_error(self, error_type):
        """Categorize error and recommend action"""
        categories = {
            "transient": {
                "errors": ["Timeout", "ThrottlingException", "ServiceUnavailable", "InternalError"],
                "action": "RETRY with exponential backoff",
                "max_retries": 3,
            },
            "resource": {
                "errors": ["InsufficientCapacity", "OutOfMemory", "DiskFull"],
                "action": "RETRY with larger instance or FAIL",
                "max_retries": 1,
            },
            "data": {
                "errors": ["InvalidInput", "MalformedData", "SchemaViolation"],
                "action": "SKIP item, log to DLQ, continue batch",
                "max_retries": 0,
            },
            "permanent": {
                "errors": ["AccessDenied", "ResourceNotFound", "InvalidConfiguration"],
                "action": "FAIL immediately, alert team",
                "max_retries": 0,
            },
        }
        
        for category, info in categories.items():
            if error_type in info["errors"]:
                return {"category": category, **info}
        
        return {"category": "unknown", "action": "RETRY once then FAIL", "max_retries": 1}

handler = BatchErrorHandler()
patterns = handler.error_handling_patterns()
print("Error Handling Patterns:")
for name, info in patterns.items():
    print(f"  {name}: {info['description']}")

# Test error categorization
for error in ["Timeout", "InvalidInput", "AccessDenied", "OutOfMemory"]:
    result = handler.categorize_error(error)
    print(f"\n  {error} -> {result['category']}: {result['action']}")
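
The step_functions_retry pattern above (IntervalSeconds 30, BackoffRate 2.0, MaxAttempts 3) produces a fixed schedule of wait times between attempts. A small sketch computing that schedule, with optional jitter as commonly recommended for retries:

```python
import random

def retry_schedule(interval_seconds=30, backoff_rate=2.0, max_attempts=3, jitter=False):
    """Wait time before each retry attempt, mirroring a Step Functions Retry block."""
    waits = []
    for attempt in range(max_attempts):
        wait = interval_seconds * (backoff_rate ** attempt)
        if jitter:
            # Full jitter spreads retries out to avoid thundering-herd resubmission
            wait = random.uniform(0, wait)
        waits.append(wait)
    return waits

print(retry_schedule())  # [30.0, 60.0, 120.0]
```

Step Functions itself applies only the deterministic schedule; jitter is something you would add in client-side retry code.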

Monitoring and Cost Optimization

Track pipeline performance and optimize cost:

# === Monitoring & Cost Optimization ===

# 1. CloudWatch Dashboard (CDK)
cat > monitoring_construct.ts << 'EOF'
// File: lib/constructs/monitoring.ts
import * as cdk from 'aws-cdk-lib';
import * as cloudwatch from 'aws-cdk-lib/aws-cloudwatch';
import * as sns from 'aws-cdk-lib/aws-sns';
import { Construct } from 'constructs';

export class BatchMonitoring extends Construct {
  constructor(scope: Construct, id: string) {
    super(scope, id);

    const dashboard = new cloudwatch.Dashboard(this, 'Dashboard', {
      dashboardName: 'BatchProcessingPipeline',
    });

    // Metrics (note: AWS Batch does not publish these metrics out of the box;
    // they assume custom metrics emitted by your jobs or via Container Insights)
    const jobsSubmitted = new cloudwatch.Metric({
      namespace: 'AWS/Batch',
      metricName: 'JobsSubmitted',
      statistic: 'Sum',
      period: cdk.Duration.minutes(5),
    });

    const jobsFailed = new cloudwatch.Metric({
      namespace: 'AWS/Batch',
      metricName: 'JobsFailed',
      statistic: 'Sum',
      period: cdk.Duration.minutes(5),
    });

    // Dashboard Widgets
    dashboard.addWidgets(
      new cloudwatch.GraphWidget({
        title: 'Batch Jobs',
        left: [jobsSubmitted, jobsFailed],
        width: 12,
      }),
    );

    // Alarms
    const failureAlarm = new cloudwatch.Alarm(this, 'FailureAlarm', {
      metric: jobsFailed,
      threshold: 5,
      evaluationPeriods: 2,
      alarmDescription: 'Batch job failures exceeded threshold',
    });
  }
}
EOF

# 2. Cost Optimization Strategies
cat > cost_optimization.json << 'EOF'
{
  "cost_strategies": {
    "spot_instances": {
      "savings": "60-90% vs On-Demand",
      "config": "useSpot: true, spotBidPercentage: 80",
      "risk": "Jobs may be interrupted, use checkpointing"
    },
    "right_sizing": {
      "description": "Match instance type to workload",
      "cpu_intensive": "c6i instances",
      "memory_intensive": "r6i instances",
      "general": "m6i instances",
      "gpu": "p4d/g5 instances"
    },
    "scheduling": {
      "description": "Run batch jobs during off-peak hours",
      "off_peak": "Weekends, nights (lower spot prices)",
      "savings": "10-30% additional on spot"
    },
    "auto_scaling": {
      "min_vcpus": 0,
      "max_vcpus": 256,
      "description": "Scale to 0 when no jobs, scale up when needed"
    },
    "data_transfer": {
      "same_region": "Keep S3 and Batch in same region (free transfer)",
      "compression": "Compress data before transfer",
      "vpc_endpoints": "Use S3 VPC endpoint (free, no NAT cost)"
    }
  }
}
EOF

python3 -c "
import json
with open('cost_optimization.json') as f:
    data = json.load(f)
print('Cost Optimization:')
for name, info in data['cost_strategies'].items():
    if isinstance(info, dict):
        desc = info.get('description', info.get('savings', ''))
        print(f'  {name}: {desc}')
"

echo "Monitoring configured"
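
The spot_instances entry claims 60-90% savings versus On-Demand. A quick sketch of what that means for a monthly bill; the vCPU-hour volume and unit price below are illustrative assumptions, not real AWS quotes:

```python
def monthly_batch_cost(vcpu_hours, on_demand_per_vcpu_hour, spot_discount):
    """Estimate on-demand vs spot cost for a month of batch vCPU-hours."""
    on_demand = vcpu_hours * on_demand_per_vcpu_hour
    spot = on_demand * (1 - spot_discount)
    return on_demand, spot

# Illustrative: 10,000 vCPU-hours/month at a hypothetical $0.05/vCPU-hour, 70% spot discount
od, spot = monthly_batch_cost(10_000, 0.05, 0.70)
print(f"on-demand ${od:.2f} vs spot ${spot:.2f} (saves ${od - spot:.2f})")
```

The checkpointing caveat still applies: interrupted Spot jobs that restart from scratch eat into these savings.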

FAQ: Frequently Asked Questions

Q: How is a CDK Construct different from a CloudFormation Module?

A: A CDK Construct is written in a programming language (TypeScript, Python) with IDE support, type checking, and unit testing; it supports rich abstractions with loops, conditions, and functions, and compiles down to a CloudFormation template. A CloudFormation Module is written in YAML/JSON and registered in the CloudFormation Registry, where it is used like a resource type inside templates, without programming-language features. CDK Constructs are more flexible and more powerful, and suit teams that are more comfortable with a programming language than with pure CloudFormation.

Q: When should I use AWS Batch versus ECS Fargate for batch jobs?

A: AWS Batch is purpose-built for batch workloads: it provides job scheduling, job queues, and compute environment management, supports Spot instances, and auto-scales based on job queue depth, which makes it a good fit for large-scale batch processing, HPC, and ML training. ECS Fargate is general-purpose container orchestration: you manage scheduling yourself (for example with Step Functions), there is no built-in job queue, and auto-scaling must be set up manually, so it suits long-running services and microservices. Choose AWS Batch when jobs need a queue, Spot instances, or dependencies between jobs; choose Fargate when you have only a few jobs already orchestrated by Step Functions, or when simplicity matters most.

Q: What is the difference between L1, L2, and L3 Constructs in CDK?

A: L1 constructs (CloudFormation Resources) carry the Cfn prefix, e.g. CfnBucket; they map 1:1 to CloudFormation resources and require every property to be configured explicitly. L2 constructs (AWS Construct Library), e.g. s3.Bucket, provide sensible defaults and helper methods (grantRead, addLifecycleRule), and are built on top of L1. L3 constructs (Patterns), e.g. aws-ecs-patterns.ApplicationLoadBalancedFargateService, compose L2 constructs into complete solution patterns, trading customizability for convenience. Custom constructs (L2/L3), like the BatchJob construct in this article, are reusable components for your own organization. As a rule of thumb: use L2 by default, use L3 for common patterns, and build custom L3 constructs for organization-specific patterns.

Q: How do you test CDK Constructs?

A: CDK ships with a testing framework. Snapshot Testing compares the generated CloudFormation template against a stored snapshot to catch unintended changes, using Template.fromStack(stack).toJSON(). Fine-grained Assertions verify specific resources in the template, e.g. template.hasResourceProperties('AWS::Batch::JobDefinition', {...}). Validation Testing exercises the input validation of construct props. Integration Testing deploys to a test account and verifies real resources using the CDK integ-tests framework. As a baseline: write unit tests (snapshot + assertions) for every construct, add integration tests for critical paths, and run the tests in the CI/CD pipeline before deploying.
