diff --git a/test/functional/test_runner.py b/test/functional/test_runner.py
--- a/test/functional/test_runner.py
+++ b/test/functional/test_runner.py
@@ -31,7 +31,7 @@
 import json
 import threading
 import multiprocessing
-from queue import Queue, Empty
+from queue import Empty
 
 # Formatting. Default colors to empty strings.
 BOLD, BLUE, RED, GREY = ("", ""), ("", ""), ("", ""), ("", "")
@@ -120,4 +120,5 @@
         log_stdout.close(), log_stderr.close()
+
         if process.returncode == TEST_EXIT_PASSED and stderr == "":
             status = "Passed"
         elif process.returncode == TEST_EXIT_SKIPPED:
@@ -340,111 +341,118 @@
     sys.exit(not all_passed)
 
 
+##
+# Define some helper functions we will need for threading.
+##
-def execute_test_processes(num_jobs, test_list, tests_dir, tmpdir, flags):
-    update_queue = Queue()
-    job_queue = Queue()
-    test_results = []
+
+
+def handle_message(message, running_jobs, results_queue):
+    """
+    handle_message handles a single message from handle_test_cases
+    """
+    if isinstance(message, TestCase):
+        running_jobs.add(message.test_case)
+        print("{}{}{} started".format(BOLD[1], message.test_case, BOLD[0]))
+        return
+
+    if isinstance(message, TestResult):
+        test_result = message
+
+        running_jobs.remove(test_result.name)
+        results_queue.put(test_result)
+
+        if test_result.status == "Passed":
+            print("%s%s%s passed, Duration: %s s" % (
+                BOLD[1], test_result.name, BOLD[0], test_result.time))
+        elif test_result.status == "Skipped":
+            print("%s%s%s skipped" %
+                  (BOLD[1], test_result.name, BOLD[0]))
+        else:
+            print("%s%s%s failed, Duration: %s s\n" %
+                  (BOLD[1], test_result.name, BOLD[0], test_result.time))
+            print(BOLD[1] + 'stdout:' + BOLD[0])
+            print(test_result.stdout)
+            print(BOLD[1] + 'stderr:' + BOLD[0])
+            print(test_result.stderr)
+        return
+
+    assert False, "we should not be here"
+
+
+def handle_update_messages(update_queue, results_queue):
+    """
+    handle_update_messages waits for messages to be sent from handle_test_cases via the
+    update_queue. It serializes the results so we can print nice status update messages.
+    """
+    printed_status = False
+    running_jobs = set()
     poll_timeout = 10  # seconds
+
+    while True:
+        message = None
+        try:
+            message = update_queue.get(True, poll_timeout)
+            if message is None:
+                break
+
+            # We printed a status message, need to kick to the next line
+            # before printing more.
+            if printed_status:
+                print()
+                printed_status = False
+
+            handle_message(message, running_jobs, results_queue)
+            update_queue.task_done()
+        except Empty:
+            if not on_ci():
+                print("Running jobs: {}".format(
+                    ", ".join(running_jobs)), end="\r")
+                sys.stdout.flush()
+                printed_status = True
+
+
+def handle_test_cases(job_queue, update_queue):
+    """
+    handle_test_cases represents a single worker process in the pool. It
+    waits for a test, then executes that test. It also reports start and
+    result messages to handle_update_messages.
+    """
     # In case there is a graveyard of zombie bitcoinds, we can apply a
     # pseudorandom offset to hopefully jump over them.
     # (625 is PORT_RANGE/MAX_NODES)
     portseed_offset = int(time.time() * 1000) % 625
 
-    ##
-    # Define some helper functions we will need for threading.
-    ##
-
-    def handle_message(message, running_jobs):
-        """
-        handle_message handles a single message from handle_test_cases
-        """
-        if isinstance(message, TestCase):
-            running_jobs.add(message.test_case)
-            print("{}{}{} started".format(BOLD[1], message.test_case, BOLD[0]))
-            return
-
-        if isinstance(message, TestResult):
-            test_result = message
-
-            running_jobs.remove(test_result.name)
-            test_results.append(test_result)
-
-            if test_result.status == "Passed":
-                print("%s%s%s passed, Duration: %s s" % (
-                    BOLD[1], test_result.name, BOLD[0], test_result.time))
-            elif test_result.status == "Skipped":
-                print("%s%s%s skipped" %
-                      (BOLD[1], test_result.name, BOLD[0]))
-            else:
-                print("%s%s%s failed, Duration: %s s\n" %
-                      (BOLD[1], test_result.name, BOLD[0], test_result.time))
-                print(BOLD[1] + 'stdout:' + BOLD[0])
-                print(test_result.stdout)
-                print(BOLD[1] + 'stderr:' + BOLD[0])
-                print(test_result.stderr)
-            return
-
-        assert False, "we should not be here"
-
-    def handle_update_messages():
-        """
-        handle_update_messages waits for messages to be sent from handle_test_cases via the
-        update_queue. It serializes the results so we can print nice status update messages.
-        """
-        printed_status = False
-        running_jobs = set()
-
-        while True:
-            message = None
-            try:
-                message = update_queue.get(True, poll_timeout)
-                if message is None:
-                    break
-
-                # We printed a status message, need to kick to the next line
-                # before printing more.
-                if printed_status:
-                    print()
-                    printed_status = False
-
-                handle_message(message, running_jobs)
-                update_queue.task_done()
-            except Empty as e:
-                if not on_ci():
-                    print("Running jobs: {}".format(", ".join(running_jobs)), end="\r")
-                    sys.stdout.flush()
-                    printed_status = True
-
-    def handle_test_cases():
-        """
-        job_runner represents a single thread that is part of a worker pool.
-        It waits for a test, then executes that test.
-        It also reports start and result messages to handle_update_messages
-        """
-        while True:
-            test = job_queue.get()
-            if test is None:
-                break
-            # Signal that the test is starting to inform the poor waiting
-            # programmer
-            update_queue.put(test)
-            result = test.run(portseed_offset)
-            update_queue.put(result)
-            job_queue.task_done()
+    while True:
+        test = job_queue.get()
+        if test is None:
+            break
+        # Signal that the test is starting to inform the poor waiting
+        # programmer
+        update_queue.put(test)
+        result = test.run(portseed_offset)
+        update_queue.put(result)
+        job_queue.task_done()
+
+
+def execute_test_processes(num_jobs, test_list, tests_dir, tmpdir, flags):
+    ctx = multiprocessing.get_context('spawn')
+    update_queue = ctx.JoinableQueue()
+    job_queue = ctx.JoinableQueue()
+    results_queue = ctx.Queue()
 
     ##
     # Setup our threads, and start sending tasks
     ##
 
     # Start our result collection thread.
-    t = threading.Thread(target=handle_update_messages)
-    t.setDaemon(True)
+    t = ctx.Process(target=handle_update_messages,
+                    args=(update_queue, results_queue,), daemon=True)
     t.start()
 
     # Start some worker threads
     for j in range(num_jobs):
-        t = threading.Thread(target=handle_test_cases)
-        t.setDaemon(True)
+        t = ctx.Process(target=handle_test_cases,
+                        args=(job_queue, update_queue,), daemon=True)
         t.start()
 
     # Push all our test cases into the job queue.
@@ -467,2 +475,8 @@
     for j in range(num_jobs):
         job_queue.put(None)
+
+    # Collect exactly one result per scheduled test. Draining until
+    # empty() is racy: a multiprocessing queue's feeder thread may not
+    # have flushed every result into the pipe yet.
+    test_results = [results_queue.get() for _ in range(len(test_list))]
+    return test_results