ai

CDK Construct Audit Trail Logging

CDK Construct Audit Trail Logging

CDK Construct Audit Trail Logging คืออะไร

CDK Construct Audit Trail Logging

AWS CDK (Cloud Development Kit) เป็น Infrastructure as Code (IaC) framework ที่ใช้ภาษา programming จริง (TypeScript, Python, Java, C#) สร้าง AWS resources แทนการเขียน YAML/JSON CDK Construct เป็น building block พื้นฐานของ CDK ที่ encapsulate AWS resources และ configuration Audit Trail Logging คือการบันทึกทุกการเปลี่ยนแปลงของ infrastructure เพื่อ compliance, security และ troubleshooting การสร้าง CDK Construct สำหรับ audit trail ช่วยให้ทุก stack ที่ deploy มี logging built-in อัตโนมัติ

CDK Construct Basics

# cdk_basics.py — CDK Construct fundamentals

import json



class CDKBasics:

    CONSTRUCT_LEVELS = {

        "l1": {

            "name": "L1 Constructs (CFN Resources)",

            "description": "1:1 mapping กับ CloudFormation resources — low-level, verbose",

            "example": "CfnBucket, CfnFunction — ต้อง configure ทุก property เอง",

        },

        "l2": {

            "name": "L2 Constructs (Curated)",

            "description": "Higher-level abstractions — sensible defaults, helper methods",

            "example": "s3.Bucket, lambda_.Function — มี defaults ที่ดี + methods เช่น .grant_read()",

        },

        "l3": {

            "name": "L3 Constructs (Patterns)",

            "description": "Multi-resource patterns — combine หลาย resources เข้าด้วยกัน",

            "example": "LambdaRestApi, ApplicationLoadBalancedFargateService",

        },

    }



    CDK_COMMANDS = {

        "init": "cdk init app --language python",

        "synth": "cdk synth — generate CloudFormation template",

        "diff": "cdk diff — compare deployed vs local changes",

        "deploy": "cdk deploy — deploy stack to AWS",

        "destroy": "cdk destroy — remove stack from AWS",

    }



    def show_levels(self):

        print("=== CDK Construct Levels ===\n")

        for key, level in self.CONSTRUCT_LEVELS.items():

            print(f"[{level['name']}]")

            print(f"  {level['description']}")

            print(f"  Example: {level['example']}")

            print()



    def show_commands(self):

        print("=== CDK Commands ===")

        for cmd, desc in self.CDK_COMMANDS.items():

            print(f"  [{cmd}] {desc}")



basics = CDKBasics()

basics.show_levels()

basics.show_commands()

Audit Trail Construct (Python CDK)

# audit_construct.py — CDK Audit Trail Construct

import json



class AuditTrailConstruct:

    CODE = """

# audit_trail_construct.py — Reusable CDK Construct for Audit Trail

from constructs import Construct

from aws_cdk import (

    Stack, Duration, RemovalPolicy,

    aws_cloudtrail as cloudtrail,

    aws_s3 as s3,

    aws_logs as logs,

    aws_sns as sns,

    aws_sns_subscriptions as subs,

    aws_events as events,

    aws_events_targets as targets,

    aws_lambda as lambda_,

    aws_iam as iam,

    aws_dynamodb as dynamodb,

)



class AuditTrailConstruct(Construct):

    '''L3 Construct: Complete audit trail logging system'''

    

    def __init__(self, scope: Construct, id: str,

                 trail_name: str = "org-audit-trail",

                 retention_days: int = 365,

                 enable_data_events: bool = True,

                 alert_email: str = None,

                 **kwargs):

        super().__init__(scope, id, **kwargs)

        

        # === S3 Bucket for CloudTrail logs ===

        self.log_bucket = s3.Bucket(self, "AuditLogBucket",

            bucket_name=f"{trail_name}-logs",

            encryption=s3.BucketEncryption.S3_MANAGED,

            block_public_access=s3.BlockPublicAccess.BLOCK_ALL,

            versioned=True,

            lifecycle_rules=[

                s3.LifecycleRule(

                    transitions=[

                        s3.Transition(

                            storage_class=s3.StorageClass.GLACIER,

                            transition_after=Duration.days(90),

                        ),

                    ],

                    expiration=Duration.days(retention_days),

                ),

            ],

            removal_policy=RemovalPolicy.RETAIN,

        )

        

        # === CloudWatch Log Group ===

        self.log_group = logs.LogGroup(self, "AuditLogGroup",

            log_group_name=f"/audit/{trail_name}",

            retention=logs.RetentionDays.ONE_YEAR,

            removal_policy=RemovalPolicy.RETAIN,

        )

        

        # === CloudTrail ===

        self.trail = cloudtrail.Trail(self, "AuditTrail",

            trail_name=trail_name,

            bucket=self.log_bucket,

            cloud_watch_logs_group=self.log_group,

            send_to_cloud_watch_logs=True,

            is_multi_region_trail=True,

            include_global_service_events=True,

            enable_file_validation=True,

        )

        

        # Enable S3 data events

        if enable_data_events:

            self.trail.add_s3_event_selector(

                [cloudtrail.S3EventSelector(bucket=self.log_bucket)],

                include_management_events=True,

            )

        

        # === DynamoDB for structured audit logs ===

        self.audit_table = dynamodb.Table(self, "AuditTable",

            table_name=f"{trail_name}-events",

            partition_key=dynamodb.Attribute(

                name="event_id", type=dynamodb.AttributeType.STRING

            ),

            sort_key=dynamodb.Attribute(

                name="timestamp", type=dynamodb.AttributeType.STRING

            ),

            billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,

            point_in_time_recovery=True,

            removal_policy=RemovalPolicy.RETAIN,

        )

        

        # GSI for querying by user

        self.audit_table.add_global_secondary_index(

            index_name="by-user",

            partition_key=dynamodb.Attribute(

                name="user_arn", type=dynamodb.AttributeType.STRING

            ),

            sort_key=dynamodb.Attribute(

                name="timestamp", type=dynamodb.AttributeType.STRING

            ),

        )

        

        # === SNS Topic for alerts ===

        self.alert_topic = sns.Topic(self, "AuditAlertTopic",

            topic_name=f"{trail_name}-alerts",

        )

        

        if alert_email:

            self.alert_topic.add_subscription(

                subs.EmailSubscription(alert_email)

            )



# Usage in Stack:

# class MyStack(Stack):

#     def __init__(self, scope, id, **kwargs):

#         super().__init__(scope, id, **kwargs)

#         audit = AuditTrailConstruct(self, "Audit",

#             trail_name="prod-audit",

#             retention_days=730,

#             alert_email="security@company.com",

#         )

"""



    def show_code(self):

        print("=== Audit Trail Construct ===")

        print(self.CODE[:600])



construct = AuditTrailConstruct()

construct.show_code()

Event Processing Lambda

CDK Construct Audit Trail Logging
# event_lambda.py — Lambda for processing audit events

import json



class EventProcessingLambda:

    CODE = """

# audit_processor.py — Lambda function for audit event processing

import json

import boto3

import os

from datetime import datetime

import hashlib



dynamodb = boto3.resource('dynamodb')

sns = boto3.client('sns')



TABLE_NAME = os.environ.get('AUDIT_TABLE', 'audit-events')

ALERT_TOPIC = os.environ.get('ALERT_TOPIC_ARN', '')



# High-risk events that trigger alerts

HIGH_RISK_EVENTS = {

    'ConsoleLogin': 'Console login detected',

    'DeleteBucket': 'S3 bucket deletion',

    'DeleteTrail': 'CloudTrail deletion attempt',

    'StopLogging': 'CloudTrail logging stopped',

    'CreateUser': 'New IAM user created',

    'AttachUserPolicy': 'Policy attached to user',

    'PutBucketPolicy': 'S3 bucket policy changed',

    'AuthorizeSecurityGroupIngress': 'Security group rule added',

    'CreateAccessKey': 'New access key created',

    'DeleteDBInstance': 'RDS instance deleted',

}



def handler(event, context):

    '''Process CloudTrail events from CloudWatch Logs'''

    table = dynamodb.Table(TABLE_NAME)

    

    processed = 0

    alerts = 0

    

    for record in event.get('Records', []):

        # Parse CloudTrail event

        detail = json.loads(record.get('body', '{}'))

        

        event_name = detail.get('eventName', '')

        user_arn = detail.get('userIdentity', {}).get('arn', 'unknown')

        source_ip = detail.get('sourceIPAddress', '')

        event_time = detail.get('eventTime', datetime.utcnow().isoformat())

        region = detail.get('awsRegion', '')

        

        # Generate unique event ID

        event_id = hashlib.sha256(

            f"{event_time}{event_name}{user_arn}".encode()

        ).hexdigest()[:16]

        

        # Store in DynamoDB

        item = {

            'event_id': event_id,

            'timestamp': event_time,

            'event_name': event_name,

            'user_arn': user_arn,

            'source_ip': source_ip,

            'region': region,

            'event_source': detail.get('eventSource', ''),

            'request_params': json.dumps(detail.get('requestParameters', {}))[:1000],

            'response': json.dumps(detail.get('responseElements', {}))[:1000],

            'error_code': detail.get('errorCode', ''),

            'is_high_risk': event_name in HIGH_RISK_EVENTS,

        }

        

        table.put_item(Item=item)

        processed += 1

        

        # Alert on high-risk events

        if event_name in HIGH_RISK_EVENTS and ALERT_TOPIC:

            sns.publish(

                TopicArn=ALERT_TOPIC,

                Subject=f"AUDIT ALERT: {HIGH_RISK_EVENTS[event_name]}",

                Message=json.dumps({

                    'event': event_name,

                    'user': user_arn,

                    'ip': source_ip,

                    'region': region,

                    'time': event_time,

                }, indent=2),

            )

            alerts += 1

    

    return {

        'processed': processed,

        'alerts': alerts,

    }

"""



    def show_code(self):

        print("=== Event Processor Lambda ===")

        print(self.CODE[:600])



lam = EventProcessingLambda()

lam.show_code()

Query & Reporting

# reporting.py — Audit trail querying and reporting

import json

import random



class AuditReporting:

    CODE = """

# audit_query.py — Query audit trail data

import boto3

from boto3.dynamodb.conditions import Key, Attr

from datetime import datetime, timedelta

import json



class AuditQueryService:

    def __init__(self, table_name='audit-events'):

        dynamodb = boto3.resource('dynamodb')

        self.table = dynamodb.Table(table_name)

    

    def get_user_activity(self, user_arn, hours=24):

        '''Get all activity for a specific user'''

        since = (datetime.utcnow() - timedelta(hours=hours)).isoformat()

        

        response = self.table.query(

            IndexName='by-user',

            KeyConditionExpression=Key('user_arn').eq(user_arn) & Key('timestamp').gte(since),

        )

        return response['Items']

    

    def get_high_risk_events(self, hours=24):

        '''Get high-risk events'''

        since = (datetime.utcnow() - timedelta(hours=hours)).isoformat()

        

        response = self.table.scan(

            FilterExpression=Attr('is_high_risk').eq(True) & Attr('timestamp').gte(since),

        )

        return response['Items']

    

    def get_failed_actions(self, hours=24):

        '''Get failed API calls (potential unauthorized access)'''

        since = (datetime.utcnow() - timedelta(hours=hours)).isoformat()

        

        response = self.table.scan(

            FilterExpression=Attr('error_code').ne('') & Attr('timestamp').gte(since),

        )

        return response['Items']

    

    def compliance_report(self, days=30):

        '''Generate compliance report'''

        since = (datetime.utcnow() - timedelta(days=days)).isoformat()

        

        all_events = self.table.scan(

            FilterExpression=Attr('timestamp').gte(since),

        )['Items']

        

        from collections import Counter

        

        report = {

            'period': f"Last {days} days",

            'total_events': len(all_events),

            'high_risk_events': sum(1 for e in all_events if e.get('is_high_risk')),

            'failed_actions': sum(1 for e in all_events if e.get('error_code')),

            'unique_users': len(set(e.get('user_arn', '') for e in all_events)),

            'top_events': Counter(e.get('event_name') for e in all_events).most_common(10),

            'top_users': Counter(e.get('user_arn') for e in all_events).most_common(5),

        }

        

        return report



# query = AuditQueryService()

# high_risk = query.get_high_risk_events(24)

# report = query.compliance_report(30)

"""



    def show_code(self):

        print("=== Audit Query Service ===")

        print(self.CODE[:600])



    def sample_report(self):

        print(f"\n=== Compliance Report (30 days) ===")

        print(f"  Total events: {random.randint(10000, 100000):,}")

        print(f"  High-risk events: {random.randint(10, 100)}")

        print(f"  Failed actions: {random.randint(50, 500)}")

        print(f"  Unique users: {random.randint(10, 50)}")

        print(f"  Top event: DescribeInstances ({random.randint(5000, 20000)})")



report = AuditReporting()

report.show_code()

report.sample_report()

CDK Stack Integration

# stack_integration.py — Full CDK stack with audit trail

import json



class StackIntegration:

    FULL_STACK = """

# app.py — CDK app with audit trail

from aws_cdk import App, Stack, Environment

from constructs import Construct

from audit_trail_construct import AuditTrailConstruct



class ProductionStack(Stack):

    def __init__(self, scope: Construct, id: str, **kwargs):

        super().__init__(scope, id, **kwargs)

        

        # Audit Trail (reusable construct)

        audit = AuditTrailConstruct(self, "AuditTrail",

            trail_name="prod-audit-trail",

            retention_days=730,  # 2 years for compliance

            enable_data_events=True,

            alert_email="security@company.com",

        )

        

        # Other resources can reference audit construct

        # audit.log_bucket — S3 bucket for logs

        # audit.audit_table — DynamoDB table

        # audit.alert_topic — SNS topic



app = App()



ProductionStack(app, "Production",

    env=Environment(account="123456789012", region="ap-southeast-1"),

)



app.synth()

"""



    TESTING = """

# test_audit_construct.py — CDK construct tests

import pytest

from aws_cdk import App, Stack

from aws_cdk.assertions import Template, Match

from audit_trail_construct import AuditTrailConstruct



def test_creates_cloudtrail():

    app = App()

    stack = Stack(app, "TestStack")

    AuditTrailConstruct(stack, "Audit", trail_name="test-trail")

    

    template = Template.from_stack(stack)

    template.has_resource_properties("AWS::CloudTrail::Trail", {

        "TrailName": "test-trail",

        "IsMultiRegionTrail": True,

        "EnableLogFileValidation": True,

    })



def test_creates_s3_bucket():

    app = App()

    stack = Stack(app, "TestStack")

    AuditTrailConstruct(stack, "Audit")

    

    template = Template.from_stack(stack)

    template.has_resource_properties("AWS::S3::Bucket", {

        "VersioningConfiguration": {"Status": "Enabled"},

    })



def test_creates_dynamodb_table():

    app = App()

    stack = Stack(app, "TestStack")

    AuditTrailConstruct(stack, "Audit", trail_name="test")

    

    template = Template.from_stack(stack)

    template.has_resource_properties("AWS::DynamoDB::Table", {

        "PointInTimeRecoverySpecification": {"PointInTimeRecoveryEnabled": True},

    })

"""



    def show_stack(self):

        print("=== Full CDK Stack ===")

        print(self.FULL_STACK[:500])



    def show_tests(self):

        print(f"\n=== CDK Tests ===")

        print(self.TESTING[:500])



stack = StackIntegration()

stack.show_stack()

stack.show_tests()

FAQ - คำถามที่พบบ่อย

Q: CDK กับ Terraform อันไหนดีกว่า?

A: CDK: ดีถ้าใช้ AWS เท่านั้น + ทีมถนัด TypeScript/Python — IDE support ดี, type safety, reusable constructs Terraform: ดีถ้าใช้ multi-cloud (AWS + GCP + Azure) — HCL ง่าย, community ใหญ่, mature CDK ดีกว่า: complex logic, loops, conditions, custom constructs Terraform ดีกว่า: multi-cloud, simple infra, team ไม่ถนัด programming

เนื้อหาเกี่ยวข้อง — บทความที่เกี่ยวข้อง: Passive Income ใน Hindi: คู่มือฉบับสมบูรณ์ 2026 สำหรับการสร้างรายได้แบบไม่ต้อ…

Q: CloudTrail เก็บ logs อะไรบ้าง?

แนะนำเพิ่มเติม — ติดตาม XM Signal

A: Management Events: ทุก API call ที่เปลี่ยนแปลง AWS resources (CreateInstance, DeleteBucket, etc.) Data Events: S3 object-level operations (GetObject, PutObject), Lambda invocations Insights Events: ตรวจจับ unusual API activity patterns ไม่เก็บ: data ภายใน resources (เช่น เนื้อหาของ S3 objects, database queries)

เนื้อหาเกี่ยวข้อง — แนะนำให้อ่าน Payload CMS Testing Strategy QA

Q: Audit trail จำเป็นสำหรับ compliance อะไรบ้าง?

A: SOC 2: ต้องมี audit trail สำหรับ system changes + access PCI DSS: ต้อง log ทุก access ต่อ cardholder data HIPAA: ต้อง audit trail สำหรับ PHI access ISO 27001: ต้อง logging + monitoring สำหรับ security events GDPR: ต้อง track data processing activities CloudTrail + DynamoDB audit table ตอบโจทย์ compliance ได้ครบ

แนะนำเพิ่มเติม — อ่านเพิ่มเติมที่ SiamCafeBook

เนื้อหาเกี่ยวข้อง — ดูเพิ่มเติมเรื่อง Stencil.js Container Orchestration — จัดการ Web

Q: CDK Construct publish เป็น library ได้ไหม?

A: ได้ — publish เป็น npm package (TypeScript) หรือ PyPI package (Python) ข้อดี: ทุกทีมใช้ audit trail construct เดียวกัน — consistency ข้ามทุก stacks วิธี: สร้าง construct library → publish → import ใน stacks ทุกอัน ตัวอย่าง: pip install my-audit-construct → from my_audit import AuditTrailConstruct

เนื้อหาเกี่ยวข้อง — ดูเพิ่มเติมเรื่อง LLM Inference vLLM Best Practices ที่ต้องรู้

XM Legend · เทรดเดอร์ & ผู้สอน Forex 13 ปี

ผู้ก่อตั้ง SiamCafe ตั้งแต่ปี 1997 · เทรดเดอร์สาย Forex มากกว่า 13 ปี ได้รับการยกย่องเป็น XM Legend · แบ่งปันความรู้ Forex, ไอที, AI และการเทรด จากประสบการณ์จริงในตลาดจริง