diff --git a/src/avalanche/avalanche.h b/src/avalanche/avalanche.h --- a/src/avalanche/avalanche.h +++ b/src/avalanche/avalanche.h @@ -19,6 +19,11 @@ */ static constexpr bool AVALANCHE_DEFAULT_ENABLED = false; +/** + * Is avalanche peer discovery enabled by default. + */ +static constexpr bool AVALANCHE_DEFAULT_PEER_DISCOVERY_ENABLED = false; + /** * Avalanche default cooldown in milliseconds. */ diff --git a/src/avalanche/peermanager.h b/src/avalanche/peermanager.h --- a/src/avalanche/peermanager.h +++ b/src/avalanche/peermanager.h @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -141,6 +142,10 @@ NodeSet nodes; + std::unordered_multimap, + SaltedProofIdHasher> + pendingNodes; + static constexpr int SELECT_PEER_MAX_RETRY = 3; static constexpr int SELECT_NODE_MAX_RETRY = 3; diff --git a/src/avalanche/peermanager.cpp b/src/avalanche/peermanager.cpp --- a/src/avalanche/peermanager.cpp +++ b/src/avalanche/peermanager.cpp @@ -20,18 +20,22 @@ } bool PeerManager::addNode(NodeId nodeid, const Delegation &delegation) { - auto &pview = peers.get(); - auto it = pview.find(delegation.getProofId()); - if (it == pview.end()) { - return false; - } - DelegationState state; CPubKey pubkey; if (!delegation.verify(state, pubkey)) { return false; } + auto &pview = peers.get(); + const ProofId proofid = delegation.getProofId(); + auto it = pview.find(proofid); + if (it == pview.end()) { + // Remember the node is pending a peer with a matching proof id + pendingNodes.emplace(proofid, + std::make_pair(nodeid, std::move(pubkey))); + return false; + } + return addOrUpdateNode(peers.project<0>(it), nodeid, std::move(pubkey)); } @@ -55,9 +59,14 @@ return false; } - // We actually have this node already, we need to update it. - bool success = removeNodeFromPeer(peers.find(oldpeerid)); - assert(success); + // It is possible for the node to be dangling. If there was an inflight + // query when the peer gets removed, the node was not erased. + auto oldpeer_it = peers.find(oldpeerid); + if (oldpeer_it != peers.end()) { + // We actually have this node already, we need to update it. + bool success = removeNodeFromPeer(oldpeer_it); + assert(success); + } } bool success = addNodeToPeer(it); @@ -219,10 +228,13 @@ PeerManager::PeerSet::iterator PeerManager::fetchOrCreatePeer(const std::shared_ptr &proof) { + assert(proof); + const ProofId &proofid = proof->getId(); + { // Check if we already know of that peer. auto &pview = peers.get(); - auto it = pview.find(proof->getId()); + auto it = pview.find(proofid); if (it != pview.end()) { return peers.project<0>(it); } @@ -279,6 +291,16 @@ auto inserted = peers.emplace(peerid, proof); assert(inserted.second); + // If there are nodes waiting for this proof, add them + auto range = pendingNodes.equal_range(proofid); + auto &it = range.first; + while (it != range.second && + addOrUpdateNode(inserted.first, it->second.first, + std::move(it->second.second))) { + ++it; + } + pendingNodes.erase(range.first, it); + return inserted.first; } @@ -291,10 +313,21 @@ // Remove all nodes from this peer. removeNodeFromPeer(it, it->node_count); + auto &nview = nodes.get(); + + // If the proof is orphan, add the nodes back to the pending map + const ProofId &proofid = it->proof->getId(); + if (isOrphan(proofid)) { + auto range = nview.equal_range(peerid); + for (auto &nit = range.first; nit != range.second; ++nit) { + pendingNodes.emplace(proofid, + std::make_pair(nit->nodeid, nit->pubkey)); + } + } + // Remove nodes associated with this peer, unless their timeout is still // active. This ensure that we don't overquery them in case they are // subsequently added to another peer. - auto &nview = nodes.get(); nview.erase(nview.lower_bound(boost::make_tuple(peerid, TimePoint())), nview.upper_bound(boost::make_tuple( peerid, std::chrono::steady_clock::now()))); diff --git a/src/init.cpp b/src/init.cpp --- a/src/init.cpp +++ b/src/init.cpp @@ -1224,6 +1224,10 @@ "-enableavalanche", strprintf("Enable avalanche (default: %u)", AVALANCHE_DEFAULT_ENABLED), ArgsManager::ALLOW_ANY, OptionsCategory::AVALANCHE); + argsman.AddArg("-enableavalanchepeerdiscovery", + strprintf("Enable avalanche peer discovery (default: %u)", + AVALANCHE_DEFAULT_PEER_DISCOVERY_ENABLED), + ArgsManager::ALLOW_ANY, OptionsCategory::AVALANCHE); argsman.AddArg( "-avacooldown", strprintf("Mandatory cooldown between two avapoll (default: %u)", diff --git a/src/net_processing.cpp b/src/net_processing.cpp --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -4156,6 +4156,13 @@ preferred); } + if (gArgs.GetBoolArg("-enableavalanchepeerdiscovery", + AVALANCHE_DEFAULT_PEER_DISCOVERY_ENABLED)) { + // Don't check the return value. If it fails we probably don't know + // about the proof yet. + g_avalanche->addNode(pfrom.GetId(), delegation); + } + return; } diff --git a/test/functional/abc_p2p_avalanche_peer_discovery.py b/test/functional/abc_p2p_avalanche_peer_discovery.py --- a/test/functional/abc_p2p_avalanche_peer_discovery.py +++ b/test/functional/abc_p2p_avalanche_peer_discovery.py @@ -10,6 +10,7 @@ import time +from test_framework.address import ADDRESS_BCHREG_UNSPENDABLE from test_framework.avatools import ( get_ava_p2p_interface, create_coinbase_stakes, @@ -43,7 +44,8 @@ def set_test_params(self): self.setup_clean_chain = True self.num_nodes = 1 - self.extra_args = [['-enableavalanche=1']] + self.extra_args = [['-enableavalanche=1', + '-enableavalanchepeerdiscovery=1']] self.supports_cli = False def run_test(self): @@ -151,11 +153,11 @@ proofid = good_interface.send_avahello( interface_delegation_hex, delegated_key) - def getdata_found(): + def getdata_found(peer, proofid): with p2p_lock: return good_interface.last_message.get( "getdata") and good_interface.last_message["getdata"].inv[-1].hash == proofid - wait_until(getdata_found) + wait_until(lambda: getdata_found(good_interface, proofid)) self.log.info('Check that we can download the proof from our peer') @@ -204,6 +206,62 @@ peer.send_message(getdata) wait_until(lambda: proof_received(peer)) + # Restart the node + self.restart_node(0, self.extra_args[0] + [ + "-avaproof={}".format(proof), + "-avamasterkey=cND2ZvtabDbJ1gucx9GWH6XT9kgTAqfb6cotPt5Q5CyxVDhid2EN", + ]) + wait_for_proof_validation() + # The only peer is the node itself + assert_equal(len(node.getavalanchepeerinfo()), 1) + assert_equal(node.getavalanchepeerinfo()[0]["proof"], proof) + + peer = get_ava_p2p_interface(node) + peer_proofid = peer.send_avahello( + interface_delegation_hex, delegated_key) + + wait_until(lambda: getdata_found(peer, peer_proofid)) + assert peer_proofid not in get_proof_ids(node) + + self.log.info( + "Check that the peer gets added as an avalanche node as soon as the node knows about the proof") + node.sendavalancheproof(interface_proof_hex) + + def has_node_count(count): + peerinfo = node.getavalanchepeerinfo() + return (len(peerinfo) == 2 and + peerinfo[-1]["proof"] == interface_proof_hex and + peerinfo[-1]["nodecount"] == count) + + wait_until(lambda: has_node_count(1)) + + self.log.info( + "Check that the peer gets added immediatly if the proof is already known") + + # Connect another peer using the same proof + peer_proof_known = get_ava_p2p_interface(node) + peer_proof_known.send_avahello(interface_delegation_hex, delegated_key) + + wait_until(lambda: has_node_count(2)) + + self.log.info("Invalidate the proof and check the nodes are removed") + tip = node.getbestblockhash() + # Invalidate the block with the proof utxo + node.invalidateblock(blockhashes[1]) + # Change the address to make sure we don't generate a block identical + # to the one we just invalidated. Can be generate(1) after D9694 or + # D9697 is landed. + forked_tip = node.generatetoaddress(1, ADDRESS_BCHREG_UNSPENDABLE)[0] + wait_until(lambda: node.getbestblockhash() == forked_tip) + + wait_until(lambda: len(node.getavalanchepeerinfo()) == 1) + assert peer_proofid not in get_proof_ids(node) + + self.log.info("Reorg back and check the nodes are added back") + node.invalidateblock(forked_tip) + node.reconsiderblock(tip) + wait_until(lambda: has_node_count(2), timeout=2) + if __name__ == '__main__': AvalancheTest().main()