diff --git a/.arclint b/.arclint index fa9e64d8a..ba2d59ca7 100644 --- a/.arclint +++ b/.arclint @@ -1,341 +1,341 @@ { "linters": { "generated": { "type": "generated" }, "clang-format": { "type": "clang-format", "version": ">=12.0", "bin": [ "clang-format-12", "clang-format" ], "include": "(^(src|chronik)/.*\\.(h|c|cpp|mm)$)", "exclude": [ "(^src/(secp256k1|univalue|leveldb)/)", "(^src/bench/nanobench.h$)" ] }, "autopep8": { "type": "autopep8", "version": ">=1.3.4", "include": "(\\.py$)", "exclude": [ "(^contrib/gitian-builder/)", "(^contrib/apple-sdk-tools/)", - "(^test/functional/abc.*\\.py$)" + "(^test/functional/(abc|chronik).*\\.py$)" ], "flags": [ "--aggressive", "--ignore=W503,W504", "--max-line-length=88" ] }, "black": { "type": "black", "version": ">=23.0.0", "include": [ - "(^test/functional/abc.*\\.py$)" + "(^test/functional/(abc|chronik).*\\.py$)" ], "flags": [ "--experimental-string-processing" ] }, "flake8": { "type": "flake8", "version": ">=5.0", "include": "(\\.py$)", "exclude": [ "(^contrib/gitian-builder/)", "(^contrib/apple-sdk-tools/)" ], "flags": [ "--ignore=A003,E203,E303,E305,E501,E704,W503,W504", "--require-plugins=flake8-comprehensions,flake8-builtins" ] }, "lint-format-strings": { "type": "lint-format-strings", "include": "(^(src|chronik)/.*\\.(h|c|cpp)$)", "exclude": [ "(^src/(secp256k1|univalue|leveldb)/)", "(^src/bench/nanobench.h$)", "(^src/test/fuzz/strprintf.cpp$)" ] }, "check-doc": { "type": "check-doc", "include": "(^(src|chronik)/.*\\.(h|c|cpp)$)" }, "lint-tests": { "type": "lint-tests", "include": "(^src/(seeder/|rpc/|wallet/)?test/.*\\.(cpp)$)" }, "phpcs": { "type": "phpcs", "include": "(\\.php$)", "exclude": [ "(^arcanist/__phutil_library_.+\\.php$)" ], "phpcs.standard": "arcanist/phpcs.xml" }, "lint-locale-dependence": { "type": "lint-locale-dependence", "include": "(^(src|chronik)/.*\\.(h|cpp)$)", "exclude": [ "(^src/(crypto/ctaes/|leveldb/|secp256k1/|tinyformat.h|univalue/))", "(^src/bench/nanobench.h$)" ] }, "lint-cheader": { "type": "lint-cheader", "include": "(^(src|chronik)/.*\\.(h|cpp)$)", "exclude": [ "(^src/(crypto/ctaes|secp256k1|univalue|leveldb)/)", "(^src/bench/nanobench.h$)" ] }, "spelling": { "type": "spelling", "exclude": [ "(^build-aux/m4/)", "(^depends/)", "(^doc/release-notes/)", "(^contrib/gitian-builder/)", "(^src/(qt/locale|secp256k1|univalue|leveldb)/)", "(^test/lint/dictionary/)", "(package-lock.json)" ], "spelling.dictionaries": [ "test/lint/dictionary/english.json" ] }, "lint-assert-with-side-effects": { "type": "lint-assert-with-side-effects", "include": "(^(src|chronik)/.*\\.(h|cpp)$)", "exclude": [ "(^src/(secp256k1|univalue|leveldb)/)", "(^src/bench/nanobench.h$)" ] }, "lint-include-quotes": { "type": "lint-include-quotes", "include": "(^(src|chronik)/.*\\.(h|cpp)$)", "exclude": [ "(^src/(secp256k1|univalue|leveldb)/)", "(^src/bench/nanobench.h$)" ] }, "lint-include-guard": { "type": "lint-include-guard", "include": "(^(src|chronik)/.*\\.h$)", "exclude": [ "(^src/(crypto/ctaes|secp256k1|univalue|leveldb)/)", "(^src/bench/nanobench.h$)", "(^src/tinyformat.h$)" ] }, "lint-include-source": { "type": "lint-include-source", "include": "(^(src|chronik)/.*\\.(h|c|cpp)$)", "exclude": [ "(^src/(secp256k1|univalue|leveldb)/)", "(^src/bench/nanobench.h$)" ] }, "lint-std-chrono": { "type": "lint-std-chrono", "include": "(^(src|chronik)/.*\\.(h|cpp)$)" }, "lint-stdint": { "type": "lint-stdint", "include": "(^(src|chronik)/.*\\.(h|c|cpp)$)", "exclude": [ "(^src/(secp256k1|univalue|leveldb)/)", "(^src/bench/nanobench.h$)", "(^src/compat/assumptions.h$)" ] }, "lint-source-filename": { "type": "lint-source-filename", "include": "(^(src|chronik)/.*\\.(h|c|cpp)$)", "exclude": [ "(^src/(secp256k1|univalue|leveldb)/)", "(^src/bench/nanobench.h$)" ] }, "lint-boost-dependencies": { "type": "lint-boost-dependencies", "include": "(^(src|chronik)/.*\\.(h|cpp)$)" }, "lint-python-encoding": { "type": "lint-python-encoding", "include": "(\\.py$)", "exclude": [ "(^contrib/gitian-builder/)", "(^contrib/apple-sdk-tools/)" ] }, "lint-python-shebang": { "type": "lint-python-shebang", "include": "(\\.py$)", "exclude": [ "(__init__\\.py$)", "(^contrib/gitian-builder/)", "(^contrib/apple-sdk-tools/)" ] }, "lint-bash-shebang": { "type": "lint-bash-shebang", "include": "(\\.sh$)", "exclude": [ "(^contrib/gitian-builder/)" ] }, "shellcheck": { "type": "shellcheck", "version": ">=0.7.0", "flags": [ "--external-sources", "--source-path=SCRIPTDIR" ], "include": "(\\.sh$)", "exclude": [ "(^contrib/gitian-builder/)", "(^src/(secp256k1|univalue)/)" ] }, "lint-shell-locale": { "type": "lint-shell-locale", "include": "(\\.sh$)", "exclude": [ "(^contrib/gitian-builder/)", "(^src/(secp256k1|univalue)/)", "(^cmake/utils/log-and-print-on-failure.sh)" ] }, "lint-cpp-void-parameters": { "type": "lint-cpp-void-parameters", "include": "(^(src|chronik)/.*\\.(h|cpp)$)", "exclude": [ "(^src/(crypto/ctaes|secp256k1|univalue|leveldb)/)", "(^src/bench/nanobench.h$)", "(^src/compat/glibc_compat.cpp$)" ] }, "lint-logs": { "type": "lint-logs", "include": "(^(src|chronik)/.*\\.(h|cpp|rs)$)" }, "lint-qt": { "type": "lint-qt", "include": "(^src/qt/.*\\.(h|cpp)$)", "exclude": [ "(^src/qt/(locale|forms|res)/)" ] }, "lint-doxygen": { "type": "lint-doxygen", "include": "(^(src|chronik)/.*\\.(h|c|cpp)$)", "exclude": [ "(^src/(crypto/ctaes|secp256k1|univalue|leveldb)/)", "(^src/bench/nanobench.h$)" ] }, "lint-whitespace": { "type": "lint-whitespace", "include": "(\\.(ac|am|cmake|conf|in|include|json|m4|md|openrc|php|pl|rs|sh|txt|yml)$)", "exclude": [ "(^contrib/gitian-builder/)", "(^src/(secp256k1|univalue|leveldb)/)", "(^src/bench/nanobench.h$)" ] }, "yamllint": { "type": "yamllint", "include": "(\\.(yml|yaml)$)", "exclude": "(^src/(secp256k1|univalue|leveldb)/)" }, "lint-check-nonfatal": { "type": "lint-check-nonfatal", "include": [ "(^src/rpc/.*\\.(h|c|cpp)$)", "(^src/wallet/rpc*.*\\.(h|c|cpp)$)" ], "exclude": "(^src/rpc/server.cpp)" }, "lint-markdown": { "type": "lint-markdown", "include": [ "(\\.md$)" ], "exclude": "(^contrib/gitian-builder/)" }, "lint-python-mypy": { "type": "lint-python-mypy", "version": ">=0.910", "include": "(\\.py$)", "exclude": [ "(^contrib/gitian-builder/)", "(^contrib/apple-sdk-tools/)", "(^contrib/macdeploy/)" ], "flags": [ "--ignore-missing-imports", "--install-types", "--non-interactive" ] }, "lint-python-mutable-default": { "type": "lint-python-mutable-default", "include": "(\\.py$)", "exclude": [ "(^contrib/gitian-builder/)", "(^contrib/apple-sdk-tools/)" ] }, "prettier": { "type": "prettier", "version": ">=2.6.0", "include": [ "(^cashtab/.*\\.(css|html|js|json|jsx|md|scss|ts|tsx)$)", "(^web/.*\\.(css|html|js|json|jsx|md|scss|ts|tsx)$)" ], "exclude": "(^web/.*/translations/.*\\.json$)" }, "lint-python-isort": { "type": "lint-python-isort", "version": ">=5.6.4", "include": "(\\.py$)", "exclude": [ "(^contrib/gitian-builder/)", "(^contrib/apple-sdk-tools/)" ] }, "rustfmt": { "type": "rustfmt", "version": ">=1.5.1", "include": "(\\.rs$)" }, "eslint": { "type": "eslint", "version": ">=8.0.0", "include": [ "(cashtab/.*\\.js$)", "(apps/alias-server/.*\\.js$)", "(modules/ecashaddrjs/.*\\.js$)", "(apps/ecash-herald/.*\\.js$)", "(modules/chronik-client/.*\\.(js|jsx|ts|tsx)$)" ] }, "lint-python-flynt": { "type": "lint-python-flynt", "version": ">=0.78", "include": "(\\.py$)", "exclude": [ "(^contrib/gitian-builder/)", "(^contrib/apple-sdk-tools/)" ] } } } diff --git a/test/functional/chronik_avalanche.py b/test/functional/chronik_avalanche.py index 93da02d12..ea38c35ee 100755 --- a/test/functional/chronik_avalanche.py +++ b/test/functional/chronik_avalanche.py @@ -1,134 +1,142 @@ #!/usr/bin/env python3 # Copyright (c) 2023 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """Test whether Chronik indexes the avalanche state correctly.""" from test_framework.address import ( ADDRESS_ECREG_P2SH_OP_TRUE, ADDRESS_ECREG_UNSPENDABLE, SCRIPTSIG_OP_TRUE, ) from test_framework.avatools import can_find_inv_in_poll, get_ava_p2p_interface from test_framework.messages import COutPoint, CTransaction, CTxIn, CTxOut from test_framework.script import OP_RETURN, CScript from test_framework.test_framework import BitcoinTestFramework from test_framework.util import assert_equal QUORUM_NODE_COUNT = 16 class ChronikAvalancheTest(BitcoinTestFramework): def set_test_params(self): self.setup_clean_chain = True self.num_nodes = 1 self.extra_args = [ [ - '-avaproofstakeutxodustthreshold=1000000', - '-avaproofstakeutxoconfirmations=1', - '-avacooldown=0', - '-avaminquorumstake=0', - '-avaminavaproofsnodecount=0', - '-chronik', - '-whitelist=noban@127.0.0.1', + "-avaproofstakeutxodustthreshold=1000000", + "-avaproofstakeutxoconfirmations=1", + "-avacooldown=0", + "-avaminquorumstake=0", + "-avaminavaproofsnodecount=0", + "-chronik", + "-whitelist=noban@127.0.0.1", ], ] self.supports_cli = False self.rpc_timeout = 240 def skip_test_if_missing_module(self): self.skip_if_no_chronik() def run_test(self): from test_framework.chronik.client import ChronikClient node = self.nodes[0] - chronik = ChronikClient('127.0.0.1', node.chronik_port) + chronik = ChronikClient("127.0.0.1", node.chronik_port) # Build a fake quorum of nodes. def get_quorum(): - return [get_ava_p2p_interface(self, node) - for _ in range(0, QUORUM_NODE_COUNT)] + return [ + get_ava_p2p_interface(self, node) for _ in range(0, QUORUM_NODE_COUNT) + ] def has_finalized_tip(tip_expected): hash_tip_final = int(tip_expected, 16) can_find_inv_in_poll(quorum, hash_tip_final) return node.isfinalblock(tip_expected) # Generate us a coin coinblockhash = self.generatetoaddress(node, 1, ADDRESS_ECREG_P2SH_OP_TRUE)[0] coinblock = node.getblock(coinblockhash) - cointx = coinblock['tx'][0] + cointx = coinblock["tx"][0] # Mature coin self.generatetoaddress(node, 100, ADDRESS_ECREG_UNSPENDABLE) # Pick one node from the quorum for polling. quorum = get_quorum() - assert node.getavalancheinfo()['ready_to_poll'] is True + assert node.getavalancheinfo()["ready_to_poll"] is True # Build tx to finalize in a block coinvalue = 5000000000 tx = CTransaction() tx.nVersion = 2 - tx.vin = [CTxIn(outpoint=COutPoint(int(cointx, 16), 0), - scriptSig=SCRIPTSIG_OP_TRUE, - nSequence=0xffffffff)] - tx.vout = [CTxOut(nValue=coinvalue - 10000, - scriptPubKey=CScript([OP_RETURN, bytes(100)]))] + tx.vin = [ + CTxIn( + outpoint=COutPoint(int(cointx, 16), 0), + scriptSig=SCRIPTSIG_OP_TRUE, + nSequence=0xFFFFFFFF, + ) + ] + tx.vout = [ + CTxOut( + nValue=coinvalue - 10000, scriptPubKey=CScript([OP_RETURN, bytes(100)]) + ) + ] # Add to mempool txid = node.sendrawtransaction(tx.serialize().hex()) # Tx not finalized assert_equal(chronik.tx(txid).ok().block.is_final, False) # Mine block tip = self.generate(node, 1)[-1] # Not finalized yet assert_equal(chronik.block(tip).ok().block_info.is_final, False) assert_equal(chronik.tx(txid).ok().block.is_final, False) # After we wait, both block and tx are finalized self.wait_until(lambda: has_finalized_tip(tip)) assert_equal(chronik.block(tip).ok().block_info.is_final, True) assert_equal(chronik.tx(txid).ok().block.is_final, True) # Restarting "wipes" the finalization status of blocks... - self.restart_node(0, self.extra_args[0] + ['-chronikreindex']) + self.restart_node(0, self.extra_args[0] + ["-chronikreindex"]) assert_equal(chronik.block(tip).ok().block_info.is_final, False) assert_equal(chronik.tx(txid).ok().block.is_final, False) # ...so we establish a new quorum and poll again quorum = get_quorum() self.wait_until(lambda: has_finalized_tip(tip)) assert_equal(chronik.block(tip).ok().block_info.is_final, True) assert_equal(chronik.tx(txid).ok().block.is_final, True) # Generate 10 blocks to invalidate, wait for Avalanche new_block_hashes = self.generate(node, 10) self.wait_until(lambda: has_finalized_tip(new_block_hashes[-1])) for block_hash in new_block_hashes: assert_equal(chronik.block(block_hash).ok().block_info.is_final, True) # After invalidation, blocks not found node.invalidateblock(new_block_hashes[0]) for block_hash in new_block_hashes: chronik.block(block_hash).err(404) # After reconsidering, blocks are not final again node.reconsiderblock(new_block_hashes[-1]) for block_hash in new_block_hashes: assert_equal(chronik.block(block_hash).ok().block_info.is_final, False) # Have to mine another block until avalanche considers reconsidering self.generate(node, 1) self.wait_until(lambda: has_finalized_tip(new_block_hashes[-1])) for block_hash in new_block_hashes: assert_equal(chronik.block(block_hash).ok().block_info.is_final, True) -if __name__ == '__main__': +if __name__ == "__main__": ChronikAvalancheTest().main() diff --git a/test/functional/chronik_block.py b/test/functional/chronik_block.py index 579d87361..8c7cc70de 100644 --- a/test/functional/chronik_block.py +++ b/test/functional/chronik_block.py @@ -1,135 +1,147 @@ #!/usr/bin/env python3 # Copyright (c) 2023 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """ Test Chronik's /block endpoint. """ from test_framework.address import ADDRESS_ECREG_P2SH_OP_TRUE, ADDRESS_ECREG_UNSPENDABLE from test_framework.blocktools import GENESIS_BLOCK_HASH, TIME_GENESIS_BLOCK from test_framework.test_framework import BitcoinTestFramework from test_framework.util import assert_equal class ChronikBlockTest(BitcoinTestFramework): def set_test_params(self): self.setup_clean_chain = True self.num_nodes = 1 - self.extra_args = [['-chronik']] + self.extra_args = [["-chronik"]] self.rpc_timeout = 240 def skip_test_if_missing_module(self): self.skip_if_no_chronik() def run_test(self): from test_framework.chronik.client import ChronikClient, pb node = self.nodes[0] - chronik = ChronikClient('127.0.0.1', node.chronik_port) + chronik = ChronikClient("127.0.0.1", node.chronik_port) expected_genesis_block = pb.Block( block_info=pb.BlockInfo( hash=bytes.fromhex(GENESIS_BLOCK_HASH)[::-1], prev_hash=bytes(32), height=0, - n_bits=0x207fffff, + n_bits=0x207FFFFF, timestamp=TIME_GENESIS_BLOCK, block_size=285, num_txs=1, num_inputs=1, num_outputs=1, sum_input_sats=0, sum_coinbase_output_sats=5000000000, sum_normal_output_sats=0, sum_burned_sats=0, ), ) # Not a valid hash or height - assert_equal(chronik.block('1234f').err(400).msg, - '400: Not a hash or height: 1234f') - assert_equal(chronik.block('00' * 31).err(400).msg, - f'400: Not a hash or height: {"00"*31}') - assert_equal(chronik.block('01').err(400).msg, '400: Not a hash or height: 01') - assert_equal(chronik.block('12345678901').err(400).msg, - '400: Not a hash or height: 12345678901') + assert_equal( + chronik.block("1234f").err(400).msg, "400: Not a hash or height: 1234f" + ) + assert_equal( + chronik.block("00" * 31).err(400).msg, + f'400: Not a hash or height: {"00"*31}', + ) + assert_equal(chronik.block("01").err(400).msg, "400: Not a hash or height: 01") + assert_equal( + chronik.block("12345678901").err(400).msg, + "400: Not a hash or height: 12345678901", + ) # Query genesis block using height assert_equal(chronik.block(0).ok(), expected_genesis_block) # Or hash assert_equal(chronik.block(GENESIS_BLOCK_HASH).ok(), expected_genesis_block) # Block 1 not found - assert_equal(chronik.block(1).err(404).msg, '404: Block not found: 1') + assert_equal(chronik.block(1).err(404).msg, "404: Block not found: 1") # Block "0000...0000" not found - assert_equal(chronik.block('00' * 32).err(404).msg, - f'404: Block not found: {"00"*32}') + assert_equal( + chronik.block("00" * 32).err(404).msg, f'404: Block not found: {"00"*32}' + ) # Generate 100 blocks, verify they form a chain - block_hashes = ( - [GENESIS_BLOCK_HASH] + - self.generatetoaddress(node, 100, ADDRESS_ECREG_P2SH_OP_TRUE) + block_hashes = [GENESIS_BLOCK_HASH] + self.generatetoaddress( + node, 100, ADDRESS_ECREG_P2SH_OP_TRUE ) for i in range(1, 101): proto_block = chronik.block(i).ok() - assert_equal(proto_block, pb.Block( - block_info=pb.BlockInfo( - hash=bytes.fromhex(block_hashes[i])[::-1], - prev_hash=bytes.fromhex(block_hashes[i - 1])[::-1], - height=i, - n_bits=0x207fffff, - timestamp=proto_block.block_info.timestamp, - block_size=181, - num_txs=1, - num_inputs=1, - num_outputs=1, - sum_input_sats=0, - sum_coinbase_output_sats=5000000000, - sum_normal_output_sats=0, - sum_burned_sats=0, + assert_equal( + proto_block, + pb.Block( + block_info=pb.BlockInfo( + hash=bytes.fromhex(block_hashes[i])[::-1], + prev_hash=bytes.fromhex(block_hashes[i - 1])[::-1], + height=i, + n_bits=0x207FFFFF, + timestamp=proto_block.block_info.timestamp, + block_size=181, + num_txs=1, + num_inputs=1, + num_outputs=1, + sum_input_sats=0, + sum_coinbase_output_sats=5000000000, + sum_normal_output_sats=0, + sum_burned_sats=0, + ), ), - )) + ) assert_equal(proto_block, chronik.block(block_hashes[i]).ok()) block_hashes.append(proto_block.block_info.hash) # Invalidate in the middle of the chain node.invalidateblock(block_hashes[50]) # Gives 404 for the invalidated blocks for i in range(50, 101): - assert_equal(chronik.block(i).err(404).msg, f'404: Block not found: {i}') + assert_equal(chronik.block(i).err(404).msg, f"404: Block not found: {i}") assert_equal( chronik.block(block_hashes[i]).err(404).msg, - f'404: Block not found: {block_hashes[i]}') + f"404: Block not found: {block_hashes[i]}", + ) # Previous blocks are still fine for i in range(0, 50): chronik.block(i).ok() chronik.block(block_hashes[i]).ok() # Mine fork block and check it connects fork_hash = self.generatetoaddress(node, 1, ADDRESS_ECREG_UNSPENDABLE)[0] proto_block = chronik.block(50).ok() - assert_equal(proto_block, pb.Block( - block_info=pb.BlockInfo( - hash=bytes.fromhex(fork_hash)[::-1], - prev_hash=bytes.fromhex(block_hashes[49])[::-1], - height=50, - n_bits=0x207fffff, - timestamp=proto_block.block_info.timestamp, - block_size=181, - num_txs=1, - num_inputs=1, - num_outputs=1, - sum_input_sats=0, - sum_coinbase_output_sats=5000000000, - sum_normal_output_sats=0, - sum_burned_sats=0, + assert_equal( + proto_block, + pb.Block( + block_info=pb.BlockInfo( + hash=bytes.fromhex(fork_hash)[::-1], + prev_hash=bytes.fromhex(block_hashes[49])[::-1], + height=50, + n_bits=0x207FFFFF, + timestamp=proto_block.block_info.timestamp, + block_size=181, + num_txs=1, + num_inputs=1, + num_outputs=1, + sum_input_sats=0, + sum_coinbase_output_sats=5000000000, + sum_normal_output_sats=0, + sum_burned_sats=0, + ), ), - )) + ) assert_equal(chronik.block(fork_hash).ok(), proto_block) -if __name__ == '__main__': +if __name__ == "__main__": ChronikBlockTest().main() diff --git a/test/functional/chronik_block_info.py b/test/functional/chronik_block_info.py index 731c3328a..2cdd30119 100644 --- a/test/functional/chronik_block_info.py +++ b/test/functional/chronik_block_info.py @@ -1,126 +1,134 @@ #!/usr/bin/env python3 # Copyright (c) 2023 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """ Test if the `BlockInfo` fields are set correctly in Chronik. """ from test_framework.address import ( ADDRESS_ECREG_P2SH_OP_TRUE, ADDRESS_ECREG_UNSPENDABLE, P2SH_OP_TRUE, SCRIPTSIG_OP_TRUE, ) from test_framework.blocktools import ( create_block, create_coinbase, make_conform_to_ctor, ) from test_framework.messages import COutPoint, CTransaction, CTxIn, CTxOut from test_framework.p2p import P2PDataStore from test_framework.script import OP_RETURN, CScript from test_framework.test_framework import BitcoinTestFramework from test_framework.util import assert_equal class ChronikBlockInfoTest(BitcoinTestFramework): def set_test_params(self): self.setup_clean_chain = True self.num_nodes = 1 - self.extra_args = [['-chronik']] + self.extra_args = [["-chronik"]] self.rpc_timeout = 240 def skip_test_if_missing_module(self): self.skip_if_no_chronik() def run_test(self): from test_framework.chronik.client import ChronikClient, pb node = self.nodes[0] node.setmocktime(1300000000) - chronik = ChronikClient('127.0.0.1', node.chronik_port) + chronik = ChronikClient("127.0.0.1", node.chronik_port) peer = node.add_p2p_connection(P2PDataStore()) coinblockhash = self.generatetoaddress(node, 1, ADDRESS_ECREG_P2SH_OP_TRUE)[0] coinblock = node.getblock(coinblockhash) - cointx = coinblock['tx'][0] + cointx = coinblock["tx"][0] prev_hash = self.generatetoaddress(node, 100, ADDRESS_ECREG_UNSPENDABLE)[-1] coinvalue = 5000000000 tx = CTransaction() - tx.vin = [CTxIn(outpoint=COutPoint(int(cointx, 16), 0), - scriptSig=SCRIPTSIG_OP_TRUE)] + tx.vin = [ + CTxIn(outpoint=COutPoint(int(cointx, 16), 0), scriptSig=SCRIPTSIG_OP_TRUE) + ] tx.vout = [ CTxOut(coinvalue - 10000, P2SH_OP_TRUE), - CTxOut(1000, CScript([OP_RETURN, b'test'])), + CTxOut(1000, CScript([OP_RETURN, b"test"])), ] tx.rehash() txid = node.sendrawtransaction(tx.serialize().hex()) tip_hash = self.generatetoaddress(node, 1, ADDRESS_ECREG_UNSPENDABLE)[-1] - assert_equal(chronik.block(tip_hash).ok(), pb.Block( - block_info=pb.BlockInfo( - hash=bytes.fromhex(tip_hash)[::-1], - prev_hash=bytes.fromhex(prev_hash)[::-1], - height=102, - n_bits=0x207fffff, - timestamp=1300000018, - block_size=281, - num_txs=2, - num_inputs=2, - num_outputs=3, - sum_input_sats=coinvalue, - sum_coinbase_output_sats=coinvalue + 9000, - sum_normal_output_sats=coinvalue - 9000, - sum_burned_sats=1000, + assert_equal( + chronik.block(tip_hash).ok(), + pb.Block( + block_info=pb.BlockInfo( + hash=bytes.fromhex(tip_hash)[::-1], + prev_hash=bytes.fromhex(prev_hash)[::-1], + height=102, + n_bits=0x207FFFFF, + timestamp=1300000018, + block_size=281, + num_txs=2, + num_inputs=2, + num_outputs=3, + sum_input_sats=coinvalue, + sum_coinbase_output_sats=coinvalue + 9000, + sum_normal_output_sats=coinvalue - 9000, + sum_burned_sats=1000, + ), ), - )) + ) node.invalidateblock(tip_hash) chronik.block(tip_hash).err(404) tx2 = CTransaction() - tx2.vin = [CTxIn(outpoint=COutPoint(int(txid, 16), 0), - scriptSig=SCRIPTSIG_OP_TRUE)] + tx2.vin = [ + CTxIn(outpoint=COutPoint(int(txid, 16), 0), scriptSig=SCRIPTSIG_OP_TRUE) + ] tx2.vout = [ - CTxOut(3000, CScript([OP_RETURN, b'test'])), - CTxOut(5000, CScript([OP_RETURN, b'test'])), + CTxOut(3000, CScript([OP_RETURN, b"test"])), + CTxOut(5000, CScript([OP_RETURN, b"test"])), CTxOut(coinvalue - 20000, P2SH_OP_TRUE), ] tx2.rehash() - block = create_block(int(prev_hash, 16), - create_coinbase(102, b'\x03' * 33), - 1300000500) + block = create_block( + int(prev_hash, 16), create_coinbase(102, b"\x03" * 33), 1300000500 + ) block.vtx += [tx, tx2] make_conform_to_ctor(block) block.hashMerkleRoot = block.calc_merkle_root() block.solve() peer.send_blocks_and_test([block], node) - assert_equal(chronik.block(block.hash).ok(), pb.Block( - block_info=pb.BlockInfo( - hash=bytes.fromhex(block.hash)[::-1], - prev_hash=bytes.fromhex(prev_hash)[::-1], - height=102, - n_bits=0x207fffff, - timestamp=1300000500, - block_size=403, - num_txs=3, - num_inputs=3, - num_outputs=7, - sum_input_sats=coinvalue * 2 - 10000, - sum_coinbase_output_sats=coinvalue, - sum_normal_output_sats=coinvalue * 2 - 21000, - sum_burned_sats=9000, + assert_equal( + chronik.block(block.hash).ok(), + pb.Block( + block_info=pb.BlockInfo( + hash=bytes.fromhex(block.hash)[::-1], + prev_hash=bytes.fromhex(prev_hash)[::-1], + height=102, + n_bits=0x207FFFFF, + timestamp=1300000500, + block_size=403, + num_txs=3, + num_inputs=3, + num_outputs=7, + sum_input_sats=coinvalue * 2 - 10000, + sum_coinbase_output_sats=coinvalue, + sum_normal_output_sats=coinvalue * 2 - 21000, + sum_burned_sats=9000, + ), ), - )) + ) -if __name__ == '__main__': +if __name__ == "__main__": ChronikBlockInfoTest().main() diff --git a/test/functional/chronik_block_txs.py b/test/functional/chronik_block_txs.py index eaeba4af3..ecec6c3e2 100644 --- a/test/functional/chronik_block_txs.py +++ b/test/functional/chronik_block_txs.py @@ -1,257 +1,267 @@ #!/usr/bin/env python3 # Copyright (c) 2023 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """ Test Chronik's /block-txs/:hash_or_height endpoint. """ from test_framework.address import ( ADDRESS_ECREG_P2SH_OP_TRUE, ADDRESS_ECREG_UNSPENDABLE, P2SH_OP_TRUE, SCRIPTSIG_OP_TRUE, ) from test_framework.blocktools import ( GENESIS_BLOCK_HASH, create_block, create_coinbase, make_conform_to_ctor, ) from test_framework.messages import COutPoint, CTransaction, CTxIn, CTxOut from test_framework.p2p import P2PDataStore from test_framework.script import OP_RETURN, CScript from test_framework.test_framework import BitcoinTestFramework from test_framework.util import assert_equal class ChronikBlockTxsTest(BitcoinTestFramework): def set_test_params(self): self.setup_clean_chain = True self.num_nodes = 1 - self.extra_args = [['-chronik']] + self.extra_args = [["-chronik"]] self.rpc_timeout = 240 def skip_test_if_missing_module(self): self.skip_if_no_chronik() def run_test(self): from test_framework.chronik.client import ChronikClient, pb from test_framework.chronik.test_data import genesis_cb_tx node = self.nodes[0] node.setmocktime(1300000000) - chronik = ChronikClient('127.0.0.1', node.chronik_port) + chronik = ChronikClient("127.0.0.1", node.chronik_port) peer = node.add_p2p_connection(P2PDataStore()) # Not a valid hash or height - assert_equal(chronik.block_txs('1234f').err(400).msg, - '400: Not a hash or height: 1234f') - assert_equal(chronik.block_txs('00' * 31).err(400).msg, - f'400: Not a hash or height: {"00"*31}') assert_equal( - chronik.block_txs('01').err(400).msg, - '400: Not a hash or height: 01') - assert_equal(chronik.block_txs('12345678901').err(400).msg, - '400: Not a hash or height: 12345678901') + chronik.block_txs("1234f").err(400).msg, "400: Not a hash or height: 1234f" + ) + assert_equal( + chronik.block_txs("00" * 31).err(400).msg, + f'400: Not a hash or height: {"00"*31}', + ) + assert_equal( + chronik.block_txs("01").err(400).msg, "400: Not a hash or height: 01" + ) + assert_equal( + chronik.block_txs("12345678901").err(400).msg, + "400: Not a hash or height: 12345678901", + ) assert_equal( - chronik.block_txs('00' * 32, page=0, page_size=201).err(400).msg, - '400: Requested block tx page size 201 is too big, maximum is 200') + chronik.block_txs("00" * 32, page=0, page_size=201).err(400).msg, + "400: Requested block tx page size 201 is too big, maximum is 200", + ) assert_equal( - chronik.block_txs('00' * 32, page=0, page_size=0).err(400).msg, - '400: Requested block tx page size 0 is too small, minimum is 1') + chronik.block_txs("00" * 32, page=0, page_size=0).err(400).msg, + "400: Requested block tx page size 0 is too small, minimum is 1", + ) assert_equal( - chronik.block_txs('00' * 32, page=0, page_size=2**32).err(400).msg, - '400: Invalid param page_size: 4294967296, ' + - 'number too large to fit in target type') + chronik.block_txs("00" * 32, page=0, page_size=2**32).err(400).msg, + "400: Invalid param page_size: 4294967296, " + + "number too large to fit in target type", + ) assert_equal( - chronik.block_txs('00' * 32, page=2**32, page_size=1).err(400).msg, - '400: Invalid param page: 4294967296, ' + - 'number too large to fit in target type') + chronik.block_txs("00" * 32, page=2**32, page_size=1).err(400).msg, + "400: Invalid param page: 4294967296, " + + "number too large to fit in target type", + ) assert_equal( chronik.block_txs(GENESIS_BLOCK_HASH, page=2**32 - 1, page_size=200).ok(), pb.TxHistoryPage(txs=[], num_pages=1, num_txs=1), ) assert_equal( chronik.block_txs(GENESIS_BLOCK_HASH).ok(), pb.TxHistoryPage( txs=[genesis_cb_tx()], num_pages=1, num_txs=1, ), ) coinblockhash = self.generatetoaddress(node, 1, ADDRESS_ECREG_P2SH_OP_TRUE)[0] coinblock = node.getblock(coinblockhash) - cointx = coinblock['tx'][0] + cointx = coinblock["tx"][0] tip = self.generatetoaddress(node, 100, ADDRESS_ECREG_UNSPENDABLE)[-1] coinvalue = 5000000000 tx1 = CTransaction() - tx1.vin = [CTxIn(outpoint=COutPoint(int(cointx, 16), 0), - scriptSig=SCRIPTSIG_OP_TRUE)] + tx1.vin = [ + CTxIn(outpoint=COutPoint(int(cointx, 16), 0), scriptSig=SCRIPTSIG_OP_TRUE) + ] tx1.vout = [ CTxOut(coinvalue - 10000, P2SH_OP_TRUE), - CTxOut(1000, CScript([OP_RETURN, b'test'])), + CTxOut(1000, CScript([OP_RETURN, b"test"])), ] tx1.rehash() tx2 = CTransaction() - tx2.vin = [CTxIn(outpoint=COutPoint(int(tx1.hash, 16), 0), - scriptSig=SCRIPTSIG_OP_TRUE)] + tx2.vin = [ + CTxIn(outpoint=COutPoint(int(tx1.hash, 16), 0), scriptSig=SCRIPTSIG_OP_TRUE) + ] tx2.vout = [ - CTxOut(3000, CScript([OP_RETURN, b'test'])), + CTxOut(3000, CScript([OP_RETURN, b"test"])), CTxOut(coinvalue - 20000, P2SH_OP_TRUE), ] tx2.rehash() - tx_coinbase = create_coinbase(102, b'\x03' * 33) + tx_coinbase = create_coinbase(102, b"\x03" * 33) - block = create_block(int(tip, 16), - tx_coinbase, - 1300000500) + block = create_block(int(tip, 16), tx_coinbase, 1300000500) block.vtx += [tx1, tx2] make_conform_to_ctor(block) block.hashMerkleRoot = block.calc_merkle_root() block.solve() peer.send_blocks_and_test([block], node) block_metadata = pb.BlockMetadata( height=102, hash=bytes.fromhex(block.hash)[::-1], timestamp=1300000500, ) proto_coinbase_tx = pb.Tx( txid=bytes.fromhex(tx_coinbase.hash)[::-1], version=1, inputs=[ pb.TxInput( - prev_out=pb.OutPoint(txid=bytes(32), out_idx=0xffffffff), + prev_out=pb.OutPoint(txid=bytes(32), out_idx=0xFFFFFFFF), input_script=bytes(tx_coinbase.vin[0].scriptSig), - sequence_no=0xffffffff, + sequence_no=0xFFFFFFFF, ), ], outputs=[ pb.TxOutput( value=coinvalue, output_script=bytes(tx_coinbase.vout[0].scriptPubKey), ), pb.TxOutput( output_script=bytes(CScript([OP_RETURN])), ), ], lock_time=0, block=block_metadata, size=len(tx_coinbase.serialize()), is_coinbase=True, ) proto_tx1 = pb.Tx( txid=bytes.fromhex(tx1.hash)[::-1], version=1, inputs=[ pb.TxInput( prev_out=pb.OutPoint(txid=bytes.fromhex(cointx)[::-1], out_idx=0), input_script=bytes(SCRIPTSIG_OP_TRUE), output_script=bytes(P2SH_OP_TRUE), value=coinvalue, sequence_no=0, ), ], outputs=[ pb.TxOutput( value=coinvalue - 10000, output_script=bytes(P2SH_OP_TRUE), spent_by=pb.SpentBy( txid=bytes.fromhex(tx2.hash)[::-1], input_idx=0, ), ), pb.TxOutput( value=1000, - output_script=bytes(CScript([OP_RETURN, b'test'])), + output_script=bytes(CScript([OP_RETURN, b"test"])), ), ], lock_time=0, size=len(tx1.serialize()), block=block_metadata, ) proto_tx2 = pb.Tx( txid=bytes.fromhex(tx2.hash)[::-1], version=1, inputs=[ pb.TxInput( prev_out=pb.OutPoint(txid=bytes.fromhex(tx1.hash)[::-1], out_idx=0), input_script=bytes(SCRIPTSIG_OP_TRUE), output_script=bytes(P2SH_OP_TRUE), value=coinvalue - 10000, sequence_no=0, ), ], outputs=[ pb.TxOutput( value=3000, - output_script=bytes(CScript([OP_RETURN, b'test'])), + output_script=bytes(CScript([OP_RETURN, b"test"])), ), pb.TxOutput( value=coinvalue - 20000, output_script=bytes(P2SH_OP_TRUE), ), ], lock_time=0, size=len(tx2.serialize()), block=block_metadata, ) sorted_tx1, sorted_tx2 = sorted( - [proto_tx1, proto_tx2], key=lambda tx: tx.txid[::-1]) + [proto_tx1, proto_tx2], key=lambda tx: tx.txid[::-1] + ) for page, tx in enumerate([proto_coinbase_tx, sorted_tx1, sorted_tx2]): assert_equal( chronik.block_txs(block.hash, page=page, page_size=1).ok(), pb.TxHistoryPage( txs=[tx], num_pages=3, num_txs=3, ), ) assert_equal( chronik.block_txs(block.hash).ok(), pb.TxHistoryPage( txs=[proto_coinbase_tx, sorted_tx1, sorted_tx2], num_pages=1, num_txs=3, ), ) assert_equal( chronik.block_txs(block.hash, page=0, page_size=2).ok(), pb.TxHistoryPage( txs=[proto_coinbase_tx, sorted_tx1], num_pages=2, num_txs=3, ), ) assert_equal( chronik.block_txs(block.hash, page=1, page_size=2).ok(), pb.TxHistoryPage( txs=[sorted_tx2], num_pages=2, num_txs=3, ), ) node.invalidateblock(block.hash) chronik.block_txs(block.hash).err(404) -if __name__ == '__main__': +if __name__ == "__main__": ChronikBlockTxsTest().main() diff --git a/test/functional/chronik_blockchain_info.py b/test/functional/chronik_blockchain_info.py index 3c5190e98..88b5277a1 100644 --- a/test/functional/chronik_blockchain_info.py +++ b/test/functional/chronik_blockchain_info.py @@ -1,51 +1,60 @@ #!/usr/bin/env python3 # Copyright (c) 2023 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """ Test Chronik's /blockchain-info endpoint. """ from test_framework.address import ADDRESS_ECREG_UNSPENDABLE from test_framework.blocktools import GENESIS_BLOCK_HASH from test_framework.test_framework import BitcoinTestFramework from test_framework.util import assert_equal class ChronikBlockchainInfoTest(BitcoinTestFramework): def set_test_params(self): self.setup_clean_chain = True self.num_nodes = 1 - self.extra_args = [['-chronik']] + self.extra_args = [["-chronik"]] def skip_test_if_missing_module(self): self.skip_if_no_chronik() def run_test(self): from test_framework.chronik.client import ChronikClient, pb node = self.nodes[0] - chronik = ChronikClient('127.0.0.1', node.chronik_port) + chronik = ChronikClient("127.0.0.1", node.chronik_port) - assert_equal(chronik.blockchain_info().ok(), pb.BlockchainInfo( - tip_hash=bytes.fromhex(GENESIS_BLOCK_HASH)[::-1], - tip_height=0, - )) + assert_equal( + chronik.blockchain_info().ok(), + pb.BlockchainInfo( + tip_hash=bytes.fromhex(GENESIS_BLOCK_HASH)[::-1], + tip_height=0, + ), + ) block_hashes = self.generatetoaddress(node, 12, ADDRESS_ECREG_UNSPENDABLE) - assert_equal(chronik.blockchain_info().ok(), pb.BlockchainInfo( - tip_hash=bytes.fromhex(block_hashes[11])[::-1], - tip_height=12, - )) + assert_equal( + chronik.blockchain_info().ok(), + pb.BlockchainInfo( + tip_hash=bytes.fromhex(block_hashes[11])[::-1], + tip_height=12, + ), + ) node.invalidateblock(block_hashes[6]) - assert_equal(chronik.blockchain_info().ok(), pb.BlockchainInfo( - tip_hash=bytes.fromhex(block_hashes[5])[::-1], - tip_height=6, - )) + assert_equal( + chronik.blockchain_info().ok(), + pb.BlockchainInfo( + tip_hash=bytes.fromhex(block_hashes[5])[::-1], + tip_height=6, + ), + ) -if __name__ == '__main__': +if __name__ == "__main__": ChronikBlockchainInfoTest().main() diff --git a/test/functional/chronik_blocks.py b/test/functional/chronik_blocks.py index 8fb69887f..e8cc71322 100644 --- a/test/functional/chronik_blocks.py +++ b/test/functional/chronik_blocks.py @@ -1,94 +1,104 @@ #!/usr/bin/env python3 # Copyright (c) 2023 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """ Test Chronik's /blocks/:start_height/:end_height endpoint. """ from test_framework.address import ADDRESS_ECREG_UNSPENDABLE from test_framework.blocktools import GENESIS_BLOCK_HASH, TIME_GENESIS_BLOCK from test_framework.test_framework import BitcoinTestFramework from test_framework.util import assert_equal class ChronikBlockRangeTest(BitcoinTestFramework): def set_test_params(self): self.setup_clean_chain = True self.num_nodes = 1 - self.extra_args = [['-chronik']] + self.extra_args = [["-chronik"]] def skip_test_if_missing_module(self): self.skip_if_no_chronik() def run_test(self): from test_framework.chronik.client import ChronikClient, pb node = self.nodes[0] node.setmocktime(1300000000) - chronik = ChronikClient('127.0.0.1', node.chronik_port) + chronik = ChronikClient("127.0.0.1", node.chronik_port) - assert_equal(chronik.blocks(-1, 0).err(400).msg, - '400: Invalid block start height: -1') - assert_equal(chronik.blocks(-2**31, 0).err(400).msg, - f'400: Invalid block start height: {-2**31}') - assert_equal(chronik.blocks(2, 1).err(400).msg, - '400: Invalid block end height: 1') + assert_equal( + chronik.blocks(-1, 0).err(400).msg, "400: Invalid block start height: -1" + ) + assert_equal( + chronik.blocks(-(2**31), 0).err(400).msg, + f"400: Invalid block start height: {-2**31}", + ) + assert_equal( + chronik.blocks(2, 1).err(400).msg, "400: Invalid block end height: 1" + ) assert_equal( chronik.blocks(1, 501).err(400).msg, - '400: Blocks page size too large, may not be above 500 but got 501') + "400: Blocks page size too large, may not be above 500 but got 501", + ) # Doesn't overflow: assert_equal( chronik.blocks(0, 2**31 - 1).err(400).msg, - f'400: Blocks page size too large, may not be above 500 but got {2**31}') + f"400: Blocks page size too large, may not be above 500 but got {2**31}", + ) genesis_info = pb.BlockInfo( hash=bytes.fromhex(GENESIS_BLOCK_HASH)[::-1], prev_hash=bytes(32), height=0, - n_bits=0x207fffff, + n_bits=0x207FFFFF, timestamp=TIME_GENESIS_BLOCK, block_size=285, num_txs=1, num_inputs=1, num_outputs=1, sum_input_sats=0, sum_coinbase_output_sats=5000000000, sum_normal_output_sats=0, sum_burned_sats=0, ) assert_equal(chronik.blocks(0, 100).ok(), pb.Blocks(blocks=[genesis_info])) assert_equal(chronik.blocks(0, 0).ok(), pb.Blocks(blocks=[genesis_info])) assert_equal(chronik.blocks(500, 500).ok(), pb.Blocks(blocks=[])) assert_equal(chronik.blocks(1, 500).ok(), pb.Blocks(blocks=[])) assert_equal(chronik.blocks(500, 999).ok(), pb.Blocks(blocks=[])) - assert_equal(chronik.blocks(2**31 - 500, 2**31 - 1).ok(), pb.Blocks(blocks=[])) + assert_equal( + chronik.blocks(2**31 - 500, 2**31 - 1).ok(), pb.Blocks(blocks=[]) + ) block_hashes = [GENESIS_BLOCK_HASH] block_hashes += self.generatetoaddress(node, 12, ADDRESS_ECREG_UNSPENDABLE) assert_equal( chronik.blocks(8, 12).ok(), - pb.Blocks(blocks=[ - pb.BlockInfo( - hash=bytes.fromhex(block_hashes[height])[::-1], - prev_hash=bytes.fromhex(block_hashes[height - 1])[::-1], - height=height, - n_bits=0x207fffff, - timestamp=1300000003, - block_size=181, - num_txs=1, - num_inputs=1, - num_outputs=1, - sum_input_sats=0, - sum_coinbase_output_sats=5000000000, - sum_normal_output_sats=0, - sum_burned_sats=0, - ) - for height in range(8, 13) - ]), + pb.Blocks( + blocks=[ + pb.BlockInfo( + hash=bytes.fromhex(block_hashes[height])[::-1], + prev_hash=bytes.fromhex(block_hashes[height - 1])[::-1], + height=height, + n_bits=0x207FFFFF, + timestamp=1300000003, + block_size=181, + num_txs=1, + num_inputs=1, + num_outputs=1, + sum_input_sats=0, + sum_coinbase_output_sats=5000000000, + sum_normal_output_sats=0, + sum_burned_sats=0, + ) + for height in range(8, 13) + ] + ), ) -if __name__ == '__main__': +if __name__ == "__main__": ChronikBlockRangeTest().main() diff --git a/test/functional/chronik_disallow_prune.py b/test/functional/chronik_disallow_prune.py index 5abebc55e..51640b2a7 100644 --- a/test/functional/chronik_disallow_prune.py +++ b/test/functional/chronik_disallow_prune.py @@ -1,28 +1,29 @@ #!/usr/bin/env python3 # Copyright (c) 2023 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """ Verify combining -prune and -chronik results in an init error. """ from test_framework.test_framework import BitcoinTestFramework class ChronikDisallowPruneTest(BitcoinTestFramework): def set_test_params(self): self.setup_clean_chain = True self.num_nodes = 1 def skip_test_if_missing_module(self): self.skip_if_no_chronik() def run_test(self): self.nodes[0].stop_node() self.nodes[0].assert_start_raises_init_error( ["-chronik", "-prune=1000"], - "Error: Prune mode is incompatible with -chronik.") + "Error: Prune mode is incompatible with -chronik.", + ) -if __name__ == '__main__': +if __name__ == "__main__": ChronikDisallowPruneTest().main() diff --git a/test/functional/chronik_raw_tx.py b/test/functional/chronik_raw_tx.py index 46b187f8b..032373bc6 100644 --- a/test/functional/chronik_raw_tx.py +++ b/test/functional/chronik_raw_tx.py @@ -1,93 +1,105 @@ #!/usr/bin/env python3 # Copyright (c) 2023 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """ Test Chronik's /raw-tx/:txid endpoint. """ from test_framework.address import ( ADDRESS_ECREG_P2SH_OP_TRUE, ADDRESS_ECREG_UNSPENDABLE, SCRIPTSIG_OP_TRUE, ) from test_framework.blocktools import GENESIS_CB_TXID from test_framework.messages import COutPoint, CTransaction, CTxIn, CTxOut from test_framework.script import OP_EQUAL, OP_HASH160, CScript, hash160 from test_framework.test_framework import BitcoinTestFramework from test_framework.util import assert_equal class ChronikRawTxTest(BitcoinTestFramework): def set_test_params(self): self.setup_clean_chain = True self.num_nodes = 1 - self.extra_args = [['-chronik']] + self.extra_args = [["-chronik"]] self.rpc_timeout = 240 def skip_test_if_missing_module(self): self.skip_if_no_chronik() def run_test(self): from test_framework.chronik.client import ChronikClient, pb node = self.nodes[0] - chronik = ChronikClient('127.0.0.1', node.chronik_port) + chronik = ChronikClient("127.0.0.1", node.chronik_port) - assert_equal(chronik.tx('0').err(400).msg, '400: Not a txid: 0') - assert_equal(chronik.tx('123').err(400).msg, '400: Not a txid: 123') - assert_equal(chronik.tx('1234f').err(400).msg, '400: Not a txid: 1234f') - assert_equal(chronik.tx('00' * 31).err(400).msg, f'400: Not a txid: {"00"*31}') - assert_equal(chronik.tx('01').err(400).msg, '400: Not a txid: 01') - assert_equal(chronik.tx('12345678901').err(400).msg, - '400: Not a txid: 12345678901') + assert_equal(chronik.tx("0").err(400).msg, "400: Not a txid: 0") + assert_equal(chronik.tx("123").err(400).msg, "400: Not a txid: 123") + assert_equal(chronik.tx("1234f").err(400).msg, "400: Not a txid: 1234f") + assert_equal(chronik.tx("00" * 31).err(400).msg, f'400: Not a txid: {"00"*31}') + assert_equal(chronik.tx("01").err(400).msg, "400: Not a txid: 01") + assert_equal( + chronik.tx("12345678901").err(400).msg, "400: Not a txid: 12345678901" + ) - assert_equal(chronik.tx('00' * 32).err(404).msg, - f'404: Transaction {"00"*32} not found in the index') + assert_equal( + chronik.tx("00" * 32).err(404).msg, + f'404: Transaction {"00"*32} not found in the index', + ) # Verify queried genesis tx matches # Note: unlike getrawtransaction, this also works on the Genesis coinbase assert_equal( chronik.raw_tx(GENESIS_CB_TXID).ok(), - pb.RawTx(raw_tx=bytes.fromhex( - '0100000001000000000000000000000000000000000000000000000000000000000000' - '0000ffffffff4d04ffff001d0104455468652054696d65732030332f4a616e2f323030' - '39204368616e63656c6c6f72206f6e206272696e6b206f66207365636f6e6420626169' - '6c6f757420666f722062616e6b73ffffffff0100f2052a01000000434104678afdb0fe' - '5548271967f1a67130b7105cd6a828e03909a67962e0ea1f61deb649f6bc3f4cef38c4' - 'f35504e51ec112de5c384df7ba0b8d578a4c702b6bf11d5fac00000000' - )), + pb.RawTx( + raw_tx=bytes.fromhex( + "0100000001000000000000000000000000000000000000000000000000000000000000" + "0000ffffffff4d04ffff001d0104455468652054696d65732030332f4a616e2f323030" + "39204368616e63656c6c6f72206f6e206272696e6b206f66207365636f6e6420626169" + "6c6f757420666f722062616e6b73ffffffff0100f2052a01000000434104678afdb0fe" + "5548271967f1a67130b7105cd6a828e03909a67962e0ea1f61deb649f6bc3f4cef38c4" + "f35504e51ec112de5c384df7ba0b8d578a4c702b6bf11d5fac00000000" + ) + ), ) coinblockhash = self.generatetoaddress(node, 1, ADDRESS_ECREG_P2SH_OP_TRUE)[0] coinblock = node.getblock(coinblockhash) - cointx = coinblock['tx'][0] + cointx = coinblock["tx"][0] self.generatetoaddress(node, 100, ADDRESS_ECREG_UNSPENDABLE) coinvalue = 5000000000 send_values = [coinvalue - 10000, 1000, 2000, 3000] send_redeem_scripts = [bytes([i + 0x52]) for i in range(len(send_values))] - send_scripts = [CScript([OP_HASH160, hash160(redeem_script), OP_EQUAL]) - for redeem_script in send_redeem_scripts] + send_scripts = [ + CScript([OP_HASH160, hash160(redeem_script), OP_EQUAL]) + for redeem_script in send_redeem_scripts + ] tx = CTransaction() tx.nVersion = 2 - tx.vin = [CTxIn(outpoint=COutPoint(int(cointx, 16), 0), - scriptSig=SCRIPTSIG_OP_TRUE, - nSequence=0xfffffffe)] - tx.vout = [CTxOut(value, script) - for (value, script) in zip(send_values, send_scripts)] + tx.vin = [ + CTxIn( + outpoint=COutPoint(int(cointx, 16), 0), + scriptSig=SCRIPTSIG_OP_TRUE, + nSequence=0xFFFFFFFE, + ) + ] + tx.vout = [ + CTxOut(value, script) for (value, script) in zip(send_values, send_scripts) + ] tx.nLockTime = 1234567890 # Submit tx to mempool raw_tx = tx.serialize() txid = node.sendrawtransaction(raw_tx.hex()) assert_equal(chronik.raw_tx(txid).ok(), pb.RawTx(raw_tx=raw_tx)) # Mined block still works self.generatetoaddress(node, 1, ADDRESS_ECREG_UNSPENDABLE) assert_equal(chronik.raw_tx(txid).ok(), pb.RawTx(raw_tx=raw_tx)) -if __name__ == '__main__': +if __name__ == "__main__": ChronikRawTxTest().main() diff --git a/test/functional/chronik_resync.py b/test/functional/chronik_resync.py index f0376e281..41b179668 100644 --- a/test/functional/chronik_resync.py +++ b/test/functional/chronik_resync.py @@ -1,119 +1,122 @@ #!/usr/bin/env python3 # Copyright (c) 2023 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. import os import shutil from test_framework.address import ADDRESS_ECREG_P2SH_OP_TRUE, ADDRESS_ECREG_UNSPENDABLE from test_framework.blocktools import GENESIS_BLOCK_HASH from test_framework.test_framework import BitcoinTestFramework from test_framework.util import assert_equal, get_datadir_path class ChronikResyncTest(BitcoinTestFramework): def set_test_params(self): self.setup_clean_chain = True self.num_nodes = 1 self.rpc_timeout = 240 def skip_test_if_missing_module(self): self.skip_if_no_chronik() def run_test(self): from test_framework.chronik.client import ChronikClient + node = self.nodes[0] - chronik = ChronikClient('127.0.0.1', node.chronik_port) + chronik = ChronikClient("127.0.0.1", node.chronik_port) # Mine 100 blocks, that Chronik doesn't index - block_hashes = ( - [GENESIS_BLOCK_HASH] + - self.generatetoaddress(node, 100, ADDRESS_ECREG_P2SH_OP_TRUE) + block_hashes = [GENESIS_BLOCK_HASH] + self.generatetoaddress( + node, 100, ADDRESS_ECREG_P2SH_OP_TRUE ) # Restart with Chronik: syncs blocks from genesis - with node.assert_debug_log([ - f"Chronik database empty, syncing to block {block_hashes[100]} " + - "at height 100.", - ]): - self.restart_node(0, ['-chronik']) + with node.assert_debug_log( + [ + f"Chronik database empty, syncing to block {block_hashes[100]} " + + "at height 100.", + ] + ): + self.restart_node(0, ["-chronik"]) for i in range(0, 101): proto_block = chronik.block(i).ok() assert_equal(proto_block.block_info.hash[::-1].hex(), block_hashes[i]) chronik.block(101).err(404) self.restart_node(0, []) # Without Chronik: Undo last 50 blocks, then add 100 new ones node.invalidateblock(block_hashes[50]) chronik_hash = block_hashes[100] del block_hashes[50:] - block_hashes += ( - self.generatetoaddress(node, 100, ADDRESS_ECREG_UNSPENDABLE) - ) + block_hashes += self.generatetoaddress(node, 100, ADDRESS_ECREG_UNSPENDABLE) # Restart with Chronik: Undoes last 50 blocks, then adds node's next 100 - with node.assert_debug_log([ - f"Node and Chronik diverged, node is on block {block_hashes[149]} " + - f"at height 149, and Chronik is on block {chronik_hash} at height 100.", - f"The last common block is {block_hashes[49]} at height 49.", - "Reverting Chronik blocks 50 to 100", - ]): - self.restart_node(0, ['-chronik']) + with node.assert_debug_log( + [ + f"Node and Chronik diverged, node is on block {block_hashes[149]} " + + f"at height 149, and Chronik is on block {chronik_hash} at height" + " 100.", + f"The last common block is {block_hashes[49]} at height 49.", + "Reverting Chronik blocks 50 to 100", + ] + ): + self.restart_node(0, ["-chronik"]) for i in range(0, 150): proto_block = chronik.block(i).ok() assert_equal(proto_block.block_info.hash[::-1].hex(), block_hashes[i]) chronik.block(150).err(404) # Reset node blockchain back to genesis # Leave Chronik untouched node.stop_node() datadir = get_datadir_path(self.options.tmpdir, 0) - shutil.rmtree(os.path.join(datadir, self.chain, 'blocks')) - shutil.rmtree(os.path.join(datadir, self.chain, 'chainstate')) + shutil.rmtree(os.path.join(datadir, self.chain, "blocks")) + shutil.rmtree(os.path.join(datadir, self.chain, "chainstate")) # Chronik cannot sync because the node doesn't have the old blocks anymore # It needs the node's block data to undo the stale blocks. init_error_msg = ( - f"Error: Cannot rewind Chronik, it contains block {block_hashes[149]} " + - "that the node doesn't have. You may need to use -reindex/" + - "-chronikreindex, or delete indexes/chronik and restart" + f"Error: Cannot rewind Chronik, it contains block {block_hashes[149]} " + + "that the node doesn't have. You may need to use -reindex/" + + "-chronikreindex, or delete indexes/chronik and restart" ) node.assert_start_raises_init_error(["-chronik"], init_error_msg) # Reindexing results in the same error (different code path) - self.restart_node(0, ['-reindex']) + self.restart_node(0, ["-reindex"]) assert_equal(node.getbestblockhash(), GENESIS_BLOCK_HASH) node.stop_node() node.assert_start_raises_init_error(["-chronik"], init_error_msg) # Reindexing with -chronik now works, as it wipes the Chronik data with node.assert_debug_log(["Wiping Chronik at "]): - self.restart_node(0, ['-chronik', '-reindex']) + self.restart_node(0, ["-chronik", "-reindex"]) chronik.block(0).ok() chronik.block(1).err(404) # Generate 100 blocks without chronik self.restart_node(0, []) self.generatetoaddress(node, 100, ADDRESS_ECREG_P2SH_OP_TRUE) # Reindexing indexes 100 blocks - self.restart_node(0, ['-chronik', '-reindex']) + self.restart_node(0, ["-chronik", "-reindex"]) chronik.block(100).ok() # Test -chronikreindex with node.assert_debug_log(["Wiping Chronik at "]): - self.restart_node(0, ['-chronik', '-chronikreindex']) + self.restart_node(0, ["-chronik", "-chronikreindex"]) chronik.block(0).ok() chronik.block(100).ok() chronik.block(101).err(404) -if __name__ == '__main__': +if __name__ == "__main__": ChronikResyncTest().main() diff --git a/test/functional/chronik_script_confirmed_txs.py b/test/functional/chronik_script_confirmed_txs.py index d8e0615c0..2fedbdda9 100644 --- a/test/functional/chronik_script_confirmed_txs.py +++ b/test/functional/chronik_script_confirmed_txs.py @@ -1,196 +1,223 @@ #!/usr/bin/env python3 # Copyright (c) 2023 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """ Test Chronik's /script/:type/:payload/confirmed-txs endpoint. """ from test_framework.address import ( ADDRESS_ECREG_P2SH_OP_TRUE, P2SH_OP_TRUE, SCRIPTSIG_OP_TRUE, ) from test_framework.blocktools import GENESIS_CB_PK, create_block, create_coinbase from test_framework.messages import COutPoint, CTransaction, CTxIn, CTxOut from test_framework.p2p import P2PDataStore from test_framework.test_framework import BitcoinTestFramework from test_framework.txtools import pad_tx from test_framework.util import assert_equal, iter_chunks class ChronikScriptConfirmedTxsTest(BitcoinTestFramework): def set_test_params(self): self.setup_clean_chain = True self.num_nodes = 1 - self.extra_args = [['-chronik']] + self.extra_args = [["-chronik"]] self.rpc_timeout = 240 def skip_test_if_missing_module(self): self.skip_if_no_chronik() def run_test(self): from test_framework.chronik.client import ChronikClient, pb from test_framework.chronik.test_data import genesis_cb_tx node = self.nodes[0] - chronik = ChronikClient('127.0.0.1', node.chronik_port) + chronik = ChronikClient("127.0.0.1", node.chronik_port) peer = node.add_p2p_connection(P2PDataStore()) mocktime = 1300000000 node.setmocktime(mocktime) assert_equal( - chronik.script('', '').confirmed_txs().err(400).msg, - '400: Unknown script type: ') + chronik.script("", "").confirmed_txs().err(400).msg, + "400: Unknown script type: ", + ) assert_equal( - chronik.script('foo', '').confirmed_txs().err(400).msg, - '400: Unknown script type: foo') + chronik.script("foo", "").confirmed_txs().err(400).msg, + "400: Unknown script type: foo", + ) assert_equal( - chronik.script('p2pkh', 'LILALI').confirmed_txs().err(400).msg, - "400: Invalid hex: Invalid character 'L' at position 0") + chronik.script("p2pkh", "LILALI").confirmed_txs().err(400).msg, + "400: Invalid hex: Invalid character 'L' at position 0", + ) assert_equal( - chronik.script('other', 'LILALI').confirmed_txs().err(400).msg, - "400: Invalid hex: Invalid character 'L' at position 0") + chronik.script("other", "LILALI").confirmed_txs().err(400).msg, + "400: Invalid hex: Invalid character 'L' at position 0", + ) assert_equal( - chronik.script('p2pkh', '').confirmed_txs().err(400).msg, - '400: Invalid payload for P2PKH: Invalid length, ' + - 'expected 20 bytes but got 0 bytes') + chronik.script("p2pkh", "").confirmed_txs().err(400).msg, + "400: Invalid payload for P2PKH: Invalid length, " + + "expected 20 bytes but got 0 bytes", + ) assert_equal( - chronik.script('p2pkh', 'aA').confirmed_txs().err(400).msg, - '400: Invalid payload for P2PKH: Invalid length, ' + - 'expected 20 bytes but got 1 bytes') + chronik.script("p2pkh", "aA").confirmed_txs().err(400).msg, + "400: Invalid payload for P2PKH: Invalid length, " + + "expected 20 bytes but got 1 bytes", + ) assert_equal( - chronik.script('p2sh', 'aaBB').confirmed_txs().err(400).msg, - '400: Invalid payload for P2SH: Invalid length, ' + - 'expected 20 bytes but got 2 bytes') + chronik.script("p2sh", "aaBB").confirmed_txs().err(400).msg, + "400: Invalid payload for P2SH: Invalid length, " + + "expected 20 bytes but got 2 bytes", + ) assert_equal( - chronik.script('p2pk', 'aaBBcc').confirmed_txs().err(400).msg, - '400: Invalid payload for P2PK: Invalid length, ' + - 'expected one of [33, 65] but got 3 bytes') + chronik.script("p2pk", "aaBBcc").confirmed_txs().err(400).msg, + "400: Invalid payload for P2PK: Invalid length, " + + "expected one of [33, 65] but got 3 bytes", + ) assert_equal( - chronik.script( - 'p2pk', GENESIS_CB_PK).confirmed_txs( - page=0, page_size=201).err(400).msg, - '400: Requested page size 201 is too big, maximum is 200') + chronik.script("p2pk", GENESIS_CB_PK) + .confirmed_txs(page=0, page_size=201) + .err(400) + .msg, + "400: Requested page size 201 is too big, maximum is 200", + ) assert_equal( - chronik.script( - 'p2pk', GENESIS_CB_PK).confirmed_txs( - page=0, page_size=0).err(400).msg, - '400: Requested page size 0 is too small, minimum is 1') + chronik.script("p2pk", GENESIS_CB_PK) + .confirmed_txs(page=0, page_size=0) + .err(400) + .msg, + "400: Requested page size 0 is too small, minimum is 1", + ) assert_equal( - chronik.script( - 'p2pk', GENESIS_CB_PK).confirmed_txs( - page=0, page_size=2**32).err(400).msg, - '400: Invalid param page_size: 4294967296, ' + - 'number too large to fit in target type') + chronik.script("p2pk", GENESIS_CB_PK) + .confirmed_txs(page=0, page_size=2**32) + .err(400) + .msg, + "400: Invalid param page_size: 4294967296, " + + "number too large to fit in target type", + ) assert_equal( - chronik.script( - 'p2pk', GENESIS_CB_PK).confirmed_txs( - page=2**32, page_size=1).err(400).msg, - '400: Invalid param page: 4294967296, ' + - 'number too large to fit in target type') + chronik.script("p2pk", GENESIS_CB_PK) + .confirmed_txs(page=2**32, page_size=1) + .err(400) + .msg, + "400: Invalid param page: 4294967296, " + + "number too large to fit in target type", + ) # Handle overflow gracefully on 32-bit assert_equal( - chronik.script('p2pk', GENESIS_CB_PK) + chronik.script("p2pk", GENESIS_CB_PK) .confirmed_txs(page=2**32 - 1, page_size=200) .ok(), - pb.TxHistoryPage(num_pages=1, num_txs=1)) + pb.TxHistoryPage(num_pages=1, num_txs=1), + ) - genesis_db_script_history = chronik.script( - 'p2pk', GENESIS_CB_PK).confirmed_txs().ok() - assert_equal(genesis_db_script_history, - pb.TxHistoryPage(txs=[genesis_cb_tx()], - num_pages=1, - num_txs=1)) + genesis_db_script_history = ( + chronik.script("p2pk", GENESIS_CB_PK).confirmed_txs().ok() + ) + assert_equal( + genesis_db_script_history, + pb.TxHistoryPage(txs=[genesis_cb_tx()], num_pages=1, num_txs=1), + ) - script_type = 'p2sh' + script_type = "p2sh" payload_hex = P2SH_OP_TRUE[2:-1].hex() # Generate 101 blocks to some address and verify pages blockhashes = self.generatetoaddress(node, 101, ADDRESS_ECREG_P2SH_OP_TRUE) def check_confirmed_txs(txs, *, page_size=25): pages = list(iter_chunks(txs, page_size)) for page_num, page_txs in enumerate(pages): - script_history = chronik.script( - script_type, payload_hex).confirmed_txs( - page_num, page_size).ok() + script_history = ( + chronik.script(script_type, payload_hex) + .confirmed_txs(page_num, page_size) + .ok() + ) for tx_idx, entry in enumerate(page_txs): script_tx = script_history.txs[tx_idx] - if 'txid' in entry: - assert_equal(script_tx.txid[::-1].hex(), entry['txid']) - if 'block' in entry: - block_height, block_hash = entry['block'] - assert_equal(script_tx.block, pb.BlockMetadata( - hash=bytes.fromhex(block_hash)[::-1], - height=block_height, - timestamp=script_tx.block.timestamp, - )) - - txs = [{'block': (i + 1, blockhash)} for i, blockhash in enumerate(blockhashes)] + if "txid" in entry: + assert_equal(script_tx.txid[::-1].hex(), entry["txid"]) + if "block" in entry: + block_height, block_hash = entry["block"] + assert_equal( + script_tx.block, + pb.BlockMetadata( + hash=bytes.fromhex(block_hash)[::-1], + height=block_height, + timestamp=script_tx.block.timestamp, + ), + ) + + txs = [{"block": (i + 1, blockhash)} for i, blockhash in enumerate(blockhashes)] check_confirmed_txs(txs) check_confirmed_txs(txs, page_size=200) # Undo last block & check history node.invalidateblock(blockhashes[-1]) check_confirmed_txs(txs[:-1]) check_confirmed_txs(txs[:-1], page_size=200) # Create 1 block manually coinbase_tx = create_coinbase(101) coinbase_tx.vout[0].scriptPubKey = P2SH_OP_TRUE coinbase_tx.rehash() - block = create_block(int(blockhashes[-2], 16), - coinbase_tx, - mocktime + 1000) + block = create_block(int(blockhashes[-2], 16), coinbase_tx, mocktime + 1000) block.solve() peer.send_blocks_and_test([block], node) blockhashes[-1] = block.hash - txs = [{'block': (i + 1, blockhash)} for i, blockhash in enumerate(blockhashes)] + txs = [{"block": (i + 1, blockhash)} for i, blockhash in enumerate(blockhashes)] check_confirmed_txs(txs) check_confirmed_txs(txs, page_size=200) # Generate 900 more blocks and verify # Total of 1001 txs for this script (a page in the DB is 1000 entries long) blockhashes += self.generatetoaddress(node, 900, ADDRESS_ECREG_P2SH_OP_TRUE) - txs = [{'block': (i + 1, blockhash)} for i, blockhash in enumerate(blockhashes)] + txs = [{"block": (i + 1, blockhash)} for i, blockhash in enumerate(blockhashes)] page_sizes = [1, 5, 7, 25, 111, 200] for page_size in page_sizes: check_confirmed_txs(txs, page_size=page_size) coinvalue = 5000000000 cointxids = [] for coinblockhash in blockhashes[:10]: coinblock = node.getblock(coinblockhash) - cointxids.append(coinblock['tx'][0]) + cointxids.append(coinblock["tx"][0]) mempool_txids = [] for cointxid in cointxids: tx = CTransaction() tx.nVersion = 1 - tx.vin = [CTxIn(outpoint=COutPoint(int(cointxid, 16), 0), - scriptSig=SCRIPTSIG_OP_TRUE)] + tx.vin = [ + CTxIn( + outpoint=COutPoint(int(cointxid, 16), 0), + scriptSig=SCRIPTSIG_OP_TRUE, + ) + ] tx.vout = [CTxOut(coinvalue - 1000, P2SH_OP_TRUE)] pad_tx(tx) txid = node.sendrawtransaction(tx.serialize().hex()) mempool_txids.append(txid) # confirmed-txs completely unaffected by mempool txs for page_size in page_sizes: check_confirmed_txs(txs, page_size=page_size) # Mine mempool txs, now they're in confirmed-txs newblockhash = self.generatetoaddress(node, 1, ADDRESS_ECREG_P2SH_OP_TRUE)[0] - txs.append({'block': (1002, newblockhash)}) - txs += [{'block': (1002, newblockhash), 'txid': txid} - for txid in sorted(mempool_txids)] + txs.append({"block": (1002, newblockhash)}) + txs += [ + {"block": (1002, newblockhash), "txid": txid} + for txid in sorted(mempool_txids) + ] for page_size in page_sizes: check_confirmed_txs(txs, page_size=page_size) -if __name__ == '__main__': +if __name__ == "__main__": ChronikScriptConfirmedTxsTest().main() diff --git a/test/functional/chronik_script_history.py b/test/functional/chronik_script_history.py index eab0f1fc4..3a717cb53 100644 --- a/test/functional/chronik_script_history.py +++ b/test/functional/chronik_script_history.py @@ -1,259 +1,296 @@ #!/usr/bin/env python3 # Copyright (c) 2023 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """ Test Chronik's /script/:type/:payload/history endpoint. """ from test_framework.address import ( ADDRESS_ECREG_P2SH_OP_TRUE, P2SH_OP_TRUE, SCRIPTSIG_OP_TRUE, ) from test_framework.blocktools import ( GENESIS_CB_PK, create_block, create_coinbase, make_conform_to_ctor, ) from test_framework.messages import COutPoint, CTransaction, CTxIn, CTxOut from test_framework.p2p import P2PDataStore from test_framework.test_framework import BitcoinTestFramework from test_framework.txtools import pad_tx from test_framework.util import assert_equal, iter_chunks class ChronikScriptHistoryTest(BitcoinTestFramework): def set_test_params(self): self.setup_clean_chain = True self.num_nodes = 1 - self.extra_args = [['-chronik']] + self.extra_args = [["-chronik"]] self.rpc_timeout = 240 def skip_test_if_missing_module(self): self.skip_if_no_chronik() def run_test(self): from test_framework.chronik.client import ChronikClient, pb from test_framework.chronik.test_data import genesis_cb_tx node = self.nodes[0] - chronik = ChronikClient('127.0.0.1', node.chronik_port, timeout=4) + chronik = ChronikClient("127.0.0.1", node.chronik_port, timeout=4) peer = node.add_p2p_connection(P2PDataStore()) mocktime = 1300000000 node.setmocktime(mocktime) assert_equal( - chronik.script('', '').history().err(400).msg, - '400: Unknown script type: ') + chronik.script("", "").history().err(400).msg, "400: Unknown script type: " + ) assert_equal( - chronik.script('foo', '').history().err(400).msg, - '400: Unknown script type: foo') + chronik.script("foo", "").history().err(400).msg, + "400: Unknown script type: foo", + ) assert_equal( - chronik.script('p2pkh', 'LILALI').history().err(400).msg, - "400: Invalid hex: Invalid character 'L' at position 0") + chronik.script("p2pkh", "LILALI").history().err(400).msg, + "400: Invalid hex: Invalid character 'L' at position 0", + ) assert_equal( - chronik.script('other', 'LILALI').history().err(400).msg, - "400: Invalid hex: Invalid character 'L' at position 0") + chronik.script("other", "LILALI").history().err(400).msg, + "400: Invalid hex: Invalid character 'L' at position 0", + ) assert_equal( - chronik.script('p2pkh', '').history().err(400).msg, - '400: Invalid payload for P2PKH: Invalid length, ' + - 'expected 20 bytes but got 0 bytes') + chronik.script("p2pkh", "").history().err(400).msg, + "400: Invalid payload for P2PKH: Invalid length, " + + "expected 20 bytes but got 0 bytes", + ) assert_equal( - chronik.script('p2pkh', 'aA').history().err(400).msg, - '400: Invalid payload for P2PKH: Invalid length, ' + - 'expected 20 bytes but got 1 bytes') + chronik.script("p2pkh", "aA").history().err(400).msg, + "400: Invalid payload for P2PKH: Invalid length, " + + "expected 20 bytes but got 1 bytes", + ) assert_equal( - chronik.script('p2sh', 'aaBB').history().err(400).msg, - '400: Invalid payload for P2SH: Invalid length, ' + - 'expected 20 bytes but got 2 bytes') + chronik.script("p2sh", "aaBB").history().err(400).msg, + "400: Invalid payload for P2SH: Invalid length, " + + "expected 20 bytes but got 2 bytes", + ) assert_equal( - chronik.script('p2pk', 'aaBBcc').history().err(400).msg, - '400: Invalid payload for P2PK: Invalid length, ' + - 'expected one of [33, 65] but got 3 bytes') + chronik.script("p2pk", "aaBBcc").history().err(400).msg, + "400: Invalid payload for P2PK: Invalid length, " + + "expected one of [33, 65] but got 3 bytes", + ) assert_equal( - chronik.script('p2pk', GENESIS_CB_PK) - .history(page=0, page_size=201).err(400).msg, - '400: Requested page size 201 is too big, maximum is 200') + chronik.script("p2pk", GENESIS_CB_PK) + .history(page=0, page_size=201) + .err(400) + .msg, + "400: Requested page size 201 is too big, maximum is 200", + ) assert_equal( - chronik.script('p2pk', GENESIS_CB_PK) - .history(page=0, page_size=0).err(400).msg, - '400: Requested page size 0 is too small, minimum is 1') + chronik.script("p2pk", GENESIS_CB_PK) + .history(page=0, page_size=0) + .err(400) + .msg, + "400: Requested page size 0 is too small, minimum is 1", + ) assert_equal( - chronik.script('p2pk', GENESIS_CB_PK) - .history(page=0, page_size=2**32).err(400).msg, - '400: Invalid param page_size: 4294967296, ' + - 'number too large to fit in target type') + chronik.script("p2pk", GENESIS_CB_PK) + .history(page=0, page_size=2**32) + .err(400) + .msg, + "400: Invalid param page_size: 4294967296, " + + "number too large to fit in target type", + ) assert_equal( - chronik.script('p2pk', GENESIS_CB_PK) - .history(page=2**32, page_size=1).err(400).msg, - '400: Invalid param page: 4294967296, ' + - 'number too large to fit in target type') + chronik.script("p2pk", GENESIS_CB_PK) + .history(page=2**32, page_size=1) + .err(400) + .msg, + "400: Invalid param page: 4294967296, " + + "number too large to fit in target type", + ) # Handle overflow gracefully on 32-bit assert_equal( - chronik.script('p2pk', GENESIS_CB_PK) + chronik.script("p2pk", GENESIS_CB_PK) .history(page=2**32 - 1, page_size=200) .ok(), - pb.TxHistoryPage(num_pages=1, num_txs=1)) + pb.TxHistoryPage(num_pages=1, num_txs=1), + ) - genesis_db_script_history = chronik.script('p2pk', GENESIS_CB_PK).history().ok() - assert_equal(genesis_db_script_history, - pb.TxHistoryPage(txs=[genesis_cb_tx()], - num_pages=1, - num_txs=1)) + genesis_db_script_history = chronik.script("p2pk", GENESIS_CB_PK).history().ok() + assert_equal( + genesis_db_script_history, + pb.TxHistoryPage(txs=[genesis_cb_tx()], num_pages=1, num_txs=1), + ) - script_type = 'p2sh' + script_type = "p2sh" payload_hex = P2SH_OP_TRUE[2:-1].hex() def check_tx_history(mempooltxs, blocktxs, *, page_size=25): pages = list(iter_chunks(mempooltxs + blocktxs, page_size)) for page_num, page_txs in enumerate(pages): script_history = ( chronik.script(script_type, payload_hex) - .history(page=page_num, page_size=page_size).ok() + .history(page=page_num, page_size=page_size) + .ok() ) assert_equal(script_history.num_pages, len(pages)) assert_equal(script_history.num_txs, len(mempooltxs) + len(blocktxs)) for tx_idx, entry in enumerate(page_txs): script_tx = script_history.txs[tx_idx] - if 'txid' in entry: - assert_equal(script_tx.txid[::-1].hex(), entry['txid']) - if 'time_first_seen' in entry: + if "txid" in entry: + assert_equal(script_tx.txid[::-1].hex(), entry["txid"]) + if "time_first_seen" in entry: + assert_equal( + script_tx.time_first_seen, entry["time_first_seen"] + ) + if "block" in entry: + block_height, block_hash = entry["block"] assert_equal( - script_tx.time_first_seen, - entry['time_first_seen']) - if 'block' in entry: - block_height, block_hash = entry['block'] - assert_equal(script_tx.block, pb.BlockMetadata( - hash=bytes.fromhex(block_hash)[::-1], - height=block_height, - timestamp=script_tx.block.timestamp, - )) + script_tx.block, + pb.BlockMetadata( + hash=bytes.fromhex(block_hash)[::-1], + height=block_height, + timestamp=script_tx.block.timestamp, + ), + ) # Generate 101 blocks to some address and verify pages blockhashes = self.generatetoaddress(node, 101, ADDRESS_ECREG_P2SH_OP_TRUE) - blocktxs = [{'block': (i, blockhashes[i - 1])} for i in range(101, 0, -1)] + blocktxs = [{"block": (i, blockhashes[i - 1])} for i in range(101, 0, -1)] check_tx_history([], blocktxs) check_tx_history([], blocktxs, page_size=200) # Undo last block & check history node.invalidateblock(blockhashes[-1]) check_tx_history([], blocktxs[1:]) check_tx_history([], blocktxs[1:], page_size=200) # Create 1 block manually (with out-of-order block time) coinbase_tx = create_coinbase(101) coinbase_tx.vout[0].scriptPubKey = P2SH_OP_TRUE coinbase_tx.rehash() - block = create_block(int(blockhashes[-2], 16), - coinbase_tx, - mocktime + 1000) + block = create_block(int(blockhashes[-2], 16), coinbase_tx, mocktime + 1000) block.solve() peer.send_blocks_and_test([block], node) blockhashes[-1] = block.hash # Blocks still ordered by block height - blocktxs = [{'block': (i, blockhashes[i - 1])} for i in range(101, 0, -1)] + blocktxs = [{"block": (i, blockhashes[i - 1])} for i in range(101, 0, -1)] check_tx_history([], blocktxs) check_tx_history([], blocktxs, page_size=200) # Generate 900 more blocks and verify # Total of 1001 txs for this script (a page in the DB is 1000 entries long) blockhashes += self.generatetoaddress(node, 900, ADDRESS_ECREG_P2SH_OP_TRUE) - blocktxs = [{'block': (i, blockhashes[i - 1])} for i in range(1001, 0, -1)] + blocktxs = [{"block": (i, blockhashes[i - 1])} for i in range(1001, 0, -1)] check_tx_history([], blocktxs, page_size=200) coinvalue = 5000000000 cointxids = [] for coinblockhash in blockhashes[:100]: coinblock = node.getblock(coinblockhash) - cointxids.append(coinblock['tx'][0]) + cointxids.append(coinblock["tx"][0]) mempool_txs = [] mempool_txids = [] # Send 10 mempool txs, each with their own mocktime mocktime_offsets = [0, 10, 10, 5, 0, 0, 12, 12, 10, 5] for mocktime_offset in mocktime_offsets: cointxid = cointxids.pop(0) tx = CTransaction() tx.nVersion = 1 - tx.vin = [CTxIn(outpoint=COutPoint(int(cointxid, 16), 0), - scriptSig=SCRIPTSIG_OP_TRUE)] + tx.vin = [ + CTxIn( + outpoint=COutPoint(int(cointxid, 16), 0), + scriptSig=SCRIPTSIG_OP_TRUE, + ) + ] tx.vout = [CTxOut(coinvalue - 1000, P2SH_OP_TRUE)] pad_tx(tx) mempool_txs.append(tx) node.setmocktime(mocktime + mocktime_offset) txid = node.sendrawtransaction(tx.serialize().hex()) mempool_txids.append(txid) def tx_sort_key(entry): - time_first_seen = entry['time_first_seen'] - txid = entry['txid'] + time_first_seen = entry["time_first_seen"] + txid = entry["txid"] if time_first_seen == 0: time_first_seen = 1 << 64 - if entry.get('is_coinbase', False): - txid = '' + if entry.get("is_coinbase", False): + txid = "" return (time_first_seen, txid) - mempooltxs = sorted([{'time_first_seen': mocktime + offset, 'txid': txid} - for (offset, txid) - in zip(mocktime_offsets, mempool_txids)], - key=tx_sort_key, - reverse=True) + mempooltxs = sorted( + [ + {"time_first_seen": mocktime + offset, "txid": txid} + for (offset, txid) in zip(mocktime_offsets, mempool_txids) + ], + key=tx_sort_key, + reverse=True, + ) page_sizes = [1, 5, 7, 25, 111, 200] for page_size in page_sizes: check_tx_history(mempooltxs, blocktxs, page_size=page_size) # Mine block with 5 conflicting txs mine_txs = mempool_txs[5:] - newblocktxs = [entry for entry in mempooltxs if entry['txid'] - not in mempool_txids[:5]] + newblocktxs = [ + entry for entry in mempooltxs if entry["txid"] not in mempool_txids[:5] + ] for idx, tx in enumerate(mempool_txs[:5]): tx.nLockTime = 12 tx.rehash() mine_txs.append(tx) - newblocktxs.append({'time_first_seen': 0, 'txid': tx.hash}) + newblocktxs.append({"time_first_seen": 0, "txid": tx.hash}) height = 1002 coinbase_tx = create_coinbase(height) coinbase_tx.vout[0].scriptPubKey = P2SH_OP_TRUE coinbase_tx.rehash() - block = create_block(int(blockhashes[-1], 16), - coinbase_tx, - mocktime + 1100) + block = create_block(int(blockhashes[-1], 16), coinbase_tx, mocktime + 1100) block.nVersion = 5 block.vtx += mine_txs make_conform_to_ctor(block) block.hashMerkleRoot = block.calc_merkle_root() block.solve() peer.send_blocks_and_test([block], node) newblocktxs.append( - {'time_first_seen': 0, 'txid': coinbase_tx.hash, 'is_coinbase': True}) + {"time_first_seen": 0, "txid": coinbase_tx.hash, "is_coinbase": True} + ) newblocktxs.sort(key=tx_sort_key, reverse=True) for blocktx in newblocktxs: - blocktx['block'] = (height, block.hash) + blocktx["block"] = (height, block.hash) check_tx_history([], newblocktxs + blocktxs, page_size=25) check_tx_history([], newblocktxs + blocktxs, page_size=200) # Order for different page sizes is not guaranteed within blocks. txs_individually = [ - chronik.script( - script_type, payload_hex).history(page=i, page_size=1).ok().txs[0] + chronik.script(script_type, payload_hex) + .history(page=i, page_size=1) + .ok() + .txs[0] for i in range(20) ] - txs_bulk = list(chronik.script( - script_type, payload_hex).history(page=0, page_size=20).ok().txs) + txs_bulk = list( + chronik.script(script_type, payload_hex) + .history(page=0, page_size=20) + .ok() + .txs + ) # Contain the same txs, but not necessarily in the same order - assert_equal(sorted(txs_individually, key=lambda tx: tx.txid), - sorted(txs_bulk, key=lambda tx: tx.txid)) + assert_equal( + sorted(txs_individually, key=lambda tx: tx.txid), + sorted(txs_bulk, key=lambda tx: tx.txid), + ) -if __name__ == '__main__': +if __name__ == "__main__": ChronikScriptHistoryTest().main() diff --git a/test/functional/chronik_script_unconfirmed_txs.py b/test/functional/chronik_script_unconfirmed_txs.py index ae6a3d831..4b700a018 100644 --- a/test/functional/chronik_script_unconfirmed_txs.py +++ b/test/functional/chronik_script_unconfirmed_txs.py @@ -1,192 +1,224 @@ #!/usr/bin/env python3 # Copyright (c) 2023 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """ Test Chronik's /script/:type/:payload/unconfirmed-txs endpoint. """ from test_framework.address import ( ADDRESS_ECREG_P2SH_OP_TRUE, P2SH_OP_TRUE, SCRIPTSIG_OP_TRUE, ) from test_framework.blocktools import ( GENESIS_CB_PK, create_block, create_coinbase, make_conform_to_ctor, ) from test_framework.messages import COutPoint, CTransaction, CTxIn, CTxOut from test_framework.p2p import P2PDataStore from test_framework.script import OP_RETURN, CScript from test_framework.test_framework import BitcoinTestFramework from test_framework.util import assert_equal class ChronikScriptUnconfirmedTxsTest(BitcoinTestFramework): def set_test_params(self): self.setup_clean_chain = True self.num_nodes = 1 - self.extra_args = [['-chronik']] + self.extra_args = [["-chronik"]] self.rpc_timeout = 240 def skip_test_if_missing_module(self): self.skip_if_no_chronik() def run_test(self): from test_framework.chronik.client import ChronikClient, pb node = self.nodes[0] - chronik = ChronikClient('127.0.0.1', node.chronik_port) + chronik = ChronikClient("127.0.0.1", node.chronik_port) peer = node.add_p2p_connection(P2PDataStore()) mocktime = 1300000000 node.setmocktime(mocktime) assert_equal( - chronik.script('', '').unconfirmed_txs().err(400).msg, - '400: Unknown script type: ') + chronik.script("", "").unconfirmed_txs().err(400).msg, + "400: Unknown script type: ", + ) assert_equal( - chronik.script('foo', '', ).unconfirmed_txs().err(400).msg, - '400: Unknown script type: foo') + chronik.script( + "foo", + "", + ) + .unconfirmed_txs() + .err(400) + .msg, + "400: Unknown script type: foo", + ) assert_equal( - chronik.script('p2pkh', 'LILALI').unconfirmed_txs().err(400).msg, - "400: Invalid hex: Invalid character 'L' at position 0") + chronik.script("p2pkh", "LILALI").unconfirmed_txs().err(400).msg, + "400: Invalid hex: Invalid character 'L' at position 0", + ) assert_equal( - chronik.script('other', 'LILALI').unconfirmed_txs().err(400).msg, - "400: Invalid hex: Invalid character 'L' at position 0") + chronik.script("other", "LILALI").unconfirmed_txs().err(400).msg, + "400: Invalid hex: Invalid character 'L' at position 0", + ) assert_equal( - chronik.script('p2pkh', '', ).unconfirmed_txs().err(400).msg, - '400: Invalid payload for P2PKH: Invalid length, ' + - 'expected 20 bytes but got 0 bytes') + chronik.script( + "p2pkh", + "", + ) + .unconfirmed_txs() + .err(400) + .msg, + "400: Invalid payload for P2PKH: Invalid length, " + + "expected 20 bytes but got 0 bytes", + ) assert_equal( - chronik.script('p2pkh', 'aA').unconfirmed_txs().err(400).msg, - '400: Invalid payload for P2PKH: Invalid length, ' + - 'expected 20 bytes but got 1 bytes') + chronik.script("p2pkh", "aA").unconfirmed_txs().err(400).msg, + "400: Invalid payload for P2PKH: Invalid length, " + + "expected 20 bytes but got 1 bytes", + ) assert_equal( - chronik.script('p2sh', 'aaBB').unconfirmed_txs().err(400).msg, - '400: Invalid payload for P2SH: Invalid length, ' + - 'expected 20 bytes but got 2 bytes') + chronik.script("p2sh", "aaBB").unconfirmed_txs().err(400).msg, + "400: Invalid payload for P2SH: Invalid length, " + + "expected 20 bytes but got 2 bytes", + ) assert_equal( - chronik.script('p2pk', 'aaBBcc').unconfirmed_txs().err(400).msg, - '400: Invalid payload for P2PK: Invalid length, ' + - 'expected one of [33, 65] but got 3 bytes') + chronik.script("p2pk", "aaBBcc").unconfirmed_txs().err(400).msg, + "400: Invalid payload for P2PK: Invalid length, " + + "expected one of [33, 65] but got 3 bytes", + ) # No txs in mempool for the genesis pubkey assert_equal( - chronik.script('p2pk', GENESIS_CB_PK).unconfirmed_txs().ok(), - pb.TxHistoryPage(num_pages=0, num_txs=0)) + chronik.script("p2pk", GENESIS_CB_PK).unconfirmed_txs().ok(), + pb.TxHistoryPage(num_pages=0, num_txs=0), + ) - script_type = 'p2sh' + script_type = "p2sh" payload_hex = P2SH_OP_TRUE[2:-1].hex() # Generate 110 blocks to some address blockhashes = self.generatetoaddress(node, 110, ADDRESS_ECREG_P2SH_OP_TRUE) # No txs in mempool for that address assert_equal( chronik.script(script_type, payload_hex).unconfirmed_txs().ok(), - pb.TxHistoryPage(num_pages=0, num_txs=0)) + pb.TxHistoryPage(num_pages=0, num_txs=0), + ) coinvalue = 5000000000 cointxids = [] for coinblockhash in blockhashes[:10]: coinblock = node.getblock(coinblockhash) - cointxids.append(coinblock['tx'][0]) + cointxids.append(coinblock["tx"][0]) mempool_txs = [] mempool_proto_txs = [] # Send 10 mempool txs, each with their own mocktime mocktime_offsets = [0, 10, 10, 5, 0, 0, 12, 12, 10, 5] for mocktime_offset in mocktime_offsets: cointxid = cointxids.pop(0) time_first_seen = mocktime + mocktime_offset pad_script = CScript([OP_RETURN, bytes(100)]) tx = CTransaction() tx.nVersion = 1 - tx.vin = [CTxIn(outpoint=COutPoint(int(cointxid, 16), 0), - scriptSig=SCRIPTSIG_OP_TRUE, - nSequence=0xffffffff)] + tx.vin = [ + CTxIn( + outpoint=COutPoint(int(cointxid, 16), 0), + scriptSig=SCRIPTSIG_OP_TRUE, + nSequence=0xFFFFFFFF, + ) + ] tx.vout = [ CTxOut(coinvalue - 1000, P2SH_OP_TRUE), CTxOut(0, pad_script), ] tx.nLockTime = 1 node.setmocktime(time_first_seen) txid = node.sendrawtransaction(tx.serialize().hex()) mempool_txs.append(tx) - mempool_proto_txs.append(pb.Tx( - txid=bytes.fromhex(txid)[::-1], - version=1, - inputs=[pb.TxInput( - prev_out=pb.OutPoint( - txid=bytes.fromhex(cointxid)[::-1], - out_idx=0, - ), - input_script=bytes(SCRIPTSIG_OP_TRUE), - output_script=bytes(P2SH_OP_TRUE), - value=coinvalue, - sequence_no=0xffffffff, - )], - outputs=[ - pb.TxOutput( - value=coinvalue - 1000, - output_script=bytes(P2SH_OP_TRUE), - ), - pb.TxOutput( - value=0, - output_script=bytes(pad_script), - ), - ], - lock_time=1, - size=len(tx.serialize()), - time_first_seen=time_first_seen, - )) + mempool_proto_txs.append( + pb.Tx( + txid=bytes.fromhex(txid)[::-1], + version=1, + inputs=[ + pb.TxInput( + prev_out=pb.OutPoint( + txid=bytes.fromhex(cointxid)[::-1], + out_idx=0, + ), + input_script=bytes(SCRIPTSIG_OP_TRUE), + output_script=bytes(P2SH_OP_TRUE), + value=coinvalue, + sequence_no=0xFFFFFFFF, + ) + ], + outputs=[ + pb.TxOutput( + value=coinvalue - 1000, + output_script=bytes(P2SH_OP_TRUE), + ), + pb.TxOutput( + value=0, + output_script=bytes(pad_script), + ), + ], + lock_time=1, + size=len(tx.serialize()), + time_first_seen=time_first_seen, + ) + ) # Sort txs by time_first_seen and then by txid def sorted_txs(txs): return sorted(txs, key=lambda tx: (tx.time_first_seen, tx.txid[::-1])) assert_equal( chronik.script(script_type, payload_hex).unconfirmed_txs().ok(), - pb.TxHistoryPage(txs=sorted_txs(mempool_proto_txs), - num_pages=1, - num_txs=len(mempool_txs))) + pb.TxHistoryPage( + txs=sorted_txs(mempool_proto_txs), num_pages=1, num_txs=len(mempool_txs) + ), + ) # Mine 5 transactions, with 2 conflicts, leave 5 others unconfirmed mine_txs = mempool_txs[:3] mine_proto_txs = mempool_proto_txs[:3] for conflict_tx, conflict_proto_tx in zip( - mempool_txs[3:5], mempool_proto_txs[3:5]): + mempool_txs[3:5], mempool_proto_txs[3:5] + ): conflict_tx.nLockTime = 2 conflict_tx.rehash() mine_txs.append(conflict_tx) conflict_proto_tx.txid = bytes.fromhex(conflict_tx.hash)[::-1] conflict_proto_tx.lock_time = 2 mine_proto_txs.append(conflict_proto_tx) height = 111 coinbase_tx = create_coinbase(height) coinbase_tx.vout[0].scriptPubKey = P2SH_OP_TRUE coinbase_tx.rehash() - block = create_block(int(blockhashes[-1], 16), coinbase_tx, - mocktime + 1100) + block = create_block(int(blockhashes[-1], 16), coinbase_tx, mocktime + 1100) block.vtx += mine_txs make_conform_to_ctor(block) block.hashMerkleRoot = block.calc_merkle_root() block.solve() peer.send_blocks_and_test([block], node) # Only unconfirmed txs remain, conflict txs are removed assert_equal( chronik.script(script_type, payload_hex).unconfirmed_txs().ok(), - pb.TxHistoryPage(txs=sorted_txs(mempool_proto_txs[5:]), - num_pages=1, - num_txs=5)) + pb.TxHistoryPage( + txs=sorted_txs(mempool_proto_txs[5:]), num_pages=1, num_txs=5 + ), + ) -if __name__ == '__main__': +if __name__ == "__main__": ChronikScriptUnconfirmedTxsTest().main() diff --git a/test/functional/chronik_script_utxos.py b/test/functional/chronik_script_utxos.py index 8a81df93d..c5cedffca 100644 --- a/test/functional/chronik_script_utxos.py +++ b/test/functional/chronik_script_utxos.py @@ -1,222 +1,254 @@ #!/usr/bin/env python3 # Copyright (c) 2023 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """ Test Chronik's /script/:type/:payload/utxos endpoint. """ from test_framework.address import ( ADDRESS_ECREG_P2SH_OP_TRUE, ADDRESS_ECREG_UNSPENDABLE, P2SH_OP_TRUE, SCRIPTSIG_OP_TRUE, ) from test_framework.blocktools import ( GENESIS_CB_PK, GENESIS_CB_TXID, create_block, create_coinbase, ) from test_framework.messages import COutPoint, CTransaction, CTxIn, CTxOut from test_framework.p2p import P2PDataStore from test_framework.test_framework import BitcoinTestFramework from test_framework.txtools import pad_tx from test_framework.util import assert_equal class ChronikScriptUtxosTest(BitcoinTestFramework): def set_test_params(self): self.setup_clean_chain = True self.num_nodes = 1 - self.extra_args = [['-chronik']] + self.extra_args = [["-chronik"]] self.rpc_timeout = 240 def skip_test_if_missing_module(self): self.skip_if_no_chronik() def run_test(self): from test_framework.chronik.client import ChronikClient, pb node = self.nodes[0] node.setmocktime(1300000000) - chronik = ChronikClient('127.0.0.1', node.chronik_port) + chronik = ChronikClient("127.0.0.1", node.chronik_port) peer = node.add_p2p_connection(P2PDataStore()) assert_equal( - chronik.script('', '').utxos().err(400).msg, - '400: Unknown script type: ') + chronik.script("", "").utxos().err(400).msg, "400: Unknown script type: " + ) assert_equal( - chronik.script('foo', '').utxos().err(400).msg, - '400: Unknown script type: foo') + chronik.script("foo", "").utxos().err(400).msg, + "400: Unknown script type: foo", + ) assert_equal( - chronik.script('p2pkh', 'LILALI').utxos().err(400).msg, - "400: Invalid hex: Invalid character 'L' at position 0") + chronik.script("p2pkh", "LILALI").utxos().err(400).msg, + "400: Invalid hex: Invalid character 'L' at position 0", + ) assert_equal( - chronik.script('other', 'LILALI').utxos().err(400).msg, - "400: Invalid hex: Invalid character 'L' at position 0") + chronik.script("other", "LILALI").utxos().err(400).msg, + "400: Invalid hex: Invalid character 'L' at position 0", + ) assert_equal( - chronik.script('p2pkh', '').utxos().err(400).msg, - '400: Invalid payload for P2PKH: Invalid length, ' + - 'expected 20 bytes but got 0 bytes') + chronik.script("p2pkh", "").utxos().err(400).msg, + "400: Invalid payload for P2PKH: Invalid length, " + + "expected 20 bytes but got 0 bytes", + ) assert_equal( - chronik.script('p2pkh', 'aA').utxos().err(400).msg, - '400: Invalid payload for P2PKH: Invalid length, ' + - 'expected 20 bytes but got 1 bytes') + chronik.script("p2pkh", "aA").utxos().err(400).msg, + "400: Invalid payload for P2PKH: Invalid length, " + + "expected 20 bytes but got 1 bytes", + ) assert_equal( - chronik.script('p2sh', 'aaBB').utxos().err(400).msg, - '400: Invalid payload for P2SH: Invalid length, ' + - 'expected 20 bytes but got 2 bytes') + chronik.script("p2sh", "aaBB").utxos().err(400).msg, + "400: Invalid payload for P2SH: Invalid length, " + + "expected 20 bytes but got 2 bytes", + ) assert_equal( - chronik.script('p2pk', 'aaBBcc').utxos().err(400).msg, - '400: Invalid payload for P2PK: Invalid length, ' + - 'expected one of [33, 65] but got 3 bytes') + chronik.script("p2pk", "aaBBcc").utxos().err(400).msg, + "400: Invalid payload for P2PK: Invalid length, " + + "expected one of [33, 65] but got 3 bytes", + ) # Test Genesis pubkey UTXO coinvalue = 5000000000 - assert_equal(chronik.script('p2pk', GENESIS_CB_PK).utxos().ok(), - pb.ScriptUtxos(script=bytes.fromhex(f'41{GENESIS_CB_PK}ac'), - utxos=[pb.ScriptUtxo( - outpoint=pb.OutPoint( - txid=bytes.fromhex(GENESIS_CB_TXID)[::-1], - out_idx=0, - ), - block_height=0, - is_coinbase=True, - value=coinvalue, - is_final=False, - )])) - - script_type = 'p2sh' + assert_equal( + chronik.script("p2pk", GENESIS_CB_PK).utxos().ok(), + pb.ScriptUtxos( + script=bytes.fromhex(f"41{GENESIS_CB_PK}ac"), + utxos=[ + pb.ScriptUtxo( + outpoint=pb.OutPoint( + txid=bytes.fromhex(GENESIS_CB_TXID)[::-1], + out_idx=0, + ), + block_height=0, + is_coinbase=True, + value=coinvalue, + is_final=False, + ) + ], + ), + ) + + script_type = "p2sh" payload_hex = P2SH_OP_TRUE[2:-1].hex() # Generate us a coin, creates a UTXO coinblockhash = self.generatetoaddress(node, 1, ADDRESS_ECREG_P2SH_OP_TRUE)[0] coinblock = node.getblock(coinblockhash) - cointx = coinblock['tx'][0] - - assert_equal(chronik.script(script_type, payload_hex).utxos().ok(), - pb.ScriptUtxos(script=bytes(P2SH_OP_TRUE), - utxos=[pb.ScriptUtxo( - outpoint=pb.OutPoint( - txid=bytes.fromhex(cointx)[::-1], - out_idx=0, - ), - block_height=1, - is_coinbase=True, - value=coinvalue, - is_final=False, - )])) + cointx = coinblock["tx"][0] + + assert_equal( + chronik.script(script_type, payload_hex).utxos().ok(), + pb.ScriptUtxos( + script=bytes(P2SH_OP_TRUE), + utxos=[ + pb.ScriptUtxo( + outpoint=pb.OutPoint( + txid=bytes.fromhex(cointx)[::-1], + out_idx=0, + ), + block_height=1, + is_coinbase=True, + value=coinvalue, + is_final=False, + ) + ], + ), + ) self.generatetoaddress(node, 100, ADDRESS_ECREG_UNSPENDABLE) # Make tx creating 4 UTXOs, spending the coinbase UTXO send_values = [coinvalue - 10000, 1000, 2000, 3000] tx = CTransaction() - tx.vin = [CTxIn(outpoint=COutPoint(int(cointx, 16), 0), - scriptSig=SCRIPTSIG_OP_TRUE)] + tx.vin = [ + CTxIn(outpoint=COutPoint(int(cointx, 16), 0), scriptSig=SCRIPTSIG_OP_TRUE) + ] tx.vout = [CTxOut(value, P2SH_OP_TRUE) for value in send_values] txid = node.sendrawtransaction(tx.serialize().hex()) expected_utxos = [ pb.ScriptUtxo( outpoint=pb.OutPoint( txid=bytes.fromhex(txid)[::-1], out_idx=i, ), block_height=-1, is_coinbase=False, value=value, is_final=False, ) for i, value in enumerate(send_values) ] - assert_equal(chronik.script(script_type, payload_hex).utxos().ok(), - pb.ScriptUtxos(script=bytes(P2SH_OP_TRUE), - utxos=expected_utxos)) + assert_equal( + chronik.script(script_type, payload_hex).utxos().ok(), + pb.ScriptUtxos(script=bytes(P2SH_OP_TRUE), utxos=expected_utxos), + ) # Mine tx, which adds the blockheight to the UTXO tip = self.generatetoaddress(node, 1, ADDRESS_ECREG_UNSPENDABLE)[-1] for expected_utxo in expected_utxos: expected_utxo.block_height = 102 - assert_equal(chronik.script(script_type, payload_hex).utxos().ok(), - pb.ScriptUtxos(script=bytes(P2SH_OP_TRUE), - utxos=expected_utxos)) + assert_equal( + chronik.script(script_type, payload_hex).utxos().ok(), + pb.ScriptUtxos(script=bytes(P2SH_OP_TRUE), utxos=expected_utxos), + ) # Make tx spending the 3rd UTXO, and creating 1 UTXO tx2 = CTransaction() - tx2.vin = [CTxIn(outpoint=COutPoint(int(txid, 16), 3), - scriptSig=SCRIPTSIG_OP_TRUE)] + tx2.vin = [ + CTxIn(outpoint=COutPoint(int(txid, 16), 3), scriptSig=SCRIPTSIG_OP_TRUE) + ] tx2.vout = [CTxOut(2500, P2SH_OP_TRUE)] pad_tx(tx2) txid2 = node.sendrawtransaction(tx2.serialize().hex()) del expected_utxos[3] - expected_utxos.append(pb.ScriptUtxo( - outpoint=pb.OutPoint( - txid=bytes.fromhex(txid2)[::-1], - out_idx=0, - ), - block_height=-1, - is_coinbase=False, - value=2500, - is_final=False, - )) + expected_utxos.append( + pb.ScriptUtxo( + outpoint=pb.OutPoint( + txid=bytes.fromhex(txid2)[::-1], + out_idx=0, + ), + block_height=-1, + is_coinbase=False, + value=2500, + is_final=False, + ) + ) - assert_equal(chronik.script(script_type, payload_hex).utxos().ok(), - pb.ScriptUtxos(script=bytes(P2SH_OP_TRUE), - utxos=expected_utxos)) + assert_equal( + chronik.script(script_type, payload_hex).utxos().ok(), + pb.ScriptUtxos(script=bytes(P2SH_OP_TRUE), utxos=expected_utxos), + ) # Make tx spending a DB UTXO and a mempool UTXO tx3 = CTransaction() - tx3.vin = [CTxIn(outpoint=COutPoint(int(txid, 16), 2), - scriptSig=SCRIPTSIG_OP_TRUE), - CTxIn(outpoint=COutPoint(int(txid2, 16), 0), - scriptSig=SCRIPTSIG_OP_TRUE)] + tx3.vin = [ + CTxIn(outpoint=COutPoint(int(txid, 16), 2), scriptSig=SCRIPTSIG_OP_TRUE), + CTxIn(outpoint=COutPoint(int(txid2, 16), 0), scriptSig=SCRIPTSIG_OP_TRUE), + ] pad_tx(tx3) node.sendrawtransaction(tx3.serialize().hex()) - assert_equal(chronik.script(script_type, payload_hex).utxos().ok(), - pb.ScriptUtxos(script=bytes(P2SH_OP_TRUE), - utxos=expected_utxos[:2])) + assert_equal( + chronik.script(script_type, payload_hex).utxos().ok(), + pb.ScriptUtxos(script=bytes(P2SH_OP_TRUE), utxos=expected_utxos[:2]), + ) # Make a tx which conflicts with tx3, by spending the same DB UTXO tx3_conflict = CTransaction() - tx3_conflict.vin = [CTxIn(outpoint=COutPoint(int(txid, 16), 2), - scriptSig=SCRIPTSIG_OP_TRUE)] + tx3_conflict.vin = [ + CTxIn(outpoint=COutPoint(int(txid, 16), 2), scriptSig=SCRIPTSIG_OP_TRUE) + ] pad_tx(tx3_conflict) # Mining conflicting tx returns the mempool UTXO spent by tx3 to the mempool - block = create_block(int(tip, 16), - create_coinbase(103, b'\x03' * 33), - 1300000500) + block = create_block( + int(tip, 16), create_coinbase(103, b"\x03" * 33), 1300000500 + ) block.vtx += [tx3_conflict] block.hashMerkleRoot = block.calc_merkle_root() block.solve() peer.send_blocks_and_test([block], node) del expected_utxos[2] - assert_equal(chronik.script(script_type, payload_hex).utxos().ok(), - pb.ScriptUtxos(script=bytes(P2SH_OP_TRUE), - utxos=expected_utxos)) + assert_equal( + chronik.script(script_type, payload_hex).utxos().ok(), + pb.ScriptUtxos(script=bytes(P2SH_OP_TRUE), utxos=expected_utxos), + ) # Invalidating the last block doesn't change UTXOs node.invalidateblock(block.hash) - assert_equal(chronik.script(script_type, payload_hex).utxos().ok(), - pb.ScriptUtxos(script=bytes(P2SH_OP_TRUE), - utxos=expected_utxos)) + assert_equal( + chronik.script(script_type, payload_hex).utxos().ok(), + pb.ScriptUtxos(script=bytes(P2SH_OP_TRUE), utxos=expected_utxos), + ) # Invalidating the next last block returns all UTXOs back to the mempool node.invalidateblock(tip) for expected_utxo in expected_utxos: expected_utxo.block_height = -1 # Mempool UTXOs are sorted by txid:out_idx. Note: `sorted` is stable. - assert_equal(list(chronik.script(script_type, payload_hex).utxos().ok().utxos), - sorted(expected_utxos, key=lambda utxo: utxo.outpoint.txid[::-1])) + assert_equal( + list(chronik.script(script_type, payload_hex).utxos().ok().utxos), + sorted(expected_utxos, key=lambda utxo: utxo.outpoint.txid[::-1]), + ) -if __name__ == '__main__': +if __name__ == "__main__": ChronikScriptUtxosTest().main() diff --git a/test/functional/chronik_serve.py b/test/functional/chronik_serve.py index e712adf89..0d3ce016e 100644 --- a/test/functional/chronik_serve.py +++ b/test/functional/chronik_serve.py @@ -1,46 +1,50 @@ #!/usr/bin/env python3 # Copyright (c) 2022 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. from test_framework.netutil import test_ipv6_local from test_framework.test_framework import BitcoinTestFramework from test_framework.util import assert_equal class ChronikServeTest(BitcoinTestFramework): def set_test_params(self): self.setup_clean_chain = True self.num_nodes = 1 - self.extra_args = [['-chronik']] + self.extra_args = [["-chronik"]] def skip_test_if_missing_module(self): self.skip_if_no_chronik() def run_test(self): from test_framework.chronik.client import ChronikClient def test_host(ip, port): chronik = ChronikClient(ip, port) - response = chronik._request_get('/path/does/not/exist', pb_type=None) - assert_equal(response.err(404).msg, '404: Not found: /path/does/not/exist') + response = chronik._request_get("/path/does/not/exist", pb_type=None) + assert_equal(response.err(404).msg, "404: Not found: /path/does/not/exist") - test_host('127.0.0.1', self.nodes[0].chronik_port) + test_host("127.0.0.1", self.nodes[0].chronik_port) - self.restart_node(0, ['-chronik', '-chronikbind=0.0.0.0']) - test_host('127.0.0.1', 18442) + self.restart_node(0, ["-chronik", "-chronikbind=0.0.0.0"]) + test_host("127.0.0.1", 18442) - self.restart_node(0, ['-chronik', '-chronikbind=127.0.0.1:12345']) - test_host('127.0.0.1', 12345) + self.restart_node(0, ["-chronik", "-chronikbind=127.0.0.1:12345"]) + test_host("127.0.0.1", 12345) if test_ipv6_local(): - self.restart_node(0, - ['-chronik', - '-chronikbind=127.0.0.1:12345', - '-chronikbind=[::1]:23456']) - test_host('127.0.0.1', 12345) - test_host('::1', 23456) - - -if __name__ == '__main__': + self.restart_node( + 0, + [ + "-chronik", + "-chronikbind=127.0.0.1:12345", + "-chronikbind=[::1]:23456", + ], + ) + test_host("127.0.0.1", 12345) + test_host("::1", 23456) + + +if __name__ == "__main__": ChronikServeTest().main() diff --git a/test/functional/chronik_spent_by.py b/test/functional/chronik_spent_by.py index 2d416bd98..648569560 100644 --- a/test/functional/chronik_spent_by.py +++ b/test/functional/chronik_spent_by.py @@ -1,168 +1,180 @@ #!/usr/bin/env python3 # Copyright (c) 2023 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """ Test if the `Tx.spent_by` field is set correctly in Chronik. """ from test_framework.address import ( ADDRESS_ECREG_P2SH_OP_TRUE, ADDRESS_ECREG_UNSPENDABLE, SCRIPTSIG_OP_TRUE, ) from test_framework.blocktools import ( create_block, create_coinbase, make_conform_to_ctor, ) from test_framework.messages import COutPoint, CTransaction, CTxIn, CTxOut from test_framework.p2p import P2PDataStore from test_framework.script import OP_EQUAL, OP_HASH160, CScript, hash160 from test_framework.test_framework import BitcoinTestFramework from test_framework.txtools import pad_tx from test_framework.util import assert_equal class ChronikSpentByTest(BitcoinTestFramework): def set_test_params(self): self.setup_clean_chain = True self.num_nodes = 1 - self.extra_args = [['-chronik']] + self.extra_args = [["-chronik"]] self.rpc_timeout = 240 def skip_test_if_missing_module(self): self.skip_if_no_chronik() def run_test(self): from test_framework.chronik.client import ChronikClient, pb node = self.nodes[0] node.setmocktime(1300000000) - chronik = ChronikClient('127.0.0.1', node.chronik_port) + chronik = ChronikClient("127.0.0.1", node.chronik_port) peer = node.add_p2p_connection(P2PDataStore()) coinblockhash = self.generatetoaddress(node, 1, ADDRESS_ECREG_P2SH_OP_TRUE)[0] coinblock = node.getblock(coinblockhash) - cointx = coinblock['tx'][0] + cointx = coinblock["tx"][0] tip = self.generatetoaddress(node, 100, ADDRESS_ECREG_UNSPENDABLE)[-1] coinvalue = 5000000000 send_values = [coinvalue - 10000, 1000, 1000, 1000] send_redeem_scripts = [bytes([i + 0x52]) for i in range(len(send_values))] send_script_hashes = [hash160(script) for script in send_redeem_scripts] - send_scripts = [CScript([OP_HASH160, script_hash, OP_EQUAL]) - for script_hash in send_script_hashes] + send_scripts = [ + CScript([OP_HASH160, script_hash, OP_EQUAL]) + for script_hash in send_script_hashes + ] tx = CTransaction() - tx.vin = [CTxIn(outpoint=COutPoint(int(cointx, 16), 0), - scriptSig=SCRIPTSIG_OP_TRUE)] - tx.vout = [CTxOut(value, script) - for (value, script) in zip(send_values, send_scripts)] + tx.vin = [ + CTxIn(outpoint=COutPoint(int(cointx, 16), 0), scriptSig=SCRIPTSIG_OP_TRUE) + ] + tx.vout = [ + CTxOut(value, script) for (value, script) in zip(send_values, send_scripts) + ] tx.rehash() # Submit tx to mempool txid = node.sendrawtransaction(tx.serialize().hex()) def tx_outputs_spent(tx): return [output.spent_by for output in tx.outputs] def find_tx(txs): return [tx for tx in txs if tx.txid[::-1].hex() == txid][0] def check_outputs_spent(expected_outpoints, *, has_been_mined): assert_equal( tx_outputs_spent(chronik.tx(txid).ok()), expected_outpoints, ) for script_hash in send_script_hashes: - chronik_script = chronik.script('p2sh', script_hash.hex()) + chronik_script = chronik.script("p2sh", script_hash.hex()) if has_been_mined: txs = chronik_script.confirmed_txs().ok() else: txs = chronik_script.unconfirmed_txs().ok() tx = find_tx(txs.txs) assert_equal(tx, find_tx(chronik_script.history().ok().txs)) assert_equal( tx_outputs_spent(tx), expected_outpoints, ) # Initially, none of the outputs are spent check_outputs_spent([pb.SpentBy()] * len(send_values), has_been_mined=False) # Add tx that spends the middle two outputs to mempool tx2 = CTransaction() - tx2.vin = [CTxIn(outpoint=COutPoint(int(txid, 16), i + 1), - scriptSig=CScript([redeem_script])) - for i, redeem_script in enumerate(send_redeem_scripts[1:3])] + tx2.vin = [ + CTxIn( + outpoint=COutPoint(int(txid, 16), i + 1), + scriptSig=CScript([redeem_script]), + ) + for i, redeem_script in enumerate(send_redeem_scripts[1:3]) + ] pad_tx(tx2) txid2 = node.sendrawtransaction(tx2.serialize().hex()) middle_two_spent = [ pb.SpentBy(), pb.SpentBy(txid=bytes.fromhex(txid2)[::-1], input_idx=0), pb.SpentBy(txid=bytes.fromhex(txid2)[::-1], input_idx=1), pb.SpentBy(), ] check_outputs_spent(middle_two_spent, has_been_mined=False) # Mining both txs still works block2 = self.generatetoaddress(node, 1, ADDRESS_ECREG_UNSPENDABLE)[0] check_outputs_spent(middle_two_spent, has_been_mined=True) # Add tx that also spends the last output to the mempool tx3 = CTransaction() - tx3.vin = [CTxIn(outpoint=COutPoint(int(txid, 16), 3), - scriptSig=CScript([send_redeem_scripts[3]]))] + tx3.vin = [ + CTxIn( + outpoint=COutPoint(int(txid, 16), 3), + scriptSig=CScript([send_redeem_scripts[3]]), + ) + ] pad_tx(tx3) txid3 = node.sendrawtransaction(tx3.serialize().hex()) # 2 outputs spent by a mined tx, 1 output spent by a mempool tx last_three_spent = [ pb.SpentBy(), pb.SpentBy(txid=bytes.fromhex(txid2)[::-1], input_idx=0), pb.SpentBy(txid=bytes.fromhex(txid2)[::-1], input_idx=1), pb.SpentBy(txid=bytes.fromhex(txid3)[::-1], input_idx=0), ] check_outputs_spent(last_three_spent, has_been_mined=True) # Mining tx3 still works block3 = self.generatetoaddress(node, 1, ADDRESS_ECREG_UNSPENDABLE)[0] check_outputs_spent(last_three_spent, has_been_mined=True) # Adding tx3 back to mempool still works node.invalidateblock(block3) check_outputs_spent(last_three_spent, has_been_mined=True) # Adding tx and tx2 back to mempool still works node.invalidateblock(block2) check_outputs_spent(last_three_spent, has_been_mined=False) # Mine a tx conflicting with tx3 tx3_conflict = CTransaction(tx3) tx3_conflict.nLockTime = 1 tx3_conflict.rehash() # Block mines tx, tx2 and tx3_conflict - block = create_block(int(tip, 16), - create_coinbase(101, b'\x03' * 33), - 1300000500) + block = create_block( + int(tip, 16), create_coinbase(101, b"\x03" * 33), 1300000500 + ) block.vtx += [tx, tx2, tx3_conflict] make_conform_to_ctor(block) block.hashMerkleRoot = block.calc_merkle_root() block.solve() peer.send_blocks_and_test([block], node) conflict_spent = [ pb.SpentBy(), pb.SpentBy(txid=bytes.fromhex(txid2)[::-1], input_idx=0), pb.SpentBy(txid=bytes.fromhex(txid2)[::-1], input_idx=1), pb.SpentBy(txid=bytes.fromhex(tx3_conflict.hash)[::-1], input_idx=0), ] check_outputs_spent(conflict_spent, has_been_mined=True) -if __name__ == '__main__': +if __name__ == "__main__": ChronikSpentByTest().main() diff --git a/test/functional/chronik_tx.py b/test/functional/chronik_tx.py index 9e727d16d..ac6259cf1 100644 --- a/test/functional/chronik_tx.py +++ b/test/functional/chronik_tx.py @@ -1,187 +1,214 @@ #!/usr/bin/env python3 # Copyright (c) 2023 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """ Test Chronik's /tx endpoint. """ from test_framework.address import ( ADDRESS_ECREG_P2SH_OP_TRUE, ADDRESS_ECREG_UNSPENDABLE, P2SH_OP_TRUE, SCRIPTSIG_OP_TRUE, ) from test_framework.blocktools import GENESIS_CB_TXID, create_block, create_coinbase from test_framework.messages import COutPoint, CTransaction, CTxIn, CTxOut from test_framework.p2p import P2PDataStore from test_framework.script import OP_EQUAL, OP_HASH160, CScript, hash160 from test_framework.test_framework import BitcoinTestFramework from test_framework.util import assert_equal class ChronikTxTest(BitcoinTestFramework): def set_test_params(self): self.setup_clean_chain = True self.num_nodes = 1 - self.extra_args = [['-chronik']] + self.extra_args = [["-chronik"]] self.rpc_timeout = 240 def skip_test_if_missing_module(self): self.skip_if_no_chronik() def run_test(self): from test_framework.chronik.client import ChronikClient, pb from test_framework.chronik.test_data import genesis_cb_tx node = self.nodes[0] - chronik = ChronikClient('127.0.0.1', node.chronik_port) + chronik = ChronikClient("127.0.0.1", node.chronik_port) peer = node.add_p2p_connection(P2PDataStore()) node.setmocktime(1333333337) - assert_equal(chronik.tx('0').err(400).msg, '400: Not a txid: 0') - assert_equal(chronik.tx('123').err(400).msg, '400: Not a txid: 123') - assert_equal(chronik.tx('1234f').err(400).msg, '400: Not a txid: 1234f') - assert_equal(chronik.tx('00' * 31).err(400).msg, f'400: Not a txid: {"00"*31}') - assert_equal(chronik.tx('01').err(400).msg, '400: Not a txid: 01') - assert_equal(chronik.tx('12345678901').err(400).msg, - '400: Not a txid: 12345678901') + assert_equal(chronik.tx("0").err(400).msg, "400: Not a txid: 0") + assert_equal(chronik.tx("123").err(400).msg, "400: Not a txid: 123") + assert_equal(chronik.tx("1234f").err(400).msg, "400: Not a txid: 1234f") + assert_equal(chronik.tx("00" * 31).err(400).msg, f'400: Not a txid: {"00"*31}') + assert_equal(chronik.tx("01").err(400).msg, "400: Not a txid: 01") + assert_equal( + chronik.tx("12345678901").err(400).msg, "400: Not a txid: 12345678901" + ) - assert_equal(chronik.tx('00' * 32).err(404).msg, - f'404: Transaction {"00"*32} not found in the index') + assert_equal( + chronik.tx("00" * 32).err(404).msg, + f'404: Transaction {"00"*32} not found in the index', + ) # Verify queried genesis tx matches assert_equal(chronik.tx(GENESIS_CB_TXID).ok(), genesis_cb_tx()) coinblockhash = self.generatetoaddress(node, 1, ADDRESS_ECREG_P2SH_OP_TRUE)[0] coinblock = node.getblock(coinblockhash) - cointx = coinblock['tx'][0] + cointx = coinblock["tx"][0] self.generatetoaddress(node, 100, ADDRESS_ECREG_UNSPENDABLE) coinvalue = 5000000000 send_values = [coinvalue - 10000, 1000, 2000, 3000] send_redeem_scripts = [bytes([i + 0x52]) for i in range(len(send_values))] - send_scripts = [CScript([OP_HASH160, hash160(redeem_script), OP_EQUAL]) - for redeem_script in send_redeem_scripts] + send_scripts = [ + CScript([OP_HASH160, hash160(redeem_script), OP_EQUAL]) + for redeem_script in send_redeem_scripts + ] tx = CTransaction() tx.nVersion = 2 - tx.vin = [CTxIn(outpoint=COutPoint(int(cointx, 16), 0), - scriptSig=SCRIPTSIG_OP_TRUE, - nSequence=0xfffffffe)] - tx.vout = [CTxOut(value, script) - for (value, script) in zip(send_values, send_scripts)] + tx.vin = [ + CTxIn( + outpoint=COutPoint(int(cointx, 16), 0), + scriptSig=SCRIPTSIG_OP_TRUE, + nSequence=0xFFFFFFFE, + ) + ] + tx.vout = [ + CTxOut(value, script) for (value, script) in zip(send_values, send_scripts) + ] tx.nLockTime = 1234567890 # Submit tx to mempool txid = node.sendrawtransaction(tx.serialize().hex()) proto_tx = pb.Tx( txid=bytes.fromhex(txid)[::-1], version=tx.nVersion, - inputs=[pb.TxInput( - prev_out=pb.OutPoint(txid=bytes.fromhex(cointx)[::-1], out_idx=0), - input_script=bytes(tx.vin[0].scriptSig), - output_script=bytes(P2SH_OP_TRUE), - value=coinvalue, - sequence_no=0xfffffffe, - )], - outputs=[pb.TxOutput( - value=value, - output_script=bytes(script), - ) for value, script in zip(send_values, send_scripts)], + inputs=[ + pb.TxInput( + prev_out=pb.OutPoint(txid=bytes.fromhex(cointx)[::-1], out_idx=0), + input_script=bytes(tx.vin[0].scriptSig), + output_script=bytes(P2SH_OP_TRUE), + value=coinvalue, + sequence_no=0xFFFFFFFE, + ) + ], + outputs=[ + pb.TxOutput( + value=value, + output_script=bytes(script), + ) + for value, script in zip(send_values, send_scripts) + ], lock_time=1234567890, block=None, time_first_seen=1333333337, size=len(tx.serialize()), is_coinbase=False, ) assert_equal(chronik.tx(txid).ok(), proto_tx) # If we mine the block, querying will gives us all the tx details + block txblockhash = self.generatetoaddress(node, 1, ADDRESS_ECREG_UNSPENDABLE)[0] # Set the `block` field, now that we mined it - proto_tx.block.CopyFrom(pb.BlockMetadata( - hash=bytes.fromhex(txblockhash)[::-1], - height=102, - timestamp=1333333355, - )) + proto_tx.block.CopyFrom( + pb.BlockMetadata( + hash=bytes.fromhex(txblockhash)[::-1], + height=102, + timestamp=1333333355, + ) + ) assert_equal(chronik.tx(txid).ok(), proto_tx) node.setmocktime(1333333338) tx2 = CTransaction() tx2.nVersion = 2 - tx2.vin = [CTxIn(outpoint=COutPoint(int(txid, 16), i), - scriptSig=CScript([redeem_script]), - nSequence=0xfffffff0 + i) - for i, redeem_script in enumerate(send_redeem_scripts)] + tx2.vin = [ + CTxIn( + outpoint=COutPoint(int(txid, 16), i), + scriptSig=CScript([redeem_script]), + nSequence=0xFFFFFFF0 + i, + ) + for i, redeem_script in enumerate(send_redeem_scripts) + ] tx2.vout = [CTxOut(coinvalue - 20000, send_scripts[0])] tx2.nLockTime = 12 # Submit tx to mempool txid2 = node.sendrawtransaction(tx2.serialize().hex()) proto_tx2 = pb.Tx( txid=bytes.fromhex(txid2)[::-1], version=tx2.nVersion, inputs=[ pb.TxInput( prev_out=pb.OutPoint(txid=bytes.fromhex(txid)[::-1], out_idx=i), input_script=bytes(tx2.vin[i].scriptSig), output_script=bytes(script), value=value, - sequence_no=0xfffffff0 + i, + sequence_no=0xFFFFFFF0 + i, ) for i, (value, script) in enumerate(zip(send_values, send_scripts)) ], - outputs=[pb.TxOutput( - value=tx2.vout[0].nValue, - output_script=bytes(tx2.vout[0].scriptPubKey), - )], + outputs=[ + pb.TxOutput( + value=tx2.vout[0].nValue, + output_script=bytes(tx2.vout[0].scriptPubKey), + ) + ], lock_time=12, block=None, time_first_seen=1333333338, size=len(tx2.serialize()), is_coinbase=False, ) assert_equal(chronik.tx(txid2).ok(), proto_tx2) # Mine tx tx2blockhash = self.generatetoaddress(node, 1, ADDRESS_ECREG_UNSPENDABLE)[0] # Invalidate block node.invalidateblock(tx2blockhash) # Tx back in mempool assert_equal(chronik.tx(txid2).ok(), proto_tx2) # Mine conflicting tx conflict_tx = CTransaction(tx2) conflict_tx.nLockTime = 13 - block = create_block(int(txblockhash, 16), - create_coinbase(103, b'\x03' * 33), - 1333333500) + block = create_block( + int(txblockhash, 16), create_coinbase(103, b"\x03" * 33), 1333333500 + ) block.vtx += [conflict_tx] block.hashMerkleRoot = block.calc_merkle_root() block.solve() peer.send_blocks_and_test([block], node) - assert_equal(chronik.tx(txid2).err(404).msg, - f'404: Transaction {txid2} not found in the index') + assert_equal( + chronik.tx(txid2).err(404).msg, + f"404: Transaction {txid2} not found in the index", + ) proto_tx2.txid = bytes.fromhex(conflict_tx.hash)[::-1] proto_tx2.lock_time = 13 proto_tx2.time_first_seen = 0 - proto_tx2.block.CopyFrom(pb.BlockMetadata( - hash=bytes.fromhex(block.hash)[::-1], - height=103, - timestamp=1333333500, - )) + proto_tx2.block.CopyFrom( + pb.BlockMetadata( + hash=bytes.fromhex(block.hash)[::-1], + height=103, + timestamp=1333333500, + ) + ) assert_equal(chronik.tx(conflict_tx.hash).ok(), proto_tx2) -if __name__ == '__main__': +if __name__ == "__main__": ChronikTxTest().main() diff --git a/test/functional/chronik_ws.py b/test/functional/chronik_ws.py index 9ac45159c..5276b7fdc 100755 --- a/test/functional/chronik_ws.py +++ b/test/functional/chronik_ws.py @@ -1,94 +1,110 @@ #!/usr/bin/env python3 # Copyright (c) 2023 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """Test whether Chronik sends WebSocket messages correctly.""" from test_framework.avatools import can_find_inv_in_poll, get_ava_p2p_interface from test_framework.test_framework import BitcoinTestFramework from test_framework.util import assert_equal QUORUM_NODE_COUNT = 16 class ChronikWsTest(BitcoinTestFramework): def set_test_params(self): self.setup_clean_chain = True self.num_nodes = 1 self.extra_args = [ [ - '-avaproofstakeutxodustthreshold=1000000', - '-avaproofstakeutxoconfirmations=1', - '-avacooldown=0', - '-avaminquorumstake=0', - '-avaminavaproofsnodecount=0', - '-chronik', - '-whitelist=noban@127.0.0.1', + "-avaproofstakeutxodustthreshold=1000000", + "-avaproofstakeutxoconfirmations=1", + "-avacooldown=0", + "-avaminquorumstake=0", + "-avaminavaproofsnodecount=0", + "-chronik", + "-whitelist=noban@127.0.0.1", ], ] self.supports_cli = False def skip_test_if_missing_module(self): self.skip_if_no_chronik() def run_test(self): from test_framework.chronik.client import ChronikClient, pb node = self.nodes[0] - chronik = ChronikClient('127.0.0.1', node.chronik_port) + chronik = ChronikClient("127.0.0.1", node.chronik_port) # Build a fake quorum of nodes. def get_quorum(): - return [get_ava_p2p_interface(self, node) - for _ in range(0, QUORUM_NODE_COUNT)] + return [ + get_ava_p2p_interface(self, node) for _ in range(0, QUORUM_NODE_COUNT) + ] def has_finalized_tip(tip_expected): hash_tip_final = int(tip_expected, 16) can_find_inv_in_poll(quorum, hash_tip_final) return node.isfinalblock(tip_expected) # Connect, but don't subscribe yet ws = chronik.ws(timeout=30) # Pick one node from the quorum for polling. # ws will not receive msgs because it's not subscribed to blocks yet. quorum = get_quorum() - assert node.getavalancheinfo()['ready_to_poll'] is True + assert node.getavalancheinfo()["ready_to_poll"] is True tip = node.getbestblockhash() self.wait_until(lambda: has_finalized_tip(tip)) # Now subscribe to blocks, we'll get block updates from now on ws.sub_to_blocks() # Mine block tip = self.generate(node, 1)[-1] height = node.getblockcount() # We get a CONNECTED msg - assert_equal(ws.recv(), pb.WsMsg(block=pb.MsgBlock( - msg_type=pb.BLK_CONNECTED, - block_hash=bytes.fromhex(tip)[::-1], - block_height=height, - ))) + assert_equal( + ws.recv(), + pb.WsMsg( + block=pb.MsgBlock( + msg_type=pb.BLK_CONNECTED, + block_hash=bytes.fromhex(tip)[::-1], + block_height=height, + ) + ), + ) # After we wait, we get a FINALIZED msg self.wait_until(lambda: has_finalized_tip(tip)) - assert_equal(ws.recv(), pb.WsMsg(block=pb.MsgBlock( - msg_type=pb.BLK_FINALIZED, - block_hash=bytes.fromhex(tip)[::-1], - block_height=height, - ))) + assert_equal( + ws.recv(), + pb.WsMsg( + block=pb.MsgBlock( + msg_type=pb.BLK_FINALIZED, + block_hash=bytes.fromhex(tip)[::-1], + block_height=height, + ) + ), + ) # When we invalidate, we get a DISCONNECTED msg node.invalidateblock(tip) - assert_equal(ws.recv(), pb.WsMsg(block=pb.MsgBlock( - msg_type=pb.BLK_DISCONNECTED, - block_hash=bytes.fromhex(tip)[::-1], - block_height=height, - ))) - - -if __name__ == '__main__': + assert_equal( + ws.recv(), + pb.WsMsg( + block=pb.MsgBlock( + msg_type=pb.BLK_DISCONNECTED, + block_hash=bytes.fromhex(tip)[::-1], + block_height=height, + ) + ), + ) + + +if __name__ == "__main__": ChronikWsTest().main() diff --git a/test/functional/chronik_ws_script.py b/test/functional/chronik_ws_script.py index efa6b6a3d..269b0ea59 100755 --- a/test/functional/chronik_ws_script.py +++ b/test/functional/chronik_ws_script.py @@ -1,191 +1,221 @@ #!/usr/bin/env python3 # Copyright (c) 2023 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """Test whether Chronik sends WebSocket messages correctly.""" from test_framework.address import ( ADDRESS_ECREG_P2SH_OP_TRUE, ADDRESS_ECREG_UNSPENDABLE, SCRIPTSIG_OP_TRUE, ) from test_framework.avatools import can_find_inv_in_poll, get_ava_p2p_interface from test_framework.blocktools import ( create_block, create_coinbase, make_conform_to_ctor, ) from test_framework.messages import COutPoint, CTransaction, CTxIn, CTxOut from test_framework.p2p import P2PDataStore from test_framework.script import OP_EQUAL, OP_HASH160, CScript, hash160 from test_framework.test_framework import BitcoinTestFramework from test_framework.txtools import pad_tx from test_framework.util import assert_equal QUORUM_NODE_COUNT = 16 class ChronikWsScriptTest(BitcoinTestFramework): def set_test_params(self): self.setup_clean_chain = True self.num_nodes = 1 self.extra_args = [ [ - '-avaproofstakeutxodustthreshold=1000000', - '-avaproofstakeutxoconfirmations=1', - '-avacooldown=0', - '-avaminquorumstake=0', - '-avaminavaproofsnodecount=0', - '-chronik', - '-whitelist=noban@127.0.0.1', + "-avaproofstakeutxodustthreshold=1000000", + "-avaproofstakeutxoconfirmations=1", + "-avacooldown=0", + "-avaminquorumstake=0", + "-avaminavaproofsnodecount=0", + "-chronik", + "-whitelist=noban@127.0.0.1", ], ] self.supports_cli = False self.rpc_timeout = 240 def skip_test_if_missing_module(self): self.skip_if_no_chronik() def run_test(self): from test_framework.chronik.client import ChronikClient, pb node = self.nodes[0] - chronik = ChronikClient('127.0.0.1', node.chronik_port) + chronik = ChronikClient("127.0.0.1", node.chronik_port) node.setmocktime(1300000000) peer = node.add_p2p_connection(P2PDataStore()) # Make us a coin coinblockhash = self.generatetoaddress(node, 1, ADDRESS_ECREG_P2SH_OP_TRUE)[0] coinblock = node.getblock(coinblockhash) - cointx = coinblock['tx'][0] + cointx = coinblock["tx"][0] # Set up Avalanche def get_quorum(): - return [get_ava_p2p_interface(self, node) - for _ in range(0, QUORUM_NODE_COUNT)] + return [ + get_ava_p2p_interface(self, node) for _ in range(0, QUORUM_NODE_COUNT) + ] def has_finalized_tip(tip_expected): hash_tip_final = int(tip_expected, 16) can_find_inv_in_poll(quorum, hash_tip_final) return node.isfinalblock(tip_expected) quorum = get_quorum() - assert node.getavalancheinfo()['ready_to_poll'] is True + assert node.getavalancheinfo()["ready_to_poll"] is True tip = self.generatetoaddress(node, 100, ADDRESS_ECREG_UNSPENDABLE)[-1] # Tx sending to 4 different scripts coinvalue = 5000000000 send_values = [coinvalue - 10000, 1000, 1000, 1000] send_redeem_scripts = [bytes([i + 0x52]) for i in range(len(send_values))] send_script_hashes = [hash160(script) for script in send_redeem_scripts] - send_scripts = [CScript([OP_HASH160, script_hash, OP_EQUAL]) - for script_hash in send_script_hashes] + send_scripts = [ + CScript([OP_HASH160, script_hash, OP_EQUAL]) + for script_hash in send_script_hashes + ] tx = CTransaction() - tx.vin = [CTxIn(outpoint=COutPoint(int(cointx, 16), 0), - scriptSig=SCRIPTSIG_OP_TRUE)] - tx.vout = [CTxOut(value, script) - for (value, script) in zip(send_values, send_scripts)] + tx.vin = [ + CTxIn(outpoint=COutPoint(int(cointx, 16), 0), scriptSig=SCRIPTSIG_OP_TRUE) + ] + tx.vout = [ + CTxOut(value, script) for (value, script) in zip(send_values, send_scripts) + ] # Connect 2 websocket clients ws1 = chronik.ws(timeout=30) ws2 = chronik.ws(timeout=30) # Subscribe to 2 scripts on ws1 and 1 on ws2 - ws1.sub_script('p2sh', send_script_hashes[1]) - ws1.sub_script('p2sh', send_script_hashes[2]) - ws2.sub_script('p2sh', send_script_hashes[2]) + ws1.sub_script("p2sh", send_script_hashes[1]) + ws1.sub_script("p2sh", send_script_hashes[2]) + ws2.sub_script("p2sh", send_script_hashes[2]) # Send the tx, will send updates to ws1 and ws2 txid = node.sendrawtransaction(tx.serialize().hex()) - expected_msg = pb.WsMsg(tx=pb.MsgTx( - msg_type=pb.TX_ADDED_TO_MEMPOOL, - txid=bytes.fromhex(txid)[::-1], - )) + expected_msg = pb.WsMsg( + tx=pb.MsgTx( + msg_type=pb.TX_ADDED_TO_MEMPOOL, + txid=bytes.fromhex(txid)[::-1], + ) + ) # ws1 receives the tx msg twice, as it contains both scripts assert_equal(ws1.recv(), expected_msg) assert_equal(ws1.recv(), expected_msg) assert_equal(ws2.recv(), expected_msg) # Unsubscribe ws1 from the other script ws2 is subscribed to - ws1.sub_script('p2sh', send_script_hashes[2], is_unsub=True) + ws1.sub_script("p2sh", send_script_hashes[2], is_unsub=True) # tx2 is only sent to ws2 tx2 = CTransaction() - tx2.vin = [CTxIn(outpoint=COutPoint(int(txid, 16), 2), - scriptSig=CScript([send_redeem_scripts[2]]))] + tx2.vin = [ + CTxIn( + outpoint=COutPoint(int(txid, 16), 2), + scriptSig=CScript([send_redeem_scripts[2]]), + ) + ] pad_tx(tx2) txid2 = node.sendrawtransaction(tx2.serialize().hex()) - assert_equal(ws2.recv(), pb.WsMsg(tx=pb.MsgTx( - msg_type=pb.TX_ADDED_TO_MEMPOOL, - txid=bytes.fromhex(txid2)[::-1], - ))) + assert_equal( + ws2.recv(), + pb.WsMsg( + tx=pb.MsgTx( + msg_type=pb.TX_ADDED_TO_MEMPOOL, + txid=bytes.fromhex(txid2)[::-1], + ) + ), + ) # tx3 is only sent to ws1 tx3 = CTransaction() - tx3.vin = [CTxIn(outpoint=COutPoint(int(txid, 16), 1), - scriptSig=CScript([send_redeem_scripts[1]]))] + tx3.vin = [ + CTxIn( + outpoint=COutPoint(int(txid, 16), 1), + scriptSig=CScript([send_redeem_scripts[1]]), + ) + ] pad_tx(tx3) txid3 = node.sendrawtransaction(tx3.serialize().hex()) - assert_equal(ws1.recv(), pb.WsMsg(tx=pb.MsgTx( - msg_type=pb.TX_ADDED_TO_MEMPOOL, - txid=bytes.fromhex(txid3)[::-1], - ))) + assert_equal( + ws1.recv(), + pb.WsMsg( + tx=pb.MsgTx( + msg_type=pb.TX_ADDED_TO_MEMPOOL, + txid=bytes.fromhex(txid3)[::-1], + ) + ), + ) # Tweak tx3 to cause a conflict tx3_conflict = CTransaction(tx3) tx3_conflict.nLockTime = 1 tx3_conflict.rehash() # Mine tx, tx2 and tx3_conflict height = 102 - block = create_block(int(tip, 16), - create_coinbase(height, b'\x03' * 33), - 1300000500) + block = create_block( + int(tip, 16), create_coinbase(height, b"\x03" * 33), 1300000500 + ) block.vtx += [tx, tx2, tx3_conflict] make_conform_to_ctor(block) block.hashMerkleRoot = block.calc_merkle_root() block.solve() peer.send_blocks_and_test([block], node) def check_tx_msgs(ws, msg_type, txids): for txid in txids: - assert_equal(ws.recv(), pb.WsMsg(tx=pb.MsgTx( - msg_type=msg_type, - txid=bytes.fromhex(txid)[::-1], - ))) + assert_equal( + ws.recv(), + pb.WsMsg( + tx=pb.MsgTx( + msg_type=msg_type, + txid=bytes.fromhex(txid)[::-1], + ) + ), + ) # For ws1, this sends a REMOVED_FROM_MEMPOOL for tx3, and two CONFIRMED check_tx_msgs(ws1, pb.TX_REMOVED_FROM_MEMPOOL, [tx3.hash]) check_tx_msgs(ws1, pb.TX_CONFIRMED, sorted([txid, tx3_conflict.hash])) # For ws2, this only sends the CONFIRMED msgs check_tx_msgs(ws2, pb.TX_CONFIRMED, sorted([txid, txid2])) # Invalidate the block again node.invalidateblock(block.hash) # Adds the disconnected block's txs back into the mempool check_tx_msgs(ws1, pb.TX_ADDED_TO_MEMPOOL, [txid, tx3_conflict.hash]) check_tx_msgs(ws2, pb.TX_ADDED_TO_MEMPOOL, [txid, txid2]) # Test Avalanche finalization tip = node.getbestblockhash() self.wait_until(lambda: has_finalized_tip(tip)) # Mine txs in a block -> sends CONFIRMED tip = self.generate(node, 1)[-1] check_tx_msgs(ws1, pb.TX_CONFIRMED, sorted([txid, tx3_conflict.hash])) check_tx_msgs(ws2, pb.TX_CONFIRMED, sorted([txid, txid2])) # Wait for Avalanche finalization of block -> sends TX_FINALIZED self.wait_until(lambda: has_finalized_tip(tip)) check_tx_msgs(ws1, pb.TX_FINALIZED, sorted([txid, tx3_conflict.hash])) check_tx_msgs(ws2, pb.TX_FINALIZED, sorted([txid, txid2])) -if __name__ == '__main__': +if __name__ == "__main__": ChronikWsScriptTest().main()