diff --git a/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts b/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts
new file mode 100644
index 0000000000..302adbfc3f
--- /dev/null
+++ b/services/apps/data_sink_worker/src/bin/fix-member-attributes.ts
@@ -0,0 +1,311 @@
+import isEqual from 'lodash.isequal'
+import mergeWith from 'lodash.mergewith'
+
+import { connQx, updateMember } from '@crowd/data-access-layer'
+import {
+  DbConnOrTx,
+  DbStore,
+  WRITE_DB_CONFIG,
+  getDbConnection,
+} from '@crowd/data-access-layer/src/database'
+import { getServiceLogger } from '@crowd/logging'
+import { REDIS_CONFIG, getRedisClient } from '@crowd/redis'
+
+import MemberAttributeService from '../service/memberAttribute.service'
+
+/* eslint-disable @typescript-eslint/no-explicit-any */
+
+// With TEST_RUN set the script handles a single batch of one member and skips the write.
+const BATCH_SIZE = process.env.TEST_RUN ? 1 : 5000
+
+const log = getServiceLogger()
+
+/**
+ * Loads one batch of non-deleted members that have at least one object-valued attribute
+ * whose `default` is empty while another key of that attribute holds a non-empty value.
+ *
+ * Rows come back ordered by `id` descending, at most BATCH_SIZE per call.
+ *
+ * @param db - connection the query runs on
+ * @param lastId - when given, only members with an id lower than this one are returned
+ * @returns id, attributes and manuallyChangedFields of every member in the batch
+ * @throws the query error, rethrown after it has been logged
+ */
+async function getMemberIds(
+  db: DbConnOrTx,
+  lastId?: string,
+): Promise<{ id: string; attributes: any; manuallyChangedFields: any }[]> {
+  try {
+    log.debug({ lastId }, 'Querying for members with attribute issues')
+
+    // lastId is bound via the named parameter $(lastId) from the values object below,
+    // not concatenated into the SQL text.
+    const results = await db.any(
+      `
+      with relevant_members as (with member_with_attributes as (select id,
+                                                                       "createdAt",
+                                                                       jsonb_object_keys(attributes) as attr_key,
+                                                                       attributes -> jsonb_object_keys(attributes) as attr_value
+                                                                from members
+                                                                where "deletedAt" is null
+                                                                  and attributes is not null
+                                                                  and attributes != 'null'::jsonb
+                                                                  and attributes != '{}'::jsonb)
+                                select distinct id
+                                from member_with_attributes
+                                where jsonb_typeof(attr_value) = 'object'
+                                  and coalesce(attr_value ->> 'default', '') = ''
+                                  and exists (select 1
+                                              from jsonb_each_text(attr_value) as kv
+                                              where kv.key != 'default'
+                                                and coalesce(kv.value, '') != '')
+                                  ${lastId ? 'and id < $(lastId)' : ''}
+                                order by id desc
+                                limit ${BATCH_SIZE})
+select m.id, m.attributes, m."manuallyChangedFields"
+from members m
+         inner join
+     relevant_members rm on rm.id = m.id
+order by m.id desc;
+      `,
+      { lastId },
+    )
+
+    log.debug(
+      {
+        resultCount: results.length,
+        lastId,
+        firstId: results.length > 0 ? results[0].id : null,
+        lastResultId: results.length > 0 ? results[results.length - 1].id : null,
+      },
+      'Query completed',
+    )
+
+    return results
+  } catch (error) {
+    log.error(
+      {
+        error: error.message,
+        lastId,
+        stack: error.stack,
+      },
+      'Failed to query member IDs',
+    )
+    throw error
+  }
+}
+
+// Entry point: processes affected members batch by batch and rewrites changed attributes;
+// per-member failures are logged and skipped, a fatal error ends the process with exit code 1.
+setImmediate(async () => {
+  let dbClient: DbConnOrTx | undefined
+  let redisClient: any | undefined
+  let exitCode = 0
+
+  try {
+    log.info('Starting member attributes fix script')
+
+    // Initialize connections
+    log.info('Connecting to database...')
+    dbClient = await getDbConnection(WRITE_DB_CONFIG())
+    log.info('Database connection established')
+
+    log.info('Connecting to Redis...')
+    redisClient = await getRedisClient(REDIS_CONFIG())
+    log.info('Redis connection established')
+
+    const pgQx = connQx(dbClient)
+    const mas = new MemberAttributeService(redisClient, new DbStore(log, dbClient), log)
+
+    let totalProcessed = 0
+    let totalUpdated = 0
+    let batchNumber = 1
+
+    log.info('Starting to process members with attribute issues')
+    let membersToFix = await getMemberIds(dbClient)
+    log.info({ count: membersToFix.length }, 'Found members to process in first batch')
+
+    while (membersToFix.length > 0) {
+      log.info({ batchNumber, batchSize: membersToFix.length }, 'Processing batch')
+      let batchUpdated = 0
+
+      for (const data of membersToFix) {
+        try {
+          if (data.attributes) {
+            log.debug(
+              { memberId: data.id, oldAttributes: data.attributes },
+              'Processing member attributes',
+            )
+
+            // check if any has default empty but other are full
+            let toProcess = false
+            for (const attName of Object.keys(data.attributes)) {
+              // optional chaining: a null attribute value must not throw during this scan
+              const defValue = data.attributes[attName]?.default
+
+              if (defValue === undefined || defValue === null || String(defValue) === '') {
+                log.debug(
+                  {
+                    memberId: data.id,
+                    attribute: data.attributes[attName],
+                    attName,
+                    defValue: defValue ? String(defValue) : 'undefined',
+                  },
+                  'Attribute has default empty',
+                )
+                for (const platform of Object.keys(data.attributes[attName] ?? {}).filter(
+                  (p) => p !== 'default',
+                )) {
+                  const value = data.attributes[attName][platform]
+
+                  if (value !== undefined && value !== null && String(value) !== '') {
+                    log.debug(
+                      { memberId: data.id, attName, platform, value },
+                      'Found value for attribute',
+                    )
+                    toProcess = true
+                    break
+                  }
+                }
+
+                if (toProcess) {
+                  break
+                }
+              }
+            }
+
+            if (toProcess) {
+              const oldAttributes = JSON.parse(JSON.stringify(data.attributes)) // Deep copy
+              data.attributes = await mas.setAttributesDefaultValues(data.attributes)
+
+              let attributes: Record<string, unknown> | undefined
+              const temp = mergeWith({}, oldAttributes, data.attributes)
+              const manuallyChangedFields: string[] = data.manuallyChangedFields || []
+
+              if (manuallyChangedFields.length > 0) {
+                log.warn(
+                  {
+                    memberId: data.id,
+                    manuallyChangedFieldsCount: manuallyChangedFields.length,
+                  },
+                  'Member has manually changed fields',
+                )
+
+                const prefix = 'attributes.'
+                const manuallyChangedAttributes = [
+                  ...new Set(
+                    manuallyChangedFields
+                      .filter((f) => f.startsWith(prefix))
+                      .map((f) => f.slice(prefix.length)),
+                  ),
+                ]
+
+                log.warn(
+                  {
+                    memberId: data.id,
+                    manuallyChangedAttributes,
+                  },
+                  'Preserving manually changed attributes',
+                )
+
+                // Preserve manually changed attributes
+                for (const key of manuallyChangedAttributes) {
+                  if (oldAttributes?.[key] !== undefined) {
+                    temp[key] = oldAttributes[key] // Fixed: removed .attributes
+                  }
+                }
+              }
+
+              if (!isEqual(temp, oldAttributes)) {
+                attributes = temp
+                log.info({ memberId: data.id }, 'Attributes changed, will update')
+              } else {
+                log.debug(
+                  { memberId: data.id, newAttributes: temp, oldAttributes },
+                  'No changes needed for attributes',
+                )
+              }
+
+              if (attributes) {
+                log.info({ memberId: data.id }, 'Updating member attributes')
+
+                if (!process.env.TEST_RUN) {
+                  await updateMember(pgQx, data.id, { attributes } as any)
+                }
+
+                batchUpdated++
+                totalUpdated++
+                log.debug({ memberId: data.id }, 'Member attributes updated successfully')
+              }
+            } else {
+              log.debug(
+                { memberId: data.id, attributes: data.attributes },
+                'No changes needed for attributes',
+              )
+            }
+          } else {
+            log.debug({ memberId: data.id }, 'Member has no attributes to process')
+          }
+
+          totalProcessed++
+        } catch (error) {
+          log.error(
+            {
+              error: error.message,
+              memberId: data.id,
+              stack: error.stack,
+            },
+            'Failed to process member',
+          )
+          // Continue processing other members
+        }
+      }
+
+      log.info(
+        {
+          batchNumber,
+          batchProcessed: membersToFix.length,
+          batchUpdated,
+          totalProcessed,
+          totalUpdated,
+        },
+        'Completed batch processing',
+      )
+
+      // Get next batch
+      const lastId = membersToFix[membersToFix.length - 1].id
+      log.debug({ lastId }, 'Fetching next batch starting from last ID')
+
+      // Count this batch before the TEST_RUN exit so totalBatches below is correct.
+      batchNumber++
+
+      if (process.env.TEST_RUN) {
+        break
+      }
+
+      membersToFix = await getMemberIds(dbClient, lastId)
+      log.info({ count: membersToFix.length }, 'Found members for next batch')
+    }
+
+    log.info(
+      {
+        totalProcessed,
+        totalUpdated,
+        totalBatches: batchNumber - 1,
+      },
+      'Member attributes fix completed successfully',
+    )
+  } catch (error) {
+    log.error(
+      {
+        error: error.message,
+        stack: error.stack,
+      },
+      'Fatal error in member attributes fix script',
+    )
+    exitCode = 1
+  } finally {
+    log.info('Script execution completed')
+    process.exit(exitCode)
+  }
+})
diff --git a/services/apps/data_sink_worker/src/service/activity.service.ts b/services/apps/data_sink_worker/src/service/activity.service.ts
index ed36ec614f..6dbd8ebde7 100644
--- a/services/apps/data_sink_worker/src/service/activity.service.ts
+++ b/services/apps/data_sink_worker/src/service/activity.service.ts
@@ -1068,7 +1068,7 @@ export default class ActivityService extends LoggerBase {
       segmentIds: Set
       resultIds: Set
       integrationId: string
-      platform: string
+      platform: PlatformType
       username: string
       timestamp: string
     }
diff --git a/services/apps/data_sink_worker/src/service/member.service.ts b/services/apps/data_sink_worker/src/service/member.service.ts
index 73970672f3..385674ab33 100644
--- a/services/apps/data_sink_worker/src/service/member.service.ts
+++ b/services/apps/data_sink_worker/src/service/member.service.ts
@@ -65,7 +65,7 @@ export default class MemberService extends LoggerBase {
     segmentIds: string[],
     integrationId: string,
     data: IMemberCreateData,
-    source: string,
+    platform: PlatformType,
     releaseMemberLock?: () => Promise,
   ): Promise {
     return logExecutionTimeV2(
@@ -89,8 +89,7 @@ export default class MemberService extends LoggerBase {
           let attributes: Record = {}
           if (data.attributes) {
             attributes = await logExecutionTimeV2(
-              () =>
-                memberAttributeService.validateAttributes(data.attributes, source as PlatformType),
+              () => memberAttributeService.validateAttributes(platform, data.attributes),
               this.log,
               'memberService -> create -> validateAttributes',
             )
@@ -190,7 +189,7 @@ export default class MemberService extends LoggerBase {
             if (data.organizations) {
               for (const org of data.organizations) {
                 const id = await logExecutionTimeV2(
-                  () => orgService.findOrCreate(source, integrationId, org),
+                  () => orgService.findOrCreate(platform, integrationId, org),
                   this.log,
                   'memberService -> create -> findOrCreateOrg',
                 )
@@ -265,7 +264,7 @@ export default class MemberService extends LoggerBase {
     data: IMemberUpdateData,
     original: IDbMember,
     originalIdentities: IMemberIdentity[],
-    source: string,
+    platform: PlatformType,
     releaseMemberLock?: () => Promise,
   ): Promise {
     await logExecutionTimeV2(
@@ -282,8 +281,7 @@ export default class MemberService extends LoggerBase {
           if (data.attributes) {
             this.log.trace({ memberId: id }, 'Validating member attributes!')
             data.attributes = await logExecutionTimeV2(
-              () =>
-                memberAttributeService.validateAttributes(data.attributes, source as PlatformType),
+              () => memberAttributeService.validateAttributes(platform, data.attributes),
               this.log,
               'memberService -> update -> validateAttributes',
             )
@@ -403,7 +401,7 @@ export default class MemberService extends LoggerBase {
                 this.log.trace({ memberId: id }, 'Finding or creating organization!')
 
                 const orgId = await logExecutionTimeV2(
-                  () => orgService.findOrCreate(source, integrationId, org),
+                  () => orgService.findOrCreate(platform, integrationId, org),
                   this.log,
                   'memberService -> update -> findOrCreateOrg',
                 )
diff --git a/services/apps/data_sink_worker/src/service/memberAttribute.service.ts b/services/apps/data_sink_worker/src/service/memberAttribute.service.ts
index c1ac384d0e..c6e9168978 100644
--- a/services/apps/data_sink_worker/src/service/memberAttribute.service.ts
+++ b/services/apps/data_sink_worker/src/service/memberAttribute.service.ts
@@ -26,8 +26,8 @@ export default class MemberAttributeService extends LoggerBase {
   }
 
   public async validateAttributes(
-    attributes: Record,
     platform: PlatformType,
+    attributes: Record,
   ): Promise> {
     const settings = await getMemberAttributeSettings(dbStoreQx(this.store), this.redis)
     const memberAttributeSettings = settings.reduce((acc, attribute) => {
diff --git a/services/libs/common/src/member.ts b/services/libs/common/src/member.ts
index e9253847a4..58d64a6b03 100644
--- a/services/libs/common/src/member.ts
+++ b/services/libs/common/src/member.ts
@@ -14,15 +14,17 @@ export async function setAttributesDefaultValues(
   for (const attributeName of Object.keys(attributes)) {
     if (typeof attributes[attributeName] === 'string') {
       // we try to fix it
-      try {
-        attributes[attributeName] = JSON.parse(attributes[attributeName] as string)
-      } catch (err) {
-        this.log.error(err, { attributeName }, 'Could not parse a string attribute value!')
-        throw err
-      }
+      attributes[attributeName] = JSON.parse(attributes[attributeName] as string)
     }
+
+    const nonEmptyPlatform = Object.keys(attributes[attributeName]).filter((p) => {
+      if (p === 'default') return false
+      const value = attributes[attributeName][p]
+      return value !== undefined && value !== null && String(value).trim().length > 0
+    })
+
     const highestPriorityPlatform = getHighestPriorityPlatformForAttributes(
-      Object.keys(attributes[attributeName]),
+      nonEmptyPlatform,
       priorities,
     )
 
@@ -31,7 +33,17 @@ export async function setAttributesDefaultValues(
       ;(attributes[attributeName] as any).default =
         attributes[attributeName][highestPriorityPlatform]
     } else {
-      delete attributes[attributeName]
+      // Only delete if there is no existing non-empty default value.
+      // An attribute with only a `default` key and no platform-specific keys
+      // has no source platform to derive from, but its value should be preserved.
+      const existingDefault = (attributes[attributeName] as any).default
+      if (
+        existingDefault === undefined ||
+        existingDefault === null ||
+        String(existingDefault).trim().length === 0
+      ) {
+        delete attributes[attributeName]
+      }
     }
   }
 