diff --git a/src/avalanche/processor.cpp b/src/avalanche/processor.cpp index 8b21fda42..4ae8406f5 100644 --- a/src/avalanche/processor.cpp +++ b/src/avalanche/processor.cpp @@ -1,955 +1,974 @@ // Copyright (c) 2018-2019 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include #include #include #include #include #include #include #include #include // For DecodeSecret #include #include #include #include #include #include #include #include #include #include #include /** * Run the avalanche event loop every 10ms. */ static constexpr std::chrono::milliseconds AVALANCHE_TIME_STEP{10}; // Unfortunately, the bitcoind codebase is full of global and we are kinda // forced into it here. std::unique_ptr g_avalanche; namespace avalanche { static bool VerifyProof(const Proof &proof, bilingual_str &error) { ProofValidationState proof_state; if (!proof.verify(proof_state)) { switch (proof_state.GetResult()) { case ProofValidationResult::NO_STAKE: error = _("The avalanche proof has no stake."); return false; case ProofValidationResult::DUST_THRESHOLD: error = _("The avalanche proof stake is too low."); return false; case ProofValidationResult::DUPLICATE_STAKE: error = _("The avalanche proof has duplicated stake."); return false; case ProofValidationResult::INVALID_STAKE_SIGNATURE: error = _("The avalanche proof has invalid stake signatures."); return false; case ProofValidationResult::TOO_MANY_UTXOS: error = strprintf( _("The avalanche proof has too many utxos (max: %u)."), AVALANCHE_MAX_PROOF_STAKES); return false; default: error = _("The avalanche proof is invalid."); return false; } } return true; } static bool VerifyDelegation(const Delegation &dg, const CPubKey &expectedPubKey, bilingual_str &error) { DelegationState dg_state; CPubKey auth; if (!dg.verify(dg_state, auth)) { switch (dg_state.GetResult()) { case avalanche::DelegationResult::INVALID_SIGNATURE: error = _("The avalanche delegation has invalid signatures."); return false; case avalanche::DelegationResult::TOO_MANY_LEVELS: error = _( "The avalanche delegation has too many delegation levels."); return false; default: error = _("The avalanche delegation is invalid."); return false; } } if (auth != expectedPubKey) { error = _( "The avalanche delegation does not match the expected public key."); return false; } return true; } struct Processor::PeerData { ProofRef proof; Delegation delegation; }; class Processor::NotificationsHandler : public interfaces::Chain::Notifications { Processor *m_processor; public: NotificationsHandler(Processor *p) : m_processor(p) {} void updatedBlockTip() override { auto registerProofs = [&]() { LOCK(m_processor->cs_peerManager); if (m_processor->peerData && m_processor->peerData->proof) { m_processor->peerManager->registerProof( m_processor->peerData->proof); } return m_processor->peerManager->updatedBlockTip(); }; auto registeredProofs = registerProofs(); for (const auto &proof : registeredProofs) { m_processor->addProofToReconcile(proof); } } }; Processor::Processor(const ArgsManager &argsman, interfaces::Chain &chain, CConnman *connmanIn, ChainstateManager &chainmanIn, CScheduler &scheduler, std::unique_ptr peerDataIn, CKey sessionKeyIn, uint32_t minQuorumTotalScoreIn, double minQuorumConnectedScoreRatioIn, int64_t minAvaproofsNodeCountIn, uint32_t staleVoteThresholdIn, uint32_t staleVoteFactorIn) : connman(connmanIn), chainman(chainmanIn), queryTimeoutDuration(argsman.GetArg( "-avatimeout", AVALANCHE_DEFAULT_QUERY_TIMEOUT.count())), round(0), peerManager(std::make_unique(chainman)), peerData(std::move(peerDataIn)), sessionKey(std::move(sessionKeyIn)), minQuorumScore(minQuorumTotalScoreIn), minQuorumConnectedScoreRatio(minQuorumConnectedScoreRatioIn), minAvaproofsNodeCount(minAvaproofsNodeCountIn), staleVoteThreshold(staleVoteThresholdIn), staleVoteFactor(staleVoteFactorIn) { // Make sure we get notified of chain state changes. chainNotificationsHandler = chain.handleNotifications(std::make_shared(this)); scheduler.scheduleEvery( [this]() -> bool { WITH_LOCK(cs_peerManager, peerManager->cleanupDanglingProofs( peerData ? peerData->proof : ProofRef())); return true; }, 5min); } Processor::~Processor() { chainNotificationsHandler.reset(); stopEventLoop(); } std::unique_ptr Processor::MakeProcessor(const ArgsManager &argsman, interfaces::Chain &chain, CConnman *connman, ChainstateManager &chainman, CScheduler &scheduler, bilingual_str &error) { std::unique_ptr peerData; CKey masterKey; CKey sessionKey; if (argsman.IsArgSet("-avasessionkey")) { sessionKey = DecodeSecret(argsman.GetArg("-avasessionkey", "")); if (!sessionKey.IsValid()) { error = _("The avalanche session key is invalid."); return nullptr; } } else { // Pick a random key for the session. sessionKey.MakeNewKey(true); } if (argsman.IsArgSet("-avaproof")) { if (!argsman.IsArgSet("-avamasterkey")) { error = _( "The avalanche master key is missing for the avalanche proof."); return nullptr; } masterKey = DecodeSecret(argsman.GetArg("-avamasterkey", "")); if (!masterKey.IsValid()) { error = _("The avalanche master key is invalid."); return nullptr; } auto proof = RCUPtr::make(); if (!Proof::FromHex(*proof, argsman.GetArg("-avaproof", ""), error)) { // error is set by FromHex return nullptr; } peerData = std::make_unique(); peerData->proof = std::move(proof); if (!VerifyProof(*peerData->proof, error)) { // error is set by VerifyProof return nullptr; } std::unique_ptr dgb; const CPubKey &masterPubKey = masterKey.GetPubKey(); if (argsman.IsArgSet("-avadelegation")) { Delegation dg; if (!Delegation::FromHex(dg, argsman.GetArg("-avadelegation", ""), error)) { // error is set by FromHex() return nullptr; } if (dg.getProofId() != peerData->proof->getId()) { error = _("The delegation does not match the proof."); return nullptr; } if (masterPubKey != dg.getDelegatedPubkey()) { error = _( "The master key does not match the delegation public key."); return nullptr; } dgb = std::make_unique(dg); } else { if (masterPubKey != peerData->proof->getMaster()) { error = _("The master key does not match the proof public key."); return nullptr; } dgb = std::make_unique(*peerData->proof); } // Generate the delegation to the session key. const CPubKey sessionPubKey = sessionKey.GetPubKey(); if (sessionPubKey != masterPubKey) { if (!dgb->addLevel(masterKey, sessionPubKey)) { error = _("Failed to generate a delegation for this session."); return nullptr; } } peerData->delegation = dgb->build(); if (!VerifyDelegation(peerData->delegation, sessionPubKey, error)) { // error is set by VerifyDelegation return nullptr; } } // Determine quorum parameters Amount minQuorumStake = AVALANCHE_DEFAULT_MIN_QUORUM_STAKE; if (argsman.IsArgSet("-avaminquorumstake") && !ParseMoney(argsman.GetArg("-avaminquorumstake", ""), minQuorumStake)) { error = _("The avalanche min quorum stake amount is invalid."); return nullptr; } if (!MoneyRange(minQuorumStake)) { error = _("The avalanche min quorum stake amount is out of range."); return nullptr; } double minQuorumConnectedStakeRatio = AVALANCHE_DEFAULT_MIN_QUORUM_CONNECTED_STAKE_RATIO; if (argsman.IsArgSet("-avaminquorumconnectedstakeratio") && !ParseDouble(argsman.GetArg("-avaminquorumconnectedstakeratio", ""), &minQuorumConnectedStakeRatio)) { error = _("The avalanche min quorum connected stake ratio is invalid."); return nullptr; } if (minQuorumConnectedStakeRatio < 0 || minQuorumConnectedStakeRatio > 1) { error = _( "The avalanche min quorum connected stake ratio is out of range."); return nullptr; } int64_t minAvaproofsNodeCount = argsman.GetArg("-avaminavaproofsnodecount", AVALANCHE_DEFAULT_MIN_AVAPROOFS_NODE_COUNT); if (minAvaproofsNodeCount < 0) { error = _("The minimum number of node that sent avaproofs message " "should be non-negative"); return nullptr; } // Determine voting parameters int64_t staleVoteThreshold = argsman.GetArg("-avastalevotethreshold", AVALANCHE_VOTE_STALE_THRESHOLD); if (staleVoteThreshold < AVALANCHE_VOTE_STALE_MIN_THRESHOLD) { error = strprintf(_("The avalanche stale vote threshold must be " "greater than or equal to %d"), AVALANCHE_VOTE_STALE_MIN_THRESHOLD); return nullptr; } if (staleVoteThreshold > std::numeric_limits::max()) { error = strprintf(_("The avalanche stale vote threshold must be less " "than or equal to %d"), std::numeric_limits::max()); return nullptr; } int64_t staleVoteFactor = argsman.GetArg("-avastalevotefactor", AVALANCHE_VOTE_STALE_FACTOR); if (staleVoteFactor <= 0) { error = _("The avalanche stale vote factor must be greater than 0"); return nullptr; } if (staleVoteFactor > std::numeric_limits::max()) { error = strprintf(_("The avalanche stale vote factor must be less than " "or equal to %d"), std::numeric_limits::max()); return nullptr; } // We can't use std::make_unique with a private constructor return std::unique_ptr(new Processor( argsman, chain, connman, chainman, scheduler, std::move(peerData), std::move(sessionKey), Proof::amountToScore(minQuorumStake), minQuorumConnectedStakeRatio, minAvaproofsNodeCount, staleVoteThreshold, staleVoteFactor)); } bool Processor::addBlockToReconcile(const CBlockIndex *pindex) { if (!pindex) { // isWorthPolling expects this to be non-null, so bail early. return false; } bool isAccepted; { LOCK(cs_main); if (!isWorthPolling(pindex)) { // There is no point polling this block. return false; } isAccepted = chainman.ActiveChain().Contains(pindex); } return blockVoteRecords.getWriteView() ->insert(std::make_pair(pindex, VoteRecord(isAccepted))) .second; } bool Processor::addProofToReconcile(const ProofRef &proof) { if (!proof) { // isWorthPolling expects this to be non-null, so bail early. return false; } bool isAccepted; { LOCK(cs_peerManager); if (!isWorthPolling(proof)) { return false; } isAccepted = peerManager->isBoundToPeer(proof->getId()); } return proofVoteRecords.getWriteView() ->insert(std::make_pair(proof, VoteRecord(isAccepted))) .second; } bool Processor::isAccepted(const CBlockIndex *pindex) const { if (!pindex) { // CBlockIndexWorkComparator expects this to be non-null, so bail early. return false; } auto r = blockVoteRecords.getReadView(); auto it = r->find(pindex); if (it == r.end()) { return false; } return it->second.isAccepted(); } bool Processor::isAccepted(const ProofRef &proof) const { auto r = proofVoteRecords.getReadView(); auto it = r->find(proof); if (it == r.end()) { return false; } return it->second.isAccepted(); } int Processor::getConfidence(const CBlockIndex *pindex) const { if (!pindex) { // CBlockIndexWorkComparator expects this to be non-null, so bail early. return -1; } auto r = blockVoteRecords.getReadView(); auto it = r->find(pindex); if (it == r.end()) { return -1; } return it->second.getConfidence(); } int Processor::getConfidence(const ProofRef &proof) const { auto r = proofVoteRecords.getReadView(); auto it = r->find(proof); if (it == r.end()) { return -1; } return it->second.getConfidence(); } namespace { /** * When using TCP, we need to sign all messages as the transport layer is * not secure. */ class TCPResponse { Response response; SchnorrSig sig; public: TCPResponse(Response responseIn, const CKey &key) : response(std::move(responseIn)) { CHashWriter hasher(SER_GETHASH, 0); hasher << response; const uint256 hash = hasher.GetHash(); // Now let's sign! if (!key.SignSchnorr(hash, sig)) { sig.fill(0); } } // serialization support SERIALIZE_METHODS(TCPResponse, obj) { READWRITE(obj.response, obj.sig); } }; } // namespace void Processor::sendResponse(CNode *pfrom, Response response) const { connman->PushMessage( pfrom, CNetMsgMaker(pfrom->GetCommonVersion()) .Make(NetMsgType::AVARESPONSE, TCPResponse(std::move(response), sessionKey))); } bool Processor::registerVotes(NodeId nodeid, const Response &response, std::vector &blockUpdates, std::vector &proofUpdates, int &banscore, std::string &error) { { // Save the time at which we can query again. LOCK(cs_peerManager); // FIXME: This will override the time even when we received an old stale // message. This should check that the message is indeed the most up to // date one before updating the time. peerManager->updateNextRequestTime( nodeid, std::chrono::steady_clock::now() + std::chrono::milliseconds(response.getCooldown())); } std::vector invs; { // Check that the query exists. auto w = queries.getWriteView(); auto it = w->find(std::make_tuple(nodeid, response.getRound())); if (it == w.end()) { banscore = 2; error = "unexpected-ava-response"; return false; } invs = std::move(it->invs); w->erase(it); } // Verify that the request and the vote are consistent. const std::vector &votes = response.GetVotes(); size_t size = invs.size(); if (votes.size() != size) { banscore = 100; error = "invalid-ava-response-size"; return false; } for (size_t i = 0; i < size; i++) { if (invs[i].hash != votes[i].GetHash()) { banscore = 100; error = "invalid-ava-response-content"; return false; } } std::map responseIndex; std::map responseProof; // At this stage we are certain that invs[i] matches votes[i], so we can use // the inv type to retrieve what is being voted on. for (size_t i = 0; i < size; i++) { if (invs[i].IsMsgBlk()) { CBlockIndex *pindex; { LOCK(cs_main); pindex = chainman.m_blockman.LookupBlockIndex( BlockHash(votes[i].GetHash())); if (!pindex) { // This should not happen, but just in case... continue; } if (!isWorthPolling(pindex)) { // There is no point polling this block. continue; } } responseIndex.insert(std::make_pair(pindex, votes[i])); } if (invs[i].IsMsgProof()) { const ProofId proofid(votes[i].GetHash()); ProofRef proof; { LOCK(cs_peerManager); proof = peerManager->getProof(proofid); if (!proof) { continue; } if (!isWorthPolling(proof)) { continue; } } responseProof.insert(std::make_pair(proof, votes[i])); } } // Thanks to C++14 generic lambdas, we can apply the same logic to various // parameter types sharing the same interface. auto registerVoteItems = [&](auto voteRecordsWriteView, auto &updates, auto responseItems) { // Register votes. for (const auto &p : responseItems) { auto item = p.first; const Vote &v = p.second; auto it = voteRecordsWriteView->find(item); if (it == voteRecordsWriteView.end()) { // We are not voting on that item anymore. continue; } auto &vr = it->second; if (!vr.registerVote(nodeid, v.GetError())) { if (vr.isStale(staleVoteThreshold, staleVoteFactor)) { updates.emplace_back(item, VoteStatus::Stale); // Just drop stale votes. If we see this item again, we'll // do a new vote. voteRecordsWriteView->erase(it); } // This vote did not provide any extra information, move on. continue; } if (!vr.hasFinalized()) { // This item has note been finalized, so we have nothing more to // do. updates.emplace_back(item, vr.isAccepted() ? VoteStatus::Accepted : VoteStatus::Rejected); continue; } // We just finalized a vote. If it is valid, then let the caller // know. Either way, remove the item from the map. updates.emplace_back(item, vr.isAccepted() ? VoteStatus::Finalized : VoteStatus::Invalid); voteRecordsWriteView->erase(it); } }; registerVoteItems(blockVoteRecords.getWriteView(), blockUpdates, responseIndex); registerVoteItems(proofVoteRecords.getWriteView(), proofUpdates, responseProof); return true; } CPubKey Processor::getSessionPubKey() const { return sessionKey.GetPubKey(); } bool Processor::sendHello(CNode *pfrom) const { Delegation delegation; if (peerData) { delegation = peerData->delegation; pfrom->AddKnownProof(delegation.getProofId()); } CHashWriter hasher(SER_GETHASH, 0); hasher << delegation.getId(); hasher << pfrom->GetLocalNonce(); hasher << pfrom->nRemoteHostNonce; hasher << pfrom->GetLocalExtraEntropy(); hasher << pfrom->nRemoteExtraEntropy; // Now let's sign! SchnorrSig sig; if (!sessionKey.SignSchnorr(hasher.GetHash(), sig)) { return false; } connman->PushMessage( pfrom, CNetMsgMaker(pfrom->GetCommonVersion()) .Make(NetMsgType::AVAHELLO, Hello(delegation, sig))); return true; } ProofRef Processor::getLocalProof() const { return peerData ? peerData->proof : ProofRef(); } bool Processor::startEventLoop(CScheduler &scheduler) { return eventLoop.startEventLoop( scheduler, [this]() { this->runEventLoop(); }, AVALANCHE_TIME_STEP); } bool Processor::stopEventLoop() { return eventLoop.stopEventLoop(); } void Processor::avaproofsSent(NodeId nodeid) { if (chainman.ActiveChainstate().IsInitialBlockDownload()) { // Before IBD is complete there is no way to make sure a proof is valid // or not, e.g. it can be spent in a block we don't know yet. In order // to increase confidence that our proof set is similar to other nodes // on the network, the messages received during IBD are not accounted. return; } LOCK(cs_peerManager); if (peerManager->latchAvaproofsSent(nodeid)) { avaproofsNodeCounter++; } } /* * Returns a bool indicating whether we have a usable Avalanche quorum enabling * us to take decisions based on polls. */ bool Processor::isQuorumEstablished() { + { + LOCK(cs_peerManager); + if (peerManager->getNodeCount() < 8) { + // There is no point polling if we know the vote cannot converge + return false; + } + } + + /* + * The following parameters can naturally go temporarly below the threshold + * under normal circumstances, like during a proof replacement with a lower + * stake amount, or the discovery of a new proofs for which we don't have a + * node yet. + * In order to prevent our node from starting and stopping the polls + * spuriously on such event, the quorum establishement is latched. The only + * parameters that should not latched is the minimum node count, as this + * would cause the poll to be inconclusive anyway and should not happen + * under normal circumstances. + */ if (quorumIsEstablished) { return true; } // Don't do Avalanche while node is IBD'ing if (chainman.ActiveChainstate().IsInitialBlockDownload()) { return false; } if (avaproofsNodeCounter < minAvaproofsNodeCount) { return false; } auto localProof = getLocalProof(); // Get the registered proof score and registered score we have nodes for uint32_t totalPeersScore; uint32_t connectedPeersScore; { LOCK(cs_peerManager); totalPeersScore = peerManager->getTotalPeersScore(); connectedPeersScore = peerManager->getConnectedPeersScore(); // Consider that we are always connected to our proof, even if we are // the single node using that proof. if (localProof && peerManager->forPeer(localProof->getId(), [](const Peer &peer) { return peer.node_count == 0; })) { connectedPeersScore += localProof->getScore(); } } // Ensure enough is being staked overall if (totalPeersScore < minQuorumScore) { return false; } // Ensure we have connected score for enough of the overall score uint32_t minConnectedScore = std::round(double(totalPeersScore) * minQuorumConnectedScoreRatio); if (connectedPeersScore < minConnectedScore) { return false; } quorumIsEstablished = true; return true; } void Processor::FinalizeNode(const Config &config, const CNode &node, bool &update_connection_time) { WITH_LOCK(cs_peerManager, peerManager->removeNode(node.GetId())); } void Processor::runEventLoop() { // Don't poll if quorum hasn't been established yet if (!isQuorumEstablished()) { return; } // First things first, check if we have requests that timed out and clear // them. clearTimedoutRequests(); // Make sure there is at least one suitable node to query before gathering // invs. NodeId nodeid = WITH_LOCK(cs_peerManager, return peerManager->selectNode()); if (nodeid == NO_NODE) { return; } std::vector invs = getInvsForNextPoll(); if (invs.empty()) { return; } LOCK(cs_peerManager); do { /** * If we lost contact to that node, then we remove it from nodeids, but * never add the request to queries, which ensures bad nodes get cleaned * up over time. */ bool hasSent = connman->ForNode( nodeid, [this, &invs](CNode *pnode) EXCLUSIVE_LOCKS_REQUIRED( cs_peerManager) { uint64_t current_round = round++; { // Compute the time at which this requests times out. auto timeout = std::chrono::steady_clock::now() + queryTimeoutDuration; // Register the query. queries.getWriteView()->insert( {pnode->GetId(), current_round, timeout, invs}); // Set the timeout. peerManager->updateNextRequestTime(pnode->GetId(), timeout); } pnode->m_avalanche_state->invsPolled(invs.size()); // Send the query to the node. connman->PushMessage( pnode, CNetMsgMaker(pnode->GetCommonVersion()) .Make(NetMsgType::AVAPOLL, Poll(current_round, std::move(invs)))); return true; }); // Success! if (hasSent) { return; } // This node is obsolete, delete it. peerManager->removeNode(nodeid); // Get next suitable node to try again nodeid = peerManager->selectNode(); } while (nodeid != NO_NODE); } void Processor::clearTimedoutRequests() { auto now = std::chrono::steady_clock::now(); std::map timedout_items{}; { // Clear expired requests. auto w = queries.getWriteView(); auto it = w->get().begin(); while (it != w->get().end() && it->timeout < now) { for (const auto &i : it->invs) { timedout_items[i]++; } w->get().erase(it++); } } if (timedout_items.empty()) { return; } auto clearInflightRequest = [&](auto &voteRecords, const auto &voteItem, uint8_t count) { if (!voteItem) { return false; } auto voteRecordsWriteView = voteRecords.getWriteView(); auto it = voteRecordsWriteView->find(voteItem); if (it == voteRecordsWriteView.end()) { return false; } it->second.clearInflightRequest(count); return true; }; // In flight request accounting. for (const auto &p : timedout_items) { const CInv &inv = p.first; if (inv.IsMsgBlk()) { const CBlockIndex *pindex = WITH_LOCK(cs_main, return chainman.m_blockman.LookupBlockIndex( BlockHash(inv.hash))); if (!clearInflightRequest(blockVoteRecords, pindex, p.second)) { continue; } } if (inv.IsMsgProof()) { const ProofRef proof = WITH_LOCK(cs_peerManager, return peerManager->getProof(ProofId(inv.hash))); if (!clearInflightRequest(proofVoteRecords, proof, p.second)) { continue; } } } } std::vector Processor::getInvsForNextPoll(bool forPoll) { std::vector invs; // Use NO_THREAD_SAFETY_ANALYSIS to avoid false positive due to // isWorthPolling requiring a different lock depending of the prototype. auto removeItemsNotWorthPolling = [&](auto &itemVoteRecords) NO_THREAD_SAFETY_ANALYSIS { auto w = itemVoteRecords.getWriteView(); for (auto it = w->begin(); it != w->end();) { if (!isWorthPolling(it->first)) { it = w->erase(it); } else { ++it; } } }; auto extractVoteRecordsToInvs = [&](const auto &itemVoteRecordRange, auto buildInvFromVoteItem) { for (const auto &[item, voteRecord] : itemVoteRecordRange) { if (invs.size() >= AVALANCHE_MAX_ELEMENT_POLL) { // Make sure we do not produce more invs than specified by the // protocol. return true; } const bool shouldPoll = forPoll ? voteRecord.registerPoll() : voteRecord.shouldPoll(); if (!shouldPoll) { continue; } invs.emplace_back(buildInvFromVoteItem(item)); } return invs.size() >= AVALANCHE_MAX_ELEMENT_POLL; }; // First remove all proofs that are not worth polling. WITH_LOCK(cs_peerManager, removeItemsNotWorthPolling(proofVoteRecords)); if (extractVoteRecordsToInvs(proofVoteRecords.getReadView(), [](const ProofRef &proof) { return CInv(MSG_AVA_PROOF, proof->getId()); })) { // The inventory vector is full, we're done return invs; } // First remove all blocks that are not worth polling. WITH_LOCK(cs_main, removeItemsNotWorthPolling(blockVoteRecords)); auto r = blockVoteRecords.getReadView(); extractVoteRecordsToInvs(reverse_iterate(r), [](const CBlockIndex *pindex) { return CInv(MSG_BLOCK, pindex->GetBlockHash()); }); return invs; } bool Processor::isWorthPolling(const CBlockIndex *pindex) const { AssertLockHeld(cs_main); if (pindex->nStatus.isInvalid()) { // No point polling invalid blocks. return false; } if (chainman.ActiveChainstate().IsBlockFinalized(pindex)) { // There is no point polling finalized block. return false; } return true; } bool Processor::isWorthPolling(const ProofRef &proof) const { AssertLockHeld(cs_peerManager); if (!gArgs.GetBoolArg("-enableavalancheproofreplacement", AVALANCHE_DEFAULT_PROOF_REPLACEMENT_ENABLED)) { // If proof replacement is not enabled there is no point dealing // with proof polling, so we're done. return false; } const ProofId &proofid = proof->getId(); // No point polling orphans or discarded proofs return peerManager->isBoundToPeer(proofid) || peerManager->isInConflictingPool(proofid); } } // namespace avalanche diff --git a/src/avalanche/test/processor_tests.cpp b/src/avalanche/test/processor_tests.cpp index 8b72eca9c..9234d6dfd 100644 --- a/src/avalanche/test/processor_tests.cpp +++ b/src/avalanche/test/processor_tests.cpp @@ -1,1559 +1,1629 @@ // Copyright (c) 2018-2020 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include #include #include #include #include #include #include #include #include // For ::PeerManager #include #include #include #include // For bilingual_str // D6970 moved LookupBlockIndex from chain.h to validation.h TODO: remove this // when LookupBlockIndex is refactored out of validation #include #include #include #include #include #include #include #include using namespace avalanche; namespace avalanche { namespace { struct AvalancheTest { static void runEventLoop(avalanche::Processor &p) { p.runEventLoop(); } static std::vector getInvsForNextPoll(Processor &p) { return p.getInvsForNextPoll(false); } static NodeId getSuitableNodeToQuery(Processor &p) { return WITH_LOCK(p.cs_peerManager, return p.peerManager->selectNode()); } static uint64_t getRound(const Processor &p) { return p.round; } static uint32_t getMinQuorumScore(const Processor &p) { return p.minQuorumScore; } static double getMinQuorumConnectedScoreRatio(const Processor &p) { return p.minQuorumConnectedScoreRatio; } static void clearavaproofsNodeCounter(Processor &p) { p.avaproofsNodeCounter = 0; } }; } // namespace } // namespace avalanche namespace { struct CConnmanTest : public CConnman { using CConnman::CConnman; void AddNode(CNode &node) { LOCK(cs_vNodes); vNodes.push_back(&node); } void ClearNodes() { LOCK(cs_vNodes); for (CNode *node : vNodes) { delete node; } vNodes.clear(); } }; CService ip(uint32_t i) { struct in_addr s; s.s_addr = i; return CService(CNetAddr(s), Params().GetDefaultPort()); } struct AvalancheTestingSetup : public TestChain100Setup { const Config &config; CConnmanTest *m_connman; std::unique_ptr m_processor; // The master private key we delegate to. CKey masterpriv; AvalancheTestingSetup() : TestChain100Setup(), config(GetConfig()), masterpriv(CKey::MakeCompressedKey()) { // Deterministic randomness for tests. auto connman = std::make_unique(config, 0x1337, 0x1337); m_connman = connman.get(); m_node.connman = std::move(connman); m_node.peerman = ::PeerManager::make( config.GetChainParams(), *m_connman, m_node.banman.get(), *m_node.scheduler, *m_node.chainman, *m_node.mempool, false); m_node.chain = interfaces::MakeChain(m_node, config.GetChainParams()); // Get the processor ready. bilingual_str error; m_processor = Processor::MakeProcessor( *m_node.args, *m_node.chain, m_node.connman.get(), *Assert(m_node.chainman), *m_node.scheduler, error); BOOST_CHECK(m_processor); gArgs.ForceSetArg("-avaproofstakeutxoconfirmations", "1"); gArgs.ForceSetArg("-enableavalancheproofreplacement", "1"); } ~AvalancheTestingSetup() { m_connman->ClearNodes(); SyncWithValidationInterfaceQueue(); gArgs.ClearForcedArg("-avaproofstakeutxoconfirmations"); gArgs.ClearForcedArg("-enableavalancheproofreplacement"); } CNode *ConnectNode(ServiceFlags nServices) { static NodeId id = 0; CAddress addr(ip(GetRandInt(0xffffffff)), NODE_NONE); auto node = new CNode(id++, ServiceFlags(NODE_NETWORK), INVALID_SOCKET, addr, /* nKeyedNetGroupIn */ 0, /* nLocalHostNonceIn */ 0, /* nLocalExtraEntropyIn */ 0, CAddress(), /* pszDest */ "", ConnectionType::OUTBOUND_FULL_RELAY, /* inbound_onion */ false); node->SetCommonVersion(PROTOCOL_VERSION); node->nServices = nServices; m_node.peerman->InitializeNode(config, node); node->nVersion = 1; node->fSuccessfullyConnected = true; node->m_avalanche_state = std::make_unique(); m_connman->AddNode(*node); return node; } size_t next_coinbase = 0; ProofRef GetProof() { size_t current_coinbase = next_coinbase++; const CTransaction &coinbase = *m_coinbase_txns[current_coinbase]; ProofBuilder pb(0, 0, masterpriv); BOOST_CHECK(pb.addUTXO(COutPoint(coinbase.GetId(), 0), coinbase.vout[0].nValue, current_coinbase + 1, true, coinbaseKey)); return pb.build(); } bool addNode(NodeId nodeid, const ProofId &proofid) { return m_processor->withPeerManager([&](avalanche::PeerManager &pm) { return pm.addNode(nodeid, proofid); }); } bool addNode(NodeId nodeid) { auto proof = GetProof(); return m_processor->withPeerManager([&](avalanche::PeerManager &pm) { return pm.registerProof(proof) && pm.addNode(nodeid, proof->getId()); }); } std::array ConnectNodes() { auto proof = GetProof(); BOOST_CHECK( m_processor->withPeerManager([&](avalanche::PeerManager &pm) { return pm.registerProof(proof); })); const ProofId &proofid = proof->getId(); std::array nodes; for (CNode *&n : nodes) { n = ConnectNode(NODE_AVALANCHE); BOOST_CHECK(addNode(n->GetId(), proofid)); } return nodes; } void runEventLoop() { AvalancheTest::runEventLoop(*m_processor); } NodeId getSuitableNodeToQuery() { return AvalancheTest::getSuitableNodeToQuery(*m_processor); } std::vector getInvsForNextPoll() { return AvalancheTest::getInvsForNextPoll(*m_processor); } uint64_t getRound() const { return AvalancheTest::getRound(*m_processor); } bool registerVotes(NodeId nodeid, const avalanche::Response &response, std::vector &blockUpdates) { int banscore; std::string error; std::vector proofUpdates; return m_processor->registerVotes(nodeid, response, blockUpdates, proofUpdates, banscore, error); } }; struct BlockProvider { AvalancheTestingSetup *fixture; std::vector updates; uint32_t invType; BlockProvider(AvalancheTestingSetup *_fixture) : fixture(_fixture), invType(MSG_BLOCK) {} CBlockIndex *buildVoteItem() const { CBlock block = fixture->CreateAndProcessBlock({}, CScript()); const BlockHash blockHash = block.GetHash(); LOCK(cs_main); return Assert(fixture->m_node.chainman) ->m_blockman.LookupBlockIndex(blockHash); } uint256 getVoteItemId(const CBlockIndex *pindex) const { return pindex->GetBlockHash(); } bool registerVotes(NodeId nodeid, const avalanche::Response &response, std::string &error) { int banscore; std::vector proofUpdates; return fixture->m_processor->registerVotes( nodeid, response, updates, proofUpdates, banscore, error); } bool registerVotes(NodeId nodeid, const avalanche::Response &response) { std::string error; return registerVotes(nodeid, response, error); } bool addToReconcile(const CBlockIndex *pindex) { return fixture->m_processor->addBlockToReconcile(pindex); } std::vector buildVotesForItems(uint32_t error, std::vector &&items) { size_t numItems = items.size(); std::vector votes; votes.reserve(numItems); // Votes are sorted by most work first std::sort(items.begin(), items.end(), CBlockIndexWorkComparator()); for (auto &item : reverse_iterate(items)) { votes.emplace_back(error, item->GetBlockHash()); } return votes; } void invalidateItem(CBlockIndex *pindex) { pindex->nStatus = pindex->nStatus.withFailed(); } }; struct ProofProvider { AvalancheTestingSetup *fixture; std::vector updates; uint32_t invType; ProofProvider(AvalancheTestingSetup *_fixture) : fixture(_fixture), invType(MSG_AVA_PROOF) {} ProofRef buildVoteItem() const { const ProofRef proof = fixture->GetProof(); fixture->m_processor->withPeerManager([&](avalanche::PeerManager &pm) { BOOST_CHECK(pm.registerProof(proof)); }); return proof; } uint256 getVoteItemId(const ProofRef &proof) const { return proof->getId(); } bool registerVotes(NodeId nodeid, const avalanche::Response &response, std::string &error) { int banscore; std::vector blockUpdates; return fixture->m_processor->registerVotes( nodeid, response, blockUpdates, updates, banscore, error); } bool registerVotes(NodeId nodeid, const avalanche::Response &response) { std::string error; return registerVotes(nodeid, response, error); } bool addToReconcile(const ProofRef &proof) { return fixture->m_processor->addProofToReconcile(proof); } std::vector buildVotesForItems(uint32_t error, std::vector &&items) { size_t numItems = items.size(); std::vector votes; votes.reserve(numItems); // Votes are sorted by high score first std::sort(items.begin(), items.end(), ProofComparatorByScore()); for (auto &item : items) { votes.emplace_back(error, item->getId()); } return votes; } void invalidateItem(const ProofRef &proof) { fixture->m_processor->withPeerManager([&](avalanche::PeerManager &pm) { pm.rejectProof(proof->getId(), avalanche::PeerManager::RejectionMode::INVALIDATE); }); } }; } // namespace BOOST_FIXTURE_TEST_SUITE(processor_tests, AvalancheTestingSetup) // FIXME A std::tuple can be used instead of boost::mpl::list after boost 1.67 using VoteItemProviders = boost::mpl::list; BOOST_AUTO_TEST_CASE(block_update) { CBlockIndex index; CBlockIndex *pindex = &index; std::set status{ VoteStatus::Invalid, VoteStatus::Rejected, VoteStatus::Accepted, VoteStatus::Finalized, VoteStatus::Stale, }; for (auto s : status) { BlockUpdate abu(pindex, s); // The use of BOOST_CHECK instead of BOOST_CHECK_EQUAL prevents from // having to define operator<<() for each argument type. BOOST_CHECK(abu.getVoteItem() == pindex); BOOST_CHECK(abu.getStatus() == s); } } BOOST_AUTO_TEST_CASE_TEMPLATE(item_reconcile_twice, P, VoteItemProviders) { P provider(this); auto item = provider.buildVoteItem(); // Adding the item twice does nothing. BOOST_CHECK(provider.addToReconcile(item)); BOOST_CHECK(!provider.addToReconcile(item)); BOOST_CHECK(m_processor->isAccepted(item)); } BOOST_AUTO_TEST_CASE_TEMPLATE(item_null, P, VoteItemProviders) { P provider(this); // Check that null case is handled on the public interface BOOST_CHECK(!m_processor->isAccepted(nullptr)); BOOST_CHECK_EQUAL(m_processor->getConfidence(nullptr), -1); auto item = decltype(provider.buildVoteItem())(); BOOST_CHECK(item == nullptr); BOOST_CHECK(!provider.addToReconcile(item)); // Check that adding item to vote on doesn't change the outcome. A // comparator is used under the hood, and this is skipped if there are no // vote records. item = provider.buildVoteItem(); BOOST_CHECK(provider.addToReconcile(item)); BOOST_CHECK(!m_processor->isAccepted(nullptr)); BOOST_CHECK_EQUAL(m_processor->getConfidence(nullptr), -1); } namespace { Response next(Response &r) { auto copy = r; r = {r.getRound() + 1, r.getCooldown(), r.GetVotes()}; return copy; } } // namespace BOOST_AUTO_TEST_CASE_TEMPLATE(vote_item_register, P, VoteItemProviders) { P provider(this); auto &updates = provider.updates; const uint32_t invType = provider.invType; const auto item = provider.buildVoteItem(); const auto itemid = provider.getVoteItemId(item); // Create nodes that supports avalanche. auto avanodes = ConnectNodes(); // Querying for random item returns false. BOOST_CHECK(!m_processor->isAccepted(item)); // Add a new item. Check it is added to the polls. BOOST_CHECK(provider.addToReconcile(item)); auto invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, invType); BOOST_CHECK(invs[0].hash == itemid); BOOST_CHECK(m_processor->isAccepted(item)); int nextNodeIndex = 0; auto registerNewVote = [&](const Response &resp) { runEventLoop(); auto nodeid = avanodes[nextNodeIndex++ % avanodes.size()]->GetId(); BOOST_CHECK(provider.registerVotes(nodeid, resp)); }; // Let's vote for this item a few times. Response resp{0, 0, {Vote(0, itemid)}}; for (int i = 0; i < 6; i++) { registerNewVote(next(resp)); BOOST_CHECK(m_processor->isAccepted(item)); BOOST_CHECK_EQUAL(m_processor->getConfidence(item), 0); BOOST_CHECK_EQUAL(updates.size(), 0); } // A single neutral vote do not change anything. resp = {getRound(), 0, {Vote(-1, itemid)}}; registerNewVote(next(resp)); BOOST_CHECK(m_processor->isAccepted(item)); BOOST_CHECK_EQUAL(m_processor->getConfidence(item), 0); BOOST_CHECK_EQUAL(updates.size(), 0); resp = {getRound(), 0, {Vote(0, itemid)}}; for (int i = 1; i < 7; i++) { registerNewVote(next(resp)); BOOST_CHECK(m_processor->isAccepted(item)); BOOST_CHECK_EQUAL(m_processor->getConfidence(item), i); BOOST_CHECK_EQUAL(updates.size(), 0); } // Two neutral votes will stall progress. resp = {getRound(), 0, {Vote(-1, itemid)}}; registerNewVote(next(resp)); BOOST_CHECK(m_processor->isAccepted(item)); BOOST_CHECK_EQUAL(m_processor->getConfidence(item), 6); BOOST_CHECK_EQUAL(updates.size(), 0); registerNewVote(next(resp)); BOOST_CHECK(m_processor->isAccepted(item)); BOOST_CHECK_EQUAL(m_processor->getConfidence(item), 6); BOOST_CHECK_EQUAL(updates.size(), 0); resp = {getRound(), 0, {Vote(0, itemid)}}; for (int i = 2; i < 8; i++) { registerNewVote(next(resp)); BOOST_CHECK(m_processor->isAccepted(item)); BOOST_CHECK_EQUAL(m_processor->getConfidence(item), 6); BOOST_CHECK_EQUAL(updates.size(), 0); } // We vote for it numerous times to finalize it. for (int i = 7; i < AVALANCHE_FINALIZATION_SCORE; i++) { registerNewVote(next(resp)); BOOST_CHECK(m_processor->isAccepted(item)); BOOST_CHECK_EQUAL(m_processor->getConfidence(item), i); BOOST_CHECK_EQUAL(updates.size(), 0); } // As long as it is not finalized, we poll. invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, invType); BOOST_CHECK(invs[0].hash == itemid); // Now finalize the decision. registerNewVote(next(resp)); BOOST_CHECK_EQUAL(updates.size(), 1); BOOST_CHECK(updates[0].getVoteItem() == item); BOOST_CHECK(updates[0].getStatus() == VoteStatus::Finalized); updates.clear(); // Once the decision is finalized, there is no poll for it. invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 0); // Now let's undo this and finalize rejection. BOOST_CHECK(provider.addToReconcile(item)); invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, invType); BOOST_CHECK(invs[0].hash == itemid); resp = {getRound(), 0, {Vote(1, itemid)}}; for (int i = 0; i < 6; i++) { registerNewVote(next(resp)); BOOST_CHECK(m_processor->isAccepted(item)); BOOST_CHECK_EQUAL(updates.size(), 0); } // Now the state will flip. registerNewVote(next(resp)); BOOST_CHECK(!m_processor->isAccepted(item)); BOOST_CHECK_EQUAL(updates.size(), 1); BOOST_CHECK(updates[0].getVoteItem() == item); BOOST_CHECK(updates[0].getStatus() == VoteStatus::Rejected); updates.clear(); // Now it is rejected, but we can vote for it numerous times. for (int i = 1; i < AVALANCHE_FINALIZATION_SCORE; i++) { registerNewVote(next(resp)); BOOST_CHECK(!m_processor->isAccepted(item)); BOOST_CHECK_EQUAL(updates.size(), 0); } // As long as it is not finalized, we poll. invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, invType); BOOST_CHECK(invs[0].hash == itemid); // Now finalize the decision. registerNewVote(next(resp)); BOOST_CHECK(!m_processor->isAccepted(item)); BOOST_CHECK_EQUAL(updates.size(), 1); BOOST_CHECK(updates[0].getVoteItem() == item); BOOST_CHECK(updates[0].getStatus() == VoteStatus::Invalid); updates.clear(); // Once the decision is finalized, there is no poll for it. invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 0); } BOOST_AUTO_TEST_CASE_TEMPLATE(multi_item_register, P, VoteItemProviders) { P provider(this); auto &updates = provider.updates; const uint32_t invType = provider.invType; auto itemA = provider.buildVoteItem(); auto itemidA = provider.getVoteItemId(itemA); auto itemB = provider.buildVoteItem(); auto itemidB = provider.getVoteItemId(itemB); // Create several nodes that support avalanche. auto avanodes = ConnectNodes(); // Querying for random item returns false. BOOST_CHECK(!m_processor->isAccepted(itemA)); BOOST_CHECK(!m_processor->isAccepted(itemB)); // Start voting on item A. BOOST_CHECK(provider.addToReconcile(itemA)); auto invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, invType); BOOST_CHECK(invs[0].hash == itemidA); uint64_t round = getRound(); runEventLoop(); BOOST_CHECK(provider.registerVotes(avanodes[0]->GetId(), {round, 0, {Vote(0, itemidA)}})); BOOST_CHECK_EQUAL(updates.size(), 0); // Start voting on item B after one vote. std::vector votes = provider.buildVotesForItems(0, {itemA, itemB}); Response resp{round + 1, 0, votes}; BOOST_CHECK(provider.addToReconcile(itemB)); invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 2); // Ensure the inv ordering is as expected for (size_t i = 0; i < invs.size(); i++) { BOOST_CHECK_EQUAL(invs[i].type, invType); BOOST_CHECK(invs[i].hash == votes[i].GetHash()); } // Let's vote for these items a few times. for (int i = 0; i < 4; i++) { NodeId nodeid = getSuitableNodeToQuery(); runEventLoop(); BOOST_CHECK(provider.registerVotes(nodeid, next(resp))); BOOST_CHECK_EQUAL(updates.size(), 0); } // Now it is accepted, but we can vote for it numerous times. for (int i = 0; i < AVALANCHE_FINALIZATION_SCORE; i++) { NodeId nodeid = getSuitableNodeToQuery(); runEventLoop(); BOOST_CHECK(provider.registerVotes(nodeid, next(resp))); BOOST_CHECK_EQUAL(updates.size(), 0); } // Running two iterration of the event loop so that vote gets triggered on A // and B. NodeId firstNodeid = getSuitableNodeToQuery(); runEventLoop(); NodeId secondNodeid = getSuitableNodeToQuery(); runEventLoop(); BOOST_CHECK(firstNodeid != secondNodeid); // Next vote will finalize item A. BOOST_CHECK(provider.registerVotes(firstNodeid, next(resp))); BOOST_CHECK_EQUAL(updates.size(), 1); BOOST_CHECK(updates[0].getVoteItem() == itemA); BOOST_CHECK(updates[0].getStatus() == VoteStatus::Finalized); updates = {}; // We do not vote on A anymore. invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, invType); BOOST_CHECK(invs[0].hash == itemidB); // Next vote will finalize item B. BOOST_CHECK(provider.registerVotes(secondNodeid, resp)); BOOST_CHECK_EQUAL(updates.size(), 1); BOOST_CHECK(updates[0].getVoteItem() == itemB); BOOST_CHECK(updates[0].getStatus() == VoteStatus::Finalized); updates = {}; // There is nothing left to vote on. invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 0); } BOOST_AUTO_TEST_CASE_TEMPLATE(poll_and_response, P, VoteItemProviders) { P provider(this); auto &updates = provider.updates; const uint32_t invType = provider.invType; - const auto item = provider.buildVoteItem(); - const auto itemid = provider.getVoteItemId(item); + auto item = provider.buildVoteItem(); + auto itemid = provider.getVoteItemId(item); // There is no node to query. BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), NO_NODE); - // Create a node that supports avalanche and one that doesn't. - ConnectNode(NODE_NONE); - auto avanode = ConnectNode(NODE_AVALANCHE); - NodeId avanodeid = avanode->GetId(); - BOOST_CHECK(addNode(avanodeid)); + // Add enough nodes to have a valid quorum, and the same amount with no + // avalanche support + std::set avanodeIds; + auto avanodes = ConnectNodes(); + for (auto avanode : avanodes) { + ConnectNode(NODE_NONE); + avanodeIds.insert(avanode->GetId()); + } + + auto getSelectedAvanodeId = [&]() { + NodeId avanodeid = getSuitableNodeToQuery(); + BOOST_CHECK(avanodeIds.find(avanodeid) != avanodeIds.end()); + return avanodeid; + }; - // It returns the avalanche peer. - BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), avanodeid); + // It returns one of the avalanche peer. + NodeId avanodeid = getSelectedAvanodeId(); // Register an item and check it is added to the list of elements to poll. BOOST_CHECK(provider.addToReconcile(item)); auto invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, invType); BOOST_CHECK(invs[0].hash == itemid); - // Trigger a poll on avanode. + std::set unselectedNodeids = avanodeIds; + unselectedNodeids.erase(avanodeid); + const size_t remainingNodeIds = unselectedNodeids.size(); + uint64_t round = getRound(); - runEventLoop(); + for (size_t i = 0; i < remainingNodeIds; i++) { + // Trigger a poll on avanode. + runEventLoop(); + + // Another node is selected + NodeId nodeid = getSuitableNodeToQuery(); + BOOST_CHECK(unselectedNodeids.find(nodeid) != avanodeIds.end()); + unselectedNodeids.erase(nodeid); + } // There is no more suitable peer available, so return nothing. + BOOST_CHECK(unselectedNodeids.empty()); + runEventLoop(); BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), NO_NODE); // Respond to the request. Response resp = {round, 0, {Vote(0, itemid)}}; BOOST_CHECK(provider.registerVotes(avanodeid, resp)); BOOST_CHECK_EQUAL(updates.size(), 0); // Now that avanode fullfilled his request, it is added back to the list of // queriable nodes. BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), avanodeid); auto checkRegisterVotesError = [&](NodeId nodeid, const avalanche::Response &response, const std::string &expectedError) { std::string error; BOOST_CHECK(!provider.registerVotes(nodeid, response, error)); BOOST_CHECK_EQUAL(error, expectedError); BOOST_CHECK_EQUAL(updates.size(), 0); }; // Sending a response when not polled fails. checkRegisterVotesError(avanodeid, next(resp), "unexpected-ava-response"); // Trigger a poll on avanode. round = getRound(); runEventLoop(); BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), NO_NODE); // Sending responses that do not match the request also fails. // 1. Too many results. resp = {round, 0, {Vote(0, itemid), Vote(0, itemid)}}; runEventLoop(); checkRegisterVotesError(avanodeid, resp, "invalid-ava-response-size"); BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), avanodeid); // 2. Not enough results. resp = {getRound(), 0, {}}; runEventLoop(); checkRegisterVotesError(avanodeid, resp, "invalid-ava-response-size"); BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), avanodeid); // 3. Do not match the poll. resp = {getRound(), 0, {Vote()}}; runEventLoop(); checkRegisterVotesError(avanodeid, resp, "invalid-ava-response-content"); BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), avanodeid); + // At this stage we have reached the max inflight requests for our inv, so + // it won't be requested anymore until the requests are fullfilled. Let's + // vote on another item with no inflight request so the remaining tests + // makes sense. + invs = getInvsForNextPoll(); + BOOST_CHECK(invs.empty()); + + item = provider.buildVoteItem(); + itemid = provider.getVoteItemId(item); + BOOST_CHECK(provider.addToReconcile(item)); + + invs = getInvsForNextPoll(); + BOOST_CHECK_EQUAL(invs.size(), 1); + // 4. Invalid round count. Request is not discarded. uint64_t queryRound = getRound(); runEventLoop(); resp = {queryRound + 1, 0, {Vote()}}; checkRegisterVotesError(avanodeid, resp, "unexpected-ava-response"); resp = {queryRound - 1, 0, {Vote()}}; checkRegisterVotesError(avanodeid, resp, "unexpected-ava-response"); // 5. Making request for invalid nodes do not work. Request is not // discarded. resp = {queryRound, 0, {Vote(0, itemid)}}; checkRegisterVotesError(avanodeid + 1234, resp, "unexpected-ava-response"); // Proper response gets processed and avanode is available again. resp = {queryRound, 0, {Vote(0, itemid)}}; BOOST_CHECK(provider.registerVotes(avanodeid, resp)); BOOST_CHECK_EQUAL(updates.size(), 0); BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), avanodeid); // Out of order response are rejected. const auto item2 = provider.buildVoteItem(); BOOST_CHECK(provider.addToReconcile(item2)); std::vector votes = provider.buildVotesForItems(0, {item, item2}); resp = {getRound(), 0, {votes[1], votes[0]}}; runEventLoop(); checkRegisterVotesError(avanodeid, resp, "invalid-ava-response-content"); BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), avanodeid); // But they are accepted in order. resp = {getRound(), 0, votes}; runEventLoop(); BOOST_CHECK(provider.registerVotes(avanodeid, resp)); BOOST_CHECK_EQUAL(updates.size(), 0); BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), avanodeid); } BOOST_AUTO_TEST_CASE_TEMPLATE(dont_poll_invalid_item, P, VoteItemProviders) { P provider(this); auto &updates = provider.updates; const uint32_t invType = provider.invType; auto itemA = provider.buildVoteItem(); auto itemB = provider.buildVoteItem(); auto avanodes = ConnectNodes(); // Build votes to get proper ordering std::vector votes = provider.buildVotesForItems(0, {itemA, itemB}); // Register the items and check they are added to the list of elements to // poll. BOOST_CHECK(provider.addToReconcile(itemA)); BOOST_CHECK(provider.addToReconcile(itemB)); auto invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 2); for (size_t i = 0; i < invs.size(); i++) { BOOST_CHECK_EQUAL(invs[i].type, invType); BOOST_CHECK(invs[i].hash == votes[i].GetHash()); } // When an item is marked invalid, stop polling. provider.invalidateItem(itemB); Response goodResp{getRound(), 0, {Vote(0, provider.getVoteItemId(itemA))}}; runEventLoop(); BOOST_CHECK(provider.registerVotes(avanodes[0]->GetId(), goodResp)); BOOST_CHECK_EQUAL(updates.size(), 0); // Votes including itemB are rejected Response badResp{getRound(), 0, votes}; runEventLoop(); std::string error; BOOST_CHECK(!provider.registerVotes(avanodes[1]->GetId(), badResp, error)); BOOST_CHECK_EQUAL(error, "invalid-ava-response-size"); } BOOST_TEST_DECORATOR(*boost::unit_test::timeout(60)) BOOST_AUTO_TEST_CASE_TEMPLATE(poll_inflight_timeout, P, VoteItemProviders) { P provider(this); const auto item = provider.buildVoteItem(); const auto itemid = provider.getVoteItemId(item); // Add the item BOOST_CHECK(provider.addToReconcile(item)); - // Create a node that supports avalanche. - auto avanode = ConnectNode(NODE_AVALANCHE); - NodeId avanodeid = avanode->GetId(); - BOOST_CHECK(addNode(avanodeid)); + // Create a quorum of nodes that support avalanche. + ConnectNodes(); + NodeId avanodeid = NO_NODE; // Expire requests after some time. auto queryTimeDuration = std::chrono::milliseconds(10); m_processor->setQueryTimeoutDuration(queryTimeDuration); for (int i = 0; i < 10; i++) { Response resp = {getRound(), 0, {Vote(0, itemid)}}; + avanodeid = getSuitableNodeToQuery(); auto start = std::chrono::steady_clock::now(); runEventLoop(); // We cannot guarantee that we'll wait for just 1ms, so we have to bail // if we aren't within the proper time range. std::this_thread::sleep_for(std::chrono::milliseconds(1)); runEventLoop(); bool ret = provider.registerVotes(avanodeid, next(resp)); if (std::chrono::steady_clock::now() > start + queryTimeDuration) { // We waited for too long, bail. Because we can't know for sure when // previous steps ran, ret is not deterministic and we do not check // it. i--; continue; } // We are within time bounds, so the vote should have worked. BOOST_CHECK(ret); + avanodeid = getSuitableNodeToQuery(); + // Now try again but wait for expiration. runEventLoop(); std::this_thread::sleep_for(queryTimeDuration); runEventLoop(); BOOST_CHECK(!provider.registerVotes(avanodeid, next(resp))); } } BOOST_AUTO_TEST_CASE_TEMPLATE(poll_inflight_count, P, VoteItemProviders) { P provider(this); const uint32_t invType = provider.invType; // Create enough nodes so that we run into the inflight request limit. auto proof = GetProof(); BOOST_CHECK(m_processor->withPeerManager( [&](avalanche::PeerManager &pm) { return pm.registerProof(proof); })); std::array nodes; for (auto &n : nodes) { n = ConnectNode(NODE_AVALANCHE); BOOST_CHECK(addNode(n->GetId(), proof->getId())); } // Add an item to poll const auto item = provider.buildVoteItem(); const auto itemid = provider.getVoteItemId(item); BOOST_CHECK(provider.addToReconcile(item)); // Ensure there are enough requests in flight. std::map node_round_map; for (int i = 0; i < AVALANCHE_MAX_INFLIGHT_POLL; i++) { NodeId nodeid = getSuitableNodeToQuery(); BOOST_CHECK(node_round_map.find(nodeid) == node_round_map.end()); node_round_map.insert(std::pair(nodeid, getRound())); auto invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, invType); BOOST_CHECK(invs[0].hash == itemid); runEventLoop(); } // Now that we have enough in flight requests, we shouldn't poll. auto suitablenodeid = getSuitableNodeToQuery(); BOOST_CHECK(suitablenodeid != NO_NODE); auto invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 0); runEventLoop(); BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), suitablenodeid); // Send one response, now we can poll again. auto it = node_round_map.begin(); Response resp = {it->second, 0, {Vote(0, itemid)}}; BOOST_CHECK(provider.registerVotes(it->first, resp)); node_round_map.erase(it); invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, invType); BOOST_CHECK(invs[0].hash == itemid); } BOOST_AUTO_TEST_CASE(quorum_diversity) { std::vector updates; CBlock block = CreateAndProcessBlock({}, CScript()); const BlockHash blockHash = block.GetHash(); const CBlockIndex *pindex; { LOCK(cs_main); pindex = Assert(m_node.chainman)->m_blockman.LookupBlockIndex(blockHash); } // Create nodes that supports avalanche. auto avanodes = ConnectNodes(); // Querying for random block returns false. BOOST_CHECK(!m_processor->isAccepted(pindex)); // Add a new block. Check it is added to the polls. BOOST_CHECK(m_processor->addBlockToReconcile(pindex)); // Do one valid round of voting. uint64_t round = getRound(); Response resp{round, 0, {Vote(0, blockHash)}}; // Check that all nodes can vote. for (size_t i = 0; i < avanodes.size(); i++) { runEventLoop(); BOOST_CHECK(registerVotes(avanodes[i]->GetId(), next(resp), updates)); } // Generate a query for every single node. const NodeId firstNodeId = getSuitableNodeToQuery(); std::map node_round_map; round = getRound(); for (size_t i = 0; i < avanodes.size(); i++) { NodeId nodeid = getSuitableNodeToQuery(); BOOST_CHECK(node_round_map.find(nodeid) == node_round_map.end()); node_round_map[nodeid] = getRound(); runEventLoop(); } // Now only the first node can vote. All others would be duplicate in the // quorum. auto confidence = m_processor->getConfidence(pindex); BOOST_REQUIRE(confidence > 0); for (auto &[nodeid, r] : node_round_map) { if (nodeid == firstNodeId) { // Node 0 is the only one which can vote at this stage. round = r; continue; } BOOST_CHECK( registerVotes(nodeid, {r, 0, {Vote(0, blockHash)}}, updates)); BOOST_CHECK_EQUAL(m_processor->getConfidence(pindex), confidence); } BOOST_CHECK( registerVotes(firstNodeId, {round, 0, {Vote(0, blockHash)}}, updates)); BOOST_CHECK_EQUAL(m_processor->getConfidence(pindex), confidence + 1); } BOOST_AUTO_TEST_CASE(event_loop) { CScheduler s; CBlock block = CreateAndProcessBlock({}, CScript()); const BlockHash blockHash = block.GetHash(); const CBlockIndex *pindex; { LOCK(cs_main); pindex = Assert(m_node.chainman)->m_blockman.LookupBlockIndex(blockHash); } // Starting the event loop. BOOST_CHECK(m_processor->startEventLoop(s)); // There is one task planned in the next hour (our event loop). std::chrono::system_clock::time_point start, stop; BOOST_CHECK_EQUAL(s.getQueueInfo(start, stop), 1); // Starting twice doesn't start it twice. BOOST_CHECK(!m_processor->startEventLoop(s)); // Start the scheduler thread. std::thread schedulerThread(std::bind(&CScheduler::serviceQueue, &s)); - // Create a node that supports avalanche. - auto avanode = ConnectNode(NODE_AVALANCHE); - NodeId nodeid = avanode->GetId(); - BOOST_CHECK(addNode(nodeid)); + // Create a quorum of nodes that support avalanche. + auto avanodes = ConnectNodes(); // There is no query in flight at the moment. - BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), nodeid); + NodeId nodeid = getSuitableNodeToQuery(); + BOOST_CHECK_NE(nodeid, NO_NODE); // Add a new block. Check it is added to the polls. uint64_t queryRound = getRound(); BOOST_CHECK(m_processor->addBlockToReconcile(pindex)); + // Wait until all nodes got a poll for (int i = 0; i < 60 * 1000; i++) { // Technically, this is a race condition, but this should do just fine - // as we wait up to 1 minute for an event that should take 10ms. + // as we wait up to 1 minute for an event that should take 80ms. UninterruptibleSleep(std::chrono::milliseconds(1)); - if (getRound() != queryRound) { + if (getRound() == queryRound + avanodes.size()) { break; } } // Check that we effectively got a request and not timed out. BOOST_CHECK(getRound() > queryRound); // Respond and check the cooldown time is respected. uint64_t responseRound = getRound(); auto queryTime = std::chrono::steady_clock::now() + std::chrono::milliseconds(100); std::vector updates; + // Only the first node answers, so it's the only one that gets polled again registerVotes(nodeid, {queryRound, 100, {Vote(0, blockHash)}}, updates); + for (int i = 0; i < 10000; i++) { // We make sure that we do not get a request before queryTime. UninterruptibleSleep(std::chrono::milliseconds(1)); if (getRound() != responseRound) { BOOST_CHECK(std::chrono::steady_clock::now() > queryTime); break; } } // But we eventually get one. BOOST_CHECK(getRound() > responseRound); // Stop event loop. BOOST_CHECK(m_processor->stopEventLoop()); // We don't have any task scheduled anymore. BOOST_CHECK_EQUAL(s.getQueueInfo(start, stop), 0); // Can't stop the event loop twice. BOOST_CHECK(!m_processor->stopEventLoop()); // Wait for the scheduler to stop. s.StopWhenDrained(); schedulerThread.join(); } BOOST_AUTO_TEST_CASE(destructor) { CScheduler s; std::chrono::system_clock::time_point start, stop; std::thread schedulerThread; BOOST_CHECK(m_processor->startEventLoop(s)); BOOST_CHECK_EQUAL(s.getQueueInfo(start, stop), 1); // Start the service thread after the queue size check to prevent a race // condition where the thread may be processing the event loop task during // the check. schedulerThread = std::thread(std::bind(&CScheduler::serviceQueue, &s)); // Destroy the processor. m_processor.reset(); // Now that avalanche is destroyed, there is no more scheduled tasks. BOOST_CHECK_EQUAL(s.getQueueInfo(start, stop), 0); // Wait for the scheduler to stop. s.StopWhenDrained(); schedulerThread.join(); } BOOST_AUTO_TEST_CASE(add_proof_to_reconcile) { uint32_t score = MIN_VALID_PROOF_SCORE; CChainState &active_chainstate = Assert(m_node.chainman)->ActiveChainstate(); auto addProofToReconcile = [&](uint32_t proofScore) { auto proof = buildRandomProof(active_chainstate, proofScore); m_processor->withPeerManager([&](avalanche::PeerManager &pm) { BOOST_CHECK(pm.registerProof(proof)); }); BOOST_CHECK(m_processor->addProofToReconcile(proof)); return proof; }; for (size_t i = 0; i < AVALANCHE_MAX_ELEMENT_POLL; i++) { auto proof = addProofToReconcile(++score); auto invs = AvalancheTest::getInvsForNextPoll(*m_processor); BOOST_CHECK_EQUAL(invs.size(), i + 1); BOOST_CHECK(invs.front().IsMsgProof()); BOOST_CHECK_EQUAL(invs.front().hash, proof->getId()); } // From here a new proof is only polled if its score is in the top // AVALANCHE_MAX_ELEMENT_POLL ProofId lastProofId; for (size_t i = 0; i < 10; i++) { auto proof = addProofToReconcile(++score); auto invs = AvalancheTest::getInvsForNextPoll(*m_processor); BOOST_CHECK_EQUAL(invs.size(), AVALANCHE_MAX_ELEMENT_POLL); BOOST_CHECK(invs.front().IsMsgProof()); BOOST_CHECK_EQUAL(invs.front().hash, proof->getId()); lastProofId = proof->getId(); } for (size_t i = 0; i < 10; i++) { auto proof = addProofToReconcile(--score); auto invs = AvalancheTest::getInvsForNextPoll(*m_processor); BOOST_CHECK_EQUAL(invs.size(), AVALANCHE_MAX_ELEMENT_POLL); BOOST_CHECK(invs.front().IsMsgProof()); BOOST_CHECK_EQUAL(invs.front().hash, lastProofId); } { // The score is not high enough to get polled auto proof = addProofToReconcile(--score); auto invs = AvalancheTest::getInvsForNextPoll(*m_processor); for (auto &inv : invs) { BOOST_CHECK_NE(inv.hash, proof->getId()); } } { // If proof replacement is not enabled there is no point polling for the // proof. auto proof = buildRandomProof(active_chainstate, MIN_VALID_PROOF_SCORE); m_processor->withPeerManager([&](avalanche::PeerManager &pm) { BOOST_CHECK(pm.registerProof(proof)); }); gArgs.ForceSetArg("-enableavalancheproofreplacement", "0"); BOOST_CHECK(!m_processor->addProofToReconcile(proof)); gArgs.ForceSetArg("-enableavalancheproofreplacement", "1"); BOOST_CHECK(m_processor->addProofToReconcile(proof)); gArgs.ClearForcedArg("-enableavalancheproofreplacement"); } } BOOST_AUTO_TEST_CASE(proof_record) { gArgs.ForceSetArg("-avaproofstakeutxoconfirmations", "2"); gArgs.ForceSetArg("-avalancheconflictingproofcooldown", "0"); BOOST_CHECK(!m_processor->isAccepted(nullptr)); BOOST_CHECK_EQUAL(m_processor->getConfidence(nullptr), -1); const CKey key = CKey::MakeCompressedKey(); const COutPoint conflictingOutpoint{TxId(GetRandHash()), 0}; const COutPoint immatureOutpoint{TxId(GetRandHash()), 0}; { CScript script = GetScriptForDestination(PKHash(key.GetPubKey())); LOCK(cs_main); CCoinsViewCache &coins = Assert(m_node.chainman)->ActiveChainstate().CoinsTip(); coins.AddCoin(conflictingOutpoint, Coin(CTxOut(PROOF_DUST_THRESHOLD, script), 10, false), false); coins.AddCoin(immatureOutpoint, Coin(CTxOut(PROOF_DUST_THRESHOLD, script), 100, false), false); } auto buildProof = [&](const COutPoint &outpoint, uint64_t sequence, uint32_t height = 10) { ProofBuilder pb(sequence, 0, key); BOOST_CHECK( pb.addUTXO(outpoint, PROOF_DUST_THRESHOLD, height, false, key)); return pb.build(); }; auto conflictingProof = buildProof(conflictingOutpoint, 1); auto validProof = buildProof(conflictingOutpoint, 2); auto orphanProof = buildProof(immatureOutpoint, 3, 100); BOOST_CHECK(!m_processor->isAccepted(conflictingProof)); BOOST_CHECK(!m_processor->isAccepted(validProof)); BOOST_CHECK(!m_processor->isAccepted(orphanProof)); BOOST_CHECK_EQUAL(m_processor->getConfidence(conflictingProof), -1); BOOST_CHECK_EQUAL(m_processor->getConfidence(validProof), -1); BOOST_CHECK_EQUAL(m_processor->getConfidence(orphanProof), -1); // Reconciling proofs that don't exist will fail BOOST_CHECK(!m_processor->addProofToReconcile(conflictingProof)); BOOST_CHECK(!m_processor->addProofToReconcile(validProof)); BOOST_CHECK(!m_processor->addProofToReconcile(orphanProof)); m_processor->withPeerManager([&](avalanche::PeerManager &pm) { BOOST_CHECK(pm.registerProof(conflictingProof)); BOOST_CHECK(pm.registerProof(validProof)); BOOST_CHECK(!pm.registerProof(orphanProof)); BOOST_CHECK(pm.isBoundToPeer(validProof->getId())); BOOST_CHECK(pm.isInConflictingPool(conflictingProof->getId())); BOOST_CHECK(pm.isOrphan(orphanProof->getId())); }); BOOST_CHECK(m_processor->addProofToReconcile(conflictingProof)); BOOST_CHECK(!m_processor->isAccepted(conflictingProof)); BOOST_CHECK(!m_processor->isAccepted(validProof)); BOOST_CHECK(!m_processor->isAccepted(orphanProof)); BOOST_CHECK_EQUAL(m_processor->getConfidence(conflictingProof), 0); BOOST_CHECK_EQUAL(m_processor->getConfidence(validProof), -1); BOOST_CHECK_EQUAL(m_processor->getConfidence(orphanProof), -1); BOOST_CHECK(m_processor->addProofToReconcile(validProof)); BOOST_CHECK(!m_processor->isAccepted(conflictingProof)); BOOST_CHECK(m_processor->isAccepted(validProof)); BOOST_CHECK(!m_processor->isAccepted(orphanProof)); BOOST_CHECK_EQUAL(m_processor->getConfidence(conflictingProof), 0); BOOST_CHECK_EQUAL(m_processor->getConfidence(validProof), 0); BOOST_CHECK_EQUAL(m_processor->getConfidence(orphanProof), -1); BOOST_CHECK(!m_processor->addProofToReconcile(orphanProof)); BOOST_CHECK(!m_processor->isAccepted(conflictingProof)); BOOST_CHECK(m_processor->isAccepted(validProof)); BOOST_CHECK(!m_processor->isAccepted(orphanProof)); BOOST_CHECK_EQUAL(m_processor->getConfidence(conflictingProof), 0); BOOST_CHECK_EQUAL(m_processor->getConfidence(validProof), 0); BOOST_CHECK_EQUAL(m_processor->getConfidence(orphanProof), -1); gArgs.ClearForcedArg("-avaproofstakeutxoconfirmations"); gArgs.ClearForcedArg("-avalancheconflictingproofcooldown"); } BOOST_AUTO_TEST_CASE(quorum_detection) { // Set min quorum parameters for our test int minStake = 4'000'000; gArgs.ForceSetArg("-avaminquorumstake", ToString(minStake)); gArgs.ForceSetArg("-avaminquorumconnectedstakeratio", "0.5"); // Create a new processor with our given quorum parameters const auto currency = Currency::get(); uint32_t minScore = Proof::amountToScore(minStake * currency.baseunit); CChainState &active_chainstate = Assert(m_node.chainman)->ActiveChainstate(); const CKey key = CKey::MakeCompressedKey(); auto localProof = buildRandomProof(active_chainstate, minScore / 4, 100, key); gArgs.ForceSetArg("-avamasterkey", EncodeSecret(key)); gArgs.ForceSetArg("-avaproof", localProof->ToHex()); bilingual_str error; ChainstateManager &chainman = *Assert(m_node.chainman); std::unique_ptr processor = Processor::MakeProcessor( *m_node.args, *m_node.chain, m_node.connman.get(), chainman, *m_node.scheduler, error); BOOST_CHECK(processor != nullptr); BOOST_CHECK(processor->getLocalProof() != nullptr); BOOST_CHECK_EQUAL(processor->getLocalProof()->getId(), localProof->getId()); BOOST_CHECK_EQUAL(AvalancheTest::getMinQuorumScore(*processor), minScore); BOOST_CHECK_EQUAL( AvalancheTest::getMinQuorumConnectedScoreRatio(*processor), 0.5); // The local proof has not been validated yet processor->withPeerManager([&](avalanche::PeerManager &pm) { BOOST_CHECK_EQUAL(pm.getTotalPeersScore(), 0); BOOST_CHECK_EQUAL(pm.getConnectedPeersScore(), 0); }); BOOST_CHECK(!processor->isQuorumEstablished()); // Register the local proof. This is normally done when the chain tip is // updated. The local proof should be accounted for in the min quorum // computation but the peer manager doesn't know about that. processor->withPeerManager([&](avalanche::PeerManager &pm) { BOOST_CHECK(pm.registerProof(processor->getLocalProof())); BOOST_CHECK(pm.isBoundToPeer(processor->getLocalProof()->getId())); BOOST_CHECK_EQUAL(pm.getTotalPeersScore(), minScore / 4); BOOST_CHECK_EQUAL(pm.getConnectedPeersScore(), 0); }); BOOST_CHECK(!processor->isQuorumEstablished()); + // Add enough nodes to get a conclusive vote + for (NodeId id = 0; id < 8; id++) { + processor->withPeerManager([&](avalanche::PeerManager &pm) { + pm.addNode(id, processor->getLocalProof()->getId()); + BOOST_CHECK_EQUAL(pm.getTotalPeersScore(), minScore / 4); + BOOST_CHECK_EQUAL(pm.getConnectedPeersScore(), minScore / 4); + }); + } + // Add part of the required stake and make sure we still report no quorum auto proof1 = buildRandomProof(active_chainstate, minScore / 2); processor->withPeerManager([&](avalanche::PeerManager &pm) { BOOST_CHECK(pm.registerProof(proof1)); BOOST_CHECK_EQUAL(pm.getTotalPeersScore(), 3 * minScore / 4); - BOOST_CHECK_EQUAL(pm.getConnectedPeersScore(), 0); + BOOST_CHECK_EQUAL(pm.getConnectedPeersScore(), minScore / 4); }); BOOST_CHECK(!processor->isQuorumEstablished()); // Add the rest of the stake, but we are still lacking connected stake auto proof2 = buildRandomProof(active_chainstate, minScore / 4); processor->withPeerManager([&](avalanche::PeerManager &pm) { BOOST_CHECK(pm.registerProof(proof2)); BOOST_CHECK_EQUAL(pm.getTotalPeersScore(), minScore); - BOOST_CHECK_EQUAL(pm.getConnectedPeersScore(), 0); + BOOST_CHECK_EQUAL(pm.getConnectedPeersScore(), minScore / 4); }); BOOST_CHECK(!processor->isQuorumEstablished()); // Adding a node should cause the quorum to be detected and locked-in processor->withPeerManager([&](avalanche::PeerManager &pm) { - pm.addNode(0, proof2->getId()); + pm.addNode(8, proof2->getId()); BOOST_CHECK_EQUAL(pm.getTotalPeersScore(), minScore); // The peer manager knows that proof2 has a node attached ... - BOOST_CHECK_EQUAL(pm.getConnectedPeersScore(), minScore / 4); + BOOST_CHECK_EQUAL(pm.getConnectedPeersScore(), minScore / 2); }); // ... but the processor also account for the local proof, so we reached 50% BOOST_CHECK(processor->isQuorumEstablished()); - // Go back to not having enough connected nodes, but we've already latched + // Go back to not having enough connected score, but we've already latched // the quorum as established processor->withPeerManager([&](avalanche::PeerManager &pm) { - pm.removeNode(0); + pm.removeNode(8); BOOST_CHECK_EQUAL(pm.getTotalPeersScore(), minScore); - BOOST_CHECK_EQUAL(pm.getConnectedPeersScore(), 0); + BOOST_CHECK_EQUAL(pm.getConnectedPeersScore(), minScore / 4); + }); + BOOST_CHECK(processor->isQuorumEstablished()); + + // Removing one more node drops our count below the minimum and the quorum + // is no longer ready + processor->withPeerManager( + [&](avalanche::PeerManager &pm) { pm.removeNode(7); }); + BOOST_CHECK(!processor->isQuorumEstablished()); + + // It resumes when we have enough nodes again + processor->withPeerManager([&](avalanche::PeerManager &pm) { + pm.addNode(7, processor->getLocalProof()->getId()); }); BOOST_CHECK(processor->isQuorumEstablished()); - // Remove peers one at a time and ensure the quorum stays established + // Remove peers one at a time until the quorum is no longer established auto spendProofUtxo = [&processor, &chainman](ProofRef proof) { { LOCK(cs_main); CCoinsViewCache &coins = chainman.ActiveChainstate().CoinsTip(); coins.SpendCoin(proof->getStakes()[0].getStake().getUTXO()); } processor->withPeerManager([&proof](avalanche::PeerManager &pm) { pm.updatedBlockTip(); BOOST_CHECK(!pm.isBoundToPeer(proof->getId())); }); }; spendProofUtxo(proof2); processor->withPeerManager([&](avalanche::PeerManager &pm) { BOOST_CHECK_EQUAL(pm.getTotalPeersScore(), 3 * minScore / 4); - BOOST_CHECK_EQUAL(pm.getConnectedPeersScore(), 0); + BOOST_CHECK_EQUAL(pm.getConnectedPeersScore(), minScore / 4); }); BOOST_CHECK(processor->isQuorumEstablished()); spendProofUtxo(proof1); processor->withPeerManager([&](avalanche::PeerManager &pm) { BOOST_CHECK_EQUAL(pm.getTotalPeersScore(), minScore / 4); - BOOST_CHECK_EQUAL(pm.getConnectedPeersScore(), 0); + BOOST_CHECK_EQUAL(pm.getConnectedPeersScore(), minScore / 4); }); BOOST_CHECK(processor->isQuorumEstablished()); spendProofUtxo(processor->getLocalProof()); processor->withPeerManager([&](avalanche::PeerManager &pm) { BOOST_CHECK_EQUAL(pm.getTotalPeersScore(), 0); BOOST_CHECK_EQUAL(pm.getConnectedPeersScore(), 0); }); - BOOST_CHECK(processor->isQuorumEstablished()); + // There is no node left + BOOST_CHECK(!processor->isQuorumEstablished()); gArgs.ClearForcedArg("-avamasterkey"); gArgs.ClearForcedArg("-avaproof"); gArgs.ClearForcedArg("-avaminquorumstake"); gArgs.ClearForcedArg("-avaminquorumconnectedstakeratio"); } BOOST_AUTO_TEST_CASE(quorum_detection_parameter_validation) { // Create vector of tuples of: // std::vector> tests = { // All parameters are invalid {"", "", "", false}, {"-1", "-1", "-1", false}, // Min stake is out of range {"-1", "0", "0", false}, {"-0.01", "0", "0", false}, {"21000000000000.01", "0", "0", false}, // Min connected ratio is out of range {"0", "-1", "0", false}, {"0", "1.1", "0", false}, // Min avaproofs messages ratio is out of range {"0", "0", "-1", false}, // All parameters are valid {"0", "0", "0", true}, {"0.00", "0", "0", true}, {"0.01", "0", "0", true}, {"1", "0.1", "0", true}, {"10", "0.5", "0", true}, {"10", "1", "0", true}, {"21000000000000.00", "0", "0", true}, {"0", "0", "1", true}, {"0", "0", "100", true}, }; // For each case set the parameters and check that making the processor // succeeds or fails as expected for (auto it = tests.begin(); it != tests.end(); ++it) { gArgs.ForceSetArg("-avaminquorumstake", std::get<0>(*it)); gArgs.ForceSetArg("-avaminquorumconnectedstakeratio", std::get<1>(*it)); gArgs.ForceSetArg("-avaminavaproofsnodecount", std::get<2>(*it)); bilingual_str error; std::unique_ptr processor = Processor::MakeProcessor( *m_node.args, *m_node.chain, m_node.connman.get(), *Assert(m_node.chainman), *m_node.scheduler, error); if (std::get<3>(*it)) { BOOST_CHECK(processor != nullptr); BOOST_CHECK(error.empty()); BOOST_CHECK_EQUAL(error.original, ""); } else { BOOST_CHECK(processor == nullptr); BOOST_CHECK(!error.empty()); BOOST_CHECK(error.original != ""); } } gArgs.ClearForcedArg("-avaminquorumstake"); gArgs.ClearForcedArg("-avaminquorumconnectedstakeratio"); gArgs.ClearForcedArg("-avaminavaproofsnodecount"); } BOOST_AUTO_TEST_CASE(min_avaproofs_messages) { ArgsManager argsman; argsman.ForceSetArg("-avaminquorumstake", "0"); argsman.ForceSetArg("-avaminquorumconnectedstakeratio", "0"); ChainstateManager &chainman = *Assert(m_node.chainman); auto checkMinAvaproofsMessages = [&](int64_t minAvaproofsMessages) { argsman.ForceSetArg("-avaminavaproofsnodecount", ToString(minAvaproofsMessages)); bilingual_str error; auto processor = Processor::MakeProcessor( argsman, *m_node.chain, m_node.connman.get(), chainman, *m_node.scheduler, error); - BOOST_CHECK_EQUAL(processor->isQuorumEstablished(), - minAvaproofsMessages <= 0); - auto addNode = [&](NodeId nodeid) { auto proof = buildRandomProof(chainman.ActiveChainstate(), MIN_VALID_PROOF_SCORE); processor->withPeerManager([&](avalanche::PeerManager &pm) { BOOST_CHECK(pm.registerProof(proof)); BOOST_CHECK(pm.addNode(nodeid, proof->getId())); }); }; + // Add enough node to have a conclusive vote, but don't account any + // avaproofs. + // NOTE: we can't use the test facilites like ConnectNodes() because we + // are not testing on m_processor. + for (NodeId id = 100; id < 108; id++) { + addNode(id); + } + + BOOST_CHECK_EQUAL(processor->isQuorumEstablished(), + minAvaproofsMessages <= 0); + for (int64_t i = 0; i < minAvaproofsMessages - 1; i++) { addNode(i); processor->avaproofsSent(i); BOOST_CHECK_EQUAL(processor->getAvaproofsNodeCounter(), i + 1); // Receiving again on the same node does not increase the counter processor->avaproofsSent(i); BOOST_CHECK_EQUAL(processor->getAvaproofsNodeCounter(), i + 1); BOOST_CHECK(!processor->isQuorumEstablished()); } addNode(minAvaproofsMessages); processor->avaproofsSent(minAvaproofsMessages); BOOST_CHECK(processor->isQuorumEstablished()); // Check the latch AvalancheTest::clearavaproofsNodeCounter(*processor); BOOST_CHECK(processor->isQuorumEstablished()); }; checkMinAvaproofsMessages(0); checkMinAvaproofsMessages(1); checkMinAvaproofsMessages(10); checkMinAvaproofsMessages(100); } BOOST_AUTO_TEST_CASE_TEMPLATE(voting_parameters, P, VoteItemProviders) { // Check that setting voting parameters has the expected effect gArgs.ForceSetArg("-avastalevotethreshold", ToString(AVALANCHE_VOTE_STALE_MIN_THRESHOLD)); gArgs.ForceSetArg("-avastalevotefactor", "2"); std::vector> testCases = { // {number of yes votes, number of neutral votes} {0, AVALANCHE_VOTE_STALE_MIN_THRESHOLD}, {AVALANCHE_FINALIZATION_SCORE + 4, AVALANCHE_FINALIZATION_SCORE - 6}, }; bilingual_str error; m_processor = Processor::MakeProcessor( *m_node.args, *m_node.chain, m_node.connman.get(), *Assert(m_node.chainman), *m_node.scheduler, error); BOOST_CHECK(m_processor != nullptr); BOOST_CHECK(error.empty()); P provider(this); auto &updates = provider.updates; const uint32_t invType = provider.invType; const auto item = provider.buildVoteItem(); const auto itemid = provider.getVoteItemId(item); // Create nodes that supports avalanche. auto avanodes = ConnectNodes(); int nextNodeIndex = 0; for (auto &testCase : testCases) { // Add a new item. Check it is added to the polls. BOOST_CHECK(provider.addToReconcile(item)); auto invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, invType); BOOST_CHECK(invs[0].hash == itemid); BOOST_CHECK(m_processor->isAccepted(item)); auto registerNewVote = [&](const Response &resp) { runEventLoop(); auto nodeid = avanodes[nextNodeIndex++ % avanodes.size()]->GetId(); BOOST_CHECK(provider.registerVotes(nodeid, resp)); }; // Add some confidence for (int i = 0; i < std::get<0>(testCase); i++) { Response resp = {getRound(), 0, {Vote(0, itemid)}}; registerNewVote(next(resp)); BOOST_CHECK(m_processor->isAccepted(item)); BOOST_CHECK_EQUAL(m_processor->getConfidence(item), i >= 6 ? i - 5 : 0); BOOST_CHECK_EQUAL(updates.size(), 0); } // Vote until just before item goes stale for (int i = 0; i < std::get<1>(testCase); i++) { Response resp = {getRound(), 0, {Vote(-1, itemid)}}; registerNewVote(next(resp)); BOOST_CHECK_EQUAL(updates.size(), 0); } // As long as it is not stale, we poll. invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, invType); BOOST_CHECK(invs[0].hash == itemid); // Now stale Response resp = {getRound(), 0, {Vote(-1, itemid)}}; registerNewVote(next(resp)); BOOST_CHECK_EQUAL(updates.size(), 1); BOOST_CHECK(updates[0].getVoteItem() == item); BOOST_CHECK(updates[0].getStatus() == VoteStatus::Stale); updates.clear(); // Once stale, there is no poll for it. invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 0); } gArgs.ClearForcedArg("-avastalevotethreshold"); gArgs.ClearForcedArg("-avastalevotefactor"); } BOOST_AUTO_TEST_SUITE_END() diff --git a/test/functional/abc_p2p_avalanche_proof_voting.py b/test/functional/abc_p2p_avalanche_proof_voting.py index 87d499b26..260beb6f7 100755 --- a/test/functional/abc_p2p_avalanche_proof_voting.py +++ b/test/functional/abc_p2p_avalanche_proof_voting.py @@ -1,553 +1,555 @@ #!/usr/bin/env python3 # Copyright (c) 2021 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """Test the resolution of conflicting proofs via avalanche.""" import time from test_framework.avatools import ( avalanche_proof_from_hex, create_coinbase_stakes, gen_proof, get_ava_p2p_interface, get_proof_ids, wait_for_proof, ) from test_framework.key import ECPubKey from test_framework.messages import ( MSG_AVA_PROOF, AvalancheProofVoteResponse, AvalancheVote, ) from test_framework.test_framework import BitcoinTestFramework from test_framework.util import ( assert_equal, assert_greater_than, assert_raises_rpc_error, try_rpc, ) from test_framework.wallet_util import bytes_to_wif QUORUM_NODE_COUNT = 16 class AvalancheProofVotingTest(BitcoinTestFramework): def set_test_params(self): self.setup_clean_chain = True self.num_nodes = 1 self.conflicting_proof_cooldown = 100 self.peer_replacement_cooldown = 2000 self.extra_args = [ ['-enableavalanche=1', '-enableavalancheproofreplacement=1', '-avaproofstakeutxoconfirmations=2', f'-avalancheconflictingproofcooldown={self.conflicting_proof_cooldown}', f'-avalanchepeerreplacementcooldown={self.peer_replacement_cooldown}', '-avacooldown=0', '-avastalevotethreshold=140', '-avastalevotefactor=1'], ] self.supports_cli = False # Build a fake quorum of nodes. def get_quorum(self, node): quorum = [get_ava_p2p_interface(node) for _ in range(0, QUORUM_NODE_COUNT)] for n in quorum: success = node.addavalanchenode( n.nodeid, self.privkey.get_pubkey().get_bytes().hex(), self.quorum_proof.serialize().hex(), ) assert success is True return quorum def can_find_proof_in_poll(self, hash, response): found_hash = False for n in self.quorum: poll = n.get_avapoll_if_available() # That node has not received a poll if poll is None: continue # We got a poll, check for the hash and repond votes = [] for inv in poll.invs: # Vote yes to everything r = AvalancheProofVoteResponse.ACTIVE # Look for what we expect if inv.hash == hash: r = response found_hash = True votes.append(AvalancheVote(r, inv.hash)) n.send_avaresponse(poll.round, votes, self.privkey) return found_hash @staticmethod def send_proof(from_peer, proof_hex): proof = avalanche_proof_from_hex(proof_hex) from_peer.send_avaproof(proof) return proof.proofid def send_and_check_for_polling(self, peer, proof_hex, response=AvalancheProofVoteResponse.ACTIVE): proofid = self.send_proof(peer, proof_hex) self.wait_until(lambda: self.can_find_proof_in_poll(proofid, response)) def build_conflicting_proof(self, node, sequence): return node.buildavalancheproof( sequence, 0, self.privkey_wif, self.conflicting_stakes) def run_test(self): node = self.nodes[0] self.privkey, self.quorum_proof = gen_proof(node) self.privkey_wif = bytes_to_wif(self.privkey.get_bytes()) # Make the quorum proof mature before preparing the quorum node.generate(1) self.quorum = self.get_quorum(node) addrkey0 = node.get_deterministic_priv_key() blockhash = node.generatetoaddress(10, addrkey0.address) self.conflicting_stakes = create_coinbase_stakes( node, blockhash[5:9], addrkey0.key) self.immature_stakes = create_coinbase_stakes( node, blockhash[9:], addrkey0.key) self.poll_tests(node) self.update_tests(node) self.vote_tests(node) self.stale_proof_tests(node) self.unorphan_poll_tests(node) self.repeated_conflicting_proofs_tests(node, increase_sequence=True) self.repeated_conflicting_proofs_tests(node, increase_sequence=False) def poll_tests(self, node): proof_seq10 = self.build_conflicting_proof(node, 10) proof_seq20 = self.build_conflicting_proof(node, 20) proof_seq30 = self.build_conflicting_proof(node, 30) proof_seq40 = self.build_conflicting_proof(node, 40) orphan = node.buildavalancheproof( 100, 0, self.privkey_wif, self.immature_stakes) no_stake = node.buildavalancheproof( 200, 2000000000, self.privkey_wif, [] ) # Get the key so we can verify signatures. avakey = ECPubKey() avakey.set(bytes.fromhex(node.getavalanchekey())) self.log.info("Trigger polling from the node...") peer = get_ava_p2p_interface(node) mock_time = int(time.time()) node.setmocktime(mock_time) self.log.info("Check we poll for valid proof") self.send_and_check_for_polling(peer, proof_seq30) self.log.info( "Check we don't poll for subsequent proofs if the cooldown is not elapsed, proof not the favorite") with node.assert_debug_log(["Misbehaving", "cooldown-not-elapsed"]): peer.send_avaproof(avalanche_proof_from_hex(proof_seq20)) self.log.info( "Check we don't poll for subsequent proofs if the cooldown is not elapsed, proof is the favorite") with node.assert_debug_log(["Misbehaving", "cooldown-not-elapsed"]): peer.send_avaproof(avalanche_proof_from_hex(proof_seq40)) self.log.info( "Check we poll for conflicting proof if the proof is not the favorite") mock_time += self.conflicting_proof_cooldown node.setmocktime(mock_time) self.send_and_check_for_polling( peer, proof_seq20, response=AvalancheProofVoteResponse.REJECTED) self.log.info( "Check we poll for conflicting proof if the proof is the favorite") mock_time += self.conflicting_proof_cooldown node.setmocktime(mock_time) self.send_and_check_for_polling(peer, proof_seq40) mock_time += self.conflicting_proof_cooldown node.setmocktime(mock_time) self.log.info("Check we don't poll for orphans") with node.assert_debug_log(["Not polling the avalanche proof (orphan-proof)"]): peer.send_avaproof(avalanche_proof_from_hex(orphan)) self.log.info("Check we don't poll for proofs that get rejected") with node.assert_debug_log(["Not polling the avalanche proof (rejected-proof)"]): peer.send_avaproof(avalanche_proof_from_hex(proof_seq10)) self.log.info("Check we don't poll for invalid proofs and get banned") with node.assert_debug_log(["Misbehaving", "invalid-proof"]): peer.send_avaproof(avalanche_proof_from_hex(no_stake)) peer.wait_for_disconnect() self.log.info("We don't poll for proofs if replacement is disabled") self.restart_node( 0, extra_args=self.extra_args[0] + ['-enableavalancheproofreplacement=0']) peer = get_ava_p2p_interface(node) with node.assert_debug_log(["Not polling the avalanche proof (not-worth-polling)"]): peer.send_avaproof(avalanche_proof_from_hex(proof_seq10)) def update_tests(self, node): # Restart the node to get rid of in-flight requests self.restart_node(0) mock_time = int(time.time()) node.setmocktime(mock_time) self.quorum = self.get_quorum(node) peer = get_ava_p2p_interface(node) proof_seq30 = self.build_conflicting_proof(node, 30) proof_seq40 = self.build_conflicting_proof(node, 40) proof_seq50 = self.build_conflicting_proof(node, 50) proofid_seq30 = avalanche_proof_from_hex(proof_seq30).proofid proofid_seq40 = avalanche_proof_from_hex(proof_seq40).proofid proofid_seq50 = avalanche_proof_from_hex(proof_seq50).proofid node.sendavalancheproof(proof_seq40) self.wait_until(lambda: proofid_seq40 in get_proof_ids(node)) assert proofid_seq40 in get_proof_ids(node) assert proofid_seq30 not in get_proof_ids(node) self.log.info("Test proof acceptance") def accept_proof(proofid): self.wait_until(lambda: self.can_find_proof_in_poll( proofid, response=AvalancheProofVoteResponse.ACTIVE), timeout=5) return proofid in get_proof_ids(node) mock_time += self.conflicting_proof_cooldown node.setmocktime(mock_time) self.send_and_check_for_polling(peer, proof_seq30) # Let the quorum vote for it self.wait_until(lambda: accept_proof(proofid_seq30)) assert proofid_seq40 not in get_proof_ids(node) self.log.info("Test the peer replacement rate limit") # Wait until proof_seq30 is finalized retry = 5 while retry > 0: try: with node.assert_debug_log([f"Avalanche finalized proof {proofid_seq30:0{64}x}"]): self.wait_until(lambda: not self.can_find_proof_in_poll( proofid_seq30, response=AvalancheProofVoteResponse.ACTIVE)) break except AssertionError: retry -= 1 assert_greater_than(retry, 0) # Not enough assert self.conflicting_proof_cooldown < self.peer_replacement_cooldown mock_time += self.conflicting_proof_cooldown node.setmocktime(mock_time) peer = get_ava_p2p_interface(node) with node.assert_debug_log(["Misbehaving", "cooldown-not-elapsed"]): self.send_proof(peer, proof_seq50) mock_time += self.peer_replacement_cooldown node.setmocktime(mock_time) self.log.info("Test proof rejection") self.send_proof(peer, proof_seq50) self.wait_until(lambda: proofid_seq50 in get_proof_ids(node)) assert proofid_seq40 not in get_proof_ids(node) def reject_proof(proofid): self.wait_until( lambda: self.can_find_proof_in_poll( proofid, response=AvalancheProofVoteResponse.REJECTED)) return proofid not in get_proof_ids(node) with node.assert_debug_log( [f"Avalanche rejected proof {proofid_seq50:0{64}x}"], ["Failed to reject proof"] ): self.wait_until(lambda: reject_proof(proofid_seq50)) assert proofid_seq50 not in get_proof_ids(node) assert proofid_seq40 in get_proof_ids(node) self.log.info("Test proof invalidation") def invalidate_proof(proofid): self.wait_until( lambda: self.can_find_proof_in_poll( proofid, response=AvalancheProofVoteResponse.REJECTED)) return try_rpc(-8, "Proof not found", node.getrawavalancheproof, f"{proofid:0{64}x}") with node.assert_debug_log( [f"Avalanche invalidated proof {proofid_seq50:0{64}x}"], ["Failed to reject proof"] ): self.wait_until(lambda: invalidate_proof(proofid_seq50)) self.log.info("The node will now ignore the invalid proof") for i in range(5): with node.assert_debug_log(["received: avaproof"]): self.send_proof(peer, proof_seq50) assert_raises_rpc_error(-8, "Proof not found", node.getrawavalancheproof, f"{proofid_seq50:0{64}x}") def vote_tests(self, node): self.restart_node(0, extra_args=['-enableavalanche=1', '-avaproofstakeutxoconfirmations=2', '-avacooldown=0', '-avalancheconflictingproofcooldown=0', '-whitelist=noban@127.0.0.1', ]) + self.get_quorum(node) + ava_node = get_ava_p2p_interface(node) # Generate coinbases to use for stakes stakes_key = node.get_deterministic_priv_key() blocks = node.generatetoaddress(4, stakes_key.address) # Get the ava key so we can verify signatures. ava_key = ECPubKey() ava_key.set(bytes.fromhex(node.getavalanchekey())) def create_proof(stakes): proof = node.buildavalancheproof(11, 12, self.privkey_wif, stakes) proof_id = avalanche_proof_from_hex(proof).proofid return proof, proof_id # proof_0 is valid right now stakes_0 = create_coinbase_stakes(node, [blocks[0]], stakes_key.key) proof_0, proof_0_id = create_proof(stakes_0) # proof_1 is valid right now, and from different stakes stakes_1 = create_coinbase_stakes(node, [blocks[1]], stakes_key.key) proof_1, proof_1_id = create_proof(stakes_1) # proof_2 is an orphan because the stake UTXO is immature stakes_2 = create_coinbase_stakes(node, [blocks[3]], stakes_key.key) proof_2, proof_2_id = create_proof(stakes_2) # proof_3 conflicts with proof_0 and proof_1 stakes_3 = create_coinbase_stakes( node, [blocks[0], blocks[1]], stakes_key.key) proof_3, proof_3_id = create_proof(stakes_3) # proof_4 is invalid and should be rejected stakes_4 = create_coinbase_stakes(node, [blocks[2]], stakes_key.key) stakes_4[0]['amount'] -= 100000 proof_4, proof_4_id = create_proof(stakes_4) # Create a helper to issue a poll and validate the responses def poll_assert_response(expected): # Issue a poll for each proof self.log.info("Trigger polling from the node...") ava_node.send_poll( [proof_0_id, proof_1_id, proof_2_id, proof_3_id, proof_4_id], MSG_AVA_PROOF) response = ava_node.wait_for_avaresponse() r = response.response # Verify signature assert ava_key.verify_schnorr(response.sig, r.get_hash()) # Verify votes votes = r.votes assert_equal(len(votes), len(expected)) for i in range(0, len(votes)): assert_equal(repr(votes[i]), repr(expected[i])) # Check that all proofs start unknown poll_assert_response([ AvalancheVote(AvalancheProofVoteResponse.UNKNOWN, proof_0_id), AvalancheVote(AvalancheProofVoteResponse.UNKNOWN, proof_1_id), AvalancheVote(AvalancheProofVoteResponse.UNKNOWN, proof_2_id), AvalancheVote(AvalancheProofVoteResponse.UNKNOWN, proof_3_id), AvalancheVote(AvalancheProofVoteResponse.UNKNOWN, proof_4_id)]) # Send the first proof. Nodes should now respond that it's accepted node.sendavalancheproof(proof_0) poll_assert_response([ AvalancheVote(AvalancheProofVoteResponse.ACTIVE, proof_0_id), AvalancheVote(AvalancheProofVoteResponse.UNKNOWN, proof_1_id), AvalancheVote(AvalancheProofVoteResponse.UNKNOWN, proof_2_id), AvalancheVote(AvalancheProofVoteResponse.UNKNOWN, proof_3_id), AvalancheVote(AvalancheProofVoteResponse.UNKNOWN, proof_4_id)]) # Send and check the 2nd proof. Nodes should now respond that it's # accepted node.sendavalancheproof(proof_1) poll_assert_response([ AvalancheVote(AvalancheProofVoteResponse.ACTIVE, proof_0_id), AvalancheVote(AvalancheProofVoteResponse.ACTIVE, proof_1_id), AvalancheVote(AvalancheProofVoteResponse.UNKNOWN, proof_2_id), AvalancheVote(AvalancheProofVoteResponse.UNKNOWN, proof_3_id), AvalancheVote(AvalancheProofVoteResponse.UNKNOWN, proof_4_id)]) # The next proof should be rejected/put in the orphan pool ava_node.send_proof(avalanche_proof_from_hex(proof_2)) poll_assert_response([ AvalancheVote(AvalancheProofVoteResponse.ACTIVE, proof_0_id), AvalancheVote(AvalancheProofVoteResponse.ACTIVE, proof_1_id), AvalancheVote(AvalancheProofVoteResponse.ORPHAN, proof_2_id), AvalancheVote(AvalancheProofVoteResponse.UNKNOWN, proof_3_id), AvalancheVote(AvalancheProofVoteResponse.UNKNOWN, proof_4_id)]) # The next proof should be rejected and marked as a conflicting proof assert_raises_rpc_error(-8, "The proof has conflicting utxo with an existing proof", node.sendavalancheproof, proof_3) poll_assert_response([ AvalancheVote(AvalancheProofVoteResponse.ACTIVE, proof_0_id), AvalancheVote(AvalancheProofVoteResponse.ACTIVE, proof_1_id), AvalancheVote(AvalancheProofVoteResponse.ORPHAN, proof_2_id), AvalancheVote(AvalancheProofVoteResponse.CONFLICT, proof_3_id), AvalancheVote(AvalancheProofVoteResponse.UNKNOWN, proof_4_id)]) # The final proof should be permanently rejected for being completely # invalid ava_node.send_proof(avalanche_proof_from_hex(proof_4)) poll_assert_response([ AvalancheVote(AvalancheProofVoteResponse.ACTIVE, proof_0_id), AvalancheVote(AvalancheProofVoteResponse.ACTIVE, proof_1_id), AvalancheVote(AvalancheProofVoteResponse.ORPHAN, proof_2_id), AvalancheVote(AvalancheProofVoteResponse.CONFLICT, proof_3_id), AvalancheVote(AvalancheProofVoteResponse.REJECTED, proof_4_id)]) def stale_proof_tests(self, node): # Restart the node to get rid of in-flight requests self.restart_node(0) mock_time = int(time.time()) node.setmocktime(mock_time) self.quorum = self.get_quorum(node) peer = get_ava_p2p_interface(node) proof_seq1 = self.build_conflicting_proof(node, 1) proof_seq2 = self.build_conflicting_proof(node, 2) proofid_seq1 = avalanche_proof_from_hex(proof_seq1).proofid proofid_seq2 = avalanche_proof_from_hex(proof_seq2).proofid node.sendavalancheproof(proof_seq2) self.wait_until(lambda: proofid_seq2 in get_proof_ids(node)) assert proofid_seq2 in get_proof_ids(node) assert proofid_seq1 not in get_proof_ids(node) mock_time += self.conflicting_proof_cooldown node.setmocktime(mock_time) peer.send_avaproof(avalanche_proof_from_hex(proof_seq1)) # Wait until proof_seq1 voting goes stale retry = 5 while retry > 0: try: with node.assert_debug_log([f"Avalanche stalled proof {proofid_seq1:0{64}x}"]): self.wait_until(lambda: not self.can_find_proof_in_poll( proofid_seq1, response=AvalancheProofVoteResponse.UNKNOWN), timeout=10) break except AssertionError: retry -= 1 assert_greater_than(retry, 0) # Verify that proof_seq2 was not replaced assert proofid_seq2 in get_proof_ids(node) assert proofid_seq1 not in get_proof_ids(node) # When polled, peer responds with expected votes for both proofs peer.send_poll([proofid_seq1, proofid_seq2], MSG_AVA_PROOF) response = peer.wait_for_avaresponse() assert repr(response.response.votes) == repr([ AvalancheVote(AvalancheProofVoteResponse.UNKNOWN, proofid_seq1), AvalancheVote(AvalancheProofVoteResponse.ACTIVE, proofid_seq2)]) def unorphan_poll_tests(self, node): # Restart the node with appropriate flags for this test self.restart_node(0, extra_args=[ '-enableavalanche=1', '-enableavalancheproofreplacement=1', '-avaproofstakeutxoconfirmations=2', '-avalancheconflictingproofcooldown=0', '-avacooldown=0', ]) self.quorum = self.get_quorum(node) peer = get_ava_p2p_interface(node) _, immature_proof = gen_proof(node) self.log.info("Orphan proofs are not polled") with node.assert_debug_log(["Not polling the avalanche proof (orphan-proof)"]): peer.send_avaproof(immature_proof) self.log.info("Unorphaned proofs are polled") node.generate(1) self.send_and_check_for_polling(peer, immature_proof.serialize().hex()) def repeated_conflicting_proofs_tests( self, node, increase_sequence, banscore=5): self.log.info( "Send a lot of conflicting proofs ({} sequence) and check the peer " "gets banned".format( "increasing" if increase_sequence else "decreasing" ) ) self.restart_node(0) sequence = 100 # Build and send a first proof proof_base = avalanche_proof_from_hex( self.build_conflicting_proof(node, sequence)) peer = get_ava_p2p_interface(node) peer.send_avaproof(proof_base) wait_for_proof(node, f"{proof_base.proofid:0{64}x}") def send_conflicting_proof(): nonlocal sequence sequence += 1 if increase_sequence else -1 proof = self.build_conflicting_proof(node, sequence) with node.assert_debug_log(["Misbehaving", "cooldown-not-elapsed"]): peer.send_avaproof(avalanche_proof_from_hex(proof)) # Ramp up the banscore by sending conflicting proofs repeatidly for _ in range(100 // banscore - 1): send_conflicting_proof() # This one will trigger the ban send_conflicting_proof() peer.wait_for_disconnect() if __name__ == '__main__': AvalancheProofVotingTest().main() diff --git a/test/functional/abc_p2p_avalanche_quorum.py b/test/functional/abc_p2p_avalanche_quorum.py index a38b635ac..46fb19a2f 100755 --- a/test/functional/abc_p2p_avalanche_quorum.py +++ b/test/functional/abc_p2p_avalanche_quorum.py @@ -1,207 +1,237 @@ #!/usr/bin/env python3 # Copyright (c) 2020-2022 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """Test the quorum detection of avalanche.""" from test_framework.avatools import ( AvaP2PInterface, build_msg_avaproofs, gen_proof, get_ava_p2p_interface, + wait_for_proof, ) from test_framework.key import ECPubKey from test_framework.messages import ( NODE_AVALANCHE, NODE_NETWORK, AvalancheVote, AvalancheVoteError, ) from test_framework.p2p import p2p_lock from test_framework.test_framework import BitcoinTestFramework from test_framework.util import assert_equal class AvalancheQuorumTest(BitcoinTestFramework): def set_test_params(self): self.setup_clean_chain = True self.num_nodes = 3 self.min_avaproofs_node_count = 8 self.extra_args = [[ '-enableavalanche=1', '-enableavalanchepeerdiscovery=1', '-avaproofstakeutxoconfirmations=1', '-avacooldown=0', '-avatimeout=0', - '-avaminquorumstake=100000000', + '-avaminquorumstake=150000000', '-avaminquorumconnectedstakeratio=0.8', '-minimumchainwork=0', ]] * self.num_nodes self.extra_args[0] = self.extra_args[0] + \ ['-avaminavaproofsnodecount=0'] self.extra_args[1] = self.extra_args[1] + \ [f'-avaminavaproofsnodecount={self.min_avaproofs_node_count}'] self.extra_args[2] = self.extra_args[2] + \ [f'-avaminavaproofsnodecount={self.min_avaproofs_node_count}'] def run_test(self): + # Initially all nodes start with 8 nodes attached to a single proof + privkey, proof = gen_proof(self.nodes[0]) + for node in self.nodes: + quorum = [get_ava_p2p_interface(node) + for _ in range(0, 8)] + + for n in quorum: + success = node.addavalanchenode( + n.nodeid, + privkey.get_pubkey().get_bytes().hex(), + proof.serialize().hex(), + ) + assert success is True + # Prepare peers proofs peers = [] for i in range(0, self.min_avaproofs_node_count + 1): key, proof = gen_proof(self.nodes[0]) peers.append({'key': key, 'proof': proof}) # Let the nodes known about all the blocks then disconnect them so we're # sure they won't exchange proofs when we start connecting peers. self.sync_all() self.disconnect_nodes(0, 1) # Restart node 2 to apply the minimum chainwork and make sure it's still # in IBD state. chainwork = int(self.nodes[2].getblockchaininfo()['chainwork'], 16) self.restart_node( 2, extra_args=self.extra_args[2] + [f'-minimumchainwork={chainwork + 2:#x}']) assert self.nodes[2].getblockchaininfo()['initialblockdownload'] # Build polling nodes pollers = [get_ava_p2p_interface(node) for node in self.nodes] def poll_and_assert_response(node, expected): pubkey = ECPubKey() pubkey.set(bytes.fromhex(node.getavalanchekey())) poller = pollers[node.index] # Send poll for best block block = int(node.getbestblockhash(), 16) poller.send_poll([block]) # Get response and check that the vote is what we expect response = poller.wait_for_avaresponse() r = response.response assert pubkey.verify_schnorr(response.sig, r.get_hash()) assert_equal(len(r.votes), 1) actual = repr(r.votes[0]) expected = repr(AvalancheVote(expected, block)) assert_equal(actual, expected) p2p_idx = 0 def get_ava_outbound(node, peer, empty_avaproof): nonlocal p2p_idx avapeer = AvaP2PInterface() avapeer.proof = peer['proof'] avapeer.master_privkey = peer['key'] node.add_outbound_p2p_connection( avapeer, p2p_idx=p2p_idx, connection_type="avalanche", services=NODE_NETWORK | NODE_AVALANCHE, ) p2p_idx += 1 avapeer.nodeid = node.getpeerinfo()[-1]['id'] peer['node'] = avapeer # There is no compact proof request if the node is in IBD state if not node.getblockchaininfo()['initialblockdownload']: avapeer.wait_until( lambda: avapeer.last_message.get("getavaproofs")) if empty_avaproof: avapeer.send_message(build_msg_avaproofs([])) avapeer.sync_send_with_ping() with p2p_lock: assert_equal( avapeer.message_count.get( "avaproofsreq", 0), 0) else: avapeer.send_and_ping(build_msg_avaproofs([peer['proof']])) avapeer.wait_until( lambda: avapeer.last_message.get("avaproofsreq")) return avapeer def add_avapeer_and_check_status( peer, expected_status, empty_avaproof=False): for i, node in enumerate(self.nodes): get_ava_outbound(node, peer, empty_avaproof) poll_and_assert_response(node, expected_status[i]) - # Start polling. The response should be UNKNOWN because there's no - # score + # Start polling. The response should be UNKNOWN because there's not + # enough stake [poll_and_assert_response(node, AvalancheVoteError.UNKNOWN) for node in self.nodes] - # Create one peer with half the score and add one node + # Create one peer with half the remaining missing stake and add one + # node add_avapeer_and_check_status( peers[0], [ AvalancheVoteError.UNKNOWN, AvalancheVoteError.UNKNOWN, AvalancheVoteError.UNKNOWN, ]) # Create a second peer with the other half and add one node. # This is enough for node0 but not node1 add_avapeer_and_check_status( peers[1], [ AvalancheVoteError.ACCEPTED, AvalancheVoteError.UNKNOWN, AvalancheVoteError.UNKNOWN, ]) # Add more peers for triggering the avaproofs messaging for i in range(2, self.min_avaproofs_node_count - 1): add_avapeer_and_check_status( peers[i], [ AvalancheVoteError.ACCEPTED, AvalancheVoteError.UNKNOWN, AvalancheVoteError.UNKNOWN, ]) add_avapeer_and_check_status( peers[self.min_avaproofs_node_count - 1], [ AvalancheVoteError.ACCEPTED, AvalancheVoteError.ACCEPTED, AvalancheVoteError.UNKNOWN, ]) self.nodes[2].generate(1) assert not self.nodes[2].getblockchaininfo()['initialblockdownload'] # The avaproofs message are not accounted during IBD, so this is not # enough. poll_and_assert_response(self.nodes[2], AvalancheVoteError.UNKNOWN) # Connect more peers to reach the message threshold while node 2 is out # of IBD. for i in range(self.min_avaproofs_node_count - 1): add_avapeer_and_check_status( peers[i], [ AvalancheVoteError.ACCEPTED, AvalancheVoteError.ACCEPTED, AvalancheVoteError.UNKNOWN, ]) # The messages is not accounted when there is no shortid add_avapeer_and_check_status( peers[self.min_avaproofs_node_count - 1], [ AvalancheVoteError.ACCEPTED, AvalancheVoteError.ACCEPTED, AvalancheVoteError.UNKNOWN, ], empty_avaproof=True) # The messages are accounted and the node quorum finally valid add_avapeer_and_check_status( peers[self.min_avaproofs_node_count], [ AvalancheVoteError.ACCEPTED, AvalancheVoteError.ACCEPTED, AvalancheVoteError.ACCEPTED, ]) + # Unless there is not enough nodes to poll + for node in self.nodes: + avapeers = [p2p for p2p in node.p2ps if p2p not in pollers] + for peer in avapeers[7:]: + peer.peer_disconnect() + peer.wait_for_disconnect() + poll_and_assert_response(node, AvalancheVoteError.UNKNOWN) + + # Add a node back and check it resumes the quorum status + avapeer = AvaP2PInterface(node) + node.add_p2p_connection(avapeer) + wait_for_proof(node, f"{avapeer.proof.proofid:0{64}x}") + poll_and_assert_response(node, AvalancheVoteError.ACCEPTED) + if __name__ == '__main__': AvalancheQuorumTest().main() diff --git a/test/functional/abc_p2p_avalanche_voting.py b/test/functional/abc_p2p_avalanche_voting.py index 1753e2c55..188c1753c 100755 --- a/test/functional/abc_p2p_avalanche_voting.py +++ b/test/functional/abc_p2p_avalanche_voting.py @@ -1,235 +1,236 @@ #!/usr/bin/env python3 # Copyright (c) 2020-2021 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """Test the resolution of forks via avalanche.""" import random from test_framework.avatools import ( create_coinbase_stakes, get_ava_p2p_interface, ) from test_framework.key import ECKey, ECPubKey from test_framework.messages import AvalancheVote, AvalancheVoteError from test_framework.test_framework import BitcoinTestFramework from test_framework.util import assert_equal from test_framework.wallet_util import bytes_to_wif QUORUM_NODE_COUNT = 16 class AvalancheTest(BitcoinTestFramework): def set_test_params(self): self.setup_clean_chain = True self.num_nodes = 2 self.extra_args = [ ['-enableavalanche=1', '-avaproofstakeutxoconfirmations=1', '-avacooldown=0'], ['-enableavalanche=1', '-avaproofstakeutxoconfirmations=1', '-avacooldown=0', '-noparkdeepreorg', '-maxreorgdepth=-1']] self.supports_cli = False self.rpc_timeout = 120 def run_test(self): node = self.nodes[0] # Build a fake quorum of nodes. def get_quorum(): return [get_ava_p2p_interface(node) for _ in range(0, QUORUM_NODE_COUNT)] # Pick on node from the quorum for polling. quorum = get_quorum() poll_node = quorum[0] # Generate many block and poll for them. addrkey0 = node.get_deterministic_priv_key() blockhashes = node.generatetoaddress(100, addrkey0.address) # Use the first coinbase to create a stake stakes = create_coinbase_stakes(node, [blockhashes[0]], addrkey0.key) + # duplicate the deterministic sig test from src/test/key_tests.cpp + privkey = ECKey() + privkey.set(bytes.fromhex( + "12b004fff7f4b69ef8650e767f18f11ede158148b425660723b9f9a66e61f747"), True) + + proof_sequence = 11 + proof_expiration = 12 + proof = node.buildavalancheproof( + proof_sequence, proof_expiration, bytes_to_wif( + privkey.get_bytes()), + stakes) + + # Activate the quorum. + for n in quorum: + success = node.addavalanchenode( + n.nodeid, privkey.get_pubkey().get_bytes().hex(), proof) + assert success is True + fork_node = self.nodes[1] # Make sure the fork node has synced the blocks self.sync_blocks([node, fork_node]) # Get the key so we can verify signatures. avakey = ECPubKey() avakey.set(bytes.fromhex(node.getavalanchekey())) self.log.info("Poll for the chain tip...") best_block_hash = int(node.getbestblockhash(), 16) poll_node.send_poll([best_block_hash]) def assert_response(expected): response = poll_node.wait_for_avaresponse() r = response.response assert_equal(r.cooldown, 0) # Verify signature. assert avakey.verify_schnorr(response.sig, r.get_hash()) votes = r.votes assert_equal(len(votes), len(expected)) for i in range(0, len(votes)): assert_equal(repr(votes[i]), repr(expected[i])) assert_response( [AvalancheVote(AvalancheVoteError.ACCEPTED, best_block_hash)]) self.log.info("Poll for a selection of blocks...") various_block_hashes = [ int(node.getblockhash(0), 16), int(node.getblockhash(1), 16), int(node.getblockhash(10), 16), int(node.getblockhash(25), 16), int(node.getblockhash(42), 16), int(node.getblockhash(96), 16), int(node.getblockhash(99), 16), int(node.getblockhash(100), 16), ] poll_node.send_poll(various_block_hashes) assert_response([AvalancheVote(AvalancheVoteError.ACCEPTED, h) for h in various_block_hashes]) self.log.info( "Poll for a selection of blocks, but some are now invalid...") invalidated_block = node.getblockhash(76) node.invalidateblock(invalidated_block) # We need to send the coin to a new address in order to make sure we do # not regenerate the same block. node.generatetoaddress( 26, 'ecregtest:pqv2r67sgz3qumufap3h2uuj0zfmnzuv8v38gtrh5v') node.reconsiderblock(invalidated_block) poll_node.send_poll(various_block_hashes) assert_response([AvalancheVote(AvalancheVoteError.ACCEPTED, h) for h in various_block_hashes[:5]] + [AvalancheVote(AvalancheVoteError.FORK, h) for h in various_block_hashes[-3:]]) self.log.info("Poll for unknown blocks...") various_block_hashes = [ int(node.getblockhash(0), 16), int(node.getblockhash(25), 16), int(node.getblockhash(42), 16), various_block_hashes[5], various_block_hashes[6], various_block_hashes[7], random.randrange(1 << 255, (1 << 256) - 1), random.randrange(1 << 255, (1 << 256) - 1), random.randrange(1 << 255, (1 << 256) - 1), ] poll_node.send_poll(various_block_hashes) assert_response([AvalancheVote(AvalancheVoteError.ACCEPTED, h) for h in various_block_hashes[:3]] + [AvalancheVote(AvalancheVoteError.FORK, h) for h in various_block_hashes[3:6]] + [AvalancheVote(AvalancheVoteError.UNKNOWN, h) for h in various_block_hashes[-3:]]) self.log.info("Trigger polling from the node...") - # duplicate the deterministic sig test from src/test/key_tests.cpp - privkey = ECKey() - privkey.set(bytes.fromhex( - "12b004fff7f4b69ef8650e767f18f11ede158148b425660723b9f9a66e61f747"), True) - - proof_sequence = 11 - proof_expiration = 12 - proof = node.buildavalancheproof( - proof_sequence, proof_expiration, bytes_to_wif( - privkey.get_bytes()), - stakes) - - # Activate the quorum. - for n in quorum: - success = node.addavalanchenode( - n.nodeid, privkey.get_pubkey().get_bytes().hex(), proof) - assert success is True def can_find_block_in_poll(hash, resp=AvalancheVoteError.ACCEPTED): found_hash = False for n in quorum: poll = n.get_avapoll_if_available() # That node has not received a poll if poll is None: continue # We got a poll, check for the hash and repond votes = [] for inv in poll.invs: # Vote yes to everything r = AvalancheVoteError.ACCEPTED # Look for what we expect if inv.hash == hash: r = resp found_hash = True votes.append(AvalancheVote(r, inv.hash)) n.send_avaresponse(poll.round, votes, privkey) return found_hash # Now that we have a peer, we should start polling for the tip. hash_tip = int(node.getbestblockhash(), 16) self.wait_until(lambda: can_find_block_in_poll(hash_tip), timeout=5) # Make sure the fork node has synced the blocks self.sync_blocks([node, fork_node]) # Create a fork 2 blocks deep. This should trigger polling. fork_node.invalidateblock(fork_node.getblockhash(100)) fork_address = fork_node.get_deterministic_priv_key().address fork_node.generatetoaddress(2, fork_address) # Because the new tip is a deep reorg, the node will not accept it # right away, but poll for it. def parked_block(blockhash): for tip in node.getchaintips(): if tip["hash"] == blockhash: assert tip["status"] != "active" return tip["status"] == "parked" return False fork_tip = fork_node.getbestblockhash() self.wait_until(lambda: parked_block(fork_tip)) self.log.info("Answer all polls to finalize...") hash_to_find = int(fork_tip, 16) def has_accepted_new_tip(): can_find_block_in_poll(hash_to_find) return node.getbestblockhash() == fork_tip # Because everybody answers yes, the node will accept that block. self.wait_until(has_accepted_new_tip, timeout=15) assert_equal(node.getbestblockhash(), fork_tip) self.log.info("Answer all polls to park...") node.generate(1) tip_to_park = node.getbestblockhash() hash_to_find = int(tip_to_park, 16) assert(tip_to_park != fork_tip) def has_parked_new_tip(): can_find_block_in_poll(hash_to_find, AvalancheVoteError.PARKED) return node.getbestblockhash() == fork_tip # Because everybody answers no, the node will park that block. with node.assert_debug_log([f"Avalanche invalidated block {hash_to_find:0{64}x}"]): self.wait_until(has_parked_new_tip, timeout=15) assert_equal(node.getbestblockhash(), fork_tip) self.log.info( "Check the node is discouraging unexpected avaresponses.") with node.assert_debug_log( ['Misbehaving', 'peer=1 (0 -> 2): unexpected-ava-response']): # unknown voting round poll_node.send_avaresponse( round=2**32 - 1, votes=[], privkey=privkey) if __name__ == '__main__': AvalancheTest().main() diff --git a/test/functional/abc_p2p_compactproofs.py b/test/functional/abc_p2p_compactproofs.py index c936e231b..e1c89649e 100644 --- a/test/functional/abc_p2p_compactproofs.py +++ b/test/functional/abc_p2p_compactproofs.py @@ -1,685 +1,682 @@ #!/usr/bin/env python3 # Copyright (c) 2022 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """ Test proof inventory relaying """ import random import time from test_framework.avatools import ( AvaP2PInterface, NoHandshakeAvaP2PInterface, build_msg_avaproofs, gen_proof, get_ava_p2p_interface, get_proof_ids, wait_for_proof, ) from test_framework.messages import ( NODE_AVALANCHE, NODE_NETWORK, AvalanchePrefilledProof, calculate_shortid, msg_avaproofsreq, msg_getavaproofs, ) from test_framework.p2p import P2PInterface, p2p_lock from test_framework.test_framework import BitcoinTestFramework from test_framework.util import MAX_NODES, assert_equal, p2p_port # Timeout after which the proofs can be cleaned up AVALANCHE_AVAPROOFS_TIMEOUT = 2 * 60 # Max interval between 2 periodic networking processing AVALANCHE_MAX_PERIODIC_NETWORKING_INTERVAL = 5 * 60 class ProofStoreP2PInterface(AvaP2PInterface): def __init__(self): self.proofs = [] super().__init__() def on_avaproof(self, message): self.proofs.append(message.proof) def get_proofs(self): with p2p_lock: return self.proofs class CompactProofsTest(BitcoinTestFramework): def set_test_params(self): self.num_nodes = 2 self.extra_args = [[ '-enableavalanche=1', '-avaproofstakeutxoconfirmations=1', '-avacooldown=0', + '-enableavalanchepeerdiscovery=1', ]] * self.num_nodes def setup_network(self): # Don't connect the nodes self.setup_nodes() @staticmethod def received_avaproofs(peer): with p2p_lock: return peer.last_message.get("avaproofs") def test_send_outbound_getavaproofs(self): self.log.info( "Check we send a getavaproofs message to our avalanche outbound peers") node = self.nodes[0] p2p_idx = 0 non_avapeers = [] for _ in range(4): peer = P2PInterface() node.add_outbound_p2p_connection( peer, p2p_idx=p2p_idx, connection_type="outbound-full-relay", services=NODE_NETWORK, ) non_avapeers.append(peer) p2p_idx += 1 inbound_avapeers = [ node.add_p2p_connection( NoHandshakeAvaP2PInterface()) for _ in range(4)] outbound_avapeers = [] # With a proof and the service bit set for _ in range(4): peer = AvaP2PInterface(node) node.add_outbound_p2p_connection( peer, p2p_idx=p2p_idx, connection_type="avalanche", services=NODE_NETWORK | NODE_AVALANCHE, ) outbound_avapeers.append(peer) p2p_idx += 1 # Without a proof and no service bit set for _ in range(4): peer = AvaP2PInterface() node.add_outbound_p2p_connection( peer, p2p_idx=p2p_idx, connection_type="outbound-full-relay", services=NODE_NETWORK, ) outbound_avapeers.append(peer) p2p_idx += 1 def all_peers_received_getavaproofs(): with p2p_lock: return all([p.last_message.get("getavaproofs") for p in outbound_avapeers]) self.wait_until(all_peers_received_getavaproofs) with p2p_lock: assert all([p.message_count.get( "getavaproofs", 0) >= 1 for p in outbound_avapeers]) assert all([p.message_count.get( "getavaproofs", 0) == 0 for p in non_avapeers]) assert all([p.message_count.get( "getavaproofs", 0) == 0 for p in inbound_avapeers]) self.log.info( "Check we send periodic getavaproofs message to some of our peers") def count_outbounds_getavaproofs(): with p2p_lock: return sum([p.message_count.get("getavaproofs", 0) for p in outbound_avapeers]) outbounds_getavaproofs = count_outbounds_getavaproofs() for i in range(12): node.mockscheduler(AVALANCHE_MAX_PERIODIC_NETWORKING_INTERVAL) self.wait_until(lambda: count_outbounds_getavaproofs() == outbounds_getavaproofs + 3) outbounds_getavaproofs += 3 with p2p_lock: assert all([p.message_count.get( "getavaproofs", 0) == 0 for p in non_avapeers]) assert all([p.message_count.get( "getavaproofs", 0) == 0 for p in inbound_avapeers]) self.log.info( "After the first avaproofs has been received, all the peers are requested periodically") responding_outbound_avapeer = AvaP2PInterface(node) node.add_outbound_p2p_connection( responding_outbound_avapeer, p2p_idx=p2p_idx, connection_type="avalanche", services=NODE_NETWORK | NODE_AVALANCHE, ) p2p_idx += 1 responding_outbound_avapeer_id = node.getpeerinfo()[-1]['id'] outbound_avapeers.append(responding_outbound_avapeer) self.wait_until(all_peers_received_getavaproofs) # Register as an avalanche node for the avaproofs message to be counted key, proof = gen_proof(node) assert node.addavalanchenode( responding_outbound_avapeer_id, key.get_pubkey().get_bytes().hex(), proof.serialize().hex()) # Send the avaproofs message avaproofs = build_msg_avaproofs([proof]) responding_outbound_avapeer.send_and_ping(avaproofs) # Now the node will request from all its peers at each time period outbounds_getavaproofs = count_outbounds_getavaproofs() num_outbound_avapeers = len(outbound_avapeers) for i in range(12): node.mockscheduler(AVALANCHE_MAX_PERIODIC_NETWORKING_INTERVAL) self.wait_until(lambda: count_outbounds_getavaproofs() == outbounds_getavaproofs + num_outbound_avapeers) outbounds_getavaproofs += num_outbound_avapeers for p in outbound_avapeers: with node.assert_debug_log(["received: avaproofs"], ["Ignoring unsollicited avaproofs"]): p.send_message(build_msg_avaproofs([])) with p2p_lock: assert all([p.message_count.get( "getavaproofs", 0) == 0 for p in non_avapeers]) assert all([p.message_count.get( "getavaproofs", 0) == 0 for p in inbound_avapeers]) def test_send_manual_getavaproofs(self): self.log.info( "Check we send a getavaproofs message to our manually connected peers that support avalanche") node = self.nodes[0] # Get rid of previously connected nodes node.disconnect_p2ps() def added_node_connected(ip_port): added_node_info = node.getaddednodeinfo(ip_port) return len( added_node_info) == 1 and added_node_info[0]['connected'] def connect_callback(address, port): self.log.debug("Connecting to {}:{}".format(address, port)) p = AvaP2PInterface(node) p2p_idx = 1 p.peer_accept_connection( connect_cb=connect_callback, connect_id=p2p_idx, net=node.chain, timeout_factor=node.timeout_factor, services=NODE_NETWORK | NODE_AVALANCHE, )() ip_port = f"127.0.01:{p2p_port(MAX_NODES - p2p_idx)}" node.addnode(node=ip_port, command="add") self.wait_until(lambda: added_node_connected(ip_port)) assert_equal(node.getpeerinfo()[-1]['addr'], ip_port) assert_equal(node.getpeerinfo()[-1]['connection_type'], 'manual') p.wait_until(lambda: p.last_message.get("getavaproofs")) def test_respond_getavaproofs(self): self.log.info("Check the node responds to getavaproofs messages") node = self.nodes[0] def send_getavaproof_check_shortid_len(peer, expected_len): peer.send_message(msg_getavaproofs()) self.wait_until(lambda: self.received_avaproofs(peer)) avaproofs = self.received_avaproofs(peer) assert_equal(len(avaproofs.shortids), expected_len) # Initially the node has 0 peer self.restart_node(0) assert_equal(len(get_proof_ids(node)), 0) peer = node.add_p2p_connection(NoHandshakeAvaP2PInterface()) send_getavaproof_check_shortid_len(peer, 0) # Add some proofs sending_peer = node.add_p2p_connection(NoHandshakeAvaP2PInterface()) for _ in range(50): _, proof = gen_proof(node) sending_peer.send_avaproof(proof) wait_for_proof(node, f"{proof.proofid:0{64}x}") proofids = get_proof_ids(node) assert_equal(len(proofids), 50) receiving_peer = node.add_p2p_connection(NoHandshakeAvaP2PInterface()) send_getavaproof_check_shortid_len(receiving_peer, len(proofids)) avaproofs = self.received_avaproofs(receiving_peer) expected_shortids = [ calculate_shortid( avaproofs.key0, avaproofs.key1, proofid) for proofid in sorted(proofids)] assert_equal(expected_shortids, avaproofs.shortids) # Don't expect any prefilled proof for now assert_equal(len(avaproofs.prefilled_proofs), 0) def test_request_missing_proofs(self): self.log.info( "Check the node requests the missing proofs after receiving an avaproofs message") node = self.nodes[0] self.restart_node(0) key0 = random.randint(0, 2**64 - 1) key1 = random.randint(0, 2**64 - 1) proofs = [gen_proof(node)[1] for _ in range(10)] # Build a map from proofid to shortid. Use sorted proofids so we don't # have the same indices than the `proofs` list. proofids = [p.proofid for p in proofs] shortid_map = {} for proofid in sorted(proofids): shortid_map[proofid] = calculate_shortid(key0, key1, proofid) self.log.info("The node ignores unsollicited avaproofs") spam_peer = get_ava_p2p_interface(node) msg = build_msg_avaproofs( proofs, prefilled_proofs=[], key_pair=[ key0, key1]) with node.assert_debug_log(["Ignoring unsollicited avaproofs"]): spam_peer.send_message(msg) def received_avaproofsreq(peer): with p2p_lock: return peer.last_message.get("avaproofsreq") p2p_idx = 0 def add_avalanche_p2p_outbound(): nonlocal p2p_idx peer = AvaP2PInterface(node) node.add_outbound_p2p_connection( peer, p2p_idx=p2p_idx, connection_type="avalanche", services=NODE_NETWORK | NODE_AVALANCHE, ) p2p_idx += 1 peer.wait_until(lambda: peer.last_message.get("getavaproofs")) return peer def expect_indices(shortids, expected_indices, prefilled_proofs=None): nonlocal p2p_idx msg = build_msg_avaproofs( [], prefilled_proofs=prefilled_proofs, key_pair=[ key0, key1]) msg.shortids = shortids peer = add_avalanche_p2p_outbound() peer.send_message(msg) self.wait_until(lambda: received_avaproofsreq(peer)) avaproofsreq = received_avaproofsreq(peer) assert_equal(avaproofsreq.indices, expected_indices) self.log.info("Check no proof is requested if there is no shortid") msg = build_msg_avaproofs([]) sender = add_avalanche_p2p_outbound() with node.assert_debug_log(["Got an avaproofs message with no shortid"]): sender.send_message(msg) # Make sure we don't get an avaproofsreq message sender.sync_send_with_ping() with p2p_lock: assert_equal(sender.message_count.get("avaproofsreq", 0), 0) self.log.info( "Check the node requests all the proofs if it known none") expect_indices( list(shortid_map.values()), [i for i in range(len(shortid_map))] ) self.log.info( "Check the node requests only the missing proofs") known_proofids = [] for proof in proofs[:5]: node.sendavalancheproof(proof.serialize().hex()) known_proofids.append(proof.proofid) expected_indices = [i for i, proofid in enumerate( shortid_map) if proofid not in known_proofids] expect_indices(list(shortid_map.values()), expected_indices) self.log.info( "Check the node don't request prefilled proofs") # Get the indices for a couple of proofs indice_proof5 = list(shortid_map.keys()).index(proofids[5]) indice_proof6 = list(shortid_map.keys()).index(proofids[6]) prefilled_proofs = [ AvalanchePrefilledProof(indice_proof5, proofs[5]), AvalanchePrefilledProof(indice_proof6, proofs[6]), ] prefilled_proofs = sorted( prefilled_proofs, key=lambda prefilled_proof: prefilled_proof.index) remaining_shortids = [shortid for proofid, shortid in shortid_map.items( ) if proofid not in proofids[5:7]] known_proofids.extend(proofids[5:7]) expected_indices = [i for i, proofid in enumerate( shortid_map) if proofid not in known_proofids] expect_indices( remaining_shortids, expected_indices, prefilled_proofs=prefilled_proofs) self.log.info( "Check the node requests no proof if it knows all of them") for proof in proofs[5:]: node.sendavalancheproof(proof.serialize().hex()) known_proofids.append(proof.proofid) expect_indices(list(shortid_map.values()), []) self.log.info("Check out of bounds index") bad_peer = add_avalanche_p2p_outbound() msg = build_msg_avaproofs([], prefilled_proofs=[ AvalanchePrefilledProof( len(shortid_map) + 1, gen_proof(node)[1])], key_pair=[key0, key1]) msg.shortids = list(shortid_map.values()) with node.assert_debug_log(["Misbehaving", "avaproofs-bad-indexes"]): bad_peer.send_message(msg) bad_peer.wait_for_disconnect() self.log.info("An invalid prefilled proof will trigger a ban") _, no_stake = gen_proof(node) no_stake.stakes = [] bad_peer = add_avalanche_p2p_outbound() msg = build_msg_avaproofs([], prefilled_proofs=[ AvalanchePrefilledProof(len(shortid_map), no_stake), ], key_pair=[key0, key1]) msg.shortids = list(shortid_map.values()) with node.assert_debug_log(["Misbehaving", "invalid-proof"]): bad_peer.send_message(msg) bad_peer.wait_for_disconnect() def test_send_missing_proofs(self): self.log.info("Check the node respond to missing proofs requests") node = self.nodes[0] self.restart_node(0) numof_proof = 10 proofs = [gen_proof(node)[1] for _ in range(numof_proof)] for proof in proofs: node.sendavalancheproof(proof.serialize().hex()) proofids = get_proof_ids(node) assert all(proof.proofid in proofids for proof in proofs) self.log.info("Unsollicited requests are ignored") peer = node.add_p2p_connection(ProofStoreP2PInterface()) peer.send_and_ping(msg_avaproofsreq()) assert_equal(len(peer.get_proofs()), 0) def request_proofs(peer): peer.send_message(msg_getavaproofs()) self.wait_until(lambda: self.received_avaproofs(peer)) avaproofs = self.received_avaproofs(peer) assert_equal(len(avaproofs.shortids), numof_proof) return avaproofs _ = request_proofs(peer) self.log.info("Sending an empty request has no effect") peer.send_and_ping(msg_avaproofsreq()) assert_equal(len(peer.get_proofs()), 0) self.log.info("Check the requested proofs are sent by the node") def check_received_proofs(indices): requester = node.add_p2p_connection(ProofStoreP2PInterface()) avaproofs = request_proofs(requester) req = msg_avaproofsreq() req.indices = indices requester.send_message(req) # Check we got the expected number of proofs self.wait_until( lambda: len( requester.get_proofs()) == len(indices)) # Check we got the expected proofs received_shortids = [ calculate_shortid( avaproofs.key0, avaproofs.key1, proof.proofid) for proof in requester.get_proofs()] assert_equal(set(received_shortids), set([avaproofs.shortids[i] for i in indices])) # Only the first proof check_received_proofs([0]) # Only the last proof check_received_proofs([numof_proof - 1]) # Half first check_received_proofs(range(0, numof_proof // 2)) # Half last check_received_proofs(range(numof_proof // 2, numof_proof)) # Even check_received_proofs([i for i in range(numof_proof) if i % 2 == 0]) # Odds check_received_proofs([i for i in range(numof_proof) if i % 2 == 1]) # All check_received_proofs(range(numof_proof)) self.log.info( "Check the node will not send the proofs if not requested before the timeout elapsed") # Disconnect the peers for peer in node.p2ps: peer.peer_disconnect() peer.wait_for_disconnect() mocktime = int(time.time()) node.setmocktime(mocktime) slow_peer = ProofStoreP2PInterface() node.add_outbound_p2p_connection( slow_peer, p2p_idx=0, connection_type="avalanche", services=NODE_NETWORK | NODE_AVALANCHE, ) slow_peer.wait_until( lambda: slow_peer.last_message.get("getavaproofs")) slow_peer.nodeid = node.getpeerinfo()[-1]['id'] _ = request_proofs(slow_peer) # Elapse the timeout mocktime += AVALANCHE_AVAPROOFS_TIMEOUT + 1 node.setmocktime(mocktime) node.mockscheduler(AVALANCHE_MAX_PERIODIC_NETWORKING_INTERVAL) # Periodic compact proofs requests are sent in the same loop than the # cleanup, so when such a request is made we are sure the cleanup did # happen. slow_peer.wait_until( lambda: slow_peer.message_count.get("getavaproofs") > 1) req = msg_avaproofsreq() req.indices = range(numof_proof) slow_peer.send_and_ping(req) # Check we get no proof assert_equal(len(slow_peer.get_proofs()), 0) def test_compact_proofs_download_on_connect(self): self.log.info( "Check the node get compact proofs upon avalanche outbound discovery") requestee = self.nodes[0] requester = self.nodes[1] self.restart_node(0) numof_proof = 10 proofs = [gen_proof(requestee)[1] for _ in range(numof_proof)] for proof in proofs: requestee.sendavalancheproof(proof.serialize().hex()) proofids = get_proof_ids(requestee) assert all(proof.proofid in proofids for proof in proofs) # Start the requester and check it gets all the proofs self.start_node(1) self.connect_nodes(0, 1) self.wait_until( lambda: all( proof.proofid in proofids for proof in get_proof_ids(requester))) def test_no_compactproofs_during_ibs(self): self.log.info( "Check the node don't request compact proofs during IBD") node = self.nodes[0] chainwork = int(node.getblockchaininfo()['chainwork'], 16) self.restart_node( 0, extra_args=self.extra_args[0] + [f'-minimumchainwork={chainwork + 2:#x}']) assert node.getblockchaininfo()['initialblockdownload'] peer = P2PInterface() node.add_outbound_p2p_connection( peer, p2p_idx=0, connection_type="avalanche", services=NODE_NETWORK | NODE_AVALANCHE, ) # Force the node to process the sending loop peer.sync_send_with_ping() with p2p_lock: assert_equal(peer.message_count.get("getavaproofs", 0), 0) # Make sure there is no message sent as part as the periodic network # messaging either node.mockscheduler(AVALANCHE_MAX_PERIODIC_NETWORKING_INTERVAL) peer.sync_send_with_ping() with p2p_lock: assert_equal(peer.message_count.get("getavaproofs", 0), 0) def test_send_inbound_getavaproofs_until_quorum_is_established(self): self.log.info( "Check we also request the inbounds until the quorum is established") node = self.nodes[0] self.restart_node( 0, extra_args=self.extra_args[0] + ['-avaminquorumstake=1000000']) assert_equal(node.getavalancheinfo()['active'], False) outbound = AvaP2PInterface() node.add_outbound_p2p_connection(outbound, p2p_idx=0) inbound = AvaP2PInterface() node.add_p2p_connection(inbound) inbound.nodeid = node.getpeerinfo()[-1]['id'] def count_getavaproofs(peers): with p2p_lock: return sum([peer.message_count.get("getavaproofs", 0) for peer in peers]) # Upon connection only the outbound gets a compact proofs message assert_equal(count_getavaproofs([inbound]), 0) self.wait_until(lambda: count_getavaproofs([outbound]) == 1) # Periodic send will include the inbound as well current_total = count_getavaproofs([inbound, outbound]) while count_getavaproofs([inbound]) == 0: node.mockscheduler(AVALANCHE_MAX_PERIODIC_NETWORKING_INTERVAL) self.wait_until(lambda: count_getavaproofs( [inbound, outbound]) > current_total) current_total = count_getavaproofs([inbound, outbound]) - # Connect the minimum amount of stake - privkey, proof = gen_proof(node) - assert node.addavalanchenode( - inbound.nodeid, - privkey.get_pubkey().get_bytes().hex(), - proof.serialize().hex()) - - assert_equal(node.getavalancheinfo()['active'], True) + # Connect the minimum amount of stake and nodes + for _ in range(8): + node.add_p2p_connection(AvaP2PInterface(node)) + self.wait_until(lambda: node.getavalancheinfo()['active'] is True) # From now only the outbound is requested count_inbound = count_getavaproofs([inbound]) for _ in range(20): node.mockscheduler(AVALANCHE_MAX_PERIODIC_NETWORKING_INTERVAL) self.wait_until(lambda: count_getavaproofs( [inbound, outbound]) > current_total) current_total = count_getavaproofs([inbound, outbound]) assert_equal(count_getavaproofs([inbound]), count_inbound) def run_test(self): # Most if the tests only need a single node, let the other ones start # the node when required self.stop_node(1) self.test_send_outbound_getavaproofs() self.test_send_manual_getavaproofs() self.test_respond_getavaproofs() self.test_request_missing_proofs() self.test_send_missing_proofs() self.test_compact_proofs_download_on_connect() self.test_no_compactproofs_during_ibs() self.test_send_inbound_getavaproofs_until_quorum_is_established() if __name__ == '__main__': CompactProofsTest().main() diff --git a/test/functional/abc_p2p_getavaaddr.py b/test/functional/abc_p2p_getavaaddr.py index a5d81ab4f..831d88a11 100755 --- a/test/functional/abc_p2p_getavaaddr.py +++ b/test/functional/abc_p2p_getavaaddr.py @@ -1,444 +1,440 @@ #!/usr/bin/env python3 # Copyright (c) 2022 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """Test getavaaddr p2p message""" import time from decimal import Decimal from test_framework.avatools import AvaP2PInterface, gen_proof from test_framework.messages import ( NODE_AVALANCHE, NODE_NETWORK, AvalancheVote, AvalancheVoteError, msg_getavaaddr, ) from test_framework.p2p import P2PInterface, p2p_lock from test_framework.test_framework import BitcoinTestFramework from test_framework.util import MAX_NODES, assert_equal, p2p_port # getavaaddr time interval in seconds, as defined in net_processing.cpp # A node will ignore repeated getavaaddr during this interval GETAVAADDR_INTERVAL = 2 * 60 # Address are sent every 30s on average, with a Poisson filter. Use a large # enough delay so it's very unlikely we don't get the message within this time. MAX_ADDR_SEND_DELAY = 5 * 60 # The interval between avalanche statistics computation AVALANCHE_STATISTICS_INTERVAL = 10 * 60 # The getavaaddr messages are sent every 2 to 5 minutes MAX_GETAVAADDR_DELAY = 5 * 60 class AddrReceiver(P2PInterface): def __init__(self): super().__init__() self.received_addrs = None def get_received_addrs(self): with p2p_lock: return self.received_addrs def on_addr(self, message): self.received_addrs = [] for addr in message.addrs: self.received_addrs.append(f"{addr.ip}:{addr.port}") def addr_received(self): return self.received_addrs is not None class MutedAvaP2PInterface(AvaP2PInterface): def __init__(self, node=None): super().__init__(node) self.is_responding = False self.privkey = None self.addr = None self.poll_received = 0 def set_addr(self, addr): self.addr = addr def on_avapoll(self, message): self.poll_received += 1 class AllYesAvaP2PInterface(MutedAvaP2PInterface): def __init__(self, node=None): super().__init__(node) self.is_responding = True def on_avapoll(self, message): self.send_avaresponse( message.poll.round, [ AvalancheVote( AvalancheVoteError.ACCEPTED, inv.hash) for inv in message.poll.invs], self.master_privkey if self.delegation is None else self.delegated_privkey) super().on_avapoll(message) class AvaAddrTest(BitcoinTestFramework): def set_test_params(self): self.setup_clean_chain = False self.num_nodes = 1 self.extra_args = [['-enableavalanche=1', '-enableavalanchepeerdiscovery=1', '-avaproofstakeutxoconfirmations=1', '-avacooldown=0', '-whitelist=noban@127.0.0.1']] def check_all_peers_received_getavaaddr_once(self, avapeers): def received_all_getavaaddr(avapeers): with p2p_lock: return all([p.last_message.get("getavaaddr") for p in avapeers]) self.wait_until(lambda: received_all_getavaaddr(avapeers)) with p2p_lock: assert all([p.message_count.get( "getavaaddr", 0) == 1 for p in avapeers]) def getavaaddr_interval_test(self): node = self.nodes[0] # Init mock time mock_time = int(time.time()) node.setmocktime(mock_time) # Add some avalanche peers to the node for _ in range(10): node.add_p2p_connection(AllYesAvaP2PInterface(node)) # Build some statistics to ensure some addresses will be returned def all_peers_received_poll(): with p2p_lock: return all([avanode.poll_received > 0 for avanode in node.p2ps]) self.wait_until(all_peers_received_poll) node.mockscheduler(AVALANCHE_STATISTICS_INTERVAL) requester = node.add_p2p_connection(AddrReceiver()) requester.send_message(msg_getavaaddr()) # Remember the time we sent the getavaaddr message getavaddr_time = mock_time # Spamming more get getavaaddr has no effect for _ in range(10): with node.assert_debug_log(["Ignoring repeated getavaaddr from peer"]): requester.send_message(msg_getavaaddr()) # Move the time so we get an addr response mock_time += MAX_ADDR_SEND_DELAY node.setmocktime(mock_time) requester.wait_until(requester.addr_received) # Elapse the getavaaddr interval and check our message is now accepted # again mock_time = getavaddr_time + GETAVAADDR_INTERVAL node.setmocktime(mock_time) requester.send_message(msg_getavaaddr()) # We can get an addr message again mock_time += MAX_ADDR_SEND_DELAY node.setmocktime(mock_time) requester.wait_until(requester.addr_received) def address_test(self, maxaddrtosend, num_proof, num_avanode): self.restart_node( 0, extra_args=self.extra_args[0] + [f'-maxaddrtosend={maxaddrtosend}']) node = self.nodes[0] # Init mock time mock_time = int(time.time()) node.setmocktime(mock_time) # Create a bunch of proofs and associate each a bunch of nodes. avanodes = [] for _ in range(num_proof): master_privkey, proof = gen_proof(node) for n in range(num_avanode): avanode = AllYesAvaP2PInterface() if n % 2 else MutedAvaP2PInterface() avanode.master_privkey = master_privkey avanode.proof = proof node.add_p2p_connection(avanode) peerinfo = node.getpeerinfo()[-1] avanode.set_addr(peerinfo["addr"]) avanodes.append(avanode) responding_addresses = [ avanode.addr for avanode in avanodes if avanode.is_responding] assert_equal(len(responding_addresses), num_proof * num_avanode // 2) # Check we have what we expect def all_nodes_connected(): avapeers = node.getavalanchepeerinfo() if len(avapeers) != num_proof: return False for avapeer in avapeers: if avapeer['nodecount'] != num_avanode: return False return True self.wait_until(all_nodes_connected) # Force the availability score to diverge between the responding and the # muted nodes. def poll_all_for_block(): node.generate(1) with p2p_lock: return all([avanode.poll_received > ( 10 if avanode.is_responding else 0) for avanode in avanodes]) self.wait_until(poll_all_for_block) # Move the scheduler time 10 minutes forward so that so that our peers # get an availability score computed. node.mockscheduler(AVALANCHE_STATISTICS_INTERVAL) requester = node.add_p2p_connection(AddrReceiver()) requester.send_and_ping(msg_getavaaddr()) # Sanity check that the availability score is set up as expected peerinfo = node.getpeerinfo() muted_addresses = [ avanode.addr for avanode in avanodes if not avanode.is_responding] assert all([p['availability_score'] < 0 for p in peerinfo if p["addr"] in muted_addresses]) assert all([p['availability_score'] > 0 for p in peerinfo if p["addr"] in responding_addresses]) # Requester has no availability_score because it's not an avalanche # peer assert 'availability_score' not in peerinfo[-1].keys() mock_time += MAX_ADDR_SEND_DELAY node.setmocktime(mock_time) requester.wait_until(requester.addr_received) addresses = requester.get_received_addrs() assert_equal(len(addresses), min(maxaddrtosend, len(responding_addresses))) # Check all the addresses belong to responding peer assert all([address in responding_addresses for address in addresses]) def getavaaddr_outbound_test(self): self.log.info( "Check we send a getavaaddr message to our avalanche outbound peers") node = self.nodes[0] # Get rid of previously connected nodes node.disconnect_p2ps() avapeers = [] for i in range(16): avapeer = AvaP2PInterface() node.add_outbound_p2p_connection( avapeer, p2p_idx=i, ) avapeers.append(avapeer) self.check_all_peers_received_getavaaddr_once(avapeers) # Generate some block to poll for node.generate(1) # Because none of the avalanche peers is responding, our node should # fail out of option shortly and send a getavaaddr message to one of its # outbound avalanche peers. node.mockscheduler(MAX_GETAVAADDR_DELAY) def any_peer_received_getavaaddr(): with p2p_lock: return any([p.message_count.get( "getavaaddr", 0) > 1 for p in avapeers]) self.wait_until(any_peer_received_getavaaddr) def getavaaddr_manual_test(self): self.log.info( "Check we send a getavaaddr message to our manually connected peers that support avalanche") node = self.nodes[0] # Get rid of previously connected nodes node.disconnect_p2ps() def added_node_connected(ip_port): added_node_info = node.getaddednodeinfo(ip_port) return len( added_node_info) == 1 and added_node_info[0]['connected'] def connect_callback(address, port): self.log.debug("Connecting to {}:{}".format(address, port)) p = AvaP2PInterface() p2p_idx = 1 p.peer_accept_connection( connect_cb=connect_callback, connect_id=p2p_idx, net=node.chain, timeout_factor=node.timeout_factor, )() ip_port = f"127.0.01:{p2p_port(MAX_NODES - p2p_idx)}" node.addnode(node=ip_port, command="add") self.wait_until(lambda: added_node_connected(ip_port)) assert_equal(node.getpeerinfo()[-1]['addr'], ip_port) assert_equal(node.getpeerinfo()[-1]['connection_type'], 'manual') p.wait_until(lambda: p.last_message.get("getavaaddr")) # Generate some block to poll for node.generate(1) # Because our avalanche peer is not responding, our node should fail # out of option shortly and send another getavaaddr message. node.mockscheduler(MAX_GETAVAADDR_DELAY) p.wait_until(lambda: p.message_count.get("getavaaddr", 0) > 1) def getavaaddr_noquorum(self): self.log.info( "Check we send a getavaaddr message while our quorum is not established") node = self.nodes[0] self.restart_node(0, extra_args=self.extra_args[0] + [ '-avaminquorumstake=1000000000', '-avaminquorumconnectedstakeratio=0.8', ]) avapeers = [] for i in range(16): avapeer = AllYesAvaP2PInterface(node) node.add_outbound_p2p_connection( avapeer, p2p_idx=i, connection_type="avalanche", services=NODE_NETWORK | NODE_AVALANCHE, ) avapeers.append(avapeer) peerinfo = node.getpeerinfo()[-1] avapeer.set_addr(peerinfo["addr"]) self.check_all_peers_received_getavaaddr_once(avapeers) def total_getavaaddr_msg(): with p2p_lock: return sum([p.message_count.get("getavaaddr", 0) for p in avapeers]) # Because we have not enough stake to start polling, we keep requesting # more addresses total_getavaaddr = total_getavaaddr_msg() for i in range(5): node.mockscheduler(MAX_GETAVAADDR_DELAY) self.wait_until(lambda: total_getavaaddr_msg() > total_getavaaddr) total_getavaaddr = total_getavaaddr_msg() # Move the schedulter time forward to make seure we get statistics # computed. But since we did not start polling yet it should remain all # zero. node.mockscheduler(AVALANCHE_STATISTICS_INTERVAL) def wait_for_availability_score(): peerinfo = node.getpeerinfo() return all([p.get('availability_score', None) == Decimal(0) for p in peerinfo]) self.wait_until(wait_for_availability_score) requester = node.add_p2p_connection(AddrReceiver()) requester.send_and_ping(msg_getavaaddr()) node.setmocktime(int(time.time() + MAX_ADDR_SEND_DELAY)) # Check all the peers addresses are returned. requester.wait_until(requester.addr_received) addresses = requester.get_received_addrs() assert_equal(len(addresses), len(avapeers)) expected_addresses = [avapeer.addr for avapeer in avapeers] assert all([address in expected_addresses for address in addresses]) def test_send_inbound_getavaaddr_until_quorum_is_established(self): self.log.info( "Check we also request the inbounds until the quorum is established") node = self.nodes[0] self.restart_node( 0, extra_args=self.extra_args[0] + ['-avaminquorumstake=1000000']) assert_equal(node.getavalancheinfo()['active'], False) outbound = MutedAvaP2PInterface() node.add_outbound_p2p_connection(outbound, p2p_idx=0) inbound = MutedAvaP2PInterface() node.add_p2p_connection(inbound) inbound.nodeid = node.getpeerinfo()[-1]['id'] def count_getavaaddr(peers): with p2p_lock: return sum([peer.message_count.get("getavaaddr", 0) for peer in peers]) # Upon connection only the outbound gets a getavaaddr message assert_equal(count_getavaaddr([inbound]), 0) self.wait_until(lambda: count_getavaaddr([outbound]) == 1) # Periodic send will include the inbound as well current_total = count_getavaaddr([inbound, outbound]) while count_getavaaddr([inbound]) == 0: node.mockscheduler(MAX_GETAVAADDR_DELAY) self.wait_until(lambda: count_getavaaddr( [inbound, outbound]) > current_total) current_total = count_getavaaddr([inbound, outbound]) - # Connect the minimum amount of stake - privkey, proof = gen_proof(node) - assert node.addavalanchenode( - inbound.nodeid, - privkey.get_pubkey().get_bytes().hex(), - proof.serialize().hex()) - - assert_equal(node.getavalancheinfo()['active'], True) + # Connect the minimum amount of stake and nodes + for _ in range(8): + node.add_p2p_connection(AvaP2PInterface(node)) + self.wait_until(lambda: node.getavalancheinfo()['active'] is True) # From now only the outbound is requested count_inbound = count_getavaaddr([inbound]) for _ in range(10): # Trigger a poll node.generate(1) inbound.sync_send_with_ping() node.mockscheduler(MAX_GETAVAADDR_DELAY) self.wait_until(lambda: count_getavaaddr( [inbound, outbound]) > current_total) current_total = count_getavaaddr([inbound, outbound]) assert_equal(count_getavaaddr([inbound]), count_inbound) def run_test(self): self.getavaaddr_interval_test() # Limited by maxaddrtosend self.address_test(maxaddrtosend=3, num_proof=2, num_avanode=8) # Limited by the number of good nodes self.address_test(maxaddrtosend=100, num_proof=2, num_avanode=8) self.getavaaddr_outbound_test() self.getavaaddr_manual_test() self.getavaaddr_noquorum() self.test_send_inbound_getavaaddr_until_quorum_is_established() if __name__ == '__main__': AvaAddrTest().main() diff --git a/test/functional/abc_rpc_getavalancheinfo.py b/test/functional/abc_rpc_getavalancheinfo.py index cff46f921..1cc485719 100755 --- a/test/functional/abc_rpc_getavalancheinfo.py +++ b/test/functional/abc_rpc_getavalancheinfo.py @@ -1,385 +1,383 @@ #!/usr/bin/env python3 # Copyright (c) 2022 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """Test the getavalancheinfo RPC.""" import time from decimal import Decimal from test_framework.address import ADDRESS_ECREG_UNSPENDABLE from test_framework.avatools import ( AvaP2PInterface, avalanche_proof_from_hex, create_coinbase_stakes, gen_proof, get_ava_p2p_interface, wait_for_proof, ) from test_framework.key import ECKey from test_framework.messages import ( AvalancheProofVoteResponse, AvalancheVote, LegacyAvalancheProof, ) from test_framework.test_framework import BitcoinTestFramework from test_framework.util import assert_equal from test_framework.wallet_util import bytes_to_wif class GetAvalancheInfoTest(BitcoinTestFramework): def set_test_params(self): self.num_nodes = 1 self.conflicting_proof_cooldown = 100 self.extra_args = [[ '-enableavalanche=1', '-enableavalancheproofreplacement=1', f'-avalancheconflictingproofcooldown={self.conflicting_proof_cooldown}', '-avaproofstakeutxoconfirmations=2', '-avacooldown=0', '-enableavalanchepeerdiscovery=1', '-avaminquorumstake=250000000', '-avaminquorumconnectedstakeratio=0.9', ]] def run_test(self): node = self.nodes[0] privkey, proof = gen_proof(node) is_legacy = isinstance(proof, LegacyAvalancheProof) # Make the proof mature node.generate(1) def handle_legacy_format(expected): # Add the payout address to the expected output if the legacy format # is diabled if not is_legacy and "local" in expected.keys(): expected["local"]["payout_address"] = ADDRESS_ECREG_UNSPENDABLE return expected def assert_avalancheinfo(expected): assert_equal( node.getavalancheinfo(), handle_legacy_format(expected) ) coinbase_amount = Decimal('25000000.00') self.log.info("The test node has no proof") assert_avalancheinfo({ "active": False, "network": { "proof_count": 0, "connected_proof_count": 0, "dangling_proof_count": 0, "finalized_proof_count": 0, "conflicting_proof_count": 0, "orphan_proof_count": 0, "total_stake_amount": Decimal('0.00'), "connected_stake_amount": Decimal('0.00'), "dangling_stake_amount": Decimal('0.00'), "node_count": 0, "connected_node_count": 0, "pending_node_count": 0, } }) self.log.info("The test node has a proof") self.restart_node(0, self.extra_args[0] + [ '-enableavalanche=1', '-avaproof={}'.format(proof.serialize().hex()), '-avamasterkey={}'.format(bytes_to_wif(privkey.get_bytes())) ]) assert_avalancheinfo({ "active": False, "local": { "live": False, "proofid": f"{proof.proofid:0{64}x}", "limited_proofid": f"{proof.limited_proofid:0{64}x}", "master": privkey.get_pubkey().get_bytes().hex(), "stake_amount": coinbase_amount, }, "network": { "proof_count": 0, "connected_proof_count": 0, "dangling_proof_count": 0, "finalized_proof_count": 0, "conflicting_proof_count": 0, "orphan_proof_count": 0, "total_stake_amount": Decimal('0.00'), "connected_stake_amount": Decimal('0.00'), "dangling_stake_amount": Decimal('0.00'), "node_count": 0, "connected_node_count": 0, "pending_node_count": 0, } }) # Mine a block to trigger proof validation node.generate(1) self.wait_until( lambda: node.getavalancheinfo() == handle_legacy_format({ "active": False, "local": { "live": True, "proofid": f"{proof.proofid:0{64}x}", "limited_proofid": f"{proof.limited_proofid:0{64}x}", "master": privkey.get_pubkey().get_bytes().hex(), "stake_amount": coinbase_amount, }, "network": { "proof_count": 0, "connected_proof_count": 0, "dangling_proof_count": 0, "finalized_proof_count": 0, "conflicting_proof_count": 0, "orphan_proof_count": 0, "total_stake_amount": Decimal('0.00'), "connected_stake_amount": Decimal('0.00'), "dangling_stake_amount": Decimal('0.00'), "node_count": 0, "connected_node_count": 0, "pending_node_count": 0, } }) ) self.log.info("Connect a bunch of peers and nodes") mock_time = int(time.time()) node.setmocktime(mock_time) privkeys = [] proofs = [] conflicting_proofs = [] quorum = [] N = 13 for _ in range(N): _privkey, _proof = gen_proof(node) proofs.append(_proof) privkeys.append(_privkey) # For each proof, also make a conflicting one stakes = create_coinbase_stakes( node, [node.getbestblockhash()], node.get_deterministic_priv_key().key) conflicting_proof_hex = node.buildavalancheproof( 10, 9999, bytes_to_wif(_privkey.get_bytes()), stakes) conflicting_proof = avalanche_proof_from_hex(conflicting_proof_hex) conflicting_proofs.append(conflicting_proof) # Make the proof and its conflicting proof mature node.generate(1) n = AvaP2PInterface() n.proof = _proof n.master_privkey = _privkey node.add_p2p_connection(n) quorum.append(n) n.send_avaproof(_proof) wait_for_proof(node, f"{_proof.proofid:0{64}x}", timeout=10) mock_time += self.conflicting_proof_cooldown node.setmocktime(mock_time) n.send_avaproof(conflicting_proof) # Generate an orphan (immature) proof _, orphan_proof = gen_proof(node) n.send_avaproof(orphan_proof) self.wait_until( lambda: node.getavalancheinfo() == handle_legacy_format({ "active": True, "local": { "live": True, "proofid": f"{proof.proofid:0{64}x}", "limited_proofid": f"{proof.limited_proofid:0{64}x}", "master": privkey.get_pubkey().get_bytes().hex(), "stake_amount": coinbase_amount, }, "network": { "proof_count": N, "connected_proof_count": N, "dangling_proof_count": 0, "finalized_proof_count": 0, "conflicting_proof_count": N, "orphan_proof_count": 1, "total_stake_amount": coinbase_amount * N, "connected_stake_amount": coinbase_amount * N, "dangling_stake_amount": Decimal('0.00'), "node_count": N, "connected_node_count": N, "pending_node_count": 0, } }) ) self.log.info("Disconnect some nodes") D = 3 for _ in range(D): n = node.p2ps.pop() n.peer_disconnect() n.wait_for_disconnect() self.wait_until( lambda: node.getavalancheinfo() == handle_legacy_format({ "active": True, "local": { "live": True, "proofid": f"{proof.proofid:0{64}x}", "limited_proofid": f"{proof.limited_proofid:0{64}x}", "master": privkey.get_pubkey().get_bytes().hex(), "stake_amount": coinbase_amount, }, "network": { "proof_count": N, "connected_proof_count": N - D, "dangling_proof_count": D, "finalized_proof_count": 0, "conflicting_proof_count": N, "orphan_proof_count": 1, "total_stake_amount": coinbase_amount * N, "connected_stake_amount": coinbase_amount * (N - D), "dangling_stake_amount": coinbase_amount * D, "node_count": N - D, "connected_node_count": N - D, "pending_node_count": 0, } }) ) self.log.info("Add some pending nodes") P = 3 for _ in range(P): dg_priv = ECKey() dg_priv.generate() dg_pub = dg_priv.get_pubkey().get_bytes().hex() _privkey, _proof = gen_proof(node) # Make the proof mature node.generate(1) delegation = node.delegateavalancheproof( f"{_proof.limited_proofid:0{64}x}", bytes_to_wif(_privkey.get_bytes()), dg_pub, None ) n = get_ava_p2p_interface(node) n.send_avahello(delegation, dg_priv) # Make sure we completed at least one time the ProcessMessage or we # might miss the last pending node for the following assert n.sync_with_ping() assert_avalancheinfo({ "active": True, "local": { "live": True, "proofid": f"{proof.proofid:0{64}x}", "limited_proofid": f"{proof.limited_proofid:0{64}x}", "master": privkey.get_pubkey().get_bytes().hex(), "stake_amount": coinbase_amount, }, "network": { # Orphan became mature "proof_count": N + 1, "connected_proof_count": N - D, "dangling_proof_count": D + 1, "finalized_proof_count": 0, "conflicting_proof_count": N, "orphan_proof_count": 0, "total_stake_amount": coinbase_amount * (N + 1), "connected_stake_amount": coinbase_amount * (N - D), "dangling_stake_amount": coinbase_amount * (D + 1), "node_count": N - D + P, "connected_node_count": N - D, "pending_node_count": P, } }) self.log.info("Finalize the proofs for some peers") def vote_for_all_proofs(): done_voting = True for i, n in enumerate(quorum): if not n.is_connected: continue poll = n.get_avapoll_if_available() # That node has not received a poll if poll is None: continue # Respond yes to all polls except the conflicting proofs votes = [] for inv in poll.invs: response = AvalancheProofVoteResponse.ACTIVE if inv.hash in [p.proofid for p in conflicting_proofs]: response = AvalancheProofVoteResponse.REJECTED # We need to finish voting on the conflicting proofs to # ensure the count is stable and that no valid proof # was replaced. done_voting = False votes.append(AvalancheVote(response, inv.hash)) # If we voted on one of our proofs, we're probably not done # voting. if inv.hash in [p.proofid for p in proofs]: done_voting = False n.send_avaresponse(poll.round, votes, privkeys[i]) return done_voting # Vote until proofs have finalized expected_logs = [] for p in proofs: expected_logs.append( f"Avalanche finalized proof {p.proofid:0{64}x}") with node.assert_debug_log(expected_logs): self.wait_until(lambda: vote_for_all_proofs()) self.log.info("Disconnect all the nodes") - for n in node.p2ps: - n.peer_disconnect() - n.wait_for_disconnect() + node.disconnect_p2ps() assert_avalancheinfo({ - "active": True, + "active": False, "local": { "live": True, "proofid": f"{proof.proofid:0{64}x}", "limited_proofid": f"{proof.limited_proofid:0{64}x}", "master": privkey.get_pubkey().get_bytes().hex(), "stake_amount": coinbase_amount, }, "network": { "proof_count": N + 1, "connected_proof_count": 0, "dangling_proof_count": N + 1, "finalized_proof_count": N + 1, "conflicting_proof_count": 0, "orphan_proof_count": 0, "total_stake_amount": coinbase_amount * (N + 1), "connected_stake_amount": 0, "dangling_stake_amount": coinbase_amount * (N + 1), "node_count": 0, "connected_node_count": 0, "pending_node_count": 0, } }) if __name__ == '__main__': GetAvalancheInfoTest().main()