diff --git a/chronik/chronik-db/src/io/blocks.rs b/chronik/chronik-db/src/io/blocks.rs
index 492852209..c2c0f692e 100644
--- a/chronik/chronik-db/src/io/blocks.rs
+++ b/chronik/chronik-db/src/io/blocks.rs
@@ -1,427 +1,428 @@
// Copyright (c) 2022 The Bitcoin developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

use abc_rust_error::Result;
use bitcoinsuite_core::block::BlockHash;
use rocksdb::ColumnFamilyDescriptor;
use serde::{Deserialize, Serialize};
use thiserror::Error;

use crate::{
    db::{Db, CF, CF_BLK, CF_LOOKUP_BLK_BY_HASH},
    reverse_lookup::{LookupColumn, ReverseLookup},
    ser::{db_deserialize, db_serialize},
};

/// Height of a block in the chain. Genesis block has height 0.
/// -1 indicates "not part of the chain".
pub type BlockHeight = i32;

struct BlockColumn<'a> {
    db: &'a Db,
    cf_blk: &'a CF,
}

/// Writes block data to the database.
pub struct BlockWriter<'a> {
    col: BlockColumn<'a>,
}

/// Reads block data ([`DbBlock`]) from the database.
pub struct BlockReader<'a> {
    col: BlockColumn<'a>,
}

/// Block data to/from the database.
#[derive(Clone, Debug, Eq, PartialEq, Hash, Default)]
pub struct DbBlock {
    /// Hash of the block.
    pub hash: BlockHash,
    /// Hash of the previous block of the block.
    pub prev_hash: BlockHash,
    /// Height of the block in the chain.
    pub height: BlockHeight,
    /// `nBits` field of the block; encodes the target compactly.
    pub n_bits: u32,
    /// Number of the node's blk?????.dat flat file where the block is stored.
    pub file_num: u32,
    /// Timestamp field of the block.
    pub timestamp: i64,
    /// Location in the flat file where the first byte of the block is stored,
    /// starting at the header.
    pub data_pos: u32,
}

#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
struct SerBlock {
    hash: [u8; 32],
    n_bits: u32,
    timestamp: i64,
    file_num: u32,
    data_pos: u32,
}

type LookupByHash<'a> = ReverseLookup<BlockColumn<'a>>;

impl LookupColumn for BlockColumn<'_> {
    type CheapHash = [u8; 4];
    type Data = SerBlock;
    type SerialNum = BlockHeight;

    const CF_DATA: &'static str = CF_BLK;
    const CF_INDEX: &'static str = CF_LOOKUP_BLK_BY_HASH;

    fn cheap_hash(key: &[u8; 32]) -> Self::CheapHash {
        // use the lowest 32 bits of seahash as hash
        (seahash::hash(key) as u32).to_be_bytes()
    }

    fn data_key(data: &SerBlock) -> &[u8; 32] {
        &data.hash
    }

    fn get_data(&self, block_height: BlockHeight) -> Result<Option<SerBlock>> {
        self.get_block(block_height)
    }
}

/// Errors for [`BlockWriter`] and [`BlockReader`].
#[derive(Debug, Eq, Error, PartialEq)]
pub enum BlocksError {
    /// Block height must be 4 bytes.
    #[error("Inconsistent DB: Invalid height bytes: {0:?}")]
    InvalidHeightBytes(Vec<u8>),

    /// Block has no parent.
    #[error("Inconsistent DB: Orphan block at height {0}")]
    OrphanBlock(BlockHeight),
}

use self::BlocksError::*;

/// Serialize block height for using it as keys in the DB.
/// Big-endian, so blocks are sorted ascendingly.
pub(crate) fn bh_to_bytes(height: BlockHeight) -> [u8; 4] {
    height.to_be_bytes()
}

/// Deserialize block height from bytes.
pub(crate) fn bytes_to_bh(bytes: &[u8]) -> Result<BlockHeight> {
    Ok(BlockHeight::from_be_bytes(
        bytes
            .try_into()
            .map_err(|_| InvalidHeightBytes(bytes.to_vec()))?,
    ))
}
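// Editor's note: illustrative sketch, not part of the patch. Big-endian
// encoding makes RocksDB's lexicographic key order coincide with numeric
// height order, which is what lets `iterator_end` find the tip below. The
// function name is hypothetical:
fn _bh_key_order_sketch() {
    let heights: [i32; 3] = [1, 255, 256];
    let keys: Vec<[u8; 4]> = heights.iter().map(|h| h.to_be_bytes()).collect();
    // Lexicographic byte order of the keys matches numeric order of heights.
    assert!(keys.windows(2).all(|w| w[0] < w[1]));
    // Little-endian would break this: 255 -> [0xff, 0, 0, 0] would sort
    // after 256 -> [0, 1, 0, 0].
}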
impl<'a> BlockColumn<'a> {
    fn new(db: &'a Db) -> Result<Self> {
        let cf_blk = db.cf(CF_BLK)?;
        db.cf(CF_LOOKUP_BLK_BY_HASH)?;
        Ok(BlockColumn { db, cf_blk })
    }

    fn get_block(&self, block_height: BlockHeight) -> Result<Option<SerBlock>> {
        match self.db.get(self.cf_blk, bh_to_bytes(block_height))? {
            Some(bytes) => Ok(Some(db_deserialize::<SerBlock>(&bytes)?)),
            None => Ok(None),
        }
    }
}

impl<'a> BlockWriter<'a> {
    /// Create writer to the database for blocks.
    pub fn new(db: &'a Db) -> Result<Self> {
        Ok(BlockWriter {
            col: BlockColumn::new(db)?,
        })
    }

    /// Add a new block to the database.
    pub fn insert(
        &self,
        batch: &mut rocksdb::WriteBatch,
        block: &DbBlock,
    ) -> Result<()> {
        // Serialize the block data
        let data = db_serialize(&SerBlock {
            hash: block.hash.to_bytes(),
            n_bits: block.n_bits,
            timestamp: block.timestamp,
            file_num: block.file_num,
            data_pos: block.data_pos,
        })?;
        batch.put_cf(self.col.cf_blk, bh_to_bytes(block.height), &data);
        LookupByHash::insert_pairs(
            self.col.db,
            batch,
            [(block.height, &block.hash.to_bytes())],
        )?;
        Ok(())
    }

    /// Remove a block by height from the database.
    pub fn delete(
        &self,
        batch: &mut rocksdb::WriteBatch,
        block: &DbBlock,
    ) -> Result<()> {
        batch.delete_cf(self.col.cf_blk, bh_to_bytes(block.height));
        LookupByHash::delete_pairs(
            self.col.db,
            batch,
            [(block.height, &block.hash.to_bytes())],
        )?;
        Ok(())
    }

    pub(crate) fn add_cfs(columns: &mut Vec<ColumnFamilyDescriptor>) {
        columns.push(ColumnFamilyDescriptor::new(
            CF_BLK,
            rocksdb::Options::default(),
        ));
-        LookupByHash::add_cfs(columns, CF_LOOKUP_BLK_BY_HASH);
+        LookupByHash::add_cfs(columns);
    }
}

impl std::fmt::Debug for BlockWriter<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "BlockWriter {{ ... }}")
    }
}
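// Editor's note: illustrative usage sketch, not part of the patch. The write
// path stages the block row plus its reverse-lookup entry in one WriteBatch
// and commits atomically, exactly as the test below does. The function name
// is hypothetical:
fn _block_write_sketch(db: &Db, block: &DbBlock) -> Result<()> {
    let writer = BlockWriter::new(db)?;
    let mut batch = rocksdb::WriteBatch::default();
    // Stages both CF_BLK and the CF_LOOKUP_BLK_BY_HASH index entry.
    writer.insert(&mut batch, block)?;
    // Both column families commit together or not at all.
    db.write_batch(batch)?;
    Ok(())
}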
impl<'a> BlockReader<'a> {
    /// Create reader from the database for blocks.
    pub fn new(db: &'a Db) -> Result<Self> {
        Ok(BlockReader {
            col: BlockColumn::new(db)?,
        })
    }

    /// The height of the most-work fully-validated chain.
    /// The genesis block has height 0.
    pub fn height(&self) -> Result<BlockHeight> {
        let mut iter = self.col.db.iterator_end(self.col.cf_blk);
        match iter.next() {
            Some(result) => {
                let (height_bytes, _) = result?;
                Ok(bytes_to_bh(&height_bytes)?)
            }
            None => Ok(-1),
        }
    }

    /// Tip of the chain: the most recently added block.
    /// [`None`] if the chain is empty (not even the genesis block).
    pub fn tip(&self) -> Result<Option<DbBlock>> {
        let mut iter = self.col.db.iterator_end(self.col.cf_blk);
        match iter.next() {
            Some(result) => {
                let (height_bytes, block_data) = result?;
                let height = bytes_to_bh(&height_bytes)?;
                let block_data = db_deserialize::<SerBlock>(&block_data)?;
                let prev_block_hash = self.get_prev_hash(height)?;
                Ok(Some(DbBlock {
                    hash: BlockHash::from(block_data.hash),
                    prev_hash: BlockHash::from(prev_block_hash),
                    height,
                    n_bits: block_data.n_bits,
                    timestamp: block_data.timestamp,
                    file_num: block_data.file_num,
                    data_pos: block_data.data_pos,
                }))
            }
            None => Ok(None),
        }
    }

    /// [`DbBlock`] by height. The genesis block has height 0.
    pub fn by_height(&self, height: BlockHeight) -> Result<Option<DbBlock>> {
-        let Some(block_data) = self.col.get_block(height)?
-        else { return Ok(None) };
+        let Some(block_data) = self.col.get_block(height)? else {
+            return Ok(None);
+        };
        let prev_block_hash = self.get_prev_hash(height)?;
        Ok(Some(DbBlock {
            hash: BlockHash::from(block_data.hash),
            prev_hash: BlockHash::from(prev_block_hash),
            height,
            n_bits: block_data.n_bits,
            timestamp: block_data.timestamp,
            file_num: block_data.file_num,
            data_pos: block_data.data_pos,
        }))
    }

    /// [`DbBlock`] by hash.
    pub fn by_hash(&self, hash: &BlockHash) -> Result<Option<DbBlock>> {
        let hash = hash.to_bytes();
        let (height, ser_block) =
            match LookupByHash::get(&self.col, self.col.db, &hash)? {
                Some(data) => data,
                None => return Ok(None),
            };
        Ok(Some(DbBlock {
            hash: BlockHash::from(ser_block.hash),
            prev_hash: BlockHash::from(self.get_prev_hash(height)?),
            height,
            n_bits: ser_block.n_bits,
            timestamp: ser_block.timestamp,
            file_num: ser_block.file_num,
            data_pos: ser_block.data_pos,
        }))
    }

    fn get_prev_hash(&self, height: BlockHeight) -> Result<[u8; 32]> {
        if height == 0 {
            return Ok([0; 32]);
        }
        let prev_block_data = self
            .col
            .db
            .get(self.col.cf_blk, bh_to_bytes(height - 1))?
            .ok_or(OrphanBlock(height))?;
        let prev_block = db_deserialize::<SerBlock>(&prev_block_data)?;
        Ok(prev_block.hash)
    }
}

impl std::fmt::Debug for BlockReader<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "BlockReader {{ ... }}")
    }
}

#[cfg(test)]
mod tests {
    use abc_rust_error::Result;
    use bitcoinsuite_core::block::BlockHash;

    use crate::{
        db::{Db, CF_BLK},
        io::{
            blocks::SerBlock, BlockReader, BlockWriter, BlocksError, DbBlock,
        },
        ser::{db_deserialize, SerError},
    };

    #[test]
    fn test_blocks() -> Result<()> {
        abc_rust_error::install();
        let tempdir = tempdir::TempDir::new("chronik-db--blocks")?;
        let db = Db::open(tempdir.path())?;
        let writer = BlockWriter::new(&db)?;
        let blocks = BlockReader::new(&db)?;
        {
            assert_eq!(blocks.height()?, -1);
            assert_eq!(blocks.tip()?, None);
            assert_eq!(blocks.by_height(0)?, None);
        }
        let block0 = DbBlock {
            hash: BlockHash::from([44; 32]),
            prev_hash: BlockHash::from([0; 32]),
            height: 0,
            n_bits: 0x1c100000,
            timestamp: 1600000000,
            file_num: 6,
            data_pos: 100,
        };
        let block1 = DbBlock {
            hash: BlockHash::from([22; 32]),
            prev_hash: BlockHash::from([44; 32]),
            height: 1,
            n_bits: 0x1c100001,
            timestamp: 1600000001,
            file_num: 7,
            data_pos: 200,
        };
        let cf = db.cf(CF_BLK)?;
        {
            let mut batch = rocksdb::WriteBatch::default();
            writer.insert(&mut batch, &block0)?;
            db.write_batch(batch)?;
            assert_eq!(
                db_deserialize::<SerBlock>(
                    &db.get(cf, [0, 0, 0, 0])?.unwrap()
                )?,
                SerBlock {
                    hash: block0.hash.to_bytes(),
                    n_bits: block0.n_bits,
                    timestamp: block0.timestamp,
                    file_num: block0.file_num,
                    data_pos: block0.data_pos,
                },
            );
            assert_eq!(blocks.height()?, 0);
            assert_eq!(blocks.tip()?, Some(block0.clone()));
            assert_eq!(blocks.by_height(0)?, Some(block0.clone()));
            assert_eq!(blocks.by_height(1)?, None);
        }
        {
            let mut batch = rocksdb::WriteBatch::default();
            writer.insert(&mut batch, &block1)?;
            db.write_batch(batch)?;
            assert_eq!(
                db_deserialize::<SerBlock>(
                    &db.get(cf, [0, 0, 0, 1])?.unwrap(),
                )?,
                SerBlock {
                    hash: block1.hash.to_bytes(),
                    n_bits: block1.n_bits,
                    timestamp: block1.timestamp,
                    file_num: block1.file_num,
                    data_pos: block1.data_pos,
                },
            );
            assert_eq!(blocks.height()?, 1);
            assert_eq!(blocks.tip()?, Some(block1.clone()));
            assert_eq!(blocks.by_height(0)?, Some(block0.clone()));
            assert_eq!(blocks.by_height(1)?, Some(block1.clone()));
            assert_eq!(blocks.by_height(2)?, None);
        }
        {
            let mut batch = rocksdb::WriteBatch::default();
            writer.delete(&mut batch, &block1)?;
            db.write_batch(batch)?;
            assert_eq!(db.get(cf, [0, 0, 0, 1])?.as_deref(), None);
            assert!(db.get(cf, [0, 0, 0, 0])?.is_some());
            assert_eq!(blocks.height()?, 0);
            assert_eq!(blocks.tip()?, Some(block0.clone()));
            assert_eq!(blocks.by_height(0)?, Some(block0.clone()));
            assert_eq!(blocks.by_height(1)?, None);
        }
        {
            let mut batch = rocksdb::WriteBatch::default();
            writer.delete(&mut batch, &block0)?;
            db.write_batch(batch)?;
            assert_eq!(db.get(cf, [0, 0, 0, 1])?.as_deref(), None);
            assert_eq!(db.get(cf, [0, 0, 0, 0])?.as_deref(), None);
            assert_eq!(blocks.height()?, -1);
            assert_eq!(blocks.tip()?, None);
            assert_eq!(blocks.by_height(0)?, None);
        }
        {
            let mut batch = rocksdb::WriteBatch::default();
            batch.put_cf(cf, [0, 0, 0], []);
            db.write_batch(batch)?;
            assert_eq!(
                blocks.height().unwrap_err().downcast::<BlocksError>()?,
                BlocksError::InvalidHeightBytes(vec![0, 0, 0]),
            );
        }
        {
            let mut batch = rocksdb::WriteBatch::default();
            batch.put_cf(cf, [0, 0, 0, 0], [0xff, 0xff, 0xff]);
            db.write_batch(batch)?;
            assert_eq!(
                blocks.by_height(0).unwrap_err().downcast::<SerError>()?,
                SerError::DeserializeError {
                    type_name: "chronik_db::io::blocks::SerBlock",
                    error: postcard::Error::DeserializeUnexpectedEnd,
                    bytes: vec![0xff, 0xff, 0xff],
                },
            );
        }
        Ok(())
    }
}
diff --git a/chronik/chronik-db/src/io/txs.rs b/chronik/chronik-db/src/io/txs.rs
index 0c7c6d716..eafd56bb6 100644
--- a/chronik/chronik-db/src/io/txs.rs
+++ b/chronik/chronik-db/src/io/txs.rs
@@ -1,723 +1,723 @@
// Copyright (c) 2023 The Bitcoin developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

//! Stores txs from the node in the DB.
//!
//! Instead of having txids be the keys for the column family containing the
//! txs, we use a 64-bit serial number "TxNum" that increments with every
//! transaction in block order. This allows us to e.g. very easily iterate
//! over all the txs in a block just by knowing the first tx_num of the block.
//! It also simplifies the address index (especially reduces space
//! requirements), as we simply store a list of relatively small integers
//! instead of txids.
//!
//! 64 bits allow us to store a maximum of 18446744073709551616 txs, which
//! even at 1M tx/s would be enough for over 500000 years.
//!
//! We only store the `txid`, `data_pos`, `undo_pos` and `time_first_seen` in
//! the DB, everything else we can read from the block/undo files. We use the
//! fact that coinbase txs don't have undo data, and undo data for txs never
//! is at position 0, so we set `undo_pos = 0` for coinbase txs, and treat
//! every entry with `undo_pos == 0` as a coinbase tx.
//!
//! For the reverse index txid -> tx_num, we use `ReverseLookup`. We use a
//! 64-bit cheap hash to make collisions rare/difficult.

use std::{ops::Range, time::Instant};

use abc_rust_error::Result;
use bitcoinsuite_core::tx::TxId;
use rocksdb::{ColumnFamilyDescriptor, Options, WriteBatch};
use serde::{Deserialize, Serialize};
use thiserror::Error;

use crate::{
    db::{
        Db, CF, CF_BLK_BY_FIRST_TX, CF_FIRST_TX_BY_BLK, CF_LOOKUP_TX_BY_HASH,
        CF_TX,
    },
    io::{bh_to_bytes, bytes_to_bh, BlockHeight},
    reverse_lookup::{LookupColumn, ReverseLookup},
    ser::{db_deserialize, db_serialize},
};

type LookupByHash<'a> = ReverseLookup<TxColumn<'a>>;

/// Number that uniquely identifies a tx in the blockchain.
/// Transactions are enumerated in the exact order they appear in the
/// blockchain. This looks like this:
///
/// * - 0 (coinbase of genesis)
///   - 1 (first non-coinbase tx in genesis block),
///   - ...
///   - N-1 (last tx in genesis block)
/// * - N (coinbase of second block)
///   - N+1 (first non-coinbase tx in second block),
///   - ...
///   - M-1 (last tx in second block)
/// * - M (coinbase of third block)
///   - M + 1 (first non-coinbase tx in third block),
///   - etc.
///
/// With CTOR, the non-coinbase txs within blocks are sorted in order of txid.
pub type TxNum = u64;
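// Editor's note: illustrative sketch, not part of the patch. How first
// tx_nums line up with block order: with a 2-tx genesis block and a 3-tx
// second block, the per-block first tx_nums are 0, 2 and 5. The function name
// is hypothetical:
fn _tx_num_layout_sketch() {
    let txs_per_block: [TxNum; 3] = [2, 3, 1];
    let mut first_tx_nums = Vec::new();
    let mut next: TxNum = 0;
    for n in txs_per_block {
        first_tx_nums.push(next); // what CF_FIRST_TX_BY_BLK stores per height
        next += n;
    }
    assert_eq!(first_tx_nums, vec![0, 2, 5]);
}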
/// Entry of a tx in the DB.
///
/// Instead of storing tx data directly, we utilize the fact that the node
/// already stores the block and undo data, and just store positions into
/// these files.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct TxEntry {
    /// TxId of the tx.
    pub txid: TxId,
    /// Position of the tx data in the tx's block's block file.
    pub data_pos: u32,
    /// Position of the tx undo data in the tx's block's undo file.
    pub undo_pos: u32,
    /// When this tx has first been seen in the mempool.
    pub time_first_seen: i64,
    /// Whether this is a coinbase tx.
    pub is_coinbase: bool,
}

/// Tx from the DB with height.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BlockTx {
    /// Tx data.
    pub entry: TxEntry,
    /// Height of block of the tx.
    pub block_height: BlockHeight,
}

/// Txs of a block, bundled in one struct so we can add it easily to the DB.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct BlockTxs {
    /// Tx data.
    pub txs: Vec<TxEntry>,
    /// Height of the block of the txs.
    pub block_height: BlockHeight,
}

/// In-memory data for the tx history.
#[derive(Debug, Default)]
pub struct TxsMemData {
    /// Stats about cache hits, num requests etc.
    pub stats: TxsStats,
}

/// Stats about cache hits, num requests etc.
#[derive(Clone, Debug, Default)]
pub struct TxsStats {
    /// Total number of txs updated.
    pub n_total: usize,
    /// Time [s] for insert/delete.
    pub t_total: f64,
    /// Time [s] for indexing txs.
    pub t_index: f64,
}

#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
struct SerTx {
    txid: [u8; 32],
    data_pos: u32,
    undo_pos: u32, // == 0 <==> is_coinbase == true
    time_first_seen: i64,
}

struct TxColumn<'a> {
    db: &'a Db,
    cf_tx: &'a CF,
    cf_blk_by_first_tx: &'a CF,
    cf_first_tx_by_blk: &'a CF,
}

/// Write [`BlockTxs`] to the DB.
pub struct TxWriter<'a> {
    col: TxColumn<'a>,
}

/// Read [`BlockTx`]s from the DB.
pub struct TxReader<'a> {
    col: TxColumn<'a>,
}

/// Error indicating something went wrong with the tx index.
#[derive(Debug, Error, PartialEq, Eq)]
pub enum TxsError {
    /// TxNum must be 8 bytes.
    #[error("Inconsistent DB: Invalid tx_num bytes: {0:?}")]
    InvalidTxNumBytes(Vec<u8>),

    /// A block without txs is not valid.
    #[error("Inconsistent DB: Txs for block {0} not found")]
    NoTxsForBlock(BlockHeight),

    /// A tx must always have a block.
    #[error("Inconsistent DB: Block for tx {0} not found")]
    TxWithoutBlock(TxNum),
}

use self::TxsError::*;

pub(crate) fn tx_num_to_bytes(tx_num: TxNum) -> [u8; 8] {
    // big-endian, so txs are sorted ascendingly
    tx_num.to_be_bytes()
}

pub(crate) fn bytes_to_tx_num(bytes: &[u8]) -> Result<TxNum> {
    Ok(TxNum::from_be_bytes(
        bytes
            .try_into()
            .map_err(|_| InvalidTxNumBytes(bytes.to_vec()))?,
    ))
}

impl<'a> LookupColumn for TxColumn<'a> {
    type CheapHash = [u8; 8];
    type Data = SerTx;
    type SerialNum = TxNum;

    const CF_DATA: &'static str = CF_TX;
    const CF_INDEX: &'static str = CF_LOOKUP_TX_BY_HASH;

    fn cheap_hash(key: &[u8; 32]) -> Self::CheapHash {
        seahash::hash(key).to_be_bytes()
    }

    fn data_key(value: &Self::Data) -> &[u8; 32] {
        &value.txid
    }

    fn get_data(&self, tx_num: Self::SerialNum) -> Result<Option<Self::Data>> {
        self.get_tx(tx_num)
    }
}

impl<'a> TxColumn<'a> {
    fn new(db: &'a Db) -> Result<Self> {
        let cf_tx = db.cf(CF_TX)?;
        let cf_blk_by_first_tx = db.cf(CF_BLK_BY_FIRST_TX)?;
        let cf_first_tx_by_blk = db.cf(CF_FIRST_TX_BY_BLK)?;
        db.cf(CF_LOOKUP_TX_BY_HASH)?;
        Ok(TxColumn {
            db,
            cf_tx,
            cf_blk_by_first_tx,
            cf_first_tx_by_blk,
        })
    }

    fn get_tx(&self, tx_num: TxNum) -> Result<Option<SerTx>> {
        match self.db.get(self.cf_tx, tx_num_to_bytes(tx_num))? {
            Some(bytes) => Ok(Some(db_deserialize::<SerTx>(&bytes)?)),
            None => Ok(None),
        }
    }
}
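// Editor's note: illustrative sketch, not part of the patch. The coinbase
// convention from the module docs: real undo data never sits at position 0,
// so `undo_pos == 0` doubles as the coinbase flag on deserialization. The
// function name is hypothetical:
fn _coinbase_flag_sketch() {
    let ser = SerTx {
        txid: [1; 32],
        data_pos: 100,
        undo_pos: 0, // no undo data stored for coinbase txs
        time_first_seen: 0,
    };
    let entry = TxEntry::from(ser);
    assert!(entry.is_coinbase);
}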
impl<'a> TxWriter<'a> {
    /// Create a new [`TxWriter`].
    pub fn new(db: &'a Db) -> Result<Self> {
        Ok(TxWriter {
            col: TxColumn::new(db)?,
        })
    }

    /// Insert and index the txs into the DB.
    pub fn insert(
        &self,
        batch: &mut WriteBatch,
        block_txs: &BlockTxs,
        mem_data: &mut TxsMemData,
    ) -> Result<TxNum> {
        let stats = &mut mem_data.stats;
        let t_start = Instant::now();
        stats.n_total += block_txs.txs.len();
        let mut last_tx_num_iterator = self.col.db.iterator_end(self.col.cf_tx);
        let first_new_tx = match last_tx_num_iterator.next() {
            Some(result) => {
                let (tx_num, _) = result?;
                bytes_to_tx_num(&tx_num)? + 1
            }
            None => 0,
        };
        batch.put_cf(
            self.col.cf_blk_by_first_tx,
            tx_num_to_bytes(first_new_tx),
            bh_to_bytes(block_txs.block_height),
        );
        batch.put_cf(
            self.col.cf_first_tx_by_blk,
            bh_to_bytes(block_txs.block_height),
            tx_num_to_bytes(first_new_tx),
        );
        let mut index_pairs = Vec::with_capacity(block_txs.txs.len());
        let mut next_tx_num = first_new_tx;
        for tx in &block_txs.txs {
            let ser_tx = SerTx::from(tx);
            batch.put_cf(
                self.col.cf_tx,
                tx_num_to_bytes(next_tx_num),
                db_serialize(&ser_tx)?,
            );
            index_pairs.push((next_tx_num, tx.txid.as_bytes()));
            next_tx_num += 1;
        }
        let t_index = Instant::now();
        LookupByHash::insert_pairs(self.col.db, batch, index_pairs)?;
        stats.t_index += t_index.elapsed().as_secs_f64();
        stats.t_total += t_start.elapsed().as_secs_f64();
        Ok(first_new_tx)
    }

    /// Delete and unindex the txs from the DB.
    pub fn delete(
        &self,
        batch: &mut WriteBatch,
        block_txs: &BlockTxs,
        mem_data: &mut TxsMemData,
    ) -> Result<TxNum> {
        let stats = &mut mem_data.stats;
        let t_start = Instant::now();
        stats.n_total += block_txs.txs.len();
        let first_tx_num = self
            .col
            .db
            .get(
                self.col.cf_first_tx_by_blk,
                bh_to_bytes(block_txs.block_height),
            )?
            .ok_or(NoTxsForBlock(block_txs.block_height))?;
        let first_tx_num = bytes_to_tx_num(&first_tx_num)?;
        let mut next_tx_num = first_tx_num;
        let mut index_pairs = Vec::with_capacity(block_txs.txs.len());
        for tx in &block_txs.txs {
            batch.delete_cf(self.col.cf_tx, tx_num_to_bytes(next_tx_num));
            index_pairs.push((next_tx_num, tx.txid.as_bytes()));
            next_tx_num += 1;
        }
        batch.delete_cf(
            self.col.cf_blk_by_first_tx,
            tx_num_to_bytes(first_tx_num),
        );
        batch.delete_cf(
            self.col.cf_first_tx_by_blk,
            bh_to_bytes(block_txs.block_height),
        );
        let t_index = Instant::now();
        LookupByHash::delete_pairs(self.col.db, batch, index_pairs)?;
        stats.t_index += t_index.elapsed().as_secs_f64();
        stats.t_total += t_start.elapsed().as_secs_f64();
        Ok(first_tx_num)
    }

    /// Add the column families used for txs.
    pub(crate) fn add_cfs(columns: &mut Vec<ColumnFamilyDescriptor>) {
        columns.push(ColumnFamilyDescriptor::new(CF_TX, Options::default()));
        columns.push(ColumnFamilyDescriptor::new(
            CF_BLK_BY_FIRST_TX,
            Options::default(),
        ));
        columns.push(ColumnFamilyDescriptor::new(
            CF_FIRST_TX_BY_BLK,
            Options::default(),
        ));
-        LookupByHash::add_cfs(columns, CF_LOOKUP_TX_BY_HASH);
+        LookupByHash::add_cfs(columns);
    }
}

impl std::fmt::Debug for TxWriter<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "TxWriter {{ ... }}")
    }
}
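// Editor's note: illustrative sketch, not part of the patch. Collecting all
// txs of one block by walking its TxNum range, the access pattern the
// `first_tx_num_by_block` doc below describes. The function name is
// hypothetical:
fn _block_txs_sketch(
    reader: &TxReader<'_>,
    height: BlockHeight,
) -> Result<Vec<BlockTx>> {
    let mut txs = Vec::new();
    if let Some(range) = reader.block_tx_num_range(height)? {
        for tx_num in range {
            if let Some(block_tx) = reader.tx_by_tx_num(tx_num)? {
                txs.push(block_tx);
            }
        }
    }
    Ok(txs)
}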
impl<'a> TxReader<'a> {
    /// Create a new [`TxReader`].
    pub fn new(db: &'a Db) -> Result<Self> {
        Ok(TxReader {
            col: TxColumn::new(db)?,
        })
    }

    /// Read a tx by txid, or [`None`] if not in the DB.
    pub fn tx_by_txid(&self, txid: &TxId) -> Result<Option<BlockTx>> {
        match self.tx_and_num_by_txid(txid)? {
            Some((_, block_tx)) => Ok(Some(block_tx)),
            None => Ok(None),
        }
    }

    /// Read a [`BlockTx`] and its [`TxNum`] by [`TxId`], or [`None`] if not
    /// in the DB.
    pub fn tx_and_num_by_txid(
        &self,
        txid: &TxId,
    ) -> Result<Option<(TxNum, BlockTx)>> {
        let (tx_num, ser_tx) =
            match LookupByHash::get(&self.col, self.col.db, txid.as_bytes())? {
                Some(tuple) => tuple,
                None => return Ok(None),
            };
        let block_height = self.block_height_by_tx_num(tx_num)?;
        Ok(Some((
            tx_num,
            BlockTx {
                entry: TxEntry::from(ser_tx),
                block_height,
            },
        )))
    }

    /// Read just the [`TxNum`] of the tx by [`TxId`], or [`None`] if not in
    /// the DB. This is faster than [`TxReader::tx_and_num_by_txid`].
    pub fn tx_num_by_txid(&self, txid: &TxId) -> Result<Option<TxNum>> {
        match LookupByHash::get(&self.col, self.col.db, txid.as_bytes())? {
            Some((tx_num, _)) => Ok(Some(tx_num)),
            None => Ok(None),
        }
    }

    /// Read the [`BlockTx`] by [`TxNum`], or [`None`] if not in the DB.
    pub fn tx_by_tx_num(&self, tx_num: TxNum) -> Result<Option<BlockTx>> {
        let Some(ser_tx) = self.col.get_tx(tx_num)? else {
            return Ok(None);
        };
        let block_height = self.block_height_by_tx_num(tx_num)?;
        Ok(Some(BlockTx {
            entry: TxEntry::from(ser_tx),
            block_height,
        }))
    }

    /// Read just the [`TxId`] of the tx by [`TxNum`], or [`None`] if not in
    /// the DB. This is faster than [`TxReader::tx_by_tx_num`].
    pub fn txid_by_tx_num(&self, tx_num: TxNum) -> Result<Option<TxId>> {
        let Some(ser_tx) = self.col.get_tx(tx_num)? else {
            return Ok(None);
        };
        Ok(Some(TxId::from(ser_tx.txid)))
    }

    /// Read the first [`TxNum`] of a [`BlockHeight`], or [`None`] if not in
    /// the DB. This is useful for getting all the txs in a block: by
    /// iterating between this (inclusive) and the next block's first tx_num
    /// (exclusive), we get all tx nums of the block.
    pub fn first_tx_num_by_block(
        &self,
        block_height: BlockHeight,
    ) -> Result<Option<TxNum>> {
        match self
            .col
            .db
            .get(self.col.cf_first_tx_by_blk, bh_to_bytes(block_height))?
        {
            Some(first_tx_num) => Ok(Some(bytes_to_tx_num(&first_tx_num)?)),
            None => Ok(None),
        }
    }

    /// Return the range of [`TxNum`]s of the block, or [`None`] if the block
    /// doesn't exist.
    pub fn block_tx_num_range(
        &self,
        block_height: BlockHeight,
    ) -> Result<Option<Range<TxNum>>> {
        let tx_num_start = match self.first_tx_num_by_block(block_height)? {
            Some(tx_num) => tx_num,
            None => return Ok(None),
        };
        let tx_num_end = match self.first_tx_num_by_block(block_height + 1)? {
            Some(first_tx_num_next) => first_tx_num_next,
            None => match self.last_tx_num()? {
                Some(last_tx_num) => last_tx_num + 1,
                None => return Err(NoTxsForBlock(block_height).into()),
            },
        };
        Ok(Some(tx_num_start..tx_num_end))
    }

    /// Read the last [`TxNum`] of the DB.
    /// This is useful when iterating over the txs of the last block.
    pub fn last_tx_num(&self) -> Result<Option<TxNum>> {
        let mut iter = self.col.db.iterator_end(self.col.cf_tx);
        match iter.next() {
            Some(result) => {
                let (key, _) = result?;
                Ok(Some(bytes_to_tx_num(&key)?))
            }
            None => Ok(None),
        }
    }

    /// Read the block height of the given tx_num. Err if not found.
    fn block_height_by_tx_num(&self, tx_num: TxNum) -> Result<BlockHeight> {
        let mut iter = self.col.db.iterator(
            self.col.cf_blk_by_first_tx,
            &tx_num_to_bytes(tx_num),
            rocksdb::Direction::Reverse,
        );
        match iter.next() {
            Some(result) => {
                let (_, block_height) = result?;
                bytes_to_bh(&block_height)
            }
            None => Err(TxWithoutBlock(tx_num).into()),
        }
    }
}

impl std::fmt::Debug for TxReader<'_> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "TxReader {{ ... }}")
    }
}
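// Editor's note: illustrative sketch, not part of the patch. The two lookup
// directions agree: resolving a txid through the reverse index yields the
// same BlockTx as the forward TxNum lookup. The function name is
// hypothetical:
fn _tx_lookup_sketch(db: &Db) -> Result<()> {
    let reader = TxReader::new(db)?;
    let txid = TxId::from([1; 32]);
    if let Some((tx_num, block_tx)) = reader.tx_and_num_by_txid(&txid)? {
        assert_eq!(reader.tx_by_tx_num(tx_num)?, Some(block_tx));
    }
    Ok(())
}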
}}") } } impl From<&'_ TxEntry> for SerTx { fn from(value: &TxEntry) -> Self { SerTx { txid: value.txid.to_bytes(), data_pos: value.data_pos, undo_pos: value.undo_pos, time_first_seen: value.time_first_seen, } } } impl From for TxEntry { fn from(value: SerTx) -> Self { TxEntry { txid: TxId::from(value.txid), data_pos: value.data_pos, undo_pos: value.undo_pos, time_first_seen: value.time_first_seen, is_coinbase: value.undo_pos == 0, } } } #[cfg(test)] mod tests { use abc_rust_error::Result; use bitcoinsuite_core::tx::TxId; use pretty_assertions::assert_eq; use rocksdb::WriteBatch; use crate::{ db::Db, io::{BlockTx, BlockTxs, TxEntry, TxReader, TxWriter, TxsMemData}, }; #[test] fn test_insert_txs() -> Result<()> { abc_rust_error::install(); let tempdir = tempdir::TempDir::new("chronik-db--txs")?; let db = Db::open(tempdir.path())?; let tx_writer = TxWriter::new(&db)?; let tx_reader = TxReader::new(&db)?; let mut mem_data = TxsMemData::default(); let tx1 = TxEntry { txid: TxId::from([1; 32]), data_pos: 100, undo_pos: 0, time_first_seen: 123456, is_coinbase: true, }; let block_tx1 = BlockTx { entry: tx1.clone(), block_height: 0, }; let block1 = BlockTxs { block_height: 0, txs: vec![tx1], }; assert_eq!(tx_reader.last_tx_num()?, None); assert_eq!(tx_reader.block_tx_num_range(0)?, None); { // insert genesis tx let mut batch = WriteBatch::default(); assert_eq!( tx_writer.insert(&mut batch, &block1, &mut mem_data)?, 0 ); db.write_batch(batch)?; let tx_reader = TxReader::new(&db)?; assert_eq!(tx_reader.first_tx_num_by_block(0)?, Some(0)); assert_eq!(tx_reader.first_tx_num_by_block(1)?, None); assert_eq!(tx_reader.last_tx_num()?, Some(0)); assert_eq!(tx_reader.block_tx_num_range(0)?, Some(0..1)); assert_eq!(tx_reader.tx_by_txid(&TxId::from([0; 32]))?, None); assert_eq!(tx_reader.tx_num_by_txid(&TxId::from([0; 32]))?, None); assert_eq!( tx_reader.tx_by_txid(&TxId::from([1; 32]))?, Some(block_tx1.clone()), ); assert_eq!(tx_reader.tx_by_tx_num(0)?, Some(block_tx1.clone())); assert_eq!( tx_reader.tx_num_by_txid(&TxId::from([1; 32]))?, Some(0), ); } let tx2 = TxEntry { txid: TxId::from([2; 32]), data_pos: 200, undo_pos: 20, time_first_seen: 234567, is_coinbase: false, }; let block_tx2 = BlockTx { entry: tx2.clone(), block_height: 1, }; let tx3 = TxEntry { txid: TxId::from([3; 32]), data_pos: 300, undo_pos: 30, time_first_seen: 345678, is_coinbase: false, }; let block_tx3 = BlockTx { entry: tx3.clone(), block_height: 1, }; let block2 = BlockTxs { block_height: 1, txs: vec![tx2, tx3], }; { // insert 2 more txs let mut batch = WriteBatch::default(); assert_eq!( tx_writer.insert(&mut batch, &block2, &mut mem_data)?, 1 ); db.write_batch(batch)?; assert_eq!(tx_reader.first_tx_num_by_block(0)?, Some(0)); assert_eq!(tx_reader.first_tx_num_by_block(1)?, Some(1)); assert_eq!(tx_reader.first_tx_num_by_block(2)?, None); assert_eq!(tx_reader.last_tx_num()?, Some(2)); assert_eq!(tx_reader.block_tx_num_range(0)?, Some(0..1)); assert_eq!(tx_reader.block_tx_num_range(1)?, Some(1..3)); assert_eq!(tx_reader.tx_by_txid(&TxId::from([0; 32]))?, None); assert_eq!(tx_reader.tx_num_by_txid(&TxId::from([0; 32]))?, None); assert_eq!( tx_reader.tx_by_txid(&TxId::from([1; 32]))?, Some(block_tx1.clone()), ); assert_eq!( tx_reader.tx_num_by_txid(&TxId::from([1; 32]))?, Some(0), ); assert_eq!(tx_reader.tx_by_tx_num(0)?, Some(block_tx1.clone())); assert_eq!( tx_reader.tx_by_txid(&TxId::from([2; 32]))?, Some(block_tx2.clone()), ); assert_eq!( tx_reader.tx_num_by_txid(&TxId::from([2; 32]))?, Some(1), ); 
            assert_eq!(tx_reader.tx_by_tx_num(1)?, Some(block_tx2));
            assert_eq!(
                tx_reader.tx_by_txid(&TxId::from([3; 32]))?,
                Some(block_tx3.clone()),
            );
            assert_eq!(
                tx_reader.tx_num_by_txid(&TxId::from([3; 32]))?,
                Some(2),
            );
            assert_eq!(tx_reader.tx_by_tx_num(2)?, Some(block_tx3));
        }
        {
            // delete latest block
            let mut batch = WriteBatch::default();
            assert_eq!(
                tx_writer.delete(&mut batch, &block2, &mut mem_data)?,
                1
            );
            db.write_batch(batch)?;
            assert_eq!(tx_reader.first_tx_num_by_block(0)?, Some(0));
            assert_eq!(tx_reader.first_tx_num_by_block(1)?, None);
            assert_eq!(tx_reader.last_tx_num()?, Some(0));
            assert_eq!(tx_reader.block_tx_num_range(0)?, Some(0..1));
            assert_eq!(tx_reader.block_tx_num_range(1)?, None);
            assert_eq!(tx_reader.tx_by_txid(&TxId::from([0; 32]))?, None);
            assert_eq!(
                tx_reader.tx_by_txid(&TxId::from([1; 32]))?,
                Some(block_tx1.clone()),
            );
            assert_eq!(tx_reader.tx_by_tx_num(0)?, Some(block_tx1.clone()));
            assert_eq!(tx_reader.tx_by_txid(&TxId::from([2; 32]))?, None);
            assert_eq!(tx_reader.tx_by_tx_num(1)?, None);
            assert_eq!(tx_reader.tx_by_txid(&TxId::from([3; 32]))?, None);
            assert_eq!(tx_reader.tx_by_tx_num(2)?, None);
        }
        let tx2 = TxEntry {
            txid: TxId::from([102; 32]),
            data_pos: 200,
            undo_pos: 20,
            time_first_seen: 234567,
            is_coinbase: false,
        };
        let block_tx2 = BlockTx {
            entry: tx2.clone(),
            block_height: 1,
        };
        let tx3 = TxEntry {
            txid: TxId::from([103; 32]),
            data_pos: 300,
            undo_pos: 30,
            time_first_seen: 345678,
            is_coinbase: false,
        };
        let block_tx3 = BlockTx {
            entry: tx3.clone(),
            block_height: 1,
        };
        let block3 = BlockTxs {
            block_height: 1,
            txs: vec![tx2, tx3],
        };
        {
            // Add reorg block
            let mut batch = WriteBatch::default();
            assert_eq!(
                tx_writer.insert(&mut batch, &block3, &mut mem_data)?,
                1
            );
            db.write_batch(batch)?;
            assert_eq!(tx_reader.first_tx_num_by_block(0)?, Some(0));
            assert_eq!(tx_reader.first_tx_num_by_block(1)?, Some(1));
            assert_eq!(tx_reader.first_tx_num_by_block(2)?, None);
            assert_eq!(tx_reader.block_tx_num_range(0)?, Some(0..1));
            assert_eq!(tx_reader.block_tx_num_range(1)?, Some(1..3));
            assert_eq!(tx_reader.block_tx_num_range(2)?, None);
            assert_eq!(
                tx_reader.tx_by_txid(&TxId::from([1; 32]))?,
                Some(block_tx1),
            );
            assert_eq!(tx_reader.tx_by_txid(&TxId::from([2; 32]))?, None);
            assert_eq!(
                tx_reader.tx_by_txid(&TxId::from([102; 32]))?,
                Some(block_tx2.clone()),
            );
            assert_eq!(
                tx_reader.tx_num_by_txid(&TxId::from([102; 32]))?,
                Some(1),
            );
            assert_eq!(tx_reader.tx_by_tx_num(1)?, Some(block_tx2));
            assert_eq!(
                tx_reader.tx_by_txid(&TxId::from([103; 32]))?,
                Some(block_tx3.clone()),
            );
            assert_eq!(
                tx_reader.tx_num_by_txid(&TxId::from([103; 32]))?,
                Some(2),
            );
            assert_eq!(tx_reader.tx_by_tx_num(2)?, Some(block_tx3));
        }
        Ok(())
    }
}
diff --git a/chronik/chronik-db/src/reverse_lookup.rs b/chronik/chronik-db/src/reverse_lookup.rs
index ee6d4e970..6e2ba9e17 100644
--- a/chronik/chronik-db/src/reverse_lookup.rs
+++ b/chronik/chronik-db/src/reverse_lookup.rs
@@ -1,538 +1,533 @@
// Copyright (c) 2023 The Bitcoin developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

//! Module containing [`LookupColumn`] and [`ReverseLookup`].
//!
//! Allows reverse-looking up data that has been indexed using a serial
//! number.
//!
//! Say you have the following column:
//!
//! `SerialNum -> Data`
//!
//! This could be mapping:
//! - `BlockHeight -> SerBlock`, where `SerBlock` includes `BlockHash` and
//!   some header data.
//! - `TxNum -> SerTx`, where `SerTx` includes `TxId` and some other data.
//!
//! `Data` contains a hash `[u8; 32]` that you might look up from.
//!
//! This would be the `BlockHash` for a `BlockHeight -> SerBlock` column and
//! the `TxId` for a `TxNum -> SerTx` column.
//!
//! We *could* simply add a table `[u8; 32] -> SerialNum`, e.g. `TxId ->
//! TxNum`, but this would redundantly store the hash of the tx twice.
//!
//! Instead, we store:
//!
//! `CheapHash -> [SerialNum]`
//!
//! The `[]` indicate a list of `SerialNum`s, as the hash function for
//! `CheapHash` is not cryptographically secure and only outputs 4-8 bytes.
//! It therefore is expected to occasionally have collisions, which are
//! tracked in a list.
//!
//! Then, you can use [`ReverseLookup`] to maintain an index from `[u8; 32]`
//! to `SerialNum` compactly.
//!
//! To resolve collisions during lookup from `[u8; 32]`:
//! 1. Hash `[u8; 32]` to get `CheapHash`.
//! 2. Lookup the matching `SerialNum`s in the index and iterate over them.
//! 3. Lookup the `Data` in the original `SerialNum -> Data` table.
//! 4. Test whether `Data` has the queried `[u8; 32]`, and return that.
//! 5. Otherwise, the key is not part of the database.

use std::{
    collections::{BTreeSet, HashMap},
    fmt::{Debug, Display},
    hash::Hash,
    marker::PhantomData,
};

use abc_rust_error::Result;
use rocksdb::{ColumnFamilyDescriptor, Options, WriteBatch};
use serde::{Deserialize, Serialize};
use thiserror::Error;

use crate::{
    db::{Db, CF},
    ser::{db_deserialize, db_serialize},
};

/// Struct for maintaining a reverse lookup index.
/// You cannot construct this; you should use a typedef:
///
/// `type MyIndex = ReverseLookup<MyColumn>`
///
/// Then you can use the associated functions like this:
///
/// `MyIndex::insert_pairs(db, batch, [(1234, &[123; 32])])?;`
pub(crate) struct ReverseLookup<L: LookupColumn>(PhantomData<L>);

/// Trait providing the data to build a reverse lookup index.
pub(crate) trait LookupColumn {
    /// Number uniquely identifying `Data`, e.g. `BlockHeight` or `TxNum`.
    type SerialNum: Copy + for<'a> Deserialize<'a> + Display + Ord + Serialize;

    /// A short hash, compacting keys of type `[u8; 32]`.
    type CheapHash: AsRef<[u8]> + Eq + Hash;

    /// Data stored in the column, e.g. `SerBlock`.
    type Data;

    /// Name of the `SerialNum -> Data` CF.
    const CF_DATA: &'static str;
    /// Name of the `CheapHash -> [SerialNum]` CF.
    const CF_INDEX: &'static str;

    /// Calculate a short `CheapHash` from the given key.
    fn cheap_hash(key: &[u8; 32]) -> Self::CheapHash;

    /// Extract the key to index by from the data.
    fn data_key(value: &Self::Data) -> &[u8; 32];

    /// Fetch the data from the db using the serial num.
    fn get_data(&self, number: Self::SerialNum) -> Result<Option<Self::Data>>;
}

#[derive(Debug, Error, PartialEq)]
pub(crate) enum IndexError {
    #[error(
        "Inconsistent DB: Lookup index {cf_index_name} contains {serial_str}, \
         but data column {cf_data_name} doesn't"
    )]
    NotInDataColumn {
        serial_str: String,
        cf_data_name: &'static str,
        cf_index_name: &'static str,
    },
}

impl IndexError {
    fn not_in_data_column<L: LookupColumn>(serial: L::SerialNum) -> IndexError {
        IndexError::NotInDataColumn {
            serial_str: serial.to_string(),
            cf_data_name: L::CF_DATA,
            cf_index_name: L::CF_INDEX,
        }
    }
}
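// Editor's note: illustrative sketch, not part of the patch. With the
// deliberately weak 1-byte cheap hash from the tests below (`key[0] % 10`),
// distinct keys share an index bucket; step 4 of the lookup procedure then
// disambiguates by comparing full 32-byte keys. The function name is
// hypothetical:
fn _collision_sketch() {
    let cheap = |key: &[u8; 32]| [key[0] % 10];
    // [100; 32] and [110; 32] land in the same bucket...
    assert_eq!(cheap(&[100; 32]), cheap(&[110; 32]));
    // ...but their full keys differ, so lookup still resolves correctly.
    assert_ne!([100u8; 32], [110u8; 32]);
}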
impl<L: LookupColumn> ReverseLookup<L> {
    /// Add the cfs required by the reverse lookup index.
-    pub(crate) fn add_cfs(
-        columns: &mut Vec<ColumnFamilyDescriptor>,
-        lookup_cf_name: &'static str,
-    ) {
-        columns.push(ColumnFamilyDescriptor::new(
-            lookup_cf_name,
-            Options::default(),
-        ));
+    pub(crate) fn add_cfs(columns: &mut Vec<ColumnFamilyDescriptor>) {
+        columns
+            .push(ColumnFamilyDescriptor::new(L::CF_INDEX, Options::default()));
    }

    /// Read by key from the DB using the index.
    pub(crate) fn get(
        lookup_column: &L,
        db: &Db,
        key: &[u8; 32],
    ) -> Result<Option<(L::SerialNum, L::Data)>> {
        let cf_index = db.cf(L::CF_INDEX)?;
        let cheap_hash = L::cheap_hash(key);
        // Lookup CheapHash -> [SerialNum]
        let serials = match db.get(cf_index, cheap_hash)? {
            Some(serials) => serials,
            None => return Ok(None),
        };
        // Iterate serials, read each's data, find the one with the given key.
        //
        // This could in theory be a DoS attack by purposefully having the
        // indexer index lots of colliding keys, however, since keys are
        // expected to use a cryptographically secure hash function (e.g.
        // SHA-256), it will be expensive to find data that actually collides.
        for serial in db_deserialize::<Vec<L::SerialNum>>(&serials)? {
            let value = lookup_column
                .get_data(serial)?
                .ok_or_else(|| IndexError::not_in_data_column::<L>(serial))?;
            if L::data_key(&value) == key {
                return Ok(Some((serial, value)));
            }
        }
        // We have a key that collides with others but no actual match
        Ok(None)
    }

    /// Insert into the index.
    pub(crate) fn insert_pairs<'a>(
        db: &Db,
        batch: &mut WriteBatch,
        pairs: impl IntoIterator<Item = (L::SerialNum, &'a [u8; 32])>,
    ) -> Result<()> {
        let cf_index = db.cf(L::CF_INDEX)?;
        let mut new_entries =
            HashMap::<L::CheapHash, BTreeSet<L::SerialNum>>::new();
        // Fill new_entries with either data from the DB or add new entries
        for (serial, key) in pairs {
            let serials =
                Self::get_or_fetch(db, cf_index, &mut new_entries, key)?;
            serials.insert(serial);
        }
        // Add/override all the entries with the inserted serials
        for (key, serials) in new_entries {
            let serials = db_serialize(&Vec::from_iter(serials))?;
            batch.put_cf(cf_index, key, &serials);
        }
        Ok(())
    }

    /// Delete from the index.
    pub(crate) fn delete_pairs<'a>(
        db: &Db,
        batch: &mut WriteBatch,
        pairs: impl IntoIterator<Item = (L::SerialNum, &'a [u8; 32])>,
    ) -> Result<()> {
        let cf_index = db.cf(L::CF_INDEX)?;
        let mut new_entries =
            HashMap::<L::CheapHash, BTreeSet<L::SerialNum>>::new();
        for (serial, key) in pairs {
            let serials =
                Self::get_or_fetch(db, cf_index, &mut new_entries, key)?;
            if !serials.remove(&serial) {
                return Err(IndexError::not_in_data_column::<L>(serial).into());
            }
        }
        for (key, serials) in new_entries {
            if serials.is_empty() {
                // Delete the entry from the DB if it doesn't contain any
                // serials anymore
                batch.delete_cf(cf_index, key);
            } else {
                // Otherwise, override entry with only the remaining serials
                let serials = db_serialize(&Vec::from_iter(serials))?;
                batch.put_cf(cf_index, key, &serials);
            }
        }
        Ok(())
    }

    /// Query from `new_entries`, or from DB and then store in `new_entries`.
    fn get_or_fetch<'a>(
        db: &Db,
        index_cf: &CF,
        new_entries: &'a mut HashMap<L::CheapHash, BTreeSet<L::SerialNum>>,
        key: &[u8; 32],
    ) -> Result<&'a mut BTreeSet<L::SerialNum>> {
        use std::collections::hash_map::Entry;
        let cheap_hash = L::cheap_hash(key);
        match new_entries.entry(cheap_hash) {
            Entry::Occupied(entry) => Ok(entry.into_mut()),
            Entry::Vacant(entry) => {
                let serials = match db.get(index_cf, entry.key().as_ref())? {
                    Some(serials) => {
                        db_deserialize::<Vec<L::SerialNum>>(&serials)?
                    }
                    None => vec![],
                };
                Ok(entry.insert(BTreeSet::from_iter(serials)))
            }
        }
    }
}
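// Editor's note: illustrative sketch, not part of the patch. Index writes
// piggyback on the caller's WriteBatch, so the data column and the index
// column commit atomically, mirroring BlockWriter::insert and
// TxWriter::insert. The function name is hypothetical:
fn _index_batch_sketch<L: LookupColumn>(
    db: &Db,
    num: L::SerialNum,
    key: &[u8; 32],
) -> Result<()> {
    let mut batch = WriteBatch::default();
    ReverseLookup::<L>::insert_pairs(db, &mut batch, [(num, key)])?;
    db.write_batch(batch)?; // both CFs land together or not at all
    Ok(())
}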
}}") } } const CF_TEST: &str = "test"; const CF_TEST_INDEX: &str = "test_index"; impl<'a> LookupColumn for TestColumn<'a> { type CheapHash = [u8; 1]; type Data = TestData; type SerialNum = u32; const CF_DATA: &'static str = CF_TEST; const CF_INDEX: &'static str = CF_TEST_INDEX; fn cheap_hash(key: &[u8; 32]) -> [u8; 1] { // "Hash" here simply takes the mod 10, giving us lots of collisions [key[0] % 10] } fn data_key(data: &Self::Data) -> &[u8; 32] { &data.key } fn get_data( &self, number: Self::SerialNum, ) -> Result> { match self.db.get(self.cf_data, number.to_be_bytes())? { Some(bytes) => Ok(Some(db_deserialize(&bytes)?)), None => Ok(None), } } } #[test] fn test_reverse_lookup() -> Result<()> { abc_rust_error::install(); let tempdir = tempdir::TempDir::new("chronik-db--lookup")?; let mut cfs = vec![ColumnFamilyDescriptor::new(CF_TEST, Options::default())]; - Index::add_cfs(&mut cfs, CF_TEST_INDEX); + Index::add_cfs(&mut cfs); let db = Db::open_with_cfs(tempdir.path(), cfs)?; let cf_data = db.cf(CF_TEST)?; let column = TestColumn { db: &db, cf_data }; // First insert let number0: u32 = 100; let key0 = [100; 32]; let data0 = TestData { key: key0, payload: 0xdeadbeef, }; { let mut batch = WriteBatch::default(); batch.put_cf( db.cf(CF_TEST)?, number0.to_be_bytes(), db_serialize(&data0)?, ); Index::insert_pairs( &db, &mut batch, [(number0, &key0)].into_iter(), )?; db.write_batch(batch)?; assert_eq!( Index::get(&column, &db, &key0)?, Some((number0, data0)) ); assert_eq!( db.get(db.cf(CF_TEST_INDEX)?, [0])?.as_deref(), Some(db_serialize(&vec![number0])?.as_ref()), ); } // Second insert, inserts 3 at once let number1: u32 = 101; let key1 = [101; 32]; // collides with key3 let data1 = TestData { key: key1, payload: 0xcafe, }; let number2: u32 = 110; let key2 = [110; 32]; // collides with key0 let data2 = TestData { key: key2, payload: 0xabcd, }; let number3: u32 = 111; let key3 = [111; 32]; // collides with key1 let data3 = TestData { key: key3, payload: 0xfedc, }; { let mut batch = WriteBatch::default(); batch.put_cf( db.cf(CF_TEST)?, number1.to_be_bytes(), db_serialize(&data1)?, ); batch.put_cf( db.cf(CF_TEST)?, number2.to_be_bytes(), db_serialize(&data2)?, ); batch.put_cf( db.cf(CF_TEST)?, number3.to_be_bytes(), db_serialize(&data3)?, ); Index::insert_pairs( &db, &mut batch, [(number1, &key1), (number2, &key2), (number3, &key3)] .into_iter(), )?; db.write_batch(batch)?; assert_eq!( Index::get(&column, &db, &key0)?, Some((number0, data0)) ); assert_eq!( Index::get(&column, &db, &key1)?, Some((number1, data1)) ); assert_eq!( Index::get(&column, &db, &key2)?, Some((number2, data2)) ); assert_eq!( Index::get(&column, &db, &key3)?, Some((number3, data3)) ); assert_eq!( db.get(db.cf(CF_TEST_INDEX)?, [0])?.as_deref(), Some(db_serialize(&vec![number0, number2])?.as_ref()), ); assert_eq!( db.get(db.cf(CF_TEST_INDEX)?, [1])?.as_deref(), Some(db_serialize(&vec![number1, number3])?.as_ref()), ); } // Delete key1, key2, key3 { let mut batch = WriteBatch::default(); Index::delete_pairs( &db, &mut batch, [(number1, &key1), (number2, &key2), (number3, &key3)] .into_iter(), )?; db.write_batch(batch)?; assert_eq!(Index::get(&column, &db, &key1)?, None); assert_eq!(Index::get(&column, &db, &key2)?, None); assert_eq!(Index::get(&column, &db, &key3)?, None); assert_eq!( Index::get(&column, &db, &key0)?, Some((number0, data0)) ); assert_eq!( db.get(db.cf(CF_TEST_INDEX)?, [0])?.as_deref(), Some(db_serialize(&vec![number0])?.as_ref()), ); assert_eq!(db.get(db.cf(CF_TEST_INDEX)?, [1])?.as_deref(), None); } // Delete 
        // Delete key0
        {
            let mut batch = WriteBatch::default();
            Index::delete_pairs(
                &db,
                &mut batch,
                [(number0, &key0)].into_iter(),
            )?;
            db.write_batch(batch)?;
            assert_eq!(Index::get(&column, &db, &key0)?, None);
            assert_eq!(db.get(db.cf(CF_TEST_INDEX)?, [0])?.as_deref(), None);
            assert_eq!(db.get(db.cf(CF_TEST_INDEX)?, [1])?.as_deref(), None);
        }
        Ok(())
    }

    #[test]
    fn test_reverse_lookup_rng() -> Result<()> {
        abc_rust_error::install();
        let mut rng = fastrand::Rng::with_seed(0);
        let tempdir = tempdir::TempDir::new("chronik-db--lookup_rng")?;
        let mut cfs =
            vec![ColumnFamilyDescriptor::new(CF_TEST, Options::default())];
-        Index::add_cfs(&mut cfs, CF_TEST_INDEX);
+        Index::add_cfs(&mut cfs);
        let db = Db::open_with_cfs(tempdir.path(), cfs)?;
        let cf_data = db.cf(CF_TEST)?;
        let column = TestColumn { db: &db, cf_data };
        let test_data = (0u32..1000)
            .map(|num| {
                let mut data = TestData {
                    key: [0; 32],
                    payload: rng.u32(0..u32::MAX),
                };
                rng.fill(&mut data.key);
                (num, data)
            })
            .collect::<Vec<_>>();
        let insert_data = |entries: &[(u32, TestData)]| -> Result<()> {
            let mut batch = WriteBatch::default();
            let pairs =
                entries.iter().map(|&(num, ref data)| (num, &data.key));
            Index::insert_pairs(&db, &mut batch, pairs)?;
            for &(num, ref data) in entries {
                batch.put_cf(cf_data, num.to_be_bytes(), db_serialize(data)?);
            }
            db.write_batch(batch)?;
            Ok(())
        };
        let delete_data = |entries: &[(u32, TestData)]| -> Result<()> {
            let mut batch = WriteBatch::default();
            let pairs =
                entries.iter().map(|&(num, ref data)| (num, &data.key));
            Index::delete_pairs(&db, &mut batch, pairs)?;
            for &(num, _) in entries {
                batch.delete_cf(cf_data, num.to_be_bytes());
            }
            db.write_batch(batch)?;
            Ok(())
        };
        let check_data = |entries: &[(u32, TestData)]| -> Result<()> {
            for &(expected_num, ref expected_data) in entries {
                let (actual_num, actual_data) =
                    Index::get(&column, &db, &expected_data.key)?.unwrap();
                assert_eq!(expected_num, actual_num);
                assert_eq!(*expected_data, actual_data);
            }
            Ok(())
        };
        let check_not_data = |entries: &[(u32, TestData)]| -> Result<()> {
            for (_, data) in entries {
                assert!(Index::get(&column, &db, &data.key)?.is_none());
            }
            Ok(())
        };

        // Insert first 400 entries
        insert_data(&test_data[..400])?;
        check_data(&test_data[..400])?;
        check_not_data(&test_data[400..])?;

        // Insert next 600 entries
        insert_data(&test_data[400..])?;
        check_data(&test_data)?;

        // Delete last 600 entries again
        delete_data(&test_data[400..])?;
        check_data(&test_data[..400])?;
        check_not_data(&test_data[400..])?;

        // Delete remaining 400 entries
        delete_data(&test_data[..400])?;
        check_not_data(&test_data)?;

        Ok(())
    }
}