Changeset View
Changeset View
Standalone View
Standalone View
test/fuzz/test_runner.py
Show All 9 Lines | |||||
import logging | import logging | ||||
import os | import os | ||||
import subprocess | import subprocess | ||||
import sys | import sys | ||||
def main(): | def main(): | ||||
parser = argparse.ArgumentParser( | parser = argparse.ArgumentParser( | ||||
formatter_class=argparse.ArgumentDefaultsHelpFormatter) | formatter_class=argparse.ArgumentDefaultsHelpFormatter, | ||||
description='''Run the fuzz targets with all inputs from the seed_dir once.''', | |||||
) | |||||
parser.add_argument( | parser.add_argument( | ||||
"-l", | "-l", | ||||
"--loglevel", | "--loglevel", | ||||
dest="loglevel", | dest="loglevel", | ||||
default="INFO", | default="INFO", | ||||
help="log events at this level and higher to the console. Can be set to DEBUG, INFO, WARNING, ERROR or CRITICAL. Passing --loglevel DEBUG will output all logs to console.", | help="log events at this level and higher to the console. Can be set to DEBUG, INFO, WARNING, ERROR or CRITICAL. Passing --loglevel DEBUG will output all logs to console.", | ||||
) | ) | ||||
parser.add_argument( | parser.add_argument( | ||||
'--export_coverage', | |||||
action='store_true', | |||||
help='If true, export coverage information to files in the seed corpus', | |||||
) | |||||
parser.add_argument( | |||||
'--valgrind', | '--valgrind', | ||||
action='store_true', | action='store_true', | ||||
help='If true, run fuzzing binaries under the valgrind memory error detector. Valgrind 3.14 or later required.', | help='If true, run fuzzing binaries under the valgrind memory error detector. Valgrind 3.14 or later required.', | ||||
) | ) | ||||
parser.add_argument( | parser.add_argument( | ||||
'seed_dir', | 'seed_dir', | ||||
help='The seed corpus to run on (must contain subfolders for each fuzz target).', | help='The seed corpus to run on (must contain subfolders for each fuzz target).', | ||||
) | ) | ||||
parser.add_argument( | parser.add_argument( | ||||
'target', | 'target', | ||||
nargs='*', | nargs='*', | ||||
help='The target(s) to run. Default is to run all targets.', | help='The target(s) to run. Default is to run all targets.', | ||||
) | ) | ||||
parser.add_argument( | |||||
'--m_dir', | |||||
help='Merge inputs from this directory into the seed_dir. Needs /target subdirectory.', | |||||
) | |||||
args = parser.parse_args() | args = parser.parse_args() | ||||
# Set up logging | # Set up logging | ||||
logging.basicConfig( | logging.basicConfig( | ||||
format='%(message)s', | format='%(message)s', | ||||
level=int(args.loglevel) if args.loglevel.isdigit( | level=int(args.loglevel) if args.loglevel.isdigit( | ||||
) else args.loglevel.upper(), | ) else args.loglevel.upper(), | ||||
▲ Show 20 Lines • Show All 64 Lines • ▼ Show 20 Lines | try: | ||||
if "libFuzzer" not in help_output: | if "libFuzzer" not in help_output: | ||||
logging.error("Must be built with libFuzzer") | logging.error("Must be built with libFuzzer") | ||||
sys.exit(1) | sys.exit(1) | ||||
except subprocess.TimeoutExpired: | except subprocess.TimeoutExpired: | ||||
logging.error( | logging.error( | ||||
"subprocess timed out: Currently only libFuzzer is supported") | "subprocess timed out: Currently only libFuzzer is supported") | ||||
sys.exit(1) | sys.exit(1) | ||||
if args.m_dir: | |||||
merge_inputs( | |||||
corpus=args.seed_dir, | |||||
test_list=test_list_selection, | |||||
build_dir=config["environment"]["BUILDDIR"], | |||||
merge_dir=args.m_dir, | |||||
) | |||||
run_once( | run_once( | ||||
corpus=args.seed_dir, | corpus=args.seed_dir, | ||||
test_list=test_list_selection, | test_list=test_list_selection, | ||||
test_dir=test_dir, | test_dir=test_dir, | ||||
export_coverage=args.export_coverage, | |||||
use_valgrind=args.valgrind, | use_valgrind=args.valgrind, | ||||
) | ) | ||||
def run_once(*, corpus, test_list, test_dir, export_coverage, use_valgrind): | def merge_inputs(*, corpus, test_list, build_dir, merge_dir): | ||||
logging.info( | |||||
"Merge the inputs in the passed dir into the seed_dir. Passed dir {}".format(merge_dir)) | |||||
for t in test_list: | |||||
args = [ | |||||
os.path.join(build_dir, 'src', 'test', 'fuzz', t), | |||||
'-merge=1', | |||||
os.path.join(corpus, t), | |||||
os.path.join(merge_dir, t), | |||||
] | |||||
os.makedirs(os.path.join(corpus, t), exist_ok=True) | |||||
os.makedirs(os.path.join(merge_dir, t), exist_ok=True) | |||||
logging.debug('Run {} with args {}'.format(t, args)) | |||||
output = subprocess.run( | |||||
args, | |||||
check=True, | |||||
stderr=subprocess.PIPE, | |||||
universal_newlines=True).stderr | |||||
logging.debug('Output: {}'.format(output)) | |||||
def run_once(*, corpus, test_list, test_dir, use_valgrind): | |||||
for t in test_list: | for t in test_list: | ||||
corpus_path = os.path.join(corpus, t) | corpus_path = os.path.join(corpus, t) | ||||
os.makedirs(corpus_path, exist_ok=True) | os.makedirs(corpus_path, exist_ok=True) | ||||
args = [ | args = [ | ||||
os.path.join(test_dir, t), | os.path.join(test_dir, t), | ||||
'-runs=1', | '-runs=1', | ||||
'-detect_leaks=0', | '-detect_leaks=0', | ||||
corpus_path, | corpus_path, | ||||
] | ] | ||||
if use_valgrind: | if use_valgrind: | ||||
args = [ | args = [ | ||||
'valgrind', | 'valgrind', | ||||
'--quiet', | '--quiet', | ||||
'--error-exitcode=1', | '--error-exitcode=1', | ||||
'--exit-on-first-error=yes'] + args | '--exit-on-first-error=yes'] + args | ||||
logging.debug('Run {} with args {}'.format(t, args)) | logging.debug('Run {} with args {}'.format(t, args)) | ||||
result = subprocess.run( | result = subprocess.run( | ||||
args, | args, | ||||
stderr=subprocess.PIPE, | stderr=subprocess.PIPE, | ||||
universal_newlines=True) | universal_newlines=True) | ||||
output = result.stderr | output = result.stderr | ||||
logging.debug('Output: {}'.format(output)) | logging.debug('Output: {}'.format(output)) | ||||
result.check_returncode() | result.check_returncode() | ||||
if not export_coverage: | |||||
continue | |||||
for line in output.splitlines(): | |||||
if 'INITED' in line: | |||||
with open(os.path.join(corpus, t + '_coverage'), 'w', encoding='utf-8') as cov_file: | |||||
cov_file.write(line) | |||||
break | |||||
if __name__ == '__main__': | if __name__ == '__main__': | ||||
main() | main() |