Changeset View
Changeset View
Standalone View
Standalone View
test/functional/test_runner.py
Show All 27 Lines | |||||
import re | import re | ||||
import logging | import logging | ||||
import xml.etree.ElementTree as ET | import xml.etree.ElementTree as ET | ||||
import json | import json | ||||
import threading | import threading | ||||
import multiprocessing | import multiprocessing | ||||
import importlib | import importlib | ||||
import importlib.util | import importlib.util | ||||
import inspect | |||||
import multiprocessing as mp | import multiprocessing as mp | ||||
from queue import Full, Empty | from queue import Full, Empty | ||||
from io import StringIO | |||||
from test_framework.test_framework import BitcoinTestFramework | from test_framework.test_framework import BitcoinTestFramework, ComparisonTestFramework, TestStatus | ||||
from test_framework.cdefs import get_srcdir | from test_framework.cdefs import get_srcdir | ||||
# Formatting. Default colors to empty strings. | # Formatting. Default colors to empty strings. | ||||
BOLD, BLUE, RED, GREY = ("", ""), ("", ""), ("", ""), ("", "") | BOLD, BLUE, RED, GREY = ("", ""), ("", ""), ("", ""), ("", "") | ||||
try: | try: | ||||
# Make sure python thinks it can write unicode to its stdout | # Make sure python thinks it can write unicode to its stdout | ||||
"\u2713".encode("utf_8").decode(sys.stdout.encoding) | "\u2713".encode("utf_8").decode(sys.stdout.encoding) | ||||
TICK = "✓ " | TICK = "✓ " | ||||
CROSS = "✖ " | CROSS = "✖ " | ||||
CIRCLE = "○ " | CIRCLE = "○ " | ||||
except UnicodeDecodeError: | except UnicodeDecodeError: | ||||
TICK = "P " | TICK = "P " | ||||
CROSS = "x " | CROSS = "x " | ||||
CIRCLE = "o " | CIRCLE = "o " | ||||
if os.name == 'posix': | if os.name == 'posix': | ||||
# primitive formatting on supported | # primitive formatting on supported | ||||
# terminal via ANSI escape sequences: | # terminal via ANSI escape sequences: | ||||
BOLD = ('\033[0m', '\033[1m') | BOLD = ('\033[0m', '\033[1m') | ||||
BLUE = ('\033[0m', '\033[0;34m') | BLUE = ('\033[0m', '\033[0;34m') | ||||
RED = ('\033[0m', '\033[0;31m') | RED = ('\033[0m', '\033[0;31m') | ||||
GREY = ('\033[0m', '\033[1;30m') | GREY = ('\033[0m', '\033[1;30m') | ||||
TEST_EXIT_PASSED = 0 | |||||
TEST_EXIT_SKIPPED = 77 | |||||
NON_SCRIPTS = [ | NON_SCRIPTS = [ | ||||
# These are python files that live in the functional tests directory, but are not test scripts. | # These are python files that live in the functional tests directory, but are not test scripts. | ||||
"combine_logs.py", | "combine_logs.py", | ||||
"create_cache.py", | "create_cache.py", | ||||
"test_runner.py", | "test_runner.py", | ||||
] | ] | ||||
Show All 36 Lines | def run(self, portseed_offset): | ||||
portseed_arg = ["--portseed={}".format(portseed)] | portseed_arg = ["--portseed={}".format(portseed)] | ||||
log_stdout = tempfile.SpooledTemporaryFile(max_size=2**16) | log_stdout = tempfile.SpooledTemporaryFile(max_size=2**16) | ||||
log_stderr = tempfile.SpooledTemporaryFile(max_size=2**16) | log_stderr = tempfile.SpooledTemporaryFile(max_size=2**16) | ||||
test_argv = t.split() | test_argv = t.split() | ||||
tmpdir = [os.path.join("--tmpdir=%s", "%s_%s") % | tmpdir = [os.path.join("--tmpdir=%s", "%s_%s") % | ||||
(self.tmpdir, re.sub(".py$", "", t), portseed)] | (self.tmpdir, re.sub(".py$", "", t), portseed)] | ||||
name = t | name = t | ||||
time0 = time.time() | time0 = time.time() | ||||
process = subprocess.Popen([os.path.join(self.tests_dir, test_argv[0])] + test_argv[1:] + self.flags + portseed_arg + tmpdir, | |||||
universal_newlines=True, | |||||
stdout=log_stdout, | |||||
stderr=log_stderr) | |||||
process.wait() | |||||
log_stdout.seek(0), log_stderr.seek(0) | |||||
[stdout, stderr] = [l.read().decode('utf-8') | |||||
for l in (log_stdout, log_stderr)] | |||||
log_stdout.close(), log_stderr.close() | |||||
if process.returncode == TEST_EXIT_PASSED and stderr == "": | test_modulepath = None | ||||
try: | |||||
# Dynamically import the test so we can introspect it | |||||
test_modulepath = os.path.abspath( | |||||
os.path.join(self.tests_dir, test_argv[0])) | |||||
test_spec = importlib.util.spec_from_file_location( | |||||
os.path.splitext(test_argv[0])[0], test_modulepath) | |||||
test_module = importlib.util.module_from_spec(test_spec) | |||||
test_spec.loader.exec_module(test_module) | |||||
except Exception as e: | |||||
return TestResult(name, "Failed", 0, "", str(e)) | |||||
try: | |||||
# Setup output capturing | |||||
original_stdout = sys.stdout | |||||
original_stderr = sys.stderr | |||||
test_stdout = StringIO() | |||||
test_stderr = StringIO() | |||||
sys.stdout = test_stdout | |||||
sys.stderr = test_stderr | |||||
exit_code = None | |||||
for prop in dir(test_module): | |||||
obj = getattr(test_module, prop) | |||||
if inspect.isclass(obj) and issubclass(obj, BitcoinTestFramework) and obj is not BitcoinTestFramework and obj is not ComparisonTestFramework: | |||||
exit_code = obj().main( | |||||
test_argv[1:] + self.flags + portseed_arg + tmpdir) | |||||
except Exception as e: | |||||
print(e) | |||||
finally: | |||||
sys.stdout = original_stdout | |||||
sys.stderr = original_stderr | |||||
[stdout, stderr] = [test_stdout.getvalue(), test_stderr.getvalue()] | |||||
test_stdout.close(), test_stderr.close() | |||||
if exit_code == TestStatus.PASSED and stderr == "": | |||||
status = "Passed" | status = "Passed" | ||||
elif process.returncode == TEST_EXIT_SKIPPED: | elif exit_code == TestStatus.SKIPPED: | ||||
status = "Skipped" | status = "Skipped" | ||||
else: | else: | ||||
status = "Failed" | status = "Failed" | ||||
return TestResult(name, status, int(time.time() - time0), stdout, stderr) | return TestResult(name, status, int(time.time() - time0), stdout, stderr) | ||||
def on_ci(): | def on_ci(): | ||||
▲ Show 20 Lines • Show All 586 Lines • Show Last 20 Lines |