diff --git a/chronik/chronik-db/src/index_tx.rs b/chronik/chronik-db/src/index_tx.rs
index 477480805..80bb53507 100644
--- a/chronik/chronik-db/src/index_tx.rs
+++ b/chronik/chronik-db/src/index_tx.rs
@@ -1,180 +1,196 @@
// Copyright (c) 2023 The Bitcoin developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

//! Module for [`IndexTx`] and [`prepare_indexed_txs`].

-use std::collections::HashMap;
+use std::collections::{BTreeSet, HashMap};

use abc_rust_error::Result;
use bitcoinsuite_core::tx::{OutPoint, Tx};
use thiserror::Error;

use crate::{
    db::Db,
    io::{TxNum, TxReader},
};

/// Tx in a block to be added to the index, with prepared data to guide
/// indexing.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct IndexTx<'a> {
    /// Tx from the node to be indexed.
    pub tx: &'a Tx,
    /// [`TxNum`] of this tx.
    pub tx_num: TxNum,
    /// Whether this tx is a coinbase tx.
    pub is_coinbase: bool,
    /// [`TxNum`]s of the inputs of the tx. Either references txs from the DB
    /// or other txs within the new block.
    ///
    /// Empty for coinbase txs.
    pub input_nums: Vec<TxNum>,
}

/// Error indicating something went wrong with an [`IndexTx`].
#[derive(Debug, Error, PartialEq, Eq)]
pub enum IndexTxError {
    /// An input references a txid which could neither be found in the DB nor
    /// within the new block.
    #[error("Unknown input spent: {0:?}")]
    UnknownInputSpent(OutPoint),
}

use self::IndexTxError::*;

/// Prepare txs of a block which is about to be added/removed from the DB with
/// some additional data, either coming from the DB or from within the block.
pub fn prepare_indexed_txs<'a>(
    db: &Db,
    first_tx_num: TxNum,
    txs: &'a [Tx],
) -> Result<Vec<IndexTx<'a>>> {
    let mut tx_nums_by_txid = HashMap::with_capacity(txs.len());
    for (tx_idx, tx) in txs.iter().enumerate() {
        tx_nums_by_txid.insert(tx.txid_ref(), first_tx_num + tx_idx as TxNum);
    }
+    let mut db_txids = BTreeSet::new();
+    for tx in txs {
+        for tx_input in &tx.inputs {
+            if !tx_nums_by_txid.contains_key(&&tx_input.prev_out.txid) {
+                db_txids.insert(&tx_input.prev_out.txid);
+            }
+        }
+    }
    let tx_reader = TxReader::new(db)?;
+    let db_tx_nums = tx_reader.tx_nums_by_txids(db_txids.iter().copied())?;
+    let db_txids = db_txids.into_iter().collect::<Vec<_>>();
    txs.iter()
        .enumerate()
        .map(|(tx_idx, tx)| {
            let tx_num = first_tx_num + tx_idx as TxNum;
            let is_coinbase = tx_idx == 0;
            let input_nums = if is_coinbase {
                vec![]
            } else {
                tx.inputs
                    .iter()
                    .map(|input| {
                        Ok(match tx_nums_by_txid.get(&input.prev_out.txid) {
                            Some(&tx_num) => tx_num,
-                            None => tx_reader
-                                .tx_num_by_txid(&input.prev_out.txid)?
-                                .ok_or(UnknownInputSpent(input.prev_out))?,
+                            None => {
+                                let tx_num_idx = db_txids
+                                    .binary_search(&&input.prev_out.txid)
+                                    .map_err(|_| {
+                                        UnknownInputSpent(input.prev_out)
+                                    })?;
+                                db_tx_nums[tx_num_idx]
+                                    .ok_or(UnknownInputSpent(input.prev_out))?
+                            }
                        })
                    })
                    .collect::<Result<Vec<_>>>()?
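                // Review note: the hunk above is the heart of this change.
                // Unknown prev-out txids are deduped into the sorted
                // `db_txids` set, resolved with a single batched
                // `tx_nums_by_txids` call, and each input then finds its
                // TxNum via `binary_search` on the sorted Vec instead of
                // issuing one point lookup per input. The same pattern in
                // miniature (hedged sketch; `in_block` and `batch_lookup`
                // are hypothetical names):
                //
                //     let unknown: BTreeSet<&TxId> = inputs
                //         .iter()
                //         .filter(|txid| !in_block.contains_key(*txid))
                //         .collect();
                //     let nums = batch_lookup(unknown.iter().copied())?;
                //     let unknown: Vec<&TxId> = unknown.into_iter().collect();
                //     // per input: unknown.binary_search(&txid) indexes
                //     // into `nums`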
}; Ok(IndexTx { tx, tx_num, is_coinbase, input_nums, }) }) .collect::>>() } #[cfg(test)] mod tests { use abc_rust_error::Result; use bitcoinsuite_core::tx::{OutPoint, Tx, TxId, TxInput, TxMut}; use pretty_assertions::assert_eq; use rocksdb::WriteBatch; use crate::{ db::Db, index_tx::{prepare_indexed_txs, IndexTx}, io::{BlockTxs, TxEntry, TxNum, TxWriter, TxsMemData}, }; #[test] fn test_prepare_indexed_txs() -> Result<()> { abc_rust_error::install(); let tempdir = tempdir::TempDir::new("chronik-db--indexed_txs")?; let db = Db::open(tempdir.path())?; let tx_writer = TxWriter::new(&db)?; let mut txs_mem_data = TxsMemData::default(); let mut batch = WriteBatch::default(); let db_txs = (100..110) .map(|txid_num: u8| TxEntry { txid: TxId::from([txid_num; 32]), ..Default::default() }) .collect::>(); let first_tx_num = db_txs.len() as TxNum; tx_writer.insert( &mut batch, &BlockTxs { txs: db_txs, block_height: 0, }, &mut txs_mem_data, )?; db.write_batch(batch)?; fn make_tx( txid_num: u8, input_txid_nums: [u8; N], ) -> Tx { Tx::with_txid( TxId::from([txid_num; 32]), TxMut { inputs: input_txid_nums .into_iter() .map(|input_txid_num| TxInput { prev_out: OutPoint { txid: TxId::from([input_txid_num; 32]), out_idx: 0, }, ..Default::default() }) .collect(), ..Default::default() }, ) } let new_txs = vec![ make_tx(110, [0]), make_tx(111, [110, 100, 101]), make_tx(112, [111, 109, 103]), ]; assert_eq!( prepare_indexed_txs(&db, first_tx_num, &new_txs)?, vec![ IndexTx { tx: &new_txs[0], tx_num: 10, is_coinbase: true, input_nums: vec![], }, IndexTx { tx: &new_txs[1], tx_num: 11, is_coinbase: false, input_nums: vec![10, 0, 1], }, IndexTx { tx: &new_txs[2], tx_num: 12, is_coinbase: false, input_nums: vec![11, 9, 3], }, ], ); Ok(()) } } diff --git a/chronik/chronik-db/src/io/blocks.rs b/chronik/chronik-db/src/io/blocks.rs index c2c0f692e..e38d2981d 100644 --- a/chronik/chronik-db/src/io/blocks.rs +++ b/chronik/chronik-db/src/io/blocks.rs @@ -1,428 +1,447 @@ // Copyright (c) 2022 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. use abc_rust_error::Result; use bitcoinsuite_core::block::BlockHash; use rocksdb::ColumnFamilyDescriptor; use serde::{Deserialize, Serialize}; use thiserror::Error; use crate::{ db::{Db, CF, CF_BLK, CF_LOOKUP_BLK_BY_HASH}, reverse_lookup::{LookupColumn, ReverseLookup}, ser::{db_deserialize, db_serialize}, }; /// Height of a block in the chain. Genesis block has height 0. /// -1 indicates "not part of the chain". pub type BlockHeight = i32; struct BlockColumn<'a> { db: &'a Db, cf_blk: &'a CF, } /// Writes block data to the database. pub struct BlockWriter<'a> { col: BlockColumn<'a>, } /// Reads block data ([`DbBlock`]) from the database. pub struct BlockReader<'a> { col: BlockColumn<'a>, } /// Block data to/from the database. #[derive(Clone, Debug, Eq, PartialEq, Hash, Default)] pub struct DbBlock { /// Hash of the block. pub hash: BlockHash, /// Hash of the previous block of the block. pub prev_hash: BlockHash, /// Height of the block in the chain. pub height: BlockHeight, /// `nBits` field of the block; encodes the target compactly. pub n_bits: u32, /// Timestamp field of the block. pub timestamp: i64, /// Number of the node's blk?????.dat flat file where the block is stored. pub file_num: u32, /// Location in the flat file where the first byte of the block is stored, /// starting at the header. 
    pub data_pos: u32,
}

#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
struct SerBlock {
    hash: [u8; 32],
    n_bits: u32,
    timestamp: i64,
    file_num: u32,
    data_pos: u32,
}

type LookupByHash<'a> = ReverseLookup<BlockColumn<'a>>;

impl LookupColumn for BlockColumn<'_> {
    type CheapHash = [u8; 4];
    type Data = SerBlock;
    type SerialNum = BlockHeight;

    const CF_DATA: &'static str = CF_BLK;
    const CF_INDEX: &'static str = CF_LOOKUP_BLK_BY_HASH;

    fn cheap_hash(key: &[u8; 32]) -> Self::CheapHash {
        // use the lowest 32 bits of seahash as hash
        (seahash::hash(key) as u32).to_be_bytes()
    }

    fn data_key(data: &SerBlock) -> &[u8; 32] {
        &data.hash
    }

    fn get_data(&self, block_height: BlockHeight) -> Result<Option<SerBlock>> {
        self.get_block(block_height)
    }
+
+    fn get_data_multi(
+        &self,
+        block_heights: impl IntoIterator<Item = BlockHeight>,
+    ) -> Result<Vec<Option<SerBlock>>> {
+        let data_ser = self.db.multi_get(
+            self.cf_blk,
+            block_heights.into_iter().map(bh_to_bytes),
+            false,
+        )?;
+        data_ser
+            .into_iter()
+            .map(|data_ser| {
+                data_ser
+                    .map(|data_ser| db_deserialize::<SerBlock>(&data_ser))
+                    .transpose()
+            })
+            .collect::<_>()
+    }
}

/// Errors for [`BlockWriter`] and [`BlockReader`].
#[derive(Debug, Eq, Error, PartialEq)]
pub enum BlocksError {
    /// Block height must be 4 bytes.
    #[error("Inconsistent DB: Invalid height bytes: {0:?}")]
    InvalidHeightBytes(Vec<u8>),

    /// Block has no parent.
    #[error("Inconsistent DB: Orphan block at height {0}")]
    OrphanBlock(BlockHeight),
}

use self::BlocksError::*;

/// Serialize block height for use as a key in the DB.
/// Big-endian, so blocks are sorted in ascending order.
pub(crate) fn bh_to_bytes(height: BlockHeight) -> [u8; 4] {
    height.to_be_bytes()
}

/// Deserialize block height from bytes.
pub(crate) fn bytes_to_bh(bytes: &[u8]) -> Result<BlockHeight> {
    Ok(BlockHeight::from_be_bytes(
        bytes
            .try_into()
            .map_err(|_| InvalidHeightBytes(bytes.to_vec()))?,
    ))
}

impl<'a> BlockColumn<'a> {
    fn new(db: &'a Db) -> Result<Self> {
        let cf_blk = db.cf(CF_BLK)?;
        db.cf(CF_LOOKUP_BLK_BY_HASH)?;
        Ok(BlockColumn { db, cf_blk })
    }

    fn get_block(&self, block_height: BlockHeight) -> Result<Option<SerBlock>> {
        match self.db.get(self.cf_blk, bh_to_bytes(block_height))? {
            Some(bytes) => Ok(Some(db_deserialize::<SerBlock>(&bytes)?)),
            None => Ok(None),
        }
    }
}

impl<'a> BlockWriter<'a> {
    /// Create writer to the database for blocks.
    pub fn new(db: &'a Db) -> Result<Self> {
        Ok(BlockWriter {
            col: BlockColumn::new(db)?,
        })
    }

    /// Add a new block to the database.
    pub fn insert(
        &self,
        batch: &mut rocksdb::WriteBatch,
        block: &DbBlock,
    ) -> Result<()> {
        // Serialize the block data
        let data = db_serialize(&SerBlock {
            hash: block.hash.to_bytes(),
            n_bits: block.n_bits,
            timestamp: block.timestamp,
            file_num: block.file_num,
            data_pos: block.data_pos,
        })?;
        batch.put_cf(self.col.cf_blk, bh_to_bytes(block.height), &data);
        LookupByHash::insert_pairs(
            self.col.db,
            batch,
            [(block.height, &block.hash.to_bytes())],
        )?;
        Ok(())
    }

    /// Remove a block by height from the database.
    pub fn delete(
        &self,
        batch: &mut rocksdb::WriteBatch,
        block: &DbBlock,
    ) -> Result<()> {
        batch.delete_cf(self.col.cf_blk, bh_to_bytes(block.height));
        LookupByHash::delete_pairs(
            self.col.db,
            batch,
            [(block.height, &block.hash.to_bytes())],
        )?;
        Ok(())
    }

    pub(crate) fn add_cfs(columns: &mut Vec<ColumnFamilyDescriptor>) {
        columns.push(ColumnFamilyDescriptor::new(
            CF_BLK,
            rocksdb::Options::default(),
        ));
        LookupByHash::add_cfs(columns);
    }
}

impl std::fmt::Debug for BlockWriter<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "BlockWriter {{ ... }}")
    }
}

impl<'a> BlockReader<'a> {
    /// Create reader from the database for blocks.
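    // Review note: the get_data_multi impl on BlockColumn above, and the
    // near-identical ones on TxColumn and the test column below, all share
    // the same multi_get-then-deserialize shape. A shared helper could cut
    // the triplication; hedged sketch (helper name hypothetical, assuming
    // db_deserialize accepts any DeserializeOwned type and Db::multi_get
    // takes AsRef<[u8]> keys, as the call sites suggest):
    //
    //     fn multi_get_deser<T: DeserializeOwned>(
    //         db: &Db,
    //         cf: &CF,
    //         keys: impl IntoIterator<Item = impl AsRef<[u8]>>,
    //     ) -> Result<Vec<Option<T>>> {
    //         let entries = db.multi_get(cf, keys, false)?;
    //         entries
    //             .into_iter()
    //             .map(|e| e.map(|b| db_deserialize::<T>(&b)).transpose())
    //             .collect()
    //     }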
pub fn new(db: &'a Db) -> Result { Ok(BlockReader { col: BlockColumn::new(db)?, }) } /// The height of the most-work fully-validated chain. The genesis block has /// height 0 pub fn height(&self) -> Result { let mut iter = self.col.db.iterator_end(self.col.cf_blk); match iter.next() { Some(result) => { let (height_bytes, _) = result?; Ok(bytes_to_bh(&height_bytes)?) } None => Ok(-1), } } /// Tip of the chain: the most recently added block. /// [`None`] if chain is empty (not even the genesis block). pub fn tip(&self) -> Result> { let mut iter = self.col.db.iterator_end(self.col.cf_blk); match iter.next() { Some(result) => { let (height_bytes, block_data) = result?; let height = bytes_to_bh(&height_bytes)?; let block_data = db_deserialize::(&block_data)?; let prev_block_hash = self.get_prev_hash(height)?; Ok(Some(DbBlock { hash: BlockHash::from(block_data.hash), prev_hash: BlockHash::from(prev_block_hash), height, n_bits: block_data.n_bits, timestamp: block_data.timestamp, file_num: block_data.file_num, data_pos: block_data.data_pos, })) } None => Ok(None), } } /// [`DbBlock`] by height. The genesis block has height 0. pub fn by_height(&self, height: BlockHeight) -> Result> { let Some(block_data) = self.col.get_block(height)? else { return Ok(None); }; let prev_block_hash = self.get_prev_hash(height)?; Ok(Some(DbBlock { hash: BlockHash::from(block_data.hash), prev_hash: BlockHash::from(prev_block_hash), height, n_bits: block_data.n_bits, timestamp: block_data.timestamp, file_num: block_data.file_num, data_pos: block_data.data_pos, })) } /// [`DbBlock`] by hash. pub fn by_hash(&self, hash: &BlockHash) -> Result> { let hash = hash.to_bytes(); let (height, ser_block) = match LookupByHash::get(&self.col, self.col.db, &hash)? { Some(data) => data, None => return Ok(None), }; Ok(Some(DbBlock { hash: BlockHash::from(ser_block.hash), prev_hash: BlockHash::from(self.get_prev_hash(height)?), height, n_bits: ser_block.n_bits, timestamp: ser_block.timestamp, file_num: ser_block.file_num, data_pos: ser_block.data_pos, })) } fn get_prev_hash(&self, height: BlockHeight) -> Result<[u8; 32]> { if height == 0 { return Ok([0; 32]); } let prev_block_data = self .col .db .get(self.col.cf_blk, bh_to_bytes(height - 1))? .ok_or(OrphanBlock(height))?; let prev_block = db_deserialize::(&prev_block_data)?; Ok(prev_block.hash) } } impl std::fmt::Debug for BlockReader<'_> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "BlockReader {{ ... 
}}") } } #[cfg(test)] mod tests { use abc_rust_error::Result; use bitcoinsuite_core::block::BlockHash; use crate::{ db::{Db, CF_BLK}, io::{ blocks::SerBlock, BlockReader, BlockWriter, BlocksError, DbBlock, }, ser::{db_deserialize, SerError}, }; #[test] fn test_blocks() -> Result<()> { abc_rust_error::install(); let tempdir = tempdir::TempDir::new("chronik-db--blocks")?; let db = Db::open(tempdir.path())?; let writer = BlockWriter::new(&db)?; let blocks = BlockReader::new(&db)?; { assert_eq!(blocks.height()?, -1); assert_eq!(blocks.tip()?, None); assert_eq!(blocks.by_height(0)?, None); } let block0 = DbBlock { hash: BlockHash::from([44; 32]), prev_hash: BlockHash::from([0; 32]), height: 0, n_bits: 0x1c100000, timestamp: 1600000000, file_num: 6, data_pos: 100, }; let block1 = DbBlock { hash: BlockHash::from([22; 32]), prev_hash: BlockHash::from([44; 32]), height: 1, n_bits: 0x1c100001, timestamp: 1600000001, file_num: 7, data_pos: 200, }; let cf = db.cf(CF_BLK)?; { let mut batch = rocksdb::WriteBatch::default(); writer.insert(&mut batch, &block0)?; db.write_batch(batch)?; assert_eq!( db_deserialize::( &db.get(cf, [0, 0, 0, 0])?.unwrap() )?, SerBlock { hash: block0.hash.to_bytes(), n_bits: block0.n_bits, timestamp: block0.timestamp, file_num: block0.file_num, data_pos: block0.data_pos, }, ); assert_eq!(blocks.height()?, 0); assert_eq!(blocks.tip()?, Some(block0.clone())); assert_eq!(blocks.by_height(0)?, Some(block0.clone())); assert_eq!(blocks.by_height(1)?, None); } { let mut batch = rocksdb::WriteBatch::default(); writer.insert(&mut batch, &block1)?; db.write_batch(batch)?; assert_eq!( db_deserialize::( &db.get(cf, [0, 0, 0, 1])?.unwrap(), )?, SerBlock { hash: block1.hash.to_bytes(), n_bits: block1.n_bits, timestamp: block1.timestamp, file_num: block1.file_num, data_pos: block1.data_pos, }, ); assert_eq!(blocks.height()?, 1); assert_eq!(blocks.tip()?, Some(block1.clone())); assert_eq!(blocks.by_height(0)?, Some(block0.clone())); assert_eq!(blocks.by_height(1)?, Some(block1.clone())); assert_eq!(blocks.by_height(2)?, None); } { let mut batch = rocksdb::WriteBatch::default(); writer.delete(&mut batch, &block1)?; db.write_batch(batch)?; assert_eq!(db.get(cf, [0, 0, 0, 1])?.as_deref(), None); assert!(db.get(cf, [0, 0, 0, 0])?.is_some()); assert_eq!(blocks.height()?, 0); assert_eq!(blocks.tip()?, Some(block0.clone())); assert_eq!(blocks.by_height(0)?, Some(block0.clone())); assert_eq!(blocks.by_height(1)?, None); } { let mut batch = rocksdb::WriteBatch::default(); writer.delete(&mut batch, &block0)?; db.write_batch(batch)?; assert_eq!(db.get(cf, [0, 0, 0, 1])?.as_deref(), None); assert_eq!(db.get(cf, [0, 0, 0, 0])?.as_deref(), None); assert_eq!(blocks.height()?, -1); assert_eq!(blocks.tip()?, None); assert_eq!(blocks.by_height(0)?, None); } { let mut batch = rocksdb::WriteBatch::default(); batch.put_cf(cf, [0, 0, 0], []); db.write_batch(batch)?; assert_eq!( blocks.height().unwrap_err().downcast::()?, BlocksError::InvalidHeightBytes(vec![0, 0, 0]), ); } { let mut batch = rocksdb::WriteBatch::default(); batch.put_cf(cf, [0, 0, 0, 0], [0xff, 0xff, 0xff]); db.write_batch(batch)?; assert_eq!( blocks.by_height(0).unwrap_err().downcast::()?, SerError::DeserializeError { type_name: "chronik_db::io::blocks::SerBlock", error: postcard::Error::DeserializeUnexpectedEnd, bytes: vec![0xff, 0xff, 0xff], }, ); } Ok(()) } } diff --git a/chronik/chronik-db/src/io/txs.rs b/chronik/chronik-db/src/io/txs.rs index eafd56bb6..d0518a303 100644 --- a/chronik/chronik-db/src/io/txs.rs +++ 
b/chronik/chronik-db/src/io/txs.rs
@@ -1,723 +1,763 @@
// Copyright (c) 2023 The Bitcoin developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

//! Stores txs from the node in the DB.
//!
//! Instead of having txids be the keys for the column family containing the
//! txs, we use a 64-bit serial number "TxNum" that increments with every
//! transaction in block order. This allows us to e.g. very easily iterate
//! over all the txs in a block just by knowing the first tx_num of the block.
//! It also simplifies the address index (especially reduces space
//! requirements), as we simply store a list of relatively small integers
//! instead of txids.
//!
//! 64 bits allow us to store a maximum of 18446744073709551616 txs, which
//! even at 1M tx/s would be enough for over 500,000 years.
//!
//! We only store the `txid`, `data_pos`, `undo_pos` and `time_first_seen` in
//! the DB; everything else we can read from the block/undo files. We use the
//! fact that coinbase txs don't have undo data, and undo data for txs is
//! never at position 0, so we set `undo_pos = 0` for coinbase txs, and treat
//! every entry with `undo_pos == 0` as a coinbase tx.
//!
//! For the reverse index txid -> tx_num, we use `ReverseLookup`. We use a
//! 64-bit cheap hash to make collisions rare/difficult.

use std::{ops::Range, time::Instant};

use abc_rust_error::Result;
use bitcoinsuite_core::tx::TxId;
use rocksdb::{ColumnFamilyDescriptor, Options, WriteBatch};
use serde::{Deserialize, Serialize};
use thiserror::Error;

use crate::{
    db::{
        Db, CF, CF_BLK_BY_FIRST_TX, CF_FIRST_TX_BY_BLK, CF_LOOKUP_TX_BY_HASH,
        CF_TX,
    },
    io::{bh_to_bytes, bytes_to_bh, BlockHeight},
    reverse_lookup::{LookupColumn, ReverseLookup},
    ser::{db_deserialize, db_serialize},
};

type LookupByHash<'a> = ReverseLookup<TxColumn<'a>>;

/// Number that uniquely identifies a tx in the blockchain.
/// Transactions are enumerated in the exact order they appear in the
/// blockchain. This looks like this:
///
/// * - 0 (coinbase of genesis)
///   - 1 (first non-coinbase tx in genesis block),
///   - ...
///   - N-1 (last tx in genesis block)
/// * - N (coinbase of second block)
///   - N+1 (first non-coinbase tx in second block),
///   - ...
///   - M-1 (last tx in second block)
/// * - M (coinbase of third block)
///   - M + 1 (first non-coinbase tx in third block),
///   - etc.
///
/// With CTOR, the non-coinbase txs within blocks are sorted in order of txid.
pub type TxNum = u64;

/// Entry of a tx in the DB.
///
/// Instead of storing tx data directly, we utilize the fact that the node
/// already stores the block and undo data, and just store positions into
/// these files.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct TxEntry {
    /// TxId of the tx.
    pub txid: TxId,
    /// Position of the tx data in the tx's block's block file.
    pub data_pos: u32,
    /// Position of the tx undo data in the tx's block's undo file.
    pub undo_pos: u32,
    /// When this tx was first seen in the mempool.
    pub time_first_seen: i64,
    /// Whether this is a coinbase tx.
    pub is_coinbase: bool,
}

/// Tx from the DB with height.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BlockTx {
    /// Tx data.
    pub entry: TxEntry,
    /// Height of the block of the tx.
    pub block_height: BlockHeight,
}

/// Txs of a block, bundled in one struct so we can add it easily to the DB.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct BlockTxs {
    /// Tx data.
    pub txs: Vec<TxEntry>,
    /// Height of the block of the txs.
pub block_height: BlockHeight, } /// In-memory data for the tx history. #[derive(Debug, Default)] pub struct TxsMemData { /// Stats about cache hits, num requests etc. pub stats: TxsStats, } /// Stats about cache hits, num requests etc. #[derive(Clone, Debug, Default)] pub struct TxsStats { /// Total number of txs updated. pub n_total: usize, /// Time [s] for insert/delete. pub t_total: f64, /// Time [s] for indexing txs. pub t_index: f64, } #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] struct SerTx { txid: [u8; 32], data_pos: u32, undo_pos: u32, // == 0 <==> is_coinbase == true time_first_seen: i64, } struct TxColumn<'a> { db: &'a Db, cf_tx: &'a CF, cf_blk_by_first_tx: &'a CF, cf_first_tx_by_blk: &'a CF, } /// Write [`BlockTxs`] to the DB. pub struct TxWriter<'a> { col: TxColumn<'a>, } /// Read [`BlockTx`]s from the DB. pub struct TxReader<'a> { col: TxColumn<'a>, } /// Error indicating something went wrong with the tx index. #[derive(Debug, Error, PartialEq, Eq)] pub enum TxsError { /// TxNum must be 8 bytes. #[error("Inconsistent DB: Invalid tx_num bytes: {0:?}")] InvalidTxNumBytes(Vec), /// A block without txs is not valid. #[error("Inconsistent DB: Txs for block {0} not found")] NoTxsForBlock(BlockHeight), /// A tx must always have a block. #[error("Inconsistent DB: Block for tx {0} not found")] TxWithoutBlock(TxNum), } use self::TxsError::*; pub(crate) fn tx_num_to_bytes(tx_num: TxNum) -> [u8; 8] { // big-endian, so txs are sorted ascendingly tx_num.to_be_bytes() } pub(crate) fn bytes_to_tx_num(bytes: &[u8]) -> Result { Ok(TxNum::from_be_bytes( bytes .try_into() .map_err(|_| InvalidTxNumBytes(bytes.to_vec()))?, )) } impl<'a> LookupColumn for TxColumn<'a> { type CheapHash = [u8; 8]; type Data = SerTx; type SerialNum = TxNum; const CF_DATA: &'static str = CF_TX; const CF_INDEX: &'static str = CF_LOOKUP_TX_BY_HASH; fn cheap_hash(key: &[u8; 32]) -> Self::CheapHash { seahash::hash(key).to_be_bytes() } fn data_key(value: &Self::Data) -> &[u8; 32] { &value.txid } fn get_data(&self, tx_num: Self::SerialNum) -> Result> { self.get_tx(tx_num) } + + fn get_data_multi( + &self, + tx_nums: impl IntoIterator, + ) -> Result>> { + let data_ser = self.db.multi_get( + self.cf_tx, + tx_nums.into_iter().map(tx_num_to_bytes), + false, + )?; + data_ser + .into_iter() + .map(|data_ser| { + data_ser + .map(|data_ser| db_deserialize::(&data_ser)) + .transpose() + }) + .collect::<_>() + } } impl<'a> TxColumn<'a> { fn new(db: &'a Db) -> Result { let cf_tx = db.cf(CF_TX)?; let cf_blk_by_first_tx = db.cf(CF_BLK_BY_FIRST_TX)?; let cf_first_tx_by_blk = db.cf(CF_FIRST_TX_BY_BLK)?; db.cf(CF_LOOKUP_TX_BY_HASH)?; Ok(TxColumn { db, cf_tx, cf_blk_by_first_tx, cf_first_tx_by_blk, }) } fn get_tx(&self, tx_num: TxNum) -> Result> { match self.db.get(self.cf_tx, tx_num_to_bytes(tx_num))? { Some(bytes) => Ok(Some(db_deserialize::(&bytes)?)), None => Ok(None), } } } impl<'a> TxWriter<'a> { /// Create a new [`TxWriter`]. pub fn new(db: &'a Db) -> Result { Ok(TxWriter { col: TxColumn::new(db)?, }) } /// Insert and index the txs into the DB. pub fn insert( &self, batch: &mut WriteBatch, block_txs: &BlockTxs, mem_data: &mut TxsMemData, ) -> Result { let stats = &mut mem_data.stats; let t_start = Instant::now(); stats.n_total += block_txs.txs.len(); let mut last_tx_num_iterator = self.col.db.iterator_end(self.col.cf_tx); let first_new_tx = match last_tx_num_iterator.next() { Some(result) => { let (tx_num, _) = result?; bytes_to_tx_num(&tx_num)? 
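            // (keys in cf_tx are big-endian TxNums, so the iterator_end
            // read above yields the highest existing tx_num; the first tx
            // of the new block is one past it)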
+ 1 } None => 0, }; batch.put_cf( self.col.cf_blk_by_first_tx, tx_num_to_bytes(first_new_tx), bh_to_bytes(block_txs.block_height), ); batch.put_cf( self.col.cf_first_tx_by_blk, bh_to_bytes(block_txs.block_height), tx_num_to_bytes(first_new_tx), ); let mut index_pairs = Vec::with_capacity(block_txs.txs.len()); let mut next_tx_num = first_new_tx; for tx in &block_txs.txs { let ser_tx = SerTx::from(tx); batch.put_cf( self.col.cf_tx, tx_num_to_bytes(next_tx_num), db_serialize(&ser_tx)?, ); index_pairs.push((next_tx_num, tx.txid.as_bytes())); next_tx_num += 1; } let t_index = Instant::now(); LookupByHash::insert_pairs(self.col.db, batch, index_pairs)?; stats.t_index += t_index.elapsed().as_secs_f64(); stats.t_total += t_start.elapsed().as_secs_f64(); Ok(first_new_tx) } /// Delete and unindex the txs from the DB. pub fn delete( &self, batch: &mut WriteBatch, block_txs: &BlockTxs, mem_data: &mut TxsMemData, ) -> Result { let stats = &mut mem_data.stats; let t_start = Instant::now(); stats.n_total += block_txs.txs.len(); let first_tx_num = self .col .db .get( self.col.cf_first_tx_by_blk, bh_to_bytes(block_txs.block_height), )? .ok_or(NoTxsForBlock(block_txs.block_height))?; let first_tx_num = bytes_to_tx_num(&first_tx_num)?; let mut next_tx_num = first_tx_num; let mut index_pairs = Vec::with_capacity(block_txs.txs.len()); for tx in &block_txs.txs { batch.delete_cf(self.col.cf_tx, tx_num_to_bytes(next_tx_num)); index_pairs.push((next_tx_num, tx.txid.as_bytes())); next_tx_num += 1; } batch.delete_cf( self.col.cf_blk_by_first_tx, tx_num_to_bytes(first_tx_num), ); batch.delete_cf( self.col.cf_first_tx_by_blk, bh_to_bytes(block_txs.block_height), ); let t_index = Instant::now(); LookupByHash::delete_pairs(self.col.db, batch, index_pairs)?; stats.t_index += t_index.elapsed().as_secs_f64(); stats.t_total += t_start.elapsed().as_secs_f64(); Ok(first_tx_num) } /// Add the column families used for txs. pub(crate) fn add_cfs(columns: &mut Vec) { columns.push(ColumnFamilyDescriptor::new(CF_TX, Options::default())); columns.push(ColumnFamilyDescriptor::new( CF_BLK_BY_FIRST_TX, Options::default(), )); columns.push(ColumnFamilyDescriptor::new( CF_FIRST_TX_BY_BLK, Options::default(), )); LookupByHash::add_cfs(columns); } } impl std::fmt::Debug for TxWriter<'_> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "TxWriter {{ ... }}") } } impl<'a> TxReader<'a> { /// Create a new [`TxReader`]. pub fn new(db: &'a Db) -> Result { Ok(TxReader { col: TxColumn::new(db)?, }) } /// Read a tx by txid, or None if not in the DB. pub fn tx_by_txid(&self, txid: &TxId) -> Result> { match self.tx_and_num_by_txid(txid)? { Some((_, block_tx)) => Ok(Some(block_tx)), None => Ok(None), } } /// Read a [`BlockTx`] and its [`TxNum`] by [`TxId`], or [`None`] if not in /// the DB. pub fn tx_and_num_by_txid( &self, txid: &TxId, ) -> Result> { let (tx_num, ser_tx) = match LookupByHash::get(&self.col, self.col.db, txid.as_bytes())? { Some(tuple) => tuple, None => return Ok(None), }; let block_height = self.block_height_by_tx_num(tx_num)?; Ok(Some(( tx_num, BlockTx { entry: TxEntry::from(ser_tx), block_height, }, ))) } /// Read just the [`TxNum`] of the tx by [`TxId`], or [`None`] if not in the /// DB. This is faster than [`TxReader::tx_and_num_by_txid`]. pub fn tx_num_by_txid(&self, txid: &TxId) -> Result> { match LookupByHash::get(&self.col, self.col.db, txid.as_bytes())? 
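        // Review note: the batch variant added below, tx_nums_by_txids,
        // is what prepare_indexed_txs now calls. Hedged usage sketch
        // (txid values hypothetical):
        //
        //     let txids = [TxId::from([1; 32]), TxId::from([9; 32])];
        //     let nums = tx_reader.tx_nums_by_txids(txids.iter())?;
        //     // -> vec![Some(0), None] if only the first txid is in the DB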
{ Some((tx_num, _)) => Ok(Some(tx_num)), None => Ok(None), } } + /// Batch-read just the [`TxNum`]s of the txs by [`TxId`]s, or [`None`] if + /// not in the DB. + pub fn tx_nums_by_txids<'b, I>( + &self, + txids: I, + ) -> Result>> + where + I: IntoIterator + Clone, + I::IntoIter: Clone, + { + let data = LookupByHash::get_multi( + &self.col, + self.col.db, + txids.into_iter().map(|txid| txid.as_bytes()), + )?; + Ok(data + .into_iter() + .map(|data| data.map(|(tx_num, _)| tx_num)) + .collect()) + } + /// Read the [`BlockTx`] by [`TxNum`], or [`None`] if not in the DB. pub fn tx_by_tx_num(&self, tx_num: TxNum) -> Result> { let Some(ser_tx) = self.col.get_tx(tx_num)? else { return Ok(None); }; let block_height = self.block_height_by_tx_num(tx_num)?; Ok(Some(BlockTx { entry: TxEntry::from(ser_tx), block_height, })) } /// Read just the [`TxId`] of the tx by [`TxNum`], or [`None`] if not in the /// DB. This is faster than [`TxReader::tx_and_num_by_txid`]. pub fn txid_by_tx_num(&self, tx_num: TxNum) -> Result> { let Some(ser_tx) = self.col.get_tx(tx_num)? else { return Ok(None); }; Ok(Some(TxId::from(ser_tx.txid))) } /// Read the first [`TxNum`] of a [`BlockHeight`], or [`None`] if not in the /// DB. This is useful for getting all the txs in a block, by iterating /// between this (inclusive) and the next block's first tx_num (exclusive), /// we get all tx nums of the block. pub fn first_tx_num_by_block( &self, block_height: BlockHeight, ) -> Result> { match self .col .db .get(self.col.cf_first_tx_by_blk, bh_to_bytes(block_height))? { Some(first_tx_num) => Ok(Some(bytes_to_tx_num(&first_tx_num)?)), None => Ok(None), } } /// Return the range of [`TxNum`]s of the block, or None if the block /// doesn't exist. pub fn block_tx_num_range( &self, block_height: BlockHeight, ) -> Result>> { let tx_num_start = match self.first_tx_num_by_block(block_height)? { Some(tx_num) => tx_num, None => return Ok(None), }; let tx_num_end = match self.first_tx_num_by_block(block_height + 1)? { Some(first_tx_num_next) => first_tx_num_next, None => match self.last_tx_num()? { Some(last_tx_num) => last_tx_num + 1, None => return Err(NoTxsForBlock(block_height).into()), }, }; Ok(Some(tx_num_start..tx_num_end)) } /// Read the last [`TxNum`] of the DB. /// This is useful when iterating over the txs of the last block. pub fn last_tx_num(&self) -> Result> { let mut iter = self.col.db.iterator_end(self.col.cf_tx); match iter.next() { Some(result) => { let (key, _) = result?; Ok(Some(bytes_to_tx_num(&key)?)) } None => Ok(None), } } /// Read the block height the tx_num has. Err if not found. fn block_height_by_tx_num(&self, tx_num: TxNum) -> Result { let mut iter = self.col.db.iterator( self.col.cf_blk_by_first_tx, &tx_num_to_bytes(tx_num), rocksdb::Direction::Reverse, ); match iter.next() { Some(result) => { let (_, block_height) = result?; bytes_to_bh(&block_height) } None => Err(TxWithoutBlock(tx_num).into()), } } } impl std::fmt::Debug for TxReader<'_> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "TxReader {{ ... 
}}") } } impl From<&'_ TxEntry> for SerTx { fn from(value: &TxEntry) -> Self { SerTx { txid: value.txid.to_bytes(), data_pos: value.data_pos, undo_pos: value.undo_pos, time_first_seen: value.time_first_seen, } } } impl From for TxEntry { fn from(value: SerTx) -> Self { TxEntry { txid: TxId::from(value.txid), data_pos: value.data_pos, undo_pos: value.undo_pos, time_first_seen: value.time_first_seen, is_coinbase: value.undo_pos == 0, } } } #[cfg(test)] mod tests { use abc_rust_error::Result; use bitcoinsuite_core::tx::TxId; use pretty_assertions::assert_eq; use rocksdb::WriteBatch; use crate::{ db::Db, io::{BlockTx, BlockTxs, TxEntry, TxReader, TxWriter, TxsMemData}, }; #[test] fn test_insert_txs() -> Result<()> { abc_rust_error::install(); let tempdir = tempdir::TempDir::new("chronik-db--txs")?; let db = Db::open(tempdir.path())?; let tx_writer = TxWriter::new(&db)?; let tx_reader = TxReader::new(&db)?; let mut mem_data = TxsMemData::default(); let tx1 = TxEntry { txid: TxId::from([1; 32]), data_pos: 100, undo_pos: 0, time_first_seen: 123456, is_coinbase: true, }; let block_tx1 = BlockTx { entry: tx1.clone(), block_height: 0, }; let block1 = BlockTxs { block_height: 0, txs: vec![tx1], }; assert_eq!(tx_reader.last_tx_num()?, None); assert_eq!(tx_reader.block_tx_num_range(0)?, None); { // insert genesis tx let mut batch = WriteBatch::default(); assert_eq!( tx_writer.insert(&mut batch, &block1, &mut mem_data)?, 0 ); db.write_batch(batch)?; let tx_reader = TxReader::new(&db)?; assert_eq!(tx_reader.first_tx_num_by_block(0)?, Some(0)); assert_eq!(tx_reader.first_tx_num_by_block(1)?, None); assert_eq!(tx_reader.last_tx_num()?, Some(0)); assert_eq!(tx_reader.block_tx_num_range(0)?, Some(0..1)); assert_eq!(tx_reader.tx_by_txid(&TxId::from([0; 32]))?, None); assert_eq!(tx_reader.tx_num_by_txid(&TxId::from([0; 32]))?, None); assert_eq!( tx_reader.tx_by_txid(&TxId::from([1; 32]))?, Some(block_tx1.clone()), ); assert_eq!(tx_reader.tx_by_tx_num(0)?, Some(block_tx1.clone())); assert_eq!( tx_reader.tx_num_by_txid(&TxId::from([1; 32]))?, Some(0), ); } let tx2 = TxEntry { txid: TxId::from([2; 32]), data_pos: 200, undo_pos: 20, time_first_seen: 234567, is_coinbase: false, }; let block_tx2 = BlockTx { entry: tx2.clone(), block_height: 1, }; let tx3 = TxEntry { txid: TxId::from([3; 32]), data_pos: 300, undo_pos: 30, time_first_seen: 345678, is_coinbase: false, }; let block_tx3 = BlockTx { entry: tx3.clone(), block_height: 1, }; let block2 = BlockTxs { block_height: 1, txs: vec![tx2, tx3], }; { // insert 2 more txs let mut batch = WriteBatch::default(); assert_eq!( tx_writer.insert(&mut batch, &block2, &mut mem_data)?, 1 ); db.write_batch(batch)?; assert_eq!(tx_reader.first_tx_num_by_block(0)?, Some(0)); assert_eq!(tx_reader.first_tx_num_by_block(1)?, Some(1)); assert_eq!(tx_reader.first_tx_num_by_block(2)?, None); assert_eq!(tx_reader.last_tx_num()?, Some(2)); assert_eq!(tx_reader.block_tx_num_range(0)?, Some(0..1)); assert_eq!(tx_reader.block_tx_num_range(1)?, Some(1..3)); assert_eq!(tx_reader.tx_by_txid(&TxId::from([0; 32]))?, None); assert_eq!(tx_reader.tx_num_by_txid(&TxId::from([0; 32]))?, None); assert_eq!( tx_reader.tx_by_txid(&TxId::from([1; 32]))?, Some(block_tx1.clone()), ); assert_eq!( tx_reader.tx_num_by_txid(&TxId::from([1; 32]))?, Some(0), ); assert_eq!(tx_reader.tx_by_tx_num(0)?, Some(block_tx1.clone())); assert_eq!( tx_reader.tx_by_txid(&TxId::from([2; 32]))?, Some(block_tx2.clone()), ); assert_eq!( tx_reader.tx_num_by_txid(&TxId::from([2; 32]))?, Some(1), ); 
assert_eq!(tx_reader.tx_by_tx_num(1)?, Some(block_tx2)); assert_eq!( tx_reader.tx_by_txid(&TxId::from([3; 32]))?, Some(block_tx3.clone()), ); assert_eq!( tx_reader.tx_num_by_txid(&TxId::from([3; 32]))?, Some(2), ); assert_eq!(tx_reader.tx_by_tx_num(2)?, Some(block_tx3)); } { // delete latest block let mut batch = WriteBatch::default(); assert_eq!( tx_writer.delete(&mut batch, &block2, &mut mem_data)?, 1 ); db.write_batch(batch)?; assert_eq!(tx_reader.first_tx_num_by_block(0)?, Some(0)); assert_eq!(tx_reader.first_tx_num_by_block(1)?, None); assert_eq!(tx_reader.last_tx_num()?, Some(0)); assert_eq!(tx_reader.block_tx_num_range(0)?, Some(0..1)); assert_eq!(tx_reader.block_tx_num_range(1)?, None); assert_eq!(tx_reader.tx_by_txid(&TxId::from([0; 32]))?, None); assert_eq!( tx_reader.tx_by_txid(&TxId::from([1; 32]))?, Some(block_tx1.clone()), ); assert_eq!(tx_reader.tx_by_tx_num(0)?, Some(block_tx1.clone())); assert_eq!(tx_reader.tx_by_txid(&TxId::from([2; 32]))?, None); assert_eq!(tx_reader.tx_by_tx_num(1)?, None); assert_eq!(tx_reader.tx_by_txid(&TxId::from([3; 32]))?, None); assert_eq!(tx_reader.tx_by_tx_num(2)?, None); } let tx2 = TxEntry { txid: TxId::from([102; 32]), data_pos: 200, undo_pos: 20, time_first_seen: 234567, is_coinbase: false, }; let block_tx2 = BlockTx { entry: tx2.clone(), block_height: 1, }; let tx3 = TxEntry { txid: TxId::from([103; 32]), data_pos: 300, undo_pos: 30, time_first_seen: 345678, is_coinbase: false, }; let block_tx3 = BlockTx { entry: tx3.clone(), block_height: 1, }; let block3 = BlockTxs { block_height: 1, txs: vec![tx2, tx3], }; { // Add reorg block let mut batch = WriteBatch::default(); assert_eq!( tx_writer.insert(&mut batch, &block3, &mut mem_data)?, 1 ); db.write_batch(batch)?; assert_eq!(tx_reader.first_tx_num_by_block(0)?, Some(0)); assert_eq!(tx_reader.first_tx_num_by_block(1)?, Some(1)); assert_eq!(tx_reader.first_tx_num_by_block(2)?, None); assert_eq!(tx_reader.block_tx_num_range(0)?, Some(0..1)); assert_eq!(tx_reader.block_tx_num_range(1)?, Some(1..3)); assert_eq!(tx_reader.block_tx_num_range(2)?, None); assert_eq!( tx_reader.tx_by_txid(&TxId::from([1; 32]))?, Some(block_tx1), ); assert_eq!(tx_reader.tx_by_txid(&TxId::from([2; 32]))?, None); assert_eq!( tx_reader.tx_by_txid(&TxId::from([102; 32]))?, Some(block_tx2.clone()), ); assert_eq!( tx_reader.tx_num_by_txid(&TxId::from([102; 32]))?, Some(1), ); assert_eq!(tx_reader.tx_by_tx_num(1)?, Some(block_tx2)); assert_eq!( tx_reader.tx_by_txid(&TxId::from([103; 32]))?, Some(block_tx3.clone()), ); assert_eq!( tx_reader.tx_num_by_txid(&TxId::from([103; 32]))?, Some(2), ); assert_eq!(tx_reader.tx_by_tx_num(2)?, Some(block_tx3)); } Ok(()) } } diff --git a/chronik/chronik-db/src/reverse_lookup.rs b/chronik/chronik-db/src/reverse_lookup.rs index 7df1adf71..02c38f3c2 100644 --- a/chronik/chronik-db/src/reverse_lookup.rs +++ b/chronik/chronik-db/src/reverse_lookup.rs @@ -1,601 +1,685 @@ // Copyright (c) 2023 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. //! Module containing [`LookupColumn`] and [`ReverseLookup`]. //! //! Allows reverse-looking up data that has been indexed using a serial number. //! //! Say you have the following column: //! //! `SerialNum -> Data` //! //! This could be mapping: //! - `BlockHeight -> SerBlock`, where `SerBlock` includes `BlockHash` and some //! header data). //! - `TxNum -> SerTx`, where `SerTx` includes `TxId` and some other data. //! //! 
`Data` contains a hash `[u8; 32]` that you might want to look up by.
//!
//! This would be the `BlockHash` for a `BlockHeight -> BlockMeta` column and
//! `TxId` for a `TxNum -> TxMeta` column.
//!
//! We *could* simply add a table `[u8; 32] -> SerialNum`, e.g. `TxId ->
//! TxNum`, but this would redundantly store the hash of the tx twice.
//!
//! Instead, we store:
//!
//! `CheapHash -> [SerialNum]`
//!
//! The `[]` indicate a list of `SerialNum`s, as the hash function for
//! `CheapHash` is not cryptographically secure and only outputs 4-8 bytes.
//! It is therefore expected to occasionally have collisions, which are
//! tracked in a list.
//!
//! Then, you can use [`ReverseLookup`] to maintain an index from `[u8; 32]`
//! to `SerialNum` compactly.
//!
//! To resolve collisions during lookup from `[u8; 32]`:
//! 1. Hash `[u8; 32]` to get `CheapHash`.
//! 2. Look up the matching `SerialNum`s in the index and iterate over them.
//! 3. Look up the `Data` in the original `SerialNum -> Data` table.
//! 4. Test whether `Data` has the queried `[u8; 32]`, and return that.
//! 5. Otherwise, the key is not part of the database.

use std::{
    collections::{BTreeSet, HashMap},
    fmt::{Debug, Display},
    hash::Hash,
    marker::PhantomData,
};

use abc_rust_error::Result;
use rocksdb::{ColumnFamilyDescriptor, WriteBatch};
use serde::{Deserialize, Serialize};
use thiserror::Error;

use crate::{
    db::{Db, CF},
    io::merge::catch_merge_errors,
    reverse_lookup::IndexError::*,
    ser::{db_deserialize, db_serialize},
};

/// Struct for maintaining a reverse lookup index.
/// You cannot construct this; you should use a typedef:
///
/// `type MyIndex = ReverseLookup<MyLookupColumn>`
///
/// Then you can use the associated functions like this:
///
/// `MyIndex::insert_pairs(db, batch, [(1234, &[123; 32])])?;`
pub(crate) struct ReverseLookup<L: LookupColumn>(PhantomData<L>);

/// Trait providing the data to build a reverse lookup index.
pub(crate) trait LookupColumn {
    /// Number uniquely identifying `Value`s, e.g. `BlockHeight` or `TxNum`.
    type SerialNum: Copy
        + for<'a> Deserialize<'a>
        + Display
        + Ord
        + Serialize
        + 'static;

    /// A short hash, compacting keys of type `[u8; 32]`.
    type CheapHash: AsRef<[u8]> + Eq + Hash;

    /// Data stored in the column, e.g. `SerBlock`.
    type Data;

    /// Name of the `SerialNum -> Value` CF.
    const CF_DATA: &'static str;
    /// Name of the `CheapHash -> [SerialNum]` CF.
    const CF_INDEX: &'static str;

    /// Calculate a short `CheapHash` from the given key.
    fn cheap_hash(key: &[u8; 32]) -> Self::CheapHash;

    /// Extract the key to index by from the data.
    fn data_key(value: &Self::Data) -> &[u8; 32];

    /// Fetch the data from the db using the serial num.
    fn get_data(&self, number: Self::SerialNum) -> Result<Option<Self::Data>>;
+
+    /// Batch-fetch the data for multiple serial nums from the db.
+ fn get_data_multi( + &self, + number: impl IntoIterator, + ) -> Result>>; } #[derive(Debug, Error, PartialEq)] pub(crate) enum IndexError { #[error( "Inconsistent DB: Lookup index {cf_data_name} contains {serial_str}, \ but value column {cf_index_name} doesn't" )] NotInDataColumn { serial_str: String, cf_data_name: &'static str, cf_index_name: &'static str, }, #[error( "Inconsistent DB: Tried inserting {serial_str} into {cf_index_name}, \ but value already exists" )] SerialNumAlreadyExists { serial_str: String, cf_index_name: &'static str, }, } impl IndexError { fn not_in_data_column(serial: L::SerialNum) -> IndexError { NotInDataColumn { serial_str: serial.to_string(), cf_data_name: L::CF_DATA, cf_index_name: L::CF_INDEX, } } } fn partial_merge_ordered_list( _key: &[u8], _existing_value: Option<&[u8]>, _operands: &rocksdb::MergeOperands, ) -> Option> { // We don't use partial merge None } fn init_ordered_list Deserialize<'a>>( _key: &[u8], existing_value: Option<&[u8]>, operands: &rocksdb::MergeOperands, ) -> Result> { let mut nums = match existing_value { Some(num) => db_deserialize::>(num)?, None => vec![], }; nums.reserve_exact(operands.len()); Ok(nums) } fn apply_ordered_list Deserialize<'a> + Display + Ord>( cf_index_name: &'static str, nums: &mut Vec, operand: &[u8], ) -> Result<()> { let num = db_deserialize::(operand)?; match nums.binary_search(&num) { Ok(_) => { return Err(SerialNumAlreadyExists { serial_str: num.to_string(), cf_index_name, } .into()) } Err(insert_idx) => nums.insert(insert_idx, num), } Ok(()) } fn ser_ordered_list( _key: &[u8], nums: Vec, ) -> Result> { db_serialize::>(&nums) } impl ReverseLookup { /// Add the cfs required by the reverse lookup index. pub(crate) fn add_cfs(columns: &mut Vec) { let mut options = rocksdb::Options::default(); let merge_op_name = format!("{}::merge_ordered_list", L::CF_INDEX); options.set_merge_operator( merge_op_name.as_str(), catch_merge_errors::>( init_ordered_list::, |_, nums, operand| { apply_ordered_list(L::CF_INDEX, nums, operand) }, ser_ordered_list::, ), partial_merge_ordered_list, ); columns.push(ColumnFamilyDescriptor::new(L::CF_INDEX, options)); } /// Read by key from the DB using the index. pub(crate) fn get( lookup_column: &L, db: &Db, key: &[u8; 32], ) -> Result> { let cf_index = db.cf(L::CF_INDEX)?; let cheap_hash = L::cheap_hash(key); // Lookup CheapHash -> [SerialNum] let serials = match db.get(cf_index, cheap_hash)? { Some(serials) => serials, None => return Ok(None), }; // Iterate serials, read each's data, find the one with the given key. // // This could in theory be a DoS attack by purposefully having the // indexer index lots of colliding keys, however, since keys are // expected to use a cryptographically secure hash function (e.g. // SHA-256), it will be expensive to find data that actually collides. for serial in db_deserialize::>(&serials)? { let value = lookup_column .get_data(serial)? 
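            // (a serial listed in the index must have a data row; a
            // missing row means the index CF and data CF are out of sync)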
.ok_or_else(|| IndexError::not_in_data_column::(serial))?; if L::data_key(&value) == key { return Ok(Some((serial, value))); } } // We have a key that collides with others but no actual match Ok(None) } + #[allow(clippy::type_complexity)] + pub(crate) fn get_multi<'a>( + lookup_column: &L, + db: &Db, + keys: impl IntoIterator + Clone, + ) -> Result>> { + let cf_index = db.cf(L::CF_INDEX)?; + let serial_lists = db.multi_get( + cf_index, + keys.clone().into_iter().map(L::cheap_hash), + false, + )?; + let serial_lists = serial_lists + .iter() + .map(|serials| match serials { + Some(serials) => db_deserialize::>(serials), + None => Ok(vec![]), + }) + .collect::>>()?; + let mut data_lists = lookup_column.get_data_multi( + serial_lists.iter().flat_map(|s| s.iter().cloned()), + )?; + let mut data_idx = 0; + let mut result = Vec::with_capacity(serial_lists.len()); + for (serial_list, key) in serial_lists.into_iter().zip(keys) { + let mut has_found = false; + for serial in serial_list { + if !has_found { + if let Some(data) = data_lists[data_idx].take() { + if L::data_key(&data) == key { + result.push(Some((serial, data))); + has_found = true; + } + } + } + data_idx += 1; + } + if !has_found { + result.push(None); + } + } + Ok(result) + } + /// Insert into the index. pub(crate) fn insert_pairs<'a>( db: &Db, batch: &mut WriteBatch, pairs: impl IntoIterator, ) -> Result<()> { let cf_index = db.cf(L::CF_INDEX)?; // Use merge_cf to insert serials into the cheap hashes of the keys for (serial, key) in pairs { let cheap_hash = L::cheap_hash(key); batch.merge_cf(cf_index, cheap_hash, db_serialize(&serial)?); } Ok(()) } /// Delete from the index. pub(crate) fn delete_pairs<'a>( db: &Db, batch: &mut WriteBatch, pairs: impl IntoIterator, ) -> Result<()> { let cf_index = db.cf(L::CF_INDEX)?; let mut new_entries = HashMap::>::new(); for (serial, key) in pairs { let serials = Self::get_or_fetch(db, cf_index, &mut new_entries, key)?; if !serials.remove(&serial) { return Err(IndexError::not_in_data_column::(serial).into()); } } for (key, serials) in new_entries { if serials.is_empty() { // Delete the entry from the DB if it doesn't contain any // serials anymore batch.delete_cf(cf_index, key); } else { // Otherwise, override entry with only the remaining serials let serials = db_serialize(&Vec::from_iter(serials))?; batch.put_cf(cf_index, key, &serials); } } Ok(()) } /// Query from `new_entries`, or from DB and then store in `new_entries`. fn get_or_fetch<'a>( db: &Db, index_cf: &CF, new_entries: &'a mut HashMap>, key: &[u8; 32], ) -> Result<&'a mut BTreeSet> { use std::collections::hash_map::Entry; let cheap_hash = L::cheap_hash(key); match new_entries.entry(cheap_hash) { Entry::Occupied(entry) => Ok(entry.into_mut()), Entry::Vacant(entry) => { let serials = match db.get(index_cf, entry.key().as_ref())? { Some(serials) => { db_deserialize::>(&serials)? 
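                        // (deserialize once, then cache as a BTreeSet in
                        // new_entries below, so later pairs hitting the
                        // same cheap hash skip the DB read)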
} None => vec![], }; Ok(entry.insert(BTreeSet::from_iter(serials))) } } } } #[cfg(test)] mod tests { use std::fmt::Debug; use abc_rust_error::Result; use pretty_assertions::assert_eq; use rocksdb::{ColumnFamilyDescriptor, Options, WriteBatch}; use serde::{Deserialize, Serialize}; use crate::{ db::{Db, CF}, reverse_lookup::{LookupColumn, ReverseLookup}, ser::{db_deserialize, db_serialize}, }; #[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize)] #[repr(C)] struct TestData { key: [u8; 32], payload: u32, } struct TestColumn<'a> { cf_data: &'a CF, db: &'a Db, } type Index<'a> = ReverseLookup>; impl Debug for TestColumn<'_> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "TestColumn {{ .. }}") } } const CF_TEST: &str = "test"; const CF_TEST_INDEX: &str = "test_index"; impl<'a> LookupColumn for TestColumn<'a> { type CheapHash = [u8; 1]; type Data = TestData; type SerialNum = u32; const CF_DATA: &'static str = CF_TEST; const CF_INDEX: &'static str = CF_TEST_INDEX; fn cheap_hash(key: &[u8; 32]) -> [u8; 1] { // "Hash" here simply takes the mod 10, giving us lots of collisions [key[0] % 10] } fn data_key(data: &Self::Data) -> &[u8; 32] { &data.key } fn get_data( &self, number: Self::SerialNum, ) -> Result> { match self.db.get(self.cf_data, number.to_be_bytes())? { Some(bytes) => Ok(Some(db_deserialize(&bytes)?)), None => Ok(None), } } + + fn get_data_multi( + &self, + numbers: impl IntoIterator, + ) -> Result>> { + let data_ser = self.db.multi_get( + self.cf_data, + numbers.into_iter().map(|num| num.to_be_bytes()), + false, + )?; + data_ser + .into_iter() + .map(|data_ser| { + data_ser + .map(|data_ser| db_deserialize::(&data_ser)) + .transpose() + }) + .collect::<_>() + } } #[test] fn test_reverse_lookup() -> Result<()> { abc_rust_error::install(); let tempdir = tempdir::TempDir::new("chronik-db--lookup")?; let mut cfs = vec![ColumnFamilyDescriptor::new(CF_TEST, Options::default())]; Index::add_cfs(&mut cfs); let db = Db::open_with_cfs(tempdir.path(), cfs)?; let cf_data = db.cf(CF_TEST)?; let column = TestColumn { db: &db, cf_data }; // First insert let number0: u32 = 100; let key0 = [100; 32]; let data0 = TestData { key: key0, payload: 0xdeadbeef, }; { let mut batch = WriteBatch::default(); batch.put_cf( db.cf(CF_TEST)?, number0.to_be_bytes(), db_serialize(&data0)?, ); Index::insert_pairs( &db, &mut batch, [(number0, &key0)].into_iter(), )?; db.write_batch(batch)?; assert_eq!( Index::get(&column, &db, &key0)?, Some((number0, data0)) ); assert_eq!( db.get(db.cf(CF_TEST_INDEX)?, [0])?.as_deref(), Some(db_serialize(&vec![number0])?.as_ref()), ); } // Second insert, inserts 3 at once let number1: u32 = 101; let key1 = [101; 32]; // collides with key3 let data1 = TestData { key: key1, payload: 0xcafe, }; let number2: u32 = 110; let key2 = [110; 32]; // collides with key0 let data2 = TestData { key: key2, payload: 0xabcd, }; let number3: u32 = 111; let key3 = [111; 32]; // collides with key1 let data3 = TestData { key: key3, payload: 0xfedc, }; { let mut batch = WriteBatch::default(); batch.put_cf( db.cf(CF_TEST)?, number1.to_be_bytes(), db_serialize(&data1)?, ); batch.put_cf( db.cf(CF_TEST)?, number2.to_be_bytes(), db_serialize(&data2)?, ); batch.put_cf( db.cf(CF_TEST)?, number3.to_be_bytes(), db_serialize(&data3)?, ); Index::insert_pairs( &db, &mut batch, [(number1, &key1), (number2, &key2), (number3, &key3)] .into_iter(), )?; db.write_batch(batch)?; assert_eq!( Index::get(&column, &db, &key0)?, Some((number0, data0)) ); assert_eq!( Index::get(&column, 
&db, &key1)?, Some((number1, data1)) ); assert_eq!( Index::get(&column, &db, &key2)?, Some((number2, data2)) ); assert_eq!( Index::get(&column, &db, &key3)?, Some((number3, data3)) ); assert_eq!( db.get(db.cf(CF_TEST_INDEX)?, [0])?.as_deref(), Some(db_serialize(&vec![number0, number2])?.as_ref()), ); assert_eq!( db.get(db.cf(CF_TEST_INDEX)?, [1])?.as_deref(), Some(db_serialize(&vec![number1, number3])?.as_ref()), ); } // Delete key1, key2, key3 { let mut batch = WriteBatch::default(); Index::delete_pairs( &db, &mut batch, [(number1, &key1), (number2, &key2), (number3, &key3)] .into_iter(), )?; db.write_batch(batch)?; assert_eq!(Index::get(&column, &db, &key1)?, None); assert_eq!(Index::get(&column, &db, &key2)?, None); assert_eq!(Index::get(&column, &db, &key3)?, None); assert_eq!( Index::get(&column, &db, &key0)?, Some((number0, data0)) ); assert_eq!( db.get(db.cf(CF_TEST_INDEX)?, [0])?.as_deref(), Some(db_serialize(&vec![number0])?.as_ref()), ); assert_eq!(db.get(db.cf(CF_TEST_INDEX)?, [1])?.as_deref(), None); } // Delete key0 { let mut batch = WriteBatch::default(); Index::delete_pairs( &db, &mut batch, [(number0, &key0)].into_iter(), )?; db.write_batch(batch)?; assert_eq!(Index::get(&column, &db, &key0)?, None); assert_eq!(db.get(db.cf(CF_TEST_INDEX)?, [0])?.as_deref(), None); assert_eq!(db.get(db.cf(CF_TEST_INDEX)?, [1])?.as_deref(), None); } Ok(()) } #[test] fn test_reverse_lookup_rng() -> Result<()> { abc_rust_error::install(); let mut rng = fastrand::Rng::with_seed(0); let tempdir = tempdir::TempDir::new("chronik-db--lookup_rng")?; let mut cfs = vec![ColumnFamilyDescriptor::new(CF_TEST, Options::default())]; Index::add_cfs(&mut cfs); let db = Db::open_with_cfs(tempdir.path(), cfs)?; let cf_data = db.cf(CF_TEST)?; let column = TestColumn { db: &db, cf_data }; let test_data = (0u32..1000) .map(|num| { let mut data = TestData { key: [0; 32], payload: rng.u32(0..u32::MAX), }; rng.fill(&mut data.key); (num, data) }) .collect::>(); let insert_data = |entries: &[(u32, TestData)]| -> Result<()> { let mut batch = WriteBatch::default(); let pairs = entries.iter().map(|&(num, ref data)| (num, &data.key)); Index::insert_pairs(&db, &mut batch, pairs)?; for &(num, ref data) in entries { batch.put_cf(cf_data, num.to_be_bytes(), db_serialize(data)?); } db.write_batch(batch)?; Ok(()) }; let delete_data = |entries: &[(u32, TestData)]| -> Result<()> { let mut batch = WriteBatch::default(); let pairs = entries.iter().map(|&(num, ref data)| (num, &data.key)); Index::delete_pairs(&db, &mut batch, pairs)?; for &(num, _) in entries { batch.delete_cf(cf_data, num.to_be_bytes()); } db.write_batch(batch)?; Ok(()) }; let check_data = |entries: &[(u32, TestData)]| -> Result<()> { for &(expected_num, ref expected_data) in entries { let (actual_num, actual_data) = Index::get(&column, &db, &expected_data.key)?.unwrap(); assert_eq!(expected_num, actual_num); assert_eq!(*expected_data, actual_data); } + let batch_result = Index::get_multi( + &column, + &db, + entries.iter().map(|(_, data)| &data.key), + )?; + assert_eq!( + batch_result, + entries.iter().map(|entry| Some(*entry)).collect::>(), + ); Ok(()) }; let check_not_data = |entries: &[(u32, TestData)]| -> Result<()> { for (_, data) in entries { assert!(Index::get(&column, &db, &data.key)?.is_none()); } + let batch_result = Index::get_multi( + &column, + &db, + entries.iter().map(|(_, data)| &data.key), + )?; + assert_eq!(batch_result, vec![None; entries.len()]); Ok(()) }; // Insert first 400 entries insert_data(&test_data[..400])?; 
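        // Spot-check the new get_multi on a mixed present/absent batch
        // (hedged sketch; the indices are arbitrary, and only the first
        // 400 entries are inserted at this point):
        assert_eq!(
            Index::get_multi(
                &column,
                &db,
                [&test_data[0].1.key, &test_data[400].1.key],
            )?,
            vec![Some(test_data[0]), None],
        );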
check_data(&test_data[..400])?; check_not_data(&test_data[400..])?; // Insert next 600 entries insert_data(&test_data[400..])?; check_data(&test_data)?; // Delete last 600 entries again delete_data(&test_data[400..])?; check_data(&test_data[..400])?; check_not_data(&test_data[400..])?; // Delete remaining 400 entries delete_data(&test_data[..400])?; check_not_data(&test_data)?; Ok(()) } }
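
Reviewer appendix, not part of the diff: the bookkeeping inside
ReverseLookup::get_multi (flatten every key's candidate serials, fetch them
all in one batched read, then walk the flat results back per key) is easy to
get off-by-one, so here is the same cursor logic as a self-contained model.
All names are hypothetical; `fetch_many` stands in for the single
`get_data_multi` round trip.

fn resolve_batched<K: Eq + Copy, D: Copy>(
    keys: &[K],
    candidates_per_key: &[Vec<u32>],
    fetch_many: impl Fn(&[u32]) -> Vec<Option<D>>,
    key_of: impl Fn(&D) -> K,
) -> Vec<Option<(u32, D)>> {
    // One batched read covering all candidates of all keys
    let flat: Vec<u32> =
        candidates_per_key.iter().flatten().copied().collect();
    let mut fetched = fetch_many(&flat).into_iter();
    keys.iter()
        .zip(candidates_per_key)
        .map(|(&key, serials)| {
            let mut found = None;
            for &serial in serials {
                // Consume exactly one fetched slot per candidate so the
                // cursor stays aligned even after a match was found
                let data = fetched.next().expect("one slot per candidate");
                if found.is_none() {
                    if let Some(data) = data {
                        if key_of(&data) == key {
                            found = Some((serial, data));
                        }
                    }
                }
            }
            found
        })
        .collect()
}

The unconditional per-candidate consume mirrors get_multi's `data_idx += 1`
on every serial, which is what keeps a key with cheap-hash collisions from
shifting the results of the keys that follow it.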