diff --git a/test/functional/test_runner.py b/test/functional/test_runner.py
--- a/test/functional/test_runner.py
+++ b/test/functional/test_runner.py
@@ -29,6 +29,8 @@
 import logging
 import xml.etree.ElementTree as ET
 import json
+import threading
+from queue import Queue, Empty
 
 # Formatting. Default colors to empty strings.
 BOLD, BLUE, RED, GREY = ("", ""), ("", ""), ("", ""), ("", "")
@@ -81,6 +83,76 @@
 EXTENDED_CUTOFF = 40
 
 
+class TestResult():
+    """
+    Simple data structure to store test result values and print them properly.
+    """
+
+    def __init__(self, name, status, time, stdout, stderr):
+        self.name = name
+        self.status = status
+        self.time = time
+        self.padding = 0
+        self.stdout = stdout
+        self.stderr = stderr
+
+    def __repr__(self):
+        if self.status == "Passed":
+            color = BLUE
+            glyph = TICK
+        elif self.status == "Failed":
+            color = RED
+            glyph = CROSS
+        elif self.status == "Skipped":
+            color = GREY
+            glyph = CIRCLE
+
+        return color[1] + "%s | %s%s | %s s\n" % (self.name.ljust(self.padding), glyph, self.status.ljust(7), self.time) + color[0]
+
+
+class TestCase():
+    """
+    Data structure holding the information necessary to launch and run a
+    single test case.
+    """
+
+    def __init__(self, test_num, test_case, tests_dir, tmpdir, flags=None):
+        self.tests_dir = tests_dir
+        self.tmpdir = tmpdir
+        self.test_case = test_case
+        self.test_num = test_num
+        self.flags = flags
+
+    def run(self, portseed_offset):
+        t = self.test_case
+        portseed = self.test_num + portseed_offset
+        portseed_arg = ["--portseed={}".format(portseed)]
+        log_stdout = tempfile.SpooledTemporaryFile(max_size=2**16)
+        log_stderr = tempfile.SpooledTemporaryFile(max_size=2**16)
+        test_argv = t.split()
+        tmpdir = [os.path.join("--tmpdir=%s", "%s_%s") %
+                  (self.tmpdir, re.sub(".py$", "", t), portseed)]
+        name = t
+        time0 = time.time()
+        process = subprocess.Popen([os.path.join(self.tests_dir, test_argv[0])] + test_argv[1:] + self.flags + portseed_arg + tmpdir,
+                                   universal_newlines=True,
+                                   stdout=log_stdout,
+                                   stderr=log_stderr)
+
+        process.wait()
+        log_stdout.seek(0), log_stderr.seek(0)
+        [stdout, stderr] = [l.read().decode('utf-8')
+                            for l in (log_stdout, log_stderr)]
+        log_stdout.close(), log_stderr.close()
+        if process.returncode == TEST_EXIT_PASSED and stderr == "":
+            status = "Passed"
+        elif process.returncode == TEST_EXIT_SKIPPED:
+            status = "Skipped"
+        else:
+            status = "Failed"
+
+        return TestResult(name, status, int(time.time() - time0), stdout, stderr)
+
+
 def on_ci():
     return os.getenv('TRAVIS') == 'true' or os.getenv('TEAMCITY_VERSION') != None
 
@@ -220,7 +292,7 @@
         config["environment"]["EXEEXT"], tmpdir, args.jobs, args.coverage, passon_args, build_timings)
 
 
-def run_tests(test_list, build_dir, tests_dir, junitouput, exeext, tmpdir, jobs=1, enable_coverage=False, args=[], build_timings=None):
+def run_tests(test_list, build_dir, tests_dir, junitouput, exeext, tmpdir, num_jobs=1, enable_coverage=False, args=[], build_timings=None):
     # Warn if bitcoind is already running (unix only)
     try:
         pidofOutput = subprocess.check_output(["pidof", "bitcoind"])
@@ -253,35 +325,18 @@
     else:
         coverage = None
 
-    if len(test_list) > 1 and jobs > 1:
+    if len(test_list) > 1 and num_jobs > 1:
         # Populate cache
         subprocess.check_output(
             [os.path.join(tests_dir, 'create_cache.py')] + flags + [os.path.join("--tmpdir=%s", "cache") % tmpdir])
 
     # Run Tests
-    job_queue = TestHandler(jobs, tests_dir, tmpdir, test_list, flags)
     time0 = time.time()
-    test_results = []
+    test_results = execute_test_processes(
+        num_jobs, test_list, tests_dir, tmpdir, flags)
+    runtime = int(time.time() - time0)
 
     max_len_name = len(max(test_list, key=len))
-
-    for _ in range(len(test_list)):
-        test_result = job_queue.get_next()
-        test_results.append(test_result)
-
-        if test_result.status == "Passed":
-            logging.debug("\n%s%s%s passed, Duration: %s s" % (
-                BOLD[1], test_result.name, BOLD[0], test_result.time))
-        elif test_result.status == "Skipped":
-            logging.debug("\n%s%s%s skipped" %
-                          (BOLD[1], test_result.name, BOLD[0]))
-        else:
-            print("\n%s%s%s failed, Duration: %s s\n" %
-                  (BOLD[1], test_result.name, BOLD[0], test_result.time))
-            print(BOLD[1] + 'stdout:\n' + BOLD[0] + test_result.stdout + '\n')
-            print(BOLD[1] + 'stderr:\n' + BOLD[0] + test_result.stderr + '\n')
-
-    runtime = int(time.time() - time0)
     print_results(test_results, max_len_name, runtime)
 
     save_results_as_junit(test_results, junitouput, runtime)
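The new TestCase is deliberately self-contained: each instance carries the script name, the caller's flags, the relevant directories, and a unique portseed, so any worker thread can run any test without sharing state. A minimal standalone sketch of the same launch sequence follows; the helper name run_one and every path in it are illustrative, not part of this diff:

    import os
    import re
    import subprocess
    import tempfile
    import time

    def run_one(tests_dir, tmpdir, test_case, portseed, flags=None):
        # Build the same argv shape the diff builds: script path, caller
        # flags, then a unique --portseed and --tmpdir so parallel runs
        # cannot collide.
        argv = test_case.split()
        cmd = ([os.path.join(tests_dir, argv[0])] + argv[1:] + (flags or [])
               + ["--portseed={}".format(portseed),
                  "--tmpdir={}/{}_{}".format(
                      tmpdir, re.sub(r"\.py$", "", test_case), portseed)])
        # Spooled files stay in memory until 64 KiB, then spill to disk.
        log_out = tempfile.SpooledTemporaryFile(max_size=2**16)
        log_err = tempfile.SpooledTemporaryFile(max_size=2**16)
        start = time.time()
        proc = subprocess.Popen(cmd, stdout=log_out, stderr=log_err)
        proc.wait()
        log_out.seek(0)
        log_err.seek(0)
        stdout = log_out.read().decode('utf-8')
        stderr = log_err.read().decode('utf-8')
        log_out.close()
        log_err.close()
        return proc.returncode, stdout, stderr, int(time.time() - start)

    # Hypothetical usage:
    # code, out, err, secs = run_one("/path/to/tests", "/tmp/run", "example_test.py", 3000)

Because the portseed also suffixes the per-test --tmpdir, two tests running concurrently cannot collide on either ports or data directories.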
@@ -304,6 +359,131 @@
     sys.exit(not all_passed)
 
 
+def execute_test_processes(num_jobs, test_list, tests_dir, tmpdir, flags):
+    update_queue = Queue()
+    job_queue = Queue()
+    test_results = []
+    poll_timeout = 10  # seconds
+    # In case there is a graveyard of zombie bitcoinds, we can apply a
+    # pseudorandom offset to hopefully jump over them.
+    # (625 is PORT_RANGE/MAX_NODES)
+    portseed_offset = int(time.time() * 1000) % 625
+
+    ##
+    # Define some helper functions we will need for threading.
+    ##
+
+    def handle_message(message, running_jobs):
+        """
+        handle_message handles a single message from handle_test_cases.
+        """
+        if isinstance(message, TestCase):
+            running_jobs.add(message.test_case)
+            print("{}{}{} started".format(BOLD[1], message.test_case, BOLD[0]))
+            return
+
+        if isinstance(message, TestResult):
+            test_result = message
+
+            running_jobs.remove(test_result.name)
+            test_results.append(test_result)
+
+            if test_result.status == "Passed":
+                print("%s%s%s passed, Duration: %s s" % (
+                    BOLD[1], test_result.name, BOLD[0], test_result.time))
+            elif test_result.status == "Skipped":
+                print("%s%s%s skipped" %
+                      (BOLD[1], test_result.name, BOLD[0]))
+            else:
+                print("%s%s%s failed, Duration: %s s\n" %
+                      (BOLD[1], test_result.name, BOLD[0], test_result.time))
+                print(BOLD[1] + 'stdout:' + BOLD[0])
+                print(test_result.stdout)
+                print(BOLD[1] + 'stderr:' + BOLD[0])
+                print(test_result.stderr)
+            return
+
+        assert False, "we should not be here"
+
+    def handle_update_messages():
+        """
+        handle_update_messages waits for messages to be sent from handle_test_cases via the
+        update_queue. It serializes the results so we can print nice status update messages.
+        """
+        printed_status = False
+        running_jobs = set()
+
+        while True:
+            message = None
+            try:
+                message = update_queue.get(True, poll_timeout)
+                if message is None:
+                    break
+
+                # We printed a status message, need to kick to the next line
+                # before printing more.
+                if printed_status:
+                    print()
+                    printed_status = False
+
+                handle_message(message, running_jobs)
+                update_queue.task_done()
+            except Empty:
+                if not on_ci():
+                    print("Running jobs: {}".format(
+                        ", ".join(running_jobs)), end="\r")
+                    sys.stdout.flush()
+                    printed_status = True
+
+    def handle_test_cases():
+        """
+        handle_test_cases is run by each thread in the worker pool.
+        It waits for a test, then executes that test.
+        It also reports start and result messages to handle_update_messages.
+        """
+        while True:
+            test = job_queue.get()
+            if test is None:
+                break
+            # Signal that the test is starting to inform the poor waiting
+            # programmer.
+            update_queue.put(test)
+            result = test.run(portseed_offset)
+            update_queue.put(result)
+            job_queue.task_done()
+
+    ##
+    # Set up our threads, and start sending tasks.
+    ##
+
+    # Start our result collection thread.
+    t = threading.Thread(target=handle_update_messages)
+    t.setDaemon(True)
+    t.start()
+
+    # Start some worker threads.
+    for j in range(num_jobs):
+        t = threading.Thread(target=handle_test_cases)
+        t.setDaemon(True)
+        t.start()
+
+    # Push all our test cases into the job queue.
+    for i, t in enumerate(test_list):
+        job_queue.put(TestCase(i, t, tests_dir, tmpdir, flags))
+
+    # Wait for all the jobs to be completed.
+    job_queue.join()
+
+    # Wait for all the results to be compiled.
+    update_queue.join()
+
+    # Flush our queues so the threads exit.
+    update_queue.put(None)
+    for j in range(num_jobs):
+        job_queue.put(None)
+
+    return test_results
+
+
 def print_results(test_results, max_len_name, runtime):
     results = "\n" + BOLD[1] + "%s | %s | %s\n\n" % (
         "TEST".ljust(max_len_name), "STATUS ", "DURATION") + BOLD[0]
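All console output now funnels through the single handle_update_messages thread; workers only enqueue messages, so output from concurrent tests can no longer interleave mid-line. The blocking get with poll_timeout is also what drives the transient "Running jobs:" line: a timeout means no news arrived, so the thread redraws the status with a carriage return instead of committing a newline. A minimal sketch of that consumer pattern, with illustrative names and a shortened 1-second timeout:

    import threading
    import time
    from queue import Queue, Empty

    updates = Queue()

    def printer():
        # Single consumer: only this thread ever writes to stdout.
        while True:
            try:
                msg = updates.get(True, 1)  # block for up to 1 second
            except Empty:
                # No message arrived: redraw the status line in place.
                print("still running...", end="\r")
                continue
            if msg is None:  # sentinel: shut down
                break
            print(msg)

    t = threading.Thread(target=printer, daemon=True)
    t.start()
    for i in range(3):
        updates.put("job {} done".format(i))
        time.sleep(0.2)
    updates.put(None)
    t.join()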
@@ -325,98 +505,6 @@
     print(results)
 
 
-class TestHandler:
-
-    """
-    Trigger the testscrips passed in via the list.
-    """
-
-    def __init__(self, num_tests_parallel, tests_dir, tmpdir, test_list=None, flags=None):
-        assert(num_tests_parallel >= 1)
-        self.num_jobs = num_tests_parallel
-        self.tests_dir = tests_dir
-        self.tmpdir = tmpdir
-        self.test_list = test_list
-        self.flags = flags
-        self.num_running = 0
-        # In case there is a graveyard of zombie bitcoinds, we can apply a
-        # pseudorandom offset to hopefully jump over them.
-        # (625 is PORT_RANGE/MAX_NODES)
-        self.portseed_offset = int(time.time() * 1000) % 625
-        self.jobs = []
-
-    def get_next(self):
-        while self.num_running < self.num_jobs and self.test_list:
-            # Add tests
-            self.num_running += 1
-            t = self.test_list.pop(0)
-            portseed = len(self.test_list) + self.portseed_offset
-            portseed_arg = ["--portseed={}".format(portseed)]
-            log_stdout = tempfile.SpooledTemporaryFile(max_size=2**16)
-            log_stderr = tempfile.SpooledTemporaryFile(max_size=2**16)
-            test_argv = t.split()
-            tmpdir = [os.path.join("--tmpdir=%s", "%s_%s") %
-                      (self.tmpdir, re.sub(".py$", "", t), portseed)]
-            self.jobs.append((t,
-                              time.time(),
-                              subprocess.Popen([os.path.join(self.tests_dir, test_argv[0])] + test_argv[1:] + self.flags + portseed_arg + tmpdir,
-                                               universal_newlines=True,
-                                               stdout=log_stdout,
-                                               stderr=log_stderr),
-                              log_stdout,
-                              log_stderr))
-        if not self.jobs:
-            raise IndexError('pop from empty list')
-        while True:
-            # Return first proc that finishes
-            time.sleep(.5)
-            for j in self.jobs:
-                (name, time0, proc, log_out, log_err) = j
-                if on_ci() and int(time.time() - time0) > 20 * 60:
-                    # In travis, timeout individual tests after 20 minutes (to stop tests hanging and not
-                    # providing useful output.
-                    proc.send_signal(signal.SIGINT)
-                if proc.poll() is not None:
-                    log_out.seek(0), log_err.seek(0)
-                    [stdout, stderr] = [l.read().decode('utf-8')
-                                        for l in (log_out, log_err)]
-                    log_out.close(), log_err.close()
-                    if proc.returncode == TEST_EXIT_PASSED and stderr == "":
-                        status = "Passed"
-                    elif proc.returncode == TEST_EXIT_SKIPPED:
-                        status = "Skipped"
-                    else:
-                        status = "Failed"
-                    self.num_running -= 1
-                    self.jobs.remove(j)
-
-                    return TestResult(name, status, int(time.time() - time0), stdout, stderr)
-            print('.', end='', flush=True)
-
-
-class TestResult():
-    def __init__(self, name, status, time, stdout, stderr):
-        self.name = name
-        self.status = status
-        self.time = time
-        self.padding = 0
-        self.stdout = stdout
-        self.stderr = stderr
-
-    def __repr__(self):
-        if self.status == "Passed":
-            color = BLUE
-            glyph = TICK
-        elif self.status == "Failed":
-            color = RED
-            glyph = CROSS
-        elif self.status == "Skipped":
-            color = GREY
-            glyph = CIRCLE
-
-        return color[1] + "%s | %s%s | %s s\n" % (self.name.ljust(self.padding), glyph, self.status.ljust(7), self.time) + color[0]
-
-
 def get_all_scripts_from_disk(test_dir, non_scripts):
     """
     Return all available test script from script directory (excluding NON_SCRIPTS)
     """
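The deleted TestHandler discovered finished tests by polling: it slept half a second, scanned every Popen with poll(), and printed dots while it waited. The execute_test_processes replacement blocks on queues instead, and it shuts the pool down with None sentinels in a specific order: job_queue.join() returns only once every enqueued TestCase has been matched by a task_done() call, and only then are sentinels pushed, because a worker that receives None breaks out without calling task_done(), so pushing sentinels earlier would leave join() waiting on items that are never marked done. A reduced sketch of that lifecycle, assuming a trivial work item:

    import threading
    from queue import Queue

    jobs = Queue()

    def worker():
        while True:
            item = jobs.get()
            if item is None:   # sentinel: exit without task_done()
                break
            # ... real work would happen here ...
            jobs.task_done()   # pairs with jobs.join() below

    workers = [threading.Thread(target=worker, daemon=True) for _ in range(4)]
    for w in workers:
        w.start()
    for item in range(10):
        jobs.put(item)
    jobs.join()                # returns once all 10 items are task_done()
    for _ in workers:
        jobs.put(None)         # now release each worker so it can exit
    for w in workers:
        w.join()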
@@ -535,7 +623,7 @@
     e_test_suite = ET.Element("testsuite",
                               {"name": "bitcoin_abc_tests",
                                "tests": str(len(test_results)),
-                               #"errors":
+                               # "errors":
                                "failures": str(len([t for t in test_results if t.status == "Failed"])),
                                "id": "0",
                                "skipped": str(len([t for t in test_results if t.status == "Skipped"])),
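The final hunk only fixes the spacing of a commented-out attribute, but for context, this block builds the JUnit testsuite header that CI consumes. A sketch of the XML shape implied by the attributes shown here; the per-test testcase children are assembled elsewhere in save_results_as_junit and are not part of this diff, and the sample names are invented:

    import xml.etree.ElementTree as ET

    # Stand-ins for real TestResult objects: (name, status) pairs.
    results = [("example_a.py", "Passed"), ("example_b.py", "Skipped")]

    suite = ET.Element("testsuite", {
        "name": "bitcoin_abc_tests",
        "tests": str(len(results)),
        "failures": str(sum(1 for _, s in results if s == "Failed")),
        "id": "0",
        "skipped": str(sum(1 for _, s in results if s == "Skipped")),
    })
    for name, status in results:
        ET.SubElement(suite, "testcase", {"name": name})
    print(ET.tostring(suite).decode())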