diff --git a/chronik/chronik-cpp/chronik.cpp b/chronik/chronik-cpp/chronik.cpp --- a/chronik/chronik-cpp/chronik.cpp +++ b/chronik/chronik-cpp/chronik.cpp @@ -4,11 +4,13 @@ #include #include +#include #include #include #include #include #include +#include #include #include @@ -17,6 +19,15 @@ namespace chronik { +// Duration between WebSocket pings initiated by Chronik. +// 45s has been empirically established as a reliable duration for both browser +// and NodeJS WebSockets. +static constexpr std::chrono::seconds WS_PING_INTERVAL_DEFAULT{45s}; + +// Ping duration is just 5s on regtest to speed up ping tests and make +// functional tests more reliable. +static constexpr std::chrono::seconds WS_PING_INTERVAL_REGTEST{5s}; + template rust::Vec ToRustVec(const C &container) { rust::Vec vec; vec.reserve(container.size()); @@ -26,7 +37,8 @@ bool Start(const Config &config, const node::NodeContext &node, bool fWipe) { const bool is_pause_allowed = gArgs.GetBoolArg("-chronikallowpause", false); - if (is_pause_allowed && !config.GetChainParams().IsTestChain()) { + const CChainParams params = config.GetChainParams(); + if (is_pause_allowed && !params.IsTestChain()) { return InitError(_("Using -chronikallowpause on a mainnet chain is not " "allowed for security reasons.")); } @@ -40,6 +52,10 @@ .wipe_db = fWipe, .is_pause_allowed = is_pause_allowed, .enable_perf_stats = gArgs.GetBoolArg("-chronikperfstats", false), + .ws_ping_interval_secs = + params.NetworkIDString() == CBaseChainParams::REGTEST + ? uint64_t(count_seconds(WS_PING_INTERVAL_REGTEST)) + : uint64_t(count_seconds(WS_PING_INTERVAL_DEFAULT)), }, config, node); } diff --git a/chronik/chronik-http/src/server.rs b/chronik/chronik-http/src/server.rs --- a/chronik/chronik-http/src/server.rs +++ b/chronik/chronik-http/src/server.rs @@ -5,6 +5,7 @@ //! Module for [`ChronikServer`]. use std::collections::HashMap; +use std::time::Duration; use std::{net::SocketAddr, sync::Arc}; use abc_rust_error::{Result, WrapErr}; @@ -35,6 +36,13 @@ /// Ref-counted pause notifier for Chronik indexing pub type PauseNotifyRef = Arc; +/// Settings to tune Chronik +#[derive(Clone, Debug)] +pub struct ChronikSettings { + /// Duration between WebSocket pings initiated by Chronik. + pub ws_ping_interval: Duration, +} + /// Params defining what and where to serve for [`ChronikServer`]. #[derive(Clone, Debug)] pub struct ChronikServerParams { @@ -46,6 +54,8 @@ pub node: NodeRef, /// Handle for pausing/resuming indexing any updates from the node pub pause_notify: PauseNotifyRef, + /// Settings to tune Chronik + pub settings: ChronikSettings, } /// Chronik HTTP server, holding all the data/handles required to serve an @@ -56,6 +66,7 @@ indexer: ChronikIndexerRef, node: NodeRef, pause_notify: PauseNotifyRef, + settings: ChronikSettings, } /// Errors for [`ChronikServer`]. @@ -101,12 +112,18 @@ indexer: params.indexer, node: params.node, pause_notify: params.pause_notify, + settings: params.settings, }) } /// Serve a Chronik HTTP endpoint with the given parameters. pub async fn serve(self) -> Result<()> { - let app = Self::make_router(self.indexer, self.node, self.pause_notify); + let app = Self::make_router( + self.indexer, + self.node, + self.pause_notify, + self.settings, + ); let servers = self .server_builders .into_iter() @@ -128,6 +145,7 @@ indexer: ChronikIndexerRef, node: NodeRef, pause_notify: PauseNotifyRef, + settings: ChronikSettings, ) -> Router { Router::new() .route("/blockchain-info", routing::get(handle_blockchain_info)) @@ -160,6 +178,7 @@ .layer(Extension(indexer)) .layer(Extension(node)) .layer(Extension(pause_notify)) + .layer(Extension(settings)) } } @@ -304,6 +323,7 @@ async fn handle_ws( ws: WebSocketUpgrade, Extension(indexer): Extension, + Extension(settings): Extension, ) -> impl IntoResponse { - ws.on_upgrade(|ws| handle_subscribe_socket(ws, indexer)) + ws.on_upgrade(|ws| handle_subscribe_socket(ws, indexer, settings)) } diff --git a/chronik/chronik-http/src/ws.rs b/chronik/chronik-http/src/ws.rs --- a/chronik/chronik-http/src/ws.rs +++ b/chronik/chronik-http/src/ws.rs @@ -4,7 +4,7 @@ //! Module for [`handle_subscribe_socket`]. -use std::collections::HashMap; +use std::{collections::HashMap, time::Duration}; use abc_rust_error::Result; use axum::extract::ws::{self, WebSocket}; @@ -21,8 +21,9 @@ use tokio::sync::broadcast; use crate::{ - error::report_status_error, parse::parse_script_variant, - server::ChronikIndexerRef, + error::report_status_error, + parse::parse_script_variant, + server::{ChronikIndexerRef, ChronikSettings}, }; /// Errors for [`ChronikServer`]. @@ -59,10 +60,10 @@ type SubRecvBlocks = Option>; type SubRecvScripts = HashMap>; -#[derive(Default)] struct SubRecv { blocks: SubRecvBlocks, scripts: SubRecvScripts, + ws_ping_interval: Duration, } impl SubRecv { @@ -70,6 +71,7 @@ tokio::select! { action = Self::recv_blocks(&mut self.blocks) => action, action = Self::recv_scripts(&mut self.scripts) => action, + action = Self::schedule_ping(self.ws_ping_interval) => action, } } @@ -95,6 +97,12 @@ } } + async fn schedule_ping(ws_ping_interval: Duration) -> Result { + tokio::time::sleep(ws_ping_interval).await; + let ping_payload = b"Bitcoin ABC Chronik Indexer".to_vec(); + Ok(WsAction::Message(ws::Message::Ping(ping_payload))) + } + async fn handle_sub(&mut self, sub: WsSub, indexer: &ChronikIndexerRef) { let indexer = indexer.read().await; let mut subs = indexer.subs().write().await; @@ -179,7 +187,9 @@ block_msg: Result, ) -> Result { use proto::{ws_msg::MsgType, BlockMsgType::*}; - let Ok(block_msg) = block_msg else { return Ok(WsAction::Nothing) }; + let Ok(block_msg) = block_msg else { + return Ok(WsAction::Nothing); + }; let block_msg_type = match block_msg.msg_type { BlockMsgType::Connected => BlkConnected, BlockMsgType::Disconnected => BlkDisconnected, @@ -223,8 +233,13 @@ pub async fn handle_subscribe_socket( mut socket: WebSocket, indexer: ChronikIndexerRef, + settings: ChronikSettings, ) { - let mut recv = SubRecv::default(); + let mut recv = SubRecv { + blocks: Default::default(), + scripts: Default::default(), + ws_ping_interval: settings.ws_ping_interval, + }; loop { let sub_action = tokio::select! { diff --git a/chronik/chronik-lib/src/bridge.rs b/chronik/chronik-lib/src/bridge.rs --- a/chronik/chronik-lib/src/bridge.rs +++ b/chronik/chronik-lib/src/bridge.rs @@ -7,13 +7,16 @@ use std::{ net::{AddrParseError, IpAddr, SocketAddr}, sync::Arc, + time::Duration, }; use abc_rust_error::Result; use bitcoinsuite_core::tx::{Tx, TxId}; use chronik_bridge::{ffi::init_error, util::expect_unique_ptr}; use chronik_db::mem::MempoolTx; -use chronik_http::server::{ChronikServer, ChronikServerParams}; +use chronik_http::server::{ + ChronikServer, ChronikServerParams, ChronikSettings, +}; use chronik_indexer::{ indexer::{ChronikIndexer, ChronikIndexerParams, Node}, pause::Pause, @@ -96,6 +99,11 @@ indexer, node, pause_notify: Arc::new(pause_notify), + settings: ChronikSettings { + ws_ping_interval: Duration::from_secs( + params.ws_ping_interval_secs, + ), + }, }) } })?; diff --git a/chronik/chronik-lib/src/ffi.rs b/chronik/chronik-lib/src/ffi.rs --- a/chronik/chronik-lib/src/ffi.rs +++ b/chronik/chronik-lib/src/ffi.rs @@ -27,6 +27,8 @@ /// Whether to output Chronik performance statistics into a perf/ /// folder pub enable_perf_stats: bool, + /// Duration between WebSocket pings initiated by Chronik. + pub ws_ping_interval_secs: u64, } extern "Rust" { diff --git a/test/functional/chronik_ws_ping.py b/test/functional/chronik_ws_ping.py new file mode 100755 --- /dev/null +++ b/test/functional/chronik_ws_ping.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python3 +# Copyright (c) 2024 The Bitcoin developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. +""" +Test whether Chronik sends regular WebSocket pings to keep connections open. +""" + +from test_framework.test_framework import BitcoinTestFramework +from test_framework.util import assert_equal + + +class ChronikWsPingTest(BitcoinTestFramework): + def set_test_params(self): + self.setup_clean_chain = True + self.num_nodes = 1 + self.extra_args = [["-chronik"]] + self.supports_cli = False + + def skip_test_if_missing_module(self): + self.skip_if_no_chronik() + + def run_test(self): + node = self.nodes[0] + chronik = node.get_chronik_client() + + from test_framework.chronik.client import ChronikWs, pb + + class PingChronikWs(ChronikWs): + got_ping = False + + def on_ping(self, ws, message): + PingChronikWs.got_ping = True + + # Connect and subscribe to blocks. + # Disable pinging from the testing framework, otherwise they cancel Chronik's ping timeout. + ws = PingChronikWs(chronik, ping_interval=None, ping_timeout=None) + ws.sub_to_blocks() + + # Sanity WS check: mine and expect a CONNECTED msg + self.generate(node, 1)[-1] + assert_equal(ws.recv().block.msg_type, pb.BLK_CONNECTED) + + # Wait for ping while doing nothing. Ping interval is 5s on regtest. + # Note that interacting with the WS would reset the ping timer. + self.wait_until(lambda: PingChronikWs.got_ping) + + # Another sanity WS check to ensure the connection is actually still open + self.generate(node, 1)[-1] + assert_equal(ws.recv().block.msg_type, pb.BLK_CONNECTED) + + ws.close() + + +if __name__ == "__main__": + ChronikWsPingTest().main()