diff --git a/src/net_processing.cpp b/src/net_processing.cpp --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -1680,7 +1680,10 @@ void PeerManagerImpl::AvalanchePeriodicNetworking(CScheduler &scheduler) const { const auto now = GetTime(); - std::vector<NodeId> avanode_outbound_ids; + std::vector<NodeId> avanode_ids; + + const bool fQuorumEstablished = + g_avalanche && g_avalanche->isQuorumEstablished(); if (!g_avalanche) { // Not enabled or not ready yet, retry later @@ -1688,9 +1691,10 @@ } m_connman.ForEachNode([&](CNode *pnode) { - // Build a list of the avalanche manual or outbound peers nodeids - if (pnode->m_avalanche_state && !pnode->IsInboundConn()) { - avanode_outbound_ids.push_back(pnode->GetId()); + // Build a list of the avalanche peers nodeids + if (pnode->m_avalanche_state && + (!fQuorumEstablished || !pnode->IsInboundConn())) { + avanode_ids.push_back(pnode->GetId()); } // If a proof radix tree timed out, cleanup @@ -1701,21 +1705,19 @@ } }); - if (avanode_outbound_ids.empty()) { - // Not node is available for messaging, retry later + if (avanode_ids.empty()) { + // No node is available for messaging, retry later goto scheduleLater; } - Shuffle(avanode_outbound_ids.begin(), avanode_outbound_ids.end(), - FastRandomContext()); + Shuffle(avanode_ids.begin(), avanode_ids.end(), FastRandomContext()); - if (!g_avalanche->isQuorumEstablished() || + if (!fQuorumEstablished || g_avalanche->withPeerManager([&](avalanche::PeerManager &pm) { return pm.shouldRequestMoreNodes(); })) { - // Randomly select an avalanche outbound peer to send the getavaaddr - // message to - const NodeId avanodeId = avanode_outbound_ids.front(); + // Randomly select an avalanche peer to send the getavaaddr message to + const NodeId avanodeId = avanode_ids.front(); m_connman.ForNode(avanodeId, [&](CNode *pavanode) { LogPrint(BCLog::AVALANCHE, @@ -1741,11 +1743,10 @@ // subset of our peers as we expect a ton of avaproofs message in the // process. 
if (g_avalanche->getAvaproofsNodeCounter() == 0) { - avanode_outbound_ids.resize( - std::min<size_t>(avanode_outbound_ids.size(), 3)); + avanode_ids.resize(std::min<size_t>(avanode_ids.size(), 3)); } - for (NodeId nodeid : avanode_outbound_ids) { + for (NodeId nodeid : avanode_ids) { // Send a getavaproofs to one of our peers m_connman.ForNode(nodeid, [&](CNode *pavanode) { LogPrint(BCLog::AVALANCHE, diff --git a/test/functional/abc_p2p_compactproofs.py b/test/functional/abc_p2p_compactproofs.py --- a/test/functional/abc_p2p_compactproofs.py +++ b/test/functional/abc_p2p_compactproofs.py @@ -610,6 +610,62 @@ with p2p_lock: assert_equal(peer.message_count.get("getavaproofs", 0), 0) + def test_send_inbound_getavaproofs_until_quorum_is_established(self): + self.log.info( + "Check we also request the inbounds until the quorum is established") + + node = self.nodes[0] + + self.restart_node( + 0, + extra_args=self.extra_args[0] + + ['-avaminquorumstake=1000000']) + + assert_equal(node.getavalancheinfo()['active'], False) + + outbound = HelloAvaP2PInterface() + node.add_outbound_p2p_connection(outbound, p2p_idx=0) + + inbound = HelloAvaP2PInterface() + node.add_p2p_connection(inbound) + inbound.nodeid = node.getpeerinfo()[-1]['id'] + + def count_getavaproofs(peers): + with p2p_lock: + return sum([peer.message_count.get("getavaproofs", 0) + for peer in peers]) + + # Upon connection only the outbound gets a compact proofs message + assert_equal(count_getavaproofs([inbound]), 0) + self.wait_until(lambda: count_getavaproofs([outbound]) == 1) + + # Periodic send will include the inbound as well + current_total = count_getavaproofs([inbound, outbound]) + while count_getavaproofs([inbound]) == 0: + node.mockscheduler(AVALANCHE_MAX_PERIODIC_NETWORKING_INTERVAL) + self.wait_until(lambda: count_getavaproofs( + [inbound, outbound]) > current_total) + current_total = count_getavaproofs([inbound, outbound]) + + # Connect the minimum amount of stake + privkey, proof = gen_proof(node) + assert 
node.addavalanchenode( + inbound.nodeid, + privkey.get_pubkey().get_bytes().hex(), + proof.serialize().hex()) + + assert_equal(node.getavalancheinfo()['active'], True) + + # From now only the outbound is requested + count_inbound = count_getavaproofs([inbound]) + for _ in range(20): + node.mockscheduler(AVALANCHE_MAX_PERIODIC_NETWORKING_INTERVAL) + self.wait_until(lambda: count_getavaproofs( + [inbound, outbound]) > current_total) + current_total = count_getavaproofs([inbound, outbound]) + + assert_equal(count_getavaproofs([inbound]), count_inbound) + def run_test(self): # Most if the tests only need a single node, let the other ones start # the node when required @@ -622,6 +678,7 @@ self.test_send_missing_proofs() self.test_compact_proofs_download_on_connect() self.test_no_compactproofs_during_ibs() + self.test_send_inbound_getavaproofs_until_quorum_is_established() if __name__ == '__main__': diff --git a/test/functional/abc_p2p_getavaaddr.py b/test/functional/abc_p2p_getavaaddr.py --- a/test/functional/abc_p2p_getavaaddr.py +++ b/test/functional/abc_p2p_getavaaddr.py @@ -366,6 +366,66 @@ expected_addresses = [avapeer.addr for avapeer in avapeers] assert all([address in expected_addresses for address in addresses]) + def test_send_inbound_getavaaddr_until_quorum_is_established(self): + self.log.info( + "Check we also request the inbounds until the quorum is established") + + node = self.nodes[0] + + self.restart_node( + 0, + extra_args=self.extra_args[0] + + ['-avaminquorumstake=1000000']) + + assert_equal(node.getavalancheinfo()['active'], False) + + outbound = MutedAvaP2PInterface() + node.add_outbound_p2p_connection(outbound, p2p_idx=0) + + inbound = MutedAvaP2PInterface() + node.add_p2p_connection(inbound) + inbound.nodeid = node.getpeerinfo()[-1]['id'] + + def count_getavaaddr(peers): + with p2p_lock: + return sum([peer.message_count.get("getavaaddr", 0) + for peer in peers]) + + # Upon connection only the outbound gets a getavaaddr message + 
assert_equal(count_getavaaddr([inbound]), 0) + self.wait_until(lambda: count_getavaaddr([outbound]) == 1) + + # Periodic send will include the inbound as well + current_total = count_getavaaddr([inbound, outbound]) + while count_getavaaddr([inbound]) == 0: + node.mockscheduler(MAX_GETAVAADDR_DELAY) + self.wait_until(lambda: count_getavaaddr( + [inbound, outbound]) > current_total) + current_total = count_getavaaddr([inbound, outbound]) + + # Connect the minimum amount of stake + privkey, proof = gen_proof(node) + assert node.addavalanchenode( + inbound.nodeid, + privkey.get_pubkey().get_bytes().hex(), + proof.serialize().hex()) + + assert_equal(node.getavalancheinfo()['active'], True) + + # From now only the outbound is requested + count_inbound = count_getavaaddr([inbound]) + for _ in range(10): + # Trigger a poll + node.generate(1) + inbound.sync_send_with_ping() + + node.mockscheduler(MAX_GETAVAADDR_DELAY) + self.wait_until(lambda: count_getavaaddr( + [inbound, outbound]) > current_total) + current_total = count_getavaaddr([inbound, outbound]) + + assert_equal(count_getavaaddr([inbound]), count_inbound) + def run_test(self): self.getavaaddr_interval_test() @@ -377,6 +437,7 @@ self.getavaaddr_outbound_test() self.getavaaddr_manual_test() self.getavaaddr_noquorum() + self.test_send_inbound_getavaaddr_until_quorum_is_established() if __name__ == '__main__':