CDK Construct Audit Trail Logging คืออะไร
AWS CDK (Cloud Development Kit) เป็น Infrastructure as Code (IaC) framework ที่ใช้ภาษา programming จริง (TypeScript, Python, Java, C#) สร้าง AWS resources แทนการเขียน YAML/JSON CDK Construct เป็น building block พื้นฐานของ CDK ที่ encapsulate AWS resources และ configuration Audit Trail Logging คือการบันทึกทุกการเปลี่ยนแปลงของ infrastructure เพื่อ compliance, security และ troubleshooting การสร้าง CDK Construct สำหรับ audit trail ช่วยให้ทุก stack ที่ deploy มี logging built-in อัตโนมัติ
CDK Construct Basics
# cdk_basics.py — CDK Construct fundamentals
import json
class CDKBasics:
CONSTRUCT_LEVELS = {
"l1": {
"name": "L1 Constructs (CFN Resources)",
"description": "1:1 mapping กับ CloudFormation resources — low-level, verbose",
"example": "CfnBucket, CfnFunction — ต้อง configure ทุก property เอง",
},
"l2": {
"name": "L2 Constructs (Curated)",
"description": "Higher-level abstractions — sensible defaults, helper methods",
"example": "s3.Bucket, lambda_.Function — มี defaults ที่ดี + methods เช่น .grant_read()",
},
"l3": {
"name": "L3 Constructs (Patterns)",
"description": "Multi-resource patterns — combine หลาย resources เข้าด้วยกัน",
"example": "LambdaRestApi, ApplicationLoadBalancedFargateService",
},
}
CDK_COMMANDS = {
"init": "cdk init app --language python",
"synth": "cdk synth — generate CloudFormation template",
"diff": "cdk diff — compare deployed vs local changes",
"deploy": "cdk deploy — deploy stack to AWS",
"destroy": "cdk destroy — remove stack from AWS",
}
def show_levels(self):
print("=== CDK Construct Levels ===\n")
for key, level in self.CONSTRUCT_LEVELS.items():
print(f"[{level['name']}]")
print(f" {level['description']}")
print(f" Example: {level['example']}")
print()
def show_commands(self):
print("=== CDK Commands ===")
for cmd, desc in self.CDK_COMMANDS.items():
print(f" [{cmd}] {desc}")
basics = CDKBasics()
basics.show_levels()
basics.show_commands()
Audit Trail Construct (Python CDK)
# audit_construct.py — CDK Audit Trail Construct
import json
class AuditTrailConstruct:
CODE = """
# audit_trail_construct.py — Reusable CDK Construct for Audit Trail
from constructs import Construct
from aws_cdk import (
Stack, Duration, RemovalPolicy,
aws_cloudtrail as cloudtrail,
aws_s3 as s3,
aws_logs as logs,
aws_sns as sns,
aws_sns_subscriptions as subs,
aws_events as events,
aws_events_targets as targets,
aws_lambda as lambda_,
aws_iam as iam,
aws_dynamodb as dynamodb,
)
class AuditTrailConstruct(Construct):
'''L3 Construct: Complete audit trail logging system'''
def __init__(self, scope: Construct, id: str,
trail_name: str = "org-audit-trail",
retention_days: int = 365,
enable_data_events: bool = True,
alert_email: str = None,
**kwargs):
super().__init__(scope, id, **kwargs)
# === S3 Bucket for CloudTrail logs ===
self.log_bucket = s3.Bucket(self, "AuditLogBucket",
bucket_name=f"{trail_name}-logs",
encryption=s3.BucketEncryption.S3_MANAGED,
block_public_access=s3.BlockPublicAccess.BLOCK_ALL,
versioned=True,
lifecycle_rules=[
s3.LifecycleRule(
transitions=[
s3.Transition(
storage_class=s3.StorageClass.GLACIER,
transition_after=Duration.days(90),
),
],
expiration=Duration.days(retention_days),
),
],
removal_policy=RemovalPolicy.RETAIN,
)
# === CloudWatch Log Group ===
self.log_group = logs.LogGroup(self, "AuditLogGroup",
log_group_name=f"/audit/{trail_name}",
retention=logs.RetentionDays.ONE_YEAR,
removal_policy=RemovalPolicy.RETAIN,
)
# === CloudTrail ===
self.trail = cloudtrail.Trail(self, "AuditTrail",
trail_name=trail_name,
bucket=self.log_bucket,
cloud_watch_logs_group=self.log_group,
send_to_cloud_watch_logs=True,
is_multi_region_trail=True,
include_global_service_events=True,
enable_file_validation=True,
)
# Enable S3 data events
if enable_data_events:
self.trail.add_s3_event_selector(
[cloudtrail.S3EventSelector(bucket=self.log_bucket)],
include_management_events=True,
)
# === DynamoDB for structured audit logs ===
self.audit_table = dynamodb.Table(self, "AuditTable",
table_name=f"{trail_name}-events",
partition_key=dynamodb.Attribute(
name="event_id", type=dynamodb.AttributeType.STRING
),
sort_key=dynamodb.Attribute(
name="timestamp", type=dynamodb.AttributeType.STRING
),
billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,
point_in_time_recovery=True,
removal_policy=RemovalPolicy.RETAIN,
)
# GSI for querying by user
self.audit_table.add_global_secondary_index(
index_name="by-user",
partition_key=dynamodb.Attribute(
name="user_arn", type=dynamodb.AttributeType.STRING
),
sort_key=dynamodb.Attribute(
name="timestamp", type=dynamodb.AttributeType.STRING
),
)
# === SNS Topic for alerts ===
self.alert_topic = sns.Topic(self, "AuditAlertTopic",
topic_name=f"{trail_name}-alerts",
)
if alert_email:
self.alert_topic.add_subscription(
subs.EmailSubscription(alert_email)
)
# Usage in Stack:
# class MyStack(Stack):
# def __init__(self, scope, id, **kwargs):
# super().__init__(scope, id, **kwargs)
# audit = AuditTrailConstruct(self, "Audit",
# trail_name="prod-audit",
# retention_days=730,
# alert_email="security@company.com",
# )
"""
def show_code(self):
print("=== Audit Trail Construct ===")
print(self.CODE[:600])
construct = AuditTrailConstruct()
construct.show_code()
Event Processing Lambda
# event_lambda.py — Lambda for processing audit events
import json
class EventProcessingLambda:
CODE = """
# audit_processor.py — Lambda function for audit event processing
import json
import boto3
import os
from datetime import datetime
import hashlib
dynamodb = boto3.resource('dynamodb')
sns = boto3.client('sns')
TABLE_NAME = os.environ.get('AUDIT_TABLE', 'audit-events')
ALERT_TOPIC = os.environ.get('ALERT_TOPIC_ARN', '')
# High-risk events that trigger alerts
HIGH_RISK_EVENTS = {
'ConsoleLogin': 'Console login detected',
'DeleteBucket': 'S3 bucket deletion',
'DeleteTrail': 'CloudTrail deletion attempt',
'StopLogging': 'CloudTrail logging stopped',
'CreateUser': 'New IAM user created',
'AttachUserPolicy': 'Policy attached to user',
'PutBucketPolicy': 'S3 bucket policy changed',
'AuthorizeSecurityGroupIngress': 'Security group rule added',
'CreateAccessKey': 'New access key created',
'DeleteDBInstance': 'RDS instance deleted',
}
def handler(event, context):
'''Process CloudTrail events from CloudWatch Logs'''
table = dynamodb.Table(TABLE_NAME)
processed = 0
alerts = 0
for record in event.get('Records', []):
# Parse CloudTrail event
detail = json.loads(record.get('body', '{}'))
event_name = detail.get('eventName', '')
user_arn = detail.get('userIdentity', {}).get('arn', 'unknown')
source_ip = detail.get('sourceIPAddress', '')
event_time = detail.get('eventTime', datetime.utcnow().isoformat())
region = detail.get('awsRegion', '')
# Generate unique event ID
event_id = hashlib.sha256(
f"{event_time}{event_name}{user_arn}".encode()
).hexdigest()[:16]
# Store in DynamoDB
item = {
'event_id': event_id,
'timestamp': event_time,
'event_name': event_name,
'user_arn': user_arn,
'source_ip': source_ip,
'region': region,
'event_source': detail.get('eventSource', ''),
'request_params': json.dumps(detail.get('requestParameters', {}))[:1000],
'response': json.dumps(detail.get('responseElements', {}))[:1000],
'error_code': detail.get('errorCode', ''),
'is_high_risk': event_name in HIGH_RISK_EVENTS,
}
table.put_item(Item=item)
processed += 1
# Alert on high-risk events
if event_name in HIGH_RISK_EVENTS and ALERT_TOPIC:
sns.publish(
TopicArn=ALERT_TOPIC,
Subject=f"AUDIT ALERT: {HIGH_RISK_EVENTS[event_name]}",
Message=json.dumps({
'event': event_name,
'user': user_arn,
'ip': source_ip,
'region': region,
'time': event_time,
}, indent=2),
)
alerts += 1
return {
'processed': processed,
'alerts': alerts,
}
"""
def show_code(self):
print("=== Event Processor Lambda ===")
print(self.CODE[:600])
lam = EventProcessingLambda()
lam.show_code()
Query & Reporting
# reporting.py — Audit trail querying and reporting
import json
import random
class AuditReporting:
CODE = """
# audit_query.py — Query audit trail data
import boto3
from boto3.dynamodb.conditions import Key, Attr
from datetime import datetime, timedelta
import json
class AuditQueryService:
def __init__(self, table_name='audit-events'):
dynamodb = boto3.resource('dynamodb')
self.table = dynamodb.Table(table_name)
def get_user_activity(self, user_arn, hours=24):
'''Get all activity for a specific user'''
since = (datetime.utcnow() - timedelta(hours=hours)).isoformat()
response = self.table.query(
IndexName='by-user',
KeyConditionExpression=Key('user_arn').eq(user_arn) & Key('timestamp').gte(since),
)
return response['Items']
def get_high_risk_events(self, hours=24):
'''Get high-risk events'''
since = (datetime.utcnow() - timedelta(hours=hours)).isoformat()
response = self.table.scan(
FilterExpression=Attr('is_high_risk').eq(True) & Attr('timestamp').gte(since),
)
return response['Items']
def get_failed_actions(self, hours=24):
'''Get failed API calls (potential unauthorized access)'''
since = (datetime.utcnow() - timedelta(hours=hours)).isoformat()
response = self.table.scan(
FilterExpression=Attr('error_code').ne('') & Attr('timestamp').gte(since),
)
return response['Items']
def compliance_report(self, days=30):
'''Generate compliance report'''
since = (datetime.utcnow() - timedelta(days=days)).isoformat()
all_events = self.table.scan(
FilterExpression=Attr('timestamp').gte(since),
)['Items']
from collections import Counter
report = {
'period': f"Last {days} days",
'total_events': len(all_events),
'high_risk_events': sum(1 for e in all_events if e.get('is_high_risk')),
'failed_actions': sum(1 for e in all_events if e.get('error_code')),
'unique_users': len(set(e.get('user_arn', '') for e in all_events)),
'top_events': Counter(e.get('event_name') for e in all_events).most_common(10),
'top_users': Counter(e.get('user_arn') for e in all_events).most_common(5),
}
return report
# query = AuditQueryService()
# high_risk = query.get_high_risk_events(24)
# report = query.compliance_report(30)
"""
def show_code(self):
print("=== Audit Query Service ===")
print(self.CODE[:600])
def sample_report(self):
print(f"\n=== Compliance Report (30 days) ===")
print(f" Total events: {random.randint(10000, 100000):,}")
print(f" High-risk events: {random.randint(10, 100)}")
print(f" Failed actions: {random.randint(50, 500)}")
print(f" Unique users: {random.randint(10, 50)}")
print(f" Top event: DescribeInstances ({random.randint(5000, 20000)})")
report = AuditReporting()
report.show_code()
report.sample_report()
CDK Stack Integration
# stack_integration.py — Full CDK stack with audit trail
import json
class StackIntegration:
FULL_STACK = """
# app.py — CDK app with audit trail
from aws_cdk import App, Stack, Environment
from constructs import Construct
from audit_trail_construct import AuditTrailConstruct
class ProductionStack(Stack):
def __init__(self, scope: Construct, id: str, **kwargs):
super().__init__(scope, id, **kwargs)
# Audit Trail (reusable construct)
audit = AuditTrailConstruct(self, "AuditTrail",
trail_name="prod-audit-trail",
retention_days=730, # 2 years for compliance
enable_data_events=True,
alert_email="security@company.com",
)
# Other resources can reference audit construct
# audit.log_bucket — S3 bucket for logs
# audit.audit_table — DynamoDB table
# audit.alert_topic — SNS topic
app = App()
ProductionStack(app, "Production",
env=Environment(account="123456789012", region="ap-southeast-1"),
)
app.synth()
"""
TESTING = """
# test_audit_construct.py — CDK construct tests
import pytest
from aws_cdk import App, Stack
from aws_cdk.assertions import Template, Match
from audit_trail_construct import AuditTrailConstruct
def test_creates_cloudtrail():
app = App()
stack = Stack(app, "TestStack")
AuditTrailConstruct(stack, "Audit", trail_name="test-trail")
template = Template.from_stack(stack)
template.has_resource_properties("AWS::CloudTrail::Trail", {
"TrailName": "test-trail",
"IsMultiRegionTrail": True,
"EnableLogFileValidation": True,
})
def test_creates_s3_bucket():
app = App()
stack = Stack(app, "TestStack")
AuditTrailConstruct(stack, "Audit")
template = Template.from_stack(stack)
template.has_resource_properties("AWS::S3::Bucket", {
"VersioningConfiguration": {"Status": "Enabled"},
})
def test_creates_dynamodb_table():
app = App()
stack = Stack(app, "TestStack")
AuditTrailConstruct(stack, "Audit", trail_name="test")
template = Template.from_stack(stack)
template.has_resource_properties("AWS::DynamoDB::Table", {
"PointInTimeRecoverySpecification": {"PointInTimeRecoveryEnabled": True},
})
"""
def show_stack(self):
print("=== Full CDK Stack ===")
print(self.FULL_STACK[:500])
def show_tests(self):
print(f"\n=== CDK Tests ===")
print(self.TESTING[:500])
stack = StackIntegration()
stack.show_stack()
stack.show_tests()
FAQ - คำถามที่พบบ่อย
Q: CDK กับ Terraform อันไหนดีกว่า?
A: CDK: ดีถ้าใช้ AWS เท่านั้น + ทีมถนัด TypeScript/Python — IDE support ดี, type safety, reusable constructs Terraform: ดีถ้าใช้ multi-cloud (AWS + GCP + Azure) — HCL ง่าย, community ใหญ่, mature CDK ดีกว่า: complex logic, loops, conditions, custom constructs Terraform ดีกว่า: multi-cloud, simple infra, team ไม่ถนัด programming
Q: CloudTrail เก็บ logs อะไรบ้าง?
A: Management Events: ทุก API call ที่เปลี่ยนแปลง AWS resources (CreateInstance, DeleteBucket, etc.) Data Events: S3 object-level operations (GetObject, PutObject), Lambda invocations Insights Events: ตรวจจับ unusual API activity patterns ไม่เก็บ: data ภายใน resources (เช่น เนื้อหาของ S3 objects, database queries)
Q: Audit trail จำเป็นสำหรับ compliance อะไรบ้าง?
A: SOC 2: ต้องมี audit trail สำหรับ system changes + access PCI DSS: ต้อง log ทุก access ต่อ cardholder data HIPAA: ต้อง audit trail สำหรับ PHI access ISO 27001: ต้อง logging + monitoring สำหรับ security events GDPR: ต้อง track data processing activities CloudTrail + DynamoDB audit table ตอบโจทย์ compliance ได้ครบ
Q: CDK Construct publish เป็น library ได้ไหม?
A: ได้ — publish เป็น npm package (TypeScript) หรือ PyPI package (Python) ข้อดี: ทุกทีมใช้ audit trail construct เดียวกัน — consistency ข้ามทุก stacks วิธี: สร้าง construct library → publish → import ใน stacks ทุกอัน ตัวอย่าง: pip install my-audit-construct → from my_audit import AuditTrailConstruct
