WooCommerce Data Pipeline
WordPress WooCommerce Data Pipeline ETL Extract Transform Load REST API Data Warehouse BigQuery PostgreSQL Analytics Dashboard Orders Products Customers
| ETL Tool | ใช้เมื่อ | ราคา | Complexity |
|---|---|---|---|
| Python Script | เริ่มต้น ยืดหยุ่น | ฟรี | ปานกลาง |
| Apache Airflow | Production Pipeline | ฟรี (OSS) | สูง |
| n8n | Visual Automation | ฟรี (OSS) | ต่ำ |
| Airbyte | ELT Connector | ฟรี (OSS) | ต่ำ |
| Fivetran | Enterprise | $1,000+/mo | ต่ำ |
Extract จาก WooCommerce API
# === WooCommerce ETL Pipeline ===
# pip install woocommerce pandas sqlalchemy
# from woocommerce import API
# import pandas as pd
# from datetime import datetime, timedelta
#
# # WooCommerce API Setup
# wcapi = API(
# url="https://your-store.com",
# consumer_key="ck_xxxxx",
# consumer_secret="cs_xxxxx",
# version="wc/v3",
# timeout=30,
# )
#
# # Extract Orders
# def extract_orders(after_date=None):
# all_orders = []
# page = 1
# params = {"per_page": 100, "status": "completed"}
# if after_date:
# params["after"] = after_date.isoformat()
#
# while True:
# params["page"] = page
# response = wcapi.get("orders", params=params)
# orders = response.json()
# if not orders:
# break
# all_orders.extend(orders)
# page += 1
# return all_orders
#
# # Extract Products
# def extract_products():
# all_products = []
# page = 1
# while True:
# response = wcapi.get("products", params={"per_page": 100, "page": page})
# products = response.json()
# if not products:
# break
# all_products.extend(products)
# page += 1
# return all_products
from dataclasses import dataclass
from typing import List
@dataclass
class WooOrder:
    """Simplified order record for the demo below.

    Mirrors a subset of the fields returned by the WooCommerce REST API
    ``GET /orders`` endpoint; only what the sample report prints is kept.
    """
    order_id: int     # WooCommerce order number
    date: str         # order date, "YYYY-MM-DD"
    status: str       # e.g. "completed" or "refunded"
    total: float      # order total in THB
    items: int        # number of line items
    customer_id: int  # purchasing customer's ID (may repeat across orders)
    payment: str      # payment method slug, e.g. "credit_card", "promptpay"
# Demo dataset standing in for orders extracted from the API.
orders = [
    WooOrder(1001, "2024-03-01", "completed", 2500, 3, 101, "credit_card"),
    WooOrder(1002, "2024-03-01", "completed", 890, 1, 102, "bank_transfer"),
    WooOrder(1003, "2024-03-02", "completed", 4200, 5, 103, "credit_card"),
    WooOrder(1004, "2024-03-02", "refunded", 1500, 2, 104, "credit_card"),
    WooOrder(1005, "2024-03-03", "completed", 3100, 4, 101, "promptpay"),
]
print("=== Extracted Orders ===")
# Revenue counts completed orders only; refunded ones are excluded.
completed_totals = [order.total for order in orders if order.status == "completed"]
total_revenue = sum(completed_totals)
for order in orders:
    line = (f" #{order.order_id} [{order.status}] {order.date} | "
            f"{order.total:,.0f} THB | Items: {order.items} | {order.payment}")
    print(line)
print(f"\n Total Revenue: {total_revenue:,.0f} THB")
Transform และ Load
# === Transform & Load ===
# Transform Orders to Analytics Format
# def transform_orders(raw_orders):
# records = []
# for order in raw_orders:
# for item in order.get("line_items", []):
# records.append({
# "order_id": order["id"],
# "order_date": order["date_created"][:10],
# "status": order["status"],
# "customer_id": order.get("customer_id", 0),
# "product_id": item["product_id"],
# "product_name": item["name"],
# "quantity": item["quantity"],
# "subtotal": float(item["subtotal"]),
# "total": float(item["total"]),
# "payment_method": order["payment_method"],
# "billing_city": order["billing"]["city"],
# "billing_country": order["billing"]["country"],
# })
# return pd.DataFrame(records)
#
# # Load to PostgreSQL
# from sqlalchemy import create_engine
#
# engine = create_engine("postgresql://user:pass@localhost/analytics")
#
# def load_to_db(df, table_name):
# df.to_sql(
# table_name,
# engine,
# if_exists="append",
# index=False,
# method="multi",
# chunksize=1000,
# )
# print(f"Loaded {len(df)} rows to {table_name}")
#
# # Full Pipeline
# def run_pipeline():
# yesterday = datetime.now() - timedelta(days=1)
# raw = extract_orders(after_date=yesterday)
# df = transform_orders(raw)
# load_to_db(df, "order_items")
# print(f"Pipeline complete: {len(df)} records")
@dataclass
class DailyReport:
    """One day's aggregated sales metrics, as produced by the transform step."""
    date: str           # report date, "YYYY-MM-DD"
    orders: int         # number of orders that day
    revenue: float      # total revenue in THB
    avg_order: float    # average order value in THB
    top_product: str    # best-selling product name
    new_customers: int  # first-time customers acquired that day
# Demo dataset: per-day aggregates as they would land in the warehouse.
reports = [
    DailyReport("2024-03-01", 45, 85000, 1889, "iPhone Case", 12),
    DailyReport("2024-03-02", 52, 102000, 1962, "USB Cable", 15),
    DailyReport("2024-03-03", 38, 72000, 1895, "Power Bank", 8),
    DailyReport("2024-03-04", 61, 125000, 2049, "Wireless Earbuds", 18),
    DailyReport("2024-03-05", 48, 95000, 1979, "Phone Stand", 11),
]
print("\n=== Daily Sales Report ===")
for report in reports:
    headline = f" [{report.date}] Orders: {report.orders} | Revenue: {report.revenue:,.0f} THB"
    detail = f" Avg: {report.avg_order:,.0f} | Top: {report.top_product} | New: {report.new_customers}"
    print(headline)
    print(detail)
Scheduling และ Monitoring
# === Pipeline Scheduling ===
# Airflow DAG
# from airflow import DAG
# from airflow.operators.python import PythonOperator
# from datetime import datetime, timedelta
#
# default_args = {
# "owner": "data-team",
# "retries": 3,
# "retry_delay": timedelta(minutes=5),
# }
#
# dag = DAG(
# "woocommerce_etl",
# default_args=default_args,
# schedule_interval="0 * * * *", # ทุกชั่วโมง
# start_date=datetime(2024, 1, 1),
# catchup=False,
# )
#
# extract_task = PythonOperator(
# task_id="extract_orders",
# python_callable=extract_orders,
# dag=dag,
# )
# transform_task = PythonOperator(
# task_id="transform",
# python_callable=transform_orders,
# dag=dag,
# )
# load_task = PythonOperator(
# task_id="load_to_warehouse",
# python_callable=load_to_db,
# dag=dag,
# )
# extract_task >> transform_task >> load_task
# Cron Job (Simple)
# crontab -e
# 0 * * * * cd /app && python etl_pipeline.py >> /var/log/etl.log 2>&1
# Snapshot of the last pipeline run, shaped like a monitoring dashboard row.
pipeline_metrics = {
    "Last Run": "2024-03-05 10:00",
    "Status": "Success",
    "Records Processed": "1,250",
    "Duration": "45 seconds",
    "Errors": "0",
    "Next Run": "2024-03-05 11:00",
}
print("Pipeline Status:")
for metric_name, metric_value in pipeline_metrics.items():
    print(f" {metric_name}: {metric_value}")
# Example SQL run against the loaded order_items table for dashboards:
# daily revenue, top products, payment-method split, customers by city.
queries = [
    "SELECT DATE(order_date), SUM(total), COUNT(*) FROM order_items GROUP BY 1",
    "SELECT product_name, SUM(quantity), SUM(total) FROM order_items GROUP BY 1 ORDER BY 3 DESC LIMIT 10",
    "SELECT payment_method, COUNT(*), SUM(total) FROM order_items GROUP BY 1",
    "SELECT billing_city, COUNT(DISTINCT customer_id) FROM order_items GROUP BY 1 ORDER BY 2 DESC",
]
print("\n\nAnalytics Queries:")
for number, sql in enumerate(queries, start=1):
    print(f" {number}. {sql}")
เคล็ดลับ
- Incremental: ดึงเฉพาะข้อมูลใหม่ ไม่ต้องดึงทั้งหมดทุกรอบ
- Webhook: ใช้ WooCommerce Webhook สำหรับ Real-time Events
- Idempotent: Pipeline ต้อง Idempotent รันซ้ำไม่ Duplicate
- Monitoring: Alert เมื่อ Pipeline Fail หรือข้อมูลผิดปกติ
- Backup: เก็บ Raw Data ก่อน Transform เสมอ
Data Pipeline ETL คืออะไร
Extract Transform Load ย้ายข้อมูล WooCommerce API แปลง โหลด Data Warehouse BigQuery PostgreSQL วิเคราะห์ยอดขาย
WooCommerce REST API ใช้อย่างไร
Consumer Key Secret Authentication GET orders products customers Pagination per_page Filter วันที่ สถานะ
ควรใช้ Data Warehouse อะไร
BigQuery Google Cloud Snowflake Multi-cloud PostgreSQL ฟรี ClickHouse เร็ว DuckDB Local ตามขนาดข้อมูลและงบ
ควรรัน ETL บ่อยแค่ไหน
Real-time Webhook ทุก 15 นาที Dashboard ทุกชั่วโมง ร้านกลาง ทุกวัน Report เริ่มทุกชั่วโมง ปรับตามความต้องการ
สรุป
WordPress WooCommerce Data Pipeline ETL REST API Extract Transform Load Data Warehouse BigQuery PostgreSQL Airflow Scheduling Analytics Dashboard Incremental Webhook Monitoring
