ai
Shopify Hydrogen Data Pipeline ETL — สร้าง Data
Shopify Data Pipeline

Shopify Hydrogen Data Pipeline: ทำ ETL ดึงข้อมูลผ่าน Admin API (GraphQL), Webhook และ Bulk Operations เข้าสู่ Snowflake หรือ BigQuery แปลงข้อมูลด้วย dbt แล้วสร้าง Analytics Dashboard สำหรับวิเคราะห์ Revenue และ CLV
เนื้อหาเกี่ยวข้อง — แนะนำให้อ่าน เหรียญ ctxc คือ — ข้อมูลครบถ้วน 2026
| Data Source | API | Method | Use Case |
|---|---|---|---|
| Orders | Admin API (GraphQL) | Incremental (updated_at) | Revenue, AOV, Conversion |
| Products | Admin API (REST) | Full Sync (daily) | Product Performance, Inventory |
| Customers | Bulk Operations | Full Sync (weekly) | CLV, Cohort, Retention |
| Inventory | Admin API (GraphQL) | Webhook (real-time) | Stock Level, Reorder Alert |
| Analytics | Shopify Analytics API | Daily Sync | Traffic, Conversion, Sessions |
Data Extraction
# === Shopify Data Extraction ===
# import os
# import requests
# import json
# import time
#
# SHOPIFY_STORE = os.environ['SHOPIFY_STORE']
# ACCESS_TOKEN = os.environ['SHOPIFY_ACCESS_TOKEN']
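# API_VERSION = '2024-01'  # Shopify supports each stable API version for about 12 months — update to a currently supported version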
# BASE_URL = f'https://{SHOPIFY_STORE}.myshopify.com/admin/api/{API_VERSION}'
#
# headers = {
# 'X-Shopify-Access-Token': ACCESS_TOKEN,
# 'Content-Type': 'application/json'
# }
#
# # REST API - Get Orders (paginated)
# def get_orders(since_id=None, limit=250):
# params = {'limit': limit, 'status': 'any'}
# if since_id:
# params['since_id'] = since_id
# resp = requests.get(f'{BASE_URL}/orders.json', headers=headers, params=params)
# time.sleep(0.5) # Rate limit: 2 req/sec
# return resp.json()['orders']
#
# # GraphQL - Get Orders with specific fields
# def get_orders_graphql(cursor=None):
# query = '''
# { orders(first: 50, after: CURSOR) {
# edges { cursor node {
# id name createdAt totalPriceSet { shopMoney { amount } }
# customer { id email }
# lineItems(first: 10) { edges { node {
# title quantity originalTotalSet { shopMoney { amount } }
# }}}}}
# pageInfo { hasNextPage endCursor }
# }}'''
# query = query.replace('CURSOR', f'"{cursor}"' if cursor else 'null')
# resp = requests.post(f'{BASE_URL}/graphql.json',
# headers=headers, json={'query': query})
# return resp.json()['data']['orders']
from dataclasses import dataclass
@dataclass
class ExtractionMethod:
    """A single way of extracting data from Shopify, as listed in the comparison table.

    Every field is a free-form display string; in this file the values are
    only printed, never parsed.
    """

    method: str  # name of the extraction approach
    api: str  # endpoint or GraphQL mutation used
    rate_limit: str  # throttling note shown to the reader
    best_for: str  # typical use cases
    data_volume: str  # rough record-count range the approach suits
# Reference table of Shopify extraction approaches (display data only).
# Shopify rate limits depend on the store's plan, so the rate_limit strings
# say which plan a quoted number applies to instead of presenting one value
# as universal.
methods = [
    ExtractionMethod(
        method="REST API (Paginated)",
        api="GET /admin/api/{version}/{resource}.json",
        rate_limit="2 requests/second (Standard plan; higher on Advanced/Plus)",
        best_for="Small-medium stores, Simple queries",
        data_volume="< 100K records",
    ),
    ExtractionMethod(
        method="GraphQL API",
        api="POST /admin/api/{version}/graphql.json",
        # 1000 points/second is the Shopify Plus restore rate, not the default.
        rate_limit="Query cost: 100 points/second (Standard) up to 1000 points/second (Plus)",
        best_for="Complex queries, Nested data, Specific fields",
        data_volume="< 500K records",
    ),
    ExtractionMethod(
        method="Bulk Operations",
        api="GraphQL bulkOperationRunQuery mutation",
        rate_limit="1 operation at a time",
        best_for="Large data export, Initial load, Full sync",
        data_volume="> 500K records",
    ),
    ExtractionMethod(
        method="Webhook",
        api="POST to your endpoint on events",
        rate_limit="No limit (event-driven)",
        best_for="Real-time updates, Incremental sync",
        data_volume="Any (event-based)",
    ),
]
print("=== Extraction Methods ===")
# One paragraph per method; the leading "\n" leaves a blank line between entries.
for m in methods:
    print(
        f"\n [{m.method}]",
        f" API: {m.api}",
        f" Rate: {m.rate_limit}",
        f" Best: {m.best_for}",
        f" Volume: {m.data_volume}",
        sep="\n",
    )
Transform with dbt

# === dbt Transform Models ===
# -- models/staging/stg_orders.sql
# SELECT
# id as order_id,
# name as order_number,
# email as customer_email,
# created_at,
# updated_at,
# financial_status,
# fulfillment_status,
# total_price::DECIMAL(10,2) as total_price,
# subtotal_price::DECIMAL(10,2) as subtotal_price,
# total_tax::DECIMAL(10,2) as total_tax,
# total_discounts::DECIMAL(10,2) as total_discounts,
# currency,
# cancel_reason,
# cancelled_at
# FROM {{ source('shopify', 'orders') }}
# WHERE _fivetran_deleted = false
# -- models/marts/daily_revenue.sql
# SELECT
# DATE_TRUNC('day', created_at) as date,
# COUNT(DISTINCT order_id) as total_orders,
# SUM(total_price) as revenue,
# AVG(total_price) as aov,
# COUNT(DISTINCT customer_email) as unique_customers
# FROM {{ ref('stg_orders') }}
# WHERE financial_status NOT IN ('refunded', 'voided')
# GROUP BY 1
# ORDER BY 1 DESC
@dataclass
class DbtModel:
    """A dbt model in the Shopify transformation layer, used for the models table.

    All fields are display strings; in this file they are only printed.
    """

    model: str  # dbt model name
    layer: str  # layer label, e.g. "Staging", "Intermediate", "Mart"
    description: str  # what the model does
    key_metrics: str  # comma-separated columns/metrics, as free text
# dbt models grouped by layer: staging -> intermediate -> marts.
models = [
    DbtModel(
        model="stg_orders",
        layer="Staging",
        description="Clean raw orders data, cast types, filter deleted",
        key_metrics="order_id, total_price, customer_email, status",
    ),
    DbtModel(
        model="stg_line_items",
        layer="Staging",
        description="Order line items with product info",
        key_metrics="line_item_id, product_id, quantity, price",
    ),
    DbtModel(
        model="stg_customers",
        layer="Staging",
        description="Clean customer data, deduplicate",
        key_metrics="customer_id, email, first_order_date, total_orders",
    ),
    DbtModel(
        model="int_order_items",
        layer="Intermediate",
        description="Join orders + line_items + products",
        key_metrics="Enriched order data with product details",
    ),
    DbtModel(
        model="daily_revenue",
        layer="Mart",
        description="Daily revenue, orders, AOV, unique customers",
        key_metrics="date, revenue, orders, aov, customers",
    ),
    DbtModel(
        model="customer_ltv",
        layer="Mart",
        description="Customer Lifetime Value, Cohort, RFM",
        key_metrics="customer_id, ltv, recency, frequency, monetary",
    ),
    DbtModel(
        model="product_performance",
        layer="Mart",
        description="Product sales, revenue, margin, inventory",
        key_metrics="product_id, units_sold, revenue, margin, stock",
    ),
]
print("=== dbt Models ===")
# Three lines per model: name/layer, description, key metrics.
for m in models:
    print(
        f" [{m.model}] ({m.layer})",
        f" Desc: {m.description}",
        f" Metrics: {m.key_metrics}",
        sep="\n",
    )
Dashboard & Analytics
# === Analytics Dashboard Design ===
@dataclass
class DashboardPanel:
    """One analytics dashboard and its contents, used for the dashboards table.

    All fields are display strings; in this file they are only printed.
    """

    dashboard: str  # dashboard title
    panels: str  # comma-separated list of panels, as one string
    refresh: str  # refresh cadence label, e.g. "Hourly"
    audience: str  # intended viewers
# Dashboard catalogue; each panels string is the ", "-joined list of panel names.
dashboards = [
    DashboardPanel(
        dashboard="Revenue Overview",
        panels=", ".join([
            "Daily/Weekly/Monthly Revenue",
            "AOV",
            "Order Count",
            "Revenue by Channel",
            "Revenue by Product Category",
            "Year-over-Year Comparison",
        ]),
        refresh="Hourly",
        audience="CEO, CMO, Finance",
    ),
    DashboardPanel(
        dashboard="Product Performance",
        panels=", ".join([
            "Top Selling Products",
            "Revenue per Product",
            "Inventory Level",
            "Low Stock Alert",
            "Product Margin Analysis",
            "Category Breakdown",
        ]),
        refresh="Daily",
        audience="Product Manager, Merchandiser",
    ),
    DashboardPanel(
        dashboard="Customer Analytics",
        panels=", ".join([
            "New vs Returning",
            "CLV Distribution",
            "Cohort Retention",
            "RFM Segmentation",
            "Customer Acquisition Cost",
            "Repeat Rate",
        ]),
        refresh="Weekly",
        audience="Marketing, CRM",
    ),
    DashboardPanel(
        dashboard="Marketing Attribution",
        panels=", ".join([
            "Revenue by Channel (Organic/Paid/Social/Email)",
            "ROAS per Campaign",
            "Conversion Rate per Channel",
            "UTM Tracking",
            "Customer Journey",
        ]),
        refresh="Daily",
        audience="Marketing, Growth",
    ),
    DashboardPanel(
        dashboard="Operations",
        panels=", ".join([
            "Fulfillment Time",
            "Shipping Cost",
            "Order Status Distribution",
            "Return Rate",
            "Inventory Turnover",
            "Stockout Incidents",
        ]),
        refresh="Hourly",
        audience="Operations, Logistics",
    ),
]
print("=== Dashboards ===")
# Title line with refresh cadence, then panels and audience; blank line before each.
for d in dashboards:
    print(
        f"\n [{d.dashboard}] Refresh: {d.refresh}",
        f" Panels: {d.panels}",
        f" Audience: {d.audience}",
        sep="\n",
    )
เคล็ดลับ
- Bulk: ใช้ Bulk Operations API สำหรับ Initial Load ข้อมูลจำนวนมาก
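- Webhook: ใช้ Webhook สำหรับ Incremental Update เพื่อลด API Call แต่ควรมี Reconciliation Job ดึงข้อมูลซ้ำเป็นระยะ เพราะ Shopify ไม่รับประกันว่า Webhook จะถูกส่งถึงทุกครั้ง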
- dbt: ใช้ dbt ทำ Transform ด้วย SQL ที่เก็บไว้ใน Version Control และเขียน Test ตรวจสอบคุณภาพข้อมูลได้
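- Rate Limit: ขึ้นกับแผนของร้าน — REST ประมาณ 2 req/sec (แผน Standard), GraphQL คิดตาม Query Cost ประมาณ 100 points/sec (Standard) ถึง 1000 points/sec (Shopify Plus) ใช้ Sleep หรือ Backoff เพื่อป้องกันการโดน Throttle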
- Fivetran: ใช้ Fivetran Shopify Connector ถ้าไม่อยากเขียน Pipeline เอง
Shopify Data Pipeline คืออะไร
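Shopify Data Pipeline คือกระบวนการดึงข้อมูล Orders, Products, Customers และ Inventory จาก Shopify ผ่าน Admin API (GraphQL), Webhook และ Bulk Operations เข้าสู่ Data Warehouse เช่น Snowflake หรือ BigQuery จากนั้นแปลงข้อมูลด้วย dbt เพื่อนำไปสร้าง Dashboard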
เนื้อหาเกี่ยวข้อง — บทความที่เกี่ยวข้อง: Fail2ban Advanced Agile Scrum Kanban
อ่านเพิ่ม: MySQL และ MariaDB คืออะไร? สอน Database Administration สำหรั · อ่านเพิ่ม: Serverless คืออะไร? สอน AWS Lambda, Cloud Functions และ Func · อ่านเพิ่ม: Apache Kafka เจาะลึก สอน Kafka Streams, Connect, Schema Regi
แนะนำเพิ่มเติม — บทวิเคราะห์จาก XM Signal
เนื้อหาเกี่ยวข้อง — อ่านต่อ: Voice Cloning CQRS Event Sourcing





