Page MenuHomePhabricator

D17465.id52186.diff
No OneTemporary

D17465.id52186.diff

diff --git a/Cargo.lock b/Cargo.lock
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -639,11 +639,16 @@
"bitcoinsuite-core 0.1.0",
"bytes",
"chronik-proto",
+ "dotenv",
"hex",
"pretty_assertions",
"prost",
"prost-build",
+ "rand 0.8.5",
"reqwest",
+ "serde",
+ "serde_json",
+ "tempfile",
"thiserror 1.0.69",
"tokio",
]
@@ -1311,6 +1316,12 @@
"syn 2.0.90",
]
+[[package]]
+name = "dotenv"
+version = "0.15.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f"
+
[[package]]
name = "dunce"
version = "1.0.5"
@@ -2362,7 +2373,7 @@
checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34"
dependencies = [
"cfg-if",
- "windows-targets 0.48.5",
+ "windows-targets 0.52.6",
]
[[package]]
@@ -3430,9 +3441,9 @@
[[package]]
name = "serde"
-version = "1.0.215"
+version = "1.0.217"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6513c1ad0b11a9376da888e3e0baa0077f1aed55c17f50e7b2397136129fb88f"
+checksum = "02fc4265df13d6fa1d00ecff087228cc0a2b5f3c0e87e258d8b94a156e984c70"
dependencies = [
"serde_derive",
]
@@ -3450,9 +3461,9 @@
[[package]]
name = "serde_derive"
-version = "1.0.215"
+version = "1.0.217"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ad1e866f866923f252f05c889987993144fb74e722403468a4ebd70c3cd756c0"
+checksum = "5a9bf7cf98d04a2b28aead066b7496853d4779c9cc183c440dbac457641e19a0"
dependencies = [
"proc-macro2",
"quote",
@@ -3741,12 +3752,13 @@
[[package]]
name = "tempfile"
-version = "3.14.0"
+version = "3.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "28cce251fcbc87fac86a866eeb0d6c2d536fc16d06f184bb61aeae11aa4cee0c"
+checksum = "9a8a559c81686f576e8cd0290cd2a24a2a9ad80c98b3478856500fcbd7acd704"
dependencies = [
"cfg-if",
"fastrand",
+ "getrandom",
"once_cell",
"rustix",
"windows-sys 0.59.0",
@@ -4314,7 +4326,7 @@
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
dependencies = [
- "windows-sys 0.48.0",
+ "windows-sys 0.59.0",
]
[[package]]
diff --git a/modules/bitcoinsuite-chronik-client/Cargo.toml b/modules/bitcoinsuite-chronik-client/Cargo.toml
--- a/modules/bitcoinsuite-chronik-client/Cargo.toml
+++ b/modules/bitcoinsuite-chronik-client/Cargo.toml
@@ -12,9 +12,13 @@
bitcoinsuite-core = { path = "../../chronik/bitcoinsuite-core" }
chronik-proto = { path = "../../chronik/chronik-proto/"}
+dotenv = "0.15.0"
+
# Error structs/enums
thiserror = "1.0"
+tempfile = "3.15.0"
+
# HTTP client
reqwest = "0.11"
@@ -24,9 +28,15 @@
# Protobuf (de)serialization
prost = "0.11"
+rand = "0.8.5"
+
# Hex en-/decoding
hex = "0.4"
+serde_json = "1.0"
+
+serde = "1.0.217"
+
bytes = { version = "1.4", features = ["serde"] }
[build-dependencies]
diff --git a/modules/bitcoinsuite-chronik-client/src/handler.rs b/modules/bitcoinsuite-chronik-client/src/handler.rs
new file mode 100644
--- /dev/null
+++ b/modules/bitcoinsuite-chronik-client/src/handler.rs
@@ -0,0 +1,96 @@
+// Copyright (c) 2025 The Bitcoin developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+use std::io;
+
+use serde::Serialize;
+use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
+use tokio::net::UnixStream;
+use tokio::sync::mpsc;
+
+#[derive(Debug)]
+pub struct Handler {
+ pub reader: BufReader<tokio::io::ReadHalf<UnixStream>>,
+ pub writer: tokio::io::WriteHalf<UnixStream>,
+}
+
+#[derive(Serialize)]
+pub struct Message {
+ command: String,
+}
+
+// Split the stream so we can handle reader and writer operations separately
+impl Handler {
+ pub fn new(stream: UnixStream) -> Self {
+ let (reader, writer) = tokio::io::split(stream);
+ Self {
+ reader: BufReader::new(reader),
+ writer,
+ }
+ }
+
+ pub async fn listen_for_messages(
+ &mut self,
+ sender: mpsc::Sender<String>,
+ mut receiver: mpsc::Receiver<String>,
+ ) -> io::Result<()> {
+ let mut buffer = String::new();
+
+ loop {
+ buffer.clear();
+
+ tokio::select! {
+ Some(msg) = receiver.recv() => {
+ match msg.trim() {
+ "ready" => {
+ self.send_message("next").await.unwrap()
+ },
+ _ => println!("Received other messages {:?}", msg),
+ }
+ }
+
+ result = self.reader.read_line(&mut buffer) => {
+ match result{
+ Ok(bytes_read) => {
+ if bytes_read == 0{
+ println!("EOF: Connection closed by client");
+ break;
+ }
+ let message = buffer.trim().to_string();
+ println!("Received from reader {}", message);
+ sender.send(message).await.unwrap();
+ }
+ Err(e) => {
+ eprintln!("Error reading from reader {}", e);
+ break;
+ }
+
+ }
+ }
+
+ }
+ }
+ Ok(())
+ }
+
+ pub async fn send_message(&mut self, message: &str) -> io::Result<()> {
+ // Serialize the message to JSON
+ let json_message = serde_json::to_string(message)?;
+
+ // Send the JSON message
+ self.writer.write_all(json_message.as_bytes()).await?;
+ self.writer.write_all(b"\n").await?;
+ self.writer.flush().await?;
+
+ println!("Sent: {}", json_message);
+
+ Ok(())
+ }
+}
+
+/*
+let Some(hehe) = Some("hello") else {
+
+}
+*/
diff --git a/modules/bitcoinsuite-chronik-client/src/lib.rs b/modules/bitcoinsuite-chronik-client/src/lib.rs
--- a/modules/bitcoinsuite-chronik-client/src/lib.rs
+++ b/modules/bitcoinsuite-chronik-client/src/lib.rs
@@ -1,6 +1,7 @@
-// Copyright (c) 2024 The Bitcoin developers
+// Copyright (c) 2025 The Bitcoin developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
use std::fmt::Display;
use abc_rust_error::{Result, WrapErr};
@@ -10,6 +11,9 @@
use reqwest::{header::CONTENT_TYPE, StatusCode};
use thiserror::Error;
+pub mod handler;
+pub mod test_runner;
+
use crate::ChronikClientError::*;
#[derive(Debug, Clone)]
@@ -135,6 +139,10 @@
Ok(blocks.blocks)
}
+ pub async fn chronik_info(&self) -> Result<proto::ChronikInfo> {
+ self._get("/chronik-info").await
+ }
+
pub async fn tx(&self, txid: &Sha256d) -> Result<proto::Tx> {
self._get(&format!("/tx/{}", txid.hex_be())).await
}
diff --git a/modules/bitcoinsuite-chronik-client/src/test_runner.rs b/modules/bitcoinsuite-chronik-client/src/test_runner.rs
new file mode 100644
--- /dev/null
+++ b/modules/bitcoinsuite-chronik-client/src/test_runner.rs
@@ -0,0 +1,130 @@
+// Copyright (c) 2025 The Bitcoin developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+use std::fs::File;
+use std::io::{self, Write};
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
+use std::{env, error::Error};
+
+use rand::Rng;
+use serde::Deserialize;
+use serde_json::{from_str, from_value, Value};
+use tokio::io::AsyncReadExt;
+use tokio::net::UnixListener;
+use tokio::process::{Child, Command};
+use tokio::sync::{oneshot::Receiver, Mutex};
+
+use crate::handler::Handler;
+
+#[derive(Debug, Deserialize)]
+pub struct TestInfo {
+ pub chronik: String,
+ pub setup_script_timeout: i32,
+}
+
+pub async fn get_socket_path() -> PathBuf {
+ let socket_path = Path::new("/tmp/chronik_info.socket");
+ socket_path.to_path_buf()
+}
+
+pub async fn spin_child_process(python_script: &str) -> (Child, Handler) {
+ let script_name = format!("chronik-client_{}", python_script);
+
+ let socket_path = get_socket_path().await;
+ let socket_path = socket_path.to_str().unwrap();
+
+ if std::path::Path::new(&socket_path).exists() {
+ std::fs::remove_file(socket_path).expect("Failed to remove file");
+ }
+
+ let build_dir = env::var("BUILD_DIR").unwrap_or_else(|_| ".".to_string());
+
+ let python_command = if Command::new("python3")
+ .arg("--version")
+ .output()
+ .await
+ .is_ok()
+ {
+ "python3"
+ } else {
+ "python"
+ };
+
+ let child = Command::new(python_command)
+ .arg("test/functional/test_runner.py")
+ .arg(format!("setup_scripts/{}", script_name))
+ .current_dir(build_dir.clone())
+ .env("SOCKET", socket_path)
+ .env("BUILD_DIR", build_dir)
+ .stdout(std::process::Stdio::piped())
+ .stderr(std::process::Stdio::piped())
+ .spawn() // Start the child process
+ .expect("Failed to start Python process");
+
+ println!("{:?}", socket_path);
+ let listener =
+ UnixListener::bind(socket_path).expect("Failed to bind socket");
+ println!("Server is listening on {:?}", socket_path);
+
+ let (socket, _) = listener.accept().await.unwrap();
+ println!("Connection accepted!");
+
+ let handler = Handler::new(socket);
+
+ (child, handler)
+}
+
+pub fn handling_test_info(
+ message: &String,
+) -> Result<TestInfo, Box<dyn Error>> {
+ match from_str::<Value>(message) {
+ Ok(json_data) => {
+ println!("Parsed JSON: {:?}", json_data);
+ if let Some(test_info) = json_data.get("test_info") {
+ match from_value::<TestInfo>(test_info.clone()) {
+ Ok(parsed_test_info) => {
+ println!(
+ "Deserialized TestInfo: {:?}",
+ parsed_test_info
+ );
+ Ok(parsed_test_info)
+ }
+ Err(e) => {
+ eprintln!("Failed to deserialize TestInfo: {}", e);
+ Err(Box::new(e))
+ }
+ }
+ } else {
+ eprintln!("test_info field is missing in JSON");
+ Err("test_info field is missing in JSON".into())
+ }
+ }
+ Err(e) => {
+ eprintln!("Failed to parse JSON: {}", e);
+ Err(Box::new(e))
+ }
+ }
+}
+
+pub async fn terminal(mut child: Child, receiver: Receiver<String>) {
+ let mut stdout = child.stdout.take().unwrap();
+ let output = Arc::new(Mutex::new(Vec::new()));
+ let output_clone = Arc::clone(&output);
+
+ tokio::spawn(async move {
+ let mut output_lock = output_clone.lock().await;
+ let _ = stdout.read_to_end(&mut *output_lock).await;
+ });
+
+ match receiver.await {
+ Ok(_) => {
+ let output = output.lock().await;
+ eprintln!("\nOutput: {}", String::from_utf8_lossy(&*output));
+ }
+ Err(e) => {
+ eprintln!("{}", e);
+ }
+ }
+}
diff --git a/modules/bitcoinsuite-chronik-client/tests/integrations/chronik_info.rs b/modules/bitcoinsuite-chronik-client/tests/integrations/chronik_info.rs
new file mode 100644
--- /dev/null
+++ b/modules/bitcoinsuite-chronik-client/tests/integrations/chronik_info.rs
@@ -0,0 +1,100 @@
+// Copyright (c) 2025 The Bitcoin developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+use std::sync::Arc;
+
+use bitcoinsuite_chronik_client::proto::ChronikInfo;
+use bitcoinsuite_chronik_client::test_runner::{
+ handling_test_info, spin_child_process, terminal,
+};
+use bitcoinsuite_chronik_client::ChronikClient;
+use serde_json::{from_str, Value};
+use tokio::sync::{mpsc, oneshot, Mutex};
+
+pub async fn chronik_info_ipc() -> (String, String) {
+ let python_script = "chronik_info";
+
+ // Run testrunner to return a Handler for listening and sending messages,
+ // and the child process so we can monitor the stdout of the python process
+ let child_handler = spin_child_process(python_script).await;
+ let handler = Arc::new(Mutex::new(child_handler.1));
+
+ // Channels so spawned tasks can alert eachother on events such as receiving
+ // / sending via IPC, and when the child process has terminated
+ const CHANNEL_CAPACITY: usize = 32;
+ let (terminal_tx, terminal_rx) = oneshot::channel();
+ let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY);
+ let (tx1, rx1) = mpsc::channel(CHANNEL_CAPACITY);
+
+ // We can't return as soon as the chronik information has been received, so
+ // we append to an empty String and return end of function
+ let mut chronik_version_ipc_string = String::new();
+ let mut chronik_version_btc_string = String::new();
+
+ // Spawn a new thread to listen for stdout / stderr, will return once IPC is
+ // closed.
+ let terminal = tokio::spawn(async move {
+ terminal(child_handler.0, terminal_rx).await;
+ });
+
+ let handler_listener = Arc::clone(&handler);
+
+ // Listen on a loop inside listen_for_messages for messages through IPC, and
+ // send the results back to this function so we can handle them
+ let listen_task = tokio::spawn(async move {
+ let mut handler = handler_listener.lock().await;
+ handler.listen_for_messages(tx, rx1).await.unwrap();
+ terminal_tx.send("finished".to_string()).unwrap();
+ });
+
+ // Create a bool lock, so that handling_test_info runs once only
+ let mut has_it_ran = false;
+
+ // Once a message is received from listen_for_messages, do the following
+ while let Some(message) = rx.recv().await {
+ if !has_it_ran {
+ let chronik_url = handling_test_info(&message).unwrap().chronik;
+ let chronik_info =
+ test_chronik_info(chronik_url.as_str()).await.unwrap();
+ chronik_version_btc_string.push_str(&chronik_info.version);
+ has_it_ran = true;
+ continue;
+ }
+
+ match from_str::<Value>(&message) {
+ Ok(json_data) => {
+ println!("Parsed JSON: {:?}", json_data);
+ if let Some(chronik) = json_data.get("chronik_version") {
+ if let Some(chronik_str) = chronik.as_str() {
+ chronik_version_ipc_string.push_str(chronik_str);
+ }
+ }
+ if let Some(ready) = json_data.get("status") {
+ if let Some(ready_str) = ready.as_str() {
+ // Debugging line
+ if ready_str == "ready" {
+ tx1.send(ready_str.to_string()).await.unwrap();
+ }
+ }
+ }
+ }
+ Err(e) => {
+ eprintln!("Failed to parse JSON: {}", e);
+ }
+ }
+ }
+
+ // Only exit main once terminal thread has finished, otherwise we won't get
+ // the stdout
+ tokio::try_join!(terminal, listen_task).unwrap();
+
+ (chronik_version_ipc_string, chronik_version_btc_string)
+}
+
+pub async fn test_chronik_info(
+ chronik_url: &str,
+) -> Result<ChronikInfo, abc_rust_error::Report> {
+ let client = ChronikClient::new(chronik_url.to_string()).unwrap();
+ client.chronik_info().await
+}
diff --git a/modules/bitcoinsuite-chronik-client/tests/integrations/mod.rs b/modules/bitcoinsuite-chronik-client/tests/integrations/mod.rs
new file mode 100644
--- /dev/null
+++ b/modules/bitcoinsuite-chronik-client/tests/integrations/mod.rs
@@ -0,0 +1,5 @@
+// Copyright (c) 2025 The Bitcoin developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+pub mod chronik_info;
diff --git a/modules/bitcoinsuite-chronik-client/tests/test_chronik_client.rs b/modules/bitcoinsuite-chronik-client/tests/test_chronik_client.rs
--- a/modules/bitcoinsuite-chronik-client/tests/test_chronik_client.rs
+++ b/modules/bitcoinsuite-chronik-client/tests/test_chronik_client.rs
@@ -1,6 +1,7 @@
-// Copyright (c) 2024 The Bitcoin developers
+// Copyright (c) 2025 The Bitcoin developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
use std::collections::HashMap;
use abc_rust_error::Result;
@@ -328,6 +329,13 @@
Ok(())
}
+#[tokio::test]
+pub async fn test_chronik_info() -> Result<()> {
+ let client = ChronikClient::new(CHRONIK_URL.to_string())?;
+ client.chronik_info().await?;
+ Ok(())
+}
+
#[tokio::test]
pub async fn test_slpv1_token() -> Result<()> {
let client = ChronikClient::new(CHRONIK_URL.to_string())?;
diff --git a/modules/bitcoinsuite-chronik-client/tests/test_chronik_info.rs b/modules/bitcoinsuite-chronik-client/tests/test_chronik_info.rs
new file mode 100644
--- /dev/null
+++ b/modules/bitcoinsuite-chronik-client/tests/test_chronik_info.rs
@@ -0,0 +1,14 @@
+// Copyright (c) 2025 The Bitcoin developers
+// Distributed under the MIT software license, see the accompanying
+// file COPYING or http://www.opensource.org/licenses/mit-license.php.
+
+mod integrations;
+
+use integrations::chronik_info::chronik_info_ipc;
+
+#[tokio::test]
+pub async fn test_chronik_info_ipc() {
+ let chronik_version_tuple = chronik_info_ipc().await;
+ println!("{:?}", chronik_version_tuple);
+ assert_eq!(chronik_version_tuple.0, chronik_version_tuple.1);
+}
diff --git a/test/functional/setup_scripts/ipc.py b/test/functional/setup_scripts/ipc.py
--- a/test/functional/setup_scripts/ipc.py
+++ b/test/functional/setup_scripts/ipc.py
@@ -1,39 +1,50 @@
-# Copyright (c) 2023 The Bitcoin developers
-# Distributed under the MIT software license, see the accompanying
-# file COPYING or http://www.opensource.org/licenses/mit-license.php.
-"""
-IPC communication with NodeJs
-"""
-
import json
import os
import select
+import socket
import time
+ipc_socket = None
+use_stdin = None
+ipc_read_fd = None
-def receive_ipc_messages(timeout):
- if not hasattr(receive_ipc_messages, "ipc_rbuf"):
- receive_ipc_messages.ipc_rbuf = b""
- messages = []
+def receive_ipc_messages(timeout=5):
- use_stdin = "NODE_CHANNEL_FD" not in os.environ
- ipc_read_fd = int(os.environ.get("NODE_CHANNEL_FD", 0))
+ global ipc_socket
+ global use_stdin
+ global ipc_read_fd
+ if not ipc_socket:
+ if not hasattr(receive_ipc_messages, "ipc_rbuf"):
+ receive_ipc_messages.ipc_rbuf = b""
+ use_stdin = "NODE_CHANNEL_FD" not in os.environ
+ ipc_read_fd = int(os.environ.get("NODE_CHANNEL_FD", 0))
+ """
+ Receive IPC messages from the Unix socket.
+ This function assumes the socket is already initialized.
+ """
+
+ messages = []
max_time = time.time() + timeout
while not messages and time.time() < max_time:
- # Make sure there is some data before calling os.read, or we could
- # wait indefinitely. The use of select() with a 1 sec timeout makes us
- # read the file descriptor in a non blocking way, so we can escape the
- # loop preiodically and respect the global timeout supplied to the
- # receive_ipc_messages() function.
- r, _, _ = select.select([ipc_read_fd], [], [], 1)
- if ipc_read_fd not in r:
- continue
-
- receive_ipc_messages.ipc_rbuf += os.read(ipc_read_fd, 100)
+
+ if ipc_socket:
+ data = ipc_socket.recv(1024)
+ if data:
+ if hasattr(receive_ipc_messages, "ipc_rbuf"):
+ receive_ipc_messages.ipc_rbuf += data
+ else:
+ receive_ipc_messages.ipc_rbuf = data
+ else:
+ r, _, _ = select.select([ipc_read_fd], [], [], 1)
+ if ipc_read_fd not in r:
+ continue
+ receive_ipc_messages.ipc_rbuf += os.read(ipc_read_fd, 100)
+
messages = receive_ipc_messages.ipc_rbuf.splitlines(keepends=True)
+ # Check if the last message is complete
if messages[-1].endswith(b"\n"):
receive_ipc_messages.ipc_rbuf = b""
else:
@@ -41,6 +52,7 @@
messages = messages[:-1]
if messages:
+ print("Python received", messages)
return [
(
m.strip().decode(encoding="utf-8")
@@ -51,14 +63,51 @@
]
time.sleep(0.1)
-
return []
def send_ipc_message(message):
- s = (json.dumps(message) + "\n").encode(encoding="utf-8")
- os.write(int(os.environ.get("NODE_CHANNEL_FD", 1)), s)
+ """
+ Send an IPC message over the Unix domain socket.
+ This function assumes the socket is already initialized.
+ """
+ global ipc_socket
+ global ipc_read_fd
+
+ message_data = (json.dumps(message) + "\n").encode("utf-8")
+ if ipc_socket:
+ ipc_socket.sendall(message_data)
+ else:
+ os.write(int(os.environ.get("NODE_CHANNEL_FD", 1)), message_data)
def ready():
+ """
+ Send a 'ready' message over the Unix domain socket to indicate the Python process is ready.
+ This function assumes the socket is already initialized.
+ """
send_ipc_message({"status": "ready"})
+
+
+def init_socket(ipc_method):
+
+ global use_stdin
+ global ipc_socket
+
+ socket_path = ipc_method
+
+ use_stdin = "SOCKET" not in os.environ
+
+ if not socket_path:
+ print("SOCKET environment variable is not set.")
+ exit(1)
+ try:
+ ipc_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ ipc_socket.connect(socket_path)
+ ipc_socket.settimeout(1) # Set a default timeout for blocking operations
+ except socket.error:
+ exit(1)
+
+
+if "SOCKET" in os.environ:
+ init_socket(os.getenv("SOCKET"))

File Metadata

Mime Type
text/plain
Expires
Sat, Apr 26, 11:41 (16 h, 46 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5572861
Default Alt Text
D17465.id52186.diff (21 KB)

Event Timeline