diff --git a/src/avalanche/processor.cpp b/src/avalanche/processor.cpp index 49a7a7414..b6be8f8b4 100644 --- a/src/avalanche/processor.cpp +++ b/src/avalanche/processor.cpp @@ -1,1080 +1,1082 @@ // Copyright (c) 2018-2019 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include #include #include #include #include #include #include #include #include // For DecodeSecret #include #include #include #include #include #include #include #include #include #include /** * Run the avalanche event loop every 10ms. */ static constexpr std::chrono::milliseconds AVALANCHE_TIME_STEP{10}; // Unfortunately, the bitcoind codebase is full of global and we are kinda // forced into it here. std::unique_ptr g_avalanche; namespace avalanche { static const uint256 GetVoteItemId(const AnyVoteItem &item) { return std::visit(variant::overloaded{ [](const ProofRef &proof) { uint256 id = proof->getId(); return id; }, [](const CBlockIndex *pindex) { uint256 hash = pindex->GetBlockHash(); return hash; }, [](const CTransactionRef &tx) { uint256 id = tx->GetId(); return id; }, }, item); } static bool VerifyProof(const Amount &stakeUtxoDustThreshold, const Proof &proof, bilingual_str &error) { ProofValidationState proof_state; if (!proof.verify(stakeUtxoDustThreshold, proof_state)) { switch (proof_state.GetResult()) { case ProofValidationResult::NO_STAKE: error = _("The avalanche proof has no stake."); return false; case ProofValidationResult::DUST_THRESHOLD: error = _("The avalanche proof stake is too low."); return false; case ProofValidationResult::DUPLICATE_STAKE: error = _("The avalanche proof has duplicated stake."); return false; case ProofValidationResult::INVALID_STAKE_SIGNATURE: error = _("The avalanche proof has invalid stake signatures."); return false; case ProofValidationResult::TOO_MANY_UTXOS: error = strprintf( _("The avalanche proof has too many utxos (max: %u)."), AVALANCHE_MAX_PROOF_STAKES); return false; default: error = _("The avalanche proof is invalid."); return false; } } return true; } static bool VerifyDelegation(const Delegation &dg, const CPubKey &expectedPubKey, bilingual_str &error) { DelegationState dg_state; CPubKey auth; if (!dg.verify(dg_state, auth)) { switch (dg_state.GetResult()) { case avalanche::DelegationResult::INVALID_SIGNATURE: error = _("The avalanche delegation has invalid signatures."); return false; case avalanche::DelegationResult::TOO_MANY_LEVELS: error = _( "The avalanche delegation has too many delegation levels."); return false; default: error = _("The avalanche delegation is invalid."); return false; } } if (auth != expectedPubKey) { error = _( "The avalanche delegation does not match the expected public key."); return false; } return true; } struct Processor::PeerData { ProofRef proof; Delegation delegation; mutable Mutex cs_proofState; ProofRegistrationState proofState GUARDED_BY(cs_proofState); }; class Processor::NotificationsHandler : public interfaces::Chain::Notifications { Processor *m_processor; public: NotificationsHandler(Processor *p) : m_processor(p) {} void updatedBlockTip() override { const bool registerLocalProof = m_processor->canShareLocalProof(); auto registerProofs = [&]() { LOCK(m_processor->cs_peerManager); auto registeredProofs = m_processor->peerManager->updatedBlockTip(); ProofRegistrationState localProofState; if (m_processor->peerData && m_processor->peerData->proof && registerLocalProof) { if (m_processor->peerManager->registerProof( m_processor->peerData->proof, localProofState)) { registeredProofs.insert(m_processor->peerData->proof); } WITH_LOCK(m_processor->peerData->cs_proofState, m_processor->peerData->proofState = std::move(localProofState)); } return registeredProofs; }; auto registeredProofs = registerProofs(); for (const auto &proof : registeredProofs) { m_processor->addToReconcile(proof); } } }; Processor::Processor(Config avaconfigIn, interfaces::Chain &chain, CConnman *connmanIn, ChainstateManager &chainmanIn, CTxMemPool *mempoolIn, CScheduler &scheduler, std::unique_ptr peerDataIn, CKey sessionKeyIn, uint32_t minQuorumTotalScoreIn, double minQuorumConnectedScoreRatioIn, int64_t minAvaproofsNodeCountIn, uint32_t staleVoteThresholdIn, uint32_t staleVoteFactorIn, Amount stakeUtxoDustThreshold) : avaconfig(std::move(avaconfigIn)), connman(connmanIn), - chainman(chainmanIn), mempool(mempoolIn), round(0), - peerManager( - std::make_unique(stakeUtxoDustThreshold, chainman)), + chainman(chainmanIn), mempool(mempoolIn), + voteRecords(RWCollection(VoteMap(VoteMapComparator(mempool)))), + round(0), peerManager(std::make_unique( + stakeUtxoDustThreshold, chainman)), peerData(std::move(peerDataIn)), sessionKey(std::move(sessionKeyIn)), minQuorumScore(minQuorumTotalScoreIn), minQuorumConnectedScoreRatio(minQuorumConnectedScoreRatioIn), minAvaproofsNodeCount(minAvaproofsNodeCountIn), staleVoteThreshold(staleVoteThresholdIn), staleVoteFactor(staleVoteFactorIn) { // Make sure we get notified of chain state changes. chainNotificationsHandler = chain.handleNotifications(std::make_shared(this)); scheduler.scheduleEvery( [this]() -> bool { WITH_LOCK(cs_peerManager, peerManager->cleanupDanglingProofs(getLocalProof())); return true; }, 5min); } Processor::~Processor() { chainNotificationsHandler.reset(); stopEventLoop(); } std::unique_ptr Processor::MakeProcessor(const ArgsManager &argsman, interfaces::Chain &chain, CConnman *connman, ChainstateManager &chainman, CTxMemPool *mempool, CScheduler &scheduler, bilingual_str &error) { std::unique_ptr peerData; CKey masterKey; CKey sessionKey; Amount stakeUtxoDustThreshold = PROOF_DUST_THRESHOLD; if (argsman.IsArgSet("-avaproofstakeutxodustthreshold") && !ParseMoney(argsman.GetArg("-avaproofstakeutxodustthreshold", ""), stakeUtxoDustThreshold)) { error = _("The avalanche stake utxo dust threshold amount is invalid."); return nullptr; } if (argsman.IsArgSet("-avasessionkey")) { sessionKey = DecodeSecret(argsman.GetArg("-avasessionkey", "")); if (!sessionKey.IsValid()) { error = _("The avalanche session key is invalid."); return nullptr; } } else { // Pick a random key for the session. sessionKey.MakeNewKey(true); } if (argsman.IsArgSet("-avaproof")) { if (!argsman.IsArgSet("-avamasterkey")) { error = _( "The avalanche master key is missing for the avalanche proof."); return nullptr; } masterKey = DecodeSecret(argsman.GetArg("-avamasterkey", "")); if (!masterKey.IsValid()) { error = _("The avalanche master key is invalid."); return nullptr; } auto proof = RCUPtr::make(); if (!Proof::FromHex(*proof, argsman.GetArg("-avaproof", ""), error)) { // error is set by FromHex return nullptr; } peerData = std::make_unique(); peerData->proof = std::move(proof); if (!VerifyProof(stakeUtxoDustThreshold, *peerData->proof, error)) { // error is set by VerifyProof return nullptr; } std::unique_ptr dgb; const CPubKey &masterPubKey = masterKey.GetPubKey(); if (argsman.IsArgSet("-avadelegation")) { Delegation dg; if (!Delegation::FromHex(dg, argsman.GetArg("-avadelegation", ""), error)) { // error is set by FromHex() return nullptr; } if (dg.getProofId() != peerData->proof->getId()) { error = _("The delegation does not match the proof."); return nullptr; } if (masterPubKey != dg.getDelegatedPubkey()) { error = _( "The master key does not match the delegation public key."); return nullptr; } dgb = std::make_unique(dg); } else { if (masterPubKey != peerData->proof->getMaster()) { error = _("The master key does not match the proof public key."); return nullptr; } dgb = std::make_unique(*peerData->proof); } // Generate the delegation to the session key. const CPubKey sessionPubKey = sessionKey.GetPubKey(); if (sessionPubKey != masterPubKey) { if (!dgb->addLevel(masterKey, sessionPubKey)) { error = _("Failed to generate a delegation for this session."); return nullptr; } } peerData->delegation = dgb->build(); if (!VerifyDelegation(peerData->delegation, sessionPubKey, error)) { // error is set by VerifyDelegation return nullptr; } } const auto queryTimeoutDuration = std::chrono::milliseconds(argsman.GetIntArg( "-avatimeout", AVALANCHE_DEFAULT_QUERY_TIMEOUT.count())); // Determine quorum parameters Amount minQuorumStake = AVALANCHE_DEFAULT_MIN_QUORUM_STAKE; if (argsman.IsArgSet("-avaminquorumstake") && !ParseMoney(argsman.GetArg("-avaminquorumstake", ""), minQuorumStake)) { error = _("The avalanche min quorum stake amount is invalid."); return nullptr; } if (!MoneyRange(minQuorumStake)) { error = _("The avalanche min quorum stake amount is out of range."); return nullptr; } double minQuorumConnectedStakeRatio = AVALANCHE_DEFAULT_MIN_QUORUM_CONNECTED_STAKE_RATIO; if (argsman.IsArgSet("-avaminquorumconnectedstakeratio") && !ParseDouble(argsman.GetArg("-avaminquorumconnectedstakeratio", ""), &minQuorumConnectedStakeRatio)) { error = _("The avalanche min quorum connected stake ratio is invalid."); return nullptr; } if (minQuorumConnectedStakeRatio < 0 || minQuorumConnectedStakeRatio > 1) { error = _( "The avalanche min quorum connected stake ratio is out of range."); return nullptr; } int64_t minAvaproofsNodeCount = argsman.GetIntArg("-avaminavaproofsnodecount", AVALANCHE_DEFAULT_MIN_AVAPROOFS_NODE_COUNT); if (minAvaproofsNodeCount < 0) { error = _("The minimum number of node that sent avaproofs message " "should be non-negative"); return nullptr; } // Determine voting parameters int64_t staleVoteThreshold = argsman.GetIntArg( "-avastalevotethreshold", AVALANCHE_VOTE_STALE_THRESHOLD); if (staleVoteThreshold < AVALANCHE_VOTE_STALE_MIN_THRESHOLD) { error = strprintf(_("The avalanche stale vote threshold must be " "greater than or equal to %d"), AVALANCHE_VOTE_STALE_MIN_THRESHOLD); return nullptr; } if (staleVoteThreshold > std::numeric_limits::max()) { error = strprintf(_("The avalanche stale vote threshold must be less " "than or equal to %d"), std::numeric_limits::max()); return nullptr; } int64_t staleVoteFactor = argsman.GetIntArg("-avastalevotefactor", AVALANCHE_VOTE_STALE_FACTOR); if (staleVoteFactor <= 0) { error = _("The avalanche stale vote factor must be greater than 0"); return nullptr; } if (staleVoteFactor > std::numeric_limits::max()) { error = strprintf(_("The avalanche stale vote factor must be less than " "or equal to %d"), std::numeric_limits::max()); return nullptr; } Config avaconfig(queryTimeoutDuration); // We can't use std::make_unique with a private constructor return std::unique_ptr(new Processor( std::move(avaconfig), chain, connman, chainman, mempool, scheduler, std::move(peerData), std::move(sessionKey), Proof::amountToScore(minQuorumStake), minQuorumConnectedStakeRatio, minAvaproofsNodeCount, staleVoteThreshold, staleVoteFactor, stakeUtxoDustThreshold)); } static bool isNull(const AnyVoteItem &item) { return item.valueless_by_exception() || std::visit([](const auto &item) { return item == nullptr; }, item); }; bool Processor::addToReconcile(const AnyVoteItem &item) { if (isNull(item)) { return false; } if (!isWorthPolling(item)) { return false; } // getLocalAcceptance() takes the voteRecords read lock, so we can't inline // the calls or we get a deadlock. const bool accepted = getLocalAcceptance(item); return voteRecords.getWriteView() ->insert(std::make_pair(item, VoteRecord(accepted))) .second; } bool Processor::isAccepted(const AnyVoteItem &item) const { if (isNull(item)) { return false; } auto r = voteRecords.getReadView(); auto it = r->find(item); if (it == r.end()) { return false; } return it->second.isAccepted(); } int Processor::getConfidence(const AnyVoteItem &item) const { if (isNull(item)) { return -1; } auto r = voteRecords.getReadView(); auto it = r->find(item); if (it == r.end()) { return -1; } return it->second.getConfidence(); } namespace { /** * When using TCP, we need to sign all messages as the transport layer is * not secure. */ class TCPResponse { Response response; SchnorrSig sig; public: TCPResponse(Response responseIn, const CKey &key) : response(std::move(responseIn)) { CHashWriter hasher(SER_GETHASH, 0); hasher << response; const uint256 hash = hasher.GetHash(); // Now let's sign! if (!key.SignSchnorr(hash, sig)) { sig.fill(0); } } // serialization support SERIALIZE_METHODS(TCPResponse, obj) { READWRITE(obj.response, obj.sig); } }; } // namespace void Processor::sendResponse(CNode *pfrom, Response response) const { connman->PushMessage( pfrom, CNetMsgMaker(pfrom->GetCommonVersion()) .Make(NetMsgType::AVARESPONSE, TCPResponse(std::move(response), sessionKey))); } bool Processor::registerVotes(NodeId nodeid, const Response &response, std::vector &updates, int &banscore, std::string &error) { { // Save the time at which we can query again. LOCK(cs_peerManager); // FIXME: This will override the time even when we received an old stale // message. This should check that the message is indeed the most up to // date one before updating the time. peerManager->updateNextRequestTime( nodeid, std::chrono::steady_clock::now() + std::chrono::milliseconds(response.getCooldown())); } std::vector invs; { // Check that the query exists. auto w = queries.getWriteView(); auto it = w->find(std::make_tuple(nodeid, response.getRound())); if (it == w.end()) { banscore = 2; error = "unexpected-ava-response"; return false; } invs = std::move(it->invs); w->erase(it); } // Verify that the request and the vote are consistent. const std::vector &votes = response.GetVotes(); size_t size = invs.size(); if (votes.size() != size) { banscore = 100; error = "invalid-ava-response-size"; return false; } for (size_t i = 0; i < size; i++) { if (invs[i].hash != votes[i].GetHash()) { banscore = 100; error = "invalid-ava-response-content"; return false; } } - std::map responseItems; + std::map responseItems( + (VoteMapComparator(mempool))); // At this stage we are certain that invs[i] matches votes[i], so we can use // the inv type to retrieve what is being voted on. for (size_t i = 0; i < size; i++) { auto item = getVoteItemFromInv(invs[i]); if (isNull(item)) { // This should not happen, but just in case... continue; } if (!isWorthPolling(item)) { // There is no point polling this item. continue; } responseItems.insert(std::make_pair(std::move(item), votes[i])); } auto voteRecordsWriteView = voteRecords.getWriteView(); // Register votes. for (const auto &p : responseItems) { auto item = p.first; const Vote &v = p.second; auto it = voteRecordsWriteView->find(item); if (it == voteRecordsWriteView.end()) { // We are not voting on that item anymore. continue; } auto &vr = it->second; if (!vr.registerVote(nodeid, v.GetError())) { if (vr.isStale(staleVoteThreshold, staleVoteFactor)) { updates.emplace_back(std::move(item), VoteStatus::Stale); // Just drop stale votes. If we see this item again, we'll // do a new vote. voteRecordsWriteView->erase(it); } // This vote did not provide any extra information, move on. continue; } if (!vr.hasFinalized()) { // This item has not been finalized, so we have nothing more to // do. updates.emplace_back(std::move(item), vr.isAccepted() ? VoteStatus::Accepted : VoteStatus::Rejected); continue; } // We just finalized a vote. If it is valid, then let the caller // know. Either way, remove the item from the map. updates.emplace_back(std::move(item), vr.isAccepted() ? VoteStatus::Finalized : VoteStatus::Invalid); voteRecordsWriteView->erase(it); } // FIXME This doesn't belong here as it has nothing to do with vote // registration. for (const auto &update : updates) { if (update.getStatus() != VoteStatus::Finalized && update.getStatus() != VoteStatus::Invalid) { continue; } const auto &item = update.getVoteItem(); if (update.getStatus() == VoteStatus::Finalized) { // Always track finalized items regardless of type. Once finalized // they should never become invalid. WITH_LOCK(cs_finalizedItems, return finalizedItems.insert(GetVoteItemId(item))); } if (!std::holds_alternative(item)) { continue; } if (update.getStatus() == VoteStatus::Invalid) { // Track invalidated blocks. Other invalidated types are not // tracked because they may be rejected for transient reasons // (ex: immature proofs or orphaned txs) With blocks this is not // the case. A rejected block will not be mined on. To prevent // reorgs, invalidated blocks should never be polled again. LOCK(cs_invalidatedBlocks); invalidatedBlocks.insert(GetVoteItemId(item)); continue; } // At this point the block index can only be finalized const CBlockIndex *pindex = std::get(item); LOCK(cs_finalizationTip); if (finalizationTip && finalizationTip->GetAncestor(pindex->nHeight) == pindex) { continue; } finalizationTip = pindex; } return true; } CPubKey Processor::getSessionPubKey() const { return sessionKey.GetPubKey(); } bool Processor::sendHelloInternal(CNode *pfrom) { AssertLockHeld(cs_delayedAvahelloNodeIds); Delegation delegation; if (peerData) { if (!canShareLocalProof()) { if (!delayedAvahelloNodeIds.emplace(pfrom->GetId()).second) { // Nothing to do return false; } } else { delegation = peerData->delegation; pfrom->AddKnownProof(delegation.getProofId()); } } CHashWriter hasher(SER_GETHASH, 0); hasher << delegation.getId(); hasher << pfrom->GetLocalNonce(); hasher << pfrom->nRemoteHostNonce; hasher << pfrom->GetLocalExtraEntropy(); hasher << pfrom->nRemoteExtraEntropy; // Now let's sign! SchnorrSig sig; if (!sessionKey.SignSchnorr(hasher.GetHash(), sig)) { return false; } connman->PushMessage( pfrom, CNetMsgMaker(pfrom->GetCommonVersion()) .Make(NetMsgType::AVAHELLO, Hello(delegation, sig))); return delegation.getLimitedProofId() != uint256::ZERO; } bool Processor::sendHello(CNode *pfrom) { return WITH_LOCK(cs_delayedAvahelloNodeIds, return sendHelloInternal(pfrom)); } void Processor::sendDelayedAvahello() { LOCK(cs_delayedAvahelloNodeIds); auto it = delayedAvahelloNodeIds.begin(); while (it != delayedAvahelloNodeIds.end()) { if (connman->ForNode(*it, [&](CNode *pnode) EXCLUSIVE_LOCKS_REQUIRED( cs_delayedAvahelloNodeIds) { return sendHelloInternal(pnode); })) { // Our proof has been announced to this node it = delayedAvahelloNodeIds.erase(it); } else { ++it; } } } ProofRef Processor::getLocalProof() const { return peerData ? peerData->proof : ProofRef(); } ProofRegistrationState Processor::getLocalProofRegistrationState() const { return peerData ? WITH_LOCK(peerData->cs_proofState, return peerData->proofState) : ProofRegistrationState(); } bool Processor::startEventLoop(CScheduler &scheduler) { return eventLoop.startEventLoop( scheduler, [this]() { this->runEventLoop(); }, AVALANCHE_TIME_STEP); } bool Processor::stopEventLoop() { return eventLoop.stopEventLoop(); } void Processor::avaproofsSent(NodeId nodeid) { AssertLockNotHeld(cs_main); if (chainman.ActiveChainstate().IsInitialBlockDownload()) { // Before IBD is complete there is no way to make sure a proof is valid // or not, e.g. it can be spent in a block we don't know yet. In order // to increase confidence that our proof set is similar to other nodes // on the network, the messages received during IBD are not accounted. return; } LOCK(cs_peerManager); if (peerManager->latchAvaproofsSent(nodeid)) { avaproofsNodeCounter++; } } /* * Returns a bool indicating whether we have a usable Avalanche quorum enabling * us to take decisions based on polls. */ bool Processor::isQuorumEstablished() { AssertLockNotHeld(cs_main); { LOCK(cs_peerManager); if (peerManager->getNodeCount() < 8) { // There is no point polling if we know the vote cannot converge return false; } } /* * The following parameters can naturally go temporarly below the threshold * under normal circumstances, like during a proof replacement with a lower * stake amount, or the discovery of a new proofs for which we don't have a * node yet. * In order to prevent our node from starting and stopping the polls * spuriously on such event, the quorum establishement is latched. The only * parameters that should not latched is the minimum node count, as this * would cause the poll to be inconclusive anyway and should not happen * under normal circumstances. */ if (quorumIsEstablished) { return true; } // Don't do Avalanche while node is IBD'ing if (chainman.ActiveChainstate().IsInitialBlockDownload()) { return false; } if (avaproofsNodeCounter < minAvaproofsNodeCount) { return false; } auto localProof = getLocalProof(); // Get the registered proof score and registered score we have nodes for uint32_t totalPeersScore; uint32_t connectedPeersScore; { LOCK(cs_peerManager); totalPeersScore = peerManager->getTotalPeersScore(); connectedPeersScore = peerManager->getConnectedPeersScore(); // Consider that we are always connected to our proof, even if we are // the single node using that proof. if (localProof && peerManager->forPeer(localProof->getId(), [](const Peer &peer) { return peer.node_count == 0; })) { connectedPeersScore += localProof->getScore(); } } // Ensure enough is being staked overall if (totalPeersScore < minQuorumScore) { return false; } // Ensure we have connected score for enough of the overall score uint32_t minConnectedScore = std::round(double(totalPeersScore) * minQuorumConnectedScoreRatio); if (connectedPeersScore < minConnectedScore) { return false; } quorumIsEstablished = true; return true; } bool Processor::canShareLocalProof() { // The flag is latched if (m_canShareLocalProof) { return true; } // Don't share our proof if we don't have any inbound connection. // This is a best effort measure to prevent advertising a proof if we have // limited network connectivity. m_canShareLocalProof = connman->GetNodeCount(CConnman::CONNECTIONS_IN) > 0; return m_canShareLocalProof; } void Processor::FinalizeNode(const ::Config &config, const CNode &node) { AssertLockNotHeld(cs_main); const NodeId nodeid = node.GetId(); WITH_LOCK(cs_peerManager, peerManager->removeNode(nodeid)); WITH_LOCK(cs_delayedAvahelloNodeIds, delayedAvahelloNodeIds.erase(nodeid)); } void Processor::runEventLoop() { // Don't poll if quorum hasn't been established yet if (!isQuorumEstablished()) { return; } // First things first, check if we have requests that timed out and clear // them. clearTimedoutRequests(); // Make sure there is at least one suitable node to query before gathering // invs. NodeId nodeid = WITH_LOCK(cs_peerManager, return peerManager->selectNode()); if (nodeid == NO_NODE) { return; } std::vector invs = getInvsForNextPoll(); if (invs.empty()) { return; } LOCK(cs_peerManager); do { /** * If we lost contact to that node, then we remove it from nodeids, but * never add the request to queries, which ensures bad nodes get cleaned * up over time. */ bool hasSent = connman->ForNode( nodeid, [this, &invs](CNode *pnode) EXCLUSIVE_LOCKS_REQUIRED( cs_peerManager) { uint64_t current_round = round++; { // Compute the time at which this requests times out. auto timeout = std::chrono::steady_clock::now() + avaconfig.queryTimeoutDuration; // Register the query. queries.getWriteView()->insert( {pnode->GetId(), current_round, timeout, invs}); // Set the timeout. peerManager->updateNextRequestTime(pnode->GetId(), timeout); } pnode->invsPolled(invs.size()); // Send the query to the node. connman->PushMessage( pnode, CNetMsgMaker(pnode->GetCommonVersion()) .Make(NetMsgType::AVAPOLL, Poll(current_round, std::move(invs)))); return true; }); // Success! if (hasSent) { return; } // This node is obsolete, delete it. peerManager->removeNode(nodeid); // Get next suitable node to try again nodeid = peerManager->selectNode(); } while (nodeid != NO_NODE); } void Processor::clearTimedoutRequests() { auto now = std::chrono::steady_clock::now(); std::map timedout_items{}; { // Clear expired requests. auto w = queries.getWriteView(); auto it = w->get().begin(); while (it != w->get().end() && it->timeout < now) { for (const auto &i : it->invs) { timedout_items[i]++; } w->get().erase(it++); } } if (timedout_items.empty()) { return; } // In flight request accounting. auto voteRecordsWriteView = voteRecords.getWriteView(); for (const auto &p : timedout_items) { auto item = getVoteItemFromInv(p.first); if (isNull(item)) { continue; } auto it = voteRecordsWriteView->find(item); if (it == voteRecordsWriteView.end()) { continue; } it->second.clearInflightRequest(p.second); } } std::vector Processor::getInvsForNextPoll(bool forPoll) { std::vector invs; { // First remove all items that are not worth polling. auto w = voteRecords.getWriteView(); for (auto it = w->begin(); it != w->end();) { if (!isWorthPolling(it->first)) { it = w->erase(it); } else { ++it; } } } auto buildInvFromVoteItem = variant::overloaded{ [](const ProofRef &proof) { return CInv(MSG_AVA_PROOF, proof->getId()); }, [](const CBlockIndex *pindex) { return CInv(MSG_BLOCK, pindex->GetBlockHash()); }, [](const CTransactionRef &tx) { return CInv(MSG_TX, tx->GetHash()); }, }; auto r = voteRecords.getReadView(); for (const auto &[item, voteRecord] : r) { if (invs.size() >= AVALANCHE_MAX_ELEMENT_POLL) { // Make sure we do not produce more invs than specified by the // protocol. return invs; } const bool shouldPoll = forPoll ? voteRecord.registerPoll() : voteRecord.shouldPoll(); if (!shouldPoll) { continue; } invs.emplace_back(std::visit(buildInvFromVoteItem, item)); } return invs; } AnyVoteItem Processor::getVoteItemFromInv(const CInv &inv) const { if (inv.IsMsgBlk()) { return WITH_LOCK(cs_main, return chainman.m_blockman.LookupBlockIndex( BlockHash(inv.hash))); } if (inv.IsMsgProof()) { return WITH_LOCK(cs_peerManager, return peerManager->getProof(ProofId(inv.hash))); } if (mempool && inv.IsMsgTx()) { return WITH_LOCK(mempool->cs, return mempool->get(TxId(inv.hash))); } return {nullptr}; } bool Processor::IsWorthPolling::operator()(const CBlockIndex *pindex) const { AssertLockNotHeld(cs_main); LOCK(cs_main); if (pindex->nStatus.isInvalid()) { // No point polling invalid blocks. return false; } if (WITH_LOCK(processor.cs_finalizationTip, return processor.finalizationTip && processor.finalizationTip->GetAncestor( pindex->nHeight) == pindex)) { // There is no point polling blocks that are ancestor of a block that // has been accepted by the network. return false; } if (WITH_LOCK(processor.cs_invalidatedBlocks, return processor.invalidatedBlocks.contains( pindex->GetBlockHash()))) { // Blocks invalidated by Avalanche should not be polled twice. return false; } return true; } bool Processor::IsWorthPolling::operator()(const ProofRef &proof) const { AssertLockNotHeld(processor.cs_peerManager); const ProofId &proofid = proof->getId(); LOCK(processor.cs_peerManager); // No point polling immature or discarded proofs return processor.peerManager->isBoundToPeer(proofid) || processor.peerManager->isInConflictingPool(proofid); } bool Processor::IsWorthPolling::operator()(const CTransactionRef &tx) const { if (!processor.mempool) { return false; } // TODO For now the transactions with conflicts or rejected by policies are // not stored anywhere, so only the mempool transactions are worth polling. AssertLockNotHeld(processor.mempool->cs); return WITH_LOCK(processor.mempool->cs, return processor.mempool->exists(tx->GetId())); } bool Processor::isWorthPolling(const AnyVoteItem &item) const { return std::visit(IsWorthPolling(*this), item) && WITH_LOCK(cs_finalizedItems, return !finalizedItems.contains(GetVoteItemId(item))); } bool Processor::GetLocalAcceptance::operator()( const CBlockIndex *pindex) const { AssertLockNotHeld(cs_main); return WITH_LOCK(cs_main, return processor.chainman.ActiveChain().Contains(pindex)); } bool Processor::GetLocalAcceptance::operator()(const ProofRef &proof) const { AssertLockNotHeld(processor.cs_peerManager); return WITH_LOCK( processor.cs_peerManager, return processor.peerManager->isBoundToPeer(proof->getId())); } bool Processor::GetLocalAcceptance::operator()( const CTransactionRef &tx) const { if (!processor.mempool) { return false; } AssertLockNotHeld(processor.mempool->cs); return WITH_LOCK(processor.mempool->cs, return processor.mempool->exists(tx->GetId())); } } // namespace avalanche diff --git a/src/avalanche/processor.h b/src/avalanche/processor.h index ef4b2b38a..d85a19653 100644 --- a/src/avalanche/processor.h +++ b/src/avalanche/processor.h @@ -1,374 +1,408 @@ // Copyright (c) 2018-2019 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #ifndef BITCOIN_AVALANCHE_PROCESSOR_H #define BITCOIN_AVALANCHE_PROCESSOR_H #include #include #include #include #include #include // For AVALANCHE_MAX_INFLIGHT_POLL #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include class ArgsManager; class CConnman; class CNode; class CScheduler; class Config; class PeerManager; struct bilingual_str; /** * Maximum item that can be polled at once. */ static constexpr size_t AVALANCHE_MAX_ELEMENT_POLL = 16; /** * How long before we consider that a query timed out. */ static constexpr std::chrono::milliseconds AVALANCHE_DEFAULT_QUERY_TIMEOUT{ 10000}; /** * The size of the finalized items filter. It should be large enough that an * influx of inventories cannot roll any particular item out of the filter on * demand. For example, transactions will roll blocks out of the filter. * Tracking many more items than can possibly be polled at once ensures that * recently polled items will come to a stable state on the network before * rolling out of the filter. */ static constexpr uint32_t AVALANCHE_FINALIZED_ITEMS_FILTER_NUM_ELEMENTS = AVALANCHE_MAX_INFLIGHT_POLL * 20; namespace avalanche { class Delegation; class PeerManager; class ProofRegistrationState; struct VoteRecord; enum struct VoteStatus : uint8_t { Invalid, Rejected, Accepted, Finalized, Stale, }; using AnyVoteItem = std::variant; class VoteItemUpdate { AnyVoteItem item; VoteStatus status; public: VoteItemUpdate(AnyVoteItem itemIn, VoteStatus statusIn) : item(std::move(itemIn)), status(statusIn) {} const VoteStatus &getStatus() const { return status; } const AnyVoteItem &getVoteItem() const { return item; } }; -struct VoteMapComparator { +class VoteMapComparator { + const CTxMemPool *mempool{nullptr}; + +public: + VoteMapComparator() {} + VoteMapComparator(const CTxMemPool *mempoolIn) : mempool(mempoolIn) {} + bool operator()(const AnyVoteItem &lhs, const AnyVoteItem &rhs) const { // If the variants are of different types, sort them by variant index if (lhs.index() != rhs.index()) { return lhs.index() < rhs.index(); } return std::visit( variant::overloaded{ [](const ProofRef &lhs, const ProofRef &rhs) { return ProofComparatorByScore()(lhs, rhs); }, [](const CBlockIndex *lhs, const CBlockIndex *rhs) { // Reverse ordering so we get the highest work first return CBlockIndexWorkComparator()(rhs, lhs); }, - [](const CTransactionRef &lhs, const CTransactionRef &rhs) { - return lhs->GetId() < rhs->GetId(); + [this](const CTransactionRef &lhs, const CTransactionRef &rhs) { + const TxId &lhsTxId = lhs->GetId(); + const TxId &rhsTxId = rhs->GetId(); + + // If there is no mempool, sort by TxId. Note that polling + // for txs is currently not supported if there is no mempool + // so this is only a safety net. + if (!mempool) { + return lhsTxId < rhsTxId; + } + + LOCK(mempool->cs); + + auto lhsOptIter = mempool->GetIter(lhsTxId); + auto rhsOptIter = mempool->GetIter(rhsTxId); + + // If the transactions are not in the mempool, tie by TxId + if (!lhsOptIter && !rhsOptIter) { + return lhsTxId < rhsTxId; + } + + // If only one is in the mempool, pick that one + if (lhsOptIter.has_value() != rhsOptIter.has_value()) { + return !!lhsOptIter; + } + + // Both are in the mempool, select the highest fee rate + // including the fee deltas + return CompareTxMemPoolEntryByModifiedFeeRate{}( + **lhsOptIter, **rhsOptIter); }, [](const auto &lhs, const auto &rhs) { // This serves 2 purposes: // - This makes sure that we don't forget to implement a // comparison case when adding a new variant type. // - This avoids having to write all the cross type cases // which are already handled by the index sort above. // Because the compiler has no way to determine that, we // cannot use static assertions here without having to // define the whole type matrix also. assert(false); // Return any bool, it's only there to make the compiler // happy. return false; }, }, lhs, rhs); } }; using VoteMap = std::map; struct query_timeout {}; namespace { struct AvalancheTest; } // FIXME Implement a proper notification handler for node disconnection instead // of implementing the whole NetEventsInterface for a single interesting event. class Processor final : public NetEventsInterface { Config avaconfig; CConnman *connman; ChainstateManager &chainman; CTxMemPool *mempool; /** * Items to run avalanche on. */ RWCollection voteRecords; /** * Keep track of peers and queries sent. */ std::atomic round; /** * Keep track of the peers and associated infos. */ mutable Mutex cs_peerManager; std::unique_ptr peerManager GUARDED_BY(cs_peerManager); struct Query { NodeId nodeid; uint64_t round; TimePoint timeout; /** * We declare this as mutable so it can be modified in the multi_index. * This is ok because we do not use this field to index in anyway. * * /!\ Do not use any mutable field as index. */ mutable std::vector invs; }; using QuerySet = boost::multi_index_container< Query, boost::multi_index::indexed_by< // index by nodeid/round boost::multi_index::hashed_unique, boost::multi_index::member>>, // sorted by timeout boost::multi_index::ordered_non_unique< boost::multi_index::tag, boost::multi_index::member>>>; RWCollection queries; /** Data required to participate. */ struct PeerData; std::unique_ptr peerData; CKey sessionKey; /** Event loop machinery. */ EventLoop eventLoop; /** * Quorum management. */ uint32_t minQuorumScore; double minQuorumConnectedScoreRatio; std::atomic quorumIsEstablished{false}; std::atomic m_canShareLocalProof{false}; int64_t minAvaproofsNodeCount; std::atomic avaproofsNodeCounter{0}; /** Voting parameters. */ const uint32_t staleVoteThreshold; const uint32_t staleVoteFactor; /** Registered interfaces::Chain::Notifications handler. */ class NotificationsHandler; std::unique_ptr chainNotificationsHandler; mutable Mutex cs_finalizationTip; const CBlockIndex *finalizationTip GUARDED_BY(cs_finalizationTip){nullptr}; mutable Mutex cs_delayedAvahelloNodeIds; /** * A list of the nodes that did not get our proof announced via avahello * yet because we had no inbound connection. */ std::unordered_set delayedAvahelloNodeIds GUARDED_BY(cs_delayedAvahelloNodeIds); Processor(Config avaconfig, interfaces::Chain &chain, CConnman *connmanIn, ChainstateManager &chainman, CTxMemPool *mempoolIn, CScheduler &scheduler, std::unique_ptr peerDataIn, CKey sessionKeyIn, uint32_t minQuorumTotalScoreIn, double minQuorumConnectedScoreRatioIn, int64_t minAvaproofsNodeCountIn, uint32_t staleVoteThresholdIn, uint32_t staleVoteFactorIn, Amount stakeUtxoDustThresholdIn); public: ~Processor(); static std::unique_ptr MakeProcessor(const ArgsManager &argsman, interfaces::Chain &chain, CConnman *connman, ChainstateManager &chainman, CTxMemPool *mempoolIn, CScheduler &scheduler, bilingual_str &error); bool addToReconcile(const AnyVoteItem &item); bool isAccepted(const AnyVoteItem &item) const; int getConfidence(const AnyVoteItem &item) const; // TODO: Refactor the API to remove the dependency on avalanche/protocol.h void sendResponse(CNode *pfrom, Response response) const; bool registerVotes(NodeId nodeid, const Response &response, std::vector &updates, int &banscore, std::string &error); template auto withPeerManager(Callable &&func) const { LOCK(cs_peerManager); return func(*peerManager); } CPubKey getSessionPubKey() const; /** * @brief Send a avahello message * * @param pfrom The node to send the message to * @return True if a non-null delegation has been announced */ bool sendHello(CNode *pfrom); void sendDelayedAvahello(); ProofRef getLocalProof() const; ProofRegistrationState getLocalProofRegistrationState() const; /* * Return whether the avalanche service flag should be set. */ bool isAvalancheServiceAvailable() { return !!peerData; } bool startEventLoop(CScheduler &scheduler); bool stopEventLoop(); void avaproofsSent(NodeId nodeid) LOCKS_EXCLUDED(cs_main); int64_t getAvaproofsNodeCounter() const { return avaproofsNodeCounter.load(); } bool isQuorumEstablished() LOCKS_EXCLUDED(cs_main); bool canShareLocalProof(); // Implement NetEventInterface. Only FinalizeNode is of interest. void InitializeNode(const ::Config &config, CNode *pnode) override {} bool ProcessMessages(const ::Config &config, CNode *pnode, std::atomic &interrupt) override { return false; } bool SendMessages(const ::Config &config, CNode *pnode) override { return false; } /** Handle removal of a node */ void FinalizeNode(const ::Config &config, const CNode &node) override LOCKS_EXCLUDED(cs_main); private: void runEventLoop(); void clearTimedoutRequests(); std::vector getInvsForNextPoll(bool forPoll = true); bool sendHelloInternal(CNode *pfrom) EXCLUSIVE_LOCKS_REQUIRED(cs_delayedAvahelloNodeIds); AnyVoteItem getVoteItemFromInv(const CInv &inv) const; /** * We don't need many blocks but a low false positive rate. * In the event of a false positive the node might skip polling this block. * Such a block will not get marked as finalized until it is reconsidered * for polling (if the filter changed its state) or another block is found. */ mutable Mutex cs_invalidatedBlocks; CRollingBloomFilter invalidatedBlocks GUARDED_BY(cs_invalidatedBlocks){ 100, 0.0000001}; /** * Rolling bloom filter to track recently finalized inventory items of any * type. Once placed in this filter, those items will not be polled again * unless they roll out. Note that this one filter tracks all types so * blocks may be rolled out by transaction activity for example. * * We want a low false positive rate to prevent accidentally not polling * for an item when it is first seen. */ mutable Mutex cs_finalizedItems; CRollingBloomFilter finalizedItems GUARDED_BY(cs_finalizedItems){ AVALANCHE_FINALIZED_ITEMS_FILTER_NUM_ELEMENTS, 0.0000001}; struct IsWorthPolling { const Processor &processor; IsWorthPolling(const Processor &_processor) : processor(_processor){}; bool operator()(const CBlockIndex *pindex) const LOCKS_EXCLUDED(cs_main); bool operator()(const ProofRef &proof) const LOCKS_EXCLUDED(cs_peerManager); bool operator()(const CTransactionRef &tx) const; }; bool isWorthPolling(const AnyVoteItem &item) const; struct GetLocalAcceptance { const Processor &processor; GetLocalAcceptance(const Processor &_processor) : processor(_processor){}; bool operator()(const CBlockIndex *pindex) const LOCKS_EXCLUDED(cs_main); bool operator()(const ProofRef &proof) const LOCKS_EXCLUDED(cs_peerManager); bool operator()(const CTransactionRef &tx) const; }; bool getLocalAcceptance(const AnyVoteItem &item) const { return std::visit(GetLocalAcceptance(*this), item); } friend struct ::avalanche::AvalancheTest; }; } // namespace avalanche #endif // BITCOIN_AVALANCHE_PROCESSOR_H diff --git a/src/avalanche/test/processor_tests.cpp b/src/avalanche/test/processor_tests.cpp index 78c5b075d..78050aa96 100644 --- a/src/avalanche/test/processor_tests.cpp +++ b/src/avalanche/test/processor_tests.cpp @@ -1,2082 +1,2228 @@ // Copyright (c) 2018-2020 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include #include #include #include #include #include #include #include #include #include #include // For ::PeerManager #include #include #include #include // For bilingual_str // D6970 moved LookupBlockIndex from chain.h to validation.h TODO: remove this // when LookupBlockIndex is refactored out of validation #include #include #include #include #include #include #include #include using namespace avalanche; namespace avalanche { namespace { struct AvalancheTest { static void runEventLoop(avalanche::Processor &p) { p.runEventLoop(); } static std::vector getInvsForNextPoll(Processor &p) { return p.getInvsForNextPoll(false); } static NodeId getSuitableNodeToQuery(Processor &p) { return WITH_LOCK(p.cs_peerManager, return p.peerManager->selectNode()); } static uint64_t getRound(const Processor &p) { return p.round; } static uint32_t getMinQuorumScore(const Processor &p) { return p.minQuorumScore; } static double getMinQuorumConnectedScoreRatio(const Processor &p) { return p.minQuorumConnectedScoreRatio; } static void clearavaproofsNodeCounter(Processor &p) { p.avaproofsNodeCounter = 0; } static void addVoteRecord(Processor &p, AnyVoteItem &item, VoteRecord &voteRecord) { p.voteRecords.getWriteView()->insert( std::make_pair(item, voteRecord)); } static void setFinalizationTip(Processor &p, const CBlockIndex *pindex) { LOCK(p.cs_finalizationTip); p.finalizationTip = pindex; } }; } // namespace struct TestVoteRecord : public VoteRecord { explicit TestVoteRecord(uint16_t conf) : VoteRecord(true) { confidence |= conf << 1; } }; } // namespace avalanche namespace { struct CConnmanTest : public CConnman { using CConnman::CConnman; void AddNode(CNode &node) { LOCK(m_nodes_mutex); m_nodes.push_back(&node); } void ClearNodes() { LOCK(m_nodes_mutex); for (CNode *node : m_nodes) { delete node; } m_nodes.clear(); } }; CService ip(uint32_t i) { struct in_addr s; s.s_addr = i; return CService(CNetAddr(s), Params().GetDefaultPort()); } struct AvalancheTestingSetup : public TestChain100Setup { const ::Config &config; CConnmanTest *m_connman; std::unique_ptr m_processor; // The master private key we delegate to. CKey masterpriv; std::unordered_set m_overridden_args; AvalancheTestingSetup() : TestChain100Setup(), config(GetConfig()), masterpriv(CKey::MakeCompressedKey()) { // Deterministic randomness for tests. auto connman = std::make_unique(config, 0x1337, 0x1337, *m_node.addrman); m_connman = connman.get(); m_node.connman = std::move(connman); m_node.peerman = ::PeerManager::make( config.GetChainParams(), *m_connman, *m_node.addrman, m_node.banman.get(), *m_node.chainman, *m_node.mempool, false); m_node.chain = interfaces::MakeChain(m_node, config.GetChainParams()); // Get the processor ready. setArg("-avaminquorumstake", "0"); setArg("-avaminquorumconnectedstakeratio", "0"); setArg("-avaminavaproofsnodecount", "0"); setArg("-avaproofstakeutxoconfirmations", "1"); bilingual_str error; m_processor = Processor::MakeProcessor( *m_node.args, *m_node.chain, m_node.connman.get(), *Assert(m_node.chainman), m_node.mempool.get(), *m_node.scheduler, error); BOOST_CHECK(m_processor); } ~AvalancheTestingSetup() { m_connman->ClearNodes(); SyncWithValidationInterfaceQueue(); ArgsManager &argsman = *Assert(m_node.args); for (const std::string &key : m_overridden_args) { argsman.ClearForcedArg(key); } m_overridden_args.clear(); } CNode *ConnectNode(ServiceFlags nServices) { static NodeId id = 0; CAddress addr(ip(GetRandInt(0xffffffff)), NODE_NONE); auto node = new CNode(id++, ServiceFlags(NODE_NETWORK), INVALID_SOCKET, addr, /* nKeyedNetGroupIn */ 0, /* nLocalHostNonceIn */ 0, /* nLocalExtraEntropyIn */ 0, CAddress(), /* pszDest */ "", ConnectionType::OUTBOUND_FULL_RELAY, /* inbound_onion */ false); node->SetCommonVersion(PROTOCOL_VERSION); node->nServices = nServices; m_node.peerman->InitializeNode(config, node); node->nVersion = 1; node->fSuccessfullyConnected = true; m_connman->AddNode(*node); return node; } ProofRef GetProof() { const CKey key = CKey::MakeCompressedKey(); const COutPoint outpoint{TxId(GetRandHash()), 0}; CScript script = GetScriptForDestination(PKHash(key.GetPubKey())); const Amount amount = PROOF_DUST_THRESHOLD; const uint32_t height = 100; LOCK(cs_main); CCoinsViewCache &coins = Assert(m_node.chainman)->ActiveChainstate().CoinsTip(); coins.AddCoin(outpoint, Coin(CTxOut(amount, script), height, false), false); ProofBuilder pb(0, 0, masterpriv, UNSPENDABLE_ECREG_PAYOUT_SCRIPT); BOOST_CHECK(pb.addUTXO(outpoint, amount, height, false, key)); return pb.build(); } bool addNode(NodeId nodeid, const ProofId &proofid) { return m_processor->withPeerManager([&](avalanche::PeerManager &pm) { return pm.addNode(nodeid, proofid); }); } bool addNode(NodeId nodeid) { auto proof = GetProof(); return m_processor->withPeerManager([&](avalanche::PeerManager &pm) { return pm.registerProof(proof) && pm.addNode(nodeid, proof->getId()); }); } std::array ConnectNodes() { auto proof = GetProof(); BOOST_CHECK( m_processor->withPeerManager([&](avalanche::PeerManager &pm) { return pm.registerProof(proof); })); const ProofId &proofid = proof->getId(); std::array nodes; for (CNode *&n : nodes) { n = ConnectNode(NODE_AVALANCHE); BOOST_CHECK(addNode(n->GetId(), proofid)); } return nodes; } void runEventLoop() { AvalancheTest::runEventLoop(*m_processor); } NodeId getSuitableNodeToQuery() { return AvalancheTest::getSuitableNodeToQuery(*m_processor); } std::vector getInvsForNextPoll() { return AvalancheTest::getInvsForNextPoll(*m_processor); } uint64_t getRound() const { return AvalancheTest::getRound(*m_processor); } bool registerVotes(NodeId nodeid, const avalanche::Response &response, std::vector &updates, std::string &error) { int banscore; return m_processor->registerVotes(nodeid, response, updates, banscore, error); } bool registerVotes(NodeId nodeid, const avalanche::Response &response, std::vector &updates) { int banscore; std::string error; return m_processor->registerVotes(nodeid, response, updates, banscore, error); } void setArg(std::string key, std::string value) { ArgsManager &argsman = *Assert(m_node.args); argsman.ForceSetArg(key, std::move(value)); m_overridden_args.emplace(std::move(key)); } bool addToReconcile(const AnyVoteItem &item) { return m_processor->addToReconcile(item); } }; struct BlockProvider { AvalancheTestingSetup *fixture; uint32_t invType; BlockProvider(AvalancheTestingSetup *_fixture) : fixture(_fixture), invType(MSG_BLOCK) {} CBlockIndex *buildVoteItem() const { CBlock block = fixture->CreateAndProcessBlock({}, CScript()); const BlockHash blockHash = block.GetHash(); LOCK(cs_main); return Assert(fixture->m_node.chainman) ->m_blockman.LookupBlockIndex(blockHash); } uint256 getVoteItemId(const CBlockIndex *pindex) const { return pindex->GetBlockHash(); } std::vector buildVotesForItems(uint32_t error, std::vector &&items) { size_t numItems = items.size(); std::vector votes; votes.reserve(numItems); // Votes are sorted by most work first std::sort(items.begin(), items.end(), CBlockIndexWorkComparator()); for (auto &item : reverse_iterate(items)) { votes.emplace_back(error, item->GetBlockHash()); } return votes; } void invalidateItem(CBlockIndex *pindex) { LOCK(::cs_main); pindex->nStatus = pindex->nStatus.withFailed(); } const CBlockIndex *fromAnyVoteItem(const AnyVoteItem &item) { return std::get(item); } }; struct ProofProvider { AvalancheTestingSetup *fixture; uint32_t invType; ProofProvider(AvalancheTestingSetup *_fixture) : fixture(_fixture), invType(MSG_AVA_PROOF) {} ProofRef buildVoteItem() const { const ProofRef proof = fixture->GetProof(); fixture->m_processor->withPeerManager([&](avalanche::PeerManager &pm) { BOOST_CHECK(pm.registerProof(proof)); }); return proof; } uint256 getVoteItemId(const ProofRef &proof) const { return proof->getId(); } std::vector buildVotesForItems(uint32_t error, std::vector &&items) { size_t numItems = items.size(); std::vector votes; votes.reserve(numItems); // Votes are sorted by high score first std::sort(items.begin(), items.end(), ProofComparatorByScore()); for (auto &item : items) { votes.emplace_back(error, item->getId()); } return votes; } void invalidateItem(const ProofRef &proof) { fixture->m_processor->withPeerManager([&](avalanche::PeerManager &pm) { pm.rejectProof(proof->getId(), avalanche::PeerManager::RejectionMode::INVALIDATE); }); } const ProofRef fromAnyVoteItem(const AnyVoteItem &item) { return std::get(item); } }; struct TxProvider { AvalancheTestingSetup *fixture; std::vector updates; uint32_t invType; TxProvider(AvalancheTestingSetup *_fixture) : fixture(_fixture), invType(MSG_TX) {} CTransactionRef buildVoteItem() const { + auto rng = FastRandomContext(); CMutableTransaction mtx; mtx.nVersion = 2; - mtx.vin.emplace_back(COutPoint{TxId(FastRandomContext().rand256()), 0}); - mtx.vout.emplace_back(1 * COIN, CScript() << OP_TRUE); + mtx.vin.emplace_back(COutPoint{TxId(rng.rand256()), 0}); + mtx.vout.emplace_back(10 * COIN, CScript() << OP_TRUE); CTransactionRef tx = MakeTransactionRef(std::move(mtx)); TestMemPoolEntryHelper mempoolEntryHelper; - auto entry = mempoolEntryHelper.FromTx(tx); + auto entry = mempoolEntryHelper.Fee(int64_t(rng.randrange(10)) * COIN) + .FromTx(tx); CTxMemPool *mempool = Assert(fixture->m_node.mempool.get()); { LOCK2(cs_main, mempool->cs); mempool->addUnchecked(entry); BOOST_CHECK(mempool->exists(tx->GetId())); } return tx; } uint256 getVoteItemId(const CTransactionRef &tx) const { return tx->GetId(); } std::vector buildVotesForItems(uint32_t error, std::vector &&items) { size_t numItems = items.size(); std::vector votes; votes.reserve(numItems); - // Transactions are sorted by TxId - std::sort(items.begin(), items.end(), - [](const CTransactionRef &lhs, const CTransactionRef &rhs) { - return lhs->GetId() < rhs->GetId(); - }); + CTxMemPool *mempool = Assert(fixture->m_node.mempool.get()); + + { + LOCK(mempool->cs); + + // Transactions are sorted by modified fee rate as long as they are + // in the mempool. Let's keep it simple here and assume it's the + // case. + std::sort(items.begin(), items.end(), + [mempool](const CTransactionRef &lhs, + const CTransactionRef &rhs) + EXCLUSIVE_LOCKS_REQUIRED(mempool->cs) { + auto lhsIter = mempool->GetIter(lhs->GetId()); + auto rhsIter = mempool->GetIter(rhs->GetId()); + BOOST_CHECK(lhsIter); + BOOST_CHECK(rhsIter); + + return CompareTxMemPoolEntryByModifiedFeeRate{}( + **lhsIter, **rhsIter); + }); + } + for (auto &item : items) { votes.emplace_back(error, item->GetId()); } return votes; } void invalidateItem(const CTransactionRef &tx) { BOOST_CHECK(tx != nullptr); CTxMemPool *mempool = Assert(fixture->m_node.mempool.get()); LOCK(mempool->cs); mempool->removeRecursive(*tx, MemPoolRemovalReason::CONFLICT); BOOST_CHECK(!mempool->exists(tx->GetId())); } const CTransactionRef fromAnyVoteItem(const AnyVoteItem &item) { return std::get(item); } }; } // namespace BOOST_FIXTURE_TEST_SUITE(processor_tests, AvalancheTestingSetup) // FIXME A std::tuple can be used instead of boost::mpl::list after boost 1.67 using VoteItemProviders = boost::mpl::list; BOOST_AUTO_TEST_CASE_TEMPLATE(voteitemupdate, P, VoteItemProviders) { P provider(this); std::set status{ VoteStatus::Invalid, VoteStatus::Rejected, VoteStatus::Accepted, VoteStatus::Finalized, VoteStatus::Stale, }; auto item = provider.buildVoteItem(); for (auto s : status) { VoteItemUpdate itemUpdate(item, s); // The use of BOOST_CHECK instead of BOOST_CHECK_EQUAL prevents from // having to define operator<<() for each argument type. BOOST_CHECK(provider.fromAnyVoteItem(itemUpdate.getVoteItem()) == item); BOOST_CHECK(itemUpdate.getStatus() == s); } } namespace { Response next(Response &r) { auto copy = r; r = {r.getRound() + 1, r.getCooldown(), r.GetVotes()}; return copy; } } // namespace BOOST_AUTO_TEST_CASE_TEMPLATE(item_reconcile_twice, P, VoteItemProviders) { P provider(this); const CBlockIndex *chaintip = Assert(m_node.chainman)->ActiveTip(); auto item = provider.buildVoteItem(); auto itemid = provider.getVoteItemId(item); // Adding the item twice does nothing. BOOST_CHECK(addToReconcile(item)); BOOST_CHECK(!addToReconcile(item)); BOOST_CHECK(m_processor->isAccepted(item)); // Create nodes that supports avalanche so we can finalize the item. auto avanodes = ConnectNodes(); int nextNodeIndex = 0; std::vector updates; auto registerNewVote = [&](const Response &resp) { runEventLoop(); auto nodeid = avanodes[nextNodeIndex++ % avanodes.size()]->GetId(); BOOST_CHECK(registerVotes(nodeid, resp, updates)); }; // Finalize the item. auto finalize = [&](const auto finalizeItemId) { Response resp = {getRound(), 0, {Vote(0, finalizeItemId)}}; for (int i = 0; i < AVALANCHE_FINALIZATION_SCORE + 6; i++) { registerNewVote(next(resp)); if (updates.size() > 0) { break; } } BOOST_CHECK_EQUAL(updates.size(), 1); BOOST_CHECK(updates[0].getStatus() == VoteStatus::Finalized); updates.clear(); }; finalize(itemid); // The finalized item cannot be reconciled for a while. BOOST_CHECK(!addToReconcile(item)); auto finalizeNewItem = [&]() { auto anotherItem = provider.buildVoteItem(); AnyVoteItem anotherVoteItem = AnyVoteItem(anotherItem); auto anotherItemId = provider.getVoteItemId(anotherItem); TestVoteRecord voteRecord(AVALANCHE_FINALIZATION_SCORE - 1); AvalancheTest::addVoteRecord(*m_processor, anotherVoteItem, voteRecord); finalize(anotherItemId); }; // The filter can have new items added up to its size and the item will // still not reconcile. for (uint32_t i = 0; i < AVALANCHE_FINALIZED_ITEMS_FILTER_NUM_ELEMENTS; i++) { finalizeNewItem(); BOOST_CHECK(!addToReconcile(item)); } // But if we keep going it will eventually roll out of the filter and can // be reconciled again. for (uint32_t i = 0; i < AVALANCHE_FINALIZED_ITEMS_FILTER_NUM_ELEMENTS; i++) { finalizeNewItem(); } // Roll back the finalization point so that reconciling the old block does // not fail the finalization check. This is a no-op for other types. AvalancheTest::setFinalizationTip(*m_processor, chaintip); BOOST_CHECK(addToReconcile(item)); } BOOST_AUTO_TEST_CASE_TEMPLATE(item_null, P, VoteItemProviders) { P provider(this); // Check that null case is handled on the public interface BOOST_CHECK(!m_processor->isAccepted(nullptr)); BOOST_CHECK_EQUAL(m_processor->getConfidence(nullptr), -1); auto item = decltype(provider.buildVoteItem())(); BOOST_CHECK(item == nullptr); BOOST_CHECK(!addToReconcile(item)); // Check that adding item to vote on doesn't change the outcome. A // comparator is used under the hood, and this is skipped if there are no // vote records. item = provider.buildVoteItem(); BOOST_CHECK(addToReconcile(item)); BOOST_CHECK(!m_processor->isAccepted(nullptr)); BOOST_CHECK_EQUAL(m_processor->getConfidence(nullptr), -1); } BOOST_AUTO_TEST_CASE_TEMPLATE(vote_item_register, P, VoteItemProviders) { P provider(this); const uint32_t invType = provider.invType; auto item = provider.buildVoteItem(); auto itemid = provider.getVoteItemId(item); // Create nodes that supports avalanche. auto avanodes = ConnectNodes(); // Querying for random item returns false. BOOST_CHECK(!m_processor->isAccepted(item)); // Add a new item. Check it is added to the polls. BOOST_CHECK(addToReconcile(item)); auto invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, invType); BOOST_CHECK(invs[0].hash == itemid); BOOST_CHECK(m_processor->isAccepted(item)); int nextNodeIndex = 0; std::vector updates; auto registerNewVote = [&](const Response &resp) { runEventLoop(); auto nodeid = avanodes[nextNodeIndex++ % avanodes.size()]->GetId(); BOOST_CHECK(registerVotes(nodeid, resp, updates)); }; // Let's vote for this item a few times. Response resp{0, 0, {Vote(0, itemid)}}; for (int i = 0; i < 6; i++) { registerNewVote(next(resp)); BOOST_CHECK(m_processor->isAccepted(item)); BOOST_CHECK_EQUAL(m_processor->getConfidence(item), 0); BOOST_CHECK_EQUAL(updates.size(), 0); } // A single neutral vote do not change anything. resp = {getRound(), 0, {Vote(-1, itemid)}}; registerNewVote(next(resp)); BOOST_CHECK(m_processor->isAccepted(item)); BOOST_CHECK_EQUAL(m_processor->getConfidence(item), 0); BOOST_CHECK_EQUAL(updates.size(), 0); resp = {getRound(), 0, {Vote(0, itemid)}}; for (int i = 1; i < 7; i++) { registerNewVote(next(resp)); BOOST_CHECK(m_processor->isAccepted(item)); BOOST_CHECK_EQUAL(m_processor->getConfidence(item), i); BOOST_CHECK_EQUAL(updates.size(), 0); } // Two neutral votes will stall progress. resp = {getRound(), 0, {Vote(-1, itemid)}}; registerNewVote(next(resp)); BOOST_CHECK(m_processor->isAccepted(item)); BOOST_CHECK_EQUAL(m_processor->getConfidence(item), 6); BOOST_CHECK_EQUAL(updates.size(), 0); registerNewVote(next(resp)); BOOST_CHECK(m_processor->isAccepted(item)); BOOST_CHECK_EQUAL(m_processor->getConfidence(item), 6); BOOST_CHECK_EQUAL(updates.size(), 0); resp = {getRound(), 0, {Vote(0, itemid)}}; for (int i = 2; i < 8; i++) { registerNewVote(next(resp)); BOOST_CHECK(m_processor->isAccepted(item)); BOOST_CHECK_EQUAL(m_processor->getConfidence(item), 6); BOOST_CHECK_EQUAL(updates.size(), 0); } // We vote for it numerous times to finalize it. for (int i = 7; i < AVALANCHE_FINALIZATION_SCORE; i++) { registerNewVote(next(resp)); BOOST_CHECK(m_processor->isAccepted(item)); BOOST_CHECK_EQUAL(m_processor->getConfidence(item), i); BOOST_CHECK_EQUAL(updates.size(), 0); } // As long as it is not finalized, we poll. invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, invType); BOOST_CHECK(invs[0].hash == itemid); // Now finalize the decision. registerNewVote(next(resp)); BOOST_CHECK_EQUAL(updates.size(), 1); BOOST_CHECK(provider.fromAnyVoteItem(updates[0].getVoteItem()) == item); BOOST_CHECK(updates[0].getStatus() == VoteStatus::Finalized); updates.clear(); // Once the decision is finalized, there is no poll for it. invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 0); // Get a new item to vote on item = provider.buildVoteItem(); itemid = provider.getVoteItemId(item); BOOST_CHECK(addToReconcile(item)); // Now let's finalize rejection. invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, invType); BOOST_CHECK(invs[0].hash == itemid); resp = {getRound(), 0, {Vote(1, itemid)}}; for (int i = 0; i < 6; i++) { registerNewVote(next(resp)); BOOST_CHECK(m_processor->isAccepted(item)); BOOST_CHECK_EQUAL(updates.size(), 0); } // Now the state will flip. registerNewVote(next(resp)); BOOST_CHECK(!m_processor->isAccepted(item)); BOOST_CHECK_EQUAL(updates.size(), 1); BOOST_CHECK(provider.fromAnyVoteItem(updates[0].getVoteItem()) == item); BOOST_CHECK(updates[0].getStatus() == VoteStatus::Rejected); updates.clear(); // Now it is rejected, but we can vote for it numerous times. for (int i = 1; i < AVALANCHE_FINALIZATION_SCORE; i++) { registerNewVote(next(resp)); BOOST_CHECK(!m_processor->isAccepted(item)); BOOST_CHECK_EQUAL(updates.size(), 0); } // As long as it is not finalized, we poll. invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, invType); BOOST_CHECK(invs[0].hash == itemid); // Now finalize the decision. registerNewVote(next(resp)); BOOST_CHECK(!m_processor->isAccepted(item)); BOOST_CHECK_EQUAL(updates.size(), 1); BOOST_CHECK(provider.fromAnyVoteItem(updates[0].getVoteItem()) == item); BOOST_CHECK(updates[0].getStatus() == VoteStatus::Invalid); updates.clear(); // Once the decision is finalized, there is no poll for it. invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 0); } BOOST_AUTO_TEST_CASE_TEMPLATE(multi_item_register, P, VoteItemProviders) { P provider(this); const uint32_t invType = provider.invType; auto itemA = provider.buildVoteItem(); auto itemidA = provider.getVoteItemId(itemA); auto itemB = provider.buildVoteItem(); auto itemidB = provider.getVoteItemId(itemB); // Create several nodes that support avalanche. auto avanodes = ConnectNodes(); // Querying for random item returns false. BOOST_CHECK(!m_processor->isAccepted(itemA)); BOOST_CHECK(!m_processor->isAccepted(itemB)); // Start voting on item A. BOOST_CHECK(addToReconcile(itemA)); auto invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, invType); BOOST_CHECK(invs[0].hash == itemidA); uint64_t round = getRound(); runEventLoop(); std::vector updates; BOOST_CHECK(registerVotes(avanodes[0]->GetId(), {round, 0, {Vote(0, itemidA)}}, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); // Start voting on item B after one vote. std::vector votes = provider.buildVotesForItems(0, {itemA, itemB}); Response resp{round + 1, 0, votes}; BOOST_CHECK(addToReconcile(itemB)); invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 2); // Ensure the inv ordering is as expected for (size_t i = 0; i < invs.size(); i++) { BOOST_CHECK_EQUAL(invs[i].type, invType); BOOST_CHECK(invs[i].hash == votes[i].GetHash()); } // Let's vote for these items a few times. for (int i = 0; i < 4; i++) { NodeId nodeid = getSuitableNodeToQuery(); runEventLoop(); BOOST_CHECK(registerVotes(nodeid, next(resp), updates)); BOOST_CHECK_EQUAL(updates.size(), 0); } // Now it is accepted, but we can vote for it numerous times. for (int i = 0; i < AVALANCHE_FINALIZATION_SCORE; i++) { NodeId nodeid = getSuitableNodeToQuery(); runEventLoop(); BOOST_CHECK(registerVotes(nodeid, next(resp), updates)); BOOST_CHECK_EQUAL(updates.size(), 0); } // Running two iterration of the event loop so that vote gets triggered on A // and B. NodeId firstNodeid = getSuitableNodeToQuery(); runEventLoop(); NodeId secondNodeid = getSuitableNodeToQuery(); runEventLoop(); BOOST_CHECK(firstNodeid != secondNodeid); // Next vote will finalize item A. BOOST_CHECK(registerVotes(firstNodeid, next(resp), updates)); BOOST_CHECK_EQUAL(updates.size(), 1); BOOST_CHECK(provider.fromAnyVoteItem(updates[0].getVoteItem()) == itemA); BOOST_CHECK(updates[0].getStatus() == VoteStatus::Finalized); updates.clear(); // We do not vote on A anymore. invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, invType); BOOST_CHECK(invs[0].hash == itemidB); // Next vote will finalize item B. BOOST_CHECK(registerVotes(secondNodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 1); BOOST_CHECK(provider.fromAnyVoteItem(updates[0].getVoteItem()) == itemB); BOOST_CHECK(updates[0].getStatus() == VoteStatus::Finalized); updates.clear(); // There is nothing left to vote on. invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 0); } BOOST_AUTO_TEST_CASE_TEMPLATE(poll_and_response, P, VoteItemProviders) { P provider(this); const uint32_t invType = provider.invType; auto item = provider.buildVoteItem(); auto itemid = provider.getVoteItemId(item); // There is no node to query. BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), NO_NODE); // Add enough nodes to have a valid quorum, and the same amount with no // avalanche support std::set avanodeIds; auto avanodes = ConnectNodes(); for (auto avanode : avanodes) { ConnectNode(NODE_NONE); avanodeIds.insert(avanode->GetId()); } auto getSelectedAvanodeId = [&]() { NodeId avanodeid = getSuitableNodeToQuery(); BOOST_CHECK(avanodeIds.find(avanodeid) != avanodeIds.end()); return avanodeid; }; // It returns one of the avalanche peer. NodeId avanodeid = getSelectedAvanodeId(); // Register an item and check it is added to the list of elements to poll. BOOST_CHECK(addToReconcile(item)); auto invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, invType); BOOST_CHECK(invs[0].hash == itemid); std::set unselectedNodeids = avanodeIds; unselectedNodeids.erase(avanodeid); const size_t remainingNodeIds = unselectedNodeids.size(); uint64_t round = getRound(); for (size_t i = 0; i < remainingNodeIds; i++) { // Trigger a poll on avanode. runEventLoop(); // Another node is selected NodeId nodeid = getSuitableNodeToQuery(); BOOST_CHECK(unselectedNodeids.find(nodeid) != avanodeIds.end()); unselectedNodeids.erase(nodeid); } // There is no more suitable peer available, so return nothing. BOOST_CHECK(unselectedNodeids.empty()); runEventLoop(); BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), NO_NODE); // Respond to the request. Response resp = {round, 0, {Vote(0, itemid)}}; std::vector updates; BOOST_CHECK(registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); // Now that avanode fullfilled his request, it is added back to the list of // queriable nodes. BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), avanodeid); auto checkRegisterVotesError = [&](NodeId nodeid, const avalanche::Response &response, const std::string &expectedError) { std::string error; BOOST_CHECK(!registerVotes(nodeid, response, updates, error)); BOOST_CHECK_EQUAL(error, expectedError); BOOST_CHECK_EQUAL(updates.size(), 0); }; // Sending a response when not polled fails. checkRegisterVotesError(avanodeid, next(resp), "unexpected-ava-response"); // Trigger a poll on avanode. round = getRound(); runEventLoop(); BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), NO_NODE); // Sending responses that do not match the request also fails. // 1. Too many results. resp = {round, 0, {Vote(0, itemid), Vote(0, itemid)}}; runEventLoop(); checkRegisterVotesError(avanodeid, resp, "invalid-ava-response-size"); BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), avanodeid); // 2. Not enough results. resp = {getRound(), 0, {}}; runEventLoop(); checkRegisterVotesError(avanodeid, resp, "invalid-ava-response-size"); BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), avanodeid); // 3. Do not match the poll. resp = {getRound(), 0, {Vote()}}; runEventLoop(); checkRegisterVotesError(avanodeid, resp, "invalid-ava-response-content"); BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), avanodeid); // At this stage we have reached the max inflight requests for our inv, so // it won't be requested anymore until the requests are fullfilled. Let's // vote on another item with no inflight request so the remaining tests // makes sense. invs = getInvsForNextPoll(); BOOST_CHECK(invs.empty()); item = provider.buildVoteItem(); itemid = provider.getVoteItemId(item); BOOST_CHECK(addToReconcile(item)); invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 1); // 4. Invalid round count. Request is not discarded. uint64_t queryRound = getRound(); runEventLoop(); resp = {queryRound + 1, 0, {Vote()}}; checkRegisterVotesError(avanodeid, resp, "unexpected-ava-response"); resp = {queryRound - 1, 0, {Vote()}}; checkRegisterVotesError(avanodeid, resp, "unexpected-ava-response"); // 5. Making request for invalid nodes do not work. Request is not // discarded. resp = {queryRound, 0, {Vote(0, itemid)}}; checkRegisterVotesError(avanodeid + 1234, resp, "unexpected-ava-response"); // Proper response gets processed and avanode is available again. resp = {queryRound, 0, {Vote(0, itemid)}}; BOOST_CHECK(registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), avanodeid); // Out of order response are rejected. const auto item2 = provider.buildVoteItem(); BOOST_CHECK(addToReconcile(item2)); std::vector votes = provider.buildVotesForItems(0, {item, item2}); resp = {getRound(), 0, {votes[1], votes[0]}}; runEventLoop(); checkRegisterVotesError(avanodeid, resp, "invalid-ava-response-content"); BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), avanodeid); // But they are accepted in order. resp = {getRound(), 0, votes}; runEventLoop(); BOOST_CHECK(registerVotes(avanodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), avanodeid); } BOOST_AUTO_TEST_CASE_TEMPLATE(dont_poll_invalid_item, P, VoteItemProviders) { P provider(this); const uint32_t invType = provider.invType; auto itemA = provider.buildVoteItem(); auto itemB = provider.buildVoteItem(); auto avanodes = ConnectNodes(); // Build votes to get proper ordering std::vector votes = provider.buildVotesForItems(0, {itemA, itemB}); // Register the items and check they are added to the list of elements to // poll. BOOST_CHECK(addToReconcile(itemA)); BOOST_CHECK(addToReconcile(itemB)); auto invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 2); for (size_t i = 0; i < invs.size(); i++) { BOOST_CHECK_EQUAL(invs[i].type, invType); BOOST_CHECK(invs[i].hash == votes[i].GetHash()); } // When an item is marked invalid, stop polling. provider.invalidateItem(itemB); Response goodResp{getRound(), 0, {Vote(0, provider.getVoteItemId(itemA))}}; std::vector updates; runEventLoop(); BOOST_CHECK(registerVotes(avanodes[0]->GetId(), goodResp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); // Votes including itemB are rejected Response badResp{getRound(), 0, votes}; runEventLoop(); std::string error; BOOST_CHECK(!registerVotes(avanodes[1]->GetId(), badResp, updates, error)); BOOST_CHECK_EQUAL(error, "invalid-ava-response-size"); } BOOST_TEST_DECORATOR(*boost::unit_test::timeout(60)) BOOST_AUTO_TEST_CASE_TEMPLATE(poll_inflight_timeout, P, VoteItemProviders) { P provider(this); ChainstateManager &chainman = *Assert(m_node.chainman); auto queryTimeDuration = std::chrono::milliseconds(10); setArg("-avatimeout", ToString(queryTimeDuration.count())); bilingual_str error; m_processor = Processor::MakeProcessor( *m_node.args, *m_node.chain, m_node.connman.get(), chainman, m_node.mempool.get(), *m_node.scheduler, error); const auto item = provider.buildVoteItem(); const auto itemid = provider.getVoteItemId(item); // Add the item BOOST_CHECK(addToReconcile(item)); // Create a quorum of nodes that support avalanche. ConnectNodes(); NodeId avanodeid = NO_NODE; // Expire requests after some time. for (int i = 0; i < 10; i++) { Response resp = {getRound(), 0, {Vote(0, itemid)}}; avanodeid = getSuitableNodeToQuery(); auto start = std::chrono::steady_clock::now(); runEventLoop(); // We cannot guarantee that we'll wait for just 1ms, so we have to bail // if we aren't within the proper time range. std::this_thread::sleep_for(std::chrono::milliseconds(1)); runEventLoop(); std::vector updates; bool ret = registerVotes(avanodeid, next(resp), updates); if (std::chrono::steady_clock::now() > start + queryTimeDuration) { // We waited for too long, bail. Because we can't know for sure when // previous steps ran, ret is not deterministic and we do not check // it. i--; continue; } // We are within time bounds, so the vote should have worked. BOOST_CHECK(ret); avanodeid = getSuitableNodeToQuery(); // Now try again but wait for expiration. runEventLoop(); std::this_thread::sleep_for(queryTimeDuration); runEventLoop(); BOOST_CHECK(!registerVotes(avanodeid, next(resp), updates)); } } BOOST_AUTO_TEST_CASE_TEMPLATE(poll_inflight_count, P, VoteItemProviders) { P provider(this); const uint32_t invType = provider.invType; // Create enough nodes so that we run into the inflight request limit. auto proof = GetProof(); BOOST_CHECK(m_processor->withPeerManager( [&](avalanche::PeerManager &pm) { return pm.registerProof(proof); })); std::array nodes; for (auto &n : nodes) { n = ConnectNode(NODE_AVALANCHE); BOOST_CHECK(addNode(n->GetId(), proof->getId())); } // Add an item to poll const auto item = provider.buildVoteItem(); const auto itemid = provider.getVoteItemId(item); BOOST_CHECK(addToReconcile(item)); // Ensure there are enough requests in flight. std::map node_round_map; for (int i = 0; i < AVALANCHE_MAX_INFLIGHT_POLL; i++) { NodeId nodeid = getSuitableNodeToQuery(); BOOST_CHECK(node_round_map.find(nodeid) == node_round_map.end()); node_round_map.insert(std::pair(nodeid, getRound())); auto invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, invType); BOOST_CHECK(invs[0].hash == itemid); runEventLoop(); } // Now that we have enough in flight requests, we shouldn't poll. auto suitablenodeid = getSuitableNodeToQuery(); BOOST_CHECK(suitablenodeid != NO_NODE); auto invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 0); runEventLoop(); BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), suitablenodeid); // Send one response, now we can poll again. auto it = node_round_map.begin(); Response resp = {it->second, 0, {Vote(0, itemid)}}; std::vector updates; BOOST_CHECK(registerVotes(it->first, resp, updates)); node_round_map.erase(it); invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, invType); BOOST_CHECK(invs[0].hash == itemid); } BOOST_AUTO_TEST_CASE(quorum_diversity) { std::vector updates; CBlock block = CreateAndProcessBlock({}, CScript()); const BlockHash blockHash = block.GetHash(); const CBlockIndex *pindex; { LOCK(cs_main); pindex = Assert(m_node.chainman)->m_blockman.LookupBlockIndex(blockHash); } // Create nodes that supports avalanche. auto avanodes = ConnectNodes(); // Querying for random block returns false. BOOST_CHECK(!m_processor->isAccepted(pindex)); // Add a new block. Check it is added to the polls. BOOST_CHECK(m_processor->addToReconcile(pindex)); // Do one valid round of voting. uint64_t round = getRound(); Response resp{round, 0, {Vote(0, blockHash)}}; // Check that all nodes can vote. for (size_t i = 0; i < avanodes.size(); i++) { runEventLoop(); BOOST_CHECK(registerVotes(avanodes[i]->GetId(), next(resp), updates)); } // Generate a query for every single node. const NodeId firstNodeId = getSuitableNodeToQuery(); std::map node_round_map; round = getRound(); for (size_t i = 0; i < avanodes.size(); i++) { NodeId nodeid = getSuitableNodeToQuery(); BOOST_CHECK(node_round_map.find(nodeid) == node_round_map.end()); node_round_map[nodeid] = getRound(); runEventLoop(); } // Now only the first node can vote. All others would be duplicate in the // quorum. auto confidence = m_processor->getConfidence(pindex); BOOST_REQUIRE(confidence > 0); for (auto &[nodeid, r] : node_round_map) { if (nodeid == firstNodeId) { // Node 0 is the only one which can vote at this stage. round = r; continue; } BOOST_CHECK( registerVotes(nodeid, {r, 0, {Vote(0, blockHash)}}, updates)); BOOST_CHECK_EQUAL(m_processor->getConfidence(pindex), confidence); } BOOST_CHECK( registerVotes(firstNodeId, {round, 0, {Vote(0, blockHash)}}, updates)); BOOST_CHECK_EQUAL(m_processor->getConfidence(pindex), confidence + 1); } BOOST_AUTO_TEST_CASE(event_loop) { CScheduler s; CBlock block = CreateAndProcessBlock({}, CScript()); const BlockHash blockHash = block.GetHash(); const CBlockIndex *pindex; { LOCK(cs_main); pindex = Assert(m_node.chainman)->m_blockman.LookupBlockIndex(blockHash); } // Starting the event loop. BOOST_CHECK(m_processor->startEventLoop(s)); // There is one task planned in the next hour (our event loop). std::chrono::steady_clock::time_point start, stop; BOOST_CHECK_EQUAL(s.getQueueInfo(start, stop), 1); // Starting twice doesn't start it twice. BOOST_CHECK(!m_processor->startEventLoop(s)); // Start the scheduler thread. std::thread schedulerThread(std::bind(&CScheduler::serviceQueue, &s)); // Create a quorum of nodes that support avalanche. auto avanodes = ConnectNodes(); // There is no query in flight at the moment. NodeId nodeid = getSuitableNodeToQuery(); BOOST_CHECK_NE(nodeid, NO_NODE); // Add a new block. Check it is added to the polls. uint64_t queryRound = getRound(); BOOST_CHECK(m_processor->addToReconcile(pindex)); // Wait until all nodes got a poll for (int i = 0; i < 60 * 1000; i++) { // Technically, this is a race condition, but this should do just fine // as we wait up to 1 minute for an event that should take 80ms. UninterruptibleSleep(std::chrono::milliseconds(1)); if (getRound() == queryRound + avanodes.size()) { break; } } // Check that we effectively got a request and not timed out. BOOST_CHECK(getRound() > queryRound); // Respond and check the cooldown time is respected. uint64_t responseRound = getRound(); auto queryTime = std::chrono::steady_clock::now() + std::chrono::milliseconds(100); std::vector updates; // Only the first node answers, so it's the only one that gets polled again BOOST_CHECK(registerVotes(nodeid, {queryRound, 100, {Vote(0, blockHash)}}, updates)); for (int i = 0; i < 10000; i++) { // We make sure that we do not get a request before queryTime. UninterruptibleSleep(std::chrono::milliseconds(1)); if (getRound() != responseRound) { BOOST_CHECK(std::chrono::steady_clock::now() > queryTime); break; } } // But we eventually get one. BOOST_CHECK(getRound() > responseRound); // Stop event loop. BOOST_CHECK(m_processor->stopEventLoop()); // We don't have any task scheduled anymore. BOOST_CHECK_EQUAL(s.getQueueInfo(start, stop), 0); // Can't stop the event loop twice. BOOST_CHECK(!m_processor->stopEventLoop()); // Wait for the scheduler to stop. s.StopWhenDrained(); schedulerThread.join(); } BOOST_AUTO_TEST_CASE(destructor) { CScheduler s; std::chrono::steady_clock::time_point start, stop; std::thread schedulerThread; BOOST_CHECK(m_processor->startEventLoop(s)); BOOST_CHECK_EQUAL(s.getQueueInfo(start, stop), 1); // Start the service thread after the queue size check to prevent a race // condition where the thread may be processing the event loop task during // the check. schedulerThread = std::thread(std::bind(&CScheduler::serviceQueue, &s)); // Destroy the processor. m_processor.reset(); // Now that avalanche is destroyed, there is no more scheduled tasks. BOOST_CHECK_EQUAL(s.getQueueInfo(start, stop), 0); // Wait for the scheduler to stop. s.StopWhenDrained(); schedulerThread.join(); } BOOST_AUTO_TEST_CASE(add_proof_to_reconcile) { uint32_t score = MIN_VALID_PROOF_SCORE; Chainstate &active_chainstate = Assert(m_node.chainman)->ActiveChainstate(); auto addProofToReconcile = [&](uint32_t proofScore) { auto proof = buildRandomProof(active_chainstate, proofScore); m_processor->withPeerManager([&](avalanche::PeerManager &pm) { BOOST_CHECK(pm.registerProof(proof)); }); BOOST_CHECK(m_processor->addToReconcile(proof)); return proof; }; for (size_t i = 0; i < AVALANCHE_MAX_ELEMENT_POLL; i++) { auto proof = addProofToReconcile(++score); auto invs = AvalancheTest::getInvsForNextPoll(*m_processor); BOOST_CHECK_EQUAL(invs.size(), i + 1); BOOST_CHECK(invs.front().IsMsgProof()); BOOST_CHECK_EQUAL(invs.front().hash, proof->getId()); } // From here a new proof is only polled if its score is in the top // AVALANCHE_MAX_ELEMENT_POLL ProofId lastProofId; for (size_t i = 0; i < 10; i++) { auto proof = addProofToReconcile(++score); auto invs = AvalancheTest::getInvsForNextPoll(*m_processor); BOOST_CHECK_EQUAL(invs.size(), AVALANCHE_MAX_ELEMENT_POLL); BOOST_CHECK(invs.front().IsMsgProof()); BOOST_CHECK_EQUAL(invs.front().hash, proof->getId()); lastProofId = proof->getId(); } for (size_t i = 0; i < 10; i++) { auto proof = addProofToReconcile(--score); auto invs = AvalancheTest::getInvsForNextPoll(*m_processor); BOOST_CHECK_EQUAL(invs.size(), AVALANCHE_MAX_ELEMENT_POLL); BOOST_CHECK(invs.front().IsMsgProof()); BOOST_CHECK_EQUAL(invs.front().hash, lastProofId); } { // The score is not high enough to get polled auto proof = addProofToReconcile(--score); auto invs = AvalancheTest::getInvsForNextPoll(*m_processor); for (auto &inv : invs) { BOOST_CHECK_NE(inv.hash, proof->getId()); } } } BOOST_AUTO_TEST_CASE(proof_record) { setArg("-avaproofstakeutxoconfirmations", "2"); setArg("-avalancheconflictingproofcooldown", "0"); BOOST_CHECK(!m_processor->isAccepted(nullptr)); BOOST_CHECK_EQUAL(m_processor->getConfidence(nullptr), -1); const CKey key = CKey::MakeCompressedKey(); const COutPoint conflictingOutpoint{TxId(GetRandHash()), 0}; const COutPoint immatureOutpoint{TxId(GetRandHash()), 0}; { CScript script = GetScriptForDestination(PKHash(key.GetPubKey())); LOCK(cs_main); CCoinsViewCache &coins = Assert(m_node.chainman)->ActiveChainstate().CoinsTip(); coins.AddCoin(conflictingOutpoint, Coin(CTxOut(PROOF_DUST_THRESHOLD, script), 10, false), false); coins.AddCoin(immatureOutpoint, Coin(CTxOut(PROOF_DUST_THRESHOLD, script), 100, false), false); } auto buildProof = [&](const COutPoint &outpoint, uint64_t sequence, uint32_t height = 10) { ProofBuilder pb(sequence, 0, key, UNSPENDABLE_ECREG_PAYOUT_SCRIPT); BOOST_CHECK( pb.addUTXO(outpoint, PROOF_DUST_THRESHOLD, height, false, key)); return pb.build(); }; auto conflictingProof = buildProof(conflictingOutpoint, 1); auto validProof = buildProof(conflictingOutpoint, 2); auto immatureProof = buildProof(immatureOutpoint, 3, 100); BOOST_CHECK(!m_processor->isAccepted(conflictingProof)); BOOST_CHECK(!m_processor->isAccepted(validProof)); BOOST_CHECK(!m_processor->isAccepted(immatureProof)); BOOST_CHECK_EQUAL(m_processor->getConfidence(conflictingProof), -1); BOOST_CHECK_EQUAL(m_processor->getConfidence(validProof), -1); BOOST_CHECK_EQUAL(m_processor->getConfidence(immatureProof), -1); // Reconciling proofs that don't exist will fail BOOST_CHECK(!m_processor->addToReconcile(conflictingProof)); BOOST_CHECK(!m_processor->addToReconcile(validProof)); BOOST_CHECK(!m_processor->addToReconcile(immatureProof)); m_processor->withPeerManager([&](avalanche::PeerManager &pm) { BOOST_CHECK(pm.registerProof(conflictingProof)); BOOST_CHECK(pm.registerProof(validProof)); BOOST_CHECK(!pm.registerProof(immatureProof)); BOOST_CHECK(pm.isBoundToPeer(validProof->getId())); BOOST_CHECK(pm.isInConflictingPool(conflictingProof->getId())); BOOST_CHECK(pm.isImmature(immatureProof->getId())); }); BOOST_CHECK(m_processor->addToReconcile(conflictingProof)); BOOST_CHECK(!m_processor->isAccepted(conflictingProof)); BOOST_CHECK(!m_processor->isAccepted(validProof)); BOOST_CHECK(!m_processor->isAccepted(immatureProof)); BOOST_CHECK_EQUAL(m_processor->getConfidence(conflictingProof), 0); BOOST_CHECK_EQUAL(m_processor->getConfidence(validProof), -1); BOOST_CHECK_EQUAL(m_processor->getConfidence(immatureProof), -1); BOOST_CHECK(m_processor->addToReconcile(validProof)); BOOST_CHECK(!m_processor->isAccepted(conflictingProof)); BOOST_CHECK(m_processor->isAccepted(validProof)); BOOST_CHECK(!m_processor->isAccepted(immatureProof)); BOOST_CHECK_EQUAL(m_processor->getConfidence(conflictingProof), 0); BOOST_CHECK_EQUAL(m_processor->getConfidence(validProof), 0); BOOST_CHECK_EQUAL(m_processor->getConfidence(immatureProof), -1); BOOST_CHECK(!m_processor->addToReconcile(immatureProof)); BOOST_CHECK(!m_processor->isAccepted(conflictingProof)); BOOST_CHECK(m_processor->isAccepted(validProof)); BOOST_CHECK(!m_processor->isAccepted(immatureProof)); BOOST_CHECK_EQUAL(m_processor->getConfidence(conflictingProof), 0); BOOST_CHECK_EQUAL(m_processor->getConfidence(validProof), 0); BOOST_CHECK_EQUAL(m_processor->getConfidence(immatureProof), -1); } BOOST_AUTO_TEST_CASE(quorum_detection) { // Set min quorum parameters for our test int minStake = 400'000'000; setArg("-avaminquorumstake", ToString(minStake)); setArg("-avaminquorumconnectedstakeratio", "0.5"); // Create a new processor with our given quorum parameters const auto currency = Currency::get(); uint32_t minScore = Proof::amountToScore(minStake * currency.baseunit); Chainstate &active_chainstate = Assert(m_node.chainman)->ActiveChainstate(); const CKey key = CKey::MakeCompressedKey(); auto localProof = buildRandomProof(active_chainstate, minScore / 4, 100, key); setArg("-avamasterkey", EncodeSecret(key)); setArg("-avaproof", localProof->ToHex()); bilingual_str error; ChainstateManager &chainman = *Assert(m_node.chainman); m_processor = Processor::MakeProcessor( *m_node.args, *m_node.chain, m_node.connman.get(), chainman, m_node.mempool.get(), *m_node.scheduler, error); BOOST_CHECK(m_processor != nullptr); BOOST_CHECK(m_processor->getLocalProof() != nullptr); BOOST_CHECK_EQUAL(m_processor->getLocalProof()->getId(), localProof->getId()); BOOST_CHECK_EQUAL(AvalancheTest::getMinQuorumScore(*m_processor), minScore); BOOST_CHECK_EQUAL( AvalancheTest::getMinQuorumConnectedScoreRatio(*m_processor), 0.5); // The local proof has not been validated yet m_processor->withPeerManager([&](avalanche::PeerManager &pm) { BOOST_CHECK_EQUAL(pm.getTotalPeersScore(), 0); BOOST_CHECK_EQUAL(pm.getConnectedPeersScore(), 0); }); BOOST_CHECK(!m_processor->isQuorumEstablished()); // Register the local proof. This is normally done when the chain tip is // updated. The local proof should be accounted for in the min quorum // computation but the peer manager doesn't know about that. m_processor->withPeerManager([&](avalanche::PeerManager &pm) { BOOST_CHECK(pm.registerProof(m_processor->getLocalProof())); BOOST_CHECK(pm.isBoundToPeer(m_processor->getLocalProof()->getId())); BOOST_CHECK_EQUAL(pm.getTotalPeersScore(), minScore / 4); BOOST_CHECK_EQUAL(pm.getConnectedPeersScore(), 0); }); BOOST_CHECK(!m_processor->isQuorumEstablished()); // Add enough nodes to get a conclusive vote for (NodeId id = 0; id < 8; id++) { m_processor->withPeerManager([&](avalanche::PeerManager &pm) { pm.addNode(id, m_processor->getLocalProof()->getId()); BOOST_CHECK_EQUAL(pm.getTotalPeersScore(), minScore / 4); BOOST_CHECK_EQUAL(pm.getConnectedPeersScore(), minScore / 4); }); } // Add part of the required stake and make sure we still report no quorum auto proof1 = buildRandomProof(active_chainstate, minScore / 2); m_processor->withPeerManager([&](avalanche::PeerManager &pm) { BOOST_CHECK(pm.registerProof(proof1)); BOOST_CHECK_EQUAL(pm.getTotalPeersScore(), 3 * minScore / 4); BOOST_CHECK_EQUAL(pm.getConnectedPeersScore(), minScore / 4); }); BOOST_CHECK(!m_processor->isQuorumEstablished()); // Add the rest of the stake, but we are still lacking connected stake const int64_t tipTime = chainman.ActiveTip()->GetBlockTime(); const COutPoint utxo{TxId(GetRandHash()), 0}; const Amount amount = (int64_t(minScore / 4) * COIN) / 100; const int height = 100; const bool isCoinbase = false; { LOCK(cs_main); CCoinsViewCache &coins = active_chainstate.CoinsTip(); coins.AddCoin(utxo, Coin(CTxOut(amount, GetScriptForDestination( PKHash(key.GetPubKey()))), height, isCoinbase), false); } ProofBuilder pb(1, tipTime + 1, key, UNSPENDABLE_ECREG_PAYOUT_SCRIPT); BOOST_CHECK(pb.addUTXO(utxo, amount, height, isCoinbase, key)); auto proof2 = pb.build(); m_processor->withPeerManager([&](avalanche::PeerManager &pm) { BOOST_CHECK(pm.registerProof(proof2)); BOOST_CHECK_EQUAL(pm.getTotalPeersScore(), minScore); BOOST_CHECK_EQUAL(pm.getConnectedPeersScore(), minScore / 4); }); BOOST_CHECK(!m_processor->isQuorumEstablished()); // Adding a node should cause the quorum to be detected and locked-in m_processor->withPeerManager([&](avalanche::PeerManager &pm) { pm.addNode(8, proof2->getId()); BOOST_CHECK_EQUAL(pm.getTotalPeersScore(), minScore); // The peer manager knows that proof2 has a node attached ... BOOST_CHECK_EQUAL(pm.getConnectedPeersScore(), minScore / 2); }); // ... but the processor also account for the local proof, so we reached 50% BOOST_CHECK(m_processor->isQuorumEstablished()); // Go back to not having enough connected score, but we've already latched // the quorum as established m_processor->withPeerManager([&](avalanche::PeerManager &pm) { pm.removeNode(8); BOOST_CHECK_EQUAL(pm.getTotalPeersScore(), minScore); BOOST_CHECK_EQUAL(pm.getConnectedPeersScore(), minScore / 4); }); BOOST_CHECK(m_processor->isQuorumEstablished()); // Removing one more node drops our count below the minimum and the quorum // is no longer ready m_processor->withPeerManager( [&](avalanche::PeerManager &pm) { pm.removeNode(7); }); BOOST_CHECK(!m_processor->isQuorumEstablished()); // It resumes when we have enough nodes again m_processor->withPeerManager([&](avalanche::PeerManager &pm) { pm.addNode(7, m_processor->getLocalProof()->getId()); }); BOOST_CHECK(m_processor->isQuorumEstablished()); // Remove peers one at a time until the quorum is no longer established auto spendProofUtxo = [&](ProofRef proof) { { LOCK(cs_main); CCoinsViewCache &coins = chainman.ActiveChainstate().CoinsTip(); coins.SpendCoin(proof->getStakes()[0].getStake().getUTXO()); } m_processor->withPeerManager([&proof](avalanche::PeerManager &pm) { pm.updatedBlockTip(); BOOST_CHECK(!pm.isBoundToPeer(proof->getId())); }); }; // Expire proof2, the quorum is still latched for (int64_t i = 0; i < 6; i++) { SetMockTime(proof2->getExpirationTime() + i); CreateAndProcessBlock({}, CScript()); } BOOST_CHECK_EQUAL(chainman.ActiveTip()->GetMedianTimePast(), proof2->getExpirationTime()); m_processor->withPeerManager([&](avalanche::PeerManager &pm) { pm.updatedBlockTip(); BOOST_CHECK(!pm.exists(proof2->getId())); }); m_processor->withPeerManager([&](avalanche::PeerManager &pm) { BOOST_CHECK_EQUAL(pm.getTotalPeersScore(), 3 * minScore / 4); BOOST_CHECK_EQUAL(pm.getConnectedPeersScore(), minScore / 4); }); BOOST_CHECK(m_processor->isQuorumEstablished()); spendProofUtxo(proof1); m_processor->withPeerManager([&](avalanche::PeerManager &pm) { BOOST_CHECK_EQUAL(pm.getTotalPeersScore(), minScore / 4); BOOST_CHECK_EQUAL(pm.getConnectedPeersScore(), minScore / 4); }); BOOST_CHECK(m_processor->isQuorumEstablished()); spendProofUtxo(m_processor->getLocalProof()); m_processor->withPeerManager([&](avalanche::PeerManager &pm) { BOOST_CHECK_EQUAL(pm.getTotalPeersScore(), 0); BOOST_CHECK_EQUAL(pm.getConnectedPeersScore(), 0); }); // There is no node left BOOST_CHECK(!m_processor->isQuorumEstablished()); } BOOST_AUTO_TEST_CASE(quorum_detection_parameter_validation) { // Create vector of tuples of: // const std::vector> testCases = { // All parameters are invalid {"", "", "", false}, {"-1", "-1", "-1", false}, // Min stake is out of range {"-1", "0", "0", false}, {"-0.01", "0", "0", false}, {"21000000000000.01", "0", "0", false}, // Min connected ratio is out of range {"0", "-1", "0", false}, {"0", "1.1", "0", false}, // Min avaproofs messages ratio is out of range {"0", "0", "-1", false}, // All parameters are valid {"0", "0", "0", true}, {"0.00", "0", "0", true}, {"0.01", "0", "0", true}, {"1", "0.1", "0", true}, {"10", "0.5", "0", true}, {"10", "1", "0", true}, {"21000000000000.00", "0", "0", true}, {"0", "0", "1", true}, {"0", "0", "100", true}, }; // For each case set the parameters and check that making the processor // succeeds or fails as expected for (const auto &[stake, stakeRatio, numProofsMessages, success] : testCases) { setArg("-avaminquorumstake", stake); setArg("-avaminquorumconnectedstakeratio", stakeRatio); setArg("-avaminavaproofsnodecount", numProofsMessages); bilingual_str error; std::unique_ptr processor = Processor::MakeProcessor( *m_node.args, *m_node.chain, m_node.connman.get(), *Assert(m_node.chainman), m_node.mempool.get(), *m_node.scheduler, error); if (success) { BOOST_CHECK(processor != nullptr); BOOST_CHECK(error.empty()); BOOST_CHECK_EQUAL(error.original, ""); } else { BOOST_CHECK(processor == nullptr); BOOST_CHECK(!error.empty()); BOOST_CHECK(error.original != ""); } } } BOOST_AUTO_TEST_CASE(min_avaproofs_messages) { ChainstateManager &chainman = *Assert(m_node.chainman); auto checkMinAvaproofsMessages = [&](int64_t minAvaproofsMessages) { setArg("-avaminavaproofsnodecount", ToString(minAvaproofsMessages)); bilingual_str error; auto processor = Processor::MakeProcessor( *m_node.args, *m_node.chain, m_node.connman.get(), chainman, m_node.mempool.get(), *m_node.scheduler, error); auto addNode = [&](NodeId nodeid) { auto proof = buildRandomProof(chainman.ActiveChainstate(), MIN_VALID_PROOF_SCORE); processor->withPeerManager([&](avalanche::PeerManager &pm) { BOOST_CHECK(pm.registerProof(proof)); BOOST_CHECK(pm.addNode(nodeid, proof->getId())); }); }; // Add enough node to have a conclusive vote, but don't account any // avaproofs. // NOTE: we can't use the test facilites like ConnectNodes() because we // are not testing on m_processor. for (NodeId id = 100; id < 108; id++) { addNode(id); } BOOST_CHECK_EQUAL(processor->isQuorumEstablished(), minAvaproofsMessages <= 0); for (int64_t i = 0; i < minAvaproofsMessages - 1; i++) { addNode(i); processor->avaproofsSent(i); BOOST_CHECK_EQUAL(processor->getAvaproofsNodeCounter(), i + 1); // Receiving again on the same node does not increase the counter processor->avaproofsSent(i); BOOST_CHECK_EQUAL(processor->getAvaproofsNodeCounter(), i + 1); BOOST_CHECK(!processor->isQuorumEstablished()); } addNode(minAvaproofsMessages); processor->avaproofsSent(minAvaproofsMessages); BOOST_CHECK(processor->isQuorumEstablished()); // Check the latch AvalancheTest::clearavaproofsNodeCounter(*processor); BOOST_CHECK(processor->isQuorumEstablished()); }; checkMinAvaproofsMessages(0); checkMinAvaproofsMessages(1); checkMinAvaproofsMessages(10); checkMinAvaproofsMessages(100); } BOOST_AUTO_TEST_CASE_TEMPLATE(voting_parameters, P, VoteItemProviders) { // Check that setting voting parameters has the expected effect setArg("-avastalevotethreshold", ToString(AVALANCHE_VOTE_STALE_MIN_THRESHOLD)); setArg("-avastalevotefactor", "2"); const std::vector> testCases = { // {number of yes votes, number of neutral votes} {0, AVALANCHE_VOTE_STALE_MIN_THRESHOLD}, {AVALANCHE_FINALIZATION_SCORE + 4, AVALANCHE_FINALIZATION_SCORE - 6}, }; bilingual_str error; m_processor = Processor::MakeProcessor( *m_node.args, *m_node.chain, m_node.connman.get(), *Assert(m_node.chainman), m_node.mempool.get(), *m_node.scheduler, error); BOOST_CHECK(m_processor != nullptr); BOOST_CHECK(error.empty()); P provider(this); const uint32_t invType = provider.invType; const auto item = provider.buildVoteItem(); const auto itemid = provider.getVoteItemId(item); // Create nodes that supports avalanche. auto avanodes = ConnectNodes(); int nextNodeIndex = 0; std::vector updates; for (const auto &[numYesVotes, numNeutralVotes] : testCases) { // Add a new item. Check it is added to the polls. BOOST_CHECK(addToReconcile(item)); auto invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, invType); BOOST_CHECK(invs[0].hash == itemid); BOOST_CHECK(m_processor->isAccepted(item)); auto registerNewVote = [&](const Response &resp) { runEventLoop(); auto nodeid = avanodes[nextNodeIndex++ % avanodes.size()]->GetId(); BOOST_CHECK(registerVotes(nodeid, resp, updates)); }; // Add some confidence for (int i = 0; i < numYesVotes; i++) { Response resp = {getRound(), 0, {Vote(0, itemid)}}; registerNewVote(next(resp)); BOOST_CHECK(m_processor->isAccepted(item)); BOOST_CHECK_EQUAL(m_processor->getConfidence(item), i >= 6 ? i - 5 : 0); BOOST_CHECK_EQUAL(updates.size(), 0); } // Vote until just before item goes stale for (int i = 0; i < numNeutralVotes; i++) { Response resp = {getRound(), 0, {Vote(-1, itemid)}}; registerNewVote(next(resp)); BOOST_CHECK_EQUAL(updates.size(), 0); } // As long as it is not stale, we poll. invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, invType); BOOST_CHECK(invs[0].hash == itemid); // Now stale Response resp = {getRound(), 0, {Vote(-1, itemid)}}; registerNewVote(next(resp)); BOOST_CHECK_EQUAL(updates.size(), 1); BOOST_CHECK(provider.fromAnyVoteItem(updates[0].getVoteItem()) == item); BOOST_CHECK(updates[0].getStatus() == VoteStatus::Stale); updates.clear(); // Once stale, there is no poll for it. invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 0); } } BOOST_AUTO_TEST_CASE(block_vote_finalization_tip) { BlockProvider provider(this); std::vector blockIndexes; for (size_t i = 0; i < AVALANCHE_MAX_ELEMENT_POLL; i++) { CBlockIndex *pindex = provider.buildVoteItem(); BOOST_CHECK(addToReconcile(pindex)); blockIndexes.push_back(pindex); } auto invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), AVALANCHE_MAX_ELEMENT_POLL); for (size_t i = 0; i < AVALANCHE_MAX_ELEMENT_POLL; i++) { BOOST_CHECK_EQUAL( invs[i].hash, blockIndexes[AVALANCHE_MAX_ELEMENT_POLL - i - 1]->GetBlockHash()); } // Build a vote vector with the 11th block only being accepted and others // unknown. const BlockHash eleventhBlockHash = blockIndexes[AVALANCHE_MAX_ELEMENT_POLL - 10 - 1]->GetBlockHash(); std::vector votes; votes.reserve(AVALANCHE_MAX_ELEMENT_POLL); for (size_t i = AVALANCHE_MAX_ELEMENT_POLL; i > 0; i--) { BlockHash blockhash = blockIndexes[i - 1]->GetBlockHash(); votes.emplace_back(blockhash == eleventhBlockHash ? 0 : -1, blockhash); } auto avanodes = ConnectNodes(); int nextNodeIndex = 0; std::vector updates; auto registerNewVote = [&]() { Response resp = {getRound(), 0, votes}; runEventLoop(); auto nodeid = avanodes[nextNodeIndex++ % avanodes.size()]->GetId(); BOOST_CHECK(registerVotes(nodeid, resp, updates)); }; // Vote for the blocks until the one being accepted finalizes bool eleventhBlockFinalized = false; for (size_t i = 0; i < 10000 && !eleventhBlockFinalized; i++) { registerNewVote(); for (auto &update : updates) { if (update.getStatus() == VoteStatus::Finalized && provider.fromAnyVoteItem(update.getVoteItem()) ->GetBlockHash() == eleventhBlockHash) { eleventhBlockFinalized = true; } } } BOOST_CHECK(eleventhBlockFinalized); // From now only the 10 blocks with more work are polled for invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 10); for (size_t i = 0; i < 10; i++) { BOOST_CHECK_EQUAL( invs[i].hash, blockIndexes[AVALANCHE_MAX_ELEMENT_POLL - i - 1]->GetBlockHash()); } // Adding ancestor blocks to reconcile will fail for (size_t i = 0; i < AVALANCHE_MAX_ELEMENT_POLL - 10 - 1; i++) { BOOST_CHECK(!addToReconcile(blockIndexes[i])); } // Create a couple concurrent chain tips CBlockIndex *tip = provider.buildVoteItem(); const auto &config = GetConfig(); auto &activeChainstate = m_node.chainman->ActiveChainstate(); BlockValidationState state; activeChainstate.InvalidateBlock(config, state, tip); // Use another script to make sure we don't generate the same block again CBlock altblock = CreateAndProcessBlock({}, CScript() << OP_TRUE); auto alttip = WITH_LOCK( cs_main, return Assert(m_node.chainman) ->m_blockman.LookupBlockIndex(altblock.GetHash())); BOOST_CHECK(alttip); BOOST_CHECK(alttip->pprev == tip->pprev); BOOST_CHECK(alttip->GetBlockHash() != tip->GetBlockHash()); // Reconsider the previous tip valid, so we have concurrent tip candidates { LOCK(cs_main); activeChainstate.ResetBlockFailureFlags(tip); } activeChainstate.ActivateBestChain(config, state); BOOST_CHECK(addToReconcile(tip)); BOOST_CHECK(addToReconcile(alttip)); invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 12); // Vote for the tip until it finalizes BlockHash tiphash = tip->GetBlockHash(); votes.clear(); votes.reserve(12); for (auto &inv : invs) { votes.emplace_back(inv.hash == tiphash ? 0 : -1, inv.hash); } bool tipFinalized = false; for (size_t i = 0; i < 10000 && !tipFinalized; i++) { registerNewVote(); for (auto &update : updates) { if (update.getStatus() == VoteStatus::Finalized && provider.fromAnyVoteItem(update.getVoteItem()) ->GetBlockHash() == tiphash) { tipFinalized = true; } } } BOOST_CHECK(tipFinalized); // Now the tip and all its ancestors will be removed from polls. Only the // alttip remains because it is on a forked chain so we want to keep polling // for that one until it's invalidated or stalled. invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].hash, alttip->GetBlockHash()); // Cannot reconcile a finalized block BOOST_CHECK(!addToReconcile(tip)); // Vote for alttip until it invalidates BlockHash alttiphash = alttip->GetBlockHash(); votes = {{1, alttiphash}}; bool alttipInvalidated = false; for (size_t i = 0; i < 10000 && !alttipInvalidated; i++) { registerNewVote(); for (auto &update : updates) { if (update.getStatus() == VoteStatus::Invalid && provider.fromAnyVoteItem(update.getVoteItem()) ->GetBlockHash() == alttiphash) { alttipInvalidated = true; } } } BOOST_CHECK(alttipInvalidated); invs = getInvsForNextPoll(); BOOST_CHECK_EQUAL(invs.size(), 0); // Cannot reconcile an invalidated block BOOST_CHECK(!addToReconcile(alttip)); } BOOST_AUTO_TEST_CASE(vote_map_comparator) { ChainstateManager &chainman = *Assert(m_node.chainman); Chainstate &activeChainState = chainman.ActiveChainstate(); const int numberElementsEachType = 100; FastRandomContext rng; std::vector proofs; for (size_t i = 1; i <= numberElementsEachType; i++) { auto proof = buildRandomProof(activeChainState, i * MIN_VALID_PROOF_SCORE); BOOST_CHECK(proof != nullptr); proofs.emplace_back(std::move(proof)); } Shuffle(proofs.begin(), proofs.end(), rng); std::vector indexes; for (size_t i = 1; i <= numberElementsEachType; i++) { CBlockIndex index; index.nChainWork = i; indexes.emplace_back(std::move(index)); } Shuffle(indexes.begin(), indexes.end(), rng); - auto allItems = std::make_tuple(std::move(proofs), std::move(indexes)); + CTxMemPool *mempool = Assert(m_node.mempool.get()); + TestMemPoolEntryHelper mempoolEntryHelper; + std::vector txs; + for (size_t i = 1; i <= numberElementsEachType; i++) { + CMutableTransaction mtx; + mtx.nVersion = 2; + mtx.vin.emplace_back(COutPoint{TxId(rng.rand256()), 0}); + mtx.vout.emplace_back(1000 * COIN, CScript() << OP_TRUE); + + CTransactionRef tx = MakeTransactionRef(std::move(mtx)); + + auto entry = mempoolEntryHelper.Fee(int64_t(i) * COIN).FromTx(tx); + { + LOCK2(cs_main, mempool->cs); + mempool->addUnchecked(entry); + BOOST_CHECK(mempool->exists(tx->GetId())); + } + + txs.emplace_back(std::move(tx)); + } + + auto allItems = + std::make_tuple(std::move(proofs), std::move(indexes), std::move(txs)); static const size_t numTypes = std::tuple_size::value; - RWCollection voteMap; + RWCollection voteMap(VoteMap(m_node.mempool.get())); { auto writeView = voteMap.getWriteView(); for (size_t i = 0; i < numberElementsEachType; i++) { // Randomize the insert order at each loop increment const size_t firstType = rng.randrange(numTypes); for (size_t j = 0; j < numTypes; j++) { switch ((firstType + j) % numTypes) { // ProofRef case 0: writeView->insert(std::make_pair( std::get<0>(allItems)[i], VoteRecord(true))); break; // CBlockIndex * case 1: writeView->insert(std::make_pair( &std::get<1>(allItems)[i], VoteRecord(true))); break; + // CTransactionRef + case 2: + writeView->insert(std::make_pair( + std::get<2>(allItems)[i], VoteRecord(true))); + break; default: break; } } } } { // Check ordering auto readView = voteMap.getReadView(); auto it = readView.begin(); - // The first batch of items is the proofs ordered by score (descending) + // The first batch of items is the proofs ordered by score + // (descending) uint32_t lastScore = std::numeric_limits::max(); for (size_t i = 0; i < numberElementsEachType; i++) { BOOST_CHECK(std::holds_alternative(it->first)); uint32_t currentScore = std::get(it->first)->getScore(); BOOST_CHECK_LT(currentScore, lastScore); lastScore = currentScore; it++; } // The next batch of items is the block indexes ordered by work // (descending) arith_uint256 lastWork = -1; for (size_t i = 0; i < numberElementsEachType; i++) { BOOST_CHECK(std::holds_alternative(it->first)); arith_uint256 currentWork = std::get(it->first)->nChainWork; BOOST_CHECK(currentWork < lastWork); lastWork = currentWork; it++; } + // The last batch of items is the txs ordered by modified fee rate + CFeeRate lastFeeRate{MAX_MONEY}; + { + LOCK(mempool->cs); + + for (size_t i = 0; i < numberElementsEachType; i++) { + BOOST_CHECK( + std::holds_alternative(it->first)); + + auto iter = mempool->GetIter( + std::get(it->first)->GetId()); + BOOST_CHECK(iter.has_value()); + + CFeeRate currentFeeRate = (*iter)->GetModifiedFeeRate(); + + BOOST_CHECK(currentFeeRate < lastFeeRate); + lastFeeRate = currentFeeRate; + + it++; + } + } + BOOST_CHECK(it == readView.end()); } } +BOOST_AUTO_TEST_CASE(vote_map_tx_comparator) { + CTxMemPool *mempool = Assert(m_node.mempool.get()); + TestMemPoolEntryHelper mempoolEntryHelper; + TxProvider provider(this); + + std::vector txs; + for (size_t i = 0; i < 5; i++) { + txs.emplace_back(provider.buildVoteItem()); + } + + { + // When there is no mempool, the txs are sorted by txid + RWCollection voteMap(VoteMap(nullptr)); + { + auto writeView = voteMap.getWriteView(); + for (const auto &tx : txs) { + writeView->insert(std::make_pair(tx, VoteRecord(true))); + } + } + + auto readView = voteMap.getReadView(); + TxId lastTxId{uint256::ZERO}; + for (const auto &[item, vote] : readView) { + auto tx = std::get(item); + BOOST_CHECK_GT(tx->GetId(), lastTxId); + lastTxId = tx->GetId(); + } + } + + // Remove the 5 first txs from the mempool, and add 5 more + mempool->clear(); + for (size_t i = 0; i < 5; i++) { + txs.emplace_back(provider.buildVoteItem()); + } + + { + RWCollection voteMap((VoteMap(mempool))); + + { + auto writeView = voteMap.getWriteView(); + for (const auto &tx : txs) { + writeView->insert(std::make_pair(tx, VoteRecord(true))); + } + } + + auto readView = voteMap.getReadView(); + auto it = readView.begin(); + + LOCK(mempool->cs); + + // The first 5 txs are sorted by fee + CFeeRate lastFeeRate{MAX_MONEY}; + for (size_t i = 0; i < 5; i++) { + auto tx = std::get(it->first); + + auto iter = mempool->GetIter(tx->GetId()); + BOOST_CHECK(iter.has_value()); + + BOOST_CHECK((*iter)->GetModifiedFeeRate() <= lastFeeRate); + lastFeeRate = (*iter)->GetModifiedFeeRate(); + it++; + } + + // The last 5 txs are sorted by txid + TxId lastTxId{uint256::ZERO}; + for (size_t i = 0; i < 5; i++) { + auto tx = std::get(it->first); + + BOOST_CHECK(!mempool->exists(tx->GetId())); + + BOOST_CHECK_GT(tx->GetId(), lastTxId); + lastTxId = tx->GetId(); + it++; + } + } +} + BOOST_AUTO_TEST_CASE(block_reconcile_initial_vote) { const auto &config = GetConfig(); auto &chainman = Assert(m_node.chainman); Chainstate &chainstate = chainman->ActiveChainstate(); const auto block = std::make_shared( this->CreateBlock({}, CScript(), chainstate)); const BlockHash blockhash = block->GetHash(); BlockValidationState state; CBlockIndex *blockindex; { LOCK(cs_main); BOOST_CHECK(chainstate.AcceptBlock(config, block, state, /*fRequested=*/true, /*dbp=*/nullptr, /*fNewBlock=*/nullptr)); blockindex = chainman->m_blockman.LookupBlockIndex(blockhash); BOOST_CHECK(blockindex); } // ActivateBestChain() interacts with g_avalanche, so make it happy g_avalanche = std::move(m_processor); // The block is not connected yet, and not added to the poll list yet BOOST_CHECK(AvalancheTest::getInvsForNextPoll(*g_avalanche).empty()); BOOST_CHECK(!g_avalanche->isAccepted(blockindex)); // Call ActivateBestChain to connect the new block BOOST_CHECK(chainstate.ActivateBestChain(config, state, block)); // It is a valid block so the tip is updated BOOST_CHECK_EQUAL(chainstate.m_chain.Tip(), blockindex); // Check the block is added to the poll auto invs = AvalancheTest::getInvsForNextPoll(*g_avalanche); BOOST_CHECK_EQUAL(invs.size(), 1); BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK); BOOST_CHECK_EQUAL(invs[0].hash, blockhash); // This block is our new tip so we should vote "yes" BOOST_CHECK(g_avalanche->isAccepted(blockindex)); // Prevent a data race between UpdatedBlockTip and the Processor destructor SyncWithValidationInterfaceQueue(); g_avalanche.reset(nullptr); } BOOST_AUTO_TEST_SUITE_END() diff --git a/src/rwcollection.h b/src/rwcollection.h index ad45a1d72..3d4f37247 100644 --- a/src/rwcollection.h +++ b/src/rwcollection.h @@ -1,88 +1,90 @@ // Copyright (c) 2018-2019 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #ifndef BITCOIN_RWCOLLECTION_H #define BITCOIN_RWCOLLECTION_H #include #include #include #include #include #include template class RWCollectionView { private: L lock; T *collection; template struct BracketType { using type = decltype(std::declval()[std::declval()]); }; public: RWCollectionView(L l, T &c) : lock(std::move(l)), collection(&c) {} RWCollectionView(RWCollectionView &&other) : lock(std::move(other.lock)), collection(other.collection) {} RWCollectionView(const RWCollectionView &) = delete; const RWCollectionView &operator=(const RWCollectionView) = delete; T *operator->() { return collection; } const T *operator->() const { return collection; } /** * Iterator mechanics. */ using iterator = typename boost::range_iterator::type; iterator begin() { return std::begin(*collection); } iterator end() { return std::end(*collection); } std::reverse_iterator rbegin() { return std::rbegin(*collection); } std::reverse_iterator rend() { return std::rend(*collection); } using const_iterator = typename boost::range_iterator::type; const_iterator begin() const { return std::begin(*collection); } const_iterator end() const { return std::end(*collection); } std::reverse_iterator rbegin() const { return std::rbegin(*collection); } std::reverse_iterator rend() const { return std::rend(*collection); } /** * Forward bracket operator. */ template typename BracketType::type operator[](I &&index) { return (*collection)[std::forward(index)]; } }; template class RWCollection { private: T collection; mutable std::shared_mutex rwmutex; public: RWCollection() : collection() {} + explicit RWCollection(T &&collection_) + : collection(std::move(collection_)) {} using ReadView = RWCollectionView>; ReadView getReadView() const { return ReadView(std::shared_lock(rwmutex), collection); } using WriteView = RWCollectionView>; WriteView getWriteView() { return WriteView(std::unique_lock(rwmutex), collection); } }; #endif // BITCOIN_RWCOLLECTION_H