diff --git a/chronik/chronik-http/src/ws.rs b/chronik/chronik-http/src/ws.rs index 7a90d0932..cb48f6b36 100644 --- a/chronik/chronik-http/src/ws.rs +++ b/chronik/chronik-http/src/ws.rs @@ -1,311 +1,321 @@ // Copyright (c) 2023 The Bitcoin developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. //! Module for [`handle_subscribe_socket`]. use std::{collections::HashMap, time::Duration}; use abc_rust_error::Result; use axum::extract::ws::{self, WebSocket}; use bitcoinsuite_core::script::ScriptVariant; use bitcoinsuite_slp::token_id::TokenId; use chronik_indexer::{ subs::{BlockMsg, BlockMsgType}, subs_group::{TxMsg, TxMsgType}, }; use chronik_proto::proto; use chronik_util::log_chronik; use futures::future::select_all; use prost::Message; use thiserror::Error; use tokio::sync::broadcast; use crate::{ error::report_status_error, parse::parse_script_variant, server::{ChronikIndexerRef, ChronikSettings}, }; /// Errors for [`ChronikServer`]. #[derive(Debug, Eq, Error, PartialEq)] pub enum ChronikWsError { /// Unexpected [`ws::Message`] type. #[error("Unexpected message type {0}")] UnexpectedMessageType(&'static str), /// [`proto::WsSub`] must have the `sub_type` field set. #[error("400: Missing sub_type in WsSub message")] MissingSubType, } use self::ChronikWsError::*; enum WsAction { Close, Sub(WsSub), Message(ws::Message), Nothing, } struct WsSub { is_unsub: bool, sub_type: WsSubType, } enum WsSubType { Blocks, Script(ScriptVariant), TokenId(TokenId), } type SubRecvBlocks = Option>; type SubRecvScripts = HashMap>; type SubRecvTokenId = HashMap>; struct SubRecv { blocks: SubRecvBlocks, scripts: SubRecvScripts, token_ids: SubRecvTokenId, ws_ping_interval: Duration, } impl SubRecv { async fn recv_action(&mut self) -> Result { tokio::select! { action = Self::recv_blocks(&mut self.blocks) => action, action = Self::recv_scripts(&mut self.scripts) => action, action = Self::recv_token_ids(&mut self.token_ids) => action, action = Self::schedule_ping(self.ws_ping_interval) => action, } } async fn recv_blocks(blocks: &mut SubRecvBlocks) -> Result { match blocks { Some(blocks) => sub_block_msg_action(blocks.recv().await), None => futures::future::pending().await, } } #[allow(clippy::mutable_key_type)] async fn recv_scripts(scripts: &mut SubRecvScripts) -> Result { if scripts.is_empty() { futures::future::pending().await } else { let script_receivers = select_all( scripts .values_mut() .map(|receiver| Box::pin(receiver.recv())), ); let (tx_msg, _, _) = script_receivers.await; sub_tx_msg_action(tx_msg) } } async fn recv_token_ids( token_ids: &mut SubRecvTokenId, ) -> Result { if token_ids.is_empty() { futures::future::pending().await } else { let token_ids_receivers = select_all( token_ids .values_mut() .map(|receiver| Box::pin(receiver.recv())), ); let (tx_msg, _, _) = token_ids_receivers.await; sub_tx_msg_action(tx_msg) } } async fn schedule_ping(ws_ping_interval: Duration) -> Result { tokio::time::sleep(ws_ping_interval).await; let ping_payload = b"Bitcoin ABC Chronik Indexer".to_vec(); Ok(WsAction::Message(ws::Message::Ping(ping_payload))) } async fn handle_sub(&mut self, sub: WsSub, indexer: &ChronikIndexerRef) { let indexer = indexer.read().await; let mut subs = indexer.subs().write().await; match sub.sub_type { WsSubType::Blocks => { if sub.is_unsub { log_chronik!("WS unsubscribe from blocks\n"); self.blocks = None; } else { log_chronik!("WS subscribe to blocks\n"); // Silently ignore multiple subs to blocks if self.blocks.is_none() { self.blocks = Some(subs.sub_to_block_msgs()); } } } WsSubType::Script(script_variant) => { let script = script_variant.to_script(); if sub.is_unsub { log_chronik!("WS unsubscribe from {:?}\n", script_variant); std::mem::drop(self.scripts.remove(&script_variant)); subs.subs_script_mut().unsubscribe_from_member(&&script) } else { log_chronik!("WS subscribe to {:?}\n", script_variant); let recv = subs.subs_script_mut().subscribe_to_member(&&script); self.scripts.insert(script_variant, recv); } } WsSubType::TokenId(token_id) => { if sub.is_unsub { log_chronik!("WS unsubscribe from {:?}\n", token_id); std::mem::drop(self.token_ids.remove(&token_id)); subs.subs_token_id_mut().unsubscribe_from_member(&token_id) } else { log_chronik!("WS subscribe to {:?}\n", token_id); let recv = subs.subs_token_id_mut().subscribe_to_member(&token_id); self.token_ids.insert(token_id, recv); } } } } async fn cleanup(self, indexer: &ChronikIndexerRef) { if self.scripts.is_empty() { return; } let indexer = indexer.read().await; let mut subs = indexer.subs().write().await; for (script_variant, receiver) in self.scripts { std::mem::drop(receiver); subs.subs_script_mut() .unsubscribe_from_member(&&script_variant.to_script()); } } } fn sub_client_msg_action( client_msg: Option>, ) -> Result { let client_msg = match client_msg { Some(client_msg) => client_msg, None => return Ok(WsAction::Close), }; match client_msg { Ok(ws::Message::Binary(data)) => { use proto::ws_sub::SubType; let sub = proto::WsSub::decode(data.as_slice())?; Ok(WsAction::Sub(WsSub { is_unsub: sub.is_unsub, sub_type: match sub.sub_type { None => return Err(MissingSubType.into()), Some(SubType::Blocks(_)) => WsSubType::Blocks, Some(SubType::Script(script)) => { WsSubType::Script(parse_script_variant( &script.script_type, &script.payload, )?) } Some(SubType::TokenId(token_id)) => WsSubType::TokenId( token_id.token_id.parse::()?, ), }, })) } Ok(ws::Message::Text(_)) => Err(UnexpectedMessageType("Text").into()), Ok(ws::Message::Ping(ping)) => { Ok(WsAction::Message(ws::Message::Pong(ping))) } Ok(ws::Message::Pong(_pong)) => Ok(WsAction::Nothing), Ok(ws::Message::Close(_)) | Err(_) => Ok(WsAction::Close), } } fn sub_block_msg_action( block_msg: Result, ) -> Result { use proto::{ws_msg::MsgType, BlockMsgType::*}; let Ok(block_msg) = block_msg else { return Ok(WsAction::Nothing); }; let block_msg_type = match block_msg.msg_type { BlockMsgType::Connected => BlkConnected, BlockMsgType::Disconnected => BlkDisconnected, BlockMsgType::Finalized => BlkFinalized, }; let msg_type = Some(MsgType::Block(proto::MsgBlock { msg_type: block_msg_type as _, block_hash: block_msg.hash.to_vec(), block_height: block_msg.height, })); let msg_proto = proto::WsMsg { msg_type }; let msg = ws::Message::Binary(msg_proto.encode_to_vec()); Ok(WsAction::Message(msg)) } fn sub_tx_msg_action( tx_msg: Result, ) -> Result { use proto::{ws_msg::MsgType, TxMsgType::*}; let tx_msg = match tx_msg { Ok(tx_msg) => tx_msg, Err(_) => return Ok(WsAction::Nothing), }; let tx_msg_type = match tx_msg.msg_type { TxMsgType::AddedToMempool => TxAddedToMempool, TxMsgType::RemovedFromMempool => TxRemovedFromMempool, TxMsgType::Confirmed => TxConfirmed, TxMsgType::Finalized => TxFinalized, }; let msg_type = Some(MsgType::Tx(proto::MsgTx { msg_type: tx_msg_type as _, txid: tx_msg.txid.to_vec(), })); let msg_proto = proto::WsMsg { msg_type }; let msg = ws::Message::Binary(msg_proto.encode_to_vec()); Ok(WsAction::Message(msg)) } /// Future for a WS connection, which will run indefinitely until the WS will be /// closed. pub async fn handle_subscribe_socket( mut socket: WebSocket, indexer: ChronikIndexerRef, settings: ChronikSettings, ) { let mut recv = SubRecv { blocks: Default::default(), scripts: Default::default(), token_ids: Default::default(), ws_ping_interval: settings.ws_ping_interval, }; + let mut last_msg = None; loop { let sub_action = tokio::select! { client_msg = socket.recv() => sub_client_msg_action(client_msg), action = recv.recv_action() => action, }; let subscribe_action = match sub_action { + // Deduplicate identical consecutive msgs + Ok(WsAction::Message(ws::Message::Binary(msg))) => { + if last_msg.as_ref() == Some(&msg) { + WsAction::Nothing + } else { + last_msg = Some(msg.clone()); + WsAction::Message(ws::Message::Binary(msg)) + } + } Ok(subscribe_action) => subscribe_action, // Turn Err into Message and reply Err(report) => { let (_, error_proto) = report_status_error(report); WsAction::Message(ws::Message::Binary( error_proto.encode_to_vec(), )) } }; match subscribe_action { WsAction::Close => { recv.cleanup(&indexer).await; return; } WsAction::Sub(sub) => recv.handle_sub(sub, &indexer).await, WsAction::Message(msg) => match socket.send(msg).await { Ok(()) => {} Err(_) => return, }, WsAction::Nothing => {} } } } diff --git a/test/functional/chronik_token_id_group.py b/test/functional/chronik_token_id_group.py index 466a59ffe..eaa24211c 100644 --- a/test/functional/chronik_token_id_group.py +++ b/test/functional/chronik_token_id_group.py @@ -1,469 +1,467 @@ #!/usr/bin/env python3 # Copyright (c) 2024 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """ Test Chronik indexes tx by token ID correctly. """ from itertools import zip_longest from test_framework.address import ( ADDRESS_ECREG_P2SH_OP_TRUE, ADDRESS_ECREG_UNSPENDABLE, P2SH_OP_TRUE, SCRIPTSIG_OP_TRUE, ) from test_framework.chronik.alp import alp_genesis, alp_opreturn, alp_send from test_framework.chronik.slp import slp_genesis from test_framework.chronik.token_tx import TokenTx from test_framework.messages import COutPoint, CTransaction, CTxIn, CTxOut from test_framework.test_framework import BitcoinTestFramework from test_framework.util import assert_equal class ChronikTokenBurn(BitcoinTestFramework): def set_test_params(self): self.setup_clean_chain = True self.num_nodes = 1 self.extra_args = [["-chronik"]] def skip_test_if_missing_module(self): self.skip_if_no_chronik() def run_test(self): from test_framework.chronik.client import pb def slp_token(token_type=None, **kwargs) -> pb.Token: return pb.Token( token_type=token_type or pb.TokenType(slp=pb.SLP_TOKEN_TYPE_FUNGIBLE), **kwargs, ) def alp_token(token_type=None, **kwargs) -> pb.Token: return pb.Token( token_type=token_type or pb.TokenType(alp=pb.ALP_TOKEN_TYPE_STANDARD), **kwargs, ) def ws_msg(txid: str, msg_type): return pb.WsMsg( tx=pb.MsgTx( msg_type=msg_type, txid=bytes.fromhex(txid)[::-1], ) ) node = self.nodes[0] chronik = node.get_chronik_client() ws1 = chronik.ws() ws2 = chronik.ws() mocktime = 1300000000 node.setmocktime(mocktime) coinblockhash = self.generatetoaddress(node, 1, ADDRESS_ECREG_P2SH_OP_TRUE)[0] coinblock = node.getblock(coinblockhash) cointx = coinblock["tx"][0] self.generatetoaddress(node, 100, ADDRESS_ECREG_UNSPENDABLE) coinvalue = 5000000000 txs = [] tx = CTransaction() tx.vin = [CTxIn(COutPoint(int(cointx, 16), 0), SCRIPTSIG_OP_TRUE)] tx.vout = [ CTxOut( 0, slp_genesis( token_type=pb.SLP_TOKEN_TYPE_FUNGIBLE, mint_baton_vout=2, initial_mint_amount=5000, ), ), CTxOut(10000, P2SH_OP_TRUE), CTxOut(10000, P2SH_OP_TRUE), CTxOut(coinvalue - 100000, P2SH_OP_TRUE), ] tx.rehash() genesis_slp = TokenTx( tx=tx, status=pb.TOKEN_STATUS_NORMAL, entries=[ pb.TokenEntry( token_id=tx.hash, token_type=pb.TokenType(slp=pb.SLP_TOKEN_TYPE_FUNGIBLE), tx_type=pb.GENESIS, actual_burn_amount="0", ), ], inputs=[pb.Token()], outputs=[ pb.Token(), slp_token(token_id=tx.hash, amount=5000), slp_token(token_id=tx.hash, is_mint_baton=True), pb.Token(), ], token_info=pb.TokenInfo( token_id=tx.hash, token_type=pb.TokenType(slp=pb.SLP_TOKEN_TYPE_FUNGIBLE), genesis_info=pb.GenesisInfo(), ), ) txs.append(genesis_slp) ws1.sub_token_id(genesis_slp.txid) genesis_slp.send(chronik) genesis_slp.test(chronik) assert_equal(ws1.recv(), ws_msg(genesis_slp.txid, pb.TX_ADDED_TO_MEMPOOL)) tx = CTransaction() tx.vin = [CTxIn(COutPoint(int(genesis_slp.txid, 16), 3), SCRIPTSIG_OP_TRUE)] tx.vout = [ alp_opreturn( alp_genesis( mint_amounts=[1000, 2000], num_batons=1, ), ), CTxOut(10000, P2SH_OP_TRUE), CTxOut(10000, P2SH_OP_TRUE), CTxOut(10000, P2SH_OP_TRUE), CTxOut(coinvalue - 200000, P2SH_OP_TRUE), ] tx.rehash() genesis_alp = TokenTx( tx=tx, status=pb.TOKEN_STATUS_NORMAL, entries=[ pb.TokenEntry( token_id=tx.hash, token_type=pb.TokenType(alp=pb.ALP_TOKEN_TYPE_STANDARD), tx_type=pb.GENESIS, actual_burn_amount="0", ), ], inputs=[pb.Token()], outputs=[ pb.Token(), alp_token(token_id=tx.hash, amount=1000), alp_token(token_id=tx.hash, amount=2000), alp_token(token_id=tx.hash, is_mint_baton=True), pb.Token(), ], token_info=pb.TokenInfo( token_id=tx.hash, token_type=pb.TokenType(alp=pb.ALP_TOKEN_TYPE_STANDARD), genesis_info=pb.GenesisInfo(), ), ) txs.append(genesis_alp) ws1.sub_token_id(genesis_alp.txid) genesis_alp.send(chronik) genesis_alp.test(chronik) assert_equal(ws1.recv(), ws_msg(genesis_alp.txid, pb.TX_ADDED_TO_MEMPOOL)) tx = CTransaction() tx.vin = [CTxIn(COutPoint(int(genesis_alp.txid, 16), 4), SCRIPTSIG_OP_TRUE)] tx.vout = [ alp_opreturn( alp_genesis( mint_amounts=[10, 20], num_batons=0, ), ), CTxOut(10000, P2SH_OP_TRUE), CTxOut(10000, P2SH_OP_TRUE), CTxOut(coinvalue - 300000, P2SH_OP_TRUE), ] tx.rehash() genesis2_alp = TokenTx( tx=tx, status=pb.TOKEN_STATUS_NORMAL, entries=[ pb.TokenEntry( token_id=tx.hash, token_type=pb.TokenType(alp=pb.ALP_TOKEN_TYPE_STANDARD), tx_type=pb.GENESIS, actual_burn_amount="0", ), ], inputs=[pb.Token()], outputs=[ pb.Token(), alp_token(token_id=tx.hash, amount=10), alp_token(token_id=tx.hash, amount=20), pb.Token(), ], token_info=pb.TokenInfo( token_id=tx.hash, token_type=pb.TokenType(alp=pb.ALP_TOKEN_TYPE_STANDARD), genesis_info=pb.GenesisInfo(), ), ) txs.append(genesis2_alp) ws2.sub_token_id(genesis2_alp.txid) genesis2_alp.send(chronik) genesis2_alp.test(chronik) assert_equal(ws2.recv(), ws_msg(genesis2_alp.txid, pb.TX_ADDED_TO_MEMPOOL)) tx = CTransaction() tx.vin = [ CTxIn(COutPoint(int(genesis_slp.txid, 16), 1), SCRIPTSIG_OP_TRUE), CTxIn(COutPoint(int(genesis_alp.txid, 16), 1), SCRIPTSIG_OP_TRUE), CTxIn(COutPoint(int(genesis2_alp.txid, 16), 1), SCRIPTSIG_OP_TRUE), ] tx.vout = [ alp_opreturn( alp_send(genesis_alp.txid, [400, 600]), alp_send(genesis2_alp.txid, [0, 0, 3, 7]), ), CTxOut(546, P2SH_OP_TRUE), CTxOut(546, P2SH_OP_TRUE), CTxOut(546, P2SH_OP_TRUE), CTxOut(546, P2SH_OP_TRUE), ] send_alp = TokenTx( tx=tx, status=pb.TOKEN_STATUS_NOT_NORMAL, entries=[ pb.TokenEntry( token_id=genesis_alp.txid, token_type=pb.TokenType(alp=pb.ALP_TOKEN_TYPE_STANDARD), tx_type=pb.SEND, actual_burn_amount="0", ), pb.TokenEntry( token_id=genesis2_alp.txid, token_type=pb.TokenType(alp=pb.ALP_TOKEN_TYPE_STANDARD), tx_type=pb.SEND, actual_burn_amount="0", ), pb.TokenEntry( token_id=genesis_slp.txid, token_type=pb.TokenType(slp=pb.SLP_TOKEN_TYPE_FUNGIBLE), is_invalid=True, burn_summary="Unexpected burn: Burns 5000 base tokens", actual_burn_amount="5000", ), ], inputs=[ slp_token(token_id=genesis_slp.txid, amount=5000, entry_idx=2), alp_token(token_id=genesis_alp.txid, amount=1000), alp_token(token_id=genesis2_alp.txid, amount=10, entry_idx=1), ], outputs=[ pb.Token(), alp_token(token_id=genesis_alp.txid, amount=400), alp_token(token_id=genesis_alp.txid, amount=600), alp_token(token_id=genesis2_alp.txid, amount=3, entry_idx=1), alp_token(token_id=genesis2_alp.txid, amount=7, entry_idx=1), ], ) txs.append(send_alp) send_alp.send( chronik, error=f"400: Tx {send_alp.txid} failed token checks: Unexpected burn: Burns 5000 base tokens.", ) send_alp.test(chronik) expected_msg = ws_msg(send_alp.txid, pb.TX_ADDED_TO_MEMPOOL) - # ws1 subscribed to both genesis_slp and genesis_alp, so it get's a ws msg twice - assert_equal(ws1.recv(), expected_msg) assert_equal(ws1.recv(), expected_msg) assert_equal(ws2.recv(), expected_msg) slp_txs = sorted([genesis_slp, send_alp], key=lambda tx: tx.txid) history_txs = chronik.token_id(genesis_slp.txid).unconfirmed_txs().ok().txs for tx, proto_tx in zip_longest(slp_txs, history_txs): tx.test_tx(proto_tx) alp_txs = sorted([genesis_alp, send_alp], key=lambda tx: tx.txid) history_txs = chronik.token_id(genesis_alp.txid).unconfirmed_txs().ok().txs for tx, proto_tx in zip_longest(alp_txs, history_txs): tx.test_tx(proto_tx) alp2_txs = sorted([genesis2_alp, send_alp], key=lambda tx: tx.txid) history_txs = chronik.token_id(genesis2_alp.txid).unconfirmed_txs().ok().txs for tx, proto_tx in zip_longest(alp2_txs, history_txs): tx.test_tx(proto_tx) slp_utxos = [ pb.Utxo( outpoint=pb.OutPoint( txid=bytes.fromhex(genesis_slp.txid)[::-1], out_idx=2 ), block_height=-1, value=10000, script=bytes(P2SH_OP_TRUE), token=slp_token( token_id=genesis_slp.txid, is_mint_baton=True, entry_idx=-1 ), ), ] utxos = chronik.token_id(genesis_slp.txid).utxos().ok().utxos for utxo, proto_utxo in zip_longest(slp_utxos, utxos): assert_equal(utxo, proto_utxo) alp_utxos = [ pb.Utxo( outpoint=pb.OutPoint( txid=bytes.fromhex(genesis_alp.txid)[::-1], out_idx=2 ), block_height=-1, value=10000, script=bytes(P2SH_OP_TRUE), token=alp_token(token_id=genesis_alp.txid, amount=2000, entry_idx=-1), ), pb.Utxo( outpoint=pb.OutPoint( txid=bytes.fromhex(genesis_alp.txid)[::-1], out_idx=3 ), block_height=-1, value=10000, script=bytes(P2SH_OP_TRUE), token=alp_token( token_id=genesis_alp.txid, is_mint_baton=True, entry_idx=-1 ), ), pb.Utxo( outpoint=pb.OutPoint( txid=bytes.fromhex(send_alp.txid)[::-1], out_idx=1 ), block_height=-1, value=546, script=bytes(P2SH_OP_TRUE), token=alp_token(token_id=genesis_alp.txid, amount=400, entry_idx=-1), ), pb.Utxo( outpoint=pb.OutPoint( txid=bytes.fromhex(send_alp.txid)[::-1], out_idx=2 ), block_height=-1, value=546, script=bytes(P2SH_OP_TRUE), token=alp_token(token_id=genesis_alp.txid, amount=600, entry_idx=-1), ), ] alp_utxos = sorted(alp_utxos, key=lambda o: o.outpoint.txid[::-1]) utxos = chronik.token_id(genesis_alp.txid).utxos().ok().utxos for utxo, proto_utxo in zip_longest(alp_utxos, utxos): assert_equal(utxo, proto_utxo) alp2_utxos = [ pb.Utxo( outpoint=pb.OutPoint( txid=bytes.fromhex(genesis2_alp.txid)[::-1], out_idx=2 ), block_height=-1, value=10000, script=bytes(P2SH_OP_TRUE), token=alp_token(token_id=genesis2_alp.txid, amount=20, entry_idx=-1), ), pb.Utxo( outpoint=pb.OutPoint( txid=bytes.fromhex(send_alp.txid)[::-1], out_idx=3 ), block_height=-1, value=546, script=bytes(P2SH_OP_TRUE), token=alp_token(token_id=genesis2_alp.txid, amount=3, entry_idx=-1), ), pb.Utxo( outpoint=pb.OutPoint( txid=bytes.fromhex(send_alp.txid)[::-1], out_idx=4 ), block_height=-1, value=546, script=bytes(P2SH_OP_TRUE), token=alp_token(token_id=genesis2_alp.txid, amount=7, entry_idx=-1), ), ] alp2_utxos = sorted(alp2_utxos, key=lambda o: o.outpoint.txid[::-1]) utxos = chronik.token_id(genesis2_alp.txid).utxos().ok().utxos for utxo, proto_utxo in zip_longest(alp2_utxos, utxos): assert_equal(utxo, proto_utxo) # Resubscribe so ws1=genesis_slp.txid, ws2=genesis_alp.txid, ws3=genesis2_alp.txid ws1.sub_token_id(genesis_alp.txid, is_unsub=True) ws2.sub_token_id(genesis2_alp.txid, is_unsub=True) ws2.sub_token_id(genesis_alp.txid) ws3 = chronik.ws() ws3.sub_token_id(genesis2_alp.txid) # After mining, all txs still work fine block_hash = self.generatetoaddress(node, 1, ADDRESS_ECREG_UNSPENDABLE)[0] for tx in txs: tx.test(chronik, block_hash) history_txs = chronik.token_id(genesis_slp.txid).confirmed_txs().ok().txs for tx, proto_tx in zip_longest(slp_txs, history_txs): tx.test_tx(proto_tx, block_hash) history_txs = chronik.token_id(genesis_alp.txid).confirmed_txs().ok().txs for tx, proto_tx in zip_longest(alp_txs, history_txs): tx.test_tx(proto_tx, block_hash) history_txs = chronik.token_id(genesis2_alp.txid).confirmed_txs().ok().txs for tx, proto_tx in zip_longest(alp2_txs, history_txs): tx.test_tx(proto_tx, block_hash) for utxo in slp_utxos + alp_utxos + alp2_utxos: utxo.block_height = 102 utxos = chronik.token_id(genesis_slp.txid).utxos().ok().utxos for utxo, proto_utxo in zip_longest(slp_utxos, utxos): assert_equal(utxo, proto_utxo) utxos = chronik.token_id(genesis_alp.txid).utxos().ok().utxos for utxo, proto_utxo in zip_longest(alp_utxos, utxos): assert_equal(utxo, proto_utxo) utxos = chronik.token_id(genesis2_alp.txid).utxos().ok().utxos for utxo, proto_utxo in zip_longest(alp2_utxos, utxos): assert_equal(utxo, proto_utxo) for tx in slp_txs: assert_equal(ws1.recv(), ws_msg(tx.txid, pb.TX_CONFIRMED)) for tx in alp_txs: assert_equal(ws2.recv(), ws_msg(tx.txid, pb.TX_CONFIRMED)) for tx in alp2_txs: assert_equal(ws3.recv(), ws_msg(tx.txid, pb.TX_CONFIRMED)) # Undo block + test again node.invalidateblock(block_hash) for tx in txs: tx.test(chronik) history_txs = chronik.token_id(genesis_slp.txid).unconfirmed_txs().ok().txs for tx, proto_tx in zip_longest(slp_txs, history_txs): tx.test_tx(proto_tx) history_txs = chronik.token_id(genesis_alp.txid).unconfirmed_txs().ok().txs for tx, proto_tx in zip_longest(alp_txs, history_txs): tx.test_tx(proto_tx) history_txs = chronik.token_id(genesis2_alp.txid).unconfirmed_txs().ok().txs for tx, proto_tx in zip_longest(alp2_txs, history_txs): tx.test_tx(proto_tx) for utxo in slp_utxos + alp_utxos + alp2_utxos: utxo.block_height = -1 utxos = chronik.token_id(genesis_slp.txid).utxos().ok().utxos for utxo, proto_utxo in zip_longest(slp_utxos, utxos): assert_equal(utxo, proto_utxo) utxos = chronik.token_id(genesis_alp.txid).utxos().ok().utxos for utxo, proto_utxo in zip_longest(alp_utxos, utxos): assert_equal(utxo, proto_utxo) utxos = chronik.token_id(genesis2_alp.txid).utxos().ok().utxos for utxo, proto_utxo in zip_longest(alp2_utxos, utxos): assert_equal(utxo, proto_utxo) # TX_ADDED_TO_MEMPOOL are coming in topologically for tx in [genesis_slp, send_alp]: assert_equal(ws1.recv(), ws_msg(tx.txid, pb.TX_ADDED_TO_MEMPOOL)) for tx in [genesis_alp, send_alp]: assert_equal(ws2.recv(), ws_msg(tx.txid, pb.TX_ADDED_TO_MEMPOOL)) for tx in [genesis2_alp, send_alp]: assert_equal(ws3.recv(), ws_msg(tx.txid, pb.TX_ADDED_TO_MEMPOOL)) if __name__ == "__main__": ChronikTokenBurn().main() diff --git a/test/functional/chronik_ws_script.py b/test/functional/chronik_ws_script.py index 4b226687d..59aaaed72 100755 --- a/test/functional/chronik_ws_script.py +++ b/test/functional/chronik_ws_script.py @@ -1,225 +1,223 @@ #!/usr/bin/env python3 # Copyright (c) 2023 The Bitcoin developers # Distributed under the MIT software license, see the accompanying # file COPYING or http://www.opensource.org/licenses/mit-license.php. """Test whether Chronik sends WebSocket messages correctly.""" from test_framework.address import ( ADDRESS_ECREG_P2SH_OP_TRUE, ADDRESS_ECREG_UNSPENDABLE, SCRIPTSIG_OP_TRUE, ) from test_framework.avatools import can_find_inv_in_poll, get_ava_p2p_interface from test_framework.blocktools import ( create_block, create_coinbase, make_conform_to_ctor, ) from test_framework.messages import COutPoint, CTransaction, CTxIn, CTxOut from test_framework.p2p import P2PDataStore from test_framework.script import OP_EQUAL, OP_HASH160, CScript, hash160 from test_framework.test_framework import BitcoinTestFramework from test_framework.txtools import pad_tx from test_framework.util import assert_equal QUORUM_NODE_COUNT = 16 class ChronikWsScriptTest(BitcoinTestFramework): def set_test_params(self): self.setup_clean_chain = True self.num_nodes = 1 self.extra_args = [ [ "-avaproofstakeutxodustthreshold=1000000", "-avaproofstakeutxoconfirmations=1", "-avacooldown=0", "-avaminquorumstake=0", "-avaminavaproofsnodecount=0", "-chronik", "-whitelist=noban@127.0.0.1", ], ] self.supports_cli = False self.rpc_timeout = 240 def skip_test_if_missing_module(self): self.skip_if_no_chronik() def run_test(self): node = self.nodes[0] chronik = node.get_chronik_client() node.setmocktime(1300000000) peer = node.add_p2p_connection(P2PDataStore()) # Make us a coin coinblockhash = self.generatetoaddress(node, 1, ADDRESS_ECREG_P2SH_OP_TRUE)[0] coinblock = node.getblock(coinblockhash) cointx = coinblock["tx"][0] # Set up Avalanche def get_quorum(): return [ get_ava_p2p_interface(self, node) for _ in range(0, QUORUM_NODE_COUNT) ] def has_finalized_tip(tip_expected): hash_tip_final = int(tip_expected, 16) can_find_inv_in_poll(quorum, hash_tip_final) return node.isfinalblock(tip_expected) quorum = get_quorum() assert node.getavalancheinfo()["ready_to_poll"] is True tip = self.generatetoaddress(node, 100, ADDRESS_ECREG_UNSPENDABLE)[-1] # Tx sending to 4 different scripts coinvalue = 5000000000 send_values = [coinvalue - 10000, 1000, 1000, 1000] send_redeem_scripts = [bytes([i + 0x52]) for i in range(len(send_values))] send_script_hashes = [hash160(script) for script in send_redeem_scripts] send_scripts = [ CScript([OP_HASH160, script_hash, OP_EQUAL]) for script_hash in send_script_hashes ] tx = CTransaction() tx.vin = [ CTxIn(outpoint=COutPoint(int(cointx, 16), 0), scriptSig=SCRIPTSIG_OP_TRUE) ] tx.vout = [ CTxOut(value, script) for (value, script) in zip(send_values, send_scripts) ] # Connect 2 websocket clients ws1 = chronik.ws() ws2 = chronik.ws() # Subscribe to 2 scripts on ws1 and 1 on ws2 ws1.sub_script("p2sh", send_script_hashes[1]) ws1.sub_script("p2sh", send_script_hashes[2]) ws2.sub_script("p2sh", send_script_hashes[2]) # Send the tx, will send updates to ws1 and ws2 txid = node.sendrawtransaction(tx.serialize().hex()) self.wait_until(lambda: txid in node.getrawmempool()) from test_framework.chronik.client import pb expected_msg = pb.WsMsg( tx=pb.MsgTx( msg_type=pb.TX_ADDED_TO_MEMPOOL, txid=bytes.fromhex(txid)[::-1], ) ) - # ws1 receives the tx msg twice, as it contains both scripts - assert_equal(ws1.recv(), expected_msg) assert_equal(ws1.recv(), expected_msg) assert_equal(ws2.recv(), expected_msg) # Unsubscribe ws1 from the other script ws2 is subscribed to ws1.sub_script("p2sh", send_script_hashes[2], is_unsub=True) # tx2 is only sent to ws2 tx2 = CTransaction() tx2.vin = [ CTxIn( outpoint=COutPoint(int(txid, 16), 2), scriptSig=CScript([send_redeem_scripts[2]]), ) ] pad_tx(tx2) txid2 = node.sendrawtransaction(tx2.serialize().hex()) assert_equal( ws2.recv(), pb.WsMsg( tx=pb.MsgTx( msg_type=pb.TX_ADDED_TO_MEMPOOL, txid=bytes.fromhex(txid2)[::-1], ) ), ) # tx3 is only sent to ws1 tx3 = CTransaction() tx3.vin = [ CTxIn( outpoint=COutPoint(int(txid, 16), 1), scriptSig=CScript([send_redeem_scripts[1]]), ) ] pad_tx(tx3) txid3 = node.sendrawtransaction(tx3.serialize().hex()) assert_equal( ws1.recv(), pb.WsMsg( tx=pb.MsgTx( msg_type=pb.TX_ADDED_TO_MEMPOOL, txid=bytes.fromhex(txid3)[::-1], ) ), ) # Tweak tx3 to cause a conflict tx3_conflict = CTransaction(tx3) tx3_conflict.nLockTime = 1 tx3_conflict.rehash() # Mine tx, tx2 and tx3_conflict height = 102 block = create_block( int(tip, 16), create_coinbase(height, b"\x03" * 33), 1300000500 ) block.vtx += [tx, tx2, tx3_conflict] make_conform_to_ctor(block) block.hashMerkleRoot = block.calc_merkle_root() block.solve() peer.send_blocks_and_test([block], node) def check_tx_msgs(ws, msg_type, txids): for txid in txids: assert_equal( ws.recv(), pb.WsMsg( tx=pb.MsgTx( msg_type=msg_type, txid=bytes.fromhex(txid)[::-1], ) ), ) # For ws1, this sends a REMOVED_FROM_MEMPOOL for tx3, and two CONFIRMED check_tx_msgs(ws1, pb.TX_REMOVED_FROM_MEMPOOL, [tx3.hash]) check_tx_msgs(ws1, pb.TX_CONFIRMED, sorted([txid, tx3_conflict.hash])) # For ws2, this only sends the CONFIRMED msgs check_tx_msgs(ws2, pb.TX_CONFIRMED, sorted([txid, txid2])) # Invalidate the block again node.invalidateblock(block.hash) # Adds the disconnected block's txs back into the mempool check_tx_msgs(ws1, pb.TX_ADDED_TO_MEMPOOL, [txid, tx3_conflict.hash]) check_tx_msgs(ws2, pb.TX_ADDED_TO_MEMPOOL, [txid, txid2]) # Test Avalanche finalization tip = node.getbestblockhash() self.wait_until(lambda: has_finalized_tip(tip)) # Mine txs in a block -> sends CONFIRMED tip = self.generate(node, 1)[-1] check_tx_msgs(ws1, pb.TX_CONFIRMED, sorted([txid, tx3_conflict.hash])) check_tx_msgs(ws2, pb.TX_CONFIRMED, sorted([txid, txid2])) # Wait for Avalanche finalization of block -> sends TX_FINALIZED self.wait_until(lambda: has_finalized_tip(tip)) check_tx_msgs(ws1, pb.TX_FINALIZED, sorted([txid, tx3_conflict.hash])) check_tx_msgs(ws2, pb.TX_FINALIZED, sorted([txid, txid2])) ws1.close() ws2.close() if __name__ == "__main__": ChronikWsScriptTest().main()