it

Pulumi IaC กับ Data Pipeline ETL — วิธีใช้

Pulumi IaC กับ Data Pipeline ETL — วิธีใช้

Pulumi สำหรับ Data Pipeline

Pulumi IaC กับ Data Pipeline ETL — วิธีใช้

Pulumi ใช้ภาษาโปรแกรมจริง เช่น Python และ TypeScript ในการสร้าง Cloud Infrastructure ส่วน ETL Pipeline ประกอบด้วยขั้นตอน Extract, Transform และ Load คือ ดึงข้อมูล แปลงข้อมูล แล้วโหลดเข้า Data Warehouse

เนื้อหาเกี่ยวข้อง — บทความที่เกี่ยวข้อง: GCP Pub Sub GitOps Workflow

ใช้ Pulumi สร้าง S3 Buckets, Glue Jobs, Redshift Cluster และ Airflow Environment แบบอัตโนมัติ ทำ Version Control ด้วย Git และสร้างซ้ำได้เหมือนเดิม (Reproducible) ทุกครั้ง

เนื้อหาเกี่ยวข้อง — บทความที่เกี่ยวข้อง: Spdr Stock — คู่มือฉบับสมบูรณ์ 2026

Pulumi ETL Infrastructure

# === Pulumi ETL Infrastructure (Python) ===
# pip install pulumi pulumi-aws

import pulumi
import pulumi_aws as aws
from dataclasses import dataclass
from typing import List, Dict

# Stack configuration: the project name is fixed, while the environment comes
# from the stack config key "env" and falls back to "dev" when that key is
# unset or empty.
project = "etl-pipeline"
config = pulumi.Config()
env = config.get("env") or "dev"

# 1. S3 buckets
# Both buckets are versioned and carry the same Environment/Project tags.
bucket_tags = {"Environment": env, "Project": project}

# Lifecycle for the raw bucket only: transition objects to STANDARD_IA after
# 30 days and expire them after 365 days.
raw_lifecycle_rule = aws.s3.BucketLifecycleRuleArgs(
    enabled=True,
    transitions=[
        aws.s3.BucketLifecycleRuleTransitionArgs(
            days=30,
            storage_class="STANDARD_IA",
        ),
    ],
    expiration=aws.s3.BucketLifecycleRuleExpirationArgs(days=365),
)

# The Pulumi resource name and the physical bucket name are the same string.
raw_bucket_name = f"{project}-raw-{env}"
raw_bucket = aws.s3.Bucket(
    raw_bucket_name,
    bucket=raw_bucket_name,
    versioning=aws.s3.BucketVersioningArgs(enabled=True),
    lifecycle_rules=[raw_lifecycle_rule],
    tags=dict(bucket_tags),
)

processed_bucket_name = f"{project}-processed-{env}"
processed_bucket = aws.s3.Bucket(
    processed_bucket_name,
    bucket=processed_bucket_name,
    versioning=aws.s3.BucketVersioningArgs(enabled=True),
    tags=dict(bucket_tags),
)

# 2. Glue catalog database
# The database name is the project name with hyphens replaced by underscores,
# followed by "_" and the environment.
glue_db_name = "_".join([project.replace("-", "_"), env])
glue_db = aws.glue.CatalogDatabase(f"{project}-db", name=glue_db_name)

# 3. IAM role for Glue
# Trust policy allowing the glue.amazonaws.com service principal to assume
# the role via sts:AssumeRole.
glue_assume_role_policy = """{
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "glue.amazonaws.com"},
            "Action": "sts:AssumeRole"
        }]
    }"""

glue_role = aws.iam.Role(
    f"{project}-glue-role",
    assume_role_policy=glue_assume_role_policy,
)

# Attach the AWS-managed AWSGlueServiceRole policy to the role created above.
aws.iam.RolePolicyAttachment(
    f"{project}-glue-policy",
    policy_arn="arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole",
    role=glue_role.name,
)

# 4. Glue ETL Job
# glue_job = aws.glue.Job(f"{project}-transform",
#     name=f"{project}-transform-{env}",
#     role_arn=glue_role.arn,
#     command=aws.glue.JobCommandArgs(
#         script_location=f"s3://{project}-scripts-{env}/transform.py",
#         python_version="3",
#     ),
#     default_arguments={
#         "--job-language": "python",
#         "--raw_bucket": raw_bucket.bucket,
#         "--processed_bucket": processed_bucket.bucket,
#         "--TempDir": f"s3://{project}-temp-{env}/",
#     },
#     glue_version="4.0",
#     number_of_workers=2,
#     worker_type="G.1X",
#     tags={"Environment": env},
# )

# 5. Redshift Cluster
# redshift = aws.redshift.Cluster(f"{project}-redshift",
#     cluster_identifier=f"{project}-{env}",
#     database_name="analytics",
#     master_username="admin",
#     master_password=config.require_secret("redshift_password"),
#     node_type="dc2.large",
#     number_of_nodes=2,
#     skip_final_snapshot=True if env == "dev" else False,
#     tags={"Environment": env},
# )

# Stack outputs: the two bucket names and the Glue database name.
for output_name, output_value in (
    ("raw_bucket", raw_bucket.bucket),
    ("processed_bucket", processed_bucket.bucket),
    ("glue_database", glue_db.name),
):
    pulumi.export(output_name, output_value)

# Human-readable summary of the resource names, printed whenever the program
# is evaluated.
print("Pulumi ETL Infrastructure:")
print(f"  Environment: {env}")
print(f"  S3 Raw: {project}-raw-{env}")
print(f"  S3 Processed: {project}-processed-{env}")
# The Glue database is created with hyphens in the project name replaced by
# underscores, so apply the same replacement here; printing the raw project
# name would report a database name that does not exist.
print(f"  Glue DB: {project.replace('-', '_')}_{env}")
# NOTE: this is the name the Glue job would get; the job resource itself is
# commented out in this file.
print(f"  Glue Job: {project}-transform-{env}")

ETL Pipeline Code

Pulumi IaC กับ Data Pipeline ETL — วิธีใช้
# etl_pipeline.py — ETL Pipeline with Python
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from datetime import datetime
import json

@dataclass
class ETLStep:
    """One stage of an ETL pipeline together with its run bookkeeping.

    Attributes:
        name: Human-readable label for the step.
        step_type: Stage category: "extract", "transform" or "load".
        source: Location the step reads from.
        destination: Location the step writes to.
        status: Lifecycle marker; starts as "pending".
        rows_processed: Row count recorded for the step; starts at 0.
        duration_seconds: Duration recorded for the step; starts at 0.
    """

    # Required fields, in positional order.
    name: str
    step_type: str
    source: str
    destination: str

    # Bookkeeping fields; they keep these defaults until something updates them.
    status: str = "pending"
    rows_processed: int = 0
    duration_seconds: float = 0

@dataclass
class ETLPipeline:
    """A named, ordered collection of ETL steps plus run timestamps.

    Attributes:
        name: Identifier of the pipeline.
        steps: Steps in execution order; each instance gets its own empty
            list when none is supplied.
        start_time: ``None`` until a value is assigned.
        end_time: ``None`` until a value is assigned.
    """

    name: str
    # default_factory avoids sharing one list object between instances.
    steps: List[ETLStep] = field(default_factory=list)
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None

class ETLEngine:
    """In-memory registry and simulated runner for ETL pipelines.

    Pipelines are stored by name in ``self.pipelines``. Running a pipeline
    moves no data: each step sleeps briefly, is assigned a random row count
    and a random duration, and progress is printed to stdout.
    """

    def __init__(self) -> None:
        # Registered pipelines keyed by pipeline name.
        self.pipelines: Dict[str, ETLPipeline] = {}

    def create_pipeline(self, name: str, steps: List[ETLStep]) -> ETLPipeline:
        """Register a pipeline under ``name`` and return it.

        Args:
            name: Key used to look the pipeline up later. An existing
                pipeline registered under the same name is replaced.
            steps: Steps to run, in execution order. The list is stored
                as-is (not copied).

        Returns:
            The newly created ``ETLPipeline``.
        """
        pipeline = ETLPipeline(name=name, steps=steps)
        self.pipelines[name] = pipeline
        return pipeline

    def run_pipeline(self, name: str) -> None:
        """Simulate a run of the pipeline registered under ``name``.

        Each step is marked "running", given a random ``rows_processed``
        (10,000-100,000) and ``duration_seconds`` (5-60), then marked
        "completed". Per-step details and a final summary are printed.
        The pipeline's ``start_time`` and ``end_time`` are set around the
        loop.

        Args:
            name: Name of a pipeline previously passed to
                ``create_pipeline``. If no such pipeline exists, a message
                is printed and the method returns without doing anything.
        """
        # Imported once per call, one module per line, instead of being
        # re-executed on every loop iteration.
        import random
        import time

        pipeline = self.pipelines.get(name)
        if pipeline is None:
            print(f"Pipeline not found: {name}")
            return

        pipeline.start_time = datetime.now()
        print(f"\n{'='*55}")
        print(f"ETL Pipeline: {name}")
        print(f"{'='*55}")

        for i, step in enumerate(pipeline.steps, 1):
            step.status = "running"
            print(f"\n  Step {i}: {step.name} [{step.step_type}]")
            print(f"    Source: {step.source}")
            print(f"    Destination: {step.destination}")

            # Simulate processing: no real work is done, the metrics are
            # random placeholders.
            time.sleep(0.01)
            step.rows_processed = random.randint(10000, 100000)
            step.duration_seconds = random.uniform(5, 60)
            step.status = "completed"

            print(f"    Rows: {step.rows_processed:,}")
            print(f"    Duration: {step.duration_seconds:.1f}s")
            print(f"    Status: {step.status}")

        pipeline.end_time = datetime.now()

        total_rows = sum(s.rows_processed for s in pipeline.steps)
        total_duration = sum(s.duration_seconds for s in pipeline.steps)
        # Count by status rather than assuming every step in the list finished.
        completed_steps = sum(1 for s in pipeline.steps if s.status == "completed")
        print("\n  Summary:")
        print(f"    Total Rows: {total_rows:,}")
        print(f"    Total Duration: {total_duration:.1f}s")
        print(f"    Steps: {completed_steps} completed")

# Build the pipeline: two extracts, three transforms and one load, registered
# and run under a single pipeline name.
engine = ETLEngine()

steps = [
    ETLStep(
        name="Extract from API",
        step_type="extract",
        source="https://api.example.com/orders",
        destination="s3://raw/orders/",
    ),
    ETLStep(
        name="Extract from DB",
        step_type="extract",
        source="postgres://prod/customers",
        destination="s3://raw/customers/",
    ),
    ETLStep(
        name="Clean & Validate",
        step_type="transform",
        source="s3://raw/orders/",
        destination="s3://processed/orders_clean/",
    ),
    ETLStep(
        name="Join Orders+Customers",
        step_type="transform",
        source="s3://processed/",
        destination="s3://processed/orders_enriched/",
    ),
    ETLStep(
        name="Aggregate Daily",
        step_type="transform",
        source="s3://processed/orders_enriched/",
        destination="s3://processed/daily_summary/",
    ),
    ETLStep(
        name="Load to Redshift",
        step_type="load",
        source="s3://processed/daily_summary/",
        destination="redshift://analytics/daily_sales",
    ),
]

pipeline_name = "daily-sales-pipeline"
engine.create_pipeline(pipeline_name, steps)
engine.run_pipeline(pipeline_name)

Airflow Orchestration

# === Airflow DAG สำหรับ ETL Pipeline ===

# dags/etl_daily_sales.py
# from airflow import DAG
# from airflow.operators.python import PythonOperator
# from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
# from airflow.providers.amazon.aws.operators.s3 import S3CopyObjectOperator
# from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
# from datetime import datetime, timedelta
#
# default_args = {
#     'owner': 'data-team',
#     'depends_on_past': False,
#     'email_on_failure': True,
#     'email': ['data-alerts@company.com'],
#     'retries': 2,
#     'retry_delay': timedelta(minutes=5),
# }
#
# with DAG(
#     'etl_daily_sales',
#     default_args=default_args,
#     description='Daily Sales ETL Pipeline',
#     schedule_interval='0 6 * * *',  # ทุกวัน 6:00 AM
#     start_date=datetime(2024, 1, 1),
#     catchup=False,
#     tags=['etl', 'sales', 'daily'],
# ) as dag:
#
#     wait_for_data = S3KeySensor(
#         task_id='wait_for_raw_data',
#         bucket_name='etl-pipeline-raw',
#         bucket_key='orders/{{ ds }}/*.parquet',
#         timeout=3600,
#     )
#
#     extract_api = PythonOperator(
#         task_id='extract_from_api',
#         python_callable=extract_orders,
#         op_kwargs={'date': '{{ ds }}'},
#     )
#
#     transform = GlueJobOperator(
#         task_id='transform_data',
#         job_name='etl-pipeline-transform',
#         script_args={
#             '--date': '{{ ds }}',
#             '--raw_bucket': 'etl-pipeline-raw',
#             '--processed_bucket': 'etl-pipeline-processed',
#         },
#     )
#
#     load_redshift = PythonOperator(
#         task_id='load_to_redshift',
#         python_callable=load_to_redshift,
#         op_kwargs={'date': '{{ ds }}'},
#     )
#
#     notify = PythonOperator(
#         task_id='send_notification',
#         python_callable=send_slack_notification,
#     )
#
#     wait_for_data >> extract_api >> transform >> load_redshift >> notify

# Pulumi — สร้าง MWAA (Managed Airflow) Environment
# mwaa = aws.mwaa.Environment(f"{project}-airflow",
#     name=f"{project}-airflow-{env}",
#     airflow_version="2.8.1",
#     execution_role_arn=airflow_role.arn,
#     source_bucket_arn=dags_bucket.arn,
#     dag_s3_path="dags/",
#     environment_class="mw1.small",
#     max_workers=5,
#     network_configuration=aws.mwaa.EnvironmentNetworkConfigurationArgs(
#         security_group_ids=[sg.id],
#         subnet_ids=private_subnets,
#     ),
# )

# Plain-data description of the DAG (name, schedule, task list), used only for
# the printed overview below.
dag_structure = dict(
    DAG="etl_daily_sales",
    Schedule="0 6 * * * (Daily 6AM)",
    Tasks=[
        "wait_for_raw_data (S3 Sensor)",
        "extract_from_api (Python)",
        "transform_data (Glue Job)",
        "load_to_redshift (Python)",
        "send_notification (Slack)",
    ],
)

# Header lines are emitted with one call; sep="\n" puts each on its own line.
print(
    "Airflow DAG:",
    "  Name: " + dag_structure["DAG"],
    "  Schedule: " + dag_structure["Schedule"],
    "  Tasks:",
    sep="\n",
)
for task in dag_structure["Tasks"]:
    print("    - " + task)

Best Practices

  • Pulumi Stacks: แยก Stack ต่อ Environment (dev/staging/prod)
  • Secrets: ใช้ Pulumi Config Secrets เก็บ Password และ API Keys
  • Idempotent: ทำ ETL Jobs ให้ Idempotent รันซ้ำได้ผลเหมือนเดิม
  • Partitioning: แบ่ง Data เป็น Partitions ตาม Date ลด Scan Time
  • Monitoring: ตั้ง Alerts สำหรับ Failed Jobs, Data Quality Issues
  • Testing: เขียน Unit Tests สำหรับ Transform Functions ทดสอบก่อน Deploy

Pulumi คืออะไร

Pulumi เป็น IaC Tool ที่ใช้ภาษาโปรแกรมจริง เช่น Python, TypeScript และ Go แทน DSL ในการสร้าง Cloud Resources บน AWS, Azure และ GCP พร้อมความสามารถด้าน State Management, Preview Changes และ Drift Detection

แนะนำเพิ่มเติม — อ่านเพิ่มเติมที่ SiamCafeBook

เนื้อหาเกี่ยวข้อง — อ่านต่อ: Crowdsec IPS Technical Debt Management — คู่มือฉบับสมบูรณ์ 2026

XM Legend · เทรดเดอร์ & ผู้สอน Forex 13 ปี

ผู้ก่อตั้ง SiamCafe ตั้งแต่ปี 1997 · เทรดเดอร์สาย Forex มากกว่า 13 ปี ได้รับการยกย่องเป็น XM Legend · แบ่งปันความรู้ Forex, ไอที, AI และการเทรด จากประสบการณ์จริงในตลาดจริง