diff --git a/chronik/bitcoinsuite-core/src/tx/tx.rs b/chronik/bitcoinsuite-core/src/tx/tx.rs index 7e6093869..58db948eb 100644 --- a/chronik/bitcoinsuite-core/src/tx/tx.rs +++ b/chronik/bitcoinsuite-core/src/tx/tx.rs @@ -1,296 +1,304 @@ // Copyright (c) 2023 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. use crate::{ error::DataError, script::Script, ser::{BitcoinSer, BitcoinSerializer}, tx::TxId, }; /// CTransaction, a Bitcoin transaction. /// /// ``` /// # use bitcoinsuite_core::tx::{Tx, TxId, TxMut}; /// let txid = TxId::from([3; 32]); /// let tx = Tx::with_txid( /// txid, /// TxMut { /// version: 1, /// inputs: vec![], /// outputs: vec![], /// locktime: 0, /// }, /// ); /// assert_eq!(tx.txid(), txid); /// assert_eq!(tx.version, 1); /// assert_eq!(tx.inputs, vec![]); /// assert_eq!(tx.outputs, vec![]); /// assert_eq!(tx.locktime, 0); /// ``` /// /// Immutable version of [`TxMut`], so this will fail: /// ```compile_fail /// # use bitcoinsuite_core::tx::Tx; /// let mut tx = Tx::default(); /// tx.version = 1; /// ``` #[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct Tx { txid: TxId, tx: TxMut, } /// Bitcoin transaction. Mutable version of [`Tx`], like CMutableTransaction. /// /// The biggest difference is that it doesn't have a txid() method, which we /// cannot know without hashing the tx every time, which would be expensive to /// compute. #[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct TxMut { /// nVersion of the tx. pub version: i32, /// Tx inputs. pub inputs: Vec, /// Tx outputs. pub outputs: Vec, /// Locktime of the tx. pub locktime: u32, } /// COutPoint, pointing to a coin being spent. #[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct OutPoint { /// TxId of the output of the coin. pub txid: TxId, /// Index in the outputs of the tx of the coin. pub out_idx: u32, } /// Points to an input spending a coin. #[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct SpentBy { /// TxId of the tx with the input. pub txid: TxId, /// Index in the inputs of the tx. pub input_idx: u32, } /// CTxIn, spending an unspent output. #[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct TxInput { /// Points to an output being spent. pub prev_out: OutPoint, /// scriptSig unlocking the output. pub script: Script, /// nSequence. pub sequence: u32, /// Coin being spent by this tx. /// May be [`None`] for coinbase txs or if the spent coin is unknown. pub coin: Option, } /// CTxOut, creating a new output. #[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct TxOutput { /// Value of the output. pub value: i64, /// Script locking the output. pub script: Script, } /// Coin, can be spent by providing a valid unlocking script. #[derive(Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)] pub struct Coin { /// Output, locking the coins. pub output: TxOutput, /// Height of the coin in the chain. pub height: i32, /// Whether the coin is a coinbase. pub is_coinbase: bool, } +/// Empty tx, serializes to 00000000000000000000. +pub static EMPTY_TX: TxMut = TxMut { + version: 0, + inputs: vec![], + outputs: vec![], + locktime: 0, +}; + impl Tx { /// Create a new [`Tx`] with a given [`TxId`] and [`TxMut`]. /// /// It is the responsibility of the caller to ensure the `txid` matches /// `tx`. 
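The new `EMPTY_TX` constant is what `LokadIdGroup::input_members` later feeds to the parser to produce an empty iterator. As its comment says, it serializes to ten zero bytes (4-byte version, one zero-length varint each for inputs and outputs, 4-byte locktime); a minimal sketch using the `ser()`/`ser_len()` helpers exercised by the tests below:

```rust
use bitcoinsuite_core::{ser::BitcoinSer, tx::EMPTY_TX};

#[test]
fn empty_tx_serializes_to_ten_zero_bytes() {
    // 4 (version) + 1 (no inputs) + 1 (no outputs) + 4 (locktime)
    let expected: &[u8] = &[0u8; 10];
    assert_eq!(EMPTY_TX.ser_len(), 10);
    assert_eq!(EMPTY_TX.ser().as_ref(), expected);
}
```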
pub fn with_txid(txid: TxId, tx: TxMut) -> Self { Tx { txid, tx } } /// [`TxId`] of this [`Tx`]. pub fn txid(&self) -> TxId { self.txid } /// Like [`Tx::txid`], but as a reference. pub fn txid_ref(&self) -> &TxId { &self.txid } } impl std::ops::Deref for Tx { type Target = TxMut; fn deref(&self) -> &Self::Target { &self.tx } } impl BitcoinSer for TxMut { fn ser_to(&self, bytes: &mut S) { self.version.ser_to(bytes); self.inputs.ser_to(bytes); self.outputs.ser_to(bytes); self.locktime.ser_to(bytes); } fn deser(data: &mut bytes::Bytes) -> Result { Ok(TxMut { version: BitcoinSer::deser(data)?, inputs: BitcoinSer::deser(data)?, outputs: BitcoinSer::deser(data)?, locktime: BitcoinSer::deser(data)?, }) } } impl BitcoinSer for Tx { fn ser_to(&self, bytes: &mut S) { TxMut::ser_to(self, bytes) } fn deser(data: &mut bytes::Bytes) -> Result { let tx = TxMut::deser(data)?; Ok(Tx::with_txid(TxId::from_tx(&tx), tx)) } } impl BitcoinSer for OutPoint { fn ser_to(&self, bytes: &mut S) { self.txid.ser_to(bytes); self.out_idx.ser_to(bytes); } fn deser(data: &mut bytes::Bytes) -> Result { Ok(OutPoint { txid: BitcoinSer::deser(data)?, out_idx: BitcoinSer::deser(data)?, }) } } impl BitcoinSer for TxInput { fn ser_to(&self, bytes: &mut S) { self.prev_out.ser_to(bytes); self.script.ser_to(bytes); self.sequence.ser_to(bytes); } fn deser(data: &mut bytes::Bytes) -> Result { Ok(TxInput { prev_out: BitcoinSer::deser(data)?, script: BitcoinSer::deser(data)?, sequence: BitcoinSer::deser(data)?, coin: None, }) } } impl BitcoinSer for TxOutput { fn ser_to(&self, bytes: &mut S) { self.value.ser_to(bytes); self.script.ser_to(bytes); } fn deser(data: &mut bytes::Bytes) -> Result { Ok(TxOutput { value: BitcoinSer::deser(data)?, script: BitcoinSer::deser(data)?, }) } } #[cfg(test)] mod tests { use bytes::Bytes; use crate::{ script::Script, ser::BitcoinSer, tx::{OutPoint, Tx, TxId, TxInput, TxMut, TxOutput}, }; fn verify_ser(tx: TxMut, ser: &[u8]) { assert_eq!(tx.ser().as_ref(), ser); assert_eq!(tx.ser_len(), ser.len()); let mut bytes = Bytes::copy_from_slice(ser); assert_eq!(tx, TxMut::deser(&mut bytes).unwrap()); let tx = Tx::with_txid(TxId::from_tx(&tx), tx); assert_eq!(tx.ser().as_ref(), ser); assert_eq!(tx.ser_len(), ser.len()); let mut bytes = Bytes::copy_from_slice(ser); assert_eq!(tx, Tx::deser(&mut bytes).unwrap()); } #[test] fn test_ser_tx() -> Result<(), hex::FromHexError> { verify_ser(TxMut::default(), &[0; 10]); verify_ser( TxMut { version: 0x12345678, inputs: vec![], outputs: vec![], locktime: 0x9abcdef1, }, &hex::decode("785634120000f1debc9a")?, ); let genesis_tx = TxMut { version: 1, inputs: vec![TxInput { prev_out: OutPoint { txid: TxId::from([0; 32]), out_idx: 0xffff_ffff, }, script: Script::new( hex::decode( "04ffff001d0104455468652054696d65732030332f4a616e2f3230\ 3039204368616e63656c6c6f72206f6e206272696e6b206f662073\ 65636f6e64206261696c6f757420666f722062616e6b73", )? .into(), ), sequence: 0xffff_ffff, coin: None, }], outputs: vec![TxOutput { value: 5000000000, script: Script::new( hex::decode( "4104678afdb0fe5548271967f1a67130b7105cd6a828e03909a679\ 62e0ea1f61deb649f6bc3f4cef38c4f35504e51ec112de5c384df7\ ba0b8d578a4c702b6bf11d5fac", )? 
.into(), ), }], locktime: 0, }; verify_ser( genesis_tx, &hex::decode( "01000000010000000000000000000000000000000000000000000000000000\ 000000000000ffffffff4d04ffff001d0104455468652054696d6573203033\ 2f4a616e2f32303039204368616e63656c6c6f72206f6e206272696e6b206f\ 66207365636f6e64206261696c6f757420666f722062616e6b73ffffffff01\ 00f2052a01000000434104678afdb0fe5548271967f1a67130b7105cd6a828\ e03909a67962e0ea1f61deb649f6bc3f4cef38c4f35504e51ec112de5c384d\ f7ba0b8d578a4c702b6bf11d5fac00000000" )?, ); Ok(()) } } diff --git a/chronik/chronik-cpp/chronik.cpp b/chronik/chronik-cpp/chronik.cpp index fee83e931..591aa7f45 100644 --- a/chronik/chronik-cpp/chronik.cpp +++ b/chronik/chronik-cpp/chronik.cpp @@ -1,78 +1,80 @@ // Copyright (c) 2022 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include #include #include #include #include #include #include #include #include #include #include #include #include namespace chronik { // Duration between WebSocket pings initiated by Chronik. // 45s has been empirically established as a reliable duration for both browser // and NodeJS WebSockets. static constexpr std::chrono::seconds WS_PING_INTERVAL_DEFAULT{45s}; // Ping duration is just 5s on regtest to speed up ping tests and make // functional tests more reliable. static constexpr std::chrono::seconds WS_PING_INTERVAL_REGTEST{5s}; template rust::Vec ToRustVec(const C &container) { rust::Vec vec; vec.reserve(container.size()); std::copy(container.begin(), container.end(), std::back_inserter(vec)); return vec; } bool Start(const Config &config, const node::NodeContext &node, bool fWipe) { const bool is_pause_allowed = gArgs.GetBoolArg("-chronikallowpause", false); const CChainParams ¶ms = config.GetChainParams(); if (is_pause_allowed && !params.IsTestChain()) { return InitError(_("Using -chronikallowpause on a mainnet chain is not " "allowed for security reasons.")); } return chronik_bridge::setup_chronik( { .datadir_net = gArgs.GetDataDirNet().u8string(), .hosts = ToRustVec(gArgs.IsArgSet("-chronikbind") ? gArgs.GetArgs("-chronikbind") : DEFAULT_BINDS), .default_port = BaseParams().ChronikPort(), .wipe_db = fWipe, .enable_token_index = gArgs.GetBoolArg("-chroniktokenindex", true), + .enable_lokad_id_index = + gArgs.GetBoolArg("-chroniklokadidindex", true), .is_pause_allowed = is_pause_allowed, .enable_perf_stats = gArgs.GetBoolArg("-chronikperfstats", false), .ws_ping_interval_secs = params.NetworkIDString() == CBaseChainParams::REGTEST ? uint64_t(count_seconds(WS_PING_INTERVAL_REGTEST)) : uint64_t(count_seconds(WS_PING_INTERVAL_DEFAULT)), .tx_num_cache = { .num_buckets = (size_t)gArgs.GetIntArg("-chroniktxnumcachebuckets", DEFAULT_TX_NUM_CACHE_BUCKETS), .bucket_size = (size_t)gArgs.GetIntArg( "-chroniktxnumcachebucketsize", DEFAULT_TX_NUM_CACHE_BUCKET_SIZE), }, }, config, node); } void Stop() { LogPrintf("Stopping Chronik...\n"); StopChronikValidationInterface(); } } // namespace chronik diff --git a/chronik/chronik-db/src/db.rs b/chronik/chronik-db/src/db.rs index d6e7eee02..54718635f 100644 --- a/chronik/chronik-db/src/db.rs +++ b/chronik/chronik-db/src/db.rs @@ -1,226 +1,231 @@ // Copyright (c) 2022 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. //! Module containing [`Db`] and errors, which encapsulates a database. //! 
Read and write operations should exclusively be done with dedicated writers //! and readers, such as [`crate::io::BlockWriter`]. use std::path::Path; use abc_rust_error::Result; pub use rocksdb::WriteBatch; use rocksdb::{ColumnFamilyDescriptor, IteratorMode}; use thiserror::Error; use crate::{ groups::{ - ScriptHistoryWriter, ScriptUtxoWriter, TokenIdHistoryWriter, - TokenIdUtxoWriter, + LokadIdHistoryWriter, ScriptHistoryWriter, ScriptUtxoWriter, + TokenIdHistoryWriter, TokenIdUtxoWriter, }, io::{ token::TokenWriter, BlockStatsWriter, BlockWriter, MetadataWriter, SpentByWriter, TxWriter, }, }; // All column family names used by Chronik should be defined here /// Column family name for the block data. pub const CF_BLK: &str = "blk"; /// Column family for the first tx_num of the block. Used to get a list of the /// txs of the block. pub const CF_BLK_BY_FIRST_TX: &str = "blk_by_first_tx"; /// Column family for stats about blocks. pub const CF_BLK_STATS: &str = "blk_stats"; /// Column family for the block height of the first tx_num of that block. Used /// to get the block height of a tx. pub const CF_FIRST_TX_BY_BLK: &str = "first_tx_by_blk"; +/// Column family to store tx history by LOKAD ID. +pub const CF_LOKAD_ID_HISTORY: &str = "lokad_id_history"; +/// Column family to store number of txs by LOKAD ID. +pub const CF_LOKAD_ID_HISTORY_NUM_TXS: &str = "lokad_id_history_num_txs"; /// Column family to lookup a block by its hash. pub const CF_LOOKUP_BLK_BY_HASH: &str = "lookup_blk_by_hash"; /// Column family to lookup a tx by its hash. pub const CF_LOOKUP_TX_BY_HASH: &str = "lookup_tx_by_hash"; /// Column family name for db metadata. pub const CF_META: &str = "meta"; /// Column family to store tx history by script. pub const CF_SCRIPT_HISTORY: &str = "script_history"; /// Column family to store number of txs by script. pub const CF_SCRIPT_HISTORY_NUM_TXS: &str = "script_history_num_txs"; /// Column family for utxos by script. pub const CF_SCRIPT_UTXO: &str = "script_utxo"; /// Column family to store tx history by token ID. pub const CF_TOKEN_ID_HISTORY: &str = "token_id_history"; /// Column family to store number of txs by token ID. pub const CF_TOKEN_ID_HISTORY_NUM_TXS: &str = "token_id_history_num_txs"; /// Column family for utxos by token ID. pub const CF_TOKEN_ID_UTXO: &str = "token_id_utxo"; /// Column family to store which outputs have been spent by which tx inputs. pub const CF_SPENT_BY: &str = "spent_by"; /// Column family for genesis info by token_tx_num pub const CF_TOKEN_GENESIS_INFO: &str = "token_genesis_info"; /// Column family for TokenMeta by token_tx_num pub const CF_TOKEN_META: &str = "token_meta"; /// Column family for token tx data by tx pub const CF_TOKEN_TX: &str = "token_tx"; /// Column family for the tx data. pub const CF_TX: &str = "tx"; pub(crate) type CF = rocksdb::ColumnFamily; /// Indexer database. /// Owns the underlying [`rocksdb::DB`] instance. #[derive(Debug)] pub struct Db { db: rocksdb::DB, cf_names: Vec, } /// Errors indicating something went wrong with the database itself. #[derive(Debug, Error)] pub enum DbError { /// Column family requested but not defined during `Db::open`. #[error("Column family {0} doesn't exist")] NoSuchColumnFamily(String), /// Error with RocksDB itself, e.g. db inconsistency. #[error("RocksDB error: {0}")] RocksDb(rocksdb::Error), } use self::DbError::*; impl Db { /// Opens the database under the specified path. /// Creates the database file and necessary column families if necessary. 
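The new `LokadIdHistoryWriter` registers its two column families the same way the other writers do, so `Db::open` below picks them up automatically. A crate-internal sketch in the style of the group-history tests further down (the helper name is made up for illustration):

```rust
use abc_rust_error::Result;
use crate::{db::Db, groups::LokadIdHistoryWriter, io::TxWriter};

/// Hypothetical test helper: open a Db with just the CFs these writers need.
fn open_lokad_id_test_db(path: &std::path::Path) -> Result<Db> {
    let mut cfs = Vec::new();
    TxWriter::add_cfs(&mut cfs);
    LokadIdHistoryWriter::add_cfs(&mut cfs);
    Db::open_with_cfs(path, cfs)
}
```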
pub fn open(path: impl AsRef) -> Result { let mut cfs = Vec::new(); BlockWriter::add_cfs(&mut cfs); BlockStatsWriter::add_cfs(&mut cfs); MetadataWriter::add_cfs(&mut cfs); TxWriter::add_cfs(&mut cfs); ScriptHistoryWriter::add_cfs(&mut cfs); ScriptUtxoWriter::add_cfs(&mut cfs); SpentByWriter::add_cfs(&mut cfs); TokenWriter::add_cfs(&mut cfs); TokenIdHistoryWriter::add_cfs(&mut cfs); TokenIdUtxoWriter::add_cfs(&mut cfs); + LokadIdHistoryWriter::add_cfs(&mut cfs); Self::open_with_cfs(path, cfs) } pub(crate) fn open_with_cfs( path: impl AsRef, cfs: Vec, ) -> Result { let db_options = Self::db_options(); let cf_names = cfs.iter().map(|cf| cf.name().to_string()).collect(); let db = rocksdb::DB::open_cf_descriptors(&db_options, path, cfs) .map_err(RocksDb)?; Ok(Db { db, cf_names }) } fn db_options() -> rocksdb::Options { let mut db_options = rocksdb::Options::default(); db_options.create_if_missing(true); db_options.create_missing_column_families(true); db_options } /// Destroy the DB, i.e. delete all it's associated files. /// /// According to the RocksDB docs, this differs from removing the dir: /// DestroyDB() will take care of the case where the RocksDB database is /// stored in multiple directories. For instance, a single DB can be /// configured to store its data in multiple directories by specifying /// different paths to DBOptions::db_paths, DBOptions::db_log_dir, and /// DBOptions::wal_dir. pub fn destroy(path: impl AsRef) -> Result<()> { let db_options = Self::db_options(); rocksdb::DB::destroy(&db_options, path).map_err(RocksDb)?; Ok(()) } /// Return a column family handle with the given name. pub fn cf(&self, name: &str) -> Result<&CF> { Ok(self .db .cf_handle(name) .ok_or_else(|| NoSuchColumnFamily(name.to_string()))?) } pub(crate) fn get( &self, cf: &CF, key: impl AsRef<[u8]>, ) -> Result>> { Ok(self.db.get_pinned_cf(cf, key).map_err(RocksDb)?) } pub(crate) fn multi_get( &self, cf: &CF, keys: impl IntoIterator>, sorted_inputs: bool, ) -> Result>>> { self.db .batched_multi_get_cf(cf, keys, sorted_inputs) .into_iter() .map(|value| value.map_err(|err| RocksDb(err).into())) .collect() } pub(crate) fn iterator_end( &self, cf: &CF, ) -> impl Iterator, Box<[u8]>)>> + '_ { self.db .iterator_cf(cf, IteratorMode::End) .map(|result| Ok(result.map_err(RocksDb)?)) } pub(crate) fn iterator( &self, cf: &CF, start: &[u8], direction: rocksdb::Direction, ) -> impl Iterator, Box<[u8]>)>> + '_ { self.db .iterator_cf(cf, IteratorMode::From(start, direction)) .map(|result| Ok(result.map_err(RocksDb)?)) } pub(crate) fn full_iterator( &self, cf: &CF, ) -> impl Iterator, Box<[u8]>)>> + '_ { self.db .full_iterator_cf(cf, IteratorMode::Start) .map(|result| Ok(result.map_err(RocksDb)?)) } pub(crate) fn estimate_num_keys(&self, cf: &CF) -> Result> { Ok(self .db .property_int_value_cf(cf, "rocksdb.estimate-num-keys") .map_err(RocksDb)?) } /// Writes the batch to the Db atomically. pub fn write_batch(&self, write_batch: WriteBatch) -> Result<()> { self.db.write_without_wal(write_batch).map_err(RocksDb)?; Ok(()) } /// Whether any of the column families in the DB have any data. /// /// Note: RocksDB forbids not opening all column families, therefore, this /// will always iter through all column families. 
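From the outside, the two new LOKAD ID column families are ordinary named CFs; if they were not registered at open time, `Db::cf` reports that as `NoSuchColumnFamily` rather than panicking. A sketch, assuming the crate is consumed as `chronik_db`:

```rust
use abc_rust_error::Result;
use chronik_db::db::{Db, CF_LOKAD_ID_HISTORY, CF_LOKAD_ID_HISTORY_NUM_TXS};

/// Hypothetical check that a DB was opened with the LOKAD ID CFs.
fn assert_lokad_id_cfs(db: &Db) -> Result<()> {
    db.cf(CF_LOKAD_ID_HISTORY)?;
    db.cf(CF_LOKAD_ID_HISTORY_NUM_TXS)?;
    Ok(())
}
```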
pub fn is_db_empty(&self) -> Result { for cf_name in &self.cf_names { let cf = self.cf(cf_name)?; let mut cf_iter = self.db.full_iterator_cf(cf, IteratorMode::Start); if cf_iter.next().is_some() { return Ok(false); } } Ok(true) } } diff --git a/chronik/chronik-db/src/groups/lokad_id.rs b/chronik/chronik-db/src/groups/lokad_id.rs new file mode 100644 index 000000000..dddd4ccc9 --- /dev/null +++ b/chronik/chronik-db/src/groups/lokad_id.rs @@ -0,0 +1,88 @@ +// Copyright (c) 2024 The Bitcoin developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +use bitcoinsuite_core::tx::EMPTY_TX; +use bitcoinsuite_slp::lokad_id::{parse_tx_lokad_ids, LokadId, LokadIdIter}; + +use crate::{ + db::{CF_LOKAD_ID_HISTORY, CF_LOKAD_ID_HISTORY_NUM_TXS}, + group::{Group, GroupQuery, MemberItem, UtxoDataOutput}, + io::{ + GroupHistoryConf, GroupHistoryReader, GroupHistoryWriter, GroupUtxoConf, + }, + mem::MempoolGroupHistory, +}; + +/// Index the mempool tx history of scripts +pub type MempoolLokadIdHistory = MempoolGroupHistory; +/// Index the tx history of script in the DB +pub type LokadIdHistoryWriter<'a> = GroupHistoryWriter<'a, LokadIdGroup>; +/// Read the tx history of scripts in the DB +pub type LokadIdHistoryReader<'a> = GroupHistoryReader<'a, LokadIdGroup>; + +/// Group txs by input/output scripts. +#[derive(Clone, Debug)] +pub struct LokadIdGroup; + +/// Iterator over the scripts of a tx +#[derive(Debug)] +pub struct LokadIdGroupIter<'a> { + iter: std::iter::Enumerate>, +} + +impl<'a> Iterator for LokadIdGroupIter<'a> { + type Item = MemberItem; + + fn next(&mut self) -> Option { + let (idx, member) = self.iter.next()?; + Some(MemberItem { idx, member }) + } +} + +impl Group for LokadIdGroup { + type Aux = (); + type Iter<'a> = LokadIdGroupIter<'a>; + type Member<'a> = LokadId; + type MemberSer = LokadId; + // Ignored, LOKAD ID doesn't apply to UTXOs but to entire txs + type UtxoData = UtxoDataOutput; + + fn input_members<'a>( + &self, + _query: GroupQuery<'a>, + _aux: &(), + ) -> Self::Iter<'a> { + // Don't use; actual parsing happens in output_members + LokadIdGroupIter { + iter: parse_tx_lokad_ids(&EMPTY_TX).enumerate(), + } + } + + fn output_members<'a>( + &self, + query: GroupQuery<'a>, + _aux: &(), + ) -> Self::Iter<'a> { + // LOKAD ID is per-tx not per-utxo, so we only use output_members + LokadIdGroupIter { + iter: parse_tx_lokad_ids(query.tx).enumerate(), + } + } + + fn ser_member(&self, member: &Self::Member<'_>) -> Self::MemberSer { + *member + } + + fn tx_history_conf() -> GroupHistoryConf { + GroupHistoryConf { + cf_page_name: CF_LOKAD_ID_HISTORY, + cf_num_txs_name: CF_LOKAD_ID_HISTORY_NUM_TXS, + page_size: 1000, + } + } + + fn utxo_conf() -> GroupUtxoConf { + panic!("LokadIdGroup should not be used to group UTXOs") + } +} diff --git a/chronik/chronik-db/src/groups/mod.rs b/chronik/chronik-db/src/groups/mod.rs index 7b18eefd2..b69b4713e 100644 --- a/chronik/chronik-db/src/groups/mod.rs +++ b/chronik/chronik-db/src/groups/mod.rs @@ -1,11 +1,13 @@ // Copyright (c) 2023 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. //! Collection of group implementations to group transactions by when indexing. 
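Unlike the script and token-ID groups, `LokadIdGroup` attaches a tx to a member per LOKAD ID found in the tx as a whole: `output_members` does all the parsing, while `input_members` parses `EMPTY_TX` and therefore never yields anything. A sketch of listing a tx's members (crate paths and `GroupQuery` field visibility assumed):

```rust
use bitcoinsuite_core::tx::Tx;
use bitcoinsuite_slp::lokad_id::LokadId;
use chronik_db::{
    group::{Group, GroupQuery},
    groups::LokadIdGroup,
};

/// Hypothetical helper: the LOKAD IDs a tx would be indexed under.
fn lokad_ids_of(tx: &Tx) -> Vec<LokadId> {
    let query = GroupQuery { is_coinbase: false, tx };
    LokadIdGroup
        .output_members(query, &())
        .map(|item| item.member)
        .collect()
}
```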
+mod lokad_id; mod script; mod token_id; +pub use self::lokad_id::*; pub use self::script::*; pub use self::token_id::*; diff --git a/chronik/chronik-db/src/io/group_history.rs b/chronik/chronik-db/src/io/group_history.rs index 20939ca2e..fcc7d65a2 100644 --- a/chronik/chronik-db/src/io/group_history.rs +++ b/chronik/chronik-db/src/io/group_history.rs @@ -1,681 +1,687 @@ // Copyright (c) 2023 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. use std::{collections::BTreeMap, marker::PhantomData, time::Instant}; use abc_rust_error::Result; use rocksdb::WriteBatch; use thiserror::Error; use crate::{ db::{Db, CF}, group::{tx_members_for_group, Group, GroupQuery}, index_tx::IndexTx, io::{ group_history::GroupHistoryError::*, merge::catch_merge_errors, TxNum, }, ser::{db_deserialize_vec, db_serialize_vec}, }; /// Represent page numbers with 32-bit unsigned integers. type PageNum = u32; /// Represent num txs with 32-bit unsigned integers. /// Note: This implies that scripts can at most have 2^32 txs. type NumTxs = u32; const CONCAT: u8 = b'C'; const TRIM: u8 = b'T'; /// Configuration for group history reader/writers. #[derive(Clone, Debug, Default, Eq, PartialEq)] pub struct GroupHistoryConf { /// Column family to store the group history pages. pub cf_page_name: &'static str, /// Column family to store the last page num of the group history. pub cf_num_txs_name: &'static str, /// Page size for each member of the group. pub page_size: NumTxs, } struct GroupHistoryColumn<'a> { db: &'a Db, cf_page: &'a CF, cf_num_txs: &'a CF, } /// Write txs grouped and paginated to the DB. /// /// This is primarily meant to store the tx history of an address, but it can /// also be used to index the history of other groups, especially of new /// protocols. /// /// Txs are stored paginated, because in the case of addresses, there already /// exist addresses with millions of txs. While RocksDB can handle multi MB /// entries, it would significantly slow down both reading and writing of this /// address, which could pose a DoS risk. /// /// Each page is stored at the key ` + <4-byte page num>` /// /// Txs in a member are ordered strictly ascendingly, both within a page, and /// also between pages, such that the entire tx history of a member can be /// iterated by going through pages 0..N and going through all of the txs of /// each page. #[derive(Debug)] pub struct GroupHistoryWriter<'a, G: Group> { col: GroupHistoryColumn<'a>, conf: GroupHistoryConf, group: G, } /// Read pages of grouped txs from the DB. #[derive(Debug)] pub struct GroupHistoryReader<'a, G: Group> { col: GroupHistoryColumn<'a>, conf: GroupHistoryConf, phantom: PhantomData, } /// In-memory data for the tx history. #[derive(Debug, Default)] pub struct GroupHistoryMemData { /// Stats about cache hits, num requests etc. pub stats: GroupHistoryStats, } /// Stats about cache hits, num requests etc. #[derive(Clone, Debug, Default)] pub struct GroupHistoryStats { /// Total number of members updated. pub n_total: usize, /// Time [s] for insert/delete. pub t_total: f64, /// Time [s] for grouping txs. pub t_group: f64, /// Time [s] for serializing members. pub t_ser_members: f64, /// Time [s] for fetching existing tx data. pub t_fetch: f64, } /// Error indicating that something went wrong with writing group history data. 
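`GroupHistoryConf::page_size` is 1000 for the LOKAD ID group, and the reader derives a member's page count from its stored tx count with a plain ceiling division (see `member_num_pages_and_txs` below). A worked example:

```rust
fn main() {
    let page_size: u32 = 1000; // LokadIdGroup::tx_history_conf().page_size
    let num_txs: u32 = 2500;
    // Same formula as GroupHistoryReader::member_num_pages_and_txs:
    let num_pages = (num_txs + page_size - 1) / page_size;
    assert_eq!(num_pages, 3); // txs 0..=999, 1000..=1999, 2000..=2499
}
```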
#[derive(Debug, Error, PartialEq, Eq)] pub enum GroupHistoryError { /// Bad num_txs size #[error("Inconsistent DB: Bad num_txs size: {0:?}")] BadNumTxsSize(Vec), /// Used merge_cf incorrectly, prefix must either be C or T. #[error( "Bad usage of merge: Unknown prefix {0:02x}, expected C or T: {}", hex::encode(.1), )] UnknownOperandPrefix(u8, Vec), } struct FetchedNumTxs<'tx, G: Group> { members_num_txs: Vec, grouped_txs: BTreeMap, Vec>, ser_members: Vec, } pub(crate) fn bytes_to_num_txs(bytes: &[u8]) -> Result { Ok(NumTxs::from_be_bytes( bytes .try_into() .map_err(|_| BadNumTxsSize(bytes.to_vec()))?, )) } fn partial_merge_concat_trim( _key: &[u8], _existing_value: Option<&[u8]>, _operands: &rocksdb::MergeOperands, ) -> Option> { // We don't use partial merge None } fn init_concat_trim( _key: &[u8], existing_value: Option<&[u8]>, operands: &rocksdb::MergeOperands, ) -> Result> { let mut bytes = existing_value.unwrap_or(&[]).to_vec(); if operands.iter().all(|operand| operand[0] == CONCAT) { bytes.reserve_exact( operands.iter().map(|operand| operand.len() - 1).sum(), ); } Ok(bytes) } fn apply_concat_trim( _key: &[u8], bytes: &mut Vec, operand: &[u8], ) -> Result<()> { if operand[0] == CONCAT { bytes.extend_from_slice(&operand[1..]); } else if operand[0] == TRIM { let trim_len = NumTxs::from_be_bytes(operand[1..5].try_into().unwrap()); bytes.drain(bytes.len() - trim_len as usize..); } else { return Err(UnknownOperandPrefix(operand[0], operand.to_vec()).into()); } Ok(()) } fn ser_concat_trim(_key: &[u8], bytes: Vec) -> Result> { Ok(bytes) } impl<'a> GroupHistoryColumn<'a> { fn new(db: &'a Db, conf: &GroupHistoryConf) -> Result { let cf_page = db.cf(conf.cf_page_name)?; let cf_num_txs = db.cf(conf.cf_num_txs_name)?; Ok(GroupHistoryColumn { db, cf_page, cf_num_txs, }) } fn get_page_txs( &self, member_ser: &[u8], page_num: PageNum, ) -> Result>> { let key = key_for_member_page(member_ser, page_num); let value = match self.db.get(self.cf_page, &key)? { Some(value) => value, None => return Ok(None), }; Ok(Some(db_deserialize_vec::(&value)?)) } } impl<'a, G: Group> GroupHistoryWriter<'a, G> { /// Create a new [`GroupHistoryWriter`]. pub fn new(db: &'a Db, group: G) -> Result { let conf = G::tx_history_conf(); let col = GroupHistoryColumn::new(db, &conf)?; Ok(GroupHistoryWriter { col, conf, group }) } /// Group the txs, then insert them to into each member of the group. pub fn insert( &self, batch: &mut WriteBatch, txs: &[IndexTx<'_>], aux: &G::Aux, mem_data: &mut GroupHistoryMemData, ) -> Result<()> { let t_start = Instant::now(); let fetched = self.fetch_members_num_txs(txs, aux, mem_data)?; for ((mut new_tx_nums, member_ser), mut num_txs) in fetched .grouped_txs .into_values() .zip(fetched.ser_members) .zip(fetched.members_num_txs) { let mut page_num = num_txs / self.conf.page_size; let mut last_page_num_txs = num_txs % self.conf.page_size; loop { let space_left = (self.conf.page_size - last_page_num_txs) as usize; let num_new_txs = space_left.min(new_tx_nums.len()); let merge_tx_nums = db_serialize_vec(new_tx_nums.drain(..num_new_txs))?; batch.merge_cf( self.col.cf_page, key_for_member_page(member_ser.as_ref(), page_num), [[CONCAT].as_ref(), &merge_tx_nums].concat(), ); num_txs += num_new_txs as NumTxs; if new_tx_nums.is_empty() { batch.put_cf( self.col.cf_num_txs, member_ser.as_ref(), num_txs.to_be_bytes(), ); break; } last_page_num_txs = 0; page_num += 1; } } mem_data.stats.t_total += t_start.elapsed().as_secs_f64(); Ok(()) } /// Group the txs, then delete them from each member of the group. 
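Concretely, a page lives under the key `member_ser || page_num` with the page number encoded big-endian (see `key_for_member_page` at the bottom of this file), which keeps all pages of one member adjacent and in ascending page order. A small sketch of that layout:

```rust
fn main() {
    // Hypothetical 4-byte LOKAD ID as the serialized member.
    let member_ser: &[u8] = &[0x4c, 0x4f, 0x4b, 0x30];
    let page_num: u32 = 1;
    // Mirrors key_for_member_page: member bytes ++ big-endian page number.
    let key = [member_ser, page_num.to_be_bytes().as_ref()].concat();
    assert_eq!(key, vec![0x4c, 0x4f, 0x4b, 0x30, 0, 0, 0, 1]);
}
```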
pub fn delete( &self, batch: &mut WriteBatch, txs: &[IndexTx<'_>], aux: &G::Aux, mem_data: &mut GroupHistoryMemData, ) -> Result<()> { let t_start = Instant::now(); let fetched = self.fetch_members_num_txs(txs, aux, mem_data)?; for ((mut removed_tx_nums, member_ser), mut num_txs) in fetched .grouped_txs .into_values() .zip(fetched.ser_members) .zip(fetched.members_num_txs) { let mut num_remaining_removes = removed_tx_nums.len(); let mut page_num = num_txs / self.conf.page_size; let mut last_page_num_txs = num_txs % self.conf.page_size; loop { let num_page_removes = (last_page_num_txs as usize).min(num_remaining_removes); let key = key_for_member_page(member_ser.as_ref(), page_num); if num_page_removes == last_page_num_txs as usize { batch.delete_cf(self.col.cf_page, key) } else { let merge_removed_txs = db_serialize_vec( removed_tx_nums .drain(removed_tx_nums.len() - num_page_removes..), )?; let num_trimmed_bytes = merge_removed_txs.len() as NumTxs; batch.merge_cf( self.col.cf_page, key, [[TRIM].as_ref(), &num_trimmed_bytes.to_be_bytes()] .concat(), ); } num_txs -= num_page_removes as NumTxs; num_remaining_removes -= num_page_removes; if num_remaining_removes == 0 { if num_txs > 0 { batch.put_cf( self.col.cf_num_txs, member_ser.as_ref(), num_txs.to_be_bytes(), ); } else { batch.delete_cf( self.col.cf_num_txs, member_ser.as_ref(), ); } break; } if page_num > 0 { page_num -= 1; last_page_num_txs = self.conf.page_size; } } } mem_data.stats.t_total += t_start.elapsed().as_secs_f64(); Ok(()) } + /// Clear all history data from the DB + pub fn wipe(&self, batch: &mut WriteBatch) { + batch.delete_range_cf(self.col.cf_page, [].as_ref(), &[0xff; 16]); + batch.delete_range_cf(self.col.cf_num_txs, [].as_ref(), &[0xff; 16]); + } + fn fetch_members_num_txs<'tx>( &self, txs: &'tx [IndexTx<'tx>], aux: &G::Aux, mem_data: &mut GroupHistoryMemData, ) -> Result> { let GroupHistoryMemData { stats } = mem_data; let t_group = Instant::now(); let grouped_txs = self.group_txs(txs, aux); stats.t_group += t_group.elapsed().as_secs_f64(); let t_ser_members = Instant::now(); let ser_members = grouped_txs .keys() .map(|key| self.group.ser_member(key)) .collect::>(); stats.t_ser_members += t_ser_members.elapsed().as_secs_f64(); stats.n_total += grouped_txs.len(); let t_fetch = Instant::now(); let num_txs_keys = ser_members.iter().map(|member_ser| member_ser.as_ref()); let fetched_num_txs = self.col .db .multi_get(self.col.cf_num_txs, num_txs_keys, true)?; let mut members_num_txs = Vec::with_capacity(fetched_num_txs.len()); for db_num_txs in fetched_num_txs { members_num_txs.push(match db_num_txs { Some(db_num_txs) => bytes_to_num_txs(&db_num_txs)?, None => 0, }); } stats.t_fetch += t_fetch.elapsed().as_secs_f64(); Ok(FetchedNumTxs { members_num_txs, grouped_txs, ser_members, }) } fn group_txs<'tx>( &self, txs: &'tx [IndexTx<'tx>], aux: &G::Aux, ) -> BTreeMap, Vec> { let mut group_tx_nums = BTreeMap::, Vec>::new(); for index_tx in txs { let query = GroupQuery { is_coinbase: index_tx.is_coinbase, tx: index_tx.tx, }; for member in tx_members_for_group(&self.group, query, aux) { let tx_nums = group_tx_nums.entry(member).or_default(); if let Some(&last_tx_num) = tx_nums.last() { if last_tx_num == index_tx.tx_num { continue; } } tx_nums.push(index_tx.tx_num); } } group_tx_nums } pub(crate) fn add_cfs(columns: &mut Vec) { let conf = G::tx_history_conf(); let mut page_options = rocksdb::Options::default(); let merge_op_name = format!("{}::merge_op_concat", conf.cf_page_name); page_options.set_merge_operator( 
merge_op_name.as_str(), catch_merge_errors( init_concat_trim, apply_concat_trim, ser_concat_trim, ), partial_merge_concat_trim, ); columns.push(rocksdb::ColumnFamilyDescriptor::new( conf.cf_page_name, page_options, )); columns.push(rocksdb::ColumnFamilyDescriptor::new( conf.cf_num_txs_name, rocksdb::Options::default(), )); } } impl std::fmt::Debug for GroupHistoryColumn<'_> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "GroupHistoryColumn {{ .. }}") } } impl<'a, G: Group> GroupHistoryReader<'a, G> { /// Create a new [`GroupHistoryReader`]. pub fn new(db: &'a Db) -> Result { let conf = G::tx_history_conf(); let col = GroupHistoryColumn::new(db, &conf)?; Ok(GroupHistoryReader { col, conf, phantom: PhantomData, }) } /// Read the tx_nums for the given member on the given page, or None, if the /// page doesn't exist in the DB. pub fn page_txs( &self, member_ser: &[u8], page_num: PageNum, ) -> Result>> { self.col.get_page_txs(member_ser, page_num) } /// Total number of pages and txs for this serialized member. /// The result tuple is (num_pages, num_txs). pub fn member_num_pages_and_txs( &self, member_ser: &[u8], ) -> Result<(usize, usize)> { let num_txs = match self.col.db.get(self.col.cf_num_txs, member_ser)? { Some(bytes) => bytes_to_num_txs(&bytes)?, None => return Ok((0, 0)), }; let num_pages = (num_txs + self.conf.page_size - 1) / self.conf.page_size; Ok((num_pages as usize, num_txs as usize)) } /// Size of pages the data is stored in. pub fn page_size(&self) -> usize { self.conf.page_size as usize } } fn key_for_member_page(member_ser: &[u8], page_num: PageNum) -> Vec { [member_ser, &page_num.to_be_bytes()].concat() } #[cfg(test)] mod tests { use std::cell::RefCell; use abc_rust_error::Result; use bitcoinsuite_core::tx::Tx; use rocksdb::WriteBatch; use crate::{ db::Db, index_tx::prepare_indexed_txs, io::{ group_history::PageNum, merge::{check_for_errors, MERGE_ERROR_LOCK}, BlockTxs, GroupHistoryMemData, GroupHistoryReader, GroupHistoryWriter, TxEntry, TxNum, TxWriter, TxsMemData, }, test::{make_value_tx, ser_value, ValueGroup}, }; #[test] fn test_value_group_history() -> Result<()> { let _guard = MERGE_ERROR_LOCK.lock().unwrap(); abc_rust_error::install(); let tempdir = tempdir::TempDir::new("chronik-db--group_history")?; let mut cfs = Vec::new(); GroupHistoryWriter::::add_cfs(&mut cfs); TxWriter::add_cfs(&mut cfs); let db = Db::open_with_cfs(tempdir.path(), cfs)?; let tx_writer = TxWriter::new(&db)?; let group_writer = GroupHistoryWriter::new(&db, ValueGroup)?; let group_reader = GroupHistoryReader::::new(&db)?; let mem_data = RefCell::new(GroupHistoryMemData::default()); let txs_mem_data = RefCell::new(TxsMemData::default()); let block_height = RefCell::new(-1); let txs_batch = |txs: &[Tx]| BlockTxs { txs: txs .iter() .map(|tx| TxEntry { txid: tx.txid(), ..Default::default() }) .collect(), block_height: *block_height.borrow(), }; let connect_block = |txs: &[Tx]| -> Result<()> { let mut batch = WriteBatch::default(); *block_height.borrow_mut() += 1; let first_tx_num = tx_writer.insert( &mut batch, &txs_batch(txs), &mut txs_mem_data.borrow_mut(), )?; let index_txs = prepare_indexed_txs(&db, first_tx_num, txs)?; group_writer.insert( &mut batch, &index_txs, &(), &mut mem_data.borrow_mut(), )?; db.write_batch(batch)?; Ok(()) }; let disconnect_block = |txs: &[Tx]| -> Result<()> { let mut batch = WriteBatch::default(); let first_tx_num = tx_writer.delete( &mut batch, &txs_batch(txs), &mut txs_mem_data.borrow_mut(), )?; let index_txs = prepare_indexed_txs(&db, 
first_tx_num, txs)?; group_writer.delete( &mut batch, &index_txs, &(), &mut mem_data.borrow_mut(), )?; db.write_batch(batch)?; *block_height.borrow_mut() -= 1; Ok(()) }; let read_page = |val: i64, page_num: PageNum| -> Result>> { group_reader.page_txs(&ser_value(val), page_num) }; let read_num_pages_and_txs = |val: i64| -> Result<(usize, usize)> { group_reader.member_num_pages_and_txs(&ser_value(val)) }; // Only adds an entry for value=10 (coinbase inputs are ignored) let block0 = [make_value_tx(0, [0xffff], [10])]; connect_block(&block0)?; assert_eq!(read_page(0xffff, 0)?, None); assert_eq!(read_num_pages_and_txs(0xffff)?, (0, 0)); assert_eq!(read_page(10, 0)?, Some(vec![0])); assert_eq!(read_num_pages_and_txs(10)?, (1, 1)); // Block that adds a lot of pages to value=10, one entry to value=20 let block1 = [ make_value_tx(1, [0xffff], [10]), make_value_tx(2, [10], []), make_value_tx(3, [20], []), // value=20 make_value_tx(4, [10], []), make_value_tx(5, [10], []), make_value_tx(6, [10], []), make_value_tx(7, [10], []), make_value_tx(8, [10], []), make_value_tx(9, [10], []), ]; connect_block(&block1)?; assert_eq!(read_page(0xffff, 0)?, None); assert_eq!(read_page(10, 0)?, Some(vec![0, 1, 2, 4])); assert_eq!(read_page(10, 1)?, Some(vec![5, 6, 7, 8])); assert_eq!(read_page(10, 2)?, Some(vec![9])); assert_eq!(read_page(10, 3)?, None); assert_eq!(read_num_pages_and_txs(10)?, (3, 9)); assert_eq!(read_page(20, 0)?, Some(vec![3])); assert_eq!(read_page(20, 1)?, None); assert_eq!(read_num_pages_and_txs(20)?, (1, 1)); // Only tx_num=0 remains // The other pages have been removed from the DB entirely disconnect_block(&block1)?; assert_eq!(read_page(0xffff, 0)?, None); assert_eq!(read_page(10, 0)?, Some(vec![0])); assert_eq!(read_page(10, 1)?, None); assert_eq!(read_page(10, 2)?, None); assert_eq!(read_page(20, 0)?, None); // Re-org block, with all kinds of input + output values let block1 = [ make_value_tx(1, [0xffff], [10]), make_value_tx(2, [10], [10, 20, 30]), make_value_tx(3, [10, 40], [10, 10, 40]), make_value_tx(4, [10], [40, 30, 40]), ]; connect_block(&block1)?; // all txs add to value=10, with 2 pages assert_eq!(read_page(10, 0)?, Some(vec![0, 1, 2, 3])); assert_eq!(read_page(10, 1)?, Some(vec![4])); assert_eq!(read_num_pages_and_txs(10)?, (2, 5)); // only tx_num=2 adds to value=20 assert_eq!(read_page(20, 0)?, Some(vec![2])); assert_eq!(read_num_pages_and_txs(20)?, (1, 1)); // tx_num=2 and tx_num=4 add to value=30 assert_eq!(read_page(30, 0)?, Some(vec![2, 4])); assert_eq!(read_num_pages_and_txs(30)?, (1, 2)); // tx_num=3 and tx_num=4 add to value=40 assert_eq!(read_page(40, 0)?, Some(vec![3, 4])); assert_eq!(read_num_pages_and_txs(40)?, (1, 2)); // Delete that block also disconnect_block(&block1)?; assert_eq!(read_page(0xffff, 0)?, None); assert_eq!(read_page(10, 0)?, Some(vec![0])); assert_eq!(read_page(10, 1)?, None); assert_eq!(read_page(20, 0)?, None); assert_eq!(read_page(30, 0)?, None); assert_eq!(read_page(40, 0)?, None); // Add it back in connect_block(&block1)?; // Add new block, adding 1 tx to 20, 6 txs to 30, 4 txs to 40 let block2 = [ make_value_tx(5, [0xffff], [40, 30]), make_value_tx(6, [30, 10], [30]), make_value_tx(7, [10], [30]), make_value_tx(8, [40], [30]), make_value_tx(9, [10], [20]), ]; connect_block(&block2)?; assert_eq!(read_page(10, 0)?, Some(vec![0, 1, 2, 3])); assert_eq!(read_page(10, 1)?, Some(vec![4, 6, 7, 9])); assert_eq!(read_page(10, 2)?, None); assert_eq!(read_num_pages_and_txs(10)?, (2, 8)); assert_eq!(read_page(20, 0)?, Some(vec![2, 9])); 
assert_eq!(read_page(20, 1)?, None); assert_eq!(read_num_pages_and_txs(20)?, (1, 2)); assert_eq!(read_page(30, 0)?, Some(vec![2, 4, 5, 6])); assert_eq!(read_page(30, 1)?, Some(vec![7, 8])); assert_eq!(read_page(30, 2)?, None); assert_eq!(read_num_pages_and_txs(30)?, (2, 6)); assert_eq!(read_page(40, 0)?, Some(vec![3, 4, 5, 8])); assert_eq!(read_page(40, 1)?, None); assert_eq!(read_num_pages_and_txs(40)?, (1, 4)); // Remove all blocks disconnect_block(&block2)?; assert_eq!(read_page(10, 0)?, Some(vec![0, 1, 2, 3])); assert_eq!(read_page(10, 1)?, Some(vec![4])); assert_eq!(read_page(20, 0)?, Some(vec![2])); assert_eq!(read_page(30, 0)?, Some(vec![2, 4])); assert_eq!(read_page(30, 1)?, None); assert_eq!(read_page(40, 0)?, Some(vec![3, 4])); assert_eq!(read_page(40, 1)?, None); disconnect_block(&block1)?; assert_eq!(read_page(10, 0)?, Some(vec![0])); assert_eq!(read_page(10, 1)?, None); assert_eq!(read_page(20, 0)?, None); assert_eq!(read_page(30, 0)?, None); assert_eq!(read_page(40, 0)?, None); disconnect_block(&block0)?; assert_eq!(read_page(10, 0)?, None); drop(db); rocksdb::DB::destroy(&rocksdb::Options::default(), tempdir.path())?; let _ = check_for_errors(); Ok(()) } } diff --git a/chronik/chronik-db/src/io/group_utxos.rs b/chronik/chronik-db/src/io/group_utxos.rs index beced8624..35d39510a 100644 --- a/chronik/chronik-db/src/io/group_utxos.rs +++ b/chronik/chronik-db/src/io/group_utxos.rs @@ -1,613 +1,618 @@ // Copyright (c) 2023 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. //! Module for [`GroupUtxoWriter`] and [`GroupUtxoReader`]. use std::{marker::PhantomData, time::Instant}; use abc_rust_error::Result; use chronik_util::log; use rocksdb::{compaction_filter::Decision, WriteBatch}; use serde::{Deserialize, Serialize}; use thiserror::Error; use crate::{ db::{Db, CF}, group::{Group, GroupQuery, UtxoData}, index_tx::IndexTx, io::{group_utxos::GroupUtxoError::*, merge::catch_merge_errors, TxNum}, ser::{db_deserialize, db_deserialize_vec, db_serialize, db_serialize_vec}, }; const INSERT: u8 = b'I'; const DELETE: u8 = b'D'; /// Configuration for group utxos reader/writers. #[derive(Clone, Debug, Default, Eq, PartialEq)] pub struct GroupUtxoConf { /// Column family to store the group utxos entries. pub cf_name: &'static str, } struct GroupUtxoColumn<'a> { db: &'a Db, cf: &'a CF, } /// Outpoint in the DB, but with [`TxNum`] instead of `TxId` for the txid. #[derive( Clone, Copy, Debug, Default, Deserialize, Eq, Ord, PartialEq, PartialOrd, Serialize, )] pub struct UtxoOutpoint { /// [`TxNum`] of tx of the outpoint. pub tx_num: TxNum, /// Output of the tx referenced by the outpoint. pub out_idx: u32, } /// Entry in the UTXO DB for a group. #[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)] pub struct UtxoEntry { /// Outpoint of the UTXO. pub outpoint: UtxoOutpoint, /// Data attached to the UTXO attached to the UTXO for easy access, e.g. /// the UTXO value and/or script. pub data: D, } /// Write UTXOs of a group to the DB. #[derive(Debug)] pub struct GroupUtxoWriter<'a, G: Group> { col: GroupUtxoColumn<'a>, group: G, } /// Read UTXOs of a group from the DB. #[derive(Debug)] pub struct GroupUtxoReader<'a, G: Group> { col: GroupUtxoColumn<'a>, phantom: PhantomData, } /// In-memory data for indexing UTXOs of a group. #[derive(Debug, Default)] pub struct GroupUtxoMemData { /// Stats about cache hits, num requests etc. 
pub stats: GroupUtxoStats, } /// Stats about cache hits, num requests etc. #[derive(Clone, Debug, Default)] pub struct GroupUtxoStats { /// Total number of txs updated. pub n_total: usize, /// Time [s] for insert/delete. pub t_total: f64, } /// Error indicating something went wrong with the tx index. #[derive(Debug, Error, PartialEq, Eq)] pub enum GroupUtxoError { /// UTXO already in the DB #[error( "Duplicate UTXO: {0:?} has been added twice to the member's UTXOs" )] DuplicateUtxo(UtxoOutpoint), /// UTXO already in the DB #[error("UTXO doesn't exist: {0:?} is not in the member's UTXOs")] UtxoDoesntExist(UtxoOutpoint), /// Used merge_cf incorrectly, prefix must either be I or D. #[error( "Bad usage of merge: Unknown prefix {0:02x}, expected I or D: {}", hex::encode(.1), )] UnknownOperandPrefix(u8, Vec), /// Upgrade failed. #[error( "Upgrade failed, could not parse {} for key {}: {error}", hex::encode(.value), hex::encode(.key), )] UpgradeFailed { /// Key that failed key: Box<[u8]>, /// Value that failed parsing in the old format value: Box<[u8]>, /// Error message error: String, }, } fn partial_merge_utxos( _key: &[u8], _existing_value: Option<&[u8]>, _operands: &rocksdb::MergeOperands, ) -> Option> { // We don't use partial merge None } fn init_merge_utxos Deserialize<'a>>( _key: &[u8], existing_value: Option<&[u8]>, operands: &rocksdb::MergeOperands, ) -> Result>> { let mut entries = match existing_value { Some(entries_ser) => db_deserialize_vec::>(entries_ser)?, None => vec![], }; if operands.into_iter().all(|operand| operand[0] == INSERT) { // If we only have inserts, we can pre-allocate the exact memory we need entries.reserve_exact(operands.len()); } Ok(entries) } fn apply_merge_utxos Deserialize<'a>>( _key: &[u8], entries: &mut Vec>, operand: &[u8], ) -> Result<()> { match operand[0] { INSERT => { let new_entry = db_deserialize::>(&operand[1..])?; match entries.binary_search_by_key(&&new_entry.outpoint, |entry| { &entry.outpoint }) { Ok(_) => return Err(DuplicateUtxo(new_entry.outpoint).into()), Err(insert_idx) => entries.insert(insert_idx, new_entry), } } DELETE => { let delete_outpoint = db_deserialize::(&operand[1..])?; match entries.binary_search_by_key(&&delete_outpoint, |entry| { &entry.outpoint }) { Ok(delete_idx) => entries.remove(delete_idx), Err(_) => return Err(UtxoDoesntExist(delete_outpoint).into()), }; } _ => { return Err( UnknownOperandPrefix(operand[0], operand.to_vec()).into() ); } } Ok(()) } fn ser_merge_utxos( _key: &[u8], entries: Vec>, ) -> Result> { db_serialize_vec::>(entries) } // We must use a compaction filter that removes empty entries fn compaction_filter_utxos(_level: u32, _key: &[u8], value: &[u8]) -> Decision { if value.is_empty() { Decision::Remove } else { Decision::Keep } } impl<'a> GroupUtxoColumn<'a> { fn new(db: &'a Db, conf: &GroupUtxoConf) -> Result { let cf = db.cf(conf.cf_name)?; Ok(GroupUtxoColumn { db, cf }) } } impl<'a, G: Group> GroupUtxoWriter<'a, G> { /// Create a new [`GroupUtxoWriter`]. pub fn new(db: &'a Db, group: G) -> Result { let conf = G::utxo_conf(); let col = GroupUtxoColumn::new(db, &conf)?; Ok(GroupUtxoWriter { col, group }) } /// Insert the txs of a block from the UTXOs in the DB for the group. /// /// Add all the UTXOs created by the outputs of the txs to the DB, remove /// all the UTXOs spend by the inputs of the txs. 
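Each member's UTXO list is kept sorted by outpoint, so the merge operator can insert and delete with a binary search and surface `DuplicateUtxo` / `UtxoDoesntExist` cheaply. A minimal sketch of that invariant, relying on `UtxoOutpoint`'s derived ordering (`tx_num`, then `out_idx`; crate path assumed):

```rust
use chronik_db::io::UtxoOutpoint;

fn main() {
    let mut outpoints = vec![
        UtxoOutpoint { tx_num: 1, out_idx: 0 },
        UtxoOutpoint { tx_num: 3, out_idx: 2 },
    ];
    let new = UtxoOutpoint { tx_num: 2, out_idx: 1 };
    match outpoints.binary_search(&new) {
        Ok(_) => unreachable!("the real merge op errors with DuplicateUtxo"),
        Err(idx) => outpoints.insert(idx, new),
    }
    assert_eq!(outpoints[1], UtxoOutpoint { tx_num: 2, out_idx: 1 });
}
```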
pub fn insert<'tx>( &self, batch: &mut WriteBatch, txs: &'tx [IndexTx<'tx>], aux: &G::Aux, mem_data: &mut GroupUtxoMemData, ) -> Result<()> { let stats = &mut mem_data.stats; stats.n_total += txs.len(); let t_start = Instant::now(); for index_tx in txs { let query = GroupQuery { is_coinbase: index_tx.is_coinbase, tx: index_tx.tx, }; for item in self.group.output_members(query, aux) { let new_entry = Self::output_utxo(index_tx, item.idx); self.insert_utxo_entry(batch, &item.member, new_entry)?; } } for index_tx in txs { if index_tx.is_coinbase { continue; } let query = GroupQuery { is_coinbase: index_tx.is_coinbase, tx: index_tx.tx, }; for item in self.group.input_members(query, aux) { let delete_outpoint = Self::input_utxo(index_tx, item.idx).outpoint; self.delete_utxo_entry(batch, &item.member, &delete_outpoint)?; } } stats.t_total += t_start.elapsed().as_secs_f64(); Ok(()) } /// Remove the txs of a block from the UTXOs in the DB for the group. /// /// Add all the UTXOs spent by the inputs of the txs and remove all the /// UTXOs created by the outputs of the txs. pub fn delete<'tx>( &self, batch: &mut WriteBatch, txs: &'tx [IndexTx<'tx>], aux: &G::Aux, mem_data: &mut GroupUtxoMemData, ) -> Result<()> { let stats = &mut mem_data.stats; stats.n_total += txs.len(); let t_start = Instant::now(); for index_tx in txs { if index_tx.is_coinbase { continue; } let query = GroupQuery { is_coinbase: index_tx.is_coinbase, tx: index_tx.tx, }; for item in self.group.input_members(query, aux) { let new_entry = Self::input_utxo(index_tx, item.idx); self.insert_utxo_entry(batch, &item.member, new_entry)?; } } for index_tx in txs { let query = GroupQuery { is_coinbase: index_tx.is_coinbase, tx: index_tx.tx, }; for item in self.group.output_members(query, aux) { let delete_outpoint = Self::output_utxo(index_tx, item.idx).outpoint; self.delete_utxo_entry(batch, &item.member, &delete_outpoint)?; } } stats.t_total += t_start.elapsed().as_secs_f64(); Ok(()) } + /// Clear all UTXO data from the DB + pub fn wipe(&self, batch: &mut WriteBatch) { + batch.delete_range_cf(self.col.cf, [].as_ref(), &[0xff; 16]); + } + /// Upgrade the DB from version 10 to version 11 pub fn upgrade_10_to_11(&self) -> Result<()> { log!( "Upgrading Chronik UTXO set for {}. 
Do not kill the process \ during upgrade, it will corrupt the database.\n", G::utxo_conf().cf_name ); let estimated_num_keys = self.col.db.estimate_num_keys(self.col.cf)?.unwrap_or(0); let mut batch = WriteBatch::default(); for (db_idx, old_utxos_ser) in self.col.db.full_iterator(self.col.cf).enumerate() { let (key, old_utxos_ser) = old_utxos_ser?; let utxos = match db_deserialize::>>( &old_utxos_ser, ) { Ok(utxos) => utxos, Err(err) => { return Err(UpgradeFailed { key, value: old_utxos_ser, error: err.to_string(), } .into()); } }; let new_utxos_ser = db_serialize_vec::>(utxos)?; batch.put_cf(self.col.cf, key, new_utxos_ser); if db_idx % 10000 == 0 { log!("Upgraded {db_idx} of {estimated_num_keys} (estimated)\n"); self.col.db.write_batch(batch)?; batch = WriteBatch::default(); } } self.col.db.write_batch(batch)?; log!("Upgrade for {} complete\n", G::utxo_conf().cf_name); Ok(()) } pub(crate) fn add_cfs(columns: &mut Vec) { let cf_name = G::utxo_conf().cf_name; let mut options = rocksdb::Options::default(); let merge_op_name = format!("{}::merge_op_utxos", cf_name); options.set_merge_operator( merge_op_name.as_str(), catch_merge_errors( init_merge_utxos::, apply_merge_utxos::, ser_merge_utxos::, ), partial_merge_utxos, ); let compaction_filter_name = format!("{}::compaction_filter_utxos", cf_name); options.set_compaction_filter( &compaction_filter_name, compaction_filter_utxos, ); columns.push(rocksdb::ColumnFamilyDescriptor::new(cf_name, options)); } fn output_utxo( index_tx: &IndexTx<'_>, idx: usize, ) -> UtxoEntry { UtxoEntry { outpoint: UtxoOutpoint { tx_num: index_tx.tx_num, out_idx: idx as u32, }, data: G::UtxoData::from_output(&index_tx.tx.outputs[idx]), } } fn input_utxo( index_tx: &IndexTx<'_>, idx: usize, ) -> UtxoEntry { UtxoEntry { outpoint: UtxoOutpoint { tx_num: index_tx.input_nums[idx], out_idx: index_tx.tx.inputs[idx].prev_out.out_idx, }, data: index_tx.tx.inputs[idx] .coin .as_ref() .map(|coin| G::UtxoData::from_output(&coin.output)) .unwrap_or_default(), } } fn insert_utxo_entry( &self, batch: &mut WriteBatch, member: &G::Member<'_>, new_entry: UtxoEntry, ) -> Result<()> { batch.merge_cf( self.col.cf, self.group.ser_member(member), [[INSERT].as_ref(), &db_serialize(&new_entry)?].concat(), ); Ok(()) } fn delete_utxo_entry( &self, batch: &mut WriteBatch, member: &G::Member<'_>, delete_outpoint: &UtxoOutpoint, ) -> Result<()> { batch.merge_cf( self.col.cf, self.group.ser_member(member), [[DELETE].as_ref(), &db_serialize(&delete_outpoint)?].concat(), ); Ok(()) } } impl<'a, G: Group> GroupUtxoReader<'a, G> { /// Create a new [`GroupUtxoReader`]. pub fn new(db: &'a Db) -> Result { let conf = G::utxo_conf(); let col = GroupUtxoColumn::new(db, &conf)?; Ok(GroupUtxoReader { col, phantom: PhantomData, }) } /// Query the UTXOs for the given member. pub fn utxos( &self, member: &[u8], ) -> Result>>> { match self.col.db.get(self.col.cf, member)? { Some(entry) => { let entries = db_deserialize_vec::>(&entry)?; if entries.is_empty() { // Usually compaction catches this and removes such entries, // but it's run later so have to filter here manually return Ok(None); } Ok(Some(entries)) } None => Ok(None), } } } impl std::fmt::Debug for GroupUtxoColumn<'_> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "GroupUtxoColumn {{ .. 
}}") } } #[cfg(test)] mod tests { use std::cell::RefCell; use abc_rust_error::Result; use bitcoinsuite_core::tx::Tx; use rocksdb::WriteBatch; use crate::{ db::Db, index_tx::prepare_indexed_txs, io::{ BlockTxs, GroupUtxoMemData, GroupUtxoReader, GroupUtxoWriter, TxEntry, TxWriter, TxsMemData, UtxoEntry, UtxoOutpoint, }, test::{make_inputs_tx, ser_value, ValueGroup}, }; #[test] fn test_value_group_utxos() -> Result<()> { abc_rust_error::install(); let tempdir = tempdir::TempDir::new("chronik-db--group_history")?; let mut cfs = Vec::new(); GroupUtxoWriter::::add_cfs(&mut cfs); TxWriter::add_cfs(&mut cfs); let db = Db::open_with_cfs(tempdir.path(), cfs)?; let tx_writer = TxWriter::new(&db)?; let mem_data = RefCell::new(GroupUtxoMemData::default()); let txs_mem_data = RefCell::new(TxsMemData::default()); let group_writer = GroupUtxoWriter::new(&db, ValueGroup)?; let group_reader = GroupUtxoReader::::new(&db)?; let block_height = RefCell::new(-1); let txs_batch = |txs: &[Tx]| BlockTxs { txs: txs .iter() .map(|tx| TxEntry { txid: tx.txid(), ..Default::default() }) .collect(), block_height: *block_height.borrow(), }; let connect_block = |txs: &[Tx]| -> Result<()> { let mut batch = WriteBatch::default(); *block_height.borrow_mut() += 1; let first_tx_num = tx_writer.insert( &mut batch, &txs_batch(txs), &mut txs_mem_data.borrow_mut(), )?; let index_txs = prepare_indexed_txs(&db, first_tx_num, txs)?; group_writer.insert( &mut batch, &index_txs, &(), &mut mem_data.borrow_mut(), )?; db.write_batch(batch)?; Ok(()) }; let disconnect_block = |txs: &[Tx]| -> Result<()> { let mut batch = WriteBatch::default(); let first_tx_num = tx_writer.delete( &mut batch, &txs_batch(txs), &mut txs_mem_data.borrow_mut(), )?; let index_txs = prepare_indexed_txs(&db, first_tx_num, txs)?; group_writer.delete( &mut batch, &index_txs, &(), &mut mem_data.borrow_mut(), )?; db.write_batch(batch)?; *block_height.borrow_mut() -= 1; Ok(()) }; let utxo = |tx_num, out_idx, value| UtxoEntry { outpoint: UtxoOutpoint { tx_num, out_idx }, data: value, }; let read_utxos = |val: i64| group_reader.utxos(&ser_value(val)); let block0 = vec![make_inputs_tx(0x01, [(0x00, u32::MAX, 0xffff)], [100, 200])]; connect_block(&block0)?; assert_eq!(read_utxos(100)?, Some(vec![utxo(0, 0, 100)])); assert_eq!(read_utxos(200)?, Some(vec![utxo(0, 1, 200)])); let block1 = vec![ make_inputs_tx(0x02, [(0x00, u32::MAX, 0xffff)], [200]), make_inputs_tx(0x03, [(0x01, 0, 100)], [10, 20, 10]), make_inputs_tx( 0x04, [(0x03, 0, 10), (0x01, 1, 200), (0x03, 1, 20)], [200], ), ]; connect_block(&block1)?; assert_eq!(read_utxos(10)?, Some(vec![utxo(2, 2, 10)])); assert_eq!(read_utxos(20)?, None); assert_eq!(read_utxos(100)?, None); assert_eq!( read_utxos(200)?, Some(vec![utxo(1, 0, 200), utxo(3, 0, 200)]), ); disconnect_block(&block1)?; assert_eq!(read_utxos(10)?, None); assert_eq!(read_utxos(20)?, None); assert_eq!(read_utxos(100)?, Some(vec![utxo(0, 0, 100)])); assert_eq!(read_utxos(200)?, Some(vec![utxo(0, 1, 200)])); // Reorg block let block1 = vec![ make_inputs_tx(0x02, [(0x00, u32::MAX, 0xffff)], [200]), make_inputs_tx(0x10, [(0x01, 0, 100)], [100, 200, 100]), make_inputs_tx( 0x11, [(0x10, 0, 100), (0x10, 1, 200), (0x01, 1, 200)], [200], ), make_inputs_tx(0x12, [(0x11, 0, 200)], [200]), ]; connect_block(&block1)?; assert_eq!(read_utxos(100)?, Some(vec![utxo(2, 2, 100)])); assert_eq!( read_utxos(200)?, Some(vec![utxo(1, 0, 200), utxo(4, 0, 200)]), ); disconnect_block(&block1)?; assert_eq!(read_utxos(100)?, Some(vec![utxo(0, 0, 100)])); assert_eq!(read_utxos(200)?, 
Some(vec![utxo(0, 1, 200)])); disconnect_block(&block0)?; assert_eq!(read_utxos(100)?, None); assert_eq!(read_utxos(200)?, None); Ok(()) } } diff --git a/chronik/chronik-db/src/io/metadata.rs b/chronik/chronik-db/src/io/metadata.rs index 4d2765107..3f972f67d 100644 --- a/chronik/chronik-db/src/io/metadata.rs +++ b/chronik/chronik-db/src/io/metadata.rs @@ -1,115 +1,142 @@ // Copyright (c) 2023 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. use abc_rust_error::Result; use rocksdb::ColumnFamilyDescriptor; use crate::{ db::{Db, CF, CF_META}, ser::{db_deserialize, db_serialize}, }; /// Type for the version of the database, to let us know when we're outdated. pub type SchemaVersion = u64; /// Field in the `meta` cf storing the schema version. pub const FIELD_SCHEMA_VERSION: &[u8] = b"SCHEMA_VERSION"; /// Field in the `meta` cf storing whether Chronik had the token index enabled pub const FIELD_TOKEN_INDEX_ENABLED: &[u8] = b"TOKEN_INDEX_ENABLED"; +/// Field in the `meta` cf whether Chronik had the LOKAD ID index enabled +pub const FIELD_LOKAD_ID_INDEX_ENABLED: &[u8] = b"LOKAD_ID_INDEX_ENABLED"; + /// Write database metadata pub struct MetadataWriter<'a> { cf: &'a CF, } /// Read database metadata pub struct MetadataReader<'a> { db: &'a Db, cf: &'a CF, } impl<'a> MetadataWriter<'a> { /// Create a writer to the database for metadata pub fn new(db: &'a Db) -> Result { let cf = db.cf(CF_META)?; Ok(MetadataWriter { cf }) } /// Update the schema version of the database pub fn update_schema_version( &self, batch: &mut rocksdb::WriteBatch, schema_version: SchemaVersion, ) -> Result<()> { batch.put_cf( self.cf, FIELD_SCHEMA_VERSION, db_serialize(&schema_version)?, ); Ok(()) } /// Update the flag storing whether the token index is enabled in the DB pub fn update_is_token_index_enabled( &self, batch: &mut rocksdb::WriteBatch, is_token_index_enabled: bool, ) -> Result<()> { batch.put_cf( self.cf, FIELD_TOKEN_INDEX_ENABLED, db_serialize::(&is_token_index_enabled)?, ); Ok(()) } + /// Update the flag storing whether the LOKAD ID index is enabled in the DB + pub fn update_is_lokad_id_index_enabled( + &self, + batch: &mut rocksdb::WriteBatch, + is_lokad_id_index_enabled: bool, + ) -> Result<()> { + batch.put_cf( + self.cf, + FIELD_LOKAD_ID_INDEX_ENABLED, + db_serialize::(&is_lokad_id_index_enabled)?, + ); + Ok(()) + } + pub(crate) fn add_cfs(columns: &mut Vec) { columns.push(ColumnFamilyDescriptor::new( CF_META, rocksdb::Options::default(), )); } } impl std::fmt::Debug for MetadataWriter<'_> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "MetadataWriter {{ ... }}") } } impl<'a> MetadataReader<'a> { /// Create a reader from the database for metadata pub fn new(db: &'a Db) -> Result { let cf = db.cf(CF_META)?; Ok(MetadataReader { db, cf }) } /// Read the schema version of the database pub fn schema_version(&self) -> Result> { match self.db.get(self.cf, FIELD_SCHEMA_VERSION)? { Some(ser_schema_version) => { Ok(Some(db_deserialize(&ser_schema_version)?)) } None => Ok(None), } } /// Read whether the token index was enabled pub fn is_token_index_enabled(&self) -> Result { match self.db.get(self.cf, FIELD_TOKEN_INDEX_ENABLED)? { Some(ser_token_index) => { Ok(db_deserialize::(&ser_token_index)?) 
} // By default, the token index is enabled None => Ok(true), } } + + /// Read whether the LOKAD ID index is enabled, or None if unspecified + pub fn is_lokad_id_index_enabled(&self) -> Result> { + match self.db.get(self.cf, FIELD_LOKAD_ID_INDEX_ENABLED)? { + Some(ser_token_index) => { + Ok(Some(db_deserialize::(&ser_token_index)?)) + } + None => Ok(None), + } + } } impl std::fmt::Debug for MetadataReader<'_> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "MetadataReader {{ ... }}") } } diff --git a/chronik/chronik-db/src/mem/mempool.rs b/chronik/chronik-db/src/mem/mempool.rs index caf89f6f7..13feb6117 100644 --- a/chronik/chronik-db/src/mem/mempool.rs +++ b/chronik/chronik-db/src/mem/mempool.rs @@ -1,207 +1,230 @@ // Copyright (c) 2023 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. //! Module for [`Mempool`], to index mempool txs. use std::{borrow::Cow, collections::HashMap}; use abc_rust_error::Result; use bitcoinsuite_core::tx::{Tx, TxId}; use thiserror::Error; use crate::{ db::Db, groups::{ - MempoolScriptHistory, MempoolScriptUtxos, MempoolTokenIdHistory, - MempoolTokenIdUtxos, ScriptGroup, TokenIdGroup, TokenIdGroupAux, + LokadIdGroup, MempoolLokadIdHistory, MempoolScriptHistory, + MempoolScriptUtxos, MempoolTokenIdHistory, MempoolTokenIdUtxos, + ScriptGroup, TokenIdGroup, TokenIdGroupAux, }, mem::{MempoolSpentBy, MempoolTokens}, }; /// Mempool of the indexer. This stores txs from the node again, but having a /// copy here simplifies the implementation significantly. If this redundancy /// becomes an issue (e.g. excessive RAM usage), we can optimize this later. #[derive(Debug)] pub struct Mempool { txs: HashMap, script_history: MempoolScriptHistory, script_utxos: MempoolScriptUtxos, spent_by: MempoolSpentBy, tokens: MempoolTokens, token_id_history: MempoolTokenIdHistory, token_id_utxos: MempoolTokenIdUtxos, + lokad_id_history: MempoolLokadIdHistory, is_token_index_enabled: bool, + is_lokad_id_index_enabled: bool, } /// Result after adding a tx to the mempool #[derive(Debug)] pub struct MempoolResult<'m> { /// Mempool tx that was just added pub mempool_tx: Cow<'m, MempoolTx>, /// [`TokenIdGroupAux`] generated while indexing the mempool tx pub token_id_aux: TokenIdGroupAux, } /// Transaction in the mempool. #[derive(Clone, Debug, PartialEq, Eq)] pub struct MempoolTx { /// Transaction, including spent coins. pub tx: Tx, /// Time this tx has been added to the node's mempool. pub time_first_seen: i64, } /// Something went wrong with the mempool. #[derive(Debug, Error)] pub enum MempoolError { /// Tried removing a tx from the mempool that doesn't exist. #[error("No such mempool tx: {0}")] NoSuchMempoolTx(TxId), /// Tried adding a tx to the mempool that already exists. #[error("Tx {0} already exists in mempool")] DuplicateTx(TxId), } use self::MempoolError::*; impl Mempool { /// Create a new [`Mempool`]. 
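///
/// A minimal construction sketch (doctest-ignored; the boolean flags are
/// placeholders, presumably wired to the node's `-chroniktokenindex` and
/// `-chroniklokadidindex` settings):
/// ```ignore
/// let mempool = Mempool::new(
///     ScriptGroup,
///     /* enable_token_index */ true,
///     /* is_lokad_id_index_enabled */ true,
/// );
/// ```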
- pub fn new(script_group: ScriptGroup, enable_token_index: bool) -> Self { + pub fn new( + script_group: ScriptGroup, + enable_token_index: bool, + is_lokad_id_index_enabled: bool, + ) -> Self { Mempool { txs: HashMap::new(), script_history: MempoolScriptHistory::new(script_group.clone()), script_utxos: MempoolScriptUtxos::new(script_group), spent_by: MempoolSpentBy::default(), tokens: MempoolTokens::default(), token_id_history: MempoolTokenIdHistory::new(TokenIdGroup), token_id_utxos: MempoolTokenIdUtxos::new(TokenIdGroup), + lokad_id_history: MempoolLokadIdHistory::new(LokadIdGroup), is_token_index_enabled: enable_token_index, + is_lokad_id_index_enabled, } } /// Insert tx into the mempool. pub fn insert( &mut self, db: &Db, mempool_tx: MempoolTx, ) -> Result> { let txid = mempool_tx.tx.txid(); self.script_history.insert(&mempool_tx, &()); self.script_utxos.insert( &mempool_tx, |txid| self.txs.contains_key(txid), &(), )?; self.spent_by.insert(&mempool_tx)?; let token_id_aux; if self.is_token_index_enabled { self.tokens .insert(db, &mempool_tx, |txid| self.txs.contains_key(txid))?; token_id_aux = TokenIdGroupAux::from_mempool(&mempool_tx.tx, &self.tokens); self.token_id_history.insert(&mempool_tx, &token_id_aux); self.token_id_utxos.insert( &mempool_tx, |txid| self.txs.contains_key(txid), &token_id_aux, )?; } else { token_id_aux = TokenIdGroupAux::default(); } + if self.is_lokad_id_index_enabled { + self.lokad_id_history.insert(&mempool_tx, &()); + } if self.txs.insert(txid, mempool_tx).is_some() { return Err(DuplicateTx(txid).into()); } Ok(MempoolResult { mempool_tx: Cow::Borrowed(&self.txs[&txid]), token_id_aux, }) } /// Remove tx from the mempool. pub fn remove(&mut self, txid: TxId) -> Result> { let mempool_tx = match self.txs.remove(&txid) { Some(mempool_tx) => mempool_tx, None => return Err(NoSuchMempoolTx(txid).into()), }; self.script_history.remove(&mempool_tx, &()); self.script_utxos.remove( &mempool_tx, |txid| self.txs.contains_key(txid), &(), )?; self.spent_by.remove(&mempool_tx)?; let token_id_aux; if self.is_token_index_enabled { token_id_aux = TokenIdGroupAux::from_mempool(&mempool_tx.tx, &self.tokens); self.token_id_history.remove(&mempool_tx, &token_id_aux); self.token_id_utxos.remove( &mempool_tx, |txid| self.txs.contains_key(txid), &token_id_aux, )?; self.tokens.remove(&txid); } else { token_id_aux = TokenIdGroupAux::default(); } + if self.is_lokad_id_index_enabled { + self.lokad_id_history.remove(&mempool_tx, &()); + } Ok(MempoolResult { mempool_tx: Cow::Owned(mempool_tx), token_id_aux, }) } /// Remove mined tx from the mempool. pub fn remove_mined(&mut self, txid: &TxId) -> Result> { if let Some(mempool_tx) = self.txs.remove(txid) { self.script_history.remove(&mempool_tx, &()); self.script_utxos.remove_mined(&mempool_tx, &()); self.spent_by.remove(&mempool_tx)?; if self.is_token_index_enabled { let token_id_aux = TokenIdGroupAux::from_mempool(&mempool_tx.tx, &self.tokens); self.token_id_history.remove(&mempool_tx, &token_id_aux); self.token_id_utxos.remove_mined(&mempool_tx, &token_id_aux); self.tokens.remove(txid); } + if self.is_lokad_id_index_enabled { + self.lokad_id_history.remove(&mempool_tx, &()); + } return Ok(Some(mempool_tx)); } Ok(None) } /// Get a tx by [`TxId`], or [`None`], if not found. pub fn tx(&self, txid: &TxId) -> Option<&MempoolTx> { self.txs.get(txid) } /// Tx history of scripts in the mempool. pub fn script_history(&self) -> &MempoolScriptHistory { &self.script_history } /// Tx history of UTXOs in the mempool. 
pub fn script_utxos(&self) -> &MempoolScriptUtxos { &self.script_utxos } /// Which tx outputs have been spent by tx in the mempool. pub fn spent_by(&self) -> &MempoolSpentBy { &self.spent_by } /// Token data of txs in the mempool. pub fn tokens(&self) -> &MempoolTokens { &self.tokens } /// Tx history of token IDs in the mempool. pub fn token_id_history(&self) -> &MempoolTokenIdHistory { &self.token_id_history } /// Tx history of UTXOs by token ID in the mempool. pub fn token_id_utxos(&self) -> &MempoolTokenIdUtxos { &self.token_id_utxos } + + /// Tx history of LOKAD IDs in the mempool. + pub fn lokad_id_history(&self) -> &MempoolLokadIdHistory { + &self.lokad_id_history + } } diff --git a/chronik/chronik-http/src/handlers.rs b/chronik/chronik-http/src/handlers.rs index f85fd6d82..1174e920c 100644 --- a/chronik/chronik-http/src/handlers.rs +++ b/chronik/chronik-http/src/handlers.rs @@ -1,194 +1,246 @@ //! Module for Chronik handlers. use std::{collections::HashMap, fmt::Display, str::FromStr}; use abc_rust_error::{Report, Result}; use bitcoinsuite_slp::token_id::TokenId; use chronik_indexer::indexer::{ChronikIndexer, Node}; use chronik_proto::proto; use hyper::Uri; use thiserror::Error; -use crate::{error::ReportError, parse::parse_script_variant_hex}; +use crate::{ + error::ReportError, + parse::{parse_lokad_id_hex, parse_script_variant_hex}, +}; /// Errors for HTTP handlers. #[derive(Debug, Error, PartialEq)] pub enum ChronikHandlerError { /// Not found #[error("404: Not found: {0}")] RouteNotFound(Uri), /// Query parameter has an invalid value #[error("400: Invalid param {param_name}: {param_value}, {msg}")] InvalidParam { /// Name of the param param_name: String, /// Value of the param param_value: String, /// Human-readable error message. msg: String, }, } use self::ChronikHandlerError::*; fn get_param( params: &HashMap, param_name: &str, ) -> Result> where T::Err: Display, { let Some(param) = params.get(param_name) else { return Ok(None); }; Ok(Some(param.parse::().map_err(|err| InvalidParam { param_name: param_name.to_string(), param_value: param.to_string(), msg: err.to_string(), })?)) } /// Fallback route that returns a 404 response pub async fn handle_not_found(uri: Uri) -> Result<(), ReportError> { Err(Report::from(RouteNotFound(uri)).into()) } /// Return a page of the txs of a block. pub async fn handle_block_txs( hash_or_height: String, query_params: &HashMap, indexer: &ChronikIndexer, node: &Node, ) -> Result { let blocks = indexer.blocks(node); let page_num: u32 = get_param(query_params, "page")?.unwrap_or(0); let page_size: u32 = get_param(query_params, "page_size")?.unwrap_or(25); blocks.block_txs(hash_or_height, page_num as usize, page_size as usize) } /// Return a page of the confirmed txs of the given script. /// Scripts are identified by script_type and payload. pub async fn handle_script_confirmed_txs( script_type: &str, payload: &str, query_params: &HashMap, indexer: &ChronikIndexer, node: &Node, ) -> Result { let script_variant = parse_script_variant_hex(script_type, payload)?; let script_history = indexer.script_history(node)?; let page_num: u32 = get_param(query_params, "page")?.unwrap_or(0); let page_size: u32 = get_param(query_params, "page_size")?.unwrap_or(25); let script = script_variant.to_script(); script_history.confirmed_txs(&script, page_num as usize, page_size as usize) } /// Return a page of the tx history of the given script, in reverse /// chronological order, i.e. the latest transaction first and then going back /// in time. 
Scripts are identified by script_type and payload. pub async fn handle_script_history( script_type: &str, payload: &str, query_params: &HashMap, indexer: &ChronikIndexer, node: &Node, ) -> Result { let script_variant = parse_script_variant_hex(script_type, payload)?; let script_history = indexer.script_history(node)?; let page_num: u32 = get_param(query_params, "page")?.unwrap_or(0); let page_size: u32 = get_param(query_params, "page_size")?.unwrap_or(25); let script = script_variant.to_script(); script_history.rev_history(&script, page_num as usize, page_size as usize) } /// Return a page of the unconfirmed txs of the given script. /// Scripts are identified by script_type and payload. pub async fn handle_script_unconfirmed_txs( script_type: &str, payload: &str, indexer: &ChronikIndexer, node: &Node, ) -> Result { let script_variant = parse_script_variant_hex(script_type, payload)?; let script_history = indexer.script_history(node)?; let script = script_variant.to_script(); script_history.unconfirmed_txs(&script) } /// Return the UTXOs of the given script. /// Scripts are identified by script_type and payload. pub async fn handle_script_utxos( script_type: &str, payload: &str, indexer: &ChronikIndexer, ) -> Result { let script_variant = parse_script_variant_hex(script_type, payload)?; let script_utxos = indexer.script_utxos()?; let script = script_variant.to_script(); let utxos = script_utxos.utxos(&script)?; Ok(proto::ScriptUtxos { script: script.bytecode().to_vec(), utxos, }) } /// Return a page of the confirmed txs of the given token ID. pub async fn handle_token_id_confirmed_txs( token_id_hex: &str, query_params: &HashMap, indexer: &ChronikIndexer, node: &Node, ) -> Result { let token_id = token_id_hex.parse::()?; let token_id_history = indexer.token_id_history(node); let page_num: u32 = get_param(query_params, "page")?.unwrap_or(0); let page_size: u32 = get_param(query_params, "page_size")?.unwrap_or(25); token_id_history.confirmed_txs( token_id, page_num as usize, page_size as usize, ) } /// Return a page of the tx history of the given token ID, in reverse /// chronological order, i.e. the latest transaction first and then going back /// in time. pub async fn handle_token_id_history( token_id_hex: &str, query_params: &HashMap, indexer: &ChronikIndexer, node: &Node, ) -> Result { let token_id = token_id_hex.parse::()?; let token_id_history = indexer.token_id_history(node); let page_num: u32 = get_param(query_params, "page")?.unwrap_or(0); let page_size: u32 = get_param(query_params, "page_size")?.unwrap_or(25); token_id_history.rev_history( token_id, page_num as usize, page_size as usize, ) } /// Return a page of the unconfirmed txs of the given token ID. pub async fn handle_token_id_unconfirmed_txs( token_id_hex: &str, indexer: &ChronikIndexer, node: &Node, ) -> Result { let token_id = token_id_hex.parse::()?; let token_id_history = indexer.token_id_history(node); token_id_history.unconfirmed_txs(token_id) } /// Return the UTXOs of the given token ID. pub async fn handle_token_id_utxos( token_id_hex: &str, indexer: &ChronikIndexer, ) -> Result { let token_id = token_id_hex.parse::()?; let token_id_utxos = indexer.token_id_utxos(); let utxos = token_id_utxos.utxos(token_id)?; Ok(proto::Utxos { utxos }) } + +/// Return a page of the confirmed txs of the given LOKAD ID. 
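+/// The LOKAD ID is given as 4 bytes of hex (8 hex characters); `page` and
+/// `page_size` default to 0 and 25 when omitted.
+///
+/// Hypothetical request shape, assuming the `/lokad-id/...` routes wired up
+/// in `server.rs`:
+/// `GET /lokad-id/<lokad_id_hex>/confirmed-txs?page=0&page_size=25`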
+pub async fn handle_lokad_id_confirmed_txs( + lokad_id_hex: &str, + query_params: &HashMap, + indexer: &ChronikIndexer, + node: &Node, +) -> Result { + let lokad_id = parse_lokad_id_hex(lokad_id_hex)?; + let lokad_id_history = indexer.lokad_id_history(node); + let page_num: u32 = get_param(query_params, "page")?.unwrap_or(0); + let page_size: u32 = get_param(query_params, "page_size")?.unwrap_or(25); + lokad_id_history.confirmed_txs( + lokad_id, + page_num as usize, + page_size as usize, + ) +} + +/// Return a page of the tx history of the given LOKAD ID, in reverse +/// chronological order, i.e. the latest transaction first and then going back +/// in time. +pub async fn handle_lokad_id_history( + lokad_id_hex: &str, + query_params: &HashMap, + indexer: &ChronikIndexer, + node: &Node, +) -> Result { + let lokad_id = parse_lokad_id_hex(lokad_id_hex)?; + let lokad_id_history = indexer.lokad_id_history(node); + let page_num: u32 = get_param(query_params, "page")?.unwrap_or(0); + let page_size: u32 = get_param(query_params, "page_size")?.unwrap_or(25); + lokad_id_history.rev_history( + lokad_id, + page_num as usize, + page_size as usize, + ) +} + +/// Return a page of the unconfirmed txs of the given LOKAD ID. +pub async fn handle_lokad_id_unconfirmed_txs( + lokad_id_hex: &str, + indexer: &ChronikIndexer, + node: &Node, +) -> Result { + let lokad_id = parse_lokad_id_hex(lokad_id_hex)?; + let lokad_id_history = indexer.lokad_id_history(node); + lokad_id_history.unconfirmed_txs(lokad_id) +} diff --git a/chronik/chronik-http/src/parse.rs b/chronik/chronik-http/src/parse.rs index 433c70c87..ce1b72031 100644 --- a/chronik/chronik-http/src/parse.rs +++ b/chronik/chronik-http/src/parse.rs @@ -1,55 +1,72 @@ // Copyright (c) 2023 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. //! Module for parsing data coming from clients. use abc_rust_error::Result; use bitcoinsuite_core::{ error::DataError, script::{ScriptType, ScriptTypeError, ScriptVariant}, }; +use bitcoinsuite_slp::lokad_id::LokadId; use thiserror::Error; /// Errors indicating parsing failed. #[derive(Debug, Error, PartialEq)] pub enum ChronikParseError { /// Invalid hex #[error("400: Invalid hex: {0}")] InvalidHex(hex::FromHexError), /// script_type is invalid #[error("400: {0}")] InvalidScriptType(ScriptTypeError), /// Script payload invalid for script_type #[error("400: Invalid payload for {0:?}: {1}")] InvalidScriptPayload(ScriptType, DataError), + + /// LOKAD ID must be 4 bytes + #[error("400: Invalid LOKAD ID length: expected 4 but got {0}")] + InvalidLokadIdLength(usize), } use self::ChronikParseError::*; /// Parse the data as hex. pub fn parse_hex(payload: &str) -> Result> { Ok(hex::decode(payload).map_err(InvalidHex)?) } /// Parse the [`ScriptVariant`] with a hex payload (e.g. from URL). pub fn parse_script_variant_hex( script_type: &str, payload_hex: &str, ) -> Result { parse_script_variant(script_type, &parse_hex(payload_hex)?) } /// Parse the [`ScriptVariant`] with a byte payload (e.g. from protobuf). pub fn parse_script_variant( script_type: &str, payload: &[u8], ) -> Result { let script_type = script_type .parse::() .map_err(InvalidScriptType)?; Ok(ScriptVariant::from_type_and_payload(script_type, payload) .map_err(|err| InvalidScriptPayload(script_type, err))?) 
} + +/// Parse LOKAD ID hex string +pub fn parse_lokad_id_hex(lokad_id_hex: &str) -> Result { + let lokad_id = parse_hex(lokad_id_hex)?; + parse_lokad_id(&lokad_id) +} + +/// Parse LOKAD ID bytestring +pub fn parse_lokad_id(lokad_id: &[u8]) -> Result { + Ok(LokadId::try_from(lokad_id) + .map_err(|_| InvalidLokadIdLength(lokad_id.len()))?) +} diff --git a/chronik/chronik-http/src/server.rs b/chronik/chronik-http/src/server.rs index 351967760..b76907613 100644 --- a/chronik/chronik-http/src/server.rs +++ b/chronik/chronik-http/src/server.rs @@ -1,519 +1,583 @@ // Copyright (c) 2022 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. //! Module for [`ChronikServer`]. use std::collections::HashMap; use std::time::Duration; use std::{net::SocketAddr, sync::Arc}; use abc_rust_error::{Result, WrapErr}; use axum::{ extract::{Path, Query, WebSocketUpgrade}, response::IntoResponse, routing::{self, MethodFilter}, Extension, Router, }; use bitcoinsuite_core::tx::TxId; use chronik_bridge::ffi; use chronik_indexer::{ indexer::{ChronikIndexer, Node}, pause::PauseNotify, }; use chronik_proto::proto; use thiserror::Error; use tokio::sync::RwLock; use crate::{ error::ReportError, handlers, protobuf::Protobuf, ws::handle_subscribe_socket, }; /// Ref-counted indexer with read or write access pub type ChronikIndexerRef = Arc>; /// Ref-counted access to the bitcoind node pub type NodeRef = Arc; /// Ref-counted pause notifier for Chronik indexing pub type PauseNotifyRef = Arc; /// Settings to tune Chronik #[derive(Clone, Debug)] pub struct ChronikSettings { /// Duration between WebSocket pings initiated by Chronik. pub ws_ping_interval: Duration, } /// Params defining what and where to serve for [`ChronikServer`]. #[derive(Clone, Debug)] pub struct ChronikServerParams { /// Host address (port + IP) where to serve Chronik at. pub hosts: Vec, /// Indexer to read data from pub indexer: ChronikIndexerRef, /// Access to the bitcoind node pub node: NodeRef, /// Handle for pausing/resuming indexing any updates from the node pub pause_notify: PauseNotifyRef, /// Settings to tune Chronik pub settings: ChronikSettings, } /// Chronik HTTP server, holding all the data/handles required to serve an /// instance. #[derive(Debug)] pub struct ChronikServer { tcp_listeners: Vec, indexer: ChronikIndexerRef, node: NodeRef, pause_notify: PauseNotifyRef, settings: ChronikSettings, } /// Errors for [`ChronikServer`]. 
#[derive(Debug, Eq, Error, PartialEq)] pub enum ChronikServerError { /// Binding to host address failed #[error("Chronik failed binding to {0}: {1}")] FailedBindingAddress(SocketAddr, String), /// Serving Chronik failed #[error("Chronik failed serving: {0}")] ServingFailed(String), /// Query is neither a hex hash nor an integer string #[error("400: Not a hash or height: {0}")] NotHashOrHeight(String), /// Query is not a txid #[error("400: Not a txid: {0}")] NotTxId(String), /// Block not found in DB #[error("404: Block not found: {0}")] BlockNotFound(String), } use self::ChronikServerError::*; impl ChronikServer { /// Binds the Chronik server on the given hosts pub fn setup(params: ChronikServerParams) -> Result { let tcp_listeners = params .hosts .into_iter() .map(|host| -> Result<_> { let tcp = std::net::TcpListener::bind(host).map_err(|err| { FailedBindingAddress(host, err.to_string()) })?; // Important: We need to set non-blocking ourselves tcp.set_nonblocking(true)?; Ok(tokio::net::TcpListener::from_std(tcp)?) }) .collect::>>()?; Ok(ChronikServer { tcp_listeners, indexer: params.indexer, node: params.node, pause_notify: params.pause_notify, settings: params.settings, }) } /// Serve a Chronik HTTP endpoint with the given parameters. pub async fn serve(self) -> Result<()> { let app = Self::make_router( self.indexer, self.node, self.pause_notify, self.settings, ); let servers = self .tcp_listeners .into_iter() .zip(std::iter::repeat(app)) .map(|(tcp_listener, app)| { Box::pin(async move { axum::serve(tcp_listener, app.into_make_service()) .await .map_err(|err| ServingFailed(err.to_string())) }) }); let (result, _, _) = futures::future::select_all(servers).await; result?; Ok(()) } fn make_router( indexer: ChronikIndexerRef, node: NodeRef, pause_notify: PauseNotifyRef, settings: ChronikSettings, ) -> Router { Router::new() .route("/blockchain-info", routing::get(handle_blockchain_info)) .route("/block/:hash_or_height", routing::get(handle_block)) .route("/block-txs/:hash_or_height", routing::get(handle_block_txs)) .route("/blocks/:start/:end", routing::get(handle_block_range)) .route("/chronik-info", routing::get(handle_chronik_info)) .route("/tx/:txid", routing::get(handle_tx)) .route("/token/:txid", routing::get(handle_token_info)) .route( "/validate-tx", routing::post(handle_validate_tx) .on(MethodFilter::OPTIONS, handle_post_options), ) .route( "/broadcast-tx", routing::post(handle_broadcast_tx) .on(MethodFilter::OPTIONS, handle_post_options), ) .route( "/broadcast-txs", routing::post(handle_broadcast_txs) .on(MethodFilter::OPTIONS, handle_post_options), ) .route("/raw-tx/:txid", routing::get(handle_raw_tx)) .route( "/script/:type/:payload/confirmed-txs", routing::get(handle_script_confirmed_txs), ) .route( "/script/:type/:payload/history", routing::get(handle_script_history), ) .route( "/script/:type/:payload/unconfirmed-txs", routing::get(handle_script_unconfirmed_txs), ) .route( "/script/:type/:payload/utxos", routing::get(handle_script_utxos), ) .route( "/token-id/:token_id/confirmed-txs", routing::get(handle_token_id_confirmed_txs), ) .route( "/token-id/:token_id/history", routing::get(handle_token_id_history), ) .route( "/token-id/:token_id/unconfirmed-txs", routing::get(handle_token_id_unconfirmed_txs), ) .route( "/token-id/:token_id/utxos", routing::get(handle_token_id_utxos), ) + .route( + "/lokad-id/:lokad_id/confirmed-txs", + routing::get(handle_lokad_id_confirmed_txs), + ) + .route( + "/lokad-id/:lokad_id/history", + routing::get(handle_lokad_id_history), + ) + .route( + 
"/lokad-id/:lokad_id/unconfirmed-txs", + routing::get(handle_lokad_id_unconfirmed_txs), + ) .route("/ws", routing::get(handle_ws)) .route("/pause", routing::get(handle_pause)) .route("/resume", routing::get(handle_resume)) .fallback(handlers::handle_not_found) .layer(Extension(indexer)) .layer(Extension(node)) .layer(Extension(pause_notify)) .layer(Extension(settings)) } } async fn handle_blockchain_info( Extension(indexer): Extension, Extension(node): Extension, ) -> Result, ReportError> { let indexer = indexer.read().await; let blocks = indexer.blocks(&node); Ok(Protobuf(blocks.blockchain_info()?)) } async fn handle_chronik_info( ) -> Result, ReportError> { let this_chronik_version: String = env!("CARGO_PKG_VERSION").to_string(); let chronik_info = proto::ChronikInfo { version: this_chronik_version, }; Ok(Protobuf(chronik_info)) } async fn handle_block_range( Path((start_height, end_height)): Path<(i32, i32)>, Extension(indexer): Extension, Extension(node): Extension, ) -> Result, ReportError> { let indexer = indexer.read().await; let blocks = indexer.blocks(&node); Ok(Protobuf(blocks.by_range(start_height, end_height)?)) } async fn handle_block( Path(hash_or_height): Path, Extension(indexer): Extension, Extension(node): Extension, ) -> Result, ReportError> { let indexer = indexer.read().await; let blocks = indexer.blocks(&node); Ok(Protobuf(blocks.by_hash_or_height(hash_or_height)?)) } async fn handle_block_txs( Path(hash_or_height): Path, Query(query_params): Query>, Extension(indexer): Extension, Extension(node): Extension, ) -> Result, ReportError> { let indexer = indexer.read().await; Ok(Protobuf( handlers::handle_block_txs( hash_or_height, &query_params, &indexer, &node, ) .await?, )) } async fn handle_tx( Path(txid): Path, Extension(indexer): Extension, Extension(node): Extension, ) -> Result, ReportError> { let indexer = indexer.read().await; let txid = txid.parse::().wrap_err(NotTxId(txid))?; Ok(Protobuf(indexer.txs(&node).tx_by_id(txid)?)) } async fn handle_token_info( Path(txid): Path, Extension(indexer): Extension, Extension(node): Extension, ) -> Result, ReportError> { let indexer = indexer.read().await; let txid = txid.parse::().wrap_err(NotTxId(txid))?; Ok(Protobuf(indexer.txs(&node).token_info(&txid)?)) } async fn handle_broadcast_tx( Extension(indexer): Extension, Extension(node): Extension, Protobuf(request): Protobuf, ) -> Result, ReportError> { let indexer = indexer.read().await; let txids_result = indexer .broadcast(node.as_ref()) .broadcast_txs(&[request.raw_tx.into()], request.skip_token_checks); // Drop indexer before syncing otherwise we get a deadlock drop(indexer); // Block for indexer being synced before returning so the user can query // the broadcast txs right away ffi::sync_with_validation_interface_queue(); let txids = txids_result?; Ok(Protobuf(proto::BroadcastTxResponse { txid: txids[0].to_vec(), })) } async fn handle_broadcast_txs( Extension(indexer): Extension, Extension(node): Extension, Protobuf(request): Protobuf, ) -> Result, ReportError> { let indexer = indexer.read().await; let txids_result = indexer.broadcast(node.as_ref()).broadcast_txs( &request .raw_txs .into_iter() .map(Into::into) .collect::>(), request.skip_token_checks, ); // Drop indexer before syncing otherwise we get a deadlock drop(indexer); // Block for indexer being synced before returning so the user can query // the broadcast txs right away ffi::sync_with_validation_interface_queue(); let txids = txids_result?; Ok(Protobuf(proto::BroadcastTxsResponse { txids: 
txids.into_iter().map(|txid| txid.to_vec()).collect(), })) } async fn handle_validate_tx( Extension(indexer): Extension, Extension(node): Extension, Protobuf(raw_tx): Protobuf, ) -> Result, ReportError> { let indexer = indexer.read().await; Ok(Protobuf( indexer .broadcast(node.as_ref()) .validate_tx(raw_tx.raw_tx)?, )) } async fn handle_raw_tx( Path(txid): Path, Extension(indexer): Extension, Extension(node): Extension, ) -> Result, ReportError> { let indexer = indexer.read().await; let txid = txid.parse::().wrap_err(NotTxId(txid))?; Ok(Protobuf(indexer.txs(&node).raw_tx_by_id(&txid)?)) } async fn handle_script_confirmed_txs( Path((script_type, payload)): Path<(String, String)>, Query(query_params): Query>, Extension(indexer): Extension, Extension(node): Extension, ) -> Result, ReportError> { let indexer = indexer.read().await; Ok(Protobuf( handlers::handle_script_confirmed_txs( &script_type, &payload, &query_params, &indexer, &node, ) .await?, )) } async fn handle_script_history( Path((script_type, payload)): Path<(String, String)>, Query(query_params): Query>, Extension(indexer): Extension, Extension(node): Extension, ) -> Result, ReportError> { let indexer = indexer.read().await; Ok(Protobuf( handlers::handle_script_history( &script_type, &payload, &query_params, &indexer, &node, ) .await?, )) } async fn handle_script_unconfirmed_txs( Path((script_type, payload)): Path<(String, String)>, Extension(indexer): Extension, Extension(node): Extension, ) -> Result, ReportError> { let indexer = indexer.read().await; Ok(Protobuf( handlers::handle_script_unconfirmed_txs( &script_type, &payload, &indexer, &node, ) .await?, )) } async fn handle_script_utxos( Path((script_type, payload)): Path<(String, String)>, Extension(indexer): Extension, ) -> Result, ReportError> { let indexer = indexer.read().await; Ok(Protobuf( handlers::handle_script_utxos(&script_type, &payload, &indexer).await?, )) } async fn handle_token_id_confirmed_txs( Path(token_id_hex): Path, Query(query_params): Query>, Extension(indexer): Extension, Extension(node): Extension, ) -> Result, ReportError> { let indexer = indexer.read().await; Ok(Protobuf( handlers::handle_token_id_confirmed_txs( &token_id_hex, &query_params, &indexer, &node, ) .await?, )) } async fn handle_token_id_history( Path(token_id_hex): Path, Query(query_params): Query>, Extension(indexer): Extension, Extension(node): Extension, ) -> Result, ReportError> { let indexer = indexer.read().await; Ok(Protobuf( handlers::handle_token_id_history( &token_id_hex, &query_params, &indexer, &node, ) .await?, )) } async fn handle_token_id_unconfirmed_txs( Path(token_id_hex): Path, Extension(indexer): Extension, Extension(node): Extension, ) -> Result, ReportError> { let indexer = indexer.read().await; Ok(Protobuf( handlers::handle_token_id_unconfirmed_txs( &token_id_hex, &indexer, &node, ) .await?, )) } async fn handle_token_id_utxos( Path(token_id_hex): Path, Extension(indexer): Extension, ) -> Result, ReportError> { let indexer = indexer.read().await; Ok(Protobuf( handlers::handle_token_id_utxos(&token_id_hex, &indexer).await?, )) } +async fn handle_lokad_id_confirmed_txs( + Path(lokad_id_hex): Path, + Query(query_params): Query>, + Extension(indexer): Extension, + Extension(node): Extension, +) -> Result, ReportError> { + let indexer = indexer.read().await; + Ok(Protobuf( + handlers::handle_lokad_id_confirmed_txs( + &lokad_id_hex, + &query_params, + &indexer, + &node, + ) + .await?, + )) +} + +async fn handle_lokad_id_history( + Path(lokad_id_hex): Path, + 
Query(query_params): Query>, + Extension(indexer): Extension, + Extension(node): Extension, +) -> Result, ReportError> { + let indexer = indexer.read().await; + Ok(Protobuf( + handlers::handle_lokad_id_history( + &lokad_id_hex, + &query_params, + &indexer, + &node, + ) + .await?, + )) +} + +async fn handle_lokad_id_unconfirmed_txs( + Path(lokad_id_hex): Path, + Extension(indexer): Extension, + Extension(node): Extension, +) -> Result, ReportError> { + let indexer = indexer.read().await; + Ok(Protobuf( + handlers::handle_lokad_id_unconfirmed_txs( + &lokad_id_hex, + &indexer, + &node, + ) + .await?, + )) +} + async fn handle_pause( Extension(pause_notify): Extension, ) -> Result, ReportError> { pause_notify.pause()?; Ok(Protobuf(proto::Empty {})) } async fn handle_resume( Extension(pause_notify): Extension, ) -> Result, ReportError> { pause_notify.resume()?; Ok(Protobuf(proto::Empty {})) } async fn handle_ws( ws: WebSocketUpgrade, Extension(indexer): Extension, Extension(settings): Extension, ) -> impl IntoResponse { ws.on_upgrade(|ws| handle_subscribe_socket(ws, indexer, settings)) } async fn handle_post_options( ) -> Result, ReportError> { axum::http::Response::builder() .header("Allow", "OPTIONS, HEAD, POST") .body(axum::body::Body::empty()) .map_err(|err| ReportError(err.into())) } diff --git a/chronik/chronik-http/src/ws.rs b/chronik/chronik-http/src/ws.rs index 011db7ad6..29edb0e2c 100644 --- a/chronik/chronik-http/src/ws.rs +++ b/chronik/chronik-http/src/ws.rs @@ -1,325 +1,367 @@ // Copyright (c) 2023 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. //! Module for [`handle_subscribe_socket`]. use std::{collections::HashMap, time::Duration}; use abc_rust_error::Result; use axum::extract::ws::{self, WebSocket}; use bitcoinsuite_core::script::ScriptVariant; -use bitcoinsuite_slp::token_id::TokenId; +use bitcoinsuite_slp::{lokad_id::LokadId, token_id::TokenId}; use chronik_indexer::{ subs::{BlockMsg, BlockMsgType}, subs_group::{TxMsg, TxMsgType}, }; use chronik_proto::proto; use chronik_util::log_chronik; use futures::future::select_all; use prost::Message; use thiserror::Error; use tokio::sync::broadcast; use crate::{ error::report_status_error, - parse::parse_script_variant, + parse::{parse_lokad_id, parse_script_variant}, server::{ChronikIndexerRef, ChronikSettings}, }; /// Errors for [`ChronikServer`]. #[derive(Debug, Eq, Error, PartialEq)] pub enum ChronikWsError { /// Unexpected [`ws::Message`] type. #[error("Unexpected message type {0}")] UnexpectedMessageType(&'static str), /// [`proto::WsSub`] must have the `sub_type` field set. #[error("400: Missing sub_type in WsSub message")] MissingSubType, } use self::ChronikWsError::*; enum WsAction { Close, Sub(WsSub), Message(ws::Message), Nothing, } struct WsSub { is_unsub: bool, sub_type: WsSubType, } enum WsSubType { Blocks, Script(ScriptVariant), TokenId(TokenId), + LokadId(LokadId), } type SubRecvBlocks = Option>; type SubRecvScripts = HashMap>; type SubRecvTokenId = HashMap>; +type SubRecvLokadId = HashMap>; struct SubRecv { blocks: SubRecvBlocks, scripts: SubRecvScripts, token_ids: SubRecvTokenId, + lokad_ids: SubRecvLokadId, ws_ping_interval: Duration, } impl SubRecv { async fn recv_action(&mut self) -> Result { tokio::select! 
{ biased; action = Self::recv_blocks(&mut self.blocks) => action, action = Self::recv_scripts(&mut self.scripts) => action, action = Self::recv_token_ids(&mut self.token_ids) => action, + action = Self::recv_lokad_ids(&mut self.lokad_ids) => action, action = Self::schedule_ping(self.ws_ping_interval) => action, } } async fn recv_blocks(blocks: &mut SubRecvBlocks) -> Result { match blocks { Some(blocks) => sub_block_msg_action(blocks.recv().await), None => futures::future::pending().await, } } #[allow(clippy::mutable_key_type)] async fn recv_scripts(scripts: &mut SubRecvScripts) -> Result { if scripts.is_empty() { futures::future::pending().await } else { let script_receivers = select_all( scripts .values_mut() .map(|receiver| Box::pin(receiver.recv())), ); let (tx_msg, _, _) = script_receivers.await; sub_tx_msg_action(tx_msg) } } async fn recv_token_ids( token_ids: &mut SubRecvTokenId, ) -> Result { if token_ids.is_empty() { futures::future::pending().await } else { let token_ids_receivers = select_all( token_ids .values_mut() .map(|receiver| Box::pin(receiver.recv())), ); let (tx_msg, _, _) = token_ids_receivers.await; sub_tx_msg_action(tx_msg) } } + async fn recv_lokad_ids( + lokad_ids: &mut SubRecvLokadId, + ) -> Result { + if lokad_ids.is_empty() { + futures::future::pending().await + } else { + let lokad_ids_receivers = select_all( + lokad_ids + .values_mut() + .map(|receiver| Box::pin(receiver.recv())), + ); + let (tx_msg, _, _) = lokad_ids_receivers.await; + sub_tx_msg_action(tx_msg) + } + } + async fn schedule_ping(ws_ping_interval: Duration) -> Result { tokio::time::sleep(ws_ping_interval).await; let ping_payload = b"Bitcoin ABC Chronik Indexer".to_vec(); Ok(WsAction::Message(ws::Message::Ping(ping_payload))) } async fn handle_sub(&mut self, sub: WsSub, indexer: &ChronikIndexerRef) { let indexer = indexer.read().await; let mut subs = indexer.subs().write().await; match sub.sub_type { WsSubType::Blocks => { if sub.is_unsub { log_chronik!("WS unsubscribe from blocks\n"); self.blocks = None; } else { log_chronik!("WS subscribe to blocks\n"); // Silently ignore multiple subs to blocks if self.blocks.is_none() { self.blocks = Some(subs.sub_to_block_msgs()); } } } WsSubType::Script(script_variant) => { let script = script_variant.to_script(); if sub.is_unsub { log_chronik!("WS unsubscribe from {}\n", script_variant); std::mem::drop(self.scripts.remove(&script_variant)); subs.subs_script_mut().unsubscribe_from_member(&&script) } else { log_chronik!("WS subscribe to {}\n", script_variant); let recv = subs.subs_script_mut().subscribe_to_member(&&script); self.scripts.insert(script_variant, recv); } } WsSubType::TokenId(token_id) => { if sub.is_unsub { log_chronik!("WS unsubscribe from token ID {token_id}\n"); std::mem::drop(self.token_ids.remove(&token_id)); subs.subs_token_id_mut().unsubscribe_from_member(&token_id) } else { log_chronik!("WS subscribe to token ID {token_id}\n"); let recv = subs.subs_token_id_mut().subscribe_to_member(&token_id); self.token_ids.insert(token_id, recv); } } + WsSubType::LokadId(lokad_id) => { + if sub.is_unsub { + log_chronik!( + "WS unsubscribe from LOKAD ID {}\n", + hex::encode(lokad_id) + ); + std::mem::drop(self.lokad_ids.remove(&lokad_id)); + subs.subs_lokad_id_mut().unsubscribe_from_member(&lokad_id) + } else { + log_chronik!( + "WS subscribe to LOKAD ID {}\n", + hex::encode(lokad_id) + ); + let recv = + subs.subs_lokad_id_mut().subscribe_to_member(&lokad_id); + self.lokad_ids.insert(lokad_id, recv); + } + } } } async fn cleanup(self, indexer: 
&ChronikIndexerRef) { if self.scripts.is_empty() { return; } let indexer = indexer.read().await; let mut subs = indexer.subs().write().await; for (script_variant, receiver) in self.scripts { std::mem::drop(receiver); subs.subs_script_mut() .unsubscribe_from_member(&&script_variant.to_script()); } } } fn sub_client_msg_action( client_msg: Option>, ) -> Result { let client_msg = match client_msg { Some(client_msg) => client_msg, None => return Ok(WsAction::Close), }; match client_msg { Ok(ws::Message::Binary(data)) => { use proto::ws_sub::SubType; let sub = proto::WsSub::decode(data.as_slice())?; Ok(WsAction::Sub(WsSub { is_unsub: sub.is_unsub, sub_type: match sub.sub_type { None => return Err(MissingSubType.into()), Some(SubType::Blocks(_)) => WsSubType::Blocks, Some(SubType::Script(script)) => { WsSubType::Script(parse_script_variant( &script.script_type, &script.payload, )?) } Some(SubType::TokenId(token_id)) => WsSubType::TokenId( token_id.token_id.parse::()?, ), + Some(SubType::LokadId(lokad_id)) => { + WsSubType::LokadId(parse_lokad_id(&lokad_id.lokad_id)?) + } }, })) } Ok(ws::Message::Text(_)) => Err(UnexpectedMessageType("Text").into()), Ok(ws::Message::Ping(ping)) => { Ok(WsAction::Message(ws::Message::Pong(ping))) } Ok(ws::Message::Pong(_pong)) => Ok(WsAction::Nothing), Ok(ws::Message::Close(_)) | Err(_) => Ok(WsAction::Close), } } fn sub_block_msg_action( block_msg: Result, ) -> Result { use proto::{ws_msg::MsgType, BlockMsgType::*}; let Ok(block_msg) = block_msg else { return Ok(WsAction::Nothing); }; let block_msg_type = match block_msg.msg_type { BlockMsgType::Connected => BlkConnected, BlockMsgType::Disconnected => BlkDisconnected, BlockMsgType::Finalized => BlkFinalized, }; let msg_type = Some(MsgType::Block(proto::MsgBlock { msg_type: block_msg_type as _, block_hash: block_msg.hash.to_vec(), block_height: block_msg.height, })); let msg_proto = proto::WsMsg { msg_type }; let msg = ws::Message::Binary(msg_proto.encode_to_vec()); Ok(WsAction::Message(msg)) } fn sub_tx_msg_action( tx_msg: Result, ) -> Result { use proto::{ws_msg::MsgType, TxMsgType::*}; let tx_msg = match tx_msg { Ok(tx_msg) => tx_msg, Err(_) => return Ok(WsAction::Nothing), }; let tx_msg_type = match tx_msg.msg_type { TxMsgType::AddedToMempool => TxAddedToMempool, TxMsgType::RemovedFromMempool => TxRemovedFromMempool, TxMsgType::Confirmed => TxConfirmed, TxMsgType::Finalized => TxFinalized, }; let msg_type = Some(MsgType::Tx(proto::MsgTx { msg_type: tx_msg_type as _, txid: tx_msg.txid.to_vec(), })); let msg_proto = proto::WsMsg { msg_type }; let msg = ws::Message::Binary(msg_proto.encode_to_vec()); Ok(WsAction::Message(msg)) } /// Future for a WS connection, which will run indefinitely until the WS will be /// closed. pub async fn handle_subscribe_socket( mut socket: WebSocket, indexer: ChronikIndexerRef, settings: ChronikSettings, ) { let mut recv = SubRecv { blocks: Default::default(), scripts: Default::default(), token_ids: Default::default(), + lokad_ids: Default::default(), ws_ping_interval: settings.ws_ping_interval, }; let mut last_msg = None; loop { let sub_action = tokio::select! 
{ client_msg = socket.recv() => sub_client_msg_action(client_msg), action = recv.recv_action() => action, }; let subscribe_action = match sub_action { // Deduplicate identical consecutive msgs Ok(WsAction::Message(ws::Message::Binary(msg))) => { if last_msg.as_ref() == Some(&msg) { WsAction::Nothing } else { last_msg = Some(msg.clone()); WsAction::Message(ws::Message::Binary(msg)) } } Ok(subscribe_action) => subscribe_action, // Turn Err into Message and reply Err(report) => { let (_, error_proto) = report_status_error(report); let msg_proto = proto::WsMsg { msg_type: Some(proto::ws_msg::MsgType::Error(error_proto)), }; WsAction::Message(ws::Message::Binary( msg_proto.encode_to_vec(), )) } }; match subscribe_action { WsAction::Close => { recv.cleanup(&indexer).await; return; } WsAction::Sub(sub) => recv.handle_sub(sub, &indexer).await, WsAction::Message(msg) => match socket.send(msg).await { Ok(()) => {} Err(_) => return, }, WsAction::Nothing => {} } } } diff --git a/chronik/chronik-indexer/src/indexer.rs b/chronik/chronik-indexer/src/indexer.rs index d7c01c4f0..d2be05c36 100644 --- a/chronik/chronik-indexer/src/indexer.rs +++ b/chronik/chronik-indexer/src/indexer.rs @@ -1,1018 +1,1180 @@ // Copyright (c) 2022 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. //! Module containing [`ChronikIndexer`] to index blocks and txs. use std::{io::Write, path::PathBuf}; use abc_rust_error::{Result, WrapErr}; use bitcoinsuite_core::{ block::BlockHash, tx::{Tx, TxId}, }; use chronik_bridge::{ffi, util::expect_unique_ptr}; use chronik_db::{ db::{Db, WriteBatch}, groups::{ - ScriptGroup, ScriptHistoryWriter, ScriptUtxoWriter, TokenIdGroup, - TokenIdGroupAux, TokenIdHistoryWriter, TokenIdUtxoWriter, + LokadIdGroup, LokadIdHistoryWriter, ScriptGroup, ScriptHistoryWriter, + ScriptUtxoWriter, TokenIdGroup, TokenIdGroupAux, TokenIdHistoryWriter, + TokenIdUtxoWriter, }, index_tx::{ prepare_indexed_txs_cached, PrepareUpdateMode, TxNumCacheSettings, }, io::{ merge, token::TokenWriter, BlockHeight, BlockReader, BlockStatsWriter, BlockTxs, BlockWriter, DbBlock, GroupHistoryMemData, GroupUtxoMemData, MetadataReader, MetadataWriter, SchemaVersion, SpentByWriter, TxEntry, TxReader, TxWriter, }, mem::{MemData, MemDataConf, Mempool, MempoolTx}, }; use chronik_util::{log, log_chronik}; use thiserror::Error; use tokio::sync::RwLock; use crate::{ avalanche::Avalanche, indexer::ChronikIndexerError::*, query::{ QueryBlocks, QueryBroadcast, QueryGroupHistory, QueryGroupUtxos, QueryTxs, UtxoProtobufOutput, UtxoProtobufValue, }, subs::{BlockMsg, BlockMsgType, Subs}, subs_group::TxMsgType, }; const CURRENT_INDEXER_VERSION: SchemaVersion = 11; const LAST_UPGRADABLE_VERSION: SchemaVersion = 10; /// Params for setting up a [`ChronikIndexer`] instance. #[derive(Clone)] pub struct ChronikIndexerParams { /// Folder where the node stores its data, net-dependent. pub datadir_net: PathBuf, /// Whether to clear the DB before opening the DB, e.g. when reindexing. pub wipe_db: bool, /// Whether Chronik should index SLP/ALP token txs. pub enable_token_index: bool, + /// Whether Chronik should index txs by LOKAD ID. + /// This will be overridden to `true` if the DB is empty and + /// `enable_lokad_id_index_specified` is false. + pub enable_lokad_id_index: bool, /// Whether to output Chronik performance statistics into a perf/ folder pub enable_perf_stats: bool, /// Settings for tuning TxNumCache. 
pub tx_num_cache: TxNumCacheSettings, } /// Struct for indexing blocks and txs. Maintains db handles and mempool. #[derive(Debug)] pub struct ChronikIndexer { db: Db, mem_data: MemData, mempool: Mempool, script_group: ScriptGroup, avalanche: Avalanche, subs: RwLock, perf_path: Option, is_token_index_enabled: bool, + is_lokad_id_index_enabled: bool, + /// Whether the LOKAD ID index needs to be reindexed, will be set to + /// `false` after it caught up with the rest of Chronik. + needs_lokad_id_reindex: bool, } /// Access to the bitcoind node. #[derive(Debug)] pub struct Node { /// FFI bridge to the node. pub bridge: cxx::UniquePtr, } /// Block to be indexed by Chronik. #[derive(Clone, Debug, Default, Eq, PartialEq)] pub struct ChronikBlock { /// Data about the block (w/o txs) pub db_block: DbBlock, /// Txs in the block, with locations of where they are stored on disk. pub block_txs: BlockTxs, /// Block size in bytes. pub size: u64, /// Txs in the block, with inputs/outputs so we can group them. pub txs: Vec, } /// Errors for [`BlockWriter`] and [`BlockReader`]. #[derive(Debug, Eq, Error, PartialEq)] pub enum ChronikIndexerError { /// Failed creating the folder for the indexes #[error("Failed creating path {0}")] CreateDirFailed(PathBuf), /// Cannot rewind blocks that bitcoind doesn't have #[error( "Cannot rewind Chronik, it contains block {0} that the node doesn't \ have. You may need to use -reindex/-chronikreindex, or delete \ indexes/chronik and restart" )] CannotRewindChronik(BlockHash), /// Lower block doesn't exist but higher block does #[error( "Inconsistent DB: Block {missing} doesn't exist, but {exists} does" )] BlocksBelowMissing { /// Lower height that is missing missing: BlockHeight, /// Higher height that exists exists: BlockHeight, }, /// Corrupted schema version #[error( "Corrupted schema version in the Chronik database, consider running \ -reindex/-chronikreindex" )] CorruptedSchemaVersion, /// Missing schema version for non-empty database #[error( "Missing schema version in non-empty Chronik database, consider \ running -reindex/-chronikreindex" )] MissingSchemaVersion, /// This Chronik instance is outdated #[error( "Chronik outdated: Chronik has version {}, but the database has \ version {0}. Upgrade your node to the appropriate version.", CURRENT_INDEXER_VERSION )] ChronikOutdated(SchemaVersion), /// Database is outdated #[error( "DB outdated: Chronik has version {CURRENT_INDEXER_VERSION}, but the \ database has version {0}. The last upgradable version is \ {LAST_UPGRADABLE_VERSION}. -reindex/-chronikreindex to reindex the \ database to the new version." )] DatabaseOutdated(SchemaVersion), /// Cannot enable token index on a DB that previously had it disabled #[error( "Cannot enable -chroniktokenindex on a DB that previously had it \ disabled. Provide -reindex/-chronikreindex to reindex the database \ with token data, or specify -chroniktokenindex=0 to disable the \ token index again." )] CannotEnableTokenIndex, } impl ChronikIndexer { /// Setup the indexer with the given parameters, e.g. open the DB etc. 
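///
/// A minimal setup sketch (doctest-ignored; the path and flags are
/// placeholders, `TxNumCacheSettings::default()` is assumed to exist, and
/// only the fields visible in this diff are filled in):
/// ```ignore
/// let indexer = ChronikIndexer::setup(ChronikIndexerParams {
///     datadir_net: "/path/to/datadir/regtest".into(),
///     wipe_db: false,
///     enable_token_index: true,
///     enable_lokad_id_index: true,
///     enable_perf_stats: false,
///     tx_num_cache: TxNumCacheSettings::default(),
/// })?;
/// ```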
pub fn setup(params: ChronikIndexerParams) -> Result<Self> { let indexes_path = params.datadir_net.join("indexes"); let perf_path = params.datadir_net.join("perf"); if !indexes_path.exists() { std::fs::create_dir(&indexes_path) .wrap_err_with(|| CreateDirFailed(indexes_path.clone()))?; } if params.enable_perf_stats && !perf_path.exists() { std::fs::create_dir(&perf_path) .wrap_err_with(|| CreateDirFailed(perf_path.clone()))?; } let db_path = indexes_path.join("chronik"); if params.wipe_db { log!("Wiping Chronik at {}\n", db_path.to_string_lossy()); Db::destroy(&db_path)?; } log_chronik!("Opening Chronik at {}\n", db_path.to_string_lossy()); let db = Db::open(&db_path)?; + let is_db_empty = db.is_db_empty()?; let schema_version = verify_schema_version(&db)?; verify_enable_token_index(&db, params.enable_token_index)?; + let needs_lokad_id_reindex = verify_lokad_id_index( + &db, + is_db_empty, + params.enable_lokad_id_index, + )?; upgrade_db_if_needed(&db, schema_version, params.enable_token_index)?; - let mempool = Mempool::new(ScriptGroup, params.enable_token_index); + let mempool = Mempool::new( + ScriptGroup, + params.enable_token_index, + params.enable_lokad_id_index, + ); Ok(ChronikIndexer { db, mempool, mem_data: MemData::new(MemDataConf { tx_num_cache: params.tx_num_cache, }), script_group: ScriptGroup, avalanche: Avalanche::default(), subs: RwLock::new(Subs::new(ScriptGroup)), perf_path: params.enable_perf_stats.then_some(perf_path), is_token_index_enabled: params.enable_token_index, + is_lokad_id_index_enabled: params.enable_lokad_id_index, + needs_lokad_id_reindex, }) } /// Resync Chronik index to the node pub fn resync_indexer( &mut self, bridge: &ffi::ChronikBridge, ) -> Result<()> { let block_reader = BlockReader::new(&self.db)?; let indexer_tip = block_reader.tip()?; let Ok(node_tip_index) = bridge.get_chain_tip() else { if let Some(indexer_tip) = &indexer_tip { return Err( CannotRewindChronik(indexer_tip.hash.clone()).into() ); } return Ok(()); }; let node_tip_info = ffi::get_block_info(node_tip_index); let node_height = node_tip_info.height; let node_tip_hash = BlockHash::from(node_tip_info.hash); let start_height = match indexer_tip { Some(tip) if tip.hash != node_tip_hash => { let indexer_tip_hash = tip.hash.clone(); let indexer_height = tip.height; log!( "Node and Chronik diverged, node is on block \ {node_tip_hash} at height {node_height}, and Chronik is \ on block {indexer_tip_hash} at height {indexer_height}.\n" ); let indexer_tip_index = bridge .lookup_block_index(tip.hash.to_bytes()) .map_err(|_| CannotRewindChronik(tip.hash.clone()))?; self.rewind_indexer(bridge, indexer_tip_index, &tip)?
} Some(tip) => tip.height, None => { log!( "Chronik database empty, syncing to block {node_tip_hash} \ at height {node_height}.\n" ); -1 } }; + if self.needs_lokad_id_reindex { + self.reindex_lokad_id_index(bridge, node_tip_index, start_height)?; + self.needs_lokad_id_reindex = false; + } let tip_height = node_tip_info.height; for height in start_height + 1..=tip_height { if bridge.shutdown_requested() { log!("Stopped re-sync adding blocks\n"); return Ok(()); } let block_index = ffi::get_block_ancestor(node_tip_index, height)?; let block = self.load_chronik_block(bridge, block_index)?; let hash = block.db_block.hash.clone(); self.handle_block_connected(block)?; log_chronik!( "Added block {hash}, height {height}/{tip_height} to Chronik\n" ); if height % 100 == 0 { log!( "Synced Chronik up to block {hash} at height \ {height}/{tip_height}\n" ); } } log!( "Chronik completed re-syncing with the node, both are now at \ block {node_tip_hash} at height {node_height}.\n" ); if let Some(perf_path) = &self.perf_path { let mut resync_stats = std::fs::File::create(perf_path.join("resync_stats.txt"))?; write!(&mut resync_stats, "{:#.3?}", self.mem_data.stats())?; } Ok(()) } fn rewind_indexer( &mut self, bridge: &ffi::ChronikBridge, indexer_tip_index: &ffi::CBlockIndex, indexer_db_tip: &DbBlock, ) -> Result { let indexer_height = indexer_db_tip.height; let fork_block_index = bridge .find_fork(indexer_tip_index) .map_err(|_| CannotRewindChronik(indexer_db_tip.hash.clone()))?; let fork_info = ffi::get_block_info(fork_block_index); let fork_block_hash = BlockHash::from(fork_info.hash); let fork_height = fork_info.height; let revert_height = fork_height + 1; log!( "The last common block is {fork_block_hash} at height \ {fork_height}.\n" ); log!("Reverting Chronik blocks {revert_height} to {indexer_height}.\n"); for height in (revert_height..indexer_height).rev() { if bridge.shutdown_requested() { log!("Stopped re-sync rewinding blocks\n"); // return MAX here so we don't add any blocks return Ok(BlockHeight::MAX); } let db_block = BlockReader::new(&self.db)? .by_height(height)? .ok_or(BlocksBelowMissing { missing: height, exists: indexer_height, })?; let block_index = bridge .lookup_block_index(db_block.hash.to_bytes()) .map_err(|_| CannotRewindChronik(db_block.hash))?; let block = self.load_chronik_block(bridge, block_index)?; self.handle_block_disconnected(block)?; } Ok(fork_info.height) } + fn reindex_lokad_id_index( + &mut self, + bridge: &ffi::ChronikBridge, + node_tip_index: &ffi::CBlockIndex, + end_height: BlockHeight, + ) -> Result<()> { + let lokad_id_writer = + LokadIdHistoryWriter::new(&self.db, LokadIdGroup)?; + let tx_reader = TxReader::new(&self.db)?; + let metadata_writer = MetadataWriter::new(&self.db)?; + + // First, wipe the LOKAD ID index + let mut batch = WriteBatch::default(); + lokad_id_writer.wipe(&mut batch); + self.db.write_batch(batch)?; + + for height in 0..=end_height { + if bridge.shutdown_requested() { + log!("Stopped reindexing LOKAD ID index\n"); + return Ok(()); + } + let block_index = ffi::get_block_ancestor(node_tip_index, height)?; + let block = self.load_chronik_block(bridge, block_index)?; + let first_tx_num = tx_reader + .first_tx_num_by_block(block.db_block.height)? 
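+            // Assumption: reindexing only walks blocks Chronik has already
+            // indexed (0..=end_height), so the tx index has an entry for
+            // this block and the unwrap below should not fail.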
+ .unwrap(); + let index_txs = prepare_indexed_txs_cached( + &self.db, + first_tx_num, + &block.txs, + &mut self.mem_data.tx_num_cache, + PrepareUpdateMode::Add, + )?; + let hash = block.db_block.hash.clone(); + let mut batch = WriteBatch::default(); + lokad_id_writer.insert( + &mut batch, + &index_txs, + &(), + &mut GroupHistoryMemData::default(), + )?; + self.db.write_batch(batch)?; + if height % 100 == 0 { + log!( + "Synced Chronik LOKAD ID index up to block {hash} at \ + height {height}/{end_height} (-chroniklokadidindex=0 to \ + disable)\n" + ); + } + } + + let mut batch = WriteBatch::default(); + metadata_writer.update_is_lokad_id_index_enabled(&mut batch, true)?; + self.db.write_batch(batch)?; + + Ok(()) + } + /// Add transaction to the indexer's mempool. pub fn handle_tx_added_to_mempool( &mut self, mempool_tx: MempoolTx, ) -> Result<()> { let result = self.mempool.insert(&self.db, mempool_tx)?; self.subs.get_mut().handle_tx_event( &result.mempool_tx.tx, TxMsgType::AddedToMempool, &result.token_id_aux, ); Ok(()) } /// Remove tx from the indexer's mempool, e.g. by a conflicting tx, expiry /// etc. This is not called when the transaction has been mined (and thus /// also removed from the mempool). pub fn handle_tx_removed_from_mempool(&mut self, txid: TxId) -> Result<()> { let result = self.mempool.remove(txid)?; self.subs.get_mut().handle_tx_event( &result.mempool_tx.tx, TxMsgType::RemovedFromMempool, &result.token_id_aux, ); Ok(()) } /// Add the block to the index. pub fn handle_block_connected( &mut self, block: ChronikBlock, ) -> Result<()> { let height = block.db_block.height; let mut batch = WriteBatch::default(); let block_writer = BlockWriter::new(&self.db)?; let tx_writer = TxWriter::new(&self.db)?; let block_stats_writer = BlockStatsWriter::new(&self.db)?; let script_history_writer = ScriptHistoryWriter::new(&self.db, self.script_group.clone())?; let script_utxo_writer = ScriptUtxoWriter::new(&self.db, self.script_group.clone())?; let spent_by_writer = SpentByWriter::new(&self.db)?; let token_writer = TokenWriter::new(&self.db)?; let token_id_history_writer = TokenIdHistoryWriter::new(&self.db, TokenIdGroup)?; let token_id_utxo_writer = TokenIdUtxoWriter::new(&self.db, TokenIdGroup)?; + let lokad_id_history_writer = + LokadIdHistoryWriter::new(&self.db, LokadIdGroup)?; block_writer.insert(&mut batch, &block.db_block)?; let first_tx_num = tx_writer.insert( &mut batch, &block.block_txs, &mut self.mem_data.txs, )?; let index_txs = prepare_indexed_txs_cached( &self.db, first_tx_num, &block.txs, &mut self.mem_data.tx_num_cache, PrepareUpdateMode::Add, )?; block_stats_writer .insert(&mut batch, height, block.size, &index_txs)?; script_history_writer.insert( &mut batch, &index_txs, &(), &mut self.mem_data.script_history, )?; script_utxo_writer.insert( &mut batch, &index_txs, &(), &mut self.mem_data.script_utxos, )?; spent_by_writer.insert( &mut batch, &index_txs, &mut self.mem_data.spent_by, )?; + if self.is_lokad_id_index_enabled { + lokad_id_history_writer.insert( + &mut batch, + &index_txs, + &(), + &mut GroupHistoryMemData::default(), + )?; + } let token_id_aux; if self.is_token_index_enabled { let processed_token_batch = token_writer.insert(&mut batch, &index_txs)?; token_id_aux = TokenIdGroupAux::from_batch(&index_txs, &processed_token_batch); token_id_history_writer.insert( &mut batch, &index_txs, &token_id_aux, &mut GroupHistoryMemData::default(), )?; token_id_utxo_writer.insert( &mut batch, &index_txs, &token_id_aux, &mut GroupUtxoMemData::default(), )?; } else { 
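            // Token index disabled: fall back to an empty aux mapping so the
            // tx message events broadcast below still receive a value.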
token_id_aux = TokenIdGroupAux::default(); } self.db.write_batch(batch)?; for tx in &block.block_txs.txs { self.mempool.remove_mined(&tx.txid)?; } merge::check_for_errors()?; let subs = self.subs.get_mut(); subs.broadcast_block_msg(BlockMsg { msg_type: BlockMsgType::Connected, hash: block.db_block.hash, height: block.db_block.height, }); subs.handle_block_tx_events( &block.txs, TxMsgType::Confirmed, &token_id_aux, ); Ok(()) } /// Remove the block from the index. pub fn handle_block_disconnected( &mut self, block: ChronikBlock, ) -> Result<()> { let mut batch = WriteBatch::default(); let block_writer = BlockWriter::new(&self.db)?; let tx_writer = TxWriter::new(&self.db)?; let block_stats_writer = BlockStatsWriter::new(&self.db)?; let script_history_writer = ScriptHistoryWriter::new(&self.db, self.script_group.clone())?; let script_utxo_writer = ScriptUtxoWriter::new(&self.db, self.script_group.clone())?; let spent_by_writer = SpentByWriter::new(&self.db)?; let token_writer = TokenWriter::new(&self.db)?; let token_id_history_writer = TokenIdHistoryWriter::new(&self.db, TokenIdGroup)?; let token_id_utxo_writer = TokenIdUtxoWriter::new(&self.db, TokenIdGroup)?; + let lokad_id_history_writer = + LokadIdHistoryWriter::new(&self.db, LokadIdGroup)?; block_writer.delete(&mut batch, &block.db_block)?; let first_tx_num = tx_writer.delete( &mut batch, &block.block_txs, &mut self.mem_data.txs, )?; let index_txs = prepare_indexed_txs_cached( &self.db, first_tx_num, &block.txs, &mut self.mem_data.tx_num_cache, PrepareUpdateMode::Delete, )?; block_stats_writer.delete(&mut batch, block.db_block.height); script_history_writer.delete( &mut batch, &index_txs, &(), &mut self.mem_data.script_history, )?; script_utxo_writer.delete( &mut batch, &index_txs, &(), &mut self.mem_data.script_utxos, )?; spent_by_writer.delete( &mut batch, &index_txs, &mut self.mem_data.spent_by, )?; + if self.is_lokad_id_index_enabled { + // Skip delete if rewinding indexer; will be wiped later anyway + if !self.needs_lokad_id_reindex { + lokad_id_history_writer.delete( + &mut batch, + &index_txs, + &(), + &mut GroupHistoryMemData::default(), + )?; + } + } if self.is_token_index_enabled { let token_id_aux = TokenIdGroupAux::from_db(&index_txs, &self.db)?; token_id_history_writer.delete( &mut batch, &index_txs, &token_id_aux, &mut GroupHistoryMemData::default(), )?; token_id_utxo_writer.delete( &mut batch, &index_txs, &token_id_aux, &mut GroupUtxoMemData::default(), )?; token_writer.delete(&mut batch, &index_txs)?; } self.avalanche.disconnect_block(block.db_block.height)?; self.db.write_batch(batch)?; let subs = self.subs.get_mut(); subs.broadcast_block_msg(BlockMsg { msg_type: BlockMsgType::Disconnected, hash: block.db_block.hash, height: block.db_block.height, }); Ok(()) } /// Block finalized with Avalanche. pub fn handle_block_finalized( &mut self, block: ChronikBlock, ) -> Result<()> { self.avalanche.finalize_block(block.db_block.height)?; let subs = self.subs.get_mut(); subs.broadcast_block_msg(BlockMsg { msg_type: BlockMsgType::Finalized, hash: block.db_block.hash, height: block.db_block.height, }); let tx_reader = TxReader::new(&self.db)?; let first_tx_num = tx_reader .first_tx_num_by_block(block.db_block.height)? .unwrap(); let index_txs = prepare_indexed_txs_cached( &self.db, first_tx_num, &block.txs, &mut self.mem_data.tx_num_cache, PrepareUpdateMode::Read, )?; let token_id_aux = if self.is_token_index_enabled { TokenIdGroupAux::from_db(&index_txs, &self.db)? 
} else { TokenIdGroupAux::default() }; subs.handle_block_tx_events( &block.txs, TxMsgType::Finalized, &token_id_aux, ); Ok(()) } /// Return [`QueryBroadcast`] to broadcast tx to the network. pub fn broadcast<'a>(&'a self, node: &'a Node) -> QueryBroadcast<'a> { QueryBroadcast { db: &self.db, avalanche: &self.avalanche, mempool: &self.mempool, node, is_token_index_enabled: self.is_token_index_enabled, } } /// Return [`QueryBlocks`] to read blocks from the DB. pub fn blocks<'a>(&'a self, node: &'a Node) -> QueryBlocks<'a> { QueryBlocks { db: &self.db, avalanche: &self.avalanche, mempool: &self.mempool, node, is_token_index_enabled: self.is_token_index_enabled, } } /// Return [`QueryTxs`] to return txs from mempool/DB. pub fn txs<'a>(&'a self, node: &'a Node) -> QueryTxs<'a> { QueryTxs { db: &self.db, avalanche: &self.avalanche, mempool: &self.mempool, node, is_token_index_enabled: self.is_token_index_enabled, } } /// Return [`QueryGroupHistory`] for scripts to query the tx history of /// scripts. pub fn script_history<'a>( &'a self, node: &'a Node, ) -> Result> { Ok(QueryGroupHistory { db: &self.db, avalanche: &self.avalanche, mempool: &self.mempool, mempool_history: self.mempool.script_history(), group: self.script_group.clone(), node, is_token_index_enabled: self.is_token_index_enabled, }) } /// Return [`QueryGroupUtxos`] for scripts to query the utxos of scripts. pub fn script_utxos( &self, ) -> Result> { Ok(QueryGroupUtxos { db: &self.db, avalanche: &self.avalanche, mempool: &self.mempool, mempool_utxos: self.mempool.script_utxos(), group: self.script_group.clone(), utxo_mapper: UtxoProtobufValue, is_token_index_enabled: self.is_token_index_enabled, }) } /// Return [`QueryGroupHistory`] for token IDs to query the tx history of /// token IDs. pub fn token_id_history<'a>( &'a self, node: &'a Node, ) -> QueryGroupHistory<'a, TokenIdGroup> { QueryGroupHistory { db: &self.db, avalanche: &self.avalanche, mempool: &self.mempool, mempool_history: self.mempool.token_id_history(), group: TokenIdGroup, node, is_token_index_enabled: self.is_token_index_enabled, } } /// Return [`QueryGroupUtxos`] for token IDs to query the utxos of token IDs pub fn token_id_utxos( &self, ) -> QueryGroupUtxos<'_, TokenIdGroup, UtxoProtobufOutput> { QueryGroupUtxos { db: &self.db, avalanche: &self.avalanche, mempool: &self.mempool, mempool_utxos: self.mempool.token_id_utxos(), group: TokenIdGroup, utxo_mapper: UtxoProtobufOutput, is_token_index_enabled: self.is_token_index_enabled, } } + /// Return [`QueryGroupHistory`] for LOKAD IDs to query the tx history of + /// LOKAD IDs. + pub fn lokad_id_history<'a>( + &'a self, + node: &'a Node, + ) -> QueryGroupHistory<'a, LokadIdGroup> { + QueryGroupHistory { + db: &self.db, + avalanche: &self.avalanche, + mempool: &self.mempool, + mempool_history: self.mempool.lokad_id_history(), + group: LokadIdGroup, + node, + is_token_index_enabled: self.is_token_index_enabled, + } + } + /// Subscribers, behind read/write lock pub fn subs(&self) -> &RwLock { &self.subs } /// Build a ChronikBlock from a ffi::Block. 
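The query accessors above, including the new `lokad_id_history`, all follow one pattern: hand out a short-lived value that merely borrows the indexer's DB, mempool, and group for the duration of a request; nothing is cloned or locked at construction time. A much-reduced, hypothetical sketch of that shape (`Db`, `Mempool`, `Indexer`, and `QueryGroupHistory` here are placeholders, not the real chronik types):

```rust
// Placeholder types; the real ones live in chronik_db / chronik_indexer.
struct Db;
struct Mempool;

struct Indexer {
    db: Db,
    mempool: Mempool,
}

/// Borrowed view used to answer one request; cheap to construct.
struct QueryGroupHistory<'a> {
    db: &'a Db,
    mempool: &'a Mempool,
}

impl Indexer {
    fn lokad_id_history(&self) -> QueryGroupHistory<'_> {
        QueryGroupHistory {
            db: &self.db,
            mempool: &self.mempool,
        }
    }
}

fn main() {
    let indexer = Indexer { db: Db, mempool: Mempool };
    let _query = indexer.lokad_id_history();
}
```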
pub fn make_chronik_block(&self, block: ffi::Block) -> ChronikBlock { let db_block = DbBlock { hash: BlockHash::from(block.hash), prev_hash: BlockHash::from(block.prev_hash), height: block.height, n_bits: block.n_bits, timestamp: block.timestamp, file_num: block.file_num, data_pos: block.data_pos, }; let block_txs = BlockTxs { block_height: block.height, txs: block .txs .iter() .map(|tx| { let txid = TxId::from(tx.tx.txid); TxEntry { txid, data_pos: tx.data_pos, undo_pos: tx.undo_pos, time_first_seen: match self.mempool.tx(&txid) { Some(tx) => tx.time_first_seen, None => 0, }, is_coinbase: tx.undo_pos == 0, } }) .collect(), }; let txs = block .txs .into_iter() .map(|block_tx| Tx::from(block_tx.tx)) .collect::>(); ChronikBlock { db_block, block_txs, size: block.size, txs, } } /// Load a ChronikBlock from the node given the CBlockIndex. pub fn load_chronik_block( &self, bridge: &ffi::ChronikBridge, block_index: &ffi::CBlockIndex, ) -> Result { let ffi_block = bridge.load_block(block_index)?; let ffi_block = expect_unique_ptr("load_block", &ffi_block); let ffi_block_undo = bridge.load_block_undo(block_index)?; let ffi_block_undo = expect_unique_ptr("load_block_undo", &ffi_block_undo); let block = ffi::bridge_block(ffi_block, ffi_block_undo, block_index)?; Ok(self.make_chronik_block(block)) } } fn verify_schema_version(db: &Db) -> Result { let metadata_reader = MetadataReader::new(db)?; let metadata_writer = MetadataWriter::new(db)?; let is_empty = db.is_db_empty()?; let schema_version = match metadata_reader .schema_version() .wrap_err(CorruptedSchemaVersion)? { Some(schema_version) => { assert!(!is_empty, "Empty DB can't have a schema version"); if schema_version > CURRENT_INDEXER_VERSION { return Err(ChronikOutdated(schema_version).into()); } if schema_version < LAST_UPGRADABLE_VERSION { return Err(DatabaseOutdated(schema_version).into()); } log!( "Chronik has version {CURRENT_INDEXER_VERSION}, DB has \ version {schema_version}\n" ); schema_version } None => { if !is_empty { return Err(MissingSchemaVersion.into()); } let mut batch = WriteBatch::default(); metadata_writer .update_schema_version(&mut batch, CURRENT_INDEXER_VERSION)?; db.write_batch(batch)?; log!( "Chronik has version {CURRENT_INDEXER_VERSION}, initialized \ DB with that version\n" ); CURRENT_INDEXER_VERSION } }; Ok(schema_version) } fn verify_enable_token_index(db: &Db, enable_token_index: bool) -> Result<()> { let metadata_reader = MetadataReader::new(db)?; let metadata_writer = MetadataWriter::new(db)?; let token_writer = TokenWriter::new(db)?; let is_empty = db.is_db_empty()?; let is_token_index_enabled = metadata_reader.is_token_index_enabled()?; let mut batch = WriteBatch::default(); if !is_empty { // Cannot enable token index if not already previously enabled if enable_token_index && !is_token_index_enabled { return Err(CannotEnableTokenIndex.into()); } // Wipe token index if previously enabled and now disabled if !enable_token_index && is_token_index_enabled { log!( "Warning: Wiping existing token index, since \ -chroniktokenindex=0\n" ); log!("You will need to -reindex/-chronikreindex to restore\n"); token_writer.wipe(&mut batch); } } metadata_writer .update_is_token_index_enabled(&mut batch, enable_token_index)?; db.write_batch(batch)?; Ok(()) } fn upgrade_db_if_needed( db: &Db, schema_version: u64, enable_token_index: bool, ) -> Result<()> { // DB has version 10, upgrade to 11 if schema_version == 10 { upgrade_10_to_11(db, enable_token_index)?; } Ok(()) } fn upgrade_10_to_11(db: &Db, enable_token_index: bool) -> 
Result<()> { log!("Upgrading Chronik DB from version 10 to 11...\n"); let script_utxo_writer = ScriptUtxoWriter::new(db, ScriptGroup)?; script_utxo_writer.upgrade_10_to_11()?; if enable_token_index { let token_id_utxo_writer = TokenIdUtxoWriter::new(db, TokenIdGroup)?; token_id_utxo_writer.upgrade_10_to_11()?; } let mut batch = WriteBatch::default(); let metadata_writer = MetadataWriter::new(db)?; metadata_writer.update_schema_version(&mut batch, 11)?; db.write_batch(batch)?; log!("Successfully upgraded Chronik DB from version 10 to 11.\n"); Ok(()) } +/// Verify user config and DB are in sync. Returns whether the LOKAD ID index +/// needs to be reindexed. +fn verify_lokad_id_index( + db: &Db, + is_db_empty: bool, + enable: bool, +) -> Result { + let metadata_reader = MetadataReader::new(db)?; + let metadata_writer = MetadataWriter::new(db)?; + let lokad_id_writer = LokadIdHistoryWriter::new(db, LokadIdGroup)?; + let is_enabled_db = metadata_reader + .is_lokad_id_index_enabled()? + .unwrap_or(false); + let mut batch = WriteBatch::default(); + if !is_db_empty { + if enable && !is_enabled_db { + // DB non-empty without LOKAD ID index, but index enabled -> reindex + return Ok(true); + } + if !enable && is_enabled_db { + // Otherwise, the LOKAD ID index has been enabled and now + // specified to be disabled, so we wipe the index. + log!( + "Warning: Wiping existing LOKAD ID index, since \ + -chroniklokadidindex=0\n" + ); + log!( + "You will need to specify -chroniklokadidindex=1 to restore\n" + ); + lokad_id_writer.wipe(&mut batch); + } + } + metadata_writer.update_is_lokad_id_index_enabled(&mut batch, enable)?; + db.write_batch(batch)?; + Ok(false) +} + impl Node { /// If `result` is [`Err`], logs and aborts the node. pub fn ok_or_abort(&self, func_name: &str, result: Result) { if let Err(report) = result { log_chronik!("{report:?}\n"); self.bridge.abort_node( &format!("ERROR Chronik in {func_name}"), &format!("{report:#}"), ); } } } impl std::fmt::Debug for ChronikIndexerParams { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ChronikIndexerParams") .field("datadir_net", &self.datadir_net) .field("wipe_db", &self.wipe_db) .field("fn_compress_script", &"..") .finish() } } #[cfg(test)] mod tests { use abc_rust_error::Result; use bitcoinsuite_core::block::BlockHash; use chronik_db::{ db::{Db, WriteBatch, CF_META}, io::{BlockReader, BlockTxs, DbBlock, MetadataReader, MetadataWriter}, }; use pretty_assertions::assert_eq; use crate::indexer::{ ChronikBlock, ChronikIndexer, ChronikIndexerError, ChronikIndexerParams, CURRENT_INDEXER_VERSION, }; #[test] fn test_indexer() -> Result<()> { let tempdir = tempdir::TempDir::new("chronik-indexer--indexer")?; let datadir_net = tempdir.path().join("regtest"); let params = ChronikIndexerParams { datadir_net: datadir_net.clone(), wipe_db: false, enable_token_index: false, + enable_lokad_id_index: false, enable_perf_stats: false, tx_num_cache: Default::default(), }; // regtest folder doesn't exist yet -> error assert_eq!( ChronikIndexer::setup(params.clone()) .unwrap_err() .downcast::()?, ChronikIndexerError::CreateDirFailed(datadir_net.join("indexes")), ); // create regtest folder, setup will work now std::fs::create_dir(&datadir_net)?; let mut indexer = ChronikIndexer::setup(params.clone())?; // indexes and indexes/chronik folder now exist assert!(datadir_net.join("indexes").exists()); assert!(datadir_net.join("indexes").join("chronik").exists()); // DB is empty assert_eq!(BlockReader::new(&indexer.db)?.by_height(0)?, None); 
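The `verify_lokad_id_index` logic above reduces to a small decision table over three booleans: is the DB empty, is the index enabled by config, and was it enabled in the DB. The following is an illustrative restatement of that logic as a pure function, not the actual code:

```rust
#[derive(Debug, PartialEq)]
enum LokadIdIndexAction {
    /// Index enabled but missing from a non-empty DB: replay blocks from the
    /// DB to catch the index up.
    Reindex,
    /// Index disabled but present in the DB: wipe its entries.
    Wipe,
    /// Config and DB already agree (or the DB is empty): nothing to do.
    Nothing,
}

fn lokad_id_index_action(
    is_db_empty: bool,
    enable: bool,
    enabled_in_db: bool,
) -> LokadIdIndexAction {
    match (is_db_empty, enable, enabled_in_db) {
        (false, true, false) => LokadIdIndexAction::Reindex,
        (false, false, true) => LokadIdIndexAction::Wipe,
        _ => LokadIdIndexAction::Nothing,
    }
}

fn main() {
    assert_eq!(
        lokad_id_index_action(false, true, false),
        LokadIdIndexAction::Reindex
    );
    assert_eq!(
        lokad_id_index_action(false, false, true),
        LokadIdIndexAction::Wipe
    );
    assert_eq!(
        lokad_id_index_action(true, true, false),
        LokadIdIndexAction::Nothing
    );
}
```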
let block = ChronikBlock { db_block: DbBlock { hash: BlockHash::from([4; 32]), prev_hash: BlockHash::from([0; 32]), height: 0, n_bits: 0x1deadbef, timestamp: 1234567890, file_num: 0, data_pos: 1337, }, block_txs: BlockTxs { block_height: 0, txs: vec![], }, size: 285, txs: vec![], }; // Add block indexer.handle_block_connected(block.clone())?; assert_eq!( BlockReader::new(&indexer.db)?.by_height(0)?, Some(block.db_block.clone()) ); // Remove block again indexer.handle_block_disconnected(block.clone())?; assert_eq!(BlockReader::new(&indexer.db)?.by_height(0)?, None); // Add block then wipe, block not there indexer.handle_block_connected(block)?; std::mem::drop(indexer); let indexer = ChronikIndexer::setup(ChronikIndexerParams { wipe_db: true, ..params })?; assert_eq!(BlockReader::new(&indexer.db)?.by_height(0)?, None); Ok(()) } #[test] fn test_schema_version() -> Result<()> { let dir = tempdir::TempDir::new("chronik-indexer--schema_version")?; let chronik_path = dir.path().join("indexes").join("chronik"); let params = ChronikIndexerParams { datadir_net: dir.path().to_path_buf(), wipe_db: false, enable_token_index: false, + enable_lokad_id_index: false, enable_perf_stats: false, tx_num_cache: Default::default(), }; // Setting up DB first time sets the schema version ChronikIndexer::setup(params.clone())?; { let db = Db::open(&chronik_path)?; assert_eq!( MetadataReader::new(&db)?.schema_version()?, Some(CURRENT_INDEXER_VERSION) ); } // Opening DB again works fine ChronikIndexer::setup(params.clone())?; // Override DB schema version to 0 { let db = Db::open(&chronik_path)?; let mut batch = WriteBatch::default(); MetadataWriter::new(&db)?.update_schema_version(&mut batch, 0)?; db.write_batch(batch)?; } // -> DB too old assert_eq!( ChronikIndexer::setup(params.clone()) .unwrap_err() .downcast::()?, ChronikIndexerError::DatabaseOutdated(0), ); // Override DB schema version to CURRENT_INDEXER_VERSION + 1 { let db = Db::open(&chronik_path)?; let mut batch = WriteBatch::default(); MetadataWriter::new(&db)?.update_schema_version( &mut batch, CURRENT_INDEXER_VERSION + 1, )?; db.write_batch(batch)?; } // -> Chronik too old assert_eq!( ChronikIndexer::setup(params.clone()) .unwrap_err() .downcast::()?, ChronikIndexerError::ChronikOutdated(CURRENT_INDEXER_VERSION + 1), ); // Corrupt schema version { let db = Db::open(&chronik_path)?; let cf_meta = db.cf(CF_META)?; let mut batch = WriteBatch::default(); batch.put_cf(cf_meta, b"SCHEMA_VERSION", [0xff]); db.write_batch(batch)?; } assert_eq!( ChronikIndexer::setup(params.clone()) .unwrap_err() .downcast::()?, ChronikIndexerError::CorruptedSchemaVersion, ); // New db path, but has existing data let new_dir = dir.path().join("new"); let new_chronik_path = new_dir.join("indexes").join("chronik"); std::fs::create_dir_all(&new_chronik_path)?; let new_params = ChronikIndexerParams { datadir_net: new_dir, wipe_db: false, ..params }; { // new db with obscure field in meta let db = Db::open(&new_chronik_path)?; let mut batch = WriteBatch::default(); batch.put_cf(db.cf(CF_META)?, b"FOO", b"BAR"); db.write_batch(batch)?; } // Error: non-empty DB without schema version assert_eq!( ChronikIndexer::setup(new_params.clone()) .unwrap_err() .downcast::()?, ChronikIndexerError::MissingSchemaVersion, ); // with wipe it works ChronikIndexer::setup(ChronikIndexerParams { wipe_db: true, ..new_params })?; Ok(()) } } diff --git a/chronik/chronik-indexer/src/subs.rs b/chronik/chronik-indexer/src/subs.rs index cc5337a9d..9bbe48bb1 100644 --- a/chronik/chronik-indexer/src/subs.rs +++ 
b/chronik/chronik-indexer/src/subs.rs @@ -1,111 +1,123 @@ // Copyright (c) 2023 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. //! Module containing [`Subs`]. use bitcoinsuite_core::{block::BlockHash, tx::Tx}; use chronik_db::{ - groups::{ScriptGroup, TokenIdGroup, TokenIdGroupAux}, + groups::{LokadIdGroup, ScriptGroup, TokenIdGroup, TokenIdGroupAux}, io::BlockHeight, }; use chronik_util::log; use tokio::sync::broadcast; use crate::subs_group::{SubsGroup, TxMsgType}; /// Block update message. #[derive(Debug, Clone, PartialEq, Eq)] pub struct BlockMsg { /// What happened with the block. pub msg_type: BlockMsgType, /// Hash of the block which we got an update for. pub hash: BlockHash, /// Height of the block which we got an update for. pub height: BlockHeight, } /// Type of message for the block. #[derive(Debug, Clone, PartialEq, Eq)] pub enum BlockMsgType { /// Block connected to the blockchain Connected, /// Block disconnected from the blockchain Disconnected, /// Block has been finalized by Avalanche Finalized, } const BLOCK_CHANNEL_CAPACITY: usize = 16; /// Struct for managing subscriptions to e.g. block updates. #[derive(Debug)] pub struct Subs { subs_block: broadcast::Sender, subs_script: SubsGroup, subs_token_id: SubsGroup, + subs_lokad_id: SubsGroup, } impl Subs { /// Create a new [`Subs`]. pub fn new(script_group: ScriptGroup) -> Self { Subs { subs_block: broadcast::channel(BLOCK_CHANNEL_CAPACITY).0, subs_script: SubsGroup::new(script_group), subs_token_id: SubsGroup::new(TokenIdGroup), + subs_lokad_id: SubsGroup::new(LokadIdGroup), } } /// Add a subscriber to block messages. pub fn sub_to_block_msgs(&self) -> broadcast::Receiver { self.subs_block.subscribe() } /// Mutable reference to the script subscribers. pub fn subs_script_mut(&mut self) -> &mut SubsGroup { &mut self.subs_script } /// Mutable reference to the token ID subscribers. pub fn subs_token_id_mut(&mut self) -> &mut SubsGroup { &mut self.subs_token_id } + /// Mutable reference to the LOKAD ID subscribers. + pub fn subs_lokad_id_mut(&mut self) -> &mut SubsGroup { + &mut self.subs_lokad_id + } + /// Send out updates to subscribers for this tx and msg_type. pub fn handle_tx_event( &mut self, tx: &Tx, msg_type: TxMsgType, token_id_aux: &TokenIdGroupAux, ) { self.subs_script.handle_tx_event(tx, &(), msg_type); self.subs_token_id .handle_tx_event(tx, token_id_aux, msg_type); + self.subs_lokad_id.handle_tx_event(tx, &(), msg_type); } /// Send out msg_type updates for the txs of the block to subscribers.
pub fn handle_block_tx_events( &mut self, txs: &[Tx], msg_type: TxMsgType, token_id_aux: &TokenIdGroupAux, ) { - if self.subs_script.is_empty() && self.subs_token_id.is_empty() { + if self.subs_script.is_empty() + && self.subs_token_id.is_empty() + && self.subs_lokad_id.is_empty() + { // Short-circuit if no subscriptions return; } for tx in txs { self.subs_script.handle_tx_event(tx, &(), msg_type); self.subs_token_id .handle_tx_event(tx, token_id_aux, msg_type); + self.subs_lokad_id.handle_tx_event(tx, &(), msg_type); } } pub(crate) fn broadcast_block_msg(&self, msg: BlockMsg) { if self.subs_block.receiver_count() > 0 { if let Err(err) = self.subs_block.send(msg) { log!("Unexpected send error: {}\n", err); } } } } diff --git a/chronik/chronik-lib/src/bridge.rs b/chronik/chronik-lib/src/bridge.rs index 4ab6694e7..8b2bd48b7 100644 --- a/chronik/chronik-lib/src/bridge.rs +++ b/chronik/chronik-lib/src/bridge.rs @@ -1,294 +1,295 @@ // Copyright (c) 2022 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. //! Rust side of the bridge; these structs and functions are exposed to C++. use std::{ net::{AddrParseError, IpAddr, SocketAddr}, sync::Arc, time::Duration, }; use abc_rust_error::Result; use bitcoinsuite_core::tx::{Tx, TxId}; use chronik_bridge::{ffi::init_error, util::expect_unique_ptr}; use chronik_db::{index_tx::TxNumCacheSettings, mem::MempoolTx}; use chronik_http::server::{ ChronikServer, ChronikServerParams, ChronikSettings, }; use chronik_indexer::{ indexer::{ChronikIndexer, ChronikIndexerParams, Node}, pause::Pause, }; use chronik_plugin::context::PluginContext; use chronik_util::{log, log_chronik, mount_loggers, Loggers}; use thiserror::Error; use tokio::sync::RwLock; use crate::ffi::{self, StartChronikValidationInterface}; /// Errors for [`Chronik`] and [`setup_chronik`]. #[derive(Debug, Eq, Error, PartialEq)] pub enum ChronikError { /// Chronik host address failed to parse #[error("Invalid Chronik host address {0:?}: {1}")] InvalidChronikHost(String, AddrParseError), } use self::ChronikError::*; /// Setup the Chronik bridge. Returns a ChronikIndexer object. 
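Block messages in `Subs` fan out over a tokio broadcast channel, and `broadcast_block_msg` only sends when there is at least one receiver, mirroring the short-circuit in `handle_block_tx_events`. A self-contained miniature of that mechanism; `BlockMsg` below is a simplified stand-in, and this assumes tokio with the `sync`, `macros`, and `rt` features:

```rust
use tokio::sync::broadcast;

#[derive(Clone, Debug)]
struct BlockMsg {
    height: i32,
}

#[tokio::main]
async fn main() {
    // Capacity mirrors BLOCK_CHANNEL_CAPACITY; the initial receiver is dropped.
    let (tx, _) = broadcast::channel::<BlockMsg>(16);

    // A WebSocket handler would hold one of these receivers per connection.
    let mut rx = tx.subscribe();

    // Mirror of `broadcast_block_msg`: only send if someone is listening.
    if tx.receiver_count() > 0 {
        let _ = tx.send(BlockMsg { height: 840_000 });
    }

    let msg = rx.recv().await.unwrap();
    println!("block connected at height {}", msg.height);
}
```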
pub fn setup_chronik( params: ffi::SetupParams, config: &ffi::Config, node: &ffi::NodeContext, ) -> bool { match try_setup_chronik(params, config, node) { Ok(()) => true, Err(report) => { log_chronik!("{report:?}\n"); init_error(&report.to_string()) } } } fn try_setup_chronik( params: ffi::SetupParams, config: &ffi::Config, node_context: &ffi::NodeContext, ) -> Result<()> { abc_rust_error::install(); mount_loggers(Loggers { log: chronik_bridge::ffi::log_print, log_chronik: chronik_bridge::ffi::log_print_chronik, }); let hosts = params .hosts .into_iter() .map(|host| parse_socket_addr(host, params.default_port)) .collect::>>()?; PluginContext::setup()?; log!("Starting Chronik bound to {:?}\n", hosts); let bridge = chronik_bridge::ffi::make_bridge(config, node_context); let bridge_ref = expect_unique_ptr("make_bridge", &bridge); let (pause, pause_notify) = Pause::new_pair(params.is_pause_allowed); let mut indexer = ChronikIndexer::setup(ChronikIndexerParams { datadir_net: params.datadir_net.into(), wipe_db: params.wipe_db, enable_token_index: params.enable_token_index, + enable_lokad_id_index: params.enable_lokad_id_index, enable_perf_stats: params.enable_perf_stats, tx_num_cache: TxNumCacheSettings { bucket_size: params.tx_num_cache.bucket_size, num_buckets: params.tx_num_cache.num_buckets, }, })?; indexer.resync_indexer(bridge_ref)?; if bridge.shutdown_requested() { // Don't setup Chronik if the user requested shutdown during resync return Ok(()); } let indexer = Arc::new(RwLock::new(indexer)); let node = Arc::new(Node { bridge }); let runtime = tokio::runtime::Builder::new_multi_thread() .enable_all() .build()?; let server = runtime.block_on({ let indexer = Arc::clone(&indexer); let node = Arc::clone(&node); async move { // try_bind requires a Runtime ChronikServer::setup(ChronikServerParams { hosts, indexer, node, pause_notify: Arc::new(pause_notify), settings: ChronikSettings { ws_ping_interval: Duration::from_secs( params.ws_ping_interval_secs, ), }, }) } })?; runtime.spawn({ let node = Arc::clone(&node); async move { node.ok_or_abort("ChronikServer::serve", server.serve().await); } }); let chronik = Box::new(Chronik { node: Arc::clone(&node), indexer, pause, runtime, }); StartChronikValidationInterface(node_context, chronik); Ok(()) } fn parse_socket_addr(host: String, default_port: u16) -> Result { if let Ok(addr) = host.parse::() { return Ok(addr); } let ip_addr = host .parse::() .map_err(|err| InvalidChronikHost(host, err))?; Ok(SocketAddr::new(ip_addr, default_port)) } /// Contains all db, runtime, tpc, etc. handles needed by Chronik. /// This makes it so when this struct is dropped, all handles are relased /// cleanly. pub struct Chronik { node: Arc, indexer: Arc>, pause: Pause, // Having this here ensures HTTP server, outstanding requests etc. will get // stopped when `Chronik` is dropped. 
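For context, the runtime wiring in `try_setup_chronik` follows a common tokio pattern: build a multi-threaded runtime, run the bind/setup step to completion with `block_on` so bind errors surface immediately, then `spawn` the long-running serve task onto the same runtime. A stripped-down sketch under those assumptions, with `bind_server` standing in for the real server setup/serve calls:

```rust
use std::time::Duration;

/// Stand-in for a fallible bind step (e.g. port already in use).
async fn bind_server() -> Result<&'static str, std::io::Error> {
    Ok("server handle")
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()?;

    // Run the bind step synchronously; a failure aborts setup right here.
    let server = runtime.block_on(bind_server())?;

    // Serving runs for the lifetime of the runtime; dropping the runtime
    // (as when the owning struct is dropped) shuts the task down.
    let _serve_task = runtime.spawn(async move {
        println!("serving with {server}");
        tokio::time::sleep(Duration::from_millis(10)).await;
    });

    std::thread::sleep(Duration::from_millis(50));
    Ok(())
}
```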
runtime: tokio::runtime::Runtime, } impl Chronik { /// Tx added to the bitcoind mempool pub fn handle_tx_added_to_mempool( &self, ptx: &ffi::CTransaction, spent_coins: &cxx::CxxVector, time_first_seen: i64, ) { self.block_if_paused(); self.node.ok_or_abort( "handle_tx_added_to_mempool", self.add_tx_to_mempool(ptx, spent_coins, time_first_seen), ); } /// Tx removed from the bitcoind mempool pub fn handle_tx_removed_from_mempool(&self, txid: [u8; 32]) { self.block_if_paused(); let mut indexer = self.indexer.blocking_write(); let txid = TxId::from(txid); self.node.ok_or_abort( "handle_tx_removed_from_mempool", indexer.handle_tx_removed_from_mempool(txid), ); log_chronik!("Chronik: transaction {} removed from mempool\n", txid); } /// Block connected to the longest chain pub fn handle_block_connected( &self, block: &ffi::CBlock, bindex: &ffi::CBlockIndex, ) { self.block_if_paused(); self.node.ok_or_abort( "handle_block_connected", self.connect_block(block, bindex), ); } /// Block disconnected from the longest chain pub fn handle_block_disconnected( &self, block: &ffi::CBlock, bindex: &ffi::CBlockIndex, ) { self.block_if_paused(); self.node.ok_or_abort( "handle_block_disconnected", self.disconnect_block(block, bindex), ); } /// Block finalized with Avalanche pub fn handle_block_finalized(&self, bindex: &ffi::CBlockIndex) { self.block_if_paused(); self.node .ok_or_abort("handle_block_finalized", self.finalize_block(bindex)); } fn add_tx_to_mempool( &self, ptx: &ffi::CTransaction, spent_coins: &cxx::CxxVector, time_first_seen: i64, ) -> Result<()> { let mut indexer = self.indexer.blocking_write(); let tx = chronik_bridge::ffi::bridge_tx(ptx, spent_coins)?; let txid = TxId::from(tx.txid); indexer.handle_tx_added_to_mempool(MempoolTx { tx: Tx::from(tx), time_first_seen, })?; log_chronik!("Chronik: transaction {} added to mempool\n", txid); Ok(()) } fn connect_block( &self, block: &ffi::CBlock, bindex: &ffi::CBlockIndex, ) -> Result<()> { let block_undo = self.node.bridge.load_block_undo(bindex)?; let block = chronik_bridge::ffi::bridge_block(block, &block_undo, bindex)?; let mut indexer = self.indexer.blocking_write(); let block = indexer.make_chronik_block(block); let block_hash = block.db_block.hash.clone(); let num_txs = block.block_txs.txs.len(); indexer.handle_block_connected(block)?; log_chronik!( "Chronik: block {} connected with {} txs\n", block_hash, num_txs, ); Ok(()) } fn disconnect_block( &self, block: &ffi::CBlock, bindex: &ffi::CBlockIndex, ) -> Result<()> { let block_undo = self.node.bridge.load_block_undo(bindex)?; let block = chronik_bridge::ffi::bridge_block(block, &block_undo, bindex)?; let mut indexer = self.indexer.blocking_write(); let block = indexer.make_chronik_block(block); let block_hash = block.db_block.hash.clone(); let num_txs = block.block_txs.txs.len(); indexer.handle_block_disconnected(block)?; log_chronik!( "Chronik: block {} disconnected with {} txs\n", block_hash, num_txs, ); Ok(()) } fn finalize_block(&self, bindex: &ffi::CBlockIndex) -> Result<()> { let mut indexer = self.indexer.blocking_write(); let block = indexer.load_chronik_block(&self.node.bridge, bindex)?; let block_hash = block.db_block.hash.clone(); let num_txs = block.block_txs.txs.len(); indexer.handle_block_finalized(block)?; log_chronik!( "Chronik: block {} finalized with {} txs\n", block_hash, num_txs, ); Ok(()) } fn block_if_paused(&self) { self.pause.block_if_paused(&self.runtime); } } impl std::fmt::Debug for Chronik { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 
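Each handler above has the same shape: block if paused, take a blocking write lock on the shared indexer from the synchronous validation-interface callback, apply the update, and route any error through `ok_or_abort`. A minimal stand-in for that locking pattern; `Indexer` and `handle_block_connected` below are placeholders, and tokio's `RwLock::blocking_write` must be called from outside the async context, as these callbacks are:

```rust
use std::sync::Arc;
use tokio::sync::RwLock;

struct Indexer {
    blocks: u32,
}

fn handle_block_connected(indexer: &Arc<RwLock<Indexer>>) -> Result<(), String> {
    // Sync counterpart of `.write().await`; fine here because we are not
    // running inside the tokio runtime.
    let mut guard = indexer.blocking_write();
    guard.blocks += 1;
    Ok(())
}

fn main() {
    let indexer = Arc::new(RwLock::new(Indexer { blocks: 0 }));
    if let Err(err) = handle_block_connected(&indexer) {
        // The real code would abort the node via ok_or_abort here.
        eprintln!("would abort node: {err}");
    }
    assert_eq!(indexer.blocking_read().blocks, 1);
}
```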
write!(f, "Chronik {{ .. }}") } } diff --git a/chronik/chronik-lib/src/ffi.rs b/chronik/chronik-lib/src/ffi.rs index c86cd7089..1c12477b3 100644 --- a/chronik/chronik-lib/src/ffi.rs +++ b/chronik/chronik-lib/src/ffi.rs @@ -1,116 +1,118 @@ // Copyright (c) 2022 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. //! Module containing the cxx definitions for the bridge from Rust to C++. pub use self::ffi_inner::*; use crate::bridge::{setup_chronik, Chronik}; #[allow(unsafe_code)] #[cxx::bridge(namespace = "chronik_bridge")] mod ffi_inner { /// Params for setting up Chronik #[derive(Debug)] pub struct SetupParams { /// Where the data of the blockchain is stored, dependent on network /// (mainnet, testnet, regtest) pub datadir_net: String, /// Host addresses where the Chronik HTTP endpoint will be served pub hosts: Vec, /// Default port for `hosts` if only an IP address is given pub default_port: u16, /// Whether to clear the DB before proceeding, e.g. when reindexing pub wipe_db: bool, /// Whether Chronik should index SLP/ALP token transactions pub enable_token_index: bool, + /// Whether Chronik should index transactions by LOKAD ID + pub enable_lokad_id_index: bool, /// Whether pausing Chronik indexing is allowed pub is_pause_allowed: bool, /// Whether to output Chronik performance statistics into a perf/ /// folder pub enable_perf_stats: bool, /// Duration between WebSocket pings initiated by Chronik. pub ws_ping_interval_secs: u64, /// Tuning settings for the TxNumCache. pub tx_num_cache: TxNumCacheSettings, } /// Settings for tuning the TxNumCache. #[derive(Debug)] pub struct TxNumCacheSettings { /// How many buckets are on the belt pub num_buckets: usize, /// How many txs are cached in each bucket pub bucket_size: usize, } extern "Rust" { type Chronik; fn setup_chronik( params: SetupParams, config: &Config, node: &NodeContext, ) -> bool; fn handle_tx_added_to_mempool( &self, ptx: &CTransaction, spent_coins: &CxxVector, time_first_seen: i64, ); fn handle_tx_removed_from_mempool(&self, txid: [u8; 32]); fn handle_block_connected(&self, block: &CBlock, bindex: &CBlockIndex); fn handle_block_disconnected( &self, block: &CBlock, bindex: &CBlockIndex, ); fn handle_block_finalized(&self, bindex: &CBlockIndex); } unsafe extern "C++" { include!("blockindex.h"); include!("chronik-cpp/chronik_validationinterface.h"); include!("coins.h"); include!("config.h"); include!("node/context.h"); include!("primitives/block.h"); include!("primitives/transaction.h"); /// CBlockIndex from blockindex.h #[namespace = ""] type CBlockIndex = chronik_bridge::ffi::CBlockIndex; /// ::CBlock from primitives/block.h #[namespace = ""] type CBlock = chronik_bridge::ffi::CBlock; /// ::Coin from coins.h (renamed to CCoin to prevent a name clash) #[namespace = ""] #[cxx_name = "Coin"] type CCoin = chronik_bridge::ffi::CCoin; /// ::Config from config.h #[namespace = ""] type Config = chronik_bridge::ffi::Config; /// ::CTransaction from primitives/transaction.h #[namespace = ""] type CTransaction = chronik_bridge::ffi::CTransaction; /// NodeContext from node/context.h #[namespace = "node"] type NodeContext = chronik_bridge::ffi::NodeContext; /// Bridge to bitcoind to access the node type ChronikBridge = chronik_bridge::ffi::ChronikBridge; /// Register the Chronik instance as CValidationInterface to receive /// chain updates from the node. 
#[namespace = "chronik"] fn StartChronikValidationInterface( node: &NodeContext, chronik: Box, ); } } diff --git a/chronik/chronik-proto/proto/chronik.proto b/chronik/chronik-proto/proto/chronik.proto index 550c55670..717c4edfa 100644 --- a/chronik/chronik-proto/proto/chronik.proto +++ b/chronik/chronik-proto/proto/chronik.proto @@ -1,498 +1,509 @@ // Copyright (c) 2023 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. syntax = "proto3"; package chronik; // Block on the blockchain message Block { // Info about the block BlockInfo block_info = 1; } // Range of blocks message Blocks { // Queried blocks repeated BlockInfo blocks = 1; } // Info about the state of the blockchain. message BlockchainInfo { // Hash (little-endian) of the current tip bytes tip_hash = 1; // Height of the current tip (genesis has height = 0) int32 tip_height = 2; } // Info about the chronik software this server is running message ChronikInfo { // chronik server version from chronik-http/Cargo.toml string version = 1; } // Info about a block message BlockInfo { // Hash (little-endian) bytes hash = 1; // Hash of the previous block (little-endian) bytes prev_hash = 2; // Height in the chain int32 height = 3; // nBits field encoding the target uint32 n_bits = 4; // Timestamp field of the block int64 timestamp = 5; // Whether the block has been finalized by Avalanche bool is_final = 14; // Block size of this block in bytes (including headers etc.) uint64 block_size = 6; // Number of txs in this block uint64 num_txs = 7; // Total number of tx inputs in block (including coinbase) uint64 num_inputs = 8; // Total number of tx output in block (including coinbase) uint64 num_outputs = 9; // Total number of satoshis spent by tx inputs int64 sum_input_sats = 10; // Block reward for this block int64 sum_coinbase_output_sats = 11; // Total number of satoshis in non-coinbase tx outputs int64 sum_normal_output_sats = 12; // Total number of satoshis burned using OP_RETURN int64 sum_burned_sats = 13; } // Details about a transaction message Tx { // TxId (little-endian) of the tx bytes txid = 1; // nVersion int32 version = 2; // Inputs of the tx (aka. `vin`) repeated TxInput inputs = 3; // Outputs of the tx (aka. `vout`) repeated TxOutput outputs = 4; // nLockTime uint32 lock_time = 5; // Which block this tx is in, or None, if in the mempool BlockMetadata block = 8; // Time this tx has first been added to the mempool, or 0 if unknown int64 time_first_seen = 9; // Serialized size of the tx uint32 size = 11; // Whether this tx is a coinbase tx bool is_coinbase = 12; // Tokens involved in this txs repeated TokenEntry token_entries = 13; // Failed parsing attempts of this tx repeated TokenFailedParsing token_failed_parsings = 14; // Token status, i.e. whether this tx has any tokens or unintentional token burns // or something unexpected, like failed parsings etc. TokenStatus token_status = 15; } // UTXO of a script. message ScriptUtxo { // txid and out_idx of the unspent output. OutPoint outpoint = 1; // Block height of the UTXO, or -1 if in mempool. int32 block_height = 2; // Whether the UTXO has been created in a coinbase tx. bool is_coinbase = 3; // Value of the output, in satoshis. int64 value = 5; // Whether the UTXO has been finalized by Avalanche. bool is_final = 10; // Token value attached to this UTXO Token token = 11; } // UTXO, but with a script attached. message Utxo { // txid and out_idx of the unspent output. 
OutPoint outpoint = 1; // Block height of the UTXO, or -1 if in mempool. int32 block_height = 2; // Whether the UTXO has been created in a coinbase tx. bool is_coinbase = 3; // Value of the output, in satoshis. int64 value = 4; // Bytecode of the script of the output bytes script = 5; // Whether the UTXO has been finalized by Avalanche. bool is_final = 6; // Token value attached to this UTXO Token token = 7; } // COutPoint, points to a coin being spent by an input. message OutPoint { // TxId of the tx of the output being spent. bytes txid = 1; // Index of the output spent within the transaction. uint32 out_idx = 2; } // Points to an input spending a coin. message SpentBy { // TxId of the tx with the input. bytes txid = 1; // Index in the inputs of the tx. uint32 input_idx = 2; } // CTxIn, spends a coin. message TxInput { // Reference to the coin being spent. OutPoint prev_out = 1; // scriptSig, script unlocking the coin. bytes input_script = 2; // scriptPubKey, script of the output locking the coin. bytes output_script = 3; // value of the output being spent, in satoshis. int64 value = 4; // nSequence of the input. uint32 sequence_no = 5; // Token value attached to this input Token token = 8; } // CTxOut, creates a new coin. message TxOutput { // Value of the coin, in satoshis. int64 value = 1; // scriptPubKey, script locking the output. bytes output_script = 2; // Which tx and input spent this output, if any. SpentBy spent_by = 4; // Token value attached to this output Token token = 5; } // Data about a block which a Tx is in. message BlockMetadata { // Height of the block the tx is in. int32 height = 1; // Hash of the block the tx is in. bytes hash = 2; // nTime of the block the tx is in. int64 timestamp = 3; // Whether the block has been finalized by Avalanche. bool is_final = 4; } // Status of a token tx enum TokenStatus { // Tx involves no tokens whatsover, i.e. neither any burns nor any failed // parsing/coloring or any tokens being created / moved. TOKEN_STATUS_NON_TOKEN = 0; // Tx involves tokens but no unintentional burns or failed parsings/colorings TOKEN_STATUS_NORMAL = 1; // Tx involves tokens but contains unintentional burns or failed parsings/colorings TOKEN_STATUS_NOT_NORMAL = 2; } // ALP token type enum AlpTokenType { // Standard ALP token type ALP_TOKEN_TYPE_STANDARD = 0; } // SLP token type enum SlpTokenType { // Unknown "0" token type SLP_TOKEN_TYPE_NONE = 0; // SLP V1 token type SLP_TOKEN_TYPE_FUNGIBLE = 1; // SLP V2 mint vault token type SLP_TOKEN_TYPE_MINT_VAULT = 2; // NFT1 group token type SLP_TOKEN_TYPE_NFT1_GROUP = 0x81; // NFT1 child token type SLP_TOKEN_TYPE_NFT1_CHILD = 0x41; } // SLP/ALP token type message TokenType { // SLP/ALP token type oneof token_type { // SLP token type. Can have unknown values for unknown token types SlpTokenType slp = 1; // ALP token type. Can have unknown values for unknown token types AlpTokenType alp = 2; } } // SLP/ALP tx type enum TokenTxType { // No tx type, e.g. when input tokens are burned NONE = 0; // Unknown tx type, i.e. for unknown token types UNKNOWN = 1; // GENESIS tx GENESIS = 2; // SEND tx SEND = 3; // MINT tx MINT = 4; // BURN tx BURN = 5; } // Info about a token message TokenInfo { // Hex token_id (in big-endian, like usually displayed to users) of the token. // This is not `bytes` because SLP and ALP use different endiannnes, so to avoid this we use hex, which conventionally implies big-endian in a bitcoin context. 
string token_id = 1; // Token type of the token TokenType token_type = 2; // Info found in the token's GENESIS tx GenesisInfo genesis_info = 3; // Block of the GENESIS tx, if it's mined already BlockMetadata block = 4; // Time the GENESIS tx has first been seen by the indexer int64 time_first_seen = 5; } // Token involved in a transaction message TokenEntry { // Hex token_id (in big-endian, like usually displayed to users) of the token. // This is not `bytes` because SLP and ALP use different endiannes, so to avoid // this we use hex, which conventionally implies big-endian in a bitcoin context. string token_id = 1; // Token type of the token TokenType token_type = 2; // Tx type of the token; NONE if there's no section that introduced it (e.g. in an accidental burn) TokenTxType tx_type = 3; // For NFT1 Child tokens: group ID string group_token_id = 4; // Whether the validation rules have been violated for this section bool is_invalid = 5; // Human-readable error message of why this entry burned tokens string burn_summary = 6; // Human-readable error messages of why colorings failed repeated TokenFailedColoring failed_colorings = 7; // Number of actually burned tokens (as decimal integer string, e.g. "2000"). // This is because burns can exceed the 64-bit range of values and protobuf doesn't have a nice type to encode this. string actual_burn_amount = 8; // Burn amount the user explicitly opted into uint64 intentional_burn = 9; // Whether any mint batons have been burned of this token bool burns_mint_batons = 10; } // Genesis info found in GENESIS txs of tokens message GenesisInfo { // token_ticker of the token bytes token_ticker = 1; // token_name of the token bytes token_name = 2; // URL of the token bytes url = 3; // token_document_hash of the token (only on SLP) bytes hash = 4; // mint_vault_scripthash (only on SLP V2 Mint Vault) bytes mint_vault_scripthash = 5; // Arbitray payload data of the token (only on ALP) bytes data = 6; // auth_pubkey of the token (only on ALP) bytes auth_pubkey = 7; // decimals of the token, i.e. how many decimal places the token should be displayed with. uint32 decimals = 8; } // Token coloring an input or output message Token { // Hex token_id of the token, see `TokenInfo` for details string token_id = 1; // Token type of the token TokenType token_type = 2; // Index into `token_entries` for `Tx`. -1 for UTXOs int32 entry_idx = 3; // Base token amount of the input/output uint64 amount = 4; // Whether the token is a mint baton bool is_mint_baton = 5; } // A report of a failed parsing attempt of SLP/ALP. // This should always indicate something went wrong when building the tx. message TokenFailedParsing { // For ALP, the index of the pushdata in the OP_RETURN that failed parsing. // -1 if the whole OP_RETURN failed, e.g. for SLP or eMPP int32 pushdata_idx = 1; // The bytes that failed parsing, useful for debugging bytes bytes = 2; // Human-readable message of what went wrong string error = 3; } // A report of a failed coloring attempt of SLP/ALP. // This should always indicate something went wrong when building the tx. message TokenFailedColoring { // For ALP, the index of the pushdata in the OP_RETURN that failed parsing. 
int32 pushdata_idx = 1; // Human-readable message of what went wrong string error = 3; } // Page with txs message TxHistoryPage { // Txs of the page repeated Tx txs = 1; // How many pages there are total uint32 num_pages = 2; // How many txs there are total uint32 num_txs = 3; } // List of UTXOs of a script message ScriptUtxos { // The serialized script of the UTXOs bytes script = 1; // UTXOs of the script. repeated ScriptUtxo utxos = 2; } // List of UTXOs message Utxos { // UTXOs repeated Utxo utxos = 1; } // Broadcast a single tx message BroadcastTxRequest { // Serialized tx bytes raw_tx = 1; // Whether to skip token checks and broadcast even if tokens are unintentionally burned bool skip_token_checks = 2; } // Response of broadcasting the tx message BroadcastTxResponse { // TxId of the broadcast tx bytes txid = 1; } // Broadcast multiple txs. If one of the txs fails token validation, the entire batch will not be broadcast. message BroadcastTxsRequest { // Serialized txs. repeated bytes raw_txs = 1; // Whether to skip token checks and broadcast even if tokens are unintentionally burned bool skip_token_checks = 2; } // Response of broadcasting txs message BroadcastTxsResponse { // TxIds of the broadcast txs repeated bytes txids = 1; } // Raw serialized tx. message RawTx { // Bytes of the serialized tx. bytes raw_tx = 1; } // Subscription to WebSocket updates. message WsSub { // Set this to `true` to unsubscribe from the event. bool is_unsub = 1; // What kind of updates to subscribe to. oneof sub_type { // Subscription to block updates WsSubBlocks blocks = 2; // Subscription to a script WsSubScript script = 3; // Subscription to a token ID WsSubTokenId token_id = 4; + // Subscription to a lokad ID + WsSubLokadId lokad_id = 5; } } // Subscription to blocks. They will be sent any time a block got connected, // disconnected or finalized. message WsSubBlocks {} // Subscription to a script. They will be sent every time a tx spending the // given script or sending to the given script has been added to/removed from // the mempool, or confirmed in a block. message WsSubScript { // Script type to subscribe to ("p2pkh", "p2sh", "p2pk", "other"). string script_type = 1; // Payload for the given script type: // - 20-byte hash for "p2pkh" and "p2sh" // - 33-byte or 65-byte pubkey for "p2pk" // - Serialized script for "other" bytes payload = 2; } // Subscription to a token ID. They will be sent every time a tx spending or // sending tokens with the token ID. message WsSubTokenId { // Hex token ID to subscribe to. string token_id = 1; } +// Subscription to a LOKAD ID. They will be sent every time a tx matches the given LOKAD ID in one of the following ways: +// - `OP_RETURN ...`: The first output has an OP_RETURN with the given LOKAD ID as first pushop +// - `OP_RETURN OP_RESERVED "..." "..." ...`: The first output has an eMPP encoded OP_RETURN, and one (or more) of the pushops has the LOKAD ID as prefix. +// - ` ...`: An input's scriptSig has the given LOKAD ID as the first pushop +message WsSubLokadId { + // 4-byte LOKAD ID. + bytes lokad_id = 1; +} + // Message coming from the WebSocket message WsMsg { // Kind of message oneof msg_type { // Error, e.g. when a bad message has been sent into the WebSocket. Error error = 1; // Block got connected, disconnected, finalized, etc. MsgBlock block = 2; // Tx got added to/removed from the mempool, or confirmed in a block. MsgTx tx = 3; } } // Block got connected, disconnected, finalized, etc. 
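To make the `WsSubLokadId` rules above concrete: a tx matches when the 4-byte LOKAD ID is the first pushop of the first output's OP_RETURN, a prefix of one of the eMPP pushdata in that OP_RETURN, or the first pushop of an input's scriptSig. The sketch below is only a loose illustration: it works on already-parsed pushes instead of raw scripts and simply takes 4-byte prefixes, whereas the real LokadIdGroup parses the scripts and applies the exact rules.

```rust
type LokadId = [u8; 4];

/// First 4 bytes of a push, if it is at least 4 bytes long.
fn lokad_id_prefix(push: &[u8]) -> Option<LokadId> {
    push.get(..4).map(|p| p.try_into().unwrap())
}

/// Collect candidate LOKAD IDs of a tx from the pushes of the first output's
/// OP_RETURN and from the first push of each input's scriptSig.
fn tx_lokad_ids(
    op_return_pushes: &[Vec<u8>],
    script_sig_first_pushes: &[Vec<u8>],
) -> Vec<LokadId> {
    op_return_pushes
        .iter()
        .chain(script_sig_first_pushes.iter())
        .filter_map(|push| lokad_id_prefix(push))
        .collect()
}

fn main() {
    let op_return = vec![b"SLP\0rest-of-payload".to_vec()];
    let inputs = vec![b"DUMMYsig".to_vec()];
    assert_eq!(tx_lokad_ids(&op_return, &inputs), vec![*b"SLP\0", *b"DUMM"]);
}
```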
message MsgBlock { // What happened to the block BlockMsgType msg_type = 1; // Hash of the block (little-endian) bytes block_hash = 2; // Height of the block int32 block_height = 3; } // Type of message for the block enum BlockMsgType { // Block connected to the blockchain BLK_CONNECTED = 0; // Block disconnected from the blockchain BLK_DISCONNECTED = 1; // Block has been finalized by Avalanche BLK_FINALIZED = 2; } // Tx got added to/removed from mempool, or confirmed in a block, etc. message MsgTx { // What happened to the tx TxMsgType msg_type = 1; // Txid of the tx (little-endian) bytes txid = 2; } // Type of message for a tx enum TxMsgType { // Tx added to the mempool TX_ADDED_TO_MEMPOOL = 0; // Tx removed from the mempool TX_REMOVED_FROM_MEMPOOL = 1; // Tx confirmed in a block TX_CONFIRMED = 2; // Tx finalized by Avalanche TX_FINALIZED = 3; } // Empty msg without any data message Empty {} // Error message returned from our APIs. message Error { // 2, as legacy chronik uses this for the message so we're still compatible. string msg = 2; } diff --git a/src/init.cpp b/src/init.cpp index d4c8427b3..99b5e0a90 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -1,2924 +1,2927 @@ // Copyright (c) 2009-2010 Satoshi Nakamoto // Copyright (c) 2009-2018 The Bitcoin Core developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #if defined(HAVE_CONFIG_H) #include #endif #include #include #include #include #include #include #include // For AVALANCHE_LEGACY_PROOF_DEFAULT #include #include // For AVALANCHE_VOTE_STALE_* #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include