diff --git a/chronik/chronik-bridge/src/ffi.rs b/chronik/chronik-bridge/src/ffi.rs index 7c5134d9b..6ca4f916f 100644 --- a/chronik/chronik-bridge/src/ffi.rs +++ b/chronik/chronik-bridge/src/ffi.rs @@ -1,330 +1,339 @@ // Copyright (c) 2022 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. //! Module containing the cxx definitions for the bridge from C++ to Rust. pub use self::ffi_inner::*; #[allow(unsafe_code)] #[cxx::bridge(namespace = "chronik_bridge")] mod ffi_inner { /// Info about a block #[derive(Clone, Debug, Default, Eq, PartialEq)] pub struct BlockInfo { /// Hash of the block (or 000...000 if no block) pub hash: [u8; 32], /// Height of the block (or -1 if no block) pub height: i32, } /// Block coming from bitcoind to Chronik. /// /// We don't index all fields (e.g. hashMerkleRoot), only those that are /// needed when querying a range of blocks. /// /// Instead of storing all the block data for Chronik again, we only store /// file_num, data_pos and undo_pos of the block data of the node. /// /// This makes the index relatively small, as it's mostly just pointing to /// the data the node already stores. /// /// Note that this prohibits us from using Chronik in pruned mode. #[derive(Clone, Debug, Default, Eq, PartialEq)] pub struct Block { /// Block hash pub hash: [u8; 32], /// hashPrevBlock, hash of the previous block in the chain pub prev_hash: [u8; 32], /// nBits, difficulty of the header pub n_bits: u32, /// Timestamp of the block pub timestamp: i64, /// Height of the block in the chain. pub height: i32, /// File number of the block file this block is stored in. /// This can be used to later slice out transactions, so we don't have /// to index txs twice. pub file_num: u32, /// Position of the block within the block file, starting at the block /// header. pub data_pos: u32, /// Position of the undo data within the undo file. pub undo_pos: u32, /// Serialized size of the block pub size: u64, /// Txs of this block, including positions within the block/undo files. pub txs: Vec<BlockTx>, } /// Tx in a block #[derive(Clone, Debug, Default, Eq, PartialEq)] pub struct BlockTx { /// Tx (without disk data) pub tx: Tx, /// Where the tx is stored within the block file. pub data_pos: u32, /// Where the tx's undo data is stored within the block's undo file. pub undo_pos: u32, } /// CTransaction, in a block or in the mempool. #[derive(Clone, Debug, Default, Eq, PartialEq)] pub struct Tx { /// TxId of the tx. pub txid: [u8; 32], /// nVersion of the tx. pub version: i32, /// Tx inputs. pub inputs: Vec<TxInput>, /// Tx outputs. pub outputs: Vec<TxOutput>, /// Locktime of the tx. pub locktime: u32, } /// COutPoint, pointing to a coin being spent. #[derive(Clone, Debug, Default, Eq, PartialEq)] pub struct OutPoint { /// TxId of the output of the coin. pub txid: [u8; 32], /// Index in the outputs of the tx of the coin. pub out_idx: u32, } /// CTxIn, spending an unspent output. #[derive(Clone, Debug, Default, Eq, PartialEq)] pub struct TxInput { /// Points to an output being spent. pub prev_out: OutPoint, /// scriptSig unlocking the output. pub script: Vec<u8>, /// nSequence. pub sequence: u32, /// Coin being spent by this tx. pub coin: Coin, } /// CTxOut, creating a new output. #[derive(Clone, Debug, Default, Eq, PartialEq)] pub struct TxOutput { /// Value of the output. pub value: i64, /// Script locking the output. pub script: Vec<u8>, } /// Coin, can be spent by providing a valid unlocking script.
#[derive(Clone, Debug, Default, Eq, PartialEq)] pub struct Coin { /// Output, locking the coins. pub output: TxOutput, /// Height of the coin in the chain. pub height: i32, /// Whether the coin is a coinbase. pub is_coinbase: bool, } #[allow(missing_debug_implementations)] unsafe extern "C++" { include!("blockindex.h"); include!("chronik-cpp/chronik_bridge.h"); include!("coins.h"); include!("node/context.h"); include!("primitives/block.h"); include!("primitives/transaction.h"); include!("undo.h"); /// node::NodeContext from node/context.h #[namespace = "node"] type NodeContext; /// ::CBlockIndex from blockindex.h #[namespace = ""] type CBlockIndex; /// ::CBlock from primitives/block.h #[namespace = ""] type CBlock; /// ::CBlockUndo from undo.h #[namespace = ""] type CBlockUndo; /// ::Coin from coins.h (renamed to CCoin to prevent a name clash) #[namespace = ""] #[cxx_name = "Coin"] type CCoin; /// ::Config from config.h #[namespace = ""] type Config; /// ::CTransaction from primitives/transaction.h #[namespace = ""] type CTransaction; /// Bridge to bitcoind to access the node type ChronikBridge; /// Print the message to bitcoind's logs. fn log_print( logging_function: &str, source_file: &str, source_line: u32, msg: &str, ); /// Print the message to bitcoind's logs under the BCLog::Chronik /// category. fn log_print_chronik( logging_function: &str, source_file: &str, source_line: u32, msg: &str, ); /// Make the bridge given the NodeContext fn make_bridge( config: &Config, node: &NodeContext, ) -> UniquePtr<ChronikBridge>; /// Return the tip of the chain of the node. /// Returns hash=000...000, height=-1 if there's no block on the chain. fn get_chain_tip(self: &ChronikBridge) -> Result<&CBlockIndex>; /// Lookup the block index with the given hash, or throw an error /// if it couldn't be found. fn lookup_block_index( self: &ChronikBridge, hash: [u8; 32], ) -> Result<&CBlockIndex>; /// Load the CBlock data of this CBlockIndex from the disk fn load_block( self: &ChronikBridge, block_index: &CBlockIndex, ) -> Result<UniquePtr<CBlock>>; /// Load the CBlockUndo data of this CBlockIndex from the disk undo data fn load_block_undo( self: &ChronikBridge, block_index: &CBlockIndex, ) -> Result<UniquePtr<CBlockUndo>>; + /// Load the CTransaction and CTxUndo data from disk and turn it into a + /// bridged Tx, containing spent coins etc. + fn load_tx( + self: &ChronikBridge, + file_num: u32, + data_pos: u32, + undo_pos: u32, + ) -> Result<Tx>; + + /// Load the CTransaction from disk and serialize it. + fn load_raw_tx( + self: &ChronikBridge, + file_num: u32, + data_pos: u32, + ) -> Result<Vec<u8>>; + /// Find at which block the given block_index forks off from the node. fn find_fork( self: &ChronikBridge, block_index: &CBlockIndex, ) -> Result<&CBlockIndex>; /// Lookup the spent coins of a tx and fill them in in-place. /// - `not_found` will be the outpoints that couldn't be found in the /// node or the DB. /// - `coins_to_uncache` will be the outpoints that need to be uncached /// if the tx doesn't end up being broadcast. This is so that clients /// can't fill our cache with useless old coins. It mirrors the /// behavior of `MemPoolAccept::PreChecks`, which uncaches the queried /// coins if they don't end up being spent. fn lookup_spent_coins( self: &ChronikBridge, tx: &mut Tx, not_found: &mut Vec<OutPoint>, coins_to_uncache: &mut Vec<OutPoint>, ) -> Result<()>; /// Remove the coins from the coin cache. /// This must be done after a call to `lookup_spent_coins` where the tx /// wasn't broadcast, to avoid clients filling our cache with unneeded /// coins.
fn uncache_coins( self: &ChronikBridge, coins: &[OutPoint], ) -> Result<()>; /// Add the given tx to the mempool, and if that succeeds, broadcast it /// to all our peers. /// Also check the actual tx fee doesn't exceed max_fee. /// Note max_fee is absolute, not a fee rate (as in sendrawtransaction). fn broadcast_tx( self: &ChronikBridge, raw_tx: &[u8], max_fee: i64, ) -> Result<[u8; 32]>; /// Bridge CTransaction -> ffi::Tx, using the given spent coins. fn bridge_tx( tx: &CTransaction, spent_coins: &CxxVector<CCoin>, ) -> Result<Tx>; /// Bridge bitcoind's classes to the shared struct [`Block`]. fn bridge_block( block: &CBlock, block_undo: &CBlockUndo, block_index: &CBlockIndex, ) -> Result<Block>; - /// Load the CTransaction and CTxUndo data from disk and turn it into a - /// bridged Tx, containing spent coins etc. - fn load_tx(file_num: u32, data_pos: u32, undo_pos: u32) -> Result<Tx>; - - /// Load the CTransaction from disk and serialize it. - fn load_raw_tx(file_num: u32, data_pos: u32) -> Result<Vec<u8>>; - /// Get a BlockInfo for this CBlockIndex. fn get_block_info(block_index: &CBlockIndex) -> BlockInfo; /// CBlockIndex::GetAncestor fn get_block_ancestor( block_index: &CBlockIndex, height: i32, ) -> Result<&CBlockIndex>; /// Compress the given script using `ScriptCompression`. fn compress_script(script: &[u8]) -> Vec<u8>; /// Decompress the given script using `ScriptCompression`. fn decompress_script(compressed: &[u8]) -> Result<Vec<u8>>; /// Calc the fee in satoshis for the given tx size in bytes. fn calc_fee(num_bytes: usize, sats_fee_per_kb: i64) -> i64; /// Default maximum fee rate when broadcasting txs. fn default_max_raw_tx_fee_rate_per_kb() -> i64; /// Calls `SyncWithValidationInterfaceQueue` from validationinterface.h /// to make sure wallet/indexes are synced. fn sync_with_validation_interface_queue(); /// Calls `InitError` from `node/ui_interface.h` to report an error to /// the user and then gracefully shut down the node. fn init_error(msg: &str) -> bool; /// Calls `AbortNode` from shutdown.h to gracefully shut down the node /// when an unrecoverable error occurred. fn abort_node(msg: &str, user_msg: &str); /// Returns true if a shutdown is requested, false otherwise. /// See `ShutdownRequested` in `shutdown.h`. fn shutdown_requested() -> bool; } } /// SAFETY: All fields of ChronikBridge (const Consensus::Params &, const /// node::NodeContext &) can be moved between threads safely. #[allow(unsafe_code)] unsafe impl Send for ChronikBridge {} /// SAFETY: All fields of ChronikBridge (const Consensus::Params &, const /// node::NodeContext &) can be accessed from different threads safely. #[allow(unsafe_code)] unsafe impl Sync for ChronikBridge {} impl std::fmt::Debug for ChronikBridge { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ChronikBridge").finish_non_exhaustive() } } diff --git a/chronik/chronik-cpp/chronik_bridge.cpp b/chronik/chronik-cpp/chronik_bridge.cpp index 66a7962f6..6fcc8ed5e 100644 --- a/chronik/chronik-cpp/chronik_bridge.cpp +++ b/chronik/chronik-cpp/chronik_bridge.cpp @@ -1,391 +1,393 @@ // Copyright (c) 2022 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php.
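A minimal sketch of how the relocated `load_tx`/`load_raw_tx` methods above might be driven from the Rust side once a bridge handle exists. The `fetch_tx` helper is hypothetical; `bridge` is assumed to be (a reference into) the `cxx::UniquePtr<ChronikBridge>` returned by `ffi::make_bridge`, and cxx surfaces C++ exceptions thrown by the bridge as `cxx::Exception`:

use chronik_bridge::ffi;

/// Hypothetical helper: load a tx both as a bridged `ffi::Tx` (with spent
/// coins filled in from the undo data) and as raw serialized bytes, given
/// its location in the node's block files.
fn fetch_tx(
    bridge: &ffi::ChronikBridge,
    file_num: u32,
    data_pos: u32,
    undo_pos: u32,
) -> Result<(ffi::Tx, Vec<u8>), cxx::Exception> {
    // undo_pos == 0 marks a coinbase tx, which has no undo data to read.
    let tx = bridge.load_tx(file_num, data_pos, undo_pos)?;
    // The raw tx comes back serialized in network format, ready to be
    // served over HTTP or re-broadcast.
    let raw_tx = bridge.load_raw_tx(file_num, data_pos)?;
    Ok((tx, raw_tx))
}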
#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include chronik_bridge::OutPoint BridgeOutPoint(const COutPoint &outpoint) { return { .txid = chronik::util::HashToArray(outpoint.GetTxId()), .out_idx = outpoint.GetN(), }; } chronik_bridge::TxOutput BridgeTxOutput(const CTxOut &output) { return { .value = output.nValue / Amount::satoshi(), .script = chronik::util::ToRustVec<uint8_t>(output.scriptPubKey), }; } chronik_bridge::Coin BridgeCoin(const Coin &coin) { const int32_t nHeight = coin.GetHeight() == 0x7fff'ffff ? -1 : coin.GetHeight(); return { .output = BridgeTxOutput(coin.GetTxOut()), .height = nHeight, .is_coinbase = coin.IsCoinBase(), }; } rust::Vec<chronik_bridge::TxInput> BridgeTxInputs(bool isCoinbase, const std::vector<CTxIn> &inputs, const std::vector<::Coin> &spent_coins) { rust::Vec<chronik_bridge::TxInput> bridged_inputs; bridged_inputs.reserve(inputs.size()); for (size_t idx = 0; idx < inputs.size(); ++idx) { const CTxIn &input = inputs[idx]; chronik_bridge::Coin bridge_coin{}; // empty coin if (!isCoinbase) { if (idx >= spent_coins.size()) { throw std::runtime_error("Missing coin for input"); } bridge_coin = BridgeCoin(spent_coins[idx]); } bridged_inputs.push_back({ .prev_out = BridgeOutPoint(input.prevout), .script = chronik::util::ToRustVec<uint8_t>(input.scriptSig), .sequence = input.nSequence, .coin = std::move(bridge_coin), }); } return bridged_inputs; } rust::Vec<chronik_bridge::TxOutput> BridgeTxOutputs(const std::vector<CTxOut> &outputs) { rust::Vec<chronik_bridge::TxOutput> bridged_outputs; bridged_outputs.reserve(outputs.size()); for (const CTxOut &output : outputs) { bridged_outputs.push_back(BridgeTxOutput(output)); } return bridged_outputs; } chronik_bridge::Tx BridgeTx(bool isCoinbase, const CTransaction &tx, const std::vector<::Coin> &spent_coins) { return { .txid = chronik::util::HashToArray(tx.GetId()), .version = tx.nVersion, .inputs = BridgeTxInputs(isCoinbase, tx.vin, spent_coins), .outputs = BridgeTxOutputs(tx.vout), .locktime = tx.nLockTime, }; } chronik_bridge::BlockTx BridgeBlockTx(bool isCoinbase, const CTransaction &tx, const std::vector<::Coin> &spent_coins, size_t data_pos, size_t undo_pos) { return {.tx = BridgeTx(isCoinbase, tx, spent_coins), .data_pos = uint32_t(data_pos), .undo_pos = uint32_t(isCoinbase ? 0 : undo_pos)}; } size_t GetFirstBlockTxOffset(const CBlock &block, const CBlockIndex &bindex) { return bindex.nDataPos + ::GetSerializeSize(CBlockHeader()) + GetSizeOfCompactSize(block.vtx.size()); } size_t GetFirstUndoOffset(const CBlock &block, const CBlockIndex &bindex) { // We have to -1 here, because coinbase txs don't have undo data. return bindex.nUndoPos + GetSizeOfCompactSize(block.vtx.size() - 1); } chronik_bridge::Block BridgeBlock(const CBlock &block, const CBlockUndo &block_undo, const CBlockIndex &bindex) { size_t data_pos = GetFirstBlockTxOffset(block, bindex); size_t undo_pos = 0; // Set undo offset; for the genesis block leave it at 0 if (bindex.nHeight > 0) { undo_pos = GetFirstUndoOffset(block, bindex); } rust::Vec<chronik_bridge::BlockTx> bridged_txs; for (size_t tx_idx = 0; tx_idx < block.vtx.size(); ++tx_idx) { const bool isCoinbase = tx_idx == 0; const CTransaction &tx = *block.vtx[tx_idx]; if (!isCoinbase && tx_idx - 1 >= block_undo.vtxundo.size()) { throw std::runtime_error("Missing undo data for tx"); } const std::vector<::Coin> &spent_coins = isCoinbase ?
std::vector<::Coin>() : block_undo.vtxundo[tx_idx - 1].vprevout; bridged_txs.push_back( BridgeBlockTx(isCoinbase, tx, spent_coins, data_pos, undo_pos)); // advance data_pos and undo_pos positions data_pos += ::GetSerializeSize(tx); if (!isCoinbase) { undo_pos += ::GetSerializeSize(block_undo.vtxundo[tx_idx - 1]); } } return {.hash = chronik::util::HashToArray(block.GetHash()), .prev_hash = chronik::util::HashToArray(block.hashPrevBlock), .n_bits = block.nBits, .timestamp = block.GetBlockTime(), .height = bindex.nHeight, .file_num = uint32_t(bindex.nFile), .data_pos = bindex.nDataPos, .undo_pos = bindex.nUndoPos, .size = ::GetSerializeSize(block), .txs = bridged_txs}; } namespace chronik_bridge { void log_print(const rust::Str logging_function, const rust::Str source_file, const uint32_t source_line, const rust::Str msg) { LogInstance().LogPrintStr(std::string(msg), std::string(logging_function), std::string(source_file), source_line); } void log_print_chronik(const rust::Str logging_function, const rust::Str source_file, const uint32_t source_line, const rust::Str msg) { if (LogInstance().WillLogCategory(BCLog::CHRONIK)) { log_print(logging_function, source_file, source_line, msg); } } const CBlockIndex &ChronikBridge::get_chain_tip() const { const CBlockIndex *tip = WITH_LOCK(cs_main, return m_node.chainman->ActiveTip()); if (tip == nullptr) { throw block_index_not_found(); } return *tip; } const CBlockIndex & ChronikBridge::lookup_block_index(std::array<uint8_t, 32> hash) const { BlockHash block_hash{chronik::util::ArrayToHash(hash)}; const CBlockIndex *pindex = WITH_LOCK( cs_main, return m_node.chainman->m_blockman.LookupBlockIndex(block_hash)); if (!pindex) { throw block_index_not_found(); } return *pindex; } std::unique_ptr<CBlock> ChronikBridge::load_block(const CBlockIndex &bindex) const { CBlock block; if (!node::ReadBlockFromDisk(block, &bindex, m_consensus)) { throw std::runtime_error("Reading block data failed"); } return std::make_unique<CBlock>(std::move(block)); } std::unique_ptr<CBlockUndo> ChronikBridge::load_block_undo(const CBlockIndex &bindex) const { CBlockUndo block_undo; // Read undo data (genesis block doesn't have undo data) if (bindex.nHeight > 0) { if (!node::UndoReadFromDisk(block_undo, &bindex)) { throw std::runtime_error("Reading block undo data failed"); } } return std::make_unique<CBlockUndo>(std::move(block_undo)); } +Tx ChronikBridge::load_tx(uint32_t file_num, uint32_t data_pos, + uint32_t undo_pos) const { + CMutableTransaction tx; + CTxUndo txundo{}; + const bool isCoinbase = undo_pos == 0; + if (!node::ReadTxFromDisk(tx, FlatFilePos(file_num, data_pos))) { + throw std::runtime_error("Reading tx data from disk failed"); + } + if (!isCoinbase) { + if (!node::ReadTxUndoFromDisk(txundo, + FlatFilePos(file_num, undo_pos))) { + throw std::runtime_error("Reading tx undo data from disk failed"); + } + } + return BridgeTx(isCoinbase, CTransaction(std::move(tx)), txundo.vprevout); +} + +rust::Vec<uint8_t> ChronikBridge::load_raw_tx(uint32_t file_num, + uint32_t data_pos) const { + CMutableTransaction tx; + if (!node::ReadTxFromDisk(tx, FlatFilePos(file_num, data_pos))) { + throw std::runtime_error("Reading tx data from disk failed"); + } + CDataStream raw_tx{SER_NETWORK, PROTOCOL_VERSION}; + raw_tx << tx; + return chronik::util::ToRustVec<uint8_t>(raw_tx); +} + Tx bridge_tx(const CTransaction &tx, const std::vector<::Coin> &spent_coins) { return BridgeTx(false, tx, spent_coins); } const CBlockIndex &ChronikBridge::find_fork(const CBlockIndex &index) const { const CBlockIndex *fork = WITH_LOCK( cs_main, return
m_node.chainman->ActiveChainstate().m_chain.FindFork(&index)); if (!fork) { throw block_index_not_found(); } return *fork; } void ChronikBridge::lookup_spent_coins( Tx &tx, rust::Vec<OutPoint> &not_found, rust::Vec<OutPoint> &coins_to_uncache) const { not_found.clear(); coins_to_uncache.clear(); LOCK(cs_main); CCoinsViewCache &coins_cache = m_node.chainman->ActiveChainstate().CoinsTip(); CCoinsViewMemPool coin_view(&coins_cache, *m_node.mempool); for (TxInput &input : tx.inputs) { TxId txid = TxId(chronik::util::ArrayToHash(input.prev_out.txid)); COutPoint outpoint = COutPoint(txid, input.prev_out.out_idx); // Remember if coin was already cached const bool had_cached = coins_cache.HaveCoinInCache(outpoint); ::Coin coin; if (!coin_view.GetCoin(outpoint, coin)) { not_found.push_back(input.prev_out); continue; } if (!had_cached) { // Only add if previously uncached. // We don't check if the prev_out is now cached (which wouldn't be // the case for a mempool UTXO), as uncaching an outpoint is cheap, // so we save one extra cache lookup here. coins_to_uncache.push_back(input.prev_out); } input.coin = BridgeCoin(coin); } } void ChronikBridge::uncache_coins( rust::Slice<const OutPoint> coins_to_uncache) const { LOCK(cs_main); CCoinsViewCache &coins_cache = m_node.chainman->ActiveChainstate().CoinsTip(); for (const OutPoint &outpoint : coins_to_uncache) { TxId txid = TxId(chronik::util::ArrayToHash(outpoint.txid)); coins_cache.Uncache(COutPoint(txid, outpoint.out_idx)); } } std::array<uint8_t, 32> ChronikBridge::broadcast_tx(rust::Slice<const uint8_t> raw_tx, int64_t max_fee) const { std::vector<uint8_t> vec = chronik::util::FromRustSlice(raw_tx); CDataStream stream{vec, SER_NETWORK, PROTOCOL_VERSION}; CMutableTransaction tx; stream >> tx; CTransactionRef tx_ref = MakeTransactionRef(tx); std::string err_str; TransactionError error = node::BroadcastTransaction( m_node, tx_ref, err_str, max_fee * Amount::satoshi(), /*relay=*/true, /*wait_callback=*/false); if (error != TransactionError::OK) { bilingual_str txErrorMsg = TransactionErrorString(error); if (err_str.empty()) { throw std::runtime_error(txErrorMsg.original.c_str()); } else { std::string msg = strprintf("%s: %s", txErrorMsg.original, err_str); throw std::runtime_error(msg.c_str()); } } return chronik::util::HashToArray(tx_ref->GetId()); } std::unique_ptr<ChronikBridge> make_bridge(const Config &config, const node::NodeContext &node) { return std::make_unique<ChronikBridge>( config.GetChainParams().GetConsensus(), node); } chronik_bridge::Block bridge_block(const CBlock &block, const CBlockUndo &block_undo, const CBlockIndex &bindex) { return BridgeBlock(block, block_undo, bindex); } -Tx load_tx(uint32_t file_num, uint32_t data_pos, uint32_t undo_pos) { - CMutableTransaction tx; - CTxUndo txundo{}; - const bool isCoinbase = undo_pos == 0; - if (!node::ReadTxFromDisk(tx, FlatFilePos(file_num, data_pos))) { - throw std::runtime_error("Reading tx data from disk failed"); - } - if (!isCoinbase) { - if (!node::ReadTxUndoFromDisk(txundo, - FlatFilePos(file_num, undo_pos))) { - throw std::runtime_error("Reading tx undo data from disk failed"); - } - } - return BridgeTx(isCoinbase, CTransaction(std::move(tx)), txundo.vprevout); -} - -rust::Vec<uint8_t> load_raw_tx(uint32_t file_num, uint32_t data_pos) { - CMutableTransaction tx; - if (!node::ReadTxFromDisk(tx, FlatFilePos(file_num, data_pos))) { - throw std::runtime_error("Reading tx data from disk failed"); - } - CDataStream raw_tx{SER_NETWORK, PROTOCOL_VERSION}; - raw_tx << tx; - return chronik::util::ToRustVec<uint8_t>(raw_tx); -} - BlockInfo get_block_info(const CBlockIndex &bindex) { return { .hash =
chronik::util::HashToArray(bindex.GetBlockHash()), .height = bindex.nHeight, }; } const CBlockIndex &get_block_ancestor(const CBlockIndex &index, int32_t height) { const CBlockIndex *pindex = index.GetAncestor(height); if (!pindex) { throw block_index_not_found(); } return *pindex; } rust::Vec<uint8_t> compress_script(rust::Slice<const uint8_t> bytecode) { std::vector<uint8_t> vec = chronik::util::FromRustSlice(bytecode); CScript script{vec.begin(), vec.end()}; CDataStream compressed{SER_NETWORK, PROTOCOL_VERSION}; compressed << Using<ScriptCompression>(script); return chronik::util::ToRustVec<uint8_t>(compressed); } rust::Vec<uint8_t> decompress_script(rust::Slice<const uint8_t> compressed) { std::vector<uint8_t> vec = chronik::util::FromRustSlice(compressed); CDataStream stream{vec, SER_NETWORK, PROTOCOL_VERSION}; CScript script; stream >> Using<ScriptCompression>(script); return chronik::util::ToRustVec<uint8_t>(script); } int64_t calc_fee(size_t num_bytes, int64_t sats_fee_per_kb) { return CFeeRate(sats_fee_per_kb * SATOSHI).GetFee(num_bytes) / SATOSHI; } int64_t default_max_raw_tx_fee_rate_per_kb() { return node::DEFAULT_MAX_RAW_TX_FEE_RATE.GetFeePerK() / SATOSHI; } void sync_with_validation_interface_queue() { SyncWithValidationInterfaceQueue(); } bool init_error(const rust::Str msg) { return InitError(Untranslated(std::string(msg))); } void abort_node(const rust::Str msg, const rust::Str user_msg) { AbortNode(std::string(msg), Untranslated(std::string(user_msg))); } bool shutdown_requested() { return ShutdownRequested(); } } // namespace chronik_bridge diff --git a/chronik/chronik-cpp/chronik_bridge.h b/chronik/chronik-cpp/chronik_bridge.h index 712e11485..2a2c2f626 100644 --- a/chronik/chronik-cpp/chronik_bridge.h +++ b/chronik/chronik-cpp/chronik_bridge.h @@ -1,113 +1,114 @@ // Copyright (c) 2022 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #ifndef BITCOIN_CHRONIK_CPP_CHRONIK_BRIDGE_H #define BITCOIN_CHRONIK_CPP_CHRONIK_BRIDGE_H #include #include #include class CBlock; class CBlockIndex; class CBlockUndo; class Coin; class Config; class CTransaction; namespace Consensus { struct Params; } // namespace Consensus namespace node { struct NodeContext; } // namespace node class uint256; namespace chronik_bridge { struct BlockInfo; struct Block; struct Tx; struct OutPoint; class block_index_not_found : public std::exception { public: const char *what() const noexcept override { return "CBlockIndex not found"; } }; void log_print(const rust::Str logging_function, const rust::Str source_file, const uint32_t source_line, const rust::Str msg); void log_print_chronik(const rust::Str logging_function, const rust::Str source_file, const uint32_t source_line, const rust::Str msg); /** * Bridge to bitcoind to access the node.
*/ class ChronikBridge { const Consensus::Params &m_consensus; const node::NodeContext &m_node; public: ChronikBridge(const Consensus::Params &consensus, const node::NodeContext &node) : m_consensus(consensus), m_node(node) {} const CBlockIndex &get_chain_tip() const; const CBlockIndex &lookup_block_index(std::array<uint8_t, 32> hash) const; std::unique_ptr<CBlock> load_block(const CBlockIndex &bindex) const; std::unique_ptr<CBlockUndo> load_block_undo(const CBlockIndex &bindex) const; + Tx load_tx(uint32_t file_num, uint32_t data_pos, uint32_t undo_pos) const; + + rust::Vec<uint8_t> load_raw_tx(uint32_t file_num, uint32_t data_pos) const; + const CBlockIndex &find_fork(const CBlockIndex &index) const; void lookup_spent_coins(Tx &, rust::Vec<OutPoint> &not_found, rust::Vec<OutPoint> &coins_to_uncache) const; void uncache_coins(rust::Slice<const OutPoint>) const; std::array<uint8_t, 32> broadcast_tx(rust::Slice<const uint8_t> raw_tx, int64_t max_fee) const; }; std::unique_ptr<ChronikBridge> make_bridge(const Config &config, const node::NodeContext &node); Tx bridge_tx(const CTransaction &tx, const std::vector<Coin> &spent_coins); Block bridge_block(const CBlock &block, const CBlockUndo &block_undo, const CBlockIndex &bindex); -Tx load_tx(uint32_t file_num, uint32_t data_pos, uint32_t undo_pos); -rust::Vec<uint8_t> load_raw_tx(uint32_t file_num, uint32_t data_pos); - BlockInfo get_block_info(const CBlockIndex &index); const CBlockIndex &get_block_ancestor(const CBlockIndex &index, int32_t height); rust::Vec<uint8_t> compress_script(rust::Slice<const uint8_t> script); rust::Vec<uint8_t> decompress_script(rust::Slice<const uint8_t> compressed); int64_t calc_fee(size_t num_bytes, int64_t sats_fee_per_kb); int64_t default_max_raw_tx_fee_rate_per_kb(); void sync_with_validation_interface_queue(); bool init_error(const rust::Str msg); void abort_node(const rust::Str msg, const rust::Str user_msg); bool shutdown_requested(); } // namespace chronik_bridge #endif // BITCOIN_CHRONIK_CPP_CHRONIK_BRIDGE_H diff --git a/chronik/chronik-http/src/handlers.rs b/chronik/chronik-http/src/handlers.rs index b7fee60df..f85fd6d82 100644 --- a/chronik/chronik-http/src/handlers.rs +++ b/chronik/chronik-http/src/handlers.rs @@ -1,187 +1,194 @@ //! Module for Chronik handlers. use std::{collections::HashMap, fmt::Display, str::FromStr}; use abc_rust_error::{Report, Result}; use bitcoinsuite_slp::token_id::TokenId; -use chronik_indexer::indexer::ChronikIndexer; +use chronik_indexer::indexer::{ChronikIndexer, Node}; use chronik_proto::proto; use hyper::Uri; use thiserror::Error; use crate::{error::ReportError, parse::parse_script_variant_hex}; /// Errors for HTTP handlers. #[derive(Debug, Error, PartialEq)] pub enum ChronikHandlerError { /// Not found #[error("404: Not found: {0}")] RouteNotFound(Uri), /// Query parameter has an invalid value #[error("400: Invalid param {param_name}: {param_value}, {msg}")] InvalidParam { /// Name of the param param_name: String, /// Value of the param param_value: String, /// Human-readable error message. msg: String, }, } use self::ChronikHandlerError::*; fn get_param<T: FromStr>( params: &HashMap<String, String>, param_name: &str, ) -> Result<Option<T>> where T::Err: Display, { let Some(param) = params.get(param_name) else { return Ok(None); }; Ok(Some(param.parse::<T>().map_err(|err| InvalidParam { param_name: param_name.to_string(), param_value: param.to_string(), msg: err.to_string(), })?)) } /// Fallback route that returns a 404 response pub async fn handle_not_found(uri: Uri) -> Result<(), ReportError> { Err(Report::from(RouteNotFound(uri)).into()) } /// Return a page of the txs of a block.
pub async fn handle_block_txs( hash_or_height: String, query_params: &HashMap<String, String>, indexer: &ChronikIndexer, + node: &Node, ) -> Result<proto::TxHistoryPage> { - let blocks = indexer.blocks(); + let blocks = indexer.blocks(node); let page_num: u32 = get_param(query_params, "page")?.unwrap_or(0); let page_size: u32 = get_param(query_params, "page_size")?.unwrap_or(25); blocks.block_txs(hash_or_height, page_num as usize, page_size as usize) } /// Return a page of the confirmed txs of the given script. /// Scripts are identified by script_type and payload. pub async fn handle_script_confirmed_txs( script_type: &str, payload: &str, query_params: &HashMap<String, String>, indexer: &ChronikIndexer, + node: &Node, ) -> Result<proto::TxHistoryPage> { let script_variant = parse_script_variant_hex(script_type, payload)?; - let script_history = indexer.script_history()?; + let script_history = indexer.script_history(node)?; let page_num: u32 = get_param(query_params, "page")?.unwrap_or(0); let page_size: u32 = get_param(query_params, "page_size")?.unwrap_or(25); let script = script_variant.to_script(); script_history.confirmed_txs(&script, page_num as usize, page_size as usize) } /// Return a page of the tx history of the given script, in reverse /// chronological order, i.e. the latest transaction first and then going back /// in time. Scripts are identified by script_type and payload. pub async fn handle_script_history( script_type: &str, payload: &str, query_params: &HashMap<String, String>, indexer: &ChronikIndexer, + node: &Node, ) -> Result<proto::TxHistoryPage> { let script_variant = parse_script_variant_hex(script_type, payload)?; - let script_history = indexer.script_history()?; + let script_history = indexer.script_history(node)?; let page_num: u32 = get_param(query_params, "page")?.unwrap_or(0); let page_size: u32 = get_param(query_params, "page_size")?.unwrap_or(25); let script = script_variant.to_script(); script_history.rev_history(&script, page_num as usize, page_size as usize) } /// Return a page of the unconfirmed txs of the given script. /// Scripts are identified by script_type and payload. pub async fn handle_script_unconfirmed_txs( script_type: &str, payload: &str, indexer: &ChronikIndexer, + node: &Node, ) -> Result<proto::TxHistoryPage> { let script_variant = parse_script_variant_hex(script_type, payload)?; - let script_history = indexer.script_history()?; + let script_history = indexer.script_history(node)?; let script = script_variant.to_script(); script_history.unconfirmed_txs(&script) } /// Return the UTXOs of the given script. /// Scripts are identified by script_type and payload. pub async fn handle_script_utxos( script_type: &str, payload: &str, indexer: &ChronikIndexer, ) -> Result<proto::ScriptUtxos> { let script_variant = parse_script_variant_hex(script_type, payload)?; let script_utxos = indexer.script_utxos()?; let script = script_variant.to_script(); let utxos = script_utxos.utxos(&script)?; Ok(proto::ScriptUtxos { script: script.bytecode().to_vec(), utxos, }) } /// Return a page of the confirmed txs of the given token ID.
pub async fn handle_token_id_confirmed_txs( token_id_hex: &str, query_params: &HashMap<String, String>, indexer: &ChronikIndexer, + node: &Node, ) -> Result<proto::TxHistoryPage> { let token_id = token_id_hex.parse::<TokenId>()?; - let token_id_history = indexer.token_id_history(); + let token_id_history = indexer.token_id_history(node); let page_num: u32 = get_param(query_params, "page")?.unwrap_or(0); let page_size: u32 = get_param(query_params, "page_size")?.unwrap_or(25); token_id_history.confirmed_txs( token_id, page_num as usize, page_size as usize, ) } /// Return a page of the tx history of the given token ID, in reverse /// chronological order, i.e. the latest transaction first and then going back /// in time. pub async fn handle_token_id_history( token_id_hex: &str, query_params: &HashMap<String, String>, indexer: &ChronikIndexer, + node: &Node, ) -> Result<proto::TxHistoryPage> { let token_id = token_id_hex.parse::<TokenId>()?; - let token_id_history = indexer.token_id_history(); + let token_id_history = indexer.token_id_history(node); let page_num: u32 = get_param(query_params, "page")?.unwrap_or(0); let page_size: u32 = get_param(query_params, "page_size")?.unwrap_or(25); token_id_history.rev_history( token_id, page_num as usize, page_size as usize, ) } /// Return a page of the unconfirmed txs of the given token ID. pub async fn handle_token_id_unconfirmed_txs( token_id_hex: &str, indexer: &ChronikIndexer, + node: &Node, ) -> Result<proto::TxHistoryPage> { let token_id = token_id_hex.parse::<TokenId>()?; - let token_id_history = indexer.token_id_history(); + let token_id_history = indexer.token_id_history(node); token_id_history.unconfirmed_txs(token_id) } /// Return the UTXOs of the given token ID. pub async fn handle_token_id_utxos( token_id_hex: &str, indexer: &ChronikIndexer, ) -> Result<proto::Utxos> { let token_id = token_id_hex.parse::<TokenId>()?; let token_id_utxos = indexer.token_id_utxos(); let utxos = token_id_utxos.utxos(token_id)?; Ok(proto::Utxos { utxos }) } diff --git a/chronik/chronik-http/src/server.rs b/chronik/chronik-http/src/server.rs index b1f3cd4eb..351967760 100644 --- a/chronik/chronik-http/src/server.rs +++ b/chronik/chronik-http/src/server.rs @@ -1,492 +1,519 @@ // Copyright (c) 2022 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. //! Module for [`ChronikServer`]. use std::collections::HashMap; use std::time::Duration; use std::{net::SocketAddr, sync::Arc}; use abc_rust_error::{Result, WrapErr}; use axum::{ extract::{Path, Query, WebSocketUpgrade}, response::IntoResponse, routing::{self, MethodFilter}, Extension, Router, }; use bitcoinsuite_core::tx::TxId; use chronik_bridge::ffi; use chronik_indexer::{ indexer::{ChronikIndexer, Node}, pause::PauseNotify, }; use chronik_proto::proto; use thiserror::Error; use tokio::sync::RwLock; use crate::{ error::ReportError, handlers, protobuf::Protobuf, ws::handle_subscribe_socket, }; /// Ref-counted indexer with read or write access pub type ChronikIndexerRef = Arc<RwLock<ChronikIndexer>>; /// Ref-counted access to the bitcoind node pub type NodeRef = Arc<Node>; /// Ref-counted pause notifier for Chronik indexing pub type PauseNotifyRef = Arc<PauseNotify>; /// Settings to tune Chronik #[derive(Clone, Debug)] pub struct ChronikSettings { /// Duration between WebSocket pings initiated by Chronik. pub ws_ping_interval: Duration, } /// Params defining what and where to serve for [`ChronikServer`]. #[derive(Clone, Debug)] pub struct ChronikServerParams { /// Host address (port + IP) where to serve Chronik at.
pub hosts: Vec<SocketAddr>, /// Indexer to read data from pub indexer: ChronikIndexerRef, /// Access to the bitcoind node pub node: NodeRef, /// Handle for pausing/resuming indexing any updates from the node pub pause_notify: PauseNotifyRef, /// Settings to tune Chronik pub settings: ChronikSettings, } /// Chronik HTTP server, holding all the data/handles required to serve an /// instance. #[derive(Debug)] pub struct ChronikServer { tcp_listeners: Vec<tokio::net::TcpListener>, indexer: ChronikIndexerRef, node: NodeRef, pause_notify: PauseNotifyRef, settings: ChronikSettings, } /// Errors for [`ChronikServer`]. #[derive(Debug, Eq, Error, PartialEq)] pub enum ChronikServerError { /// Binding to host address failed #[error("Chronik failed binding to {0}: {1}")] FailedBindingAddress(SocketAddr, String), /// Serving Chronik failed #[error("Chronik failed serving: {0}")] ServingFailed(String), /// Query is neither a hex hash nor an integer string #[error("400: Not a hash or height: {0}")] NotHashOrHeight(String), /// Query is not a txid #[error("400: Not a txid: {0}")] NotTxId(String), /// Block not found in DB #[error("404: Block not found: {0}")] BlockNotFound(String), } use self::ChronikServerError::*; impl ChronikServer { /// Binds the Chronik server on the given hosts pub fn setup(params: ChronikServerParams) -> Result<Self> { let tcp_listeners = params .hosts .into_iter() .map(|host| -> Result<_> { let tcp = std::net::TcpListener::bind(host).map_err(|err| { FailedBindingAddress(host, err.to_string()) })?; // Important: We need to set non-blocking ourselves tcp.set_nonblocking(true)?; Ok(tokio::net::TcpListener::from_std(tcp)?) }) .collect::<Result<Vec<_>>>()?; Ok(ChronikServer { tcp_listeners, indexer: params.indexer, node: params.node, pause_notify: params.pause_notify, settings: params.settings, }) } /// Serve a Chronik HTTP endpoint with the given parameters.
pub async fn serve(self) -> Result<()> { let app = Self::make_router( self.indexer, self.node, self.pause_notify, self.settings, ); let servers = self .tcp_listeners .into_iter() .zip(std::iter::repeat(app)) .map(|(tcp_listener, app)| { Box::pin(async move { axum::serve(tcp_listener, app.into_make_service()) .await .map_err(|err| ServingFailed(err.to_string())) }) }); let (result, _, _) = futures::future::select_all(servers).await; result?; Ok(()) } fn make_router( indexer: ChronikIndexerRef, node: NodeRef, pause_notify: PauseNotifyRef, settings: ChronikSettings, ) -> Router { Router::new() .route("/blockchain-info", routing::get(handle_blockchain_info)) .route("/block/:hash_or_height", routing::get(handle_block)) .route("/block-txs/:hash_or_height", routing::get(handle_block_txs)) .route("/blocks/:start/:end", routing::get(handle_block_range)) .route("/chronik-info", routing::get(handle_chronik_info)) .route("/tx/:txid", routing::get(handle_tx)) .route("/token/:txid", routing::get(handle_token_info)) .route( "/validate-tx", routing::post(handle_validate_tx) .on(MethodFilter::OPTIONS, handle_post_options), ) .route( "/broadcast-tx", routing::post(handle_broadcast_tx) .on(MethodFilter::OPTIONS, handle_post_options), ) .route( "/broadcast-txs", routing::post(handle_broadcast_txs) .on(MethodFilter::OPTIONS, handle_post_options), ) .route("/raw-tx/:txid", routing::get(handle_raw_tx)) .route( "/script/:type/:payload/confirmed-txs", routing::get(handle_script_confirmed_txs), ) .route( "/script/:type/:payload/history", routing::get(handle_script_history), ) .route( "/script/:type/:payload/unconfirmed-txs", routing::get(handle_script_unconfirmed_txs), ) .route( "/script/:type/:payload/utxos", routing::get(handle_script_utxos), ) .route( "/token-id/:token_id/confirmed-txs", routing::get(handle_token_id_confirmed_txs), ) .route( "/token-id/:token_id/history", routing::get(handle_token_id_history), ) .route( "/token-id/:token_id/unconfirmed-txs", routing::get(handle_token_id_unconfirmed_txs), ) .route( "/token-id/:token_id/utxos", routing::get(handle_token_id_utxos), ) .route("/ws", routing::get(handle_ws)) .route("/pause", routing::get(handle_pause)) .route("/resume", routing::get(handle_resume)) .fallback(handlers::handle_not_found) .layer(Extension(indexer)) .layer(Extension(node)) .layer(Extension(pause_notify)) .layer(Extension(settings)) } } async fn handle_blockchain_info( Extension(indexer): Extension<ChronikIndexerRef>, + Extension(node): Extension<NodeRef>, ) -> Result<Protobuf<proto::BlockchainInfo>, ReportError> { let indexer = indexer.read().await; - let blocks = indexer.blocks(); + let blocks = indexer.blocks(&node); Ok(Protobuf(blocks.blockchain_info()?)) } async fn handle_chronik_info( ) -> Result<Protobuf<proto::ChronikInfo>, ReportError> { let this_chronik_version: String = env!("CARGO_PKG_VERSION").to_string(); let chronik_info = proto::ChronikInfo { version: this_chronik_version, }; Ok(Protobuf(chronik_info)) } async fn handle_block_range( Path((start_height, end_height)): Path<(i32, i32)>, Extension(indexer): Extension<ChronikIndexerRef>, + Extension(node): Extension<NodeRef>, ) -> Result<Protobuf<proto::Blocks>, ReportError> { let indexer = indexer.read().await; - let blocks = indexer.blocks(); + let blocks = indexer.blocks(&node); Ok(Protobuf(blocks.by_range(start_height, end_height)?)) } async fn handle_block( Path(hash_or_height): Path<String>, Extension(indexer): Extension<ChronikIndexerRef>, + Extension(node): Extension<NodeRef>, ) -> Result<Protobuf<proto::Block>, ReportError> { let indexer = indexer.read().await; - let blocks = indexer.blocks(); + let blocks = indexer.blocks(&node); Ok(Protobuf(blocks.by_hash_or_height(hash_or_height)?)) } async fn handle_block_txs(
Path(hash_or_height): Path<String>, Query(query_params): Query<HashMap<String, String>>, Extension(indexer): Extension<ChronikIndexerRef>, + Extension(node): Extension<NodeRef>, ) -> Result<Protobuf<proto::TxHistoryPage>, ReportError> { let indexer = indexer.read().await; Ok(Protobuf( - handlers::handle_block_txs(hash_or_height, &query_params, &indexer) - .await?, + handlers::handle_block_txs( + hash_or_height, + &query_params, + &indexer, + &node, + ) + .await?, )) } async fn handle_tx( Path(txid): Path<String>, Extension(indexer): Extension<ChronikIndexerRef>, + Extension(node): Extension<NodeRef>, ) -> Result<Protobuf<proto::Tx>, ReportError> { let indexer = indexer.read().await; let txid = txid.parse::<TxId>().wrap_err(NotTxId(txid))?; - Ok(Protobuf(indexer.txs().tx_by_id(txid)?)) + Ok(Protobuf(indexer.txs(&node).tx_by_id(txid)?)) } async fn handle_token_info( Path(txid): Path<String>, Extension(indexer): Extension<ChronikIndexerRef>, + Extension(node): Extension<NodeRef>, ) -> Result<Protobuf<proto::TokenInfo>, ReportError> { let indexer = indexer.read().await; let txid = txid.parse::<TxId>().wrap_err(NotTxId(txid))?; - Ok(Protobuf(indexer.txs().token_info(&txid)?)) + Ok(Protobuf(indexer.txs(&node).token_info(&txid)?)) } async fn handle_broadcast_tx( Extension(indexer): Extension<ChronikIndexerRef>, Extension(node): Extension<NodeRef>, Protobuf(request): Protobuf<proto::BroadcastTxRequest>, ) -> Result<Protobuf<proto::BroadcastTxResponse>, ReportError> { let indexer = indexer.read().await; let txids_result = indexer .broadcast(node.as_ref()) .broadcast_txs(&[request.raw_tx.into()], request.skip_token_checks); // Drop indexer before syncing otherwise we get a deadlock drop(indexer); // Block for indexer being synced before returning so the user can query // the broadcast txs right away ffi::sync_with_validation_interface_queue(); let txids = txids_result?; Ok(Protobuf(proto::BroadcastTxResponse { txid: txids[0].to_vec(), })) } async fn handle_broadcast_txs( Extension(indexer): Extension<ChronikIndexerRef>, Extension(node): Extension<NodeRef>, Protobuf(request): Protobuf<proto::BroadcastTxsRequest>, ) -> Result<Protobuf<proto::BroadcastTxsResponse>, ReportError> { let indexer = indexer.read().await; let txids_result = indexer.broadcast(node.as_ref()).broadcast_txs( &request .raw_txs .into_iter() .map(Into::into) .collect::<Vec<_>>(), request.skip_token_checks, ); // Drop indexer before syncing otherwise we get a deadlock drop(indexer); // Block for indexer being synced before returning so the user can query // the broadcast txs right away ffi::sync_with_validation_interface_queue(); let txids = txids_result?; Ok(Protobuf(proto::BroadcastTxsResponse { txids: txids.into_iter().map(|txid| txid.to_vec()).collect(), })) } async fn handle_validate_tx( Extension(indexer): Extension<ChronikIndexerRef>, Extension(node): Extension<NodeRef>, Protobuf(raw_tx): Protobuf<proto::RawTx>, ) -> Result<Protobuf<proto::Tx>, ReportError> { let indexer = indexer.read().await; Ok(Protobuf( indexer .broadcast(node.as_ref()) .validate_tx(raw_tx.raw_tx)?, )) } async fn handle_raw_tx( Path(txid): Path<String>, Extension(indexer): Extension<ChronikIndexerRef>, + Extension(node): Extension<NodeRef>, ) -> Result<Protobuf<proto::RawTx>, ReportError> { let indexer = indexer.read().await; let txid = txid.parse::<TxId>().wrap_err(NotTxId(txid))?; - Ok(Protobuf(indexer.txs().raw_tx_by_id(&txid)?)) + Ok(Protobuf(indexer.txs(&node).raw_tx_by_id(&txid)?)) } async fn handle_script_confirmed_txs( Path((script_type, payload)): Path<(String, String)>, Query(query_params): Query<HashMap<String, String>>, Extension(indexer): Extension<ChronikIndexerRef>, + Extension(node): Extension<NodeRef>, ) -> Result<Protobuf<proto::TxHistoryPage>, ReportError> { let indexer = indexer.read().await; Ok(Protobuf( handlers::handle_script_confirmed_txs( &script_type, &payload, &query_params, &indexer, + &node, ) .await?, )) } async fn handle_script_history( Path((script_type, payload)): Path<(String, String)>, Query(query_params): Query<HashMap<String, String>>, Extension(indexer): Extension<ChronikIndexerRef>, + Extension(node): Extension<NodeRef>, ) -> Result<Protobuf<proto::TxHistoryPage>, ReportError> { let indexer =
indexer.read().await; Ok(Protobuf( handlers::handle_script_history( &script_type, &payload, &query_params, &indexer, + &node, ) .await?, )) } async fn handle_script_unconfirmed_txs( Path((script_type, payload)): Path<(String, String)>, Extension(indexer): Extension<ChronikIndexerRef>, + Extension(node): Extension<NodeRef>, ) -> Result<Protobuf<proto::TxHistoryPage>, ReportError> { let indexer = indexer.read().await; Ok(Protobuf( handlers::handle_script_unconfirmed_txs( &script_type, &payload, &indexer, + &node, ) .await?, )) } async fn handle_script_utxos( Path((script_type, payload)): Path<(String, String)>, Extension(indexer): Extension<ChronikIndexerRef>, ) -> Result<Protobuf<proto::ScriptUtxos>, ReportError> { let indexer = indexer.read().await; Ok(Protobuf( handlers::handle_script_utxos(&script_type, &payload, &indexer).await?, )) } async fn handle_token_id_confirmed_txs( Path(token_id_hex): Path<String>, Query(query_params): Query<HashMap<String, String>>, Extension(indexer): Extension<ChronikIndexerRef>, + Extension(node): Extension<NodeRef>, ) -> Result<Protobuf<proto::TxHistoryPage>, ReportError> { let indexer = indexer.read().await; Ok(Protobuf( handlers::handle_token_id_confirmed_txs( &token_id_hex, &query_params, &indexer, + &node, ) .await?, )) } async fn handle_token_id_history( Path(token_id_hex): Path<String>, Query(query_params): Query<HashMap<String, String>>, Extension(indexer): Extension<ChronikIndexerRef>, + Extension(node): Extension<NodeRef>, ) -> Result<Protobuf<proto::TxHistoryPage>, ReportError> { let indexer = indexer.read().await; Ok(Protobuf( handlers::handle_token_id_history( &token_id_hex, &query_params, &indexer, + &node, ) .await?, )) } async fn handle_token_id_unconfirmed_txs( Path(token_id_hex): Path<String>, Extension(indexer): Extension<ChronikIndexerRef>, + Extension(node): Extension<NodeRef>, ) -> Result<Protobuf<proto::TxHistoryPage>, ReportError> { let indexer = indexer.read().await; Ok(Protobuf( - handlers::handle_token_id_unconfirmed_txs(&token_id_hex, &indexer) - .await?, + handlers::handle_token_id_unconfirmed_txs( + &token_id_hex, + &indexer, + &node, + ) + .await?, )) } async fn handle_token_id_utxos( Path(token_id_hex): Path<String>, Extension(indexer): Extension<ChronikIndexerRef>, ) -> Result<Protobuf<proto::Utxos>, ReportError> { let indexer = indexer.read().await; Ok(Protobuf( handlers::handle_token_id_utxos(&token_id_hex, &indexer).await?, )) } async fn handle_pause( Extension(pause_notify): Extension<PauseNotifyRef>, ) -> Result<Protobuf<proto::Empty>, ReportError> { pause_notify.pause()?; Ok(Protobuf(proto::Empty {})) } async fn handle_resume( Extension(pause_notify): Extension<PauseNotifyRef>, ) -> Result<Protobuf<proto::Empty>, ReportError> { pause_notify.resume()?; Ok(Protobuf(proto::Empty {})) } async fn handle_ws( ws: WebSocketUpgrade, Extension(indexer): Extension<ChronikIndexerRef>, Extension(settings): Extension<ChronikSettings>, ) -> impl IntoResponse { ws.on_upgrade(|ws| handle_subscribe_socket(ws, indexer, settings)) } async fn handle_post_options( ) -> Result<axum::http::Response<axum::body::Body>, ReportError> { axum::http::Response::builder() .header("Allow", "OPTIONS, HEAD, POST") .body(axum::body::Body::empty()) .map_err(|err| ReportError(err.into())) } diff --git a/chronik/chronik-indexer/src/indexer.rs b/chronik/chronik-indexer/src/indexer.rs index b5ec88173..d95e4c4cf 100644 --- a/chronik/chronik-indexer/src/indexer.rs +++ b/chronik/chronik-indexer/src/indexer.rs @@ -1,927 +1,937 @@ // Copyright (c) 2022 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. //! Module containing [`ChronikIndexer`] to index blocks and txs.
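The server changes above thread the new `NodeRef` into every handler through an axum `Extension` layer, next to the existing indexer, pause-notify and settings extensions. A self-contained sketch of that pattern, using a stand-in `Node` type and route instead of the real Chronik state:

use std::sync::Arc;

use axum::{extract::Extension, routing, Router};

/// Stand-in for chronik_indexer::indexer::Node.
#[derive(Debug)]
struct Node;

type NodeRef = Arc<Node>;

/// Handlers receive the shared node handle as a typed extension.
async fn handle_info(Extension(node): Extension<NodeRef>) -> String {
    format!("node: {:?}", *node)
}

fn make_router(node: NodeRef) -> Router {
    Router::new()
        .route("/info", routing::get(handle_info))
        // One .layer call makes the NodeRef available to every route above.
        .layer(Extension(node))
}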
use std::{io::Write, path::PathBuf}; use abc_rust_error::{Result, WrapErr}; use bitcoinsuite_core::{ block::BlockHash, tx::{Tx, TxId}, }; use chronik_bridge::{ffi, util::expect_unique_ptr}; use chronik_db::{ db::{Db, WriteBatch}, groups::{ ScriptGroup, ScriptHistoryWriter, ScriptUtxoWriter, TokenIdGroup, TokenIdGroupAux, TokenIdHistoryWriter, TokenIdUtxoWriter, }, index_tx::prepare_indexed_txs, io::{ merge, token::TokenWriter, BlockHeight, BlockReader, BlockStatsWriter, BlockTxs, BlockWriter, DbBlock, GroupHistoryMemData, GroupUtxoMemData, MetadataReader, MetadataWriter, SchemaVersion, SpentByWriter, TxEntry, TxReader, TxWriter, }, mem::{MemData, MemDataConf, Mempool, MempoolTx}, }; use chronik_util::{log, log_chronik}; use thiserror::Error; use tokio::sync::RwLock; use crate::{ avalanche::Avalanche, indexer::ChronikIndexerError::*, query::{ QueryBlocks, QueryBroadcast, QueryGroupHistory, QueryGroupUtxos, QueryTxs, UtxoProtobufOutput, UtxoProtobufValue, }, subs::{BlockMsg, BlockMsgType, Subs}, subs_group::TxMsgType, }; const CURRENT_INDEXER_VERSION: SchemaVersion = 10; /// Params for setting up a [`ChronikIndexer`] instance. #[derive(Clone)] pub struct ChronikIndexerParams { /// Folder where the node stores its data, net-dependent. pub datadir_net: PathBuf, /// Whether to clear the DB before opening the DB, e.g. when reindexing. pub wipe_db: bool, /// Whether Chronik should index SLP/ALP token txs. pub enable_token_index: bool, /// Whether to output Chronik performance statistics into a perf/ folder pub enable_perf_stats: bool, } /// Struct for indexing blocks and txs. Maintains db handles and mempool. #[derive(Debug)] pub struct ChronikIndexer { db: Db, mem_data: MemData, mempool: Mempool, script_group: ScriptGroup, avalanche: Avalanche, subs: RwLock<Subs>, perf_path: Option<PathBuf>, is_token_index_enabled: bool, } /// Access to the bitcoind node. #[derive(Debug)] pub struct Node { /// FFI bridge to the node. pub bridge: cxx::UniquePtr<ffi::ChronikBridge>, } /// Block to be indexed by Chronik. #[derive(Clone, Debug, Default, Eq, PartialEq)] pub struct ChronikBlock { /// Data about the block (w/o txs) pub db_block: DbBlock, /// Txs in the block, with locations of where they are stored on disk. pub block_txs: BlockTxs, /// Block size in bytes. pub size: u64, /// Txs in the block, with inputs/outputs so we can group them. pub txs: Vec<Tx>, } /// Errors for [`BlockWriter`] and [`BlockReader`]. #[derive(Debug, Eq, Error, PartialEq)] pub enum ChronikIndexerError { /// Failed creating the folder for the indexes #[error("Failed creating path {0}")] CreateDirFailed(PathBuf), /// Cannot rewind blocks that bitcoind doesn't have #[error( "Cannot rewind Chronik, it contains block {0} that the node doesn't \ have.
You may need to use -reindex/-chronikreindex, or delete \ indexes/chronik and restart" )] CannotRewindChronik(BlockHash), /// Lower block doesn't exist but higher block does #[error( "Inconsistent DB: Block {missing} doesn't exist, but {exists} does" )] BlocksBelowMissing { /// Lower height that is missing missing: BlockHeight, /// Higher height that exists exists: BlockHeight, }, /// Corrupted schema version #[error( "Corrupted schema version in the Chronik database, consider running \ -reindex/-chronikreindex" )] CorruptedSchemaVersion, /// Missing schema version for non-empty database #[error( "Missing schema version in non-empty Chronik database, consider \ running -reindex/-chronikreindex" )] MissingSchemaVersion, /// This Chronik instance is outdated #[error( "Chronik outdated: Chronik has version {}, but the database has \ version {0}. Upgrade your node to the appropriate version.", CURRENT_INDEXER_VERSION )] ChronikOutdated(SchemaVersion), /// Database is outdated #[error( "DB outdated: Chronik has version {}, but the database has version \ {0}. -reindex/-chronikreindex to reindex the database to the new \ version.", CURRENT_INDEXER_VERSION )] DatabaseOutdated(SchemaVersion), /// Cannot enable token index on a DB that previously had it disabled #[error( "Cannot enable -chroniktokenindex on a DB that previously had it \ disabled. Provide -reindex/-chronikreindex to reindex the database \ with token data, or specify -chroniktokenindex=0 to disable the \ token index again." )] CannotEnableTokenIndex, } impl ChronikIndexer { /// Setup the indexer with the given parameters, e.g. open the DB etc. pub fn setup(params: ChronikIndexerParams) -> Result<Self> { let indexes_path = params.datadir_net.join("indexes"); let perf_path = params.datadir_net.join("perf"); if !indexes_path.exists() { std::fs::create_dir(&indexes_path) .wrap_err_with(|| CreateDirFailed(indexes_path.clone()))?; } if params.enable_perf_stats && !perf_path.exists() { std::fs::create_dir(&perf_path) .wrap_err_with(|| CreateDirFailed(perf_path.clone()))?; } let db_path = indexes_path.join("chronik"); if params.wipe_db { log!("Wiping Chronik at {}\n", db_path.to_string_lossy()); Db::destroy(&db_path)?; } log_chronik!("Opening Chronik at {}\n", db_path.to_string_lossy()); let db = Db::open(&db_path)?; verify_schema_version(&db)?; verify_enable_token_index(&db, params.enable_token_index)?; let mempool = Mempool::new(ScriptGroup, params.enable_token_index); Ok(ChronikIndexer { db, mempool, mem_data: MemData::new(MemDataConf {}), script_group: ScriptGroup, avalanche: Avalanche::default(), subs: RwLock::new(Subs::new(ScriptGroup)), perf_path: params.enable_perf_stats.then_some(perf_path), is_token_index_enabled: params.enable_token_index, }) } /// Resync Chronik index to the node pub fn resync_indexer( &mut self, bridge: &ffi::ChronikBridge, ) -> Result<()> { let block_reader = BlockReader::new(&self.db)?; let indexer_tip = block_reader.tip()?; let Ok(node_tip_index) = bridge.get_chain_tip() else { if let Some(indexer_tip) = &indexer_tip { return Err( CannotRewindChronik(indexer_tip.hash.clone()).into() ); } return Ok(()); }; let node_tip_info = ffi::get_block_info(node_tip_index); let node_height = node_tip_info.height; let node_tip_hash = BlockHash::from(node_tip_info.hash); let start_height = match indexer_tip { Some(tip) if tip.hash != node_tip_hash => { let indexer_tip_hash = tip.hash.clone(); let indexer_height = tip.height; log!( "Node and Chronik diverged, node is on block \ {node_tip_hash} at height {node_height}, and Chronik
is \ on block {indexer_tip_hash} at height {indexer_height}.\n" ); let indexer_tip_index = bridge .lookup_block_index(tip.hash.to_bytes()) .map_err(|_| CannotRewindChronik(tip.hash.clone()))?; self.rewind_indexer(bridge, indexer_tip_index, &tip)? } Some(tip) => tip.height, None => { log!( "Chronik database empty, syncing to block {node_tip_hash} \ at height {node_height}.\n" ); -1 } }; let tip_height = node_tip_info.height; for height in start_height + 1..=tip_height { if ffi::shutdown_requested() { log!("Stopped re-sync adding blocks\n"); return Ok(()); } let block_index = ffi::get_block_ancestor(node_tip_index, height)?; let block = self.load_chronik_block(bridge, block_index)?; let hash = block.db_block.hash.clone(); self.handle_block_connected(block)?; log_chronik!( "Added block {hash}, height {height}/{tip_height} to Chronik\n" ); if height % 100 == 0 { log!( "Synced Chronik up to block {hash} at height \ {height}/{tip_height}\n" ); } } log!( "Chronik completed re-syncing with the node, both are now at \ block {node_tip_hash} at height {node_height}.\n" ); if let Some(perf_path) = &self.perf_path { let mut resync_stats = std::fs::File::create(perf_path.join("resync_stats.txt"))?; write!(&mut resync_stats, "{:#.3?}", self.mem_data.stats())?; } Ok(()) } fn rewind_indexer( &mut self, bridge: &ffi::ChronikBridge, indexer_tip_index: &ffi::CBlockIndex, indexer_db_tip: &DbBlock, ) -> Result<BlockHeight> { let indexer_height = indexer_db_tip.height; let fork_block_index = bridge .find_fork(indexer_tip_index) .map_err(|_| CannotRewindChronik(indexer_db_tip.hash.clone()))?; let fork_info = ffi::get_block_info(fork_block_index); let fork_block_hash = BlockHash::from(fork_info.hash); let fork_height = fork_info.height; let revert_height = fork_height + 1; log!( "The last common block is {fork_block_hash} at height \ {fork_height}.\n" ); log!("Reverting Chronik blocks {revert_height} to {indexer_height}.\n"); for height in (revert_height..indexer_height).rev() { if ffi::shutdown_requested() { log!("Stopped re-sync rewinding blocks\n"); // return MAX here so we don't add any blocks return Ok(BlockHeight::MAX); } let db_block = BlockReader::new(&self.db)? .by_height(height)? .ok_or(BlocksBelowMissing { missing: height, exists: indexer_height, })?; let block_index = bridge .lookup_block_index(db_block.hash.to_bytes()) .map_err(|_| CannotRewindChronik(db_block.hash))?; let block = self.load_chronik_block(bridge, block_index)?; self.handle_block_disconnected(block)?; } Ok(fork_info.height) } /// Add transaction to the indexer's mempool. pub fn handle_tx_added_to_mempool( &mut self, mempool_tx: MempoolTx, ) -> Result<()> { let result = self.mempool.insert(&self.db, mempool_tx)?; self.subs.get_mut().handle_tx_event( &result.mempool_tx.tx, TxMsgType::AddedToMempool, &result.token_id_aux, ); Ok(()) } /// Remove tx from the indexer's mempool, e.g. by a conflicting tx, expiry /// etc. This is not called when the transaction has been mined (and thus /// also removed from the mempool). pub fn handle_tx_removed_from_mempool(&mut self, txid: TxId) -> Result<()> { let result = self.mempool.remove(txid)?; self.subs.get_mut().handle_tx_event( &result.mempool_tx.tx, TxMsgType::RemovedFromMempool, &result.token_id_aux, ); Ok(()) } /// Add the block to the index.
pub fn handle_block_connected( &mut self, block: ChronikBlock, ) -> Result<()> { let height = block.db_block.height; let mut batch = WriteBatch::default(); let block_writer = BlockWriter::new(&self.db)?; let tx_writer = TxWriter::new(&self.db)?; let block_stats_writer = BlockStatsWriter::new(&self.db)?; let script_history_writer = ScriptHistoryWriter::new(&self.db, self.script_group.clone())?; let script_utxo_writer = ScriptUtxoWriter::new(&self.db, self.script_group.clone())?; let spent_by_writer = SpentByWriter::new(&self.db)?; let token_writer = TokenWriter::new(&self.db)?; let token_id_history_writer = TokenIdHistoryWriter::new(&self.db, TokenIdGroup)?; let token_id_utxo_writer = TokenIdUtxoWriter::new(&self.db, TokenIdGroup)?; block_writer.insert(&mut batch, &block.db_block)?; let first_tx_num = tx_writer.insert( &mut batch, &block.block_txs, &mut self.mem_data.txs, )?; let index_txs = prepare_indexed_txs(&self.db, first_tx_num, &block.txs)?; block_stats_writer .insert(&mut batch, height, block.size, &index_txs)?; script_history_writer.insert( &mut batch, &index_txs, &(), &mut self.mem_data.script_history, )?; script_utxo_writer.insert( &mut batch, &index_txs, &(), &mut self.mem_data.script_utxos, )?; spent_by_writer.insert( &mut batch, &index_txs, &mut self.mem_data.spent_by, )?; let token_id_aux; if self.is_token_index_enabled { let processed_token_batch = token_writer.insert(&mut batch, &index_txs)?; token_id_aux = TokenIdGroupAux::from_batch(&index_txs, &processed_token_batch); token_id_history_writer.insert( &mut batch, &index_txs, &token_id_aux, &mut GroupHistoryMemData::default(), )?; token_id_utxo_writer.insert( &mut batch, &index_txs, &token_id_aux, &mut GroupUtxoMemData::default(), )?; } else { token_id_aux = TokenIdGroupAux::default(); } self.db.write_batch(batch)?; for tx in &block.block_txs.txs { self.mempool.remove_mined(&tx.txid)?; } merge::check_for_errors()?; let subs = self.subs.get_mut(); subs.broadcast_block_msg(BlockMsg { msg_type: BlockMsgType::Connected, hash: block.db_block.hash, height: block.db_block.height, }); for tx in &block.txs { subs.handle_tx_event(tx, TxMsgType::Confirmed, &token_id_aux); } Ok(()) } /// Remove the block from the index. 
    pub fn handle_block_disconnected(
        &mut self,
        block: ChronikBlock,
    ) -> Result<()> {
        let mut batch = WriteBatch::default();
        let block_writer = BlockWriter::new(&self.db)?;
        let tx_writer = TxWriter::new(&self.db)?;
        let block_stats_writer = BlockStatsWriter::new(&self.db)?;
        let script_history_writer =
            ScriptHistoryWriter::new(&self.db, self.script_group.clone())?;
        let script_utxo_writer =
            ScriptUtxoWriter::new(&self.db, self.script_group.clone())?;
        let spent_by_writer = SpentByWriter::new(&self.db)?;
        let token_writer = TokenWriter::new(&self.db)?;
        let token_id_history_writer =
            TokenIdHistoryWriter::new(&self.db, TokenIdGroup)?;
        let token_id_utxo_writer =
            TokenIdUtxoWriter::new(&self.db, TokenIdGroup)?;
        block_writer.delete(&mut batch, &block.db_block)?;
        let first_tx_num = tx_writer.delete(
            &mut batch,
            &block.block_txs,
            &mut self.mem_data.txs,
        )?;
        let index_txs =
            prepare_indexed_txs(&self.db, first_tx_num, &block.txs)?;
        block_stats_writer.delete(&mut batch, block.db_block.height);
        script_history_writer.delete(
            &mut batch,
            &index_txs,
            &(),
            &mut self.mem_data.script_history,
        )?;
        script_utxo_writer.delete(
            &mut batch,
            &index_txs,
            &(),
            &mut self.mem_data.script_utxos,
        )?;
        spent_by_writer.delete(
            &mut batch,
            &index_txs,
            &mut self.mem_data.spent_by,
        )?;
        if self.is_token_index_enabled {
            let token_id_aux = TokenIdGroupAux::from_db(&index_txs, &self.db)?;
            token_id_history_writer.delete(
                &mut batch,
                &index_txs,
                &token_id_aux,
                &mut GroupHistoryMemData::default(),
            )?;
            token_id_utxo_writer.delete(
                &mut batch,
                &index_txs,
                &token_id_aux,
                &mut GroupUtxoMemData::default(),
            )?;
            token_writer.delete(&mut batch, &index_txs)?;
        }
        self.avalanche.disconnect_block(block.db_block.height)?;
        self.db.write_batch(batch)?;
        let subs = self.subs.get_mut();
        subs.broadcast_block_msg(BlockMsg {
            msg_type: BlockMsgType::Disconnected,
            hash: block.db_block.hash,
            height: block.db_block.height,
        });
        Ok(())
    }

    /// Block finalized with Avalanche.
    pub fn handle_block_finalized(
        &mut self,
        block: ChronikBlock,
    ) -> Result<()> {
        self.avalanche.finalize_block(block.db_block.height)?;
        let subs = self.subs.get_mut();
        subs.broadcast_block_msg(BlockMsg {
            msg_type: BlockMsgType::Finalized,
            hash: block.db_block.hash,
            height: block.db_block.height,
        });
        let tx_reader = TxReader::new(&self.db)?;
        let first_tx_num = tx_reader
            .first_tx_num_by_block(block.db_block.height)?
            .unwrap();
        let index_txs =
            prepare_indexed_txs(&self.db, first_tx_num, &block.txs)?;
        let token_id_aux = if self.is_token_index_enabled {
            TokenIdGroupAux::from_db(&index_txs, &self.db)?
        } else {
            TokenIdGroupAux::default()
        };
        for tx in &block.txs {
            subs.handle_tx_event(tx, TxMsgType::Finalized, &token_id_aux);
        }
        Ok(())
    }

    /// Return [`QueryBroadcast`] to broadcast tx to the network.
    pub fn broadcast<'a>(&'a self, node: &'a Node) -> QueryBroadcast<'a> {
        QueryBroadcast {
            db: &self.db,
            avalanche: &self.avalanche,
            mempool: &self.mempool,
            node,
            is_token_index_enabled: self.is_token_index_enabled,
        }
    }

    /// Return [`QueryBlocks`] to read blocks from the DB.
-    pub fn blocks(&self) -> QueryBlocks<'_> {
+    pub fn blocks<'a>(&'a self, node: &'a Node) -> QueryBlocks<'a> {
        QueryBlocks {
            db: &self.db,
            avalanche: &self.avalanche,
            mempool: &self.mempool,
+            node,
            is_token_index_enabled: self.is_token_index_enabled,
        }
    }

    /// Return [`QueryTxs`] to return txs from mempool/DB.
-    pub fn txs(&self) -> QueryTxs<'_> {
+    pub fn txs<'a>(&'a self, node: &'a Node) -> QueryTxs<'a> {
        QueryTxs {
            db: &self.db,
            avalanche: &self.avalanche,
            mempool: &self.mempool,
+            node,
            is_token_index_enabled: self.is_token_index_enabled,
        }
    }

    /// Return [`QueryGroupHistory`] for scripts to query the tx history of
    /// scripts.
-    pub fn script_history(
-        &self,
-    ) -> Result<QueryGroupHistory<'_, ScriptGroup>> {
+    pub fn script_history<'a>(
+        &'a self,
+        node: &'a Node,
+    ) -> Result<QueryGroupHistory<'a, ScriptGroup>> {
        Ok(QueryGroupHistory {
            db: &self.db,
            avalanche: &self.avalanche,
            mempool: &self.mempool,
            mempool_history: self.mempool.script_history(),
            group: self.script_group.clone(),
+            node,
            is_token_index_enabled: self.is_token_index_enabled,
        })
    }

    /// Return [`QueryGroupUtxos`] for scripts to query the utxos of scripts.
    pub fn script_utxos(
        &self,
    ) -> Result<QueryGroupUtxos<'_, ScriptGroup, UtxoProtobufValue>> {
        Ok(QueryGroupUtxos {
            db: &self.db,
            avalanche: &self.avalanche,
            mempool: &self.mempool,
            mempool_utxos: self.mempool.script_utxos(),
            group: self.script_group.clone(),
            utxo_mapper: UtxoProtobufValue,
            is_token_index_enabled: self.is_token_index_enabled,
        })
    }

    /// Return [`QueryGroupHistory`] for token IDs to query the tx history of
    /// token IDs.
-    pub fn token_id_history(&self) -> QueryGroupHistory<'_, TokenIdGroup> {
+    pub fn token_id_history<'a>(
+        &'a self,
+        node: &'a Node,
+    ) -> QueryGroupHistory<'a, TokenIdGroup> {
        QueryGroupHistory {
            db: &self.db,
            avalanche: &self.avalanche,
            mempool: &self.mempool,
            mempool_history: self.mempool.token_id_history(),
            group: TokenIdGroup,
+            node,
            is_token_index_enabled: self.is_token_index_enabled,
        }
    }

    /// Return [`QueryGroupUtxos`] for token IDs to query the utxos of token IDs
    pub fn token_id_utxos(
        &self,
    ) -> QueryGroupUtxos<'_, TokenIdGroup, UtxoProtobufOutput> {
        QueryGroupUtxos {
            db: &self.db,
            avalanche: &self.avalanche,
            mempool: &self.mempool,
            mempool_utxos: self.mempool.token_id_utxos(),
            group: TokenIdGroup,
            utxo_mapper: UtxoProtobufOutput,
            is_token_index_enabled: self.is_token_index_enabled,
        }
    }

    /// Subscribers, behind read/write lock
    pub fn subs(&self) -> &RwLock<Subs> {
        &self.subs
    }

    /// Build a ChronikBlock from a ffi::Block.
    pub fn make_chronik_block(&self, block: ffi::Block) -> ChronikBlock {
        let db_block = DbBlock {
            hash: BlockHash::from(block.hash),
            prev_hash: BlockHash::from(block.prev_hash),
            height: block.height,
            n_bits: block.n_bits,
            timestamp: block.timestamp,
            file_num: block.file_num,
            data_pos: block.data_pos,
        };
        let block_txs = BlockTxs {
            block_height: block.height,
            txs: block
                .txs
                .iter()
                .map(|tx| {
                    let txid = TxId::from(tx.tx.txid);
                    TxEntry {
                        txid,
                        data_pos: tx.data_pos,
                        undo_pos: tx.undo_pos,
                        time_first_seen: match self.mempool.tx(&txid) {
                            Some(tx) => tx.time_first_seen,
                            None => 0,
                        },
                        is_coinbase: tx.undo_pos == 0,
                    }
                })
                .collect(),
        };
        let txs = block
            .txs
            .into_iter()
            .map(|block_tx| Tx::from(block_tx.tx))
            .collect::<Vec<_>>();
        ChronikBlock {
            db_block,
            block_txs,
            size: block.size,
            txs,
        }
    }
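    // Note on `make_chronik_block` above: `time_first_seen` falls back to 0
    // for txs the mempool has never seen, and a tx is treated as a coinbase
    // iff `undo_pos == 0`, as coinbase txs have no undo data.

    /// Load a ChronikBlock from the node given the CBlockIndex.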
    pub fn load_chronik_block(
        &self,
        bridge: &ffi::ChronikBridge,
        block_index: &ffi::CBlockIndex,
    ) -> Result<ChronikBlock> {
        let ffi_block = bridge.load_block(block_index)?;
        let ffi_block = expect_unique_ptr("load_block", &ffi_block);
        let ffi_block_undo = bridge.load_block_undo(block_index)?;
        let ffi_block_undo =
            expect_unique_ptr("load_block_undo", &ffi_block_undo);
        let block = ffi::bridge_block(ffi_block, ffi_block_undo, block_index)?;
        Ok(self.make_chronik_block(block))
    }
}

fn verify_schema_version(db: &Db) -> Result<()> {
    let metadata_reader = MetadataReader::new(db)?;
    let metadata_writer = MetadataWriter::new(db)?;
    let is_empty = db.is_db_empty()?;
    match metadata_reader
        .schema_version()
        .wrap_err(CorruptedSchemaVersion)?
    {
        Some(schema_version) => {
            assert!(!is_empty, "Empty DB can't have a schema version");
            if schema_version > CURRENT_INDEXER_VERSION {
                return Err(ChronikOutdated(schema_version).into());
            }
            if schema_version < CURRENT_INDEXER_VERSION {
                return Err(DatabaseOutdated(schema_version).into());
            }
        }
        None => {
            if !is_empty {
                return Err(MissingSchemaVersion.into());
            }
            let mut batch = WriteBatch::default();
            metadata_writer
                .update_schema_version(&mut batch, CURRENT_INDEXER_VERSION)?;
            db.write_batch(batch)?;
        }
    }
    log!("Chronik has version {CURRENT_INDEXER_VERSION}\n");
    Ok(())
}

fn verify_enable_token_index(db: &Db, enable_token_index: bool) -> Result<()> {
    let metadata_reader = MetadataReader::new(db)?;
    let metadata_writer = MetadataWriter::new(db)?;
    let token_writer = TokenWriter::new(db)?;
    let is_empty = db.is_db_empty()?;
    let is_token_index_enabled = metadata_reader.is_token_index_enabled()?;
    let mut batch = WriteBatch::default();
    if !is_empty {
        // Cannot enable token index if not already previously enabled
        if enable_token_index && !is_token_index_enabled {
            return Err(CannotEnableTokenIndex.into());
        }
        // Wipe token index if previously enabled and now disabled
        if !enable_token_index && is_token_index_enabled {
            log!(
                "Warning: Wiping existing token index, since \
                 -chroniktokenindex=0\n"
            );
            log!("You will need to -reindex/-chronikreindex to restore\n");
            token_writer.wipe(&mut batch);
        }
    }
    metadata_writer
        .update_is_token_index_enabled(&mut batch, enable_token_index)?;
    db.write_batch(batch)?;
    Ok(())
}

impl std::fmt::Debug for ChronikIndexerParams {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ChronikIndexerParams")
            .field("datadir_net", &self.datadir_net)
            .field("wipe_db", &self.wipe_db)
            .field("fn_compress_script", &"..")
            .finish()
    }
}

#[cfg(test)]
mod tests {
    use abc_rust_error::Result;
    use bitcoinsuite_core::block::BlockHash;
    use chronik_db::{
        db::{Db, WriteBatch, CF_META},
        io::{BlockReader, BlockTxs, DbBlock, MetadataReader, MetadataWriter},
    };
    use pretty_assertions::assert_eq;

    use crate::indexer::{
        ChronikBlock, ChronikIndexer, ChronikIndexerError,
        ChronikIndexerParams, CURRENT_INDEXER_VERSION,
    };

    #[test]
    fn test_indexer() -> Result<()> {
        let tempdir = tempdir::TempDir::new("chronik-indexer--indexer")?;
        let datadir_net = tempdir.path().join("regtest");
        let params = ChronikIndexerParams {
            datadir_net: datadir_net.clone(),
            wipe_db: false,
            enable_token_index: false,
            enable_perf_stats: false,
        };
        // regtest folder doesn't exist yet -> error
        assert_eq!(
            ChronikIndexer::setup(params.clone())
                .unwrap_err()
                .downcast::<ChronikIndexerError>()?,
            ChronikIndexerError::CreateDirFailed(datadir_net.join("indexes")),
        );
        // create regtest folder, setup will work now
        std::fs::create_dir(&datadir_net)?;
        let mut indexer = ChronikIndexer::setup(params.clone())?;
        // indexes and indexes/chronik
        // folder now exist
        assert!(datadir_net.join("indexes").exists());
        assert!(datadir_net.join("indexes").join("chronik").exists());
        // DB is empty
        assert_eq!(BlockReader::new(&indexer.db)?.by_height(0)?, None);
        let block = ChronikBlock {
            db_block: DbBlock {
                hash: BlockHash::from([4; 32]),
                prev_hash: BlockHash::from([0; 32]),
                height: 0,
                n_bits: 0x1deadbef,
                timestamp: 1234567890,
                file_num: 0,
                data_pos: 1337,
            },
            block_txs: BlockTxs {
                block_height: 0,
                txs: vec![],
            },
            size: 285,
            txs: vec![],
        };
        // Add block
        indexer.handle_block_connected(block.clone())?;
        assert_eq!(
            BlockReader::new(&indexer.db)?.by_height(0)?,
            Some(block.db_block.clone())
        );
        // Remove block again
        indexer.handle_block_disconnected(block.clone())?;
        assert_eq!(BlockReader::new(&indexer.db)?.by_height(0)?, None);
        // Add block then wipe, block not there
        indexer.handle_block_connected(block)?;
        std::mem::drop(indexer);
        let indexer = ChronikIndexer::setup(ChronikIndexerParams {
            wipe_db: true,
            ..params
        })?;
        assert_eq!(BlockReader::new(&indexer.db)?.by_height(0)?, None);
        Ok(())
    }

    #[test]
    fn test_schema_version() -> Result<()> {
        let dir = tempdir::TempDir::new("chronik-indexer--schema_version")?;
        let chronik_path = dir.path().join("indexes").join("chronik");
        let params = ChronikIndexerParams {
            datadir_net: dir.path().to_path_buf(),
            wipe_db: false,
            enable_token_index: false,
            enable_perf_stats: false,
        };

        // Setting up DB first time sets the schema version
        ChronikIndexer::setup(params.clone())?;
        {
            let db = Db::open(&chronik_path)?;
            assert_eq!(
                MetadataReader::new(&db)?.schema_version()?,
                Some(CURRENT_INDEXER_VERSION)
            );
        }
        // Opening DB again works fine
        ChronikIndexer::setup(params.clone())?;

        // Override DB schema version to 0
        {
            let db = Db::open(&chronik_path)?;
            let mut batch = WriteBatch::default();
            MetadataWriter::new(&db)?.update_schema_version(&mut batch, 0)?;
            db.write_batch(batch)?;
        }
        // -> DB too old
        assert_eq!(
            ChronikIndexer::setup(params.clone())
                .unwrap_err()
                .downcast::<ChronikIndexerError>()?,
            ChronikIndexerError::DatabaseOutdated(0),
        );

        // Override DB schema version to CURRENT_INDEXER_VERSION + 1
        {
            let db = Db::open(&chronik_path)?;
            let mut batch = WriteBatch::default();
            MetadataWriter::new(&db)?.update_schema_version(
                &mut batch,
                CURRENT_INDEXER_VERSION + 1,
            )?;
            db.write_batch(batch)?;
        }
        // -> Chronik too old
        assert_eq!(
            ChronikIndexer::setup(params.clone())
                .unwrap_err()
                .downcast::<ChronikIndexerError>()?,
            ChronikIndexerError::ChronikOutdated(CURRENT_INDEXER_VERSION + 1),
        );

        // Corrupt schema version
        {
            let db = Db::open(&chronik_path)?;
            let cf_meta = db.cf(CF_META)?;
            let mut batch = WriteBatch::default();
            batch.put_cf(cf_meta, b"SCHEMA_VERSION", [0xff]);
            db.write_batch(batch)?;
        }
        assert_eq!(
            ChronikIndexer::setup(params.clone())
                .unwrap_err()
                .downcast::<ChronikIndexerError>()?,
            ChronikIndexerError::CorruptedSchemaVersion,
        );

        // New db path, but has existing data
        let new_dir = dir.path().join("new");
        let new_chronik_path = new_dir.join("indexes").join("chronik");
        std::fs::create_dir_all(&new_chronik_path)?;
        let new_params = ChronikIndexerParams {
            datadir_net: new_dir,
            wipe_db: false,
            ..params
        };
        {
            // new db with obscure field in meta
            let db = Db::open(&new_chronik_path)?;
            let mut batch = WriteBatch::default();
            batch.put_cf(db.cf(CF_META)?, b"FOO", b"BAR");
            db.write_batch(batch)?;
        }
        // Error: non-empty DB without schema version
        assert_eq!(
            ChronikIndexer::setup(new_params.clone())
                .unwrap_err()
                .downcast::<ChronikIndexerError>()?,
            ChronikIndexerError::MissingSchemaVersion,
        );
        // with wipe it works
        ChronikIndexer::setup(ChronikIndexerParams {
            wipe_db: true,
            ..new_params
        })?;
        Ok(())
    }
}

diff --git a/chronik/chronik-indexer/src/query/blocks.rs b/chronik/chronik-indexer/src/query/blocks.rs
index 597587770..da06b2ddc 100644
--- a/chronik/chronik-indexer/src/query/blocks.rs
+++ b/chronik/chronik-indexer/src/query/blocks.rs
@@ -1,277 +1,281 @@
// Copyright (c) 2023 The Bitcoin developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

//! Module for [`QueryBlocks`], to query blocks.

use abc_rust_error::{Result, WrapErr};
use bitcoinsuite_core::{
    block::BlockHash,
    tx::{Tx, TxId},
};
-use chronik_bridge::ffi;
use chronik_db::{
    db::Db,
    io::{
        BlockHeight, BlockReader, BlockStats, BlockStatsReader, DbBlock,
        SpentByReader, TxNum, TxReader,
    },
    mem::Mempool,
};
use chronik_proto::proto;
use thiserror::Error;

use crate::{
    avalanche::Avalanche,
+    indexer::Node,
    query::{make_tx_proto, HashOrHeight, OutputsSpent, TxTokenData},
};

const MAX_BLOCKS_PAGE_SIZE: usize = 500;

/// Smallest allowed page size
pub const MIN_BLOCK_TXS_PAGE_SIZE: usize = 1;
/// Largest allowed page size
pub const MAX_BLOCK_TXS_PAGE_SIZE: usize = 200;

/// Struct for querying blocks from the DB.
#[derive(Debug)]
pub struct QueryBlocks<'a> {
    /// Db.
    pub db: &'a Db,
    /// Avalanche.
    pub avalanche: &'a Avalanche,
    /// Mempool
    pub mempool: &'a Mempool,
+    /// Access to bitcoind to read txs
+    pub node: &'a Node,
    /// Whether the SLP/ALP token index is enabled
    pub is_token_index_enabled: bool,
}

/// Errors indicating something went wrong with querying blocks.
#[derive(Debug, Error, PartialEq)]
pub enum QueryBlockError {
    /// Block not found in DB
    #[error("404: Block not found: {0}")]
    BlockNotFound(String),

    /// Invalid block start height
    #[error("400: Invalid block start height: {0}")]
    InvalidStartHeight(BlockHeight),

    /// Invalid block end height
    #[error("400: Invalid block end height: {0}")]
    InvalidEndHeight(BlockHeight),

    /// Blocks page size too large
    #[error(
        "400: Blocks page size too large, may not be above {} but got {0}",
        MAX_BLOCKS_PAGE_SIZE
    )]
    BlocksPageSizeTooLarge(usize),

    /// DB is missing block stats
    #[error("500: Inconsistent DB: Missing block stats for height {0}")]
    MissingBlockStats(BlockHeight),

    /// Block has no txs
    #[error("500: Inconsistent DB: Block {0} has no txs")]
    BlockHasNoTx(BlockHeight),

    /// Block has tx_num that doesn't exist
    #[error("500: Inconsistent DB: block {0} has missing tx_num {1}")]
    BlockHasMissingTx(BlockHash, TxNum),

    /// Can only request page sizes below a certain maximum.
    #[error(
        "400: Requested block tx page size {0} is too big, maximum is {}",
        MAX_BLOCK_TXS_PAGE_SIZE
    )]
    RequestPageSizeTooBig(usize),

    /// Can only request page sizes above a certain minimum.
    #[error(
        "400: Requested block tx page size {0} is too small, minimum is {}",
        MIN_BLOCK_TXS_PAGE_SIZE
    )]
    RequestPageSizeTooSmall(usize),

    /// Reading failed, likely corrupted block data
    #[error("500: Reading {0} failed")]
    ReadFailure(TxId),
}

use self::QueryBlockError::*;

impl<'a> QueryBlocks<'a> {
    /// Query a block by hash or height from DB.
    ///
    /// `height` may not have any leading zeros, because otherwise it might
    /// become ambiguous with a hash.
    pub fn by_hash_or_height(
        &self,
        hash_or_height: String,
    ) -> Result<proto::Block> {
        let db_blocks = BlockReader::new(self.db)?;
        let block_stats_reader = BlockStatsReader::new(self.db)?;
        let db_block = match hash_or_height.parse::<HashOrHeight>()? {
            HashOrHeight::Hash(hash) => db_blocks.by_hash(&hash)?,
            HashOrHeight::Height(height) => db_blocks.by_height(height)?,
        };
        let db_block = db_block.ok_or(BlockNotFound(hash_or_height))?;
        let block_stats = block_stats_reader
            .by_height(db_block.height)?
            .ok_or(MissingBlockStats(db_block.height))?;
        Ok(proto::Block {
            block_info: Some(
                self.make_block_info_proto(&db_block, &block_stats),
            ),
        })
    }

    /// Query blocks by a range of heights. Start and end height are
    /// inclusive.
    pub fn by_range(
        &self,
        start_height: BlockHeight,
        end_height: BlockHeight,
    ) -> Result<proto::Blocks> {
        if start_height < 0 {
            return Err(InvalidStartHeight(start_height).into());
        }
        if end_height < start_height {
            return Err(InvalidEndHeight(end_height).into());
        }
        let num_blocks = end_height as usize - start_height as usize + 1;
        if num_blocks > MAX_BLOCKS_PAGE_SIZE {
            return Err(BlocksPageSizeTooLarge(num_blocks).into());
        }
        let block_reader = BlockReader::new(self.db)?;
        let block_stats_reader = BlockStatsReader::new(self.db)?;
        let mut blocks = Vec::with_capacity(num_blocks);
        for block_height in start_height..=end_height {
            let block = block_reader.by_height(block_height)?;
            let block = match block {
                Some(block) => block,
                None => break,
            };
            let block_stats = block_stats_reader
                .by_height(block_height)?
                .ok_or(MissingBlockStats(block_height))?;
            blocks.push(self.make_block_info_proto(&block, &block_stats));
        }
        Ok(proto::Blocks { blocks })
    }

    /// Query the txs of a block, paginated.
    pub fn block_txs(
        &self,
        hash_or_height: String,
        request_page_num: usize,
        request_page_size: usize,
    ) -> Result<proto::TxHistoryPage> {
        if request_page_size < MIN_BLOCK_TXS_PAGE_SIZE {
            return Err(RequestPageSizeTooSmall(request_page_size).into());
        }
        if request_page_size > MAX_BLOCK_TXS_PAGE_SIZE {
            return Err(RequestPageSizeTooBig(request_page_size).into());
        }
        let block_reader = BlockReader::new(self.db)?;
        let tx_reader = TxReader::new(self.db)?;
        let spent_by_reader = SpentByReader::new(self.db)?;
        let db_block = match hash_or_height.parse::<HashOrHeight>()? {
            HashOrHeight::Hash(hash) => block_reader.by_hash(&hash)?,
            HashOrHeight::Height(height) => block_reader.by_height(height)?,
        };
        let db_block = db_block.ok_or(BlockNotFound(hash_or_height))?;
        let tx_range = tx_reader
            .block_tx_num_range(db_block.height)?
            .ok_or(BlockHasNoTx(db_block.height))?;
        let tx_offset =
            request_page_num.saturating_mul(request_page_size) as u64;
        let page_tx_num_start =
            tx_range.start.saturating_add(tx_offset).min(tx_range.end);
        let page_tx_num_end = page_tx_num_start
            .saturating_add(request_page_size as u64)
            .min(tx_range.end);
        let num_page_txs = (page_tx_num_end - page_tx_num_start) as usize;
        let mut txs = Vec::with_capacity(num_page_txs);
        for tx_num in page_tx_num_start..page_tx_num_end {
            let db_tx = tx_reader
                .tx_by_tx_num(tx_num)?
                .ok_or(BlockHasMissingTx(db_block.hash.clone(), tx_num))?;
            let tx = Tx::from(
-                ffi::load_tx(
-                    db_block.file_num,
-                    db_tx.entry.data_pos,
-                    db_tx.entry.undo_pos,
-                )
-                .wrap_err(ReadFailure(db_tx.entry.txid))?,
+                self.node
+                    .bridge
+                    .load_tx(
+                        db_block.file_num,
+                        db_tx.entry.data_pos,
+                        db_tx.entry.undo_pos,
+                    )
+                    .wrap_err(ReadFailure(db_tx.entry.txid))?,
            );
            let outputs_spent = OutputsSpent::query(
                &spent_by_reader,
                &tx_reader,
                self.mempool.spent_by().outputs_spent(&db_tx.entry.txid),
                tx_num,
            )?;
            let token = TxTokenData::from_db(
                self.db,
                tx_num,
                &tx,
                self.is_token_index_enabled,
            )?;
            txs.push(make_tx_proto(
                &tx,
                &outputs_spent,
                db_tx.entry.time_first_seen,
                db_tx.entry.is_coinbase,
                Some(&db_block),
                self.avalanche,
                token.as_ref(),
            ));
        }
        let total_num_txs = (tx_range.end - tx_range.start) as usize;
        let total_num_pages =
            (total_num_txs + request_page_size - 1) / request_page_size;
        Ok(proto::TxHistoryPage {
            txs,
            num_pages: total_num_pages as u32,
            num_txs: total_num_txs as u32,
        })
    }

    /// Query some info about the blockchain, e.g. the tip hash and height.
    pub fn blockchain_info(&self) -> Result<proto::BlockchainInfo> {
        let block_reader = BlockReader::new(self.db)?;
        match block_reader.tip()? {
            Some(block) => Ok(proto::BlockchainInfo {
                tip_hash: block.hash.to_vec(),
                tip_height: block.height,
            }),
            None => Ok(proto::BlockchainInfo {
                tip_hash: vec![0; 32],
                tip_height: -1,
            }),
        }
    }

    fn make_block_info_proto(
        &self,
        db_block: &DbBlock,
        block_stats: &BlockStats,
    ) -> proto::BlockInfo {
        proto::BlockInfo {
            hash: db_block.hash.to_vec(),
            prev_hash: db_block.prev_hash.to_vec(),
            height: db_block.height,
            n_bits: db_block.n_bits,
            timestamp: db_block.timestamp,
            is_final: self.avalanche.is_final_height(db_block.height),
            block_size: block_stats.block_size,
            num_txs: block_stats.num_txs,
            num_inputs: block_stats.num_inputs,
            num_outputs: block_stats.num_outputs,
            sum_input_sats: block_stats.sum_input_sats,
            sum_coinbase_output_sats: block_stats.sum_coinbase_output_sats,
            sum_normal_output_sats: block_stats.sum_normal_output_sats,
            sum_burned_sats: block_stats.sum_burned_sats,
        }
    }
}

diff --git a/chronik/chronik-indexer/src/query/group_history.rs b/chronik/chronik-indexer/src/query/group_history.rs
index 7c1c01c67..b55af06ce 100644
--- a/chronik/chronik-indexer/src/query/group_history.rs
+++ b/chronik/chronik-indexer/src/query/group_history.rs
@@ -1,411 +1,413 @@
// Copyright (c) 2023 The Bitcoin developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

//! Module for [`QueryGroupHistory`], to query the tx history of a group.
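// Illustrative sketch (with assumed sizes, not the real constants) of how a
// requested page maps onto the fixed-size tx-num pages stored in the DB.
// `confirmed_txs` below uses this arithmetic, which is why it touches at
// most two DB pages per request.
#[cfg(test)]
fn page_mapping_example() {
    let db_page_size = 1000usize; // assumed DB page size
    let request_page_num = 7usize;
    let request_page_size = 200usize;
    // Index of the first tx of the requested page in the member's history.
    let first_tx_idx = request_page_num * request_page_size; // 1400
    // DB page that tx lives on, and its index within that page.
    let db_page_num_start = first_tx_idx / db_page_size;
    let first_inner_idx = first_tx_idx % db_page_size;
    assert_eq!((db_page_num_start, first_inner_idx), (1, 400));
}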
use std::collections::BTreeSet;

use abc_rust_error::Result;
use bitcoinsuite_core::tx::{Tx, TxId};
-use chronik_bridge::ffi;
use chronik_db::{
    db::Db,
    group::Group,
    io::{BlockReader, GroupHistoryReader, SpentByReader, TxNum, TxReader},
    mem::{Mempool, MempoolGroupHistory},
};
use chronik_proto::proto;
use chronik_util::log;
use thiserror::Error;

use crate::{
    avalanche::Avalanche,
+    indexer::Node,
    query::{make_tx_proto, OutputsSpent, TxTokenData},
};

/// Smallest allowed page size
pub const MIN_HISTORY_PAGE_SIZE: usize = 1;
/// Largest allowed page size
pub const MAX_HISTORY_PAGE_SIZE: usize = 200;

static EMPTY_MEMBER_TX_HISTORY: BTreeSet<(i64, TxId)> = BTreeSet::new();

/// Query pages of the tx history of a group
#[derive(Debug)]
pub struct QueryGroupHistory<'a, G: Group> {
    /// Database
    pub db: &'a Db,
    /// Avalanche
    pub avalanche: &'a Avalanche,
    /// Mempool
    pub mempool: &'a Mempool,
    /// The part of the mempool we search for this group's history.
    pub mempool_history: &'a MempoolGroupHistory<G>,
    /// Group to query txs by
    pub group: G,
+    /// Access to bitcoind to read txs
+    pub node: &'a Node,
    /// Whether the SLP/ALP token index is enabled
    pub is_token_index_enabled: bool,
}

/// Errors indicating something went wrong with reading txs.
#[derive(Debug, Error, PartialEq)]
pub enum QueryGroupHistoryError {
    /// Transaction not in mempool.
    #[error("500: Inconsistent mempool: Transaction {0} not in mempool")]
    MissingMempoolTx(TxId),

    /// tx_num in group index but not in "tx" CF.
    #[error("500: Inconsistent DB: Transaction num {0} not in DB")]
    MissingDbTx(TxNum),

    /// tx_num in DB but has no block.
    #[error("500: Inconsistent DB: Transaction num {0} has no block")]
    MissingDbTxBlock(TxNum),

    /// Can only request page sizes below a certain maximum.
    #[error(
        "400: Requested page size {0} is too big, maximum is {}",
        MAX_HISTORY_PAGE_SIZE
    )]
    RequestPageSizeTooBig(usize),

    /// Can only request page sizes above a certain minimum.
    #[error(
        "400: Requested page size {0} is too small, minimum is {}",
        MIN_HISTORY_PAGE_SIZE
    )]
    RequestPageSizeTooSmall(usize),
}

use self::QueryGroupHistoryError::*;

impl<'a, G: Group> QueryGroupHistory<'a, G> {
    /// Return the confirmed txs of the group in the order as txs occur on the
    /// blockchain, i.e.:
    /// - Sorted by block height ascendingly.
    /// - Within a block, sorted as txs occur in the block.
    pub fn confirmed_txs(
        &self,
        member: G::Member<'_>,
        request_page_num: usize,
        request_page_size: usize,
    ) -> Result<proto::TxHistoryPage> {
        if request_page_size < MIN_HISTORY_PAGE_SIZE {
            return Err(RequestPageSizeTooSmall(request_page_size).into());
        }
        if request_page_size > MAX_HISTORY_PAGE_SIZE {
            return Err(RequestPageSizeTooBig(request_page_size).into());
        }
        let db_reader = GroupHistoryReader::<G>::new(self.db)?;
        let member_ser = self.group.ser_member(&member);

        let (num_db_pages, num_db_txs) =
            db_reader.member_num_pages_and_txs(member_ser.as_ref())?;
        let num_request_pages =
            (num_db_txs + request_page_size - 1) / request_page_size;

        let make_result = |txs: Vec<proto::Tx>| {
            if txs.len() != txs.capacity() {
                // We should've predicted exactly how many txs we'll return.
                log!("WARNING: Allocated more txs than needed\n");
            }
            proto::TxHistoryPage {
                txs,
                num_pages: num_request_pages as u32,
                num_txs: num_db_txs as u32,
            }
        };

        // Initial index in the list of all txs of this script.
        // On 32-bit, this could overflow, so we saturate.
        let first_tx_idx = request_page_num.saturating_mul(request_page_size);

        // Calculate how many txs we're going to return, and allocate
        // sufficient space. Handle out-of-range pages by saturating.
        // Since at most `num_returned_txs` can be MAX_HISTORY_PAGE_SIZE, OOM
        // attacks are not possible.
        let num_returned_txs =
            request_page_size.min(num_db_txs.saturating_sub(first_tx_idx));
        let mut page_txs = Vec::with_capacity(num_returned_txs);

        // Short-circuit so we don't fetch DB again if no results.
        if num_returned_txs == 0 {
            return Ok(make_result(vec![]));
        }

        // First DB page to start reading from.
        let db_page_num_start = first_tx_idx / db_reader.page_size();

        // First tx index with that page.
        let mut first_inner_idx = first_tx_idx % db_reader.page_size();

        // Iterate DB pages, starting from the DB page the first tx is in.
        // Since DB pages are much larger than MAX_HISTORY_PAGE_SIZE, this
        // will only fetch 2 pages at most.
        for current_page_num in db_page_num_start..num_db_pages {
            let db_page_tx_nums = db_reader
                .page_txs(member_ser.as_ref(), current_page_num as u32)?
                .unwrap_or_default();
            for &tx_num in db_page_tx_nums.iter().skip(first_inner_idx) {
                page_txs.push(self.read_block_tx(tx_num)?);
                // We filled up the requested page size -> return
                if page_txs.len() == request_page_size {
                    return Ok(make_result(page_txs));
                }
            }
            first_inner_idx = 0;
        }

        // Couldn't fill requested page size completely
        Ok(make_result(page_txs))
    }

    /// Return the group history in reverse chronological order, i.e. the
    /// latest one first, including mempool txs.
    ///
    /// We start pages at the most recent mempool tx, go backwards in the
    /// mempool until we reach the oldest tx in the mempool, then continue
    /// with the most recent DB tx and go backwards from there.
    ///
    /// Note that unlike `confirmed_txs` and `unconfirmed_txs`, the order of
    /// txs observed by fetching multiple pages can change if new txs are
    /// added, or the page size is changed. This is because txs are fetched
    /// from the DB in the order they appear on the blockchain, and only then
    /// are sorted by time_first_seen.
    ///
    /// This means that if tx1 < tx2 wrt time_first_seen, but tx2 < tx1 wrt
    /// txid, tx1 would be ordered *before* tx2 if they are in the same block
    /// (because of time_first_seen), but tx1 might be cut out for other page
    /// sizes entirely, because it isn't even queried because it comes "too
    /// late" on the blockchain (because of txid).
    ///
    /// We accept this trade-off, because if we wanted to always get
    /// consistent order here, we'd need to sort txs by time_first_seen in
    /// the DB, which isn't a very reliable metric. We could also continue
    /// reading more txs until we run into a new block, but that could open
    /// potential DoS attacks. And in practice this ordering isn't a big
    /// issue, as most people are mostly interested in the "latest" txs of
    /// the address.
    pub fn rev_history(
        &self,
        member: G::Member<'_>,
        request_page_num: usize,
        request_page_size: usize,
    ) -> Result<proto::TxHistoryPage> {
        if request_page_size < MIN_HISTORY_PAGE_SIZE {
            return Err(RequestPageSizeTooSmall(request_page_size).into());
        }
        if request_page_size > MAX_HISTORY_PAGE_SIZE {
            return Err(RequestPageSizeTooBig(request_page_size).into());
        }
        let db_reader = GroupHistoryReader::<G>::new(self.db)?;
        let member_ser = self.group.ser_member(&member);

        let (_, num_db_txs) =
            db_reader.member_num_pages_and_txs(member_ser.as_ref())?;

        // How many txs in total to skip, beginning from the mempool txs, and
        // then continuing backwards into the DB txs.
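        // For illustration (assumed numbers): with 3 mempool txs, 12 DB txs
        // and request_page_size = 5, page 0 returns the 3 mempool txs plus
        // the 2 most recent DB txs, page 1 the next 5 DB txs, and page 2 the
        // remaining 5.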
        let request_tx_offset =
            request_page_num.saturating_mul(request_page_size);

        // All the mempool txs for this member
        let mempool_txs = self
            .mempool_history
            .member_history(member_ser.as_ref())
            .unwrap_or(&EMPTY_MEMBER_TX_HISTORY);

        let total_num_txs = mempool_txs.len() + num_db_txs;
        let total_num_pages =
            (total_num_txs + request_page_size - 1) / request_page_size;
        let make_result = |txs: Vec<proto::Tx>| {
            assert_eq!(txs.len(), txs.capacity());
            proto::TxHistoryPage {
                txs,
                num_pages: total_num_pages as u32,
                num_txs: total_num_txs as u32,
            }
        };

        // Number of mempool txs in the result.
        // We saturate to clip numbers to >= 0.
        let num_page_mempool_txs = request_page_size
            .min(mempool_txs.len().saturating_sub(request_tx_offset));

        // Backwards offset into the DB. If this were zero, we'd start
        // reading at the last tx in the DB.
        let request_db_tx_offset =
            request_tx_offset.saturating_sub(mempool_txs.len());

        // DB txs left after skipping the requested offset.
        let num_db_txs_available =
            num_db_txs.saturating_sub(request_db_tx_offset);

        // How many DB txs we can return at most; the page could already be
        // partially filled with mempool txs. This cannot overflow as
        // num_page_mempool_txs <= request_page_size.
        let max_page_db_txs = request_page_size - num_page_mempool_txs;

        // How many DB txs we return. It's either the number of txs we have
        // left in the DB or the maximum we can still put on the page.
        let num_page_db_txs = max_page_db_txs.min(num_db_txs_available);

        // Allocate sufficient space for the txs on the page.
        let mut page_txs =
            Vec::with_capacity(num_page_mempool_txs + num_page_db_txs);

        // Add the requested mempool txs, we skip over the requested offset,
        // and take only as many as we can put on the page.
        let page_mempool_txs_iter = mempool_txs
            .iter()
            .rev()
            .skip(request_tx_offset)
            .take(request_page_size);
        for (_, txid) in page_mempool_txs_iter {
            let entry = self.mempool.tx(txid).ok_or(MissingMempoolTx(*txid))?;
            page_txs.push(make_tx_proto(
                &entry.tx,
                &OutputsSpent::new_mempool(
                    self.mempool.spent_by().outputs_spent(txid),
                ),
                entry.time_first_seen,
                false,
                None,
                self.avalanche,
                TxTokenData::from_mempool(self.mempool.tokens(), &entry.tx)
                    .as_ref(),
            ));
        }

        // If we filled up the page with mempool txs, or there's no DB txs on
        // this page, we can return early to avoid reading the DB.
        if num_page_mempool_txs == request_page_size || num_page_db_txs == 0 {
            return Ok(make_result(page_txs));
        }

        // Initial index to start reading from in the list of all DB txs of
        // this member. We then iterate this backwards, until we fill the
        // page or hit the first DB tx of the member.
        // Note that this never overflows, as num_db_txs_available > 0 due to
        // the check num_page_db_txs == 0.
        let first_db_tx_idx = num_db_txs_available - 1;

        // First DB page to start reading from, from there we go backwards.
        let db_page_num_start = first_db_tx_idx / db_reader.page_size();

        // First tx index within that page, from there we go backwards.
        let mut first_inner_idx = first_db_tx_idx % db_reader.page_size();

        'outer: for current_page_num in (0..=db_page_num_start).rev() {
            let db_page_tx_nums = db_reader
                .page_txs(member_ser.as_ref(), current_page_num as u32)?
                .unwrap_or_default();
            for inner_idx in (0..=first_inner_idx).rev() {
                let tx_num = db_page_tx_nums[inner_idx];
                page_txs.push(self.read_block_tx(tx_num)?);
                // Filled up page: break out of outer loop.
                if page_txs.len() == request_page_size {
                    break 'outer;
                }
            }
            first_inner_idx = db_reader.page_size() - 1;
        }

        // We use stable sort, so the block order is retained when timestamps
        // are identical.
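        // The key sorts newer blocks first (via -block.height); within a
        // block, txs without a time_first_seen (0) come first (i64::MIN),
        // followed by the remaining txs, newest first.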
        page_txs[num_page_mempool_txs..].sort_by_key(|tx| {
            match (&tx.block, tx.time_first_seen) {
                // Within blocks, sort txs without time_first_seen first
                (Some(block), 0) => (-block.height, i64::MIN),
                // Otherwise, sort by time_first_seen within blocks
                (Some(block), time_first_seen) => {
                    (-block.height, -time_first_seen)
                }
                (None, _) => unreachable!("We skip sorting mempool txs"),
            }
        });

        Ok(make_result(page_txs))
    }

    /// Return the unconfirmed txs (i.e. all txs in the mempool) in
    /// first-seen order. If two txs have been seen at the same second, we
    /// order them by txid.
    ///
    /// This should always be small enough to return without pagination, but
    /// just to be future-proof, we always pretend as if there's exactly one
    /// page with all the txs (or 0 pages if there's no txs), so we can add
    /// pagination later.
    pub fn unconfirmed_txs(
        &self,
        member: G::Member<'_>,
    ) -> Result<proto::TxHistoryPage> {
        let member_ser = self.group.ser_member(&member);
        let txs =
            match self.mempool_history.member_history(member_ser.as_ref()) {
                Some(mempool_txs) => mempool_txs
                    .iter()
                    .map(|(_, txid)| -> Result<_> {
                        let entry = self
                            .mempool
                            .tx(txid)
                            .ok_or(MissingMempoolTx(*txid))?;
                        Ok(make_tx_proto(
                            &entry.tx,
                            &OutputsSpent::new_mempool(
                                self.mempool.spent_by().outputs_spent(txid),
                            ),
                            entry.time_first_seen,
                            false,
                            None,
                            self.avalanche,
                            TxTokenData::from_mempool(
                                self.mempool.tokens(),
                                &entry.tx,
                            )
                            .as_ref(),
                        ))
                    })
                    .collect::<Result<Vec<_>>>()?,
                None => vec![],
            };
        Ok(proto::TxHistoryPage {
            num_pages: if txs.is_empty() { 0 } else { 1 },
            num_txs: txs.len() as u32,
            txs,
        })
    }

    fn read_block_tx(&self, tx_num: TxNum) -> Result<proto::Tx> {
        let tx_reader = TxReader::new(self.db)?;
        let block_reader = BlockReader::new(self.db)?;
        let spent_by_reader = SpentByReader::new(self.db)?;
        let block_tx =
            tx_reader.tx_by_tx_num(tx_num)?.ok_or(MissingDbTx(tx_num))?;
        let block = block_reader
            .by_height(block_tx.block_height)?
            .ok_or(MissingDbTxBlock(tx_num))?;
-        let tx = Tx::from(ffi::load_tx(
+        let tx = Tx::from(self.node.bridge.load_tx(
            block.file_num,
            block_tx.entry.data_pos,
            block_tx.entry.undo_pos,
        )?);
        let outputs_spent = OutputsSpent::query(
            &spent_by_reader,
            &tx_reader,
            self.mempool.spent_by().outputs_spent(&block_tx.entry.txid),
            tx_num,
        )?;
        let token = TxTokenData::from_db(
            self.db,
            tx_num,
            &tx,
            self.is_token_index_enabled,
        )?;
        Ok(make_tx_proto(
            &tx,
            &outputs_spent,
            block_tx.entry.time_first_seen,
            block_tx.entry.is_coinbase,
            Some(&block),
            self.avalanche,
            token.as_ref(),
        ))
    }
}

diff --git a/chronik/chronik-indexer/src/query/txs.rs b/chronik/chronik-indexer/src/query/txs.rs
index 46d6b9fd5..6a1d45b0c 100644
--- a/chronik/chronik-indexer/src/query/txs.rs
+++ b/chronik/chronik-indexer/src/query/txs.rs
@@ -1,164 +1,170 @@
// Copyright (c) 2023 The Bitcoin developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

//! Module for [`QueryTxs`], to query txs from mempool/db.

use abc_rust_error::{Result, WrapErr};
use bitcoinsuite_core::{
    ser::BitcoinSer,
    tx::{Tx, TxId},
};
-use chronik_bridge::ffi;
use chronik_db::{
    db::Db,
    io::{BlockReader, SpentByReader, TxReader},
    mem::Mempool,
};
use chronik_proto::proto;
use thiserror::Error;

use crate::{
    avalanche::Avalanche,
+    indexer::Node,
    query::{
        make_genesis_info_proto, make_token_type_proto, make_tx_proto,
        read_token_info, OutputsSpent, QueryTxError::*, TxTokenData,
    },
};
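// A minimal sketch of the lookup pattern `tx_by_id` below implements (names
// here are illustrative only):
//
//     match mempool.tx(&txid) {
//         Some(entry) => { /* unconfirmed: serve from the mempool entry */ }
//         None => {
//             /* confirmed: resolve txid -> tx_num -> block in the DB, then
//                read the raw tx from the node's block files via the bridge */
//         }
//     }

/// Struct for querying txs from the db/mempool.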
#[derive(Debug)]
pub struct QueryTxs<'a> {
    /// Database
    pub db: &'a Db,
    /// Avalanche
    pub avalanche: &'a Avalanche,
    /// Mempool
    pub mempool: &'a Mempool,
+    /// Access to bitcoind to read txs
+    pub node: &'a Node,
    /// Whether the SLP/ALP token index is enabled
    pub is_token_index_enabled: bool,
}

/// Errors indicating something went wrong with reading txs.
#[derive(Debug, Error, PartialEq)]
pub enum QueryTxError {
    /// Transaction not in mempool nor DB.
    #[error("404: Transaction {0} not found in the index")]
    TxNotFound(TxId),

    /// Token not found in mempool nor DB.
    #[error("404: Token {0} not found in the index")]
    TokenNotFound(TxId),

    /// Transaction in DB without block
    #[error("500: Inconsistent DB: {0} has no block")]
    DbTxHasNoBlock(TxId),

    /// Reading failed, likely corrupted block data
    #[error("500: Reading {0} failed")]
    ReadFailure(TxId),
}

impl<'a> QueryTxs<'a> {
    /// Query a tx by txid from the mempool or DB.
    pub fn tx_by_id(&self, txid: TxId) -> Result<proto::Tx> {
        match self.mempool.tx(&txid) {
            Some(tx) => Ok(make_tx_proto(
                &tx.tx,
                &OutputsSpent::new_mempool(
                    self.mempool.spent_by().outputs_spent(&txid),
                ),
                tx.time_first_seen,
                false,
                None,
                self.avalanche,
                TxTokenData::from_mempool(self.mempool.tokens(), &tx.tx)
                    .as_ref(),
            )),
            None => {
                let tx_reader = TxReader::new(self.db)?;
                let (tx_num, block_tx) = tx_reader
                    .tx_and_num_by_txid(&txid)?
                    .ok_or(TxNotFound(txid))?;
                let tx_entry = block_tx.entry;
                let block_reader = BlockReader::new(self.db)?;
                let spent_by_reader = SpentByReader::new(self.db)?;
                let block = block_reader
                    .by_height(block_tx.block_height)?
                    .ok_or(DbTxHasNoBlock(txid))?;
                let tx = Tx::from(
-                    ffi::load_tx(
-                        block.file_num,
-                        tx_entry.data_pos,
-                        tx_entry.undo_pos,
-                    )
-                    .wrap_err(ReadFailure(txid))?,
+                    self.node
+                        .bridge
+                        .load_tx(
+                            block.file_num,
+                            tx_entry.data_pos,
+                            tx_entry.undo_pos,
+                        )
+                        .wrap_err(ReadFailure(txid))?,
                );
                let outputs_spent = OutputsSpent::query(
                    &spent_by_reader,
                    &tx_reader,
                    self.mempool.spent_by().outputs_spent(&txid),
                    tx_num,
                )?;
                let token = TxTokenData::from_db(
                    self.db,
                    tx_num,
                    &tx,
                    self.is_token_index_enabled,
                )?;
                Ok(make_tx_proto(
                    &tx,
                    &outputs_spent,
                    tx_entry.time_first_seen,
                    tx_entry.is_coinbase,
                    Some(&block),
                    self.avalanche,
                    token.as_ref(),
                ))
            }
        }
    }

    /// Query the raw serialized tx by txid.
    ///
    /// Serializes the tx if it's in the mempool, or reads the tx data from
    /// the node's storage otherwise.
    pub fn raw_tx_by_id(&self, txid: &TxId) -> Result<proto::RawTx> {
        let raw_tx = match self.mempool.tx(txid) {
            Some(mempool_tx) => mempool_tx.tx.ser().to_vec(),
            None => {
                let tx_reader = TxReader::new(self.db)?;
                let block_reader = BlockReader::new(self.db)?;
                let block_tx =
                    tx_reader.tx_by_txid(txid)?.ok_or(TxNotFound(*txid))?;
                let block = block_reader
                    .by_height(block_tx.block_height)?
                    .ok_or(DbTxHasNoBlock(*txid))?;
-                ffi::load_raw_tx(block.file_num, block_tx.entry.data_pos)
+                self.node
+                    .bridge
+                    .load_raw_tx(block.file_num, block_tx.entry.data_pos)
                    .wrap_err(ReadFailure(*txid))?
            }
        };
        Ok(proto::RawTx { raw_tx })
    }

    /// Get token info of the token by token ID.
    pub fn token_info(
        &self,
        token_id_txid: &TxId,
    ) -> Result<proto::TokenInfo> {
        let token_info = read_token_info(
            self.db,
            self.mempool,
            self.avalanche,
            token_id_txid,
        )?
        .ok_or(TokenNotFound(*token_id_txid))?;
        let meta = &token_info.meta;
        Ok(proto::TokenInfo {
            token_id: meta.token_id.to_string(),
            token_type: Some(make_token_type_proto(meta.token_type)),
            genesis_info: Some(make_genesis_info_proto(
                &token_info.genesis_info,
            )),
            block: token_info.block,
            time_first_seen: token_info.time_first_seen,
        })
    }
}

diff --git a/chronik/test/bridgeprimitives_tests.cpp b/chronik/test/bridgeprimitives_tests.cpp
index d6f64fbee..f22c2090a 100644
--- a/chronik/test/bridgeprimitives_tests.cpp
+++ b/chronik/test/bridgeprimitives_tests.cpp
@@ -1,484 +1,486 @@
// Copyright (c) 2023 The Bitcoin developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

// Test to verify all the Bridge* functions in chronik_bridge.cpp are working
// correctly.

#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

using namespace chronik::util;

BOOST_AUTO_TEST_SUITE(bridgeprimitives_tests)

void CheckTxsEqual(const chronik_bridge::Tx &left,
                   const chronik_bridge::Tx &right) {
    BOOST_CHECK_EQUAL(HexStr(left.txid), HexStr(right.txid));
    BOOST_CHECK_EQUAL(left.version, right.version);
    BOOST_CHECK_EQUAL(left.locktime, right.locktime);
    BOOST_CHECK_EQUAL(left.inputs.size(), right.inputs.size());
    for (size_t inputIdx = 0; inputIdx < left.inputs.size(); ++inputIdx) {
        const chronik_bridge::TxInput &inLeft = left.inputs[inputIdx];
        const chronik_bridge::TxInput &inRight = right.inputs.at(inputIdx);
        BOOST_CHECK_EQUAL(HexStr(inLeft.prev_out.txid),
                          HexStr(inRight.prev_out.txid));
        BOOST_CHECK_EQUAL(inLeft.prev_out.out_idx, inRight.prev_out.out_idx);
        BOOST_CHECK_EQUAL(HexStr(inLeft.script), HexStr(inRight.script));
        BOOST_CHECK_EQUAL(inLeft.sequence, inRight.sequence);
        BOOST_CHECK_EQUAL(inLeft.coin.output.value, inRight.coin.output.value);
        BOOST_CHECK_EQUAL(HexStr(inLeft.coin.output.script),
                          HexStr(inRight.coin.output.script));
        BOOST_CHECK_EQUAL(inLeft.coin.height, inRight.coin.height);
        BOOST_CHECK_EQUAL(inLeft.coin.is_coinbase, inRight.coin.is_coinbase);
    }
    BOOST_CHECK_EQUAL(left.outputs.size(), right.outputs.size());
    for (size_t outputIdx = 0; outputIdx < left.outputs.size(); ++outputIdx) {
        const chronik_bridge::TxOutput &outLeft = left.outputs[outputIdx];
        const chronik_bridge::TxOutput &outRight = right.outputs.at(outputIdx);
        BOOST_CHECK_EQUAL(outLeft.value, outRight.value);
        BOOST_CHECK_EQUAL(HexStr(outLeft.script), HexStr(outRight.script));
    }
}

void CheckBlocksEqual(const chronik_bridge::Block &left,
                      const chronik_bridge::Block &right) {
    BOOST_CHECK_EQUAL(HexStr(left.hash), HexStr(right.hash));
    BOOST_CHECK_EQUAL(HexStr(left.prev_hash), HexStr(right.prev_hash));
    BOOST_CHECK_EQUAL(left.n_bits, right.n_bits);
    BOOST_CHECK_EQUAL(left.timestamp, right.timestamp);
    BOOST_CHECK_EQUAL(left.height, right.height);
    BOOST_CHECK_EQUAL(left.file_num, right.file_num);
    BOOST_CHECK_EQUAL(left.data_pos, right.data_pos);
    BOOST_CHECK_EQUAL(left.undo_pos, right.undo_pos);
    BOOST_CHECK_EQUAL(left.size, right.size);
    BOOST_CHECK_EQUAL(left.txs.size(), right.txs.size());
    for (size_t txIdx = 0; txIdx < left.txs.size(); ++txIdx) {
        const chronik_bridge::BlockTx &txLeft = left.txs[txIdx];
        const chronik_bridge::BlockTx &txRight = right.txs.at(txIdx);
        BOOST_CHECK_EQUAL(txLeft.data_pos, txRight.data_pos);
        BOOST_CHECK_EQUAL(txLeft.undo_pos, txRight.undo_pos);
        CheckTxsEqual(txLeft.tx, txRight.tx);
    }
}

void CheckMatchesDisk(const CBlock &block,
                      const chronik_bridge::Block &bridgedBlock) {
    for (size_t idx = 0; idx < block.vtx.size(); ++idx) {
        const chronik_bridge::BlockTx blockTx = bridgedBlock.txs[idx];
        CMutableTransaction txFromDisk;
        BOOST_CHECK(node::ReadTxFromDisk(
            txFromDisk, FlatFilePos(bridgedBlock.file_num, blockTx.data_pos)));
        BOOST_CHECK(txFromDisk.GetHash() == block.vtx[idx]->GetHash());
        if (idx == 0) {
            continue;
        }
        CTxUndo txundo;
        BOOST_CHECK(node::ReadTxUndoFromDisk(
            txundo, FlatFilePos(bridgedBlock.file_num, blockTx.undo_pos)));
        BOOST_CHECK_EQUAL(txundo.vprevout.size(), txFromDisk.vin.size());
        for (size_t inputIdx = 0; inputIdx < blockTx.tx.inputs.size();
             ++inputIdx) {
            const Coin &coin = txundo.vprevout[inputIdx];
            const chronik_bridge::Coin &bridgeCoin =
                blockTx.tx.inputs[inputIdx].coin;
            BOOST_CHECK_EQUAL(coin.GetTxOut().nValue / SATOSHI,
                              bridgeCoin.output.value);
            BOOST_CHECK_EQUAL(HexStr(coin.GetTxOut().scriptPubKey),
                              HexStr(bridgeCoin.output.script));
            BOOST_CHECK_EQUAL(coin.GetHeight(), bridgeCoin.height);
            BOOST_CHECK_EQUAL(coin.IsCoinBase(), bridgeCoin.is_coinbase);
        }
    }
}

BOOST_FIXTURE_TEST_CASE(test_bridge_genesis, TestChain100Setup) {
    LOCK(cs_main);
    const CChainParams &params = GetConfig().GetChainParams();
    const chronik_bridge::ChronikBridge bridge(params.GetConsensus(), m_node);
    ChainstateManager &chainman = *Assert(m_node.chainman);
    CBlockIndex *pgenesis = chainman.ActiveTip()->GetAncestor(0);
    const CBlock &genesisBlock = params.GenesisBlock();

    // Loading the genesis block's undo data returns an empty undo
    std::unique_ptr<CBlockUndo> genesisBlockUndo =
        bridge.load_block_undo(*pgenesis);
    BOOST_CHECK(genesisBlockUndo->vtxundo.empty());

    chronik_bridge::Block bridgedGenesisBlock = chronik_bridge::bridge_block(
        genesisBlock, *genesisBlockUndo, *pgenesis);
    chronik_bridge::Tx expectedGenesisTx = {
        .txid = HashToArray(genesisBlock.vtx[0]->GetId()),
        .version = 1,
        .inputs = {{
            .prev_out = chronik_bridge::OutPoint({
                .txid = {},
                .out_idx = 0xffff'ffff,
            }),
            .script = ToRustVec<uint8_t>(genesisBlock.vtx[0]->vin[0].scriptSig),
            .sequence = 0xffff'ffff,
            .coin = {}, // null coin
        }},
        .outputs = {{
            .value = 5000000000,
            .script =
                ToRustVec<uint8_t>(genesisBlock.vtx[0]->vout[0].scriptPubKey),
        }},
        .locktime = 0,
    };
    chronik_bridge::Block expectedBridgedGenesisBlock = {
        .hash = HashToArray(genesisBlock.GetHash()),
        .prev_hash = {},
        .n_bits = 0x207fffff,
        .timestamp = 1296688602,
        .height = 0,
        .file_num = 0,
        .data_pos = 8,  // 8 magic bytes in block file
        .undo_pos = 0,  // genesis has no undo data
        .size = 285,
        .txs = {{
            .tx = expectedGenesisTx,
            .data_pos = 89, // +80 header +1 compact size
            .undo_pos = 0   // coinbase has no undo data
        }}};

    CheckBlocksEqual(bridgedGenesisBlock, expectedBridgedGenesisBlock);
    chronik_bridge::BlockTx &bridgedGenesisTx = bridgedGenesisBlock.txs[0];
    CMutableTransaction genesisTxFromDisk;
    BOOST_CHECK(node::ReadTxFromDisk(
        genesisTxFromDisk,
        FlatFilePos(bridgedGenesisBlock.file_num, bridgedGenesisTx.data_pos)));
    BOOST_CHECK(genesisTxFromDisk.GetHash() == genesisBlock.vtx[0]->GetHash());

-    CheckTxsEqual(chronik_bridge::load_tx(bridgedGenesisBlock.file_num,
-                                          bridgedGenesisTx.data_pos,
-                                          bridgedGenesisTx.undo_pos),
+    CheckTxsEqual(bridge.load_tx(bridgedGenesisBlock.file_num,
+                                 bridgedGenesisTx.data_pos,
+                                 bridgedGenesisTx.undo_pos),
                  bridgedGenesisTx.tx);

    CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
    ss << genesisBlock.vtx[0];
-    BOOST_CHECK_EQUAL(HexStr(ss), HexStr(chronik_bridge::load_raw_tx(
-                                      bridgedGenesisBlock.file_num,
-                                      bridgedGenesisTx.data_pos)));
+    BOOST_CHECK_EQUAL(HexStr(ss),
+                      HexStr(bridge.load_raw_tx(bridgedGenesisBlock.file_num,
+                                                bridgedGenesisTx.data_pos)));
}
BOOST_FIXTURE_TEST_CASE(test_bridge_detailled, TestChain100Setup) {
    LOCK(cs_main);
    const CChainParams &params = GetConfig().GetChainParams();
    const chronik_bridge::ChronikBridge bridge(params.GetConsensus(), m_node);
    ChainstateManager &chainman = *Assert(m_node.chainman);

    CBlock coinsBlock = CreateAndProcessBlock({}, CScript() << OP_1,
                                              &chainman.ActiveChainstate());
    mineBlocks(100);

    CScript scriptPad = CScript() << OP_RETURN << std::vector<uint8_t>(100);
    CMutableTransaction tx1;
    tx1.nVersion = 1;
    tx1.vin = {CTxIn(coinsBlock.vtx[0]->GetId(), 0)};
    tx1.vout = {
        CTxOut(50 * COIN - 10000 * SATOSHI, CScript() << OP_3),
        CTxOut(1000 * SATOSHI, CScript() << OP_4),
        CTxOut(Amount::zero(), scriptPad),
    };
    tx1.nLockTime = 123;

    CMutableTransaction tx2;
    tx2.nVersion = 1;
    tx2.vin = {CTxIn(tx1.GetId(), 0), CTxIn(tx1.GetId(), 1)};
    tx2.vout = {
        CTxOut(50 * COIN - 30000 * SATOSHI, CScript() << OP_5),
        CTxOut(Amount::zero(), scriptPad),
    };
    BOOST_CHECK(tx1.GetId() < tx2.GetId());

    CBlock testBlock = CreateAndProcessBlock({tx1, tx2}, CScript() << OP_2,
                                             &chainman.ActiveChainstate());
    std::unique_ptr<CBlockUndo> testBlockUndo =
        bridge.load_block_undo(*chainman.ActiveTip());
    BOOST_CHECK_EQUAL(chainman.ActiveTip()->GetBlockHash(),
                      testBlock.GetHash());
    chronik_bridge::Block bridgedTestBlock = chronik_bridge::bridge_block(
        testBlock, *testBlockUndo, *chainman.ActiveTip());

    chronik_bridge::Tx expectedTestTx0 = {
        .txid = HashToArray(testBlock.vtx[0]->GetId()),
        .version = 2,
        .inputs = {{
            .prev_out = chronik_bridge::OutPoint({
                .txid = {},
                .out_idx = 0xffff'ffff,
            }),
            .script = ToRustVec<uint8_t>(testBlock.vtx[0]->vin[0].scriptSig),
            .sequence = 0xffff'ffff,
            .coin = {}, // null coin
        }},
        .outputs = {{
            .value = 2500000000,
            .script = {0x52},
        }},
        .locktime = 0,
    };
    chronik_bridge::Tx expectedTestTx1 = {
        .txid = HashToArray(tx1.GetId()),
        .version = 1,
        .inputs = {{
            .prev_out = chronik_bridge::OutPoint({
                .txid = HashToArray(coinsBlock.vtx[0]->GetId()),
                .out_idx = 0,
            }),
            .script = {},
            .sequence = 0xffff'ffff,
            .coin = chronik_bridge::Coin({
                .output = {5000000000, {0x51}},
                .height = 101,
                .is_coinbase = true,
            }),
        }},
        .outputs = {{4999990000, {0x53}},
                    {1000, {0x54}},
                    {0, ToRustVec<uint8_t>(scriptPad)}},
        .locktime = 123,
    };
    chronik_bridge::Tx expectedTestTx2 = {
        .txid = HashToArray(tx2.GetId()),
        .version = 1,
        .inputs = {chronik_bridge::TxInput({
                       .prev_out = chronik_bridge::OutPoint({
                           .txid = HashToArray(tx1.GetId()),
                           .out_idx = 0,
                       }),
                       .script = {},
                       .sequence = 0xffff'ffff,
                       .coin =
                           {
                               .output = {4999990000, {0x53}},
                               .height = 202,
                               .is_coinbase = false,
                           },
                   }),
                   chronik_bridge::TxInput({
                       .prev_out = chronik_bridge::OutPoint({
                           .txid = HashToArray(tx1.GetId()),
                           .out_idx = 1,
                       }),
                       .script = {},
                       .sequence = 0xffff'ffff,
                       .coin =
                           {
                               .output = {1000, {0x54}},
                               .height = 202,
                               .is_coinbase = false,
                           },
                   })},
        .outputs = {{
                        .value = 4999970000,
                        .script = {0x55},
                    },
                    {
                        .value = 0,
                        .script = ToRustVec<uint8_t>(scriptPad),
                    }},
        .locktime = 0,
    };
    chronik_bridge::Block expectedBridgedTestBlock = {
        .hash = HashToArray(testBlock.GetHash()),
        .prev_hash = HashToArray(testBlock.hashPrevBlock),
        .n_bits = 0x207fffff,
        .timestamp = 1598888152,
        .height = 202,
        .file_num = 0,
        .data_pos = 39548,
        .undo_pos = 8249,
        .size = 578,
        .txs = {
            {.tx = expectedTestTx0, .data_pos = 39629, .undo_pos = 0},
            {.tx = expectedTestTx1, .data_pos = 39729, .undo_pos = 8250},
            {.tx = expectedTestTx2, .data_pos = 39912, .undo_pos = 8257},
        }};

    CheckBlocksEqual(bridgedTestBlock, expectedBridgedTestBlock);
    CheckMatchesDisk(testBlock, bridgedTestBlock);

    for (const chronik_bridge::BlockTx &bridgedTx : bridgedTestBlock.txs) {
-        CheckTxsEqual(chronik_bridge::load_tx(bridgedTestBlock.file_num,
-                                              bridgedTx.data_pos,
-                                              bridgedTx.undo_pos),
+        CheckTxsEqual(bridge.load_tx(bridgedTestBlock.file_num,
+                                     bridgedTx.data_pos, bridgedTx.undo_pos),
                      bridgedTx.tx);
    }
    for (size_t i = 0; i < testBlock.vtx.size(); ++i) {
        CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
        ss << testBlock.vtx[i];
-        BOOST_CHECK_EQUAL(HexStr(ss), HexStr(chronik_bridge::load_raw_tx(
+        BOOST_CHECK_EQUAL(HexStr(ss), HexStr(bridge.load_raw_tx(
                                          bridgedTestBlock.file_num,
                                          bridgedTestBlock.txs[i].data_pos)));
    }
}

BOOST_FIXTURE_TEST_CASE(test_bridge_bad, TestChain100Setup) {
    LOCK(cs_main);
    const CChainParams &params = GetConfig().GetChainParams();
    const chronik_bridge::ChronikBridge bridge(params.GetConsensus(), m_node);
    ChainstateManager &chainman = *Assert(m_node.chainman);

    // Incompatible CBlock and CBlockUndo:
    // CBlock has a tx that the CBlockUndo doesn't have.
    CBlock badBlock1 = CreateBlock({CMutableTransaction()}, CScript() << OP_1,
                                   chainman.ActiveChainstate());
    CBlockUndo badBlockUndo1;
    BOOST_CHECK_EXCEPTION(chronik_bridge::bridge_block(badBlock1,
                                                       badBlockUndo1,
                                                       *chainman.ActiveTip()),
                          std::runtime_error,
                          [](const std::runtime_error &ex) {
                              BOOST_CHECK_EQUAL(ex.what(),
                                                "Missing undo data for tx");
                              return true;
                          });

    CBlock coinsBlock = CreateAndProcessBlock({}, CScript() << OP_1,
                                              &chainman.ActiveChainstate());
    mineBlocks(100);

    // Create block with some undo data
    CMutableTransaction tx;
    tx.nVersion = 1;
    tx.vin = {CTxIn(coinsBlock.vtx[0]->GetId(), 0)};
    tx.vout = {CTxOut(50 * COIN - 10000 * SATOSHI,
                      CScript() << OP_RETURN << std::vector<uint8_t>(100))};
    CreateAndProcessBlock({tx}, CScript() << OP_1,
                          &chainman.ActiveChainstate());
    std::unique_ptr<CBlockUndo> blockUndo =
        bridge.load_block_undo(*chainman.ActiveTip());

    // This time, the bad CBlock has two inputs whereas the disk has only one.
    CMutableTransaction badTx;
    badTx.vin.resize(2);
    CBlock badBlock2 = CreateBlock({badTx}, CScript() << OP_1,
                                   chainman.ActiveChainstate());
    BOOST_CHECK_EXCEPTION(chronik_bridge::bridge_block(badBlock2, *blockUndo,
                                                       *chainman.ActiveTip()),
                          std::runtime_error,
                          [](const std::runtime_error &ex) {
                              BOOST_CHECK_EQUAL(ex.what(),
                                                "Missing coin for input");
                              return true;
                          });
}

// It's easy to make a hard-to-detect off-by-one error when using
// GetSizeOfCompactSize, therefore we test blocks with a "dangerous" number of
// txs, which covers the cases where GetSizeOfCompactSize goes from 1 -> 3 -> 5.
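// For reference: a CompactSize takes 1 byte for values 0..=252, 3 bytes
// (0xfd prefix) for 253..=0xffff, and 5 bytes (0xfe prefix) beyond that; the
// tx counts below straddle the 252/253 boundary, and 65536 lands in the
// 5-byte range (note the block's tx count includes the coinbase).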
BOOST_FIXTURE_TEST_CASE(test_bridge_big, TestChain100Setup) {
    LOCK(cs_main);
    const CChainParams &params = GetConfig().GetChainParams();
    const chronik_bridge::ChronikBridge bridge(params.GetConsensus(), m_node);
    ChainstateManager &chainman = *Assert(m_node.chainman);

    std::vector<size_t> testNumTxsCases = {
        0,   1,   2,   3,   10,  62,  63,  64,  65,  126, 127, 128,
        129, 130, 250, 251, 252, 253, 254, 255, 256, 257, 258, 65536};

    std::vector<CBlock> coinblocks;
    for (size_t idx = 0; idx < testNumTxsCases.size(); ++idx) {
        CBlock coinblock = CreateAndProcessBlock({}, CScript() << OP_1,
                                                 &chainman.ActiveChainstate());
        coinblocks.push_back(coinblock);
        BOOST_CHECK_EQUAL(chainman.ActiveTip()->GetBlockHash(),
                          coinblock.GetHash());
    }
    mineBlocks(100);

    CScript scriptPad = CScript() << OP_RETURN << std::vector<uint8_t>(100);
    for (size_t idx = 0; idx < testNumTxsCases.size(); ++idx) {
        size_t numTxs = testNumTxsCases[idx];
        std::vector<CMutableTransaction> txs;
        txs.reserve(numTxs);
        const CTransactionRef &coinbase = coinblocks[idx].vtx[0];
        Amount available = coinbase->vout[0].nValue;
        TxId coinTxId = coinbase->GetId();
        for (size_t txIdx = 0; txIdx < numTxs; ++txIdx) {
            CMutableTransaction tx;
            tx.nVersion = 1;
            tx.vin = {CTxIn(coinTxId, 0)};
            available -= 10000 * SATOSHI;
            tx.vout = {
                CTxOut(available, CScript() << OP_2),
                CTxOut(Amount::zero(), scriptPad),
            };
            coinTxId = tx.GetId();
            txs.push_back(tx);
        }
        CBlock testBlock = CreateAndProcessBlock(
            txs, CScript() << OP_1, &chainman.ActiveChainstate());
        BOOST_CHECK_EQUAL(chainman.ActiveTip()->GetBlockHash(),
                          testBlock.GetHash());
        std::unique_ptr<CBlockUndo> testBlockUndo =
            bridge.load_block_undo(*chainman.ActiveTip());

        // test matches disk
        chronik_bridge::Block bridgedBlock = chronik_bridge::bridge_block(
            testBlock, *testBlockUndo, *chainman.ActiveTip());
        CheckMatchesDisk(testBlock, bridgedBlock);

        for (const chronik_bridge::BlockTx &bridgedTx : bridgedBlock.txs) {
-            CheckTxsEqual(chronik_bridge::load_tx(bridgedBlock.file_num,
-                                                  bridgedTx.data_pos,
-                                                  bridgedTx.undo_pos),
+            CheckTxsEqual(bridge.load_tx(bridgedBlock.file_num,
+                                         bridgedTx.data_pos,
+                                         bridgedTx.undo_pos),
                          bridgedTx.tx);
        }
    }
}

BOOST_FIXTURE_TEST_CASE(test_load_tx_bad, TestChain100Setup) {
+    const CChainParams &params = GetConfig().GetChainParams();
+    const chronik_bridge::ChronikBridge bridge(params.GetConsensus(), m_node);
+
    BOOST_CHECK_EXCEPTION(
-        chronik_bridge::load_tx(0x7fffffff, 0, 0), std::runtime_error,
+        bridge.load_tx(0x7fffffff, 0, 0), std::runtime_error,
        [](const std::runtime_error &ex) {
            BOOST_CHECK_EQUAL(ex.what(), "Reading tx data from disk failed");
            return true;
        });
    BOOST_CHECK_EXCEPTION(
-        chronik_bridge::load_tx(0, 0x7fffffff, 0), std::runtime_error,
+        bridge.load_tx(0, 0x7fffffff, 0), std::runtime_error,
        [](const std::runtime_error &ex) {
            BOOST_CHECK_EQUAL(ex.what(), "Reading tx data from disk failed");
            return true;
        });
    uint32_t genesisCbDataPos = 89;
-    BOOST_CHECK_EXCEPTION(
-        chronik_bridge::load_tx(0, genesisCbDataPos, 0x7fffffff),
-        std::runtime_error, [](const std::runtime_error &ex) {
-            BOOST_CHECK_EQUAL(ex.what(),
-                              "Reading tx undo data from disk failed");
-            return true;
-        });
+    BOOST_CHECK_EXCEPTION(bridge.load_tx(0, genesisCbDataPos, 0x7fffffff),
+                          std::runtime_error,
+                          [](const std::runtime_error &ex) {
+                              BOOST_CHECK_EQUAL(
+                                  ex.what(),
+                                  "Reading tx undo data from disk failed");
+                              return true;
+                          });
    // sanity check
-    chronik_bridge::load_tx(0, genesisCbDataPos, 0);
+    bridge.load_tx(0, genesisCbDataPos, 0);
}

BOOST_AUTO_TEST_SUITE_END()