diff --git a/doc/functional-tests.md b/doc/functional-tests.md --- a/doc/functional-tests.md +++ b/doc/functional-tests.md @@ -262,7 +262,7 @@ wrappers for them, `msg_block`, `msg_tx`, etc). - P2P tests have two threads. One thread handles all network communication -with the bitcoind(s) being tested (using python's asyncore package); the other +with the bitcoind(s) being tested in a callback-based event loop; the other implements the test logic. - `P2PConnection` is the class used to connect to a bitcoind. `P2PInterface` @@ -270,10 +270,6 @@ the Bitcoin Core node application logic. For custom behaviour, subclass the P2PInterface object and override the callback methods. -- Call `network_thread_start()` after all `P2PInterface` objects are created to -start the networking thread. (Continue with the test logic in your existing -thread.) - - Can be used to write tests where specific P2P protocol behavior is tested. Examples tests are `p2p_unrequested_blocks.py`, `p2p_compactblocks.py`. diff --git a/test/functional/abc-invalid-chains.py b/test/functional/abc-invalid-chains.py --- a/test/functional/abc-invalid-chains.py +++ b/test/functional/abc-invalid-chains.py @@ -6,7 +6,7 @@ import time from test_framework.test_framework import BitcoinTestFramework -from test_framework.mininode import network_thread_start, P2PDataStore +from test_framework.mininode import P2PDataStore from test_framework.util import assert_equal from test_framework.blocktools import ( create_block, @@ -46,7 +46,6 @@ def run_test(self): node = self.nodes[0] node.add_p2p_connection(P2PDataStore()) - network_thread_start() node.p2p.wait_for_verack() self.genesis_hash = int(node.getbestblockhash(), 16) diff --git a/test/functional/abc-invalid-message.py b/test/functional/abc-invalid-message.py --- a/test/functional/abc-invalid-message.py +++ b/test/functional/abc-invalid-message.py @@ -15,7 +15,6 @@ MAGIC_BYTES, mininode_lock, msg_ping, - network_thread_start, P2PInterface, ) from test_framework.test_framework import BitcoinTestFramework @@ -38,7 +37,7 @@ class BadVersionP2PInterface(P2PInterface): def peer_connect(self, *args, services=NODE_NETWORK, send_version=False, **kwargs): - super().peer_connect(*args, send_version=send_version, **kwargs) + create_conn = super().peer_connect(*args, send_version=send_version, **kwargs) # Send version message with invalid checksum vt = msg_version() @@ -48,8 +47,11 @@ vt.addrFrom.ip = "0.0.0.0" vt.addrFrom.port = 0 invalid_vt = msg_bad_checksum(self, vt) - # Will be sent right after handle_connect - self.sendbuf = invalid_vt + # Will be sent right after connection_made + self.on_connection_send_msg = invalid_vt + self.on_connection_send_msg_is_raw = True + + return create_conn class InvalidMessageTest(BitcoinTestFramework): @@ -66,8 +68,6 @@ interface = P2PInterface() connection = self.nodes[1].add_p2p_connection(interface) - network_thread_start() - # The invalid version message should cause a disconnect on the first # connection because we are now banned bad_interface.wait_for_disconnect() diff --git a/test/functional/abc-mempool-accept-txn.py b/test/functional/abc-mempool-accept-txn.py --- a/test/functional/abc-mempool-accept-txn.py +++ b/test/functional/abc-mempool-accept-txn.py @@ -25,7 +25,6 @@ ToHex, ) from test_framework.mininode import ( - network_thread_start, P2PDataStore, ) from test_framework.script import ( @@ -144,7 +143,6 @@ def run_test(self): node = self.nodes[0] node.add_p2p_connection(P2PDataStore()) - network_thread_start() node.p2p.wait_for_verack() self.genesis_hash = int(self.nodes[0].getbestblockhash(), 16) diff --git a/test/functional/abc-mempool-coherence-on-activations.py b/test/functional/abc-mempool-coherence-on-activations.py --- a/test/functional/abc-mempool-coherence-on-activations.py +++ b/test/functional/abc-mempool-coherence-on-activations.py @@ -34,7 +34,7 @@ CTxOut, ToHex, ) -from test_framework.mininode import network_thread_start, P2PDataStore +from test_framework.mininode import P2PDataStore from test_framework.script import ( CScript, OP_CHECKSIG, @@ -155,7 +155,6 @@ def run_test(self): node = self.nodes[0] node.add_p2p_connection(P2PDataStore()) - network_thread_start() node.p2p.wait_for_verack() node.setmocktime(ACTIVATION_TIME) diff --git a/test/functional/abc-minimaldata-activation.py b/test/functional/abc-minimaldata-activation.py --- a/test/functional/abc-minimaldata-activation.py +++ b/test/functional/abc-minimaldata-activation.py @@ -28,8 +28,6 @@ ToHex, ) from test_framework.mininode import ( - network_thread_join, - network_thread_start, P2PDataStore, ) from test_framework.script import ( @@ -74,7 +72,6 @@ Helper to connect and wait for version handshake.""" for _ in range(num_connections): self.nodes[0].add_p2p_connection(P2PDataStore()) - network_thread_start() self.nodes[0].p2p.wait_for_verack() def reconnect_p2p(self, **kwargs): @@ -83,7 +80,6 @@ The node gets disconnected several times in this test. This helper method reconnects the p2p and restarts the network thread.""" self.nodes[0].disconnect_p2ps() - network_thread_join() self.bootstrap_p2p(**kwargs) def getbestblock(self, node): diff --git a/test/functional/abc-p2p-compactblocks.py b/test/functional/abc-p2p-compactblocks.py --- a/test/functional/abc-p2p-compactblocks.py +++ b/test/functional/abc-p2p-compactblocks.py @@ -34,7 +34,6 @@ ) from test_framework.mininode import ( mininode_lock, - network_thread_start, P2PDataStore, P2PInterface, ) @@ -227,7 +226,6 @@ node = self.nodes[0] default_p2p = node.add_p2p_connection(P2PDataStore()) test_p2p = node.add_p2p_connection(TestP2PConn()) - network_thread_start() default_p2p.wait_for_verack() test_p2p.wait_for_verack() diff --git a/test/functional/abc-p2p-fullblocktest.py b/test/functional/abc-p2p-fullblocktest.py --- a/test/functional/abc-p2p-fullblocktest.py +++ b/test/functional/abc-p2p-fullblocktest.py @@ -35,7 +35,7 @@ ser_compact_size, ToHex, ) -from test_framework.mininode import network_thread_start, P2PDataStore +from test_framework.mininode import P2PDataStore from test_framework.script import ( CScript, hash160, @@ -201,7 +201,6 @@ def run_test(self): node = self.nodes[0] node.add_p2p_connection(P2PDataStore()) - network_thread_start() node.p2p.wait_for_verack() # Set the blocksize to 2MB as initial condition node.setexcessiveblock(self.excessive_block_size) diff --git a/test/functional/abc-replay-protection.py b/test/functional/abc-replay-protection.py --- a/test/functional/abc-replay-protection.py +++ b/test/functional/abc-replay-protection.py @@ -26,7 +26,7 @@ CTxOut, ToHex, ) -from test_framework.mininode import network_thread_start, P2PDataStore +from test_framework.mininode import P2PDataStore from test_framework.script import ( CScript, OP_CHECKSIG, @@ -87,7 +87,6 @@ def run_test(self): node = self.nodes[0] node.add_p2p_connection(P2PDataStore()) - network_thread_start() node.p2p.wait_for_verack() node.setmocktime(REPLAY_PROTECTION_START_TIME) diff --git a/test/functional/abc-schnorr.py b/test/functional/abc-schnorr.py --- a/test/functional/abc-schnorr.py +++ b/test/functional/abc-schnorr.py @@ -27,8 +27,6 @@ ToHex, ) from test_framework.mininode import ( - network_thread_join, - network_thread_start, P2PDataStore, ) from test_framework import schnorr @@ -73,7 +71,6 @@ Helper to connect and wait for version handshake.""" for _ in range(num_connections): self.nodes[0].add_p2p_connection(P2PDataStore()) - network_thread_start() self.nodes[0].p2p.wait_for_verack() def reconnect_p2p(self, **kwargs): @@ -82,7 +79,6 @@ The node gets disconnected several times in this test. This helper method reconnects the p2p and restarts the network thread.""" self.nodes[0].disconnect_p2ps() - network_thread_join() self.bootstrap_p2p(**kwargs) def getbestblock(self, node): diff --git a/test/functional/abc-schnorrmultisig-activation.py b/test/functional/abc-schnorrmultisig-activation.py --- a/test/functional/abc-schnorrmultisig-activation.py +++ b/test/functional/abc-schnorrmultisig-activation.py @@ -31,8 +31,6 @@ ToHex, ) from test_framework.mininode import ( - network_thread_join, - network_thread_start, P2PDataStore, ) from test_framework import schnorr @@ -98,7 +96,6 @@ Helper to connect and wait for version handshake.""" for _ in range(num_connections): self.nodes[0].add_p2p_connection(P2PDataStore()) - network_thread_start() self.nodes[0].p2p.wait_for_verack() def reconnect_p2p(self, **kwargs): @@ -107,7 +104,6 @@ The node gets disconnected several times in this test. This helper method reconnects the p2p and restarts the network thread.""" self.nodes[0].disconnect_p2ps() - network_thread_join() self.bootstrap_p2p(**kwargs) def getbestblock(self, node): diff --git a/test/functional/abc-segwit-recovery.py b/test/functional/abc-segwit-recovery.py --- a/test/functional/abc-segwit-recovery.py +++ b/test/functional/abc-segwit-recovery.py @@ -24,8 +24,6 @@ ToHex, ) from test_framework.mininode import ( - network_thread_join, - network_thread_start, P2PDataStore, ) from test_framework.script import ( @@ -112,7 +110,6 @@ for node in self.nodes: for _ in range(num_connections): node.add_p2p_connection(P2PDataStore()) - network_thread_start() for node in self.nodes: node.p2p.wait_for_verack() @@ -123,7 +120,6 @@ method reconnects the p2p and restarts the network thread.""" for node in self.nodes: node.disconnect_p2ps() - network_thread_join() self.bootstrap_p2p(**kwargs) def run_test(self): diff --git a/test/functional/abc-sync-chain.py b/test/functional/abc-sync-chain.py --- a/test/functional/abc-sync-chain.py +++ b/test/functional/abc-sync-chain.py @@ -13,9 +13,9 @@ from test_framework.blocktools import create_block, create_coinbase from test_framework.messages import CBlockHeader, msg_block, msg_headers -from test_framework.mininode import network_thread_start, P2PInterface +from test_framework.mininode import P2PInterface from test_framework.test_framework import BitcoinTestFramework -from test_framework.util import p2p_port, wait_until +from test_framework.util import wait_until NUM_IBD_BLOCKS = 50 @@ -39,13 +39,9 @@ ["-minimumchainwork={:#x}".format(202 + 2 * NUM_IBD_BLOCKS)]] def run_test(self): - node0conn = BaseNode() - node0conn.peer_connect('127.0.0.1', p2p_port(0)) - - network_thread_start() - node0conn.wait_for_verack() - node0 = self.nodes[0] + node0conn = node0.add_p2p_connection(BaseNode()) + node0.p2p.wait_for_verack() tip = int(node0.getbestblockhash(), 16) height = node0.getblockcount() + 1 diff --git a/test/functional/abc-transaction-ordering.py b/test/functional/abc-transaction-ordering.py --- a/test/functional/abc-transaction-ordering.py +++ b/test/functional/abc-transaction-ordering.py @@ -22,7 +22,7 @@ CTxIn, CTxOut, ) -from test_framework.mininode import network_thread_start, P2PDataStore +from test_framework.mininode import P2PDataStore from test_framework.script import ( CScript, OP_RETURN, @@ -130,7 +130,6 @@ def run_test(self): node = self.nodes[0] node.add_p2p_connection(P2PDataStore()) - network_thread_start() node.p2p.wait_for_verack() self.genesis_hash = int(node.getbestblockhash(), 16) diff --git a/test/functional/example_test.py b/test/functional/example_test.py --- a/test/functional/example_test.py +++ b/test/functional/example_test.py @@ -19,8 +19,6 @@ from test_framework.mininode import ( P2PInterface, mininode_lock, - network_thread_join, - network_thread_start, ) from test_framework.test_framework import BitcoinTestFramework from test_framework.util import ( @@ -137,9 +135,6 @@ # Create P2P connections to two of the nodes self.nodes[0].add_p2p_connection(BaseNode()) - # Start up network handling in another thread. This needs to be called - # after the P2P connections have been created. - network_thread_start() # wait_for_verack ensures that the P2P connection is fully up. self.nodes[0].p2p.wait_for_verack() @@ -194,14 +189,9 @@ connect_nodes(self.nodes[1], self.nodes[2]) self.log.info("Add P2P connection to node2") - # We can't add additional P2P connections once the network thread has started. Disconnect the connection - # to node0, wait for the network thread to terminate, then connect to node2. This is specific to - # the current implementation of the network thread and may be improved in future. self.nodes[0].disconnect_p2ps() - network_thread_join() self.nodes[2].add_p2p_connection(BaseNode()) - network_thread_start() self.nodes[2].p2p.wait_for_verack() self.log.info( diff --git a/test/functional/feature_assumevalid.py b/test/functional/feature_assumevalid.py --- a/test/functional/feature_assumevalid.py +++ b/test/functional/feature_assumevalid.py @@ -42,11 +42,7 @@ msg_block, msg_headers, ) -from test_framework.mininode import ( - network_thread_join, - network_thread_start, - P2PInterface, -) +from test_framework.mininode import P2PInterface from test_framework.script import (CScript, OP_TRUE) from test_framework.test_framework import BitcoinTestFramework from test_framework.txtools import pad_tx @@ -106,8 +102,6 @@ # Connect to node0 p2p0 = self.nodes[0].add_p2p_connection(BaseNode()) - - network_thread_start() self.nodes[0].p2p.wait_for_verack() # Build the blockchain @@ -175,9 +169,7 @@ self.block_time += 1 height += 1 - # We're adding new connections so terminate the network thread self.nodes[0].disconnect_p2ps() - network_thread_join() # Start node1 and node2 with assumevalid so they accept a block with a bad signature. self.start_node(1, extra_args=["-assumevalid=" + hex(block102.sha256)]) @@ -187,8 +179,6 @@ p2p1 = self.nodes[1].add_p2p_connection(BaseNode()) p2p2 = self.nodes[2].add_p2p_connection(BaseNode()) - network_thread_start() - p2p0.wait_for_verack() p2p1.wait_for_verack() p2p2.wait_for_verack() diff --git a/test/functional/feature_block.py b/test/functional/feature_block.py --- a/test/functional/feature_block.py +++ b/test/functional/feature_block.py @@ -27,7 +27,7 @@ uint256_from_compact, uint256_from_str, ) -from test_framework.mininode import P2PDataStore, network_thread_start, network_thread_join +from test_framework.mininode import P2PDataStore from test_framework.script import ( CScript, hash160, @@ -1399,7 +1399,6 @@ Helper to connect and wait for version handshake.""" self.nodes[0].add_p2p_connection(P2PDataStore()) - network_thread_start() # We need to wait for the initial getheaders from the peer before we # start populating our blockstore. If we don't, then we may run ahead # to the next subtest before we receive the getheaders. We'd then send @@ -1414,7 +1413,6 @@ The node gets disconnected several times in this test. This helper method reconnects the p2p and restarts the network thread.""" self.nodes[0].disconnect_p2ps() - network_thread_join() self.bootstrap_p2p() def sync_blocks(self, blocks, success=True, reject_reason=None, request_block=True, reconnect=False, timeout=60): diff --git a/test/functional/feature_cltv.py b/test/functional/feature_cltv.py --- a/test/functional/feature_cltv.py +++ b/test/functional/feature_cltv.py @@ -17,7 +17,6 @@ ToHex, ) from test_framework.mininode import ( - network_thread_start, P2PInterface, ) from test_framework.script import ( @@ -100,10 +99,6 @@ def run_test(self): self.nodes[0].add_p2p_connection(P2PInterface()) - - network_thread_start() - - # wait_for_verack ensures that the P2P connection is fully up. self.nodes[0].p2p.wait_for_verack() self.log.info("Mining {} blocks".format(CLTV_HEIGHT - 2)) diff --git a/test/functional/feature_csv_activation.py b/test/functional/feature_csv_activation.py --- a/test/functional/feature_csv_activation.py +++ b/test/functional/feature_csv_activation.py @@ -50,7 +50,7 @@ make_conform_to_ctor, ) from test_framework.messages import COIN, CTransaction, FromHex, ToHex -from test_framework.mininode import network_thread_start, P2PDataStore +from test_framework.mininode import P2PDataStore from test_framework.script import ( CScript, OP_CHECKSEQUENCEVERIFY, @@ -226,7 +226,6 @@ def run_test(self): self.nodes[0].add_p2p_connection(P2PDataStore()) - network_thread_start() self.nodes[0].p2p.wait_for_verack() self.log.info("Generate blocks in the past for coinbase outputs.") diff --git a/test/functional/feature_dersig.py b/test/functional/feature_dersig.py --- a/test/functional/feature_dersig.py +++ b/test/functional/feature_dersig.py @@ -11,7 +11,6 @@ from test_framework.messages import CTransaction, FromHex, msg_block, ToHex from test_framework.mininode import ( mininode_lock, - network_thread_start, P2PInterface, ) from test_framework.script import CScript @@ -62,8 +61,6 @@ def run_test(self): self.nodes[0].add_p2p_connection(P2PInterface()) - network_thread_start() - # wait_for_verack ensures that the P2P connection is fully up. self.nodes[0].p2p.wait_for_verack() diff --git a/test/functional/feature_maxuploadtarget.py b/test/functional/feature_maxuploadtarget.py --- a/test/functional/feature_maxuploadtarget.py +++ b/test/functional/feature_maxuploadtarget.py @@ -17,7 +17,7 @@ from test_framework.cdefs import LEGACY_MAX_BLOCK_SIZE from test_framework.blocktools import mine_big_block from test_framework.messages import CInv, msg_getdata -from test_framework.mininode import network_thread_start, P2PInterface +from test_framework.mininode import P2PInterface from test_framework.test_framework import BitcoinTestFramework from test_framework.util import assert_equal @@ -64,7 +64,6 @@ for _ in range(3): p2p_conns.append(self.nodes[0].add_p2p_connection(TestP2PConn())) - network_thread_start() for p2pc in p2p_conns: p2pc.wait_for_verack() @@ -158,8 +157,6 @@ # Reconnect to self.nodes[0] self.nodes[0].add_p2p_connection(TestP2PConn()) - - network_thread_start() self.nodes[0].p2p.wait_for_verack() # retrieve 20 blocks which should be enough to break the 1MB limit diff --git a/test/functional/feature_nulldummy.py b/test/functional/feature_nulldummy.py --- a/test/functional/feature_nulldummy.py +++ b/test/functional/feature_nulldummy.py @@ -17,7 +17,6 @@ from test_framework.blocktools import create_block, create_coinbase from test_framework.messages import CTransaction, FromHex, ToHex -from test_framework.mininode import network_thread_start from test_framework.script import CScript from test_framework.test_framework import BitcoinTestFramework from test_framework.util import assert_equal, assert_raises_rpc_error @@ -57,7 +56,6 @@ self.ms_address = self.nodes[0].addmultisigaddress(1, [self.address])[ 'address'] - network_thread_start() # Block 2 self.coinbase_blocks = self.nodes[0].generate(2) coinbase_txid = [] diff --git a/test/functional/p2p_compactblocks.py b/test/functional/p2p_compactblocks.py --- a/test/functional/p2p_compactblocks.py +++ b/test/functional/p2p_compactblocks.py @@ -42,7 +42,6 @@ ) from test_framework.mininode import ( mininode_lock, - network_thread_start, P2PInterface, ) from test_framework.script import CScript, OP_TRUE @@ -835,15 +834,13 @@ assert_equal(int(node.getbestblockhash(), 16), block.sha256) def run_test(self): - # Setup the p2p connections and start up the network thread. + # Setup the p2p connections self.test_node = self.nodes[0].add_p2p_connection(TestP2PConn()) self.ex_softfork_node = self.nodes[1].add_p2p_connection( TestP2PConn(), services=NODE_NETWORK) self.old_node = self.nodes[1].add_p2p_connection( TestP2PConn(), services=NODE_NETWORK) - network_thread_start() - self.test_node.wait_for_verack() # We will need UTXOs to construct transactions in later tests. diff --git a/test/functional/p2p_feefilter.py b/test/functional/p2p_feefilter.py --- a/test/functional/p2p_feefilter.py +++ b/test/functional/p2p_feefilter.py @@ -10,7 +10,6 @@ from test_framework.messages import msg_feefilter from test_framework.mininode import ( mininode_lock, - network_thread_start, P2PInterface, ) from test_framework.test_framework import BitcoinTestFramework @@ -58,9 +57,8 @@ node1.generate(1) sync_blocks(self.nodes) - # Setup the p2p connections and start up the network thread. + # Setup the p2p connections self.nodes[0].add_p2p_connection(TestP2PConn()) - network_thread_start() self.nodes[0].p2p.wait_for_verack() # Test that invs are received for all txs at feerate of 20 sat/byte diff --git a/test/functional/p2p_fingerprint.py b/test/functional/p2p_fingerprint.py --- a/test/functional/p2p_fingerprint.py +++ b/test/functional/p2p_fingerprint.py @@ -18,7 +18,7 @@ msg_getheaders, msg_headers, ) -from test_framework.mininode import P2PInterface, network_thread_start +from test_framework.mininode import P2PInterface from test_framework.test_framework import BitcoinTestFramework from test_framework.util import assert_equal, wait_until @@ -74,8 +74,6 @@ # last month but that have over a month's worth of work are also withheld. def run_test(self): node0 = self.nodes[0].add_p2p_connection(P2PInterface()) - - network_thread_start() node0.wait_for_verack() # Set node time to 60 days ago diff --git a/test/functional/p2p_invalid_block.py b/test/functional/p2p_invalid_block.py --- a/test/functional/p2p_invalid_block.py +++ b/test/functional/p2p_invalid_block.py @@ -19,7 +19,7 @@ create_transaction, ) from test_framework.messages import COIN -from test_framework.mininode import network_thread_start, P2PDataStore +from test_framework.mininode import P2PDataStore from test_framework.test_framework import BitcoinTestFramework from test_framework.util import assert_equal @@ -34,8 +34,6 @@ # Add p2p connection to node0 node = self.nodes[0] # convenience reference to the node node.add_p2p_connection(P2PDataStore()) - - network_thread_start() node.p2p.wait_for_verack() best_block = node.getblock(node.getbestblockhash()) diff --git a/test/functional/p2p_invalid_tx.py b/test/functional/p2p_invalid_tx.py --- a/test/functional/p2p_invalid_tx.py +++ b/test/functional/p2p_invalid_tx.py @@ -20,7 +20,7 @@ CTxIn, CTxOut, ) -from test_framework.mininode import network_thread_start, P2PDataStore, network_thread_join +from test_framework.mininode import P2PDataStore from test_framework.test_framework import BitcoinTestFramework from test_framework.util import ( assert_equal, @@ -40,7 +40,6 @@ Helper to connect and wait for version handshake.""" for _ in range(num_connections): self.nodes[0].add_p2p_connection(P2PDataStore()) - network_thread_start() self.nodes[0].p2p.wait_for_verack() def reconnect_p2p(self, **kwargs): @@ -49,7 +48,6 @@ The node gets disconnected several times in this test. This helper method reconnects the p2p and restarts the network thread.""" self.nodes[0].disconnect_p2ps() - network_thread_join() self.bootstrap_p2p(**kwargs) def run_test(self): diff --git a/test/functional/p2p_leak.py b/test/functional/p2p_leak.py --- a/test/functional/p2p_leak.py +++ b/test/functional/p2p_leak.py @@ -20,8 +20,6 @@ ) from test_framework.mininode import ( mininode_lock, - network_thread_join, - network_thread_start, P2PInterface, ) from test_framework.test_framework import BitcoinTestFramework @@ -142,8 +140,6 @@ no_verack_idlenode = self.nodes[0].add_p2p_connection( CNodeNoVerackIdle()) - network_thread_start() - wait_until(lambda: no_version_bannode.ever_connected, timeout=10, lock=mininode_lock) wait_until(lambda: no_version_idlenode.ever_connected, @@ -162,9 +158,8 @@ self.nodes[0].disconnect_p2ps() - # Wait until all connections are closed and the network thread has terminated + # Wait until all connections are closed wait_until(lambda: len(self.nodes[0].getpeerinfo()) == 0) - network_thread_join() # Make sure no unexpected messages came in assert(no_version_bannode.unexpected_msg == False) diff --git a/test/functional/p2p_leak_tx.py b/test/functional/p2p_leak_tx.py --- a/test/functional/p2p_leak_tx.py +++ b/test/functional/p2p_leak_tx.py @@ -5,7 +5,7 @@ """Test that we don't leak txs to inbound peers that we haven't yet announced to""" from test_framework.messages import msg_getdata, CInv -from test_framework.mininode import P2PDataStore, network_thread_start +from test_framework.mininode import P2PDataStore from test_framework.test_framework import BitcoinTestFramework from test_framework.util import ( assert_equal, @@ -34,7 +34,6 @@ # Backport note: the following two lines were backported out of order, # and should be removed in the appropriate future backports that do a # blanket removal of each of these calls across many tests. - network_thread_start() self.nodes[0].p2p.wait_for_verack() MAX_REPEATS = 100 diff --git a/test/functional/p2p_mempool.py b/test/functional/p2p_mempool.py --- a/test/functional/p2p_mempool.py +++ b/test/functional/p2p_mempool.py @@ -9,7 +9,7 @@ """ from test_framework.messages import msg_mempool -from test_framework.mininode import network_thread_start, P2PInterface +from test_framework.mininode import P2PInterface from test_framework.test_framework import BitcoinTestFramework from test_framework.util import assert_equal @@ -23,7 +23,6 @@ def run_test(self): # Add a p2p connection self.nodes[0].add_p2p_connection(P2PInterface()) - network_thread_start() self.nodes[0].p2p.wait_for_verack() # request mempool diff --git a/test/functional/p2p_node_network_limited.py b/test/functional/p2p_node_network_limited.py --- a/test/functional/p2p_node_network_limited.py +++ b/test/functional/p2p_node_network_limited.py @@ -18,8 +18,6 @@ ) from test_framework.mininode import ( mininode_lock, - network_thread_join, - network_thread_start, P2PInterface, ) from test_framework.test_framework import BitcoinTestFramework @@ -72,7 +70,6 @@ def run_test(self): node = self.nodes[0].add_p2p_connection(P2PIgnoreInv()) - network_thread_start() node.wait_for_verack() expected_services = NODE_BLOOM | NODE_BITCOIN_CASH | NODE_NETWORK_LIMITED @@ -103,9 +100,7 @@ self.log.info("Check local address relay, do a fresh connection.") self.nodes[0].disconnect_p2ps() - network_thread_join() node1 = self.nodes[0].add_p2p_connection(P2PIgnoreInv()) - network_thread_start() node1.wait_for_verack() node1.send_message(msg_verack()) diff --git a/test/functional/p2p_sendheaders.py b/test/functional/p2p_sendheaders.py --- a/test/functional/p2p_sendheaders.py +++ b/test/functional/p2p_sendheaders.py @@ -99,7 +99,6 @@ ) from test_framework.mininode import ( mininode_lock, - network_thread_start, P2PInterface, ) from test_framework.test_framework import BitcoinTestFramework @@ -243,15 +242,11 @@ return [int(x, 16) for x in all_hashes] def run_test(self): - # Setup the p2p connections and start up the network thread. + # Setup the p2p connections inv_node = self.nodes[0].add_p2p_connection(BaseNode()) # Make sure NODE_NETWORK is not set for test_node, so no block download # will occur outside of direct fetching test_node = self.nodes[0].add_p2p_connection(BaseNode(), services=0) - - network_thread_start() - - # Test logic begins here inv_node.wait_for_verack() test_node.wait_for_verack() diff --git a/test/functional/p2p_timeouts.py b/test/functional/p2p_timeouts.py --- a/test/functional/p2p_timeouts.py +++ b/test/functional/p2p_timeouts.py @@ -24,7 +24,7 @@ from time import sleep from test_framework.messages import msg_ping -from test_framework.mininode import network_thread_start, P2PInterface +from test_framework.mininode import P2PInterface from test_framework.test_framework import BitcoinTestFramework @@ -40,15 +40,13 @@ self.num_nodes = 1 def run_test(self): - # Setup the p2p connections and start up the network thread. + # Setup the p2p connections no_verack_node = self.nodes[0].add_p2p_connection(TestP2PConn()) no_version_node = self.nodes[0].add_p2p_connection( TestP2PConn(), send_version=False) no_send_node = self.nodes[0].add_p2p_connection( TestP2PConn(), send_version=False) - network_thread_start() - sleep(1) assert no_verack_node.is_connected diff --git a/test/functional/p2p_unrequested_blocks.py b/test/functional/p2p_unrequested_blocks.py --- a/test/functional/p2p_unrequested_blocks.py +++ b/test/functional/p2p_unrequested_blocks.py @@ -67,8 +67,6 @@ ) from test_framework.mininode import ( mininode_lock, - network_thread_join, - network_thread_start, P2PInterface, ) from test_framework.test_framework import BitcoinTestFramework @@ -97,15 +95,11 @@ self.setup_nodes() def run_test(self): - # Setup the p2p connections and start up the network thread. + # Setup the p2p connections # test_node connects to node0 (not whitelisted) test_node = self.nodes[0].add_p2p_connection(P2PInterface()) # min_work_node connects to node1 (whitelisted) min_work_node = self.nodes[1].add_p2p_connection(P2PInterface()) - - network_thread_start() - - # Test logic begins here test_node.wait_for_verack() min_work_node.wait_for_verack() @@ -238,10 +232,8 @@ # disconnect/reconnect first self.nodes[0].disconnect_p2ps() self.nodes[1].disconnect_p2ps() - network_thread_join() test_node = self.nodes[0].add_p2p_connection(P2PInterface()) - network_thread_start() test_node.wait_for_verack() test_node.send_message(msg_block(block_h1f)) @@ -336,8 +328,6 @@ self.nodes[0].disconnect_p2ps() test_node = self.nodes[0].add_p2p_connection(P2PInterface()) - - network_thread_start() test_node.wait_for_verack() # We should have failed reorg and switched back to 290 (but have block 291) diff --git a/test/functional/rpc_blockchain.py b/test/functional/rpc_blockchain.py --- a/test/functional/rpc_blockchain.py +++ b/test/functional/rpc_blockchain.py @@ -42,7 +42,6 @@ ) from test_framework.mininode import ( P2PInterface, - network_thread_start, ) @@ -312,7 +311,6 @@ # Start a P2P connection since we'll need to create some blocks. node.add_p2p_connection(P2PInterface()) - network_thread_start() node.p2p.wait_for_verack() current_height = node.getblock(node.getbestblockhash())['height'] diff --git a/test/functional/test_framework/messages.py b/test/functional/test_framework/messages.py --- a/test/functional/test_framework/messages.py +++ b/test/functional/test_framework/messages.py @@ -55,9 +55,6 @@ MSG_BLOCK = 2 MSG_TYPE_MASK = 0xffffffff >> 2 -# Howmuch data will be read from the network at once -READ_BUFFER_SIZE = 8192 - # Serialization/deserialization tools diff --git a/test/functional/test_framework/mininode.py b/test/functional/test_framework/mininode.py --- a/test/functional/test_framework/mininode.py +++ b/test/functional/test_framework/mininode.py @@ -13,11 +13,10 @@ P2PInterface: A high-level interface object for communicating to a node over P2P P2PDataStore: A p2p interface class that keeps a store of transactions and blocks and can respond correctly to getdata and getheaders messages""" -import asyncore +import asyncio from collections import defaultdict from io import BytesIO import logging -import socket import struct import sys import threading @@ -51,7 +50,6 @@ msg_verack, msg_version, NODE_NETWORK, - READ_BUFFER_SIZE, sha256, ) from test_framework.util import wait_until @@ -90,7 +88,7 @@ } -class P2PConnection(asyncore.dispatcher): +class P2PConnection(asyncio.Protocol): """A low-level connection object to a node's P2P interface. This class is responsible for: @@ -104,73 +102,73 @@ sub-classed and the on_message() callback overridden.""" def __init__(self): - # All P2PConnections must be created before starting the NetworkThread. - # assert that the network thread is not running. - assert not network_thread_running() - - super().__init__(map=mininode_socket_map) - - self._conn_open = False + # The underlying transport of the connection. + # Should only call methods on this from the NetworkThread, c.f. call_soon_threadsafe + self._transport = None @property def is_connected(self): - return self._conn_open + return self._transport is not None def peer_connect(self, dstaddr, dstport, net="regtest"): + assert not self.is_connected self.dstaddr = dstaddr self.dstport = dstport - self.create_socket(socket.AF_INET, socket.SOCK_STREAM) - self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - self.sendbuf = b"" + # The initial message to send after the connection was made: + self.on_connection_send_msg = None + self.on_connection_send_msg_is_raw = False self.recvbuf = b"" - self._asyncore_pre_connection = True self.network = net - self.disconnect = False - logger.debug('Connecting to Bitcoin Node: {}:{}'.format( self.dstaddr, self.dstport)) - try: - self.connect((dstaddr, dstport)) - except: - self.handle_close() + loop = NetworkThread.network_event_loop + conn_gen_unsafe = loop.create_connection( + lambda: self, host=self.dstaddr, port=self.dstport) + + def conn_gen(): return loop.call_soon_threadsafe( + loop.create_task, conn_gen_unsafe) + return conn_gen def peer_disconnect(self): # Connection could have already been closed by other end. - if self.is_connected: - # Signal asyncore to disconnect - self.disconnect = True + NetworkThread.network_event_loop.call_soon_threadsafe( + lambda: self._transport and self._transport.abort()) # Connection and disconnection methods - def handle_connect(self): - """asyncore callback when a connection is opened.""" - if not self.is_connected: - logger.debug("Connected & Listening: {}:{}".format( - self.dstaddr, self.dstport)) - self._conn_open = True - self._asyncore_pre_connection = False - self.on_open() - - def handle_close(self): - """asyncore callback when a connection is closed.""" - logger.debug("Closing connection to: {}:{}".format( + def connection_made(self, transport): + """asyncio callback when a connection is opened.""" + assert not self._transport + logger.debug("Connected & Listening: {}:{}".format( self.dstaddr, self.dstport)) - self._conn_open = False + self._transport = transport + if self.on_connection_send_msg: + if self.on_connection_send_msg_is_raw: + self.send_raw_message(self.on_connection_send_msg) + else: + self.send_message(self.on_connection_send_msg) + # Never used again + self.on_connection_send_msg = None + self.on_open() + + def connection_lost(self, exc): + """asyncio callback when a connection is closed.""" + if exc: + logger.warning("Connection lost to {}:{} due to {}".format( + self.dstaddr, self.dstport, exc)) + else: + logger.debug("Closed connection to: {}:{}".format( + self.dstaddr, self.dstport)) + self._transport = None self.recvbuf = b"" - self.sendbuf = b"" - try: - self.close() - except: - pass self.on_close() # Socket read methods - def handle_read(self): - """asyncore callback when data is read from the socket.""" + def data_received(self, t): + """asyncio callback when data is read from the socket.""" with mininode_lock: - t = self.recv(READ_BUFFER_SIZE) if len(t) > 0: self.recvbuf += t @@ -223,30 +221,6 @@ # Socket write methods - def writable(self): - """asyncore method to determine whether the handle_write() callback should be called on the next loop.""" - with mininode_lock: - length = len(self.sendbuf) - return length > 0 or self._asyncore_pre_connection - - def handle_write(self): - """asyncore callback when data should be written to the socket.""" - with mininode_lock: - # asyncore does not expose socket connection, only the first read/write - # event, thus we must check connection manually here to know when we - # actually connect - if self._asyncore_pre_connection: - self.handle_connect() - if not self.writable(): - return - - try: - sent = self.send(self.sendbuf) - except: - self.handle_close() - return - self.sendbuf = self.sendbuf[sent:] - def send_message(self, message): """Send a P2P message over the socket. @@ -265,15 +239,8 @@ socket.""" if not self.is_connected: raise IOError('Not connected') - with mininode_lock: - if len(self.sendbuf) == 0: - try: - sent = self.send(tmsg) - self.sendbuf = tmsg[sent:] - except BlockingIOError: - self.sendbuf = tmsg - else: - self.sendbuf += tmsg + NetworkThread.network_event_loop.call_soon_threadsafe( + lambda: self._transport and not self._transport.is_closing() and self._transport.write(tmsg)) # Class utility methods @@ -329,7 +296,7 @@ self.nServices = 0 def peer_connect(self, *args, services=NODE_NETWORK, send_version=True, **kwargs): - super().peer_connect(*args, **kwargs) + create_conn = super().peer_connect(*args, **kwargs) if send_version: # Send a version msg @@ -339,8 +306,10 @@ vt.addrTo.port = self.dstport vt.addrFrom.ip = "0.0.0.0" vt.addrFrom.port = 0 - # Will be sent right after handle_connect - self.sendbuf = self._build_message(vt) + # Will be sent soon after connection_made + self.on_connection_send_msg = vt + + return create_conn # Message receiving methods @@ -491,61 +460,36 @@ self.ping_counter += 1 -# Keep our own socket map for asyncore, so that we can track disconnects -# ourselves (to workaround an issue with closing an asyncore socket when -# using select) -mininode_socket_map = dict() - -# One lock for synchronizing all data access between the networking thread (see +# One lock for synchronizing all data access between the network event loop (see # NetworkThread below) and the thread running the test logic. For simplicity, -# P2PConnection acquires this lock whenever delivering a message to a P2PInterface, -# and whenever adding anything to the send buffer (in send_message()). This -# lock should be acquired in the thread running the test logic to synchronize +# P2PConnection acquires this lock whenever delivering a message to a P2PInterface. +# This lock should be acquired in the thread running the test logic to synchronize # access to any data shared with the P2PInterface or P2PConnection. mininode_lock = threading.RLock() class NetworkThread(threading.Thread): + network_event_loop = None + def __init__(self): super().__init__(name="NetworkThread") + # There is only one event loop and no more than one thread must be created + assert not self.network_event_loop + + NetworkThread.network_event_loop = asyncio.new_event_loop() def run(self): - while mininode_socket_map: - # We check for whether to disconnect outside of the asyncore - # loop to workaround the behavior of asyncore when using - # select - disconnected = [] - for fd, obj in mininode_socket_map.items(): - if obj.disconnect: - disconnected.append(obj) - [obj.handle_close() for obj in disconnected] - asyncore.loop(0.1, use_poll=True, map=mininode_socket_map, count=1) - logger.debug("Network thread closing") - - -def network_thread_start(): - """Start the network thread.""" - # Only one network thread may run at a time - assert not network_thread_running() - - NetworkThread().start() - - -def network_thread_running(): - """Return whether the network thread is running.""" - return any([thread.name == "NetworkThread" for thread in threading.enumerate()]) - - -def network_thread_join(timeout=10): - """Wait timeout seconds for the network thread to terminate. - - Throw if the network thread doesn't terminate in timeout seconds.""" - network_threads = [ - thread for thread in threading.enumerate() if thread.name == "NetworkThread"] - assert len(network_threads) <= 1 - for thread in network_threads: - thread.join(timeout) - assert not thread.is_alive() + """Start the network thread.""" + self.network_event_loop.run_forever() + + def close(self, timeout=10): + """Close the connections and network event loop.""" + self.network_event_loop.call_soon_threadsafe( + self.network_event_loop.stop) + wait_until(lambda: not self.network_event_loop.is_running(), + timeout=timeout) + self.network_event_loop.close() + self.join(timeout) class P2PDataStore(P2PInterface): diff --git a/test/functional/test_framework/test_framework.py b/test/functional/test_framework/test_framework.py --- a/test/functional/test_framework/test_framework.py +++ b/test/functional/test_framework/test_framework.py @@ -18,6 +18,7 @@ from .authproxy import JSONRPCException from . import coverage from .test_node import TestNode +from .mininode import NetworkThread from .util import ( assert_equal, check_json_precision, @@ -69,6 +70,7 @@ """Sets test framework defaults. Do not override this method. Instead, override the set_test_params() method""" self.setup_clean_chain = False self.nodes = [] + self.network_thread = None self.mocktime = 0 self.supports_cli = False self.bind_to_localhost_only = True @@ -133,6 +135,10 @@ self.options.tmpdir = tempfile.mkdtemp(prefix="test") self._start_logging() + self.log.debug('Setting up network thread') + self.network_thread = NetworkThread() + self.network_thread.start() + success = TestStatus.FAILED try: @@ -161,6 +167,8 @@ print("Testcase failed. Attaching python debugger. Enter ? for help") pdb.set_trace() + self.log.debug('Closing down network thread') + self.network_thread.close() if not self.options.noshutdown: self.log.info("Stopping nodes") if self.nodes: diff --git a/test/functional/test_framework/test_node.py b/test/functional/test_framework/test_node.py --- a/test/functional/test_framework/test_node.py +++ b/test/functional/test_framework/test_node.py @@ -340,7 +340,7 @@ if 'dstaddr' not in kwargs: kwargs['dstaddr'] = '127.0.0.1' - p2p_conn.peer_connect(*args, **kwargs) + p2p_conn.peer_connect(*args, **kwargs)() self.p2ps.append(p2p_conn) return p2p_conn