ai

Shopify Hydrogen Data Pipeline ETL — สร้าง Data

Shopify Hydrogen Data Pipeline ETL — สร้าง Data

Shopify Data Pipeline

Shopify Hydrogen Data Pipeline ETL — สร้าง Data

Shopify Hydrogen Data Pipeline ETL — ดึงข้อมูลผ่าน Admin API (GraphQL), Webhook และ Bulk Operations เข้าสู่ Snowflake หรือ BigQuery แปลงข้อมูลด้วย dbt แล้วสร้าง Analytics Dashboard สำหรับวิเคราะห์ Revenue และ CLV

เนื้อหาเกี่ยวข้อง — แนะนำให้อ่าน เหรียญ ctxc คือ — ข้อมูลครบถ้วน 2026

| Data Source | API | Method | Use Case |
|---|---|---|---|
| Orders | Admin API (GraphQL) | Incremental (updated_at) | Revenue, AOV, Conversion |
| Products | Admin API (REST) | Full Sync (daily) | Product Performance, Inventory |
| Customers | Bulk Operations | Full Sync (weekly) | CLV, Cohort, Retention |
| Inventory | Admin API (GraphQL) | Webhook (real-time) | Stock Level, Reorder Alert |
| Analytics | Shopify Analytics API | Daily Sync | Traffic, Conversion, Sessions |

Data Extraction

# === Shopify Data Extraction ===

# import os
# import requests
# import json
# import time
#
# SHOPIFY_STORE = os.environ['SHOPIFY_STORE']
# ACCESS_TOKEN = os.environ['SHOPIFY_ACCESS_TOKEN']
# API_VERSION = '2024-01'
# BASE_URL = f'https://{SHOPIFY_STORE}.myshopify.com/admin/api/{API_VERSION}'
#
# headers = {
#     'X-Shopify-Access-Token': ACCESS_TOKEN,
#     'Content-Type': 'application/json'
# }
#
# # REST API - Get Orders (paginated)
# def get_orders(since_id=None, limit=250):
#     params = {'limit': limit, 'status': 'any'}
#     if since_id:
#         params['since_id'] = since_id
#     resp = requests.get(f'{BASE_URL}/orders.json', headers=headers, params=params)
#     time.sleep(0.5)  # Rate limit: 2 req/sec
#     return resp.json()['orders']
#
# # GraphQL - Get Orders with specific fields
# def get_orders_graphql(cursor=None):
#     query = '''
#     { orders(first: 50, after: CURSOR) {
#         edges { cursor node {
#             id name createdAt totalPriceSet { shopMoney { amount } }
#             customer { id email }
#             lineItems(first: 10) { edges { node {
#                 title quantity originalTotalSet { shopMoney { amount } }
#     }}}}}
#         pageInfo { hasNextPage endCursor }
#     }}'''
#     query = query.replace('CURSOR', f'"{cursor}"' if cursor else 'null')
#     resp = requests.post(f'{BASE_URL}/graphql.json',
#         headers=headers, json={'query': query})
#     return resp.json()['data']['orders']

from dataclasses import dataclass

@dataclass
class ExtractionMethod:
    """Describe one way of extracting data from Shopify.

    Every field is a free-form display string used only for the summary
    printed below.
    """

    method: str  # human-readable name of the extraction method
    api: str  # endpoint or mutation the method uses
    rate_limit: str  # how requests are throttled
    best_for: str  # typical use cases
    data_volume: str  # rough record-count range the method suits


methods = [
    ExtractionMethod(
        method="REST API (Paginated)",
        api="GET /admin/api/{version}/{resource}.json",
        rate_limit="2 requests/second (REST)",
        best_for="Small-medium stores, Simple queries",
        data_volume="< 100K records",
    ),
    ExtractionMethod(
        method="GraphQL API",
        api="POST /admin/api/{version}/graphql.json",
        rate_limit="1000 cost points/second",
        best_for="Complex queries, Nested data, Specific fields",
        data_volume="< 500K records",
    ),
    ExtractionMethod(
        method="Bulk Operations",
        api="GraphQL bulkOperationRunQuery mutation",
        rate_limit="1 operation at a time",
        best_for="Large data export, Initial load, Full sync",
        data_volume="> 500K records",
    ),
    ExtractionMethod(
        method="Webhook",
        api="POST to your endpoint on events",
        rate_limit="No limit (event-driven)",
        best_for="Real-time updates, Incremental sync",
        data_volume="Any (event-based)",
    ),
]

print("=== Extraction Methods ===")
for m in methods:
    # One print per entry; sep="\n" writes exactly the same text as
    # issuing one print call per line.
    print(
        f"\n  [{m.method}]",
        f"    API: {m.api}",
        f"    Rate: {m.rate_limit}",
        f"    Best: {m.best_for}",
        f"    Volume: {m.data_volume}",
        sep="\n",
    )

Transform with dbt

Shopify Hydrogen Data Pipeline ETL — สร้าง Data
# === dbt Transform Models ===

# -- models/staging/stg_orders.sql
# SELECT
#     id as order_id,
#     name as order_number,
#     email as customer_email,
#     created_at,
#     updated_at,
#     financial_status,
#     fulfillment_status,
#     total_price::DECIMAL(10,2) as total_price,
#     subtotal_price::DECIMAL(10,2) as subtotal_price,
#     total_tax::DECIMAL(10,2) as total_tax,
#     total_discounts::DECIMAL(10,2) as total_discounts,
#     currency,
#     cancel_reason,
#     cancelled_at
# FROM {{ source('shopify', 'orders') }}
# WHERE _fivetran_deleted = false

# -- models/marts/daily_revenue.sql
# SELECT
#     DATE_TRUNC('day', created_at) as date,
#     COUNT(DISTINCT order_id) as total_orders,
#     SUM(total_price) as revenue,
#     AVG(total_price) as aov,
#     COUNT(DISTINCT customer_email) as unique_customers
# FROM {{ ref('stg_orders') }}
# WHERE financial_status NOT IN ('refunded', 'voided')
# GROUP BY 1
# ORDER BY 1 DESC

@dataclass
class DbtModel:
    """Describe one dbt model in the staging -> intermediate -> mart layering.

    Every field is a free-form display string used only for the summary
    printed below.
    """

    model: str  # dbt model name
    layer: str  # "Staging", "Intermediate" or "Mart"
    description: str  # what the model does
    key_metrics: str  # main columns / metrics it exposes


models = [
    DbtModel(
        model="stg_orders",
        layer="Staging",
        description="Clean raw orders data, cast types, filter deleted",
        key_metrics="order_id, total_price, customer_email, status",
    ),
    DbtModel(
        model="stg_line_items",
        layer="Staging",
        description="Order line items with product info",
        key_metrics="line_item_id, product_id, quantity, price",
    ),
    DbtModel(
        model="stg_customers",
        layer="Staging",
        description="Clean customer data, deduplicate",
        key_metrics="customer_id, email, first_order_date, total_orders",
    ),
    DbtModel(
        model="int_order_items",
        layer="Intermediate",
        description="Join orders + line_items + products",
        key_metrics="Enriched order data with product details",
    ),
    DbtModel(
        model="daily_revenue",
        layer="Mart",
        description="Daily revenue, orders, AOV, unique customers",
        key_metrics="date, revenue, orders, aov, customers",
    ),
    DbtModel(
        model="customer_ltv",
        layer="Mart",
        description="Customer Lifetime Value, Cohort, RFM",
        key_metrics="customer_id, ltv, recency, frequency, monetary",
    ),
    DbtModel(
        model="product_performance",
        layer="Mart",
        description="Product sales, revenue, margin, inventory",
        key_metrics="product_id, units_sold, revenue, margin, stock",
    ),
]

print("=== dbt Models ===")
for m in models:
    # One print per model; sep="\n" yields the same lines as three prints.
    print(
        f"  [{m.model}] ({m.layer})",
        f"    Desc: {m.description}",
        f"    Metrics: {m.key_metrics}",
        sep="\n",
    )

Dashboard & Analytics

# === Analytics Dashboard Design ===

@dataclass
class DashboardPanel:
    """Describe one analytics dashboard built on the warehouse marts.

    Every field is a free-form display string used only for the summary
    printed below.
    """

    dashboard: str  # dashboard title
    panels: str  # comma-separated list of panels shown
    refresh: str  # refresh cadence
    audience: str  # intended readers


dashboards = [
    DashboardPanel(
        dashboard="Revenue Overview",
        panels=(
            "Daily/Weekly/Monthly Revenue, AOV, Order Count, "
            "Revenue by Channel, Revenue by Product Category, "
            "Year-over-Year Comparison"
        ),
        refresh="Hourly",
        audience="CEO, CMO, Finance",
    ),
    DashboardPanel(
        dashboard="Product Performance",
        panels=(
            "Top Selling Products, Revenue per Product, "
            "Inventory Level, Low Stock Alert, "
            "Product Margin Analysis, Category Breakdown"
        ),
        refresh="Daily",
        audience="Product Manager, Merchandiser",
    ),
    DashboardPanel(
        dashboard="Customer Analytics",
        panels=(
            "New vs Returning, CLV Distribution, "
            "Cohort Retention, RFM Segmentation, "
            "Customer Acquisition Cost, Repeat Rate"
        ),
        refresh="Weekly",
        audience="Marketing, CRM",
    ),
    DashboardPanel(
        dashboard="Marketing Attribution",
        panels=(
            "Revenue by Channel (Organic/Paid/Social/Email), "
            "ROAS per Campaign, Conversion Rate per Channel, "
            "UTM Tracking, Customer Journey"
        ),
        refresh="Daily",
        audience="Marketing, Growth",
    ),
    DashboardPanel(
        dashboard="Operations",
        panels=(
            "Fulfillment Time, Shipping Cost, "
            "Order Status Distribution, Return Rate, "
            "Inventory Turnover, Stockout Incidents"
        ),
        refresh="Hourly",
        audience="Operations, Logistics",
    ),
]

print("=== Dashboards ===")
for d in dashboards:
    # One print per dashboard; sep="\n" yields the same lines as three prints.
    print(
        f"\n  [{d.dashboard}] Refresh: {d.refresh}",
        f"    Panels: {d.panels}",
        f"    Audience: {d.audience}",
        sep="\n",
    )

เคล็ดลับ

  • Bulk: ใช้ Bulk Operations API สำหรับ Initial Load ข้อมูลจำนวนมาก
  • Webhook: ใช้ Webhook สำหรับ Incremental Update ลด API Call
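  • dbt: ใช้ dbt เขียน Transform เป็น SQL ที่เก็บใน Version Control ได้ และเขียน Test ตรวจคุณภาพข้อมูลได้
  • Rate Limit: REST จำกัด 2 req/sec, GraphQL 1000 cost points/sec (คิดตาม Query Cost ของแต่ละ Query) — ใช้ Sleep หรือ Backoff เพื่อป้องกันการโดน Throttle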
  • Fivetran: ใช้ Fivetran Shopify Connector ถ้าไม่อยากเขียน Pipeline เอง

Shopify Data Pipeline คืออะไร

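Shopify Data Pipeline คือระบบที่ดึงข้อมูลจาก Shopify เช่น Orders, Products, Customers และ Inventory ผ่าน Admin API (REST/GraphQL), Webhook หรือ Bulk Operations เข้าสู่ Data Warehouse อย่าง Snowflake หรือ BigQuery จากนั้นแปลงข้อมูลด้วย dbt และนำไปแสดงผลบน Dashboard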

เนื้อหาเกี่ยวข้อง — บทความที่เกี่ยวข้อง: Fail2ban Advanced Agile Scrum Kanban

อ่านเพิ่ม: MySQL และ MariaDB คืออะไร? สอน Database Administration สำหรั · อ่านเพิ่ม: Serverless คืออะไร? สอน AWS Lambda, Cloud Functions และ Func · อ่านเพิ่ม: Apache Kafka เจาะลึก สอน Kafka Streams, Connect, Schema Regi

แนะนำเพิ่มเติม — บทวิเคราะห์จาก XM Signal

เนื้อหาเกี่ยวข้อง — อ่านต่อ: Voice Cloning CQRS Event Sourcing

XM Legend · เทรดเดอร์ & ผู้สอน Forex 13 ปี

ผู้ก่อตั้ง SiamCafe ตั้งแต่ปี 1997 · เทรดเดอร์สาย Forex มากกว่า 13 ปี ได้รับการยกย่องเป็น XM Legend · แบ่งปันความรู้ Forex, ไอที, AI และการเทรด จากประสบการณ์จริงในตลาดจริง