diff --git a/chronik/chronik-http/src/handlers.rs b/chronik/chronik-http/src/handlers.rs index 4d25b99b6..4a35c3369 100644 --- a/chronik/chronik-http/src/handlers.rs +++ b/chronik/chronik-http/src/handlers.rs @@ -1,115 +1,127 @@ //! Module for Chronik handlers. use std::{collections::HashMap, fmt::Display, str::FromStr}; use abc_rust_error::{Report, Result}; use chronik_indexer::indexer::ChronikIndexer; use chronik_proto::proto; use hyper::Uri; use thiserror::Error; use crate::{error::ReportError, parse::parse_script_variant_hex}; /// Errors for HTTP handlers. #[derive(Debug, Error, PartialEq)] pub enum ChronikHandlerError { /// Not found #[error("404: Not found: {0}")] RouteNotFound(Uri), /// Query parameter has an invalid value #[error("400: Invalid param {param_name}: {param_value}, {msg}")] InvalidParam { /// Name of the param param_name: String, /// Value of the param param_value: String, /// Human-readable error message. msg: String, }, } use self::ChronikHandlerError::*; fn get_param<T: FromStr>( params: &HashMap<String, String>, param_name: &str, ) -> Result<Option<T>> where T::Err: Display, { let Some(param) = params.get(param_name) else { return Ok(None) }; Ok(Some(param.parse::<T>().map_err(|err| InvalidParam { param_name: param_name.to_string(), param_value: param.to_string(), msg: err.to_string(), })?)) } /// Fallback route that returns a 404 response pub async fn handle_not_found(uri: Uri) -> Result<(), ReportError> { Err(Report::from(RouteNotFound(uri)).into()) } +/// Return a page of the txs of a block. +pub async fn handle_block_txs( + hash_or_height: String, + query_params: &HashMap<String, String>, + indexer: &ChronikIndexer, +) -> Result<proto::TxHistoryPage> { + let blocks = indexer.blocks(); + let page_num: u32 = get_param(query_params, "page")?.unwrap_or(0); + let page_size: u32 = get_param(query_params, "page_size")?.unwrap_or(25); + blocks.block_txs(hash_or_height, page_num as usize, page_size as usize) +}
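(Note, not part of the diff: the handler above relies on `get_param` falling back to defaults when a query parameter is absent. A minimal sketch of that behavior, with an invented query map:)

    // Invented example: `?page=2` given, `page_size` omitted.
    let mut query_params = std::collections::HashMap::new();
    query_params.insert("page".to_string(), "2".to_string());
    let page_num: u32 = get_param(&query_params, "page")?.unwrap_or(0); // 2
    let page_size: u32 = get_param(&query_params, "page_size")?.unwrap_or(25); // default 25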
/// Return a page of the confirmed txs of the given script. /// Scripts are identified by script_type and payload. pub async fn handle_script_confirmed_txs( script_type: &str, payload: &str, query_params: &HashMap<String, String>, indexer: &ChronikIndexer, ) -> Result<proto::TxHistoryPage> { let script_variant = parse_script_variant_hex(script_type, payload)?; let script_history = indexer.script_history()?; let page_num: u32 = get_param(query_params, "page")?.unwrap_or(0); let page_size: u32 = get_param(query_params, "page_size")?.unwrap_or(25); let script = script_variant.to_script(); script_history.confirmed_txs(&script, page_num as usize, page_size as usize) } /// Return a page of the tx history of the given script, in reverse /// chronological order, i.e. the latest transaction first and then going back /// in time. Scripts are identified by script_type and payload. pub async fn handle_script_history( script_type: &str, payload: &str, query_params: &HashMap<String, String>, indexer: &ChronikIndexer, ) -> Result<proto::TxHistoryPage> { let script_variant = parse_script_variant_hex(script_type, payload)?; let script_history = indexer.script_history()?; let page_num: u32 = get_param(query_params, "page")?.unwrap_or(0); let page_size: u32 = get_param(query_params, "page_size")?.unwrap_or(25); let script = script_variant.to_script(); script_history.rev_history(&script, page_num as usize, page_size as usize) } /// Return the unconfirmed txs of the given script. /// Scripts are identified by script_type and payload. pub async fn handle_script_unconfirmed_txs( script_type: &str, payload: &str, indexer: &ChronikIndexer, ) -> Result<proto::TxHistoryPage> { let script_variant = parse_script_variant_hex(script_type, payload)?; let script_history = indexer.script_history()?; let script = script_variant.to_script(); script_history.unconfirmed_txs(&script) } /// Return the UTXOs of the given script. /// Scripts are identified by script_type and payload. pub async fn handle_script_utxos( script_type: &str, payload: &str, indexer: &ChronikIndexer, ) -> Result<proto::ScriptUtxos> { let script_variant = parse_script_variant_hex(script_type, payload)?; let script_utxos = indexer.script_utxos()?; let script = script_variant.to_script(); let utxos = script_utxos.utxos(&script)?; Ok(proto::ScriptUtxos { script: script.bytecode().to_vec(), utxos, }) } diff --git a/chronik/chronik-http/src/server.rs b/chronik/chronik-http/src/server.rs index ce8113368..da55bcc01 100644 --- a/chronik/chronik-http/src/server.rs +++ b/chronik/chronik-http/src/server.rs @@ -1,249 +1,262 @@ // Copyright (c) 2022 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. //! Module for [`ChronikServer`]. use std::collections::HashMap; use std::{net::SocketAddr, sync::Arc}; use abc_rust_error::{Result, WrapErr}; use axum::{ extract::{Path, Query, WebSocketUpgrade}, response::IntoResponse, routing, Extension, Router, }; use bitcoinsuite_core::tx::TxId; use chronik_indexer::indexer::ChronikIndexer; use chronik_proto::proto; use hyper::server::conn::AddrIncoming; use thiserror::Error; use tokio::sync::RwLock; use crate::{ error::ReportError, handlers, protobuf::Protobuf, ws::handle_subscribe_socket, }; /// Ref-counted indexer with read or write access pub type ChronikIndexerRef = Arc<RwLock<ChronikIndexer>>; /// Params defining what and where to serve for [`ChronikServer`]. #[derive(Clone, Debug)] pub struct ChronikServerParams { /// Host address (port + IP) where to serve Chronik at. pub hosts: Vec<SocketAddr>, /// Indexer to read data from pub indexer: ChronikIndexerRef, } /// Chronik HTTP server, holding all the data/handles required to serve an /// instance. #[derive(Debug)] pub struct ChronikServer { server_builders: Vec<hyper::server::Builder<AddrIncoming>>, indexer: ChronikIndexerRef, } /// Errors for [`ChronikServer`]. #[derive(Debug, Eq, Error, PartialEq)] pub enum ChronikServerError { /// Binding to host address failed #[error("Chronik failed binding to {0}: {1}")] FailedBindingAddress(SocketAddr, String), /// Serving Chronik failed #[error("Chronik failed serving: {0}")] ServingFailed(String), /// Query is neither a hex hash nor an integer string #[error("400: Not a hash or height: {0}")] NotHashOrHeight(String), /// Query is not a txid #[error("400: Not a txid: {0}")] NotTxId(String), /// Block not found in DB #[error("404: Block not found: {0}")] BlockNotFound(String), } use self::ChronikServerError::*; impl ChronikServer { /// Binds the Chronik server on the given hosts pub fn setup(params: ChronikServerParams) -> Result<Self> { let server_builders = params .hosts .into_iter() .map(|host| { axum::Server::try_bind(&host).map_err(|err| { FailedBindingAddress(host, err.to_string()).into() }) }) .collect::<Result<Vec<_>>>()?; Ok(ChronikServer { server_builders, indexer: params.indexer, }) }
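(Note, not part of the diff: a minimal sketch of how a caller might bind and run the server, assuming an already-constructed `chronik_indexer`; the address is invented for the example:)

    let params = ChronikServerParams {
        hosts: vec!["127.0.0.1:8331".parse()?],
        indexer: Arc::new(RwLock::new(chronik_indexer)),
    };
    // Binds all hosts first, then serves them until the first one fails.
    ChronikServer::setup(params)?.serve().await?;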
/// Serve a Chronik HTTP endpoint with the given parameters. pub async fn serve(self) -> Result<()> { let app = Self::make_router(self.indexer); let servers = self .server_builders .into_iter() .zip(std::iter::repeat(app)) .map(|(server_builder, app)| { Box::pin(async move { server_builder .serve(app.into_make_service()) .await .map_err(|err| ServingFailed(err.to_string())) }) }); let (result, _, _) = futures::future::select_all(servers).await; result?; Ok(()) } fn make_router(indexer: ChronikIndexerRef) -> Router { Router::new() .route("/blockchain-info", routing::get(handle_blockchain_info)) .route("/block/:hash_or_height", routing::get(handle_block)) + .route("/block-txs/:hash_or_height", routing::get(handle_block_txs)) .route("/blocks/:start/:end", routing::get(handle_block_range)) .route("/tx/:txid", routing::get(handle_tx)) .route("/raw-tx/:txid", routing::get(handle_raw_tx)) .route( "/script/:type/:payload/confirmed-txs", routing::get(handle_script_confirmed_txs), ) .route( "/script/:type/:payload/history", routing::get(handle_script_history), ) .route( "/script/:type/:payload/unconfirmed-txs", routing::get(handle_script_unconfirmed_txs), ) .route( "/script/:type/:payload/utxos", routing::get(handle_script_utxos), ) .route("/ws", routing::get(handle_ws)) .fallback(handlers::handle_not_found) .layer(Extension(indexer)) } } async fn handle_blockchain_info( Extension(indexer): Extension<ChronikIndexerRef>, ) -> Result<Protobuf<proto::BlockchainInfo>, ReportError> { let indexer = indexer.read().await; let blocks = indexer.blocks(); Ok(Protobuf(blocks.blockchain_info()?)) } async fn handle_block_range( Path((start_height, end_height)): Path<(i32, i32)>, Extension(indexer): Extension<ChronikIndexerRef>, ) -> Result<Protobuf<proto::Blocks>, ReportError> { let indexer = indexer.read().await; let blocks = indexer.blocks(); Ok(Protobuf(blocks.by_range(start_height, end_height)?)) } async fn handle_block( Path(hash_or_height): Path<String>, Extension(indexer): Extension<ChronikIndexerRef>, ) -> Result<Protobuf<proto::Block>, ReportError> { let indexer = indexer.read().await; let blocks = indexer.blocks(); Ok(Protobuf(blocks.by_hash_or_height(hash_or_height)?)) } +async fn handle_block_txs( + Path(hash_or_height): Path<String>, + Query(query_params): Query<HashMap<String, String>>, + Extension(indexer): Extension<ChronikIndexerRef>, +) -> Result<Protobuf<proto::TxHistoryPage>, ReportError> { + let indexer = indexer.read().await; + Ok(Protobuf( + handlers::handle_block_txs(hash_or_height, &query_params, &indexer) + .await?, + )) +} + async fn handle_tx( Path(txid): Path<String>, Extension(indexer): Extension<ChronikIndexerRef>, ) -> Result<Protobuf<proto::Tx>, ReportError> { let indexer = indexer.read().await; let txid = txid.parse::<TxId>().wrap_err(NotTxId(txid))?; Ok(Protobuf(indexer.txs().tx_by_id(txid)?)) } async fn handle_raw_tx( Path(txid): Path<String>, Extension(indexer): Extension<ChronikIndexerRef>, ) -> Result<Protobuf<proto::RawTx>, ReportError> { let indexer = indexer.read().await; let txid = txid.parse::<TxId>().wrap_err(NotTxId(txid))?; Ok(Protobuf(indexer.txs().raw_tx_by_id(&txid)?)) } async fn handle_script_confirmed_txs( Path((script_type, payload)): Path<(String, String)>, Query(query_params): Query<HashMap<String, String>>, Extension(indexer): Extension<ChronikIndexerRef>, ) -> Result<Protobuf<proto::TxHistoryPage>, ReportError> { let indexer = indexer.read().await; Ok(Protobuf( handlers::handle_script_confirmed_txs( &script_type, &payload, &query_params, &indexer, ) .await?, )) } async fn handle_script_history( Path((script_type, payload)): Path<(String, String)>, Query(query_params): Query<HashMap<String, String>>, Extension(indexer): Extension<ChronikIndexerRef>, ) -> Result<Protobuf<proto::TxHistoryPage>, ReportError> { let indexer = indexer.read().await; Ok(Protobuf( handlers::handle_script_history( &script_type, &payload, &query_params, &indexer, ) .await?, )) }
async fn handle_script_unconfirmed_txs( Path((script_type, payload)): Path<(String, String)>, Extension(indexer): Extension<ChronikIndexerRef>, ) -> Result<Protobuf<proto::TxHistoryPage>, ReportError> { let indexer = indexer.read().await; Ok(Protobuf( handlers::handle_script_unconfirmed_txs( &script_type, &payload, &indexer, ) .await?, )) } async fn handle_script_utxos( Path((script_type, payload)): Path<(String, String)>, Extension(indexer): Extension<ChronikIndexerRef>, ) -> Result<Protobuf<proto::ScriptUtxos>, ReportError> { let indexer = indexer.read().await; Ok(Protobuf( handlers::handle_script_utxos(&script_type, &payload, &indexer).await?, )) } async fn handle_ws( ws: WebSocketUpgrade, Extension(indexer): Extension<ChronikIndexerRef>, ) -> impl IntoResponse { ws.on_upgrade(|ws| handle_subscribe_socket(ws, indexer)) }
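(Note, not part of the diff: every handler above pulls the shared `ChronikIndexerRef` out of the request extensions and only acquires the read half of the `RwLock`, so queries can run concurrently; writers elsewhere would use `write().await`. An illustrative sketch, with `chronik_indexer` and the height assumed:)

    let indexer: ChronikIndexerRef = Arc::new(RwLock::new(chronik_indexer));
    let guard = indexer.read().await; // shared: many readers at once
    let page = guard.blocks().block_txs("102".to_string(), 0, 25)?;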
diff --git a/chronik/chronik-indexer/src/indexer.rs b/chronik/chronik-indexer/src/indexer.rs index 5e1565ea6..0ac2be903 100644 --- a/chronik/chronik-indexer/src/indexer.rs +++ b/chronik/chronik-indexer/src/indexer.rs @@ -1,701 +1,702 @@ // Copyright (c) 2022 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. //! Module containing [`ChronikIndexer`] to index blocks and txs. use std::path::PathBuf; use abc_rust_error::{Result, WrapErr}; use bitcoinsuite_core::{ block::BlockHash, tx::{Tx, TxId}, }; use chronik_bridge::{ffi, util::expect_unique_ptr}; use chronik_db::{ db::{Db, WriteBatch}, groups::{ FnCompressScript, ScriptGroup, ScriptHistoryWriter, ScriptUtxoWriter, }, index_tx::prepare_indexed_txs, io::{ BlockHeight, BlockReader, BlockStatsWriter, BlockTxs, BlockWriter, DbBlock, MetadataReader, MetadataWriter, SchemaVersion, SpentByWriter, TxEntry, TxWriter, }, mem::{Mempool, MempoolTx}, }; use chronik_util::{log, log_chronik}; use thiserror::Error; use tokio::sync::RwLock; use crate::{ avalanche::Avalanche, query::{QueryBlocks, QueryGroupHistory, QueryGroupUtxos, QueryTxs}, subs::{BlockMsg, BlockMsgType, Subs}, subs_group::TxMsgType, }; const CURRENT_INDEXER_VERSION: SchemaVersion = 7; /// Params for setting up a [`ChronikIndexer`] instance. #[derive(Clone)] pub struct ChronikIndexerParams { /// Folder where the node stores its data, net-dependent. pub datadir_net: PathBuf, /// Whether to clear the DB before opening the DB, e.g. when reindexing. pub wipe_db: bool, /// Function ptr to compress scripts. pub fn_compress_script: FnCompressScript, } /// Struct for indexing blocks and txs. Maintains db handles and mempool. #[derive(Debug)] pub struct ChronikIndexer { db: Db, mempool: Mempool, script_group: ScriptGroup, avalanche: Avalanche, subs: RwLock<Subs>, } /// Block to be indexed by Chronik. #[derive(Clone, Debug, Default, Eq, PartialEq)] pub struct ChronikBlock { /// Data about the block (w/o txs) pub db_block: DbBlock, /// Txs in the block, with locations of where they are stored on disk. pub block_txs: BlockTxs, /// Block size in bytes. pub size: u64, /// Txs in the block, with inputs/outputs so we can group them. pub txs: Vec<Tx>, } /// Errors for [`BlockWriter`] and [`BlockReader`]. #[derive(Debug, Eq, Error, PartialEq)] pub enum ChronikIndexerError { /// Failed creating the folder for the indexes #[error("Failed creating path {0}")] CreateIndexesDirFailed(PathBuf), /// Cannot rewind blocks that bitcoind doesn't have #[error( "Cannot rewind Chronik, it contains block {0} that the node doesn't \ have. You may need to use -reindex/-chronikreindex, or delete \ indexes/chronik and restart" )] CannotRewindChronik(BlockHash), /// Lower block doesn't exist but higher block does #[error( "Inconsistent DB: Block {missing} doesn't exist, but {exists} does" )] BlocksBelowMissing { /// Lower height that is missing missing: BlockHeight, /// Higher height that exists exists: BlockHeight, }, /// Corrupted schema version #[error( "Corrupted schema version in the Chronik database, consider running \ -reindex/-chronikreindex" )] CorruptedSchemaVersion, /// Missing schema version for non-empty database #[error( "Missing schema version in non-empty Chronik database, consider \ running -reindex/-chronikreindex" )] MissingSchemaVersion, /// This Chronik instance is outdated #[error( "Chronik outdated: Chronik has version {}, but the database has \ version {0}. Upgrade your node to the appropriate version.", CURRENT_INDEXER_VERSION )] ChronikOutdated(SchemaVersion), /// Database is outdated #[error( "DB outdated: Chronik has version {}, but the database has version \ {0}. -reindex/-chronikreindex to reindex the database to the new \ version.", CURRENT_INDEXER_VERSION )] DatabaseOutdated(SchemaVersion), } use self::ChronikIndexerError::*; impl ChronikIndexer { /// Setup the indexer with the given parameters, e.g. open the DB etc. pub fn setup(params: ChronikIndexerParams) -> Result<Self> { let indexes_path = params.datadir_net.join("indexes"); if !indexes_path.exists() { std::fs::create_dir(&indexes_path).wrap_err_with(|| { CreateIndexesDirFailed(indexes_path.clone()) })?; } let db_path = indexes_path.join("chronik"); if params.wipe_db { log!("Wiping Chronik at {}\n", db_path.to_string_lossy()); Db::destroy(&db_path)?; } log_chronik!("Opening Chronik at {}\n", db_path.to_string_lossy()); let db = Db::open(&db_path)?; verify_schema_version(&db)?; let script_group = ScriptGroup::new(params.fn_compress_script); let mempool = Mempool::new(script_group.clone()); Ok(ChronikIndexer { db, mempool, script_group: script_group.clone(), avalanche: Avalanche::default(), subs: RwLock::new(Subs::new(script_group)), }) } /// Resync Chronik index to the node pub fn resync_indexer( &mut self, bridge: &ffi::ChronikBridge, ) -> Result<()> { let block_reader = BlockReader::new(&self.db)?; let indexer_tip = block_reader.tip()?; let Ok(node_tip_index) = bridge.get_chain_tip() else { if let Some(indexer_tip) = &indexer_tip { return Err( CannotRewindChronik(indexer_tip.hash.clone()).into() ); } return Ok(()); }; let node_tip_info = ffi::get_block_info(node_tip_index); let node_height = node_tip_info.height; let node_tip_hash = BlockHash::from(node_tip_info.hash); let fork_height = match indexer_tip { Some(tip) => { let indexer_tip_hash = tip.hash.clone(); let indexer_height = tip.height; log!( "Node and Chronik diverged, node is on block \ {node_tip_hash} at height {node_height}, and Chronik is \ on block {indexer_tip_hash} at height {indexer_height}.\n" ); let indexer_tip_index = bridge .lookup_block_index(tip.hash.to_bytes()) .map_err(|_| CannotRewindChronik(tip.hash.clone()))?; self.rewind_indexer(bridge, indexer_tip_index, &tip)? } None => { log!( "Chronik database empty, syncing to block {node_tip_hash} \ at height {node_height}.\n" ); -1 } }; let tip_height = node_tip_info.height; for height in fork_height + 1..=tip_height { let block_index = ffi::get_block_ancestor(node_tip_index, height)?; let ffi_block = bridge.load_block(block_index)?; let ffi_block = expect_unique_ptr("load_block", &ffi_block); let block = self.make_chronik_block(ffi_block, block_index)?; let hash = block.db_block.hash.clone(); self.handle_block_connected(block)?; log_chronik!( "Added block {hash}, height {height}/{tip_height} to Chronik\n" ); if height % 100 == 0 { log!( "Synced Chronik up to block {hash} at height \ {height}/{tip_height}\n" ); } } log!( "Chronik completed re-syncing with the node, both are now at \ block {node_tip_hash} at height {node_height}.\n" ); Ok(()) }
fn rewind_indexer( &mut self, bridge: &ffi::ChronikBridge, indexer_tip_index: &ffi::CBlockIndex, indexer_db_tip: &DbBlock, ) -> Result<BlockHeight> { let indexer_height = indexer_db_tip.height; let fork_block_index = bridge .find_fork(indexer_tip_index) .map_err(|_| CannotRewindChronik(indexer_db_tip.hash.clone()))?; let fork_info = ffi::get_block_info(fork_block_index); let fork_block_hash = BlockHash::from(fork_info.hash); let fork_height = fork_info.height; let revert_height = fork_height + 1; log!( "The last common block is {fork_block_hash} at height \ {fork_height}.\n" ); log!("Reverting Chronik blocks {revert_height} to {indexer_height}.\n"); for height in (revert_height..=indexer_height).rev() { let db_block = BlockReader::new(&self.db)? .by_height(height)? .ok_or(BlocksBelowMissing { missing: height, exists: indexer_height, })?; let block_index = bridge .lookup_block_index(db_block.hash.to_bytes()) .map_err(|_| CannotRewindChronik(db_block.hash))?; let ffi_block = bridge.load_block(block_index)?; let ffi_block = expect_unique_ptr("load_block", &ffi_block); let block = self.make_chronik_block(ffi_block, block_index)?; self.handle_block_disconnected(block)?; } Ok(fork_info.height) } /// Add transaction to the indexer's mempool. pub fn handle_tx_added_to_mempool( &mut self, mempool_tx: MempoolTx, ) -> Result<()> { self.subs .get_mut() .handle_tx_event(&mempool_tx.tx, TxMsgType::AddedToMempool); self.mempool.insert(mempool_tx)?; Ok(()) } /// Remove tx from the indexer's mempool, e.g. by a conflicting tx, expiry /// etc. This is not called when the transaction has been mined (and thus /// also removed from the mempool). pub fn handle_tx_removed_from_mempool(&mut self, txid: TxId) -> Result<()> { let mempool_tx = self.mempool.remove(txid)?; self.subs .get_mut() .handle_tx_event(&mempool_tx.tx, TxMsgType::RemovedFromMempool); Ok(()) } /// Add the block to the index.
pub fn handle_block_connected( &mut self, block: ChronikBlock, ) -> Result<()> { let height = block.db_block.height; let mut batch = WriteBatch::default(); let block_writer = BlockWriter::new(&self.db)?; let tx_writer = TxWriter::new(&self.db)?; let block_stats_writer = BlockStatsWriter::new(&self.db)?; let script_history_writer = ScriptHistoryWriter::new(&self.db, self.script_group.clone())?; let script_utxo_writer = ScriptUtxoWriter::new(&self.db, self.script_group.clone())?; let spent_by_writer = SpentByWriter::new(&self.db)?; block_writer.insert(&mut batch, &block.db_block)?; let first_tx_num = tx_writer.insert(&mut batch, &block.block_txs)?; let index_txs = prepare_indexed_txs(&self.db, first_tx_num, &block.txs)?; block_stats_writer .insert(&mut batch, height, block.size, &index_txs)?; script_history_writer.insert(&mut batch, &index_txs)?; script_utxo_writer.insert(&mut batch, &index_txs)?; spent_by_writer.insert(&mut batch, &index_txs)?; self.db.write_batch(batch)?; for tx in &block.block_txs.txs { self.mempool.remove_mined(&tx.txid)?; } let subs = self.subs.get_mut(); subs.broadcast_block_msg(BlockMsg { msg_type: BlockMsgType::Connected, hash: block.db_block.hash, height: block.db_block.height, }); for tx in &block.txs { subs.handle_tx_event(tx, TxMsgType::Confirmed); } Ok(()) } /// Remove the block from the index. pub fn handle_block_disconnected( &mut self, block: ChronikBlock, ) -> Result<()> { let mut batch = WriteBatch::default(); let block_writer = BlockWriter::new(&self.db)?; let tx_writer = TxWriter::new(&self.db)?; let block_stats_writer = BlockStatsWriter::new(&self.db)?; let script_history_writer = ScriptHistoryWriter::new(&self.db, self.script_group.clone())?; let script_utxo_writer = ScriptUtxoWriter::new(&self.db, self.script_group.clone())?; let spent_by_writer = SpentByWriter::new(&self.db)?; block_writer.delete(&mut batch, &block.db_block)?; let first_tx_num = tx_writer.delete(&mut batch, &block.block_txs)?; let index_txs = prepare_indexed_txs(&self.db, first_tx_num, &block.txs)?; block_stats_writer.delete(&mut batch, block.db_block.height); script_history_writer.delete(&mut batch, &index_txs)?; script_utxo_writer.delete(&mut batch, &index_txs)?; spent_by_writer.delete(&mut batch, &index_txs)?; self.avalanche.disconnect_block(block.db_block.height)?; self.db.write_batch(batch)?; let subs = self.subs.get_mut(); subs.broadcast_block_msg(BlockMsg { msg_type: BlockMsgType::Disconnected, hash: block.db_block.hash, height: block.db_block.height, }); Ok(()) } /// Block finalized with Avalanche. pub fn handle_block_finalized( &mut self, block: ChronikBlock, ) -> Result<()> { self.avalanche.finalize_block(block.db_block.height)?; let subs = self.subs.get_mut(); subs.broadcast_block_msg(BlockMsg { msg_type: BlockMsgType::Finalized, hash: block.db_block.hash, height: block.db_block.height, }); for tx in &block.txs { subs.handle_tx_event(tx, TxMsgType::Finalized); } Ok(()) } /// Return [`QueryBlocks`] to read blocks from the DB. pub fn blocks(&self) -> QueryBlocks<'_> { QueryBlocks { db: &self.db, avalanche: &self.avalanche, + mempool: &self.mempool, } } /// Return [`QueryTxs`] to return txs from mempool/DB. pub fn txs(&self) -> QueryTxs<'_> { QueryTxs { db: &self.db, avalanche: &self.avalanche, mempool: &self.mempool, } } /// Return [`QueryGroupHistory`] for scripts to query the tx history of /// scripts. 
pub fn script_history(&self) -> Result<QueryGroupHistory<'_, ScriptGroup>> { Ok(QueryGroupHistory { db: &self.db, avalanche: &self.avalanche, mempool: &self.mempool, mempool_history: self.mempool.script_history(), group: self.script_group.clone(), }) } /// Return [`QueryGroupUtxos`] for scripts to query the utxos of scripts. pub fn script_utxos(&self) -> Result<QueryGroupUtxos<'_, ScriptGroup>> { Ok(QueryGroupUtxos { db: &self.db, avalanche: &self.avalanche, mempool: &self.mempool, mempool_utxos: self.mempool.script_utxos(), group: self.script_group.clone(), }) } /// Subscribers, behind read/write lock pub fn subs(&self) -> &RwLock<Subs> { &self.subs } /// Build the ChronikBlock from the CBlockIndex pub fn make_chronik_block( &self, block: &ffi::CBlock, bindex: &ffi::CBlockIndex, ) -> Result<ChronikBlock> { let block = ffi::bridge_block(block, bindex)?; let db_block = DbBlock { hash: BlockHash::from(block.hash), prev_hash: BlockHash::from(block.prev_hash), height: block.height, n_bits: block.n_bits, timestamp: block.timestamp, file_num: block.file_num, data_pos: block.data_pos, }; let block_txs = BlockTxs { block_height: block.height, txs: block .txs .iter() .map(|tx| { let txid = TxId::from(tx.tx.txid); TxEntry { txid, data_pos: tx.data_pos, undo_pos: tx.undo_pos, time_first_seen: match self.mempool.tx(&txid) { Some(tx) => tx.time_first_seen, None => 0, }, is_coinbase: tx.undo_pos == 0, } }) .collect(), }; let txs = block .txs .into_iter() .map(|block_tx| Tx::from(block_tx.tx)) .collect::<Vec<_>>(); Ok(ChronikBlock { db_block, block_txs, size: block.size, txs, }) } } fn verify_schema_version(db: &Db) -> Result<()> { let metadata_reader = MetadataReader::new(db)?; let metadata_writer = MetadataWriter::new(db)?; let is_empty = db.is_db_empty()?; match metadata_reader .schema_version() .wrap_err(CorruptedSchemaVersion)? { Some(schema_version) => { assert!(!is_empty, "Empty DB can't have a schema version"); if schema_version > CURRENT_INDEXER_VERSION { return Err(ChronikOutdated(schema_version).into()); } if schema_version < CURRENT_INDEXER_VERSION { return Err(DatabaseOutdated(schema_version).into()); } } None => { if !is_empty { return Err(MissingSchemaVersion.into()); } let mut batch = WriteBatch::default(); metadata_writer .update_schema_version(&mut batch, CURRENT_INDEXER_VERSION)?; db.write_batch(batch)?; } } log!("Chronik has version {CURRENT_INDEXER_VERSION}\n"); Ok(()) } impl std::fmt::Debug for ChronikIndexerParams { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ChronikIndexerParams") .field("datadir_net", &self.datadir_net) .field("wipe_db", &self.wipe_db) .field("fn_compress_script", &"..") .finish() } }
#[cfg(test)] mod tests { use abc_rust_error::Result; use bitcoinsuite_core::block::BlockHash; use chronik_db::{ db::{Db, WriteBatch, CF_META}, groups::prefix_mock_compress, io::{BlockReader, BlockTxs, DbBlock, MetadataReader, MetadataWriter}, }; use pretty_assertions::assert_eq; use crate::indexer::{ ChronikBlock, ChronikIndexer, ChronikIndexerError, ChronikIndexerParams, CURRENT_INDEXER_VERSION, }; #[test] fn test_indexer() -> Result<()> { let tempdir = tempdir::TempDir::new("chronik-indexer--indexer")?; let datadir_net = tempdir.path().join("regtest"); let params = ChronikIndexerParams { datadir_net: datadir_net.clone(), wipe_db: false, fn_compress_script: prefix_mock_compress, }; // regtest folder doesn't exist yet -> error assert_eq!( ChronikIndexer::setup(params.clone()) .unwrap_err() .downcast::<ChronikIndexerError>()?, ChronikIndexerError::CreateIndexesDirFailed( datadir_net.join("indexes"), ), ); // create regtest folder, setup will work now std::fs::create_dir(&datadir_net)?; let mut indexer = ChronikIndexer::setup(params.clone())?; // indexes and indexes/chronik folder now exist assert!(datadir_net.join("indexes").exists()); assert!(datadir_net.join("indexes").join("chronik").exists()); // DB is empty assert_eq!(BlockReader::new(&indexer.db)?.by_height(0)?, None); let block = ChronikBlock { db_block: DbBlock { hash: BlockHash::from([4; 32]), prev_hash: BlockHash::from([0; 32]), height: 0, n_bits: 0x1deadbef, timestamp: 1234567890, file_num: 0, data_pos: 1337, }, block_txs: BlockTxs { block_height: 0, txs: vec![], }, size: 285, txs: vec![], }; // Add block indexer.handle_block_connected(block.clone())?; assert_eq!( BlockReader::new(&indexer.db)?.by_height(0)?, Some(block.db_block.clone()) ); // Remove block again indexer.handle_block_disconnected(block.clone())?; assert_eq!(BlockReader::new(&indexer.db)?.by_height(0)?, None); // Add block then wipe, block not there indexer.handle_block_connected(block)?; std::mem::drop(indexer); let indexer = ChronikIndexer::setup(ChronikIndexerParams { wipe_db: true, ..params })?; assert_eq!(BlockReader::new(&indexer.db)?.by_height(0)?, None); Ok(()) } #[test] fn test_schema_version() -> Result<()> { let dir = tempdir::TempDir::new("chronik-indexer--schema_version")?; let chronik_path = dir.path().join("indexes").join("chronik"); let params = ChronikIndexerParams { datadir_net: dir.path().to_path_buf(), wipe_db: false, fn_compress_script: prefix_mock_compress, }; // Setting up DB first time sets the schema version ChronikIndexer::setup(params.clone())?; { let db = Db::open(&chronik_path)?; assert_eq!( MetadataReader::new(&db)?.schema_version()?, Some(CURRENT_INDEXER_VERSION) ); } // Opening DB again works fine ChronikIndexer::setup(params.clone())?; // Override DB schema version to 0 { let db = Db::open(&chronik_path)?; let mut batch = WriteBatch::default(); MetadataWriter::new(&db)?.update_schema_version(&mut batch, 0)?; db.write_batch(batch)?; } // -> DB too old assert_eq!( ChronikIndexer::setup(params.clone()) .unwrap_err() .downcast::<ChronikIndexerError>()?, ChronikIndexerError::DatabaseOutdated(0), ); // Override DB schema version to CURRENT_INDEXER_VERSION + 1 { let db = Db::open(&chronik_path)?; let mut batch = WriteBatch::default(); MetadataWriter::new(&db)?.update_schema_version( &mut batch, CURRENT_INDEXER_VERSION + 1, )?; db.write_batch(batch)?; } // -> Chronik too old assert_eq!( ChronikIndexer::setup(params.clone()) .unwrap_err() .downcast::<ChronikIndexerError>()?, ChronikIndexerError::ChronikOutdated(CURRENT_INDEXER_VERSION + 1), ); // Corrupt schema version { let db = Db::open(&chronik_path)?; let cf_meta = db.cf(CF_META)?; let mut batch = WriteBatch::default(); batch.put_cf(cf_meta, b"SCHEMA_VERSION", [0xff]); db.write_batch(batch)?; } assert_eq!( ChronikIndexer::setup(params.clone()) .unwrap_err() .downcast::<ChronikIndexerError>()?, ChronikIndexerError::CorruptedSchemaVersion, ); // New db path, but has existing data let new_dir = dir.path().join("new"); let new_chronik_path = new_dir.join("indexes").join("chronik"); std::fs::create_dir_all(&new_chronik_path)?; let new_params = ChronikIndexerParams { datadir_net: new_dir, wipe_db: false, ..params }; { // new db with obscure field in meta let db = Db::open(&new_chronik_path)?; let mut batch = WriteBatch::default(); batch.put_cf(db.cf(CF_META)?, b"FOO", b"BAR"); db.write_batch(batch)?; } // Error: non-empty DB without schema version assert_eq!( ChronikIndexer::setup(new_params.clone()) .unwrap_err() .downcast::<ChronikIndexerError>()?, ChronikIndexerError::MissingSchemaVersion, ); // with wipe it works ChronikIndexer::setup(ChronikIndexerParams { wipe_db: true, ..new_params })?; Ok(()) } }
diff --git a/chronik/chronik-indexer/src/query/blocks.rs b/chronik/chronik-indexer/src/query/blocks.rs index 27fc8ecaa..69d17f35b 100644 --- a/chronik/chronik-indexer/src/query/blocks.rs +++ b/chronik/chronik-indexer/src/query/blocks.rs @@ -1,153 +1,266 @@ // Copyright (c) 2023 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. //! Module for [`QueryBlocks`], to query blocks. -use abc_rust_error::Result; +use abc_rust_error::{Result, WrapErr}; +use bitcoinsuite_core::{ + block::BlockHash, + tx::{Tx, TxId}, +}; +use chronik_bridge::ffi; use chronik_db::{ db::Db, - io::{BlockHeight, BlockReader, BlockStats, BlockStatsReader, DbBlock}, + io::{ + BlockHeight, BlockReader, BlockStats, BlockStatsReader, DbBlock, + SpentByReader, TxNum, TxReader, + }, + mem::Mempool, }; use chronik_proto::proto; use thiserror::Error; -use crate::{avalanche::Avalanche, query::HashOrHeight}; +use crate::{ + avalanche::Avalanche, + query::{make_tx_proto, HashOrHeight, OutputsSpent}, +}; const MAX_BLOCKS_PAGE_SIZE: usize = 500; +/// Smallest allowed page size +pub const MIN_BLOCK_TXS_PAGE_SIZE: usize = 1; +/// Largest allowed page size +pub const MAX_BLOCK_TXS_PAGE_SIZE: usize = 200; + /// Struct for querying blocks from the DB. #[derive(Debug)] pub struct QueryBlocks<'a> { /// Db. pub db: &'a Db, /// Avalanche. pub avalanche: &'a Avalanche, + /// Mempool + pub mempool: &'a Mempool, } /// Errors indicating something went wrong with querying blocks. #[derive(Debug, Error, PartialEq)] pub enum QueryBlockError { /// Block not found in DB #[error("404: Block not found: {0}")] BlockNotFound(String), /// Invalid block start height #[error("400: Invalid block start height: {0}")] InvalidStartHeight(BlockHeight), /// Invalid block end height #[error("400: Invalid block end height: {0}")] InvalidEndHeight(BlockHeight), /// Blocks page size too large #[error( "400: Blocks page size too large, may not be above {} but got {0}", MAX_BLOCKS_PAGE_SIZE )] BlocksPageSizeTooLarge(usize), /// DB is missing block stats #[error("500: Inconsistent DB: Missing block stats for height {0}")] MissingBlockStats(BlockHeight), + + /// Block has no txs + #[error("500: Inconsistent DB: Block {0} has no txs")] + BlockHasNoTx(BlockHeight), + + /// Block has tx_num that doesn't exist + #[error("500: Inconsistent DB: block {0} has missing tx_num {1}")] + BlockHasMissingTx(BlockHash, TxNum), + + /// Can only request page sizes below a certain maximum. + #[error( + "400: Requested block tx page size {0} is too big, maximum is {}", + MAX_BLOCK_TXS_PAGE_SIZE + )] + RequestPageSizeTooBig(usize), + + /// Can only request page sizes above a certain minimum. + #[error( + "400: Requested block tx page size {0} is too small, minimum is {}", + MIN_BLOCK_TXS_PAGE_SIZE + )] + RequestPageSizeTooSmall(usize), + + /// Reading failed, likely corrupted block data + #[error("500: Reading {0} failed")] + ReadFailure(TxId), } use self::QueryBlockError::*;
impl<'a> QueryBlocks<'a> { /// Query a block by hash or height from DB. /// /// `height` may not have any leading zeros, because otherwise it might /// become ambiguous with a hash. pub fn by_hash_or_height( &self, hash_or_height: String, ) -> Result<proto::Block> { let db_blocks = BlockReader::new(self.db)?; let block_stats_reader = BlockStatsReader::new(self.db)?; let db_block = match hash_or_height.parse::<HashOrHeight>()? { HashOrHeight::Hash(hash) => db_blocks.by_hash(&hash)?, HashOrHeight::Height(height) => db_blocks.by_height(height)?, }; let db_block = db_block.ok_or(BlockNotFound(hash_or_height))?; let block_stats = block_stats_reader .by_height(db_block.height)? .ok_or(MissingBlockStats(db_block.height))?; Ok(proto::Block { block_info: Some( self.make_block_info_proto(&db_block, &block_stats), ), }) } /// Query blocks by a range of heights. Start and end height are inclusive. pub fn by_range( &self, start_height: BlockHeight, end_height: BlockHeight, ) -> Result<proto::Blocks> { if start_height < 0 { return Err(InvalidStartHeight(start_height).into()); } if end_height < start_height { return Err(InvalidEndHeight(end_height).into()); } let num_blocks = end_height as usize - start_height as usize + 1; if num_blocks > MAX_BLOCKS_PAGE_SIZE { return Err(BlocksPageSizeTooLarge(num_blocks).into()); } let block_reader = BlockReader::new(self.db)?; let block_stats_reader = BlockStatsReader::new(self.db)?; let mut blocks = Vec::with_capacity(num_blocks); for block_height in start_height..=end_height { let block = block_reader.by_height(block_height)?; let block = match block { Some(block) => block, None => break, }; let block_stats = block_stats_reader .by_height(block_height)? .ok_or(MissingBlockStats(block_height))?; blocks.push(self.make_block_info_proto(&block, &block_stats)); } Ok(proto::Blocks { blocks }) }
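(Note, not part of the diff: `block_txs` below clamps the requested page window into the block's tx_num range with saturating arithmetic, so an out-of-range page yields an empty `txs` list rather than an error. A worked sketch of that clamping with invented numbers:)

    // A block owning tx_nums 100..110 (10 txs), requesting page 3 of size 4:
    let (range_start, range_end) = (100u64, 110u64);
    let tx_offset = 3u64.saturating_mul(4); // 12
    let page_start = range_start.saturating_add(tx_offset).min(range_end); // clamped to 110
    let page_end = page_start.saturating_add(4).min(range_end); // 110
    assert_eq!(page_end - page_start, 0); // page 3 is past the end -> 0 txs
    let num_pages = (10 + 4 - 1) / 4; // ceiling division -> 3 pages total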
+ /// Query the txs of a block, paginated. + pub fn block_txs( + &self, + hash_or_height: String, + request_page_num: usize, + request_page_size: usize, + ) -> Result<proto::TxHistoryPage> { + if request_page_size < MIN_BLOCK_TXS_PAGE_SIZE { + return Err(RequestPageSizeTooSmall(request_page_size).into()); + } + if request_page_size > MAX_BLOCK_TXS_PAGE_SIZE { + return Err(RequestPageSizeTooBig(request_page_size).into()); + } + let block_reader = BlockReader::new(self.db)?; + let tx_reader = TxReader::new(self.db)?; + let spent_by_reader = SpentByReader::new(self.db)?; + let db_block = match hash_or_height.parse::<HashOrHeight>()? { + HashOrHeight::Hash(hash) => block_reader.by_hash(&hash)?, + HashOrHeight::Height(height) => block_reader.by_height(height)?, + }; + let db_block = db_block.ok_or(BlockNotFound(hash_or_height))?; + let tx_range = tx_reader + .block_tx_num_range(db_block.height)? + .ok_or(BlockHasNoTx(db_block.height))?; + let tx_offset = + request_page_num.saturating_mul(request_page_size) as u64; + let page_tx_num_start = + tx_range.start.saturating_add(tx_offset).min(tx_range.end); + let page_tx_num_end = page_tx_num_start + .saturating_add(request_page_size as u64) + .min(tx_range.end); + let num_page_txs = (page_tx_num_end - page_tx_num_start) as usize; + let mut txs = Vec::with_capacity(num_page_txs); + for tx_num in page_tx_num_start..page_tx_num_end { + let db_tx = tx_reader + .tx_by_tx_num(tx_num)? + .ok_or(BlockHasMissingTx(db_block.hash.clone(), tx_num))?; + let tx = ffi::load_tx( + db_block.file_num, + db_tx.entry.data_pos, + db_tx.entry.undo_pos, + ) + .wrap_err(ReadFailure(db_tx.entry.txid))?; + let outputs_spent = OutputsSpent::query( + &spent_by_reader, + &tx_reader, + self.mempool.spent_by().outputs_spent(&db_tx.entry.txid), + tx_num, + )?; + txs.push(make_tx_proto( + &Tx::from(tx), + &outputs_spent, + db_tx.entry.time_first_seen, + db_tx.entry.is_coinbase, + Some(&db_block), + self.avalanche, + )); + } + let total_num_txs = (tx_range.end - tx_range.start) as usize; + let total_num_pages = + (total_num_txs + request_page_size - 1) / request_page_size; + Ok(proto::TxHistoryPage { + txs, + num_pages: total_num_pages as u32, + num_txs: total_num_txs as u32, + }) + } + /// Query some info about the blockchain, e.g. the tip hash and height. pub fn blockchain_info(&self) -> Result<proto::BlockchainInfo> { let block_reader = BlockReader::new(self.db)?; match block_reader.tip()? { Some(block) => Ok(proto::BlockchainInfo { tip_hash: block.hash.to_vec(), tip_height: block.height, }), None => Ok(proto::BlockchainInfo { tip_hash: vec![0; 32], tip_height: -1, }), } } fn make_block_info_proto( &self, db_block: &DbBlock, block_stats: &BlockStats, ) -> proto::BlockInfo { proto::BlockInfo { hash: db_block.hash.to_vec(), prev_hash: db_block.prev_hash.to_vec(), height: db_block.height, n_bits: db_block.n_bits, timestamp: db_block.timestamp, is_final: self.avalanche.is_final_height(db_block.height), block_size: block_stats.block_size, num_txs: block_stats.num_txs, num_inputs: block_stats.num_inputs, num_outputs: block_stats.num_outputs, sum_input_sats: block_stats.sum_input_sats, sum_coinbase_output_sats: block_stats.sum_coinbase_output_sats, sum_normal_output_sats: block_stats.sum_normal_output_sats, sum_burned_sats: block_stats.sum_burned_sats, } } } diff --git a/test/functional/chronik_block_txs.py b/test/functional/chronik_block_txs.py new file mode 100644 index 000000000..eaeba4af3 --- /dev/null +++ b/test/functional/chronik_block_txs.py @@ -0,0 +1,257 @@ +#!/usr/bin/env python3 +# Copyright (c) 2023 The Bitcoin developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. +""" +Test Chronik's /block-txs/:hash_or_height endpoint.
+""" + +from test_framework.address import ( + ADDRESS_ECREG_P2SH_OP_TRUE, + ADDRESS_ECREG_UNSPENDABLE, + P2SH_OP_TRUE, + SCRIPTSIG_OP_TRUE, +) +from test_framework.blocktools import ( + GENESIS_BLOCK_HASH, + create_block, + create_coinbase, + make_conform_to_ctor, +) +from test_framework.messages import COutPoint, CTransaction, CTxIn, CTxOut +from test_framework.p2p import P2PDataStore +from test_framework.script import OP_RETURN, CScript +from test_framework.test_framework import BitcoinTestFramework +from test_framework.util import assert_equal + + +class ChronikBlockTxsTest(BitcoinTestFramework): + def set_test_params(self): + self.setup_clean_chain = True + self.num_nodes = 1 + self.extra_args = [['-chronik']] + self.rpc_timeout = 240 + + def skip_test_if_missing_module(self): + self.skip_if_no_chronik() + + def run_test(self): + from test_framework.chronik.client import ChronikClient, pb + from test_framework.chronik.test_data import genesis_cb_tx + + node = self.nodes[0] + node.setmocktime(1300000000) + chronik = ChronikClient('127.0.0.1', node.chronik_port) + + peer = node.add_p2p_connection(P2PDataStore()) + + # Not a valid hash or height + assert_equal(chronik.block_txs('1234f').err(400).msg, + '400: Not a hash or height: 1234f') + assert_equal(chronik.block_txs('00' * 31).err(400).msg, + f'400: Not a hash or height: {"00"*31}') + assert_equal( + chronik.block_txs('01').err(400).msg, + '400: Not a hash or height: 01') + assert_equal(chronik.block_txs('12345678901').err(400).msg, + '400: Not a hash or height: 12345678901') + + assert_equal( + chronik.block_txs('00' * 32, page=0, page_size=201).err(400).msg, + '400: Requested block tx page size 201 is too big, maximum is 200') + assert_equal( + chronik.block_txs('00' * 32, page=0, page_size=0).err(400).msg, + '400: Requested block tx page size 0 is too small, minimum is 1') + assert_equal( + chronik.block_txs('00' * 32, page=0, page_size=2**32).err(400).msg, + '400: Invalid param page_size: 4294967296, ' + + 'number too large to fit in target type') + assert_equal( + chronik.block_txs('00' * 32, page=2**32, page_size=1).err(400).msg, + '400: Invalid param page: 4294967296, ' + + 'number too large to fit in target type') + + assert_equal( + chronik.block_txs(GENESIS_BLOCK_HASH, page=2**32 - 1, page_size=200).ok(), + pb.TxHistoryPage(txs=[], num_pages=1, num_txs=1), + ) + + assert_equal( + chronik.block_txs(GENESIS_BLOCK_HASH).ok(), + pb.TxHistoryPage( + txs=[genesis_cb_tx()], + num_pages=1, + num_txs=1, + ), + ) + + coinblockhash = self.generatetoaddress(node, 1, ADDRESS_ECREG_P2SH_OP_TRUE)[0] + coinblock = node.getblock(coinblockhash) + cointx = coinblock['tx'][0] + + tip = self.generatetoaddress(node, 100, ADDRESS_ECREG_UNSPENDABLE)[-1] + + coinvalue = 5000000000 + tx1 = CTransaction() + tx1.vin = [CTxIn(outpoint=COutPoint(int(cointx, 16), 0), + scriptSig=SCRIPTSIG_OP_TRUE)] + tx1.vout = [ + CTxOut(coinvalue - 10000, P2SH_OP_TRUE), + CTxOut(1000, CScript([OP_RETURN, b'test'])), + ] + tx1.rehash() + + tx2 = CTransaction() + tx2.vin = [CTxIn(outpoint=COutPoint(int(tx1.hash, 16), 0), + scriptSig=SCRIPTSIG_OP_TRUE)] + tx2.vout = [ + CTxOut(3000, CScript([OP_RETURN, b'test'])), + CTxOut(coinvalue - 20000, P2SH_OP_TRUE), + ] + tx2.rehash() + + tx_coinbase = create_coinbase(102, b'\x03' * 33) + + block = create_block(int(tip, 16), + tx_coinbase, + 1300000500) + block.vtx += [tx1, tx2] + make_conform_to_ctor(block) + block.hashMerkleRoot = block.calc_merkle_root() + block.solve() + peer.send_blocks_and_test([block], node) + + 
block_metadata = pb.BlockMetadata( + height=102, + hash=bytes.fromhex(block.hash)[::-1], + timestamp=1300000500, + ) + + proto_coinbase_tx = pb.Tx( + txid=bytes.fromhex(tx_coinbase.hash)[::-1], + version=1, + inputs=[ + pb.TxInput( + prev_out=pb.OutPoint(txid=bytes(32), out_idx=0xffffffff), + input_script=bytes(tx_coinbase.vin[0].scriptSig), + sequence_no=0xffffffff, + ), + ], + outputs=[ + pb.TxOutput( + value=coinvalue, + output_script=bytes(tx_coinbase.vout[0].scriptPubKey), + ), + pb.TxOutput( + output_script=bytes(CScript([OP_RETURN])), + ), + ], + lock_time=0, + block=block_metadata, + size=len(tx_coinbase.serialize()), + is_coinbase=True, + ) + + proto_tx1 = pb.Tx( + txid=bytes.fromhex(tx1.hash)[::-1], + version=1, + inputs=[ + pb.TxInput( + prev_out=pb.OutPoint(txid=bytes.fromhex(cointx)[::-1], out_idx=0), + input_script=bytes(SCRIPTSIG_OP_TRUE), + output_script=bytes(P2SH_OP_TRUE), + value=coinvalue, + sequence_no=0, + ), + ], + outputs=[ + pb.TxOutput( + value=coinvalue - 10000, + output_script=bytes(P2SH_OP_TRUE), + spent_by=pb.SpentBy( + txid=bytes.fromhex(tx2.hash)[::-1], + input_idx=0, + ), + ), + pb.TxOutput( + value=1000, + output_script=bytes(CScript([OP_RETURN, b'test'])), + ), + ], + lock_time=0, + size=len(tx1.serialize()), + block=block_metadata, + ) + + proto_tx2 = pb.Tx( + txid=bytes.fromhex(tx2.hash)[::-1], + version=1, + inputs=[ + pb.TxInput( + prev_out=pb.OutPoint(txid=bytes.fromhex(tx1.hash)[::-1], out_idx=0), + input_script=bytes(SCRIPTSIG_OP_TRUE), + output_script=bytes(P2SH_OP_TRUE), + value=coinvalue - 10000, + sequence_no=0, + ), + ], + outputs=[ + pb.TxOutput( + value=3000, + output_script=bytes(CScript([OP_RETURN, b'test'])), + ), + pb.TxOutput( + value=coinvalue - 20000, + output_script=bytes(P2SH_OP_TRUE), + ), + ], + lock_time=0, + size=len(tx2.serialize()), + block=block_metadata, + ) + + sorted_tx1, sorted_tx2 = sorted( + [proto_tx1, proto_tx2], key=lambda tx: tx.txid[::-1]) + + for page, tx in enumerate([proto_coinbase_tx, sorted_tx1, sorted_tx2]): + assert_equal( + chronik.block_txs(block.hash, page=page, page_size=1).ok(), + pb.TxHistoryPage( + txs=[tx], + num_pages=3, + num_txs=3, + ), + ) + + assert_equal( + chronik.block_txs(block.hash).ok(), + pb.TxHistoryPage( + txs=[proto_coinbase_tx, sorted_tx1, sorted_tx2], + num_pages=1, + num_txs=3, + ), + ) + + assert_equal( + chronik.block_txs(block.hash, page=0, page_size=2).ok(), + pb.TxHistoryPage( + txs=[proto_coinbase_tx, sorted_tx1], + num_pages=2, + num_txs=3, + ), + ) + assert_equal( + chronik.block_txs(block.hash, page=1, page_size=2).ok(), + pb.TxHistoryPage( + txs=[sorted_tx2], + num_pages=2, + num_txs=3, + ), + ) + + node.invalidateblock(block.hash) + chronik.block_txs(block.hash).err(404) + + +if __name__ == '__main__': + ChronikBlockTxsTest().main() diff --git a/test/functional/test_framework/chronik/client.py b/test/functional/test_framework/chronik/client.py index 34b018c45..3bb01ff81 100644 --- a/test/functional/test_framework/chronik/client.py +++ b/test/functional/test_framework/chronik/client.py @@ -1,163 +1,170 @@ #!/usr/bin/env python3 # Copyright (c) 2023 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. 
import http.client from typing import Union import chronik_pb2 as pb import websocket # Timespan when HTTP requests to Chronik time out DEFAULT_TIMEOUT = 30 class UnexpectedContentType(Exception): pass class ChronikResponse: def __init__(self, status: int, *, ok_proto=None, error_proto=None) -> None: self.status = status self.ok_proto = ok_proto self.error_proto = error_proto def ok(self): if self.status != 200: raise AssertionError( f'Expected OK response, but got status {self.status}, error: ' f'{self.error_proto}') return self.ok_proto def err(self, status: int): if self.status == 200: raise AssertionError( f'Expected error response status {status}, but got OK: {self.ok_proto}') if self.status != status: raise AssertionError( f'Expected error response status {status}, but got different error ' f'status {self.status}, error: {self.error_proto}') return self.error_proto class ChronikScriptClient: def __init__(self, client: 'ChronikClient', script_type: str, script_payload: str) -> None: self.client = client self.script_type = script_type self.script_payload = script_payload - def _query_params(self, page=None, page_size=None) -> str: - if page is not None and page_size is not None: - return f'?page={page}&page_size={page_size}' - elif page is not None: - return f'?page={page}' - elif page_size is not None: - return f'?page_size={page_size}' - else: - return '' - def confirmed_txs(self, page=None, page_size=None): - query = self._query_params(page, page_size) + query = _page_query_params(page, page_size) return self.client._request_get( f'/script/{self.script_type}/{self.script_payload}/confirmed-txs{query}', pb.TxHistoryPage) def history(self, page=None, page_size=None): - query = self._query_params(page, page_size) + query = _page_query_params(page, page_size) return self.client._request_get( f'/script/{self.script_type}/{self.script_payload}/history{query}', pb.TxHistoryPage) def unconfirmed_txs(self): return self.client._request_get( f'/script/{self.script_type}/{self.script_payload}/unconfirmed-txs', pb.TxHistoryPage) def utxos(self): return self.client._request_get( f'/script/{self.script_type}/{self.script_payload}/utxos', pb.ScriptUtxos) class ChronikWs: def __init__(self, ws) -> None: self.ws = ws def recv(self): data = self.ws.recv() ws_msg = pb.WsMsg() ws_msg.ParseFromString(data) return ws_msg def send_bytes(self, data: bytes) -> None: self.ws.send(data, websocket.ABNF.OPCODE_BINARY) def sub_to_blocks(self, *, is_unsub=False) -> None: sub = pb.WsSub(is_unsub=is_unsub, blocks=pb.WsSubBlocks()) self.send_bytes(sub.SerializeToString()) def sub_script(self, script_type: str, payload: bytes, *, is_unsub=False) -> None: sub = pb.WsSub( is_unsub=is_unsub, script=pb.WsSubScript(script_type=script_type, payload=payload), ) self.send_bytes(sub.SerializeToString()) class ChronikClient: CONTENT_TYPE = 'application/x-protobuf' def __init__(self, host: str, port: int, timeout=DEFAULT_TIMEOUT) -> None: self.host = host self.port = port self.timeout = timeout def _request_get(self, path: str, pb_type): kwargs = {} if self.timeout is not None: kwargs['timeout'] = self.timeout client = http.client.HTTPConnection(self.host, self.port, **kwargs) client.request('GET', path) response = client.getresponse() content_type = response.getheader('Content-Type') body = response.read() if content_type != self.CONTENT_TYPE: raise UnexpectedContentType( f'Unexpected Content-Type "{content_type}" (expected ' f'"{self.CONTENT_TYPE}"), body: {repr(body)}' ) if response.status != 200: proto_error = pb.Error() 
proto_error.ParseFromString(body) return ChronikResponse(response.status, error_proto=proto_error) ok_proto = pb_type() ok_proto.ParseFromString(body) return ChronikResponse(response.status, ok_proto=ok_proto) def blockchain_info(self) -> ChronikResponse: return self._request_get('/blockchain-info', pb.BlockchainInfo) def block(self, hash_or_height: Union[str, int]) -> ChronikResponse: return self._request_get(f'/block/{hash_or_height}', pb.Block) + def block_txs(self, hash_or_height: Union[str, int], + page=None, page_size=None) -> ChronikResponse: + query = _page_query_params(page, page_size) + return self._request_get( + f'/block-txs/{hash_or_height}{query}', pb.TxHistoryPage) + def blocks(self, start_height: int, end_height: int) -> ChronikResponse: return self._request_get(f'/blocks/{start_height}/{end_height}', pb.Blocks) def tx(self, txid: str) -> ChronikResponse: return self._request_get(f'/tx/{txid}', pb.Tx) def raw_tx(self, txid: str) -> bytes: return self._request_get(f'/raw-tx/{txid}', pb.RawTx) def script(self, script_type: str, script_payload: str) -> ChronikScriptClient: return ChronikScriptClient(self, script_type, script_payload) def ws(self, *, timeout=None) -> ChronikWs: ws = websocket.WebSocket() ws.connect(f'ws://{self.host}:{self.port}/ws', timeout=timeout) return ChronikWs(ws) + + +def _page_query_params(page=None, page_size=None) -> str: + if page is not None and page_size is not None: + return f'?page={page}&page_size={page_size}' + elif page is not None: + return f'?page={page}' + elif page_size is not None: + return f'?page_size={page_size}' + else: + return ''