diff --git a/src/avalanche/processor.h b/src/avalanche/processor.h
--- a/src/avalanche/processor.h
+++ b/src/avalanche/processor.h
@@ -225,6 +225,9 @@
     bool stopEventLoop();
 
     void avaproofsSent(NodeId nodeid);
+    int64_t getAvaproofsNodeCounter() const {
+        return avaproofsNodeCounter.load();
+    }
     bool isQuorumEstablished();
 
     // Implement NetEventInterface. Only FinalizeNode is of interest.
diff --git a/src/avalanche/test/processor_tests.cpp b/src/avalanche/test/processor_tests.cpp
--- a/src/avalanche/test/processor_tests.cpp
+++ b/src/avalanche/test/processor_tests.cpp
@@ -54,10 +54,6 @@
         return p.minQuorumConnectedScoreRatio;
     }
 
-    static int64_t getavaproofsNodeCounter(Processor &p) {
-        return p.avaproofsNodeCounter;
-    }
-
    static void clearavaproofsNodeCounter(Processor &p) {
        p.avaproofsNodeCounter = 0;
    }
@@ -1337,13 +1333,11 @@
        addNode(i);

        processor->avaproofsSent(i);
-        BOOST_CHECK_EQUAL(
-            AvalancheTest::getavaproofsNodeCounter(*processor), i + 1);
+        BOOST_CHECK_EQUAL(processor->getAvaproofsNodeCounter(), i + 1);

        // Receiving again on the same node does not increase the counter
        processor->avaproofsSent(i);
-        BOOST_CHECK_EQUAL(
-            AvalancheTest::getavaproofsNodeCounter(*processor), i + 1);
+        BOOST_CHECK_EQUAL(processor->getAvaproofsNodeCounter(), i + 1);

        BOOST_CHECK(!processor->isQuorumEstablished());
    }
diff --git a/src/net_processing.cpp b/src/net_processing.cpp
--- a/src/net_processing.cpp
+++ b/src/net_processing.cpp
@@ -1745,15 +1745,26 @@
        goto scheduleLater;
    }

-    // Send a getavaproofs to one of our peers
-    m_connman.ForNode(avanode_outbound_ids.back(), [&](CNode *pavanode) {
-        LogPrint(BCLog::AVALANCHE, "Requesting compact proofs from peer %d\n",
-                 pavanode->GetId());
-        m_connman.PushMessage(pavanode,
-                              CNetMsgMaker(pavanode->GetCommonVersion())
-                                  .Make(NetMsgType::GETAVAPROOFS));
-        return true;
-    });
+    // If we never received an avaproofs message yet, be kind and only request
+    // from a subset of our peers, as we expect a ton of avaproofs messages in
+    // the process.
+    if (g_avalanche->getAvaproofsNodeCounter() == 0) {
+        avanode_outbound_ids.resize(
+            std::min<size_t>(avanode_outbound_ids.size(), 3));
+    }
+
+    for (NodeId nodeid : avanode_outbound_ids) {
+        // Send a getavaproofs to this peer
+        m_connman.ForNode(nodeid, [&](CNode *pavanode) {
+            LogPrint(BCLog::AVALANCHE,
+                     "Requesting compact proofs from peer %d\n",
+                     pavanode->GetId());
+            m_connman.PushMessage(pavanode,
+                                  CNetMsgMaker(pavanode->GetCommonVersion())
+                                      .Make(NetMsgType::GETAVAPROOFS));
+            return true;
+        });
+    }

scheduleLater:
    // Schedule next run for 2-5 minutes in the future.
diff --git a/test/functional/abc_p2p_compactproofs.py b/test/functional/abc_p2p_compactproofs.py
--- a/test/functional/abc_p2p_compactproofs.py
+++ b/test/functional/abc_p2p_compactproofs.py
@@ -70,16 +70,18 @@
            "Check we send a getavaproofs message to our avalanche outbound peers")
        node = self.nodes[0]

+        p2p_idx = 0
        non_avapeers = []
        for i in range(4):
            peer = P2PInterface()
            node.add_outbound_p2p_connection(
                peer,
-                p2p_idx=i,
+                p2p_idx=p2p_idx,
                connection_type="outbound-full-relay",
                services=NODE_NETWORK,
            )
            non_avapeers.append(peer)
+            p2p_idx += 1

        inbound_avapeers = [
            node.add_p2p_connection(
@@ -90,11 +92,12 @@
            peer = P2PInterface()
            node.add_outbound_p2p_connection(
                peer,
-                p2p_idx=16 + i,
+                p2p_idx=p2p_idx,
                connection_type="avalanche",
                services=NODE_NETWORK | NODE_AVALANCHE,
            )
            outbound_avapeers.append(peer)
+            p2p_idx += 1

        def all_peers_received_getavaproofs():
            with p2p_lock:
@@ -111,7 +114,7 @@
                "getavaproofs", 0) == 0 for p in inbound_avapeers])

        self.log.info(
-            "Check we send periodic getavaproofs message to one of our peers")
+            "Check we send periodic getavaproofs messages to some of our peers")

        def count_outbounds_getavaproofs():
            with p2p_lock:
@@ -122,8 +125,58 @@
        for i in range(12):
            node.mockscheduler(AVALANCHE_MAX_PERIODIC_NETWORKING_INTERVAL)
            self.wait_until(lambda: count_outbounds_getavaproofs()
-                            == outbounds_getavaproofs + 1)
-            outbounds_getavaproofs += 1
+                            == outbounds_getavaproofs + 3)
+            outbounds_getavaproofs += 3
+
+        with p2p_lock:
+            assert all([p.message_count.get(
+                "getavaproofs", 0) == 0 for p in non_avapeers])
+            assert all([p.message_count.get(
+                "getavaproofs", 0) == 0 for p in inbound_avapeers])
+
+        self.log.info(
+            "After the first avaproofs message has been received, all the peers are requested periodically")
+
+        responding_outbound_avapeers = P2PInterface()
+        node.add_outbound_p2p_connection(
+            responding_outbound_avapeers,
+            p2p_idx=p2p_idx,
+            connection_type="avalanche",
+            services=NODE_NETWORK | NODE_AVALANCHE,
+        )
+        p2p_idx += 1
+        responding_outbound_avapeers_id = node.getpeerinfo()[-1]['id']
+        outbound_avapeers.append(responding_outbound_avapeers)
+
+        self.wait_until(all_peers_received_getavaproofs)
+
+        # Register as an avalanche node for the avaproofs message to be counted
+        key, proof = gen_proof(node)
+        assert node.addavalanchenode(
+            responding_outbound_avapeers_id,
+            key.get_pubkey().get_bytes().hex(),
+            proof.serialize().hex())
+
+        # Send the avaproofs message
+        avaproofs = msg_avaproofs()
+        avaproofs.key0 = random.randint(0, 2**64 - 1)
+        avaproofs.key1 = random.randint(0, 2**64 - 1)
+        avaproofs.prefilled_proofs = []
+        avaproofs.shortids = [
+            calculate_shortid(
+                avaproofs.key0,
+                avaproofs.key1,
+                proof.proofid)]
+        responding_outbound_avapeers.send_and_ping(avaproofs)
+
+        # Now the node will request from all its peers at each time period
+        outbounds_getavaproofs = count_outbounds_getavaproofs()
+        num_outbound_avapeers = len(outbound_avapeers)
+        for i in range(12):
+            node.mockscheduler(AVALANCHE_MAX_PERIODIC_NETWORKING_INTERVAL)
+            self.wait_until(lambda: count_outbounds_getavaproofs()
+                            == outbounds_getavaproofs + num_outbound_avapeers)
+            outbounds_getavaproofs += num_outbound_avapeers

        with p2p_lock:
            assert all([p.message_count.get(
                "getavaproofs", 0) == 0 for p in non_avapeers])
            assert all([p.message_count.get(
                "getavaproofs", 0) == 0 for p in inbound_avapeers])
@@ -179,6 +232,7 @@
            assert_equal(len(avaproofs.shortids), expected_len)

        # Initially the node has 0 peer
+        self.restart_node(0)
        assert_equal(len(get_proof_ids(node)), 0)

        peer = node.add_p2p_connection(AvaP2PInterface())