diff --git a/chronik/chronik-bridge/src/ffi.rs b/chronik/chronik-bridge/src/ffi.rs index 6ca4f916f..356b6f0f4 100644 --- a/chronik/chronik-bridge/src/ffi.rs +++ b/chronik/chronik-bridge/src/ffi.rs @@ -1,339 +1,339 @@ // Copyright (c) 2022 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. //! Module containing the cxx definitions for the bridge from C++ to Rust. pub use self::ffi_inner::*; #[allow(unsafe_code)] #[cxx::bridge(namespace = "chronik_bridge")] mod ffi_inner { /// Info about a block #[derive(Clone, Debug, Default, Eq, PartialEq)] pub struct BlockInfo { /// Hash of the block (or 000...000 if no block) pub hash: [u8; 32], /// Height of the block (or -1 if no block) pub height: i32, } /// Block coming from bitcoind to Chronik. /// /// We don't index all fields (e.g. hashMerkleRoot), only those that are /// needed when querying a range of blocks. /// /// Instead of storing all the block data for Chronik again, we only store /// file_num, data_pos and undo_pos of the block data of the node. /// /// This makes the index relatively small, as it's mostly just pointing to /// the data the node already stores. /// /// Note that this prohibits us from using Chronik in pruned mode. #[derive(Clone, Debug, Default, Eq, PartialEq)] pub struct Block { /// Block hash pub hash: [u8; 32], /// hashPrevBlock, hash of the previous block in the chain pub prev_hash: [u8; 32], /// nBits, difficulty of the header pub n_bits: u32, /// Timestamp of the block pub timestamp: i64, /// Height of the block in the chain. pub height: i32, /// File number of the block file this block is stored in. /// This can be used to later slice out transactions, so we don't have /// to index txs twice. pub file_num: u32, /// Position of the block within the block file, starting at the block /// header. pub data_pos: u32, /// Position of the undo data within the undo file. 
pub undo_pos: u32, /// Serialized size of the block pub size: u64, /// Txs of this block, including positions within the block/undo files. pub txs: Vec, } /// Tx in a block #[derive(Clone, Debug, Default, Eq, PartialEq)] pub struct BlockTx { /// Tx (without disk data) pub tx: Tx, /// Where the tx is stored within the block file. pub data_pos: u32, /// Where the tx's undo data is stored within the block's undo file. pub undo_pos: u32, } /// CTransaction, in a block or in the mempool. #[derive(Clone, Debug, Default, Eq, PartialEq)] pub struct Tx { /// TxId of the tx. pub txid: [u8; 32], /// nVersion of the tx. pub version: i32, /// Tx inputs. pub inputs: Vec, /// Tx outputs. pub outputs: Vec, /// Locktime of the tx. pub locktime: u32, } /// COutPoint, pointing to a coin being spent. #[derive(Clone, Debug, Default, Eq, PartialEq)] pub struct OutPoint { /// TxId of the output of the coin. pub txid: [u8; 32], /// Index in the outputs of the tx of the coin. pub out_idx: u32, } /// CTxIn, spending an unspent output. #[derive(Clone, Debug, Default, Eq, PartialEq)] pub struct TxInput { /// Points to an output being spent. pub prev_out: OutPoint, /// scriptSig unlocking the output. pub script: Vec, /// nSequence. pub sequence: u32, /// Coin being spent by this tx. pub coin: Coin, } /// CTxOut, creating a new output. #[derive(Clone, Debug, Default, Eq, PartialEq)] pub struct TxOutput { /// Value of the output. pub value: i64, /// Script locking the output. pub script: Vec, } /// Coin, can be spent by providing a valid unlocking script. #[derive(Clone, Debug, Default, Eq, PartialEq)] pub struct Coin { /// Output, locking the coins. pub output: TxOutput, /// Height of the coin in the chain. pub height: i32, /// Whether the coin is a coinbase. 
pub is_coinbase: bool, } #[allow(missing_debug_implementations)] unsafe extern "C++" { include!("blockindex.h"); include!("chronik-cpp/chronik_bridge.h"); include!("coins.h"); include!("node/context.h"); include!("primitives/block.h"); include!("primitives/transaction.h"); include!("undo.h"); /// node::NodeContext from node/context.h #[namespace = "node"] type NodeContext; /// ::CBlockIndex from blockindex.h #[namespace = ""] type CBlockIndex; /// ::CBlock from primitives/block.h #[namespace = ""] type CBlock; /// ::CBlockUndo from undo.h #[namespace = ""] type CBlockUndo; /// ::Coin from coins.h (renamed to CCoin to prevent a name clash) #[namespace = ""] #[cxx_name = "Coin"] type CCoin; /// ::Config from config.h #[namespace = ""] type Config; /// ::CTransaction from primitives/transaction.h #[namespace = ""] type CTransaction; /// Bridge to bitcoind to access the node type ChronikBridge; /// Print the message to bitcoind's logs. fn log_print( logging_function: &str, source_file: &str, source_line: u32, msg: &str, ); /// Print the message to bitcoind's logs under the BCLog::Chronik /// category. fn log_print_chronik( logging_function: &str, source_file: &str, source_line: u32, msg: &str, ); /// Make the bridge given the NodeContext fn make_bridge( config: &Config, node: &NodeContext, ) -> UniquePtr; /// Return the tip of the chain of the node. /// Returns hash=000...000, height=-1 if there's no block on the chain. fn get_chain_tip(self: &ChronikBridge) -> Result<&CBlockIndex>; /// Lookup the block index with the given hash, or throw an error /// if it couldn't be found. 
fn lookup_block_index( self: &ChronikBridge, hash: [u8; 32], ) -> Result<&CBlockIndex>; /// Load the CBlock data of this CBlockIndex from the disk fn load_block( self: &ChronikBridge, block_index: &CBlockIndex, ) -> Result>; /// Load the CBlockUndo data of this CBlockIndex from the disk undo data fn load_block_undo( self: &ChronikBridge, block_index: &CBlockIndex, ) -> Result>; /// Load the CTransaction and CTxUndo data from disk and turn it into a /// bridged Tx, containing spent coins etc. fn load_tx( self: &ChronikBridge, file_num: u32, data_pos: u32, undo_pos: u32, ) -> Result; /// Load the CTransaction from disk and serialize it. fn load_raw_tx( self: &ChronikBridge, file_num: u32, data_pos: u32, ) -> Result>; /// Find at which block the given block_index forks off from the node. fn find_fork( self: &ChronikBridge, block_index: &CBlockIndex, ) -> Result<&CBlockIndex>; /// Lookup the spent coins of a tx and fill them in in-place. /// - `not_found` will be the outpoints that couldn't be found in the /// node or the DB. /// - `coins_to_uncache` will be the outpoints that need to be uncached /// if the tx doesn't end up being broadcast. This is so that clients /// can't fill our cache with useless old coins. It mirrors the /// behavior of `MemPoolAccept::PreChecks`, which uncaches the queried /// coins if they don't end up being spent. fn lookup_spent_coins( self: &ChronikBridge, tx: &mut Tx, not_found: &mut Vec, coins_to_uncache: &mut Vec, ) -> Result<()>; /// Remove the coins from the coin cache. /// This must be done after a call to `lookup_spent_coins` where the tx /// wasn't broadcast, to avoid clients filling our cache with unneeded /// coins. fn uncache_coins( self: &ChronikBridge, coins: &[OutPoint], ) -> Result<()>; /// Add the given tx to the mempool, and if that succeeds, broadcast it /// to all our peers. /// Also check the actual tx fee doesn't exceed max_fee. /// Note max_fee is absolute, not a fee rate (as in sendrawtransaction). 
fn broadcast_tx( self: &ChronikBridge, raw_tx: &[u8], max_fee: i64, ) -> Result<[u8; 32]>; + /// Calls `AbortNode` from shutdown.h to gracefully shut down the node + /// when an unrecoverable error occurred. + fn abort_node(self: &ChronikBridge, msg: &str, user_msg: &str); + + /// Returns true if a shutdown is requested, false otherwise. + /// See `ShutdownRequested` in `shutdown.h`. + fn shutdown_requested(self: &ChronikBridge) -> bool; + /// Bridge CTransaction -> ffi::Tx, using the given spent coins. fn bridge_tx( tx: &CTransaction, spent_coins: &CxxVector, ) -> Result; /// Bridge bitcoind's classes to the shared struct [`Block`]. fn bridge_block( block: &CBlock, block_undo: &CBlockUndo, block_index: &CBlockIndex, ) -> Result; /// Get a BlockInfo for this CBlockIndex. fn get_block_info(block_index: &CBlockIndex) -> BlockInfo; /// CBlockIndex::GetAncestor fn get_block_ancestor( block_index: &CBlockIndex, height: i32, ) -> Result<&CBlockIndex>; /// Compress the given script using `ScriptCompression`. fn compress_script(script: &[u8]) -> Vec; /// Decompress the given script using `ScriptCompression`. fn decompress_script(compressed: &[u8]) -> Result>; /// Calc the fee in satoshis for the given tx size in bytes. fn calc_fee(num_bytes: usize, sats_fee_per_kb: i64) -> i64; /// Default maximum fee rate when broadcasting txs. fn default_max_raw_tx_fee_rate_per_kb() -> i64; /// Calls `SyncWithValidationInterfaceQueue` from validationinterface.h /// to make sure wallet/indexes are synced. fn sync_with_validation_interface_queue(); /// Calls `InitError` from `node/ui_interface.h` to report an error to /// the user and then gracefully shut down the node. fn init_error(msg: &str) -> bool; - - /// Calls `AbortNode` from shutdown.h to gracefully shut down the node - /// when an unrecoverable error occured. - fn abort_node(msg: &str, user_msg: &str); - - /// Returns true if a shutdown is requested, false otherwise. - /// See `ShutdownRequested` in `shutdown.h`. 
- fn shutdown_requested() -> bool; } } /// SAFETY: All fields of ChronikBridge (const Consensus::Params &, const /// node::NodeContext &) can be moved betweed threads safely. #[allow(unsafe_code)] unsafe impl Send for ChronikBridge {} /// SAFETY: All fields of ChronikBridge (const Consensus::Params &, const /// node::NodeContext &) can be accessed from different threads safely. #[allow(unsafe_code)] unsafe impl Sync for ChronikBridge {} impl std::fmt::Debug for ChronikBridge { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ChronikBridge").finish_non_exhaustive() } } diff --git a/chronik/chronik-cpp/chronik_bridge.cpp b/chronik/chronik-cpp/chronik_bridge.cpp index 6fcc8ed5e..09d033b8f 100644 --- a/chronik/chronik-cpp/chronik_bridge.cpp +++ b/chronik/chronik-cpp/chronik_bridge.cpp @@ -1,393 +1,394 @@ // Copyright (c) 2022 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include chronik_bridge::OutPoint BridgeOutPoint(const COutPoint &outpoint) { return { .txid = chronik::util::HashToArray(outpoint.GetTxId()), .out_idx = outpoint.GetN(), }; } chronik_bridge::TxOutput BridgeTxOutput(const CTxOut &output) { return { .value = output.nValue / Amount::satoshi(), .script = chronik::util::ToRustVec(output.scriptPubKey), }; } chronik_bridge::Coin BridgeCoin(const Coin &coin) { const int32_t nHeight = coin.GetHeight() == 0x7fff'ffff ? 
-1 : coin.GetHeight(); return { .output = BridgeTxOutput(coin.GetTxOut()), .height = nHeight, .is_coinbase = coin.IsCoinBase(), }; } rust::Vec BridgeTxInputs(bool isCoinbase, const std::vector &inputs, const std::vector &spent_coins) { rust::Vec bridged_inputs; bridged_inputs.reserve(inputs.size()); for (size_t idx = 0; idx < inputs.size(); ++idx) { const CTxIn &input = inputs[idx]; chronik_bridge::Coin bridge_coin{}; // empty coin if (!isCoinbase) { if (idx >= spent_coins.size()) { throw std::runtime_error("Missing coin for input"); } bridge_coin = BridgeCoin(spent_coins[idx]); } bridged_inputs.push_back({ .prev_out = BridgeOutPoint(input.prevout), .script = chronik::util::ToRustVec(input.scriptSig), .sequence = input.nSequence, .coin = std::move(bridge_coin), }); } return bridged_inputs; } rust::Vec BridgeTxOutputs(const std::vector &outputs) { rust::Vec bridged_outputs; bridged_outputs.reserve(outputs.size()); for (const CTxOut &output : outputs) { bridged_outputs.push_back(BridgeTxOutput(output)); } return bridged_outputs; } chronik_bridge::Tx BridgeTx(bool isCoinbase, const CTransaction &tx, const std::vector &spent_coins) { return { .txid = chronik::util::HashToArray(tx.GetId()), .version = tx.nVersion, .inputs = BridgeTxInputs(isCoinbase, tx.vin, spent_coins), .outputs = BridgeTxOutputs(tx.vout), .locktime = tx.nLockTime, }; } chronik_bridge::BlockTx BridgeBlockTx(bool isCoinbase, const CTransaction &tx, const std::vector &spent_coins, size_t data_pos, size_t undo_pos) { return {.tx = BridgeTx(isCoinbase, tx, spent_coins), .data_pos = uint32_t(data_pos), .undo_pos = uint32_t(isCoinbase ? 0 : undo_pos)}; } size_t GetFirstBlockTxOffset(const CBlock &block, const CBlockIndex &bindex) { return bindex.nDataPos + ::GetSerializeSize(CBlockHeader()) + GetSizeOfCompactSize(block.vtx.size()); } size_t GetFirstUndoOffset(const CBlock &block, const CBlockIndex &bindex) { // We have to -1 here, because coinbase txs don't have undo data. 
return bindex.nUndoPos + GetSizeOfCompactSize(block.vtx.size() - 1); } chronik_bridge::Block BridgeBlock(const CBlock &block, const CBlockUndo &block_undo, const CBlockIndex &bindex) { size_t data_pos = GetFirstBlockTxOffset(block, bindex); size_t undo_pos = 0; // Set undo offset; for the genesis block leave it at 0 if (bindex.nHeight > 0) { undo_pos = GetFirstUndoOffset(block, bindex); } rust::Vec bridged_txs; for (size_t tx_idx = 0; tx_idx < block.vtx.size(); ++tx_idx) { const bool isCoinbase = tx_idx == 0; const CTransaction &tx = *block.vtx[tx_idx]; if (!isCoinbase && tx_idx - 1 >= block_undo.vtxundo.size()) { throw std::runtime_error("Missing undo data for tx"); } const std::vector &spent_coins = isCoinbase ? std::vector() : block_undo.vtxundo[tx_idx - 1].vprevout; bridged_txs.push_back( BridgeBlockTx(isCoinbase, tx, spent_coins, data_pos, undo_pos)); // advance data_pos and undo_pos positions data_pos += ::GetSerializeSize(tx); if (!isCoinbase) { undo_pos += ::GetSerializeSize(block_undo.vtxundo[tx_idx - 1]); } } return {.hash = chronik::util::HashToArray(block.GetHash()), .prev_hash = chronik::util::HashToArray(block.hashPrevBlock), .n_bits = block.nBits, .timestamp = block.GetBlockTime(), .height = bindex.nHeight, .file_num = uint32_t(bindex.nFile), .data_pos = bindex.nDataPos, .undo_pos = bindex.nUndoPos, .size = ::GetSerializeSize(block), .txs = bridged_txs}; } namespace chronik_bridge { void log_print(const rust::Str logging_function, const rust::Str source_file, const uint32_t source_line, const rust::Str msg) { LogInstance().LogPrintStr(std::string(msg), std::string(logging_function), std::string(source_file), source_line); } void log_print_chronik(const rust::Str logging_function, const rust::Str source_file, const uint32_t source_line, const rust::Str msg) { if (LogInstance().WillLogCategory(BCLog::CHRONIK)) { log_print(logging_function, source_file, source_line, msg); } } const CBlockIndex &ChronikBridge::get_chain_tip() const { const CBlockIndex 
*tip = WITH_LOCK(cs_main, return m_node.chainman->ActiveTip()); if (tip == nullptr) { throw block_index_not_found(); } return *tip; } const CBlockIndex & ChronikBridge::lookup_block_index(std::array hash) const { BlockHash block_hash{chronik::util::ArrayToHash(hash)}; const CBlockIndex *pindex = WITH_LOCK( cs_main, return m_node.chainman->m_blockman.LookupBlockIndex(block_hash)); if (!pindex) { throw block_index_not_found(); } return *pindex; } std::unique_ptr ChronikBridge::load_block(const CBlockIndex &bindex) const { CBlock block; if (!node::ReadBlockFromDisk(block, &bindex, m_consensus)) { throw std::runtime_error("Reading block data failed"); } return std::make_unique(std::move(block)); } std::unique_ptr ChronikBridge::load_block_undo(const CBlockIndex &bindex) const { CBlockUndo block_undo; // Read undo data (genesis block doesn't have undo data) if (bindex.nHeight > 0) { if (!node::UndoReadFromDisk(block_undo, &bindex)) { throw std::runtime_error("Reading block undo data failed"); } } return std::make_unique(std::move(block_undo)); } Tx ChronikBridge::load_tx(uint32_t file_num, uint32_t data_pos, uint32_t undo_pos) const { CMutableTransaction tx; CTxUndo txundo{}; const bool isCoinbase = undo_pos == 0; if (!node::ReadTxFromDisk(tx, FlatFilePos(file_num, data_pos))) { throw std::runtime_error("Reading tx data from disk failed"); } if (!isCoinbase) { if (!node::ReadTxUndoFromDisk(txundo, FlatFilePos(file_num, undo_pos))) { throw std::runtime_error("Reading tx undo data from disk failed"); } } return BridgeTx(isCoinbase, CTransaction(std::move(tx)), txundo.vprevout); } rust::Vec ChronikBridge::load_raw_tx(uint32_t file_num, uint32_t data_pos) const { CMutableTransaction tx; if (!node::ReadTxFromDisk(tx, FlatFilePos(file_num, data_pos))) { throw std::runtime_error("Reading tx data from disk failed"); } CDataStream raw_tx{SER_NETWORK, PROTOCOL_VERSION}; raw_tx << tx; return chronik::util::ToRustVec(raw_tx); } Tx bridge_tx(const CTransaction &tx, const 
std::vector<::Coin> &spent_coins) { return BridgeTx(false, tx, spent_coins); } const CBlockIndex &ChronikBridge::find_fork(const CBlockIndex &index) const { const CBlockIndex *fork = WITH_LOCK( cs_main, return m_node.chainman->ActiveChainstate().m_chain.FindFork(&index)); if (!fork) { throw block_index_not_found(); } return *fork; } void ChronikBridge::lookup_spent_coins( Tx &tx, rust::Vec ¬_found, rust::Vec &coins_to_uncache) const { not_found.clear(); coins_to_uncache.clear(); LOCK(cs_main); CCoinsViewCache &coins_cache = m_node.chainman->ActiveChainstate().CoinsTip(); CCoinsViewMemPool coin_view(&coins_cache, *m_node.mempool); for (TxInput &input : tx.inputs) { TxId txid = TxId(chronik::util::ArrayToHash(input.prev_out.txid)); COutPoint outpoint = COutPoint(txid, input.prev_out.out_idx); // Remember if coin was already cached const bool had_cached = coins_cache.HaveCoinInCache(outpoint); ::Coin coin; if (!coin_view.GetCoin(outpoint, coin)) { not_found.push_back(input.prev_out); continue; } if (!had_cached) { // Only add if previously uncached. // We don't check if the prev_out is now cached (which wouldn't be // the case for a mempool UTXO), as uncaching an outpoint is cheap, // so we save one extra cache lookup here. 
coins_to_uncache.push_back(input.prev_out); } input.coin = BridgeCoin(coin); } } void ChronikBridge::uncache_coins( rust::Slice coins_to_uncache) const { LOCK(cs_main); CCoinsViewCache &coins_cache = m_node.chainman->ActiveChainstate().CoinsTip(); for (const OutPoint &outpoint : coins_to_uncache) { TxId txid = TxId(chronik::util::ArrayToHash(outpoint.txid)); coins_cache.Uncache(COutPoint(txid, outpoint.out_idx)); } } std::array ChronikBridge::broadcast_tx(rust::Slice raw_tx, int64_t max_fee) const { std::vector vec = chronik::util::FromRustSlice(raw_tx); CDataStream stream{vec, SER_NETWORK, PROTOCOL_VERSION}; CMutableTransaction tx; stream >> tx; CTransactionRef tx_ref = MakeTransactionRef(tx); std::string err_str; TransactionError error = node::BroadcastTransaction( m_node, tx_ref, err_str, max_fee * Amount::satoshi(), /*relay=*/true, /*wait_callback=*/false); if (error != TransactionError::OK) { bilingual_str txErrorMsg = TransactionErrorString(error); if (err_str.empty()) { throw std::runtime_error(txErrorMsg.original.c_str()); } else { std::string msg = strprintf("%s: %s", txErrorMsg.original, err_str); throw std::runtime_error(msg.c_str()); } } return chronik::util::HashToArray(tx_ref->GetId()); } +void ChronikBridge::abort_node(const rust::Str msg, + const rust::Str user_msg) const { + AbortNode(std::string(msg), Untranslated(std::string(user_msg))); +} + +bool ChronikBridge::shutdown_requested() const { + return ShutdownRequested(); +} + std::unique_ptr make_bridge(const Config &config, const node::NodeContext &node) { return std::make_unique( config.GetChainParams().GetConsensus(), node); } chronik_bridge::Block bridge_block(const CBlock &block, const CBlockUndo &block_undo, const CBlockIndex &bindex) { return BridgeBlock(block, block_undo, bindex); } BlockInfo get_block_info(const CBlockIndex &bindex) { return { .hash = chronik::util::HashToArray(bindex.GetBlockHash()), .height = bindex.nHeight, }; } const CBlockIndex &get_block_ancestor(const CBlockIndex 
&index, int32_t height) { const CBlockIndex *pindex = index.GetAncestor(height); if (!pindex) { throw block_index_not_found(); } return *pindex; } rust::Vec compress_script(rust::Slice bytecode) { std::vector vec = chronik::util::FromRustSlice(bytecode); CScript script{vec.begin(), vec.end()}; CDataStream compressed{SER_NETWORK, PROTOCOL_VERSION}; compressed << Using(script); return chronik::util::ToRustVec(compressed); } rust::Vec decompress_script(rust::Slice compressed) { std::vector vec = chronik::util::FromRustSlice(compressed); CDataStream stream{vec, SER_NETWORK, PROTOCOL_VERSION}; CScript script; stream >> Using(script); return chronik::util::ToRustVec(script); } int64_t calc_fee(size_t num_bytes, int64_t sats_fee_per_kb) { return CFeeRate(sats_fee_per_kb * SATOSHI).GetFee(num_bytes) / SATOSHI; } int64_t default_max_raw_tx_fee_rate_per_kb() { return node::DEFAULT_MAX_RAW_TX_FEE_RATE.GetFeePerK() / SATOSHI; } void sync_with_validation_interface_queue() { SyncWithValidationInterfaceQueue(); } bool init_error(const rust::Str msg) { return InitError(Untranslated(std::string(msg))); } -void abort_node(const rust::Str msg, const rust::Str user_msg) { - AbortNode(std::string(msg), Untranslated(std::string(user_msg))); -} - -bool shutdown_requested() { - return ShutdownRequested(); -} - } // namespace chronik_bridge diff --git a/chronik/chronik-cpp/chronik_bridge.h b/chronik/chronik-cpp/chronik_bridge.h index 2a2c2f626..bc8d29788 100644 --- a/chronik/chronik-cpp/chronik_bridge.h +++ b/chronik/chronik-cpp/chronik_bridge.h @@ -1,114 +1,114 @@ // Copyright (c) 2022 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. 
#ifndef BITCOIN_CHRONIK_CPP_CHRONIK_BRIDGE_H #define BITCOIN_CHRONIK_CPP_CHRONIK_BRIDGE_H #include #include #include class CBlock; class CBlockIndex; class CBlockUndo; class Coin; class Config; class CTransaction; namespace Consensus { struct Params; } // namespace Consensus namespace node { struct NodeContext; } // namespace node class uint256; namespace chronik_bridge { struct BlockInfo; struct Block; struct Tx; struct OutPoint; class block_index_not_found : public std::exception { public: const char *what() const noexcept override { return "CBlockIndex not found"; } }; void log_print(const rust::Str logging_function, const rust::Str source_file, const uint32_t source_line, const rust::Str msg); void log_print_chronik(const rust::Str logging_function, const rust::Str source_file, const uint32_t source_line, const rust::Str msg); /** * Bridge to bitcoind to access the node. */ class ChronikBridge { const Consensus::Params &m_consensus; const node::NodeContext &m_node; public: ChronikBridge(const Consensus::Params &consensus, const node::NodeContext &node) : m_consensus(consensus), m_node(node) {} const CBlockIndex &get_chain_tip() const; const CBlockIndex &lookup_block_index(std::array hash) const; std::unique_ptr load_block(const CBlockIndex &bindex) const; std::unique_ptr load_block_undo(const CBlockIndex &bindex) const; Tx load_tx(uint32_t file_num, uint32_t data_pos, uint32_t undo_pos) const; rust::Vec load_raw_tx(uint32_t file_num, uint32_t data_pos) const; const CBlockIndex &find_fork(const CBlockIndex &index) const; void lookup_spent_coins(Tx &, rust::Vec ¬_found, rust::Vec &coins_to_uncache) const; void uncache_coins(rust::Slice) const; std::array broadcast_tx(rust::Slice raw_tx, int64_t max_fee) const; + + void abort_node(const rust::Str msg, const rust::Str user_msg) const; + + bool shutdown_requested() const; }; std::unique_ptr make_bridge(const Config &config, const node::NodeContext &node); Tx bridge_tx(const CTransaction &tx, const std::vector 
&spent_coins); Block bridge_block(const CBlock &block, const CBlockUndo &block_undo, const CBlockIndex &bindex); BlockInfo get_block_info(const CBlockIndex &index); const CBlockIndex &get_block_ancestor(const CBlockIndex &index, int32_t height); rust::Vec compress_script(rust::Slice script); rust::Vec decompress_script(rust::Slice compressed); int64_t calc_fee(size_t num_bytes, int64_t sats_fee_per_kb); int64_t default_max_raw_tx_fee_rate_per_kb(); void sync_with_validation_interface_queue(); bool init_error(const rust::Str msg); -void abort_node(const rust::Str msg, const rust::Str user_msg); - -bool shutdown_requested(); - } // namespace chronik_bridge #endif // BITCOIN_CHRONIK_CPP_CHRONIK_BRIDGE_H diff --git a/chronik/chronik-indexer/src/indexer.rs b/chronik/chronik-indexer/src/indexer.rs index d95e4c4cf..ccf3bdd0a 100644 --- a/chronik/chronik-indexer/src/indexer.rs +++ b/chronik/chronik-indexer/src/indexer.rs @@ -1,937 +1,950 @@ // Copyright (c) 2022 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. //! Module containing [`ChronikIndexer`] to index blocks and txs. 
use std::{io::Write, path::PathBuf}; use abc_rust_error::{Result, WrapErr}; use bitcoinsuite_core::{ block::BlockHash, tx::{Tx, TxId}, }; use chronik_bridge::{ffi, util::expect_unique_ptr}; use chronik_db::{ db::{Db, WriteBatch}, groups::{ ScriptGroup, ScriptHistoryWriter, ScriptUtxoWriter, TokenIdGroup, TokenIdGroupAux, TokenIdHistoryWriter, TokenIdUtxoWriter, }, index_tx::prepare_indexed_txs, io::{ merge, token::TokenWriter, BlockHeight, BlockReader, BlockStatsWriter, BlockTxs, BlockWriter, DbBlock, GroupHistoryMemData, GroupUtxoMemData, MetadataReader, MetadataWriter, SchemaVersion, SpentByWriter, TxEntry, TxReader, TxWriter, }, mem::{MemData, MemDataConf, Mempool, MempoolTx}, }; use chronik_util::{log, log_chronik}; use thiserror::Error; use tokio::sync::RwLock; use crate::{ avalanche::Avalanche, indexer::ChronikIndexerError::*, query::{ QueryBlocks, QueryBroadcast, QueryGroupHistory, QueryGroupUtxos, QueryTxs, UtxoProtobufOutput, UtxoProtobufValue, }, subs::{BlockMsg, BlockMsgType, Subs}, subs_group::TxMsgType, }; const CURRENT_INDEXER_VERSION: SchemaVersion = 10; /// Params for setting up a [`ChronikIndexer`] instance. #[derive(Clone)] pub struct ChronikIndexerParams { /// Folder where the node stores its data, net-dependent. pub datadir_net: PathBuf, /// Whether to clear the DB before opening the DB, e.g. when reindexing. pub wipe_db: bool, /// Whether Chronik should index SLP/ALP token txs. pub enable_token_index: bool, /// Whether to output Chronik performance statistics into a perf/ folder pub enable_perf_stats: bool, } /// Struct for indexing blocks and txs. Maintains db handles and mempool. #[derive(Debug)] pub struct ChronikIndexer { db: Db, mem_data: MemData, mempool: Mempool, script_group: ScriptGroup, avalanche: Avalanche, subs: RwLock, perf_path: Option, is_token_index_enabled: bool, } /// Access to the bitcoind node. #[derive(Debug)] pub struct Node { /// FFI bridge to the node. pub bridge: cxx::UniquePtr, } /// Block to be indexed by Chronik. 
#[derive(Clone, Debug, Default, Eq, PartialEq)] pub struct ChronikBlock { /// Data about the block (w/o txs) pub db_block: DbBlock, /// Txs in the block, with locations of where they are stored on disk. pub block_txs: BlockTxs, /// Block size in bytes. pub size: u64, /// Txs in the block, with inputs/outputs so we can group them. pub txs: Vec, } /// Errors for [`BlockWriter`] and [`BlockReader`]. #[derive(Debug, Eq, Error, PartialEq)] pub enum ChronikIndexerError { /// Failed creating the folder for the indexes #[error("Failed creating path {0}")] CreateDirFailed(PathBuf), /// Cannot rewind blocks that bitcoind doesn't have #[error( "Cannot rewind Chronik, it contains block {0} that the node doesn't \ have. You may need to use -reindex/-chronikreindex, or delete \ indexes/chronik and restart" )] CannotRewindChronik(BlockHash), /// Lower block doesn't exist but higher block does #[error( "Inconsistent DB: Block {missing} doesn't exist, but {exists} does" )] BlocksBelowMissing { /// Lower height that is missing missing: BlockHeight, /// Higher height that exists exists: BlockHeight, }, /// Corrupted schema version #[error( "Corrupted schema version in the Chronik database, consider running \ -reindex/-chronikreindex" )] CorruptedSchemaVersion, /// Missing schema version for non-empty database #[error( "Missing schema version in non-empty Chronik database, consider \ running -reindex/-chronikreindex" )] MissingSchemaVersion, /// This Chronik instance is outdated #[error( "Chronik outdated: Chronik has version {}, but the database has \ version {0}. Upgrade your node to the appropriate version.", CURRENT_INDEXER_VERSION )] ChronikOutdated(SchemaVersion), /// Database is outdated #[error( "DB outdated: Chronik has version {}, but the database has version \ {0}. 
-reindex/-chronikreindex to reindex the database to the new \ version.", CURRENT_INDEXER_VERSION )] DatabaseOutdated(SchemaVersion), /// Cannot enable token index on a DB that previously had it disabled #[error( "Cannot enable -chroniktokenindex on a DB that previously had it \ disabled. Provide -reindex/-chronikreindex to reindex the database \ with token data, or specify -chroniktokenindex=0 to disable the \ token index again." )] CannotEnableTokenIndex, } impl ChronikIndexer { /// Setup the indexer with the given parameters, e.g. open the DB etc. pub fn setup(params: ChronikIndexerParams) -> Result { let indexes_path = params.datadir_net.join("indexes"); let perf_path = params.datadir_net.join("perf"); if !indexes_path.exists() { std::fs::create_dir(&indexes_path) .wrap_err_with(|| CreateDirFailed(indexes_path.clone()))?; } if params.enable_perf_stats && !perf_path.exists() { std::fs::create_dir(&perf_path) .wrap_err_with(|| CreateDirFailed(perf_path.clone()))?; } let db_path = indexes_path.join("chronik"); if params.wipe_db { log!("Wiping Chronik at {}\n", db_path.to_string_lossy()); Db::destroy(&db_path)?; } log_chronik!("Opening Chronik at {}\n", db_path.to_string_lossy()); let db = Db::open(&db_path)?; verify_schema_version(&db)?; verify_enable_token_index(&db, params.enable_token_index)?; let mempool = Mempool::new(ScriptGroup, params.enable_token_index); Ok(ChronikIndexer { db, mempool, mem_data: MemData::new(MemDataConf {}), script_group: ScriptGroup, avalanche: Avalanche::default(), subs: RwLock::new(Subs::new(ScriptGroup)), perf_path: params.enable_perf_stats.then_some(perf_path), is_token_index_enabled: params.enable_token_index, }) } /// Resync Chronik index to the node pub fn resync_indexer( &mut self, bridge: &ffi::ChronikBridge, ) -> Result<()> { let block_reader = BlockReader::new(&self.db)?; let indexer_tip = block_reader.tip()?; let Ok(node_tip_index) = bridge.get_chain_tip() else { if let Some(indexer_tip) = &indexer_tip { return Err( 
CannotRewindChronik(indexer_tip.hash.clone()).into() ); } return Ok(()); }; let node_tip_info = ffi::get_block_info(node_tip_index); let node_height = node_tip_info.height; let node_tip_hash = BlockHash::from(node_tip_info.hash); let start_height = match indexer_tip { Some(tip) if tip.hash != node_tip_hash => { let indexer_tip_hash = tip.hash.clone(); let indexer_height = tip.height; log!( "Node and Chronik diverged, node is on block \ {node_tip_hash} at height {node_height}, and Chronik is \ on block {indexer_tip_hash} at height {indexer_height}.\n" ); let indexer_tip_index = bridge .lookup_block_index(tip.hash.to_bytes()) .map_err(|_| CannotRewindChronik(tip.hash.clone()))?; self.rewind_indexer(bridge, indexer_tip_index, &tip)? } Some(tip) => tip.height, None => { log!( "Chronik database empty, syncing to block {node_tip_hash} \ at height {node_height}.\n" ); -1 } }; let tip_height = node_tip_info.height; for height in start_height + 1..=tip_height { - if ffi::shutdown_requested() { + if bridge.shutdown_requested() { log!("Stopped re-sync adding blocks\n"); return Ok(()); } let block_index = ffi::get_block_ancestor(node_tip_index, height)?; let block = self.load_chronik_block(bridge, block_index)?; let hash = block.db_block.hash.clone(); self.handle_block_connected(block)?; log_chronik!( "Added block {hash}, height {height}/{tip_height} to Chronik\n" ); if height % 100 == 0 { log!( "Synced Chronik up to block {hash} at height \ {height}/{tip_height}\n" ); } } log!( "Chronik completed re-syncing with the node, both are now at \ block {node_tip_hash} at height {node_height}.\n" ); if let Some(perf_path) = &self.perf_path { let mut resync_stats = std::fs::File::create(perf_path.join("resync_stats.txt"))?; write!(&mut resync_stats, "{:#.3?}", self.mem_data.stats())?; } Ok(()) } fn rewind_indexer( &mut self, bridge: &ffi::ChronikBridge, indexer_tip_index: &ffi::CBlockIndex, indexer_db_tip: &DbBlock, ) -> Result { let indexer_height = indexer_db_tip.height; let 
fork_block_index = bridge .find_fork(indexer_tip_index) .map_err(|_| CannotRewindChronik(indexer_db_tip.hash.clone()))?; let fork_info = ffi::get_block_info(fork_block_index); let fork_block_hash = BlockHash::from(fork_info.hash); let fork_height = fork_info.height; let revert_height = fork_height + 1; log!( "The last common block is {fork_block_hash} at height \ {fork_height}.\n" ); log!("Reverting Chronik blocks {revert_height} to {indexer_height}.\n"); for height in (revert_height..indexer_height).rev() { - if ffi::shutdown_requested() { + if bridge.shutdown_requested() { log!("Stopped re-sync rewinding blocks\n"); // return MAX here so we don't add any blocks return Ok(BlockHeight::MAX); } let db_block = BlockReader::new(&self.db)? .by_height(height)? .ok_or(BlocksBelowMissing { missing: height, exists: indexer_height, })?; let block_index = bridge .lookup_block_index(db_block.hash.to_bytes()) .map_err(|_| CannotRewindChronik(db_block.hash))?; let block = self.load_chronik_block(bridge, block_index)?; self.handle_block_disconnected(block)?; } Ok(fork_info.height) } /// Add transaction to the indexer's mempool. pub fn handle_tx_added_to_mempool( &mut self, mempool_tx: MempoolTx, ) -> Result<()> { let result = self.mempool.insert(&self.db, mempool_tx)?; self.subs.get_mut().handle_tx_event( &result.mempool_tx.tx, TxMsgType::AddedToMempool, &result.token_id_aux, ); Ok(()) } /// Remove tx from the indexer's mempool, e.g. by a conflicting tx, expiry /// etc. This is not called when the transaction has been mined (and thus /// also removed from the mempool). pub fn handle_tx_removed_from_mempool(&mut self, txid: TxId) -> Result<()> { let result = self.mempool.remove(txid)?; self.subs.get_mut().handle_tx_event( &result.mempool_tx.tx, TxMsgType::RemovedFromMempool, &result.token_id_aux, ); Ok(()) } /// Add the block to the index. 
pub fn handle_block_connected( &mut self, block: ChronikBlock, ) -> Result<()> { let height = block.db_block.height; let mut batch = WriteBatch::default(); let block_writer = BlockWriter::new(&self.db)?; let tx_writer = TxWriter::new(&self.db)?; let block_stats_writer = BlockStatsWriter::new(&self.db)?; let script_history_writer = ScriptHistoryWriter::new(&self.db, self.script_group.clone())?; let script_utxo_writer = ScriptUtxoWriter::new(&self.db, self.script_group.clone())?; let spent_by_writer = SpentByWriter::new(&self.db)?; let token_writer = TokenWriter::new(&self.db)?; let token_id_history_writer = TokenIdHistoryWriter::new(&self.db, TokenIdGroup)?; let token_id_utxo_writer = TokenIdUtxoWriter::new(&self.db, TokenIdGroup)?; block_writer.insert(&mut batch, &block.db_block)?; let first_tx_num = tx_writer.insert( &mut batch, &block.block_txs, &mut self.mem_data.txs, )?; let index_txs = prepare_indexed_txs(&self.db, first_tx_num, &block.txs)?; block_stats_writer .insert(&mut batch, height, block.size, &index_txs)?; script_history_writer.insert( &mut batch, &index_txs, &(), &mut self.mem_data.script_history, )?; script_utxo_writer.insert( &mut batch, &index_txs, &(), &mut self.mem_data.script_utxos, )?; spent_by_writer.insert( &mut batch, &index_txs, &mut self.mem_data.spent_by, )?; let token_id_aux; if self.is_token_index_enabled { let processed_token_batch = token_writer.insert(&mut batch, &index_txs)?; token_id_aux = TokenIdGroupAux::from_batch(&index_txs, &processed_token_batch); token_id_history_writer.insert( &mut batch, &index_txs, &token_id_aux, &mut GroupHistoryMemData::default(), )?; token_id_utxo_writer.insert( &mut batch, &index_txs, &token_id_aux, &mut GroupUtxoMemData::default(), )?; } else { token_id_aux = TokenIdGroupAux::default(); } self.db.write_batch(batch)?; for tx in &block.block_txs.txs { self.mempool.remove_mined(&tx.txid)?; } merge::check_for_errors()?; let subs = self.subs.get_mut(); subs.broadcast_block_msg(BlockMsg { msg_type: 
BlockMsgType::Connected, hash: block.db_block.hash, height: block.db_block.height, }); for tx in &block.txs { subs.handle_tx_event(tx, TxMsgType::Confirmed, &token_id_aux); } Ok(()) } /// Remove the block from the index. pub fn handle_block_disconnected( &mut self, block: ChronikBlock, ) -> Result<()> { let mut batch = WriteBatch::default(); let block_writer = BlockWriter::new(&self.db)?; let tx_writer = TxWriter::new(&self.db)?; let block_stats_writer = BlockStatsWriter::new(&self.db)?; let script_history_writer = ScriptHistoryWriter::new(&self.db, self.script_group.clone())?; let script_utxo_writer = ScriptUtxoWriter::new(&self.db, self.script_group.clone())?; let spent_by_writer = SpentByWriter::new(&self.db)?; let token_writer = TokenWriter::new(&self.db)?; let token_id_history_writer = TokenIdHistoryWriter::new(&self.db, TokenIdGroup)?; let token_id_utxo_writer = TokenIdUtxoWriter::new(&self.db, TokenIdGroup)?; block_writer.delete(&mut batch, &block.db_block)?; let first_tx_num = tx_writer.delete( &mut batch, &block.block_txs, &mut self.mem_data.txs, )?; let index_txs = prepare_indexed_txs(&self.db, first_tx_num, &block.txs)?; block_stats_writer.delete(&mut batch, block.db_block.height); script_history_writer.delete( &mut batch, &index_txs, &(), &mut self.mem_data.script_history, )?; script_utxo_writer.delete( &mut batch, &index_txs, &(), &mut self.mem_data.script_utxos, )?; spent_by_writer.delete( &mut batch, &index_txs, &mut self.mem_data.spent_by, )?; if self.is_token_index_enabled { let token_id_aux = TokenIdGroupAux::from_db(&index_txs, &self.db)?; token_id_history_writer.delete( &mut batch, &index_txs, &token_id_aux, &mut GroupHistoryMemData::default(), )?; token_id_utxo_writer.delete( &mut batch, &index_txs, &token_id_aux, &mut GroupUtxoMemData::default(), )?; token_writer.delete(&mut batch, &index_txs)?; } self.avalanche.disconnect_block(block.db_block.height)?; self.db.write_batch(batch)?; let subs = self.subs.get_mut(); 
subs.broadcast_block_msg(BlockMsg { msg_type: BlockMsgType::Disconnected, hash: block.db_block.hash, height: block.db_block.height, }); Ok(()) } /// Block finalized with Avalanche. pub fn handle_block_finalized( &mut self, block: ChronikBlock, ) -> Result<()> { self.avalanche.finalize_block(block.db_block.height)?; let subs = self.subs.get_mut(); subs.broadcast_block_msg(BlockMsg { msg_type: BlockMsgType::Finalized, hash: block.db_block.hash, height: block.db_block.height, }); let tx_reader = TxReader::new(&self.db)?; let first_tx_num = tx_reader .first_tx_num_by_block(block.db_block.height)? .unwrap(); let index_txs = prepare_indexed_txs(&self.db, first_tx_num, &block.txs)?; let token_id_aux = if self.is_token_index_enabled { TokenIdGroupAux::from_db(&index_txs, &self.db)? } else { TokenIdGroupAux::default() }; for tx in &block.txs { subs.handle_tx_event(tx, TxMsgType::Finalized, &token_id_aux); } Ok(()) } /// Return [`QueryBroadcast`] to broadcast tx to the network. pub fn broadcast<'a>(&'a self, node: &'a Node) -> QueryBroadcast<'a> { QueryBroadcast { db: &self.db, avalanche: &self.avalanche, mempool: &self.mempool, node, is_token_index_enabled: self.is_token_index_enabled, } } /// Return [`QueryBlocks`] to read blocks from the DB. pub fn blocks<'a>(&'a self, node: &'a Node) -> QueryBlocks<'a> { QueryBlocks { db: &self.db, avalanche: &self.avalanche, mempool: &self.mempool, node, is_token_index_enabled: self.is_token_index_enabled, } } /// Return [`QueryTxs`] to return txs from mempool/DB. pub fn txs<'a>(&'a self, node: &'a Node) -> QueryTxs<'a> { QueryTxs { db: &self.db, avalanche: &self.avalanche, mempool: &self.mempool, node, is_token_index_enabled: self.is_token_index_enabled, } } /// Return [`QueryGroupHistory`] for scripts to query the tx history of /// scripts. 
pub fn script_history<'a>( &'a self, node: &'a Node, ) -> Result> { Ok(QueryGroupHistory { db: &self.db, avalanche: &self.avalanche, mempool: &self.mempool, mempool_history: self.mempool.script_history(), group: self.script_group.clone(), node, is_token_index_enabled: self.is_token_index_enabled, }) } /// Return [`QueryGroupUtxos`] for scripts to query the utxos of scripts. pub fn script_utxos( &self, ) -> Result> { Ok(QueryGroupUtxos { db: &self.db, avalanche: &self.avalanche, mempool: &self.mempool, mempool_utxos: self.mempool.script_utxos(), group: self.script_group.clone(), utxo_mapper: UtxoProtobufValue, is_token_index_enabled: self.is_token_index_enabled, }) } /// Return [`QueryGroupHistory`] for token IDs to query the tx history of /// token IDs. pub fn token_id_history<'a>( &'a self, node: &'a Node, ) -> QueryGroupHistory<'a, TokenIdGroup> { QueryGroupHistory { db: &self.db, avalanche: &self.avalanche, mempool: &self.mempool, mempool_history: self.mempool.token_id_history(), group: TokenIdGroup, node, is_token_index_enabled: self.is_token_index_enabled, } } /// Return [`QueryGroupUtxos`] for token IDs to query the utxos of token IDs pub fn token_id_utxos( &self, ) -> QueryGroupUtxos<'_, TokenIdGroup, UtxoProtobufOutput> { QueryGroupUtxos { db: &self.db, avalanche: &self.avalanche, mempool: &self.mempool, mempool_utxos: self.mempool.token_id_utxos(), group: TokenIdGroup, utxo_mapper: UtxoProtobufOutput, is_token_index_enabled: self.is_token_index_enabled, } } /// Subscribers, behind read/write lock pub fn subs(&self) -> &RwLock { &self.subs } /// Build a ChronikBlock from a ffi::Block. 
pub fn make_chronik_block(&self, block: ffi::Block) -> ChronikBlock { let db_block = DbBlock { hash: BlockHash::from(block.hash), prev_hash: BlockHash::from(block.prev_hash), height: block.height, n_bits: block.n_bits, timestamp: block.timestamp, file_num: block.file_num, data_pos: block.data_pos, }; let block_txs = BlockTxs { block_height: block.height, txs: block .txs .iter() .map(|tx| { let txid = TxId::from(tx.tx.txid); TxEntry { txid, data_pos: tx.data_pos, undo_pos: tx.undo_pos, time_first_seen: match self.mempool.tx(&txid) { Some(tx) => tx.time_first_seen, None => 0, }, is_coinbase: tx.undo_pos == 0, } }) .collect(), }; let txs = block .txs .into_iter() .map(|block_tx| Tx::from(block_tx.tx)) .collect::>(); ChronikBlock { db_block, block_txs, size: block.size, txs, } } /// Load a ChronikBlock from the node given the CBlockIndex. pub fn load_chronik_block( &self, bridge: &ffi::ChronikBridge, block_index: &ffi::CBlockIndex, ) -> Result { let ffi_block = bridge.load_block(block_index)?; let ffi_block = expect_unique_ptr("load_block", &ffi_block); let ffi_block_undo = bridge.load_block_undo(block_index)?; let ffi_block_undo = expect_unique_ptr("load_block_undo", &ffi_block_undo); let block = ffi::bridge_block(ffi_block, ffi_block_undo, block_index)?; Ok(self.make_chronik_block(block)) } } fn verify_schema_version(db: &Db) -> Result<()> { let metadata_reader = MetadataReader::new(db)?; let metadata_writer = MetadataWriter::new(db)?; let is_empty = db.is_db_empty()?; match metadata_reader .schema_version() .wrap_err(CorruptedSchemaVersion)? 
{ Some(schema_version) => { assert!(!is_empty, "Empty DB can't have a schema version"); if schema_version > CURRENT_INDEXER_VERSION { return Err(ChronikOutdated(schema_version).into()); } if schema_version < CURRENT_INDEXER_VERSION { return Err(DatabaseOutdated(schema_version).into()); } } None => { if !is_empty { return Err(MissingSchemaVersion.into()); } let mut batch = WriteBatch::default(); metadata_writer .update_schema_version(&mut batch, CURRENT_INDEXER_VERSION)?; db.write_batch(batch)?; } } log!("Chronik has version {CURRENT_INDEXER_VERSION}\n"); Ok(()) } fn verify_enable_token_index(db: &Db, enable_token_index: bool) -> Result<()> { let metadata_reader = MetadataReader::new(db)?; let metadata_writer = MetadataWriter::new(db)?; let token_writer = TokenWriter::new(db)?; let is_empty = db.is_db_empty()?; let is_token_index_enabled = metadata_reader.is_token_index_enabled()?; let mut batch = WriteBatch::default(); if !is_empty { // Cannot enable token index if not already previously enabled if enable_token_index && !is_token_index_enabled { return Err(CannotEnableTokenIndex.into()); } // Wipe token index if previously enabled and now disabled if !enable_token_index && is_token_index_enabled { log!( "Warning: Wiping existing token index, since \ -chroniktokenindex=0\n" ); log!("You will need to -reindex/-chronikreindex to restore\n"); token_writer.wipe(&mut batch); } } metadata_writer .update_is_token_index_enabled(&mut batch, enable_token_index)?; db.write_batch(batch)?; Ok(()) } +impl Node { + /// If `result` is [`Err`], logs and aborts the node. 
+ pub fn ok_or_abort(&self, func_name: &str, result: Result) { + if let Err(report) = result { + log_chronik!("{report:?}\n"); + self.bridge.abort_node( + &format!("ERROR Chronik in {func_name}"), + &format!("{report:#}"), + ); + } + } +} + impl std::fmt::Debug for ChronikIndexerParams { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ChronikIndexerParams") .field("datadir_net", &self.datadir_net) .field("wipe_db", &self.wipe_db) .field("fn_compress_script", &"..") .finish() } } #[cfg(test)] mod tests { use abc_rust_error::Result; use bitcoinsuite_core::block::BlockHash; use chronik_db::{ db::{Db, WriteBatch, CF_META}, io::{BlockReader, BlockTxs, DbBlock, MetadataReader, MetadataWriter}, }; use pretty_assertions::assert_eq; use crate::indexer::{ ChronikBlock, ChronikIndexer, ChronikIndexerError, ChronikIndexerParams, CURRENT_INDEXER_VERSION, }; #[test] fn test_indexer() -> Result<()> { let tempdir = tempdir::TempDir::new("chronik-indexer--indexer")?; let datadir_net = tempdir.path().join("regtest"); let params = ChronikIndexerParams { datadir_net: datadir_net.clone(), wipe_db: false, enable_token_index: false, enable_perf_stats: false, }; // regtest folder doesn't exist yet -> error assert_eq!( ChronikIndexer::setup(params.clone()) .unwrap_err() .downcast::()?, ChronikIndexerError::CreateDirFailed(datadir_net.join("indexes")), ); // create regtest folder, setup will work now std::fs::create_dir(&datadir_net)?; let mut indexer = ChronikIndexer::setup(params.clone())?; // indexes and indexes/chronik folder now exist assert!(datadir_net.join("indexes").exists()); assert!(datadir_net.join("indexes").join("chronik").exists()); // DB is empty assert_eq!(BlockReader::new(&indexer.db)?.by_height(0)?, None); let block = ChronikBlock { db_block: DbBlock { hash: BlockHash::from([4; 32]), prev_hash: BlockHash::from([0; 32]), height: 0, n_bits: 0x1deadbef, timestamp: 1234567890, file_num: 0, data_pos: 1337, }, block_txs: BlockTxs { 
block_height: 0, txs: vec![], }, size: 285, txs: vec![], }; // Add block indexer.handle_block_connected(block.clone())?; assert_eq!( BlockReader::new(&indexer.db)?.by_height(0)?, Some(block.db_block.clone()) ); // Remove block again indexer.handle_block_disconnected(block.clone())?; assert_eq!(BlockReader::new(&indexer.db)?.by_height(0)?, None); // Add block then wipe, block not there indexer.handle_block_connected(block)?; std::mem::drop(indexer); let indexer = ChronikIndexer::setup(ChronikIndexerParams { wipe_db: true, ..params })?; assert_eq!(BlockReader::new(&indexer.db)?.by_height(0)?, None); Ok(()) } #[test] fn test_schema_version() -> Result<()> { let dir = tempdir::TempDir::new("chronik-indexer--schema_version")?; let chronik_path = dir.path().join("indexes").join("chronik"); let params = ChronikIndexerParams { datadir_net: dir.path().to_path_buf(), wipe_db: false, enable_token_index: false, enable_perf_stats: false, }; // Setting up DB first time sets the schema version ChronikIndexer::setup(params.clone())?; { let db = Db::open(&chronik_path)?; assert_eq!( MetadataReader::new(&db)?.schema_version()?, Some(CURRENT_INDEXER_VERSION) ); } // Opening DB again works fine ChronikIndexer::setup(params.clone())?; // Override DB schema version to 0 { let db = Db::open(&chronik_path)?; let mut batch = WriteBatch::default(); MetadataWriter::new(&db)?.update_schema_version(&mut batch, 0)?; db.write_batch(batch)?; } // -> DB too old assert_eq!( ChronikIndexer::setup(params.clone()) .unwrap_err() .downcast::()?, ChronikIndexerError::DatabaseOutdated(0), ); // Override DB schema version to CURRENT_INDEXER_VERSION + 1 { let db = Db::open(&chronik_path)?; let mut batch = WriteBatch::default(); MetadataWriter::new(&db)?.update_schema_version( &mut batch, CURRENT_INDEXER_VERSION + 1, )?; db.write_batch(batch)?; } // -> Chronik too old assert_eq!( ChronikIndexer::setup(params.clone()) .unwrap_err() .downcast::()?, ChronikIndexerError::ChronikOutdated(CURRENT_INDEXER_VERSION + 
1), ); // Corrupt schema version { let db = Db::open(&chronik_path)?; let cf_meta = db.cf(CF_META)?; let mut batch = WriteBatch::default(); batch.put_cf(cf_meta, b"SCHEMA_VERSION", [0xff]); db.write_batch(batch)?; } assert_eq!( ChronikIndexer::setup(params.clone()) .unwrap_err() .downcast::()?, ChronikIndexerError::CorruptedSchemaVersion, ); // New db path, but has existing data let new_dir = dir.path().join("new"); let new_chronik_path = new_dir.join("indexes").join("chronik"); std::fs::create_dir_all(&new_chronik_path)?; let new_params = ChronikIndexerParams { datadir_net: new_dir, wipe_db: false, ..params }; { // new db with obscure field in meta let db = Db::open(&new_chronik_path)?; let mut batch = WriteBatch::default(); batch.put_cf(db.cf(CF_META)?, b"FOO", b"BAR"); db.write_batch(batch)?; } // Error: non-empty DB without schema version assert_eq!( ChronikIndexer::setup(new_params.clone()) .unwrap_err() .downcast::()?, ChronikIndexerError::MissingSchemaVersion, ); // with wipe it works ChronikIndexer::setup(ChronikIndexerParams { wipe_db: true, ..new_params })?; Ok(()) } } diff --git a/chronik/chronik-lib/src/bridge.rs b/chronik/chronik-lib/src/bridge.rs index e36a10ce4..8523ebe3e 100644 --- a/chronik/chronik-lib/src/bridge.rs +++ b/chronik/chronik-lib/src/bridge.rs @@ -1,289 +1,290 @@ // Copyright (c) 2022 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. //! Rust side of the bridge; these structs and functions are exposed to C++. 
use std::{ net::{AddrParseError, IpAddr, SocketAddr}, sync::Arc, time::Duration, }; use abc_rust_error::Result; use bitcoinsuite_core::tx::{Tx, TxId}; use chronik_bridge::{ffi::init_error, util::expect_unique_ptr}; use chronik_db::mem::MempoolTx; use chronik_http::server::{ ChronikServer, ChronikServerParams, ChronikSettings, }; use chronik_indexer::{ indexer::{ChronikIndexer, ChronikIndexerParams, Node}, pause::Pause, }; use chronik_plugin::context::PluginContext; use chronik_util::{log, log_chronik, mount_loggers, Loggers}; use thiserror::Error; use tokio::sync::RwLock; -use crate::{ - error::ok_or_abort_node, - ffi::{self, StartChronikValidationInterface}, -}; +use crate::ffi::{self, StartChronikValidationInterface}; /// Errors for [`Chronik`] and [`setup_chronik`]. #[derive(Debug, Eq, Error, PartialEq)] pub enum ChronikError { /// Chronik host address failed to parse #[error("Invalid Chronik host address {0:?}: {1}")] InvalidChronikHost(String, AddrParseError), } use self::ChronikError::*; /// Setup the Chronik bridge. Returns a ChronikIndexer object. 
pub fn setup_chronik( params: ffi::SetupParams, config: &ffi::Config, node: &ffi::NodeContext, ) -> bool { match try_setup_chronik(params, config, node) { Ok(()) => true, Err(report) => { log_chronik!("{report:?}\n"); init_error(&report.to_string()) } } } fn try_setup_chronik( params: ffi::SetupParams, config: &ffi::Config, node_context: &ffi::NodeContext, ) -> Result<()> { abc_rust_error::install(); mount_loggers(Loggers { log: chronik_bridge::ffi::log_print, log_chronik: chronik_bridge::ffi::log_print_chronik, }); let hosts = params .hosts .into_iter() .map(|host| parse_socket_addr(host, params.default_port)) .collect::>>()?; PluginContext::setup()?; log!("Starting Chronik bound to {:?}\n", hosts); let bridge = chronik_bridge::ffi::make_bridge(config, node_context); let bridge_ref = expect_unique_ptr("make_bridge", &bridge); let (pause, pause_notify) = Pause::new_pair(params.is_pause_allowed); let mut indexer = ChronikIndexer::setup(ChronikIndexerParams { datadir_net: params.datadir_net.into(), wipe_db: params.wipe_db, enable_token_index: params.enable_token_index, enable_perf_stats: params.enable_perf_stats, })?; indexer.resync_indexer(bridge_ref)?; - if chronik_bridge::ffi::shutdown_requested() { + if bridge.shutdown_requested() { // Don't setup Chronik if the user requested shutdown during resync return Ok(()); } let indexer = Arc::new(RwLock::new(indexer)); let node = Arc::new(Node { bridge }); let runtime = tokio::runtime::Builder::new_multi_thread() .enable_all() .build()?; let server = runtime.block_on({ let indexer = Arc::clone(&indexer); let node = Arc::clone(&node); async move { // try_bind requires a Runtime ChronikServer::setup(ChronikServerParams { hosts, indexer, node, pause_notify: Arc::new(pause_notify), settings: ChronikSettings { ws_ping_interval: Duration::from_secs( params.ws_ping_interval_secs, ), }, }) } })?; - runtime.spawn(async move { - ok_or_abort_node("ChronikServer::serve", server.serve().await); + runtime.spawn({ + let node = 
Arc::clone(&node); + async move { + node.ok_or_abort("ChronikServer::serve", server.serve().await); + } }); let chronik = Box::new(Chronik { node: Arc::clone(&node), indexer, pause, runtime, }); StartChronikValidationInterface(node_context, chronik); Ok(()) } fn parse_socket_addr(host: String, default_port: u16) -> Result { if let Ok(addr) = host.parse::() { return Ok(addr); } let ip_addr = host .parse::() .map_err(|err| InvalidChronikHost(host, err))?; Ok(SocketAddr::new(ip_addr, default_port)) } /// Contains all db, runtime, tpc, etc. handles needed by Chronik. /// This makes it so when this struct is dropped, all handles are relased /// cleanly. pub struct Chronik { node: Arc, indexer: Arc>, pause: Pause, // Having this here ensures HTTP server, outstanding requests etc. will get // stopped when `Chronik` is dropped. runtime: tokio::runtime::Runtime, } impl Chronik { /// Tx added to the bitcoind mempool pub fn handle_tx_added_to_mempool( &self, ptx: &ffi::CTransaction, spent_coins: &cxx::CxxVector, time_first_seen: i64, ) { self.block_if_paused(); - ok_or_abort_node( + self.node.ok_or_abort( "handle_tx_added_to_mempool", self.add_tx_to_mempool(ptx, spent_coins, time_first_seen), ); } /// Tx removed from the bitcoind mempool pub fn handle_tx_removed_from_mempool(&self, txid: [u8; 32]) { self.block_if_paused(); let mut indexer = self.indexer.blocking_write(); let txid = TxId::from(txid); - ok_or_abort_node( + self.node.ok_or_abort( "handle_tx_removed_from_mempool", indexer.handle_tx_removed_from_mempool(txid), ); log_chronik!("Chronik: transaction {} removed from mempool\n", txid); } /// Block connected to the longest chain pub fn handle_block_connected( &self, block: &ffi::CBlock, bindex: &ffi::CBlockIndex, ) { self.block_if_paused(); - ok_or_abort_node( + self.node.ok_or_abort( "handle_block_connected", self.connect_block(block, bindex), ); } /// Block disconnected from the longest chain pub fn handle_block_disconnected( &self, block: &ffi::CBlock, bindex: 
&ffi::CBlockIndex, ) { self.block_if_paused(); - ok_or_abort_node( + self.node.ok_or_abort( "handle_block_disconnected", self.disconnect_block(block, bindex), ); } /// Block finalized with Avalanche pub fn handle_block_finalized(&self, bindex: &ffi::CBlockIndex) { self.block_if_paused(); - ok_or_abort_node("handle_block_finalized", self.finalize_block(bindex)); + self.node + .ok_or_abort("handle_block_finalized", self.finalize_block(bindex)); } fn add_tx_to_mempool( &self, ptx: &ffi::CTransaction, spent_coins: &cxx::CxxVector, time_first_seen: i64, ) -> Result<()> { let mut indexer = self.indexer.blocking_write(); let tx = chronik_bridge::ffi::bridge_tx(ptx, spent_coins)?; let txid = TxId::from(tx.txid); indexer.handle_tx_added_to_mempool(MempoolTx { tx: Tx::from(tx), time_first_seen, })?; log_chronik!("Chronik: transaction {} added to mempool\n", txid); Ok(()) } fn connect_block( &self, block: &ffi::CBlock, bindex: &ffi::CBlockIndex, ) -> Result<()> { let block_undo = self.node.bridge.load_block_undo(bindex)?; let block = chronik_bridge::ffi::bridge_block(block, &block_undo, bindex)?; let mut indexer = self.indexer.blocking_write(); let block = indexer.make_chronik_block(block); let block_hash = block.db_block.hash.clone(); let num_txs = block.block_txs.txs.len(); indexer.handle_block_connected(block)?; log_chronik!( "Chronik: block {} connected with {} txs\n", block_hash, num_txs, ); Ok(()) } fn disconnect_block( &self, block: &ffi::CBlock, bindex: &ffi::CBlockIndex, ) -> Result<()> { let block_undo = self.node.bridge.load_block_undo(bindex)?; let block = chronik_bridge::ffi::bridge_block(block, &block_undo, bindex)?; let mut indexer = self.indexer.blocking_write(); let block = indexer.make_chronik_block(block); let block_hash = block.db_block.hash.clone(); let num_txs = block.block_txs.txs.len(); indexer.handle_block_disconnected(block)?; log_chronik!( "Chronik: block {} disconnected with {} txs\n", block_hash, num_txs, ); Ok(()) } fn finalize_block(&self, 
bindex: &ffi::CBlockIndex) -> Result<()> { let mut indexer = self.indexer.blocking_write(); let block = indexer.load_chronik_block(&self.node.bridge, bindex)?; let block_hash = block.db_block.hash.clone(); let num_txs = block.block_txs.txs.len(); indexer.handle_block_finalized(block)?; log_chronik!( "Chronik: block {} finalized with {} txs\n", block_hash, num_txs, ); Ok(()) } fn block_if_paused(&self) { self.pause.block_if_paused(&self.runtime); } } impl std::fmt::Debug for Chronik { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "Chronik {{ .. }}") } } diff --git a/chronik/chronik-lib/src/error.rs b/chronik/chronik-lib/src/error.rs deleted file mode 100644 index 96f888e7c..000000000 --- a/chronik/chronik-lib/src/error.rs +++ /dev/null @@ -1,18 +0,0 @@ -// Copyright (c) 2022 The Bitcoin developers -// Distributed under the MIT software license, see the accompanying -// file COPYING or http://www.opensource.org/licenses/mit-license.php. - -use abc_rust_error::Result; -use chronik_bridge::ffi::abort_node; -use chronik_util::log_chronik; - -/// If `result` is [`Err`], logs and aborts the node. -pub(crate) fn ok_or_abort_node(func_name: &str, result: Result) { - if let Err(report) = result { - log_chronik!("{report:?}\n"); - abort_node( - &format!("ERROR Chronik in {func_name}"), - &format!("{report:#}"), - ); - } -} diff --git a/chronik/chronik-lib/src/lib.rs b/chronik/chronik-lib/src/lib.rs index f8f8ba98d..7476d9d65 100644 --- a/chronik/chronik-lib/src/lib.rs +++ b/chronik/chronik-lib/src/lib.rs @@ -1,21 +1,20 @@ // Copyright (c) 2022 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. //! Foreign-function interface to bitcoind. Bridges Rust to C++. //! //! The bridge is split into two crates, `chronik-bridge` and `chronik-lib`. //! //! In this crate `chronik-lib`, we define a library that can be statically //! 
linked to by the node. //! //! We define all the functions that the C++ side needs to call: //! 1. Methods in CValidationInterface notifying the indexer of added/removed //! blocks/transactions. //! 2. `setup_bridge`, to instantiate the bridge connecting to C++. abc_rust_lint::lint! { pub mod bridge; - mod error; pub mod ffi; }