diff --git a/doc/functional-tests.md b/doc/functional-tests.md
--- a/doc/functional-tests.md
+++ b/doc/functional-tests.md
@@ -262,7 +262,7 @@
 wrappers for them, `msg_block`, `msg_tx`, etc).
 
 - P2P tests have two threads. One thread handles all network communication
-with the bitcoind(s) being tested (using python's asyncore package); the other
+with the bitcoind(s) being tested in a callback-based event loop; the other
 implements the test logic.
 
 - `P2PConnection` is the class used to connect to a bitcoind.  `P2PInterface`
@@ -270,10 +270,6 @@
 the Bitcoin Core node application logic. For custom behaviour, subclass the
 P2PInterface object and override the callback methods.
 
-- Call `network_thread_start()` after all `P2PInterface` objects are created to
-start the networking thread.  (Continue with the test logic in your existing
-thread.)
-
 - Can be used to write tests where specific P2P protocol behavior is tested.
 Examples tests are `p2p_unrequested_blocks.py`, `p2p_compactblocks.py`.
 
diff --git a/test/functional/abc-invalid-chains.py b/test/functional/abc-invalid-chains.py
--- a/test/functional/abc-invalid-chains.py
+++ b/test/functional/abc-invalid-chains.py
@@ -6,7 +6,7 @@
 import time
 
 from test_framework.test_framework import BitcoinTestFramework
-from test_framework.mininode import network_thread_start, P2PDataStore
+from test_framework.mininode import P2PDataStore
 from test_framework.util import assert_equal
 from test_framework.blocktools import (
     create_block,
@@ -46,7 +46,6 @@
     def run_test(self):
         node = self.nodes[0]
         node.add_p2p_connection(P2PDataStore())
-        network_thread_start()
         node.p2p.wait_for_verack()
 
         self.genesis_hash = int(node.getbestblockhash(), 16)
diff --git a/test/functional/abc-invalid-message.py b/test/functional/abc-invalid-message.py
--- a/test/functional/abc-invalid-message.py
+++ b/test/functional/abc-invalid-message.py
@@ -15,7 +15,6 @@
     MAGIC_BYTES,
     mininode_lock,
     msg_ping,
-    network_thread_start,
     P2PInterface,
 )
 from test_framework.test_framework import BitcoinTestFramework
@@ -38,7 +37,7 @@
 
 class BadVersionP2PInterface(P2PInterface):
     def peer_connect(self, *args, services=NODE_NETWORK, send_version=False, **kwargs):
-        super().peer_connect(*args, send_version=send_version, **kwargs)
+        create_conn = super().peer_connect(*args, send_version=send_version, **kwargs)
 
         # Send version message with invalid checksum
         vt = msg_version()
@@ -48,8 +47,11 @@
         vt.addrFrom.ip = "0.0.0.0"
         vt.addrFrom.port = 0
         invalid_vt = msg_bad_checksum(self, vt)
-        # Will be sent right after handle_connect
-        self.sendbuf = invalid_vt
+        # Will be sent right after connection_made
+        self.on_connection_send_msg = invalid_vt
+        self.on_connection_send_msg_is_raw = True
+
+        return create_conn
 
 
 class InvalidMessageTest(BitcoinTestFramework):
@@ -66,8 +68,6 @@
         interface = P2PInterface()
         connection = self.nodes[1].add_p2p_connection(interface)
 
-        network_thread_start()
-
         # The invalid version message should cause a disconnect on the first
         # connection because we are now banned
         bad_interface.wait_for_disconnect()
diff --git a/test/functional/abc-mempool-accept-txn.py b/test/functional/abc-mempool-accept-txn.py
--- a/test/functional/abc-mempool-accept-txn.py
+++ b/test/functional/abc-mempool-accept-txn.py
@@ -25,7 +25,6 @@
     ToHex,
 )
 from test_framework.mininode import (
-    network_thread_start,
     P2PDataStore,
 )
 from test_framework.script import (
@@ -144,7 +143,6 @@
     def run_test(self):
         node = self.nodes[0]
         node.add_p2p_connection(P2PDataStore())
-        network_thread_start()
         node.p2p.wait_for_verack()
 
         self.genesis_hash = int(self.nodes[0].getbestblockhash(), 16)
diff --git a/test/functional/abc-mempool-coherence-on-activations.py b/test/functional/abc-mempool-coherence-on-activations.py
--- a/test/functional/abc-mempool-coherence-on-activations.py
+++ b/test/functional/abc-mempool-coherence-on-activations.py
@@ -34,7 +34,7 @@
     CTxOut,
     ToHex,
 )
-from test_framework.mininode import network_thread_start, P2PDataStore
+from test_framework.mininode import P2PDataStore
 from test_framework.script import (
     CScript,
     OP_CHECKSIG,
@@ -155,7 +155,6 @@
     def run_test(self):
         node = self.nodes[0]
         node.add_p2p_connection(P2PDataStore())
-        network_thread_start()
         node.p2p.wait_for_verack()
         node.setmocktime(ACTIVATION_TIME)
 
diff --git a/test/functional/abc-minimaldata-activation.py b/test/functional/abc-minimaldata-activation.py
--- a/test/functional/abc-minimaldata-activation.py
+++ b/test/functional/abc-minimaldata-activation.py
@@ -28,8 +28,6 @@
     ToHex,
 )
 from test_framework.mininode import (
-    network_thread_join,
-    network_thread_start,
     P2PDataStore,
 )
 from test_framework.script import (
@@ -74,7 +72,6 @@
         Helper to connect and wait for version handshake."""
         for _ in range(num_connections):
             self.nodes[0].add_p2p_connection(P2PDataStore())
-        network_thread_start()
         self.nodes[0].p2p.wait_for_verack()
 
     def reconnect_p2p(self, **kwargs):
@@ -83,7 +80,6 @@
         The node gets disconnected several times in this test. This helper
         method reconnects the p2p and restarts the network thread."""
         self.nodes[0].disconnect_p2ps()
-        network_thread_join()
         self.bootstrap_p2p(**kwargs)
 
     def getbestblock(self, node):
diff --git a/test/functional/abc-p2p-compactblocks.py b/test/functional/abc-p2p-compactblocks.py
--- a/test/functional/abc-p2p-compactblocks.py
+++ b/test/functional/abc-p2p-compactblocks.py
@@ -34,7 +34,6 @@
 )
 from test_framework.mininode import (
     mininode_lock,
-    network_thread_start,
     P2PDataStore,
     P2PInterface,
 )
@@ -227,7 +226,6 @@
         node = self.nodes[0]
         default_p2p = node.add_p2p_connection(P2PDataStore())
         test_p2p = node.add_p2p_connection(TestP2PConn())
-        network_thread_start()
         default_p2p.wait_for_verack()
         test_p2p.wait_for_verack()
 
diff --git a/test/functional/abc-p2p-fullblocktest.py b/test/functional/abc-p2p-fullblocktest.py
--- a/test/functional/abc-p2p-fullblocktest.py
+++ b/test/functional/abc-p2p-fullblocktest.py
@@ -35,7 +35,7 @@
     ser_compact_size,
     ToHex,
 )
-from test_framework.mininode import network_thread_start, P2PDataStore
+from test_framework.mininode import P2PDataStore
 from test_framework.script import (
     CScript,
     hash160,
@@ -201,7 +201,6 @@
     def run_test(self):
         node = self.nodes[0]
         node.add_p2p_connection(P2PDataStore())
-        network_thread_start()
         node.p2p.wait_for_verack()
         # Set the blocksize to 2MB as initial condition
         node.setexcessiveblock(self.excessive_block_size)
diff --git a/test/functional/abc-replay-protection.py b/test/functional/abc-replay-protection.py
--- a/test/functional/abc-replay-protection.py
+++ b/test/functional/abc-replay-protection.py
@@ -26,7 +26,7 @@
     CTxOut,
     ToHex,
 )
-from test_framework.mininode import network_thread_start, P2PDataStore
+from test_framework.mininode import P2PDataStore
 from test_framework.script import (
     CScript,
     OP_CHECKSIG,
@@ -87,7 +87,6 @@
     def run_test(self):
         node = self.nodes[0]
         node.add_p2p_connection(P2PDataStore())
-        network_thread_start()
         node.p2p.wait_for_verack()
         node.setmocktime(REPLAY_PROTECTION_START_TIME)
 
diff --git a/test/functional/abc-schnorr.py b/test/functional/abc-schnorr.py
--- a/test/functional/abc-schnorr.py
+++ b/test/functional/abc-schnorr.py
@@ -27,8 +27,6 @@
     ToHex,
 )
 from test_framework.mininode import (
-    network_thread_join,
-    network_thread_start,
     P2PDataStore,
 )
 from test_framework import schnorr
@@ -73,7 +71,6 @@
         Helper to connect and wait for version handshake."""
         for _ in range(num_connections):
             self.nodes[0].add_p2p_connection(P2PDataStore())
-        network_thread_start()
         self.nodes[0].p2p.wait_for_verack()
 
     def reconnect_p2p(self, **kwargs):
@@ -82,7 +79,6 @@
         The node gets disconnected several times in this test. This helper
         method reconnects the p2p and restarts the network thread."""
         self.nodes[0].disconnect_p2ps()
-        network_thread_join()
         self.bootstrap_p2p(**kwargs)
 
     def getbestblock(self, node):
diff --git a/test/functional/abc-schnorrmultisig-activation.py b/test/functional/abc-schnorrmultisig-activation.py
--- a/test/functional/abc-schnorrmultisig-activation.py
+++ b/test/functional/abc-schnorrmultisig-activation.py
@@ -31,8 +31,6 @@
     ToHex,
 )
 from test_framework.mininode import (
-    network_thread_join,
-    network_thread_start,
     P2PDataStore,
 )
 from test_framework import schnorr
@@ -98,7 +96,6 @@
         Helper to connect and wait for version handshake."""
         for _ in range(num_connections):
             self.nodes[0].add_p2p_connection(P2PDataStore())
-        network_thread_start()
         self.nodes[0].p2p.wait_for_verack()
 
     def reconnect_p2p(self, **kwargs):
@@ -107,7 +104,6 @@
         The node gets disconnected several times in this test. This helper
         method reconnects the p2p and restarts the network thread."""
         self.nodes[0].disconnect_p2ps()
-        network_thread_join()
         self.bootstrap_p2p(**kwargs)
 
     def getbestblock(self, node):
diff --git a/test/functional/abc-segwit-recovery.py b/test/functional/abc-segwit-recovery.py
--- a/test/functional/abc-segwit-recovery.py
+++ b/test/functional/abc-segwit-recovery.py
@@ -24,8 +24,6 @@
     ToHex,
 )
 from test_framework.mininode import (
-    network_thread_join,
-    network_thread_start,
     P2PDataStore,
 )
 from test_framework.script import (
@@ -112,7 +110,6 @@
         for node in self.nodes:
             for _ in range(num_connections):
                 node.add_p2p_connection(P2PDataStore())
-        network_thread_start()
         for node in self.nodes:
             node.p2p.wait_for_verack()
 
@@ -123,7 +120,6 @@
         method reconnects the p2p and restarts the network thread."""
         for node in self.nodes:
             node.disconnect_p2ps()
-        network_thread_join()
         self.bootstrap_p2p(**kwargs)
 
     def run_test(self):
diff --git a/test/functional/abc-sync-chain.py b/test/functional/abc-sync-chain.py
--- a/test/functional/abc-sync-chain.py
+++ b/test/functional/abc-sync-chain.py
@@ -13,9 +13,9 @@
 
 from test_framework.blocktools import create_block, create_coinbase
 from test_framework.messages import CBlockHeader, msg_block, msg_headers
-from test_framework.mininode import network_thread_start, P2PInterface
+from test_framework.mininode import P2PInterface
 from test_framework.test_framework import BitcoinTestFramework
-from test_framework.util import p2p_port, wait_until
+from test_framework.util import wait_until
 
 
 NUM_IBD_BLOCKS = 50
@@ -39,13 +39,9 @@
             ["-minimumchainwork={:#x}".format(202 + 2 * NUM_IBD_BLOCKS)]]
 
     def run_test(self):
-        node0conn = BaseNode()
-        node0conn.peer_connect('127.0.0.1', p2p_port(0))
-
-        network_thread_start()
-        node0conn.wait_for_verack()
-
         node0 = self.nodes[0]
+        node0conn = node0.add_p2p_connection(BaseNode())
+        node0.p2p.wait_for_verack()
 
         tip = int(node0.getbestblockhash(), 16)
         height = node0.getblockcount() + 1
diff --git a/test/functional/abc-transaction-ordering.py b/test/functional/abc-transaction-ordering.py
--- a/test/functional/abc-transaction-ordering.py
+++ b/test/functional/abc-transaction-ordering.py
@@ -22,7 +22,7 @@
     CTxIn,
     CTxOut,
 )
-from test_framework.mininode import network_thread_start, P2PDataStore
+from test_framework.mininode import P2PDataStore
 from test_framework.script import (
     CScript,
     OP_RETURN,
@@ -130,7 +130,6 @@
     def run_test(self):
         node = self.nodes[0]
         node.add_p2p_connection(P2PDataStore())
-        network_thread_start()
         node.p2p.wait_for_verack()
 
         self.genesis_hash = int(node.getbestblockhash(), 16)
diff --git a/test/functional/example_test.py b/test/functional/example_test.py
--- a/test/functional/example_test.py
+++ b/test/functional/example_test.py
@@ -19,8 +19,6 @@
 from test_framework.mininode import (
     P2PInterface,
     mininode_lock,
-    network_thread_join,
-    network_thread_start,
 )
 from test_framework.test_framework import BitcoinTestFramework
 from test_framework.util import (
@@ -137,9 +135,6 @@
         # Create P2P connections to two of the nodes
         self.nodes[0].add_p2p_connection(BaseNode())
 
-        # Start up network handling in another thread. This needs to be called
-        # after the P2P connections have been created.
-        network_thread_start()
         # wait_for_verack ensures that the P2P connection is fully up.
         self.nodes[0].p2p.wait_for_verack()
 
@@ -194,14 +189,9 @@
         connect_nodes(self.nodes[1], self.nodes[2])
 
         self.log.info("Add P2P connection to node2")
-        # We can't add additional P2P connections once the network thread has started. Disconnect the connection
-        # to node0, wait for the network thread to terminate, then connect to node2. This is specific to
-        # the current implementation of the network thread and may be improved in future.
         self.nodes[0].disconnect_p2ps()
-        network_thread_join()
 
         self.nodes[2].add_p2p_connection(BaseNode())
-        network_thread_start()
         self.nodes[2].p2p.wait_for_verack()
 
         self.log.info(
diff --git a/test/functional/feature_assumevalid.py b/test/functional/feature_assumevalid.py
--- a/test/functional/feature_assumevalid.py
+++ b/test/functional/feature_assumevalid.py
@@ -42,11 +42,7 @@
     msg_block,
     msg_headers,
 )
-from test_framework.mininode import (
-    network_thread_join,
-    network_thread_start,
-    P2PInterface,
-)
+from test_framework.mininode import P2PInterface
 from test_framework.script import (CScript, OP_TRUE)
 from test_framework.test_framework import BitcoinTestFramework
 from test_framework.txtools import pad_tx
@@ -106,8 +102,6 @@
 
         # Connect to node0
         p2p0 = self.nodes[0].add_p2p_connection(BaseNode())
-
-        network_thread_start()
         self.nodes[0].p2p.wait_for_verack()
 
         # Build the blockchain
@@ -175,9 +169,7 @@
             self.block_time += 1
             height += 1
 
-        # We're adding new connections so terminate the network thread
         self.nodes[0].disconnect_p2ps()
-        network_thread_join()
 
         # Start node1 and node2 with assumevalid so they accept a block with a bad signature.
         self.start_node(1, extra_args=["-assumevalid=" + hex(block102.sha256)])
@@ -187,8 +179,6 @@
         p2p1 = self.nodes[1].add_p2p_connection(BaseNode())
         p2p2 = self.nodes[2].add_p2p_connection(BaseNode())
 
-        network_thread_start()
-
         p2p0.wait_for_verack()
         p2p1.wait_for_verack()
         p2p2.wait_for_verack()
diff --git a/test/functional/feature_block.py b/test/functional/feature_block.py
--- a/test/functional/feature_block.py
+++ b/test/functional/feature_block.py
@@ -27,7 +27,7 @@
     uint256_from_compact,
     uint256_from_str,
 )
-from test_framework.mininode import P2PDataStore, network_thread_start, network_thread_join
+from test_framework.mininode import P2PDataStore
 from test_framework.script import (
     CScript,
     hash160,
@@ -1399,7 +1399,6 @@
 
         Helper to connect and wait for version handshake."""
         self.nodes[0].add_p2p_connection(P2PDataStore())
-        network_thread_start()
         # We need to wait for the initial getheaders from the peer before we
         # start populating our blockstore. If we don't, then we may run ahead
         # to the next subtest before we receive the getheaders. We'd then send
@@ -1414,7 +1413,6 @@
         The node gets disconnected several times in this test. This helper
         method reconnects the p2p and restarts the network thread."""
         self.nodes[0].disconnect_p2ps()
-        network_thread_join()
         self.bootstrap_p2p()
 
     def sync_blocks(self, blocks, success=True, reject_reason=None, request_block=True, reconnect=False, timeout=60):
diff --git a/test/functional/feature_cltv.py b/test/functional/feature_cltv.py
--- a/test/functional/feature_cltv.py
+++ b/test/functional/feature_cltv.py
@@ -17,7 +17,6 @@
     ToHex,
 )
 from test_framework.mininode import (
-    network_thread_start,
     P2PInterface,
 )
 from test_framework.script import (
@@ -100,10 +99,6 @@
 
     def run_test(self):
         self.nodes[0].add_p2p_connection(P2PInterface())
-
-        network_thread_start()
-
-        # wait_for_verack ensures that the P2P connection is fully up.
         self.nodes[0].p2p.wait_for_verack()
 
         self.log.info("Mining {} blocks".format(CLTV_HEIGHT - 2))
diff --git a/test/functional/feature_csv_activation.py b/test/functional/feature_csv_activation.py
--- a/test/functional/feature_csv_activation.py
+++ b/test/functional/feature_csv_activation.py
@@ -50,7 +50,7 @@
     make_conform_to_ctor,
 )
 from test_framework.messages import COIN, CTransaction, FromHex, ToHex
-from test_framework.mininode import network_thread_start, P2PDataStore
+from test_framework.mininode import P2PDataStore
 from test_framework.script import (
     CScript,
     OP_CHECKSEQUENCEVERIFY,
@@ -226,7 +226,6 @@
 
     def run_test(self):
         self.nodes[0].add_p2p_connection(P2PDataStore())
-        network_thread_start()
         self.nodes[0].p2p.wait_for_verack()
 
         self.log.info("Generate blocks in the past for coinbase outputs.")
diff --git a/test/functional/feature_dersig.py b/test/functional/feature_dersig.py
--- a/test/functional/feature_dersig.py
+++ b/test/functional/feature_dersig.py
@@ -11,7 +11,6 @@
 from test_framework.messages import CTransaction, FromHex, msg_block, ToHex
 from test_framework.mininode import (
     mininode_lock,
-    network_thread_start,
     P2PInterface,
 )
 from test_framework.script import CScript
@@ -62,8 +61,6 @@
     def run_test(self):
         self.nodes[0].add_p2p_connection(P2PInterface())
 
-        network_thread_start()
-
         # wait_for_verack ensures that the P2P connection is fully up.
         self.nodes[0].p2p.wait_for_verack()
 
diff --git a/test/functional/feature_maxuploadtarget.py b/test/functional/feature_maxuploadtarget.py
--- a/test/functional/feature_maxuploadtarget.py
+++ b/test/functional/feature_maxuploadtarget.py
@@ -17,7 +17,7 @@
 from test_framework.cdefs import LEGACY_MAX_BLOCK_SIZE
 from test_framework.blocktools import mine_big_block
 from test_framework.messages import CInv, msg_getdata
-from test_framework.mininode import network_thread_start, P2PInterface
+from test_framework.mininode import P2PInterface
 from test_framework.test_framework import BitcoinTestFramework
 from test_framework.util import assert_equal
 
@@ -64,7 +64,6 @@
         for _ in range(3):
             p2p_conns.append(self.nodes[0].add_p2p_connection(TestP2PConn()))
 
-        network_thread_start()
         for p2pc in p2p_conns:
             p2pc.wait_for_verack()
 
@@ -158,8 +157,6 @@
 
         # Reconnect to self.nodes[0]
         self.nodes[0].add_p2p_connection(TestP2PConn())
-
-        network_thread_start()
         self.nodes[0].p2p.wait_for_verack()
 
         # retrieve 20 blocks which should be enough to break the 1MB limit
diff --git a/test/functional/feature_nulldummy.py b/test/functional/feature_nulldummy.py
--- a/test/functional/feature_nulldummy.py
+++ b/test/functional/feature_nulldummy.py
@@ -17,7 +17,6 @@
 
 from test_framework.blocktools import create_block, create_coinbase
 from test_framework.messages import CTransaction, FromHex, ToHex
-from test_framework.mininode import network_thread_start
 from test_framework.script import CScript
 from test_framework.test_framework import BitcoinTestFramework
 from test_framework.util import assert_equal, assert_raises_rpc_error
@@ -57,7 +56,6 @@
         self.ms_address = self.nodes[0].addmultisigaddress(1, [self.address])[
             'address']
 
-        network_thread_start()
         # Block 2
         self.coinbase_blocks = self.nodes[0].generate(2)
         coinbase_txid = []
diff --git a/test/functional/p2p_compactblocks.py b/test/functional/p2p_compactblocks.py
--- a/test/functional/p2p_compactblocks.py
+++ b/test/functional/p2p_compactblocks.py
@@ -42,7 +42,6 @@
 )
 from test_framework.mininode import (
     mininode_lock,
-    network_thread_start,
     P2PInterface,
 )
 from test_framework.script import CScript, OP_TRUE
@@ -835,15 +834,13 @@
         assert_equal(int(node.getbestblockhash(), 16), block.sha256)
 
     def run_test(self):
-        # Setup the p2p connections and start up the network thread.
+        # Setup the p2p connections
         self.test_node = self.nodes[0].add_p2p_connection(TestP2PConn())
         self.ex_softfork_node = self.nodes[1].add_p2p_connection(
             TestP2PConn(), services=NODE_NETWORK)
         self.old_node = self.nodes[1].add_p2p_connection(
             TestP2PConn(), services=NODE_NETWORK)
 
-        network_thread_start()
-
         self.test_node.wait_for_verack()
 
         # We will need UTXOs to construct transactions in later tests.
diff --git a/test/functional/p2p_feefilter.py b/test/functional/p2p_feefilter.py
--- a/test/functional/p2p_feefilter.py
+++ b/test/functional/p2p_feefilter.py
@@ -10,7 +10,6 @@
 from test_framework.messages import msg_feefilter
 from test_framework.mininode import (
     mininode_lock,
-    network_thread_start,
     P2PInterface,
 )
 from test_framework.test_framework import BitcoinTestFramework
@@ -58,9 +57,8 @@
         node1.generate(1)
         sync_blocks(self.nodes)
 
-        # Setup the p2p connections and start up the network thread.
+        # Setup the p2p connections
         self.nodes[0].add_p2p_connection(TestP2PConn())
-        network_thread_start()
         self.nodes[0].p2p.wait_for_verack()
 
         # Test that invs are received for all txs at feerate of 20 sat/byte
diff --git a/test/functional/p2p_fingerprint.py b/test/functional/p2p_fingerprint.py
--- a/test/functional/p2p_fingerprint.py
+++ b/test/functional/p2p_fingerprint.py
@@ -18,7 +18,7 @@
     msg_getheaders,
     msg_headers,
 )
-from test_framework.mininode import P2PInterface, network_thread_start
+from test_framework.mininode import P2PInterface
 from test_framework.test_framework import BitcoinTestFramework
 from test_framework.util import assert_equal, wait_until
 
@@ -74,8 +74,6 @@
     # last month but that have over a month's worth of work are also withheld.
     def run_test(self):
         node0 = self.nodes[0].add_p2p_connection(P2PInterface())
-
-        network_thread_start()
         node0.wait_for_verack()
 
         # Set node time to 60 days ago
diff --git a/test/functional/p2p_invalid_block.py b/test/functional/p2p_invalid_block.py
--- a/test/functional/p2p_invalid_block.py
+++ b/test/functional/p2p_invalid_block.py
@@ -19,7 +19,7 @@
     create_transaction,
 )
 from test_framework.messages import COIN
-from test_framework.mininode import network_thread_start, P2PDataStore
+from test_framework.mininode import P2PDataStore
 from test_framework.test_framework import BitcoinTestFramework
 from test_framework.util import assert_equal
 
@@ -34,8 +34,6 @@
         # Add p2p connection to node0
         node = self.nodes[0]  # convenience reference to the node
         node.add_p2p_connection(P2PDataStore())
-
-        network_thread_start()
         node.p2p.wait_for_verack()
 
         best_block = node.getblock(node.getbestblockhash())
diff --git a/test/functional/p2p_invalid_tx.py b/test/functional/p2p_invalid_tx.py
--- a/test/functional/p2p_invalid_tx.py
+++ b/test/functional/p2p_invalid_tx.py
@@ -20,7 +20,7 @@
     CTxIn,
     CTxOut,
 )
-from test_framework.mininode import network_thread_start, P2PDataStore, network_thread_join
+from test_framework.mininode import P2PDataStore
 from test_framework.test_framework import BitcoinTestFramework
 from test_framework.util import (
     assert_equal,
@@ -40,7 +40,6 @@
         Helper to connect and wait for version handshake."""
         for _ in range(num_connections):
             self.nodes[0].add_p2p_connection(P2PDataStore())
-        network_thread_start()
         self.nodes[0].p2p.wait_for_verack()
 
     def reconnect_p2p(self, **kwargs):
@@ -49,7 +48,6 @@
         The node gets disconnected several times in this test. This helper
         method reconnects the p2p and restarts the network thread."""
         self.nodes[0].disconnect_p2ps()
-        network_thread_join()
         self.bootstrap_p2p(**kwargs)
 
     def run_test(self):
diff --git a/test/functional/p2p_leak.py b/test/functional/p2p_leak.py
--- a/test/functional/p2p_leak.py
+++ b/test/functional/p2p_leak.py
@@ -20,8 +20,6 @@
 )
 from test_framework.mininode import (
     mininode_lock,
-    network_thread_join,
-    network_thread_start,
     P2PInterface,
 )
 from test_framework.test_framework import BitcoinTestFramework
@@ -142,8 +140,6 @@
         no_verack_idlenode = self.nodes[0].add_p2p_connection(
             CNodeNoVerackIdle())
 
-        network_thread_start()
-
         wait_until(lambda: no_version_bannode.ever_connected,
                    timeout=10, lock=mininode_lock)
         wait_until(lambda: no_version_idlenode.ever_connected,
@@ -162,9 +158,8 @@
 
         self.nodes[0].disconnect_p2ps()
 
-        # Wait until all connections are closed and the network thread has terminated
+        # Wait until all connections are closed
         wait_until(lambda: len(self.nodes[0].getpeerinfo()) == 0)
-        network_thread_join()
 
         # Make sure no unexpected messages came in
         assert(no_version_bannode.unexpected_msg == False)
diff --git a/test/functional/p2p_leak_tx.py b/test/functional/p2p_leak_tx.py
--- a/test/functional/p2p_leak_tx.py
+++ b/test/functional/p2p_leak_tx.py
@@ -5,7 +5,7 @@
 """Test that we don't leak txs to inbound peers that we haven't yet announced to"""
 
 from test_framework.messages import msg_getdata, CInv
-from test_framework.mininode import P2PDataStore, network_thread_start
+from test_framework.mininode import P2PDataStore
 from test_framework.test_framework import BitcoinTestFramework
 from test_framework.util import (
     assert_equal,
@@ -34,7 +34,6 @@
         # Backport note: the following two lines were backported out of order,
         # and should be removed in the appropriate future backports that do a
         # blanket removal of each of these calls across many tests.
-        network_thread_start()
         self.nodes[0].p2p.wait_for_verack()
 
         MAX_REPEATS = 100
diff --git a/test/functional/p2p_mempool.py b/test/functional/p2p_mempool.py
--- a/test/functional/p2p_mempool.py
+++ b/test/functional/p2p_mempool.py
@@ -9,7 +9,7 @@
 """
 
 from test_framework.messages import msg_mempool
-from test_framework.mininode import network_thread_start, P2PInterface
+from test_framework.mininode import P2PInterface
 from test_framework.test_framework import BitcoinTestFramework
 from test_framework.util import assert_equal
 
@@ -23,7 +23,6 @@
     def run_test(self):
         # Add a p2p connection
         self.nodes[0].add_p2p_connection(P2PInterface())
-        network_thread_start()
         self.nodes[0].p2p.wait_for_verack()
 
         # request mempool
diff --git a/test/functional/p2p_node_network_limited.py b/test/functional/p2p_node_network_limited.py
--- a/test/functional/p2p_node_network_limited.py
+++ b/test/functional/p2p_node_network_limited.py
@@ -18,8 +18,6 @@
 )
 from test_framework.mininode import (
     mininode_lock,
-    network_thread_join,
-    network_thread_start,
     P2PInterface,
 )
 from test_framework.test_framework import BitcoinTestFramework
@@ -72,7 +70,6 @@
 
     def run_test(self):
         node = self.nodes[0].add_p2p_connection(P2PIgnoreInv())
-        network_thread_start()
         node.wait_for_verack()
 
         expected_services = NODE_BLOOM | NODE_BITCOIN_CASH | NODE_NETWORK_LIMITED
@@ -103,9 +100,7 @@
 
         self.log.info("Check local address relay, do a fresh connection.")
         self.nodes[0].disconnect_p2ps()
-        network_thread_join()
         node1 = self.nodes[0].add_p2p_connection(P2PIgnoreInv())
-        network_thread_start()
         node1.wait_for_verack()
         node1.send_message(msg_verack())
 
diff --git a/test/functional/p2p_sendheaders.py b/test/functional/p2p_sendheaders.py
--- a/test/functional/p2p_sendheaders.py
+++ b/test/functional/p2p_sendheaders.py
@@ -99,7 +99,6 @@
 )
 from test_framework.mininode import (
     mininode_lock,
-    network_thread_start,
     P2PInterface,
 )
 from test_framework.test_framework import BitcoinTestFramework
@@ -243,15 +242,11 @@
         return [int(x, 16) for x in all_hashes]
 
     def run_test(self):
-        # Setup the p2p connections and start up the network thread.
+        # Setup the p2p connections
         inv_node = self.nodes[0].add_p2p_connection(BaseNode())
         # Make sure NODE_NETWORK is not set for test_node, so no block download
         # will occur outside of direct fetching
         test_node = self.nodes[0].add_p2p_connection(BaseNode(), services=0)
-
-        network_thread_start()
-
-        # Test logic begins here
         inv_node.wait_for_verack()
         test_node.wait_for_verack()
 
diff --git a/test/functional/p2p_timeouts.py b/test/functional/p2p_timeouts.py
--- a/test/functional/p2p_timeouts.py
+++ b/test/functional/p2p_timeouts.py
@@ -24,7 +24,7 @@
 from time import sleep
 
 from test_framework.messages import msg_ping
-from test_framework.mininode import network_thread_start, P2PInterface
+from test_framework.mininode import P2PInterface
 from test_framework.test_framework import BitcoinTestFramework
 
 
@@ -40,15 +40,13 @@
         self.num_nodes = 1
 
     def run_test(self):
-        # Setup the p2p connections and start up the network thread.
+        # Setup the p2p connections
         no_verack_node = self.nodes[0].add_p2p_connection(TestP2PConn())
         no_version_node = self.nodes[0].add_p2p_connection(
             TestP2PConn(), send_version=False)
         no_send_node = self.nodes[0].add_p2p_connection(
             TestP2PConn(), send_version=False)
 
-        network_thread_start()
-
         sleep(1)
 
         assert no_verack_node.is_connected
diff --git a/test/functional/p2p_unrequested_blocks.py b/test/functional/p2p_unrequested_blocks.py
--- a/test/functional/p2p_unrequested_blocks.py
+++ b/test/functional/p2p_unrequested_blocks.py
@@ -67,8 +67,6 @@
 )
 from test_framework.mininode import (
     mininode_lock,
-    network_thread_join,
-    network_thread_start,
     P2PInterface,
 )
 from test_framework.test_framework import BitcoinTestFramework
@@ -97,15 +95,11 @@
         self.setup_nodes()
 
     def run_test(self):
-        # Setup the p2p connections and start up the network thread.
+        # Setup the p2p connections
         # test_node connects to node0 (not whitelisted)
         test_node = self.nodes[0].add_p2p_connection(P2PInterface())
         # min_work_node connects to node1 (whitelisted)
         min_work_node = self.nodes[1].add_p2p_connection(P2PInterface())
-
-        network_thread_start()
-
-        # Test logic begins here
         test_node.wait_for_verack()
         min_work_node.wait_for_verack()
 
@@ -238,10 +232,8 @@
         # disconnect/reconnect first
         self.nodes[0].disconnect_p2ps()
         self.nodes[1].disconnect_p2ps()
-        network_thread_join()
 
         test_node = self.nodes[0].add_p2p_connection(P2PInterface())
-        network_thread_start()
         test_node.wait_for_verack()
 
         test_node.send_message(msg_block(block_h1f))
@@ -336,8 +328,6 @@
 
             self.nodes[0].disconnect_p2ps()
             test_node = self.nodes[0].add_p2p_connection(P2PInterface())
-
-            network_thread_start()
             test_node.wait_for_verack()
 
         # We should have failed reorg and switched back to 290 (but have block 291)
diff --git a/test/functional/rpc_blockchain.py b/test/functional/rpc_blockchain.py
--- a/test/functional/rpc_blockchain.py
+++ b/test/functional/rpc_blockchain.py
@@ -42,7 +42,6 @@
 )
 from test_framework.mininode import (
     P2PInterface,
-    network_thread_start,
 )
 
 
@@ -312,7 +311,6 @@
 
         # Start a P2P connection since we'll need to create some blocks.
         node.add_p2p_connection(P2PInterface())
-        network_thread_start()
         node.p2p.wait_for_verack()
 
         current_height = node.getblock(node.getbestblockhash())['height']
diff --git a/test/functional/test_framework/messages.py b/test/functional/test_framework/messages.py
--- a/test/functional/test_framework/messages.py
+++ b/test/functional/test_framework/messages.py
@@ -55,9 +55,6 @@
 MSG_BLOCK = 2
 MSG_TYPE_MASK = 0xffffffff >> 2
 
-# Howmuch data will be read from the network at once
-READ_BUFFER_SIZE = 8192
-
 # Serialization/deserialization tools
 
 
diff --git a/test/functional/test_framework/mininode.py b/test/functional/test_framework/mininode.py
--- a/test/functional/test_framework/mininode.py
+++ b/test/functional/test_framework/mininode.py
@@ -13,11 +13,10 @@
 P2PInterface: A high-level interface object for communicating to a node over P2P
 P2PDataStore: A p2p interface class that keeps a store of transactions and blocks
               and can respond correctly to getdata and getheaders messages"""
-import asyncore
+import asyncio
 from collections import defaultdict
 from io import BytesIO
 import logging
-import socket
 import struct
 import sys
 import threading
@@ -51,7 +50,6 @@
     msg_verack,
     msg_version,
     NODE_NETWORK,
-    READ_BUFFER_SIZE,
     sha256,
 )
 from test_framework.util import wait_until
@@ -90,7 +88,7 @@
 }
 
 
-class P2PConnection(asyncore.dispatcher):
+class P2PConnection(asyncio.Protocol):
     """A low-level connection object to a node's P2P interface.
 
     This class is responsible for:
@@ -104,73 +102,73 @@
     sub-classed and the on_message() callback overridden."""
 
     def __init__(self):
-        # All P2PConnections must be created before starting the NetworkThread.
-        # assert that the network thread is not running.
-        assert not network_thread_running()
-
-        super().__init__(map=mininode_socket_map)
-
-        self._conn_open = False
+        # The underlying transport of the connection.
+        # Should only call methods on this from the NetworkThread, c.f. call_soon_threadsafe
+        self._transport = None
 
     @property
     def is_connected(self):
-        return self._conn_open
+        return self._transport is not None
 
     def peer_connect(self, dstaddr, dstport, net="regtest"):
+        assert not self.is_connected
         self.dstaddr = dstaddr
         self.dstport = dstport
-        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
-        self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
-        self.sendbuf = b""
+        # The initial message to send after the connection was made:
+        self.on_connection_send_msg = None
+        self.on_connection_send_msg_is_raw = False
         self.recvbuf = b""
-        self._asyncore_pre_connection = True
         self.network = net
-        self.disconnect = False
-
         logger.debug('Connecting to Bitcoin Node: {}:{}'.format(
             self.dstaddr, self.dstport))
 
-        try:
-            self.connect((dstaddr, dstport))
-        except:
-            self.handle_close()
+        loop = NetworkThread.network_event_loop
+        conn_gen_unsafe = loop.create_connection(
+            lambda: self, host=self.dstaddr, port=self.dstport)
+
+        def conn_gen(): return loop.call_soon_threadsafe(
+            loop.create_task, conn_gen_unsafe)
+        return conn_gen
 
     def peer_disconnect(self):
         # Connection could have already been closed by other end.
-        if self.is_connected:
-            # Signal asyncore to disconnect
-            self.disconnect = True
+        NetworkThread.network_event_loop.call_soon_threadsafe(
+            lambda: self._transport and self._transport.abort())
 
     # Connection and disconnection methods
 
-    def handle_connect(self):
-        """asyncore callback when a connection is opened."""
-        if not self.is_connected:
-            logger.debug("Connected & Listening: {}:{}".format(
-                self.dstaddr, self.dstport))
-            self._conn_open = True
-            self._asyncore_pre_connection = False
-            self.on_open()
-
-    def handle_close(self):
-        """asyncore callback when a connection is closed."""
-        logger.debug("Closing connection to: {}:{}".format(
+    def connection_made(self, transport):
+        """asyncio callback when a connection is opened."""
+        assert not self._transport
+        logger.debug("Connected & Listening: {}:{}".format(
             self.dstaddr, self.dstport))
-        self._conn_open = False
+        self._transport = transport
+        if self.on_connection_send_msg:
+            if self.on_connection_send_msg_is_raw:
+                self.send_raw_message(self.on_connection_send_msg)
+            else:
+                self.send_message(self.on_connection_send_msg)
+            # Never used again
+            self.on_connection_send_msg = None
+        self.on_open()
+
+    def connection_lost(self, exc):
+        """asyncio callback when a connection is closed."""
+        if exc:
+            logger.warning("Connection lost to {}:{} due to {}".format(
+                self.dstaddr, self.dstport, exc))
+        else:
+            logger.debug("Closed connection to: {}:{}".format(
+                self.dstaddr, self.dstport))
+        self._transport = None
         self.recvbuf = b""
-        self.sendbuf = b""
-        try:
-            self.close()
-        except:
-            pass
         self.on_close()
 
     # Socket read methods
 
-    def handle_read(self):
-        """asyncore callback when data is read from the socket."""
+    def data_received(self, t):
+        """asyncio callback when data is read from the socket."""
         with mininode_lock:
-            t = self.recv(READ_BUFFER_SIZE)
             if len(t) > 0:
                 self.recvbuf += t
 
@@ -223,30 +221,6 @@
 
     # Socket write methods
 
-    def writable(self):
-        """asyncore method to determine whether the handle_write() callback should be called on the next loop."""
-        with mininode_lock:
-            length = len(self.sendbuf)
-        return length > 0 or self._asyncore_pre_connection
-
-    def handle_write(self):
-        """asyncore callback when data should be written to the socket."""
-        with mininode_lock:
-            # asyncore does not expose socket connection, only the first read/write
-            # event, thus we must check connection manually here to know when we
-            # actually connect
-            if self._asyncore_pre_connection:
-                self.handle_connect()
-            if not self.writable():
-                return
-
-            try:
-                sent = self.send(self.sendbuf)
-            except:
-                self.handle_close()
-                return
-            self.sendbuf = self.sendbuf[sent:]
-
     def send_message(self, message):
         """Send a P2P message over the socket.
 
@@ -265,15 +239,8 @@
         socket."""
         if not self.is_connected:
             raise IOError('Not connected')
-        with mininode_lock:
-            if len(self.sendbuf) == 0:
-                try:
-                    sent = self.send(tmsg)
-                    self.sendbuf = tmsg[sent:]
-                except BlockingIOError:
-                    self.sendbuf = tmsg
-            else:
-                self.sendbuf += tmsg
+        NetworkThread.network_event_loop.call_soon_threadsafe(
+            lambda: self._transport and not self._transport.is_closing() and self._transport.write(tmsg))
 
     # Class utility methods
 
@@ -329,7 +296,7 @@
         self.nServices = 0
 
     def peer_connect(self, *args, services=NODE_NETWORK, send_version=True, **kwargs):
-        super().peer_connect(*args, **kwargs)
+        create_conn = super().peer_connect(*args, **kwargs)
 
         if send_version:
             # Send a version msg
@@ -339,8 +306,10 @@
             vt.addrTo.port = self.dstport
             vt.addrFrom.ip = "0.0.0.0"
             vt.addrFrom.port = 0
-            # Will be sent right after handle_connect
-            self.sendbuf = self._build_message(vt)
+            # Will be sent soon after connection_made
+            self.on_connection_send_msg = vt
+
+        return create_conn
 
     # Message receiving methods
 
@@ -491,61 +460,36 @@
         self.ping_counter += 1
 
 
-# Keep our own socket map for asyncore, so that we can track disconnects
-# ourselves (to workaround an issue with closing an asyncore socket when
-# using select)
-mininode_socket_map = dict()
-
-# One lock for synchronizing all data access between the networking thread (see
+# One lock for synchronizing all data access between the network event loop (see
 # NetworkThread below) and the thread running the test logic.  For simplicity,
-# P2PConnection acquires this lock whenever delivering a message to a P2PInterface,
-# and whenever adding anything to the send buffer (in send_message()).  This
-# lock should be acquired in the thread running the test logic to synchronize
+# P2PConnection acquires this lock whenever delivering a message to a P2PInterface.
+# This lock should be acquired in the thread running the test logic to synchronize
 # access to any data shared with the P2PInterface or P2PConnection.
 mininode_lock = threading.RLock()
 
 
 class NetworkThread(threading.Thread):
+    network_event_loop = None
+
     def __init__(self):
         super().__init__(name="NetworkThread")
+        # There is only one event loop and no more than one thread must be created
+        assert not self.network_event_loop
+
+        NetworkThread.network_event_loop = asyncio.new_event_loop()
 
     def run(self):
-        while mininode_socket_map:
-            # We check for whether to disconnect outside of the asyncore
-            # loop to workaround the behavior of asyncore when using
-            # select
-            disconnected = []
-            for fd, obj in mininode_socket_map.items():
-                if obj.disconnect:
-                    disconnected.append(obj)
-            [obj.handle_close() for obj in disconnected]
-            asyncore.loop(0.1, use_poll=True, map=mininode_socket_map, count=1)
-        logger.debug("Network thread closing")
-
-
-def network_thread_start():
-    """Start the network thread."""
-    # Only one network thread may run at a time
-    assert not network_thread_running()
-
-    NetworkThread().start()
-
-
-def network_thread_running():
-    """Return whether the network thread is running."""
-    return any([thread.name == "NetworkThread" for thread in threading.enumerate()])
-
-
-def network_thread_join(timeout=10):
-    """Wait timeout seconds for the network thread to terminate.
-
-    Throw if the network thread doesn't terminate in timeout seconds."""
-    network_threads = [
-        thread for thread in threading.enumerate() if thread.name == "NetworkThread"]
-    assert len(network_threads) <= 1
-    for thread in network_threads:
-        thread.join(timeout)
-        assert not thread.is_alive()
+        """Start the network thread."""
+        self.network_event_loop.run_forever()
+
+    def close(self, timeout=10):
+        """Close the connections and network event loop."""
+        self.network_event_loop.call_soon_threadsafe(
+            self.network_event_loop.stop)
+        wait_until(lambda: not self.network_event_loop.is_running(),
+                   timeout=timeout)
+        self.network_event_loop.close()
+        self.join(timeout)
 
 
 class P2PDataStore(P2PInterface):
diff --git a/test/functional/test_framework/test_framework.py b/test/functional/test_framework/test_framework.py
--- a/test/functional/test_framework/test_framework.py
+++ b/test/functional/test_framework/test_framework.py
@@ -18,6 +18,7 @@
 from .authproxy import JSONRPCException
 from . import coverage
 from .test_node import TestNode
+from .mininode import NetworkThread
 from .util import (
     assert_equal,
     check_json_precision,
@@ -69,6 +70,7 @@
         """Sets test framework defaults. Do not override this method. Instead, override the set_test_params() method"""
         self.setup_clean_chain = False
         self.nodes = []
+        self.network_thread = None
         self.mocktime = 0
         self.supports_cli = False
         self.bind_to_localhost_only = True
@@ -133,6 +135,10 @@
             self.options.tmpdir = tempfile.mkdtemp(prefix="test")
         self._start_logging()
 
+        self.log.debug('Setting up network thread')
+        self.network_thread = NetworkThread()
+        self.network_thread.start()
+
         success = TestStatus.FAILED
 
         try:
@@ -161,6 +167,8 @@
             print("Testcase failed. Attaching python debugger. Enter ? for help")
             pdb.set_trace()
 
+        self.log.debug('Closing down network thread')
+        self.network_thread.close()
         if not self.options.noshutdown:
             self.log.info("Stopping nodes")
             if self.nodes:
diff --git a/test/functional/test_framework/test_node.py b/test/functional/test_framework/test_node.py
--- a/test/functional/test_framework/test_node.py
+++ b/test/functional/test_framework/test_node.py
@@ -340,7 +340,7 @@
         if 'dstaddr' not in kwargs:
             kwargs['dstaddr'] = '127.0.0.1'
 
-        p2p_conn.peer_connect(*args, **kwargs)
+        p2p_conn.peer_connect(*args, **kwargs)()
         self.p2ps.append(p2p_conn)
 
         return p2p_conn