diff --git a/chronik/chronik-db/src/group.rs b/chronik/chronik-db/src/group.rs
index 35714d44a..2a2ae5331 100644
--- a/chronik/chronik-db/src/group.rs
+++ b/chronik/chronik-db/src/group.rs
@@ -1,142 +1,142 @@
 // Copyright (c) 2023 The Bitcoin developers
 // Distributed under the MIT software license, see the accompanying
 // file COPYING or http://www.opensource.org/licenses/mit-license.php.

 //! Module for [`Group`] and [`GroupQuery`].

 use bitcoinsuite_core::tx::{Tx, TxOutput};
 use bytes::Bytes;
 use serde::{Deserialize, Serialize};

 use crate::io::{GroupHistoryConf, GroupUtxoConf};

 /// Struct giving impls of [`Group`] all the necessary data to determine the
 /// members of the group.
 #[derive(Clone, Copy, Debug)]
 pub struct GroupQuery<'a> {
     /// Whether the tx is a coinbase tx.
     pub is_coinbase: bool,
     /// The transaction that should be grouped.
     pub tx: &'a Tx,
 }

 /// Item returned by `members_tx`.
 #[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
 pub struct MemberItem<M> {
     /// Index of the item in the list of inputs/outputs.
     pub idx: usize,
     /// Member of this item.
     pub member: M,
 }

 /// Groups txs and determines which members they are.
 ///
 /// A member is one instance in a group and can be anything in a tx, e.g. the
 /// input and output scripts, the SLP token ID, a SWaP signal, a smart
 /// contract etc.
 pub trait Group {
     /// Iterator over the members found for a given [`GroupQuery`].
     type Iter<'a>: IntoIterator<Item = MemberItem<Self::Member<'a>>> + 'a;

     /// Member of a group, this is what txs will be grouped by.
     ///
     /// We use a HashMap and a BTreeMap to group txs, so it must implement
     /// [`std::hash::Hash`] and [`Ord`].
     type Member<'a>: std::hash::Hash + Eq + Ord;

     /// Serialized member, this is what will be used as key in the DB.
     /// Normally, this will be a [`Vec<u8>`] or a `[u8; N]` array or slice.
     ///
     /// We use this to allow members to separate the code that groups them
     /// from the serialization of members. For example, we can group by raw
     /// input/output script bytecode, but then use compressed scripts when
     /// adding to the DB. That way, we don't have to run the compression
     /// every time we compare/hash elements for grouping.
     ///
     /// Note: For group history, this will be suffixed by a 4-byte page
     /// number.
-    type MemberSer<'a>: AsRef<[u8]> + 'a;
+    type MemberSer: AsRef<[u8]>;

     /// Auxiliary data when grouping members
     type Aux;

     /// Data attached to a UTXO for this group.
     type UtxoData: UtxoData;

     /// Find the group's members in the given query's tx's inputs.
     ///
     /// Note: This is allowed to return a member multiple times per query.
     ///
     /// Note: The returned iterator is allowed to borrow from the query.
     fn input_members<'a>(
         &self,
         query: GroupQuery<'a>,
         aux: &Self::Aux,
     ) -> Self::Iter<'a>;

     /// Find the group's members in the given query's tx's outputs.
     ///
     /// Note: This is allowed to return a member multiple times per query.
     ///
     /// Note: The returned iterator is allowed to borrow from the query.
     fn output_members<'a>(
         &self,
         query: GroupQuery<'a>,
         aux: &Self::Aux,
     ) -> Self::Iter<'a>;

     /// Serialize the given member.
-    fn ser_member<'a>(&self, member: &Self::Member<'a>) -> Self::MemberSer<'a>;
+    fn ser_member(&self, member: &Self::Member<'_>) -> Self::MemberSer;

     /// The [`GroupHistoryConf`] for this group.
     fn tx_history_conf() -> GroupHistoryConf;

     /// The [`GroupUtxoConf`] for this group.
     fn utxo_conf() -> GroupUtxoConf;
 }
 /// Data attached to a UTXO by a group.
 /// There are basically only 2 meaningful variants here, one with script
 /// (where the member is anything, e.g. a token ID) and one without script,
 /// where the member is the script itself and therefore storing it in the
 /// UTXO is redundant.
 pub trait UtxoData: Default + for<'a> Deserialize<'a> + Serialize + 'static {
     /// Function that extracts the [`UtxoData`] from an output.
     fn from_output(output: &TxOutput) -> Self;
 }

 /// [`UtxoData`] that only stores the output value but not the script.
 /// This is useful where the member itself is the script so storing it would
 /// be redundant.
 pub type UtxoDataValue = i64;

 impl UtxoData for UtxoDataValue {
     fn from_output(output: &TxOutput) -> Self {
         output.value
     }
 }

 /// [`UtxoData`] that stores the full output, including value and script.
 /// This is useful where the member isn't the script, e.g. a token ID.
 pub type UtxoDataOutput = (i64, Bytes);

 impl UtxoData for UtxoDataOutput {
     fn from_output(output: &TxOutput) -> Self {
         (output.value, output.script.bytecode().clone())
     }
 }

 /// Helper which returns the `G::Member`s of both inputs and outputs of the
 /// group for the tx.
 pub fn tx_members_for_group<'a, G: Group>(
     group: &G,
     query: GroupQuery<'a>,
     aux: &G::Aux,
 ) -> impl Iterator<Item = G::Member<'a>> {
     group
         .input_members(query, aux)
         .into_iter()
         .chain(group.output_members(query, aux))
         .map(|item| item.member)
 }
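As a note for reviewers: the payoff of dropping the lifetime from `MemberSer` is that serialized members are now owned values. A minimal sketch of what this enables (not part of the diff; `member_keys` is a hypothetical helper):

```rust
// Sketch only; `member_keys` is hypothetical. Because `MemberSer` no
// longer carries a lifetime, serialized members can outlive the
// `GroupQuery` that produced them.
use crate::group::{tx_members_for_group, Group, GroupQuery};

fn member_keys<G: Group>(
    group: &G,
    query: GroupQuery<'_>,
    aux: &G::Aux,
) -> Vec<G::MemberSer> {
    tx_members_for_group(group, query, aux)
        .map(|member| group.ser_member(&member))
        .collect()
    // With the old `MemberSer<'a>`, this return type would have had to
    // borrow from `query`, forcing the lifetime onto every caller.
}
```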
diff --git a/chronik/chronik-db/src/groups/script.rs b/chronik/chronik-db/src/groups/script.rs
index 12b02c27b..459b18950 100644
--- a/chronik/chronik-db/src/groups/script.rs
+++ b/chronik/chronik-db/src/groups/script.rs
@@ -1,226 +1,226 @@
 // Copyright (c) 2023 The Bitcoin developers
 // Distributed under the MIT software license, see the accompanying
 // file COPYING or http://www.opensource.org/licenses/mit-license.php.

 use bitcoinsuite_core::{
     script::{compress_script_variant, Script},
     tx::Tx,
 };
 use bytes::Bytes;

 use crate::{
     db::{CF_SCRIPT_HISTORY, CF_SCRIPT_HISTORY_NUM_TXS, CF_SCRIPT_UTXO},
     group::{Group, GroupQuery, MemberItem, UtxoDataValue},
     io::{
         GroupHistoryConf, GroupHistoryReader, GroupHistoryWriter,
         GroupUtxoConf, GroupUtxoReader, GroupUtxoWriter,
     },
     mem::{MempoolGroupHistory, MempoolGroupUtxos},
 };

 /// Index the mempool tx history of scripts
 pub type MempoolScriptHistory = MempoolGroupHistory<ScriptGroup>;
 /// Index the mempool UTXOs of scripts
 pub type MempoolScriptUtxos = MempoolGroupUtxos<ScriptGroup>;
 /// Index the tx history of scripts in the DB
 pub type ScriptHistoryWriter<'a> = GroupHistoryWriter<'a, ScriptGroup>;
 /// Read the tx history of scripts in the DB
 pub type ScriptHistoryReader<'a> = GroupHistoryReader<'a, ScriptGroup>;
 /// Index the UTXOs of scripts in the DB
 pub type ScriptUtxoWriter<'a> = GroupUtxoWriter<'a, ScriptGroup>;
 /// Read the UTXOs of scripts in the DB
 pub type ScriptUtxoReader<'a> = GroupUtxoReader<'a, ScriptGroup>;

 /// Group txs by input/output scripts.
 #[derive(Clone, Debug)]
 pub struct ScriptGroup;

 /// Iterator over the scripts of a tx
 #[derive(Debug)]
 pub struct ScriptGroupIter<'a> {
     is_coinbase: bool,
     tx: &'a Tx,
     idx: usize,
     is_outputs: bool,
 }

 impl<'a> Iterator for ScriptGroupIter<'a> {
     type Item = MemberItem<&'a Script>;

     fn next(&mut self) -> Option<Self::Item> {
         if self.is_coinbase && !self.is_outputs {
             return None;
         }
         let idx = self.idx;
         self.idx += 1;
         Some(MemberItem {
             idx,
             member: if self.is_outputs {
                 &self.tx.outputs.get(idx)?.script
             } else {
                 &self.tx.inputs.get(idx)?.coin.as_ref()?.output.script
             },
         })
     }
 }

 impl Group for ScriptGroup {
     type Aux = ();
     type Iter<'a> = ScriptGroupIter<'a>;
     type Member<'a> = &'a Script;
-    type MemberSer<'a> = Bytes;
+    type MemberSer = Bytes;
     type UtxoData = UtxoDataValue;

     fn input_members<'a>(
         &self,
         query: GroupQuery<'a>,
         _aux: &(),
     ) -> Self::Iter<'a> {
         ScriptGroupIter {
             is_coinbase: query.is_coinbase,
             tx: query.tx,
             idx: 0,
             is_outputs: false,
         }
     }

     fn output_members<'a>(
         &self,
         query: GroupQuery<'a>,
         _aux: &(),
     ) -> Self::Iter<'a> {
         ScriptGroupIter {
             is_coinbase: query.is_coinbase,
             tx: query.tx,
             idx: 0,
             is_outputs: true,
         }
     }

-    fn ser_member<'a>(&self, member: &Self::Member<'a>) -> Self::MemberSer<'a> {
+    fn ser_member(&self, member: &Self::Member<'_>) -> Self::MemberSer {
         compress_script_variant(&member.variant())
     }

     fn tx_history_conf() -> GroupHistoryConf {
         GroupHistoryConf {
             cf_page_name: CF_SCRIPT_HISTORY,
             cf_num_txs_name: CF_SCRIPT_HISTORY_NUM_TXS,
             page_size: 1000,
         }
     }

     fn utxo_conf() -> GroupUtxoConf {
         GroupUtxoConf {
             cf_name: CF_SCRIPT_UTXO,
         }
     }
 }

 #[cfg(test)]
 mod tests {
     use bitcoinsuite_core::{
         script::Script,
         tx::{Coin, Tx, TxId, TxInput, TxMut, TxOutput},
     };

     use crate::{
         group::{tx_members_for_group, Group, GroupQuery, MemberItem},
         groups::ScriptGroup,
     };

     #[test]
     fn test_script_group() {
         let script_group = ScriptGroup;
         let tx = Tx::with_txid(
             TxId::from([0; 32]),
             TxMut {
                 inputs: [[0x51].as_ref(), &[0x52]]
                     .into_iter()
                     .map(|script| TxInput {
                         coin: Some(Coin {
                             output: TxOutput {
                                 script: Script::new(script.into()),
                                 ..Default::default()
                             },
                             ..Default::default()
                         }),
                         ..Default::default()
                     })
                     .collect(),
                 outputs: [[0x53].as_ref(), &[0x51]]
                     .into_iter()
                     .map(|script| TxOutput {
                         script: Script::new(script.into()),
                         ..Default::default()
                     })
                     .collect(),
                 ..Default::default()
             },
         );
         let make_script = |script: Vec<u8>| Script::new(script.into());
         fn make_member_item(
             idx: usize,
             script: &Script,
         ) -> MemberItem<&Script> {
             MemberItem {
                 idx,
                 member: script,
             }
         }
         let query = GroupQuery {
             is_coinbase: false,
             tx: &tx,
         };
         assert_eq!(
             tx_members_for_group(&script_group, query, &())
                 .collect::<Vec<_>>(),
             vec![
                 &make_script(vec![0x51]),
                 &make_script(vec![0x52]),
                 &make_script(vec![0x53]),
                 &make_script(vec![0x51]),
             ],
         );
         assert_eq!(
             script_group.input_members(query, &()).collect::<Vec<_>>(),
             vec![
                 make_member_item(0, &make_script(vec![0x51])),
                 make_member_item(1, &make_script(vec![0x52])),
             ],
         );
         assert_eq!(
             script_group.output_members(query, &()).collect::<Vec<_>>(),
             vec![
                 make_member_item(0, &make_script(vec![0x53])),
                 make_member_item(1, &make_script(vec![0x51])),
             ],
         );
         let query = GroupQuery {
             is_coinbase: true,
             tx: &tx,
         };
         assert_eq!(
             tx_members_for_group(&script_group, query, &())
                 .collect::<Vec<_>>(),
             vec![
                 &Script::new(vec![0x53].into()),
                 &Script::new(vec![0x51].into()),
             ],
         );
         assert_eq!(
             script_group.input_members(query, &()).collect::<Vec<_>>(),
             vec![],
         );
         assert_eq!(
             script_group.output_members(query, &()).collect::<Vec<_>>(),
             vec![
                 make_member_item(0, &make_script(vec![0x53])),
                 make_member_item(1, &make_script(vec![0x51])),
             ],
         );
         assert_eq!(
             script_group.ser_member(&&make_script(vec![0x53])),
             [[0x07].as_ref(), &[0x53]].concat(),
         );
     }
 }
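For reference, `ScriptGroup::ser_member` goes through `compress_script_variant`, so the DB key is the compressed variant rather than the raw bytecode. A small usage sketch (assuming the crate exposes `groups::ScriptGroup` as in this diff; the `0x07` tag value is taken from the unit test above, where it prefixes the raw bytecode of an "other" script variant):

```rust
use bitcoinsuite_core::script::Script;
use chronik_db::{group::Group, groups::ScriptGroup};

fn main() {
    let script_group = ScriptGroup;
    let script = Script::new(vec![0x53].into());
    // `Member<'_>` is `&Script`, so `ser_member` takes a double reference.
    let key = script_group.ser_member(&&script);
    assert_eq!(key.as_ref(), &[0x07u8, 0x53][..]);
}
```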
diff --git a/chronik/chronik-db/src/groups/token_id.rs b/chronik/chronik-db/src/groups/token_id.rs
index 480219fbf..ed9fb83c3 100644
--- a/chronik/chronik-db/src/groups/token_id.rs
+++ b/chronik/chronik-db/src/groups/token_id.rs
@@ -1,223 +1,223 @@
 // Copyright (c) 2024 The Bitcoin developers
 // Distributed under the MIT software license, see the accompanying
 // file COPYING or http://www.opensource.org/licenses/mit-license.php.

 use std::collections::{BTreeMap, BTreeSet, HashMap};

 use abc_rust_error::Result;
 use bitcoinsuite_core::tx::{Tx, TxId};
 use bitcoinsuite_slp::token_id::TokenId;

 use crate::{
     db::{
         Db, CF_TOKEN_ID_HISTORY, CF_TOKEN_ID_HISTORY_NUM_TXS,
         CF_TOKEN_ID_UTXO,
     },
     group::{Group, GroupQuery, MemberItem, UtxoDataOutput},
     index_tx::IndexTx,
     io::{
         token::{DbToken, ProcessedTokenTxBatch, TokenReader},
         GroupHistoryConf, GroupHistoryReader, GroupHistoryWriter,
         GroupUtxoConf, GroupUtxoReader, GroupUtxoWriter,
     },
     mem::{MempoolGroupHistory, MempoolGroupUtxos, MempoolTokens},
 };

 /// Index the mempool tx history of token IDs
 pub type MempoolTokenIdHistory = MempoolGroupHistory<TokenIdGroup>;
 /// Index the mempool UTXOs of token IDs
 pub type MempoolTokenIdUtxos = MempoolGroupUtxos<TokenIdGroup>;
 /// Index the tx history of token IDs in the DB
 pub type TokenIdHistoryWriter<'a> = GroupHistoryWriter<'a, TokenIdGroup>;
 /// Read the tx history of token IDs in the DB
 pub type TokenIdHistoryReader<'a> = GroupHistoryReader<'a, TokenIdGroup>;
 /// Index the UTXOs of token IDs in the DB
 pub type TokenIdUtxoWriter<'a> = GroupUtxoWriter<'a, TokenIdGroup>;
 /// Read the UTXOs of token IDs in the DB
 pub type TokenIdUtxoReader<'a> = GroupUtxoReader<'a, TokenIdGroup>;

 /// Group txs by token ID.
 #[derive(Clone, Debug)]
 pub struct TokenIdGroup;

 type MaybeTokenIds = Vec<Option<TokenId>>;

 /// Auxiliary data for indexing by token ID
 #[derive(Debug, Default)]
 pub struct TokenIdGroupAux {
     txs: HashMap<TxId, (MaybeTokenIds, MaybeTokenIds)>,
 }

 impl Group for TokenIdGroup {
     type Aux = TokenIdGroupAux;
     type Iter<'a> = Vec<MemberItem<TokenId>>;
     type Member<'a> = TokenId;
-    type MemberSer<'a> = [u8; 32];
+    type MemberSer = [u8; 32];
     type UtxoData = UtxoDataOutput;

     fn input_members<'a>(
         &self,
         query: GroupQuery<'a>,
         aux: &TokenIdGroupAux,
     ) -> Self::Iter<'a> {
         if query.is_coinbase {
             return vec![];
         }
         let Some((input_token_ids, _)) = aux.txs.get(query.tx.txid_ref())
         else {
             return vec![];
         };
         let mut inputs = Vec::with_capacity(query.tx.inputs.len());
         for (idx, token_id) in input_token_ids.iter().enumerate() {
             if let Some(token_id) = token_id {
                 inputs.push(MemberItem {
                     idx,
                     member: *token_id,
                 });
             }
         }
         inputs
     }

     fn output_members<'a>(
         &self,
         query: GroupQuery<'a>,
         aux: &TokenIdGroupAux,
     ) -> Self::Iter<'a> {
         let Some((_, output_token_ids)) = aux.txs.get(query.tx.txid_ref())
         else {
             return vec![];
         };
         let mut output_scripts = Vec::with_capacity(query.tx.outputs.len());
         for (idx, token_id) in output_token_ids.iter().enumerate() {
             if let Some(token_id) = token_id {
                 output_scripts.push(MemberItem {
                     idx,
                     member: *token_id,
                 });
             }
         }
         output_scripts
     }

-    fn ser_member<'a>(&self, member: &Self::Member<'a>) -> Self::MemberSer<'a> {
+    fn ser_member(&self, member: &Self::Member<'_>) -> Self::MemberSer {
         member.to_be_bytes()
     }

     fn tx_history_conf() -> GroupHistoryConf {
         GroupHistoryConf {
             cf_page_name: CF_TOKEN_ID_HISTORY,
             cf_num_txs_name: CF_TOKEN_ID_HISTORY_NUM_TXS,
             page_size: 1000,
         }
     }

     fn utxo_conf() -> GroupUtxoConf {
         GroupUtxoConf {
             cf_name: CF_TOKEN_ID_UTXO,
         }
     }
 }

 impl TokenIdGroupAux {
     /// Build aux data from a processed token batch
     pub fn from_batch(
         txs: &[IndexTx<'_>],
         batch: &ProcessedTokenTxBatch,
     ) -> Self {
         let mut aux = HashMap::with_capacity(batch.valid_txs.len());
         for tx in txs {
             let Some(spent_tokens) = batch.spent_tokens.get(&tx.tx_num)
             else {
                 continue;
             };
             let token_tx = batch.valid_txs.get(&tx.tx_num);
             aux.insert(
                 tx.tx.txid(),
                 (
                     spent_tokens
                         .iter()
                         .map(|token| {
                             Some(token.as_ref()?.token.meta.token_id)
                         })
                         .collect::<Vec<_>>(),
                     token_tx
                         .map(|token_tx| {
                             token_tx
                                 .outputs
                                 .iter()
                                 .map(|output| {
                                     Some(
                                         token_tx
                                             .token(output.as_ref()?)
                                             .meta
                                             .token_id,
                                     )
                                 })
                                 .collect::<Vec<_>>()
                         })
                         .unwrap_or_else(|| vec![None; tx.tx.outputs.len()]),
                 ),
             );
         }
         TokenIdGroupAux { txs: aux }
     }

     /// Retrieve the aux data from the DB
     pub fn from_db(txs: &[IndexTx<'_>], db: &Db) -> Result<Self> {
         let token_reader = TokenReader::new(db)?;
         let tx_nums = txs.iter().map(|tx| tx.tx_num).collect::<Vec<_>>();
         let db_token_txs = token_reader
             .token_txs(&tx_nums)?
             .into_iter()
             .collect::<BTreeMap<_, _>>();
         let token_tx_nums = db_token_txs
             .values()
             .flat_map(|token_tx| token_tx.token_tx_nums.iter().cloned())
             .collect::<BTreeSet<_>>();
         let token_metas = token_reader
             .token_metas(&token_tx_nums)?
             .into_iter()
             .collect::<BTreeMap<_, _>>();
         let mut aux = HashMap::with_capacity(db_token_txs.len());
         for tx in txs {
             let Some(db_token_tx) = db_token_txs.get(&tx.tx_num) else {
                 continue;
             };
             let db_token_id = |db_token: &DbToken| -> Option<TokenId> {
                 let token_tx_num = db_token_tx.token_tx_num(db_token)?;
                 Some(token_metas.get(&token_tx_num)?.token_id)
             };
             let input_token_ids = db_token_tx
                 .inputs
                 .iter()
                 .map(db_token_id)
                 .collect::<Vec<_>>();
             let output_token_ids = db_token_tx
                 .outputs
                 .iter()
                 .map(db_token_id)
                 .collect::<Vec<_>>();
             aux.insert(tx.tx.txid(), (input_token_ids, output_token_ids));
         }
         Ok(TokenIdGroupAux { txs: aux })
     }

     /// Retrieve the aux data from the mempool
     pub fn from_mempool(tx: &Tx, mempool: &MempoolTokens) -> Self {
         let mut aux = HashMap::new();
         let Some(token_tx) = mempool.token_tx(tx.txid_ref()) else {
             return TokenIdGroupAux::default();
         };
         let output_token_ids = token_tx
             .outputs
             .iter()
             .map(|output| {
                 Some(token_tx.token(output.as_ref()?).meta.token_id)
             })
             .collect::<Vec<_>>();
         let input_token_ids = match mempool.tx_token_inputs(tx.txid_ref()) {
             Some(spent_tokens) => spent_tokens
                 .iter()
                 .map(|token| Some(token.as_ref()?.token.meta.token_id))
                 .collect::<Vec<_>>(),
             None => vec![None; tx.inputs.len()],
         };
         aux.insert(tx.txid(), (input_token_ids, output_token_ids));
         TokenIdGroupAux { txs: aux }
     }
 }
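Unlike `ScriptGroup`, `TokenIdGroup` cannot derive its members from the tx alone; the aux data supplies the resolved token IDs per input/output. A hedged sketch of how a caller might use it (assuming the `TokenIdGroupAux::from_mempool` and `tx_members_for_group` APIs shown in this diff, and that `TokenId` implements `Debug`):

```rust
use bitcoinsuite_core::tx::Tx;
use chronik_db::{
    group::{tx_members_for_group, GroupQuery},
    groups::{TokenIdGroup, TokenIdGroupAux},
    mem::MempoolTokens,
};

fn print_token_ids(tx: &Tx, mempool: &MempoolTokens) {
    // The aux data carries the resolved token IDs per input/output;
    // without it, `TokenIdGroup` yields no members at all.
    let aux = TokenIdGroupAux::from_mempool(tx, mempool);
    let query = GroupQuery { is_coinbase: false, tx };
    for token_id in tx_members_for_group(&TokenIdGroup, query, &aux) {
        // `ser_member` would turn this into the 32-byte big-endian DB key
        println!("touches token {:?}", token_id);
    }
}
```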
diff --git a/chronik/chronik-db/src/io/group_history.rs b/chronik/chronik-db/src/io/group_history.rs
index 57a2d7584..20939ca2e 100644
--- a/chronik/chronik-db/src/io/group_history.rs
+++ b/chronik/chronik-db/src/io/group_history.rs
@@ -1,681 +1,681 @@
 // Copyright (c) 2023 The Bitcoin developers
 // Distributed under the MIT software license, see the accompanying
 // file COPYING or http://www.opensource.org/licenses/mit-license.php.

 use std::{collections::BTreeMap, marker::PhantomData, time::Instant};

 use abc_rust_error::Result;
 use rocksdb::WriteBatch;
 use thiserror::Error;

 use crate::{
     db::{Db, CF},
     group::{tx_members_for_group, Group, GroupQuery},
     index_tx::IndexTx,
     io::{
         group_history::GroupHistoryError::*, merge::catch_merge_errors,
         TxNum,
     },
     ser::{db_deserialize_vec, db_serialize_vec},
 };

 /// Represent page numbers with 32-bit unsigned integers.
 type PageNum = u32;
 /// Represent num txs with 32-bit unsigned integers.
 /// Note: This implies that scripts can at most have 2^32 txs.
 type NumTxs = u32;

 const CONCAT: u8 = b'C';
 const TRIM: u8 = b'T';

 /// Configuration for group history reader/writers.
 #[derive(Clone, Debug, Default, Eq, PartialEq)]
 pub struct GroupHistoryConf {
     /// Column family to store the group history pages.
     pub cf_page_name: &'static str,
     /// Column family to store the number of txs of each member of the
     /// group history.
     pub cf_num_txs_name: &'static str,
     /// Page size for each member of the group.
     pub page_size: NumTxs,
 }

 struct GroupHistoryColumn<'a> {
     db: &'a Db,
     cf_page: &'a CF,
     cf_num_txs: &'a CF,
 }

 /// Write txs grouped and paginated to the DB.
 ///
 /// This is primarily meant to store the tx history of an address, but it can
 /// also be used to index the history of other groups, especially of new
 /// protocols.
 ///
 /// Txs are stored paginated, because in the case of addresses, there already
 /// exist addresses with millions of txs. While RocksDB can handle multi-MB
 /// entries, it would significantly slow down both reading and writing of
 /// such an address, which could pose a DoS risk.
 ///
 /// Each page is stored at the key `<serialized member> + <4-byte page num>`
 ///
 /// Txs in a member are ordered in strictly ascending order, both within a
 /// page and between pages, such that the entire tx history of a member can
 /// be iterated by going through pages 0..N and going through all of the txs
 /// of each page.
 #[derive(Debug)]
 pub struct GroupHistoryWriter<'a, G: Group> {
     col: GroupHistoryColumn<'a>,
     conf: GroupHistoryConf,
     group: G,
 }
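The page-key layout described in the doc comment above is easy to pin down concretely; this sketch mirrors the `key_for_member_page` helper that appears further down in this file:

```rust
// Mirror of `key_for_member_page`: the serialized member is suffixed
// with the big-endian page number.
fn page_key(member_ser: &[u8], page_num: u32) -> Vec<u8> {
    [member_ser, &page_num.to_be_bytes()].concat()
}

fn main() {
    // Member b"ab", page 1 -> 61 62 00 00 00 01
    assert_eq!(page_key(b"ab", 1), vec![0x61, 0x62, 0, 0, 0, 1]);
}
```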
 /// Read pages of grouped txs from the DB.
 #[derive(Debug)]
 pub struct GroupHistoryReader<'a, G: Group> {
     col: GroupHistoryColumn<'a>,
     conf: GroupHistoryConf,
     phantom: PhantomData<G>,
 }

 /// In-memory data for the tx history.
 #[derive(Debug, Default)]
 pub struct GroupHistoryMemData {
     /// Stats about cache hits, num requests etc.
     pub stats: GroupHistoryStats,
 }

 /// Stats about cache hits, num requests etc.
 #[derive(Clone, Debug, Default)]
 pub struct GroupHistoryStats {
     /// Total number of members updated.
     pub n_total: usize,
     /// Time [s] for insert/delete.
     pub t_total: f64,
     /// Time [s] for grouping txs.
     pub t_group: f64,
     /// Time [s] for serializing members.
     pub t_ser_members: f64,
     /// Time [s] for fetching existing tx data.
     pub t_fetch: f64,
 }

 /// Error indicating that something went wrong with writing group history
 /// data.
 #[derive(Debug, Error, PartialEq, Eq)]
 pub enum GroupHistoryError {
     /// Bad num_txs size
     #[error("Inconsistent DB: Bad num_txs size: {0:?}")]
     BadNumTxsSize(Vec<u8>),

     /// Used merge_cf incorrectly, prefix must either be C or T.
     #[error(
         "Bad usage of merge: Unknown prefix {0:02x}, expected C or T: {}",
         hex::encode(.1),
     )]
     UnknownOperandPrefix(u8, Vec<u8>),
 }

 struct FetchedNumTxs<'tx, G: Group> {
     members_num_txs: Vec<NumTxs>,
     grouped_txs: BTreeMap<G::Member<'tx>, Vec<TxNum>>,
-    ser_members: Vec<G::MemberSer<'tx>>,
+    ser_members: Vec<G::MemberSer>,
 }

 pub(crate) fn bytes_to_num_txs(bytes: &[u8]) -> Result<NumTxs> {
     Ok(NumTxs::from_be_bytes(
         bytes
             .try_into()
             .map_err(|_| BadNumTxsSize(bytes.to_vec()))?,
     ))
 }

 fn partial_merge_concat_trim(
     _key: &[u8],
     _existing_value: Option<&[u8]>,
     _operands: &rocksdb::MergeOperands,
 ) -> Option<Vec<u8>> {
     // We don't use partial merge
     None
 }

 fn init_concat_trim(
     _key: &[u8],
     existing_value: Option<&[u8]>,
     operands: &rocksdb::MergeOperands,
 ) -> Result<Vec<u8>> {
     let mut bytes = existing_value.unwrap_or(&[]).to_vec();
     if operands.iter().all(|operand| operand[0] == CONCAT) {
         bytes.reserve_exact(
             operands.iter().map(|operand| operand.len() - 1).sum(),
         );
     }
     Ok(bytes)
 }

 fn apply_concat_trim(
     _key: &[u8],
     bytes: &mut Vec<u8>,
     operand: &[u8],
 ) -> Result<()> {
     if operand[0] == CONCAT {
         bytes.extend_from_slice(&operand[1..]);
     } else if operand[0] == TRIM {
         let trim_len =
             NumTxs::from_be_bytes(operand[1..5].try_into().unwrap());
         bytes.drain(bytes.len() - trim_len as usize..);
     } else {
         return Err(
             UnknownOperandPrefix(operand[0], operand.to_vec()).into()
         );
     }
     Ok(())
 }

 fn ser_concat_trim(_key: &[u8], bytes: Vec<u8>) -> Result<Vec<u8>> {
     Ok(bytes)
 }

 impl<'a> GroupHistoryColumn<'a> {
     fn new(db: &'a Db, conf: &GroupHistoryConf) -> Result<Self> {
         let cf_page = db.cf(conf.cf_page_name)?;
         let cf_num_txs = db.cf(conf.cf_num_txs_name)?;
         Ok(GroupHistoryColumn {
             db,
             cf_page,
             cf_num_txs,
         })
     }

     fn get_page_txs(
         &self,
         member_ser: &[u8],
         page_num: PageNum,
     ) -> Result<Option<Vec<TxNum>>> {
         let key = key_for_member_page(member_ser, page_num);
         let value = match self.db.get(self.cf_page, &key)? {
             Some(value) => value,
             None => return Ok(None),
         };
         Ok(Some(db_deserialize_vec::<TxNum>(&value)?))
     }
 }

 impl<'a, G: Group> GroupHistoryWriter<'a, G> {
     /// Create a new [`GroupHistoryWriter`].
     pub fn new(db: &'a Db, group: G) -> Result<Self> {
         let conf = G::tx_history_conf();
         let col = GroupHistoryColumn::new(db, &conf)?;
         Ok(GroupHistoryWriter { col, conf, group })
     }

     /// Group the txs, then insert them into each member of the group.
     pub fn insert(
         &self,
         batch: &mut WriteBatch,
         txs: &[IndexTx<'_>],
         aux: &G::Aux,
         mem_data: &mut GroupHistoryMemData,
     ) -> Result<()> {
         let t_start = Instant::now();
         let fetched = self.fetch_members_num_txs(txs, aux, mem_data)?;
         for ((mut new_tx_nums, member_ser), mut num_txs) in fetched
             .grouped_txs
             .into_values()
             .zip(fetched.ser_members)
             .zip(fetched.members_num_txs)
         {
             let mut page_num = num_txs / self.conf.page_size;
             let mut last_page_num_txs = num_txs % self.conf.page_size;
             loop {
                 let space_left =
                     (self.conf.page_size - last_page_num_txs) as usize;
                 let num_new_txs = space_left.min(new_tx_nums.len());
                 let merge_tx_nums =
                     db_serialize_vec(new_tx_nums.drain(..num_new_txs))?;
                 batch.merge_cf(
                     self.col.cf_page,
                     key_for_member_page(member_ser.as_ref(), page_num),
                     [[CONCAT].as_ref(), &merge_tx_nums].concat(),
                 );
                 num_txs += num_new_txs as NumTxs;
                 if new_tx_nums.is_empty() {
                     batch.put_cf(
                         self.col.cf_num_txs,
                         member_ser.as_ref(),
                         num_txs.to_be_bytes(),
                     );
                     break;
                 }
                 last_page_num_txs = 0;
                 page_num += 1;
             }
         }
         mem_data.stats.t_total += t_start.elapsed().as_secs_f64();
         Ok(())
     }
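The CONCAT/TRIM merge operands built by `insert` and `delete` can be simulated without RocksDB; a self-contained sketch mirroring `apply_concat_trim` (simplified to panic where the original returns an error):

```rust
// A b'C' operand appends serialized tx_nums to a page; a b'T' operand
// trims the given number of bytes off the end of the page.
fn apply(bytes: &mut Vec<u8>, operand: &[u8]) {
    match operand[0] {
        b'C' => bytes.extend_from_slice(&operand[1..]),
        b'T' => {
            let trim_len =
                u32::from_be_bytes(operand[1..5].try_into().unwrap());
            bytes.truncate(bytes.len() - trim_len as usize);
        }
        prefix => panic!("unknown merge prefix {prefix:02x}"),
    }
}

fn main() {
    let mut page = Vec::new();
    apply(&mut page, b"C\x01\x02\x03"); // concat 3 bytes
    apply(&mut page, &[b'T', 0, 0, 0, 2]); // trim the last 2 bytes
    assert_eq!(page, vec![0x01]);
}
```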
     /// Group the txs, then delete them from each member of the group.
     pub fn delete(
         &self,
         batch: &mut WriteBatch,
         txs: &[IndexTx<'_>],
         aux: &G::Aux,
         mem_data: &mut GroupHistoryMemData,
     ) -> Result<()> {
         let t_start = Instant::now();
         let fetched = self.fetch_members_num_txs(txs, aux, mem_data)?;
         for ((mut removed_tx_nums, member_ser), mut num_txs) in fetched
             .grouped_txs
             .into_values()
             .zip(fetched.ser_members)
             .zip(fetched.members_num_txs)
         {
             let mut num_remaining_removes = removed_tx_nums.len();
             let mut page_num = num_txs / self.conf.page_size;
             let mut last_page_num_txs = num_txs % self.conf.page_size;
             loop {
                 let num_page_removes =
                     (last_page_num_txs as usize).min(num_remaining_removes);
                 let key = key_for_member_page(member_ser.as_ref(), page_num);
                 if num_page_removes == last_page_num_txs as usize {
                     batch.delete_cf(self.col.cf_page, key)
                 } else {
                     let merge_removed_txs = db_serialize_vec(
                         removed_tx_nums
                             .drain(removed_tx_nums.len() - num_page_removes..),
                     )?;
                     let num_trimmed_bytes = merge_removed_txs.len() as NumTxs;
                     batch.merge_cf(
                         self.col.cf_page,
                         key,
                         [[TRIM].as_ref(), &num_trimmed_bytes.to_be_bytes()]
                             .concat(),
                     );
                 }
                 num_txs -= num_page_removes as NumTxs;
                 num_remaining_removes -= num_page_removes;
                 if num_remaining_removes == 0 {
                     if num_txs > 0 {
                         batch.put_cf(
                             self.col.cf_num_txs,
                             member_ser.as_ref(),
                             num_txs.to_be_bytes(),
                         );
                     } else {
                         batch.delete_cf(
                             self.col.cf_num_txs,
                             member_ser.as_ref(),
                         );
                     }
                     break;
                 }
                 if page_num > 0 {
                     page_num -= 1;
                     last_page_num_txs = self.conf.page_size;
                 }
             }
         }
         mem_data.stats.t_total += t_start.elapsed().as_secs_f64();
         Ok(())
     }

     fn fetch_members_num_txs<'tx>(
         &self,
         txs: &'tx [IndexTx<'tx>],
         aux: &G::Aux,
         mem_data: &mut GroupHistoryMemData,
     ) -> Result<FetchedNumTxs<'tx, G>> {
         let GroupHistoryMemData { stats } = mem_data;

         let t_group = Instant::now();
         let grouped_txs = self.group_txs(txs, aux);
         stats.t_group += t_group.elapsed().as_secs_f64();

         let t_ser_members = Instant::now();
         let ser_members = grouped_txs
             .keys()
             .map(|key| self.group.ser_member(key))
             .collect::<Vec<_>>();
         stats.t_ser_members += t_ser_members.elapsed().as_secs_f64();

         stats.n_total += grouped_txs.len();

         let t_fetch = Instant::now();
         let num_txs_keys =
             ser_members.iter().map(|member_ser| member_ser.as_ref());
         let fetched_num_txs =
             self.col
                 .db
                 .multi_get(self.col.cf_num_txs, num_txs_keys, true)?;
         let mut members_num_txs = Vec::with_capacity(fetched_num_txs.len());
         for db_num_txs in fetched_num_txs {
             members_num_txs.push(match db_num_txs {
                 Some(db_num_txs) => bytes_to_num_txs(&db_num_txs)?,
                 None => 0,
             });
         }
         stats.t_fetch += t_fetch.elapsed().as_secs_f64();

         Ok(FetchedNumTxs {
             members_num_txs,
             grouped_txs,
             ser_members,
         })
     }

     fn group_txs<'tx>(
         &self,
         txs: &'tx [IndexTx<'tx>],
         aux: &G::Aux,
     ) -> BTreeMap<G::Member<'tx>, Vec<TxNum>> {
         let mut group_tx_nums = BTreeMap::<G::Member<'tx>, Vec<TxNum>>::new();
         for index_tx in txs {
             let query = GroupQuery {
                 is_coinbase: index_tx.is_coinbase,
                 tx: index_tx.tx,
             };
             for member in tx_members_for_group(&self.group, query, aux) {
                 let tx_nums = group_tx_nums.entry(member).or_default();
                 if let Some(&last_tx_num) = tx_nums.last() {
                     if last_tx_num == index_tx.tx_num {
                         continue;
                     }
                 }
                 tx_nums.push(index_tx.tx_num);
             }
         }
         group_tx_nums
     }

     pub(crate) fn add_cfs(columns: &mut Vec<rocksdb::ColumnFamilyDescriptor>) {
         let conf = G::tx_history_conf();
         let mut page_options = rocksdb::Options::default();
         let merge_op_name = format!("{}::merge_op_concat", conf.cf_page_name);
         page_options.set_merge_operator(
             merge_op_name.as_str(),
             catch_merge_errors(
                 init_concat_trim,
                 apply_concat_trim,
                 ser_concat_trim,
             ),
             partial_merge_concat_trim,
         );
         columns.push(rocksdb::ColumnFamilyDescriptor::new(
             conf.cf_page_name,
             page_options,
         ));
         columns.push(rocksdb::ColumnFamilyDescriptor::new(
             conf.cf_num_txs_name,
             rocksdb::Options::default(),
         ));
     }
 }

 impl std::fmt::Debug for GroupHistoryColumn<'_> {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         write!(f, "GroupHistoryColumn {{ .. }}")
     }
 }

 impl<'a, G: Group> GroupHistoryReader<'a, G> {
     /// Create a new [`GroupHistoryReader`].
     pub fn new(db: &'a Db) -> Result<Self> {
         let conf = G::tx_history_conf();
         let col = GroupHistoryColumn::new(db, &conf)?;
         Ok(GroupHistoryReader {
             col,
             conf,
             phantom: PhantomData,
         })
     }

     /// Read the tx_nums for the given member on the given page, or None,
     /// if the page doesn't exist in the DB.
     pub fn page_txs(
         &self,
         member_ser: &[u8],
         page_num: PageNum,
     ) -> Result<Option<Vec<TxNum>>> {
         self.col.get_page_txs(member_ser, page_num)
     }

     /// Total number of pages and txs for this serialized member.
     /// The result tuple is (num_pages, num_txs).
     pub fn member_num_pages_and_txs(
         &self,
         member_ser: &[u8],
     ) -> Result<(usize, usize)> {
         let num_txs = match self.col.db.get(self.col.cf_num_txs, member_ser)? {
             Some(bytes) => bytes_to_num_txs(&bytes)?,
             None => return Ok((0, 0)),
         };
         let num_pages =
             (num_txs + self.conf.page_size - 1) / self.conf.page_size;
         Ok((num_pages as usize, num_txs as usize))
     }

     /// Size of pages the data is stored in.
     pub fn page_size(&self) -> usize {
         self.conf.page_size as usize
     }
 }

 fn key_for_member_page(member_ser: &[u8], page_num: PageNum) -> Vec<u8> {
     [member_ser, &page_num.to_be_bytes()].concat()
 }

 #[cfg(test)]
 mod tests {
     use std::cell::RefCell;

     use abc_rust_error::Result;
     use bitcoinsuite_core::tx::Tx;
     use rocksdb::WriteBatch;

     use crate::{
         db::Db,
         index_tx::prepare_indexed_txs,
         io::{
             group_history::PageNum,
             merge::{check_for_errors, MERGE_ERROR_LOCK},
             BlockTxs, GroupHistoryMemData, GroupHistoryReader,
             GroupHistoryWriter, TxEntry, TxNum, TxWriter, TxsMemData,
         },
         test::{make_value_tx, ser_value, ValueGroup},
     };

     #[test]
     fn test_value_group_history() -> Result<()> {
         let _guard = MERGE_ERROR_LOCK.lock().unwrap();
         abc_rust_error::install();
         let tempdir = tempdir::TempDir::new("chronik-db--group_history")?;
         let mut cfs = Vec::new();
         GroupHistoryWriter::<ValueGroup>::add_cfs(&mut cfs);
         TxWriter::add_cfs(&mut cfs);
         let db = Db::open_with_cfs(tempdir.path(), cfs)?;
         let tx_writer = TxWriter::new(&db)?;
         let group_writer = GroupHistoryWriter::new(&db, ValueGroup)?;
         let group_reader = GroupHistoryReader::<ValueGroup>::new(&db)?;
         let mem_data = RefCell::new(GroupHistoryMemData::default());
         let txs_mem_data = RefCell::new(TxsMemData::default());
         let block_height = RefCell::new(-1);
         let txs_batch = |txs: &[Tx]| BlockTxs {
             txs: txs
                 .iter()
                 .map(|tx| TxEntry {
                     txid: tx.txid(),
                     ..Default::default()
                 })
                 .collect(),
             block_height: *block_height.borrow(),
         };
         let connect_block = |txs: &[Tx]| -> Result<()> {
             let mut batch = WriteBatch::default();
             *block_height.borrow_mut() += 1;
             let first_tx_num = tx_writer.insert(
                 &mut batch,
                 &txs_batch(txs),
                 &mut txs_mem_data.borrow_mut(),
             )?;
             let index_txs = prepare_indexed_txs(&db, first_tx_num, txs)?;
             group_writer.insert(
                 &mut batch,
                 &index_txs,
                 &(),
                 &mut mem_data.borrow_mut(),
             )?;
             db.write_batch(batch)?;
             Ok(())
         };
         let disconnect_block = |txs: &[Tx]| -> Result<()> {
             let mut batch = WriteBatch::default();
             let first_tx_num = tx_writer.delete(
                 &mut batch,
                 &txs_batch(txs),
                 &mut txs_mem_data.borrow_mut(),
             )?;
             let index_txs = prepare_indexed_txs(&db, first_tx_num, txs)?;
             group_writer.delete(
                 &mut batch,
                 &index_txs,
                 &(),
                 &mut mem_data.borrow_mut(),
             )?;
             db.write_batch(batch)?;
             *block_height.borrow_mut() -= 1;
             Ok(())
         };
         let read_page = |val: i64,
                          page_num: PageNum|
          -> Result<Option<Vec<TxNum>>> {
             group_reader.page_txs(&ser_value(val), page_num)
         };
         let read_num_pages_and_txs = |val: i64| -> Result<(usize, usize)> {
             group_reader.member_num_pages_and_txs(&ser_value(val))
         };

         // Only adds an entry for value=10 (coinbase inputs are ignored)
         let block0 = [make_value_tx(0, [0xffff], [10])];
         connect_block(&block0)?;
         assert_eq!(read_page(0xffff, 0)?, None);
         assert_eq!(read_num_pages_and_txs(0xffff)?, (0, 0));
         assert_eq!(read_page(10, 0)?, Some(vec![0]));
         assert_eq!(read_num_pages_and_txs(10)?, (1, 1));

         // Block that adds a lot of pages to value=10, one entry to value=20
         let block1 = [
             make_value_tx(1, [0xffff], [10]),
             make_value_tx(2, [10], []),
             make_value_tx(3, [20], []), // value=20
             make_value_tx(4, [10], []),
             make_value_tx(5, [10], []),
             make_value_tx(6, [10], []),
             make_value_tx(7, [10], []),
             make_value_tx(8, [10], []),
             make_value_tx(9, [10], []),
         ];
         connect_block(&block1)?;
         assert_eq!(read_page(0xffff, 0)?, None);
         assert_eq!(read_page(10, 0)?, Some(vec![0, 1, 2, 4]));
         assert_eq!(read_page(10, 1)?, Some(vec![5, 6, 7, 8]));
         assert_eq!(read_page(10, 2)?, Some(vec![9]));
         assert_eq!(read_page(10, 3)?, None);
         assert_eq!(read_num_pages_and_txs(10)?, (3, 9));

         assert_eq!(read_page(20, 0)?, Some(vec![3]));
         assert_eq!(read_page(20, 1)?, None);
         assert_eq!(read_num_pages_and_txs(20)?, (1, 1));

         // Only tx_num=0 remains
         // The other pages have been removed from the DB entirely
         disconnect_block(&block1)?;
         assert_eq!(read_page(0xffff, 0)?, None);
         assert_eq!(read_page(10, 0)?, Some(vec![0]));
         assert_eq!(read_page(10, 1)?, None);
         assert_eq!(read_page(10, 2)?, None);
         assert_eq!(read_page(20, 0)?, None);

         // Re-org block, with all kinds of input + output values
         let block1 = [
             make_value_tx(1, [0xffff], [10]),
             make_value_tx(2, [10], [10, 20, 30]),
             make_value_tx(3, [10, 40], [10, 10, 40]),
             make_value_tx(4, [10], [40, 30, 40]),
         ];
         connect_block(&block1)?;

         // all txs add to value=10, with 2 pages
         assert_eq!(read_page(10, 0)?, Some(vec![0, 1, 2, 3]));
         assert_eq!(read_page(10, 1)?, Some(vec![4]));
         assert_eq!(read_num_pages_and_txs(10)?, (2, 5));

         // only tx_num=2 adds to value=20
         assert_eq!(read_page(20, 0)?, Some(vec![2]));
         assert_eq!(read_num_pages_and_txs(20)?, (1, 1));

         // tx_num=2 and tx_num=4 add to value=30
         assert_eq!(read_page(30, 0)?, Some(vec![2, 4]));
         assert_eq!(read_num_pages_and_txs(30)?, (1, 2));

         // tx_num=3 and tx_num=4 add to value=40
         assert_eq!(read_page(40, 0)?, Some(vec![3, 4]));
         assert_eq!(read_num_pages_and_txs(40)?, (1, 2));

         // Delete that block also
         disconnect_block(&block1)?;
         assert_eq!(read_page(0xffff, 0)?, None);
         assert_eq!(read_page(10, 0)?, Some(vec![0]));
         assert_eq!(read_page(10, 1)?, None);
         assert_eq!(read_page(20, 0)?, None);
         assert_eq!(read_page(30, 0)?, None);
         assert_eq!(read_page(40, 0)?, None);

         // Add it back in
         connect_block(&block1)?;

         // Add new block, adding 1 tx to 20, 6 txs to 30, 4 txs to 40
         let block2 = [
             make_value_tx(5, [0xffff], [40, 30]),
             make_value_tx(6, [30, 10], [30]),
             make_value_tx(7, [10], [30]),
             make_value_tx(8, [40], [30]),
             make_value_tx(9, [10], [20]),
         ];
         connect_block(&block2)?;
         assert_eq!(read_page(10, 0)?, Some(vec![0, 1, 2, 3]));
         assert_eq!(read_page(10, 1)?, Some(vec![4, 6, 7, 9]));
         assert_eq!(read_page(10, 2)?, None);
         assert_eq!(read_num_pages_and_txs(10)?, (2, 8));

         assert_eq!(read_page(20, 0)?, Some(vec![2, 9]));
         assert_eq!(read_page(20, 1)?, None);
         assert_eq!(read_num_pages_and_txs(20)?, (1, 2));

         assert_eq!(read_page(30, 0)?, Some(vec![2, 4, 5, 6]));
         assert_eq!(read_page(30, 1)?, Some(vec![7, 8]));
         assert_eq!(read_page(30, 2)?, None);
         assert_eq!(read_num_pages_and_txs(30)?, (2, 6));

         assert_eq!(read_page(40, 0)?, Some(vec![3, 4, 5, 8]));
         assert_eq!(read_page(40, 1)?, None);
         assert_eq!(read_num_pages_and_txs(40)?, (1, 4));

         // Remove all blocks
         disconnect_block(&block2)?;
         assert_eq!(read_page(10, 0)?, Some(vec![0, 1, 2, 3]));
         assert_eq!(read_page(10, 1)?, Some(vec![4]));
         assert_eq!(read_page(20, 0)?, Some(vec![2]));
         assert_eq!(read_page(30, 0)?, Some(vec![2, 4]));
         assert_eq!(read_page(30, 1)?, None);
         assert_eq!(read_page(40, 0)?, Some(vec![3, 4]));
         assert_eq!(read_page(40, 1)?, None);

         disconnect_block(&block1)?;
         assert_eq!(read_page(10, 0)?, Some(vec![0]));
         assert_eq!(read_page(10, 1)?, None);
         assert_eq!(read_page(20, 0)?, None);
         assert_eq!(read_page(30, 0)?, None);
         assert_eq!(read_page(40, 0)?, None);

         disconnect_block(&block0)?;
         assert_eq!(read_page(10, 0)?, None);

         drop(db);
         rocksdb::DB::destroy(&rocksdb::Options::default(), tempdir.path())?;
         let _ = check_for_errors();
         Ok(())
     }
 }
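A quick sanity check of the page math exercised by this test: `member_num_pages_and_txs` computes `num_pages` as a ceiling division, so with the test's `page_size` of 4, 9 txs span 3 pages (4 + 4 + 1):

```rust
fn num_pages(num_txs: u32, page_size: u32) -> u32 {
    (num_txs + page_size - 1) / page_size
}

fn main() {
    assert_eq!(num_pages(9, 4), 3);
    assert_eq!(num_pages(8, 4), 2);
    assert_eq!(num_pages(0, 4), 0);
}
```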
diff --git a/chronik/chronik-db/src/mem/group_history.rs b/chronik/chronik-db/src/mem/group_history.rs
index a2b3ef370..f1747a0de 100644
--- a/chronik/chronik-db/src/mem/group_history.rs
+++ b/chronik/chronik-db/src/mem/group_history.rs
@@ -1,170 +1,170 @@
 // Copyright (c) 2023 The Bitcoin developers
 // Distributed under the MIT software license, see the accompanying
 // file COPYING or http://www.opensource.org/licenses/mit-license.php.

 use std::collections::{BTreeSet, HashMap};

 use bitcoinsuite_core::tx::TxId;

 use crate::{
     group::{tx_members_for_group, Group, GroupQuery},
     mem::MempoolTx,
 };

 /// Index the tx history of a group.
 ///
 /// Individual items are sorted by [`MempoolTx`]`::time_first_seen` first and
 /// then by TxId, so when accessing them, they are returned in chronological
 /// order.
 #[derive(Debug, Default)]
 pub struct MempoolGroupHistory<G: Group> {
     /// (time_first_seen, txid), so we sort by time
     history: HashMap<Vec<u8>, BTreeSet<(i64, TxId)>>,
     group: G,
 }

 impl<G: Group> MempoolGroupHistory<G> {
     /// Create a new [`MempoolGroupHistory`] for the given group.
     pub fn new(group: G) -> MempoolGroupHistory<G> {
         MempoolGroupHistory {
             history: HashMap::new(),
             group,
         }
     }

     /// Index the given [`MempoolTx`] by this group.
     pub fn insert(&mut self, tx: &MempoolTx, aux: &G::Aux) {
         let query = GroupQuery {
             is_coinbase: false,
             tx: &tx.tx,
         };
         for member in tx_members_for_group(&self.group, query, aux) {
-            let member_ser: G::MemberSer<'_> = self.group.ser_member(&member);
+            let member_ser: G::MemberSer = self.group.ser_member(&member);
             if !self.history.contains_key(member_ser.as_ref()) {
                 self.history
                     .insert(member_ser.as_ref().to_vec(), BTreeSet::new());
             }
             let member_history = self
                 .history
                 .get_mut(member_ser.as_ref())
                 .expect("Impossible");
             member_history.insert((tx.time_first_seen, tx.tx.txid()));
         }
     }

     /// Remove the given [`MempoolTx`] from the history index.
     pub fn remove(&mut self, tx: &MempoolTx, aux: &G::Aux) {
         let query = GroupQuery {
             is_coinbase: false,
             tx: &tx.tx,
         };
         for member in tx_members_for_group(&self.group, query, aux) {
-            let member_ser: G::MemberSer<'_> = self.group.ser_member(&member);
+            let member_ser: G::MemberSer = self.group.ser_member(&member);
             if let Some(entries) = self.history.get_mut(member_ser.as_ref()) {
                 entries.remove(&(tx.time_first_seen, tx.tx.txid()));
                 if entries.is_empty() {
                     self.history.remove(member_ser.as_ref());
                 }
             }
         }
     }

     /// Return the history of a given serialized member as an ordered
     /// [`BTreeSet`], or None if there are no entries.
     pub fn member_history(
         &self,
         member_ser: &[u8],
     ) -> Option<&BTreeSet<(i64, TxId)>> {
         self.history.get(member_ser)
     }
 }

 #[cfg(test)]
 mod tests {
     use abc_rust_error::Result;
     use bitcoinsuite_core::tx::TxId;

     use crate::{
         mem::{MempoolGroupHistory, MempoolTx},
         test::{make_value_tx, ser_value, ValueGroup},
     };

     #[test]
     fn test_mempool_group_history() -> Result<()> {
         let mempool =
             std::cell::RefCell::new(MempoolGroupHistory::new(ValueGroup));
         let add_tx = |tx: &MempoolTx| mempool.borrow_mut().insert(tx, &());
         let remove_tx = |tx: &MempoolTx| mempool.borrow_mut().remove(tx, &());
         let member_history = |val: i64| -> Option<Vec<(i64, TxId)>> {
             mempool
                 .borrow_mut()
                 .member_history(&ser_value(val))
                 .map(|history| history.iter().copied().collect::<Vec<_>>())
         };
         fn make_mempool_tx<const N: usize, const M: usize>(
             txid_num: u8,
             input_values: [i64; N],
             output_values: [i64; M],
             time_first_seen: i64,
         ) -> MempoolTx {
             MempoolTx {
                 tx: make_value_tx(txid_num, input_values, output_values),
                 time_first_seen,
             }
         }

         let tx1 = make_mempool_tx(1, [10], [], 1000);
         add_tx(&tx1);
         assert_eq!(
             member_history(10),
             Some(vec![(1000, TxId::from([1; 32]))]),
         );

         remove_tx(&tx1);
         assert_eq!(member_history(10), None);

         let tx2 = make_mempool_tx(2, [], [10], 900);
         add_tx(&tx1);
         add_tx(&tx2);
         assert_eq!(
             member_history(10),
             Some(vec![
                 (900, TxId::from([2; 32])),
                 (1000, TxId::from([1; 32])),
             ]),
         );

         let tx4 = make_mempool_tx(4, [10], [10], 1000);
         add_tx(&tx4);
         assert_eq!(
             member_history(10),
             Some(vec![
                 (900, TxId::from([2; 32])),
                 (1000, TxId::from([1; 32])),
                 (1000, TxId::from([4; 32])),
             ]),
         );

         let tx3 = make_mempool_tx(3, [10, 10], [10, 20], 1000);
         add_tx(&tx3);
         assert_eq!(
             member_history(10),
             Some(vec![
                 (900, TxId::from([2; 32])),
                 (1000, TxId::from([1; 32])),
                 (1000, TxId::from([3; 32])),
                 (1000, TxId::from([4; 32])),
             ]),
         );
         assert_eq!(
             member_history(20),
             Some(vec![(1000, TxId::from([3; 32]))]),
         );

         remove_tx(&tx4);
         remove_tx(&tx1);
         remove_tx(&tx3);
         assert_eq!(
             member_history(10),
             Some(vec![(900, TxId::from([2; 32]))]),
         );
         assert_eq!(member_history(20), None);

         Ok(())
     }
 }
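A minimal in-crate usage sketch (e.g. inside a unit test, since `ValueGroup` and `ser_value` are `pub(crate)`); lookups go through the *serialized* member, here the 8-byte big-endian encoding of the value 10:

```rust
use crate::{
    mem::{MempoolGroupHistory, MempoolTx},
    test::{make_value_tx, ser_value, ValueGroup},
};

#[test]
fn mempool_history_sketch() {
    let mut history = MempoolGroupHistory::new(ValueGroup);
    let tx = MempoolTx {
        tx: make_value_tx(1, [10], []),
        time_first_seen: 1000,
    };
    history.insert(&tx, &());
    // Entries come back ordered by (time_first_seen, txid)
    let entries = history.member_history(&ser_value(10)).unwrap();
    assert_eq!(entries.len(), 1);
}
```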
diff --git a/chronik/chronik-db/src/test/value_group.rs b/chronik/chronik-db/src/test/value_group.rs
index 2badf99d8..b4923f767 100644
--- a/chronik/chronik-db/src/test/value_group.rs
+++ b/chronik/chronik-db/src/test/value_group.rs
@@ -1,135 +1,135 @@
 // Copyright (c) 2023 The Bitcoin developers
 // Distributed under the MIT software license, see the accompanying
 // file COPYING or http://www.opensource.org/licenses/mit-license.php.

 use bitcoinsuite_core::tx::{
     Coin, OutPoint, Tx, TxId, TxInput, TxMut, TxOutput,
 };

 use crate::{
     group::{Group, GroupQuery, MemberItem, UtxoDataValue},
     io::{GroupHistoryConf, GroupUtxoConf},
 };

 /// Index by output/input value. While useless in practice, this makes
 /// writing tests very convenient and showcases how Group can be used.
 #[derive(Debug, Default, Eq, PartialEq)]
 pub(crate) struct ValueGroup;

 impl Group for ValueGroup {
     type Aux = ();
     type Iter<'a> = Vec<MemberItem<i64>>;
     type Member<'a> = i64;
-    type MemberSer<'a> = [u8; 8];
+    type MemberSer = [u8; 8];
     type UtxoData = UtxoDataValue;

     fn input_members(
         &self,
         query: GroupQuery<'_>,
         _aux: &(),
     ) -> Self::Iter<'_> {
         let mut inputs = Vec::new();
         if !query.is_coinbase {
             for (idx, input) in query.tx.inputs.iter().enumerate() {
                 if let Some(coin) = &input.coin {
                     inputs.push(MemberItem {
                         idx,
                         member: coin.output.value,
                     });
                 }
             }
         }
         inputs
     }

     fn output_members(
         &self,
         query: GroupQuery<'_>,
         _aux: &(),
     ) -> Self::Iter<'_> {
         let mut outputs = Vec::new();
         for (idx, output) in query.tx.outputs.iter().enumerate() {
             outputs.push(MemberItem {
                 idx,
                 member: output.value,
             });
         }
         outputs
     }

-    fn ser_member<'a>(&self, value: &i64) -> Self::MemberSer<'a> {
+    fn ser_member(&self, value: &i64) -> Self::MemberSer {
         ser_value(*value)
     }

     fn tx_history_conf() -> GroupHistoryConf {
         GroupHistoryConf {
             cf_page_name: "value_history",
             cf_num_txs_name: "value_history_num_txs",
             page_size: 4,
         }
     }

     fn utxo_conf() -> GroupUtxoConf {
         GroupUtxoConf {
             cf_name: "value_utxo",
         }
     }
 }

 /// Serialize the value as array
 pub(crate) fn ser_value(value: i64) -> [u8; 8] {
     value.to_be_bytes()
 }

 /// Make a tx with inputs and outputs having the given values.
 /// The txid is derived from `txid_num`, which also lets tests document the
 /// tx_num of this tx.
 pub(crate) fn make_value_tx<const N: usize, const M: usize>(
     txid_num: u8,
     input_values: [i64; N],
     output_values: [i64; M],
 ) -> Tx {
     make_inputs_tx(
         txid_num,
         input_values.map(|value| (0, 0, value)),
         output_values,
     )
 }

 pub(crate) fn make_inputs_tx<const N: usize, const M: usize>(
     txid_num: u8,
     input_values: [(u8, u32, i64); N],
     output_values: [i64; M],
 ) -> Tx {
     Tx::with_txid(
         TxId::from([txid_num; 32]),
         TxMut {
             version: 0,
             inputs: input_values
                 .into_iter()
                 .map(|(input_txid_num, out_idx, value)| TxInput {
                     prev_out: OutPoint {
                         txid: TxId::from([input_txid_num; 32]),
                         out_idx,
                     },
                     coin: Some(Coin {
                         output: TxOutput {
                             value,
                             ..Default::default()
                         },
                         ..Default::default()
                     }),
                     ..Default::default()
                 })
                 .collect(),
             outputs: output_values
                 .into_iter()
                 .map(|value| TxOutput {
                     value,
                     ..Default::default()
                 })
                 .collect(),
             locktime: 0,
         },
     )
 }
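To close the loop, an in-crate sketch tying the pieces together: `ValueGroup` driving the generic `tx_members_for_group` helper from group.rs. Inputs are yielded before outputs:

```rust
use crate::{
    group::{tx_members_for_group, GroupQuery},
    test::{make_value_tx, ValueGroup},
};

#[test]
fn value_group_members_sketch() {
    // tx 1 spends a coin of value 10 and creates outputs of 20 and 30
    let tx = make_value_tx(1, [10], [20, 30]);
    let query = GroupQuery {
        is_coinbase: false,
        tx: &tx,
    };
    let members: Vec<i64> =
        tx_members_for_group(&ValueGroup, query, &()).collect();
    assert_eq!(members, vec![10, 20, 30]);
}
```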