diff --git a/test/functional/interface_zmq.py b/test/functional/interface_zmq.py --- a/test/functional/interface_zmq.py +++ b/test/functional/interface_zmq.py @@ -88,39 +88,49 @@ self.log.debug("Destroying ZMQ context") self.ctx.destroy(linger=None) + # Restart node with the specified zmq notifications enabled, subscribe to + # all of them and return the corresponding ZMQSubscriber objects. + def setup_zmq_test(self, services, recv_timeout=60, connect_nodes=False): + subscribers = [] + for topic, address in services: + socket = self.ctx.socket(zmq.SUB) + socket.set(zmq.RCVTIMEO, recv_timeout * 1000) + subscribers.append(ZMQSubscriber(socket, topic.encode())) + + self.restart_node( + 0, + self.extra_args[0] + [ + f"-zmqpub{topic}={address}" for topic, address in services + ]) + + if connect_nodes: + self.connect_nodes(0, 1) + + for i, sub in enumerate(subscribers): + sub.socket.connect(services[i][1]) + + # Relax so that the subscribers are ready before publishing zmq + # messages + sleep(0.2) + + return subscribers + def test_basic(self): # Invalid zmq arguments don't take down the node, see #17185. self.restart_node(0, ["-zmqpubrawtx=foo", "-zmqpubhashtx=bar"]) address = 'tcp://127.0.0.1:28332' - sockets = [] - subs = [] - services = [b"hashblock", b"hashtx", b"rawblock", b"rawtx"] - for service in services: - sockets.append(self.ctx.socket(zmq.SUB)) - sockets[-1].set(zmq.RCVTIMEO, 60000) - subs.append(ZMQSubscriber(sockets[-1], service)) - - # Subscribe to all available topics. + subs = self.setup_zmq_test( + [(topic, address) + for topic in ["hashblock", "hashtx", "rawblock", "rawtx"]], + connect_nodes=True) + hashblock = subs[0] hashtx = subs[1] rawblock = subs[2] rawtx = subs[3] - self.restart_node( - 0, - self.extra_args[0] + [ - f"-zmqpub{sub.topic.decode()}={address}" for sub in [ - hashblock, hashtx, rawblock, rawtx]] - ) - - self.connect_nodes(0, 1) - for socket in sockets: - socket.connect(address) - # Relax so that the subscriber is ready before publishing zmq messages - sleep(0.2) - num_blocks = 5 self.log.info( "Generate {0} blocks (and {0} coinbase txes)".format(num_blocks)) @@ -188,27 +198,11 @@ address = 'tcp://127.0.0.1:28333' - services = [b"hashblock", b"hashtx"] - sockets = [] - subs = [] - for service in services: - sockets.append(self.ctx.socket(zmq.SUB)) - # 2 second timeout to check end of notifications - sockets[-1].set(zmq.RCVTIMEO, 2000) - subs.append(ZMQSubscriber(sockets[-1], service)) - - # Subscribe to all available topics. - hashblock = subs[0] - hashtx = subs[1] - # Should only notify the tip if a reorg occurs - self.restart_node( - 0, self.extra_args[0] + [f'-zmqpub{sub.topic.decode()}={address}' - for sub in [hashblock, hashtx]]) - for socket in sockets: - socket.connect(address) - # Relax so that the subscriber is ready before publishing zmq messages - sleep(0.2) + hashblock, hashtx = self.setup_zmq_test( + [(topic, address) for topic in ["hashblock", "hashtx"]], + # 2 second timeout to check end of notifications + recv_timeout=2) # Generate 1 block in nodes[0] with 1 mempool tx and receive all # notifications @@ -307,16 +301,7 @@ <32-byte hash>A<8-byte LE uint> : Transactionhash added mempool """ self.log.info("Testing 'sequence' publisher") - address = 'tcp://127.0.0.1:28333' - socket = self.ctx.socket(zmq.SUB) - socket.set(zmq.RCVTIMEO, 60000) - seq = ZMQSubscriber(socket, b'sequence') - - self.restart_node( - 0, self.extra_args[0] + [f'-zmqpub{seq.topic.decode()}={address}']) - socket.connect(address) - # Relax so that the subscriber is ready before publishing zmq messages - sleep(0.2) + [seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")]) # Mempool sequence number starts at 1 seq_num = 1 @@ -507,17 +492,8 @@ return self.log.info("Testing 'mempool sync' usage of sequence notifier") - address = 'tcp://127.0.0.1:28333' - socket = self.ctx.socket(zmq.SUB) - socket.set(zmq.RCVTIMEO, 60000) - seq = ZMQSubscriber(socket, b'sequence') - - self.restart_node( - 0, self.extra_args[0] + [f'-zmqpub{seq.topic.decode()}={address}']) - self.connect_nodes(0, 1) - socket.connect(address) - # Relax so that the subscriber is ready before publishing zmq messages - sleep(0.2) + [seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")], + connect_nodes=True) # In-memory counter, should always start at 1 next_mempool_seq = self.nodes[0].getrawmempool( @@ -617,31 +593,19 @@ def test_multiple_interfaces(self): # Set up two subscribers with different addresses - subscribers = [] - for i in range(2): - address = f"tcp://127.0.0.1:{28334 + i}" - socket = self.ctx.socket(zmq.SUB) - socket.set(zmq.RCVTIMEO, 60000) - hashblock = ZMQSubscriber(socket, b"hashblock") - socket.connect(address) - subscribers.append({'address': address, 'hashblock': hashblock}) - - self.restart_node( - 0, - [f'-zmqpub{subscriber["hashblock"].topic.decode()}={subscriber["address"]}' - for subscriber in subscribers]) - - # Relax so that the subscriber is ready before publishing zmq messages - sleep(0.2) + subscribers = self.setup_zmq_test([ + ("hashblock", "tcp://127.0.0.1:28334"), + ("hashblock", "tcp://127.0.0.1:28335"), + ]) # Generate 1 block in nodes[0] and receive all notifications self.nodes[0].generatetoaddress(1, ADDRESS_ECREG_UNSPENDABLE) # Should receive the same block hash on both subscribers assert_equal(self.nodes[0].getbestblockhash(), - subscribers[0]['hashblock'].receive().hex()) + subscribers[0].receive().hex()) assert_equal(self.nodes[0].getbestblockhash(), - subscribers[1]['hashblock'].receive().hex()) + subscribers[1].receive().hex()) if __name__ == '__main__':