Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
3f76e55
fix: use platform instead of default for incorrectly formatted attrib…
themarolt Oct 3, 2025
df455de
chore: script to fix the corrupted attributes
themarolt Oct 3, 2025
62eb163
chore: fixed small issues
themarolt Oct 3, 2025
d343400
chore: fixed small issues
themarolt Oct 3, 2025
174826b
chore: fixed small issues
themarolt Oct 3, 2025
10cd7f2
chore: fixed small issues
themarolt Oct 3, 2025
01b07a5
chore: fixed small issues
themarolt Oct 3, 2025
cd2e527
chore: fixed small issues
themarolt Oct 3, 2025
31bfc8a
chore: fixed small issues
themarolt Oct 3, 2025
2ad563f
chore: fixed small issues
themarolt Oct 3, 2025
fd4f319
chore: fixed small issues
themarolt Oct 3, 2025
b7d09d8
chore: fixed small issues
themarolt Oct 3, 2025
eb16c58
chore: fixed small issues
themarolt Oct 3, 2025
c080a4d
chore: fixed small issues
themarolt Oct 3, 2025
6033af5
chore: fixed small issues
themarolt Oct 3, 2025
d99c730
chore: fixed small issues
themarolt Oct 3, 2025
6e6f8c7
chore: fixed small issues
themarolt Oct 3, 2025
8068e9b
chore: fixed small issues
themarolt Oct 3, 2025
7989f15
chore: fixed small issues
themarolt Oct 4, 2025
8c802ba
chore: fixed small issues
themarolt Oct 4, 2025
f1b502c
fix: lint
themarolt Oct 4, 2025
bd5d1ca
fix: include manuallyChangedFields
themarolt Mar 23, 2026
b444f81
fix: comments
themarolt Mar 23, 2026
6a1d5ec
fix: ignore default platform
themarolt Mar 24, 2026
b2520da
Merge branch 'main' into member-attribute-fix-CM-705
themarolt Mar 24, 2026
faeeceb
fix: bugfix
themarolt Mar 24, 2026
808425f
fix: bugfix
themarolt Mar 24, 2026
8775bd9
Merge branch 'main' into member-attribute-fix-CM-705
themarolt Mar 24, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
293 changes: 293 additions & 0 deletions services/apps/data_sink_worker/src/bin/fix-member-attributes.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,293 @@
import isEqual from 'lodash.isequal'
import mergeWith from 'lodash.mergewith'

import { connQx, updateMember } from '@crowd/data-access-layer'
import {
DbConnOrTx,
DbStore,
WRITE_DB_CONFIG,
getDbConnection,
} from '@crowd/data-access-layer/src/database'
import { getServiceLogger } from '@crowd/logging'
import { REDIS_CONFIG, getRedisClient } from '@crowd/redis'

import MemberAttributeService from '../service/memberAttribute.service'

/* eslint-disable @typescript-eslint/no-explicit-any */

const BATCH_SIZE = process.env.TEST_RUN ? 1 : 5000

const log = getServiceLogger()

async function getMemberIds(
db: DbConnOrTx,
lastId?: string,
): Promise<{ id: string; attributes: any; manuallyChangedFields: any }[]> {
try {
log.debug({ lastId }, 'Querying for members with attribute issues')

const results = await db.any(
`
with relevant_members as (with member_with_attributes as (select id,
"createdAt",
jsonb_object_keys(attributes) as attr_key,
attributes -> jsonb_object_keys(attributes) as attr_value
from members
where "deletedAt" is null
and attributes is not null
and attributes != 'null'::jsonb
and attributes != '{}'::jsonb)
select distinct id
from member_with_attributes
where jsonb_typeof(attr_value) = 'object'
and coalesce(attr_value ->> 'default', '') = ''
and exists (select 1
from jsonb_each_text(attr_value) as kv
where kv.key != 'default'
and coalesce(kv.value, '') != '')
${lastId ? `and id < '${lastId}'` : ''}
order by id desc
limit ${BATCH_SIZE})
select m.id, m.attributes, m."manuallyChangedFields"
from members m
inner join
relevant_members rm on rm.id = m.id
order by m.id desc;
`,
{ lastId },
)

log.debug(
{
resultCount: results.length,
lastId,
firstId: results.length > 0 ? results[0].id : null,
lastResultId: results.length > 0 ? results[results.length - 1].id : null,
},
'Query completed',
)

return results
} catch (error) {
log.error(
{
error: error.message,
lastId,
stack: error.stack,
},
'Failed to query member IDs',
)
throw error
}
}

setImmediate(async () => {
let dbClient: DbConnOrTx | undefined
let redisClient: any | undefined
let exitCode = 0

try {
log.info('Starting member attributes fix script')

// Initialize connections
log.info('Connecting to database...')
dbClient = await getDbConnection(WRITE_DB_CONFIG())
log.info('Database connection established')

log.info('Connecting to Redis...')
redisClient = await getRedisClient(REDIS_CONFIG())
log.info('Redis connection established')

const pgQx = connQx(dbClient)
const mas = new MemberAttributeService(redisClient, new DbStore(log, dbClient), log)

let totalProcessed = 0
let totalUpdated = 0
let batchNumber = 1

log.info('Starting to process members with attribute issues')
let membersToFix = await getMemberIds(dbClient)
log.info({ count: membersToFix.length }, 'Found members to process in first batch')

while (membersToFix.length > 0) {
log.info({ batchNumber, batchSize: membersToFix.length }, 'Processing batch')
let batchUpdated = 0

for (const data of membersToFix) {
try {
if (data.attributes) {
log.debug(
{ memberId: data.id, oldAttributes: data.attributes },
'Processing member attributes',
)

// check if any has default empty but other are full
let toProcess = false
for (const attName of Object.keys(data.attributes)) {
const defValue = data.attributes[attName].default

if (defValue === undefined || defValue === null || String(defValue) === '') {
log.debug(
{
memberId: data.id,
attribute: data.attributes[attName],
attName,
defValue: defValue ? String(defValue) : 'undefined',
},
'Attribute has default empty',
)
for (const platform of Object.keys(data.attributes[attName]).filter(
(p) => p !== 'default',
)) {
const value = data.attributes[attName][platform]

if (value !== undefined && value !== null && String(value) !== '') {
log.debug(
{ memberId: data.id, attName, platform, value },
'Found value for attribute',
)
toProcess = true
break
}
}

if (toProcess) {
break
}
}
}

if (toProcess) {
const oldAttributes = JSON.parse(JSON.stringify(data.attributes)) // Deep copy
data.attributes = await mas.setAttributesDefaultValues(data.attributes)

let attributes: Record<string, unknown> | undefined
const temp = mergeWith({}, oldAttributes, data.attributes)
const manuallyChangedFields: string[] = data.manuallyChangedFields || []

if (manuallyChangedFields.length > 0) {
log.warn(
{
memberId: data.id,
manuallyChangedFieldsCount: manuallyChangedFields.length,
},
'Member has manually changed fields',
)

const prefix = 'attributes.'
const manuallyChangedAttributes = [
...new Set(
manuallyChangedFields
.filter((f) => f.startsWith(prefix))
.map((f) => f.slice(prefix.length)),
),
]

log.warn(
{
memberId: data.id,
manuallyChangedAttributes,
},
'Preserving manually changed attributes',
)

// Preserve manually changed attributes
for (const key of manuallyChangedAttributes) {
if (oldAttributes?.[key] !== undefined) {
temp[key] = oldAttributes[key] // Fixed: removed .attributes
}
}
}

if (!isEqual(temp, oldAttributes)) {
attributes = temp
log.info({ memberId: data.id }, 'Attributes changed, will update')
} else {
log.debug(
{ memberId: data.id, newAttributes: temp, oldAttributes },
'No changes needed for attributes',
)
}

if (attributes) {
log.info({ memberId: data.id }, 'Updating member attributes')

if (!process.env.TEST_RUN) {
await updateMember(pgQx, data.id, { attributes } as any)
}

batchUpdated++
totalUpdated++
log.debug({ memberId: data.id }, 'Member attributes updated successfully')
}
} else {
log.debug(
{ memberId: data.id, attributes: data.attributes },
'No changes needed for attributes',
)
}
} else {
log.debug({ memberId: data.id }, 'Member has no attributes to process')
}

totalProcessed++
} catch (error) {
log.error(
{
error: error.message,
memberId: data.id,
stack: error.stack,
},
'Failed to process member',
)
// Continue processing other members
}
}

log.info(
{
batchNumber,
batchProcessed: membersToFix.length,
batchUpdated,
totalProcessed,
totalUpdated,
},
'Completed batch processing',
)

// Get next batch
const lastId = membersToFix[membersToFix.length - 1].id
log.debug({ lastId }, 'Fetching next batch starting from last ID')

if (process.env.TEST_RUN) {
break
}

membersToFix = await getMemberIds(dbClient, lastId)
log.info({ count: membersToFix.length }, 'Found members for next batch')

batchNumber++
}

log.info(
{
totalProcessed,
totalUpdated,
totalBatches: batchNumber - 1,
},
'Member attributes fix completed successfully',
)
} catch (error) {
log.error(
{
error: error.message,
stack: error.stack,
},
'Fatal error in member attributes fix script',
)
exitCode = 1
} finally {
log.info('Script execution completed')
process.exit(exitCode)
}
})
Original file line number Diff line number Diff line change
Expand Up @@ -1068,7 +1068,7 @@ export default class ActivityService extends LoggerBase {
segmentIds: Set<string>
resultIds: Set<string>
integrationId: string
platform: string
platform: PlatformType
username: string
timestamp: string
}
Expand Down
14 changes: 6 additions & 8 deletions services/apps/data_sink_worker/src/service/member.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ export default class MemberService extends LoggerBase {
segmentIds: string[],
integrationId: string,
data: IMemberCreateData,
source: string,
platform: PlatformType,
releaseMemberLock?: () => Promise<void>,
): Promise<string> {
return logExecutionTimeV2(
Expand All @@ -89,8 +89,7 @@ export default class MemberService extends LoggerBase {
let attributes: Record<string, unknown> = {}
if (data.attributes) {
attributes = await logExecutionTimeV2(
() =>
memberAttributeService.validateAttributes(data.attributes, source as PlatformType),
() => memberAttributeService.validateAttributes(platform, data.attributes),
this.log,
'memberService -> create -> validateAttributes',
)
Expand Down Expand Up @@ -190,7 +189,7 @@ export default class MemberService extends LoggerBase {
if (data.organizations) {
for (const org of data.organizations) {
const id = await logExecutionTimeV2(
() => orgService.findOrCreate(source, integrationId, org),
() => orgService.findOrCreate(platform, integrationId, org),
this.log,
'memberService -> create -> findOrCreateOrg',
)
Expand Down Expand Up @@ -265,7 +264,7 @@ export default class MemberService extends LoggerBase {
data: IMemberUpdateData,
original: IDbMember,
originalIdentities: IMemberIdentity[],
source: string,
platform: PlatformType,
releaseMemberLock?: () => Promise<void>,
): Promise<void> {
await logExecutionTimeV2(
Expand All @@ -282,8 +281,7 @@ export default class MemberService extends LoggerBase {
if (data.attributes) {
this.log.trace({ memberId: id }, 'Validating member attributes!')
data.attributes = await logExecutionTimeV2(
() =>
memberAttributeService.validateAttributes(data.attributes, source as PlatformType),
() => memberAttributeService.validateAttributes(platform, data.attributes),
this.log,
'memberService -> update -> validateAttributes',
)
Expand Down Expand Up @@ -403,7 +401,7 @@ export default class MemberService extends LoggerBase {
this.log.trace({ memberId: id }, 'Finding or creating organization!')

const orgId = await logExecutionTimeV2(
() => orgService.findOrCreate(source, integrationId, org),
() => orgService.findOrCreate(platform, integrationId, org),
this.log,
'memberService -> update -> findOrCreateOrg',
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ export default class MemberAttributeService extends LoggerBase {
}

public async validateAttributes(
attributes: Record<string, unknown>,
platform: PlatformType,
attributes: Record<string, unknown>,
): Promise<Record<string, unknown>> {
const settings = await getMemberAttributeSettings(dbStoreQx(this.store), this.redis)
const memberAttributeSettings = settings.reduce((acc, attribute) => {
Expand Down
Loading
Loading