diff --git a/Cargo.lock b/Cargo.lock --- a/Cargo.lock +++ b/Cargo.lock @@ -307,8 +307,10 @@ "abc-rust-error", "abc-rust-lint", "bitcoinsuite-core", + "chronik-bridge", "chronik-db", "chronik-util", + "cxx", "pretty_assertions", "tempdir", "thiserror", diff --git a/chronik/chronik-cpp/chronik.cpp b/chronik/chronik-cpp/chronik.cpp --- a/chronik/chronik-cpp/chronik.cpp +++ b/chronik/chronik-cpp/chronik.cpp @@ -21,15 +21,16 @@ return vec; } -bool Start([[maybe_unused]] const Config &config, - [[maybe_unused]] const node::NodeContext &node) { - return chronik_bridge::setup_chronik({ - .datadir_net = gArgs.GetDataDirNet().u8string(), - .hosts = ToRustVec<rust::String>(gArgs.IsArgSet("-chronikbind") - ? gArgs.GetArgs("-chronikbind") - : DEFAULT_BINDS), - .default_port = BaseParams().ChronikPort(), - }); +bool Start(const Config &config, const node::NodeContext &node) { + return chronik_bridge::setup_chronik( + { + .datadir_net = gArgs.GetDataDirNet().u8string(), + .hosts = ToRustVec<rust::String>(gArgs.IsArgSet("-chronikbind") + ? gArgs.GetArgs("-chronikbind") + : DEFAULT_BINDS), + .default_port = BaseParams().ChronikPort(), + }, + config, node); } void Stop() { diff --git a/chronik/chronik-indexer/Cargo.toml b/chronik/chronik-indexer/Cargo.toml --- a/chronik/chronik-indexer/Cargo.toml +++ b/chronik/chronik-indexer/Cargo.toml @@ -11,9 +11,13 @@ abc-rust-lint = { path = "../abc-rust-lint" } abc-rust-error = { path = "../abc-rust-error" } bitcoinsuite-core = { path = "../bitcoinsuite-core" } +chronik-bridge = { path = "../chronik-bridge" } chronik-db = { path = "../chronik-db" } chronik-util = { path = "../chronik-util" } +# Bridge to C++ +cxx = "1.0" + # Derive error enums thiserror = "1.0" diff --git a/chronik/chronik-indexer/src/indexer.rs b/chronik/chronik-indexer/src/indexer.rs --- a/chronik/chronik-indexer/src/indexer.rs +++ b/chronik/chronik-indexer/src/indexer.rs @@ -7,11 +7,13 @@ use std::path::PathBuf; use abc_rust_error::{Result, WrapErr}; +use bitcoinsuite_core::block::BlockHash; +use chronik_bridge::ffi; use chronik_db::{ db::{Db, WriteBatch}, - io::{BlockReader, BlockWriter, DbBlock}, + io::{BlockHeight, BlockReader, BlockWriter, DbBlock}, }; -use chronik_util::log_chronik; +use chronik_util::{log, log_chronik}; use thiserror::Error; /// Params for setting up a [`ChronikIndexer`] instance. @@ -40,6 +42,24 @@ /// Failed creating the folder for the indexes #[error("Failed creating path {0}")] CreateIndexesDirFailed(PathBuf), + + /// Cannot rewind blocks that bitcoind doesn't have + #[error( + "Cannot rewind Chronik, it contains block {0} that the node doesn't \ + have. You may need to -reindex, or delete indexes/chronik and restart" + )] + CannotRewindChronik(BlockHash), + + /// Lower block doesn't exist but higher block does + #[error( + "Inconsistent DB: Block {missing} doesn't exist, but {exists} does" + )] + BlocksBelowMissing { + /// Lower height that is missing + missing: BlockHeight, + /// Higher height that exists + exists: BlockHeight, + }, } use self::ChronikIndexerError::*; @@ -59,6 +79,107 @@ Ok(ChronikIndexer { db }) } + /// Resync Chronik index to the node + pub fn resync_indexer( + &mut self, + bridge: &ffi::ChronikBridge, + ) -> Result<()> { + let indexer_tip = self.blocks()?.tip()?; + let Ok(node_tip_index) = bridge.get_chain_tip() else { + if let Some(indexer_tip) = &indexer_tip { + return Err( + CannotRewindChronik(indexer_tip.hash.clone()).into() + ); + } + return Ok(()); + }; + let node_tip_info = ffi::get_block_info(node_tip_index); + let node_height = node_tip_info.height; + let node_tip_hash = BlockHash::from(node_tip_info.hash); + let fork_height = match indexer_tip { + Some(tip) => { + let indexer_tip_hash = tip.hash.clone(); + let indexer_height = tip.height; + log!( + "Node and Chronik diverged, node is on block \ + {node_tip_hash} at height {node_height}, and Chronik is \ + on block {indexer_tip_hash} at height {indexer_height}.\n" + ); + let indexer_tip_index = bridge + .lookup_block_index(tip.hash.to_bytes()) + .map_err(|_| CannotRewindChronik(tip.hash.clone()))?; + self.rewind_indexer(bridge, indexer_tip_index, &tip)? + } + None => { + log!( + "Chronik database empty, syncing to block {node_tip_hash} \ + at height {node_height}.\n" + ); + -1 + } + }; + let tip_height = node_tip_info.height; + for height in fork_height + 1..=tip_height { + let block_index = ffi::get_block_ancestor(node_tip_index, height)?; + let ffi_block = bridge.load_block(block_index)?; + let ffi_block = expect_unique_ptr("load_block", &ffi_block); + let block = make_chronik_block(ffi_block, block_index); + let hash = block.db_block.hash.clone(); + self.handle_block_connected(block)?; + log_chronik!( + "Added block {hash}, height {height}/{tip_height} to Chronik\n" + ); + if height % 100 == 0 { + log!( + "Synced Chronik up to block {hash} at height \ + {height}/{tip_height}\n" + ); + } + } + log!( + "Chronik completed re-syncing with the node, both are now at \ + block {node_tip_hash} at height {node_height}.\n" + ); + Ok(()) + } + + fn rewind_indexer( + &mut self, + bridge: &ffi::ChronikBridge, + indexer_tip_index: &ffi::CBlockIndex, + indexer_db_tip: &DbBlock, + ) -> Result<BlockHeight> { + let indexer_height = indexer_db_tip.height; + let fork_block_index = bridge + .find_fork(indexer_tip_index) + .map_err(|_| CannotRewindChronik(indexer_db_tip.hash.clone()))?; + let fork_info = ffi::get_block_info(fork_block_index); + let fork_block_hash = BlockHash::from(fork_info.hash); + let fork_height = fork_info.height; + let revert_height = fork_height + 1; + log!( + "The last common block is {fork_block_hash} at height \ + {fork_height}.\n" + ); + log!("Reverting Chronik blocks {revert_height} to {indexer_height}.\n"); + for height in (revert_height..indexer_height).rev() { + let db_block = self.blocks()?.by_height(height)?.ok_or( + BlocksBelowMissing { + missing: height, + exists: indexer_height, + }, + )?; + let block_index = bridge + .lookup_block_index(db_block.hash.to_bytes()) + .map_err(|_| CannotRewindChronik(db_block.hash))?; + let ffi_block = bridge.load_block(block_index)?; + let ffi_block = expect_unique_ptr("load_block", &ffi_block); + let block = make_chronik_block(ffi_block, block_index); + self.handle_block_disconnected(block)?; + } + Ok(fork_info.height) + } + /// Add the block to the index. pub fn handle_block_connected( &mut self, @@ -89,6 +210,33 @@ } } +/// Build the ChronikBlock from the CBlockIndex +pub fn make_chronik_block( + block: &ffi::CBlock, + bindex: &ffi::CBlockIndex, +) -> ChronikBlock { + let block = ffi::bridge_block(block, bindex); + let db_block = DbBlock { + hash: BlockHash::from(block.hash), + prev_hash: BlockHash::from(block.prev_hash), + height: block.height, + n_bits: block.n_bits, + timestamp: block.timestamp, + file_num: block.file_num, + data_pos: block.data_pos, + }; + ChronikBlock { db_block } +} + +fn expect_unique_ptr<'ptr, T: cxx::memory::UniquePtrTarget>( + name: &str, + cblock: &'ptr cxx::UniquePtr<T>, +) -> &'ptr T { + cblock + .as_ref() + .unwrap_or_else(|| panic!("{name} returned a null std::unique_ptr")) +} + #[cfg(test)] mod tests { use abc_rust_error::Result; diff --git a/chronik/chronik-lib/src/bridge.rs b/chronik/chronik-lib/src/bridge.rs --- a/chronik/chronik-lib/src/bridge.rs +++ b/chronik/chronik-lib/src/bridge.rs @@ -10,12 +10,12 @@ }; use abc_rust_error::Result; -use bitcoinsuite_core::block::BlockHash; -use chronik_bridge::ffi::{bridge_block, init_error, CBlock, CBlockIndex}; -use chronik_db::io::DbBlock; +use chronik_bridge::ffi::{ + init_error, make_bridge, CBlock, CBlockIndex, Config, NodeContext, +}; use chronik_http::server::{ChronikServer, ChronikServerParams}; use chronik_indexer::indexer::{ - ChronikBlock, ChronikIndexer, ChronikIndexerParams, + make_chronik_block, ChronikIndexer, ChronikIndexerParams, }; use chronik_util::{log, log_chronik}; use thiserror::Error; @@ -37,8 +37,12 @@ use self::ChronikError::*; /// Setup the Chronik bridge. Returns a ChronikIndexer object. -pub fn setup_chronik(params: ffi::SetupParams) -> bool { - match try_setup_chronik(params) { +pub fn setup_chronik( + params: ffi::SetupParams, + config: &Config, + node: &NodeContext, +) -> bool { + match try_setup_chronik(params, config, node) { Ok(()) => true, Err(report) => { log_chronik!("{report:?}"); @@ -47,7 +51,11 @@ } } -fn try_setup_chronik(params: ffi::SetupParams) -> Result<()> { +fn try_setup_chronik( + params: ffi::SetupParams, + config: &Config, + node: &NodeContext, +) -> Result<()> { abc_rust_error::install(); let hosts = params .hosts @@ -55,9 +63,12 @@ .map(|host| parse_socket_addr(host, params.default_port)) .collect::<Result<Vec<_>>>()?; log!("Starting Chronik bound to {:?}\n", hosts); - let indexer = ChronikIndexer::setup(ChronikIndexerParams { + let bridge = make_bridge(config, node); + let bridge = bridge.as_ref().expect("make_bridge"); + let mut indexer = ChronikIndexer::setup(ChronikIndexerParams { datadir_net: params.datadir_net.into(), })?; + indexer.resync_indexer(bridge)?; let indexer = Arc::new(RwLock::new(indexer)); let runtime = tokio::runtime::Builder::new_multi_thread() .enable_all() @@ -137,17 +148,3 @@ log_chronik!("Chronik: block disconnected\n"); } } - -fn make_chronik_block(block: &CBlock, bindex: &CBlockIndex) -> ChronikBlock { - let block = bridge_block(block, bindex); - let db_block = DbBlock { - hash: BlockHash::from(block.hash), - prev_hash: BlockHash::from(block.prev_hash), - height: block.height, - n_bits: block.n_bits, - timestamp: block.timestamp, - file_num: block.file_num, - data_pos: block.data_pos, - }; - ChronikBlock { db_block } -} diff --git a/chronik/chronik-lib/src/ffi.rs b/chronik/chronik-lib/src/ffi.rs --- a/chronik/chronik-lib/src/ffi.rs +++ b/chronik/chronik-lib/src/ffi.rs @@ -24,7 +24,11 @@ extern "Rust" { type Chronik; - fn setup_chronik(params: SetupParams) -> bool; + fn setup_chronik( + params: SetupParams, + config: &Config, + node: &NodeContext, + ) -> bool; fn handle_tx_added_to_mempool(&self); fn handle_tx_removed_from_mempool(&self); @@ -39,6 +43,8 @@ unsafe extern "C++" { include!("blockindex.h"); include!("chronik-cpp/chronik_validationinterface.h"); + include!("config.h"); + include!("node/context.h"); include!("primitives/block.h"); /// CBlockIndex from blockindex.h @@ -49,6 +55,14 @@ #[namespace = ""] type CBlock = chronik_bridge::ffi::CBlock; + /// ::Config from config.h + #[namespace = ""] + type Config = chronik_bridge::ffi::Config; + + /// NodeContext from node/context.h + #[namespace = "node"] + type NodeContext = chronik_bridge::ffi::NodeContext; + /// Register the Chronik instance as CValidationInterface to receive /// chain updates from the node. #[namespace = "chronik"] diff --git a/test/functional/chronik_resync.py b/test/functional/chronik_resync.py new file mode 100644 --- /dev/null +++ b/test/functional/chronik_resync.py @@ -0,0 +1,116 @@ +#!/usr/bin/env python3 +# Copyright (c) 2023 The Bitcoin developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. + +import http.client +import os +import shutil + +from test_framework.address import ADDRESS_ECREG_P2SH_OP_TRUE, ADDRESS_ECREG_UNSPENDABLE +from test_framework.blocktools import GENESIS_BLOCK_HASH +from test_framework.test_framework import BitcoinTestFramework +from test_framework.test_node import ErrorMatch +from test_framework.util import assert_equal, get_datadir_path + + +class ChronikResyncTest(BitcoinTestFramework): + def set_test_params(self): + self.setup_clean_chain = True + self.num_nodes = 1 + + def skip_test_if_missing_module(self): + self.skip_if_no_chronik() + + def run_test(self): + import chronik_pb2 as pb + + def query_block(block_height): + chronik_port = self.nodes[0].chronik_port + client = http.client.HTTPConnection('127.0.0.1', chronik_port, timeout=4) + client.request('GET', f'/block/{block_height}') + response = client.getresponse() + assert_equal(response.getheader('Content-Type'), + 'application/x-protobuf') + return response + + node = self.nodes[0] + + # Mine 100 blocks, that Chronik doesn't index + block_hashes = ( + [GENESIS_BLOCK_HASH] + + self.generatetoaddress(node, 100, ADDRESS_ECREG_P2SH_OP_TRUE) + ) + + # Restart with Chronik: syncs blocks from genesis + with node.assert_debug_log([ + f"Chronik database empty, syncing to block {block_hashes[100]} " + + "at height 100.", + f"Synced Chronik up to block {block_hashes[0]} at height 0/100", + f"Synced Chronik up to block {block_hashes[100]} at height 100/100", + "Chronik completed re-syncing with the node, both are now at " + + f"block {block_hashes[100]} at height 100.", + ]): + self.restart_node(0, ['-chronik']) + + for i in range(0, 101): + response = query_block(i) + assert_equal(response.status, 200) + proto_block = pb.Block() + proto_block.ParseFromString(response.read()) + assert_equal(proto_block.block_info.hash[::-1].hex(), block_hashes[i]) + + response = query_block(101) + assert_equal(response.status, 404) + + self.restart_node(0, []) + + # Without Chronik: Undo last 50 blocks, then add 100 new ones + node.invalidateblock(block_hashes[50]) + chronik_hash = block_hashes[100] + del block_hashes[50:] + block_hashes += ( + self.generatetoaddress(node, 100, ADDRESS_ECREG_UNSPENDABLE) + ) + + # Restart with Chronik: Undoes last 50 blocks, then adds node's next 100 + with node.assert_debug_log([ + f"Node and Chronik diverged, node is on block {block_hashes[149]} " + + f"at height 149, and Chronik is on block {chronik_hash} at height 100.", + f"The last common block is {block_hashes[49]} at height 49.", + "Reverting Chronik blocks 50 to 100", + f"Synced Chronik up to block {block_hashes[100]} at height 100/149", + "Chronik completed re-syncing with the node, both are now at block " + + f"{block_hashes[149]} at height 149.", + ]): + self.restart_node(0, ['-chronik']) + + for i in range(0, 150): + response = query_block(i) + assert_equal(response.status, 200) + proto_block = pb.Block() + proto_block.ParseFromString(response.read()) + assert_equal(proto_block.block_info.hash[::-1].hex(), block_hashes[i]) + + # Reset node blockchain back to genesis + # Leave Chronik untouched + node.stop_node() + datadir = get_datadir_path(self.options.tmpdir, 0) + shutil.rmtree(os.path.join(datadir, self.chain, 'blocks')) + shutil.rmtree(os.path.join(datadir, self.chain, 'chainstate')) + self.restart_node(0, ['-reindex']) + assert_equal(node.getbestblockhash(), GENESIS_BLOCK_HASH) + + # Chronik cannot sync because the node doesn't have the old blocks anymore + # It needs the node's block data to undo the stale blocks. + node.stop_node() + node.assert_start_raises_init_error( + ["-chronik"], + f"Error: Cannot rewind Chronik, it contains block {block_hashes[149]} " + + "that the node doesn't have. You may need to -reindex, or delete " + + "indexes/chronik and restart", + match=ErrorMatch.FULL_TEXT) + + +if __name__ == '__main__': + ChronikResyncTest().main()