diff --git a/src/avalanche/node.h b/src/avalanche/node.h index 43c37c0da..e5d85b881 100644 --- a/src/avalanche/node.h +++ b/src/avalanche/node.h @@ -1,31 +1,35 @@ // Copyright (c) 2018-2019 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #ifndef BITCOIN_AVALANCHE_NODE_H #define BITCOIN_AVALANCHE_NODE_H #include // For NodeId #include #include #include using PeerId = uint32_t; static constexpr PeerId NO_PEER = -1; using TimePoint = std::chrono::time_point; -struct AvalancheNode { +namespace avalanche { + +struct Node { NodeId nodeid; PeerId peerid; TimePoint nextRequestTime; CPubKey pubkey; - AvalancheNode(NodeId nodeid_, PeerId peerid_, CPubKey pubkey_) + Node(NodeId nodeid_, PeerId peerid_, CPubKey pubkey_) : nodeid(nodeid_), peerid(peerid_), nextRequestTime(std::chrono::steady_clock::now()), pubkey(std::move(pubkey_)) {} }; +} // namespace avalanche + #endif // BITCOIN_AVALANCHE_NODE_H diff --git a/src/avalanche/peermanager.cpp b/src/avalanche/peermanager.cpp index 27b8973e2..aa01fb76e 100644 --- a/src/avalanche/peermanager.cpp +++ b/src/avalanche/peermanager.cpp @@ -1,298 +1,301 @@ // Copyright (c) 2020 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include #include #include +namespace avalanche { + PeerId PeerManager::addPeer(PeerId p, uint32_t score) { auto inserted = peers.emplace(p, Peer(score, uint32_t(slots.size()))); assert(inserted.second); const uint64_t start = slotCount; slots.emplace_back(start, score, p); slotCount = start + score; return p; } bool PeerManager::removePeer(PeerId p) { auto it = peers.find(p); if (it == peers.end()) { return false; } size_t i = it->second.index; assert(i < slots.size()); if (i + 1 == slots.size()) { slots.pop_back(); slotCount = slots.empty() ? 0 : slots.back().getStop(); } else { fragmentation += slots[i].getScore(); slots[i] = slots[i].withPeerId(NO_PEER); } // Remove nodes associated with this peer, unless their timeout is still // active. This ensure that we don't overquery them in case their are // subsequently added to another peer. auto &nview = nodes.get(); nview.erase(nview.lower_bound(boost::make_tuple(p, TimePoint())), nview.upper_bound( boost::make_tuple(p, std::chrono::steady_clock::now()))); peers.erase(it); return true; } bool PeerManager::rescorePeer(PeerId p, uint32_t score) { auto it = peers.find(p); if (it == peers.end()) { return false; } size_t i = it->second.index; assert(i < slots.size()); // Update the peer's score. it->second.score = score; // Update the slot allocation to reflect the new score. const uint64_t start = slots[i].getStart(); // If this is the last element, we can extend/shrink easily. if (i + 1 == slots.size()) { slots[i] = slots[i].withScore(score); slotCount = slots[i].getStop(); return true; } const uint64_t stop = start + score; const uint64_t nextStart = slots[i + 1].getStart(); // We can extend in place. if (stop <= nextStart) { fragmentation += (slots[i].getStop() - stop); slots[i] = slots[i].withScore(score); return true; } // So we need to insert a new entry. fragmentation += slots[i].getScore(); slots[i] = slots[i].withPeerId(NO_PEER); it->second.index = uint32_t(slots.size()); const uint64_t newStart = slotCount; slots.emplace_back(newStart, score, p); slotCount = newStart + score; return true; } bool PeerManager::addNodeToPeer(PeerId peerid, NodeId nodeid, CPubKey pubkey) { auto pit = peers.find(peerid); if (pit == peers.end()) { return false; } auto nit = nodes.find(nodeid); if (nit == nodes.end()) { return nodes.emplace(nodeid, peerid, std::move(pubkey)).second; } // We actually have this node already, we need to update it. - return nodes.modify(nit, [&](AvalancheNode &n) { + return nodes.modify(nit, [&](Node &n) { n.peerid = peerid; n.pubkey = std::move(pubkey); }); } bool PeerManager::removeNode(NodeId nodeid) { return nodes.erase(nodeid) > 0; } -bool PeerManager::forNode( - NodeId nodeid, std::function func) const { +bool PeerManager::forNode(NodeId nodeid, + std::function func) const { auto it = nodes.find(nodeid); return it != nodes.end() && func(*it); } bool PeerManager::updateNextRequestTime(NodeId nodeid, TimePoint timeout) { auto it = nodes.find(nodeid); if (it == nodes.end()) { return false; } - return nodes.modify(it, - [&](AvalancheNode &n) { n.nextRequestTime = timeout; }); + return nodes.modify(it, [&](Node &n) { n.nextRequestTime = timeout; }); } NodeId PeerManager::getSuitableNodeToQuery() { for (int retry = 0; retry < SELECT_NODE_MAX_RETRY; retry++) { const PeerId p = selectPeer(); // If we cannot find a peer, it may be due to the fact that it is // unlikely due to high fragmentation, so compact and retry. if (p == NO_PEER) { compact(); continue; } // See if that peer has an available node. auto &nview = nodes.get(); auto it = nview.lower_bound(boost::make_tuple(p, TimePoint())); if (it != nview.end() && it->peerid == p && it->nextRequestTime <= std::chrono::steady_clock::now()) { return it->nodeid; } } return NO_NODE; } PeerId PeerManager::selectPeer() const { if (slots.empty() || slotCount == 0) { return NO_PEER; } const uint64_t max = slotCount; for (int retry = 0; retry < SELECT_PEER_MAX_RETRY; retry++) { size_t i = selectPeerImpl(slots, GetRand(max), max); if (i != NO_PEER) { return i; } } return NO_PEER; } uint64_t PeerManager::compact() { // There is nothing to compact. if (fragmentation == 0) { return 0; } // Shrink the vector to the expected size. while (slots.size() > peers.size()) { slots.pop_back(); } uint64_t prevStop = 0; uint32_t i = 0; for (auto &pair : peers) { PeerId pid = pair.first; Peer &p = pair.second; slots[i] = Slot(prevStop, p.score, pid); prevStop = slots[i].getStop(); p.index = i++; } const uint64_t saved = slotCount - prevStop; slotCount = prevStop; fragmentation = 0; return saved; } bool PeerManager::verify() const { uint64_t prevStop = 0; for (size_t i = 0; i < slots.size(); i++) { const Slot &s = slots[i]; // Slots must be in correct order. if (s.getStart() < prevStop) { return false; } prevStop = s.getStop(); // If this is a dead slot, then nothing more needs to be checked. if (s.getPeerId() == NO_PEER) { continue; } // We have a live slot, verify index. auto it = peers.find(s.getPeerId()); if (it == peers.end() || it->second.index != i) { return false; } } for (const auto &pair : peers) { auto p = pair.first; auto i = pair.second.index; auto s = pair.second.score; // The index must point to a slot refering to this peer. if (i >= slots.size() || slots[i].getPeerId() != p) { return false; } // If the score do not match, same thing. if (slots[i].getScore() != s) { return false; } } return true; } PeerId selectPeerImpl(const std::vector &slots, const uint64_t slot, const uint64_t max) { assert(slot <= max); size_t begin = 0, end = slots.size(); uint64_t bottom = 0, top = max; // Try to find the slot using dichotomic search. while ((end - begin) > 8) { // The slot we picked in not allocated. if (slot < bottom || slot >= top) { return NO_PEER; } // Guesstimate the position of the slot. size_t i = begin + ((slot - bottom) * (end - begin) / (top - bottom)); assert(begin <= i && i < end); // We have a match. if (slots[i].contains(slot)) { return slots[i].getPeerId(); } // We undershooted. if (slots[i].precedes(slot)) { begin = i + 1; if (begin >= end) { return NO_PEER; } bottom = slots[begin].getStart(); continue; } // We overshooted. if (slots[i].follows(slot)) { end = i; top = slots[end].getStart(); continue; } // We have an unalocated slot. return NO_PEER; } // Enough of that nonsense, let fallback to linear search. for (size_t i = begin; i < end; i++) { // We have a match. if (slots[i].contains(slot)) { return slots[i].getPeerId(); } } // We failed to find a slot, retry. return NO_PEER; } + +} // namespace avalanche diff --git a/src/avalanche/peermanager.h b/src/avalanche/peermanager.h index 164bedd2c..bd65ef0d8 100644 --- a/src/avalanche/peermanager.h +++ b/src/avalanche/peermanager.h @@ -1,150 +1,151 @@ // Copyright (c) 2020 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #ifndef BITCOIN_AVALANCHE_PEERMANAGER_H #define BITCOIN_AVALANCHE_PEERMANAGER_H #include #include #include #include #include #include #include #include #include #include #include #include +namespace avalanche { + struct Slot { private: uint64_t start; uint32_t score; PeerId peerid; public: Slot(uint64_t startIn, uint32_t scoreIn, PeerId peeridIn) : start(startIn), score(scoreIn), peerid(peeridIn) {} Slot withStart(uint64_t startIn) const { return Slot(startIn, score, peerid); } Slot withScore(uint64_t scoreIn) const { return Slot(start, scoreIn, peerid); } Slot withPeerId(PeerId peeridIn) const { return Slot(start, score, peeridIn); } uint64_t getStart() const { return start; } uint64_t getStop() const { return start + score; } uint32_t getScore() const { return score; } PeerId getPeerId() const { return peerid; } bool contains(uint64_t slot) const { return getStart() <= slot && slot < getStop(); } bool precedes(uint64_t slot) const { return slot >= getStop(); } bool follows(uint64_t slot) const { return getStart() > slot; } }; struct next_request_time {}; class PeerManager { std::vector slots; uint64_t slotCount = 0; uint64_t fragmentation = 0; /** * Several nodes can make an avalanche peer. In this case, all nodes are * considered interchangeable parts of the same peer. */ struct Peer { uint32_t score; uint32_t index; Peer(uint32_t score_, uint32_t index_) : score(score_), index(index_) {} }; PeerId nextPeerId = 0; std::unordered_map peers; using NodeSet = boost::multi_index_container< - AvalancheNode, + Node, boost::multi_index::indexed_by< // index by nodeid - boost::multi_index::hashed_unique>, + boost::multi_index::hashed_unique< + boost::multi_index::member>, // sorted by peerid/nextRequestTime boost::multi_index::ordered_non_unique< boost::multi_index::tag, boost::multi_index::composite_key< - AvalancheNode, - boost::multi_index::member, - boost::multi_index::member< - AvalancheNode, TimePoint, - &AvalancheNode::nextRequestTime>>>>>; + Node, + boost::multi_index::member, + boost::multi_index::member>>>>; NodeSet nodes; static constexpr int SELECT_PEER_MAX_RETRY = 3; static constexpr int SELECT_NODE_MAX_RETRY = 3; public: /** * Peer API. */ PeerId addPeer(uint32_t score) { return addPeer(nextPeerId++, score); } bool removePeer(PeerId p); bool rescorePeer(PeerId p, uint32_t score); /** * Node API. */ bool addNodeToPeer(PeerId peerid, NodeId nodeid, CPubKey pubkey); bool removeNode(NodeId nodeid); - bool forNode(NodeId nodeid, - std::function func) const; + bool forNode(NodeId nodeid, std::function func) const; bool updateNextRequestTime(NodeId nodeid, TimePoint timeout); NodeId getSuitableNodeToQuery(); /** * Exposed for tests. */ PeerId selectPeer() const; /** * Trigger maintenance of internal datastructures. * Returns how much slot space was saved after compaction. */ uint64_t compact(); /** * Perform consistency check on internal data structures. * Mostly useful for tests. */ bool verify() const; // Accssors. uint64_t getSlotCount() const { return slotCount; } uint64_t getFragmentation() const { return fragmentation; } private: PeerId addPeer(PeerId peerid, uint32_t score); }; /** * This is an internal method that is exposed for testing purposes. */ PeerId selectPeerImpl(const std::vector &slots, const uint64_t slot, const uint64_t max); +} // namespace avalanche + #endif // BITCOIN_AVALANCHE_PEERMANAGER_H diff --git a/src/avalanche/processor.cpp b/src/avalanche/processor.cpp index 764375dc6..c871f7eaf 100644 --- a/src/avalanche/processor.cpp +++ b/src/avalanche/processor.cpp @@ -1,518 +1,518 @@ // Copyright (c) 2018-2019 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include #include #include #include #include #include #include #include #include #include /** * Run the avalanche event loop every 10ms. */ static constexpr std::chrono::milliseconds AVALANCHE_TIME_STEP{10}; // Unfortunately, the bitcoind codebase is full of global and we are kinda // forced into it here. -std::unique_ptr g_avalanche; +std::unique_ptr g_avalanche; + +namespace avalanche { bool VoteRecord::registerVote(NodeId nodeid, uint32_t error) { // We just got a new vote, so there is one less inflight request. clearInflightRequest(); // We want to avoid having the same node voting twice in a quorum. if (!addNodeToQuorum(nodeid)) { return false; } /** * The result of the vote is determined from the error code. If the error * code is 0, there is no error and therefore the vote is yes. If there is * an error, we check the most significant bit to decide if the vote is a no * (for instance, the block is invalid) or is the vote inconclusive (for * instance, the queried node does not have the block yet). */ votes = (votes << 1) | (error == 0); consider = (consider << 1) | (int32_t(error) >= 0); /** * We compute the number of yes and/or no votes as follow: * * votes: 1010 * consider: 1100 * * yes votes: 1000 using votes & consider * no votes: 0100 using ~votes & consider */ bool yes = countBits(votes & consider & 0xff) > 6; if (!yes) { bool no = countBits(~votes & consider & 0xff) > 6; if (!no) { // The round is inconclusive. return false; } } // If the round is in agreement with previous rounds, increase confidence. if (isAccepted() == yes) { confidence += 2; return getConfidence() == AVALANCHE_FINALIZATION_SCORE; } // The round changed our state. We reset the confidence. confidence = yes; return true; } bool VoteRecord::addNodeToQuorum(NodeId nodeid) { if (nodeid == NO_NODE) { // Helpful for testing. return true; } // MMIX Linear Congruent Generator. const uint64_t r1 = 6364136223846793005 * uint64_t(nodeid) + 1442695040888963407; // Fibonacci hashing. const uint64_t r2 = 11400714819323198485ull * (nodeid ^ seed); // Combine and extract hash. const uint16_t h = (r1 + r2) >> 48; /** * Check if the node is in the filter. */ for (size_t i = 1; i < nodeFilter.size(); i++) { if (nodeFilter[(successfulVotes + i) % nodeFilter.size()] == h) { return false; } } /** * Add the node which just voted to the filter. */ nodeFilter[successfulVotes % nodeFilter.size()] = h; successfulVotes++; return true; } bool VoteRecord::registerPoll() const { uint8_t count = inflight.load(); while (count < AVALANCHE_MAX_INFLIGHT_POLL) { if (inflight.compare_exchange_weak(count, count + 1)) { return true; } } return false; } static bool IsWorthPolling(const CBlockIndex *pindex) { AssertLockHeld(cs_main); if (pindex->nStatus.isInvalid()) { // No point polling invalid blocks. return false; } if (IsBlockFinalized(pindex)) { // There is no point polling finalized block. return false; } return true; } -AvalancheProcessor::AvalancheProcessor(CConnman *connmanIn) +Processor::Processor(CConnman *connmanIn) : connman(connmanIn), queryTimeoutDuration(AVALANCHE_DEFAULT_QUERY_TIMEOUT), round(0), peerManager(std::make_unique()) { // Pick a random key for the session. sessionKey.MakeNewKey(true); } -AvalancheProcessor::~AvalancheProcessor() { +Processor::~Processor() { stopEventLoop(); } -bool AvalancheProcessor::addBlockToReconcile(const CBlockIndex *pindex) { +bool Processor::addBlockToReconcile(const CBlockIndex *pindex) { bool isAccepted; { LOCK(cs_main); if (!IsWorthPolling(pindex)) { // There is no point polling this block. return false; } isAccepted = ::ChainActive().Contains(pindex); } return vote_records.getWriteView() ->insert(std::make_pair(pindex, VoteRecord(isAccepted))) .second; } -bool AvalancheProcessor::isAccepted(const CBlockIndex *pindex) const { +bool Processor::isAccepted(const CBlockIndex *pindex) const { auto r = vote_records.getReadView(); auto it = r->find(pindex); if (it == r.end()) { return false; } return it->second.isAccepted(); } -int AvalancheProcessor::getConfidence(const CBlockIndex *pindex) const { +int Processor::getConfidence(const CBlockIndex *pindex) const { auto r = vote_records.getReadView(); auto it = r->find(pindex); if (it == r.end()) { return -1; } return it->second.getConfidence(); } namespace { -/** - * When using TCP, we need to sign all messages as the transport layer is not - * secure. - */ -class TCPAvalancheResponse { - AvalancheResponse response; - std::array sig; - -public: - TCPAvalancheResponse(AvalancheResponse responseIn, const CKey &key) - : response(std::move(responseIn)) { - CHashWriter hasher(SER_GETHASH, 0); - hasher << response; - const uint256 hash = hasher.GetHash(); - - // Now let's sign! - std::vector vchSig; - if (key.SignSchnorr(hash, vchSig)) { - // Schnorr sigs are 64 bytes in size. - assert(vchSig.size() == 64); - std::copy(vchSig.begin(), vchSig.end(), sig.begin()); - } else { - sig.fill(0); + /** + * When using TCP, we need to sign all messages as the transport layer is + * not secure. + */ + class TCPAvalancheResponse { + AvalancheResponse response; + std::array sig; + + public: + TCPAvalancheResponse(AvalancheResponse responseIn, const CKey &key) + : response(std::move(responseIn)) { + CHashWriter hasher(SER_GETHASH, 0); + hasher << response; + const uint256 hash = hasher.GetHash(); + + // Now let's sign! + std::vector vchSig; + if (key.SignSchnorr(hash, vchSig)) { + // Schnorr sigs are 64 bytes in size. + assert(vchSig.size() == 64); + std::copy(vchSig.begin(), vchSig.end(), sig.begin()); + } else { + sig.fill(0); + } } - } - // serialization support - ADD_SERIALIZE_METHODS; + // serialization support + ADD_SERIALIZE_METHODS; - template - inline void SerializationOp(Stream &s, Operation ser_action) { - READWRITE(response); - READWRITE(sig); - } -}; + template + inline void SerializationOp(Stream &s, Operation ser_action) { + READWRITE(response); + READWRITE(sig); + } + }; } // namespace -void AvalancheProcessor::sendResponse(CNode *pfrom, - AvalancheResponse response) const { +void Processor::sendResponse(CNode *pfrom, AvalancheResponse response) const { connman->PushMessage( pfrom, CNetMsgMaker(pfrom->GetSendVersion()) .Make(NetMsgType::AVARESPONSE, TCPAvalancheResponse(std::move(response), sessionKey))); } -bool AvalancheProcessor::registerVotes( - NodeId nodeid, const AvalancheResponse &response, - std::vector &updates) { +bool Processor::registerVotes(NodeId nodeid, const AvalancheResponse &response, + std::vector &updates) { { // Save the time at which we can query again. LOCK(cs_peerManager); // FIXME: This will override the time even when we received an old stale // message. This should check that the message is indeed the most up to // date one before updating the time. peerManager->updateNextRequestTime( nodeid, std::chrono::steady_clock::now() + std::chrono::milliseconds(response.getCooldown())); } std::vector invs; { // Check that the query exists. auto w = queries.getWriteView(); auto it = w->find(std::make_tuple(nodeid, response.getRound())); if (it == w.end()) { // NB: The request may be old, so we don't increase banscore. return false; } invs = std::move(it->invs); w->erase(it); } // Verify that the request and the vote are consistent. const std::vector &votes = response.GetVotes(); size_t size = invs.size(); if (votes.size() != size) { // TODO: increase banscore for inconsistent response. // NB: This isn't timeout but actually node misbehaving. return false; } for (size_t i = 0; i < size; i++) { if (invs[i].hash != votes[i].GetHash()) { // TODO: increase banscore for inconsistent response. // NB: This isn't timeout but actually node misbehaving. return false; } } std::map responseIndex; { LOCK(cs_main); for (const auto &v : votes) { BlockMap::iterator mi = mapBlockIndex.find(BlockHash(v.GetHash())); if (mi == mapBlockIndex.end()) { // This should not happen, but just in case... continue; } CBlockIndex *pindex = mi->second; if (!IsWorthPolling(pindex)) { // There is no point polling this block. continue; } responseIndex.insert(std::make_pair(pindex, v)); } } { // Register votes. auto w = vote_records.getWriteView(); for (const auto &p : responseIndex) { CBlockIndex *pindex = p.first; const AvalancheVote &v = p.second; auto it = w->find(pindex); if (it == w.end()) { // We are not voting on that item anymore. continue; } auto &vr = it->second; if (!vr.registerVote(nodeid, v.GetError())) { // This vote did not provide any extra information, move on. continue; } if (!vr.hasFinalized()) { // This item has note been finalized, so we have nothing more to // do. updates.emplace_back( - pindex, vr.isAccepted() - ? AvalancheBlockUpdate::Status::Accepted - : AvalancheBlockUpdate::Status::Rejected); + pindex, vr.isAccepted() ? BlockUpdate::Status::Accepted + : BlockUpdate::Status::Rejected); continue; } // We just finalized a vote. If it is valid, then let the caller // know. Either way, remove the item from the map. - updates.emplace_back(pindex, - vr.isAccepted() - ? AvalancheBlockUpdate::Status::Finalized - : AvalancheBlockUpdate::Status::Invalid); + updates.emplace_back(pindex, vr.isAccepted() + ? BlockUpdate::Status::Finalized + : BlockUpdate::Status::Invalid); w->erase(it); } } return true; } -bool AvalancheProcessor::addPeer(NodeId nodeid, int64_t score, CPubKey pubkey) { +bool Processor::addPeer(NodeId nodeid, int64_t score, CPubKey pubkey) { LOCK(cs_peerManager); PeerId p = peerManager->addPeer(score); bool inserted = peerManager->addNodeToPeer(p, nodeid, std::move(pubkey)); if (!inserted) { peerManager->removePeer(p); } return inserted; } -bool AvalancheProcessor::forNode( - NodeId nodeid, std::function func) const { +bool Processor::forNode(NodeId nodeid, + std::function func) const { LOCK(cs_peerManager); return peerManager->forNode(nodeid, std::move(func)); } -bool AvalancheProcessor::startEventLoop(CScheduler &scheduler) { +bool Processor::startEventLoop(CScheduler &scheduler) { return eventLoop.startEventLoop( scheduler, [this]() { this->runEventLoop(); }, AVALANCHE_TIME_STEP); } -bool AvalancheProcessor::stopEventLoop() { +bool Processor::stopEventLoop() { return eventLoop.stopEventLoop(); } -std::vector AvalancheProcessor::getInvsForNextPoll(bool forPoll) { +std::vector Processor::getInvsForNextPoll(bool forPoll) { std::vector invs; // First remove all blocks that are not worth polling. { LOCK(cs_main); auto w = vote_records.getWriteView(); for (auto it = w->begin(); it != w->end();) { const CBlockIndex *pindex = it->first; if (!IsWorthPolling(pindex)) { w->erase(it++); } else { ++it; } } } auto r = vote_records.getReadView(); for (const std::pair &p : reverse_iterate(r)) { // Check if we can run poll. const bool shouldPoll = forPoll ? p.second.registerPoll() : p.second.shouldPoll(); if (!shouldPoll) { continue; } // We don't have a decision, we need more votes. invs.emplace_back(MSG_BLOCK, p.first->GetBlockHash()); if (invs.size() >= AVALANCHE_MAX_ELEMENT_POLL) { // Make sure we do not produce more invs than specified by the // protocol. return invs; } } return invs; } -NodeId AvalancheProcessor::getSuitableNodeToQuery() { +NodeId Processor::getSuitableNodeToQuery() { LOCK(cs_peerManager); return peerManager->getSuitableNodeToQuery(); } -void AvalancheProcessor::clearTimedoutRequests() { +void Processor::clearTimedoutRequests() { auto now = std::chrono::steady_clock::now(); std::map timedout_items{}; { // Clear expired requests. auto w = queries.getWriteView(); auto it = w->get().begin(); while (it != w->get().end() && it->timeout < now) { for (const auto &i : it->invs) { timedout_items[i]++; } w->get().erase(it++); } } if (timedout_items.empty()) { return; } // In flight request accounting. for (const auto &p : timedout_items) { const CInv &inv = p.first; assert(inv.type == MSG_BLOCK); CBlockIndex *pindex; { LOCK(cs_main); BlockMap::iterator mi = mapBlockIndex.find(BlockHash(inv.hash)); if (mi == mapBlockIndex.end()) { continue; } pindex = mi->second; } auto w = vote_records.getWriteView(); auto it = w->find(pindex); if (it == w.end()) { continue; } it->second.clearInflightRequest(p.second); } } -void AvalancheProcessor::runEventLoop() { +void Processor::runEventLoop() { // First things first, check if we have requests that timed out and clear // them. clearTimedoutRequests(); // Make sure there is at least one suitable node to query before gathering // invs. NodeId nodeid = getSuitableNodeToQuery(); if (nodeid == NO_NODE) { return; } std::vector invs = getInvsForNextPoll(); if (invs.empty()) { return; } do { /** * If we lost contact to that node, then we remove it from nodeids, but * never add the request to queries, which ensures bad nodes get cleaned * up over time. */ bool hasSent = connman->ForNode(nodeid, [this, &invs](CNode *pnode) { uint64_t current_round = round++; { // Compute the time at which this requests times out. auto timeout = std::chrono::steady_clock::now() + queryTimeoutDuration; // Register the query. queries.getWriteView()->insert( {pnode->GetId(), current_round, timeout, invs}); // Set the timeout. LOCK(cs_peerManager); peerManager->updateNextRequestTime(pnode->GetId(), timeout); } // Send the query to the node. connman->PushMessage( pnode, CNetMsgMaker(pnode->GetSendVersion()) .Make(NetMsgType::AVAPOLL, AvalanchePoll(current_round, std::move(invs)))); return true; }); // Success! if (hasSent) { return; } { // This node is obsolete, delete it. LOCK(cs_peerManager); peerManager->removeNode(nodeid); } // Get next suitable node to try again nodeid = getSuitableNodeToQuery(); } while (nodeid != NO_NODE); } + +} // namespace avalanche diff --git a/src/avalanche/processor.h b/src/avalanche/processor.h index 851d43805..36ed7b231 100644 --- a/src/avalanche/processor.h +++ b/src/avalanche/processor.h @@ -1,277 +1,284 @@ // Copyright (c) 2018-2019 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #ifndef BITCOIN_AVALANCHE_PROCESSOR_H #define BITCOIN_AVALANCHE_PROCESSOR_H #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include class Config; class CBlockIndex; -class PeerManager; class CScheduler; /** * Is avalanche enabled by default. */ static constexpr bool AVALANCHE_DEFAULT_ENABLED = false; /** * Finalization score. */ static constexpr int AVALANCHE_FINALIZATION_SCORE = 128; /** * Maximum item that can be polled at once. */ static constexpr size_t AVALANCHE_MAX_ELEMENT_POLL = 16; /** * Avalanche default cooldown in milliseconds. */ static constexpr size_t AVALANCHE_DEFAULT_COOLDOWN = 100; /** * How long before we consider that a query timed out. */ static constexpr std::chrono::milliseconds AVALANCHE_DEFAULT_QUERY_TIMEOUT{ 10000}; /** * How many inflight requests can exist for one item. */ static constexpr int AVALANCHE_MAX_INFLIGHT_POLL = 10; +namespace avalanche { + +class PeerManager; + /** * Vote history. */ struct VoteRecord { private: // confidence's LSB bit is the result. Higher bits are actual confidence // score. uint16_t confidence = 0; // Historical record of votes. uint8_t votes = 0; // Each bit indicate if the vote is to be considered. uint8_t consider = 0; // How many in flight requests exists for this element. mutable std::atomic inflight{0}; // Seed for pseudorandom operations. const uint32_t seed = 0; // Track how many successful votes occured. uint32_t successfulVotes = 0; // Track the nodes which are part of the quorum. std::array nodeFilter{{0, 0, 0, 0, 0, 0, 0, 0}}; public: explicit VoteRecord(bool accepted) : confidence(accepted) {} /** * Copy semantic */ VoteRecord(const VoteRecord &other) : confidence(other.confidence), votes(other.votes), consider(other.consider), inflight(other.inflight.load()), successfulVotes(other.successfulVotes), nodeFilter(other.nodeFilter) { } /** * Vote accounting facilities. */ bool isAccepted() const { return confidence & 0x01; } uint16_t getConfidence() const { return confidence >> 1; } bool hasFinalized() const { return getConfidence() >= AVALANCHE_FINALIZATION_SCORE; } /** * Register a new vote for an item and update confidence accordingly. * Returns true if the acceptance or finalization state changed. */ bool registerVote(NodeId nodeid, uint32_t error); /** * Register that a request is being made regarding that item. * The method is made const so that it can be accessed via a read only view * of vote_records. It's not a problem as it is made thread safe. */ bool registerPoll() const; /** * Return if this item is in condition to be polled at the moment. */ bool shouldPoll() const { return inflight < AVALANCHE_MAX_INFLIGHT_POLL; } /** * Clear `count` inflight requests. */ void clearInflightRequest(uint8_t count = 1) { inflight -= count; } private: /** * Add the node to the quorum. * Returns true if the node was added, false if the node already was in the * quorum. */ bool addNodeToQuorum(NodeId nodeid); }; -class AvalancheBlockUpdate { +class BlockUpdate { union { CBlockIndex *pindex; uintptr_t raw; }; static const size_t STATUS_BITS = 2; static const uintptr_t MASK = (1 << STATUS_BITS) - 1; static_assert( alignof(CBlockIndex) >= (1 << STATUS_BITS), "CBlockIndex alignement doesn't allow for Status to be stored."); public: enum Status : uint8_t { Invalid, Rejected, Accepted, Finalized, }; - AvalancheBlockUpdate(CBlockIndex *pindexIn, Status statusIn) - : pindex(pindexIn) { + BlockUpdate(CBlockIndex *pindexIn, Status statusIn) : pindex(pindexIn) { raw |= statusIn; } Status getStatus() const { return Status(raw & MASK); } CBlockIndex *getBlockIndex() { return reinterpret_cast(raw & ~MASK); } const CBlockIndex *getBlockIndex() const { - return const_cast(this)->getBlockIndex(); + return const_cast(this)->getBlockIndex(); } }; using BlockVoteMap = std::map; struct query_timeout {}; -class AvalancheProcessor { +namespace { + struct AvalancheTest; +} + +class Processor { CConnman *connman; std::chrono::milliseconds queryTimeoutDuration; /** * Blocks to run avalanche on. */ RWCollection vote_records; /** * Keep track of peers and queries sent. */ std::atomic round; /** * Keep track of the peers and associated infos. */ mutable Mutex cs_peerManager; std::unique_ptr peerManager GUARDED_BY(cs_peerManager); struct Query { NodeId nodeid; uint64_t round; TimePoint timeout; /** * We declare this as mutable so it can be modified in the multi_index. * This is ok because we do not use this field to index in anyway. * * /!\ Do not use any mutable field as index. */ mutable std::vector invs; }; using QuerySet = boost::multi_index_container< Query, boost::multi_index::indexed_by< // index by nodeid/round boost::multi_index::hashed_unique, boost::multi_index::member>>, // sorted by timeout boost::multi_index::ordered_non_unique< boost::multi_index::tag, boost::multi_index::member>>>; RWCollection queries; /** The key used to sign responses. */ CKey sessionKey; /** Event loop machinery. */ EventLoop eventLoop; public: - explicit AvalancheProcessor(CConnman *connmanIn); - ~AvalancheProcessor(); + explicit Processor(CConnman *connmanIn); + ~Processor(); void setQueryTimeoutDuration(std::chrono::milliseconds d) { queryTimeoutDuration = d; } bool addBlockToReconcile(const CBlockIndex *pindex); bool isAccepted(const CBlockIndex *pindex) const; int getConfidence(const CBlockIndex *pindex) const; // TDOD: Refactor the API to remove the dependency on avalanche/protocol.h void sendResponse(CNode *pfrom, AvalancheResponse response) const; bool registerVotes(NodeId nodeid, const AvalancheResponse &response, - std::vector &updates); + std::vector &updates); bool addPeer(NodeId nodeid, int64_t score, CPubKey pubkey); - bool forNode(NodeId nodeid, - std::function func) const; + bool forNode(NodeId nodeid, std::function func) const; CPubKey getSessionPubKey() const { return sessionKey.GetPubKey(); } bool startEventLoop(CScheduler &scheduler); bool stopEventLoop(); private: void runEventLoop(); void clearTimedoutRequests(); std::vector getInvsForNextPoll(bool forPoll = true); NodeId getSuitableNodeToQuery(); - friend struct AvalancheTest; + friend struct ::avalanche::AvalancheTest; }; +} // namespace avalanche + /** * Global avalanche instance. */ -extern std::unique_ptr g_avalanche; +extern std::unique_ptr g_avalanche; #endif // BITCOIN_AVALANCHE_PROCESSOR_H diff --git a/src/avalanche/test/peermanager_tests.cpp b/src/avalanche/test/peermanager_tests.cpp index 06aab00b8..5b8dc25c1 100644 --- a/src/avalanche/test/peermanager_tests.cpp +++ b/src/avalanche/test/peermanager_tests.cpp @@ -1,417 +1,419 @@ // Copyright (c) 2020 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include #include #include +using namespace avalanche; + BOOST_FIXTURE_TEST_SUITE(peermanager_tests, BasicTestingSetup) BOOST_AUTO_TEST_CASE(select_peer_linear) { // No peers. BOOST_CHECK_EQUAL(selectPeerImpl({}, 0, 0), NO_PEER); BOOST_CHECK_EQUAL(selectPeerImpl({}, 1, 3), NO_PEER); // One peer const std::vector oneslot = {{100, 100, 23}}; // Undershoot BOOST_CHECK_EQUAL(selectPeerImpl(oneslot, 0, 300), NO_PEER); BOOST_CHECK_EQUAL(selectPeerImpl(oneslot, 42, 300), NO_PEER); BOOST_CHECK_EQUAL(selectPeerImpl(oneslot, 99, 300), NO_PEER); // Nailed it BOOST_CHECK_EQUAL(selectPeerImpl(oneslot, 100, 300), 23); BOOST_CHECK_EQUAL(selectPeerImpl(oneslot, 142, 300), 23); BOOST_CHECK_EQUAL(selectPeerImpl(oneslot, 199, 300), 23); // Overshoot BOOST_CHECK_EQUAL(selectPeerImpl(oneslot, 200, 300), NO_PEER); BOOST_CHECK_EQUAL(selectPeerImpl(oneslot, 242, 300), NO_PEER); BOOST_CHECK_EQUAL(selectPeerImpl(oneslot, 299, 300), NO_PEER); // Two peers const std::vector twoslots = {{100, 100, 69}, {300, 100, 42}}; // Undershoot BOOST_CHECK_EQUAL(selectPeerImpl(twoslots, 0, 500), NO_PEER); BOOST_CHECK_EQUAL(selectPeerImpl(twoslots, 42, 500), NO_PEER); BOOST_CHECK_EQUAL(selectPeerImpl(twoslots, 99, 500), NO_PEER); // First entry BOOST_CHECK_EQUAL(selectPeerImpl(twoslots, 100, 500), 69); BOOST_CHECK_EQUAL(selectPeerImpl(twoslots, 142, 500), 69); BOOST_CHECK_EQUAL(selectPeerImpl(twoslots, 199, 500), 69); // In betwenn BOOST_CHECK_EQUAL(selectPeerImpl(twoslots, 200, 500), NO_PEER); BOOST_CHECK_EQUAL(selectPeerImpl(twoslots, 242, 500), NO_PEER); BOOST_CHECK_EQUAL(selectPeerImpl(twoslots, 299, 500), NO_PEER); // Second entry BOOST_CHECK_EQUAL(selectPeerImpl(twoslots, 300, 500), 42); BOOST_CHECK_EQUAL(selectPeerImpl(twoslots, 342, 500), 42); BOOST_CHECK_EQUAL(selectPeerImpl(twoslots, 399, 500), 42); // Overshoot BOOST_CHECK_EQUAL(selectPeerImpl(twoslots, 400, 500), NO_PEER); BOOST_CHECK_EQUAL(selectPeerImpl(twoslots, 442, 500), NO_PEER); BOOST_CHECK_EQUAL(selectPeerImpl(twoslots, 499, 500), NO_PEER); } BOOST_AUTO_TEST_CASE(select_peer_dichotomic) { std::vector slots; // 100 peers of size 1 with 1 empty element apart. uint64_t max = 1; for (int i = 0; i < 100; i++) { slots.emplace_back(max, 1, i); max += 2; } BOOST_CHECK_EQUAL(selectPeerImpl(slots, 4, max), NO_PEER); // Check that we get what we expect. for (int i = 0; i < 100; i++) { BOOST_CHECK_EQUAL(selectPeerImpl(slots, 2 * i, max), NO_PEER); BOOST_CHECK_EQUAL(selectPeerImpl(slots, 2 * i + 1, max), i); } BOOST_CHECK_EQUAL(selectPeerImpl(slots, max, max), NO_PEER); // Update the slots to be heavily skewed toward the last element. slots[99] = slots[99].withScore(101); max = slots[99].getStop(); BOOST_CHECK_EQUAL(max, 300); for (int i = 0; i < 100; i++) { BOOST_CHECK_EQUAL(selectPeerImpl(slots, 2 * i, max), NO_PEER); BOOST_CHECK_EQUAL(selectPeerImpl(slots, 2 * i + 1, max), i); } BOOST_CHECK_EQUAL(selectPeerImpl(slots, 200, max), 99); BOOST_CHECK_EQUAL(selectPeerImpl(slots, 256, max), 99); BOOST_CHECK_EQUAL(selectPeerImpl(slots, 299, max), 99); BOOST_CHECK_EQUAL(selectPeerImpl(slots, 300, max), NO_PEER); // Update the slots to be heavily skewed toward the first element. for (int i = 0; i < 100; i++) { slots[i] = slots[i].withStart(slots[i].getStart() + 100); } slots[0] = Slot(1, slots[0].getStop() - 1, slots[0].getPeerId()); slots[99] = slots[99].withScore(1); max = slots[99].getStop(); BOOST_CHECK_EQUAL(max, 300); BOOST_CHECK_EQUAL(selectPeerImpl(slots, 0, max), NO_PEER); BOOST_CHECK_EQUAL(selectPeerImpl(slots, 1, max), 0); BOOST_CHECK_EQUAL(selectPeerImpl(slots, 42, max), 0); for (int i = 0; i < 100; i++) { BOOST_CHECK_EQUAL(selectPeerImpl(slots, 100 + 2 * i + 1, max), i); BOOST_CHECK_EQUAL(selectPeerImpl(slots, 100 + 2 * i + 2, max), NO_PEER); } } BOOST_AUTO_TEST_CASE(select_peer_random) { for (int c = 0; c < 1000; c++) { size_t size = InsecureRandBits(10) + 1; std::vector slots; slots.reserve(size); uint64_t max = InsecureRandBits(3); auto next = [&]() { uint64_t r = max; max += InsecureRandBits(3); return r; }; for (size_t i = 0; i < size; i++) { const uint64_t start = next(); const uint32_t score = InsecureRandBits(3); max += score; slots.emplace_back(start, score, i); } for (int k = 0; k < 100; k++) { uint64_t s = InsecureRandRange(max); auto i = selectPeerImpl(slots, s, max); // /!\ Because of the way we construct the vector, the peer id is // always the index. This might not be the case in practice. BOOST_CHECK(i == NO_PEER || slots[i].contains(s)); } } } BOOST_AUTO_TEST_CASE(add_peer) { // No peers. PeerManager pm; BOOST_CHECK_EQUAL(pm.selectPeer(), NO_PEER); // One peer, we always return it. PeerId peer0 = pm.addPeer(100); BOOST_CHECK_EQUAL(pm.selectPeer(), peer0); // Two peers, verify ratio. PeerId peer1 = pm.addPeer(200); std::unordered_map results = {}; for (int i = 0; i < 10000; i++) { size_t p = pm.selectPeer(); BOOST_CHECK(p == peer0 || p == peer1); results[p]++; } BOOST_CHECK(abs(2 * results[0] - results[1]) < 500); // Three peers, verify ratio. PeerId peer2 = pm.addPeer(100); results.clear(); for (int i = 0; i < 10000; i++) { size_t p = pm.selectPeer(); BOOST_CHECK(p == peer0 || p == peer1 || p == peer2); results[p]++; } BOOST_CHECK(abs(results[0] - results[1] + results[2]) < 500); } BOOST_AUTO_TEST_CASE(remove_peer) { // No peers. PeerManager pm; BOOST_CHECK_EQUAL(pm.selectPeer(), NO_PEER); // Add 4 peers. std::array peerids; for (int i = 0; i < 4; i++) { peerids[i] = pm.addPeer(100); } BOOST_CHECK_EQUAL(pm.getSlotCount(), 400); BOOST_CHECK_EQUAL(pm.getFragmentation(), 0); for (int i = 0; i < 100; i++) { PeerId p = pm.selectPeer(); BOOST_CHECK(p == peerids[0] || p == peerids[1] || p == peerids[2] || p == peerids[3]); } // Remove one peer, it nevers show up now. BOOST_CHECK(pm.removePeer(peerids[2])); BOOST_CHECK_EQUAL(pm.getSlotCount(), 400); BOOST_CHECK_EQUAL(pm.getFragmentation(), 100); // Make sure we compact to never get NO_PEER. BOOST_CHECK_EQUAL(pm.compact(), 100); BOOST_CHECK(pm.verify()); BOOST_CHECK_EQUAL(pm.getSlotCount(), 300); BOOST_CHECK_EQUAL(pm.getFragmentation(), 0); for (int i = 0; i < 100; i++) { PeerId p = pm.selectPeer(); BOOST_CHECK(p == peerids[0] || p == peerids[1] || p == peerids[3]); } // Add 4 more peers. for (int i = 0; i < 4; i++) { peerids[i + 4] = pm.addPeer(100); } BOOST_CHECK_EQUAL(pm.getSlotCount(), 700); BOOST_CHECK_EQUAL(pm.getFragmentation(), 0); BOOST_CHECK(pm.removePeer(peerids[0])); BOOST_CHECK_EQUAL(pm.getSlotCount(), 700); BOOST_CHECK_EQUAL(pm.getFragmentation(), 100); // Removing the last entry do not increase fragmentation. BOOST_CHECK(pm.removePeer(peerids[7])); BOOST_CHECK_EQUAL(pm.getSlotCount(), 600); BOOST_CHECK_EQUAL(pm.getFragmentation(), 100); // Make sure we compact to never get NO_PEER. BOOST_CHECK_EQUAL(pm.compact(), 100); BOOST_CHECK(pm.verify()); BOOST_CHECK_EQUAL(pm.getSlotCount(), 500); BOOST_CHECK_EQUAL(pm.getFragmentation(), 0); for (int i = 0; i < 100; i++) { PeerId p = pm.selectPeer(); BOOST_CHECK(p == peerids[1] || p == peerids[3] || p == peerids[4] || p == peerids[5] || p == peerids[6]); } // Removing non existent peers fails. BOOST_CHECK(!pm.removePeer(peerids[0])); BOOST_CHECK(!pm.removePeer(peerids[2])); BOOST_CHECK(!pm.removePeer(peerids[7])); BOOST_CHECK(!pm.removePeer(NO_PEER)); } BOOST_AUTO_TEST_CASE(rescore_peer, *boost::unit_test::timeout(5)) { // No peers. PeerManager pm; BOOST_CHECK_EQUAL(pm.selectPeer(), NO_PEER); // Add 4 peers. std::array peerids; for (int i = 0; i < 4; i++) { peerids[i] = pm.addPeer(100); } BOOST_CHECK_EQUAL(pm.getSlotCount(), 400); BOOST_CHECK_EQUAL(pm.getFragmentation(), 0); for (int i = 0; i < 100; i++) { PeerId p = pm.selectPeer(); BOOST_CHECK(p == peerids[0] || p == peerids[1] || p == peerids[2] || p == peerids[3]); } // Set one peer's score to 0, it nevers show up now. BOOST_CHECK(pm.rescorePeer(peerids[1], 0)); BOOST_CHECK_EQUAL(pm.getSlotCount(), 400); BOOST_CHECK_EQUAL(pm.getFragmentation(), 100); for (int i = 0; i < 100; i++) { PeerId p = pm.selectPeer(); BOOST_CHECK(p == peerids[0] || p == peerids[2] || p == peerids[3] || p == NO_PEER); } // "resurrect" the peer. BOOST_CHECK(pm.rescorePeer(peerids[1], 100)); BOOST_CHECK_EQUAL(pm.getSlotCount(), 400); BOOST_CHECK_EQUAL(pm.getFragmentation(), 0); while (true) { PeerId p = pm.selectPeer(); BOOST_CHECK(p == peerids[0] || p == peerids[1] || p == peerids[2] || p == peerids[3]); // Make sure peer 1 reappeared. if (p == peerids[1]) { break; } } // Grow the peer to a point where it needs to be reallocated. BOOST_CHECK(pm.rescorePeer(peerids[1], 200)); BOOST_CHECK_EQUAL(pm.getSlotCount(), 600); BOOST_CHECK_EQUAL(pm.getFragmentation(), 100); for (int i = 0; i < 25; i++) { while (true) { PeerId p = pm.selectPeer(); BOOST_CHECK(p == peerids[0] || p == peerids[1] || p == peerids[2] || p == peerids[3] || p == NO_PEER); // Make sure peer 1 reappeared. if (p == peerids[1]) { break; } } } // Compact the peer manager. BOOST_CHECK_EQUAL(pm.compact(), 100); BOOST_CHECK(pm.verify()); BOOST_CHECK_EQUAL(pm.getSlotCount(), 500); BOOST_CHECK_EQUAL(pm.getFragmentation(), 0); } BOOST_AUTO_TEST_CASE(compact_slots) { PeerManager pm; // Add 4 peers. std::array peerids; for (int i = 0; i < 4; i++) { peerids[i] = pm.addPeer(100); } // Remove all peers. for (auto p : peerids) { pm.removePeer(p); } BOOST_CHECK_EQUAL(pm.getSlotCount(), 300); BOOST_CHECK_EQUAL(pm.getFragmentation(), 300); for (int i = 0; i < 100; i++) { BOOST_CHECK_EQUAL(pm.selectPeer(), NO_PEER); } BOOST_CHECK_EQUAL(pm.compact(), 300); BOOST_CHECK(pm.verify()); BOOST_CHECK_EQUAL(pm.getSlotCount(), 0); BOOST_CHECK_EQUAL(pm.getFragmentation(), 0); } BOOST_AUTO_TEST_CASE(node_crud) { PeerManager pm; // Create one peer. PeerId peerid = pm.addPeer(100); BOOST_CHECK_EQUAL(pm.getSuitableNodeToQuery(), NO_NODE); // Add 4 nodes. for (int i = 0; i < 4; i++) { BOOST_CHECK(pm.addNodeToPeer(peerid, i, CPubKey())); } for (int i = 0; i < 100; i++) { NodeId n = pm.getSuitableNodeToQuery(); BOOST_CHECK(n >= 0 && n < 4); BOOST_CHECK( pm.updateNextRequestTime(n, std::chrono::steady_clock::now())); } // Remove a node, check that it doesn't show up. BOOST_CHECK(pm.removeNode(2)); for (int i = 0; i < 100; i++) { NodeId n = pm.getSuitableNodeToQuery(); BOOST_CHECK(n == 0 || n == 1 || n == 3); BOOST_CHECK( pm.updateNextRequestTime(n, std::chrono::steady_clock::now())); } // Push a node's timeout in the future, so that it doesn't show up. BOOST_CHECK(pm.updateNextRequestTime(1, std::chrono::steady_clock::now() + std::chrono::hours(24))); for (int i = 0; i < 100; i++) { NodeId n = pm.getSuitableNodeToQuery(); BOOST_CHECK(n == 0 || n == 3); BOOST_CHECK( pm.updateNextRequestTime(n, std::chrono::steady_clock::now())); } // Move a node from a peer to another. PeerId altpeer = pm.addPeer(0); BOOST_CHECK(pm.addNodeToPeer(altpeer, 3, CPubKey())); for (int i = 0; i < 100; i++) { NodeId n = pm.getSuitableNodeToQuery(); BOOST_CHECK(n == 0); BOOST_CHECK( pm.updateNextRequestTime(n, std::chrono::steady_clock::now())); } // Rescore peers and cheks node selection is affected as expected. BOOST_CHECK(pm.rescorePeer(peerid, 0)); BOOST_CHECK(pm.rescorePeer(altpeer, 100)); BOOST_CHECK_EQUAL(pm.compact(), 100); for (int i = 0; i < 100; i++) { NodeId n = pm.getSuitableNodeToQuery(); BOOST_CHECK(n == 3); BOOST_CHECK( pm.updateNextRequestTime(n, std::chrono::steady_clock::now())); } } BOOST_AUTO_TEST_SUITE_END() diff --git a/src/avalanche/test/processor_tests.cpp b/src/avalanche/test/processor_tests.cpp index 15d2300a5..e261c1490 100644 --- a/src/avalanche/test/processor_tests.cpp +++ b/src/avalanche/test/processor_tests.cpp @@ -1,975 +1,976 @@ // Copyright (c) 2018-2020 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include #include #include #include // For PeerLogicValidation #include #include #include -struct AvalancheTest { - static void runEventLoop(AvalancheProcessor &p) { p.runEventLoop(); } +using namespace avalanche; - static std::vector getInvsForNextPoll(AvalancheProcessor &p) { - return p.getInvsForNextPoll(false); - } +namespace avalanche { +namespace { + struct AvalancheTest { + static void runEventLoop(avalanche::Processor &p) { p.runEventLoop(); } - static NodeId getSuitableNodeToQuery(AvalancheProcessor &p) { - return p.getSuitableNodeToQuery(); - } + static std::vector getInvsForNextPoll(Processor &p) { + return p.getInvsForNextPoll(false); + } - static PeerManager &getPeerManager(AvalancheProcessor &p) { - LOCK(p.cs_peerManager); - return *p.peerManager; - } + static NodeId getSuitableNodeToQuery(Processor &p) { + return p.getSuitableNodeToQuery(); + } - static uint64_t getRound(const AvalancheProcessor &p) { return p.round; } -}; + static PeerManager &getPeerManager(Processor &p) { + LOCK(p.cs_peerManager); + return *p.peerManager; + } + + static uint64_t getRound(const Processor &p) { return p.round; } + }; +} // namespace +} // namespace avalanche struct CConnmanTest : public CConnman { using CConnman::CConnman; void AddNode(CNode &node) { LOCK(cs_vNodes); vNodes.push_back(&node); } void ClearNodes() { LOCK(cs_vNodes); for (CNode *node : vNodes) { delete node; } vNodes.clear(); } }; BOOST_FIXTURE_TEST_SUITE(processor_tests, TestChain100Setup) #define REGISTER_VOTE_AND_CHECK(vr, vote, state, finalized, confidence) \ vr.registerVote(NO_NODE, vote); \ BOOST_CHECK_EQUAL(vr.isAccepted(), state); \ BOOST_CHECK_EQUAL(vr.hasFinalized(), finalized); \ BOOST_CHECK_EQUAL(vr.getConfidence(), confidence); BOOST_AUTO_TEST_CASE(vote_record) { VoteRecord vraccepted(true); // Check initial state. BOOST_CHECK_EQUAL(vraccepted.isAccepted(), true); BOOST_CHECK_EQUAL(vraccepted.hasFinalized(), false); BOOST_CHECK_EQUAL(vraccepted.getConfidence(), 0); VoteRecord vr(false); // Check initial state. BOOST_CHECK_EQUAL(vr.isAccepted(), false); BOOST_CHECK_EQUAL(vr.hasFinalized(), false); BOOST_CHECK_EQUAL(vr.getConfidence(), 0); // We need to register 6 positive votes before we start counting. for (int i = 0; i < 6; i++) { REGISTER_VOTE_AND_CHECK(vr, 0, false, false, 0); } // Next vote will flip state, and confidence will increase as long as we // vote yes. REGISTER_VOTE_AND_CHECK(vr, 0, true, false, 0); // A single neutral vote do not change anything. REGISTER_VOTE_AND_CHECK(vr, -1, true, false, 1); for (int i = 2; i < 8; i++) { REGISTER_VOTE_AND_CHECK(vr, 0, true, false, i); } // Two neutral votes will stall progress. REGISTER_VOTE_AND_CHECK(vr, -1, true, false, 7); REGISTER_VOTE_AND_CHECK(vr, -1, true, false, 7); for (int i = 2; i < 8; i++) { REGISTER_VOTE_AND_CHECK(vr, 0, true, false, 7); } // Now confidence will increase as long as we vote yes. for (int i = 8; i < AVALANCHE_FINALIZATION_SCORE; i++) { REGISTER_VOTE_AND_CHECK(vr, 0, true, false, i); } // The next vote will finalize the decision. REGISTER_VOTE_AND_CHECK(vr, 1, true, true, AVALANCHE_FINALIZATION_SCORE); // Now that we have two no votes, confidence stop increasing. for (int i = 0; i < 5; i++) { REGISTER_VOTE_AND_CHECK(vr, 1, true, true, AVALANCHE_FINALIZATION_SCORE); } // Next vote will flip state, and confidence will increase as long as we // vote no. REGISTER_VOTE_AND_CHECK(vr, 1, false, false, 0); // A single neutral vote do not change anything. REGISTER_VOTE_AND_CHECK(vr, -1, false, false, 1); for (int i = 2; i < 8; i++) { REGISTER_VOTE_AND_CHECK(vr, 1, false, false, i); } // Two neutral votes will stall progress. REGISTER_VOTE_AND_CHECK(vr, -1, false, false, 7); REGISTER_VOTE_AND_CHECK(vr, -1, false, false, 7); for (int i = 2; i < 8; i++) { REGISTER_VOTE_AND_CHECK(vr, 1, false, false, 7); } // Now confidence will increase as long as we vote no. for (int i = 8; i < AVALANCHE_FINALIZATION_SCORE; i++) { REGISTER_VOTE_AND_CHECK(vr, 1, false, false, i); } // The next vote will finalize the decision. REGISTER_VOTE_AND_CHECK(vr, 0, false, true, AVALANCHE_FINALIZATION_SCORE); // Check that inflight accounting work as expected. VoteRecord vrinflight(false); for (int i = 0; i < 2 * AVALANCHE_MAX_INFLIGHT_POLL; i++) { bool shouldPoll = vrinflight.shouldPoll(); BOOST_CHECK_EQUAL(shouldPoll, i < AVALANCHE_MAX_INFLIGHT_POLL); BOOST_CHECK_EQUAL(vrinflight.registerPoll(), shouldPoll); } // Clear various number of inflight requests and check everything behaves as // expected. for (int i = 1; i < AVALANCHE_MAX_INFLIGHT_POLL; i++) { vrinflight.clearInflightRequest(i); BOOST_CHECK(vrinflight.shouldPoll()); for (int j = 1; j < i; j++) { BOOST_CHECK(vrinflight.registerPoll()); BOOST_CHECK(vrinflight.shouldPoll()); } BOOST_CHECK(vrinflight.registerPoll()); BOOST_CHECK(!vrinflight.shouldPoll()); } } BOOST_AUTO_TEST_CASE(block_update) { CBlockIndex index; CBlockIndex *pindex = &index; - std::set status{ - AvalancheBlockUpdate::Status::Invalid, - AvalancheBlockUpdate::Status::Rejected, - AvalancheBlockUpdate::Status::Accepted, - AvalancheBlockUpdate::Status::Finalized, + std::set status{ + BlockUpdate::Status::Invalid, + BlockUpdate::Status::Rejected, + BlockUpdate::Status::Accepted, + BlockUpdate::Status::Finalized, }; for (auto s : status) { - AvalancheBlockUpdate abu(pindex, s); + BlockUpdate abu(pindex, s); BOOST_CHECK(abu.getBlockIndex() == pindex); BOOST_CHECK_EQUAL(abu.getStatus(), s); } } CService ip(uint32_t i) { struct in_addr s; s.s_addr = i; return CService(CNetAddr(s), Params().GetDefaultPort()); } CNode *ConnectNode(const Config &config, ServiceFlags nServices, PeerLogicValidation &peerLogic, CConnmanTest *connman) { static NodeId id = 0; CAddress addr(ip(GetRandInt(0xffffffff)), NODE_NONE); auto node = new CNode(id++, ServiceFlags(NODE_NETWORK), 0, INVALID_SOCKET, addr, 0, 0, CAddress(), "", /*fInboundIn=*/false); node->SetSendVersion(PROTOCOL_VERSION); node->nServices = nServices; peerLogic.InitializeNode(config, node); node->nVersion = 1; node->fSuccessfullyConnected = true; connman->AddNode(*node); return node; } -std::array ConnectNodes(const Config &config, AvalancheProcessor &p, +std::array ConnectNodes(const Config &config, Processor &p, ServiceFlags nServices, PeerLogicValidation &peerLogic, CConnmanTest *connman) { PeerManager &pm = AvalancheTest::getPeerManager(p); PeerId pid = pm.addPeer(100); std::array nodes; for (CNode *&n : nodes) { n = ConnectNode(config, nServices, peerLogic, connman); BOOST_CHECK(pm.addNodeToPeer(pid, n->GetId(), CPubKey())); } return nodes; } static AvalancheResponse next(AvalancheResponse &r) { auto copy = r; r = {r.getRound() + 1, r.getCooldown(), r.GetVotes()}; return copy; } BOOST_AUTO_TEST_CASE(block_register) { const Config &config = GetConfig(); auto connman = std::make_unique(config, 0x1337, 0x1337); auto peerLogic = std::make_unique( connman.get(), nullptr, *m_node.scheduler, false); - AvalancheProcessor p(connman.get()); - std::vector updates; + Processor p(connman.get()); + std::vector updates; CBlock block = CreateAndProcessBlock({}, CScript()); const BlockHash blockHash = block.GetHash(); const CBlockIndex *pindex; { LOCK(cs_main); pindex = LookupBlockIndex(blockHash); } // Create nodes that supports avalanche. auto avanodes = ConnectNodes(config, p, NODE_AVALANCHE, *peerLogic, connman.get()); // Querying for random block returns false. BOOST_CHECK(!p.isAccepted(pindex)); // Add a new block. Check it is added to the polls. BOOST_CHECK(p.addBlockToReconcile(pindex)); auto invs = AvalancheTest::getInvsForNextPoll(p); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK); BOOST_CHECK(invs[0].hash == blockHash); // Newly added blocks' state reflect the blockchain. BOOST_CHECK(p.isAccepted(pindex)); int nextNodeIndex = 0; auto registerNewVote = [&](const AvalancheResponse &resp) { AvalancheTest::runEventLoop(p); auto nodeid = avanodes[nextNodeIndex++ % avanodes.size()]->GetId(); BOOST_CHECK(p.registerVotes(nodeid, resp, updates)); }; // Let's vote for this block a few times. AvalancheResponse resp{0, 0, {AvalancheVote(0, blockHash)}}; for (int i = 0; i < 6; i++) { registerNewVote(next(resp)); BOOST_CHECK(p.isAccepted(pindex)); BOOST_CHECK_EQUAL(p.getConfidence(pindex), 0); BOOST_CHECK_EQUAL(updates.size(), 0); } // A single neutral vote do not change anything. resp = {AvalancheTest::getRound(p), 0, {AvalancheVote(-1, blockHash)}}; registerNewVote(next(resp)); BOOST_CHECK(p.isAccepted(pindex)); BOOST_CHECK_EQUAL(p.getConfidence(pindex), 0); BOOST_CHECK_EQUAL(updates.size(), 0); resp = {AvalancheTest::getRound(p), 0, {AvalancheVote(0, blockHash)}}; for (int i = 1; i < 7; i++) { registerNewVote(next(resp)); BOOST_CHECK(p.isAccepted(pindex)); BOOST_CHECK_EQUAL(p.getConfidence(pindex), i); BOOST_CHECK_EQUAL(updates.size(), 0); } // Two neutral votes will stall progress. resp = {AvalancheTest::getRound(p), 0, {AvalancheVote(-1, blockHash)}}; registerNewVote(next(resp)); BOOST_CHECK(p.isAccepted(pindex)); BOOST_CHECK_EQUAL(p.getConfidence(pindex), 6); BOOST_CHECK_EQUAL(updates.size(), 0); registerNewVote(next(resp)); BOOST_CHECK(p.isAccepted(pindex)); BOOST_CHECK_EQUAL(p.getConfidence(pindex), 6); BOOST_CHECK_EQUAL(updates.size(), 0); resp = {AvalancheTest::getRound(p), 0, {AvalancheVote(0, blockHash)}}; for (int i = 2; i < 8; i++) { registerNewVote(next(resp)); BOOST_CHECK(p.isAccepted(pindex)); BOOST_CHECK_EQUAL(p.getConfidence(pindex), 6); BOOST_CHECK_EQUAL(updates.size(), 0); } // We vote for it numerous times to finalize it. for (int i = 7; i < AVALANCHE_FINALIZATION_SCORE; i++) { registerNewVote(next(resp)); BOOST_CHECK(p.isAccepted(pindex)); BOOST_CHECK_EQUAL(p.getConfidence(pindex), i); BOOST_CHECK_EQUAL(updates.size(), 0); } // As long as it is not finalized, we poll. invs = AvalancheTest::getInvsForNextPoll(p); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK); BOOST_CHECK(invs[0].hash == blockHash); // Now finalize the decision. registerNewVote(next(resp)); BOOST_CHECK_EQUAL(updates.size(), 1); BOOST_CHECK(updates[0].getBlockIndex() == pindex); - BOOST_CHECK_EQUAL(updates[0].getStatus(), - AvalancheBlockUpdate::Status::Finalized); + BOOST_CHECK_EQUAL(updates[0].getStatus(), BlockUpdate::Status::Finalized); updates = {}; // Once the decision is finalized, there is no poll for it. invs = AvalancheTest::getInvsForNextPoll(p); BOOST_CHECK_EQUAL(invs.size(), 0); // Now let's undo this and finalize rejection. BOOST_CHECK(p.addBlockToReconcile(pindex)); invs = AvalancheTest::getInvsForNextPoll(p); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK); BOOST_CHECK(invs[0].hash == blockHash); resp = {AvalancheTest::getRound(p), 0, {AvalancheVote(1, blockHash)}}; for (int i = 0; i < 6; i++) { registerNewVote(next(resp)); BOOST_CHECK(p.isAccepted(pindex)); BOOST_CHECK_EQUAL(updates.size(), 0); } // Now the state will flip. registerNewVote(next(resp)); BOOST_CHECK(!p.isAccepted(pindex)); BOOST_CHECK_EQUAL(updates.size(), 1); BOOST_CHECK(updates[0].getBlockIndex() == pindex); - BOOST_CHECK_EQUAL(updates[0].getStatus(), - AvalancheBlockUpdate::Status::Rejected); + BOOST_CHECK_EQUAL(updates[0].getStatus(), BlockUpdate::Status::Rejected); updates = {}; // Now it is rejected, but we can vote for it numerous times. for (int i = 1; i < AVALANCHE_FINALIZATION_SCORE; i++) { registerNewVote(next(resp)); BOOST_CHECK(!p.isAccepted(pindex)); BOOST_CHECK_EQUAL(updates.size(), 0); } // As long as it is not finalized, we poll. invs = AvalancheTest::getInvsForNextPoll(p); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK); BOOST_CHECK(invs[0].hash == blockHash); // Now finalize the decision. registerNewVote(next(resp)); BOOST_CHECK(!p.isAccepted(pindex)); BOOST_CHECK_EQUAL(updates.size(), 1); BOOST_CHECK(updates[0].getBlockIndex() == pindex); - BOOST_CHECK_EQUAL(updates[0].getStatus(), - AvalancheBlockUpdate::Status::Invalid); + BOOST_CHECK_EQUAL(updates[0].getStatus(), BlockUpdate::Status::Invalid); updates = {}; // Once the decision is finalized, there is no poll for it. invs = AvalancheTest::getInvsForNextPoll(p); BOOST_CHECK_EQUAL(invs.size(), 0); // Adding the block twice does nothing. BOOST_CHECK(p.addBlockToReconcile(pindex)); BOOST_CHECK(!p.addBlockToReconcile(pindex)); BOOST_CHECK(p.isAccepted(pindex)); connman->ClearNodes(); } BOOST_AUTO_TEST_CASE(multi_block_register) { const Config &config = GetConfig(); auto connman = std::make_unique(config, 0x1337, 0x1337); auto peerLogic = std::make_unique( connman.get(), nullptr, *m_node.scheduler, false); - AvalancheProcessor p(connman.get()); + Processor p(connman.get()); CBlockIndex indexA, indexB; - std::vector updates; + std::vector updates; // Create several nodes that support avalanche. auto avanodes = ConnectNodes(config, p, NODE_AVALANCHE, *peerLogic, connman.get()); // Make sure the block has a hash. CBlock blockA = CreateAndProcessBlock({}, CScript()); const BlockHash blockHashA = blockA.GetHash(); CBlock blockB = CreateAndProcessBlock({}, CScript()); const BlockHash blockHashB = blockB.GetHash(); const CBlockIndex *pindexA; const CBlockIndex *pindexB; { LOCK(cs_main); pindexA = LookupBlockIndex(blockHashA); pindexB = LookupBlockIndex(blockHashB); } // Querying for random block returns false. BOOST_CHECK(!p.isAccepted(pindexA)); BOOST_CHECK(!p.isAccepted(pindexB)); // Start voting on block A. BOOST_CHECK(p.addBlockToReconcile(pindexA)); auto invs = AvalancheTest::getInvsForNextPoll(p); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK); BOOST_CHECK(invs[0].hash == blockHashA); uint64_t round = AvalancheTest::getRound(p); AvalancheTest::runEventLoop(p); BOOST_CHECK(p.registerVotes(avanodes[0]->GetId(), {round, 0, {AvalancheVote(0, blockHashA)}}, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); // Start voting on block B after one vote. AvalancheResponse resp{ round + 1, 0, {AvalancheVote(0, blockHashB), AvalancheVote(0, blockHashA)}}; BOOST_CHECK(p.addBlockToReconcile(pindexB)); invs = AvalancheTest::getInvsForNextPoll(p); BOOST_CHECK_EQUAL(invs.size(), 2); // Ensure B comes before A because it has accumulated more PoW. BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK); BOOST_CHECK(invs[0].hash == blockHashB); BOOST_CHECK_EQUAL(invs[1].type, MSG_BLOCK); BOOST_CHECK(invs[1].hash == blockHashA); // Let's vote for these blocks a few times. for (int i = 0; i < 4; i++) { NodeId nodeid = AvalancheTest::getSuitableNodeToQuery(p); AvalancheTest::runEventLoop(p); BOOST_CHECK(p.registerVotes(nodeid, next(resp), updates)); BOOST_CHECK_EQUAL(updates.size(), 0); } // Now it is accepted, but we can vote for it numerous times. for (int i = 0; i < AVALANCHE_FINALIZATION_SCORE; i++) { NodeId nodeid = AvalancheTest::getSuitableNodeToQuery(p); AvalancheTest::runEventLoop(p); BOOST_CHECK(p.registerVotes(nodeid, next(resp), updates)); BOOST_CHECK_EQUAL(updates.size(), 0); } // Running two iterration of the event loop so that vote gets triggered on A // and B. NodeId firstNodeid = AvalancheTest::getSuitableNodeToQuery(p); AvalancheTest::runEventLoop(p); NodeId secondNodeid = AvalancheTest::getSuitableNodeToQuery(p); AvalancheTest::runEventLoop(p); BOOST_CHECK(firstNodeid != secondNodeid); // Next vote will finalize block A. BOOST_CHECK(p.registerVotes(firstNodeid, next(resp), updates)); BOOST_CHECK_EQUAL(updates.size(), 1); BOOST_CHECK(updates[0].getBlockIndex() == pindexA); - BOOST_CHECK_EQUAL(updates[0].getStatus(), - AvalancheBlockUpdate::Status::Finalized); + BOOST_CHECK_EQUAL(updates[0].getStatus(), BlockUpdate::Status::Finalized); updates = {}; // We do not vote on A anymore. invs = AvalancheTest::getInvsForNextPoll(p); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK); BOOST_CHECK(invs[0].hash == blockHashB); // Next vote will finalize block B. BOOST_CHECK(p.registerVotes(secondNodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 1); BOOST_CHECK(updates[0].getBlockIndex() == pindexB); - BOOST_CHECK_EQUAL(updates[0].getStatus(), - AvalancheBlockUpdate::Status::Finalized); + BOOST_CHECK_EQUAL(updates[0].getStatus(), BlockUpdate::Status::Finalized); updates = {}; // There is nothing left to vote on. invs = AvalancheTest::getInvsForNextPoll(p); BOOST_CHECK_EQUAL(invs.size(), 0); connman->ClearNodes(); } BOOST_AUTO_TEST_CASE(poll_and_response) { const Config &config = GetConfig(); auto connman = std::make_unique(config, 0x1337, 0x1337); auto peerLogic = std::make_unique( connman.get(), nullptr, *m_node.scheduler, false); - AvalancheProcessor p(connman.get()); + Processor p(connman.get()); - std::vector updates; + std::vector updates; CBlock block = CreateAndProcessBlock({}, CScript()); const BlockHash blockHash = block.GetHash(); const CBlockIndex *pindex; { LOCK(cs_main); pindex = LookupBlockIndex(blockHash); } // There is no node to query. BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), NO_NODE); // Create a node that supports avalanche and one that doesn't. ConnectNode(config, NODE_NONE, *peerLogic, connman.get()); auto avanode = ConnectNode(config, NODE_AVALANCHE, *peerLogic, connman.get()); NodeId avanodeid = avanode->GetId(); BOOST_CHECK(p.addPeer(avanodeid, 100, CPubKey())); // It returns the avalanche peer. BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid); // Register a block and check it is added to the list of elements to poll. BOOST_CHECK(p.addBlockToReconcile(pindex)); auto invs = AvalancheTest::getInvsForNextPoll(p); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK); BOOST_CHECK(invs[0].hash == blockHash); // Trigger a poll on avanode. uint64_t round = AvalancheTest::getRound(p); AvalancheTest::runEventLoop(p); // There is no more suitable peer available, so return nothing. BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), NO_NODE); // Respond to the request. AvalancheResponse resp = {round, 0, {AvalancheVote(0, blockHash)}}; BOOST_CHECK(p.registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); // Now that avanode fullfilled his request, it is added back to the list of // queriable nodes. BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid); // Sending a response when not polled fails. BOOST_CHECK(!p.registerVotes(avanodeid, next(resp), updates)); BOOST_CHECK_EQUAL(updates.size(), 0); // Trigger a poll on avanode. round = AvalancheTest::getRound(p); AvalancheTest::runEventLoop(p); BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), NO_NODE); // Sending responses that do not match the request also fails. // 1. Too many results. resp = { round, 0, {AvalancheVote(0, blockHash), AvalancheVote(0, blockHash)}}; AvalancheTest::runEventLoop(p); BOOST_CHECK(!p.registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid); // 2. Not enough results. resp = {AvalancheTest::getRound(p), 0, {}}; AvalancheTest::runEventLoop(p); BOOST_CHECK(!p.registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid); // 3. Do not match the poll. resp = {AvalancheTest::getRound(p), 0, {AvalancheVote()}}; AvalancheTest::runEventLoop(p); BOOST_CHECK(!p.registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid); // 4. Invalid round count. Request is not discarded. uint64_t queryRound = AvalancheTest::getRound(p); AvalancheTest::runEventLoop(p); resp = {queryRound + 1, 0, {AvalancheVote()}}; BOOST_CHECK(!p.registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); resp = {queryRound - 1, 0, {AvalancheVote()}}; BOOST_CHECK(!p.registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); // 5. Making request for invalid nodes do not work. Request is not // discarded. resp = {queryRound, 0, {AvalancheVote(0, blockHash)}}; BOOST_CHECK(!p.registerVotes(avanodeid + 1234, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); // Proper response gets processed and avanode is available again. resp = {queryRound, 0, {AvalancheVote(0, blockHash)}}; BOOST_CHECK(p.registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid); // Out of order response are rejected. CBlock block2 = CreateAndProcessBlock({}, CScript()); const BlockHash blockHash2 = block2.GetHash(); CBlockIndex *pindex2; { LOCK(cs_main); pindex2 = LookupBlockIndex(blockHash2); } BOOST_CHECK(p.addBlockToReconcile(pindex2)); resp = {AvalancheTest::getRound(p), 0, {AvalancheVote(0, blockHash), AvalancheVote(0, blockHash2)}}; AvalancheTest::runEventLoop(p); BOOST_CHECK(!p.registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid); // But they are accepted in order. resp = {AvalancheTest::getRound(p), 0, {AvalancheVote(0, blockHash2), AvalancheVote(0, blockHash)}}; AvalancheTest::runEventLoop(p); BOOST_CHECK(p.registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid); // When a block is marked invalid, stop polling. pindex2->nStatus = pindex2->nStatus.withFailed(); resp = {AvalancheTest::getRound(p), 0, {AvalancheVote(0, blockHash)}}; AvalancheTest::runEventLoop(p); BOOST_CHECK(p.registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid); connman->ClearNodes(); } BOOST_AUTO_TEST_CASE(poll_inflight_timeout, *boost::unit_test::timeout(60)) { const Config &config = GetConfig(); auto connman = std::make_unique(config, 0x1337, 0x1337); auto peerLogic = std::make_unique( connman.get(), nullptr, *m_node.scheduler, false); - AvalancheProcessor p(connman.get()); + Processor p(connman.get()); - std::vector updates; + std::vector updates; CBlock block = CreateAndProcessBlock({}, CScript()); const BlockHash blockHash = block.GetHash(); const CBlockIndex *pindex; { LOCK(cs_main); pindex = LookupBlockIndex(blockHash); } // Add the block BOOST_CHECK(p.addBlockToReconcile(pindex)); // Create a node that supports avalanche. auto avanode = ConnectNode(config, NODE_AVALANCHE, *peerLogic, connman.get()); NodeId avanodeid = avanode->GetId(); BOOST_CHECK(p.addPeer(avanodeid, 100, CPubKey())); // Expire requests after some time. auto queryTimeDuration = std::chrono::milliseconds(10); p.setQueryTimeoutDuration(queryTimeDuration); for (int i = 0; i < 10; i++) { AvalancheResponse resp = { AvalancheTest::getRound(p), 0, {AvalancheVote(0, blockHash)}}; auto start = std::chrono::steady_clock::now(); AvalancheTest::runEventLoop(p); // We cannot guarantee that we'll wait for just 1ms, so we have to bail // if we aren't within the proper time range. std::this_thread::sleep_for(std::chrono::milliseconds(1)); AvalancheTest::runEventLoop(p); bool ret = p.registerVotes(avanodeid, next(resp), updates); if (std::chrono::steady_clock::now() > start + queryTimeDuration) { // We waited for too long, bail. Because we can't know for sure when // previous steps ran, ret is not deterministic and we do not check // it. i--; continue; } // We are within time bounds, so the vote should have worked. BOOST_CHECK(ret); // Now try again but wait for expiration. AvalancheTest::runEventLoop(p); std::this_thread::sleep_for(queryTimeDuration); AvalancheTest::runEventLoop(p); BOOST_CHECK(!p.registerVotes(avanodeid, next(resp), updates)); } connman->ClearNodes(); } BOOST_AUTO_TEST_CASE(poll_inflight_count) { const Config &config = GetConfig(); auto connman = std::make_unique(config, 0x1337, 0x1337); auto peerLogic = std::make_unique( connman.get(), nullptr, *m_node.scheduler, false); - AvalancheProcessor p(connman.get()); + Processor p(connman.get()); // Create enough nodes so that we run into the inflight request limit. PeerManager &pm = AvalancheTest::getPeerManager(p); PeerId pid = pm.addPeer(100); std::array nodes; for (auto &n : nodes) { n = ConnectNode(config, NODE_AVALANCHE, *peerLogic, connman.get()); BOOST_CHECK(pm.addNodeToPeer(pid, n->GetId(), CPubKey())); } // Add a block to poll CBlock block = CreateAndProcessBlock({}, CScript()); const BlockHash blockHash = block.GetHash(); const CBlockIndex *pindex; { LOCK(cs_main); pindex = LookupBlockIndex(blockHash); } BOOST_CHECK(p.addBlockToReconcile(pindex)); // Ensure there are enough requests in flight. std::map node_round_map; for (int i = 0; i < AVALANCHE_MAX_INFLIGHT_POLL; i++) { NodeId nodeid = AvalancheTest::getSuitableNodeToQuery(p); BOOST_CHECK(node_round_map.find(nodeid) == node_round_map.end()); node_round_map[nodeid] = AvalancheTest::getRound(p); auto invs = AvalancheTest::getInvsForNextPoll(p); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK); BOOST_CHECK(invs[0].hash == blockHash); AvalancheTest::runEventLoop(p); } // Now that we have enough in flight requests, we shouldn't poll. auto suitablenodeid = AvalancheTest::getSuitableNodeToQuery(p); BOOST_CHECK(suitablenodeid != NO_NODE); auto invs = AvalancheTest::getInvsForNextPoll(p); BOOST_CHECK_EQUAL(invs.size(), 0); AvalancheTest::runEventLoop(p); BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), suitablenodeid); - std::vector updates; + std::vector updates; // Send one response, now we can poll again. auto it = node_round_map.begin(); AvalancheResponse resp = {it->second, 0, {AvalancheVote(0, blockHash)}}; BOOST_CHECK(p.registerVotes(it->first, resp, updates)); node_round_map.erase(it); invs = AvalancheTest::getInvsForNextPoll(p); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK); BOOST_CHECK(invs[0].hash == blockHash); connman->ClearNodes(); } BOOST_AUTO_TEST_CASE(quorum_diversity) { const Config &config = GetConfig(); auto connman = std::make_unique(config, 0x1337, 0x1337); auto peerLogic = std::make_unique( connman.get(), nullptr, *m_node.scheduler, false); - AvalancheProcessor p(connman.get()); - std::vector updates; + Processor p(connman.get()); + std::vector updates; CBlock block = CreateAndProcessBlock({}, CScript()); const BlockHash blockHash = block.GetHash(); const CBlockIndex *pindex; { LOCK(cs_main); pindex = LookupBlockIndex(blockHash); } // Create nodes that supports avalanche. auto avanodes = ConnectNodes(config, p, NODE_AVALANCHE, *peerLogic, connman.get()); // Querying for random block returns false. BOOST_CHECK(!p.isAccepted(pindex)); // Add a new block. Check it is added to the polls. BOOST_CHECK(p.addBlockToReconcile(pindex)); // Do one valid round of voting. uint64_t round = AvalancheTest::getRound(p); AvalancheResponse resp{round, 0, {AvalancheVote(0, blockHash)}}; // Check that all nodes can vote. for (size_t i = 0; i < avanodes.size(); i++) { AvalancheTest::runEventLoop(p); BOOST_CHECK(p.registerVotes(avanodes[i]->GetId(), next(resp), updates)); } // Generate a query for every single node. const NodeId firstNodeId = AvalancheTest::getSuitableNodeToQuery(p); std::map node_round_map; round = AvalancheTest::getRound(p); for (size_t i = 0; i < avanodes.size(); i++) { NodeId nodeid = AvalancheTest::getSuitableNodeToQuery(p); BOOST_CHECK(node_round_map.find(nodeid) == node_round_map.end()); node_round_map[nodeid] = AvalancheTest::getRound(p); AvalancheTest::runEventLoop(p); } // Now only tge first node can vote. All others would be duplicate in the // quorum. auto confidence = p.getConfidence(pindex); BOOST_REQUIRE(confidence > 0); for (auto &pair : node_round_map) { NodeId nodeid = pair.first; uint64_t r = pair.second; if (nodeid == firstNodeId) { // Node 0 is the only one which can vote at this stage. round = r; continue; } BOOST_CHECK(p.registerVotes( nodeid, {r, 0, {AvalancheVote(0, blockHash)}}, updates)); BOOST_CHECK_EQUAL(p.getConfidence(pindex), confidence); } BOOST_CHECK(p.registerVotes( firstNodeId, {round, 0, {AvalancheVote(0, blockHash)}}, updates)); BOOST_CHECK_EQUAL(p.getConfidence(pindex), confidence + 1); connman->ClearNodes(); } BOOST_AUTO_TEST_CASE(event_loop) { const Config &config = GetConfig(); auto connman = std::make_unique(config, 0x1337, 0x1337); auto peerLogic = std::make_unique( connman.get(), nullptr, *m_node.scheduler, false); - AvalancheProcessor p(connman.get()); + Processor p(connman.get()); CScheduler s; CBlock block = CreateAndProcessBlock({}, CScript()); const BlockHash blockHash = block.GetHash(); const CBlockIndex *pindex; { LOCK(cs_main); pindex = LookupBlockIndex(blockHash); } // Starting the event loop. BOOST_CHECK(p.startEventLoop(s)); // There is one task planned in the next hour (our event loop). std::chrono::system_clock::time_point start, stop; BOOST_CHECK_EQUAL(s.getQueueInfo(start, stop), 1); // Starting twice doesn't start it twice. BOOST_CHECK(!p.startEventLoop(s)); // Start the scheduler thread. std::thread schedulerThread(std::bind(&CScheduler::serviceQueue, &s)); // Create a node that supports avalanche. auto avanode = ConnectNode(config, NODE_AVALANCHE, *peerLogic, connman.get()); NodeId nodeid = avanode->GetId(); BOOST_CHECK(p.addPeer(nodeid, 100, CPubKey())); // There is no query in flight at the moment. BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), nodeid); // Add a new block. Check it is added to the polls. uint64_t queryRound = AvalancheTest::getRound(p); BOOST_CHECK(p.addBlockToReconcile(pindex)); for (int i = 0; i < 60 * 1000; i++) { // Technically, this is a race condition, but this should do just fine // as we wait up to 1 minute for an event that should take 10ms. UninterruptibleSleep(std::chrono::milliseconds(1)); if (AvalancheTest::getRound(p) != queryRound) { break; } } // Check that we effectively got a request and not timed out. BOOST_CHECK(AvalancheTest::getRound(p) > queryRound); // Respond and check the cooldown time is respected. uint64_t responseRound = AvalancheTest::getRound(p); auto queryTime = std::chrono::steady_clock::now() + std::chrono::milliseconds(100); - std::vector updates; + std::vector updates; p.registerVotes(nodeid, {queryRound, 100, {AvalancheVote(0, blockHash)}}, updates); for (int i = 0; i < 10000; i++) { // We make sure that we do not get a request before queryTime. UninterruptibleSleep(std::chrono::milliseconds(1)); if (AvalancheTest::getRound(p) != responseRound) { BOOST_CHECK(std::chrono::steady_clock::now() > queryTime); break; } } // But we eventually get one. BOOST_CHECK(AvalancheTest::getRound(p) > responseRound); // Stop event loop. BOOST_CHECK(p.stopEventLoop()); // We don't have any task scheduled anymore. BOOST_CHECK_EQUAL(s.getQueueInfo(start, stop), 0); // Can't stop the event loop twice. BOOST_CHECK(!p.stopEventLoop()); // Wait for the scheduler to stop. s.stop(true); schedulerThread.join(); connman->ClearNodes(); } BOOST_AUTO_TEST_CASE(destructor) { CScheduler s; std::chrono::system_clock::time_point start, stop; // Start the scheduler thread. std::thread schedulerThread(std::bind(&CScheduler::serviceQueue, &s)); { - AvalancheProcessor p(m_node.connman.get()); + Processor p(m_node.connman.get()); BOOST_CHECK(p.startEventLoop(s)); BOOST_CHECK_EQUAL(s.getQueueInfo(start, stop), 1); } // Now that avalanche is destroyed, there is no more scheduled tasks. BOOST_CHECK_EQUAL(s.getQueueInfo(start, stop), 0); // Wait for the scheduler to stop. s.stop(true); schedulerThread.join(); } BOOST_AUTO_TEST_SUITE_END() diff --git a/src/init.cpp b/src/init.cpp index 14f90859f..9dbe8c7ed 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -1,2797 +1,2797 @@ // Copyright (c) 2009-2010 Satoshi Nakamoto // Copyright (c) 2009-2018 The Bitcoin Core developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #if defined(HAVE_CONFIG_H) #include #endif #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include