WordPress WooCommerce Data Pipeline ETL — สร้าง
WooCommerce Data Pipeline

WordPress WooCommerce Data Pipeline ETL Extract Transform Load REST API Data Warehouse BigQuery PostgreSQL Analytics Dashboard Orders Products Customers
| ETL Tool | ใช้เมื่อ | ราคา | Complexity |
|---|---|---|---|
| Python Script | เริ่มต้น ยืดหยุ่น | ฟรี | ปานกลาง |
| Apache Airflow | Production Pipeline | ฟรี (OSS) | สูง |
| n8n | Visual Automation | ฟรี (OSS) | ต่ำ |
| Airbyte | ELT Connector | ฟรี (OSS) | ต่ำ |
| Fivetran | Enterprise | $1/mo+ | ต่ำ |
Extract จาก WooCommerce API
# === WooCommerce ETL Pipeline ===
# pip install woocommerce pandas sqlalchemy
import os
from datetime import datetime, timedelta

import pandas as pd
from woocommerce import API
# WooCommerce API Setup
# Connection settings are read from the environment so real API keys never
# live in source control; the placeholder defaults keep the example
# importable exactly as it was written.
wcapi = API(
    url=os.environ.get("WC_STORE_URL", "https://your-store.com"),
    consumer_key=os.environ.get("WC_CONSUMER_KEY", "ck_xxxxx"),
    consumer_secret=os.environ.get("WC_CONSUMER_SECRET", "cs_xxxxx"),
    version="wc/v3",
    timeout=30,
)
# Extract Orders
def extract_orders(after_date=None):
    """Fetch every completed order from the WooCommerce REST API.

    Pages through the ``orders`` endpoint 100 records at a time until an
    empty page is returned.

    Args:
        after_date: Optional date/datetime; when given, its ``isoformat()``
            value is sent as the ``after`` filter so only newer orders are
            returned.

    Returns:
        A list with the decoded JSON object of every order, in API order.

    Raises:
        requests.HTTPError: If any page request returns a non-2xx status.
    """
    all_orders = []
    params = {"per_page": 100, "status": "completed"}
    if after_date is not None:
        params["after"] = after_date.isoformat()

    page = 1
    while True:
        params["page"] = page
        response = wcapi.get("orders", params=params)
        # An error payload is a non-empty dict: without this check it would
        # be extended into the result as keys and the loop would keep paging.
        response.raise_for_status()
        orders = response.json()
        if not orders:
            break
        all_orders.extend(orders)
        page += 1
    return all_orders
# Extract Products
def extract_products():
    """Fetch every product from the WooCommerce REST API.

    Pages through the ``products`` endpoint 100 records at a time until an
    empty page is returned.

    Returns:
        A list with the decoded JSON object of every product, in API order.

    Raises:
        requests.HTTPError: If any page request returns a non-2xx status.
    """
    all_products = []
    page = 1
    while True:
        response = wcapi.get("products", params={"per_page": 100, "page": page})
        # Fail loudly on an error response instead of treating its JSON
        # body as a page of products.
        response.raise_for_status()
        products = response.json()
        if not products:
            break
        all_products.extend(products)
        page += 1
    return all_products
from dataclasses import dataclass
from typing import List
@dataclass
class WooOrder:
    """Flat summary record of a single order, used for the sample output."""

    order_id: int
    date: str  # date string such as "2024-03-01"
    status: str  # e.g. "completed" or "refunded"
    total: float  # order total; printed in THB by the sample report
    items: int  # shown as "Items" in the sample report
    customer_id: int
    payment: str  # payment method label, e.g. "credit_card"
# Hand-written sample orders used to demonstrate the report output.
orders = [
    WooOrder(1001, "2024-03-01", "completed", 2500, 3, 101, "credit_card"),
    WooOrder(1002, "2024-03-01", "completed", 890, 1, 102, "bank_transfer"),
    WooOrder(1003, "2024-03-02", "completed", 4200, 5, 103, "credit_card"),
    WooOrder(1004, "2024-03-02", "refunded", 1500, 2, 104, "credit_card"),
    WooOrder(1005, "2024-03-03", "completed", 3100, 4, 101, "promptpay"),
]

print("=== Extracted Orders ===")
# Only completed orders count towards revenue; the refunded one is listed
# in the output but excluded from the sum.
total_revenue = sum(o.total for o in orders if o.status == "completed")
for o in orders:
    print(
        f" #{o.order_id} [{o.status}] {o.date} | "
        f"{o.total:,.0f} THB | Items: {o.items} | {o.payment}"
    )
print(f"\n Total Revenue: {total_revenue:,.0f} THB")
Transform และ Load

# === Transform & Load ===
# Transform Orders to Analytics Format
def transform_orders(raw_orders):
    """Flatten raw order dicts into one row per line item.

    Args:
        raw_orders: Iterable of order dicts. Each order with line items must
            provide ``id``, ``date_created``, ``status``, ``payment_method``
            and ``billing`` (with ``city`` and ``country``); ``customer_id``
            defaults to 0 and ``line_items`` to an empty list.

    Returns:
        A ``pandas.DataFrame`` with one row per line item. The column set is
        fixed, so an empty input still yields the full schema with zero rows.

    Raises:
        KeyError: If a required order or line-item field is missing.
        ValueError: If ``subtotal`` or ``total`` cannot be parsed as a float.
    """
    columns = [
        "order_id",
        "order_date",
        "status",
        "customer_id",
        "product_id",
        "product_name",
        "quantity",
        "subtotal",
        "total",
        "payment_method",
        "billing_city",
        "billing_country",
    ]
    records = []
    for order in raw_orders:
        for item in order.get("line_items", []):
            records.append({
                "order_id": order["id"],
                # Keep only the first 10 characters (the YYYY-MM-DD part).
                "order_date": order["date_created"][:10],
                "status": order["status"],
                "customer_id": order.get("customer_id", 0),
                "product_id": item["product_id"],
                "product_name": item["name"],
                "quantity": item["quantity"],
                "subtotal": float(item["subtotal"]),
                "total": float(item["total"]),
                "payment_method": order["payment_method"],
                "billing_city": order["billing"]["city"],
                "billing_country": order["billing"]["country"],
            })
    # Passing the columns explicitly keeps the schema stable when there are
    # no records at all.
    return pd.DataFrame(records, columns=columns)
# Load to PostgreSQL
from sqlalchemy import create_engine
# The connection URL is taken from DATABASE_URL so database credentials stay
# out of source control; the original placeholder URL remains the default.
engine = create_engine(
    os.environ.get("DATABASE_URL", "postgresql://user:pass@localhost/analytics")
)
def load_to_db(df, table_name):
    """Append the rows of ``df`` to ``table_name`` using the module engine.

    Args:
        df: ``pandas.DataFrame`` holding the rows to insert.
        table_name: Name of the destination table; rows are appended.

    Rows are written with multi-row INSERT statements in chunks of 1000.
    An empty frame is not written at all, so a frame without rows (and
    possibly without columns) can never create a malformed table.
    """
    if not df.empty:
        df.to_sql(
            table_name,
            engine,
            if_exists="append",
            index=False,
            method="multi",
            chunksize=1000,
        )
    print(f"Loaded {len(df)} rows to {table_name}")
# Full Pipeline
def run_pipeline():
    """Run one extract, transform, load cycle for the last 24 hours of orders.

    Orders newer than "now minus one day" are extracted, flattened to one
    row per line item and appended to the ``order_items`` table.

    NOTE(review): the load step appends, so running this more often than
    once per 24 hours (e.g. from an hourly schedule) re-inserts rows that an
    earlier run already loaded -- confirm the intended schedule or add
    de-duplication.
    """
    since = datetime.now() - timedelta(days=1)
    raw_orders = extract_orders(after_date=since)
    order_items = transform_orders(raw_orders)
    load_to_db(order_items, "order_items")
    print(f"Pipeline complete: {len(order_items)} records")
@dataclass
class DailyReport:
    """One day's sales summary as shown in the sample daily report."""

    date: str  # date string such as "2024-03-01"
    orders: int  # shown as "Orders" in the report
    revenue: float  # printed in THB by the report
    avg_order: float  # shown as "Avg" in the report
    top_product: str  # shown as "Top" in the report
    new_customers: int  # shown as "New" in the report
# Hand-written sample figures used to demonstrate the daily report layout.
reports = [
    DailyReport("2024-03-01", 45, 85000, 1889, "iPhone Case", 12),
    DailyReport("2024-03-02", 52, 102000, 1962, "USB Cable", 15),
    DailyReport("2024-03-03", 38, 72000, 1895, "Power Bank", 8),
    DailyReport("2024-03-04", 61, 125000, 2049, "Wireless Earbuds", 18),
    DailyReport("2024-03-05", 48, 95000, 1979, "Phone Stand", 11),
]

print("\n=== Daily Sales Report ===")
for r in reports:
    # Two lines per day: headline figures first, then the details.
    print(f" [{r.date}] Orders: {r.orders} | Revenue: {r.revenue:,.0f} THB")
    print(f" Avg: {r.avg_order:,.0f} | Top: {r.top_product} | New: {r.new_customers}")
Scheduling และ Monitoring
# === Pipeline Scheduling ===
# Airflow DAG
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
    "owner": "data-team",
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
}

dag = DAG(
    "woocommerce_etl",
    default_args=default_args,
    schedule_interval="0 * * * *",  # every hour
    start_date=datetime(2024, 1, 1),
    catchup=False,
)


# PythonOperator calls its callable without positional arguments, but
# transform_orders and load_to_db both require some. These thin wrappers
# hand the data from one task to the next through XCom.
# NOTE(review): XCom values must be serializable and reasonably small; for
# large order volumes stage the data in a file or table instead.
def _extract_task(data_interval_start):
    """Extract only the orders created since this run's interval started."""
    return extract_orders(after_date=data_interval_start)


def _transform_task(ti):
    """Flatten the extracted orders and return them as plain records."""
    raw_orders = ti.xcom_pull(task_ids="extract_orders")
    return transform_orders(raw_orders or []).to_dict("records")


def _load_task(ti):
    """Append the transformed records to the ``order_items`` table."""
    records = ti.xcom_pull(task_ids="transform")
    if not records:
        # Nothing new in this interval; avoid writing an empty frame.
        return
    load_to_db(pd.DataFrame(records), "order_items")


extract_task = PythonOperator(
    task_id="extract_orders",
    python_callable=_extract_task,
    dag=dag,
)
transform_task = PythonOperator(
    task_id="transform",
    python_callable=_transform_task,
    dag=dag,
)
load_task = PythonOperator(
    task_id="load_to_warehouse",
    python_callable=_load_task,
    dag=dag,
)

extract_task >> transform_task >> load_task
# Cron Job (Simple)
# crontab -e
# 0 * * * * cd /app && python etl_pipeline.py >> /var/log/etl.log 2>&1
# Illustrative status snapshot of a pipeline run (static sample values).
pipeline_metrics = {
    "Last Run": "2024-03-05 10:00",
    "Status": "Success",
    "Records Processed": "1,250",
    "Duration": "45 seconds",
    "Errors": "0",
    "Next Run": "2024-03-05 11:00",
}

print("Pipeline Status:")
for k, v in pipeline_metrics.items():
    print(f" {k}: {v}")
# SQL Analytics Queries
# Example analytics queries against the order_items table.
# NOTE(review): order_items holds one row per line item, so COUNT(*) counts
# line items rather than orders -- confirm that is the intended metric.
queries = [
    "SELECT DATE(order_date), SUM(total), COUNT(*) FROM order_items GROUP BY 1",
    "SELECT product_name, SUM(quantity), SUM(total) FROM order_items GROUP BY 1 ORDER BY 3 DESC LIMIT 10",
    "SELECT payment_method, COUNT(*), SUM(total) FROM order_items GROUP BY 1",
    "SELECT billing_city, COUNT(DISTINCT customer_id) FROM order_items GROUP BY 1 ORDER BY 2 DESC",
]

print("\n\nAnalytics Queries:")
for i, q in enumerate(queries, 1):
    print(f" {i}. {q}")
เคล็ดลับ
- Incremental: ดึงเฉพาะข้อมูลใหม่ ไม่ต้องดึงทั้งหมดทุกรอบ
- Webhook: ใช้ WooCommerce Webhook สำหรับ Real-time Events
- Idempotent: Pipeline ต้อง Idempotent รันซ้ำไม่ Duplicate
- Monitoring: Alert เมื่อ Pipeline Fail หรือข้อมูลผิดปกติ
- Backup: เก็บ Raw Data ก่อน Transform เสมอ
Data Pipeline ETL คืออะไร
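Data Pipeline ETL คือกระบวนการ Extract Transform Load ที่ดึงข้อมูลจาก WooCommerce ผ่าน REST API แปลงข้อมูลให้อยู่ในรูปแบบที่พร้อมวิเคราะห์ แล้วโหลดเข้า Data Warehouse เช่น BigQuery หรือ PostgreSQL เพื่อใช้วิเคราะห์ยอดขาย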





