Changeset View
Changeset View
Standalone View
Standalone View
test/functional/p2p_tx_download.py
Show All 24 Lines | from test_framework.util import ( | ||||
assert_equal, | assert_equal, | ||||
wait_until, | wait_until, | ||||
) | ) | ||||
import time | import time | ||||
class TestP2PConn(P2PInterface): | class TestP2PConn(P2PInterface): | ||||
def __init__(self): | def __init__(self, inv_type): | ||||
super().__init__() | super().__init__() | ||||
self.tx_getdata_count = 0 | self.inv_type = inv_type | ||||
self.getdata_count = 0 | |||||
def on_getdata(self, message): | def on_getdata(self, message): | ||||
for i in message.inv: | for i in message.inv: | ||||
if i.type & MSG_TYPE_MASK == MSG_TX: | if i.type & MSG_TYPE_MASK == self.inv_type: | ||||
self.tx_getdata_count += 1 | self.getdata_count += 1 | ||||
class NetConstants: | class NetConstants: | ||||
"""Constants from net_processing""" | """Constants from net_processing""" | ||||
def __init__(self, | def __init__(self, | ||||
getdata_interval, | getdata_interval, | ||||
inbound_peer_delay, | inbound_peer_delay, | ||||
overloaded_peer_delay, | overloaded_peer_delay, | ||||
max_getdata_in_flight, | max_getdata_in_flight, | ||||
max_peer_announcements, | max_peer_announcements, | ||||
): | ): | ||||
self.getdata_interval = getdata_interval | self.getdata_interval = getdata_interval | ||||
self.inbound_peer_delay = inbound_peer_delay | self.inbound_peer_delay = inbound_peer_delay | ||||
self.overloaded_peer_delay = overloaded_peer_delay | self.overloaded_peer_delay = overloaded_peer_delay | ||||
self.max_getdata_in_flight = max_getdata_in_flight | self.max_getdata_in_flight = max_getdata_in_flight | ||||
self.max_peer_announcements = max_peer_announcements | self.max_peer_announcements = max_peer_announcements | ||||
self.max_getdata_inbound_wait = self.getdata_interval + self.inbound_peer_delay | self.max_getdata_inbound_wait = self.getdata_interval + self.inbound_peer_delay | ||||
class TestContext: | class TestContext: | ||||
def __init__(self, constants): | def __init__(self, inv_type, constants): | ||||
self.inv_type = inv_type | |||||
self.constants = constants | self.constants = constants | ||||
def p2p_conn(self): | |||||
return TestP2PConn(self.inv_type) | |||||
TX_TEST_CONTEXT = TestContext( | TX_TEST_CONTEXT = TestContext( | ||||
MSG_TX, | |||||
NetConstants( | NetConstants( | ||||
getdata_interval=60, # seconds | getdata_interval=60, # seconds | ||||
inbound_peer_delay=2, # seconds | inbound_peer_delay=2, # seconds | ||||
overloaded_peer_delay=2, # seconds | overloaded_peer_delay=2, # seconds | ||||
max_getdata_in_flight=100, | max_getdata_in_flight=100, | ||||
max_peer_announcements=5000, | max_peer_announcements=5000, | ||||
), | ), | ||||
) | ) | ||||
Show All 9 Lines | class TxDownloadTest(BitcoinTestFramework): | ||||
def test_tx_requests(self, context): | def test_tx_requests(self, context): | ||||
self.log.info( | self.log.info( | ||||
"Test that we request transactions from all our peers, eventually") | "Test that we request transactions from all our peers, eventually") | ||||
txid = 0xdeadbeef | txid = 0xdeadbeef | ||||
self.log.info("Announce the txid from each incoming peer to node 0") | self.log.info("Announce the txid from each incoming peer to node 0") | ||||
msg = msg_inv([CInv(t=MSG_TX, h=txid)]) | msg = msg_inv([CInv(t=context.inv_type, h=txid)]) | ||||
for p in self.nodes[0].p2ps: | for p in self.nodes[0].p2ps: | ||||
p.send_and_ping(msg) | p.send_and_ping(msg) | ||||
outstanding_peer_index = [i for i in range(len(self.nodes[0].p2ps))] | outstanding_peer_index = [i for i in range(len(self.nodes[0].p2ps))] | ||||
def getdata_found(peer_index): | def getdata_found(peer_index): | ||||
p = self.nodes[0].p2ps[peer_index] | p = self.nodes[0].p2ps[peer_index] | ||||
with p2p_lock: | with p2p_lock: | ||||
Show All 27 Lines | def test_inv_block(self, context): | ||||
hexstring=tx, | hexstring=tx, | ||||
privkeys=[self.nodes[0].get_deterministic_priv_key().key], | privkeys=[self.nodes[0].get_deterministic_priv_key().key], | ||||
)['hex'] | )['hex'] | ||||
ctx = FromHex(CTransaction(), tx) | ctx = FromHex(CTransaction(), tx) | ||||
txid = int(ctx.rehash(), 16) | txid = int(ctx.rehash(), 16) | ||||
self.log.info( | self.log.info( | ||||
"Announce the transaction to all nodes from all {} incoming peers, but never send it".format(NUM_INBOUND)) | "Announce the transaction to all nodes from all {} incoming peers, but never send it".format(NUM_INBOUND)) | ||||
msg = msg_inv([CInv(t=MSG_TX, h=txid)]) | msg = msg_inv([CInv(t=context.inv_type, h=txid)]) | ||||
for p in self.peers: | for p in self.peers: | ||||
p.send_and_ping(msg) | p.send_and_ping(msg) | ||||
self.log.info("Put the tx in node 0's mempool") | self.log.info("Put the tx in node 0's mempool") | ||||
self.nodes[0].sendrawtransaction(tx) | self.nodes[0].sendrawtransaction(tx) | ||||
# Since node 1 is connected outbound to an honest peer (node 0), it | # Since node 1 is connected outbound to an honest peer (node 0), it | ||||
# should get the tx within a timeout. (Assuming that node 0 | # should get the tx within a timeout. (Assuming that node 0 | ||||
Show All 17 Lines | def test_in_flight_max(self, context): | ||||
self.log.info("Test that we don't load peers with more than {} transaction requests immediately".format( | self.log.info("Test that we don't load peers with more than {} transaction requests immediately".format( | ||||
max_getdata_in_flight)) | max_getdata_in_flight)) | ||||
txids = [i for i in range(max_getdata_in_flight + 2)] | txids = [i for i in range(max_getdata_in_flight + 2)] | ||||
p = self.nodes[0].p2ps[0] | p = self.nodes[0].p2ps[0] | ||||
with p2p_lock: | with p2p_lock: | ||||
p.tx_getdata_count = 0 | p.getdata_count = 0 | ||||
mock_time = int(time.time() + 1) | mock_time = int(time.time() + 1) | ||||
self.nodes[0].setmocktime(mock_time) | self.nodes[0].setmocktime(mock_time) | ||||
for i in range(max_getdata_in_flight): | for i in range(max_getdata_in_flight): | ||||
p.send_message(msg_inv([CInv(t=MSG_TX, h=txids[i])])) | p.send_message(msg_inv([CInv(t=context.inv_type, h=txids[i])])) | ||||
p.sync_with_ping() | p.sync_with_ping() | ||||
mock_time += context.constants.inbound_peer_delay | mock_time += context.constants.inbound_peer_delay | ||||
self.nodes[0].setmocktime(mock_time) | self.nodes[0].setmocktime(mock_time) | ||||
p.wait_until(lambda: p.tx_getdata_count >= max_getdata_in_flight) | p.wait_until(lambda: p.getdata_count >= max_getdata_in_flight) | ||||
for i in range(max_getdata_in_flight, len(txids)): | for i in range(max_getdata_in_flight, len(txids)): | ||||
p.send_message(msg_inv([CInv(t=MSG_TX, h=txids[i])])) | p.send_message(msg_inv([CInv(t=context.inv_type, h=txids[i])])) | ||||
p.sync_with_ping() | p.sync_with_ping() | ||||
self.log.info( | self.log.info( | ||||
"No more than {} requests should be seen within {} seconds after announcement".format( | "No more than {} requests should be seen within {} seconds after announcement".format( | ||||
max_getdata_in_flight, | max_getdata_in_flight, | ||||
max_inbound_delay - 1)) | max_inbound_delay - 1)) | ||||
self.nodes[0].setmocktime( | self.nodes[0].setmocktime( | ||||
mock_time + | mock_time + | ||||
max_inbound_delay - 1) | max_inbound_delay - 1) | ||||
p.sync_with_ping() | p.sync_with_ping() | ||||
with p2p_lock: | with p2p_lock: | ||||
assert_equal(p.tx_getdata_count, max_getdata_in_flight) | assert_equal(p.getdata_count, max_getdata_in_flight) | ||||
self.log.info( | self.log.info( | ||||
"If we wait {} seconds after announcement, we should eventually get more requests".format( | "If we wait {} seconds after announcement, we should eventually get more requests".format( | ||||
max_inbound_delay)) | max_inbound_delay)) | ||||
self.nodes[0].setmocktime( | self.nodes[0].setmocktime( | ||||
mock_time + | mock_time + | ||||
max_inbound_delay) | max_inbound_delay) | ||||
p.wait_until(lambda: p.tx_getdata_count == len(txids)) | p.wait_until(lambda: p.getdata_count == len(txids)) | ||||
def test_expiry_fallback(self, context): | def test_expiry_fallback(self, context): | ||||
self.log.info( | self.log.info( | ||||
'Check that expiry will select another peer for download') | 'Check that expiry will select another peer for download') | ||||
TXID = 0xffaa | TXID = 0xffaa | ||||
peer1 = self.nodes[0].add_p2p_connection(TestP2PConn()) | peer1 = self.nodes[0].add_p2p_connection(context.p2p_conn()) | ||||
peer2 = self.nodes[0].add_p2p_connection(TestP2PConn()) | peer2 = self.nodes[0].add_p2p_connection(context.p2p_conn()) | ||||
for p in [peer1, peer2]: | for p in [peer1, peer2]: | ||||
p.send_message(msg_inv([CInv(t=MSG_TX, h=TXID)])) | p.send_message(msg_inv([CInv(t=context.inv_type, h=TXID)])) | ||||
# One of the peers is asked for the tx | # One of the peers is asked for the tx | ||||
peer2.wait_until( | peer2.wait_until( | ||||
lambda: sum( | lambda: sum( | ||||
p.tx_getdata_count for p in [ | p.getdata_count for p in [ | ||||
peer1, peer2]) == 1) | peer1, peer2]) == 1) | ||||
with p2p_lock: | with p2p_lock: | ||||
peer_expiry, peer_fallback = ( | peer_expiry, peer_fallback = ( | ||||
peer1, peer2) if peer1.tx_getdata_count == 1 else ( | peer1, peer2) if peer1.getdata_count == 1 else ( | ||||
peer2, peer1) | peer2, peer1) | ||||
assert_equal(peer_fallback.tx_getdata_count, 0) | assert_equal(peer_fallback.getdata_count, 0) | ||||
# Wait for request to peer_expiry to expire | # Wait for request to peer_expiry to expire | ||||
self.nodes[0].setmocktime( | self.nodes[0].setmocktime( | ||||
int(time.time()) + context.constants.getdata_interval + 1) | int(time.time()) + context.constants.getdata_interval + 1) | ||||
peer_fallback.wait_until( | peer_fallback.wait_until( | ||||
lambda: peer_fallback.tx_getdata_count >= 1, timeout=1) | lambda: peer_fallback.getdata_count >= 1, timeout=1) | ||||
with p2p_lock: | with p2p_lock: | ||||
assert_equal(peer_fallback.tx_getdata_count, 1) | assert_equal(peer_fallback.getdata_count, 1) | ||||
# reset mocktime | # reset mocktime | ||||
self.restart_node(0) | self.restart_node(0) | ||||
def test_disconnect_fallback(self, context): | def test_disconnect_fallback(self, context): | ||||
self.log.info( | self.log.info( | ||||
'Check that disconnect will select another peer for download') | 'Check that disconnect will select another peer for download') | ||||
TXID = 0xffbb | TXID = 0xffbb | ||||
peer1 = self.nodes[0].add_p2p_connection(TestP2PConn()) | peer1 = self.nodes[0].add_p2p_connection(context.p2p_conn()) | ||||
peer2 = self.nodes[0].add_p2p_connection(TestP2PConn()) | peer2 = self.nodes[0].add_p2p_connection(context.p2p_conn()) | ||||
for p in [peer1, peer2]: | for p in [peer1, peer2]: | ||||
p.send_message(msg_inv([CInv(t=MSG_TX, h=TXID)])) | p.send_message(msg_inv([CInv(t=context.inv_type, h=TXID)])) | ||||
# One of the peers is asked for the tx | # One of the peers is asked for the tx | ||||
peer2.wait_until( | peer2.wait_until( | ||||
lambda: sum( | lambda: sum( | ||||
p.tx_getdata_count for p in [ | p.getdata_count for p in [ | ||||
peer1, peer2]) == 1) | peer1, peer2]) == 1) | ||||
with p2p_lock: | with p2p_lock: | ||||
peer_disconnect, peer_fallback = ( | peer_disconnect, peer_fallback = ( | ||||
peer1, peer2) if peer1.tx_getdata_count == 1 else ( | peer1, peer2) if peer1.getdata_count == 1 else ( | ||||
peer2, peer1) | peer2, peer1) | ||||
assert_equal(peer_fallback.tx_getdata_count, 0) | assert_equal(peer_fallback.getdata_count, 0) | ||||
peer_disconnect.peer_disconnect() | peer_disconnect.peer_disconnect() | ||||
peer_disconnect.wait_for_disconnect() | peer_disconnect.wait_for_disconnect() | ||||
peer_fallback.wait_until( | peer_fallback.wait_until( | ||||
lambda: peer_fallback.tx_getdata_count >= 1, timeout=1) | lambda: peer_fallback.getdata_count >= 1, timeout=1) | ||||
with p2p_lock: | with p2p_lock: | ||||
assert_equal(peer_fallback.tx_getdata_count, 1) | assert_equal(peer_fallback.getdata_count, 1) | ||||
def test_notfound_fallback(self, context): | def test_notfound_fallback(self, context): | ||||
self.log.info( | self.log.info( | ||||
'Check that notfounds will select another peer for download immediately') | 'Check that notfounds will select another peer for download immediately') | ||||
TXID = 0xffdd | TXID = 0xffdd | ||||
peer1 = self.nodes[0].add_p2p_connection(TestP2PConn()) | peer1 = self.nodes[0].add_p2p_connection(context.p2p_conn()) | ||||
peer2 = self.nodes[0].add_p2p_connection(TestP2PConn()) | peer2 = self.nodes[0].add_p2p_connection(context.p2p_conn()) | ||||
for p in [peer1, peer2]: | for p in [peer1, peer2]: | ||||
p.send_message(msg_inv([CInv(t=MSG_TX, h=TXID)])) | p.send_message(msg_inv([CInv(t=context.inv_type, h=TXID)])) | ||||
# One of the peers is asked for the tx | # One of the peers is asked for the tx | ||||
peer2.wait_until( | peer2.wait_until( | ||||
lambda: sum( | lambda: sum( | ||||
p.tx_getdata_count for p in [ | p.getdata_count for p in [ | ||||
peer1, peer2]) == 1) | peer1, peer2]) == 1) | ||||
with p2p_lock: | with p2p_lock: | ||||
peer_notfound, peer_fallback = ( | peer_notfound, peer_fallback = ( | ||||
peer1, peer2) if peer1.tx_getdata_count == 1 else ( | peer1, peer2) if peer1.getdata_count == 1 else ( | ||||
peer2, peer1) | peer2, peer1) | ||||
assert_equal(peer_fallback.tx_getdata_count, 0) | assert_equal(peer_fallback.getdata_count, 0) | ||||
# Send notfound, so that fallback peer is selected | # Send notfound, so that fallback peer is selected | ||||
peer_notfound.send_and_ping(msg_notfound(vec=[CInv(MSG_TX, TXID)])) | peer_notfound.send_and_ping(msg_notfound( | ||||
vec=[CInv(context.inv_type, TXID)])) | |||||
peer_fallback.wait_until( | peer_fallback.wait_until( | ||||
lambda: peer_fallback.tx_getdata_count >= 1, timeout=1) | lambda: peer_fallback.getdata_count >= 1, timeout=1) | ||||
with p2p_lock: | with p2p_lock: | ||||
assert_equal(peer_fallback.tx_getdata_count, 1) | assert_equal(peer_fallback.getdata_count, 1) | ||||
def test_preferred_inv(self, context): | def test_preferred_inv(self, context): | ||||
self.log.info( | self.log.info( | ||||
'Check that invs from preferred peers are downloaded immediately') | 'Check that invs from preferred peers are downloaded immediately') | ||||
self.restart_node(0, extra_args=['-whitelist=noban@127.0.0.1']) | self.restart_node(0, extra_args=['-whitelist=noban@127.0.0.1']) | ||||
peer = self.nodes[0].add_p2p_connection(TestP2PConn()) | peer = self.nodes[0].add_p2p_connection(context.p2p_conn()) | ||||
peer.send_message(msg_inv([CInv(t=MSG_TX, h=0xff00ff00)])) | peer.send_message(msg_inv([CInv(t=context.inv_type, h=0xff00ff00)])) | ||||
peer.wait_until(lambda: peer.tx_getdata_count >= 1, timeout=1) | peer.wait_until(lambda: peer.getdata_count >= 1, timeout=1) | ||||
with p2p_lock: | with p2p_lock: | ||||
assert_equal(peer.tx_getdata_count, 1) | assert_equal(peer.getdata_count, 1) | ||||
def test_large_inv_batch(self, context): | def test_large_inv_batch(self, context): | ||||
max_peer_announcements = context.constants.max_peer_announcements | max_peer_announcements = context.constants.max_peer_announcements | ||||
self.log.info( | self.log.info( | ||||
'Test how large inv batches are handled with relay permission') | 'Test how large inv batches are handled with relay permission') | ||||
self.restart_node(0, extra_args=['-whitelist=relay@127.0.0.1']) | self.restart_node(0, extra_args=['-whitelist=relay@127.0.0.1']) | ||||
peer = self.nodes[0].add_p2p_connection(TestP2PConn()) | peer = self.nodes[0].add_p2p_connection(context.p2p_conn()) | ||||
peer.send_message(msg_inv([CInv(t=MSG_TX, h=txid) | peer.send_message(msg_inv([CInv(t=context.inv_type, h=txid) | ||||
for txid in range(max_peer_announcements + 1)])) | for txid in range(max_peer_announcements + 1)])) | ||||
peer.wait_until(lambda: peer.tx_getdata_count == | peer.wait_until(lambda: peer.getdata_count == | ||||
max_peer_announcements + 1) | max_peer_announcements + 1) | ||||
self.log.info( | self.log.info( | ||||
'Test how large inv batches are handled without relay permission') | 'Test how large inv batches are handled without relay permission') | ||||
self.restart_node(0) | self.restart_node(0) | ||||
peer = self.nodes[0].add_p2p_connection(TestP2PConn()) | peer = self.nodes[0].add_p2p_connection(context.p2p_conn()) | ||||
peer.send_message(msg_inv([CInv(t=MSG_TX, h=txid) | peer.send_message(msg_inv([CInv(t=context.inv_type, h=txid) | ||||
for txid in range(max_peer_announcements + 1)])) | for txid in range(max_peer_announcements + 1)])) | ||||
peer.wait_until(lambda: peer.tx_getdata_count == | peer.wait_until(lambda: peer.getdata_count == | ||||
max_peer_announcements) | max_peer_announcements) | ||||
peer.sync_with_ping() | peer.sync_with_ping() | ||||
with p2p_lock: | with p2p_lock: | ||||
assert_equal(peer.tx_getdata_count, max_peer_announcements) | assert_equal(peer.getdata_count, max_peer_announcements) | ||||
def test_spurious_notfound(self, context): | def test_spurious_notfound(self, context): | ||||
self.log.info('Check that spurious notfound is ignored') | self.log.info('Check that spurious notfound is ignored') | ||||
self.nodes[0].p2ps[0].send_message(msg_notfound(vec=[CInv(MSG_TX, 1)])) | self.nodes[0].p2ps[0].send_message( | ||||
msg_notfound(vec=[CInv(context.inv_type, 1)])) | |||||
def run_test(self): | def run_test(self): | ||||
context = TX_TEST_CONTEXT | context = TX_TEST_CONTEXT | ||||
# Run tests without mocktime that only need one peer-connection first, | # Run tests without mocktime that only need one peer-connection first, | ||||
# to avoid restarting the nodes | # to avoid restarting the nodes | ||||
self.test_expiry_fallback(context) | self.test_expiry_fallback(context) | ||||
self.test_disconnect_fallback(context) | self.test_disconnect_fallback(context) | ||||
Show All 10 Lines | def run_test(self): | ||||
self.start_nodes() | self.start_nodes() | ||||
self.connect_nodes(1, 0) | self.connect_nodes(1, 0) | ||||
# Setup the p2p connections | # Setup the p2p connections | ||||
self.peers = [] | self.peers = [] | ||||
for node in self.nodes: | for node in self.nodes: | ||||
for _ in range(NUM_INBOUND): | for _ in range(NUM_INBOUND): | ||||
self.peers.append( | self.peers.append( | ||||
node.add_p2p_connection( | node.add_p2p_connection( | ||||
TestP2PConn())) | context.p2p_conn())) | ||||
self.log.info( | self.log.info( | ||||
"Nodes are setup with {} incoming connections each".format(NUM_INBOUND)) | "Nodes are setup with {} incoming connections each".format(NUM_INBOUND)) | ||||
test(context) | test(context) | ||||
if __name__ == '__main__': | if __name__ == '__main__': | ||||
TxDownloadTest().main() | TxDownloadTest().main() |