diff --git a/src/net.h b/src/net.h
--- a/src/net.h
+++ b/src/net.h
@@ -657,6 +657,7 @@
     RadixTree<const avalanche::Proof, avalanche::ProofRadixTreeAdapter>
         sharedProofs;
+    std::atomic<std::chrono::seconds> lastSharedProofsUpdate{0s};
     std::atomic<bool> compactproofs_requested{false};
 };

diff --git a/src/net_processing.cpp b/src/net_processing.cpp
--- a/src/net_processing.cpp
+++ b/src/net_processing.cpp
@@ -108,6 +108,12 @@
 /** Minimum time between 2 successives getavaaddr messages from the same peer */
 static constexpr std::chrono::minutes GETAVAADDR_INTERVAL{2};
 
+/**
+ * If no proof was requested from a compact proof message before this timeout
+ * expires, the proof radix tree can be cleaned up.
+ */
+static constexpr std::chrono::minutes AVALANCHE_AVAPROOFS_TIMEOUT{2};
+
 struct DataRequestParameters {
     /**
      * Maximum number of in-flight data requests from a peer. It is not a hard
@@ -524,10 +530,9 @@
     void UpdateAvalancheStatistics() const;
 
     /**
-     * Send a getavaaddr message to one of our avalanche outbounds if we are
-     * missing good nodes.
+     * Process periodic avalanche network messaging and cleanups.
      */
-    void MaybeRequestAvalancheNodes(CScheduler &scheduler) const;
+    void AvalanchePeriodicNetworking(CScheduler &scheduler) const;
 
     /**
      * Get a shared pointer to the Peer object.
@@ -1678,47 +1683,68 @@
            (pnode->IsManualConn() && (pnode->nServices & NODE_AVALANCHE));
 }
 
-void PeerManagerImpl::MaybeRequestAvalancheNodes(CScheduler &scheduler) const {
-    if (g_avalanche &&
-        (!g_avalanche->isQuorumEstablished() ||
-         g_avalanche->withPeerManager([&](avalanche::PeerManager &pm) {
-             return pm.shouldRequestMoreNodes();
-         }))) {
-        std::vector<NodeId> avanode_outbound_ids;
-        m_connman.ForEachNode([&](CNode *pnode) {
-            if (isAvalancheOutboundOrManual(pnode)) {
-                avanode_outbound_ids.push_back(pnode->GetId());
-            }
-        });
+void PeerManagerImpl::AvalanchePeriodicNetworking(CScheduler &scheduler) const {
+    const auto now = GetTime<std::chrono::seconds>();
+    std::vector<NodeId> avanode_outbound_ids;
+
+    if (!g_avalanche) {
+        // Not enabled or not ready yet, retry later
+        goto scheduleLater;
+    }
+
+    m_connman.ForEachNode([&](CNode *pnode) {
+        // Build a list of the avalanche manual or outbound peer node ids
+        if (isAvalancheOutboundOrManual(pnode)) {
+            avanode_outbound_ids.push_back(pnode->GetId());
+        }
+
+        // If a proof radix tree timed out, clean it up
+        if (pnode->m_proof_relay &&
+            now > (pnode->m_proof_relay->lastSharedProofsUpdate.load() +
+                   AVALANCHE_AVAPROOFS_TIMEOUT)) {
+            LogPrint(BCLog::AVALANCHE,
+                     "Cleaning up timed out compact proofs from peer %d\n",
+                     pnode->GetId());
+            pnode->m_proof_relay->sharedProofs = {};
+        }
+    });
+
+    if (avanode_outbound_ids.empty()) {
+        // No node is available for messaging, retry later
+        goto scheduleLater;
+    }
+
+    if (!g_avalanche->isQuorumEstablished() ||
+        g_avalanche->withPeerManager([&](avalanche::PeerManager &pm) {
+            return pm.shouldRequestMoreNodes();
+        })) {
         // Randomly select an avalanche outbound peer to send the getavaaddr
         // message to
-        if (!avanode_outbound_ids.empty()) {
-            Shuffle(avanode_outbound_ids.begin(), avanode_outbound_ids.end(),
-                    FastRandomContext());
-            const NodeId avanodeId = avanode_outbound_ids.front();
-
-            m_connman.ForNode(avanodeId, [&](CNode *pavanode) {
-                LogPrint(BCLog::AVALANCHE,
-                         "Requesting more avalanche addresses to peer %d\n",
-                         avanodeId);
-                m_connman.PushMessage(pavanode,
-                                      CNetMsgMaker(pavanode->GetCommonVersion())
-                                          .Make(NetMsgType::GETAVAADDR));
-                PeerRef peer = GetPeerRef(avanodeId);
-                WITH_LOCK(peer->m_addr_token_bucket_mutex,
-                          peer->m_addr_token_bucket += GetMaxAddrToSend());
-                return true;
-            });
-        }
+        Shuffle(avanode_outbound_ids.begin(), avanode_outbound_ids.end(),
+                FastRandomContext());
+        const NodeId avanodeId = avanode_outbound_ids.front();
+
+        m_connman.ForNode(avanodeId, [&](CNode *pavanode) {
+            LogPrint(BCLog::AVALANCHE,
+                     "Requesting more avalanche addresses to peer %d\n",
+                     avanodeId);
+            m_connman.PushMessage(pavanode,
+                                  CNetMsgMaker(pavanode->GetCommonVersion())
+                                      .Make(NetMsgType::GETAVAADDR));
+            PeerRef peer = GetPeerRef(avanodeId);
+            WITH_LOCK(peer->m_addr_token_bucket_mutex,
+                      peer->m_addr_token_bucket += GetMaxAddrToSend());
+            return true;
+        });
     }
 
+scheduleLater:
     // Schedule next run for 2-5 minutes in the future.
     // We add randomness on every cycle to avoid the possibility of P2P
     // fingerprinting.
-    const auto requestAvalancheNodesInteval = 2min + GetRandMillis(3min);
-    scheduler.scheduleFromNow([&] { MaybeRequestAvalancheNodes(scheduler); },
-                              requestAvalancheNodesInteval);
+    const auto avalanchePeriodicNetworkingInterval = 2min + GetRandMillis(3min);
+    scheduler.scheduleFromNow([&] { AvalanchePeriodicNetworking(scheduler); },
+                              avalanchePeriodicNetworkingInterval);
 }
 
 void PeerManagerImpl::FinalizeNode(const Config &config, const CNode &node,
@@ -2057,9 +2083,9 @@
                                 AVALANCHE_STATISTICS_REFRESH_PERIOD);
 
         // schedule next run for 2-5 minutes in the future
-        const auto requestAvalancheNodesInteval = 2min + GetRandMillis(3min);
-        scheduler.scheduleFromNow([&] { MaybeRequestAvalancheNodes(scheduler); },
-                                  requestAvalancheNodesInteval);
+        const auto avalanchePeriodicNetworkingInterval = 2min + GetRandMillis(3min);
+        scheduler.scheduleFromNow([&] { AvalanchePeriodicNetworking(scheduler); },
+                                  avalanchePeriodicNetworkingInterval);
     }
 
 /**
@@ -5166,6 +5192,9 @@
         return;
     }
 
+    pfrom.m_proof_relay->lastSharedProofsUpdate =
+        GetTime<std::chrono::seconds>();
+
     pfrom.m_proof_relay->sharedProofs =
         g_avalanche->withPeerManager([&](const avalanche::PeerManager &pm) {
             return pm.getShareableProofsSnapshot();

diff --git a/test/functional/abc_p2p_compactproofs.py b/test/functional/abc_p2p_compactproofs.py
--- a/test/functional/abc_p2p_compactproofs.py
+++ b/test/functional/abc_p2p_compactproofs.py
@@ -7,6 +7,7 @@
 """
 import random
+import time
 
 from test_framework.avatools import (
     AvaP2PInterface,
@@ -28,6 +29,11 @@
 from test_framework.test_framework import BitcoinTestFramework
 from test_framework.util import MAX_NODES, assert_equal, p2p_port
 
+# Timeout after which the proofs can be cleaned up
+AVALANCHE_AVAPROOFS_TIMEOUT = 2 * 60
+# Max interval between 2 runs of the periodic networking processing
+AVALANCHE_MAX_PERIODIC_NETWORKING_INTERVAL = 5 * 60
+
 
 class ProofStoreP2PInterface(AvaP2PInterface):
     def __init__(self):
@@ -414,6 +420,30 @@
         # All
         check_received_proofs(range(numof_proof))
 
+        self.log.info(
+            "Check the node does not send the proofs if they are not requested before the timeout elapses")
+
+        mocktime = int(time.time())
+        node.setmocktime(mocktime)
+
+        slow_peer = node.add_p2p_connection(ProofStoreP2PInterface())
+        slow_peer.nodeid = node.getpeerinfo()[-1]['id']
+        _ = request_proofs(slow_peer)
+
+        # Elapse the timeout
+        mocktime += AVALANCHE_AVAPROOFS_TIMEOUT + 1
+        node.setmocktime(mocktime)
+
+        with node.assert_debug_log([f"Cleaning up timed out compact proofs from peer {slow_peer.nodeid}"]):
+            node.mockscheduler(AVALANCHE_MAX_PERIODIC_NETWORKING_INTERVAL)
+
+        req = msg_avaproofsreq()
+        req.indices = range(numof_proof)
+        slow_peer.send_and_ping(req)
+
+        # Check we get no proof
+        assert_equal(len(slow_peer.get_proofs()), 0)
+
     def test_compact_proofs_download_on_connect(self):
         self.log.info(
             "Check the node get compact proofs upon avalanche outbound discovery")
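
Note (reviewer sketch, not part of the patch): the cleanup added to AvalanchePeriodicNetworking() is a timestamp-and-expire pattern. lastSharedProofsUpdate is stamped when the snapshot is built on avaproofs receipt, and the periodic task drops the snapshot once AVALANCHE_AVAPROOFS_TIMEOUT elapses without a refresh; the atomic timestamp lets the message-processing thread write it while the scheduler thread reads it without extra locking. Below is a minimal standalone illustration of the same pattern, in which ProofRelay, Now(), MaybeCleanup() and the std::string payload are invented stand-ins for the real Bitcoin ABC types, not the actual implementation:

    // Standalone sketch of the timestamp-and-expire pattern used above.
    #include <atomic>
    #include <chrono>
    #include <iostream>
    #include <optional>
    #include <string>

    using namespace std::chrono_literals;

    static constexpr std::chrono::minutes AVAPROOFS_TIMEOUT{2};

    struct ProofRelay {
        // Stand-in for the shared proofs radix tree snapshot.
        std::optional<std::string> sharedProofs;
        std::atomic<std::chrono::seconds> lastSharedProofsUpdate{0s};
    };

    static std::chrono::seconds Now() {
        return std::chrono::duration_cast<std::chrono::seconds>(
            std::chrono::steady_clock::now().time_since_epoch());
    }

    // What the periodic task does for each peer: drop the snapshot if no
    // request arrived within the timeout since it was built.
    static void MaybeCleanup(ProofRelay &relay) {
        if (relay.sharedProofs &&
            Now() > relay.lastSharedProofsUpdate.load() + AVAPROOFS_TIMEOUT) {
            relay.sharedProofs.reset();
        }
    }

    int main() {
        ProofRelay relay;
        relay.sharedProofs = "proofs snapshot";             // built on receipt
        relay.lastSharedProofsUpdate = Now() - 3min;        // pretend it is stale
        MaybeCleanup(relay);
        std::cout << (relay.sharedProofs ? "kept" : "cleaned up") << "\n";
        return 0;
    }

The functional test drives the same path from the outside: setmocktime() advances past the timeout, mockscheduler() forces the periodic task to run, and the subsequent avaproofsreq is expected to yield no proofs.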