diff --git a/Cargo.lock b/Cargo.lock --- a/Cargo.lock +++ b/Cargo.lock @@ -350,6 +350,7 @@ "chronik-db", "chronik-proto", "chronik-util", + "cxx", "pretty_assertions", "prost", "prost-build", diff --git a/chronik/chronik-bridge/src/ffi.rs b/chronik/chronik-bridge/src/ffi.rs --- a/chronik/chronik-bridge/src/ffi.rs +++ b/chronik/chronik-bridge/src/ffi.rs @@ -258,3 +258,19 @@ fn shutdown_requested() -> bool; } } + +/// SAFETY: All fields of ChronikBridge (const Consensus::Params &, const +/// node::NodeContext &) can be moved between threads safely. +#[allow(unsafe_code)] +unsafe impl Send for ChronikBridge {} + +/// SAFETY: All fields of ChronikBridge (const Consensus::Params &, const +/// node::NodeContext &) can be accessed from different threads safely. +#[allow(unsafe_code)] +unsafe impl Sync for ChronikBridge {} + +impl std::fmt::Debug for ChronikBridge { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ChronikBridge").finish_non_exhaustive() + } +} diff --git a/chronik/chronik-http/src/server.rs b/chronik/chronik-http/src/server.rs --- a/chronik/chronik-http/src/server.rs +++ b/chronik/chronik-http/src/server.rs @@ -14,7 +14,7 @@ routing, Extension, Router, }; use bitcoinsuite_core::tx::TxId; -use chronik_indexer::indexer::ChronikIndexer; +use chronik_indexer::indexer::{ChronikIndexer, Node}; use chronik_proto::proto; use hyper::server::conn::AddrIncoming; use thiserror::Error; @@ -27,6 +27,8 @@ /// Ref-counted indexer with read or write access pub type ChronikIndexerRef = Arc<RwLock<ChronikIndexer>>; +/// Ref-counted access to the bitcoind node +pub type NodeRef = Arc<Node>; /// Params defining what and where to serve for [`ChronikServer`]. 
#[derive(Clone, Debug)] @@ -35,6 +37,8 @@ pub hosts: Vec, /// Indexer to read data from pub indexer: ChronikIndexerRef, + /// Access to the bitcoind node + pub node: NodeRef, } /// Chronik HTTP server, holding all the data/handles required to serve an @@ -43,6 +47,7 @@ pub struct ChronikServer { server_builders: Vec>, indexer: ChronikIndexerRef, + node: NodeRef, } /// Errors for [`ChronikServer`]. @@ -86,12 +91,13 @@ Ok(ChronikServer { server_builders, indexer: params.indexer, + node: params.node, }) } /// Serve a Chronik HTTP endpoint with the given parameters. pub async fn serve(self) -> Result<()> { - let app = Self::make_router(self.indexer); + let app = Self::make_router(self.indexer, self.node); let servers = self .server_builders .into_iter() @@ -109,7 +115,7 @@ Ok(()) } - fn make_router(indexer: ChronikIndexerRef) -> Router { + fn make_router(indexer: ChronikIndexerRef, node: NodeRef) -> Router { Router::new() .route("/blockchain-info", routing::get(handle_blockchain_info)) .route("/block/:hash_or_height", routing::get(handle_block)) @@ -136,6 +142,7 @@ .route("/ws", routing::get(handle_ws)) .fallback(handlers::handle_not_found) .layer(Extension(indexer)) + .layer(Extension(node)) } } diff --git a/chronik/chronik-indexer/Cargo.toml b/chronik/chronik-indexer/Cargo.toml --- a/chronik/chronik-indexer/Cargo.toml +++ b/chronik/chronik-indexer/Cargo.toml @@ -18,6 +18,9 @@ chronik-proto = { path = "../chronik-proto" } chronik-util = { path = "../chronik-util" } +# Bridge to C++ +cxx = "1.0" + # Protobuf en-/decoding prost = "0.11" diff --git a/chronik/chronik-indexer/src/indexer.rs b/chronik/chronik-indexer/src/indexer.rs --- a/chronik/chronik-indexer/src/indexer.rs +++ b/chronik/chronik-indexer/src/indexer.rs @@ -59,6 +59,13 @@ subs: RwLock, } +/// Access to the bitcoind node. +#[derive(Debug)] +pub struct Node { + /// FFI bridge to the node. + pub bridge: cxx::UniquePtr, +} + /// Block to be indexed by Chronik. 
#[derive(Clone, Debug, Default, Eq, PartialEq)] pub struct ChronikBlock { diff --git a/chronik/chronik-lib/src/bridge.rs b/chronik/chronik-lib/src/bridge.rs --- a/chronik/chronik-lib/src/bridge.rs +++ b/chronik/chronik-lib/src/bridge.rs @@ -17,7 +17,7 @@ use chronik_bridge::{ffi::init_error, util::expect_unique_ptr}; use chronik_db::mem::MempoolTx; use chronik_http::server::{ChronikServer, ChronikServerParams}; -use chronik_indexer::indexer::{ChronikIndexer, ChronikIndexerParams}; +use chronik_indexer::indexer::{ChronikIndexer, ChronikIndexerParams, Node}; use chronik_util::{log, log_chronik}; use thiserror::Error; use tokio::sync::RwLock; @@ -55,7 +55,7 @@ fn try_setup_chronik( params: ffi::SetupParams, config: &ffi::Config, - node: &ffi::NodeContext, + node_context: &ffi::NodeContext, ) -> Result<()> { abc_rust_error::install(); let hosts = params @@ -64,7 +64,7 @@ .map(|host| parse_socket_addr(host, params.default_port)) .collect::>>()?; log!("Starting Chronik bound to {:?}\n", hosts); - let bridge = chronik_bridge::ffi::make_bridge(config, node); + let bridge = chronik_bridge::ffi::make_bridge(config, node_context); let bridge_ref = expect_unique_ptr("make_bridge", &bridge); let mut indexer = ChronikIndexer::setup(ChronikIndexerParams { datadir_net: params.datadir_net.into(), @@ -77,25 +77,31 @@ return Ok(()); } let indexer = Arc::new(RwLock::new(indexer)); + let node = Arc::new(Node { bridge }); let runtime = tokio::runtime::Builder::new_multi_thread() .enable_all() .build()?; let server = runtime.block_on({ let indexer = Arc::clone(&indexer); + let node = Arc::clone(&node); async move { // try_bind requires a Runtime - ChronikServer::setup(ChronikServerParams { hosts, indexer }) + ChronikServer::setup(ChronikServerParams { + hosts, + indexer, + node, + }) } })?; runtime.spawn(async move { ok_or_abort_node("ChronikServer::serve", server.serve().await); }); let chronik = Box::new(Chronik { - bridge: Arc::new(bridge), + node: Arc::clone(&node), indexer, 
_runtime: runtime, }); - StartChronikValidationInterface(node, chronik); + StartChronikValidationInterface(node_context, chronik); Ok(()) } @@ -117,7 +123,7 @@ /// This makes it so when this struct is dropped, all handles are relased /// cleanly. pub struct Chronik { - bridge: Arc>, + node: Arc, indexer: Arc>, // Having this here ensures HTTP server, outstanding requests etc. will get // stopped when `Chronik` is dropped. @@ -183,7 +189,7 @@ time_first_seen: i64, ) -> Result<()> { let mut indexer = self.indexer.blocking_write(); - let tx = self.bridge.bridge_tx(ptx)?; + let tx = self.node.bridge.bridge_tx(ptx)?; let txid = TxId::from(tx.txid); indexer.handle_tx_added_to_mempool(MempoolTx { tx: Tx::from(tx), @@ -230,7 +236,7 @@ } fn finalize_block(&self, bindex: &ffi::CBlockIndex) -> Result<()> { - let block = self.bridge.load_block(bindex)?; + let block = self.node.bridge.load_block(bindex)?; let block_ref = expect_unique_ptr("load_block", &block); let mut indexer = self.indexer.blocking_write(); let block = indexer.make_chronik_block(block_ref, bindex)?;