diff --git a/.arclint b/.arclint index 5d137f199..5d43f06fa 100644 --- a/.arclint +++ b/.arclint @@ -1,341 +1,343 @@ { "linters": { "generated": { "type": "generated" }, "clang-format": { "type": "clang-format", "version": ">=12.0", "bin": [ "clang-format-12", "clang-format" ], "include": "(^(src|chronik)/.*\\.(h|c|cpp|mm)$)", "exclude": [ "(^src/(secp256k1|univalue|leveldb)/)", "(^src/bench/nanobench.h$)" ] }, "autopep8": { "type": "autopep8", "version": ">=1.3.4", - "include": "(\\.py$)", + "include": "(^contrib/.*\\.py$)", "exclude": [ "(^contrib/gitian-builder/)", - "(^contrib/apple-sdk-tools/)", - "(^test/functional/)" + "(^contrib/apple-sdk-tools/)" ], "flags": [ "--aggressive", "--ignore=W503,W504", "--max-line-length=88" ] }, "black": { "type": "black", "version": ">=23.0.0", - "include": [ - "(^test/functional/)" + "include": "(\\.py$)", + "exclude": [ + "(^contrib/gitian-builder/)", + "(^contrib/apple-sdk-tools/)", + "(^contrib/)" ], "flags": [ "--preview" ] }, "flake8": { "type": "flake8", "version": ">=5.0", "include": "(\\.py$)", "exclude": [ "(^contrib/gitian-builder/)", "(^contrib/apple-sdk-tools/)" ], "flags": [ "--ignore=A003,E203,E303,E305,E501,E704,W503,W504", "--require-plugins=flake8-comprehensions,flake8-builtins" ] }, "lint-format-strings": { "type": "lint-format-strings", "include": "(^(src|chronik)/.*\\.(h|c|cpp)$)", "exclude": [ "(^src/(secp256k1|univalue|leveldb)/)", "(^src/bench/nanobench.h$)", "(^src/test/fuzz/strprintf.cpp$)" ] }, "check-doc": { "type": "check-doc", "include": "(^(src|chronik)/.*\\.(h|c|cpp)$)" }, "lint-tests": { "type": "lint-tests", "include": "(^src/(seeder/|rpc/|wallet/)?test/.*\\.(cpp)$)" }, "phpcs": { "type": "phpcs", "include": "(\\.php$)", "exclude": [ "(^arcanist/__phutil_library_.+\\.php$)" ], "phpcs.standard": "arcanist/phpcs.xml" }, "lint-locale-dependence": { "type": "lint-locale-dependence", "include": "(^(src|chronik)/.*\\.(h|cpp)$)", "exclude": [ "(^src/(crypto/ctaes/|leveldb/|secp256k1/|tinyformat.h|univalue/))", "(^src/bench/nanobench.h$)" ] }, "lint-cheader": { "type": "lint-cheader", "include": "(^(src|chronik)/.*\\.(h|cpp)$)", "exclude": [ "(^src/(crypto/ctaes|secp256k1|univalue|leveldb)/)", "(^src/bench/nanobench.h$)" ] }, "spelling": { "type": "spelling", "exclude": [ "(^build-aux/m4/)", "(^depends/)", "(^doc/release-notes/)", "(^contrib/gitian-builder/)", "(^src/(qt/locale|secp256k1|univalue|leveldb)/)", "(^test/lint/dictionary/)", "(package-lock.json)" ], "spelling.dictionaries": [ "test/lint/dictionary/english.json" ] }, "lint-assert-with-side-effects": { "type": "lint-assert-with-side-effects", "include": "(^(src|chronik)/.*\\.(h|cpp)$)", "exclude": [ "(^src/(secp256k1|univalue|leveldb)/)", "(^src/bench/nanobench.h$)" ] }, "lint-include-quotes": { "type": "lint-include-quotes", "include": "(^(src|chronik)/.*\\.(h|cpp)$)", "exclude": [ "(^src/(secp256k1|univalue|leveldb)/)", "(^src/bench/nanobench.h$)" ] }, "lint-include-guard": { "type": "lint-include-guard", "include": "(^(src|chronik)/.*\\.h$)", "exclude": [ "(^src/(crypto/ctaes|secp256k1|univalue|leveldb)/)", "(^src/bench/nanobench.h$)", "(^src/tinyformat.h$)" ] }, "lint-include-source": { "type": "lint-include-source", "include": "(^(src|chronik)/.*\\.(h|c|cpp)$)", "exclude": [ "(^src/(secp256k1|univalue|leveldb)/)", "(^src/bench/nanobench.h$)" ] }, "lint-std-chrono": { "type": "lint-std-chrono", "include": "(^(src|chronik)/.*\\.(h|cpp)$)" }, "lint-stdint": { "type": "lint-stdint", "include": "(^(src|chronik)/.*\\.(h|c|cpp)$)", "exclude": [ "(^src/(secp256k1|univalue|leveldb)/)", "(^src/bench/nanobench.h$)", "(^src/compat/assumptions.h$)" ] }, "lint-source-filename": { "type": "lint-source-filename", "include": "(^(src|chronik)/.*\\.(h|c|cpp)$)", "exclude": [ "(^src/(secp256k1|univalue|leveldb)/)", "(^src/bench/nanobench.h$)" ] }, "lint-boost-dependencies": { "type": "lint-boost-dependencies", "include": "(^(src|chronik)/.*\\.(h|cpp)$)" }, "lint-python-encoding": { "type": "lint-python-encoding", "include": "(\\.py$)", "exclude": [ "(^contrib/gitian-builder/)", "(^contrib/apple-sdk-tools/)" ] }, "lint-python-shebang": { "type": "lint-python-shebang", "include": "(\\.py$)", "exclude": [ "(__init__\\.py$)", "(^contrib/gitian-builder/)", "(^contrib/apple-sdk-tools/)" ] }, "lint-bash-shebang": { "type": "lint-bash-shebang", "include": "(\\.sh$)", "exclude": [ "(^contrib/gitian-builder/)" ] }, "shellcheck": { "type": "shellcheck", "version": ">=0.7.0", "flags": [ "--external-sources", "--source-path=SCRIPTDIR" ], "include": "(\\.sh$)", "exclude": [ "(^contrib/gitian-builder/)", "(^src/(secp256k1|univalue)/)" ] }, "lint-shell-locale": { "type": "lint-shell-locale", "include": "(\\.sh$)", "exclude": [ "(^contrib/gitian-builder/)", "(^src/(secp256k1|univalue)/)", "(^cmake/utils/log-and-print-on-failure.sh)" ] }, "lint-cpp-void-parameters": { "type": "lint-cpp-void-parameters", "include": "(^(src|chronik)/.*\\.(h|cpp)$)", "exclude": [ "(^src/(crypto/ctaes|secp256k1|univalue|leveldb)/)", "(^src/bench/nanobench.h$)", "(^src/compat/glibc_compat.cpp$)" ] }, "lint-logs": { "type": "lint-logs", "include": "(^(src|chronik)/.*\\.(h|cpp|rs)$)" }, "lint-qt": { "type": "lint-qt", "include": "(^src/qt/.*\\.(h|cpp)$)", "exclude": [ "(^src/qt/(locale|forms|res)/)" ] }, "lint-doxygen": { "type": "lint-doxygen", "include": "(^(src|chronik)/.*\\.(h|c|cpp)$)", "exclude": [ "(^src/(crypto/ctaes|secp256k1|univalue|leveldb)/)", "(^src/bench/nanobench.h$)" ] }, "lint-whitespace": { "type": "lint-whitespace", "include": "(\\.(ac|am|cmake|conf|in|include|json|m4|md|openrc|php|pl|rs|sh|txt|yml)$)", "exclude": [ "(^contrib/gitian-builder/)", "(^src/(secp256k1|univalue|leveldb)/)", "(^src/bench/nanobench.h$)" ] }, "yamllint": { "type": "yamllint", "include": "(\\.(yml|yaml)$)", "exclude": "(^src/(secp256k1|univalue|leveldb)/)" }, "lint-check-nonfatal": { "type": "lint-check-nonfatal", "include": [ "(^src/rpc/.*\\.(h|c|cpp)$)", "(^src/wallet/rpc*.*\\.(h|c|cpp)$)" ], "exclude": "(^src/rpc/server.cpp)" }, "lint-markdown": { "type": "lint-markdown", "include": [ "(\\.md$)" ], "exclude": "(^contrib/gitian-builder/)" }, "lint-python-mypy": { "type": "lint-python-mypy", "version": ">=0.910", "include": "(\\.py$)", "exclude": [ "(^contrib/gitian-builder/)", "(^contrib/apple-sdk-tools/)", "(^contrib/macdeploy/)" ], "flags": [ "--ignore-missing-imports", "--install-types", "--non-interactive" ] }, "lint-python-mutable-default": { "type": "lint-python-mutable-default", "include": "(\\.py$)", "exclude": [ "(^contrib/gitian-builder/)", "(^contrib/apple-sdk-tools/)" ] }, "prettier": { "type": "prettier", "version": ">=2.6.0", "include": [ "(^cashtab/.*\\.(css|html|js|json|jsx|md|scss|ts|tsx)$)", "(^web/.*\\.(css|html|js|json|jsx|md|scss|ts|tsx)$)" ], "exclude": "(^web/.*/translations/.*\\.json$)" }, "lint-python-isort": { "type": "lint-python-isort", "version": ">=5.6.4", "include": "(\\.py$)", "exclude": [ "(^contrib/gitian-builder/)", "(^contrib/apple-sdk-tools/)" ] }, "rustfmt": { "type": "rustfmt", "version": ">=1.5.1", "include": "(\\.rs$)" }, "eslint": { "type": "eslint", "version": ">=8.0.0", "include": [ "(cashtab/.*\\.js$)", "(apps/alias-server/.*\\.js$)", "(modules/ecashaddrjs/.*\\.js$)", "(apps/ecash-herald/.*\\.js$)", "(modules/chronik-client/.*\\.(js|jsx|ts|tsx)$)" ] }, "lint-python-flynt": { "type": "lint-python-flynt", "version": ">=0.78", "include": "(\\.py$)", "exclude": [ "(^contrib/gitian-builder/)", "(^contrib/apple-sdk-tools/)" ] } } } diff --git a/cmake/utils/filter-lcov.py b/cmake/utils/filter-lcov.py index 2eb8f189c..98f698a6c 100755 --- a/cmake/utils/filter-lcov.py +++ b/cmake/utils/filter-lcov.py @@ -1,31 +1,38 @@ #!/usr/bin/env python3 # Copyright (c) 2020 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. import argparse parser = argparse.ArgumentParser( - description='Remove the coverage data from a tracefile for all files matching the pattern.') -parser.add_argument('--pattern', '-p', action='append', - help='the pattern of files to remove', required=True) + description=( + "Remove the coverage data from a tracefile for all files matching the pattern." + ) +) parser.add_argument( - 'tracefile', help='the tracefile to remove the coverage data from') -parser.add_argument('outfile', help='filename for the output to be written to') + "--pattern", + "-p", + action="append", + help="the pattern of files to remove", + required=True, +) +parser.add_argument("tracefile", help="the tracefile to remove the coverage data from") +parser.add_argument("outfile", help="filename for the output to be written to") args = parser.parse_args() tracefile = args.tracefile pattern = args.pattern outfile = args.outfile in_remove = False -with open(tracefile, 'r', encoding="utf8") as f: - with open(outfile, 'w', encoding="utf8") as wf: +with open(tracefile, "r", encoding="utf8") as f: + with open(outfile, "w", encoding="utf8") as wf: for line in f: for p in pattern: if line.startswith("SF:") and p in line: in_remove = True if not in_remove: wf.write(line) - if line == 'end_of_record\n': + if line == "end_of_record\n": in_remove = False diff --git a/cmake/utils/gen-ninja-deps.py b/cmake/utils/gen-ninja-deps.py index df35c0258..7916f4171 100755 --- a/cmake/utils/gen-ninja-deps.py +++ b/cmake/utils/gen-ninja-deps.py @@ -1,181 +1,174 @@ #!/usr/bin/env python3 import argparse import os import subprocess -parser = argparse.ArgumentParser(description='Produce a dep file from ninja.') +parser = argparse.ArgumentParser(description="Produce a dep file from ninja.") +parser.add_argument("--build-dir", help="The build directory.", required=True) parser.add_argument( - '--build-dir', - help='The build directory.', - required=True) + "--base-dir", + help="The directory for which dependencies are rewriten.", + required=True, +) +parser.add_argument("--ninja", help="The ninja executable to use.") +parser.add_argument("base_target", help="The target from the base's perspective.") parser.add_argument( - '--base-dir', - help='The directory for which dependencies are rewriten.', - required=True) -parser.add_argument('--ninja', help='The ninja executable to use.') -parser.add_argument( - 'base_target', - help="The target from the base's perspective.") -parser.add_argument( - 'targets', nargs='+', - help='The target for which dependencies are extracted.') -parser.add_argument( - '--extra-deps', nargs='+', - help='Extra dependencies.') + "targets", nargs="+", help="The target for which dependencies are extracted." +) +parser.add_argument("--extra-deps", nargs="+", help="Extra dependencies.") args = parser.parse_args() build_dir = os.path.abspath(args.build_dir) base_dir = os.path.abspath(args.base_dir) ninja = args.ninja base_target = args.base_target targets = args.targets extra_deps = args.extra_deps # Make sure we operate in the right folder. os.chdir(build_dir) if ninja is None: - ninja = subprocess.check_output(['command', '-v', 'ninja'])[:-1] + ninja = subprocess.check_output(["command", "-v", "ninja"])[:-1] # Construct the set of all targets all_targets = set() doto_targets = set() -for t in subprocess.check_output([ninja, '-t', 'targets', 'all']).splitlines(): - t, r = t.split(b':') +for t in subprocess.check_output([ninja, "-t", "targets", "all"]).splitlines(): + t, r = t.split(b":") all_targets.add(t) - if r[:13] == b' C_COMPILER__' or r[:15] == b' CXX_COMPILER__': + if r[:13] == b" C_COMPILER__" or r[:15] == b" CXX_COMPILER__": doto_targets.add(t) def parse_ninja_query(query): deps = {} lines = query.splitlines() while len(lines): line = lines.pop(0) - if line[0] == ord(' '): + if line[0] == ord(" "): continue # We have a new target - target = line.split(b':')[0] - assert lines.pop(0)[:8] == b' input:' + target = line.split(b":")[0] + assert lines.pop(0)[:8] == b" input:" inputs = set() while True: i = lines.pop(0) - if i[:4] != b' ': + if i[:4] != b" ": break - ''' + """ ninja has 3 types of input: 1. Explicit dependencies, no prefix; 2. Implicit dependencies, | prefix. 3. Order only dependencies, || prefix. Order only dependency do not require the target to be rebuilt and so we ignore them. - ''' + """ i = i[4:] - if i[0] == ord('|'): - if i[1] == ord('|'): + if i[0] == ord("|"): + if i[1] == ord("|"): # We reached the order only dependencies. break i = i[2:] inputs.add(i) deps[target] = inputs return deps def extract_deps(workset): # Recursively extract the dependencies of the target. deps = {} while len(workset) > 0: - query = subprocess.check_output([ninja, '-t', 'query'] + list(workset)) + query = subprocess.check_output([ninja, "-t", "query"] + list(workset)) target_deps = parse_ninja_query(query) deps.update(target_deps) workset = set() for d in target_deps.values(): workset.update(t for t in d if t in all_targets and t not in deps) # Extract build time dependencies. bt_targets = [t for t in deps if t in doto_targets] if len(bt_targets) == 0: return deps ndeps = subprocess.check_output( - [ninja, '-t', 'deps'] + bt_targets, - stderr=subprocess.DEVNULL) + [ninja, "-t", "deps"] + bt_targets, stderr=subprocess.DEVNULL + ) lines = ndeps.splitlines() while len(lines) > 0: line = lines.pop(0) - t, m = line.split(b':') - if m == b' deps not found': + t, m = line.split(b":") + if m == b" deps not found": continue inputs = set() while True: i = lines.pop(0) - if i == b'': + if i == b"": break - assert i[:4] == b' ' + assert i[:4] == b" " inputs.add(i[4:]) deps[t] = inputs return deps base_dir = base_dir.encode() def rebase_deps(deps): rebased = {} cache = {} def rebase(path): if path in cache: return cache[path] abspath = os.path.abspath(path) - newpath = path if path == abspath else os.path.relpath( - abspath, base_dir) + newpath = path if path == abspath else os.path.relpath(abspath, base_dir) cache[path] = newpath return newpath for t, s in deps.items(): rebased[rebase(t)] = {rebase(d) for d in s} return rebased deps = extract_deps(set(targets)) deps = rebase_deps(deps) def dump(deps): for t, d in deps.items(): if len(d) == 0: continue dump_str = f"{t.decode()}: \\\n " dump_str += " \\\n ".join(sorted(x.decode() for x in d)) print(dump_str) # Collapse everything under the base target. basedeps = set() if extra_deps is None else {d.encode() for d in extra_deps} for d in deps.values(): basedeps.update(d) base_target = base_target.encode() basedeps.discard(base_target) dump({base_target: basedeps}) diff --git a/cmake/utils/junit-reports-merge.py b/cmake/utils/junit-reports-merge.py index 982e02c70..90e07fd5a 100755 --- a/cmake/utils/junit-reports-merge.py +++ b/cmake/utils/junit-reports-merge.py @@ -1,132 +1,130 @@ #!/usr/bin/env python3 # Copyright (c) 2020 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. import datetime import fcntl import os import sys import xml.etree.ElementTree as ET class TestSuite: def __init__(self, name, report_dir): self.name = name self.test_cases = {} - self.report_file = os.path.join(report_dir, f'{self.name}.xml') + self.report_file = os.path.join(report_dir, f"{self.name}.xml") def add_test_case(self, test_case): self.test_cases[test_case.test_id] = test_case def get_failed_tests(self): return [t for t in self.test_cases.values() if not t.test_success] def dump(self): # Calculate test suite duration as the sum of all test case duraration - duration = round(sum([ - float(t.node.get('time', 0.0)) for t in self.test_cases.values() - ]), 3) + duration = round( + sum([float(t.node.get("time", 0.0)) for t in self.test_cases.values()]), 3 + ) test_suite = ET.Element( - 'testsuite', + "testsuite", { - 'name': self.name, - 'id': '0', - 'timestamp': datetime.datetime.now().isoformat('T'), - 'time': str(duration), - 'tests': str(len(self.test_cases)), - 'failures': str(len(self.get_failed_tests())), - } + "name": self.name, + "id": "0", + "timestamp": datetime.datetime.now().isoformat("T"), + "time": str(duration), + "tests": str(len(self.test_cases)), + "failures": str(len(self.get_failed_tests())), + }, ) for test_case in self.test_cases.values(): test_suite.append(test_case.node) report_dir = os.path.dirname(self.report_file) os.makedirs(report_dir, exist_ok=True) ET.ElementTree(test_suite).write( self.report_file, - 'UTF-8', + "UTF-8", xml_declaration=True, ) def load(self): tree = ET.parse(self.report_file) xml_root = tree.getroot() - assert xml_root.tag == 'testsuite' - assert self.name == xml_root.get('name') + assert xml_root.tag == "testsuite" + assert self.name == xml_root.get("name") - for test_case in xml_root.findall('testcase'): + for test_case in xml_root.findall("testcase"): self.add_test_case(TestCase(test_case)) class TestCase: def __init__(self, node): self.node = node - self.test_success = self.node.find('failure') is None + self.test_success = self.node.find("failure") is None def __getattr__(self, attribute): - if attribute == 'test_id': + if attribute == "test_id": return f"{self.classname}/{self.name}" return self.node.attrib[attribute] class Lock: def __init__(self, suite, lock_dir): - self.lock_file = os.path.join(lock_dir, f'{suite}.lock') + self.lock_file = os.path.join(lock_dir, f"{suite}.lock") def __enter__(self): os.makedirs(os.path.dirname(self.lock_file), exist_ok=True) - self.fd = open(self.lock_file, 'w', encoding='utf-8') + self.fd = open(self.lock_file, "w", encoding="utf-8") fcntl.lockf(self.fd, fcntl.LOCK_EX) def __exit__(self, exception_type, exception_value, traceback): fcntl.lockf(self.fd, fcntl.LOCK_UN) self.fd.close() def main(report_dir, lock_dir, suite, test): - junit = f'{suite}-{test}.xml' + junit = f"{suite}-{test}.xml" if not os.path.isfile(junit): return 0 tree = ET.parse(junit) # Junit root can be a single test suite or multiple test suites. The # later case is unsupported. xml_root = tree.getroot() - if xml_root.tag != 'testsuite': - raise AssertionError( - "The parser only supports a single test suite per report") + if xml_root.tag != "testsuite": + raise AssertionError("The parser only supports a single test suite per report") - test_suite_name = xml_root.get('name') + test_suite_name = xml_root.get("name") lock = Lock(suite, lock_dir) with lock: test_suite = TestSuite(test_suite_name, report_dir) if os.path.isfile(test_suite.report_file): test_suite.load() for child in xml_root: - if child.tag != 'testcase' or (child.find('skipped') is not None): + if child.tag != "testcase" or (child.find("skipped") is not None): continue test_suite.add_test_case(TestCase(child)) test_suite.dump() sys.exit( - 1 if test in [case.classname for case in test_suite.get_failed_tests()] - else 0 + 1 if test in [case.classname for case in test_suite.get_failed_tests()] else 0 ) main( report_dir=sys.argv[1], lock_dir=sys.argv[2], suite=sys.argv[3], test=sys.argv[4], ) diff --git a/share/rpcauth/rpcauth.py b/share/rpcauth/rpcauth.py index d601f935c..742a9b09f 100755 --- a/share/rpcauth/rpcauth.py +++ b/share/rpcauth/rpcauth.py @@ -1,53 +1,57 @@ #!/usr/bin/env python3 # Copyright (c) 2015-2018 The Bitcoin Core developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. import hmac from argparse import ArgumentParser from base64 import urlsafe_b64encode from binascii import hexlify from getpass import getpass from os import urandom def generate_salt(size): """Create size byte hex salt""" return hexlify(urandom(size)).decode() def generate_password(): """Create 32 byte b64 password""" - return urlsafe_b64encode(urandom(32)).decode('utf-8') + return urlsafe_b64encode(urandom(32)).decode("utf-8") def password_to_hmac(salt, password): - m = hmac.new(bytearray(salt, 'utf-8'), - bytearray(password, 'utf-8'), 'SHA256') + m = hmac.new(bytearray(salt, "utf-8"), bytearray(password, "utf-8"), "SHA256") return m.hexdigest() def main(): - parser = ArgumentParser( - description='Create login credentials for a JSON-RPC user') - parser.add_argument('username', help='the username for authentication') + parser = ArgumentParser(description="Create login credentials for a JSON-RPC user") + parser.add_argument("username", help="the username for authentication") parser.add_argument( - 'password', help='leave empty to generate a random password or specify "-" to prompt for password', nargs='?') + "password", + help=( + 'leave empty to generate a random password or specify "-" to prompt for' + " password" + ), + nargs="?", + ) args = parser.parse_args() if not args.password: args.password = generate_password() - elif args.password == '-': + elif args.password == "-": args.password = getpass() # Create 16 byte hex salt salt = generate_salt(16) password_hmac = password_to_hmac(salt, args.password) - print('String to be appended to bitcoin.conf:') - print(f'rpcauth={args.username}:{salt}${password_hmac}') - print(f'Your password:\n{args.password}') + print("String to be appended to bitcoin.conf:") + print(f"rpcauth={args.username}:{salt}${password_hmac}") + print(f"Your password:\n{args.password}") -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/src/test/data/generate_asmap.py b/src/test/data/generate_asmap.py index d691ac9f9..b46f8c3f3 100755 --- a/src/test/data/generate_asmap.py +++ b/src/test/data/generate_asmap.py @@ -1,25 +1,27 @@ #!/usr/bin/env python3 # Copyright (c) 2020 The Bitcoin developers import sys from pathlib import Path def main(input_file, output_file): - with open(input_file, 'rb') as f: + with open(input_file, "rb") as f: contents = f.read() with open(output_file, "w", encoding="utf-8") as f: - f.write( - f"static unsigned const char {Path(input_file).stem}_raw[] = {{\n") + f.write(f"static unsigned const char {Path(input_file).stem}_raw[] = {{\n") f.write(", ".join(f"0x{x:02x}" for x in contents)) f.write("\n};\n") if __name__ == "__main__": if len(sys.argv) != 3: - print("Invalid parameters\nUsage: {} input_file output_file".format( - Path(sys.argv[0]).name)) + print( + "Invalid parameters\nUsage: {} input_file output_file".format( + Path(sys.argv[0]).name + ) + ) sys.exit(1) main(sys.argv[1], sys.argv[2]) diff --git a/test/functional/feature_notifications.py b/test/functional/feature_notifications.py index 42600abc4..fec06324a 100755 --- a/test/functional/feature_notifications.py +++ b/test/functional/feature_notifications.py @@ -1,217 +1,219 @@ #!/usr/bin/env python3 # Copyright (c) 2014-2019 The Bitcoin Core developers # Copyright (c) 2018 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """Test the -alertnotify, -blocknotify and -walletnotify options.""" import os from test_framework.address import ADDRESS_ECREG_UNSPENDABLE, keyhash_to_p2pkh from test_framework.test_framework import BitcoinTestFramework from test_framework.util import assert_equal FORK_WARNING_MESSAGE = "Warning: Large-work fork detected, forking after block {}" # Linux allow all characters other than \x00 # Windows disallow control characters (0-31) and /\?%:|"<> FILE_CHAR_START = 32 if os.name == "nt" else 1 FILE_CHAR_END = 128 FILE_CHAR_BLACKLIST = '/\\?%*:|"<>' if os.name == "nt" else "/" def notify_outputname(walletname, txid): return txid if os.name == "nt" else f"{walletname}_{txid}" class NotificationsTest(BitcoinTestFramework): def set_test_params(self): self.num_nodes = 2 self.setup_clean_chain = True def setup_network(self): self.wallet = "".join( chr(i) for i in range(FILE_CHAR_START, FILE_CHAR_END) if chr(i) not in FILE_CHAR_BLACKLIST ) self.alertnotify_dir = os.path.join(self.options.tmpdir, "alertnotify") self.blocknotify_dir = os.path.join(self.options.tmpdir, "blocknotify") self.walletnotify_dir = os.path.join(self.options.tmpdir, "walletnotify") os.mkdir(self.alertnotify_dir) os.mkdir(self.blocknotify_dir) os.mkdir(self.walletnotify_dir) # -alertnotify and -blocknotify on node0, walletnotify on node1 self.extra_args = [ [ f"-alertnotify=echo > {os.path.join(self.alertnotify_dir, '%s')}", f"-blocknotify=echo > {os.path.join(self.blocknotify_dir, '%s')}", ], [ "-rescan", ( "-walletnotify=echo >" f" {os.path.join(self.walletnotify_dir, notify_outputname('%w', '%s'))}" ), ], ] self.wallet_names = [self.default_wallet_name, self.wallet] super().setup_network() def run_test(self): self.log.info("test -blocknotify") block_count = 10 blocks = self.generatetoaddress( self.nodes[1], block_count, - self.nodes[1].getnewaddress() - if self.is_wallet_compiled() - else ADDRESS_ECREG_UNSPENDABLE, + ( + self.nodes[1].getnewaddress() + if self.is_wallet_compiled() + else ADDRESS_ECREG_UNSPENDABLE + ), ) # wait at most 10 seconds for expected number of files before reading # the content self.wait_until( lambda: len(os.listdir(self.blocknotify_dir)) == block_count, timeout=10 ) # directory content should equal the generated blocks hashes assert_equal(sorted(blocks), sorted(os.listdir(self.blocknotify_dir))) if self.is_wallet_compiled(): self.log.info("test -walletnotify") # wait at most 10 seconds for expected number of files before # reading the content self.wait_until( lambda: len(os.listdir(self.walletnotify_dir)) == block_count, timeout=10, ) # directory content should equal the generated transaction hashes txids_rpc = [ notify_outputname(self.wallet, t["txid"]) for t in self.nodes[1].listtransactions("*", block_count) ] assert_equal(sorted(txids_rpc), sorted(os.listdir(self.walletnotify_dir))) self.stop_node(1) for tx_file in os.listdir(self.walletnotify_dir): os.remove(os.path.join(self.walletnotify_dir, tx_file)) self.log.info("test -walletnotify after rescan") # restart node to rescan to force wallet notifications self.start_node(1) self.connect_nodes(0, 1) self.wait_until( lambda: len(os.listdir(self.walletnotify_dir)) == block_count, timeout=10, ) # directory content should equal the generated transaction hashes txids_rpc = [ notify_outputname(self.wallet, t["txid"]) for t in self.nodes[1].listtransactions("*", block_count) ] assert_equal(sorted(txids_rpc), sorted(os.listdir(self.walletnotify_dir))) for tx_file in os.listdir(self.walletnotify_dir): os.remove(os.path.join(self.walletnotify_dir, tx_file)) # Conflicting transactions tests. Give node 0 same wallet seed as # node 1, generate spends from node 0, and check notifications # triggered by node 1 self.log.info("test -walletnotify with conflicting transactions") self.nodes[0].sethdseed( seed=self.nodes[1].dumpprivkey( keyhash_to_p2pkh( bytes.fromhex(self.nodes[1].getwalletinfo()["hdseedid"])[::-1] ) ) ) self.nodes[0].rescanblockchain() self.generatetoaddress(self.nodes[0], 100, ADDRESS_ECREG_UNSPENDABLE) # Generate transaction on node 0, sync mempools, and check for # notification on node 1. tx1 = self.nodes[0].sendtoaddress( address=ADDRESS_ECREG_UNSPENDABLE, amount=100 ) assert_equal(tx1 in self.nodes[0].getrawmempool(), True) self.sync_mempools() self.expect_wallet_notify([tx1]) # Add tx1 transaction to new block, checking for a notification # and the correct number of confirmations. self.generatetoaddress(self.nodes[0], 1, ADDRESS_ECREG_UNSPENDABLE) self.sync_blocks() self.expect_wallet_notify([tx1]) assert_equal(self.nodes[1].gettransaction(tx1)["confirmations"], 1) # Generate conflicting transactions with the nodes disconnected. # Sending almost the entire available balance on each node, but # with a slightly different amount, ensures that there will be # a conflict. balance = self.nodes[0].getbalance() self.disconnect_nodes(0, 1) tx2_node0 = self.nodes[0].sendtoaddress( address=ADDRESS_ECREG_UNSPENDABLE, amount=balance - 20 ) tx2_node1 = self.nodes[1].sendtoaddress( address=ADDRESS_ECREG_UNSPENDABLE, amount=balance - 21 ) assert tx2_node0 != tx2_node1 self.expect_wallet_notify([tx2_node1]) # So far tx2_node1 has no conflicting tx assert not self.nodes[1].gettransaction(tx2_node1)["walletconflicts"] # Mine a block on node0, reconnect the nodes, check that tx2_node1 # has a conflicting tx after syncing with node0. self.generatetoaddress( self.nodes[0], 1, ADDRESS_ECREG_UNSPENDABLE, sync_fun=self.no_op ) self.connect_nodes(0, 1) self.sync_blocks() assert ( tx2_node0 in self.nodes[1].gettransaction(tx2_node1)["walletconflicts"] ) # node1's wallet will notify of the new confirmed transaction tx2_0 # and about the conflicted transaction tx2_1. self.expect_wallet_notify([tx2_node0, tx2_node1]) # Create an invalid chain and ensure the node warns. self.log.info("test -alertnotify for forked chain") fork_block = self.nodes[0].getbestblockhash() self.generatetoaddress(self.nodes[0], 1, ADDRESS_ECREG_UNSPENDABLE) invalid_block = self.nodes[0].getbestblockhash() self.generatetoaddress(self.nodes[0], 7, ADDRESS_ECREG_UNSPENDABLE) # Invalidate a large branch, which should trigger an alert. self.nodes[0].invalidateblock(invalid_block) # Give bitcoind 10 seconds to write the alert notification self.wait_until(lambda: len(os.listdir(self.alertnotify_dir)), timeout=10) # The notification command is unable to properly handle the spaces on # windows. Skip the content check in this case. if os.name != "nt": assert FORK_WARNING_MESSAGE.format(fork_block) in os.listdir( self.alertnotify_dir ) for notify_file in os.listdir(self.alertnotify_dir): os.remove(os.path.join(self.alertnotify_dir, notify_file)) def expect_wallet_notify(self, tx_ids): self.wait_until( lambda: len(os.listdir(self.walletnotify_dir)) >= len(tx_ids), timeout=10 ) assert_equal( sorted(notify_outputname(self.wallet, tx_id) for tx_id in tx_ids), sorted(os.listdir(self.walletnotify_dir)), ) for tx_file in os.listdir(self.walletnotify_dir): os.remove(os.path.join(self.walletnotify_dir, tx_file)) if __name__ == "__main__": NotificationsTest().main() diff --git a/test/functional/test_framework/chronik/client.py b/test/functional/test_framework/chronik/client.py index 3bb01ff81..3ed1313c9 100644 --- a/test/functional/test_framework/chronik/client.py +++ b/test/functional/test_framework/chronik/client.py @@ -1,170 +1,179 @@ #!/usr/bin/env python3 # Copyright (c) 2023 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. import http.client from typing import Union import chronik_pb2 as pb import websocket # Timespan when HTTP requests to Chronik time out DEFAULT_TIMEOUT = 30 class UnexpectedContentType(Exception): pass class ChronikResponse: def __init__(self, status: int, *, ok_proto=None, error_proto=None) -> None: self.status = status self.ok_proto = ok_proto self.error_proto = error_proto def ok(self): if self.status != 200: raise AssertionError( - f'Expected OK response, but got status {self.status}, error: ' - f'{self.error_proto}') + f"Expected OK response, but got status {self.status}, error: " + f"{self.error_proto}" + ) return self.ok_proto def err(self, status: int): if self.status == 200: raise AssertionError( - f'Expected error response status {status}, but got OK: {self.ok_proto}') + f"Expected error response status {status}, but got OK: {self.ok_proto}" + ) if self.status != status: raise AssertionError( - f'Expected error response status {status}, but got different error ' - f'status {self.status}, error: {self.error_proto}') + f"Expected error response status {status}, but got different error " + f"status {self.status}, error: {self.error_proto}" + ) return self.error_proto class ChronikScriptClient: - def __init__(self, client: 'ChronikClient', script_type: str, - script_payload: str) -> None: + def __init__( + self, client: "ChronikClient", script_type: str, script_payload: str + ) -> None: self.client = client self.script_type = script_type self.script_payload = script_payload def confirmed_txs(self, page=None, page_size=None): query = _page_query_params(page, page_size) return self.client._request_get( - f'/script/{self.script_type}/{self.script_payload}/confirmed-txs{query}', - pb.TxHistoryPage) + f"/script/{self.script_type}/{self.script_payload}/confirmed-txs{query}", + pb.TxHistoryPage, + ) def history(self, page=None, page_size=None): query = _page_query_params(page, page_size) return self.client._request_get( - f'/script/{self.script_type}/{self.script_payload}/history{query}', - pb.TxHistoryPage) + f"/script/{self.script_type}/{self.script_payload}/history{query}", + pb.TxHistoryPage, + ) def unconfirmed_txs(self): return self.client._request_get( - f'/script/{self.script_type}/{self.script_payload}/unconfirmed-txs', - pb.TxHistoryPage) + f"/script/{self.script_type}/{self.script_payload}/unconfirmed-txs", + pb.TxHistoryPage, + ) def utxos(self): return self.client._request_get( - f'/script/{self.script_type}/{self.script_payload}/utxos', - pb.ScriptUtxos) + f"/script/{self.script_type}/{self.script_payload}/utxos", pb.ScriptUtxos + ) class ChronikWs: def __init__(self, ws) -> None: self.ws = ws def recv(self): data = self.ws.recv() ws_msg = pb.WsMsg() ws_msg.ParseFromString(data) return ws_msg def send_bytes(self, data: bytes) -> None: self.ws.send(data, websocket.ABNF.OPCODE_BINARY) def sub_to_blocks(self, *, is_unsub=False) -> None: sub = pb.WsSub(is_unsub=is_unsub, blocks=pb.WsSubBlocks()) self.send_bytes(sub.SerializeToString()) def sub_script(self, script_type: str, payload: bytes, *, is_unsub=False) -> None: sub = pb.WsSub( is_unsub=is_unsub, script=pb.WsSubScript(script_type=script_type, payload=payload), ) self.send_bytes(sub.SerializeToString()) class ChronikClient: - CONTENT_TYPE = 'application/x-protobuf' + CONTENT_TYPE = "application/x-protobuf" def __init__(self, host: str, port: int, timeout=DEFAULT_TIMEOUT) -> None: self.host = host self.port = port self.timeout = timeout def _request_get(self, path: str, pb_type): kwargs = {} if self.timeout is not None: - kwargs['timeout'] = self.timeout + kwargs["timeout"] = self.timeout client = http.client.HTTPConnection(self.host, self.port, **kwargs) - client.request('GET', path) + client.request("GET", path) response = client.getresponse() - content_type = response.getheader('Content-Type') + content_type = response.getheader("Content-Type") body = response.read() if content_type != self.CONTENT_TYPE: raise UnexpectedContentType( f'Unexpected Content-Type "{content_type}" (expected ' f'"{self.CONTENT_TYPE}"), body: {repr(body)}' ) if response.status != 200: proto_error = pb.Error() proto_error.ParseFromString(body) return ChronikResponse(response.status, error_proto=proto_error) ok_proto = pb_type() ok_proto.ParseFromString(body) return ChronikResponse(response.status, ok_proto=ok_proto) def blockchain_info(self) -> ChronikResponse: - return self._request_get('/blockchain-info', pb.BlockchainInfo) + return self._request_get("/blockchain-info", pb.BlockchainInfo) def block(self, hash_or_height: Union[str, int]) -> ChronikResponse: - return self._request_get(f'/block/{hash_or_height}', pb.Block) + return self._request_get(f"/block/{hash_or_height}", pb.Block) - def block_txs(self, hash_or_height: Union[str, int], - page=None, page_size=None) -> ChronikResponse: + def block_txs( + self, hash_or_height: Union[str, int], page=None, page_size=None + ) -> ChronikResponse: query = _page_query_params(page, page_size) return self._request_get( - f'/block-txs/{hash_or_height}{query}', pb.TxHistoryPage) + f"/block-txs/{hash_or_height}{query}", pb.TxHistoryPage + ) def blocks(self, start_height: int, end_height: int) -> ChronikResponse: - return self._request_get(f'/blocks/{start_height}/{end_height}', pb.Blocks) + return self._request_get(f"/blocks/{start_height}/{end_height}", pb.Blocks) def tx(self, txid: str) -> ChronikResponse: - return self._request_get(f'/tx/{txid}', pb.Tx) + return self._request_get(f"/tx/{txid}", pb.Tx) def raw_tx(self, txid: str) -> bytes: - return self._request_get(f'/raw-tx/{txid}', pb.RawTx) + return self._request_get(f"/raw-tx/{txid}", pb.RawTx) def script(self, script_type: str, script_payload: str) -> ChronikScriptClient: return ChronikScriptClient(self, script_type, script_payload) def ws(self, *, timeout=None) -> ChronikWs: ws = websocket.WebSocket() - ws.connect(f'ws://{self.host}:{self.port}/ws', timeout=timeout) + ws.connect(f"ws://{self.host}:{self.port}/ws", timeout=timeout) return ChronikWs(ws) def _page_query_params(page=None, page_size=None) -> str: if page is not None and page_size is not None: - return f'?page={page}&page_size={page_size}' + return f"?page={page}&page_size={page_size}" elif page is not None: - return f'?page={page}' + return f"?page={page}" elif page_size is not None: - return f'?page_size={page_size}' + return f"?page_size={page_size}" else: - return '' + return "" diff --git a/test/functional/test_framework/chronik/test_data.py b/test/functional/test_framework/chronik/test_data.py index ad6a1e592..4d1ae1f72 100644 --- a/test/functional/test_framework/chronik/test_data.py +++ b/test/functional/test_framework/chronik/test_data.py @@ -1,38 +1,42 @@ #!/usr/bin/env python3 # Copyright (c) 2023 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. from test_framework.blocktools import ( GENESIS_BLOCK_HASH, GENESIS_CB_SCRIPT_PUBKEY, GENESIS_CB_SCRIPT_SIG, GENESIS_CB_TXID, TIME_GENESIS_BLOCK, ) from test_framework.chronik.client import pb def genesis_cb_tx(): return pb.Tx( txid=bytes.fromhex(GENESIS_CB_TXID)[::-1], version=1, - inputs=[pb.TxInput( - prev_out=pb.OutPoint(txid=bytes(32), out_idx=0xffffffff), - input_script=bytes(GENESIS_CB_SCRIPT_SIG), - sequence_no=0xffffffff, - )], - outputs=[pb.TxOutput( - value=5000000000, - output_script=bytes(GENESIS_CB_SCRIPT_PUBKEY), - )], + inputs=[ + pb.TxInput( + prev_out=pb.OutPoint(txid=bytes(32), out_idx=0xFFFFFFFF), + input_script=bytes(GENESIS_CB_SCRIPT_SIG), + sequence_no=0xFFFFFFFF, + ) + ], + outputs=[ + pb.TxOutput( + value=5000000000, + output_script=bytes(GENESIS_CB_SCRIPT_PUBKEY), + ) + ], lock_time=0, block=pb.BlockMetadata( hash=bytes.fromhex(GENESIS_BLOCK_HASH)[::-1], height=0, timestamp=TIME_GENESIS_BLOCK, ), time_first_seen=0, size=204, is_coinbase=True, ) diff --git a/test/functional/tool_wallet.py b/test/functional/tool_wallet.py index 9c1662cfa..bffa55caf 100755 --- a/test/functional/tool_wallet.py +++ b/test/functional/tool_wallet.py @@ -1,291 +1,285 @@ #!/usr/bin/env python3 # Copyright (c) 2018-2019 The Bitcoin Core developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """Test bitcoin-wallet.""" import hashlib import os import stat import subprocess import textwrap from test_framework.test_framework import BitcoinTestFramework from test_framework.util import assert_equal BUFFER_SIZE = 16 * 1024 class ToolWalletTest(BitcoinTestFramework): def set_test_params(self): self.num_nodes = 1 self.setup_clean_chain = True self.rpc_timeout = 120 def skip_test_if_missing_module(self): self.skip_if_no_wallet() self.skip_if_no_wallet_tool() def bitcoin_wallet_process(self, *args): binary = ( self.config["environment"]["BUILDDIR"] + "/src/bitcoin-wallet" + self.config["environment"]["EXEEXT"] ) args = [f"-datadir={self.nodes[0].datadir}", f"-chain={self.chain}"] + list( args ) command_line = [binary] + args if self.config["environment"]["EMULATOR"]: command_line = [self.config["environment"]["EMULATOR"]] + command_line return subprocess.Popen( command_line, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, ) def assert_raises_tool_error(self, error, *args): p = self.bitcoin_wallet_process(*args) stdout, stderr = p.communicate() assert_equal(p.poll(), 1) assert_equal(stdout, "") assert_equal(stderr.strip(), error) def assert_tool_output(self, output, *args): p = self.bitcoin_wallet_process(*args) stdout, stderr = p.communicate() assert_equal(stderr, "") assert_equal(stdout, output) assert_equal(p.poll(), 0) def wallet_shasum(self): h = hashlib.sha1() mv = memoryview(bytearray(BUFFER_SIZE)) with open(self.wallet_path, "rb", buffering=0) as f: for n in iter(lambda: f.readinto(mv), 0): h.update(mv[:n]) return h.hexdigest() def wallet_timestamp(self): return os.path.getmtime(self.wallet_path) def wallet_permissions(self): return oct(os.lstat(self.wallet_path).st_mode)[-3:] def log_wallet_timestamp_comparison(self, old, new): result = "unchanged" if new == old else "increased!" self.log.debug(f"Wallet file timestamp {result}") def test_invalid_tool_commands_and_args(self): self.log.info( "Testing that various invalid commands raise with specific error messages" ) self.assert_raises_tool_error("Invalid command: foo", "foo") # `bitcoin-wallet help` raises an error. Use `bitcoin-wallet -help`. self.assert_raises_tool_error("Invalid command: help", "help") self.assert_raises_tool_error( ( "Error: two methods provided (info and create). Only one method should" " be provided." ), "info", "create", ) self.assert_raises_tool_error( "Error parsing command line arguments: Invalid parameter -foo", "-foo" ) locked_dir = os.path.join(self.options.tmpdir, "node0", "regtest", "wallets") self.assert_raises_tool_error( f'Error initializing wallet database environment "{locked_dir}"!', f"-wallet={self.default_wallet_name}", "info", ) path = os.path.join( self.options.tmpdir, "node0", "regtest", "wallets", "nonexistent.dat" ) self.assert_raises_tool_error( f"Failed to load database path '{path}'. Path does not exist.", "-wallet=nonexistent.dat", "info", ) def test_tool_wallet_info(self): # Stop the node to close the wallet to call the info command. self.stop_node(0) self.log.info("Calling wallet tool info, testing output") # # TODO: Wallet tool info should work with wallet file permissions set to # read-only without raising: # "Error loading wallet.dat. Is wallet being used by another process?" # The following lines should be uncommented and the tests still succeed: # # self.log.debug('Setting wallet file permissions to 400 (read-only)') # os.chmod(self.wallet_path, stat.S_IRUSR) # assert self.wallet_permissions() in ['400', '666'] # Sanity check. 666 because Appveyor. # shasum_before = self.wallet_shasum() timestamp_before = self.wallet_timestamp() self.log.debug(f"Wallet file timestamp before calling info: {timestamp_before}") - out = textwrap.dedent( - """\ + out = textwrap.dedent("""\ Wallet info =========== Encrypted: no HD (hd seed available): yes Keypool Size: 2 Transactions: 0 Address Book: 1 - """ - ) + """) self.assert_tool_output(out, f"-wallet={self.default_wallet_name}", "info") timestamp_after = self.wallet_timestamp() self.log.debug(f"Wallet file timestamp after calling info: {timestamp_after}") self.log_wallet_timestamp_comparison(timestamp_before, timestamp_after) self.log.debug("Setting wallet file permissions back to 600 (read/write)") os.chmod(self.wallet_path, stat.S_IRUSR | stat.S_IWUSR) # Sanity check. 666 because Appveyor. assert self.wallet_permissions() in ["600", "666"] # # TODO: Wallet tool info should not write to the wallet file. # The following lines should be uncommented and the tests still succeed: # # assert_equal(timestamp_before, timestamp_after) # shasum_after = self.wallet_shasum() # assert_equal(shasum_before, shasum_after) # self.log.debug('Wallet file shasum unchanged\n') def test_tool_wallet_info_after_transaction(self): """ Mutate the wallet with a transaction to verify that the info command output changes accordingly. """ self.start_node(0) self.log.info("Generating transaction to mutate wallet") self.generate(self.nodes[0], 1) self.stop_node(0) self.log.info( "Calling wallet tool info after generating a transaction, testing output" ) shasum_before = self.wallet_shasum() timestamp_before = self.wallet_timestamp() self.log.debug(f"Wallet file timestamp before calling info: {timestamp_before}") - out = textwrap.dedent( - """\ + out = textwrap.dedent("""\ Wallet info =========== Encrypted: no HD (hd seed available): yes Keypool Size: 2 Transactions: 1 Address Book: 1 - """ - ) + """) self.assert_tool_output(out, f"-wallet={self.default_wallet_name}", "info") shasum_after = self.wallet_shasum() timestamp_after = self.wallet_timestamp() self.log.debug(f"Wallet file timestamp after calling info: {timestamp_after}") self.log_wallet_timestamp_comparison(timestamp_before, timestamp_after) # # TODO: Wallet tool info should not write to the wallet file. # This assertion should be uncommented and succeed: # assert_equal(timestamp_before, timestamp_after) assert_equal(shasum_before, shasum_after) self.log.debug("Wallet file shasum unchanged\n") def test_tool_wallet_create_on_existing_wallet(self): self.log.info( "Calling wallet tool create on an existing wallet, testing output" ) shasum_before = self.wallet_shasum() timestamp_before = self.wallet_timestamp() self.log.debug( f"Wallet file timestamp before calling create: {timestamp_before}" ) - out = textwrap.dedent( - """\ + out = textwrap.dedent("""\ Topping up keypool... Wallet info =========== Encrypted: no HD (hd seed available): yes Keypool Size: 2000 Transactions: 0 Address Book: 0 - """ - ) + """) self.assert_tool_output(out, "-wallet=foo", "create") shasum_after = self.wallet_shasum() timestamp_after = self.wallet_timestamp() self.log.debug(f"Wallet file timestamp after calling create: {timestamp_after}") self.log_wallet_timestamp_comparison(timestamp_before, timestamp_after) assert_equal(timestamp_before, timestamp_after) assert_equal(shasum_before, shasum_after) self.log.debug("Wallet file shasum unchanged\n") def test_getwalletinfo_on_different_wallet(self): self.log.info("Starting node with arg -wallet=foo") self.start_node(0, ["-nowallet", "-wallet=foo"]) self.log.info( 'Calling getwalletinfo on a different wallet ("foo"), testing output' ) shasum_before = self.wallet_shasum() timestamp_before = self.wallet_timestamp() self.log.debug( f"Wallet file timestamp before calling getwalletinfo: {timestamp_before}" ) out = self.nodes[0].getwalletinfo() self.stop_node(0) shasum_after = self.wallet_shasum() timestamp_after = self.wallet_timestamp() self.log.debug( f"Wallet file timestamp after calling getwalletinfo: {timestamp_after}" ) assert_equal(0, out["txcount"]) assert_equal(1000, out["keypoolsize"]) assert_equal(1000, out["keypoolsize_hd_internal"]) assert_equal(True, "hdseedid" in out) self.log_wallet_timestamp_comparison(timestamp_before, timestamp_after) assert_equal(timestamp_before, timestamp_after) assert_equal(shasum_after, shasum_before) self.log.debug("Wallet file shasum unchanged\n") def test_salvage(self): # TODO: Check salvage actually salvages and doesn't break things. # https://github.com/bitcoin/bitcoin/issues/7463 self.log.info("Check salvage") self.start_node(0, ["-wallet=salvage"]) self.stop_node(0) self.assert_tool_output("", "-wallet=salvage", "salvage") def run_test(self): self.wallet_path = os.path.join( self.nodes[0].datadir, self.chain, "wallets", self.default_wallet_name, self.wallet_data_filename, ) self.test_invalid_tool_commands_and_args() # Warning: The following tests are order-dependent. self.test_tool_wallet_info() self.test_tool_wallet_info_after_transaction() if not self.options.descriptors: # TODO: Wallet tool needs more create options at which point these # can be enabled. self.test_tool_wallet_create_on_existing_wallet() self.test_getwalletinfo_on_different_wallet() # Salvage is a legacy wallet only thing self.test_salvage() if __name__ == "__main__": ToolWalletTest().main() diff --git a/test/fuzz/test_runner.py b/test/fuzz/test_runner.py index 9b64687d3..79fa88154 100755 --- a/test/fuzz/test_runner.py +++ b/test/fuzz/test_runner.py @@ -1,298 +1,308 @@ #!/usr/bin/env python3 # Copyright (c) 2019 The Bitcoin Core developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """Run fuzz test targets. """ import argparse import configparser import logging import os import subprocess import sys from concurrent.futures import ThreadPoolExecutor, as_completed def main(): parser = argparse.ArgumentParser( formatter_class=argparse.ArgumentDefaultsHelpFormatter, - description='''Run the fuzz targets with all inputs from the seed_dir once.''', + description="""Run the fuzz targets with all inputs from the seed_dir once.""", ) parser.add_argument( "-l", "--loglevel", dest="loglevel", default="INFO", - help="log events at this level and higher to the console. Can be set to DEBUG, INFO, WARNING, ERROR or CRITICAL. Passing --loglevel DEBUG will output all logs to console.", + help=( + "log events at this level and higher to the console. Can be set to DEBUG," + " INFO, WARNING, ERROR or CRITICAL. Passing --loglevel DEBUG will output" + " all logs to console." + ), ) parser.add_argument( - '--valgrind', - action='store_true', - help='If true, run fuzzing binaries under the valgrind memory error detector', + "--valgrind", + action="store_true", + help="If true, run fuzzing binaries under the valgrind memory error detector", ) parser.add_argument( - '-x', - '--exclude', + "-x", + "--exclude", help="A comma-separated list of targets to exclude", ) parser.add_argument( - '--par', - '-j', + "--par", + "-j", type=int, default=4, - help='How many targets to merge or execute in parallel.', + help="How many targets to merge or execute in parallel.", ) parser.add_argument( - 'seed_dir', - help='The seed corpus to run on (must contain subfolders for each fuzz target).', + "seed_dir", + help=( + "The seed corpus to run on (must contain subfolders for each fuzz target)." + ), ) parser.add_argument( - 'target', - nargs='*', - help='The target(s) to run. Default is to run all targets.', + "target", + nargs="*", + help="The target(s) to run. Default is to run all targets.", ) parser.add_argument( - '--m_dir', - help='Merge inputs from this directory into the seed_dir. Needs /target subdirectory.', + "--m_dir", + help=( + "Merge inputs from this directory into the seed_dir. Needs /target" + " subdirectory." + ), ) parser.add_argument( - '-g', - '--generate', - action='store_true', - help='Create new corpus seeds (or extend the existing ones) by running' - ' the given targets for a finite number of times. Outputs them to' - ' the passed seed_dir.' + "-g", + "--generate", + action="store_true", + help=( + "Create new corpus seeds (or extend the existing ones) by running" + " the given targets for a finite number of times. Outputs them to" + " the passed seed_dir." + ), ) args = parser.parse_args() # Set up logging logging.basicConfig( - format='%(message)s', - level=int(args.loglevel) if args.loglevel.isdigit( - ) else args.loglevel.upper(), + format="%(message)s", + level=int(args.loglevel) if args.loglevel.isdigit() else args.loglevel.upper(), ) # Read config generated by configure. config = configparser.ConfigParser() configfile = f"{os.path.abspath(os.path.dirname(__file__))}/../config.ini" config.read_file(open(configfile, encoding="utf8")) if not config["components"].getboolean("ENABLE_FUZZ"): logging.error("Must have fuzz targets built") sys.exit(1) - test_dir = os.path.join( - config["environment"]["BUILDDIR"], 'src', 'test', 'fuzz') + test_dir = os.path.join(config["environment"]["BUILDDIR"], "src", "test", "fuzz") # Build list of tests test_list_all = [ - f for f in os.listdir(test_dir) - if os.path.isfile(os.path.join(test_dir, f)) and - os.access(os.path.join(test_dir, f), os.X_OK)] + f + for f in os.listdir(test_dir) + if os.path.isfile(os.path.join(test_dir, f)) + and os.access(os.path.join(test_dir, f), os.X_OK) + ] if not test_list_all: logging.error("No fuzz targets found") sys.exit(1) logging.debug( - f"{len(test_list_all)} fuzz target(s) found: {' '.join(sorted(test_list_all))}") + f"{len(test_list_all)} fuzz target(s) found: {' '.join(sorted(test_list_all))}" + ) # By default run all args.target = args.target or test_list_all test_list_error = list(set(args.target).difference(set(test_list_all))) if test_list_error: - logging.error( - f"Unknown fuzz targets selected: {test_list_error}") - test_list_selection = list( - set(test_list_all).intersection(set(args.target))) + logging.error(f"Unknown fuzz targets selected: {test_list_error}") + test_list_selection = list(set(test_list_all).intersection(set(args.target))) if not test_list_selection: logging.error("No fuzz targets selected") if args.exclude: for excluded_target in args.exclude.split(","): if excluded_target not in test_list_selection: logging.error( - f"Target \"{excluded_target}\" not found in current target list.") + f'Target "{excluded_target}" not found in current target list.' + ) continue test_list_selection.remove(excluded_target) test_list_selection.sort() logging.info( "{} of {} detected fuzz target(s) selected: {}".format( - len(test_list_selection), - len(test_list_all), - " ".join(test_list_selection))) + len(test_list_selection), len(test_list_all), " ".join(test_list_selection) + ) + ) if not args.generate: test_list_seedless = [] for t in test_list_selection: corpus_path = os.path.join(args.seed_dir, t) - if not os.path.exists(corpus_path) or len( - os.listdir(corpus_path)) == 0: + if not os.path.exists(corpus_path) or len(os.listdir(corpus_path)) == 0: test_list_seedless.append(t) test_list_seedless.sort() if test_list_seedless: logging.info( "Fuzzing harnesses lacking a seed corpus: {}".format( " ".join(test_list_seedless) ) ) logging.info( - "Please consider adding a fuzz seed corpus at https://github.com/Bitcoin-ABC/qa-assets") + "Please consider adding a fuzz seed corpus at" + " https://github.com/Bitcoin-ABC/qa-assets" + ) try: help_output = subprocess.run( args=[ os.path.join(test_dir, test_list_selection[0]), - '-help=1', + "-help=1", ], timeout=20, check=True, stderr=subprocess.PIPE, universal_newlines=True, ).stderr if "libFuzzer" not in help_output: logging.error("Must be built with libFuzzer") sys.exit(1) except subprocess.TimeoutExpired: - logging.error( - "subprocess timed out: Currently only libFuzzer is supported") + logging.error("subprocess timed out: Currently only libFuzzer is supported") sys.exit(1) with ThreadPoolExecutor(max_workers=args.par) as fuzz_pool: if args.generate: return generate_corpus_seeds( fuzz_pool=fuzz_pool, test_dir=test_dir, seed_dir=args.seed_dir, targets=test_list_selection, ) if args.m_dir: merge_inputs( fuzz_pool=fuzz_pool, corpus=args.seed_dir, test_list=test_list_selection, test_dir=test_dir, merge_dir=args.m_dir, ) return run_once( fuzz_pool=fuzz_pool, corpus=args.seed_dir, test_list=test_list_selection, test_dir=test_dir, use_valgrind=args.valgrind, ) def generate_corpus_seeds(*, fuzz_pool, test_dir, seed_dir, targets): """Generates new corpus seeds. Run {targets} without input, and outputs the generated corpus seeds to {seed_dir}. """ logging.info(f"Generating corpus seeds to {seed_dir}") def job(command): logging.debug(f"Running '{' '.join(command)}'\n") - logging.debug("Command '{}' output:\n'{}'\n".format( - ' '.join(command), - subprocess.run(command, check=True, stderr=subprocess.PIPE, - universal_newlines=True).stderr - )) + logging.debug( + "Command '{}' output:\n'{}'\n".format( + " ".join(command), + subprocess.run( + command, check=True, stderr=subprocess.PIPE, universal_newlines=True + ).stderr, + ) + ) futures = [] for target in targets: target_seed_dir = os.path.join(seed_dir, target) os.makedirs(target_seed_dir, exist_ok=True) command = [ os.path.join(test_dir, target), "-runs=100000", target_seed_dir, ] futures.append(fuzz_pool.submit(job, command)) for future in as_completed(futures): future.result() def merge_inputs(*, fuzz_pool, corpus, test_list, test_dir, merge_dir): logging.info( - f"Merge the inputs in the passed dir into the seed_dir. Passed dir {merge_dir}") + f"Merge the inputs in the passed dir into the seed_dir. Passed dir {merge_dir}" + ) jobs = [] for t in test_list: args = [ os.path.join(test_dir, t), - '-merge=1', + "-merge=1", # Also done by oss-fuzz # https://github.com/google/oss-fuzz/issues/1406#issuecomment-387790487 - '-use_value_profile=1', + "-use_value_profile=1", os.path.join(corpus, t), os.path.join(merge_dir, t), ] os.makedirs(os.path.join(corpus, t), exist_ok=True) os.makedirs(os.path.join(merge_dir, t), exist_ok=True) def job(t, args): output = f"Run {t} with args {' '.join(args)}\n" - output += subprocess.run(args, - check=True, - stderr=subprocess.PIPE, - universal_newlines=True).stderr + output += subprocess.run( + args, check=True, stderr=subprocess.PIPE, universal_newlines=True + ).stderr logging.debug(output) jobs.append(fuzz_pool.submit(job, t, args)) for future in as_completed(jobs): future.result() def run_once(*, fuzz_pool, corpus, test_list, test_dir, use_valgrind): jobs = [] for t in test_list: corpus_path = os.path.join(corpus, t) os.makedirs(corpus_path, exist_ok=True) args = [ os.path.join(test_dir, t), - '-runs=1', + "-runs=1", corpus_path, ] if use_valgrind: - args = [ - 'valgrind', - '--quiet', - '--error-exitcode=1'] + args + args = ["valgrind", "--quiet", "--error-exitcode=1"] + args def job(t, args): - output = f'Run {t} with args {args}' + output = f"Run {t} with args {args}" result = subprocess.run( - args, - stderr=subprocess.PIPE, - universal_newlines=True) + args, stderr=subprocess.PIPE, universal_newlines=True + ) output += result.stderr return output, result jobs.append(fuzz_pool.submit(job, t, args)) for future in as_completed(jobs): output, result = future.result() logging.debug(output) try: result.check_returncode() except subprocess.CalledProcessError as e: if e.stdout: logging.info(e.stdout) if e.stderr: logging.info(e.stderr) logging.info( - "Target \"{}\" failed with exit code {}".format( - " ".join( - result.args), - e.returncode)) + 'Target "{}" failed with exit code {}'.format( + " ".join(result.args), e.returncode + ) + ) sys.exit(1) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/test/lint/check-doc.py b/test/lint/check-doc.py index e8ab2bb3a..0f3171720 100755 --- a/test/lint/check-doc.py +++ b/test/lint/check-doc.py @@ -1,103 +1,103 @@ #!/usr/bin/env python3 # Copyright (c) 2015-2019 The Bitcoin Core developers # Copyright (c) 2019 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. -''' +""" This checks if all command line args are documented. Return value is 0 to indicate no error. Author: @MarcoFalke -''' +""" import glob import re from pprint import PrettyPrinter from subprocess import check_output -TOP_LEVEL = 'git rev-parse --show-toplevel' -FOLDERS_SRC = ['/src/**/', '/chronik/**/'] -FOLDERS_TEST = ['/src/**/test/', '/chronik/test/**/'] +TOP_LEVEL = "git rev-parse --show-toplevel" +FOLDERS_SRC = ["/src/**/", "/chronik/**/"] +FOLDERS_TEST = ["/src/**/test/", "/chronik/test/**/"] EXTENSIONS = ["*.c", "*.h", "*.cpp", "*.cc", "*.hpp"] REGEX_ARG = r'(?:ForceSet|SoftSet|Get|Is)(?:Bool|Int)?Args?(?:Set)?\(\s*"(-[^"]+)"' REGEX_DOC = r'AddArg\(\s*"(-[^"=]+?)(?:=|")' # list false positive unknows arguments SET_FALSE_POSITIVE_UNKNOWNS = { - '-includeconf', - '-regtest', - '-testnet', - '-zmqpubhashblock', - '-zmqpubhashtx', - '-zmqpubrawblock', - '-zmqpubrawtx', - '-zmqpubhashblockhwm', - '-zmqpubhashtxhwm', - '-zmqpubrawblockhwm', - '-zmqpubrawtxhwm', - '-zmqpubsequence', - '-zmqpubsequencehwm', + "-includeconf", + "-regtest", + "-testnet", + "-zmqpubhashblock", + "-zmqpubhashtx", + "-zmqpubrawblock", + "-zmqpubrawtx", + "-zmqpubhashblockhwm", + "-zmqpubhashtxhwm", + "-zmqpubrawblockhwm", + "-zmqpubrawtxhwm", + "-zmqpubsequence", + "-zmqpubsequencehwm", } # list false positive undocumented arguments SET_FALSE_POSITIVE_UNDOCUMENTED = { - '-help', - '-h', - '-avalanchepreconsensus', - '-dbcrashratio', - '-enableminerfund', - '-forcecompactdb', - '-maxaddrtosend', - '-parkdeepreorg', - '-automaticunparking', + "-help", + "-h", + "-avalanchepreconsensus", + "-dbcrashratio", + "-enableminerfund", + "-forcecompactdb", + "-maxaddrtosend", + "-parkdeepreorg", + "-automaticunparking", # Removed arguments that now just print a helpful error message - '-zapwallettxes', - '-replayprotectionactivationtime', + "-zapwallettxes", + "-replayprotectionactivationtime", # Remove after May 2023 upgrade - '-wellingtonactivationtime', + "-wellingtonactivationtime", } def main(): - top_level = check_output(TOP_LEVEL, shell=True, - universal_newlines=True, encoding='utf8').strip() + top_level = check_output( + TOP_LEVEL, shell=True, universal_newlines=True, encoding="utf8" + ).strip() source_files = [] test_files = [] for extension in EXTENSIONS: for folder_src in FOLDERS_SRC: - source_files += glob.glob(top_level + - folder_src + extension, recursive=True) + source_files += glob.glob( + top_level + folder_src + extension, recursive=True + ) for folder_test in FOLDERS_TEST: - test_files += glob.glob(top_level + - folder_test + - extension, recursive=True) + test_files += glob.glob(top_level + folder_test + extension, recursive=True) files = set(source_files) - set(test_files) args_used = set() args_docd = set() for file in files: - with open(file, 'r', encoding='utf-8') as f: + with open(file, "r", encoding="utf-8") as f: content = f.read() args_used |= set(re.findall(re.compile(REGEX_ARG), content)) args_docd |= set(re.findall(re.compile(REGEX_DOC), content)) args_used |= SET_FALSE_POSITIVE_UNKNOWNS args_docd |= SET_FALSE_POSITIVE_UNDOCUMENTED args_need_doc = args_used - args_docd args_unknown = args_docd - args_used pp = PrettyPrinter() print(f"Args used : {len(args_used)}") print(f"Args documented : {len(args_docd)}") print(f"Args undocumented: {len(args_need_doc)}") pp.pprint(args_need_doc) print(f"Args unknown : {len(args_unknown)}") pp.pprint(args_unknown) if __name__ == "__main__": main() diff --git a/test/lint/lint-format-strings.py b/test/lint/lint-format-strings.py index bb490324a..cc4a6ec14 100755 --- a/test/lint/lint-format-strings.py +++ b/test/lint/lint-format-strings.py @@ -1,346 +1,400 @@ #!/usr/bin/env python3 # # Copyright (c) 2018-2019 The Bitcoin Core developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. # # Lint format strings: This program checks that the number of arguments passed # to a variadic format string function matches the number of format specifiers # in the format string. import argparse import doctest import re import sys FALSE_POSITIVES = [ ("src/dbwrapper.cpp", "vsnprintf(p, limit - p, format, backup_ap)"), ("src/index/base.cpp", "FatalError(const char *fmt, const Args &...args)"), - ("src/netbase.cpp", "LogConnectFailure(bool manual_connection, const char *fmt, const Args &...args)"), - ("src/util/system.cpp", - "strprintf(_(COPYRIGHT_HOLDERS).translated, COPYRIGHT_HOLDERS_SUBSTITUTION)"), - ("src/validationinterface.cpp", - "LogPrint(BCLog::VALIDATION, fmt \"\\n\", __VA_ARGS__)"), + ( + "src/netbase.cpp", + ( + "LogConnectFailure(bool manual_connection, const char *fmt, const Args" + " &...args)" + ), + ), + ( + "src/util/system.cpp", + "strprintf(_(COPYRIGHT_HOLDERS).translated, COPYRIGHT_HOLDERS_SUBSTITUTION)", + ), + ( + "src/validationinterface.cpp", + 'LogPrint(BCLog::VALIDATION, fmt "\\n", __VA_ARGS__)', + ), ("src/tinyformat.h", "printf(const char *fmt, const Args &...args)"), ("src/tinyformat.h", "printf(const char *fmt, TINYFORMAT_VARARGS(n))"), - ("src/wallet/wallet.h", - "LogPrintf((\"%s \" + fmt).c_str(), GetDisplayName(), parameters...)"), - ("src/wallet/scriptpubkeyman.h", - "WalletLogPrintf(std::string fmt, Params... parameters)"), - ("src/wallet/scriptpubkeyman.h", - "LogPrintf((\"%s \" + fmt).c_str(), m_storage.GetDisplayName(), parameters...)"), - ("src/wallet/scriptpubkeyman.h", - "WalletLogPrintf(const std::string& fmt, const Params&... parameters)"), + ( + "src/wallet/wallet.h", + 'LogPrintf(("%s " + fmt).c_str(), GetDisplayName(), parameters...)', + ), + ( + "src/wallet/scriptpubkeyman.h", + "WalletLogPrintf(std::string fmt, Params... parameters)", + ), + ( + "src/wallet/scriptpubkeyman.h", + 'LogPrintf(("%s " + fmt).c_str(), m_storage.GetDisplayName(), parameters...)', + ), + ( + "src/wallet/scriptpubkeyman.h", + "WalletLogPrintf(const std::string& fmt, const Params&... parameters)", + ), ] FUNCTION_NAMES_AND_NUMBER_OF_LEADING_ARGUMENTS = [ ("FatalError", 0), ("fprintf", 1), ("LogConnectFailure", 1), ("LogPrint", 1), ("LogPrintf", 0), ("printf", 0), ("snprintf", 2), ("sprintf", 1), ("strprintf", 0), ("tfm::format", 1), # Assuming tfm::::format(std::ostream&, ... ("vfprintf", 1), ("vprintf", 1), ("vsnprintf", 1), ("vsprintf", 1), ] def parse_function_calls(function_name, source_code): """Return an array with all calls to function function_name in string source_code. Preprocessor directives and C++ style comments ("//") in source_code are removed. >>> len(parse_function_calls("foo", "foo();bar();foo();bar();")) 2 >>> parse_function_calls("foo", "foo(1);bar(1);foo(2);bar(2);")[0].startswith("foo(1);") True >>> parse_function_calls("foo", "foo(1);bar(1);foo(2);bar(2);")[1].startswith("foo(2);") True >>> len(parse_function_calls("foo", "foo();bar();// foo();bar();")) 1 >>> len(parse_function_calls("foo", "#define FOO foo();")) 0 """ - assert isinstance(function_name, str) and isinstance( - source_code, str) and function_name - lines = [re.sub("// .*", " ", line).strip() - for line in source_code.split("\n") - if not line.strip().startswith("#")] + assert ( + isinstance(function_name, str) + and isinstance(source_code, str) + and function_name + ) + lines = [ + re.sub("// .*", " ", line).strip() + for line in source_code.split("\n") + if not line.strip().startswith("#") + ] return re.findall( - r"[^a-zA-Z_](?=({}\(.*).*)".format(function_name), f" {' '.join(lines)}") + r"[^a-zA-Z_](?=({}\(.*).*)".format(function_name), f" {' '.join(lines)}" + ) def normalize(s): """Return a normalized version of string s with newlines, tabs and C style comments ("/* ... */") replaced with spaces. Multiple spaces are replaced with a single space. >>> normalize(" /* nothing */ foo\tfoo /* bar */ foo ") 'foo foo foo' """ assert isinstance(s, str) s = s.replace("\n", " ") s = s.replace("\t", " ") s = re.sub(r"/\*.*?\*/", " ", s) s = re.sub(" {2,}", " ", s) return s.strip() ESCAPE_MAP = { r"\n": "[escaped-newline]", r"\t": "[escaped-tab]", - r'\"': "[escaped-quote]", + r"\"": "[escaped-quote]", } def escape(s): """Return the escaped version of string s with "\\\"", "\\n" and "\\t" escaped as "[escaped-backslash]", "[escaped-newline]" and "[escaped-tab]". >>> unescape(escape("foo")) == "foo" True >>> escape(r'foo \\t foo \\n foo \\\\ foo \\ foo \\"bar\\"') 'foo [escaped-tab] foo [escaped-newline] foo \\\\\\\\ foo \\\\ foo [escaped-quote]bar[escaped-quote]' """ assert isinstance(s, str) for raw_value, escaped_value in ESCAPE_MAP.items(): s = s.replace(raw_value, escaped_value) return s def unescape(s): """Return the unescaped version of escaped string s. Reverses the replacements made in function escape(s). >>> unescape(escape("bar")) 'bar' >>> unescape("foo [escaped-tab] foo [escaped-newline] foo \\\\\\\\ foo \\\\ foo [escaped-quote]bar[escaped-quote]") 'foo \\\\t foo \\\\n foo \\\\\\\\ foo \\\\ foo \\\\"bar\\\\"' """ assert isinstance(s, str) for raw_value, escaped_value in ESCAPE_MAP.items(): s = s.replace(escaped_value, raw_value) return s def parse_function_call_and_arguments(function_name, function_call): """Split string function_call into an array of strings consisting of: * the string function_call followed by "(" * the function call argument #1 * ... * the function call argument #n * a trailing ");" The strings returned are in escaped form. See escape(...). >>> parse_function_call_and_arguments("foo", 'foo("%s", "foo");') ['foo(', '"%s",', ' "foo"', ')'] >>> parse_function_call_and_arguments("foo", 'foo("%s", "foo");') ['foo(', '"%s",', ' "foo"', ')'] >>> parse_function_call_and_arguments("foo", 'foo("%s %s", "foo", "bar");') ['foo(', '"%s %s",', ' "foo",', ' "bar"', ')'] >>> parse_function_call_and_arguments("fooprintf", 'fooprintf("%050d", i);') ['fooprintf(', '"%050d",', ' i', ')'] >>> parse_function_call_and_arguments("foo", 'foo(bar(foobar(barfoo("foo"))), foobar); barfoo') ['foo(', 'bar(foobar(barfoo("foo"))),', ' foobar', ')'] >>> parse_function_call_and_arguments("foo", "foo()") ['foo(', '', ')'] >>> parse_function_call_and_arguments("foo", "foo(123)") ['foo(', '123', ')'] >>> parse_function_call_and_arguments("foo", 'foo("foo")') ['foo(', '"foo"', ')'] >>> parse_function_call_and_arguments("strprintf", 'strprintf("%s (%d)", std::wstring_convert,wchar_t>().to_bytes(buf), err);') ['strprintf(', '"%s (%d)",', ' std::wstring_convert,wchar_t>().to_bytes(buf),', ' err', ')'] >>> parse_function_call_and_arguments("strprintf", 'strprintf("%s (%d)", foo().to_bytes(buf), err);') ['strprintf(', '"%s (%d)",', ' foo().to_bytes(buf),', ' err', ')'] >>> parse_function_call_and_arguments("strprintf", 'strprintf("%s (%d)", foo().to_bytes(buf), err);') ['strprintf(', '"%s (%d)",', ' foo().to_bytes(buf),', ' err', ')'] >>> parse_function_call_and_arguments("strprintf", 'strprintf("%s (%d)", foo << 1, err);') ['strprintf(', '"%s (%d)",', ' foo << 1,', ' err', ')'] >>> parse_function_call_and_arguments("strprintf", 'strprintf("%s (%d)", foo() >> 1, err);') ['strprintf(', '"%s (%d)",', ' foo() >> 1,', ' err', ')'] >>> parse_function_call_and_arguments("strprintf", 'strprintf("%s (%d)", foo < 1 ? bar : foobar, err);') ['strprintf(', '"%s (%d)",', ' foo < 1 ? bar : foobar,', ' err', ')'] >>> parse_function_call_and_arguments("strprintf", 'strprintf("%s (%d)", foo < 1, err);') ['strprintf(', '"%s (%d)",', ' foo < 1,', ' err', ')'] >>> parse_function_call_and_arguments("strprintf", 'strprintf("%s (%d)", foo > 1 ? bar : foobar, err);') ['strprintf(', '"%s (%d)",', ' foo > 1 ? bar : foobar,', ' err', ')'] >>> parse_function_call_and_arguments("strprintf", 'strprintf("%s (%d)", foo > 1, err);') ['strprintf(', '"%s (%d)",', ' foo > 1,', ' err', ')'] >>> parse_function_call_and_arguments("strprintf", 'strprintf("%s (%d)", foo <= 1, err);') ['strprintf(', '"%s (%d)",', ' foo <= 1,', ' err', ')'] >>> parse_function_call_and_arguments("strprintf", 'strprintf("%s (%d)", foo <= bar<1, 2>(1, 2), err);') ['strprintf(', '"%s (%d)",', ' foo <= bar<1, 2>(1, 2),', ' err', ')'] >>> parse_function_call_and_arguments("strprintf", 'strprintf("%s (%d)", foo>foo<1,2>(1,2)?bar:foobar,err)'); ['strprintf(', '"%s (%d)",', ' foo>foo<1,2>(1,2)?bar:foobar,', 'err', ')'] >>> parse_function_call_and_arguments("strprintf", 'strprintf("%s (%d)", foo>foo<1,2>(1,2),err)'); ['strprintf(', '"%s (%d)",', ' foo>foo<1,2>(1,2),', 'err', ')'] """ - assert isinstance(function_name, str) and isinstance( - function_call, str) and function_name + assert ( + isinstance(function_name, str) + and isinstance(function_call, str) + and function_name + ) remaining = normalize(escape(function_call)) expected_function_call = f"{function_name}(" assert remaining.startswith(expected_function_call) parts = [expected_function_call] - remaining = remaining[len(expected_function_call):] + remaining = remaining[len(expected_function_call) :] open_parentheses = 1 open_template_arguments = 0 in_string = False parts.append("") for i, char in enumerate(remaining): parts.append(parts.pop() + char) - if char == "\"": + if char == '"': in_string = not in_string continue if in_string: continue if char == "(": open_parentheses += 1 continue if char == ")": open_parentheses -= 1 if open_parentheses > 1: continue if open_parentheses == 0: parts.append(parts.pop()[:-1]) parts.append(char) break prev_char = remaining[i - 1] if i - 1 >= 0 else None next_char = remaining[i + 1] if i + 1 <= len(remaining) - 1 else None - if (char == "<" and next_char not in [" ", "<", "="] - and prev_char not in [" ", "<"]): + if ( + char == "<" + and next_char not in [" ", "<", "="] + and prev_char not in [" ", "<"] + ): open_template_arguments += 1 continue - if (char == ">" and next_char not in [" ", ">", "="] and - prev_char not in [" ", ">"] and open_template_arguments > 0): + if ( + char == ">" + and next_char not in [" ", ">", "="] + and prev_char not in [" ", ">"] + and open_template_arguments > 0 + ): open_template_arguments -= 1 if open_template_arguments > 0: continue if char == ",": parts.append("") return parts def parse_string_content(argument): """Return the text within quotes in string argument. >>> parse_string_content('1 "foo %d bar" 2') 'foo %d bar' >>> parse_string_content('1 foobar 2') '' >>> parse_string_content('1 "bar" 2') 'bar' >>> parse_string_content('1 "foo" 2 "bar" 3') 'foobar' >>> parse_string_content('1 "foo" 2 " " "bar" 3') 'foo bar' >>> parse_string_content('""') '' >>> parse_string_content('') '' >>> parse_string_content('1 2 3') '' """ assert isinstance(argument, str) string_content = "" in_string = False for char in normalize(escape(argument)): - if char == "\"": + if char == '"': in_string = not in_string elif in_string: string_content += char return string_content def count_format_specifiers(format_string): """Return the number of format specifiers in string format_string. >>> count_format_specifiers("foo bar foo") 0 >>> count_format_specifiers("foo %d bar foo") 1 >>> count_format_specifiers("foo %d bar %i foo") 2 >>> count_format_specifiers("foo %d bar %i foo %% foo") 2 >>> count_format_specifiers("foo %d bar %i foo %% foo %d foo") 3 >>> count_format_specifiers("foo %d bar %i foo %% foo %*d foo") 4 >>> count_format_specifiers("%%%u") 1 >>> [count_format_specifiers(i * "%" + "u") for i in range(10)] [0, 1, 0, 1, 0, 1, 0, 1, 0, 1] """ assert isinstance(format_string, str) n = 0 in_specifier = False # remove any number of escaped % characters format_string = format_string.replace("%%", "") for i, char in enumerate(format_string): if char == "%": in_specifier = True n += 1 elif char in "aAcdeEfFgGinopsuxX": in_specifier = False elif in_specifier and char == "*": n += 1 return n def main(args_in): - """ Return a string output with information on string format errors + """Return a string output with information on string format errors >>> main(["test/lint/lint-format-strings-tests.txt"]) test/lint/lint-format-strings-tests.txt: Expected 1 argument(s) after format string but found 2 argument(s): printf("%d", 1, 2) test/lint/lint-format-strings-tests.txt: Expected 2 argument(s) after format string but found 3 argument(s): printf("%a %b", 1, 2, "anything") test/lint/lint-format-strings-tests.txt: Expected 1 argument(s) after format string but found 0 argument(s): printf("%d") test/lint/lint-format-strings-tests.txt: Expected 3 argument(s) after format string but found 2 argument(s): printf("%a%b%z", 1, "anything") test/lint/lint-format-strings-tests.txt: Expected 0 argument(s) after format string but found 1 argument(s): strprintf("%%%%u", scope_id) test/lint/lint-format-strings-tests.txt: Expected 1 argument(s) after format string but found 0 argument(s): strprintf("%%%u") >>> main(["test/lint/lint-format-strings-tests-skip-arguments.txt"]) test/lint/lint-format-strings-tests-skip-arguments.txt: Expected 1 argument(s) after format string but found 2 argument(s): fprintf(skipped, "%d", 1, 2) test/lint/lint-format-strings-tests-skip-arguments.txt: Expected 1 argument(s) after format string but found 0 argument(s): fprintf(skipped, "%d") test/lint/lint-format-strings-tests-skip-arguments.txt: Expected 1 argument(s) after format string but found 2 argument(s): snprintf(skip1, skip2, "%d", 1, 2) test/lint/lint-format-strings-tests-skip-arguments.txt: Expected 1 argument(s) after format string but found 0 argument(s): snprintf(skip1, skip2, "%d") test/lint/lint-format-strings-tests-skip-arguments.txt: Could not parse function call string "snprintf(...)": snprintf(skip1, "%d") """ - parser = argparse.ArgumentParser(description="This program checks that the number of arguments passed " - "to a variadic format string function matches the number of format " - "specifiers in the format string.") - parser.add_argument("file", type=argparse.FileType( - "r", encoding="utf-8"), nargs="*", help="C++ source code file (e.g. foo.cpp)") + parser = argparse.ArgumentParser( + description=( + "This program checks that the number of arguments passed " + "to a variadic format string function matches the number of format " + "specifiers in the format string." + ) + ) + parser.add_argument( + "file", + type=argparse.FileType("r", encoding="utf-8"), + nargs="*", + help="C++ source code file (e.g. foo.cpp)", + ) args = parser.parse_args(args_in) for f in args.file: file_content = f.read() - for (function_name, - skip_arguments) in FUNCTION_NAMES_AND_NUMBER_OF_LEADING_ARGUMENTS: - for function_call_str in parse_function_calls( - function_name, file_content): + for ( + function_name, + skip_arguments, + ) in FUNCTION_NAMES_AND_NUMBER_OF_LEADING_ARGUMENTS: + for function_call_str in parse_function_calls(function_name, file_content): parts = parse_function_call_and_arguments( - function_name, function_call_str) + function_name, function_call_str + ) relevant_function_call_str = unescape("".join(parts))[:512] if (f.name, relevant_function_call_str) in FALSE_POSITIVES: continue if len(parts) < 3 + skip_arguments: - print("{}: Could not parse function call string \"{}(...)\": {}".format( - f.name, function_name, relevant_function_call_str)) + print( + '{}: Could not parse function call string "{}(...)": {}'.format( + f.name, function_name, relevant_function_call_str + ) + ) continue argument_count = len(parts) - 3 - skip_arguments format_str = parse_string_content(parts[1 + skip_arguments]) format_specifier_count = count_format_specifiers(format_str) if format_specifier_count != argument_count: - print("{}: Expected {} argument(s) after format string but found {} argument(s): {}".format( - f.name, format_specifier_count, argument_count, relevant_function_call_str)) + print( + "{}: Expected {} argument(s) after format string but found {}" + " argument(s): {}".format( + f.name, + format_specifier_count, + argument_count, + relevant_function_call_str, + ) + ) continue if __name__ == "__main__": doctest.testmod() main(sys.argv[1:]) diff --git a/test/util/bitcoin-util-test.py b/test/util/bitcoin-util-test.py index 1eb1f0fb5..4727ef468 100755 --- a/test/util/bitcoin-util-test.py +++ b/test/util/bitcoin-util-test.py @@ -1,191 +1,203 @@ #!/usr/bin/env python3 # Copyright 2014 BitPay Inc. # Copyright 2016-2017 The Bitcoin Core developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """Test framework for bitcoin utils. Runs automatically during `make check`. Can also be run manually.""" import argparse import configparser import difflib import json import logging import os import pprint import subprocess import sys def main(): config = configparser.ConfigParser() - config.read_file(open(os.path.join(os.path.dirname( - __file__), "../config.ini"), encoding="utf8")) + config.read_file( + open(os.path.join(os.path.dirname(__file__), "../config.ini"), encoding="utf8") + ) parser = argparse.ArgumentParser(description=__doc__) - parser.add_argument('-v', '--verbose', action='store_true') + parser.add_argument("-v", "--verbose", action="store_true") args = parser.parse_args() verbose = args.verbose if verbose: level = logging.DEBUG else: level = logging.ERROR - formatter = '%(asctime)s - %(levelname)s - %(message)s' + formatter = "%(asctime)s - %(levelname)s - %(message)s" # Add the format/level to the logger logging.basicConfig(format=formatter, level=level) - bctester(os.path.join(config["environment"]["SRCDIR"], "test", - "util", "data"), "bitcoin-util-test.json", config["environment"]) + bctester( + os.path.join(config["environment"]["SRCDIR"], "test", "util", "data"), + "bitcoin-util-test.json", + config["environment"], + ) def bctester(testDir, input_basename, buildenv): - """ Loads and parses the input file, runs all tests and reports results""" + """Loads and parses the input file, runs all tests and reports results""" input_filename = os.path.join(testDir, input_basename) raw_data = open(input_filename, encoding="utf8").read() input_data = json.loads(raw_data) failed_testcases = [] for testObj in input_data: try: bctest(testDir, testObj, buildenv) except Exception: logging.info(f"FAILED: {testObj['description']}") failed_testcases.append(testObj["description"]) else: logging.info(f"PASSED: {testObj['description']}") if failed_testcases: error_message = "FAILED_TESTCASES:\n" error_message += pprint.pformat(failed_testcases, width=400) logging.error(error_message) sys.exit(1) else: sys.exit(0) def bctest(testDir, testObj, buildenv): """Runs a single test, comparing output and RC to expected output and RC. Raises an error if input can't be read, executable fails, or output/RC are not as expected. Error is caught by bctester() and reported. """ # Get the exec names and arguments execprog = os.path.join( - buildenv["BUILDDIR"], "src", testObj["exec"] + buildenv["EXEEXT"]) - execargs = testObj['args'] + buildenv["BUILDDIR"], "src", testObj["exec"] + buildenv["EXEEXT"] + ) + execargs = testObj["args"] execrun = [execprog] + execargs if buildenv["EMULATOR"]: execrun = [buildenv["EMULATOR"]] + execrun # Read the input data (if there is any) stdinCfg = None inputData = None if "input" in testObj: filename = os.path.join(testDir, testObj["input"]) inputData = open(filename, encoding="utf8").read() stdinCfg = subprocess.PIPE # Read the expected output data (if there is any) outputFn = None outputData = None outputType = None if "output_cmp" in testObj: - outputFn = testObj['output_cmp'] + outputFn = testObj["output_cmp"] # output type from file extension (determines how to compare) outputType = os.path.splitext(outputFn)[1][1:] try: - outputData = open(os.path.join(testDir, outputFn), - encoding="utf8").read() + outputData = open(os.path.join(testDir, outputFn), encoding="utf8").read() except OSError: logging.error(f"Output file {outputFn} can not be opened") raise if not outputData: logging.error(f"Output data missing for {outputFn}") raise Exception if not outputType: - logging.error( - f"Output file {outputFn} does not have a file extension") + logging.error(f"Output file {outputFn} does not have a file extension") raise Exception # Run the test - proc = subprocess.Popen(execrun, stdin=stdinCfg, stdout=subprocess.PIPE, - stderr=subprocess.PIPE, universal_newlines=True) + proc = subprocess.Popen( + execrun, + stdin=stdinCfg, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + universal_newlines=True, + ) try: outs = proc.communicate(input=inputData) except OSError: logging.error(f"OSError, Failed to execute {execprog}") raise if outputData: data_mismatch, formatting_mismatch = False, False # Parse command output and expected output try: a_parsed = parse_output(outs[0], outputType) except Exception as e: - logging.error( - f'Error parsing command output as {outputType}: {e}') + logging.error(f"Error parsing command output as {outputType}: {e}") raise try: b_parsed = parse_output(outputData, outputType) except Exception as e: - logging.error('Error parsing expected output {} as {}: {}'.format( - outputFn, outputType, e)) + logging.error( + f"Error parsing expected output {outputFn} as {outputType}: {e}" + ) raise # Compare data if a_parsed != b_parsed: logging.error(f"Output data mismatch for {outputFn} (format {outputType})") data_mismatch = True # Compare formatting if outs[0] != outputData: error_message = f"Output formatting mismatch for {outputFn}:\n" - error_message += "".join(difflib.context_diff(outputData.splitlines(True), - outs[0].splitlines( - True), - fromfile=outputFn, - tofile="returned")) + error_message += "".join( + difflib.context_diff( + outputData.splitlines(True), + outs[0].splitlines(True), + fromfile=outputFn, + tofile="returned", + ) + ) logging.error(error_message) formatting_mismatch = True assert not data_mismatch and not formatting_mismatch # Compare the return code to the expected return code wantRC = 0 if "return_code" in testObj: - wantRC = testObj['return_code'] + wantRC = testObj["return_code"] if proc.returncode != wantRC: logging.error(f"Return code mismatch for {outputFn}") raise Exception if "error_txt" in testObj: want_error = testObj["error_txt"] # Compare error text # TODO: ideally, we'd compare the strings exactly and also assert # That stderr is empty if no errors are expected. However, bitcoin-tx # emits DISPLAY errors when running as a windows application on # linux through wine. Just assert that the expected error text appears # somewhere in stderr. if want_error not in outs[1]: - logging.error("Error mismatch:\n" + "Expected: " + - want_error + "\nReceived: " + outs[1].rstrip()) + logging.error( + f"Error mismatch:\nExpected: {want_error}\nReceived: {outs[1].rstrip()}" + ) raise Exception def parse_output(a, fmt): """Parse the output according to specified format. Raise an error if the output can't be parsed.""" - if fmt == 'json': # json: compare parsed data + if fmt == "json": # json: compare parsed data return json.loads(a) - elif fmt == 'hex': # hex: parse and compare binary data + elif fmt == "hex": # hex: parse and compare binary data return bytes.fromhex(a.strip()) else: raise NotImplementedError(f"Don't know how to compare {fmt}") -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/test/util/rpcauth-test.py b/test/util/rpcauth-test.py index 43d1f3d6a..6839dbfb7 100755 --- a/test/util/rpcauth-test.py +++ b/test/util/rpcauth-test.py @@ -1,50 +1,52 @@ #!/usr/bin/env python3 # Copyright (c) 2015-2018 The Bitcoin Core developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """Test share/rpcauth/rpcauth.py """ import base64 import configparser import hmac import importlib import os import sys import unittest class TestRPCAuth(unittest.TestCase): def setUp(self): config = configparser.ConfigParser() config_path = os.path.abspath( - os.path.join(os.sep, os.path.abspath(os.path.dirname(__file__)), - "../config.ini")) + os.path.join( + os.sep, os.path.abspath(os.path.dirname(__file__)), "../config.ini" + ) + ) with open(config_path, encoding="utf8") as config_file: config.read_file(config_file) - sys.path.insert(0, os.path.dirname(config['environment']['RPCAUTH'])) - self.rpcauth = importlib.import_module('rpcauth') + sys.path.insert(0, os.path.dirname(config["environment"]["RPCAUTH"])) + self.rpcauth = importlib.import_module("rpcauth") def test_generate_salt(self): for i in range(16, 32 + 1): self.assertEqual(len(self.rpcauth.generate_salt(i)), i * 2) def test_generate_password(self): password = self.rpcauth.generate_password() expected_password = base64.urlsafe_b64encode( - base64.urlsafe_b64decode(password)).decode('utf-8') + base64.urlsafe_b64decode(password) + ).decode("utf-8") self.assertEqual(expected_password, password) def test_check_password_hmac(self): salt = self.rpcauth.generate_salt(16) password = self.rpcauth.generate_password() password_hmac = self.rpcauth.password_to_hmac(salt, password) - m = hmac.new(bytearray(salt, 'utf-8'), - bytearray(password, 'utf-8'), 'SHA256') + m = hmac.new(bytearray(salt, "utf-8"), bytearray(password, "utf-8"), "SHA256") expected_password_hmac = m.hexdigest() self.assertEqual(expected_password_hmac, password_hmac) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main()