diff --git a/chronik/chronik-db/src/index_tx.rs b/chronik/chronik-db/src/index_tx.rs
--- a/chronik/chronik-db/src/index_tx.rs
+++ b/chronik/chronik-db/src/index_tx.rs
@@ -4,7 +4,7 @@
 //! Module for [`IndexTx`] and [`prepare_indexed_txs`].
 
-use std::collections::HashMap;
+use std::collections::{BTreeSet, HashMap};
 
 use abc_rust_error::Result;
 use bitcoinsuite_core::tx::{OutPoint, Tx};
@@ -54,7 +54,17 @@
     for (tx_idx, tx) in txs.iter().enumerate() {
         tx_nums_by_txid.insert(tx.txid_ref(), first_tx_num + tx_idx as TxNum);
     }
+    let mut db_txids = BTreeSet::new();
+    for tx in txs {
+        for tx_input in &tx.inputs {
+            if !tx_nums_by_txid.contains_key(&&tx_input.prev_out.txid) {
+                db_txids.insert(&tx_input.prev_out.txid);
+            }
+        }
+    }
     let tx_reader = TxReader::new(db)?;
+    let db_tx_nums = tx_reader.tx_nums_by_txids(db_txids.iter().copied())?;
+    let db_txids = db_txids.into_iter().collect::<Vec<_>>();
     txs.iter()
         .enumerate()
         .map(|(tx_idx, tx)| {
@@ -68,9 +78,15 @@
                 .map(|input| {
                     Ok(match tx_nums_by_txid.get(&input.prev_out.txid) {
                         Some(&tx_num) => tx_num,
-                        None => tx_reader
-                            .tx_num_by_txid(&input.prev_out.txid)?
-                            .ok_or(UnknownInputSpent(input.prev_out))?,
+                        None => {
+                            let tx_num_idx = db_txids
+                                .binary_search(&&input.prev_out.txid)
+                                .map_err(|_| {
+                                    UnknownInputSpent(input.prev_out)
+                                })?;
+                            db_tx_nums[tx_num_idx]
+                                .ok_or(UnknownInputSpent(input.prev_out))?
+                        }
                     })
                 })
                 .collect::<Result<Vec<_>>>()?
diff --git a/chronik/chronik-db/src/io/blocks.rs b/chronik/chronik-db/src/io/blocks.rs
--- a/chronik/chronik-db/src/io/blocks.rs
+++ b/chronik/chronik-db/src/io/blocks.rs
@@ -84,6 +84,25 @@
     fn get_data(&self, block_height: BlockHeight) -> Result<Option<Self::Data>> {
         self.get_block(block_height)
     }
+
+    fn get_data_multi(
+        &self,
+        block_heights: impl IntoIterator<Item = BlockHeight>,
+    ) -> Result<Vec<Option<Self::Data>>> {
+        let data_ser = self.db.multi_get(
+            self.cf_blk,
+            block_heights.into_iter().map(bh_to_bytes),
+            false,
+        )?;
+        data_ser
+            .into_iter()
+            .map(|data_ser| {
+                data_ser
+                    .map(|data_ser| db_deserialize::<SerBlock>(&data_ser))
+                    .transpose()
+            })
+            .collect::<_>()
+    }
 }
 
 /// Errors for [`BlockWriter`] and [`BlockReader`].
diff --git a/chronik/chronik-db/src/io/txs.rs b/chronik/chronik-db/src/io/txs.rs
--- a/chronik/chronik-db/src/io/txs.rs
+++ b/chronik/chronik-db/src/io/txs.rs
@@ -192,6 +192,25 @@
     fn get_data(&self, tx_num: Self::SerialNum) -> Result<Option<Self::Data>> {
         self.get_tx(tx_num)
     }
+
+    fn get_data_multi(
+        &self,
+        tx_nums: impl IntoIterator<Item = TxNum>,
+    ) -> Result<Vec<Option<Self::Data>>> {
+        let data_ser = self.db.multi_get(
+            self.cf_tx,
+            tx_nums.into_iter().map(tx_num_to_bytes),
+            false,
+        )?;
+        data_ser
+            .into_iter()
+            .map(|data_ser| {
+                data_ser
+                    .map(|data_ser| db_deserialize::<SerTx>(&data_ser))
+                    .transpose()
+            })
+            .collect::<_>()
+    }
 }
 
 impl<'a> TxColumn<'a> {
@@ -379,6 +398,27 @@
         }
     }
 
+    /// Batch-read just the [`TxNum`]s of the txs by [`TxId`]s, or [`None`] if
+    /// not in the DB.
+    pub fn tx_nums_by_txids<'b, I>(
+        &self,
+        txids: I,
+    ) -> Result<Vec<Option<TxNum>>>
+    where
+        I: IntoIterator<Item = &'b TxId> + Clone,
+        I::IntoIter: Clone,
+    {
+        let data = LookupByHash::get_multi(
+            &self.col,
+            self.col.db,
+            txids.into_iter().map(|txid| txid.as_bytes()),
+        )?;
+        Ok(data
+            .into_iter()
+            .map(|data| data.map(|(tx_num, _)| tx_num))
+            .collect())
+    }
+
     /// Read the [`BlockTx`] by [`TxNum`], or [`None`] if not in the DB.
     pub fn tx_by_tx_num(&self, tx_num: TxNum) -> Result<Option<BlockTx>> {
         let Some(ser_tx) = self.col.get_tx(tx_num)?
         else {
diff --git a/chronik/chronik-db/src/reverse_lookup.rs b/chronik/chronik-db/src/reverse_lookup.rs
--- a/chronik/chronik-db/src/reverse_lookup.rs
+++ b/chronik/chronik-db/src/reverse_lookup.rs
@@ -100,6 +100,12 @@
     /// Fetch the data from the db using the serial num.
     fn get_data(&self, number: Self::SerialNum) -> Result<Option<Self::Data>>;
+
+    /// Fetch data from the batched db using the serial nums.
+    fn get_data_multi(
+        &self,
+        number: impl IntoIterator<Item = Self::SerialNum>,
+    ) -> Result<Vec<Option<Self::Data>>>;
 }
 
 #[derive(Debug, Error, PartialEq)]
@@ -232,6 +238,50 @@
         Ok(None)
     }
 
+    #[allow(clippy::type_complexity)]
+    pub(crate) fn get_multi<'a>(
+        lookup_column: &L,
+        db: &Db,
+        keys: impl IntoIterator<Item = &'a L::Key> + Clone,
+    ) -> Result<Vec<Option<(L::SerialNum, L::Data)>>> {
+        let cf_index = db.cf(L::CF_INDEX)?;
+        let serial_lists = db.multi_get(
+            cf_index,
+            keys.clone().into_iter().map(L::cheap_hash),
+            false,
+        )?;
+        let serial_lists = serial_lists
+            .iter()
+            .map(|serials| match serials {
+                Some(serials) => db_deserialize::<Vec<L::SerialNum>>(serials),
+                None => Ok(vec![]),
+            })
+            .collect::<Result<Vec<_>>>()?;
+        let mut data_lists = lookup_column.get_data_multi(
+            serial_lists.iter().flat_map(|s| s.iter().cloned()),
+        )?;
+        let mut data_idx = 0;
+        let mut result = Vec::with_capacity(serial_lists.len());
+        for (serial_list, key) in serial_lists.into_iter().zip(keys) {
+            let mut has_found = false;
+            for serial in serial_list {
+                if !has_found {
+                    if let Some(data) = data_lists[data_idx].take() {
+                        if L::data_key(&data) == key {
+                            result.push(Some((serial, data)));
+                            has_found = true;
+                        }
+                    }
+                }
+                data_idx += 1;
+            }
+            if !has_found {
+                result.push(None);
+            }
+        }
+        Ok(result)
+    }
+
     /// Insert into the index.
     pub(crate) fn insert_pairs<'a>(
         db: &Db,
@@ -365,6 +415,25 @@
             None => Ok(None),
         }
     }
+
+    fn get_data_multi(
+        &self,
+        numbers: impl IntoIterator<Item = Self::SerialNum>,
+    ) -> Result<Vec<Option<Self::Data>>> {
+        let data_ser = self.db.multi_get(
+            self.cf_data,
+            numbers.into_iter().map(|num| num.to_be_bytes()),
+            false,
+        )?;
+        data_ser
+            .into_iter()
+            .map(|data_ser| {
+                data_ser
+                    .map(|data_ser| db_deserialize::<TestData>(&data_ser))
+                    .transpose()
+            })
+            .collect::<_>()
+    }
 }
 
 #[test]
@@ -568,6 +637,15 @@
         assert_eq!(expected_num, actual_num);
         assert_eq!(*expected_data, actual_data);
     }
+    let batch_result = Index::get_multi(
+        &column,
+        &db,
+        entries.iter().map(|(_, data)| &data.key),
+    )?;
+    assert_eq!(
+        batch_result,
+        entries.iter().map(|entry| Some(*entry)).collect::<Vec<_>>(),
+    );
     Ok(())
 };
@@ -575,6 +653,12 @@
     for (_, data) in entries {
         assert!(Index::get(&column, &db, &data.key)?.is_none());
     }
+    let batch_result = Index::get_multi(
+        &column,
+        &db,
+        entries.iter().map(|(_, data)| &data.key),
+    )?;
+    assert_eq!(batch_result, vec![None; entries.len()]);
     Ok(())
 };