diff --git a/src/avalanche/peermanager.cpp b/src/avalanche/peermanager.cpp index 612c452f3..eed4503fa 100644 --- a/src/avalanche/peermanager.cpp +++ b/src/avalanche/peermanager.cpp @@ -1,337 +1,344 @@ // Copyright (c) 2020 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include #include #include #include #include // For ChainstateActive() #include namespace avalanche { -PeerId PeerManager::getPeer(const Proof &proof) { +bool PeerManager::addNode(NodeId nodeid, const Proof &proof, + const Delegation &delegation) { + const PeerId peerid = getPeerId(proof); + if (peerid == NO_PEER) { + return false; + } + + DelegationState state; + CPubKey pubkey; + if (!delegation.verify(state, proof, pubkey)) { + return false; + } + + auto nit = nodes.find(nodeid); + if (nit == nodes.end()) { + return nodes.emplace(nodeid, peerid, std::move(pubkey)).second; + } + + // We actually have this node already, we need to update it. + return nodes.modify(nit, [&](Node &n) { + n.peerid = peerid; + n.pubkey = std::move(pubkey); + }); +} + +bool PeerManager::removeNode(NodeId nodeid) { + return nodes.erase(nodeid) > 0; +} + +bool PeerManager::forNode(NodeId nodeid, + std::function func) const { + auto it = nodes.find(nodeid); + return it != nodes.end() && func(*it); +} + +bool PeerManager::updateNextRequestTime(NodeId nodeid, TimePoint timeout) { + auto it = nodes.find(nodeid); + if (it == nodes.end()) { + return false; + } + + return nodes.modify(it, [&](Node &n) { n.nextRequestTime = timeout; }); +} + +NodeId PeerManager::selectNode() { + for (int retry = 0; retry < SELECT_NODE_MAX_RETRY; retry++) { + const PeerId p = selectPeer(); + + // If we cannot find a peer, it may be due to the fact that it is + // unlikely due to high fragmentation, so compact and retry. + if (p == NO_PEER) { + compact(); + continue; + } + + // See if that peer has an available node. + auto &nview = nodes.get(); + auto it = nview.lower_bound(boost::make_tuple(p, TimePoint())); + if (it != nview.end() && it->peerid == p && + it->nextRequestTime <= std::chrono::steady_clock::now()) { + return it->nodeid; + } + } + + return NO_NODE; +} + +void PeerManager::updatedBlockTip() { + std::vector invalidPeers; + + { + LOCK(cs_main); + + const CCoinsViewCache &coins = ::ChainstateActive().CoinsTip(); + for (const auto &p : peers) { + ProofValidationState state; + if (!p.proof.verify(state, coins)) { + invalidPeers.push_back(p.peerid); + } + } + } + + for (const auto &pid : invalidPeers) { + removePeer(pid); + } +} + +PeerId PeerManager::getPeerId(const Proof &proof) { + auto it = fetchOrCreatePeer(proof); + return it == peers.end() ? NO_PEER : it->peerid; +} + +PeerManager::PeerSet::iterator +PeerManager::fetchOrCreatePeer(const Proof &proof) { { // Check if we already know of that peer. auto &pview = peers.get(); auto it = pview.find(proof.getId()); if (it != pview.end()) { - return it->peerid; + return peers.project<0>(it); } } { // Reject invalid proof. LOCK(cs_main); const CCoinsViewCache &coins = ::ChainstateActive().CoinsTip(); ProofValidationState state; if (!proof.verify(state, coins)) { - return NO_PEER; + return peers.end(); } } // New peer means new peerid! const PeerId peerid = nextPeerId++; // Attach UTXOs to this proof. std::unordered_set conflicting_peerids; for (const auto &s : proof.getStakes()) { auto p = utxos.emplace(s.getStake().getUTXO(), peerid); if (!p.second) { // We have a collision with an existing proof. conflicting_peerids.insert(p.first->second); } } // For now, if there is a conflict, just ceanup the mess. if (conflicting_peerids.size() > 0) { for (const auto &s : proof.getStakes()) { auto it = utxos.find(s.getStake().getUTXO()); assert(it != utxos.end()); // We need to delete that one. if (it->second == peerid) { utxos.erase(it); } } - return NO_PEER; + return peers.end(); } // We have no peer for this proof, time to create it. auto inserted = peers.emplace(peerid, uint32_t(slots.size()), proof); assert(inserted.second); const uint32_t score = proof.getScore(); const uint64_t start = slotCount; slots.emplace_back(start, score, peerid); slotCount = start + score; - return peerid; + + return inserted.first; } bool PeerManager::removePeer(const PeerId peerid) { auto it = peers.find(peerid); if (it == peers.end()) { return false; } size_t i = it->index; assert(i < slots.size()); if (i + 1 == slots.size()) { slots.pop_back(); slotCount = slots.empty() ? 0 : slots.back().getStop(); } else { fragmentation += slots[i].getScore(); slots[i] = slots[i].withPeerId(NO_PEER); } // Remove nodes associated with this peer, unless their timeout is still // active. This ensure that we don't overquery them in case their are // subsequently added to another peer. auto &nview = nodes.get(); nview.erase(nview.lower_bound(boost::make_tuple(peerid, TimePoint())), nview.upper_bound(boost::make_tuple( peerid, std::chrono::steady_clock::now()))); // Release UTXOs attached to this proof. for (const auto &s : it->proof.getStakes()) { bool deleted = utxos.erase(s.getStake().getUTXO()) > 0; assert(deleted); } peers.erase(it); return true; } -bool PeerManager::addNode(NodeId nodeid, const Proof &proof, - const Delegation &delegation) { - const PeerId peerid = getPeer(proof); - if (peerid == NO_PEER) { - return false; - } - - DelegationState state; - CPubKey pubkey; - if (!delegation.verify(state, proof, pubkey)) { - return false; - } - - auto nit = nodes.find(nodeid); - if (nit == nodes.end()) { - return nodes.emplace(nodeid, peerid, std::move(pubkey)).second; - } - - // We actually have this node already, we need to update it. - return nodes.modify(nit, [&](Node &n) { - n.peerid = peerid; - n.pubkey = std::move(pubkey); - }); -} - -bool PeerManager::removeNode(NodeId nodeid) { - return nodes.erase(nodeid) > 0; -} - -bool PeerManager::forNode(NodeId nodeid, - std::function func) const { - auto it = nodes.find(nodeid); - return it != nodes.end() && func(*it); -} - -bool PeerManager::updateNextRequestTime(NodeId nodeid, TimePoint timeout) { - auto it = nodes.find(nodeid); - if (it == nodes.end()) { - return false; - } - - return nodes.modify(it, [&](Node &n) { n.nextRequestTime = timeout; }); -} - -NodeId PeerManager::selectNode() { - for (int retry = 0; retry < SELECT_NODE_MAX_RETRY; retry++) { - const PeerId p = selectPeer(); - - // If we cannot find a peer, it may be due to the fact that it is - // unlikely due to high fragmentation, so compact and retry. - if (p == NO_PEER) { - compact(); - continue; - } - - // See if that peer has an available node. - auto &nview = nodes.get(); - auto it = nview.lower_bound(boost::make_tuple(p, TimePoint())); - if (it != nview.end() && it->peerid == p && - it->nextRequestTime <= std::chrono::steady_clock::now()) { - return it->nodeid; - } - } - - return NO_NODE; -} - PeerId PeerManager::selectPeer() const { if (slots.empty() || slotCount == 0) { return NO_PEER; } const uint64_t max = slotCount; for (int retry = 0; retry < SELECT_PEER_MAX_RETRY; retry++) { size_t i = selectPeerImpl(slots, GetRand(max), max); if (i != NO_PEER) { return i; } } return NO_PEER; } uint64_t PeerManager::compact() { // There is nothing to compact. if (fragmentation == 0) { return 0; } // Shrink the vector to the expected size. while (slots.size() > peers.size()) { slots.pop_back(); } uint64_t prevStop = 0; uint32_t i = 0; for (auto it = peers.begin(); it != peers.end(); it++) { slots[i] = Slot(prevStop, it->getScore(), it->peerid); prevStop = slots[i].getStop(); peers.modify(it, [&](Peer &p) { p.index = i++; }); } const uint64_t saved = slotCount - prevStop; slotCount = prevStop; fragmentation = 0; return saved; } bool PeerManager::verify() const { uint64_t prevStop = 0; for (size_t i = 0; i < slots.size(); i++) { const Slot &s = slots[i]; // Slots must be in correct order. if (s.getStart() < prevStop) { return false; } prevStop = s.getStop(); // If this is a dead slot, then nothing more needs to be checked. if (s.getPeerId() == NO_PEER) { continue; } // We have a live slot, verify index. auto it = peers.find(s.getPeerId()); if (it == peers.end() || it->index != i) { return false; } } for (const auto &p : peers) { // The index must point to a slot refering to this peer. if (p.index >= slots.size() || slots[p.index].getPeerId() != p.peerid) { return false; } // If the score do not match, same thing. if (slots[p.index].getScore() != p.getScore()) { return false; } } return true; } PeerId selectPeerImpl(const std::vector &slots, const uint64_t slot, const uint64_t max) { assert(slot <= max); size_t begin = 0, end = slots.size(); uint64_t bottom = 0, top = max; // Try to find the slot using dichotomic search. while ((end - begin) > 8) { // The slot we picked in not allocated. if (slot < bottom || slot >= top) { return NO_PEER; } // Guesstimate the position of the slot. size_t i = begin + ((slot - bottom) * (end - begin) / (top - bottom)); assert(begin <= i && i < end); // We have a match. if (slots[i].contains(slot)) { return slots[i].getPeerId(); } // We undershooted. if (slots[i].precedes(slot)) { begin = i + 1; if (begin >= end) { return NO_PEER; } bottom = slots[begin].getStart(); continue; } // We overshooted. if (slots[i].follows(slot)) { end = i; top = slots[end].getStart(); continue; } // We have an unalocated slot. return NO_PEER; } // Enough of that nonsense, let fallback to linear search. for (size_t i = begin; i < end; i++) { // We have a match. if (slots[i].contains(slot)) { return slots[i].getPeerId(); } } // We failed to find a slot, retry. return NO_PEER; } -void PeerManager::updatedBlockTip() { - std::vector invalidPeers; - - { - LOCK(cs_main); - - const CCoinsViewCache &coins = ::ChainstateActive().CoinsTip(); - for (const auto &p : peers) { - ProofValidationState state; - if (!p.proof.verify(state, coins)) { - invalidPeers.push_back(p.peerid); - } - } - } - - for (const auto &pid : invalidPeers) { - removePeer(pid); - } -} - } // namespace avalanche diff --git a/src/avalanche/peermanager.h b/src/avalanche/peermanager.h index f0439da29..d3b0ca9db 100644 --- a/src/avalanche/peermanager.h +++ b/src/avalanche/peermanager.h @@ -1,192 +1,199 @@ // Copyright (c) 2020 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #ifndef BITCOIN_AVALANCHE_PEERMANAGER_H #define BITCOIN_AVALANCHE_PEERMANAGER_H #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace avalanche { class Delegation; struct Slot { private: uint64_t start; uint32_t score; PeerId peerid; public: Slot(uint64_t startIn, uint32_t scoreIn, PeerId peeridIn) : start(startIn), score(scoreIn), peerid(peeridIn) {} Slot withStart(uint64_t startIn) const { return Slot(startIn, score, peerid); } Slot withScore(uint64_t scoreIn) const { return Slot(start, scoreIn, peerid); } Slot withPeerId(PeerId peeridIn) const { return Slot(start, score, peeridIn); } uint64_t getStart() const { return start; } uint64_t getStop() const { return start + score; } uint32_t getScore() const { return score; } PeerId getPeerId() const { return peerid; } bool contains(uint64_t slot) const { return getStart() <= slot && slot < getStop(); } bool precedes(uint64_t slot) const { return slot >= getStop(); } bool follows(uint64_t slot) const { return getStart() > slot; } }; struct Peer { PeerId peerid; uint32_t index; Proof proof; Peer(PeerId peerid_, uint32_t index_, Proof proof_) : peerid(peerid_), index(index_), proof(std::move(proof_)) {} const ProofId &getProofId() const { return proof.getId(); } uint32_t getScore() const { return proof.getScore(); } }; struct proof_index { using result_type = ProofId; result_type operator()(const Peer &p) const { return p.proof.getId(); } }; class SaltedProofIdHasher : private SaltedUint256Hasher { public: SaltedProofIdHasher() : SaltedUint256Hasher() {} size_t operator()(const ProofId &proofid) const { return hash(proofid); } }; struct next_request_time {}; class PeerManager { std::vector slots; uint64_t slotCount = 0; uint64_t fragmentation = 0; /** * Several nodes can make an avalanche peer. In this case, all nodes are * considered interchangeable parts of the same peer. */ using PeerSet = boost::multi_index_container< Peer, boost::multi_index::indexed_by< // index by peerid boost::multi_index::hashed_unique< boost::multi_index::member>, // index by proof boost::multi_index::hashed_unique< boost::multi_index::tag, proof_index, SaltedProofIdHasher>>>; PeerId nextPeerId = 0; PeerSet peers; std::unordered_map utxos; using NodeSet = boost::multi_index_container< Node, boost::multi_index::indexed_by< // index by nodeid boost::multi_index::hashed_unique< boost::multi_index::member>, // sorted by peerid/nextRequestTime boost::multi_index::ordered_non_unique< boost::multi_index::tag, boost::multi_index::composite_key< Node, boost::multi_index::member, boost::multi_index::member>>>>; NodeSet nodes; static constexpr int SELECT_PEER_MAX_RETRY = 3; static constexpr int SELECT_NODE_MAX_RETRY = 3; public: /** - * Provide the peer associated with the given proof. If the peer does not - * exists, then it is created. + * Node API. */ - PeerId getPeer(const Proof &proof); + bool addNode(NodeId nodeid, const Proof &proof, + const Delegation &delegation); + bool removeNode(NodeId nodeid); + + bool forNode(NodeId nodeid, std::function func) const; + bool updateNextRequestTime(NodeId nodeid, TimePoint timeout); /** - * Remove an existing peer. - * This is not meant for public consumption. + * Randomly select a node to poll. */ - bool removePeer(const PeerId peerid); + NodeId selectNode(); /** - * Node API. + * Update the peer set when a nw block is connected. */ - bool addNode(NodeId nodeid, const Proof &proof, - const Delegation &delegation); - bool removeNode(NodeId nodeid); + void updatedBlockTip(); - NodeId selectNode(); + /**************************************************** + * Functions which are public for testing purposes. * + ****************************************************/ + /** + * Provide the PeerId associated with the given proof. If the peer does not + * exists, then it is created. + */ + PeerId getPeerId(const Proof &proof); - bool forNode(NodeId nodeid, std::function func) const; - bool updateNextRequestTime(NodeId nodeid, TimePoint timeout); + /** + * Remove an existing peer. + */ + bool removePeer(const PeerId peerid); /** - * Exposed for tests. + * Randomly select a peer to poll. */ PeerId selectPeer() const; /** * Trigger maintenance of internal datastructures. * Returns how much slot space was saved after compaction. */ uint64_t compact(); /** * Perform consistency check on internal data structures. - * Mostly useful for tests. */ bool verify() const; // Accssors. uint64_t getSlotCount() const { return slotCount; } uint64_t getFragmentation() const { return fragmentation; } - /** - * Update the peer set when a nw block is connected. - */ - void updatedBlockTip(); +private: + PeerSet::iterator fetchOrCreatePeer(const Proof &proof); }; /** * This is an internal method that is exposed for testing purposes. */ PeerId selectPeerImpl(const std::vector &slots, const uint64_t slot, const uint64_t max); } // namespace avalanche #endif // BITCOIN_AVALANCHE_PEERMANAGER_H diff --git a/src/avalanche/processor.cpp b/src/avalanche/processor.cpp index 61598260b..c788efceb 100644 --- a/src/avalanche/processor.cpp +++ b/src/avalanche/processor.cpp @@ -1,558 +1,560 @@ // Copyright (c) 2018-2019 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include #include #include #include #include // For DecodeSecret #include // For Misbehaving #include #include #include #include #include #include #include /** * Run the avalanche event loop every 10ms. */ static constexpr std::chrono::milliseconds AVALANCHE_TIME_STEP{10}; // Unfortunately, the bitcoind codebase is full of global and we are kinda // forced into it here. std::unique_ptr g_avalanche; namespace avalanche { bool VoteRecord::registerVote(NodeId nodeid, uint32_t error) { // We just got a new vote, so there is one less inflight request. clearInflightRequest(); // We want to avoid having the same node voting twice in a quorum. if (!addNodeToQuorum(nodeid)) { return false; } /** * The result of the vote is determined from the error code. If the error * code is 0, there is no error and therefore the vote is yes. If there is * an error, we check the most significant bit to decide if the vote is a no * (for instance, the block is invalid) or is the vote inconclusive (for * instance, the queried node does not have the block yet). */ votes = (votes << 1) | (error == 0); consider = (consider << 1) | (int32_t(error) >= 0); /** * We compute the number of yes and/or no votes as follow: * * votes: 1010 * consider: 1100 * * yes votes: 1000 using votes & consider * no votes: 0100 using ~votes & consider */ bool yes = countBits(votes & consider & 0xff) > 6; if (!yes) { bool no = countBits(~votes & consider & 0xff) > 6; if (!no) { // The round is inconclusive. return false; } } // If the round is in agreement with previous rounds, increase confidence. if (isAccepted() == yes) { confidence += 2; return getConfidence() == AVALANCHE_FINALIZATION_SCORE; } // The round changed our state. We reset the confidence. confidence = yes; return true; } bool VoteRecord::addNodeToQuorum(NodeId nodeid) { if (nodeid == NO_NODE) { // Helpful for testing. return true; } // MMIX Linear Congruent Generator. const uint64_t r1 = 6364136223846793005 * uint64_t(nodeid) + 1442695040888963407; // Fibonacci hashing. const uint64_t r2 = 11400714819323198485ull * (nodeid ^ seed); // Combine and extract hash. const uint16_t h = (r1 + r2) >> 48; /** * Check if the node is in the filter. */ for (size_t i = 1; i < nodeFilter.size(); i++) { if (nodeFilter[(successfulVotes + i) % nodeFilter.size()] == h) { return false; } } /** * Add the node which just voted to the filter. */ nodeFilter[successfulVotes % nodeFilter.size()] = h; successfulVotes++; return true; } bool VoteRecord::registerPoll() const { uint8_t count = inflight.load(); while (count < AVALANCHE_MAX_INFLIGHT_POLL) { if (inflight.compare_exchange_weak(count, count + 1)) { return true; } } return false; } static bool IsWorthPolling(const CBlockIndex *pindex) { AssertLockHeld(cs_main); if (pindex->nStatus.isInvalid()) { // No point polling invalid blocks. return false; } if (::ChainstateActive().IsBlockFinalized(pindex)) { // There is no point polling finalized block. return false; } return true; } struct Processor::PeerData { Proof proof; Delegation delegation; }; class Processor::NotificationsHandler : public interfaces::Chain::Notifications { Processor *m_processor; public: NotificationsHandler(Processor *p) : m_processor(p) {} void updatedBlockTip() override { LOCK(m_processor->cs_peerManager); m_processor->peerManager->updatedBlockTip(); } }; Processor::Processor(interfaces::Chain &chain, CConnman *connmanIn) : connman(connmanIn), queryTimeoutDuration(AVALANCHE_DEFAULT_QUERY_TIMEOUT), round(0), peerManager(std::make_unique()) { if (gArgs.IsArgSet("-avasessionkey")) { sessionKey = DecodeSecret(gArgs.GetArg("-avasessionkey", "")); } else { // Pick a random key for the session. sessionKey.MakeNewKey(true); } if (gArgs.IsArgSet("-avaproof")) { peerData = std::make_unique(); { // The proof. CDataStream stream(ParseHex(gArgs.GetArg("-avaproof", "")), SER_NETWORK, 0); stream >> peerData->proof; + + // Ensure the peer manager knows about it. LOCK(cs_peerManager); - peerManager->getPeer(peerData->proof); + peerManager->getPeerId(peerData->proof); } // Generate the delegation to the session key. DelegationBuilder dgb(peerData->proof); if (sessionKey.GetPubKey() != peerData->proof.getMaster()) { dgb.addLevel(DecodeSecret(gArgs.GetArg("-avamasterkey", "")), sessionKey.GetPubKey()); } peerData->delegation = dgb.build(); } // Make sure we get notified of chain state changes. chainNotificationsHandler = chain.handleNotifications(std::make_shared(this)); } Processor::~Processor() { chainNotificationsHandler.reset(); stopEventLoop(); } bool Processor::addBlockToReconcile(const CBlockIndex *pindex) { bool isAccepted; { LOCK(cs_main); if (!IsWorthPolling(pindex)) { // There is no point polling this block. return false; } isAccepted = ::ChainActive().Contains(pindex); } return vote_records.getWriteView() ->insert(std::make_pair(pindex, VoteRecord(isAccepted))) .second; } bool Processor::isAccepted(const CBlockIndex *pindex) const { auto r = vote_records.getReadView(); auto it = r->find(pindex); if (it == r.end()) { return false; } return it->second.isAccepted(); } int Processor::getConfidence(const CBlockIndex *pindex) const { auto r = vote_records.getReadView(); auto it = r->find(pindex); if (it == r.end()) { return -1; } return it->second.getConfidence(); } namespace { /** * When using TCP, we need to sign all messages as the transport layer is * not secure. */ class TCPResponse { Response response; std::array sig; public: TCPResponse(Response responseIn, const CKey &key) : response(std::move(responseIn)) { CHashWriter hasher(SER_GETHASH, 0); hasher << response; const uint256 hash = hasher.GetHash(); // Now let's sign! if (!key.SignSchnorr(hash, sig)) { sig.fill(0); } } // serialization support ADD_SERIALIZE_METHODS; template inline void SerializationOp(Stream &s, Operation ser_action) { READWRITE(response); READWRITE(sig); } }; } // namespace void Processor::sendResponse(CNode *pfrom, Response response) const { connman->PushMessage( pfrom, CNetMsgMaker(pfrom->GetSendVersion()) .Make(NetMsgType::AVARESPONSE, TCPResponse(std::move(response), sessionKey))); } bool Processor::registerVotes(NodeId nodeid, const Response &response, std::vector &updates) { { // Save the time at which we can query again. LOCK(cs_peerManager); // FIXME: This will override the time even when we received an old stale // message. This should check that the message is indeed the most up to // date one before updating the time. peerManager->updateNextRequestTime( nodeid, std::chrono::steady_clock::now() + std::chrono::milliseconds(response.getCooldown())); } std::vector invs; { // Check that the query exists. auto w = queries.getWriteView(); auto it = w->find(std::make_tuple(nodeid, response.getRound())); if (it == w.end()) { LOCK(cs_main); Misbehaving(nodeid, 2, "unexpcted-ava-response"); return false; } invs = std::move(it->invs); w->erase(it); } // Verify that the request and the vote are consistent. const std::vector &votes = response.GetVotes(); size_t size = invs.size(); if (votes.size() != size) { LOCK(cs_main); Misbehaving(nodeid, 100, "invalid-ava-response-size"); return false; } for (size_t i = 0; i < size; i++) { if (invs[i].hash != votes[i].GetHash()) { LOCK(cs_main); Misbehaving(nodeid, 100, "invalid-ava-response-content"); return false; } } std::map responseIndex; { LOCK(cs_main); for (const auto &v : votes) { auto pindex = LookupBlockIndex(BlockHash(v.GetHash())); if (!pindex) { // This should not happen, but just in case... continue; } if (!IsWorthPolling(pindex)) { // There is no point polling this block. continue; } responseIndex.insert(std::make_pair(pindex, v)); } } { // Register votes. auto w = vote_records.getWriteView(); for (const auto &p : responseIndex) { CBlockIndex *pindex = p.first; const Vote &v = p.second; auto it = w->find(pindex); if (it == w.end()) { // We are not voting on that item anymore. continue; } auto &vr = it->second; if (!vr.registerVote(nodeid, v.GetError())) { // This vote did not provide any extra information, move on. continue; } if (!vr.hasFinalized()) { // This item has note been finalized, so we have nothing more to // do. updates.emplace_back( pindex, vr.isAccepted() ? BlockUpdate::Status::Accepted : BlockUpdate::Status::Rejected); continue; } // We just finalized a vote. If it is valid, then let the caller // know. Either way, remove the item from the map. updates.emplace_back(pindex, vr.isAccepted() ? BlockUpdate::Status::Finalized : BlockUpdate::Status::Invalid); w->erase(it); } } return true; } bool Processor::addNode(NodeId nodeid, const Proof &proof, const Delegation &delegation) { LOCK(cs_peerManager); return peerManager->addNode(nodeid, proof, delegation); } bool Processor::forNode(NodeId nodeid, std::function func) const { LOCK(cs_peerManager); return peerManager->forNode(nodeid, std::move(func)); } CPubKey Processor::getSessionPubKey() const { return sessionKey.GetPubKey(); } bool Processor::startEventLoop(CScheduler &scheduler) { return eventLoop.startEventLoop( scheduler, [this]() { this->runEventLoop(); }, AVALANCHE_TIME_STEP); } bool Processor::stopEventLoop() { return eventLoop.stopEventLoop(); } std::vector Processor::getInvsForNextPoll(bool forPoll) { std::vector invs; // First remove all blocks that are not worth polling. { LOCK(cs_main); auto w = vote_records.getWriteView(); for (auto it = w->begin(); it != w->end();) { const CBlockIndex *pindex = it->first; if (!IsWorthPolling(pindex)) { w->erase(it++); } else { ++it; } } } auto r = vote_records.getReadView(); for (const std::pair &p : reverse_iterate(r)) { // Check if we can run poll. const bool shouldPoll = forPoll ? p.second.registerPoll() : p.second.shouldPoll(); if (!shouldPoll) { continue; } // We don't have a decision, we need more votes. invs.emplace_back(MSG_BLOCK, p.first->GetBlockHash()); if (invs.size() >= AVALANCHE_MAX_ELEMENT_POLL) { // Make sure we do not produce more invs than specified by the // protocol. return invs; } } return invs; } NodeId Processor::getSuitableNodeToQuery() { LOCK(cs_peerManager); return peerManager->selectNode(); } void Processor::clearTimedoutRequests() { auto now = std::chrono::steady_clock::now(); std::map timedout_items{}; { // Clear expired requests. auto w = queries.getWriteView(); auto it = w->get().begin(); while (it != w->get().end() && it->timeout < now) { for (const auto &i : it->invs) { timedout_items[i]++; } w->get().erase(it++); } } if (timedout_items.empty()) { return; } // In flight request accounting. for (const auto &p : timedout_items) { const CInv &inv = p.first; assert(inv.type == MSG_BLOCK); CBlockIndex *pindex; { LOCK(cs_main); pindex = LookupBlockIndex(BlockHash(inv.hash)); if (!pindex) { continue; } } auto w = vote_records.getWriteView(); auto it = w->find(pindex); if (it == w.end()) { continue; } it->second.clearInflightRequest(p.second); } } void Processor::runEventLoop() { // First things first, check if we have requests that timed out and clear // them. clearTimedoutRequests(); // Make sure there is at least one suitable node to query before gathering // invs. NodeId nodeid = getSuitableNodeToQuery(); if (nodeid == NO_NODE) { return; } std::vector invs = getInvsForNextPoll(); if (invs.empty()) { return; } do { /** * If we lost contact to that node, then we remove it from nodeids, but * never add the request to queries, which ensures bad nodes get cleaned * up over time. */ bool hasSent = connman->ForNode(nodeid, [this, &invs](CNode *pnode) { uint64_t current_round = round++; { // Compute the time at which this requests times out. auto timeout = std::chrono::steady_clock::now() + queryTimeoutDuration; // Register the query. queries.getWriteView()->insert( {pnode->GetId(), current_round, timeout, invs}); // Set the timeout. LOCK(cs_peerManager); peerManager->updateNextRequestTime(pnode->GetId(), timeout); } // Send the query to the node. connman->PushMessage( pnode, CNetMsgMaker(pnode->GetSendVersion()) .Make(NetMsgType::AVAPOLL, Poll(current_round, std::move(invs)))); return true; }); // Success! if (hasSent) { return; } { // This node is obsolete, delete it. LOCK(cs_peerManager); peerManager->removeNode(nodeid); } // Get next suitable node to try again nodeid = getSuitableNodeToQuery(); } while (nodeid != NO_NODE); } } // namespace avalanche diff --git a/src/avalanche/test/peermanager_tests.cpp b/src/avalanche/test/peermanager_tests.cpp index 41d45ca8a..8ac7adf56 100644 --- a/src/avalanche/test/peermanager_tests.cpp +++ b/src/avalanche/test/peermanager_tests.cpp @@ -1,432 +1,432 @@ // Copyright (c) 2020 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include #include #include #include #include