Architecture

Overview

The Priority Pipeline is a monthly LinkedIn profile scraping system that uses BrightData to scrape priority profiles (executives, directors, and other key decision-makers). It is built on a webhook-based architecture of Cloud Run services and jobs.

Pipeline Flow

1. Coordinator Job (Cloud Run Job - Monthly Schedule)
   ↓
2. BigQuery Query
   - Fetch priority profiles from PersonFields (isPriority=TRUE)
   - Join with PersonIdentifier for LinkedIn usernames
   - Exclude profiles scraped in last 25 days
   ↓
3. Create Batches (20 profiles each)
   ↓
4. Publish to Pub/Sub (with rate limiting)
   ↓
5. API Service receives Pub/Sub push
   ↓
6. Trigger BrightData scraping with webhook URL
   ↓
7. BrightData scrapes profiles (asynchronously)
   ↓
8. BrightData POSTs data to Webhook Service
   ↓
9. Webhook Service:
   - Inserts to LinkedinApiCall table
   - Backs up to GCS bucket

Components

Cloud Run Services

Service                      Purpose                                         Trigger
priority-pipeline-api        Receives Pub/Sub batches, triggers BrightData   Pub/Sub push
priority-pipeline-webhook    Receives scraped data from BrightData           HTTP webhook

Cloud Run Jobs

Job                            Purpose                                               Schedule
priority-pipeline-coordinator  Query profiles, create batches, publish to Pub/Sub    Monthly (5th at 00:00 UTC)

Data Storage

BigQuery Tables:
  • linkedin.LinkedinApiCall - Stores scraped profile data
  • credentity.PersonFields - Source for priority profiles
  • credmodel_google.PersonIdentifier - LinkedIn usernames

GCS Bucket:
  • brightdata-monthly-priority-people - Backup storage

Pub/Sub Topic:
  • linkedin-scraping-batches - Batch distribution

Data Flow Details

1. Coordinator Job (Monthly)

Schedule: 5th of each month at 00:00 UTC

Process:
  1. Queries BigQuery for priority profiles
  2. Filters out profiles scraped in the last 25 days
  3. Creates batches of 20 profiles each
  4. Publishes batches to Pub/Sub with rate limiting (sketched after the query below):
     • Small delay after every 100 batches (5 seconds)
     • Large delay after every 1000 batches (5 minutes)

SQL Query (from config.py):

SELECT DISTINCT
  pi.identifierValue AS linkedin_username,
  pf.personId
FROM `credentity.PersonFields` pf
JOIN `credmodel_google.PersonIdentifier` pi 
  ON pf.personId = pi.personId
WHERE pf.isPriority = TRUE
  AND pi.identifierType = 'LINKEDIN'
  AND pi.identifierValue IS NOT NULL
  AND NOT EXISTS (
    SELECT 1 
    FROM `linkedin.LinkedinApiCall` lac
    WHERE lac.requestResource = pi.identifierValue
      AND lac.requestType = 'BRIGHTDATA_API_PERSON'
      AND lac.requestDate > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 25 DAY)
  )
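
The batching and rate-limited publishing step can be sketched as follows. This is a minimal illustration assuming google-cloud-pubsub; the function name publish_batches and the batch payload shape are illustrative and do not come from jobs/coordinator.py, while the constants mirror the config values listed later in this document.

import json
import time

from google.cloud import pubsub_v1

BATCH_SIZE = 20
BATCHES_PER_SMALL_GROUP = 100   # pause 5 seconds after every 100 batches
BATCHES_PER_LARGE_GROUP = 1000  # pause 5 minutes after every 1000 batches


def publish_batches(profiles, project_id, topic="linkedin-scraping-batches"):
    """Split profiles into batches of 20 and publish each batch to Pub/Sub,
    pausing periodically so downstream services are not flooded."""
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic)

    batches = [profiles[i:i + BATCH_SIZE] for i in range(0, len(profiles), BATCH_SIZE)]
    for n, batch in enumerate(batches, start=1):
        data = json.dumps({"profiles": batch}).encode("utf-8")
        publisher.publish(topic_path, data=data).result()

        if n % BATCHES_PER_LARGE_GROUP == 0:
            time.sleep(300)  # large delay: 5 minutes
        elif n % BATCHES_PER_SMALL_GROUP == 0:
            time.sleep(5)    # small delay: 5 seconds
    return len(batches)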

2. API Service

Trigger: Pub/Sub push subscription

Process:
  1. Receives batch from Pub/Sub
  2. Decodes base64 message
  3. For each profile in the batch:
     • Calls BrightData API
     • Passes webhook URL for result delivery
  4. Returns success/failure status

Configuration:
  • Concurrency: 1 (processes one batch at a time)
  • Max instances: 1000
  • Timeout: 3600 seconds
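
As a rough illustration of the push handling, a handler for the /batch-trigger/ endpoint might look like the sketch below. It assumes FastAPI (suggested by the api/routers/ layout, not confirmed), the standard Pub/Sub push envelope, and the illustrative batch payload shape from the coordinator sketch above; trigger_scrape is a stand-in for the BrightData client in brightdata/person_scraper.py.

import base64
import json

from fastapi import FastAPI, Request, Response

app = FastAPI()


def trigger_scrape(linkedin_username: str) -> None:
    # Stand-in for the BrightData client in brightdata/person_scraper.py,
    # which passes the webhook URL along with the scrape request.
    print(f"triggering BrightData scrape for {linkedin_username}")


@app.post("/batch-trigger/")
async def batch_trigger(request: Request) -> Response:
    """Pub/Sub push handler: decode the base64 envelope, trigger one scrape per profile."""
    envelope = await request.json()
    raw = base64.b64decode(envelope["message"]["data"]).decode("utf-8")
    batch = json.loads(raw)

    for profile in batch.get("profiles", []):
        trigger_scrape(profile["linkedin_username"])

    # A 2xx response acknowledges the Pub/Sub message; anything else causes redelivery.
    return Response(status_code=204)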

3. BrightData Scraping

Service: BrightData API

Process:
  1. Receives scraping request from API service
  2. Scrapes LinkedIn profile
  3. POSTs result to webhook URL

Typical Duration: 5-30 minutes per profile
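
The exact call lives in brightdata/person_scraper.py; as a hedged illustration, triggering a scrape with webhook delivery typically looks roughly like the sketch below. The endpoint and parameter names follow BrightData's publicly documented dataset trigger API and may differ from what the repo actually uses; the token, dataset ID, and webhook URL are placeholders.

import requests

BRIGHTDATA_TOKEN = "..."        # placeholder; the real token comes from config/secrets
DATASET_ID = "gd_xxxxxxxxxxxx"  # placeholder dataset ID for LinkedIn people profiles
WEBHOOK_URL = "https://<webhook-service-url>/webhook"  # placeholder


def trigger_scrape(linkedin_username: str) -> str:
    """Ask BrightData to scrape one profile and deliver the result to our webhook."""
    resp = requests.post(
        "https://api.brightdata.com/datasets/v3/trigger",
        headers={"Authorization": f"Bearer {BRIGHTDATA_TOKEN}"},
        params={
            "dataset_id": DATASET_ID,
            "endpoint": WEBHOOK_URL,  # BrightData POSTs the result here when done
            "format": "json",
        },
        json=[{"url": f"https://www.linkedin.com/in/{linkedin_username}/"}],
        timeout=30,
    )
    resp.raise_for_status()
    return resp.json().get("snapshot_id", "")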

4. Webhook Service

Trigger: HTTP POST from BrightData

Process:
  1. Receives scraped profile data
  2. Validates data format
  3. Writes to BigQuery LinkedinApiCall table
  4. Backs up to GCS bucket
  5. Returns success status

Configuration:
  • Concurrency: 1
  • Max instances: 5000
  • Timeout: 3600 seconds
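
A minimal sketch of the webhook handler's persistence steps is shown below, assuming FastAPI plus the google-cloud-bigquery and google-cloud-storage clients; the endpoint path, payload handling, and BigQuery column values are illustrative rather than copied from webhook/handler.py.

import json
from datetime import datetime, timezone

from fastapi import FastAPI, Request
from google.cloud import bigquery, storage

app = FastAPI()
BQ_TABLE = "linkedin.LinkedinApiCall"
BACKUP_BUCKET = "brightdata-monthly-priority-people"


@app.post("/webhook")
async def receive_results(request: Request):
    """Persist a BrightData result: one row in BigQuery, one backup object in GCS."""
    payload = await request.json()
    now = datetime.now(timezone.utc)

    # 1. Insert into the LinkedinApiCall table (column values here are illustrative;
    #    the real handler also sets requestResource from the scraped profile).
    bq = bigquery.Client()
    errors = bq.insert_rows_json(BQ_TABLE, [{
        "requestType": "BRIGHTDATA_API_PERSON",
        "requestDate": now.isoformat(),
        "response": json.dumps(payload),
    }])
    if errors:
        raise RuntimeError(f"BigQuery insert failed: {errors}")

    # 2. Back up the raw payload to GCS.
    blob_name = f"{now:%Y/%m/%d}/{now:%H%M%S%f}.json"
    storage.Client().bucket(BACKUP_BUCKET).blob(blob_name).upload_from_string(
        json.dumps(payload), content_type="application/json"
    )
    return {"status": "ok"}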

Configuration Settings

From config.py:

Setting                  Value  Description
batch_size               20     Profiles per batch
refresh_window_days      25     Skip profiles scraped within this window
batches_per_small_group  100    Batches before a 5-second delay
batches_per_large_group  1000   Batches before a 5-minute delay
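
These settings map naturally to module-level configuration; a minimal sketch of how config.py might expose them is shown below (the setting names match the table above, the dataclass wrapper itself is illustrative).

from dataclasses import dataclass


@dataclass(frozen=True)
class PipelineConfig:
    batch_size: int = 20                 # profiles per Pub/Sub batch
    refresh_window_days: int = 25        # skip profiles scraped within this window
    batches_per_small_group: int = 100   # batches before a 5-second delay
    batches_per_large_group: int = 1000  # batches before a 5-minute delay


CONFIG = PipelineConfig()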

Error Handling

Coordinator Job Failures

  • Job has max-retries=0 (no automatic retries)
  • Manual re-execution is required on failure
  • Logs available in Cloud Logging

API Service Failures

  • Pub/Sub retries automatically
  • Failed batches are redelivered
  • Messages go to a dead-letter queue after max retries

Webhook Failures

  • BrightData retries webhook delivery
  • Data preserved in BrightData dashboard
  • Manual recovery possible from BrightData

File Structure

src/priority_pipeline/
  api_main.py              # API service entry point
  webhook_main.py          # Webhook service entry point
  config.py                # Configuration and SQL queries

  api/
    routers/
      batch_trigger.py     # POST /batch-trigger/ endpoint

  bigquery/
    linkedin_api_call.py   # LinkedinApiCall table operations
    queries.py             # SQL queries

  brightdata/
    person_scraper.py      # BrightData API client

  gcs/
    backup.py              # GCS backup operations

  jobs/
    coordinator.py         # Coordinator job (monthly)

  webhook/
    handler.py             # Webhook data processing