SiamCafe · Blog
Pulumi IaC กับ Shift Left Security — วิธีใช้
บทความ

Pulumi IaC กับ Shift Left Security — วิธีใช้

เผยแพร่ 28 พฤษภาคม 2569

Pulumi IaC

Pulumi IaC กับ Shift Left Security — วิธีใช้

Pulumi Infrastructure as Code ใช้ภาษาจริง Python TypeScript Go C# เขียน Infrastructure AWS Azure GCP Kubernetes State Management Preview Drift Detection

Shift Left Security ย้าย Security Testing มาตั้งแต่ต้น Development Lifecycle SAST DAST SCA Policy as Code ลดค่าใช้จ่ายแก้ไข Bug

Pulumi Infrastructure Setup

pulumi_infra.py — Pulumi Infrastructure with Security

pip install pulumi pulumi-aws pulumi-policy

import pulumi

import pulumi_aws as aws

from dataclasses import dataclass, field

from typing import List, Dict

=== VPC with Security ===

vpc = aws.ec2.Vpc("secure-vpc",

cidr_block="10.0.0.0/16",

enable_dns_support=True,

enable_dns_hostnames=True,

tags={"Name": "secure-vpc", "Environment": "production", "ManagedBy": "pulumi"})

# Private Subnet (ไม่มี Internet Gateway)

private_subnet = aws.ec2.Subnet("private-subnet",

vpc_id=vpc.id,

cidr_block="10.0.1.0/24",

availability_zone="ap-southeast-1a",

tags={"Name": "private-subnet", "Type": "private"})

# Public Subnet (มี Internet Gateway)

public_subnet = aws.ec2.Subnet("public-subnet",

vpc_id=vpc.id,

cidr_block="10.0.100.0/24",

availability_zone="ap-southeast-1a",

map_public_ip_on_launch=False, # ไม่ Auto-assign Public IP

tags={"Name": "public-subnet", "Type": "public"})

# Security Group — Least Privilege

web_sg = aws.ec2.SecurityGroup("web-sg",

vpc_id=vpc.id,

description="Web Server Security Group",

ingress=[

{"protocol": "tcp", "from_port": 443, "to_port": 443,

"cidr_blocks": ["0.0.0.0/0"], "description": "HTTPS"},

],

egress=[

{"protocol": "-1", "from_port": 0, "to_port": 0,

"cidr_blocks": ["0.0.0.0/0"], "description": "All outbound"},

],

tags={"Name": "web-sg"})

# S3 Bucket with Encryption

bucket = aws.s3.Bucket("secure-bucket",

acl="private",

server_side_encryption_configuration={

"rule": {"apply_server_side_encryption_by_default": {

"sse_algorithm": "aws:kms"}}},

versioning={"enabled": True},

lifecycle_rules=[{"enabled": True, "expiration": {"days": 90}}],

tags={"Name": "secure-bucket", "Encryption": "KMS"})

# Block Public Access

aws.s3.BucketPublicAccessBlock("block-public",

bucket=bucket.id,

block_public_acls=True,

block_public_policy=True,

ignore_public_acls=True,

restrict_public_buckets=True)

# RDS with Encryption

db = aws.rds.Instance("secure-db",

engine="postgres",

engine_version="15.4",

instance_class="db.t3.medium",

allocated_storage=50,

storage_encrypted=True,

kms_key_id=kms_key.arn,

multi_az=True,

backup_retention_period=7,

deletion_protection=True,

publicly_accessible=False,

vpc_security_group_ids=[db_sg.id],

db_subnet_group_name=db_subnet_group.name,

tags={"Name": "secure-db", "Encryption": "KMS"})

pulumi.export("vpc_id", vpc.id)

pulumi.export("bucket_name", bucket.bucket)

security_checklist = {

"VPC": ["Private Subnets สำหรับ Backend", "Security Groups Least Privilege", "NACLs เพิ่มเติม"],

"S3": ["Block Public Access", "KMS Encryption", "Versioning", "Lifecycle Rules"],

"RDS": ["Encryption at Rest", "Multi-AZ", "Private Subnet", "Backup Retention"],

"IAM": ["Least Privilege Roles", "MFA Enforced", "No Hardcoded Keys"],

"EKS": ["Private API Server", "Managed Node Groups", "Pod Security Standards"],

}

print("Security Checklist:")

for service, checks in security_checklist.items():

print(f"\n [{service}]")

for check in checks:

print(f" [x] {check}")

CrossGuard Policy as Code

crossguard_policy.py — Pulumi CrossGuard Policies

Policy Pack: security-policies

from pulumi_policy import (

EnforcementLevel, PolicyPack, ResourceValidationPolicy,

StackValidationPolicy,

)

# Policy 1: ห้าม Public S3 Buckets

def no_public_s3(args, report_violation):

if args.resource_type == "aws:s3/bucket:Bucket":

acl = args.props.get("acl", "private")

if acl in ["public-read", "public-read-write"]:

report_violation("S3 Bucket must not be public")

# Policy 2: ต้องมี Encryption

def require_encryption(args, report_violation):

if args.resource_type == "aws:s3/bucket:Bucket":

encryption = args.props.get("serverSideEncryptionConfiguration")

if not encryption:

report_violation("S3 Bucket must have encryption enabled")

if args.resource_type == "aws:rds/instance:Instance":

if not args.props.get("storageEncrypted", False):

report_violation("RDS must have storage encryption enabled")

# Policy 3: ต้องมี Tags

def require_tags(args, report_violation):

required_tags = ["Name", "Environment", "ManagedBy"]

tags = args.props.get("tags", {})

for tag in required_tags:

if tag not in tags:

report_violation(f"Resource must have '{tag}' tag")

# Policy 4: ห้าม SSH จาก 0.0.0.0/0

def no_public_ssh(args, report_violation):

if args.resource_type == "aws:ec2/securityGroup:SecurityGroup":

for rule in args.props.get("ingress", []):

if rule.get("fromPort") == 22:

cidrs = rule.get("cidrBlocks", [])

if "0.0.0.0/0" in cidrs:

report_violation("SSH must not be open to 0.0.0.0/0")

# Policy 5: Resource Limits

def enforce_resource_limits(args, report_violation):

Pulumi IaC กับ Shift Left Security — วิธีใช้

if args.resource_type == "aws:rds/instance:Instance":

instance_class = args.props.get("instanceClass", "")

allowed = ["db.t3.micro", "db.t3.small", "db.t3.medium"]

if instance_class not in allowed:

report_violation(f"RDS instance class must be one of {allowed}")

# Register Policy Pack

PolicyPack("security-policies", [

ResourceValidationPolicy(

name="no-public-s3",

description="S3 Buckets must not be public",

validate=no_public_s3,

enforcement_level=EnforcementLevel.MANDATORY),

ResourceValidationPolicy(

name="require-encryption",

description="Storage must be encrypted",

validate=require_encryption,

enforcement_level=EnforcementLevel.MANDATORY),

ResourceValidationPolicy(

name="require-tags",

description="Resources must have required tags",

validate=require_tags,

enforcement_level=EnforcementLevel.MANDATORY),

ResourceValidationPolicy(

name="no-public-ssh",

description="SSH must not be open to world",

validate=no_public_ssh,

enforcement_level=EnforcementLevel.MANDATORY),

])

Run: pulumi preview --policy-pack ./security-policies

policies = {

"no-public-s3": {"level": "MANDATORY", "desc": "ห้าม S3 Public Access"},

"require-encryption": {"level": "MANDATORY", "desc": "ต้อง Encrypt ทุก Storage"},

"require-tags": {"level": "MANDATORY", "desc": "ต้องมี Tags ครบ"},

"no-public-ssh": {"level": "MANDATORY", "desc": "ห้าม SSH จาก 0.0.0.0/0"},

"resource-limits": {"level": "ADVISORY", "desc": "จำกัดขนาด Instance"},

"no-public-rds": {"level": "MANDATORY", "desc": "ห้าม RDS Public"},

}

print("CrossGuard Policies:")

for name, info in policies.items():

print(f" [{info['level']}] {name}: {info['desc']}")

CI/CD Security Pipeline

cicd_security.py — Shift Left Security in CI/CD

GitHub Actions Pipeline

name: Infrastructure Deploy

on:

push:

branches: [main]

pull_request:

branches: [main]

jobs:

security-scan:

runs-on: ubuntu-latest

steps:

  • uses: actions/checkout@v4

# 1. Checkov — IaC Security Scan

  • name: Checkov Scan

uses: bridgecrewio/checkov-action@v12

with:

directory: ./infra

framework: all

soft_fail: false

# 2. Trivy — Container/IaC Scan

  • name: Trivy IaC Scan

uses: aquasecurity/trivy-action@master

with:

scan-type: config

scan-ref: ./infra

severity: CRITICAL, HIGH

# 3. Pulumi Preview + CrossGuard

  • name: Pulumi Preview

uses: pulumi/actions@v4

with:

command: preview

stack-name: production

policyPacks: ./security-policies

env:

PULUMI_ACCESS_TOKEN: }

deploy:

needs: security-scan

if: github.ref == 'refs/heads/main'

runs-on: ubuntu-latest

steps:

  • uses: actions/checkout@v4
  • name: Pulumi Up

uses: pulumi/actions@v4

with:

command: up

stack-name: production

policyPacks: ./security-policies

env:

PULUMI_ACCESS_TOKEN: }

shift_left_tools = {

"SAST (Static Analysis)": {

"tools": "SonarQube, Semgrep, CodeQL",

"when": "Code Commit / PR",

"finds": "SQL Injection, XSS, Hardcoded Secrets",

},

"SCA (Software Composition)": {

"tools": "Snyk, Dependabot, Trivy",

"when": "Dependency Update / PR",

"finds": "Vulnerable Dependencies, License Issues",

},

"IaC Scanning": {

"tools": "Checkov, tfsec, Trivy, KICS",

"when": "Infrastructure PR",

"finds": "Misconfiguration, Public Access, No Encryption",

},

"Policy as Code": {

"tools": "Pulumi CrossGuard, OPA/Rego, Sentinel",

"when": "Before Deploy (Preview)",

"finds": "Policy Violations, Compliance Gaps",

},

"Container Scanning": {

"tools": "Trivy, Grype, Docker Scout",

"when": "Image Build",

"finds": "OS Vulnerabilities, Malware",

},

"DAST (Dynamic Analysis)": {

"tools": "OWASP ZAP, Nuclei, Burp Suite",

"when": "After Deploy (Staging)",

"finds": "Runtime Vulnerabilities, API Issues",

},

}

print("Shift Left Security Tools:")

for category, info in shift_left_tools.items():

print(f"\n [{category}]")

print(f" Tools: {info['tools']}")

print(f" When: {info['when']}")

print(f" Finds: {info['finds']}")

Best Practices

  • Policy Pack: สร้าง CrossGuard Policy Pack บังคับทุก Stack
  • PR Review: ใช้ pulumi preview ใน PR แสดง Changes ก่อน Merge
  • Checkov: Scan IaC ทุก PR ก่อน Deploy
  • Secrets: ใช้ Pulumi Secrets Provider เข้ารหัส Secrets
  • Least Privilege: IAM Roles ให้สิทธิ์น้อยที่สุด
  • Encrypt Everything: Encrypt ทุก Storage Database Network

Pulumi คืออะไร

Infrastructure as Code ใช้ภาษาจริง Python TypeScript Go C# AWS Azure GCP Kubernetes State Management Preview Drift Detection Software Engineering Practices