Pulumi สำหรับ Data Pipeline
Pulumi ใช้ภาษาโปรแกรมจริง (เช่น Python, TypeScript) ในการสร้าง Cloud Infrastructure ส่วน ETL Pipeline ประกอบด้วยขั้นตอน Extract, Transform, Load คือดึงข้อมูลจากแหล่งต่าง ๆ แปลงข้อมูล แล้วโหลดเข้า Data Warehouse
ใช้ Pulumi สร้าง S3 Buckets, Glue Jobs, Redshift Cluster, Airflow Environment อัตโนมัติ Version Control ด้วย Git Reproducible ทุกครั้ง
Pulumi ETL Infrastructure
# === Pulumi ETL Infrastructure (Python) ===
# pip install pulumi pulumi-aws
import pulumi
import pulumi_aws as aws
# NOTE(review): dataclass/List/Dict are not used in this Pulumi script —
# they belong to the ETL-engine script further down this document.
from dataclasses import dataclass
from typing import List, Dict
# Stack configuration: set with `pulumi config set env <name>`; falls back
# to "dev" when unset. `project` is the base for every resource name below.
config = pulumi.Config()
env = config.get("env") or "dev"
project = "etl-pipeline"
# 1. S3 Buckets — "raw" landing zone and "processed" output zone.
# NOTE(review): inline `versioning` / `lifecycle_rules` on aws.s3.Bucket are
# deprecated in newer pulumi-aws versions in favor of separate
# BucketVersioningV2 / BucketLifecycleConfigurationV2 resources — confirm
# the provider version pinned by this project.
raw_bucket = aws.s3.Bucket(f"{project}-raw-{env}",
bucket=f"{project}-raw-{env}",
versioning=aws.s3.BucketVersioningArgs(
enabled=True,
),
# Cost control: move raw objects to Infrequent Access after 30 days,
# delete them entirely after one year.
lifecycle_rules=[aws.s3.BucketLifecycleRuleArgs(
enabled=True,
transitions=[aws.s3.BucketLifecycleRuleTransitionArgs(
days=30,
storage_class="STANDARD_IA",
)],
expiration=aws.s3.BucketLifecycleRuleExpirationArgs(
days=365,
),
)],
tags={"Environment": env, "Project": project},
)
# Processed bucket keeps versioning but no lifecycle rules (kept indefinitely).
processed_bucket = aws.s3.Bucket(f"{project}-processed-{env}",
bucket=f"{project}-processed-{env}",
versioning=aws.s3.BucketVersioningArgs(enabled=True),
tags={"Environment": env, "Project": project},
)
# 2. Glue Catalog Database — Glue DB names cannot contain hyphens, so the
# project slug is converted to underscores (e.g. "etl_pipeline_dev").
glue_db = aws.glue.CatalogDatabase(f"{project}-db",
name=f"{project.replace('-', '_')}_{env}",
)
# 3. IAM role for Glue — trust policy lets the Glue service assume this role.
glue_role = aws.iam.Role(f"{project}-glue-role",
assume_role_policy="""{
"Version": "2012-10-17",
"Statement": [{
"Effect": "Allow",
"Principal": {"Service": "glue.amazonaws.com"},
"Action": "sts:AssumeRole"
}]
}""",
)
# Attach the AWS-managed baseline policy for Glue jobs (catalog + S3 access).
aws.iam.RolePolicyAttachment(f"{project}-glue-policy",
role=glue_role.name,
policy_arn="arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole",
)
# 4. Glue ETL Job
# glue_job = aws.glue.Job(f"{project}-transform",
# name=f"{project}-transform-{env}",
# role_arn=glue_role.arn,
# command=aws.glue.JobCommandArgs(
# script_location=f"s3://{project}-scripts-{env}/transform.py",
# python_version="3",
# ),
# default_arguments={
# "--job-language": "python",
# "--raw_bucket": raw_bucket.bucket,
# "--processed_bucket": processed_bucket.bucket,
# "--TempDir": f"s3://{project}-temp-{env}/",
# },
# glue_version="4.0",
# number_of_workers=2,
# worker_type="G.1X",
# tags={"Environment": env},
# )
# 5. Redshift Cluster
# redshift = aws.redshift.Cluster(f"{project}-redshift",
# cluster_identifier=f"{project}-{env}",
# database_name="analytics",
# master_username="admin",
# master_password=config.require_secret("redshift_password"),
# node_type="dc2.large",
# number_of_nodes=2,
# skip_final_snapshot=True if env == "dev" else False,
# tags={"Environment": env},
# )
# Stack outputs — retrievable via `pulumi stack output <name>`.
pulumi.export("raw_bucket", raw_bucket.bucket)
pulumi.export("processed_bucket", processed_bucket.bucket)
pulumi.export("glue_database", glue_db.name)
# Human-readable summary of the resources this stack defines.
print("Pulumi ETL Infrastructure:")
print(f" Environment: {env}")
print(f" S3 Raw: {project}-raw-{env}")
print(f" S3 Processed: {project}-processed-{env}")
# Fix: the Glue database name (see CatalogDatabase above) replaces hyphens
# with underscores; the previous print showed "etl-pipeline_dev" instead of
# the real name "etl_pipeline_dev".
print(f" Glue DB: {project.replace('-', '_')}_{env}")
# NOTE(review): the Glue job itself is commented out above — this line
# advertises a resource the stack does not currently create.
print(f" Glue Job: {project}-transform-{env}")
ETL Pipeline Code
# etl_pipeline.py — ETL Pipeline with Python
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from datetime import datetime
import json
@dataclass
class ETLStep:
    """A single unit of work in an ETL pipeline.

    Mutable status/metrics fields are filled in by the engine while the
    step runs.
    """
    name: str
    step_type: str  # one of: "extract", "transform", "load"
    source: str  # URI the step reads from
    destination: str  # URI the step writes to
    status: str = "pending"  # lifecycle: pending -> running -> completed
    rows_processed: int = 0
    # Fix: default was the int 0, contradicting the float annotation.
    duration_seconds: float = 0.0
@dataclass
class ETLPipeline:
    """A named, ordered sequence of ETL steps plus run timestamps."""
    name: str
    steps: List[ETLStep] = field(default_factory=list)
    start_time: Optional[datetime] = None  # set by the engine when a run begins
    end_time: Optional[datetime] = None  # set by the engine when a run finishes
class ETLEngine:
    """Registry and runner for simulated ETL pipelines.

    Steps are not actually executed: each run sleeps briefly and fills in
    random row counts / durations so the reporting path can be exercised.
    """

    def __init__(self) -> None:
        # Every pipeline ever created, keyed by pipeline name.
        self.pipelines: Dict[str, ETLPipeline] = {}

    def create_pipeline(self, name: str, steps: List[ETLStep]) -> ETLPipeline:
        """Build a pipeline from *steps*, register it under *name*, return it."""
        pipeline = ETLPipeline(name=name, steps=steps)
        self.pipelines[name] = pipeline
        return pipeline

    def run_pipeline(self, name: str) -> None:
        """Simulate running every step of the named pipeline and print a summary.

        An unknown *name* is reported and ignored (best-effort, no exception),
        matching the original behavior.
        """
        # Hoisted out of the step loop — they were re-imported per iteration.
        import random
        import time

        pipeline = self.pipelines.get(name)
        if not pipeline:
            print(f"Pipeline not found: {name}")
            return
        pipeline.start_time = datetime.now()
        print(f"\n{'='*55}")
        print(f"ETL Pipeline: {name}")
        print(f"{'='*55}")
        for i, step in enumerate(pipeline.steps, 1):
            step.status = "running"
            print(f"\n Step {i}: {step.name} [{step.step_type}]")
            print(f" Source: {step.source}")
            print(f" Destination: {step.destination}")
            # Simulated work: short sleep plus randomized metrics.
            time.sleep(0.01)
            step.rows_processed = random.randint(10000, 100000)
            step.duration_seconds = random.uniform(5, 60)
            step.status = "completed"
            print(f" Rows: {step.rows_processed:,}")
            print(f" Duration: {step.duration_seconds:.1f}s")
            print(f" Status: {step.status}")
        pipeline.end_time = datetime.now()
        total_rows = sum(s.rows_processed for s in pipeline.steps)
        total_duration = sum(s.duration_seconds for s in pipeline.steps)
        print("\n Summary:")  # was an f-string with no placeholders
        print(f" Total Rows: {total_rows:,}")
        print(f" Total Duration: {total_duration:.1f}s")
        print(f" Steps: {len(pipeline.steps)} completed")
# Build a demo daily-sales pipeline and execute it.
engine = ETLEngine()
_step_specs = [
    ("Extract from API", "extract",
     "https://api.example.com/orders", "s3://raw/orders/"),
    ("Extract from DB", "extract",
     "postgres://prod/customers", "s3://raw/customers/"),
    ("Clean & Validate", "transform",
     "s3://raw/orders/", "s3://processed/orders_clean/"),
    ("Join Orders+Customers", "transform",
     "s3://processed/", "s3://processed/orders_enriched/"),
    ("Aggregate Daily", "transform",
     "s3://processed/orders_enriched/", "s3://processed/daily_summary/"),
    ("Load to Redshift", "load",
     "s3://processed/daily_summary/", "redshift://analytics/daily_sales"),
]
# Each spec tuple is (name, step_type, source, destination).
steps = [ETLStep(*spec) for spec in _step_specs]
engine.create_pipeline("daily-sales-pipeline", steps)
engine.run_pipeline("daily-sales-pipeline")
Airflow Orchestration
# === Airflow DAG for the ETL Pipeline ===
# dags/etl_daily_sales.py
# from airflow import DAG
# from airflow.operators.python import PythonOperator
# from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
# from airflow.providers.amazon.aws.operators.s3 import S3CopyObjectOperator
# from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
# from datetime import datetime, timedelta
#
# default_args = {
# 'owner': 'data-team',
# 'depends_on_past': False,
# 'email_on_failure': True,
# 'email': ['data-alerts@company.com'],
# 'retries': 2,
# 'retry_delay': timedelta(minutes=5),
# }
#
# with DAG(
# 'etl_daily_sales',
# default_args=default_args,
# description='Daily Sales ETL Pipeline',
#     schedule_interval='0 6 * * *',  # daily at 6:00 AM
# start_date=datetime(2024, 1, 1),
# catchup=False,
# tags=['etl', 'sales', 'daily'],
# ) as dag:
#
# wait_for_data = S3KeySensor(
# task_id='wait_for_raw_data',
# bucket_name='etl-pipeline-raw',
# bucket_key='orders/{{ ds }}/*.parquet',
# timeout=3600,
# )
#
# extract_api = PythonOperator(
# task_id='extract_from_api',
# python_callable=extract_orders,
# op_kwargs={'date': '{{ ds }}'},
# )
#
# transform = GlueJobOperator(
# task_id='transform_data',
# job_name='etl-pipeline-transform',
# script_args={
# '--date': '{{ ds }}',
# '--raw_bucket': 'etl-pipeline-raw',
# '--processed_bucket': 'etl-pipeline-processed',
# },
# )
#
# load_redshift = PythonOperator(
# task_id='load_to_redshift',
# python_callable=load_to_redshift,
# op_kwargs={'date': '{{ ds }}'},
# )
#
# notify = PythonOperator(
# task_id='send_notification',
# python_callable=send_slack_notification,
# )
#
# wait_for_data >> extract_api >> transform >> load_redshift >> notify
# Pulumi — create an MWAA (Managed Airflow) Environment
# mwaa = aws.mwaa.Environment(f"{project}-airflow",
# name=f"{project}-airflow-{env}",
# airflow_version="2.8.1",
# execution_role_arn=airflow_role.arn,
# source_bucket_arn=dags_bucket.arn,
# dag_s3_path="dags/",
# environment_class="mw1.small",
# max_workers=5,
# network_configuration=aws.mwaa.EnvironmentNetworkConfigurationArgs(
# security_group_ids=[sg.id],
# subnet_ids=private_subnets,
# ),
# )
# Static description of the Airflow DAG above, printed as a summary.
dag_structure = {
    "DAG": "etl_daily_sales",
    "Schedule": "0 6 * * * (Daily 6AM)",
    "Tasks": [
        "wait_for_raw_data (S3 Sensor)",
        "extract_from_api (Python)",
        "transform_data (Glue Job)",
        "load_to_redshift (Python)",
        "send_notification (Slack)",
    ],
}
# Assemble the report once, then emit it with a single print call.
_summary = [
    "Airflow DAG:",
    f" Name: {dag_structure['DAG']}",
    f" Schedule: {dag_structure['Schedule']}",
    " Tasks:",
]
_summary.extend(f" - {task}" for task in dag_structure["Tasks"])
print("\n".join(_summary))
Best Practices
- Pulumi Stacks: แยก Stack ต่อ Environment (dev/staging/prod)
- Secrets: ใช้ Pulumi Config Secrets เก็บ Password และ API Keys
- Idempotent: ทำ ETL Jobs ให้ Idempotent รันซ้ำได้ผลเหมือนเดิม
- Partitioning: แบ่ง Data เป็น Partitions ตาม Date ลด Scan Time
- Monitoring: ตั้ง Alerts สำหรับ Failed Jobs, Data Quality Issues
- Testing: เขียน Unit Tests สำหรับ Transform Functions ทดสอบก่อน Deploy
Pulumi คืออะไร
IaC Tool ใช้ภาษาโปรแกรมจริง Python TypeScript Go แทน DSL สร้าง Cloud Resources AWS Azure GCP State Management Preview Changes Drift Detection
ETL Pipeline คืออะไร
Extract Transform Load ดึงข้อมูลจาก Sources แปลงข้อมูล Clean Aggregate Join โหลดเข้า Data Warehouse Redshift BigQuery Snowflake Analytics BI
Pulumi ต่างจาก Terraform อย่างไร
Pulumi ใช้ภาษาโปรแกรมจริง (Python, TypeScript) จึงได้ IDE Support, Type Checking และการเขียน Test ได้เต็มรูปแบบ ส่วน Terraform ใช้ HCL ซึ่งเป็นภาษาเฉพาะ; Pulumi มี Automation API ขณะที่ Terraform มี Community ใหญ่กว่าและ Providers มากกว่า — ทั้งสองจัดการ State ได้ดีเหมือนกัน
วิธีเลือก ETL Tool ทำอย่างไร
พิจารณาจาก Data Volume: ข้อมูลน้อยใช้ Python Scripts, ข้อมูลมากใช้ Spark/Glue; Complexity: งานง่ายใช้ Glue/dbt, งานซับซ้อนใช้ Airflow+Spark; Cost: งบน้อยเลือก Open Source, งบมากเลือก Managed Services; และพิจารณา Team Skills ของทีมประกอบด้วย
สรุป
Pulumi ใช้ภาษาจริงสร้าง ETL Infrastructure S3 Glue Redshift Airflow อัตโนมัติ ETL Pipeline Extract Transform Load เข้า Data Warehouse Airflow Orchestration จัดการ DAG แยก Stack ต่อ Environment Secrets Management Testing ก่อน Deploy
