diff --git a/test/functional/p2p_tx_download.py b/test/functional/p2p_tx_download.py --- a/test/functional/p2p_tx_download.py +++ b/test/functional/p2p_tx_download.py @@ -30,14 +30,15 @@ class TestP2PConn(P2PInterface): - def __init__(self): + def __init__(self, inv_type): super().__init__() - self.tx_getdata_count = 0 + self.inv_type = inv_type + self.getdata_count = 0 def on_getdata(self, message): for i in message.inv: - if i.type & MSG_TYPE_MASK == MSG_TX: - self.tx_getdata_count += 1 + if i.type & MSG_TYPE_MASK == self.inv_type: + self.getdata_count += 1 class NetConstants: @@ -59,11 +60,16 @@ class TestContext: - def __init__(self, constants): + def __init__(self, inv_type, constants): + self.inv_type = inv_type self.constants = constants + def p2p_conn(self): + return TestP2PConn(self.inv_type) + TX_TEST_CONTEXT = TestContext( + MSG_TX, NetConstants( getdata_interval=60, # seconds inbound_peer_delay=2, # seconds @@ -89,7 +95,7 @@ txid = 0xdeadbeef self.log.info("Announce the txid from each incoming peer to node 0") - msg = msg_inv([CInv(t=MSG_TX, h=txid)]) + msg = msg_inv([CInv(t=context.inv_type, h=txid)]) for p in self.nodes[0].p2ps: p.send_and_ping(msg) @@ -133,7 +139,7 @@ self.log.info( "Announce the transaction to all nodes from all {} incoming peers, but never send it".format(NUM_INBOUND)) - msg = msg_inv([CInv(t=MSG_TX, h=txid)]) + msg = msg_inv([CInv(t=context.inv_type, h=txid)]) for p in self.peers: p.send_and_ping(msg) @@ -167,18 +173,18 @@ p = self.nodes[0].p2ps[0] with p2p_lock: - p.tx_getdata_count = 0 + p.getdata_count = 0 mock_time = int(time.time() + 1) self.nodes[0].setmocktime(mock_time) for i in range(max_getdata_in_flight): - p.send_message(msg_inv([CInv(t=MSG_TX, h=txids[i])])) + p.send_message(msg_inv([CInv(t=context.inv_type, h=txids[i])])) p.sync_with_ping() mock_time += context.constants.inbound_peer_delay self.nodes[0].setmocktime(mock_time) - p.wait_until(lambda: p.tx_getdata_count >= max_getdata_in_flight) + p.wait_until(lambda: p.getdata_count >= max_getdata_in_flight) for i in range(max_getdata_in_flight, len(txids)): - p.send_message(msg_inv([CInv(t=MSG_TX, h=txids[i])])) + p.send_message(msg_inv([CInv(t=context.inv_type, h=txids[i])])) p.sync_with_ping() self.log.info( "No more than {} requests should be seen within {} seconds after announcement".format( @@ -189,40 +195,40 @@ max_inbound_delay - 1) p.sync_with_ping() with p2p_lock: - assert_equal(p.tx_getdata_count, max_getdata_in_flight) + assert_equal(p.getdata_count, max_getdata_in_flight) self.log.info( "If we wait {} seconds after announcement, we should eventually get more requests".format( max_inbound_delay)) self.nodes[0].setmocktime( mock_time + max_inbound_delay) - p.wait_until(lambda: p.tx_getdata_count == len(txids)) + p.wait_until(lambda: p.getdata_count == len(txids)) def test_expiry_fallback(self, context): self.log.info( 'Check that expiry will select another peer for download') TXID = 0xffaa - peer1 = self.nodes[0].add_p2p_connection(TestP2PConn()) - peer2 = self.nodes[0].add_p2p_connection(TestP2PConn()) + peer1 = self.nodes[0].add_p2p_connection(context.p2p_conn()) + peer2 = self.nodes[0].add_p2p_connection(context.p2p_conn()) for p in [peer1, peer2]: - p.send_message(msg_inv([CInv(t=MSG_TX, h=TXID)])) + p.send_message(msg_inv([CInv(t=context.inv_type, h=TXID)])) # One of the peers is asked for the tx peer2.wait_until( lambda: sum( - p.tx_getdata_count for p in [ + p.getdata_count for p in [ peer1, peer2]) == 1) with p2p_lock: peer_expiry, peer_fallback = ( - peer1, peer2) if peer1.tx_getdata_count == 1 else ( + peer1, peer2) if peer1.getdata_count == 1 else ( peer2, peer1) - assert_equal(peer_fallback.tx_getdata_count, 0) + assert_equal(peer_fallback.getdata_count, 0) # Wait for request to peer_expiry to expire self.nodes[0].setmocktime( int(time.time()) + context.constants.getdata_interval + 1) peer_fallback.wait_until( - lambda: peer_fallback.tx_getdata_count >= 1, timeout=1) + lambda: peer_fallback.getdata_count >= 1, timeout=1) with p2p_lock: - assert_equal(peer_fallback.tx_getdata_count, 1) + assert_equal(peer_fallback.getdata_count, 1) # reset mocktime self.restart_node(0) @@ -230,88 +236,90 @@ self.log.info( 'Check that disconnect will select another peer for download') TXID = 0xffbb - peer1 = self.nodes[0].add_p2p_connection(TestP2PConn()) - peer2 = self.nodes[0].add_p2p_connection(TestP2PConn()) + peer1 = self.nodes[0].add_p2p_connection(context.p2p_conn()) + peer2 = self.nodes[0].add_p2p_connection(context.p2p_conn()) for p in [peer1, peer2]: - p.send_message(msg_inv([CInv(t=MSG_TX, h=TXID)])) + p.send_message(msg_inv([CInv(t=context.inv_type, h=TXID)])) # One of the peers is asked for the tx peer2.wait_until( lambda: sum( - p.tx_getdata_count for p in [ + p.getdata_count for p in [ peer1, peer2]) == 1) with p2p_lock: peer_disconnect, peer_fallback = ( - peer1, peer2) if peer1.tx_getdata_count == 1 else ( + peer1, peer2) if peer1.getdata_count == 1 else ( peer2, peer1) - assert_equal(peer_fallback.tx_getdata_count, 0) + assert_equal(peer_fallback.getdata_count, 0) peer_disconnect.peer_disconnect() peer_disconnect.wait_for_disconnect() peer_fallback.wait_until( - lambda: peer_fallback.tx_getdata_count >= 1, timeout=1) + lambda: peer_fallback.getdata_count >= 1, timeout=1) with p2p_lock: - assert_equal(peer_fallback.tx_getdata_count, 1) + assert_equal(peer_fallback.getdata_count, 1) def test_notfound_fallback(self, context): self.log.info( 'Check that notfounds will select another peer for download immediately') TXID = 0xffdd - peer1 = self.nodes[0].add_p2p_connection(TestP2PConn()) - peer2 = self.nodes[0].add_p2p_connection(TestP2PConn()) + peer1 = self.nodes[0].add_p2p_connection(context.p2p_conn()) + peer2 = self.nodes[0].add_p2p_connection(context.p2p_conn()) for p in [peer1, peer2]: - p.send_message(msg_inv([CInv(t=MSG_TX, h=TXID)])) + p.send_message(msg_inv([CInv(t=context.inv_type, h=TXID)])) # One of the peers is asked for the tx peer2.wait_until( lambda: sum( - p.tx_getdata_count for p in [ + p.getdata_count for p in [ peer1, peer2]) == 1) with p2p_lock: peer_notfound, peer_fallback = ( - peer1, peer2) if peer1.tx_getdata_count == 1 else ( + peer1, peer2) if peer1.getdata_count == 1 else ( peer2, peer1) - assert_equal(peer_fallback.tx_getdata_count, 0) + assert_equal(peer_fallback.getdata_count, 0) # Send notfound, so that fallback peer is selected - peer_notfound.send_and_ping(msg_notfound(vec=[CInv(MSG_TX, TXID)])) + peer_notfound.send_and_ping(msg_notfound( + vec=[CInv(context.inv_type, TXID)])) peer_fallback.wait_until( - lambda: peer_fallback.tx_getdata_count >= 1, timeout=1) + lambda: peer_fallback.getdata_count >= 1, timeout=1) with p2p_lock: - assert_equal(peer_fallback.tx_getdata_count, 1) + assert_equal(peer_fallback.getdata_count, 1) def test_preferred_inv(self, context): self.log.info( 'Check that invs from preferred peers are downloaded immediately') self.restart_node(0, extra_args=['-whitelist=noban@127.0.0.1']) - peer = self.nodes[0].add_p2p_connection(TestP2PConn()) - peer.send_message(msg_inv([CInv(t=MSG_TX, h=0xff00ff00)])) - peer.wait_until(lambda: peer.tx_getdata_count >= 1, timeout=1) + peer = self.nodes[0].add_p2p_connection(context.p2p_conn()) + peer.send_message(msg_inv([CInv(t=context.inv_type, h=0xff00ff00)])) + peer.wait_until(lambda: peer.getdata_count >= 1, timeout=1) with p2p_lock: - assert_equal(peer.tx_getdata_count, 1) + assert_equal(peer.getdata_count, 1) def test_large_inv_batch(self, context): max_peer_announcements = context.constants.max_peer_announcements self.log.info( 'Test how large inv batches are handled with relay permission') self.restart_node(0, extra_args=['-whitelist=relay@127.0.0.1']) - peer = self.nodes[0].add_p2p_connection(TestP2PConn()) - peer.send_message(msg_inv([CInv(t=MSG_TX, h=txid) + peer = self.nodes[0].add_p2p_connection(context.p2p_conn()) + peer.send_message(msg_inv([CInv(t=context.inv_type, h=txid) for txid in range(max_peer_announcements + 1)])) - peer.wait_until(lambda: peer.tx_getdata_count == + peer.wait_until(lambda: peer.getdata_count == max_peer_announcements + 1) self.log.info( 'Test how large inv batches are handled without relay permission') self.restart_node(0) - peer = self.nodes[0].add_p2p_connection(TestP2PConn()) - peer.send_message(msg_inv([CInv(t=MSG_TX, h=txid) + peer = self.nodes[0].add_p2p_connection(context.p2p_conn()) + peer.send_message(msg_inv([CInv(t=context.inv_type, h=txid) for txid in range(max_peer_announcements + 1)])) - peer.wait_until(lambda: peer.tx_getdata_count == + peer.wait_until(lambda: peer.getdata_count == max_peer_announcements) peer.sync_with_ping() with p2p_lock: - assert_equal(peer.tx_getdata_count, max_peer_announcements) + assert_equal(peer.getdata_count, max_peer_announcements) def test_spurious_notfound(self, context): self.log.info('Check that spurious notfound is ignored') - self.nodes[0].p2ps[0].send_message(msg_notfound(vec=[CInv(MSG_TX, 1)])) + self.nodes[0].p2ps[0].send_message( + msg_notfound(vec=[CInv(context.inv_type, 1)])) def run_test(self): context = TX_TEST_CONTEXT @@ -338,7 +346,7 @@ for _ in range(NUM_INBOUND): self.peers.append( node.add_p2p_connection( - TestP2PConn())) + context.p2p_conn())) self.log.info( "Nodes are setup with {} incoming connections each".format(NUM_INBOUND)) test(context)