Skip to main content

Audience Intelligence

Parent: Shared Infrastructure OVERVIEW
Status: Active
Scope: Profiles → Events → Segments → Predictions

Overview

Audience Intelligence is the understanding layer: it tracks users across all touchpoints, builds unified profiles, segments audiences dynamically, and predicts behavior, including willingness to pay (WTP).
┌─────────────────────────────────────────────────────────────────────────┐
│                   AUDIENCE INTELLIGENCE PIPELINE                         │
├─────────────────────────────────────────────────────────────────────────┤
│                                                                          │
│   DATA COLLECTION                                                        │
│   ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐               │
│   │ Website  │  │  Email   │  │  Store   │  │  Social  │               │
│   │ Events   │  │ Activity │  │ Purchases│  │Engagement│               │
│   └────┬─────┘  └────┬─────┘  └────┬─────┘  └────┬─────┘               │
│        └─────────────┴─────────────┴─────────────┘                      │
│                              │                                           │
│                              ▼                                           │
│   ┌─────────────────────────────────────────────────────────────────┐   │
│   │                    UNIFIED PROFILE                               │   │
│   │   Identity Resolution → Merge Events → Build Profile             │   │
│   └─────────────────────────────────────────────────────────────────┘   │
│                              │                                           │
│              ┌───────────────┼───────────────┐                          │
│              ▼               ▼               ▼                          │
│   ┌──────────────┐  ┌──────────────┐  ┌──────────────┐                 │
│   │   SEGMENTS   │  │  WTP SCORE   │  │ PREDICTIONS  │                 │
│   │              │  │              │  │              │                 │
│   │ Rule-based   │  │ Willingness  │  │ Churn risk   │                 │
│   │ ML-based     │  │ to pay       │  │ LTV          │                 │
│   │ Behavioral   │  │ 0-100 scale  │  │ Next action  │                 │
│   └──────────────┘  └──────────────┘  └──────────────┘                 │
│                              │                                           │
│                              ▼                                           │
│   ┌─────────────────────────────────────────────────────────────────┐   │
│   │                    ACTIVATION                                    │   │
│   │   Personalization → Targeting → Recommendations → Pricing        │   │
│   └─────────────────────────────────────────────────────────────────┘   │
│                                                                          │
└─────────────────────────────────────────────────────────────────────────┘

Schema

Audience Profiles

Unified user identity across all touchpoints.
-- Unified profile: one row per resolved person, combining identity,
-- engagement counters, commerce totals, scores and model predictions.
-- Documented value ranges are enforced with CHECK constraints so that every
-- writer (not only the application code) is held to them. NULL still passes.
CREATE TABLE audience_profiles (
  id uuid PRIMARY KEY DEFAULT gen_random_uuid(),

  -- Identity
  email text UNIQUE,                      -- UNIQUE creates the lookup index; comparison is case-sensitive
  phone text,
  external_ids jsonb DEFAULT '{}',        -- {shopify_id, stripe_id, email_provider_id}

  -- Basic info
  first_name text,
  last_name text,
  display_name text,
  avatar_url text,

  -- Location
  city text,
  state text,
  country text,
  timezone text,

  -- Engagement summary
  first_seen_at timestamptz DEFAULT now(),
  last_seen_at timestamptz DEFAULT now(),
  total_sessions integer DEFAULT 0,
  total_pageviews integer DEFAULT 0,
  total_events integer DEFAULT 0,

  -- Content engagement
  articles_read integer DEFAULT 0,
  videos_watched integer DEFAULT 0,
  downloads integer DEFAULT 0,

  -- Email engagement
  email_subscribed boolean DEFAULT false,
  email_subscription_date timestamptz,
  emails_received integer DEFAULT 0,
  emails_opened integer DEFAULT 0,
  emails_clicked integer DEFAULT 0,

  -- Commerce
  total_orders integer DEFAULT 0,
  total_revenue numeric(10,2) DEFAULT 0,
  average_order_value numeric(10,2),
  last_order_at timestamptz,

  -- Interests & preferences
  interests text[],                       -- Inferred from behavior
  preferred_verticals text[],             -- Most engaged verticals
  preferred_content_types text[],         -- article, video, podcast
  preferred_channels text[],              -- email, sms, push

  -- Scoring (all on a 0-100 scale)
  engagement_score integer DEFAULT 0      -- overall engagement
    CHECK (engagement_score BETWEEN 0 AND 100),
  wtp_score integer DEFAULT 50            -- willingness to pay
    CHECK (wtp_score BETWEEN 0 AND 100),
  lead_score integer DEFAULT 0            -- sales readiness
    CHECK (lead_score BETWEEN 0 AND 100),
  health_score integer DEFAULT 50         -- relationship health
    CHECK (health_score BETWEEN 0 AND 100),

  -- Lifecycle
  lifecycle_stage text DEFAULT 'anonymous', -- 'anonymous', 'visitor', 'subscriber', 'customer', 'advocate'
  acquisition_source text,
  acquisition_campaign text,
  acquisition_date timestamptz,

  -- RFM (Recency, Frequency, Monetary), each scored 1-5
  rfm_recency integer CHECK (rfm_recency BETWEEN 1 AND 5),
  rfm_frequency integer CHECK (rfm_frequency BETWEEN 1 AND 5),
  rfm_monetary integer CHECK (rfm_monetary BETWEEN 1 AND 5),
  rfm_segment text,                       -- 'champions', 'loyal', 'at_risk', etc.

  -- Predictions
  predicted_ltv numeric(10,2),
  churn_risk numeric                      -- 0-1 probability
    CHECK (churn_risk BETWEEN 0 AND 1),
  next_purchase_probability numeric,

  -- Privacy
  gdpr_consent boolean DEFAULT false,
  gdpr_consent_date timestamptz,
  do_not_track boolean DEFAULT false,

  created_at timestamptz DEFAULT now(),
  updated_at timestamptz DEFAULT now()
);

-- No separate index on email: the UNIQUE constraint above already maintains
-- one, and a duplicate would only add write overhead.
CREATE INDEX idx_audience_profiles_lifecycle ON audience_profiles(lifecycle_stage);
CREATE INDEX idx_audience_profiles_wtp ON audience_profiles(wtp_score DESC);
CREATE INDEX idx_audience_profiles_engagement ON audience_profiles(engagement_score DESC);
CREATE INDEX idx_audience_profiles_last_seen ON audience_profiles(last_seen_at DESC);
CREATE INDEX idx_audience_profiles_interests ON audience_profiles USING GIN(interests);

Audience Events

Every user action logged for analysis.
-- Append-only log of user actions, range-partitioned by created_at (monthly).
--
-- PostgreSQL requires every PRIMARY KEY / UNIQUE constraint on a partitioned
-- table to include all partition key columns, so the key is (id, created_at)
-- rather than id alone; a primary key on id only is rejected at CREATE time.
CREATE TABLE audience_events (
  id uuid NOT NULL DEFAULT gen_random_uuid(),

  -- Profile reference
  profile_id uuid REFERENCES audience_profiles(id),
  anonymous_id text,                      -- Before identification

  -- Event details
  event_type text NOT NULL,               -- 'page_view', 'click', 'purchase', 'email_open', etc.
  event_name text,                        -- Specific event name

  -- Context
  page_url text,
  page_title text,
  referrer text,

  -- Properties
  properties jsonb DEFAULT '{}',          -- Event-specific data

  -- Content reference
  content_id uuid,                        -- If related to content
  product_id uuid,                        -- If related to product

  -- Source
  source text,                            -- 'website', 'email', 'app', 'api'
  campaign text,
  medium text,

  -- Device & location
  device_type text,
  browser text,
  os text,
  ip_address text,
  country text,
  city text,

  -- Session
  session_id text,
  session_sequence integer,               -- Order within session

  -- Value
  revenue numeric(10,2),

  -- Timestamp of the action itself
  occurred_at timestamptz DEFAULT now(),

  -- Partition key (monthly). NOT NULL because a row without a partition key
  -- value cannot be routed to any partition.
  -- NOTE: partition pruning applies to predicates on created_at only; filters
  -- on occurred_at alone still scan every partition.
  created_at timestamptz NOT NULL DEFAULT now(),

  PRIMARY KEY (id, created_at)
) PARTITION BY RANGE (created_at);

-- Create partitions (upper bound is exclusive)
CREATE TABLE audience_events_2025_01 PARTITION OF audience_events
  FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');
CREATE TABLE audience_events_2025_02 PARTITION OF audience_events
  FOR VALUES FROM ('2025-02-01') TO ('2025-03-01');
-- Continue for each month. Inserts whose created_at falls outside every
-- defined partition fail, so partitions must exist before they are needed.

-- Indexes declared on the parent are created on every partition.
CREATE INDEX idx_audience_events_profile ON audience_events(profile_id);
CREATE INDEX idx_audience_events_anonymous ON audience_events(anonymous_id);
CREATE INDEX idx_audience_events_type ON audience_events(event_type);
CREATE INDEX idx_audience_events_occurred ON audience_events(occurred_at DESC);
CREATE INDEX idx_audience_events_session ON audience_events(session_id);

Micro Segments

Dynamic audience groupings.
-- Segment definitions: one row per dynamic audience grouping. Which of the
-- rules / model / behavior columns is meaningful depends on segment_type.
CREATE TABLE micro_segments (
  id uuid DEFAULT gen_random_uuid(),

  -- Segment identity
  name text NOT NULL,
  slug text NOT NULL,
  description text,

  -- Type: 'rule_based', 'ml_based', 'behavioral', 'manual'
  segment_type text NOT NULL,

  -- Rule-based segments: list of {field, operator, value}, combined with
  -- rule_logic ('AND' or 'OR')
  rules jsonb DEFAULT '[]',
  rule_logic text DEFAULT 'AND',

  -- ML-based segments
  model_id text,
  model_threshold numeric,

  -- Behavioral segments
  behavior_definition jsonb DEFAULT '{}',

  -- Membership (cached count and when it was last computed)
  member_count integer DEFAULT 0,
  last_calculated_at timestamptz,

  -- Usage
  -- business_unit: 'publisher', 'platform', 'agency', 'store', 'all'
  business_unit text,
  -- use_cases: e.g. ['email_targeting', 'personalization', 'pricing']
  use_cases text[],

  -- Refresh
  refresh_frequency interval DEFAULT '1 hour',
  auto_refresh boolean DEFAULT true,

  -- Status
  status text DEFAULT 'active',

  created_at timestamptz DEFAULT now(),
  updated_at timestamptz DEFAULT now(),

  PRIMARY KEY (id),
  UNIQUE (slug)
);

CREATE INDEX idx_micro_segments_type
  ON micro_segments USING btree (segment_type);
CREATE INDEX idx_micro_segments_business_unit
  ON micro_segments USING btree (business_unit);
CREATE INDEX idx_micro_segments_status
  ON micro_segments USING btree (status);

Segment Membership

Profile ↔ Segment relationships.
-- Junction table linking profiles to segments; a profile can belong to a
-- given segment at most once.
CREATE TABLE segment_membership (
  id uuid PRIMARY KEY DEFAULT gen_random_uuid(),

  profile_id uuid REFERENCES audience_profiles(id) NOT NULL,
  segment_id uuid REFERENCES micro_segments(id) NOT NULL,

  -- Membership details
  score numeric,                          -- Confidence/fit score (0-1)
  joined_at timestamptz DEFAULT now(),
  expires_at timestamptz,                 -- For time-limited segments

  -- Source
  source text DEFAULT 'automatic',        -- 'automatic', 'manual', 'import'

  UNIQUE(profile_id, segment_id)
);

-- Lookups by profile_id are served by the UNIQUE(profile_id, segment_id)
-- index (profile_id is its leading column), so no separate single-column
-- index on profile_id is declared.
CREATE INDEX idx_segment_membership_segment ON segment_membership(segment_id);
CREATE INDEX idx_segment_membership_score ON segment_membership(score DESC);

WTP Scoring Rules

Willingness-to-pay calculation rules.
-- Configurable rules for the willingness-to-pay (WTP) score. Each rule names
-- a field, an operator and a comparison value, plus the points it contributes.
CREATE TABLE wtp_scoring_rules (
  id uuid DEFAULT gen_random_uuid(),

  -- Rule identification
  name text NOT NULL,
  description text,

  -- Rule definition
  -- rule_type: 'attribute', 'behavior', 'aggregate'
  rule_type text NOT NULL,
  -- field: the field to evaluate
  field text NOT NULL,
  -- operator: 'equals', 'greater_than', 'contains', 'exists'
  operator text NOT NULL,
  -- value: comparison value
  value jsonb,

  -- Scoring
  -- score_adjustment: points to add (positive) or subtract (negative)
  score_adjustment integer NOT NULL,
  -- max_contribution: cap on this rule's contribution
  max_contribution integer,

  -- Priority: lower values are evaluated first
  priority integer DEFAULT 100,

  -- Applicability: 'all', 'new', 'returning'
  applies_to text DEFAULT 'all',

  -- Status
  status text DEFAULT 'active',

  created_at timestamptz DEFAULT now(),
  updated_at timestamptz DEFAULT now(),

  PRIMARY KEY (id)
);

CREATE INDEX idx_wtp_rules_status
  ON wtp_scoring_rules USING btree (status);
CREATE INDEX idx_wtp_rules_priority
  ON wtp_scoring_rules USING btree (priority);

Cohort Definitions

Behavioral cohorts for analysis.
-- Definitions of behavioral cohorts used for analysis. Entry and exit rules
-- are stored as JSON.
CREATE TABLE cohort_definitions (
  id uuid DEFAULT gen_random_uuid(),

  -- Cohort identity
  name text NOT NULL,
  description text,

  -- Type: 'acquisition', 'behavioral', 'lifecycle'
  cohort_type text NOT NULL,

  -- Definition
  -- entry_criteria: rules for entering the cohort
  entry_criteria jsonb NOT NULL,
  -- exit_criteria: rules for leaving; NULL means membership is permanent
  exit_criteria jsonb,

  -- Time bounds
  -- cohort_period: 'day', 'week', 'month'
  cohort_period text,
  -- lookback_days: for behavioral cohorts
  lookback_days integer,

  -- Stats
  total_members integer DEFAULT 0,

  -- Status
  status text DEFAULT 'active',

  created_at timestamptz DEFAULT now(),
  updated_at timestamptz DEFAULT now(),

  PRIMARY KEY (id)
);

Predictions

ML predictions per profile.
-- Latest ML prediction per profile and prediction type (at most one row per
-- pair, enforced by the UNIQUE constraint).
CREATE TABLE predictions (
  id uuid PRIMARY KEY DEFAULT gen_random_uuid(),

  profile_id uuid REFERENCES audience_profiles(id) NOT NULL,

  -- Prediction type
  prediction_type text NOT NULL,          -- 'churn', 'ltv', 'next_purchase', 'upgrade'

  -- Prediction value
  predicted_value numeric,                -- The prediction (probability, amount, etc.)
  confidence numeric,                     -- Model confidence (0-1)

  -- Context
  model_version text,
  features_used jsonb DEFAULT '{}',       -- Key features that drove prediction

  -- Validity
  predicted_at timestamptz DEFAULT now(),
  valid_until timestamptz,

  UNIQUE(profile_id, prediction_type)
);

-- Lookups by profile_id use the UNIQUE(profile_id, prediction_type) index,
-- so no separate profile_id index is declared.
--
-- predicted_value holds different units per prediction_type (probabilities,
-- amounts, ...), so ranking by value is only meaningful within one type. The
-- composite index serves "top N for a type" and, through its leading column,
-- plain filters on prediction_type as well.
CREATE INDEX idx_predictions_value ON predictions(prediction_type, predicted_value DESC);

Identity Resolution

Track Events

// Track any user event.
//
// Resolves the owning profile (an explicit profile_id wins; otherwise the
// anonymous_id is looked up), stores the event in audience_events and, when
// the event belongs to a known profile, updates that profile's stats.
//
// Throws if the event row cannot be stored, so that profile stats are never
// incremented for an event that was not persisted.
async function trackEvent(event: {
  anonymous_id?: string
  profile_id?: string
  event_type: string
  event_name?: string
  properties?: Record<string, any>
  page_url?: string
  session_id?: string
}) {
  // Get profile, if this visitor is already known
  let profileId = event.profile_id

  if (!profileId && event.anonymous_id) {
    // Check if anonymous ID has been identified
    profileId = await getProfileByAnonymousId(event.anonymous_id)
  }

  // Store event. supabase-js reports failures in the returned `error` instead
  // of throwing, so it has to be checked explicitly.
  const { error } = await supabase
    .from('audience_events')
    .insert({
      profile_id: profileId,
      anonymous_id: event.anonymous_id,
      event_type: event.event_type,
      event_name: event.event_name,
      properties: event.properties,
      page_url: event.page_url,
      session_id: event.session_id,
      source: 'website',
      occurred_at: new Date().toISOString()
    })

  if (error) {
    throw new Error(`Failed to store audience event: ${error.message}`)
  }

  // Update profile stats if identified
  if (profileId) {
    await updateProfileStats(profileId, event)
  }
}

Identify User

// Link anonymous activity to known user.
//
// Finds the profile for traits.email (lower-cased) or creates it, attaches
// all still-unowned events recorded under anonymousId to that profile, then
// recalculates the profile's stats and scores.
//
// Returns the profile, or null when no email was supplied.
// Throws when a database call fails.
async function identifyUser(
  anonymousId: string,
  traits: {
    email?: string
    first_name?: string
    last_name?: string
    phone?: string
  }
) {
  // Email is the only identity key used for resolution
  if (!traits.email) return null

  const email = traits.email.toLowerCase()

  // maybeSingle() yields data = null (and no error) when nothing matches,
  // which lets real failures be told apart from "not found".
  const findByEmail = () =>
    supabase
      .from('audience_profiles')
      .select('*')
      .eq('email', email)
      .maybeSingle()

  const { data: existing, error: lookupError } = await findByEmail()
  if (lookupError) {
    throw new Error(`Failed to look up profile: ${lookupError.message}`)
  }

  let profile = existing

  if (!profile) {
    const { data: created, error: createError } = await supabase
      .from('audience_profiles')
      .insert({
        email,
        first_name: traits.first_name,
        last_name: traits.last_name,
        phone: traits.phone,
        lifecycle_stage: 'visitor',
        acquisition_date: new Date().toISOString()
      })
      .select()
      .single()

    if (createError?.code === '23505') {
      // unique_violation: a concurrent call created the same email between
      // our lookup and insert, so use the row that won the race.
      const { data: raced, error: retryError } = await findByEmail()
      if (retryError) {
        throw new Error(`Failed to look up profile: ${retryError.message}`)
      }
      profile = raced
    } else if (createError) {
      throw new Error(`Failed to create profile: ${createError.message}`)
    } else {
      profile = created
    }
  }

  if (!profile) return null

  // Link anonymous events to profile (only those not already owned)
  const { error: linkError } = await supabase
    .from('audience_events')
    .update({ profile_id: profile.id })
    .eq('anonymous_id', anonymousId)
    .is('profile_id', null)

  if (linkError) {
    throw new Error(`Failed to link anonymous events: ${linkError.message}`)
  }

  // Recalculate profile stats
  await recalculateProfileStats(profile.id)

  // Calculate initial scores
  await calculateWTPScore(profile.id)
  await calculateEngagementScore(profile.id)

  return profile
}

Merge Profiles

// Merge duplicate profiles.
//
// Folds `secondaryId` into `primaryId`: identity fields prefer the primary,
// counters are summed, interests are unioned, child rows (events, segment
// memberships, predictions) are re-pointed at the primary, and the secondary
// profile is deleted. Returns primaryId.
//
// Throws when either profile is missing, when both ids are the same, or when
// any database step fails.
//
// NOTE(review): the steps run as separate requests, not one transaction. A
// failure part-way leaves a partial merge; moving this into a single Postgres
// function would make it atomic.
async function mergeProfiles(primaryId: string, secondaryId: string) {
  // Without this guard the "secondary" delete below would remove the profile
  if (primaryId === secondaryId) {
    throw new Error('Cannot merge a profile into itself')
  }

  // Get both profiles
  const { data: profiles, error: loadError } = await supabase
    .from('audience_profiles')
    .select('*')
    .in('id', [primaryId, secondaryId])

  failMergeOnError('load profiles', loadError)

  // Rows come back in no guaranteed order, so match them by id rather than
  // by position.
  const primary = profiles?.find((p: any) => p.id === primaryId)
  const secondary = profiles?.find((p: any) => p.id === secondaryId)

  if (!primary || !secondary) {
    throw new Error('Profile not found')
  }

  // Counters may be NULL; treat missing values as 0 instead of producing NaN
  const sum = (a: unknown, b: unknown) => (Number(a) || 0) + (Number(b) || 0)

  // Compare as instants, and tolerate a missing value on either side
  const earliest = (a: string | null, b: string | null) => {
    if (!a) return b
    if (!b) return a
    return new Date(a).getTime() <= new Date(b).getTime() ? a : b
  }

  // Merge strategy: keep primary, augment with secondary
  const merged = {
    // Keep primary identity
    email: primary.email || secondary.email,
    phone: primary.phone || secondary.phone,
    first_name: primary.first_name || secondary.first_name,
    last_name: primary.last_name || secondary.last_name,

    // Merge external IDs (primary wins on key collisions)
    external_ids: {
      ...secondary.external_ids,
      ...primary.external_ids
    },

    // Use earliest dates
    first_seen_at: earliest(primary.first_seen_at, secondary.first_seen_at),
    acquisition_date: primary.acquisition_date || secondary.acquisition_date,

    // Sum engagement stats
    total_sessions: sum(primary.total_sessions, secondary.total_sessions),
    total_pageviews: sum(primary.total_pageviews, secondary.total_pageviews),
    total_events: sum(primary.total_events, secondary.total_events),
    total_orders: sum(primary.total_orders, secondary.total_orders),
    total_revenue: sum(primary.total_revenue, secondary.total_revenue),

    // Merge interests
    interests: [...new Set([
      ...(primary.interests || []),
      ...(secondary.interests || [])
    ])],

    // Recalculate later
    updated_at: new Date().toISOString()
  }

  // Move events
  const { error: eventsError } = await supabase
    .from('audience_events')
    .update({ profile_id: primaryId })
    .eq('profile_id', secondaryId)

  failMergeOnError('move events', eventsError)

  // Move segment memberships and predictions. Both tables allow only one row
  // per (profile, segment) / (profile, prediction_type), so rows the primary
  // already has are dropped from the secondary rather than moved.
  await moveProfileChildRows('segment_membership', 'segment_id', primaryId, secondaryId)
  await moveProfileChildRows('predictions', 'prediction_type', primaryId, secondaryId)

  // Delete secondary before writing the merged identity: email is UNIQUE, so
  // an email inherited from the secondary cannot be stored while the
  // secondary row still holds it.
  const { error: deleteError } = await supabase
    .from('audience_profiles')
    .delete()
    .eq('id', secondaryId)

  failMergeOnError('delete secondary profile', deleteError)

  // Update primary
  const { error: updateError } = await supabase
    .from('audience_profiles')
    .update(merged)
    .eq('id', primaryId)

  failMergeOnError('update primary profile', updateError)

  // Recalculate scores
  await recalculateProfileStats(primaryId)

  return primaryId
}

// Re-points the secondary profile's rows in `table` at the primary, first
// deleting those whose `uniqueColumn` value the primary already has.
async function moveProfileChildRows(
  table: string,
  uniqueColumn: string,
  primaryId: string,
  secondaryId: string
) {
  const { data: primaryRows, error: readError } = await supabase
    .from(table)
    .select(uniqueColumn)
    .eq('profile_id', primaryId)

  failMergeOnError(`read ${table}`, readError)

  const alreadyHeld = (primaryRows ?? []).map((row: any) => row[uniqueColumn])

  if (alreadyHeld.length > 0) {
    const { error: dropError } = await supabase
      .from(table)
      .delete()
      .eq('profile_id', secondaryId)
      .in(uniqueColumn, alreadyHeld)

    failMergeOnError(`drop duplicate ${table} rows`, dropError)
  }

  const { error: moveError } = await supabase
    .from(table)
    .update({ profile_id: primaryId })
    .eq('profile_id', secondaryId)

  failMergeOnError(`move ${table} rows`, moveError)
}

// Turns a supabase error result into an exception naming the failed step.
function failMergeOnError(step: string, error: { message: string } | null) {
  if (error) {
    throw new Error(`mergeProfiles: failed to ${step}: ${error.message}`)
  }
}

WTP Scoring

Calculate WTP Score

// Computes and stores the willingness-to-pay score for one profile.
//
// Starts from a base of 50, applies every active rule in priority order
// (each contribution optionally capped by the rule's max_contribution),
// clamps the total to 0-100 and writes it to audience_profiles.wtp_score.
//
// Returns { score, contributions } where contributions maps rule name to the
// points that rule added or removed; returns undefined when the profile does
// not exist. Throws when rules cannot be loaded or the score cannot be saved.
async function calculateWTPScore(profileId: string) {
  const { data: profile } = await supabase
    .from('audience_profiles')
    .select('*')
    .eq('id', profileId)
    .single()

  if (!profile) return

  const { data: rules, error: rulesError } = await supabase
    .from('wtp_scoring_rules')
    .select('*')
    .eq('status', 'active')
    .order('priority')

  if (rulesError) {
    throw new Error(`Failed to load WTP rules: ${rulesError.message}`)
  }

  let score = 50 // Base score
  const contributions: Record<string, number> = {}

  for (const rule of rules ?? []) {
    const contribution = evaluateWTPRule(rule, profile)

    if (contribution !== 0) {
      // Apply max contribution cap. Compare against null/undefined rather
      // than truthiness so that an explicit cap of 0 is honoured.
      const cappedContribution = rule.max_contribution != null
        ? Math.min(Math.abs(contribution), rule.max_contribution) * Math.sign(contribution)
        : contribution

      contributions[rule.name] = cappedContribution
      score += cappedContribution
    }
  }

  // Clamp to 0-100
  score = Math.max(0, Math.min(100, score))

  // Update profile
  const { error: saveError } = await supabase
    .from('audience_profiles')
    .update({
      wtp_score: score,
      updated_at: new Date().toISOString()
    })
    .eq('id', profileId)

  if (saveError) {
    throw new Error(`Failed to save WTP score: ${saveError.message}`)
  }

  return { score, contributions }
}

// Evaluates one WTP rule against a profile.
//
// Reads rule.field from the profile and compares it with rule.value using
// rule.operator. Returns rule.score_adjustment when the rule matches and 0
// otherwise (including for unknown operators).
function evaluateWTPRule(rule, profile) {
  const value = getNestedValue(profile, rule.field)
  const points = rule.score_adjustment

  // Ordering comparisons coerce null to 0 in JavaScript (null < 20 is true),
  // which would make a missing field satisfy 'less_than'. A missing value
  // never matches an ordering rule.
  const hasValue = value != null

  switch (rule.operator) {
    case 'equals':
      return value === rule.value ? points : 0
    case 'greater_than':
      return hasValue && value > rule.value ? points : 0
    case 'less_than':
      return hasValue && value < rule.value ? points : 0
    case 'contains':
      return Array.isArray(value) && value.includes(rule.value)
        ? points : 0
    case 'exists':
      return hasValue ? points : 0
    case 'range':
      // rule.value is expected to be { min, max }; a rule without bounds
      // cannot match.
      return hasValue && rule.value != null
        && value >= rule.value.min && value <= rule.value.max
        ? points : 0
    default:
      return 0
  }
}

Default WTP Rules

// Seed rules for the WTP score. Each entry maps onto a wtp_scoring_rules row.
//
// rule_type is included because that column is NOT NULL in the table, so a
// rule without it cannot be inserted. Every rule here reads a single profile
// column, hence 'attribute'.
// NOTE(review): the 'attribute' classification is an assumption; confirm
// against how 'attribute' / 'behavior' / 'aggregate' are meant to be used.
const defaultWTPRules = [
  // Positive signals
  {
    name: 'has_purchased',
    rule_type: 'attribute',
    field: 'total_orders',
    operator: 'greater_than',
    value: 0,
    score_adjustment: 20
  },
  {
    name: 'high_aov',
    rule_type: 'attribute',
    field: 'average_order_value',
    operator: 'greater_than',
    value: 100,
    score_adjustment: 15
  },
  {
    name: 'repeat_customer',
    rule_type: 'attribute',
    field: 'total_orders',
    operator: 'greater_than',
    value: 2,
    score_adjustment: 15
  },
  {
    name: 'email_engaged',
    rule_type: 'attribute',
    field: 'emails_clicked',
    operator: 'greater_than',
    value: 5,
    score_adjustment: 10
  },
  {
    name: 'high_engagement',
    rule_type: 'attribute',
    field: 'engagement_score',
    operator: 'greater_than',
    value: 70,
    score_adjustment: 10
  },
  {
    name: 'premium_interest',
    rule_type: 'attribute',
    field: 'interests',
    operator: 'contains',
    value: 'premium',
    score_adjustment: 10
  },

  // Negative signals
  {
    name: 'price_sensitive_browsing',
    rule_type: 'attribute',
    field: 'interests',
    operator: 'contains',
    value: 'discount',
    score_adjustment: -15
  },
  {
    name: 'low_engagement',
    rule_type: 'attribute',
    field: 'engagement_score',
    operator: 'less_than',
    value: 20,
    score_adjustment: -10
  },
  {
    name: 'churned_email',
    rule_type: 'attribute',
    field: 'email_subscribed',
    operator: 'equals',
    value: false,
    score_adjustment: -5
  }
]

Segmentation

Rule-Based Segments

// Recomputes the automatic membership of a rule-based segment.
//
// Loads the segment, turns its rules into filters on audience_profiles,
// replaces the segment's 'automatic' memberships with the matching profiles
// and stores the new member count. Does nothing for unknown or
// non-rule-based segments. Throws when a database call fails.
//
// NOTE(review): rule_logic is handed to applyRuleToQuery, which as written in
// this file ignores it and chains every filter, i.e. the rules are always
// ANDed. Segments with rule_logic 'OR' are therefore computed as AND.
// NOTE(review): the match query is not paginated; if the API enforces a
// maximum row count, large segments will be truncated - confirm.
async function calculateRuleBasedSegment(segmentId: string) {
  const { data: segment } = await supabase
    .from('micro_segments')
    .select('*')
    .eq('id', segmentId)
    .single()

  if (!segment || segment.segment_type !== 'rule_based') return

  // Build query from rules
  let query = supabase
    .from('audience_profiles')
    .select('id')

  for (const rule of segment.rules ?? []) {
    query = applyRuleToQuery(query, rule, segment.rule_logic)
  }

  const { data: matchingProfiles, error: matchError } = await query

  // Stop before touching memberships: continuing after a failed query would
  // delete every automatic member and record a count of 0.
  if (matchError) {
    throw new Error(`Failed to evaluate segment rules: ${matchError.message}`)
  }

  const matches = matchingProfiles ?? []

  // Clear existing memberships
  const { error: clearError } = await supabase
    .from('segment_membership')
    .delete()
    .eq('segment_id', segmentId)
    .eq('source', 'automatic')

  if (clearError) {
    throw new Error(`Failed to clear segment memberships: ${clearError.message}`)
  }

  // Add new memberships. Non-automatic rows (e.g. 'manual') were kept above;
  // a plain insert would hit UNIQUE(profile_id, segment_id) for those
  // profiles and reject the whole batch, so existing pairs are skipped.
  if (matches.length > 0) {
    const { error: addError } = await supabase
      .from('segment_membership')
      .upsert(
        matches.map(p => ({
          profile_id: p.id,
          segment_id: segmentId,
          score: 1,
          source: 'automatic'
        })),
        { onConflict: 'profile_id,segment_id', ignoreDuplicates: true }
      )

    if (addError) {
      throw new Error(`Failed to add segment memberships: ${addError.message}`)
    }
  }

  // Update segment stats
  const { error: statsError } = await supabase
    .from('micro_segments')
    .update({
      member_count: matches.length,
      last_calculated_at: new Date().toISOString()
    })
    .eq('id', segmentId)

  if (statsError) {
    throw new Error(`Failed to update segment stats: ${statsError.message}`)
  }
}

// Adds the filter described by one segment rule ({field, operator, value})
// to a query builder and returns the resulting builder. Unknown operators
// leave the query unchanged.
//
// For 'greater_than' / 'less_than', a value of the form "<N> days ago"
// (e.g. '90 days ago') is resolved to the ISO timestamp N days before now,
// because the database cannot parse that phrase as a timestamp.
//
// `logic` is accepted for interface compatibility but not used: filters are
// chained on the builder, which combines them with AND.
function applyRuleToQuery(query, rule, logic) {
  switch (rule.operator) {
    case 'equals':
      return query.eq(rule.field, rule.value)
    case 'not_equals':
      return query.neq(rule.field, rule.value)
    case 'greater_than':
      return query.gt(rule.field, resolveRelativeDate(rule.value))
    case 'less_than':
      return query.lt(rule.field, resolveRelativeDate(rule.value))
    case 'contains':
      return query.contains(rule.field, [rule.value])
    case 'is_null':
      return query.is(rule.field, null)
    case 'is_not_null':
      return query.not(rule.field, 'is', null)
    default:
      return query
  }
}

// Converts "<N> day(s) ago" into an ISO-8601 timestamp; any other value is
// returned untouched.
function resolveRelativeDate(value) {
  if (typeof value !== 'string') return value

  const match = /^(\d+)\s+days?\s+ago$/i.exec(value.trim())
  if (!match) return value

  const millisPerDay = 86400000
  return new Date(Date.now() - Number(match[1]) * millisPerDay).toISOString()
}

Pre-Built Segments

// Segment definitions shipped by default. Each entry maps onto a
// micro_segments row; `rules` uses the {field, operator, value} shape.
// Every segment states rule_logic explicitly so none of them depends on the
// column default.
const preBuiltSegments = [
  {
    name: 'High-Value Customers',
    slug: 'high-value-customers',
    segment_type: 'rule_based',
    rules: [
      { field: 'total_revenue', operator: 'greater_than', value: 500 },
      { field: 'total_orders', operator: 'greater_than', value: 2 }
    ],
    rule_logic: 'AND',
    use_cases: ['email_targeting', 'personalization']
  },
  {
    name: 'At-Risk Customers',
    slug: 'at-risk-customers',
    segment_type: 'rule_based',
    rules: [
      { field: 'total_orders', operator: 'greater_than', value: 0 },
      // NOTE(review): '90 days ago' is a relative phrase, not a timestamp.
      // Whatever evaluates this rule must turn it into a concrete date
      // before comparing it with last_order_at - confirm that it does.
      { field: 'last_order_at', operator: 'less_than', value: '90 days ago' },
      { field: 'health_score', operator: 'less_than', value: 40 }
    ],
    rule_logic: 'AND',
    use_cases: ['win_back', 'retention']
  },
  {
    name: 'Engaged Subscribers',
    slug: 'engaged-subscribers',
    segment_type: 'rule_based',
    rules: [
      { field: 'email_subscribed', operator: 'equals', value: true },
      { field: 'engagement_score', operator: 'greater_than', value: 60 }
    ],
    rule_logic: 'AND',
    use_cases: ['email_targeting', 'upsell']
  },
  {
    name: 'High WTP Non-Buyers',
    slug: 'high-wtp-non-buyers',
    segment_type: 'rule_based',
    rules: [
      { field: 'wtp_score', operator: 'greater_than', value: 70 },
      { field: 'total_orders', operator: 'equals', value: 0 }
    ],
    rule_logic: 'AND',
    use_cases: ['conversion', 'targeting']
  },
  {
    name: 'Champions',
    slug: 'champions',
    segment_type: 'rule_based',
    rules: [
      { field: 'rfm_segment', operator: 'equals', value: 'champions' }
    ],
    rule_logic: 'AND',
    use_cases: ['loyalty', 'referral', 'feedback']
  }
]

Behavioral Segments

// Recomputes the membership of a behavioral segment.
//
// Reads the segment's behavior_definition (event_type, min_count,
// lookback_days, optional properties), asks the find_profiles_with_behavior
// database function for the qualifying profiles and passes them to
// updateSegmentMemberships. Does nothing for unknown or non-behavioral
// segments. Throws when the database function call fails.
async function calculateBehavioralSegment(segmentId: string) {
  const { data: segment } = await supabase
    .from('micro_segments')
    .select('*')
    .eq('id', segmentId)
    .single()

  if (!segment || segment.segment_type !== 'behavioral') return

  // behavior_definition is nullable in the schema
  const def = segment.behavior_definition ?? {}

  // Query events for the lookback period
  const { data: qualifyingProfiles, error } = await supabase.rpc(
    'find_profiles_with_behavior',
    {
      event_type: def.event_type,
      event_count: def.min_count,
      lookback_days: def.lookback_days,
      event_properties: def.properties || {}
    }
  )

  // On failure `data` is null; handing that to the membership update could
  // be mistaken for "no profile qualifies", so fail loudly instead.
  if (error) {
    throw new Error(`Failed to find qualifying profiles: ${error.message}`)
  }

  // Update memberships
  await updateSegmentMemberships(segmentId, qualifyingProfiles ?? [])
}

// Supabase function
/*
-- Returns the identified profiles that produced at least event_count events
-- of the given event_type within the last lookback_days days, together with
-- the number of matching events. When event_properties is non-empty, only
-- events whose properties contain it are counted.
--
-- Parameters are written as find_profiles_with_behavior.<name> because
-- several share a name with a column or output column; in a SQL-language
-- function an unqualified name resolves to the column first.
CREATE OR REPLACE FUNCTION find_profiles_with_behavior(
  event_type text,
  event_count int,
  lookback_days int,
  event_properties jsonb DEFAULT '{}'
)
RETURNS TABLE (profile_id uuid, event_count bigint)
LANGUAGE sql STABLE
AS $$
  SELECT
    ev.profile_id,
    COUNT(*) AS event_count
  FROM audience_events AS ev
  WHERE ev.profile_id IS NOT NULL
    AND ev.event_type = find_profiles_with_behavior.event_type
    AND ev.occurred_at > now() - make_interval(days => find_profiles_with_behavior.lookback_days)
    AND (
      find_profiles_with_behavior.event_properties = '{}'
      OR ev.properties @> find_profiles_with_behavior.event_properties
    )
  GROUP BY ev.profile_id
  HAVING COUNT(*) >= find_profiles_with_behavior.event_count;
$$;
*/

RFM Analysis

Calculate RFM Scores

// Recomputes RFM (recency, frequency, monetary) scores for every profile
// with at least one order and stores the scores and the derived segment on
// the profile.
//
// Each dimension is scored 1-5 by quintile within the current customer
// population; recency is inverted so that fewer days since the last order
// gives a higher score. Throws when customers cannot be loaded or a score
// cannot be saved.
async function calculateRFMScores() {
  // Get customers with purchases
  const { data, error: loadError } = await supabase
    .from('audience_profiles')
    .select('id, last_order_at, total_orders, total_revenue')
    .gt('total_orders', 0)

  if (loadError) {
    throw new Error(`Failed to load customers for RFM scoring: ${loadError.message}`)
  }

  const customers = data ?? []
  if (customers.length === 0) return

  // Compute recency once per customer so the value ranked below is exactly
  // the value present in the sorted population.
  const recencyDaysById = new Map<string, number>(
    customers.map(c => [c.id, daysSince(c.last_order_at)])
  )

  // Sorted populations used for ranking
  const ascending = (a: number, b: number) => a - b
  const recencyValues = [...recencyDaysById.values()].sort(ascending)
  const frequencyValues = customers.map(c => c.total_orders).sort(ascending)
  const monetaryValues = customers.map(c => c.total_revenue).sort(ascending)

  for (const customer of customers) {
    const recencyDays = recencyDaysById.get(customer.id)

    // Score 1-5 (5 is best)
    // For recency, lower days = higher score
    const rScore = 6 - getQuintile(recencyDays, recencyValues)
    const fScore = getQuintile(customer.total_orders, frequencyValues)
    const mScore = getQuintile(customer.total_revenue, monetaryValues)

    // Determine segment
    const segment = getRFMSegment(rScore, fScore, mScore)

    const { error: saveError } = await supabase
      .from('audience_profiles')
      .update({
        rfm_recency: rScore,
        rfm_frequency: fScore,
        rfm_monetary: mScore,
        rfm_segment: segment
      })
      .eq('id', customer.id)

    if (saveError) {
      throw new Error(`Failed to save RFM scores for ${customer.id}: ${saveError.message}`)
    }
  }
}

// Returns the quintile (1-5) of `value` within `sortedValues`, which must be
// sorted ascending. 1 is the lowest fifth, 5 the highest.
//
// The rank is the 1-based position of the first entry >= value, so tied
// values share the lowest applicable quintile and the largest entry lands in
// quintile 5. A value above every entry also returns 5. An empty population
// returns 1.
function getQuintile(value: number, sortedValues: number[]) {
  const total = sortedValues.length
  if (total === 0) return 1

  const index = sortedValues.findIndex(v => v >= value)

  // findIndex yields -1 when value exceeds every entry: rank it at the top
  const rank = index === -1 ? total : index + 1

  // rank/total in (0, 0.2] -> 1, (0.2, 0.4] -> 2, ... (0.8, 1] -> 5
  return Math.min(5, Math.max(1, Math.ceil((rank * 5) / total)))
}

// Maps recency / frequency / monetary scores (each 1-5, 5 is best) to a named
// RFM segment. Conditions are checked top to bottom and the first match
// wins, so their order is significant.
function getRFMSegment(r: number, f: number, m: number) {
  // Champions: Recent, frequent, high spend
  if (r >= 4 && f >= 4 && m >= 4) return 'champions'

  // Loyal: Frequent buyers
  if (f >= 4) return 'loyal'

  // Potential Loyalists: Recent with good frequency
  if (r >= 4 && f >= 2) return 'potential_loyalist'

  // Recent: Just bought, first time or low frequency
  if (r >= 4) return 'recent'

  // At Risk: Used to buy frequently, haven't recently
  if (r <= 2 && f >= 3) return 'at_risk'

  // Hibernating: Long time no purchase, used to be good
  if (r <= 2 && f >= 2 && m >= 2) return 'hibernating'

  // Lost: Worst scores
  if (r <= 2 && f <= 2) return 'lost'

  // Default (reached when r is 3 and f is at most 3)
  return 'needs_attention'
}

Engagement Scoring

// Computes the 0-100 engagement score for a profile from three parts and
// stores it on the profile:
//   - recency:   up to 30 points, based on days since last_seen_at
//   - frequency: 3 points per distinct calendar day with an event in the
//                last 30 days, capped at 30
//   - depth:     fixed points for each engagement type seen in that window
// Returns the score, or undefined when the profile does not exist.
async function calculateEngagementScore(profileId: string) {
  const profileLookup = await supabase
    .from('audience_profiles')
    .select('*')
    .eq('id', profileId)
    .single()

  const profile = profileLookup.data
  if (!profile) return

  // Events from the trailing 30 days
  const windowStart = new Date(Date.now() - 30 * 86400000).toISOString()
  const eventLookup = await supabase
    .from('audience_events')
    .select('event_type, occurred_at')
    .eq('profile_id', profileId)
    .gte('occurred_at', windowStart)

  const events = eventLookup.data ?? []

  // Recency: first tier whose day limit covers the profile wins
  const recencyTiers = [
    { withinDays: 7, points: 30 },
    { withinDays: 14, points: 20 },
    { withinDays: 30, points: 10 }
  ]
  const idleDays = daysSince(profile.last_seen_at)
  const recencyTier = recencyTiers.find(tier => idleDays <= tier.withinDays)
  const recencyPoints = recencyTier ? recencyTier.points : 0

  // Frequency: distinct dates (the part of occurred_at before 'T')
  const activeDays = new Set<string>()
  for (const e of events) {
    activeDays.add(e.occurred_at.split('T')[0])
  }
  const frequencyPoints = Math.min(30, activeDays.size * 3)

  // Depth: points per engagement type present
  const depthWeights: Record<string, number> = {
    purchase: 15,
    email_click: 10,
    article_read: 5,
    video_watch: 5,
    download: 5
  }
  const seenTypes = new Set(events.map(e => e.event_type))
  let depthPoints = 0
  for (const [eventType, points] of Object.entries(depthWeights)) {
    if (seenTypes.has(eventType)) depthPoints += points
  }

  // Cap at 100
  const score = Math.min(100, recencyPoints + frequencyPoints + depthPoints)

  await supabase
    .from('audience_profiles')
    .update({
      engagement_score: score,
      updated_at: new Date().toISOString()
    })
    .eq('id', profileId)

  return score
}

Queries

Profile Summary by Lifecycle Stage

-- One row per lifecycle stage: profile count, average scores and revenue,
-- and number of email subscribers. Stages are listed from most to least
-- advanced; any stage not named in the list (including 'anonymous') sorts
-- last.
SELECT
  lifecycle_stage,
  COUNT(*) AS profile_count,
  ROUND(AVG(engagement_score), 1) AS avg_engagement,
  ROUND(AVG(wtp_score), 1) AS avg_wtp,
  ROUND(AVG(total_revenue), 2) AS avg_revenue,
  COUNT(*) FILTER (WHERE email_subscribed) AS email_subscribers
FROM audience_profiles
GROUP BY lifecycle_stage
ORDER BY
  COALESCE(
    array_position(
      ARRAY['advocate', 'customer', 'subscriber', 'visitor'],
      lifecycle_stage
    ),
    5
  );

Segment Performance

-- Per active segment: the cached member_count plus averages computed over
-- the profiles currently linked through segment_membership. Segments with no
-- membership rows do not appear (inner joins).
SELECT
  seg.name AS segment,
  seg.member_count,
  ROUND(AVG(prof.wtp_score), 1) AS avg_wtp,
  ROUND(AVG(prof.total_revenue), 2) AS avg_revenue,
  ROUND(AVG(prof.engagement_score), 1) AS avg_engagement
FROM micro_segments AS seg
INNER JOIN segment_membership AS mem
  ON mem.segment_id = seg.id
INNER JOIN audience_profiles AS prof
  ON prof.id = mem.profile_id
WHERE seg.status = 'active'
GROUP BY
  seg.id,
  seg.name,
  seg.member_count
ORDER BY avg_revenue DESC;

WTP Distribution

-- Profiles bucketed into WTP tiers, with average revenue and number of
-- customers (profiles with at least one order) per tier, highest tier first.
--
-- wtp_score is nullable: NULL scores get their own 'Unscored' tier instead of
-- falling through to 'Very Low', and that tier is listed last.
WITH tiered AS (
  SELECT
    CASE
      WHEN wtp_score IS NULL THEN 'Unscored'
      WHEN wtp_score >= 80 THEN 'Very High (80-100)'
      WHEN wtp_score >= 60 THEN 'High (60-79)'
      WHEN wtp_score >= 40 THEN 'Medium (40-59)'
      WHEN wtp_score >= 20 THEN 'Low (20-39)'
      ELSE 'Very Low (0-19)'
    END AS wtp_tier,
    wtp_score,
    total_revenue,
    total_orders
  FROM audience_profiles
)
SELECT
  wtp_tier,
  COUNT(*) AS profile_count,
  ROUND(AVG(total_revenue), 2) AS avg_revenue,
  COUNT(*) FILTER (WHERE total_orders > 0) AS customers
FROM tiered
GROUP BY wtp_tier
ORDER BY MIN(wtp_score) DESC NULLS LAST;

RFM Segment Analysis

-- Per RFM segment: customer count, average revenue and orders, and average
-- days since the last order, highest average revenue first.
SELECT
  rfm_segment,
  COUNT(*) AS customer_count,
  ROUND(AVG(total_revenue), 2) AS avg_revenue,
  ROUND(AVG(total_orders), 1) AS avg_orders,
  -- Cast to numeric: ROUND(value, digits) is defined for numeric only, and
  -- EXTRACT returns double precision on PostgreSQL versions before 14.
  ROUND(
    AVG(EXTRACT(EPOCH FROM (now() - last_order_at)) / 86400)::numeric,
    0
  ) AS avg_days_since_order
FROM audience_profiles
WHERE rfm_segment IS NOT NULL
GROUP BY rfm_segment
ORDER BY avg_revenue DESC NULLS LAST;

| Document | Purpose |
| --- | --- |
| content-intelligence.md | Content analysis |
| generation-engine.md | Content generation |
| ../store/shopify-sync.md | Customer data from Shopify |
| SCHEMA.md | Full table definitions |