ai

WordPress WooCommerce Data Pipeline ETL — สร้าง

WordPress WooCommerce Data Pipeline ETL — สร้าง

WooCommerce Data Pipeline

WordPress WooCommerce Data Pipeline ETL — สร้าง

WordPress WooCommerce Data Pipeline ETL Extract Transform Load REST API Data Warehouse BigQuery PostgreSQL Analytics Dashboard Orders Products Customers

| ETL Tool | ใช้เมื่อ | ราคา | Complexity |
|---|---|---|---|
| Python Script | เริ่มต้น ยืดหยุ่น | ฟรี | ปานกลาง |
| Apache Airflow | Production Pipeline | ฟรี (OSS) | สูง |
| n8n | Visual Automation | ฟรี (OSS) | ต่ำ |
| Airbyte | ELT Connector | ฟรี (OSS) | ต่ำ |
| Fivetran | Enterprise | $1/mo+ | ต่ำ |

Extract จาก WooCommerce API

# === WooCommerce ETL Pipeline ===

# pip install woocommerce pandas sqlalchemy

import os
from datetime import datetime, timedelta

import pandas as pd
from woocommerce import API

# WooCommerce API Setup

# Store URL and API keys are read from the environment so real credentials
# never have to live in the source file. The placeholder defaults are the
# values the example shipped with, so behaviour is unchanged when the
# variables are not set.
wcapi = API(
    url=os.environ.get("WC_STORE_URL", "https://your-store.com"),
    consumer_key=os.environ.get("WC_CONSUMER_KEY", "ck_xxxxx"),
    consumer_secret=os.environ.get("WC_CONSUMER_SECRET", "cs_xxxxx"),
    version="wc/v3",
    timeout=30,
)

# Extract Orders

def extract_orders(after_date=None):
    """Fetch all completed orders from the ``orders`` endpoint.

    Requests pages of 100 records through the module-level ``wcapi`` client
    until a page comes back empty.

    Args:
        after_date: Optional ``datetime``. When given, its ``isoformat()``
            value is sent as the ``after`` query parameter.

    Returns:
        A list containing the decoded JSON object of every order fetched.

    Raises:
        requests.HTTPError: If a page request returns an HTTP error status
            (assumes ``wcapi.get`` returns a ``requests.Response``).
        ValueError: If a page decodes to something other than a list.
    """
    params = {"per_page": 100, "status": "completed"}
    if after_date:
        params["after"] = after_date.isoformat()

    all_orders = []
    page = 1
    while True:
        params["page"] = page
        response = wcapi.get("orders", params=params)
        # An error body is a JSON object, which is truthy: without these
        # checks its keys would be appended and the loop would never stop.
        response.raise_for_status()
        orders = response.json()
        if not isinstance(orders, list):
            raise ValueError(
                f"Expected a list of orders on page {page}, "
                f"got {type(orders).__name__}"
            )
        if not orders:
            break
        all_orders.extend(orders)
        page += 1
    return all_orders

# Extract Products

def extract_products():
    """Fetch every product from the ``products`` endpoint.

    Requests pages of 100 records through the module-level ``wcapi`` client
    until a page comes back empty.

    Returns:
        A list containing the decoded JSON object of every product fetched.

    Raises:
        requests.HTTPError: If a page request returns an HTTP error status
            (assumes ``wcapi.get`` returns a ``requests.Response``).
        ValueError: If a page decodes to something other than a list.
    """
    all_products = []
    page = 1
    while True:
        response = wcapi.get("products", params={"per_page": 100, "page": page})
        # An error body is a JSON object, which is truthy: without these
        # checks its keys would be appended and the loop would never stop.
        response.raise_for_status()
        products = response.json()
        if not isinstance(products, list):
            raise ValueError(
                f"Expected a list of products on page {page}, "
                f"got {type(products).__name__}"
            )
        if not products:
            break
        all_products.extend(products)
        page += 1
    return all_products

from dataclasses import dataclass

from typing import List

@dataclass
class WooOrder:
    """Summary of a single order, as used by the extraction demo output."""

    order_id: int
    date: str  # date string such as "2024-03-01"
    status: str  # e.g. "completed" or "refunded"
    total: float  # order total; printed with a THB suffix by the demo
    items: int  # printed as "Items" by the demo
    customer_id: int
    payment: str  # payment method label, e.g. "credit_card"

# Hand-written sample orders used to demonstrate the report output.
orders = [
    WooOrder(1001, "2024-03-01", "completed", 2500, 3, 101, "credit_card"),
    WooOrder(1002, "2024-03-01", "completed", 890, 1, 102, "bank_transfer"),
    WooOrder(1003, "2024-03-02", "completed", 4200, 5, 103, "credit_card"),
    WooOrder(1004, "2024-03-02", "refunded", 1500, 2, 104, "credit_card"),
    WooOrder(1005, "2024-03-03", "completed", 3100, 4, 101, "promptpay"),
]

print("=== Extracted Orders ===")

# Only completed orders count towards revenue; the refunded one is still
# listed below but excluded from the sum.
total_revenue = sum(o.total for o in orders if o.status == "completed")

for o in orders:
    print(f" #{o.order_id} [{o.status}] {o.date} | "
          f"{o.total:,.0f} THB | Items: {o.items} | {o.payment}")

print(f"\n Total Revenue: {total_revenue:,.0f} THB")

Transform และ Load

WordPress WooCommerce Data Pipeline ETL — สร้าง

# === Transform & Load ===

# Transform Orders to Analytics Format

def transform_orders(raw_orders):
    """Flatten raw order dicts into one row per line item.

    Each order must provide the keys ``id``, ``date_created``, ``status``,
    ``payment_method`` and ``billing`` (with ``city`` and ``country``).
    ``customer_id`` defaults to 0 and ``line_items`` to an empty list when
    absent. Each line item must provide ``product_id``, ``name``,
    ``quantity``, ``subtotal`` and ``total``.

    Args:
        raw_orders: Iterable of order dicts.

    Returns:
        A ``pandas.DataFrame`` with one row per line item. The column set is
        fixed, so an empty input still yields a frame with the full schema.

    Raises:
        KeyError: If a required key is missing from an order or line item.
        ValueError: If ``subtotal`` or ``total`` cannot be converted to float.
    """
    columns = [
        "order_id",
        "order_date",
        "status",
        "customer_id",
        "product_id",
        "product_name",
        "quantity",
        "subtotal",
        "total",
        "payment_method",
        "billing_city",
        "billing_country",
    ]

    records = []
    for order in raw_orders:
        for item in order.get("line_items", []):
            records.append({
                "order_id": order["id"],
                # Keep only the first 10 characters (the YYYY-MM-DD part).
                "order_date": order["date_created"][:10],
                "status": order["status"],
                "customer_id": order.get("customer_id", 0),
                "product_id": item["product_id"],
                "product_name": item["name"],
                "quantity": item["quantity"],
                "subtotal": float(item["subtotal"]),
                "total": float(item["total"]),
                "payment_method": order["payment_method"],
                "billing_city": order["billing"]["city"],
                "billing_country": order["billing"]["country"],
            })

    # Naming the columns keeps the schema stable when there are no rows;
    # without it an empty input produced a frame with no columns at all.
    return pd.DataFrame(records, columns=columns)

# Load to PostgreSQL

from sqlalchemy import create_engine

# The connection URL is read from DATABASE_URL so database credentials stay
# out of the source; the default is the placeholder the example shipped with.
engine = create_engine(
    os.environ.get("DATABASE_URL", "postgresql://user:pass@localhost/analytics")
)

def load_to_db(df, table_name):
    """Append the rows of *df* to *table_name* via the module-level ``engine``.

    Rows are written with ``DataFrame.to_sql`` in chunks of 1000 using
    multi-row inserts, without the index. Existing rows are kept
    (``if_exists="append"``), so loading the same data twice duplicates it.

    Args:
        df: ``pandas.DataFrame`` to write.
        table_name: Name of the destination table.
    """
    # Nothing to insert for an empty frame, so skip the database round trip.
    if not df.empty:
        df.to_sql(
            table_name,
            engine,
            if_exists="append",
            index=False,
            method="multi",
            chunksize=1000,
        )
    print(f"Loaded {len(df)} rows to {table_name}")

# Full Pipeline

def run_pipeline(lookback_days=1):
    """Run extract, transform and load for recently completed orders.

    Orders created after ``now - lookback_days`` are extracted, flattened to
    one row per line item and appended to the ``order_items`` table.

    Args:
        lookback_days: Size of the extraction window in days. Defaults to 1,
            the window the pipeline originally used.
    """
    # NOTE(review): the load step appends rows, so running this more often
    # than the look-back window (e.g. hourly with a one-day window) will load
    # the same orders more than once -- confirm how duplicates are handled.
    since = datetime.now() - timedelta(days=lookback_days)
    raw = extract_orders(after_date=since)
    df = transform_orders(raw)
    load_to_db(df, "order_items")
    print(f"Pipeline complete: {len(df)} records")

@dataclass
class DailyReport:
    """One day's sales summary, as printed by the daily report demo."""

    date: str  # date string such as "2024-03-01"
    orders: int  # printed as "Orders"
    revenue: float  # printed with a THB suffix
    avg_order: float  # printed as "Avg"
    top_product: str  # printed as "Top"
    new_customers: int  # printed as "New"

# Hand-written sample figures used to demonstrate the report layout.
reports = [
    DailyReport("2024-03-01", 45, 85000, 1889, "iPhone Case", 12),
    DailyReport("2024-03-02", 52, 102000, 1962, "USB Cable", 15),
    DailyReport("2024-03-03", 38, 72000, 1895, "Power Bank", 8),
    DailyReport("2024-03-04", 61, 125000, 2049, "Wireless Earbuds", 18),
    DailyReport("2024-03-05", 48, 95000, 1979, "Phone Stand", 11),
]

print("\n=== Daily Sales Report ===")

# Two output lines per day: volume and revenue first, then the details.
for r in reports:
    print(f" [{r.date}] Orders: {r.orders} | Revenue: {r.revenue:,.0f} THB")
    print(f" Avg: {r.avg_order:,.0f} | Top: {r.top_product} | New: {r.new_customers}")

Scheduling และ Monitoring

# === Pipeline Scheduling ===

# Airflow DAG

from airflow import DAG

from airflow.operators.python import PythonOperator

from datetime import datetime, timedelta

# Settings handed to the DAG as ``default_args``: owner label, retry count
# and the wait between retries.
default_args = dict(
    owner="data-team",
    retries=3,
    retry_delay=timedelta(minutes=5),
)

# DAG definition for the WooCommerce ETL run.
dag = DAG(
    "woocommerce_etl",
    default_args=default_args,
    schedule_interval="0 * * * *",  # every hour
    start_date=datetime(2024, 1, 1),
    catchup=False,
)

def _transform_task(ti):
    """Pull the raw orders returned by the extract task and flatten them.

    Returns plain row dicts rather than a DataFrame so the result can be
    handed to the next task through XCom.
    """
    raw_orders = ti.xcom_pull(task_ids="extract_orders") or []
    return transform_orders(raw_orders).to_dict(orient="records")


def _load_task(ti):
    """Pull the flattened rows from the transform task and load them."""
    rows = ti.xcom_pull(task_ids="transform") or []
    if not rows:
        # Nothing was extracted for this run, so there is nothing to write.
        return
    load_to_db(pd.DataFrame(rows), "order_items")


# extract_orders can be called without arguments, so it is used directly.
# Its return value is what the transform task pulls from XCom.
extract_task = PythonOperator(
    task_id="extract_orders",
    python_callable=extract_orders,
    dag=dag,
)

# transform_orders and load_to_db both have required parameters, so the
# operator cannot call them directly; the wrappers above supply the
# arguments from the upstream task's result.
transform_task = PythonOperator(
    task_id="transform",
    python_callable=_transform_task,
    dag=dag,
)

load_task = PythonOperator(
    task_id="load_to_warehouse",
    python_callable=_load_task,
    dag=dag,
)

extract_task >> transform_task >> load_task

# Cron Job (Simple)

# crontab -e

# 0 * * * * cd /app && python etl_pipeline.py >> /var/log/etl.log 2>&1

# Example status values for the monitoring demo; all values are display
# strings, not live measurements.
pipeline_metrics = {
    "Last Run": "2024-03-05 10:00",
    "Status": "Success",
    "Records Processed": "1,250",
    "Duration": "45 seconds",
    "Errors": "0",
    "Next Run": "2024-03-05 11:00",
}

print("Pipeline Status:")
for k, v in pipeline_metrics.items():
    print(f" {k}: {v}")

# SQL Analytics Queries

# Example analytics queries against the order_items table. They are only
# printed here, not executed.
queries = [
    "SELECT DATE(order_date), SUM(total), COUNT(*) FROM order_items GROUP BY 1",
    "SELECT product_name, SUM(quantity), SUM(total) FROM order_items GROUP BY 1 ORDER BY 3 DESC LIMIT 10",
    "SELECT payment_method, COUNT(*), SUM(total) FROM order_items GROUP BY 1",
    "SELECT billing_city, COUNT(DISTINCT customer_id) FROM order_items GROUP BY 1 ORDER BY 2 DESC",
]

print("\n\nAnalytics Queries:")
# Number the queries starting from 1 for display.
for i, q in enumerate(queries, 1):
    print(f" {i}. {q}")

เคล็ดลับ

  • Incremental: ดึงเฉพาะข้อมูลใหม่ ไม่ต้องดึงทั้งหมดทุกรอบ
  • Webhook: ใช้ WooCommerce Webhook สำหรับ Real-time Events
  • Idempotent: Pipeline ต้อง Idempotent รันซ้ำไม่ Duplicate
  • Monitoring: Alert เมื่อ Pipeline Fail หรือข้อมูลผิดปกติ
  • Backup: เก็บ Raw Data ก่อน Transform เสมอ

Data Pipeline ETL คืออะไร

ETL (Extract, Transform, Load) คือกระบวนการดึงข้อมูลจาก WooCommerce ผ่าน REST API แปลงให้อยู่ในรูปแบบที่พร้อมวิเคราะห์ แล้วโหลดเข้า Data Warehouse เช่น BigQuery หรือ PostgreSQL เพื่อใช้วิเคราะห์ยอดขาย

XM Legend · เทรดเดอร์ & ผู้สอน Forex 13 ปี

ผู้ก่อตั้ง SiamCafe ตั้งแต่ปี 1997 · เทรดเดอร์สาย Forex มากกว่า 13 ปี ได้รับการยกย่องเป็น XM Legend · แบ่งปันความรู้ Forex, ไอที, AI และการเทรด จากประสบการณ์จริงในตลาดจริง