diff --git a/chronik/chronik-db/src/db.rs b/chronik/chronik-db/src/db.rs
--- a/chronik/chronik-db/src/db.rs
+++ b/chronik/chronik-db/src/db.rs
@@ -187,6 +187,22 @@
             .map(|result| Ok(result.map_err(RocksDb)?))
     }
 
+    pub(crate) fn full_iterator(
+        &self,
+        cf: &CF,
+    ) -> impl Iterator<Item = Result<(Box<[u8]>, Box<[u8]>)>> + '_ {
+        self.db
+            .full_iterator_cf(cf, IteratorMode::Start)
+            .map(|result| Ok(result.map_err(RocksDb)?))
+    }
+
+    pub(crate) fn estimate_num_keys(&self, cf: &CF) -> Result<Option<u64>> {
+        Ok(self
+            .db
+            .property_int_value_cf(cf, "rocksdb.estimate-num-keys")
+            .map_err(RocksDb)?)
+    }
+
     /// Writes the batch to the Db atomically.
     pub fn write_batch(&self, write_batch: WriteBatch) -> Result<()> {
         self.db.write_without_wal(write_batch).map_err(RocksDb)?;
diff --git a/chronik/chronik-db/src/group.rs b/chronik/chronik-db/src/group.rs
--- a/chronik/chronik-db/src/group.rs
+++ b/chronik/chronik-db/src/group.rs
@@ -99,7 +99,9 @@
 /// the member is anything, e.g. a token ID) and one without script, where the
 /// member is the script itself and therefore storing it in the UTXO is
 /// redundant.
-pub trait UtxoData: Default + for<'a> Deserialize<'a> + Serialize {
+pub trait UtxoData:
+    Default + for<'a> Deserialize<'a> + Serialize + 'static
+{
    /// Function that extracts the [`UtxoData`] from an output.
    fn from_output(output: &TxOutput) -> Self;
 }
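Why the new `'static` bound on `UtxoData`: rust-rocksdb keeps merge and compaction callbacks registered on the column family `Options` alive for the lifetime of the database, so its registration APIs require `'static` closures. Since the merge functions added below are monomorphized over `G::UtxoData`, the bound has to live on the trait itself. A minimal sketch of the constraint (the `assert_merge_fn` helper is illustrative, not part of the change):

```rust
// Mirrors the `Fn(..) + Send + Sync + 'static` bound that rust-rocksdb
// places on merge callbacks; a UtxoData type with a non-'static lifetime
// would fail to compile here.
fn assert_merge_fn<F>(_f: F)
where
    F: Fn(&[u8], Option<&[u8]>, &rocksdb::MergeOperands) -> Option<Vec<u8>>
        + Send
        + Sync
        + 'static,
{
}
```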
diff --git a/chronik/chronik-db/src/io/group_utxos.rs b/chronik/chronik-db/src/io/group_utxos.rs
--- a/chronik/chronik-db/src/io/group_utxos.rs
+++ b/chronik/chronik-db/src/io/group_utxos.rs
@@ -4,14 +4,11 @@
 
 //! Module for [`GroupUtxoWriter`] and [`GroupUtxoReader`].
 
-use std::{
-    collections::{hash_map::Entry, HashMap},
-    marker::PhantomData,
-    time::Instant,
-};
+use std::{marker::PhantomData, time::Instant};
 
 use abc_rust_error::Result;
-use rocksdb::WriteBatch;
+use chronik_util::log;
+use rocksdb::{compaction_filter::Decision, WriteBatch};
 use serde::{Deserialize, Serialize};
 use thiserror::Error;
 
@@ -19,10 +16,13 @@
     db::{Db, CF},
     group::{Group, GroupQuery, UtxoData},
     index_tx::IndexTx,
-    io::TxNum,
-    ser::{db_deserialize, db_serialize},
+    io::{group_utxos::GroupUtxoError::*, merge::catch_merge_errors, TxNum},
+    ser::{db_deserialize, db_deserialize_vec, db_serialize, db_serialize_vec},
 };
 
+const INSERT: u8 = b'I';
+const DELETE: u8 = b'D';
+
 /// Configuration for group utxos reader/writers.
 #[derive(Clone, Debug, Default, Eq, PartialEq)]
 pub struct GroupUtxoConf {
@@ -94,10 +94,6 @@
     pub n_total: usize,
     /// Time [s] for insert/delete.
     pub t_total: f64,
-    /// Time [s] for fetching UTXOs.
-    pub t_fetch: f64,
-    /// Time [s] for indexing UTXOs.
-    pub t_index: f64,
 }
 
 /// Error indicating something went wrong with the tx index.
@@ -112,9 +108,104 @@
     /// UTXO already in the DB
     #[error("UTXO doesn't exist: {0:?} is not in the member's UTXOs")]
     UtxoDoesntExist(UtxoOutpoint),
+
+    /// Used merge_cf incorrectly, prefix must either be I or D.
+    #[error(
+        "Bad usage of merge: Unknown prefix {0:02x}, expected I or D: {}",
+        hex::encode(.1),
+    )]
+    UnknownOperandPrefix(u8, Vec<u8>),
+
+    /// Upgrade failed.
+    #[error(
+        "Upgrade failed, could not parse {} for key {}: {error}",
+        hex::encode(.value),
+        hex::encode(.key),
+    )]
+    UpgradeFailed {
+        /// Key that failed
+        key: Box<[u8]>,
+        /// Value that failed parsing in the old format
+        value: Box<[u8]>,
+        /// Error message
+        error: String,
+    },
+}
+
+fn partial_merge_utxos(
+    _key: &[u8],
+    _existing_value: Option<&[u8]>,
+    _operands: &rocksdb::MergeOperands,
+) -> Option<Vec<u8>> {
+    // We don't use partial merge
+    None
+}
+
+fn init_merge_utxos<D: for<'a> Deserialize<'a>>(
+    _key: &[u8],
+    existing_value: Option<&[u8]>,
+    operands: &rocksdb::MergeOperands,
+) -> Result<Vec<UtxoEntry<D>>> {
+    let mut entries = match existing_value {
+        Some(entries_ser) => db_deserialize_vec::<UtxoEntry<D>>(entries_ser)?,
+        None => vec![],
+    };
+    if operands.into_iter().all(|operand| operand[0] == INSERT) {
+        // If we only have inserts, we can pre-allocate the exact memory we
+        // need
+        entries.reserve_exact(operands.len());
+    }
+    Ok(entries)
+}
+
+fn apply_merge_utxos<D: for<'a> Deserialize<'a>>(
+    _key: &[u8],
+    entries: &mut Vec<UtxoEntry<D>>,
+    operand: &[u8],
+) -> Result<()> {
+    match operand[0] {
+        INSERT => {
+            let new_entry = db_deserialize::<UtxoEntry<D>>(&operand[1..])?;
+            match entries.binary_search_by_key(&&new_entry.outpoint, |entry| {
+                &entry.outpoint
+            }) {
+                Ok(_) => return Err(DuplicateUtxo(new_entry.outpoint).into()),
+                Err(insert_idx) => entries.insert(insert_idx, new_entry),
+            }
+        }
+        DELETE => {
+            let delete_outpoint =
+                db_deserialize::<UtxoOutpoint>(&operand[1..])?;
+            match entries.binary_search_by_key(&&delete_outpoint, |entry| {
+                &entry.outpoint
+            }) {
+                Ok(delete_idx) => entries.remove(delete_idx),
+                Err(_) => return Err(UtxoDoesntExist(delete_outpoint).into()),
+            };
+        }
+        _ => {
+            return Err(
+                UnknownOperandPrefix(operand[0], operand.to_vec()).into()
+            );
+        }
+    }
+    Ok(())
+}
+
+fn ser_merge_utxos<D: Serialize>(
+    _key: &[u8],
+    entries: Vec<UtxoEntry<D>>,
+) -> Result<Vec<u8>> {
+    db_serialize_vec::<UtxoEntry<D>>(entries)
 }
 
-use self::GroupUtxoError::*;
+// We must use a compaction filter that removes empty entries
+fn compaction_filter_utxos(_level: u32, _key: &[u8], value: &[u8]) -> Decision {
+    if value.is_empty() {
+        Decision::Remove
+    } else {
+        Decision::Keep
+    }
+}
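Taken together, the three hooks above define the full-merge semantics for a member's UTXO list; `catch_merge_errors` (imported from the `io::merge` module, which is not shown in this diff) is what adapts them to RocksDB's infallible callback signature. Roughly, a full merge amounts to the following fold (a sketch of the assumed contract, not the actual adapter):

```rust
// Sketch: what one full merge boils down to. The real adapter additionally
// has to turn any Err into the `None` that RocksDB treats as a merge
// failure, presumably logging it first.
fn full_merge_sketch<D: for<'a> Deserialize<'a> + Serialize>(
    key: &[u8],
    existing_value: Option<&[u8]>,
    operands: &rocksdb::MergeOperands,
) -> Option<Vec<u8>> {
    // Start from the deserialized existing value (or an empty list)...
    let mut entries =
        init_merge_utxos::<D>(key, existing_value, operands).ok()?;
    // ...apply each queued operand in write order...
    for operand in operands {
        apply_merge_utxos::<D>(key, &mut entries, operand).ok()?;
    }
    // ...and serialize the merged list back into the value format.
    ser_merge_utxos(key, entries).ok()
}
```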
 
 impl<'a> GroupUtxoColumn<'a> {
     fn new(db: &'a Db, conf: &GroupUtxoConf) -> Result<Self> {
@@ -145,22 +236,14 @@
         let stats = &mut mem_data.stats;
         stats.n_total += txs.len();
         let t_start = Instant::now();
-        let mut updated_utxos =
-            HashMap::<G::Member<'tx>, Vec<UtxoEntry<G::UtxoData>>>::new();
         for index_tx in txs {
             let query = GroupQuery {
                 is_coinbase: index_tx.is_coinbase,
                 tx: index_tx.tx,
             };
             for item in self.group.output_members(query, aux) {
-                let t_fetch = Instant::now();
-                let entries =
-                    self.get_or_fetch(&mut updated_utxos, item.member)?;
-                stats.t_fetch += t_fetch.elapsed().as_secs_f64();
                 let new_entry = Self::output_utxo(index_tx, item.idx);
-                let t_index = Instant::now();
-                Self::insert_utxo_entry(new_entry, entries)?;
-                stats.t_index += t_index.elapsed().as_secs_f64();
+                self.insert_utxo_entry(batch, &item.member, new_entry)?;
             }
         }
         for index_tx in txs {
@@ -172,17 +255,11 @@
                 tx: index_tx.tx,
             };
             for item in self.group.input_members(query, aux) {
-                let t_fetch = Instant::now();
-                let entries =
-                    self.get_or_fetch(&mut updated_utxos, item.member)?;
-                stats.t_fetch += t_fetch.elapsed().as_secs_f64();
-                let delete_entry = Self::input_utxo(index_tx, item.idx);
-                let t_index = Instant::now();
-                Self::delete_utxo_entry(&delete_entry.outpoint, entries)?;
-                stats.t_index += t_index.elapsed().as_secs_f64();
+                let delete_outpoint =
+                    Self::input_utxo(index_tx, item.idx).outpoint;
+                self.delete_utxo_entry(batch, &item.member, &delete_outpoint)?;
             }
         }
-        self.update_write_batch(batch, &updated_utxos)?;
         stats.t_total += t_start.elapsed().as_secs_f64();
         Ok(())
     }
@@ -201,8 +278,6 @@
         let stats = &mut mem_data.stats;
         stats.n_total += txs.len();
         let t_start = Instant::now();
-        let mut updated_utxos =
-            HashMap::<G::Member<'tx>, Vec<UtxoEntry<G::UtxoData>>>::new();
         for index_tx in txs {
             if index_tx.is_coinbase {
                 continue;
@@ -212,14 +287,8 @@
                 tx: index_tx.tx,
             };
             for item in self.group.input_members(query, aux) {
-                let t_fetch = Instant::now();
-                let entries =
-                    self.get_or_fetch(&mut updated_utxos, item.member)?;
-                stats.t_fetch += t_fetch.elapsed().as_secs_f64();
                 let new_entry = Self::input_utxo(index_tx, item.idx);
-                let t_index = Instant::now();
-                Self::insert_utxo_entry(new_entry, entries)?;
-                stats.t_index += t_index.elapsed().as_secs_f64();
+                self.insert_utxo_entry(batch, &item.member, new_entry)?;
             }
         }
         for index_tx in txs {
@@ -228,26 +297,76 @@
                 tx: index_tx.tx,
             };
             for item in self.group.output_members(query, aux) {
-                let t_fetch = Instant::now();
-                let entries =
-                    self.get_or_fetch(&mut updated_utxos, item.member)?;
-                stats.t_fetch += t_fetch.elapsed().as_secs_f64();
-                let delete_entry = Self::output_utxo(index_tx, item.idx);
-                let t_index = Instant::now();
-                Self::delete_utxo_entry(&delete_entry.outpoint, entries)?;
-                stats.t_index += t_index.elapsed().as_secs_f64();
+                let delete_outpoint =
+                    Self::output_utxo(index_tx, item.idx).outpoint;
+                self.delete_utxo_entry(batch, &item.member, &delete_outpoint)?;
             }
         }
-        self.update_write_batch(batch, &updated_utxos)?;
         stats.t_total += t_start.elapsed().as_secs_f64();
         Ok(())
     }
 
+    /// Upgrade the DB from version 10 to version 11
+    pub fn upgrade_10_to_11(&self) -> Result<()> {
+        log!(
+            "Upgrading Chronik UTXO set for {}. Do not kill the process \
+             during upgrade, it will corrupt the database.\n",
+            G::utxo_conf().cf_name
+        );
+        let estimated_num_keys =
+            self.col.db.estimate_num_keys(self.col.cf)?.unwrap_or(0);
+        let mut batch = WriteBatch::default();
+        for (db_idx, old_utxos_ser) in
+            self.col.db.full_iterator(self.col.cf).enumerate()
+        {
+            let (key, old_utxos_ser) = old_utxos_ser?;
+            let utxos = match db_deserialize::<Vec<UtxoEntry<G::UtxoData>>>(
+                &old_utxos_ser,
+            ) {
+                Ok(utxos) => utxos,
+                Err(err) => {
+                    return Err(UpgradeFailed {
+                        key,
+                        value: old_utxos_ser,
+                        error: err.to_string(),
+                    }
+                    .into());
+                }
+            };
+            let new_utxos_ser =
+                db_serialize_vec::<UtxoEntry<G::UtxoData>>(utxos)?;
+            batch.put_cf(self.col.cf, key, new_utxos_ser);
+            if db_idx % 10000 == 0 {
+                log!("Upgraded {db_idx} of {estimated_num_keys} (estimated)\n");
+                self.col.db.write_batch(batch)?;
+                batch = WriteBatch::default();
+            }
+        }
+        self.col.db.write_batch(batch)?;
+        log!("Upgrade for {} complete\n", G::utxo_conf().cf_name);
+        Ok(())
+    }
+
     pub(crate) fn add_cfs(columns: &mut Vec<rocksdb::ColumnFamilyDescriptor>) {
-        columns.push(rocksdb::ColumnFamilyDescriptor::new(
-            G::utxo_conf().cf_name,
-            rocksdb::Options::default(),
-        ));
+        let cf_name = G::utxo_conf().cf_name;
+        let mut options = rocksdb::Options::default();
+        let merge_op_name = format!("{}::merge_op_utxos", cf_name);
+        options.set_merge_operator(
+            merge_op_name.as_str(),
+            catch_merge_errors(
+                init_merge_utxos::<G::UtxoData>,
+                apply_merge_utxos::<G::UtxoData>,
+                ser_merge_utxos::<G::UtxoData>,
+            ),
+            partial_merge_utxos,
+        );
+        let compaction_filter_name =
+            format!("{}::compaction_filter_utxos", cf_name);
+        options.set_compaction_filter(
+            &compaction_filter_name,
+            compaction_filter_utxos,
+        );
+        columns.push(rocksdb::ColumnFamilyDescriptor::new(cf_name, options));
     }
 
     fn output_utxo(
@@ -281,69 +400,30 @@
     }
 
     fn insert_utxo_entry(
+        &self,
+        batch: &mut WriteBatch,
+        member: &G::Member<'_>,
         new_entry: UtxoEntry<G::UtxoData>,
-        entries: &mut Vec<UtxoEntry<G::UtxoData>>,
     ) -> Result<()> {
-        match entries
-            .binary_search_by_key(&&new_entry.outpoint, |entry| &entry.outpoint)
-        {
-            Ok(_) => return Err(DuplicateUtxo(new_entry.outpoint).into()),
-            Err(insert_idx) => entries.insert(insert_idx, new_entry),
-        }
+        batch.merge_cf(
+            self.col.cf,
+            self.group.ser_member(member),
+            [[INSERT].as_ref(), &db_serialize(&new_entry)?].concat(),
+        );
         Ok(())
     }
 
     fn delete_utxo_entry(
-        delete_outpoint: &UtxoOutpoint,
-        entries: &mut Vec<UtxoEntry<G::UtxoData>>,
-    ) -> Result<()> {
-        match entries
-            .binary_search_by_key(&delete_outpoint, |entry| &entry.outpoint)
-        {
-            Ok(delete_idx) => entries.remove(delete_idx),
-            Err(_) => return Err(UtxoDoesntExist(*delete_outpoint).into()),
-        };
-        Ok(())
-    }
-
-    fn get_or_fetch<'u, 'tx>(
-        &self,
-        utxos: &'u mut HashMap<G::Member<'tx>, Vec<UtxoEntry<G::UtxoData>>>,
-        member: G::Member<'tx>,
-    ) -> Result<&'u mut Vec<UtxoEntry<G::UtxoData>>> {
-        match utxos.entry(member) {
-            Entry::Occupied(entry) => Ok(entry.into_mut()),
-            Entry::Vacant(entry) => {
-                let member_ser = self.group.ser_member(entry.key());
-                let db_entries =
-                    match self.col.db.get(self.col.cf, member_ser.as_ref())? {
-                        Some(data) => db_deserialize::<
-                            Vec<UtxoEntry<G::UtxoData>>,
-                        >(&data)?,
-                        None => vec![],
-                    };
-                Ok(entry.insert(db_entries))
-            }
-        }
-    }
-
-    fn update_write_batch(
         &self,
         batch: &mut WriteBatch,
-        utxos: &HashMap<G::Member<'_>, Vec<UtxoEntry<G::UtxoData>>>,
+        member: &G::Member<'_>,
+        delete_outpoint: &UtxoOutpoint,
     ) -> Result<()> {
-        for (member, utxos) in utxos {
-            let member_ser = self.group.ser_member(member);
-            if utxos.is_empty() {
-                batch.delete_cf(self.col.cf, member_ser.as_ref());
-            } else {
-                batch.put_cf(
-                    self.col.cf,
-                    member_ser.as_ref(),
-                    db_serialize(&utxos)?,
-                );
-            }
-        }
+        batch.merge_cf(
+            self.col.cf,
+            self.group.ser_member(member),
+            [[DELETE].as_ref(), &db_serialize(&delete_outpoint)?].concat(),
+        );
         Ok(())
     }
 }
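Each merge operand written by `insert_utxo_entry`/`delete_utxo_entry` above is one tag byte (`INSERT` or `DELETE`) followed by a `db_serialize`d payload, which is what `apply_merge_utxos` dispatches on. A hypothetical decoder for that framing, for illustration only (the `UtxoOperand` enum and `decode_operand` do not exist in the diff):

```rust
// Hypothetical: decodes the [tag][payload] operand framing.
enum UtxoOperand<D> {
    Insert(UtxoEntry<D>),
    Delete(UtxoOutpoint),
}

fn decode_operand<D: for<'a> Deserialize<'a>>(
    operand: &[u8],
) -> Result<UtxoOperand<D>> {
    match operand[0] {
        INSERT => Ok(UtxoOperand::Insert(db_deserialize(&operand[1..])?)),
        DELETE => Ok(UtxoOperand::Delete(db_deserialize(&operand[1..])?)),
        prefix => Err(UnknownOperandPrefix(prefix, operand.to_vec()).into()),
    }
}
```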
@@ -366,7 +446,14 @@
     ) -> Result<Option<Vec<UtxoEntry<G::UtxoData>>>> {
         match self.col.db.get(self.col.cf, member)? {
             Some(entry) => {
-                Ok(Some(db_deserialize::<Vec<UtxoEntry<G::UtxoData>>>(&entry)?))
+                let entries =
+                    db_deserialize_vec::<UtxoEntry<G::UtxoData>>(&entry)?;
+                if entries.is_empty() {
+                    // Usually compaction catches this and removes such entries,
+                    // but it's run later, so we have to filter here manually
+                    return Ok(None);
+                }
+                Ok(Some(entries))
             }
             None => Ok(None),
         }
     }
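The manual empty-check matters because spending a member's last UTXO now leaves a zero-length value in the column family until compaction runs. A test-style round trip under the new scheme (a sketch only; `writer`, `reader`, `member`, `member_ser`, `new_entry`, `outpoint`, and the `utxos` reader method name are assumptions, not diff contents):

```rust
// Insert one UTXO and spend it again in the same batch: the merged value
// becomes an empty vector. The reader maps it to `None` immediately; the
// compaction filter removes the key from disk later.
let mut batch = WriteBatch::default();
writer.insert_utxo_entry(&mut batch, &member, new_entry)?;
// `outpoint` is the outpoint of `new_entry`.
writer.delete_utxo_entry(&mut batch, &member, &outpoint)?;
db.write_batch(batch)?;
assert_eq!(reader.utxos(&member_ser)?, None);
```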
diff --git a/chronik/chronik-indexer/src/indexer.rs b/chronik/chronik-indexer/src/indexer.rs
--- a/chronik/chronik-indexer/src/indexer.rs
+++ b/chronik/chronik-indexer/src/indexer.rs
@@ -42,7 +42,8 @@
     subs_group::TxMsgType,
 };
 
-const CURRENT_INDEXER_VERSION: SchemaVersion = 10;
+const CURRENT_INDEXER_VERSION: SchemaVersion = 11;
+const LAST_UPGRADABLE_VERSION: SchemaVersion = 10;
 
 /// Params for setting up a [`ChronikIndexer`] instance.
 #[derive(Clone)]
@@ -140,10 +141,10 @@
 
     /// Database is outdated
     #[error(
-        "DB outdated: Chronik has version {}, but the database has version \
-         {0}. -reindex/-chronikreindex to reindex the database to the new \
-         version.",
-        CURRENT_INDEXER_VERSION
+        "DB outdated: Chronik has version {CURRENT_INDEXER_VERSION}, but the \
+         database has version {0}. The last upgradable version is \
+         {LAST_UPGRADABLE_VERSION}. -reindex/-chronikreindex to reindex the \
+         database to the new version."
     )]
     DatabaseOutdated(SchemaVersion),
 
@@ -175,10 +176,13 @@
             log!("Wiping Chronik at {}\n", db_path.to_string_lossy());
             Db::destroy(&db_path)?;
         }
+        log_chronik!("Opening Chronik at {}\n", db_path.to_string_lossy());
         let db = Db::open(&db_path)?;
-        verify_schema_version(&db)?;
+        let schema_version = verify_schema_version(&db)?;
         verify_enable_token_index(&db, params.enable_token_index)?;
+        upgrade_db_if_needed(&db, schema_version, params.enable_token_index)?;
+
         let mempool = Mempool::new(ScriptGroup, params.enable_token_index);
         Ok(ChronikIndexer {
             db,
@@ -684,11 +688,11 @@
     }
 }
 
-fn verify_schema_version(db: &Db) -> Result<()> {
+fn verify_schema_version(db: &Db) -> Result<SchemaVersion> {
     let metadata_reader = MetadataReader::new(db)?;
     let metadata_writer = MetadataWriter::new(db)?;
     let is_empty = db.is_db_empty()?;
-    match metadata_reader
+    let schema_version = match metadata_reader
         .schema_version()
         .wrap_err(CorruptedSchemaVersion)?
     {
@@ -697,9 +701,14 @@
             if schema_version > CURRENT_INDEXER_VERSION {
                 return Err(ChronikOutdated(schema_version).into());
             }
-            if schema_version < CURRENT_INDEXER_VERSION {
+            if schema_version < LAST_UPGRADABLE_VERSION {
                 return Err(DatabaseOutdated(schema_version).into());
            }
+            log!(
+                "Chronik has version {CURRENT_INDEXER_VERSION}, DB has \
+                 version {schema_version}\n"
+            );
+            schema_version
         }
         None => {
             if !is_empty {
@@ -709,10 +718,14 @@
             metadata_writer
                 .update_schema_version(&mut batch, CURRENT_INDEXER_VERSION)?;
             db.write_batch(batch)?;
+            log!(
+                "Chronik has version {CURRENT_INDEXER_VERSION}, initialized \
+                 DB with that version\n"
+            );
+            CURRENT_INDEXER_VERSION
         }
-    }
-    log!("Chronik has version {CURRENT_INDEXER_VERSION}\n");
-    Ok(())
+    };
+    Ok(schema_version)
 }
 
 fn verify_enable_token_index(db: &Db, enable_token_index: bool) -> Result<()> {
@@ -743,6 +756,34 @@
     Ok(())
 }
 
+fn upgrade_db_if_needed(
+    db: &Db,
+    schema_version: u64,
+    enable_token_index: bool,
+) -> Result<()> {
+    // DB has version 10, upgrade to 11
+    if schema_version == 10 {
+        upgrade_10_to_11(db, enable_token_index)?;
+    }
+    Ok(())
+}
+
+fn upgrade_10_to_11(db: &Db, enable_token_index: bool) -> Result<()> {
+    log!("Upgrading Chronik DB from version 10 to 11...\n");
+    let script_utxo_writer = ScriptUtxoWriter::new(db, ScriptGroup)?;
+    script_utxo_writer.upgrade_10_to_11()?;
+    if enable_token_index {
+        let token_id_utxo_writer = TokenIdUtxoWriter::new(db, TokenIdGroup)?;
+        token_id_utxo_writer.upgrade_10_to_11()?;
+    }
+    let mut batch = WriteBatch::default();
+    let metadata_writer = MetadataWriter::new(db)?;
+    metadata_writer.update_schema_version(&mut batch, 11)?;
+    db.write_batch(batch)?;
+    log!("Successfully upgraded Chronik DB from version 10 to 11.\n");
+    Ok(())
+}
+
 impl Node {
     /// If `result` is [`Err`], logs and aborts the node.
     pub fn ok_or_abort<T>(&self, func_name: &str, result: Result<T>) {
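For review convenience, the combined open-time version handling after this change can be summarized as follows (an illustrative helper, not part of the diff):

```rust
// Gating summary: what happens for each stored schema version at open.
fn describe_open(db_version: SchemaVersion) -> &'static str {
    match db_version {
        v if v > CURRENT_INDEXER_VERSION => {
            "Err(ChronikOutdated): DB was written by a newer Chronik"
        }
        v if v < LAST_UPGRADABLE_VERSION => {
            "Err(DatabaseOutdated): -reindex/-chronikreindex required"
        }
        v if v < CURRENT_INDEXER_VERSION => {
            "open, then upgrade_db_if_needed migrates in place (10 -> 11)"
        }
        _ => "open as-is",
    }
}
```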