Shopify Data Pipeline
Shopify Hydrogen Data Pipeline ETL Admin API GraphQL Webhook Bulk Operations Snowflake BigQuery dbt Analytics Dashboard Revenue CLV
| Data Source | API | Method | Use Case |
|---|---|---|---|
| Orders | Admin API (GraphQL) | Incremental (created_at) | Revenue, AOV, Conversion |
| Products | Admin API (REST) | Full Sync (daily) | Product Performance, Inventory |
| Customers | Bulk Operations | Full Sync (weekly) | CLV, Cohort, Retention |
| Inventory | Admin API (GraphQL) | Webhook (real-time) | Stock Level, Reorder Alert |
| Analytics | Shopify Analytics API | Daily Sync | Traffic, Conversion, Sessions |
Data Extraction
# === Shopify Data Extraction ===
# import os
# import requests
# import json
# import time
#
# SHOPIFY_STORE = os.environ['SHOPIFY_STORE']
# ACCESS_TOKEN = os.environ['SHOPIFY_ACCESS_TOKEN']
# API_VERSION = '2024-01'
# BASE_URL = f'https://{SHOPIFY_STORE}.myshopify.com/admin/api/{API_VERSION}'
#
# headers = {
#     'X-Shopify-Access-Token': ACCESS_TOKEN,
#     'Content-Type': 'application/json'
# }
#
# # REST API - Get Orders (paginated)
# def get_orders(since_id=None, limit=250):
#     params = {'limit': limit, 'status': 'any'}
#     if since_id:
#         params['since_id'] = since_id
#     resp = requests.get(f'{BASE_URL}/orders.json', headers=headers, params=params)
#     time.sleep(0.5)  # Rate limit: 2 req/sec
#     return resp.json()['orders']
#
# # GraphQL - Get Orders with specific fields
# def get_orders_graphql(cursor=None):
#     query = '''
#     { orders(first: 50, after: CURSOR) {
#         edges { cursor node {
#             id name createdAt totalPriceSet { shopMoney { amount } }
#             customer { id email }
#             lineItems(first: 10) { edges { node {
#                 title quantity originalTotalSet { shopMoney { amount } }
#             }}}}}
#         pageInfo { hasNextPage endCursor }
#     }}'''
#     query = query.replace('CURSOR', f'"{cursor}"' if cursor else 'null')
#     resp = requests.post(f'{BASE_URL}/graphql.json',
#                          headers=headers, json={'query': query})
#     return resp.json()['data']['orders']
from dataclasses import dataclass
@dataclass
class ExtractionMethod:
    """One row of the Shopify extraction-method comparison.

    Every field is a free-form display string; the values are only
    printed, never parsed or validated.
    """

    method: str  # display name, e.g. "REST API (Paginated)"
    api: str  # endpoint or mutation used, e.g. "POST /admin/api/{version}/graphql.json"
    rate_limit: str  # human-readable throttling note, e.g. "2 requests/second (REST)"
    best_for: str  # comma-separated scenarios the method suits
    data_volume: str  # rough record-count guidance, e.g. "< 100K records"
# Catalogue of the four extraction approaches compared in this article.
# Arguments are positional, in ExtractionMethod field order:
# method, api, rate_limit, best_for, data_volume.
methods = [
    ExtractionMethod(
        "REST API (Paginated)",
        "GET /admin/api/{version}/{resource}.json",
        "2 requests/second (REST)",
        "Small-medium stores, Simple queries",
        "< 100K records",
    ),
    ExtractionMethod(
        "GraphQL API",
        "POST /admin/api/{version}/graphql.json",
        "1000 cost points/second",
        "Complex queries, Nested data, Specific fields",
        "< 500K records",
    ),
    ExtractionMethod(
        "Bulk Operations",
        "GraphQL bulkOperationRunQuery mutation",
        "1 operation at a time",
        "Large data export, Initial load, Full sync",
        "> 500K records",
    ),
    ExtractionMethod(
        "Webhook",
        "POST to your endpoint on events",
        "No limit (event-driven)",
        "Real-time updates, Incremental sync",
        "Any (event-based)",
    ),
]

# Print a short report: a header line, then one labelled stanza per method.
print("=== Extraction Methods ===")
for m in methods:
    print(f"\n [{m.method}]")
    print(f" API: {m.api}")
    print(f" Rate: {m.rate_limit}")
    print(f" Best: {m.best_for}")
    print(f" Volume: {m.data_volume}")
Transform with dbt
# === dbt Transform Models ===
# -- models/staging/stg_orders.sql
# SELECT
# id as order_id,
# name as order_number,
# email as customer_email,
# created_at,
# updated_at,
# financial_status,
# fulfillment_status,
# total_price::DECIMAL(10,2) as total_price,
# subtotal_price::DECIMAL(10,2) as subtotal_price,
# total_tax::DECIMAL(10,2) as total_tax,
# total_discounts::DECIMAL(10,2) as total_discounts,
# currency,
# cancel_reason,
# cancelled_at
# FROM {{ source('shopify', 'orders') }}
# WHERE _fivetran_deleted = false
# -- models/marts/daily_revenue.sql
# SELECT
# DATE_TRUNC('day', created_at) as date,
# COUNT(DISTINCT order_id) as total_orders,
# SUM(total_price) as revenue,
# AVG(total_price) as aov,
# COUNT(DISTINCT customer_email) as unique_customers
# FROM {{ ref('stg_orders') }}
# WHERE financial_status NOT IN ('refunded', 'voided')
# GROUP BY 1
# ORDER BY 1 DESC
@dataclass
class DbtModel:
    """Description of one dbt model in the transform layer listing.

    Every field is a free-form display string; the values are only
    printed, never parsed or validated.
    """

    model: str  # dbt model name, e.g. "stg_orders"
    layer: str  # layer label as used in the listing: "Staging", "Intermediate" or "Mart"
    description: str  # one-line summary of what the model does
    key_metrics: str  # comma-separated columns/metrics the model exposes
# Catalogue of the dbt models covered in this article, listed staging first,
# then intermediate, then marts.
# Arguments are positional, in DbtModel field order:
# model, layer, description, key_metrics.
models = [
    DbtModel(
        "stg_orders",
        "Staging",
        "Clean raw orders data, cast types, filter deleted",
        "order_id, total_price, customer_email, status",
    ),
    DbtModel(
        "stg_line_items",
        "Staging",
        "Order line items with product info",
        "line_item_id, product_id, quantity, price",
    ),
    DbtModel(
        "stg_customers",
        "Staging",
        "Clean customer data, deduplicate",
        "customer_id, email, first_order_date, total_orders",
    ),
    DbtModel(
        "int_order_items",
        "Intermediate",
        "Join orders + line_items + products",
        "Enriched order data with product details",
    ),
    DbtModel(
        "daily_revenue",
        "Mart",
        "Daily revenue, orders, AOV, unique customers",
        "date, revenue, orders, aov, customers",
    ),
    DbtModel(
        "customer_ltv",
        "Mart",
        "Customer Lifetime Value, Cohort, RFM",
        "customer_id, ltv, recency, frequency, monetary",
    ),
    DbtModel(
        "product_performance",
        "Mart",
        "Product sales, revenue, margin, inventory",
        "product_id, units_sold, revenue, margin, stock",
    ),
]

# Print a short report: a header line, then one labelled stanza per model.
print("=== dbt Models ===")
for m in models:
    print(f" [{m.model}] ({m.layer})")
    print(f" Desc: {m.description}")
    print(f" Metrics: {m.key_metrics}")
Dashboard & Analytics
# === Analytics Dashboard Design ===
@dataclass
class DashboardPanel:
    """Specification of one analytics dashboard in the design listing.

    Every field is a free-form display string; the values are only
    printed, never parsed or validated.
    """

    dashboard: str  # dashboard title, e.g. "Revenue Overview"
    panels: str  # comma-separated list of the panels shown on the dashboard
    refresh: str  # refresh cadence label, e.g. "Hourly", "Daily", "Weekly"
    audience: str  # comma-separated roles the dashboard is intended for
# Catalogue of the dashboards proposed in this article.
# Arguments are positional, in DashboardPanel field order:
# dashboard, panels, refresh, audience.
# The panels argument is built from adjacent string literals, which Python
# concatenates into a single string at compile time.
dashboards = [
    DashboardPanel(
        "Revenue Overview",
        "Daily/Weekly/Monthly Revenue, AOV, Order Count, "
        "Revenue by Channel, Revenue by Product Category, "
        "Year-over-Year Comparison",
        "Hourly",
        "CEO, CMO, Finance",
    ),
    DashboardPanel(
        "Product Performance",
        "Top Selling Products, Revenue per Product, "
        "Inventory Level, Low Stock Alert, "
        "Product Margin Analysis, Category Breakdown",
        "Daily",
        "Product Manager, Merchandiser",
    ),
    DashboardPanel(
        "Customer Analytics",
        "New vs Returning, CLV Distribution, "
        "Cohort Retention, RFM Segmentation, "
        "Customer Acquisition Cost, Repeat Rate",
        "Weekly",
        "Marketing, CRM",
    ),
    DashboardPanel(
        "Marketing Attribution",
        "Revenue by Channel (Organic/Paid/Social/Email), "
        "ROAS per Campaign, Conversion Rate per Channel, "
        "UTM Tracking, Customer Journey",
        "Daily",
        "Marketing, Growth",
    ),
    DashboardPanel(
        "Operations",
        "Fulfillment Time, Shipping Cost, "
        "Order Status Distribution, Return Rate, "
        "Inventory Turnover, Stockout Incidents",
        "Hourly",
        "Operations, Logistics",
    ),
]

# Print a short report: a header line, then one labelled stanza per dashboard.
print("=== Dashboards ===")
for d in dashboards:
    print(f"\n [{d.dashboard}] Refresh: {d.refresh}")
    print(f" Panels: {d.panels}")
    print(f" Audience: {d.audience}")
เคล็ดลับ
- Bulk: ใช้ Bulk Operations API สำหรับ Initial Load ข้อมูลจำนวนมาก
- Webhook: ใช้ Webhook สำหรับ Incremental Update ลด API Call
- dbt: ใช้ dbt สำหรับ Transform ข้อมูลด้วย SQL รองรับ Version Control และ Test ได้
- Rate Limit: REST 2 req/sec, GraphQL 1000 cost/sec ใช้ Sleep ป้องกัน
- Fivetran: ใช้ Fivetran Shopify Connector ถ้าไม่อยากเขียน Pipeline เอง
Shopify Data Pipeline คืออะไร
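Shopify Data Pipeline คือกระบวนการดึงข้อมูล Shopify เช่น Orders, Products, Customers และ Inventory ผ่าน Admin API, GraphQL, Webhook และ Bulk Operations เข้า Data Warehouse เช่น Snowflake หรือ BigQuery แล้ว Transform ด้วย dbt เพื่อนำไปแสดงผลบน Dashboard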
ดึงข้อมูลอย่างไร
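ดึงข้อมูลได้ 4 วิธี ได้แก่ REST API (GET แบบ Paginated), GraphQL (POST คิด Rate Limit แบบ Cost-based), Bulk Operations (ส่งออกเป็นไฟล์ JSONL) และ Webhook (Real-time) โดย Rate Limit ของ REST คือ 2 req/sec และของ GraphQL คือ 1000 cost/sec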
Transform ทำอย่างไร
ใช้ dbt เขียน SQL แบ่งเป็นชั้น Staging, Intermediate และ Mart (เช่น daily_revenue, customer_ltv, product_performance) รองรับ Version Control และ Test และเลือกรันได้ทั้งแบบ Incremental และ Full Refresh
สร้าง Dashboard อย่างไร
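ใช้เครื่องมือ BI เช่น Looker, Metabase หรือ Tableau สร้าง Dashboard ด้าน Revenue, Products, Customers, Marketing และ Operations เพื่อติดตาม Metric เช่น CLV, AOV, Conversion, Retention, Cohort และ RFM พร้อมตั้ง Alert เมื่อเกิด Stockout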
สรุป
Shopify Hydrogen Data Pipeline คือ ETL ที่ดึงข้อมูลผ่าน Admin API, GraphQL, Webhook และ Bulk Operations แล้ว Transform ด้วย dbt บน Snowflake หรือ BigQuery เพื่อสร้าง Dashboard สำหรับวิเคราะห์ Revenue, CLV และ Analytics อื่น ๆ
