Changeset View
Changeset View
Standalone View
Standalone View
chronik/chronik-indexer/src/indexer.rs
Show All 13 Lines | |||||
use chronik_bridge::{ffi, util::expect_unique_ptr}; | use chronik_bridge::{ffi, util::expect_unique_ptr}; | ||||
use chronik_db::{ | use chronik_db::{ | ||||
db::{Db, WriteBatch}, | db::{Db, WriteBatch}, | ||||
groups::{ | groups::{ | ||||
FnCompressScript, ScriptGroup, ScriptHistoryWriter, ScriptUtxoWriter, | FnCompressScript, ScriptGroup, ScriptHistoryWriter, ScriptUtxoWriter, | ||||
}, | }, | ||||
index_tx::prepare_indexed_txs, | index_tx::prepare_indexed_txs, | ||||
io::{ | io::{ | ||||
BlockHeight, BlockReader, BlockTxs, BlockWriter, DbBlock, | BlockHeight, BlockReader, BlockStatsWriter, BlockTxs, BlockWriter, | ||||
MetadataReader, MetadataWriter, SchemaVersion, SpentByWriter, TxEntry, | DbBlock, MetadataReader, MetadataWriter, SchemaVersion, SpentByWriter, | ||||
TxWriter, | TxEntry, TxWriter, | ||||
}, | }, | ||||
mem::{Mempool, MempoolTx}, | mem::{Mempool, MempoolTx}, | ||||
}; | }; | ||||
use chronik_util::{log, log_chronik}; | use chronik_util::{log, log_chronik}; | ||||
use thiserror::Error; | use thiserror::Error; | ||||
use tokio::sync::RwLock; | use tokio::sync::RwLock; | ||||
use crate::{ | use crate::{ | ||||
avalanche::Avalanche, | avalanche::Avalanche, | ||||
query::{QueryBlocks, QueryGroupHistory, QueryGroupUtxos, QueryTxs}, | query::{QueryBlocks, QueryGroupHistory, QueryGroupUtxos, QueryTxs}, | ||||
subs::{BlockMsg, BlockMsgType, Subs}, | subs::{BlockMsg, BlockMsgType, Subs}, | ||||
subs_group::TxMsgType, | subs_group::TxMsgType, | ||||
}; | }; | ||||
const CURRENT_INDEXER_VERSION: SchemaVersion = 6; | const CURRENT_INDEXER_VERSION: SchemaVersion = 7; | ||||
/// Params for setting up a [`ChronikIndexer`] instance. | /// Params for setting up a [`ChronikIndexer`] instance. | ||||
#[derive(Clone)] | #[derive(Clone)] | ||||
pub struct ChronikIndexerParams { | pub struct ChronikIndexerParams { | ||||
/// Folder where the node stores its data, net-dependent. | /// Folder where the node stores its data, net-dependent. | ||||
pub datadir_net: PathBuf, | pub datadir_net: PathBuf, | ||||
/// Whether to clear the DB before opening the DB, e.g. when reindexing. | /// Whether to clear the DB before opening the DB, e.g. when reindexing. | ||||
pub wipe_db: bool, | pub wipe_db: bool, | ||||
Show All 13 Lines | |||||
/// Block to be indexed by Chronik. | /// Block to be indexed by Chronik. | ||||
#[derive(Clone, Debug, Default, Eq, PartialEq)] | #[derive(Clone, Debug, Default, Eq, PartialEq)] | ||||
pub struct ChronikBlock { | pub struct ChronikBlock { | ||||
/// Data about the block (w/o txs) | /// Data about the block (w/o txs) | ||||
pub db_block: DbBlock, | pub db_block: DbBlock, | ||||
/// Txs in the block, with locations of where they are stored on disk. | /// Txs in the block, with locations of where they are stored on disk. | ||||
pub block_txs: BlockTxs, | pub block_txs: BlockTxs, | ||||
/// Block size in bytes. | |||||
pub size: u64, | |||||
/// Txs in the block, with inputs/outputs so we can group them. | /// Txs in the block, with inputs/outputs so we can group them. | ||||
pub txs: Vec<Tx>, | pub txs: Vec<Tx>, | ||||
} | } | ||||
/// Errors for [`BlockWriter`] and [`BlockReader`]. | /// Errors for [`BlockWriter`] and [`BlockReader`]. | ||||
#[derive(Debug, Eq, Error, PartialEq)] | #[derive(Debug, Eq, Error, PartialEq)] | ||||
pub enum ChronikIndexerError { | pub enum ChronikIndexerError { | ||||
/// Failed creating the folder for the indexes | /// Failed creating the folder for the indexes | ||||
▲ Show 20 Lines • Show All 206 Lines • ▼ Show 20 Lines | pub fn handle_tx_removed_from_mempool(&mut self, txid: TxId) -> Result<()> { | ||||
Ok(()) | Ok(()) | ||||
} | } | ||||
/// Add the block to the index. | /// Add the block to the index. | ||||
pub fn handle_block_connected( | pub fn handle_block_connected( | ||||
&mut self, | &mut self, | ||||
block: ChronikBlock, | block: ChronikBlock, | ||||
) -> Result<()> { | ) -> Result<()> { | ||||
let height = block.db_block.height; | |||||
let mut batch = WriteBatch::default(); | let mut batch = WriteBatch::default(); | ||||
let block_writer = BlockWriter::new(&self.db)?; | let block_writer = BlockWriter::new(&self.db)?; | ||||
let tx_writer = TxWriter::new(&self.db)?; | let tx_writer = TxWriter::new(&self.db)?; | ||||
let block_stats_writer = BlockStatsWriter::new(&self.db)?; | |||||
let script_history_writer = | let script_history_writer = | ||||
ScriptHistoryWriter::new(&self.db, self.script_group.clone())?; | ScriptHistoryWriter::new(&self.db, self.script_group.clone())?; | ||||
let script_utxo_writer = | let script_utxo_writer = | ||||
ScriptUtxoWriter::new(&self.db, self.script_group.clone())?; | ScriptUtxoWriter::new(&self.db, self.script_group.clone())?; | ||||
let spent_by_writer = SpentByWriter::new(&self.db)?; | let spent_by_writer = SpentByWriter::new(&self.db)?; | ||||
block_writer.insert(&mut batch, &block.db_block)?; | block_writer.insert(&mut batch, &block.db_block)?; | ||||
let first_tx_num = tx_writer.insert(&mut batch, &block.block_txs)?; | let first_tx_num = tx_writer.insert(&mut batch, &block.block_txs)?; | ||||
let index_txs = | let index_txs = | ||||
prepare_indexed_txs(&self.db, first_tx_num, &block.txs)?; | prepare_indexed_txs(&self.db, first_tx_num, &block.txs)?; | ||||
block_stats_writer | |||||
.insert(&mut batch, height, block.size, &index_txs)?; | |||||
script_history_writer.insert(&mut batch, &index_txs)?; | script_history_writer.insert(&mut batch, &index_txs)?; | ||||
script_utxo_writer.insert(&mut batch, &index_txs)?; | script_utxo_writer.insert(&mut batch, &index_txs)?; | ||||
spent_by_writer.insert(&mut batch, &index_txs)?; | spent_by_writer.insert(&mut batch, &index_txs)?; | ||||
self.db.write_batch(batch)?; | self.db.write_batch(batch)?; | ||||
for tx in &block.block_txs.txs { | for tx in &block.block_txs.txs { | ||||
self.mempool.remove_mined(&tx.txid)?; | self.mempool.remove_mined(&tx.txid)?; | ||||
} | } | ||||
let subs = self.subs.get_mut(); | let subs = self.subs.get_mut(); | ||||
Show All 11 Lines | impl ChronikIndexer { | ||||
/// Remove the block from the index. | /// Remove the block from the index. | ||||
pub fn handle_block_disconnected( | pub fn handle_block_disconnected( | ||||
&mut self, | &mut self, | ||||
block: ChronikBlock, | block: ChronikBlock, | ||||
) -> Result<()> { | ) -> Result<()> { | ||||
let mut batch = WriteBatch::default(); | let mut batch = WriteBatch::default(); | ||||
let block_writer = BlockWriter::new(&self.db)?; | let block_writer = BlockWriter::new(&self.db)?; | ||||
let tx_writer = TxWriter::new(&self.db)?; | let tx_writer = TxWriter::new(&self.db)?; | ||||
let block_stats_writer = BlockStatsWriter::new(&self.db)?; | |||||
let script_history_writer = | let script_history_writer = | ||||
ScriptHistoryWriter::new(&self.db, self.script_group.clone())?; | ScriptHistoryWriter::new(&self.db, self.script_group.clone())?; | ||||
let script_utxo_writer = | let script_utxo_writer = | ||||
ScriptUtxoWriter::new(&self.db, self.script_group.clone())?; | ScriptUtxoWriter::new(&self.db, self.script_group.clone())?; | ||||
let spent_by_writer = SpentByWriter::new(&self.db)?; | let spent_by_writer = SpentByWriter::new(&self.db)?; | ||||
block_writer.delete(&mut batch, &block.db_block)?; | block_writer.delete(&mut batch, &block.db_block)?; | ||||
let first_tx_num = tx_writer.delete(&mut batch, &block.block_txs)?; | let first_tx_num = tx_writer.delete(&mut batch, &block.block_txs)?; | ||||
let index_txs = | let index_txs = | ||||
prepare_indexed_txs(&self.db, first_tx_num, &block.txs)?; | prepare_indexed_txs(&self.db, first_tx_num, &block.txs)?; | ||||
block_stats_writer.delete(&mut batch, block.db_block.height); | |||||
script_history_writer.delete(&mut batch, &index_txs)?; | script_history_writer.delete(&mut batch, &index_txs)?; | ||||
script_utxo_writer.delete(&mut batch, &index_txs)?; | script_utxo_writer.delete(&mut batch, &index_txs)?; | ||||
spent_by_writer.delete(&mut batch, &index_txs)?; | spent_by_writer.delete(&mut batch, &index_txs)?; | ||||
self.avalanche.disconnect_block(block.db_block.height)?; | self.avalanche.disconnect_block(block.db_block.height)?; | ||||
self.db.write_batch(batch)?; | self.db.write_batch(batch)?; | ||||
let subs = self.subs.get_mut(); | let subs = self.subs.get_mut(); | ||||
subs.broadcast_block_msg(BlockMsg { | subs.broadcast_block_msg(BlockMsg { | ||||
msg_type: BlockMsgType::Disconnected, | msg_type: BlockMsgType::Disconnected, | ||||
▲ Show 20 Lines • Show All 105 Lines • ▼ Show 20 Lines | ) -> Result<ChronikBlock> { | ||||
let txs = block | let txs = block | ||||
.txs | .txs | ||||
.into_iter() | .into_iter() | ||||
.map(|block_tx| Tx::from(block_tx.tx)) | .map(|block_tx| Tx::from(block_tx.tx)) | ||||
.collect::<Vec<_>>(); | .collect::<Vec<_>>(); | ||||
Ok(ChronikBlock { | Ok(ChronikBlock { | ||||
db_block, | db_block, | ||||
block_txs, | block_txs, | ||||
size: block.size, | |||||
txs, | txs, | ||||
}) | }) | ||||
} | } | ||||
} | } | ||||
fn verify_schema_version(db: &Db) -> Result<()> { | fn verify_schema_version(db: &Db) -> Result<()> { | ||||
let metadata_reader = MetadataReader::new(db)?; | let metadata_reader = MetadataReader::new(db)?; | ||||
let metadata_writer = MetadataWriter::new(db)?; | let metadata_writer = MetadataWriter::new(db)?; | ||||
▲ Show 20 Lines • Show All 88 Lines • ▼ Show 20 Lines | fn test_indexer() -> Result<()> { | ||||
timestamp: 1234567890, | timestamp: 1234567890, | ||||
file_num: 0, | file_num: 0, | ||||
data_pos: 1337, | data_pos: 1337, | ||||
}, | }, | ||||
block_txs: BlockTxs { | block_txs: BlockTxs { | ||||
block_height: 0, | block_height: 0, | ||||
txs: vec![], | txs: vec![], | ||||
}, | }, | ||||
size: 285, | |||||
txs: vec![], | txs: vec![], | ||||
}; | }; | ||||
// Add block | // Add block | ||||
indexer.handle_block_connected(block.clone())?; | indexer.handle_block_connected(block.clone())?; | ||||
assert_eq!( | assert_eq!( | ||||
BlockReader::new(&indexer.db)?.by_height(0)?, | BlockReader::new(&indexer.db)?.by_height(0)?, | ||||
Some(block.db_block.clone()) | Some(block.db_block.clone()) | ||||
▲ Show 20 Lines • Show All 120 Lines • Show Last 20 Lines |