diff --git a/test/functional/abc_p2p_getavaaddr.py b/test/functional/abc_p2p_getavaaddr.py index bc210ba23..926f6a06c 100755 --- a/test/functional/abc_p2p_getavaaddr.py +++ b/test/functional/abc_p2p_getavaaddr.py @@ -1,455 +1,471 @@ #!/usr/bin/env python3 # Copyright (c) 2022 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """Test getavaaddr p2p message""" import time from decimal import Decimal from test_framework.avatools import AvaP2PInterface, gen_proof from test_framework.messages import ( NODE_AVALANCHE, NODE_NETWORK, AvalancheVote, AvalancheVoteError, msg_getavaaddr, ) from test_framework.p2p import P2PInterface, p2p_lock from test_framework.test_framework import BitcoinTestFramework from test_framework.util import MAX_NODES, assert_equal, p2p_port # getavaaddr time interval in seconds, as defined in net_processing.cpp # A node will ignore repeated getavaaddr during this interval GETAVAADDR_INTERVAL = 2 * 60 # Address are sent every 30s on average, with a Poisson filter. Use a large # enough delay so it's very unlikely we don't get the message within this time. MAX_ADDR_SEND_DELAY = 5 * 60 # The interval between avalanche statistics computation AVALANCHE_STATISTICS_INTERVAL = 10 * 60 # The getavaaddr messages are sent every 2 to 5 minutes MAX_GETAVAADDR_DELAY = 5 * 60 class AddrReceiver(P2PInterface): def __init__(self): super().__init__() self.received_addrs = None def get_received_addrs(self): with p2p_lock: return self.received_addrs def on_addr(self, message): self.received_addrs = [] for addr in message.addrs: self.received_addrs.append(f"{addr.ip}:{addr.port}") def addr_received(self): return self.received_addrs is not None class MutedAvaP2PInterface(AvaP2PInterface): def __init__(self, node=None): super().__init__(node) self.is_responding = False self.privkey = None self.addr = None self.poll_received = 0 def set_addr(self, addr): self.addr = addr def on_avapoll(self, message): self.poll_received += 1 class AllYesAvaP2PInterface(MutedAvaP2PInterface): def __init__(self, node=None): super().__init__(node) self.is_responding = True def on_avapoll(self, message): self.send_avaresponse( message.poll.round, [ AvalancheVote( AvalancheVoteError.ACCEPTED, inv.hash) for inv in message.poll.invs], self.master_privkey if self.delegation is None else self.delegated_privkey) super().on_avapoll(message) class AvaAddrTest(BitcoinTestFramework): def set_test_params(self): self.setup_clean_chain = False self.num_nodes = 1 self.extra_args = [['-enableavalanche=1', '-enableavalanchepeerdiscovery=1', + '-enableavalancheproofreplacement=1', '-avaproofstakeutxoconfirmations=1', '-avacooldown=0', '-whitelist=noban@127.0.0.1']] def check_all_peers_received_getavaaddr_once(self, avapeers): def received_all_getavaaddr(avapeers): with p2p_lock: return all([p.last_message.get("getavaaddr") for p in avapeers]) self.wait_until(lambda: received_all_getavaaddr(avapeers)) with p2p_lock: assert all([p.message_count.get( "getavaaddr", 0) == 1 for p in avapeers]) def getavaaddr_interval_test(self): node = self.nodes[0] # Init mock time mock_time = int(time.time()) node.setmocktime(mock_time) # Add some avalanche peers to the node for _ in range(10): node.add_p2p_connection(AllYesAvaP2PInterface(node)) # Build some statistics to ensure some addresses will be returned def all_peers_received_poll(): with p2p_lock: return all([avanode.poll_received > 0 for avanode in node.p2ps]) self.wait_until(all_peers_received_poll) node.mockscheduler(AVALANCHE_STATISTICS_INTERVAL) requester = node.add_p2p_connection(AddrReceiver()) requester.send_message(msg_getavaaddr()) # Remember the time we sent the getavaaddr message getavaddr_time = mock_time # Spamming more get getavaaddr has no effect for _ in range(10): with node.assert_debug_log(["Ignoring repeated getavaaddr from peer"]): requester.send_message(msg_getavaaddr()) # Move the time so we get an addr response mock_time += MAX_ADDR_SEND_DELAY node.setmocktime(mock_time) requester.wait_until(requester.addr_received) # Elapse the getavaaddr interval and check our message is now accepted # again mock_time = getavaddr_time + GETAVAADDR_INTERVAL node.setmocktime(mock_time) requester.send_message(msg_getavaaddr()) # We can get an addr message again mock_time += MAX_ADDR_SEND_DELAY node.setmocktime(mock_time) requester.wait_until(requester.addr_received) def address_test(self, maxaddrtosend, num_proof, num_avanode): self.restart_node( 0, extra_args=self.extra_args[0] + [f'-maxaddrtosend={maxaddrtosend}']) node = self.nodes[0] # Init mock time mock_time = int(time.time()) node.setmocktime(mock_time) # Create a bunch of proofs and associate each a bunch of nodes. avanodes = [] for _ in range(num_proof): master_privkey, proof = gen_proof(node) for n in range(num_avanode): avanode = AllYesAvaP2PInterface() if n % 2 else MutedAvaP2PInterface() avanode.master_privkey = master_privkey avanode.proof = proof node.add_p2p_connection(avanode) peerinfo = node.getpeerinfo()[-1] avanode.set_addr(peerinfo["addr"]) avanodes.append(avanode) responding_addresses = [ avanode.addr for avanode in avanodes if avanode.is_responding] assert_equal(len(responding_addresses), num_proof * num_avanode // 2) # Check we have what we expect def all_nodes_connected(): avapeers = node.getavalanchepeerinfo() if len(avapeers) != num_proof: return False for avapeer in avapeers: if avapeer['nodecount'] != num_avanode: return False return True self.wait_until(all_nodes_connected) # Force the availability score to diverge between the responding and the # muted nodes. def poll_all_for_block(): node.generate(1) with p2p_lock: return all([avanode.poll_received > ( 10 if avanode.is_responding else 0) for avanode in avanodes]) self.wait_until(poll_all_for_block) # Move the scheduler time 10 minutes forward so that so that our peers # get an availability score computed. node.mockscheduler(AVALANCHE_STATISTICS_INTERVAL) requester = node.add_p2p_connection(AddrReceiver()) requester.send_and_ping(msg_getavaaddr()) # Sanity check that the availability score is set up as expected peerinfo = node.getpeerinfo() muted_addresses = [ avanode.addr for avanode in avanodes if not avanode.is_responding] assert all([p['availability_score'] < 0 for p in peerinfo if p["addr"] in muted_addresses]) assert all([p['availability_score'] > 0 for p in peerinfo if p["addr"] in responding_addresses]) # Requester has no availability_score because it's not an avalanche # peer assert 'availability_score' not in peerinfo[-1].keys() mock_time += MAX_ADDR_SEND_DELAY node.setmocktime(mock_time) requester.wait_until(requester.addr_received) addresses = requester.get_received_addrs() assert_equal(len(addresses), min(maxaddrtosend, len(responding_addresses))) # Check all the addresses belong to responding peer assert all([address in responding_addresses for address in addresses]) def getavaaddr_outbound_test(self): self.log.info( "Check we send a getavaaddr message to our avalanche outbound peers") node = self.nodes[0] # Get rid of previously connected nodes node.disconnect_p2ps() avapeers = [] for i in range(16): avapeer = AvaP2PInterface() node.add_outbound_p2p_connection( avapeer, p2p_idx=i, ) avapeers.append(avapeer) self.check_all_peers_received_getavaaddr_once(avapeers) # Generate some block to poll for node.generate(1) # Because none of the avalanche peers is responding, our node should # fail out of option shortly and send a getavaaddr message to its # outbound avalanche peers. node.mockscheduler(MAX_GETAVAADDR_DELAY) def all_peers_received_getavaaddr(): with p2p_lock: return all([p.message_count.get( "getavaaddr", 0) > 1 for p in avapeers]) self.wait_until(all_peers_received_getavaaddr) def getavaaddr_manual_test(self): self.log.info( "Check we send a getavaaddr message to our manually connected peers that support avalanche") node = self.nodes[0] # Get rid of previously connected nodes node.disconnect_p2ps() def added_node_connected(ip_port): added_node_info = node.getaddednodeinfo(ip_port) return len( added_node_info) == 1 and added_node_info[0]['connected'] def connect_callback(address, port): self.log.debug("Connecting to {}:{}".format(address, port)) p = AvaP2PInterface() p2p_idx = 1 p.peer_accept_connection( connect_cb=connect_callback, connect_id=p2p_idx, net=node.chain, timeout_factor=node.timeout_factor, )() ip_port = f"127.0.01:{p2p_port(MAX_NODES - p2p_idx)}" node.addnode(node=ip_port, command="add") self.wait_until(lambda: added_node_connected(ip_port)) assert_equal(node.getpeerinfo()[-1]['addr'], ip_port) assert_equal(node.getpeerinfo()[-1]['connection_type'], 'manual') p.wait_until(lambda: p.last_message.get("getavaaddr")) # Generate some block to poll for node.generate(1) # Because our avalanche peer is not responding, our node should fail # out of option shortly and send another getavaaddr message. node.mockscheduler(MAX_GETAVAADDR_DELAY) p.wait_until(lambda: p.message_count.get("getavaaddr", 0) > 1) def getavaaddr_noquorum(self): self.log.info( "Check we send a getavaaddr message while our quorum is not established") node = self.nodes[0] self.restart_node(0, extra_args=self.extra_args[0] + [ '-avaminquorumstake=500000000', '-avaminquorumconnectedstakeratio=0.8', ]) avapeers = [] for i in range(16): avapeer = AllYesAvaP2PInterface(node) node.add_outbound_p2p_connection( avapeer, p2p_idx=i, connection_type="avalanche", services=NODE_NETWORK | NODE_AVALANCHE, ) avapeers.append(avapeer) peerinfo = node.getpeerinfo()[-1] avapeer.set_addr(peerinfo["addr"]) self.check_all_peers_received_getavaaddr_once(avapeers) def total_getavaaddr_msg(): with p2p_lock: return sum([p.message_count.get("getavaaddr", 0) for p in avapeers]) # Because we have not enough stake to start polling, we keep requesting # more addresses from all our peers total_getavaaddr = total_getavaaddr_msg() node.mockscheduler(MAX_GETAVAADDR_DELAY) self.wait_until(lambda: total_getavaaddr_msg() == total_getavaaddr + len(avapeers)) # Move the schedulter time forward to make sure we get statistics # computed. But since we did not start polling yet it should remain all # zero. node.mockscheduler(AVALANCHE_STATISTICS_INTERVAL) def wait_for_availability_score(): peerinfo = node.getpeerinfo() return all([p.get('availability_score', None) == Decimal(0) for p in peerinfo]) self.wait_until(wait_for_availability_score) requester = node.add_p2p_connection(AddrReceiver()) requester.send_and_ping(msg_getavaaddr()) node.setmocktime(int(time.time() + MAX_ADDR_SEND_DELAY)) # Check all the peers addresses are returned. requester.wait_until(requester.addr_received) addresses = requester.get_received_addrs() assert_equal(len(addresses), len(avapeers)) expected_addresses = [avapeer.addr for avapeer in avapeers] assert all([address in expected_addresses for address in addresses]) # Add more nodes so we reach the mininum quorum stake amount. for _ in range(4): avapeer = AllYesAvaP2PInterface(node) node.add_p2p_connection(avapeer) self.wait_until(lambda: node.getavalancheinfo()['active'] is True) - # From now only a single outbound peer is requested periodically + def is_vote_finalized(proof): + return node.getrawavalancheproof( + f"{proof.proofid:0{64}x}").get("finalized", False) + + # Wait until all proofs are finalized + self.wait_until(lambda: all([is_vote_finalized(p.proof) + for p in node.p2ps if isinstance(p, AvaP2PInterface)])) + + # Go through a round of getavaaddr requests. We don't know for sure how + # many will be sent as it depends on whether the peers responded fast + # enough during the polling phase. + total_getavaaddr = total_getavaaddr_msg() + node.mockscheduler(MAX_GETAVAADDR_DELAY) + self.wait_until(lambda: total_getavaaddr_msg() >= total_getavaaddr + 1) + + # But from now only a single outbound peer is requested periodically total_getavaaddr = total_getavaaddr_msg() node.mockscheduler(MAX_GETAVAADDR_DELAY) self.wait_until(lambda: total_getavaaddr_msg() == total_getavaaddr + 1) # And no more for p in avapeers: p.sync_send_with_ping() assert_equal(total_getavaaddr_msg(), total_getavaaddr + 1) def test_send_inbound_getavaaddr_until_quorum_is_established(self): self.log.info( "Check we also request the inbounds until the quorum is established") node = self.nodes[0] self.restart_node( 0, extra_args=self.extra_args[0] + ['-avaminquorumstake=1000000']) assert_equal(node.getavalancheinfo()['active'], False) outbound = MutedAvaP2PInterface() node.add_outbound_p2p_connection(outbound, p2p_idx=0) inbound = MutedAvaP2PInterface() node.add_p2p_connection(inbound) inbound.nodeid = node.getpeerinfo()[-1]['id'] def count_getavaaddr(peers): with p2p_lock: return sum([peer.message_count.get("getavaaddr", 0) for peer in peers]) # Upon connection only the outbound gets a getavaaddr message assert_equal(count_getavaaddr([inbound]), 0) self.wait_until(lambda: count_getavaaddr([outbound]) == 1) # Periodic send will include the inbound as well current_total = count_getavaaddr([inbound, outbound]) while count_getavaaddr([inbound]) == 0: node.mockscheduler(MAX_GETAVAADDR_DELAY) self.wait_until(lambda: count_getavaaddr( [inbound, outbound]) > current_total) current_total = count_getavaaddr([inbound, outbound]) # Connect the minimum amount of stake and nodes for _ in range(8): node.add_p2p_connection(AvaP2PInterface(node)) self.wait_until(lambda: node.getavalancheinfo()['active'] is True) # From now only the outbound is requested count_inbound = count_getavaaddr([inbound]) for _ in range(10): # Trigger a poll node.generate(1) inbound.sync_send_with_ping() node.mockscheduler(MAX_GETAVAADDR_DELAY) self.wait_until(lambda: count_getavaaddr( [inbound, outbound]) > current_total) current_total = count_getavaaddr([inbound, outbound]) assert_equal(count_getavaaddr([inbound]), count_inbound) def run_test(self): self.getavaaddr_interval_test() # Limited by maxaddrtosend self.address_test(maxaddrtosend=3, num_proof=2, num_avanode=8) # Limited by the number of good nodes self.address_test(maxaddrtosend=100, num_proof=2, num_avanode=8) self.getavaaddr_outbound_test() self.getavaaddr_manual_test() self.getavaaddr_noquorum() self.test_send_inbound_getavaaddr_until_quorum_is_established() if __name__ == '__main__': AvaAddrTest().main()