diff --git a/chronik/chronik-cpp/chronik.cpp b/chronik/chronik-cpp/chronik.cpp index b49d54d5b..fee83e931 100644 --- a/chronik/chronik-cpp/chronik.cpp +++ b/chronik/chronik-cpp/chronik.cpp @@ -1,69 +1,78 @@ // Copyright (c) 2022 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include #include #include #include #include #include #include #include #include #include #include #include #include namespace chronik { // Duration between WebSocket pings initiated by Chronik. // 45s has been empirically established as a reliable duration for both browser // and NodeJS WebSockets. static constexpr std::chrono::seconds WS_PING_INTERVAL_DEFAULT{45s}; // Ping duration is just 5s on regtest to speed up ping tests and make // functional tests more reliable. static constexpr std::chrono::seconds WS_PING_INTERVAL_REGTEST{5s}; template rust::Vec ToRustVec(const C &container) { rust::Vec vec; vec.reserve(container.size()); std::copy(container.begin(), container.end(), std::back_inserter(vec)); return vec; } bool Start(const Config &config, const node::NodeContext &node, bool fWipe) { const bool is_pause_allowed = gArgs.GetBoolArg("-chronikallowpause", false); const CChainParams ¶ms = config.GetChainParams(); if (is_pause_allowed && !params.IsTestChain()) { return InitError(_("Using -chronikallowpause on a mainnet chain is not " "allowed for security reasons.")); } return chronik_bridge::setup_chronik( { .datadir_net = gArgs.GetDataDirNet().u8string(), .hosts = ToRustVec(gArgs.IsArgSet("-chronikbind") ? gArgs.GetArgs("-chronikbind") : DEFAULT_BINDS), .default_port = BaseParams().ChronikPort(), .wipe_db = fWipe, .enable_token_index = gArgs.GetBoolArg("-chroniktokenindex", true), .is_pause_allowed = is_pause_allowed, .enable_perf_stats = gArgs.GetBoolArg("-chronikperfstats", false), .ws_ping_interval_secs = params.NetworkIDString() == CBaseChainParams::REGTEST ? uint64_t(count_seconds(WS_PING_INTERVAL_REGTEST)) : uint64_t(count_seconds(WS_PING_INTERVAL_DEFAULT)), + .tx_num_cache = + { + .num_buckets = + (size_t)gArgs.GetIntArg("-chroniktxnumcachebuckets", + DEFAULT_TX_NUM_CACHE_BUCKETS), + .bucket_size = (size_t)gArgs.GetIntArg( + "-chroniktxnumcachebucketsize", + DEFAULT_TX_NUM_CACHE_BUCKET_SIZE), + }, }, config, node); } void Stop() { LogPrintf("Stopping Chronik...\n"); StopChronikValidationInterface(); } } // namespace chronik diff --git a/chronik/chronik-cpp/chronik.h b/chronik/chronik-cpp/chronik.h index 6de223cc5..668bf57cb 100644 --- a/chronik/chronik-cpp/chronik.h +++ b/chronik/chronik-cpp/chronik.h @@ -1,28 +1,38 @@ // Copyright (c) 2022 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #ifndef BITCOIN_CHRONIK_CPP_CHRONIK_H #define BITCOIN_CHRONIK_CPP_CHRONIK_H #include #include class Config; namespace node { struct NodeContext; } // namespace node namespace chronik { static const std::vector DEFAULT_BINDS = {"127.0.0.1", "::1"}; +// How many buckets of txid -> tx num maps are cached in-memory. +// Don't set this too high, Chronik will do a linear scan over all buckets. +static const size_t DEFAULT_TX_NUM_CACHE_BUCKETS = 10; +// Size of each bucket in the in-memory cache of tx nums. +// Unlike the number of buckets, this may be increased without much danger of +// slowing the indexer down. 
The total cache size will be +// `num_buckets * bucket_size * 40B`, so by default the cache will require 40MB +// of memory. +static const size_t DEFAULT_TX_NUM_CACHE_BUCKET_SIZE = 100'000; + // Registers Chronik indexer as ValidationInterface, listens to HTTP queries bool Start(const Config &config, const node::NodeContext &node, bool fWipe); // Unregisters Chronik indexer as ValidationInterface, stops the HTTP server void Stop(); } // namespace chronik #endif // BITCOIN_CHRONIK_CPP_CHRONIK_H diff --git a/chronik/chronik-db/src/index_tx.rs b/chronik/chronik-db/src/index_tx.rs index 80bb53507..e58884394 100644 --- a/chronik/chronik-db/src/index_tx.rs +++ b/chronik/chronik-db/src/index_tx.rs @@ -1,196 +1,421 @@ // Copyright (c) 2023 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. //! Module for [`IndexTx`] and [`prepare_indexed_txs`]. -use std::collections::{BTreeSet, HashMap}; +use std::collections::{hash_map::Entry, BTreeSet, HashMap, VecDeque}; use abc_rust_error::Result; -use bitcoinsuite_core::tx::{OutPoint, Tx}; +use bitcoinsuite_core::tx::{OutPoint, Tx, TxId}; use thiserror::Error; use crate::{ db::Db, + index_tx::IndexTxError::*, io::{TxNum, TxReader}, }; /// Tx in a block to be added to the index, with prepared data to guide /// indexing. #[derive(Clone, Debug, Eq, PartialEq)] pub struct IndexTx<'a> { /// Tx from the node to be indexed. pub tx: &'a Tx, /// [`TxNum`] of this tx. pub tx_num: TxNum, /// Whether this tx is a coinbase tx. pub is_coinbase: bool, /// [`TxNum`]s of the inputs of the tx. Either references tx from the DB or /// other txs within the new block. /// /// Empty for coinbase txs. pub input_nums: Vec, } +/// Cache for tx nums to speed up prepare_indexed_txs_cached. +/// It works by having an internal "conveyor belt" of buckets, and each block +/// puts all their TxId -> TxNum tuples into the bucket at the front of the +/// belt, one-by-one. If this bucket is full, the bucket at the end of the belt +/// is "dropped off", emptied and moved to the front, moving all other buckets +/// one step back. Then the new empty bucket will be filled, until it is full, +/// etc. +/// +/// When querying the cache, buckets are looked into one-by-one in the order on +/// the belt. +/// +/// On a reorg, all buckets are simply cleared, as the goal is to speed up +/// initial sync. +#[derive(Debug, Default)] +pub struct TxNumCache { + num_buckets: usize, + bucket_size: usize, + buckets: VecDeque>, +} + +/// Params for tuning the TxNumCache. +#[derive(Clone, Debug, Default)] +pub struct TxNumCacheSettings { + /// How many buckets are on the belt + pub num_buckets: usize, + /// How many txs are cached in each bucket + pub bucket_size: usize, +} + /// Error indicating something went wrong with a [`IndexTx`]. #[derive(Debug, Error, PartialEq, Eq)] pub enum IndexTxError { /// An input references a txid which could neither be found in the DB nor /// within the new block. #[error("Unknown input spent: {0:?}")] UnknownInputSpent(OutPoint), } -use self::IndexTxError::*; +/// Update mode of [`prepare_indexed_txs_cached`], to ensure the cache is +/// updated correctly. 
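+///
+/// The call sites in `indexer.rs` map one-to-one onto the variants below:
+/// connecting a block uses `Add`, disconnecting one uses `Delete` (which
+/// clears the cache, per the reorg note above), and finalization uses `Read`.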
+#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum PrepareUpdateMode { + /// Txs are being added to the DB, will add them to the cache + Add, + /// Txs are being removed from the DB, will clear the cache + Delete, + /// Txs are only read from the DB, won't update the cache + Read, +} /// Prepare txs of a block which is about to be added/removed from the DB with /// some additional data, either coming from the DB or from within the block. +/// This function works like [`prepare_indexed_txs_cached`] but with the cache +/// disabled. pub fn prepare_indexed_txs<'a>( db: &Db, first_tx_num: TxNum, txs: &'a [Tx], +) -> Result>> { + prepare_indexed_txs_cached( + db, + first_tx_num, + txs, + &mut TxNumCache::default(), + PrepareUpdateMode::Read, + ) +} + +/// Prepare txs of a block which is about to be added/removed from the DB with +/// some additional data, either coming from the DB or from within the block. +/// Uses the provided [`TxNumCache`], and puts the new TxNums into it. +pub fn prepare_indexed_txs_cached<'a>( + db: &Db, + first_tx_num: TxNum, + txs: &'a [Tx], + cache: &mut TxNumCache, + update_mode: PrepareUpdateMode, ) -> Result>> { let mut tx_nums_by_txid = HashMap::with_capacity(txs.len()); for (tx_idx, tx) in txs.iter().enumerate() { tx_nums_by_txid.insert(tx.txid_ref(), first_tx_num + tx_idx as TxNum); } let mut db_txids = BTreeSet::new(); for tx in txs { for tx_input in &tx.inputs { - if !tx_nums_by_txid.contains_key(&&tx_input.prev_out.txid) { + if let Entry::Vacant(entry) = + tx_nums_by_txid.entry(&tx_input.prev_out.txid) + { + if let Some(tx_num) = cache.get(&tx_input.prev_out.txid) { + entry.insert(tx_num); + continue; + } db_txids.insert(&tx_input.prev_out.txid); } } } let tx_reader = TxReader::new(db)?; let db_tx_nums = tx_reader.tx_nums_by_txids(db_txids.iter().copied())?; let db_txids = db_txids.into_iter().collect::>(); - txs.iter() + let index_txs = txs + .iter() .enumerate() .map(|(tx_idx, tx)| { let tx_num = first_tx_num + tx_idx as TxNum; let is_coinbase = tx_idx == 0; let input_nums = if is_coinbase { vec![] } else { tx.inputs .iter() .map(|input| { Ok(match tx_nums_by_txid.get(&input.prev_out.txid) { Some(&tx_num) => tx_num, None => { let tx_num_idx = db_txids .binary_search(&&input.prev_out.txid) .map_err(|_| { UnknownInputSpent(input.prev_out) })?; db_tx_nums[tx_num_idx] .ok_or(UnknownInputSpent(input.prev_out))? } }) }) .collect::>>()? }; Ok(IndexTx { tx, tx_num, is_coinbase, input_nums, }) }) - .collect::>>() + .collect::>>()?; + match update_mode { + PrepareUpdateMode::Add => cache.add_to_cache(&index_txs), + PrepareUpdateMode::Delete => cache.clear(), + PrepareUpdateMode::Read => {} + } + Ok(index_txs) +} + +impl TxNumCache { + /// Create a [`TxNumCache`] with the given tuning settings. + /// Will allocate all the needed memory up-front. 
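+    ///
+    /// E.g., mirroring the unit test below:
+    /// ```ignore
+    /// let cache = TxNumCache::new(TxNumCacheSettings {
+    ///     num_buckets: 2,
+    ///     bucket_size: 3,
+    /// });
+    /// ```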
+ pub fn new(settings: TxNumCacheSettings) -> TxNumCache { + TxNumCache { + num_buckets: settings.num_buckets, + bucket_size: settings.bucket_size, + buckets: (0..settings.num_buckets) + .map(|_| HashMap::with_capacity(settings.bucket_size)) + .collect(), + } + } + + fn add_to_cache(&mut self, index_txs: &[IndexTx<'_>]) { + if self.num_buckets == 0 { + return; + } + let mut front = self.buckets.front_mut().unwrap(); + for tx in index_txs { + // Bucket at the front is full, get a new empty one + if front.len() >= self.bucket_size { + // Drop off the last bucket, empty it and move it to the front + let mut new_bucket = self.buckets.pop_back().unwrap(); + new_bucket.clear(); + self.buckets.push_front(new_bucket); + front = self.buckets.front_mut().unwrap(); + } + front.insert(tx.tx.txid(), tx.tx_num); + } + } + + fn clear(&mut self) { + for bucket in &mut self.buckets { + bucket.clear() + } + } + + fn get(&self, txid: &TxId) -> Option { + for block in &self.buckets { + if let Some(&tx_num) = block.get(txid) { + return Some(tx_num); + } + } + None + } } #[cfg(test)] mod tests { use abc_rust_error::Result; use bitcoinsuite_core::tx::{OutPoint, Tx, TxId, TxInput, TxMut}; use pretty_assertions::assert_eq; use rocksdb::WriteBatch; use crate::{ db::Db, - index_tx::{prepare_indexed_txs, IndexTx}, + index_tx::{ + prepare_indexed_txs, IndexTx, TxNumCache, TxNumCacheSettings, + }, io::{BlockTxs, TxEntry, TxNum, TxWriter, TxsMemData}, }; #[test] fn test_prepare_indexed_txs() -> Result<()> { abc_rust_error::install(); let tempdir = tempdir::TempDir::new("chronik-db--indexed_txs")?; let db = Db::open(tempdir.path())?; let tx_writer = TxWriter::new(&db)?; let mut txs_mem_data = TxsMemData::default(); let mut batch = WriteBatch::default(); let db_txs = (100..110) .map(|txid_num: u8| TxEntry { txid: TxId::from([txid_num; 32]), ..Default::default() }) .collect::>(); let first_tx_num = db_txs.len() as TxNum; tx_writer.insert( &mut batch, &BlockTxs { txs: db_txs, block_height: 0, }, &mut txs_mem_data, )?; db.write_batch(batch)?; fn make_tx( txid_num: u8, input_txid_nums: [u8; N], ) -> Tx { Tx::with_txid( TxId::from([txid_num; 32]), TxMut { inputs: input_txid_nums .into_iter() .map(|input_txid_num| TxInput { prev_out: OutPoint { txid: TxId::from([input_txid_num; 32]), out_idx: 0, }, ..Default::default() }) .collect(), ..Default::default() }, ) } let new_txs = vec![ make_tx(110, [0]), make_tx(111, [110, 100, 101]), make_tx(112, [111, 109, 103]), ]; assert_eq!( prepare_indexed_txs(&db, first_tx_num, &new_txs)?, vec![ IndexTx { tx: &new_txs[0], tx_num: 10, is_coinbase: true, input_nums: vec![], }, IndexTx { tx: &new_txs[1], tx_num: 11, is_coinbase: false, input_nums: vec![10, 0, 1], }, IndexTx { tx: &new_txs[2], tx_num: 12, is_coinbase: false, input_nums: vec![11, 9, 3], }, ], ); Ok(()) } + + #[test] + fn test_tx_num_cache() { + let txs = (0..=10) + .map(|tx_num| { + Tx::with_txid(TxId::new([tx_num; 32]), Default::default()) + }) + .collect::>(); + let index_txs = txs + .iter() + .enumerate() + .map(|(tx_num, tx)| IndexTx { + tx_num: tx_num as TxNum, + tx, + input_nums: vec![], + is_coinbase: false, + }) + .collect::>(); + let mut cache = TxNumCache::new(TxNumCacheSettings { + num_buckets: 2, + bucket_size: 3, + }); + assert_eq!(cache.buckets.len(), 2); + for tx in &txs { + assert_eq!(cache.get(tx.txid_ref()), None); + } + + // Add the first tx + cache.add_to_cache(&index_txs[..1]); + assert_eq!(cache.buckets.len(), 2); + assert_eq!(cache.buckets[0].len(), 1); + assert_eq!(cache.buckets[0][txs[0].txid_ref()], 0); + 
assert_eq!(cache.get(txs[0].txid_ref()), Some(0)); + + // Add three more, filling the next bucket + cache.add_to_cache(&index_txs[1..4]); + assert_eq!(cache.buckets.len(), 2); + assert_eq!(cache.buckets[0].len(), 1); + assert_eq!(cache.buckets[0][txs[3].txid_ref()], 3); + assert_eq!(cache.buckets[1].len(), 3); + assert_eq!(cache.buckets[1][txs[0].txid_ref()], 0); + assert_eq!(cache.buckets[1][txs[1].txid_ref()], 1); + assert_eq!(cache.buckets[1][txs[2].txid_ref()], 2); + for tx in &index_txs[..4] { + assert_eq!(cache.get(tx.tx.txid_ref()), Some(tx.tx_num)); + } + + // Add two more, filling the cache + cache.add_to_cache(&index_txs[4..6]); + assert_eq!(cache.buckets.len(), 2); + assert_eq!(cache.buckets[0].len(), 3); + assert_eq!(cache.buckets[0][txs[3].txid_ref()], 3); + assert_eq!(cache.buckets[0][txs[4].txid_ref()], 4); + assert_eq!(cache.buckets[0][txs[5].txid_ref()], 5); + assert_eq!(cache.buckets[1].len(), 3); + assert_eq!(cache.buckets[1][txs[0].txid_ref()], 0); + assert_eq!(cache.buckets[1][txs[1].txid_ref()], 1); + assert_eq!(cache.buckets[1][txs[2].txid_ref()], 2); + for tx in &index_txs[..6] { + assert_eq!(cache.get(tx.tx.txid_ref()), Some(tx.tx_num)); + } + + // Adding one more empties the last bucket and moves it front + cache.add_to_cache(&index_txs[6..7]); + assert_eq!(cache.buckets.len(), 2); + assert_eq!(cache.buckets[0].len(), 1); + assert_eq!(cache.buckets[0][txs[6].txid_ref()], 6); + assert_eq!(cache.buckets[1].len(), 3); + assert_eq!(cache.buckets[1][txs[3].txid_ref()], 3); + assert_eq!(cache.buckets[1][txs[4].txid_ref()], 4); + assert_eq!(cache.buckets[1][txs[5].txid_ref()], 5); + for tx in txs[..3].iter() { + assert_eq!(cache.get(tx.txid_ref()), None); + } + for tx in &index_txs[3..7] { + assert_eq!(cache.get(tx.tx.txid_ref()), Some(tx.tx_num)); + } + + // Adding three more again empties the last bucket and moves it front + cache.add_to_cache(&index_txs[7..10]); + assert_eq!(cache.buckets.len(), 2); + assert_eq!(cache.buckets[0].len(), 1); + assert_eq!(cache.buckets[0][txs[9].txid_ref()], 9); + assert_eq!(cache.buckets[1].len(), 3); + assert_eq!(cache.buckets[1][txs[6].txid_ref()], 6); + assert_eq!(cache.buckets[1][txs[7].txid_ref()], 7); + assert_eq!(cache.buckets[1][txs[8].txid_ref()], 8); + for tx in txs[..6].iter() { + assert_eq!(cache.get(tx.txid_ref()), None); + } + for tx in &index_txs[6..10] { + assert_eq!(cache.get(tx.tx.txid_ref()), Some(tx.tx_num)); + } + + // Clearing the cache leaves the buckets allocated but empty + cache.clear(); + assert_eq!(cache.buckets.len(), 2); + assert_eq!(cache.buckets[0].len(), 0); + assert_eq!(cache.buckets[0].capacity(), 3); + assert_eq!(cache.buckets[1].len(), 0); + assert_eq!(cache.buckets[1].capacity(), 3); + } } diff --git a/chronik/chronik-db/src/mem/data.rs b/chronik/chronik-db/src/mem/data.rs index f503c7620..a3b1bb19b 100644 --- a/chronik/chronik-db/src/mem/data.rs +++ b/chronik/chronik-db/src/mem/data.rs @@ -1,60 +1,69 @@ // Copyright (c) 2023 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. -use crate::io::{ - GroupHistoryMemData, GroupHistoryStats, GroupUtxoMemData, GroupUtxoStats, - SpentByMemData, SpentByStats, TxsMemData, TxsStats, +use crate::{ + index_tx::{TxNumCache, TxNumCacheSettings}, + io::{ + GroupHistoryMemData, GroupHistoryStats, GroupUtxoMemData, + GroupUtxoStats, SpentByMemData, SpentByStats, TxsMemData, TxsStats, + }, }; /// In-memory data for Chronik, e.g. caches, perf statistics. 
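+/// The [`TxNumCache`] is kept here, rather than being rebuilt per block, so
+/// that lookups during initial sync can hit txs indexed by earlier blocks.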
#[derive(Debug)] pub struct MemData { /// In-memory data for indexing txs. pub txs: TxsMemData, + /// In-memory TxNumCache. + pub tx_num_cache: TxNumCache, /// In-memory data for indexing script tx history. pub script_history: GroupHistoryMemData, /// In-memory data for indexing script UTXOs. pub script_utxos: GroupUtxoMemData, /// In-memory data for indexing spent-by data. pub spent_by: SpentByMemData, } /// Only the stats data from [`MemData`] #[derive(Debug)] pub struct StatsData { /// Stats data for indexing txs. pub txs: TxsStats, /// Stats data for indexing script tx history. pub script_history: GroupHistoryStats, /// Stats data for indexing script UTXOs. pub script_utxos: GroupUtxoStats, /// Stats data for indexing spent-by data. pub spent_by: SpentByStats, } /// Config for in-memory data for Chronik. #[derive(Clone, Debug)] -pub struct MemDataConf {} +pub struct MemDataConf { + /// Settings for tuning TxNumCache. + pub tx_num_cache: TxNumCacheSettings, +} impl MemData { /// Create a new [`MemData`] from the given configuration. - pub fn new(_: MemDataConf) -> Self { + pub fn new(conf: MemDataConf) -> Self { MemData { txs: TxsMemData::default(), + tx_num_cache: TxNumCache::new(conf.tx_num_cache), script_history: GroupHistoryMemData::default(), script_utxos: GroupUtxoMemData::default(), spent_by: SpentByMemData::default(), } } /// Only the stats data from this [`MemData`]. pub fn stats(&self) -> StatsData { StatsData { txs: self.txs.stats.clone(), script_history: self.script_history.stats.clone(), script_utxos: self.script_utxos.stats.clone(), spent_by: self.spent_by.stats.clone(), } } } diff --git a/chronik/chronik-indexer/src/indexer.rs b/chronik/chronik-indexer/src/indexer.rs index e205f6125..d7c01c4f0 100644 --- a/chronik/chronik-indexer/src/indexer.rs +++ b/chronik/chronik-indexer/src/indexer.rs @@ -1,995 +1,1018 @@ // Copyright (c) 2022 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. //! Module containing [`ChronikIndexer`] to index blocks and txs. use std::{io::Write, path::PathBuf}; use abc_rust_error::{Result, WrapErr}; use bitcoinsuite_core::{ block::BlockHash, tx::{Tx, TxId}, }; use chronik_bridge::{ffi, util::expect_unique_ptr}; use chronik_db::{ db::{Db, WriteBatch}, groups::{ ScriptGroup, ScriptHistoryWriter, ScriptUtxoWriter, TokenIdGroup, TokenIdGroupAux, TokenIdHistoryWriter, TokenIdUtxoWriter, }, - index_tx::prepare_indexed_txs, + index_tx::{ + prepare_indexed_txs_cached, PrepareUpdateMode, TxNumCacheSettings, + }, io::{ merge, token::TokenWriter, BlockHeight, BlockReader, BlockStatsWriter, BlockTxs, BlockWriter, DbBlock, GroupHistoryMemData, GroupUtxoMemData, MetadataReader, MetadataWriter, SchemaVersion, SpentByWriter, TxEntry, TxReader, TxWriter, }, mem::{MemData, MemDataConf, Mempool, MempoolTx}, }; use chronik_util::{log, log_chronik}; use thiserror::Error; use tokio::sync::RwLock; use crate::{ avalanche::Avalanche, indexer::ChronikIndexerError::*, query::{ QueryBlocks, QueryBroadcast, QueryGroupHistory, QueryGroupUtxos, QueryTxs, UtxoProtobufOutput, UtxoProtobufValue, }, subs::{BlockMsg, BlockMsgType, Subs}, subs_group::TxMsgType, }; const CURRENT_INDEXER_VERSION: SchemaVersion = 11; const LAST_UPGRADABLE_VERSION: SchemaVersion = 10; /// Params for setting up a [`ChronikIndexer`] instance. #[derive(Clone)] pub struct ChronikIndexerParams { /// Folder where the node stores its data, net-dependent. 
pub datadir_net: PathBuf, /// Whether to clear the DB before opening the DB, e.g. when reindexing. pub wipe_db: bool, /// Whether Chronik should index SLP/ALP token txs. pub enable_token_index: bool, /// Whether to output Chronik performance statistics into a perf/ folder pub enable_perf_stats: bool, + /// Settings for tuning TxNumCache. + pub tx_num_cache: TxNumCacheSettings, } /// Struct for indexing blocks and txs. Maintains db handles and mempool. #[derive(Debug)] pub struct ChronikIndexer { db: Db, mem_data: MemData, mempool: Mempool, script_group: ScriptGroup, avalanche: Avalanche, subs: RwLock, perf_path: Option, is_token_index_enabled: bool, } /// Access to the bitcoind node. #[derive(Debug)] pub struct Node { /// FFI bridge to the node. pub bridge: cxx::UniquePtr, } /// Block to be indexed by Chronik. #[derive(Clone, Debug, Default, Eq, PartialEq)] pub struct ChronikBlock { /// Data about the block (w/o txs) pub db_block: DbBlock, /// Txs in the block, with locations of where they are stored on disk. pub block_txs: BlockTxs, /// Block size in bytes. pub size: u64, /// Txs in the block, with inputs/outputs so we can group them. pub txs: Vec, } /// Errors for [`BlockWriter`] and [`BlockReader`]. #[derive(Debug, Eq, Error, PartialEq)] pub enum ChronikIndexerError { /// Failed creating the folder for the indexes #[error("Failed creating path {0}")] CreateDirFailed(PathBuf), /// Cannot rewind blocks that bitcoind doesn't have #[error( "Cannot rewind Chronik, it contains block {0} that the node doesn't \ have. You may need to use -reindex/-chronikreindex, or delete \ indexes/chronik and restart" )] CannotRewindChronik(BlockHash), /// Lower block doesn't exist but higher block does #[error( "Inconsistent DB: Block {missing} doesn't exist, but {exists} does" )] BlocksBelowMissing { /// Lower height that is missing missing: BlockHeight, /// Higher height that exists exists: BlockHeight, }, /// Corrupted schema version #[error( "Corrupted schema version in the Chronik database, consider running \ -reindex/-chronikreindex" )] CorruptedSchemaVersion, /// Missing schema version for non-empty database #[error( "Missing schema version in non-empty Chronik database, consider \ running -reindex/-chronikreindex" )] MissingSchemaVersion, /// This Chronik instance is outdated #[error( "Chronik outdated: Chronik has version {}, but the database has \ version {0}. Upgrade your node to the appropriate version.", CURRENT_INDEXER_VERSION )] ChronikOutdated(SchemaVersion), /// Database is outdated #[error( "DB outdated: Chronik has version {CURRENT_INDEXER_VERSION}, but the \ database has version {0}. The last upgradable version is \ {LAST_UPGRADABLE_VERSION}. -reindex/-chronikreindex to reindex the \ database to the new version." )] DatabaseOutdated(SchemaVersion), /// Cannot enable token index on a DB that previously had it disabled #[error( "Cannot enable -chroniktokenindex on a DB that previously had it \ disabled. Provide -reindex/-chronikreindex to reindex the database \ with token data, or specify -chroniktokenindex=0 to disable the \ token index again." )] CannotEnableTokenIndex, } impl ChronikIndexer { /// Setup the indexer with the given parameters, e.g. open the DB etc. 
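+    ///
+    /// E.g., as in the tests below (the `datadir_net` folder must already
+    /// exist):
+    /// ```ignore
+    /// let indexer = ChronikIndexer::setup(ChronikIndexerParams {
+    ///     datadir_net,
+    ///     wipe_db: false,
+    ///     enable_token_index: false,
+    ///     enable_perf_stats: false,
+    ///     tx_num_cache: Default::default(),
+    /// })?;
+    /// ```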
pub fn setup(params: ChronikIndexerParams) -> Result { let indexes_path = params.datadir_net.join("indexes"); let perf_path = params.datadir_net.join("perf"); if !indexes_path.exists() { std::fs::create_dir(&indexes_path) .wrap_err_with(|| CreateDirFailed(indexes_path.clone()))?; } if params.enable_perf_stats && !perf_path.exists() { std::fs::create_dir(&perf_path) .wrap_err_with(|| CreateDirFailed(perf_path.clone()))?; } let db_path = indexes_path.join("chronik"); if params.wipe_db { log!("Wiping Chronik at {}\n", db_path.to_string_lossy()); Db::destroy(&db_path)?; } log_chronik!("Opening Chronik at {}\n", db_path.to_string_lossy()); let db = Db::open(&db_path)?; let schema_version = verify_schema_version(&db)?; verify_enable_token_index(&db, params.enable_token_index)?; upgrade_db_if_needed(&db, schema_version, params.enable_token_index)?; let mempool = Mempool::new(ScriptGroup, params.enable_token_index); Ok(ChronikIndexer { db, mempool, - mem_data: MemData::new(MemDataConf {}), + mem_data: MemData::new(MemDataConf { + tx_num_cache: params.tx_num_cache, + }), script_group: ScriptGroup, avalanche: Avalanche::default(), subs: RwLock::new(Subs::new(ScriptGroup)), perf_path: params.enable_perf_stats.then_some(perf_path), is_token_index_enabled: params.enable_token_index, }) } /// Resync Chronik index to the node pub fn resync_indexer( &mut self, bridge: &ffi::ChronikBridge, ) -> Result<()> { let block_reader = BlockReader::new(&self.db)?; let indexer_tip = block_reader.tip()?; let Ok(node_tip_index) = bridge.get_chain_tip() else { if let Some(indexer_tip) = &indexer_tip { return Err( CannotRewindChronik(indexer_tip.hash.clone()).into() ); } return Ok(()); }; let node_tip_info = ffi::get_block_info(node_tip_index); let node_height = node_tip_info.height; let node_tip_hash = BlockHash::from(node_tip_info.hash); let start_height = match indexer_tip { Some(tip) if tip.hash != node_tip_hash => { let indexer_tip_hash = tip.hash.clone(); let indexer_height = tip.height; log!( "Node and Chronik diverged, node is on block \ {node_tip_hash} at height {node_height}, and Chronik is \ on block {indexer_tip_hash} at height {indexer_height}.\n" ); let indexer_tip_index = bridge .lookup_block_index(tip.hash.to_bytes()) .map_err(|_| CannotRewindChronik(tip.hash.clone()))?; self.rewind_indexer(bridge, indexer_tip_index, &tip)? 
} Some(tip) => tip.height, None => { log!( "Chronik database empty, syncing to block {node_tip_hash} \ at height {node_height}.\n" ); -1 } }; let tip_height = node_tip_info.height; for height in start_height + 1..=tip_height { if bridge.shutdown_requested() { log!("Stopped re-sync adding blocks\n"); return Ok(()); } let block_index = ffi::get_block_ancestor(node_tip_index, height)?; let block = self.load_chronik_block(bridge, block_index)?; let hash = block.db_block.hash.clone(); self.handle_block_connected(block)?; log_chronik!( "Added block {hash}, height {height}/{tip_height} to Chronik\n" ); if height % 100 == 0 { log!( "Synced Chronik up to block {hash} at height \ {height}/{tip_height}\n" ); } } log!( "Chronik completed re-syncing with the node, both are now at \ block {node_tip_hash} at height {node_height}.\n" ); if let Some(perf_path) = &self.perf_path { let mut resync_stats = std::fs::File::create(perf_path.join("resync_stats.txt"))?; write!(&mut resync_stats, "{:#.3?}", self.mem_data.stats())?; } Ok(()) } fn rewind_indexer( &mut self, bridge: &ffi::ChronikBridge, indexer_tip_index: &ffi::CBlockIndex, indexer_db_tip: &DbBlock, ) -> Result { let indexer_height = indexer_db_tip.height; let fork_block_index = bridge .find_fork(indexer_tip_index) .map_err(|_| CannotRewindChronik(indexer_db_tip.hash.clone()))?; let fork_info = ffi::get_block_info(fork_block_index); let fork_block_hash = BlockHash::from(fork_info.hash); let fork_height = fork_info.height; let revert_height = fork_height + 1; log!( "The last common block is {fork_block_hash} at height \ {fork_height}.\n" ); log!("Reverting Chronik blocks {revert_height} to {indexer_height}.\n"); for height in (revert_height..indexer_height).rev() { if bridge.shutdown_requested() { log!("Stopped re-sync rewinding blocks\n"); // return MAX here so we don't add any blocks return Ok(BlockHeight::MAX); } let db_block = BlockReader::new(&self.db)? .by_height(height)? .ok_or(BlocksBelowMissing { missing: height, exists: indexer_height, })?; let block_index = bridge .lookup_block_index(db_block.hash.to_bytes()) .map_err(|_| CannotRewindChronik(db_block.hash))?; let block = self.load_chronik_block(bridge, block_index)?; self.handle_block_disconnected(block)?; } Ok(fork_info.height) } /// Add transaction to the indexer's mempool. pub fn handle_tx_added_to_mempool( &mut self, mempool_tx: MempoolTx, ) -> Result<()> { let result = self.mempool.insert(&self.db, mempool_tx)?; self.subs.get_mut().handle_tx_event( &result.mempool_tx.tx, TxMsgType::AddedToMempool, &result.token_id_aux, ); Ok(()) } /// Remove tx from the indexer's mempool, e.g. by a conflicting tx, expiry /// etc. This is not called when the transaction has been mined (and thus /// also removed from the mempool). pub fn handle_tx_removed_from_mempool(&mut self, txid: TxId) -> Result<()> { let result = self.mempool.remove(txid)?; self.subs.get_mut().handle_tx_event( &result.mempool_tx.tx, TxMsgType::RemovedFromMempool, &result.token_id_aux, ); Ok(()) } /// Add the block to the index. 
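+    /// Writes the block, its txs, block stats, script history/UTXOs,
+    /// spent-by entries and (if enabled) token data in a single
+    /// [`WriteBatch`], feeding the new tx nums into the [`TxNumCache`]
+    /// via `PrepareUpdateMode::Add`.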
pub fn handle_block_connected( &mut self, block: ChronikBlock, ) -> Result<()> { let height = block.db_block.height; let mut batch = WriteBatch::default(); let block_writer = BlockWriter::new(&self.db)?; let tx_writer = TxWriter::new(&self.db)?; let block_stats_writer = BlockStatsWriter::new(&self.db)?; let script_history_writer = ScriptHistoryWriter::new(&self.db, self.script_group.clone())?; let script_utxo_writer = ScriptUtxoWriter::new(&self.db, self.script_group.clone())?; let spent_by_writer = SpentByWriter::new(&self.db)?; let token_writer = TokenWriter::new(&self.db)?; let token_id_history_writer = TokenIdHistoryWriter::new(&self.db, TokenIdGroup)?; let token_id_utxo_writer = TokenIdUtxoWriter::new(&self.db, TokenIdGroup)?; block_writer.insert(&mut batch, &block.db_block)?; let first_tx_num = tx_writer.insert( &mut batch, &block.block_txs, &mut self.mem_data.txs, )?; - let index_txs = - prepare_indexed_txs(&self.db, first_tx_num, &block.txs)?; + let index_txs = prepare_indexed_txs_cached( + &self.db, + first_tx_num, + &block.txs, + &mut self.mem_data.tx_num_cache, + PrepareUpdateMode::Add, + )?; block_stats_writer .insert(&mut batch, height, block.size, &index_txs)?; script_history_writer.insert( &mut batch, &index_txs, &(), &mut self.mem_data.script_history, )?; script_utxo_writer.insert( &mut batch, &index_txs, &(), &mut self.mem_data.script_utxos, )?; spent_by_writer.insert( &mut batch, &index_txs, &mut self.mem_data.spent_by, )?; let token_id_aux; if self.is_token_index_enabled { let processed_token_batch = token_writer.insert(&mut batch, &index_txs)?; token_id_aux = TokenIdGroupAux::from_batch(&index_txs, &processed_token_batch); token_id_history_writer.insert( &mut batch, &index_txs, &token_id_aux, &mut GroupHistoryMemData::default(), )?; token_id_utxo_writer.insert( &mut batch, &index_txs, &token_id_aux, &mut GroupUtxoMemData::default(), )?; } else { token_id_aux = TokenIdGroupAux::default(); } self.db.write_batch(batch)?; for tx in &block.block_txs.txs { self.mempool.remove_mined(&tx.txid)?; } merge::check_for_errors()?; let subs = self.subs.get_mut(); subs.broadcast_block_msg(BlockMsg { msg_type: BlockMsgType::Connected, hash: block.db_block.hash, height: block.db_block.height, }); subs.handle_block_tx_events( &block.txs, TxMsgType::Confirmed, &token_id_aux, ); Ok(()) } /// Remove the block from the index. 
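+    /// Deletes the block's data in a single [`WriteBatch`] and clears the
+    /// [`TxNumCache`] (`PrepareUpdateMode::Delete`), since a reorg
+    /// invalidates the cached tx nums.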
pub fn handle_block_disconnected( &mut self, block: ChronikBlock, ) -> Result<()> { let mut batch = WriteBatch::default(); let block_writer = BlockWriter::new(&self.db)?; let tx_writer = TxWriter::new(&self.db)?; let block_stats_writer = BlockStatsWriter::new(&self.db)?; let script_history_writer = ScriptHistoryWriter::new(&self.db, self.script_group.clone())?; let script_utxo_writer = ScriptUtxoWriter::new(&self.db, self.script_group.clone())?; let spent_by_writer = SpentByWriter::new(&self.db)?; let token_writer = TokenWriter::new(&self.db)?; let token_id_history_writer = TokenIdHistoryWriter::new(&self.db, TokenIdGroup)?; let token_id_utxo_writer = TokenIdUtxoWriter::new(&self.db, TokenIdGroup)?; block_writer.delete(&mut batch, &block.db_block)?; let first_tx_num = tx_writer.delete( &mut batch, &block.block_txs, &mut self.mem_data.txs, )?; - let index_txs = - prepare_indexed_txs(&self.db, first_tx_num, &block.txs)?; + let index_txs = prepare_indexed_txs_cached( + &self.db, + first_tx_num, + &block.txs, + &mut self.mem_data.tx_num_cache, + PrepareUpdateMode::Delete, + )?; block_stats_writer.delete(&mut batch, block.db_block.height); script_history_writer.delete( &mut batch, &index_txs, &(), &mut self.mem_data.script_history, )?; script_utxo_writer.delete( &mut batch, &index_txs, &(), &mut self.mem_data.script_utxos, )?; spent_by_writer.delete( &mut batch, &index_txs, &mut self.mem_data.spent_by, )?; if self.is_token_index_enabled { let token_id_aux = TokenIdGroupAux::from_db(&index_txs, &self.db)?; token_id_history_writer.delete( &mut batch, &index_txs, &token_id_aux, &mut GroupHistoryMemData::default(), )?; token_id_utxo_writer.delete( &mut batch, &index_txs, &token_id_aux, &mut GroupUtxoMemData::default(), )?; token_writer.delete(&mut batch, &index_txs)?; } self.avalanche.disconnect_block(block.db_block.height)?; self.db.write_batch(batch)?; let subs = self.subs.get_mut(); subs.broadcast_block_msg(BlockMsg { msg_type: BlockMsgType::Disconnected, hash: block.db_block.hash, height: block.db_block.height, }); Ok(()) } /// Block finalized with Avalanche. pub fn handle_block_finalized( &mut self, block: ChronikBlock, ) -> Result<()> { self.avalanche.finalize_block(block.db_block.height)?; let subs = self.subs.get_mut(); subs.broadcast_block_msg(BlockMsg { msg_type: BlockMsgType::Finalized, hash: block.db_block.hash, height: block.db_block.height, }); let tx_reader = TxReader::new(&self.db)?; let first_tx_num = tx_reader .first_tx_num_by_block(block.db_block.height)? .unwrap(); - let index_txs = - prepare_indexed_txs(&self.db, first_tx_num, &block.txs)?; + let index_txs = prepare_indexed_txs_cached( + &self.db, + first_tx_num, + &block.txs, + &mut self.mem_data.tx_num_cache, + PrepareUpdateMode::Read, + )?; let token_id_aux = if self.is_token_index_enabled { TokenIdGroupAux::from_db(&index_txs, &self.db)? } else { TokenIdGroupAux::default() }; subs.handle_block_tx_events( &block.txs, TxMsgType::Finalized, &token_id_aux, ); Ok(()) } /// Return [`QueryBroadcast`] to broadcast tx to the network. pub fn broadcast<'a>(&'a self, node: &'a Node) -> QueryBroadcast<'a> { QueryBroadcast { db: &self.db, avalanche: &self.avalanche, mempool: &self.mempool, node, is_token_index_enabled: self.is_token_index_enabled, } } /// Return [`QueryBlocks`] to read blocks from the DB. 
pub fn blocks<'a>(&'a self, node: &'a Node) -> QueryBlocks<'a> { QueryBlocks { db: &self.db, avalanche: &self.avalanche, mempool: &self.mempool, node, is_token_index_enabled: self.is_token_index_enabled, } } /// Return [`QueryTxs`] to return txs from mempool/DB. pub fn txs<'a>(&'a self, node: &'a Node) -> QueryTxs<'a> { QueryTxs { db: &self.db, avalanche: &self.avalanche, mempool: &self.mempool, node, is_token_index_enabled: self.is_token_index_enabled, } } /// Return [`QueryGroupHistory`] for scripts to query the tx history of /// scripts. pub fn script_history<'a>( &'a self, node: &'a Node, ) -> Result> { Ok(QueryGroupHistory { db: &self.db, avalanche: &self.avalanche, mempool: &self.mempool, mempool_history: self.mempool.script_history(), group: self.script_group.clone(), node, is_token_index_enabled: self.is_token_index_enabled, }) } /// Return [`QueryGroupUtxos`] for scripts to query the utxos of scripts. pub fn script_utxos( &self, ) -> Result> { Ok(QueryGroupUtxos { db: &self.db, avalanche: &self.avalanche, mempool: &self.mempool, mempool_utxos: self.mempool.script_utxos(), group: self.script_group.clone(), utxo_mapper: UtxoProtobufValue, is_token_index_enabled: self.is_token_index_enabled, }) } /// Return [`QueryGroupHistory`] for token IDs to query the tx history of /// token IDs. pub fn token_id_history<'a>( &'a self, node: &'a Node, ) -> QueryGroupHistory<'a, TokenIdGroup> { QueryGroupHistory { db: &self.db, avalanche: &self.avalanche, mempool: &self.mempool, mempool_history: self.mempool.token_id_history(), group: TokenIdGroup, node, is_token_index_enabled: self.is_token_index_enabled, } } /// Return [`QueryGroupUtxos`] for token IDs to query the utxos of token IDs pub fn token_id_utxos( &self, ) -> QueryGroupUtxos<'_, TokenIdGroup, UtxoProtobufOutput> { QueryGroupUtxos { db: &self.db, avalanche: &self.avalanche, mempool: &self.mempool, mempool_utxos: self.mempool.token_id_utxos(), group: TokenIdGroup, utxo_mapper: UtxoProtobufOutput, is_token_index_enabled: self.is_token_index_enabled, } } /// Subscribers, behind read/write lock pub fn subs(&self) -> &RwLock { &self.subs } /// Build a ChronikBlock from a ffi::Block. pub fn make_chronik_block(&self, block: ffi::Block) -> ChronikBlock { let db_block = DbBlock { hash: BlockHash::from(block.hash), prev_hash: BlockHash::from(block.prev_hash), height: block.height, n_bits: block.n_bits, timestamp: block.timestamp, file_num: block.file_num, data_pos: block.data_pos, }; let block_txs = BlockTxs { block_height: block.height, txs: block .txs .iter() .map(|tx| { let txid = TxId::from(tx.tx.txid); TxEntry { txid, data_pos: tx.data_pos, undo_pos: tx.undo_pos, time_first_seen: match self.mempool.tx(&txid) { Some(tx) => tx.time_first_seen, None => 0, }, is_coinbase: tx.undo_pos == 0, } }) .collect(), }; let txs = block .txs .into_iter() .map(|block_tx| Tx::from(block_tx.tx)) .collect::>(); ChronikBlock { db_block, block_txs, size: block.size, txs, } } /// Load a ChronikBlock from the node given the CBlockIndex. 
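+    /// Loads both the block and its undo data through the bridge, then
+    /// converts them into a [`ChronikBlock`] via `make_chronik_block`.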
pub fn load_chronik_block( &self, bridge: &ffi::ChronikBridge, block_index: &ffi::CBlockIndex, ) -> Result { let ffi_block = bridge.load_block(block_index)?; let ffi_block = expect_unique_ptr("load_block", &ffi_block); let ffi_block_undo = bridge.load_block_undo(block_index)?; let ffi_block_undo = expect_unique_ptr("load_block_undo", &ffi_block_undo); let block = ffi::bridge_block(ffi_block, ffi_block_undo, block_index)?; Ok(self.make_chronik_block(block)) } } fn verify_schema_version(db: &Db) -> Result { let metadata_reader = MetadataReader::new(db)?; let metadata_writer = MetadataWriter::new(db)?; let is_empty = db.is_db_empty()?; let schema_version = match metadata_reader .schema_version() .wrap_err(CorruptedSchemaVersion)? { Some(schema_version) => { assert!(!is_empty, "Empty DB can't have a schema version"); if schema_version > CURRENT_INDEXER_VERSION { return Err(ChronikOutdated(schema_version).into()); } if schema_version < LAST_UPGRADABLE_VERSION { return Err(DatabaseOutdated(schema_version).into()); } log!( "Chronik has version {CURRENT_INDEXER_VERSION}, DB has \ version {schema_version}\n" ); schema_version } None => { if !is_empty { return Err(MissingSchemaVersion.into()); } let mut batch = WriteBatch::default(); metadata_writer .update_schema_version(&mut batch, CURRENT_INDEXER_VERSION)?; db.write_batch(batch)?; log!( "Chronik has version {CURRENT_INDEXER_VERSION}, initialized \ DB with that version\n" ); CURRENT_INDEXER_VERSION } }; Ok(schema_version) } fn verify_enable_token_index(db: &Db, enable_token_index: bool) -> Result<()> { let metadata_reader = MetadataReader::new(db)?; let metadata_writer = MetadataWriter::new(db)?; let token_writer = TokenWriter::new(db)?; let is_empty = db.is_db_empty()?; let is_token_index_enabled = metadata_reader.is_token_index_enabled()?; let mut batch = WriteBatch::default(); if !is_empty { // Cannot enable token index if not already previously enabled if enable_token_index && !is_token_index_enabled { return Err(CannotEnableTokenIndex.into()); } // Wipe token index if previously enabled and now disabled if !enable_token_index && is_token_index_enabled { log!( "Warning: Wiping existing token index, since \ -chroniktokenindex=0\n" ); log!("You will need to -reindex/-chronikreindex to restore\n"); token_writer.wipe(&mut batch); } } metadata_writer .update_is_token_index_enabled(&mut batch, enable_token_index)?; db.write_batch(batch)?; Ok(()) } fn upgrade_db_if_needed( db: &Db, schema_version: u64, enable_token_index: bool, ) -> Result<()> { // DB has version 10, upgrade to 11 if schema_version == 10 { upgrade_10_to_11(db, enable_token_index)?; } Ok(()) } fn upgrade_10_to_11(db: &Db, enable_token_index: bool) -> Result<()> { log!("Upgrading Chronik DB from version 10 to 11...\n"); let script_utxo_writer = ScriptUtxoWriter::new(db, ScriptGroup)?; script_utxo_writer.upgrade_10_to_11()?; if enable_token_index { let token_id_utxo_writer = TokenIdUtxoWriter::new(db, TokenIdGroup)?; token_id_utxo_writer.upgrade_10_to_11()?; } let mut batch = WriteBatch::default(); let metadata_writer = MetadataWriter::new(db)?; metadata_writer.update_schema_version(&mut batch, 11)?; db.write_batch(batch)?; log!("Successfully upgraded Chronik DB from version 10 to 11.\n"); Ok(()) } impl Node { /// If `result` is [`Err`], logs and aborts the node. 
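+    /// Used by the validation-interface handlers in `bridge.rs` so that an
+    /// indexing error stops the node instead of leaving the index stale.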
pub fn ok_or_abort(&self, func_name: &str, result: Result) { if let Err(report) = result { log_chronik!("{report:?}\n"); self.bridge.abort_node( &format!("ERROR Chronik in {func_name}"), &format!("{report:#}"), ); } } } impl std::fmt::Debug for ChronikIndexerParams { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ChronikIndexerParams") .field("datadir_net", &self.datadir_net) .field("wipe_db", &self.wipe_db) .field("fn_compress_script", &"..") .finish() } } #[cfg(test)] mod tests { use abc_rust_error::Result; use bitcoinsuite_core::block::BlockHash; use chronik_db::{ db::{Db, WriteBatch, CF_META}, io::{BlockReader, BlockTxs, DbBlock, MetadataReader, MetadataWriter}, }; use pretty_assertions::assert_eq; use crate::indexer::{ ChronikBlock, ChronikIndexer, ChronikIndexerError, ChronikIndexerParams, CURRENT_INDEXER_VERSION, }; #[test] fn test_indexer() -> Result<()> { let tempdir = tempdir::TempDir::new("chronik-indexer--indexer")?; let datadir_net = tempdir.path().join("regtest"); let params = ChronikIndexerParams { datadir_net: datadir_net.clone(), wipe_db: false, enable_token_index: false, enable_perf_stats: false, + tx_num_cache: Default::default(), }; // regtest folder doesn't exist yet -> error assert_eq!( ChronikIndexer::setup(params.clone()) .unwrap_err() .downcast::()?, ChronikIndexerError::CreateDirFailed(datadir_net.join("indexes")), ); // create regtest folder, setup will work now std::fs::create_dir(&datadir_net)?; let mut indexer = ChronikIndexer::setup(params.clone())?; // indexes and indexes/chronik folder now exist assert!(datadir_net.join("indexes").exists()); assert!(datadir_net.join("indexes").join("chronik").exists()); // DB is empty assert_eq!(BlockReader::new(&indexer.db)?.by_height(0)?, None); let block = ChronikBlock { db_block: DbBlock { hash: BlockHash::from([4; 32]), prev_hash: BlockHash::from([0; 32]), height: 0, n_bits: 0x1deadbef, timestamp: 1234567890, file_num: 0, data_pos: 1337, }, block_txs: BlockTxs { block_height: 0, txs: vec![], }, size: 285, txs: vec![], }; // Add block indexer.handle_block_connected(block.clone())?; assert_eq!( BlockReader::new(&indexer.db)?.by_height(0)?, Some(block.db_block.clone()) ); // Remove block again indexer.handle_block_disconnected(block.clone())?; assert_eq!(BlockReader::new(&indexer.db)?.by_height(0)?, None); // Add block then wipe, block not there indexer.handle_block_connected(block)?; std::mem::drop(indexer); let indexer = ChronikIndexer::setup(ChronikIndexerParams { wipe_db: true, ..params })?; assert_eq!(BlockReader::new(&indexer.db)?.by_height(0)?, None); Ok(()) } #[test] fn test_schema_version() -> Result<()> { let dir = tempdir::TempDir::new("chronik-indexer--schema_version")?; let chronik_path = dir.path().join("indexes").join("chronik"); let params = ChronikIndexerParams { datadir_net: dir.path().to_path_buf(), wipe_db: false, enable_token_index: false, enable_perf_stats: false, + tx_num_cache: Default::default(), }; // Setting up DB first time sets the schema version ChronikIndexer::setup(params.clone())?; { let db = Db::open(&chronik_path)?; assert_eq!( MetadataReader::new(&db)?.schema_version()?, Some(CURRENT_INDEXER_VERSION) ); } // Opening DB again works fine ChronikIndexer::setup(params.clone())?; // Override DB schema version to 0 { let db = Db::open(&chronik_path)?; let mut batch = WriteBatch::default(); MetadataWriter::new(&db)?.update_schema_version(&mut batch, 0)?; db.write_batch(batch)?; } // -> DB too old assert_eq!( ChronikIndexer::setup(params.clone()) 
.unwrap_err() .downcast::()?, ChronikIndexerError::DatabaseOutdated(0), ); // Override DB schema version to CURRENT_INDEXER_VERSION + 1 { let db = Db::open(&chronik_path)?; let mut batch = WriteBatch::default(); MetadataWriter::new(&db)?.update_schema_version( &mut batch, CURRENT_INDEXER_VERSION + 1, )?; db.write_batch(batch)?; } // -> Chronik too old assert_eq!( ChronikIndexer::setup(params.clone()) .unwrap_err() .downcast::()?, ChronikIndexerError::ChronikOutdated(CURRENT_INDEXER_VERSION + 1), ); // Corrupt schema version { let db = Db::open(&chronik_path)?; let cf_meta = db.cf(CF_META)?; let mut batch = WriteBatch::default(); batch.put_cf(cf_meta, b"SCHEMA_VERSION", [0xff]); db.write_batch(batch)?; } assert_eq!( ChronikIndexer::setup(params.clone()) .unwrap_err() .downcast::()?, ChronikIndexerError::CorruptedSchemaVersion, ); // New db path, but has existing data let new_dir = dir.path().join("new"); let new_chronik_path = new_dir.join("indexes").join("chronik"); std::fs::create_dir_all(&new_chronik_path)?; let new_params = ChronikIndexerParams { datadir_net: new_dir, wipe_db: false, ..params }; { // new db with obscure field in meta let db = Db::open(&new_chronik_path)?; let mut batch = WriteBatch::default(); batch.put_cf(db.cf(CF_META)?, b"FOO", b"BAR"); db.write_batch(batch)?; } // Error: non-empty DB without schema version assert_eq!( ChronikIndexer::setup(new_params.clone()) .unwrap_err() .downcast::()?, ChronikIndexerError::MissingSchemaVersion, ); // with wipe it works ChronikIndexer::setup(ChronikIndexerParams { wipe_db: true, ..new_params })?; Ok(()) } } diff --git a/chronik/chronik-lib/src/bridge.rs b/chronik/chronik-lib/src/bridge.rs index 8523ebe3e..4ab6694e7 100644 --- a/chronik/chronik-lib/src/bridge.rs +++ b/chronik/chronik-lib/src/bridge.rs @@ -1,290 +1,294 @@ // Copyright (c) 2022 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. //! Rust side of the bridge; these structs and functions are exposed to C++. use std::{ net::{AddrParseError, IpAddr, SocketAddr}, sync::Arc, time::Duration, }; use abc_rust_error::Result; use bitcoinsuite_core::tx::{Tx, TxId}; use chronik_bridge::{ffi::init_error, util::expect_unique_ptr}; -use chronik_db::mem::MempoolTx; +use chronik_db::{index_tx::TxNumCacheSettings, mem::MempoolTx}; use chronik_http::server::{ ChronikServer, ChronikServerParams, ChronikSettings, }; use chronik_indexer::{ indexer::{ChronikIndexer, ChronikIndexerParams, Node}, pause::Pause, }; use chronik_plugin::context::PluginContext; use chronik_util::{log, log_chronik, mount_loggers, Loggers}; use thiserror::Error; use tokio::sync::RwLock; use crate::ffi::{self, StartChronikValidationInterface}; /// Errors for [`Chronik`] and [`setup_chronik`]. #[derive(Debug, Eq, Error, PartialEq)] pub enum ChronikError { /// Chronik host address failed to parse #[error("Invalid Chronik host address {0:?}: {1}")] InvalidChronikHost(String, AddrParseError), } use self::ChronikError::*; /// Setup the Chronik bridge. Returns a ChronikIndexer object. 
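+/// Returns `true` if setup succeeded; otherwise the error is logged and
+/// handed to `init_error`.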
pub fn setup_chronik( params: ffi::SetupParams, config: &ffi::Config, node: &ffi::NodeContext, ) -> bool { match try_setup_chronik(params, config, node) { Ok(()) => true, Err(report) => { log_chronik!("{report:?}\n"); init_error(&report.to_string()) } } } fn try_setup_chronik( params: ffi::SetupParams, config: &ffi::Config, node_context: &ffi::NodeContext, ) -> Result<()> { abc_rust_error::install(); mount_loggers(Loggers { log: chronik_bridge::ffi::log_print, log_chronik: chronik_bridge::ffi::log_print_chronik, }); let hosts = params .hosts .into_iter() .map(|host| parse_socket_addr(host, params.default_port)) .collect::<Result<Vec<_>>>()?; PluginContext::setup()?; log!("Starting Chronik bound to {:?}\n", hosts); let bridge = chronik_bridge::ffi::make_bridge(config, node_context); let bridge_ref = expect_unique_ptr("make_bridge", &bridge); let (pause, pause_notify) = Pause::new_pair(params.is_pause_allowed); let mut indexer = ChronikIndexer::setup(ChronikIndexerParams { datadir_net: params.datadir_net.into(), wipe_db: params.wipe_db, enable_token_index: params.enable_token_index, enable_perf_stats: params.enable_perf_stats, + tx_num_cache: TxNumCacheSettings { + bucket_size: params.tx_num_cache.bucket_size, + num_buckets: params.tx_num_cache.num_buckets, + }, })?; indexer.resync_indexer(bridge_ref)?; if bridge.shutdown_requested() { // Don't setup Chronik if the user requested shutdown during resync return Ok(()); } let indexer = Arc::new(RwLock::new(indexer)); let node = Arc::new(Node { bridge }); let runtime = tokio::runtime::Builder::new_multi_thread() .enable_all() .build()?; let server = runtime.block_on({ let indexer = Arc::clone(&indexer); let node = Arc::clone(&node); async move { // try_bind requires a Runtime ChronikServer::setup(ChronikServerParams { hosts, indexer, node, pause_notify: Arc::new(pause_notify), settings: ChronikSettings { ws_ping_interval: Duration::from_secs( params.ws_ping_interval_secs, ), }, }) } })?; runtime.spawn({ let node = Arc::clone(&node); async move { node.ok_or_abort("ChronikServer::serve", server.serve().await); } }); let chronik = Box::new(Chronik { node: Arc::clone(&node), indexer, pause, runtime, }); StartChronikValidationInterface(node_context, chronik); Ok(()) } fn parse_socket_addr(host: String, default_port: u16) -> Result<SocketAddr> { if let Ok(addr) = host.parse::<SocketAddr>() { return Ok(addr); } let ip_addr = host .parse::<IpAddr>() .map_err(|err| InvalidChronikHost(host, err))?; Ok(SocketAddr::new(ip_addr, default_port)) } /// Contains all db, runtime, tcp, etc. handles needed by Chronik. /// This makes it so when this struct is dropped, all handles are released /// cleanly. pub struct Chronik { node: Arc<Node>, indexer: Arc<RwLock<ChronikIndexer>>, pause: Pause, // Having this here ensures HTTP server, outstanding requests etc. will get // stopped when `Chronik` is dropped.
runtime: tokio::runtime::Runtime, } impl Chronik { /// Tx added to the bitcoind mempool pub fn handle_tx_added_to_mempool( &self, ptx: &ffi::CTransaction, spent_coins: &cxx::CxxVector, time_first_seen: i64, ) { self.block_if_paused(); self.node.ok_or_abort( "handle_tx_added_to_mempool", self.add_tx_to_mempool(ptx, spent_coins, time_first_seen), ); } /// Tx removed from the bitcoind mempool pub fn handle_tx_removed_from_mempool(&self, txid: [u8; 32]) { self.block_if_paused(); let mut indexer = self.indexer.blocking_write(); let txid = TxId::from(txid); self.node.ok_or_abort( "handle_tx_removed_from_mempool", indexer.handle_tx_removed_from_mempool(txid), ); log_chronik!("Chronik: transaction {} removed from mempool\n", txid); } /// Block connected to the longest chain pub fn handle_block_connected( &self, block: &ffi::CBlock, bindex: &ffi::CBlockIndex, ) { self.block_if_paused(); self.node.ok_or_abort( "handle_block_connected", self.connect_block(block, bindex), ); } /// Block disconnected from the longest chain pub fn handle_block_disconnected( &self, block: &ffi::CBlock, bindex: &ffi::CBlockIndex, ) { self.block_if_paused(); self.node.ok_or_abort( "handle_block_disconnected", self.disconnect_block(block, bindex), ); } /// Block finalized with Avalanche pub fn handle_block_finalized(&self, bindex: &ffi::CBlockIndex) { self.block_if_paused(); self.node .ok_or_abort("handle_block_finalized", self.finalize_block(bindex)); } fn add_tx_to_mempool( &self, ptx: &ffi::CTransaction, spent_coins: &cxx::CxxVector, time_first_seen: i64, ) -> Result<()> { let mut indexer = self.indexer.blocking_write(); let tx = chronik_bridge::ffi::bridge_tx(ptx, spent_coins)?; let txid = TxId::from(tx.txid); indexer.handle_tx_added_to_mempool(MempoolTx { tx: Tx::from(tx), time_first_seen, })?; log_chronik!("Chronik: transaction {} added to mempool\n", txid); Ok(()) } fn connect_block( &self, block: &ffi::CBlock, bindex: &ffi::CBlockIndex, ) -> Result<()> { let block_undo = self.node.bridge.load_block_undo(bindex)?; let block = chronik_bridge::ffi::bridge_block(block, &block_undo, bindex)?; let mut indexer = self.indexer.blocking_write(); let block = indexer.make_chronik_block(block); let block_hash = block.db_block.hash.clone(); let num_txs = block.block_txs.txs.len(); indexer.handle_block_connected(block)?; log_chronik!( "Chronik: block {} connected with {} txs\n", block_hash, num_txs, ); Ok(()) } fn disconnect_block( &self, block: &ffi::CBlock, bindex: &ffi::CBlockIndex, ) -> Result<()> { let block_undo = self.node.bridge.load_block_undo(bindex)?; let block = chronik_bridge::ffi::bridge_block(block, &block_undo, bindex)?; let mut indexer = self.indexer.blocking_write(); let block = indexer.make_chronik_block(block); let block_hash = block.db_block.hash.clone(); let num_txs = block.block_txs.txs.len(); indexer.handle_block_disconnected(block)?; log_chronik!( "Chronik: block {} disconnected with {} txs\n", block_hash, num_txs, ); Ok(()) } fn finalize_block(&self, bindex: &ffi::CBlockIndex) -> Result<()> { let mut indexer = self.indexer.blocking_write(); let block = indexer.load_chronik_block(&self.node.bridge, bindex)?; let block_hash = block.db_block.hash.clone(); let num_txs = block.block_txs.txs.len(); indexer.handle_block_finalized(block)?; log_chronik!( "Chronik: block {} finalized with {} txs\n", block_hash, num_txs, ); Ok(()) } fn block_if_paused(&self) { self.pause.block_if_paused(&self.runtime); } } impl std::fmt::Debug for Chronik { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 
write!(f, "Chronik {{ .. }}") } } diff --git a/chronik/chronik-lib/src/ffi.rs b/chronik/chronik-lib/src/ffi.rs index fbffd60b3..c86cd7089 100644 --- a/chronik/chronik-lib/src/ffi.rs +++ b/chronik/chronik-lib/src/ffi.rs @@ -1,105 +1,116 @@ // Copyright (c) 2022 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. //! Module containing the cxx definitions for the bridge from Rust to C++. pub use self::ffi_inner::*; use crate::bridge::{setup_chronik, Chronik}; #[allow(unsafe_code)] #[cxx::bridge(namespace = "chronik_bridge")] mod ffi_inner { /// Params for setting up Chronik #[derive(Debug)] pub struct SetupParams { /// Where the data of the blockchain is stored, dependent on network /// (mainnet, testnet, regtest) pub datadir_net: String, /// Host addresses where the Chronik HTTP endpoint will be served pub hosts: Vec, /// Default port for `hosts` if only an IP address is given pub default_port: u16, /// Whether to clear the DB before proceeding, e.g. when reindexing pub wipe_db: bool, /// Whether Chronik should index SLP/ALP token transactions pub enable_token_index: bool, /// Whether pausing Chronik indexing is allowed pub is_pause_allowed: bool, /// Whether to output Chronik performance statistics into a perf/ /// folder pub enable_perf_stats: bool, /// Duration between WebSocket pings initiated by Chronik. pub ws_ping_interval_secs: u64, + /// Tuning settings for the TxNumCache. + pub tx_num_cache: TxNumCacheSettings, + } + + /// Settings for tuning the TxNumCache. + #[derive(Debug)] + pub struct TxNumCacheSettings { + /// How many buckets are on the belt + pub num_buckets: usize, + /// How many txs are cached in each bucket + pub bucket_size: usize, } extern "Rust" { type Chronik; fn setup_chronik( params: SetupParams, config: &Config, node: &NodeContext, ) -> bool; fn handle_tx_added_to_mempool( &self, ptx: &CTransaction, spent_coins: &CxxVector, time_first_seen: i64, ); fn handle_tx_removed_from_mempool(&self, txid: [u8; 32]); fn handle_block_connected(&self, block: &CBlock, bindex: &CBlockIndex); fn handle_block_disconnected( &self, block: &CBlock, bindex: &CBlockIndex, ); fn handle_block_finalized(&self, bindex: &CBlockIndex); } unsafe extern "C++" { include!("blockindex.h"); include!("chronik-cpp/chronik_validationinterface.h"); include!("coins.h"); include!("config.h"); include!("node/context.h"); include!("primitives/block.h"); include!("primitives/transaction.h"); /// CBlockIndex from blockindex.h #[namespace = ""] type CBlockIndex = chronik_bridge::ffi::CBlockIndex; /// ::CBlock from primitives/block.h #[namespace = ""] type CBlock = chronik_bridge::ffi::CBlock; /// ::Coin from coins.h (renamed to CCoin to prevent a name clash) #[namespace = ""] #[cxx_name = "Coin"] type CCoin = chronik_bridge::ffi::CCoin; /// ::Config from config.h #[namespace = ""] type Config = chronik_bridge::ffi::Config; /// ::CTransaction from primitives/transaction.h #[namespace = ""] type CTransaction = chronik_bridge::ffi::CTransaction; /// NodeContext from node/context.h #[namespace = "node"] type NodeContext = chronik_bridge::ffi::NodeContext; /// Bridge to bitcoind to access the node type ChronikBridge = chronik_bridge::ffi::ChronikBridge; /// Register the Chronik instance as CValidationInterface to receive /// chain updates from the node. 
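+ /// Ownership of the boxed [`Chronik`] moves to the C++ side here, which
+ /// then manages its lifetime (and that of the tokio runtime it holds).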
#[namespace = "chronik"] fn StartChronikValidationInterface( node: &NodeContext, chronik: Box, ); } } diff --git a/src/init.cpp b/src/init.cpp index be5122f76..bb29805fd 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -1,2929 +1,2949 @@ // Copyright (c) 2009-2010 Satoshi Nakamoto // Copyright (c) 2009-2018 The Bitcoin Core developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #if defined(HAVE_CONFIG_H) #include #endif #include #include #include #include #include // For AVALANCHE_LEGACY_PROOF_DEFAULT #include #include // For AVALANCHE_VOTE_STALE_* #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include
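To make the cache arithmetic above concrete: per the chronik.h comment, each cached entry pairs a 32-byte TxId with an 8-byte TxNum, so the footprint is roughly `num_buckets * bucket_size * 40B`, i.e. ~40 MB with the defaults. The following is a minimal, self-contained sketch of the conveyor-belt eviction from `index_tx.rs`, using hypothetical names and plain `u64` keys in place of TxIds; it illustrates the scheme and is not the chronik implementation:

```rust
use std::collections::{HashMap, VecDeque};

/// Toy conveyor-belt cache: new entries fill the front bucket; when it is
/// full, the last bucket is emptied and recycled as the new front bucket.
/// Lookups scan the buckets front-to-back.
struct BeltCache {
    bucket_size: usize,
    buckets: VecDeque<HashMap<u64, u64>>, // u64 key stands in for a TxId
}

impl BeltCache {
    fn new(num_buckets: usize, bucket_size: usize) -> Self {
        BeltCache {
            bucket_size,
            // Allocate all buckets up-front, like TxNumCache::new
            buckets: (0..num_buckets)
                .map(|_| HashMap::with_capacity(bucket_size))
                .collect(),
        }
    }

    fn add(&mut self, key: u64, value: u64) {
        if self.buckets.is_empty() {
            return; // zero buckets: caching disabled
        }
        if self.buckets.front().unwrap().len() >= self.bucket_size {
            // Drop off the last bucket, empty it and move it to the front
            let mut recycled = self.buckets.pop_back().unwrap();
            recycled.clear();
            self.buckets.push_front(recycled);
        }
        self.buckets.front_mut().unwrap().insert(key, value);
    }

    fn get(&self, key: u64) -> Option<u64> {
        self.buckets.iter().find_map(|bucket| bucket.get(&key).copied())
    }
}

fn main() {
    // Same shape as the unit test in index_tx.rs: 2 buckets of 3 entries.
    let mut cache = BeltCache::new(2, 3);
    for n in 0..7 {
        cache.add(n, n);
    }
    // Entries 0..=2 sat in the bucket that was recycled; 3..=6 survive.
    assert_eq!(cache.get(0), None);
    assert_eq!(cache.get(6), Some(6));
    // Footprint with the chronik.h defaults: 10 buckets * 100_000 entries
    // * (32-byte TxId + 8-byte TxNum) = 40 MB, ignoring HashMap overhead.
    println!("default TxNumCache ~ {} bytes", 10 * 100_000 * 40);
}
```

Recycling whole buckets keeps insertions O(1) amortized; the price is the linear front-to-back scan on lookup, which is why the chronik.h comment warns against setting `-chroniktxnumcachebuckets` too high.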