diff --git a/test/functional/test_runner.py b/test/functional/test_runner.py --- a/test/functional/test_runner.py +++ b/test/functional/test_runner.py @@ -31,7 +31,13 @@ import json import threading import multiprocessing -from queue import Queue, Empty +import importlib +import importlib.util +import multiprocessing as mp +from queue import Full, Empty + +from test_framework.test_framework import BitcoinTestFramework +from test_framework.cdefs import get_srcdir # Formatting. Default colors to empty strings. BOLD, BLUE, RED, GREY = ("", ""), ("", ""), ("", ""), ("", "") @@ -118,6 +124,7 @@ [stdout, stderr] = [l.read().decode('utf-8') for l in (log_stdout, log_stderr)] log_stdout.close(), log_stderr.close() + if process.returncode == TEST_EXIT_PASSED and stderr == "": status = "Passed" elif process.returncode == TEST_EXIT_SKIPPED: @@ -340,12 +347,12 @@ sys.exit(not all_passed) - ## # Define some helper functions we will need for threading. ## -def handle_message(message, running_jobs): + +def handle_message(message, running_jobs, results_queue): """ handle_message handles a single message from handle_test_cases """ @@ -356,7 +363,7 @@ if isinstance(message, TestResult): test_result = message running_jobs.remove(test_result.name) - test_results.append(test_result) + results_queue.put(test_result) if test_result.status == "Passed": print("%s%s%s passed, Duration: %s s" % ( BOLD[1], test_result.name, BOLD[0], test_result.time)) @@ -374,13 +381,15 @@ assert False, "we should not be here" -def handle_update_messages(): +def handle_update_messages(update_queue, results_queue): """ handle_update_messages waits for messages to be sent from handle_test_cases via the update_queue. It serializes the results so we can print nice status update messages. """ printed_status = False running_jobs = set() + poll_timeout = 10 # seconds + while True: message = None try: @@ -392,7 +401,7 @@ if printed_status: print() printed_status = False - handle_message(message, running_jobs) + handle_message(message, running_jobs, results_queue) update_queue.task_done() except Empty as e: if not on_ci(): @@ -402,12 +411,17 @@ printed_status = True -def handle_test_cases(): +def handle_test_cases(job_queue, update_queue): """ - job_runner represents a single thread that is part of a worker pool. - It waits for a test, then executes that test. - It also reports start and result messages to handle_update_messages + job_runner represents a single thread that is part of a worker pool. + It waits for a test, then executes that test. It also reports start + and result messages to handle_update_messages. """ + # In case there is a graveyard of zombie bitcoinds, we can apply a + # pseudorandom offset to hopefully jump over them. + # (625 is PORT_RANGE/MAX_NODES) + portseed_offset = int(time.time() * 1000) % 625 + while True: test = job_queue.get() if test is None: @@ -421,28 +435,23 @@ def execute_test_processes(num_jobs, test_list, tests_dir, tmpdir, flags): - update_queue = Queue() - job_queue = Queue() - test_results = [] - poll_timeout = 10 # seconds - # In case there is a graveyard of zombie bitcoinds, we can apply a - # pseudorandom offset to hopefully jump over them. - # (625 is PORT_RANGE/MAX_NODES) - portseed_offset = int(time.time() * 1000) % 625 + ctx = mp.get_context('spawn') + update_queue = ctx.JoinableQueue() + job_queue = ctx.JoinableQueue() + results_queue = ctx.Queue() ## # Setup our threads, and start sending tasks ## - # Start our result collection thread. - t = threading.Thread(target=handle_update_messages) - t.setDaemon(True) + t = ctx.Process(target=handle_update_messages, + args=(update_queue, results_queue,)) t.start() # Start some worker threads for j in range(num_jobs): - t = threading.Thread(target=handle_test_cases) - t.setDaemon(True) + t = ctx.Process(target=handle_test_cases, + args=(job_queue, update_queue,)) t.start() # Push all our test cases into the job queue. @@ -460,6 +469,11 @@ for j in range(num_jobs): job_queue.put(None) + # We've already stopped sending messages, so we can be sure when this queue is empty we are done. + test_results = [] + while not results_queue.empty(): + test_results.append(results_queue.get()) + return test_results