diff --git a/src/avalanche.h b/src/avalanche.h
--- a/src/avalanche.h
+++ b/src/avalanche.h
@@ -33,10 +33,17 @@
  * Finalization score.
  */
 static const int AVALANCHE_FINALIZATION_SCORE = 128;
+
 /**
  * How long before we consider that a query timed out.
  */
 static const int AVALANCHE_DEFAULT_QUERY_TIMEOUT_DURATION_MILLISECONDS = 10000;
+
+/**
+ * How many inflight requests can exist for one item.
+ */
+static const int AVALANCHE_MAX_INFLIGHT_POLL = 10;
+
 /**
  * Special NodeId that represent no node.
  */
@@ -52,6 +59,8 @@
     uint8_t votes = 0;
     // Each bit indicate if the vote is to be considered.
     uint8_t consider = 0;
+    // How many in flight requests exists for this element.
+    mutable std::atomic<uint8_t> inflight{0};
 
     // confidence's LSB bit is the result. Higher bits are actual confidence
     // score.
@@ -60,6 +69,16 @@
 public:
     VoteRecord(bool accepted) : confidence(accepted) {}
 
+    /**
+     * Copy semantic
+     */
+    VoteRecord(const VoteRecord &other)
+        : votes(other.votes), consider(other.consider),
+          inflight(other.inflight.load()), confidence(other.confidence) {}
+
+    /**
+     * Vote accounting facilities.
+     */
     bool isAccepted() const { return confidence & 0x01; }
 
     uint16_t getConfidence() const { return confidence >> 1; }
@@ -72,6 +91,23 @@
      * Returns true if the acceptance or finalization state changed.
      */
     bool registerVote(uint32_t error);
+
+    /**
+     * Register that a request is being made regarding that item.
+     * The method is made const so that it can be accessed via a read only view
+     * of vote_records. It's not a problem as it is made thread safe.
+     */
+    bool registerPoll() const;
+
+    /**
+     * Return if this item is in condition to be polled at the moment.
+     */
+    bool shouldPoll() const { return inflight < AVALANCHE_MAX_INFLIGHT_POLL; }
+
+    /**
+     * Clear `count` inflight requests.
+     */
+    void clearInflightRequest(uint8_t count = 1) { inflight -= count; }
 };
 
 class AvalancheVote {
@@ -290,7 +326,7 @@
 private:
     void runEventLoop();
     void clearTimedoutRequests();
-    std::vector<CInv> getInvsForNextPoll() const;
+    std::vector<CInv> getInvsForNextPoll(bool forPoll = true) const;
     NodeId getSuitableNodeToQuery();
 
     friend struct AvalancheTest;
diff --git a/src/avalanche.cpp b/src/avalanche.cpp
--- a/src/avalanche.cpp
+++ b/src/avalanche.cpp
@@ -41,6 +41,9 @@
 }
 
 bool VoteRecord::registerVote(uint32_t error) {
+    // We just got a new vote, so there is one less inflight request.
+    clearInflightRequest();
+
     /**
      * The result of the vote is determined from the error code. If the error
      * code is 0, there is no error and therefore the vote is yes. If there is
@@ -80,6 +83,17 @@
     return true;
 }
 
+bool VoteRecord::registerPoll() const {
+    uint8_t count = inflight.load();
+    while (count < AVALANCHE_MAX_INFLIGHT_POLL) {
+        if (inflight.compare_exchange_weak(count, count + 1)) {
+            return true;
+        }
+    }
+
+    return false;
+}
+
 static bool IsWorthPolling(const CBlockIndex *pindex) {
     AssertLockHeld(cs_main);
 
@@ -302,7 +316,7 @@
     return true;
 }
 
-std::vector<CInv> AvalancheProcessor::getInvsForNextPoll() const {
+std::vector<CInv> AvalancheProcessor::getInvsForNextPoll(bool forPoll) const {
     std::vector<CInv> invs;
 
     auto r = vote_records.getReadView();
@@ -318,6 +332,13 @@
             }
         }
 
+        // Check if we can run poll.
+        const bool shouldPoll =
+            forPoll ? p.second.registerPoll() : p.second.shouldPoll();
+        if (!shouldPoll) {
+            continue;
+        }
+
         // We don't have a decision, we need more votes.
         invs.emplace_back(MSG_BLOCK, pindex->GetBlockHash());
         if (invs.size() >= AVALANCHE_MAX_ELEMENT_POLL) {
@@ -346,12 +367,49 @@
 
 void AvalancheProcessor::clearTimedoutRequests() {
     auto now = std::chrono::steady_clock::now();
+    std::map<CInv, uint8_t> timedout_items{};
+
+    {
+        // Clear expired requests.
+        auto w = queries.getWriteView();
+        auto it = w->get().begin();
+        while (it != w->get().end() && it->timeout < now) {
+            for (auto &i : it->invs) {
+                timedout_items[i]++;
+            }
+
+            w->get().erase(it++);
+        }
+    }
+
+    if (timedout_items.empty()) {
+        return;
+    }
 
-    // Clear expired requests.
-    auto w = queries.getWriteView();
-    auto it = w->get().begin();
-    while (it != w->get().end() && it->timeout < now) {
-        w->get().erase(it++);
+    // In flight request accounting.
+    for (const auto &p : timedout_items) {
+        const CInv &inv = p.first;
+        assert(inv.type == MSG_BLOCK);
+
+        CBlockIndex *pindex;
+
+        {
+            LOCK(cs_main);
+            BlockMap::iterator mi = mapBlockIndex.find(inv.hash);
+            if (mi == mapBlockIndex.end()) {
+                continue;
+            }
+
+            pindex = mi->second;
+        }
+
+        auto w = vote_records.getWriteView();
+        auto it = w->find(pindex);
+        if (it == w.end()) {
+            continue;
+        }
+
+        it->second.clearInflightRequest(p.second);
     }
 }
 
@@ -360,12 +418,6 @@
     // them.
     clearTimedoutRequests();
 
-    std::vector<CInv> invs = getInvsForNextPoll();
-    if (invs.empty()) {
-        // If there are no invs to poll, we are done.
-        return;
-    }
-
     while (true) {
         NodeId nodeid = getSuitableNodeToQuery();
         if (nodeid == NO_NODE) {
@@ -377,7 +429,19 @@
          * never add the request to queries, which ensures bad nodes get cleaned
          * up over time.
          */
-        bool hasSent = connman->ForNode(nodeid, [this, &invs](CNode *pnode) {
+        std::vector<CInv> invs;
+        // Only set if ForNode actually runs the callback. This tells apart
+        // "reached the node but there is nothing to poll" from "could not
+        // reach the node", which must still lead to the node being removed.
+        bool nodeReached = false;
+        bool hasSent = connman->ForNode(nodeid, [this, &invs,
+                                                 &nodeReached](CNode *pnode) {
+            nodeReached = true;
+            invs = getInvsForNextPoll();
+            if (invs.empty()) {
+                return false;
+            }
+
             uint64_t current_round = round++;
 
             {
@@ -407,7 +471,7 @@
         });
 
         // Success!
-        if (hasSent) {
+        if (hasSent || nodeReached) {
             return;
         }
 
diff --git a/src/test/avalanche_tests.cpp b/src/test/avalanche_tests.cpp
--- a/src/test/avalanche_tests.cpp
+++ b/src/test/avalanche_tests.cpp
@@ -14,7 +14,7 @@
     static void runEventLoop(AvalancheProcessor &p) { p.runEventLoop(); }
 
     static std::vector<CInv> getInvsForNextPoll(const AvalancheProcessor &p) {
-        return p.getInvsForNextPoll();
+        return p.getInvsForNextPoll(false);
     }
 
     static NodeId getSuitableNodeToQuery(AvalancheProcessor &p) {
@@ -107,6 +107,29 @@
 
     // The next vote will finalize the decision.
     REGISTER_VOTE_AND_CHECK(vr, 0, false, true, AVALANCHE_FINALIZATION_SCORE);
+
+    // Check that inflight accounting work as expected.
+    VoteRecord vrinflight(false);
+    for (int i = 0; i < 2 * AVALANCHE_MAX_INFLIGHT_POLL; i++) {
+        BOOST_CHECK_EQUAL(vrinflight.shouldPoll(),
+                          i < AVALANCHE_MAX_INFLIGHT_POLL);
+        BOOST_CHECK_EQUAL(vrinflight.registerPoll(), vrinflight.shouldPoll());
+    }
+
+    // Clear various number of inflight requests and check everything behaves as
+    // expected.
+    for (int i = 1; i < AVALANCHE_MAX_INFLIGHT_POLL; i++) {
+        vrinflight.clearInflightRequest(i);
+        BOOST_CHECK(vrinflight.shouldPoll());
+
+        for (int j = 1; j < i; j++) {
+            BOOST_CHECK(vrinflight.registerPoll());
+            BOOST_CHECK(vrinflight.shouldPoll());
+        }
+
+        BOOST_CHECK(vrinflight.registerPoll());
+        BOOST_CHECK(!vrinflight.shouldPoll());
+    }
 }
 
 BOOST_AUTO_TEST_CASE(block_update) {
@@ -612,6 +635,60 @@
     CConnmanTest::ClearNodes();
 }
 
+BOOST_AUTO_TEST_CASE(poll_inflight_count) {
+    AvalancheProcessor p(g_connman.get());
+    const Config &config = GetConfig();
+
+    // Create enough nodes so that we run into the inflight request limit.
+    std::array<CNode *, AVALANCHE_MAX_INFLIGHT_POLL + 1> nodes;
+    for (auto &n : nodes) {
+        n = ConnectNode(config, NODE_AVALANCHE, *peerLogic);
+        BOOST_CHECK(p.addPeer(n->GetId(), 0));
+    }
+
+    // Add a block to poll
+    CBlock block = CreateAndProcessBlock({}, CScript());
+    const uint256 blockHash = block.GetHash();
+    const CBlockIndex *pindex = mapBlockIndex[blockHash];
+    BOOST_CHECK(p.addBlockToReconcile(pindex));
+
+    // Ensure there are enough requests in flight.
+    std::map<NodeId, uint64_t> node_round_map;
+    for (int i = 0; i < AVALANCHE_MAX_INFLIGHT_POLL; i++) {
+        NodeId nodeid = AvalancheTest::getSuitableNodeToQuery(p);
+        BOOST_CHECK(node_round_map.find(nodeid) == node_round_map.end());
+        node_round_map[nodeid] = AvalancheTest::getRound(p);
+        auto invs = AvalancheTest::getInvsForNextPoll(p);
+        BOOST_CHECK_EQUAL(invs.size(), 1);
+        BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK);
+        BOOST_CHECK(invs[0].hash == blockHash);
+        AvalancheTest::runEventLoop(p);
+    }
+
+    // Now that we have enough in flight requests, we shouldn't poll.
+    auto suitablenodeid = AvalancheTest::getSuitableNodeToQuery(p);
+    BOOST_CHECK(suitablenodeid != NO_NODE);
+    auto invs = AvalancheTest::getInvsForNextPoll(p);
+    BOOST_CHECK_EQUAL(invs.size(), 0);
+    AvalancheTest::runEventLoop(p);
+    BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), suitablenodeid);
+
+    std::vector<AvalancheBlockUpdate> updates;
+
+    // Send one response, now we can poll again.
+    auto it = node_round_map.begin();
+    AvalancheResponse resp = {it->second, 0, {AvalancheVote(0, blockHash)}};
+    BOOST_CHECK(p.registerVotes(it->first, resp, updates));
+    node_round_map.erase(it);
+
+    invs = AvalancheTest::getInvsForNextPoll(p);
+    BOOST_CHECK_EQUAL(invs.size(), 1);
+    BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK);
+    BOOST_CHECK(invs[0].hash == blockHash);
+
+    CConnmanTest::ClearNodes();
+}
+
 BOOST_AUTO_TEST_CASE(event_loop) {
     AvalancheProcessor p(g_connman.get());
     CScheduler s;