diff --git a/src/avalanche/peermanager.cpp b/src/avalanche/peermanager.cpp index 877986138..75398acb2 100644 --- a/src/avalanche/peermanager.cpp +++ b/src/avalanche/peermanager.cpp @@ -1,815 +1,817 @@ // Copyright (c) 2020 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include #include #include #include #include #include #include // For ChainstateActive() #include #include namespace avalanche { PeerManager::PeerManager(CScheduler &scheduler) { scheduler.scheduleEvery( [this]() -> bool { this->cleanupDanglingProofs(); return true; }, 5min); } bool PeerManager::addNode(NodeId nodeid, const ProofId &proofid) { auto &pview = peers.get(); auto it = pview.find(proofid); if (it == pview.end()) { // If the node exists, it is actually updating its proof to an unknown // one. In this case we need to remove it so it is not both active and // pending at the same time. removeNode(nodeid); pendingNodes.emplace(proofid, nodeid); return false; } return addOrUpdateNode(peers.project<0>(it), nodeid); } bool PeerManager::addOrUpdateNode(const PeerSet::iterator &it, NodeId nodeid) { assert(it != peers.end()); const PeerId peerid = it->peerid; auto nit = nodes.find(nodeid); if (nit == nodes.end()) { if (!nodes.emplace(nodeid, peerid).second) { return false; } } else { const PeerId oldpeerid = nit->peerid; if (!nodes.modify(nit, [&](Node &n) { n.peerid = peerid; })) { return false; } // We actually have this node already, we need to update it. bool success = removeNodeFromPeer(peers.find(oldpeerid)); assert(success); } bool success = addNodeToPeer(it); assert(success); // If the added node was in the pending set, remove it pendingNodes.get().erase(nodeid); return true; } bool PeerManager::addNodeToPeer(const PeerSet::iterator &it) { assert(it != peers.end()); return peers.modify(it, [&](Peer &p) { if (p.node_count++ > 0) { // We are done. return; } // We need to allocate this peer. p.index = uint32_t(slots.size()); const uint32_t score = p.getScore(); const uint64_t start = slotCount; slots.emplace_back(start, score, it->peerid); slotCount = start + score; // Add to our allocated score when we allocate a new peer in the slots connectedPeersScore += score; }); } bool PeerManager::removeNode(NodeId nodeid) { if (pendingNodes.get().erase(nodeid) > 0) { // If this was a pending node, there is nothing else to do. return true; } auto it = nodes.find(nodeid); if (it == nodes.end()) { return false; } const PeerId peerid = it->peerid; nodes.erase(it); // Keep the track of the reference count. bool success = removeNodeFromPeer(peers.find(peerid)); assert(success); return true; } bool PeerManager::removeNodeFromPeer(const PeerSet::iterator &it, uint32_t count) { // It is possible for nodes to be dangling. If there was an inflight query // when the peer gets removed, the node was not erased. In this case there // is nothing to do. if (it == peers.end()) { return true; } assert(count <= it->node_count); if (count == 0) { // This is a NOOP. return false; } const uint32_t new_count = it->node_count - count; if (!peers.modify(it, [&](Peer &p) { p.node_count = new_count; })) { return false; } if (new_count > 0) { // We are done. return true; } // There are no more nodes left, we need to clean up. Subtract allocated // score and remove from slots. const size_t i = it->index; assert(i < slots.size()); assert(connectedPeersScore >= slots[i].getScore()); connectedPeersScore -= slots[i].getScore(); if (i + 1 == slots.size()) { slots.pop_back(); slotCount = slots.empty() ? 0 : slots.back().getStop(); } else { fragmentation += slots[i].getScore(); slots[i] = slots[i].withPeerId(NO_PEER); } return true; } bool PeerManager::updateNextRequestTime(NodeId nodeid, TimePoint timeout) { auto it = nodes.find(nodeid); if (it == nodes.end()) { return false; } return nodes.modify(it, [&](Node &n) { n.nextRequestTime = timeout; }); } bool PeerManager::latchAvaproofsSent(NodeId nodeid) { auto it = nodes.find(nodeid); if (it == nodes.end()) { return false; } return !it->avaproofsSent && nodes.modify(it, [&](Node &n) { n.avaproofsSent = true; }); } static bool isOrphanState(const ProofValidationState &state) { return state.GetResult() == ProofValidationResult::MISSING_UTXO || state.GetResult() == ProofValidationResult::HEIGHT_MISMATCH || state.GetResult() == ProofValidationResult::IMMATURE_UTXO; } bool PeerManager::updateNextPossibleConflictTime( PeerId peerid, const std::chrono::seconds &nextTime) { auto it = peers.find(peerid); if (it == peers.end()) { // No such peer return false; } // Make sure we don't move the time in the past. peers.modify(it, [&](Peer &p) { p.nextPossibleConflictTime = std::max(p.nextPossibleConflictTime, nextTime); }); return it->nextPossibleConflictTime == nextTime; } template void PeerManager::moveToConflictingPool(const ProofContainer &proofs) { auto &peersView = peers.get(); for (const ProofRef &proof : proofs) { auto it = peersView.find(proof->getId()); if (it != peersView.end()) { removePeer(it->peerid); } conflictingProofPool.addProofIfPreferred(proof); } } bool PeerManager::registerProof(const ProofRef &proof, ProofRegistrationState ®istrationState, RegistrationMode mode) { assert(proof); const ProofId &proofid = proof->getId(); auto invalidate = [&](ProofRegistrationResult result, const std::string &message) { return registrationState.Invalid( result, message, strprintf("proofid: %s", proofid.ToString())); }; if ((mode != RegistrationMode::FORCE_ACCEPT || !isInConflictingPool(proofid)) && exists(proofid)) { // In default mode, we expect the proof to be unknown, i.e. in none of // the pools. // In forced accept mode, the proof can be in the conflicting pool. return invalidate(ProofRegistrationResult::ALREADY_REGISTERED, "proof-already-registered"); } // Check the proof's validity. ProofValidationState validationState; if (!WITH_LOCK(cs_main, return proof->verify(validationState, ::ChainstateActive().CoinsTip()))) { if (isOrphanState(validationState)) { // Only accept orphan proofs if there's room in the orphan pool. auto status = orphanProofPool.addProofIfNoConflict(proof); if (status != ProofPool::AddProofStatus::SUCCEED) { // Attempt proof replacement orphanProofPool.addProofIfPreferred(proof); } else if (orphanProofPool.countProofs() > AVALANCHE_MAX_ORPHAN_PROOFS) { // Adding this proof exceeds the orphan pool limit, so remove // it. orphanProofPool.removeProof(proof->getId()); } return invalidate(ProofRegistrationResult::ORPHAN, "orphan-proof"); } // Reject invalid proof. return invalidate(ProofRegistrationResult::INVALID, "invalid-proof"); } auto now = GetTime(); auto nextCooldownTimePoint = now + std::chrono::seconds( gArgs.GetArg("-avalancheconflictingproofcooldown", AVALANCHE_DEFAULT_CONFLICTING_PROOF_COOLDOWN)); ProofPool::ConflictingProofSet conflictingProofs; switch (validProofPool.addProofIfNoConflict(proof, conflictingProofs)) { case ProofPool::AddProofStatus::REJECTED: { if (mode != RegistrationMode::FORCE_ACCEPT) { auto bestPossibleConflictTime = std::chrono::seconds(); auto &pview = peers.get(); for (auto &conflictingProof : conflictingProofs) { auto it = pview.find(conflictingProof->getId()); assert(it != pview.end()); // Search the most recent time over the peers bestPossibleConflictTime = std::max( bestPossibleConflictTime, it->nextPossibleConflictTime); updateNextPossibleConflictTime(it->peerid, nextCooldownTimePoint); } if (bestPossibleConflictTime > now) { // Cooldown not elapsed, reject the proof. return invalidate( ProofRegistrationResult::COOLDOWN_NOT_ELAPSED, "cooldown-not-elapsed"); } // If proof replacement is enabled, give the proof a chance to // replace the conflicting ones. if (gArgs.GetBoolArg( "-enableavalancheproofreplacement", AVALANCHE_DEFAULT_PROOF_REPLACEMENT_ENABLED)) { if (validProofPool.addProofIfPreferred(proof)) { // If we have overridden other proofs due to conflict, // remove the peers and attempt to move them to the // conflicting pool. moveToConflictingPool(conflictingProofs); // Replacement is successful, continue to peer creation break; } } // Not the preferred proof, or replacement is not enabled return conflictingProofPool.addProofIfPreferred(proof) == ProofPool::AddProofStatus::REJECTED ? invalidate(ProofRegistrationResult::REJECTED, "rejected-proof") : invalidate(ProofRegistrationResult::CONFLICTING, "conflicting-utxos"); } conflictingProofPool.removeProof(proofid); // Move the conflicting proofs from the valid pool to the // conflicting pool moveToConflictingPool(conflictingProofs); auto status = validProofPool.addProofIfNoConflict(proof); assert(status == ProofPool::AddProofStatus::SUCCEED); break; } case ProofPool::AddProofStatus::DUPLICATED: // If the proof was already in the pool, don't duplicate the peer. return invalidate(ProofRegistrationResult::ALREADY_REGISTERED, "proof-already-registered"); case ProofPool::AddProofStatus::SUCCEED: break; // No default case, so the compiler can warn about missing cases } // At this stage we are going to create a peer so the proof should never // exist in the conflicting pool, but use belt and suspenders. conflictingProofPool.removeProof(proofid); // New peer means new peerid! const PeerId peerid = nextPeerId++; // We have no peer for this proof, time to create it. auto inserted = peers.emplace(peerid, proof, nextCooldownTimePoint); assert(inserted.second); auto insertedRadixTree = shareableProofs.insert(proof); assert(insertedRadixTree); // Add to our registered score when adding to the peer list totalPeersScore += proof->getScore(); // If there are nodes waiting for this proof, add them auto &pendingNodesView = pendingNodes.get(); auto range = pendingNodesView.equal_range(proofid); // We want to update the nodes then remove them from the pending set. That // will invalidate the range iterators, so we need to save the node ids // first before we can loop over them. std::vector nodeids; nodeids.reserve(std::distance(range.first, range.second)); std::transform(range.first, range.second, std::back_inserter(nodeids), [](const PendingNode &n) { return n.nodeid; }); for (const NodeId &nodeid : nodeids) { addOrUpdateNode(inserted.first, nodeid); } return true; } bool PeerManager::rejectProof(const ProofId &proofid, RejectionMode mode) { if (!exists(proofid)) { return false; } if (orphanProofPool.removeProof(proofid)) { return true; } if (mode == RejectionMode::DEFAULT && conflictingProofPool.getProof(proofid)) { // In default mode we keep the proof in the conflicting pool return true; } if (mode == RejectionMode::INVALIDATE && conflictingProofPool.removeProof(proofid)) { // In invalidate mode we remove the proof completely return true; } auto &pview = peers.get(); auto it = pview.find(proofid); assert(it != pview.end()); const ProofRef proof = it->proof; if (!removePeer(it->peerid)) { return false; } // If there was conflicting proofs, attempt to pull them back for (const SignedStake &ss : proof->getStakes()) { const ProofRef conflictingProof = conflictingProofPool.getProof(ss.getStake().getUTXO()); if (!conflictingProof) { continue; } conflictingProofPool.removeProof(conflictingProof->getId()); registerProof(conflictingProof); } if (mode == RejectionMode::DEFAULT) { conflictingProofPool.addProofIfPreferred(proof); } return true; } NodeId PeerManager::selectNode() { for (int retry = 0; retry < SELECT_NODE_MAX_RETRY; retry++) { const PeerId p = selectPeer(); // If we cannot find a peer, it may be due to the fact that it is // unlikely due to high fragmentation, so compact and retry. if (p == NO_PEER) { compact(); continue; } // See if that peer has an available node. auto &nview = nodes.get(); auto it = nview.lower_bound(boost::make_tuple(p, TimePoint())); if (it != nview.end() && it->peerid == p && it->nextRequestTime <= std::chrono::steady_clock::now()) { return it->nodeid; } } // We failed to find a node to query, flag this so we can request more needMoreNodes = true; return NO_NODE; } -void PeerManager::updatedBlockTip() { +std::unordered_set PeerManager::updatedBlockTip() { std::vector invalidProofIds; std::vector newOrphans; { LOCK(cs_main); const CCoinsViewCache &coins = ::ChainstateActive().CoinsTip(); for (const auto &p : peers) { ProofValidationState state; if (!p.proof->verify(state, coins)) { if (isOrphanState(state)) { newOrphans.push_back(p.proof); } invalidProofIds.push_back(p.getProofId()); } } } // Remove the invalid proofs before the orphans rescan. This makes it // possible to pull back proofs with utxos that conflicted with these // invalid proofs. for (const ProofId &invalidProofId : invalidProofIds) { rejectProof(invalidProofId, RejectionMode::INVALIDATE); } - orphanProofPool.rescan(*this); + auto registeredProofs = orphanProofPool.rescan(*this); for (auto &p : newOrphans) { orphanProofPool.addProofIfPreferred(p); } + + return registeredProofs; } ProofRef PeerManager::getProof(const ProofId &proofid) const { ProofRef proof; forPeer(proofid, [&](const Peer &p) { proof = p.proof; return true; }); if (!proof) { proof = conflictingProofPool.getProof(proofid); } if (!proof) { proof = orphanProofPool.getProof(proofid); } return proof; } bool PeerManager::isBoundToPeer(const ProofId &proofid) const { auto &pview = peers.get(); return pview.find(proofid) != pview.end(); } bool PeerManager::isOrphan(const ProofId &proofid) const { return orphanProofPool.getProof(proofid) != nullptr; } bool PeerManager::isInConflictingPool(const ProofId &proofid) const { return conflictingProofPool.getProof(proofid) != nullptr; } bool PeerManager::removePeer(const PeerId peerid) { auto it = peers.find(peerid); if (it == peers.end()) { return false; } // Remove all nodes from this peer. removeNodeFromPeer(it, it->node_count); auto &nview = nodes.get(); // Add the nodes to the pending set auto range = nview.equal_range(peerid); for (auto &nit = range.first; nit != range.second; ++nit) { pendingNodes.emplace(it->getProofId(), nit->nodeid); }; // Remove nodes associated with this peer, unless their timeout is still // active. This ensure that we don't overquery them in case they are // subsequently added to another peer. nview.erase(nview.lower_bound(boost::make_tuple(peerid, TimePoint())), nview.upper_bound(boost::make_tuple( peerid, std::chrono::steady_clock::now()))); // Release UTXOs attached to this proof. validProofPool.removeProof(it->getProofId()); auto removed = shareableProofs.remove(Uint256RadixKey(it->getProofId())); assert(removed != nullptr); m_unbroadcast_proofids.erase(it->getProofId()); // Remove the peer from the PeerSet and remove its score from the registered // score total. assert(totalPeersScore >= it->getScore()); totalPeersScore -= it->getScore(); peers.erase(it); return true; } PeerId PeerManager::selectPeer() const { if (slots.empty() || slotCount == 0) { return NO_PEER; } const uint64_t max = slotCount; for (int retry = 0; retry < SELECT_PEER_MAX_RETRY; retry++) { size_t i = selectPeerImpl(slots, GetRand(max), max); if (i != NO_PEER) { return i; } } return NO_PEER; } uint64_t PeerManager::compact() { // There is nothing to compact. if (fragmentation == 0) { return 0; } std::vector newslots; newslots.reserve(peers.size()); uint64_t prevStop = 0; uint32_t i = 0; for (auto it = peers.begin(); it != peers.end(); it++) { if (it->node_count == 0) { continue; } newslots.emplace_back(prevStop, it->getScore(), it->peerid); prevStop = slots[i].getStop(); if (!peers.modify(it, [&](Peer &p) { p.index = i++; })) { return 0; } } slots = std::move(newslots); const uint64_t saved = slotCount - prevStop; slotCount = prevStop; fragmentation = 0; return saved; } bool PeerManager::verify() const { uint64_t prevStop = 0; uint32_t scoreFromSlots = 0; for (size_t i = 0; i < slots.size(); i++) { const Slot &s = slots[i]; // Slots must be in correct order. if (s.getStart() < prevStop) { return false; } prevStop = s.getStop(); // If this is a dead slot, then nothing more needs to be checked. if (s.getPeerId() == NO_PEER) { continue; } // We have a live slot, verify index. auto it = peers.find(s.getPeerId()); if (it == peers.end() || it->index != i) { return false; } // Accumulate score across slots scoreFromSlots += slots[i].getScore(); } // Score across slots must be the same as our allocated score if (scoreFromSlots != connectedPeersScore) { return false; } uint32_t scoreFromAllPeers = 0; uint32_t scoreFromPeersWithNodes = 0; std::unordered_set peersUtxos; for (const auto &p : peers) { // Accumulate the score across peers to compare with total known score scoreFromAllPeers += p.getScore(); // A peer should have a proof attached if (!p.proof) { return false; } // Check proof pool consistency for (const auto &ss : p.proof->getStakes()) { const COutPoint &outpoint = ss.getStake().getUTXO(); auto proof = validProofPool.getProof(outpoint); if (!proof) { // Missing utxo return false; } if (proof != p.proof) { // Wrong proof return false; } if (!peersUtxos.emplace(outpoint).second) { // Duplicated utxo return false; } } // Count node attached to this peer. const auto count_nodes = [&]() { size_t count = 0; auto &nview = nodes.get(); auto begin = nview.lower_bound(boost::make_tuple(p.peerid, TimePoint())); auto end = nview.upper_bound(boost::make_tuple(p.peerid + 1, TimePoint())); for (auto it = begin; it != end; ++it) { count++; } return count; }; if (p.node_count != count_nodes()) { return false; } // If there are no nodes attached to this peer, then we are done. if (p.node_count == 0) { continue; } scoreFromPeersWithNodes += p.getScore(); // The index must point to a slot refering to this peer. if (p.index >= slots.size() || slots[p.index].getPeerId() != p.peerid) { return false; } // If the score do not match, same thing. if (slots[p.index].getScore() != p.getScore()) { return false; } // Check the proof is in the radix tree if (shareableProofs.get(p.getProofId()) == nullptr) { return false; } } // Check our accumulated scores against our registred and allocated scores if (scoreFromAllPeers != totalPeersScore) { return false; } if (scoreFromPeersWithNodes != connectedPeersScore) { return false; } // We checked the utxo consistency for all our peers utxos already, so if // the pool size differs from the expected one there are dangling utxos. if (validProofPool.size() != peersUtxos.size()) { return false; } // Check there is no dangling proof in the radix tree return shareableProofs.forEachLeaf([&](RCUPtr pLeaf) { return isBoundToPeer(pLeaf->getId()); }); } PeerId selectPeerImpl(const std::vector &slots, const uint64_t slot, const uint64_t max) { assert(slot <= max); size_t begin = 0, end = slots.size(); uint64_t bottom = 0, top = max; // Try to find the slot using dichotomic search. while ((end - begin) > 8) { // The slot we picked in not allocated. if (slot < bottom || slot >= top) { return NO_PEER; } // Guesstimate the position of the slot. size_t i = begin + ((slot - bottom) * (end - begin) / (top - bottom)); assert(begin <= i && i < end); // We have a match. if (slots[i].contains(slot)) { return slots[i].getPeerId(); } // We undershooted. if (slots[i].precedes(slot)) { begin = i + 1; if (begin >= end) { return NO_PEER; } bottom = slots[begin].getStart(); continue; } // We overshooted. if (slots[i].follows(slot)) { end = i; top = slots[end].getStart(); continue; } // We have an unalocated slot. return NO_PEER; } // Enough of that nonsense, let fallback to linear search. for (size_t i = begin; i < end; i++) { // We have a match. if (slots[i].contains(slot)) { return slots[i].getPeerId(); } } // We failed to find a slot, retry. return NO_PEER; } void PeerManager::addUnbroadcastProof(const ProofId &proofid) { // The proof should be bound to a peer if (isBoundToPeer(proofid)) { m_unbroadcast_proofids.insert(proofid); } } void PeerManager::removeUnbroadcastProof(const ProofId &proofid) { m_unbroadcast_proofids.erase(proofid); } void PeerManager::cleanupDanglingProofs() { const auto now = GetTime(); std::vector danglingProofIds; for (const Peer &peer : peers) { // If the peer has been registered for some time and has no node // attached, discard it. if (peer.node_count == 0 && (peer.registration_time + Peer::DANGLING_TIMEOUT) <= now) { danglingProofIds.push_back(peer.getProofId()); } } for (const ProofId &proofid : danglingProofIds) { rejectProof(proofid, RejectionMode::INVALIDATE); } } } // namespace avalanche diff --git a/src/avalanche/peermanager.h b/src/avalanche/peermanager.h index a42a6e936..08c6e31f2 100644 --- a/src/avalanche/peermanager.h +++ b/src/avalanche/peermanager.h @@ -1,414 +1,414 @@ // Copyright (c) 2020 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #ifndef BITCOIN_AVALANCHE_PEERMANAGER_H #define BITCOIN_AVALANCHE_PEERMANAGER_H #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include class CScheduler; namespace avalanche { /** * Maximum number of orphan proofs the peer manager will accept from the * network. Under good conditions, this allows the node to collect relevant * proofs during IBD. Note that reorgs can cause the orphan pool to * temporarily exceed this limit. But a change in chaintip cause previously * reorged proofs to be trimmed. */ static constexpr uint32_t AVALANCHE_MAX_ORPHAN_PROOFS = 4000; class Delegation; namespace { struct TestPeerManager; } struct Slot { private: uint64_t start; uint32_t score; PeerId peerid; public: Slot(uint64_t startIn, uint32_t scoreIn, PeerId peeridIn) : start(startIn), score(scoreIn), peerid(peeridIn) {} Slot withStart(uint64_t startIn) const { return Slot(startIn, score, peerid); } Slot withScore(uint64_t scoreIn) const { return Slot(start, scoreIn, peerid); } Slot withPeerId(PeerId peeridIn) const { return Slot(start, score, peeridIn); } uint64_t getStart() const { return start; } uint64_t getStop() const { return start + score; } uint32_t getScore() const { return score; } PeerId getPeerId() const { return peerid; } bool contains(uint64_t slot) const { return getStart() <= slot && slot < getStop(); } bool precedes(uint64_t slot) const { return slot >= getStop(); } bool follows(uint64_t slot) const { return getStart() > slot; } }; struct Peer { PeerId peerid; uint32_t index = -1; uint32_t node_count = 0; ProofRef proof; // The network stack uses timestamp in seconds, so we oblige. std::chrono::seconds registration_time; std::chrono::seconds nextPossibleConflictTime; /** * Consider dropping the peer if no node is attached after this timeout * expired. */ static constexpr auto DANGLING_TIMEOUT = 15min; Peer(PeerId peerid_, ProofRef proof_, std::chrono::seconds nextPossibleConflictTime_) : peerid(peerid_), proof(std::move(proof_)), registration_time(GetTime()), nextPossibleConflictTime(std::move(nextPossibleConflictTime_)) {} const ProofId &getProofId() const { return proof->getId(); } uint32_t getScore() const { return proof->getScore(); } }; struct proof_index { using result_type = ProofId; result_type operator()(const Peer &p) const { return p.proof->getId(); } }; struct score_index { using result_type = uint32_t; result_type operator()(const Peer &p) const { return p.getScore(); } }; struct next_request_time {}; struct PendingNode { ProofId proofid; NodeId nodeid; PendingNode(ProofId proofid_, NodeId nodeid_) : proofid(proofid_), nodeid(nodeid_){}; }; struct by_proofid; struct by_nodeid; struct by_score; enum class ProofRegistrationResult { NONE = 0, ALREADY_REGISTERED, ORPHAN, INVALID, CONFLICTING, REJECTED, COOLDOWN_NOT_ELAPSED, }; class ProofRegistrationState : public ValidationState { }; namespace bmi = boost::multi_index; class PeerManager { std::vector slots; uint64_t slotCount = 0; uint64_t fragmentation = 0; /** * Several nodes can make an avalanche peer. In this case, all nodes are * considered interchangeable parts of the same peer. */ using PeerSet = boost::multi_index_container< Peer, bmi::indexed_by< // index by peerid bmi::hashed_unique>, // index by proof bmi::hashed_unique, proof_index, SaltedProofIdHasher>, // ordered by score, decreasing order bmi::ordered_non_unique, score_index, std::greater>>>; PeerId nextPeerId = 0; PeerSet peers; ProofPool validProofPool; ProofPool conflictingProofPool; ProofPool orphanProofPool; using ProofRadixTree = RadixTree; ProofRadixTree shareableProofs; using NodeSet = boost::multi_index_container< Node, bmi::indexed_by< // index by nodeid bmi::hashed_unique>, // sorted by peerid/nextRequestTime bmi::ordered_non_unique< bmi::tag, bmi::composite_key< Node, bmi::member, bmi::member>>>>; NodeSet nodes; /** * Flag indicating that we failed to select a node and need to expand our * node set. */ std::atomic needMoreNodes{false}; using PendingNodeSet = boost::multi_index_container< PendingNode, bmi::indexed_by< // index by proofid bmi::hashed_non_unique< bmi::tag, bmi::member, SaltedProofIdHasher>, // index by nodeid bmi::hashed_unique< bmi::tag, bmi::member>>>; PendingNodeSet pendingNodes; static constexpr int SELECT_PEER_MAX_RETRY = 3; static constexpr int SELECT_NODE_MAX_RETRY = 3; /** * Track proof ids to broadcast */ std::unordered_set m_unbroadcast_proofids; /** * Quorum management. */ uint32_t totalPeersScore = 0; uint32_t connectedPeersScore = 0; public: PeerManager(CScheduler &scheduler); /** * Node API. */ bool addNode(NodeId nodeid, const ProofId &proofid); bool removeNode(NodeId nodeid); size_t getNodeCount() const { return nodes.size(); } size_t getPendingNodeCount() const { return pendingNodes.size(); } // Update when a node is to be polled next. bool updateNextRequestTime(NodeId nodeid, TimePoint timeout); /** * Flag that a node did send its compact proofs. * @return True if the flag changed state, i;e. if this is the first time * the message is accounted for this node. */ bool latchAvaproofsSent(NodeId nodeid); // Randomly select a node to poll. NodeId selectNode(); /** * Returns true if we encountered a lack of node since the last call. */ bool shouldRequestMoreNodes() { return needMoreNodes.exchange(false); } template bool forNode(NodeId nodeid, Callable &&func) const { auto it = nodes.find(nodeid); return it != nodes.end() && func(*it); } template void forEachNode(const Peer &peer, Callable &&func) const { auto &nview = nodes.get(); auto range = nview.equal_range(peer.peerid); for (auto it = range.first; it != range.second; ++it) { func(*it); } } /** * Proof and Peer related API. */ /** * Update the time before which a proof is not allowed to have conflicting * UTXO with this peer's proof. */ bool updateNextPossibleConflictTime(PeerId peerid, const std::chrono::seconds &nextTime); /** * Registration mode * - DEFAULT: Default policy, register only if the proof is unknown and has * no conflict. * - FORCE_ACCEPT: Turn a valid proof into a peer even if it has conflicts * and is not the best candidate. */ enum class RegistrationMode { DEFAULT, FORCE_ACCEPT, }; bool registerProof(const ProofRef &proof, ProofRegistrationState ®istrationState, RegistrationMode mode = RegistrationMode::DEFAULT); bool registerProof(const ProofRef &proof, RegistrationMode mode = RegistrationMode::DEFAULT) { ProofRegistrationState dummy; return registerProof(proof, dummy, mode); } /** * Rejection mode * - DEFAULT: Default policy, reject a proof and attempt to keep it in the * conflicting pool if possible. * - INVALIDATE: Reject a proof by removing it from any of the pool. * * In any case if a peer is rejected, it attempts to pull the conflicting * proofs back. */ enum class RejectionMode { DEFAULT, INVALIDATE, }; bool rejectProof(const ProofId &proofid, RejectionMode mode = RejectionMode::DEFAULT); bool exists(const ProofId &proofid) const { return getProof(proofid) != nullptr; } template bool forPeer(const ProofId &proofid, Callable &&func) const { auto &pview = peers.get(); auto it = pview.find(proofid); return it != pview.end() && func(*it); } template void forEachPeer(Callable &&func) const { for (const auto &p : peers) { func(p); } } /** * Update the peer set when a new block is connected. */ - void updatedBlockTip(); + std::unordered_set updatedBlockTip(); /** * Proof broadcast API. */ void addUnbroadcastProof(const ProofId &proofid); void removeUnbroadcastProof(const ProofId &proofid); auto getUnbroadcastProofs() const { return m_unbroadcast_proofids; } /* * Quorum management */ uint32_t getTotalPeersScore() const { return totalPeersScore; } uint32_t getConnectedPeersScore() const { return connectedPeersScore; } /**************************************************** * Functions which are public for testing purposes. * ****************************************************/ /** * Remove an existing peer. */ bool removePeer(const PeerId peerid); /** * Randomly select a peer to poll. */ PeerId selectPeer() const; /** * Trigger maintenance of internal data structures. * Returns how much slot space was saved after compaction. */ uint64_t compact(); /** * Perform consistency check on internal data structures. */ bool verify() const; // Accessors. uint64_t getSlotCount() const { return slotCount; } uint64_t getFragmentation() const { return fragmentation; } ProofRef getProof(const ProofId &proofid) const; bool isBoundToPeer(const ProofId &proofid) const; bool isOrphan(const ProofId &proofid) const; bool isInConflictingPool(const ProofId &proofid) const; const ProofRadixTree &getShareableProofsSnapshot() const { return shareableProofs; } private: template void moveToConflictingPool(const ProofContainer &proofs); bool addOrUpdateNode(const PeerSet::iterator &it, NodeId nodeid); bool addNodeToPeer(const PeerSet::iterator &it); bool removeNodeFromPeer(const PeerSet::iterator &it, uint32_t count = 1); void cleanupDanglingProofs(); friend struct ::avalanche::TestPeerManager; }; /** * Internal methods that are exposed for testing purposes. */ PeerId selectPeerImpl(const std::vector &slots, const uint64_t slot, const uint64_t max); } // namespace avalanche #endif // BITCOIN_AVALANCHE_PEERMANAGER_H diff --git a/src/avalanche/processor.cpp b/src/avalanche/processor.cpp index 7bf3d4685..25f75374a 100644 --- a/src/avalanche/processor.cpp +++ b/src/avalanche/processor.cpp @@ -1,953 +1,960 @@ // Copyright (c) 2018-2019 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include #include #include #include #include #include #include #include #include // For DecodeSecret #include #include #include #include #include #include #include #include #include #include #include /** * Run the avalanche event loop every 10ms. */ static constexpr std::chrono::milliseconds AVALANCHE_TIME_STEP{10}; // Unfortunately, the bitcoind codebase is full of global and we are kinda // forced into it here. std::unique_ptr g_avalanche; namespace avalanche { static bool VerifyProof(const Proof &proof, bilingual_str &error) { ProofValidationState proof_state; if (!proof.verify(proof_state)) { switch (proof_state.GetResult()) { case ProofValidationResult::NO_STAKE: error = _("The avalanche proof has no stake."); return false; case ProofValidationResult::DUST_THRESOLD: error = _("The avalanche proof stake is too low."); return false; case ProofValidationResult::DUPLICATE_STAKE: error = _("The avalanche proof has duplicated stake."); return false; case ProofValidationResult::INVALID_STAKE_SIGNATURE: error = _("The avalanche proof has invalid stake signatures."); return false; case ProofValidationResult::TOO_MANY_UTXOS: error = strprintf( _("The avalanche proof has too many utxos (max: %u)."), AVALANCHE_MAX_PROOF_STAKES); return false; default: error = _("The avalanche proof is invalid."); return false; } } return true; } static bool VerifyDelegation(const Delegation &dg, const CPubKey &expectedPubKey, bilingual_str &error) { DelegationState dg_state; CPubKey auth; if (!dg.verify(dg_state, auth)) { switch (dg_state.GetResult()) { case avalanche::DelegationResult::INVALID_SIGNATURE: error = _("The avalanche delegation has invalid signatures."); return false; case avalanche::DelegationResult::TOO_MANY_LEVELS: error = _( "The avalanche delegation has too many delegation levels."); return false; default: error = _("The avalanche delegation is invalid."); return false; } } if (auth != expectedPubKey) { error = _( "The avalanche delegation does not match the expected public key."); return false; } return true; } struct Processor::PeerData { ProofRef proof; Delegation delegation; }; class Processor::NotificationsHandler : public interfaces::Chain::Notifications { Processor *m_processor; public: NotificationsHandler(Processor *p) : m_processor(p) {} void updatedBlockTip() override { - LOCK(m_processor->cs_peerManager); + auto registerProofs = [&]() { + LOCK(m_processor->cs_peerManager); - if (m_processor->peerData && m_processor->peerData->proof) { - m_processor->peerManager->registerProof( - m_processor->peerData->proof); - } + if (m_processor->peerData && m_processor->peerData->proof) { + m_processor->peerManager->registerProof( + m_processor->peerData->proof); + } - m_processor->peerManager->updatedBlockTip(); + return m_processor->peerManager->updatedBlockTip(); + }; + + auto registeredProofs = registerProofs(); + for (const auto &proof : registeredProofs) { + m_processor->addProofToReconcile(proof); + } } }; Processor::Processor(const ArgsManager &argsman, interfaces::Chain &chain, CConnman *connmanIn, CScheduler &scheduler, std::unique_ptr peerDataIn, CKey sessionKeyIn, uint32_t minQuorumTotalScoreIn, double minQuorumConnectedScoreRatioIn, int64_t minAvaproofsNodeCountIn, uint32_t staleVoteThresholdIn, uint32_t staleVoteFactorIn) : connman(connmanIn), queryTimeoutDuration(argsman.GetArg( "-avatimeout", AVALANCHE_DEFAULT_QUERY_TIMEOUT.count())), round(0), peerManager(std::make_unique(scheduler)), peerData(std::move(peerDataIn)), sessionKey(std::move(sessionKeyIn)), minQuorumScore(minQuorumTotalScoreIn), minQuorumConnectedScoreRatio(minQuorumConnectedScoreRatioIn), minAvaproofsNodeCount(minAvaproofsNodeCountIn), staleVoteThreshold(staleVoteThresholdIn), staleVoteFactor(staleVoteFactorIn) { // Make sure we get notified of chain state changes. chainNotificationsHandler = chain.handleNotifications(std::make_shared(this)); } Processor::~Processor() { chainNotificationsHandler.reset(); stopEventLoop(); } std::unique_ptr Processor::MakeProcessor(const ArgsManager &argsman, interfaces::Chain &chain, CConnman *connman, CScheduler &scheduler, bilingual_str &error) { std::unique_ptr peerData; CKey masterKey; CKey sessionKey; if (argsman.IsArgSet("-avasessionkey")) { sessionKey = DecodeSecret(argsman.GetArg("-avasessionkey", "")); if (!sessionKey.IsValid()) { error = _("The avalanche session key is invalid."); return nullptr; } } else { // Pick a random key for the session. sessionKey.MakeNewKey(true); } if (argsman.IsArgSet("-avaproof")) { if (!argsman.IsArgSet("-avamasterkey")) { error = _( "The avalanche master key is missing for the avalanche proof."); return nullptr; } masterKey = DecodeSecret(argsman.GetArg("-avamasterkey", "")); if (!masterKey.IsValid()) { error = _("The avalanche master key is invalid."); return nullptr; } auto proof = RCUPtr::make(); if (!Proof::FromHex(*proof, argsman.GetArg("-avaproof", ""), error)) { // error is set by FromHex return nullptr; } peerData = std::make_unique(); peerData->proof = std::move(proof); if (!VerifyProof(*peerData->proof, error)) { // error is set by VerifyProof return nullptr; } std::unique_ptr dgb; const CPubKey &masterPubKey = masterKey.GetPubKey(); if (argsman.IsArgSet("-avadelegation")) { Delegation dg; if (!Delegation::FromHex(dg, argsman.GetArg("-avadelegation", ""), error)) { // error is set by FromHex() return nullptr; } if (dg.getProofId() != peerData->proof->getId()) { error = _("The delegation does not match the proof."); return nullptr; } if (masterPubKey != dg.getDelegatedPubkey()) { error = _( "The master key does not match the delegation public key."); return nullptr; } dgb = std::make_unique(dg); } else { if (masterPubKey != peerData->proof->getMaster()) { error = _("The master key does not match the proof public key."); return nullptr; } dgb = std::make_unique(*peerData->proof); } // Generate the delegation to the session key. const CPubKey sessionPubKey = sessionKey.GetPubKey(); if (sessionPubKey != masterPubKey) { if (!dgb->addLevel(masterKey, sessionPubKey)) { error = _("Failed to generate a delegation for this session."); return nullptr; } } peerData->delegation = dgb->build(); if (!VerifyDelegation(peerData->delegation, sessionPubKey, error)) { // error is set by VerifyDelegation return nullptr; } } // Determine quorum parameters Amount minQuorumStake = AVALANCHE_DEFAULT_MIN_QUORUM_STAKE; if (argsman.IsArgSet("-avaminquorumstake") && !ParseMoney(argsman.GetArg("-avaminquorumstake", ""), minQuorumStake)) { error = _("The avalanche min quorum stake amount is invalid."); return nullptr; } if (!MoneyRange(minQuorumStake)) { error = _("The avalanche min quorum stake amount is out of range."); return nullptr; } double minQuorumConnectedStakeRatio = AVALANCHE_DEFAULT_MIN_QUORUM_CONNECTED_STAKE_RATIO; if (argsman.IsArgSet("-avaminquorumconnectedstakeratio") && !ParseDouble(argsman.GetArg("-avaminquorumconnectedstakeratio", ""), &minQuorumConnectedStakeRatio)) { error = _("The avalanche min quorum connected stake ratio is invalid."); return nullptr; } if (minQuorumConnectedStakeRatio < 0 || minQuorumConnectedStakeRatio > 1) { error = _( "The avalanche min quorum connected stake ratio is out of range."); return nullptr; } int64_t minAvaproofsNodeCount = argsman.GetArg("-avaminavaproofsnodecount", AVALANCHE_DEFAULT_MIN_AVAPROOFS_NODE_COUNT); if (minAvaproofsNodeCount < 0) { error = _("The minimum number of node that sent avaproofs message " "should be non-negative"); return nullptr; } // Determine voting parameters int64_t staleVoteThreshold = argsman.GetArg("-avastalevotethreshold", AVALANCHE_VOTE_STALE_THRESHOLD); if (staleVoteThreshold < AVALANCHE_VOTE_STALE_MIN_THRESHOLD) { error = strprintf(_("The avalanche stale vote threshold must be " "greater than or equal to %d"), AVALANCHE_VOTE_STALE_MIN_THRESHOLD); return nullptr; } if (staleVoteThreshold > std::numeric_limits::max()) { error = strprintf(_("The avalanche stale vote threshold must be less " "than or equal to %d"), std::numeric_limits::max()); return nullptr; } int64_t staleVoteFactor = argsman.GetArg("-avastalevotefactor", AVALANCHE_VOTE_STALE_FACTOR); if (staleVoteFactor <= 0) { error = _("The avalanche stale vote factor must be greater than 0"); return nullptr; } if (staleVoteFactor > std::numeric_limits::max()) { error = strprintf(_("The avalanche stale vote factor must be less than " "or equal to %d"), std::numeric_limits::max()); return nullptr; } // We can't use std::make_unique with a private constructor return std::unique_ptr(new Processor( argsman, chain, connman, scheduler, std::move(peerData), std::move(sessionKey), Proof::amountToScore(minQuorumStake), minQuorumConnectedStakeRatio, minAvaproofsNodeCount, staleVoteThreshold, staleVoteFactor)); } bool Processor::addBlockToReconcile(const CBlockIndex *pindex) { if (!pindex) { // isWorthPolling expects this to be non-null, so bail early. return false; } bool isAccepted; { LOCK(cs_main); if (!isWorthPolling(pindex)) { // There is no point polling this block. return false; } isAccepted = ::ChainActive().Contains(pindex); } return blockVoteRecords.getWriteView() ->insert(std::make_pair(pindex, VoteRecord(isAccepted))) .second; } bool Processor::addProofToReconcile(const ProofRef &proof) { if (!proof) { // isWorthPolling expects this to be non-null, so bail early. return false; } bool isAccepted; { LOCK(cs_peerManager); if (!isWorthPolling(proof)) { return false; } isAccepted = peerManager->isBoundToPeer(proof->getId()); } return proofVoteRecords.getWriteView() ->insert(std::make_pair(proof, VoteRecord(isAccepted))) .second; } bool Processor::isAccepted(const CBlockIndex *pindex) const { if (!pindex) { // CBlockIndexWorkComparator expects this to be non-null, so bail early. return false; } auto r = blockVoteRecords.getReadView(); auto it = r->find(pindex); if (it == r.end()) { return false; } return it->second.isAccepted(); } bool Processor::isAccepted(const ProofRef &proof) const { auto r = proofVoteRecords.getReadView(); auto it = r->find(proof); if (it == r.end()) { return false; } return it->second.isAccepted(); } int Processor::getConfidence(const CBlockIndex *pindex) const { if (!pindex) { // CBlockIndexWorkComparator expects this to be non-null, so bail early. return -1; } auto r = blockVoteRecords.getReadView(); auto it = r->find(pindex); if (it == r.end()) { return -1; } return it->second.getConfidence(); } int Processor::getConfidence(const ProofRef &proof) const { auto r = proofVoteRecords.getReadView(); auto it = r->find(proof); if (it == r.end()) { return -1; } return it->second.getConfidence(); } namespace { /** * When using TCP, we need to sign all messages as the transport layer is * not secure. */ class TCPResponse { Response response; SchnorrSig sig; public: TCPResponse(Response responseIn, const CKey &key) : response(std::move(responseIn)) { CHashWriter hasher(SER_GETHASH, 0); hasher << response; const uint256 hash = hasher.GetHash(); // Now let's sign! if (!key.SignSchnorr(hash, sig)) { sig.fill(0); } } // serialization support SERIALIZE_METHODS(TCPResponse, obj) { READWRITE(obj.response, obj.sig); } }; } // namespace void Processor::sendResponse(CNode *pfrom, Response response) const { connman->PushMessage( pfrom, CNetMsgMaker(pfrom->GetCommonVersion()) .Make(NetMsgType::AVARESPONSE, TCPResponse(std::move(response), sessionKey))); } bool Processor::registerVotes(NodeId nodeid, const Response &response, std::vector &blockUpdates, std::vector &proofUpdates, int &banscore, std::string &error) { { // Save the time at which we can query again. LOCK(cs_peerManager); // FIXME: This will override the time even when we received an old stale // message. This should check that the message is indeed the most up to // date one before updating the time. peerManager->updateNextRequestTime( nodeid, std::chrono::steady_clock::now() + std::chrono::milliseconds(response.getCooldown())); } std::vector invs; { // Check that the query exists. auto w = queries.getWriteView(); auto it = w->find(std::make_tuple(nodeid, response.getRound())); if (it == w.end()) { banscore = 2; error = "unexpected-ava-response"; return false; } invs = std::move(it->invs); w->erase(it); } // Verify that the request and the vote are consistent. const std::vector &votes = response.GetVotes(); size_t size = invs.size(); if (votes.size() != size) { banscore = 100; error = "invalid-ava-response-size"; return false; } for (size_t i = 0; i < size; i++) { if (invs[i].hash != votes[i].GetHash()) { banscore = 100; error = "invalid-ava-response-content"; return false; } } std::map responseIndex; std::map responseProof; // At this stage we are certain that invs[i] matches votes[i], so we can use // the inv type to retrieve what is being voted on. for (size_t i = 0; i < size; i++) { if (invs[i].IsMsgBlk()) { CBlockIndex *pindex; { LOCK(cs_main); pindex = g_chainman.m_blockman.LookupBlockIndex( BlockHash(votes[i].GetHash())); if (!pindex) { // This should not happen, but just in case... continue; } if (!isWorthPolling(pindex)) { // There is no point polling this block. continue; } } responseIndex.insert(std::make_pair(pindex, votes[i])); } if (invs[i].IsMsgProof()) { const ProofId proofid(votes[i].GetHash()); ProofRef proof; { LOCK(cs_peerManager); proof = peerManager->getProof(proofid); if (!proof) { continue; } if (!isWorthPolling(proof)) { continue; } } responseProof.insert(std::make_pair(proof, votes[i])); } } // Thanks to C++14 generic lambdas, we can apply the same logic to various // parameter types sharing the same interface. auto registerVoteItems = [&](auto voteRecordsWriteView, auto &updates, auto responseItems) { // Register votes. for (const auto &p : responseItems) { auto item = p.first; const Vote &v = p.second; auto it = voteRecordsWriteView->find(item); if (it == voteRecordsWriteView.end()) { // We are not voting on that item anymore. continue; } auto &vr = it->second; if (!vr.registerVote(nodeid, v.GetError())) { if (vr.isStale(staleVoteThreshold, staleVoteFactor)) { updates.emplace_back(item, VoteStatus::Stale); // Just drop stale votes. If we see this item again, we'll // do a new vote. voteRecordsWriteView->erase(it); } // This vote did not provide any extra information, move on. continue; } if (!vr.hasFinalized()) { // This item has note been finalized, so we have nothing more to // do. updates.emplace_back(item, vr.isAccepted() ? VoteStatus::Accepted : VoteStatus::Rejected); continue; } // We just finalized a vote. If it is valid, then let the caller // know. Either way, remove the item from the map. updates.emplace_back(item, vr.isAccepted() ? VoteStatus::Finalized : VoteStatus::Invalid); voteRecordsWriteView->erase(it); } }; registerVoteItems(blockVoteRecords.getWriteView(), blockUpdates, responseIndex); registerVoteItems(proofVoteRecords.getWriteView(), proofUpdates, responseProof); return true; } CPubKey Processor::getSessionPubKey() const { return sessionKey.GetPubKey(); } bool Processor::sendHello(CNode *pfrom) const { if (!peerData) { // We do not have a delegation to advertise. return false; } // Now let's sign! SchnorrSig sig; { const uint256 hash = buildLocalSighash(pfrom); if (!sessionKey.SignSchnorr(hash, sig)) { return false; } } connman->PushMessage(pfrom, CNetMsgMaker(pfrom->GetCommonVersion()) .Make(NetMsgType::AVAHELLO, Hello(peerData->delegation, sig))); pfrom->AddKnownProof(peerData->delegation.getProofId()); return true; } ProofRef Processor::getLocalProof() const { return peerData ? peerData->proof : ProofRef(); } bool Processor::startEventLoop(CScheduler &scheduler) { return eventLoop.startEventLoop( scheduler, [this]() { this->runEventLoop(); }, AVALANCHE_TIME_STEP); } bool Processor::stopEventLoop() { return eventLoop.stopEventLoop(); } void Processor::avaproofsSent(NodeId nodeid) { if (::ChainstateActive().IsInitialBlockDownload()) { // Before IBD is complete there is no way to make sure a proof is valid // or not, e.g. it can be spent in a block we don't know yet. In order // to increase confidence that our proof set is similar to other nodes // on the network, the messages received during IBD are not accounted. return; } LOCK(cs_peerManager); if (peerManager->latchAvaproofsSent(nodeid)) { avaproofsNodeCounter++; } } /* * Returns a bool indicating whether we have a usable Avalanche quorum enabling * us to take decisions based on polls. */ bool Processor::isQuorumEstablished() { if (quorumIsEstablished) { return true; } // Don't do Avalanche while node is IBD'ing if (::ChainstateActive().IsInitialBlockDownload()) { return false; } if (avaproofsNodeCounter < minAvaproofsNodeCount) { return false; } auto localProof = getLocalProof(); // Get the registered proof score and registered score we have nodes for uint32_t totalPeersScore; uint32_t connectedPeersScore; { LOCK(cs_peerManager); totalPeersScore = peerManager->getTotalPeersScore(); connectedPeersScore = peerManager->getConnectedPeersScore(); // Consider that we are always connected to our proof, even if we are // the single node using that proof. if (localProof && peerManager->forPeer(localProof->getId(), [](const Peer &peer) { return peer.node_count == 0; })) { connectedPeersScore += localProof->getScore(); } } // Ensure enough is being staked overall if (totalPeersScore < minQuorumScore) { return false; } // Ensure we have connected score for enough of the overall score uint32_t minConnectedScore = std::round(double(totalPeersScore) * minQuorumConnectedScoreRatio); if (connectedPeersScore < minConnectedScore) { return false; } quorumIsEstablished = true; return true; } void Processor::FinalizeNode(const Config &config, const CNode &node, bool &update_connection_time) { WITH_LOCK(cs_peerManager, peerManager->removeNode(node.GetId())); } void Processor::runEventLoop() { // Don't poll if quorum hasn't been established yet if (!isQuorumEstablished()) { return; } // First things first, check if we have requests that timed out and clear // them. clearTimedoutRequests(); // Make sure there is at least one suitable node to query before gathering // invs. NodeId nodeid = getSuitableNodeToQuery(); if (nodeid == NO_NODE) { return; } std::vector invs = getInvsForNextPoll(); if (invs.empty()) { return; } do { /** * If we lost contact to that node, then we remove it from nodeids, but * never add the request to queries, which ensures bad nodes get cleaned * up over time. */ bool hasSent = connman->ForNode(nodeid, [this, &invs](CNode *pnode) { uint64_t current_round = round++; { // Compute the time at which this requests times out. auto timeout = std::chrono::steady_clock::now() + queryTimeoutDuration; // Register the query. queries.getWriteView()->insert( {pnode->GetId(), current_round, timeout, invs}); // Set the timeout. LOCK(cs_peerManager); peerManager->updateNextRequestTime(pnode->GetId(), timeout); } pnode->m_avalanche_state->invsPolled(invs.size()); // Send the query to the node. connman->PushMessage( pnode, CNetMsgMaker(pnode->GetCommonVersion()) .Make(NetMsgType::AVAPOLL, Poll(current_round, std::move(invs)))); return true; }); // Success! if (hasSent) { return; } { // This node is obsolete, delete it. LOCK(cs_peerManager); peerManager->removeNode(nodeid); } // Get next suitable node to try again nodeid = getSuitableNodeToQuery(); } while (nodeid != NO_NODE); } void Processor::clearTimedoutRequests() { auto now = std::chrono::steady_clock::now(); std::map timedout_items{}; { // Clear expired requests. auto w = queries.getWriteView(); auto it = w->get().begin(); while (it != w->get().end() && it->timeout < now) { for (const auto &i : it->invs) { timedout_items[i]++; } w->get().erase(it++); } } if (timedout_items.empty()) { return; } auto clearInflightRequest = [&](auto &voteRecords, const auto &voteItem, uint8_t count) { if (!voteItem) { return false; } auto voteRecordsWriteView = voteRecords.getWriteView(); auto it = voteRecordsWriteView->find(voteItem); if (it == voteRecordsWriteView.end()) { return false; } it->second.clearInflightRequest(count); return true; }; // In flight request accounting. for (const auto &p : timedout_items) { const CInv &inv = p.first; if (inv.IsMsgBlk()) { const CBlockIndex *pindex = WITH_LOCK( cs_main, return g_chainman.m_blockman.LookupBlockIndex( BlockHash(inv.hash))); if (!clearInflightRequest(blockVoteRecords, pindex, p.second)) { continue; } } if (inv.IsMsgProof()) { const ProofRef proof = WITH_LOCK(cs_peerManager, return peerManager->getProof(ProofId(inv.hash))); if (!clearInflightRequest(proofVoteRecords, proof, p.second)) { continue; } } } } std::vector Processor::getInvsForNextPoll(bool forPoll) { std::vector invs; // Use NO_THREAD_SAFETY_ANALYSIS to avoid false positive due to // isWorthPolling requiring a different lock depending of the prototype. auto removeItemsNotWorthPolling = [&](auto &itemVoteRecords) NO_THREAD_SAFETY_ANALYSIS { auto w = itemVoteRecords.getWriteView(); for (auto it = w->begin(); it != w->end();) { if (!isWorthPolling(it->first)) { it = w->erase(it); } else { ++it; } } }; auto extractVoteRecordsToInvs = [&](const auto &itemVoteRecordRange, auto buildInvFromVoteItem) { for (const auto &[item, voteRecord] : itemVoteRecordRange) { if (invs.size() >= AVALANCHE_MAX_ELEMENT_POLL) { // Make sure we do not produce more invs than specified by the // protocol. return true; } const bool shouldPoll = forPoll ? voteRecord.registerPoll() : voteRecord.shouldPoll(); if (!shouldPoll) { continue; } invs.emplace_back(buildInvFromVoteItem(item)); } return invs.size() >= AVALANCHE_MAX_ELEMENT_POLL; }; // First remove all proofs that are not worth polling. WITH_LOCK(cs_peerManager, removeItemsNotWorthPolling(proofVoteRecords)); if (extractVoteRecordsToInvs(proofVoteRecords.getReadView(), [](const ProofRef &proof) { return CInv(MSG_AVA_PROOF, proof->getId()); })) { // The inventory vector is full, we're done return invs; } // First remove all blocks that are not worth polling. WITH_LOCK(cs_main, removeItemsNotWorthPolling(blockVoteRecords)); auto r = blockVoteRecords.getReadView(); extractVoteRecordsToInvs(reverse_iterate(r), [](const CBlockIndex *pindex) { return CInv(MSG_BLOCK, pindex->GetBlockHash()); }); return invs; } NodeId Processor::getSuitableNodeToQuery() { LOCK(cs_peerManager); return peerManager->selectNode(); } uint256 Processor::buildLocalSighash(CNode *pfrom) const { CHashWriter hasher(SER_GETHASH, 0); hasher << peerData->delegation.getId(); hasher << pfrom->GetLocalNonce(); hasher << pfrom->nRemoteHostNonce; hasher << pfrom->GetLocalExtraEntropy(); hasher << pfrom->nRemoteExtraEntropy; return hasher.GetHash(); } bool Processor::isWorthPolling(const CBlockIndex *pindex) const { AssertLockHeld(cs_main); if (pindex->nStatus.isInvalid()) { // No point polling invalid blocks. return false; } if (::ChainstateActive().IsBlockFinalized(pindex)) { // There is no point polling finalized block. return false; } return true; } bool Processor::isWorthPolling(const ProofRef &proof) const { AssertLockHeld(cs_peerManager); if (!gArgs.GetBoolArg("-enableavalancheproofreplacement", AVALANCHE_DEFAULT_PROOF_REPLACEMENT_ENABLED)) { // If proof replacement is not enabled there is no point dealing // with proof polling, so we're done. return false; } const ProofId &proofid = proof->getId(); // No point polling orphans or discarded proofs return peerManager->isBoundToPeer(proofid) || peerManager->isInConflictingPool(proofid); } } // namespace avalanche diff --git a/src/avalanche/proofpool.cpp b/src/avalanche/proofpool.cpp index 81ad06bef..0379e2f9b 100644 --- a/src/avalanche/proofpool.cpp +++ b/src/avalanche/proofpool.cpp @@ -1,138 +1,141 @@ // Copyright (c) 2021 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include #include #include #include namespace avalanche { ProofPool::AddProofStatus ProofPool::addProofIfNoConflict(const ProofRef &proof, ConflictingProofSet &conflictingProofs) { const ProofId &proofid = proof->getId(); // Make sure the set is empty before we add items conflictingProofs.clear(); auto &poolView = pool.get(); if (poolView.find(proofid) != poolView.end()) { return AddProofStatus::DUPLICATED; } // Attach UTXOs to this proof. for (size_t i = 0; i < proof->getStakes().size(); i++) { auto p = pool.emplace(i, proof); if (!p.second) { // We have a collision with an existing proof. conflictingProofs.insert(p.first->proof); } } // If there is a conflict, just cleanup the mess. if (conflictingProofs.size() > 0) { for (const auto &s : proof->getStakes()) { auto it = pool.find(s.getStake().getUTXO()); assert(it != pool.end()); // We need to delete that one. if (it->proof->getId() == proofid) { pool.erase(it); } } return AddProofStatus::REJECTED; } cacheClean = false; return AddProofStatus::SUCCEED; } ProofPool::AddProofStatus ProofPool::addProofIfPreferred(const ProofRef &proof, ConflictingProofSet &conflictingProofs) { auto status = addProofIfNoConflict(proof, conflictingProofs); // In case the proof was rejected due to conflict and it is the best // candidate, override the conflicting ones and add it again if (status != AddProofStatus::REJECTED || ConflictingProofComparator()(*conflictingProofs.begin(), proof)) { return status; } for (auto &conflictingProof : conflictingProofs) { removeProof(conflictingProof->getId()); } status = addProofIfNoConflict(proof); assert(status == AddProofStatus::SUCCEED); cacheClean = false; return AddProofStatus::SUCCEED; } // Having the ProofId passed by reference is risky because it is usually a // reference to a proof member. This proof will be deleted during the erasure // loop so we pass it by value. bool ProofPool::removeProof(ProofId proofid) { cacheClean = false; auto &poolView = pool.get(); return poolView.erase(proofid); } -void ProofPool::rescan(PeerManager &peerManager) { +std::unordered_set +ProofPool::rescan(PeerManager &peerManager) { auto previousPool = std::move(pool); pool.clear(); cacheClean = false; std::unordered_set registeredProofs; for (auto &entry : previousPool) { if (registeredProofs.insert(entry.proof).second) { peerManager.registerProof(entry.proof); } } + + return registeredProofs; } ProofRef ProofPool::getProof(const ProofId &proofid) const { auto &poolView = pool.get(); auto it = poolView.find(proofid); return it == poolView.end() ? ProofRef() : it->proof; } ProofRef ProofPool::getProof(const COutPoint &outpoint) const { auto it = pool.find(outpoint); return it == pool.end() ? ProofRef() : it->proof; } ProofRef ProofPool::getLowestScoreProof() const { auto &poolView = pool.get(); return poolView.rbegin() == poolView.rend() ? ProofRef() : poolView.rbegin()->proof; } size_t ProofPool::countProofs() { if (cacheClean) { return cacheProofCount; } size_t count = 0; ProofId lastProofId; auto &poolView = pool.get(); for (auto it = poolView.begin(); it != poolView.end(); it++) { const ProofId &proofId = it->proof->getId(); if (lastProofId != proofId) { lastProofId = proofId; count++; } } cacheProofCount = count; cacheClean = true; return cacheProofCount; } } // namespace avalanche diff --git a/src/avalanche/proofpool.h b/src/avalanche/proofpool.h index ba027141b..5ddf7bf55 100644 --- a/src/avalanche/proofpool.h +++ b/src/avalanche/proofpool.h @@ -1,124 +1,125 @@ // Copyright (c) 2021 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #ifndef BITCOIN_AVALANCHE_PROOFPOOL_H #define BITCOIN_AVALANCHE_PROOFPOOL_H #include #include #include #include #include #include #include #include #include #include #include namespace avalanche { class PeerManager; struct ProofPoolEntry { size_t utxoIndex; ProofRef proof; const COutPoint &getUTXO() const { return proof->getStakes().at(utxoIndex).getStake().getUTXO(); } ProofPoolEntry(size_t _utxoIndex, ProofRef _proof) : utxoIndex(_utxoIndex), proof(std::move(_proof)) {} }; struct by_utxo; struct by_proofid; struct by_proof_score; struct ProofPoolEntryProofIdKeyExtractor { using result_type = ProofId; result_type operator()(const ProofPoolEntry &entry) const { return entry.proof->getId(); } }; namespace bmi = boost::multi_index; /** * Map a proof to each utxo. A proof can be mapped with several utxos. */ class ProofPool { boost::multi_index_container< ProofPoolEntry, bmi::indexed_by< // index by utxo bmi::hashed_unique< bmi::tag, bmi::const_mem_fun, SaltedOutpointHasher>, // index by proofid bmi::hashed_non_unique, ProofPoolEntryProofIdKeyExtractor, SaltedProofIdHasher>, // index by proof score bmi::ordered_non_unique< bmi::tag, bmi::member, ProofComparatorByScore>>> pool; bool cacheClean = true; size_t cacheProofCount = 0; public: enum AddProofStatus { REJECTED = 0, //!< Rejected due to conflicts SUCCEED = 1, //!< Added successfully DUPLICATED = 2, //!< Already in pool }; using ConflictingProofSet = std::set; /** * Attempt to add a proof to the pool, and fail if there is a conflict on * any UTXO. */ AddProofStatus addProofIfNoConflict(const ProofRef &proof, ConflictingProofSet &conflictingProofs); AddProofStatus addProofIfNoConflict(const ProofRef &proof) { ConflictingProofSet dummy; return addProofIfNoConflict(proof, dummy); } /** * Attempt to add a proof to the pool. In case there is a conflict with one * or more UTXO, the proof is only added if it is the best candidate over * all the conflicting proofs according to ConflictingProofComparator. */ AddProofStatus addProofIfPreferred(const ProofRef &proof, ConflictingProofSet &conflictingProofs); AddProofStatus addProofIfPreferred(const ProofRef &proof) { ConflictingProofSet dummy; return addProofIfPreferred(proof, dummy); } bool removeProof(ProofId proofid); - void rescan(PeerManager &peerManager); + std::unordered_set + rescan(PeerManager &peerManager); ProofRef getProof(const ProofId &proofid) const; ProofRef getProof(const COutPoint &outpoint) const; ProofRef getLowestScoreProof() const; size_t size() const { return pool.size(); } size_t countProofs(); }; } // namespace avalanche #endif // BITCOIN_AVALANCHE_PROOFPOOL_H diff --git a/test/functional/abc_p2p_avalanche_proof_voting.py b/test/functional/abc_p2p_avalanche_proof_voting.py index edd6a3b70..d7b027fd8 100755 --- a/test/functional/abc_p2p_avalanche_proof_voting.py +++ b/test/functional/abc_p2p_avalanche_proof_voting.py @@ -1,483 +1,509 @@ #!/usr/bin/env python3 # Copyright (c) 2021 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """Test the resolution of conflicting proofs via avalanche.""" import time from test_framework.avatools import ( avalanche_proof_from_hex, create_coinbase_stakes, gen_proof, get_ava_p2p_interface, get_proof_ids, ) from test_framework.key import ECPubKey from test_framework.messages import ( MSG_AVA_PROOF, AvalancheProofVoteResponse, AvalancheVote, ) from test_framework.test_framework import BitcoinTestFramework from test_framework.util import ( assert_equal, assert_greater_than, assert_raises_rpc_error, try_rpc, ) from test_framework.wallet_util import bytes_to_wif QUORUM_NODE_COUNT = 16 class AvalancheProofVotingTest(BitcoinTestFramework): def set_test_params(self): self.setup_clean_chain = True self.num_nodes = 1 self.conflicting_proof_cooldown = 100 self.peer_replacement_cooldown = 2000 self.extra_args = [ ['-enableavalanche=1', '-enableavalancheproofreplacement=1', '-avaproofstakeutxoconfirmations=1', f'-avalancheconflictingproofcooldown={self.conflicting_proof_cooldown}', f'-avalanchepeerreplacementcooldown={self.peer_replacement_cooldown}', '-avacooldown=0', '-avastalevotethreshold=140', '-avastalevotefactor=1'], ] self.supports_cli = False # Build a fake quorum of nodes. def get_quorum(self, node): quorum = [get_ava_p2p_interface(node) for _ in range(0, QUORUM_NODE_COUNT)] for n in quorum: success = node.addavalanchenode( n.nodeid, self.privkey.get_pubkey().get_bytes().hex(), self.quorum_proof.serialize().hex(), ) assert success is True return quorum def can_find_proof_in_poll(self, hash, response): found_hash = False for n in self.quorum: poll = n.get_avapoll_if_available() # That node has not received a poll if poll is None: continue # We got a poll, check for the hash and repond votes = [] for inv in poll.invs: # Vote yes to everything r = AvalancheProofVoteResponse.ACTIVE # Look for what we expect if inv.hash == hash: r = response found_hash = True votes.append(AvalancheVote(r, inv.hash)) n.send_avaresponse(poll.round, votes, self.privkey) return found_hash @staticmethod def send_proof(from_peer, proof_hex): proof = avalanche_proof_from_hex(proof_hex) from_peer.send_avaproof(proof) return proof.proofid def send_and_check_for_polling(self, peer, proof_hex, response=AvalancheProofVoteResponse.ACTIVE): proofid = self.send_proof(peer, proof_hex) self.wait_until(lambda: self.can_find_proof_in_poll(proofid, response)) def build_conflicting_proof(self, node, sequence): return node.buildavalancheproof( sequence, 0, self.privkey_wif, self.conflicting_stakes) def run_test(self): node = self.nodes[0] self.privkey, self.quorum_proof = gen_proof(node) self.privkey_wif = bytes_to_wif(self.privkey.get_bytes()) self.quorum = self.get_quorum(node) addrkey0 = node.get_deterministic_priv_key() blockhash = node.generatetoaddress(10, addrkey0.address) self.conflicting_stakes = create_coinbase_stakes( node, blockhash[5:], addrkey0.key) self.poll_tests(node) self.update_tests(node) self.vote_tests(node) self.stale_proof_tests(node) + self.unorphan_poll_tests(node) def poll_tests(self, node): proof_seq10 = self.build_conflicting_proof(node, 10) proof_seq20 = self.build_conflicting_proof(node, 20) proof_seq30 = self.build_conflicting_proof(node, 30) proof_seq40 = self.build_conflicting_proof(node, 40) orphan = node.buildavalancheproof( 100, 2000000000, self.privkey_wif, [{ 'txid': '0' * 64, 'vout': 0, 'amount': 10e6, 'height': 42, 'iscoinbase': False, 'privatekey': self.privkey_wif, }] ) no_stake = node.buildavalancheproof( 200, 2000000000, self.privkey_wif, [] ) # Get the key so we can verify signatures. avakey = ECPubKey() avakey.set(bytes.fromhex(node.getavalanchekey())) self.log.info("Trigger polling from the node...") peer = get_ava_p2p_interface(node) mock_time = int(time.time()) node.setmocktime(mock_time) self.log.info("Check we poll for valid proof") self.send_and_check_for_polling(peer, proof_seq30) self.log.info( "Check we don't poll for subsequent proofs if the cooldown is not elapsed, proof not the favorite") with node.assert_debug_log(["Not polling the avalanche proof (cooldown-not-elapsed)"]): peer.send_avaproof(avalanche_proof_from_hex(proof_seq20)) self.log.info( "Check we don't poll for subsequent proofs if the cooldown is not elapsed, proof is the favorite") with node.assert_debug_log(["Not polling the avalanche proof (cooldown-not-elapsed)"]): peer.send_avaproof(avalanche_proof_from_hex(proof_seq40)) self.log.info( "Check we poll for conflicting proof if the proof is not the favorite") mock_time += self.conflicting_proof_cooldown node.setmocktime(mock_time) self.send_and_check_for_polling( peer, proof_seq20, response=AvalancheProofVoteResponse.REJECTED) self.log.info( "Check we poll for conflicting proof if the proof is the favorite") mock_time += self.conflicting_proof_cooldown node.setmocktime(mock_time) self.send_and_check_for_polling(peer, proof_seq40) mock_time += self.conflicting_proof_cooldown node.setmocktime(mock_time) self.log.info("Check we don't poll for orphans") with node.assert_debug_log(["Not polling the avalanche proof (orphan-proof)"]): peer.send_avaproof(avalanche_proof_from_hex(orphan)) self.log.info("Check we don't poll for proofs that get rejected") with node.assert_debug_log(["Not polling the avalanche proof (rejected-proof)"]): peer.send_avaproof(avalanche_proof_from_hex(proof_seq10)) self.log.info("Check we don't poll for invalid proofs and get banned") with node.assert_debug_log(["Misbehaving", "invalid-proof"]): peer.send_avaproof(avalanche_proof_from_hex(no_stake)) peer.wait_for_disconnect() self.log.info("We don't poll for proofs if replacement is disabled") self.restart_node( 0, extra_args=self.extra_args[0] + ['-enableavalancheproofreplacement=0']) peer = get_ava_p2p_interface(node) with node.assert_debug_log(["Not polling the avalanche proof (not-worth-polling)"]): peer.send_avaproof(avalanche_proof_from_hex(proof_seq10)) def update_tests(self, node): # Restart the node to get rid of in-flight requests self.restart_node(0) mock_time = int(time.time()) node.setmocktime(mock_time) self.quorum = self.get_quorum(node) peer = get_ava_p2p_interface(node) proof_seq30 = self.build_conflicting_proof(node, 30) proof_seq40 = self.build_conflicting_proof(node, 40) proof_seq50 = self.build_conflicting_proof(node, 50) proofid_seq30 = avalanche_proof_from_hex(proof_seq30).proofid proofid_seq40 = avalanche_proof_from_hex(proof_seq40).proofid proofid_seq50 = avalanche_proof_from_hex(proof_seq50).proofid node.sendavalancheproof(proof_seq40) self.wait_until(lambda: proofid_seq40 in get_proof_ids(node)) assert proofid_seq40 in get_proof_ids(node) assert proofid_seq30 not in get_proof_ids(node) self.log.info("Test proof acceptance") def accept_proof(proofid): self.wait_until(lambda: self.can_find_proof_in_poll( proofid, response=AvalancheProofVoteResponse.ACTIVE), timeout=5) return proofid in get_proof_ids(node) mock_time += self.conflicting_proof_cooldown node.setmocktime(mock_time) self.send_and_check_for_polling(peer, proof_seq30) # Let the quorum vote for it self.wait_until(lambda: accept_proof(proofid_seq30)) assert proofid_seq40 not in get_proof_ids(node) self.log.info("Test the peer replacement rate limit") # Wait until proof_seq30 is finalized retry = 5 while retry > 0: try: with node.assert_debug_log([f"Avalanche finalized proof {proofid_seq30:0{64}x}"]): self.wait_until(lambda: not self.can_find_proof_in_poll( proofid_seq30, response=AvalancheProofVoteResponse.ACTIVE)) break except AssertionError: retry -= 1 assert_greater_than(retry, 0) # Not enough assert self.conflicting_proof_cooldown < self.peer_replacement_cooldown mock_time += self.conflicting_proof_cooldown node.setmocktime(mock_time) peer = get_ava_p2p_interface(node) with node.assert_debug_log(["Not polling the avalanche proof (cooldown-not-elapsed)"]): self.send_proof(peer, proof_seq50) mock_time += self.peer_replacement_cooldown node.setmocktime(mock_time) self.log.info("Test proof rejection") self.send_proof(peer, proof_seq50) self.wait_until(lambda: proofid_seq50 in get_proof_ids(node)) assert proofid_seq40 not in get_proof_ids(node) def reject_proof(proofid): self.wait_until( lambda: self.can_find_proof_in_poll( proofid, response=AvalancheProofVoteResponse.REJECTED)) return proofid not in get_proof_ids(node) self.wait_until(lambda: reject_proof(proofid_seq50)) assert proofid_seq50 not in get_proof_ids(node) assert proofid_seq40 in get_proof_ids(node) self.log.info("Test proof invalidation") def invalidate_proof(proofid): self.wait_until( lambda: self.can_find_proof_in_poll( proofid, response=AvalancheProofVoteResponse.REJECTED)) return try_rpc(-8, "Proof not found", node.getrawavalancheproof, f"{proofid:0{64}x}") self.wait_until(lambda: invalidate_proof(proofid_seq50)) self.log.info("The node will now ignore the invalid proof") for i in range(5): with node.assert_debug_log(["received: avaproof"]): self.send_proof(peer, proof_seq50) assert_raises_rpc_error(-8, "Proof not found", node.getrawavalancheproof, f"{proofid_seq50:0{64}x}") def vote_tests(self, node): self.restart_node(0, extra_args=['-enableavalanche=1', '-avaproofstakeutxoconfirmations=1', '-avacooldown=0', '-avalancheconflictingproofcooldown=0', '-whitelist=noban@127.0.0.1', ]) ava_node = get_ava_p2p_interface(node) # Generate coinbases to use for stakes stakes_key = node.get_deterministic_priv_key() blocks = node.generatetoaddress(4, stakes_key.address) # Get the ava key so we can verify signatures. ava_key = ECPubKey() ava_key.set(bytes.fromhex(node.getavalanchekey())) def create_proof(stakes): proof = node.buildavalancheproof(11, 12, self.privkey_wif, stakes) proof_id = avalanche_proof_from_hex(proof).proofid return proof, proof_id # proof_0 is valid right now stakes_0 = create_coinbase_stakes(node, [blocks[0]], stakes_key.key) proof_0, proof_0_id = create_proof(stakes_0) # proof_1 is valid right now, and from different stakes stakes_1 = create_coinbase_stakes(node, [blocks[1]], stakes_key.key) proof_1, proof_1_id = create_proof(stakes_1) # proof_2 is an orphan because the stake UTXO is unknown stakes_2 = create_coinbase_stakes(node, [blocks[2]], stakes_key.key) stakes_2[0]['height'] = 5 proof_2, proof_2_id = create_proof(stakes_2) # proof_3 conflicts with proof_0 and proof_1 stakes_3 = create_coinbase_stakes( node, [blocks[0], blocks[1]], stakes_key.key) proof_3, proof_3_id = create_proof(stakes_3) # proof_4 is invalid and should be rejected stakes_4 = create_coinbase_stakes(node, [blocks[3]], stakes_key.key) stakes_4[0]['amount'] -= 100000 proof_4, proof_4_id = create_proof(stakes_4) # Create a helper to issue a poll and validate the responses def poll_assert_response(expected): # Issue a poll for each proof self.log.info("Trigger polling from the node...") ava_node.send_poll( [proof_0_id, proof_1_id, proof_2_id, proof_3_id, proof_4_id], MSG_AVA_PROOF) response = ava_node.wait_for_avaresponse() r = response.response # Verify signature assert ava_key.verify_schnorr(response.sig, r.get_hash()) # Verify votes votes = r.votes assert_equal(len(votes), len(expected)) for i in range(0, len(votes)): assert_equal(repr(votes[i]), repr(expected[i])) # Check that all proofs start unknown poll_assert_response([ AvalancheVote(AvalancheProofVoteResponse.UNKNOWN, proof_0_id), AvalancheVote(AvalancheProofVoteResponse.UNKNOWN, proof_1_id), AvalancheVote(AvalancheProofVoteResponse.UNKNOWN, proof_2_id), AvalancheVote(AvalancheProofVoteResponse.UNKNOWN, proof_3_id), AvalancheVote(AvalancheProofVoteResponse.UNKNOWN, proof_4_id)]) # Send the first proof. Nodes should now respond that it's accepted node.sendavalancheproof(proof_0) poll_assert_response([ AvalancheVote(AvalancheProofVoteResponse.ACTIVE, proof_0_id), AvalancheVote(AvalancheProofVoteResponse.UNKNOWN, proof_1_id), AvalancheVote(AvalancheProofVoteResponse.UNKNOWN, proof_2_id), AvalancheVote(AvalancheProofVoteResponse.UNKNOWN, proof_3_id), AvalancheVote(AvalancheProofVoteResponse.UNKNOWN, proof_4_id)]) # Send and check the 2nd proof. Nodes should now respond that it's # accepted node.sendavalancheproof(proof_1) poll_assert_response([ AvalancheVote(AvalancheProofVoteResponse.ACTIVE, proof_0_id), AvalancheVote(AvalancheProofVoteResponse.ACTIVE, proof_1_id), AvalancheVote(AvalancheProofVoteResponse.UNKNOWN, proof_2_id), AvalancheVote(AvalancheProofVoteResponse.UNKNOWN, proof_3_id), AvalancheVote(AvalancheProofVoteResponse.UNKNOWN, proof_4_id)]) # The next proof should be rejected/put in the orphan pool ava_node.send_proof(avalanche_proof_from_hex(proof_2)) poll_assert_response([ AvalancheVote(AvalancheProofVoteResponse.ACTIVE, proof_0_id), AvalancheVote(AvalancheProofVoteResponse.ACTIVE, proof_1_id), AvalancheVote(AvalancheProofVoteResponse.ORPHAN, proof_2_id), AvalancheVote(AvalancheProofVoteResponse.UNKNOWN, proof_3_id), AvalancheVote(AvalancheProofVoteResponse.UNKNOWN, proof_4_id)]) # The next proof should be rejected and marked as a conflicting proof assert_raises_rpc_error(-8, "The proof has conflicting utxo with an existing proof", node.sendavalancheproof, proof_3) poll_assert_response([ AvalancheVote(AvalancheProofVoteResponse.ACTIVE, proof_0_id), AvalancheVote(AvalancheProofVoteResponse.ACTIVE, proof_1_id), AvalancheVote(AvalancheProofVoteResponse.ORPHAN, proof_2_id), AvalancheVote(AvalancheProofVoteResponse.CONFLICT, proof_3_id), AvalancheVote(AvalancheProofVoteResponse.UNKNOWN, proof_4_id)]) # The final proof should be permanently rejected for being completely # invalid ava_node.send_proof(avalanche_proof_from_hex(proof_4)) poll_assert_response([ AvalancheVote(AvalancheProofVoteResponse.ACTIVE, proof_0_id), AvalancheVote(AvalancheProofVoteResponse.ACTIVE, proof_1_id), AvalancheVote(AvalancheProofVoteResponse.ORPHAN, proof_2_id), AvalancheVote(AvalancheProofVoteResponse.CONFLICT, proof_3_id), AvalancheVote(AvalancheProofVoteResponse.REJECTED, proof_4_id)]) def stale_proof_tests(self, node): # Restart the node to get rid of in-flight requests self.restart_node(0) mock_time = int(time.time()) node.setmocktime(mock_time) self.quorum = self.get_quorum(node) peer = get_ava_p2p_interface(node) proof_seq1 = self.build_conflicting_proof(node, 1) proof_seq2 = self.build_conflicting_proof(node, 2) proofid_seq1 = avalanche_proof_from_hex(proof_seq1).proofid proofid_seq2 = avalanche_proof_from_hex(proof_seq2).proofid node.sendavalancheproof(proof_seq2) self.wait_until(lambda: proofid_seq2 in get_proof_ids(node)) assert proofid_seq2 in get_proof_ids(node) assert proofid_seq1 not in get_proof_ids(node) mock_time += self.conflicting_proof_cooldown node.setmocktime(mock_time) peer.send_avaproof(avalanche_proof_from_hex(proof_seq1)) # Wait until proof_seq1 voting goes stale retry = 5 while retry > 0: try: with node.assert_debug_log([f"Avalanche stalled proof {proofid_seq1:0{64}x}"]): self.wait_until(lambda: not self.can_find_proof_in_poll( proofid_seq1, response=AvalancheProofVoteResponse.UNKNOWN), timeout=10) break except AssertionError: retry -= 1 assert_greater_than(retry, 0) # Verify that proof_seq2 was not replaced assert proofid_seq2 in get_proof_ids(node) assert proofid_seq1 not in get_proof_ids(node) # When polled, peer responds with expected votes for both proofs peer.send_poll([proofid_seq1, proofid_seq2], MSG_AVA_PROOF) response = peer.wait_for_avaresponse() assert repr(response.response.votes) == repr([ AvalancheVote(AvalancheProofVoteResponse.UNKNOWN, proofid_seq1), AvalancheVote(AvalancheProofVoteResponse.ACTIVE, proofid_seq2)]) + def unorphan_poll_tests(self, node): + # Restart the node with appropriate flags for this test + self.restart_node(0, extra_args=[ + '-enableavalanche=1', + '-enableavalancheproofreplacement=1', + '-avaproofstakeutxoconfirmations=2', + '-avalancheconflictingproofcooldown=0', + '-avacooldown=0', + ]) + + self.quorum = self.get_quorum(node) + peer = get_ava_p2p_interface(node) + + _, immature_proof = gen_proof(node) + + self.log.info("Orphan proofs are not polled") + + with node.assert_debug_log(["Not polling the avalanche proof (orphan-proof)"]): + peer.send_avaproof(immature_proof) + + self.log.info("Unorphaned proofs are polled") + + node.generate(1) + self.send_and_check_for_polling(peer, immature_proof.serialize().hex()) + if __name__ == '__main__': AvalancheProofVotingTest().main()