From 0383a3f468c2380becb9f787ce4fff50db4ebde5 Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Tue, 10 Feb 2026 11:27:28 +0100 Subject: [PATCH 1/3] feat: add DAL Signed-off-by: Umberto Sgueglia --- .../evaluated-projects/evaluatedProjects.ts | 397 ++++++++++++++++++ .../src/evaluated-projects/index.ts | 2 + .../src/evaluated-projects/types.ts | 69 +++ services/libs/data-access-layer/src/index.ts | 2 + .../src/project-catalog/index.ts | 2 + .../src/project-catalog/projectCatalog.ts | 315 ++++++++++++++ .../src/project-catalog/types.ts | 23 + 7 files changed, 810 insertions(+) create mode 100644 services/libs/data-access-layer/src/evaluated-projects/evaluatedProjects.ts create mode 100644 services/libs/data-access-layer/src/evaluated-projects/index.ts create mode 100644 services/libs/data-access-layer/src/evaluated-projects/types.ts create mode 100644 services/libs/data-access-layer/src/project-catalog/index.ts create mode 100644 services/libs/data-access-layer/src/project-catalog/projectCatalog.ts create mode 100644 services/libs/data-access-layer/src/project-catalog/types.ts diff --git a/services/libs/data-access-layer/src/evaluated-projects/evaluatedProjects.ts b/services/libs/data-access-layer/src/evaluated-projects/evaluatedProjects.ts new file mode 100644 index 0000000000..caec6a72a1 --- /dev/null +++ b/services/libs/data-access-layer/src/evaluated-projects/evaluatedProjects.ts @@ -0,0 +1,397 @@ +import { QueryExecutor } from '../queryExecutor' +import { prepareSelectColumns } from '../utils' + +import { + EvaluationStatus, + IDbEvaluatedProject, + IDbEvaluatedProjectCreate, + IDbEvaluatedProjectUpdate, +} from './types' + +const EVALUATED_PROJECT_COLUMNS = [ + 'id', + 'projectCatalogId', + 'evaluationStatus', + 'evaluationScore', + 'evaluation', + 'evaluationReason', + 'evaluatedAt', + 'starsCount', + 'forksCount', + 'commitsCount', + 'pullRequestsCount', + 'issuesCount', + 'onboarded', + 'onboardedAt', + 'createdAt', + 'updatedAt', +] + +export async 
function findEvaluatedProjectById( + qx: QueryExecutor, + id: string, +): Promise { + return qx.selectOneOrNone( + ` + SELECT ${prepareSelectColumns(EVALUATED_PROJECT_COLUMNS)} + FROM "evaluatedProjects" + WHERE id = $(id) + `, + { id }, + ) +} + +export async function findEvaluatedProjectByProjectCatalogId( + qx: QueryExecutor, + projectCatalogId: string, +): Promise { + return qx.selectOneOrNone( + ` + SELECT ${prepareSelectColumns(EVALUATED_PROJECT_COLUMNS)} + FROM "evaluatedProjects" + WHERE "projectCatalogId" = $(projectCatalogId) + `, + { projectCatalogId }, + ) +} + +export async function findEvaluatedProjectsByStatus( + qx: QueryExecutor, + evaluationStatus: EvaluationStatus, + options: { limit?: number; offset?: number } = {}, +): Promise { + const { limit, offset } = options + + return qx.select( + ` + SELECT ${prepareSelectColumns(EVALUATED_PROJECT_COLUMNS)} + FROM "evaluatedProjects" + WHERE "evaluationStatus" = $(evaluationStatus) + ORDER BY "createdAt" ASC + ${limit !== undefined ? 'LIMIT $(limit)' : ''} + ${offset !== undefined ? 'OFFSET $(offset)' : ''} + `, + { evaluationStatus, limit, offset }, + ) +} + +export async function findAllEvaluatedProjects( + qx: QueryExecutor, + options: { limit?: number; offset?: number } = {}, +): Promise { + const { limit, offset } = options + + return qx.select( + ` + SELECT ${prepareSelectColumns(EVALUATED_PROJECT_COLUMNS)} + FROM "evaluatedProjects" + ORDER BY "createdAt" DESC + ${limit !== undefined ? 'LIMIT $(limit)' : ''} + ${offset !== undefined ? 'OFFSET $(offset)' : ''} + `, + { limit, offset }, + ) +} + +export async function countEvaluatedProjects( + qx: QueryExecutor, + evaluationStatus?: EvaluationStatus, +): Promise { + const statusFilter = evaluationStatus ? 
'WHERE "evaluationStatus" = $(evaluationStatus)' : '' + + const result = await qx.selectOne( + ` + SELECT COUNT(*) AS count + FROM "evaluatedProjects" + ${statusFilter} + `, + { evaluationStatus }, + ) + return parseInt(result.count, 10) +} + +export async function insertEvaluatedProject( + qx: QueryExecutor, + data: IDbEvaluatedProjectCreate, +): Promise { + return qx.selectOne( + ` + INSERT INTO "evaluatedProjects" ( + "projectCatalogId", + "evaluationStatus", + "evaluationScore", + evaluation, + "evaluationReason", + "starsCount", + "forksCount", + "commitsCount", + "pullRequestsCount", + "issuesCount", + "createdAt", + "updatedAt" + ) + VALUES ( + $(projectCatalogId), + $(evaluationStatus), + $(evaluationScore), + $(evaluation), + $(evaluationReason), + $(starsCount), + $(forksCount), + $(commitsCount), + $(pullRequestsCount), + $(issuesCount), + NOW(), + NOW() + ) + RETURNING ${prepareSelectColumns(EVALUATED_PROJECT_COLUMNS)} + `, + { + projectCatalogId: data.projectCatalogId, + evaluationStatus: data.evaluationStatus ?? 'pending', + evaluationScore: data.evaluationScore ?? null, + evaluation: data.evaluation ? JSON.stringify(data.evaluation) : null, + evaluationReason: data.evaluationReason ?? null, + starsCount: data.starsCount ?? null, + forksCount: data.forksCount ?? null, + commitsCount: data.commitsCount ?? null, + pullRequestsCount: data.pullRequestsCount ?? null, + issuesCount: data.issuesCount ?? null, + }, + ) +} + +export async function bulkInsertEvaluatedProjects( + qx: QueryExecutor, + items: IDbEvaluatedProjectCreate[], +): Promise { + if (items.length === 0) { + return + } + + const values = items.map((item) => ({ + projectCatalogId: item.projectCatalogId, + evaluationStatus: item.evaluationStatus ?? 'pending', + evaluationScore: item.evaluationScore ?? null, + evaluation: item.evaluation ? JSON.stringify(item.evaluation) : null, + evaluationReason: item.evaluationReason ?? null, + starsCount: item.starsCount ?? 
null, + forksCount: item.forksCount ?? null, + commitsCount: item.commitsCount ?? null, + pullRequestsCount: item.pullRequestsCount ?? null, + issuesCount: item.issuesCount ?? null, + })) + + await qx.result( + ` + INSERT INTO "evaluatedProjects" ( + "projectCatalogId", + "evaluationStatus", + "evaluationScore", + evaluation, + "evaluationReason", + "starsCount", + "forksCount", + "commitsCount", + "pullRequestsCount", + "issuesCount", + "createdAt", + "updatedAt" + ) + SELECT + v."projectCatalogId"::uuid, + v."evaluationStatus", + v."evaluationScore"::double precision, + v.evaluation::jsonb, + v."evaluationReason", + v."starsCount"::integer, + v."forksCount"::integer, + v."commitsCount"::integer, + v."pullRequestsCount"::integer, + v."issuesCount"::integer, + NOW(), + NOW() + FROM jsonb_to_recordset($(values)::jsonb) AS v( + "projectCatalogId" text, + "evaluationStatus" text, + "evaluationScore" double precision, + evaluation jsonb, + "evaluationReason" text, + "starsCount" integer, + "forksCount" integer, + "commitsCount" integer, + "pullRequestsCount" integer, + "issuesCount" integer + ) + `, + { values: JSON.stringify(values) }, + ) +} + +export async function updateEvaluatedProject( + qx: QueryExecutor, + id: string, + data: IDbEvaluatedProjectUpdate, +): Promise { + const setClauses: string[] = [] + const params: Record = { id } + + if (data.evaluationStatus !== undefined) { + setClauses.push('"evaluationStatus" = $(evaluationStatus)') + params.evaluationStatus = data.evaluationStatus + } + if (data.evaluationScore !== undefined) { + setClauses.push('"evaluationScore" = $(evaluationScore)') + params.evaluationScore = data.evaluationScore + } + if (data.evaluation !== undefined) { + setClauses.push('evaluation = $(evaluation)') + params.evaluation = data.evaluation ? 
JSON.stringify(data.evaluation) : null + } + if (data.evaluationReason !== undefined) { + setClauses.push('"evaluationReason" = $(evaluationReason)') + params.evaluationReason = data.evaluationReason + } + if (data.evaluatedAt !== undefined) { + setClauses.push('"evaluatedAt" = $(evaluatedAt)') + params.evaluatedAt = data.evaluatedAt + } + if (data.starsCount !== undefined) { + setClauses.push('"starsCount" = $(starsCount)') + params.starsCount = data.starsCount + } + if (data.forksCount !== undefined) { + setClauses.push('"forksCount" = $(forksCount)') + params.forksCount = data.forksCount + } + if (data.commitsCount !== undefined) { + setClauses.push('"commitsCount" = $(commitsCount)') + params.commitsCount = data.commitsCount + } + if (data.pullRequestsCount !== undefined) { + setClauses.push('"pullRequestsCount" = $(pullRequestsCount)') + params.pullRequestsCount = data.pullRequestsCount + } + if (data.issuesCount !== undefined) { + setClauses.push('"issuesCount" = $(issuesCount)') + params.issuesCount = data.issuesCount + } + if (data.onboarded !== undefined) { + setClauses.push('onboarded = $(onboarded)') + params.onboarded = data.onboarded + } + if (data.onboardedAt !== undefined) { + setClauses.push('"onboardedAt" = $(onboardedAt)') + params.onboardedAt = data.onboardedAt + } + + if (setClauses.length === 0) { + return findEvaluatedProjectById(qx, id) + } + + return qx.selectOneOrNone( + ` + UPDATE "evaluatedProjects" + SET + ${setClauses.join(',\n ')}, + "updatedAt" = NOW() + WHERE id = $(id) + RETURNING ${prepareSelectColumns(EVALUATED_PROJECT_COLUMNS)} + `, + params, + ) +} + +export async function markEvaluatedProjectAsEvaluated( + qx: QueryExecutor, + id: string, + data: { + evaluationScore: number + evaluation: Record + evaluationReason?: string + }, +): Promise { + return qx.selectOneOrNone( + ` + UPDATE "evaluatedProjects" + SET + "evaluationStatus" = 'evaluated', + "evaluationScore" = $(evaluationScore), + evaluation = $(evaluation), + 
"evaluationReason" = $(evaluationReason), + "evaluatedAt" = NOW(), + "updatedAt" = NOW() + WHERE id = $(id) + RETURNING ${prepareSelectColumns(EVALUATED_PROJECT_COLUMNS)} + `, + { + id, + evaluationScore: data.evaluationScore, + evaluation: JSON.stringify(data.evaluation), + evaluationReason: data.evaluationReason ?? null, + }, + ) +} + +export async function markEvaluatedProjectAsOnboarded( + qx: QueryExecutor, + id: string, +): Promise { + await qx.selectNone( + ` + UPDATE "evaluatedProjects" + SET + onboarded = true, + "onboardedAt" = NOW(), + "updatedAt" = NOW() + WHERE id = $(id) + `, + { id }, + ) +} + +export async function deleteEvaluatedProject(qx: QueryExecutor, id: string): Promise { + return qx.result( + ` + DELETE FROM "evaluatedProjects" + WHERE id = $(id) + `, + { id }, + ) +} + +export async function deleteEvaluatedProjectByProjectCatalogId( + qx: QueryExecutor, + projectCatalogId: string, +): Promise { + return qx.result( + ` + DELETE FROM "evaluatedProjects" + WHERE "projectCatalogId" = $(projectCatalogId) + `, + { projectCatalogId }, + ) +} + +export async function findPendingEvaluatedProjectsWithCatalog( + qx: QueryExecutor, + options: { limit?: number } = {}, +): Promise<(IDbEvaluatedProject & { projectSlug: string; repoName: string; repoUrl: string })[]> { + const { limit } = options + + return qx.select( + ` + SELECT + ${prepareSelectColumns(EVALUATED_PROJECT_COLUMNS, 'ep')}, + pc."projectSlug", + pc."repoName", + pc."repoUrl" + FROM "evaluatedProjects" ep + JOIN "projectCatalog" pc ON pc.id = ep."projectCatalogId" + WHERE ep."evaluationStatus" = 'pending' + ORDER BY ep."createdAt" ASC + ${limit !== undefined ? 
'LIMIT $(limit)' : ''} + `, + { limit }, + ) +} diff --git a/services/libs/data-access-layer/src/evaluated-projects/index.ts b/services/libs/data-access-layer/src/evaluated-projects/index.ts new file mode 100644 index 0000000000..7a4064eec2 --- /dev/null +++ b/services/libs/data-access-layer/src/evaluated-projects/index.ts @@ -0,0 +1,2 @@ +export * from './types' +export * from './evaluatedProjects' diff --git a/services/libs/data-access-layer/src/evaluated-projects/types.ts b/services/libs/data-access-layer/src/evaluated-projects/types.ts new file mode 100644 index 0000000000..f8661f47a2 --- /dev/null +++ b/services/libs/data-access-layer/src/evaluated-projects/types.ts @@ -0,0 +1,69 @@ +export type EvaluationStatus = 'pending' | 'evaluating' | 'evaluated' | 'failed' + +export interface IDbEvaluatedProject { + id: string + projectCatalogId: string + evaluationStatus: EvaluationStatus + evaluationScore: number | null + evaluation: Record | null + evaluationReason: string | null + evaluatedAt: string | null + starsCount: number | null + forksCount: number | null + commitsCount: number | null + pullRequestsCount: number | null + issuesCount: number | null + onboarded: boolean + onboardedAt: string | null + createdAt: string | null + updatedAt: string | null +} + +type EvaluatedProjectWritable = Pick< + IDbEvaluatedProject, + | 'projectCatalogId' + | 'evaluationStatus' + | 'evaluationScore' + | 'evaluation' + | 'evaluationReason' + | 'evaluatedAt' + | 'starsCount' + | 'forksCount' + | 'commitsCount' + | 'pullRequestsCount' + | 'issuesCount' + | 'onboarded' + | 'onboardedAt' +> + +export type IDbEvaluatedProjectCreate = Omit & { + projectCatalogId: string +} & { + evaluationStatus?: EvaluationStatus + evaluationScore?: number + evaluation?: Record + evaluationReason?: string + evaluatedAt?: string + starsCount?: number + forksCount?: number + commitsCount?: number + pullRequestsCount?: number + issuesCount?: number + onboarded?: boolean + onboardedAt?: string +} + 
+export type IDbEvaluatedProjectUpdate = Partial<{ + evaluationStatus: EvaluationStatus + evaluationScore: number + evaluation: Record + evaluationReason: string + evaluatedAt: string + starsCount: number + forksCount: number + commitsCount: number + pullRequestsCount: number + issuesCount: number + onboarded: boolean + onboardedAt: string +}> diff --git a/services/libs/data-access-layer/src/index.ts b/services/libs/data-access-layer/src/index.ts index 639f0547b8..5ef4749d79 100644 --- a/services/libs/data-access-layer/src/index.ts +++ b/services/libs/data-access-layer/src/index.ts @@ -13,3 +13,5 @@ export * from './systemSettings' export * from './integrations' export * from './auditLogs' export * from './maintainers' +export * from './project-catalog' +export * from './evaluated-projects' diff --git a/services/libs/data-access-layer/src/project-catalog/index.ts b/services/libs/data-access-layer/src/project-catalog/index.ts new file mode 100644 index 0000000000..af7ef7faa1 --- /dev/null +++ b/services/libs/data-access-layer/src/project-catalog/index.ts @@ -0,0 +1,2 @@ +export * from './types' +export * from './projectCatalog' diff --git a/services/libs/data-access-layer/src/project-catalog/projectCatalog.ts b/services/libs/data-access-layer/src/project-catalog/projectCatalog.ts new file mode 100644 index 0000000000..2e3b409579 --- /dev/null +++ b/services/libs/data-access-layer/src/project-catalog/projectCatalog.ts @@ -0,0 +1,315 @@ +import { QueryExecutor } from '../queryExecutor' +import { prepareSelectColumns } from '../utils' + +import { IDbProjectCatalog, IDbProjectCatalogCreate, IDbProjectCatalogUpdate } from './types' + +const PROJECT_CATALOG_COLUMNS = [ + 'id', + 'projectSlug', + 'repoName', + 'repoUrl', + 'criticalityScore', + 'syncedAt', + 'createdAt', + 'updatedAt', +] + +export async function findProjectCatalogById( + qx: QueryExecutor, + id: string, +): Promise { + return qx.selectOneOrNone( + ` + SELECT 
${prepareSelectColumns(PROJECT_CATALOG_COLUMNS)} + FROM "projectCatalog" + WHERE id = $(id) + `, + { id }, + ) +} + +export async function findProjectCatalogByRepoUrl( + qx: QueryExecutor, + repoUrl: string, +): Promise { + return qx.selectOneOrNone( + ` + SELECT ${prepareSelectColumns(PROJECT_CATALOG_COLUMNS)} + FROM "projectCatalog" + WHERE "repoUrl" = $(repoUrl) + `, + { repoUrl }, + ) +} + +export async function findProjectCatalogBySlug( + qx: QueryExecutor, + projectSlug: string, +): Promise { + return qx.select( + ` + SELECT ${prepareSelectColumns(PROJECT_CATALOG_COLUMNS)} + FROM "projectCatalog" + WHERE "projectSlug" = $(projectSlug) + ORDER BY "createdAt" DESC + `, + { projectSlug }, + ) +} + +export async function findAllProjectCatalog( + qx: QueryExecutor, + options: { limit?: number; offset?: number } = {}, +): Promise { + const { limit, offset } = options + + return qx.select( + ` + SELECT ${prepareSelectColumns(PROJECT_CATALOG_COLUMNS)} + FROM "projectCatalog" + ORDER BY "createdAt" DESC + ${limit !== undefined ? 'LIMIT $(limit)' : ''} + ${offset !== undefined ? 'OFFSET $(offset)' : ''} + `, + { limit, offset }, + ) +} + +export async function countProjectCatalog(qx: QueryExecutor): Promise { + const result = await qx.selectOne( + ` + SELECT COUNT(*) AS count + FROM "projectCatalog" + `, + ) + return parseInt(result.count, 10) +} + +export async function insertProjectCatalog( + qx: QueryExecutor, + data: IDbProjectCatalogCreate, +): Promise { + return qx.selectOne( + ` + INSERT INTO "projectCatalog" ( + "projectSlug", + "repoName", + "repoUrl", + "criticalityScore", + "createdAt", + "updatedAt" + ) + VALUES ( + $(projectSlug), + $(repoName), + $(repoUrl), + $(criticalityScore), + NOW(), + NOW() + ) + RETURNING ${prepareSelectColumns(PROJECT_CATALOG_COLUMNS)} + `, + { + projectSlug: data.projectSlug, + repoName: data.repoName, + repoUrl: data.repoUrl, + criticalityScore: data.criticalityScore ?? 
null, + }, + ) +} + +export async function bulkInsertProjectCatalog( + qx: QueryExecutor, + items: IDbProjectCatalogCreate[], +): Promise { + if (items.length === 0) { + return + } + + const values = items.map((item) => ({ + projectSlug: item.projectSlug, + repoName: item.repoName, + repoUrl: item.repoUrl, + criticalityScore: item.criticalityScore ?? null, + })) + + await qx.result( + ` + INSERT INTO "projectCatalog" ( + "projectSlug", + "repoName", + "repoUrl", + "criticalityScore", + "createdAt", + "updatedAt" + ) + SELECT + v."projectSlug", + v."repoName", + v."repoUrl", + v."criticalityScore"::double precision, + NOW(), + NOW() + FROM jsonb_to_recordset($(values)::jsonb) AS v( + "projectSlug" text, + "repoName" text, + "repoUrl" text, + "criticalityScore" double precision + ) + `, + { values: JSON.stringify(values) }, + ) +} + +export async function upsertProjectCatalog( + qx: QueryExecutor, + data: IDbProjectCatalogCreate, +): Promise { + return qx.selectOne( + ` + INSERT INTO "projectCatalog" ( + "projectSlug", + "repoName", + "repoUrl", + "criticalityScore", + "createdAt", + "updatedAt" + ) + VALUES ( + $(projectSlug), + $(repoName), + $(repoUrl), + $(criticalityScore), + NOW(), + NOW() + ) + ON CONFLICT ("repoUrl") DO UPDATE SET + "projectSlug" = EXCLUDED."projectSlug", + "repoName" = EXCLUDED."repoName", + "criticalityScore" = EXCLUDED."criticalityScore", + "updatedAt" = NOW() + RETURNING ${prepareSelectColumns(PROJECT_CATALOG_COLUMNS)} + `, + { + projectSlug: data.projectSlug, + repoName: data.repoName, + repoUrl: data.repoUrl, + criticalityScore: data.criticalityScore ?? null, + }, + ) +} + +export async function bulkUpsertProjectCatalog( + qx: QueryExecutor, + items: IDbProjectCatalogCreate[], +): Promise { + if (items.length === 0) { + return + } + + const values = items.map((item) => ({ + projectSlug: item.projectSlug, + repoName: item.repoName, + repoUrl: item.repoUrl, + criticalityScore: item.criticalityScore ?? 
null, + })) + + await qx.result( + ` + INSERT INTO "projectCatalog" ( + "projectSlug", + "repoName", + "repoUrl", + "criticalityScore", + "createdAt", + "updatedAt" + ) + SELECT + v."projectSlug", + v."repoName", + v."repoUrl", + v."criticalityScore"::double precision, + NOW(), + NOW() + FROM jsonb_to_recordset($(values)::jsonb) AS v( + "projectSlug" text, + "repoName" text, + "repoUrl" text, + "criticalityScore" double precision + ) + ON CONFLICT ("repoUrl") DO UPDATE SET + "projectSlug" = EXCLUDED."projectSlug", + "repoName" = EXCLUDED."repoName", + "criticalityScore" = EXCLUDED."criticalityScore", + "updatedAt" = NOW() + `, + { values: JSON.stringify(values) }, + ) +} + +export async function updateProjectCatalog( + qx: QueryExecutor, + id: string, + data: IDbProjectCatalogUpdate, +): Promise { + const setClauses: string[] = [] + const params: Record = { id } + + if (data.projectSlug !== undefined) { + setClauses.push('"projectSlug" = $(projectSlug)') + params.projectSlug = data.projectSlug + } + if (data.repoName !== undefined) { + setClauses.push('"repoName" = $(repoName)') + params.repoName = data.repoName + } + if (data.repoUrl !== undefined) { + setClauses.push('"repoUrl" = $(repoUrl)') + params.repoUrl = data.repoUrl + } + if (data.criticalityScore !== undefined) { + setClauses.push('"criticalityScore" = $(criticalityScore)') + params.criticalityScore = data.criticalityScore + } + if (data.syncedAt !== undefined) { + setClauses.push('"syncedAt" = $(syncedAt)') + params.syncedAt = data.syncedAt + } + + if (setClauses.length === 0) { + return findProjectCatalogById(qx, id) + } + + return qx.selectOneOrNone( + ` + UPDATE "projectCatalog" + SET + ${setClauses.join(',\n ')}, + "updatedAt" = NOW() + WHERE id = $(id) + RETURNING ${prepareSelectColumns(PROJECT_CATALOG_COLUMNS)} + `, + params, + ) +} + +export async function updateProjectCatalogSyncedAt(qx: QueryExecutor, id: string): Promise { + await qx.selectNone( + ` + UPDATE "projectCatalog" + SET "syncedAt" = 
NOW(), "updatedAt" = NOW() + WHERE id = $(id) + `, + { id }, + ) +} + +export async function deleteProjectCatalog(qx: QueryExecutor, id: string): Promise { + return qx.result( + ` + DELETE FROM "projectCatalog" + WHERE id = $(id) + `, + { id }, + ) +} diff --git a/services/libs/data-access-layer/src/project-catalog/types.ts b/services/libs/data-access-layer/src/project-catalog/types.ts new file mode 100644 index 0000000000..382527f57f --- /dev/null +++ b/services/libs/data-access-layer/src/project-catalog/types.ts @@ -0,0 +1,23 @@ +export interface IDbProjectCatalog { + id: string + projectSlug: string + repoName: string + repoUrl: string + criticalityScore: number | null + syncedAt: string | null + createdAt: string | null + updatedAt: string | null +} + +type ProjectCatalogWritable = Pick< + IDbProjectCatalog, + 'projectSlug' | 'repoName' | 'repoUrl' | 'criticalityScore' +> + +export type IDbProjectCatalogCreate = Omit & { + criticalityScore?: number +} + +export type IDbProjectCatalogUpdate = Partial & { + syncedAt?: string +} From 5dede3de4aa687cf7b91a92ee97a8a265a9904b8 Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Thu, 26 Mar 2026 09:25:39 +0100 Subject: [PATCH 2/3] fix: updated the types as the current db Signed-off-by: Umberto Sgueglia --- .../evaluated-projects/evaluatedProjects.ts | 2 +- .../src/evaluated-projects/types.ts | 45 ++++---------- .../src/project-catalog/projectCatalog.ts | 61 +++++++++++++------ .../src/project-catalog/types.ts | 13 ++-- 4 files changed, 64 insertions(+), 57 deletions(-) diff --git a/services/libs/data-access-layer/src/evaluated-projects/evaluatedProjects.ts b/services/libs/data-access-layer/src/evaluated-projects/evaluatedProjects.ts index caec6a72a1..2e078beb0d 100644 --- a/services/libs/data-access-layer/src/evaluated-projects/evaluatedProjects.ts +++ b/services/libs/data-access-layer/src/evaluated-projects/evaluatedProjects.ts @@ -173,7 +173,7 @@ export async function bulkInsertEvaluatedProjects( projectCatalogId: 
item.projectCatalogId, evaluationStatus: item.evaluationStatus ?? 'pending', evaluationScore: item.evaluationScore ?? null, - evaluation: item.evaluation ? JSON.stringify(item.evaluation) : null, + evaluation: item.evaluation ?? null, evaluationReason: item.evaluationReason ?? null, starsCount: item.starsCount ?? null, forksCount: item.forksCount ?? null, diff --git a/services/libs/data-access-layer/src/evaluated-projects/types.ts b/services/libs/data-access-layer/src/evaluated-projects/types.ts index f8661f47a2..bb11eb5d65 100644 --- a/services/libs/data-access-layer/src/evaluated-projects/types.ts +++ b/services/libs/data-access-layer/src/evaluated-projects/types.ts @@ -19,51 +19,32 @@ export interface IDbEvaluatedProject { updatedAt: string | null } -type EvaluatedProjectWritable = Pick< - IDbEvaluatedProject, - | 'projectCatalogId' - | 'evaluationStatus' - | 'evaluationScore' - | 'evaluation' - | 'evaluationReason' - | 'evaluatedAt' - | 'starsCount' - | 'forksCount' - | 'commitsCount' - | 'pullRequestsCount' - | 'issuesCount' - | 'onboarded' - | 'onboardedAt' -> - -export type IDbEvaluatedProjectCreate = Omit & { +// onboarded/onboardedAt/evaluatedAt are excluded: they are managed by dedicated helpers +// (markEvaluatedProjectAsEvaluated, markEvaluatedProjectAsOnboarded) and never written on insert. 
+export type IDbEvaluatedProjectCreate = { projectCatalogId: string -} & { evaluationStatus?: EvaluationStatus evaluationScore?: number evaluation?: Record evaluationReason?: string - evaluatedAt?: string starsCount?: number forksCount?: number commitsCount?: number pullRequestsCount?: number issuesCount?: number - onboarded?: boolean - onboardedAt?: string } export type IDbEvaluatedProjectUpdate = Partial<{ evaluationStatus: EvaluationStatus - evaluationScore: number - evaluation: Record - evaluationReason: string - evaluatedAt: string - starsCount: number - forksCount: number - commitsCount: number - pullRequestsCount: number - issuesCount: number + evaluationScore: number | null + evaluation: Record | null + evaluationReason: string | null + evaluatedAt: string | null + starsCount: number | null + forksCount: number | null + commitsCount: number | null + pullRequestsCount: number | null + issuesCount: number | null onboarded: boolean - onboardedAt: string + onboardedAt: string | null }> diff --git a/services/libs/data-access-layer/src/project-catalog/projectCatalog.ts b/services/libs/data-access-layer/src/project-catalog/projectCatalog.ts index 2e3b409579..5b94e2b8bc 100644 --- a/services/libs/data-access-layer/src/project-catalog/projectCatalog.ts +++ b/services/libs/data-access-layer/src/project-catalog/projectCatalog.ts @@ -8,7 +8,8 @@ const PROJECT_CATALOG_COLUMNS = [ 'projectSlug', 'repoName', 'repoUrl', - 'criticalityScore', + 'ossfCriticalityScore', + 'lfCriticalityScore', 'syncedAt', 'createdAt', 'updatedAt', @@ -95,7 +96,8 @@ export async function insertProjectCatalog( "projectSlug", "repoName", "repoUrl", - "criticalityScore", + "ossfCriticalityScore", + "lfCriticalityScore", "createdAt", "updatedAt" ) @@ -103,7 +105,8 @@ export async function insertProjectCatalog( $(projectSlug), $(repoName), $(repoUrl), - $(criticalityScore), + $(ossfCriticalityScore), + $(lfCriticalityScore), NOW(), NOW() ) @@ -113,7 +116,8 @@ export async function 
insertProjectCatalog( projectSlug: data.projectSlug, repoName: data.repoName, repoUrl: data.repoUrl, - criticalityScore: data.criticalityScore ?? null, + ossfCriticalityScore: data.ossfCriticalityScore ?? null, + lfCriticalityScore: data.lfCriticalityScore ?? null, }, ) } @@ -130,7 +134,8 @@ export async function bulkInsertProjectCatalog( projectSlug: item.projectSlug, repoName: item.repoName, repoUrl: item.repoUrl, - criticalityScore: item.criticalityScore ?? null, + ossfCriticalityScore: item.ossfCriticalityScore ?? null, + lfCriticalityScore: item.lfCriticalityScore ?? null, })) await qx.result( @@ -139,7 +144,8 @@ export async function bulkInsertProjectCatalog( "projectSlug", "repoName", "repoUrl", - "criticalityScore", + "ossfCriticalityScore", + "lfCriticalityScore", "createdAt", "updatedAt" ) @@ -147,14 +153,16 @@ export async function bulkInsertProjectCatalog( v."projectSlug", v."repoName", v."repoUrl", - v."criticalityScore"::double precision, + v."ossfCriticalityScore"::double precision, + v."lfCriticalityScore"::double precision, NOW(), NOW() FROM jsonb_to_recordset($(values)::jsonb) AS v( "projectSlug" text, "repoName" text, "repoUrl" text, - "criticalityScore" double precision + "ossfCriticalityScore" double precision, + "lfCriticalityScore" double precision ) `, { values: JSON.stringify(values) }, @@ -171,7 +179,8 @@ export async function upsertProjectCatalog( "projectSlug", "repoName", "repoUrl", - "criticalityScore", + "ossfCriticalityScore", + "lfCriticalityScore", "createdAt", "updatedAt" ) @@ -179,14 +188,16 @@ export async function upsertProjectCatalog( $(projectSlug), $(repoName), $(repoUrl), - $(criticalityScore), + $(ossfCriticalityScore), + $(lfCriticalityScore), NOW(), NOW() ) ON CONFLICT ("repoUrl") DO UPDATE SET "projectSlug" = EXCLUDED."projectSlug", "repoName" = EXCLUDED."repoName", - "criticalityScore" = EXCLUDED."criticalityScore", + "ossfCriticalityScore" = EXCLUDED."ossfCriticalityScore", + "lfCriticalityScore" = 
EXCLUDED."lfCriticalityScore", "updatedAt" = NOW() RETURNING ${prepareSelectColumns(PROJECT_CATALOG_COLUMNS)} `, @@ -194,7 +205,8 @@ export async function upsertProjectCatalog( projectSlug: data.projectSlug, repoName: data.repoName, repoUrl: data.repoUrl, - criticalityScore: data.criticalityScore ?? null, + ossfCriticalityScore: data.ossfCriticalityScore ?? null, + lfCriticalityScore: data.lfCriticalityScore ?? null, }, ) } @@ -211,7 +223,8 @@ export async function bulkUpsertProjectCatalog( projectSlug: item.projectSlug, repoName: item.repoName, repoUrl: item.repoUrl, - criticalityScore: item.criticalityScore ?? null, + ossfCriticalityScore: item.ossfCriticalityScore ?? null, + lfCriticalityScore: item.lfCriticalityScore ?? null, })) await qx.result( @@ -220,7 +233,8 @@ export async function bulkUpsertProjectCatalog( "projectSlug", "repoName", "repoUrl", - "criticalityScore", + "ossfCriticalityScore", + "lfCriticalityScore", "createdAt", "updatedAt" ) @@ -228,19 +242,22 @@ export async function bulkUpsertProjectCatalog( v."projectSlug", v."repoName", v."repoUrl", - v."criticalityScore"::double precision, + v."ossfCriticalityScore"::double precision, + v."lfCriticalityScore"::double precision, NOW(), NOW() FROM jsonb_to_recordset($(values)::jsonb) AS v( "projectSlug" text, "repoName" text, "repoUrl" text, - "criticalityScore" double precision + "ossfCriticalityScore" double precision, + "lfCriticalityScore" double precision ) ON CONFLICT ("repoUrl") DO UPDATE SET "projectSlug" = EXCLUDED."projectSlug", "repoName" = EXCLUDED."repoName", - "criticalityScore" = EXCLUDED."criticalityScore", + "ossfCriticalityScore" = EXCLUDED."ossfCriticalityScore", + "lfCriticalityScore" = EXCLUDED."lfCriticalityScore", "updatedAt" = NOW() `, { values: JSON.stringify(values) }, @@ -267,9 +284,13 @@ export async function updateProjectCatalog( setClauses.push('"repoUrl" = $(repoUrl)') params.repoUrl = data.repoUrl } - if (data.criticalityScore !== undefined) { - 
setClauses.push('"criticalityScore" = $(criticalityScore)') - params.criticalityScore = data.criticalityScore + if (data.ossfCriticalityScore !== undefined) { + setClauses.push('"ossfCriticalityScore" = $(ossfCriticalityScore)') + params.ossfCriticalityScore = data.ossfCriticalityScore + } + if (data.lfCriticalityScore !== undefined) { + setClauses.push('"lfCriticalityScore" = $(lfCriticalityScore)') + params.lfCriticalityScore = data.lfCriticalityScore } if (data.syncedAt !== undefined) { setClauses.push('"syncedAt" = $(syncedAt)') diff --git a/services/libs/data-access-layer/src/project-catalog/types.ts b/services/libs/data-access-layer/src/project-catalog/types.ts index 382527f57f..8cbb39a310 100644 --- a/services/libs/data-access-layer/src/project-catalog/types.ts +++ b/services/libs/data-access-layer/src/project-catalog/types.ts @@ -3,7 +3,8 @@ export interface IDbProjectCatalog { projectSlug: string repoName: string repoUrl: string - criticalityScore: number | null + ossfCriticalityScore: number | null + lfCriticalityScore: number | null syncedAt: string | null createdAt: string | null updatedAt: string | null @@ -11,11 +12,15 @@ export interface IDbProjectCatalog { type ProjectCatalogWritable = Pick< IDbProjectCatalog, - 'projectSlug' | 'repoName' | 'repoUrl' | 'criticalityScore' + 'projectSlug' | 'repoName' | 'repoUrl' | 'ossfCriticalityScore' | 'lfCriticalityScore' > -export type IDbProjectCatalogCreate = Omit & { - criticalityScore?: number +export type IDbProjectCatalogCreate = Omit< + ProjectCatalogWritable, + 'ossfCriticalityScore' | 'lfCriticalityScore' +> & { + ossfCriticalityScore?: number + lfCriticalityScore?: number } export type IDbProjectCatalogUpdate = Partial & { From e9852f171d41b3b7de726c6868929ed097dcbc82 Mon Sep 17 00:00:00 2001 From: Umberto Sgueglia Date: Thu, 26 Mar 2026 10:15:29 +0100 Subject: [PATCH 3/3] feat: add ossf data fetcher (CM-952) (#3839) Signed-off-by: Umberto Sgueglia --- pnpm-lock.yaml | 3 + .../README.md | 73 +++++++ 
.../package.json | 3 +- .../src/activities.ts | 4 +- .../src/activities/activities.ts | 121 ++++++++++- .../src/main.ts | 2 +- .../schedules/scheduleProjectsDiscovery.ts | 11 +- .../sources/lf-criticality-score/source.ts | 188 ++++++++++++++++++ .../ossf-criticality-score/bucketClient.ts | 96 +++++++++ .../sources/ossf-criticality-score/source.ts | 75 +++++++ .../src/sources/registry.ts | 21 ++ .../src/sources/types.ts | 27 +++ .../src/workflows/discoverProjects.ts | 47 ++++- .../src/project-catalog/projectCatalog.ts | 8 +- 14 files changed, 659 insertions(+), 20 deletions(-) create mode 100644 services/apps/automatic_projects_discovery_worker/README.md create mode 100644 services/apps/automatic_projects_discovery_worker/src/sources/lf-criticality-score/source.ts create mode 100644 services/apps/automatic_projects_discovery_worker/src/sources/ossf-criticality-score/bucketClient.ts create mode 100644 services/apps/automatic_projects_discovery_worker/src/sources/ossf-criticality-score/source.ts create mode 100644 services/apps/automatic_projects_discovery_worker/src/sources/registry.ts create mode 100644 services/apps/automatic_projects_discovery_worker/src/sources/types.ts diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index ef062f9f07..616fea2b48 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -566,6 +566,9 @@ importers: '@temporalio/workflow': specifier: ~1.11.8 version: 1.11.8 + csv-parse: + specifier: ^5.5.6 + version: 5.5.6 tsx: specifier: ^4.7.1 version: 4.7.3 diff --git a/services/apps/automatic_projects_discovery_worker/README.md b/services/apps/automatic_projects_discovery_worker/README.md new file mode 100644 index 0000000000..77623513cc --- /dev/null +++ b/services/apps/automatic_projects_discovery_worker/README.md @@ -0,0 +1,73 @@ +# Automatic Projects Discovery Worker + +Temporal worker that discovers open-source projects from external data sources and writes them to the `projectCatalog` table. 
+
+## Architecture
+
+### Source abstraction
+
+Every data source implements the `IDiscoverySource` interface (`src/sources/types.ts`):
+
+| Method                        | Purpose                                                                       |
+| ----------------------------- | ----------------------------------------------------------------------------- |
+| `listAvailableDatasets()`     | Returns available dataset snapshots, sorted newest-first                      |
+| `fetchDatasetStream(dataset)` | Returns a readable stream for the dataset (e.g. HTTP response)                |
+| `parseRow(rawRow)`            | Converts a raw CSV/JSON row into an `IDiscoverySourceRow`, or `null` to skip  |
+
+Sources are registered in `src/sources/registry.ts` as a flat list of source instances, looked up by name.
+
+**To add a new source:** create a class implementing `IDiscoverySource`, then add one line to the registry.
+
+### Current sources
+
+| Name                     | Folder                                | Description                                                                          |
+| ------------------------ | ------------------------------------- | ------------------------------------------------------------------------------------ |
+| `ossf-criticality-score` | `src/sources/ossf-criticality-score/` | OSSF Criticality Score snapshots from a public GCS bucket (~750K repos per snapshot) |
+
+### Workflow
+
+```
+discoverProjects({ mode: 'incremental' | 'full' })
+  │
+  ├─ Activity: listDatasets(sourceName)
+  │    → returns dataset descriptors sorted newest-first
+  │
+  ├─ Selection: incremental → latest only, full → all datasets
+  │
+  └─ For each dataset:
+       └─ Activity: processDataset(sourceName, dataset)
+            → HTTP stream → csv-parse → batches of 5000 → bulkUpsertProjectCatalog
+```
+
+### Timeouts
+
+| Activity           | startToCloseTimeout | retries |
+| ------------------ | ------------------- | ------- |
+| `listDatasets`     | 2 min               | 3       |
+| `processDataset`   | 30 min              | 3       |
+| Workflow execution | 2 hours             | 3       |
+
+### Schedule
+
+Runs daily at midnight via Temporal cron (`0 0 * * *`).
+ +## File structure + +``` +src/ +├── main.ts # Service bootstrap (postgres enabled) +├── activities.ts # Barrel re-export +├── workflows.ts # Barrel re-export +├── activities/ +│ └── activities.ts # listDatasets, processDataset +├── workflows/ +│ └── discoverProjects.ts # Orchestration with mode selection +├── schedules/ +│ └── scheduleProjectsDiscovery.ts # Temporal cron schedule +└── sources/ + ├── types.ts # IDiscoverySource, IDatasetDescriptor + ├── registry.ts # Source factory map + └── ossf-criticality-score/ + ├── source.ts # IDiscoverySource implementation + └── bucketClient.ts # GCS public bucket HTTP client +``` diff --git a/services/apps/automatic_projects_discovery_worker/package.json b/services/apps/automatic_projects_discovery_worker/package.json index 1c79505f89..022c1a6297 100644 --- a/services/apps/automatic_projects_discovery_worker/package.json +++ b/services/apps/automatic_projects_discovery_worker/package.json @@ -2,7 +2,7 @@ "name": "@crowd/automatic-projects-discovery-worker", "scripts": { "start": "CROWD_TEMPORAL_TASKQUEUE=automatic-projects-discovery SERVICE=automatic-projects-discovery-worker tsx src/main.ts", - "start:debug:local": "set -a && . ../../../backend/.env.dist.local && . ../../../backend/.env.override.local && set +a && CROWD_TEMPORAL_TASKQUEUE=automatic-projects-discovery SERVICE=automatic-projects-discovery-worker LOG_LEVEL=trace tsx --inspect=0.0.0.0:9232 src/main.ts", + "start:debug:local": "set -a && . ../../../backend/.env.dist.local && . 
../../../backend/.env.override.local && set +a && CROWD_TEMPORAL_TASKQUEUE=automatic-projects-discovery SERVICE=automatic-projects-discovery-worker tsx --inspect=0.0.0.0:9232 src/main.ts", "start:debug": "CROWD_TEMPORAL_TASKQUEUE=automatic-projects-discovery SERVICE=automatic-projects-discovery-worker LOG_LEVEL=trace tsx --inspect=0.0.0.0:9232 src/main.ts", "dev:local": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug:local", "dev": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug", @@ -24,6 +24,7 @@ "@temporalio/activity": "~1.11.8", "@temporalio/client": "~1.11.8", "@temporalio/workflow": "~1.11.8", + "csv-parse": "^5.5.6", "tsx": "^4.7.1", "typescript": "^5.6.3" }, diff --git a/services/apps/automatic_projects_discovery_worker/src/activities.ts b/services/apps/automatic_projects_discovery_worker/src/activities.ts index 3662234550..a2c8cf9935 100644 --- a/services/apps/automatic_projects_discovery_worker/src/activities.ts +++ b/services/apps/automatic_projects_discovery_worker/src/activities.ts @@ -1 +1,3 @@ -export * from './activities/activities' +import { listDatasets, listSources, processDataset } from './activities/activities' + +export { listDatasets, listSources, processDataset } diff --git a/services/apps/automatic_projects_discovery_worker/src/activities/activities.ts b/services/apps/automatic_projects_discovery_worker/src/activities/activities.ts index 3aea7f8200..fbbe6c28a8 100644 --- a/services/apps/automatic_projects_discovery_worker/src/activities/activities.ts +++ b/services/apps/automatic_projects_discovery_worker/src/activities/activities.ts @@ -1,7 +1,124 @@ +import { parse } from 'csv-parse' + +import { bulkUpsertProjectCatalog } from '@crowd/data-access-layer' +import { IDbProjectCatalogCreate } from '@crowd/data-access-layer/src/project-catalog/types' +import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor' import { getServiceLogger } from '@crowd/logging' +import { svc } from 
'../main' +import { getAvailableSourceNames, getSource } from '../sources/registry' +import { IDatasetDescriptor } from '../sources/types' + const log = getServiceLogger() -export async function logDiscoveryRun(): Promise { - log.info('Automatic projects discovery workflow executed successfully.') +const BATCH_SIZE = 5000 + +export async function listSources(): Promise { + return getAvailableSourceNames() +} + +export async function listDatasets(sourceName: string): Promise { + const source = getSource(sourceName) + const datasets = await source.listAvailableDatasets() + + log.info({ sourceName, count: datasets.length, newest: datasets[0]?.id }, 'Datasets listed.') + + return datasets +} + +export async function processDataset( + sourceName: string, + dataset: IDatasetDescriptor, +): Promise { + const qx = pgpQx(svc.postgres.writer.connection()) + const startTime = Date.now() + + log.info({ sourceName, datasetId: dataset.id, url: dataset.url }, 'Processing dataset...') + + const source = getSource(sourceName) + const stream = await source.fetchDatasetStream(dataset) + + // For CSV sources: pipe through csv-parse to get Record objects. + // For JSON sources: the stream already emits pre-parsed objects in object mode. + const records = + source.format === 'json' + ? stream + : stream.pipe( + parse({ + columns: true, + skip_empty_lines: true, + trim: true, + }), + ) + + // pipe() does not forward source errors to the destination automatically, so we + // destroy records explicitly — this surfaces the error in the for-await loop and + // lets Temporal mark the activity as failed and retry it. 
+ stream.on('error', (err: Error) => { + log.error({ datasetId: dataset.id, error: err.message }, 'Stream error.') + records.destroy(err) + }) + + if (source.format !== 'json') { + const csvRecords = records as ReturnType + csvRecords.on('error', (err) => { + log.error({ datasetId: dataset.id, error: err.message }, 'CSV parser error.') + }) + } + + let batch: IDbProjectCatalogCreate[] = [] + let totalProcessed = 0 + let totalSkipped = 0 + let batchNumber = 0 + let totalRows = 0 + + for await (const rawRow of records) { + totalRows++ + + const parsed = source.parseRow(rawRow as Record) + if (!parsed) { + totalSkipped++ + continue + } + + batch.push({ + projectSlug: parsed.projectSlug, + repoName: parsed.repoName, + repoUrl: parsed.repoUrl, + ossfCriticalityScore: parsed.ossfCriticalityScore, + lfCriticalityScore: parsed.lfCriticalityScore, + }) + + if (batch.length >= BATCH_SIZE) { + batchNumber++ + + await bulkUpsertProjectCatalog(qx, batch) + totalProcessed += batch.length + batch = [] + + log.info({ totalProcessed, batchNumber, datasetId: dataset.id }, 'Batch upserted.') + } + } + + // Flush remaining rows that didn't fill a complete batch + if (batch.length > 0) { + batchNumber++ + await bulkUpsertProjectCatalog(qx, batch) + totalProcessed += batch.length + } + + const elapsedSeconds = ((Date.now() - startTime) / 1000).toFixed(1) + + log.info( + { + sourceName, + datasetId: dataset.id, + totalRows, + totalProcessed, + totalSkipped, + totalBatches: batchNumber, + elapsedSeconds, + }, + 'Dataset processing complete.', + ) } diff --git a/services/apps/automatic_projects_discovery_worker/src/main.ts b/services/apps/automatic_projects_discovery_worker/src/main.ts index 326c3a361a..0345c420f8 100644 --- a/services/apps/automatic_projects_discovery_worker/src/main.ts +++ b/services/apps/automatic_projects_discovery_worker/src/main.ts @@ -18,7 +18,7 @@ const config: Config = { const options: Options = { postgres: { - enabled: false, + enabled: true, }, opensearch: { 
enabled: false, diff --git a/services/apps/automatic_projects_discovery_worker/src/schedules/scheduleProjectsDiscovery.ts b/services/apps/automatic_projects_discovery_worker/src/schedules/scheduleProjectsDiscovery.ts index 847c2e4ce9..b173126a78 100644 --- a/services/apps/automatic_projects_discovery_worker/src/schedules/scheduleProjectsDiscovery.ts +++ b/services/apps/automatic_projects_discovery_worker/src/schedules/scheduleProjectsDiscovery.ts @@ -3,18 +3,15 @@ import { ScheduleAlreadyRunning, ScheduleOverlapPolicy } from '@temporalio/clien import { svc } from '../main' import { discoverProjects } from '../workflows' -const DEFAULT_CRON = '0 2 * * *' // Daily at 2:00 AM - export const scheduleProjectsDiscovery = async () => { - const cronExpression = process.env.CROWD_AUTOMATIC_PROJECTS_DISCOVERY_CRON || DEFAULT_CRON - - svc.log.info(`Scheduling projects discovery with cron: ${cronExpression}`) + svc.log.info(`Scheduling projects discovery`) try { await svc.temporal.schedule.create({ scheduleId: 'automaticProjectsDiscovery', spec: { - cronExpressions: [cronExpression], + // Run every day at midnight + cronExpressions: ['0 0 * * *'], }, policies: { overlap: ScheduleOverlapPolicy.SKIP, @@ -24,6 +21,8 @@ export const scheduleProjectsDiscovery = async () => { type: 'startWorkflow', workflowType: discoverProjects, taskQueue: 'automatic-projects-discovery', + args: [{ mode: 'incremental' as const }], + workflowExecutionTimeout: '2 hours', retry: { initialInterval: '15 seconds', backoffCoefficient: 2, diff --git a/services/apps/automatic_projects_discovery_worker/src/sources/lf-criticality-score/source.ts b/services/apps/automatic_projects_discovery_worker/src/sources/lf-criticality-score/source.ts new file mode 100644 index 0000000000..4738318454 --- /dev/null +++ b/services/apps/automatic_projects_discovery_worker/src/sources/lf-criticality-score/source.ts @@ -0,0 +1,188 @@ +import http from 'http' +import https from 'https' +import { Readable } from 'stream' + 
+import { getServiceLogger } from '@crowd/logging' + +import { IDatasetDescriptor, IDiscoverySource, IDiscoverySourceRow } from '../types' + +const log = getServiceLogger() + +const DEFAULT_API_URL = 'https://hypervascular-nonduplicative-vern.ngrok-free.dev' +const PAGE_SIZE = 100 + +interface LfApiResponse { + page: number + pageSize: number + total: number + totalPages: number + data: LfApiRow[] +} + +interface LfApiRow { + runDate: string + repoUrl: string + owner: string + repoName: string + contributors: number + organizations: number + sizeSloc: number + lastUpdated: number + age: number + commitFreq: number + score: number +} + +function getApiBaseUrl(): string { + return (process.env.LF_CRITICALITY_SCORE_API_URL ?? DEFAULT_API_URL).replace(/\/$/, '') +} + +async function fetchPage( + baseUrl: string, + startDate: string, + endDate: string, + page: number, +): Promise { + const url = `${baseUrl}/projects/scores?startDate=${startDate}&endDate=${endDate}&page=${page}&pageSize=${PAGE_SIZE}` + + return new Promise((resolve, reject) => { + const client = url.startsWith('https://') ? https : http + + const req = client.get(url, (res) => { + if (res.statusCode !== 200) { + reject(new Error(`LF Criticality Score API returned status ${res.statusCode} for ${url}`)) + res.resume() + return + } + + const chunks: Uint8Array[] = [] + res.on('data', (chunk: Uint8Array) => chunks.push(chunk)) + res.on('end', () => { + try { + resolve(JSON.parse(Buffer.concat(chunks).toString('utf8')) as LfApiResponse) + } catch (err) { + reject(new Error(`Failed to parse LF Criticality Score API response: ${err}`)) + } + }) + res.on('error', reject) + }) + + req.on('error', reject) + req.end() + }) +} + +/** + * Generates the first day and last day of a given month. + * monthOffset = 0 → current month, -1 → previous month, etc. 
+ */ +function monthRange(monthOffset: number): { startDate: string; endDate: string } { + const now = new Date() + const year = now.getUTCFullYear() + const month = now.getUTCMonth() + monthOffset // can be negative; Date handles rollover + + const first = new Date(Date.UTC(year, month, 1)) + const last = new Date(Date.UTC(year, month + 1, 0)) // last day of month + + const pad = (n: number) => String(n).padStart(2, '0') + const fmt = (d: Date) => + `${d.getUTCFullYear()}-${pad(d.getUTCMonth() + 1)}-${pad(d.getUTCDate())}` + + return { startDate: fmt(first), endDate: fmt(last) } +} + +export class LfCriticalityScoreSource implements IDiscoverySource { + public readonly name = 'lf-criticality-score' + public readonly format = 'json' as const + + async listAvailableDatasets(): Promise { + const baseUrl = getApiBaseUrl() + + // Return one dataset per month for the last 12 months (newest first) + const datasets: IDatasetDescriptor[] = [] + + for (let offset = 0; offset >= -11; offset--) { + const { startDate, endDate } = monthRange(offset) + const id = startDate.slice(0, 7) // e.g. "2026-02" + + datasets.push({ + id, + date: startDate, + url: `${baseUrl}/projects/scores?startDate=${startDate}&endDate=${endDate}`, + }) + } + + return datasets + } + + /** + * Returns an object-mode Readable that fetches all pages from the API + * and pushes each row as a plain object. Activities.ts iterates this + * directly (no csv-parse) because format === 'json'. + */ + async fetchDatasetStream(dataset: IDatasetDescriptor): Promise { + const baseUrl = getApiBaseUrl() + + // Extract startDate and endDate from the stored URL + const parsed = new URL(dataset.url) + const startDate = parsed.searchParams.get('startDate') ?? '' + const endDate = parsed.searchParams.get('endDate') ?? 
'' + + async function* pages() { + let page = 1 + let totalPages = 1 + + do { + const response = await fetchPage(baseUrl, startDate, endDate, page) + totalPages = response.totalPages + + for (const row of response.data) { + yield row + } + + log.debug( + { datasetId: dataset.id, page, totalPages, rowsInPage: response.data.length }, + 'LF Criticality Score page fetched.', + ) + + page++ + } while (page <= totalPages) + } + + return Readable.from(pages(), { objectMode: true }) + } + + parseRow(rawRow: Record): IDiscoverySourceRow | null { + const repoUrl = rawRow['repoUrl'] as string | undefined + if (!repoUrl) { + return null + } + + let repoName = '' + let projectSlug = '' + + try { + const urlPath = new URL(repoUrl).pathname.replace(/^\//, '').replace(/\/$/, '') + projectSlug = urlPath + repoName = urlPath.split('/').pop() || '' + } catch { + const parts = repoUrl.replace(/\/$/, '').split('/') + projectSlug = parts.slice(-2).join('/') + repoName = parts.pop() || '' + } + + if (!projectSlug || !repoName) { + return null + } + + const score = rawRow['score'] + const lfCriticalityScore = typeof score === 'number' ? score : parseFloat(score as string) + + return { + projectSlug, + repoName, + repoUrl, + lfCriticalityScore: Number.isNaN(lfCriticalityScore) ? 
undefined : lfCriticalityScore, + } + } +} diff --git a/services/apps/automatic_projects_discovery_worker/src/sources/ossf-criticality-score/bucketClient.ts b/services/apps/automatic_projects_discovery_worker/src/sources/ossf-criticality-score/bucketClient.ts new file mode 100644 index 0000000000..71b2066ae7 --- /dev/null +++ b/services/apps/automatic_projects_discovery_worker/src/sources/ossf-criticality-score/bucketClient.ts @@ -0,0 +1,96 @@ +import https from 'https' + +const BUCKET_URL = 'https://commondatastorage.googleapis.com/ossf-criticality-score' + +function httpsGet(url: string): Promise { + return new Promise((resolve, reject) => { + https + .get(url, (res) => { + if ( + res.statusCode && + res.statusCode >= 300 && + res.statusCode < 400 && + res.headers.location + ) { + httpsGet(res.headers.location).then(resolve, reject) + return + } + + if (res.statusCode && (res.statusCode < 200 || res.statusCode >= 300)) { + reject(new Error(`HTTP ${res.statusCode} for ${url}`)) + return + } + + const chunks: Uint8Array[] = [] + res.on('data', (chunk: Uint8Array) => chunks.push(chunk)) + res.on('end', () => resolve(Buffer.concat(chunks).toString('utf-8'))) + res.on('error', reject) + }) + .on('error', reject) + }) +} + +function extractPrefixes(xml: string): string[] { + const prefixes: string[] = [] + const regex = /([^<]+)<\/Prefix>/g + let match: RegExpExecArray | null + + while ((match = regex.exec(xml)) !== null) { + prefixes.push(match[1]) + } + + return prefixes +} + +/** + * List all date prefixes in the OSSF Criticality Score bucket. + * Returns prefixes like ['2024.07.01/', '2024.07.08/', ...] + */ +export async function listDatePrefixes(): Promise { + const xml = await httpsGet(`${BUCKET_URL}?delimiter=/`) + return extractPrefixes(xml).filter((p) => /^\d{4}\.\d{2}\.\d{2}\/$/.test(p)) +} + +/** + * List time sub-prefixes for a given date prefix. + * E.g., for '2024.07.01/' returns ['2024.07.01/060102/', ...] 
+ */ +export async function listTimePrefixes(datePrefix: string): Promise { + const xml = await httpsGet(`${BUCKET_URL}?prefix=${encodeURIComponent(datePrefix)}&delimiter=/`) + return extractPrefixes(xml).filter((p) => p !== datePrefix) +} + +/** + * Build the full URL for the all.csv file within a given dataset prefix. + */ +export function buildDatasetUrl(prefix: string): string { + return `${BUCKET_URL}/${prefix}all.csv` +} + +/** + * Get an HTTPS readable stream for a given URL. + */ +export function getHttpsStream(url: string): Promise { + return new Promise((resolve, reject) => { + https + .get(url, (res) => { + if ( + res.statusCode && + res.statusCode >= 300 && + res.statusCode < 400 && + res.headers.location + ) { + getHttpsStream(res.headers.location).then(resolve, reject) + return + } + + if (res.statusCode && (res.statusCode < 200 || res.statusCode >= 300)) { + reject(new Error(`HTTP ${res.statusCode} for ${url}`)) + return + } + + resolve(res) + }) + .on('error', reject) + }) +} diff --git a/services/apps/automatic_projects_discovery_worker/src/sources/ossf-criticality-score/source.ts b/services/apps/automatic_projects_discovery_worker/src/sources/ossf-criticality-score/source.ts new file mode 100644 index 0000000000..8ee20fb602 --- /dev/null +++ b/services/apps/automatic_projects_discovery_worker/src/sources/ossf-criticality-score/source.ts @@ -0,0 +1,75 @@ +import { Readable } from 'stream' + +import { IDatasetDescriptor, IDiscoverySource, IDiscoverySourceRow } from '../types' + +import { buildDatasetUrl, getHttpsStream, listDatePrefixes, listTimePrefixes } from './bucketClient' + +export class OssfCriticalityScoreSource implements IDiscoverySource { + public readonly name = 'ossf-criticality-score' + + async listAvailableDatasets(): Promise { + const datePrefixes = await listDatePrefixes() + + const datasets: IDatasetDescriptor[] = [] + + for (const datePrefix of datePrefixes) { + const timePrefixes = await listTimePrefixes(datePrefix) + + for 
(const timePrefix of timePrefixes) { + const date = datePrefix.replace(/\/$/, '') + const url = buildDatasetUrl(timePrefix) + + datasets.push({ + id: timePrefix.replace(/\/$/, ''), + date, + url, + }) + } + } + + // Sort newest-first by date + datasets.sort((a, b) => b.date.localeCompare(a.date)) + + return datasets + } + + async fetchDatasetStream(dataset: IDatasetDescriptor): Promise { + const stream = await getHttpsStream(dataset.url) + return stream as Readable + } + + // CSV columns use dot notation (e.g. "repo.url", "default_score") + parseRow(rawRow: Record): IDiscoverySourceRow | null { + const repoUrl = rawRow['repo.url'] as string | undefined + if (!repoUrl) { + return null + } + + let repoName = '' + let projectSlug = '' + + try { + const urlPath = new URL(repoUrl).pathname.replace(/^\//, '').replace(/\/$/, '') + projectSlug = urlPath + repoName = urlPath.split('/').pop() || '' + } catch { + const parts = repoUrl.replace(/\/$/, '').split('/') + projectSlug = parts.slice(-2).join('/') + repoName = parts.pop() || '' + } + + if (!projectSlug || !repoName) { + return null + } + + const scoreRaw = rawRow['default_score'] + const ossfCriticalityScore = scoreRaw ? parseFloat(scoreRaw as string) : undefined + + return { + projectSlug, + repoName, + repoUrl, + ossfCriticalityScore: Number.isNaN(ossfCriticalityScore) ? undefined : ossfCriticalityScore, + } + } +} diff --git a/services/apps/automatic_projects_discovery_worker/src/sources/registry.ts b/services/apps/automatic_projects_discovery_worker/src/sources/registry.ts new file mode 100644 index 0000000000..1c7af148a3 --- /dev/null +++ b/services/apps/automatic_projects_discovery_worker/src/sources/registry.ts @@ -0,0 +1,21 @@ +import { LfCriticalityScoreSource } from './lf-criticality-score/source' +import { OssfCriticalityScoreSource } from './ossf-criticality-score/source' +import { IDiscoverySource } from './types' + +// To add a new source: instantiate it here. 
+const sources: IDiscoverySource[] = [ + new OssfCriticalityScoreSource(), + new LfCriticalityScoreSource(), +] + +export function getSource(name: string): IDiscoverySource { + const source = sources.find((s) => s.name === name) + if (!source) { + throw new Error(`Unknown source: ${name}. Available: ${sources.map((s) => s.name).join(', ')}`) + } + return source +} + +export function getAvailableSourceNames(): string[] { + return sources.map((s) => s.name) +} diff --git a/services/apps/automatic_projects_discovery_worker/src/sources/types.ts b/services/apps/automatic_projects_discovery_worker/src/sources/types.ts new file mode 100644 index 0000000000..9b386b5da7 --- /dev/null +++ b/services/apps/automatic_projects_discovery_worker/src/sources/types.ts @@ -0,0 +1,27 @@ +import { Readable } from 'stream' + +export interface IDatasetDescriptor { + id: string + date: string + url: string +} + +export interface IDiscoverySource { + name: string + /** + * 'csv' (default): fetchDatasetStream returns a raw text stream, piped through csv-parse. + * 'json': fetchDatasetStream returns an object-mode Readable that emits pre-parsed records. 
+ */ + format?: 'csv' | 'json' + listAvailableDatasets(): Promise + fetchDatasetStream(dataset: IDatasetDescriptor): Promise + parseRow(rawRow: Record): IDiscoverySourceRow | null +} + +export interface IDiscoverySourceRow { + projectSlug: string + repoName: string + repoUrl: string + ossfCriticalityScore?: number + lfCriticalityScore?: number +} diff --git a/services/apps/automatic_projects_discovery_worker/src/workflows/discoverProjects.ts b/services/apps/automatic_projects_discovery_worker/src/workflows/discoverProjects.ts index f43a9b5a12..00856493d4 100644 --- a/services/apps/automatic_projects_discovery_worker/src/workflows/discoverProjects.ts +++ b/services/apps/automatic_projects_discovery_worker/src/workflows/discoverProjects.ts @@ -1,11 +1,48 @@ -import { proxyActivities } from '@temporalio/workflow' +import { log, proxyActivities } from '@temporalio/workflow' import type * as activities from '../activities' -const activity = proxyActivities({ - startToCloseTimeout: '1 minutes', +const listActivities = proxyActivities({ + startToCloseTimeout: '2 minutes', + retry: { maximumAttempts: 3 }, }) -export async function discoverProjects(): Promise { - await activity.logDiscoveryRun() +// processDataset is long-running (10-20 min for ~119MB / ~750K rows). +const processActivities = proxyActivities({ + startToCloseTimeout: '30 minutes', + retry: { maximumAttempts: 3 }, +}) + +export async function discoverProjects( + input: { mode: 'incremental' | 'full' } = { mode: 'incremental' }, +): Promise { + const { mode } = input + + const sourceNames = await listActivities.listSources() + + for (const sourceName of sourceNames) { + const allDatasets = await listActivities.listDatasets(sourceName) + + if (allDatasets.length === 0) { + log.warn(`No datasets found for source "${sourceName}". Skipping.`) + continue + } + + // allDatasets is sorted newest-first. + // Incremental: process only the latest snapshot. 
+ // Full: process oldest-first so the newest data wins the final upsert. + const datasets = mode === 'incremental' ? [allDatasets[0]] : [...allDatasets].reverse() + + log.info( + `source=${sourceName} mode=${mode}, ${datasets.length}/${allDatasets.length} datasets to process.`, + ) + + for (let i = 0; i < datasets.length; i++) { + const dataset = datasets[i] + log.info(`[${sourceName}] Processing dataset ${i + 1}/${datasets.length}: ${dataset.id}`) + await processActivities.processDataset(sourceName, dataset) + } + + log.info(`[${sourceName}] Done. Processed ${datasets.length} dataset(s).`) + } } diff --git a/services/libs/data-access-layer/src/project-catalog/projectCatalog.ts b/services/libs/data-access-layer/src/project-catalog/projectCatalog.ts index 5b94e2b8bc..b951e11317 100644 --- a/services/libs/data-access-layer/src/project-catalog/projectCatalog.ts +++ b/services/libs/data-access-layer/src/project-catalog/projectCatalog.ts @@ -196,8 +196,8 @@ export async function upsertProjectCatalog( ON CONFLICT ("repoUrl") DO UPDATE SET "projectSlug" = EXCLUDED."projectSlug", "repoName" = EXCLUDED."repoName", - "ossfCriticalityScore" = EXCLUDED."ossfCriticalityScore", - "lfCriticalityScore" = EXCLUDED."lfCriticalityScore", + "ossfCriticalityScore" = COALESCE(EXCLUDED."ossfCriticalityScore", "projectCatalog"."ossfCriticalityScore"), + "lfCriticalityScore" = COALESCE(EXCLUDED."lfCriticalityScore", "projectCatalog"."lfCriticalityScore"), "updatedAt" = NOW() RETURNING ${prepareSelectColumns(PROJECT_CATALOG_COLUMNS)} `, @@ -256,8 +256,8 @@ export async function bulkUpsertProjectCatalog( ON CONFLICT ("repoUrl") DO UPDATE SET "projectSlug" = EXCLUDED."projectSlug", "repoName" = EXCLUDED."repoName", - "ossfCriticalityScore" = EXCLUDED."ossfCriticalityScore", - "lfCriticalityScore" = EXCLUDED."lfCriticalityScore", + "ossfCriticalityScore" = COALESCE(EXCLUDED."ossfCriticalityScore", "projectCatalog"."ossfCriticalityScore"), + "lfCriticalityScore" = 
COALESCE(EXCLUDED."lfCriticalityScore", "projectCatalog"."lfCriticalityScore"), "updatedAt" = NOW() `, { values: JSON.stringify(values) },