diff --git a/src/avalanche/processor.cpp b/src/avalanche/processor.cpp index 2a6e586ff..10bc56573 100644 --- a/src/avalanche/processor.cpp +++ b/src/avalanche/processor.cpp @@ -1,639 +1,660 @@ // Copyright (c) 2018-2019 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include #include #include #include #include #include #include // For DecodeSecret #include #include #include #include #include #include #include #include #include /** * Run the avalanche event loop every 10ms. */ static constexpr std::chrono::milliseconds AVALANCHE_TIME_STEP{10}; // Unfortunately, the bitcoind codebase is full of global and we are kinda // forced into it here. std::unique_ptr g_avalanche; namespace avalanche { static bool IsWorthPolling(const CBlockIndex *pindex) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { AssertLockHeld(cs_main); if (pindex->nStatus.isInvalid()) { // No point polling invalid blocks. return false; } if (::ChainstateActive().IsBlockFinalized(pindex)) { // There is no point polling finalized block. return false; } return true; } static bool VerifyProof(const Proof &proof, bilingual_str &error) { ProofValidationState proof_state; if (!proof.verify(proof_state)) { switch (proof_state.GetResult()) { case ProofValidationResult::NO_STAKE: error = _("The avalanche proof has no stake."); return false; case ProofValidationResult::DUST_THRESOLD: error = _("The avalanche proof stake is too low."); return false; case ProofValidationResult::DUPLICATE_STAKE: error = _("The avalanche proof has duplicated stake."); return false; case ProofValidationResult::INVALID_STAKE_SIGNATURE: error = _("The avalanche proof has invalid stake signatures."); return false; case ProofValidationResult::TOO_MANY_UTXOS: error = strprintf( _("The avalanche proof has too many utxos (max: %u)."), AVALANCHE_MAX_PROOF_STAKES); return false; default: error = _("The avalanche proof is invalid."); return false; } } return true; } static bool VerifyDelegation(const Delegation &dg, const CPubKey &expectedPubKey, bilingual_str &error) { DelegationState dg_state; CPubKey auth; if (!dg.verify(dg_state, auth)) { switch (dg_state.GetResult()) { case avalanche::DelegationResult::INVALID_SIGNATURE: error = _("The avalanche delegation has invalid signatures."); return false; default: error = _("The avalanche delegation is invalid."); return false; } } if (auth != expectedPubKey) { error = _( "The avalanche delegation does not match the expected public key."); return false; } return true; } struct Processor::PeerData { std::shared_ptr proof; Delegation delegation; }; class Processor::NotificationsHandler : public interfaces::Chain::Notifications { Processor *m_processor; public: NotificationsHandler(Processor *p) : m_processor(p) {} void updatedBlockTip() override { LOCK(m_processor->cs_peerManager); if (m_processor->peerData && m_processor->peerData->proof) { m_processor->peerManager->registerProof( m_processor->peerData->proof); } m_processor->peerManager->updatedBlockTip(); } }; Processor::Processor(interfaces::Chain &chain, CConnman *connmanIn, std::unique_ptr peerDataIn, CKey sessionKeyIn) : connman(connmanIn), queryTimeoutDuration(AVALANCHE_DEFAULT_QUERY_TIMEOUT), round(0), peerManager(std::make_unique()), peerData(std::move(peerDataIn)), sessionKey(std::move(sessionKeyIn)) { // Make sure we get notified of chain state changes. chainNotificationsHandler = chain.handleNotifications(std::make_shared(this)); } Processor::~Processor() { chainNotificationsHandler.reset(); stopEventLoop(); } std::unique_ptr Processor::MakeProcessor(const ArgsManager &argsman, interfaces::Chain &chain, CConnman *connman, bilingual_str &error) { std::unique_ptr peerData; CKey masterKey; CKey sessionKey; if (argsman.IsArgSet("-avasessionkey")) { sessionKey = DecodeSecret(argsman.GetArg("-avasessionkey", "")); if (!sessionKey.IsValid()) { error = _("The avalanche session key is invalid."); return nullptr; } } else { // Pick a random key for the session. sessionKey.MakeNewKey(true); } if (argsman.IsArgSet("-avaproof")) { if (!argsman.IsArgSet("-avamasterkey")) { error = _( "The avalanche master key is missing for the avalanche proof."); return nullptr; } masterKey = DecodeSecret(argsman.GetArg("-avamasterkey", "")); if (!masterKey.IsValid()) { error = _("The avalanche master key is invalid."); return nullptr; } peerData = std::make_unique(); peerData->proof = std::make_shared(); if (!Proof::FromHex(*peerData->proof, argsman.GetArg("-avaproof", ""), error)) { // error is set by FromHex return nullptr; } if (!VerifyProof(*peerData->proof, error)) { // error is set by VerifyProof return nullptr; } std::unique_ptr dgb; const CPubKey &masterPubKey = masterKey.GetPubKey(); if (argsman.IsArgSet("-avadelegation")) { Delegation dg; if (!Delegation::FromHex(dg, argsman.GetArg("-avadelegation", ""), error)) { // error is set by FromHex() return nullptr; } if (dg.getProofId() != peerData->proof->getId()) { error = _("The delegation does not match the proof."); return nullptr; } if (masterPubKey != dg.getDelegatedPubkey()) { error = _( "The master key does not match the delegation public key."); return nullptr; } dgb = std::make_unique(dg); } else { if (masterPubKey != peerData->proof->getMaster()) { error = _("The master key does not match the proof public key."); return nullptr; } dgb = std::make_unique(*peerData->proof); } // Generate the delegation to the session key. const CPubKey sessionPubKey = sessionKey.GetPubKey(); if (sessionPubKey != masterPubKey) { if (!dgb->addLevel(masterKey, sessionPubKey)) { error = _("Failed to generate a delegation for this session."); return nullptr; } } peerData->delegation = dgb->build(); if (!VerifyDelegation(peerData->delegation, sessionPubKey, error)) { // error is set by VerifyDelegation return nullptr; } } // We can't use std::make_unique with a private constructor return std::unique_ptr(new Processor( chain, connman, std::move(peerData), std::move(sessionKey))); } bool Processor::addBlockToReconcile(const CBlockIndex *pindex) { bool isAccepted; { LOCK(cs_main); if (!IsWorthPolling(pindex)) { // There is no point polling this block. return false; } isAccepted = ::ChainActive().Contains(pindex); } return blockVoteRecords.getWriteView() ->insert(std::make_pair(pindex, VoteRecord(isAccepted))) .second; } +void Processor::addProofToReconcile(const std::shared_ptr &proof, + bool isAccepted) { + // TODO We don't want to accept an infinite number of conflicting proofs. + // They should be some rules to make them expensive and/or limited by + // design. + proofsVoteRecords.getWriteView()->insert( + std::make_pair(proof, VoteRecord(isAccepted))); +} + bool Processor::isAccepted(const CBlockIndex *pindex) const { auto r = blockVoteRecords.getReadView(); auto it = r->find(pindex); if (it == r.end()) { return false; } return it->second.isAccepted(); } int Processor::getConfidence(const CBlockIndex *pindex) const { auto r = blockVoteRecords.getReadView(); auto it = r->find(pindex); if (it == r.end()) { return -1; } return it->second.getConfidence(); } namespace { /** * When using TCP, we need to sign all messages as the transport layer is * not secure. */ class TCPResponse { Response response; SchnorrSig sig; public: TCPResponse(Response responseIn, const CKey &key) : response(std::move(responseIn)) { CHashWriter hasher(SER_GETHASH, 0); hasher << response; const uint256 hash = hasher.GetHash(); // Now let's sign! if (!key.SignSchnorr(hash, sig)) { sig.fill(0); } } // serialization support SERIALIZE_METHODS(TCPResponse, obj) { READWRITE(obj.response, obj.sig); } }; } // namespace void Processor::sendResponse(CNode *pfrom, Response response) const { connman->PushMessage( pfrom, CNetMsgMaker(pfrom->GetCommonVersion()) .Make(NetMsgType::AVARESPONSE, TCPResponse(std::move(response), sessionKey))); } bool Processor::registerVotes(NodeId nodeid, const Response &response, std::vector &updates, int &banscore, std::string &error) { { // Save the time at which we can query again. LOCK(cs_peerManager); // FIXME: This will override the time even when we received an old stale // message. This should check that the message is indeed the most up to // date one before updating the time. peerManager->updateNextRequestTime( nodeid, std::chrono::steady_clock::now() + std::chrono::milliseconds(response.getCooldown())); } std::vector invs; { // Check that the query exists. auto w = queries.getWriteView(); auto it = w->find(std::make_tuple(nodeid, response.getRound())); if (it == w.end()) { banscore = 2; error = "unexpected-ava-response"; return false; } invs = std::move(it->invs); w->erase(it); } // Verify that the request and the vote are consistent. const std::vector &votes = response.GetVotes(); size_t size = invs.size(); if (votes.size() != size) { banscore = 100; error = "invalid-ava-response-size"; return false; } for (size_t i = 0; i < size; i++) { if (invs[i].hash != votes[i].GetHash()) { banscore = 100; error = "invalid-ava-response-content"; return false; } } std::map responseIndex; { LOCK(cs_main); for (const auto &v : votes) { auto pindex = LookupBlockIndex(BlockHash(v.GetHash())); if (!pindex) { // This should not happen, but just in case... continue; } if (!IsWorthPolling(pindex)) { // There is no point polling this block. continue; } responseIndex.insert(std::make_pair(pindex, v)); } } { // Register votes. auto w = blockVoteRecords.getWriteView(); for (const auto &p : responseIndex) { CBlockIndex *pindex = p.first; const Vote &v = p.second; auto it = w->find(pindex); if (it == w.end()) { // We are not voting on that item anymore. continue; } auto &vr = it->second; if (!vr.registerVote(nodeid, v.GetError())) { // This vote did not provide any extra information, move on. continue; } if (!vr.hasFinalized()) { // This item has note been finalized, so we have nothing more to // do. updates.emplace_back( pindex, vr.isAccepted() ? BlockUpdate::Status::Accepted : BlockUpdate::Status::Rejected); continue; } // We just finalized a vote. If it is valid, then let the caller // know. Either way, remove the item from the map. updates.emplace_back(pindex, vr.isAccepted() ? BlockUpdate::Status::Finalized : BlockUpdate::Status::Invalid); w->erase(it); } } return true; } CPubKey Processor::getSessionPubKey() const { return sessionKey.GetPubKey(); } uint256 Processor::buildLocalSighash(CNode *pfrom) const { CHashWriter hasher(SER_GETHASH, 0); hasher << peerData->delegation.getId(); hasher << pfrom->GetLocalNonce(); hasher << pfrom->nRemoteHostNonce; hasher << pfrom->GetLocalExtraEntropy(); hasher << pfrom->nRemoteExtraEntropy; return hasher.GetHash(); } bool Processor::sendHello(CNode *pfrom) const { if (!peerData) { // We do not have a delegation to advertise. return false; } // Now let's sign! SchnorrSig sig; { const uint256 hash = buildLocalSighash(pfrom); if (!sessionKey.SignSchnorr(hash, sig)) { return false; } } connman->PushMessage(pfrom, CNetMsgMaker(pfrom->GetCommonVersion()) .Make(NetMsgType::AVAHELLO, Hello(peerData->delegation, sig))); pfrom->AddKnownProof(peerData->delegation.getProofId()); return true; } std::shared_ptr Processor::getLocalProof() const { return peerData ? peerData->proof : nullptr; } bool Processor::startEventLoop(CScheduler &scheduler) { return eventLoop.startEventLoop( scheduler, [this]() { this->runEventLoop(); }, AVALANCHE_TIME_STEP); } bool Processor::stopEventLoop() { return eventLoop.stopEventLoop(); } std::vector Processor::getInvsForNextPoll(bool forPoll) { std::vector invs; + auto conflictingProofsReadView = proofsVoteRecords.getReadView(); + + auto pit = conflictingProofsReadView.begin(); + // Clamp to AVALANCHE_MAX_ELEMENT_POLL - 1 so we're always able to poll + // for a new block. Since the proofs are sorted by score, the most + // valuable are voted first. + while (pit != conflictingProofsReadView.end() && + invs.size() < AVALANCHE_MAX_ELEMENT_POLL - 1) { + invs.emplace_back(MSG_AVA_PROOF, pit->first->getId()); + ++pit; + } + // First remove all blocks that are not worth polling. { LOCK(cs_main); auto w = blockVoteRecords.getWriteView(); for (auto it = w->begin(); it != w->end();) { const CBlockIndex *pindex = it->first; if (!IsWorthPolling(pindex)) { w->erase(it++); } else { ++it; } } } auto r = blockVoteRecords.getReadView(); for (const std::pair &p : reverse_iterate(r)) { // Check if we can run poll. const bool shouldPoll = forPoll ? p.second.registerPoll() : p.second.shouldPoll(); if (!shouldPoll) { continue; } // We don't have a decision, we need more votes. invs.emplace_back(MSG_BLOCK, p.first->GetBlockHash()); if (invs.size() >= AVALANCHE_MAX_ELEMENT_POLL) { // Make sure we do not produce more invs than specified by the // protocol. return invs; } } return invs; } NodeId Processor::getSuitableNodeToQuery() { LOCK(cs_peerManager); return peerManager->selectNode(); } void Processor::clearTimedoutRequests() { auto now = std::chrono::steady_clock::now(); std::map timedout_items{}; { // Clear expired requests. auto w = queries.getWriteView(); auto it = w->get().begin(); while (it != w->get().end() && it->timeout < now) { for (const auto &i : it->invs) { timedout_items[i]++; } w->get().erase(it++); } } if (timedout_items.empty()) { return; } // In flight request accounting. for (const auto &p : timedout_items) { const CInv &inv = p.first; assert(inv.type == MSG_BLOCK); CBlockIndex *pindex; { LOCK(cs_main); pindex = LookupBlockIndex(BlockHash(inv.hash)); if (!pindex) { continue; } } auto w = blockVoteRecords.getWriteView(); auto it = w->find(pindex); if (it == w.end()) { continue; } it->second.clearInflightRequest(p.second); } } void Processor::runEventLoop() { // Don't do Avalanche while node is IBD'ing if (::ChainstateActive().IsInitialBlockDownload()) { return; } // First things first, check if we have requests that timed out and clear // them. clearTimedoutRequests(); // Make sure there is at least one suitable node to query before gathering // invs. NodeId nodeid = getSuitableNodeToQuery(); if (nodeid == NO_NODE) { return; } std::vector invs = getInvsForNextPoll(); if (invs.empty()) { return; } do { /** * If we lost contact to that node, then we remove it from nodeids, but * never add the request to queries, which ensures bad nodes get cleaned * up over time. */ bool hasSent = connman->ForNode(nodeid, [this, &invs](CNode *pnode) { uint64_t current_round = round++; { // Compute the time at which this requests times out. auto timeout = std::chrono::steady_clock::now() + queryTimeoutDuration; // Register the query. queries.getWriteView()->insert( {pnode->GetId(), current_round, timeout, invs}); // Set the timeout. LOCK(cs_peerManager); peerManager->updateNextRequestTime(pnode->GetId(), timeout); } pnode->m_avalanche_state->invsPolled(invs.size()); // Send the query to the node. connman->PushMessage( pnode, CNetMsgMaker(pnode->GetCommonVersion()) .Make(NetMsgType::AVAPOLL, Poll(current_round, std::move(invs)))); return true; }); // Success! if (hasSent) { return; } { // This node is obsolete, delete it. LOCK(cs_peerManager); peerManager->removeNode(nodeid); } // Get next suitable node to try again nodeid = getSuitableNodeToQuery(); } while (nodeid != NO_NODE); } } // namespace avalanche diff --git a/src/avalanche/processor.h b/src/avalanche/processor.h index 6001aa249..88d12b6f3 100644 --- a/src/avalanche/processor.h +++ b/src/avalanche/processor.h @@ -1,223 +1,233 @@ // Copyright (c) 2018-2019 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #ifndef BITCOIN_AVALANCHE_PROCESSOR_H #define BITCOIN_AVALANCHE_PROCESSOR_H #include +#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include class ArgsManager; class CBlockIndex; class CConnman; class CNode; class CScheduler; class Config; class PeerManager; struct bilingual_str; /** * Maximum item that can be polled at once. */ static constexpr size_t AVALANCHE_MAX_ELEMENT_POLL = 16; /** * How long before we consider that a query timed out. */ static constexpr std::chrono::milliseconds AVALANCHE_DEFAULT_QUERY_TIMEOUT{ 10000}; namespace avalanche { class Delegation; class PeerManager; class Proof; struct VoteRecord; class BlockUpdate { union { CBlockIndex *pindex; uintptr_t raw; }; static const size_t STATUS_BITS = 2; static const uintptr_t MASK = (1 << STATUS_BITS) - 1; static_assert( alignof(CBlockIndex) >= (1 << STATUS_BITS), "CBlockIndex alignement doesn't allow for Status to be stored."); public: enum Status : uint8_t { Invalid, Rejected, Accepted, Finalized, }; BlockUpdate(CBlockIndex *pindexIn, Status statusIn) : pindex(pindexIn) { raw |= statusIn; } Status getStatus() const { return Status(raw & MASK); } CBlockIndex *getBlockIndex() { return reinterpret_cast(raw & ~MASK); } const CBlockIndex *getBlockIndex() const { return const_cast(this)->getBlockIndex(); } }; using BlockVoteMap = std::map; +using ProofVoteMap = std::map, VoteRecord, + ProofSharedPointerComparator>; struct query_timeout {}; namespace { struct AvalancheTest; } class Processor { CConnman *connman; std::chrono::milliseconds queryTimeoutDuration; /** * Blocks to run avalanche on. */ RWCollection blockVoteRecords; + /** + * Proofs to run avalanche on. + */ + RWCollection proofsVoteRecords; + /** * Keep track of peers and queries sent. */ std::atomic round; /** * Keep track of the peers and associated infos. */ mutable Mutex cs_peerManager; std::unique_ptr peerManager GUARDED_BY(cs_peerManager); struct Query { NodeId nodeid; uint64_t round; TimePoint timeout; /** * We declare this as mutable so it can be modified in the multi_index. * This is ok because we do not use this field to index in anyway. * * /!\ Do not use any mutable field as index. */ mutable std::vector invs; }; using QuerySet = boost::multi_index_container< Query, boost::multi_index::indexed_by< // index by nodeid/round boost::multi_index::hashed_unique, boost::multi_index::member>>, // sorted by timeout boost::multi_index::ordered_non_unique< boost::multi_index::tag, boost::multi_index::member>>>; RWCollection queries; /** Data required to participate. */ struct PeerData; std::unique_ptr peerData; CKey sessionKey; /** Event loop machinery. */ EventLoop eventLoop; /** Registered interfaces::Chain::Notifications handler. */ class NotificationsHandler; std::unique_ptr chainNotificationsHandler; Processor(interfaces::Chain &chain, CConnman *connmanIn, std::unique_ptr peerDataIn, CKey sessionKeyIn); public: ~Processor(); static std::unique_ptr MakeProcessor(const ArgsManager &argsman, interfaces::Chain &chain, CConnman *connman, bilingual_str &error); void setQueryTimeoutDuration(std::chrono::milliseconds d) { queryTimeoutDuration = d; } bool addBlockToReconcile(const CBlockIndex *pindex); + void addProofToReconcile(const std::shared_ptr &proof, + bool isAccepted); bool isAccepted(const CBlockIndex *pindex) const; int getConfidence(const CBlockIndex *pindex) const; // TODO: Refactor the API to remove the dependency on avalanche/protocol.h void sendResponse(CNode *pfrom, Response response) const; bool registerVotes(NodeId nodeid, const Response &response, std::vector &updates, int &banscore, std::string &error); template auto withPeerManager(Callable &&func) const { LOCK(cs_peerManager); return func(*peerManager); } CPubKey getSessionPubKey() const; bool sendHello(CNode *pfrom) const; std::shared_ptr getLocalProof() const; /* * Return whether the avalanche service flag should be set. */ bool isAvalancheServiceAvailable() { return !!peerData; } bool startEventLoop(CScheduler &scheduler); bool stopEventLoop(); private: void runEventLoop(); void clearTimedoutRequests(); std::vector getInvsForNextPoll(bool forPoll = true); NodeId getSuitableNodeToQuery(); /** * Build and return the challenge whose signature is included in the * AVAHELLO message that we send to a peer. */ uint256 buildLocalSighash(CNode *pfrom) const; friend struct ::avalanche::AvalancheTest; }; } // namespace avalanche #endif // BITCOIN_AVALANCHE_PROCESSOR_H diff --git a/src/avalanche/test/processor_tests.cpp b/src/avalanche/test/processor_tests.cpp index d5ee33350..b7efc81af 100644 --- a/src/avalanche/test/processor_tests.cpp +++ b/src/avalanche/test/processor_tests.cpp @@ -1,985 +1,1035 @@ // Copyright (c) 2018-2020 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include #include #include #include #include #include #include #include // For ::PeerManager #include #include #include // For bilingual_str // D6970 moved LookupBlockIndex from chain.h to validation.h TODO: remove this // when LookupBlockIndex is refactored out of validation #include +#include #include #include using namespace avalanche; namespace avalanche { namespace { struct AvalancheTest { static void runEventLoop(avalanche::Processor &p) { p.runEventLoop(); } static std::vector getInvsForNextPoll(Processor &p) { return p.getInvsForNextPoll(false); } static NodeId getSuitableNodeToQuery(Processor &p) { return p.getSuitableNodeToQuery(); } static uint64_t getRound(const Processor &p) { return p.round; } }; } // namespace } // namespace avalanche namespace { struct CConnmanTest : public CConnman { using CConnman::CConnman; void AddNode(CNode &node) { LOCK(cs_vNodes); vNodes.push_back(&node); } void ClearNodes() { LOCK(cs_vNodes); for (CNode *node : vNodes) { delete node; } vNodes.clear(); } }; CService ip(uint32_t i) { struct in_addr s; s.s_addr = i; return CService(CNetAddr(s), Params().GetDefaultPort()); } struct AvalancheTestingSetup : public TestChain100Setup { const Config &config; CConnmanTest *m_connman; std::unique_ptr m_processor; // The master private key we delegate to. CKey masterpriv; AvalancheTestingSetup() : TestChain100Setup(), config(GetConfig()), masterpriv(CKey::MakeCompressedKey()) { // Deterministic randomness for tests. auto connman = std::make_unique(config, 0x1337, 0x1337); m_connman = connman.get(); m_node.connman = std::move(connman); m_node.peerman = std::make_unique<::PeerManager>( config.GetChainParams(), *m_connman, m_node.banman.get(), *m_node.scheduler, *m_node.chainman, *m_node.mempool); m_node.chain = interfaces::MakeChain(m_node, config.GetChainParams()); // Get the processor ready. bilingual_str error; m_processor = Processor::MakeProcessor(*m_node.args, *m_node.chain, m_node.connman.get(), error); BOOST_CHECK(m_processor); } ~AvalancheTestingSetup() { m_connman->ClearNodes(); SyncWithValidationInterfaceQueue(); } CNode *ConnectNode(ServiceFlags nServices) { static NodeId id = 0; CAddress addr(ip(GetRandInt(0xffffffff)), NODE_NONE); auto node = new CNode(id++, ServiceFlags(NODE_NETWORK), 0, INVALID_SOCKET, addr, 0, 0, 0, CAddress(), "", ConnectionType::OUTBOUND_FULL_RELAY); node->SetCommonVersion(PROTOCOL_VERSION); node->nServices = nServices; m_node.peerman->InitializeNode(config, node); node->nVersion = 1; node->fSuccessfullyConnected = true; node->m_avalanche_state = std::make_unique(); m_connman->AddNode(*node); return node; } size_t next_coinbase = 0; std::shared_ptr GetProof() { size_t current_coinbase = next_coinbase++; const CTransaction &coinbase = *m_coinbase_txns[current_coinbase]; ProofBuilder pb(0, 0, masterpriv); BOOST_CHECK(pb.addUTXO(COutPoint(coinbase.GetId(), 0), coinbase.vout[0].nValue, current_coinbase + 1, true, coinbaseKey)); return std::make_shared(pb.build()); } bool addNode(NodeId nodeid, const ProofId &proofid) { return m_processor->withPeerManager([&](avalanche::PeerManager &pm) { return pm.addNode(nodeid, proofid); }); } bool addNode(NodeId nodeid) { auto proof = GetProof(); return m_processor->withPeerManager([&](avalanche::PeerManager &pm) { return pm.registerProof(proof) && pm.addNode(nodeid, proof->getId()); }); } std::array ConnectNodes() { auto proof = GetProof(); BOOST_CHECK( m_processor->withPeerManager([&](avalanche::PeerManager &pm) { return pm.registerProof(proof); })); const ProofId &proofid = proof->getId(); std::array nodes; for (CNode *&n : nodes) { n = ConnectNode(NODE_AVALANCHE); BOOST_CHECK(addNode(n->GetId(), proofid)); } return nodes; } void runEventLoop() { AvalancheTest::runEventLoop(*m_processor); } NodeId getSuitableNodeToQuery() { return AvalancheTest::getSuitableNodeToQuery(*m_processor); } std::vector getInvsForNextPoll() { return AvalancheTest::getInvsForNextPoll(*m_processor); } uint64_t getRound() const { return AvalancheTest::getRound(*m_processor); } bool registerVotes(NodeId nodeid, const avalanche::Response &response, std::vector &updates) { int banscore; std::string error; return m_processor->registerVotes(nodeid, response, updates, banscore, error); } }; } // namespace BOOST_FIXTURE_TEST_SUITE(processor_tests, AvalancheTestingSetup) #define REGISTER_VOTE_AND_CHECK(vr, vote, state, finalized, confidence) \ vr.registerVote(NO_NODE, vote); \ BOOST_CHECK_EQUAL(vr.isAccepted(), state); \ BOOST_CHECK_EQUAL(vr.hasFinalized(), finalized); \ BOOST_CHECK_EQUAL(vr.getConfidence(), confidence); BOOST_AUTO_TEST_CASE(vote_record) { VoteRecord vraccepted(true); // Check initial state. BOOST_CHECK_EQUAL(vraccepted.isAccepted(), true); BOOST_CHECK_EQUAL(vraccepted.hasFinalized(), false); BOOST_CHECK_EQUAL(vraccepted.getConfidence(), 0); VoteRecord vr(false); // Check initial state. BOOST_CHECK_EQUAL(vr.isAccepted(), false); BOOST_CHECK_EQUAL(vr.hasFinalized(), false); BOOST_CHECK_EQUAL(vr.getConfidence(), 0); // We need to register 6 positive votes before we start counting. for (int i = 0; i < 6; i++) { REGISTER_VOTE_AND_CHECK(vr, 0, false, false, 0); } // Next vote will flip state, and confidence will increase as long as we // vote yes. REGISTER_VOTE_AND_CHECK(vr, 0, true, false, 0); // A single neutral vote do not change anything. REGISTER_VOTE_AND_CHECK(vr, -1, true, false, 1); for (int i = 2; i < 8; i++) { REGISTER_VOTE_AND_CHECK(vr, 0, true, false, i); } // Two neutral votes will stall progress. REGISTER_VOTE_AND_CHECK(vr, -1, true, false, 7); REGISTER_VOTE_AND_CHECK(vr, -1, true, false, 7); for (int i = 2; i < 8; i++) { REGISTER_VOTE_AND_CHECK(vr, 0, true, false, 7); } // Now confidence will increase as long as we vote yes. for (int i = 8; i < AVALANCHE_FINALIZATION_SCORE; i++) { REGISTER_VOTE_AND_CHECK(vr, 0, true, false, i); } // The next vote will finalize the decision. REGISTER_VOTE_AND_CHECK(vr, 1, true, true, AVALANCHE_FINALIZATION_SCORE); // Now that we have two no votes, confidence stop increasing. for (int i = 0; i < 5; i++) { REGISTER_VOTE_AND_CHECK(vr, 1, true, true, AVALANCHE_FINALIZATION_SCORE); } // Next vote will flip state, and confidence will increase as long as we // vote no. REGISTER_VOTE_AND_CHECK(vr, 1, false, false, 0); // A single neutral vote do not change anything. REGISTER_VOTE_AND_CHECK(vr, -1, false, false, 1); for (int i = 2; i < 8; i++) { REGISTER_VOTE_AND_CHECK(vr, 1, false, false, i); } // Two neutral votes will stall progress. REGISTER_VOTE_AND_CHECK(vr, -1, false, false, 7); REGISTER_VOTE_AND_CHECK(vr, -1, false, false, 7); for (int i = 2; i < 8; i++) { REGISTER_VOTE_AND_CHECK(vr, 1, false, false, 7); } // Now confidence will increase as long as we vote no. for (int i = 8; i < AVALANCHE_FINALIZATION_SCORE; i++) { REGISTER_VOTE_AND_CHECK(vr, 1, false, false, i); } // The next vote will finalize the decision. REGISTER_VOTE_AND_CHECK(vr, 0, false, true, AVALANCHE_FINALIZATION_SCORE); // Check that inflight accounting work as expected. VoteRecord vrinflight(false); for (int i = 0; i < 2 * AVALANCHE_MAX_INFLIGHT_POLL; i++) { bool shouldPoll = vrinflight.shouldPoll(); BOOST_CHECK_EQUAL(shouldPoll, i < AVALANCHE_MAX_INFLIGHT_POLL); BOOST_CHECK_EQUAL(vrinflight.registerPoll(), shouldPoll); } // Clear various number of inflight requests and check everything behaves as // expected. for (int i = 1; i < AVALANCHE_MAX_INFLIGHT_POLL; i++) { vrinflight.clearInflightRequest(i); BOOST_CHECK(vrinflight.shouldPoll()); for (int j = 1; j < i; j++) { BOOST_CHECK(vrinflight.registerPoll()); BOOST_CHECK(vrinflight.shouldPoll()); } BOOST_CHECK(vrinflight.registerPoll()); BOOST_CHECK(!vrinflight.shouldPoll()); } } BOOST_AUTO_TEST_CASE(block_update) { CBlockIndex index; CBlockIndex *pindex = &index; std::set status{ BlockUpdate::Status::Invalid, BlockUpdate::Status::Rejected, BlockUpdate::Status::Accepted, BlockUpdate::Status::Finalized, }; for (auto s : status) { BlockUpdate abu(pindex, s); BOOST_CHECK(abu.getBlockIndex() == pindex); BOOST_CHECK_EQUAL(abu.getStatus(), s); } } namespace { Response next(Response &r) { auto copy = r; r = {r.getRound() + 1, r.getCooldown(), r.GetVotes()}; return copy; } } // namespace BOOST_AUTO_TEST_CASE(block_register) { std::vector updates; CBlock block = CreateAndProcessBlock({}, CScript()); const BlockHash blockHash = block.GetHash(); const CBlockIndex *pindex; { LOCK(cs_main); pindex = LookupBlockIndex(blockHash); } // Create nodes that supports avalanche. auto avanodes = ConnectNodes(); // Querying for random block returns false. BOOST_CHECK(!m_processor->isAccepted(pindex)); // Add a new block. Check it is added to the polls. BOOST_CHECK(m_processor->addBlockToReconcile(pindex)); auto invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK); BOOST_CHECK(invs[0].hash == blockHash); // Newly added blocks' state reflect the blockchain. BOOST_CHECK(m_processor->isAccepted(pindex)); int nextNodeIndex = 0; auto registerNewVote = [&](const Response &resp) { runEventLoop(); auto nodeid = avanodes[nextNodeIndex++ % avanodes.size()]->GetId(); BOOST_CHECK(registerVotes(nodeid, resp, updates)); }; // Let's vote for this block a few times. Response resp{0, 0, {Vote(0, blockHash)}}; for (int i = 0; i < 6; i++) { registerNewVote(next(resp)); BOOST_CHECK(m_processor->isAccepted(pindex)); BOOST_CHECK_EQUAL(m_processor->getConfidence(pindex), 0); BOOST_CHECK_EQUAL(updates.size(), 0); } // A single neutral vote do not change anything. resp = {getRound(), 0, {Vote(-1, blockHash)}}; registerNewVote(next(resp)); BOOST_CHECK(m_processor->isAccepted(pindex)); BOOST_CHECK_EQUAL(m_processor->getConfidence(pindex), 0); BOOST_CHECK_EQUAL(updates.size(), 0); resp = {getRound(), 0, {Vote(0, blockHash)}}; for (int i = 1; i < 7; i++) { registerNewVote(next(resp)); BOOST_CHECK(m_processor->isAccepted(pindex)); BOOST_CHECK_EQUAL(m_processor->getConfidence(pindex), i); BOOST_CHECK_EQUAL(updates.size(), 0); } // Two neutral votes will stall progress. resp = {getRound(), 0, {Vote(-1, blockHash)}}; registerNewVote(next(resp)); BOOST_CHECK(m_processor->isAccepted(pindex)); BOOST_CHECK_EQUAL(m_processor->getConfidence(pindex), 6); BOOST_CHECK_EQUAL(updates.size(), 0); registerNewVote(next(resp)); BOOST_CHECK(m_processor->isAccepted(pindex)); BOOST_CHECK_EQUAL(m_processor->getConfidence(pindex), 6); BOOST_CHECK_EQUAL(updates.size(), 0); resp = {getRound(), 0, {Vote(0, blockHash)}}; for (int i = 2; i < 8; i++) { registerNewVote(next(resp)); BOOST_CHECK(m_processor->isAccepted(pindex)); BOOST_CHECK_EQUAL(m_processor->getConfidence(pindex), 6); BOOST_CHECK_EQUAL(updates.size(), 0); } // We vote for it numerous times to finalize it. for (int i = 7; i < AVALANCHE_FINALIZATION_SCORE; i++) { registerNewVote(next(resp)); BOOST_CHECK(m_processor->isAccepted(pindex)); BOOST_CHECK_EQUAL(m_processor->getConfidence(pindex), i); BOOST_CHECK_EQUAL(updates.size(), 0); } // As long as it is not finalized, we poll. invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK); BOOST_CHECK(invs[0].hash == blockHash); // Now finalize the decision. registerNewVote(next(resp)); BOOST_CHECK_EQUAL(updates.size(), 1); BOOST_CHECK(updates[0].getBlockIndex() == pindex); BOOST_CHECK_EQUAL(updates[0].getStatus(), BlockUpdate::Status::Finalized); updates = {}; // Once the decision is finalized, there is no poll for it. invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 0); // Now let's undo this and finalize rejection. BOOST_CHECK(m_processor->addBlockToReconcile(pindex)); invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK); BOOST_CHECK(invs[0].hash == blockHash); resp = {getRound(), 0, {Vote(1, blockHash)}}; for (int i = 0; i < 6; i++) { registerNewVote(next(resp)); BOOST_CHECK(m_processor->isAccepted(pindex)); BOOST_CHECK_EQUAL(updates.size(), 0); } // Now the state will flip. registerNewVote(next(resp)); BOOST_CHECK(!m_processor->isAccepted(pindex)); BOOST_CHECK_EQUAL(updates.size(), 1); BOOST_CHECK(updates[0].getBlockIndex() == pindex); BOOST_CHECK_EQUAL(updates[0].getStatus(), BlockUpdate::Status::Rejected); updates = {}; // Now it is rejected, but we can vote for it numerous times. for (int i = 1; i < AVALANCHE_FINALIZATION_SCORE; i++) { registerNewVote(next(resp)); BOOST_CHECK(!m_processor->isAccepted(pindex)); BOOST_CHECK_EQUAL(updates.size(), 0); } // As long as it is not finalized, we poll. invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK); BOOST_CHECK(invs[0].hash == blockHash); // Now finalize the decision. registerNewVote(next(resp)); BOOST_CHECK(!m_processor->isAccepted(pindex)); BOOST_CHECK_EQUAL(updates.size(), 1); BOOST_CHECK(updates[0].getBlockIndex() == pindex); BOOST_CHECK_EQUAL(updates[0].getStatus(), BlockUpdate::Status::Invalid); updates = {}; // Once the decision is finalized, there is no poll for it. invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 0); // Adding the block twice does nothing. BOOST_CHECK(m_processor->addBlockToReconcile(pindex)); BOOST_CHECK(!m_processor->addBlockToReconcile(pindex)); BOOST_CHECK(m_processor->isAccepted(pindex)); } BOOST_AUTO_TEST_CASE(multi_block_register) { CBlockIndex indexA, indexB; std::vector updates; // Create several nodes that support avalanche. auto avanodes = ConnectNodes(); // Make sure the block has a hash. CBlock blockA = CreateAndProcessBlock({}, CScript()); const BlockHash blockHashA = blockA.GetHash(); CBlock blockB = CreateAndProcessBlock({}, CScript()); const BlockHash blockHashB = blockB.GetHash(); const CBlockIndex *pindexA; const CBlockIndex *pindexB; { LOCK(cs_main); pindexA = LookupBlockIndex(blockHashA); pindexB = LookupBlockIndex(blockHashB); } // Querying for random block returns false. BOOST_CHECK(!m_processor->isAccepted(pindexA)); BOOST_CHECK(!m_processor->isAccepted(pindexB)); // Start voting on block A. BOOST_CHECK(m_processor->addBlockToReconcile(pindexA)); auto invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK); BOOST_CHECK(invs[0].hash == blockHashA); uint64_t round = getRound(); runEventLoop(); BOOST_CHECK(registerVotes(avanodes[0]->GetId(), {round, 0, {Vote(0, blockHashA)}}, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); // Start voting on block B after one vote. Response resp{round + 1, 0, {Vote(0, blockHashB), Vote(0, blockHashA)}}; BOOST_CHECK(m_processor->addBlockToReconcile(pindexB)); invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 2); // Ensure B comes before A because it has accumulated more PoW. BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK); BOOST_CHECK(invs[0].hash == blockHashB); BOOST_CHECK_EQUAL(invs[1].type, MSG_BLOCK); BOOST_CHECK(invs[1].hash == blockHashA); // Let's vote for these blocks a few times. for (int i = 0; i < 4; i++) { NodeId nodeid = getSuitableNodeToQuery(); runEventLoop(); BOOST_CHECK(registerVotes(nodeid, next(resp), updates)); BOOST_CHECK_EQUAL(updates.size(), 0); } // Now it is accepted, but we can vote for it numerous times. for (int i = 0; i < AVALANCHE_FINALIZATION_SCORE; i++) { NodeId nodeid = getSuitableNodeToQuery(); runEventLoop(); BOOST_CHECK(registerVotes(nodeid, next(resp), updates)); BOOST_CHECK_EQUAL(updates.size(), 0); } // Running two iterration of the event loop so that vote gets triggered on A // and B. NodeId firstNodeid = getSuitableNodeToQuery(); runEventLoop(); NodeId secondNodeid = getSuitableNodeToQuery(); runEventLoop(); BOOST_CHECK(firstNodeid != secondNodeid); // Next vote will finalize block A. BOOST_CHECK(registerVotes(firstNodeid, next(resp), updates)); BOOST_CHECK_EQUAL(updates.size(), 1); BOOST_CHECK(updates[0].getBlockIndex() == pindexA); BOOST_CHECK_EQUAL(updates[0].getStatus(), BlockUpdate::Status::Finalized); updates = {}; // We do not vote on A anymore. invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK); BOOST_CHECK(invs[0].hash == blockHashB); // Next vote will finalize block B. BOOST_CHECK(registerVotes(secondNodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 1); BOOST_CHECK(updates[0].getBlockIndex() == pindexB); BOOST_CHECK_EQUAL(updates[0].getStatus(), BlockUpdate::Status::Finalized); updates = {}; // There is nothing left to vote on. invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 0); } BOOST_AUTO_TEST_CASE(poll_and_response) { std::vector updates; CBlock block = CreateAndProcessBlock({}, CScript()); const BlockHash blockHash = block.GetHash(); const CBlockIndex *pindex; { LOCK(cs_main); pindex = LookupBlockIndex(blockHash); } // There is no node to query. BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), NO_NODE); // Create a node that supports avalanche and one that doesn't. ConnectNode(NODE_NONE); auto avanode = ConnectNode(NODE_AVALANCHE); NodeId avanodeid = avanode->GetId(); BOOST_CHECK(addNode(avanodeid)); // It returns the avalanche peer. BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), avanodeid); // Register a block and check it is added to the list of elements to poll. BOOST_CHECK(m_processor->addBlockToReconcile(pindex)); auto invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK); BOOST_CHECK(invs[0].hash == blockHash); // Trigger a poll on avanode. uint64_t round = getRound(); runEventLoop(); // There is no more suitable peer available, so return nothing. BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), NO_NODE); // Respond to the request. Response resp = {round, 0, {Vote(0, blockHash)}}; BOOST_CHECK(registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); // Now that avanode fullfilled his request, it is added back to the list of // queriable nodes. BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), avanodeid); auto checkRegisterVotesError = [&](NodeId nodeid, const avalanche::Response &response, const std::string &expectedError) { int banscore; std::string error; BOOST_CHECK(!m_processor->registerVotes(nodeid, response, updates, banscore, error)); BOOST_CHECK_EQUAL(error, expectedError); BOOST_CHECK_EQUAL(updates.size(), 0); }; // Sending a response when not polled fails. checkRegisterVotesError(avanodeid, next(resp), "unexpected-ava-response"); // Trigger a poll on avanode. round = getRound(); runEventLoop(); BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), NO_NODE); // Sending responses that do not match the request also fails. // 1. Too many results. resp = {round, 0, {Vote(0, blockHash), Vote(0, blockHash)}}; runEventLoop(); checkRegisterVotesError(avanodeid, resp, "invalid-ava-response-size"); BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), avanodeid); // 2. Not enough results. resp = {getRound(), 0, {}}; runEventLoop(); checkRegisterVotesError(avanodeid, resp, "invalid-ava-response-size"); BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), avanodeid); // 3. Do not match the poll. resp = {getRound(), 0, {Vote()}}; runEventLoop(); checkRegisterVotesError(avanodeid, resp, "invalid-ava-response-content"); BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), avanodeid); // 4. Invalid round count. Request is not discarded. uint64_t queryRound = getRound(); runEventLoop(); resp = {queryRound + 1, 0, {Vote()}}; checkRegisterVotesError(avanodeid, resp, "unexpected-ava-response"); resp = {queryRound - 1, 0, {Vote()}}; checkRegisterVotesError(avanodeid, resp, "unexpected-ava-response"); // 5. Making request for invalid nodes do not work. Request is not // discarded. resp = {queryRound, 0, {Vote(0, blockHash)}}; checkRegisterVotesError(avanodeid + 1234, resp, "unexpected-ava-response"); // Proper response gets processed and avanode is available again. resp = {queryRound, 0, {Vote(0, blockHash)}}; BOOST_CHECK(registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), avanodeid); // Out of order response are rejected. CBlock block2 = CreateAndProcessBlock({}, CScript()); const BlockHash blockHash2 = block2.GetHash(); CBlockIndex *pindex2; { LOCK(cs_main); pindex2 = LookupBlockIndex(blockHash2); } BOOST_CHECK(m_processor->addBlockToReconcile(pindex2)); resp = {getRound(), 0, {Vote(0, blockHash), Vote(0, blockHash2)}}; runEventLoop(); checkRegisterVotesError(avanodeid, resp, "invalid-ava-response-content"); BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), avanodeid); // But they are accepted in order. resp = {getRound(), 0, {Vote(0, blockHash2), Vote(0, blockHash)}}; runEventLoop(); BOOST_CHECK(registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), avanodeid); // When a block is marked invalid, stop polling. pindex2->nStatus = pindex2->nStatus.withFailed(); resp = {getRound(), 0, {Vote(0, blockHash)}}; runEventLoop(); BOOST_CHECK(registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), avanodeid); } BOOST_AUTO_TEST_CASE(poll_inflight_timeout, *boost::unit_test::timeout(60)) { std::vector updates; CBlock block = CreateAndProcessBlock({}, CScript()); const BlockHash blockHash = block.GetHash(); const CBlockIndex *pindex; { LOCK(cs_main); pindex = LookupBlockIndex(blockHash); } // Add the block BOOST_CHECK(m_processor->addBlockToReconcile(pindex)); // Create a node that supports avalanche. auto avanode = ConnectNode(NODE_AVALANCHE); NodeId avanodeid = avanode->GetId(); BOOST_CHECK(addNode(avanodeid)); // Expire requests after some time. auto queryTimeDuration = std::chrono::milliseconds(10); m_processor->setQueryTimeoutDuration(queryTimeDuration); for (int i = 0; i < 10; i++) { Response resp = {getRound(), 0, {Vote(0, blockHash)}}; auto start = std::chrono::steady_clock::now(); runEventLoop(); // We cannot guarantee that we'll wait for just 1ms, so we have to bail // if we aren't within the proper time range. std::this_thread::sleep_for(std::chrono::milliseconds(1)); runEventLoop(); bool ret = registerVotes(avanodeid, next(resp), updates); if (std::chrono::steady_clock::now() > start + queryTimeDuration) { // We waited for too long, bail. Because we can't know for sure when // previous steps ran, ret is not deterministic and we do not check // it. i--; continue; } // We are within time bounds, so the vote should have worked. BOOST_CHECK(ret); // Now try again but wait for expiration. runEventLoop(); std::this_thread::sleep_for(queryTimeDuration); runEventLoop(); BOOST_CHECK(!registerVotes(avanodeid, next(resp), updates)); } } BOOST_AUTO_TEST_CASE(poll_inflight_count) { // Create enough nodes so that we run into the inflight request limit. auto proof = GetProof(); BOOST_CHECK(m_processor->withPeerManager( [&](avalanche::PeerManager &pm) { return pm.registerProof(proof); })); std::array nodes; for (auto &n : nodes) { n = ConnectNode(NODE_AVALANCHE); BOOST_CHECK(addNode(n->GetId(), proof->getId())); } // Add a block to poll CBlock block = CreateAndProcessBlock({}, CScript()); const BlockHash blockHash = block.GetHash(); const CBlockIndex *pindex; { LOCK(cs_main); pindex = LookupBlockIndex(blockHash); } BOOST_CHECK(m_processor->addBlockToReconcile(pindex)); // Ensure there are enough requests in flight. std::map node_round_map; for (int i = 0; i < AVALANCHE_MAX_INFLIGHT_POLL; i++) { NodeId nodeid = getSuitableNodeToQuery(); BOOST_CHECK(node_round_map.find(nodeid) == node_round_map.end()); node_round_map.insert(std::pair(nodeid, getRound())); auto invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK); BOOST_CHECK(invs[0].hash == blockHash); runEventLoop(); } // Now that we have enough in flight requests, we shouldn't poll. auto suitablenodeid = getSuitableNodeToQuery(); BOOST_CHECK(suitablenodeid != NO_NODE); auto invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 0); runEventLoop(); BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), suitablenodeid); std::vector updates; // Send one response, now we can poll again. auto it = node_round_map.begin(); Response resp = {it->second, 0, {Vote(0, blockHash)}}; BOOST_CHECK(registerVotes(it->first, resp, updates)); node_round_map.erase(it); invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK); BOOST_CHECK(invs[0].hash == blockHash); } BOOST_AUTO_TEST_CASE(quorum_diversity) { std::vector updates; CBlock block = CreateAndProcessBlock({}, CScript()); const BlockHash blockHash = block.GetHash(); const CBlockIndex *pindex; { LOCK(cs_main); pindex = LookupBlockIndex(blockHash); } // Create nodes that supports avalanche. auto avanodes = ConnectNodes(); // Querying for random block returns false. BOOST_CHECK(!m_processor->isAccepted(pindex)); // Add a new block. Check it is added to the polls. BOOST_CHECK(m_processor->addBlockToReconcile(pindex)); // Do one valid round of voting. uint64_t round = getRound(); Response resp{round, 0, {Vote(0, blockHash)}}; // Check that all nodes can vote. for (size_t i = 0; i < avanodes.size(); i++) { runEventLoop(); BOOST_CHECK(registerVotes(avanodes[i]->GetId(), next(resp), updates)); } // Generate a query for every single node. const NodeId firstNodeId = getSuitableNodeToQuery(); std::map node_round_map; round = getRound(); for (size_t i = 0; i < avanodes.size(); i++) { NodeId nodeid = getSuitableNodeToQuery(); BOOST_CHECK(node_round_map.find(nodeid) == node_round_map.end()); node_round_map[nodeid] = getRound(); runEventLoop(); } // Now only the first node can vote. All others would be duplicate in the // quorum. auto confidence = m_processor->getConfidence(pindex); BOOST_REQUIRE(confidence > 0); for (auto &[nodeid, r] : node_round_map) { if (nodeid == firstNodeId) { // Node 0 is the only one which can vote at this stage. round = r; continue; } BOOST_CHECK( registerVotes(nodeid, {r, 0, {Vote(0, blockHash)}}, updates)); BOOST_CHECK_EQUAL(m_processor->getConfidence(pindex), confidence); } BOOST_CHECK( registerVotes(firstNodeId, {round, 0, {Vote(0, blockHash)}}, updates)); BOOST_CHECK_EQUAL(m_processor->getConfidence(pindex), confidence + 1); } BOOST_AUTO_TEST_CASE(event_loop) { CScheduler s; CBlock block = CreateAndProcessBlock({}, CScript()); const BlockHash blockHash = block.GetHash(); const CBlockIndex *pindex; { LOCK(cs_main); pindex = LookupBlockIndex(blockHash); } // Starting the event loop. BOOST_CHECK(m_processor->startEventLoop(s)); // There is one task planned in the next hour (our event loop). std::chrono::system_clock::time_point start, stop; BOOST_CHECK_EQUAL(s.getQueueInfo(start, stop), 1); // Starting twice doesn't start it twice. BOOST_CHECK(!m_processor->startEventLoop(s)); // Start the scheduler thread. std::thread schedulerThread(std::bind(&CScheduler::serviceQueue, &s)); // Create a node that supports avalanche. auto avanode = ConnectNode(NODE_AVALANCHE); NodeId nodeid = avanode->GetId(); BOOST_CHECK(addNode(nodeid)); // There is no query in flight at the moment. BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), nodeid); // Add a new block. Check it is added to the polls. uint64_t queryRound = getRound(); BOOST_CHECK(m_processor->addBlockToReconcile(pindex)); for (int i = 0; i < 60 * 1000; i++) { // Technically, this is a race condition, but this should do just fine // as we wait up to 1 minute for an event that should take 10ms. UninterruptibleSleep(std::chrono::milliseconds(1)); if (getRound() != queryRound) { break; } } // Check that we effectively got a request and not timed out. BOOST_CHECK(getRound() > queryRound); // Respond and check the cooldown time is respected. uint64_t responseRound = getRound(); auto queryTime = std::chrono::steady_clock::now() + std::chrono::milliseconds(100); std::vector updates; registerVotes(nodeid, {queryRound, 100, {Vote(0, blockHash)}}, updates); for (int i = 0; i < 10000; i++) { // We make sure that we do not get a request before queryTime. UninterruptibleSleep(std::chrono::milliseconds(1)); if (getRound() != responseRound) { BOOST_CHECK(std::chrono::steady_clock::now() > queryTime); break; } } // But we eventually get one. BOOST_CHECK(getRound() > responseRound); // Stop event loop. BOOST_CHECK(m_processor->stopEventLoop()); // We don't have any task scheduled anymore. BOOST_CHECK_EQUAL(s.getQueueInfo(start, stop), 0); // Can't stop the event loop twice. BOOST_CHECK(!m_processor->stopEventLoop()); // Wait for the scheduler to stop. s.StopWhenDrained(); schedulerThread.join(); } BOOST_AUTO_TEST_CASE(destructor) { CScheduler s; std::chrono::system_clock::time_point start, stop; std::thread schedulerThread; BOOST_CHECK(m_processor->startEventLoop(s)); BOOST_CHECK_EQUAL(s.getQueueInfo(start, stop), 1); // Start the service thread after the queue size check to prevent a race // condition where the thread may be processing the event loop task during // the check. schedulerThread = std::thread(std::bind(&CScheduler::serviceQueue, &s)); // Destroy the processor. m_processor.reset(); // Now that avalanche is destroyed, there is no more scheduled tasks. BOOST_CHECK_EQUAL(s.getQueueInfo(start, stop), 0); // Wait for the scheduler to stop. s.StopWhenDrained(); schedulerThread.join(); } +BOOST_AUTO_TEST_CASE(add_proof_to_reconcile) { + uint32_t score = MIN_VALID_PROOF_SCORE; + + auto addProofToReconcile = [&](uint32_t proofScore) { + auto proof = std::make_shared(buildRandomProof(proofScore)); + m_processor->addProofToReconcile(proof, GetRandInt(1)); + return proof; + }; + + for (size_t i = 0; i < AVALANCHE_MAX_ELEMENT_POLL - 1; i++) { + auto proof = addProofToReconcile(++score); + + auto invs = AvalancheTest::getInvsForNextPoll(*m_processor); + BOOST_CHECK_EQUAL(invs.size(), i + 1); + BOOST_CHECK(invs.front().IsMsgProof()); + BOOST_CHECK_EQUAL(invs.front().hash, proof->getId()); + } + + // From here a new proof is only polled if its score is in the top + // AVALANCHE_MAX_ELEMENT_POLL - 1 + ProofId lastProofId; + for (size_t i = 0; i < 10; i++) { + auto proof = addProofToReconcile(++score); + + auto invs = AvalancheTest::getInvsForNextPoll(*m_processor); + BOOST_CHECK_EQUAL(invs.size(), AVALANCHE_MAX_ELEMENT_POLL - 1); + BOOST_CHECK(invs.front().IsMsgProof()); + BOOST_CHECK_EQUAL(invs.front().hash, proof->getId()); + + lastProofId = proof->getId(); + } + + for (size_t i = 0; i < 10; i++) { + auto proof = addProofToReconcile(--score); + + auto invs = AvalancheTest::getInvsForNextPoll(*m_processor); + BOOST_CHECK_EQUAL(invs.size(), AVALANCHE_MAX_ELEMENT_POLL - 1); + BOOST_CHECK(invs.front().IsMsgProof()); + BOOST_CHECK_EQUAL(invs.front().hash, lastProofId); + } + + // The score is not high enough to get polled + auto proof = addProofToReconcile(--score); + auto invs = AvalancheTest::getInvsForNextPoll(*m_processor); + for (auto &inv : invs) { + BOOST_CHECK_NE(inv.hash, proof->getId()); + } +} + BOOST_AUTO_TEST_SUITE_END()