Pulumi IaC
Pulumi Infrastructure as Code ใช้ภาษาจริง Python TypeScript Go C# เขียน Infrastructure AWS Azure GCP Kubernetes State Management Preview Drift Detection
Shift Left Security ย้าย Security Testing มาตั้งแต่ต้น Development Lifecycle SAST DAST SCA Policy as Code ลดค่าใช้จ่ายแก้ไข Bug
Pulumi Infrastructure Setup
# pulumi_infra.py — Pulumi Infrastructure with Security
# pip install pulumi pulumi-aws pulumi-policy
import pulumi
import pulumi_aws as aws
from dataclasses import dataclass, field
from typing import List, Dict
# === VPC with Security ===
# vpc = aws.ec2.Vpc("secure-vpc",
# cidr_block="10.0.0.0/16",
# enable_dns_support=True,
# enable_dns_hostnames=True,
# tags={"Name": "secure-vpc", "Environment": "production", "ManagedBy": "pulumi"})
# # Private Subnet (ไม่มี Internet Gateway)
# private_subnet = aws.ec2.Subnet("private-subnet",
# vpc_id=vpc.id,
# cidr_block="10.0.1.0/24",
# availability_zone="ap-southeast-1a",
# tags={"Name": "private-subnet", "Type": "private"})
# # Public Subnet (มี Internet Gateway)
# public_subnet = aws.ec2.Subnet("public-subnet",
# vpc_id=vpc.id,
# cidr_block="10.0.100.0/24",
# availability_zone="ap-southeast-1a",
# map_public_ip_on_launch=False, # ไม่ Auto-assign Public IP
# tags={"Name": "public-subnet", "Type": "public"})
# # Security Group — Least Privilege
# web_sg = aws.ec2.SecurityGroup("web-sg",
# vpc_id=vpc.id,
# description="Web Server Security Group",
# ingress=[
# {"protocol": "tcp", "from_port": 443, "to_port": 443,
# "cidr_blocks": ["0.0.0.0/0"], "description": "HTTPS"},
# ],
# egress=[
# {"protocol": "-1", "from_port": 0, "to_port": 0,
# "cidr_blocks": ["0.0.0.0/0"], "description": "All outbound"},
# ],
# tags={"Name": "web-sg"})
# # S3 Bucket with Encryption
# bucket = aws.s3.Bucket("secure-bucket",
# acl="private",
# server_side_encryption_configuration={
# "rule": {"apply_server_side_encryption_by_default": {
# "sse_algorithm": "aws:kms"}}},
# versioning={"enabled": True},
# lifecycle_rules=[{"enabled": True, "expiration": {"days": 90}}],
# tags={"Name": "secure-bucket", "Encryption": "KMS"})
# # Block Public Access
# aws.s3.BucketPublicAccessBlock("block-public",
# bucket=bucket.id,
# block_public_acls=True,
# block_public_policy=True,
# ignore_public_acls=True,
# restrict_public_buckets=True)
# # RDS with Encryption
# db = aws.rds.Instance("secure-db",
# engine="postgres",
# engine_version="15.4",
# instance_class="db.t3.medium",
# allocated_storage=50,
# storage_encrypted=True,
# kms_key_id=kms_key.arn,
# multi_az=True,
# backup_retention_period=7,
# deletion_protection=True,
# publicly_accessible=False,
# vpc_security_group_ids=[db_sg.id],
# db_subnet_group_name=db_subnet_group.name,
# tags={"Name": "secure-db", "Encryption": "KMS"})
# pulumi.export("vpc_id", vpc.id)
# pulumi.export("bucket_name", bucket.bucket)
# Hardening checklist grouped by AWS service: maps a service name to the
# list of security controls expected for that service.
security_checklist = {
    "VPC": [
        "Private Subnets สำหรับ Backend",
        "Security Groups Least Privilege",
        "NACLs เพิ่มเติม",
    ],
    "S3": ["Block Public Access", "KMS Encryption", "Versioning", "Lifecycle Rules"],
    "RDS": ["Encryption at Rest", "Multi-AZ", "Private Subnet", "Backup Retention"],
    "IAM": ["Least Privilege Roles", "MFA Enforced", "No Hardcoded Keys"],
    "EKS": ["Private API Server", "Managed Node Groups", "Pod Security Standards"],
}

# Print the checklist: one bracketed header per service, followed by one
# "[x]" line per control. The loop bodies must be indented; without the
# indentation the script fails with IndentationError before printing anything.
print("Security Checklist:")
for service, checks in security_checklist.items():
    print(f"\n [{service}]")
    for check in checks:
        print(f" [x] {check}")
CrossGuard Policy as Code
# crossguard_policy.py — Pulumi CrossGuard Policies
# Policy Pack: security-policies
# from pulumi_policy import (
# EnforcementLevel, PolicyPack, ResourceValidationPolicy,
# StackValidationPolicy,
# )
# # Policy 1: ห้าม Public S3 Buckets
# def no_public_s3(args, report_violation):
# if args.resource_type == "aws:s3/bucket:Bucket":
# acl = args.props.get("acl", "private")
# if acl in ["public-read", "public-read-write"]:
# report_violation("S3 Bucket must not be public")
# # Policy 2: ต้องมี Encryption
# def require_encryption(args, report_violation):
# if args.resource_type == "aws:s3/bucket:Bucket":
# encryption = args.props.get("serverSideEncryptionConfiguration")
# if not encryption:
# report_violation("S3 Bucket must have encryption enabled")
# if args.resource_type == "aws:rds/instance:Instance":
# if not args.props.get("storageEncrypted", False):
# report_violation("RDS must have storage encryption enabled")
# # Policy 3: ต้องมี Tags
# def require_tags(args, report_violation):
# required_tags = ["Name", "Environment", "ManagedBy"]
# tags = args.props.get("tags", {})
# for tag in required_tags:
# if tag not in tags:
# report_violation(f"Resource must have '{tag}' tag")
# # Policy 4: ห้าม SSH จาก 0.0.0.0/0
# def no_public_ssh(args, report_violation):
# if args.resource_type == "aws:ec2/securityGroup:SecurityGroup":
# for rule in args.props.get("ingress", []):
# if rule.get("fromPort") == 22:
# cidrs = rule.get("cidrBlocks", [])
# if "0.0.0.0/0" in cidrs:
# report_violation("SSH must not be open to 0.0.0.0/0")
# # Policy 5: Resource Limits
# def enforce_resource_limits(args, report_violation):
# if args.resource_type == "aws:rds/instance:Instance":
# instance_class = args.props.get("instanceClass", "")
# allowed = ["db.t3.micro", "db.t3.small", "db.t3.medium"]
# if instance_class not in allowed:
# report_violation(f"RDS instance class must be one of {allowed}")
# # Register Policy Pack
# PolicyPack("security-policies", [
# ResourceValidationPolicy(
# name="no-public-s3",
# description="S3 Buckets must not be public",
# validate=no_public_s3,
# enforcement_level=EnforcementLevel.MANDATORY),
# ResourceValidationPolicy(
# name="require-encryption",
# description="Storage must be encrypted",
# validate=require_encryption,
# enforcement_level=EnforcementLevel.MANDATORY),
# ResourceValidationPolicy(
# name="require-tags",
# description="Resources must have required tags",
# validate=require_tags,
# enforcement_level=EnforcementLevel.MANDATORY),
# ResourceValidationPolicy(
# name="no-public-ssh",
# description="SSH must not be open to world",
# validate=no_public_ssh,
# enforcement_level=EnforcementLevel.MANDATORY),
# ])
# Run: pulumi preview --policy-pack ./security-policies
# Summary table of the CrossGuard policies: maps a policy name to its
# enforcement level ("MANDATORY" or "ADVISORY") and a short description.
policies = {
    "no-public-s3": {"level": "MANDATORY", "desc": "ห้าม S3 Public Access"},
    "require-encryption": {"level": "MANDATORY", "desc": "ต้อง Encrypt ทุก Storage"},
    "require-tags": {"level": "MANDATORY", "desc": "ต้องมี Tags ครบ"},
    "no-public-ssh": {"level": "MANDATORY", "desc": "ห้าม SSH จาก 0.0.0.0/0"},
    "resource-limits": {"level": "ADVISORY", "desc": "จำกัดขนาด Instance"},
    "no-public-rds": {"level": "MANDATORY", "desc": "ห้าม RDS Public"},
}

# Print one line per policy as "[LEVEL] name: description". The loop body
# must be indented; without it the script fails with IndentationError.
print("CrossGuard Policies:")
for name, info in policies.items():
    print(f" [{info['level']}] {name}: {info['desc']}")
CI/CD Security Pipeline
# cicd_security.py — Shift Left Security in CI/CD
# GitHub Actions Pipeline
# name: Infrastructure Deploy
# on:
# push:
# branches: [main]
# pull_request:
# branches: [main]
# jobs:
# security-scan:
# runs-on: ubuntu-latest
# steps:
# - uses: actions/checkout@v4
#
# # 1. Checkov — IaC Security Scan
# - name: Checkov Scan
# uses: bridgecrewio/checkov-action@v12
# with:
# directory: ./infra
# framework: all
# soft_fail: false
#
# # 2. Trivy — Container/IaC Scan
# - name: Trivy IaC Scan
# uses: aquasecurity/trivy-action@master
# with:
# scan-type: config
# scan-ref: ./infra
# severity: CRITICAL,HIGH
#
# # 3. Pulumi Preview + CrossGuard
# - name: Pulumi Preview
# uses: pulumi/actions@v4
# with:
# command: preview
# stack-name: production
# policyPacks: ./security-policies
# env:
# PULUMI_ACCESS_TOKEN: ${{ secrets.PULUMI_ACCESS_TOKEN }}
#
# deploy:
# needs: security-scan
# if: github.ref == 'refs/heads/main'
# runs-on: ubuntu-latest
# steps:
# - uses: actions/checkout@v4
# - name: Pulumi Up
# uses: pulumi/actions@v4
# with:
# command: up
# stack-name: production
# policyPacks: ./security-policies
# env:
# PULUMI_ACCESS_TOKEN: ${{ secrets.PULUMI_ACCESS_TOKEN }}
# Catalogue of shift-left security tool categories. Each entry records
# example tools, the pipeline stage at which they run, and the kinds of
# issue they find.
shift_left_tools = {
    "SAST (Static Analysis)": {
        "tools": "SonarQube, Semgrep, CodeQL",
        "when": "Code Commit / PR",
        "finds": "SQL Injection, XSS, Hardcoded Secrets",
    },
    "SCA (Software Composition)": {
        "tools": "Snyk, Dependabot, Trivy",
        "when": "Dependency Update / PR",
        "finds": "Vulnerable Dependencies, License Issues",
    },
    "IaC Scanning": {
        "tools": "Checkov, tfsec, Trivy, KICS",
        "when": "Infrastructure PR",
        "finds": "Misconfiguration, Public Access, No Encryption",
    },
    "Policy as Code": {
        "tools": "Pulumi CrossGuard, OPA/Rego, Sentinel",
        "when": "Before Deploy (Preview)",
        "finds": "Policy Violations, Compliance Gaps",
    },
    "Container Scanning": {
        "tools": "Trivy, Grype, Docker Scout",
        "when": "Image Build",
        "finds": "OS Vulnerabilities, Malware",
    },
    "DAST (Dynamic Analysis)": {
        "tools": "OWASP ZAP, Nuclei, Burp Suite",
        "when": "After Deploy (Staging)",
        "finds": "Runtime Vulnerabilities, API Issues",
    },
}

# Print each category as a bracketed header followed by its three
# attributes. The loop body must be indented; without it the script fails
# with IndentationError.
print("Shift Left Security Tools:")
for category, info in shift_left_tools.items():
    print(f"\n [{category}]")
    print(f" Tools: {info['tools']}")
    print(f" When: {info['when']}")
    print(f" Finds: {info['finds']}")
Best Practices
- Policy Pack: สร้าง CrossGuard Policy Pack บังคับทุก Stack
- PR Review: ใช้ pulumi preview ใน PR แสดง Changes ก่อน Merge
- Checkov: Scan IaC ทุก PR ก่อน Deploy
- Secrets: ใช้ Pulumi Secrets Provider เข้ารหัส Secrets
- Least Privilege: IAM Roles ให้สิทธิ์น้อยที่สุด
- Encrypt Everything: Encrypt ทุก Storage Database Network
Pulumi คืออะไร
Infrastructure as Code ใช้ภาษาจริง Python TypeScript Go C# AWS Azure GCP Kubernetes State Management Preview Drift Detection Software Engineering Practices
Shift Left Security คืออะไร
ย้าย Security Testing มาตั้งแต่ต้น Development Lifecycle Code Review CI/CD SAST DAST SCA Policy as Code ลดค่าใช้จ่ายแก้ไข Bug
Pulumi CrossGuard คืออะไร
Policy as Code Framework Pulumi เขียน Policy Python TypeScript ตรวจสอบ Infrastructure ก่อน Deploy ห้าม Public S3 ต้อง Encryption Tags CI/CD อัตโนมัติ
IaC Security Best Practices มีอะไรบ้าง
Policy as Code บังคับ Rules Checkov tfsec Scan Secrets Management Least Privilege IAM Encryption ทุก Storage Private Subnets Backend Services
สรุป
Pulumi IaC ภาษาจริง Python TypeScript Shift Left Security ตั้งแต่ต้น CrossGuard Policy as Code Checkov Trivy Scan SAST SCA IaC Scanning Encryption Least Privilege CI/CD Security Pipeline Compliance Automation
