diff --git a/Cargo.lock b/Cargo.lock --- a/Cargo.lock +++ b/Cargo.lock @@ -10,7 +10,7 @@ "eyre", "http 1.2.0", "stable-eyre", - "thiserror 2.0.4", + "thiserror 2.0.12", ] [[package]] @@ -166,7 +166,7 @@ "log", "pin-project-lite", "tokio", - "tungstenite", + "tungstenite 0.24.0", ] [[package]] @@ -271,7 +271,7 @@ "sha1", "sync_wrapper 1.0.2", "tokio", - "tokio-tungstenite", + "tokio-tungstenite 0.24.0", "tower 0.5.1", "tower-layer", "tower-service", @@ -474,7 +474,9 @@ "bitcoinsuite-core 0.1.0", "bytes", "chronik-proto", + "futures-util", "hex", + "lazy_static", "pretty_assertions", "prost", "prost-build", @@ -484,6 +486,8 @@ "serde_json", "thiserror 1.0.69", "tokio", + "tokio-tungstenite 0.26.2", + "url", ] [[package]] @@ -513,7 +517,7 @@ "serde", "serde_json", "sha2", - "thiserror 2.0.4", + "thiserror 2.0.12", ] [[package]] @@ -569,7 +573,7 @@ "itertools 0.10.5", "pretty_assertions", "serde", - "thiserror 2.0.4", + "thiserror 2.0.12", ] [[package]] @@ -706,7 +710,7 @@ "seahash", "serde", "tempdir", - "thiserror 2.0.4", + "thiserror 2.0.12", "topo_sort", ] @@ -737,7 +741,7 @@ "prost", "rustls", "serde_json", - "thiserror 2.0.4", + "thiserror 2.0.12", "tokio", "tower-http 0.5.2", "tower-service", @@ -765,7 +769,7 @@ "prost", "prost-build", "tempdir", - "thiserror 2.0.4", + "thiserror 2.0.12", "tokio", ] @@ -809,7 +813,7 @@ "pyo3", "serde", "tempdir", - "thiserror 2.0.4", + "thiserror 2.0.12", "toml 0.8.19", "versions", ] @@ -845,7 +849,7 @@ "chronik-util", "cxx", "cxx-build", - "thiserror 2.0.4", + "thiserror 2.0.12", "tokio", ] @@ -1151,7 +1155,7 @@ "ecash-secp256k1", "ripemd", "sha2", - "thiserror 2.0.4", + "thiserror 2.0.12", "wasm-bindgen", ] @@ -1162,7 +1166,7 @@ "bincode 1.3.3", "bitcoin_hashes 0.14.0", "ecash-secp256k1-sys", - "getrandom", + "getrandom 0.2.15", "hex_lit", "rand 0.8.5", "rand_core 0.6.4", @@ -1279,7 +1283,7 @@ "regex", "serde", "serde_json", - "thiserror 2.0.4", + "thiserror 2.0.12", "tokio", "toml 0.5.11", "tower-http 0.3.5", @@ -1488,10 
+1492,22 @@ "cfg-if", "js-sys", "libc", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", "wasm-bindgen", ] +[[package]] +name = "getrandom" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73fea8450eea4bac3940448fb7ae50d91f034f941199fcd9d909a5a07aa455f0" +dependencies = [ + "cfg-if", + "libc", + "r-efi", + "wasi 0.14.2+wasi-0.2.4", +] + [[package]] name = "gimli" version = "0.28.1" @@ -2304,7 +2320,7 @@ checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd" dependencies = [ "libc", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", "windows-sys 0.52.0", ] @@ -2571,7 +2587,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77957b295656769bb8ad2b6a6b09d897d94f05c41b069aede1fcdaa675eaea04" dependencies = [ - "zerocopy", + "zerocopy 0.7.35", ] [[package]] @@ -2780,6 +2796,12 @@ "proc-macro2", ] +[[package]] +name = "r-efi" +version = "5.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5" + [[package]] name = "radium" version = "0.5.3" @@ -2806,10 +2828,21 @@ checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", - "rand_chacha", + "rand_chacha 0.3.1", "rand_core 0.6.4", ] +[[package]] +name = "rand" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94" +dependencies = [ + "rand_chacha 0.9.0", + "rand_core 0.9.3", + "zerocopy 0.8.24", +] + [[package]] name = "rand_chacha" version = "0.3.1" @@ -2820,6 +2853,16 @@ "rand_core 0.6.4", ] +[[package]] +name = "rand_chacha" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" +dependencies = [ + "ppv-lite86", + "rand_core 0.9.3", +] + [[package]] name = "rand_core" 
version = "0.3.1" @@ -2841,7 +2884,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" dependencies = [ - "getrandom", + "getrandom 0.2.15", +] + +[[package]] +name = "rand_core" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38" +dependencies = [ + "getrandom 0.3.2", ] [[package]] @@ -2868,7 +2920,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba009ff324d1fc1b900bd1fdb31564febe58a8ccc8a6fdbb93b543d33b13ca43" dependencies = [ - "getrandom", + "getrandom 0.2.15", "libredox", "thiserror 1.0.69", ] @@ -2959,7 +3011,7 @@ dependencies = [ "cc", "cfg-if", - "getrandom", + "getrandom 0.2.15", "libc", "spin", "untrusted", @@ -3489,11 +3541,11 @@ [[package]] name = "thiserror" -version = "2.0.4" +version = "2.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f49a1853cf82743e3b7950f77e0f4d622ca36cf4317cba00c767838bac8d490" +checksum = "567b8a2dae586314f7be2a752ec7474332959c6460e02bde30d702a66d488708" dependencies = [ - "thiserror-impl 2.0.4", + "thiserror-impl 2.0.12", ] [[package]] @@ -3509,9 +3561,9 @@ [[package]] name = "thiserror-impl" -version = "2.0.4" +version = "2.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8381894bb3efe0c4acac3ded651301ceee58a15d47c2e34885ed1908ad667061" +checksum = "7f7cf42b4507d8ea322120659672cf1b9dbb93f8f2d4ecfd6e51350ff5b17a1d" dependencies = [ "proc-macro2", "quote", @@ -3586,7 +3638,19 @@ "futures-util", "log", "tokio", - "tungstenite", + "tungstenite 0.24.0", +] + +[[package]] +name = "tokio-tungstenite" +version = "0.26.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a9daff607c6d2bf6c16fd681ccb7eecc83e4e2cdc1ca067ffaadfca5de7f084" +dependencies = [ + "futures-util", + "log", + "tokio", + 
"tungstenite 0.26.2", ] [[package]] @@ -3780,6 +3844,23 @@ "utf-8", ] +[[package]] +name = "tungstenite" +version = "0.26.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4793cb5e56680ecbb1d843515b23b6de9a75eb04b66643e256a396d43be33c13" +dependencies = [ + "bytes", + "data-encoding", + "http 1.2.0", + "httparse", + "log", + "rand 0.9.0", + "sha1", + "thiserror 2.0.12", + "utf-8", +] + [[package]] name = "typenum" version = "1.17.0" @@ -3900,6 +3981,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasi" +version = "0.14.2+wasi-0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9683f9a5a998d873c0d21fcbe3c083009670149a8fab228644b8bd36b2c48cb3" +dependencies = [ + "wit-bindgen-rt", +] + [[package]] name = "wasm-bindgen" version = "0.2.92" @@ -4220,6 +4310,15 @@ "windows-sys 0.48.0", ] +[[package]] +name = "wit-bindgen-rt" +version = "0.39.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f42320e61fe2cfd34354ecb597f86f413484a798ba44a8ca1165c58d42da6c1" +dependencies = [ + "bitflags 2.6.0", +] + [[package]] name = "write16" version = "1.0.0" @@ -4275,7 +4374,16 @@ checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" dependencies = [ "byteorder", - "zerocopy-derive", + "zerocopy-derive 0.7.35", +] + +[[package]] +name = "zerocopy" +version = "0.8.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2586fea28e186957ef732a5f8b3be2da217d65c5969d4b1e17f973ebbe876879" +dependencies = [ + "zerocopy-derive 0.8.24", ] [[package]] @@ -4289,6 +4397,17 @@ "syn 2.0.90", ] +[[package]] +name = "zerocopy-derive" +version = "0.8.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a996a8f63c5c4448cd959ac1bab0aaa3306ccfd060472f85943ee0750f0169be" +dependencies = [ + "proc-macro2", + 
"quote", + "syn 2.0.90", +] + [[package]] name = "zerofrom" version = "0.1.5" diff --git a/modules/bitcoinsuite-chronik-client/Cargo.toml b/modules/bitcoinsuite-chronik-client/Cargo.toml --- a/modules/bitcoinsuite-chronik-client/Cargo.toml +++ b/modules/bitcoinsuite-chronik-client/Cargo.toml @@ -9,36 +9,22 @@ [dependencies] abc-rust-error = { path = "../../chronik/abc-rust-error"} -bitcoinsuite-core = { path = "../../chronik/bitcoinsuite-core" } -chronik-proto = { path = "../../chronik/chronik-proto/"} - -# Async trait facility async-trait = "0.1.86" - -# Error structs/enums -thiserror = "1.0" - -# HTTP client -reqwest = "0.11" - -# Async runtime and scheduler -tokio = { version = "1.14", features = ["full"] } - -# Protobuf (de)serialization -prost = "0.11" - -# Hex en-/decoding +bitcoinsuite-core = { path = "../../chronik/bitcoinsuite-core"} +chronik-proto = { path = "../../chronik/chronik-proto/"} +bytes = { version = "1.4", features = ["serde"]} +futures-util = "0.3" hex = "0.4" - +lazy_static = "1.4" +prost = "0.11" regex = "1" - -bytes = { version = "1.4", features = ["serde"] } - -# JSON serialization/deserialization -serde_json = "1.0" - -# Serializes and deserializes Rust data +reqwest = { version = "0.11", features = ["json"]} serde = "1.0.217" +serde_json = "1.0" +thiserror = "1.0" +tokio = { version = "1.14", features = ["full"]} +tokio-tungstenite = "0.26.2" +url = "2.5" [build-dependencies] # Build Protobuf structs diff --git a/modules/bitcoinsuite-chronik-client/src/failover_proxy.rs b/modules/bitcoinsuite-chronik-client/src/failover_proxy.rs new file mode 100644 --- /dev/null +++ b/modules/bitcoinsuite-chronik-client/src/failover_proxy.rs @@ -0,0 +1,289 @@ +// Copyright (c) 2023-2024 The Bitcoin developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. 
+ +use abc_rust_error::Report; +use bytes::Bytes; +use reqwest::Url; +use reqwest::{Client, Method}; +use tokio::time::timeout; +use tokio::time::Duration; +use tokio_tungstenite::connect_async; +use url; + +use crate::WsEndpoint; + +const WEBSOCKET_TIMEOUT_MS: u64 = 5000; + +#[derive(Debug, Clone)] +pub struct Endpoint { + pub url: String, + pub ws_url: String, +} + +// Handles the networking to Chronik `Endpoint`s, including cycling +// through both types of endpoints. +pub struct FailoverProxy { + endpoint_array: Vec<Endpoint>, + working_index: usize, +} + +impl FailoverProxy { + pub fn new(urls: impl Into<Vec<String>>) -> Result<Self, Report> { + let urls_vec = urls.into(); + + if urls_vec.is_empty() { + return Err(Report::msg("Url array must not be empty")); + } + + // Validate each URL + for url in &urls_vec { + if url.ends_with('/') { + return Err(Report::msg(format!( + "`url` cannot end with '/', got: {}", + url + ))); + } + + if !url.starts_with("https://") && !url.starts_with("http://") { + return Err(Report::msg(format!( + "`url` must start with 'https://' or 'http://', got: {}", + url + ))); + } + } + + let endpoint_array = Self::append_ws_urls(urls_vec); + + // Create and return the FailoverProxy + Ok(Self { + endpoint_array, + working_index: 0, + }) + } + + pub fn get_endpoint_array(&self) -> &[Endpoint] { + &self.endpoint_array + } + + // Derives the endpoint array index based on working_index + pub fn derive_endpoint_index(&self, loop_index: usize) -> usize { + (self.working_index + loop_index) % self.endpoint_array.len() + } + + pub fn set_working_index(&mut self, new_index: usize) { + self.working_index = new_index; + } + + // Converts an array of chronik http/https urls into websocket equivalents + fn append_ws_urls(urls: Vec<String>) -> Vec<Endpoint> { + urls.into_iter() + .map(|url| { + if url.starts_with("https://") { + Endpoint { + url: url.clone(), + ws_url: format!( + "wss://{}/ws", + &url["https://".len()..] 
+ ), + } + } else if url.starts_with("http://") { + Endpoint { + url: url.clone(), + ws_url: format!("ws://{}/ws", &url["http://".len()..]), + } + } else { + panic!("Invalid url found in array: {}", url) + } + }) + .collect() + } + + pub async fn post( + &mut self, + path: &str, + data: Bytes, + ) -> Result<Bytes, Report> { + self.request(path, Method::POST, Some(data)).await + } + + pub async fn get(&mut self, path: &str) -> Result<Bytes, Report> { + self.request(path, Method::GET, None).await + } + + async fn request( + &mut self, + path: &str, + method: Method, + data: Option<Bytes>, + ) -> Result<Bytes, Report> { + for i in 0..self.endpoint_array.len() { + let index = self.derive_endpoint_index(i); + let this_proxy_url = &self.endpoint_array[index].url; + + let client = Client::new(); + let mut request_builder = client + .request(method.clone(), format!("{}{}", this_proxy_url, path)); + + if let Some(body_data) = &data { + request_builder = request_builder.body(body_data.clone()); + } + + match request_builder.send().await { + Ok(response) => { + if response.status().is_success() { + self.working_index = index; + + return match response.bytes().await { + Ok(bytes) => Ok(bytes), + Err(err) => Err(Report::msg(format!( + "Failed to read response bytes: {}", + err + ))), + }; + } else { + let status = response.status(); + let error_text = match response.text().await { + Ok(text) => text, + Err(_) => { + String::from("Unable to read error response") + } + }; + + if error_text.contains( + "Unable to decode error msg, chronik server is \ + indexing or in error state", + ) || error_text.trim().ends_with(":") + { + continue; + } + + return Err(Report::msg(format!( + "HTTP error {}: {}", + status, error_text + ))); + } + } + Err(err) => { + if err.is_connect() || err.is_timeout() { + continue; + } + + let err_string = err.to_string(); + if err_string.contains( + "Unable to decode error msg, chronik server is \ + indexing or in error state", + ) || 
err_string.trim().ends_with(":") + { + continue; + } + + return Err(Report::msg(format!("Request error: {}", err))); + } + } + } + + Err(Report::msg("Error connecting to known Chronik instances")) + } + + pub async fn websocket_url_connects( + &self, + ws_url: &str, + ) -> Result<bool, Report> { + let url = Url::parse(ws_url).map_err(|err| { + Report::msg(format!("Invalid WebSocket URL: {}", err)) + })?; + + let url_str = url.as_str(); + + let connection = timeout( + Duration::from_millis(WEBSOCKET_TIMEOUT_MS), + connect_async(url_str), + ) + .await; + + match connection { + Ok(Ok((_ws_stream, _))) => Ok(true), + Ok(Err(_)) | Err(_) => Ok(false), + } + } + + pub async fn connect_ws( + &mut self, + ws_endpoint: &mut WsEndpoint, + ) -> Result<(), Report> { + // Cycle through known Chronik instances + for i in 0..self.endpoint_array.len() { + let index = self.derive_endpoint_index(i); + let this_proxy_ws_url = &self.endpoint_array[index].ws_url; + + if let Ok(true) = + self.websocket_url_connects(this_proxy_ws_url).await + { + self.working_index = index; + + let (tx, rx) = tokio::sync::oneshot::channel(); + ws_endpoint.connected = Some(rx); + + let url = + url::Url::parse(this_proxy_ws_url).map_err(|err| { + Report::msg(format!("Invalid WebSocket URL: {}", err)) + })?; + + let (ws_stream, _) = + connect_async(url.as_str()).await.map_err(|err| { + Report::msg(format!( + "Failed to connect to WebSocket: {}", + err + )) + })?; + + ws_endpoint.ws = Some(ws_stream); + + // Collect subscriptions before using them + let scripts: Vec<_> = ws_endpoint + .subs + .scripts + .iter() + .map(|sub| (sub.script_type, sub.payload.clone())) + .collect(); + let lokad_ids: Vec<_> = ws_endpoint.subs.lokad_ids.clone(); + let tokens: Vec<_> = ws_endpoint.subs.tokens.clone(); + let blocks = ws_endpoint.subs.blocks; + + // Subscribe to all previously-subscribed scripts + for (script_type, payload) in scripts { + ws_endpoint + .subscribe_to_script(script_type, &payload) + .await?; + } + + // 
Subscribe to all previously-subscribed lokadIds + for lokad_id in lokad_ids { + ws_endpoint.subscribe_to_lokad_id(&lokad_id).await?; + } + + // Subscribe to all previously-subscribed tokenIds + for token_id in tokens { + ws_endpoint.subscribe_to_token_id(&token_id).await?; + } + + // Subscribe to blocks if previously subscribed + if blocks { + ws_endpoint.subscribe_to_blocks().await?; + } + + let _ = tx.send(()); + + if let Some(handler) = &ws_endpoint.on_connect { + handler(); + } + + return Ok(()); + } + } + + // If no WebSocket URLs connect, throw error + Err(Report::msg("Error connecting to known Chronik websockets")) + } +} diff --git a/modules/bitcoinsuite-chronik-client/src/handler.rs b/modules/bitcoinsuite-chronik-client/src/handler.rs --- a/modules/bitcoinsuite-chronik-client/src/handler.rs +++ b/modules/bitcoinsuite-chronik-client/src/handler.rs @@ -6,7 +6,6 @@ use std::sync::Arc; use abc_rust_error::Result; -use async_trait::async_trait; use serde_json::{from_str, Value}; use tokio::{ io::{AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf}, @@ -19,7 +18,7 @@ pub writer: WriteHalf<UnixStream>, } -#[async_trait] +#[async_trait::async_trait] pub trait IpcReader: Sync + Send { async fn on_rx( &self, @@ -75,7 +74,6 @@ } } - /// Send message (encoded using JSON) to the handler‘s UnixStream pub async fn send_message(&mut self, message: &str) -> io::Result<()> { let json_message = serde_json::to_string(&message)?; self.writer.write_all(json_message.as_bytes()).await?; diff --git a/modules/bitcoinsuite-chronik-client/src/hex.rs b/modules/bitcoinsuite-chronik-client/src/hex.rs new file mode 100644 --- /dev/null +++ b/modules/bitcoinsuite-chronik-client/src/hex.rs @@ -0,0 +1,107 @@ +// Copyright (c) 2023-2024 The Bitcoin developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. 
/// Lookup table mapping a 4-bit value to its lowercase hex digit.
const LUT_HEX_4B: [char; 16] = [
    '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e',
    'f',
];

/// Error produced when a string cannot be decoded or validated as hex.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HexError {
    msg: String,
}

impl std::fmt::Display for HexError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.msg)
    }
}

impl std::error::Error for HexError {}

/// Appends the two lowercase hex digits of `byte` to `out`.
fn push_hex_byte(out: &mut String, byte: u8) {
    out.push(LUT_HEX_4B[usize::from(byte >> 4)]);
    out.push(LUT_HEX_4B[usize::from(byte & 0x0f)]);
}

/// Returns the value of a single ASCII hex digit (either case), or `None`
/// for any other byte.
fn hex_nibble(digit: u8) -> Option<u8> {
    match digit {
        b'0'..=b'9' => Some(digit - b'0'),
        b'a'..=b'f' => Some(digit - b'a' + 10),
        b'A'..=b'F' => Some(digit - b'A' + 10),
        _ => None,
    }
}

/// Encodes `buffer` as a lowercase hex string.
pub fn to_hex(buffer: &[u8]) -> String {
    let mut out = String::with_capacity(buffer.len() * 2);
    for &byte in buffer {
        push_hex_byte(&mut out, byte);
    }
    out
}

/// Encodes `buffer` as a lowercase hex string with the byte order reversed.
pub fn to_hex_rev(buffer: &[u8]) -> String {
    let mut out = String::with_capacity(buffer.len() * 2);
    for &byte in buffer.iter().rev() {
        push_hex_byte(&mut out, byte);
    }
    out
}

/// Decodes a hex string (upper- or lowercase digits) into bytes.
///
/// # Errors
/// Returns a `HexError` if the input has an odd length or contains a pair
/// that is not two hex digits. Signs such as `+` and non-ASCII input are
/// rejected with an error rather than being accepted or causing a panic.
pub fn from_hex(str: &str) -> Result<Vec<u8>, HexError> {
    if str.len() % 2 != 0 {
        return Err(HexError {
            msg: format!("Odd hex length: {}", str),
        });
    }

    let mut array = Vec::with_capacity(str.len() / 2);

    // Work on raw bytes: a 2-byte chunk may cut a multi-byte character in
    // half, so it must not be assumed to be valid UTF-8.
    for (idx, chunk) in str.as_bytes().chunks_exact(2).enumerate() {
        match (hex_nibble(chunk[0]), hex_nibble(chunk[1])) {
            (Some(high), Some(low)) => array.push((high << 4) | low),
            _ => {
                return Err(HexError {
                    msg: format!(
                        "Invalid hex pair: {}, at index {}",
                        String::from_utf8_lossy(chunk),
                        idx * 2
                    ),
                });
            }
        }
    }

    Ok(array)
}

/// Decodes a hex string into bytes and reverses the byte order.
///
/// # Errors
/// Same as [`from_hex`].
pub fn from_hex_rev(str: &str) -> Result<Vec<u8>, HexError> {
    let mut array = from_hex(str)?;
    array.reverse();
    Ok(array)
}

/// Checks that `str` is an even-length string of lowercase hex digits.
/// The empty string is accepted.
///
/// # Errors
/// Returns a `HexError` if the input contains an uppercase character, has
/// an odd length, or contains a non-hex character (checked in that order).
pub fn validate_hex(str: &str) -> Result<(), HexError> {
    let invalid = || HexError {
        msg: format!(
            "Invalid hex: \"{}\". Payload must be lowercase hex string.",
            str
        ),
    };

    if str.chars().any(|c| c.is_uppercase()) {
        return Err(invalid());
    }

    if str.len() % 2 != 0 {
        return Err(HexError {
            msg: format!("Odd hex length: {}", str),
        });
    }

    if !str.chars().all(|c| c.is_ascii_hexdigit()) {
        return Err(invalid());
    }

    Ok(())
}
+ pub plugins: Vec<WsSubPlugin>, + pub blocks: bool, +} + +#[derive(Debug, Clone)] +pub struct WsSubScript { + pub script_type: ScriptType, + pub payload: String, +} + +#[derive(Debug, Clone)] +pub struct WsSubPlugin { + pub plugin_name: String, + pub group: String, +} + impl ChronikClient { pub fn new(url: String) -> Result<Self> { if url.ends_with('/') { @@ -234,10 +279,7 @@ } } - async fn _post< - MRequest: prost::Message, - MResponse: prost::Message + Default, - >( + async fn _post<MRequest: Message, MResponse: Message + Default>( &self, url_suffix: &str, request: &MRequest, @@ -253,7 +295,7 @@ Self::_handle_response(response).await } - async fn _get<MResponse: prost::Message + Default>( + async fn _get<MResponse: Message + Default>( &self, url_suffix: &str, ) -> Result<MResponse> { @@ -267,7 +309,7 @@ Self::_handle_response(response).await } - async fn _handle_response<MResponse: prost::Message + Default>( + async fn _handle_response<MResponse: Message + Default>( response: reqwest::Response, ) -> Result<MResponse> { use prost::Message as _; @@ -275,7 +317,7 @@ if status_code != StatusCode::OK { let data = response.bytes().await?; let error = proto::Error::decode(data.as_ref()) - .wrap_err_with(|| InvalidProtobuf(hex::encode(&data)))?; + .wrap_err_with(|| InvalidProtobuf(crate::hex::to_hex(&data)))?; return Err(ChronikError { status_code, error_msg: error.msg.clone(), @@ -285,7 +327,7 @@ } let bytes = response.bytes().await.wrap_err(HttpRequestError)?; let response = MResponse::decode(bytes.as_ref()) - .wrap_err_with(|| InvalidProtobuf(hex::encode(&bytes)))?; + .wrap_err_with(|| InvalidProtobuf(crate::hex::to_hex(&bytes)))?; Ok(response) } } @@ -300,7 +342,7 @@ ._get(&format!( "/script/{}/{}/history?page={}&page_size={}", self.script_type, - hex::encode(self.script_payload), + crate::hex::to_hex(self.script_payload), page, page_size, )) @@ -312,7 +354,7 @@ ._get(&format!( "/script/{}/{}/history?page={}", self.script_type, - hex::encode(self.script_payload), + 
crate::hex::to_hex(self.script_payload), page, )) .await @@ -326,7 +368,7 @@ ._get(&format!( "/script/{}/{}/confirmed-txs?page={}", self.script_type, - hex::encode(self.script_payload), + crate::hex::to_hex(self.script_payload), page, )) .await @@ -340,7 +382,7 @@ ._get(&format!( "/script/{}/{}/unconfirmed-txs?page={}", self.script_type, - hex::encode(self.script_payload), + crate::hex::to_hex(self.script_payload), page, )) .await @@ -352,7 +394,7 @@ ._get::<proto::ScriptUtxos>(&format!( "/script/{}/{}/utxos", self.script_type, - hex::encode(self.script_payload), + crate::hex::to_hex(self.script_payload), )) .await?; Ok(utxos.utxos) @@ -369,7 +411,7 @@ ._get(&format!( "/plugin/{}/{}/history?page={}&page_size={}", self.plugin_name, - hex::encode(self.payload), + crate::hex::to_hex(self.payload), page, page_size, )) @@ -381,7 +423,7 @@ ._get(&format!( "/plugin/{}/{}/history?page={}", self.plugin_name, - hex::encode(self.payload), + crate::hex::to_hex(self.payload), page, )) .await @@ -395,7 +437,7 @@ ._get(&format!( "/plugin/{}/{}/confirmed-txs?page={}", self.plugin_name, - hex::encode(self.payload), + crate::hex::to_hex(self.payload), page, )) .await @@ -409,7 +451,7 @@ ._get(&format!( "/plugin/{}/{}/unconfirmed-txs?page={}", self.plugin_name, - hex::encode(self.payload), + crate::hex::to_hex(self.payload), page, )) .await @@ -421,7 +463,7 @@ ._get::<proto::ScriptUtxos>(&format!( "/plugin/{}/{}/utxos", self.plugin_name, - hex::encode(self.payload), + encode(self.payload), )) .await?; Ok(utxos.utxos) @@ -441,6 +483,261 @@ } } +impl WsEndpoint { + pub fn new() -> Self { + Self { + on_message: None, + on_connect: None, + on_reconnect: None, + on_error: None, + on_end: None, + auto_reconnect: true, + ws: None, + connected: None, + manually_closed: false, + subs: WsSubscriptions::default(), + } + } + + pub async fn subscribe_to_blocks(&mut self) -> Result<(), Report> { + if let Some(ws) = &mut self.ws { + let msg = proto::WsSubBlocks {}; + 
ws.send(tokio_tungstenite::tungstenite::Message::Binary( + Bytes::from(msg.encode_to_vec()), + )) + .await + .wrap_err(HttpRequestError)?; + self.subs.blocks = true; + } + Ok(()) + } + + pub async fn unsubscribe_from_blocks(&mut self) -> Result<(), Report> { + if let Some(ws) = &mut self.ws { + let msg = proto::WsSub { + is_unsub: true, + sub_type: Some(proto::ws_sub::SubType::Blocks( + proto::WsSubBlocks {}, + )), + }; + ws.send(tokio_tungstenite::tungstenite::Message::Binary( + Bytes::from(msg.encode_to_vec()), + )) + .await + .wrap_err(HttpRequestError)?; + self.subs.blocks = false; + } + Ok(()) + } + + pub async fn subscribe_to_script( + &mut self, + script_type: ScriptType, + payload: &str, + ) -> Result<(), Report> { + if let Some(ws) = &mut self.ws { + let msg = proto::WsSubScript { + script_type: script_type.to_string(), + payload: crate::hex::from_hex(payload) + .map_err(|e| Report::msg(e.to_string()))?, + }; + ws.send(tokio_tungstenite::tungstenite::Message::Binary( + Bytes::from(msg.encode_to_vec()), + )) + .await + .wrap_err(HttpRequestError)?; + self.subs.scripts.push(WsSubScript { + script_type, + payload: payload.to_string(), + }); + } + Ok(()) + } + + pub async fn unsubscribe_from_script( + &mut self, + script_type: ScriptType, + payload: &str, + ) -> Result<(), Report> { + if let Some(ws) = &mut self.ws { + let msg = proto::WsSub { + is_unsub: true, + sub_type: Some(proto::ws_sub::SubType::Script( + proto::WsSubScript { + script_type: script_type.to_string(), + payload: crate::hex::from_hex(payload) + .map_err(|e| Report::msg(e.to_string()))?, + }, + )), + }; + ws.send(tokio_tungstenite::tungstenite::Message::Binary( + Bytes::from(msg.encode_to_vec()), + )) + .await + .wrap_err(HttpRequestError)?; + self.subs.scripts.retain(|s| { + s.script_type != script_type || s.payload != payload + }); + } + Ok(()) + } + + pub async fn subscribe_to_lokad_id( + &mut self, + lokad_id: &str, + ) -> Result<(), Report> { + if let Some(ws) = &mut self.ws { + let msg 
= proto::WsSubLokadId { + lokad_id: crate::hex::from_hex(lokad_id) + .map_err(|e| Report::msg(e.to_string()))?, + }; + ws.send(tokio_tungstenite::tungstenite::Message::Binary( + Bytes::from(msg.encode_to_vec()), + )) + .await + .wrap_err(HttpRequestError)?; + self.subs.lokad_ids.push(lokad_id.to_string()); + } + Ok(()) + } + + pub async fn unsubscribe_from_lokad_id( + &mut self, + lokad_id: &str, + ) -> Result<(), Report> { + if let Some(ws) = &mut self.ws { + let msg = proto::WsSub { + is_unsub: true, + sub_type: Some(proto::ws_sub::SubType::LokadId( + proto::WsSubLokadId { + lokad_id: crate::hex::from_hex(lokad_id) + .map_err(|e| Report::msg(e.to_string()))?, + }, + )), + }; + ws.send(tokio_tungstenite::tungstenite::Message::Binary( + Bytes::from(msg.encode_to_vec()), + )) + .await + .wrap_err(HttpRequestError)?; + self.subs.lokad_ids.retain(|id| id != lokad_id); + } + Ok(()) + } + + pub async fn subscribe_to_token_id( + &mut self, + token_id: &str, + ) -> Result<(), Report> { + if let Some(ws) = &mut self.ws { + let msg = proto::WsSubTokenId { + token_id: token_id.to_string(), + }; + ws.send(tokio_tungstenite::tungstenite::Message::Binary( + Bytes::from(msg.encode_to_vec()), + )) + .await + .wrap_err(HttpRequestError)?; + self.subs.tokens.push(token_id.to_string()); + } + Ok(()) + } + + pub async fn unsubscribe_from_token_id( + &mut self, + token_id: &str, + ) -> Result<(), Report> { + if let Some(ws) = &mut self.ws { + let msg = proto::WsSub { + is_unsub: true, + sub_type: Some(proto::ws_sub::SubType::TokenId( + proto::WsSubTokenId { + token_id: token_id.to_string(), + }, + )), + }; + ws.send(tokio_tungstenite::tungstenite::Message::Binary( + Bytes::from(msg.encode_to_vec()), + )) + .await + .wrap_err(HttpRequestError)?; + self.subs.tokens.retain(|id| id != token_id); + } + Ok(()) + } + + pub async fn subscribe_to_plugin( + &mut self, + plugin_name: &str, + group: &str, + ) -> Result<(), Report> { + if let Some(ws) = &mut self.ws { + let msg = proto::WsSub { + 
is_unsub: false, + sub_type: Some(proto::ws_sub::SubType::Plugin( + proto::WsPlugin { + plugin_name: plugin_name.to_string(), + group: crate::hex::from_hex(group) + .map_err(|e| Report::msg(e.to_string()))?, + }, + )), + }; + ws.send(tokio_tungstenite::tungstenite::Message::Binary( + Bytes::from(msg.encode_to_vec()), + )) + .await + .wrap_err(HttpRequestError)?; + self.subs.plugins.push(WsSubPlugin { + plugin_name: plugin_name.to_string(), + group: group.to_string(), + }); + } + Ok(()) + } + + pub async fn unsubscribe_from_plugin( + &mut self, + plugin_name: &str, + group: &str, + ) -> Result<(), Report> { + if let Some(ws) = &mut self.ws { + let msg = proto::WsSub { + is_unsub: true, + sub_type: Some(proto::ws_sub::SubType::Plugin( + proto::WsPlugin { + plugin_name: plugin_name.to_string(), + group: crate::hex::from_hex(group) + .map_err(|e| Report::msg(e.to_string()))?, + }, + )), + }; + ws.send(tokio_tungstenite::tungstenite::Message::Binary( + Bytes::from(msg.encode_to_vec()), + )) + .await + .wrap_err(HttpRequestError)?; + self.subs + .plugins + .retain(|p| p.plugin_name != plugin_name || p.group != group); + } + Ok(()) + } + + pub async fn handle_msg(&mut self, msg: proto::WsMsg) { + if let Some(handler) = &self.on_message { + handler(msg); + } + } + + pub fn close(&mut self) { + self.manually_closed = true; + if let Some(ws) = &mut self.ws { + // Close the WebSocket connection + let _ = ws.close(None); + } + } +} + #[cfg(test)] mod tests { use abc_rust_error::Result; diff --git a/modules/bitcoinsuite-chronik-client/src/validation.rs b/modules/bitcoinsuite-chronik-client/src/validation.rs new file mode 100644 --- /dev/null +++ b/modules/bitcoinsuite-chronik-client/src/validation.rs @@ -0,0 +1,113 @@ +use regex::Regex; +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum ValidationError { + #[error("Invalid hex: {0}. 
Payload must be lowercase hex string.")] + InvalidHex(String), + #[error("Odd hex length: {0}")] + OddHexLength(String), + #[error("Invalid length, expected {0} bytes but got {1} bytes")] + InvalidLength(usize, usize), + #[error("Invalid scriptType: {0}")] + InvalidScriptType(String), + #[error( + "Invalid lokadId: {0}. lokadId must be 4 bytes (8 chars) of lowercase \ + hex." + )] + InvalidLokadId(String), + #[error( + "Invalid tokenId: {0}. tokenId must be 64 characters of lowercase hex." + )] + InvalidTokenId(String), + #[error("pluginName must be a string")] + MissingPluginName, + #[error("group must be a string")] + MissingGroup, + #[error("group must have even length (complete bytes): {0}")] + OddGroupLength(String), + #[error("group must be a valid lowercase hex string: {0}")] + InvalidGroupHex(String), +} + +lazy_static::lazy_static! { + static ref VALID_HEX_REGEX: Regex = + Regex::new(r"^[a-f0-9]+$").unwrap(); + static ref VALID_LOKADID_REGEX: Regex = + Regex::new(r"^[a-f0-9]{8}$").unwrap(); + static ref VALID_TOKENID_REGEX: Regex = + Regex::new(r"^[a-f0-9]{64}$").unwrap(); +} + +pub fn verify_ws_subscription( + script_type: &str, + payload: &str, +) -> Result<(), ValidationError> { + if payload.len() % 2 != 0 { + return Err(ValidationError::OddHexLength(payload.to_string())); + } + + if !VALID_HEX_REGEX.is_match(payload) { + return Err(ValidationError::InvalidHex(payload.to_string())); + } + + const SUPPORTED_HASH_BYTES_P2PKH_P2SH: usize = 20; + const SUPPORTED_HASH_BYTES_P2PK: [usize; 2] = [33, 65]; + let payload_bytes = payload.len() / 2; + + match script_type { + "p2pkh" | "p2sh" => { + if payload_bytes != SUPPORTED_HASH_BYTES_P2PKH_P2SH { + return Err(ValidationError::InvalidLength( + SUPPORTED_HASH_BYTES_P2PKH_P2SH, + payload_bytes, + )); + } + Ok(()) + } + "p2pk" => { + if !SUPPORTED_HASH_BYTES_P2PK.contains(&payload_bytes) { + return Err(ValidationError::InvalidLength(33, payload_bytes)); + } + Ok(()) + } + "other" => Ok(()), + _ => 
Err(ValidationError::InvalidScriptType(script_type.to_string())), + } +} + +pub fn verify_lokad_id(lokad_id: &str) -> Result<(), ValidationError> { + if !VALID_LOKADID_REGEX.is_match(lokad_id) { + return Err(ValidationError::InvalidLokadId(lokad_id.to_string())); + } + Ok(()) +} + +pub fn verify_token_id(token_id: &str) -> Result<(), ValidationError> { + if !VALID_TOKENID_REGEX.is_match(token_id) { + return Err(ValidationError::InvalidTokenId(token_id.to_string())); + } + Ok(()) +} + +pub fn verify_plugin_subscription( + plugin_name: &str, + group: &str, +) -> Result<(), ValidationError> { + if plugin_name.is_empty() { + return Err(ValidationError::MissingPluginName); + } + if group.is_empty() { + return Err(ValidationError::MissingGroup); + } + + if group.len() % 2 != 0 { + return Err(ValidationError::OddGroupLength(group.to_string())); + } + + if !VALID_HEX_REGEX.is_match(group) { + return Err(ValidationError::InvalidGroupHex(group.to_string())); + } + + Ok(()) +} diff --git a/modules/bitcoinsuite-chronik-client/tests/failover_proxy_test.rs b/modules/bitcoinsuite-chronik-client/tests/failover_proxy_test.rs new file mode 100644 --- /dev/null +++ b/modules/bitcoinsuite-chronik-client/tests/failover_proxy_test.rs @@ -0,0 +1,208 @@ +use abc_rust_error::Result; +use bitcoinsuite_chronik_client::failover_proxy::FailoverProxy; +use bitcoinsuite_chronik_client::hex::{from_hex, validate_hex}; + +#[test] +fn test_append_ws_urls_combines_valid_urls() -> Result<()> { + let urls = vec![ + "https://chronik.be.cash/xec".to_string(), + "https://chronik.fabien.cash".to_string(), + "https://chronik2.fabien.cash".to_string(), + ]; + let proxy = FailoverProxy::new(urls.clone())?; + let endpoints = proxy.get_endpoint_array(); + + assert_eq!(endpoints[0].url, "https://chronik.be.cash/xec"); + assert_eq!(endpoints[0].ws_url, "wss://chronik.be.cash/xec/ws"); + assert_eq!(endpoints[1].url, "https://chronik.fabien.cash"); + assert_eq!(endpoints[1].ws_url, "wss://chronik.fabien.cash/ws"); 
+ assert_eq!(endpoints[2].url, "https://chronik2.fabien.cash"); + assert_eq!(endpoints[2].ws_url, "wss://chronik2.fabien.cash/ws"); + + Ok(()) +} + +#[test] +fn test_append_ws_urls_combines_mixed_urls() -> Result<()> { + let urls = vec![ + "https://chronik.be.cash/xec".to_string(), + "http://chronik.fabien.cash".to_string(), + "https://chronik2.fabien.cash".to_string(), + ]; + let proxy = FailoverProxy::new(urls.clone())?; + let endpoints = proxy.get_endpoint_array(); + + assert_eq!(endpoints[0].url, "https://chronik.be.cash/xec"); + assert_eq!(endpoints[0].ws_url, "wss://chronik.be.cash/xec/ws"); + assert_eq!(endpoints[1].url, "http://chronik.fabien.cash"); + assert_eq!(endpoints[1].ws_url, "ws://chronik.fabien.cash/ws"); + assert_eq!(endpoints[2].url, "https://chronik2.fabien.cash"); + assert_eq!(endpoints[2].ws_url, "wss://chronik2.fabien.cash/ws"); + + Ok(()) +} + +#[test] +fn test_append_ws_urls_empty_input() -> Result<()> { + let proxy = + FailoverProxy::new(vec!["https://chronik.be.cash/xec".to_string()])?; + let endpoints = proxy.get_endpoint_array(); + assert_eq!(endpoints.len(), 1); + Ok(()) +} + +#[test] +fn test_append_ws_urls_invalid_url() { + let urls = vec![ + "http://chronik.fabien.cash".to_string(), + "not-a-valid-url".to_string(), + "https://chronik2.fabien.cash".to_string(), + ]; + let result = FailoverProxy::new(urls); + assert!(result.is_err()); +} + +#[test] +fn test_derive_endpoint_index_default_working_index() -> Result<()> { + let urls = vec![ + "https://chronik.be.cash/xec".to_string(), + "http://chronik.fabien.cash".to_string(), + "https://chronik2.fabien.cash".to_string(), + "https://chronik3.fabien.cash".to_string(), + ]; + let proxy = FailoverProxy::new(urls)?; + + let mut index_order = Vec::new(); + for i in 0..4 { + index_order.push(proxy.derive_endpoint_index(i)); + } + assert_eq!(index_order, vec![0, 1, 2, 3]); + + Ok(()) +} + +#[test] +fn test_derive_endpoint_index_working_index_3() -> Result<()> { + let urls = vec![ + 
"https://chronik.be.cash/xec".to_string(), + "http://chronik.fabien.cash".to_string(), + "https://chronik2.fabien.cash".to_string(), + "https://chronik3.fabien.cash".to_string(), + ]; + let mut proxy = FailoverProxy::new(urls)?; + proxy.set_working_index(3); + + let mut index_order = Vec::new(); + for i in 0..4 { + index_order.push(proxy.derive_endpoint_index(i)); + } + assert_eq!(index_order, vec![3, 0, 1, 2]); + + Ok(()) +} + +#[test] +fn test_script_subscription_validation() { + struct TestCase { + description: &'static str, + payload: &'static str, + expected_result: bool, + } + + let test_cases = vec![ + TestCase { + description: "Valid p2pkh sub", + payload: "d37c4c809fe9840e7bfa77b86bd47163f6fb6c60", + expected_result: true, + }, + TestCase { + description: "Valid p2sh sub", + payload: "d37c4c809fe9840e7bfa77b86bd47163f6fb6c60", + expected_result: true, + }, + TestCase { + description: "Valid 33-byte p2pk sub", + payload: "10d141b856a092ee169c5405323895e1962c6b0d\ + 7c101120d360164c9e4b3997bd", + expected_result: true, + }, + TestCase { + description: "Valid 65-byte p2pk sub", + payload: "047fa64f6874fb7213776b24c40bc915451b57ef7\ + f17ad7b982561f99f7cdc7010d141b856a092ee16\ + 9c5405323895e1962c6b0d7c101120d360164c9e4\ + b3997bd", + expected_result: true, + }, + TestCase { + description: "Valid other sub", + payload: "deadbeef", + expected_result: true, + }, + ]; + + for case in test_cases { + let result = from_hex(case.payload).is_ok(); + assert_eq!( + result, case.expected_result, + "Test case '{}' failed", + case.description + ); + } +} + +#[test] +fn test_invalid_script_subscriptions() { + struct TestCase { + description: &'static str, + payload: &'static str, + expected_error: &'static str, + } + + let test_cases = vec![ + TestCase { + description: "Invalid hex: uppercase", + payload: "DEADBEEF", + expected_error: "Invalid hex: \"DEADBEEF\". 
Payload must be \ + lowercase hex string.", + }, + TestCase { + description: "Invalid hex: mixed case", + payload: "DEADbeef", + expected_error: "Invalid hex: \"DEADbeef\". Payload must be \ + lowercase hex string.", + }, + TestCase { + description: "Invalid hex: odd length", + payload: "dea", + expected_error: "Odd hex length: dea", + }, + TestCase { + description: "Invalid hex: non-hex chars", + payload: "nothex", + expected_error: "Invalid hex: \"nothex\". Payload must be \ + lowercase hex string.", + }, + ]; + + for case in test_cases { + let result = validate_hex(case.payload); + match result { + Ok(_) => panic!( + "Test case '{}' should have failed with error: {}", + case.description, case.expected_error + ), + Err(e) => { + assert_eq!( + e.to_string(), + case.expected_error, + "Test case '{}' failed with wrong error. Expected: {}, \ + Got: {}", + case.description, + case.expected_error, + e + ); + } + } + } +}