diff --git a/test/functional/interface_zmq.py b/test/functional/interface_zmq.py --- a/test/functional/interface_zmq.py +++ b/test/functional/interface_zmq.py @@ -22,6 +22,7 @@ assert_equal, assert_raises_rpc_error, connect_nodes, + disconnect_nodes, ) # Test may be skipped and not have zmq installed @@ -73,6 +74,7 @@ class ZMQTest (BitcoinTestFramework): def set_test_params(self): self.num_nodes = 2 + self.extra_args = [["-whitelist=noban@127.0.0.1"]] * self.num_nodes def skip_test_if_missing_module(self): self.skip_if_no_py3_zmq() @@ -110,8 +112,13 @@ rawblock = subs[2] rawtx = subs[3] - self.restart_node(0, ["-zmqpub{}={}".format(sub.topic.decode(), address) - for sub in [hashblock, hashtx, rawblock, rawtx]]) + self.restart_node( + 0, + self.extra_args[0] + [ + f"-zmqpub{sub.topic.decode()}={address}" for sub in [ + hashblock, hashtx, rawblock, rawtx]] + ) + connect_nodes(self.nodes[0], self.nodes[1]) for socket in sockets: socket.connect(address) @@ -200,7 +207,9 @@ # Should only notify the tip if a reorg occurs self.restart_node( - 0, ['-zmqpub{}={}'.format(sub.topic.decode(), address) for sub in [hashblock, hashtx]]) + 0, + self.extra_args[0] + [f'-zmqpub{sub.topic.decode()}={address}' + for sub in [hashblock, hashtx]]) for socket in sockets: socket.connect(address) # Relax so that the subscriber is ready before publishing zmq messages @@ -260,6 +269,37 @@ self.nodes[1].getblock( connect_blocks[0])["tx"][0]) + def create_conflicting_tx(self): + """Create a transaction that is initially added to node0's mempool + and is then rejected by a transaction created and included into a + block by node1.""" + utxo = self.nodes[1].listunspent()[0] + + def send_conflicting_transaction(send_node): + """Send a transaction using an identical utxo as input and + a different address as output each time the function is + called. Return the TxId.""" + address = self.nodes[1].getnewaddress() + change_address = self.nodes[1].getrawchangeaddress() + tx = self.nodes[1].signrawtransactionwithwallet( + self.nodes[1].createrawtransaction( + inputs=[{"txid": utxo["txid"], "vout": utxo["vout"]}], + outputs=[{address: 5_000_000}, + {change_address: utxo["amount"] - 5_001_000}] + ) + ) + return send_node.sendrawtransaction(tx["hex"]) + + disconnect_nodes(self.nodes[0], self.nodes[1]) + txid_to_be_replaced = send_conflicting_transaction(self.nodes[0]) + replacement_txid = send_conflicting_transaction(self.nodes[1]) + block_hash = self.nodes[1].generatetoaddress( + 1, ADDRESS_ECREG_P2SH_OP_TRUE)[0] + connect_nodes(self.nodes[0], self.nodes[1]) + self.sync_all() + + return block_hash, txid_to_be_replaced, replacement_txid + def test_sequence(self): """ Sequence zmq notifications give every blockhash and txhash in order @@ -277,7 +317,8 @@ socket.set(zmq.RCVTIMEO, 60000) seq = ZMQSubscriber(socket, b'sequence') - self.restart_node(0, [f'-zmqpub{seq.topic.decode()}={address}']) + self.restart_node( + 0, self.extra_args[0] + [f'-zmqpub{seq.topic.decode()}={address}']) socket.connect(address) # Relax so that the subscriber is ready before publishing zmq messages sleep(0.2) @@ -312,19 +353,33 @@ # Rest of test requires wallet functionality if self.is_wallet_compiled(): + (block_hash, txid_to_be_replaced, replacement_txid + ) = self.create_conflicting_tx() + self.log.info( + "Testing sequence notifications with mempool sequence values") + # Should receive the initially broadcasted txid. + assert_equal((txid_to_be_replaced, "A", seq_num), + seq.receive_sequence()) + seq_num += 1 + + self.log.info("Testing a tx removal notification") + # Next we receive a notification for the transaction removal + assert_equal((txid_to_be_replaced, "R", seq_num), + seq.receive_sequence()) + seq_num += 1 + # Then we see the block notification + assert_equal((block_hash, "C", None), seq.receive_sequence()) + # There is no sequence notification for the transaction that was + # never in node0's mempool, but it can be found in the block. + assert replacement_txid in self.nodes[0].getblock(block_hash)["tx"] + self.log.info("Wait for tx from second node") payment_txid = self.nodes[1].sendtoaddress( address=self.nodes[0].getnewaddress(), amount=5_000_000) self.sync_all() - self.log.info( - "Testing sequence notifications with mempool sequence values") - - # Should receive the broadcasted txid. assert_equal((payment_txid, "A", seq_num), seq.receive_sequence()) seq_num += 1 - # Removed RBF tests - # Doesn't get published when mined, make a block and tx to "flush" # the possibility though the mempool sequence number does go up by # the number of transactions removed from the mempool by the block @@ -462,7 +517,8 @@ socket.set(zmq.RCVTIMEO, 60000) seq = ZMQSubscriber(socket, b'sequence') - self.restart_node(0, [f'-zmqpub{seq.topic.decode()}={address}']) + self.restart_node( + 0, self.extra_args[0] + [f'-zmqpub{seq.topic.decode()}={address}']) connect_nodes(self.nodes[0], self.nodes[1]) socket.connect(address) # Relax so that the subscriber is ready before publishing zmq messages @@ -510,6 +566,7 @@ txids.append(self.nodes[0].sendtoaddress( address=self.nodes[0].getnewaddress(), amount=1_000_000)) self.sync_all() + self.create_conflicting_tx() self.nodes[0].generatetoaddress(1, ADDRESS_ECREG_UNSPENDABLE) final_txid = self.nodes[0].sendtoaddress( address=self.nodes[0].getnewaddress(), amount=100_000) @@ -528,14 +585,18 @@ f"get_raw_seq: {get_raw_seq}") # 4) Moving forward, we apply the delta to our local view - # remaining txs(5) + 1 block connect + 1 final tx + # remaining txs + conflict (A, R, C) + 1 block connect + 1 final tx expected_sequence = get_raw_seq - for _ in range(num_txs + 1 + 1): + for _ in range(num_txs + 3 + 1 + 1): (hash_str, label, mempool_sequence) = seq.receive_sequence() if label == "A": assert hash_str not in mempool_view mempool_view.add(hash_str) expected_sequence = mempool_sequence + 1 + elif label == "R": + assert hash_str in mempool_view + mempool_view.remove(hash_str) + expected_sequence = mempool_sequence + 1 elif label == "C": # (Attempt to) remove all txids from known block connects block_txids = self.nodes[0].getblock(hash_str)["tx"][1:]