Replace the read-modify-write update of the reverse lookup index with a
RocksDB merge operator that keeps each index entry as an ordered list of
serial numbers and rejects duplicate inserts.

NOTE(review): this patch was reconstructed from a copy whose line breaks were
collapsed and whose generic parameter lists had been stripped. The restored
generics (the type parameter name `SN`, the `impl IntoIterator<Item = ...>`
context line) and the positions of blank context lines are assumptions;
confirm against the upstream file before applying.

diff --git a/chronik/chronik-db/src/reverse_lookup.rs b/chronik/chronik-db/src/reverse_lookup.rs
--- a/chronik/chronik-db/src/reverse_lookup.rs
+++ b/chronik/chronik-db/src/reverse_lookup.rs
@@ -50,12 +50,14 @@
 };
 
 use abc_rust_error::Result;
-use rocksdb::{ColumnFamilyDescriptor, Options, WriteBatch};
+use rocksdb::{ColumnFamilyDescriptor, WriteBatch};
 use serde::{Deserialize, Serialize};
 use thiserror::Error;
 
 use crate::{
     db::{Db, CF},
+    io::merge::catch_merge_errors,
+    reverse_lookup::IndexError::*,
     ser::{db_deserialize, db_serialize},
 };
 
@@ -72,7 +74,12 @@
 /// Trait providing the data to build a reverse lookup index.
 pub(crate) trait LookupColumn {
     /// Number uniquely identifying `Value`s, e.g. `BlockHeight` or `TxNum`.
-    type SerialNum: Copy + for<'a> Deserialize<'a> + Display + Ord + Serialize;
+    type SerialNum: Copy
+        + for<'a> Deserialize<'a>
+        + Display
+        + Ord
+        + Serialize
+        + 'static;
 
     /// A short hash, compacting keys of type `[u8; 32]`.
     type CheapHash: AsRef<[u8]> + Eq + Hash;
@@ -106,11 +113,20 @@
         cf_data_name: &'static str,
         cf_index_name: &'static str,
     },
+
+    #[error(
+        "Inconsistent DB: Tried inserting {serial_str} into {cf_index_name}, \
+         but value already exists"
+    )]
+    SerialNumAlreadyExists {
+        serial_str: String,
+        cf_index_name: &'static str,
+    },
 }
 
 impl IndexError {
     fn not_in_data_column<L: LookupColumn>(serial: L::SerialNum) -> IndexError {
-        IndexError::NotInDataColumn {
+        NotInDataColumn {
             serial_str: serial.to_string(),
             cf_data_name: L::CF_DATA,
             cf_index_name: L::CF_INDEX,
@@ -118,11 +134,71 @@
     }
 }
 
+fn partial_merge_ordered_list(
+    _key: &[u8],
+    _existing_value: Option<&[u8]>,
+    _operands: &rocksdb::MergeOperands,
+) -> Option<Vec<u8>> {
+    // We don't use partial merge
+    None
+}
+
+fn init_ordered_list<SN: for<'a> Deserialize<'a>>(
+    _key: &[u8],
+    existing_value: Option<&[u8]>,
+    operands: &rocksdb::MergeOperands,
+) -> Result<Vec<SN>> {
+    let mut nums = match existing_value {
+        Some(num) => db_deserialize::<Vec<SN>>(num)?,
+        None => vec![],
+    };
+    nums.reserve_exact(operands.len());
+    Ok(nums)
+}
+
+fn apply_ordered_list<SN: for<'a> Deserialize<'a> + Display + Ord>(
+    cf_index_name: &'static str,
+    nums: &mut Vec<SN>,
+    operand: &[u8],
+) -> Result<()> {
+    let num = db_deserialize::<SN>(operand)?;
+    match nums.binary_search(&num) {
+        Ok(_) => {
+            return Err(SerialNumAlreadyExists {
+                serial_str: num.to_string(),
+                cf_index_name,
+            }
+            .into())
+        }
+        Err(insert_idx) => nums.insert(insert_idx, num),
+    }
+    Ok(())
+}
+
+fn ser_ordered_list<SN: Serialize>(
+    _key: &[u8],
+    nums: Vec<SN>,
+) -> Result<Vec<u8>> {
+    db_serialize::<Vec<SN>>(&nums)
+}
+
 impl<L: LookupColumn> ReverseLookup<L> {
     /// Add the cfs required by the reverse lookup index.
     pub(crate) fn add_cfs(columns: &mut Vec<ColumnFamilyDescriptor>) {
-        columns
-            .push(ColumnFamilyDescriptor::new(L::CF_INDEX, Options::default()));
+        let mut options = rocksdb::Options::default();
+        let merge_op_name = format!("{}::merge_ordered_list", L::CF_INDEX);
+        options.set_merge_operator(
+            merge_op_name.as_str(),
+            catch_merge_errors::<Vec<L::SerialNum>>(
+                init_ordered_list::<L::SerialNum>,
+                |_, nums, operand| {
+                    apply_ordered_list(L::CF_INDEX, nums, operand)
+                },
+                ser_ordered_list::<L::SerialNum>,
+            ),
+            partial_merge_ordered_list,
+        );
+        columns.push(ColumnFamilyDescriptor::new(L::CF_INDEX, options));
     }
 
     /// Read by key from the DB using the index.
@@ -163,18 +239,10 @@
         pairs: impl IntoIterator<Item = (L::SerialNum, &'a [u8; 32])>,
     ) -> Result<()> {
         let cf_index = db.cf(L::CF_INDEX)?;
-        let mut new_entries =
-            HashMap::<L::CheapHash, BTreeSet<L::SerialNum>>::new();
-        // Fill new_entries with either data from the DB or add new entries
+        // Use merge_cf to insert serials into the cheap hashes of the keys
         for (serial, key) in pairs {
-            let serials =
-                Self::get_or_fetch(db, cf_index, &mut new_entries, key)?;
-            serials.insert(serial);
-        }
-        // Add/override all the entries with the inserted serials
-        for (key, serials) in new_entries {
-            let serials = db_serialize(&Vec::from_iter(serials))?;
-            batch.put_cf(cf_index, key, &serials);
+            let cheap_hash = L::cheap_hash(key);
+            batch.merge_cf(cf_index, cheap_hash, db_serialize(&serial)?);
         }
         Ok(())
     }