diff --git a/chronik/chronik-db/src/group.rs b/chronik/chronik-db/src/group.rs index fefac3afc..25cf7eb65 100644 --- a/chronik/chronik-db/src/group.rs +++ b/chronik/chronik-db/src/group.rs @@ -1,56 +1,59 @@ // Copyright (c) 2023 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. //! Module for [`Group`] and [`GroupQuery`]. use bitcoinsuite_core::tx::Tx; use crate::io::GroupHistoryConf; /// Struct giving impls of [`Group`] all the necessary data to determine the /// member of the group. #[derive(Debug)] pub struct GroupQuery<'a> { /// Whether the tx is a coinbase tx. pub is_coinbase: bool, /// The transaction that should be grouped. pub tx: &'a Tx, } /// Groups txs and determines which members they are. /// /// A member is one instance in a group and can be anything in a tx, e.g. the /// input and output scripts, the SLP token ID, a SWaP signal, a smart contract /// etc. pub trait Group { /// Iterator over the members found for a given [`GroupQuery`]. type Iter<'a>: IntoIterator> + 'a; /// Member of a group, this is what txs will be grouped by. /// /// We use a HashMap to group txs, so it must implement [`std::hash::Hash`]. - type Member<'a>: Into> + std::hash::Hash + Eq; + type Member<'a>: std::hash::Hash + Eq; /// Serialized member, this is what will be used as key in the DB. /// Normally, this will be a [`Vec`] or an [`u8`] array or slice. /// /// We use this to allow members to separate between the code that groups /// them from the serialization of members. For example, we can group by raw /// input/output script bytecode, but then use compressed scripts when /// adding to the DB. That way, we don't have to run the compression every /// time we compare/hash elements for grouping. /// /// Note: For group history, this will be suffixed by a 4-byte page number. type MemberSer<'a>: AsRef<[u8]> + 'a; /// Find the group's members in the given query's tx. /// /// Note: This is allowed to return a member multiple times per query. /// /// Note: The returned iterator is allowed to borrow from the query. fn members_tx<'a>(&self, query: GroupQuery<'a>) -> Self::Iter<'a>; + /// Serialize the given member. + fn ser_member<'a>(&self, member: Self::Member<'a>) -> Self::MemberSer<'a>; + /// The [`GroupHistoryConf`] for this group. fn tx_history_conf() -> GroupHistoryConf; } diff --git a/chronik/chronik-db/src/io/group_history.rs b/chronik/chronik-db/src/io/group_history.rs index fe416c954..f34562386 100644 --- a/chronik/chronik-db/src/io/group_history.rs +++ b/chronik/chronik-db/src/io/group_history.rs @@ -1,422 +1,422 @@ // Copyright (c) 2023 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. use std::{collections::HashMap, marker::PhantomData}; use abc_rust_error::Result; use bitcoinsuite_core::tx::Tx; use rocksdb::WriteBatch; use crate::{ db::{Db, CF}, group::{Group, GroupQuery}, io::TxNum, ser::{db_deserialize, db_serialize}, }; /// Represent page numbers with 32-bit unsigned integers. type PageNum = u32; const PAGE_SER_SIZE: usize = 4; /// Configuration for group history reader/writers. #[derive(Clone, Debug, Default, Eq, PartialEq)] pub struct GroupHistoryConf { /// Column family to store the group history entries. pub cf_name: &'static str, /// Page size for each member of the group. pub page_size: usize, } struct GroupHistoryColumn<'a> { db: &'a Db, cf: &'a CF, } /// Write txs grouped and paginated to the DB. /// /// This is primarily meant to store the tx history of an address, but it can /// also be used to index the history of other groups, especially of new /// protocols. /// /// Txs are stored paginated, because in the case of addresses, there already /// exist addresses with millions of txs. While RocksDB can handle multi MB /// entries, it would significantly slow down both reading and writing of this /// address, which could pose a DoS risk. /// /// Each page is stored at the key ` + <4-byte page num>` /// /// Txs in a member are ordered strictly ascendingly, both within a page, and /// also between pages, such that the entire tx history of a member can be /// iterated by going through pages 0..N and going through all of the txs of /// each page. #[derive(Debug)] pub struct GroupHistoryWriter<'a, G: Group> { col: GroupHistoryColumn<'a>, conf: GroupHistoryConf, group: G, } /// Read pages of grouped txs from the DB. #[derive(Debug)] pub struct GroupHistoryReader<'a, G: Group> { col: GroupHistoryColumn<'a>, conf: GroupHistoryConf, phantom: PhantomData, } impl<'a> GroupHistoryColumn<'a> { fn new(db: &'a Db, conf: &GroupHistoryConf) -> Result { let cf = db.cf(conf.cf_name)?; Ok(GroupHistoryColumn { db, cf }) } fn get_page_txs( &self, member_ser: &[u8], page_num: PageNum, ) -> Result>> { let key = key_for_member_page(member_ser, page_num); let value = match self.db.get(self.cf, &key)? { Some(value) => value, None => return Ok(None), }; Ok(Some(db_deserialize::>(&value)?)) } fn get_member_last_page( &self, member_ser: &[u8], ) -> Result<(u32, Vec)> { let last_key = key_for_member_page(member_ser, u32::MAX); let mut iter = self.db .iterator(self.cf, &last_key, rocksdb::Direction::Reverse); let (key, value) = match iter.next() { Some(result) => { let (key, value) = result?; if &key[..key.len() - PAGE_SER_SIZE] == member_ser { (key, value) } else { return Ok((0, vec![])); } } None => return Ok((0, vec![])), }; let numbers = db_deserialize::>(&value)?; let page_num = PageNum::from_be_bytes( key[key.len() - PAGE_SER_SIZE..].try_into().unwrap(), ); Ok((page_num, numbers)) } } impl<'a, G: Group> GroupHistoryWriter<'a, G> { /// Create a new [`GroupHistoryWriter`]. pub fn new(db: &'a Db, group: G) -> Result { let conf = G::tx_history_conf(); let col = GroupHistoryColumn::new(db, &conf)?; Ok(GroupHistoryWriter { col, conf, group }) } /// Group the txs, then insert them to into each member of the group. pub fn insert( &self, batch: &mut WriteBatch, first_tx_num: TxNum, txs: &[Tx], ) -> Result<()> { let grouped_txs = self.group_txs(first_tx_num, txs); for (member, mut new_tx_nums) in grouped_txs { - let member_ser: G::MemberSer<'_> = member.into(); + let member_ser: G::MemberSer<'_> = self.group.ser_member(member); let (mut page_num, mut last_page_tx_nums) = self.col.get_member_last_page(member_ser.as_ref())?; while !new_tx_nums.is_empty() { let space_left = self.conf.page_size - last_page_tx_nums.len(); let num_new_txs = space_left.min(new_tx_nums.len()); last_page_tx_nums.extend(new_tx_nums.drain(..num_new_txs)); batch.put_cf( self.col.cf, key_for_member_page(member_ser.as_ref(), page_num), db_serialize(&last_page_tx_nums)?, ); last_page_tx_nums.clear(); page_num += 1; } } Ok(()) } /// Group the txs, then delete them from each member of the group. pub fn delete( &self, batch: &mut WriteBatch, first_tx_num: TxNum, txs: &[Tx], ) -> Result<()> { let grouped_txs = self.group_txs(first_tx_num, txs); for (member, removed_tx_nums) in grouped_txs { - let member_ser: G::MemberSer<'_> = member.into(); + let member_ser: G::MemberSer<'_> = self.group.ser_member(member); let mut num_remaining_removes = removed_tx_nums.len(); let (mut page_num, mut last_page_tx_nums) = self.col.get_member_last_page(member_ser.as_ref())?; while num_remaining_removes > 0 { let num_page_removes = last_page_tx_nums.len().min(num_remaining_removes); last_page_tx_nums .drain(last_page_tx_nums.len() - num_page_removes..); let key = key_for_member_page(member_ser.as_ref(), page_num); if last_page_tx_nums.is_empty() { batch.delete_cf(self.col.cf, key) } else { batch.put_cf( self.col.cf, key, db_serialize(&last_page_tx_nums)?, ); } num_remaining_removes -= num_page_removes; if page_num > 0 { page_num -= 1; last_page_tx_nums = self .col .get_page_txs(member_ser.as_ref(), page_num)? .unwrap_or(vec![]); } } } Ok(()) } fn group_txs<'tx>( &self, first_tx_num: TxNum, txs: &'tx [Tx], ) -> HashMap, Vec> { let mut group_tx_nums = HashMap::, Vec>::new(); for (tx_idx, tx) in txs.iter().enumerate() { let tx_num = first_tx_num + tx_idx as u64; let query = GroupQuery { is_coinbase: tx_idx == 0, tx, }; for member in self.group.members_tx(query) { let tx_nums = group_tx_nums.entry(member).or_default(); if let Some(&last_tx_num) = tx_nums.last() { if last_tx_num == tx_num { continue; } } tx_nums.push(tx_num); } } group_tx_nums } #[cfg(test)] pub(crate) fn add_cfs(columns: &mut Vec) { columns.push(rocksdb::ColumnFamilyDescriptor::new( G::tx_history_conf().cf_name, rocksdb::Options::default(), )); } } impl std::fmt::Debug for GroupHistoryColumn<'_> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "GroupHistoryColumn {{ .. }}") } } impl<'a, G: Group> GroupHistoryReader<'a, G> { /// Create a new [`GroupHistoryReader`]. pub fn new(db: &'a Db) -> Result { let conf = G::tx_history_conf(); let col = GroupHistoryColumn::new(db, &conf)?; Ok(GroupHistoryReader { col, conf, phantom: PhantomData, }) } /// Read the tx_nums for the given member on the given page, or None, if the /// page doesn't exist in the DB. pub fn page_txs( &self, member_ser: &[u8], page_num: PageNum, ) -> Result>> { self.col.get_page_txs(member_ser, page_num) } /// Total number of txs for this serialized member. pub fn member_num_txs(&self, member_ser: &[u8]) -> Result { let (last_page_num, last_page_txs) = self.col.get_member_last_page(member_ser)?; Ok(self.conf.page_size * last_page_num as usize + last_page_txs.len()) } /// Size of pages the data is stored in. pub fn page_size(&self) -> usize { self.conf.page_size } } fn key_for_member_page(member_ser: &[u8], page_num: PageNum) -> Vec { [member_ser, &page_num.to_be_bytes()].concat() } #[cfg(test)] mod tests { use abc_rust_error::Result; use bitcoinsuite_core::tx::Tx; use rocksdb::WriteBatch; use crate::{ db::Db, io::{ group_history::PageNum, GroupHistoryReader, GroupHistoryWriter, TxNum, }, test::{make_value_tx, ser_value, ValueGroup}, }; #[test] fn test_value_group_history() -> Result<()> { abc_rust_error::install(); let tempdir = tempdir::TempDir::new("chronik-db--group_history")?; let mut cfs = Vec::new(); GroupHistoryWriter::::add_cfs(&mut cfs); let db = Db::open_with_cfs(tempdir.path(), cfs)?; let group_writer = GroupHistoryWriter::new(&db, ValueGroup)?; let group_reader = GroupHistoryReader::::new(&db)?; let first_tx_num = std::cell::RefCell::new(0u64); let connect_block = |txs: &[Tx]| -> Result<()> { let mut batch = WriteBatch::default(); let mut first_tx_num = first_tx_num.borrow_mut(); group_writer.insert(&mut batch, *first_tx_num, txs)?; *first_tx_num += txs.len() as u64; db.write_batch(batch)?; Ok(()) }; let disconnect_block = |txs: &[Tx]| -> Result<()> { let mut batch = WriteBatch::default(); let mut first_tx_num = first_tx_num.borrow_mut(); *first_tx_num -= txs.len() as u64; group_writer.delete(&mut batch, *first_tx_num, txs)?; db.write_batch(batch)?; Ok(()) }; let read_page = |val: i64, page_num: PageNum| -> Result>> { group_reader.page_txs(&ser_value(val), page_num) }; // Only adds an entry for value=10 (coinbase inputs are ignored) let block0 = [make_value_tx(0, [0xffff], [10])]; connect_block(&block0)?; assert_eq!(read_page(0xffff, 0)?, None); assert_eq!(read_page(10, 0)?, Some(vec![0])); // Block that adds a lot of pages to value=10, one entry to value=20 let block1 = [ make_value_tx(1, [0xffff], [10]), make_value_tx(2, [10], []), make_value_tx(3, [20], []), // value=20 make_value_tx(4, [10], []), make_value_tx(5, [10], []), make_value_tx(6, [10], []), make_value_tx(7, [10], []), make_value_tx(8, [10], []), make_value_tx(9, [10], []), ]; connect_block(&block1)?; assert_eq!(read_page(0xffff, 0)?, None); assert_eq!(read_page(10, 0)?, Some(vec![0, 1, 2, 4])); assert_eq!(read_page(10, 1)?, Some(vec![5, 6, 7, 8])); assert_eq!(read_page(10, 2)?, Some(vec![9])); assert_eq!(read_page(10, 3)?, None); assert_eq!(read_page(20, 0)?, Some(vec![3])); assert_eq!(read_page(20, 1)?, None); // Only tx_num=0 remains // The other pages have been removed from the DB entirely disconnect_block(&block1)?; assert_eq!(read_page(0xffff, 0)?, None); assert_eq!(read_page(10, 0)?, Some(vec![0])); assert_eq!(read_page(10, 1)?, None); assert_eq!(read_page(10, 2)?, None); assert_eq!(read_page(20, 0)?, None); // Re-org block, with all kinds of input + output values let block1 = [ make_value_tx(1, [0xffff], [10]), make_value_tx(2, [10], [10, 20, 30]), make_value_tx(3, [10, 40], [10, 10, 40]), make_value_tx(4, [10], [40, 30, 40]), ]; connect_block(&block1)?; // all txs add to value=10, with 2 pages assert_eq!(read_page(10, 0)?, Some(vec![0, 1, 2, 3])); assert_eq!(read_page(10, 1)?, Some(vec![4])); // only tx_num=2 adds to value=20 assert_eq!(read_page(20, 0)?, Some(vec![2])); // tx_num=2 and tx_num=4 add to value=30 assert_eq!(read_page(30, 0)?, Some(vec![2, 4])); // tx_num=3 and tx_num=4 add to value=40 assert_eq!(read_page(40, 0)?, Some(vec![3, 4])); // Delete that block also disconnect_block(&block1)?; assert_eq!(read_page(0xffff, 0)?, None); assert_eq!(read_page(10, 0)?, Some(vec![0])); assert_eq!(read_page(10, 1)?, None); assert_eq!(read_page(20, 0)?, None); assert_eq!(read_page(30, 0)?, None); assert_eq!(read_page(40, 0)?, None); // Add it back in connect_block(&block1)?; // Add new block, adding 1 tx to 20, 6 txs to 30, 4 txs to 40 let block2 = [ make_value_tx(5, [0xffff], [40, 30]), make_value_tx(6, [30, 10], [30]), make_value_tx(7, [10], [30]), make_value_tx(8, [40], [30]), make_value_tx(9, [10], [20]), ]; connect_block(&block2)?; assert_eq!(read_page(10, 0)?, Some(vec![0, 1, 2, 3])); assert_eq!(read_page(10, 1)?, Some(vec![4, 6, 7, 9])); assert_eq!(read_page(20, 0)?, Some(vec![2, 9])); assert_eq!(read_page(30, 0)?, Some(vec![2, 4, 5, 6])); assert_eq!(read_page(30, 1)?, Some(vec![7, 8])); assert_eq!(read_page(40, 0)?, Some(vec![3, 4, 5, 8])); assert_eq!(read_page(40, 1)?, None); // Remove all blocks disconnect_block(&block2)?; assert_eq!(read_page(10, 0)?, Some(vec![0, 1, 2, 3])); assert_eq!(read_page(10, 1)?, Some(vec![4])); assert_eq!(read_page(20, 0)?, Some(vec![2])); assert_eq!(read_page(30, 0)?, Some(vec![2, 4])); assert_eq!(read_page(30, 1)?, None); assert_eq!(read_page(40, 0)?, Some(vec![3, 4])); assert_eq!(read_page(40, 1)?, None); disconnect_block(&block1)?; assert_eq!(read_page(10, 0)?, Some(vec![0])); assert_eq!(read_page(10, 1)?, None); assert_eq!(read_page(20, 0)?, None); assert_eq!(read_page(30, 0)?, None); assert_eq!(read_page(40, 0)?, None); disconnect_block(&block0)?; assert_eq!(read_page(10, 0)?, None); Ok(()) } } diff --git a/chronik/chronik-db/src/mem/group_history.rs b/chronik/chronik-db/src/mem/group_history.rs index 6a058ad45..df6f06683 100644 --- a/chronik/chronik-db/src/mem/group_history.rs +++ b/chronik/chronik-db/src/mem/group_history.rs @@ -1,166 +1,166 @@ use std::collections::{BTreeSet, HashMap}; use bitcoinsuite_core::tx::TxId; use crate::{ group::{Group, GroupQuery}, mem::MempoolTx, }; /// Index the tx history of a group. /// /// Individual items are sorted by [`MempoolTx`]`::time_first_seen` first and /// then by TxId, so when accessing them, they are returned in chronological /// order. #[derive(Debug, Default)] pub struct MempoolGroupHistory { /// (time_first_seen, txid), so we sort by time history: HashMap, BTreeSet<(i64, TxId)>>, group: G, } impl MempoolGroupHistory { /// Create a new [`MempoolGroupHistory`] for the given group. pub fn new(group: G) -> MempoolGroupHistory { MempoolGroupHistory { history: HashMap::new(), group, } } /// Index the given [`MempoolTx`] by this group. pub fn insert(&mut self, tx: &MempoolTx) { let query = GroupQuery { is_coinbase: false, tx: &tx.tx, }; for member in self.group.members_tx(query) { - let member_ser: G::MemberSer<'_> = member.into(); + let member_ser: G::MemberSer<'_> = self.group.ser_member(member); if !self.history.contains_key(member_ser.as_ref()) { self.history .insert(member_ser.as_ref().to_vec(), BTreeSet::new()); } let member_history = self .history .get_mut(member_ser.as_ref()) .expect("Impossible"); member_history.insert((tx.time_first_seen, tx.tx.txid())); } } /// Remove the given [`MempoolTx`] from the history index. pub fn remove(&mut self, tx: &MempoolTx) { let query = GroupQuery { is_coinbase: false, tx: &tx.tx, }; for member in self.group.members_tx(query) { - let member_ser: G::MemberSer<'_> = member.into(); + let member_ser: G::MemberSer<'_> = self.group.ser_member(member); if let Some(entries) = self.history.get_mut(member_ser.as_ref()) { entries.remove(&(tx.time_first_seen, tx.tx.txid())); if entries.is_empty() { self.history.remove(member_ser.as_ref()); } } } } /// Return the history of a given serialized member as an ordered /// [`BTreeSet`], or None if there are no entries. pub fn member_history( &self, member_ser: &[u8], ) -> Option<&BTreeSet<(i64, TxId)>> { self.history.get(member_ser) } } #[cfg(test)] mod tests { use abc_rust_error::Result; use bitcoinsuite_core::tx::TxId; use crate::{ mem::{MempoolGroupHistory, MempoolTx}, test::{make_value_tx, ser_value, ValueGroup}, }; #[test] fn test_mempool_group_history() -> Result<()> { let mempool = std::cell::RefCell::new(MempoolGroupHistory::new(ValueGroup)); let add_tx = |tx: &MempoolTx| mempool.borrow_mut().insert(tx); let remove_tx = |tx: &MempoolTx| mempool.borrow_mut().remove(tx); let member_history = |val: i64| -> Option> { mempool .borrow_mut() .member_history(&ser_value(val)) .map(|history| history.iter().copied().collect::>()) }; fn make_mempool_tx( txid_num: u8, input_values: [i64; N], output_values: [i64; M], time_first_seen: i64, ) -> MempoolTx { MempoolTx { tx: make_value_tx(txid_num, input_values, output_values), time_first_seen, } } let tx1 = make_mempool_tx(1, [10], [], 1000); add_tx(&tx1); assert_eq!(member_history(10), Some(vec![(1000, TxId::from([1; 32]))])); remove_tx(&tx1); assert_eq!(member_history(10), None); let tx2 = make_mempool_tx(2, [], [10], 900); add_tx(&tx1); add_tx(&tx2); assert_eq!( member_history(10), Some(vec![ (900, TxId::from([2; 32])), (1000, TxId::from([1; 32])), ]), ); let tx4 = make_mempool_tx(4, [10], [10], 1000); add_tx(&tx4); assert_eq!( member_history(10), Some(vec![ (900, TxId::from([2; 32])), (1000, TxId::from([1; 32])), (1000, TxId::from([4; 32])), ]), ); let tx3 = make_mempool_tx(3, [10, 10], [10, 20], 1000); add_tx(&tx3); assert_eq!( member_history(10), Some(vec![ (900, TxId::from([2; 32])), (1000, TxId::from([1; 32])), (1000, TxId::from([3; 32])), (1000, TxId::from([4; 32])), ]), ); assert_eq!(member_history(20), Some(vec![(1000, TxId::from([3; 32]))])); remove_tx(&tx4); remove_tx(&tx1); remove_tx(&tx3); assert_eq!(member_history(10), Some(vec![(900, TxId::from([2; 32]))])); assert_eq!(member_history(20), None); Ok(()) } } diff --git a/chronik/chronik-db/src/test/value_group.rs b/chronik/chronik-db/src/test/value_group.rs index 916d34cc8..c0a03626b 100644 --- a/chronik/chronik-db/src/test/value_group.rs +++ b/chronik/chronik-db/src/test/value_group.rs @@ -1,84 +1,88 @@ // Copyright (c) 2023 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. use bitcoinsuite_core::tx::{Coin, Tx, TxId, TxInput, TxMut, TxOutput}; use crate::{ group::{Group, GroupQuery}, io::GroupHistoryConf, }; /// Index by output/input value. While useless in pactice, this makes /// writing tests very convenient and showcases how Group can be used. pub(crate) struct ValueGroup; impl Group for ValueGroup { - type Iter<'a> = std::vec::IntoIter<[u8; 8]>; - type Member<'a> = [u8; 8]; + type Iter<'a> = std::vec::IntoIter; + type Member<'a> = i64; type MemberSer<'a> = [u8; 8]; fn members_tx(&self, query: GroupQuery<'_>) -> Self::Iter<'_> { let mut values = Vec::new(); if !query.is_coinbase { for input in &query.tx.inputs { if let Some(coin) = &input.coin { - values.push(ser_value(coin.output.value)); + values.push(coin.output.value); } } } for output in &query.tx.outputs { - values.push(ser_value(output.value)); + values.push(output.value); } values.into_iter() } + fn ser_member<'a>(&self, value: i64) -> Self::MemberSer<'a> { + ser_value(value) + } + fn tx_history_conf() -> GroupHistoryConf { GroupHistoryConf { cf_name: "value_history", page_size: 4, } } } /// Serialize the value as array pub(crate) fn ser_value(value: i64) -> [u8; 8] { value.to_be_bytes() } /// Make a tx with inputs and outputs having the given values. /// Also pass _tx_num, which is ignored, but allows tests to document the tx_num /// of this tx. pub(crate) fn make_value_tx( txid_num: u8, input_values: [i64; N], output_values: [i64; M], ) -> Tx { Tx::with_txid( TxId::from([txid_num; 32]), TxMut { version: 0, inputs: input_values .into_iter() .map(|value| TxInput { coin: Some(Coin { output: TxOutput { value, ..Default::default() }, ..Default::default() }), ..Default::default() }) .collect(), outputs: output_values .into_iter() .map(|value| TxOutput { value, ..Default::default() }) .collect(), locktime: 0, }, ) }