Shopify Sync

Parent: Store OVERVIEW
Status: Active
Store: y0nrcb-2y.myshopify.com → trendingsociety.com

Overview

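Shopify is the source of truth for all commerce data. This document covers the sync between Shopify and Supabase: commerce data flows from Shopify into Supabase through webhooks for analytics, personalization, and cross-business-unit integration, and enriched data flows back to Shopify as metafields.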
┌─────────────────────────────────────────────────────────────────────────┐
│                         SHOPIFY SYNC ARCHITECTURE                        │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│   SHOPIFY (Source of Truth)                                             │
│   ┌─────────────────────────────────────────────────────────────────┐   │
│   │  Products │ Orders │ Customers │ Inventory │ Fulfillments       │   │
│   └─────────────────────────────────────────────────────────────────┘   │
│                              │                                           │
│                              ▼ Webhooks                                  │
│   ┌─────────────────────────────────────────────────────────────────┐   │
│   │                    SUPABASE EDGE FUNCTIONS                       │   │
│   │  /webhooks/shopify/products                                      │   │
│   │  /webhooks/shopify/orders                                        │   │
│   │  /webhooks/shopify/customers                                     │   │
│   └─────────────────────────────────────────────────────────────────┘   │
│                              │                                           │
│                              ▼                                           │
│   SUPABASE (Analytics & Integration)                                    │
│   ┌─────────────────────────────────────────────────────────────────┐   │
│   │  shopify_products │ shopify_orders │ shopify_customers          │   │
│   │  shopify_fulfillments │ shopify_inventory_levels                │   │
│   └─────────────────────────────────────────────────────────────────┘   │
│                              │                                           │
│                              ▼                                           │
│   BUSINESS UNIT INTEGRATION                                             │
│   ┌─────────────┐  ┌─────────────┐  ┌─────────────┐                    │
│   │  Publisher  │  │  Platform   │  │   Agency    │                    │
│   │  Affiliate  │  │  Licensing  │  │  Packages   │                    │
│   └─────────────┘  └─────────────┘  └─────────────┘                    │
│                                                                          │
└─────────────────────────────────────────────────────────────────────────┘

Sync Direction

| Data Type | Direction | Trigger | Latency |
|---|---|---|---|
| Products | Shopify → Supabase | Webhook | Real-time |
| Orders | Shopify → Supabase | Webhook | Real-time |
| Customers | Shopify → Supabase | Webhook | Real-time |
| Fulfillments | Shopify → Supabase | Webhook | Real-time |
| Inventory | Shopify → Supabase | Webhook + Polling | Real-time / 15 min |
| Metafields | Supabase → Shopify | API call | On-demand |

Rule: Shopify is always the source of truth for commerce data. Supabase is a read-only mirror plus analytics enrichment; the only data written back to Shopify is metafields.

Core Tables

shopify_products

Mirror of Shopify product catalog with enrichment.
CREATE TABLE shopify_products (
  id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
  
  -- Shopify identifiers
  shopify_id bigint UNIQUE NOT NULL,      -- Shopify's numeric ID
  shopify_gid text,                       -- GraphQL ID: gid://shopify/Product/123
  handle text UNIQUE NOT NULL,            -- URL slug
  
  -- Basic info
  title text NOT NULL,
  body_html text,
  vendor text,
  product_type text,
  
  -- Status
  status text DEFAULT 'active',           -- 'active', 'draft', 'archived'
  published_at timestamptz,
  
  -- Pricing (from first variant)
  price numeric(10,2),
  compare_at_price numeric(10,2),
  
  -- Media
  featured_image_url text,
  images jsonb DEFAULT '[]',              -- [{id, src, alt, position}]
  
  -- Categorization
  tags text[],
  collections jsonb DEFAULT '[]',         -- [{id, title, handle}]
  
  -- Variants summary
  variant_count integer DEFAULT 1,
  variants jsonb DEFAULT '[]',            -- Full variant data
  
  -- Inventory
  total_inventory integer DEFAULT 0,
  track_inventory boolean DEFAULT true,
  
  -- SEO
  seo_title text,
  seo_description text,
  
  -- Metafields (custom data)
  metafields jsonb DEFAULT '{}',
  
  -- ═══════════════════════════════════════════════════════════════════
  -- TRENDING SOCIETY ENRICHMENT
  -- ═══════════════════════════════════════════════════════════════════
  
  -- Vertical assignment
  vertical_id uuid REFERENCES publisher_verticals(id),
  
  -- Source type
  product_source text,                    -- 'supliful', 'collective', 'digital', 'service', 'owned'
  supplier_id text,                       -- External supplier ID
  
  -- Content linkage
  linked_posts uuid[],                    -- Publisher posts featuring this product
  
  -- Performance
  total_orders integer DEFAULT 0,
  total_revenue numeric(10,2) DEFAULT 0,
  conversion_rate numeric,
  
  -- Sync metadata
  shopify_created_at timestamptz,
  shopify_updated_at timestamptz,
  last_synced_at timestamptz DEFAULT now(),
  sync_error text,
  
  created_at timestamptz DEFAULT now(),
  updated_at timestamptz DEFAULT now()
);

-- shopify_id and handle are already indexed by their UNIQUE constraints,
-- so no separate indexes are created for them.
CREATE INDEX idx_shopify_products_status ON shopify_products(status);
CREATE INDEX idx_shopify_products_vertical ON shopify_products(vertical_id);
CREATE INDEX idx_shopify_products_source ON shopify_products(product_source);
CREATE INDEX idx_shopify_products_tags ON shopify_products USING GIN(tags);
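
The table stores both the numeric `shopify_id` and the GraphQL `shopify_gid`, so code that moves between the REST and GraphQL APIs needs to convert one into the other. The sketch below shows one way to do that. The helper names `toGid` and `fromGid` are hypothetical and do not appear elsewhere in this document, and the pattern only covers simple resource GIDs of the form shown in the column comment.

```typescript
// Hypothetical helpers for converting between shopify_id and shopify_gid.
// Only handles simple GIDs such as gid://shopify/Product/123 (no query string).
const GID_PATTERN = /^gid:\/\/shopify\/([A-Za-z]+)\/(\d+)$/

function toGid(resource: string, id: number | string): string {
  return `gid://shopify/${resource}/${id}`
}

function fromGid(gid: string): { resource: string; id: number } {
  const match = GID_PATTERN.exec(gid)
  if (!match) {
    throw new Error(`Not a simple Shopify GID: ${gid}`)
  }
  const id = Number(match[2])
  // Guard against IDs that cannot be represented exactly as a JS number.
  if (!Number.isSafeInteger(id)) {
    throw new Error(`ID out of safe integer range: ${match[2]}`)
  }
  return { resource: match[1], id }
}
```

For example, `fromGid(product.id).id` could replace manual string slicing when a GraphQL response has to be matched against a `shopify_id` column.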

shopify_orders

Order records with attribution tracking.
CREATE TABLE shopify_orders (
  id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
  
  -- Shopify identifiers
  shopify_id bigint UNIQUE NOT NULL,
  shopify_gid text,
  order_number integer NOT NULL,          -- Human-readable #1001
  name text,                              -- "#1001"
  
  -- Customer
  shopify_customer_id bigint,
  customer_email text,
  customer_name text,
  
  -- Financials
  subtotal_price numeric(10,2),
  total_discounts numeric(10,2),
  total_shipping numeric(10,2),
  total_tax numeric(10,2),
  total_price numeric(10,2),
  currency text DEFAULT 'USD',
  
  -- Status
  financial_status text,                  -- 'pending', 'paid', 'refunded', 'partially_refunded'
  fulfillment_status text,                -- null, 'fulfilled', 'partial', 'unfulfilled'
  cancelled_at timestamptz,
  cancel_reason text,
  
  -- Line items
  line_items jsonb DEFAULT '[]',          -- [{product_id, variant_id, title, quantity, price}]
  line_item_count integer DEFAULT 0,
  
  -- Shipping
  shipping_address jsonb,
  billing_address jsonb,
  shipping_lines jsonb DEFAULT '[]',
  
  -- Discounts
  discount_codes jsonb DEFAULT '[]',
  discount_applications jsonb DEFAULT '[]',
  
  -- ═══════════════════════════════════════════════════════════════════
  -- ATTRIBUTION (Critical for ROI)
  -- ═══════════════════════════════════════════════════════════════════
  
  -- UTM tracking
  utm_source text,
  utm_medium text,
  utm_campaign text,
  utm_term text,
  utm_content text,
  
  -- Landing page
  landing_page_url text,
  referring_site text,
  
  -- Internal attribution
  attributed_vertical_id uuid REFERENCES publisher_verticals(id),
  attributed_post_id uuid REFERENCES publisher_posts(id),
  attributed_affiliate_link_id uuid,
  
  -- Business unit
  business_unit text,                     -- 'store', 'agency', 'platform'
  
  -- Agency package (if applicable)
  agency_package_id uuid,
  agency_client_id uuid,
  
  -- Platform license (if applicable)  
  platform_license_id uuid,
  
  -- Audience profile
  audience_profile_id uuid REFERENCES audience_profiles(id),
  
  -- ═══════════════════════════════════════════════════════════════════
  -- FULFILLMENT ROUTING
  -- ═══════════════════════════════════════════════════════════════════
  
  fulfillment_type text,                  -- 'supliful', 'collective', 'digital', 'manual'
  requires_manual_fulfillment boolean DEFAULT false,
  
  -- Timestamps
  processed_at timestamptz,
  closed_at timestamptz,
  
  -- Sync
  shopify_created_at timestamptz,
  shopify_updated_at timestamptz,
  last_synced_at timestamptz DEFAULT now(),
  
  created_at timestamptz DEFAULT now(),
  updated_at timestamptz DEFAULT now()
);

-- shopify_id is already indexed by its UNIQUE constraint.
CREATE INDEX idx_shopify_orders_customer ON shopify_orders(shopify_customer_id);
CREATE INDEX idx_shopify_orders_email ON shopify_orders(customer_email);
CREATE INDEX idx_shopify_orders_status ON shopify_orders(financial_status, fulfillment_status);
CREATE INDEX idx_shopify_orders_date ON shopify_orders(processed_at DESC);
CREATE INDEX idx_shopify_orders_vertical ON shopify_orders(attributed_vertical_id);
CREATE INDEX idx_shopify_orders_utm ON shopify_orders(utm_source, utm_campaign);

shopify_customers

Customer records with lifecycle tracking.
CREATE TABLE shopify_customers (
  id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
  
  -- Shopify identifiers
  shopify_id bigint UNIQUE NOT NULL,
  shopify_gid text,
  
  -- Contact
  email text,
  phone text,
  
  -- Name
  first_name text,
  last_name text,
  
  -- Status
  state text,                             -- 'enabled', 'disabled', 'invited', 'declined'
  verified_email boolean DEFAULT false,
  accepts_marketing boolean DEFAULT false,
  accepts_marketing_updated_at timestamptz,
  
  -- Address
  default_address jsonb,
  addresses jsonb DEFAULT '[]',
  
  -- Order history
  orders_count integer DEFAULT 0,
  total_spent numeric(10,2) DEFAULT 0,
  
  -- Tags
  tags text[],
  
  -- Notes
  note text,
  
  -- ═══════════════════════════════════════════════════════════════════
  -- TRENDING SOCIETY ENRICHMENT
  -- ═══════════════════════════════════════════════════════════════════
  
  -- Link to audience profile (unified identity)
  audience_profile_id uuid REFERENCES audience_profiles(id),
  
  -- Lifecycle
  lifecycle_stage text,                   -- 'prospect', 'first_purchase', 'repeat', 'vip', 'churned'
  first_order_at timestamptz,
  last_order_at timestamptz,
  
  -- Scoring
  rfm_score text,                         -- Recency-Frequency-Monetary: '555', '111', etc.
  customer_value_tier text,               -- 'bronze', 'silver', 'gold', 'platinum'
  
  -- Predictions
  predicted_next_order_date date,
  churn_risk_score integer,               -- 0-100
  predicted_ltv numeric(10,2),
  
  -- Sync
  shopify_created_at timestamptz,
  shopify_updated_at timestamptz,
  last_synced_at timestamptz DEFAULT now(),
  
  created_at timestamptz DEFAULT now(),
  updated_at timestamptz DEFAULT now()
);

-- shopify_id is already indexed by its UNIQUE constraint.
CREATE INDEX idx_shopify_customers_email ON shopify_customers(email);
CREATE INDEX idx_shopify_customers_audience ON shopify_customers(audience_profile_id);
CREATE INDEX idx_shopify_customers_lifecycle ON shopify_customers(lifecycle_stage);
CREATE INDEX idx_shopify_customers_value ON shopify_customers(customer_value_tier);
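
The `rfm_score` column holds values such as '555' and '111', but this document does not say how they are derived. One common approach is quintile ranking: each customer gets a 1-5 score for recency, frequency, and monetary value relative to the rest of the cohort. The sketch below assumes that approach. The `CustomerStats` shape and both function names are hypothetical, and ties are broken by input order.

```typescript
// Hypothetical input shape; fields would be derived from shopify_customers.
interface CustomerStats {
  shopifyId: number
  daysSinceLastOrder: number
  ordersCount: number
  totalSpent: number
}

// Score each value 1-5 by its rank within the cohort (5 = best).
function quintileScores(values: number[], higherIsBetter: boolean): number[] {
  const ranked = values
    .map((value, index) => ({ value, index }))
    // Worst value first, so rank 0 maps to score 1.
    .sort((a, b) => (higherIsBetter ? a.value - b.value : b.value - a.value))
  const scores = new Array<number>(values.length)
  ranked.forEach((entry, rank) => {
    scores[entry.index] = Math.floor((rank * 5) / values.length) + 1
  })
  return scores
}

// Returns shopifyId -> three-digit RFM string such as '555'.
function rfmScores(customers: CustomerStats[]): Map<number, string> {
  // Fewer days since the last order is better, so recency is inverted.
  const r = quintileScores(customers.map(c => c.daysSinceLastOrder), false)
  const f = quintileScores(customers.map(c => c.ordersCount), true)
  const m = quintileScores(customers.map(c => c.totalSpent), true)
  return new Map(
    customers.map((c, i): [number, string] => [c.shopifyId, `${r[i]}${f[i]}${m[i]}`])
  )
}
```

Because the scores are relative to the cohort, they would need to be recomputed for all customers on a schedule rather than per webhook.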

shopify_fulfillments

Fulfillment tracking for all order types.
CREATE TABLE shopify_fulfillments (
  id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
  
  -- Shopify identifiers
  shopify_id bigint UNIQUE NOT NULL,
  shopify_order_id bigint NOT NULL,
  
  -- Status
  status text,                            -- 'pending', 'open', 'success', 'cancelled', 'error', 'failure'
  
  -- Tracking
  tracking_company text,
  tracking_number text,
  tracking_url text,
  tracking_numbers text[],
  tracking_urls text[],
  
  -- Shipment
  shipment_status text,                   -- 'label_printed', 'in_transit', 'out_for_delivery', 'delivered'
  
  -- Line items fulfilled
  line_items jsonb DEFAULT '[]',
  
  -- Location
  location_id bigint,
  
  -- Fulfillment service
  service text,                           -- 'manual', 'supliful', 'collective'
  
  -- Timestamps
  shopify_created_at timestamptz,
  shopify_updated_at timestamptz,
  
  created_at timestamptz DEFAULT now(),
  updated_at timestamptz DEFAULT now()
);

CREATE INDEX idx_shopify_fulfillments_order ON shopify_fulfillments(shopify_order_id);
CREATE INDEX idx_shopify_fulfillments_status ON shopify_fulfillments(status);
CREATE INDEX idx_shopify_fulfillments_tracking ON shopify_fulfillments(tracking_number);

Webhook Configuration

Required Webhooks

| Topic | Endpoint | Purpose |
|---|---|---|
| products/create | /webhooks/shopify/products | New product sync |
| products/update | /webhooks/shopify/products | Product changes |
| products/delete | /webhooks/shopify/products | Product removal |
| orders/create | /webhooks/shopify/orders | New order capture |
| orders/updated | /webhooks/shopify/orders | Status changes |
| orders/paid | /webhooks/shopify/orders | Payment confirmation |
| orders/fulfilled | /webhooks/shopify/orders | Fulfillment complete |
| orders/cancelled | /webhooks/shopify/orders | Cancellation |
| customers/create | /webhooks/shopify/customers | New customer |
| customers/update | /webhooks/shopify/customers | Customer changes |
| fulfillments/create | /webhooks/shopify/fulfillments | Fulfillment started |
| fulfillments/update | /webhooks/shopify/fulfillments | Tracking updates |
| inventory_levels/update | /webhooks/shopify/inventory | Stock changes |
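
Several topics in the table above write to the same row, and Shopify does not guarantee that webhooks arrive in order or only once. A handler can therefore receive an older payload after a newer one. The sketch below shows one possible guard, comparing the payload's `updated_at` with the mirrored `shopify_updated_at` before overwriting. The function name `shouldApplyUpdate` is hypothetical.

```typescript
// Returns true when an incoming webhook payload should overwrite the mirrored row.
// incomingUpdatedAt: the payload's updated_at (ISO 8601).
// storedUpdatedAt: the row's shopify_updated_at, or null if no row exists yet.
function shouldApplyUpdate(
  incomingUpdatedAt: string,
  storedUpdatedAt: string | null
): boolean {
  if (storedUpdatedAt === null) return true

  const incoming = Date.parse(incomingUpdatedAt)
  const stored = Date.parse(storedUpdatedAt)

  // If either timestamp cannot be parsed, prefer applying the payload
  // over silently dropping it.
  if (Number.isNaN(incoming) || Number.isNaN(stored)) return true

  // Equal timestamps are re-applied; the upsert is idempotent.
  return incoming >= stored
}
```

A handler would read `shopify_updated_at` for the row's `shopify_id` and skip the upsert when this returns false.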

Webhook Handler Pattern

// supabase/functions/webhooks-shopify-orders/index.ts
import { createClient } from 'https://esm.sh/@supabase/supabase-js@2'
import { verifyShopifyWebhook } from '../_shared/shopify-verify.ts'

// Deno.serve is built into the runtime, so no std/http import is needed.
Deno.serve(async (req) => {
  // 1. Verify webhook signature against the raw (unparsed) body
  const hmac = req.headers.get('x-shopify-hmac-sha256')
  const body = await req.text()
  
  // await is harmless if the helper is synchronous and required if it returns a Promise
  if (!(await verifyShopifyWebhook(body, hmac))) {
    return new Response('Unauthorized', { status: 401 })
  }
  
  // 2. Parse payload
  const order = JSON.parse(body)
  const topic = req.headers.get('x-shopify-topic')
  
  // 3. Initialize Supabase
  const supabase = createClient(
    Deno.env.get('SUPABASE_URL')!,
    Deno.env.get('SUPABASE_SERVICE_ROLE_KEY')!
  )
  
  // 4. Handle by topic
  switch (topic) {
    case 'orders/create':
      await handleOrderCreate(supabase, order)
      break
    case 'orders/updated':
      await handleOrderUpdate(supabase, order)
      break
    case 'orders/paid':
      await handleOrderPaid(supabase, order)
      break
    // ... other cases
  }
  
  return new Response('OK', { status: 200 })
})

async function handleOrderCreate(supabase, order) {
  // Extract attribution from landing page / UTM
  const attribution = extractAttribution(order)
  
  // Upsert order
  const { error } = await supabase
    .from('shopify_orders')
    .upsert({
      shopify_id: order.id,
      shopify_gid: `gid://shopify/Order/${order.id}`,
      order_number: order.order_number,
      name: order.name,
      shopify_customer_id: order.customer?.id,
      customer_email: order.email,
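      // Avoid storing "undefined undefined" for guest checkouts without a customer
      customer_name: [order.customer?.first_name, order.customer?.last_name].filter(Boolean).join(' ') || null,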
      subtotal_price: parseFloat(order.subtotal_price),
      total_discounts: parseFloat(order.total_discounts),
      total_shipping: parseFloat(order.total_shipping_price_set?.shop_money?.amount || 0),
      total_tax: parseFloat(order.total_tax),
      total_price: parseFloat(order.total_price),
      currency: order.currency,
      financial_status: order.financial_status,
      fulfillment_status: order.fulfillment_status,
      line_items: order.line_items,
      line_item_count: order.line_items.length,
      shipping_address: order.shipping_address,
      billing_address: order.billing_address,
      discount_codes: order.discount_codes,
      ...attribution,
      processed_at: order.processed_at,
      shopify_created_at: order.created_at,
      shopify_updated_at: order.updated_at,
      last_synced_at: new Date().toISOString()
    }, {
      onConflict: 'shopify_id'
    })
  
  if (error) throw error
  
  // Link to audience profile
  if (order.email) {
    await linkOrderToAudience(supabase, order)
  }
  
  // Trigger fulfillment routing
  await routeFulfillment(supabase, order)
}

function extractAttribution(order) {
  // Parse landing page URL for UTM params
  const landingUrl = order.landing_site || ''
  const url = new URL(landingUrl, 'https://trendingsociety.com')
  
  return {
    utm_source: url.searchParams.get('utm_source'),
    utm_medium: url.searchParams.get('utm_medium'),
    utm_campaign: url.searchParams.get('utm_campaign'),
    utm_term: url.searchParams.get('utm_term'),
    utm_content: url.searchParams.get('utm_content'),
    landing_page_url: landingUrl || null,
    referring_site: order.referring_site
  }
}
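
The handler imports `verifyShopifyWebhook` from `../_shared/shopify-verify.ts`, which this document does not show. Shopify signs each webhook with a base64-encoded HMAC-SHA256 of the raw request body, keyed with the app's secret, and sends it in the `X-Shopify-Hmac-Sha256` header. The sketch below is one possible implementation, not the project's actual helper. The explicit `secret` parameter and the `signShopifyBody` function are assumptions; the handler above calls the helper with two arguments, so the real module presumably reads the secret from the environment.

```typescript
import { Buffer } from 'node:buffer'
import { createHmac, timingSafeEqual } from 'node:crypto'

// Base64 HMAC-SHA256 of the raw request body, keyed with the app secret.
function signShopifyBody(rawBody: string, secret: string): string {
  return createHmac('sha256', secret).update(rawBody, 'utf8').digest('base64')
}

// Hypothetical signature: the secret is passed in explicitly for testability.
function verifyShopifyWebhook(
  rawBody: string,
  hmacHeader: string | null,
  secret: string
): boolean {
  if (!hmacHeader) return false

  const expected = Buffer.from(signShopifyBody(rawBody, secret), 'base64')
  const received = Buffer.from(hmacHeader, 'base64')

  // timingSafeEqual throws on length mismatch, so compare lengths first.
  return expected.length === received.length && timingSafeEqual(expected, received)
}
```

The body must be verified exactly as received. Parsing and re-serializing the JSON before computing the HMAC changes the bytes and makes valid requests fail.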

Product Sync Workflow

Initial Bulk Sync

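// scripts/sync-shopify-products.ts
// Uses the older Shopify.Clients.* interface of @shopify/shopify-api.
import Shopify from '@shopify/shopify-api'
import { createClient } from '@supabase/supabase-js'

// Service-role client shared by bulkSyncProducts and upsertProduct
const supabase = createClient(
  Deno.env.get('SUPABASE_URL')!,
  Deno.env.get('SUPABASE_SERVICE_ROLE_KEY')!
)

async function bulkSyncProducts() {
  const client = new Shopify.Clients.Graphql(
    Deno.env.get('SHOPIFY_SHOP_DOMAIN'),
    Deno.env.get('SHOPIFY_ACCESS_TOKEN')
  )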
  
  let hasNextPage = true
  let cursor = null
  
  while (hasNextPage) {
    const { body } = await client.query({
      data: `
        query getProducts($cursor: String) {
          products(first: 50, after: $cursor) {
            pageInfo {
              hasNextPage
              endCursor
            }
            edges {
              node {
                id
                legacyResourceId
                handle
                title
                descriptionHtml
                vendor
                productType
                status
                publishedAt
                featuredImage { url altText }
                images(first: 10) {
                  edges { node { url altText } }
                }
                tags
                variants(first: 100) {
                  edges {
                    node {
                      id
                      legacyResourceId
                      title
                      price
                      compareAtPrice
                      inventoryQuantity
                      sku
                    }
                  }
                }
                metafields(first: 20) {
                  edges {
                    node {
                      namespace
                      key
                      value
                    }
                  }
                }
              }
            }
          }
        }
      `,
      variables: { cursor }
    })
    
    const products = body.data.products.edges.map(e => e.node)
    
    for (const product of products) {
      await upsertProduct(product)
    }
    
    hasNextPage = body.data.products.pageInfo.hasNextPage
    cursor = body.data.products.pageInfo.endCursor
    
    // Rate limit
    await new Promise(r => setTimeout(r, 500))
  }
}

async function upsertProduct(product) {
  const variants = product.variants.edges.map(e => e.node)
  const firstVariant = variants[0]
  
  // Determine product source from metafields
  const sourceMetafield = product.metafields.edges.find(
    e => e.node.namespace === 'custom' && e.node.key === 'product_source'
  )
  
  const { error } = await supabase
    .from('shopify_products')
    .upsert({
      shopify_id: Number(product.legacyResourceId),
      shopify_gid: product.id,
      handle: product.handle,
      title: product.title,
      body_html: product.descriptionHtml,
      vendor: product.vendor,
      product_type: product.productType,
      status: product.status.toLowerCase(),   // GraphQL returns ACTIVE / DRAFT / ARCHIVED
      published_at: product.publishedAt,
      price: parseFloat(firstVariant?.price ?? '0'),
      compare_at_price: firstVariant?.compareAtPrice 
        ? parseFloat(firstVariant.compareAtPrice) 
        : null,
      featured_image_url: product.featuredImage?.url,
      images: product.images.edges.map(e => e.node),
      tags: product.tags,
      variant_count: variants.length,
      variants: variants,
      total_inventory: variants.reduce((sum, v) => sum + (v.inventoryQuantity || 0), 0),
      product_source: sourceMetafield?.node.value || 'owned',
      metafields: Object.fromEntries(
        product.metafields.edges.map(e => [`${e.node.namespace}.${e.node.key}`, e.node.value])
      ),
      last_synced_at: new Date().toISOString()
    }, {
      onConflict: 'shopify_id'
    })
  
  // Surface failures instead of silently skipping the product
  if (error) throw error
}
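
The bulk sync stores variants as returned by GraphQL (camelCase, `inventoryQuantity`), while the inventory webhook handler later in this document looks variants up by `inventory_item_id` and updates `inventory_quantity`. Unless the two shapes agree, that lookup cannot match products written by the bulk sync. The sketch below shows one way to normalize variants before storing them. It assumes the query is extended with `inventoryItem { legacyResourceId }` on each variant; the interface and function names are hypothetical.

```typescript
// Variant as returned by the GraphQL query in bulkSyncProducts.
interface GraphqlVariant {
  id: string
  legacyResourceId: string
  title: string
  price: string
  compareAtPrice: string | null
  inventoryQuantity: number | null
  sku: string | null
  // Assumption: added to the query so the inventory item can be matched later.
  inventoryItem?: { legacyResourceId: string }
}

// Snake_case shape that the inventory webhook handler expects in the variants JSONB.
interface MirroredVariant {
  id: number
  gid: string
  title: string
  price: string
  compare_at_price: string | null
  sku: string | null
  inventory_item_id: number | null
  inventory_quantity: number
}

function normalizeVariant(v: GraphqlVariant): MirroredVariant {
  return {
    id: Number(v.legacyResourceId),
    gid: v.id,
    title: v.title,
    price: v.price,
    compare_at_price: v.compareAtPrice,
    sku: v.sku,
    inventory_item_id: v.inventoryItem ? Number(v.inventoryItem.legacyResourceId) : null,
    inventory_quantity: v.inventoryQuantity ?? 0
  }
}
```

With this in place, `upsertProduct` could store `variants.map(normalizeVariant)` instead of the raw GraphQL nodes.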

Vertical Auto-Assignment

-- Assign products to verticals based on tags/type
CREATE OR REPLACE FUNCTION assign_product_vertical()
RETURNS trigger AS $$
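BEGIN
  -- Keep an existing (e.g. manual) assignment. SELECT ... INTO would
  -- otherwise reset vertical_id to NULL whenever no row matches.
  IF NEW.vertical_id IS NOT NULL THEN
    RETURN NEW;
  END IF;

  -- Match by product type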
  SELECT id INTO NEW.vertical_id
  FROM publisher_verticals
  WHERE slug = lower(NEW.product_type)
  LIMIT 1;
  
  -- If no match, try tags
  IF NEW.vertical_id IS NULL THEN
    SELECT v.id INTO NEW.vertical_id
    FROM publisher_verticals v
    WHERE v.slug = ANY(
      SELECT lower(unnest(NEW.tags))
    )
    LIMIT 1;
  END IF;
  
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER trg_assign_product_vertical
  BEFORE INSERT OR UPDATE ON shopify_products
  FOR EACH ROW
  EXECUTE FUNCTION assign_product_vertical();
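
The matching rule in the trigger can also be expressed in application code, which is useful for unit-testing the rule or previewing assignments before a bulk sync. The sketch below mirrors the trigger's logic: product type first, then tags. The function name `matchVerticalSlug` is hypothetical, and the caller is assumed to have loaded the set of `publisher_verticals.slug` values. Where several tags match, this version returns the first matching tag, whereas the SQL `LIMIT 1` does not guarantee which match is chosen.

```typescript
// Returns the slug of the matching vertical, or null when nothing matches.
function matchVerticalSlug(
  productType: string | null,
  tags: string[],
  verticalSlugs: Set<string>
): string | null {
  // 1. Match by product type (case-insensitive, like lower(NEW.product_type))
  const type = productType ? productType.toLowerCase() : null
  if (type && verticalSlugs.has(type)) return type

  // 2. Fall back to the first tag that matches a vertical slug
  for (const tag of tags) {
    const slug = tag.toLowerCase()
    if (verticalSlugs.has(slug)) return slug
  }

  return null
}
```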

Order Attribution

Attribution Hierarchy

Orders are attributed using this priority:
  1. UTM Parameters - Explicit campaign tracking
  2. Affiliate Link ID - Custom parameter for affiliate tracking
  3. Referring URL - Publisher post that referred
  4. Landing Page - First page visited
  5. Customer History - Previous vertical affinity
async function attributeOrder(supabase, order) {
  let attribution = {
    attributed_vertical_id: null,
    attributed_post_id: null,
    attributed_affiliate_link_id: null,
    business_unit: 'store'
  }
  
  // 1. Check for affiliate link ID
  const affiliateLinkId = extractAffiliateId(order.landing_site)
  if (affiliateLinkId) {
    const { data: link } = await supabase
      .from('affiliate_links')
      .select('id, post_id, vertical_id')
      .eq('id', affiliateLinkId)
      .single()
    
    if (link) {
      attribution.attributed_affiliate_link_id = link.id
      attribution.attributed_post_id = link.post_id
      attribution.attributed_vertical_id = link.vertical_id
      return attribution
    }
  }
  
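  // 2. Check UTM campaign for vertical. UTM values are parsed from the
  //    landing URL; the order payload has no utm_campaign field.
  const { utm_campaign: utmCampaign } = extractAttribution(order)
  if (utmCampaign) {
    const { data: vertical } = await supabase
      .from('publisher_verticals')
      .select('id')
      .eq('slug', utmCampaign.split('_')[0])
      .single()
    
    if (vertical) {
      attribution.attributed_vertical_id = vertical.id
    }
  }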
  
  // 3. Check referring URL for post
  if (order.referring_site?.includes('trendingsociety.com')) {
    const slug = extractPostSlug(order.referring_site)
    if (slug) {
      const { data: post } = await supabase
        .from('publisher_posts')
        .select('id, vertical_id')
        .eq('slug', slug)
        .single()
      
      if (post) {
        attribution.attributed_post_id = post.id
        attribution.attributed_vertical_id = post.vertical_id
      }
    }
  }
  
  // 4. Check for agency package product
  const agencyProduct = order.line_items.find(
    item => item.product_id && isAgencyProduct(item.product_id)
  )
  if (agencyProduct) {
    attribution.business_unit = 'agency'
    attribution.agency_package_id = agencyProduct.variant_id
  }
  
  // 5. Check for platform license product
  const platformProduct = order.line_items.find(
    item => item.product_id && isPlatformProduct(item.product_id)
  )
  if (platformProduct) {
    attribution.business_unit = 'platform'
    attribution.platform_license_id = platformProduct.variant_id
  }
  
  return attribution
}
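
`attributeOrder` relies on two helpers, `extractAffiliateId` and `extractPostSlug`, that this document does not define. The sketch below shows what they might look like. Both URL conventions are assumptions made for illustration: the affiliate ID is assumed to travel in an `aff_id` query parameter, and the post slug is assumed to be the last path segment of the referring URL. Neither convention is confirmed by this document.

```typescript
const SITE_ORIGIN = 'https://trendingsociety.com'

// Assumption: affiliate links append ?aff_id=<id> to the landing URL.
function extractAffiliateId(landingSite: string | null | undefined): string | null {
  if (!landingSite) return null
  try {
    // landing_site is usually a relative path, so resolve it against the site origin.
    return new URL(landingSite, SITE_ORIGIN).searchParams.get('aff_id')
  } catch {
    return null
  }
}

// Assumption: the post slug is the last non-empty path segment of the referrer.
function extractPostSlug(referringSite: string | null | undefined): string | null {
  if (!referringSite) return null
  try {
    const segments = new URL(referringSite).pathname.split('/').filter(Boolean)
    return segments.length > 0 ? segments[segments.length - 1] : null
  } catch {
    // Not an absolute URL
    return null
  }
}
```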

Inventory Sync

Real-time Inventory Webhook

// Handle inventory level updates
async function handleInventoryUpdate(supabase, payload) {
  const { inventory_item_id, available, location_id } = payload
  
  // Find product by inventory item
  const { data: product } = await supabase
    .from('shopify_products')
    .select('id, shopify_id, variants')
    .filter('variants', 'cs', `[{"inventory_item_id": ${inventory_item_id}}]`)
    .single()
  
  if (!product) return
  
  // Update variant inventory in JSONB
  const updatedVariants = product.variants.map(v => {
    if (v.inventory_item_id === inventory_item_id) {
      return { ...v, inventory_quantity: available }
    }
    return v
  })
  
  // Recalculate total
  const totalInventory = updatedVariants.reduce(
    (sum, v) => sum + (v.inventory_quantity || 0), 
    0
  )
  
  await supabase
    .from('shopify_products')
    .update({
      variants: updatedVariants,
      total_inventory: totalInventory,
      last_synced_at: new Date().toISOString()
    })
    .eq('id', product.id)
  
  // Check for low stock alert
  if (totalInventory < 10) {
    await createLowStockAlert(product)
  }
}
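
The `inventory_levels/update` payload reports `available` for a single location, and the handler above destructures `location_id` without using it. If the store stocks an item at more than one location, writing `available` straight into the variant would undercount. The sketch below tracks quantities per location and sums them, which is one way the `shopify_inventory_levels` table from the architecture diagram might be used. The types and the `applyLevelUpdate` function are hypothetical, and an in-memory map stands in for that table.

```typescript
interface InventoryLevelUpdate {
  inventory_item_id: number
  location_id: number
  available: number | null   // null when the item is not tracked at this location
}

// inventory_item_id -> (location_id -> available)
type LevelStore = Map<number, Map<number, number>>

// Records one location's level and returns the item's total across all locations.
function applyLevelUpdate(store: LevelStore, update: InventoryLevelUpdate): number {
  const byLocation = store.get(update.inventory_item_id) ?? new Map<number, number>()
  byLocation.set(update.location_id, update.available ?? 0)
  store.set(update.inventory_item_id, byLocation)

  let total = 0
  byLocation.forEach(quantity => {
    total += quantity
  })
  return total
}
```

The returned total is what would be written to the variant's `inventory_quantity` before recalculating `total_inventory`.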

Polling Fallback

// Cron: every 15 minutes
async function pollInventoryLevels() {
  const client = new Shopify.Clients.Rest(
    Deno.env.get('SHOPIFY_SHOP_DOMAIN'),
    Deno.env.get('SHOPIFY_ACCESS_TOKEN')
  )
  
  // Get all products with track_inventory = true
  const { data: products } = await supabase
    .from('shopify_products')
    .select('shopify_id, variants')
    .eq('track_inventory', true)
  
  // data is null when the query fails, so fall back to an empty list
  for (const product of products ?? []) {
    const { body } = await client.get({
      path: `products/${product.shopify_id}`,
      query: { fields: 'variants' }
    })
    
    // Update inventory from response
    await updateProductInventory(product.shopify_id, body.product.variants)
    
    // Rate limit: 2 calls/second
    await new Promise(r => setTimeout(r, 500))
  }
}

Customer Sync & Identity Resolution

async function linkCustomerToAudience(supabase, customer) {
  if (!customer.email) return
  
  // Find or create audience profile
  let { data: profile } = await supabase
    .from('audience_profiles')
    .select('id, platform_identities')
    .eq('email', customer.email)
    .maybeSingle()
  
  if (!profile) {
    const { data: newProfile, error } = await supabase
      .from('audience_profiles')
      .insert({
        email: customer.email,
        first_name: customer.first_name,
        last_name: customer.last_name,
        phone: customer.phone,
        lifecycle_stage: 'customer',
        became_customer_at: new Date().toISOString()
      })
      .select('id, platform_identities')
      .single()
    
    if (error) throw error
    profile = newProfile
  }
  
  // Link customer to profile
  await supabase
    .from('shopify_customers')
    .update({ audience_profile_id: profile.id })
    .eq('shopify_id', customer.id)
  
  // Update profile with Shopify data. supabase-js cannot embed raw SQL in an
  // update, so the identities object is merged in application code.
  await supabase
    .from('audience_profiles')
    .update({
      lifecycle_stage: 'customer',
      platform_identities: {
        ...(profile.platform_identities ?? {}),
        shopify: String(customer.id)
      }
    })
    .eq('id', profile.id)
}

Metafield Sync (Supabase → Shopify)

For enriched data that needs to flow back to Shopify:
// Update product metafields in Shopify
async function updateProductMetafields(productId: string, metafields: Record<string, any>) {
  const client = new Shopify.Clients.Graphql(
    Deno.env.get('SHOPIFY_SHOP_DOMAIN'),
    Deno.env.get('SHOPIFY_ACCESS_TOKEN')
  )
  
  const metafieldInputs = Object.entries(metafields).map(([key, value]) => ({
    namespace: 'trending_society',
    key,
    value: typeof value === 'string' ? value : JSON.stringify(value),
    type: typeof value === 'string' ? 'single_line_text_field' : 'json'
  }))
  
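  const { body } = await client.query({
    data: `
      mutation updateProductMetafields($input: ProductInput!) {
        productUpdate(input: $input) {
          product { id }
          userErrors { field message }
        }
      }
    `,
    variables: {
      input: {
        id: `gid://shopify/Product/${productId}`,
        metafields: metafieldInputs
      }
    }
  })
  
  // Validation failures are reported in userErrors rather than thrown
  const userErrors = body?.data?.productUpdate?.userErrors ?? []
  if (userErrors.length > 0) {
    throw new Error(`productUpdate failed: ${JSON.stringify(userErrors)}`)
  }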
}

// Example: Sync vertical assignment back to Shopify
async function syncVerticalToShopify(productId: bigint, verticalSlug: string) {
  await updateProductMetafields(productId.toString(), {
    vertical: verticalSlug,
    last_synced: new Date().toISOString()
  })
}

Queries

Revenue by Attribution Source

SELECT 
  COALESCE(utm_source, 'direct') as source,
  COUNT(*) as orders,
  SUM(total_price) as revenue,
  AVG(total_price) as aov
FROM shopify_orders
WHERE processed_at > now() - interval '30 days'
  AND financial_status = 'paid'
GROUP BY 1
ORDER BY 3 DESC;

Revenue by Vertical

SELECT 
  v.name as vertical,
  COUNT(o.id) as orders,
  SUM(o.total_price) as revenue,
  AVG(o.total_price) as aov
FROM shopify_orders o
JOIN publisher_verticals v ON o.attributed_vertical_id = v.id
WHERE o.processed_at > now() - interval '30 days'
  AND o.financial_status = 'paid'
GROUP BY 1
ORDER BY 3 DESC;

Products Needing Restock

SELECT 
  title,
  product_source,
  total_inventory,
  total_orders,
  ROUND(total_orders::numeric / NULLIF(total_inventory, 0), 2) as velocity
FROM shopify_products
WHERE status = 'active'
  AND track_inventory = true
  AND total_inventory < 20
ORDER BY velocity DESC NULLS LAST;

Customer Lifetime Value

SELECT 
  c.email,
  c.orders_count,
  c.total_spent,
  c.customer_value_tier,
  c.predicted_ltv,
  ap.wtp_score
FROM shopify_customers c
LEFT JOIN audience_profiles ap ON c.audience_profile_id = ap.id
WHERE c.orders_count > 0
ORDER BY c.total_spent DESC
LIMIT 100;

Related Documents

| Document | Purpose |
|---|---|
| dropship-network.md | Supliful, Collective integration |
| vertical-collections.md | Product-content matching |
| fulfillment.md | Order routing and tracking |
| SCHEMA.md | Full table definitions |