diff --git a/src/avalanche.cpp b/src/avalanche.cpp index 2f3e0eb65..dda0580c9 100644 --- a/src/avalanche.cpp +++ b/src/avalanche.cpp @@ -1,417 +1,475 @@ // Copyright (c) 2018 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include "avalanche.h" #include "chain.h" #include "config/bitcoin-config.h" #include "netmessagemaker.h" #include "reverse_iterator.h" #include "scheduler.h" #include "validation.h" #include /** * Run the avalanche event loop every 10ms. */ static const int64_t AVALANCHE_TIME_STEP_MILLISECONDS = 10; /** * Maximum item count that can be polled at once. */ static const size_t AVALANCHE_MAX_ELEMENT_POLL = 4096; static uint32_t countBits(uint32_t v) { #if HAVE_DECL___BUILTIN_POPCOUNT return __builtin_popcount(v); #else /** * Computes the number of bits set in each group of 8bits then uses a * multiplication to sum all of them in the 8 most significant bits and * return these. * More detailed explanation can be found at * https://www.playingwithpointers.com/blog/swar.html */ v = v - ((v >> 1) & 0x55555555); v = (v & 0x33333333) + ((v >> 2) & 0x33333333); return (((v + (v >> 4)) & 0xF0F0F0F) * 0x1010101) >> 24; #endif } bool VoteRecord::registerVote(uint32_t error) { + // We just got a new vote, so there is one less inflight request. + clearInflightRequest(); + /** * The result of the vote is determined from the error code. If the error * code is 0, there is no error and therefore the vote is yes. If there is * an error, we check the most significant bit to decide if the vote is a no * (for instance, the block is invalid) or is the vote inconclusive (for * instance, the queried node does not have the block yet). */ votes = (votes << 1) | (error == 0); consider = (consider << 1) | (int32_t(error) >= 0); /** * We compute the number of yes and/or no votes as follow: * * votes: 1010 * consider: 1100 * * yes votes: 1000 using votes & consider * no votes: 0100 using ~votes & consider */ bool yes = countBits(votes & consider & 0xff) > 6; if (!yes) { bool no = countBits(~votes & consider & 0xff) > 6; if (!no) { // The round is inconclusive. return false; } } // If the round is in agreement with previous rounds, increase confidence. if (isAccepted() == yes) { confidence += 2; return getConfidence() == AVALANCHE_FINALIZATION_SCORE; } // The round changed our state. We reset the confidence. confidence = yes; return true; } +bool VoteRecord::registerPoll() const { + uint8_t count = inflight.load(); + while (count < AVALANCHE_MAX_INFLIGHT_POLL) { + if (inflight.compare_exchange_weak(count, count + 1)) { + return true; + } + } + + return false; +} + static bool IsWorthPolling(const CBlockIndex *pindex) { AssertLockHeld(cs_main); if (pindex->nStatus.isInvalid()) { // No point polling invalid blocks. return false; } if (IsBlockFinalized(pindex)) { // There is no point polling finalized block. return false; } return true; } bool AvalancheProcessor::addBlockToReconcile(const CBlockIndex *pindex) { bool isAccepted; { LOCK(cs_main); if (!IsWorthPolling(pindex)) { // There is no point polling this block. return false; } isAccepted = chainActive.Contains(pindex); } return vote_records.getWriteView() ->insert(std::make_pair(pindex, VoteRecord(isAccepted))) .second; } bool AvalancheProcessor::isAccepted(const CBlockIndex *pindex) const { auto r = vote_records.getReadView(); auto it = r->find(pindex); if (it == r.end()) { return false; } return it->second.isAccepted(); } int AvalancheProcessor::getConfidence(const CBlockIndex *pindex) const { auto r = vote_records.getReadView(); auto it = r->find(pindex); if (it == r.end()) { return -1; } return it->second.getConfidence(); } bool AvalancheProcessor::registerVotes( NodeId nodeid, const AvalancheResponse &response, std::vector &updates) { { // Save the time at which we can query again. auto w = peerSet.getWriteView(); auto it = w->find(nodeid); if (it != w->end()) { w->modify(it, [&response](Peer &p) { // FIXME: This will override the time even when we received an // old stale message. This should check that the message is // indeed the most up to date one before updating the time. p.nextRequestTime = std::chrono::steady_clock::now() + std::chrono::milliseconds(response.getCooldown()); }); } } std::vector invs; { // Check that the query exists. auto w = queries.getWriteView(); auto it = w->find(std::make_tuple(nodeid, response.getRound())); if (it == w.end()) { // NB: The request may be old, so we don't increase banscore. return false; } invs = std::move(it->invs); w->erase(it); } // Verify that the request and the vote are consistent. const std::vector &votes = response.GetVotes(); size_t size = invs.size(); if (votes.size() != size) { // TODO: increase banscore for inconsistent response. // NB: This isn't timeout but actually node misbehaving. return false; } for (size_t i = 0; i < size; i++) { if (invs[i].hash != votes[i].GetHash()) { // TODO: increase banscore for inconsistent response. // NB: This isn't timeout but actually node misbehaving. return false; } } std::map responseIndex; { LOCK(cs_main); for (auto &v : votes) { BlockMap::iterator mi = mapBlockIndex.find(v.GetHash()); if (mi == mapBlockIndex.end()) { // This should not happen, but just in case... continue; } CBlockIndex *pindex = mi->second; if (!IsWorthPolling(pindex)) { // There is no point polling this block. continue; } responseIndex.insert(std::make_pair(pindex, v)); } } { // Register votes. auto w = vote_records.getWriteView(); for (auto &p : responseIndex) { CBlockIndex *pindex = p.first; const AvalancheVote &v = p.second; auto it = w->find(pindex); if (it == w.end()) { // We are not voting on that item anymore. continue; } auto &vr = it->second; if (!vr.registerVote(v.GetError())) { // This vote did not provide any extra information, move on. continue; } if (!vr.hasFinalized()) { // This item has note been finalized, so we have nothing more to // do. updates.emplace_back( pindex, vr.isAccepted() ? AvalancheBlockUpdate::Status::Accepted : AvalancheBlockUpdate::Status::Rejected); continue; } // We just finalized a vote. If it is valid, then let the caller // know. Either way, remove the item from the map. updates.emplace_back(pindex, vr.isAccepted() ? AvalancheBlockUpdate::Status::Finalized : AvalancheBlockUpdate::Status::Invalid); w->erase(it); } } return true; } bool AvalancheProcessor::addPeer(NodeId nodeid, int64_t score) { return peerSet.getWriteView() ->insert({nodeid, score, std::chrono::steady_clock::now()}) .second; } bool AvalancheProcessor::startEventLoop(CScheduler &scheduler) { LOCK(cs_running); if (running) { // Do not start the event loop twice. return false; } running = true; // Start the event loop. scheduler.scheduleEvery( [this]() -> bool { runEventLoop(); if (!stopRequest) { return true; } LOCK(cs_running); running = false; cond_running.notify_all(); // A stop request was made. return false; }, AVALANCHE_TIME_STEP_MILLISECONDS); return true; } bool AvalancheProcessor::stopEventLoop() { WAIT_LOCK(cs_running, lock); if (!running) { return false; } // Request avalanche to stop. stopRequest = true; // Wait for avalanche to stop. cond_running.wait(lock, [this]() EXCLUSIVE_LOCKS_REQUIRED(cs_running) { return !running; }); stopRequest = false; return true; } -std::vector AvalancheProcessor::getInvsForNextPoll() const { +std::vector AvalancheProcessor::getInvsForNextPoll(bool forPoll) const { std::vector invs; auto r = vote_records.getReadView(); for (const std::pair &p : reverse_iterate(r)) { const CBlockIndex *pindex = p.first; { LOCK(cs_main); if (!IsWorthPolling(pindex)) { // Obviously do not poll if the block is not worth polling. continue; } } + // Check if we can run poll. + const bool shouldPoll = + forPoll ? p.second.registerPoll() : p.second.shouldPoll(); + if (!shouldPoll) { + continue; + } + // We don't have a decision, we need more votes. invs.emplace_back(MSG_BLOCK, pindex->GetBlockHash()); if (invs.size() >= AVALANCHE_MAX_ELEMENT_POLL) { // Make sure we do not produce more invs than specified by the // protocol. return invs; } } return invs; } NodeId AvalancheProcessor::getSuitableNodeToQuery() { auto r = peerSet.getReadView(); auto it = r->get().begin(); if (it == r->get().end()) { return NO_NODE; } if (it->nextRequestTime <= std::chrono::steady_clock::now()) { return it->nodeid; } return NO_NODE; } void AvalancheProcessor::clearTimedoutRequests() { auto now = std::chrono::steady_clock::now(); + std::map timedout_items{}; + + { + // Clear expired requests. + auto w = queries.getWriteView(); + auto it = w->get().begin(); + while (it != w->get().end() && it->timeout < now) { + for (auto &i : it->invs) { + timedout_items[i]++; + } + + w->get().erase(it++); + } + } + + if (timedout_items.empty()) { + return; + } - // Clear expired requests. - auto w = queries.getWriteView(); - auto it = w->get().begin(); - while (it != w->get().end() && it->timeout < now) { - w->get().erase(it++); + // In flight request accounting. + for (const auto &p : timedout_items) { + const CInv &inv = p.first; + assert(inv.type == MSG_BLOCK); + + CBlockIndex *pindex; + + { + LOCK(cs_main); + BlockMap::iterator mi = mapBlockIndex.find(inv.hash); + if (mi == mapBlockIndex.end()) { + continue; + } + + pindex = mi->second; + } + + auto w = vote_records.getWriteView(); + auto it = w->find(pindex); + if (it == w.end()) { + continue; + } + + it->second.clearInflightRequest(p.second); } } void AvalancheProcessor::runEventLoop() { // First things first, check if we have requests that timed out and clear // them. clearTimedoutRequests(); - std::vector invs = getInvsForNextPoll(); - if (invs.empty()) { - // If there are no invs to poll, we are done. - return; - } - while (true) { NodeId nodeid = getSuitableNodeToQuery(); if (nodeid == NO_NODE) { return; } /** * If we lost contact to that node, then we remove it from nodeids, but * never add the request to queries, which ensures bad nodes get cleaned * up over time. */ + std::vector invs; bool hasSent = connman->ForNode(nodeid, [this, &invs](CNode *pnode) { + invs = getInvsForNextPoll(); + if (invs.empty()) { + return false; + } + uint64_t current_round = round++; { // Compute the time at which this requests times out. auto timeout = std::chrono::steady_clock::now() + queryTimeoutDuration; // Register the query. queries.getWriteView()->insert( {pnode->GetId(), current_round, timeout, invs}); // Set the timeout. auto w = peerSet.getWriteView(); auto it = w->find(pnode->GetId()); if (it != w->end()) { w->modify(it, [&timeout](Peer &p) { p.nextRequestTime = timeout; }); } } // Send the query to the node. connman->PushMessage( pnode, CNetMsgMaker(pnode->GetSendVersion()) .Make(NetMsgType::AVAPOLL, AvalanchePoll(current_round, std::move(invs)))); return true; }); // Success! - if (hasSent) { + if (hasSent || invs.empty()) { return; } // This node is obsolete, delete it. peerSet.getWriteView()->erase(nodeid); } } diff --git a/src/avalanche.h b/src/avalanche.h index db7ecdef4..517551c79 100644 --- a/src/avalanche.h +++ b/src/avalanche.h @@ -1,299 +1,335 @@ // Copyright (c) 2018 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #ifndef BITCOIN_AVALANCHE_H #define BITCOIN_AVALANCHE_H #include "blockindexworkcomparator.h" #include "net.h" #include "protocol.h" // for CInv #include "rwcollection.h" #include "serialize.h" #include "uint256.h" #include #include #include #include #include #include #include #include #include #include class Config; class CBlockIndex; class CScheduler; namespace { /** * Finalization score. */ static const int AVALANCHE_FINALIZATION_SCORE = 128; + /** * How long before we consider that a query timed out. */ static const int AVALANCHE_DEFAULT_QUERY_TIMEOUT_DURATION_MILLISECONDS = 10000; + +/** + * How many inflight requests can exist for one item. + */ +static const int AVALANCHE_MAX_INFLIGHT_POLL = 10; + /** * Special NodeId that represent no node. */ static const NodeId NO_NODE = -1; } /** * Vote history. */ struct VoteRecord { private: // Historical record of votes. uint8_t votes = 0; // Each bit indicate if the vote is to be considered. uint8_t consider = 0; + // How many in flight requests exists for this element. + mutable std::atomic inflight{0}; // confidence's LSB bit is the result. Higher bits are actual confidence // score. uint16_t confidence = 0; public: VoteRecord(bool accepted) : confidence(accepted) {} + /** + * Copy semantic + */ + VoteRecord(const VoteRecord &other) + : votes(other.votes), consider(other.consider), + inflight(other.inflight.load()), confidence(other.confidence) {} + + /** + * Vote accounting facilities. + */ bool isAccepted() const { return confidence & 0x01; } uint16_t getConfidence() const { return confidence >> 1; } bool hasFinalized() const { return getConfidence() >= AVALANCHE_FINALIZATION_SCORE; } /** * Register a new vote for an item and update confidence accordingly. * Returns true if the acceptance or finalization state changed. */ bool registerVote(uint32_t error); + + /** + * Register that a request is being made regarding that item. + * The method is made const so that it can be accessed via a read only view + * of vote_records. It's not a problem as it is made thread safe. + */ + bool registerPoll() const; + + /** + * Return if this item is in condition to be polled at the moment. + */ + bool shouldPoll() const { return inflight < AVALANCHE_MAX_INFLIGHT_POLL; } + + /** + * Clear `count` inflight requests. + */ + void clearInflightRequest(uint8_t count = 1) { inflight -= count; } }; class AvalancheVote { uint32_t error; uint256 hash; public: AvalancheVote() : error(-1), hash() {} AvalancheVote(uint32_t errorIn, uint256 hashIn) : error(errorIn), hash(hashIn) {} const uint256 &GetHash() const { return hash; } uint32_t GetError() const { return error; } // serialization support ADD_SERIALIZE_METHODS; template inline void SerializationOp(Stream &s, Operation ser_action) { READWRITE(error); READWRITE(hash); } }; class AvalancheResponse { uint64_t round; uint32_t cooldown; std::vector votes; public: AvalancheResponse(uint64_t roundIn, uint32_t cooldownIn, std::vector votesIn) : round(roundIn), cooldown(cooldownIn), votes(votesIn) {} uint64_t getRound() const { return round; } uint32_t getCooldown() const { return cooldown; } const std::vector &GetVotes() const { return votes; } // serialization support ADD_SERIALIZE_METHODS; template inline void SerializationOp(Stream &s, Operation ser_action) { READWRITE(round); READWRITE(cooldown); READWRITE(votes); } }; class AvalanchePoll { uint64_t round; std::vector invs; public: AvalanchePoll(uint64_t roundIn, std::vector invsIn) : round(roundIn), invs(invsIn) {} const std::vector &GetInvs() const { return invs; } // serialization support ADD_SERIALIZE_METHODS; template inline void SerializationOp(Stream &s, Operation ser_action) { READWRITE(round); READWRITE(invs); } }; class AvalancheBlockUpdate { union { CBlockIndex *pindex; uintptr_t raw; }; static const size_t STATUS_BITS = 2; static const uintptr_t MASK = (1 << STATUS_BITS) - 1; static_assert( alignof(CBlockIndex) >= (1 << STATUS_BITS), "CBlockIndex alignement doesn't allow for Status to be stored."); public: enum Status : uint8_t { Invalid, Rejected, Accepted, Finalized, }; AvalancheBlockUpdate(CBlockIndex *pindexIn, Status statusIn) : pindex(pindexIn) { raw |= statusIn; } Status getStatus() const { return Status(raw & MASK); } CBlockIndex *getBlockIndex() { return reinterpret_cast(raw & ~MASK); } const CBlockIndex *getBlockIndex() const { return const_cast(this)->getBlockIndex(); } }; typedef std::map BlockVoteMap; struct next_request_time {}; struct query_timeout {}; class AvalancheProcessor { private: CConnman *connman; std::chrono::milliseconds queryTimeoutDuration; /** * Blocks to run avalanche on. */ RWCollection vote_records; /** * Keep track of peers and queries sent. */ std::atomic round; typedef std::chrono::time_point TimePoint; struct Peer { NodeId nodeid; int64_t score; TimePoint nextRequestTime; }; typedef boost::multi_index_container< Peer, boost::multi_index::indexed_by< // index by nodeid boost::multi_index::hashed_unique< boost::multi_index::member>, // sorted by nextRequestTime boost::multi_index::ordered_non_unique< boost::multi_index::tag, boost::multi_index::member>>> PeerSet; RWCollection peerSet; struct Query { NodeId nodeid; uint64_t round; TimePoint timeout; /** * We declare this as mutable so it can be modified in the multi_index. * This is ok because we do not use this field to index in anyway. * * /!\ Do not use any mutable field as index. */ mutable std::vector invs; }; typedef boost::multi_index_container< Query, boost::multi_index::indexed_by< // index by nodeid/round boost::multi_index::ordered_unique< boost::multi_index::composite_key< Query, boost::multi_index::member, boost::multi_index::member>>, // sorted by timeout boost::multi_index::ordered_non_unique< boost::multi_index::tag, boost::multi_index::member>>> QuerySet; RWCollection queries; /** * Start stop machinery. */ std::atomic stopRequest; bool running GUARDED_BY(cs_running); CWaitableCriticalSection cs_running; std::condition_variable cond_running; public: AvalancheProcessor(CConnman *connmanIn) : connman(connmanIn), queryTimeoutDuration( AVALANCHE_DEFAULT_QUERY_TIMEOUT_DURATION_MILLISECONDS), round(0), stopRequest(false), running(false) {} ~AvalancheProcessor() { stopEventLoop(); } void setQueryTimeoutDuration(std::chrono::milliseconds d) { queryTimeoutDuration = d; } bool addBlockToReconcile(const CBlockIndex *pindex); bool isAccepted(const CBlockIndex *pindex) const; int getConfidence(const CBlockIndex *pindex) const; bool registerVotes(NodeId nodeid, const AvalancheResponse &response, std::vector &updates); bool addPeer(NodeId nodeid, int64_t score); bool startEventLoop(CScheduler &scheduler); bool stopEventLoop(); private: void runEventLoop(); void clearTimedoutRequests(); - std::vector getInvsForNextPoll() const; + std::vector getInvsForNextPoll(bool forPoll = true) const; NodeId getSuitableNodeToQuery(); friend struct AvalancheTest; }; #endif // BITCOIN_AVALANCHE_H diff --git a/src/test/avalanche_tests.cpp b/src/test/avalanche_tests.cpp index 4b657a4ef..5e846c90c 100644 --- a/src/test/avalanche_tests.cpp +++ b/src/test/avalanche_tests.cpp @@ -1,720 +1,797 @@ // Copyright (c) 2010 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include "avalanche.h" #include "config.h" #include "net_processing.h" // For PeerLogicValidation #include "test/test_bitcoin.h" #include struct AvalancheTest { static void runEventLoop(AvalancheProcessor &p) { p.runEventLoop(); } static std::vector getInvsForNextPoll(const AvalancheProcessor &p) { - return p.getInvsForNextPoll(); + return p.getInvsForNextPoll(false); } static NodeId getSuitableNodeToQuery(AvalancheProcessor &p) { return p.getSuitableNodeToQuery(); } static uint64_t getRound(const AvalancheProcessor &p) { return p.round; } }; BOOST_FIXTURE_TEST_SUITE(avalanche_tests, TestChain100Setup) #define REGISTER_VOTE_AND_CHECK(vr, vote, state, finalized, confidence) \ vr.registerVote(vote); \ BOOST_CHECK_EQUAL(vr.isAccepted(), state); \ BOOST_CHECK_EQUAL(vr.hasFinalized(), finalized); \ BOOST_CHECK_EQUAL(vr.getConfidence(), confidence); BOOST_AUTO_TEST_CASE(vote_record) { VoteRecord vraccepted(true); // Check initial state. BOOST_CHECK_EQUAL(vraccepted.isAccepted(), true); BOOST_CHECK_EQUAL(vraccepted.hasFinalized(), false); BOOST_CHECK_EQUAL(vraccepted.getConfidence(), 0); VoteRecord vr(false); // Check initial state. BOOST_CHECK_EQUAL(vr.isAccepted(), false); BOOST_CHECK_EQUAL(vr.hasFinalized(), false); BOOST_CHECK_EQUAL(vr.getConfidence(), 0); // We need to register 6 positive votes before we start counting. for (int i = 0; i < 6; i++) { REGISTER_VOTE_AND_CHECK(vr, 0, false, false, 0); } // Next vote will flip state, and confidence will increase as long as we // vote yes. REGISTER_VOTE_AND_CHECK(vr, 0, true, false, 0); // A single neutral vote do not change anything. REGISTER_VOTE_AND_CHECK(vr, -1, true, false, 1); for (int i = 2; i < 8; i++) { REGISTER_VOTE_AND_CHECK(vr, 0, true, false, i); } // Two neutral votes will stall progress. REGISTER_VOTE_AND_CHECK(vr, -1, true, false, 7); REGISTER_VOTE_AND_CHECK(vr, -1, true, false, 7); for (int i = 2; i < 8; i++) { REGISTER_VOTE_AND_CHECK(vr, 0, true, false, 7); } // Now confidence will increase as long as we vote yes. for (int i = 8; i < AVALANCHE_FINALIZATION_SCORE; i++) { REGISTER_VOTE_AND_CHECK(vr, 0, true, false, i); } // The next vote will finalize the decision. REGISTER_VOTE_AND_CHECK(vr, 1, true, true, AVALANCHE_FINALIZATION_SCORE); // Now that we have two no votes, confidence stop increasing. for (int i = 0; i < 5; i++) { REGISTER_VOTE_AND_CHECK(vr, 1, true, true, AVALANCHE_FINALIZATION_SCORE); } // Next vote will flip state, and confidence will increase as long as we // vote no. REGISTER_VOTE_AND_CHECK(vr, 1, false, false, 0); // A single neutral vote do not change anything. REGISTER_VOTE_AND_CHECK(vr, -1, false, false, 1); for (int i = 2; i < 8; i++) { REGISTER_VOTE_AND_CHECK(vr, 1, false, false, i); } // Two neutral votes will stall progress. REGISTER_VOTE_AND_CHECK(vr, -1, false, false, 7); REGISTER_VOTE_AND_CHECK(vr, -1, false, false, 7); for (int i = 2; i < 8; i++) { REGISTER_VOTE_AND_CHECK(vr, 1, false, false, 7); } // Now confidence will increase as long as we vote no. for (int i = 8; i < AVALANCHE_FINALIZATION_SCORE; i++) { REGISTER_VOTE_AND_CHECK(vr, 1, false, false, i); } // The next vote will finalize the decision. REGISTER_VOTE_AND_CHECK(vr, 0, false, true, AVALANCHE_FINALIZATION_SCORE); + + // Check that inflight accounting work as expected. + VoteRecord vrinflight(false); + for (int i = 0; i < 2 * AVALANCHE_MAX_INFLIGHT_POLL; i++) { + BOOST_CHECK_EQUAL(vrinflight.shouldPoll(), + i < AVALANCHE_MAX_INFLIGHT_POLL); + BOOST_CHECK_EQUAL(vrinflight.registerPoll(), vrinflight.shouldPoll()); + } + + // Clear various number of inflight requests and check everything behaves as + // expected. + for (int i = 1; i < AVALANCHE_MAX_INFLIGHT_POLL; i++) { + vrinflight.clearInflightRequest(i); + BOOST_CHECK(vrinflight.shouldPoll()); + + for (int j = 1; j < i; j++) { + BOOST_CHECK(vrinflight.registerPoll()); + BOOST_CHECK(vrinflight.shouldPoll()); + } + + BOOST_CHECK(vrinflight.registerPoll()); + BOOST_CHECK(!vrinflight.shouldPoll()); + } } BOOST_AUTO_TEST_CASE(block_update) { CBlockIndex index; CBlockIndex *pindex = &index; std::set status{ AvalancheBlockUpdate::Status::Invalid, AvalancheBlockUpdate::Status::Rejected, AvalancheBlockUpdate::Status::Accepted, AvalancheBlockUpdate::Status::Finalized, }; for (auto s : status) { AvalancheBlockUpdate abu(pindex, s); BOOST_CHECK(abu.getBlockIndex() == pindex); BOOST_CHECK_EQUAL(abu.getStatus(), s); } } CService ip(uint32_t i) { struct in_addr s; s.s_addr = i; return CService(CNetAddr(s), Params().GetDefaultPort()); } std::unique_ptr ConnectNode(const Config &config, ServiceFlags nServices, PeerLogicValidation &peerLogic) { static NodeId id = 0; CAddress addr(ip(GetRandInt(0xffffffff)), NODE_NONE); std::unique_ptr nodeptr(new CNode(id++, ServiceFlags(NODE_NETWORK), 0, INVALID_SOCKET, addr, 0, 0, CAddress(), "", /*fInboundIn=*/false)); CNode &node = *nodeptr; node.SetSendVersion(PROTOCOL_VERSION); node.nServices = nServices; peerLogic.InitializeNode(config, &node); node.nVersion = 1; node.fSuccessfullyConnected = true; CConnmanTest::AddNode(node); return nodeptr; } static AvalancheResponse next(AvalancheResponse &r) { auto copy = r; r = {r.getRound() + 1, r.getCooldown(), r.GetVotes()}; return copy; } BOOST_AUTO_TEST_CASE(block_register) { AvalancheProcessor p(g_connman.get()); std::vector updates; CBlock block = CreateAndProcessBlock({}, CScript()); const uint256 blockHash = block.GetHash(); const CBlockIndex *pindex = mapBlockIndex[blockHash]; const Config &config = GetConfig(); // Create a node that supports avalanche. auto avanode = ConnectNode(config, NODE_AVALANCHE, *peerLogic); NodeId nodeid = avanode->GetId(); BOOST_CHECK(p.addPeer(nodeid, 0)); // Querying for random block returns false. BOOST_CHECK(!p.isAccepted(pindex)); // Add a new block. Check it is added to the polls. BOOST_CHECK(p.addBlockToReconcile(pindex)); auto invs = AvalancheTest::getInvsForNextPoll(p); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK); BOOST_CHECK(invs[0].hash == blockHash); // Newly added blocks' state reflect the blockchain. BOOST_CHECK(p.isAccepted(pindex)); // Let's vote for this block a few times. AvalancheResponse resp{0, 0, {AvalancheVote(0, blockHash)}}; for (int i = 0; i < 6; i++) { AvalancheTest::runEventLoop(p); BOOST_CHECK(p.registerVotes(nodeid, next(resp), updates)); BOOST_CHECK(p.isAccepted(pindex)); BOOST_CHECK_EQUAL(p.getConfidence(pindex), 0); BOOST_CHECK_EQUAL(updates.size(), 0); } // A single neutral vote do not change anything. resp = {AvalancheTest::getRound(p), 0, {AvalancheVote(-1, blockHash)}}; AvalancheTest::runEventLoop(p); BOOST_CHECK(p.registerVotes(nodeid, next(resp), updates)); BOOST_CHECK(p.isAccepted(pindex)); BOOST_CHECK_EQUAL(p.getConfidence(pindex), 0); BOOST_CHECK_EQUAL(updates.size(), 0); resp = {AvalancheTest::getRound(p), 0, {AvalancheVote(0, blockHash)}}; for (int i = 1; i < 7; i++) { AvalancheTest::runEventLoop(p); BOOST_CHECK(p.registerVotes(nodeid, next(resp), updates)); BOOST_CHECK(p.isAccepted(pindex)); BOOST_CHECK_EQUAL(p.getConfidence(pindex), i); BOOST_CHECK_EQUAL(updates.size(), 0); } // Two neutral votes will stall progress. resp = {AvalancheTest::getRound(p), 0, {AvalancheVote(-1, blockHash)}}; AvalancheTest::runEventLoop(p); BOOST_CHECK(p.registerVotes(nodeid, next(resp), updates)); BOOST_CHECK(p.isAccepted(pindex)); BOOST_CHECK_EQUAL(p.getConfidence(pindex), 6); BOOST_CHECK_EQUAL(updates.size(), 0); AvalancheTest::runEventLoop(p); BOOST_CHECK(p.registerVotes(nodeid, next(resp), updates)); BOOST_CHECK(p.isAccepted(pindex)); BOOST_CHECK_EQUAL(p.getConfidence(pindex), 6); BOOST_CHECK_EQUAL(updates.size(), 0); resp = {AvalancheTest::getRound(p), 0, {AvalancheVote(0, blockHash)}}; for (int i = 2; i < 8; i++) { AvalancheTest::runEventLoop(p); BOOST_CHECK(p.registerVotes(nodeid, next(resp), updates)); BOOST_CHECK(p.isAccepted(pindex)); BOOST_CHECK_EQUAL(p.getConfidence(pindex), 6); BOOST_CHECK_EQUAL(updates.size(), 0); } // We vote for it numerous times to finalize it. for (int i = 7; i < AVALANCHE_FINALIZATION_SCORE; i++) { AvalancheTest::runEventLoop(p); BOOST_CHECK(p.registerVotes(nodeid, next(resp), updates)); BOOST_CHECK(p.isAccepted(pindex)); BOOST_CHECK_EQUAL(p.getConfidence(pindex), i); BOOST_CHECK_EQUAL(updates.size(), 0); } // As long as it is not finalized, we poll. invs = AvalancheTest::getInvsForNextPoll(p); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK); BOOST_CHECK(invs[0].hash == blockHash); // Now finalize the decision. AvalancheTest::runEventLoop(p); BOOST_CHECK(p.registerVotes(nodeid, next(resp), updates)); BOOST_CHECK_EQUAL(updates.size(), 1); BOOST_CHECK(updates[0].getBlockIndex() == pindex); BOOST_CHECK_EQUAL(updates[0].getStatus(), AvalancheBlockUpdate::Status::Finalized); updates = {}; // Once the decision is finalized, there is no poll for it. invs = AvalancheTest::getInvsForNextPoll(p); BOOST_CHECK_EQUAL(invs.size(), 0); // Now let's undo this and finalize rejection. BOOST_CHECK(p.addBlockToReconcile(pindex)); invs = AvalancheTest::getInvsForNextPoll(p); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK); BOOST_CHECK(invs[0].hash == blockHash); resp = {AvalancheTest::getRound(p), 0, {AvalancheVote(1, blockHash)}}; for (int i = 0; i < 6; i++) { AvalancheTest::runEventLoop(p); BOOST_CHECK(p.registerVotes(nodeid, next(resp), updates)); BOOST_CHECK(p.isAccepted(pindex)); BOOST_CHECK_EQUAL(updates.size(), 0); } // Now the state will flip. AvalancheTest::runEventLoop(p); BOOST_CHECK(p.registerVotes(nodeid, next(resp), updates)); BOOST_CHECK(!p.isAccepted(pindex)); BOOST_CHECK_EQUAL(updates.size(), 1); BOOST_CHECK(updates[0].getBlockIndex() == pindex); BOOST_CHECK_EQUAL(updates[0].getStatus(), AvalancheBlockUpdate::Status::Rejected); updates = {}; // Now it is rejected, but we can vote for it numerous times. for (int i = 1; i < AVALANCHE_FINALIZATION_SCORE; i++) { AvalancheTest::runEventLoop(p); BOOST_CHECK(p.registerVotes(nodeid, next(resp), updates)); BOOST_CHECK(!p.isAccepted(pindex)); BOOST_CHECK_EQUAL(updates.size(), 0); } // As long as it is not finalized, we poll. invs = AvalancheTest::getInvsForNextPoll(p); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK); BOOST_CHECK(invs[0].hash == blockHash); // Now finalize the decision. AvalancheTest::runEventLoop(p); BOOST_CHECK(p.registerVotes(nodeid, next(resp), updates)); BOOST_CHECK(!p.isAccepted(pindex)); BOOST_CHECK_EQUAL(updates.size(), 1); BOOST_CHECK(updates[0].getBlockIndex() == pindex); BOOST_CHECK_EQUAL(updates[0].getStatus(), AvalancheBlockUpdate::Status::Invalid); updates = {}; // Once the decision is finalized, there is no poll for it. invs = AvalancheTest::getInvsForNextPoll(p); BOOST_CHECK_EQUAL(invs.size(), 0); // Adding the block twice does nothing. BOOST_CHECK(p.addBlockToReconcile(pindex)); BOOST_CHECK(!p.addBlockToReconcile(pindex)); BOOST_CHECK(p.isAccepted(pindex)); CConnmanTest::ClearNodes(); } BOOST_AUTO_TEST_CASE(multi_block_register) { AvalancheProcessor p(g_connman.get()); CBlockIndex indexA, indexB; std::vector updates; const Config &config = GetConfig(); // Create a node that supports avalanche. auto node0 = ConnectNode(config, NODE_AVALANCHE, *peerLogic); BOOST_CHECK(p.addPeer(node0->GetId(), 0)); auto node1 = ConnectNode(config, NODE_AVALANCHE, *peerLogic); BOOST_CHECK(p.addPeer(node1->GetId(), 0)); // Make sure the block has a hash. CBlock blockA = CreateAndProcessBlock({}, CScript()); const uint256 blockHashA = blockA.GetHash(); const CBlockIndex *pindexA = mapBlockIndex[blockHashA]; CBlock blockB = CreateAndProcessBlock({}, CScript()); const uint256 blockHashB = blockB.GetHash(); const CBlockIndex *pindexB = mapBlockIndex[blockHashB]; // Querying for random block returns false. BOOST_CHECK(!p.isAccepted(pindexA)); BOOST_CHECK(!p.isAccepted(pindexB)); // Start voting on block A. BOOST_CHECK(p.addBlockToReconcile(pindexA)); auto invs = AvalancheTest::getInvsForNextPoll(p); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK); BOOST_CHECK(invs[0].hash == blockHashA); uint64_t round = AvalancheTest::getRound(p); AvalancheTest::runEventLoop(p); BOOST_CHECK(p.registerVotes( node0->GetId(), {round, 0, {AvalancheVote(0, blockHashA)}}, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); // Start voting on block B after one vote. AvalancheResponse resp{ round + 1, 0, {AvalancheVote(0, blockHashB), AvalancheVote(0, blockHashA)}}; BOOST_CHECK(p.addBlockToReconcile(pindexB)); invs = AvalancheTest::getInvsForNextPoll(p); BOOST_CHECK_EQUAL(invs.size(), 2); // Ensure B comes before A because it has accumulated more PoW. BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK); BOOST_CHECK(invs[0].hash == blockHashB); BOOST_CHECK_EQUAL(invs[1].type, MSG_BLOCK); BOOST_CHECK(invs[1].hash == blockHashA); // Let's vote for these blocks a few times. for (int i = 0; i < 4; i++) { NodeId nodeid = AvalancheTest::getSuitableNodeToQuery(p); AvalancheTest::runEventLoop(p); BOOST_CHECK(p.registerVotes(nodeid, next(resp), updates)); BOOST_CHECK_EQUAL(updates.size(), 0); } // Now it is accepted, but we can vote for it numerous times. for (int i = 0; i < AVALANCHE_FINALIZATION_SCORE; i++) { NodeId nodeid = AvalancheTest::getSuitableNodeToQuery(p); AvalancheTest::runEventLoop(p); BOOST_CHECK(p.registerVotes(nodeid, next(resp), updates)); BOOST_CHECK_EQUAL(updates.size(), 0); } // Running two iterration of the event loop so that vote gets triggerd on A // and B. NodeId firstNodeid = AvalancheTest::getSuitableNodeToQuery(p); AvalancheTest::runEventLoop(p); NodeId secondNodeid = AvalancheTest::getSuitableNodeToQuery(p); AvalancheTest::runEventLoop(p); BOOST_CHECK(firstNodeid != secondNodeid); // Next vote will finalize block A. BOOST_CHECK(p.registerVotes(firstNodeid, next(resp), updates)); BOOST_CHECK_EQUAL(updates.size(), 1); BOOST_CHECK(updates[0].getBlockIndex() == pindexA); BOOST_CHECK_EQUAL(updates[0].getStatus(), AvalancheBlockUpdate::Status::Finalized); updates = {}; // We do not vote on A anymore. invs = AvalancheTest::getInvsForNextPoll(p); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK); BOOST_CHECK(invs[0].hash == blockHashB); // Next vote will finalize block B. BOOST_CHECK(p.registerVotes(secondNodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 1); BOOST_CHECK(updates[0].getBlockIndex() == pindexB); BOOST_CHECK_EQUAL(updates[0].getStatus(), AvalancheBlockUpdate::Status::Finalized); updates = {}; // There is nothing left to vote on. invs = AvalancheTest::getInvsForNextPoll(p); BOOST_CHECK_EQUAL(invs.size(), 0); CConnmanTest::ClearNodes(); } BOOST_AUTO_TEST_CASE(poll_and_response) { AvalancheProcessor p(g_connman.get()); std::vector updates; CBlock block = CreateAndProcessBlock({}, CScript()); const uint256 blockHash = block.GetHash(); const CBlockIndex *pindex = mapBlockIndex[blockHash]; const Config &config = GetConfig(); // There is no node to query. BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), NO_NODE); // Create a node that supports avalanche and one that doesn't. auto oldnode = ConnectNode(config, NODE_NONE, *peerLogic); auto avanode = ConnectNode(config, NODE_AVALANCHE, *peerLogic); NodeId avanodeid = avanode->GetId(); BOOST_CHECK(p.addPeer(avanodeid, 0)); // It returns the avalanche peer. BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid); // Register a block and check it is added to the list of elements to poll. BOOST_CHECK(p.addBlockToReconcile(pindex)); auto invs = AvalancheTest::getInvsForNextPoll(p); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK); BOOST_CHECK(invs[0].hash == blockHash); // Trigger a poll on avanode. uint64_t round = AvalancheTest::getRound(p); AvalancheTest::runEventLoop(p); // There is no more suitable peer available, so return nothing. BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), NO_NODE); // Respond to the request. AvalancheResponse resp = {round, 0, {AvalancheVote(0, blockHash)}}; BOOST_CHECK(p.registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); // Now that avanode fullfilled his request, it is added back to the list of // queriable nodes. BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid); // Sending a response when not polled fails. BOOST_CHECK(!p.registerVotes(avanodeid, next(resp), updates)); BOOST_CHECK_EQUAL(updates.size(), 0); // Trigger a poll on avanode. round = AvalancheTest::getRound(p); AvalancheTest::runEventLoop(p); BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), NO_NODE); // Sending responses that do not match the request also fails. // 1. Too many results. resp = { round, 0, {AvalancheVote(0, blockHash), AvalancheVote(0, blockHash)}}; AvalancheTest::runEventLoop(p); BOOST_CHECK(!p.registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid); // 2. Not enough results. resp = {AvalancheTest::getRound(p), 0, {}}; AvalancheTest::runEventLoop(p); BOOST_CHECK(!p.registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid); // 3. Do not match the poll. resp = {AvalancheTest::getRound(p), 0, {AvalancheVote()}}; AvalancheTest::runEventLoop(p); BOOST_CHECK(!p.registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid); // 4. Invalid round count. Request is not discarded. uint64_t queryRound = AvalancheTest::getRound(p); AvalancheTest::runEventLoop(p); resp = {queryRound + 1, 0, {AvalancheVote()}}; BOOST_CHECK(!p.registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); resp = {queryRound - 1, 0, {AvalancheVote()}}; BOOST_CHECK(!p.registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); // 5. Making request for invalid nodes do not work. Request is not // discarded. resp = {queryRound, 0, {AvalancheVote(0, blockHash)}}; BOOST_CHECK(!p.registerVotes(avanodeid + 1234, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); // Proper response gets processed and avanode is available again. resp = {queryRound, 0, {AvalancheVote(0, blockHash)}}; BOOST_CHECK(p.registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid); // Out of order response are rejected. CBlock block2 = CreateAndProcessBlock({}, CScript()); const uint256 blockHash2 = block2.GetHash(); CBlockIndex *pindex2 = mapBlockIndex[blockHash2]; BOOST_CHECK(p.addBlockToReconcile(pindex2)); resp = {AvalancheTest::getRound(p), 0, {AvalancheVote(0, blockHash), AvalancheVote(0, blockHash2)}}; AvalancheTest::runEventLoop(p); BOOST_CHECK(!p.registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid); // But they are accepted in order. resp = {AvalancheTest::getRound(p), 0, {AvalancheVote(0, blockHash2), AvalancheVote(0, blockHash)}}; AvalancheTest::runEventLoop(p); BOOST_CHECK(p.registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid); // When a block is marked invalid, stop polling. pindex2->nStatus = pindex2->nStatus.withFailed(); resp = {AvalancheTest::getRound(p), 0, {AvalancheVote(0, blockHash)}}; AvalancheTest::runEventLoop(p); BOOST_CHECK(p.registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid); CConnmanTest::ClearNodes(); } BOOST_AUTO_TEST_CASE(poll_inflight_timeout) { AvalancheProcessor p(g_connman.get()); std::vector updates; CBlock block = CreateAndProcessBlock({}, CScript()); const uint256 blockHash = block.GetHash(); const CBlockIndex *pindex = mapBlockIndex[blockHash]; // Add the block BOOST_CHECK(p.addBlockToReconcile(pindex)); // Create a node that supports avalanche. const Config &config = GetConfig(); auto avanode = ConnectNode(config, NODE_AVALANCHE, *peerLogic); NodeId avanodeid = avanode->GetId(); BOOST_CHECK(p.addPeer(avanodeid, 0)); // Expire requests after some time. p.setQueryTimeoutDuration(std::chrono::milliseconds(10)); for (int i = 0; i < 10; i++) { AvalancheResponse resp = { AvalancheTest::getRound(p), 0, {AvalancheVote(0, blockHash)}}; AvalancheTest::runEventLoop(p); // NB: This could wait longer than 1ms in some cases and make the // test flacky. We'll have to come up with a better solution to test // this if that were to be the case. I never was able to trigger this // myself, so it's probably good enough. boost::this_thread::sleep_for(boost::chrono::milliseconds(1)); AvalancheTest::runEventLoop(p); BOOST_CHECK(p.registerVotes(avanodeid, next(resp), updates)); // Now try again but wait. AvalancheTest::runEventLoop(p); boost::this_thread::sleep_for(boost::chrono::milliseconds(10)); AvalancheTest::runEventLoop(p); BOOST_CHECK(!p.registerVotes(avanodeid, next(resp), updates)); } CConnmanTest::ClearNodes(); } +BOOST_AUTO_TEST_CASE(poll_inflight_count) { + AvalancheProcessor p(g_connman.get()); + const Config &config = GetConfig(); + + // Create enough nodes so that we run into the inflight request limit. + std::array, AVALANCHE_MAX_INFLIGHT_POLL + 1> nodes; + for (auto &n : nodes) { + n = ConnectNode(config, NODE_AVALANCHE, *peerLogic); + BOOST_CHECK(p.addPeer(n->GetId(), 0)); + } + + // Add a block to poll + CBlock block = CreateAndProcessBlock({}, CScript()); + const uint256 blockHash = block.GetHash(); + const CBlockIndex *pindex = mapBlockIndex[blockHash]; + BOOST_CHECK(p.addBlockToReconcile(pindex)); + + // Ensure there are enough requests in flight. + std::map node_round_map; + for (int i = 0; i < AVALANCHE_MAX_INFLIGHT_POLL; i++) { + NodeId nodeid = AvalancheTest::getSuitableNodeToQuery(p); + BOOST_CHECK(node_round_map.find(nodeid) == node_round_map.end()); + node_round_map[nodeid] = AvalancheTest::getRound(p); + auto invs = AvalancheTest::getInvsForNextPoll(p); + BOOST_CHECK_EQUAL(invs.size(), 1); + BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK); + BOOST_CHECK(invs[0].hash == blockHash); + AvalancheTest::runEventLoop(p); + } + + // Now that we have enough in flight requests, we shouldn't poll. + auto suitablenodeid = AvalancheTest::getSuitableNodeToQuery(p); + BOOST_CHECK(suitablenodeid != NO_NODE); + auto invs = AvalancheTest::getInvsForNextPoll(p); + BOOST_CHECK_EQUAL(invs.size(), 0); + AvalancheTest::runEventLoop(p); + BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), suitablenodeid); + + std::vector updates; + + // Send one response, now we can poll again. + auto it = node_round_map.begin(); + AvalancheResponse resp = {it->second, 0, {AvalancheVote(0, blockHash)}}; + BOOST_CHECK(p.registerVotes(it->first, resp, updates)); + node_round_map.erase(it); + + invs = AvalancheTest::getInvsForNextPoll(p); + BOOST_CHECK_EQUAL(invs.size(), 1); + BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK); + BOOST_CHECK(invs[0].hash == blockHash); + + CConnmanTest::ClearNodes(); +} + BOOST_AUTO_TEST_CASE(event_loop) { AvalancheProcessor p(g_connman.get()); CScheduler s; CBlock block = CreateAndProcessBlock({}, CScript()); const uint256 blockHash = block.GetHash(); const CBlockIndex *pindex = mapBlockIndex[blockHash]; // Starting the event loop. BOOST_CHECK(p.startEventLoop(s)); // There is one task planned in the next hour (our event loop). boost::chrono::system_clock::time_point start, stop; BOOST_CHECK_EQUAL(s.getQueueInfo(start, stop), 1); // Starting twice doesn't start it twice. BOOST_CHECK(!p.startEventLoop(s)); // Start the scheduler thread. std::thread schedulerThread(std::bind(&CScheduler::serviceQueue, &s)); // Create a node and a block to query. const Config &config = GetConfig(); // Create a node that supports avalanche. auto avanode = ConnectNode(config, NODE_AVALANCHE, *peerLogic); NodeId nodeid = avanode->GetId(); BOOST_CHECK(p.addPeer(nodeid, 0)); // There is no query in flight at the moment. BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), nodeid); // Add a new block. Check it is added to the polls. uint64_t queryRound = AvalancheTest::getRound(p); BOOST_CHECK(p.addBlockToReconcile(pindex)); for (int i = 0; i < 1000; i++) { // Technically, this is a race condition, but this should do just fine // as we wait up to 1s for an event that should take 10ms. boost::this_thread::sleep_for(boost::chrono::milliseconds(1)); if (AvalancheTest::getRound(p) != queryRound) { break; } } // Check that we effectively got a request and not timed out. BOOST_CHECK(AvalancheTest::getRound(p) > queryRound); // Respond and check the cooldown time is respected. uint64_t responseRound = AvalancheTest::getRound(p); auto queryTime = std::chrono::steady_clock::now() + std::chrono::milliseconds(100); std::vector updates; p.registerVotes(nodeid, {queryRound, 100, {AvalancheVote(0, blockHash)}}, updates); for (int i = 0; i < 1000; i++) { // We make sure that we do not get a request before queryTime. boost::this_thread::sleep_for(boost::chrono::milliseconds(1)); if (AvalancheTest::getRound(p) != responseRound) { BOOST_CHECK(std::chrono::steady_clock::now() > queryTime); break; } } // But we eventually get one. BOOST_CHECK(AvalancheTest::getRound(p) > responseRound); // Stop event loop. BOOST_CHECK(p.stopEventLoop()); // We don't have any task scheduled anymore. BOOST_CHECK_EQUAL(s.getQueueInfo(start, stop), 0); // Can't stop the event loop twice. BOOST_CHECK(!p.stopEventLoop()); // Wait for the scheduler to stop. s.stop(true); schedulerThread.join(); CConnmanTest::ClearNodes(); } BOOST_AUTO_TEST_CASE(destructor) { CScheduler s; boost::chrono::system_clock::time_point start, stop; // Start the scheduler thread. std::thread schedulerThread(std::bind(&CScheduler::serviceQueue, &s)); { AvalancheProcessor p(g_connman.get()); BOOST_CHECK(p.startEventLoop(s)); BOOST_CHECK_EQUAL(s.getQueueInfo(start, stop), 1); } // Now that avalanche is destroyed, there is no more scheduled tasks. BOOST_CHECK_EQUAL(s.getQueueInfo(start, stop), 0); // Wait for the scheduler to stop. s.stop(true); schedulerThread.join(); } BOOST_AUTO_TEST_SUITE_END()