diff --git a/test/functional/interface_bitcoin_cli.py b/test/functional/interface_bitcoin_cli.py index 48a201e2b..e5a032c8e 100755 --- a/test/functional/interface_bitcoin_cli.py +++ b/test/functional/interface_bitcoin_cli.py @@ -1,47 +1,47 @@ #!/usr/bin/env python3 # Copyright (c) 2017 The Bitcoin Core developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """Test bitcoin-cli""" from test_framework.test_framework import BitcoinTestFramework from test_framework.util import assert_equal, assert_raises_process_error, get_auth_cookie class TestBitcoinCli(BitcoinTestFramework): def set_test_params(self): self.setup_clean_chain = True self.num_nodes = 1 def run_test(self): """Main test logic""" self.log.info( "Compare responses from gewalletinfo RPC and `bitcoin-cli getwalletinfo`") cli_response = self.nodes[0].cli.getwalletinfo() rpc_response = self.nodes[0].getwalletinfo() assert_equal(cli_response, rpc_response) self.log.info( "Compare responses from getblockchaininfo RPC and `bitcoin-cli getblockchaininfo`") cli_response = self.nodes[0].cli.getblockchaininfo() rpc_response = self.nodes[0].getblockchaininfo() assert_equal(cli_response, rpc_response) user, password = get_auth_cookie(self.nodes[0].datadir) self.log.info("Test -stdinrpcpass option") assert_equal(0, self.nodes[0].cli( - '-rpcuser=%s' % user, '-stdinrpcpass', input=password).getblockcount()) + '-rpcuser={}'.format(user), '-stdinrpcpass', input=password).getblockcount()) assert_raises_process_error(1, "incorrect rpcuser or rpcpassword", self.nodes[0].cli( - '-rpcuser=%s' % user, '-stdinrpcpass', input="foo").echo) + '-rpcuser={}'.format(user), '-stdinrpcpass', input="foo").echo) self.log.info("Test -stdin and -stdinrpcpass") - assert_equal(["foo", "bar"], self.nodes[0].cli('-rpcuser=%s' % user, + assert_equal(["foo", "bar"], self.nodes[0].cli('-rpcuser={}'.format(user), '-stdin', '-stdinrpcpass', input=password + "\nfoo\nbar").echo()) assert_raises_process_error(1, "incorrect rpcuser or rpcpassword", self.nodes[0].cli( - '-rpcuser=%s' % user, '-stdin', '-stdinrpcpass', input="foo").echo) + '-rpcuser={}'.format(user), '-stdin', '-stdinrpcpass', input="foo").echo) if __name__ == '__main__': TestBitcoinCli().main() diff --git a/test/functional/interface_zmq.py b/test/functional/interface_zmq.py index fc6c393dd..31dac7381 100755 --- a/test/functional/interface_zmq.py +++ b/test/functional/interface_zmq.py @@ -1,160 +1,162 @@ #!/usr/bin/env python3 # Copyright (c) 2015-2016 The Bitcoin Core developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """Test the ZMQ API.""" import configparser import os import struct from test_framework.test_framework import BitcoinTestFramework, SkipTest from test_framework.util import (assert_equal, bytes_to_hex_str, hash256, ) class ZMQTest (BitcoinTestFramework): def set_test_params(self): self.num_nodes = 2 def setup_nodes(self): # Try to import python3-zmq. Skip this test if the import fails. try: import zmq except ImportError: raise SkipTest("python3-zmq module not available.") # Check that bitcoin has been built with ZMQ enabled config = configparser.ConfigParser() if not self.options.configfile: self.options.configfile = os.path.dirname( __file__) + "/../config.ini" config.read_file(open(self.options.configfile)) if not config["components"].getboolean("ENABLE_ZMQ"): raise SkipTest("bitcoind has not been built with zmq enabled.") self.zmqContext = zmq.Context() self.zmqSubSocket = self.zmqContext.socket(zmq.SUB) self.zmqSubSocket.set(zmq.RCVTIMEO, 60000) self.zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"hashblock") self.zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"hashtx") self.zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"rawblock") self.zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"rawtx") ip_address = "tcp://127.0.0.1:28332" self.zmqSubSocket.connect(ip_address) - self.extra_args = [['-zmqpubhashblock=%s' % ip_address, '-zmqpubhashtx=%s' % ip_address, - '-zmqpubrawblock=%s' % ip_address, '-zmqpubrawtx=%s' % ip_address], []] + self.extra_args = [['-zmqpubhashblock={}'.format(ip_address), + '-zmqpubhashtx={}'.format(ip_address), + '-zmqpubrawblock={}'.format(ip_address), + '-zmqpubrawtx={}'.format(ip_address)], []] self.add_nodes(self.num_nodes, self.extra_args) self.start_nodes() def run_test(self): try: self._zmq_test() finally: # Destroy the zmq context self.log.debug("Destroying zmq context") self.zmqContext.destroy(linger=None) def _zmq_test(self): genhashes = self.nodes[0].generate(1) self.sync_all() self.log.info("Wait for tx") msg = self.zmqSubSocket.recv_multipart() topic = msg[0] assert_equal(topic, b"hashtx") txhash = msg[1] msgSequence = struct.unpack('