Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F14864641
D18087.id54053.diff
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
33 KB
Subscribers
None
D18087.id54053.diff
View Options
diff --git a/chronik/chronik-db/src/group.rs b/chronik/chronik-db/src/group.rs
--- a/chronik/chronik-db/src/group.rs
+++ b/chronik/chronik-db/src/group.rs
@@ -109,6 +109,10 @@
/// Serialize the given member.
fn ser_member(&self, member: &Self::Member<'_>) -> Self::MemberSer;
+ /// Whether this group supports ser_hash_member. If false the
+ /// ser_hash_member function will panic and should not be called.
+ fn is_hash_member_supported(&self) -> bool;
+
/// Hash the given member.
/// This is currently only used for ScriptGroup to create a
/// scripthash to script index for the ElectrumX API.
diff --git a/chronik/chronik-db/src/groups/lokad_id.rs b/chronik/chronik-db/src/groups/lokad_id.rs
--- a/chronik/chronik-db/src/groups/lokad_id.rs
+++ b/chronik/chronik-db/src/groups/lokad_id.rs
@@ -74,6 +74,10 @@
*member
}
+ fn is_hash_member_supported(&self) -> bool {
+ false
+ }
+
fn ser_hash_member(&self, _member: &Self::Member<'_>) -> [u8; 32] {
unimplemented!("There is no use case for hashing LokadIdGroup")
}
diff --git a/chronik/chronik-db/src/groups/script.rs b/chronik/chronik-db/src/groups/script.rs
--- a/chronik/chronik-db/src/groups/script.rs
+++ b/chronik/chronik-db/src/groups/script.rs
@@ -110,6 +110,10 @@
compress_script_variant(&member.variant())
}
+ fn is_hash_member_supported(&self) -> bool {
+ true
+ }
+
fn ser_hash_member(&self, member: &Self::Member<'_>) -> [u8; 32] {
Sha256::digest(member.bytecode()).to_be_bytes()
}
diff --git a/chronik/chronik-db/src/groups/token_id.rs b/chronik/chronik-db/src/groups/token_id.rs
--- a/chronik/chronik-db/src/groups/token_id.rs
+++ b/chronik/chronik-db/src/groups/token_id.rs
@@ -103,6 +103,10 @@
member.to_be_bytes()
}
+ fn is_hash_member_supported(&self) -> bool {
+ false
+ }
+
fn ser_hash_member(&self, _member: &Self::Member<'_>) -> [u8; 32] {
unimplemented!("There is no use case for hashing TokenIdGroup")
}
diff --git a/chronik/chronik-db/src/plugins/group.rs b/chronik/chronik-db/src/plugins/group.rs
--- a/chronik/chronik-db/src/plugins/group.rs
+++ b/chronik/chronik-db/src/plugins/group.rs
@@ -119,6 +119,10 @@
member.to_vec()
}
+ fn is_hash_member_supported(&self) -> bool {
+ false
+ }
+
fn ser_hash_member(&self, _member: &Self::Member<'_>) -> [u8; 32] {
unimplemented!("There is no known use case for hashing PluginsGroup")
}
diff --git a/chronik/chronik-db/src/test/value_group.rs b/chronik/chronik-db/src/test/value_group.rs
--- a/chronik/chronik-db/src/test/value_group.rs
+++ b/chronik/chronik-db/src/test/value_group.rs
@@ -61,6 +61,10 @@
ser_value(*sats)
}
+ fn is_hash_member_supported(&self) -> bool {
+ false
+ }
+
fn ser_hash_member(&self, _member: &Self::Member<'_>) -> [u8; 32] {
unimplemented!()
}
diff --git a/chronik/chronik-http/src/electrum.rs b/chronik/chronik-http/src/electrum.rs
--- a/chronik/chronik-http/src/electrum.rs
+++ b/chronik/chronik-http/src/electrum.rs
@@ -15,11 +15,13 @@
use chronik_bridge::ffi;
use chronik_db::group::GroupMember;
use chronik_indexer::{
+ indexer::ChronikIndexer,
merkle::MerkleTree,
query::{QueryBlocks, MAX_HISTORY_PAGE_SIZE},
subs::BlockMsgType,
+ subs_group::TxMsgType,
};
-use chronik_proto::proto::Tx;
+use chronik_proto::proto::{BlockMetadata, Tx};
use chronik_util::log_chronik;
use futures::future;
use itertools::izip;
@@ -142,7 +144,7 @@
message::Notification {
jsonrpc: message::JSONRPC_VERSION.to_string(),
method: nt.method,
- params: Some(Value::Array(vec![nt.result])),
+ params: Some(nt.result),
}
}
@@ -453,12 +455,15 @@
hex::encode(Sha256::from_be_slice(hash).unwrap().as_le_bytes())
}
+// Return the history in a format that Fulcrum expects
async fn get_scripthash_history(
script_hash: Sha256,
indexer: ChronikIndexerRef,
node: NodeRef,
max_history: u32,
) -> Result<Vec<Tx>, RPCError> {
+ let script_hash_hex = hex::encode(script_hash.to_be_bytes());
+
let indexer = indexer.read().await;
let script_history = indexer
.script_history(&node)
@@ -469,9 +474,10 @@
let mut tx_history: Vec<Tx> = vec![];
while page < num_pages {
- // Return the history in ascending block height order
+ // Return the confirmed txs in ascending block height order, with txs
+ // ordered as they appear in the block
let history = script_history
- .rev_history(
+ .confirmed_txs(
GroupMember::MemberHash(script_hash).as_ref(),
page as usize,
MAX_HISTORY_PAGE_SIZE,
@@ -479,8 +485,6 @@
.map_err(|_| RPCError::InternalError)?;
if history.num_txs > max_history {
- let script_hash_hex = hex::encode(script_hash.to_be_bytes());
- // Note that Fulcrum would return an empty history in this case
return Err(RPCError::CustomError(
1,
format!(
@@ -499,6 +503,65 @@
page += 1;
}
+ // Note that there is currently no pagination for the mempool.
+ let mut history = script_history
+ .unconfirmed_txs(GroupMember::MemberHash(script_hash).as_ref())
+ .map_err(|_| RPCError::InternalError)?;
+
+ if history.num_txs + (tx_history.len() as u32) > max_history {
+ return Err(RPCError::CustomError(
+ 1,
+ format!(
+ "transaction history for scripthash {script_hash_hex} exceeds \
+ limit ({0})",
+ max_history
+ ),
+ ));
+ }
+
+ let mut unconfirmed_tx_history: Vec<Tx> = vec![];
+
+ for tx in history.txs.iter_mut() {
+ let block_height =
+ if has_unconfirmed_parents(tx, &indexer, &node).await? {
+ -1
+ } else {
+ 0
+ };
+
+ // Override the block height:
+ // - -1 if the tx has some unconfirmed parents
+ // - 0 if the tx has no unconfirmed parents
+ let electrum_fake_block = BlockMetadata {
+ height: block_height,
+ hash: vec![0; 64],
+ timestamp: 0,
+ is_final: false,
+ };
+ tx.block = Some(electrum_fake_block);
+ unconfirmed_tx_history.push(tx.clone());
+ }
+
+ // Return the mempool txs in the reverse block height then txid order
+ unconfirmed_tx_history.sort_by(|a, b| {
+ let a_height = a.block.as_ref().unwrap().height;
+ let b_height = b.block.as_ref().unwrap().height;
+ if a_height != b_height {
+ // Warning: reverse order! We place txs with no unconfirmed parents
+ // first (height = 0) then txs with unconfirmed parents
+ // (height = -1).
+ return b_height.cmp(&a_height);
+ }
+
+ let a_txid =
+ hex::encode(a.txid.iter().copied().rev().collect::<Vec<u8>>());
+ let b_txid =
+ hex::encode(b.txid.iter().copied().rev().collect::<Vec<u8>>());
+ a_txid.cmp(&b_txid)
+ });
+
+ tx_history.append(&mut unconfirmed_tx_history);
+
Ok(tx_history)
}
@@ -507,7 +570,10 @@
let mut unconfirmed: i64 = 0;
for tx in history {
- let is_mempool = tx.block.is_none();
+ let is_mempool = match &tx.block {
+ Some(block) => block.height <= 0,
+ None => true,
+ };
for outp in tx.outputs.iter() {
if Sha256::digest(&outp.output_script) != scripthash {
continue;
@@ -532,6 +598,60 @@
(confirmed, unconfirmed)
}
+/// Whether any input of `tx` spends an output of a tx that has no block yet.
+async fn has_unconfirmed_parents(
+    tx: &Tx,
+    indexer: &tokio::sync::RwLockReadGuard<'_, ChronikIndexer>,
+    node: &NodeRef,
+) -> Result<bool, RPCError> {
+    for input in &tx.inputs {
+        // No prev_out means a coinbase input, which can't have missing parents
+        let Some(prev_out) = &input.prev_out else {
+            return Ok(false);
+        };
+        let parent_txid = TxId::try_from(prev_out.txid.as_slice())
+            .map_err(|_| RPCError::InternalError)?;
+        // The parent lookup is expected to always succeed
+        let parent_tx = indexer
+            .txs(node)
+            .tx_by_id(parent_txid)
+            .map_err(|_| RPCError::InternalError)?;
+        if parent_tx.block.is_none() {
+            return Ok(true);
+        }
+    }
+
+    Ok(false)
+}
+
+/// Compute the electrum status of a scripthash: `None` when it has no history,
+/// else the hex of the `Sha256` digest of the joined `txid:height:` entries.
+async fn get_scripthash_status(
+    script_hash: Sha256,
+    indexer: ChronikIndexerRef,
+    node: NodeRef,
+    max_history: u32,
+) -> Result<Option<String>, RPCError> {
+    let history =
+        get_scripthash_history(script_hash, indexer, node, max_history).await?;
+    if history.is_empty() {
+        return Ok(None);
+    }
+
+    // One `txid:height:` chunk per tx, in history order, glued together
+    let status_preimage: String = history
+        .iter()
+        .map(|tx| {
+            let be_txid: Vec<u8> = tx.txid.iter().copied().rev().collect();
+            // Every tx of the history is expected to carry block metadata
+            let height = tx.block.as_ref().unwrap().height;
+            format!("{}:{height}:", hex::encode(be_txid))
+        })
+        .collect();
+
+    Ok(Some(Sha256::digest(status_preimage.as_bytes()).hex_le()))
+}
+
fn get_tx_fee(tx: &Tx) -> i64 {
let mut fee: i64 = 0;
for inp in tx.inputs.iter() {
@@ -543,10 +663,10 @@
fee
}
-async fn header_json_from_height(
+async fn header_hex_from_height(
blocks: &QueryBlocks<'_>,
height: i32,
-) -> Result<Value, RPCError> {
+) -> Result<String, RPCError> {
let header = blocks.header(height.to_string(), 0).await.map_err(|_| {
RPCError::CustomError(
1,
@@ -554,10 +674,14 @@
)
})?;
- Ok(json!({
- "height": height,
- "hex": hex::encode(header.raw_header),
- }))
+ Ok(hex::encode(header.raw_header))
+}
+
+/// Derive a subscription ID from the first 4 bytes (LE) of the script hash.
+/// NOTE(review): script hashes sharing these 4 bytes map to the same ID.
+fn script_hash_to_sub_id(script_hash: Sha256) -> u32 {
+    let [b0, b1, b2, b3, ..]: [u8; 32] = script_hash.into();
+    u32::from_le_bytes([b0, b1, b2, b3])
}
#[rpc_pubsub_impl(name = "blockchain")]
@@ -604,15 +728,22 @@
let blocks: chronik_indexer::query::QueryBlocks<'_> =
indexer.blocks(&node_clone);
- match header_json_from_height(&blocks, block_msg.height)
+ match header_hex_from_height(&blocks, block_msg.height)
.await
{
Err(err) => {
log_chronik!("{err}\n");
break;
}
- Ok(header_json) => {
- if sub.notify(header_json).await.is_err() {
+ Ok(header_hex) => {
+ if sub
+ .notify(json!([{
+ "height": block_msg.height,
+ "hex": header_hex,
+ }]))
+ .await
+ .is_err()
+ {
// Don't log, it's likely a client
// unsubscription or disconnection
break;
@@ -630,7 +761,12 @@
.map_err(|_| RPCError::InternalError)?
.tip_height;
- header_json_from_height(&blocks, tip_height).await
+ let header_hex = header_hex_from_height(&blocks, tip_height).await?;
+
+ Ok(json!({
+ "height": tip_height,
+ "hex": header_hex,
+ }))
}
#[rpc_method(name = "headers.unsubscribe")]
@@ -642,6 +778,129 @@
) -> Result<Value, RPCError> {
let sub_id: message::SubscriptionID = 0;
let success = chan.remove_subscription(&sub_id).await.is_ok();
+ Ok(json!(success))
+ }
+
+    // Subscribe the channel to status changes of a scripthash and return its
+    // current status (null if the scripthash has no history).
+    #[rpc_method(name = "scripthash.subscribe")]
+    async fn scripthash_subscribe(
+        &self,
+        chan: Arc<Channel>,
+        method: String,
+        params: Value,
+    ) -> Result<Value, RPCError> {
+        let script_hash_hex = match get_param!(params, 0, "scripthash")? {
+            Value::String(v) => Ok(v),
+            _ => {
+                Err(RPCError::CustomError(1, "Invalid scripthash".to_string()))
+            }
+        }?;
+
+        let script_hash =
+            Sha256::from_be_hex(&script_hash_hex).map_err(|_| {
+                RPCError::CustomError(1, "Invalid scripthash".to_string())
+            })?;
+
+        // Release both locks before the awaits below: the status query takes
+        // the indexer read lock again, which can deadlock behind a writer.
+        let mut recv = {
+            let indexer = self.indexer.read().await;
+            let mut subs = indexer.subs().write().await;
+            let script_subs = subs.subs_script_mut();
+            script_subs.subscribe_to_hash_member(&script_hash.to_be_bytes())
+        };
+
+        let indexer_clone = self.indexer.clone();
+        let node_clone = self.node.clone();
+        let max_history = self.max_history;
+
+        let sub_id = script_hash_to_sub_id(script_hash);
+        if let Ok(sub) = chan.new_subscription(&method, Some(sub_id)).await {
+            tokio::spawn(async move {
+                log_chronik!("Subscription to electrum scripthash\n");
+
+                let mut last_status = None;
+
+                loop {
+                    let Ok(tx_msg) = recv.recv().await else {
+                        // Error, disconnect
+                        break;
+                    };
+
+                    // We want all the events except finalization (this might
+                    // change in the future):
+                    // - added to mempool
+                    // - removed from mempool
+                    // - confirmed
+                    if tx_msg.msg_type == TxMsgType::Finalized {
+                        continue;
+                    }
+
+                    if let Ok(status) = get_scripthash_status(
+                        script_hash,
+                        indexer_clone.clone(),
+                        node_clone.clone(),
+                        max_history,
+                    )
+                    .await
+                    {
+                        if last_status == status {
+                            continue;
+                        }
+                        last_status = status.clone();
+
+                        if sub
+                            .notify(json!([
+                                hex::encode(script_hash.to_be_bytes()),
+                                status,
+                            ]))
+                            .await
+                            .is_err()
+                        {
+                            // Don't log, it's likely a client unsubscription
+                            // or disconnection
+                            break;
+                        }
+                    }
+                }
+
+                log_chronik!("Unsubscription from electrum scripthash\n");
+            });
+        }
+
+        let status = get_scripthash_status(
+            script_hash,
+            self.indexer.clone(),
+            self.node.clone(),
+            max_history,
+        )
+        .await?;
+
+        Ok(serde_json::json!(status))
+    }
+
+ #[rpc_method(name = "scripthash.unsubscribe")]
+ async fn scripthash_unsubscribe(
+ &self,
+ chan: Arc<Channel>,
+ _method: String,
+ params: Value,
+ ) -> Result<Value, RPCError> {
+ let script_hash_hex = match get_param!(params, 0, "scripthash")? {
+ Value::String(v) => Ok(v),
+ _ => {
+ Err(RPCError::CustomError(1, "Invalid scripthash".to_string()))
+ }
+ }?;
+
+ let script_hash =
+ Sha256::from_be_hex(&script_hash_hex).map_err(|_| {
+ RPCError::CustomError(1, "Invalid scripthash".to_string())
+ })?;
+
+ let sub_id = script_hash_to_sub_id(script_hash);
+ let success = chan.remove_subscription(&sub_id).await.is_ok();
Ok(serde_json::json!(success))
}
}
@@ -1056,15 +1315,8 @@
// Return history in ascending order (and mempool last), which is the
// opposite of what chronik does.
- for tx in history.iter().rev() {
- let height = match &tx.block {
- Some(block) => block.height,
- // Here we differ from Fulcrum because we don't discriminate
- // between unconfirmed transactions and
- // transactions with unconfirmed parents
- // (height -1)
- None => 0,
- };
+ for tx in history {
+ let height = tx.block.as_ref().unwrap().height;
let be_txid: Vec<u8> = tx.txid.iter().copied().rev().collect();
if height > 0 {
json_history.push(json!({
@@ -1072,7 +1324,7 @@
"tx_hash": hex::encode(be_txid)
}));
} else {
- let fee = get_tx_fee(tx);
+ let fee = get_tx_fee(&tx);
json_history.push(json!({
"fee": fee,
"height": height,
diff --git a/chronik/chronik-indexer/src/subs_group.rs b/chronik/chronik-indexer/src/subs_group.rs
--- a/chronik/chronik-indexer/src/subs_group.rs
+++ b/chronik/chronik-indexer/src/subs_group.rs
@@ -71,6 +71,22 @@
}
}
+    /// Get a receiver for the tx messages of the given group hash member,
+    /// creating the underlying broadcast channel if there is none yet.
+    pub fn subscribe_to_hash_member(
+        &mut self,
+        hash_member: &[u8; 32],
+    ) -> broadcast::Receiver<TxMsg> {
+        let key: &[u8] = hash_member.as_ref();
+        if let Some(existing_sender) = self.subs.get(key) {
+            return existing_sender.subscribe();
+        }
+
+        let (sender, receiver) = broadcast::channel(GROUP_CHANNEL_CAPACITY);
+        self.subs.insert(key.to_vec(), sender);
+        receiver
+    }
+
/// Cleanly unsubscribe from a member. This will try to deallocate the
/// memory used by a subscriber.
pub fn unsubscribe_from_member(&mut self, member: &G::Member<'_>) {
@@ -98,7 +114,14 @@
txid: tx.txid(),
};
let mut already_notified = HashSet::new();
+ let mut already_notified_hash = HashSet::new();
for member in tx_members_for_group(&self.group, query, aux) {
+ let hash_member = if self.group.is_hash_member_supported() {
+ Some(self.group.ser_hash_member(&member))
+ } else {
+ None
+ };
+
if !already_notified.contains(&member) {
let member_ser = self.group.ser_member(&member);
if let Some(sender) = self.subs.get(member_ser.as_ref()) {
@@ -109,6 +132,24 @@
}
already_notified.insert(member);
}
+
+ match hash_member {
+ // What is below is only for ScriptGroup
+ None => continue,
+ Some(hash_member) => {
+ if !already_notified_hash.contains(&hash_member) {
+ if let Some(sender) =
+ self.subs.get(hash_member.as_ref())
+ {
+ // Unclean unsubscribe
+ if sender.send(msg.clone()).is_err() {
+ self.subs.remove(hash_member.as_ref());
+ }
+ }
+ already_notified_hash.insert(hash_member);
+ }
+ }
+ }
}
}
diff --git a/test/functional/chronik_electrum_blockchain.py b/test/functional/chronik_electrum_blockchain.py
--- a/test/functional/chronik_electrum_blockchain.py
+++ b/test/functional/chronik_electrum_blockchain.py
@@ -25,7 +25,12 @@
)
from test_framework.script import OP_RETURN, OP_TRUE, CScript
from test_framework.test_framework import BitcoinTestFramework
-from test_framework.util import assert_equal, chronikelectrum_port, hex_to_be_bytes
+from test_framework.util import (
+ assert_equal,
+ assert_greater_than,
+ chronikelectrum_port,
+ hex_to_be_bytes,
+)
from test_framework.wallet import MiniWallet
COINBASE_TX_HEX = (
@@ -67,6 +72,7 @@
self.test_block_header()
self.test_scripthash()
self.test_headers_subscribe()
+ self.test_scripthash_subscribe()
def test_invalid_params(self):
# Invalid params type
@@ -700,7 +706,7 @@
def utxo_sorting_key(utxo):
return utxo["tx_hash"], utxo["tx_pos"]
- def assert_scripthash_balance_and_history(check_sorting=True):
+ def assert_scripthash_balance_and_history():
assert_equal(
self.client.blockchain.scripthash.get_balance(scripthash).result,
{
@@ -716,17 +722,31 @@
scripthash
).result
expected_utxos = utxos
- if not check_sorting:
- # Enforce any unique arbitrary sorting so we can compare equality
- # between the two lists.
- def sorting_key(hist_item):
- return hist_item["tx_hash"]
- actual_history = sorted(actual_history, key=sorting_key)
- expected_history = sorted(expected_history, key=sorting_key)
+ def electrum_history_sort(hist):
+ # Extract confirmed txs and sort by ascending height then
+ # txid
+ conf_hist = [tx for tx in hist if tx["height"] > 0]
+ # We use no coinbase tx in this test, otherwise this should be
+ # accounted for (a coinbase tx should remain at first position)
+ conf_hist.sort(key=lambda tx: tx["tx_hash"])
+ conf_hist.sort(key=lambda tx: tx["height"])
+
+ # Extract unconfirmed txs and sort by descending height then
+ # txid
+ unconf_hist = [tx for tx in hist if tx["height"] <= 0]
+ unconf_hist.sort(key=lambda tx: tx["tx_hash"])
+ unconf_hist.sort(key=lambda tx: tx["height"], reverse=True)
+
+ # The full history is made of confirmed txs first then
+ # unconfirmed txs
+ return conf_hist + unconf_hist
+
+ expected_history = electrum_history_sort(expected_history)
+ # Actually the same sort except all mempool txs have a block
+ # height of 0
+ expected_utxos = electrum_history_sort(expected_utxos)
- actual_utxos = sorted(actual_utxos, key=utxo_sorting_key)
- expected_utxos = sorted(expected_utxos, key=utxo_sorting_key)
assert_equal(actual_history, expected_history)
assert_equal(actual_utxos, expected_utxos)
@@ -740,7 +760,17 @@
from_node=self.node, scriptPubKey=script, amount=amount, fee=fee
)
unconfirmed += amount
- history.append({"fee": fee, "height": 0, "tx_hash": txid})
+
+ unconfirmed_parents = len(self.node.getmempoolentry(txid)["depends"]) > 0
+ print(f"txid: {txid}, unconfirmed_parents: {unconfirmed_parents}")
+ history.append(
+ {
+ "fee": fee,
+ "height": -1 if unconfirmed_parents else 0,
+ "tx_hash": txid,
+ }
+ )
+
utxos.append({"height": 0, "tx_hash": txid, "tx_pos": n, "value": amount})
return txid, n
@@ -763,7 +793,7 @@
for _ in range(3):
add_unconfirmed_transaction(amount=888, fee=999)
# We cannot guarantee the sorting of unconfirmed transactions
- assert_scripthash_balance_and_history(check_sorting=False)
+ assert_scripthash_balance_and_history()
# Test an excessive transaction history
history_len = len(
@@ -795,7 +825,7 @@
self.client = self.nodes[0].get_chronik_electrum_client()
# We can add one more transaction
add_unconfirmed_transaction(amount=777, fee=998)
- assert_scripthash_balance_and_history(check_sorting=False)
+ assert_scripthash_balance_and_history()
# The next transaction makes the tx history too long.
add_unconfirmed_transaction(amount=777, fee=998)
@@ -825,6 +855,11 @@
sorted(utxos, key=utxo_sorting_key),
)
+ # Remove the history limit for the next tests
+ self.restart_node(0)
+ self.client = self.node.get_chronik_electrum_client()
+ self.wallet.rescan_utxos()
+
def test_headers_subscribe(self):
self.log.info("Test the blockchain.headers.subscribe endpoint")
@@ -860,7 +895,7 @@
for client in clients:
notification = client.wait_for_notification(
"blockchain.headers.subscribe"
- )
+ )[0]
assert_equal(notification["height"], height)
assert_equal(notification["hex"], header_hex)
@@ -918,6 +953,186 @@
unsub_message = client2.blockchain.headers.unsubscribe()
assert_equal(unsub_message.result, False)
+ # Unsubscribe all the clients so we don't mess with other tests
+ unsub_message = self.client.blockchain.headers.unsubscribe()
+ assert_equal(unsub_message.result, True)
+ unsub_message = client3.blockchain.headers.unsubscribe()
+ assert_equal(unsub_message.result, True)
+
+ def test_scripthash_subscribe(self):
+ self.log.info("Test the blockchain.scripthash.subscribe endpoint")
+
+ self.generate(self.wallet, 10)
+
+ # Subscribing to an address with no history returns null as a status
+ sub_message = self.client.blockchain.scripthash.subscribe("0" * 64)
+ result_no_history = sub_message.result
+ assert_equal(result_no_history, None)
+
+ # Subscribing to an address with some history returns a hash as a status
+ scripthash = hex_be_sha256(self.wallet.get_scriptPubKey())
+ assert_greater_than(
+ len(self.client.blockchain.scripthash.get_history(scripthash).result), 0
+ )
+ sub_message = self.client.blockchain.scripthash.subscribe(scripthash)
+ result_history = sub_message.result
+ assert result_history is not None
+ assert_equal(len(result_history), 64)
+
+ # Subscribing again is a no-op and returns the same result
+ for _ in range(3):
+ assert_equal(
+ self.client.blockchain.scripthash.subscribe("0" * 64).result,
+ result_no_history,
+ )
+ assert_equal(
+ self.client.blockchain.scripthash.subscribe(scripthash).result,
+ result_history,
+ )
+
+ # Generate a few wallet transactions so we get notifications
+ chain_length = 3
+ self.wallet.send_self_transfer_chain(
+ from_node=self.node, chain_length=chain_length
+ )
+
+ def check_notification(clients, scripthash, last_status=None):
+ ret_status = None
+ for client in clients:
+ notification = client.wait_for_notification(
+ "blockchain.scripthash.subscribe", timeout=1
+ )
+ # We should have exactly 2 items, the scripthash and the status
+ assert_equal(len(notification), 2)
+ (ret_scripthash, status) = notification
+ assert_equal(ret_scripthash, scripthash)
+ # Status is some hash
+ assert_equal(len(status), 64)
+
+ # The status should be the same for all clients
+ if ret_status:
+ assert_equal(status, ret_status)
+ ret_status = status
+
+ assert ret_status != last_status
+ return ret_status
+
+ # We should get a notification of each tx in the chain. Each tx causes
+ # the status to change so the status should be different for each
+ # notification.
+ last_status = None
+ for _ in range(chain_length):
+ last_status = check_notification([self.client], scripthash, last_status)
+
+ # Mine a block: the 3 previously unconfirmed txs are confirmed. We get 2
+ # notifications: 1 for the confirmation of the 3 mempool txs, and 1 for
+ # the new coinbase tx
+ assert_equal(len(self.node.getrawmempool()), 3)
+ self.generate(self.wallet, 1)
+ assert_equal(len(self.node.getrawmempool()), 0)
+
+ # Here the confirmation happens for all the txs at the same time, so the
+ # status is the same across all the notifications (there is no such
+ # thing as one tx enters the block, then another one etc.).
+ # But this will differ from the previously saved status because the txs
+ # now have a non zero block height (and there is a new coinbase tx).
+ last_status = check_notification([self.client], scripthash, last_status)
+
+ # Let's add some clients
+ client2 = self.node.get_chronik_electrum_client()
+ assert_equal(
+ client2.blockchain.scripthash.subscribe(scripthash).result, last_status
+ )
+ client3 = self.node.get_chronik_electrum_client()
+ assert_equal(
+ client3.blockchain.scripthash.subscribe(scripthash).result, last_status
+ )
+
+ # Add a few more txs: all clients get notified. The status changes
+ # every time, see the above rationale
+ self.wallet.send_self_transfer_chain(
+ from_node=self.node, chain_length=chain_length
+ )
+ for _ in range(chain_length):
+ last_status = check_notification(
+ [self.client, client2, client3], scripthash, last_status
+ )
+
+ # Mine the block to confirm the transactions
+ assert_equal(len(self.node.getrawmempool()), 3)
+ self.generate(self.wallet, 1)
+ assert_equal(len(self.node.getrawmempool()), 0)
+ last_status = check_notification(
+ [self.client, client2, client3], scripthash, last_status
+ )
+
+ # Unsubscribe client 2, the other clients are still notified
+ assert_equal(client2.blockchain.scripthash.unsubscribe(scripthash).result, True)
+
+ self.generate(self.wallet, 1)
+ last_status = check_notification(
+ [self.client, client3], scripthash, last_status
+ )
+
+ try:
+ client2.wait_for_notification("blockchain.scripthash.subscribe", timeout=1)
+ assert False, "Received an unexpected scripthash notification"
+ except TimeoutError:
+ pass
+
+ # Unsubscribing again is a no-op
+ for _ in range(3):
+ assert_equal(
+ client2.blockchain.scripthash.unsubscribe(scripthash).result, False
+ )
+
+ # Subscribe the first client to another hash
+ scriptpubkey = CScript(
+ bytes.fromhex("76a91462e907b15cbf27d5425399ebf6f0fb50ebb88f1888ac")
+ )
+ other_scripthash = hex_be_sha256(scriptpubkey)
+ # This script has some history from the previous tests
+ sub_message = self.client.blockchain.scripthash.subscribe(other_scripthash)
+ assert_equal(len(sub_message.result), 64)
+
+ # We're sending from the originally subscribed address to the newly
+ # subscribed one so we also get the change output
+ self.wallet.send_to(
+ from_node=self.node,
+ scriptPubKey=scriptpubkey,
+ amount=1000,
+ )
+ check_notification([self.client], other_scripthash)
+ last_status = check_notification(
+ [self.client, client3], scripthash, last_status
+ )
+
+ # Unsubscribe the first client from the first scripthash
+ assert_equal(
+ self.client.blockchain.scripthash.unsubscribe(scripthash).result, True
+ )
+
+ # Now only client3 gets notified for the original scripthash
+ self.generate(self.wallet, 1)
+ last_status = check_notification([client3], scripthash, last_status)
+
+ # The other tx gets confirmed
+ check_notification([self.client], other_scripthash)
+ # But that's the only notification
+ try:
+ self.client.wait_for_notification(
+ "blockchain.scripthash.subscribe", timeout=1
+ )
+ assert False, "Received an unexpected scripthash notification"
+ except TimeoutError:
+ pass
+
+ # Unsubscribe from everything
+ assert_equal(
+ self.client.blockchain.scripthash.unsubscribe(other_scripthash).result, True
+ )
+ assert_equal(client3.blockchain.scripthash.unsubscribe(scripthash).result, True)
+
if __name__ == "__main__":
ChronikElectrumBlockchain().main()
diff --git a/test/functional/test_framework/jsonrpctools.py b/test/functional/test_framework/jsonrpctools.py
--- a/test/functional/test_framework/jsonrpctools.py
+++ b/test/functional/test_framework/jsonrpctools.py
@@ -7,7 +7,7 @@
import socket
from typing import Any, Optional
-from .util import assert_equal
+from .util import assert_equal, assert_greater_than
class OversizedResponseError(Exception):
@@ -160,8 +160,8 @@
assert "id" not in json_reply
assert_equal(json_reply.get("method"), method)
assert "params" in json_reply
- assert_equal(len(json_reply["params"]), 1)
+ assert_greater_than(len(json_reply["params"]), 0)
# The "result" is within a "params" field. There is no point returning
# a JsonRpcResponse here as we only care about the result
- return json_reply["params"][0]
+ return json_reply["params"]
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Tue, May 20, 21:22 (15 h, 16 m)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
5865748
Default Alt Text
D18087.id54053.diff (33 KB)
Attached To
D18087: [chronik] Add the blockchain.scripthash.subscribe endpoint
Event Timeline
Log In to Comment