diff --git a/chronik/chronik-db/src/mem/mempool.rs b/chronik/chronik-db/src/mem/mempool.rs --- a/chronik/chronik-db/src/mem/mempool.rs +++ b/chronik/chronik-db/src/mem/mempool.rs @@ -4,7 +4,7 @@ //! Module for [`Mempool`], to index mempool txs. -use std::collections::HashMap; +use std::{borrow::Cow, collections::HashMap}; use abc_rust_error::Result; use bitcoinsuite_core::tx::{Tx, TxId}; @@ -33,8 +33,17 @@ token_id_utxos: MempoolTokenIdUtxos, } +/// Result after adding a tx to the mempool +#[derive(Debug)] +pub struct MempoolResult<'m> { + /// Mempool tx that was just added + pub mempool_tx: Cow<'m, MempoolTx>, + /// [`TokenIdGroupAux`] generated while indexing the mempool tx + pub token_id_aux: TokenIdGroupAux, +} + /// Transaction in the mempool. -#[derive(Debug, PartialEq, Eq)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct MempoolTx { /// Transaction, including spent coins. pub tx: Tx, @@ -71,7 +80,11 @@ } /// Insert tx into the mempool. - pub fn insert(&mut self, db: &Db, mempool_tx: MempoolTx) -> Result<()> { + pub fn insert( + &mut self, + db: &Db, + mempool_tx: MempoolTx, + ) -> Result<MempoolResult<'_>> { let txid = mempool_tx.tx.txid(); self.script_history.insert(&mempool_tx, &()); self.script_utxos.insert( @@ -93,11 +106,14 @@ if self.txs.insert(txid, mempool_tx).is_some() { return Err(DuplicateTx(txid).into()); } - Ok(()) + Ok(MempoolResult { + mempool_tx: Cow::Borrowed(&self.txs[&txid]), + token_id_aux, + }) } /// Remove tx from the mempool. - pub fn remove(&mut self, txid: TxId) -> Result<MempoolTx> { + pub fn remove(&mut self, txid: TxId) -> Result<MempoolResult<'_>> { let mempool_tx = match self.txs.remove(&txid) { Some(mempool_tx) => mempool_tx, None => return Err(NoSuchMempoolTx(txid).into()), @@ -118,7 +134,10 @@ &token_id_aux, )?; self.tokens.remove(&txid); - Ok(mempool_tx) + Ok(MempoolResult { + mempool_tx: Cow::Owned(mempool_tx), + token_id_aux, + }) } /// Remove mined tx from the mempool. 
diff --git a/chronik/chronik-http/src/ws.rs b/chronik/chronik-http/src/ws.rs --- a/chronik/chronik-http/src/ws.rs +++ b/chronik/chronik-http/src/ws.rs @@ -9,6 +9,7 @@ use abc_rust_error::Result; use axum::extract::ws::{self, WebSocket}; use bitcoinsuite_core::script::ScriptVariant; +use bitcoinsuite_slp::token_id::TokenId; use chronik_indexer::{ subs::{BlockMsg, BlockMsgType}, subs_group::{TxMsg, TxMsgType}, @@ -55,14 +56,17 @@ enum WsSubType { Blocks, Script(ScriptVariant), + TokenId(TokenId), } type SubRecvBlocks = Option<broadcast::Receiver<BlockMsg>>; type SubRecvScripts = HashMap<ScriptVariant, broadcast::Receiver<TxMsg>>; +type SubRecvTokenId = HashMap<TokenId, broadcast::Receiver<TxMsg>>; struct SubRecv { blocks: SubRecvBlocks, scripts: SubRecvScripts, + token_ids: SubRecvTokenId, ws_ping_interval: Duration, } @@ -71,6 +75,7 @@ tokio::select! { action = Self::recv_blocks(&mut self.blocks) => action, action = Self::recv_scripts(&mut self.scripts) => action, + action = Self::recv_token_ids(&mut self.token_ids) => action, action = Self::schedule_ping(self.ws_ping_interval) => action, } } @@ -92,8 +97,24 @@ .values_mut() .map(|receiver| Box::pin(receiver.recv())), ); - let (script_msg, _, _) = script_receivers.await; - sub_script_msg_action(script_msg) + let (tx_msg, _, _) = script_receivers.await; + sub_tx_msg_action(tx_msg) + } + } + + async fn recv_token_ids( + token_ids: &mut SubRecvTokenId, + ) -> Result<WsAction> { + if token_ids.is_empty() { + futures::future::pending().await + } else { + let token_ids_receivers = select_all( + token_ids + .values_mut() + .map(|receiver| Box::pin(receiver.recv())), + ); + let (tx_msg, _, _) = token_ids_receivers.await; + sub_tx_msg_action(tx_msg) } } @@ -132,6 +153,18 @@ self.scripts.insert(script_variant, recv); } } + WsSubType::TokenId(token_id) => { + if sub.is_unsub { + log_chronik!("WS unsubscribe from {:?}\n", token_id); + std::mem::drop(self.token_ids.remove(&token_id)); + 
subs.subs_token_id_mut().unsubscribe_from_member(&token_id) + } else { + log_chronik!("WS subscribe to {:?}\n", token_id); + let recv = + subs.subs_token_id_mut().subscribe_to_member(&token_id); + self.token_ids.insert(token_id, recv); + } + } } } @@ -171,6 +204,9 @@ &script.payload, )?) } + Some(SubType::TokenId(token_id)) => WsSubType::TokenId( + token_id.token_id.parse::<TokenId>()?, + ), }, })) } @@ -205,15 +241,15 @@ Ok(WsAction::Message(msg)) } -fn sub_script_msg_action( - script_msg: Result<TxMsg, broadcast::error::RecvError>, +fn sub_tx_msg_action( + tx_msg: Result<TxMsg, broadcast::error::RecvError>, ) -> Result<WsAction> { use proto::{ws_msg::MsgType, TxMsgType::*}; - let script_msg = match script_msg { - Ok(script_msg) => script_msg, + let tx_msg = match tx_msg { + Ok(tx_msg) => tx_msg, Err(_) => return Ok(WsAction::Nothing), }; - let tx_msg_type = match script_msg.msg_type { + let tx_msg_type = match tx_msg.msg_type { TxMsgType::AddedToMempool => TxAddedToMempool, TxMsgType::RemovedFromMempool => TxRemovedFromMempool, TxMsgType::Confirmed => TxConfirmed, @@ -221,7 +257,7 @@ }; let msg_type = Some(MsgType::Tx(proto::MsgTx { msg_type: tx_msg_type as _, - txid: script_msg.txid.to_vec(), + txid: tx_msg.txid.to_vec(), })); let msg_proto = proto::WsMsg { msg_type }; let msg = ws::Message::Binary(msg_proto.encode_to_vec()); @@ -238,6 +274,7 @@ let mut recv = SubRecv { blocks: Default::default(), scripts: Default::default(), + token_ids: Default::default(), ws_ping_interval: settings.ws_ping_interval, }; diff --git a/chronik/chronik-indexer/src/indexer.rs b/chronik/chronik-indexer/src/indexer.rs --- a/chronik/chronik-indexer/src/indexer.rs +++ b/chronik/chronik-indexer/src/indexer.rs @@ -23,7 +23,7 @@ merge, token::TokenWriter, BlockHeight, BlockReader, BlockStatsWriter, BlockTxs, BlockWriter, DbBlock, GroupHistoryMemData, GroupUtxoMemData, MetadataReader, MetadataWriter, SchemaVersion, SpentByWriter, TxEntry, - TxWriter, + TxReader, TxWriter, }, mem::{MemData, 
MemDataConf, Mempool, MempoolTx}, }; @@ -300,10 +300,12 @@ &mut self, mempool_tx: MempoolTx, ) -> Result<()> { - self.subs - .get_mut() - .handle_tx_event(&mempool_tx.tx, TxMsgType::AddedToMempool); - self.mempool.insert(&self.db, mempool_tx)?; + let result = self.mempool.insert(&self.db, mempool_tx)?; + self.subs.get_mut().handle_tx_event( + &result.mempool_tx.tx, + TxMsgType::AddedToMempool, + &result.token_id_aux, + ); Ok(()) } @@ -311,10 +313,12 @@ /// etc. This is not called when the transaction has been mined (and thus /// also removed from the mempool). pub fn handle_tx_removed_from_mempool(&mut self, txid: TxId) -> Result<()> { - let mempool_tx = self.mempool.remove(txid)?; - self.subs - .get_mut() - .handle_tx_event(&mempool_tx.tx, TxMsgType::RemovedFromMempool); + let result = self.mempool.remove(txid)?; + self.subs.get_mut().handle_tx_event( + &result.mempool_tx.tx, + TxMsgType::RemovedFromMempool, + &result.token_id_aux, + ); Ok(()) } @@ -393,7 +397,7 @@ height: block.db_block.height, }); for tx in &block.txs { - subs.handle_tx_event(tx, TxMsgType::Confirmed); + subs.handle_tx_event(tx, TxMsgType::Confirmed, &token_id_aux); } Ok(()) } @@ -480,8 +484,15 @@ hash: block.db_block.hash, height: block.db_block.height, }); + let tx_reader = TxReader::new(&self.db)?; + let first_tx_num = tx_reader + .first_tx_num_by_block(block.db_block.height)? + .unwrap(); + let index_txs = + prepare_indexed_txs(&self.db, first_tx_num, &block.txs)?; + let token_id_aux = TokenIdGroupAux::from_db(&index_txs, &self.db)?; for tx in &block.txs { - subs.handle_tx_event(tx, TxMsgType::Finalized); + subs.handle_tx_event(tx, TxMsgType::Finalized, &token_id_aux); } Ok(()) } diff --git a/chronik/chronik-indexer/src/subs.rs b/chronik/chronik-indexer/src/subs.rs --- a/chronik/chronik-indexer/src/subs.rs +++ b/chronik/chronik-indexer/src/subs.rs @@ -5,7 +5,10 @@ //! Module containing [`Subs`]. 
use bitcoinsuite_core::{block::BlockHash, tx::Tx}; -use chronik_db::{groups::ScriptGroup, io::BlockHeight}; +use chronik_db::{ + groups::{ScriptGroup, TokenIdGroup, TokenIdGroupAux}, + io::BlockHeight, +}; use chronik_util::log; use tokio::sync::broadcast; @@ -40,6 +43,7 @@ pub struct Subs { subs_block: broadcast::Sender<BlockMsg>, subs_script: SubsGroup<ScriptGroup>, + subs_token_id: SubsGroup<TokenIdGroup>, } impl Subs { @@ -48,6 +52,7 @@ Subs { subs_block: broadcast::channel(BLOCK_CHANNEL_CAPACITY).0, subs_script: SubsGroup::new(script_group), + subs_token_id: SubsGroup::new(TokenIdGroup), } } @@ -61,9 +66,21 @@ &mut self.subs_script } + /// Mutable reference to the token ID subscribers. + pub fn subs_token_id_mut(&mut self) -> &mut SubsGroup<TokenIdGroup> { + &mut self.subs_token_id + } + /// Send out updates to subscribers for this tx and msg_type. - pub fn handle_tx_event(&mut self, tx: &Tx, msg_type: TxMsgType) { + pub fn handle_tx_event( + &mut self, + tx: &Tx, + msg_type: TxMsgType, + token_id_aux: &TokenIdGroupAux, + ) { self.subs_script.handle_tx_event(tx, &(), msg_type); + self.subs_token_id + .handle_tx_event(tx, token_id_aux, msg_type); } pub(crate) fn broadcast_block_msg(&self, msg: BlockMsg) { diff --git a/chronik/chronik-indexer/src/subs_group.rs b/chronik/chronik-indexer/src/subs_group.rs --- a/chronik/chronik-indexer/src/subs_group.rs +++ b/chronik/chronik-indexer/src/subs_group.rs @@ -20,7 +20,7 @@ } /// What happened to a tx. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum TxMsgType { /// Tx was added to the mempool. 
AddedToMempool, diff --git a/chronik/chronik-proto/proto/chronik.proto b/chronik/chronik-proto/proto/chronik.proto --- a/chronik/chronik-proto/proto/chronik.proto +++ b/chronik/chronik-proto/proto/chronik.proto @@ -406,6 +406,8 @@ WsSubBlocks blocks = 2; // Subscription to a script WsSubScript script = 3; + // Subscription to a token ID + WsSubTokenId token_id = 4; } } @@ -413,7 +415,7 @@ // disconnected or finalized. message WsSubBlocks {} -// Subscription to a script. They will be send every time a tx spending the +// Subscription to a script. They will be sent every time a tx spending the // given script or sending to the given script has been added to/removed from // the mempool, or confirmed in a block. message WsSubScript { @@ -426,6 +428,13 @@ bytes payload = 2; } +// Subscription to a token ID. They will be sent every time a tx spends or +// sends tokens with the token ID. +message WsSubTokenId { + // Hex token ID to subscribe to. + string token_id = 1; +} + // Message coming from the WebSocket message WsMsg { // Kind of message diff --git a/test/functional/chronik_token_id_group.py b/test/functional/chronik_token_id_group.py --- a/test/functional/chronik_token_id_group.py +++ b/test/functional/chronik_token_id_group.py @@ -46,8 +46,18 @@ **kwargs, ) + def ws_msg(txid: str, msg_type): + return pb.WsMsg( + tx=pb.MsgTx( + msg_type=msg_type, + txid=bytes.fromhex(txid)[::-1], + ) + ) + node = self.nodes[0] chronik = node.get_chronik_client() + ws1 = chronik.ws() + ws2 = chronik.ws() mocktime = 1300000000 node.setmocktime(mocktime) @@ -103,8 +113,10 @@ ), ) txs.append(genesis_slp) + ws1.sub_token_id(genesis_slp.txid) genesis_slp.send(chronik) genesis_slp.test(chronik) + assert_equal(ws1.recv(), ws_msg(genesis_slp.txid, pb.TX_ADDED_TO_MEMPOOL)) tx = CTransaction() tx.vin = [CTxIn(COutPoint(int(genesis_slp.txid, 16), 3), SCRIPTSIG_OP_TRUE)] @@ -147,8 +159,10 @@ ), ) txs.append(genesis_alp) + ws1.sub_token_id(genesis_alp.txid) genesis_alp.send(chronik) 
genesis_alp.test(chronik) + assert_equal(ws1.recv(), ws_msg(genesis_alp.txid, pb.TX_ADDED_TO_MEMPOOL)) tx = CTransaction() tx.vin = [CTxIn(COutPoint(int(genesis_alp.txid, 16), 4), SCRIPTSIG_OP_TRUE)] @@ -189,8 +203,10 @@ ), ) txs.append(genesis2_alp) + ws2.sub_token_id(genesis2_alp.txid) genesis2_alp.send(chronik) genesis2_alp.test(chronik) + assert_equal(ws2.recv(), ws_msg(genesis2_alp.txid, pb.TX_ADDED_TO_MEMPOOL)) tx = CTransaction() tx.vin = [ @@ -251,6 +267,11 @@ error=f"400: Tx {send_alp.txid} failed token checks: Unexpected burn: Burns 5000 base tokens.", ) send_alp.test(chronik) + expected_msg = ws_msg(send_alp.txid, pb.TX_ADDED_TO_MEMPOOL) + # ws1 subscribed to both genesis_slp and genesis_alp, so it gets a ws msg twice + assert_equal(ws1.recv(), expected_msg) + assert_equal(ws1.recv(), expected_msg) + assert_equal(ws2.recv(), expected_msg) slp_txs = sorted([genesis_slp, send_alp], key=lambda tx: tx.txid) history_txs = chronik.token_id(genesis_slp.txid).unconfirmed_txs().ok().txs @@ -363,6 +384,13 @@ for utxo, proto_utxo in zip_longest(alp2_utxos, utxos): assert_equal(utxo, proto_utxo) + # Resubscribe so ws1=genesis_slp.txid, ws2=genesis_alp.txid, ws3=genesis2_alp.txid + ws1.sub_token_id(genesis_alp.txid, is_unsub=True) + ws2.sub_token_id(genesis2_alp.txid, is_unsub=True) + ws2.sub_token_id(genesis_alp.txid) + ws3 = chronik.ws() + ws3.sub_token_id(genesis2_alp.txid) + # After mining, all txs still work fine block_hash = self.generatetoaddress(node, 1, ADDRESS_ECREG_UNSPENDABLE)[0] for tx in txs: @@ -393,6 +421,13 @@ for utxo, proto_utxo in zip_longest(alp2_utxos, utxos): assert_equal(utxo, proto_utxo) + for tx in slp_txs: + assert_equal(ws1.recv(), ws_msg(tx.txid, pb.TX_CONFIRMED)) + for tx in alp_txs: + assert_equal(ws2.recv(), ws_msg(tx.txid, pb.TX_CONFIRMED)) + for tx in alp2_txs: + assert_equal(ws3.recv(), ws_msg(tx.txid, pb.TX_CONFIRMED)) + # Undo block + test again node.invalidateblock(block_hash) for tx in txs: @@ -421,6 +456,14 @@ for utxo, 
proto_utxo in zip_longest(alp2_utxos, utxos): assert_equal(utxo, proto_utxo) + # TX_ADDED_TO_MEMPOOL are coming in topologically + for tx in [genesis_slp, send_alp]: + assert_equal(ws1.recv(), ws_msg(tx.txid, pb.TX_ADDED_TO_MEMPOOL)) + for tx in [genesis_alp, send_alp]: + assert_equal(ws2.recv(), ws_msg(tx.txid, pb.TX_ADDED_TO_MEMPOOL)) + for tx in [genesis2_alp, send_alp]: + assert_equal(ws3.recv(), ws_msg(tx.txid, pb.TX_ADDED_TO_MEMPOOL)) + if __name__ == "__main__": ChronikTokenBurn().main() diff --git a/test/functional/test_framework/chronik/client.py b/test/functional/test_framework/chronik/client.py --- a/test/functional/test_framework/chronik/client.py +++ b/test/functional/test_framework/chronik/client.py @@ -193,6 +193,13 @@ ) self.send_bytes(sub.SerializeToString()) + def sub_token_id(self, token_id: str, *, is_unsub=False) -> None: + sub = pb.WsSub( + is_unsub=is_unsub, + token_id=pb.WsSubTokenId(token_id=token_id), + ) + self.send_bytes(sub.SerializeToString()) + def close(self): self.ws.close() self.ws_thread.join(self.timeout)