Changeset View
Changeset View
Standalone View
Standalone View
chronik/chronik-lib/src/bridge.rs
Show All 11 Lines | |||||
use abc_rust_error::Result; | use abc_rust_error::Result; | ||||
use bitcoinsuite_core::{ | use bitcoinsuite_core::{ | ||||
script::Script, | script::Script, | ||||
tx::{Tx, TxId}, | tx::{Tx, TxId}, | ||||
}; | }; | ||||
use chronik_bridge::{ffi::init_error, util::expect_unique_ptr}; | use chronik_bridge::{ffi::init_error, util::expect_unique_ptr}; | ||||
use chronik_db::mem::MempoolTx; | use chronik_db::mem::MempoolTx; | ||||
use chronik_http::server::{ChronikServer, ChronikServerParams}; | use chronik_http::server::{ChronikServer, ChronikServerParams}; | ||||
use chronik_indexer::indexer::{ChronikIndexer, ChronikIndexerParams}; | use chronik_indexer::indexer::{ChronikIndexer, ChronikIndexerParams, Node}; | ||||
use chronik_util::{log, log_chronik}; | use chronik_util::{log, log_chronik}; | ||||
use thiserror::Error; | use thiserror::Error; | ||||
use tokio::sync::RwLock; | use tokio::sync::RwLock; | ||||
use crate::{ | use crate::{ | ||||
error::ok_or_abort_node, | error::ok_or_abort_node, | ||||
ffi::{self, StartChronikValidationInterface}, | ffi::{self, StartChronikValidationInterface}, | ||||
}; | }; | ||||
Show All 21 Lines | match try_setup_chronik(params, config, node) { | ||||
init_error(&report.to_string()) | init_error(&report.to_string()) | ||||
} | } | ||||
} | } | ||||
} | } | ||||
fn try_setup_chronik( | fn try_setup_chronik( | ||||
params: ffi::SetupParams, | params: ffi::SetupParams, | ||||
config: &ffi::Config, | config: &ffi::Config, | ||||
node: &ffi::NodeContext, | node_context: &ffi::NodeContext, | ||||
) -> Result<()> { | ) -> Result<()> { | ||||
abc_rust_error::install(); | abc_rust_error::install(); | ||||
let hosts = params | let hosts = params | ||||
.hosts | .hosts | ||||
.into_iter() | .into_iter() | ||||
.map(|host| parse_socket_addr(host, params.default_port)) | .map(|host| parse_socket_addr(host, params.default_port)) | ||||
.collect::<Result<Vec<_>>>()?; | .collect::<Result<Vec<_>>>()?; | ||||
log!("Starting Chronik bound to {:?}\n", hosts); | log!("Starting Chronik bound to {:?}\n", hosts); | ||||
let bridge = chronik_bridge::ffi::make_bridge(config, node); | let bridge = chronik_bridge::ffi::make_bridge(config, node_context); | ||||
let bridge_ref = expect_unique_ptr("make_bridge", &bridge); | let bridge_ref = expect_unique_ptr("make_bridge", &bridge); | ||||
let mut indexer = ChronikIndexer::setup(ChronikIndexerParams { | let mut indexer = ChronikIndexer::setup(ChronikIndexerParams { | ||||
datadir_net: params.datadir_net.into(), | datadir_net: params.datadir_net.into(), | ||||
wipe_db: params.wipe_db, | wipe_db: params.wipe_db, | ||||
fn_compress_script: compress_script, | fn_compress_script: compress_script, | ||||
})?; | })?; | ||||
indexer.resync_indexer(bridge_ref)?; | indexer.resync_indexer(bridge_ref)?; | ||||
if chronik_bridge::ffi::shutdown_requested() { | if chronik_bridge::ffi::shutdown_requested() { | ||||
// Don't setup Chronik if the user requested shutdown during resync | // Don't setup Chronik if the user requested shutdown during resync | ||||
return Ok(()); | return Ok(()); | ||||
} | } | ||||
let indexer = Arc::new(RwLock::new(indexer)); | let indexer = Arc::new(RwLock::new(indexer)); | ||||
let node = Arc::new(Node { bridge }); | |||||
let runtime = tokio::runtime::Builder::new_multi_thread() | let runtime = tokio::runtime::Builder::new_multi_thread() | ||||
.enable_all() | .enable_all() | ||||
.build()?; | .build()?; | ||||
let server = runtime.block_on({ | let server = runtime.block_on({ | ||||
let indexer = Arc::clone(&indexer); | let indexer = Arc::clone(&indexer); | ||||
let node = Arc::clone(&node); | |||||
async move { | async move { | ||||
// try_bind requires a Runtime | // try_bind requires a Runtime | ||||
ChronikServer::setup(ChronikServerParams { hosts, indexer }) | ChronikServer::setup(ChronikServerParams { | ||||
hosts, | |||||
indexer, | |||||
node, | |||||
}) | |||||
} | } | ||||
})?; | })?; | ||||
runtime.spawn(async move { | runtime.spawn(async move { | ||||
ok_or_abort_node("ChronikServer::serve", server.serve().await); | ok_or_abort_node("ChronikServer::serve", server.serve().await); | ||||
}); | }); | ||||
let chronik = Box::new(Chronik { | let chronik = Box::new(Chronik { | ||||
bridge: Arc::new(bridge), | node: Arc::clone(&node), | ||||
indexer, | indexer, | ||||
_runtime: runtime, | _runtime: runtime, | ||||
}); | }); | ||||
StartChronikValidationInterface(node, chronik); | StartChronikValidationInterface(node_context, chronik); | ||||
Ok(()) | Ok(()) | ||||
} | } | ||||
fn parse_socket_addr(host: String, default_port: u16) -> Result<SocketAddr> { | fn parse_socket_addr(host: String, default_port: u16) -> Result<SocketAddr> { | ||||
if let Ok(addr) = host.parse::<SocketAddr>() { | if let Ok(addr) = host.parse::<SocketAddr>() { | ||||
return Ok(addr); | return Ok(addr); | ||||
} | } | ||||
let ip_addr = host | let ip_addr = host | ||||
.parse::<IpAddr>() | .parse::<IpAddr>() | ||||
.map_err(|err| InvalidChronikHost(host, err))?; | .map_err(|err| InvalidChronikHost(host, err))?; | ||||
Ok(SocketAddr::new(ip_addr, default_port)) | Ok(SocketAddr::new(ip_addr, default_port)) | ||||
} | } | ||||
fn compress_script(script: &Script) -> Vec<u8> { | fn compress_script(script: &Script) -> Vec<u8> { | ||||
chronik_bridge::ffi::compress_script(script.as_ref()) | chronik_bridge::ffi::compress_script(script.as_ref()) | ||||
} | } | ||||
/// Contains all db, runtime, tpc, etc. handles needed by Chronik. | /// Contains all db, runtime, tpc, etc. handles needed by Chronik. | ||||
/// This makes it so when this struct is dropped, all handles are relased | /// This makes it so when this struct is dropped, all handles are relased | ||||
/// cleanly. | /// cleanly. | ||||
pub struct Chronik { | pub struct Chronik { | ||||
bridge: Arc<cxx::UniquePtr<ffi::ChronikBridge>>, | node: Arc<Node>, | ||||
indexer: Arc<RwLock<ChronikIndexer>>, | indexer: Arc<RwLock<ChronikIndexer>>, | ||||
// Having this here ensures HTTP server, outstanding requests etc. will get | // Having this here ensures HTTP server, outstanding requests etc. will get | ||||
// stopped when `Chronik` is dropped. | // stopped when `Chronik` is dropped. | ||||
_runtime: tokio::runtime::Runtime, | _runtime: tokio::runtime::Runtime, | ||||
} | } | ||||
impl Chronik { | impl Chronik { | ||||
/// Tx added to the bitcoind mempool | /// Tx added to the bitcoind mempool | ||||
▲ Show 20 Lines • Show All 49 Lines • ▼ Show 20 Lines | impl Chronik { | ||||
} | } | ||||
fn add_tx_to_mempool( | fn add_tx_to_mempool( | ||||
&self, | &self, | ||||
ptx: &ffi::CTransaction, | ptx: &ffi::CTransaction, | ||||
time_first_seen: i64, | time_first_seen: i64, | ||||
) -> Result<()> { | ) -> Result<()> { | ||||
let mut indexer = self.indexer.blocking_write(); | let mut indexer = self.indexer.blocking_write(); | ||||
let tx = self.bridge.bridge_tx(ptx)?; | let tx = self.node.bridge.bridge_tx(ptx)?; | ||||
let txid = TxId::from(tx.txid); | let txid = TxId::from(tx.txid); | ||||
indexer.handle_tx_added_to_mempool(MempoolTx { | indexer.handle_tx_added_to_mempool(MempoolTx { | ||||
tx: Tx::from(tx), | tx: Tx::from(tx), | ||||
time_first_seen, | time_first_seen, | ||||
})?; | })?; | ||||
log_chronik!("Chronik: transaction {} added to mempool\n", txid); | log_chronik!("Chronik: transaction {} added to mempool\n", txid); | ||||
Ok(()) | Ok(()) | ||||
} | } | ||||
Show All 30 Lines | ) -> Result<()> { | ||||
"Chronik: block {} disconnected with {} txs\n", | "Chronik: block {} disconnected with {} txs\n", | ||||
block_hash, | block_hash, | ||||
num_txs, | num_txs, | ||||
); | ); | ||||
Ok(()) | Ok(()) | ||||
} | } | ||||
fn finalize_block(&self, bindex: &ffi::CBlockIndex) -> Result<()> { | fn finalize_block(&self, bindex: &ffi::CBlockIndex) -> Result<()> { | ||||
let block = self.bridge.load_block(bindex)?; | let block = self.node.bridge.load_block(bindex)?; | ||||
let block_ref = expect_unique_ptr("load_block", &block); | let block_ref = expect_unique_ptr("load_block", &block); | ||||
let mut indexer = self.indexer.blocking_write(); | let mut indexer = self.indexer.blocking_write(); | ||||
let block = indexer.make_chronik_block(block_ref, bindex)?; | let block = indexer.make_chronik_block(block_ref, bindex)?; | ||||
let block_hash = block.db_block.hash.clone(); | let block_hash = block.db_block.hash.clone(); | ||||
let num_txs = block.block_txs.txs.len(); | let num_txs = block.block_txs.txs.len(); | ||||
indexer.handle_block_finalized(block)?; | indexer.handle_block_finalized(block)?; | ||||
log_chronik!( | log_chronik!( | ||||
"Chronik: block {} finalized with {} txs\n", | "Chronik: block {} finalized with {} txs\n", | ||||
Show All 12 Lines |