diff --git a/test/functional/interface_zmq.py b/test/functional/interface_zmq.py index ec6f082e3..ee0ec7c97 100755 --- a/test/functional/interface_zmq.py +++ b/test/functional/interface_zmq.py @@ -1,129 +1,125 @@ #!/usr/bin/env python3 # Copyright (c) 2015-2016 The Bitcoin Core developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """Test the ZMQ notification interface.""" import configparser -import os import struct from test_framework.test_framework import BitcoinTestFramework, SkipTest from test_framework.util import ( assert_equal, bytes_to_hex_str, hash256, ) class ZMQSubscriber: def __init__(self, socket, topic): self.sequence = 0 self.socket = socket self.topic = topic import zmq self.socket.setsockopt(zmq.SUBSCRIBE, self.topic) def receive(self): topic, body, seq = self.socket.recv_multipart() # Topic should match the subscriber topic. assert_equal(topic, self.topic) # Sequence should be incremental. assert_equal(struct.unpack(' 0: self.nodes[i].extend_default_args( ["-connect=127.0.0.1:" + str(p2p_port(0))]) if self.options.gravitonactivation: self.nodes[i].extend_default_args( ["-gravitonactivationtime={}".format(TIMESTAMP_IN_THE_PAST)]) self.start_node(i) # Wait for RPC connections to be ready for node in self.nodes: node.wait_for_rpc_connection() # For backwared compatibility of the python scripts with previous # versions of the cache, set mocktime to Jan 1, # 2014 + (201 * 10 * 60) self.mocktime = 1388534400 + (201 * 10 * 60) # Create a 200-block-long chain; each of the 4 first nodes # gets 25 mature blocks and 25 immature. # Note: To preserve compatibility with older versions of # initialize_chain, only 4 nodes will generate coins. # # blocks are created with timestamps 10 minutes apart # starting from 2010 minutes in the past block_time = self.mocktime - (201 * 10 * 60) for i in range(2): for peer in range(4): for j in range(25): set_node_times(self.nodes, block_time) self.nodes[peer].generate(1) block_time += 10 * 60 # Must sync before next peer starts generating blocks sync_blocks(self.nodes) # Shut them down, and clean up cache directories: self.stop_nodes() self.nodes = [] self.mocktime = 0 def cache_path(n, *paths): return os.path.join(get_datadir_path(self.options.cachedir, n), "regtest", *paths) for i in range(MAX_NODES): for entry in os.listdir(cache_path(i)): if entry not in ['wallets', 'chainstate', 'blocks']: os.remove(cache_path(i, entry)) for i in range(self.num_nodes): from_dir = get_datadir_path(self.options.cachedir, i) to_dir = get_datadir_path(self.options.tmpdir, i) shutil.copytree(from_dir, to_dir) # Overwrite port/rpcport in bitcoin.conf initialize_datadir(self.options.tmpdir, i) def _initialize_chain_clean(self): """Initialize empty blockchain for use by the test. Create an empty blockchain and num_nodes wallets. Useful if a test case wants complete control over initialization.""" for i in range(self.num_nodes): initialize_datadir(self.options.tmpdir, i) class ComparisonTestFramework(BitcoinTestFramework): """Test framework for doing p2p comparison testing Sets up some bitcoind binaries: - 1 binary: test binary - 2 binaries: 1 test binary, 1 ref binary - n>2 binaries: 1 test binary, n-1 ref binaries""" def set_test_params(self): self.num_nodes = 2 self.setup_clean_chain = True def add_options(self, parser): parser.add_argument("--testbinary", dest="testbinary", - default=os.getenv("BITCOIND", "bitcoind"), help="bitcoind binary to test") parser.add_argument("--refbinary", dest="refbinary", - default=os.getenv("BITCOIND", "bitcoind"), help="bitcoind binary to use for reference nodes (if any)") def setup_network(self): extra_args = [['-whitelist=127.0.0.1']] * self.num_nodes + + if not self.options.testbinary: + self.options.testbinary = self.options.bitcoind + if not self.options.refbinary: + self.options.refbinary = self.options.bitcoind + if hasattr(self, "extra_args"): extra_args = self.extra_args self.add_nodes(self.num_nodes, extra_args, binary=[self.options.testbinary] + [self.options.refbinary] * (self.num_nodes - 1)) self.start_nodes() class SkipTest(Exception): """This exception is raised to skip a test""" def __init__(self, message): self.message = message diff --git a/test/functional/test_framework/test_node.py b/test/functional/test_framework/test_node.py index cdf082dcb..61a0253af 100755 --- a/test/functional/test_framework/test_node.py +++ b/test/functional/test_framework/test_node.py @@ -1,394 +1,390 @@ #!/usr/bin/env python3 # Copyright (c) 2017 The Bitcoin Core developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """Class for bitcoind node under test""" import decimal import errno import http.client import json import logging import os import re import subprocess import sys import tempfile import time from .authproxy import JSONRPCException from .messages import COIN, CTransaction, FromHex from .util import ( append_config, assert_equal, delete_cookie_file, get_rpc_proxy, p2p_port, rpc_url, wait_until, ) # For Python 3.4 compatibility JSONDecodeError = getattr(json, "JSONDecodeError", ValueError) BITCOIND_PROC_WAIT_TIMEOUT = 60 class FailedToStartError(Exception): """Raised when a node fails to start correctly.""" class TestNode(): """A class for representing a bitcoind node under test. This class contains: - state about the node (whether it's running, etc) - a Python subprocess.Popen object representing the running process - an RPC connection to the node - one or more P2P connections to the node To make things easier for the test writer, any unrecognised messages will be dispatched to the RPC connection.""" - def __init__(self, i, datadir, host, rpc_port, p2p_port, timewait, binary, stderr, mocktime, coverage_dir, extra_conf=None, extra_args=None, use_cli=False): + def __init__(self, i, datadir, host, rpc_port, p2p_port, timewait, bitcoind, bitcoin_cli, stderr, mocktime, coverage_dir, extra_conf=None, extra_args=None, use_cli=False): self.index = i self.datadir = datadir self.host = host self.rpc_port = rpc_port self.p2p_port = p2p_port self.name = "testnode-{}".format(i) if timewait: self.rpc_timeout = timewait else: # Wait for up to 60 seconds for the RPC server to respond self.rpc_timeout = 60 - if binary is None: - self.binary = os.getenv("BITCOIND", "bitcoind") - else: - self.binary = binary + self.binary = bitcoind if not os.path.isfile(self.binary): raise FileNotFoundError( "Binary '{}' could not be found.\nTry setting it manually:\n\tBITCOIND= {}".format(self.binary, sys.argv[0])) self.stderr = stderr self.coverage_dir = coverage_dir if extra_conf != None: append_config(datadir, extra_conf) # Most callers will just need to add extra args to the default list # below. # For those callers that need more flexibity, they can access the # default args using the provided facilities self.extra_args = extra_args self.default_args = ["-datadir=" + self.datadir, "-server", "-keypool=1", "-discover=0", "-rest", "-logtimemicros", "-debug", "-debugexclude=libevent", "-debugexclude=leveldb", "-mocktime=" + str(mocktime), "-uacomment=" + self.name] - cli_path = os.getenv("BITCOINCLI", "bitcoin-cli") - if not os.path.isfile(cli_path): + if not os.path.isfile(bitcoin_cli): raise FileNotFoundError( - "Binary '{}' could not be found.\nTry setting it manually:\n\tBITCOINCLI= {}".format(cli_path, sys.argv[0])) - self.cli = TestNodeCLI(cli_path, self.datadir) + "Binary '{}' could not be found.\nTry setting it manually:\n\tBITCOINCLI= {}".format(bitcoin_cli, sys.argv[0])) + self.cli = TestNodeCLI(bitcoin_cli, self.datadir) self.use_cli = use_cli self.running = False self.process = None self.rpc_connected = False self.rpc = None self.url = None self.relay_fee_cache = None self.log = logging.getLogger('TestFramework.node{}'.format(i)) # Whether to kill the node when this object goes away self.cleanup_on_exit = True self.p2ps = [] def __del__(self): # Ensure that we don't leave any bitcoind processes lying around after # the test ends if self.process and self.cleanup_on_exit: # Should only happen on test failure # Avoid using logger, as that may have already been shutdown when # this destructor is called. print("Cleaning up leftover process") self.process.kill() def __getattr__(self, name): """Dispatches any unrecognised messages to the RPC connection or a CLI instance.""" if self.use_cli: return getattr(self.cli, name) else: assert self.rpc is not None, "Error: RPC not initialized" assert self.rpc_connected, "Error: No RPC connection" return getattr(self.rpc, name) def clear_default_args(self): self.default_args.clear() def extend_default_args(self, args): self.default_args.extend(args) def remove_default_args(self, args): for rm_arg in args: # Remove all occurrences of rm_arg in self.default_args: # - if the arg is a flag (-flag), then the names must match # - if the arg is a value (-key=value) then the name must starts # with "-key=" (the '"' char is to avoid removing "-key_suffix" # arg is "-key" is the argument to remove). self.default_args = [def_arg for def_arg in self.default_args if rm_arg != def_arg and not def_arg.startswith(rm_arg + '=')] def start(self, extra_args=None, stderr=None, *args, **kwargs): """Start the node.""" if extra_args is None: extra_args = self.extra_args if stderr is None: stderr = self.stderr # Delete any existing cookie file -- if such a file exists (eg due to # unclean shutdown), it will get overwritten anyway by bitcoind, and # potentially interfere with our attempt to authenticate delete_cookie_file(self.datadir) self.process = subprocess.Popen( [self.binary] + self.default_args + extra_args, stderr=stderr, *args, **kwargs) self.running = True self.log.debug("bitcoind started, waiting for RPC to come up") def wait_for_rpc_connection(self): """Sets up an RPC connection to the bitcoind process. Returns False if unable to connect.""" # Poll at a rate of four times per second poll_per_s = 4 for _ in range(poll_per_s * self.rpc_timeout): if self.process.poll() is not None: raise FailedToStartError( 'bitcoind exited with status {} during initialization'.format(self.process.returncode)) try: self.rpc = get_rpc_proxy(rpc_url(self.datadir, self.host, self.rpc_port), self.index, timeout=self.rpc_timeout, coveragedir=self.coverage_dir) self.rpc.getblockcount() # If the call to getblockcount() succeeds then the RPC connection is up self.rpc_connected = True self.url = self.rpc.url self.log.debug("RPC successfully started") return except IOError as e: if e.errno != errno.ECONNREFUSED: # Port not yet open? raise # unknown IO error except JSONRPCException as e: # Initialization phase if e.error['code'] != -28: # RPC in warmup? raise # unknown JSON RPC exception except ValueError as e: # cookie file not found and no rpcuser or rpcassword. bitcoind still starting if "No RPC credentials" not in str(e): raise time.sleep(1.0 / poll_per_s) raise AssertionError("Unable to connect to bitcoind") def get_wallet_rpc(self, wallet_name): if self.use_cli: return self.cli("-rpcwallet={}".format(wallet_name)) else: assert self.rpc_connected assert self.rpc wallet_path = "wallet/{}".format(wallet_name) return self.rpc / wallet_path def stop_node(self): """Stop the node.""" if not self.running: return self.log.debug("Stopping node") try: self.stop() except http.client.CannotSendRequest: self.log.exception("Unable to stop node.") del self.p2ps[:] def is_node_stopped(self): """Checks whether the node has stopped. Returns True if the node has stopped. False otherwise. This method is responsible for freeing resources (self.process).""" if not self.running: return True return_code = self.process.poll() if return_code is None: return False # process has stopped. Assert that it didn't return an error code. assert_equal(return_code, 0) self.running = False self.process = None self.rpc_connected = False self.rpc = None self.log.debug("Node stopped") return True def wait_until_stopped(self, timeout=BITCOIND_PROC_WAIT_TIMEOUT): wait_until(self.is_node_stopped, timeout=timeout) def assert_start_raises_init_error(self, extra_args=None, expected_msg=None, partial_match=False, *args, **kwargs): """Attempt to start the node and expect it to raise an error. extra_args: extra arguments to pass through to bitcoind expected_msg: regex that stderr should match when bitcoind fails Will throw if bitcoind starts without an error. Will throw if an expected_msg is provided and it does not match bitcoind's stdout.""" with tempfile.SpooledTemporaryFile(max_size=2**16) as log_stderr: try: self.start(extra_args, stderr=log_stderr, *args, **kwargs) self.wait_for_rpc_connection() self.stop_node() self.wait_until_stopped() except FailedToStartError as e: self.log.debug('bitcoind failed to start: {}'.format(e)) self.running = False self.process = None # Check stderr for expected message if expected_msg is not None: log_stderr.seek(0) stderr = log_stderr.read().decode('utf-8').strip() if partial_match: if re.search(expected_msg, stderr, flags=re.MULTILINE) is None: raise AssertionError( 'Expected message "{}" does not partially match stderr:\n"{}"'.format(expected_msg, stderr)) else: if re.fullmatch(expected_msg, stderr) is None: raise AssertionError( 'Expected message "{}" does not fully match stderr:\n"{}"'.format(expected_msg, stderr)) else: if expected_msg is None: assert_msg = "bitcoind should have exited with an error" else: assert_msg = "bitcoind should have exited with expected error " + expected_msg raise AssertionError(assert_msg) def node_encrypt_wallet(self, passphrase): """"Encrypts the wallet. This causes bitcoind to shutdown, so this method takes care of cleaning up resources.""" self.encryptwallet(passphrase) self.wait_until_stopped() def relay_fee(self, cached=True): if not self.relay_fee_cache or not cached: self.relay_fee_cache = self.getnetworkinfo()["relayfee"] return self.relay_fee_cache def calculate_fee(self, tx): # Relay fee is in satoshis per KB. Thus the 1000, and the COIN added # to get back to an amount of satoshis. billable_size_estimate = tx.billable_size() # Add some padding for signatures # NOTE: Fees must be calculated before signatures are added, # so they will never be included in the billable_size above. billable_size_estimate += len(tx.vin) * 81 return int(self.relay_fee() / 1000 * billable_size_estimate * COIN) def calculate_fee_from_txid(self, txid): ctx = FromHex(CTransaction(), self.getrawtransaction(txid)) return self.calculate_fee(ctx) def add_p2p_connection(self, p2p_conn, *args, **kwargs): """Add a p2p connection to the node. This method adds the p2p connection to the self.p2ps list and also returns the connection to the caller.""" if 'dstport' not in kwargs: kwargs['dstport'] = p2p_port(self.index) if 'dstaddr' not in kwargs: kwargs['dstaddr'] = '127.0.0.1' p2p_conn.peer_connect(*args, **kwargs) self.p2ps.append(p2p_conn) return p2p_conn @property def p2p(self): """Return the first p2p connection Convenience property - most tests only use a single p2p connection to each node, so this saves having to write node.p2ps[0] many times.""" assert self.p2ps, "No p2p connection" return self.p2ps[0] def disconnect_p2ps(self): """Close all p2p connections to the node.""" for p in self.p2ps: p.peer_disconnect() del self.p2ps[:] class TestNodeCLIAttr: def __init__(self, cli, command): self.cli = cli self.command = command def __call__(self, *args, **kwargs): return self.cli.send_cli(self.command, *args, **kwargs) def get_request(self, *args, **kwargs): return lambda: self(*args, **kwargs) class TestNodeCLI(): """Interface to bitcoin-cli for an individual node""" def __init__(self, binary, datadir): self.args = [] self.binary = binary self.datadir = datadir self.input = None self.log = logging.getLogger('TestFramework.bitcoincli') def __call__(self, *args, input=None): # TestNodeCLI is callable with bitcoin-cli command-line args cli = TestNodeCLI(self.binary, self.datadir) cli.args = [str(arg) for arg in args] cli.input = input return cli def __getattr__(self, command): return TestNodeCLIAttr(self, command) def batch(self, requests): results = [] for request in requests: try: results.append(dict(result=request())) except JSONRPCException as e: results.append(dict(error=e)) return results def send_cli(self, command, *args, **kwargs): """Run bitcoin-cli command. Deserializes returned string as python object.""" pos_args = [str(arg) for arg in args] named_args = [str(key) + "=" + str(value) for (key, value) in kwargs.items()] assert not ( pos_args and named_args), "Cannot use positional arguments and named arguments in the same bitcoin-cli call" p_args = [self.binary, "-datadir=" + self.datadir] + self.args if named_args: p_args += ["-named"] p_args += [command] + pos_args + named_args self.log.debug("Running bitcoin-cli command: {}".format(command)) process = subprocess.Popen(p_args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) cli_stdout, cli_stderr = process.communicate(input=self.input) returncode = process.poll() if returncode: match = re.match( r'error code: ([-0-9]+)\nerror message:\n(.*)', cli_stderr) if match: code, message = match.groups() raise JSONRPCException(dict(code=int(code), message=message)) # Ignore cli_stdout, raise with cli_stderr raise subprocess.CalledProcessError( returncode, self.binary, output=cli_stderr) try: return json.loads(cli_stdout, parse_float=decimal.Decimal) except JSONDecodeError: return cli_stdout.rstrip("\n") diff --git a/test/functional/test_runner.py b/test/functional/test_runner.py index dea05efaa..fe24c2a1a 100755 --- a/test/functional/test_runner.py +++ b/test/functional/test_runner.py @@ -1,733 +1,726 @@ #!/usr/bin/env python3 # Copyright (c) 2014-2016 The Bitcoin Core developers # Copyright (c) 2017 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """Run regression test suite. This module calls down into individual test cases via subprocess. It will forward all unrecognized arguments onto the individual test scripts. Functional tests are disabled on Windows by default. Use --force to run them anyway. For a description of arguments recognized by test scripts, see `test/functional/test_framework/test_framework.py:BitcoinTestFramework.main`. """ import argparse import configparser import datetime import os import time import shutil import sys import subprocess import tempfile import re import logging import xml.etree.ElementTree as ET import json import threading import multiprocessing from queue import Queue, Empty # Formatting. Default colors to empty strings. BOLD, BLUE, RED, GREY = ("", ""), ("", ""), ("", ""), ("", "") try: # Make sure python thinks it can write unicode to its stdout "\u2713".encode("utf_8").decode(sys.stdout.encoding) TICK = "✓ " CROSS = "✖ " CIRCLE = "○ " except UnicodeDecodeError: TICK = "P " CROSS = "x " CIRCLE = "o " if os.name == 'posix': # primitive formatting on supported # terminal via ANSI escape sequences: BOLD = ('\033[0m', '\033[1m') BLUE = ('\033[0m', '\033[0;34m') RED = ('\033[0m', '\033[0;31m') GREY = ('\033[0m', '\033[1;30m') TEST_EXIT_PASSED = 0 TEST_EXIT_SKIPPED = 77 NON_SCRIPTS = [ # These are python files that live in the functional tests directory, but are not test scripts. "combine_logs.py", "create_cache.py", "test_runner.py", ] TEST_PARAMS = { # Some test can be run with additional parameters. # When a test is listed here, the it will be run without parameters # as well as with additional parameters listed here. # This: # example "testName" : [["--param1", "--param2"] , ["--param3"]] # will run the test 3 times: # testName # testName --param1 --param2 # testname --param3 "wallet_txn_doublespend.py": [["--mineblock"]], "wallet_txn_clone.py": [["--mineblock"]], "wallet_multiwallet.py": [["--usecli"]], } # Used to limit the number of tests, when list of tests is not provided on command line # When --extended is specified, we run all tests, otherwise # we only run a test if its execution time in seconds does not exceed EXTENDED_CUTOFF DEFAULT_EXTENDED_CUTOFF = 40 DEFAULT_JOBS = (multiprocessing.cpu_count() // 3) + 1 class TestCase(): """ Data structure to hold and run information necessary to launch a test case. """ def __init__(self, test_num, test_case, tests_dir, tmpdir, flags=None): self.tests_dir = tests_dir self.tmpdir = tmpdir self.test_case = test_case self.test_num = test_num self.flags = flags def run(self, portseed_offset): t = self.test_case portseed = self.test_num + portseed_offset portseed_arg = ["--portseed={}".format(portseed)] log_stdout = tempfile.SpooledTemporaryFile(max_size=2**16) log_stderr = tempfile.SpooledTemporaryFile(max_size=2**16) test_argv = t.split() tmpdir = [os.path.join("--tmpdir={}", "{}_{}").format( self.tmpdir, re.sub(".py$", "", test_argv[0]), portseed)] name = t time0 = time.time() process = subprocess.Popen([os.path.join(self.tests_dir, test_argv[0])] + test_argv[1:] + self.flags + portseed_arg + tmpdir, universal_newlines=True, stdout=log_stdout, stderr=log_stderr) process.wait() log_stdout.seek(0), log_stderr.seek(0) [stdout, stderr] = [l.read().decode('utf-8') for l in (log_stdout, log_stderr)] log_stdout.close(), log_stderr.close() if process.returncode == TEST_EXIT_PASSED and stderr == "": status = "Passed" elif process.returncode == TEST_EXIT_SKIPPED: status = "Skipped" else: status = "Failed" return TestResult(name, status, int(time.time() - time0), stdout, stderr) def on_ci(): return os.getenv('TRAVIS') == 'true' or os.getenv('TEAMCITY_VERSION') != None def main(): # Read config generated by configure. config = configparser.ConfigParser() configfile = os.path.join(os.path.abspath( os.path.dirname(__file__)), "..", "config.ini") config.read_file(open(configfile)) src_dir = config["environment"]["SRCDIR"] build_dir = config["environment"]["BUILDDIR"] tests_dir = os.path.join(src_dir, 'test', 'functional') # Parse arguments and pass through unrecognised args parser = argparse.ArgumentParser(add_help=False, usage='%(prog)s [test_runner.py options] [script options] [scripts]', description=__doc__, epilog=''' Help text and arguments for individual test script:''', formatter_class=argparse.RawTextHelpFormatter) parser.add_argument('--coverage', action='store_true', help='generate a basic coverage report for the RPC interface') parser.add_argument( '--exclude', '-x', help='specify a comma-separated-list of scripts to exclude. Do not include the .py extension in the name.') parser.add_argument('--extended', action='store_true', help='run the extended test suite in addition to the basic tests') parser.add_argument('--cutoff', type=int, default=DEFAULT_EXTENDED_CUTOFF, help='set the cutoff runtime for what tests get run') parser.add_argument('--force', '-f', action='store_true', help='run tests even on platforms where they are disabled by default (e.g. windows).') parser.add_argument('--help', '-h', '-?', action='store_true', help='print help text and exit') parser.add_argument('--jobs', '-j', type=int, default=DEFAULT_JOBS, help='how many test scripts to run in parallel. Default=4.') parser.add_argument('--keepcache', '-k', action='store_true', help='the default behavior is to flush the cache directory on startup. --keepcache retains the cache from the previous testrun.') parser.add_argument('--quiet', '-q', action='store_true', help='only print results summary and failure logs') parser.add_argument('--tmpdirprefix', '-t', default=tempfile.gettempdir(), help="Root directory for datadirs") parser.add_argument('--junitouput', '-ju', default=os.path.join(build_dir, 'junit_results.xml'), help="file that will store JUnit formatted test results.") args, unknown_args = parser.parse_known_args() # Create a set to store arguments and create the passon string tests = set(arg for arg in unknown_args if arg[:2] != "--") passon_args = [arg for arg in unknown_args if arg[:2] == "--"] passon_args.append("--configfile={}".format(configfile)) # Set up logging logging_level = logging.INFO if args.quiet else logging.DEBUG logging.basicConfig(format='%(message)s', level=logging_level) # Create base test directory tmpdir = os.path.join("{}", "bitcoin_test_runner_{:%Y%m%d_%H%M%S}").format( args.tmpdirprefix, datetime.datetime.now()) os.makedirs(tmpdir) logging.debug("Temporary test directory at {}".format(tmpdir)) enable_wallet = config["components"].getboolean("ENABLE_WALLET") enable_utils = config["components"].getboolean("ENABLE_UTILS") enable_bitcoind = config["components"].getboolean("ENABLE_BITCOIND") if config["environment"]["EXEEXT"] == ".exe" and not args.force: # https://github.com/bitcoin/bitcoin/commit/d52802551752140cf41f0d9a225a43e84404d3e9 # https://github.com/bitcoin/bitcoin/pull/5677#issuecomment-136646964 print( "Tests currently disabled on Windows by default. Use --force option to enable") sys.exit(0) if not (enable_wallet and enable_utils and enable_bitcoind): print( "No functional tests to run. Wallet, utils, and bitcoind must all be enabled") print( "Rerun `configure` with -enable-wallet, -with-utils and -with-daemon and rerun make") sys.exit(0) # Build list of tests all_scripts = get_all_scripts_from_disk(tests_dir, NON_SCRIPTS) # Check all tests with parameters actually exist for test in TEST_PARAMS: if not test in all_scripts: print("ERROR: Test with parameter {} does not exist, check it has " "not been renamed or deleted".format(test)) sys.exit(1) if tests: # Individual tests have been specified. Run specified tests that exist # in the all_scripts list. Accept the name with or without .py # extension. test_list = [t for t in all_scripts if (t in tests or re.sub(".py$", "", t) in tests)] # Allow for wildcard at the end of the name, so a single input can # match multiple tests for test in tests: if test.endswith('*'): test_list.extend( [t for t in all_scripts if t.startswith(test[:-1])]) # Make the list unique test_list = list(set(test_list)) # do not cut off explicitly specified tests cutoff = sys.maxsize else: # No individual tests have been specified. # Run all tests that do not exceed test_list = all_scripts cutoff = args.cutoff if args.extended: cutoff = sys.maxsize # Remove the test cases that the user has explicitly asked to exclude. if args.exclude: for exclude_test in args.exclude.split(','): if exclude_test + ".py" in test_list: test_list.remove(exclude_test + ".py") # Use and update timings from build_dir only if separate # build directory is used. We do not want to pollute source directory. build_timings = None if (src_dir != build_dir): build_timings = Timings(os.path.join(build_dir, 'timing.json')) # Always use timings from scr_dir if present src_timings = Timings(os.path.join( src_dir, "test", "functional", 'timing.json')) # Add test parameters and remove long running tests if needed test_list = get_tests_to_run( test_list, TEST_PARAMS, cutoff, src_timings, build_timings) if not test_list: print("No valid test scripts specified. Check that your test is in one " "of the test lists in test_runner.py, or run test_runner.py with no arguments to run all tests") sys.exit(0) if args.help: # Print help for test_runner.py, then print help of the first script # and exit. parser.print_help() subprocess.check_call( [os.path.join(tests_dir, test_list[0]), '-h']) sys.exit(0) if not args.keepcache: shutil.rmtree(os.path.join(build_dir, "test", "cache"), ignore_errors=True) run_tests(test_list, build_dir, tests_dir, args.junitouput, - config["environment"]["EXEEXT"], tmpdir, args.jobs, args.coverage, passon_args, build_timings) + tmpdir, args.jobs, args.coverage, passon_args, build_timings) -def run_tests(test_list, build_dir, tests_dir, junitouput, exeext, tmpdir, num_jobs, enable_coverage=False, args=[], build_timings=None): +def run_tests(test_list, build_dir, tests_dir, junitouput, tmpdir, num_jobs, enable_coverage=False, args=[], build_timings=None): # Warn if bitcoind is already running (unix only) try: pidofOutput = subprocess.check_output(["pidof", "bitcoind"]) if pidofOutput is not None and pidofOutput != b'': print("{}WARNING!{} There is already a bitcoind process running on this system. Tests may fail unexpectedly due to resource contention!".format( BOLD[1], BOLD[0])) except (OSError, subprocess.SubprocessError): pass # Warn if there is a cache directory cache_dir = os.path.join(build_dir, "test", "cache") if os.path.isdir(cache_dir): print("{}WARNING!{} There is a cache directory here: {}. If tests fail unexpectedly, try deleting the cache directory.".format( BOLD[1], BOLD[0], cache_dir)) - # Set env vars - if "BITCOIND" not in os.environ: - os.environ["BITCOIND"] = os.path.join( - build_dir, 'src', 'bitcoind' + exeext) - os.environ["BITCOINCLI"] = os.path.join( - build_dir, 'src', 'bitcoin-cli' + exeext) - flags = [os.path.join("--srcdir={}".format(build_dir), "src")] + args flags.append("--cachedir={}".format(cache_dir)) if enable_coverage: coverage = RPCCoverage() flags.append(coverage.flag) logging.debug( "Initializing coverage directory at {}".format(coverage.dir)) else: coverage = None if len(test_list) > 1 and num_jobs > 1: # Populate cache try: subprocess.check_output( [os.path.join(tests_dir, 'create_cache.py')] + flags + [os.path.join("--tmpdir={}", "cache") .format(tmpdir)]) except Exception as e: print(e.output) raise e # Run Tests time0 = time.time() test_results = execute_test_processes( num_jobs, test_list, tests_dir, tmpdir, flags) runtime = int(time.time() - time0) max_len_name = len(max(test_list, key=len)) print_results(test_results, max_len_name, runtime) save_results_as_junit(test_results, junitouput, runtime) if (build_timings is not None): build_timings.save_timings(test_results) if coverage: coverage.report_rpc_coverage() logging.debug("Cleaning up coverage data") coverage.cleanup() # Clear up the temp directory if all subdirectories are gone if not os.listdir(tmpdir): os.rmdir(tmpdir) all_passed = all( map(lambda test_result: test_result.was_successful, test_results)) sys.exit(not all_passed) def execute_test_processes(num_jobs, test_list, tests_dir, tmpdir, flags): update_queue = Queue() job_queue = Queue() test_results = [] poll_timeout = 10 # seconds # In case there is a graveyard of zombie bitcoinds, we can apply a # pseudorandom offset to hopefully jump over them. # (625 is PORT_RANGE/MAX_NODES) portseed_offset = int(time.time() * 1000) % 625 ## # Define some helper functions we will need for threading. ## def handle_message(message, running_jobs): """ handle_message handles a single message from handle_test_cases """ if isinstance(message, TestCase): running_jobs.add(message.test_case) print("{}{}{} started".format(BOLD[1], message.test_case, BOLD[0])) return if isinstance(message, TestResult): test_result = message running_jobs.remove(test_result.name) test_results.append(test_result) if test_result.status == "Passed": print("{}{}{} passed, Duration: {} s".format( BOLD[1], test_result.name, BOLD[0], test_result.time)) elif test_result.status == "Skipped": print("{}{}{} skipped".format( BOLD[1], test_result.name, BOLD[0])) else: print("{}{}{} failed, Duration: {} s\n".format( BOLD[1], test_result.name, BOLD[0], test_result.time)) print(BOLD[1] + 'stdout:' + BOLD[0]) print(test_result.stdout) print(BOLD[1] + 'stderr:' + BOLD[0]) print(test_result.stderr) return assert False, "we should not be here" def handle_update_messages(): """ handle_update_messages waits for messages to be sent from handle_test_cases via the update_queue. It serializes the results so we can print nice status update messages. """ printed_status = False running_jobs = set() while True: message = None try: message = update_queue.get(True, poll_timeout) if message is None: break # We printed a status message, need to kick to the next line # before printing more. if printed_status: print() printed_status = False handle_message(message, running_jobs) update_queue.task_done() except Empty as e: if not on_ci(): print("Running jobs: {}".format(", ".join(running_jobs)), end="\r") sys.stdout.flush() printed_status = True def handle_test_cases(): """ job_runner represents a single thread that is part of a worker pool. It waits for a test, then executes that test. It also reports start and result messages to handle_update_messages """ while True: test = job_queue.get() if test is None: break # Signal that the test is starting to inform the poor waiting # programmer update_queue.put(test) result = test.run(portseed_offset) update_queue.put(result) job_queue.task_done() ## # Setup our threads, and start sending tasks ## # Start our result collection thread. t = threading.Thread(target=handle_update_messages) t.setDaemon(True) t.start() # Start some worker threads for j in range(num_jobs): t = threading.Thread(target=handle_test_cases) t.setDaemon(True) t.start() # Push all our test cases into the job queue. for i, t in enumerate(test_list): job_queue.put(TestCase(i, t, tests_dir, tmpdir, flags)) # Wait for all the jobs to be completed job_queue.join() # Wait for all the results to be compiled update_queue.join() # Flush our queues so the threads exit update_queue.put(None) for j in range(num_jobs): job_queue.put(None) return test_results def print_results(test_results, max_len_name, runtime): results = "\n" + BOLD[1] + "{} | {} | {}\n\n".format( "TEST".ljust(max_len_name), "STATUS ", "DURATION") + BOLD[0] test_results.sort(key=lambda result: result.name.lower()) all_passed = True time_sum = 0 for test_result in test_results: all_passed = all_passed and test_result.was_successful time_sum += test_result.time test_result.padding = max_len_name results += str(test_result) status = TICK + "Passed" if all_passed else CROSS + "Failed" results += BOLD[1] + "\n{} | {} | {} s (accumulated) \n".format( "ALL".ljust(max_len_name), status.ljust(9), time_sum) + BOLD[0] results += "Runtime: {} s\n".format(runtime) print(results) class TestResult(): """ Simple data structure to store test result values and print them properly """ def __init__(self, name, status, time, stdout, stderr): self.name = name self.status = status self.time = time self.padding = 0 self.stdout = stdout self.stderr = stderr def __repr__(self): if self.status == "Passed": color = BLUE glyph = TICK elif self.status == "Failed": color = RED glyph = CROSS elif self.status == "Skipped": color = GREY glyph = CIRCLE return color[1] + "{} | {}{} | {} s\n".format( self.name.ljust(self.padding), glyph, self.status.ljust(7), self.time) + color[0] @property def was_successful(self): return self.status != "Failed" def get_all_scripts_from_disk(test_dir, non_scripts): """ Return all available test script from script directory (excluding NON_SCRIPTS) """ python_files = set([t for t in os.listdir(test_dir) if t[-3:] == ".py"]) return list(python_files - set(non_scripts)) def get_tests_to_run(test_list, test_params, cutoff, src_timings, build_timings=None): """ Returns only test that will not run longer that cutoff. Long running tests are returned first to favor running tests in parallel Timings from build directory override those from src directory """ def get_test_time(test): if build_timings is not None: timing = next( (x['time'] for x in build_timings.existing_timings if x['name'] == test), None) if timing is not None: return timing # try source directory. Return 0 if test is unknown to always run it return next( (x['time'] for x in src_timings.existing_timings if x['name'] == test), 0) # Some tests must also be run with additional parameters. Add them to the list. tests_with_params = [] for test_name in test_list: # always execute a test without parameters tests_with_params.append(test_name) params = test_params.get(test_name) if params is not None: tests_with_params.extend( [test_name + " " + " ".join(p) for p in params]) result = [t for t in tests_with_params if get_test_time(t) <= cutoff] result.sort(key=lambda x: (-get_test_time(x), x)) return result class RPCCoverage(): """ Coverage reporting utilities for test_runner. Coverage calculation works by having each test script subprocess write coverage files into a particular directory. These files contain the RPC commands invoked during testing, as well as a complete listing of RPC commands per `bitcoin-cli help` (`rpc_interface.txt`). After all tests complete, the commands run are combined and diff'd against the complete list to calculate uncovered RPC commands. See also: test/functional/test_framework/coverage.py """ def __init__(self): self.dir = tempfile.mkdtemp(prefix="coverage") self.flag = '--coveragedir={}'.format(self.dir) def report_rpc_coverage(self): """ Print out RPC commands that were unexercised by tests. """ uncovered = self._get_uncovered_rpc_commands() if uncovered: print("Uncovered RPC commands:") print("".join((" - {}\n".format(i)) for i in sorted(uncovered))) else: print("All RPC commands covered.") def cleanup(self): return shutil.rmtree(self.dir) def _get_uncovered_rpc_commands(self): """ Return a set of currently untested RPC commands. """ # This is shared from `test/functional/test-framework/coverage.py` reference_filename = 'rpc_interface.txt' coverage_file_prefix = 'coverage.' coverage_ref_filename = os.path.join(self.dir, reference_filename) coverage_filenames = set() all_cmds = set() covered_cmds = set() if not os.path.isfile(coverage_ref_filename): raise RuntimeError("No coverage reference found") with open(coverage_ref_filename, 'r') as f: all_cmds.update([i.strip() for i in f.readlines()]) for root, dirs, files in os.walk(self.dir): for filename in files: if filename.startswith(coverage_file_prefix): coverage_filenames.add(os.path.join(root, filename)) for filename in coverage_filenames: with open(filename, 'r') as f: covered_cmds.update([i.strip() for i in f.readlines()]) return all_cmds - covered_cmds def save_results_as_junit(test_results, file_name, time): """ Save tests results to file in JUnit format See http://llg.cubic.org/docs/junit/ for specification of format """ e_test_suite = ET.Element("testsuite", {"name": "bitcoin_abc_tests", "tests": str(len(test_results)), # "errors": "failures": str(len([t for t in test_results if t.status == "Failed"])), "id": "0", "skipped": str(len([t for t in test_results if t.status == "Skipped"])), "time": str(time), "timestamp": datetime.datetime.now().isoformat('T') }) for test_result in test_results: e_test_case = ET.SubElement(e_test_suite, "testcase", {"name": test_result.name, "classname": test_result.name, "time": str(test_result.time) } ) if test_result.status == "Skipped": ET.SubElement(e_test_case, "skipped") elif test_result.status == "Failed": ET.SubElement(e_test_case, "failure") # no special element for passed tests ET.SubElement(e_test_case, "system-out").text = test_result.stdout ET.SubElement(e_test_case, "system-err").text = test_result.stderr ET.ElementTree(e_test_suite).write( file_name, "UTF-8", xml_declaration=True) class Timings(): """ Takes care of loading, merging and saving tests execution times. """ def __init__(self, timing_file): self.timing_file = timing_file self.existing_timings = self.load_timings() def load_timings(self): if os.path.isfile(self.timing_file): with open(self.timing_file) as f: return json.load(f) else: return [] def get_merged_timings(self, new_timings): """ Return new list containing existing timings updated with new timings Tests that do not exists are not removed """ key = 'name' merged = {} for item in self.existing_timings + new_timings: if item[key] in merged: merged[item[key]].update(item) else: merged[item[key]] = item # Sort the result to preserve test ordering in file merged = list(merged.values()) merged.sort(key=lambda t, key=key: t[key]) return merged def save_timings(self, test_results): # we only save test that have passed - timings for failed test might be # wrong (timeouts or early fails) passed_results = [t for t in test_results if t.status == 'Passed'] new_timings = list(map(lambda t: {'name': t.name, 'time': t.time}, passed_results)) merged_timings = self.get_merged_timings(new_timings) with open(self.timing_file, 'w') as f: json.dump(merged_timings, f, indent=True) if __name__ == '__main__': main()