diff --git a/chronik/chronik-cpp/chronik.h b/chronik/chronik-cpp/chronik.h --- a/chronik/chronik-cpp/chronik.h +++ b/chronik/chronik-cpp/chronik.h @@ -17,6 +17,16 @@ static const std::vector DEFAULT_BINDS = {"127.0.0.1", "::1"}; +// How many buckets of txid -> tx num maps are cached in-memory. +// Don't set this too high, Chronik will do a linear scan over all buckets. +static const size_t DEFAULT_TX_NUM_CACHE_BUCKETS = 10; +// Size of each bucket in the in-memory cache of tx nums. +// Unlike the number of buckets, this may be increased without much danger of +// slowing the indexer down. The total cache size will be +// `num_buckets * bucket_size * 40B`, so by default the cache will require 40MB +// of memory. +static const size_t DEFAULT_TX_NUM_CACHE_BUCKET_SIZE = 100'000; + // Registers Chronik indexer as ValidationInterface, listens to HTTP queries bool Start(const Config &config, const node::NodeContext &node, bool fWipe); diff --git a/chronik/chronik-cpp/chronik.cpp b/chronik/chronik-cpp/chronik.cpp --- a/chronik/chronik-cpp/chronik.cpp +++ b/chronik/chronik-cpp/chronik.cpp @@ -57,6 +57,15 @@ params.NetworkIDString() == CBaseChainParams::REGTEST ? uint64_t(count_seconds(WS_PING_INTERVAL_REGTEST)) : uint64_t(count_seconds(WS_PING_INTERVAL_DEFAULT)), + .tx_num_cache = + { + .num_buckets = + (size_t)gArgs.GetIntArg("-chroniktxnumcachebuckets", + DEFAULT_TX_NUM_CACHE_BUCKETS), + .bucket_size = (size_t)gArgs.GetIntArg( + "-chroniktxnumcachebucketsize", + DEFAULT_TX_NUM_CACHE_BUCKET_SIZE), + }, }, config, node); } diff --git a/chronik/chronik-db/src/index_tx.rs b/chronik/chronik-db/src/index_tx.rs --- a/chronik/chronik-db/src/index_tx.rs +++ b/chronik/chronik-db/src/index_tx.rs @@ -4,14 +4,15 @@ //! Module for [`IndexTx`] and [`prepare_indexed_txs`]. 
-use std::collections::{BTreeSet, HashMap}; +use std::collections::{hash_map::Entry, BTreeSet, HashMap, VecDeque}; use abc_rust_error::Result; -use bitcoinsuite_core::tx::{OutPoint, Tx}; +use bitcoinsuite_core::tx::{OutPoint, Tx, TxId}; use thiserror::Error; use crate::{ db::Db, + index_tx::IndexTxError::*, io::{TxNum, TxReader}, }; @@ -32,6 +33,35 @@ pub input_nums: Vec, } +/// Cache for tx nums to speed up prepare_indexed_txs_cached. +/// It works by having an internal "conveyor belt" of buckets, and each block +/// puts all its TxId -> TxNum tuples into the bucket at the front of the +/// belt, one-by-one. If this bucket is full, the bucket at the end of the belt +/// is "dropped off", emptied and moved to the front, moving all other buckets +/// one step back. Then the new empty bucket will be filled, until it is full, +/// etc. +/// +/// When querying the cache, buckets are looked into one-by-one in the order on +/// the belt. +/// +/// On a reorg, all buckets are simply cleared, as the goal is to speed up +/// initial sync. +#[derive(Debug, Default)] +pub struct TxNumCache { + num_buckets: usize, + bucket_size: usize, + buckets: VecDeque>, +} + +/// Params for tuning the TxNumCache. +#[derive(Clone, Debug, Default)] +pub struct TxNumCacheSettings { + /// How many buckets are on the belt + pub num_buckets: usize, + /// How many txs are cached in each bucket + pub bucket_size: usize, +} + /// Error indicating something went wrong with a [`IndexTx`]. #[derive(Debug, Error, PartialEq, Eq)] pub enum IndexTxError { @@ -41,14 +71,45 @@ UnknownInputSpent(OutPoint), } -use self::IndexTxError::*; +/// Update mode of [`prepare_indexed_txs_cached`], to ensure the cache is +/// updated correctly. 
+#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub enum PrepareUpdateMode { + /// Txs are being added to the DB, will add them to the cache + Add, + /// Txs are being removed from the DB, will clear the cache + Delete, + /// Txs are only read from the DB, won't update the cache + Read, +} /// Prepare txs of a block which is about to be added/removed from the DB with /// some additional data, either coming from the DB or from within the block. +/// This function works like [`prepare_indexed_txs_cached`] but with the cache +/// disabled. pub fn prepare_indexed_txs<'a>( db: &Db, first_tx_num: TxNum, txs: &'a [Tx], +) -> Result>> { + prepare_indexed_txs_cached( + db, + first_tx_num, + txs, + &mut TxNumCache::default(), + PrepareUpdateMode::Read, + ) +} + +/// Prepare txs of a block which is about to be added/removed from the DB with +/// some additional data, either coming from the DB or from within the block. +/// Consults `cache` before the DB, then updates it according to `update_mode`. +pub fn prepare_indexed_txs_cached<'a>( + db: &Db, + first_tx_num: TxNum, + txs: &'a [Tx], + cache: &mut TxNumCache, + update_mode: PrepareUpdateMode, ) -> Result>> { let mut tx_nums_by_txid = HashMap::with_capacity(txs.len()); for (tx_idx, tx) in txs.iter().enumerate() { @@ -57,7 +118,13 @@ let mut db_txids = BTreeSet::new(); for tx in txs { for tx_input in &tx.inputs { - if !tx_nums_by_txid.contains_key(&&tx_input.prev_out.txid) { + if let Entry::Vacant(entry) = + tx_nums_by_txid.entry(&tx_input.prev_out.txid) + { + if let Some(tx_num) = cache.get(&tx_input.prev_out.txid) { + entry.insert(tx_num); + continue; + } db_txids.insert(&tx_input.prev_out.txid); } } @@ -65,7 +132,8 @@ let tx_reader = TxReader::new(db)?; let db_tx_nums = tx_reader.tx_nums_by_txids(db_txids.iter().copied())?; let db_txids = db_txids.into_iter().collect::>(); - txs.iter() + let index_txs = txs + .iter() .enumerate() .map(|(tx_idx, tx)| { let tx_num = first_tx_num + tx_idx as TxNum; @@ -98,7 +166,60 @@ input_nums, }) 
}) - .collect::>>() + .collect::>>()?; + match update_mode { + PrepareUpdateMode::Add => cache.add_to_cache(&index_txs), + PrepareUpdateMode::Delete => cache.clear(), + PrepareUpdateMode::Read => {} + } + Ok(index_txs) +} + +impl TxNumCache { + /// Create a [`TxNumCache`] with the given tuning settings. + /// Will allocate all the needed memory up-front. + pub fn new(settings: TxNumCacheSettings) -> TxNumCache { + TxNumCache { + num_buckets: settings.num_buckets, + bucket_size: settings.bucket_size, + buckets: (0..settings.num_buckets) + .map(|_| HashMap::with_capacity(settings.bucket_size)) + .collect(), + } + } + + fn add_to_cache(&mut self, index_txs: &[IndexTx<'_>]) { + if self.num_buckets == 0 { + return; + } + let mut front = self.buckets.front_mut().unwrap(); + for tx in index_txs { + // Bucket at the front is full, get a new empty one + if front.len() >= self.bucket_size { + // Drop off the last bucket, empty it and move it to the front + let mut new_bucket = self.buckets.pop_back().unwrap(); + new_bucket.clear(); + self.buckets.push_front(new_bucket); + front = self.buckets.front_mut().unwrap(); + } + front.insert(tx.tx.txid(), tx.tx_num); + } + } + + fn clear(&mut self) { + for bucket in &mut self.buckets { + bucket.clear() + } + } + + fn get(&self, txid: &TxId) -> Option { + for block in &self.buckets { + if let Some(&tx_num) = block.get(txid) { + return Some(tx_num); + } + } + None + } } #[cfg(test)] @@ -110,7 +231,9 @@ use crate::{ db::Db, - index_tx::{prepare_indexed_txs, IndexTx}, + index_tx::{ + prepare_indexed_txs, IndexTx, TxNumCache, TxNumCacheSettings, + }, io::{BlockTxs, TxEntry, TxNum, TxWriter, TxsMemData}, }; @@ -193,4 +316,106 @@ Ok(()) } + + #[test] + fn test_tx_num_cache() { + let txs = (0..=10) + .map(|tx_num| { + Tx::with_txid(TxId::new([tx_num; 32]), Default::default()) + }) + .collect::>(); + let index_txs = txs + .iter() + .enumerate() + .map(|(tx_num, tx)| IndexTx { + tx_num: tx_num as TxNum, + tx, + input_nums: vec![], + 
is_coinbase: false, + }) + .collect::>(); + let mut cache = TxNumCache::new(TxNumCacheSettings { + num_buckets: 2, + bucket_size: 3, + }); + assert_eq!(cache.buckets.len(), 2); + for tx in &txs { + assert_eq!(cache.get(tx.txid_ref()), None); + } + + // Add the first tx + cache.add_to_cache(&index_txs[..1]); + assert_eq!(cache.buckets.len(), 2); + assert_eq!(cache.buckets[0].len(), 1); + assert_eq!(cache.buckets[0][txs[0].txid_ref()], 0); + assert_eq!(cache.get(txs[0].txid_ref()), Some(0)); + + // Add three more, filling the next bucket + cache.add_to_cache(&index_txs[1..4]); + assert_eq!(cache.buckets.len(), 2); + assert_eq!(cache.buckets[0].len(), 1); + assert_eq!(cache.buckets[0][txs[3].txid_ref()], 3); + assert_eq!(cache.buckets[1].len(), 3); + assert_eq!(cache.buckets[1][txs[0].txid_ref()], 0); + assert_eq!(cache.buckets[1][txs[1].txid_ref()], 1); + assert_eq!(cache.buckets[1][txs[2].txid_ref()], 2); + for tx in &index_txs[..4] { + assert_eq!(cache.get(tx.tx.txid_ref()), Some(tx.tx_num)); + } + + // Add two more, filling the cache + cache.add_to_cache(&index_txs[4..6]); + assert_eq!(cache.buckets.len(), 2); + assert_eq!(cache.buckets[0].len(), 3); + assert_eq!(cache.buckets[0][txs[3].txid_ref()], 3); + assert_eq!(cache.buckets[0][txs[4].txid_ref()], 4); + assert_eq!(cache.buckets[0][txs[5].txid_ref()], 5); + assert_eq!(cache.buckets[1].len(), 3); + assert_eq!(cache.buckets[1][txs[0].txid_ref()], 0); + assert_eq!(cache.buckets[1][txs[1].txid_ref()], 1); + assert_eq!(cache.buckets[1][txs[2].txid_ref()], 2); + for tx in &index_txs[..6] { + assert_eq!(cache.get(tx.tx.txid_ref()), Some(tx.tx_num)); + } + + // Adding one more empties the last bucket and moves it front + cache.add_to_cache(&index_txs[6..7]); + assert_eq!(cache.buckets.len(), 2); + assert_eq!(cache.buckets[0].len(), 1); + assert_eq!(cache.buckets[0][txs[6].txid_ref()], 6); + assert_eq!(cache.buckets[1].len(), 3); + assert_eq!(cache.buckets[1][txs[3].txid_ref()], 3); + 
assert_eq!(cache.buckets[1][txs[4].txid_ref()], 4); + assert_eq!(cache.buckets[1][txs[5].txid_ref()], 5); + for tx in txs[..3].iter() { + assert_eq!(cache.get(tx.txid_ref()), None); + } + for tx in &index_txs[3..7] { + assert_eq!(cache.get(tx.tx.txid_ref()), Some(tx.tx_num)); + } + + // Adding three more again empties the last bucket and moves it front + cache.add_to_cache(&index_txs[7..10]); + assert_eq!(cache.buckets.len(), 2); + assert_eq!(cache.buckets[0].len(), 1); + assert_eq!(cache.buckets[0][txs[9].txid_ref()], 9); + assert_eq!(cache.buckets[1].len(), 3); + assert_eq!(cache.buckets[1][txs[6].txid_ref()], 6); + assert_eq!(cache.buckets[1][txs[7].txid_ref()], 7); + assert_eq!(cache.buckets[1][txs[8].txid_ref()], 8); + for tx in txs[..6].iter() { + assert_eq!(cache.get(tx.txid_ref()), None); + } + for tx in &index_txs[6..10] { + assert_eq!(cache.get(tx.tx.txid_ref()), Some(tx.tx_num)); + } + + // Clearing the cache leaves the buckets allocated but empty + cache.clear(); + assert_eq!(cache.buckets.len(), 2); + assert_eq!(cache.buckets[0].len(), 0); + assert_eq!(cache.buckets[0].capacity(), 3); + assert_eq!(cache.buckets[1].len(), 0); + assert_eq!(cache.buckets[1].capacity(), 3); + } } diff --git a/chronik/chronik-db/src/mem/data.rs b/chronik/chronik-db/src/mem/data.rs --- a/chronik/chronik-db/src/mem/data.rs +++ b/chronik/chronik-db/src/mem/data.rs @@ -2,9 +2,12 @@ // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. -use crate::io::{ - GroupHistoryMemData, GroupHistoryStats, GroupUtxoMemData, GroupUtxoStats, - SpentByMemData, SpentByStats, TxsMemData, TxsStats, +use crate::{ + index_tx::{TxNumCache, TxNumCacheSettings}, + io::{ + GroupHistoryMemData, GroupHistoryStats, GroupUtxoMemData, + GroupUtxoStats, SpentByMemData, SpentByStats, TxsMemData, TxsStats, + }, }; /// In-memory data for Chronik, e.g. caches, perf statistics. 
@@ -12,6 +15,8 @@ pub struct MemData { /// In-memory data for indexing txs. pub txs: TxsMemData, + /// In-memory TxNumCache. + pub tx_num_cache: TxNumCache, /// In-memory data for indexing script tx history. pub script_history: GroupHistoryMemData, /// In-memory data for indexing script UTXOs. @@ -35,13 +40,17 @@ /// Config for in-memory data for Chronik. #[derive(Clone, Debug)] -pub struct MemDataConf {} +pub struct MemDataConf { + /// Settings for tuning TxNumCache. + pub tx_num_cache: TxNumCacheSettings, +} impl MemData { /// Create a new [`MemData`] from the given configuration. - pub fn new(_: MemDataConf) -> Self { + pub fn new(conf: MemDataConf) -> Self { MemData { txs: TxsMemData::default(), + tx_num_cache: TxNumCache::new(conf.tx_num_cache), script_history: GroupHistoryMemData::default(), script_utxos: GroupUtxoMemData::default(), spent_by: SpentByMemData::default(), diff --git a/chronik/chronik-indexer/src/indexer.rs b/chronik/chronik-indexer/src/indexer.rs --- a/chronik/chronik-indexer/src/indexer.rs +++ b/chronik/chronik-indexer/src/indexer.rs @@ -18,7 +18,9 @@ ScriptGroup, ScriptHistoryWriter, ScriptUtxoWriter, TokenIdGroup, TokenIdGroupAux, TokenIdHistoryWriter, TokenIdUtxoWriter, }, - index_tx::prepare_indexed_txs, + index_tx::{ + prepare_indexed_txs_cached, PrepareUpdateMode, TxNumCacheSettings, + }, io::{ merge, token::TokenWriter, BlockHeight, BlockReader, BlockStatsWriter, BlockTxs, BlockWriter, DbBlock, GroupHistoryMemData, GroupUtxoMemData, @@ -56,6 +58,8 @@ pub enable_token_index: bool, /// Whether to output Chronik performance statistics into a perf/ folder pub enable_perf_stats: bool, + /// Settings for tuning TxNumCache. + pub tx_num_cache: TxNumCacheSettings, } /// Struct for indexing blocks and txs. Maintains db handles and mempool. 
@@ -187,7 +191,9 @@ Ok(ChronikIndexer { db, mempool, - mem_data: MemData::new(MemDataConf {}), + mem_data: MemData::new(MemDataConf { + tx_num_cache: params.tx_num_cache, + }), script_group: ScriptGroup, avalanche: Avalanche::default(), subs: RwLock::new(Subs::new(ScriptGroup)), @@ -362,8 +368,13 @@ &block.block_txs, &mut self.mem_data.txs, )?; - let index_txs = - prepare_indexed_txs(&self.db, first_tx_num, &block.txs)?; + let index_txs = prepare_indexed_txs_cached( + &self.db, + first_tx_num, + &block.txs, + &mut self.mem_data.tx_num_cache, + PrepareUpdateMode::Add, + )?; block_stats_writer .insert(&mut batch, height, block.size, &index_txs)?; script_history_writer.insert( @@ -448,8 +459,13 @@ &block.block_txs, &mut self.mem_data.txs, )?; - let index_txs = - prepare_indexed_txs(&self.db, first_tx_num, &block.txs)?; + let index_txs = prepare_indexed_txs_cached( + &self.db, + first_tx_num, + &block.txs, + &mut self.mem_data.tx_num_cache, + PrepareUpdateMode::Delete, + )?; block_stats_writer.delete(&mut batch, block.db_block.height); script_history_writer.delete( &mut batch, @@ -511,8 +527,13 @@ let first_tx_num = tx_reader .first_tx_num_by_block(block.db_block.height)? .unwrap(); - let index_txs = - prepare_indexed_txs(&self.db, first_tx_num, &block.txs)?; + let index_txs = prepare_indexed_txs_cached( + &self.db, + first_tx_num, + &block.txs, + &mut self.mem_data.tx_num_cache, + PrepareUpdateMode::Read, + )?; let token_id_aux = if self.is_token_index_enabled { TokenIdGroupAux::from_db(&index_txs, &self.db)? 
} else { @@ -831,6 +852,7 @@ wipe_db: false, enable_token_index: false, enable_perf_stats: false, + tx_num_cache: Default::default(), }; // regtest folder doesn't exist yet -> error assert_eq!( @@ -899,6 +921,7 @@ wipe_db: false, enable_token_index: false, enable_perf_stats: false, + tx_num_cache: Default::default(), }; // Setting up DB first time sets the schema version diff --git a/chronik/chronik-lib/src/bridge.rs b/chronik/chronik-lib/src/bridge.rs --- a/chronik/chronik-lib/src/bridge.rs +++ b/chronik/chronik-lib/src/bridge.rs @@ -13,7 +13,7 @@ use abc_rust_error::Result; use bitcoinsuite_core::tx::{Tx, TxId}; use chronik_bridge::{ffi::init_error, util::expect_unique_ptr}; -use chronik_db::mem::MempoolTx; +use chronik_db::{index_tx::TxNumCacheSettings, mem::MempoolTx}; use chronik_http::server::{ ChronikServer, ChronikServerParams, ChronikSettings, }; @@ -78,6 +78,10 @@ wipe_db: params.wipe_db, enable_token_index: params.enable_token_index, enable_perf_stats: params.enable_perf_stats, + tx_num_cache: TxNumCacheSettings { + bucket_size: params.tx_num_cache.bucket_size, + num_buckets: params.tx_num_cache.num_buckets, + }, })?; indexer.resync_indexer(bridge_ref)?; if bridge.shutdown_requested() { diff --git a/chronik/chronik-lib/src/ffi.rs b/chronik/chronik-lib/src/ffi.rs --- a/chronik/chronik-lib/src/ffi.rs +++ b/chronik/chronik-lib/src/ffi.rs @@ -31,6 +31,17 @@ pub enable_perf_stats: bool, /// Duration between WebSocket pings initiated by Chronik. pub ws_ping_interval_secs: u64, + /// Tuning settings for the TxNumCache. + pub tx_num_cache: TxNumCacheSettings, + } + + /// Settings for tuning the TxNumCache. 
+ #[derive(Debug)] + pub struct TxNumCacheSettings { + /// How many buckets are on the belt + pub num_buckets: usize, + /// How many txs are cached in each bucket + pub bucket_size: usize, } extern "Rust" { diff --git a/src/init.cpp b/src/init.cpp --- a/src/init.cpp +++ b/src/init.cpp @@ -650,6 +650,26 @@ "Reindex the Chronik indexer from genesis, but leave the " "other indexes untouched", ArgsManager::ALLOW_BOOL, OptionsCategory::CHRONIK); + argsman.AddArg( + "-chroniktxnumcachebuckets", + strprintf( + "Tuning param of the TxNumCache, specifies how many buckets " + "to use on the belt. Caution against setting this too high, " + "it may slow down indexing. Set to 0 to disable. (default: %d)", + chronik::DEFAULT_TX_NUM_CACHE_BUCKETS), + ArgsManager::ALLOW_INT, OptionsCategory::CHRONIK); + argsman.AddArg( + "-chroniktxnumcachebucketsize", + strprintf( + "Tuning param of the TxNumCache, specifies the size of each bucket " + "on the belt. Unlike the number of buckets, this may be increased " + "without much danger of slowing the indexer down. The total cache " + "size will be `num_buckets * bucket_size * 40B`, so by default the " + "cache will require %dkB of memory. (default: %d)", + chronik::DEFAULT_TX_NUM_CACHE_BUCKETS * + chronik::DEFAULT_TX_NUM_CACHE_BUCKET_SIZE * 40 / 1000, + chronik::DEFAULT_TX_NUM_CACHE_BUCKET_SIZE), + ArgsManager::ALLOW_INT, OptionsCategory::CHRONIK); argsman.AddArg("-chronikperfstats", "Output some performance statistics (e.g. num cache hits, " "seconds spent) into a /perf folder. 
(default: 0)", diff --git a/test/functional/test_framework/util.py b/test/functional/test_framework/util.py --- a/test/functional/test_framework/util.py +++ b/test/functional/test_framework/util.py @@ -461,6 +461,8 @@ f.write(f"port={str(p2p_port(n))}\n") f.write(f"rpcport={str(rpc_port(n))}\n") f.write(f"chronikbind=127.0.0.1:{str(chronik_port(n))}\n") + # Chronik's tx num cache is sized for initial sync; shrink it for regtest + f.write("chroniktxnumcachebucketsize=100\n") f.write("fallbackfee=200\n") f.write("server=1\n") f.write("keypool=1\n")