Shopify Sync
Parent: Store OVERVIEW
Status: Active
Store: y0nrcb-2y.myshopify.com → trendingsociety.com
Overview
Shopify is the source of truth for all commerce data. This document covers bidirectional sync between Shopify and Supabase for analytics, personalization, and cross-business-unit integration.
┌─────────────────────────────────────────────────────────────────────────┐
│ SHOPIFY SYNC ARCHITECTURE │
├─────────────────────────────────────────────────────────────────────────┤
│ │
│ SHOPIFY (Source of Truth) │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ Products │ Orders │ Customers │ Inventory │ Fulfillments │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ Webhooks │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ SUPABASE EDGE FUNCTIONS │ │
│ │ /webhooks/shopify/products │ │
│ │ /webhooks/shopify/orders │ │
│ │ /webhooks/shopify/customers │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ SUPABASE (Analytics & Integration) │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ shopify_products │ shopify_orders │ shopify_customers │ │
│ │ shopify_fulfillments │ shopify_inventory_levels │ │
│ └─────────────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ BUSINESS UNIT INTEGRATION │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Publisher │ │ Platform │ │ Agency │ │
│ │ Affiliate │ │ Licensing │ │ Packages │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────────┘
Sync Direction
| Data Type | Direction | Trigger | Latency |
|---|---|---|---|
| Products | Shopify → Supabase | Webhook | Real-time |
| Orders | Shopify → Supabase | Webhook | Real-time |
| Customers | Shopify → Supabase | Webhook | Real-time |
| Fulfillments | Shopify → Supabase | Webhook | Real-time |
| Inventory | Shopify → Supabase | Webhook + Polling | Real-time / 15min |
| Metafields | Supabase → Shopify | API call | On-demand |
Core Tables
shopify_products
Mirror of Shopify product catalog with enrichment.
-- Local mirror of the Shopify product catalog, extended with Trending Society
-- enrichment columns (vertical, source, content links, performance rollups).
CREATE TABLE shopify_products (
    id                  uuid PRIMARY KEY DEFAULT gen_random_uuid(),

    -- Identity on the Shopify side
    shopify_id          bigint UNIQUE NOT NULL,     -- numeric Shopify ID
    shopify_gid         text,                       -- GraphQL global ID, e.g. gid://shopify/Product/123
    handle              text UNIQUE NOT NULL,       -- URL slug

    -- Core listing fields
    title               text NOT NULL,
    body_html           text,
    vendor              text,
    product_type        text,

    -- Publication state
    status              text DEFAULT 'active',      -- one of 'active', 'draft', 'archived'
    published_at        timestamptz,

    -- Price snapshot taken from the first variant
    price               numeric(10,2),
    compare_at_price    numeric(10,2),

    -- Imagery
    featured_image_url  text,
    images              jsonb DEFAULT '[]',         -- array of {id, src, alt, position}

    -- Taxonomy
    tags                text[],
    collections         jsonb DEFAULT '[]',         -- array of {id, title, handle}

    -- Variant rollup
    variant_count       integer DEFAULT 1,
    variants            jsonb DEFAULT '[]',         -- complete variant payloads

    -- Stock
    total_inventory     integer DEFAULT 0,
    track_inventory     boolean DEFAULT true,

    -- Search-engine metadata
    seo_title           text,
    seo_description     text,

    -- Custom Shopify metafields
    metafields          jsonb DEFAULT '{}',

    -- ---------------------------------------------------------------------
    -- Trending Society enrichment (not part of the Shopify payload)
    -- ---------------------------------------------------------------------
    vertical_id         uuid REFERENCES publisher_verticals(id),   -- owning vertical
    product_source      text,                       -- 'supliful', 'collective', 'digital', 'service', 'owned'
    supplier_id         text,                       -- identifier at the external supplier
    linked_posts        uuid[],                     -- publisher posts that feature this product

    -- Performance rollups
    total_orders        integer DEFAULT 0,
    total_revenue       numeric(10,2) DEFAULT 0,
    conversion_rate     numeric,

    -- Sync bookkeeping
    shopify_created_at  timestamptz,
    shopify_updated_at  timestamptz,
    last_synced_at      timestamptz DEFAULT now(),
    sync_error          text,
    created_at          timestamptz DEFAULT now(),
    updated_at          timestamptz DEFAULT now()
);
-- Secondary indexes for shopify_products.
-- shopify_id and handle are declared UNIQUE on the table, and PostgreSQL backs
-- every UNIQUE constraint with its own index, so no extra index is created for
-- them here (a duplicate would only add write and storage overhead).
CREATE INDEX idx_shopify_products_status ON shopify_products(status);
CREATE INDEX idx_shopify_products_vertical ON shopify_products(vertical_id);
CREATE INDEX idx_shopify_products_source ON shopify_products(product_source);
-- GIN so array containment/overlap filters on tags can use the index.
CREATE INDEX idx_shopify_products_tags ON shopify_products USING GIN(tags);
shopify_orders
Order records with attribution tracking.
-- One row per Shopify order, carrying financials, line items, marketing
-- attribution, and fulfillment-routing fields.
CREATE TABLE shopify_orders (
    id                            uuid PRIMARY KEY DEFAULT gen_random_uuid(),

    -- Identity on the Shopify side
    shopify_id                    bigint UNIQUE NOT NULL,
    shopify_gid                   text,
    order_number                  integer NOT NULL,     -- human-readable number, e.g. 1001
    name                          text,                 -- display name, e.g. "#1001"

    -- Buyer
    shopify_customer_id           bigint,
    customer_email                text,
    customer_name                 text,

    -- Money
    subtotal_price                numeric(10,2),
    total_discounts               numeric(10,2),
    total_shipping                numeric(10,2),
    total_tax                     numeric(10,2),
    total_price                   numeric(10,2),
    currency                      text DEFAULT 'USD',

    -- State
    financial_status              text,                 -- 'pending', 'paid', 'refunded', 'partially_refunded'
    fulfillment_status            text,                 -- null, 'fulfilled', 'partial', 'unfulfilled'
    cancelled_at                  timestamptz,
    cancel_reason                 text,

    -- Purchased items
    line_items                    jsonb DEFAULT '[]',   -- array of {product_id, variant_id, title, quantity, price}
    line_item_count               integer DEFAULT 0,

    -- Addresses and shipping
    shipping_address              jsonb,
    billing_address               jsonb,
    shipping_lines                jsonb DEFAULT '[]',

    -- Discounts applied
    discount_codes                jsonb DEFAULT '[]',
    discount_applications         jsonb DEFAULT '[]',

    -- ---------------------------------------------------------------------
    -- Attribution (critical for ROI reporting)
    -- ---------------------------------------------------------------------
    -- UTM parameters
    utm_source                    text,
    utm_medium                    text,
    utm_campaign                  text,
    utm_term                      text,
    utm_content                   text,

    -- Entry point
    landing_page_url              text,
    referring_site                text,

    -- Internal attribution targets
    attributed_vertical_id        uuid REFERENCES publisher_verticals(id),
    attributed_post_id            uuid REFERENCES publisher_posts(id),
    attributed_affiliate_link_id  uuid,

    -- Owning business unit: 'store', 'agency', 'platform'
    business_unit                 text,

    -- Agency package, when applicable
    agency_package_id             uuid,
    agency_client_id              uuid,

    -- Platform license, when applicable
    platform_license_id           uuid,

    -- Unified audience identity
    audience_profile_id           uuid REFERENCES audience_profiles(id),

    -- ---------------------------------------------------------------------
    -- Fulfillment routing
    -- ---------------------------------------------------------------------
    fulfillment_type              text,                 -- 'supliful', 'collective', 'digital', 'manual'
    requires_manual_fulfillment   boolean DEFAULT false,

    -- Order lifecycle timestamps
    processed_at                  timestamptz,
    closed_at                     timestamptz,

    -- Sync bookkeeping
    shopify_created_at            timestamptz,
    shopify_updated_at            timestamptz,
    last_synced_at                timestamptz DEFAULT now(),
    created_at                    timestamptz DEFAULT now(),
    updated_at                    timestamptz DEFAULT now()
);
-- Secondary indexes for shopify_orders.
-- shopify_id is declared UNIQUE on the table; PostgreSQL already maintains an
-- index for that constraint, so a second index on the same column is omitted.
CREATE INDEX idx_shopify_orders_customer ON shopify_orders(shopify_customer_id);
CREATE INDEX idx_shopify_orders_email ON shopify_orders(customer_email);
CREATE INDEX idx_shopify_orders_status ON shopify_orders(financial_status, fulfillment_status);
-- Descending to match "most recent orders first" scans.
CREATE INDEX idx_shopify_orders_date ON shopify_orders(processed_at DESC);
CREATE INDEX idx_shopify_orders_vertical ON shopify_orders(attributed_vertical_id);
CREATE INDEX idx_shopify_orders_utm ON shopify_orders(utm_source, utm_campaign);
shopify_customers
Customer records with lifecycle tracking.
-- One row per Shopify customer, extended with lifecycle, scoring, and
-- prediction columns and a link to the unified audience profile.
CREATE TABLE shopify_customers (
    id uuid PRIMARY KEY DEFAULT gen_random_uuid(),

    -- Shopify identifiers
    shopify_id bigint UNIQUE NOT NULL,
    shopify_gid text,

    -- Contact
    email text,
    phone text,

    -- Name
    first_name text,
    last_name text,

    -- Status
    state text, -- 'enabled', 'disabled', 'invited', 'declined'
    verified_email boolean DEFAULT false,
    accepts_marketing boolean DEFAULT false,
    accepts_marketing_updated_at timestamptz,

    -- Address
    default_address jsonb,
    addresses jsonb DEFAULT '[]',

    -- Order history
    orders_count integer DEFAULT 0,
    total_spent numeric(10,2) DEFAULT 0,

    -- Tags
    tags text[],

    -- Notes
    note text,

    -- ═══════════════════════════════════════════════════════════════════
    -- TRENDING SOCIETY ENRICHMENT
    -- ═══════════════════════════════════════════════════════════════════
    -- Link to audience profile (unified identity)
    audience_profile_id uuid REFERENCES audience_profiles(id),

    -- Lifecycle
    lifecycle_stage text, -- 'prospect', 'first_purchase', 'repeat', 'vip', 'churned'
    first_order_at timestamptz,
    last_order_at timestamptz,

    -- Scoring
    rfm_score text, -- Recency-Frequency-Monetary: '555', '111', etc.
    customer_value_tier text, -- 'bronze', 'silver', 'gold', 'platinum'

    -- Predictions
    predicted_next_order_date date,
    -- 0-100; the range is enforced so an out-of-range score from a scoring job
    -- fails loudly instead of skewing reports. NULL (not yet scored) is allowed.
    churn_risk_score integer
        CONSTRAINT shopify_customers_churn_risk_score_check
        CHECK (churn_risk_score BETWEEN 0 AND 100),
    predicted_ltv numeric(10,2),

    -- Sync
    shopify_created_at timestamptz,
    shopify_updated_at timestamptz,
    last_synced_at timestamptz DEFAULT now(),
    created_at timestamptz DEFAULT now(),
    updated_at timestamptz DEFAULT now()
);
-- Secondary indexes for shopify_customers.
-- shopify_id is declared UNIQUE on the table; PostgreSQL already maintains an
-- index for that constraint, so a second index on the same column is omitted.
CREATE INDEX idx_shopify_customers_email ON shopify_customers(email);
CREATE INDEX idx_shopify_customers_audience ON shopify_customers(audience_profile_id);
CREATE INDEX idx_shopify_customers_lifecycle ON shopify_customers(lifecycle_stage);
CREATE INDEX idx_shopify_customers_value ON shopify_customers(customer_value_tier);
shopify_fulfillments
Fulfillment tracking for all order types.
-- One row per Shopify fulfillment, keyed back to its order by the Shopify
-- order ID, with carrier tracking and shipment state.
CREATE TABLE shopify_fulfillments (
    id                  uuid PRIMARY KEY DEFAULT gen_random_uuid(),

    -- Identity on the Shopify side
    shopify_id          bigint UNIQUE NOT NULL,
    shopify_order_id    bigint NOT NULL,

    -- Fulfillment state: 'pending', 'open', 'success', 'cancelled', 'error', 'failure'
    status              text,

    -- Carrier tracking (single value plus the full lists)
    tracking_company    text,
    tracking_number     text,
    tracking_url        text,
    tracking_numbers    text[],
    tracking_urls       text[],

    -- Shipment progress: 'label_printed', 'in_transit', 'out_for_delivery', 'delivered'
    shipment_status     text,

    -- Items covered by this fulfillment
    line_items          jsonb DEFAULT '[]',

    -- Originating location
    location_id         bigint,

    -- Fulfillment service: 'manual', 'supliful', 'collective'
    service             text,

    -- Timestamps
    shopify_created_at  timestamptz,
    shopify_updated_at  timestamptz,
    created_at          timestamptz DEFAULT now(),
    updated_at          timestamptz DEFAULT now()
);

-- Lookup paths: by parent order, by state, and by tracking number.
CREATE INDEX idx_shopify_fulfillments_order
    ON shopify_fulfillments (shopify_order_id);
CREATE INDEX idx_shopify_fulfillments_status
    ON shopify_fulfillments (status);
CREATE INDEX idx_shopify_fulfillments_tracking
    ON shopify_fulfillments (tracking_number);
Webhook Configuration
Required Webhooks
| Topic | Endpoint | Purpose |
|---|---|---|
| products/create | /webhooks/shopify/products | New product sync |
| products/update | /webhooks/shopify/products | Product changes |
| products/delete | /webhooks/shopify/products | Product removal |
| orders/create | /webhooks/shopify/orders | New order capture |
| orders/updated | /webhooks/shopify/orders | Status changes |
| orders/paid | /webhooks/shopify/orders | Payment confirmation |
| orders/fulfilled | /webhooks/shopify/orders | Fulfillment complete |
| orders/cancelled | /webhooks/shopify/orders | Cancellation |
| customers/create | /webhooks/shopify/customers | New customer |
| customers/update | /webhooks/shopify/customers | Customer changes |
| fulfillments/create | /webhooks/shopify/fulfillments | Fulfillment started |
| fulfillments/update | /webhooks/shopify/fulfillments | Tracking updates |
| inventory_levels/update | /webhooks/shopify/inventory | Stock changes |
Webhook Handler Pattern
Copy
// supabase/functions/webhooks-shopify-orders/index.ts
import { serve } from 'https://deno.land/[email protected]/http/server.ts'
import { createClient } from 'https://esm.sh/@supabase/supabase-js@2'
import { verifyShopifyWebhook } from '../_shared/shopify-verify.ts'
// HTTP entry point for Shopify order webhooks.
// Flow: verify the HMAC signature over the raw body, parse the JSON payload,
// then dispatch on the `x-shopify-topic` header. Responds 401 on a bad or
// missing signature, 400 on an unparseable body, 500 when a handler throws,
// and 200 otherwise (including topics with no case below).
serve(async (req) => {
  // 1. Verify webhook signature (must run against the raw, unparsed body)
  const hmac = req.headers.get('x-shopify-hmac-sha256')
  const body = await req.text()
  // `await` is deliberate: if the verifier is async, an un-awaited Promise is
  // always truthy and every request would pass. Awaiting a plain boolean is
  // harmless. NOTE(review): confirm the signature of verifyShopifyWebhook.
  if (!hmac || !(await verifyShopifyWebhook(body, hmac))) {
    return new Response('Unauthorized', { status: 401 })
  }

  // 2. Parse payload
  let order
  try {
    order = JSON.parse(body)
  } catch {
    return new Response('Bad Request', { status: 400 })
  }
  const topic = req.headers.get('x-shopify-topic')

  // 3. Initialize Supabase (service role key: this function writes sync tables)
  const supabase = createClient(
    Deno.env.get('SUPABASE_URL')!,
    Deno.env.get('SUPABASE_SERVICE_ROLE_KEY')!
  )

  // 4. Handle by topic
  try {
    switch (topic) {
      case 'orders/create':
        await handleOrderCreate(supabase, order)
        break
      case 'orders/updated':
        await handleOrderUpdate(supabase, order)
        break
      case 'orders/paid':
        await handleOrderPaid(supabase, order)
        break
      // ... other cases
    }
  } catch (err) {
    // Log with enough context to trace the failing delivery, then answer with
    // a non-2xx status so the failure is visible to the sender.
    console.error(`Shopify webhook ${topic} failed for order ${order?.id}:`, err)
    return new Response('Internal Server Error', { status: 500 })
  }

  return new Response('OK', { status: 200 })
})
/**
 * Persist a newly created Shopify order and kick off follow-up work.
 *
 * Upserts into `shopify_orders` keyed on `shopify_id`, then links the order to
 * an audience profile (only when the order has an email) and routes
 * fulfillment.
 *
 * @param supabase Supabase client used for the upsert and passed to helpers.
 * @param order    Order payload from the Shopify webhook.
 * @throws The Supabase error object when the upsert fails.
 */
async function handleOrderCreate(supabase, order) {
  // Extract attribution from landing page / UTM
  const attribution = extractAttribution(order)

  // Guest checkouts can arrive without a customer object; joining only the
  // parts that exist avoids storing the literal text "undefined undefined".
  const customerName = [order.customer?.first_name, order.customer?.last_name]
    .filter(Boolean)
    .join(' ') || null

  // Defensive default so a payload without line items cannot crash on .length.
  const lineItems = order.line_items ?? []

  // Upsert order
  const { error } = await supabase
    .from('shopify_orders')
    .upsert({
      shopify_id: order.id,
      shopify_gid: `gid://shopify/Order/${order.id}`,
      order_number: order.order_number,
      name: order.name,
      shopify_customer_id: order.customer?.id,
      customer_email: order.email,
      customer_name: customerName,
      subtotal_price: parseFloat(order.subtotal_price),
      total_discounts: parseFloat(order.total_discounts),
      total_shipping: parseFloat(order.total_shipping_price_set?.shop_money?.amount || 0),
      total_tax: parseFloat(order.total_tax),
      total_price: parseFloat(order.total_price),
      currency: order.currency,
      financial_status: order.financial_status,
      fulfillment_status: order.fulfillment_status,
      line_items: lineItems,
      line_item_count: lineItems.length,
      shipping_address: order.shipping_address,
      billing_address: order.billing_address,
      discount_codes: order.discount_codes,
      ...attribution,
      processed_at: order.processed_at,
      shopify_created_at: order.created_at,
      shopify_updated_at: order.updated_at,
      last_synced_at: new Date().toISOString()
    }, {
      // Re-delivered webhooks update the existing row instead of duplicating it.
      onConflict: 'shopify_id'
    })
  if (error) throw error

  // Link to audience profile
  if (order.email) {
    await linkOrderToAudience(supabase, order)
  }

  // Trigger fulfillment routing
  await routeFulfillment(supabase, order)
}
/**
 * Pull UTM parameters and entry-point fields out of a Shopify order payload.
 *
 * `order.landing_site` may be a relative path, so it is resolved against the
 * storefront origin before its query string is read.
 *
 * @param order Shopify order payload; reads `landing_site` and `referring_site`.
 * @returns Object with `utm_source`, `utm_medium`, `utm_campaign`, `utm_term`,
 *          `utm_content` (each a string, or null when absent),
 *          `landing_page_url` (the raw landing value, '' when missing) and
 *          `referring_site` (passed through unchanged).
 */
function extractAttribution(order) {
  const landingUrl = order.landing_site || ''

  // A malformed landing URL must not abort the whole order sync: fall back to
  // "no UTM parameters" instead of letting `new URL` throw.
  let params = new URLSearchParams()
  try {
    params = new URL(landingUrl, 'https://trendingsociety.com').searchParams
  } catch {
    // keep the empty parameter set
  }

  return {
    utm_source: params.get('utm_source'),
    utm_medium: params.get('utm_medium'),
    utm_campaign: params.get('utm_campaign'),
    utm_term: params.get('utm_term'),
    utm_content: params.get('utm_content'),
    landing_page_url: landingUrl,
    referring_site: order.referring_site
  }
}
Product Sync Workflow
Initial Bulk Sync
Copy
// scripts/sync-shopify-products.ts
import Shopify from '@shopify/shopify-api'
/**
 * Page through the whole Shopify catalog via GraphQL and upsert every product.
 *
 * A product that fails to upsert is logged and counted, and the run continues
 * with the next product. A failed GraphQL page aborts the run, because the
 * cursor cannot advance without it.
 *
 * @param pageSize Products requested per page. NOTE(review): each product asks
 *   for up to 100 variants plus images and metafields, and Shopify rejects
 *   queries whose requested cost exceeds its per-query limit, so the default
 *   is kept small. Confirm against the `extensions.cost` block of a live
 *   response before raising it.
 * @returns Counts of products upserted and products that failed.
 * @throws Error when Shopify returns GraphQL errors or no `products` data.
 */
async function bulkSyncProducts(pageSize = 4) {
  const client = new Shopify.Clients.Graphql(
    Deno.env.get('SHOPIFY_SHOP_DOMAIN'),
    Deno.env.get('SHOPIFY_ACCESS_TOKEN')
  )

  let hasNextPage = true
  let cursor = null
  let synced = 0
  let failed = 0

  while (hasNextPage) {
    const { body } = await client.query({
      data: {
        query: `
          query getProducts($cursor: String, $pageSize: Int!) {
            products(first: $pageSize, after: $cursor) {
              pageInfo {
                hasNextPage
                endCursor
              }
              edges {
                node {
                  id
                  legacyResourceId
                  handle
                  title
                  descriptionHtml
                  vendor
                  productType
                  status
                  publishedAt
                  featuredImage { url altText }
                  images(first: 10) {
                    edges { node { url altText } }
                  }
                  tags
                  variants(first: 100) {
                    edges {
                      node {
                        id
                        legacyResourceId
                        title
                        price
                        compareAtPrice
                        inventoryQuantity
                        inventoryItem { legacyResourceId }
                        sku
                      }
                    }
                  }
                  metafields(first: 20) {
                    edges {
                      node {
                        namespace
                        key
                        value
                      }
                    }
                  }
                }
              }
            }
          }
        `,
        // Variables travel inside `data` next to the query text.
        variables: { cursor, pageSize }
      }
    })

    // Surface API-level failures (throttling, cost limit, bad scopes) with the
    // real message instead of a "cannot read properties of undefined" crash.
    if (body?.errors?.length || !body?.data?.products) {
      throw new Error(
        `Shopify product query failed: ${JSON.stringify(body?.errors ?? body)}`
      )
    }

    const page = body.data.products
    for (const { node: product } of page.edges) {
      try {
        await upsertProduct(product)
        synced += 1
      } catch (err) {
        failed += 1
        console.error(`Failed to sync product ${product.legacyResourceId}:`, err)
      }
    }

    hasNextPage = page.pageInfo.hasNextPage
    cursor = page.pageInfo.endCursor

    // Rate limit between pages; nothing to wait for after the last one.
    if (hasNextPage) {
      await new Promise(r => setTimeout(r, 500))
    }
  }

  return { synced, failed }
}
/**
 * Upsert one GraphQL product node into `shopify_products`, keyed on `shopify_id`.
 *
 * Price fields come from the first variant; `total_inventory` is the sum of
 * all variant quantities; `product_source` is read from the
 * `custom.product_source` metafield and defaults to 'owned'.
 *
 * @param product Product node as returned by the bulk-sync GraphQL query.
 * @throws The Supabase error object when the upsert fails.
 */
async function upsertProduct(product) {
  // Keep every GraphQL field as-is and add the snake_case keys that
  // handleInventoryUpdate matches on (`inventory_item_id`) and rewrites
  // (`inventory_quantity`). `inventory_item_id` is null when the query did not
  // select `inventoryItem`. It is stored as a number because the inventory
  // webhook lookup uses JSON containment, where "123" and 123 do not match.
  const variants = product.variants.edges.map(({ node }) => ({
    ...node,
    inventory_item_id: node.inventoryItem?.legacyResourceId != null
      ? Number(node.inventoryItem.legacyResourceId)
      : null,
    inventory_quantity: node.inventoryQuantity ?? 0
  }))
  const firstVariant = variants[0]

  // Determine product source from metafields
  const sourceMetafield = product.metafields.edges.find(
    e => e.node.namespace === 'custom' && e.node.key === 'product_source'
  )

  const { error } = await supabase
    .from('shopify_products')
    .upsert({
      shopify_id: parseInt(product.legacyResourceId, 10),
      shopify_gid: product.id,
      handle: product.handle,
      title: product.title,
      body_html: product.descriptionHtml,
      vendor: product.vendor,
      product_type: product.productType,
      // GraphQL returns the status in upper case; the table stores lower case.
      status: product.status.toLowerCase(),
      published_at: product.publishedAt,
      price: parseFloat(firstVariant?.price || 0),
      compare_at_price: firstVariant?.compareAtPrice
        ? parseFloat(firstVariant.compareAtPrice)
        : null,
      featured_image_url: product.featuredImage?.url,
      images: product.images.edges.map(e => e.node),
      tags: product.tags,
      variant_count: variants.length,
      variants: variants,
      total_inventory: variants.reduce((sum, v) => sum + (v.inventoryQuantity || 0), 0),
      product_source: sourceMetafield?.node.value || 'owned',
      // Flattened to { "namespace.key": value }.
      metafields: Object.fromEntries(
        product.metafields.edges.map(e => [`${e.node.namespace}.${e.node.key}`, e.node.value])
      ),
      last_synced_at: new Date().toISOString()
    }, {
      onConflict: 'shopify_id'
    })

  // supabase-js reports failures in the result instead of throwing; without
  // this check a rejected write would look like a successful sync.
  if (error) throw error
}
Vertical Auto-Assignment
Copy
-- Assign products to verticals based on tags/type.
--
-- Trigger function for shopify_products (BEFORE INSERT OR UPDATE, per row).
-- Resolution order: a vertical whose slug equals the lower-cased product_type,
-- otherwise a vertical whose slug equals any lower-cased tag.
--
-- The match is collected in a local variable rather than selected straight
-- into NEW.vertical_id: a SELECT ... INTO that finds no row assigns NULL to
-- its target, which would erase an existing assignment whenever nothing
-- matches. Here the existing value is kept when there is no match.
CREATE OR REPLACE FUNCTION assign_product_vertical()
RETURNS trigger AS $$
DECLARE
    matched_vertical_id uuid;
BEGIN
    -- Updates that leave the inputs untouched (inventory, sync timestamps, ...)
    -- keep their current vertical without paying for the lookups.
    IF TG_OP = 'UPDATE'
       AND NEW.vertical_id IS NOT NULL
       AND NEW.product_type IS NOT DISTINCT FROM OLD.product_type
       AND NEW.tags IS NOT DISTINCT FROM OLD.tags
    THEN
        RETURN NEW;
    END IF;

    -- Match by product type
    SELECT v.id INTO matched_vertical_id
    FROM publisher_verticals AS v
    WHERE v.slug = lower(NEW.product_type)
    LIMIT 1;

    -- If no match, try tags. ORDER BY makes the pick repeatable when several
    -- tags map to different verticals.
    IF matched_vertical_id IS NULL THEN
        SELECT v.id INTO matched_vertical_id
        FROM publisher_verticals AS v
        WHERE v.slug IN (
            SELECT lower(t.tag)
            FROM unnest(NEW.tags) AS t(tag)
        )
        ORDER BY v.slug
        LIMIT 1;
    END IF;

    IF matched_vertical_id IS NOT NULL THEN
        NEW.vertical_id := matched_vertical_id;
    END IF;

    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Dropped first so this script can be re-run, matching CREATE OR REPLACE above.
DROP TRIGGER IF EXISTS trg_assign_product_vertical ON shopify_products;
CREATE TRIGGER trg_assign_product_vertical
BEFORE INSERT OR UPDATE ON shopify_products
FOR EACH ROW
EXECUTE FUNCTION assign_product_vertical();
Order Attribution
Attribution Hierarchy
Orders are attributed using this priority:
1. UTM Parameters - Explicit campaign tracking
2. Affiliate Link ID - Custom parameter for affiliate tracking
3. Referring URL - Publisher post that referred
4. Landing Page - First page visited
5. Customer History - Previous vertical affinity
Copy
/**
 * Work out which vertical, post, affiliate link and business unit an order
 * should be credited to.
 *
 * Marketing attribution: an affiliate link match wins outright; otherwise the
 * UTM campaign prefix and then a referring trendingsociety.com post are
 * consulted. Business-unit detection (agency / platform products) always runs,
 * whatever the marketing attribution was.
 *
 * @param supabase Supabase client used for the lookups.
 * @param order    Shopify order payload (reads `landing_site`,
 *                 `referring_site`, `line_items`, optional `utm_campaign`).
 * @returns Attribution fields to merge into the `shopify_orders` row.
 */
async function attributeOrder(supabase, order) {
  const attribution = {
    attributed_vertical_id: null,
    attributed_post_id: null,
    attributed_affiliate_link_id: null,
    business_unit: 'store'
  }
  const lineItems = order.line_items ?? []

  // 1. Check for affiliate link ID
  let affiliateMatched = false
  const affiliateLinkId = extractAffiliateId(order.landing_site)
  if (affiliateLinkId) {
    const { data: link } = await supabase
      .from('affiliate_links')
      .select('id, post_id, vertical_id')
      .eq('id', affiliateLinkId)
      .single()
    if (link) {
      attribution.attributed_affiliate_link_id = link.id
      attribution.attributed_post_id = link.post_id
      attribution.attributed_vertical_id = link.vertical_id
      // No early return here: steps 4 and 5 must still run, otherwise an
      // affiliate-driven agency or platform order is filed under 'store'.
      affiliateMatched = true
    }
  }

  if (!affiliateMatched) {
    // 2. Check UTM campaign for vertical.
    // The Shopify payload has no top-level utm_campaign; it lives in the
    // landing_site query string. A pre-parsed value on the order still wins.
    const utmCampaign = order.utm_campaign ?? readUtmCampaign(order.landing_site)
    if (utmCampaign) {
      const { data: vertical } = await supabase
        .from('publisher_verticals')
        .select('id')
        // Campaigns are expected to look like "<vertical-slug>_<rest>".
        .eq('slug', utmCampaign.split('_')[0])
        .single()
      if (vertical) {
        attribution.attributed_vertical_id = vertical.id
      }
    }

    // 3. Check referring URL for post
    // NOTE(review): a matching post overwrites the vertical chosen from the
    // UTM campaign in step 2 — confirm that is the intended precedence.
    if (order.referring_site?.includes('trendingsociety.com')) {
      const slug = extractPostSlug(order.referring_site)
      if (slug) {
        const { data: post } = await supabase
          .from('publisher_posts')
          .select('id, vertical_id')
          .eq('slug', slug)
          .single()
        if (post) {
          attribution.attributed_post_id = post.id
          attribution.attributed_vertical_id = post.vertical_id
        }
      }
    }
  }

  // 4. Check for agency package product
  // NOTE(review): assumes isAgencyProduct / isPlatformProduct are synchronous;
  // an async predicate would return a Promise and always match.
  const agencyProduct = lineItems.find(
    item => item.product_id && isAgencyProduct(item.product_id)
  )
  if (agencyProduct) {
    attribution.business_unit = 'agency'
    attribution.agency_package_id = agencyProduct.variant_id
  }

  // 5. Check for platform license product (takes precedence over agency when
  // an order contains both, because it is applied last)
  const platformProduct = lineItems.find(
    item => item.product_id && isPlatformProduct(item.product_id)
  )
  if (platformProduct) {
    attribution.business_unit = 'platform'
    attribution.platform_license_id = platformProduct.variant_id
  }

  return attribution
}

// Read utm_campaign from a landing URL or path; null when absent or unparseable.
function readUtmCampaign(landingSite) {
  if (!landingSite) return null
  try {
    return new URL(landingSite, 'https://trendingsociety.com')
      .searchParams.get('utm_campaign')
  } catch {
    return null
  }
}
Inventory Sync
Real-time Inventory Webhook
Copy
// Handle inventory level updates
/**
 * Apply a Shopify `inventory_levels/update` webhook to the mirrored product.
 *
 * Finds the product whose `variants` JSON contains the inventory item,
 * rewrites that variant's `inventory_quantity`, recomputes `total_inventory`,
 * and raises a low-stock alert when the new total is below 10.
 * Returns without doing anything when no single product matches.
 *
 * @param supabase Supabase client.
 * @param payload  Webhook payload; reads `inventory_item_id` and `available`.
 * @throws The Supabase error object when the update fails.
 */
async function handleInventoryUpdate(supabase, payload) {
  const { inventory_item_id, available } = payload

  // Find product by inventory item (JSON containment on the variants array)
  const { data: product } = await supabase
    .from('shopify_products')
    .select('id, shopify_id, variants')
    .filter('variants', 'cs', `[{"inventory_item_id": ${inventory_item_id}}]`)
    .single()
  if (!product) return

  // Update variant inventory in JSONB
  const updatedVariants = product.variants.map(v => {
    if (v.inventory_item_id === inventory_item_id) {
      return { ...v, inventory_quantity: available }
    }
    return v
  })

  // Recalculate total
  const totalInventory = updatedVariants.reduce(
    (sum, v) => sum + (v.inventory_quantity || 0),
    0
  )

  const { error } = await supabase
    .from('shopify_products')
    .update({
      variants: updatedVariants,
      total_inventory: totalInventory,
      last_synced_at: new Date().toISOString()
    })
    .eq('id', product.id)
  // Do not alert on numbers that were never stored.
  if (error) throw error

  // Check for low stock alert. The alert gets the refreshed variants and
  // total; the row fetched above only holds the pre-update values.
  if (totalInventory < 10) {
    await createLowStockAlert({
      ...product,
      variants: updatedVariants,
      total_inventory: totalInventory
    })
  }
}
Polling Fallback
Copy
// Cron: every 15 minutes
/**
 * Polling fallback for inventory: re-fetch the variants of every
 * inventory-tracked product from the Shopify REST API and hand them to
 * `updateProductInventory`.
 *
 * A failure on one product is logged and the loop moves on, so a single bad
 * product cannot stall the whole poll.
 *
 * @throws The Supabase error object when the product list cannot be loaded.
 */
async function pollInventoryLevels() {
  const client = new Shopify.Clients.Rest(
    Deno.env.get('SHOPIFY_SHOP_DOMAIN'),
    Deno.env.get('SHOPIFY_ACCESS_TOKEN')
  )

  // Get all products with track_inventory = true. Only the Shopify ID is
  // needed here; the variants come back fresh from Shopify below.
  const { data: products, error } = await supabase
    .from('shopify_products')
    .select('shopify_id')
    .eq('track_inventory', true)
  if (error) throw error

  for (const product of products ?? []) {
    try {
      const { body } = await client.get({
        path: `products/${product.shopify_id}`,
        query: { fields: 'variants' }
      })
      // Update inventory from response
      await updateProductInventory(product.shopify_id, body.product.variants)
    } catch (err) {
      console.error(`Inventory poll failed for product ${product.shopify_id}:`, err)
    }

    // Rate limit: 2 calls/second
    await new Promise(r => setTimeout(r, 500))
  }
}
Customer Sync & Identity Resolution
Link Shopify Customer to Audience Profile
Copy
/**
 * Resolve a Shopify customer to a unified audience profile.
 *
 * Looks the profile up by email (creating it when missing), stores the profile
 * ID on the `shopify_customers` row, then marks the profile as a customer and
 * records the Shopify ID under `platform_identities.shopify`.
 * Customers without an email are skipped.
 *
 * @param supabase Supabase client.
 * @param customer Shopify customer payload (reads `id`, `email`, `first_name`,
 *                 `last_name`, `phone`).
 * @throws The Supabase error object when the insert or an update fails.
 */
async function linkCustomerToAudience(supabase, customer) {
  if (!customer.email) return

  // Find or create audience profile.
  // NOTE(review): lookup-then-insert can create duplicates under concurrent
  // webhooks unless audience_profiles.email is unique — confirm the schema.
  let { data: profile } = await supabase
    .from('audience_profiles')
    .select('id, platform_identities')
    .eq('email', customer.email)
    .single()

  if (!profile) {
    const { data: newProfile, error: insertError } = await supabase
      .from('audience_profiles')
      .insert({
        email: customer.email,
        first_name: customer.first_name,
        last_name: customer.last_name,
        phone: customer.phone,
        lifecycle_stage: 'customer',
        became_customer_at: new Date().toISOString()
      })
      .select('id, platform_identities')
      .single()
    // Without this, a failed insert surfaces later as a null dereference.
    if (insertError) throw insertError
    profile = newProfile
  }

  // Link customer to profile
  const { error: linkError } = await supabase
    .from('shopify_customers')
    .update({ audience_profile_id: profile.id })
    .eq('shopify_id', customer.id)
  if (linkError) throw linkError

  // Update profile with Shopify data.
  // The merged JSON object is built here and sent as a plain value: the query
  // builder has no raw-SQL column expressions, and splicing the ID into a SQL
  // string would be unsafe anyway. The ID is stored as a string.
  // NOTE(review): this read-modify-write is not atomic; move it into a
  // database function if other writers update platform_identities.
  const { error: profileError } = await supabase
    .from('audience_profiles')
    .update({
      lifecycle_stage: 'customer',
      platform_identities: {
        ...(profile.platform_identities ?? {}),
        shopify: String(customer.id)
      }
    })
    .eq('id', profile.id)
  if (profileError) throw profileError
}
Metafield Sync (Supabase → Shopify)
For enriched data that needs to flow back to Shopify:
// Update product metafields in Shopify
/**
 * Write metafields onto a Shopify product under the `trending_society`
 * namespace via the `productUpdate` GraphQL mutation.
 *
 * String values are sent as `single_line_text_field`; everything else is
 * JSON-encoded and sent as `json`. Entries whose value is `undefined` are
 * skipped, and nothing is sent when no entries remain.
 *
 * @param productId  Numeric Shopify product ID, as a string.
 * @param metafields Map of metafield key to value.
 * @throws Error when Shopify returns GraphQL errors or mutation `userErrors`.
 */
async function updateProductMetafields(productId: string, metafields: Record<string, any>) {
  const metafieldInputs = Object.entries(metafields)
    // JSON.stringify(undefined) is undefined, which is not a valid value.
    .filter(([, value]) => value !== undefined)
    .map(([key, value]) => ({
      namespace: 'trending_society',
      key,
      value: typeof value === 'string' ? value : JSON.stringify(value),
      type: typeof value === 'string' ? 'single_line_text_field' : 'json'
    }))
  if (metafieldInputs.length === 0) return

  const client = new Shopify.Clients.Graphql(
    Deno.env.get('SHOPIFY_SHOP_DOMAIN'),
    Deno.env.get('SHOPIFY_ACCESS_TOKEN')
  )

  const { body } = await client.query({
    data: {
      query: `
        mutation updateProductMetafields($input: ProductInput!) {
          productUpdate(input: $input) {
            product { id }
            userErrors { field message }
          }
        }
      `,
      // Variables travel inside `data` next to the query text.
      variables: {
        input: {
          id: `gid://shopify/Product/${productId}`,
          metafields: metafieldInputs
        }
      }
    }
  })

  // The mutation asks for userErrors, so check them: a rejected write would
  // otherwise be indistinguishable from a successful one.
  const userErrors = body?.data?.productUpdate?.userErrors ?? []
  if (body?.errors?.length || userErrors.length) {
    throw new Error(
      `Shopify metafield update failed for product ${productId}: ` +
      JSON.stringify(body?.errors?.length ? body.errors : userErrors)
    )
  }
}
// Example: Sync vertical assignment back to Shopify
/**
 * Push a product's vertical assignment to Shopify as metafields.
 *
 * Sends `vertical` (the slug) and `last_synced` (current time, ISO 8601)
 * through `updateProductMetafields`.
 *
 * @param productId    Numeric Shopify product ID.
 * @param verticalSlug Slug of the vertical assigned to the product.
 */
async function syncVerticalToShopify(productId: bigint, verticalSlug: string) {
  const shopifyProductId = productId.toString()
  const payload = {
    vertical: verticalSlug,
    last_synced: new Date().toISOString()
  }
  await updateProductMetafields(shopifyProductId, payload)
}
Queries
Revenue by Attribution Source
Copy
-- Paid-order count, revenue, and average order value for the trailing 30 days,
-- bucketed by UTM source; orders without one are reported as 'direct'.
SELECT
    COALESCE(o.utm_source, 'direct') AS source,
    COUNT(*)                         AS orders,
    SUM(o.total_price)               AS revenue,
    AVG(o.total_price)               AS aov
FROM shopify_orders AS o
WHERE o.financial_status = 'paid'
  AND o.processed_at > now() - interval '30 days'
GROUP BY COALESCE(o.utm_source, 'direct')
ORDER BY revenue DESC;
Revenue by Vertical
Copy
-- Paid-order count, revenue, and average order value for the trailing 30 days,
-- per attributed vertical. Orders with no attributed vertical are excluded by
-- the inner join.
SELECT
    pv.name              AS vertical,
    COUNT(so.id)         AS orders,
    SUM(so.total_price)  AS revenue,
    AVG(so.total_price)  AS aov
FROM shopify_orders AS so
INNER JOIN publisher_verticals AS pv
    ON pv.id = so.attributed_vertical_id
WHERE so.financial_status = 'paid'
  AND so.processed_at > now() - interval '30 days'
GROUP BY pv.name
ORDER BY revenue DESC;
Products Needing Restock
Copy
-- Active, inventory-tracked products with fewer than 20 units on hand, ranked
-- by orders per unit of remaining stock. NULLIF turns zero stock into a NULL
-- velocity instead of a division-by-zero error; those rows sort last.
SELECT
    p.title,
    p.product_source,
    p.total_inventory,
    p.total_orders,
    ROUND(
        CAST(p.total_orders AS numeric) / NULLIF(p.total_inventory, 0),
        2
    ) AS velocity
FROM shopify_products AS p
WHERE p.total_inventory < 20
  AND p.track_inventory = true
  AND p.status = 'active'
ORDER BY velocity DESC NULLS LAST;
Customer Lifetime Value
Copy
-- Top 100 customers by total spend among those with at least one order, with
-- value tier, predicted LTV, and the wtp_score from the linked audience
-- profile (NULL when no profile is linked).
SELECT
    cust.email,
    cust.orders_count,
    cust.total_spent,
    cust.customer_value_tier,
    cust.predicted_ltv,
    prof.wtp_score
FROM shopify_customers AS cust
LEFT JOIN audience_profiles AS prof
    ON prof.id = cust.audience_profile_id
WHERE cust.orders_count > 0
ORDER BY cust.total_spent DESC
LIMIT 100;
Related Documentation
| Document | Purpose |
|---|---|
| dropship-network.md | Supliful, Collective integration |
| vertical-collections.md | Product-content matching |
| fulfillment.md | Order routing and tracking |
| SCHEMA.md | Full table definitions |