diff --git a/src/avalanche.h b/src/avalanche.h --- a/src/avalanche.h +++ b/src/avalanche.h @@ -205,9 +205,8 @@ typedef std::map BlockVoteMap; -typedef std::map, NodeId> - NodeCooldownMap; +struct next_request_time {}; struct query_timeout {}; class AvalancheProcessor { @@ -222,11 +221,30 @@ /** * Keep track of peers and queries sent. */ + std::atomic round; + typedef std::chrono::time_point TimePoint; - std::atomic round; - RWCollection> nodeids; - RWCollection nodecooldown; + struct Peer { + NodeId nodeid; + int64_t score; + + TimePoint nextRequestTime; + }; + + typedef boost::multi_index_container< + Peer, boost::multi_index::indexed_by< + // index by nodeid + boost::multi_index::hashed_unique< + boost::multi_index::member>, + // sorted by nextRequestTime + boost::multi_index::ordered_non_unique< + boost::multi_index::tag, + boost::multi_index::member>>> + PeerSet; + + RWCollection peerSet; struct Query { NodeId nodeid; @@ -280,6 +298,8 @@ bool registerVotes(NodeId nodeid, const AvalancheResponse &response, std::vector &updates); + bool addPeer(NodeId nodeid, uint32_t score); + bool startEventLoop(CScheduler &scheduler); bool stopEventLoop(); diff --git a/src/avalanche.cpp b/src/avalanche.cpp --- a/src/avalanche.cpp +++ b/src/avalanche.cpp @@ -70,9 +70,21 @@ bool AvalancheProcessor::registerVotes( NodeId nodeid, const AvalancheResponse &response, std::vector &updates) { - // Save the time at which we can query again. - auto cooldown_end = std::chrono::steady_clock::now() + - std::chrono::milliseconds(response.getCooldown()); + { + // Save the time at which we can query again. + auto w = peerSet.getWriteView(); + auto it = w->find(nodeid); + if (it != w->end()) { + w->modify(it, [&response](Peer &p) { + // FIXME: This will override the time even when we recieved an + // old stale message. This should check that the message is + // indeed the most up to date one before updating the time. + p.nextRequestTime = + std::chrono::steady_clock::now() + + std::chrono::milliseconds(response.getCooldown()); + }); + } + } std::vector invs; @@ -166,12 +178,15 @@ } } - // Put the node back in the list of queriable nodes. - auto w = nodecooldown.getWriteView(); - w->insert(std::make_pair(cooldown_end, nodeid)); return true; } +bool AvalancheProcessor::addPeer(NodeId nodeid, uint32_t score) { + return peerSet.getWriteView() + ->insert({nodeid, score, std::chrono::steady_clock::now()}) + .second; +} + namespace { /** * Run the avalanche event loop every 10ms. @@ -254,55 +269,17 @@ } NodeId AvalancheProcessor::getSuitableNodeToQuery() { - auto w = nodeids.getWriteView(); - bool isCooldownMapEmpty; - - { - // Recover nodes for which cooldown is over. - auto now = std::chrono::steady_clock::now(); - auto wcooldown = nodecooldown.getWriteView(); - for (auto it = wcooldown.begin(); - it != wcooldown.end() && it->first < now;) { - w->insert(it->second); - wcooldown->erase(it++); - } - - isCooldownMapEmpty = wcooldown->empty(); - } - - // If the cooldown map is empty and we don't have any nodes, it's time to - // fish for new ones. - // FIXME: Clearly, we need a better way to fish for new nodes, but this is - // out of scope for now. - if (isCooldownMapEmpty && w->empty()) { - auto r = queries.getReadView(); - - // We don't have any candidate node, so let's try to find some. - connman->ForEachNode([&w, &r](CNode *pnode) { - // If this node doesn't support avalanche, we remove. - if (!(pnode->nServices & NODE_AVALANCHE)) { - return; - } - - // if we have a request in flight for that node. - if (r->find(pnode->GetId()) != r.end()) { - return; - } - - w->insert(pnode->GetId()); - }); - } - - // We don't have any suitable candidate. - if (w->empty()) { + auto r = peerSet.getReadView(); + auto it = r->get().begin(); + if (it == r->get().end()) { return -1; } - auto it = w.begin(); - NodeId nodeid = *it; - w->erase(it); + if (it->nextRequestTime <= std::chrono::steady_clock::now()) { + return it->nodeid; + } - return nodeid; + return -1; } void AvalancheProcessor::runEventLoop() { @@ -312,29 +289,52 @@ return; } - NodeId nodeid = getSuitableNodeToQuery(); - - /** - * If we lost contact to that node, then we remove it from nodeids, but - * never add the request to queries, which ensures bad nodes get cleaned up - * over time. - */ - connman->ForNode(nodeid, [this, &invs](CNode *pnode) { - { - // Compute the time at which this requests times out. - auto timeout = - std::chrono::steady_clock::now() + std::chrono::seconds(10); - // Register the query. - queries.getWriteView()->insert( - {pnode->GetId(), round, timeout, invs}); + while (true) { + NodeId nodeid = getSuitableNodeToQuery(); + if (nodeid == -1) { + return; } - // Send the query to the node. - connman->PushMessage( - pnode, - CNetMsgMaker(pnode->GetSendVersion()) - .Make(NetMsgType::AVAPOLL, - AvalanchePoll(round++, std::move(invs)))); - return true; - }); + /** + * If we lost contact to that node, then we remove it from nodeids, but + * never add the request to queries, which ensures bad nodes get cleaned + * up over time. + */ + bool hasSent = connman->ForNode(nodeid, [this, &invs](CNode *pnode) { + uint64_t current_round = round++; + + { + // Compute the time at which this requests times out. + auto timeout = + std::chrono::steady_clock::now() + std::chrono::seconds(10); + // Register the query. + queries.getWriteView()->insert( + {pnode->GetId(), current_round, timeout, invs}); + // Set the timeout. + auto w = peerSet.getWriteView(); + auto it = w->find(pnode->GetId()); + if (it != w->end()) { + w->modify(it, [&timeout](Peer &p) { + p.nextRequestTime = timeout; + }); + } + } + + // Send the query to the node. + connman->PushMessage( + pnode, + CNetMsgMaker(pnode->GetSendVersion()) + .Make(NetMsgType::AVAPOLL, + AvalanchePoll(current_round, std::move(invs)))); + return true; + }); + + // Success ! + if (hasSent) { + return; + } + + // This node is obsolete, delete it. + peerSet.getWriteView()->erase(nodeid); + } } diff --git a/src/test/avalanche_tests.cpp b/src/test/avalanche_tests.cpp --- a/src/test/avalanche_tests.cpp +++ b/src/test/avalanche_tests.cpp @@ -152,6 +152,7 @@ // Create a node that supports avalanche. auto avanode = ConnectNode(config, NODE_AVALANCHE, *peerLogic); NodeId nodeid = avanode->GetId(); + BOOST_CHECK(p.addPeer(nodeid, 0)); // Querying for random block returns false. BOOST_CHECK(!p.isAccepted(pindex)); @@ -273,7 +274,9 @@ // Create a node that supports avalanche. auto node0 = ConnectNode(config, NODE_AVALANCHE, *peerLogic); + BOOST_CHECK(p.addPeer(node0->GetId(), 0)); auto node1 = ConnectNode(config, NODE_AVALANCHE, *peerLogic); + BOOST_CHECK(p.addPeer(node1->GetId(), 0)); // Make sure the block has a hash. CBlock blockA = CreateAndProcessBlock({}, CScript()); @@ -318,29 +321,29 @@ // Let's vote for these blocks a few times. for (int i = 0; i < 3; i++) { + NodeId nodeid = AvalancheTest::getSuitableNodeToQuery(p); AvalancheTest::runEventLoop(p); - BOOST_CHECK(p.registerVotes(node0->GetId(), next(resp), updates)); + BOOST_CHECK(p.registerVotes(nodeid, next(resp), updates)); BOOST_CHECK_EQUAL(updates.size(), 0); } // Now it is accepted, but we can vote for it numerous times. for (int i = 0; i < AVALANCHE_FINALIZATION_SCORE; i++) { + NodeId nodeid = AvalancheTest::getSuitableNodeToQuery(p); AvalancheTest::runEventLoop(p); - BOOST_CHECK(p.registerVotes(node0->GetId(), next(resp), updates)); + BOOST_CHECK(p.registerVotes(nodeid, next(resp), updates)); BOOST_CHECK_EQUAL(updates.size(), 0); } // Running two iterration of the event loop so that vote gets triggerd on A // and B. - NodeId secondNodeid = AvalancheTest::getSuitableNodeToQuery(p); - // NB: getSuitableNodeToQuery remove the node from the candidate list, so it - // has returned the node that will be queried second. The other one is the - // first. - NodeId firstNodeid = - (node0->GetId() == secondNodeid) ? node1->GetId() : node0->GetId(); + NodeId firstNodeid = AvalancheTest::getSuitableNodeToQuery(p); AvalancheTest::runEventLoop(p); + NodeId secondNodeid = AvalancheTest::getSuitableNodeToQuery(p); AvalancheTest::runEventLoop(p); + BOOST_CHECK(firstNodeid != secondNodeid); + // Next vote will finalize block A. BOOST_CHECK(p.registerVotes(firstNodeid, next(resp), updates)); BOOST_CHECK_EQUAL(updates.size(), 1); @@ -388,6 +391,7 @@ auto oldnode = ConnectNode(config, NODE_NONE, *peerLogic); auto avanode = ConnectNode(config, NODE_AVALANCHE, *peerLogic); NodeId avanodeid = avanode->GetId(); + BOOST_CHECK(p.addPeer(avanodeid, 0)); // It returns the avalanche peer. BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid); @@ -447,26 +451,23 @@ BOOST_CHECK_EQUAL(updates.size(), 0); BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid); - // 4. Invalid round count. Node is not returned to the pool. + // 4. Invalid round count. Request is not discarded. uint64_t queryRound = AvalancheTest::getRound(p); AvalancheTest::runEventLoop(p); resp = {queryRound + 1, 0, {AvalancheVote()}}; BOOST_CHECK(!p.registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); - BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), -1); resp = {queryRound - 1, 0, {AvalancheVote()}}; BOOST_CHECK(!p.registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); - BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), -1); - // 5. Making request for invalid nodes do not work. Node is not returned to - // the pool. + // 5. Making request for invalid nodes do not work. Request is not + // discarded. resp = {queryRound, 0, {AvalancheVote(0, blockHash)}}; BOOST_CHECK(!p.registerVotes(avanodeid + 1234, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); - BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), -1); // Proper response gets processed and avanode is available again. resp = {queryRound, 0, {AvalancheVote(0, blockHash)}}; @@ -493,7 +494,7 @@ 0, {AvalancheVote(0, blockHash2), AvalancheVote(0, blockHash)}}; AvalancheTest::runEventLoop(p); - BOOST_CHECK(p.registerVotes(avanode->GetId(), resp, updates)); + BOOST_CHECK(p.registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid); @@ -535,6 +536,7 @@ // Create a node that supports avalanche. auto avanode = ConnectNode(config, NODE_AVALANCHE, *peerLogic); NodeId nodeid = avanode->GetId(); + BOOST_CHECK(p.addPeer(nodeid, 0)); // There is no query in flight at the moment. BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), nodeid);