diff --git a/test/functional/getblocktemplate_proposals.py b/test/functional/getblocktemplate_proposals.py deleted file mode 100755 index 6dc758bf1..000000000 --- a/test/functional/getblocktemplate_proposals.py +++ /dev/null @@ -1,178 +0,0 @@ -#!/usr/bin/env python3 -# Copyright (c) 2014-2016 The Bitcoin Core developers -# Distributed under the MIT software license, see the accompanying -# file COPYING or http://www.opensource.org/licenses/mit-license.php. - -from test_framework.test_framework import BitcoinTestFramework -from test_framework.util import * - -from binascii import a2b_hex, b2a_hex -from hashlib import sha256 -from struct import pack - - -def b2x(b): - return b2a_hex(b).decode('ascii') - -# NOTE: This does not work for signed numbers (set the high bit) or zero -# (use b'\0') - - -def encodeUNum(n): - s = bytearray(b'\1') - while n > 127: - s[0] += 1 - s.append(n % 256) - n //= 256 - s.append(n) - return bytes(s) - - -def varlenEncode(n): - if n < 0xfd: - return pack(' 1: - n = [] - if len(cur) & 1: - cur.append(cur[-1]) - for i in range(0, len(cur), 2): - n.append(dblsha(cur[i] + cur[i + 1])) - cur = n - return cur[0] - - -def template_to_bytearray(tmpl, txlist): - blkver = pack(' 1 and jobs > 1: # Populate cache subprocess.check_output( [tests_dir + 'create_cache.py'] + flags + ["--tmpdir=%s/cache" % tmpdir]) # Run Tests job_queue = TestHandler(jobs, tests_dir, tmpdir, test_list, flags) time0 = time.time() test_results = [] max_len_name = len(max(test_list, key=len)) for _ in range(len(test_list)): test_result, stdout, stderr = job_queue.get_next() test_results.append(test_result) if test_result.status == "Passed": logging.debug("\n%s%s%s passed, Duration: %s s" % ( BOLD[1], test_result.name, BOLD[0], test_result.time)) elif test_result.status == "Skipped": logging.debug("\n%s%s%s skipped" % (BOLD[1], test_result.name, BOLD[0])) else: print("\n%s%s%s failed, Duration: %s s\n" % (BOLD[1], test_result.name, BOLD[0], test_result.time)) print(BOLD[1] + 'stdout:\n' + BOLD[0] + stdout + '\n') print(BOLD[1] + 'stderr:\n' + BOLD[0] + stderr + '\n') print_results(test_results, max_len_name, (int(time.time() - time0))) if coverage: coverage.report_rpc_coverage() logging.debug("Cleaning up coverage data") coverage.cleanup() # Clear up the temp directory if all subdirectories are gone if not os.listdir(tmpdir): os.rmdir(tmpdir) all_passed = all( map(lambda test_result: test_result.status == "Passed", test_results)) sys.exit(not all_passed) def print_results(test_results, max_len_name, runtime): results = "\n" + BOLD[1] + "%s | %s | %s\n\n" % ( "TEST".ljust(max_len_name), "STATUS ", "DURATION") + BOLD[0] test_results.sort(key=lambda result: result.name.lower()) all_passed = True time_sum = 0 for test_result in test_results: all_passed = all_passed and test_result.status != "Failed" time_sum += test_result.time test_result.padding = max_len_name results += str(test_result) status = TICK + "Passed" if all_passed else CROSS + "Failed" results += BOLD[1] + "\n%s | %s | %s s (accumulated) \n" % ( "ALL".ljust(max_len_name), status.ljust(9), time_sum) + BOLD[0] results += "Runtime: %s s\n" % (runtime) print(results) class TestHandler: """ Trigger the testscrips passed in via the list. """ def __init__(self, num_tests_parallel, tests_dir, tmpdir, test_list=None, flags=None): assert(num_tests_parallel >= 1) self.num_jobs = num_tests_parallel self.tests_dir = tests_dir self.tmpdir = tmpdir self.test_list = test_list self.flags = flags self.num_running = 0 # In case there is a graveyard of zombie bitcoinds, we can apply a # pseudorandom offset to hopefully jump over them. # (625 is PORT_RANGE/MAX_NODES) self.portseed_offset = int(time.time() * 1000) % 625 self.jobs = [] def get_next(self): while self.num_running < self.num_jobs and self.test_list: # Add tests self.num_running += 1 t = self.test_list.pop(0) portseed = len(self.test_list) + self.portseed_offset portseed_arg = ["--portseed={}".format(portseed)] log_stdout = tempfile.SpooledTemporaryFile(max_size=2**16) log_stderr = tempfile.SpooledTemporaryFile(max_size=2**16) test_argv = t.split() tmpdir = ["--tmpdir=%s/%s_%s" % (self.tmpdir, re.sub(".py$", "", test_argv[0]), portseed)] self.jobs.append((t, time.time(), subprocess.Popen([self.tests_dir + test_argv[0]] + test_argv[1:] + self.flags + portseed_arg + tmpdir, universal_newlines=True, stdout=log_stdout, stderr=log_stderr), log_stdout, log_stderr)) if not self.jobs: raise IndexError('pop from empty list') while True: # Return first proc that finishes time.sleep(.5) for j in self.jobs: (name, time0, proc, log_out, log_err) = j if os.getenv('TRAVIS') == 'true' and int(time.time() - time0) > 20 * 60: # In travis, timeout individual tests after 20 minutes (to stop tests hanging and not # providing useful output. proc.send_signal(signal.SIGINT) if proc.poll() is not None: log_out.seek(0), log_err.seek(0) [stdout, stderr] = [l.read().decode('utf-8') for l in (log_out, log_err)] log_out.close(), log_err.close() if proc.returncode == TEST_EXIT_PASSED and stderr == "": status = "Passed" elif proc.returncode == TEST_EXIT_SKIPPED: status = "Skipped" else: status = "Failed" self.num_running -= 1 self.jobs.remove(j) return TestResult(name, status, int(time.time() - time0)), stdout, stderr print('.', end='', flush=True) class TestResult(): def __init__(self, name, status, time): self.name = name self.status = status self.time = time self.padding = 0 def __repr__(self): if self.status == "Passed": color = BLUE glyph = TICK elif self.status == "Failed": color = RED glyph = CROSS elif self.status == "Skipped": color = GREY glyph = CIRCLE return color[1] + "%s | %s%s | %s s\n" % (self.name.ljust(self.padding), glyph, self.status.ljust(7), self.time) + color[0] def check_script_list(src_dir): """Check scripts directory. Check that there are no scripts in the functional tests directory which are not being run by pull-tester.py.""" script_dir = src_dir + '/test/functional/' python_files = set([t for t in os.listdir(script_dir) if t[-3:] == ".py"]) missed_tests = list( python_files - set(map(lambda x: x.split()[0], ALL_SCRIPTS + NON_SCRIPTS))) if len(missed_tests) != 0: print("%sWARNING!%s The following scripts are not being run: %s. Check the test lists in test_runner.py." % ( BOLD[1], BOLD[0], str(missed_tests))) if os.getenv('TRAVIS') == 'true': # On travis this warning is an error to prevent merging incomplete commits into master sys.exit(1) class RPCCoverage(): """ Coverage reporting utilities for test_runner. Coverage calculation works by having each test script subprocess write coverage files into a particular directory. These files contain the RPC commands invoked during testing, as well as a complete listing of RPC commands per `bitcoin-cli help` (`rpc_interface.txt`). After all tests complete, the commands run are combined and diff'd against the complete list to calculate uncovered RPC commands. See also: test/functional/test_framework/coverage.py """ def __init__(self): self.dir = tempfile.mkdtemp(prefix="coverage") self.flag = '--coveragedir={}'.format(self.dir) def report_rpc_coverage(self): """ Print out RPC commands that were unexercised by tests. """ uncovered = self._get_uncovered_rpc_commands() if uncovered: print("Uncovered RPC commands:") print("".join((" - {}\n".format(i)) for i in sorted(uncovered))) else: print("All RPC commands covered.") def cleanup(self): return shutil.rmtree(self.dir) def _get_uncovered_rpc_commands(self): """ Return a set of currently untested RPC commands. """ # This is shared from `test/functional/test-framework/coverage.py` reference_filename = 'rpc_interface.txt' coverage_file_prefix = 'coverage.' coverage_ref_filename = os.path.join(self.dir, reference_filename) coverage_filenames = set() all_cmds = set() covered_cmds = set() if not os.path.isfile(coverage_ref_filename): raise RuntimeError("No coverage reference found") with open(coverage_ref_filename, 'r') as f: all_cmds.update([i.strip() for i in f.readlines()]) for root, dirs, files in os.walk(self.dir): for filename in files: if filename.startswith(coverage_file_prefix): coverage_filenames.add(os.path.join(root, filename)) for filename in coverage_filenames: with open(filename, 'r') as f: covered_cmds.update([i.strip() for i in f.readlines()]) return all_cmds - covered_cmds if __name__ == '__main__': main()