diff --git a/test/functional/interface_zmq.py b/test/functional/interface_zmq.py --- a/test/functional/interface_zmq.py +++ b/test/functional/interface_zmq.py @@ -22,6 +22,7 @@ assert_equal, assert_raises_rpc_error, connect_nodes, + disconnect_nodes, ) # Test may be skipped and not have zmq installed @@ -73,6 +74,8 @@ class ZMQTest (BitcoinTestFramework): def set_test_params(self): self.num_nodes = 2 + self.whitelist_arg = "-whitelist=noban@127.0.0.1" + self.extra_args = [[self.whitelist_arg]] * self.num_nodes def skip_test_if_missing_module(self): self.skip_if_no_py3_zmq() @@ -110,8 +113,12 @@ rawblock = subs[2] rawtx = subs[3] - self.restart_node(0, ["-zmqpub{}={}".format(sub.topic.decode(), address) - for sub in [hashblock, hashtx, rawblock, rawtx]]) + self.restart_node( + 0, + [f"-zmqpub{sub.topic.decode()}={address}" for sub in [ + hashblock, hashtx, rawblock, rawtx]] + + [self.whitelist_arg]) + connect_nodes(self.nodes[0], self.nodes[1]) for socket in sockets: socket.connect(address) @@ -200,7 +207,8 @@ # Should only notify the tip if a reorg occurs self.restart_node( - 0, ['-zmqpub{}={}'.format(sub.topic.decode(), address) for sub in [hashblock, hashtx]]) + 0, [f'-zmqpub{sub.topic.decode()}={address}' + for sub in [hashblock, hashtx]] + [self.whitelist_arg]) for socket in sockets: socket.connect(address) # Relax so that the subscriber is ready before publishing zmq messages @@ -277,7 +285,8 @@ socket.set(zmq.RCVTIMEO, 60000) seq = ZMQSubscriber(socket, b'sequence') - self.restart_node(0, [f'-zmqpub{seq.topic.decode()}={address}']) + self.restart_node( + 0, [f'-zmqpub{seq.topic.decode()}={address}', self.whitelist_arg]) socket.connect(address) # Relax so that the subscriber is ready before publishing zmq messages sleep(0.2) @@ -313,17 +322,60 @@ # Rest of test requires wallet functionality if self.is_wallet_compiled(): self.log.info("Wait for tx from second node") - payment_txid = self.nodes[1].sendtoaddress( - address=self.nodes[0].getnewaddress(), amount=5_000_000) - self.sync_all() + + # Prepare data needed to generate conflicting transactions + utxo = self.nodes[1].listunspent()[0] + address1 = self.nodes[1].getnewaddress() + address2 = self.nodes[1].getnewaddress() + change_address = self.nodes[1].getrawchangeaddress() + + # Produce a transaction that node1 will not receive. + disconnect_nodes(self.nodes[0], self.nodes[1]) + tx1 = self.nodes[1].signrawtransactionwithwallet( + self.nodes[1].createrawtransaction( + inputs=[{"txid": utxo["txid"], "vout": utxo["vout"]}], + outputs=[{address1: 5_000_000}, + {change_address: utxo["amount"] - 5_001_000}] + ) + ) + payment_txid = self.nodes[0].sendrawtransaction(tx1["hex"]) + self.log.info( "Testing sequence notifications with mempool sequence values") - # Should receive the broadcasted txid. assert_equal((payment_txid, "A", seq_num), seq.receive_sequence()) seq_num += 1 - # Removed RBF tests + self.log.info("Testing a tx removal notification") + tx2 = self.nodes[1].signrawtransactionwithwallet( + self.nodes[1].createrawtransaction( + inputs=[{"txid": utxo["txid"], "vout": utxo["vout"]}], + outputs=[{address2: 5_000_000}, + {change_address: utxo["amount"] - 5_001_000}] + ) + ) + self.nodes[1].sendrawtransaction(tx2["hex"]) + # Mine it before reconnection, so that will get ejected from + # from node0's mempool + self.nodes[1].generatetoaddress(1, ADDRESS_ECREG_P2SH_OP_TRUE) + connect_nodes(self.nodes[0], self.nodes[1]) + self.sync_all() + + best_block = self.nodes[1].getblockhash( + self.nodes[1].getblockcount()) + assert_equal((payment_txid, "R", seq_num), seq.receive_sequence()) + seq_num += 1 + assert_equal((best_block, "C", None), seq.receive_sequence()) + # We don't see a notification for the replacement tx, because it + # was never in node0's mempool. + + # Add another transaction, because mining that last block in order + # to cause a tx rejection put us out of sync with Core's test + payment_txid = self.nodes[1].sendtoaddress( + address=self.nodes[0].getnewaddress(), amount=5_000_000) + self.sync_all() + assert_equal((payment_txid, "A", seq_num), seq.receive_sequence()) + seq_num += 1 # Doesn't get published when mined, make a block and tx to "flush" # the possibility though the mempool sequence number does go up by @@ -335,8 +387,8 @@ self.sync_all() # Make sure the number of mined transactions matches the number of # txs out of mempool - mempool_size_delta = mempool_size - \ - len(self.nodes[0].getrawmempool()) + mempool_size_delta = ( + mempool_size - len(self.nodes[0].getrawmempool())) assert_equal(len(self.nodes[0].getblock(c_block)["tx"]) - 1, mempool_size_delta) seq_num += mempool_size_delta @@ -464,7 +516,8 @@ socket.set(zmq.RCVTIMEO, 60000) seq = ZMQSubscriber(socket, b'sequence') - self.restart_node(0, [f'-zmqpub{seq.topic.decode()}={address}']) + self.restart_node( + 0, [f'-zmqpub{seq.topic.decode()}={address}', self.whitelist_arg]) connect_nodes(self.nodes[0], self.nodes[1]) socket.connect(address) # Relax so that the subscriber is ready before publishing zmq messages @@ -508,7 +561,6 @@ # Things continue to happen in the "interim" while waiting for # snapshot results - # We have node 0 do all these to avoid p2p races with RBF announcements for _ in range(num_txs): txids.append(self.nodes[0].sendtoaddress( address=self.nodes[0].getnewaddress(), amount=1_000_000))