diff --git a/src/avalanche/compactproofs.h b/src/avalanche/compactproofs.h
--- a/src/avalanche/compactproofs.h
+++ b/src/avalanche/compactproofs.h
@@ -11,6 +11,7 @@
 #include
 #include
 #include
+#include <shortidprocessor.h>
 
 #include
 #include
@@ -35,6 +36,21 @@
     template <typename Stream> void UnserData(Stream &s) { s >> proof; }
 };
 
+struct ShortIdProcessorPrefilledProofAdapter {
+    uint32_t getIndex(const PrefilledProof &pp) const { return pp.index; }
+    ProofRef getItem(const PrefilledProof &pp) const { return pp.proof; }
+};
+
+struct ProofRefCompare {
+    bool operator()(const ProofRef &lhs, const ProofRef &rhs) const {
+        return lhs->getId() == rhs->getId();
+    }
+};
+
+using ProofShortIdProcessor =
+    ShortIdProcessor<PrefilledProof, ShortIdProcessorPrefilledProofAdapter,
+                     ProofRefCompare>;
+
 class CompactProofs {
 private:
     uint64_t shortproofidk0, shortproofidk1;
@@ -57,6 +73,10 @@
     std::pair<uint64_t, uint64_t> getKeys() const {
         return std::make_pair(shortproofidk0, shortproofidk1);
     }
+    const std::vector<PrefilledProof> &getPrefilledProofs() const {
+        return prefilledProofs;
+    }
+
     const std::vector<uint64_t> &getShortIDs() const { return shortproofids; }
 
     SERIALIZE_METHODS(CompactProofs, obj) {
         READWRITE(
@@ -91,6 +111,15 @@
     friend struct ::avalanche::TestCompactProofs;
 };
 
+class ProofsRequest {
+public:
+    std::vector<uint32_t> indices;
+
+    SERIALIZE_METHODS(ProofsRequest, obj) {
+        READWRITE(Using<VectorFormatter<DifferenceFormatter>>(obj.indices));
+    }
+};
+
 } // namespace avalanche
 
 #endif // BITCOIN_AVALANCHE_COMPACTPROOFS_H
diff --git a/src/net.h b/src/net.h
--- a/src/net.h
+++ b/src/net.h
@@ -655,7 +655,8 @@
                                                10000, 0.000001};
     std::chrono::microseconds nextInvSend{0};
 
-    RadixTree<const avalanche::Proof, avalanche::ProofRadixTreeAdapter>
+    std::unique_ptr<
+        RadixTree<const avalanche::Proof, avalanche::ProofRadixTreeAdapter>>
         sharedProofs;
     std::atomic<bool> compactproofs_requested{false};
 };
diff --git a/src/net_processing.cpp b/src/net_processing.cpp
--- a/src/net_processing.cpp
+++ b/src/net_processing.cpp
@@ -3261,7 +3261,8 @@
            msg_type == NetMsgType::AVARESPONSE ||
            msg_type == NetMsgType::AVAPROOF ||
            msg_type == NetMsgType::GETAVAADDR ||
-           msg_type == NetMsgType::GETAVAPROOFS;
+           msg_type == NetMsgType::GETAVAPROOFS ||
+           msg_type == NetMsgType::AVAPROOFS;
 }
 
 uint32_t PeerManagerImpl::GetAvalancheVoteForBlock(const BlockHash &hash) {
@@ -5164,19 +5165,112 @@
         return;
     }
 
-    pfrom.m_proof_relay->sharedProofs =
+    pfrom.m_proof_relay->sharedProofs = std::make_unique<
+        RadixTree<const avalanche::Proof, avalanche::ProofRadixTreeAdapter>>(
         g_avalanche->withPeerManager([&](const avalanche::PeerManager &pm) {
             return pm.getShareableProofsSnapshot();
-        });
+        }));
 
     avalanche::CompactProofs compactProofs(
-        pfrom.m_proof_relay->sharedProofs);
+        *pfrom.m_proof_relay->sharedProofs);
     m_connman.PushMessage(
         &pfrom, msgMaker.Make(NetMsgType::AVAPROOFS, compactProofs));
 
     return;
 }
 
+    if (msg_type == NetMsgType::AVAPROOFS) {
+        if (pfrom.m_proof_relay == nullptr) {
+            return;
+        }
+
+        // Only process the compact proofs if we requested them
+        if (!pfrom.m_proof_relay->compactproofs_requested) {
+            LogPrint(BCLog::AVALANCHE, "Ignoring unsolicited avaproofs\n");
+            return;
+        }
+        pfrom.m_proof_relay->compactproofs_requested = false;
+
+        avalanche::CompactProofs compactProofs;
+        try {
+            vRecv >> compactProofs;
+        } catch (std::ios_base::failure &e) {
+            // The compact proofs have non-contiguous or overflowing indexes
+            Misbehaving(pfrom, 100, "avaproofs-bad-indexes");
+            return;
+        }
+
+        // If there are prefilled proofs, process them first
+        std::set<uint32_t> prefilledIndexes;
+        for (const auto &prefilledProof : compactProofs.getPrefilledProofs()) {
+            if (!ReceivedAvalancheProof(pfrom, prefilledProof.proof)) {
+                // If we got an invalid proof, the peer is getting banned and we
+                // can bail out.
+                return;
+            }
+        }
+
+        // To determine the chance that the number of entries in a bucket
+        // exceeds N, we use the fact that the number of elements in a single
+        // bucket is binomially distributed (with n = the number of short IDs
+        // S, and p = 1 / the number of buckets), that in the worst case the
+        // number of buckets is equal to S (due to std::unordered_map having a
+        // default load factor of 1.0), and that the chance for any bucket to
+        // exceed N elements is at most buckets * (the chance that any given
+        // bucket is above N elements). Thus:
+        //   P(max_elements_per_bucket > N) <=
+        //     S * (1 - cdf(binomial(n=S, p=1/S), N))
+        // If we assume up to 21000000 short IDs, allowing 15 elements per
+        // bucket should only fail once per ~2.5 million avaproofs transfers
+        // (per peer and connection).
+        // TODO: re-evaluate the bucket count to a more realistic value.
+        // TODO: In the case of a shortid collision, we should request all the
+        // proofs which collided. For now, we only request one, which is not
+        // that bad considering this event is expected to be very rare.
+        auto shortIdProcessor = avalanche::ProofShortIdProcessor(
+            compactProofs.getPrefilledProofs(), compactProofs.getShortIDs(),
+            15);
+
+        if (shortIdProcessor.hasOutOfBoundIndex()) {
+            // This should be caught by deserialization, but check it here as
+            // well for good measure.
+            Misbehaving(pfrom, 100, "avaproofs-bad-indexes");
+            return;
+        }
+        if (!shortIdProcessor.isEvenlyDistributed()) {
+            // This is suspicious, don't ban but bail out
+            return;
+        }
+
+        const auto &proofs =
+            g_avalanche->withPeerManager([&](const avalanche::PeerManager &pm) {
+                return pm.getShareableProofsSnapshot();
+            });
+
+        size_t proofCount = 0;
+        proofs.forEachLeaf([&](const avalanche::ProofRef &proof) {
+            uint64_t shortid = compactProofs.getShortID(proof->getId());
+
+            proofCount += shortIdProcessor.matchKnownItem(shortid, proof);
+
+            // Though ideally we'd continue scanning for the
+            // two-proofs-match-shortid case, the performance win of an early
+            // exit here is too good to pass up and worth the extra risk.
+            return proofCount != shortIdProcessor.getShortIdCount();
+        });
+
+        avalanche::ProofsRequest req;
+        for (size_t i = 0; i < compactProofs.size(); i++) {
+            if (shortIdProcessor.getItem(i) == nullptr) {
+                req.indices.push_back(i);
+            }
+        }
+
+        m_connman.PushMessage(&pfrom,
+                              msgMaker.Make(NetMsgType::AVAPROOFSREQ, req));
+
+        return;
+    }
+
     if (msg_type == NetMsgType::GETADDR) {
         // This asymmetric behavior for inbound and outbound connections was
         // introduced to prevent a fingerprinting attack: an attacker can send
diff --git a/src/protocol.h b/src/protocol.h
--- a/src/protocol.h
+++ b/src/protocol.h
@@ -315,6 +315,12 @@
  */
 extern const char *AVAPROOFS;
 
+/**
+ * Request for missing avalanche proofs after an avaproofs message has been
+ * processed.
+ */
+extern const char *AVAPROOFSREQ;
+
 /**
  * Indicate if the message is used to transmit the content of a block.
  * These messages can be significantly larger than usual messages and therefore
diff --git a/src/protocol.cpp b/src/protocol.cpp
--- a/src/protocol.cpp
+++ b/src/protocol.cpp
@@ -54,6 +54,7 @@
 const char *GETAVAADDR = "getavaaddr";
 const char *GETAVAPROOFS = "getavaproofs";
 const char *AVAPROOFS = "avaproofs";
+const char *AVAPROOFSREQ = "avaproofsreq";
 
 bool IsBlockLike(const std::string &strCommand) {
     return strCommand == NetMsgType::BLOCK ||
diff --git a/test/functional/abc_p2p_compactproofs.py b/test/functional/abc_p2p_compactproofs.py
--- a/test/functional/abc_p2p_compactproofs.py
+++ b/test/functional/abc_p2p_compactproofs.py
@@ -6,19 +6,24 @@
 Test proof inventory relaying
 """
 
+import random
+
 from test_framework.avatools import (
     AvaP2PInterface,
     gen_proof,
+    get_ava_p2p_interface,
     get_proof_ids,
     wait_for_proof,
 )
 from test_framework.messages import (
     NODE_AVALANCHE,
     NODE_NETWORK,
+    AvalanchePrefilledProof,
+    calculate_shortid,
+    msg_avaproofs,
     msg_getavaproofs,
 )
 from test_framework.p2p import P2PInterface, p2p_lock
-from test_framework.siphash import siphash256
 from test_framework.test_framework import BitcoinTestFramework
 from test_framework.util import MAX_NODES, assert_equal, p2p_port
@@ -29,7 +34,6 @@
         self.extra_args = [[
             '-enableavalanche=1',
             '-avacooldown=0',
-            '-whitelist=noban@127.0.0.1',
         ]] * self.num_nodes
 
     def test_send_outbound_getavaproofs(self):
@@ -144,19 +148,182 @@
         avaproofs = received_avaproofs(receiving_peer)
 
         expected_shortids = [
-            siphash256(
+            calculate_shortid(
                 avaproofs.key0,
                 avaproofs.key1,
-                proofid) & 0x0000ffffffffffff for proofid in sorted(proofids)]
+                proofid) for proofid in sorted(proofids)]
         assert_equal(expected_shortids, avaproofs.shortids)
 
         # Don't expect any prefilled proof for now
         assert_equal(len(avaproofs.prefilled_proofs), 0)
 
+    def test_request_missing_proofs(self):
+        self.log.info(
+            "Check the node requests the missing proofs after receiving an avaproofs message")
+
+        node = self.nodes[0]
+
+        self.restart_node(0)
+
+        key0 = random.randint(0, 2**64 - 1)
+        key1 = random.randint(0, 2**64 - 1)
+        proofs = [gen_proof(node)[1] for _ in range(10)]
+
+        # Build a map from proofid to shortid. Use sorted proofids so we don't
+        # have the same indices as the `proofs` list.
+        proofids = [p.proofid for p in proofs]
+        shortid_map = {}
+        for proofid in sorted(proofids):
+            shortid_map[proofid] = calculate_shortid(key0, key1, proofid)
+
+        self.log.info("The node ignores unsolicited avaproofs")
+
+        spam_peer = get_ava_p2p_interface(node)
+
+        msg = msg_avaproofs()
+        msg.key0 = key0
+        msg.key1 = key1
+        msg.shortids = list(shortid_map.values())
+        msg.prefilled_proofs = []
+
+        with node.assert_debug_log(["Ignoring unsolicited avaproofs"]):
+            spam_peer.send_message(msg)
+
+        def received_avaproofsreq(peer):
+            with p2p_lock:
+                return peer.last_message.get("avaproofsreq")
+
+        p2p_idx = 0
+
+        def add_avalanche_p2p_outbound():
+            nonlocal p2p_idx
+
+            peer = P2PInterface()
+            node.add_outbound_p2p_connection(
+                peer,
+                p2p_idx=p2p_idx,
+                connection_type="avalanche",
+                services=NODE_NETWORK | NODE_AVALANCHE,
+            )
+            p2p_idx += 1
+
+            peer.wait_until(lambda: peer.last_message.get("getavaproofs"))
+
+            return peer
+
+        def expect_indices(shortids, expected_indices, prefilled_proofs=None):
+            nonlocal p2p_idx
+
+            if prefilled_proofs is None:
+                prefilled_proofs = []
+
+            msg = msg_avaproofs()
+            msg.key0 = key0
+            msg.key1 = key1
+            msg.shortids = shortids
+            msg.prefilled_proofs = prefilled_proofs
+
+            peer = add_avalanche_p2p_outbound()
+            peer.send_message(msg)
+            self.wait_until(lambda: received_avaproofsreq(peer))
+
+            avaproofsreq = received_avaproofsreq(peer)
+            assert_equal(avaproofsreq.indices, expected_indices)
+
+        self.log.info("Check no proof is requested if there is no shortid")
+        expect_indices([], [])
+
+        self.log.info(
+            "Check the node requests all the proofs if it knows none")
+        expect_indices(
+            list(shortid_map.values()),
+            [i for i in range(len(shortid_map))]
+        )
+
+        self.log.info(
+            "Check the node requests only the missing proofs")
+
+        known_proofids = []
+        for proof in proofs[:5]:
+            node.sendavalancheproof(proof.serialize().hex())
+            known_proofids.append(proof.proofid)
+
+        expected_indices = [i for i, proofid in enumerate(
+            shortid_map) if proofid not in known_proofids]
+        expect_indices(list(shortid_map.values()), expected_indices)
+
+        self.log.info(
+            "Check the node doesn't request prefilled proofs")
+        # Get the indices for a couple of proofs
+        indice_proof5 = list(shortid_map.keys()).index(proofids[5])
+        indice_proof6 = list(shortid_map.keys()).index(proofids[6])
+        prefilled_proofs = [
+            AvalanchePrefilledProof(indice_proof5, proofs[5]),
+            AvalanchePrefilledProof(indice_proof6, proofs[6]),
+        ]
+        prefilled_proofs = sorted(
+            prefilled_proofs,
+            key=lambda prefilled_proof: prefilled_proof.index)
+        remaining_shortids = [shortid for proofid, shortid in shortid_map.items()
+                              if proofid not in proofids[5:7]]
+        known_proofids.extend(proofids[5:7])
+        expected_indices = [i for i, proofid in enumerate(
+            shortid_map) if proofid not in known_proofids]
+        expect_indices(
+            remaining_shortids,
+            expected_indices,
+            prefilled_proofs=prefilled_proofs)
+
+        self.log.info(
+            "Check the node requests no proof if it knows all of them")
+
+        for proof in proofs[5:]:
+            node.sendavalancheproof(proof.serialize().hex())
+            known_proofids.append(proof.proofid)
+
+        expect_indices(list(shortid_map.values()), [])
+
+        self.log.info("Check out of bounds index")
+
+        bad_peer = add_avalanche_p2p_outbound()
+
+        msg = msg_avaproofs()
+        msg.key0 = key0
+        msg.key1 = key1
+        msg.shortids = list(shortid_map.values())
+        msg.prefilled_proofs = [
+            AvalanchePrefilledProof(
+                len(shortid_map) + 1,
+                gen_proof(node)[1])]
+
+        with node.assert_debug_log(["Misbehaving", "avaproofs-bad-indexes"]):
+            bad_peer.send_message(msg)
+        bad_peer.wait_for_disconnect()
+
+        self.log.info("An invalid prefilled proof will trigger a ban")
+
+        _, no_stake = gen_proof(node)
+        no_stake.stakes = []
+
+        bad_peer = add_avalanche_p2p_outbound()
+
+        msg = msg_avaproofs()
+        msg.key0 = key0
+        msg.key1 = key1
+        msg.shortids = list(shortid_map.values())
+        msg.prefilled_proofs = [
+            AvalanchePrefilledProof(len(shortid_map), no_stake),
+        ]
+
+        with node.assert_debug_log(["Misbehaving", "invalid-proof"]):
+            bad_peer.send_message(msg)
+        bad_peer.wait_for_disconnect()
+
     def run_test(self):
         self.test_send_outbound_getavaproofs()
         self.test_send_manual_getavaproofs()
         self.test_respond_getavaproofs()
+        self.test_request_missing_proofs()
 
 if __name__ == '__main__':
diff --git a/test/functional/test_framework/messages.py b/test/functional/test_framework/messages.py
--- a/test/functional/test_framework/messages.py
+++ b/test/functional/test_framework/messages.py
@@ -2211,6 +2211,41 @@
             self.key0, self.key1, len(self.shortids), self.shortids,
             len(self.prefilled_proofs), self.prefilled_proofs)
 
 
+class msg_avaproofsreq:
+    __slots__ = ("indices",)
+    msgtype = b"avaproofsreq"
+
+    def __init__(self):
+        self.indices = []
+
+    def deserialize(self, f):
+        indices_length = deser_compact_size(f)
+
+        # The indices are differentially encoded
+        current_indice = -1
+        for _ in range(indices_length):
+            current_indice += deser_compact_size(f) + 1
+            self.indices.append(current_indice)
+
+    def serialize(self):
+        r = b""
+        r += ser_compact_size(len(self.indices))
+
+        if len(self.indices) < 1:
+            return r
+
+        # The indices are differentially encoded
+        r += ser_compact_size(self.indices[0])
+        for i in range(len(self.indices[1:])):
+            r += ser_compact_size(self.indices[i + 1] - self.indices[i] - 1)
+
+        return r
+
+    def __repr__(self):
+        return "msg_avaproofsreq(len(indices)={}, indices={})".format(
+            len(self.indices), self.indices)
+
+
 class TestFrameworkMessages(unittest.TestCase):
     def test_legacy_avalanche_proof_serialization_round_trip(self):
         """Verify that a LegacyAvalancheProof object is unchanged after a
diff --git a/test/functional/test_framework/p2p.py b/test/functional/test_framework/p2p.py
--- a/test/functional/test_framework/p2p.py
+++ b/test/functional/test_framework/p2p.py
@@ -41,6 +41,7 @@
     msg_avapoll,
     msg_avaproof,
     msg_avaproofs,
+    msg_avaproofsreq,
     msg_block,
     msg_blocktxn,
     msg_cfcheckpt,
@@ -97,6 +98,7 @@
     b"avapoll": msg_avapoll,
     b"avaproof": msg_avaproof,
     b"avaproofs": msg_avaproofs,
+    b"avaproofsreq": msg_avaproofsreq,
     b"avaresponse": msg_tcpavaresponse,
     b"avahello": msg_avahello,
     b"block": msg_block,
@@ -438,6 +440,8 @@
     def on_avaproofs(self, message): pass
 
+    def on_avaproofsreq(self, message): pass
+
     def on_avaresponse(self, message): pass
 
     def on_avahello(self, message): pass
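
Reviewer note (not part of the patch): the new avaproofsreq message encodes its indices with the same differential CompactSize scheme used for compact block indexes (each index stored as the gap, minus one, from its predecessor), and the short proof IDs are SipHash-2-4 of the proof ID keyed by key0/key1 and truncated to 48 bits, which is exactly what the removed `siphash256(...) & 0x0000ffffffffffff` expression computed and what `calculate_shortid` now wraps. The standalone Python sketch below only illustrates the index encoding round trip; `write_compact_size` and `read_compact_size` are local stand-ins for the framework's `ser_compact_size`/`deser_compact_size` helpers, not new APIs.

import io


def write_compact_size(n: int) -> bytes:
    # Bitcoin CompactSize: one byte below 253, otherwise a marker byte
    # followed by a 2/4/8-byte little-endian integer.
    if n < 253:
        return n.to_bytes(1, "little")
    if n <= 0xFFFF:
        return b"\xfd" + n.to_bytes(2, "little")
    if n <= 0xFFFFFFFF:
        return b"\xfe" + n.to_bytes(4, "little")
    return b"\xff" + n.to_bytes(8, "little")


def read_compact_size(f) -> int:
    prefix = f.read(1)[0]
    if prefix < 253:
        return prefix
    width = {0xFD: 2, 0xFE: 4, 0xFF: 8}[prefix]
    return int.from_bytes(f.read(width), "little")


def serialize_indices(indices):
    # Differential encoding: write the count, then each index as the gap
    # (minus one) from its predecessor, so sorted indices encode compactly.
    r = write_compact_size(len(indices))
    previous = -1
    for index in indices:
        r += write_compact_size(index - previous - 1)
        previous = index
    return r


def deserialize_indices(data):
    # Mirrors msg_avaproofsreq.deserialize: rebuild absolute indices by
    # accumulating the deltas plus one, starting from -1.
    f = io.BytesIO(data)
    count = read_compact_size(f)
    indices, current = [], -1
    for _ in range(count):
        current += read_compact_size(f) + 1
        indices.append(current)
    return indices


if __name__ == "__main__":
    # e.g. a node missing the proofs at positions 0, 2, 3 and 7
    missing = [0, 2, 3, 7]
    assert deserialize_indices(serialize_indices(missing)) == missing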