diff --git a/src/avalanche.h b/src/avalanche.h --- a/src/avalanche.h +++ b/src/avalanche.h @@ -12,6 +12,12 @@ #include "serialize.h" #include "uint256.h" +#include +#include +#include +#include +#include + #include #include #include @@ -121,13 +127,16 @@ }; class AvalancheResponse { + uint64_t round; uint32_t cooldown; std::vector votes; public: - AvalancheResponse(uint32_t cooldownIn, std::vector votesIn) - : cooldown(cooldownIn), votes(votesIn) {} + AvalancheResponse(uint64_t roundIn, uint32_t cooldownIn, + std::vector votesIn) + : round(roundIn), cooldown(cooldownIn), votes(votesIn) {} + uint64_t getRound() const { return round; } uint32_t getCooldown() const { return cooldown; } const std::vector &GetVotes() const { return votes; } @@ -136,13 +145,14 @@ template inline void SerializationOp(Stream &s, Operation ser_action) { + READWRITE(round); READWRITE(cooldown); READWRITE(votes); } }; class AvalanchePoll { - uint32_t round; + uint64_t round; std::vector invs; public: @@ -196,6 +206,8 @@ typedef std::map, NodeId> NodeCooldownMap; +struct query_timeout {}; + class AvalancheProcessor { private: CConnman *connman; @@ -208,25 +220,44 @@ /** * Keep track of peers and queries sent. */ - struct RequestRecord { - private: - int64_t timestamp; - std::vector invs; - - public: - RequestRecord() : timestamp(0), invs() {} - RequestRecord(int64_t timestampIn, std::vector invIn) - : timestamp(timestampIn), invs(std::move(invIn)) {} - - int64_t GetTimestamp() const { return timestamp; } - const std::vector &GetInvs() const { return invs; } - }; + typedef std::chrono::time_point TimePoint; - std::atomic round; + std::atomic round; RWCollection> nodeids; - RWCollection> queries; RWCollection nodecooldown; + struct Query { + NodeId nodeid; + uint64_t round; + TimePoint timeout; + + /** + * We declare this as mutable so it can be modified in the multi_index. + * This is ok because we do not use this field to index in anyway. + * + * /!\ Do not use any mutable field as index. 
+ */ + mutable std::vector invs; + }; + + typedef boost::multi_index_container< + Query, + boost::multi_index::indexed_by< + // index by nodeid/round + boost::multi_index::ordered_unique< + boost::multi_index::composite_key< + Query, + boost::multi_index::member, + boost::multi_index::member>>, + // sorted by timeout + boost::multi_index::ordered_non_unique< + boost::multi_index::tag, + boost::multi_index::member>>> + QuerySet; + + RWCollection queries; + /** * Start stop machinery. */ diff --git a/src/avalanche.cpp b/src/avalanche.cpp --- a/src/avalanche.cpp +++ b/src/avalanche.cpp @@ -11,6 +11,8 @@ #include +#include + static bool IsWorthPolling(const CBlockIndex *pindex) { AssertLockHeld(cs_main); @@ -72,23 +74,22 @@ auto cooldown_end = std::chrono::steady_clock::now() + std::chrono::milliseconds(response.getCooldown()); - RequestRecord r; + std::vector invs; { // Check that the query exists. auto w = queries.getWriteView(); - auto it = w->find(nodeid); + auto it = w->find(std::make_tuple(nodeid, response.getRound())); if (it == w.end()) { // NB: The request may be old, so we don't increase banscore. return false; } - r = std::move(it->second); + invs = std::move(it->invs); w->erase(it); } // Verify that the request and the vote are consistent. - const std::vector &invs = r.GetInvs(); const std::vector &votes = response.GetVotes(); size_t size = invs.size(); if (votes.size() != size) { @@ -320,9 +321,12 @@ */ connman->ForNode(nodeid, [this, &invs](CNode *pnode) { { + // Compute the time at which this request times out. + auto timeout = + std::chrono::steady_clock::now() + std::chrono::seconds(10); // Register the query. - queries.getWriteView()->emplace( - pnode->GetId(), RequestRecord(GetAdjustedTime(), invs)); + queries.getWriteView()->insert( + {pnode->GetId(), round, timeout, invs}); } // Send the query to the node.
diff --git a/src/test/avalanche_tests.cpp b/src/test/avalanche_tests.cpp --- a/src/test/avalanche_tests.cpp +++ b/src/test/avalanche_tests.cpp @@ -133,6 +133,12 @@ return nodeptr; } +static AvalancheResponse next(AvalancheResponse &r) { + auto copy = r; + r = {r.getRound() + 1, r.getCooldown(), r.GetVotes()}; + return copy; +} + BOOST_AUTO_TEST_CASE(block_register) { AvalancheProcessor p(g_connman.get()); std::vector updates; @@ -161,10 +167,10 @@ BOOST_CHECK(p.isAccepted(pindex)); // Let's vote for this block a few times. - AvalancheResponse resp{0, {AvalancheVote(0, blockHash)}}; + AvalancheResponse resp{0, 0, {AvalancheVote(0, blockHash)}}; for (int i = 0; i < 4; i++) { AvalancheTest::runEventLoop(p); - BOOST_CHECK(p.registerVotes(nodeid, resp, updates)); + BOOST_CHECK(p.registerVotes(nodeid, next(resp), updates)); BOOST_CHECK(p.isAccepted(pindex)); BOOST_CHECK_EQUAL(updates.size(), 0); } @@ -172,7 +178,7 @@ // We vote for it numerous times to finalize it. for (int i = 0; i < AVALANCHE_FINALIZATION_SCORE; i++) { AvalancheTest::runEventLoop(p); - BOOST_CHECK(p.registerVotes(nodeid, resp, updates)); + BOOST_CHECK(p.registerVotes(nodeid, next(resp), updates)); BOOST_CHECK(p.isAccepted(pindex)); BOOST_CHECK_EQUAL(updates.size(), 0); } @@ -185,7 +191,7 @@ // Now finalize the decision. 
AvalancheTest::runEventLoop(p); - BOOST_CHECK(p.registerVotes(nodeid, resp, updates)); + BOOST_CHECK(p.registerVotes(nodeid, next(resp), updates)); BOOST_CHECK_EQUAL(updates.size(), 1); BOOST_CHECK(updates[0].getBlockIndex() == pindex); BOOST_CHECK_EQUAL(updates[0].getStatus(), @@ -203,17 +209,17 @@ BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK); BOOST_CHECK(invs[0].hash == blockHash); - resp = {0, {AvalancheVote(1, blockHash)}}; + resp = {AvalancheTest::getRound(p), 0, {AvalancheVote(1, blockHash)}}; for (int i = 0; i < 4; i++) { AvalancheTest::runEventLoop(p); - BOOST_CHECK(p.registerVotes(nodeid, resp, updates)); + BOOST_CHECK(p.registerVotes(nodeid, next(resp), updates)); BOOST_CHECK(p.isAccepted(pindex)); BOOST_CHECK_EQUAL(updates.size(), 0); } // Now the state will flip. AvalancheTest::runEventLoop(p); - BOOST_CHECK(p.registerVotes(nodeid, resp, updates)); + BOOST_CHECK(p.registerVotes(nodeid, next(resp), updates)); BOOST_CHECK(!p.isAccepted(pindex)); BOOST_CHECK_EQUAL(updates.size(), 1); BOOST_CHECK(updates[0].getBlockIndex() == pindex); @@ -224,7 +230,7 @@ // Now it is rejected, but we can vote for it numerous times. for (int i = 1; i < AVALANCHE_FINALIZATION_SCORE; i++) { AvalancheTest::runEventLoop(p); - BOOST_CHECK(p.registerVotes(nodeid, resp, updates)); + BOOST_CHECK(p.registerVotes(nodeid, next(resp), updates)); BOOST_CHECK(!p.isAccepted(pindex)); BOOST_CHECK_EQUAL(updates.size(), 0); } @@ -237,7 +243,7 @@ // Now finalize the decision. 
AvalancheTest::runEventLoop(p); - BOOST_CHECK(p.registerVotes(nodeid, resp, updates)); + BOOST_CHECK(p.registerVotes(nodeid, next(resp), updates)); BOOST_CHECK(!p.isAccepted(pindex)); BOOST_CHECK_EQUAL(updates.size(), 1); BOOST_CHECK(updates[0].getBlockIndex() == pindex); @@ -289,14 +295,17 @@ BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK); BOOST_CHECK(invs[0].hash == blockHashA); + uint64_t round = AvalancheTest::getRound(p); AvalancheTest::runEventLoop(p); - BOOST_CHECK(p.registerVotes(node0->GetId(), - {0, {AvalancheVote(0, blockHashA)}}, updates)); + BOOST_CHECK(p.registerVotes( + node0->GetId(), {round, 0, {AvalancheVote(0, blockHashA)}}, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); // Start voting on block B after one vote. AvalancheResponse resp{ - 0, {AvalancheVote(0, blockHashB), AvalancheVote(0, blockHashA)}}; + round + 1, + 0, + {AvalancheVote(0, blockHashB), AvalancheVote(0, blockHashA)}}; BOOST_CHECK(p.addBlockToReconcile(pindexB)); invs = AvalancheTest::getInvsForNextPoll(p); BOOST_CHECK_EQUAL(invs.size(), 2); @@ -310,24 +319,30 @@ // Let's vote for these blocks a few times. for (int i = 0; i < 3; i++) { AvalancheTest::runEventLoop(p); - BOOST_CHECK(p.registerVotes(node0->GetId(), resp, updates)); + BOOST_CHECK(p.registerVotes(node0->GetId(), next(resp), updates)); BOOST_CHECK_EQUAL(updates.size(), 0); } // Now it is accepted, but we can vote for it numerous times. for (int i = 0; i < AVALANCHE_FINALIZATION_SCORE; i++) { AvalancheTest::runEventLoop(p); - BOOST_CHECK(p.registerVotes(node0->GetId(), resp, updates)); + BOOST_CHECK(p.registerVotes(node0->GetId(), next(resp), updates)); BOOST_CHECK_EQUAL(updates.size(), 0); } // Running two iterration of the event loop so that vote gets triggerd on A // and B. + NodeId secondNodeid = AvalancheTest::getSuitableNodeToQuery(p); + // NB: getSuitableNodeToQuery remove the node from the candidate list, so it + // has returned the node that will be queried second. The other one is the + // first. 
+ NodeId firstNodeid = + (node0->GetId() == secondNodeid) ? node1->GetId() : node0->GetId(); AvalancheTest::runEventLoop(p); AvalancheTest::runEventLoop(p); // Next vote will finalize block A. - BOOST_CHECK(p.registerVotes(node1->GetId(), resp, updates)); + BOOST_CHECK(p.registerVotes(firstNodeid, next(resp), updates)); BOOST_CHECK_EQUAL(updates.size(), 1); BOOST_CHECK(updates[0].getBlockIndex() == pindexA); BOOST_CHECK_EQUAL(updates[0].getStatus(), @@ -341,7 +356,7 @@ BOOST_CHECK(invs[0].hash == blockHashB); // Next vote will finalize block B. - BOOST_CHECK(p.registerVotes(node0->GetId(), resp, updates)); + BOOST_CHECK(p.registerVotes(secondNodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 1); BOOST_CHECK(updates[0].getBlockIndex() == pindexB); BOOST_CHECK_EQUAL(updates[0].getStatus(), @@ -385,13 +400,14 @@ BOOST_CHECK(invs[0].hash == blockHash); // Trigger a poll on avanode. + uint64_t round = AvalancheTest::getRound(p); AvalancheTest::runEventLoop(p); // There is no more suitable peer available, so return nothing. BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), -1); // Respond to the request. - AvalancheResponse resp = {0, {AvalancheVote(0, blockHash)}}; + AvalancheResponse resp = {round, 0, {AvalancheVote(0, blockHash)}}; BOOST_CHECK(p.registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); @@ -400,45 +416,63 @@ BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid); // Sending a response when not polled fails. - BOOST_CHECK(!p.registerVotes(avanodeid, resp, updates)); + BOOST_CHECK(!p.registerVotes(avanodeid, next(resp), updates)); BOOST_CHECK_EQUAL(updates.size(), 0); // Trigger a poll on avanode. + round = AvalancheTest::getRound(p); AvalancheTest::runEventLoop(p); BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), -1); // Sending responses that do not match the request also fails. // 1. Too many results. 
- resp = {0, {AvalancheVote(0, blockHash), AvalancheVote(0, blockHash)}}; + resp = { + round, 0, {AvalancheVote(0, blockHash), AvalancheVote(0, blockHash)}}; AvalancheTest::runEventLoop(p); BOOST_CHECK(!p.registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid); // 2. Not enough results. - resp = {0, {}}; + resp = {AvalancheTest::getRound(p), 0, {}}; AvalancheTest::runEventLoop(p); BOOST_CHECK(!p.registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid); // 3. Do not match the poll. - resp = {0, {AvalancheVote()}}; + resp = {AvalancheTest::getRound(p), 0, {AvalancheVote()}}; AvalancheTest::runEventLoop(p); BOOST_CHECK(!p.registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid); - // Proper response gets processed and avanode is available again. - resp = {0, {AvalancheVote(0, blockHash)}}; + // 4. Invalid round count. Node is not returned to the pool. + uint64_t queryRound = AvalancheTest::getRound(p); AvalancheTest::runEventLoop(p); - BOOST_CHECK(p.registerVotes(avanodeid, resp, updates)); + + resp = {queryRound + 1, 0, {AvalancheVote()}}; + BOOST_CHECK(!p.registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); - BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid); + BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), -1); - // Making request for invalid nodes do not work. + resp = {queryRound - 1, 0, {AvalancheVote()}}; + BOOST_CHECK(!p.registerVotes(avanodeid, resp, updates)); + BOOST_CHECK_EQUAL(updates.size(), 0); + BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), -1); + + // 5. Making requests for invalid nodes does not work. Node is not returned to + // the pool.
+ resp = {queryRound, 0, {AvalancheVote(0, blockHash)}}; BOOST_CHECK(!p.registerVotes(avanodeid + 1234, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); + BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), -1); + + // Proper response gets processed and avanode is available again. + resp = {queryRound, 0, {AvalancheVote(0, blockHash)}}; + BOOST_CHECK(p.registerVotes(avanodeid, resp, updates)); + BOOST_CHECK_EQUAL(updates.size(), 0); + BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid); // Out of order response are rejected. CBlock block2 = CreateAndProcessBlock({}, CScript()); @@ -446,15 +480,26 @@ CBlockIndex *pindex2 = mapBlockIndex[blockHash2]; BOOST_CHECK(p.addBlockToReconcile(pindex2)); - resp = {0, {AvalancheVote(0, blockHash), AvalancheVote(0, blockHash2)}}; + resp = {AvalancheTest::getRound(p), + 0, + {AvalancheVote(0, blockHash), AvalancheVote(0, blockHash2)}}; AvalancheTest::runEventLoop(p); BOOST_CHECK(!p.registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid); + // But they are accepted in order. + resp = {AvalancheTest::getRound(p), + 0, + {AvalancheVote(0, blockHash2), AvalancheVote(0, blockHash)}}; + AvalancheTest::runEventLoop(p); + BOOST_CHECK(p.registerVotes(avanode->GetId(), resp, updates)); + BOOST_CHECK_EQUAL(updates.size(), 0); + BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid); + // When a block is marked invalid, stop polling. pindex2->nStatus = pindex2->nStatus.withFailed(); - resp = {0, {AvalancheVote(0, blockHash)}}; + resp = {AvalancheTest::getRound(p), 0, {AvalancheVote(0, blockHash)}}; AvalancheTest::runEventLoop(p); BOOST_CHECK(p.registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); @@ -495,39 +540,40 @@ BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), nodeid); // Add a new block. Check it is added to the polls. 
+ uint64_t queryRound = AvalancheTest::getRound(p); BOOST_CHECK(p.addBlockToReconcile(pindex)); - uint32_t round = AvalancheTest::getRound(p); for (int i = 0; i < 1000; i++) { // Technically, this is a race condition, but this should do just fine // as we wait up to 1s for an event that should take 10ms. boost::this_thread::sleep_for(boost::chrono::milliseconds(1)); - if (AvalancheTest::getRound(p) != round) { + if (AvalancheTest::getRound(p) != queryRound) { break; } } // Check that we effectively got a request and not timed out. - BOOST_CHECK(AvalancheTest::getRound(p) > round); + BOOST_CHECK(AvalancheTest::getRound(p) > queryRound); // Respond and check the cooldown time is respected. - round = AvalancheTest::getRound(p); + uint64_t responseRound = AvalancheTest::getRound(p); auto queryTime = std::chrono::steady_clock::now() + std::chrono::milliseconds(100); std::vector updates; - p.registerVotes(nodeid, {100, {AvalancheVote(0, blockHash)}}, updates); + p.registerVotes(nodeid, {queryRound, 100, {AvalancheVote(0, blockHash)}}, + updates); for (int i = 0; i < 1000; i++) { // We make sure that we do not get a request before queryTime. boost::this_thread::sleep_for(boost::chrono::milliseconds(1)); - if (AvalancheTest::getRound(p) != round) { + if (AvalancheTest::getRound(p) != responseRound) { BOOST_CHECK(std::chrono::steady_clock::now() > queryTime); break; } } // But we eventually get one. - BOOST_CHECK(AvalancheTest::getRound(p) > round); + BOOST_CHECK(AvalancheTest::getRound(p) > responseRound); // Stop event loop. BOOST_CHECK(p.stopEventLoop());