Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F14864896
D18110.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
21 KB
Subscribers
None
D18110.diff
View Options
diff --git a/chronik/bitcoinsuite-core/src/address.rs b/chronik/bitcoinsuite-core/src/address.rs
--- a/chronik/bitcoinsuite-core/src/address.rs
+++ b/chronik/bitcoinsuite-core/src/address.rs
@@ -35,13 +35,13 @@
#[derive(Clone, Debug, Eq, PartialEq, Hash)]
pub struct CashAddress {
/// Type of address (P2PKH or P2SH)
- addr_type: AddressType,
+ pub addr_type: AddressType,
/// The hash160 of the public key or script
- hash: ShaRmd160,
+ pub hash: ShaRmd160,
/// Complete cash address string including prefix
- cash_addr: String,
+ pub cash_addr: String,
/// Network prefix (e.g., "ecash", "etoken")
- prefix: String,
+ pub prefix: String,
}
/// Errors that can occur when working with CashAddresses
diff --git a/modules/bitcoinsuite-chronik-client/src/handler.rs b/modules/bitcoinsuite-chronik-client/src/handler.rs
--- a/modules/bitcoinsuite-chronik-client/src/handler.rs
+++ b/modules/bitcoinsuite-chronik-client/src/handler.rs
@@ -57,6 +57,7 @@
if let Err(err) = match from_str::<Value>(&message) {
Ok(json_data) => {
+ eprintln!("{:?}", json_data);
ipc_reader.on_rx(self, json_data).await.map_err(|e| {
io::Error::new(
ErrorKind::InvalidData,
@@ -69,6 +70,7 @@
format!("Failed to parse JSON: {}", e),
)),
} {
+ eprintln!("{:?}", err);
self.send_message("stop").await.unwrap();
return Err(err);
}
diff --git a/modules/bitcoinsuite-chronik-client/src/lib.rs b/modules/bitcoinsuite-chronik-client/src/lib.rs
--- a/modules/bitcoinsuite-chronik-client/src/lib.rs
+++ b/modules/bitcoinsuite-chronik-client/src/lib.rs
@@ -7,6 +7,7 @@
use bitcoinsuite_core::hash::{Hashed, Sha256d};
use bytes::Bytes;
pub use chronik_proto::proto;
+use chronik_proto::proto::WsSubScript;
use futures_util::{SinkExt, StreamExt};
use prost::Message;
use reqwest::{header::CONTENT_TYPE, StatusCode};
@@ -81,6 +82,9 @@
#[derive(Debug, Clone, Default)]
pub struct WsSubscriptions {
+ pub scripts: Vec<WsSubScript>,
+ pub tokens: Vec<String>,
+ pub lokad_ids: Vec<String>,
pub blocks: bool,
}
@@ -512,6 +516,39 @@
Ok(())
}
+ pub async fn subscribe_to_script(
+ &mut self,
+ script_type: ScriptType,
+ payload: &str,
+ ) -> Result<(), Report> {
+ let decoded_payload =
+ hex::decode(payload).map_err(|e| Report::msg(e.to_string()))?;
+
+ let msg = proto::WsSub {
+ is_unsub: false,
+ sub_type: Some(proto::ws_sub::SubType::Script(
+ proto::WsSubScript {
+ script_type: script_type.to_string(),
+ payload: decoded_payload,
+ },
+ )),
+ };
+
+ self.ws
+ .send(tokio_tungstenite::tungstenite::Message::Binary(
+ Bytes::from(msg.encode_to_vec()),
+ ))
+ .await
+ .wrap_err(HttpRequestError)?;
+
+ self.subs.scripts.push(WsSubScript {
+ script_type: script_type.to_string(),
+ payload: payload.into(),
+ });
+
+ Ok(())
+ }
+
pub async fn recv(&mut self) -> Result<Option<proto::WsMsg>, Report> {
while let Some(msg) = self.ws.next().await {
match msg {
diff --git a/modules/bitcoinsuite-chronik-client/tests/websocket.rs b/modules/bitcoinsuite-chronik-client/tests/websocket.rs
--- a/modules/bitcoinsuite-chronik-client/tests/websocket.rs
+++ b/modules/bitcoinsuite-chronik-client/tests/websocket.rs
@@ -8,14 +8,16 @@
};
use async_trait::async_trait;
+use bitcoinsuite_chronik_client::ScriptType;
use bitcoinsuite_chronik_client::{
handler::{IpcHandler, IpcReader},
test_runner::{handle_test_info, spin_child_process},
ChronikClient, WsEndpoint,
};
-use bitcoinsuite_core::block::BlockHash;
+use bitcoinsuite_core::{address::CashAddress, block::BlockHash, hash::Hashed};
use chronik_proto::proto::{
ws_msg::MsgType, BlockMsgType::*, CoinbaseData, MsgBlock, TxOutput, WsMsg,
+ WsSubScript,
};
use serde_json::Value;
use tokio::sync::Mutex;
@@ -31,6 +33,15 @@
coinbase_out_scriptpubkey: String,
coinbase_out_value: i64,
coinbase_scriptsig: String,
+ p2pkh_address: String,
+ p2sh_address: String,
+ p2pk_script: String,
+ other_script: String,
+ p2pkh_txid: String,
+ p2sh_txid: String,
+ p2pk_txid: String,
+ other_txid: String,
+ mixed_output_txid: String,
}
#[derive(Default)]
@@ -55,6 +66,15 @@
"coinbase_out_scriptpubkey",
"coinbase_out_value",
"coinbase_scriptsig",
+ "p2pkh_address",
+ "p2sh_address",
+ "p2pk_script",
+ "other_script",
+ "p2pkh_txid",
+ "p2sh_txid",
+ "p2pk_txid",
+ "other_txid",
+ "mixed_output_txid",
];
// We use a mutable reference so we can call recv() on data.endpoint
@@ -105,6 +125,51 @@
data.coinbase_scriptsig = coinbase_sig.to_string();
}
}
+ "p2pkh_address" => {
+ if let Some(address) = value.as_str() {
+ data.p2pkh_address = address.to_string();
+ }
+ }
+ "p2sh_address" => {
+ if let Some(address) = value.as_str() {
+ data.p2sh_address = address.to_string();
+ }
+ }
+ "p2pk_script" => {
+ if let Some(script) = value.as_str() {
+ data.p2pk_script = script.to_string();
+ }
+ }
+ "other_script" => {
+ if let Some(script) = value.as_str() {
+ data.other_script = script.to_string();
+ }
+ }
+ "p2pkh_txid" => {
+ if let Some(txid) = value.as_str() {
+ data.p2pkh_txid = txid.to_string();
+ }
+ }
+ "p2sh_txid" => {
+ if let Some(txid) = value.as_str() {
+ data.p2sh_txid = txid.to_string();
+ }
+ }
+ "p2pk_txid" => {
+ if let Some(txid) = value.as_str() {
+ data.p2pk_txid = txid.to_string();
+ }
+ }
+ "other_txid" => {
+ if let Some(txid) = value.as_str() {
+ data.other_txid = txid.to_string();
+ }
+ }
+ "mixed_output_txid" => {
+ if let Some(txid) = value.as_str() {
+ data.mixed_output_txid = txid.to_string();
+ }
+ }
_ => {
println!("Unhandled key: {}", key);
}
@@ -117,26 +182,61 @@
{
match self.counter.load(Ordering::SeqCst) {
// New regtest chain
- 0 | 1 => {
+ 0 => {
let chronik_url = data.chronik_url.clone();
data.endpoint =
Some(ChronikClient::new(chronik_url)?.ws().await?);
+ }
+
+ // Subscribe to scripts and blocks
+ 1 => {
+ let p2pkh_hash =
+ data.p2pkh_address.parse::<CashAddress>()?.hash;
+ let p2sh_hash =
+ data.p2sh_address.parse::<CashAddress>()?.hash;
+ let p2pk_script = &data.p2pk_script;
+ let other_script = &data.other_script;
+
+ let subscriptions: Vec<WsSubScript> = vec![
+ WsSubScript {
+ script_type: ScriptType::P2pkh.to_string(),
+ payload: hex::encode(p2pkh_hash).into(),
+ },
+ WsSubScript {
+ script_type: ScriptType::P2sh.to_string(),
+ payload: hex::encode(p2sh_hash).into(),
+ },
+ WsSubScript {
+ script_type: ScriptType::P2pk.to_string(),
+ payload: hex::encode(p2pk_script).into(),
+ },
+ WsSubScript {
+ script_type: ScriptType::Other.to_string(),
+ payload: hex::encode(other_script).into(),
+ },
+ ];
if let Some(ep) = &mut data.endpoint {
+ // Subscribe to blocks
ep.subscribe_to_blocks().await?;
assert_eq!(ep.subs.blocks, true);
+
+ // Unsubscribe from blocks
ep.unsubscribe_from_blocks().await?;
assert_eq!(ep.subs.blocks, false);
+
+ // Resubscribe to blocks
ep.subscribe_to_blocks().await?;
assert_eq!(ep.subs.blocks, true);
}
}
+
// After a block is avalanche finalized
2 => {
- let finalized_block_message =
+ let block_message =
data.endpoint.as_mut().unwrap().recv().await?;
assert_eq!(
- finalized_block_message,
+ block_message,
Some(WsMsg {
msg_type: Some(MsgType::Block(MsgBlock {
msg_type: BlkFinalized.into(),
@@ -151,18 +251,29 @@
})
);
}
+
// After some txs have been broadcast
3 => {
- // Part of the structure to keep count clean
- // for future iterations. Tx testing will be
- // conducted here.
+ // Wait for 4 tx messages
+ for _ in 0..4 {
+ let tx_msg =
+ data.endpoint.as_mut().unwrap().recv().await?;
+ assert!(matches!(
+ tx_msg,
+ Some(WsMsg {
+ msg_type: Some(MsgType::Tx(_)),
+ ..
+ })
+ ));
+ }
}
+
// After a block is mined
4 => {
- let block_connected_msg =
+ let block_msg =
data.endpoint.as_mut().unwrap().recv().await?;
assert_eq!(
- block_connected_msg,
+ block_msg,
Some(WsMsg {
msg_type: Some(MsgType::Block(MsgBlock {
msg_type: BlkConnected.into(),
@@ -176,13 +287,26 @@
}))
})
);
+
+ for _ in 0..4 {
+ let tx_msg =
+ data.endpoint.as_mut().unwrap().recv().await?;
+ assert!(matches!(
+ tx_msg,
+ Some(WsMsg {
+ msg_type: Some(MsgType::Tx(_)),
+ ..
+ })
+ ));
+ }
}
+
// After this block is finalized by Avalanche
5 => {
- let block_connected_msg =
+ let block_msg =
data.endpoint.as_mut().unwrap().recv().await?;
assert_eq!(
- block_connected_msg,
+ block_msg,
Some(WsMsg {
msg_type: Some(MsgType::Block(MsgBlock {
msg_type: BlkFinalized.into(),
@@ -196,13 +320,26 @@
}))
})
);
+
+ for _ in 0..4 {
+ let tx_msg =
+ data.endpoint.as_mut().unwrap().recv().await?;
+ assert!(matches!(
+ tx_msg,
+ Some(WsMsg {
+ msg_type: Some(MsgType::Tx(_)),
+ ..
+ })
+ ));
+ }
}
+
// After this block is parked
6 => {
- let block_message =
+ let block_msg =
data.endpoint.as_mut().unwrap().recv().await?;
assert_eq!(
- block_message,
+ block_msg,
Some(WsMsg {
msg_type: Some(MsgType::Block(MsgBlock {
msg_type: BlkDisconnected.into(),
@@ -229,13 +366,26 @@
}))
})
);
+
+ for _ in 0..4 {
+ let tx_msg =
+ data.endpoint.as_mut().unwrap().recv().await?;
+ assert!(matches!(
+ tx_msg,
+ Some(WsMsg {
+ msg_type: Some(MsgType::Tx(_)),
+ ..
+ })
+ ));
+ }
}
+
// After this block is unparked
7 => {
- let block_message =
+ let block_msg =
data.endpoint.as_mut().unwrap().recv().await?;
assert_eq!(
- block_message,
+ block_msg,
Some(WsMsg {
msg_type: Some(MsgType::Block(MsgBlock {
msg_type: BlkConnected.into(),
@@ -249,13 +399,26 @@
}))
})
);
+
+ for _ in 0..4 {
+ let tx_msg =
+ data.endpoint.as_mut().unwrap().recv().await?;
+ assert!(matches!(
+ tx_msg,
+ Some(WsMsg {
+ msg_type: Some(MsgType::Tx(_)),
+ ..
+ })
+ ));
+ }
}
+
// After this block is invalidated
8 => {
- let block_message =
+ let block_msg =
data.endpoint.as_mut().unwrap().recv().await?;
assert_eq!(
- block_message,
+ block_msg,
Some(WsMsg {
msg_type: Some(MsgType::Block(MsgBlock {
msg_type: BlkDisconnected.into(),
@@ -282,13 +445,26 @@
}))
})
);
+
+ for _ in 0..4 {
+ let tx_msg =
+ data.endpoint.as_mut().unwrap().recv().await?;
+ assert!(matches!(
+ tx_msg,
+ Some(WsMsg {
+ msg_type: Some(MsgType::Tx(_)),
+ ..
+ })
+ ));
+ }
}
+
// After this block is reconsidered
9 => {
- let block_message =
+ let block_msg =
data.endpoint.as_mut().unwrap().recv().await?;
assert_eq!(
- block_message,
+ block_msg,
Some(WsMsg {
msg_type: Some(MsgType::Block(MsgBlock {
msg_type: BlkConnected.into(),
@@ -302,19 +478,37 @@
}))
})
);
+ for _ in 0..4 {
+ let tx_msg =
+ data.endpoint.as_mut().unwrap().recv().await?;
+ assert!(matches!(
+ tx_msg,
+ Some(WsMsg {
+ msg_type: Some(MsgType::Tx(_)),
+ ..
+ })
+ ));
+ }
}
+
// After a tx is broadcast with outputs of each type
10 => {
- // Part of the structure to keep count clean
- // for future iterations. Tx testing will be
- // conducted here.
+ let tx_msg = data.endpoint.as_mut().unwrap().recv().await?;
+ assert!(matches!(
+ tx_msg,
+ Some(WsMsg {
+ msg_type: Some(MsgType::Tx(_)),
+ ..
+ })
+ ));
}
+
// After a block is mined
11 => {
- let next_blockhash_msg =
+ let block_msg =
data.endpoint.as_mut().unwrap().recv().await?;
assert_eq!(
- next_blockhash_msg,
+ block_msg,
Some(WsMsg {
msg_type: Some(MsgType::Block(MsgBlock {
msg_type: BlkConnected.into(),
@@ -328,13 +522,24 @@
}))
})
);
+
+ // Single tx confirmed message
+ let tx_msg = data.endpoint.as_mut().unwrap().recv().await?;
+ assert!(matches!(
+ tx_msg,
+ Some(WsMsg {
+ msg_type: Some(MsgType::Tx(_)),
+ ..
+ })
+ ));
}
+
// After this block is avalanche parked
12 => {
- let block_message =
+ let block_msg =
data.endpoint.as_mut().unwrap().recv().await?;
assert_eq!(
- block_message,
+ block_msg,
Some(WsMsg {
msg_type: Some(MsgType::Block(MsgBlock {
msg_type: BlkDisconnected.into(),
@@ -361,13 +566,23 @@
}))
})
);
+
+ let tx_msg = data.endpoint.as_mut().unwrap().recv().await?;
+ assert!(matches!(
+ tx_msg,
+ Some(WsMsg {
+ msg_type: Some(MsgType::Tx(_)),
+ ..
+ })
+ ));
}
+
// After this block is avalanche invalidated
13 => {
- let block_message =
+ let block_msg =
data.endpoint.as_mut().unwrap().recv().await?;
assert_eq!(
- block_message,
+ block_msg,
Some(WsMsg {
msg_type: Some(MsgType::Block(MsgBlock {
msg_type: BlkInvalidated.into(),
@@ -394,18 +609,19 @@
}))
})
);
- data.endpoint
- .as_mut()
- .unwrap()
- .unsubscribe_from_blocks()
- .await?;
- assert_eq!(
- data.endpoint.as_ref().unwrap().subs.blocks,
- false
- );
+
+ if let Some(ep) = &mut data.endpoint {
+ ep.unsubscribe_from_blocks().await?;
+ assert_eq!(ep.subs.blocks, false);
+ }
}
+
// After we have unsubscribed to all and another block is found
- 14 => {}
+ 14 => {
+ let msg = data.endpoint.as_mut().unwrap().recv().await?;
+ assert_eq!(msg, None);
+ }
+
_ => {
handler.send_message("stop").await?;
unreachable!(
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Tue, May 20, 23:02 (4 h, 28 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5866125
Default Alt Text
D18110.diff (21 KB)
Attached To
D18110: Chronik - Add script and address subscription to websocket.rs in bitcoinsuite-chronik-client
Event Timeline
Log In to Comment