diff --git a/chronik/chronik-db/src/db.rs b/chronik/chronik-db/src/db.rs index bd4d15bb9..d6e7eee02 100644 --- a/chronik/chronik-db/src/db.rs +++ b/chronik/chronik-db/src/db.rs @@ -1,210 +1,226 @@ // Copyright (c) 2022 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. //! Module containing [`Db`] and errors, which encapsulates a database. //! Read and write operations should exclusively be done with dedicated writers //! and readers, such as [`crate::io::BlockWriter`]. use std::path::Path; use abc_rust_error::Result; pub use rocksdb::WriteBatch; use rocksdb::{ColumnFamilyDescriptor, IteratorMode}; use thiserror::Error; use crate::{ groups::{ ScriptHistoryWriter, ScriptUtxoWriter, TokenIdHistoryWriter, TokenIdUtxoWriter, }, io::{ token::TokenWriter, BlockStatsWriter, BlockWriter, MetadataWriter, SpentByWriter, TxWriter, }, }; // All column family names used by Chronik should be defined here /// Column family name for the block data. pub const CF_BLK: &str = "blk"; /// Column family for the first tx_num of the block. Used to get a list of the /// txs of the block. pub const CF_BLK_BY_FIRST_TX: &str = "blk_by_first_tx"; /// Column family for stats about blocks. pub const CF_BLK_STATS: &str = "blk_stats"; /// Column family for the block height of the first tx_num of that block. Used /// to get the block height of a tx. pub const CF_FIRST_TX_BY_BLK: &str = "first_tx_by_blk"; /// Column family to lookup a block by its hash. pub const CF_LOOKUP_BLK_BY_HASH: &str = "lookup_blk_by_hash"; /// Column family to lookup a tx by its hash. pub const CF_LOOKUP_TX_BY_HASH: &str = "lookup_tx_by_hash"; /// Column family name for db metadata. pub const CF_META: &str = "meta"; /// Column family to store tx history by script. pub const CF_SCRIPT_HISTORY: &str = "script_history"; /// Column family to store number of txs by script. pub const CF_SCRIPT_HISTORY_NUM_TXS: &str = "script_history_num_txs"; /// Column family for utxos by script. pub const CF_SCRIPT_UTXO: &str = "script_utxo"; /// Column family to store tx history by token ID. pub const CF_TOKEN_ID_HISTORY: &str = "token_id_history"; /// Column family to store number of txs by token ID. pub const CF_TOKEN_ID_HISTORY_NUM_TXS: &str = "token_id_history_num_txs"; /// Column family for utxos by token ID. pub const CF_TOKEN_ID_UTXO: &str = "token_id_utxo"; /// Column family to store which outputs have been spent by which tx inputs. pub const CF_SPENT_BY: &str = "spent_by"; /// Column family for genesis info by token_tx_num pub const CF_TOKEN_GENESIS_INFO: &str = "token_genesis_info"; /// Column family for TokenMeta by token_tx_num pub const CF_TOKEN_META: &str = "token_meta"; /// Column family for token tx data by tx pub const CF_TOKEN_TX: &str = "token_tx"; /// Column family for the tx data. pub const CF_TX: &str = "tx"; pub(crate) type CF = rocksdb::ColumnFamily; /// Indexer database. /// Owns the underlying [`rocksdb::DB`] instance. #[derive(Debug)] pub struct Db { db: rocksdb::DB, cf_names: Vec, } /// Errors indicating something went wrong with the database itself. #[derive(Debug, Error)] pub enum DbError { /// Column family requested but not defined during `Db::open`. #[error("Column family {0} doesn't exist")] NoSuchColumnFamily(String), /// Error with RocksDB itself, e.g. db inconsistency. #[error("RocksDB error: {0}")] RocksDb(rocksdb::Error), } use self::DbError::*; impl Db { /// Opens the database under the specified path. 
/// Creates the database file and necessary column families if they don't exist yet. pub fn open(path: impl AsRef<Path>) -> Result<Self> { let mut cfs = Vec::new(); BlockWriter::add_cfs(&mut cfs); BlockStatsWriter::add_cfs(&mut cfs); MetadataWriter::add_cfs(&mut cfs); TxWriter::add_cfs(&mut cfs); ScriptHistoryWriter::add_cfs(&mut cfs); ScriptUtxoWriter::add_cfs(&mut cfs); SpentByWriter::add_cfs(&mut cfs); TokenWriter::add_cfs(&mut cfs); TokenIdHistoryWriter::add_cfs(&mut cfs); TokenIdUtxoWriter::add_cfs(&mut cfs); Self::open_with_cfs(path, cfs) } pub(crate) fn open_with_cfs( path: impl AsRef<Path>, cfs: Vec<ColumnFamilyDescriptor>, ) -> Result<Self> { let db_options = Self::db_options(); let cf_names = cfs.iter().map(|cf| cf.name().to_string()).collect(); let db = rocksdb::DB::open_cf_descriptors(&db_options, path, cfs) .map_err(RocksDb)?; Ok(Db { db, cf_names }) } fn db_options() -> rocksdb::Options { let mut db_options = rocksdb::Options::default(); db_options.create_if_missing(true); db_options.create_missing_column_families(true); db_options } /// Destroy the DB, i.e. delete all its associated files. /// /// According to the RocksDB docs, this differs from removing the dir: /// DestroyDB() will take care of the case where the RocksDB database is /// stored in multiple directories. For instance, a single DB can be /// configured to store its data in multiple directories by specifying /// different paths to DBOptions::db_paths, DBOptions::db_log_dir, and /// DBOptions::wal_dir. pub fn destroy(path: impl AsRef<Path>) -> Result<()> { let db_options = Self::db_options(); rocksdb::DB::destroy(&db_options, path).map_err(RocksDb)?; Ok(()) } /// Return a column family handle with the given name. pub fn cf(&self, name: &str) -> Result<&CF> { Ok(self .db .cf_handle(name) .ok_or_else(|| NoSuchColumnFamily(name.to_string()))?) } pub(crate) fn get( &self, cf: &CF, key: impl AsRef<[u8]>, ) -> Result<Option<rocksdb::DBPinnableSlice<'_>>> { Ok(self.db.get_pinned_cf(cf, key).map_err(RocksDb)?) } pub(crate) fn multi_get( &self, cf: &CF, keys: impl IntoIterator<Item = impl AsRef<[u8]>>, sorted_inputs: bool, ) -> Result<Vec<Option<rocksdb::DBPinnableSlice<'_>>>> { self.db .batched_multi_get_cf(cf, keys, sorted_inputs) .into_iter() .map(|value| value.map_err(|err| RocksDb(err).into())) .collect() } pub(crate) fn iterator_end( &self, cf: &CF, ) -> impl Iterator<Item = Result<(Box<[u8]>, Box<[u8]>)>> + '_ { self.db .iterator_cf(cf, IteratorMode::End) .map(|result| Ok(result.map_err(RocksDb)?)) } pub(crate) fn iterator( &self, cf: &CF, start: &[u8], direction: rocksdb::Direction, ) -> impl Iterator<Item = Result<(Box<[u8]>, Box<[u8]>)>> + '_ { self.db .iterator_cf(cf, IteratorMode::From(start, direction)) .map(|result| Ok(result.map_err(RocksDb)?)) } + pub(crate) fn full_iterator( + &self, + cf: &CF, + ) -> impl Iterator<Item = Result<(Box<[u8]>, Box<[u8]>)>> + '_ { + self.db + .full_iterator_cf(cf, IteratorMode::Start) + .map(|result| Ok(result.map_err(RocksDb)?)) + } + + pub(crate) fn estimate_num_keys(&self, cf: &CF) -> Result<Option<u64>> { + Ok(self + .db + .property_int_value_cf(cf, "rocksdb.estimate-num-keys") + .map_err(RocksDb)?) + } + /// Writes the batch to the Db atomically. pub fn write_batch(&self, write_batch: WriteBatch) -> Result<()> { self.db.write_without_wal(write_batch).map_err(RocksDb)?; Ok(()) } /// Whether any of the column families in the DB have any data. /// /// Note: RocksDB forbids not opening all column families, therefore, this /// will always iterate through all column families.
pub fn is_db_empty(&self) -> Result<bool> { for cf_name in &self.cf_names { let cf = self.cf(cf_name)?; let mut cf_iter = self.db.full_iterator_cf(cf, IteratorMode::Start); if cf_iter.next().is_some() { return Ok(false); } } Ok(true) } } diff --git a/chronik/chronik-db/src/group.rs b/chronik/chronik-db/src/group.rs index 0b0fdbd63..35714d44a 100644 --- a/chronik/chronik-db/src/group.rs +++ b/chronik/chronik-db/src/group.rs @@ -1,140 +1,142 @@ // Copyright (c) 2023 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. //! Module for [`Group`] and [`GroupQuery`]. use bitcoinsuite_core::tx::{Tx, TxOutput}; use bytes::Bytes; use serde::{Deserialize, Serialize}; use crate::io::{GroupHistoryConf, GroupUtxoConf}; /// Struct giving impls of [`Group`] all the necessary data to determine the /// member of the group. #[derive(Clone, Copy, Debug)] pub struct GroupQuery<'a> { /// Whether the tx is a coinbase tx. pub is_coinbase: bool, /// The transaction that should be grouped. pub tx: &'a Tx, } /// Item returned by `members_tx`. #[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] pub struct MemberItem<M> { /// Index of the item in the list of inputs/outputs. pub idx: usize, /// Member of this item. pub member: M, } /// Groups txs and determines which members they are. /// /// A member is one instance in a group and can be anything in a tx, e.g. the /// input and output scripts, the SLP token ID, a SWaP signal, a smart contract /// etc. pub trait Group { /// Iterator over the members found for a given [`GroupQuery`]. type Iter<'a>: IntoIterator<Item = MemberItem<Self::Member<'a>>> + 'a; /// Member of a group, this is what txs will be grouped by. /// /// We use a HashMap and a BTreeMap to group txs, so it must implement /// [`std::hash::Hash`] and [`Ord`]. type Member<'a>: std::hash::Hash + Eq + Ord; /// Serialized member, this is what will be used as key in the DB. /// Normally, this will be a [`Vec<u8>`] or a [`u8`] array or slice. /// /// We use this to separate the code that groups members /// from the serialization of members. For example, we can group by raw /// input/output script bytecode, but then use compressed scripts when /// adding to the DB. That way, we don't have to run the compression every /// time we compare/hash elements for grouping. /// /// Note: For group history, this will be suffixed by a 4-byte page number. type MemberSer<'a>: AsRef<[u8]> + 'a; /// Auxiliary data when grouping members type Aux; /// Data attached to a UTXO for this group. type UtxoData: UtxoData; /// Find the group's members in the given query's tx's inputs. /// /// Note: This is allowed to return a member multiple times per query. /// /// Note: The returned iterator is allowed to borrow from the query. fn input_members<'a>( &self, query: GroupQuery<'a>, aux: &Self::Aux, ) -> Self::Iter<'a>; /// Find the group's members in the given query's tx's outputs. /// /// Note: This is allowed to return a member multiple times per query. /// /// Note: The returned iterator is allowed to borrow from the query. fn output_members<'a>( &self, query: GroupQuery<'a>, aux: &Self::Aux, ) -> Self::Iter<'a>; /// Serialize the given member. fn ser_member<'a>(&self, member: &Self::Member<'a>) -> Self::MemberSer<'a>; /// The [`GroupHistoryConf`] for this group. fn tx_history_conf() -> GroupHistoryConf; /// The [`GroupUtxoConf`] for this group. fn utxo_conf() -> GroupUtxoConf; } /// Data attached to a UTXO by a group.
/// There are basically only two meaningful variants here, one with script (where /// the member is anything, e.g. a token ID) and one without script, where the /// member is the script itself and therefore storing it in the UTXO is /// redundant. -pub trait UtxoData: Default + for<'a> Deserialize<'a> + Serialize { +pub trait UtxoData: + Default + for<'a> Deserialize<'a> + Serialize + 'static +{ /// Function that extracts the [`UtxoData`] from an output. fn from_output(output: &TxOutput) -> Self; } /// [`UtxoData`] that only stores the output value but not the script. /// This is useful where the member itself is the script so storing it would be /// redundant. pub type UtxoDataValue = i64; impl UtxoData for UtxoDataValue { fn from_output(output: &TxOutput) -> Self { output.value } } /// [`UtxoData`] that stores the full output, including value and script. /// This is useful where the member isn't the script, e.g. a token ID. pub type UtxoDataOutput = (i64, Bytes); impl UtxoData for UtxoDataOutput { fn from_output(output: &TxOutput) -> Self { (output.value, output.script.bytecode().clone()) } } /// Helper which returns the `G::Member`s of both inputs and outputs of the /// group for the tx. pub fn tx_members_for_group<'a, G: Group>( group: &G, query: GroupQuery<'a>, aux: &G::Aux, ) -> impl Iterator<Item = G::Member<'a>> { group .input_members(query, aux) .into_iter() .chain(group.output_members(query, aux)) .map(|item| item.member) } diff --git a/chronik/chronik-db/src/io/group_utxos.rs b/chronik/chronik-db/src/io/group_utxos.rs index 9d7612ca7..beced8624 100644 --- a/chronik/chronik-db/src/io/group_utxos.rs +++ b/chronik/chronik-db/src/io/group_utxos.rs @@ -1,526 +1,613 @@ // Copyright (c) 2023 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. //! Module for [`GroupUtxoWriter`] and [`GroupUtxoReader`]. -use std::{ - collections::{hash_map::Entry, HashMap}, - marker::PhantomData, - time::Instant, -}; +use std::{marker::PhantomData, time::Instant}; use abc_rust_error::Result; -use rocksdb::WriteBatch; +use chronik_util::log; +use rocksdb::{compaction_filter::Decision, WriteBatch}; use serde::{Deserialize, Serialize}; use thiserror::Error; use crate::{ db::{Db, CF}, group::{Group, GroupQuery, UtxoData}, index_tx::IndexTx, - io::TxNum, - ser::{db_deserialize, db_serialize}, + io::{group_utxos::GroupUtxoError::*, merge::catch_merge_errors, TxNum}, + ser::{db_deserialize, db_deserialize_vec, db_serialize, db_serialize_vec}, }; +const INSERT: u8 = b'I'; +const DELETE: u8 = b'D'; + /// Configuration for group utxos reader/writers. #[derive(Clone, Debug, Default, Eq, PartialEq)] pub struct GroupUtxoConf { /// Column family to store the group utxos entries. pub cf_name: &'static str, } struct GroupUtxoColumn<'a> { db: &'a Db, cf: &'a CF, } /// Outpoint in the DB, but with [`TxNum`] instead of `TxId` for the txid. #[derive( Clone, Copy, Debug, Default, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize, )] pub struct UtxoOutpoint { /// [`TxNum`] of tx of the outpoint. pub tx_num: TxNum, /// Output of the tx referenced by the outpoint. pub out_idx: u32, } /// Entry in the UTXO DB for a group. #[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)] pub struct UtxoEntry<D> { /// Outpoint of the UTXO. pub outpoint: UtxoOutpoint, /// Data attached to the UTXO for easy access, e.g. /// the UTXO value and/or script. pub data: D, } /// Write UTXOs of a group to the DB.
#[derive(Debug)] pub struct GroupUtxoWriter<'a, G: Group> { col: GroupUtxoColumn<'a>, group: G, } /// Read UTXOs of a group from the DB. #[derive(Debug)] pub struct GroupUtxoReader<'a, G: Group> { col: GroupUtxoColumn<'a>, phantom: PhantomData<G>, } /// In-memory data for indexing UTXOs of a group. #[derive(Debug, Default)] pub struct GroupUtxoMemData { /// Stats about cache hits, num requests etc. pub stats: GroupUtxoStats, } /// Stats about cache hits, num requests etc. #[derive(Clone, Debug, Default)] pub struct GroupUtxoStats { /// Total number of txs updated. pub n_total: usize, /// Time [s] for insert/delete. pub t_total: f64, - /// Time [s] for fetching UTXOs. - pub t_fetch: f64, - /// Time [s] for indexing UTXOs. - pub t_index: f64, } /// Error indicating something went wrong with the tx index. #[derive(Debug, Error, PartialEq, Eq)] pub enum GroupUtxoError { /// UTXO already in the DB #[error( "Duplicate UTXO: {0:?} has been added twice to the member's UTXOs" )] DuplicateUtxo(UtxoOutpoint), /// UTXO not in the DB #[error("UTXO doesn't exist: {0:?} is not in the member's UTXOs")] UtxoDoesntExist(UtxoOutpoint), + + /// Used merge_cf incorrectly, prefix must be either I or D. + #[error( + "Bad usage of merge: Unknown prefix {0:02x}, expected I or D: {}", + hex::encode(.1), + )] + UnknownOperandPrefix(u8, Vec<u8>), + + /// Upgrade failed. + #[error( + "Upgrade failed, could not parse {} for key {}: {error}", + hex::encode(.value), + hex::encode(.key), + )] + UpgradeFailed { + /// Key that failed + key: Box<[u8]>, + /// Value that failed parsing in the old format + value: Box<[u8]>, + /// Error message + error: String, + }, +} + +fn partial_merge_utxos( + _key: &[u8], + _existing_value: Option<&[u8]>, + _operands: &rocksdb::MergeOperands, +) -> Option<Vec<u8>> { + // We don't use partial merge + None +} + +fn init_merge_utxos<D: for<'a> Deserialize<'a>>( + _key: &[u8], + existing_value: Option<&[u8]>, + operands: &rocksdb::MergeOperands, +) -> Result<Vec<UtxoEntry<D>>> { + let mut entries = match existing_value { + Some(entries_ser) => db_deserialize_vec::<UtxoEntry<D>>(entries_ser)?, + None => vec![], + }; + if operands.into_iter().all(|operand| operand[0] == INSERT) { + // If we only have inserts, we can pre-allocate the exact memory we need + entries.reserve_exact(operands.len()); + } + Ok(entries) +} + +fn apply_merge_utxos<D: for<'a> Deserialize<'a>>( + _key: &[u8], + entries: &mut Vec<UtxoEntry<D>>, + operand: &[u8], +) -> Result<()> { + match operand[0] { + INSERT => { + let new_entry = db_deserialize::<UtxoEntry<D>>(&operand[1..])?; + match entries.binary_search_by_key(&&new_entry.outpoint, |entry| { + &entry.outpoint + }) { + Ok(_) => return Err(DuplicateUtxo(new_entry.outpoint).into()), + Err(insert_idx) => entries.insert(insert_idx, new_entry), + } + } + DELETE => { + let delete_outpoint = + db_deserialize::<UtxoOutpoint>(&operand[1..])?; + match entries.binary_search_by_key(&&delete_outpoint, |entry| { + &entry.outpoint + }) { + Ok(delete_idx) => entries.remove(delete_idx), + Err(_) => return Err(UtxoDoesntExist(delete_outpoint).into()), + }; + } + _ => { + return Err( + UnknownOperandPrefix(operand[0], operand.to_vec()).into() + ); + } + } + Ok(()) +} + +fn ser_merge_utxos<D: Serialize>( + _key: &[u8], + entries: Vec<UtxoEntry<D>>, +) -> Result<Vec<u8>> { + db_serialize_vec::<UtxoEntry<D>>(entries) } -use self::GroupUtxoError::*; +// We must use a compaction filter that removes empty entries +fn compaction_filter_utxos(_level: u32, _key: &[u8], value: &[u8]) -> Decision { + if value.is_empty() { + Decision::Remove + } else { + Decision::Keep + } +} impl<'a> GroupUtxoColumn<'a> { fn new(db: &'a Db, conf: &GroupUtxoConf) ->
Result { let cf = db.cf(conf.cf_name)?; Ok(GroupUtxoColumn { db, cf }) } } impl<'a, G: Group> GroupUtxoWriter<'a, G> { /// Create a new [`GroupUtxoWriter`]. pub fn new(db: &'a Db, group: G) -> Result { let conf = G::utxo_conf(); let col = GroupUtxoColumn::new(db, &conf)?; Ok(GroupUtxoWriter { col, group }) } /// Insert the txs of a block from the UTXOs in the DB for the group. /// /// Add all the UTXOs created by the outputs of the txs to the DB, remove /// all the UTXOs spend by the inputs of the txs. pub fn insert<'tx>( &self, batch: &mut WriteBatch, txs: &'tx [IndexTx<'tx>], aux: &G::Aux, mem_data: &mut GroupUtxoMemData, ) -> Result<()> { let stats = &mut mem_data.stats; stats.n_total += txs.len(); let t_start = Instant::now(); - let mut updated_utxos = - HashMap::, Vec>>::new(); for index_tx in txs { let query = GroupQuery { is_coinbase: index_tx.is_coinbase, tx: index_tx.tx, }; for item in self.group.output_members(query, aux) { - let t_fetch = Instant::now(); - let entries = - self.get_or_fetch(&mut updated_utxos, item.member)?; - stats.t_fetch += t_fetch.elapsed().as_secs_f64(); let new_entry = Self::output_utxo(index_tx, item.idx); - let t_index = Instant::now(); - Self::insert_utxo_entry(new_entry, entries)?; - stats.t_index += t_index.elapsed().as_secs_f64(); + self.insert_utxo_entry(batch, &item.member, new_entry)?; } } for index_tx in txs { if index_tx.is_coinbase { continue; } let query = GroupQuery { is_coinbase: index_tx.is_coinbase, tx: index_tx.tx, }; for item in self.group.input_members(query, aux) { - let t_fetch = Instant::now(); - let entries = - self.get_or_fetch(&mut updated_utxos, item.member)?; - stats.t_fetch += t_fetch.elapsed().as_secs_f64(); - let delete_entry = Self::input_utxo(index_tx, item.idx); - let t_index = Instant::now(); - Self::delete_utxo_entry(&delete_entry.outpoint, entries)?; - stats.t_index += t_index.elapsed().as_secs_f64(); + let delete_outpoint = + Self::input_utxo(index_tx, item.idx).outpoint; + self.delete_utxo_entry(batch, &item.member, &delete_outpoint)?; } } - self.update_write_batch(batch, &updated_utxos)?; stats.t_total += t_start.elapsed().as_secs_f64(); Ok(()) } /// Remove the txs of a block from the UTXOs in the DB for the group. /// /// Add all the UTXOs spent by the inputs of the txs and remove all the /// UTXOs created by the outputs of the txs. 
pub fn delete<'tx>( &self, batch: &mut WriteBatch, txs: &'tx [IndexTx<'tx>], aux: &G::Aux, mem_data: &mut GroupUtxoMemData, ) -> Result<()> { let stats = &mut mem_data.stats; stats.n_total += txs.len(); let t_start = Instant::now(); - let mut updated_utxos = - HashMap::, Vec>>::new(); for index_tx in txs { if index_tx.is_coinbase { continue; } let query = GroupQuery { is_coinbase: index_tx.is_coinbase, tx: index_tx.tx, }; for item in self.group.input_members(query, aux) { - let t_fetch = Instant::now(); - let entries = - self.get_or_fetch(&mut updated_utxos, item.member)?; - stats.t_fetch += t_fetch.elapsed().as_secs_f64(); let new_entry = Self::input_utxo(index_tx, item.idx); - let t_index = Instant::now(); - Self::insert_utxo_entry(new_entry, entries)?; - stats.t_index += t_index.elapsed().as_secs_f64(); + self.insert_utxo_entry(batch, &item.member, new_entry)?; } } for index_tx in txs { let query = GroupQuery { is_coinbase: index_tx.is_coinbase, tx: index_tx.tx, }; for item in self.group.output_members(query, aux) { - let t_fetch = Instant::now(); - let entries = - self.get_or_fetch(&mut updated_utxos, item.member)?; - stats.t_fetch += t_fetch.elapsed().as_secs_f64(); - let delete_entry = Self::output_utxo(index_tx, item.idx); - let t_index = Instant::now(); - Self::delete_utxo_entry(&delete_entry.outpoint, entries)?; - stats.t_index += t_index.elapsed().as_secs_f64(); + let delete_outpoint = + Self::output_utxo(index_tx, item.idx).outpoint; + self.delete_utxo_entry(batch, &item.member, &delete_outpoint)?; } } - self.update_write_batch(batch, &updated_utxos)?; stats.t_total += t_start.elapsed().as_secs_f64(); Ok(()) } + /// Upgrade the DB from version 10 to version 11 + pub fn upgrade_10_to_11(&self) -> Result<()> { + log!( + "Upgrading Chronik UTXO set for {}. 
Do not kill the process \ + during upgrade, it will corrupt the database.\n", + G::utxo_conf().cf_name + ); + let estimated_num_keys = + self.col.db.estimate_num_keys(self.col.cf)?.unwrap_or(0); + let mut batch = WriteBatch::default(); + for (db_idx, old_utxos_ser) in + self.col.db.full_iterator(self.col.cf).enumerate() + { + let (key, old_utxos_ser) = old_utxos_ser?; + let utxos = match db_deserialize::>>( + &old_utxos_ser, + ) { + Ok(utxos) => utxos, + Err(err) => { + return Err(UpgradeFailed { + key, + value: old_utxos_ser, + error: err.to_string(), + } + .into()); + } + }; + let new_utxos_ser = + db_serialize_vec::>(utxos)?; + batch.put_cf(self.col.cf, key, new_utxos_ser); + if db_idx % 10000 == 0 { + log!("Upgraded {db_idx} of {estimated_num_keys} (estimated)\n"); + self.col.db.write_batch(batch)?; + batch = WriteBatch::default(); + } + } + self.col.db.write_batch(batch)?; + log!("Upgrade for {} complete\n", G::utxo_conf().cf_name); + Ok(()) + } + pub(crate) fn add_cfs(columns: &mut Vec) { - columns.push(rocksdb::ColumnFamilyDescriptor::new( - G::utxo_conf().cf_name, - rocksdb::Options::default(), - )); + let cf_name = G::utxo_conf().cf_name; + let mut options = rocksdb::Options::default(); + let merge_op_name = format!("{}::merge_op_utxos", cf_name); + options.set_merge_operator( + merge_op_name.as_str(), + catch_merge_errors( + init_merge_utxos::, + apply_merge_utxos::, + ser_merge_utxos::, + ), + partial_merge_utxos, + ); + let compaction_filter_name = + format!("{}::compaction_filter_utxos", cf_name); + options.set_compaction_filter( + &compaction_filter_name, + compaction_filter_utxos, + ); + columns.push(rocksdb::ColumnFamilyDescriptor::new(cf_name, options)); } fn output_utxo( index_tx: &IndexTx<'_>, idx: usize, ) -> UtxoEntry { UtxoEntry { outpoint: UtxoOutpoint { tx_num: index_tx.tx_num, out_idx: idx as u32, }, data: G::UtxoData::from_output(&index_tx.tx.outputs[idx]), } } fn input_utxo( index_tx: &IndexTx<'_>, idx: usize, ) -> UtxoEntry { UtxoEntry { outpoint: UtxoOutpoint { tx_num: index_tx.input_nums[idx], out_idx: index_tx.tx.inputs[idx].prev_out.out_idx, }, data: index_tx.tx.inputs[idx] .coin .as_ref() .map(|coin| G::UtxoData::from_output(&coin.output)) .unwrap_or_default(), } } fn insert_utxo_entry( + &self, + batch: &mut WriteBatch, + member: &G::Member<'_>, new_entry: UtxoEntry, - entries: &mut Vec>, ) -> Result<()> { - match entries - .binary_search_by_key(&&new_entry.outpoint, |entry| &entry.outpoint) - { - Ok(_) => return Err(DuplicateUtxo(new_entry.outpoint).into()), - Err(insert_idx) => entries.insert(insert_idx, new_entry), - } + batch.merge_cf( + self.col.cf, + self.group.ser_member(member), + [[INSERT].as_ref(), &db_serialize(&new_entry)?].concat(), + ); Ok(()) } fn delete_utxo_entry( - delete_outpoint: &UtxoOutpoint, - entries: &mut Vec>, - ) -> Result<()> { - match entries - .binary_search_by_key(&delete_outpoint, |entry| &entry.outpoint) - { - Ok(delete_idx) => entries.remove(delete_idx), - Err(_) => return Err(UtxoDoesntExist(*delete_outpoint).into()), - }; - Ok(()) - } - - fn get_or_fetch<'u, 'tx>( - &self, - utxos: &'u mut HashMap, Vec>>, - member: G::Member<'tx>, - ) -> Result<&'u mut Vec>> { - match utxos.entry(member) { - Entry::Occupied(entry) => Ok(entry.into_mut()), - Entry::Vacant(entry) => { - let member_ser = self.group.ser_member(entry.key()); - let db_entries = - match self.col.db.get(self.col.cf, member_ser.as_ref())? 
{ - Some(data) => db_deserialize::< - Vec>, - >(&data)?, - None => vec![], - }; - Ok(entry.insert(db_entries)) - } - } - } - - fn update_write_batch( &self, batch: &mut WriteBatch, - utxos: &HashMap, Vec>>, + member: &G::Member<'_>, + delete_outpoint: &UtxoOutpoint, ) -> Result<()> { - for (member, utxos) in utxos { - let member_ser = self.group.ser_member(member); - if utxos.is_empty() { - batch.delete_cf(self.col.cf, member_ser.as_ref()); - } else { - batch.put_cf( - self.col.cf, - member_ser.as_ref(), - db_serialize(&utxos)?, - ); - } - } + batch.merge_cf( + self.col.cf, + self.group.ser_member(member), + [[DELETE].as_ref(), &db_serialize(&delete_outpoint)?].concat(), + ); Ok(()) } } impl<'a, G: Group> GroupUtxoReader<'a, G> { /// Create a new [`GroupUtxoReader`]. pub fn new(db: &'a Db) -> Result { let conf = G::utxo_conf(); let col = GroupUtxoColumn::new(db, &conf)?; Ok(GroupUtxoReader { col, phantom: PhantomData, }) } /// Query the UTXOs for the given member. pub fn utxos( &self, member: &[u8], ) -> Result>>> { match self.col.db.get(self.col.cf, member)? { Some(entry) => { - Ok(Some(db_deserialize::>>(&entry)?)) + let entries = + db_deserialize_vec::>(&entry)?; + if entries.is_empty() { + // Usually compaction catches this and removes such entries, + // but it's run later so have to filter here manually + return Ok(None); + } + Ok(Some(entries)) } None => Ok(None), } } } impl std::fmt::Debug for GroupUtxoColumn<'_> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "GroupUtxoColumn {{ .. }}") } } #[cfg(test)] mod tests { use std::cell::RefCell; use abc_rust_error::Result; use bitcoinsuite_core::tx::Tx; use rocksdb::WriteBatch; use crate::{ db::Db, index_tx::prepare_indexed_txs, io::{ BlockTxs, GroupUtxoMemData, GroupUtxoReader, GroupUtxoWriter, TxEntry, TxWriter, TxsMemData, UtxoEntry, UtxoOutpoint, }, test::{make_inputs_tx, ser_value, ValueGroup}, }; #[test] fn test_value_group_utxos() -> Result<()> { abc_rust_error::install(); let tempdir = tempdir::TempDir::new("chronik-db--group_history")?; let mut cfs = Vec::new(); GroupUtxoWriter::::add_cfs(&mut cfs); TxWriter::add_cfs(&mut cfs); let db = Db::open_with_cfs(tempdir.path(), cfs)?; let tx_writer = TxWriter::new(&db)?; let mem_data = RefCell::new(GroupUtxoMemData::default()); let txs_mem_data = RefCell::new(TxsMemData::default()); let group_writer = GroupUtxoWriter::new(&db, ValueGroup)?; let group_reader = GroupUtxoReader::::new(&db)?; let block_height = RefCell::new(-1); let txs_batch = |txs: &[Tx]| BlockTxs { txs: txs .iter() .map(|tx| TxEntry { txid: tx.txid(), ..Default::default() }) .collect(), block_height: *block_height.borrow(), }; let connect_block = |txs: &[Tx]| -> Result<()> { let mut batch = WriteBatch::default(); *block_height.borrow_mut() += 1; let first_tx_num = tx_writer.insert( &mut batch, &txs_batch(txs), &mut txs_mem_data.borrow_mut(), )?; let index_txs = prepare_indexed_txs(&db, first_tx_num, txs)?; group_writer.insert( &mut batch, &index_txs, &(), &mut mem_data.borrow_mut(), )?; db.write_batch(batch)?; Ok(()) }; let disconnect_block = |txs: &[Tx]| -> Result<()> { let mut batch = WriteBatch::default(); let first_tx_num = tx_writer.delete( &mut batch, &txs_batch(txs), &mut txs_mem_data.borrow_mut(), )?; let index_txs = prepare_indexed_txs(&db, first_tx_num, txs)?; group_writer.delete( &mut batch, &index_txs, &(), &mut mem_data.borrow_mut(), )?; db.write_batch(batch)?; *block_height.borrow_mut() -= 1; Ok(()) }; let utxo = |tx_num, out_idx, value| UtxoEntry { outpoint: UtxoOutpoint { 
tx_num, out_idx }, data: value, }; let read_utxos = |val: i64| group_reader.utxos(&ser_value(val)); let block0 = vec![make_inputs_tx(0x01, [(0x00, u32::MAX, 0xffff)], [100, 200])]; connect_block(&block0)?; assert_eq!(read_utxos(100)?, Some(vec![utxo(0, 0, 100)])); assert_eq!(read_utxos(200)?, Some(vec![utxo(0, 1, 200)])); let block1 = vec![ make_inputs_tx(0x02, [(0x00, u32::MAX, 0xffff)], [200]), make_inputs_tx(0x03, [(0x01, 0, 100)], [10, 20, 10]), make_inputs_tx( 0x04, [(0x03, 0, 10), (0x01, 1, 200), (0x03, 1, 20)], [200], ), ]; connect_block(&block1)?; assert_eq!(read_utxos(10)?, Some(vec![utxo(2, 2, 10)])); assert_eq!(read_utxos(20)?, None); assert_eq!(read_utxos(100)?, None); assert_eq!( read_utxos(200)?, Some(vec![utxo(1, 0, 200), utxo(3, 0, 200)]), ); disconnect_block(&block1)?; assert_eq!(read_utxos(10)?, None); assert_eq!(read_utxos(20)?, None); assert_eq!(read_utxos(100)?, Some(vec![utxo(0, 0, 100)])); assert_eq!(read_utxos(200)?, Some(vec![utxo(0, 1, 200)])); // Reorg block let block1 = vec![ make_inputs_tx(0x02, [(0x00, u32::MAX, 0xffff)], [200]), make_inputs_tx(0x10, [(0x01, 0, 100)], [100, 200, 100]), make_inputs_tx( 0x11, [(0x10, 0, 100), (0x10, 1, 200), (0x01, 1, 200)], [200], ), make_inputs_tx(0x12, [(0x11, 0, 200)], [200]), ]; connect_block(&block1)?; assert_eq!(read_utxos(100)?, Some(vec![utxo(2, 2, 100)])); assert_eq!( read_utxos(200)?, Some(vec![utxo(1, 0, 200), utxo(4, 0, 200)]), ); disconnect_block(&block1)?; assert_eq!(read_utxos(100)?, Some(vec![utxo(0, 0, 100)])); assert_eq!(read_utxos(200)?, Some(vec![utxo(0, 1, 200)])); disconnect_block(&block0)?; assert_eq!(read_utxos(100)?, None); assert_eq!(read_utxos(200)?, None); Ok(()) } } diff --git a/chronik/chronik-indexer/src/indexer.rs b/chronik/chronik-indexer/src/indexer.rs index b1d9aa280..e205f6125 100644 --- a/chronik/chronik-indexer/src/indexer.rs +++ b/chronik/chronik-indexer/src/indexer.rs @@ -1,954 +1,995 @@ // Copyright (c) 2022 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. //! Module containing [`ChronikIndexer`] to index blocks and txs. use std::{io::Write, path::PathBuf}; use abc_rust_error::{Result, WrapErr}; use bitcoinsuite_core::{ block::BlockHash, tx::{Tx, TxId}, }; use chronik_bridge::{ffi, util::expect_unique_ptr}; use chronik_db::{ db::{Db, WriteBatch}, groups::{ ScriptGroup, ScriptHistoryWriter, ScriptUtxoWriter, TokenIdGroup, TokenIdGroupAux, TokenIdHistoryWriter, TokenIdUtxoWriter, }, index_tx::prepare_indexed_txs, io::{ merge, token::TokenWriter, BlockHeight, BlockReader, BlockStatsWriter, BlockTxs, BlockWriter, DbBlock, GroupHistoryMemData, GroupUtxoMemData, MetadataReader, MetadataWriter, SchemaVersion, SpentByWriter, TxEntry, TxReader, TxWriter, }, mem::{MemData, MemDataConf, Mempool, MempoolTx}, }; use chronik_util::{log, log_chronik}; use thiserror::Error; use tokio::sync::RwLock; use crate::{ avalanche::Avalanche, indexer::ChronikIndexerError::*, query::{ QueryBlocks, QueryBroadcast, QueryGroupHistory, QueryGroupUtxos, QueryTxs, UtxoProtobufOutput, UtxoProtobufValue, }, subs::{BlockMsg, BlockMsgType, Subs}, subs_group::TxMsgType, }; -const CURRENT_INDEXER_VERSION: SchemaVersion = 10; +const CURRENT_INDEXER_VERSION: SchemaVersion = 11; +const LAST_UPGRADABLE_VERSION: SchemaVersion = 10; /// Params for setting up a [`ChronikIndexer`] instance. #[derive(Clone)] pub struct ChronikIndexerParams { /// Folder where the node stores its data, net-dependent. 
pub datadir_net: PathBuf, /// Whether to clear the DB before opening the DB, e.g. when reindexing. pub wipe_db: bool, /// Whether Chronik should index SLP/ALP token txs. pub enable_token_index: bool, /// Whether to output Chronik performance statistics into a perf/ folder pub enable_perf_stats: bool, } /// Struct for indexing blocks and txs. Maintains db handles and mempool. #[derive(Debug)] pub struct ChronikIndexer { db: Db, mem_data: MemData, mempool: Mempool, script_group: ScriptGroup, avalanche: Avalanche, subs: RwLock, perf_path: Option, is_token_index_enabled: bool, } /// Access to the bitcoind node. #[derive(Debug)] pub struct Node { /// FFI bridge to the node. pub bridge: cxx::UniquePtr, } /// Block to be indexed by Chronik. #[derive(Clone, Debug, Default, Eq, PartialEq)] pub struct ChronikBlock { /// Data about the block (w/o txs) pub db_block: DbBlock, /// Txs in the block, with locations of where they are stored on disk. pub block_txs: BlockTxs, /// Block size in bytes. pub size: u64, /// Txs in the block, with inputs/outputs so we can group them. pub txs: Vec, } /// Errors for [`BlockWriter`] and [`BlockReader`]. #[derive(Debug, Eq, Error, PartialEq)] pub enum ChronikIndexerError { /// Failed creating the folder for the indexes #[error("Failed creating path {0}")] CreateDirFailed(PathBuf), /// Cannot rewind blocks that bitcoind doesn't have #[error( "Cannot rewind Chronik, it contains block {0} that the node doesn't \ have. You may need to use -reindex/-chronikreindex, or delete \ indexes/chronik and restart" )] CannotRewindChronik(BlockHash), /// Lower block doesn't exist but higher block does #[error( "Inconsistent DB: Block {missing} doesn't exist, but {exists} does" )] BlocksBelowMissing { /// Lower height that is missing missing: BlockHeight, /// Higher height that exists exists: BlockHeight, }, /// Corrupted schema version #[error( "Corrupted schema version in the Chronik database, consider running \ -reindex/-chronikreindex" )] CorruptedSchemaVersion, /// Missing schema version for non-empty database #[error( "Missing schema version in non-empty Chronik database, consider \ running -reindex/-chronikreindex" )] MissingSchemaVersion, /// This Chronik instance is outdated #[error( "Chronik outdated: Chronik has version {}, but the database has \ version {0}. Upgrade your node to the appropriate version.", CURRENT_INDEXER_VERSION )] ChronikOutdated(SchemaVersion), /// Database is outdated #[error( - "DB outdated: Chronik has version {}, but the database has version \ - {0}. -reindex/-chronikreindex to reindex the database to the new \ - version.", - CURRENT_INDEXER_VERSION + "DB outdated: Chronik has version {CURRENT_INDEXER_VERSION}, but the \ + database has version {0}. The last upgradable version is \ + {LAST_UPGRADABLE_VERSION}. -reindex/-chronikreindex to reindex the \ + database to the new version." )] DatabaseOutdated(SchemaVersion), /// Cannot enable token index on a DB that previously had it disabled #[error( "Cannot enable -chroniktokenindex on a DB that previously had it \ disabled. Provide -reindex/-chronikreindex to reindex the database \ with token data, or specify -chroniktokenindex=0 to disable the \ token index again." )] CannotEnableTokenIndex, } impl ChronikIndexer { /// Setup the indexer with the given parameters, e.g. open the DB etc. 
pub fn setup(params: ChronikIndexerParams) -> Result { let indexes_path = params.datadir_net.join("indexes"); let perf_path = params.datadir_net.join("perf"); if !indexes_path.exists() { std::fs::create_dir(&indexes_path) .wrap_err_with(|| CreateDirFailed(indexes_path.clone()))?; } if params.enable_perf_stats && !perf_path.exists() { std::fs::create_dir(&perf_path) .wrap_err_with(|| CreateDirFailed(perf_path.clone()))?; } let db_path = indexes_path.join("chronik"); if params.wipe_db { log!("Wiping Chronik at {}\n", db_path.to_string_lossy()); Db::destroy(&db_path)?; } + log_chronik!("Opening Chronik at {}\n", db_path.to_string_lossy()); let db = Db::open(&db_path)?; - verify_schema_version(&db)?; + let schema_version = verify_schema_version(&db)?; verify_enable_token_index(&db, params.enable_token_index)?; + upgrade_db_if_needed(&db, schema_version, params.enable_token_index)?; + let mempool = Mempool::new(ScriptGroup, params.enable_token_index); Ok(ChronikIndexer { db, mempool, mem_data: MemData::new(MemDataConf {}), script_group: ScriptGroup, avalanche: Avalanche::default(), subs: RwLock::new(Subs::new(ScriptGroup)), perf_path: params.enable_perf_stats.then_some(perf_path), is_token_index_enabled: params.enable_token_index, }) } /// Resync Chronik index to the node pub fn resync_indexer( &mut self, bridge: &ffi::ChronikBridge, ) -> Result<()> { let block_reader = BlockReader::new(&self.db)?; let indexer_tip = block_reader.tip()?; let Ok(node_tip_index) = bridge.get_chain_tip() else { if let Some(indexer_tip) = &indexer_tip { return Err( CannotRewindChronik(indexer_tip.hash.clone()).into() ); } return Ok(()); }; let node_tip_info = ffi::get_block_info(node_tip_index); let node_height = node_tip_info.height; let node_tip_hash = BlockHash::from(node_tip_info.hash); let start_height = match indexer_tip { Some(tip) if tip.hash != node_tip_hash => { let indexer_tip_hash = tip.hash.clone(); let indexer_height = tip.height; log!( "Node and Chronik diverged, node is on block \ {node_tip_hash} at height {node_height}, and Chronik is \ on block {indexer_tip_hash} at height {indexer_height}.\n" ); let indexer_tip_index = bridge .lookup_block_index(tip.hash.to_bytes()) .map_err(|_| CannotRewindChronik(tip.hash.clone()))?; self.rewind_indexer(bridge, indexer_tip_index, &tip)? 
} Some(tip) => tip.height, None => { log!( "Chronik database empty, syncing to block {node_tip_hash} \ at height {node_height}.\n" ); -1 } }; let tip_height = node_tip_info.height; for height in start_height + 1..=tip_height { if bridge.shutdown_requested() { log!("Stopped re-sync adding blocks\n"); return Ok(()); } let block_index = ffi::get_block_ancestor(node_tip_index, height)?; let block = self.load_chronik_block(bridge, block_index)?; let hash = block.db_block.hash.clone(); self.handle_block_connected(block)?; log_chronik!( "Added block {hash}, height {height}/{tip_height} to Chronik\n" ); if height % 100 == 0 { log!( "Synced Chronik up to block {hash} at height \ {height}/{tip_height}\n" ); } } log!( "Chronik completed re-syncing with the node, both are now at \ block {node_tip_hash} at height {node_height}.\n" ); if let Some(perf_path) = &self.perf_path { let mut resync_stats = std::fs::File::create(perf_path.join("resync_stats.txt"))?; write!(&mut resync_stats, "{:#.3?}", self.mem_data.stats())?; } Ok(()) } fn rewind_indexer( &mut self, bridge: &ffi::ChronikBridge, indexer_tip_index: &ffi::CBlockIndex, indexer_db_tip: &DbBlock, ) -> Result { let indexer_height = indexer_db_tip.height; let fork_block_index = bridge .find_fork(indexer_tip_index) .map_err(|_| CannotRewindChronik(indexer_db_tip.hash.clone()))?; let fork_info = ffi::get_block_info(fork_block_index); let fork_block_hash = BlockHash::from(fork_info.hash); let fork_height = fork_info.height; let revert_height = fork_height + 1; log!( "The last common block is {fork_block_hash} at height \ {fork_height}.\n" ); log!("Reverting Chronik blocks {revert_height} to {indexer_height}.\n"); for height in (revert_height..indexer_height).rev() { if bridge.shutdown_requested() { log!("Stopped re-sync rewinding blocks\n"); // return MAX here so we don't add any blocks return Ok(BlockHeight::MAX); } let db_block = BlockReader::new(&self.db)? .by_height(height)? .ok_or(BlocksBelowMissing { missing: height, exists: indexer_height, })?; let block_index = bridge .lookup_block_index(db_block.hash.to_bytes()) .map_err(|_| CannotRewindChronik(db_block.hash))?; let block = self.load_chronik_block(bridge, block_index)?; self.handle_block_disconnected(block)?; } Ok(fork_info.height) } /// Add transaction to the indexer's mempool. pub fn handle_tx_added_to_mempool( &mut self, mempool_tx: MempoolTx, ) -> Result<()> { let result = self.mempool.insert(&self.db, mempool_tx)?; self.subs.get_mut().handle_tx_event( &result.mempool_tx.tx, TxMsgType::AddedToMempool, &result.token_id_aux, ); Ok(()) } /// Remove tx from the indexer's mempool, e.g. by a conflicting tx, expiry /// etc. This is not called when the transaction has been mined (and thus /// also removed from the mempool). pub fn handle_tx_removed_from_mempool(&mut self, txid: TxId) -> Result<()> { let result = self.mempool.remove(txid)?; self.subs.get_mut().handle_tx_event( &result.mempool_tx.tx, TxMsgType::RemovedFromMempool, &result.token_id_aux, ); Ok(()) } /// Add the block to the index. 
pub fn handle_block_connected( &mut self, block: ChronikBlock, ) -> Result<()> { let height = block.db_block.height; let mut batch = WriteBatch::default(); let block_writer = BlockWriter::new(&self.db)?; let tx_writer = TxWriter::new(&self.db)?; let block_stats_writer = BlockStatsWriter::new(&self.db)?; let script_history_writer = ScriptHistoryWriter::new(&self.db, self.script_group.clone())?; let script_utxo_writer = ScriptUtxoWriter::new(&self.db, self.script_group.clone())?; let spent_by_writer = SpentByWriter::new(&self.db)?; let token_writer = TokenWriter::new(&self.db)?; let token_id_history_writer = TokenIdHistoryWriter::new(&self.db, TokenIdGroup)?; let token_id_utxo_writer = TokenIdUtxoWriter::new(&self.db, TokenIdGroup)?; block_writer.insert(&mut batch, &block.db_block)?; let first_tx_num = tx_writer.insert( &mut batch, &block.block_txs, &mut self.mem_data.txs, )?; let index_txs = prepare_indexed_txs(&self.db, first_tx_num, &block.txs)?; block_stats_writer .insert(&mut batch, height, block.size, &index_txs)?; script_history_writer.insert( &mut batch, &index_txs, &(), &mut self.mem_data.script_history, )?; script_utxo_writer.insert( &mut batch, &index_txs, &(), &mut self.mem_data.script_utxos, )?; spent_by_writer.insert( &mut batch, &index_txs, &mut self.mem_data.spent_by, )?; let token_id_aux; if self.is_token_index_enabled { let processed_token_batch = token_writer.insert(&mut batch, &index_txs)?; token_id_aux = TokenIdGroupAux::from_batch(&index_txs, &processed_token_batch); token_id_history_writer.insert( &mut batch, &index_txs, &token_id_aux, &mut GroupHistoryMemData::default(), )?; token_id_utxo_writer.insert( &mut batch, &index_txs, &token_id_aux, &mut GroupUtxoMemData::default(), )?; } else { token_id_aux = TokenIdGroupAux::default(); } self.db.write_batch(batch)?; for tx in &block.block_txs.txs { self.mempool.remove_mined(&tx.txid)?; } merge::check_for_errors()?; let subs = self.subs.get_mut(); subs.broadcast_block_msg(BlockMsg { msg_type: BlockMsgType::Connected, hash: block.db_block.hash, height: block.db_block.height, }); subs.handle_block_tx_events( &block.txs, TxMsgType::Confirmed, &token_id_aux, ); Ok(()) } /// Remove the block from the index. 
pub fn handle_block_disconnected( &mut self, block: ChronikBlock, ) -> Result<()> { let mut batch = WriteBatch::default(); let block_writer = BlockWriter::new(&self.db)?; let tx_writer = TxWriter::new(&self.db)?; let block_stats_writer = BlockStatsWriter::new(&self.db)?; let script_history_writer = ScriptHistoryWriter::new(&self.db, self.script_group.clone())?; let script_utxo_writer = ScriptUtxoWriter::new(&self.db, self.script_group.clone())?; let spent_by_writer = SpentByWriter::new(&self.db)?; let token_writer = TokenWriter::new(&self.db)?; let token_id_history_writer = TokenIdHistoryWriter::new(&self.db, TokenIdGroup)?; let token_id_utxo_writer = TokenIdUtxoWriter::new(&self.db, TokenIdGroup)?; block_writer.delete(&mut batch, &block.db_block)?; let first_tx_num = tx_writer.delete( &mut batch, &block.block_txs, &mut self.mem_data.txs, )?; let index_txs = prepare_indexed_txs(&self.db, first_tx_num, &block.txs)?; block_stats_writer.delete(&mut batch, block.db_block.height); script_history_writer.delete( &mut batch, &index_txs, &(), &mut self.mem_data.script_history, )?; script_utxo_writer.delete( &mut batch, &index_txs, &(), &mut self.mem_data.script_utxos, )?; spent_by_writer.delete( &mut batch, &index_txs, &mut self.mem_data.spent_by, )?; if self.is_token_index_enabled { let token_id_aux = TokenIdGroupAux::from_db(&index_txs, &self.db)?; token_id_history_writer.delete( &mut batch, &index_txs, &token_id_aux, &mut GroupHistoryMemData::default(), )?; token_id_utxo_writer.delete( &mut batch, &index_txs, &token_id_aux, &mut GroupUtxoMemData::default(), )?; token_writer.delete(&mut batch, &index_txs)?; } self.avalanche.disconnect_block(block.db_block.height)?; self.db.write_batch(batch)?; let subs = self.subs.get_mut(); subs.broadcast_block_msg(BlockMsg { msg_type: BlockMsgType::Disconnected, hash: block.db_block.hash, height: block.db_block.height, }); Ok(()) } /// Block finalized with Avalanche. pub fn handle_block_finalized( &mut self, block: ChronikBlock, ) -> Result<()> { self.avalanche.finalize_block(block.db_block.height)?; let subs = self.subs.get_mut(); subs.broadcast_block_msg(BlockMsg { msg_type: BlockMsgType::Finalized, hash: block.db_block.hash, height: block.db_block.height, }); let tx_reader = TxReader::new(&self.db)?; let first_tx_num = tx_reader .first_tx_num_by_block(block.db_block.height)? .unwrap(); let index_txs = prepare_indexed_txs(&self.db, first_tx_num, &block.txs)?; let token_id_aux = if self.is_token_index_enabled { TokenIdGroupAux::from_db(&index_txs, &self.db)? } else { TokenIdGroupAux::default() }; subs.handle_block_tx_events( &block.txs, TxMsgType::Finalized, &token_id_aux, ); Ok(()) } /// Return [`QueryBroadcast`] to broadcast tx to the network. pub fn broadcast<'a>(&'a self, node: &'a Node) -> QueryBroadcast<'a> { QueryBroadcast { db: &self.db, avalanche: &self.avalanche, mempool: &self.mempool, node, is_token_index_enabled: self.is_token_index_enabled, } } /// Return [`QueryBlocks`] to read blocks from the DB. pub fn blocks<'a>(&'a self, node: &'a Node) -> QueryBlocks<'a> { QueryBlocks { db: &self.db, avalanche: &self.avalanche, mempool: &self.mempool, node, is_token_index_enabled: self.is_token_index_enabled, } } /// Return [`QueryTxs`] to return txs from mempool/DB. pub fn txs<'a>(&'a self, node: &'a Node) -> QueryTxs<'a> { QueryTxs { db: &self.db, avalanche: &self.avalanche, mempool: &self.mempool, node, is_token_index_enabled: self.is_token_index_enabled, } } /// Return [`QueryGroupHistory`] for scripts to query the tx history of /// scripts. 
pub fn script_history<'a>( &'a self, node: &'a Node, ) -> Result> { Ok(QueryGroupHistory { db: &self.db, avalanche: &self.avalanche, mempool: &self.mempool, mempool_history: self.mempool.script_history(), group: self.script_group.clone(), node, is_token_index_enabled: self.is_token_index_enabled, }) } /// Return [`QueryGroupUtxos`] for scripts to query the utxos of scripts. pub fn script_utxos( &self, ) -> Result> { Ok(QueryGroupUtxos { db: &self.db, avalanche: &self.avalanche, mempool: &self.mempool, mempool_utxos: self.mempool.script_utxos(), group: self.script_group.clone(), utxo_mapper: UtxoProtobufValue, is_token_index_enabled: self.is_token_index_enabled, }) } /// Return [`QueryGroupHistory`] for token IDs to query the tx history of /// token IDs. pub fn token_id_history<'a>( &'a self, node: &'a Node, ) -> QueryGroupHistory<'a, TokenIdGroup> { QueryGroupHistory { db: &self.db, avalanche: &self.avalanche, mempool: &self.mempool, mempool_history: self.mempool.token_id_history(), group: TokenIdGroup, node, is_token_index_enabled: self.is_token_index_enabled, } } /// Return [`QueryGroupUtxos`] for token IDs to query the utxos of token IDs pub fn token_id_utxos( &self, ) -> QueryGroupUtxos<'_, TokenIdGroup, UtxoProtobufOutput> { QueryGroupUtxos { db: &self.db, avalanche: &self.avalanche, mempool: &self.mempool, mempool_utxos: self.mempool.token_id_utxos(), group: TokenIdGroup, utxo_mapper: UtxoProtobufOutput, is_token_index_enabled: self.is_token_index_enabled, } } /// Subscribers, behind read/write lock pub fn subs(&self) -> &RwLock { &self.subs } /// Build a ChronikBlock from a ffi::Block. pub fn make_chronik_block(&self, block: ffi::Block) -> ChronikBlock { let db_block = DbBlock { hash: BlockHash::from(block.hash), prev_hash: BlockHash::from(block.prev_hash), height: block.height, n_bits: block.n_bits, timestamp: block.timestamp, file_num: block.file_num, data_pos: block.data_pos, }; let block_txs = BlockTxs { block_height: block.height, txs: block .txs .iter() .map(|tx| { let txid = TxId::from(tx.tx.txid); TxEntry { txid, data_pos: tx.data_pos, undo_pos: tx.undo_pos, time_first_seen: match self.mempool.tx(&txid) { Some(tx) => tx.time_first_seen, None => 0, }, is_coinbase: tx.undo_pos == 0, } }) .collect(), }; let txs = block .txs .into_iter() .map(|block_tx| Tx::from(block_tx.tx)) .collect::>(); ChronikBlock { db_block, block_txs, size: block.size, txs, } } /// Load a ChronikBlock from the node given the CBlockIndex. pub fn load_chronik_block( &self, bridge: &ffi::ChronikBridge, block_index: &ffi::CBlockIndex, ) -> Result { let ffi_block = bridge.load_block(block_index)?; let ffi_block = expect_unique_ptr("load_block", &ffi_block); let ffi_block_undo = bridge.load_block_undo(block_index)?; let ffi_block_undo = expect_unique_ptr("load_block_undo", &ffi_block_undo); let block = ffi::bridge_block(ffi_block, ffi_block_undo, block_index)?; Ok(self.make_chronik_block(block)) } } -fn verify_schema_version(db: &Db) -> Result<()> { +fn verify_schema_version(db: &Db) -> Result { let metadata_reader = MetadataReader::new(db)?; let metadata_writer = MetadataWriter::new(db)?; let is_empty = db.is_db_empty()?; - match metadata_reader + let schema_version = match metadata_reader .schema_version() .wrap_err(CorruptedSchemaVersion)? 
{ Some(schema_version) => { assert!(!is_empty, "Empty DB can't have a schema version"); if schema_version > CURRENT_INDEXER_VERSION { return Err(ChronikOutdated(schema_version).into()); } - if schema_version < CURRENT_INDEXER_VERSION { + if schema_version < LAST_UPGRADABLE_VERSION { return Err(DatabaseOutdated(schema_version).into()); } + log!( + "Chronik has version {CURRENT_INDEXER_VERSION}, DB has \ + version {schema_version}\n" + ); + schema_version } None => { if !is_empty { return Err(MissingSchemaVersion.into()); } let mut batch = WriteBatch::default(); metadata_writer .update_schema_version(&mut batch, CURRENT_INDEXER_VERSION)?; db.write_batch(batch)?; + log!( + "Chronik has version {CURRENT_INDEXER_VERSION}, initialized \ + DB with that version\n" + ); + CURRENT_INDEXER_VERSION } - } - log!("Chronik has version {CURRENT_INDEXER_VERSION}\n"); - Ok(()) + }; + Ok(schema_version) } fn verify_enable_token_index(db: &Db, enable_token_index: bool) -> Result<()> { let metadata_reader = MetadataReader::new(db)?; let metadata_writer = MetadataWriter::new(db)?; let token_writer = TokenWriter::new(db)?; let is_empty = db.is_db_empty()?; let is_token_index_enabled = metadata_reader.is_token_index_enabled()?; let mut batch = WriteBatch::default(); if !is_empty { // Cannot enable token index if not already previously enabled if enable_token_index && !is_token_index_enabled { return Err(CannotEnableTokenIndex.into()); } // Wipe token index if previously enabled and now disabled if !enable_token_index && is_token_index_enabled { log!( "Warning: Wiping existing token index, since \ -chroniktokenindex=0\n" ); log!("You will need to -reindex/-chronikreindex to restore\n"); token_writer.wipe(&mut batch); } } metadata_writer .update_is_token_index_enabled(&mut batch, enable_token_index)?; db.write_batch(batch)?; Ok(()) } +fn upgrade_db_if_needed( + db: &Db, + schema_version: u64, + enable_token_index: bool, +) -> Result<()> { + // DB has version 10, upgrade to 11 + if schema_version == 10 { + upgrade_10_to_11(db, enable_token_index)?; + } + Ok(()) +} + +fn upgrade_10_to_11(db: &Db, enable_token_index: bool) -> Result<()> { + log!("Upgrading Chronik DB from version 10 to 11...\n"); + let script_utxo_writer = ScriptUtxoWriter::new(db, ScriptGroup)?; + script_utxo_writer.upgrade_10_to_11()?; + if enable_token_index { + let token_id_utxo_writer = TokenIdUtxoWriter::new(db, TokenIdGroup)?; + token_id_utxo_writer.upgrade_10_to_11()?; + } + let mut batch = WriteBatch::default(); + let metadata_writer = MetadataWriter::new(db)?; + metadata_writer.update_schema_version(&mut batch, 11)?; + db.write_batch(batch)?; + log!("Successfully upgraded Chronik DB from version 10 to 11.\n"); + Ok(()) +} + impl Node { /// If `result` is [`Err`], logs and aborts the node. 
pub fn ok_or_abort(&self, func_name: &str, result: Result) { if let Err(report) = result { log_chronik!("{report:?}\n"); self.bridge.abort_node( &format!("ERROR Chronik in {func_name}"), &format!("{report:#}"), ); } } } impl std::fmt::Debug for ChronikIndexerParams { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ChronikIndexerParams") .field("datadir_net", &self.datadir_net) .field("wipe_db", &self.wipe_db) .field("fn_compress_script", &"..") .finish() } } #[cfg(test)] mod tests { use abc_rust_error::Result; use bitcoinsuite_core::block::BlockHash; use chronik_db::{ db::{Db, WriteBatch, CF_META}, io::{BlockReader, BlockTxs, DbBlock, MetadataReader, MetadataWriter}, }; use pretty_assertions::assert_eq; use crate::indexer::{ ChronikBlock, ChronikIndexer, ChronikIndexerError, ChronikIndexerParams, CURRENT_INDEXER_VERSION, }; #[test] fn test_indexer() -> Result<()> { let tempdir = tempdir::TempDir::new("chronik-indexer--indexer")?; let datadir_net = tempdir.path().join("regtest"); let params = ChronikIndexerParams { datadir_net: datadir_net.clone(), wipe_db: false, enable_token_index: false, enable_perf_stats: false, }; // regtest folder doesn't exist yet -> error assert_eq!( ChronikIndexer::setup(params.clone()) .unwrap_err() .downcast::()?, ChronikIndexerError::CreateDirFailed(datadir_net.join("indexes")), ); // create regtest folder, setup will work now std::fs::create_dir(&datadir_net)?; let mut indexer = ChronikIndexer::setup(params.clone())?; // indexes and indexes/chronik folder now exist assert!(datadir_net.join("indexes").exists()); assert!(datadir_net.join("indexes").join("chronik").exists()); // DB is empty assert_eq!(BlockReader::new(&indexer.db)?.by_height(0)?, None); let block = ChronikBlock { db_block: DbBlock { hash: BlockHash::from([4; 32]), prev_hash: BlockHash::from([0; 32]), height: 0, n_bits: 0x1deadbef, timestamp: 1234567890, file_num: 0, data_pos: 1337, }, block_txs: BlockTxs { block_height: 0, txs: vec![], }, size: 285, txs: vec![], }; // Add block indexer.handle_block_connected(block.clone())?; assert_eq!( BlockReader::new(&indexer.db)?.by_height(0)?, Some(block.db_block.clone()) ); // Remove block again indexer.handle_block_disconnected(block.clone())?; assert_eq!(BlockReader::new(&indexer.db)?.by_height(0)?, None); // Add block then wipe, block not there indexer.handle_block_connected(block)?; std::mem::drop(indexer); let indexer = ChronikIndexer::setup(ChronikIndexerParams { wipe_db: true, ..params })?; assert_eq!(BlockReader::new(&indexer.db)?.by_height(0)?, None); Ok(()) } #[test] fn test_schema_version() -> Result<()> { let dir = tempdir::TempDir::new("chronik-indexer--schema_version")?; let chronik_path = dir.path().join("indexes").join("chronik"); let params = ChronikIndexerParams { datadir_net: dir.path().to_path_buf(), wipe_db: false, enable_token_index: false, enable_perf_stats: false, }; // Setting up DB first time sets the schema version ChronikIndexer::setup(params.clone())?; { let db = Db::open(&chronik_path)?; assert_eq!( MetadataReader::new(&db)?.schema_version()?, Some(CURRENT_INDEXER_VERSION) ); } // Opening DB again works fine ChronikIndexer::setup(params.clone())?; // Override DB schema version to 0 { let db = Db::open(&chronik_path)?; let mut batch = WriteBatch::default(); MetadataWriter::new(&db)?.update_schema_version(&mut batch, 0)?; db.write_batch(batch)?; } // -> DB too old assert_eq!( ChronikIndexer::setup(params.clone()) .unwrap_err() .downcast::()?, ChronikIndexerError::DatabaseOutdated(0), ); // 
Override DB schema version to CURRENT_INDEXER_VERSION + 1 { let db = Db::open(&chronik_path)?; let mut batch = WriteBatch::default(); MetadataWriter::new(&db)?.update_schema_version( &mut batch, CURRENT_INDEXER_VERSION + 1, )?; db.write_batch(batch)?; } // -> Chronik too old assert_eq!( ChronikIndexer::setup(params.clone()) .unwrap_err() .downcast::()?, ChronikIndexerError::ChronikOutdated(CURRENT_INDEXER_VERSION + 1), ); // Corrupt schema version { let db = Db::open(&chronik_path)?; let cf_meta = db.cf(CF_META)?; let mut batch = WriteBatch::default(); batch.put_cf(cf_meta, b"SCHEMA_VERSION", [0xff]); db.write_batch(batch)?; } assert_eq!( ChronikIndexer::setup(params.clone()) .unwrap_err() .downcast::()?, ChronikIndexerError::CorruptedSchemaVersion, ); // New db path, but has existing data let new_dir = dir.path().join("new"); let new_chronik_path = new_dir.join("indexes").join("chronik"); std::fs::create_dir_all(&new_chronik_path)?; let new_params = ChronikIndexerParams { datadir_net: new_dir, wipe_db: false, ..params }; { // new db with obscure field in meta let db = Db::open(&new_chronik_path)?; let mut batch = WriteBatch::default(); batch.put_cf(db.cf(CF_META)?, b"FOO", b"BAR"); db.write_batch(batch)?; } // Error: non-empty DB without schema version assert_eq!( ChronikIndexer::setup(new_params.clone()) .unwrap_err() .downcast::()?, ChronikIndexerError::MissingSchemaVersion, ); // with wipe it works ChronikIndexer::setup(ChronikIndexerParams { wipe_db: true, ..new_params })?; Ok(()) } }
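Note on the new write path (illustration, not part of the patch): instead of read-modify-write per member, insert_utxo_entry/delete_utxo_entry now emit merge operands consisting of a single prefix byte (INSERT = b'I', DELETE = b'D') followed by a db_serialize-encoded payload, and RocksDB folds those operands into the stored, sorted entry list via the merge operator registered in add_cfs. The following standalone sketch uses plain u64 outpoints and String errors instead of UtxoEntry<D> and the chronik ser helpers, but mirrors the operand format and the apply semantics of apply_merge_utxos, including the duplicate/missing-UTXO error cases:

const INSERT: u8 = b'I';
const DELETE: u8 = b'D';

/// Encode one merge operand: a single prefix byte followed by the payload.
fn operand(prefix: u8, outpoint: u64) -> Vec<u8> {
    let mut op = vec![prefix];
    op.extend_from_slice(&outpoint.to_be_bytes());
    op
}

/// Apply one operand to the sorted entry list, mirroring apply_merge_utxos:
/// inserts keep the list sorted, deletes must hit an existing entry.
fn apply_operand(entries: &mut Vec<u64>, op: &[u8]) -> Result<(), String> {
    let payload: [u8; 8] =
        op[1..].try_into().map_err(|_| "operand too short".to_string())?;
    let outpoint = u64::from_be_bytes(payload);
    match (op[0], entries.binary_search(&outpoint)) {
        (INSERT, Err(idx)) => {
            entries.insert(idx, outpoint);
            Ok(())
        }
        (INSERT, Ok(_)) => Err(format!("duplicate UTXO {outpoint}")),
        (DELETE, Ok(idx)) => {
            entries.remove(idx);
            Ok(())
        }
        (DELETE, Err(_)) => Err(format!("UTXO {outpoint} doesn't exist")),
        (prefix, _) => Err(format!("unknown operand prefix {prefix:02x}")),
    }
}

fn main() -> Result<(), String> {
    let mut entries = Vec::new();
    apply_operand(&mut entries, &operand(INSERT, 7))?;
    apply_operand(&mut entries, &operand(INSERT, 3))?;
    apply_operand(&mut entries, &operand(DELETE, 7))?;
    assert_eq!(entries, vec![3]);
    // Deleting the same outpoint again is an error, like UtxoDoesntExist.
    assert!(apply_operand(&mut entries, &operand(DELETE, 7)).is_err());
    Ok(())
}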
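For reference, this is roughly how the pieces fit together at the RocksDB level: the column family gets a merge operator (full merge only, no partial merge) plus a compaction filter that drops values that became empty, and writers issue merge_cf calls instead of get + put. The sketch below is a self-contained toy using the same rust-rocksdb APIs the patch uses; the path and column family name are made up, and values are just concatenated big-endian u64s rather than serialized UtxoEntry lists:

use rocksdb::{
    compaction_filter::Decision, ColumnFamilyDescriptor, MergeOperands,
    Options, WriteBatch, DB,
};

const INSERT: u8 = b'I';
const DELETE: u8 = b'D';

/// Values are sorted lists of big-endian u64s (stand-in for UtxoEntry lists).
fn decode(value: &[u8]) -> Vec<u64> {
    value
        .chunks_exact(8)
        .map(|chunk| u64::from_be_bytes(chunk.try_into().unwrap()))
        .collect()
}

fn encode(entries: &[u64]) -> Vec<u8> {
    entries.iter().flat_map(|n| n.to_be_bytes()).collect()
}

/// Full merge: fold all pending operands into the stored sorted list.
fn full_merge(
    _key: &[u8],
    existing: Option<&[u8]>,
    operands: &MergeOperands,
) -> Option<Vec<u8>> {
    let mut entries = existing.map(decode).unwrap_or_default();
    for op in operands {
        let n = u64::from_be_bytes(op[1..9].try_into().unwrap());
        match (op[0], entries.binary_search(&n)) {
            (INSERT, Err(idx)) => entries.insert(idx, n),
            (DELETE, Ok(idx)) => {
                entries.remove(idx);
            }
            // The real code reports these cases via catch_merge_errors; a
            // sketch can just ignore duplicate inserts and missing deletes.
            _ => {}
        }
    }
    Some(encode(&entries))
}

/// No partial merge, same as partial_merge_utxos in the patch.
fn no_partial_merge(
    _key: &[u8],
    _existing: Option<&[u8]>,
    _operands: &MergeOperands,
) -> Option<Vec<u8>> {
    None
}

/// Compaction filter that drops members whose UTXO list became empty.
fn drop_empty(_level: u32, _key: &[u8], value: &[u8]) -> Decision {
    if value.is_empty() {
        Decision::Remove
    } else {
        Decision::Keep
    }
}

/// Encode one merge operand: prefix byte + payload.
fn operand(prefix: u8, n: u64) -> Vec<u8> {
    let mut op = vec![prefix];
    op.extend_from_slice(&n.to_be_bytes());
    op
}

fn main() -> Result<(), rocksdb::Error> {
    let mut cf_opts = Options::default();
    cf_opts.set_merge_operator("utxos::merge", full_merge, no_partial_merge);
    cf_opts.set_compaction_filter("utxos::drop_empty", drop_empty);
    let mut db_opts = Options::default();
    db_opts.create_if_missing(true);
    db_opts.create_missing_column_families(true);
    let db = DB::open_cf_descriptors(
        &db_opts,
        "/tmp/utxo-merge-demo", // made-up path, just for the example
        [ColumnFamilyDescriptor::new("utxos", cf_opts)],
    )?;
    let cf = db.cf_handle("utxos").expect("CF was just created");
    let mut batch = WriteBatch::default();
    batch.merge_cf(cf, b"member", operand(INSERT, 7));
    batch.merge_cf(cf, b"member", operand(INSERT, 3));
    batch.merge_cf(cf, b"member", operand(DELETE, 7));
    db.write(batch)?;
    // Reads resolve pending merge operands, so only outpoint 3 remains.
    assert_eq!(db.get_cf(cf, b"member")?, Some(3u64.to_be_bytes().to_vec()));
    Ok(())
}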