diff --git a/nodejs/src/ingestion/cookieless/cookieless-manager.test.ts b/nodejs/src/ingestion/cookieless/cookieless-manager.test.ts index 6827f7e6866f..7042cf878c44 100644 --- a/nodejs/src/ingestion/cookieless/cookieless-manager.test.ts +++ b/nodejs/src/ingestion/cookieless/cookieless-manager.test.ts @@ -7,7 +7,7 @@ import { createTestEventHeaders } from '~/tests/helpers/event-headers' import { createOrganization, createTeam, getTeam } from '~/tests/helpers/sql' import { cookielessRedisErrorCounter } from '../../common/metrics' -import { CookielessServerHashMode, Hub, PipelineEvent, Team } from '../../types' +import { CookielessServerHashMode, EventHeaders, Hub, PipelineEvent, Team } from '../../types' import { RedisOperationError } from '../../utils/db/error' import { closeHub, createHub } from '../../utils/db/hub' import { PostgresUse } from '../../utils/db/postgres' @@ -334,13 +334,7 @@ describe('CookielessManager', () => { async function processEvent( event: PipelineEvent, - headers: { - token?: string - distinct_id?: string - timestamp?: string - force_disable_person_processing: boolean - historical_migration: boolean - } = createTestEventHeaders() + headers: EventHeaders = createTestEventHeaders() ): Promise { const response = await hub.cookielessManager.doBatch([{ event, team, message, headers }]) expect(response.length).toBe(1) @@ -350,22 +344,10 @@ describe('CookielessManager', () => { async function processEventWithHeaders( event: PipelineEvent, - headers: { - token?: string - distinct_id?: string - timestamp?: string - force_disable_person_processing: boolean - historical_migration: boolean - } + headers: EventHeaders ): Promise<{ event: PipelineEvent | undefined - headers: { - token?: string - distinct_id?: string - timestamp?: string - force_disable_person_processing: boolean - historical_migration: boolean - } + headers: EventHeaders }> { const response = await hub.cookielessManager.doBatch([{ event, team, message, headers }]) expect(response.length).toBe(1) @@ -518,13 +500,11 @@ describe('CookielessManager', () => { }) it('should preserve headers through cookieless processing', async () => { - const testHeaders = { + const testHeaders = createTestEventHeaders({ token: 'test-token', distinct_id: 'test-distinct-id', timestamp: '1234567890', - force_disable_person_processing: false, - historical_migration: false, - } + }) const result = await processEventWithHeaders(event, testHeaders) @@ -533,13 +513,11 @@ describe('CookielessManager', () => { }) it('should preserve headers for non-cookieless events', async () => { - const testHeaders = { + const testHeaders = createTestEventHeaders({ token: 'test-token', distinct_id: 'test-distinct-id', timestamp: '1234567890', - force_disable_person_processing: false, - historical_migration: false, - } + }) const result = await processEventWithHeaders(nonCookielessEvent, testHeaders) @@ -548,13 +526,11 @@ describe('CookielessManager', () => { }) it('should not return dropped events but should not throw', async () => { - const testHeaders = { + const testHeaders = createTestEventHeaders({ token: 'test-token', distinct_id: 'test-distinct-id', timestamp: '1234567890', - force_disable_person_processing: false, - historical_migration: false, - } + }) // Test with alias event which should be dropped const result = await processEventWithHeaders(aliasEvent, testHeaders) @@ -902,13 +878,11 @@ describe('CookielessManager', () => { expect(actual1).toBe(nonCookielessEvent) }) it('should not return dropped cookieless events but should not throw', async () => { - const testHeaders = { + const testHeaders = createTestEventHeaders({ token: 'test-token', distinct_id: 'test-distinct-id', timestamp: '1234567890', - force_disable_person_processing: false, - historical_migration: false, - } + }) const result = await processEventWithHeaders(event, testHeaders) @@ -917,13 +891,11 @@ describe('CookielessManager', () => { expect(result.headers).toEqual(createTestEventHeaders()) }) it('should preserve headers when passing through non-cookieless events', async () => { - const testHeaders = { + const testHeaders = createTestEventHeaders({ token: 'test-token', distinct_id: 'test-distinct-id', timestamp: '1234567890', - force_disable_person_processing: false, - historical_migration: false, - } + }) const result = await processEventWithHeaders(nonCookielessEvent, testHeaders) diff --git a/nodejs/src/ingestion/error-tracking/prepare-event-step.test.ts b/nodejs/src/ingestion/error-tracking/prepare-event-step.test.ts index cf2cd392438e..03d0d4752331 100644 --- a/nodejs/src/ingestion/error-tracking/prepare-event-step.test.ts +++ b/nodejs/src/ingestion/error-tracking/prepare-event-step.test.ts @@ -1,8 +1,9 @@ import { DateTime } from 'luxon' +import { createTestEventHeaders } from '~/tests/helpers/event-headers' import { createTestPluginEvent } from '~/tests/helpers/plugin-event' import { createTestTeam } from '~/tests/helpers/team' -import { EventHeaders, Person } from '~/types' +import { Person } from '~/types' import { PipelineResultType, isOkResult } from '../pipelines/results' import { createErrorTrackingPrepareEventStep } from './prepare-event-step' @@ -12,12 +13,6 @@ describe('createErrorTrackingPrepareEventStep', () => { const team = createTestTeam({ id: 123, project_id: 456 as any }) - const createTestHeaders = (overrides: Partial = {}): EventHeaders => ({ - force_disable_person_processing: false, - historical_migration: false, - ...overrides, - }) - const createTestPerson = (overrides: Partial = {}): Person => ({ team_id: 123, uuid: 'person-uuid-123', @@ -40,7 +35,7 @@ describe('createErrorTrackingPrepareEventStep', () => { }) const person = createTestPerson() - const result = await step({ event, team, person, headers: createTestHeaders() }) + const result = await step({ event, team, person, headers: createTestEventHeaders() }) expect(result.type).toBe(PipelineResultType.OK) if (isOkResult(result)) { @@ -60,7 +55,7 @@ describe('createErrorTrackingPrepareEventStep', () => { const event = createTestPluginEvent({ event: '$exception' }) const person = createTestPerson({ uuid: 'existing-person-uuid' }) - const result = await step({ event, team, person, headers: createTestHeaders() }) + const result = await step({ event, team, person, headers: createTestEventHeaders() }) expect(result.type).toBe(PipelineResultType.OK) if (isOkResult(result)) { @@ -73,7 +68,7 @@ describe('createErrorTrackingPrepareEventStep', () => { it('returns undefined person when person is null', async () => { const event = createTestPluginEvent({ event: '$exception' }) - const result = await step({ event, team, person: null, headers: createTestHeaders() }) + const result = await step({ event, team, person: null, headers: createTestEventHeaders() }) expect(result.type).toBe(PipelineResultType.OK) if (isOkResult(result)) { @@ -85,11 +80,16 @@ describe('createErrorTrackingPrepareEventStep', () => { const event = createTestPluginEvent({ event: '$exception' }) // With person - const resultWithPerson = await step({ event, team, person: createTestPerson(), headers: createTestHeaders() }) + const resultWithPerson = await step({ + event, + team, + person: createTestPerson(), + headers: createTestEventHeaders(), + }) expect(isOkResult(resultWithPerson) && resultWithPerson.value.processPerson).toBe(true) // Without person - const resultWithoutPerson = await step({ event, team, person: null, headers: createTestHeaders() }) + const resultWithoutPerson = await step({ event, team, person: null, headers: createTestEventHeaders() }) expect(isOkResult(resultWithoutPerson) && resultWithoutPerson.value.processPerson).toBe(true) }) @@ -100,7 +100,7 @@ describe('createErrorTrackingPrepareEventStep', () => { event, team, person: null, - headers: createTestHeaders({ historical_migration: true }), + headers: createTestEventHeaders({ historical_migration: true }), }) expect(result.type).toBe(PipelineResultType.OK) @@ -112,7 +112,7 @@ describe('createErrorTrackingPrepareEventStep', () => { it('defaults historical_migration to false when not in headers', async () => { const event = createTestPluginEvent({ event: '$exception' }) - const result = await step({ event, team, person: null, headers: createTestHeaders() }) + const result = await step({ event, team, person: null, headers: createTestEventHeaders() }) expect(result.type).toBe(PipelineResultType.OK) if (isOkResult(result)) { @@ -128,7 +128,7 @@ describe('createErrorTrackingPrepareEventStep', () => { timestamp: '2024-01-20T12:00:00.000Z', }) - const result = await step({ event, team, person: null, headers: createTestHeaders() }) + const result = await step({ event, team, person: null, headers: createTestEventHeaders() }) expect(result.type).toBe(PipelineResultType.OK) if (isOkResult(result)) { @@ -142,7 +142,7 @@ describe('createErrorTrackingPrepareEventStep', () => { properties: null as any, }) - const result = await step({ event, team, person: null, headers: createTestHeaders() }) + const result = await step({ event, team, person: null, headers: createTestEventHeaders() }) expect(result.type).toBe(PipelineResultType.OK) if (isOkResult(result)) { @@ -164,7 +164,7 @@ describe('createErrorTrackingPrepareEventStep', () => { }, }) - const result = await step({ event, team, person: null, headers: createTestHeaders() }) + const result = await step({ event, team, person: null, headers: createTestEventHeaders() }) expect(result.type).toBe(PipelineResultType.OK) if (isOkResult(result)) { @@ -186,7 +186,7 @@ describe('createErrorTrackingPrepareEventStep', () => { }, }) - const result = await step({ event, team, person: null, headers: createTestHeaders() }) + const result = await step({ event, team, person: null, headers: createTestEventHeaders() }) expect(result.type).toBe(PipelineResultType.OK) if (isOkResult(result)) { @@ -208,7 +208,7 @@ describe('createErrorTrackingPrepareEventStep', () => { }, }) - const result = await step({ event, team, person: null, headers: createTestHeaders() }) + const result = await step({ event, team, person: null, headers: createTestEventHeaders() }) expect(result.type).toBe(PipelineResultType.OK) if (isOkResult(result)) { @@ -228,7 +228,7 @@ describe('createErrorTrackingPrepareEventStep', () => { event, team, person: null, - headers: createTestHeaders(), + headers: createTestEventHeaders(), message: { topic: 'test-topic', partition: 0 } as any, customField: 'should-be-preserved', } @@ -250,7 +250,7 @@ describe('createErrorTrackingPrepareEventStep', () => { it('removes event from output but preserves team and adds preparedEvent', async () => { const event = createTestPluginEvent({ event: '$exception' }) - const result = await step({ event, team, person: null, headers: createTestHeaders() }) + const result = await step({ event, team, person: null, headers: createTestEventHeaders() }) expect(result.type).toBe(PipelineResultType.OK) if (isOkResult(result)) { diff --git a/nodejs/src/ingestion/event-preprocessing/overflow-lane-ttl-refresh-step.test.ts b/nodejs/src/ingestion/event-preprocessing/overflow-lane-ttl-refresh-step.test.ts index d0be8be317a8..277697e9e329 100644 --- a/nodejs/src/ingestion/event-preprocessing/overflow-lane-ttl-refresh-step.test.ts +++ b/nodejs/src/ingestion/event-preprocessing/overflow-lane-ttl-refresh-step.test.ts @@ -1,3 +1,4 @@ +import { createTestEventHeaders } from '../../../tests/helpers/event-headers' import { createTestPipelineEvent } from '../../../tests/helpers/pipeline-event' import { PipelineResultType } from '../pipelines/results' import { OverflowRedirectService } from '../utils/overflow-redirect/overflow-redirect-service' @@ -5,13 +6,7 @@ import { createOverflowLaneTTLRefreshStep } from './overflow-lane-ttl-refresh-st import { RateLimitToOverflowStepInput } from './rate-limit-to-overflow-step' const createMockEvent = (token: string, distinctId: string, now?: Date): RateLimitToOverflowStepInput => ({ - headers: { - token, - distinct_id: distinctId, - now: now ?? new Date(), - force_disable_person_processing: false, - historical_migration: false, - }, + headers: createTestEventHeaders({ token, distinct_id: distinctId, now: now ?? new Date() }), event: createTestPipelineEvent({ distinct_id: distinctId }), }) diff --git a/nodejs/src/ingestion/event-preprocessing/parse-headers.test.ts b/nodejs/src/ingestion/event-preprocessing/parse-headers.test.ts index 7e1be4dbd030..27299bf8490d 100644 --- a/nodejs/src/ingestion/event-preprocessing/parse-headers.test.ts +++ b/nodejs/src/ingestion/event-preprocessing/parse-headers.test.ts @@ -27,6 +27,7 @@ describe('createParseHeadersStep', () => { distinct_id: 'test-user', force_disable_person_processing: false, historical_migration: false, + skip_heatmap_processing: false, }, }) ) @@ -49,6 +50,7 @@ describe('createParseHeadersStep', () => { token: 'test-token', force_disable_person_processing: false, historical_migration: false, + skip_heatmap_processing: false, }, }) ) @@ -68,6 +70,7 @@ describe('createParseHeadersStep', () => { headers: { force_disable_person_processing: false, historical_migration: false, + skip_heatmap_processing: false, }, }) ) @@ -87,6 +90,7 @@ describe('createParseHeadersStep', () => { headers: { force_disable_person_processing: false, historical_migration: false, + skip_heatmap_processing: false, }, }) ) @@ -113,6 +117,7 @@ describe('createParseHeadersStep', () => { distinct_id: 'second-user', force_disable_person_processing: false, historical_migration: false, + skip_heatmap_processing: false, }, }) ) @@ -141,6 +146,7 @@ describe('createParseHeadersStep', () => { now: new Date('2023-01-01T12:00:00Z'), force_disable_person_processing: false, historical_migration: false, + skip_heatmap_processing: false, }, }) ) @@ -167,6 +173,7 @@ describe('createParseHeadersStep', () => { timestamp: '2023-01-01T00:00:00Z', force_disable_person_processing: false, historical_migration: false, + skip_heatmap_processing: false, }, }) ) @@ -193,6 +200,7 @@ describe('createParseHeadersStep', () => { timestamp: '2023-01-01T00:00:00Z', force_disable_person_processing: false, historical_migration: false, + skip_heatmap_processing: false, }, }) ) @@ -221,6 +229,7 @@ describe('createParseHeadersStep', () => { distinct_id: 'test-user', force_disable_person_processing: false, historical_migration: false, + skip_heatmap_processing: false, }, }) ) @@ -248,6 +257,7 @@ describe('createParseHeadersStep', () => { session_id: 'test-session-123', force_disable_person_processing: false, historical_migration: false, + skip_heatmap_processing: false, }, }) ) @@ -268,6 +278,7 @@ describe('createParseHeadersStep', () => { session_id: 'string-session-456', force_disable_person_processing: false, historical_migration: false, + skip_heatmap_processing: false, }, }) ) @@ -288,6 +299,7 @@ describe('createParseHeadersStep', () => { session_id: '0192e72a-1dd2-7714-8000-8b3e4c123456', force_disable_person_processing: false, historical_migration: false, + skip_heatmap_processing: false, }, }) ) @@ -308,6 +320,7 @@ describe('createParseHeadersStep', () => { session_id: 'Custom-Session-ID', force_disable_person_processing: false, historical_migration: false, + skip_heatmap_processing: false, }, }) ) @@ -329,6 +342,7 @@ describe('createParseHeadersStep', () => { distinct_id: 'test-user', force_disable_person_processing: false, historical_migration: false, + skip_heatmap_processing: false, }, }) ) @@ -350,6 +364,7 @@ describe('createParseHeadersStep', () => { headers: { force_disable_person_processing: true, historical_migration: false, + skip_heatmap_processing: false, }, }) ) @@ -369,6 +384,7 @@ describe('createParseHeadersStep', () => { headers: { force_disable_person_processing: false, historical_migration: false, + skip_heatmap_processing: false, }, }) ) @@ -388,6 +404,7 @@ describe('createParseHeadersStep', () => { headers: { force_disable_person_processing: false, historical_migration: false, + skip_heatmap_processing: false, }, }) ) @@ -408,6 +425,7 @@ describe('createParseHeadersStep', () => { token: 'test-token', force_disable_person_processing: false, historical_migration: false, + skip_heatmap_processing: false, }, }) ) @@ -427,6 +445,7 @@ describe('createParseHeadersStep', () => { headers: { force_disable_person_processing: true, historical_migration: false, + skip_heatmap_processing: false, }, }) ) @@ -446,6 +465,7 @@ describe('createParseHeadersStep', () => { headers: { force_disable_person_processing: false, historical_migration: false, + skip_heatmap_processing: false, }, }) ) @@ -471,6 +491,7 @@ describe('createParseHeadersStep', () => { distinct_id: 'test-user', force_disable_person_processing: true, historical_migration: false, + skip_heatmap_processing: false, }, }) ) @@ -493,6 +514,7 @@ describe('createParseHeadersStep', () => { now: new Date('2023-01-01T12:00:00Z'), force_disable_person_processing: false, historical_migration: false, + skip_heatmap_processing: false, }, }) ) @@ -513,6 +535,7 @@ describe('createParseHeadersStep', () => { now: new Date('2023-01-01T12:00:00Z'), force_disable_person_processing: false, historical_migration: false, + skip_heatmap_processing: false, }, }) ) @@ -539,6 +562,7 @@ describe('createParseHeadersStep', () => { now: new Date('2023-01-01T12:00:00Z'), force_disable_person_processing: false, historical_migration: false, + skip_heatmap_processing: false, }, }) ) @@ -560,6 +584,7 @@ describe('createParseHeadersStep', () => { headers: { force_disable_person_processing: false, historical_migration: true, + skip_heatmap_processing: false, }, }) ) @@ -579,6 +604,7 @@ describe('createParseHeadersStep', () => { headers: { force_disable_person_processing: false, historical_migration: false, + skip_heatmap_processing: false, }, }) ) @@ -598,6 +624,7 @@ describe('createParseHeadersStep', () => { headers: { force_disable_person_processing: false, historical_migration: false, + skip_heatmap_processing: false, }, }) ) @@ -618,6 +645,7 @@ describe('createParseHeadersStep', () => { token: 'test-token', force_disable_person_processing: false, historical_migration: false, + skip_heatmap_processing: false, }, }) ) @@ -637,6 +665,7 @@ describe('createParseHeadersStep', () => { headers: { force_disable_person_processing: false, historical_migration: true, + skip_heatmap_processing: false, }, }) ) @@ -663,6 +692,7 @@ describe('createParseHeadersStep', () => { distinct_id: 'test-user', force_disable_person_processing: false, historical_migration: true, + skip_heatmap_processing: false, }, }) ) @@ -685,6 +715,7 @@ describe('createParseHeadersStep', () => { distinct_id: 'user\uFFFD123', force_disable_person_processing: false, historical_migration: false, + skip_heatmap_processing: false, }, }) ) @@ -705,6 +736,50 @@ describe('createParseHeadersStep', () => { token: 'test\uFFFDtoken', force_disable_person_processing: false, historical_migration: false, + skip_heatmap_processing: false, + }, + }) + ) + }) + }) + + describe('skip_heatmap_processing header', () => { + it('should parse skip_heatmap_processing as true when value is "true"', async () => { + const input = { + message: { + headers: [{ skip_heatmap_processing: Buffer.from('true') }], + } as Pick, + } + const result = await step(input) + + expect(result).toEqual( + ok({ + ...input, + headers: { + force_disable_person_processing: false, + historical_migration: false, + skip_heatmap_processing: true, + }, + }) + ) + }) + + it('should parse skip_heatmap_processing as false when missing', async () => { + const input = { + message: { + headers: [{ token: Buffer.from('test-token') }], + } as Pick, + } + const result = await step(input) + + expect(result).toEqual( + ok({ + ...input, + headers: { + token: 'test-token', + force_disable_person_processing: false, + historical_migration: false, + skip_heatmap_processing: false, }, }) ) diff --git a/nodejs/src/ingestion/event-preprocessing/rate-limit-to-overflow-step.test.ts b/nodejs/src/ingestion/event-preprocessing/rate-limit-to-overflow-step.test.ts index 02482d7bcdaa..c5d9b6cd2d23 100644 --- a/nodejs/src/ingestion/event-preprocessing/rate-limit-to-overflow-step.test.ts +++ b/nodejs/src/ingestion/event-preprocessing/rate-limit-to-overflow-step.test.ts @@ -1,3 +1,4 @@ +import { createTestEventHeaders } from '../../../tests/helpers/event-headers' import { createTestPipelineEvent } from '../../../tests/helpers/pipeline-event' import { OVERFLOW_OUTPUT } from '../common/outputs' import { PipelineResultType } from '../pipelines/results' @@ -5,13 +6,7 @@ import { OverflowRedirectService } from '../utils/overflow-redirect/overflow-red import { RateLimitToOverflowStepInput, createRateLimitToOverflowStep } from './rate-limit-to-overflow-step' const createMockEvent = (token: string, distinctId: string, now?: Date): RateLimitToOverflowStepInput => ({ - headers: { - token, - distinct_id: distinctId, - now: now ?? new Date(), - force_disable_person_processing: false, - historical_migration: false, - }, + headers: createTestEventHeaders({ token, distinct_id: distinctId, now: now ?? new Date() }), event: createTestPipelineEvent({ distinct_id: distinctId }), }) diff --git a/nodejs/src/ingestion/event-preprocessing/validate-event-metadata.test.ts b/nodejs/src/ingestion/event-preprocessing/validate-event-metadata.test.ts index 660ffd7f0257..60810639be54 100644 --- a/nodejs/src/ingestion/event-preprocessing/validate-event-metadata.test.ts +++ b/nodejs/src/ingestion/event-preprocessing/validate-event-metadata.test.ts @@ -1,3 +1,4 @@ +import { createTestEventHeaders } from '../../../tests/helpers/event-headers' import { EventHeaders } from '../../types' import { drop, ok } from '../pipelines/results' import { createValidateEventMetadataStep } from './validate-event-metadata' @@ -7,12 +8,10 @@ describe('createValidateEventMetadataStep', () => { let step: ReturnType beforeEach(() => { - mockHeaders = { + mockHeaders = createTestEventHeaders({ token: 'test-token-123', distinct_id: 'test-user-456', - force_disable_person_processing: false, - historical_migration: false, - } + }) step = createValidateEventMetadataStep() jest.clearAllMocks() diff --git a/nodejs/src/ingestion/event-processing/emit-event-step.test.ts b/nodejs/src/ingestion/event-processing/emit-event-step.test.ts index b8f7734ee818..9dbb74209b4a 100644 --- a/nodejs/src/ingestion/event-processing/emit-event-step.test.ts +++ b/nodejs/src/ingestion/event-processing/emit-event-step.test.ts @@ -329,12 +329,6 @@ describe('emit-event-step', () => { ...overrides, }) - const createHeaders = (overrides: Partial = {}): EventHeaders => ({ - force_disable_person_processing: false, - historical_migration: false, - ...overrides, - }) - beforeEach(() => { jest.useFakeTimers() jest.setSystemTime(FAKE_NOW_MS) @@ -355,7 +349,7 @@ describe('emit-event-step', () => { const input: EmitEventStepInput = { eventsToEmit: [{ event: mockProcessedEvent, output: EVENTS_OUTPUT }], teamId: 1, - headers: createHeaders({ now: captureTime }), + headers: createTestEventHeaders({ now: captureTime }), message: createMessage(), } @@ -375,7 +369,7 @@ describe('emit-event-step', () => { const input: EmitEventStepInput = { eventsToEmit: [{ event: mockProcessedEvent, output: EVENTS_OUTPUT }], teamId: 1, - headers: createHeaders(), + headers: createTestEventHeaders(), message: createMessage(), } @@ -390,7 +384,7 @@ describe('emit-event-step', () => { const input: EmitEventStepInput = { eventsToEmit: [{ event: mockProcessedEvent, output: EVENTS_OUTPUT }], teamId: 1, - headers: createHeaders({ now: new Date(FAKE_NOW_MS - 1000) }), + headers: createTestEventHeaders({ now: new Date(FAKE_NOW_MS - 1000) }), message: createMessage({ topic: undefined as unknown as string }), } @@ -405,7 +399,7 @@ describe('emit-event-step', () => { const input: EmitEventStepInput = { eventsToEmit: [{ event: mockProcessedEvent, output: EVENTS_OUTPUT }], teamId: 1, - headers: createHeaders({ now: new Date(FAKE_NOW_MS - 1000) }), + headers: createTestEventHeaders({ now: new Date(FAKE_NOW_MS - 1000) }), message: createMessage({ partition: undefined as unknown as number }), } @@ -421,7 +415,7 @@ describe('emit-event-step', () => { const input: EmitEventStepInput = { eventsToEmit: [{ event: mockProcessedEvent, output: EVENTS_OUTPUT }], teamId: 1, - headers: createHeaders({ now: new Date(FAKE_NOW_MS - 1000) }), + headers: createTestEventHeaders({ now: new Date(FAKE_NOW_MS - 1000) }), message: createMessage(), } @@ -439,7 +433,7 @@ describe('emit-event-step', () => { const input: EmitEventStepInput = { eventsToEmit: [{ event: mockProcessedEvent, output: EVENTS_OUTPUT }], teamId: 1, - headers: createHeaders({ now: new Date(FAKE_NOW_MS - 1000) }), + headers: createTestEventHeaders({ now: new Date(FAKE_NOW_MS - 1000) }), message: createMessage({ partition: 0 }), } @@ -459,7 +453,7 @@ describe('emit-event-step', () => { const input: EmitEventStepInput = { eventsToEmit: [{ event: mockProcessedEvent, output: EVENTS_OUTPUT }], teamId: 1, - headers: createHeaders({ now: captureTime }), + headers: createTestEventHeaders({ now: captureTime }), message: createMessage(), } @@ -479,7 +473,7 @@ describe('emit-event-step', () => { const input: EmitEventStepInput = { eventsToEmit: [{ event: mockProcessedEvent, output: EVENTS_OUTPUT }], teamId: 1, - headers: createHeaders({ now: new Date(FAKE_NOW_MS - 1000) }), + headers: createTestEventHeaders({ now: new Date(FAKE_NOW_MS - 1000) }), message: createMessage({ partition: 3 }), } @@ -497,7 +491,7 @@ describe('emit-event-step', () => { const input: EmitEventStepInput = { eventsToEmit: [{ event: mockProcessedEvent, output: EVENTS_OUTPUT }], teamId: 1, - headers: createHeaders(), + headers: createTestEventHeaders(), message: createMessage(), } @@ -512,7 +506,7 @@ describe('emit-event-step', () => { const input: EmitEventStepInput = { eventsToEmit: [{ event: mockProcessedEvent, output: EVENTS_OUTPUT }], teamId: 1, - headers: createHeaders({ now: new Date(FAKE_NOW_MS - 1000) }), + headers: createTestEventHeaders({ now: new Date(FAKE_NOW_MS - 1000) }), message: createMessage({ partition: undefined as unknown as number }), } @@ -527,7 +521,7 @@ describe('emit-event-step', () => { const input: EmitEventStepInput = { eventsToEmit: [{ event: mockProcessedEvent, output: EVENTS_OUTPUT }], teamId: 1, - headers: createHeaders({ now: new Date(FAKE_NOW_MS - 2500) }), + headers: createTestEventHeaders({ now: new Date(FAKE_NOW_MS - 2500) }), message: createMessage({ partition: 0 }), } diff --git a/nodejs/src/ingestion/event-processing/extract-heatmap-data-step.test.ts b/nodejs/src/ingestion/event-processing/extract-heatmap-data-step.test.ts index 4ab593754b0c..a6176e12473e 100644 --- a/nodejs/src/ingestion/event-processing/extract-heatmap-data-step.test.ts +++ b/nodejs/src/ingestion/event-processing/extract-heatmap-data-step.test.ts @@ -1,3 +1,4 @@ +import { createTestEventHeaders } from '../../../tests/helpers/event-headers' import { createMockIngestionOutputs } from '../../../tests/helpers/mock-ingestion-outputs' import { ISOTimestamp, PreIngestionEvent, ProjectId } from '../../types' import { parseJSON } from '../../utils/json-parse' @@ -430,4 +431,52 @@ describe('createExtractHeatmapDataStep', () => { expect(message.scale_factor).toBe(16) }) }) + + describe('skip_heatmap_processing header (capture redirect)', () => { + it('skips extraction and strips $heatmap_data when skip_heatmap_processing is true', async () => { + const event = createTestEvent() + const headers = createTestEventHeaders({ skip_heatmap_processing: true }) + + const result = await step({ preparedEvent: event, headers }) + + expect(result.type).toBe(PipelineResultType.OK) + if (result.type === PipelineResultType.OK) { + expect(result.value.preparedEvent.properties.$heatmap_data).toBeUndefined() + expect(result.value.preparedEvent.properties.$current_url).toBe('http://localhost:3000/') + expect(result.sideEffects).toEqual([]) + expect(result.warnings).toEqual([]) + } + expect(mockOutputs.queueMessages).not.toHaveBeenCalled() + }) + + it('does not mutate the original event when skip_heatmap_processing is true', async () => { + const event = createTestEvent() + const originalEvent = cloneObject(event) + const headers = createTestEventHeaders({ skip_heatmap_processing: true }) + + await step({ preparedEvent: event, headers }) + + expect(event).toEqual(originalEvent) + expect(event.properties.$heatmap_data).toBeDefined() + }) + + it('extracts normally when skip_heatmap_processing is false', async () => { + const event = createTestEvent() + const headers = createTestEventHeaders({ skip_heatmap_processing: false }) + + const result = await step({ preparedEvent: event, headers }) + + expect(result.type).toBe(PipelineResultType.OK) + expect(mockOutputs.queueMessages).toHaveBeenCalledTimes(1) + }) + + it('extracts normally when headers are not provided', async () => { + const event = createTestEvent() + + const result = await step({ preparedEvent: event }) + + expect(result.type).toBe(PipelineResultType.OK) + expect(mockOutputs.queueMessages).toHaveBeenCalledTimes(1) + }) + }) }) diff --git a/nodejs/src/ingestion/event-processing/extract-heatmap-data-step.ts b/nodejs/src/ingestion/event-processing/extract-heatmap-data-step.ts index 1e16cd8ea1ea..072397cb0c20 100644 --- a/nodejs/src/ingestion/event-processing/extract-heatmap-data-step.ts +++ b/nodejs/src/ingestion/event-processing/extract-heatmap-data-step.ts @@ -1,6 +1,6 @@ import { URL } from 'url' -import { PreIngestionEvent, RawClickhouseHeatmapEvent, TimestampFormat } from '../../types' +import { EventHeaders, PreIngestionEvent, RawClickhouseHeatmapEvent, TimestampFormat } from '../../types' import { logger } from '../../utils/logger' import { castTimestampOrNow } from '../../utils/utils' import { isDistinctIdIllegal } from '../../worker/ingestion/persons/person-merge-service' @@ -12,6 +12,7 @@ import { ProcessingStep } from '../pipelines/steps' export interface ExtractHeatmapDataStepInput { preparedEvent: PreIngestionEvent + headers?: EventHeaders } export type ExtractHeatmapDataStepResult = TInput & { @@ -24,8 +25,21 @@ export function createExtractHeatmapDataStep>> { - const { preparedEvent } = input + const { preparedEvent, headers } = input const { eventUuid } = preparedEvent + + // When capture has already redirected heatmap data to the heatmaps topic, + // skip extraction here — just strip the property and pass through. + if (headers?.skip_heatmap_processing) { + const { $heatmap_data, ...propertiesWithoutHeatmapData } = preparedEvent.properties + return Promise.resolve( + ok({ + ...input, + preparedEvent: { ...preparedEvent, properties: propertiesWithoutHeatmapData }, + }) + ) + } + const acks: Promise[] = [] const warnings: PipelineWarning[] = [] diff --git a/nodejs/src/ingestion/session_replay/team-filter-step.test.ts b/nodejs/src/ingestion/session_replay/team-filter-step.test.ts index e4a85db3584e..5a2c4da0d6a5 100644 --- a/nodejs/src/ingestion/session_replay/team-filter-step.test.ts +++ b/nodejs/src/ingestion/session_replay/team-filter-step.test.ts @@ -1,17 +1,11 @@ +import { createTestEventHeaders } from '../../../tests/helpers/event-headers' import { TeamForReplay } from '../../session-recording/teams/types' -import { EventHeaders } from '../../types' import { PipelineResultType } from '../pipelines/results' import { TeamFilterStepInput, TeamTokenResolver, createTeamFilterStep } from './team-filter-step' describe('createTeamFilterStep', () => { - const createHeaders = (token?: string): EventHeaders => ({ - token, - force_disable_person_processing: false, - historical_migration: false, - }) - const createInput = (token?: string): TeamFilterStepInput => ({ - headers: createHeaders(token), + headers: createTestEventHeaders({ token }), }) const defaultTeam: TeamForReplay = { diff --git a/nodejs/src/kafka/consumer.test.ts b/nodejs/src/kafka/consumer.test.ts index 0b5f8191bd4a..8163f91f08e2 100644 --- a/nodejs/src/kafka/consumer.test.ts +++ b/nodejs/src/kafka/consumer.test.ts @@ -1,6 +1,7 @@ import { CODES, Message, MessageHeader, KafkaConsumer as RdKafkaConsumer } from 'node-rdkafka' import { defaultConfig } from '~/config/config' +import { createTestEventHeaders } from '~/tests/helpers/event-headers' import { delay } from '../utils/utils' import { KafkaConsumer, parseEventHeaders, parseKafkaHeaders } from './consumer' @@ -544,42 +545,30 @@ describe('parseKafkaHeaders', () => { describe('parseEventHeaders', () => { it('should return empty object when headers is undefined', () => { const result = parseEventHeaders(undefined) - expect(result).toEqual({ force_disable_person_processing: false, historical_migration: false }) + expect(result).toEqual(createTestEventHeaders()) }) it('should return empty object when headers is empty array', () => { const result = parseEventHeaders([]) - expect(result).toEqual({ force_disable_person_processing: false, historical_migration: false }) + expect(result).toEqual(createTestEventHeaders()) }) it('should parse token header only', () => { const headers: MessageHeader[] = [{ token: Buffer.from('test-token') }] const result = parseEventHeaders(headers) - expect(result).toEqual({ - token: 'test-token', - force_disable_person_processing: false, - historical_migration: false, - }) + expect(result).toEqual(createTestEventHeaders({ token: 'test-token' })) }) it('should parse distinct_id header only', () => { const headers: MessageHeader[] = [{ distinct_id: Buffer.from('user-123') }] const result = parseEventHeaders(headers) - expect(result).toEqual({ - distinct_id: 'user-123', - force_disable_person_processing: false, - historical_migration: false, - }) + expect(result).toEqual(createTestEventHeaders({ distinct_id: 'user-123' })) }) it('should parse timestamp header only', () => { const headers: MessageHeader[] = [{ timestamp: Buffer.from('1234567890') }] const result = parseEventHeaders(headers) - expect(result).toEqual({ - timestamp: '1234567890', - force_disable_person_processing: false, - historical_migration: false, - }) + expect(result).toEqual(createTestEventHeaders({ timestamp: '1234567890' })) }) it('should parse all supported headers', () => { @@ -591,13 +580,9 @@ describe('parseEventHeaders', () => { }, ] const result = parseEventHeaders(headers) - expect(result).toEqual({ - token: 'test-token', - distinct_id: 'user-123', - timestamp: '1234567890', - force_disable_person_processing: false, - historical_migration: false, - }) + expect(result).toEqual( + createTestEventHeaders({ token: 'test-token', distinct_id: 'user-123', timestamp: '1234567890' }) + ) }) it('should parse supported headers from multiple objects', () => { @@ -607,13 +592,9 @@ describe('parseEventHeaders', () => { { timestamp: Buffer.from('1234567890') }, ] const result = parseEventHeaders(headers) - expect(result).toEqual({ - token: 'test-token', - distinct_id: 'user-123', - timestamp: '1234567890', - force_disable_person_processing: false, - historical_migration: false, - }) + expect(result).toEqual( + createTestEventHeaders({ token: 'test-token', distinct_id: 'user-123', timestamp: '1234567890' }) + ) }) it('should ignore unsupported headers', () => { @@ -626,12 +607,7 @@ describe('parseEventHeaders', () => { }, ] const result = parseEventHeaders(headers) - expect(result).toEqual({ - token: 'test-token', - distinct_id: 'user-123', - force_disable_person_processing: false, - historical_migration: false, - }) + expect(result).toEqual(createTestEventHeaders({ token: 'test-token', distinct_id: 'user-123' })) }) it('should handle empty buffer values', () => { @@ -643,13 +619,7 @@ describe('parseEventHeaders', () => { }, ] const result = parseEventHeaders(headers) - expect(result).toEqual({ - token: '', - distinct_id: '', - timestamp: '', - force_disable_person_processing: false, - historical_migration: false, - }) + expect(result).toEqual(createTestEventHeaders({ token: '', distinct_id: '', timestamp: '' })) }) it('should handle duplicate keys by overwriting', () => { @@ -660,12 +630,7 @@ describe('parseEventHeaders', () => { { distinct_id: Buffer.from('second-id') }, ] const result = parseEventHeaders(headers) - expect(result).toEqual({ - token: 'second-token', - distinct_id: 'second-id', - force_disable_person_processing: false, - historical_migration: false, - }) + expect(result).toEqual(createTestEventHeaders({ token: 'second-token', distinct_id: 'second-id' })) }) it('should handle mixed supported and unsupported headers', () => { @@ -677,12 +642,7 @@ describe('parseEventHeaders', () => { { unsupported3: Buffer.from('still-ignored') }, ] const result = parseEventHeaders(headers) - expect(result).toEqual({ - token: 'test-token', - timestamp: '1234567890', - force_disable_person_processing: false, - historical_migration: false, - }) + expect(result).toEqual(createTestEventHeaders({ token: 'test-token', timestamp: '1234567890' })) }) it('should handle partial header sets', () => { @@ -694,32 +654,19 @@ describe('parseEventHeaders', () => { }, ] const result = parseEventHeaders(headers) - expect(result).toEqual({ - token: 'test-token', - timestamp: '1234567890', - force_disable_person_processing: false, - historical_migration: false, - }) + expect(result).toEqual(createTestEventHeaders({ token: 'test-token', timestamp: '1234567890' })) }) it('should parse event header', () => { const headers: MessageHeader[] = [{ event: Buffer.from('$pageview') }] const result = parseEventHeaders(headers) - expect(result).toEqual({ - event: '$pageview', - force_disable_person_processing: false, - historical_migration: false, - }) + expect(result).toEqual(createTestEventHeaders({ event: '$pageview' })) }) it('should parse uuid header', () => { const headers: MessageHeader[] = [{ uuid: Buffer.from('123e4567-e89b-12d3-a456-426614174000') }] const result = parseEventHeaders(headers) - expect(result).toEqual({ - uuid: '123e4567-e89b-12d3-a456-426614174000', - force_disable_person_processing: false, - historical_migration: false, - }) + expect(result).toEqual(createTestEventHeaders({ uuid: '123e4567-e89b-12d3-a456-426614174000' })) }) it('should parse all headers including new event and uuid', () => { @@ -733,15 +680,15 @@ describe('parseEventHeaders', () => { }, ] const result = parseEventHeaders(headers) - expect(result).toEqual({ - token: 'test-token', - distinct_id: 'user-123', - timestamp: '1234567890', - event: '$pageview', - uuid: '123e4567-e89b-12d3-a456-426614174000', - force_disable_person_processing: false, - historical_migration: false, - }) + expect(result).toEqual( + createTestEventHeaders({ + token: 'test-token', + distinct_id: 'user-123', + timestamp: '1234567890', + event: '$pageview', + uuid: '123e4567-e89b-12d3-a456-426614174000', + }) + ) }) it('should ignore unsupported headers but include event and uuid', () => { @@ -756,14 +703,14 @@ describe('parseEventHeaders', () => { }, ] const result = parseEventHeaders(headers) - expect(result).toEqual({ - token: 'test-token', - distinct_id: 'user-123', - event: 'custom_event', - uuid: 'uuid-value', - force_disable_person_processing: false, - historical_migration: false, - }) + expect(result).toEqual( + createTestEventHeaders({ + token: 'test-token', + distinct_id: 'user-123', + event: 'custom_event', + uuid: 'uuid-value', + }) + ) }) it('should handle empty event and uuid headers', () => { @@ -775,13 +722,7 @@ describe('parseEventHeaders', () => { }, ] const result = parseEventHeaders(headers) - expect(result).toEqual({ - token: 'test-token', - event: '', - uuid: '', - force_disable_person_processing: false, - historical_migration: false, - }) + expect(result).toEqual(createTestEventHeaders({ token: 'test-token', event: '', uuid: '' })) }) describe('now header parsing', () => { @@ -825,13 +766,13 @@ describe('parseEventHeaders', () => { }, ] const result = parseEventHeaders(headers) - expect(result).toEqual({ - token: 'test-token', - distinct_id: 'user-123', - now: new Date(isoDate), - force_disable_person_processing: false, - historical_migration: false, - }) + expect(result).toEqual( + createTestEventHeaders({ + token: 'test-token', + distinct_id: 'user-123', + now: new Date(isoDate), + }) + ) }) it('should handle now header with milliseconds precision', () => { diff --git a/nodejs/src/kafka/consumer.ts b/nodejs/src/kafka/consumer.ts index 74615af947bd..64671c1b18b3 100644 --- a/nodejs/src/kafka/consumer.ts +++ b/nodejs/src/kafka/consumer.ts @@ -889,6 +889,7 @@ export const parseEventHeaders = (headers?: MessageHeader[]): EventHeaders => { const result: EventHeaders = { force_disable_person_processing: false, historical_migration: false, + skip_heatmap_processing: false, } headers?.forEach((header) => { @@ -915,6 +916,8 @@ export const parseEventHeaders = (headers?: MessageHeader[]): EventHeaders => { } } else if (key === 'historical_migration') { result.historical_migration = value === 'true' + } else if (key === 'skip_heatmap_processing') { + result.skip_heatmap_processing = value === 'true' } }) }) @@ -930,6 +933,7 @@ export const parseEventHeaders = (headers?: MessageHeader[]): EventHeaders => { 'now', 'force_disable_person_processing', 'historical_migration', + 'skip_heatmap_processing', ] as const trackedHeaders.forEach((header) => { const status = result[header] ? 'present' : 'absent' diff --git a/nodejs/src/session-recording/session-recording-restriction-handler.test.ts b/nodejs/src/session-recording/session-recording-restriction-handler.test.ts index 66c87ae810f2..8bd9ebac7a25 100644 --- a/nodejs/src/session-recording/session-recording-restriction-handler.test.ts +++ b/nodejs/src/session-recording/session-recording-restriction-handler.test.ts @@ -84,6 +84,7 @@ describe('SessionRecordingRestrictionHandler', () => { distinct_id: 'user-1', force_disable_person_processing: false, historical_migration: false, + skip_heatmap_processing: false, }) expect(SessionRecordingIngesterMetrics.observeDroppedByRestrictions).not.toHaveBeenCalled() expect(SessionRecordingIngesterMetrics.observeOverflowedByRestrictions).not.toHaveBeenCalled() @@ -297,6 +298,7 @@ describe('SessionRecordingRestrictionHandler', () => { session_id: 'session-123', force_disable_person_processing: false, historical_migration: false, + skip_heatmap_processing: false, }) }) @@ -358,6 +360,7 @@ describe('SessionRecordingRestrictionHandler', () => { session_id: 'session-123', force_disable_person_processing: false, historical_migration: false, + skip_heatmap_processing: false, }) }) @@ -385,6 +388,7 @@ describe('SessionRecordingRestrictionHandler', () => { uuid: 'event-uuid-456', force_disable_person_processing: false, historical_migration: false, + skip_heatmap_processing: false, }) }) diff --git a/nodejs/src/types.ts b/nodejs/src/types.ts index afdf9d1583d7..78922de5ed59 100644 --- a/nodejs/src/types.ts +++ b/nodejs/src/types.ts @@ -877,6 +877,7 @@ export interface EventHeaders { now?: Date force_disable_person_processing: boolean historical_migration: boolean + skip_heatmap_processing: boolean } export interface IncomingEvent { diff --git a/nodejs/tests/helpers/event-headers.ts b/nodejs/tests/helpers/event-headers.ts index ac79aa4d3fcc..bef533b7e727 100644 --- a/nodejs/tests/helpers/event-headers.ts +++ b/nodejs/tests/helpers/event-headers.ts @@ -23,6 +23,7 @@ export function createTestEventHeaders(overrides: Partial = {}): E return { force_disable_person_processing: false, historical_migration: false, + skip_heatmap_processing: false, ...overrides, } } diff --git a/rust/capture/src/ai_endpoint.rs b/rust/capture/src/ai_endpoint.rs index 27537c2f1d83..3b109909b198 100644 --- a/rust/capture/src/ai_endpoint.rs +++ b/rust/capture/src/ai_endpoint.rs @@ -542,6 +542,7 @@ fn build_kafka_event( skip_person_processing: restrictions.skip_person_processing(), redirect_to_dlq: restrictions.redirect_to_dlq(), redirect_to_topic: restrictions.redirect_to_topic().map(|s| s.to_string()), + skip_heatmap_processing: false, }; // Create ProcessedEvent diff --git a/rust/capture/src/events/analytics.rs b/rust/capture/src/events/analytics.rs index 17b4d1ed758f..6b89c97d7fc2 100644 --- a/rust/capture/src/events/analytics.rs +++ b/rust/capture/src/events/analytics.rs @@ -3,6 +3,7 @@ //! This module handles processing of regular analytics events (pageviews, custom events, //! exceptions, etc.) as opposed to recordings (session replay). +use std::collections::HashMap; use std::sync::Arc; use chrono::DateTime; @@ -21,6 +22,78 @@ use crate::{ v0_request::{DataType, ProcessedEvent, ProcessedEventMetadata, ProcessingContext}, }; +/// Property keys that the heatmap extraction pipeline reads from the event. +/// When a non-$$heatmap event carries heatmap data, we produce a second message +/// to the heatmaps topic containing only these properties. +const HEATMAP_PROPERTY_KEYS: &[&str] = &[ + "$heatmap_data", + "$viewport_height", + "$viewport_width", + "$session_id", + "$prev_pageview_pathname", + "$prev_pageview_max_scroll", + "$current_url", +]; + +/// Returns true if this event carries data that the heatmap extraction pipeline would process. +fn has_heatmap_data(event: &RawEvent) -> bool { + event.properties.contains_key("$heatmap_data") + || (event.properties.contains_key("$prev_pageview_pathname") + && event.properties.contains_key("$current_url")) +} + +/// Create a stripped-down $$heatmap event from a non-$$heatmap event that contains heatmap data. +/// The redirect carries only the properties the heatmap pipeline needs, and gets a new UUID +/// to avoid deduplication with the original event. +fn create_heatmap_redirect( + event: &RawEvent, + historical_cfg: router::HistoricalConfig, + context: &ProcessingContext, +) -> Result { + let mut properties = HashMap::new(); + + for key in HEATMAP_PROPERTY_KEYS { + if let Some(value) = event.properties.get(*key) { + properties.insert(key.to_string(), value.clone()); + } + } + + // Preserve distinct_id and $cookieless_mode — needed for routing key generation + if let Some(value) = event.properties.get("distinct_id") { + properties.insert("distinct_id".to_string(), value.clone()); + } + if let Some(value) = event.properties.get("$cookieless_mode") { + properties.insert("$cookieless_mode".to_string(), value.clone()); + } + + let heatmap_event = RawEvent { + token: event.token.clone(), + distinct_id: event.distinct_id.clone(), + uuid: Some(uuid_v7()), + event: "$$heatmap".to_string(), + properties, + timestamp: event.timestamp.clone(), + offset: event.offset, + set: None, + set_once: None, + }; + + process_single_event(&heatmap_event, historical_cfg, context) +} + +/// Strip heatmap-only properties from a ProcessedEvent's serialized data field +/// and mark it so the pipeline knows heatmap data was already redirected. +fn strip_heatmap_data(event: &mut ProcessedEvent) { + if let Ok(mut raw_event) = serde_json::from_str::(&event.event.data) { + if raw_event.properties.remove("$heatmap_data").is_some() { + if let Ok(data) = serde_json::to_string(&raw_event) { + event.event.data = data; + } + } + } + event.metadata.skip_heatmap_processing = true; +} + /// Process a single analytics event from RawEvent to ProcessedEvent #[instrument(skip_all, fields(event_name, request_id))] pub fn process_single_event( @@ -92,6 +165,7 @@ pub fn process_single_event( skip_person_processing: false, redirect_to_dlq: false, redirect_to_topic: None, + skip_heatmap_processing: false, }; if historical_cfg.should_reroute(metadata.data_type, parsed_timestamp.timestamp) { @@ -142,11 +216,41 @@ pub async fn process_events<'a>( Span::current().record("request_id", &context.request_id); Span::current().record("is_mirror_deploy", context.is_mirror_deploy); + // Identify which raw events need heatmap redirects before `events` is shadowed. + let needs_heatmap_redirect: Vec = events + .iter() + .map(|e| e.event != "$$heatmap" && has_heatmap_data(e)) + .collect(); + + let mut heatmap_redirects: Vec = Vec::new(); + for (e, needs_redirect) in events.iter().zip(needs_heatmap_redirect.iter()) { + if *needs_redirect { + match create_heatmap_redirect(e, historical_cfg.clone(), context) { + Ok(processed) => { + metrics::counter!("capture_heatmap_redirects_created").increment(1); + heatmap_redirects.push(processed); + } + Err(err) => { + error!("failed to create heatmap redirect: {err:#}"); + } + } + } + } + let mut events: Vec = events .iter() .map(|e| process_single_event(e, historical_cfg.clone(), context)) .collect::, CaptureError>>()?; + // Strip heatmap data from originals that got redirects + for (event, needs_redirect) in events.iter_mut().zip(needs_heatmap_redirect.iter()) { + if *needs_redirect { + strip_heatmap_data(event); + } + } + + events.extend(heatmap_redirects); + debug_or_info!(chatty_debug_enabled, context=?context, event_count=?events.len(), "created ProcessedEvents batch"); events.retain(|e| { @@ -835,4 +939,218 @@ mod tests { Some("custom_events_topic".to_string()) ); } + + fn create_event_with_heatmap_data() -> RawEvent { + let mut properties = HashMap::new(); + properties.insert("distinct_id".to_string(), json!("test_user")); + properties.insert( + "$heatmap_data".to_string(), + json!({"https://example.com": [{"x": 100, "y": 200, "target_fixed": false, "type": "click"}]}), + ); + properties.insert("$viewport_height".to_string(), json!(900)); + properties.insert("$viewport_width".to_string(), json!(1440)); + properties.insert("$session_id".to_string(), json!("session-abc")); + properties.insert("$current_url".to_string(), json!("https://example.com")); + properties.insert( + "other_prop".to_string(), + json!("should_not_appear_in_redirect"), + ); + + RawEvent { + uuid: Some(uuid_v7()), + distinct_id: None, + event: "$pageview".to_string(), + properties, + timestamp: Some("2023-01-01T11:00:00Z".to_string()), + offset: None, + set: None, + set_once: None, + token: Some("test_token".to_string()), + } + } + + #[test] + fn test_has_heatmap_data_with_heatmap_data_property() { + let event = create_event_with_heatmap_data(); + assert!(has_heatmap_data(&event)); + } + + #[test] + fn test_has_heatmap_data_with_scroll_depth_properties() { + let mut properties = HashMap::new(); + properties.insert("distinct_id".to_string(), json!("test_user")); + properties.insert("$prev_pageview_pathname".to_string(), json!("/old")); + properties.insert("$current_url".to_string(), json!("https://example.com/new")); + + let event = RawEvent { + uuid: Some(uuid_v7()), + distinct_id: None, + event: "$pageview".to_string(), + properties, + timestamp: None, + offset: None, + set: None, + set_once: None, + token: Some("test_token".to_string()), + }; + assert!(has_heatmap_data(&event)); + } + + #[test] + fn test_has_heatmap_data_without_heatmap_properties() { + let event = create_test_event(None, None, None); + assert!(!has_heatmap_data(&event)); + } + + #[test] + fn test_create_heatmap_redirect_properties_and_metadata() { + let now = Utc::now(); + let context = create_test_context(now, None); + let event = create_event_with_heatmap_data(); + let historical_cfg = router::HistoricalConfig::new(false, 1); + + let redirect = create_heatmap_redirect(&event, historical_cfg, &context).unwrap(); + + assert_eq!(redirect.metadata.data_type, DataType::HeatmapMain); + assert_eq!(redirect.metadata.event_name, "$$heatmap"); + assert!(!redirect.metadata.skip_heatmap_processing); + assert_eq!(redirect.event.event, "$$heatmap"); + assert_ne!(redirect.event.uuid, event.uuid.unwrap()); + + let data: RawEvent = serde_json::from_str(&redirect.event.data).unwrap(); + assert!(data.properties.contains_key("$heatmap_data")); + assert!(data.properties.contains_key("$viewport_height")); + assert!(data.properties.contains_key("$viewport_width")); + assert!(data.properties.contains_key("$session_id")); + assert!(data.properties.contains_key("$current_url")); + assert!(data.properties.contains_key("distinct_id")); + assert!( + !data.properties.contains_key("other_prop"), + "redirect should only contain heatmap properties" + ); + } + + #[test] + fn test_strip_heatmap_data_removes_property_and_sets_flag() { + let now = Utc::now(); + let context = create_test_context(now, None); + let event = create_event_with_heatmap_data(); + let historical_cfg = router::HistoricalConfig::new(false, 1); + + let mut processed = process_single_event(&event, historical_cfg, &context).unwrap(); + assert!(!processed.metadata.skip_heatmap_processing); + + strip_heatmap_data(&mut processed); + + assert!(processed.metadata.skip_heatmap_processing); + let data: RawEvent = serde_json::from_str(&processed.event.data).unwrap(); + assert!(!data.properties.contains_key("$heatmap_data")); + assert!( + data.properties.contains_key("$viewport_height"), + "non-$heatmap_data properties should be preserved" + ); + } + + #[tokio::test] + async fn test_process_events_creates_heatmap_redirect() { + let now = Utc::now(); + let context = create_test_context(now, None); + let events = vec![create_event_with_heatmap_data()]; + + let sink = Arc::new(MockSink::new()); + let dropper = Arc::new(limiters::token_dropper::TokenDropper::default()); + let historical_cfg = router::HistoricalConfig::new(false, 1); + + let result = process_events( + sink.clone(), + dropper, + None, + historical_cfg, + &events, + &context, + ) + .await; + + assert!(result.is_ok()); + let captured = sink.get_events(); + assert_eq!(captured.len(), 2, "should produce original + redirect"); + + let original = &captured[0]; + assert_eq!(original.event.event, "$pageview"); + assert!(original.metadata.skip_heatmap_processing); + let orig_data: RawEvent = serde_json::from_str(&original.event.data).unwrap(); + assert!( + !orig_data.properties.contains_key("$heatmap_data"), + "$heatmap_data should be stripped from original" + ); + + let redirect = &captured[1]; + assert_eq!(redirect.event.event, "$$heatmap"); + assert_eq!(redirect.metadata.data_type, DataType::HeatmapMain); + assert!(!redirect.metadata.skip_heatmap_processing); + } + + #[tokio::test] + async fn test_process_events_no_redirect_for_heatmap_event() { + let now = Utc::now(); + let context = create_test_context(now, None); + + let mut event = create_event_with_heatmap_data(); + event.event = "$$heatmap".to_string(); + let events = vec![event]; + + let sink = Arc::new(MockSink::new()); + let dropper = Arc::new(limiters::token_dropper::TokenDropper::default()); + let historical_cfg = router::HistoricalConfig::new(false, 1); + + let result = process_events( + sink.clone(), + dropper, + None, + historical_cfg, + &events, + &context, + ) + .await; + + assert!(result.is_ok()); + let captured = sink.get_events(); + assert_eq!( + captured.len(), + 1, + "$$heatmap events should not produce a redirect" + ); + assert_eq!(captured[0].metadata.data_type, DataType::HeatmapMain); + assert!(!captured[0].metadata.skip_heatmap_processing); + } + + #[tokio::test] + async fn test_process_events_no_redirect_without_heatmap_data() { + let now = Utc::now(); + let context = create_test_context(now, None); + let events = vec![create_test_event( + Some("2023-01-01T11:00:00Z".to_string()), + None, + None, + )]; + + let sink = Arc::new(MockSink::new()); + let dropper = Arc::new(limiters::token_dropper::TokenDropper::default()); + let historical_cfg = router::HistoricalConfig::new(false, 1); + + let result = process_events( + sink.clone(), + dropper, + None, + historical_cfg, + &events, + &context, + ) + .await; + + assert!(result.is_ok()); + let captured = sink.get_events(); + assert_eq!(captured.len(), 1); + assert!(!captured[0].metadata.skip_heatmap_processing); + } } diff --git a/rust/capture/src/events/recordings.rs b/rust/capture/src/events/recordings.rs index 05a43b5eb15e..45c3906f751a 100644 --- a/rust/capture/src/events/recordings.rs +++ b/rust/capture/src/events/recordings.rs @@ -311,6 +311,7 @@ pub async fn process_replay_events( skip_person_processing: applied.skip_person_processing(), redirect_to_dlq: applied.redirect_to_dlq(), redirect_to_topic: applied.redirect_to_topic().map(|s| s.to_string()), + skip_heatmap_processing: false, }; // Serialize snapshot data synchronously diff --git a/rust/capture/src/otel/filtering.rs b/rust/capture/src/otel/filtering.rs index bec3010e13bf..a4f4e7658e3a 100644 --- a/rust/capture/src/otel/filtering.rs +++ b/rust/capture/src/otel/filtering.rs @@ -122,6 +122,7 @@ pub fn build_events( skip_person_processing: restrictions.skip_person_processing(), redirect_to_dlq: restrictions.redirect_to_dlq(), redirect_to_topic: restrictions.redirect_to_topic().map(|s| s.to_string()), + skip_heatmap_processing: false, }; processed.push(ProcessedEvent { diff --git a/rust/capture/src/sinks/fallback.rs b/rust/capture/src/sinks/fallback.rs index 634e48d20f33..0540d980975a 100644 --- a/rust/capture/src/sinks/fallback.rs +++ b/rust/capture/src/sinks/fallback.rs @@ -160,6 +160,7 @@ mod tests { skip_person_processing: false, redirect_to_dlq: false, redirect_to_topic: None, + skip_heatmap_processing: false, }, }; @@ -209,6 +210,7 @@ mod tests { skip_person_processing: false, redirect_to_dlq: false, redirect_to_topic: None, + skip_heatmap_processing: false, }, }; diff --git a/rust/capture/src/sinks/kafka.rs b/rust/capture/src/sinks/kafka.rs index c5ee17487cdb..e6877f639eb5 100644 --- a/rust/capture/src/sinks/kafka.rs +++ b/rust/capture/src/sinks/kafka.rs @@ -317,6 +317,7 @@ impl KafkaSinkBase

{ let skip_person_processing = metadata.skip_person_processing; let redirect_to_dlq = metadata.redirect_to_dlq; let redirect_to_topic = metadata.redirect_to_topic; + let skip_heatmap_processing = metadata.skip_heatmap_processing; // Use the event's to_headers() method for consistent header serialization let mut headers = event.to_headers(); @@ -328,6 +329,10 @@ impl KafkaSinkBase

{ headers.set_force_disable_person_processing(true); } + if skip_heatmap_processing { + headers.set_skip_heatmap_processing(true); + } + // Check for redirect_to_dlq first - takes priority over all other routing let (topic, partition_key): (&str, Option<&str>) = if redirect_to_dlq { counter!( @@ -632,6 +637,7 @@ mod tests { skip_person_processing: false, redirect_to_dlq: false, redirect_to_topic: None, + skip_heatmap_processing: false, }; let event = ProcessedEvent { @@ -779,6 +785,7 @@ mod tests { now: Some("2023-01-01T12:00:00Z".to_string()), force_disable_person_processing: None, historical_migration: Some(true), + skip_heatmap_processing: None, dlq_reason: None, dlq_step: None, dlq_timestamp: None, @@ -799,6 +806,7 @@ mod tests { now: Some("2023-01-01T12:00:00Z".to_string()), force_disable_person_processing: None, historical_migration: Some(false), + skip_heatmap_processing: None, dlq_reason: None, dlq_step: None, dlq_timestamp: None, @@ -827,6 +835,7 @@ mod tests { now: Some(test_now.clone()), force_disable_person_processing: None, historical_migration: None, + skip_heatmap_processing: None, dlq_reason: None, dlq_step: None, dlq_timestamp: None, @@ -860,6 +869,7 @@ mod tests { now: Some(test_now.clone()), force_disable_person_processing: None, historical_migration: None, + skip_heatmap_processing: None, dlq_reason: Some("test reason".to_string()), dlq_step: Some("test step".to_string()), dlq_timestamp: Some(dlq_timestamp.clone()), @@ -940,6 +950,7 @@ mod tests { skip_person_processing: input.skip_person_processing, redirect_to_dlq: input.redirect_to_dlq, redirect_to_topic: input.redirect_to_topic.clone(), + skip_heatmap_processing: false, }; ProcessedEvent { event, metadata } diff --git a/rust/capture/src/sinks/s3.rs b/rust/capture/src/sinks/s3.rs index 6ca53567ec0f..fbe94426030e 100644 --- a/rust/capture/src/sinks/s3.rs +++ b/rust/capture/src/sinks/s3.rs @@ -362,6 +362,7 @@ mod tests { skip_person_processing: false, redirect_to_dlq: false, redirect_to_topic: None, + skip_heatmap_processing: false, }, } } diff --git a/rust/capture/src/v0_request.rs b/rust/capture/src/v0_request.rs index e1190dd7b228..8183300f1c06 100644 --- a/rust/capture/src/v0_request.rs +++ b/rust/capture/src/v0_request.rs @@ -189,6 +189,9 @@ pub struct ProcessedEventMetadata { pub redirect_to_dlq: bool, /// Redirect this event to a custom topic (set by event restrictions) pub redirect_to_topic: Option, + /// Signal to the pipeline that heatmap data for this event was redirected + /// to the heatmaps topic and should not be extracted again + pub skip_heatmap_processing: bool, } #[cfg(test)] diff --git a/rust/common/types/src/event.rs b/rust/common/types/src/event.rs index 6ac4baa25a77..54b18e0db8d4 100644 --- a/rust/common/types/src/event.rs +++ b/rust/common/types/src/event.rs @@ -119,6 +119,7 @@ pub struct CapturedEventHeaders { pub now: Option, pub force_disable_person_processing: Option, pub historical_migration: Option, + pub skip_heatmap_processing: Option, pub dlq_reason: Option, pub dlq_step: Option, pub dlq_timestamp: Option, @@ -129,6 +130,10 @@ impl CapturedEventHeaders { self.force_disable_person_processing = Some(value); } + pub fn set_skip_heatmap_processing(&mut self, value: bool) { + self.skip_heatmap_processing = Some(value); + } + pub fn set_dlq_reason(&mut self, value: String) { self.dlq_reason = Some(value); } @@ -186,7 +191,14 @@ impl From for OwnedHeaders { value: historical_migration_str.as_deref(), }); - // To prevent adding bloat to the other topic headers, only add add dlq headers when present. + // Only add optional headers when present, to avoid bloating every message. + if let Some(skip_heatmap_processing) = headers.skip_heatmap_processing { + let val = skip_heatmap_processing.to_string(); + owned = owned.insert(Header { + key: "skip_heatmap_processing", + value: Some(val.as_str()), + }); + } if let Some(ref reason) = headers.dlq_reason { owned = owned.insert(Header { key: "dlq_reason", @@ -235,6 +247,9 @@ impl From for CapturedEventHeaders { historical_migration: headers_map .get("historical_migration") .and_then(|v| v.parse::().ok()), + skip_heatmap_processing: headers_map + .get("skip_heatmap_processing") + .and_then(|v| v.parse::().ok()), dlq_reason: headers_map.get("dlq_reason").cloned(), dlq_step: headers_map.get("dlq_step").cloned(), dlq_timestamp: headers_map.get("dlq_timestamp").cloned(), @@ -294,7 +309,7 @@ impl CapturedEvent { } else { None }, - // DLQ headers should only be explicitly set when needed + skip_heatmap_processing: None, dlq_reason: None, dlq_step: None, dlq_timestamp: None,