SiamCafe.net Blog
Technology

WordPress WooCommerce Data Pipeline ETL

wordpress woocommerce data pipeline etl
WordPress WooCommerce Data Pipeline ETL | SiamCafe Blog
2025-06-27· อ. บอม — SiamCafe.net· 9,594 คำ

WooCommerce Data Pipeline

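สร้าง Data Pipeline แบบ ETL (Extract, Transform, Load) สำหรับ WordPress WooCommerce: ดึงข้อมูล Orders, Products และ Customers ผ่าน REST API แปลงข้อมูลให้พร้อมวิเคราะห์ แล้วโหลดเข้า Data Warehouse เช่น BigQuery หรือ PostgreSQL เพื่อใช้ทำ Analytics Dashboard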

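| ETL Tool | ใช้เมื่อ | ราคา | Complexity |
|---|---|---|---|
| Python Script | เริ่มต้น ยืดหยุ่น | ฟรี | ปานกลาง |
| Apache Airflow | Production Pipeline | ฟรี (OSS) | สูง |
| n8n | Visual Automation | ฟรี (Self-hosted, fair-code) | ต่ำ |
| Airbyte | ELT Connector | ฟรี (OSS) | ต่ำ |
| Fivetran | Enterprise | คิดตามการใช้งาน (Monthly Active Rows) | ต่ำ |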

Extract จาก WooCommerce API

# === WooCommerce ETL Pipeline ===

# pip install woocommerce pandas sqlalchemy

# from woocommerce import API
# import pandas as pd
# from datetime import datetime, timedelta
#
# # WooCommerce API Setup
# wcapi = API(
#     url="https://your-store.com",
#     consumer_key="ck_xxxxx",
#     consumer_secret="cs_xxxxx",
#     version="wc/v3",
#     timeout=30,
# )
#
# # Extract Orders
# def extract_orders(after_date=None):
#     all_orders = []
#     page = 1
#     params = {"per_page": 100, "status": "completed"}
#     if after_date:
#         params["after"] = after_date.isoformat()
#
#     while True:
#         params["page"] = page
#         response = wcapi.get("orders", params=params)
#         orders = response.json()
#         if not orders:
#             break
#         all_orders.extend(orders)
#         page += 1
#     return all_orders
#
# # Extract Products
# def extract_products():
#     all_products = []
#     page = 1
#     while True:
#         response = wcapi.get("products", params={"per_page": 100, "page": page})
#         products = response.json()
#         if not products:
#             break
#         all_products.extend(products)
#         page += 1
#     return all_products

from dataclasses import dataclass
from typing import List

@dataclass
class WooOrder:
    """A single WooCommerce order reduced to the fields this demo prints.

    Simplified stand-in for the order objects the WooCommerce ``orders``
    endpoint returns, so the extraction report below runs without a store.
    """

    order_id: int      # order number, printed as "#<order_id>"
    date: str          # order date, "YYYY-MM-DD"
    status: str        # order status, e.g. "completed" or "refunded"
    total: float       # order total in THB
    items: int         # item count for the order
    customer_id: int   # customer identifier (may repeat across orders)
    payment: str       # payment method, e.g. "credit_card" or "promptpay"


# Sample extracted orders: (order_id, date, status, total, items, customer_id, payment).
orders = [
    WooOrder(*row)
    for row in (
        (1001, "2024-03-01", "completed", 2500, 3, 101, "credit_card"),
        (1002, "2024-03-01", "completed", 890, 1, 102, "bank_transfer"),
        (1003, "2024-03-02", "completed", 4200, 5, 103, "credit_card"),
        (1004, "2024-03-02", "refunded", 1500, 2, 104, "credit_card"),
        (1005, "2024-03-03", "completed", 3100, 4, 101, "promptpay"),
    )
]

print("=== Extracted Orders ===")
for order in orders:
    line = (
        f"  #{order.order_id} [{order.status}] {order.date} | "
        f"{order.total:,.0f} THB | Items: {order.items} | {order.payment}"
    )
    print(line)

# Only completed orders count toward revenue; refunded ones are listed but excluded.
total_revenue = sum(order.total for order in orders if order.status == "completed")
print(f"\n  Total Revenue: {total_revenue:,.0f} THB")

Transform และ Load

# === Transform & Load ===

# Transform Orders to Analytics Format
# def transform_orders(raw_orders):
#     records = []
#     for order in raw_orders:
#         for item in order.get("line_items", []):
#             records.append({
#                 "order_id": order["id"],
#                 "order_date": order["date_created"][:10],
#                 "status": order["status"],
#                 "customer_id": order.get("customer_id", 0),
#                 "product_id": item["product_id"],
#                 "product_name": item["name"],
#                 "quantity": item["quantity"],
#                 "subtotal": float(item["subtotal"]),
#                 "total": float(item["total"]),
#                 "payment_method": order["payment_method"],
#                 "billing_city": order["billing"]["city"],
#                 "billing_country": order["billing"]["country"],
#             })
#     return pd.DataFrame(records)
#
# # Load to PostgreSQL
# from sqlalchemy import create_engine
#
# engine = create_engine("postgresql://user:pass@localhost/analytics")
#
# def load_to_db(df, table_name):
#     df.to_sql(
#         table_name,
#         engine,
#         if_exists="append",
#         index=False,
#         method="multi",
#         chunksize=1000,
#     )
#     print(f"Loaded {len(df)} rows to {table_name}")
#
# # Full Pipeline
# def run_pipeline():
#     yesterday = datetime.now() - timedelta(days=1)
#     raw = extract_orders(after_date=yesterday)
#     df = transform_orders(raw)
#     load_to_db(df, "order_items")
#     print(f"Pipeline complete: {len(df)} records")

@dataclass
class DailyReport:
    """Aggregated sales figures for one day, as shown in the daily report."""

    date: str            # report day, "YYYY-MM-DD"
    orders: int          # number of orders that day
    revenue: float       # revenue in THB
    avg_order: float     # average order value (revenue / orders, rounded, in the samples)
    top_product: str     # the day's top product name
    new_customers: int   # count of new customers that day


# Sample rows: (date, orders, revenue, avg_order, top_product, new_customers).
reports = [
    DailyReport(*fields)
    for fields in (
        ("2024-03-01", 45, 85000, 1889, "iPhone Case", 12),
        ("2024-03-02", 52, 102000, 1962, "USB Cable", 15),
        ("2024-03-03", 38, 72000, 1895, "Power Bank", 8),
        ("2024-03-04", 61, 125000, 2049, "Wireless Earbuds", 18),
        ("2024-03-05", 48, 95000, 1979, "Phone Stand", 11),
    )
]

print("\n=== Daily Sales Report ===")
for report in reports:
    summary = f"  [{report.date}] Orders: {report.orders} | Revenue: {report.revenue:,.0f} THB"
    detail = f"    Avg: {report.avg_order:,.0f} | Top: {report.top_product} | New: {report.new_customers}"
    # Two lines per day: headline figures, then the breakdown.
    print(summary, detail, sep="\n")

Scheduling และ Monitoring

# === Pipeline Scheduling ===

# Airflow DAG
# from airflow import DAG
# from airflow.operators.python import PythonOperator
# from datetime import datetime, timedelta
#
# default_args = {
#     "owner": "data-team",
#     "retries": 3,
#     "retry_delay": timedelta(minutes=5),
# }
#
# dag = DAG(
#     "woocommerce_etl",
#     default_args=default_args,
#     schedule_interval="0 * * * *",  # ทุกชั่วโมง
#     start_date=datetime(2024, 1, 1),
#     catchup=False,
# )
#
# extract_task = PythonOperator(
#     task_id="extract_orders",
#     python_callable=extract_orders,
#     dag=dag,
# )
# transform_task = PythonOperator(
#     task_id="transform",
#     python_callable=transform_orders,
#     dag=dag,
# )
# load_task = PythonOperator(
#     task_id="load_to_warehouse",
#     python_callable=load_to_db,
#     dag=dag,
# )
# extract_task >> transform_task >> load_task

# Cron Job (Simple)
# crontab -e
# 0 * * * * cd /app && python etl_pipeline.py >> /var/log/etl.log 2>&1

# Hard-coded example of the status metrics a pipeline run would report.
# Insertion order is the display order.
pipeline_metrics = dict([
    ("Last Run", "2024-03-05 10:00"),
    ("Status", "Success"),
    ("Records Processed", "1,250"),
    ("Duration", "45 seconds"),
    ("Errors", "0"),
    ("Next Run", "2024-03-05 11:00"),
])

print("Pipeline Status:")
print("\n".join(f"  {name}: {value}" for name, value in pipeline_metrics.items()))

# SQL analytics queries over the order_items table.
# order_items holds one row per order *line item* (see transform_orders), so
# order counts must use COUNT(DISTINCT order_id); COUNT(*) would count line
# items and overstate the number of orders.
queries = [
    # Daily revenue and number of orders.
    "SELECT DATE(order_date), SUM(total), COUNT(DISTINCT order_id) FROM order_items GROUP BY 1",
    # Top 10 products by revenue.
    "SELECT product_name, SUM(quantity), SUM(total) FROM order_items GROUP BY 1 ORDER BY 3 DESC LIMIT 10",
    # Orders and revenue per payment method.
    "SELECT payment_method, COUNT(DISTINCT order_id), SUM(total) FROM order_items GROUP BY 1",
    # Distinct customers per billing city.
    "SELECT billing_city, COUNT(DISTINCT customer_id) FROM order_items GROUP BY 1 ORDER BY 2 DESC",
]

print("\n\nAnalytics Queries:")
for i, q in enumerate(queries, 1):
    print(f"  {i}. {q}")

เคล็ดลับ

Data Pipeline ETL คืออะไร

ETL (Extract, Transform, Load) คือกระบวนการดึงข้อมูลจาก WooCommerce API แปลงให้อยู่ในรูปแบบที่พร้อมวิเคราะห์ แล้วโหลดเข้า Data Warehouse เช่น BigQuery หรือ PostgreSQL เพื่อนำไปวิเคราะห์ยอดขาย

WooCommerce REST API ใช้อย่างไร

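ยืนยันตัวตนด้วย Consumer Key และ Consumer Secret แล้วเรียก GET ที่ endpoint orders, products และ customers โดยแบ่งหน้าด้วยพารามิเตอร์ page และ per_page (สูงสุด 100 รายการต่อหน้า) และกรองข้อมูลตามวันที่ (after) และสถานะ (status)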

ควรใช้ Data Warehouse อะไร

เลือกตามขนาดข้อมูลและงบประมาณ: BigQuery (บน Google Cloud), Snowflake (รองรับหลาย Cloud), PostgreSQL (ฟรี), ClickHouse (เร็วสำหรับงานวิเคราะห์) หรือ DuckDB (รันบนเครื่อง Local)

ควรรัน ETL บ่อยแค่ไหน

ถ้าต้องการข้อมูลแบบ Real-time ให้ใช้ Webhook; Dashboard ที่ต้องการข้อมูลสดใหม่รันทุก 15 นาที; ร้านขนาดกลางรันทุกชั่วโมง; งาน Report รันวันละครั้ง แนะนำให้เริ่มที่ทุกชั่วโมงแล้วปรับตามความต้องการ

สรุป

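Data Pipeline แบบ ETL สำหรับ WordPress WooCommerce ดึงข้อมูลผ่าน REST API (Extract) แปลงข้อมูล (Transform) และโหลดเข้า Data Warehouse อย่าง BigQuery หรือ PostgreSQL (Load) ตั้งเวลารันด้วย Airflow หรือ Cron ใช้ Incremental Load หรือ Webhook เพื่อลดปริมาณข้อมูลที่ต้องดึงซ้ำ และมี Monitoring คอยตรวจสอบ เพื่อป้อนข้อมูลให้ Analytics Dashboard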

📖 บทความที่เกี่ยวข้อง

- ONNX Runtime Data Pipeline ETL — อ่านบทความ →
- React Server Components Data Pipeline ETL — อ่านบทความ →
- WordPress WooCommerce Load Testing Strategy — อ่านบทความ →
- WordPress WooCommerce CQRS Event Sourcing — อ่านบทความ →
- WordPress Block Theme Data Pipeline ETL — อ่านบทความ →

📚 ดูบทความทั้งหมด →