diff --git a/src/avalanche/processor.cpp b/src/avalanche/processor.cpp index 5ad13a0b8..7212e32a2 100644 --- a/src/avalanche/processor.cpp +++ b/src/avalanche/processor.cpp @@ -1,806 +1,808 @@ // Copyright (c) 2018-2019 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include #include #include #include #include #include #include #include // For DecodeSecret #include #include #include #include #include #include #include #include #include #include /** * Run the avalanche event loop every 10ms. */ static constexpr std::chrono::milliseconds AVALANCHE_TIME_STEP{10}; // Unfortunately, the bitcoind codebase is full of global and we are kinda // forced into it here. std::unique_ptr g_avalanche; namespace avalanche { static bool IsWorthPolling(const CBlockIndex *pindex) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { AssertLockHeld(cs_main); if (pindex->nStatus.isInvalid()) { // No point polling invalid blocks. return false; } if (::ChainstateActive().IsBlockFinalized(pindex)) { // There is no point polling finalized block. return false; } return true; } static bool VerifyProof(const Proof &proof, bilingual_str &error) { ProofValidationState proof_state; if (!proof.verify(proof_state)) { switch (proof_state.GetResult()) { case ProofValidationResult::NO_STAKE: error = _("The avalanche proof has no stake."); return false; case ProofValidationResult::DUST_THRESOLD: error = _("The avalanche proof stake is too low."); return false; case ProofValidationResult::DUPLICATE_STAKE: error = _("The avalanche proof has duplicated stake."); return false; case ProofValidationResult::INVALID_STAKE_SIGNATURE: error = _("The avalanche proof has invalid stake signatures."); return false; case ProofValidationResult::TOO_MANY_UTXOS: error = strprintf( _("The avalanche proof has too many utxos (max: %u)."), AVALANCHE_MAX_PROOF_STAKES); return false; default: error = _("The avalanche proof is invalid."); return false; } } return true; } static bool VerifyDelegation(const Delegation &dg, const CPubKey &expectedPubKey, bilingual_str &error) { DelegationState dg_state; CPubKey auth; if (!dg.verify(dg_state, auth)) { switch (dg_state.GetResult()) { case avalanche::DelegationResult::INVALID_SIGNATURE: error = _("The avalanche delegation has invalid signatures."); return false; default: error = _("The avalanche delegation is invalid."); return false; } } if (auth != expectedPubKey) { error = _( "The avalanche delegation does not match the expected public key."); return false; } return true; } struct Processor::PeerData { ProofRef proof; Delegation delegation; }; class Processor::NotificationsHandler : public interfaces::Chain::Notifications { Processor *m_processor; public: NotificationsHandler(Processor *p) : m_processor(p) {} void updatedBlockTip() override { LOCK(m_processor->cs_peerManager); - if (m_processor->peerData && m_processor->peerData->proof) { + if (m_processor->peerData && m_processor->peerData->proof && m_processor->peerManager->registerProof( - m_processor->peerData->proof); + m_processor->peerData->proof)) { + m_processor->peerManager->addUnbroadcastProof( + m_processor->peerData->proof->getId()); } m_processor->peerManager->updatedBlockTip(); } }; Processor::Processor(const ArgsManager &argsman, interfaces::Chain &chain, CConnman *connmanIn, std::unique_ptr peerDataIn, CKey sessionKeyIn, uint32_t minQuorumTotalScoreIn, double minQuorumConnectedScoreRatioIn) : connman(connmanIn), queryTimeoutDuration(argsman.GetArg( "-avatimeout", AVALANCHE_DEFAULT_QUERY_TIMEOUT.count())), round(0), peerManager(std::make_unique()), peerData(std::move(peerDataIn)), sessionKey(std::move(sessionKeyIn)), minQuorumScore(minQuorumTotalScoreIn), minQuorumConnectedScoreRatio(minQuorumConnectedScoreRatioIn) { // Make sure we get notified of chain state changes. chainNotificationsHandler = chain.handleNotifications(std::make_shared(this)); } Processor::~Processor() { chainNotificationsHandler.reset(); stopEventLoop(); } std::unique_ptr Processor::MakeProcessor(const ArgsManager &argsman, interfaces::Chain &chain, CConnman *connman, bilingual_str &error) { std::unique_ptr peerData; CKey masterKey; CKey sessionKey; if (argsman.IsArgSet("-avasessionkey")) { sessionKey = DecodeSecret(argsman.GetArg("-avasessionkey", "")); if (!sessionKey.IsValid()) { error = _("The avalanche session key is invalid."); return nullptr; } } else { // Pick a random key for the session. sessionKey.MakeNewKey(true); } if (argsman.IsArgSet("-avaproof")) { if (!argsman.IsArgSet("-avamasterkey")) { error = _( "The avalanche master key is missing for the avalanche proof."); return nullptr; } masterKey = DecodeSecret(argsman.GetArg("-avamasterkey", "")); if (!masterKey.IsValid()) { error = _("The avalanche master key is invalid."); return nullptr; } peerData = std::make_unique(); Proof proof; if (!Proof::FromHex(proof, argsman.GetArg("-avaproof", ""), error)) { // error is set by FromHex return nullptr; } peerData->proof = std::make_shared(std::move(proof)); if (!VerifyProof(*peerData->proof, error)) { // error is set by VerifyProof return nullptr; } std::unique_ptr dgb; const CPubKey &masterPubKey = masterKey.GetPubKey(); if (argsman.IsArgSet("-avadelegation")) { Delegation dg; if (!Delegation::FromHex(dg, argsman.GetArg("-avadelegation", ""), error)) { // error is set by FromHex() return nullptr; } if (dg.getProofId() != peerData->proof->getId()) { error = _("The delegation does not match the proof."); return nullptr; } if (masterPubKey != dg.getDelegatedPubkey()) { error = _( "The master key does not match the delegation public key."); return nullptr; } dgb = std::make_unique(dg); } else { if (masterPubKey != peerData->proof->getMaster()) { error = _("The master key does not match the proof public key."); return nullptr; } dgb = std::make_unique(*peerData->proof); } // Generate the delegation to the session key. const CPubKey sessionPubKey = sessionKey.GetPubKey(); if (sessionPubKey != masterPubKey) { if (!dgb->addLevel(masterKey, sessionPubKey)) { error = _("Failed to generate a delegation for this session."); return nullptr; } } peerData->delegation = dgb->build(); if (!VerifyDelegation(peerData->delegation, sessionPubKey, error)) { // error is set by VerifyDelegation return nullptr; } } // Determine quorum parameters Amount minQuorumStake = AVALANCHE_DEFAULT_MIN_QUORUM_STAKE; if (gArgs.IsArgSet("-avaminquorumstake") && !ParseMoney(gArgs.GetArg("-avaminquorumstake", ""), minQuorumStake)) { error = _("The avalanche min quorum stake amount is invalid."); return nullptr; } if (!MoneyRange(minQuorumStake)) { error = _("The avalanche min quorum stake amount is out of range."); return nullptr; } double minQuorumConnectedStakeRatio = AVALANCHE_DEFAULT_MIN_QUORUM_CONNECTED_STAKE_RATIO; if (gArgs.IsArgSet("-avaminquorumconnectedstakeratio") && !ParseDouble(gArgs.GetArg("-avaminquorumconnectedstakeratio", ""), &minQuorumConnectedStakeRatio)) { error = _("The avalanche min quorum connected stake ratio is invalid."); return nullptr; } if (minQuorumConnectedStakeRatio < 0 || minQuorumConnectedStakeRatio > 1) { error = _( "The avalanche min quorum connected stake ratio is out of range."); return nullptr; } // We can't use std::make_unique with a private constructor return std::unique_ptr(new Processor( argsman, chain, connman, std::move(peerData), std::move(sessionKey), Proof::amountToScore(minQuorumStake), minQuorumConnectedStakeRatio)); } bool Processor::addBlockToReconcile(const CBlockIndex *pindex) { bool isAccepted; { LOCK(cs_main); if (!IsWorthPolling(pindex)) { // There is no point polling this block. return false; } isAccepted = ::ChainActive().Contains(pindex); } return blockVoteRecords.getWriteView() ->insert(std::make_pair(pindex, VoteRecord(isAccepted))) .second; } void Processor::addProofToReconcile(const ProofRef &proof) { // TODO We don't want to accept an infinite number of conflicting proofs. // They should be some rules to make them expensive and/or limited by // design. const bool isAccepted = WITH_LOCK( cs_peerManager, return peerManager->isBoundToPeer(proof->getId())); proofVoteRecords.getWriteView()->insert( std::make_pair(proof, VoteRecord(isAccepted))); } bool Processor::isAccepted(const CBlockIndex *pindex) const { auto r = blockVoteRecords.getReadView(); auto it = r->find(pindex); if (it == r.end()) { return false; } return it->second.isAccepted(); } bool Processor::isAccepted(const ProofRef &proof) const { auto r = proofVoteRecords.getReadView(); auto it = r->find(proof); if (it == r.end()) { return false; } return it->second.isAccepted(); } int Processor::getConfidence(const CBlockIndex *pindex) const { auto r = blockVoteRecords.getReadView(); auto it = r->find(pindex); if (it == r.end()) { return -1; } return it->second.getConfidence(); } int Processor::getConfidence(const ProofRef &proof) const { auto r = proofVoteRecords.getReadView(); auto it = r->find(proof); if (it == r.end()) { return -1; } return it->second.getConfidence(); } namespace { /** * When using TCP, we need to sign all messages as the transport layer is * not secure. */ class TCPResponse { Response response; SchnorrSig sig; public: TCPResponse(Response responseIn, const CKey &key) : response(std::move(responseIn)) { CHashWriter hasher(SER_GETHASH, 0); hasher << response; const uint256 hash = hasher.GetHash(); // Now let's sign! if (!key.SignSchnorr(hash, sig)) { sig.fill(0); } } // serialization support SERIALIZE_METHODS(TCPResponse, obj) { READWRITE(obj.response, obj.sig); } }; } // namespace void Processor::sendResponse(CNode *pfrom, Response response) const { connman->PushMessage( pfrom, CNetMsgMaker(pfrom->GetCommonVersion()) .Make(NetMsgType::AVARESPONSE, TCPResponse(std::move(response), sessionKey))); } bool Processor::registerVotes(NodeId nodeid, const Response &response, std::vector &blockUpdates, std::vector &proofUpdates, int &banscore, std::string &error) { { // Save the time at which we can query again. LOCK(cs_peerManager); // FIXME: This will override the time even when we received an old stale // message. This should check that the message is indeed the most up to // date one before updating the time. peerManager->updateNextRequestTime( nodeid, std::chrono::steady_clock::now() + std::chrono::milliseconds(response.getCooldown())); } std::vector invs; { // Check that the query exists. auto w = queries.getWriteView(); auto it = w->find(std::make_tuple(nodeid, response.getRound())); if (it == w.end()) { banscore = 2; error = "unexpected-ava-response"; return false; } invs = std::move(it->invs); w->erase(it); } // Verify that the request and the vote are consistent. const std::vector &votes = response.GetVotes(); size_t size = invs.size(); if (votes.size() != size) { banscore = 100; error = "invalid-ava-response-size"; return false; } for (size_t i = 0; i < size; i++) { if (invs[i].hash != votes[i].GetHash()) { banscore = 100; error = "invalid-ava-response-content"; return false; } } std::map responseIndex; std::map responseProof; // At this stage we are certain that invs[i] matches votes[i], so we can use // the inv type to retrieve what is being voted on. for (size_t i = 0; i < size; i++) { if (invs[i].IsMsgBlk()) { CBlockIndex *pindex; { LOCK(cs_main); pindex = g_chainman.m_blockman.LookupBlockIndex( BlockHash(votes[i].GetHash())); if (!pindex) { // This should not happen, but just in case... continue; } if (!IsWorthPolling(pindex)) { // There is no point polling this block. continue; } } responseIndex.insert(std::make_pair(pindex, votes[i])); } if (invs[i].IsMsgProof()) { const ProofId proofid(votes[i].GetHash()); const ProofRef proof = WITH_LOCK( cs_peerManager, return peerManager->getProof(proofid)); if (!proof) { continue; } responseProof.insert(std::make_pair(proof, votes[i])); } } // Thanks to C++14 generic lambdas, we can apply the same logic to various // parameter types sharing the same interface. auto registerVoteItems = [&](auto voteRecordsWriteView, auto &updates, auto responseItems) { // Register votes. for (const auto &p : responseItems) { auto item = p.first; const Vote &v = p.second; auto it = voteRecordsWriteView->find(item); if (it == voteRecordsWriteView.end()) { // We are not voting on that item anymore. continue; } auto &vr = it->second; if (!vr.registerVote(nodeid, v.GetError())) { // This vote did not provide any extra information, move on. continue; } if (!vr.hasFinalized()) { // This item has note been finalized, so we have nothing more to // do. updates.emplace_back(item, vr.isAccepted() ? VoteStatus::Accepted : VoteStatus::Rejected); continue; } // We just finalized a vote. If it is valid, then let the caller // know. Either way, remove the item from the map. updates.emplace_back(item, vr.isAccepted() ? VoteStatus::Finalized : VoteStatus::Invalid); voteRecordsWriteView->erase(it); } }; registerVoteItems(blockVoteRecords.getWriteView(), blockUpdates, responseIndex); registerVoteItems(proofVoteRecords.getWriteView(), proofUpdates, responseProof); return true; } CPubKey Processor::getSessionPubKey() const { return sessionKey.GetPubKey(); } uint256 Processor::buildLocalSighash(CNode *pfrom) const { CHashWriter hasher(SER_GETHASH, 0); hasher << peerData->delegation.getId(); hasher << pfrom->GetLocalNonce(); hasher << pfrom->nRemoteHostNonce; hasher << pfrom->GetLocalExtraEntropy(); hasher << pfrom->nRemoteExtraEntropy; return hasher.GetHash(); } bool Processor::sendHello(CNode *pfrom) const { if (!peerData) { // We do not have a delegation to advertise. return false; } // Now let's sign! SchnorrSig sig; { const uint256 hash = buildLocalSighash(pfrom); if (!sessionKey.SignSchnorr(hash, sig)) { return false; } } connman->PushMessage(pfrom, CNetMsgMaker(pfrom->GetCommonVersion()) .Make(NetMsgType::AVAHELLO, Hello(peerData->delegation, sig))); pfrom->AddKnownProof(peerData->delegation.getProofId()); return true; } ProofRef Processor::getLocalProof() const { return peerData ? peerData->proof : nullptr; } bool Processor::startEventLoop(CScheduler &scheduler) { return eventLoop.startEventLoop( scheduler, [this]() { this->runEventLoop(); }, AVALANCHE_TIME_STEP); } bool Processor::stopEventLoop() { return eventLoop.stopEventLoop(); } std::vector Processor::getInvsForNextPoll(bool forPoll) { std::vector invs; auto extractVoteRecordsToInvs = [&](const auto &itemVoteRecordRange, auto buildInvFromVoteItem) { for (const auto &[item, voteRecord] : itemVoteRecordRange) { if (invs.size() >= AVALANCHE_MAX_ELEMENT_POLL) { // Make sure we do not produce more invs than specified by the // protocol. return true; } const bool shouldPoll = forPoll ? voteRecord.registerPoll() : voteRecord.shouldPoll(); if (!shouldPoll) { continue; } invs.emplace_back(buildInvFromVoteItem(item)); } return invs.size() >= AVALANCHE_MAX_ELEMENT_POLL; }; if (extractVoteRecordsToInvs(proofVoteRecords.getReadView(), [](const ProofRef &proof) { return CInv(MSG_AVA_PROOF, proof->getId()); })) { // The inventory vector is full, we're done return invs; } // First remove all blocks that are not worth polling. { LOCK(cs_main); auto w = blockVoteRecords.getWriteView(); for (auto it = w->begin(); it != w->end();) { const CBlockIndex *pindex = it->first; if (!IsWorthPolling(pindex)) { w->erase(it++); } else { ++it; } } } auto r = blockVoteRecords.getReadView(); extractVoteRecordsToInvs(reverse_iterate(r), [](const CBlockIndex *pindex) { return CInv(MSG_BLOCK, pindex->GetBlockHash()); }); return invs; } NodeId Processor::getSuitableNodeToQuery() { LOCK(cs_peerManager); return peerManager->selectNode(); } void Processor::clearTimedoutRequests() { auto now = std::chrono::steady_clock::now(); std::map timedout_items{}; { // Clear expired requests. auto w = queries.getWriteView(); auto it = w->get().begin(); while (it != w->get().end() && it->timeout < now) { for (const auto &i : it->invs) { timedout_items[i]++; } w->get().erase(it++); } } if (timedout_items.empty()) { return; } auto clearInflightRequest = [&](auto &voteRecords, const auto &voteItem, uint8_t count) { if (!voteItem) { return false; } auto voteRecordsWriteView = voteRecords.getWriteView(); auto it = voteRecordsWriteView->find(voteItem); if (it == voteRecordsWriteView.end()) { return false; } it->second.clearInflightRequest(count); return true; }; // In flight request accounting. for (const auto &p : timedout_items) { const CInv &inv = p.first; if (inv.IsMsgBlk()) { const CBlockIndex *pindex = WITH_LOCK( cs_main, return g_chainman.m_blockman.LookupBlockIndex( BlockHash(inv.hash))); if (!clearInflightRequest(blockVoteRecords, pindex, p.second)) { continue; } } if (inv.IsMsgProof()) { const ProofRef proof = WITH_LOCK(cs_peerManager, return peerManager->getProof(ProofId(inv.hash))); if (!clearInflightRequest(proofVoteRecords, proof, p.second)) { continue; } } } } void Processor::runEventLoop() { // Don't do Avalanche while node is IBD'ing if (::ChainstateActive().IsInitialBlockDownload()) { return; } // Don't poll if quorum hasn't been established yet if (!isQuorumEstablished()) { return; } // First things first, check if we have requests that timed out and clear // them. clearTimedoutRequests(); // Make sure there is at least one suitable node to query before gathering // invs. NodeId nodeid = getSuitableNodeToQuery(); if (nodeid == NO_NODE) { return; } std::vector invs = getInvsForNextPoll(); if (invs.empty()) { return; } do { /** * If we lost contact to that node, then we remove it from nodeids, but * never add the request to queries, which ensures bad nodes get cleaned * up over time. */ bool hasSent = connman->ForNode(nodeid, [this, &invs](CNode *pnode) { uint64_t current_round = round++; { // Compute the time at which this requests times out. auto timeout = std::chrono::steady_clock::now() + queryTimeoutDuration; // Register the query. queries.getWriteView()->insert( {pnode->GetId(), current_round, timeout, invs}); // Set the timeout. LOCK(cs_peerManager); peerManager->updateNextRequestTime(pnode->GetId(), timeout); } pnode->m_avalanche_state->invsPolled(invs.size()); // Send the query to the node. connman->PushMessage( pnode, CNetMsgMaker(pnode->GetCommonVersion()) .Make(NetMsgType::AVAPOLL, Poll(current_round, std::move(invs)))); return true; }); // Success! if (hasSent) { return; } { // This node is obsolete, delete it. LOCK(cs_peerManager); peerManager->removeNode(nodeid); } // Get next suitable node to try again nodeid = getSuitableNodeToQuery(); } while (nodeid != NO_NODE); } /* * Returns a bool indicating whether we have a usable Avalanche quorum enabling * us to take decisions based on polls. */ bool Processor::isQuorumEstablished() { if (quorumIsEstablished) { return true; } // Get the registered proof score and registered score we have nodes for uint32_t totalPeersScore; uint32_t connectedPeersScore; { LOCK(cs_peerManager); totalPeersScore = peerManager->getTotalPeersScore(); connectedPeersScore = peerManager->getConnectedPeersScore(); } // Ensure enough is being staked overall if (totalPeersScore < minQuorumScore) { return false; } // Ensure we have connected score for enough of the overall score uint32_t minConnectedScore = std::round(double(totalPeersScore) * minQuorumConnectedScoreRatio); if (connectedPeersScore < minConnectedScore) { return false; } quorumIsEstablished = true; return true; } } // namespace avalanche diff --git a/test/functional/abc_p2p_proof_inventory.py b/test/functional/abc_p2p_proof_inventory.py index 44e76870e..db27d553e 100644 --- a/test/functional/abc_p2p_proof_inventory.py +++ b/test/functional/abc_p2p_proof_inventory.py @@ -1,299 +1,332 @@ #!/usr/bin/env python3 # Copyright (c) 2021 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """ Test proof inventory relaying """ import time from test_framework.address import ADDRESS_ECREG_UNSPENDABLE from test_framework.avatools import ( avalanche_proof_from_hex, gen_proof, get_proof_ids, wait_for_proof, ) from test_framework.key import ECKey from test_framework.messages import ( MSG_AVA_PROOF, MSG_TYPE_MASK, CInv, msg_avaproof, msg_getdata, ) from test_framework.p2p import P2PInterface, p2p_lock from test_framework.test_framework import BitcoinTestFramework from test_framework.util import assert_equal, assert_greater_than from test_framework.wallet_util import bytes_to_wif # Broadcast reattempt occurs every 10 to 15 minutes MAX_INITIAL_BROADCAST_DELAY = 15 * 60 # Delay to allow the node to respond to getdata requests UNCONDITIONAL_RELAY_DELAY = 2 * 60 class ProofInvStoreP2PInterface(P2PInterface): def __init__(self): super().__init__() self.proof_invs_counter = 0 def on_inv(self, message): for i in message.inv: if i.type & MSG_TYPE_MASK == MSG_AVA_PROOF: self.proof_invs_counter += 1 class ProofInventoryTest(BitcoinTestFramework): def set_test_params(self): self.num_nodes = 5 self.extra_args = [[ '-enableavalanche=1', '-avacooldown=0', '-whitelist=noban@127.0.0.1', ]] * self.num_nodes def test_send_proof_inv(self): self.log.info("Test sending a proof to our peers") node = self.nodes[0] for i in range(10): node.add_p2p_connection(ProofInvStoreP2PInterface()) _, proof = gen_proof(node) assert node.sendavalancheproof(proof.serialize().hex()) def proof_inv_found(peer): with p2p_lock: return peer.last_message.get( "inv") and peer.last_message["inv"].inv[-1].hash == proof.proofid self.wait_until(lambda: all(proof_inv_found(i) for i in node.p2ps)) self.log.info("Test that we don't send the same inv several times") extra_peer = ProofInvStoreP2PInterface() node.add_p2p_connection(extra_peer) # Send the same proof one more time node.sendavalancheproof(proof.serialize().hex()) # Our new extra peer should receive it but not the others self.wait_until(lambda: proof_inv_found(extra_peer)) assert all(p.proof_invs_counter == 1 for p in node.p2ps) # Send the proof again and force the send loop to be processed for peer in node.p2ps: node.sendavalancheproof(proof.serialize().hex()) peer.sync_with_ping() assert all(p.proof_invs_counter == 1 for p in node.p2ps) def test_receive_proof(self): self.log.info("Test a peer is created on proof reception") node = self.nodes[0] _, proof = gen_proof(node) peer = node.add_p2p_connection(P2PInterface()) msg = msg_avaproof() msg.proof = proof peer.send_message(msg) self.wait_until(lambda: proof.proofid in get_proof_ids(node)) self.log.info("Test receiving a proof with missing utxo is orphaned") privkey = ECKey() privkey.generate() orphan_hex = node.buildavalancheproof( 42, 2000000000, bytes_to_wif(privkey.get_bytes()), [{ 'txid': '0' * 64, 'vout': 0, 'amount': 10e6, 'height': 42, 'iscoinbase': False, 'privatekey': bytes_to_wif(privkey.get_bytes()), }] ) orphan = avalanche_proof_from_hex(orphan_hex) orphan_proofid = "{:064x}".format(orphan.proofid) msg = msg_avaproof() msg.proof = orphan peer.send_message(msg) wait_for_proof(node, orphan_proofid, expect_orphan=True) def test_ban_invalid_proof(self): node = self.nodes[0] _, bad_proof = gen_proof(node) bad_proof.stakes = [] self.restart_node(0, ['-enableavalanche=1']) peer = node.add_p2p_connection(P2PInterface()) msg = msg_avaproof() msg.proof = bad_proof with node.assert_debug_log([ 'Misbehaving', 'invalid-proof', ]): peer.send_message(msg) peer.wait_for_disconnect() def test_proof_relay(self): # This test makes no sense with a single node ! assert_greater_than(self.num_nodes, 1) def restart_nodes_with_proof(nodes=self.nodes): proofids = set() for i, node in enumerate(nodes): privkey, proof = gen_proof(node) proofids.add(proof.proofid) self.restart_node(node.index, self.extra_args[node.index] + [ "-avaproof={}".format(proof.serialize().hex()), "-avamasterkey={}".format(bytes_to_wif(privkey.get_bytes())) ]) # Connect a block to make the proof be added to our pool node.generate(1) self.wait_until(lambda: proof.proofid in get_proof_ids(node)) [self.connect_nodes(node.index, j) for j in range(node.index)] return proofids proofids = restart_nodes_with_proof(self.nodes) self.log.info("Nodes should eventually get the proof from their peer") self.sync_proofs() for node in self.nodes: assert_equal(set(get_proof_ids(node)), proofids) def test_manually_sent_proof(self): node0 = self.nodes[0] _, proof = gen_proof(node0) self.log.info( "Send a proof via RPC and check all the nodes download it") node0.sendavalancheproof(proof.serialize().hex()) self.sync_proofs() def test_unbroadcast(self): self.log.info("Test broadcasting proofs") node = self.nodes[0] # Disconnect the other nodes, or they will request the proof and # invalidate the test [node.stop_node() for node in self.nodes[1:]] def add_peers(count): peers = [] for i in range(count): peer = node.add_p2p_connection(ProofInvStoreP2PInterface()) peer.wait_for_verack() peers.append(peer) return peers _, proof = gen_proof(node) proofid_hex = "{:064x}".format(proof.proofid) # Broadcast the proof peers = add_peers(3) assert node.sendavalancheproof(proof.serialize().hex()) wait_for_proof(node, proofid_hex) def proof_inv_received(peers): with p2p_lock: return all(p.last_message.get( "inv") and p.last_message["inv"].inv[-1].hash == proof.proofid for p in peers) self.wait_until(lambda: proof_inv_received(peers)) # If no peer request the proof for download, the node should reattempt # broadcasting to all new peers after 10 to 15 minutes. peers = add_peers(3) node.mockscheduler(MAX_INITIAL_BROADCAST_DELAY + 1) peers[-1].sync_with_ping() self.wait_until(lambda: proof_inv_received(peers)) # If at least one peer requests the proof, there is no more attempt to # broadcast it node.setmocktime(int(time.time()) + UNCONDITIONAL_RELAY_DELAY) msg = msg_getdata([CInv(t=MSG_AVA_PROOF, h=proof.proofid)]) peers[-1].send_message(msg) # Give enough time for the node to broadcast the proof again peers = add_peers(3) node.mockscheduler(MAX_INITIAL_BROADCAST_DELAY + 1) peers[-1].sync_with_ping() assert not proof_inv_received(peers) self.log.info( "Proofs that become invalid should no longer be broadcasted") # Restart and add connect a new set of peers self.restart_node(0) # Broadcast the proof peers = add_peers(3) assert node.sendavalancheproof(proof.serialize().hex()) self.wait_until(lambda: proof_inv_received(peers)) # Sanity check our node knows the proof, and it is valid wait_for_proof(node, proofid_hex, expect_orphan=False) # Mature the utxo then spend it node.generate(100) utxo = proof.stakes[0].stake.utxo raw_tx = node.createrawtransaction( inputs=[{ # coinbase "txid": "{:064x}".format(utxo.hash), "vout": utxo.n }], outputs={ADDRESS_ECREG_UNSPENDABLE: 25_000_000 - 250.00}, ) signed_tx = node.signrawtransactionwithkey( hexstring=raw_tx, privkeys=[node.get_deterministic_priv_key().key], ) node.sendrawtransaction(signed_tx['hex']) # Mine the tx in a block node.generate(1) # Wait for the proof to be orphaned self.wait_until(lambda: node.getrawavalancheproof( proofid_hex)["orphan"] is True) # It should no longer be broadcasted peers = add_peers(3) node.mockscheduler(MAX_INITIAL_BROADCAST_DELAY + 1) peers[-1].sync_with_ping() assert not proof_inv_received(peers) + def test_local_proof_broadcast(self): + node = self.nodes[0] + + privkey, proof = gen_proof(node) + proofid_hex = "{:064x}".format(proof.proofid) + + [node.stop_node() for node in self.nodes[1:]] + + self.restart_node(0, self.extra_args[0] + [ + "-avaproof={}".format(proof.serialize().hex()), + "-avamasterkey={}".format(bytes_to_wif(privkey.get_bytes())), + ]) + + peers = [] + for _ in range(10): + peers.append(node.add_p2p_connection(ProofInvStoreP2PInterface())) + + with p2p_lock: + assert all([p.proof_invs_counter == 0 for p in peers]) + + # Mine a block so the proof gets validated + node.generate(1) + wait_for_proof(node, proofid_hex) + + node.mockscheduler(MAX_INITIAL_BROADCAST_DELAY + 1) + + def proof_inv_received(): + with p2p_lock: + return all(p.last_message.get( + "inv") and p.last_message["inv"].inv[-1].hash == proof.proofid for p in peers) + self.wait_until(proof_inv_received) + def run_test(self): self.test_send_proof_inv() self.test_receive_proof() self.test_proof_relay() self.test_manually_sent_proof() # Run these tests last because they need to disconnect the nodes self.test_unbroadcast() self.test_ban_invalid_proof() + self.test_local_proof_broadcast() if __name__ == '__main__': ProofInventoryTest().main()