Changeset View
Changeset View
Standalone View
Standalone View
test/functional/chronik_block_header.py
| # Copyright (c) The Bitcoin developers | # Copyright (c) The Bitcoin developers | ||||
| # Distributed under the MIT software license, see the accompanying | # Distributed under the MIT software license, see the accompanying | ||||
| # file COPYING or http://www.opensource.org/licenses/mit-license.php. | # file COPYING or http://www.opensource.org/licenses/mit-license.php. | ||||
| """ | """ | ||||
| Test Chronik's /header and /headers endpoints. | Test Chronik's /header and /headers endpoints. | ||||
| """ | """ | ||||
| from test_framework.address import ADDRESS_ECREG_P2SH_OP_TRUE, ADDRESS_ECREG_UNSPENDABLE | from test_framework.address import ADDRESS_ECREG_P2SH_OP_TRUE, ADDRESS_ECREG_UNSPENDABLE | ||||
| from test_framework.blocktools import GENESIS_BLOCK_HASH | from test_framework.blocktools import GENESIS_BLOCK_HASH | ||||
| from test_framework.merkle import merkle_root_and_branch | |||||
| from test_framework.messages import hash256 | from test_framework.messages import hash256 | ||||
| from test_framework.test_framework import BitcoinTestFramework | from test_framework.test_framework import BitcoinTestFramework | ||||
| from test_framework.util import assert_equal | from test_framework.util import assert_equal, hex_to_be_bytes | ||||
| NUM_HEADERS = 10 | NUM_HEADERS = 10 | ||||
| GENESIS_HEADER = bytes.fromhex( | GENESIS_HEADER = bytes.fromhex( | ||||
| "0100000000000000000000000000000000000000000000000000000000000000000000003ba3edfd7" | "0100000000000000000000000000000000000000000000000000000000000000000000003ba3edfd7" | ||||
| "a7b12b27ac72c3e67768f617fc81bc3888a51323a9fb8aa4b1e5e4adae5494dffff7f2002000000" | "a7b12b27ac72c3e67768f617fc81bc3888a51323a9fb8aa4b1e5e4adae5494dffff7f2002000000" | ||||
| ) | ) | ||||
| ▲ Show 20 Lines • Show All 112 Lines • ▼ Show 20 Lines | def run_test(self): | ||||
| assert_equal( | assert_equal( | ||||
| [ | [ | ||||
| hdr.raw_header | hdr.raw_header | ||||
| for hdr in chronik.block_headers(0, NUM_HEADERS).ok().headers | for hdr in chronik.block_headers(0, NUM_HEADERS).ok().headers | ||||
| ], | ], | ||||
| block_headers_from_rpc, | block_headers_from_rpc, | ||||
| ) | ) | ||||
| # Invalidate in the middle of the chain. We can no longer query invalidated | self.log.info("Test the checkpoint_height query param") | ||||
| # checkpoint height must be higher than the queried header height | |||||
| assert_equal( | |||||
| chronik.block_header(hash_or_height=4, checkpoint_height=2).err(400).msg, | |||||
| "400: Invalid checkpoint height 2, may not be below queried header height 4", | |||||
| ) | |||||
| # checkpoint height must be higher than the queried header height | |||||
| assert_equal( | |||||
| chronik.block_headers(0, 42, checkpoint_height=41).err(400).msg, | |||||
| "400: Invalid checkpoint height 41, may not be below queried header height 42", | |||||
| ) | |||||
| # checkpoint_height=0 disables the checkpoint data, any other value enables it | |||||
| assert_equal( | |||||
| chronik.block_header(hash_or_height=0, checkpoint_height=0).ok(), | |||||
| expected_genesis_header, | |||||
| ) | |||||
| for i in range(1, NUM_HEADERS + 1): | |||||
| proto_header = chronik.block_header( | |||||
| hash_or_height=i, checkpoint_height=0 | |||||
| ).ok() | |||||
| assert_equal(proto_header.root, b"") | |||||
| assert_equal(proto_header.branch, []) | |||||
| proto_header = chronik.block_header( | |||||
| hash_or_height=i, checkpoint_height=i | |||||
| ).ok() | |||||
| assert proto_header.root != b"" | |||||
| assert proto_header.branch != [] | |||||
| block_hashes_bytes = [hex_to_be_bytes(h) for h in block_hashes] | |||||
| for checkpoint_height in range(1, len(block_hashes_bytes)): | |||||
| for block_height in range(checkpoint_height + 1): | |||||
| proto_header = chronik.block_header( | |||||
| block_height, checkpoint_height | |||||
| ).ok() | |||||
| root, branch = merkle_root_and_branch( | |||||
| block_hashes_bytes[: checkpoint_height + 1], block_height | |||||
| ) | |||||
| assert_equal(proto_header.root, root) | |||||
| assert_equal(proto_header.branch, branch) | |||||
| proto_headers = ( | |||||
| chronik.block_headers( | |||||
| 0, NUM_HEADERS // 3, checkpoint_height=NUM_HEADERS // 2 | |||||
| ) | |||||
| .ok() | |||||
| .headers | |||||
| ) | |||||
| for i, hdr in enumerate(proto_headers[:-1]): | |||||
| assert_equal(hdr.raw_header, block_headers_from_rpc[i]) | |||||
| # only the final header has checkpoint data | |||||
| assert_equal(hdr.root, b"") | |||||
| assert_equal(hdr.branch, []) | |||||
| root, branch = merkle_root_and_branch( | |||||
| block_hashes_bytes[: NUM_HEADERS // 2 + 1], NUM_HEADERS // 3 | |||||
| ) | |||||
| assert_equal( | |||||
| proto_headers[-1].raw_header, block_headers_from_rpc[NUM_HEADERS // 3] | |||||
| ) | |||||
| assert_equal(proto_headers[-1].root, root) | |||||
| assert_equal(proto_headers[-1].branch, branch) | |||||
| self.log.info("Invalidate a block in the middle of the chain.") | |||||
| # We can no longer query invalidated | |||||
| # headers by height, but they remain accessible by hash | # headers by height, but they remain accessible by hash | ||||
| node.invalidateblock(block_hashes[NUM_HEADERS // 2]) | node.invalidateblock(block_hashes[NUM_HEADERS // 2]) | ||||
| for i in range(NUM_HEADERS // 2, NUM_HEADERS + 1): | for i in range(NUM_HEADERS // 2, NUM_HEADERS + 1): | ||||
| assert_equal( | assert_equal( | ||||
| chronik.block_header(i).err(404).msg, f"404: Block not found: {i}" | chronik.block_header(i).err(404).msg, f"404: Block not found: {i}" | ||||
| ) | ) | ||||
| chronik.block_header(block_hashes[i]).ok() | chronik.block_header(block_hashes[i]).ok() | ||||
| # Previous blocks are still fine | # Previous blocks are still fine | ||||
| for i in range(0, NUM_HEADERS // 2): | for i in range(0, NUM_HEADERS // 2): | ||||
| chronik.block_header(i).ok() | chronik.block_header(i).ok() | ||||
| chronik.block_header(block_hashes[i]).ok() | chronik.block_header(block_hashes[i]).ok() | ||||
| assert_equal( | assert_equal( | ||||
| [ | [ | ||||
| hdr.raw_header | hdr.raw_header | ||||
| for hdr in chronik.block_headers(0, NUM_HEADERS).ok().headers | for hdr in chronik.block_headers(0, NUM_HEADERS).ok().headers | ||||
| ], | ], | ||||
| block_headers_from_rpc[: NUM_HEADERS // 2], | block_headers_from_rpc[: NUM_HEADERS // 2], | ||||
| ) | ) | ||||
| # Mine fork block and check it connects | self.log.info("Mine a fork block and check it connects") | ||||
| fork_hash = self.generatetoaddress(node, 1, ADDRESS_ECREG_UNSPENDABLE)[0] | fork_hash = self.generatetoaddress(node, 1, ADDRESS_ECREG_UNSPENDABLE)[0] | ||||
| assert fork_hash != block_hashes[NUM_HEADERS // 2] | assert fork_hash != block_hashes[NUM_HEADERS // 2] | ||||
| proto_header = chronik.block_header(NUM_HEADERS // 2).ok() | proto_header = chronik.block_header(NUM_HEADERS // 2).ok() | ||||
| rpc_header = bytes.fromhex(node.getblockheader(fork_hash, False)) | rpc_header = bytes.fromhex(node.getblockheader(fork_hash, False)) | ||||
| assert_equal(proto_header, pb.BlockHeader(raw_header=rpc_header)) | assert_equal(proto_header, pb.BlockHeader(raw_header=rpc_header)) | ||||
| assert_equal(chronik.block_header(fork_hash).ok(), proto_header) | assert_equal(chronik.block_header(fork_hash).ok(), proto_header) | ||||
| assert_equal( | assert_equal( | ||||
| [ | [ | ||||
| hdr.raw_header | hdr.raw_header | ||||
| for hdr in chronik.block_headers(0, NUM_HEADERS).ok().headers | for hdr in chronik.block_headers(0, NUM_HEADERS).ok().headers | ||||
| ], | ], | ||||
| block_headers_from_rpc[: NUM_HEADERS // 2] + [rpc_header], | block_headers_from_rpc[: NUM_HEADERS // 2] + [rpc_header], | ||||
| ) | ) | ||||
| self.log.info("Check merkle tree data after the block invalidation") | |||||
| block_hashes = ( | |||||
| block_hashes[: NUM_HEADERS // 2] | |||||
| + [fork_hash] | |||||
| + self.generatetoaddress(node, NUM_HEADERS, ADDRESS_ECREG_P2SH_OP_TRUE) | |||||
| ) | |||||
| block_hashes_bytes = [hex_to_be_bytes(h) for h in block_hashes] | |||||
| new_num_headers = len(block_hashes_bytes) | |||||
| for cp_height in range(1, new_num_headers): | |||||
| for block_height in range(cp_height + 1): | |||||
| root, branch = merkle_root_and_branch( | |||||
| block_hashes_bytes[: cp_height + 1], block_height | |||||
| ) | |||||
| proto_header = chronik.block_header(block_height, cp_height).ok() | |||||
| assert_equal( | |||||
| proto_header.raw_header, | |||||
| bytes.fromhex( | |||||
| node.getblockheader(block_hashes[block_height], False) | |||||
| ), | |||||
| ) | |||||
| assert_equal(proto_header.root, root) | |||||
| assert_equal(proto_header.branch, branch) | |||||
| if __name__ == "__main__": | if __name__ == "__main__": | ||||
| ChronikHeaderTest().main() | ChronikHeaderTest().main() | ||||