From 126bb563b20622dd11a86989f1648e24bd2641b1 Mon Sep 17 00:00:00 2001 From: Valeri Karpov Date: Thu, 12 Feb 2026 15:11:04 -0500 Subject: [PATCH 1/2] Simplify polling test to remove handler-count threshold cases --- src/taskSchema.js | 11 +++++++++-- test/task.test.js | 28 ++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/src/taskSchema.js b/src/taskSchema.js index a8fb8be..c907840 100644 --- a/src/taskSchema.js +++ b/src/taskSchema.js @@ -259,6 +259,8 @@ taskSchema.statics.poll = async function poll(opts) { const parallel = (opts && opts.parallel) || 1; const workerName = opts?.workerName; const getCurrentTime = opts?.getCurrentTime; + const handlers = this._handlers || new Map(); + const registeredHandlerNames = Array.from(handlers.keys()); const additionalParams = workerName ? { workerName } : {}; @@ -266,8 +268,13 @@ taskSchema.statics.poll = async function poll(opts) { const tasksInProgress = []; for (let i = 0; i < parallel; ++i) { const now = typeof getCurrentTime === 'function' ? getCurrentTime() : time.now(); + const filter = { + status: 'pending', + scheduledAt: { $lte: now }, + name: { $in: registeredHandlerNames } + }; const task = await this.findOneAndUpdate( - { status: 'pending', scheduledAt: { $lte: now } }, + filter, { status: 'in_progress', startedRunningAt: now, @@ -293,7 +300,7 @@ taskSchema.statics.poll = async function poll(opts) { }; taskSchema.statics.execute = async function(task, options = {}) { - if (!this._handlers.has(task.name)) { + if (!this._handlers || !this._handlers.has(task.name)) { return null; } diff --git a/test/task.test.js b/test/task.test.js index 962855f..06ae2ac 100644 --- a/test/task.test.js +++ b/test/task.test.js @@ -199,6 +199,34 @@ describe('Task', function() { cancel(); }); + + it('poll() filters by handler names', async function() { + let called = 0; + Task.registerHandler('handledJob', async () => { + ++called; + return 'ok'; + }); + + await Task.schedule('unhandledJob', time.now(), { skip: true }); + const handledTask = await Task.schedule('handledJob', time.now(), { run: true }); + + await Task.poll(); + + const reloadedHandledTask = await Task.findById(handledTask._id); + assert.ok(reloadedHandledTask); + assert.equal(reloadedHandledTask.status, 'succeeded'); + assert.equal(called, 1); + + const unhandledTask = await Task.findOne({ name: 'unhandledJob' }); + assert.ok(unhandledTask); + assert.equal(unhandledTask.status, 'pending'); + assert.strictEqual(unhandledTask.startedRunningAt, null); + assert.strictEqual(unhandledTask.timeoutAt, null); + assert.strictEqual(unhandledTask.workerName, null); + }); + + + it('allows startPolling() to use getCurrentTime()', async function() { let resolve; const p = new Promise((_resolve) => { From 94a314c172ea68aaf530bb1ca797902958ac5d35 Mon Sep 17 00:00:00 2001 From: Valeri Karpov Date: Thu, 12 Feb 2026 15:29:58 -0500 Subject: [PATCH 2/2] fix tests --- test/task.test.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/task.test.js b/test/task.test.js index 06ae2ac..5d25295 100644 --- a/test/task.test.js +++ b/test/task.test.js @@ -220,9 +220,9 @@ describe('Task', function() { const unhandledTask = await Task.findOne({ name: 'unhandledJob' }); assert.ok(unhandledTask); assert.equal(unhandledTask.status, 'pending'); - assert.strictEqual(unhandledTask.startedRunningAt, null); - assert.strictEqual(unhandledTask.timeoutAt, null); - assert.strictEqual(unhandledTask.workerName, null); + assert.strictEqual(unhandledTask.startedRunningAt, undefined); + assert.strictEqual(unhandledTask.timeoutAt, undefined); + assert.strictEqual(unhandledTask.workerName, undefined); });