diff --git a/src/avalanche/peermanager.h b/src/avalanche/peermanager.h --- a/src/avalanche/peermanager.h +++ b/src/avalanche/peermanager.h @@ -109,6 +109,9 @@ bool addNodeToPeer(PeerId peerid, NodeId nodeid, CPubKey pubkey); bool removeNode(NodeId nodeid); + bool forNode(NodeId nodeid, + std::function func) const; + bool updateNextRequestTime(NodeId nodeid, TimePoint timeout); NodeId getSuitableNodeToQuery(); diff --git a/src/avalanche/peermanager.cpp b/src/avalanche/peermanager.cpp --- a/src/avalanche/peermanager.cpp +++ b/src/avalanche/peermanager.cpp @@ -112,6 +112,12 @@ return nodes.erase(nodeid) > 0; } +bool PeerManager::forNode( + NodeId nodeid, std::function func) const { + auto it = nodes.find(nodeid); + return it != nodes.end() && func(*it); +} + bool PeerManager::updateNextRequestTime(NodeId nodeid, TimePoint timeout) { auto it = nodes.find(nodeid); if (it == nodes.end()) { diff --git a/src/avalanche/processor.h b/src/avalanche/processor.h --- a/src/avalanche/processor.h +++ b/src/avalanche/processor.h @@ -25,6 +25,7 @@ class Config; class CBlockIndex; +class PeerManager; class CScheduler; /** @@ -175,7 +176,6 @@ using BlockVoteMap = std::map; -struct next_request_time {}; struct query_timeout {}; class AvalancheProcessor { @@ -192,19 +192,11 @@ */ std::atomic round; - using PeerSet = boost::multi_index_container< - AvalancheNode, - boost::multi_index::indexed_by< - // index by nodeid - boost::multi_index::hashed_unique>, - // sorted by nextRequestTime - boost::multi_index::ordered_non_unique< - boost::multi_index::tag, - boost::multi_index::member>>>; - - RWCollection peerSet; + /** + * Keep track of the peers and associated infos. + */ + mutable Mutex cs_peerManager; + std::unique_ptr peerManager GUARDED_BY(cs_peerManager); struct Query { NodeId nodeid; diff --git a/src/avalanche/processor.cpp b/src/avalanche/processor.cpp --- a/src/avalanche/processor.cpp +++ b/src/avalanche/processor.cpp @@ -4,6 +4,7 @@ #include +#include #include #include #include @@ -131,7 +132,7 @@ AvalancheProcessor::AvalancheProcessor(CConnman *connmanIn) : connman(connmanIn), queryTimeoutDuration(AVALANCHE_DEFAULT_QUERY_TIMEOUT), - round(0) { + round(0), peerManager(std::make_unique()) { // Pick a random key for the session. sessionKey.MakeNewKey(true); } @@ -230,18 +231,14 @@ std::vector &updates) { { // Save the time at which we can query again. - auto w = peerSet.getWriteView(); - auto it = w->find(nodeid); - if (it != w->end()) { - w->modify(it, [&response](AvalancheNode &n) { - // FIXME: This will override the time even when we received an - // old stale message. This should check that the message is - // indeed the most up to date one before updating the time. - n.nextRequestTime = - std::chrono::steady_clock::now() + - std::chrono::milliseconds(response.getCooldown()); - }); - } + LOCK(cs_peerManager); + + // FIXME: This will override the time even when we received an old stale + // message. This should check that the message is indeed the most up to + // date one before updating the time. + peerManager->updateNextRequestTime( + nodeid, std::chrono::steady_clock::now() + + std::chrono::milliseconds(response.getCooldown())); } std::vector invs; @@ -340,16 +337,21 @@ } bool AvalancheProcessor::addPeer(NodeId nodeid, int64_t score, CPubKey pubkey) { - return peerSet.getWriteView() - ->insert({nodeid, /* peerid is unused here */ 0, std::move(pubkey)}) - .second; + LOCK(cs_peerManager); + + PeerId p = peerManager->addPeer(score); + bool inserted = peerManager->addNodeToPeer(p, nodeid, std::move(pubkey)); + if (!inserted) { + peerManager->removePeer(p); + } + + return inserted; } bool AvalancheProcessor::forNode( NodeId nodeid, std::function func) const { - auto r = peerSet.getReadView(); - auto it = r->find(nodeid); - return it != r->end() && func(*it); + LOCK(cs_peerManager); + return peerManager->forNode(nodeid, std::move(func)); } bool AvalancheProcessor::startEventLoop(CScheduler &scheduler) { @@ -401,17 +403,8 @@ } NodeId AvalancheProcessor::getSuitableNodeToQuery() { - auto r = peerSet.getReadView(); - auto it = r->get().begin(); - if (it == r->get().end()) { - return NO_NODE; - } - - if (it->nextRequestTime <= std::chrono::steady_clock::now()) { - return it->nodeid; - } - - return NO_NODE; + LOCK(cs_peerManager); + return peerManager->getSuitableNodeToQuery(); } void AvalancheProcessor::clearTimedoutRequests() { @@ -495,13 +488,8 @@ queries.getWriteView()->insert( {pnode->GetId(), current_round, timeout, invs}); // Set the timeout. - auto w = peerSet.getWriteView(); - auto it = w->find(pnode->GetId()); - if (it != w->end()) { - w->modify(it, [&timeout](AvalancheNode &n) { - n.nextRequestTime = timeout; - }); - } + LOCK(cs_peerManager); + peerManager->updateNextRequestTime(pnode->GetId(), timeout); } // Send the query to the node. @@ -518,8 +506,11 @@ return; } - // This node is obsolete, delete it. - peerSet.getWriteView()->erase(nodeid); + { + // This node is obsolete, delete it. + LOCK(cs_peerManager); + peerManager->removeNode(nodeid); + } // Get next suitable node to try again nodeid = getSuitableNodeToQuery(); diff --git a/src/avalanche/test/processor_tests.cpp b/src/avalanche/test/processor_tests.cpp --- a/src/avalanche/test/processor_tests.cpp +++ b/src/avalanche/test/processor_tests.cpp @@ -4,6 +4,7 @@ #include +#include #include #include // For PeerLogicValidation #include @@ -23,6 +24,11 @@ return p.getSuitableNodeToQuery(); } + static PeerManager &getPeerManager(AvalancheProcessor &p) { + LOCK(p.cs_peerManager); + return *p.peerManager; + } + static uint64_t getRound(const AvalancheProcessor &p) { return p.round; } }; @@ -195,10 +201,13 @@ ServiceFlags nServices, PeerLogicValidation &peerLogic, CConnmanTest *connman) { + PeerManager &pm = AvalancheTest::getPeerManager(p); + PeerId pid = pm.addPeer(100); + std::array nodes; for (CNode *&n : nodes) { n = ConnectNode(config, nServices, peerLogic, connman); - BOOST_CHECK(p.addPeer(n->GetId(), 0, CPubKey())); + BOOST_CHECK(pm.addNodeToPeer(pid, n->GetId(), CPubKey())); } return nodes; @@ -522,7 +531,7 @@ auto avanode = ConnectNode(config, NODE_AVALANCHE, *peerLogic, connman.get()); NodeId avanodeid = avanode->GetId(); - BOOST_CHECK(p.addPeer(avanodeid, 0, CPubKey())); + BOOST_CHECK(p.addPeer(avanodeid, 100, CPubKey())); // It returns the avalanche peer. BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid); @@ -670,7 +679,7 @@ auto avanode = ConnectNode(config, NODE_AVALANCHE, *peerLogic, connman.get()); NodeId avanodeid = avanode->GetId(); - BOOST_CHECK(p.addPeer(avanodeid, 0, CPubKey())); + BOOST_CHECK(p.addPeer(avanodeid, 100, CPubKey())); // Expire requests after some time. auto queryTimeDuration = std::chrono::milliseconds(10); @@ -718,10 +727,13 @@ AvalancheProcessor p(connman.get()); // Create enough nodes so that we run into the inflight request limit. + PeerManager &pm = AvalancheTest::getPeerManager(p); + PeerId pid = pm.addPeer(100); + std::array nodes; for (auto &n : nodes) { n = ConnectNode(config, NODE_AVALANCHE, *peerLogic, connman.get()); - BOOST_CHECK(p.addPeer(n->GetId(), 0, CPubKey())); + BOOST_CHECK(pm.addNodeToPeer(pid, n->GetId(), CPubKey())); } // Add a block to poll @@ -882,7 +894,7 @@ auto avanode = ConnectNode(config, NODE_AVALANCHE, *peerLogic, connman.get()); NodeId nodeid = avanode->GetId(); - BOOST_CHECK(p.addPeer(nodeid, 0, CPubKey())); + BOOST_CHECK(p.addPeer(nodeid, 100, CPubKey())); // There is no query in flight at the moment. BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), nodeid); diff --git a/src/rpc/avalanche.cpp b/src/rpc/avalanche.cpp --- a/src/rpc/avalanche.cpp +++ b/src/rpc/avalanche.cpp @@ -69,7 +69,7 @@ CPubKey pubkey{HexToPubKey(keyHex)}; - g_avalanche->addPeer(nodeid, 0, pubkey); + g_avalanche->addPeer(nodeid, 100, pubkey); return {}; }