diff --git a/Cargo.lock b/Cargo.lock --- a/Cargo.lock +++ b/Cargo.lock @@ -90,6 +90,7 @@ dependencies = [ "async-trait", "axum-core", + "base64", "bitflags", "bytes", "futures-util", @@ -107,8 +108,10 @@ "serde_json", "serde_path_to_error", "serde_urlencoded", + "sha-1", "sync_wrapper", "tokio", + "tokio-tungstenite", "tower", "tower-http", "tower-layer", @@ -162,6 +165,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8fe8f5a8a398345e52358e18ff07cc17a568fbca5c6f73873d3a62056309603" +[[package]] +name = "base64" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" + [[package]] name = "bindgen" version = "0.64.0" @@ -211,6 +220,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +[[package]] +name = "block-buffer" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" +dependencies = [ + "generic-array", +] + [[package]] name = "byteorder" version = "1.4.3" @@ -323,6 +341,7 @@ "prost-build", "tempdir", "thiserror", + "tokio", ] [[package]] @@ -399,6 +418,15 @@ "volatile-register", ] +[[package]] +name = "cpufeatures" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "280a9f2d8b3a38871a3c8a46fb80db65e5e5ed97da80c4d08bf27fb63e35e181" +dependencies = [ + "libc", +] + [[package]] name = "critical-section" version = "0.2.7" @@ -411,6 +439,16 @@ "riscv", ] +[[package]] +name = "crypto-common" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" +dependencies = [ + "generic-array", + "typenum", +] + [[package]] name = "ctor" version = "0.1.22" @@ -471,6 +509,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56254986775e3233ffa9c4d7d3faaf6d36a2c09d30b20687e9f88bc8bafc16c8" +[[package]] +name = "digest" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8168378f4e5023e7218c89c891c0fd8ecdb5e5e4f18cb78f38cf245dd021e76f" +dependencies = [ + "block-buffer", + "crypto-common", +] + [[package]] name = "either" version = "1.8.1" @@ -643,6 +691,27 @@ "slab", ] +[[package]] +name = "generic-array" +version = "0.14.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" +dependencies = [ + "typenum", + "version_check", +] + +[[package]] +name = "getrandom" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c85e1d9ab2eadba7e5040d4e09cbd6d072b76a557ad64e797c2cb9d4da21d7e4" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + [[package]] name = "gimli" version = "0.26.2" @@ -774,6 +843,16 @@ "want", ] +[[package]] +name = "idna" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e14ddfc70884202db2244c223200c204c2bda1bc6e0998d11b5e024d657209e6" +dependencies = [ + "unicode-bidi", + "unicode-normalization", +] + [[package]] name = "indenter" version = "0.3.3" @@ -1100,6 +1179,12 @@ "serde", ] +[[package]] +name = "ppv-lite86" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" + [[package]] name = "pretty_assertions" version = "1.2.1" @@ -1207,6 +1292,27 @@ "winapi", ] +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core 0.6.4", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core 0.6.4", +] + [[package]] name = "rand_core" version = "0.3.1" @@ -1222,6 +1328,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc" +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom", +] + [[package]] name = "rdrand" version = "0.4.0" @@ -1443,6 +1558,17 @@ "serde", ] +[[package]] +name = "sha-1" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f5058ada175748e33390e40e872bd0fe59a19f265d0158daa551c5a88a76009c" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "shlex" version = "1.1.0" @@ -1517,7 +1643,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "15f2b5fb00ccdf689e0149d1b1b3c03fead81c2b37735d812fa8bddbbf41b6d8" dependencies = [ - "rand", + "rand 0.4.6", "remove_dir_all", ] @@ -1563,6 +1689,21 @@ "syn", ] +[[package]] +name = "tinyvec" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87cc5ceb3875bb20c2890005a4e226a4651264a5c75edb2421b52861a0a0cb50" +dependencies = [ + "tinyvec_macros", +] + +[[package]] +name = "tinyvec_macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" + [[package]] name = "tokio" version = "1.25.0" @@ -1570,7 +1711,9 @@ checksum = "c8e00990ebabbe4c14c08aca901caed183ecd5c09562a12c824bb53d3c3fd3af" dependencies = [ "autocfg", + "bytes", "libc", + "memchr", "mio", "num_cpus", "pin-project-lite", @@ -1590,6 +1733,18 @@ "syn", ] +[[package]] +name = "tokio-tungstenite" +version = "0.17.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f714dd15bead90401d77e04243611caec13726c2408afd5b31901dfcdcb3b181" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + [[package]] name = "tower" version = "0.4.13" @@ -1664,18 +1819,75 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" +[[package]] +name = "tungstenite" +version = "0.17.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e27992fd6a8c29ee7eef28fc78349aa244134e10ad447ce3b9f0ac0ed0fa4ce0" +dependencies = [ + "base64", + "byteorder", + "bytes", + "http", + "httparse", + "log", + "rand 0.8.5", + "sha-1", + "thiserror", + "url", + "utf-8", +] + +[[package]] +name = "typenum" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba" + +[[package]] +name = "unicode-bidi" +version = "0.3.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92888ba5573ff080736b3648696b70cafad7d250551175acbaa4e0385b3e1460" + [[package]] name = "unicode-ident" version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "15c61ba63f9235225a22310255a29b806b907c9b8c964bcbd0a2c70f3f2deea7" +[[package]] +name = "unicode-normalization" +version = "0.1.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c5713f0fc4b5db668a2ac63cdb7bb4469d8c9fed047b1d0292cc7b0ce2ba921" +dependencies = [ + "tinyvec", +] + [[package]] name = "unicode-width" version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3ed742d4ea2bd1176e236172c8429aaf54486e7ac098db29ffe6529e0ce50973" +[[package]] +name = "url" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d68c799ae75762b8c3fe375feb6600ef5602c883c5d21eb51c09f22b83c4643" +dependencies = [ + "form_urlencoded", + "idna", + "percent-encoding", +] + +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "vcell" version = "0.1.3" @@ -1688,6 +1900,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" +[[package]] +name = "version_check" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" + [[package]] name = "void" version = "1.0.2" diff --git a/chronik/chronik-cpp/chronik_validationinterface.cpp b/chronik/chronik-cpp/chronik_validationinterface.cpp --- a/chronik/chronik-cpp/chronik_validationinterface.cpp +++ b/chronik/chronik-cpp/chronik_validationinterface.cpp @@ -56,7 +56,9 @@ } void BlockFinalized(const CBlockIndex *pindex) override { - m_chronik->handle_block_finalized(pindex->nHeight); + m_chronik->handle_block_finalized( + pindex->nHeight, + chronik::util::HashToArray(pindex->GetBlockHash())); } }; diff --git a/chronik/chronik-http/Cargo.toml b/chronik/chronik-http/Cargo.toml --- a/chronik/chronik-http/Cargo.toml +++ b/chronik/chronik-http/Cargo.toml @@ -22,7 +22,7 @@ async-trait = "0.1" # HTTP webapps -axum = "0.6" +axum = { version = "0.6", features = ["ws"] } # Async toolkit futures = "0.3" diff --git a/chronik/chronik-http/src/error.rs b/chronik/chronik-http/src/error.rs --- a/chronik/chronik-http/src/error.rs +++ b/chronik/chronik-http/src/error.rs @@ -28,35 +28,42 @@ } } +pub(crate) fn report_status_error( + report: Report, +) -> (StatusCode, proto::Error) { + let msg = report.to_string(); + let (status_code, response_msg) = match parse_error_status(&msg) { + None => { + // Unknown internal error: don't expose potential + // vulnerabilities through HTTP, only log to node. + log_chronik!("{report:?}\n"); + log!("Chronik HTTP server got an unknown error: {report:#}\n"); + let unknown_msg = "Unknown error, contact admins".to_string(); + (StatusCode::INTERNAL_SERVER_ERROR, unknown_msg) + } + Some(status) if status.is_server_error() => { + // Internal server error, but explicitly marked with "5xx: ", so + // we expose it through HTTP (and also log to node). + log_chronik!("{report:?}\n"); + log!( + "Chronik HTTP server got an internal server error: \ + {report:#}\n" + ); + (status, msg) + } + Some(status) => { + // "Normal" error (400, 404, etc.), expose, but don't log. + (status, msg) + } + }; + let proto_response = proto::Error { msg: response_msg }; + (status_code, proto_response) +} + impl IntoResponse for ReportError { fn into_response(self) -> Response { let ReportError(report) = self; - let msg = report.to_string(); - let (code, response_msg) = match parse_error_status(&msg) { - None => { - // Unknown internal error: don't expose potential - // vulnerabilities through HTTP, only log to node. - log_chronik!("{report:?}\n"); - log!("Chronik HTTP server got an unknown error: {report:#}\n"); - let unknown_msg = "Unknown error, contact admins".to_string(); - (StatusCode::INTERNAL_SERVER_ERROR, unknown_msg) - } - Some(status) if status.is_server_error() => { - // Internal server error, but explicitly marked with "5xx: ", so - // we expose it through HTTP (and also log to node). - log_chronik!("{report:?}\n"); - log!( - "Chronik HTTP server got an internal server error: \ - {report:#}\n" - ); - (status, msg) - } - Some(status) => { - // "Normal" error (400, 404, etc.), expose, but don't log. - (status, msg) - } - }; - let proto_response = proto::Error { msg: response_msg }; + let (code, proto_response) = report_status_error(report); (code, Protobuf(proto_response)).into_response() } } diff --git a/chronik/chronik-http/src/lib.rs b/chronik/chronik-http/src/lib.rs --- a/chronik/chronik-http/src/lib.rs +++ b/chronik/chronik-http/src/lib.rs @@ -10,4 +10,5 @@ pub mod protobuf; pub mod server; pub(crate) mod validation; + pub mod ws; } diff --git a/chronik/chronik-http/src/server.rs b/chronik/chronik-http/src/server.rs --- a/chronik/chronik-http/src/server.rs +++ b/chronik/chronik-http/src/server.rs @@ -9,7 +9,8 @@ use abc_rust_error::{Result, WrapErr}; use axum::{ - extract::{Path, Query}, + extract::{Path, Query, WebSocketUpgrade}, + response::IntoResponse, routing, Extension, Router, }; use bitcoinsuite_core::tx::TxId; @@ -19,9 +20,13 @@ use thiserror::Error; use tokio::sync::RwLock; -use crate::{error::ReportError, handlers, protobuf::Protobuf}; +use crate::{ + error::ReportError, handlers, protobuf::Protobuf, + ws::handle_subscribe_socket, +}; -type ChronikIndexerRef = Arc>; +/// Ref-counted indexer with read or write access +pub type ChronikIndexerRef = Arc>; /// Params defining what and where to serve for [`ChronikServer`]. #[derive(Clone, Debug)] @@ -120,6 +125,7 @@ "/script/:type/:payload/unconfirmed-txs", routing::get(handle_script_unconfirmed_txs), ) + .route("/ws", routing::get(handle_ws)) .fallback(handlers::handle_not_found) .layer(Extension(indexer)) } @@ -191,3 +197,10 @@ .await?, )) } + +async fn handle_ws( + ws: WebSocketUpgrade, + Extension(indexer): Extension, +) -> impl IntoResponse { + ws.on_upgrade(|ws| handle_subscribe_socket(ws, indexer)) +} diff --git a/chronik/chronik-http/src/ws.rs b/chronik/chronik-http/src/ws.rs new file mode 100644 --- /dev/null +++ b/chronik/chronik-http/src/ws.rs @@ -0,0 +1,109 @@ +// Copyright (c) 2023 The Bitcoin developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +//! Module for [`handle_subscribe_socket`]. + +use abc_rust_error::Result; +use axum::extract::ws::{self, WebSocket}; +use chronik_indexer::subs::{BlockMsg, BlockMsgType}; +use chronik_proto::proto; +use prost::Message; +use thiserror::Error; +use tokio::sync::broadcast; + +use crate::{error::report_status_error, server::ChronikIndexerRef}; + +/// Errors for [`ChronikServer`]. +#[derive(Debug, Eq, Error, PartialEq)] +pub enum ChronikWsError { + /// Unexpected [`ws::Message`] type. + #[error("Unexpected message type {0}")] + UnexpectedMessageType(&'static str), +} + +use self::ChronikWsError::*; + +enum WsAction { + Close, + Message(ws::Message), + Nothing, +} + +fn sub_client_msg_action( + client_msg: Option>, +) -> Result { + let client_msg = match client_msg { + Some(client_msg) => client_msg, + None => return Ok(WsAction::Close), + }; + match client_msg { + Ok(ws::Message::Binary(_)) => Ok(WsAction::Nothing), + Ok(ws::Message::Text(_)) => Err(UnexpectedMessageType("Text").into()), + Ok(ws::Message::Ping(ping)) => { + Ok(WsAction::Message(ws::Message::Pong(ping))) + } + Ok(ws::Message::Pong(_pong)) => Ok(WsAction::Nothing), + Ok(ws::Message::Close(_)) | Err(_) => Ok(WsAction::Close), + } +} + +fn sub_block_msg_action( + block_msg: Result, +) -> Result { + use proto::{ws_msg::MsgType, BlockMsgType::*}; + let Ok(block_msg) = block_msg else { return Ok(WsAction::Nothing) }; + let block_msg_type = match block_msg.msg_type { + BlockMsgType::Connected => Connected, + BlockMsgType::Disconnected => Disconnected, + BlockMsgType::Finalized => Finalized, + }; + let msg_type = Some(MsgType::Block(proto::MsgBlock { + msg_type: block_msg_type as _, + block_hash: block_msg.hash.to_vec(), + block_height: block_msg.height, + })); + let msg_proto = proto::WsMsg { msg_type }; + let msg = ws::Message::Binary(msg_proto.encode_to_vec()); + Ok(WsAction::Message(msg)) +} + +/// Future for a WS connection, which will run indefinitely until the WS will be +/// closed. +pub async fn handle_subscribe_socket( + mut socket: WebSocket, + indexer: ChronikIndexerRef, +) { + let mut revc_blocks = { + let indexer = indexer.read().await; + let subs = indexer.subs().read().await; + subs.sub_to_block_msgs() + }; + + loop { + let sub_action = tokio::select! { + client_msg = socket.recv() => sub_client_msg_action(client_msg), + block_msg = revc_blocks.recv() => sub_block_msg_action(block_msg), + }; + + let subscribe_action = match sub_action { + Ok(subscribe_action) => subscribe_action, + // Turn Err into Message and reply + Err(report) => { + let (_, error_proto) = report_status_error(report); + WsAction::Message(ws::Message::Binary( + error_proto.encode_to_vec(), + )) + } + }; + + match subscribe_action { + WsAction::Close => return, + WsAction::Message(msg) => match socket.send(msg).await { + Ok(()) => {} + Err(_) => return, + }, + WsAction::Nothing => {} + } + } +} diff --git a/chronik/chronik-indexer/Cargo.toml b/chronik/chronik-indexer/Cargo.toml --- a/chronik/chronik-indexer/Cargo.toml +++ b/chronik/chronik-indexer/Cargo.toml @@ -24,6 +24,11 @@ # Derive error enums thiserror = "1.0" +# Async runtime +[dependencies.tokio] +version = "1.25" +features = ["sync", "rt", "rt-multi-thread", "macros"] + [dev-dependencies] # Colorful diffs for assertions pretty_assertions = "1.0" diff --git a/chronik/chronik-indexer/src/indexer.rs b/chronik/chronik-indexer/src/indexer.rs --- a/chronik/chronik-indexer/src/indexer.rs +++ b/chronik/chronik-indexer/src/indexer.rs @@ -24,10 +24,12 @@ }; use chronik_util::{log, log_chronik}; use thiserror::Error; +use tokio::sync::RwLock; use crate::{ avalanche::Avalanche, query::{QueryBlocks, QueryGroupHistory, QueryTxs}, + subs::{BlockMsg, BlockMsgType, Subs}, }; const CURRENT_INDEXER_VERSION: SchemaVersion = 5; @@ -50,6 +52,7 @@ mempool: Mempool, script_group: ScriptGroup, avalanche: Avalanche, + subs: RwLock, } /// Block to be indexed by Chronik. @@ -147,6 +150,7 @@ mempool, script_group, avalanche: Avalanche::default(), + subs: RwLock::new(Subs::default()), }) } @@ -285,6 +289,12 @@ self.db.write_batch(batch)?; self.mempool .removed_mined_txs(block.block_txs.txs.iter().map(|tx| tx.txid)); + let subs = self.subs.blocking_read(); + subs.broadcast_block_msg(BlockMsg { + msg_type: BlockMsgType::Connected, + hash: block.db_block.hash, + height: block.db_block.height, + }); Ok(()) } @@ -305,6 +315,12 @@ script_history_writer.delete(&mut batch, &index_txs)?; self.avalanche.disconnect_block(block.db_block.height)?; self.db.write_batch(batch)?; + let subs = self.subs.blocking_read(); + subs.broadcast_block_msg(BlockMsg { + msg_type: BlockMsgType::Disconnected, + hash: block.db_block.hash, + height: block.db_block.height, + }); Ok(()) } @@ -312,8 +328,15 @@ pub fn handle_block_finalized( &mut self, block_height: BlockHeight, + block_hash: BlockHash, ) -> Result<()> { self.avalanche.finalize_block(block_height)?; + let subs = self.subs.blocking_read(); + subs.broadcast_block_msg(BlockMsg { + msg_type: BlockMsgType::Finalized, + hash: block_hash, + height: block_height, + }); Ok(()) } @@ -346,6 +369,11 @@ }) } + /// Subscribers, behind read/write lock + pub fn subs(&self) -> &RwLock { + &self.subs + } + /// Build the ChronikBlock from the CBlockIndex pub fn make_chronik_block( &self, diff --git a/chronik/chronik-indexer/src/lib.rs b/chronik/chronik-indexer/src/lib.rs --- a/chronik/chronik-indexer/src/lib.rs +++ b/chronik/chronik-indexer/src/lib.rs @@ -8,4 +8,5 @@ pub mod avalanche; pub mod indexer; pub mod query; + pub mod subs; } diff --git a/chronik/chronik-indexer/src/subs.rs b/chronik/chronik-indexer/src/subs.rs new file mode 100644 --- /dev/null +++ b/chronik/chronik-indexer/src/subs.rs @@ -0,0 +1,63 @@ +// Copyright (c) 2023 The Bitcoin developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +//! Module containing [`Subs`]. + +use bitcoinsuite_core::block::BlockHash; +use chronik_db::io::BlockHeight; +use chronik_util::log; +use tokio::sync::broadcast; + +/// Block update message. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct BlockMsg { + /// What happened with the block. + pub msg_type: BlockMsgType, + /// Hash of the block which we got an update for. + pub hash: BlockHash, + /// Height of the block which we got an update for. + pub height: BlockHeight, +} + +/// Type of message for the block. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum BlockMsgType { + /// Block connected to the blockchain + Connected, + /// Block disconnected from the blockchain + Disconnected, + /// Block has been finalized by Avalanche + Finalized, +} + +const BLOCK_CHANNEL_CAPACITY: usize = 16; + +/// Struct for managing subscriptions to e.g. block updates. +#[derive(Debug, Clone)] +pub struct Subs { + subs_block: broadcast::Sender, +} + +impl Subs { + /// Add a subscriber to block messages. + pub fn sub_to_block_msgs(&self) -> broadcast::Receiver { + self.subs_block.subscribe() + } + + pub(crate) fn broadcast_block_msg(&self, msg: BlockMsg) { + if self.subs_block.receiver_count() > 0 { + if let Err(err) = self.subs_block.send(msg) { + log!("Unexpected send error: {}\n", err); + } + } + } +} + +impl Default for Subs { + fn default() -> Self { + Subs { + subs_block: broadcast::channel(BLOCK_CHANNEL_CAPACITY).0, + } + } +} diff --git a/chronik/chronik-lib/src/bridge.rs b/chronik/chronik-lib/src/bridge.rs --- a/chronik/chronik-lib/src/bridge.rs +++ b/chronik/chronik-lib/src/bridge.rs @@ -11,6 +11,7 @@ use abc_rust_error::Result; use bitcoinsuite_core::{ + block::BlockHash, script::Script, tx::{Tx, TxId}, }; @@ -169,11 +170,18 @@ } /// Block finalized with Avalanche - pub fn handle_block_finalized(&self, block_height: i32) { + pub fn handle_block_finalized( + &self, + block_height: i32, + block_hash: [u8; 32], + ) { let mut indexer = self.indexer.blocking_write(); ok_or_abort_node( "handle_block_finalized", - indexer.handle_block_finalized(block_height), + indexer.handle_block_finalized( + block_height, + BlockHash::from(block_hash), + ), ); } diff --git a/chronik/chronik-lib/src/ffi.rs b/chronik/chronik-lib/src/ffi.rs --- a/chronik/chronik-lib/src/ffi.rs +++ b/chronik/chronik-lib/src/ffi.rs @@ -44,7 +44,11 @@ block: &CBlock, bindex: &CBlockIndex, ); - fn handle_block_finalized(&self, block_height: i32); + fn handle_block_finalized( + &self, + block_height: i32, + block_hash: [u8; 32], + ); } unsafe extern "C++" { diff --git a/chronik/chronik-proto/proto/chronik.proto b/chronik/chronik-proto/proto/chronik.proto --- a/chronik/chronik-proto/proto/chronik.proto +++ b/chronik/chronik-proto/proto/chronik.proto @@ -100,6 +100,37 @@ uint32 num_txs = 3; } +// Message coming from the WebSocket +message WsMsg { + // Kind of message + oneof msg_type { + // Error, e.g. when a bad message has been sent into the WebSocket + Error error = 1; + // Block got connected, disconnected, finalized, etc. + MsgBlock block = 2; + } +} + +// Block got connected, disconnected, finalized, etc. +message MsgBlock { + // What happened to the block + BlockMsgType msg_type = 1; + // Hash of the block (little-endian) + bytes block_hash = 2; + // Height of the block + int32 block_height = 3; +} + +// Type of message for the block +enum BlockMsgType { + // Block connected to the blockchain + CONNECTED = 0; + // Block disconnected from the blockchain + DISCONNECTED = 1; + // Block has been finalized by Avalanche + FINALIZED = 2; +} + // Error message returned from our APIs. message Error { // 2, as legacy chronik uses this for the message so we're still compatible. diff --git a/contrib/buildbot/requirements.txt b/contrib/buildbot/requirements.txt --- a/contrib/buildbot/requirements.txt +++ b/contrib/buildbot/requirements.txt @@ -6,3 +6,4 @@ requests responses slackclient +websocket-client diff --git a/contrib/utils/install-dependencies-bullseye.sh b/contrib/utils/install-dependencies-bullseye.sh --- a/contrib/utils/install-dependencies-bullseye.sh +++ b/contrib/utils/install-dependencies-bullseye.sh @@ -141,6 +141,8 @@ pip3 install "protobuf<=3.20" # For security-check.py and symbol-check.py pip3 install "lief>=0.11.4" +# For Chronik WebSocket endpoint +pip3 install websocket-client # Up-to-date mypy, isort and flynt packages are required python linters pip3 install isort==5.6.4 mypy==0.910 flynt==0.78 flake8==6.0.0 diff --git a/test/functional/chronik_ws.py b/test/functional/chronik_ws.py new file mode 100755 --- /dev/null +++ b/test/functional/chronik_ws.py @@ -0,0 +1,90 @@ +#!/usr/bin/env python3 +# Copyright (c) 2023 The Bitcoin developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. +"""Test whether Chronik sends WebSocket messages correctly.""" + +from test_framework.avatools import can_find_inv_in_poll, get_ava_p2p_interface +from test_framework.chronik.client import ChronikClient +from test_framework.test_framework import BitcoinTestFramework +from test_framework.util import assert_equal + +QUORUM_NODE_COUNT = 16 + + +class ChronikWsTest(BitcoinTestFramework): + def set_test_params(self): + self.setup_clean_chain = True + self.num_nodes = 1 + self.extra_args = [ + [ + '-avaproofstakeutxodustthreshold=1000000', + '-avaproofstakeutxoconfirmations=1', + '-avacooldown=0', + '-avaminquorumstake=0', + '-avaminavaproofsnodecount=0', + '-chronik', + '-whitelist=noban@127.0.0.1', + ], + ] + self.supports_cli = False + + def skip_test_if_missing_module(self): + self.skip_if_no_chronik() + + def run_test(self): + import chronik_pb2 as pb + + node = self.nodes[0] + chronik = ChronikClient('127.0.0.1', node.chronik_port) + + # Build a fake quorum of nodes. + def get_quorum(): + return [get_ava_p2p_interface(self, node) + for _ in range(0, QUORUM_NODE_COUNT)] + + def has_finalized_tip(tip_expected): + hash_tip_final = int(tip_expected, 16) + can_find_inv_in_poll(quorum, hash_tip_final) + return node.isfinalblock(tip_expected) + + # Pick one node from the quorum for polling. + quorum = get_quorum() + + assert node.getavalancheinfo()['ready_to_poll'] is True + + tip = node.getbestblockhash() + self.wait_until(lambda: has_finalized_tip(tip)) + + ws = chronik.ws(timeout=30) + + # Mine block + tip = self.generate(node, 1)[-1] + height = node.getblockcount() + + # We get a CONNECTED msg + assert_equal(ws.recv(), pb.WsMsg(block=pb.MsgBlock( + msg_type=pb.CONNECTED, + block_hash=bytes.fromhex(tip)[::-1], + block_height=height, + ))) + + # After we wait, we get a FINALIZED msg + self.wait_until(lambda: has_finalized_tip(tip)) + assert_equal(ws.recv(), pb.WsMsg(block=pb.MsgBlock( + msg_type=pb.FINALIZED, + block_hash=bytes.fromhex(tip)[::-1], + block_height=height, + ))) + + # When we invalidate, we get a DISCONNECTED msg + node.invalidateblock(tip) + assert_equal(ws.recv(), pb.WsMsg(block=pb.MsgBlock( + msg_type=pb.DISCONNECTED, + block_hash=bytes.fromhex(tip)[::-1], + block_height=height, + ))) + + +if __name__ == '__main__': + ChronikWsTest().main() diff --git a/test/functional/test_framework/chronik/client.py b/test/functional/test_framework/chronik/client.py --- a/test/functional/test_framework/chronik/client.py +++ b/test/functional/test_framework/chronik/client.py @@ -6,6 +6,8 @@ import http.client from typing import Union +import websocket + # Timespan when HTTP requests to Chronik time out DEFAULT_TIMEOUT = 30 @@ -78,6 +80,17 @@ _pb().TxHistoryPage) +class ChronikWs: + def __init__(self, ws) -> None: + self.ws = ws + + def recv(self): + data = self.ws.recv() + ws_msg = _pb().WsMsg() + ws_msg.ParseFromString(data) + return ws_msg + + class ChronikClient: CONTENT_TYPE = 'application/x-protobuf' @@ -119,3 +132,8 @@ def script(self, script_type: str, script_payload: str) -> ChronikScriptClient: return ChronikScriptClient(self, script_type, script_payload) + + def ws(self, *, timeout=None) -> ChronikWs: + ws = websocket.WebSocket() + ws.connect(f'ws://{self.host}:{self.port}/ws', timeout=timeout) + return ChronikWs(ws)