From 018298b51fa42a04c8434e59d1209af17791c78c Mon Sep 17 00:00:00 2001 From: PastaClaw Date: Mon, 16 Mar 2026 00:59:31 -0500 Subject: [PATCH] fix: collect sigshares synchronously in dispatcher for parallel verification MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The dispatcher thread in WorkThreadDispatcher() unconditionally enqueued a ProcessPendingSigShares worker into the thread pool every 10ms whenever any pendingIncomingSigShares existed, leading to redundant task allocations. Instead of guarding with a single-worker atomic latch (which would serialise all BLS verification to one thread), move the collection step (CollectPendingSigSharesToVerify) into the dispatcher loop where it runs synchronously — it holds cs only briefly and does no BLS work. Each collected batch is then moved into its own worker for parallel BLS verification. This allows multiple verification batches to run concurrently across the thread pool (up to 4 workers), while naturally preventing unbounded queue growth: each Collect call dequeues its batch from the pending queue, so only as many tasks are enqueued as there are actual batches of work. Co-authored-by: knst --- src/llmq/net_signing.cpp | 43 ++++++++++++++++++++-------------------- src/llmq/net_signing.h | 6 +++++- 2 files changed, 26 insertions(+), 23 deletions(-) diff --git a/src/llmq/net_signing.cpp b/src/llmq/net_signing.cpp index a170513815d8..01881ca59d65 100644 --- a/src/llmq/net_signing.cpp +++ b/src/llmq/net_signing.cpp @@ -306,17 +306,25 @@ void NetSigning::WorkThreadDispatcher() } } - if (m_shares_manager->IsAnyPendingProcessing()) { - // If there's processing work, spawn a helper worker - worker_pool.push([this](int) { - while (!workInterrupt) { - bool moreWork = ProcessPendingSigShares(); - - if (!moreWork) { - return; // No work found, exit immediately - } - } + // Collect pending sig shares synchronously and dispatch each batch to a worker for parallel BLS verification + while (!workInterrupt) { + std::unordered_map> sigSharesByNodes; + std::unordered_map, CQuorumCPtr, StaticSaltedHasher> quorums; + + const size_t nMaxBatchSize{32}; + bool more_work = m_shares_manager->CollectPendingSigSharesToVerify(nMaxBatchSize, sigSharesByNodes, quorums); + + if (sigSharesByNodes.empty()) { + break; + } + + worker_pool.push([this, sigSharesByNodes = std::move(sigSharesByNodes), quorums = std::move(quorums)](int) mutable { + ProcessPendingSigShares(std::move(sigSharesByNodes), std::move(quorums)); }); + + if (!more_work) { + break; + } } // Always sleep briefly between checks @@ -329,17 +337,10 @@ void NetSigning::NotifyRecoveredSig(const std::shared_ptr& m_peer_manager->PeerRelayRecoveredSig(*sig, proactive_relay); } -bool NetSigning::ProcessPendingSigShares() +void NetSigning::ProcessPendingSigShares( + std::unordered_map>&& sigSharesByNodes, + std::unordered_map, CQuorumCPtr, StaticSaltedHasher>&& quorums) { - std::unordered_map> sigSharesByNodes; - std::unordered_map, CQuorumCPtr, StaticSaltedHasher> quorums; - - const size_t nMaxBatchSize{32}; - bool more_work = m_shares_manager->CollectPendingSigSharesToVerify(nMaxBatchSize, sigSharesByNodes, quorums); - if (sigSharesByNodes.empty()) { - return false; - } - // It's ok to perform insecure batched verification here as we verify against the quorum public key shares, // which are not craftable by individual entities, making the rogue public key attack impossible CBLSBatchVerifier batchVerifier(false, true); @@ -400,8 +401,6 @@ bool NetSigning::ProcessPendingSigShares() ProcessRecoveredSig(rs, true); } } - - return more_work; } } // namespace llmq diff --git a/src/llmq/net_signing.h b/src/llmq/net_signing.h index 10ef78d4cd09..a9effd2575e3 100644 --- a/src/llmq/net_signing.h +++ b/src/llmq/net_signing.h @@ -6,6 +6,8 @@ #define BITCOIN_LLMQ_NET_SIGNING_H #include +#include +#include #include #include #include @@ -52,7 +54,9 @@ class NetSigning final : public NetHandler, public CValidationInterface void WorkThreadCleaning(); void WorkThreadDispatcher(); - bool ProcessPendingSigShares(); + void ProcessPendingSigShares( + std::unordered_map>&& sigSharesByNodes, + std::unordered_map, CQuorumCPtr, StaticSaltedHasher>&& quorums); void RemoveBannedNodeStates(); void BanNode(NodeId nodeid);