diff --git a/src/avalanche/processor.cpp b/src/avalanche/processor.cpp --- a/src/avalanche/processor.cpp +++ b/src/avalanche/processor.cpp @@ -595,16 +595,6 @@ return sessionKey.GetPubKey(); } -uint256 Processor::buildLocalSighash(CNode *pfrom) const { - CHashWriter hasher(SER_GETHASH, 0); - hasher << peerData->delegation.getId(); - hasher << pfrom->GetLocalNonce(); - hasher << pfrom->nRemoteHostNonce; - hasher << pfrom->GetLocalExtraEntropy(); - hasher << pfrom->nRemoteExtraEntropy; - return hasher.GetHash(); -} - bool Processor::sendHello(CNode *pfrom) const { if (!peerData) { // We do not have a delegation to advertise. @@ -644,127 +634,78 @@ return eventLoop.stopEventLoop(); } -std::vector Processor::getInvsForNextPoll(bool forPoll) { - std::vector invs; - - auto extractVoteRecordsToInvs = [&](const auto &itemVoteRecordRange, - auto buildInvFromVoteItem) { - for (const auto &[item, voteRecord] : itemVoteRecordRange) { - if (invs.size() >= AVALANCHE_MAX_ELEMENT_POLL) { - // Make sure we do not produce more invs than specified by the - // protocol. - return true; - } - - const bool shouldPoll = - forPoll ? voteRecord.registerPoll() : voteRecord.shouldPoll(); - - if (!shouldPoll) { - continue; - } - - invs.emplace_back(buildInvFromVoteItem(item)); - } - - return invs.size() >= AVALANCHE_MAX_ELEMENT_POLL; - }; - - if (extractVoteRecordsToInvs(proofVoteRecords.getReadView(), - [](const ProofRef &proof) { - return CInv(MSG_AVA_PROOF, proof->getId()); - })) { - // The inventory vector is full, we're done - return invs; +void Processor::avaproofsSent(NodeId nodeid) { + if (::ChainstateActive().IsInitialBlockDownload()) { + // Before IBD is complete there is no way to make sure a proof is valid + // or not, e.g. it can be spent in a block we don't know yet. In order + // to increase confidence that our proof set is similar to other nodes + // on the network, the messages received during IBD are not accounted. + return; } - // First remove all blocks that are not worth polling. - { - LOCK(cs_main); - auto w = blockVoteRecords.getWriteView(); - for (auto it = w->begin(); it != w->end();) { - const CBlockIndex *pindex = it->first; - if (!IsWorthPolling(pindex)) { - w->erase(it++); - } else { - ++it; - } - } + LOCK(cs_peerManager); + if (peerManager->latchAvaproofsSent(nodeid)) { + avaproofsNodeCounter++; } +} - auto r = blockVoteRecords.getReadView(); - extractVoteRecordsToInvs(reverse_iterate(r), [](const CBlockIndex *pindex) { - return CInv(MSG_BLOCK, pindex->GetBlockHash()); - }); +/* + * Returns a bool indicating whether we have a usable Avalanche quorum enabling + * us to take decisions based on polls. + */ +bool Processor::isQuorumEstablished() { + if (quorumIsEstablished) { + return true; + } - return invs; -} + // Don't do Avalanche while node is IBD'ing + if (::ChainstateActive().IsInitialBlockDownload()) { + return false; + } -NodeId Processor::getSuitableNodeToQuery() { - LOCK(cs_peerManager); - return peerManager->selectNode(); -} + if (avaproofsNodeCounter < minAvaproofsNodeCount) { + return false; + } -void Processor::clearTimedoutRequests() { - auto now = std::chrono::steady_clock::now(); - std::map timedout_items{}; + auto localProof = getLocalProof(); + // Get the registered proof score and registered score we have nodes for + uint32_t totalPeersScore; + uint32_t connectedPeersScore; { - // Clear expired requests. - auto w = queries.getWriteView(); - auto it = w->get().begin(); - while (it != w->get().end() && it->timeout < now) { - for (const auto &i : it->invs) { - timedout_items[i]++; - } + LOCK(cs_peerManager); + totalPeersScore = peerManager->getTotalPeersScore(); + connectedPeersScore = peerManager->getConnectedPeersScore(); - w->get().erase(it++); + // Consider that we are always connected to our proof, even if we are + // the single node using that proof. + if (localProof && + peerManager->forPeer(localProof->getId(), [](const Peer &peer) { + return peer.node_count == 0; + })) { + connectedPeersScore += localProof->getScore(); } } - if (timedout_items.empty()) { - return; + // Ensure enough is being staked overall + if (totalPeersScore < minQuorumScore) { + return false; } - auto clearInflightRequest = [&](auto &voteRecords, const auto &voteItem, - uint8_t count) { - if (!voteItem) { - return false; - } - - auto voteRecordsWriteView = voteRecords.getWriteView(); - auto it = voteRecordsWriteView->find(voteItem); - if (it == voteRecordsWriteView.end()) { - return false; - } - - it->second.clearInflightRequest(count); - - return true; - }; - - // In flight request accounting. - for (const auto &p : timedout_items) { - const CInv &inv = p.first; - if (inv.IsMsgBlk()) { - const CBlockIndex *pindex = WITH_LOCK( - cs_main, return g_chainman.m_blockman.LookupBlockIndex( - BlockHash(inv.hash))); - - if (!clearInflightRequest(blockVoteRecords, pindex, p.second)) { - continue; - } - } + // Ensure we have connected score for enough of the overall score + uint32_t minConnectedScore = + std::round(double(totalPeersScore) * minQuorumConnectedScoreRatio); + if (connectedPeersScore < minConnectedScore) { + return false; + } - if (inv.IsMsgProof()) { - const ProofRef proof = - WITH_LOCK(cs_peerManager, - return peerManager->getProof(ProofId(inv.hash))); + quorumIsEstablished = true; + return true; +} - if (!clearInflightRequest(proofVoteRecords, proof, p.second)) { - continue; - } - } - } +void Processor::FinalizeNode(const Config &config, const CNode &node, + bool &update_connection_time) { + WITH_LOCK(cs_peerManager, peerManager->removeNode(node.GetId())); } void Processor::runEventLoop() { @@ -835,78 +776,137 @@ } while (nodeid != NO_NODE); } -void Processor::avaproofsSent(NodeId nodeid) { - if (::ChainstateActive().IsInitialBlockDownload()) { - // Before IBD is complete there is no way to make sure a proof is valid - // or not, e.g. it can be spent in a block we don't know yet. In order - // to increase confidence that our proof set is similar to other nodes - // on the network, the messages received during IBD are not accounted. - return; +void Processor::clearTimedoutRequests() { + auto now = std::chrono::steady_clock::now(); + std::map timedout_items{}; + + { + // Clear expired requests. + auto w = queries.getWriteView(); + auto it = w->get().begin(); + while (it != w->get().end() && it->timeout < now) { + for (const auto &i : it->invs) { + timedout_items[i]++; + } + + w->get().erase(it++); + } } - LOCK(cs_peerManager); - if (peerManager->latchAvaproofsSent(nodeid)) { - avaproofsNodeCounter++; + if (timedout_items.empty()) { + return; } -} -/* - * Returns a bool indicating whether we have a usable Avalanche quorum enabling - * us to take decisions based on polls. - */ -bool Processor::isQuorumEstablished() { - if (quorumIsEstablished) { + auto clearInflightRequest = [&](auto &voteRecords, const auto &voteItem, + uint8_t count) { + if (!voteItem) { + return false; + } + + auto voteRecordsWriteView = voteRecords.getWriteView(); + auto it = voteRecordsWriteView->find(voteItem); + if (it == voteRecordsWriteView.end()) { + return false; + } + + it->second.clearInflightRequest(count); + return true; - } + }; - // Don't do Avalanche while node is IBD'ing - if (::ChainstateActive().IsInitialBlockDownload()) { - return false; - } + // In flight request accounting. + for (const auto &p : timedout_items) { + const CInv &inv = p.first; + if (inv.IsMsgBlk()) { + const CBlockIndex *pindex = WITH_LOCK( + cs_main, return g_chainman.m_blockman.LookupBlockIndex( + BlockHash(inv.hash))); - if (avaproofsNodeCounter < minAvaproofsNodeCount) { - return false; + if (!clearInflightRequest(blockVoteRecords, pindex, p.second)) { + continue; + } + } + + if (inv.IsMsgProof()) { + const ProofRef proof = + WITH_LOCK(cs_peerManager, + return peerManager->getProof(ProofId(inv.hash))); + + if (!clearInflightRequest(proofVoteRecords, proof, p.second)) { + continue; + } + } } +} - auto localProof = getLocalProof(); +std::vector Processor::getInvsForNextPoll(bool forPoll) { + std::vector invs; - // Get the registered proof score and registered score we have nodes for - uint32_t totalPeersScore; - uint32_t connectedPeersScore; - { - LOCK(cs_peerManager); - totalPeersScore = peerManager->getTotalPeersScore(); - connectedPeersScore = peerManager->getConnectedPeersScore(); + auto extractVoteRecordsToInvs = [&](const auto &itemVoteRecordRange, + auto buildInvFromVoteItem) { + for (const auto &[item, voteRecord] : itemVoteRecordRange) { + if (invs.size() >= AVALANCHE_MAX_ELEMENT_POLL) { + // Make sure we do not produce more invs than specified by the + // protocol. + return true; + } - // Consider that we are always connected to our proof, even if we are - // the single node using that proof. - if (localProof && - peerManager->forPeer(localProof->getId(), [](const Peer &peer) { - return peer.node_count == 0; - })) { - connectedPeersScore += localProof->getScore(); + const bool shouldPoll = + forPoll ? voteRecord.registerPoll() : voteRecord.shouldPoll(); + + if (!shouldPoll) { + continue; + } + + invs.emplace_back(buildInvFromVoteItem(item)); } - } - // Ensure enough is being staked overall - if (totalPeersScore < minQuorumScore) { - return false; + return invs.size() >= AVALANCHE_MAX_ELEMENT_POLL; + }; + + if (extractVoteRecordsToInvs(proofVoteRecords.getReadView(), + [](const ProofRef &proof) { + return CInv(MSG_AVA_PROOF, proof->getId()); + })) { + // The inventory vector is full, we're done + return invs; } - // Ensure we have connected score for enough of the overall score - uint32_t minConnectedScore = - std::round(double(totalPeersScore) * minQuorumConnectedScoreRatio); - if (connectedPeersScore < minConnectedScore) { - return false; + // First remove all blocks that are not worth polling. + { + LOCK(cs_main); + auto w = blockVoteRecords.getWriteView(); + for (auto it = w->begin(); it != w->end();) { + const CBlockIndex *pindex = it->first; + if (!IsWorthPolling(pindex)) { + w->erase(it++); + } else { + ++it; + } + } } - quorumIsEstablished = true; - return true; + auto r = blockVoteRecords.getReadView(); + extractVoteRecordsToInvs(reverse_iterate(r), [](const CBlockIndex *pindex) { + return CInv(MSG_BLOCK, pindex->GetBlockHash()); + }); + + return invs; } -void Processor::FinalizeNode(const Config &config, const CNode &node, - bool &update_connection_time) { - WITH_LOCK(cs_peerManager, peerManager->removeNode(node.GetId())); +NodeId Processor::getSuitableNodeToQuery() { + LOCK(cs_peerManager); + return peerManager->selectNode(); +} + +uint256 Processor::buildLocalSighash(CNode *pfrom) const { + CHashWriter hasher(SER_GETHASH, 0); + hasher << peerData->delegation.getId(); + hasher << pfrom->GetLocalNonce(); + hasher << pfrom->nRemoteHostNonce; + hasher << pfrom->GetLocalExtraEntropy(); + hasher << pfrom->nRemoteExtraEntropy; + return hasher.GetHash(); } } // namespace avalanche