diff --git a/test/functional/test_runner.py b/test/functional/test_runner.py
--- a/test/functional/test_runner.py
+++ b/test/functional/test_runner.py
@@ -341,6 +341,85 @@
     sys.exit(not all_passed)
 
 
+##
+# Threading helpers. NOTE(review): update_queue etc. are free names here; confirm scope.
+##
+
+def handle_message(message, running_jobs):
+    """
+    handle_message prints and records one TestCase (start) or TestResult message
+    """
+    if isinstance(message, TestCase):
+        running_jobs.add(message.test_case)
+        print("{}{}{} started".format(BOLD[1], message.test_case, BOLD[0]))
+        return
+    if isinstance(message, TestResult):
+        test_result = message
+        running_jobs.remove(test_result.name)
+        test_results.append(test_result)
+        if test_result.status == "Passed":
+            print("%s%s%s passed, Duration: %s s" % (
+                BOLD[1], test_result.name, BOLD[0], test_result.time))
+        elif test_result.status == "Skipped":
+            print("%s%s%s skipped" %
+                  (BOLD[1], test_result.name, BOLD[0]))
+        else:
+            print("%s%s%s failed, Duration: %s s\n" %
+                  (BOLD[1], test_result.name, BOLD[0], test_result.time))
+            print(BOLD[1] + 'stdout:' + BOLD[0])
+            print(test_result.stdout)
+            print(BOLD[1] + 'stderr:' + BOLD[0])
+            print(test_result.stderr)
+        return
+    assert False, "we should not be here"
+
+
+def handle_update_messages():
+    """
+    handle_update_messages handles messages taken from update_queue one at a time until it
+    gets None. When a poll times out it prints the currently running jobs (unless on CI).
+    """
+    printed_status = False
+    running_jobs = set()
+    while True:
+        message = None
+        try:
+            message = update_queue.get(True, poll_timeout)
+            if message is None:
+                break
+            # The status line ends with "\r", so move to the next line
+            # before printing anything else.
+            if printed_status:
+                print()
+                printed_status = False
+            handle_message(message, running_jobs)
+            update_queue.task_done()
+        except Empty as e:
+            if not on_ci():
+                print("Running jobs: {}".format(
+                    ", ".join(running_jobs)), end="\r")
+                sys.stdout.flush()
+                printed_status = True
+
+
+def handle_test_cases():
+    """
+    handle_test_cases is the loop run by a single thread that is part of a worker pool.
+    It takes a test from job_queue, then executes that test; a None entry ends the loop.
+    It also puts the test (start) and its result on update_queue.
+    """
+    while True:
+        test = job_queue.get()
+        if test is None:
+            break
+        # Signal that the test is starting to inform the poor waiting
+        # programmer
+        update_queue.put(test)
+        result = test.run(portseed_offset)
+        update_queue.put(result)
+        job_queue.task_done()
+
+
 def execute_test_processes(num_jobs, test_list, tests_dir, tmpdir, flags):
     update_queue = Queue()
     job_queue = Queue()
@@ -351,88 +430,6 @@
     # (625 is PORT_RANGE/MAX_NODES)
     portseed_offset = int(time.time() * 1000) % 625
 
-    ##
-    # Define some helper functions we will need for threading.
-    ##
-
-    def handle_message(message, running_jobs):
-        """
-        handle_message handles a single message from handle_test_cases
-        """
-        if isinstance(message, TestCase):
-            running_jobs.add(message.test_case)
-            print("{}{}{} started".format(BOLD[1], message.test_case, BOLD[0]))
-            return
-
-        if isinstance(message, TestResult):
-            test_result = message
-
-            running_jobs.remove(test_result.name)
-            test_results.append(test_result)
-
-            if test_result.status == "Passed":
-                print("%s%s%s passed, Duration: %s s" % (
-                    BOLD[1], test_result.name, BOLD[0], test_result.time))
-            elif test_result.status == "Skipped":
-                print("%s%s%s skipped" %
-                      (BOLD[1], test_result.name, BOLD[0]))
-            else:
-                print("%s%s%s failed, Duration: %s s\n" %
-                      (BOLD[1], test_result.name, BOLD[0], test_result.time))
-                print(BOLD[1] + 'stdout:' + BOLD[0])
-                print(test_result.stdout)
-                print(BOLD[1] + 'stderr:' + BOLD[0])
-                print(test_result.stderr)
-            return
-
-        assert False, "we should not be here"
-
-    def handle_update_messages():
-        """
-        handle_update_messages waits for messages to be sent from handle_test_cases via the
-        update_queue. It serializes the results so we can print nice status update messages.
-        """
-        printed_status = False
-        running_jobs = set()
-
-        while True:
-            message = None
-            try:
-                message = update_queue.get(True, poll_timeout)
-                if message is None:
-                    break
-
-                # We printed a status message, need to kick to the next line
-                # before printing more.
-                if printed_status:
-                    print()
-                    printed_status = False
-
-                handle_message(message, running_jobs)
-                update_queue.task_done()
-            except Empty as e:
-                if not on_ci():
-                    print("Running jobs: {}".format(", ".join(running_jobs)), end="\r")
-                    sys.stdout.flush()
-                    printed_status = True
-
-    def handle_test_cases():
-        """
-        job_runner represents a single thread that is part of a worker pool.
-        It waits for a test, then executes that test.
-        It also reports start and result messages to handle_update_messages
-        """
-        while True:
-            test = job_queue.get()
-            if test is None:
-                break
-            # Signal that the test is starting to inform the poor waiting
-            # programmer
-            update_queue.put(test)
-            result = test.run(portseed_offset)
-            update_queue.put(result)
-            job_queue.task_done()
-
     ##
     # Setup our threads, and start sending tasks
     ##