SiamCafe.net Blog
Technology

Shopify Hydrogen Data Pipeline ETL

shopify hydrogen data pipeline etl
Shopify Hydrogen Data Pipeline ETL | SiamCafe Blog
2025-10-03 · อ. บอม — SiamCafe.net · 10,440 คำ

Shopify Data Pipeline

Shopify Hydrogen Data Pipeline ETL Admin API GraphQL Webhook Bulk Operations Snowflake BigQuery dbt Analytics Dashboard Revenue CLV

| Data Source | API | Method | Use Case |
|---|---|---|---|
| Orders | Admin API (GraphQL) | Incremental (created_at) | Revenue, AOV, Conversion |
| Products | Admin API (REST) | Full Sync (daily) | Product Performance, Inventory |
| Customers | Bulk Operations | Full Sync (weekly) | CLV, Cohort, Retention |
| Inventory | Admin API (GraphQL) | Webhook (real-time) | Stock Level, Reorder Alert |
| Analytics | Shopify Analytics API | Daily Sync | Traffic, Conversion, Sessions |

Data Extraction

# === Shopify Data Extraction ===

# import os
# import requests
# import json
# import time
#
# SHOPIFY_STORE = os.environ['SHOPIFY_STORE']
# ACCESS_TOKEN = os.environ['SHOPIFY_ACCESS_TOKEN']
# API_VERSION = '2024-01'
# BASE_URL = f'https://{SHOPIFY_STORE}.myshopify.com/admin/api/{API_VERSION}'
#
# headers = {
#     'X-Shopify-Access-Token': ACCESS_TOKEN,
#     'Content-Type': 'application/json'
# }
#
# # REST API - Get Orders (paginated)
# def get_orders(since_id=None, limit=250):
#     params = {'limit': limit, 'status': 'any'}
#     if since_id:
#         params['since_id'] = since_id
#     resp = requests.get(f'{BASE_URL}/orders.json', headers=headers, params=params)
#     time.sleep(0.5)  # Rate limit: 2 req/sec
#     return resp.json()['orders']
#
# # GraphQL - Get Orders with specific fields
# def get_orders_graphql(cursor=None):
#     query = '''
#     { orders(first: 50, after: CURSOR) {
#         edges { cursor node {
#             id name createdAt totalPriceSet { shopMoney { amount } }
#             customer { id email }
#             lineItems(first: 10) { edges { node {
#                 title quantity originalTotalSet { shopMoney { amount } }
#     }}}}}
#         pageInfo { hasNextPage endCursor }
#     }}'''
#     query = query.replace('CURSOR', f'"{cursor}"' if cursor else 'null')
#     resp = requests.post(f'{BASE_URL}/graphql.json',
#         headers=headers, json={'query': query})
#     return resp.json()['data']['orders']

from dataclasses import dataclass

@dataclass
class ExtractionMethod:
    """Descriptive record for one way of pulling data out of Shopify.

    Every field is a free-form display string; the class performs no
    parsing or validation and exists only to be listed and printed.
    """

    method: str  # Display name of the approach, e.g. "GraphQL API"
    api: str  # Endpoint, mutation, or delivery mechanism the approach uses
    rate_limit: str  # Textual description of the applicable rate limit
    best_for: str  # Comma-separated scenarios the approach suits
    data_volume: str  # Rough record-count range the approach is suggested for

# Catalogue of the Shopify extraction approaches compared in this section.
# Fields are passed by keyword so each entry is self-describing.
methods = [
    ExtractionMethod(
        method="REST API (Paginated)",
        api="GET /admin/api/{version}/{resource}.json",
        rate_limit="2 requests/second (REST)",
        best_for="Small-medium stores, Simple queries",
        data_volume="< 100K records",
    ),
    ExtractionMethod(
        method="GraphQL API",
        api="POST /admin/api/{version}/graphql.json",
        rate_limit="1000 cost points/second",
        best_for="Complex queries, Nested data, Specific fields",
        data_volume="< 500K records",
    ),
    ExtractionMethod(
        method="Bulk Operations",
        api="GraphQL bulkOperationRunQuery mutation",
        rate_limit="1 operation at a time",
        best_for="Large data export, Initial load, Full sync",
        data_volume="> 500K records",
    ),
    ExtractionMethod(
        method="Webhook",
        api="POST to your endpoint on events",
        rate_limit="No limit (event-driven)",
        best_for="Real-time updates, Incremental sync",
        data_volume="Any (event-based)",
    ),
]

# Report: a heading, then one indented five-line entry per method. A single
# print() with a newline separator emits the same text as five separate calls.
print("=== Extraction Methods ===")
for m in methods:
    print(
        f"\n  [{m.method}]",
        f"    API: {m.api}",
        f"    Rate: {m.rate_limit}",
        f"    Best: {m.best_for}",
        f"    Volume: {m.data_volume}",
        sep="\n",
    )

Transform with dbt

# === dbt Transform Models ===

# -- models/staging/stg_orders.sql
# SELECT
#     id as order_id,
#     name as order_number,
#     email as customer_email,
#     created_at,
#     updated_at,
#     financial_status,
#     fulfillment_status,
#     total_price::DECIMAL(10,2) as total_price,
#     subtotal_price::DECIMAL(10,2) as subtotal_price,
#     total_tax::DECIMAL(10,2) as total_tax,
#     total_discounts::DECIMAL(10,2) as total_discounts,
#     currency,
#     cancel_reason,
#     cancelled_at
# FROM {{ source('shopify', 'orders') }}
# WHERE _fivetran_deleted = false

# -- models/marts/daily_revenue.sql
# SELECT
#     DATE_TRUNC('day', created_at) as date,
#     COUNT(DISTINCT order_id) as total_orders,
#     SUM(total_price) as revenue,
#     AVG(total_price) as aov,
#     COUNT(DISTINCT customer_email) as unique_customers
# FROM {{ ref('stg_orders') }}
# WHERE financial_status NOT IN ('refunded', 'voided')
# GROUP BY 1
# ORDER BY 1 DESC

@dataclass
class DbtModel:
    """Descriptive record for one dbt model in the transform layer.

    All fields are free-form display strings; nothing here is executed
    against dbt or a warehouse.
    """

    model: str  # Model name, e.g. "stg_orders"
    layer: str  # Layer label the model belongs to, e.g. "Staging" or "Mart"
    description: str  # Short summary of what the model does
    key_metrics: str  # Comma-separated columns or metrics the model exposes

# dbt models covered by the article, listed staging -> intermediate -> mart.
# Fields are passed by keyword so each entry is self-describing.
models = [
    DbtModel(
        model="stg_orders",
        layer="Staging",
        description="Clean raw orders data, cast types, filter deleted",
        key_metrics="order_id, total_price, customer_email, status",
    ),
    DbtModel(
        model="stg_line_items",
        layer="Staging",
        description="Order line items with product info",
        key_metrics="line_item_id, product_id, quantity, price",
    ),
    DbtModel(
        model="stg_customers",
        layer="Staging",
        description="Clean customer data, deduplicate",
        key_metrics="customer_id, email, first_order_date, total_orders",
    ),
    DbtModel(
        model="int_order_items",
        layer="Intermediate",
        description="Join orders + line_items + products",
        key_metrics="Enriched order data with product details",
    ),
    DbtModel(
        model="daily_revenue",
        layer="Mart",
        description="Daily revenue, orders, AOV, unique customers",
        key_metrics="date, revenue, orders, aov, customers",
    ),
    DbtModel(
        model="customer_ltv",
        layer="Mart",
        description="Customer Lifetime Value, Cohort, RFM",
        key_metrics="customer_id, ltv, recency, frequency, monetary",
    ),
    DbtModel(
        model="product_performance",
        layer="Mart",
        description="Product sales, revenue, margin, inventory",
        key_metrics="product_id, units_sold, revenue, margin, stock",
    ),
]

# Report: a heading, then a three-line entry per model. One print() with a
# newline separator produces the same output as three consecutive calls.
print("=== dbt Models ===")
for m in models:
    print(
        f"  [{m.model}] ({m.layer})",
        f"    Desc: {m.description}",
        f"    Metrics: {m.key_metrics}",
        sep="\n",
    )

Dashboard & Analytics

# === Analytics Dashboard Design ===

@dataclass
class DashboardPanel:
    """Descriptive record for one analytics dashboard and its contents.

    All fields are free-form display strings used only for listing and
    printing.
    """

    dashboard: str  # Dashboard title, e.g. "Revenue Overview"
    panels: str  # Comma-separated names of the panels on the dashboard
    refresh: str  # Refresh cadence label, e.g. "Hourly" or "Daily"
    audience: str  # Comma-separated roles the dashboard is intended for

# Dashboards proposed on top of the mart models. Long panel lists are written
# as adjacent string literals, which Python joins into one string.
dashboards = [
    DashboardPanel(
        dashboard="Revenue Overview",
        panels=(
            "Daily/Weekly/Monthly Revenue, AOV, Order Count, "
            "Revenue by Channel, Revenue by Product Category, "
            "Year-over-Year Comparison"
        ),
        refresh="Hourly",
        audience="CEO, CMO, Finance",
    ),
    DashboardPanel(
        dashboard="Product Performance",
        panels=(
            "Top Selling Products, Revenue per Product, "
            "Inventory Level, Low Stock Alert, "
            "Product Margin Analysis, Category Breakdown"
        ),
        refresh="Daily",
        audience="Product Manager, Merchandiser",
    ),
    DashboardPanel(
        dashboard="Customer Analytics",
        panels=(
            "New vs Returning, CLV Distribution, "
            "Cohort Retention, RFM Segmentation, "
            "Customer Acquisition Cost, Repeat Rate"
        ),
        refresh="Weekly",
        audience="Marketing, CRM",
    ),
    DashboardPanel(
        dashboard="Marketing Attribution",
        panels=(
            "Revenue by Channel (Organic/Paid/Social/Email), "
            "ROAS per Campaign, Conversion Rate per Channel, "
            "UTM Tracking, Customer Journey"
        ),
        refresh="Daily",
        audience="Marketing, Growth",
    ),
    DashboardPanel(
        dashboard="Operations",
        panels=(
            "Fulfillment Time, Shipping Cost, "
            "Order Status Distribution, Return Rate, "
            "Inventory Turnover, Stockout Incidents"
        ),
        refresh="Hourly",
        audience="Operations, Logistics",
    ),
]

# Report: a heading, then a three-line entry per dashboard preceded by a
# blank line. One print() with a newline separator matches three calls.
print("=== Dashboards ===")
for d in dashboards:
    print(
        f"\n  [{d.dashboard}] Refresh: {d.refresh}",
        f"    Panels: {d.panels}",
        f"    Audience: {d.audience}",
        sep="\n",
    )

เคล็ดลับ

Shopify Data Pipeline คืออะไร

ดึงข้อมูล Shopify เข้า Warehouse Orders Products Customers Inventory Admin API GraphQL Webhook Bulk Snowflake BigQuery dbt Dashboard

ดึงข้อมูลอย่างไร

REST API GET Paginated GraphQL POST Cost-based Bulk Operations JSONL Webhook Real-time Rate Limit 2 req/sec 1000 cost/sec

Transform ทำอย่างไร

dbt SQL Staging Intermediate Mart daily_revenue customer_ltv product_performance Version Control Test Incremental Full Refresh

สร้าง Dashboard อย่างไร

Looker Metabase Tableau Revenue Products Customers Marketing Operations CLV AOV Conversion Retention Cohort RFM Alert Stockout

สรุป

Shopify Hydrogen Data Pipeline ETL Admin API GraphQL Webhook Bulk Operations dbt Snowflake BigQuery Dashboard Revenue CLV Analytics

📖 บทความที่เกี่ยวข้อง

- Shopify Hydrogen Hexagonal Architecture — อ่านบทความ →
- Shopify Hydrogen Pub Sub Architecture — อ่านบทความ →
- Shopify Hydrogen Service Level Objective SLO — อ่านบทความ →
- Shopify Hydrogen Audit Trail Logging — อ่านบทความ →
- Shopify Hydrogen Agile Scrum Kanban — อ่านบทความ →

📚 ดูบทความทั้งหมด →