Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 14 additions & 42 deletions nodejs/src/ingestion/cookieless/cookieless-manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { createTestEventHeaders } from '~/tests/helpers/event-headers'
import { createOrganization, createTeam, getTeam } from '~/tests/helpers/sql'

import { cookielessRedisErrorCounter } from '../../common/metrics'
import { CookielessServerHashMode, Hub, PipelineEvent, Team } from '../../types'
import { CookielessServerHashMode, EventHeaders, Hub, PipelineEvent, Team } from '../../types'
import { RedisOperationError } from '../../utils/db/error'
import { closeHub, createHub } from '../../utils/db/hub'
import { PostgresUse } from '../../utils/db/postgres'
Expand Down Expand Up @@ -334,13 +334,7 @@ describe('CookielessManager', () => {

async function processEvent(
event: PipelineEvent,
headers: {
token?: string
distinct_id?: string
timestamp?: string
force_disable_person_processing: boolean
historical_migration: boolean
} = createTestEventHeaders()
headers: EventHeaders = createTestEventHeaders()
): Promise<PipelineEvent | undefined> {
const response = await hub.cookielessManager.doBatch([{ event, team, message, headers }])
expect(response.length).toBe(1)
Expand All @@ -350,22 +344,10 @@ describe('CookielessManager', () => {

async function processEventWithHeaders(
event: PipelineEvent,
headers: {
token?: string
distinct_id?: string
timestamp?: string
force_disable_person_processing: boolean
historical_migration: boolean
}
headers: EventHeaders
): Promise<{
event: PipelineEvent | undefined
headers: {
token?: string
distinct_id?: string
timestamp?: string
force_disable_person_processing: boolean
historical_migration: boolean
}
headers: EventHeaders
}> {
const response = await hub.cookielessManager.doBatch([{ event, team, message, headers }])
expect(response.length).toBe(1)
Expand Down Expand Up @@ -518,13 +500,11 @@ describe('CookielessManager', () => {
})

it('should preserve headers through cookieless processing', async () => {
const testHeaders = {
const testHeaders = createTestEventHeaders({
token: 'test-token',
distinct_id: 'test-distinct-id',
timestamp: '1234567890',
force_disable_person_processing: false,
historical_migration: false,
}
})

const result = await processEventWithHeaders(event, testHeaders)

Expand All @@ -533,13 +513,11 @@ describe('CookielessManager', () => {
})

it('should preserve headers for non-cookieless events', async () => {
const testHeaders = {
const testHeaders = createTestEventHeaders({
token: 'test-token',
distinct_id: 'test-distinct-id',
timestamp: '1234567890',
force_disable_person_processing: false,
historical_migration: false,
}
})

const result = await processEventWithHeaders(nonCookielessEvent, testHeaders)

Expand All @@ -548,13 +526,11 @@ describe('CookielessManager', () => {
})

it('should not return dropped events but should not throw', async () => {
const testHeaders = {
const testHeaders = createTestEventHeaders({
token: 'test-token',
distinct_id: 'test-distinct-id',
timestamp: '1234567890',
force_disable_person_processing: false,
historical_migration: false,
}
})

// Test with alias event which should be dropped
const result = await processEventWithHeaders(aliasEvent, testHeaders)
Expand Down Expand Up @@ -902,13 +878,11 @@ describe('CookielessManager', () => {
expect(actual1).toBe(nonCookielessEvent)
})
it('should not return dropped cookieless events but should not throw', async () => {
const testHeaders = {
const testHeaders = createTestEventHeaders({
token: 'test-token',
distinct_id: 'test-distinct-id',
timestamp: '1234567890',
force_disable_person_processing: false,
historical_migration: false,
}
})

const result = await processEventWithHeaders(event, testHeaders)

Expand All @@ -917,13 +891,11 @@ describe('CookielessManager', () => {
expect(result.headers).toEqual(createTestEventHeaders())
})
it('should preserve headers when passing through non-cookieless events', async () => {
const testHeaders = {
const testHeaders = createTestEventHeaders({
token: 'test-token',
distinct_id: 'test-distinct-id',
timestamp: '1234567890',
force_disable_person_processing: false,
historical_migration: false,
}
})

const result = await processEventWithHeaders(nonCookielessEvent, testHeaders)

Expand Down
42 changes: 21 additions & 21 deletions nodejs/src/ingestion/error-tracking/prepare-event-step.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { DateTime } from 'luxon'

import { createTestEventHeaders } from '~/tests/helpers/event-headers'
import { createTestPluginEvent } from '~/tests/helpers/plugin-event'
import { createTestTeam } from '~/tests/helpers/team'
import { EventHeaders, Person } from '~/types'
import { Person } from '~/types'

import { PipelineResultType, isOkResult } from '../pipelines/results'
import { createErrorTrackingPrepareEventStep } from './prepare-event-step'
Expand All @@ -12,12 +13,6 @@ describe('createErrorTrackingPrepareEventStep', () => {

const team = createTestTeam({ id: 123, project_id: 456 as any })

const createTestHeaders = (overrides: Partial<EventHeaders> = {}): EventHeaders => ({
force_disable_person_processing: false,
historical_migration: false,
...overrides,
})

const createTestPerson = (overrides: Partial<Person> = {}): Person => ({
team_id: 123,
uuid: 'person-uuid-123',
Expand All @@ -40,7 +35,7 @@ describe('createErrorTrackingPrepareEventStep', () => {
})
const person = createTestPerson()

const result = await step({ event, team, person, headers: createTestHeaders() })
const result = await step({ event, team, person, headers: createTestEventHeaders() })

expect(result.type).toBe(PipelineResultType.OK)
if (isOkResult(result)) {
Expand All @@ -60,7 +55,7 @@ describe('createErrorTrackingPrepareEventStep', () => {
const event = createTestPluginEvent({ event: '$exception' })
const person = createTestPerson({ uuid: 'existing-person-uuid' })

const result = await step({ event, team, person, headers: createTestHeaders() })
const result = await step({ event, team, person, headers: createTestEventHeaders() })

expect(result.type).toBe(PipelineResultType.OK)
if (isOkResult(result)) {
Expand All @@ -73,7 +68,7 @@ describe('createErrorTrackingPrepareEventStep', () => {
it('returns undefined person when person is null', async () => {
const event = createTestPluginEvent({ event: '$exception' })

const result = await step({ event, team, person: null, headers: createTestHeaders() })
const result = await step({ event, team, person: null, headers: createTestEventHeaders() })

expect(result.type).toBe(PipelineResultType.OK)
if (isOkResult(result)) {
Expand All @@ -85,11 +80,16 @@ describe('createErrorTrackingPrepareEventStep', () => {
const event = createTestPluginEvent({ event: '$exception' })

// With person
const resultWithPerson = await step({ event, team, person: createTestPerson(), headers: createTestHeaders() })
const resultWithPerson = await step({
event,
team,
person: createTestPerson(),
headers: createTestEventHeaders(),
})
expect(isOkResult(resultWithPerson) && resultWithPerson.value.processPerson).toBe(true)

// Without person
const resultWithoutPerson = await step({ event, team, person: null, headers: createTestHeaders() })
const resultWithoutPerson = await step({ event, team, person: null, headers: createTestEventHeaders() })
expect(isOkResult(resultWithoutPerson) && resultWithoutPerson.value.processPerson).toBe(true)
})

Expand All @@ -100,7 +100,7 @@ describe('createErrorTrackingPrepareEventStep', () => {
event,
team,
person: null,
headers: createTestHeaders({ historical_migration: true }),
headers: createTestEventHeaders({ historical_migration: true }),
})

expect(result.type).toBe(PipelineResultType.OK)
Expand All @@ -112,7 +112,7 @@ describe('createErrorTrackingPrepareEventStep', () => {
it('defaults historical_migration to false when not in headers', async () => {
const event = createTestPluginEvent({ event: '$exception' })

const result = await step({ event, team, person: null, headers: createTestHeaders() })
const result = await step({ event, team, person: null, headers: createTestEventHeaders() })

expect(result.type).toBe(PipelineResultType.OK)
if (isOkResult(result)) {
Expand All @@ -128,7 +128,7 @@ describe('createErrorTrackingPrepareEventStep', () => {
timestamp: '2024-01-20T12:00:00.000Z',
})

const result = await step({ event, team, person: null, headers: createTestHeaders() })
const result = await step({ event, team, person: null, headers: createTestEventHeaders() })

expect(result.type).toBe(PipelineResultType.OK)
if (isOkResult(result)) {
Expand All @@ -142,7 +142,7 @@ describe('createErrorTrackingPrepareEventStep', () => {
properties: null as any,
})

const result = await step({ event, team, person: null, headers: createTestHeaders() })
const result = await step({ event, team, person: null, headers: createTestEventHeaders() })

expect(result.type).toBe(PipelineResultType.OK)
if (isOkResult(result)) {
Expand All @@ -164,7 +164,7 @@ describe('createErrorTrackingPrepareEventStep', () => {
},
})

const result = await step({ event, team, person: null, headers: createTestHeaders() })
const result = await step({ event, team, person: null, headers: createTestEventHeaders() })

expect(result.type).toBe(PipelineResultType.OK)
if (isOkResult(result)) {
Expand All @@ -186,7 +186,7 @@ describe('createErrorTrackingPrepareEventStep', () => {
},
})

const result = await step({ event, team, person: null, headers: createTestHeaders() })
const result = await step({ event, team, person: null, headers: createTestEventHeaders() })

expect(result.type).toBe(PipelineResultType.OK)
if (isOkResult(result)) {
Expand All @@ -208,7 +208,7 @@ describe('createErrorTrackingPrepareEventStep', () => {
},
})

const result = await step({ event, team, person: null, headers: createTestHeaders() })
const result = await step({ event, team, person: null, headers: createTestEventHeaders() })

expect(result.type).toBe(PipelineResultType.OK)
if (isOkResult(result)) {
Expand All @@ -228,7 +228,7 @@ describe('createErrorTrackingPrepareEventStep', () => {
event,
team,
person: null,
headers: createTestHeaders(),
headers: createTestEventHeaders(),
message: { topic: 'test-topic', partition: 0 } as any,
customField: 'should-be-preserved',
}
Expand All @@ -250,7 +250,7 @@ describe('createErrorTrackingPrepareEventStep', () => {
it('removes event from output but preserves team and adds preparedEvent', async () => {
const event = createTestPluginEvent({ event: '$exception' })

const result = await step({ event, team, person: null, headers: createTestHeaders() })
const result = await step({ event, team, person: null, headers: createTestEventHeaders() })

expect(result.type).toBe(PipelineResultType.OK)
if (isOkResult(result)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
import { createTestEventHeaders } from '../../../tests/helpers/event-headers'
import { createTestPipelineEvent } from '../../../tests/helpers/pipeline-event'
import { PipelineResultType } from '../pipelines/results'
import { OverflowRedirectService } from '../utils/overflow-redirect/overflow-redirect-service'
import { createOverflowLaneTTLRefreshStep } from './overflow-lane-ttl-refresh-step'
import { RateLimitToOverflowStepInput } from './rate-limit-to-overflow-step'

const createMockEvent = (token: string, distinctId: string, now?: Date): RateLimitToOverflowStepInput => ({
headers: {
token,
distinct_id: distinctId,
now: now ?? new Date(),
force_disable_person_processing: false,
historical_migration: false,
},
headers: createTestEventHeaders({ token, distinct_id: distinctId, now: now ?? new Date() }),
event: createTestPipelineEvent({ distinct_id: distinctId }),
})

Expand Down
Loading
Loading