diff --git a/Cargo.lock b/Cargo.lock --- a/Cargo.lock +++ b/Cargo.lock @@ -473,12 +473,17 @@ "bitcoinsuite-core 0.1.0", "bytes", "chronik-proto", + "dotenv", "hex", "pretty_assertions", "prost", "prost-build", + "rand 0.8.5", "regex", "reqwest", + "serde", + "serde_json", + "tempfile", "thiserror 1.0.69", "tokio", ] @@ -1133,6 +1138,12 @@ "syn 2.0.90", ] +[[package]] +name = "dotenv" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f" + [[package]] name = "dunce" version = "1.0.5" @@ -3160,9 +3171,9 @@ [[package]] name = "serde" -version = "1.0.215" +version = "1.0.217" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6513c1ad0b11a9376da888e3e0baa0077f1aed55c17f50e7b2397136129fb88f" +checksum = "02fc4265df13d6fa1d00ecff087228cc0a2b5f3c0e87e258d8b94a156e984c70" dependencies = [ "serde_derive", ] @@ -3180,9 +3191,9 @@ [[package]] name = "serde_derive" -version = "1.0.215" +version = "1.0.217" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad1e866f866923f252f05c889987993144fb74e722403468a4ebd70c3cd756c0" +checksum = "5a9bf7cf98d04a2b28aead066b7496853d4779c9cc183c440dbac457641e19a0" dependencies = [ "proc-macro2", "quote", @@ -3454,12 +3465,13 @@ [[package]] name = "tempfile" -version = "3.14.0" +version = "3.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28cce251fcbc87fac86a866eeb0d6c2d536fc16d06f184bb61aeae11aa4cee0c" +checksum = "9a8a559c81686f576e8cd0290cd2a24a2a9ad80c98b3478856500fcbd7acd704" dependencies = [ "cfg-if", "fastrand", + "getrandom", "once_cell", "rustix", "windows-sys 0.59.0", diff --git a/modules/bitcoinsuite-chronik-client/Cargo.toml b/modules/bitcoinsuite-chronik-client/Cargo.toml --- a/modules/bitcoinsuite-chronik-client/Cargo.toml +++ b/modules/bitcoinsuite-chronik-client/Cargo.toml @@ -12,9 +12,13 @@ bitcoinsuite-core = { path = "../../chronik/bitcoinsuite-core" } chronik-proto = { path = "../../chronik/chronik-proto/"} +dotenv = "0.15.0" + # Error structs/enums thiserror = "1.0" +tempfile = "3.15.0" + # HTTP client reqwest = "0.11" @@ -24,11 +28,17 @@ # Protobuf (de)serialization prost = "0.11" +rand = "0.8.5" + # Hex en-/decoding hex = "0.4" regex = "1" +serde_json = "1.0" + +serde = "1.0.217" + bytes = { version = "1.4", features = ["serde"] } [build-dependencies] diff --git a/modules/bitcoinsuite-chronik-client/src/handler.rs b/modules/bitcoinsuite-chronik-client/src/handler.rs new file mode 100644 --- /dev/null +++ b/modules/bitcoinsuite-chronik-client/src/handler.rs @@ -0,0 +1,96 @@ +// Copyright (c) 2025 The Bitcoin developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +use std::io; + +use serde::Serialize; +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::net::UnixStream; +use tokio::sync::mpsc; + +#[derive(Debug)] +pub struct Handler { + pub reader: BufReader<tokio::io::ReadHalf<UnixStream>>, + pub writer: tokio::io::WriteHalf<UnixStream>, +} + +#[derive(Serialize)] +pub struct Message { + command: String, +} + +// Split the stream so we can handle reader and writer operations separately +impl Handler { + pub fn new(stream: UnixStream) -> Self { + let (reader, writer) = tokio::io::split(stream); + Self { + reader: BufReader::new(reader), + writer, + } + } + + pub async fn listen_for_messages( + &mut self, + sender: mpsc::Sender<String>, + mut receiver: mpsc::Receiver<String>, + ) -> io::Result<()> { + let mut buffer = String::new(); + + loop { + buffer.clear(); + + tokio::select! { + Some(msg) = receiver.recv() => { + match msg.trim() { + "ready" => { + self.send_message("next").await.unwrap() + }, + _ => println!("Received other messages {:?}", msg), + } + } + + result = self.reader.read_line(&mut buffer) => { + match result{ + Ok(bytes_read) => { + if bytes_read == 0{ + println!("EOF: Connection closed by client"); + break; + } + let message = buffer.trim().to_string(); + println!("Received from reader {}", message); + sender.send(message).await.unwrap(); + } + Err(e) => { + eprintln!("Error reading from reader {}", e); + break; + } + + } + } + + } + } + Ok(()) + } + + pub async fn send_message(&mut self, message: &str) -> io::Result<()> { + // Serialize the message to JSON + let json_message = serde_json::to_string(message)?; + + // Send the JSON message + self.writer.write_all(json_message.as_bytes()).await?; + self.writer.write_all(b"\n").await?; + self.writer.flush().await?; + + println!("Sent: {}", json_message); + + Ok(()) + } +} + +/* +let Some(hehe) = Some("hello") else { + +} +*/ diff --git a/modules/bitcoinsuite-chronik-client/src/lib.rs b/modules/bitcoinsuite-chronik-client/src/lib.rs --- a/modules/bitcoinsuite-chronik-client/src/lib.rs +++ b/modules/bitcoinsuite-chronik-client/src/lib.rs @@ -1,6 +1,7 @@ -// Copyright (c) 2024 The Bitcoin developers +// Copyright (c) 2024-2025 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. + use std::fmt::Display; use abc_rust_error::{Result, WrapErr}; @@ -10,6 +11,9 @@ use reqwest::{header::CONTENT_TYPE, StatusCode}; use thiserror::Error; +pub mod handler; +pub mod test_runner; + use crate::ChronikClientError::*; #[derive(Debug, Clone)] diff --git a/modules/bitcoinsuite-chronik-client/src/test_runner.rs b/modules/bitcoinsuite-chronik-client/src/test_runner.rs new file mode 100644 --- /dev/null +++ b/modules/bitcoinsuite-chronik-client/src/test_runner.rs @@ -0,0 +1,126 @@ +// Copyright (c) 2025 The Bitcoin developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use std::{env, error::Error}; + +use serde::Deserialize; +use serde_json::{from_str, from_value, Value}; +use tokio::io::AsyncReadExt; +use tokio::net::UnixListener; +use tokio::process::{Child, Command}; +use tokio::sync::{oneshot::Receiver, Mutex}; + +use crate::handler::Handler; + +#[derive(Debug, Deserialize)] +pub struct TestInfo { + pub chronik: String, + pub setup_script_timeout: i32, +} + +pub async fn get_socket_path() -> PathBuf { + let socket_path = Path::new("/tmp/chronik_info.socket"); + socket_path.to_path_buf() +} + +pub async fn spin_child_process(python_script: &str) -> (Child, Handler) { + let script_name = format!("chronik-client_{}", python_script); + + let socket_path = get_socket_path().await; + let socket_path = socket_path.to_str().unwrap(); + + if std::path::Path::new(&socket_path).exists() { + std::fs::remove_file(socket_path).expect("Failed to remove file"); + } + + let build_dir = env::var("BUILD_DIR").unwrap_or_else(|_| ".".to_string()); + + let python_command = if Command::new("python3") + .arg("--version") + .output() + .await + .is_ok() + { + "python3" + } else { + "python" + }; + + let child = Command::new(python_command) + .arg("test/functional/test_runner.py") + .arg(format!("setup_scripts/{}", script_name)) + .current_dir(build_dir.clone()) + .env("SOCKET", socket_path) + .env("BUILD_DIR", build_dir) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .spawn() // Start the child process + .expect("Failed to start Python process"); + + let listener = + UnixListener::bind(socket_path).expect("Failed to bind socket"); + println!("Server is listening on {:?}", socket_path); + + let (socket, _) = listener.accept().await.unwrap(); + println!("Connection accepted!"); + + let handler = Handler::new(socket); + + (child, handler) +} + +pub fn handling_test_info( + message: &String, +) -> Result<TestInfo, Box<dyn Error>> { + match from_str::<Value>(message) { + Ok(json_data) => { + println!("Parsed JSON: {:?}", json_data); + if let Some(test_info) = json_data.get("test_info") { + match from_value::<TestInfo>(test_info.clone()) { + Ok(parsed_test_info) => { + println!( + "Deserialized TestInfo: {:?}", + parsed_test_info + ); + Ok(parsed_test_info) + } + Err(e) => { + eprintln!("Failed to deserialize TestInfo: {}", e); + Err(Box::new(e)) + } + } + } else { + eprintln!("test_info field is missing in JSON"); + Err("test_info field is missing in JSON".into()) + } + } + Err(e) => { + eprintln!("Failed to parse JSON: {}", e); + Err(Box::new(e)) + } + } +} + +pub async fn terminal(mut child: Child, receiver: Receiver<String>) { + let mut stdout = child.stdout.take().unwrap(); + let output = Arc::new(Mutex::new(Vec::new())); + let output_clone = Arc::clone(&output); + + tokio::spawn(async move { + let mut output_lock = output_clone.lock().await; + let _ = stdout.read_to_end(&mut *output_lock).await; + }); + + match receiver.await { + Ok(_) => { + let output = output.lock().await; + eprintln!("\nOutput: {}", String::from_utf8_lossy(&*output)); + } + Err(e) => { + eprintln!("{}", e); + } + } +} diff --git a/modules/bitcoinsuite-chronik-client/tests/integrations/chronik_info.rs b/modules/bitcoinsuite-chronik-client/tests/integrations/chronik_info.rs new file mode 100644 --- /dev/null +++ b/modules/bitcoinsuite-chronik-client/tests/integrations/chronik_info.rs @@ -0,0 +1,100 @@ +// Copyright (c) 2025 The Bitcoin developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +use std::sync::Arc; + +use bitcoinsuite_chronik_client::proto::ChronikInfo; +use bitcoinsuite_chronik_client::test_runner::{ + handling_test_info, spin_child_process, terminal, +}; +use bitcoinsuite_chronik_client::ChronikClient; +use serde_json::{from_str, Value}; +use tokio::sync::{mpsc, oneshot, Mutex}; + +pub async fn chronik_info_ipc() -> (String, String) { + let python_script = "chronik_info"; + + // Run testrunner to return a Handler for listening and sending messages, + // and the child process so we can monitor the stdout of the python process + let child_handler = spin_child_process(python_script).await; + let handler = Arc::new(Mutex::new(child_handler.1)); + + // Channels so spawned tasks can alert eachother on events such as receiving + // / sending via IPC, and when the child process has terminated + const CHANNEL_CAPACITY: usize = 32; + let (terminal_tx, terminal_rx) = oneshot::channel(); + let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); + let (tx1, rx1) = mpsc::channel(CHANNEL_CAPACITY); + + // We can't return as soon as the chronik information has been received, so + // we append to an empty String and return end of function + let mut chronik_version_ipc_string = String::new(); + let mut chronik_version_btc_string = String::new(); + + // Spawn a new thread to listen for stdout / stderr, will return once IPC is + // closed. + let terminal = tokio::spawn(async move { + terminal(child_handler.0, terminal_rx).await; + }); + + let handler_listener = Arc::clone(&handler); + + // Listen on a loop inside listen_for_messages for messages through IPC, and + // send the results back to this function so we can handle them + let listen_task = tokio::spawn(async move { + let mut handler = handler_listener.lock().await; + handler.listen_for_messages(tx, rx1).await.unwrap(); + terminal_tx.send("finished".to_string()).unwrap(); + }); + + // Create a bool lock, so that handling_test_info runs once only + let mut has_it_ran = false; + + // Once a message is received from listen_for_messages, do the following + while let Some(message) = rx.recv().await { + if !has_it_ran { + let chronik_url = handling_test_info(&message).unwrap().chronik; + let chronik_info = + test_chronik_info(chronik_url.as_str()).await.unwrap(); + chronik_version_btc_string.push_str(&chronik_info.version); + has_it_ran = true; + continue; + } + + match from_str::<Value>(&message) { + Ok(json_data) => { + println!("Parsed JSON: {:?}", json_data); + if let Some(chronik) = json_data.get("chronik_version") { + if let Some(chronik_str) = chronik.as_str() { + chronik_version_ipc_string.push_str(chronik_str); + } + } + if let Some(ready) = json_data.get("status") { + if let Some(ready_str) = ready.as_str() { + // Debugging line + if ready_str == "ready" { + tx1.send(ready_str.to_string()).await.unwrap(); + } + } + } + } + Err(e) => { + eprintln!("Failed to parse JSON: {}", e); + } + } + } + + // Only exit main once terminal thread has finished, otherwise we won't get + // the stdout + tokio::try_join!(terminal, listen_task).unwrap(); + + (chronik_version_ipc_string, chronik_version_btc_string) +} + +pub async fn test_chronik_info( + chronik_url: &str, +) -> Result<ChronikInfo, abc_rust_error::Report> { + let client = ChronikClient::new(chronik_url.to_string()).unwrap(); + client.chronik_info().await +} diff --git a/modules/bitcoinsuite-chronik-client/tests/integrations/mod.rs b/modules/bitcoinsuite-chronik-client/tests/integrations/mod.rs new file mode 100644 --- /dev/null +++ b/modules/bitcoinsuite-chronik-client/tests/integrations/mod.rs @@ -0,0 +1,5 @@ +// Copyright (c) 2025 The Bitcoin developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +pub mod chronik_info; diff --git a/modules/bitcoinsuite-chronik-client/tests/test_chronik_client.rs b/modules/bitcoinsuite-chronik-client/tests/test_chronik_client.rs --- a/modules/bitcoinsuite-chronik-client/tests/test_chronik_client.rs +++ b/modules/bitcoinsuite-chronik-client/tests/test_chronik_client.rs @@ -1,6 +1,7 @@ // Copyright (c) 2024 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. + use std::collections::HashMap; use abc_rust_error::Result; diff --git a/modules/bitcoinsuite-chronik-client/tests/test_chronik_info.rs b/modules/bitcoinsuite-chronik-client/tests/test_chronik_info.rs new file mode 100644 --- /dev/null +++ b/modules/bitcoinsuite-chronik-client/tests/test_chronik_info.rs @@ -0,0 +1,14 @@ +// Copyright (c) 2025 The Bitcoin developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +mod integrations; + +use integrations::chronik_info::chronik_info_ipc; + +#[tokio::test] +pub async fn test_chronik_info_ipc() { + let chronik_version_tuple = chronik_info_ipc().await; + println!("{:?}", chronik_version_tuple); + assert_eq!(chronik_version_tuple.0, chronik_version_tuple.1); +} diff --git a/test/functional/setup_scripts/ipc.py b/test/functional/setup_scripts/ipc.py --- a/test/functional/setup_scripts/ipc.py +++ b/test/functional/setup_scripts/ipc.py @@ -2,38 +2,56 @@ # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """ -IPC communication with NodeJs +IPC communication with NodeJs and Rust """ import json import os import select +import socket import time +ipc_socket = None +use_stdin = None +ipc_read_fd = None -def receive_ipc_messages(timeout): - if not hasattr(receive_ipc_messages, "ipc_rbuf"): - receive_ipc_messages.ipc_rbuf = b"" - messages = [] +def receive_ipc_messages(timeout=5): + + global ipc_socket + global use_stdin + global ipc_read_fd - use_stdin = "NODE_CHANNEL_FD" not in os.environ - ipc_read_fd = int(os.environ.get("NODE_CHANNEL_FD", 0)) + if not ipc_socket: + if not hasattr(receive_ipc_messages, "ipc_rbuf"): + receive_ipc_messages.ipc_rbuf = b"" + use_stdin = "NODE_CHANNEL_FD" not in os.environ + ipc_read_fd = int(os.environ.get("NODE_CHANNEL_FD", 0)) + """ + Receive IPC messages from the Unix socket. + This function assumes the socket is already initialized. + """ + messages = [] max_time = time.time() + timeout while not messages and time.time() < max_time: - # Make sure there is some data before calling os.read, or we could - # wait indefinitely. The use of select() with a 1 sec timeout makes us - # read the file descriptor in a non blocking way, so we can escape the - # loop preiodically and respect the global timeout supplied to the - # receive_ipc_messages() function. - r, _, _ = select.select([ipc_read_fd], [], [], 1) - if ipc_read_fd not in r: - continue - - receive_ipc_messages.ipc_rbuf += os.read(ipc_read_fd, 100) + + if ipc_socket: + data = ipc_socket.recv(1024) + if data: + if hasattr(receive_ipc_messages, "ipc_rbuf"): + receive_ipc_messages.ipc_rbuf += data + else: + receive_ipc_messages.ipc_rbuf = data + else: + r, _, _ = select.select([ipc_read_fd], [], [], 1) + if ipc_read_fd not in r: + continue + receive_ipc_messages.ipc_rbuf += os.read(ipc_read_fd, 100) + messages = receive_ipc_messages.ipc_rbuf.splitlines(keepends=True) + # Check if the last message is complete if messages[-1].endswith(b"\n"): receive_ipc_messages.ipc_rbuf = b"" else: @@ -41,6 +59,7 @@ messages = messages[:-1] if messages: + print("Python received", messages) return [ ( m.strip().decode(encoding="utf-8") @@ -51,14 +70,51 @@ ] time.sleep(0.1) - return [] def send_ipc_message(message): - s = (json.dumps(message) + "\n").encode(encoding="utf-8") - os.write(int(os.environ.get("NODE_CHANNEL_FD", 1)), s) + """ + Send an IPC message over the Unix domain socket. + This function assumes the socket is already initialized. + """ + global ipc_socket + global ipc_read_fd + + message_data = (json.dumps(message) + "\n").encode("utf-8") + if ipc_socket: + ipc_socket.sendall(message_data) + else: + os.write(int(os.environ.get("NODE_CHANNEL_FD", 1)), message_data) def ready(): + """ + Send a 'ready' message over the Unix domain socket to indicate the Python process is ready. + This function assumes the socket is already initialized. + """ send_ipc_message({"status": "ready"}) + + +def init_socket(ipc_method): + + global use_stdin + global ipc_socket + + socket_path = ipc_method + + use_stdin = "SOCKET" not in os.environ + + if not socket_path: + print("SOCKET environment variable is not set.") + exit(1) + try: + ipc_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + ipc_socket.connect(socket_path) + ipc_socket.settimeout(2) # Set a default timeout for blocking operations + except socket.error: + exit(1) + + +if "SOCKET" in os.environ: + init_socket(os.getenv("SOCKET"))