diff --git a/src/net.h b/src/net.h
--- a/src/net.h
+++ b/src/net.h
@@ -692,6 +692,9 @@
     // m_avalanche_state == nullptr if we're not using avalanche with this peer
     std::unique_ptr<AvalancheState> m_avalanche_state;
 
+    // Store the next time we will consider a getavaaddr message from this peer
+    std::chrono::seconds m_nextGetAvaAddr{0};
+
     /**
      * UNIX epoch time of the last block received from this peer that we had
      * not yet seen (e.g. not already received from another peer), that passed
diff --git a/src/net_processing.cpp b/src/net_processing.cpp
--- a/src/net_processing.cpp
+++ b/src/net_processing.cpp
@@ -105,6 +105,9 @@
               "Max protocol message length must be greater than largest "
               "possible INV message");
 
+/** Minimum time between 2 successive getavaaddr messages from the same peer */
+static constexpr std::chrono::minutes GETAVAADDR_INTERVAL{10};
+
 struct DataRequestParameters {
     /**
      * Maximum number of in-flight data requests from a peer. It is not a hard
@@ -3295,7 +3298,8 @@
     return msg_type == NetMsgType::AVAHELLO ||
            msg_type == NetMsgType::AVAPOLL ||
            msg_type == NetMsgType::AVARESPONSE ||
-           msg_type == NetMsgType::AVAPROOF;
+           msg_type == NetMsgType::AVAPROOF ||
+           msg_type == NetMsgType::GETAVAADDR;
 }
 
 /**
@@ -5114,6 +5118,57 @@
         return;
     }
 
+    if (msg_type == NetMsgType::GETAVAADDR) {
+        auto now = GetTime<std::chrono::seconds>();
+        if (now < pfrom.m_nextGetAvaAddr) {
+            // Prevent a peer from exhausting our resources by spamming
+            // getavaaddr messages.
+            LogPrint(BCLog::AVALANCHE,
+                     "Ignoring repeated getavaaddr from peer %d\n",
+                     pfrom.GetId());
+            return;
+        }
+
+        // Only accept a getavaaddr every GETAVAADDR_INTERVAL at most
+        pfrom.m_nextGetAvaAddr = now + GETAVAADDR_INTERVAL;
+
+        // Order nodes by decreasing availability score. Ties are broken on the
+        // pointer value so that distinct nodes never compare equivalent.
+        auto availabilityScoreComparator = [](const CNode *lhs,
+                                              const CNode *rhs) {
+            double scoreLhs = lhs->m_avalanche_state->getAvailabilityScore();
+            double scoreRhs = rhs->m_avalanche_state->getAvailabilityScore();
+
+            if (scoreLhs != scoreRhs) {
+                return scoreLhs > scoreRhs;
+            }
+
+            return lhs < rhs;
+        };
+
+        // Get up to MAX_ADDR_TO_SEND addresses of the nodes which are the
+        // most active in the avalanche network.
+        std::set<const CNode *, decltype(availabilityScoreComparator)> avaNodes(
+            availabilityScoreComparator);
+        m_connman.ForEachNode([&](const CNode *pnode) {
+            if (pnode && pnode->m_avalanche_state &&
+                pnode->m_avalanche_state->getAvailabilityScore() > 0.) {
+                avaNodes.insert(pnode);
+                if (avaNodes.size() > GetMaxAddrToSend()) {
+                    avaNodes.erase(std::prev(avaNodes.end()));
+                }
+            }
+        });
+
+        peer->m_addrs_to_send.clear();
+        FastRandomContext insecure_rand;
+        for (const CNode *pnode : avaNodes) {
+            PushAddress(*peer, pnode->addr, insecure_rand);
+        }
+
+        return;
+    }
+
     if (msg_type == NetMsgType::MEMPOOL) {
         if (!(pfrom.GetLocalServices() & NODE_BLOOM) &&
             !pfrom.HasPermission(PF_MEMPOOL)) {
diff --git a/src/protocol.h b/src/protocol.h
--- a/src/protocol.h
+++ b/src/protocol.h
@@ -297,6 +297,12 @@
  */
 extern const char *AVAPROOF;
 
+/**
+ * The getavaaddr message requests an addr message from the receiving node,
+ * containing IP addresses of the most active avalanche nodes.
+ */
+extern const char *GETAVAADDR;
+
 /**
  * Indicate if the message is used to transmit the content of a block.
  * These messages can be significantly larger than usual messages and therefore
diff --git a/src/protocol.cpp b/src/protocol.cpp
--- a/src/protocol.cpp
+++ b/src/protocol.cpp
@@ -52,6 +52,7 @@
 const char *AVAPOLL = "avapoll";
 const char *AVARESPONSE = "avaresponse";
 const char *AVAPROOF = "avaproof";
+const char *GETAVAADDR = "getavaaddr";
 
 bool IsBlockLike(const std::string &strCommand) {
     return strCommand == NetMsgType::BLOCK ||
diff --git a/test/functional/abc_p2p_getavaaddr.py b/test/functional/abc_p2p_getavaaddr.py
new file mode 100755
--- /dev/null
+++ b/test/functional/abc_p2p_getavaaddr.py
@@ -0,0 +1,216 @@
+#!/usr/bin/env python3
+# Copyright (c) 2022 The Bitcoin developers
+# Distributed under the MIT software license, see the accompanying
+# file COPYING or http://www.opensource.org/licenses/mit-license.php.
+"""Test getavaaddr p2p message"""
+import time
+
+from test_framework.avatools import AvaP2PInterface, gen_proof
+from test_framework.messages import (
+    AvalancheVote,
+    AvalancheVoteError,
+    msg_getavaaddr,
+)
+from test_framework.p2p import P2PInterface, p2p_lock
+from test_framework.test_framework import BitcoinTestFramework
+from test_framework.util import assert_equal
+
+# getavaaddr time interval in seconds, as defined in net_processing.cpp
+GETAVAADDR_INTERVAL = 10 * 60
+
+
+class AddrReceiver(P2PInterface):
+    """Record the addresses carried by the last addr message received."""
+
+    def __init__(self):
+        super().__init__()
+        self.received_addrs = None
+
+    def get_received_addrs(self):
+        with p2p_lock:
+            return self.received_addrs
+
+    def on_addr(self, message):
+        self.received_addrs = []
+        for addr in message.addrs:
+            self.received_addrs.append(f"{addr.ip}:{addr.port}")
+
+    def addr_received(self):
+        return self.received_addrs is not None
+
+
+class MutedAvaP2PInterface(AvaP2PInterface):
+    """Avalanche peer that counts the polls it receives without voting."""
+
+    def __init__(self):
+        super().__init__()
+        self.is_responding = False
+        self.privkey = None
+        self.addr = None
+        self.poll_received = 0
+
+    def set_addr(self, addr):
+        self.addr = addr
+
+    def on_avapoll(self, message):
+        self.poll_received += 1
+
+
+class AllYesAvaP2PInterface(MutedAvaP2PInterface):
+    """Avalanche peer that votes ACCEPTED for every inv of every poll."""
+
+    def __init__(self, privkey):
+        super().__init__()
+        self.privkey = privkey
+        self.is_responding = True
+
+    def on_avapoll(self, message):
+        self.send_avaresponse(
+            message.poll.round, [
+                AvalancheVote(
+                    AvalancheVoteError.ACCEPTED, inv.hash) for inv in message.poll.invs], self.privkey)
+        super().on_avapoll(message)
+
+
+class AvaAddrTest(BitcoinTestFramework):
+    def set_test_params(self):
+        self.setup_clean_chain = False
+        self.num_nodes = 1
+        self.extra_args = [['-enableavalanche=1']]
+
+    def getavaaddr_interval_test(self):
+        """Check repeated getavaaddr are ignored within GETAVAADDR_INTERVAL."""
+        node = self.nodes[0]
+
+        # Init mock time
+        mock_time = int(time.time())
+        node.setmocktime(mock_time)
+
+        master_privkey, proof = gen_proof(node)
+        master_pubkey = master_privkey.get_pubkey().get_bytes().hex()
+        proof_hex = proof.serialize().hex()
+
+        # Add some avalanche peers to the node
+        for _ in range(10):
+            node.add_p2p_connection(AllYesAvaP2PInterface(master_privkey))
+            assert node.addavalanchenode(
+                node.getpeerinfo()[-1]['id'], master_pubkey, proof_hex)
+
+        # Build some statistics to ensure some addresses will be returned
+        self.wait_until(lambda: all(
+            [avanode.poll_received > 0 for avanode in node.p2ps]))
+        node.mockscheduler(10 * 60)
+
+        requester = node.add_p2p_connection(AddrReceiver())
+        requester.send_message(msg_getavaaddr())
+        # Remember the time we sent the getavaaddr message
+        getavaddr_time = mock_time
+
+        # Spamming more getavaaddr has no effect
+        for _ in range(10):
+            with node.assert_debug_log(["Ignoring repeated getavaaddr from peer"]):
+                requester.send_message(msg_getavaaddr())
+
+        # Move the time so we get an addr response
+        mock_time += 5 * 60
+        node.setmocktime(mock_time)
+        requester.wait_until(requester.addr_received)
+
+        # Spamming more getavaaddr still has no effect
+        for _ in range(10):
+            with node.assert_debug_log(["Ignoring repeated getavaaddr from peer"]):
+                requester.send_message(msg_getavaaddr())
+
+        # Elapse the getavaaddr interval and check our message is now accepted
+        # again
+        mock_time = getavaddr_time + GETAVAADDR_INTERVAL
+        node.setmocktime(mock_time)
+
+        requester.send_message(msg_getavaaddr())
+
+        # We can get an addr message again
+        # NOTE(review): received_addrs still holds the first response at this
+        # point, so the wait below is already satisfied; confirm whether a
+        # fresh addr message is expected here.
+        mock_time += 5 * 60
+        node.setmocktime(mock_time)
+        requester.wait_until(requester.addr_received)
+
+    def address_test(self, maxaddrtosend, num_proof, num_avanode):
+        """Check only the responding avalanche nodes are advertised."""
+        self.restart_node(
+            0,
+            extra_args=self.extra_args[0] +
+            [f'-maxaddrtosend={maxaddrtosend}'])
+        node = self.nodes[0]
+
+        # Init mock time
+        mock_time = int(time.time())
+        node.setmocktime(mock_time)
+
+        # Create a bunch of proofs and associate each a bunch of nodes.
+        avanodes = []
+        for _ in range(num_proof):
+            master_privkey, proof = gen_proof(node)
+            master_pubkey = master_privkey.get_pubkey().get_bytes().hex()
+            proof_hex = proof.serialize().hex()
+
+            for n in range(num_avanode):
+                avanode = AllYesAvaP2PInterface(
+                    master_privkey) if n % 2 else MutedAvaP2PInterface()
+                node.add_p2p_connection(avanode)
+
+                peerinfo = node.getpeerinfo()[-1]
+                avanode.set_addr(peerinfo["addr"])
+
+                assert node.addavalanchenode(
+                    peerinfo['id'], master_pubkey, proof_hex)
+                avanodes.append(avanode)
+
+        responding_addresses = [
+            avanode.addr for avanode in avanodes if avanode.is_responding]
+        assert_equal(len(responding_addresses), num_proof * (num_avanode // 2))
+
+        # Check we have what we expect
+        avapeers = node.getavalanchepeerinfo()
+        assert_equal(len(avapeers), num_proof)
+        for avapeer in avapeers:
+            assert_equal(len(avapeer['nodes']), num_avanode)
+
+        # Force the availability score to diverge between the responding and the
+        # muted nodes.
+        def poll_all_for_block():
+            node.generate(1)
+            return all([avanode.poll_received > (
+                10 if avanode.is_responding else 0) for avanode in avanodes])
+        self.wait_until(poll_all_for_block)
+
+        # Move the scheduler time 10 minutes forward so that our peers
+        # get an availability score computed.
+        node.mockscheduler(10 * 60)
+
+        requester = node.add_p2p_connection(AddrReceiver())
+        requester.send_message(msg_getavaaddr())
+
+        mock_time += 5 * 60
+        node.setmocktime(mock_time)
+
+        requester.wait_until(requester.addr_received)
+        addresses = requester.get_received_addrs()
+        assert_equal(len(addresses),
+                     min(maxaddrtosend, len(responding_addresses)))
+
+        # Check all the addresses belong to responding peer
+        assert all([address in responding_addresses for address in addresses])
+
+    def run_test(self):
+        self.getavaaddr_interval_test()
+
+        # Limited by maxaddrtosend
+        self.address_test(maxaddrtosend=20, num_proof=5, num_avanode=10)
+        # Limited by the number of good nodes
+        self.address_test(maxaddrtosend=100, num_proof=5, num_avanode=10)
+
+
+if __name__ == '__main__':
+    AvaAddrTest().main()
diff --git a/test/functional/test_framework/messages.py b/test/functional/test_framework/messages.py
--- a/test/functional/test_framework/messages.py
+++ b/test/functional/test_framework/messages.py
@@ -2095,6 +2095,24 @@
         return "msg_avahello(response={})".format(repr(self.hello))
 
 
+class msg_getavaaddr:
+    """getavaaddr message; it is serialized with an empty payload."""
+    __slots__ = ()
+    msgtype = b"getavaaddr"
+
+    def __init__(self):
+        pass
+
+    def deserialize(self, f):
+        pass
+
+    def serialize(self):
+        return b""
+
+    def __repr__(self):
+        return "msg_getavaaddr()"
+
+
 class TestFrameworkMessages(unittest.TestCase):
     def test_legacy_avalanche_proof_serialization_round_trip(self):
         """Verify that a LegacyAvalancheProof object is unchanged after a