diff --git a/src/avalanche/peermanager.cpp b/src/avalanche/peermanager.cpp index d4e40c688..645b69411 100644 --- a/src/avalanche/peermanager.cpp +++ b/src/avalanche/peermanager.cpp @@ -1,459 +1,465 @@ // Copyright (c) 2020 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include #include #include #include #include // For ChainstateActive() #include namespace avalanche { bool PeerManager::addNode(NodeId nodeid, const std::shared_ptr &proof, const Delegation &delegation) { auto it = fetchOrCreatePeer(proof); if (it == peers.end()) { return false; } const PeerId peerid = it->peerid; DelegationState state; CPubKey pubkey; if (!delegation.verify(state, pubkey)) { return false; } auto nit = nodes.find(nodeid); if (nit == nodes.end()) { if (!nodes.emplace(nodeid, peerid, std::move(pubkey)).second) { return false; } } else { const PeerId oldpeerid = nit->peerid; if (!nodes.modify(nit, [&](Node &n) { n.peerid = peerid; n.pubkey = std::move(pubkey); })) { return false; } // We actually have this node already, we need to update it. bool success = removeNodeFromPeer(peers.find(oldpeerid)); assert(success); // Make sure it is not invalidated. it = peers.find(peerid); } bool success = addNodeToPeer(it); assert(success); return true; } bool PeerManager::addNodeToPeer(const PeerSet::iterator &it) { assert(it != peers.end()); return peers.modify(it, [&](Peer &p) { if (p.node_count++ > 0) { // We are done. return; } // We ned to allocate this peer. p.index = uint32_t(slots.size()); const uint32_t score = p.getScore(); const uint64_t start = slotCount; slots.emplace_back(start, score, it->peerid); slotCount = start + score; }); } bool PeerManager::removeNode(NodeId nodeid) { auto it = nodes.find(nodeid); if (it == nodes.end()) { return false; } const PeerId peerid = it->peerid; nodes.erase(it); // Keep the track of the reference count. bool success = removeNodeFromPeer(peers.find(peerid)); assert(success); return true; } bool PeerManager::removeNodeFromPeer(const PeerSet::iterator &it, uint32_t count) { assert(it != peers.end()); assert(count <= it->node_count); if (count == 0) { // This is a NOOP. return false; } const uint32_t new_count = it->node_count - count; if (!peers.modify(it, [&](Peer &p) { p.node_count = new_count; })) { return false; } if (new_count > 0) { // We are done. return true; } // There are no more node left, we need to cleanup. const size_t i = it->index; assert(i < slots.size()); if (i + 1 == slots.size()) { slots.pop_back(); slotCount = slots.empty() ? 0 : slots.back().getStop(); } else { fragmentation += slots[i].getScore(); slots[i] = slots[i].withPeerId(NO_PEER); } return true; } bool PeerManager::forNode(NodeId nodeid, std::function func) const { auto it = nodes.find(nodeid); return it != nodes.end() && func(*it); } bool PeerManager::updateNextRequestTime(NodeId nodeid, TimePoint timeout) { auto it = nodes.find(nodeid); if (it == nodes.end()) { return false; } return nodes.modify(it, [&](Node &n) { n.nextRequestTime = timeout; }); } NodeId PeerManager::selectNode() { for (int retry = 0; retry < SELECT_NODE_MAX_RETRY; retry++) { const PeerId p = selectPeer(); // If we cannot find a peer, it may be due to the fact that it is // unlikely due to high fragmentation, so compact and retry. if (p == NO_PEER) { compact(); continue; } // See if that peer has an available node. auto &nview = nodes.get(); auto it = nview.lower_bound(boost::make_tuple(p, TimePoint())); if (it != nview.end() && it->peerid == p && it->nextRequestTime <= std::chrono::steady_clock::now()) { return it->nodeid; } } return NO_NODE; } void PeerManager::updatedBlockTip() { std::vector invalidPeers; { LOCK(cs_main); const CCoinsViewCache &coins = ::ChainstateActive().CoinsTip(); for (const auto &p : peers) { ProofValidationState state; if (!p.proof->verify(state, coins)) { invalidPeers.push_back(p.peerid); } } } for (const auto &pid : invalidPeers) { removePeer(pid); } } PeerId PeerManager::getPeerId(const std::shared_ptr &proof) { auto it = fetchOrCreatePeer(proof); return it == peers.end() ? NO_PEER : it->peerid; } +std::shared_ptr PeerManager::getProof(const ProofId &proofid) const { + auto &pview = peers.get(); + auto it = pview.find(proofid); + return it == pview.end() ? nullptr : it->proof; +} + PeerManager::PeerSet::iterator PeerManager::fetchOrCreatePeer(const std::shared_ptr &proof) { { // Check if we already know of that peer. auto &pview = peers.get(); auto it = pview.find(proof->getId()); if (it != pview.end()) { return peers.project<0>(it); } } { // Reject invalid proof. LOCK(cs_main); const CCoinsViewCache &coins = ::ChainstateActive().CoinsTip(); ProofValidationState state; if (!proof->verify(state, coins)) { return peers.end(); } } // New peer means new peerid! const PeerId peerid = nextPeerId++; // Attach UTXOs to this proof. std::unordered_set conflicting_peerids; for (const auto &s : proof->getStakes()) { auto p = utxos.emplace(s.getStake().getUTXO(), peerid); if (!p.second) { // We have a collision with an existing proof. conflicting_peerids.insert(p.first->second); } } // For now, if there is a conflict, just ceanup the mess. if (conflicting_peerids.size() > 0) { for (const auto &s : proof->getStakes()) { auto it = utxos.find(s.getStake().getUTXO()); assert(it != utxos.end()); // We need to delete that one. if (it->second == peerid) { utxos.erase(it); } } return peers.end(); } // We have no peer for this proof, time to create it. auto inserted = peers.emplace(peerid, proof); assert(inserted.second); return inserted.first; } bool PeerManager::removePeer(const PeerId peerid) { auto it = peers.find(peerid); if (it == peers.end()) { return false; } // Remove all nodes from this peer. removeNodeFromPeer(it, it->node_count); // Remove nodes associated with this peer, unless their timeout is still // active. This ensure that we don't overquery them in case they are // subsequently added to another peer. auto &nview = nodes.get(); nview.erase(nview.lower_bound(boost::make_tuple(peerid, TimePoint())), nview.upper_bound(boost::make_tuple( peerid, std::chrono::steady_clock::now()))); // Release UTXOs attached to this proof. for (const auto &s : it->proof->getStakes()) { bool deleted = utxos.erase(s.getStake().getUTXO()) > 0; assert(deleted); } peers.erase(it); return true; } PeerId PeerManager::selectPeer() const { if (slots.empty() || slotCount == 0) { return NO_PEER; } const uint64_t max = slotCount; for (int retry = 0; retry < SELECT_PEER_MAX_RETRY; retry++) { size_t i = selectPeerImpl(slots, GetRand(max), max); if (i != NO_PEER) { return i; } } return NO_PEER; } uint64_t PeerManager::compact() { // There is nothing to compact. if (fragmentation == 0) { return 0; } std::vector newslots; newslots.reserve(peers.size()); uint64_t prevStop = 0; uint32_t i = 0; for (auto it = peers.begin(); it != peers.end(); it++) { if (it->node_count == 0) { continue; } newslots.emplace_back(prevStop, it->getScore(), it->peerid); prevStop = slots[i].getStop(); if (!peers.modify(it, [&](Peer &p) { p.index = i++; })) { return 0; } } slots = std::move(newslots); const uint64_t saved = slotCount - prevStop; slotCount = prevStop; fragmentation = 0; return saved; } bool PeerManager::verify() const { uint64_t prevStop = 0; for (size_t i = 0; i < slots.size(); i++) { const Slot &s = slots[i]; // Slots must be in correct order. if (s.getStart() < prevStop) { return false; } prevStop = s.getStop(); // If this is a dead slot, then nothing more needs to be checked. if (s.getPeerId() == NO_PEER) { continue; } // We have a live slot, verify index. auto it = peers.find(s.getPeerId()); if (it == peers.end() || it->index != i) { return false; } } for (const auto &p : peers) { // Count node attached to this peer. const auto count_nodes = [&]() { size_t count = 0; auto &nview = nodes.get(); auto begin = nview.lower_bound(boost::make_tuple(p.peerid, TimePoint())); auto end = nview.upper_bound(boost::make_tuple(p.peerid + 1, TimePoint())); for (auto it = begin; it != end; ++it) { count++; } return count; }; if (p.node_count != count_nodes()) { return false; } // If there are no nodes attached to this peer, then we are done. if (p.node_count == 0) { continue; } // The index must point to a slot refering to this peer. if (p.index >= slots.size() || slots[p.index].getPeerId() != p.peerid) { return false; } // If the score do not match, same thing. if (slots[p.index].getScore() != p.getScore()) { return false; } } return true; } PeerId selectPeerImpl(const std::vector &slots, const uint64_t slot, const uint64_t max) { assert(slot <= max); size_t begin = 0, end = slots.size(); uint64_t bottom = 0, top = max; // Try to find the slot using dichotomic search. while ((end - begin) > 8) { // The slot we picked in not allocated. if (slot < bottom || slot >= top) { return NO_PEER; } // Guesstimate the position of the slot. size_t i = begin + ((slot - bottom) * (end - begin) / (top - bottom)); assert(begin <= i && i < end); // We have a match. if (slots[i].contains(slot)) { return slots[i].getPeerId(); } // We undershooted. if (slots[i].precedes(slot)) { begin = i + 1; if (begin >= end) { return NO_PEER; } bottom = slots[begin].getStart(); continue; } // We overshooted. if (slots[i].follows(slot)) { end = i; top = slots[end].getStart(); continue; } // We have an unalocated slot. return NO_PEER; } // Enough of that nonsense, let fallback to linear search. for (size_t i = begin; i < end; i++) { // We have a match. if (slots[i].contains(slot)) { return slots[i].getPeerId(); } } // We failed to find a slot, retry. return NO_PEER; } std::vector PeerManager::getPeers() const { std::vector vpeers; for (auto &it : peers.get<0>()) { vpeers.emplace_back(it); } return vpeers; } std::vector PeerManager::getNodeIdsForPeer(PeerId peerId) const { std::vector nodeids; auto &nview = nodes.get(); auto nodeRange = nview.equal_range(peerId); for (auto it = nodeRange.first; it != nodeRange.second; ++it) { nodeids.emplace_back(it->nodeid); } return nodeids; } } // namespace avalanche diff --git a/src/avalanche/peermanager.h b/src/avalanche/peermanager.h index 4ff4b2340..1215bcde5 100644 --- a/src/avalanche/peermanager.h +++ b/src/avalanche/peermanager.h @@ -1,199 +1,201 @@ // Copyright (c) 2020 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #ifndef BITCOIN_AVALANCHE_PEERMANAGER_H #define BITCOIN_AVALANCHE_PEERMANAGER_H #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace avalanche { class Delegation; struct Slot { private: uint64_t start; uint32_t score; PeerId peerid; public: Slot(uint64_t startIn, uint32_t scoreIn, PeerId peeridIn) : start(startIn), score(scoreIn), peerid(peeridIn) {} Slot withStart(uint64_t startIn) const { return Slot(startIn, score, peerid); } Slot withScore(uint64_t scoreIn) const { return Slot(start, scoreIn, peerid); } Slot withPeerId(PeerId peeridIn) const { return Slot(start, score, peeridIn); } uint64_t getStart() const { return start; } uint64_t getStop() const { return start + score; } uint32_t getScore() const { return score; } PeerId getPeerId() const { return peerid; } bool contains(uint64_t slot) const { return getStart() <= slot && slot < getStop(); } bool precedes(uint64_t slot) const { return slot >= getStop(); } bool follows(uint64_t slot) const { return getStart() > slot; } }; struct Peer { PeerId peerid; uint32_t index = -1; uint32_t node_count = 0; std::shared_ptr proof; Peer(PeerId peerid_, std::shared_ptr proof_) : peerid(peerid_), proof(std::move(proof_)) {} const ProofId &getProofId() const { return proof->getId(); } uint32_t getScore() const { return proof->getScore(); } }; struct proof_index { using result_type = ProofId; result_type operator()(const Peer &p) const { return p.proof->getId(); } }; struct next_request_time {}; class PeerManager { std::vector slots; uint64_t slotCount = 0; uint64_t fragmentation = 0; /** * Several nodes can make an avalanche peer. In this case, all nodes are * considered interchangeable parts of the same peer. */ using PeerSet = boost::multi_index_container< Peer, boost::multi_index::indexed_by< // index by peerid boost::multi_index::hashed_unique< boost::multi_index::member>, // index by proof boost::multi_index::hashed_unique< boost::multi_index::tag, proof_index, SaltedProofIdHasher>>>; PeerId nextPeerId = 0; PeerSet peers; std::unordered_map utxos; using NodeSet = boost::multi_index_container< Node, boost::multi_index::indexed_by< // index by nodeid boost::multi_index::hashed_unique< boost::multi_index::member>, // sorted by peerid/nextRequestTime boost::multi_index::ordered_non_unique< boost::multi_index::tag, boost::multi_index::composite_key< Node, boost::multi_index::member, boost::multi_index::member>>>>; NodeSet nodes; static constexpr int SELECT_PEER_MAX_RETRY = 3; static constexpr int SELECT_NODE_MAX_RETRY = 3; public: /** * Node API. */ bool addNode(NodeId nodeid, const std::shared_ptr &proof, const Delegation &delegation); bool removeNode(NodeId nodeid); bool forNode(NodeId nodeid, std::function func) const; bool updateNextRequestTime(NodeId nodeid, TimePoint timeout); /** * Randomly select a node to poll. */ NodeId selectNode(); /** * Update the peer set when a new block is connected. */ void updatedBlockTip(); /**************************************************** * Functions which are public for testing purposes. * ****************************************************/ /** * Provide the PeerId associated with the given proof. If the peer does not * exist, then it is created. */ PeerId getPeerId(const std::shared_ptr &proof); /** * Remove an existing peer. */ bool removePeer(const PeerId peerid); /** * Randomly select a peer to poll. */ PeerId selectPeer() const; /** * Trigger maintenance of internal data structures. * Returns how much slot space was saved after compaction. */ uint64_t compact(); /** * Perform consistency check on internal data structures. */ bool verify() const; // Accessors. uint64_t getSlotCount() const { return slotCount; } uint64_t getFragmentation() const { return fragmentation; } std::vector getPeers() const; std::vector getNodeIdsForPeer(PeerId peerId) const; + std::shared_ptr getProof(const ProofId &proofid) const; + private: PeerSet::iterator fetchOrCreatePeer(const std::shared_ptr &proof); bool addNodeToPeer(const PeerSet::iterator &it); bool removeNodeFromPeer(const PeerSet::iterator &it, uint32_t count = 1); }; /** * This is an internal method that is exposed for testing purposes. */ PeerId selectPeerImpl(const std::vector &slots, const uint64_t slot, const uint64_t max); } // namespace avalanche #endif // BITCOIN_AVALANCHE_PEERMANAGER_H diff --git a/src/avalanche/processor.cpp b/src/avalanche/processor.cpp index 773323a52..5742de421 100644 --- a/src/avalanche/processor.cpp +++ b/src/avalanche/processor.cpp @@ -1,680 +1,691 @@ // Copyright (c) 2018-2019 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include #include #include #include #include #include // For DecodeSecret #include // For ::PeerManager #include #include #include #include #include #include #include #include /** * Run the avalanche event loop every 10ms. */ static constexpr std::chrono::milliseconds AVALANCHE_TIME_STEP{10}; // Unfortunately, the bitcoind codebase is full of global and we are kinda // forced into it here. std::unique_ptr g_avalanche; namespace avalanche { bool VoteRecord::registerVote(NodeId nodeid, uint32_t error) { // We just got a new vote, so there is one less inflight request. clearInflightRequest(); // We want to avoid having the same node voting twice in a quorum. if (!addNodeToQuorum(nodeid)) { return false; } /** * The result of the vote is determined from the error code. If the error * code is 0, there is no error and therefore the vote is yes. If there is * an error, we check the most significant bit to decide if the vote is a no * (for instance, the block is invalid) or is the vote inconclusive (for * instance, the queried node does not have the block yet). */ votes = (votes << 1) | (error == 0); consider = (consider << 1) | (int32_t(error) >= 0); /** * We compute the number of yes and/or no votes as follow: * * votes: 1010 * consider: 1100 * * yes votes: 1000 using votes & consider * no votes: 0100 using ~votes & consider */ bool yes = countBits(votes & consider & 0xff) > 6; if (!yes) { bool no = countBits(~votes & consider & 0xff) > 6; if (!no) { // The round is inconclusive. return false; } } // If the round is in agreement with previous rounds, increase confidence. if (isAccepted() == yes) { confidence += 2; return getConfidence() == AVALANCHE_FINALIZATION_SCORE; } // The round changed our state. We reset the confidence. confidence = yes; return true; } bool VoteRecord::addNodeToQuorum(NodeId nodeid) { if (nodeid == NO_NODE) { // Helpful for testing. return true; } // MMIX Linear Congruent Generator. const uint64_t r1 = 6364136223846793005 * uint64_t(nodeid) + 1442695040888963407; // Fibonacci hashing. const uint64_t r2 = 11400714819323198485ull * (nodeid ^ seed); // Combine and extract hash. const uint16_t h = (r1 + r2) >> 48; /** * Check if the node is in the filter. */ for (size_t i = 1; i < nodeFilter.size(); i++) { if (nodeFilter[(successfulVotes + i) % nodeFilter.size()] == h) { return false; } } /** * Add the node which just voted to the filter. */ nodeFilter[successfulVotes % nodeFilter.size()] = h; successfulVotes++; return true; } bool VoteRecord::registerPoll() const { uint8_t count = inflight.load(); while (count < AVALANCHE_MAX_INFLIGHT_POLL) { if (inflight.compare_exchange_weak(count, count + 1)) { return true; } } return false; } static bool IsWorthPolling(const CBlockIndex *pindex) { AssertLockHeld(cs_main); if (pindex->nStatus.isInvalid()) { // No point polling invalid blocks. return false; } if (::ChainstateActive().IsBlockFinalized(pindex)) { // There is no point polling finalized block. return false; } return true; } struct Processor::PeerData { std::shared_ptr proof; Delegation delegation; }; class Processor::NotificationsHandler : public interfaces::Chain::Notifications { Processor *m_processor; public: NotificationsHandler(Processor *p) : m_processor(p) {} void updatedBlockTip() override { LOCK(m_processor->cs_peerManager); if (m_processor->mustRegisterProof && !::ChainstateActive().IsInitialBlockDownload()) { m_processor->peerManager->getPeerId(m_processor->peerData->proof); m_processor->mustRegisterProof = false; } m_processor->peerManager->updatedBlockTip(); } }; Processor::Processor(interfaces::Chain &chain, CConnman *connmanIn, NodePeerManager *nodePeerManagerIn, std::unique_ptr peerDataIn, CKey sessionKeyIn) : connman(connmanIn), nodePeerManager(nodePeerManagerIn), queryTimeoutDuration(AVALANCHE_DEFAULT_QUERY_TIMEOUT), round(0), peerManager(std::make_unique()), peerData(std::move(peerDataIn)), sessionKey(std::move(sessionKeyIn)), // Schedule proof registration at the first new block after IBD. // FIXME: get rid of this flag mustRegisterProof(!!peerData) { // Make sure we get notified of chain state changes. chainNotificationsHandler = chain.handleNotifications(std::make_shared(this)); } Processor::~Processor() { chainNotificationsHandler.reset(); stopEventLoop(); } std::unique_ptr Processor::MakeProcessor(const ArgsManager &argsman, interfaces::Chain &chain, CConnman *connman, NodePeerManager *nodePeerManager, bilingual_str &error) { std::unique_ptr peerData; CKey masterKey; CKey sessionKey; if (argsman.IsArgSet("-avasessionkey")) { sessionKey = DecodeSecret(argsman.GetArg("-avasessionkey", "")); if (!sessionKey.IsValid()) { error = _("the avalanche session key is invalid"); return nullptr; } } else { // Pick a random key for the session. sessionKey.MakeNewKey(true); } if (argsman.IsArgSet("-avaproof")) { if (!argsman.IsArgSet("-avamasterkey")) { error = _( "the avalanche master key is missing for the avalanche proof"); return nullptr; } masterKey = DecodeSecret(argsman.GetArg("-avamasterkey", "")); if (!masterKey.IsValid()) { error = _("the avalanche master key is invalid"); return nullptr; } peerData = std::make_unique(); peerData->proof = std::make_shared(); if (!Proof::FromHex(*peerData->proof, argsman.GetArg("-avaproof", ""), error)) { // error is set by FromHex return nullptr; } ProofValidationState proof_state; if (!peerData->proof->verify(proof_state)) { switch (proof_state.GetResult()) { case ProofValidationResult::NO_STAKE: error = _("the avalanche proof has no stake"); return nullptr; case ProofValidationResult::DUST_THRESOLD: error = _("the avalanche proof stake is too low"); return nullptr; case ProofValidationResult::DUPLICATE_STAKE: error = _("the avalanche proof has duplicated stake"); return nullptr; case ProofValidationResult::INVALID_SIGNATURE: error = _("the avalanche proof has invalid stake signatures"); return nullptr; case ProofValidationResult::TOO_MANY_UTXOS: error = strprintf( _("the avalanche proof has too many utxos (max: %u)"), AVALANCHE_MAX_PROOF_STAKES); return nullptr; default: error = _("the avalanche proof is invalid"); return nullptr; } } // Generate the delegation to the session key. DelegationBuilder dgb(*peerData->proof); if (sessionKey.GetPubKey() != peerData->proof->getMaster()) { dgb.addLevel(masterKey, sessionKey.GetPubKey()); } peerData->delegation = dgb.build(); } // We can't use std::make_unique with a private constructor return std::unique_ptr( new Processor(chain, connman, nodePeerManager, std::move(peerData), std::move(sessionKey))); } bool Processor::addBlockToReconcile(const CBlockIndex *pindex) { bool isAccepted; { LOCK(cs_main); if (!IsWorthPolling(pindex)) { // There is no point polling this block. return false; } isAccepted = ::ChainActive().Contains(pindex); } return vote_records.getWriteView() ->insert(std::make_pair(pindex, VoteRecord(isAccepted))) .second; } bool Processor::isAccepted(const CBlockIndex *pindex) const { auto r = vote_records.getReadView(); auto it = r->find(pindex); if (it == r.end()) { return false; } return it->second.isAccepted(); } int Processor::getConfidence(const CBlockIndex *pindex) const { auto r = vote_records.getReadView(); auto it = r->find(pindex); if (it == r.end()) { return -1; } return it->second.getConfidence(); } namespace { /** * When using TCP, we need to sign all messages as the transport layer is * not secure. */ class TCPResponse { Response response; SchnorrSig sig; public: TCPResponse(Response responseIn, const CKey &key) : response(std::move(responseIn)) { CHashWriter hasher(SER_GETHASH, 0); hasher << response; const uint256 hash = hasher.GetHash(); // Now let's sign! if (!key.SignSchnorr(hash, sig)) { sig.fill(0); } } // serialization support SERIALIZE_METHODS(TCPResponse, obj) { READWRITE(obj.response, obj.sig); } }; } // namespace void Processor::sendResponse(CNode *pfrom, Response response) const { connman->PushMessage( pfrom, CNetMsgMaker(pfrom->GetCommonVersion()) .Make(NetMsgType::AVARESPONSE, TCPResponse(std::move(response), sessionKey))); } bool Processor::registerVotes(NodeId nodeid, const Response &response, std::vector &updates) { { // Save the time at which we can query again. LOCK(cs_peerManager); // FIXME: This will override the time even when we received an old stale // message. This should check that the message is indeed the most up to // date one before updating the time. peerManager->updateNextRequestTime( nodeid, std::chrono::steady_clock::now() + std::chrono::milliseconds(response.getCooldown())); } std::vector invs; { // Check that the query exists. auto w = queries.getWriteView(); auto it = w->find(std::make_tuple(nodeid, response.getRound())); if (it == w.end()) { nodePeerManager->Misbehaving(nodeid, 2, "unexpected-ava-response"); return false; } invs = std::move(it->invs); w->erase(it); } // Verify that the request and the vote are consistent. const std::vector &votes = response.GetVotes(); size_t size = invs.size(); if (votes.size() != size) { nodePeerManager->Misbehaving(nodeid, 100, "invalid-ava-response-size"); return false; } for (size_t i = 0; i < size; i++) { if (invs[i].hash != votes[i].GetHash()) { nodePeerManager->Misbehaving(nodeid, 100, "invalid-ava-response-content"); return false; } } std::map responseIndex; { LOCK(cs_main); for (const auto &v : votes) { auto pindex = LookupBlockIndex(BlockHash(v.GetHash())); if (!pindex) { // This should not happen, but just in case... continue; } if (!IsWorthPolling(pindex)) { // There is no point polling this block. continue; } responseIndex.insert(std::make_pair(pindex, v)); } } { // Register votes. auto w = vote_records.getWriteView(); for (const auto &p : responseIndex) { CBlockIndex *pindex = p.first; const Vote &v = p.second; auto it = w->find(pindex); if (it == w.end()) { // We are not voting on that item anymore. continue; } auto &vr = it->second; if (!vr.registerVote(nodeid, v.GetError())) { // This vote did not provide any extra information, move on. continue; } if (!vr.hasFinalized()) { // This item has note been finalized, so we have nothing more to // do. updates.emplace_back( pindex, vr.isAccepted() ? BlockUpdate::Status::Accepted : BlockUpdate::Status::Rejected); continue; } // We just finalized a vote. If it is valid, then let the caller // know. Either way, remove the item from the map. updates.emplace_back(pindex, vr.isAccepted() ? BlockUpdate::Status::Finalized : BlockUpdate::Status::Invalid); w->erase(it); } } return true; } bool Processor::addNode(NodeId nodeid, const std::shared_ptr &proof, const Delegation &delegation) { LOCK(cs_peerManager); return peerManager->addNode(nodeid, proof, delegation); } bool Processor::forNode(NodeId nodeid, std::function func) const { LOCK(cs_peerManager); return peerManager->forNode(nodeid, std::move(func)); } CPubKey Processor::getSessionPubKey() const { return sessionKey.GetPubKey(); } uint256 Processor::buildLocalSighash(CNode *pfrom) const { CHashWriter hasher(SER_GETHASH, 0); hasher << peerData->delegation.getId(); hasher << pfrom->GetLocalNonce(); hasher << pfrom->nRemoteHostNonce; hasher << pfrom->GetLocalExtraEntropy(); hasher << pfrom->nRemoteExtraEntropy; return hasher.GetHash(); } uint256 Processor::buildRemoteSighash(CNode *pfrom) const { CHashWriter hasher(SER_GETHASH, 0); hasher << pfrom->m_avalanche_state->delegation.getId(); hasher << pfrom->nRemoteHostNonce; hasher << pfrom->GetLocalNonce(); hasher << pfrom->nRemoteExtraEntropy; hasher << pfrom->GetLocalExtraEntropy(); return hasher.GetHash(); } bool Processor::sendHello(CNode *pfrom) const { if (!peerData) { // We do not have a delegation to advertise. return false; } // Now let's sign! SchnorrSig sig; { const uint256 hash = buildLocalSighash(pfrom); if (!sessionKey.SignSchnorr(hash, sig)) { return false; } } connman->PushMessage(pfrom, CNetMsgMaker(pfrom->GetCommonVersion()) .Make(NetMsgType::AVAHELLO, Hello(peerData->delegation, sig))); return true; } +bool Processor::addProof(const std::shared_ptr &proof) { + LOCK(cs_peerManager); + return !peerManager->getProof(proof->getId()) && + peerManager->getPeerId(proof) != NO_PEER; +} + +std::shared_ptr Processor::getProof(const ProofId &proofid) const { + LOCK(cs_peerManager); + return peerManager->getProof(proofid); +} + bool Processor::startEventLoop(CScheduler &scheduler) { return eventLoop.startEventLoop( scheduler, [this]() { this->runEventLoop(); }, AVALANCHE_TIME_STEP); } bool Processor::stopEventLoop() { return eventLoop.stopEventLoop(); } std::vector Processor::getInvsForNextPoll(bool forPoll) { std::vector invs; // First remove all blocks that are not worth polling. { LOCK(cs_main); auto w = vote_records.getWriteView(); for (auto it = w->begin(); it != w->end();) { const CBlockIndex *pindex = it->first; if (!IsWorthPolling(pindex)) { w->erase(it++); } else { ++it; } } } auto r = vote_records.getReadView(); for (const std::pair &p : reverse_iterate(r)) { // Check if we can run poll. const bool shouldPoll = forPoll ? p.second.registerPoll() : p.second.shouldPoll(); if (!shouldPoll) { continue; } // We don't have a decision, we need more votes. invs.emplace_back(MSG_BLOCK, p.first->GetBlockHash()); if (invs.size() >= AVALANCHE_MAX_ELEMENT_POLL) { // Make sure we do not produce more invs than specified by the // protocol. return invs; } } return invs; } NodeId Processor::getSuitableNodeToQuery() { LOCK(cs_peerManager); return peerManager->selectNode(); } void Processor::clearTimedoutRequests() { auto now = std::chrono::steady_clock::now(); std::map timedout_items{}; { // Clear expired requests. auto w = queries.getWriteView(); auto it = w->get().begin(); while (it != w->get().end() && it->timeout < now) { for (const auto &i : it->invs) { timedout_items[i]++; } w->get().erase(it++); } } if (timedout_items.empty()) { return; } // In flight request accounting. for (const auto &p : timedout_items) { const CInv &inv = p.first; assert(inv.type == MSG_BLOCK); CBlockIndex *pindex; { LOCK(cs_main); pindex = LookupBlockIndex(BlockHash(inv.hash)); if (!pindex) { continue; } } auto w = vote_records.getWriteView(); auto it = w->find(pindex); if (it == w.end()) { continue; } it->second.clearInflightRequest(p.second); } } void Processor::runEventLoop() { // Don't do Avalanche while node is IBD'ing if (::ChainstateActive().IsInitialBlockDownload()) { return; } // First things first, check if we have requests that timed out and clear // them. clearTimedoutRequests(); // Make sure there is at least one suitable node to query before gathering // invs. NodeId nodeid = getSuitableNodeToQuery(); if (nodeid == NO_NODE) { return; } std::vector invs = getInvsForNextPoll(); if (invs.empty()) { return; } do { /** * If we lost contact to that node, then we remove it from nodeids, but * never add the request to queries, which ensures bad nodes get cleaned * up over time. */ bool hasSent = connman->ForNode(nodeid, [this, &invs](CNode *pnode) { uint64_t current_round = round++; { // Compute the time at which this requests times out. auto timeout = std::chrono::steady_clock::now() + queryTimeoutDuration; // Register the query. queries.getWriteView()->insert( {pnode->GetId(), current_round, timeout, invs}); // Set the timeout. LOCK(cs_peerManager); peerManager->updateNextRequestTime(pnode->GetId(), timeout); } // Send the query to the node. connman->PushMessage( pnode, CNetMsgMaker(pnode->GetCommonVersion()) .Make(NetMsgType::AVAPOLL, Poll(current_round, std::move(invs)))); return true; }); // Success! if (hasSent) { return; } { // This node is obsolete, delete it. LOCK(cs_peerManager); peerManager->removeNode(nodeid); } // Get next suitable node to try again nodeid = getSuitableNodeToQuery(); } while (nodeid != NO_NODE); } std::vector Processor::getPeers() const { LOCK(cs_peerManager); return peerManager->getPeers(); } std::vector Processor::getNodeIdsForPeer(PeerId peerId) const { LOCK(cs_peerManager); return peerManager->getNodeIdsForPeer(peerId); } } // namespace avalanche diff --git a/src/avalanche/processor.h b/src/avalanche/processor.h index b29987da1..422909e4a 100644 --- a/src/avalanche/processor.h +++ b/src/avalanche/processor.h @@ -1,338 +1,341 @@ // Copyright (c) 2018-2019 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #ifndef BITCOIN_AVALANCHE_PROCESSOR_H #define BITCOIN_AVALANCHE_PROCESSOR_H #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include class ArgsManager; class Config; class CBlockIndex; class CScheduler; class PeerManager; struct bilingual_str; using NodePeerManager = PeerManager; /** * Is avalanche enabled by default. */ static constexpr bool AVALANCHE_DEFAULT_ENABLED = false; /** * Finalization score. */ static constexpr int AVALANCHE_FINALIZATION_SCORE = 128; /** * Maximum item that can be polled at once. */ static constexpr size_t AVALANCHE_MAX_ELEMENT_POLL = 16; /** * Avalanche default cooldown in milliseconds. */ static constexpr size_t AVALANCHE_DEFAULT_COOLDOWN = 100; /** * How long before we consider that a query timed out. */ static constexpr std::chrono::milliseconds AVALANCHE_DEFAULT_QUERY_TIMEOUT{ 10000}; /** * How many inflight requests can exist for one item. */ static constexpr int AVALANCHE_MAX_INFLIGHT_POLL = 10; namespace avalanche { class Delegation; class PeerManager; class Proof; /** * Vote history. */ struct VoteRecord { private: // confidence's LSB bit is the result. Higher bits are actual confidence // score. uint16_t confidence = 0; // Historical record of votes. uint8_t votes = 0; // Each bit indicate if the vote is to be considered. uint8_t consider = 0; // How many in flight requests exists for this element. mutable std::atomic inflight{0}; // Seed for pseudorandom operations. const uint32_t seed = 0; // Track how many successful votes occured. uint32_t successfulVotes = 0; // Track the nodes which are part of the quorum. std::array nodeFilter{{0, 0, 0, 0, 0, 0, 0, 0}}; public: explicit VoteRecord(bool accepted) : confidence(accepted) {} /** * Copy semantic */ VoteRecord(const VoteRecord &other) : confidence(other.confidence), votes(other.votes), consider(other.consider), inflight(other.inflight.load()), successfulVotes(other.successfulVotes), nodeFilter(other.nodeFilter) { } /** * Vote accounting facilities. */ bool isAccepted() const { return confidence & 0x01; } uint16_t getConfidence() const { return confidence >> 1; } bool hasFinalized() const { return getConfidence() >= AVALANCHE_FINALIZATION_SCORE; } /** * Register a new vote for an item and update confidence accordingly. * Returns true if the acceptance or finalization state changed. */ bool registerVote(NodeId nodeid, uint32_t error); /** * Register that a request is being made regarding that item. * The method is made const so that it can be accessed via a read only view * of vote_records. It's not a problem as it is made thread safe. */ bool registerPoll() const; /** * Return if this item is in condition to be polled at the moment. */ bool shouldPoll() const { return inflight < AVALANCHE_MAX_INFLIGHT_POLL; } /** * Clear `count` inflight requests. */ void clearInflightRequest(uint8_t count = 1) { inflight -= count; } private: /** * Add the node to the quorum. * Returns true if the node was added, false if the node already was in the * quorum. */ bool addNodeToQuorum(NodeId nodeid); }; class BlockUpdate { union { CBlockIndex *pindex; uintptr_t raw; }; static const size_t STATUS_BITS = 2; static const uintptr_t MASK = (1 << STATUS_BITS) - 1; static_assert( alignof(CBlockIndex) >= (1 << STATUS_BITS), "CBlockIndex alignement doesn't allow for Status to be stored."); public: enum Status : uint8_t { Invalid, Rejected, Accepted, Finalized, }; BlockUpdate(CBlockIndex *pindexIn, Status statusIn) : pindex(pindexIn) { raw |= statusIn; } Status getStatus() const { return Status(raw & MASK); } CBlockIndex *getBlockIndex() { return reinterpret_cast(raw & ~MASK); } const CBlockIndex *getBlockIndex() const { return const_cast(this)->getBlockIndex(); } }; using BlockVoteMap = std::map; struct query_timeout {}; namespace { struct AvalancheTest; } class Processor { CConnman *connman; NodePeerManager *nodePeerManager; std::chrono::milliseconds queryTimeoutDuration; /** * Blocks to run avalanche on. */ RWCollection vote_records; /** * Keep track of peers and queries sent. */ std::atomic round; /** * Keep track of the peers and associated infos. */ mutable Mutex cs_peerManager; std::unique_ptr peerManager GUARDED_BY(cs_peerManager); struct Query { NodeId nodeid; uint64_t round; TimePoint timeout; /** * We declare this as mutable so it can be modified in the multi_index. * This is ok because we do not use this field to index in anyway. * * /!\ Do not use any mutable field as index. */ mutable std::vector invs; }; using QuerySet = boost::multi_index_container< Query, boost::multi_index::indexed_by< // index by nodeid/round boost::multi_index::hashed_unique, boost::multi_index::member>>, // sorted by timeout boost::multi_index::ordered_non_unique< boost::multi_index::tag, boost::multi_index::member>>>; RWCollection queries; /** Data required to participate. */ struct PeerData; std::unique_ptr peerData; CKey sessionKey; /** Event loop machinery. */ EventLoop eventLoop; /** Registered interfaces::Chain::Notifications handler. */ class NotificationsHandler; std::unique_ptr chainNotificationsHandler; /** * Flag indicating that the proof must be registered at first new block * after IBD */ bool mustRegisterProof = false; Processor(interfaces::Chain &chain, CConnman *connmanIn, NodePeerManager *nodePeerManagerIn, std::unique_ptr peerDataIn, CKey sessionKeyIn); public: ~Processor(); static std::unique_ptr MakeProcessor(const ArgsManager &argsman, interfaces::Chain &chain, CConnman *connman, NodePeerManager *nodePeerManager, bilingual_str &error); void setQueryTimeoutDuration(std::chrono::milliseconds d) { queryTimeoutDuration = d; } bool addBlockToReconcile(const CBlockIndex *pindex); bool isAccepted(const CBlockIndex *pindex) const; int getConfidence(const CBlockIndex *pindex) const; // TDOD: Refactor the API to remove the dependency on avalanche/protocol.h void sendResponse(CNode *pfrom, Response response) const; bool registerVotes(NodeId nodeid, const Response &response, std::vector &updates); bool addNode(NodeId nodeid, const std::shared_ptr &proof, const Delegation &delegation); bool forNode(NodeId nodeid, std::function func) const; CPubKey getSessionPubKey() const; bool sendHello(CNode *pfrom) const; /** * Build and return the challenge whose signature we expect a peer to * include in his AVAHELLO message. */ uint256 buildRemoteSighash(CNode *pfrom) const; + bool addProof(const std::shared_ptr &proof); + std::shared_ptr getProof(const ProofId &proofid) const; + /* * Return whether the avalanche service flag should be set. */ bool isAvalancheServiceAvailable() { return !!peerData; } std::vector getPeers() const; std::vector getNodeIdsForPeer(PeerId peerId) const; bool startEventLoop(CScheduler &scheduler); bool stopEventLoop(); private: void runEventLoop(); void clearTimedoutRequests(); std::vector getInvsForNextPoll(bool forPoll = true); NodeId getSuitableNodeToQuery(); /** * Build and return the challenge whose signature is included in the * AVAHELLO message that we send to a peer. */ uint256 buildLocalSighash(CNode *pfrom) const; friend struct ::avalanche::AvalancheTest; }; } // namespace avalanche /** * Global avalanche instance. */ extern std::unique_ptr g_avalanche; #endif // BITCOIN_AVALANCHE_PROCESSOR_H diff --git a/src/avalanche/test/processor_tests.cpp b/src/avalanche/test/processor_tests.cpp index f0b8cb95a..10cb341d7 100644 --- a/src/avalanche/test/processor_tests.cpp +++ b/src/avalanche/test/processor_tests.cpp @@ -1,974 +1,1010 @@ // Copyright (c) 2018-2020 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include #include #include #include #include #include #include // For ::PeerManager #include #include // For bilingual_str // D6970 moved LookupBlockIndex from chain.h to validation.h TODO: remove this // when LookupBlockIndex is refactored out of validation #include #include #include using namespace avalanche; namespace avalanche { namespace { struct AvalancheTest { static void runEventLoop(avalanche::Processor &p) { p.runEventLoop(); } static std::vector getInvsForNextPoll(Processor &p) { return p.getInvsForNextPoll(false); } static NodeId getSuitableNodeToQuery(Processor &p) { return p.getSuitableNodeToQuery(); } static avalanche::PeerManager &getPeerManager(Processor &p) { LOCK(p.cs_peerManager); return *p.peerManager; } static uint64_t getRound(const Processor &p) { return p.round; } }; } // namespace } // namespace avalanche namespace { struct CConnmanTest : public CConnman { using CConnman::CConnman; void AddNode(CNode &node) { LOCK(cs_vNodes); vNodes.push_back(&node); } void ClearNodes() { LOCK(cs_vNodes); for (CNode *node : vNodes) { delete node; } vNodes.clear(); } }; CService ip(uint32_t i) { struct in_addr s; s.s_addr = i; return CService(CNetAddr(s), Params().GetDefaultPort()); } struct AvalancheTestingSetup : public TestChain100Setup { const Config &config; CConnmanTest *m_connman; std::unique_ptr m_processor; CKey masterpriv; AvalancheTestingSetup() : TestChain100Setup(), config(GetConfig()), masterpriv() { // Deterministic randomness for tests. auto connman = std::make_unique(config, 0x1337, 0x1337); m_connman = connman.get(); m_node.connman = std::move(connman); m_node.peerman = std::make_unique<::PeerManager>( config.GetChainParams(), *m_connman, m_node.banman.get(), *m_node.scheduler, *m_node.chainman, *m_node.mempool); m_node.chain = interfaces::MakeChain(m_node, config.GetChainParams()); // Get the processor ready. bilingual_str error; m_processor = Processor::MakeProcessor(*m_node.args, *m_node.chain, m_node.connman.get(), m_node.peerman.get(), error); BOOST_CHECK(m_processor); // The master private key we delegate to. masterpriv.MakeNewKey(true); } ~AvalancheTestingSetup() { m_connman->ClearNodes(); } CNode *ConnectNode(ServiceFlags nServices) { static NodeId id = 0; CAddress addr(ip(GetRandInt(0xffffffff)), NODE_NONE); auto node = new CNode(id++, ServiceFlags(NODE_NETWORK), 0, INVALID_SOCKET, addr, 0, 0, 0, CAddress(), "", ConnectionType::OUTBOUND); node->SetCommonVersion(PROTOCOL_VERSION); node->nServices = nServices; m_node.peerman->InitializeNode(config, node); node->nVersion = 1; node->fSuccessfullyConnected = true; m_connman->AddNode(*node); return node; } size_t next_coinbase = 0; std::shared_ptr GetProof() { size_t current_coinbase = next_coinbase++; const CTransaction &coinbase = *m_coinbase_txns[current_coinbase]; ProofBuilder pb(0, 0, masterpriv.GetPubKey()); BOOST_CHECK(pb.addUTXO(COutPoint(coinbase.GetId(), 0), coinbase.vout[0].nValue, current_coinbase + 1, true, coinbaseKey)); return std::make_shared(pb.build()); } bool addNode(NodeId nodeid) { auto proof = GetProof(); return m_processor->addNode(nodeid, proof, DelegationBuilder(*proof).build()); } std::array ConnectNodes() { avalanche::PeerManager &pm = getPeerManager(); auto proof = GetProof(); Delegation dg = DelegationBuilder(*proof).build(); std::array nodes; for (CNode *&n : nodes) { n = ConnectNode(NODE_AVALANCHE); BOOST_CHECK(pm.addNode(n->GetId(), proof, dg)); } return nodes; } void runEventLoop() { AvalancheTest::runEventLoop(*m_processor); } NodeId getSuitableNodeToQuery() { return AvalancheTest::getSuitableNodeToQuery(*m_processor); } std::vector getInvsForNextPoll() { return AvalancheTest::getInvsForNextPoll(*m_processor); } avalanche::PeerManager &getPeerManager() { return AvalancheTest::getPeerManager(*m_processor); } uint64_t getRound() const { return AvalancheTest::getRound(*m_processor); } }; } // namespace BOOST_FIXTURE_TEST_SUITE(processor_tests, AvalancheTestingSetup) #define REGISTER_VOTE_AND_CHECK(vr, vote, state, finalized, confidence) \ vr.registerVote(NO_NODE, vote); \ BOOST_CHECK_EQUAL(vr.isAccepted(), state); \ BOOST_CHECK_EQUAL(vr.hasFinalized(), finalized); \ BOOST_CHECK_EQUAL(vr.getConfidence(), confidence); BOOST_AUTO_TEST_CASE(vote_record) { VoteRecord vraccepted(true); // Check initial state. BOOST_CHECK_EQUAL(vraccepted.isAccepted(), true); BOOST_CHECK_EQUAL(vraccepted.hasFinalized(), false); BOOST_CHECK_EQUAL(vraccepted.getConfidence(), 0); VoteRecord vr(false); // Check initial state. BOOST_CHECK_EQUAL(vr.isAccepted(), false); BOOST_CHECK_EQUAL(vr.hasFinalized(), false); BOOST_CHECK_EQUAL(vr.getConfidence(), 0); // We need to register 6 positive votes before we start counting. for (int i = 0; i < 6; i++) { REGISTER_VOTE_AND_CHECK(vr, 0, false, false, 0); } // Next vote will flip state, and confidence will increase as long as we // vote yes. REGISTER_VOTE_AND_CHECK(vr, 0, true, false, 0); // A single neutral vote do not change anything. REGISTER_VOTE_AND_CHECK(vr, -1, true, false, 1); for (int i = 2; i < 8; i++) { REGISTER_VOTE_AND_CHECK(vr, 0, true, false, i); } // Two neutral votes will stall progress. REGISTER_VOTE_AND_CHECK(vr, -1, true, false, 7); REGISTER_VOTE_AND_CHECK(vr, -1, true, false, 7); for (int i = 2; i < 8; i++) { REGISTER_VOTE_AND_CHECK(vr, 0, true, false, 7); } // Now confidence will increase as long as we vote yes. for (int i = 8; i < AVALANCHE_FINALIZATION_SCORE; i++) { REGISTER_VOTE_AND_CHECK(vr, 0, true, false, i); } // The next vote will finalize the decision. REGISTER_VOTE_AND_CHECK(vr, 1, true, true, AVALANCHE_FINALIZATION_SCORE); // Now that we have two no votes, confidence stop increasing. for (int i = 0; i < 5; i++) { REGISTER_VOTE_AND_CHECK(vr, 1, true, true, AVALANCHE_FINALIZATION_SCORE); } // Next vote will flip state, and confidence will increase as long as we // vote no. REGISTER_VOTE_AND_CHECK(vr, 1, false, false, 0); // A single neutral vote do not change anything. REGISTER_VOTE_AND_CHECK(vr, -1, false, false, 1); for (int i = 2; i < 8; i++) { REGISTER_VOTE_AND_CHECK(vr, 1, false, false, i); } // Two neutral votes will stall progress. REGISTER_VOTE_AND_CHECK(vr, -1, false, false, 7); REGISTER_VOTE_AND_CHECK(vr, -1, false, false, 7); for (int i = 2; i < 8; i++) { REGISTER_VOTE_AND_CHECK(vr, 1, false, false, 7); } // Now confidence will increase as long as we vote no. for (int i = 8; i < AVALANCHE_FINALIZATION_SCORE; i++) { REGISTER_VOTE_AND_CHECK(vr, 1, false, false, i); } // The next vote will finalize the decision. REGISTER_VOTE_AND_CHECK(vr, 0, false, true, AVALANCHE_FINALIZATION_SCORE); // Check that inflight accounting work as expected. VoteRecord vrinflight(false); for (int i = 0; i < 2 * AVALANCHE_MAX_INFLIGHT_POLL; i++) { bool shouldPoll = vrinflight.shouldPoll(); BOOST_CHECK_EQUAL(shouldPoll, i < AVALANCHE_MAX_INFLIGHT_POLL); BOOST_CHECK_EQUAL(vrinflight.registerPoll(), shouldPoll); } // Clear various number of inflight requests and check everything behaves as // expected. for (int i = 1; i < AVALANCHE_MAX_INFLIGHT_POLL; i++) { vrinflight.clearInflightRequest(i); BOOST_CHECK(vrinflight.shouldPoll()); for (int j = 1; j < i; j++) { BOOST_CHECK(vrinflight.registerPoll()); BOOST_CHECK(vrinflight.shouldPoll()); } BOOST_CHECK(vrinflight.registerPoll()); BOOST_CHECK(!vrinflight.shouldPoll()); } } BOOST_AUTO_TEST_CASE(block_update) { CBlockIndex index; CBlockIndex *pindex = &index; std::set status{ BlockUpdate::Status::Invalid, BlockUpdate::Status::Rejected, BlockUpdate::Status::Accepted, BlockUpdate::Status::Finalized, }; for (auto s : status) { BlockUpdate abu(pindex, s); BOOST_CHECK(abu.getBlockIndex() == pindex); BOOST_CHECK_EQUAL(abu.getStatus(), s); } } namespace { Response next(Response &r) { auto copy = r; r = {r.getRound() + 1, r.getCooldown(), r.GetVotes()}; return copy; } } // namespace BOOST_AUTO_TEST_CASE(block_register) { std::vector updates; CBlock block = CreateAndProcessBlock({}, CScript()); const BlockHash blockHash = block.GetHash(); const CBlockIndex *pindex; { LOCK(cs_main); pindex = LookupBlockIndex(blockHash); } // Create nodes that supports avalanche. auto avanodes = ConnectNodes(); // Querying for random block returns false. BOOST_CHECK(!m_processor->isAccepted(pindex)); // Add a new block. Check it is added to the polls. BOOST_CHECK(m_processor->addBlockToReconcile(pindex)); auto invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK); BOOST_CHECK(invs[0].hash == blockHash); // Newly added blocks' state reflect the blockchain. BOOST_CHECK(m_processor->isAccepted(pindex)); int nextNodeIndex = 0; auto registerNewVote = [&](const Response &resp) { runEventLoop(); auto nodeid = avanodes[nextNodeIndex++ % avanodes.size()]->GetId(); BOOST_CHECK(m_processor->registerVotes(nodeid, resp, updates)); }; // Let's vote for this block a few times. Response resp{0, 0, {Vote(0, blockHash)}}; for (int i = 0; i < 6; i++) { registerNewVote(next(resp)); BOOST_CHECK(m_processor->isAccepted(pindex)); BOOST_CHECK_EQUAL(m_processor->getConfidence(pindex), 0); BOOST_CHECK_EQUAL(updates.size(), 0); } // A single neutral vote do not change anything. resp = {getRound(), 0, {Vote(-1, blockHash)}}; registerNewVote(next(resp)); BOOST_CHECK(m_processor->isAccepted(pindex)); BOOST_CHECK_EQUAL(m_processor->getConfidence(pindex), 0); BOOST_CHECK_EQUAL(updates.size(), 0); resp = {getRound(), 0, {Vote(0, blockHash)}}; for (int i = 1; i < 7; i++) { registerNewVote(next(resp)); BOOST_CHECK(m_processor->isAccepted(pindex)); BOOST_CHECK_EQUAL(m_processor->getConfidence(pindex), i); BOOST_CHECK_EQUAL(updates.size(), 0); } // Two neutral votes will stall progress. resp = {getRound(), 0, {Vote(-1, blockHash)}}; registerNewVote(next(resp)); BOOST_CHECK(m_processor->isAccepted(pindex)); BOOST_CHECK_EQUAL(m_processor->getConfidence(pindex), 6); BOOST_CHECK_EQUAL(updates.size(), 0); registerNewVote(next(resp)); BOOST_CHECK(m_processor->isAccepted(pindex)); BOOST_CHECK_EQUAL(m_processor->getConfidence(pindex), 6); BOOST_CHECK_EQUAL(updates.size(), 0); resp = {getRound(), 0, {Vote(0, blockHash)}}; for (int i = 2; i < 8; i++) { registerNewVote(next(resp)); BOOST_CHECK(m_processor->isAccepted(pindex)); BOOST_CHECK_EQUAL(m_processor->getConfidence(pindex), 6); BOOST_CHECK_EQUAL(updates.size(), 0); } // We vote for it numerous times to finalize it. for (int i = 7; i < AVALANCHE_FINALIZATION_SCORE; i++) { registerNewVote(next(resp)); BOOST_CHECK(m_processor->isAccepted(pindex)); BOOST_CHECK_EQUAL(m_processor->getConfidence(pindex), i); BOOST_CHECK_EQUAL(updates.size(), 0); } // As long as it is not finalized, we poll. invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK); BOOST_CHECK(invs[0].hash == blockHash); // Now finalize the decision. registerNewVote(next(resp)); BOOST_CHECK_EQUAL(updates.size(), 1); BOOST_CHECK(updates[0].getBlockIndex() == pindex); BOOST_CHECK_EQUAL(updates[0].getStatus(), BlockUpdate::Status::Finalized); updates = {}; // Once the decision is finalized, there is no poll for it. invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 0); // Now let's undo this and finalize rejection. BOOST_CHECK(m_processor->addBlockToReconcile(pindex)); invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK); BOOST_CHECK(invs[0].hash == blockHash); resp = {getRound(), 0, {Vote(1, blockHash)}}; for (int i = 0; i < 6; i++) { registerNewVote(next(resp)); BOOST_CHECK(m_processor->isAccepted(pindex)); BOOST_CHECK_EQUAL(updates.size(), 0); } // Now the state will flip. registerNewVote(next(resp)); BOOST_CHECK(!m_processor->isAccepted(pindex)); BOOST_CHECK_EQUAL(updates.size(), 1); BOOST_CHECK(updates[0].getBlockIndex() == pindex); BOOST_CHECK_EQUAL(updates[0].getStatus(), BlockUpdate::Status::Rejected); updates = {}; // Now it is rejected, but we can vote for it numerous times. for (int i = 1; i < AVALANCHE_FINALIZATION_SCORE; i++) { registerNewVote(next(resp)); BOOST_CHECK(!m_processor->isAccepted(pindex)); BOOST_CHECK_EQUAL(updates.size(), 0); } // As long as it is not finalized, we poll. invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK); BOOST_CHECK(invs[0].hash == blockHash); // Now finalize the decision. registerNewVote(next(resp)); BOOST_CHECK(!m_processor->isAccepted(pindex)); BOOST_CHECK_EQUAL(updates.size(), 1); BOOST_CHECK(updates[0].getBlockIndex() == pindex); BOOST_CHECK_EQUAL(updates[0].getStatus(), BlockUpdate::Status::Invalid); updates = {}; // Once the decision is finalized, there is no poll for it. invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 0); // Adding the block twice does nothing. BOOST_CHECK(m_processor->addBlockToReconcile(pindex)); BOOST_CHECK(!m_processor->addBlockToReconcile(pindex)); BOOST_CHECK(m_processor->isAccepted(pindex)); } BOOST_AUTO_TEST_CASE(multi_block_register) { CBlockIndex indexA, indexB; std::vector updates; // Create several nodes that support avalanche. auto avanodes = ConnectNodes(); // Make sure the block has a hash. CBlock blockA = CreateAndProcessBlock({}, CScript()); const BlockHash blockHashA = blockA.GetHash(); CBlock blockB = CreateAndProcessBlock({}, CScript()); const BlockHash blockHashB = blockB.GetHash(); const CBlockIndex *pindexA; const CBlockIndex *pindexB; { LOCK(cs_main); pindexA = LookupBlockIndex(blockHashA); pindexB = LookupBlockIndex(blockHashB); } // Querying for random block returns false. BOOST_CHECK(!m_processor->isAccepted(pindexA)); BOOST_CHECK(!m_processor->isAccepted(pindexB)); // Start voting on block A. BOOST_CHECK(m_processor->addBlockToReconcile(pindexA)); auto invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK); BOOST_CHECK(invs[0].hash == blockHashA); uint64_t round = getRound(); runEventLoop(); BOOST_CHECK(m_processor->registerVotes( avanodes[0]->GetId(), {round, 0, {Vote(0, blockHashA)}}, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); // Start voting on block B after one vote. Response resp{round + 1, 0, {Vote(0, blockHashB), Vote(0, blockHashA)}}; BOOST_CHECK(m_processor->addBlockToReconcile(pindexB)); invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 2); // Ensure B comes before A because it has accumulated more PoW. BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK); BOOST_CHECK(invs[0].hash == blockHashB); BOOST_CHECK_EQUAL(invs[1].type, MSG_BLOCK); BOOST_CHECK(invs[1].hash == blockHashA); // Let's vote for these blocks a few times. for (int i = 0; i < 4; i++) { NodeId nodeid = getSuitableNodeToQuery(); runEventLoop(); BOOST_CHECK(m_processor->registerVotes(nodeid, next(resp), updates)); BOOST_CHECK_EQUAL(updates.size(), 0); } // Now it is accepted, but we can vote for it numerous times. for (int i = 0; i < AVALANCHE_FINALIZATION_SCORE; i++) { NodeId nodeid = getSuitableNodeToQuery(); runEventLoop(); BOOST_CHECK(m_processor->registerVotes(nodeid, next(resp), updates)); BOOST_CHECK_EQUAL(updates.size(), 0); } // Running two iterration of the event loop so that vote gets triggered on A // and B. NodeId firstNodeid = getSuitableNodeToQuery(); runEventLoop(); NodeId secondNodeid = getSuitableNodeToQuery(); runEventLoop(); BOOST_CHECK(firstNodeid != secondNodeid); // Next vote will finalize block A. BOOST_CHECK(m_processor->registerVotes(firstNodeid, next(resp), updates)); BOOST_CHECK_EQUAL(updates.size(), 1); BOOST_CHECK(updates[0].getBlockIndex() == pindexA); BOOST_CHECK_EQUAL(updates[0].getStatus(), BlockUpdate::Status::Finalized); updates = {}; // We do not vote on A anymore. invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK); BOOST_CHECK(invs[0].hash == blockHashB); // Next vote will finalize block B. BOOST_CHECK(m_processor->registerVotes(secondNodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 1); BOOST_CHECK(updates[0].getBlockIndex() == pindexB); BOOST_CHECK_EQUAL(updates[0].getStatus(), BlockUpdate::Status::Finalized); updates = {}; // There is nothing left to vote on. invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 0); } BOOST_AUTO_TEST_CASE(poll_and_response) { std::vector updates; CBlock block = CreateAndProcessBlock({}, CScript()); const BlockHash blockHash = block.GetHash(); const CBlockIndex *pindex; { LOCK(cs_main); pindex = LookupBlockIndex(blockHash); } // There is no node to query. BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), NO_NODE); // Create a node that supports avalanche and one that doesn't. ConnectNode(NODE_NONE); auto avanode = ConnectNode(NODE_AVALANCHE); NodeId avanodeid = avanode->GetId(); BOOST_CHECK(addNode(avanodeid)); // It returns the avalanche peer. BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), avanodeid); // Register a block and check it is added to the list of elements to poll. BOOST_CHECK(m_processor->addBlockToReconcile(pindex)); auto invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK); BOOST_CHECK(invs[0].hash == blockHash); // Trigger a poll on avanode. uint64_t round = getRound(); runEventLoop(); // There is no more suitable peer available, so return nothing. BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), NO_NODE); // Respond to the request. Response resp = {round, 0, {Vote(0, blockHash)}}; BOOST_CHECK(m_processor->registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); // Now that avanode fullfilled his request, it is added back to the list of // queriable nodes. BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), avanodeid); // Sending a response when not polled fails. BOOST_CHECK(!m_processor->registerVotes(avanodeid, next(resp), updates)); BOOST_CHECK_EQUAL(updates.size(), 0); // Trigger a poll on avanode. round = getRound(); runEventLoop(); BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), NO_NODE); // Sending responses that do not match the request also fails. // 1. Too many results. resp = {round, 0, {Vote(0, blockHash), Vote(0, blockHash)}}; runEventLoop(); BOOST_CHECK(!m_processor->registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), avanodeid); // 2. Not enough results. resp = {getRound(), 0, {}}; runEventLoop(); BOOST_CHECK(!m_processor->registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), avanodeid); // 3. Do not match the poll. resp = {getRound(), 0, {Vote()}}; runEventLoop(); BOOST_CHECK(!m_processor->registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), avanodeid); // 4. Invalid round count. Request is not discarded. uint64_t queryRound = getRound(); runEventLoop(); resp = {queryRound + 1, 0, {Vote()}}; BOOST_CHECK(!m_processor->registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); resp = {queryRound - 1, 0, {Vote()}}; BOOST_CHECK(!m_processor->registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); // 5. Making request for invalid nodes do not work. Request is not // discarded. resp = {queryRound, 0, {Vote(0, blockHash)}}; BOOST_CHECK(!m_processor->registerVotes(avanodeid + 1234, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); // Proper response gets processed and avanode is available again. resp = {queryRound, 0, {Vote(0, blockHash)}}; BOOST_CHECK(m_processor->registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), avanodeid); // Out of order response are rejected. CBlock block2 = CreateAndProcessBlock({}, CScript()); const BlockHash blockHash2 = block2.GetHash(); CBlockIndex *pindex2; { LOCK(cs_main); pindex2 = LookupBlockIndex(blockHash2); } BOOST_CHECK(m_processor->addBlockToReconcile(pindex2)); resp = {getRound(), 0, {Vote(0, blockHash), Vote(0, blockHash2)}}; runEventLoop(); BOOST_CHECK(!m_processor->registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), avanodeid); // But they are accepted in order. resp = {getRound(), 0, {Vote(0, blockHash2), Vote(0, blockHash)}}; runEventLoop(); BOOST_CHECK(m_processor->registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), avanodeid); // When a block is marked invalid, stop polling. pindex2->nStatus = pindex2->nStatus.withFailed(); resp = {getRound(), 0, {Vote(0, blockHash)}}; runEventLoop(); BOOST_CHECK(m_processor->registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), avanodeid); } BOOST_AUTO_TEST_CASE(poll_inflight_timeout, *boost::unit_test::timeout(60)) { std::vector updates; CBlock block = CreateAndProcessBlock({}, CScript()); const BlockHash blockHash = block.GetHash(); const CBlockIndex *pindex; { LOCK(cs_main); pindex = LookupBlockIndex(blockHash); } // Add the block BOOST_CHECK(m_processor->addBlockToReconcile(pindex)); // Create a node that supports avalanche. auto avanode = ConnectNode(NODE_AVALANCHE); NodeId avanodeid = avanode->GetId(); BOOST_CHECK(addNode(avanodeid)); // Expire requests after some time. auto queryTimeDuration = std::chrono::milliseconds(10); m_processor->setQueryTimeoutDuration(queryTimeDuration); for (int i = 0; i < 10; i++) { Response resp = {getRound(), 0, {Vote(0, blockHash)}}; auto start = std::chrono::steady_clock::now(); runEventLoop(); // We cannot guarantee that we'll wait for just 1ms, so we have to bail // if we aren't within the proper time range. std::this_thread::sleep_for(std::chrono::milliseconds(1)); runEventLoop(); bool ret = m_processor->registerVotes(avanodeid, next(resp), updates); if (std::chrono::steady_clock::now() > start + queryTimeDuration) { // We waited for too long, bail. Because we can't know for sure when // previous steps ran, ret is not deterministic and we do not check // it. i--; continue; } // We are within time bounds, so the vote should have worked. BOOST_CHECK(ret); // Now try again but wait for expiration. runEventLoop(); std::this_thread::sleep_for(queryTimeDuration); runEventLoop(); BOOST_CHECK( !m_processor->registerVotes(avanodeid, next(resp), updates)); } } BOOST_AUTO_TEST_CASE(poll_inflight_count) { // Create enough nodes so that we run into the inflight request limit. avalanche::PeerManager &pm = getPeerManager(); auto proof = GetProof(); Delegation dg = DelegationBuilder(*proof).build(); std::array nodes; for (auto &n : nodes) { n = ConnectNode(NODE_AVALANCHE); BOOST_CHECK(pm.addNode(n->GetId(), proof, dg)); } // Add a block to poll CBlock block = CreateAndProcessBlock({}, CScript()); const BlockHash blockHash = block.GetHash(); const CBlockIndex *pindex; { LOCK(cs_main); pindex = LookupBlockIndex(blockHash); } BOOST_CHECK(m_processor->addBlockToReconcile(pindex)); // Ensure there are enough requests in flight. std::map node_round_map; for (int i = 0; i < AVALANCHE_MAX_INFLIGHT_POLL; i++) { NodeId nodeid = getSuitableNodeToQuery(); BOOST_CHECK(node_round_map.find(nodeid) == node_round_map.end()); node_round_map[nodeid] = getRound(); auto invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK); BOOST_CHECK(invs[0].hash == blockHash); runEventLoop(); } // Now that we have enough in flight requests, we shouldn't poll. auto suitablenodeid = getSuitableNodeToQuery(); BOOST_CHECK(suitablenodeid != NO_NODE); auto invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 0); runEventLoop(); BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), suitablenodeid); std::vector updates; // Send one response, now we can poll again. auto it = node_round_map.begin(); Response resp = {it->second, 0, {Vote(0, blockHash)}}; BOOST_CHECK(m_processor->registerVotes(it->first, resp, updates)); node_round_map.erase(it); invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK); BOOST_CHECK(invs[0].hash == blockHash); } BOOST_AUTO_TEST_CASE(quorum_diversity) { std::vector updates; CBlock block = CreateAndProcessBlock({}, CScript()); const BlockHash blockHash = block.GetHash(); const CBlockIndex *pindex; { LOCK(cs_main); pindex = LookupBlockIndex(blockHash); } // Create nodes that supports avalanche. auto avanodes = ConnectNodes(); // Querying for random block returns false. BOOST_CHECK(!m_processor->isAccepted(pindex)); // Add a new block. Check it is added to the polls. BOOST_CHECK(m_processor->addBlockToReconcile(pindex)); // Do one valid round of voting. uint64_t round = getRound(); Response resp{round, 0, {Vote(0, blockHash)}}; // Check that all nodes can vote. for (size_t i = 0; i < avanodes.size(); i++) { runEventLoop(); BOOST_CHECK(m_processor->registerVotes(avanodes[i]->GetId(), next(resp), updates)); } // Generate a query for every single node. const NodeId firstNodeId = getSuitableNodeToQuery(); std::map node_round_map; round = getRound(); for (size_t i = 0; i < avanodes.size(); i++) { NodeId nodeid = getSuitableNodeToQuery(); BOOST_CHECK(node_round_map.find(nodeid) == node_round_map.end()); node_round_map[nodeid] = getRound(); runEventLoop(); } // Now only tge first node can vote. All others would be duplicate in the // quorum. auto confidence = m_processor->getConfidence(pindex); BOOST_REQUIRE(confidence > 0); for (auto &pair : node_round_map) { NodeId nodeid = pair.first; uint64_t r = pair.second; if (nodeid == firstNodeId) { // Node 0 is the only one which can vote at this stage. round = r; continue; } BOOST_CHECK(m_processor->registerVotes( nodeid, {r, 0, {Vote(0, blockHash)}}, updates)); BOOST_CHECK_EQUAL(m_processor->getConfidence(pindex), confidence); } BOOST_CHECK(m_processor->registerVotes( firstNodeId, {round, 0, {Vote(0, blockHash)}}, updates)); BOOST_CHECK_EQUAL(m_processor->getConfidence(pindex), confidence + 1); } BOOST_AUTO_TEST_CASE(event_loop) { CScheduler s; CBlock block = CreateAndProcessBlock({}, CScript()); const BlockHash blockHash = block.GetHash(); const CBlockIndex *pindex; { LOCK(cs_main); pindex = LookupBlockIndex(blockHash); } // Starting the event loop. BOOST_CHECK(m_processor->startEventLoop(s)); // There is one task planned in the next hour (our event loop). std::chrono::system_clock::time_point start, stop; BOOST_CHECK_EQUAL(s.getQueueInfo(start, stop), 1); // Starting twice doesn't start it twice. BOOST_CHECK(!m_processor->startEventLoop(s)); // Start the scheduler thread. std::thread schedulerThread(std::bind(&CScheduler::serviceQueue, &s)); // Create a node that supports avalanche. auto avanode = ConnectNode(NODE_AVALANCHE); NodeId nodeid = avanode->GetId(); BOOST_CHECK(addNode(nodeid)); // There is no query in flight at the moment. BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), nodeid); // Add a new block. Check it is added to the polls. uint64_t queryRound = getRound(); BOOST_CHECK(m_processor->addBlockToReconcile(pindex)); for (int i = 0; i < 60 * 1000; i++) { // Technically, this is a race condition, but this should do just fine // as we wait up to 1 minute for an event that should take 10ms. UninterruptibleSleep(std::chrono::milliseconds(1)); if (getRound() != queryRound) { break; } } // Check that we effectively got a request and not timed out. BOOST_CHECK(getRound() > queryRound); // Respond and check the cooldown time is respected. uint64_t responseRound = getRound(); auto queryTime = std::chrono::steady_clock::now() + std::chrono::milliseconds(100); std::vector updates; m_processor->registerVotes(nodeid, {queryRound, 100, {Vote(0, blockHash)}}, updates); for (int i = 0; i < 10000; i++) { // We make sure that we do not get a request before queryTime. UninterruptibleSleep(std::chrono::milliseconds(1)); if (getRound() != responseRound) { BOOST_CHECK(std::chrono::steady_clock::now() > queryTime); break; } } // But we eventually get one. BOOST_CHECK(getRound() > responseRound); // Stop event loop. BOOST_CHECK(m_processor->stopEventLoop()); // We don't have any task scheduled anymore. BOOST_CHECK_EQUAL(s.getQueueInfo(start, stop), 0); // Can't stop the event loop twice. BOOST_CHECK(!m_processor->stopEventLoop()); // Wait for the scheduler to stop. s.stop(true); schedulerThread.join(); } BOOST_AUTO_TEST_CASE(destructor) { CScheduler s; std::chrono::system_clock::time_point start, stop; std::thread schedulerThread; BOOST_CHECK(m_processor->startEventLoop(s)); BOOST_CHECK_EQUAL(s.getQueueInfo(start, stop), 1); // Start the service thread after the queue size check to prevent a race // condition where the thread may be processing the event loop task during // the check. schedulerThread = std::thread(std::bind(&CScheduler::serviceQueue, &s)); // Destroy the processor. m_processor.reset(); // Now that avalanche is destroyed, there is no more scheduled tasks. BOOST_CHECK_EQUAL(s.getQueueInfo(start, stop), 0); // Wait for the scheduler to stop. s.stop(true); schedulerThread.join(); } +BOOST_AUTO_TEST_CASE(proof_accessors) { + constexpr int numProofs = 10; + + std::vector> proofs; + proofs.reserve(numProofs); + for (int i = 0; i < numProofs; i++) { + proofs.push_back(GetProof()); + } + + for (int i = 0; i < numProofs; i++) { + BOOST_CHECK(m_processor->addProof(proofs[i])); + // Fail to add an existing proof + BOOST_CHECK(!m_processor->addProof(proofs[i])); + + for (int added = 0; added <= i; added++) { + auto proof = m_processor->getProof(proofs[added]->getId()); + BOOST_CHECK(proof != nullptr); + BOOST_CHECK_EQUAL(proof->getId(), proofs[added]->getId()); + } + + for (int missing = i + 1; missing < numProofs; missing++) { + BOOST_CHECK(!m_processor->getProof(proofs[missing]->getId())); + } + } + + // No stake, copied from proof_tests.cpp + const std::string badProofHex( + "96527eae083f1f24625f049d9e54bb9a2102a93d98bf42ab90cfc0bf9e7c634ed76a7" + "3e95b02cacfd357b64e4fb6c92e92dd00"); + bilingual_str error; + Proof badProof; + BOOST_CHECK(Proof::FromHex(badProof, badProofHex, error)); + BOOST_CHECK( + !m_processor->addProof(std::make_shared(std::move(badProof)))); +} + BOOST_AUTO_TEST_SUITE_END()