Changeset View
Changeset View
Standalone View
Standalone View
test/functional/p2p_tx_download.py
Show All 36 Lines | class TestP2PConn(P2PInterface): | ||||
def on_getdata(self, message): | def on_getdata(self, message): | ||||
for i in message.inv: | for i in message.inv: | ||||
if i.type & MSG_TYPE_MASK == MSG_TX: | if i.type & MSG_TYPE_MASK == MSG_TX: | ||||
self.tx_getdata_count += 1 | self.tx_getdata_count += 1 | ||||
# Constants from net_processing | # Constants from net_processing | ||||
GETDATA_TX_INTERVAL = 60 # seconds | GETDATA_TX_INTERVAL = 60 # seconds | ||||
MAX_GETDATA_RANDOM_DELAY = 2 # seconds | |||||
INBOUND_PEER_TX_DELAY = 2 # seconds | INBOUND_PEER_TX_DELAY = 2 # seconds | ||||
OVERLOADED_PEER_DELAY = 2 # seconds | |||||
MAX_GETDATA_IN_FLIGHT = 100 | MAX_GETDATA_IN_FLIGHT = 100 | ||||
TX_EXPIRY_INTERVAL = GETDATA_TX_INTERVAL * 10 | |||||
# Python test constants | # Python test constants | ||||
NUM_INBOUND = 10 | NUM_INBOUND = 10 | ||||
MAX_GETDATA_INBOUND_WAIT = GETDATA_TX_INTERVAL + \ | MAX_GETDATA_INBOUND_WAIT = GETDATA_TX_INTERVAL + INBOUND_PEER_TX_DELAY | ||||
MAX_GETDATA_RANDOM_DELAY + INBOUND_PEER_TX_DELAY | |||||
class TxDownloadTest(BitcoinTestFramework): | class TxDownloadTest(BitcoinTestFramework): | ||||
def set_test_params(self): | def set_test_params(self): | ||||
self.setup_clean_chain = False | self.setup_clean_chain = False | ||||
self.num_nodes = 2 | self.num_nodes = 2 | ||||
def test_tx_requests(self): | def test_tx_requests(self): | ||||
▲ Show 20 Lines • Show All 58 Lines • ▼ Show 20 Lines | def test_inv_block(self): | ||||
# should get the tx within a timeout. (Assuming that node 0 | # should get the tx within a timeout. (Assuming that node 0 | ||||
# announced the tx within the timeout) | # announced the tx within the timeout) | ||||
# The timeout is the sum of | # The timeout is the sum of | ||||
# * the worst case until the tx is first requested from an inbound | # * the worst case until the tx is first requested from an inbound | ||||
# peer, plus | # peer, plus | ||||
# * the first time it is re-requested from the outbound peer, plus | # * the first time it is re-requested from the outbound peer, plus | ||||
# * 2 seconds to avoid races | # * 2 seconds to avoid races | ||||
assert self.nodes[1].getpeerinfo()[0]['inbound'] is False | assert self.nodes[1].getpeerinfo()[0]['inbound'] is False | ||||
timeout = 2 + (MAX_GETDATA_RANDOM_DELAY + INBOUND_PEER_TX_DELAY) + ( | timeout = 2 + INBOUND_PEER_TX_DELAY + GETDATA_TX_INTERVAL | ||||
GETDATA_TX_INTERVAL + MAX_GETDATA_RANDOM_DELAY) | |||||
self.log.info( | self.log.info( | ||||
"Tx should be received at node 1 after {} seconds".format(timeout)) | "Tx should be received at node 1 after {} seconds".format(timeout)) | ||||
self.sync_mempools(timeout=timeout) | self.sync_mempools(timeout=timeout) | ||||
def test_in_flight_max(self): | def test_in_flight_max(self): | ||||
self.log.info("Test that we don't request more than {} transactions from any peer, every {} minutes".format( | self.log.info("Test that we don't load peers with more than {} transaction requests immediately".format( | ||||
MAX_GETDATA_IN_FLIGHT, TX_EXPIRY_INTERVAL / 60)) | MAX_GETDATA_IN_FLIGHT)) | ||||
txids = [i for i in range(MAX_GETDATA_IN_FLIGHT + 2)] | txids = [i for i in range(MAX_GETDATA_IN_FLIGHT + 2)] | ||||
p = self.nodes[0].p2ps[0] | p = self.nodes[0].p2ps[0] | ||||
with mininode_lock: | with mininode_lock: | ||||
p.tx_getdata_count = 0 | p.tx_getdata_count = 0 | ||||
p.send_message(msg_inv([CInv(t=MSG_TX, h=i) for i in txids])) | mock_time = int(time.time() + 1) | ||||
wait_until(lambda: p.tx_getdata_count >= | self.nodes[0].setmocktime(mock_time) | ||||
MAX_GETDATA_IN_FLIGHT, lock=mininode_lock) | for i in range(MAX_GETDATA_IN_FLIGHT): | ||||
p.send_message(msg_inv([CInv(t=MSG_TX, h=txids[i])])) | |||||
p.sync_with_ping() | |||||
mock_time += INBOUND_PEER_TX_DELAY | |||||
self.nodes[0].setmocktime(mock_time) | |||||
p.wait_until(lambda: p.tx_getdata_count >= MAX_GETDATA_IN_FLIGHT) | |||||
for i in range(MAX_GETDATA_IN_FLIGHT, len(txids)): | |||||
p.send_message(msg_inv([CInv(t=MSG_TX, h=txids[i])])) | |||||
p.sync_with_ping() | |||||
self.log.info( | |||||
"No more than {} requests should be seen within {} seconds after announcement".format( | |||||
MAX_GETDATA_IN_FLIGHT, | |||||
INBOUND_PEER_TX_DELAY + | |||||
OVERLOADED_PEER_DELAY - | |||||
1)) | |||||
self.nodes[0].setmocktime( | |||||
mock_time + | |||||
INBOUND_PEER_TX_DELAY + | |||||
OVERLOADED_PEER_DELAY - | |||||
1) | |||||
p.sync_with_ping() | |||||
with mininode_lock: | with mininode_lock: | ||||
assert_equal(p.tx_getdata_count, MAX_GETDATA_IN_FLIGHT) | assert_equal(p.tx_getdata_count, MAX_GETDATA_IN_FLIGHT) | ||||
self.log.info( | |||||
"If we wait {} seconds after announcement, we should eventually get more requests".format( | |||||
INBOUND_PEER_TX_DELAY + | |||||
OVERLOADED_PEER_DELAY)) | |||||
self.nodes[0].setmocktime( | |||||
mock_time + | |||||
INBOUND_PEER_TX_DELAY + | |||||
OVERLOADED_PEER_DELAY) | |||||
p.wait_until(lambda: p.tx_getdata_count == len(txids)) | |||||
def test_expiry_fallback(self): | |||||
self.log.info( | self.log.info( | ||||
"Now check that if we send a NOTFOUND for a transaction, we'll get one more request") | 'Check that expiry will select another peer for download') | ||||
p.send_message(msg_notfound(vec=[CInv(t=MSG_TX, h=txids[0])])) | WTXID = 0xffaa | ||||
wait_until( | peer1 = self.nodes[0].add_p2p_connection(TestP2PConn()) | ||||
lambda: p.tx_getdata_count >= MAX_GETDATA_IN_FLIGHT + 1, | peer2 = self.nodes[0].add_p2p_connection(TestP2PConn()) | ||||
timeout=10, | for p in [peer1, peer2]: | ||||
lock=mininode_lock) | p.send_message(msg_inv([CInv(t=MSG_TX, h=WTXID)])) | ||||
with mininode_lock: | # One of the peers is asked for the tx | ||||
assert_equal(p.tx_getdata_count, MAX_GETDATA_IN_FLIGHT + 1) | peer2.wait_until( | ||||
lambda: sum( | |||||
WAIT_TIME = TX_EXPIRY_INTERVAL // 2 + TX_EXPIRY_INTERVAL | p.tx_getdata_count for p in [ | ||||
self.log.info( | peer1, peer2]) == 1) | ||||
"if we wait about {} minutes, we should eventually get more requests".format( | with mininode_lock: | ||||
WAIT_TIME / 60)) | peer_expiry, peer_fallback = ( | ||||
self.nodes[0].setmocktime(int(time.time() + WAIT_TIME)) | peer1, peer2) if peer1.tx_getdata_count == 1 else ( | ||||
wait_until(lambda: p.tx_getdata_count == MAX_GETDATA_IN_FLIGHT + 2) | peer2, peer1) | ||||
self.nodes[0].setmocktime(0) | assert_equal(peer_fallback.tx_getdata_count, 0) | ||||
# Wait for request to peer_expiry to expire | |||||
self.nodes[0].setmocktime(int(time.time()) + GETDATA_TX_INTERVAL + 1) | |||||
peer_fallback.wait_until( | |||||
lambda: peer_fallback.tx_getdata_count >= 1, timeout=1) | |||||
with mininode_lock: | |||||
assert_equal(peer_fallback.tx_getdata_count, 1) | |||||
self.restart_node(0) # reset mocktime | |||||
def test_disconnect_fallback(self): | |||||
self.log.info( | |||||
'Check that disconnect will select another peer for download') | |||||
WTXID = 0xffbb | |||||
peer1 = self.nodes[0].add_p2p_connection(TestP2PConn()) | |||||
peer2 = self.nodes[0].add_p2p_connection(TestP2PConn()) | |||||
for p in [peer1, peer2]: | |||||
p.send_message(msg_inv([CInv(t=MSG_TX, h=WTXID)])) | |||||
# One of the peers is asked for the tx | |||||
peer2.wait_until( | |||||
lambda: sum( | |||||
p.tx_getdata_count for p in [ | |||||
peer1, peer2]) == 1) | |||||
with mininode_lock: | |||||
peer_disconnect, peer_fallback = ( | |||||
peer1, peer2) if peer1.tx_getdata_count == 1 else ( | |||||
peer2, peer1) | |||||
assert_equal(peer_fallback.tx_getdata_count, 0) | |||||
peer_disconnect.peer_disconnect() | |||||
peer_disconnect.wait_for_disconnect() | |||||
peer_fallback.wait_until( | |||||
lambda: peer_fallback.tx_getdata_count >= 1, timeout=1) | |||||
with mininode_lock: | |||||
assert_equal(peer_fallback.tx_getdata_count, 1) | |||||
def test_notfound_fallback(self): | |||||
self.log.info( | |||||
'Check that notfounds will select another peer for download immediately') | |||||
WTXID = 0xffdd | |||||
peer1 = self.nodes[0].add_p2p_connection(TestP2PConn()) | |||||
peer2 = self.nodes[0].add_p2p_connection(TestP2PConn()) | |||||
for p in [peer1, peer2]: | |||||
p.send_message(msg_inv([CInv(t=MSG_TX, h=WTXID)])) | |||||
# One of the peers is asked for the tx | |||||
peer2.wait_until( | |||||
lambda: sum( | |||||
p.tx_getdata_count for p in [ | |||||
peer1, peer2]) == 1) | |||||
with mininode_lock: | |||||
peer_notfound, peer_fallback = ( | |||||
peer1, peer2) if peer1.tx_getdata_count == 1 else ( | |||||
peer2, peer1) | |||||
assert_equal(peer_fallback.tx_getdata_count, 0) | |||||
# Send notfound, so that fallback peer is selected | |||||
peer_notfound.send_and_ping(msg_notfound(vec=[CInv(MSG_TX, WTXID)])) | |||||
peer_fallback.wait_until( | |||||
lambda: peer_fallback.tx_getdata_count >= 1, timeout=1) | |||||
with mininode_lock: | |||||
assert_equal(peer_fallback.tx_getdata_count, 1) | |||||
def test_preferred_inv(self): | |||||
self.log.info( | |||||
'Check that invs from preferred peers are downloaded immediately') | |||||
self.restart_node(0, extra_args=['-whitelist=noban@127.0.0.1']) | |||||
peer = self.nodes[0].add_p2p_connection(TestP2PConn()) | |||||
peer.send_message(msg_inv([CInv(t=MSG_TX, h=0xff00ff00)])) | |||||
peer.wait_until(lambda: peer.tx_getdata_count >= 1, timeout=1) | |||||
with mininode_lock: | |||||
assert_equal(peer.tx_getdata_count, 1) | |||||
def test_spurious_notfound(self): | def test_spurious_notfound(self): | ||||
self.log.info('Check that spurious notfound is ignored') | self.log.info('Check that spurious notfound is ignored') | ||||
self.nodes[0].p2ps[0].send_message(msg_notfound(vec=[CInv(MSG_TX, 1)])) | self.nodes[0].p2ps[0].send_message(msg_notfound(vec=[CInv(MSG_TX, 1)])) | ||||
def run_test(self): | def run_test(self): | ||||
# Run tests without mocktime that only need one peer-connection first, | |||||
# to avoid restarting the nodes | |||||
self.test_expiry_fallback() | |||||
self.test_disconnect_fallback() | |||||
self.test_notfound_fallback() | |||||
self.test_preferred_inv() | |||||
self.test_spurious_notfound() | |||||
# Run each test against new bitcoind instances, as setting mocktimes has long-term effects on when | |||||
# the next trickle relay event happens. | |||||
for test in [self.test_in_flight_max, | |||||
self.test_inv_block, self.test_tx_requests]: | |||||
self.stop_nodes() | |||||
self.start_nodes() | |||||
self.connect_nodes(1, 0) | |||||
# Setup the p2p connections | # Setup the p2p connections | ||||
self.peers = [] | self.peers = [] | ||||
for node in self.nodes: | for node in self.nodes: | ||||
for i in range(NUM_INBOUND): | for _ in range(NUM_INBOUND): | ||||
self.peers.append(node.add_p2p_connection(TestP2PConn())) | self.peers.append(node.add_p2p_connection(TestP2PConn())) | ||||
self.log.info( | self.log.info( | ||||
"Nodes are setup with {} incoming connections each".format(NUM_INBOUND)) | "Nodes are setup with {} incoming connections each".format(NUM_INBOUND)) | ||||
test() | |||||
self.test_spurious_notfound() | |||||
# Test the in-flight max first, because we want no transactions in | |||||
# flight ahead of this test. | |||||
self.test_in_flight_max() | |||||
self.test_inv_block() | |||||
self.test_tx_requests() | |||||
if __name__ == '__main__': | if __name__ == '__main__': | ||||
TxDownloadTest().main() | TxDownloadTest().main() |