From 00ef4c57d57046c5018a3afe532257d2db841a48 Mon Sep 17 00:00:00 2001 From: Marvin Hagemeister Date: Thu, 23 Jul 2020 13:21:36 +0200 Subject: [PATCH] Support locking random item from resource pool --- external_locking.js | 5 +- locking.js | 126 ++++++++++++++++++++++++++++----- runner.js | 13 ++-- tests/selftest_locking_pool.js | 48 +++++++++++++ 4 files changed, 169 insertions(+), 23 deletions(-) create mode 100644 tests/selftest_locking_pool.js diff --git a/external_locking.js b/external_locking.js index cd5059cb..44c31afd 100644 --- a/external_locking.js +++ b/external_locking.js @@ -155,7 +155,10 @@ function prepare(config) { * @param {import('./runner').RunnerState} state */ async function refresh(state) { - const {config, locks} = state; + const {config} = state; + assert(state.locking); + + const {locks} = state.locking; assert(locks); if (locks.size > 0) { const locksArray = Array.from(locks); diff --git a/locking.js b/locking.js index 05928304..98801aa4 100644 --- a/locking.js +++ b/locking.js @@ -4,6 +4,13 @@ const output = require('./output'); const {wait} = require('./utils'); const external_locking = require('./external_locking'); +/** + * @typedef {{resource: string, client: string, expireIn: number}} Lock + */ + +/** + * @typedef {{locks: Set, by_task: Map>, pending: Set}} LockingState + */ function annotateTaskResources(config, task) { if (config.no_locking) { @@ -27,7 +34,11 @@ function annotateTaskResources(config, task) { async function init(state) { assert(state); assert(state.config); - state.locks = new Set(); + state.locking = { + locks: new Set(), + by_task: new Map(), + pending: new Set(), + }; external_locking.init(state); } @@ -37,10 +48,9 @@ async function init(state) { */ async function shutdown(config, state) { external_locking.shutdown(state); - state.locks.length = 0; assert.equal( - state.locks.size, 0, - `Still got some locks on shutdown: ${Array.from(state.locks).sort().join(',')}`); + state.locking.locks.size, 0, + `Still got some locks on shutdown: ${Array.from(state.locking.locks).sort().join(',')}`); } /** @@ -58,8 +68,9 @@ async function acquire(config, state, task) { return true; } - const {locks} = state; - assert(locks); + assert(state.locking); + + const {locks, by_task, pending} = state.locking; if (task.resources.some(r => locks.has(r))) { if (config.locking_verbose) { const failed = task.resources.filter(r => locks.has(r)); @@ -70,7 +81,11 @@ async function acquire(config, state, task) { } if (! config.no_external_locking) { + task.resources.forEach(r => pending.add(r)); try { + // TODO: There is no guarantee that all locking attempts are successful. + // I have the suspicion that only some of those may be acquired and if + // everybody has a lock someone else needs, we will starve each other indefinitely const acquireRes = await external_locking.externalAcquire(config, task.resources, 40000); if (acquireRes !== true) { if (config.locking_verbose) { @@ -83,11 +98,18 @@ async function acquire(config, state, task) { } catch(e) { output.log(config, `[exlocking] Failed to acquire locks for ${task.id}: ${e.stack}`); return false; + } finally { + task.resources.forEach(r => pending.delete(r)); } } + if (! by_task.has(task.id)) { + by_task.set(task.id, new Set()); + } + const taskLocks = by_task.get(task.id); for (const r of task.resources) { locks.add(r); + taskLocks.add(r); } if (config.locking_verbose) { output.log(config, `[locking] ${task.id}: Acquired ${task.resources.join(',')}`); @@ -95,6 +117,17 @@ async function acquire(config, state, task) { return true; } +/** + * @param {(waitTime: number) => Promise} fn + */ +async function runEventually(fn) { + let waitTime = 50; + while (! await fn(waitTime)) { + await wait(waitTime); + waitTime = Math.min(10000, waitTime * 2); + } +} + /** * @param {import('./config').Config} config * @param {import('./runner').RunnerState} state @@ -105,12 +138,66 @@ async function acquireEventually(config, state, task) { if (config.locking_verbose) { output.log(config, `[locking] ${task.id}: Trying to eventually acquire ${task.resources.join(',')}`); } - let waitTime = 50; - while (! await acquire(config, state, task)) { - await wait(waitTime); - waitTime = Math.min(10000, waitTime * 2); + return await runEventually( + () => acquire(config, state, task) + ); +} + +/** + * @param {import('./config').Config} config + * @param {string[]} pool + * @param {number} [count=1] Amount of resources to lock from pool + * @returns {Promise} Array with successfully locked ids + */ +async function acquireFromPool(config, pool, count = 1) { + if (config.no_locking) return true; + + const taskId = config._taskId; + + if (config.locking_verbose) { + output.log(config, `[locking] ${taskId}: Trying to eventually acquire one of ${pool.join(',')}`); } - return true; + + let out = []; + await runEventually( + async (waitTime) => { + /** @type {LockingState} */ + const locking = config._locking; + + let currentLocks = locking.locks; + if (! config.no_external_locking) { + const used = await external_locking.externalList(config); + currentLocks = new Set(used.map(l => l.resource)); + } + + const available = []; + for (let i = 0; i < pool.length; i++) { + const r = pool[i]; + if (! currentLocks.has(r) && ! locking.pending.has(r)) { + available.push(r); + } + + if (available.length >= count) { + break; + } + } + + if (available.length < count) { + if (config.locking_verbose) { + output.log(config, `[locking] Failed to acquire lock. Sleeping for ${waitTime}ms. Pool: ${pool.join(', ')}`); + } + return false; + } + + const result = acquire(config, {locking}, {id: taskId, resources: available}); + if (result) { + out = available; + } + return result; + } + ); + + return out; } /** @@ -121,13 +208,16 @@ async function acquireEventually(config, state, task) { */ async function release(config, state, task) { if (config.no_locking) return true; - if (! task.resources.length) { + if (! state.locking.by_task.has(task.id)) { return; } + const {locks, by_task} = state.locking; + const taskLocks = by_task.get(task.id); + if (! config.no_external_locking) { try { - const response = await external_locking.externalRelease(config, task.resources); + const response = await external_locking.externalRelease(config, Array.from(taskLocks)); if (response !== true) { if (config.locking_verbose) { output.log(config, @@ -140,13 +230,14 @@ async function release(config, state, task) { } } - const {locks} = state; - for (const r of task.resources) { - assert(locks.has(r), `Trying to release ${r} for ${task.id}, but not in current locks ${Array.from(locks).sort().join(',')}`); + + for (const r of taskLocks) { + assert(locks.has(r), `Trying to release ${r} for ${task.id}, but not in current locks ${Array.from(taskLocks).sort().join(',')}`); locks.delete(r); + taskLocks.delete(r); } if (config.locking_verbose) { - output.log(config, `[locking] ${task.id}: Released ${task.resources.join(',')}`); + output.log(config, `[locking] ${task.id}: Released ${Array.from(taskLocks).join(',')}`); } } @@ -178,6 +269,7 @@ function listConflicts(config, tasks) { module.exports = { acquire, acquireEventually, + acquireFromPool, annotateTaskResources, init, listConflicts, diff --git a/runner.js b/runner.js index 96a087e9..18a4f73a 100644 --- a/runner.js +++ b/runner.js @@ -16,12 +16,15 @@ const version = require('./version'); const {timeoutPromise} = require('./promise_utils'); -async function run_task(config, task) { +async function run_task(config, task, state) { const task_config = { ...config, + _locking: state.locking, + _taskLocks: new Set(), _browser_pages: [], _testName: task.tc.name, _taskName: task.name, + _taskId: task.id, }; let timeout; try { @@ -143,7 +146,7 @@ async function sequential_run(config, state) { task.status = 'running'; task.start = performance.now(); - await run_task(config, task); + await run_task(config, task, state); await locking.release(config, state, task); } @@ -158,7 +161,7 @@ async function run_one(config, state, task) { task.start = performance.now(); output.status(config, state); - await run_task(config, task); + await run_task(config, task, state); output.status(config, state); return task; @@ -245,7 +248,7 @@ async function parallel_run(config, state) { /** - * @typedef {{tc: any, status: string, name: string, id: string, skipReason?: string, expectedToFail?: boolean | ((config: any) => boolean)}} Task + * @typedef {{tc: any, status: string, name: string, id: string, skipReason?: string, expectedToFail?: boolean | ((config: any) => boolean), locks?: Set}} Task */ function testCases2tasks(config, testCases) { @@ -296,7 +299,7 @@ function testCases2tasks(config, testCases) { } /** - * @typedef {{config: any, tasks: Task[], locks?: Set }} RunnerState + * @typedef {{config: any, tasks: Task[], locking?: import('./locking').LockingState }} RunnerState */ async function run(config, testCases) { diff --git a/tests/selftest_locking_pool.js b/tests/selftest_locking_pool.js new file mode 100644 index 00000000..ea255ab1 --- /dev/null +++ b/tests/selftest_locking_pool.js @@ -0,0 +1,48 @@ +const assert = require('assert').strict; +const path = require('path'); + +const {wait} = require('../utils'); +const locking = require('../locking'); + +async function run(config) { + const baseUrl = `${config.pentf_lockserver_url}test_lockserver_${Math.random() + .toString(36) + .slice(2)}`; + const config1 = { + ...config, + no_locking: false, + external_locking_client: 'test_lockserver ONE', + external_locking_url: baseUrl, + }; + + const pool = ['AAA', 'BBB', 'CCC', 'DDD', 'EEE'].map(r => path.basename(__filename)+r); + + const lock1 = await locking.acquireFromPool(config1, pool, 1); + assert.deepEqual(lock1, [pool[0]]); + + const lock2 = await locking.acquireFromPool(config1, pool, 1); + assert.deepEqual(lock2, [pool[1]]); + + const lock3 = await locking.acquireFromPool(config1, pool, 1); + assert.deepEqual(lock3, [pool[2]]); + + // Acquire multiple locks at once + const lock4 = await locking.acquireFromPool(config1, pool, 2); + assert.deepEqual(lock4, [pool[3], pool[4]]); + + + // At this point all resources of the pool are exhausted. + // We should not be able to get a lock. + const res = await Promise.race([locking.acquireFromPool(config1, pool, 1), wait(100)]); + assert.equal(res, undefined); + + // Free one resource + await locking.release(config, {locking: config._locking}, {id: config._taskId}); + const lock5 = await locking.acquireFromPool(config1, pool, 1); + assert.deepEqual(lock5, [pool[0]]); +} + +module.exports = { + description: 'Lock random item from resource pool', + run, +};