diff --git a/.arclint b/.arclint --- a/.arclint +++ b/.arclint @@ -19,11 +19,10 @@ "autopep8": { "type": "autopep8", "version": ">=1.3.4", - "include": "(\\.py$)", + "include": "(^contrib/.*\\.py$)", "exclude": [ "(^contrib/gitian-builder/)", - "(^contrib/apple-sdk-tools/)", - "(^test/functional/)" + "(^contrib/apple-sdk-tools/)" ], "flags": [ "--aggressive", @@ -34,8 +33,11 @@ "black": { "type": "black", "version": ">=23.0.0", - "include": [ - "(^test/functional/)" + "include": "(\\.py$)", + "exclude": [ + "(^contrib/gitian-builder/)", + "(^contrib/apple-sdk-tools/)", + "(^contrib/)" ], "flags": [ "--preview" diff --git a/cmake/utils/filter-lcov.py b/cmake/utils/filter-lcov.py --- a/cmake/utils/filter-lcov.py +++ b/cmake/utils/filter-lcov.py @@ -6,12 +6,19 @@ import argparse parser = argparse.ArgumentParser( - description='Remove the coverage data from a tracefile for all files matching the pattern.') -parser.add_argument('--pattern', '-p', action='append', - help='the pattern of files to remove', required=True) + description=( + "Remove the coverage data from a tracefile for all files matching the pattern." + ) +) parser.add_argument( - 'tracefile', help='the tracefile to remove the coverage data from') -parser.add_argument('outfile', help='filename for the output to be written to') + "--pattern", + "-p", + action="append", + help="the pattern of files to remove", + required=True, +) +parser.add_argument("tracefile", help="the tracefile to remove the coverage data from") +parser.add_argument("outfile", help="filename for the output to be written to") args = parser.parse_args() tracefile = args.tracefile @@ -19,13 +26,13 @@ outfile = args.outfile in_remove = False -with open(tracefile, 'r', encoding="utf8") as f: - with open(outfile, 'w', encoding="utf8") as wf: +with open(tracefile, "r", encoding="utf8") as f: + with open(outfile, "w", encoding="utf8") as wf: for line in f: for p in pattern: if line.startswith("SF:") and p in line: in_remove = True if not in_remove: wf.write(line) - if line == 'end_of_record\n': + if line == "end_of_record\n": in_remove = False diff --git a/cmake/utils/gen-ninja-deps.py b/cmake/utils/gen-ninja-deps.py --- a/cmake/utils/gen-ninja-deps.py +++ b/cmake/utils/gen-ninja-deps.py @@ -4,25 +4,19 @@ import os import subprocess -parser = argparse.ArgumentParser(description='Produce a dep file from ninja.') +parser = argparse.ArgumentParser(description="Produce a dep file from ninja.") +parser.add_argument("--build-dir", help="The build directory.", required=True) parser.add_argument( - '--build-dir', - help='The build directory.', - required=True) + "--base-dir", + help="The directory for which dependencies are rewriten.", + required=True, +) +parser.add_argument("--ninja", help="The ninja executable to use.") +parser.add_argument("base_target", help="The target from the base's perspective.") parser.add_argument( - '--base-dir', - help='The directory for which dependencies are rewriten.', - required=True) -parser.add_argument('--ninja', help='The ninja executable to use.') -parser.add_argument( - 'base_target', - help="The target from the base's perspective.") -parser.add_argument( - 'targets', nargs='+', - help='The target for which dependencies are extracted.') -parser.add_argument( - '--extra-deps', nargs='+', - help='Extra dependencies.') + "targets", nargs="+", help="The target for which dependencies are extracted." +) +parser.add_argument("--extra-deps", nargs="+", help="Extra dependencies.") args = parser.parse_args() build_dir = os.path.abspath(args.build_dir) @@ -36,15 +30,15 @@ os.chdir(build_dir) if ninja is None: - ninja = subprocess.check_output(['command', '-v', 'ninja'])[:-1] + ninja = subprocess.check_output(["command", "-v", "ninja"])[:-1] # Construct the set of all targets all_targets = set() doto_targets = set() -for t in subprocess.check_output([ninja, '-t', 'targets', 'all']).splitlines(): - t, r = t.split(b':') +for t in subprocess.check_output([ninja, "-t", "targets", "all"]).splitlines(): + t, r = t.split(b":") all_targets.add(t) - if r[:13] == b' C_COMPILER__' or r[:15] == b' CXX_COMPILER__': + if r[:13] == b" C_COMPILER__" or r[:15] == b" CXX_COMPILER__": doto_targets.add(t) @@ -54,20 +48,20 @@ while len(lines): line = lines.pop(0) - if line[0] == ord(' '): + if line[0] == ord(" "): continue # We have a new target - target = line.split(b':')[0] - assert lines.pop(0)[:8] == b' input:' + target = line.split(b":")[0] + assert lines.pop(0)[:8] == b" input:" inputs = set() while True: i = lines.pop(0) - if i[:4] != b' ': + if i[:4] != b" ": break - ''' + """ ninja has 3 types of input: 1. Explicit dependencies, no prefix; 2. Implicit dependencies, | prefix. @@ -75,10 +69,10 @@ Order only dependency do not require the target to be rebuilt and so we ignore them. - ''' + """ i = i[4:] - if i[0] == ord('|'): - if i[1] == ord('|'): + if i[0] == ord("|"): + if i[1] == ord("|"): # We reached the order only dependencies. break i = i[2:] @@ -94,7 +88,7 @@ # Recursively extract the dependencies of the target. deps = {} while len(workset) > 0: - query = subprocess.check_output([ninja, '-t', 'query'] + list(workset)) + query = subprocess.check_output([ninja, "-t", "query"] + list(workset)) target_deps = parse_ninja_query(query) deps.update(target_deps) @@ -108,23 +102,23 @@ return deps ndeps = subprocess.check_output( - [ninja, '-t', 'deps'] + bt_targets, - stderr=subprocess.DEVNULL) + [ninja, "-t", "deps"] + bt_targets, stderr=subprocess.DEVNULL + ) lines = ndeps.splitlines() while len(lines) > 0: line = lines.pop(0) - t, m = line.split(b':') - if m == b' deps not found': + t, m = line.split(b":") + if m == b" deps not found": continue inputs = set() while True: i = lines.pop(0) - if i == b'': + if i == b"": break - assert i[:4] == b' ' + assert i[:4] == b" " inputs.add(i[4:]) deps[t] = inputs @@ -144,8 +138,7 @@ return cache[path] abspath = os.path.abspath(path) - newpath = path if path == abspath else os.path.relpath( - abspath, base_dir) + newpath = path if path == abspath else os.path.relpath(abspath, base_dir) cache[path] = newpath return newpath diff --git a/cmake/utils/junit-reports-merge.py b/cmake/utils/junit-reports-merge.py --- a/cmake/utils/junit-reports-merge.py +++ b/cmake/utils/junit-reports-merge.py @@ -14,7 +14,7 @@ def __init__(self, name, report_dir): self.name = name self.test_cases = {} - self.report_file = os.path.join(report_dir, f'{self.name}.xml') + self.report_file = os.path.join(report_dir, f"{self.name}.xml") def add_test_case(self, test_case): self.test_cases[test_case.test_id] = test_case @@ -24,20 +24,20 @@ def dump(self): # Calculate test suite duration as the sum of all test case duraration - duration = round(sum([ - float(t.node.get('time', 0.0)) for t in self.test_cases.values() - ]), 3) + duration = round( + sum([float(t.node.get("time", 0.0)) for t in self.test_cases.values()]), 3 + ) test_suite = ET.Element( - 'testsuite', + "testsuite", { - 'name': self.name, - 'id': '0', - 'timestamp': datetime.datetime.now().isoformat('T'), - 'time': str(duration), - 'tests': str(len(self.test_cases)), - 'failures': str(len(self.get_failed_tests())), - } + "name": self.name, + "id": "0", + "timestamp": datetime.datetime.now().isoformat("T"), + "time": str(duration), + "tests": str(len(self.test_cases)), + "failures": str(len(self.get_failed_tests())), + }, ) for test_case in self.test_cases.values(): @@ -47,7 +47,7 @@ os.makedirs(report_dir, exist_ok=True) ET.ElementTree(test_suite).write( self.report_file, - 'UTF-8', + "UTF-8", xml_declaration=True, ) @@ -55,20 +55,20 @@ tree = ET.parse(self.report_file) xml_root = tree.getroot() - assert xml_root.tag == 'testsuite' - assert self.name == xml_root.get('name') + assert xml_root.tag == "testsuite" + assert self.name == xml_root.get("name") - for test_case in xml_root.findall('testcase'): + for test_case in xml_root.findall("testcase"): self.add_test_case(TestCase(test_case)) class TestCase: def __init__(self, node): self.node = node - self.test_success = self.node.find('failure') is None + self.test_success = self.node.find("failure") is None def __getattr__(self, attribute): - if attribute == 'test_id': + if attribute == "test_id": return f"{self.classname}/{self.name}" return self.node.attrib[attribute] @@ -76,11 +76,11 @@ class Lock: def __init__(self, suite, lock_dir): - self.lock_file = os.path.join(lock_dir, f'{suite}.lock') + self.lock_file = os.path.join(lock_dir, f"{suite}.lock") def __enter__(self): os.makedirs(os.path.dirname(self.lock_file), exist_ok=True) - self.fd = open(self.lock_file, 'w', encoding='utf-8') + self.fd = open(self.lock_file, "w", encoding="utf-8") fcntl.lockf(self.fd, fcntl.LOCK_EX) def __exit__(self, exception_type, exception_value, traceback): @@ -89,7 +89,7 @@ def main(report_dir, lock_dir, suite, test): - junit = f'{suite}-{test}.xml' + junit = f"{suite}-{test}.xml" if not os.path.isfile(junit): return 0 @@ -98,11 +98,10 @@ # Junit root can be a single test suite or multiple test suites. The # later case is unsupported. xml_root = tree.getroot() - if xml_root.tag != 'testsuite': - raise AssertionError( - "The parser only supports a single test suite per report") + if xml_root.tag != "testsuite": + raise AssertionError("The parser only supports a single test suite per report") - test_suite_name = xml_root.get('name') + test_suite_name = xml_root.get("name") lock = Lock(suite, lock_dir) with lock: @@ -111,7 +110,7 @@ test_suite.load() for child in xml_root: - if child.tag != 'testcase' or (child.find('skipped') is not None): + if child.tag != "testcase" or (child.find("skipped") is not None): continue test_suite.add_test_case(TestCase(child)) @@ -119,8 +118,7 @@ test_suite.dump() sys.exit( - 1 if test in [case.classname for case in test_suite.get_failed_tests()] - else 0 + 1 if test in [case.classname for case in test_suite.get_failed_tests()] else 0 ) diff --git a/share/rpcauth/rpcauth.py b/share/rpcauth/rpcauth.py --- a/share/rpcauth/rpcauth.py +++ b/share/rpcauth/rpcauth.py @@ -18,36 +18,40 @@ def generate_password(): """Create 32 byte b64 password""" - return urlsafe_b64encode(urandom(32)).decode('utf-8') + return urlsafe_b64encode(urandom(32)).decode("utf-8") def password_to_hmac(salt, password): - m = hmac.new(bytearray(salt, 'utf-8'), - bytearray(password, 'utf-8'), 'SHA256') + m = hmac.new(bytearray(salt, "utf-8"), bytearray(password, "utf-8"), "SHA256") return m.hexdigest() def main(): - parser = ArgumentParser( - description='Create login credentials for a JSON-RPC user') - parser.add_argument('username', help='the username for authentication') + parser = ArgumentParser(description="Create login credentials for a JSON-RPC user") + parser.add_argument("username", help="the username for authentication") parser.add_argument( - 'password', help='leave empty to generate a random password or specify "-" to prompt for password', nargs='?') + "password", + help=( + 'leave empty to generate a random password or specify "-" to prompt for' + " password" + ), + nargs="?", + ) args = parser.parse_args() if not args.password: args.password = generate_password() - elif args.password == '-': + elif args.password == "-": args.password = getpass() # Create 16 byte hex salt salt = generate_salt(16) password_hmac = password_to_hmac(salt, args.password) - print('String to be appended to bitcoin.conf:') - print(f'rpcauth={args.username}:{salt}${password_hmac}') - print(f'Your password:\n{args.password}') + print("String to be appended to bitcoin.conf:") + print(f"rpcauth={args.username}:{salt}${password_hmac}") + print(f"Your password:\n{args.password}") -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/src/test/data/generate_asmap.py b/src/test/data/generate_asmap.py --- a/src/test/data/generate_asmap.py +++ b/src/test/data/generate_asmap.py @@ -6,20 +6,22 @@ def main(input_file, output_file): - with open(input_file, 'rb') as f: + with open(input_file, "rb") as f: contents = f.read() with open(output_file, "w", encoding="utf-8") as f: - f.write( - f"static unsigned const char {Path(input_file).stem}_raw[] = {{\n") + f.write(f"static unsigned const char {Path(input_file).stem}_raw[] = {{\n") f.write(", ".join(f"0x{x:02x}" for x in contents)) f.write("\n};\n") if __name__ == "__main__": if len(sys.argv) != 3: - print("Invalid parameters\nUsage: {} input_file output_file".format( - Path(sys.argv[0]).name)) + print( + "Invalid parameters\nUsage: {} input_file output_file".format( + Path(sys.argv[0]).name + ) + ) sys.exit(1) main(sys.argv[1], sys.argv[2]) diff --git a/test/functional/feature_notifications.py b/test/functional/feature_notifications.py --- a/test/functional/feature_notifications.py +++ b/test/functional/feature_notifications.py @@ -64,9 +64,11 @@ blocks = self.generatetoaddress( self.nodes[1], block_count, - self.nodes[1].getnewaddress() - if self.is_wallet_compiled() - else ADDRESS_ECREG_UNSPENDABLE, + ( + self.nodes[1].getnewaddress() + if self.is_wallet_compiled() + else ADDRESS_ECREG_UNSPENDABLE + ), ) # wait at most 10 seconds for expected number of files before reading diff --git a/test/functional/test_framework/chronik/client.py b/test/functional/test_framework/chronik/client.py --- a/test/functional/test_framework/chronik/client.py +++ b/test/functional/test_framework/chronik/client.py @@ -26,24 +26,28 @@ def ok(self): if self.status != 200: raise AssertionError( - f'Expected OK response, but got status {self.status}, error: ' - f'{self.error_proto}') + f"Expected OK response, but got status {self.status}, error: " + f"{self.error_proto}" + ) return self.ok_proto def err(self, status: int): if self.status == 200: raise AssertionError( - f'Expected error response status {status}, but got OK: {self.ok_proto}') + f"Expected error response status {status}, but got OK: {self.ok_proto}" + ) if self.status != status: raise AssertionError( - f'Expected error response status {status}, but got different error ' - f'status {self.status}, error: {self.error_proto}') + f"Expected error response status {status}, but got different error " + f"status {self.status}, error: {self.error_proto}" + ) return self.error_proto class ChronikScriptClient: - def __init__(self, client: 'ChronikClient', script_type: str, - script_payload: str) -> None: + def __init__( + self, client: "ChronikClient", script_type: str, script_payload: str + ) -> None: self.client = client self.script_type = script_type self.script_payload = script_payload @@ -51,24 +55,27 @@ def confirmed_txs(self, page=None, page_size=None): query = _page_query_params(page, page_size) return self.client._request_get( - f'/script/{self.script_type}/{self.script_payload}/confirmed-txs{query}', - pb.TxHistoryPage) + f"/script/{self.script_type}/{self.script_payload}/confirmed-txs{query}", + pb.TxHistoryPage, + ) def history(self, page=None, page_size=None): query = _page_query_params(page, page_size) return self.client._request_get( - f'/script/{self.script_type}/{self.script_payload}/history{query}', - pb.TxHistoryPage) + f"/script/{self.script_type}/{self.script_payload}/history{query}", + pb.TxHistoryPage, + ) def unconfirmed_txs(self): return self.client._request_get( - f'/script/{self.script_type}/{self.script_payload}/unconfirmed-txs', - pb.TxHistoryPage) + f"/script/{self.script_type}/{self.script_payload}/unconfirmed-txs", + pb.TxHistoryPage, + ) def utxos(self): return self.client._request_get( - f'/script/{self.script_type}/{self.script_payload}/utxos', - pb.ScriptUtxos) + f"/script/{self.script_type}/{self.script_payload}/utxos", pb.ScriptUtxos + ) class ChronikWs: @@ -97,7 +104,7 @@ class ChronikClient: - CONTENT_TYPE = 'application/x-protobuf' + CONTENT_TYPE = "application/x-protobuf" def __init__(self, host: str, port: int, timeout=DEFAULT_TIMEOUT) -> None: self.host = host @@ -107,11 +114,11 @@ def _request_get(self, path: str, pb_type): kwargs = {} if self.timeout is not None: - kwargs['timeout'] = self.timeout + kwargs["timeout"] = self.timeout client = http.client.HTTPConnection(self.host, self.port, **kwargs) - client.request('GET', path) + client.request("GET", path) response = client.getresponse() - content_type = response.getheader('Content-Type') + content_type = response.getheader("Content-Type") body = response.read() if content_type != self.CONTENT_TYPE: @@ -130,41 +137,43 @@ return ChronikResponse(response.status, ok_proto=ok_proto) def blockchain_info(self) -> ChronikResponse: - return self._request_get('/blockchain-info', pb.BlockchainInfo) + return self._request_get("/blockchain-info", pb.BlockchainInfo) def block(self, hash_or_height: Union[str, int]) -> ChronikResponse: - return self._request_get(f'/block/{hash_or_height}', pb.Block) + return self._request_get(f"/block/{hash_or_height}", pb.Block) - def block_txs(self, hash_or_height: Union[str, int], - page=None, page_size=None) -> ChronikResponse: + def block_txs( + self, hash_or_height: Union[str, int], page=None, page_size=None + ) -> ChronikResponse: query = _page_query_params(page, page_size) return self._request_get( - f'/block-txs/{hash_or_height}{query}', pb.TxHistoryPage) + f"/block-txs/{hash_or_height}{query}", pb.TxHistoryPage + ) def blocks(self, start_height: int, end_height: int) -> ChronikResponse: - return self._request_get(f'/blocks/{start_height}/{end_height}', pb.Blocks) + return self._request_get(f"/blocks/{start_height}/{end_height}", pb.Blocks) def tx(self, txid: str) -> ChronikResponse: - return self._request_get(f'/tx/{txid}', pb.Tx) + return self._request_get(f"/tx/{txid}", pb.Tx) def raw_tx(self, txid: str) -> bytes: - return self._request_get(f'/raw-tx/{txid}', pb.RawTx) + return self._request_get(f"/raw-tx/{txid}", pb.RawTx) def script(self, script_type: str, script_payload: str) -> ChronikScriptClient: return ChronikScriptClient(self, script_type, script_payload) def ws(self, *, timeout=None) -> ChronikWs: ws = websocket.WebSocket() - ws.connect(f'ws://{self.host}:{self.port}/ws', timeout=timeout) + ws.connect(f"ws://{self.host}:{self.port}/ws", timeout=timeout) return ChronikWs(ws) def _page_query_params(page=None, page_size=None) -> str: if page is not None and page_size is not None: - return f'?page={page}&page_size={page_size}' + return f"?page={page}&page_size={page_size}" elif page is not None: - return f'?page={page}' + return f"?page={page}" elif page_size is not None: - return f'?page_size={page_size}' + return f"?page_size={page_size}" else: - return '' + return "" diff --git a/test/functional/test_framework/chronik/test_data.py b/test/functional/test_framework/chronik/test_data.py --- a/test/functional/test_framework/chronik/test_data.py +++ b/test/functional/test_framework/chronik/test_data.py @@ -17,15 +17,19 @@ return pb.Tx( txid=bytes.fromhex(GENESIS_CB_TXID)[::-1], version=1, - inputs=[pb.TxInput( - prev_out=pb.OutPoint(txid=bytes(32), out_idx=0xffffffff), - input_script=bytes(GENESIS_CB_SCRIPT_SIG), - sequence_no=0xffffffff, - )], - outputs=[pb.TxOutput( - value=5000000000, - output_script=bytes(GENESIS_CB_SCRIPT_PUBKEY), - )], + inputs=[ + pb.TxInput( + prev_out=pb.OutPoint(txid=bytes(32), out_idx=0xFFFFFFFF), + input_script=bytes(GENESIS_CB_SCRIPT_SIG), + sequence_no=0xFFFFFFFF, + ) + ], + outputs=[ + pb.TxOutput( + value=5000000000, + output_script=bytes(GENESIS_CB_SCRIPT_PUBKEY), + ) + ], lock_time=0, block=pb.BlockMetadata( hash=bytes.fromhex(GENESIS_BLOCK_HASH)[::-1], diff --git a/test/functional/tool_wallet.py b/test/functional/tool_wallet.py --- a/test/functional/tool_wallet.py +++ b/test/functional/tool_wallet.py @@ -129,8 +129,7 @@ # shasum_before = self.wallet_shasum() timestamp_before = self.wallet_timestamp() self.log.debug(f"Wallet file timestamp before calling info: {timestamp_before}") - out = textwrap.dedent( - """\ + out = textwrap.dedent("""\ Wallet info =========== Encrypted: no @@ -138,8 +137,7 @@ Keypool Size: 2 Transactions: 0 Address Book: 1 - """ - ) + """) self.assert_tool_output(out, f"-wallet={self.default_wallet_name}", "info") timestamp_after = self.wallet_timestamp() self.log.debug(f"Wallet file timestamp after calling info: {timestamp_after}") @@ -173,8 +171,7 @@ shasum_before = self.wallet_shasum() timestamp_before = self.wallet_timestamp() self.log.debug(f"Wallet file timestamp before calling info: {timestamp_before}") - out = textwrap.dedent( - """\ + out = textwrap.dedent("""\ Wallet info =========== Encrypted: no @@ -182,8 +179,7 @@ Keypool Size: 2 Transactions: 1 Address Book: 1 - """ - ) + """) self.assert_tool_output(out, f"-wallet={self.default_wallet_name}", "info") shasum_after = self.wallet_shasum() timestamp_after = self.wallet_timestamp() @@ -205,8 +201,7 @@ self.log.debug( f"Wallet file timestamp before calling create: {timestamp_before}" ) - out = textwrap.dedent( - """\ + out = textwrap.dedent("""\ Topping up keypool... Wallet info =========== @@ -215,8 +210,7 @@ Keypool Size: 2000 Transactions: 0 Address Book: 0 - """ - ) + """) self.assert_tool_output(out, "-wallet=foo", "create") shasum_after = self.wallet_shasum() timestamp_after = self.wallet_timestamp() diff --git a/test/fuzz/test_runner.py b/test/fuzz/test_runner.py --- a/test/fuzz/test_runner.py +++ b/test/fuzz/test_runner.py @@ -17,61 +17,71 @@ def main(): parser = argparse.ArgumentParser( formatter_class=argparse.ArgumentDefaultsHelpFormatter, - description='''Run the fuzz targets with all inputs from the seed_dir once.''', + description="""Run the fuzz targets with all inputs from the seed_dir once.""", ) parser.add_argument( "-l", "--loglevel", dest="loglevel", default="INFO", - help="log events at this level and higher to the console. Can be set to DEBUG, INFO, WARNING, ERROR or CRITICAL. Passing --loglevel DEBUG will output all logs to console.", + help=( + "log events at this level and higher to the console. Can be set to DEBUG," + " INFO, WARNING, ERROR or CRITICAL. Passing --loglevel DEBUG will output" + " all logs to console." + ), ) parser.add_argument( - '--valgrind', - action='store_true', - help='If true, run fuzzing binaries under the valgrind memory error detector', + "--valgrind", + action="store_true", + help="If true, run fuzzing binaries under the valgrind memory error detector", ) parser.add_argument( - '-x', - '--exclude', + "-x", + "--exclude", help="A comma-separated list of targets to exclude", ) parser.add_argument( - '--par', - '-j', + "--par", + "-j", type=int, default=4, - help='How many targets to merge or execute in parallel.', + help="How many targets to merge or execute in parallel.", ) parser.add_argument( - 'seed_dir', - help='The seed corpus to run on (must contain subfolders for each fuzz target).', + "seed_dir", + help=( + "The seed corpus to run on (must contain subfolders for each fuzz target)." + ), ) parser.add_argument( - 'target', - nargs='*', - help='The target(s) to run. Default is to run all targets.', + "target", + nargs="*", + help="The target(s) to run. Default is to run all targets.", ) parser.add_argument( - '--m_dir', - help='Merge inputs from this directory into the seed_dir. Needs /target subdirectory.', + "--m_dir", + help=( + "Merge inputs from this directory into the seed_dir. Needs /target" + " subdirectory." + ), ) parser.add_argument( - '-g', - '--generate', - action='store_true', - help='Create new corpus seeds (or extend the existing ones) by running' - ' the given targets for a finite number of times. Outputs them to' - ' the passed seed_dir.' + "-g", + "--generate", + action="store_true", + help=( + "Create new corpus seeds (or extend the existing ones) by running" + " the given targets for a finite number of times. Outputs them to" + " the passed seed_dir." + ), ) args = parser.parse_args() # Set up logging logging.basicConfig( - format='%(message)s', - level=int(args.loglevel) if args.loglevel.isdigit( - ) else args.loglevel.upper(), + format="%(message)s", + level=int(args.loglevel) if args.loglevel.isdigit() else args.loglevel.upper(), ) # Read config generated by configure. @@ -82,53 +92,53 @@ if not config["components"].getboolean("ENABLE_FUZZ"): logging.error("Must have fuzz targets built") sys.exit(1) - test_dir = os.path.join( - config["environment"]["BUILDDIR"], 'src', 'test', 'fuzz') + test_dir = os.path.join(config["environment"]["BUILDDIR"], "src", "test", "fuzz") # Build list of tests test_list_all = [ - f for f in os.listdir(test_dir) - if os.path.isfile(os.path.join(test_dir, f)) and - os.access(os.path.join(test_dir, f), os.X_OK)] + f + for f in os.listdir(test_dir) + if os.path.isfile(os.path.join(test_dir, f)) + and os.access(os.path.join(test_dir, f), os.X_OK) + ] if not test_list_all: logging.error("No fuzz targets found") sys.exit(1) logging.debug( - f"{len(test_list_all)} fuzz target(s) found: {' '.join(sorted(test_list_all))}") + f"{len(test_list_all)} fuzz target(s) found: {' '.join(sorted(test_list_all))}" + ) # By default run all args.target = args.target or test_list_all test_list_error = list(set(args.target).difference(set(test_list_all))) if test_list_error: - logging.error( - f"Unknown fuzz targets selected: {test_list_error}") - test_list_selection = list( - set(test_list_all).intersection(set(args.target))) + logging.error(f"Unknown fuzz targets selected: {test_list_error}") + test_list_selection = list(set(test_list_all).intersection(set(args.target))) if not test_list_selection: logging.error("No fuzz targets selected") if args.exclude: for excluded_target in args.exclude.split(","): if excluded_target not in test_list_selection: logging.error( - f"Target \"{excluded_target}\" not found in current target list.") + f'Target "{excluded_target}" not found in current target list.' + ) continue test_list_selection.remove(excluded_target) test_list_selection.sort() logging.info( "{} of {} detected fuzz target(s) selected: {}".format( - len(test_list_selection), - len(test_list_all), - " ".join(test_list_selection))) + len(test_list_selection), len(test_list_all), " ".join(test_list_selection) + ) + ) if not args.generate: test_list_seedless = [] for t in test_list_selection: corpus_path = os.path.join(args.seed_dir, t) - if not os.path.exists(corpus_path) or len( - os.listdir(corpus_path)) == 0: + if not os.path.exists(corpus_path) or len(os.listdir(corpus_path)) == 0: test_list_seedless.append(t) test_list_seedless.sort() if test_list_seedless: @@ -138,13 +148,15 @@ ) ) logging.info( - "Please consider adding a fuzz seed corpus at https://github.com/Bitcoin-ABC/qa-assets") + "Please consider adding a fuzz seed corpus at" + " https://github.com/Bitcoin-ABC/qa-assets" + ) try: help_output = subprocess.run( args=[ os.path.join(test_dir, test_list_selection[0]), - '-help=1', + "-help=1", ], timeout=20, check=True, @@ -155,8 +167,7 @@ logging.error("Must be built with libFuzzer") sys.exit(1) except subprocess.TimeoutExpired: - logging.error( - "subprocess timed out: Currently only libFuzzer is supported") + logging.error("subprocess timed out: Currently only libFuzzer is supported") sys.exit(1) with ThreadPoolExecutor(max_workers=args.par) as fuzz_pool: @@ -197,11 +208,14 @@ def job(command): logging.debug(f"Running '{' '.join(command)}'\n") - logging.debug("Command '{}' output:\n'{}'\n".format( - ' '.join(command), - subprocess.run(command, check=True, stderr=subprocess.PIPE, - universal_newlines=True).stderr - )) + logging.debug( + "Command '{}' output:\n'{}'\n".format( + " ".join(command), + subprocess.run( + command, check=True, stderr=subprocess.PIPE, universal_newlines=True + ).stderr, + ) + ) futures = [] for target in targets: @@ -220,15 +234,16 @@ def merge_inputs(*, fuzz_pool, corpus, test_list, test_dir, merge_dir): logging.info( - f"Merge the inputs in the passed dir into the seed_dir. Passed dir {merge_dir}") + f"Merge the inputs in the passed dir into the seed_dir. Passed dir {merge_dir}" + ) jobs = [] for t in test_list: args = [ os.path.join(test_dir, t), - '-merge=1', + "-merge=1", # Also done by oss-fuzz # https://github.com/google/oss-fuzz/issues/1406#issuecomment-387790487 - '-use_value_profile=1', + "-use_value_profile=1", os.path.join(corpus, t), os.path.join(merge_dir, t), ] @@ -237,10 +252,9 @@ def job(t, args): output = f"Run {t} with args {' '.join(args)}\n" - output += subprocess.run(args, - check=True, - stderr=subprocess.PIPE, - universal_newlines=True).stderr + output += subprocess.run( + args, check=True, stderr=subprocess.PIPE, universal_newlines=True + ).stderr logging.debug(output) jobs.append(fuzz_pool.submit(job, t, args)) @@ -256,21 +270,17 @@ os.makedirs(corpus_path, exist_ok=True) args = [ os.path.join(test_dir, t), - '-runs=1', + "-runs=1", corpus_path, ] if use_valgrind: - args = [ - 'valgrind', - '--quiet', - '--error-exitcode=1'] + args + args = ["valgrind", "--quiet", "--error-exitcode=1"] + args def job(t, args): - output = f'Run {t} with args {args}' + output = f"Run {t} with args {args}" result = subprocess.run( - args, - stderr=subprocess.PIPE, - universal_newlines=True) + args, stderr=subprocess.PIPE, universal_newlines=True + ) output += result.stderr return output, result @@ -287,12 +297,12 @@ if e.stderr: logging.info(e.stderr) logging.info( - "Target \"{}\" failed with exit code {}".format( - " ".join( - result.args), - e.returncode)) + 'Target "{}" failed with exit code {}'.format( + " ".join(result.args), e.returncode + ) + ) sys.exit(1) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/test/lint/check-doc.py b/test/lint/check-doc.py --- a/test/lint/check-doc.py +++ b/test/lint/check-doc.py @@ -4,21 +4,21 @@ # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. -''' +""" This checks if all command line args are documented. Return value is 0 to indicate no error. Author: @MarcoFalke -''' +""" import glob import re from pprint import PrettyPrinter from subprocess import check_output -TOP_LEVEL = 'git rev-parse --show-toplevel' -FOLDERS_SRC = ['/src/**/', '/chronik/**/'] -FOLDERS_TEST = ['/src/**/test/', '/chronik/test/**/'] +TOP_LEVEL = "git rev-parse --show-toplevel" +FOLDERS_SRC = ["/src/**/", "/chronik/**/"] +FOLDERS_TEST = ["/src/**/test/", "/chronik/test/**/"] EXTENSIONS = ["*.c", "*.h", "*.cpp", "*.cc", "*.hpp"] REGEX_ARG = r'(?:ForceSet|SoftSet|Get|Is)(?:Bool|Int)?Args?(?:Set)?\(\s*"(-[^"]+)"' @@ -26,61 +26,61 @@ # list false positive unknows arguments SET_FALSE_POSITIVE_UNKNOWNS = { - '-includeconf', - '-regtest', - '-testnet', - '-zmqpubhashblock', - '-zmqpubhashtx', - '-zmqpubrawblock', - '-zmqpubrawtx', - '-zmqpubhashblockhwm', - '-zmqpubhashtxhwm', - '-zmqpubrawblockhwm', - '-zmqpubrawtxhwm', - '-zmqpubsequence', - '-zmqpubsequencehwm', + "-includeconf", + "-regtest", + "-testnet", + "-zmqpubhashblock", + "-zmqpubhashtx", + "-zmqpubrawblock", + "-zmqpubrawtx", + "-zmqpubhashblockhwm", + "-zmqpubhashtxhwm", + "-zmqpubrawblockhwm", + "-zmqpubrawtxhwm", + "-zmqpubsequence", + "-zmqpubsequencehwm", } # list false positive undocumented arguments SET_FALSE_POSITIVE_UNDOCUMENTED = { - '-help', - '-h', - '-avalanchepreconsensus', - '-dbcrashratio', - '-enableminerfund', - '-forcecompactdb', - '-maxaddrtosend', - '-parkdeepreorg', - '-automaticunparking', + "-help", + "-h", + "-avalanchepreconsensus", + "-dbcrashratio", + "-enableminerfund", + "-forcecompactdb", + "-maxaddrtosend", + "-parkdeepreorg", + "-automaticunparking", # Removed arguments that now just print a helpful error message - '-zapwallettxes', - '-replayprotectionactivationtime', + "-zapwallettxes", + "-replayprotectionactivationtime", # Remove after May 2023 upgrade - '-wellingtonactivationtime', + "-wellingtonactivationtime", } def main(): - top_level = check_output(TOP_LEVEL, shell=True, - universal_newlines=True, encoding='utf8').strip() + top_level = check_output( + TOP_LEVEL, shell=True, universal_newlines=True, encoding="utf8" + ).strip() source_files = [] test_files = [] for extension in EXTENSIONS: for folder_src in FOLDERS_SRC: - source_files += glob.glob(top_level + - folder_src + extension, recursive=True) + source_files += glob.glob( + top_level + folder_src + extension, recursive=True + ) for folder_test in FOLDERS_TEST: - test_files += glob.glob(top_level + - folder_test + - extension, recursive=True) + test_files += glob.glob(top_level + folder_test + extension, recursive=True) files = set(source_files) - set(test_files) args_used = set() args_docd = set() for file in files: - with open(file, 'r', encoding='utf-8') as f: + with open(file, "r", encoding="utf-8") as f: content = f.read() args_used |= set(re.findall(re.compile(REGEX_ARG), content)) args_docd |= set(re.findall(re.compile(REGEX_DOC), content)) diff --git a/test/lint/lint-format-strings.py b/test/lint/lint-format-strings.py --- a/test/lint/lint-format-strings.py +++ b/test/lint/lint-format-strings.py @@ -16,21 +16,39 @@ FALSE_POSITIVES = [ ("src/dbwrapper.cpp", "vsnprintf(p, limit - p, format, backup_ap)"), ("src/index/base.cpp", "FatalError(const char *fmt, const Args &...args)"), - ("src/netbase.cpp", "LogConnectFailure(bool manual_connection, const char *fmt, const Args &...args)"), - ("src/util/system.cpp", - "strprintf(_(COPYRIGHT_HOLDERS).translated, COPYRIGHT_HOLDERS_SUBSTITUTION)"), - ("src/validationinterface.cpp", - "LogPrint(BCLog::VALIDATION, fmt \"\\n\", __VA_ARGS__)"), + ( + "src/netbase.cpp", + ( + "LogConnectFailure(bool manual_connection, const char *fmt, const Args" + " &...args)" + ), + ), + ( + "src/util/system.cpp", + "strprintf(_(COPYRIGHT_HOLDERS).translated, COPYRIGHT_HOLDERS_SUBSTITUTION)", + ), + ( + "src/validationinterface.cpp", + 'LogPrint(BCLog::VALIDATION, fmt "\\n", __VA_ARGS__)', + ), ("src/tinyformat.h", "printf(const char *fmt, const Args &...args)"), ("src/tinyformat.h", "printf(const char *fmt, TINYFORMAT_VARARGS(n))"), - ("src/wallet/wallet.h", - "LogPrintf((\"%s \" + fmt).c_str(), GetDisplayName(), parameters...)"), - ("src/wallet/scriptpubkeyman.h", - "WalletLogPrintf(std::string fmt, Params... parameters)"), - ("src/wallet/scriptpubkeyman.h", - "LogPrintf((\"%s \" + fmt).c_str(), m_storage.GetDisplayName(), parameters...)"), - ("src/wallet/scriptpubkeyman.h", - "WalletLogPrintf(const std::string& fmt, const Params&... parameters)"), + ( + "src/wallet/wallet.h", + 'LogPrintf(("%s " + fmt).c_str(), GetDisplayName(), parameters...)', + ), + ( + "src/wallet/scriptpubkeyman.h", + "WalletLogPrintf(std::string fmt, Params... parameters)", + ), + ( + "src/wallet/scriptpubkeyman.h", + 'LogPrintf(("%s " + fmt).c_str(), m_storage.GetDisplayName(), parameters...)', + ), + ( + "src/wallet/scriptpubkeyman.h", + "WalletLogPrintf(const std::string& fmt, const Params&... parameters)", + ), ] FUNCTION_NAMES_AND_NUMBER_OF_LEADING_ARGUMENTS = [ @@ -66,13 +84,19 @@ >>> len(parse_function_calls("foo", "#define FOO foo();")) 0 """ - assert isinstance(function_name, str) and isinstance( - source_code, str) and function_name - lines = [re.sub("// .*", " ", line).strip() - for line in source_code.split("\n") - if not line.strip().startswith("#")] + assert ( + isinstance(function_name, str) + and isinstance(source_code, str) + and function_name + ) + lines = [ + re.sub("// .*", " ", line).strip() + for line in source_code.split("\n") + if not line.strip().startswith("#") + ] return re.findall( - r"[^a-zA-Z_](?=({}\(.*).*)".format(function_name), f" {' '.join(lines)}") + r"[^a-zA-Z_](?=({}\(.*).*)".format(function_name), f" {' '.join(lines)}" + ) def normalize(s): @@ -93,7 +117,7 @@ ESCAPE_MAP = { r"\n": "[escaped-newline]", r"\t": "[escaped-tab]", - r'\"': "[escaped-quote]", + r"\"": "[escaped-quote]", } @@ -180,20 +204,23 @@ >>> parse_function_call_and_arguments("strprintf", 'strprintf("%s (%d)", foo>foo<1,2>(1,2),err)'); ['strprintf(', '"%s (%d)",', ' foo>foo<1,2>(1,2),', 'err', ')'] """ - assert isinstance(function_name, str) and isinstance( - function_call, str) and function_name + assert ( + isinstance(function_name, str) + and isinstance(function_call, str) + and function_name + ) remaining = normalize(escape(function_call)) expected_function_call = f"{function_name}(" assert remaining.startswith(expected_function_call) parts = [expected_function_call] - remaining = remaining[len(expected_function_call):] + remaining = remaining[len(expected_function_call) :] open_parentheses = 1 open_template_arguments = 0 in_string = False parts.append("") for i, char in enumerate(remaining): parts.append(parts.pop() + char) - if char == "\"": + if char == '"': in_string = not in_string continue if in_string: @@ -211,12 +238,19 @@ break prev_char = remaining[i - 1] if i - 1 >= 0 else None next_char = remaining[i + 1] if i + 1 <= len(remaining) - 1 else None - if (char == "<" and next_char not in [" ", "<", "="] - and prev_char not in [" ", "<"]): + if ( + char == "<" + and next_char not in [" ", "<", "="] + and prev_char not in [" ", "<"] + ): open_template_arguments += 1 continue - if (char == ">" and next_char not in [" ", ">", "="] and - prev_char not in [" ", ">"] and open_template_arguments > 0): + if ( + char == ">" + and next_char not in [" ", ">", "="] + and prev_char not in [" ", ">"] + and open_template_arguments > 0 + ): open_template_arguments -= 1 if open_template_arguments > 0: continue @@ -249,7 +283,7 @@ string_content = "" in_string = False for char in normalize(escape(argument)): - if char == "\"": + if char == '"': in_string = not in_string elif in_string: string_content += char @@ -293,7 +327,7 @@ def main(args_in): - """ Return a string output with information on string format errors + """Return a string output with information on string format errors >>> main(["test/lint/lint-format-strings-tests.txt"]) test/lint/lint-format-strings-tests.txt: Expected 1 argument(s) after format string but found 2 argument(s): printf("%d", 1, 2) @@ -310,34 +344,54 @@ test/lint/lint-format-strings-tests-skip-arguments.txt: Expected 1 argument(s) after format string but found 0 argument(s): snprintf(skip1, skip2, "%d") test/lint/lint-format-strings-tests-skip-arguments.txt: Could not parse function call string "snprintf(...)": snprintf(skip1, "%d") """ - parser = argparse.ArgumentParser(description="This program checks that the number of arguments passed " - "to a variadic format string function matches the number of format " - "specifiers in the format string.") - parser.add_argument("file", type=argparse.FileType( - "r", encoding="utf-8"), nargs="*", help="C++ source code file (e.g. foo.cpp)") + parser = argparse.ArgumentParser( + description=( + "This program checks that the number of arguments passed " + "to a variadic format string function matches the number of format " + "specifiers in the format string." + ) + ) + parser.add_argument( + "file", + type=argparse.FileType("r", encoding="utf-8"), + nargs="*", + help="C++ source code file (e.g. foo.cpp)", + ) args = parser.parse_args(args_in) for f in args.file: file_content = f.read() - for (function_name, - skip_arguments) in FUNCTION_NAMES_AND_NUMBER_OF_LEADING_ARGUMENTS: - for function_call_str in parse_function_calls( - function_name, file_content): + for ( + function_name, + skip_arguments, + ) in FUNCTION_NAMES_AND_NUMBER_OF_LEADING_ARGUMENTS: + for function_call_str in parse_function_calls(function_name, file_content): parts = parse_function_call_and_arguments( - function_name, function_call_str) + function_name, function_call_str + ) relevant_function_call_str = unescape("".join(parts))[:512] if (f.name, relevant_function_call_str) in FALSE_POSITIVES: continue if len(parts) < 3 + skip_arguments: - print("{}: Could not parse function call string \"{}(...)\": {}".format( - f.name, function_name, relevant_function_call_str)) + print( + '{}: Could not parse function call string "{}(...)": {}'.format( + f.name, function_name, relevant_function_call_str + ) + ) continue argument_count = len(parts) - 3 - skip_arguments format_str = parse_string_content(parts[1 + skip_arguments]) format_specifier_count = count_format_specifiers(format_str) if format_specifier_count != argument_count: - print("{}: Expected {} argument(s) after format string but found {} argument(s): {}".format( - f.name, format_specifier_count, argument_count, relevant_function_call_str)) + print( + "{}: Expected {} argument(s) after format string but found {}" + " argument(s): {}".format( + f.name, + format_specifier_count, + argument_count, + relevant_function_call_str, + ) + ) continue diff --git a/test/util/bitcoin-util-test.py b/test/util/bitcoin-util-test.py --- a/test/util/bitcoin-util-test.py +++ b/test/util/bitcoin-util-test.py @@ -22,11 +22,12 @@ def main(): config = configparser.ConfigParser() - config.read_file(open(os.path.join(os.path.dirname( - __file__), "../config.ini"), encoding="utf8")) + config.read_file( + open(os.path.join(os.path.dirname(__file__), "../config.ini"), encoding="utf8") + ) parser = argparse.ArgumentParser(description=__doc__) - parser.add_argument('-v', '--verbose', action='store_true') + parser.add_argument("-v", "--verbose", action="store_true") args = parser.parse_args() verbose = args.verbose @@ -34,16 +35,19 @@ level = logging.DEBUG else: level = logging.ERROR - formatter = '%(asctime)s - %(levelname)s - %(message)s' + formatter = "%(asctime)s - %(levelname)s - %(message)s" # Add the format/level to the logger logging.basicConfig(format=formatter, level=level) - bctester(os.path.join(config["environment"]["SRCDIR"], "test", - "util", "data"), "bitcoin-util-test.json", config["environment"]) + bctester( + os.path.join(config["environment"]["SRCDIR"], "test", "util", "data"), + "bitcoin-util-test.json", + config["environment"], + ) def bctester(testDir, input_basename, buildenv): - """ Loads and parses the input file, runs all tests and reports results""" + """Loads and parses the input file, runs all tests and reports results""" input_filename = os.path.join(testDir, input_basename) raw_data = open(input_filename, encoding="utf8").read() input_data = json.loads(raw_data) @@ -76,8 +80,9 @@ """ # Get the exec names and arguments execprog = os.path.join( - buildenv["BUILDDIR"], "src", testObj["exec"] + buildenv["EXEEXT"]) - execargs = testObj['args'] + buildenv["BUILDDIR"], "src", testObj["exec"] + buildenv["EXEEXT"] + ) + execargs = testObj["args"] execrun = [execprog] + execargs if buildenv["EMULATOR"]: execrun = [buildenv["EMULATOR"]] + execrun @@ -95,12 +100,11 @@ outputData = None outputType = None if "output_cmp" in testObj: - outputFn = testObj['output_cmp'] + outputFn = testObj["output_cmp"] # output type from file extension (determines how to compare) outputType = os.path.splitext(outputFn)[1][1:] try: - outputData = open(os.path.join(testDir, outputFn), - encoding="utf8").read() + outputData = open(os.path.join(testDir, outputFn), encoding="utf8").read() except OSError: logging.error(f"Output file {outputFn} can not be opened") raise @@ -108,13 +112,17 @@ logging.error(f"Output data missing for {outputFn}") raise Exception if not outputType: - logging.error( - f"Output file {outputFn} does not have a file extension") + logging.error(f"Output file {outputFn} does not have a file extension") raise Exception # Run the test - proc = subprocess.Popen(execrun, stdin=stdinCfg, stdout=subprocess.PIPE, - stderr=subprocess.PIPE, universal_newlines=True) + proc = subprocess.Popen( + execrun, + stdin=stdinCfg, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + universal_newlines=True, + ) try: outs = proc.communicate(input=inputData) except OSError: @@ -127,14 +135,14 @@ try: a_parsed = parse_output(outs[0], outputType) except Exception as e: - logging.error( - f'Error parsing command output as {outputType}: {e}') + logging.error(f"Error parsing command output as {outputType}: {e}") raise try: b_parsed = parse_output(outputData, outputType) except Exception as e: - logging.error('Error parsing expected output {} as {}: {}'.format( - outputFn, outputType, e)) + logging.error( + f"Error parsing expected output {outputFn} as {outputType}: {e}" + ) raise # Compare data if a_parsed != b_parsed: @@ -143,11 +151,14 @@ # Compare formatting if outs[0] != outputData: error_message = f"Output formatting mismatch for {outputFn}:\n" - error_message += "".join(difflib.context_diff(outputData.splitlines(True), - outs[0].splitlines( - True), - fromfile=outputFn, - tofile="returned")) + error_message += "".join( + difflib.context_diff( + outputData.splitlines(True), + outs[0].splitlines(True), + fromfile=outputFn, + tofile="returned", + ) + ) logging.error(error_message) formatting_mismatch = True @@ -156,7 +167,7 @@ # Compare the return code to the expected return code wantRC = 0 if "return_code" in testObj: - wantRC = testObj['return_code'] + wantRC = testObj["return_code"] if proc.returncode != wantRC: logging.error(f"Return code mismatch for {outputFn}") raise Exception @@ -170,8 +181,9 @@ # linux through wine. Just assert that the expected error text appears # somewhere in stderr. if want_error not in outs[1]: - logging.error("Error mismatch:\n" + "Expected: " + - want_error + "\nReceived: " + outs[1].rstrip()) + logging.error( + f"Error mismatch:\nExpected: {want_error}\nReceived: {outs[1].rstrip()}" + ) raise Exception @@ -179,13 +191,13 @@ """Parse the output according to specified format. Raise an error if the output can't be parsed.""" - if fmt == 'json': # json: compare parsed data + if fmt == "json": # json: compare parsed data return json.loads(a) - elif fmt == 'hex': # hex: parse and compare binary data + elif fmt == "hex": # hex: parse and compare binary data return bytes.fromhex(a.strip()) else: raise NotImplementedError(f"Don't know how to compare {fmt}") -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/test/util/rpcauth-test.py b/test/util/rpcauth-test.py --- a/test/util/rpcauth-test.py +++ b/test/util/rpcauth-test.py @@ -17,12 +17,14 @@ def setUp(self): config = configparser.ConfigParser() config_path = os.path.abspath( - os.path.join(os.sep, os.path.abspath(os.path.dirname(__file__)), - "../config.ini")) + os.path.join( + os.sep, os.path.abspath(os.path.dirname(__file__)), "../config.ini" + ) + ) with open(config_path, encoding="utf8") as config_file: config.read_file(config_file) - sys.path.insert(0, os.path.dirname(config['environment']['RPCAUTH'])) - self.rpcauth = importlib.import_module('rpcauth') + sys.path.insert(0, os.path.dirname(config["environment"]["RPCAUTH"])) + self.rpcauth = importlib.import_module("rpcauth") def test_generate_salt(self): for i in range(16, 32 + 1): @@ -31,7 +33,8 @@ def test_generate_password(self): password = self.rpcauth.generate_password() expected_password = base64.urlsafe_b64encode( - base64.urlsafe_b64decode(password)).decode('utf-8') + base64.urlsafe_b64decode(password) + ).decode("utf-8") self.assertEqual(expected_password, password) def test_check_password_hmac(self): @@ -39,12 +42,11 @@ password = self.rpcauth.generate_password() password_hmac = self.rpcauth.password_to_hmac(salt, password) - m = hmac.new(bytearray(salt, 'utf-8'), - bytearray(password, 'utf-8'), 'SHA256') + m = hmac.new(bytearray(salt, "utf-8"), bytearray(password, "utf-8"), "SHA256") expected_password_hmac = m.hexdigest() self.assertEqual(expected_password_hmac, password_hmac) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main()