diff --git a/src/Makefile.am b/src/Makefile.am index a36f08be79c5..d7dd14447753 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -155,7 +155,6 @@ BITCOIN_CORE_H = \ active/dkgsession.h \ active/dkgsessionhandler.h \ active/masternode.h \ - active/quorums.h \ addrdb.h \ addressindex.h \ spentindex.h \ @@ -282,19 +281,19 @@ BITCOIN_CORE_H = \ llmq/dkgsessionhandler.h \ llmq/dkgsessionmgr.h \ llmq/ehf_signals.h \ + llmq/observer.h \ llmq/options.h \ llmq/params.h \ llmq/quorums.h \ llmq/quorumsman.h \ llmq/signhash.h \ llmq/signing.h \ + llmq/net_quorum.h \ llmq/net_signing.h \ llmq/signing_shares.h \ llmq/snapshot.h \ llmq/types.h \ llmq/utils.h \ - llmq/observer/context.h \ - llmq/observer/quorums.h \ logging.h \ logging/timer.h \ mapport.h \ @@ -485,7 +484,6 @@ libbitcoin_node_a_SOURCES = \ active/dkgsession.cpp \ active/dkgsessionhandler.cpp \ active/masternode.cpp \ - active/quorums.cpp \ addrdb.cpp \ addressindex.cpp \ addrman.cpp \ @@ -554,7 +552,9 @@ libbitcoin_node_a_SOURCES = \ llmq/dkgsessionhandler.cpp \ llmq/dkgsessionmgr.cpp \ llmq/ehf_signals.cpp \ + llmq/net_quorum.cpp \ llmq/net_signing.cpp \ + llmq/observer.cpp \ llmq/options.cpp \ llmq/quorums.cpp \ llmq/quorumsman.cpp \ @@ -563,8 +563,6 @@ libbitcoin_node_a_SOURCES = \ llmq/signing_shares.cpp \ llmq/snapshot.cpp \ llmq/utils.cpp \ - llmq/observer/context.cpp \ - llmq/observer/quorums.cpp \ mapport.cpp \ masternode/meta.cpp \ masternode/payments.cpp \ diff --git a/src/active/context.cpp b/src/active/context.cpp index 4d934f206b40..7b8d40cbc35d 100644 --- a/src/active/context.cpp +++ b/src/active/context.cpp @@ -6,19 +6,21 @@ #include #include -#include +#include #include #include +#include #include #include #include #include -#include #include #include #include +#include #include #include +#include #include #include #include @@ -30,17 +32,16 @@ ActiveContext::ActiveContext(CBLSWorker& bls_worker, ChainstateManager& chainman llmq::CQuorumBlockProcessor& qblockman, llmq::CQuorumManager& qman, llmq::CQuorumSnapshotManager& qsnapman, llmq::CSigningManager& sigman, const CMasternodeSync& mn_sync, const CBLSSecretKey& operator_sk, - const llmq::QvvecSyncModeMap& sync_map, const util::DbWrapperParams& db_params, - bool quorums_recovery, bool quorums_watch) : - m_qman{qman}, + const util::DbWrapperParams& db_params, bool quorums_watch) : + llmq::QuorumRole{qman}, + m_bls_worker{bls_worker}, + m_quorums_watch{quorums_watch}, nodeman{std::make_unique(connman, dmnman, operator_sk)}, dkgdbgman{std::make_unique(dmnman, qsnapman, chainman)}, qdkgsman{std::make_unique(dmnman, qsnapman, chainman, sporkman, db_params, quorums_watch)}, shareman{std::make_unique(connman, chainman, sigman, *nodeman, qman, sporkman)}, gov_signer{std::make_unique(connman, dmnman, govman, *nodeman, chainman, mn_sync)}, ehf_sighandler{std::make_unique(chainman, sigman, *shareman, qman)}, - qman_handler{std::make_unique(bls_worker, connman, dmnman, qman, qsnapman, *nodeman, chainman, - mn_sync, sporkman, sync_map, quorums_recovery, quorums_watch)}, cl_signer{std::make_unique(chainman.ActiveChainstate(), chainlocks, clhandler, isman, qman, sigman, *shareman, mn_sync)}, is_signer{std::make_unique(chainman.ActiveChainstate(), chainlocks, isman, sigman, @@ -52,7 +53,7 @@ ActiveContext::ActiveContext(CBLSWorker& bls_worker, ChainstateManager& chainman qblockman, qsnapman, *nodeman, chainman, sporkman, llmq_params, quorums_watch, quorum_idx); }); - m_qman.ConnectManagers(qman_handler.get(), qdkgsman.get()); + m_qman.ConnectManagers(this, qdkgsman.get()); } ActiveContext::~ActiveContext() @@ -60,9 +61,8 @@ ActiveContext::~ActiveContext() m_qman.DisconnectManagers(); } -void ActiveContext::Start(CConnman& connman, PeerManager& peerman, int16_t worker_count) +void ActiveContext::Start(CConnman& connman, PeerManager& peerman) { - qman_handler->Start(worker_count); qdkgsman->StartThreads(connman, peerman); cl_signer->Start(); cl_signer->RegisterRecoveryInterface(); @@ -81,7 +81,6 @@ void ActiveContext::Stop() cl_signer->UnregisterRecoveryInterface(); cl_signer->Stop(); qdkgsman->StopThreads(); - qman_handler->Stop(); } CCoinJoinServer& ActiveContext::GetCJServer() const @@ -96,14 +95,6 @@ void ActiveContext::SetCJServer(gsl::not_null cj_server) m_cj_server = cj_server; } -void ActiveContext::InitializeCurrentBlockTip(const CBlockIndex* tip, bool ibd) -{ - UpdatedBlockTip(tip, nullptr, ibd); - if (tip) { - qman_handler->InitializeQuorumConnections(tip); - } -} - void ActiveContext::UpdatedBlockTip(const CBlockIndex* pindexNew, const CBlockIndex* pindexFork, bool fInitialDownload) { if (fInitialDownload || pindexNew == pindexFork) // In IBD or blocks were disconnected without any new ones @@ -113,5 +104,26 @@ void ActiveContext::UpdatedBlockTip(const CBlockIndex* pindexNew, const CBlockIn ehf_sighandler->UpdatedBlockTip(pindexNew); gov_signer->UpdatedBlockTip(pindexNew); qdkgsman->UpdatedBlockTip(pindexNew, fInitialDownload); - qman_handler->UpdatedBlockTip(pindexNew, fInitialDownload); +} + +bool ActiveContext::IsMasternode() const +{ + // We are only initialized if masternode mode is enabled + return true; +} + +bool ActiveContext::IsWatching() const +{ + // Watch-only mode can co-exist with masternode mode + return m_quorums_watch; +} + +uint256 ActiveContext::GetProTxHash() const +{ + return nodeman->GetProTxHash(); +} + +bool ActiveContext::SetQuorumSecretKeyShare(llmq::CQuorum& quorum, Span skContributions) const +{ + return quorum.SetSecretKeyShare(m_bls_worker.AggregateSecretKeys(skContributions), nodeman->GetProTxHash()); } diff --git a/src/active/context.h b/src/active/context.h index 4177ebd047c6..6946f26aa604 100644 --- a/src/active/context.h +++ b/src/active/context.h @@ -5,21 +5,19 @@ #ifndef BITCOIN_ACTIVE_CONTEXT_H #define BITCOIN_ACTIVE_CONTEXT_H -#include +#include #include #include +#include #include class CActiveMasternodeManager; -class CBLSSecretKey; class CBLSWorker; -class ChainstateManager; class CCoinJoinServer; class CConnman; -class CDeterministicMNManager; class CGovernanceManager; class CMasternodeMetaMan; class CMasternodeSync; @@ -38,23 +36,19 @@ class InstantSendSigner; } // namespace instantsend namespace llmq { class CDKGDebugManager; -class CDKGSessionManager; class CEHFSignalsHandler; class CInstantSendManager; -class CQuorumBlockProcessor; -class CQuorumManager; -class CQuorumSnapshotManager; class CSigningManager; class CSigSharesManager; -class QuorumParticipant; } // namespace llmq namespace util { struct DbWrapperParams; } // namespace util -struct ActiveContext final : public CValidationInterface { +struct ActiveContext final : public llmq::QuorumRole, public CValidationInterface { private: - llmq::CQuorumManager& m_qman; + CBLSWorker& m_bls_worker; + const bool m_quorums_watch{false}; public: ActiveContext() = delete; @@ -67,17 +61,21 @@ struct ActiveContext final : public CValidationInterface { llmq::CQuorumBlockProcessor& qblockman, llmq::CQuorumManager& qman, llmq::CQuorumSnapshotManager& qsnapman, llmq::CSigningManager& sigman, const CMasternodeSync& mn_sync, const CBLSSecretKey& operator_sk, - const llmq::QvvecSyncModeMap& sync_map, const util::DbWrapperParams& db_params, - bool quorums_recovery, bool quorums_watch); + const util::DbWrapperParams& db_params, bool quorums_watch); ~ActiveContext(); - void Start(CConnman& connman, PeerManager& peerman, int16_t worker_count); + void Start(CConnman& connman, PeerManager& peerman); void Stop(); - void InitializeCurrentBlockTip(const CBlockIndex* tip, bool ibd); CCoinJoinServer& GetCJServer() const; void SetCJServer(gsl::not_null cj_server); + // QuorumRole + bool IsMasternode() const override; + bool IsWatching() const override; + uint256 GetProTxHash() const override; + bool SetQuorumSecretKeyShare(llmq::CQuorum& quorum, Span skContributions) const override; + protected: // CValidationInterface void UpdatedBlockTip(const CBlockIndex* pindexNew, const CBlockIndex* pindexFork, bool fInitialDownload) override; @@ -95,7 +93,6 @@ struct ActiveContext final : public CValidationInterface { private: const std::unique_ptr gov_signer; const std::unique_ptr ehf_sighandler; - const std::unique_ptr qman_handler; const std::unique_ptr cl_signer; public: diff --git a/src/active/dkgsession.cpp b/src/active/dkgsession.cpp index 2bce8993d7b9..00f037f8088b 100644 --- a/src/active/dkgsession.cpp +++ b/src/active/dkgsession.cpp @@ -25,8 +25,8 @@ ActiveDKGSession::ActiveDKGSession(CBLSWorker& bls_worker, CDeterministicMNManag CDKGSessionManager& qdkgsman, CMasternodeMetaMan& mn_metaman, CQuorumSnapshotManager& qsnapman, const CActiveMasternodeManager& mn_activeman, const ChainstateManager& chainman, const CSporkManager& sporkman, - const CBlockIndex* base_block_index, const Consensus::LLMQParams& params) : - CDKGSession(bls_worker, dmnman, dkgdbgman, qdkgsman, qsnapman, chainman, base_block_index, params), + const CBlockIndex* base_block_index, const Consensus::LLMQParams& llmq_params) : + CDKGSession(bls_worker, dmnman, dkgdbgman, qdkgsman, qsnapman, chainman, base_block_index, llmq_params), m_mn_metaman{mn_metaman}, m_mn_activeman{mn_activeman}, m_sporkman{sporkman}, diff --git a/src/active/dkgsessionhandler.cpp b/src/active/dkgsessionhandler.cpp index 553ab9742be0..9b796d796516 100644 --- a/src/active/dkgsessionhandler.cpp +++ b/src/active/dkgsessionhandler.cpp @@ -106,11 +106,11 @@ bool ActiveDKGSessionHandler::InitNewQuorum(gsl::not_null pQ if (!curSession->Init(m_mn_activeman.GetProTxHash(), quorumIndex)) { LogPrintf("ActiveDKGSessionHandler::%s -- height[%d] quorum initialization failed for %s qi[%d]\n", __func__, - pQuorumBaseBlockIndex->nHeight, curSession->params.name, quorumIndex); + pQuorumBaseBlockIndex->nHeight, params.name, quorumIndex); return false; } - LogPrintf("ActiveDKGSessionHandler::%s -- height[%d] quorum initialization OK for %s qi[%d]\n", __func__, pQuorumBaseBlockIndex->nHeight, curSession->params.name, quorumIndex); + LogPrintf("ActiveDKGSessionHandler::%s -- height[%d] quorum initialization OK for %s qi[%d]\n", __func__, pQuorumBaseBlockIndex->nHeight, params.name, quorumIndex); return true; } @@ -461,11 +461,11 @@ void ActiveDKGSessionHandler::HandleDKGRound(CConnman& connman, PeerManager& pee const auto tip_mn_list = m_dmnman.GetListAtChainTip(); utils::EnsureQuorumConnections(params, connman, m_sporkman, {m_dmnman, m_qsnapman, m_chainman, pQuorumBaseBlockIndex}, - tip_mn_list, curSession->myProTxHash, /*is_masternode=*/true, m_quorums_watch); + tip_mn_list, curSession->ProTx(), /*is_masternode=*/true, m_quorums_watch); if (curSession->AreWeMember()) { utils::AddQuorumProbeConnections(params, connman, m_mn_metaman, m_sporkman, {m_dmnman, m_qsnapman, m_chainman, pQuorumBaseBlockIndex}, tip_mn_list, - curSession->myProTxHash); + curSession->ProTx()); } WaitForNextPhase(QuorumPhase::Initialized, QuorumPhase::Contribute, curQuorumHash); @@ -527,58 +527,22 @@ void ActiveDKGSessionHandler::PhaseHandlerThread(CConnman& connman, PeerManager& bool ActiveDKGSessionHandler::GetContribution(const uint256& hash, CDKGContribution& ret) const { - if (!curSession) { - return false; - } - LOCK(curSession->invCs); - auto it = curSession->contributions.find(hash); - if (it != curSession->contributions.end()) { - ret = it->second; - return true; - } - return false; + return curSession && curSession->GetContribution(hash, ret); } bool ActiveDKGSessionHandler::GetComplaint(const uint256& hash, CDKGComplaint& ret) const { - if (!curSession) { - return false; - } - LOCK(curSession->invCs); - auto it = curSession->complaints.find(hash); - if (it != curSession->complaints.end()) { - ret = it->second; - return true; - } - return false; + return curSession && curSession->GetComplaint(hash, ret); } bool ActiveDKGSessionHandler::GetJustification(const uint256& hash, CDKGJustification& ret) const { - if (!curSession) { - return false; - } - LOCK(curSession->invCs); - auto it = curSession->justifications.find(hash); - if (it != curSession->justifications.end()) { - ret = it->second; - return true; - } - return false; + return curSession && curSession->GetJustification(hash, ret); } bool ActiveDKGSessionHandler::GetPrematureCommitment(const uint256& hash, CDKGPrematureCommitment& ret) const { - if (!curSession) { - return false; - } - LOCK(curSession->invCs); - auto it = curSession->prematureCommitments.find(hash); - if (it != curSession->prematureCommitments.end() && curSession->validCommitments.count(hash)) { - ret = it->second; - return true; - } - return false; + return curSession && curSession->GetPrematureCommitment(hash, ret); } QuorumPhase ActiveDKGSessionHandler::GetPhase() const diff --git a/src/active/quorums.cpp b/src/active/quorums.cpp deleted file mode 100644 index 0472ebe70f9d..000000000000 --- a/src/active/quorums.cpp +++ /dev/null @@ -1,242 +0,0 @@ -// Copyright (c) 2018-2026 The Dash Core developers -// Distributed under the MIT software license, see the accompanying -// file COPYING or http://www.opensource.org/licenses/mit-license.php. - -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include - -#include - -#include - -namespace llmq { -QuorumParticipant::QuorumParticipant(CBLSWorker& bls_worker, CConnman& connman, CDeterministicMNManager& dmnman, - QuorumObserverParent& qman, CQuorumSnapshotManager& qsnapman, - const CActiveMasternodeManager& mn_activeman, const ChainstateManager& chainman, - const CMasternodeSync& mn_sync, const CSporkManager& sporkman, - const llmq::QvvecSyncModeMap& sync_map, bool quorums_recovery, bool quorums_watch) : - QuorumObserver(connman, dmnman, qman, qsnapman, chainman, mn_sync, sporkman, sync_map, quorums_recovery), - m_bls_worker{bls_worker}, - m_mn_activeman{mn_activeman}, - m_quorums_watch{quorums_watch} -{ -} - -QuorumParticipant::~QuorumParticipant() = default; - -void QuorumParticipant::CheckQuorumConnections(const Consensus::LLMQParams& llmqParams, - gsl::not_null pindexNew) const -{ - auto lastQuorums = m_qman.ScanQuorums(llmqParams.type, pindexNew, (size_t)llmqParams.keepOldConnections); - auto deletableQuorums = GetQuorumsToDelete(llmqParams, pindexNew); - - const uint256 proTxHash = m_mn_activeman.GetProTxHash(); - const bool watchOtherISQuorums = llmqParams.type == Params().GetConsensus().llmqTypeDIP0024InstantSend && - std::ranges::any_of(lastQuorums, [&proTxHash](const auto& old_quorum) { - return old_quorum->IsMember(proTxHash); - }); - - for (const auto& quorum : lastQuorums) { - if (utils::EnsureQuorumConnections(llmqParams, m_connman, m_sporkman, {m_dmnman, m_qsnapman, m_chainman, quorum->m_quorum_base_block_index}, - m_dmnman.GetListAtChainTip(), proTxHash, /*is_masternode=*/true, m_quorums_watch)) { - if (deletableQuorums.erase(quorum->qc->quorumHash) > 0) { - LogPrint(BCLog::LLMQ, "QuorumParticipant::%s -- llmqType[%d] h[%d] keeping mn quorum connections for quorum: [%d:%s]\n", __func__, std23::to_underlying(llmqParams.type), pindexNew->nHeight, quorum->m_quorum_base_block_index->nHeight, quorum->m_quorum_base_block_index->GetBlockHash().ToString()); - } - } else if (watchOtherISQuorums && !quorum->IsMember(proTxHash)) { - Uint256HashSet connections; - const auto& cindexes = utils::CalcDeterministicWatchConnections(llmqParams.type, quorum->m_quorum_base_block_index, quorum->members.size(), 1); - for (auto idx : cindexes) { - connections.emplace(quorum->members[idx]->proTxHash); - } - if (!connections.empty()) { - if (!m_connman.HasMasternodeQuorumNodes(llmqParams.type, quorum->m_quorum_base_block_index->GetBlockHash())) { - LogPrint(BCLog::LLMQ, "QuorumParticipant::%s -- llmqType[%d] h[%d] adding mn inter-quorum connections for quorum: [%d:%s]\n", __func__, std23::to_underlying(llmqParams.type), pindexNew->nHeight, quorum->m_quorum_base_block_index->nHeight, quorum->m_quorum_base_block_index->GetBlockHash().ToString()); - m_connman.SetMasternodeQuorumNodes(llmqParams.type, quorum->m_quorum_base_block_index->GetBlockHash(), connections); - m_connman.SetMasternodeQuorumRelayMembers(llmqParams.type, quorum->m_quorum_base_block_index->GetBlockHash(), connections); - } - if (deletableQuorums.erase(quorum->qc->quorumHash) > 0) { - LogPrint(BCLog::LLMQ, "QuorumParticipant::%s -- llmqType[%d] h[%d] keeping mn inter-quorum connections for quorum: [%d:%s]\n", __func__, std23::to_underlying(llmqParams.type), pindexNew->nHeight, quorum->m_quorum_base_block_index->nHeight, quorum->m_quorum_base_block_index->GetBlockHash().ToString()); - } - } - } - } - - for (const auto& quorumHash : deletableQuorums) { - LogPrint(BCLog::LLMQ, "QuorumParticipant::%s -- removing masternodes quorum connections for quorum %s:\n", __func__, quorumHash.ToString()); - m_connman.RemoveMasternodeQuorumNodes(llmqParams.type, quorumHash); - } -} - -bool QuorumParticipant::SetQuorumSecretKeyShare(CQuorum& quorum, Span skContributions) const -{ - return quorum.SetSecretKeyShare(m_bls_worker.AggregateSecretKeys(skContributions), m_mn_activeman.GetProTxHash()); -} - -size_t QuorumParticipant::GetQuorumRecoveryStartOffset(const CQuorum& quorum, gsl::not_null pIndex) const -{ - auto mns = m_dmnman.GetListForBlock(pIndex); - std::vector vecProTxHashes; - vecProTxHashes.reserve(mns.GetCounts().enabled()); - mns.ForEachMN(/*onlyValid=*/true, - [&](const auto& pMasternode) { vecProTxHashes.emplace_back(pMasternode.proTxHash); }); - std::sort(vecProTxHashes.begin(), vecProTxHashes.end()); - size_t nIndex{0}; - { - auto my_protx_hash = m_mn_activeman.GetProTxHash(); - for (const auto i : util::irange(vecProTxHashes.size())) { - // cppcheck-suppress useStlAlgorithm - if (my_protx_hash == vecProTxHashes[i]) { - nIndex = i; - break; - } - } - } - return nIndex % quorum.qc->validMembers.size(); -} - -MessageProcessingResult QuorumParticipant::ProcessContribQGETDATA(bool request_limit_exceeded, CDataStream& vStream, - const CQuorum& quorum, CQuorumDataRequest& request, - gsl::not_null block_index) -{ - if (request.GetDataMask() & CQuorumDataRequest::ENCRYPTED_CONTRIBUTIONS) { - assert(block_index); - - int memberIdx = quorum.GetMemberIndex(request.GetProTxHash()); - if (memberIdx == -1) { - request.SetError(CQuorumDataRequest::Errors::MASTERNODE_IS_NO_MEMBER); - return request_limit_exceeded ? MisbehavingError{25, "request limit exceeded"} : MessageProcessingResult{}; - } - - std::vector> vecEncrypted; - if (!m_qman.GetEncryptedContributions(request.GetLLMQType(), block_index, - quorum.qc->validMembers, request.GetProTxHash(), vecEncrypted)) { - request.SetError(CQuorumDataRequest::Errors::ENCRYPTED_CONTRIBUTIONS_MISSING); - return request_limit_exceeded ? MisbehavingError{25, "request limit exceeded"} : MessageProcessingResult{}; - } - - vStream << vecEncrypted; - } - - return {}; -} - -MessageProcessingResult QuorumParticipant::ProcessContribQDATA(CNode& pfrom, CDataStream& vStream, - CQuorum& quorum, CQuorumDataRequest& request) -{ - if (request.GetDataMask() & CQuorumDataRequest::ENCRYPTED_CONTRIBUTIONS) { - if (WITH_LOCK(quorum.cs_vvec_shShare, return !quorum.HasVerificationVectorInternal() - || quorum.quorumVvec->size() != size_t(quorum.params.threshold))) { - // Don't bump score because we asked for it - LogPrint(BCLog::LLMQ, "QuorumParticipant::%s -- %s: No valid quorum verification vector available, from peer=%d\n", __func__, NetMsgType::QDATA, pfrom.GetId()); - return {}; - } - - int memberIdx = quorum.GetMemberIndex(request.GetProTxHash()); - if (memberIdx == -1) { - // Don't bump score because we asked for it - LogPrint(BCLog::LLMQ, "QuorumParticipant::%s -- %s: Not a member of the quorum, from peer=%d\n", __func__, NetMsgType::QDATA, pfrom.GetId()); - return {}; - } - - std::vector> vecEncrypted; - vStream >> vecEncrypted; - - std::vector vecSecretKeys; - vecSecretKeys.resize(vecEncrypted.size()); - for (const auto i : util::irange(vecEncrypted.size())) { - if (!m_mn_activeman.Decrypt(vecEncrypted[i], memberIdx, vecSecretKeys[i], PROTOCOL_VERSION)) { - return MisbehavingError{10, "failed to decrypt"}; - } - } - - if (!quorum.SetSecretKeyShare(m_bls_worker.AggregateSecretKeys(vecSecretKeys), m_mn_activeman.GetProTxHash())) { - return MisbehavingError{10, "invalid secret key share received"}; - } - } - - return {}; -} - -bool QuorumParticipant::IsMasternode() const -{ - // We are only initialized if masternode mode is enabled - return true; -} - -bool QuorumParticipant::IsWatching() const -{ - // Watch-only mode can co-exist with masternode mode - return m_quorums_watch; -} - -void QuorumParticipant::StartDataRecoveryThread(gsl::not_null pIndex, CQuorumCPtr pQuorum, - uint16_t nDataMaskIn) const -{ - bool expected = false; - if (!pQuorum->fQuorumDataRecoveryThreadRunning.compare_exchange_strong(expected, true)) { - LogPrint(BCLog::LLMQ, "QuorumParticipant::%s -- Already running\n", __func__); - return; - } - - workerPool.push([pQuorum = std::move(pQuorum), pIndex, nDataMaskIn, this](int threadId) mutable { - const size_t size_offset = GetQuorumRecoveryStartOffset(*pQuorum, pIndex); - DataRecoveryThread(pIndex, std::move(pQuorum), nDataMaskIn, m_mn_activeman.GetProTxHash(), size_offset); - }); -} - -void QuorumParticipant::TriggerQuorumDataRecoveryThreads(gsl::not_null block_index) const -{ - if (!m_quorums_recovery) { - return; - } - - LogPrint(BCLog::LLMQ, "QuorumParticipant::%s -- Process block %s\n", __func__, block_index->GetBlockHash().ToString()); - - const uint256 proTxHash = m_mn_activeman.GetProTxHash(); - - for (const auto& params : Params().GetConsensus().llmqs) { - auto vecQuorums = m_qman.ScanQuorums(params.type, block_index, params.keepOldConnections); - const bool fWeAreQuorumTypeMember = std::ranges::any_of(vecQuorums, [&proTxHash](const auto& pQuorum) { - return pQuorum->IsValidMember(proTxHash); - }); - - for (auto& pQuorum : vecQuorums) { - if (pQuorum->IsValidMember(proTxHash)) { - uint16_t nDataMask{0}; - if (!pQuorum->HasVerificationVector()) { - nDataMask |= CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR; - } - if (!pQuorum->GetSkShare().IsValid()) { - nDataMask |= CQuorumDataRequest::ENCRYPTED_CONTRIBUTIONS; - } - if (nDataMask != 0) { - StartDataRecoveryThread(block_index, std::move(pQuorum), nDataMask); - } else { - LogPrint(BCLog::LLMQ, "QuorumParticipant::%s -- No data needed from (%d, %s) at height %d\n", __func__, - std23::to_underlying(pQuorum->qc->llmqType), pQuorum->qc->quorumHash.ToString(), block_index->nHeight); - } - } else { - TryStartVvecSyncThread(block_index, std::move(pQuorum), fWeAreQuorumTypeMember); - } - } - } -} -} // namespace llmq diff --git a/src/active/quorums.h b/src/active/quorums.h deleted file mode 100644 index 7187e6a4808c..000000000000 --- a/src/active/quorums.h +++ /dev/null @@ -1,83 +0,0 @@ -// Copyright (c) 2018-2026 The Dash Core developers -// Distributed under the MIT software license, see the accompanying -// file COPYING or http://www.opensource.org/licenses/mit-license.php. - -#ifndef BITCOIN_ACTIVE_QUORUMS_H -#define BITCOIN_ACTIVE_QUORUMS_H - -#include -#include - -#include -#include -#include -#include -#include -#include - -#include - -class CActiveMasternodeManager; -class CBlockIndex; -class CBLSWorker; -class CConnman; -class CDeterministicMNManager; -class CDKGSessionManager; -class CNode; -class CSporkManager; -struct MessageProcessingResult; -namespace llmq { -class CQuorum; -class CQuorumDataRequest; -class CQuorumSnapshotManager; -enum class QvvecSyncMode : int8_t; -} // namespace llmq - -namespace llmq { -class QuorumParticipant final : public QuorumObserver -{ -private: - CBLSWorker& m_bls_worker; - const CActiveMasternodeManager& m_mn_activeman; - const bool m_quorums_watch{false}; - -public: - QuorumParticipant() = delete; - QuorumParticipant(const QuorumParticipant&) = delete; - QuorumParticipant& operator=(const QuorumParticipant&) = delete; - explicit QuorumParticipant(CBLSWorker& bls_worker, CConnman& connman, CDeterministicMNManager& dmnman, - QuorumObserverParent& qman, CQuorumSnapshotManager& qsnapman, - const CActiveMasternodeManager& mn_activeman, const ChainstateManager& chainman, - const CMasternodeSync& mn_sync, const CSporkManager& sporkman, - const llmq::QvvecSyncModeMap& sync_map, bool quorums_recovery, bool quorums_watch); - ~QuorumParticipant(); - -public: - // QuorumObserver - bool IsMasternode() const override; - bool IsWatching() const override; - bool SetQuorumSecretKeyShare(CQuorum& quorum, Span skContributions) const override; - [[nodiscard]] MessageProcessingResult ProcessContribQGETDATA(bool request_limit_exceeded, CDataStream& vStream, - const CQuorum& quorum, CQuorumDataRequest& request, - gsl::not_null block_index) override; - [[nodiscard]] MessageProcessingResult ProcessContribQDATA(CNode& pfrom, CDataStream& vStream, CQuorum& quorum, - CQuorumDataRequest& request) override; - -protected: - // QuorumObserver - void CheckQuorumConnections(const Consensus::LLMQParams& llmqParams, - gsl::not_null pindexNew) const override; - void TriggerQuorumDataRecoveryThreads(gsl::not_null block_index) const override; - -private: - /// Returns the start offset for the masternode with the given proTxHash. This offset is applied when picking data - /// recovery members of a quorum's memberlist and is calculated based on a list of all member of all active quorums - /// for the given llmqType in a way that each member should receive the same number of request if all active - /// llmqType members requests data from one llmqType quorum. - size_t GetQuorumRecoveryStartOffset(const CQuorum& quorum, gsl::not_null pIndex) const; - - void StartDataRecoveryThread(gsl::not_null pIndex, CQuorumCPtr pQuorum, uint16_t nDataMaskIn) const; -}; -} // namespace llmq - -#endif // BITCOIN_ACTIVE_QUORUMS_H diff --git a/src/chainlock/handler.cpp b/src/chainlock/handler.cpp index ddb7bf1ff59a..4e1392406bb3 100644 --- a/src/chainlock/handler.cpp +++ b/src/chainlock/handler.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include diff --git a/src/dsnotificationinterface.cpp b/src/dsnotificationinterface.cpp index 271ce2daa752..4ebc9ccb4c34 100644 --- a/src/dsnotificationinterface.cpp +++ b/src/dsnotificationinterface.cpp @@ -29,10 +29,10 @@ CDSNotificationInterface::CDSNotificationInterface(CConnman& connman, CDSTXManag CDSNotificationInterface::~CDSNotificationInterface() = default; -void CDSNotificationInterface::InitializeCurrentBlockTip() +void CDSNotificationInterface::InitializeCurrentBlockTip(const CBlockIndex* tip, bool ibd) { - SynchronousUpdatedBlockTip(m_chainman.ActiveChain().Tip(), nullptr, m_chainman.ActiveChainstate().IsInitialBlockDownload()); - UpdatedBlockTip(m_chainman.ActiveChain().Tip(), nullptr, m_chainman.ActiveChainstate().IsInitialBlockDownload()); + SynchronousUpdatedBlockTip(tip, nullptr, ibd); + UpdatedBlockTip(tip, nullptr, ibd); } void CDSNotificationInterface::AcceptedBlockHeader(const CBlockIndex *pindexNew) diff --git a/src/dsnotificationinterface.h b/src/dsnotificationinterface.h index 07d49332fd60..889438397fc5 100644 --- a/src/dsnotificationinterface.h +++ b/src/dsnotificationinterface.h @@ -25,9 +25,8 @@ class CDSNotificationInterface : public CValidationInterface const std::unique_ptr& dmnman); virtual ~CDSNotificationInterface(); - // a small helper to initialize current block height in sub-modules on startup - void InitializeCurrentBlockTip(); - + // CValidationInterface + void InitializeCurrentBlockTip(const CBlockIndex* tip, bool ibd) override; protected: // CValidationInterface void AcceptedBlockHeader(const CBlockIndex *pindexNew) override; diff --git a/src/init.cpp b/src/init.cpp index 703c50646cc3..1dd9282421f4 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -98,9 +98,10 @@ #include #include #include +#include #include +#include #include -#include #include #include #include @@ -290,7 +291,6 @@ void PrepareShutdown(NodeContext& node) StopRPC(); StopHTTPServer(); - if (node.observer_ctx) node.observer_ctx->Stop(); if (node.active_ctx) node.active_ctx->Stop(); if (node.clhandler) node.clhandler->Stop(); if (node.peerman) node.peerman->StopHandlers(); @@ -2202,12 +2202,12 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) node.active_ctx = std::make_unique(*node.llmq_ctx->bls_worker, chainman, *node.connman, *node.dmnman, *node.govman, *node.mn_metaman, *node.sporkman, *node.chainlocks, *node.mempool, *node.clhandler, *node.llmq_ctx->isman, *node.llmq_ctx->quorum_block_processor, *node.llmq_ctx->qman, *node.llmq_ctx->qsnapman, *node.llmq_ctx->sigman, - *node.mn_sync, operator_sk, sync_map, dash_db_params, quorums_recovery, quorums_watch); + *node.mn_sync, operator_sk, dash_db_params, quorums_watch); RegisterValidationInterface(node.active_ctx.get()); } else if (quorums_watch) { - node.observer_ctx = std::make_unique(*node.llmq_ctx->bls_worker, *node.connman, *node.dmnman, *node.mn_metaman, *node.mn_sync, + node.observer_ctx = std::make_unique(*node.llmq_ctx->bls_worker, *node.dmnman, *node.mn_metaman, *node.llmq_ctx->quorum_block_processor, *node.llmq_ctx->qman, *node.llmq_ctx->qsnapman, - chainman, *node.sporkman, sync_map, dash_db_params, quorums_recovery); + chainman, *node.sporkman, dash_db_params); RegisterValidationInterface(node.observer_ctx.get()); } @@ -2216,6 +2216,18 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) node.peerman->AddExtraHandler(std::make_unique(node.peerman.get(), *node.llmq_ctx->isman, node.active_ctx ? node.active_ctx->is_signer.get() : nullptr, *node.llmq_ctx->sigman, *node.llmq_ctx->qman, *node.chainlocks, chainman.ActiveChainstate(), *node.mempool, *node.mn_sync)); node.peerman->AddExtraHandler(std::make_unique(node.peerman.get(), *node.llmq_ctx->sigman, node.active_ctx ? node.active_ctx->shareman.get() : nullptr, *node.sporkman)); + { + llmq::QuorumRole* quorum_role = node.active_ctx ? static_cast(node.active_ctx.get()) + : static_cast(node.observer_ctx.get()); + auto net_quorum = std::make_unique( + node.peerman.get(), *node.llmq_ctx->bls_worker, *node.connman, *node.dmnman, + *node.llmq_ctx->qman, *node.llmq_ctx->qsnapman, chainman, + *node.mn_sync, *node.sporkman, quorum_role, + node.active_ctx ? node.active_ctx->nodeman.get() : nullptr, + llmq::DEFAULT_WORKER_COUNT, sync_map, quorums_recovery); + node.peerman->AddExtraHandler(std::move(net_quorum)); + } + if (node.active_ctx) { auto cj_server = std::make_unique(node.peerman.get(), chainman, *node.connman, *node.dmnman, *node.dstxman, *node.mn_metaman, *node.mempool, *node.active_ctx->nodeman, *node.mn_sync, *node.llmq_ctx->isman); @@ -2327,7 +2339,6 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) node.peerman->StartHandlers(); node.clhandler->Start(); - if (node.observer_ctx) node.observer_ctx->Start(llmq::DEFAULT_WORKER_COUNT); node.scheduler->scheduleEvery(std::bind(&CNetFulfilledRequestManager::DoMaintenance, std::ref(*node.netfulfilledman)), std::chrono::minutes{1}); node.scheduler->scheduleEvery(std::bind(&CMasternodeUtils::DoMaintenance, std::ref(*node.connman), std::ref(*node.dmnman), std::ref(*node.mn_sync), node.cj_walletman.get()), std::chrono::minutes{1}); @@ -2335,7 +2346,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) node.peerman->ScheduleHandlers(*node.scheduler); if (node.active_ctx) { - node.active_ctx->Start(*node.connman, *node.peerman, llmq::DEFAULT_WORKER_COUNT); + node.active_ctx->Start(*node.connman, *node.peerman); node.scheduler->scheduleEvery(std::bind(&llmq::CDKGSessionManager::CleanupOldContributions, std::ref(*node.active_ctx->qdkgsman)), std::chrono::hours{1}); } @@ -2418,18 +2429,21 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) bool skip_evodb_repair_on_reindex = fReindex || fReindexChainState; ThreadImport(chainman, vImportFiles, args); - // force UpdatedBlockTip to initialize nCachedBlockHeight for DS, MN payments and budgets - // but don't call it directly to prevent triggering of other listeners like zmq etc. - // GetMainSignals().UpdatedBlockTip(::ChainActive().Tip()); - g_ds_notification_interface->InitializeCurrentBlockTip(); { const CBlockIndex* tip = WITH_LOCK(::cs_main, return chainman.ActiveTip()); const bool ibd = chainman.ActiveChainstate().IsInitialBlockDownload(); - if (node.observer_ctx && !node.active_ctx) { - node.observer_ctx->InitializeCurrentBlockTip(tip, ibd); + if (node.active_ctx) { + // On masternodes, defer the full broadcast until after + // nodeman->Init() so that GetProTxHash() is available + // for quorum connection setup and skShare derivation. + // Only kick CDSNotificationInterface here (cached block + // height for DS/MN payments/budgets). + g_ds_notification_interface->InitializeCurrentBlockTip(tip, ibd); + } else { + // Non-masternode nodes (including observer-only): broadcast + // to all subscribers now; no proTxHash dependency. + GetMainSignals().InitializeCurrentBlockTip(tip, ibd); } - // Note: active_ctx initialization is deferred until after nodeman->Init() - // so that GetProTxHash() is available for quorum connection setup. } // Seed InstantSend tip-height cache; NetInstantSend receives future @@ -2500,13 +2514,13 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) if (node.active_ctx) { node.active_ctx->nodeman->Init(chainman.ActiveTip()); - // Initialize current block tip after nodeman->Init() so that - // GetProTxHash() is available for quorum connection setup. - // Without this ordering, EnsureQuorumConnections returns early - // because the null proTxHash makes the MN appear as a non-member. + // Re-initialize block tip for handlers that depend on + // proTxHash (set by nodeman->Init above). The first call + // at step 7d ran before Init, so quorum connections were + // skipped for masternodes with a null proTxHash. const CBlockIndex* tip = WITH_LOCK(::cs_main, return chainman.ActiveTip()); const bool ibd = chainman.ActiveChainstate().IsInitialBlockDownload(); - node.active_ctx->InitializeCurrentBlockTip(tip, ibd); + GetMainSignals().InitializeCurrentBlockTip(tip, ibd); } }); #ifdef ENABLE_WALLET diff --git a/src/llmq/dkgsession.cpp b/src/llmq/dkgsession.cpp index 6a315ce5c5f7..8a35a5f75626 100644 --- a/src/llmq/dkgsession.cpp +++ b/src/llmq/dkgsession.cpp @@ -618,6 +618,42 @@ CDKGMember* CDKGSession::GetMemberAtIndex(size_t index) const return members[index].get(); } +bool CDKGSession::GetContribution(const uint256& hash, CDKGContribution& ret) const +{ + LOCK(invCs); + auto it = contributions.find(hash); + if (it == contributions.end()) return false; + ret = it->second; + return true; +} + +bool CDKGSession::GetComplaint(const uint256& hash, CDKGComplaint& ret) const +{ + LOCK(invCs); + auto it = complaints.find(hash); + if (it == complaints.end()) return false; + ret = it->second; + return true; +} + +bool CDKGSession::GetJustification(const uint256& hash, CDKGJustification& ret) const +{ + LOCK(invCs); + auto it = justifications.find(hash); + if (it == justifications.end()) return false; + ret = it->second; + return true; +} + +bool CDKGSession::GetPrematureCommitment(const uint256& hash, CDKGPrematureCommitment& ret) const +{ + LOCK(invCs); + auto it = prematureCommitments.find(hash); + if (it == prematureCommitments.end() || !validCommitments.count(hash)) return false; + ret = it->second; + return true; +} + template std::optional CDKGSession::ReceiveMessagePreamble(const MsgType& msg, MsgPhase phase, CDKGLogger& logger) { diff --git a/src/llmq/dkgsession.h b/src/llmq/dkgsession.h index 06b5dd610d98..abe65f6b5e84 100644 --- a/src/llmq/dkgsession.h +++ b/src/llmq/dkgsession.h @@ -276,13 +276,9 @@ class CDKGLogger : public CBatchedLogger */ class CDKGSession { - friend class ActiveDKGSession; - friend class ActiveDKGSessionHandler; - friend class CDKGSessionHandler; - friend class CDKGSessionManager; friend class CDKGLogger; -private: +protected: enum class MsgPhase : uint8_t { Contribution, Complaint, @@ -296,7 +292,7 @@ class CDKGSession bool should_process{true}; }; -private: +protected: CBLSWorker& blsWorker; CBLSWorkerCache cache; CDeterministicMNManager& m_dmnman; @@ -307,7 +303,7 @@ class CDKGSession const Consensus::LLMQParams& params; const CBlockIndex* const m_quorum_base_block_index; -private: +protected: int quorumIndex{0}; std::vector> members; std::map membersMap; @@ -397,6 +393,12 @@ class CDKGSession // All Phases 5-in-1 for single-node-quorum virtual CFinalCommitment FinalizeSingleCommitment() { return {}; } + //! Look up a received message by hash. Used by CDKGSessionHandler subclasses to implement their Get* virtuals. + [[nodiscard]] bool GetContribution(const uint256& hash, CDKGContribution& ret) const EXCLUSIVE_LOCKS_REQUIRED(!invCs); + [[nodiscard]] bool GetComplaint(const uint256& hash, CDKGComplaint& ret) const EXCLUSIVE_LOCKS_REQUIRED(!invCs); + [[nodiscard]] bool GetJustification(const uint256& hash, CDKGJustification& ret) const EXCLUSIVE_LOCKS_REQUIRED(!invCs); + [[nodiscard]] bool GetPrematureCommitment(const uint256& hash, CDKGPrematureCommitment& ret) const EXCLUSIVE_LOCKS_REQUIRED(!invCs); + public: [[nodiscard]] bool AreWeMember() const { return !myProTxHash.IsNull(); } [[nodiscard]] CDKGMember* GetMember(const uint256& proTxHash) const; @@ -414,7 +416,6 @@ class CDKGSession return false; } -private: [[nodiscard]] bool ShouldSimulateError(DKGError::type type) const; template diff --git a/src/llmq/net_quorum.cpp b/src/llmq/net_quorum.cpp new file mode 100644 index 000000000000..1f255c78296e --- /dev/null +++ b/src/llmq/net_quorum.cpp @@ -0,0 +1,711 @@ +// Copyright (c) 2025-2026 The Dash Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include +#include + +namespace llmq { + +NetQuorum::NetQuorum(PeerManagerInternal* peer_manager, CBLSWorker& bls_worker, + CConnman& connman, CDeterministicMNManager& dmnman, CQuorumManager& qman, + CQuorumSnapshotManager& qsnapman, const ChainstateManager& chainman, + const CMasternodeSync& mn_sync, const CSporkManager& sporkman, + QuorumRole* quorum_role, CActiveMasternodeManager* nodeman, + int16_t worker_count, const QvvecSyncModeMap& sync_map, bool quorums_recovery) : + NetHandler(peer_manager), + m_bls_worker{bls_worker}, + m_connman{connman}, + m_dmnman{dmnman}, + m_qman{qman}, + m_qsnapman{qsnapman}, + m_chainman{chainman}, + m_mn_sync{mn_sync}, + m_sporkman{sporkman}, + m_role{quorum_role}, + m_nodeman{nodeman}, + m_worker_count{worker_count}, + m_sync_map{sync_map}, + m_quorums_recovery{quorums_recovery} +{ + quorumThreadInterrupt.reset(); +} + +// NetHandler + +void NetQuorum::Start() +{ + if (!m_role) return; + assert(m_worker_count > 0); + workerPool.resize(m_worker_count); + RenameThreadPool(workerPool, "q-mngr"); +} + +void NetQuorum::Stop() +{ + workerPool.clear_queue(); + workerPool.stop(true); +} + +void NetQuorum::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv) +{ + if (msg_type == NetMsgType::QGETDATA) { + if (!m_role || !m_role->IsMasternode() || (pfrom.GetVerifiedProRegTxHash().IsNull() && !pfrom.qwatch)) { + m_peer_manager->PeerMisbehaving(pfrom.GetId(), 10, "not a verified masternode or a qwatch connection"); + return; + } + + CQuorumDataRequest request; + vRecv >> request; + + auto sendQDATA = [&](CQuorumDataRequest::Errors nError, + bool request_limit_exceeded, + const CDataStream& body = CDataStream(SER_NETWORK, PROTOCOL_VERSION)) -> bool { + bool misbehave = false; + switch (nError) { + case (CQuorumDataRequest::Errors::NONE): + case (CQuorumDataRequest::Errors::QUORUM_TYPE_INVALID): + case (CQuorumDataRequest::Errors::QUORUM_BLOCK_NOT_FOUND): + case (CQuorumDataRequest::Errors::QUORUM_NOT_FOUND): + case (CQuorumDataRequest::Errors::MASTERNODE_IS_NO_MEMBER): + case (CQuorumDataRequest::Errors::UNDEFINED): + misbehave = request_limit_exceeded; + break; + case (CQuorumDataRequest::Errors::QUORUM_VERIFICATION_VECTOR_MISSING): + case (CQuorumDataRequest::Errors::ENCRYPTED_CONTRIBUTIONS_MISSING): + // Do not punish limit exceed if we don't have the requested data + break; + } + request.SetError(nError); + CDataStream ssResponse{SER_NETWORK, pfrom.GetCommonVersion()}; + ssResponse << request << body; + m_connman.PushMessage(&pfrom, CNetMsgMaker(pfrom.GetCommonVersion()).Make(NetMsgType::QDATA, ssResponse)); + return misbehave; + }; + + const CQuorumDataRequestKey key(pfrom.GetVerifiedProRegTxHash(), false, request.GetQuorumHash(), request.GetLLMQType()); + const bool request_limit_exceeded = !m_qman.RegisterDataRequest(key, request, /*add_expiry_bias=*/false); + + if (!Params().GetLLMQ(request.GetLLMQType()).has_value()) { + if (sendQDATA(CQuorumDataRequest::Errors::QUORUM_TYPE_INVALID, request_limit_exceeded)) { + m_peer_manager->PeerMisbehaving(pfrom.GetId(), 25, "request limit exceeded"); + } + return; + } + + const CBlockIndex* pQuorumBaseBlockIndex = WITH_LOCK(::cs_main, return m_chainman.m_blockman.LookupBlockIndex(request.GetQuorumHash())); + if (pQuorumBaseBlockIndex == nullptr) { + if (sendQDATA(CQuorumDataRequest::Errors::QUORUM_BLOCK_NOT_FOUND, request_limit_exceeded)) { + m_peer_manager->PeerMisbehaving(pfrom.GetId(), 25, "request limit exceeded"); + } + return; + } + + const auto pQuorum = m_qman.GetQuorum(request.GetLLMQType(), request.GetQuorumHash()); + if (pQuorum == nullptr) { + if (sendQDATA(CQuorumDataRequest::Errors::QUORUM_NOT_FOUND, request_limit_exceeded)) { + m_peer_manager->PeerMisbehaving(pfrom.GetId(), 25, "request limit exceeded"); + } + return; + } + + CDataStream ssResponseData(SER_NETWORK, pfrom.GetCommonVersion()); + + // Check if request wants QUORUM_VERIFICATION_VECTOR data + if (request.GetDataMask() & CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR) { + if (!pQuorum->HasVerificationVector()) { + if (sendQDATA(CQuorumDataRequest::Errors::QUORUM_VERIFICATION_VECTOR_MISSING, request_limit_exceeded)) { + m_peer_manager->PeerMisbehaving(pfrom.GetId(), 25, "request limit exceeded"); + } + return; + } + ssResponseData << *pQuorum->GetVerificationVector(); + } + + // Check if request wants ENCRYPTED_CONTRIBUTIONS data + bool misbehave_contrib = ProcessContribQGETDATA(ssResponseData, *pQuorum, request, pQuorumBaseBlockIndex); + + CQuorumDataRequest::Errors ret_err{CQuorumDataRequest::Errors::NONE}; + if (auto request_err = request.GetError(); request_err != CQuorumDataRequest::Errors::NONE && + request_err != CQuorumDataRequest::Errors::UNDEFINED) { + ret_err = request_err; + } + + bool misbehave_qdata = (ret_err != CQuorumDataRequest::Errors::NONE) + ? sendQDATA(ret_err, request_limit_exceeded) + : sendQDATA(CQuorumDataRequest::Errors::NONE, request_limit_exceeded, ssResponseData); + + if (request_limit_exceeded && (misbehave_contrib || misbehave_qdata)) { + m_peer_manager->PeerMisbehaving(pfrom.GetId(), 25, "request limit exceeded"); + } + return; + } + + if (msg_type == NetMsgType::QDATA) { + if (!m_role || pfrom.GetVerifiedProRegTxHash().IsNull()) { + m_peer_manager->PeerMisbehaving(pfrom.GetId(), 10, "not a verified masternode and -watchquorums is not enabled"); + return; + } + + CQuorumDataRequest request; + vRecv >> request; + + { + const CQuorumDataRequestKey key(pfrom.GetVerifiedProRegTxHash(), true, request.GetQuorumHash(), request.GetLLMQType()); + const auto validation = m_qman.ValidateDataResponse(key, request); + switch (validation) { + case CQuorumManager::DataResponseValidation::NotRequested: + m_peer_manager->PeerMisbehaving(pfrom.GetId(), 10, "not requested"); + return; + case CQuorumManager::DataResponseValidation::AlreadyReceived: + m_peer_manager->PeerMisbehaving(pfrom.GetId(), 10, "already received"); + return; + case CQuorumManager::DataResponseValidation::Mismatch: + m_peer_manager->PeerMisbehaving(pfrom.GetId(), 10, "not like requested"); + return; + case CQuorumManager::DataResponseValidation::OK: + break; + } + } + + if (request.GetError() != CQuorumDataRequest::Errors::NONE) { + LogPrint(BCLog::LLMQ, "NetQuorum::%s -- %s: Error %d (%s), from peer=%d\n", __func__, msg_type, request.GetError(), request.GetErrorString(), pfrom.GetId()); + return; + } + + CQuorumPtr pQuorum = m_qman.GetCachedMutableQuorum(request.GetLLMQType(), request.GetQuorumHash()); + if (!pQuorum) { + // Don't bump score because we asked for it + LogPrint(BCLog::LLMQ, "NetQuorum::%s -- %s: Quorum not found, from peer=%d\n", __func__, msg_type, pfrom.GetId()); + return; + } + + // Check if request has QUORUM_VERIFICATION_VECTOR data + if (request.GetDataMask() & CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR) { + std::vector verificationVector; + vRecv >> verificationVector; + + if (pQuorum->SetVerificationVector(verificationVector)) { + m_qman.QueueQuorumForWarming(pQuorum); + } else { + m_peer_manager->PeerMisbehaving(pfrom.GetId(), 10, "invalid quorum verification vector"); + return; + } + } + + // Check if request has ENCRYPTED_CONTRIBUTIONS data + if (!ProcessContribQDATA(pfrom, vRecv, *pQuorum, request)) { + return; + } + + m_qman.WriteContributions(pQuorum); + } +} + +bool NetQuorum::ProcessContribQGETDATA(CDataStream& ssResponseData, const CQuorum& quorum, + CQuorumDataRequest& request, + gsl::not_null block_index) const +{ + if (!(request.GetDataMask() & CQuorumDataRequest::ENCRYPTED_CONTRIBUTIONS)) { + return false; + } + + if (!m_nodeman) { + request.SetError(CQuorumDataRequest::Errors::ENCRYPTED_CONTRIBUTIONS_MISSING); + return true; + } + + int memberIdx = quorum.GetMemberIndex(request.GetProTxHash()); + if (memberIdx == -1) { + request.SetError(CQuorumDataRequest::Errors::MASTERNODE_IS_NO_MEMBER); + return true; + } + + std::vector> vecEncrypted; + if (!m_qman.GetEncryptedContributions(request.GetLLMQType(), block_index, + quorum.qc->validMembers, request.GetProTxHash(), vecEncrypted)) { + request.SetError(CQuorumDataRequest::Errors::ENCRYPTED_CONTRIBUTIONS_MISSING); + return true; + } + + ssResponseData << vecEncrypted; + return false; +} + +bool NetQuorum::ProcessContribQDATA(CNode& pfrom, CDataStream& vRecv, + CQuorum& quorum, CQuorumDataRequest& request) +{ + if (!(request.GetDataMask() & CQuorumDataRequest::ENCRYPTED_CONTRIBUTIONS)) { + return true; + } + + if (!m_nodeman) { + return true; + } + + auto vvec = quorum.GetVerificationVector(); + if (!vvec || vvec->size() != size_t(quorum.params.threshold)) { + LogPrint(BCLog::LLMQ, "NetQuorum::%s -- %s: No valid quorum verification vector available, from peer=%d\n", + __func__, NetMsgType::QDATA, pfrom.GetId()); + return false; + } + + int memberIdx = quorum.GetMemberIndex(request.GetProTxHash()); + if (memberIdx == -1) { + LogPrint(BCLog::LLMQ, "NetQuorum::%s -- %s: Not a member of the quorum, from peer=%d\n", + __func__, NetMsgType::QDATA, pfrom.GetId()); + return false; + } + + std::vector> vecEncrypted; + vRecv >> vecEncrypted; + + std::vector vecSecretKeys; + vecSecretKeys.resize(vecEncrypted.size()); + for (const auto i : util::irange(vecEncrypted.size())) { + if (!m_nodeman->Decrypt(vecEncrypted[i], memberIdx, vecSecretKeys[i], PROTOCOL_VERSION)) { + m_peer_manager->PeerMisbehaving(pfrom.GetId(), 10, "failed to decrypt"); + return false; + } + } + + if (!quorum.SetSecretKeyShare(m_bls_worker.AggregateSecretKeys(vecSecretKeys), + m_nodeman->GetProTxHash())) { + m_peer_manager->PeerMisbehaving(pfrom.GetId(), 10, "invalid secret key share received"); + return false; + } + + return true; +} + +DataRequestStatus NetQuorum::RequestQuorumData(CNode& peer, const CQuorum& quorum, uint16_t nDataMask, + const uint256& proTxHash) const +{ + const CQuorumDataRequestKey key(peer.GetVerifiedProRegTxHash(), true, + quorum.m_quorum_base_block_index->GetBlockHash(), quorum.qc->llmqType); + const CQuorumDataRequest request(quorum.qc->llmqType, quorum.m_quorum_base_block_index->GetBlockHash(), + nDataMask, proTxHash); + if (!m_qman.RegisterDataRequest(key, request)) { + return m_qman.GetDataRequestStatus(peer.GetVerifiedProRegTxHash(), /*we_requested=*/true, + quorum.m_quorum_base_block_index->GetBlockHash(), quorum.qc->llmqType); + } + LogPrint(BCLog::LLMQ, "NetQuorum::%s -- sending QGETDATA quorumHash[%s] llmqType[%d] proRegTx[%s]\n", __func__, + key.quorumHash.ToString(), std23::to_underlying(key.llmqType), key.proRegTx.ToString()); + + CNetMsgMaker msgMaker(peer.GetCommonVersion()); + m_connman.PushMessage(&peer, msgMaker.Make(NetMsgType::QGETDATA, request)); + return DataRequestStatus::Requested; +} + + +void NetQuorum::InitializeCurrentBlockTip(const CBlockIndex* tip, bool ibd) +{ + if (!m_role) return; + UpdatedBlockTip(tip, nullptr, ibd); + if (tip) { + for (const auto& params : Params().GetConsensus().llmqs) { + CheckQuorumConnections(params, tip); + } + } +} + +// CValidationInterface + +void NetQuorum::UpdatedBlockTip(const CBlockIndex* pindexNew, const CBlockIndex* pindexFork, bool fInitialDownload) +{ + if (!m_role) return; + if (!pindexNew) return; + if (fInitialDownload || pindexNew == pindexFork) return; + if (!m_mn_sync.IsBlockchainSynced()) return; + + for (const auto& params : Params().GetConsensus().llmqs) { + CheckQuorumConnections(params, pindexNew); + } + + m_qman.CleanupExpiredDataRequests(); + TriggerQuorumDataRecoveryThreads(pindexNew); + StartCleanupOldQuorumDataThread(pindexNew); +} + +// Private helpers + +Uint256HashSet NetQuorum::GetQuorumsToDelete(const Consensus::LLMQParams& llmqParams, + gsl::not_null pindexNew) const +{ + auto connmanQuorumsToDelete = m_connman.GetMasternodeQuorums(llmqParams.type); + + if (IsQuorumRotationEnabled(llmqParams, pindexNew)) { + int cycleIndexTipHeight = pindexNew->nHeight % llmqParams.dkgInterval; + int cycleQuorumBaseHeight = pindexNew->nHeight - cycleIndexTipHeight; + std::stringstream ss; + for (const auto quorumIndex : util::irange(llmqParams.signingActiveQuorumCount)) { + if (quorumIndex <= cycleIndexTipHeight) { + int curDkgHeight = cycleQuorumBaseHeight + quorumIndex; + auto curDkgBlock = pindexNew->GetAncestor(curDkgHeight)->GetBlockHash(); + ss << curDkgHeight << ":" << curDkgBlock.ToString() << " | "; + connmanQuorumsToDelete.erase(curDkgBlock); + } + } + LogPrint(BCLog::LLMQ, "NetQuorum::%s -- llmqType[%d] h[%d] keeping mn quorum connections for rotated quorums: [%s]\n", + __func__, std23::to_underlying(llmqParams.type), pindexNew->nHeight, ss.str()); + } else { + int curDkgHeight = pindexNew->nHeight - (pindexNew->nHeight % llmqParams.dkgInterval); + auto curDkgBlock = pindexNew->GetAncestor(curDkgHeight)->GetBlockHash(); + connmanQuorumsToDelete.erase(curDkgBlock); + LogPrint(BCLog::LLMQ, "NetQuorum::%s -- llmqType[%d] h[%d] keeping mn quorum connections for quorum: [%d:%s]\n", + __func__, std23::to_underlying(llmqParams.type), pindexNew->nHeight, curDkgHeight, curDkgBlock.ToString()); + } + + return connmanQuorumsToDelete; +} + +void NetQuorum::CheckQuorumConnections(const Consensus::LLMQParams& llmqParams, + gsl::not_null pindexNew) const +{ + const bool is_masternode = m_role->IsMasternode(); + const uint256 proTxHash = is_masternode ? m_role->GetProTxHash() : uint256{}; + + auto lastQuorums = m_qman.ScanQuorums(llmqParams.type, pindexNew, (size_t)llmqParams.keepOldConnections); + auto deletableQuorums = GetQuorumsToDelete(llmqParams, pindexNew); + + const bool watchOtherISQuorums = is_masternode && + llmqParams.type == Params().GetConsensus().llmqTypeDIP0024InstantSend && + std::ranges::any_of(lastQuorums, [&proTxHash](const auto& old_quorum) { return old_quorum->IsMember(proTxHash); }); + + for (const auto& quorum : lastQuorums) { + if (utils::EnsureQuorumConnections(llmqParams, m_connman, m_sporkman, + {m_dmnman, m_qsnapman, m_chainman, quorum->m_quorum_base_block_index}, + m_dmnman.GetListAtChainTip(), proTxHash, + /*is_masternode=*/is_masternode, + /*quorums_watch=*/is_masternode ? m_role->IsWatching() : true)) { + if (deletableQuorums.erase(quorum->qc->quorumHash) > 0) { + LogPrint(BCLog::LLMQ, "NetQuorum::%s -- llmqType[%d] h[%d] keeping mn quorum connections for quorum: [%d:%s]\n", + __func__, std23::to_underlying(llmqParams.type), pindexNew->nHeight, + quorum->m_quorum_base_block_index->nHeight, + quorum->m_quorum_base_block_index->GetBlockHash().ToString()); + } + } else if (watchOtherISQuorums && !quorum->IsMember(proTxHash)) { + Uint256HashSet connections; + const auto& cindexes = utils::CalcDeterministicWatchConnections(llmqParams.type, + quorum->m_quorum_base_block_index, + quorum->members.size(), 1); + for (auto idx : cindexes) { + connections.emplace(quorum->members[idx]->proTxHash); + } + if (!connections.empty()) { + if (!m_connman.HasMasternodeQuorumNodes(llmqParams.type, quorum->m_quorum_base_block_index->GetBlockHash())) { + LogPrint(BCLog::LLMQ, "NetQuorum::%s -- llmqType[%d] h[%d] adding mn inter-quorum connections for quorum: [%d:%s]\n", + __func__, std23::to_underlying(llmqParams.type), pindexNew->nHeight, + quorum->m_quorum_base_block_index->nHeight, + quorum->m_quorum_base_block_index->GetBlockHash().ToString()); + m_connman.SetMasternodeQuorumNodes(llmqParams.type, + quorum->m_quorum_base_block_index->GetBlockHash(), connections); + m_connman.SetMasternodeQuorumRelayMembers(llmqParams.type, + quorum->m_quorum_base_block_index->GetBlockHash(), connections); + } + if (deletableQuorums.erase(quorum->qc->quorumHash) > 0) { + LogPrint(BCLog::LLMQ, "NetQuorum::%s -- llmqType[%d] h[%d] keeping mn inter-quorum connections for quorum: [%d:%s]\n", + __func__, std23::to_underlying(llmqParams.type), pindexNew->nHeight, + quorum->m_quorum_base_block_index->nHeight, + quorum->m_quorum_base_block_index->GetBlockHash().ToString()); + } + } + } + } + + for (const auto& quorumHash : deletableQuorums) { + LogPrint(BCLog::LLMQ, "NetQuorum::%s -- removing masternodes quorum connections for quorum %s:\n", + __func__, quorumHash.ToString()); + m_connman.RemoveMasternodeQuorumNodes(llmqParams.type, quorumHash); + } +} + +void NetQuorum::TriggerQuorumDataRecoveryThreads(gsl::not_null block_index) const +{ + if (!m_quorums_recovery) return; + + const bool is_masternode = m_role->IsMasternode(); + const uint256 proTxHash = is_masternode ? m_role->GetProTxHash() : uint256{}; + + LogPrint(BCLog::LLMQ, "NetQuorum::%s -- Process block %s as protx_hash=%s\n", __func__, block_index->GetBlockHash().ToString(), proTxHash.ToString()); + + for (const auto& params : Params().GetConsensus().llmqs) { + auto vecQuorums = m_qman.ScanQuorums(params.type, block_index, params.keepOldConnections); + const bool fWeAreQuorumTypeMember = is_masternode && std::ranges::any_of(vecQuorums, [&proTxHash](const auto& pQuorum) { + return pQuorum->IsValidMember(proTxHash); + }); + + for (auto& pQuorum : vecQuorums) { + if (is_masternode && pQuorum->IsValidMember(proTxHash)) { + uint16_t nDataMask{0}; + if (!pQuorum->HasVerificationVector()) { + nDataMask |= CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR; + } + if (!pQuorum->GetSkShare().IsValid()) { + nDataMask |= CQuorumDataRequest::ENCRYPTED_CONTRIBUTIONS; + } + if (nDataMask != 0) { + StartSkShareRecoveryThread(block_index, std::move(pQuorum), nDataMask); + } else { + LogPrint(BCLog::LLMQ, "NetQuorum::%s -- No data needed from (%d, %s) at height %d\n", __func__, + std23::to_underlying(pQuorum->qc->llmqType), pQuorum->qc->quorumHash.ToString(), + block_index->nHeight); + } + } else { + TryStartVvecSyncThread(block_index, std::move(pQuorum), fWeAreQuorumTypeMember); + } + } + } +} + +void NetQuorum::DataRecoveryThread(gsl::not_null block_index, CQuorumCPtr pQuorum, + uint16_t data_mask, const uint256& protx_hash, size_t start_offset) const +{ + size_t nTries{0}; + uint16_t nDataMask{data_mask}; + int64_t nTimeLastSuccess{0}; + uint256* pCurrentMemberHash{nullptr}; + std::vector vecMemberHashes; + const int64_t nRequestTimeout{10}; + + auto printLog = [&](const std::string& strMessage) { + const std::string strMember{pCurrentMemberHash == nullptr ? "nullptr" : pCurrentMemberHash->ToString()}; + LogPrint(BCLog::LLMQ, "NetQuorum::DataRecoveryThread -- %s - for llmqType %d, quorumHash %s, nDataMask (%d/%d), pCurrentMemberHash %s, nTries %d\n", + strMessage, std23::to_underlying(pQuorum->qc->llmqType), pQuorum->qc->quorumHash.ToString(), + nDataMask, data_mask, strMember, nTries); + }; + printLog("Start"); + + while (!m_mn_sync.IsBlockchainSynced() && !quorumThreadInterrupt) { + quorumThreadInterrupt.sleep_for(std::chrono::seconds(nRequestTimeout)); + } + + if (quorumThreadInterrupt) { + printLog("Aborted"); + return; + } + + vecMemberHashes.reserve(pQuorum->qc->validMembers.size()); + for (auto& member : pQuorum->members) { + if (pQuorum->IsValidMember(member->proTxHash) && member->proTxHash != protx_hash) { + vecMemberHashes.push_back(member->proTxHash); + } + } + std::sort(vecMemberHashes.begin(), vecMemberHashes.end()); + + printLog("Try to request"); + + while (nDataMask > 0 && !quorumThreadInterrupt) { + if (nDataMask & CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR && + pQuorum->HasVerificationVector()) { + nDataMask &= ~CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR; + printLog("Received quorumVvec"); + } + + if (nDataMask & CQuorumDataRequest::ENCRYPTED_CONTRIBUTIONS && pQuorum->GetSkShare().IsValid()) { + nDataMask &= ~CQuorumDataRequest::ENCRYPTED_CONTRIBUTIONS; + printLog("Received skShare"); + } + + if (nDataMask == 0) { + printLog("Success"); + break; + } + + if ((GetTime().count() - nTimeLastSuccess) > nRequestTimeout) { + if (nTries >= vecMemberHashes.size()) { + printLog("All tried but failed"); + break; + } + pCurrentMemberHash = &vecMemberHashes[(start_offset + nTries++) % vecMemberHashes.size()]; + if (m_qman.IsDataRequestPending(*pCurrentMemberHash, /*we_requested=*/true, pQuorum->qc->quorumHash, + pQuorum->qc->llmqType)) { + printLog("Already asked"); + continue; + } + quorumThreadInterrupt.sleep_for(std::chrono::milliseconds(start_offset * 100)); + nTimeLastSuccess = GetTime().count(); + m_connman.AddPendingMasternode(*pCurrentMemberHash); + printLog("Connect"); + } + + m_connman.ForEachNode([&](CNode* pNode) { + auto verifiedProRegTxHash = pNode->GetVerifiedProRegTxHash(); + if (pCurrentMemberHash == nullptr || verifiedProRegTxHash != *pCurrentMemberHash) { + return; + } + + switch (RequestQuorumData(*pNode, *pQuorum, nDataMask, protx_hash)) { + case DataRequestStatus::Requested: + nTimeLastSuccess = GetTime().count(); + printLog("Requested"); + return; + case DataRequestStatus::NotFound: + printLog("Failed"); + pNode->fDisconnect = true; + pCurrentMemberHash = nullptr; + return; + case DataRequestStatus::Processed: + printLog("Processed"); + pNode->fDisconnect = true; + pCurrentMemberHash = nullptr; + return; + case DataRequestStatus::Pending: + printLog("Waiting"); + return; + } + }); + quorumThreadInterrupt.sleep_for(std::chrono::seconds(1)); + } + pQuorum->ReleaseRecovery(); + printLog("Done"); +} + +void NetQuorum::StartVvecSyncThread(gsl::not_null block_index, CQuorumCPtr pQuorum) const +{ + if (pQuorum->qc->validMembers.empty()) return; + if (!pQuorum->TryClaimRecovery()) { + LogPrint(BCLog::LLMQ, "NetQuorum::%s -- Already running\n", __func__); + return; + } + + workerPool.push([pQuorum = std::move(pQuorum), block_index, this](int threadId) mutable { + DataRecoveryThread(block_index, std::move(pQuorum), CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR, + /*protx_hash=*/uint256(), /*start_offset=*/0); + }); +} + +void NetQuorum::TryStartVvecSyncThread(gsl::not_null block_index, CQuorumCPtr pQuorum, + bool fWeAreQuorumTypeMember) const +{ + if (pQuorum->IsRecoveryRunning()) return; + + const bool fSyncForTypeEnabled = m_sync_map.count(pQuorum->qc->llmqType) > 0; + const QvvecSyncMode syncMode = fSyncForTypeEnabled ? m_sync_map.at(pQuorum->qc->llmqType) : QvvecSyncMode::Invalid; + const bool fSyncCurrent = syncMode == QvvecSyncMode::Always || + (syncMode == QvvecSyncMode::OnlyIfTypeMember && fWeAreQuorumTypeMember); + + if ((fSyncForTypeEnabled && fSyncCurrent) && !pQuorum->HasVerificationVector()) { + StartVvecSyncThread(block_index, std::move(pQuorum)); + } else { + LogPrint(BCLog::LLMQ, "NetQuorum::%s -- No data needed from (%d, %s) at height %d\n", __func__, + std23::to_underlying(pQuorum->qc->llmqType), pQuorum->qc->quorumHash.ToString(), block_index->nHeight); + } +} + +void NetQuorum::StartSkShareRecoveryThread(gsl::not_null pIndex, CQuorumCPtr pQuorum, + uint16_t nDataMaskIn) const +{ + if (pQuorum->qc->validMembers.empty()) return; + + if (!pQuorum->TryClaimRecovery()) { + LogPrint(BCLog::LLMQ, "NetQuorum::%s -- Already running\n", __func__); + return; + } + + workerPool.push([pQuorum = std::move(pQuorum), pIndex, nDataMaskIn, this](int threadId) mutable { + const size_t size_offset = GetQuorumRecoveryStartOffset(*pQuorum, pIndex); + DataRecoveryThread(pIndex, std::move(pQuorum), nDataMaskIn, m_role->GetProTxHash(), size_offset); + }); +} + +size_t NetQuorum::GetQuorumRecoveryStartOffset(const CQuorum& quorum, + gsl::not_null pIndex) const +{ + auto mns = m_dmnman.GetListForBlock(pIndex); + std::vector vecProTxHashes; + vecProTxHashes.reserve(mns.GetCounts().enabled()); + mns.ForEachMN(/*onlyValid=*/true, + [&](const auto& pMasternode) { vecProTxHashes.emplace_back(pMasternode.proTxHash); }); + std::sort(vecProTxHashes.begin(), vecProTxHashes.end()); + size_t nIndex{0}; + { + auto my_protx_hash = m_role->GetProTxHash(); + for (const auto i : util::irange(vecProTxHashes.size())) { + // cppcheck-suppress useStlAlgorithm + if (my_protx_hash == vecProTxHashes[i]) { + nIndex = i; + break; + } + } + } + return nIndex % quorum.qc->validMembers.size(); +} + +void NetQuorum::StartCleanupOldQuorumDataThread(gsl::not_null pIndex) const +{ + // Note: this function is CPU heavy and we don't want it to be running during DKGs. + // The largest dkgMiningWindowStart for a related quorum type is 42 (LLMQ_60_75). + // At the same time most quorums use dkgInterval = 24 so the next DKG for them + // (after block 576 + 42) will start at block 576 + 24 * 2. That's only a 6 blocks + // window and it's better to have more room so we pick next cycle. + // dkgMiningWindowStart for small quorums is 10 i.e. a safe block to start + // these calculations is at height 576 + 24 * 2 + 10 = 576 + 58. + if (pIndex->nHeight % 576 != 58) { + return; + } + + cxxtimer::Timer t(/*start=*/true); + LogPrint(BCLog::LLMQ, "NetQuorum::%s -- start\n", __func__); + + // do not block the caller thread + workerPool.push([pIndex, t, this](int threadId) { + Uint256HashSet dbKeysToSkip; + + if (LOCK(cs_cleanup); cleanupQuorumsCache.empty()) { + utils::InitQuorumsCache(cleanupQuorumsCache, m_chainman.GetConsensus(), /*limit_by_connections=*/false); + } + for (const auto& params : Params().GetConsensus().llmqs) { + if (quorumThreadInterrupt) { + break; + } + LOCK(cs_cleanup); + auto& cache = cleanupQuorumsCache[params.type]; + const CBlockIndex* pindex_loop{pIndex}; + Uint256HashSet quorum_keys; + while (pindex_loop != nullptr && pIndex->nHeight - pindex_loop->nHeight < params.max_store_depth()) { + uint256 quorum_key; + if (cache.get(pindex_loop->GetBlockHash(), quorum_key)) { + quorum_keys.insert(quorum_key); + if (quorum_keys.size() >= static_cast(params.keepOldKeys)) break; // extra safety belt + } + pindex_loop = pindex_loop->pprev; + } + for (const auto& pQuorum : m_qman.ScanQuorums(params.type, pIndex, params.keepOldKeys - quorum_keys.size())) { + const uint256 quorum_key = MakeQuorumKey(*pQuorum); + quorum_keys.insert(quorum_key); + cache.insert(pQuorum->m_quorum_base_block_index->GetBlockHash(), quorum_key); + } + dbKeysToSkip.merge(quorum_keys); + } + + if (!quorumThreadInterrupt) { + m_qman.CleanupOldQuorumData(dbKeysToSkip); + } + + LogPrint(BCLog::LLMQ, "NetQuorum::StartCleanupOldQuorumDataThread -- done. time=%d\n", t.count()); + }); +} + +} // namespace llmq diff --git a/src/llmq/net_quorum.h b/src/llmq/net_quorum.h new file mode 100644 index 000000000000..8b48f1e743c8 --- /dev/null +++ b/src/llmq/net_quorum.h @@ -0,0 +1,127 @@ +// Copyright (c) 2025-2026 The Dash Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef BITCOIN_LLMQ_NET_QUORUM_H +#define BITCOIN_LLMQ_NET_QUORUM_H + +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include + +class CActiveMasternodeManager; +class CBlockIndex; +class CBLSWorker; +class CConnman; +class CDeterministicMNManager; +class CMasternodeSync; +class CSporkManager; +namespace Consensus { +struct LLMQParams; +} // namespace Consensus +namespace llmq { +class CQuorumManager; +class CQuorumSnapshotManager; +class QuorumRole; +} // namespace llmq + +namespace llmq { +/** + * NetHandler responsible for all quorum networking: + * - QGETDATA / QDATA message processing (quorum vvec and encrypted contribution exchange) + * - Background quorum peer connection management (CheckQuorumConnections) + * - Background vvec and sk-share data recovery threads + * - Periodic cleanup of old quorum DB entries + * + * Owned exclusively by PeerManagerImpl via AddExtraHandler. No other subsystem + * holds a reference to this object. + */ +class NetQuorum final : public NetHandler, public CValidationInterface +{ +public: + NetQuorum(PeerManagerInternal* peer_manager, CBLSWorker& bls_worker, + CConnman& connman, CDeterministicMNManager& dmnman, CQuorumManager& qman, + CQuorumSnapshotManager& qsnapman, const ChainstateManager& chainman, + const CMasternodeSync& mn_sync, const CSporkManager& sporkman, + QuorumRole* quorum_role, CActiveMasternodeManager* nodeman, + int16_t worker_count, const QvvecSyncModeMap& sync_map, bool quorums_recovery); + + // NetHandler + void ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv) override; + void Start() override; + void Stop() override; + void Interrupt() override { quorumThreadInterrupt(); } + +protected: + // CValidationInterface + void InitializeCurrentBlockTip(const CBlockIndex* tip, bool ibd) override; + void UpdatedBlockTip(const CBlockIndex* pindexNew, const CBlockIndex* pindexFork, + bool fInitialDownload) override; + +private: + DataRequestStatus RequestQuorumData(CNode& peer, const CQuorum& quorum, uint16_t nDataMask, + const uint256& proTxHash = uint256()) const; + + Uint256HashSet GetQuorumsToDelete(const Consensus::LLMQParams& llmqParams, + gsl::not_null pindexNew) const; + void CheckQuorumConnections(const Consensus::LLMQParams& llmqParams, + gsl::not_null pindexNew) const; + void TriggerQuorumDataRecoveryThreads(gsl::not_null block_index) const; + void DataRecoveryThread(gsl::not_null block_index, CQuorumCPtr pQuorum, + uint16_t data_mask, const uint256& protx_hash, size_t start_offset) const; + void StartVvecSyncThread(gsl::not_null block_index, CQuorumCPtr pQuorum) const; + void TryStartVvecSyncThread(gsl::not_null block_index, CQuorumCPtr pQuorum, + bool fWeAreQuorumTypeMember) const; + void StartSkShareRecoveryThread(gsl::not_null pIndex, CQuorumCPtr pQuorum, + uint16_t nDataMaskIn) const; + /// Returns the start offset for the masternode with the given proTxHash. This offset is applied when picking data + /// recovery members of a quorum's memberlist and is calculated based on a list of all member of all active quorums + /// for the given llmqType in a way that each member should receive the same number of request if all active + /// llmqType members requests data from one llmqType quorum. + size_t GetQuorumRecoveryStartOffset(const CQuorum& quorum, + gsl::not_null pIndex) const; + void StartCleanupOldQuorumDataThread(gsl::not_null pIndex) const; + + bool ProcessContribQGETDATA(CDataStream& ssResponseData, const CQuorum& quorum, + CQuorumDataRequest& request, + gsl::not_null block_index) const; + bool ProcessContribQDATA(CNode& pfrom, CDataStream& vRecv, + CQuorum& quorum, CQuorumDataRequest& request); + +private: + CBLSWorker& m_bls_worker; + CConnman& m_connman; + CDeterministicMNManager& m_dmnman; + CQuorumManager& m_qman; + CQuorumSnapshotManager& m_qsnapman; + const ChainstateManager& m_chainman; + const CMasternodeSync& m_mn_sync; + const CSporkManager& m_sporkman; + //! Non-null only when masternode or observer mode is active + QuorumRole* const m_role; + //! Non-null only in masternode mode + CActiveMasternodeManager* const m_nodeman; + + const int16_t m_worker_count; + const QvvecSyncModeMap m_sync_map; + const bool m_quorums_recovery; + + mutable Mutex cs_cleanup; + mutable std::map> cleanupQuorumsCache + GUARDED_BY(cs_cleanup); + + mutable ctpl::thread_pool workerPool; + mutable CThreadInterrupt quorumThreadInterrupt; +}; +} // namespace llmq + +#endif // BITCOIN_LLMQ_NET_QUORUM_H diff --git a/src/llmq/observer/context.cpp b/src/llmq/observer.cpp similarity index 58% rename from src/llmq/observer/context.cpp rename to src/llmq/observer.cpp index 00e4ce2ef784..3d0dcf1eb86a 100644 --- a/src/llmq/observer/context.cpp +++ b/src/llmq/observer.cpp @@ -2,34 +2,30 @@ // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. -#include +#include #include #include -#include -#include +#include #include namespace llmq { -ObserverContext::ObserverContext(CBLSWorker& bls_worker, CConnman& connman, CDeterministicMNManager& dmnman, - CMasternodeMetaMan& mn_metaman, CMasternodeSync& mn_sync, +ObserverContext::ObserverContext(CBLSWorker& bls_worker, CDeterministicMNManager& dmnman, + CMasternodeMetaMan& mn_metaman, llmq::CQuorumBlockProcessor& qblockman, llmq::CQuorumManager& qman, llmq::CQuorumSnapshotManager& qsnapman, const ChainstateManager& chainman, - const CSporkManager& sporkman, const llmq::QvvecSyncModeMap& sync_map, - const util::DbWrapperParams& db_params, bool quorums_recovery) : - m_qman{qman}, + const CSporkManager& sporkman, const util::DbWrapperParams& db_params) : + QuorumRole{qman}, dkgdbgman{std::make_unique(dmnman, qsnapman, chainman)}, qdkgsman{std::make_unique(dmnman, qsnapman, chainman, sporkman, db_params, - /*quorums_watch=*/true)}, - qman_handler{std::make_unique(connman, dmnman, qman, qsnapman, chainman, mn_sync, sporkman, - sync_map, quorums_recovery)} + /*quorums_watch=*/true)} { qdkgsman->InitializeHandlers([&](const Consensus::LLMQParams& llmq_params, [[maybe_unused]] int quorum_idx) -> std::unique_ptr { return std::make_unique(llmq_params); }); - m_qman.ConnectManagers(qman_handler.get(), qdkgsman.get()); + m_qman.ConnectManagers(this, qdkgsman.get()); } ObserverContext::~ObserverContext() @@ -37,30 +33,11 @@ ObserverContext::~ObserverContext() m_qman.DisconnectManagers(); } -void ObserverContext::Start(int16_t worker_count) -{ - qman_handler->Start(worker_count); -} - -void ObserverContext::Stop() -{ - qman_handler->Stop(); -} - -void ObserverContext::InitializeCurrentBlockTip(const CBlockIndex* tip, bool ibd) -{ - UpdatedBlockTip(tip, nullptr, ibd); - if (tip) { - qman_handler->InitializeQuorumConnections(tip); - } -} - void ObserverContext::UpdatedBlockTip(const CBlockIndex* pindexNew, const CBlockIndex* pindexFork, bool fInitialDownload) { if (fInitialDownload || pindexNew == pindexFork) // In IBD or blocks were disconnected without any new ones return; qdkgsman->UpdatedBlockTip(pindexNew, fInitialDownload); - qman_handler->UpdatedBlockTip(pindexNew, fInitialDownload); } } // namespace llmq diff --git a/src/llmq/observer/context.h b/src/llmq/observer.h similarity index 56% rename from src/llmq/observer/context.h rename to src/llmq/observer.h index c373508dd386..0244c805c300 100644 --- a/src/llmq/observer/context.h +++ b/src/llmq/observer.h @@ -2,55 +2,53 @@ // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. -#ifndef BITCOIN_LLMQ_OBSERVER_CONTEXT_H -#define BITCOIN_LLMQ_OBSERVER_CONTEXT_H +#ifndef BITCOIN_LLMQ_OBSERVER_H +#define BITCOIN_LLMQ_OBSERVER_H -#include +#include #include +#include +#include + #include class CBLSWorker; class CBlockIndex; -class CConnman; class CDeterministicMNManager; -class ChainstateManager; class CMasternodeMetaMan; -class CMasternodeSync; class CSporkManager; namespace llmq { class CDKGDebugManager; class CDKGSessionManager; +class CQuorum; class CQuorumBlockProcessor; -class CQuorumManager; +class CQuorumDataRequest; class CQuorumSnapshotManager; -class QuorumObserver; } // namespace llmq namespace util { struct DbWrapperParams; } // namespace util namespace llmq { -struct ObserverContext final : public CValidationInterface { -private: - llmq::CQuorumManager& m_qman; - +struct ObserverContext final : public QuorumRole, public CValidationInterface { public: ObserverContext() = delete; ObserverContext(const ObserverContext&) = delete; ObserverContext& operator=(const ObserverContext&) = delete; - ObserverContext(CBLSWorker& bls_worker, CConnman& connman, CDeterministicMNManager& dmnman, - CMasternodeMetaMan& mn_metaman, CMasternodeSync& mn_sync, llmq::CQuorumBlockProcessor& qblockman, + ObserverContext(CBLSWorker& bls_worker, CDeterministicMNManager& dmnman, + CMasternodeMetaMan& mn_metaman, llmq::CQuorumBlockProcessor& qblockman, llmq::CQuorumManager& qman, llmq::CQuorumSnapshotManager& qsnapman, const ChainstateManager& chainman, - const CSporkManager& sporkman, const llmq::QvvecSyncModeMap& sync_map, - const util::DbWrapperParams& db_params, bool quorums_recovery); + const CSporkManager& sporkman, const util::DbWrapperParams& db_params); ~ObserverContext(); - void Start(int16_t worker_count); - void Stop(); - void InitializeCurrentBlockTip(const CBlockIndex* tip, bool ibd); - + // QuorumRole + // Watch-only nodes are not masternodes + bool IsMasternode() const override { return false; } + // We are only initialized if watch-only mode is enabled + bool IsWatching() const override { return true; } + bool SetQuorumSecretKeyShare(CQuorum& quorum, Span skContributions) const override { return false; } protected: // CValidationInterface void UpdatedBlockTip(const CBlockIndex* pindexNew, const CBlockIndex* pindexFork, bool fInitialDownload) override; @@ -58,10 +56,7 @@ struct ObserverContext final : public CValidationInterface { public: const std::unique_ptr dkgdbgman; const std::unique_ptr qdkgsman; - -private: - const std::unique_ptr qman_handler; }; } // namespace llmq -#endif // BITCOIN_LLMQ_OBSERVER_CONTEXT_H +#endif // BITCOIN_LLMQ_OBSERVER_H diff --git a/src/llmq/observer/quorums.cpp b/src/llmq/observer/quorums.cpp deleted file mode 100644 index 69c727df9f47..000000000000 --- a/src/llmq/observer/quorums.cpp +++ /dev/null @@ -1,380 +0,0 @@ -// Copyright (c) 2018-2026 The Dash Core developers -// Distributed under the MIT software license, see the accompanying -// file COPYING or http://www.opensource.org/licenses/mit-license.php. - -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include - -#include - -namespace llmq { -QuorumObserver::QuorumObserver(CConnman& connman, CDeterministicMNManager& dmnman, QuorumObserverParent& qman, - CQuorumSnapshotManager& qsnapman, const ChainstateManager& chainman, - const CMasternodeSync& mn_sync, const CSporkManager& sporkman, - const llmq::QvvecSyncModeMap& sync_map, bool quorums_recovery) : - m_connman{connman}, - m_dmnman{dmnman}, - m_qman{qman}, - m_qsnapman{qsnapman}, - m_chainman{chainman}, - m_mn_sync{mn_sync}, - m_sporkman{sporkman}, - m_quorums_recovery{quorums_recovery}, - m_sync_map{sync_map} -{ - quorumThreadInterrupt.reset(); -} - -QuorumObserver::~QuorumObserver() -{ - Stop(); -} - -void QuorumObserver::Start(int16_t worker_count) -{ - assert(worker_count > 0); - workerPool.resize(worker_count); - RenameThreadPool(workerPool, "q-mngr"); -} - -void QuorumObserver::Stop() -{ - quorumThreadInterrupt(); - workerPool.clear_queue(); - workerPool.stop(true); -} - -void QuorumObserver::InitializeQuorumConnections(gsl::not_null pindexNew) const -{ - for (const auto& params : Params().GetConsensus().llmqs) { - CheckQuorumConnections(params, pindexNew); - } -} - -void QuorumObserver::UpdatedBlockTip(const CBlockIndex* pindexNew, bool fInitialDownload) const -{ - if (!pindexNew) return; - if (!m_mn_sync.IsBlockchainSynced()) return; - - for (const auto& params : Params().GetConsensus().llmqs) { - CheckQuorumConnections(params, pindexNew); - } - - // Cleanup expired data requests - m_qman.CleanupExpiredDataRequests(); - - TriggerQuorumDataRecoveryThreads(pindexNew); - StartCleanupOldQuorumDataThread(pindexNew); -} - -Uint256HashSet QuorumObserver::GetQuorumsToDelete(const Consensus::LLMQParams& llmqParams, - gsl::not_null pindexNew) const -{ - auto connmanQuorumsToDelete = m_connman.GetMasternodeQuorums(llmqParams.type); - - // don't remove connections for the currently in-progress DKG round - if (IsQuorumRotationEnabled(llmqParams, pindexNew)) { - int cycleIndexTipHeight = pindexNew->nHeight % llmqParams.dkgInterval; - int cycleQuorumBaseHeight = pindexNew->nHeight - cycleIndexTipHeight; - std::stringstream ss; - for (const auto quorumIndex : util::irange(llmqParams.signingActiveQuorumCount)) { - if (quorumIndex <= cycleIndexTipHeight) { - int curDkgHeight = cycleQuorumBaseHeight + quorumIndex; - auto curDkgBlock = pindexNew->GetAncestor(curDkgHeight)->GetBlockHash(); - ss << curDkgHeight << ":" << curDkgBlock.ToString() << " | "; - connmanQuorumsToDelete.erase(curDkgBlock); - } - } - LogPrint(BCLog::LLMQ, "QuorumObserver::%s -- llmqType[%d] h[%d] keeping mn quorum connections for rotated quorums: [%s]\n", __func__, std23::to_underlying(llmqParams.type), pindexNew->nHeight, ss.str()); - } else { - int curDkgHeight = pindexNew->nHeight - (pindexNew->nHeight % llmqParams.dkgInterval); - auto curDkgBlock = pindexNew->GetAncestor(curDkgHeight)->GetBlockHash(); - connmanQuorumsToDelete.erase(curDkgBlock); - LogPrint(BCLog::LLMQ, "QuorumObserver::%s -- llmqType[%d] h[%d] keeping mn quorum connections for quorum: [%d:%s]\n", __func__, std23::to_underlying(llmqParams.type), pindexNew->nHeight, curDkgHeight, curDkgBlock.ToString()); - } - - return connmanQuorumsToDelete; -} - -void QuorumObserver::CheckQuorumConnections(const Consensus::LLMQParams& llmqParams, - gsl::not_null pindexNew) const -{ - auto lastQuorums = m_qman.ScanQuorums(llmqParams.type, pindexNew, (size_t)llmqParams.keepOldConnections); - auto deletableQuorums = GetQuorumsToDelete(llmqParams, pindexNew); - - for (const auto& quorum : lastQuorums) { - if (utils::EnsureQuorumConnections(llmqParams, m_connman, m_sporkman, {m_dmnman, m_qsnapman, m_chainman, quorum->m_quorum_base_block_index}, - m_dmnman.GetListAtChainTip(), /*myProTxHash=*/uint256{}, /*is_masternode=*/false, /*quorums_watch=*/true)) { - if (deletableQuorums.erase(quorum->qc->quorumHash) > 0) { - LogPrint(BCLog::LLMQ, "QuorumObserver::%s -- llmqType[%d] h[%d] keeping mn quorum connections for quorum: [%d:%s]\n", __func__, std23::to_underlying(llmqParams.type), pindexNew->nHeight, quorum->m_quorum_base_block_index->nHeight, quorum->m_quorum_base_block_index->GetBlockHash().ToString()); - } - } - } - - for (const auto& quorumHash : deletableQuorums) { - LogPrint(BCLog::LLMQ, "QuorumObserver::%s -- removing masternodes quorum connections for quorum %s:\n", __func__, quorumHash.ToString()); - m_connman.RemoveMasternodeQuorumNodes(llmqParams.type, quorumHash); - } -} - -bool QuorumObserver::SetQuorumSecretKeyShare(CQuorum& quorum, Span skContributions) const -{ - // Watch-only nodes cannot work with secret keys - return false; -} - -MessageProcessingResult QuorumObserver::ProcessContribQGETDATA(bool request_limit_exceeded, CDataStream& vStream, - const CQuorum& quorum, CQuorumDataRequest& request, - gsl::not_null block_index) -{ - // Watch-only nodes cannot provide encrypted contributions - if (request.GetDataMask() & CQuorumDataRequest::ENCRYPTED_CONTRIBUTIONS) { - request.SetError(CQuorumDataRequest::Errors::ENCRYPTED_CONTRIBUTIONS_MISSING); - return request_limit_exceeded ? MisbehavingError{25, "request limit exceeded"} : MessageProcessingResult{}; - } - return {}; -} - -MessageProcessingResult QuorumObserver::ProcessContribQDATA(CNode& pfrom, CDataStream& vStream, - CQuorum& quorum, CQuorumDataRequest& request) -{ - // Watch-only nodes ignore encrypted contributions - return {}; -} - -bool QuorumObserver::IsMasternode() const -{ - // Watch-only nodes are not masternodes - return false; -} - -bool QuorumObserver::IsWatching() const -{ - // We are only initialized if watch-only mode is enabled - return true; -} - -void QuorumObserver::DataRecoveryThread(gsl::not_null block_index, CQuorumCPtr pQuorum, - uint16_t data_mask, const uint256& protx_hash, size_t start_offset) const -{ - size_t nTries{0}; - uint16_t nDataMask{data_mask}; - int64_t nTimeLastSuccess{0}; - uint256* pCurrentMemberHash{nullptr}; - std::vector vecMemberHashes; - const int64_t nRequestTimeout{10}; - - auto printLog = [&](const std::string& strMessage) { - const std::string strMember{pCurrentMemberHash == nullptr ? "nullptr" : pCurrentMemberHash->ToString()}; - LogPrint(BCLog::LLMQ, "QuorumObserver::DataRecoveryThread -- %s - for llmqType %d, quorumHash %s, nDataMask (%d/%d), pCurrentMemberHash %s, nTries %d\n", - strMessage, std23::to_underlying(pQuorum->qc->llmqType), pQuorum->qc->quorumHash.ToString(), nDataMask, data_mask, strMember, nTries); - }; - printLog("Start"); - - while (!m_mn_sync.IsBlockchainSynced() && !quorumThreadInterrupt) { - quorumThreadInterrupt.sleep_for(std::chrono::seconds(nRequestTimeout)); - } - - if (quorumThreadInterrupt) { - printLog("Aborted"); - return; - } - - vecMemberHashes.reserve(pQuorum->qc->validMembers.size()); - for (auto& member : pQuorum->members) { - if (pQuorum->IsValidMember(member->proTxHash) && member->proTxHash != protx_hash) { - vecMemberHashes.push_back(member->proTxHash); - } - } - std::sort(vecMemberHashes.begin(), vecMemberHashes.end()); - - printLog("Try to request"); - - while (nDataMask > 0 && !quorumThreadInterrupt) { - if (nDataMask & llmq::CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR && - pQuorum->HasVerificationVector()) { - nDataMask &= ~llmq::CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR; - printLog("Received quorumVvec"); - } - - if (nDataMask & llmq::CQuorumDataRequest::ENCRYPTED_CONTRIBUTIONS && pQuorum->GetSkShare().IsValid()) { - nDataMask &= ~llmq::CQuorumDataRequest::ENCRYPTED_CONTRIBUTIONS; - printLog("Received skShare"); - } - - if (nDataMask == 0) { - printLog("Success"); - break; - } - - if ((GetTime().count() - nTimeLastSuccess) > nRequestTimeout) { - if (nTries >= vecMemberHashes.size()) { - printLog("All tried but failed"); - break; - } - // Access the member list of the quorum with the calculated offset applied to balance the load equally - pCurrentMemberHash = &vecMemberHashes[(start_offset + nTries++) % vecMemberHashes.size()]; - if (m_qman.IsDataRequestPending(*pCurrentMemberHash, /*we_requested=*/true, pQuorum->qc->quorumHash, - pQuorum->qc->llmqType)) { - printLog("Already asked"); - continue; - } - // Sleep a bit depending on the start offset to balance out multiple requests to same masternode - quorumThreadInterrupt.sleep_for(std::chrono::milliseconds(start_offset * 100)); - nTimeLastSuccess = GetTime().count(); - m_connman.AddPendingMasternode(*pCurrentMemberHash); - printLog("Connect"); - } - - m_connman.ForEachNode([&](CNode* pNode) { - auto verifiedProRegTxHash = pNode->GetVerifiedProRegTxHash(); - if (pCurrentMemberHash == nullptr || verifiedProRegTxHash != *pCurrentMemberHash) { - return; - } - - if (m_qman.RequestQuorumData(pNode, m_connman, *pQuorum, nDataMask, protx_hash)) { - nTimeLastSuccess = GetTime().count(); - printLog("Requested"); - } else { - const auto status = m_qman.GetDataRequestStatus(*pCurrentMemberHash, /*we_requested=*/true, - pQuorum->qc->quorumHash, pQuorum->qc->llmqType); - switch (status) { - case DataRequestStatus::NotFound: - printLog("Failed"); - pNode->fDisconnect = true; - pCurrentMemberHash = nullptr; - return; - case DataRequestStatus::Processed: - printLog("Processed"); - pNode->fDisconnect = true; - pCurrentMemberHash = nullptr; - return; - case DataRequestStatus::Pending: - printLog("Waiting"); - return; - } - } - }); - quorumThreadInterrupt.sleep_for(std::chrono::seconds(1)); - } - pQuorum->fQuorumDataRecoveryThreadRunning = false; - printLog("Done"); -} - -void QuorumObserver::StartVvecSyncThread(gsl::not_null block_index, CQuorumCPtr pQuorum) const -{ - bool expected = false; - if (!pQuorum->fQuorumDataRecoveryThreadRunning.compare_exchange_strong(expected, true)) { - LogPrint(BCLog::LLMQ, "QuorumObserver::%s -- Already running\n", __func__); - return; - } - - workerPool.push([pQuorum = std::move(pQuorum), block_index, this](int threadId) mutable { - DataRecoveryThread(block_index, std::move(pQuorum), CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR, - /*protx_hash=*/uint256(), /*start_offset=*/0); - }); -} - -void QuorumObserver::TriggerQuorumDataRecoveryThreads(gsl::not_null block_index) const -{ - if (!m_quorums_recovery) { - return; - } - - LogPrint(BCLog::LLMQ, "QuorumObserver::%s -- Process block %s\n", __func__, block_index->GetBlockHash().ToString()); - for (const auto& params : Params().GetConsensus().llmqs) { - auto vecQuorums = m_qman.ScanQuorums(params.type, block_index, params.keepOldConnections); - for (auto& pQuorum : vecQuorums) { - TryStartVvecSyncThread(block_index, std::move(pQuorum), /*fWeAreQuorumTypeMember=*/false); - } - } -} - -void QuorumObserver::TryStartVvecSyncThread(gsl::not_null block_index, CQuorumCPtr pQuorum, - bool fWeAreQuorumTypeMember) const -{ - if (pQuorum->fQuorumDataRecoveryThreadRunning) return; - - const bool fSyncForTypeEnabled = m_sync_map.count(pQuorum->qc->llmqType) > 0; - const QvvecSyncMode syncMode = fSyncForTypeEnabled ? m_sync_map.at(pQuorum->qc->llmqType) : QvvecSyncMode::Invalid; - const bool fSyncCurrent = syncMode == QvvecSyncMode::Always || - (syncMode == QvvecSyncMode::OnlyIfTypeMember && fWeAreQuorumTypeMember); - - if ((fSyncForTypeEnabled && fSyncCurrent) && !pQuorum->HasVerificationVector()) { - StartVvecSyncThread(block_index, std::move(pQuorum)); - } else { - LogPrint(BCLog::LLMQ, "QuorumObserver::%s -- No data needed from (%d, %s) at height %d\n", __func__, - std23::to_underlying(pQuorum->qc->llmqType), pQuorum->qc->quorumHash.ToString(), block_index->nHeight); - } -} - -void QuorumObserver::StartCleanupOldQuorumDataThread(gsl::not_null pIndex) const -{ - // Note: this function is CPU heavy and we don't want it to be running during DKGs. - // The largest dkgMiningWindowStart for a related quorum type is 42 (LLMQ_60_75). - // At the same time most quorums use dkgInterval = 24 so the next DKG for them - // (after block 576 + 42) will start at block 576 + 24 * 2. That's only a 6 blocks - // window and it's better to have more room so we pick next cycle. - // dkgMiningWindowStart for small quorums is 10 i.e. a safe block to start - // these calculations is at height 576 + 24 * 2 + 10 = 576 + 58. - if (pIndex->nHeight % 576 != 58) { - return; - } - - cxxtimer::Timer t(/*start=*/ true); - LogPrint(BCLog::LLMQ, "QuorumObserver::%s -- start\n", __func__); - - // do not block the caller thread - workerPool.push([pIndex, t, this](int threadId) { - Uint256HashSet dbKeysToSkip; - - if (LOCK(cs_cleanup); cleanupQuorumsCache.empty()) { - utils::InitQuorumsCache(cleanupQuorumsCache, m_chainman.GetConsensus(), /*limit_by_connections=*/false); - } - for (const auto& params : Params().GetConsensus().llmqs) { - if (quorumThreadInterrupt) { - break; - } - LOCK(cs_cleanup); - auto& cache = cleanupQuorumsCache[params.type]; - const CBlockIndex* pindex_loop{pIndex}; - Uint256HashSet quorum_keys; - while (pindex_loop != nullptr && pIndex->nHeight - pindex_loop->nHeight < params.max_store_depth()) { - uint256 quorum_key; - if (cache.get(pindex_loop->GetBlockHash(), quorum_key)) { - quorum_keys.insert(quorum_key); - if (quorum_keys.size() >= static_cast(params.keepOldKeys)) break; // extra safety belt - } - pindex_loop = pindex_loop->pprev; - } - for (const auto& pQuorum : m_qman.ScanQuorums(params.type, pIndex, params.keepOldKeys - quorum_keys.size())) { - const uint256 quorum_key = MakeQuorumKey(*pQuorum); - quorum_keys.insert(quorum_key); - cache.insert(pQuorum->m_quorum_base_block_index->GetBlockHash(), quorum_key); - } - dbKeysToSkip.merge(quorum_keys); - } - - if (!quorumThreadInterrupt) { - m_qman.CleanupOldQuorumData(dbKeysToSkip); - } - - LogPrint(BCLog::LLMQ, "QuorumObserver::StartCleanupOldQuorumDataThread -- done. time=%d\n", t.count()); - }); -} -} // namespace llmq diff --git a/src/llmq/observer/quorums.h b/src/llmq/observer/quorums.h deleted file mode 100644 index 6f60a734515d..000000000000 --- a/src/llmq/observer/quorums.h +++ /dev/null @@ -1,130 +0,0 @@ -// Copyright (c) 2018-2026 The Dash Core developers -// Distributed under the MIT software license, see the accompanying -// file COPYING or http://www.opensource.org/licenses/mit-license.php. - -#ifndef BITCOIN_LLMQ_OBSERVER_QUORUMS_H -#define BITCOIN_LLMQ_OBSERVER_QUORUMS_H - -#include -#include -#include -#include -#include - -#include -#include -#include -#include -#include -#include - -#include - -#include -#include - -class CConnman; -class CDataStream; -class CDeterministicMNManager; -class CMasternodeSync; -class CNode; -class CSporkManager; -struct MessageProcessingResult; -namespace llmq { -class CQuorumDataRequest; -class CQuorumSnapshotManager; -} // namespace llmq - -namespace llmq { -enum class DataRequestStatus : uint8_t { - NotFound, - Pending, - Processed, -}; - -class QuorumObserverParent -{ -public: - virtual ~QuorumObserverParent() = default; - virtual bool GetEncryptedContributions(Consensus::LLMQType llmq_type, const CBlockIndex* block_index, - const std::vector& valid_members, const uint256& protx_hash, - std::vector>& vec_enc) const = 0; - virtual bool IsDataRequestPending(const uint256& proRegTx, bool we_requested, const uint256& quorumHash, - Consensus::LLMQType llmqType) const = 0; - virtual bool RequestQuorumData(CNode* pfrom, CConnman& connman, const CQuorum& quorum, uint16_t nDataMask, - const uint256& proTxHash) const = 0; - virtual DataRequestStatus GetDataRequestStatus(const uint256& proRegTx, bool we_requested, - const uint256& quorumHash, Consensus::LLMQType llmqType) const = 0; - virtual std::vector ScanQuorums(Consensus::LLMQType llmqType, - gsl::not_null pindexStart, - size_t nCountRequested) const = 0; - virtual void CleanupExpiredDataRequests() const = 0; - virtual void CleanupOldQuorumData(const Uint256HashSet& dbKeysToSkip) const = 0; -}; - -class QuorumObserver -{ -protected: - CConnman& m_connman; - CDeterministicMNManager& m_dmnman; - QuorumObserverParent& m_qman; - CQuorumSnapshotManager& m_qsnapman; - const ChainstateManager& m_chainman; - const CMasternodeSync& m_mn_sync; - const CSporkManager& m_sporkman; - const bool m_quorums_recovery{false}; - const llmq::QvvecSyncModeMap m_sync_map; - -protected: - mutable Mutex cs_cleanup; - mutable std::map> cleanupQuorumsCache GUARDED_BY(cs_cleanup); - - mutable ctpl::thread_pool workerPool; - mutable CThreadInterrupt quorumThreadInterrupt; - -public: - QuorumObserver() = delete; - QuorumObserver(const QuorumObserver&) = delete; - QuorumObserver& operator=(const QuorumObserver&) = delete; - explicit QuorumObserver(CConnman& connman, CDeterministicMNManager& dmnman, QuorumObserverParent& qman, - CQuorumSnapshotManager& qsnapman, const ChainstateManager& chainman, - const CMasternodeSync& mn_sync, const CSporkManager& sporkman, - const llmq::QvvecSyncModeMap& sync_map, bool quorums_recovery); - virtual ~QuorumObserver(); - - void Start(int16_t worker_count); - void Stop(); - - void UpdatedBlockTip(const CBlockIndex* pindexNew, bool fInitialDownload) const; - void InitializeQuorumConnections(gsl::not_null pindexNew) const; - -public: - virtual bool SetQuorumSecretKeyShare(CQuorum& quorum, Span skContributions) const; - [[nodiscard]] virtual MessageProcessingResult ProcessContribQGETDATA( - bool request_limit_exceeded, CDataStream& vStream, const CQuorum& quorum, CQuorumDataRequest& request, - gsl::not_null block_index); - [[nodiscard]] virtual MessageProcessingResult ProcessContribQDATA( - CNode& pfrom, CDataStream& vStream, CQuorum& quorum, CQuorumDataRequest& request); - virtual bool IsMasternode() const; - virtual bool IsWatching() const; - -protected: - virtual void CheckQuorumConnections(const Consensus::LLMQParams& llmqParams, - gsl::not_null pindexNew) const; - virtual void TriggerQuorumDataRecoveryThreads(gsl::not_null pIndex) const; - -protected: - Uint256HashSet GetQuorumsToDelete(const Consensus::LLMQParams& llmqParams, - gsl::not_null pindexNew) const; - - void DataRecoveryThread(gsl::not_null block_index, CQuorumCPtr quorum, uint16_t data_mask, - const uint256& protx_hash, size_t start_offset) const; - void StartVvecSyncThread(gsl::not_null block_index, CQuorumCPtr pQuorum) const; - void TryStartVvecSyncThread(gsl::not_null block_index, CQuorumCPtr pQuorum, - bool fWeAreQuorumTypeMember) const; - - void StartCleanupOldQuorumDataThread(gsl::not_null pIndex) const; -}; -} // namespace llmq - -#endif // BITCOIN_LLMQ_OBSERVER_QUORUMS_H diff --git a/src/llmq/quorums.cpp b/src/llmq/quorums.cpp index 2eeb8d0d82ef..69b2100bd778 100644 --- a/src/llmq/quorums.cpp +++ b/src/llmq/quorums.cpp @@ -100,16 +100,18 @@ std::string CQuorumDataRequest::GetErrorString() const return "UNDEFINED"; } -CQuorum::CQuorum(const Consensus::LLMQParams& _params, CBLSWorker& _blsWorker) : params(_params), blsCache(_blsWorker) +CQuorum::CQuorum(const Consensus::LLMQParams& _params, CBLSWorker& _blsWorker, + CFinalCommitmentPtr _qc, const CBlockIndex* _pQuorumBaseBlockIndex, + const uint256& _minedBlockHash, Span _members) : + params{_params}, + qc{std::move(_qc)}, + m_quorum_base_block_index{_pQuorumBaseBlockIndex}, + minedBlockHash{_minedBlockHash}, + members{_members.begin(), _members.end()}, + blsCache{_blsWorker} { -} - -void CQuorum::Init(CFinalCommitmentPtr _qc, const CBlockIndex* _pQuorumBaseBlockIndex, const uint256& _minedBlockHash, Span _members) -{ - qc = std::move(_qc); - m_quorum_base_block_index = _pQuorumBaseBlockIndex; - members = std::vector(_members.begin(), _members.end()); - minedBlockHash = _minedBlockHash; + assert(qc != nullptr); + assert(m_quorum_base_block_index != nullptr); } bool CQuorum::SetVerificationVector(const std::vector& quorumVecIn) diff --git a/src/llmq/quorums.h b/src/llmq/quorums.h index 5bd10eac2b9b..591048cc0ed1 100644 --- a/src/llmq/quorums.h +++ b/src/llmq/quorums.h @@ -12,6 +12,7 @@ #include #include +#include #include #include #include @@ -25,12 +26,20 @@ class CBlockIndex; class CDBWrapper; namespace llmq { class CQuorum; +class CQuorumDataRequest; class CQuorumManager; -class QuorumObserver; -class QuorumParticipant; -}; // namespace llmq +class QuorumRole; +} // namespace llmq namespace llmq { + +enum class DataRequestStatus : uint8_t { + NotFound, + Requested, + Pending, + Processed, +}; + extern const std::string DB_QUORUM_SK_SHARE; extern const std::string DB_QUORUM_QUORUM_VVEC; @@ -159,15 +168,13 @@ class CQuorumDataRequest class CQuorum { friend class CQuorumManager; - friend class llmq::QuorumObserver; - friend class llmq::QuorumParticipant; public: const Consensus::LLMQParams params; - CFinalCommitmentPtr qc; - const CBlockIndex* m_quorum_base_block_index{nullptr}; - uint256 minedBlockHash; - std::vector members; + const CFinalCommitmentPtr qc; + const CBlockIndex* m_quorum_base_block_index; + const uint256 minedBlockHash; + const std::vector members; private: // Recovery of public key shares is very slow, so we start a background thread that pre-populates a cache so that @@ -181,9 +188,10 @@ class CQuorum CBLSSecretKey skShare GUARDED_BY(cs_vvec_shShare); public: - CQuorum(const Consensus::LLMQParams& _params, CBLSWorker& _blsWorker); + CQuorum(const Consensus::LLMQParams& _params, CBLSWorker& _blsWorker, + CFinalCommitmentPtr _qc, const CBlockIndex* _pQuorumBaseBlockIndex, const uint256& _minedBlockHash, Span _members); + ~CQuorum() = default; - void Init(CFinalCommitmentPtr _qc, const CBlockIndex* _pQuorumBaseBlockIndex, const uint256& _minedBlockHash, Span _members); bool SetVerificationVector(const std::vector& quorumVecIn) EXCLUSIVE_LOCKS_REQUIRED(!cs_vvec_shShare); void SetVerificationVector(BLSVerificationVectorPtr vvec_in) EXCLUSIVE_LOCKS_REQUIRED(!cs_vvec_shShare) @@ -195,6 +203,11 @@ class CQuorum EXCLUSIVE_LOCKS_REQUIRED(!cs_vvec_shShare); bool HasVerificationVector() const EXCLUSIVE_LOCKS_REQUIRED(!cs_vvec_shShare); + std::shared_ptr> GetVerificationVector() const EXCLUSIVE_LOCKS_REQUIRED(!cs_vvec_shShare) + { + LOCK(cs_vvec_shShare); + return quorumVvec; + } bool IsMember(const uint256& proTxHash) const; bool IsValidMember(const uint256& proTxHash) const; int GetMemberIndex(const uint256& proTxHash) const; @@ -202,6 +215,11 @@ class CQuorum CBLSPublicKey GetPubKeyShare(size_t memberIdx) const EXCLUSIVE_LOCKS_REQUIRED(!cs_vvec_shShare); CBLSSecretKey GetSkShare() const EXCLUSIVE_LOCKS_REQUIRED(!cs_vvec_shShare); + //! Try to claim exclusive data recovery for this quorum. Returns true if claimed. + bool TryClaimRecovery() const { bool expected = false; return fQuorumDataRecoveryThreadRunning.compare_exchange_strong(expected, true); } + bool IsRecoveryRunning() const { return fQuorumDataRecoveryThreadRunning; } + void ReleaseRecovery() const { fQuorumDataRecoveryThreadRunning = false; } + private: bool HasVerificationVectorInternal() const EXCLUSIVE_LOCKS_REQUIRED(cs_vvec_shShare); void WriteContributions(CDBWrapper& db) const EXCLUSIVE_LOCKS_REQUIRED(!cs_vvec_shShare); diff --git a/src/llmq/quorumsman.cpp b/src/llmq/quorumsman.cpp index a93b5384e5d7..0e2c17969a51 100644 --- a/src/llmq/quorumsman.cpp +++ b/src/llmq/quorumsman.cpp @@ -20,8 +20,7 @@ #include #include -#include -#include +#include #include #include #include @@ -76,10 +75,10 @@ CQuorumPtr CQuorumManager::BuildQuorumFromCommitment(const Consensus::LLMQType l const auto& llmq_params_opt = Params().GetLLMQ(llmqType); assert(llmq_params_opt.has_value()); - auto quorum = std::make_shared(llmq_params_opt.value(), blsWorker); auto members = utils::GetAllQuorumMembers(qc.llmqType, {m_dmnman, m_qsnapman, m_chainman, pQuorumBaseBlockIndex}); - quorum->Init(std::make_unique(std::move(qc)), pQuorumBaseBlockIndex, minedBlockHash, members); + auto quorum = std::make_shared(llmq_params_opt.value(), blsWorker, + std::make_unique(std::move(qc)), pQuorumBaseBlockIndex, minedBlockHash, members); if (populate_cache && llmq_params_opt->size == 1) { WITH_LOCK(m_cs_maps, mapQuorumsCache[llmqType].insert(quorumHash, quorum)); @@ -147,49 +146,6 @@ bool CQuorumManager::HasQuorum(Consensus::LLMQType llmqType, const CQuorumBlockP return quorum_block_processor.HasMinedCommitment(llmqType, quorumHash); } -bool CQuorumManager::RequestQuorumData(CNode* pfrom, CConnman& connman, const CQuorum& quorum, uint16_t nDataMask, - const uint256& proTxHash) const -{ - if (pfrom == nullptr) { - LogPrint(BCLog::LLMQ, "CQuorumManager::%s -- Invalid pfrom: nullptr\n", __func__); - return false; - } - if (pfrom->GetVerifiedProRegTxHash().IsNull()) { - LogPrint(BCLog::LLMQ, "CQuorumManager::%s -- pfrom is not a verified masternode\n", __func__); - return false; - } - const Consensus::LLMQType llmqType = quorum.qc->llmqType; - if (!Params().GetLLMQ(llmqType).has_value()) { - LogPrint(BCLog::LLMQ, "CQuorumManager::%s -- Invalid llmqType: %d\n", __func__, std23::to_underlying(llmqType)); - return false; - } - const CBlockIndex* pindex{quorum.m_quorum_base_block_index}; - if (pindex == nullptr) { - LogPrint(BCLog::LLMQ, "CQuorumManager::%s -- Invalid m_quorum_base_block_index : nullptr\n", __func__); - return false; - } - - LOCK(cs_data_requests); - const CQuorumDataRequestKey key(pfrom->GetVerifiedProRegTxHash(), true, pindex->GetBlockHash(), llmqType); - const CQuorumDataRequest request(llmqType, pindex->GetBlockHash(), nDataMask, proTxHash); - auto [old_pair, inserted] = mapQuorumDataRequests.emplace(key, request); - if (!inserted) { - if (old_pair->second.IsExpired(/*add_bias=*/true)) { - old_pair->second = request; - } else { - LogPrint(BCLog::LLMQ, "CQuorumManager::%s -- Already requested\n", __func__); - return false; - } - } - LogPrint(BCLog::LLMQ, "CQuorumManager::%s -- sending QGETDATA quorumHash[%s] llmqType[%d] proRegTx[%s]\n", __func__, key.quorumHash.ToString(), - std23::to_underlying(key.llmqType), key.proRegTx.ToString()); - - CNetMsgMaker msgMaker(pfrom->GetCommonVersion()); - connman.PushMessage(pfrom, msgMaker.Make(NetMsgType::QGETDATA, request)); - - return true; -} - std::vector CQuorumManager::ScanQuorums(Consensus::LLMQType llmqType, size_t nCountRequested) const { const CBlockIndex* pindex = WITH_LOCK(::cs_main, return m_chainman.ActiveTip()); @@ -406,162 +362,51 @@ CQuorumCPtr CQuorumManager::GetQuorum(Consensus::LLMQType llmqType, gsl::not_nul return BuildQuorumFromCommitment(llmqType, pQuorumBaseBlockIndex, populate_cache); } -MessageProcessingResult CQuorumManager::ProcessMessage(CNode& pfrom, CConnman& connman, std::string_view msg_type, CDataStream& vRecv) +bool CQuorumManager::RegisterDataRequest(const CQuorumDataRequestKey& key, const CQuorumDataRequest& request, + bool add_expiry_bias) const { - if (msg_type == NetMsgType::QGETDATA) { - if (!IsMasternode() || (pfrom.GetVerifiedProRegTxHash().IsNull() && !pfrom.qwatch)) { - return MisbehavingError{10, "not a verified masternode or a qwatch connection"}; - } - - CQuorumDataRequest request; - vRecv >> request; - - auto sendQDATA = [&](CQuorumDataRequest::Errors nError, - bool request_limit_exceeded, - const CDataStream& body = CDataStream(SER_NETWORK, PROTOCOL_VERSION)) -> MessageProcessingResult { - MessageProcessingResult ret{}; - switch (nError) { - case (CQuorumDataRequest::Errors::NONE): - case (CQuorumDataRequest::Errors::QUORUM_TYPE_INVALID): - case (CQuorumDataRequest::Errors::QUORUM_BLOCK_NOT_FOUND): - case (CQuorumDataRequest::Errors::QUORUM_NOT_FOUND): - case (CQuorumDataRequest::Errors::MASTERNODE_IS_NO_MEMBER): - case (CQuorumDataRequest::Errors::UNDEFINED): - if (request_limit_exceeded) ret = MisbehavingError{25, "request limit exceeded"}; - break; - case (CQuorumDataRequest::Errors::QUORUM_VERIFICATION_VECTOR_MISSING): - case (CQuorumDataRequest::Errors::ENCRYPTED_CONTRIBUTIONS_MISSING): - // Do not punish limit exceed if we don't have the requested data - break; - } - request.SetError(nError); - CDataStream ssResponse{SER_NETWORK, pfrom.GetCommonVersion()}; - ssResponse << request << body; - connman.PushMessage(&pfrom, CNetMsgMaker(pfrom.GetCommonVersion()).Make(NetMsgType::QDATA, ssResponse)); - return ret; - }; - - bool request_limit_exceeded(false); - { - LOCK2(::cs_main, cs_data_requests); - const CQuorumDataRequestKey key(pfrom.GetVerifiedProRegTxHash(), false, request.GetQuorumHash(), request.GetLLMQType()); - auto it = mapQuorumDataRequests.find(key); - if (it == mapQuorumDataRequests.end()) { - it = mapQuorumDataRequests.emplace(key, request).first; - } else if (it->second.IsExpired(/*add_bias=*/false)) { - it->second = request; - } else { - request_limit_exceeded = true; - } - } - - if (!Params().GetLLMQ(request.GetLLMQType()).has_value()) { - return sendQDATA(CQuorumDataRequest::Errors::QUORUM_TYPE_INVALID, request_limit_exceeded); - } - - const CBlockIndex* pQuorumBaseBlockIndex = WITH_LOCK(::cs_main, return m_chainman.m_blockman.LookupBlockIndex(request.GetQuorumHash())); - if (pQuorumBaseBlockIndex == nullptr) { - return sendQDATA(CQuorumDataRequest::Errors::QUORUM_BLOCK_NOT_FOUND, request_limit_exceeded); - } - - const auto pQuorum = GetQuorum(request.GetLLMQType(), pQuorumBaseBlockIndex); - if (pQuorum == nullptr) { - return sendQDATA(CQuorumDataRequest::Errors::QUORUM_NOT_FOUND, request_limit_exceeded); - } - - CDataStream ssResponseData(SER_NETWORK, pfrom.GetCommonVersion()); - - // Check if request wants QUORUM_VERIFICATION_VECTOR data - if (request.GetDataMask() & CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR) { - if (!pQuorum->HasVerificationVector()) { - return sendQDATA(CQuorumDataRequest::Errors::QUORUM_VERIFICATION_VECTOR_MISSING, request_limit_exceeded); - } - - WITH_LOCK(pQuorum->cs_vvec_shShare, ssResponseData << *pQuorum->quorumVvec); - } - - // Check if request wants ENCRYPTED_CONTRIBUTIONS data - CQuorumDataRequest::Errors ret_err{CQuorumDataRequest::Errors::NONE}; - MessageProcessingResult qdata_ret{}, ret{}; - if (m_handler) { - ret = m_handler->ProcessContribQGETDATA(request_limit_exceeded, ssResponseData, *pQuorum, request, pQuorumBaseBlockIndex); - if (auto request_err = request.GetError(); request_err != CQuorumDataRequest::Errors::NONE && - request_err != CQuorumDataRequest::Errors::UNDEFINED) { - ret_err = request_err; - } - } - // sendQDATA also pushes a message independent of the returned value - if (ret_err != CQuorumDataRequest::Errors::NONE) { - qdata_ret = sendQDATA(ret_err, request_limit_exceeded); - } else { - qdata_ret = sendQDATA(CQuorumDataRequest::Errors::NONE, request_limit_exceeded, ssResponseData); + LOCK(cs_data_requests); + auto [old_pair, inserted] = mapQuorumDataRequests.emplace(key, request); + if (!inserted) { + if (old_pair->second.IsExpired(add_expiry_bias)) { + old_pair->second = request; + return true; } - return ret.empty() ? qdata_ret : ret; + return false; } + return true; +} - if (msg_type == NetMsgType::QDATA) { - if ((!IsMasternode() && !IsWatching()) || pfrom.GetVerifiedProRegTxHash().IsNull()) { - return MisbehavingError{10, "not a verified masternode and -watchquorums is not enabled"}; - } - - CQuorumDataRequest request; - vRecv >> request; - - { - LOCK2(::cs_main, cs_data_requests); - const CQuorumDataRequestKey key(pfrom.GetVerifiedProRegTxHash(), true, request.GetQuorumHash(), request.GetLLMQType()); - auto it = mapQuorumDataRequests.find(key); - if (it == mapQuorumDataRequests.end()) { - return MisbehavingError{10, "not requested"}; - } - if (it->second.IsProcessed()) { - return MisbehavingError(10, "already received"); - } - if (request != it->second) { - return MisbehavingError(10, "not like requested"); - } - it->second.SetProcessed(); - } - - if (request.GetError() != CQuorumDataRequest::Errors::NONE) { - LogPrint(BCLog::LLMQ, "CQuorumManager::%s -- %s: Error %d (%s), from peer=%d\n", __func__, msg_type, request.GetError(), request.GetErrorString(), pfrom.GetId()); - return {}; - } - - CQuorumPtr pQuorum; - { - if (LOCK(m_cs_maps); !mapQuorumsCache[request.GetLLMQType()].get(request.GetQuorumHash(), pQuorum)) { - // Don't bump score because we asked for it - LogPrint(BCLog::LLMQ, "CQuorumManager::%s -- %s: Quorum not found, from peer=%d\n", __func__, msg_type, pfrom.GetId()); - return {}; - } - } - - // Check if request has QUORUM_VERIFICATION_VECTOR data - if (request.GetDataMask() & CQuorumDataRequest::QUORUM_VERIFICATION_VECTOR) { - - std::vector verificationVector; - vRecv >> verificationVector; - - if (pQuorum->SetVerificationVector(verificationVector)) { - QueueQuorumForWarming(pQuorum); - } else { - return MisbehavingError{10, "invalid quorum verification vector"}; - } - } - - // Check if request has ENCRYPTED_CONTRIBUTIONS data - if (m_handler) { - if (auto ret = m_handler->ProcessContribQDATA(pfrom, vRecv, *pQuorum, request); !ret.empty()) { - return ret; - } - } - - WITH_LOCK(cs_db, pQuorum->WriteContributions(*db)); - return {}; +CQuorumManager::DataResponseValidation CQuorumManager::ValidateDataResponse( + const CQuorumDataRequestKey& key, const CQuorumDataRequest& response) const +{ + LOCK(cs_data_requests); + auto it = mapQuorumDataRequests.find(key); + if (it == mapQuorumDataRequests.end()) { + return DataResponseValidation::NotRequested; } + if (it->second.IsProcessed()) { + return DataResponseValidation::AlreadyReceived; + } + if (response != it->second) { + return DataResponseValidation::Mismatch; + } + it->second.SetProcessed(); + return DataResponseValidation::OK; +} - return {}; +CQuorumPtr CQuorumManager::GetCachedMutableQuorum(Consensus::LLMQType llmqType, const uint256& quorumHash) const +{ + CQuorumPtr pQuorum; + LOCK(m_cs_maps); + mapQuorumsCache[llmqType].get(quorumHash, pQuorum); + return pQuorum; +} + +void CQuorumManager::WriteContributions(const CQuorumPtr& quorum) const +{ + LOCK(cs_db); + quorum->WriteContributions(*db); } void CQuorumManager::CacheWarmingThreadMain() const @@ -691,6 +536,7 @@ CQuorumCPtr SelectQuorumForSigning(const Consensus::LLMQParams& llmq_params, con pindexStart = active_chain[startBlockHeight]; } + // don't remove connections for the currently in-progress DKG round if (IsQuorumRotationEnabled(llmq_params, pindexStart)) { auto quorums = qman.ScanQuorums(llmq_params.type, pindexStart, poolSize); if (quorums.empty()) { @@ -750,4 +596,5 @@ VerifyRecSigStatus VerifyRecoveredSig(Consensus::LLMQType llmqType, const CChain const bool ret = sig.VerifyInsecure(quorum->qc->quorumPublicKey, signHash.Get()); return ret ? VerifyRecSigStatus::Valid : VerifyRecSigStatus::Invalid; } + } // namespace llmq diff --git a/src/llmq/quorumsman.h b/src/llmq/quorumsman.h index 352ec128dc7f..c814af452454 100644 --- a/src/llmq/quorumsman.h +++ b/src/llmq/quorumsman.h @@ -5,40 +5,32 @@ #ifndef BITCOIN_LLMQ_QUORUMSMAN_H #define BITCOIN_LLMQ_QUORUMSMAN_H +#include #include -#include -#include #include #include #include -#include #include #include #include #include -#include #include -#include #include #include #include #include -#include class CBLSSignature; class CBLSWorker; class CBlockIndex; class CChain; -class CConnman; -class CDataStream; class CDeterministicMNManager; class CDBWrapper; class CEvoDB; class ChainstateManager; -class CNode; namespace util { struct DbWrapperParams; } // namespace util @@ -50,23 +42,17 @@ enum class VerifyRecSigStatus : uint8_t { Valid, }; +class QuorumRole; class CDKGSessionManager; class CQuorumBlockProcessor; class CQuorumSnapshotManager; -class QuorumObserver; -class QuorumParticipant; /** * The quorum manager maintains quorums which were mined on chain. When a quorum is requested from the manager, * it will lookup the commitment (through CQuorumBlockProcessor) and build a CQuorum object from it. - * - * It is also responsible for initialization of the intra-quorum connections for new quorums. */ -class CQuorumManager final : public QuorumObserverParent +class CQuorumManager final { - friend class llmq::QuorumObserver; - friend class llmq::QuorumParticipant; - private: CBLSWorker& blsWorker; CDeterministicMNManager& m_dmnman; @@ -74,7 +60,7 @@ class CQuorumManager final : public QuorumObserverParent CQuorumSnapshotManager& m_qsnapman; const ChainstateManager& m_chainman; llmq::CDKGSessionManager* m_qdkgsman{nullptr}; - llmq::QuorumObserver* m_handler{nullptr}; + llmq::QuorumRole* m_handler{nullptr}; private: mutable Mutex cs_db; @@ -110,7 +96,7 @@ class CQuorumManager final : public QuorumObserverParent const ChainstateManager& chainman, const util::DbWrapperParams& db_params); ~CQuorumManager(); - void ConnectManagers(gsl::not_null handler, gsl::not_null qdkgsman) + void ConnectManagers(gsl::not_null handler, gsl::not_null qdkgsman) { // Prohibit double initialization assert(m_handler == nullptr); @@ -126,18 +112,10 @@ class CQuorumManager final : public QuorumObserverParent bool GetEncryptedContributions(Consensus::LLMQType llmq_type, const CBlockIndex* block_index, const std::vector& valid_members, const uint256& protx_hash, - std::vector>& vec_enc) const override; - - [[nodiscard]] MessageProcessingResult ProcessMessage(CNode& pfrom, CConnman& connman, std::string_view msg_type, - CDataStream& vRecv) - EXCLUSIVE_LOCKS_REQUIRED(!cs_db, !cs_data_requests, !m_cs_maps, !m_cache_cs); + std::vector>& vec_enc) const; static bool HasQuorum(Consensus::LLMQType llmqType, const CQuorumBlockProcessor& quorum_block_processor, const uint256& quorumHash); - bool RequestQuorumData(CNode* pfrom, CConnman& connman, const CQuorum& quorum, uint16_t nDataMask, - const uint256& proTxHash = uint256()) const override - EXCLUSIVE_LOCKS_REQUIRED(!cs_data_requests); - // all these methods will lock cs_main for a short period of time CQuorumCPtr GetQuorum(Consensus::LLMQType llmqType, const uint256& quorumHash) const EXCLUSIVE_LOCKS_REQUIRED(!cs_db, !m_cs_maps, !m_cache_cs); @@ -146,19 +124,34 @@ class CQuorumManager final : public QuorumObserverParent // this one is cs_main-free std::vector ScanQuorums(Consensus::LLMQType llmqType, gsl::not_null pindexStart, - size_t nCountRequested) const override + size_t nCountRequested) const EXCLUSIVE_LOCKS_REQUIRED(!cs_db, !m_cs_maps, !m_cache_cs); bool IsMasternode() const; bool IsWatching() const; + //! Request tracking for QGETDATA/QDATA — used by NetQuorum and RPC + bool RegisterDataRequest(const CQuorumDataRequestKey& key, const CQuorumDataRequest& request, + bool add_expiry_bias = true) const + EXCLUSIVE_LOCKS_REQUIRED(!cs_data_requests); + enum class DataResponseValidation : uint8_t { OK, NotRequested, AlreadyReceived, Mismatch }; + DataResponseValidation ValidateDataResponse(const CQuorumDataRequestKey& key, + const CQuorumDataRequest& response) const + EXCLUSIVE_LOCKS_REQUIRED(!cs_data_requests); bool IsDataRequestPending(const uint256& proRegTx, bool we_requested, const uint256& quorumHash, - Consensus::LLMQType llmqType) const override EXCLUSIVE_LOCKS_REQUIRED(!cs_data_requests); + Consensus::LLMQType llmqType) const EXCLUSIVE_LOCKS_REQUIRED(!cs_data_requests); DataRequestStatus GetDataRequestStatus(const uint256& proRegTx, bool we_requested, const uint256& quorumHash, - Consensus::LLMQType llmqType) const override + Consensus::LLMQType llmqType) const EXCLUSIVE_LOCKS_REQUIRED(!cs_data_requests); - void CleanupExpiredDataRequests() const override EXCLUSIVE_LOCKS_REQUIRED(!cs_data_requests); - void CleanupOldQuorumData(const Uint256HashSet& dbKeysToSkip) const override EXCLUSIVE_LOCKS_REQUIRED(!cs_db); + void CleanupExpiredDataRequests() const EXCLUSIVE_LOCKS_REQUIRED(!cs_data_requests); + + void CleanupOldQuorumData(const Uint256HashSet& dbKeysToSkip) const EXCLUSIVE_LOCKS_REQUIRED(!cs_db); + + //! Used by NetQuorum for QDATA processing + CQuorumPtr GetCachedMutableQuorum(Consensus::LLMQType llmqType, const uint256& quorumHash) const + EXCLUSIVE_LOCKS_REQUIRED(!m_cs_maps); + void WriteContributions(const CQuorumPtr& quorum) const EXCLUSIVE_LOCKS_REQUIRED(!cs_db); + void QueueQuorumForWarming(CQuorumCPtr pQuorum) const EXCLUSIVE_LOCKS_REQUIRED(!m_cache_cs); private: // all private methods here are cs_main-free @@ -173,7 +166,6 @@ class CQuorumManager final : public QuorumObserverParent bool populate_cache = true) const EXCLUSIVE_LOCKS_REQUIRED(!cs_db, !m_cs_maps, !m_cache_cs); - void QueueQuorumForWarming(CQuorumCPtr pQuorum) const EXCLUSIVE_LOCKS_REQUIRED(!m_cache_cs); void CacheWarmingThreadMain() const EXCLUSIVE_LOCKS_REQUIRED(!m_cache_cs); void MigrateOldQuorumDB(CEvoDB& evoDb) const EXCLUSIVE_LOCKS_REQUIRED(!cs_db); }; @@ -190,6 +182,29 @@ CQuorumCPtr SelectQuorumForSigning(const Consensus::LLMQParams& llmq_params, con VerifyRecSigStatus VerifyRecoveredSig(Consensus::LLMQType llmqType, const CChain& active_chain, const CQuorumManager& qman, int signedAtHeight, const uint256& id, const uint256& msgHash, const CBLSSignature& sig, int signOffset = SIGN_HEIGHT_OFFSET); + +/** + * Base class providing the interface used by CQuorumManager and NetQuorum. + * Both ObserverContext and ActiveContext inherit from this class. + */ +class QuorumRole +{ +protected: + CQuorumManager& m_qman; + +public: + QuorumRole() = delete; + QuorumRole(const QuorumRole&) = delete; + QuorumRole& operator=(const QuorumRole&) = delete; + explicit QuorumRole(CQuorumManager& qman) : m_qman{qman} {} + virtual ~QuorumRole() = default; + + virtual bool IsMasternode() const = 0; + virtual bool IsWatching() const = 0; + virtual uint256 GetProTxHash() const { return {}; } + virtual bool SetQuorumSecretKeyShare(CQuorum& quorum, Span skContributions) const = 0; +}; + } // namespace llmq #endif // BITCOIN_LLMQ_QUORUMSMAN_H diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 057fd3bf49c1..5a2127463d5f 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -56,12 +56,12 @@ #include #include #include +#include #include #include #include #include #include -#include #include #include #include @@ -5484,7 +5484,6 @@ void PeerManagerImpl::ProcessMessage( PostProcessMessage(m_sporkman.ProcessMessage(pfrom, m_connman, msg_type, vRecv), pfrom.GetId()); PostProcessMessage(CMNAuth::ProcessMessage(pfrom, peer->m_their_services, m_connman, m_mn_metaman, (m_active_ctx ? m_active_ctx->nodeman.get() : nullptr), m_mn_sync, m_dmnman->GetListAtChainTip(), msg_type, vRecv), pfrom.GetId()); PostProcessMessage(m_llmq_ctx->quorum_block_processor->ProcessMessage(pfrom, msg_type, vRecv), pfrom.GetId()); - PostProcessMessage(m_llmq_ctx->qman->ProcessMessage(pfrom, m_connman, msg_type, vRecv), pfrom.GetId()); PostProcessMessage(ProcessPlatformBanMessage(pfrom.GetId(), msg_type, vRecv), pfrom.GetId()); if (msg_type == NetMsgType::CLSIG) { diff --git a/src/node/context.cpp b/src/node/context.cpp index 025bb23e8687..776b74a9da19 100644 --- a/src/node/context.cpp +++ b/src/node/context.cpp @@ -29,7 +29,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/src/rpc/quorums.cpp b/src/rpc/quorums.cpp index df5f559590aa..c2033cd48dcc 100644 --- a/src/rpc/quorums.cpp +++ b/src/rpc/quorums.cpp @@ -12,7 +12,7 @@ #include #include #include -#include +#include #include #include #include @@ -28,6 +28,7 @@ #include #include #include +#include #include #include #include @@ -923,7 +924,13 @@ static RPCHelpMan quorum_getdata() throw JSONRPCError(RPC_INVALID_PARAMETER, "quorum not found"); } return connman.ForNode(nodeId, [&](CNode* pNode) { - return llmq_ctx.qman->RequestQuorumData(pNode, connman, *quorum, nDataMask, proTxHash); + if (pNode->GetVerifiedProRegTxHash().IsNull()) return false; + if (!quorum->m_quorum_base_block_index) return false; + const llmq::CQuorumDataRequest request(llmqType, quorum->qc->quorumHash, nDataMask, proTxHash); + const llmq::CQuorumDataRequestKey key(pNode->GetVerifiedProRegTxHash(), true, quorum->qc->quorumHash, llmqType); + if (!llmq_ctx.qman->RegisterDataRequest(key, request)) return false; + connman.PushMessage(pNode, CNetMsgMaker(pNode->GetCommonVersion()).Make(NetMsgType::QGETDATA, request)); + return true; }); }, }; diff --git a/src/validationinterface.cpp b/src/validationinterface.cpp index 2fd14b426434..496be65c8d29 100644 --- a/src/validationinterface.cpp +++ b/src/validationinterface.cpp @@ -204,6 +204,10 @@ void CMainSignals::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockInd fInitialDownload); } +void CMainSignals::InitializeCurrentBlockTip(const CBlockIndex* tip, bool ibd) { + m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.InitializeCurrentBlockTip(tip, ibd); }); +} + void CMainSignals::SynchronousUpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) { m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.SynchronousUpdatedBlockTip(pindexNew, pindexFork, fInitialDownload); }); } diff --git a/src/validationinterface.h b/src/validationinterface.h index 80b9a5f22a70..ad41b0d6c729 100644 --- a/src/validationinterface.h +++ b/src/validationinterface.h @@ -107,6 +107,18 @@ class CValidationInterface { * Called on a background thread. */ virtual void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) {} + /** + * Force UpdatedBlockTip to initialize block-tip-dependent state at + * startup (e.g. nCachedBlockHeight for DS/MN payments/budgets, + * quorum connections). Called synchronously from the loadblk + * thread — once after ThreadImport for general subscribers and + * again after nodeman->Init() so that masternode-specific state + * (proTxHash) is available for quorum connection setup. + * + * Unlike GetMainSignals().UpdatedBlockTip(), this does not trigger + * other listeners like ZMQ. + */ + virtual void InitializeCurrentBlockTip(const CBlockIndex* /*tip*/, bool /*ibd*/) {} /** * Same as UpdatedBlockTip, but called from the caller's thread */ @@ -226,6 +238,7 @@ class CMainSignals { void AcceptedBlockHeader(const CBlockIndex *pindexNew); void NotifyHeaderTip(const CBlockIndex *pindexNew, bool fInitialDownload); void UpdatedBlockTip(const CBlockIndex *, const CBlockIndex *, bool fInitialDownload); + void InitializeCurrentBlockTip(const CBlockIndex* tip, bool ibd); void SynchronousUpdatedBlockTip(const CBlockIndex *, const CBlockIndex *, bool fInitialDownload); void TransactionAddedToMempool(const CTransactionRef&, int64_t, uint64_t mempool_sequence); void TransactionRemovedFromMempool(const CTransactionRef&, MemPoolRemovalReason, uint64_t mempool_sequence); diff --git a/test/lint/lint-circular-dependencies.py b/test/lint/lint-circular-dependencies.py index f0da72c0790a..96ce0c51fa02 100755 --- a/test/lint/lint-circular-dependencies.py +++ b/test/lint/lint-circular-dependencies.py @@ -22,11 +22,11 @@ "kernel/coinstats -> validation -> kernel/coinstats", # Dash "active/context -> active/dkgsessionhandler -> llmq/dkgsessionhandler -> net_processing -> active/context", - "banman -> common/bloom -> evo/assetlocktx -> llmq/quorumsman -> net -> banman", + "banman -> common/bloom -> evo/assetlocktx -> llmq/quorumsman -> llmq/blockprocessor -> net -> banman", "chainlock/chainlock -> spork -> net -> evo/deterministicmns -> evo/providertx -> validation -> chainlock/chainlock", "coinjoin/client -> coinjoin/util -> wallet/wallet -> psbt -> node/transaction -> net_processing -> coinjoin/walletman -> coinjoin/client", "common/bloom -> evo/assetlocktx -> llmq/commitment -> evo/deterministicmns -> evo/simplifiedmns -> merkleblock -> common/bloom", - "common/bloom -> evo/assetlocktx -> llmq/quorumsman -> net -> common/bloom", + "common/bloom -> evo/assetlocktx -> llmq/quorumsman -> llmq/blockprocessor -> net -> common/bloom", "consensus/tx_verify -> evo/assetlocktx -> llmq/commitment -> validation -> consensus/tx_verify", "consensus/tx_verify -> evo/assetlocktx -> llmq/commitment -> validation -> txmempool -> consensus/tx_verify", "evo/assetlocktx -> llmq/commitment -> validation -> txmempool -> evo/assetlocktx", diff --git a/test/util/data/non-backported.txt b/test/util/data/non-backported.txt index a570da3bc6fa..dd29b023dc3e 100644 --- a/test/util/data/non-backported.txt +++ b/test/util/data/non-backported.txt @@ -22,8 +22,6 @@ src/instantsend/*.cpp src/instantsend/*.h src/llmq/*.cpp src/llmq/*.h -src/llmq/observer/*.cpp -src/llmq/observer/*.h src/masternode/*.cpp src/masternode/*.h src/messagesigner.*