SiamCafe.net Blog
Technology

Pulumi IaC Data Pipeline ETL

pulumi iac data pipeline etl
Pulumi IaC Data Pipeline ETL | SiamCafe Blog
2025-12-15 · อ. บอม — SiamCafe.net · 11,218 คำ

Pulumi สำหรับ Data Pipeline

Pulumi ใช้ภาษาโปรแกรมจริง Python TypeScript สร้าง Cloud Infrastructure ETL Pipeline ประกอบด้วย Extract Transform Load ดึงข้อมูลแปลงโหลดเข้า Data Warehouse

ใช้ Pulumi สร้าง S3 Buckets, Glue Jobs, Redshift Cluster, Airflow Environment อัตโนมัติ Version Control ด้วย Git Reproducible ทุกครั้ง

Pulumi ETL Infrastructure

# === Pulumi ETL Infrastructure (Python) ===
# pip install pulumi pulumi-aws

import pulumi
import pulumi_aws as aws
from dataclasses import dataclass
from typing import List, Dict

# Configuration: stack-level settings supplied via `pulumi config`.
config = pulumi.Config()
# Target environment (e.g. "dev", "staging", "prod"); defaults to "dev".
env = config.get("env") or "dev"
# Prefix used in every resource name so per-environment stacks stay distinguishable.
project = "etl-pipeline"

# 1. S3 Buckets
# Landing zone for raw, unprocessed data; versioned so bad loads can be rolled back.
# NOTE(review): inline `versioning`/`lifecycle_rules` on aws.s3.Bucket are deprecated
# in newer pulumi-aws versions (BucketVersioningV2 / BucketLifecycleConfigurationV2
# are preferred) — confirm against the provider version actually pinned.
raw_bucket = aws.s3.Bucket(f"{project}-raw-{env}",
    bucket=f"{project}-raw-{env}",
    versioning=aws.s3.BucketVersioningArgs(
        enabled=True,
    ),
    lifecycle_rules=[aws.s3.BucketLifecycleRuleArgs(
        enabled=True,
        transitions=[aws.s3.BucketLifecycleRuleTransitionArgs(
            days=30,
            # Move objects to Infrequent Access after 30 days to cut storage cost.
            storage_class="STANDARD_IA",
        )],
        expiration=aws.s3.BucketLifecycleRuleExpirationArgs(
            days=365,  # raw objects are deleted after one year
        ),
    )],
    tags={"Environment": env, "Project": project},
)

# Destination bucket for cleaned/transformed data; versioned, no lifecycle rules.
processed_bucket = aws.s3.Bucket(f"{project}-processed-{env}",
    bucket=f"{project}-processed-{env}",
    versioning=aws.s3.BucketVersioningArgs(enabled=True),
    tags={"Environment": env, "Project": project},
)

# 2. Glue Database
# Glue catalog names must not contain hyphens, so '-' is swapped for '_'
# (e.g. "etl-pipeline" + "dev" -> "etl_pipeline_dev").
glue_db = aws.glue.CatalogDatabase(f"{project}-db",
    name=f"{project.replace('-', '_')}_{env}",
)

# 3. IAM role for Glue
# Trust policy allowing the Glue service itself to assume this role.
glue_role = aws.iam.Role(f"{project}-glue-role",
    assume_role_policy="""{
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "glue.amazonaws.com"},
            "Action": "sts:AssumeRole"
        }]
    }""",
)

# Attach the AWS-managed baseline service policy for Glue jobs.
aws.iam.RolePolicyAttachment(f"{project}-glue-policy",
    role=glue_role.name,
    policy_arn="arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole",
)

# 4. Glue ETL Job
# glue_job = aws.glue.Job(f"{project}-transform",
#     name=f"{project}-transform-{env}",
#     role_arn=glue_role.arn,
#     command=aws.glue.JobCommandArgs(
#         script_location=f"s3://{project}-scripts-{env}/transform.py",
#         python_version="3",
#     ),
#     default_arguments={
#         "--job-language": "python",
#         "--raw_bucket": raw_bucket.bucket,
#         "--processed_bucket": processed_bucket.bucket,
#         "--TempDir": f"s3://{project}-temp-{env}/",
#     },
#     glue_version="4.0",
#     number_of_workers=2,
#     worker_type="G.1X",
#     tags={"Environment": env},
# )

# 5. Redshift Cluster
# redshift = aws.redshift.Cluster(f"{project}-redshift",
#     cluster_identifier=f"{project}-{env}",
#     database_name="analytics",
#     master_username="admin",
#     master_password=config.require_secret("redshift_password"),
#     node_type="dc2.large",
#     number_of_nodes=2,
#     skip_final_snapshot=True if env == "dev" else False,
#     tags={"Environment": env},
# )

# Outputs: stack exports consumable via `pulumi stack output`.
pulumi.export("raw_bucket", raw_bucket.bucket)
pulumi.export("processed_bucket", processed_bucket.bucket)
pulumi.export("glue_database", glue_db.name)

# Human-readable summary of what this stack provisions.
print("Pulumi ETL Infrastructure:")
print(f"  Environment: {env}")
print(f"  S3 Raw: {project}-raw-{env}")
print(f"  S3 Processed: {project}-processed-{env}")
# Fixed: mirror the actual Glue database name, which replaces '-' with '_'
# (the previous f-string printed "etl-pipeline_dev" instead of "etl_pipeline_dev").
print(f"  Glue DB: {project.replace('-', '_')}_{env}")
print(f"  Glue Job: {project}-transform-{env}")

ETL Pipeline Code

# etl_pipeline.py — ETL Pipeline with Python
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from datetime import datetime
import json

@dataclass
class ETLStep:
    """One unit of work in a pipeline, with its runtime bookkeeping."""
    name: str
    step_type: str  # one of: extract, transform, load
    source: str  # where data is read from (URL, DB URI, or S3 path)
    destination: str  # where results are written
    # Mutated by ETLEngine.run_pipeline: pending -> running -> completed.
    status: str = "pending"
    rows_processed: int = 0
    duration_seconds: float = 0.0  # was `0`; default now matches the float annotation

@dataclass
class ETLPipeline:
    """A named, ordered collection of ETL steps plus run timestamps."""
    name: str
    # Steps are executed in list order by ETLEngine.run_pipeline.
    steps: List[ETLStep] = field(default_factory=list)
    # Set by the engine when a run starts/finishes; None until then.
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None

class ETLEngine:
    """Registry and runner for ETL pipelines.

    Pipelines are registered by name; `run_pipeline` executes each step in
    order, simulating the work with random row counts/durations and printing
    a progress report.
    """

    def __init__(self) -> None:
        # Registered pipelines, keyed by pipeline name.
        self.pipelines: Dict[str, ETLPipeline] = {}

    def create_pipeline(self, name: str, steps: List[ETLStep]) -> ETLPipeline:
        """Register a new pipeline under `name` and return it."""
        pipeline = ETLPipeline(name=name, steps=steps)
        self.pipelines[name] = pipeline
        return pipeline

    def run_pipeline(self, name: str) -> None:
        """Run every step of the named pipeline, printing progress.

        Unknown names are reported and ignored rather than raising, so a bad
        name does not abort a batch of runs.
        """
        # Hoisted out of the per-step loop (was imported on every iteration);
        # only used to simulate processing below.
        import random
        import time

        pipeline = self.pipelines.get(name)
        if not pipeline:
            print(f"Pipeline not found: {name}")
            return

        pipeline.start_time = datetime.now()
        print(f"\n{'='*55}")
        print(f"ETL Pipeline: {name}")
        print(f"{'='*55}")

        for i, step in enumerate(pipeline.steps, 1):
            step.status = "running"
            print(f"\n  Step {i}: {step.name} [{step.step_type}]")
            print(f"    Source: {step.source}")
            print(f"    Destination: {step.destination}")

            # Simulate processing: short sleep, then fake row counts/durations.
            time.sleep(0.01)
            step.rows_processed = random.randint(10000, 100000)
            step.duration_seconds = random.uniform(5, 60)
            step.status = "completed"

            print(f"    Rows: {step.rows_processed:,}")
            print(f"    Duration: {step.duration_seconds:.1f}s")
            print(f"    Status: {step.status}")

        pipeline.end_time = datetime.now()

        total_rows = sum(s.rows_processed for s in pipeline.steps)
        total_duration = sum(s.duration_seconds for s in pipeline.steps)
        print("\n  Summary:")
        print(f"    Total Rows: {total_rows:,}")
        print(f"    Total Duration: {total_duration:.1f}s")
        print(f"    Steps: {len(pipeline.steps)} completed")

# Build and run the demo pipeline
engine = ETLEngine()

# (name, step_type, source, destination) specs, expanded into ETLStep objects.
step_specs = [
    ("Extract from API", "extract",
     "https://api.example.com/orders", "s3://raw/orders/"),
    ("Extract from DB", "extract",
     "postgres://prod/customers", "s3://raw/customers/"),
    ("Clean & Validate", "transform",
     "s3://raw/orders/", "s3://processed/orders_clean/"),
    ("Join Orders+Customers", "transform",
     "s3://processed/", "s3://processed/orders_enriched/"),
    ("Aggregate Daily", "transform",
     "s3://processed/orders_enriched/", "s3://processed/daily_summary/"),
    ("Load to Redshift", "load",
     "s3://processed/daily_summary/", "redshift://analytics/daily_sales"),
]
steps = [ETLStep(*spec) for spec in step_specs]

engine.create_pipeline("daily-sales-pipeline", steps)
engine.run_pipeline("daily-sales-pipeline")

Airflow Orchestration

# === Airflow DAG สำหรับ ETL Pipeline ===

# dags/etl_daily_sales.py
# from airflow import DAG
# from airflow.operators.python import PythonOperator
# from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
# from airflow.providers.amazon.aws.operators.s3 import S3CopyObjectOperator
# from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
# from datetime import datetime, timedelta
#
# default_args = {
#     'owner': 'data-team',
#     'depends_on_past': False,
#     'email_on_failure': True,
#     'email': ['data-alerts@company.com'],
#     'retries': 2,
#     'retry_delay': timedelta(minutes=5),
# }
#
# with DAG(
#     'etl_daily_sales',
#     default_args=default_args,
#     description='Daily Sales ETL Pipeline',
#     schedule_interval='0 6 * * *',  # ทุกวัน 6:00 AM
#     start_date=datetime(2024, 1, 1),
#     catchup=False,
#     tags=['etl', 'sales', 'daily'],
# ) as dag:
#
#     wait_for_data = S3KeySensor(
#         task_id='wait_for_raw_data',
#         bucket_name='etl-pipeline-raw',
#         bucket_key='orders/{{ ds }}/*.parquet',
#         timeout=3600,
#     )
#
#     extract_api = PythonOperator(
#         task_id='extract_from_api',
#         python_callable=extract_orders,
#         op_kwargs={'date': '{{ ds }}'},
#     )
#
#     transform = GlueJobOperator(
#         task_id='transform_data',
#         job_name='etl-pipeline-transform',
#         script_args={
#             '--date': '{{ ds }}',
#             '--raw_bucket': 'etl-pipeline-raw',
#             '--processed_bucket': 'etl-pipeline-processed',
#         },
#     )
#
#     load_redshift = PythonOperator(
#         task_id='load_to_redshift',
#         python_callable=load_to_redshift,
#         op_kwargs={'date': '{{ ds }}'},
#     )
#
#     notify = PythonOperator(
#         task_id='send_notification',
#         python_callable=send_slack_notification,
#     )
#
#     wait_for_data >> extract_api >> transform >> load_redshift >> notify

# Pulumi — สร้าง MWAA (Managed Airflow) Environment
# mwaa = aws.mwaa.Environment(f"{project}-airflow",
#     name=f"{project}-airflow-{env}",
#     airflow_version="2.8.1",
#     execution_role_arn=airflow_role.arn,
#     source_bucket_arn=dags_bucket.arn,
#     dag_s3_path="dags/",
#     environment_class="mw1.small",
#     max_workers=5,
#     network_configuration=aws.mwaa.EnvironmentNetworkConfigurationArgs(
#         security_group_ids=[sg.id],
#         subnet_ids=private_subnets,
#     ),
# )

# Display-only summary of the (commented-out) Airflow DAG above.
dag_structure = {
    "DAG": "etl_daily_sales",
    "Schedule": "0 6 * * * (Daily 6AM)",
    "Tasks": [
        "wait_for_raw_data (S3 Sensor)",
        "extract_from_api (Python)",
        "transform_data (Glue Job)",
        "load_to_redshift (Python)",
        "send_notification (Slack)",
    ],
}

# Assemble the report once, then emit it with a single print.
report = [
    "Airflow DAG:",
    f"  Name: {dag_structure['DAG']}",
    f"  Schedule: {dag_structure['Schedule']}",
    "  Tasks:",
]
report.extend(f"    - {task}" for task in dag_structure["Tasks"])
print("\n".join(report))

Best Practices

Pulumi คืออะไร

IaC Tool ใช้ภาษาโปรแกรมจริง Python TypeScript Go แทน DSL สร้าง Cloud Resources AWS Azure GCP State Management Preview Changes Drift Detection

ETL Pipeline คืออะไร

Extract Transform Load ดึงข้อมูลจาก Sources แปลงข้อมูล Clean Aggregate Join โหลดเข้า Data Warehouse Redshift BigQuery Snowflake Analytics BI

Pulumi ต่างจาก Terraform อย่างไร

Pulumi ภาษาจริง Python TypeScript IDE Support Type Checking Testing Terraform HCL ภาษาเฉพาะ Pulumi Automation API Terraform Community ใหญ่ Providers มาก ทั้งสอง State ดี

วิธีเลือก ETL Tool ทำอย่างไร

Data Volume น้อย Python Scripts มาก Spark/Glue Complexity ง่าย Glue/dbt ซับซ้อน Airflow+Spark Cost งบน้อย Open Source งบมาก Managed Services Team Skills

สรุป

Pulumi ใช้ภาษาจริงสร้าง ETL Infrastructure S3 Glue Redshift Airflow อัตโนมัติ ETL Pipeline Extract Transform Load เข้า Data Warehouse Airflow Orchestration จัดการ DAG แยก Stack ต่อ Environment Secrets Management Testing ก่อน Deploy

📖 บทความที่เกี่ยวข้อง

Pulumi IaC สำหรับมือใหม่ Step by Step อ่านบทความ → Pulumi IaC IoT Gateway อ่านบทความ → Pulumi IaC GitOps Workflow อ่านบทความ → Pulumi IaC AR VR Development อ่านบทความ →

📚 ดูบทความทั้งหมด →