SiamCafe.net Blog
Cybersecurity

Pulumi IaC Shift Left Security

pulumi iac shift left security
Pulumi IaC Shift Left Security | SiamCafe Blog
2025-09-22· อ. บอม — SiamCafe.net· 8,023 คำ

Pulumi IaC

Pulumi Infrastructure as Code ใช้ภาษาจริง Python TypeScript Go C# เขียน Infrastructure AWS Azure GCP Kubernetes State Management Preview Drift Detection

Shift Left Security ย้าย Security Testing มาตั้งแต่ต้น Development Lifecycle SAST DAST SCA Policy as Code ลดค่าใช้จ่ายแก้ไข Bug

Pulumi Infrastructure Setup

# pulumi_infra.py — Pulumi Infrastructure with Security
# pip install pulumi pulumi-aws pulumi-policy

import pulumi
import pulumi_aws as aws
from dataclasses import dataclass, field
from typing import List, Dict

# === VPC with Security ===
# vpc = aws.ec2.Vpc("secure-vpc",
#     cidr_block="10.0.0.0/16",
#     enable_dns_support=True,
#     enable_dns_hostnames=True,
#     tags={"Name": "secure-vpc", "Environment": "production", "ManagedBy": "pulumi"})

# # Private Subnet (ไม่มี Internet Gateway)
# private_subnet = aws.ec2.Subnet("private-subnet",
#     vpc_id=vpc.id,
#     cidr_block="10.0.1.0/24",
#     availability_zone="ap-southeast-1a",
#     tags={"Name": "private-subnet", "Type": "private"})

# # Public Subnet (มี Internet Gateway)
# public_subnet = aws.ec2.Subnet("public-subnet",
#     vpc_id=vpc.id,
#     cidr_block="10.0.100.0/24",
#     availability_zone="ap-southeast-1a",
#     map_public_ip_on_launch=False,  # ไม่ Auto-assign Public IP
#     tags={"Name": "public-subnet", "Type": "public"})

# # Security Group — Least Privilege
# web_sg = aws.ec2.SecurityGroup("web-sg",
#     vpc_id=vpc.id,
#     description="Web Server Security Group",
#     ingress=[
#         {"protocol": "tcp", "from_port": 443, "to_port": 443,
#          "cidr_blocks": ["0.0.0.0/0"], "description": "HTTPS"},
#     ],
#     egress=[
#         {"protocol": "-1", "from_port": 0, "to_port": 0,
#          "cidr_blocks": ["0.0.0.0/0"], "description": "All outbound"},
#     ],
#     tags={"Name": "web-sg"})

# # S3 Bucket with Encryption
# bucket = aws.s3.Bucket("secure-bucket",
#     acl="private",
#     server_side_encryption_configuration={
#         "rule": {"apply_server_side_encryption_by_default": {
#             "sse_algorithm": "aws:kms"}}},
#     versioning={"enabled": True},
#     lifecycle_rules=[{"enabled": True, "expiration": {"days": 90}}],
#     tags={"Name": "secure-bucket", "Encryption": "KMS"})

# # Block Public Access
# aws.s3.BucketPublicAccessBlock("block-public",
#     bucket=bucket.id,
#     block_public_acls=True,
#     block_public_policy=True,
#     ignore_public_acls=True,
#     restrict_public_buckets=True)

# # RDS with Encryption
# db = aws.rds.Instance("secure-db",
#     engine="postgres",
#     engine_version="15.4",
#     instance_class="db.t3.medium",
#     allocated_storage=50,
#     storage_encrypted=True,
#     kms_key_id=kms_key.arn,
#     multi_az=True,
#     backup_retention_period=7,
#     deletion_protection=True,
#     publicly_accessible=False,
#     vpc_security_group_ids=[db_sg.id],
#     db_subnet_group_name=db_subnet_group.name,
#     tags={"Name": "secure-db", "Encryption": "KMS"})

# pulumi.export("vpc_id", vpc.id)
# pulumi.export("bucket_name", bucket.bucket)

security_checklist = {
    "VPC": ["Private Subnets สำหรับ Backend", "Security Groups Least Privilege", "NACLs เพิ่มเติม"],
    "S3": ["Block Public Access", "KMS Encryption", "Versioning", "Lifecycle Rules"],
    "RDS": ["Encryption at Rest", "Multi-AZ", "Private Subnet", "Backup Retention"],
    "IAM": ["Least Privilege Roles", "MFA Enforced", "No Hardcoded Keys"],
    "EKS": ["Private API Server", "Managed Node Groups", "Pod Security Standards"],
}

print("Security Checklist:")
for service, checks in security_checklist.items():
    print(f"\n  [{service}]")
    for check in checks:
        print(f"    [x] {check}")

CrossGuard Policy as Code

# crossguard_policy.py — Pulumi CrossGuard Policies
# Policy Pack: security-policies

# from pulumi_policy import (
#     EnforcementLevel, PolicyPack, ResourceValidationPolicy,
#     StackValidationPolicy,
# )

# # Policy 1: ห้าม Public S3 Buckets
# def no_public_s3(args, report_violation):
#     if args.resource_type == "aws:s3/bucket:Bucket":
#         acl = args.props.get("acl", "private")
#         if acl in ["public-read", "public-read-write"]:
#             report_violation("S3 Bucket must not be public")

# # Policy 2: ต้องมี Encryption
# def require_encryption(args, report_violation):
#     if args.resource_type == "aws:s3/bucket:Bucket":
#         encryption = args.props.get("serverSideEncryptionConfiguration")
#         if not encryption:
#             report_violation("S3 Bucket must have encryption enabled")
#     if args.resource_type == "aws:rds/instance:Instance":
#         if not args.props.get("storageEncrypted", False):
#             report_violation("RDS must have storage encryption enabled")

# # Policy 3: ต้องมี Tags
# def require_tags(args, report_violation):
#     required_tags = ["Name", "Environment", "ManagedBy"]
#     tags = args.props.get("tags", {})
#     for tag in required_tags:
#         if tag not in tags:
#             report_violation(f"Resource must have '{tag}' tag")

# # Policy 4: ห้าม SSH จาก 0.0.0.0/0
# def no_public_ssh(args, report_violation):
#     if args.resource_type == "aws:ec2/securityGroup:SecurityGroup":
#         for rule in args.props.get("ingress", []):
#             if rule.get("fromPort") == 22:
#                 cidrs = rule.get("cidrBlocks", [])
#                 if "0.0.0.0/0" in cidrs:
#                     report_violation("SSH must not be open to 0.0.0.0/0")

# # Policy 5: Resource Limits
# def enforce_resource_limits(args, report_violation):
#     if args.resource_type == "aws:rds/instance:Instance":
#         instance_class = args.props.get("instanceClass", "")
#         allowed = ["db.t3.micro", "db.t3.small", "db.t3.medium"]
#         if instance_class not in allowed:
#             report_violation(f"RDS instance class must be one of {allowed}")

# # Register Policy Pack
# PolicyPack("security-policies", [
#     ResourceValidationPolicy(
#         name="no-public-s3",
#         description="S3 Buckets must not be public",
#         validate=no_public_s3,
#         enforcement_level=EnforcementLevel.MANDATORY),
#     ResourceValidationPolicy(
#         name="require-encryption",
#         description="Storage must be encrypted",
#         validate=require_encryption,
#         enforcement_level=EnforcementLevel.MANDATORY),
#     ResourceValidationPolicy(
#         name="require-tags",
#         description="Resources must have required tags",
#         validate=require_tags,
#         enforcement_level=EnforcementLevel.MANDATORY),
#     ResourceValidationPolicy(
#         name="no-public-ssh",
#         description="SSH must not be open to world",
#         validate=no_public_ssh,
#         enforcement_level=EnforcementLevel.MANDATORY),
# ])

# Run: pulumi preview --policy-pack ./security-policies

policies = {
    "no-public-s3": {"level": "MANDATORY", "desc": "ห้าม S3 Public Access"},
    "require-encryption": {"level": "MANDATORY", "desc": "ต้อง Encrypt ทุก Storage"},
    "require-tags": {"level": "MANDATORY", "desc": "ต้องมี Tags ครบ"},
    "no-public-ssh": {"level": "MANDATORY", "desc": "ห้าม SSH จาก 0.0.0.0/0"},
    "resource-limits": {"level": "ADVISORY", "desc": "จำกัดขนาด Instance"},
    "no-public-rds": {"level": "MANDATORY", "desc": "ห้าม RDS Public"},
}

print("CrossGuard Policies:")
for name, info in policies.items():
    print(f"  [{info['level']}] {name}: {info['desc']}")

CI/CD Security Pipeline

# cicd_security.py — Shift Left Security in CI/CD

# GitHub Actions Pipeline
# name: Infrastructure Deploy
# on:
#   push:
#     branches: [main]
#   pull_request:
#     branches: [main]
# jobs:
#   security-scan:
#     runs-on: ubuntu-latest
#     steps:
#       - uses: actions/checkout@v4
#
#       # 1. Checkov — IaC Security Scan
#       - name: Checkov Scan
#         uses: bridgecrewio/checkov-action@v12
#         with:
#           directory: ./infra
#           framework: all
#           soft_fail: false
#
#       # 2. Trivy — Container/IaC Scan
#       - name: Trivy IaC Scan
#         uses: aquasecurity/trivy-action@master
#         with:
#           scan-type: config
#           scan-ref: ./infra
#           severity: CRITICAL, HIGH
#
#       # 3. Pulumi Preview + CrossGuard
#       - name: Pulumi Preview
#         uses: pulumi/actions@v4
#         with:
#           command: preview
#           stack-name: production
#           policyPacks: ./security-policies
#         env:
#           PULUMI_ACCESS_TOKEN: }
#
#   deploy:
#     needs: security-scan
#     if: github.ref == 'refs/heads/main'
#     runs-on: ubuntu-latest
#     steps:
#       - uses: actions/checkout@v4
#       - name: Pulumi Up
#         uses: pulumi/actions@v4
#         with:
#           command: up
#           stack-name: production
#           policyPacks: ./security-policies
#         env:
#           PULUMI_ACCESS_TOKEN: }

shift_left_tools = {
    "SAST (Static Analysis)": {
        "tools": "SonarQube, Semgrep, CodeQL",
        "when": "Code Commit / PR",
        "finds": "SQL Injection, XSS, Hardcoded Secrets",
    },
    "SCA (Software Composition)": {
        "tools": "Snyk, Dependabot, Trivy",
        "when": "Dependency Update / PR",
        "finds": "Vulnerable Dependencies, License Issues",
    },
    "IaC Scanning": {
        "tools": "Checkov, tfsec, Trivy, KICS",
        "when": "Infrastructure PR",
        "finds": "Misconfiguration, Public Access, No Encryption",
    },
    "Policy as Code": {
        "tools": "Pulumi CrossGuard, OPA/Rego, Sentinel",
        "when": "Before Deploy (Preview)",
        "finds": "Policy Violations, Compliance Gaps",
    },
    "Container Scanning": {
        "tools": "Trivy, Grype, Docker Scout",
        "when": "Image Build",
        "finds": "OS Vulnerabilities, Malware",
    },
    "DAST (Dynamic Analysis)": {
        "tools": "OWASP ZAP, Nuclei, Burp Suite",
        "when": "After Deploy (Staging)",
        "finds": "Runtime Vulnerabilities, API Issues",
    },
}

print("Shift Left Security Tools:")
for category, info in shift_left_tools.items():
    print(f"\n  [{category}]")
    print(f"    Tools: {info['tools']}")
    print(f"    When: {info['when']}")
    print(f"    Finds: {info['finds']}")

Best Practices

Pulumi คืออะไร

Infrastructure as Code ใช้ภาษาจริง Python TypeScript Go C# AWS Azure GCP Kubernetes State Management Preview Drift Detection Software Engineering Practices

Shift Left Security คืออะไร

ย้าย Security Testing มาตั้งแต่ต้น Development Lifecycle Code Review CI/CD SAST DAST SCA Policy as Code ลดค่าใช้จ่ายแก้ไข Bug

Pulumi CrossGuard คืออะไร

Policy as Code Framework Pulumi เขียน Policy Python TypeScript ตรวจสอบ Infrastructure ก่อน Deploy ห้าม Public S3 ต้อง Encryption Tags CI/CD อัตโนมัติ

IaC Security Best Practices มีอะไรบ้าง

Policy as Code บังคับ Rules Checkov tfsec Scan Secrets Management Least Privilege IAM Encryption ทุก Storage Private Subnets Backend Services

สรุป

Pulumi IaC ภาษาจริง Python TypeScript Shift Left Security ตั้งแต่ต้น CrossGuard Policy as Code Checkov Trivy Scan SAST SCA IaC Scanning Encryption Least Privilege CI/CD Security Pipeline Compliance Automation

📖 บทความที่เกี่ยวข้อง

Pulumi IaC สำหรับมือใหม่ Step by Stepอ่านบทความ → Pulumi IaC Low Code No Codeอ่านบทความ → Pulumi IaC Metric Collectionอ่านบทความ → Pulumi IaC Code Review Best Practiceอ่านบทความ → Pulumi IaC Production Setup Guideอ่านบทความ →

📚 ดูบทความทั้งหมด →