diff --git a/chronik/chronik-bridge/src/ffi.rs b/chronik/chronik-bridge/src/ffi.rs index b4089901e..73e0d7f1a 100644 --- a/chronik/chronik-bridge/src/ffi.rs +++ b/chronik/chronik-bridge/src/ffi.rs @@ -1,246 +1,250 @@ // Copyright (c) 2022 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. //! Module containing the cxx definitions for the bridge from C++ to Rust. pub use self::ffi_inner::*; #[allow(unsafe_code)] #[cxx::bridge(namespace = "chronik_bridge")] mod ffi_inner { /// Info about a block #[derive(Clone, Debug, Default, Eq, PartialEq)] pub struct BlockInfo { /// Hash of the block (or 000...000 if no block) pub hash: [u8; 32], /// Height of the block (or -1 if no block) pub height: i32, } /// Block coming from bitcoind to Chronik. /// /// We don't index all fields (e.g. hashMerkleRoot), only those that are /// needed when querying a range of blocks. /// /// Instead of storing all the block data for Chronik again, we only store /// file_num, data_pos and undo_pos of the block data of the node. /// /// This makes the index relatively small, as it's mostly just pointing to /// the data the node already stores. /// /// Note that this prohibits us from using Chronik in pruned mode. #[derive(Clone, Debug, Default, Eq, PartialEq)] pub struct Block { /// Block hash pub hash: [u8; 32], /// hashPrevBlock, hash of the previous block in the chain pub prev_hash: [u8; 32], /// nBits, difficulty of the header pub n_bits: u32, /// Timestamp of the block pub timestamp: i64, /// Height of the block in the chain. pub height: i32, /// File number of the block file this block is stored in. /// This can be used to later slice out transactions, so we don't have /// to index txs twice. pub file_num: u32, /// Position of the block within the block file, starting at the block /// header. pub data_pos: u32, /// Position of the undo data within the undo file. pub undo_pos: u32, /// Serialized size of the block pub size: u64, /// Txs of this block, including positions within the block/undo files. pub txs: Vec, } /// Tx in a block #[derive(Clone, Debug, Default, Eq, PartialEq)] pub struct BlockTx { /// Tx (without disk data) pub tx: Tx, /// Where the tx is stored within the block file. pub data_pos: u32, /// Where the tx's undo data is stored within the block's undo file. pub undo_pos: u32, } /// CTransaction, in a block or in the mempool. #[derive(Clone, Debug, Default, Eq, PartialEq)] pub struct Tx { /// TxId of the tx. pub txid: [u8; 32], /// nVersion of the tx. pub version: i32, /// Tx inputs. pub inputs: Vec, /// Tx outputs. pub outputs: Vec, /// Locktime of the tx. pub locktime: u32, } /// COutPoint, pointing to a coin being spent. #[derive(Clone, Debug, Default, Eq, PartialEq)] pub struct OutPoint { /// TxId of the output of the coin. pub txid: [u8; 32], /// Index in the outputs of the tx of the coin. pub out_idx: u32, } /// CTxIn, spending an unspent output. #[derive(Clone, Debug, Default, Eq, PartialEq)] pub struct TxInput { /// Points to an output being spent. pub prev_out: OutPoint, /// scriptSig unlocking the output. pub script: Vec, /// nSequence. pub sequence: u32, /// Coin being spent by this tx. pub coin: Coin, } /// CTxOut, creating a new output. #[derive(Clone, Debug, Default, Eq, PartialEq)] pub struct TxOutput { /// Value of the output. pub value: i64, /// Script locking the output. pub script: Vec, } /// Coin, can be spent by providing a valid unlocking script. #[derive(Clone, Debug, Default, Eq, PartialEq)] pub struct Coin { /// Output, locking the coins. pub output: TxOutput, /// Height of the coin in the chain. pub height: i32, /// Whether the coin is a coinbase. pub is_coinbase: bool, } #[allow(missing_debug_implementations)] unsafe extern "C++" { include!("blockindex.h"); include!("chronik-cpp/chronik_bridge.h"); include!("node/context.h"); include!("primitives/block.h"); include!("primitives/transaction.h"); /// node::NodeContext from node/context.h #[namespace = "node"] type NodeContext; /// ::CBlockIndex from blockindex.h #[namespace = ""] type CBlockIndex; /// ::CBlock from primitives/block.h #[namespace = ""] type CBlock; /// ::Config from config.h #[namespace = ""] type Config; /// ::CTransaction from primitives/transaction.h #[namespace = ""] type CTransaction; /// Bridge to bitcoind to access the node type ChronikBridge; /// Print the message to bitcoind's logs. fn log_print( logging_function: &str, source_file: &str, source_line: u32, msg: &str, ); /// Print the message to bitcoind's logs under the BCLog::Chronik /// category. fn log_print_chronik( logging_function: &str, source_file: &str, source_line: u32, msg: &str, ); /// Make the bridge given the NodeContext fn make_bridge( config: &Config, node: &NodeContext, ) -> UniquePtr; /// Return the tip of the chain of the node. /// Returns hash=000...000, height=-1 if there's no block on the chain. fn get_chain_tip(self: &ChronikBridge) -> Result<&CBlockIndex>; /// Lookup the block index with the given hash, or throw an error /// if it couldn't be found. fn lookup_block_index( self: &ChronikBridge, hash: [u8; 32], ) -> Result<&CBlockIndex>; /// Load the CBlock data of this CBlockIndex from the disk fn load_block( self: &ChronikBridge, block_index: &CBlockIndex, ) -> Result>; /// Bridge CTransaction -> ffi::Tx, including finding the spent coins. /// `tx` can be a mempool tx. fn bridge_tx(self: &ChronikBridge, tx: &CTransaction) -> Result; /// Find at which block the given block_index forks off from the node. fn find_fork( self: &ChronikBridge, block_index: &CBlockIndex, ) -> Result<&CBlockIndex>; /// Bridge bitcoind's classes to the shared struct [`Block`]. fn bridge_block( block: &CBlock, block_index: &CBlockIndex, ) -> Result; /// Load the CTransaction and CTxUndo data from disk and turn it into a /// bridged Tx, containing spent coins etc. fn load_tx(file_num: u32, data_pos: u32, undo_pos: u32) -> Result; /// Load the CTransaction from disk and serialize it. fn load_raw_tx(file_num: u32, data_pos: u32) -> Result>; /// Get a BlockInfo for this CBlockIndex. fn get_block_info(block_index: &CBlockIndex) -> BlockInfo; /// CBlockIndex::GetAncestor fn get_block_ancestor( block_index: &CBlockIndex, height: i32, ) -> Result<&CBlockIndex>; /// Compress the given script using `ScriptCompression`. fn compress_script(script: &[u8]) -> Vec; /// Decompress the given script using `ScriptCompression`. fn decompress_script(compressed: &[u8]) -> Result>; /// Calls `InitError` from `node/ui_interface.h` to report an error to /// the user and then gracefully shut down the node. fn init_error(msg: &str) -> bool; /// Calls `AbortNode` from shutdown.h to gracefully shut down the node /// when an unrecoverable error occured. fn abort_node(msg: &str, user_msg: &str); + + /// Returns true if a shutdown is requested, false otherwise. + /// See `ShutdownRequested` in `shutdown.h`. + fn shutdown_requested() -> bool; } } diff --git a/chronik/chronik-cpp/chronik_bridge.cpp b/chronik/chronik-cpp/chronik_bridge.cpp index 2d3b3f202..3eca1060d 100644 --- a/chronik/chronik-cpp/chronik_bridge.cpp +++ b/chronik/chronik-cpp/chronik_bridge.cpp @@ -1,307 +1,311 @@ // Copyright (c) 2022 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include chronik_bridge::OutPoint BridgeOutPoint(const COutPoint &outpoint) { return { .txid = chronik::util::HashToArray(outpoint.GetTxId()), .out_idx = outpoint.GetN(), }; } chronik_bridge::TxOutput BridgeTxOutput(const CTxOut &output) { return { .value = output.nValue / Amount::satoshi(), .script = chronik::util::ToRustVec(output.scriptPubKey), }; } chronik_bridge::Coin BridgeCoin(const Coin &coin) { const int32_t nHeight = coin.GetHeight() == 0x7fff'ffff ? -1 : coin.GetHeight(); return { .output = BridgeTxOutput(coin.GetTxOut()), .height = nHeight, .is_coinbase = coin.IsCoinBase(), }; } rust::Vec BridgeTxInputs(bool isCoinbase, const std::vector &inputs, const std::vector &spent_coins) { rust::Vec bridged_inputs; bridged_inputs.reserve(inputs.size()); for (size_t idx = 0; idx < inputs.size(); ++idx) { const CTxIn &input = inputs[idx]; chronik_bridge::Coin bridge_coin{}; // empty coin if (!isCoinbase) { if (idx >= spent_coins.size()) { throw std::runtime_error("Missing coin for input"); } bridge_coin = BridgeCoin(spent_coins[idx]); } bridged_inputs.push_back({ .prev_out = BridgeOutPoint(input.prevout), .script = chronik::util::ToRustVec(input.scriptSig), .sequence = input.nSequence, .coin = std::move(bridge_coin), }); } return bridged_inputs; } rust::Vec BridgeTxOutputs(const std::vector &outputs) { rust::Vec bridged_outputs; bridged_outputs.reserve(outputs.size()); for (const CTxOut &output : outputs) { bridged_outputs.push_back(BridgeTxOutput(output)); } return bridged_outputs; } chronik_bridge::Tx BridgeTx(bool isCoinbase, const CTransaction &tx, const std::vector &spent_coins) { return { .txid = chronik::util::HashToArray(tx.GetId()), .version = tx.nVersion, .inputs = BridgeTxInputs(isCoinbase, tx.vin, spent_coins), .outputs = BridgeTxOutputs(tx.vout), .locktime = tx.nLockTime, }; } chronik_bridge::BlockTx BridgeBlockTx(bool isCoinbase, const CTransaction &tx, const std::vector &spent_coins, size_t data_pos, size_t undo_pos) { return {.tx = BridgeTx(isCoinbase, tx, spent_coins), .data_pos = uint32_t(data_pos), .undo_pos = uint32_t(isCoinbase ? 0 : undo_pos)}; } size_t GetFirstBlockTxOffset(const CBlock &block, const CBlockIndex &bindex) { return bindex.nDataPos + ::GetSerializeSize(CBlockHeader()) + GetSizeOfCompactSize(block.vtx.size()); } size_t GetFirstUndoOffset(const CBlock &block, const CBlockIndex &bindex) { // We have to -1 here, because coinbase txs don't have undo data. return bindex.nUndoPos + GetSizeOfCompactSize(block.vtx.size() - 1); } chronik_bridge::Block BridgeBlock(const CBlock &block, const CBlockIndex &bindex) { size_t data_pos = GetFirstBlockTxOffset(block, bindex); size_t undo_pos = 0; CBlockUndo block_undo; // Read undo data (genesis block doesn't have undo data) if (bindex.nHeight > 0) { undo_pos = GetFirstUndoOffset(block, bindex); if (!node::UndoReadFromDisk(block_undo, &bindex)) { throw std::runtime_error("Reading block undo data failed"); } } rust::Vec bridged_txs; for (size_t tx_idx = 0; tx_idx < block.vtx.size(); ++tx_idx) { const bool isCoinbase = tx_idx == 0; const CTransaction &tx = *block.vtx[tx_idx]; if (!isCoinbase && tx_idx - 1 >= block_undo.vtxundo.size()) { throw std::runtime_error("Missing undo data for tx"); } const std::vector &spent_coins = isCoinbase ? std::vector() : block_undo.vtxundo[tx_idx - 1].vprevout; bridged_txs.push_back( BridgeBlockTx(isCoinbase, tx, spent_coins, data_pos, undo_pos)); // advance data_pos and undo_pos positions data_pos += ::GetSerializeSize(tx); if (!isCoinbase) { undo_pos += ::GetSerializeSize(block_undo.vtxundo[tx_idx - 1]); } } return {.hash = chronik::util::HashToArray(block.GetHash()), .prev_hash = chronik::util::HashToArray(block.hashPrevBlock), .n_bits = block.nBits, .timestamp = block.GetBlockTime(), .height = bindex.nHeight, .file_num = uint32_t(bindex.nFile), .data_pos = bindex.nDataPos, .undo_pos = bindex.nUndoPos, .size = bindex.nSize, .txs = bridged_txs}; } namespace chronik_bridge { void log_print(const rust::Str logging_function, const rust::Str source_file, const uint32_t source_line, const rust::Str msg) { LogInstance().LogPrintStr(std::string(msg), std::string(logging_function), std::string(source_file), source_line); } void log_print_chronik(const rust::Str logging_function, const rust::Str source_file, const uint32_t source_line, const rust::Str msg) { if (LogInstance().WillLogCategory(BCLog::CHRONIK)) { log_print(logging_function, source_file, source_line, msg); } } const CBlockIndex &ChronikBridge::get_chain_tip() const { const CBlockIndex *tip = WITH_LOCK(cs_main, return m_node.chainman->ActiveTip()); if (tip == nullptr) { throw block_index_not_found(); } return *tip; } const CBlockIndex & ChronikBridge::lookup_block_index(std::array hash) const { BlockHash block_hash{chronik::util::ArrayToHash(hash)}; const CBlockIndex *pindex = WITH_LOCK( cs_main, return m_node.chainman->m_blockman.LookupBlockIndex(block_hash)); if (!pindex) { throw block_index_not_found(); } return *pindex; } std::unique_ptr ChronikBridge::load_block(const CBlockIndex &bindex) const { CBlock block; if (!node::ReadBlockFromDisk(block, &bindex, m_consensus)) { throw std::runtime_error("Reading block data failed"); } return std::make_unique(std::move(block)); } Tx ChronikBridge::bridge_tx(const CTransaction &tx) const { std::map coins; for (const CTxIn &input : tx.vin) { coins[input.prevout]; } FindCoins(m_node, coins); std::vector<::Coin> spent_coins; spent_coins.reserve(tx.vin.size()); for (const CTxIn &input : tx.vin) { const ::Coin &coin = coins[input.prevout]; if (coin.GetTxOut().IsNull()) { throw std::runtime_error("Couldn't find coin for input"); } spent_coins.push_back(coin); } return BridgeTx(false, tx, spent_coins); } const CBlockIndex &ChronikBridge::find_fork(const CBlockIndex &index) const { const CBlockIndex *fork = WITH_LOCK( cs_main, return m_node.chainman->ActiveChainstate().m_chain.FindFork(&index)); if (!fork) { throw block_index_not_found(); } return *fork; } std::unique_ptr make_bridge(const Config &config, const node::NodeContext &node) { return std::make_unique( config.GetChainParams().GetConsensus(), node); } chronik_bridge::Block bridge_block(const CBlock &block, const CBlockIndex &bindex) { return BridgeBlock(block, bindex); } Tx load_tx(uint32_t file_num, uint32_t data_pos, uint32_t undo_pos) { CMutableTransaction tx; CTxUndo txundo{}; const bool isCoinbase = undo_pos == 0; if (!node::ReadTxFromDisk(tx, FlatFilePos(file_num, data_pos))) { throw std::runtime_error("Reading tx data from disk failed"); } if (!isCoinbase) { if (!node::ReadTxUndoFromDisk(txundo, FlatFilePos(file_num, undo_pos))) { throw std::runtime_error("Reading tx undo data from disk failed"); } } return BridgeTx(isCoinbase, CTransaction(std::move(tx)), txundo.vprevout); } rust::Vec load_raw_tx(uint32_t file_num, uint32_t data_pos) { CMutableTransaction tx; if (!node::ReadTxFromDisk(tx, FlatFilePos(file_num, data_pos))) { throw std::runtime_error("Reading tx data from disk failed"); } CDataStream raw_tx{SER_NETWORK, PROTOCOL_VERSION}; raw_tx << tx; return chronik::util::ToRustVec(raw_tx); } BlockInfo get_block_info(const CBlockIndex &bindex) { return { .hash = chronik::util::HashToArray(bindex.GetBlockHash()), .height = bindex.nHeight, }; } const CBlockIndex &get_block_ancestor(const CBlockIndex &index, int32_t height) { const CBlockIndex *pindex = index.GetAncestor(height); if (!pindex) { throw block_index_not_found(); } return *pindex; } rust::Vec compress_script(rust::Slice bytecode) { std::vector vec = chronik::util::FromRustSlice(bytecode); CScript script{vec.begin(), vec.end()}; CDataStream compressed{SER_NETWORK, PROTOCOL_VERSION}; compressed << Using(script); return chronik::util::ToRustVec(compressed); } rust::Vec decompress_script(rust::Slice compressed) { std::vector vec = chronik::util::FromRustSlice(compressed); CDataStream stream{vec, SER_NETWORK, PROTOCOL_VERSION}; CScript script; stream >> Using(script); return chronik::util::ToRustVec(script); } bool init_error(const rust::Str msg) { return InitError(Untranslated(std::string(msg))); } void abort_node(const rust::Str msg, const rust::Str user_msg) { AbortNode(std::string(msg), Untranslated(std::string(user_msg))); } +bool shutdown_requested() { + return ShutdownRequested(); +} + } // namespace chronik_bridge diff --git a/chronik/chronik-cpp/chronik_bridge.h b/chronik/chronik-cpp/chronik_bridge.h index 0801fa6fd..1379eee39 100644 --- a/chronik/chronik-cpp/chronik_bridge.h +++ b/chronik/chronik-cpp/chronik_bridge.h @@ -1,90 +1,92 @@ // Copyright (c) 2022 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #ifndef BITCOIN_CHRONIK_CPP_CHRONIK_BRIDGE_H #define BITCOIN_CHRONIK_CPP_CHRONIK_BRIDGE_H #include #include class CBlock; class CBlockIndex; class Config; class CTransaction; namespace Consensus { struct Params; } // namespace Consensus namespace node { struct NodeContext; } // namespace node class uint256; namespace chronik_bridge { struct BlockInfo; struct Block; struct Tx; class block_index_not_found : public std::exception { public: const char *what() const noexcept override { return "CBlockIndex not found"; } }; void log_print(const rust::Str logging_function, const rust::Str source_file, const uint32_t source_line, const rust::Str msg); void log_print_chronik(const rust::Str logging_function, const rust::Str source_file, const uint32_t source_line, const rust::Str msg); /** * Bridge to bitcoind to access the node. */ class ChronikBridge { const Consensus::Params &m_consensus; const node::NodeContext &m_node; public: ChronikBridge(const Consensus::Params &consensus, const node::NodeContext &node) : m_consensus(consensus), m_node(node) {} const CBlockIndex &get_chain_tip() const; const CBlockIndex &lookup_block_index(std::array hash) const; std::unique_ptr load_block(const CBlockIndex &bindex) const; Tx bridge_tx(const CTransaction &tx) const; const CBlockIndex &find_fork(const CBlockIndex &index) const; }; std::unique_ptr make_bridge(const Config &config, const node::NodeContext &node); Block bridge_block(const CBlock &block, const CBlockIndex &bindex); Tx load_tx(uint32_t file_num, uint32_t data_pos, uint32_t undo_pos); rust::Vec load_raw_tx(uint32_t file_num, uint32_t data_pos); BlockInfo get_block_info(const CBlockIndex &index); const CBlockIndex &get_block_ancestor(const CBlockIndex &index, int32_t height); rust::Vec compress_script(rust::Slice script); rust::Vec decompress_script(rust::Slice compressed); bool init_error(const rust::Str msg); void abort_node(const rust::Str msg, const rust::Str user_msg); +bool shutdown_requested(); + } // namespace chronik_bridge #endif // BITCOIN_CHRONIK_CPP_CHRONIK_BRIDGE_H diff --git a/chronik/chronik-indexer/src/indexer.rs b/chronik/chronik-indexer/src/indexer.rs index 0ac2be903..c445607c1 100644 --- a/chronik/chronik-indexer/src/indexer.rs +++ b/chronik/chronik-indexer/src/indexer.rs @@ -1,702 +1,711 @@ // Copyright (c) 2022 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. //! Module containing [`ChronikIndexer`] to index blocks and txs. use std::path::PathBuf; use abc_rust_error::{Result, WrapErr}; use bitcoinsuite_core::{ block::BlockHash, tx::{Tx, TxId}, }; use chronik_bridge::{ffi, util::expect_unique_ptr}; use chronik_db::{ db::{Db, WriteBatch}, groups::{ FnCompressScript, ScriptGroup, ScriptHistoryWriter, ScriptUtxoWriter, }, index_tx::prepare_indexed_txs, io::{ BlockHeight, BlockReader, BlockStatsWriter, BlockTxs, BlockWriter, DbBlock, MetadataReader, MetadataWriter, SchemaVersion, SpentByWriter, TxEntry, TxWriter, }, mem::{Mempool, MempoolTx}, }; use chronik_util::{log, log_chronik}; use thiserror::Error; use tokio::sync::RwLock; use crate::{ avalanche::Avalanche, query::{QueryBlocks, QueryGroupHistory, QueryGroupUtxos, QueryTxs}, subs::{BlockMsg, BlockMsgType, Subs}, subs_group::TxMsgType, }; const CURRENT_INDEXER_VERSION: SchemaVersion = 7; /// Params for setting up a [`ChronikIndexer`] instance. #[derive(Clone)] pub struct ChronikIndexerParams { /// Folder where the node stores its data, net-dependent. pub datadir_net: PathBuf, /// Whether to clear the DB before opening the DB, e.g. when reindexing. pub wipe_db: bool, /// Function ptr to compress scripts. pub fn_compress_script: FnCompressScript, } /// Struct for indexing blocks and txs. Maintains db handles and mempool. #[derive(Debug)] pub struct ChronikIndexer { db: Db, mempool: Mempool, script_group: ScriptGroup, avalanche: Avalanche, subs: RwLock, } /// Block to be indexed by Chronik. #[derive(Clone, Debug, Default, Eq, PartialEq)] pub struct ChronikBlock { /// Data about the block (w/o txs) pub db_block: DbBlock, /// Txs in the block, with locations of where they are stored on disk. pub block_txs: BlockTxs, /// Block size in bytes. pub size: u64, /// Txs in the block, with inputs/outputs so we can group them. pub txs: Vec, } /// Errors for [`BlockWriter`] and [`BlockReader`]. #[derive(Debug, Eq, Error, PartialEq)] pub enum ChronikIndexerError { /// Failed creating the folder for the indexes #[error("Failed creating path {0}")] CreateIndexesDirFailed(PathBuf), /// Cannot rewind blocks that bitcoind doesn't have #[error( "Cannot rewind Chronik, it contains block {0} that the node doesn't \ have. You may need to use -reindex/-chronikreindex, or delete \ indexes/chronik and restart" )] CannotRewindChronik(BlockHash), /// Lower block doesn't exist but higher block does #[error( "Inconsistent DB: Block {missing} doesn't exist, but {exists} does" )] BlocksBelowMissing { /// Lower height that is missing missing: BlockHeight, /// Higher height that exists exists: BlockHeight, }, /// Corrupted schema version #[error( "Corrupted schema version in the Chronik database, consider running \ -reindex/-chronikreindex" )] CorruptedSchemaVersion, /// Missing schema version for non-empty database #[error( "Missing schema version in non-empty Chronik database, consider \ running -reindex/-chronikreindex" )] MissingSchemaVersion, /// This Chronik instance is outdated #[error( "Chronik outdated: Chronik has version {}, but the database has \ version {0}. Upgrade your node to the appropriate version.", CURRENT_INDEXER_VERSION )] ChronikOutdated(SchemaVersion), /// Database is outdated #[error( "DB outdated: Chronik has version {}, but the database has version \ {0}. -reindex/-chronikreindex to reindex the database to the new \ version.", CURRENT_INDEXER_VERSION )] DatabaseOutdated(SchemaVersion), } use self::ChronikIndexerError::*; impl ChronikIndexer { /// Setup the indexer with the given parameters, e.g. open the DB etc. pub fn setup(params: ChronikIndexerParams) -> Result { let indexes_path = params.datadir_net.join("indexes"); if !indexes_path.exists() { std::fs::create_dir(&indexes_path).wrap_err_with(|| { CreateIndexesDirFailed(indexes_path.clone()) })?; } let db_path = indexes_path.join("chronik"); if params.wipe_db { log!("Wiping Chronik at {}\n", db_path.to_string_lossy()); Db::destroy(&db_path)?; } log_chronik!("Opening Chronik at {}\n", db_path.to_string_lossy()); let db = Db::open(&db_path)?; verify_schema_version(&db)?; let script_group = ScriptGroup::new(params.fn_compress_script); let mempool = Mempool::new(script_group.clone()); Ok(ChronikIndexer { db, mempool, script_group: script_group.clone(), avalanche: Avalanche::default(), subs: RwLock::new(Subs::new(script_group)), }) } /// Resync Chronik index to the node pub fn resync_indexer( &mut self, bridge: &ffi::ChronikBridge, ) -> Result<()> { let block_reader = BlockReader::new(&self.db)?; let indexer_tip = block_reader.tip()?; let Ok(node_tip_index) = bridge.get_chain_tip() else { if let Some(indexer_tip) = &indexer_tip { return Err( CannotRewindChronik(indexer_tip.hash.clone()).into() ); } return Ok(()); }; let node_tip_info = ffi::get_block_info(node_tip_index); let node_height = node_tip_info.height; let node_tip_hash = BlockHash::from(node_tip_info.hash); let fork_height = match indexer_tip { Some(tip) => { let indexer_tip_hash = tip.hash.clone(); let indexer_height = tip.height; log!( "Node and Chronik diverged, node is on block \ {node_tip_hash} at height {node_height}, and Chronik is \ on block {indexer_tip_hash} at height {indexer_height}.\n" ); let indexer_tip_index = bridge .lookup_block_index(tip.hash.to_bytes()) .map_err(|_| CannotRewindChronik(tip.hash.clone()))?; self.rewind_indexer(bridge, indexer_tip_index, &tip)? } None => { log!( "Chronik database empty, syncing to block {node_tip_hash} \ at height {node_height}.\n" ); -1 } }; let tip_height = node_tip_info.height; for height in fork_height + 1..=tip_height { + if ffi::shutdown_requested() { + log!("Stopped re-sync adding blocks\n"); + return Ok(()); + } let block_index = ffi::get_block_ancestor(node_tip_index, height)?; let ffi_block = bridge.load_block(block_index)?; let ffi_block = expect_unique_ptr("load_block", &ffi_block); let block = self.make_chronik_block(ffi_block, block_index)?; let hash = block.db_block.hash.clone(); self.handle_block_connected(block)?; log_chronik!( "Added block {hash}, height {height}/{tip_height} to Chronik\n" ); if height % 100 == 0 { log!( "Synced Chronik up to block {hash} at height \ {height}/{tip_height}\n" ); } } log!( "Chronik completed re-syncing with the node, both are now at \ block {node_tip_hash} at height {node_height}.\n" ); Ok(()) } fn rewind_indexer( &mut self, bridge: &ffi::ChronikBridge, indexer_tip_index: &ffi::CBlockIndex, indexer_db_tip: &DbBlock, ) -> Result { let indexer_height = indexer_db_tip.height; let fork_block_index = bridge .find_fork(indexer_tip_index) .map_err(|_| CannotRewindChronik(indexer_db_tip.hash.clone()))?; let fork_info = ffi::get_block_info(fork_block_index); let fork_block_hash = BlockHash::from(fork_info.hash); let fork_height = fork_info.height; let revert_height = fork_height + 1; log!( "The last common block is {fork_block_hash} at height \ {fork_height}.\n" ); log!("Reverting Chronik blocks {revert_height} to {indexer_height}.\n"); for height in (revert_height..indexer_height).rev() { + if ffi::shutdown_requested() { + log!("Stopped re-sync rewinding blocks\n"); + // return MAX here so we don't add any blocks + return Ok(BlockHeight::MAX); + } let db_block = BlockReader::new(&self.db)? .by_height(height)? .ok_or(BlocksBelowMissing { missing: height, exists: indexer_height, })?; let block_index = bridge .lookup_block_index(db_block.hash.to_bytes()) .map_err(|_| CannotRewindChronik(db_block.hash))?; let ffi_block = bridge.load_block(block_index)?; let ffi_block = expect_unique_ptr("load_block", &ffi_block); let block = self.make_chronik_block(ffi_block, block_index)?; self.handle_block_disconnected(block)?; } Ok(fork_info.height) } /// Add transaction to the indexer's mempool. pub fn handle_tx_added_to_mempool( &mut self, mempool_tx: MempoolTx, ) -> Result<()> { self.subs .get_mut() .handle_tx_event(&mempool_tx.tx, TxMsgType::AddedToMempool); self.mempool.insert(mempool_tx)?; Ok(()) } /// Remove tx from the indexer's mempool, e.g. by a conflicting tx, expiry /// etc. This is not called when the transaction has been mined (and thus /// also removed from the mempool). pub fn handle_tx_removed_from_mempool(&mut self, txid: TxId) -> Result<()> { let mempool_tx = self.mempool.remove(txid)?; self.subs .get_mut() .handle_tx_event(&mempool_tx.tx, TxMsgType::RemovedFromMempool); Ok(()) } /// Add the block to the index. pub fn handle_block_connected( &mut self, block: ChronikBlock, ) -> Result<()> { let height = block.db_block.height; let mut batch = WriteBatch::default(); let block_writer = BlockWriter::new(&self.db)?; let tx_writer = TxWriter::new(&self.db)?; let block_stats_writer = BlockStatsWriter::new(&self.db)?; let script_history_writer = ScriptHistoryWriter::new(&self.db, self.script_group.clone())?; let script_utxo_writer = ScriptUtxoWriter::new(&self.db, self.script_group.clone())?; let spent_by_writer = SpentByWriter::new(&self.db)?; block_writer.insert(&mut batch, &block.db_block)?; let first_tx_num = tx_writer.insert(&mut batch, &block.block_txs)?; let index_txs = prepare_indexed_txs(&self.db, first_tx_num, &block.txs)?; block_stats_writer .insert(&mut batch, height, block.size, &index_txs)?; script_history_writer.insert(&mut batch, &index_txs)?; script_utxo_writer.insert(&mut batch, &index_txs)?; spent_by_writer.insert(&mut batch, &index_txs)?; self.db.write_batch(batch)?; for tx in &block.block_txs.txs { self.mempool.remove_mined(&tx.txid)?; } let subs = self.subs.get_mut(); subs.broadcast_block_msg(BlockMsg { msg_type: BlockMsgType::Connected, hash: block.db_block.hash, height: block.db_block.height, }); for tx in &block.txs { subs.handle_tx_event(tx, TxMsgType::Confirmed); } Ok(()) } /// Remove the block from the index. pub fn handle_block_disconnected( &mut self, block: ChronikBlock, ) -> Result<()> { let mut batch = WriteBatch::default(); let block_writer = BlockWriter::new(&self.db)?; let tx_writer = TxWriter::new(&self.db)?; let block_stats_writer = BlockStatsWriter::new(&self.db)?; let script_history_writer = ScriptHistoryWriter::new(&self.db, self.script_group.clone())?; let script_utxo_writer = ScriptUtxoWriter::new(&self.db, self.script_group.clone())?; let spent_by_writer = SpentByWriter::new(&self.db)?; block_writer.delete(&mut batch, &block.db_block)?; let first_tx_num = tx_writer.delete(&mut batch, &block.block_txs)?; let index_txs = prepare_indexed_txs(&self.db, first_tx_num, &block.txs)?; block_stats_writer.delete(&mut batch, block.db_block.height); script_history_writer.delete(&mut batch, &index_txs)?; script_utxo_writer.delete(&mut batch, &index_txs)?; spent_by_writer.delete(&mut batch, &index_txs)?; self.avalanche.disconnect_block(block.db_block.height)?; self.db.write_batch(batch)?; let subs = self.subs.get_mut(); subs.broadcast_block_msg(BlockMsg { msg_type: BlockMsgType::Disconnected, hash: block.db_block.hash, height: block.db_block.height, }); Ok(()) } /// Block finalized with Avalanche. pub fn handle_block_finalized( &mut self, block: ChronikBlock, ) -> Result<()> { self.avalanche.finalize_block(block.db_block.height)?; let subs = self.subs.get_mut(); subs.broadcast_block_msg(BlockMsg { msg_type: BlockMsgType::Finalized, hash: block.db_block.hash, height: block.db_block.height, }); for tx in &block.txs { subs.handle_tx_event(tx, TxMsgType::Finalized); } Ok(()) } /// Return [`QueryBlocks`] to read blocks from the DB. pub fn blocks(&self) -> QueryBlocks<'_> { QueryBlocks { db: &self.db, avalanche: &self.avalanche, mempool: &self.mempool, } } /// Return [`QueryTxs`] to return txs from mempool/DB. pub fn txs(&self) -> QueryTxs<'_> { QueryTxs { db: &self.db, avalanche: &self.avalanche, mempool: &self.mempool, } } /// Return [`QueryGroupHistory`] for scripts to query the tx history of /// scripts. pub fn script_history(&self) -> Result> { Ok(QueryGroupHistory { db: &self.db, avalanche: &self.avalanche, mempool: &self.mempool, mempool_history: self.mempool.script_history(), group: self.script_group.clone(), }) } /// Return [`QueryGroupUtxos`] for scripts to query the utxos of scripts. pub fn script_utxos(&self) -> Result> { Ok(QueryGroupUtxos { db: &self.db, avalanche: &self.avalanche, mempool: &self.mempool, mempool_utxos: self.mempool.script_utxos(), group: self.script_group.clone(), }) } /// Subscribers, behind read/write lock pub fn subs(&self) -> &RwLock { &self.subs } /// Build the ChronikBlock from the CBlockIndex pub fn make_chronik_block( &self, block: &ffi::CBlock, bindex: &ffi::CBlockIndex, ) -> Result { let block = ffi::bridge_block(block, bindex)?; let db_block = DbBlock { hash: BlockHash::from(block.hash), prev_hash: BlockHash::from(block.prev_hash), height: block.height, n_bits: block.n_bits, timestamp: block.timestamp, file_num: block.file_num, data_pos: block.data_pos, }; let block_txs = BlockTxs { block_height: block.height, txs: block .txs .iter() .map(|tx| { let txid = TxId::from(tx.tx.txid); TxEntry { txid, data_pos: tx.data_pos, undo_pos: tx.undo_pos, time_first_seen: match self.mempool.tx(&txid) { Some(tx) => tx.time_first_seen, None => 0, }, is_coinbase: tx.undo_pos == 0, } }) .collect(), }; let txs = block .txs .into_iter() .map(|block_tx| Tx::from(block_tx.tx)) .collect::>(); Ok(ChronikBlock { db_block, block_txs, size: block.size, txs, }) } } fn verify_schema_version(db: &Db) -> Result<()> { let metadata_reader = MetadataReader::new(db)?; let metadata_writer = MetadataWriter::new(db)?; let is_empty = db.is_db_empty()?; match metadata_reader .schema_version() .wrap_err(CorruptedSchemaVersion)? { Some(schema_version) => { assert!(!is_empty, "Empty DB can't have a schema version"); if schema_version > CURRENT_INDEXER_VERSION { return Err(ChronikOutdated(schema_version).into()); } if schema_version < CURRENT_INDEXER_VERSION { return Err(DatabaseOutdated(schema_version).into()); } } None => { if !is_empty { return Err(MissingSchemaVersion.into()); } let mut batch = WriteBatch::default(); metadata_writer .update_schema_version(&mut batch, CURRENT_INDEXER_VERSION)?; db.write_batch(batch)?; } } log!("Chronik has version {CURRENT_INDEXER_VERSION}\n"); Ok(()) } impl std::fmt::Debug for ChronikIndexerParams { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ChronikIndexerParams") .field("datadir_net", &self.datadir_net) .field("wipe_db", &self.wipe_db) .field("fn_compress_script", &"..") .finish() } } #[cfg(test)] mod tests { use abc_rust_error::Result; use bitcoinsuite_core::block::BlockHash; use chronik_db::{ db::{Db, WriteBatch, CF_META}, groups::prefix_mock_compress, io::{BlockReader, BlockTxs, DbBlock, MetadataReader, MetadataWriter}, }; use pretty_assertions::assert_eq; use crate::indexer::{ ChronikBlock, ChronikIndexer, ChronikIndexerError, ChronikIndexerParams, CURRENT_INDEXER_VERSION, }; #[test] fn test_indexer() -> Result<()> { let tempdir = tempdir::TempDir::new("chronik-indexer--indexer")?; let datadir_net = tempdir.path().join("regtest"); let params = ChronikIndexerParams { datadir_net: datadir_net.clone(), wipe_db: false, fn_compress_script: prefix_mock_compress, }; // regtest folder doesn't exist yet -> error assert_eq!( ChronikIndexer::setup(params.clone()) .unwrap_err() .downcast::()?, ChronikIndexerError::CreateIndexesDirFailed( datadir_net.join("indexes"), ), ); // create regtest folder, setup will work now std::fs::create_dir(&datadir_net)?; let mut indexer = ChronikIndexer::setup(params.clone())?; // indexes and indexes/chronik folder now exist assert!(datadir_net.join("indexes").exists()); assert!(datadir_net.join("indexes").join("chronik").exists()); // DB is empty assert_eq!(BlockReader::new(&indexer.db)?.by_height(0)?, None); let block = ChronikBlock { db_block: DbBlock { hash: BlockHash::from([4; 32]), prev_hash: BlockHash::from([0; 32]), height: 0, n_bits: 0x1deadbef, timestamp: 1234567890, file_num: 0, data_pos: 1337, }, block_txs: BlockTxs { block_height: 0, txs: vec![], }, size: 285, txs: vec![], }; // Add block indexer.handle_block_connected(block.clone())?; assert_eq!( BlockReader::new(&indexer.db)?.by_height(0)?, Some(block.db_block.clone()) ); // Remove block again indexer.handle_block_disconnected(block.clone())?; assert_eq!(BlockReader::new(&indexer.db)?.by_height(0)?, None); // Add block then wipe, block not there indexer.handle_block_connected(block)?; std::mem::drop(indexer); let indexer = ChronikIndexer::setup(ChronikIndexerParams { wipe_db: true, ..params })?; assert_eq!(BlockReader::new(&indexer.db)?.by_height(0)?, None); Ok(()) } #[test] fn test_schema_version() -> Result<()> { let dir = tempdir::TempDir::new("chronik-indexer--schema_version")?; let chronik_path = dir.path().join("indexes").join("chronik"); let params = ChronikIndexerParams { datadir_net: dir.path().to_path_buf(), wipe_db: false, fn_compress_script: prefix_mock_compress, }; // Setting up DB first time sets the schema version ChronikIndexer::setup(params.clone())?; { let db = Db::open(&chronik_path)?; assert_eq!( MetadataReader::new(&db)?.schema_version()?, Some(CURRENT_INDEXER_VERSION) ); } // Opening DB again works fine ChronikIndexer::setup(params.clone())?; // Override DB schema version to 0 { let db = Db::open(&chronik_path)?; let mut batch = WriteBatch::default(); MetadataWriter::new(&db)?.update_schema_version(&mut batch, 0)?; db.write_batch(batch)?; } // -> DB too old assert_eq!( ChronikIndexer::setup(params.clone()) .unwrap_err() .downcast::()?, ChronikIndexerError::DatabaseOutdated(0), ); // Override DB schema version to CURRENT_INDEXER_VERSION + 1 { let db = Db::open(&chronik_path)?; let mut batch = WriteBatch::default(); MetadataWriter::new(&db)?.update_schema_version( &mut batch, CURRENT_INDEXER_VERSION + 1, )?; db.write_batch(batch)?; } // -> Chronik too old assert_eq!( ChronikIndexer::setup(params.clone()) .unwrap_err() .downcast::()?, ChronikIndexerError::ChronikOutdated(CURRENT_INDEXER_VERSION + 1), ); // Corrupt schema version { let db = Db::open(&chronik_path)?; let cf_meta = db.cf(CF_META)?; let mut batch = WriteBatch::default(); batch.put_cf(cf_meta, b"SCHEMA_VERSION", [0xff]); db.write_batch(batch)?; } assert_eq!( ChronikIndexer::setup(params.clone()) .unwrap_err() .downcast::()?, ChronikIndexerError::CorruptedSchemaVersion, ); // New db path, but has existing data let new_dir = dir.path().join("new"); let new_chronik_path = new_dir.join("indexes").join("chronik"); std::fs::create_dir_all(&new_chronik_path)?; let new_params = ChronikIndexerParams { datadir_net: new_dir, wipe_db: false, ..params }; { // new db with obscure field in meta let db = Db::open(&new_chronik_path)?; let mut batch = WriteBatch::default(); batch.put_cf(db.cf(CF_META)?, b"FOO", b"BAR"); db.write_batch(batch)?; } // Error: non-empty DB without schema version assert_eq!( ChronikIndexer::setup(new_params.clone()) .unwrap_err() .downcast::()?, ChronikIndexerError::MissingSchemaVersion, ); // with wipe it works ChronikIndexer::setup(ChronikIndexerParams { wipe_db: true, ..new_params })?; Ok(()) } } diff --git a/chronik/chronik-lib/src/bridge.rs b/chronik/chronik-lib/src/bridge.rs index a5dc68289..08245a105 100644 --- a/chronik/chronik-lib/src/bridge.rs +++ b/chronik/chronik-lib/src/bridge.rs @@ -1,249 +1,253 @@ // Copyright (c) 2022 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. //! Rust side of the bridge; these structs and functions are exposed to C++. use std::{ net::{AddrParseError, IpAddr, SocketAddr}, sync::Arc, }; use abc_rust_error::Result; use bitcoinsuite_core::{ script::Script, tx::{Tx, TxId}, }; use chronik_bridge::{ffi::init_error, util::expect_unique_ptr}; use chronik_db::mem::MempoolTx; use chronik_http::server::{ChronikServer, ChronikServerParams}; use chronik_indexer::indexer::{ChronikIndexer, ChronikIndexerParams}; use chronik_util::{log, log_chronik}; use thiserror::Error; use tokio::sync::RwLock; use crate::{ error::ok_or_abort_node, ffi::{self, StartChronikValidationInterface}, }; /// Errors for [`Chronik`] and [`setup_chronik`]. #[derive(Debug, Eq, Error, PartialEq)] pub enum ChronikError { /// Chronik host address failed to parse #[error("Invalid Chronik host address {0:?}: {1}")] InvalidChronikHost(String, AddrParseError), } use self::ChronikError::*; /// Setup the Chronik bridge. Returns a ChronikIndexer object. pub fn setup_chronik( params: ffi::SetupParams, config: &ffi::Config, node: &ffi::NodeContext, ) -> bool { match try_setup_chronik(params, config, node) { Ok(()) => true, Err(report) => { log_chronik!("{report:?}\n"); init_error(&report.to_string()) } } } fn try_setup_chronik( params: ffi::SetupParams, config: &ffi::Config, node: &ffi::NodeContext, ) -> Result<()> { abc_rust_error::install(); let hosts = params .hosts .into_iter() .map(|host| parse_socket_addr(host, params.default_port)) .collect::>>()?; log!("Starting Chronik bound to {:?}\n", hosts); let bridge = chronik_bridge::ffi::make_bridge(config, node); let bridge_ref = expect_unique_ptr("make_bridge", &bridge); let mut indexer = ChronikIndexer::setup(ChronikIndexerParams { datadir_net: params.datadir_net.into(), wipe_db: params.wipe_db, fn_compress_script: compress_script, })?; indexer.resync_indexer(bridge_ref)?; + if chronik_bridge::ffi::shutdown_requested() { + // Don't setup Chronik if the user requested shutdown during resync + return Ok(()); + } let indexer = Arc::new(RwLock::new(indexer)); let runtime = tokio::runtime::Builder::new_multi_thread() .enable_all() .build()?; let server = runtime.block_on({ let indexer = Arc::clone(&indexer); async move { // try_bind requires a Runtime ChronikServer::setup(ChronikServerParams { hosts, indexer }) } })?; runtime.spawn(async move { ok_or_abort_node("ChronikServer::serve", server.serve().await); }); let chronik = Box::new(Chronik { bridge: Arc::new(bridge), indexer, _runtime: runtime, }); StartChronikValidationInterface(node, chronik); Ok(()) } fn parse_socket_addr(host: String, default_port: u16) -> Result { if let Ok(addr) = host.parse::() { return Ok(addr); } let ip_addr = host .parse::() .map_err(|err| InvalidChronikHost(host, err))?; Ok(SocketAddr::new(ip_addr, default_port)) } fn compress_script(script: &Script) -> Vec { chronik_bridge::ffi::compress_script(script.as_ref()) } /// Contains all db, runtime, tpc, etc. handles needed by Chronik. /// This makes it so when this struct is dropped, all handles are relased /// cleanly. pub struct Chronik { bridge: Arc>, indexer: Arc>, // Having this here ensures HTTP server, outstanding requests etc. will get // stopped when `Chronik` is dropped. _runtime: tokio::runtime::Runtime, } impl Chronik { /// Tx added to the bitcoind mempool pub fn handle_tx_added_to_mempool( &self, ptx: &ffi::CTransaction, time_first_seen: i64, ) { ok_or_abort_node( "handle_tx_added_to_mempool", self.add_tx_to_mempool(ptx, time_first_seen), ); } /// Tx removed from the bitcoind mempool pub fn handle_tx_removed_from_mempool(&self, txid: [u8; 32]) { let mut indexer = self.indexer.blocking_write(); let txid = TxId::from(txid); ok_or_abort_node( "handle_tx_removed_from_mempool", indexer.handle_tx_removed_from_mempool(txid), ); log_chronik!("Chronik: transaction {} removed from mempool\n", txid); } /// Block connected to the longest chain pub fn handle_block_connected( &self, block: &ffi::CBlock, bindex: &ffi::CBlockIndex, ) { ok_or_abort_node( "handle_block_connected", self.connect_block(block, bindex), ); } /// Block disconnected from the longest chain pub fn handle_block_disconnected( &self, block: &ffi::CBlock, bindex: &ffi::CBlockIndex, ) { ok_or_abort_node( "handle_block_disconnected", self.disconnect_block(block, bindex), ); } /// Block finalized with Avalanche pub fn handle_block_finalized(&self, bindex: &ffi::CBlockIndex) { ok_or_abort_node("handle_block_finalized", self.finalize_block(bindex)); } fn add_tx_to_mempool( &self, ptx: &ffi::CTransaction, time_first_seen: i64, ) -> Result<()> { let mut indexer = self.indexer.blocking_write(); let tx = self.bridge.bridge_tx(ptx)?; let txid = TxId::from(tx.txid); indexer.handle_tx_added_to_mempool(MempoolTx { tx: Tx::from(tx), time_first_seen, })?; log_chronik!("Chronik: transaction {} added to mempool\n", txid); Ok(()) } fn connect_block( &self, block: &ffi::CBlock, bindex: &ffi::CBlockIndex, ) -> Result<()> { let mut indexer = self.indexer.blocking_write(); let block = indexer.make_chronik_block(block, bindex)?; let block_hash = block.db_block.hash.clone(); let num_txs = block.block_txs.txs.len(); indexer.handle_block_connected(block)?; log_chronik!( "Chronik: block {} connected with {} txs\n", block_hash, num_txs, ); Ok(()) } fn disconnect_block( &self, block: &ffi::CBlock, bindex: &ffi::CBlockIndex, ) -> Result<()> { let mut indexer = self.indexer.blocking_write(); let block = indexer.make_chronik_block(block, bindex)?; let block_hash = block.db_block.hash.clone(); let num_txs = block.block_txs.txs.len(); indexer.handle_block_disconnected(block)?; log_chronik!( "Chronik: block {} disconnected with {} txs\n", block_hash, num_txs, ); Ok(()) } fn finalize_block(&self, bindex: &ffi::CBlockIndex) -> Result<()> { let block = self.bridge.load_block(bindex)?; let block_ref = expect_unique_ptr("load_block", &block); let mut indexer = self.indexer.blocking_write(); let block = indexer.make_chronik_block(block_ref, bindex)?; let block_hash = block.db_block.hash.clone(); let num_txs = block.block_txs.txs.len(); indexer.handle_block_finalized(block)?; log_chronik!( "Chronik: block {} finalized with {} txs\n", block_hash, num_txs, ); Ok(()) } } impl std::fmt::Debug for Chronik { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "Chronik {{ .. }}") } }