Relay avalanche proofs to peers through inv messages

What the hunks below do:

* src/net.h: adds the ProofRelay struct (set of proof ids waiting to be
  announced, rolling bloom filter of proof ids the peer already knows, next
  inv send time) and the m_proof_relay member next to m_tx_relay, plus two
  helpers: AddKnownProof() records a proof id in the known filter and
  PushProofInventory() queues a proof id unless the filter already contains
  it. Both helpers do nothing when m_proof_relay is null.
* src/net.cpp: m_proof_relay is only allocated when
  isAvalancheEnabled(gArgs) returns true.
* src/net_processing.{h,cpp}: adds RelayProof(), which calls
  PushProofInventory() on every node through CConnman::ForEachNode. The inv
  handling path marks the proof id as known to the sending peer. The send
  path, when computeNextInvSendTime() returns true, drains
  setInventoryProofToSend, skips ids already in filterProofKnown and
  announces the others as MSG_AVA_PROOF.
* src/avalanche/processor.cpp: marks the delegation's proof id as known to
  the peer next to the AVAHELLO send.
* src/rpc/avalanche.cpp: replaces the RelayProof TODO with the actual call.
* test/functional: new abc_p2p_proof_inventory.py test, and the inv check
  in abc_rpc_avalancheproof.py is enabled.

NOTE(review): in this copy of the patch the text inside angle brackets
(template arguments, #include targets) is absent and has been left as found.
Whitespace-only context lines are absent too, so hunk line counts may not
match the context shown. Indentation is reconstructed. Regenerate the patch
from the repository before applying it.

diff --git a/src/avalanche/processor.cpp b/src/avalanche/processor.cpp
--- a/src/avalanche/processor.cpp
+++ b/src/avalanche/processor.cpp
@@ -499,6 +499,8 @@
         .Make(NetMsgType::AVAHELLO, Hello(peerData->delegation, sig)));
+    pfrom->AddKnownProof(peerData->delegation.getProofId());
+
     return true;
 }
diff --git a/src/net.h b/src/net.h
--- a/src/net.h
+++ b/src/net.h
@@ -995,6 +995,19 @@
     // m_tx_relay == nullptr if we're not relaying transactions with this peer
     std::unique_ptr m_tx_relay;
+    struct ProofRelay {
+        mutable RecursiveMutex cs_proof_inventory;
+        std::set
+            setInventoryProofToSend GUARDED_BY(cs_proof_inventory);
+        // Prevent sending proof invs if the peer already knows about them
+        CRollingBloomFilter filterProofKnown GUARDED_BY(cs_proof_inventory){
+            10000, 0.000001};
+        std::chrono::microseconds nextInvSend{0};
+    };
+
+    // m_proof_relay == nullptr if we're not relaying proofs with this peer
+    std::unique_ptr m_proof_relay;
+
     struct AvalancheState {
         AvalancheState() {}
@@ -1160,6 +1173,24 @@
         }
     }
+    void AddKnownProof(const avalanche::ProofId &proofid) {
+        if (m_proof_relay != nullptr) {
+            LOCK(m_proof_relay->cs_proof_inventory);
+            m_proof_relay->filterProofKnown.insert(proofid);
+        }
+    }
+
+    void PushProofInventory(const avalanche::ProofId &proofid) {
+        if (m_proof_relay == nullptr) {
+            return;
+        }
+
+        LOCK(m_proof_relay->cs_proof_inventory);
+        if (!m_proof_relay->filterProofKnown.contains(proofid)) {
+            m_proof_relay->setInventoryProofToSend.insert(proofid);
+        }
+    }
+
     void CloseSocketDisconnect();
     void copyStats(CNodeStats &stats, const std::vector &m_asmap);
diff --git a/src/net.cpp b/src/net.cpp
--- a/src/net.cpp
+++ b/src/net.cpp
@@ -9,6 +9,7 @@
 #include
+#include
 #include
 #include
 #include
@@ -3007,6 +3008,11 @@
         m_addr_known = std::make_unique(5000, 0.001);
     }
+    // Don't relay proofs if avalanche is disabled
+    if (isAvalancheEnabled(gArgs)) {
+        m_proof_relay = std::make_unique();
+    }
+
     for (const std::string &msg : getAllNetMessageTypes()) {
         mapRecvBytesPerMsgCmd[msg] = 0;
     }
diff --git a/src/net_processing.h b/src/net_processing.h
--- a/src/net_processing.h
+++ b/src/net_processing.h
@@ -15,6 +15,10 @@
 extern RecursiveMutex cs_main;
 extern RecursiveMutex g_cs_orphans;
+namespace avalanche {
+struct ProofId;
+}
+
 class BlockTransactionsRequest;
 class BlockValidationState;
 class CBlockHeader;
@@ -240,4 +244,7 @@
 /** Relay transaction to every node */
 void RelayTransaction(const TxId &txid, const CConnman &connman);
+/** Relay proof to every node */
+void RelayProof(const avalanche::ProofId &proofid, const CConnman &connman);
+
 #endif // BITCOIN_NET_PROCESSING_H
diff --git a/src/net_processing.cpp b/src/net_processing.cpp
--- a/src/net_processing.cpp
+++ b/src/net_processing.cpp
@@ -1769,6 +1769,11 @@
         [&txid](CNode *pnode) { pnode->PushTxInventory(txid); });
 }
+void RelayProof(const avalanche::ProofId &proofid, const CConnman &connman) {
+    connman.ForEachNode(
+        [&proofid](CNode *pnode) { pnode->PushProofInventory(proofid); });
+}
+
 static void RelayAddress(const CAddress &addr, bool fReachable,
                          const CConnman &connman) {
     // Limited relaying of addresses outside our network(s)
@@ -3192,6 +3197,7 @@
             const avalanche::ProofId proofid(inv.hash);
             const bool fAlreadyHave = AlreadyHaveProof(proofid);
             logInv(inv, fAlreadyHave);
+            pfrom.AddKnownProof(proofid);
             if (!fAlreadyHave && g_avalanche && isAvalancheEnabled(gArgs)) {
                 const bool preferred = isPreferredDownloadPeer(pfrom);
@@ -5328,6 +5334,29 @@
             return fSendTrickle;
         };
+        // Add proofs to inventory
+        if (pto->m_proof_relay != nullptr) {
+            LOCK(pto->m_proof_relay->cs_proof_inventory);
+
+            if (computeNextInvSendTime(pto->m_proof_relay->nextInvSend)) {
+                auto it = pto->m_proof_relay->setInventoryProofToSend.begin();
+                while (it !=
+                       pto->m_proof_relay->setInventoryProofToSend.end()) {
+                    const avalanche::ProofId proofid = *it;
+
+                    it = pto->m_proof_relay->setInventoryProofToSend.erase(it);
+
+                    if (pto->m_proof_relay->filterProofKnown.contains(
+                            proofid)) {
+                        continue;
+                    }
+
+                    pto->m_proof_relay->filterProofKnown.insert(proofid);
+                    addInvAndMaybeFlush(MSG_AVA_PROOF, proofid);
+                }
+            }
+        }
+
         if (pto->m_tx_relay != nullptr) {
             LOCK(pto->m_tx_relay->cs_tx_inventory);
             // Check whether periodic sends should happen
diff --git a/src/rpc/avalanche.cpp b/src/rpc/avalanche.cpp
--- a/src/rpc/avalanche.cpp
+++ b/src/rpc/avalanche.cpp
@@ -13,6 +13,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
@@ -498,8 +499,7 @@
                 "The proof has conflicting utxo with an existing proof");
         }
-        // TODO actually announce the proof via an inv message
-        // RelayProof(proofid, *node.connman);
+        RelayProof(proofid, *node.connman);
         return true;
     }
diff --git a/test/functional/abc_p2p_proof_inventory.py b/test/functional/abc_p2p_proof_inventory.py
new file mode 100644
--- /dev/null
+++ b/test/functional/abc_p2p_proof_inventory.py
@@ -0,0 +1,103 @@
+#!/usr/bin/env python3
+# Copyright (c) 2021 The Bitcoin developers
+# Distributed under the MIT software license, see the accompanying
+# file COPYING or http://www.opensource.org/licenses/mit-license.php.
+"""
+Test proof inventory relaying
+"""
+
+from test_framework.avatools import (
+    create_coinbase_stakes,
+)
+from test_framework.key import ECKey, bytes_to_wif
+from test_framework.messages import (
+    AvalancheProof,
+    FromHex,
+    MSG_AVA_PROOF,
+    MSG_TYPE_MASK,
+)
+from test_framework.p2p import (
+    P2PInterface,
+    p2p_lock,
+)
+from test_framework.test_framework import BitcoinTestFramework
+from test_framework.util import (
+    wait_until,
+)
+
+
+class ProofInvStoreP2PInterface(P2PInterface):
+    def __init__(self):
+        super().__init__()
+        self.proof_invs_counter = 0
+
+    def on_inv(self, message):
+        for i in message.inv:
+            if i.type & MSG_TYPE_MASK == MSG_AVA_PROOF:
+                self.proof_invs_counter += 1
+
+
+class ProofInventoryTest(BitcoinTestFramework):
+    def set_test_params(self):
+        self.num_nodes = 1
+        self.extra_args = [['-enableavalanche=1',
+                            '-avacooldown=0']] * self.num_nodes
+
+    def gen_proof(self, node):
+        blockhashes = node.generate(10)
+
+        privkey = ECKey()
+        privkey.generate()
+        pubkey = privkey.get_pubkey()
+
+        stakes = create_coinbase_stakes(
+            node, blockhashes, node.get_deterministic_priv_key().key)
+        proof_hex = node.buildavalancheproof(
+            42, 2000000000, pubkey.get_bytes().hex(), stakes)
+
+        return bytes_to_wif(privkey.get_bytes()), FromHex(
+            AvalancheProof(), proof_hex)
+
+    def test_send_proof_inv(self):
+        self.log.info("Test sending a proof to our peers")
+
+        node = self.nodes[0]
+
+        for i in range(10):
+            node.add_p2p_connection(ProofInvStoreP2PInterface())
+
+        _, proof = self.gen_proof(node)
+        assert node.sendavalancheproof(proof.serialize().hex())
+
+        def proof_inv_found(peer):
+            with p2p_lock:
+                return peer.last_message.get(
+                    "inv") and peer.last_message["inv"].inv[-1].hash == proof.proofid
+
+        wait_until(lambda: all(proof_inv_found(i) for i in node.p2ps))
+
+        self.log.info("Test that we don't send the same inv several times")
+
+        extra_peer = ProofInvStoreP2PInterface()
+        node.add_p2p_connection(extra_peer)
+
+        # Send the same proof one more time
+        node.sendavalancheproof(proof.serialize().hex())
+
+        # Our new extra peer should receive it but not the others
+        wait_until(lambda: proof_inv_found(extra_peer))
+        assert all(p.proof_invs_counter == 1 for p in node.p2ps)
+
+        # Send the proof again and force the send loop to be processed
+        for peer in node.p2ps:
+            node.sendavalancheproof(proof.serialize().hex())
+            peer.sync_with_ping()
+
+        assert all(p.proof_invs_counter == 1 for p in node.p2ps)
+
+    def run_test(self):
+        self.test_send_proof_inv()
+
+
+if __name__ == '__main__':
+    ProofInventoryTest().main()
diff --git a/test/functional/abc_rpc_avalancheproof.py b/test/functional/abc_rpc_avalancheproof.py
--- a/test/functional/abc_rpc_avalancheproof.py
+++ b/test/functional/abc_rpc_avalancheproof.py
@@ -18,7 +18,7 @@
     AvalancheProof,
     FromHex,
 )
-from test_framework.p2p import P2PInterface
+from test_framework.p2p import P2PInterface, p2p_lock
 from test_framework.test_framework import BitcoinTestFramework
 from test_framework.test_node import ErrorMatch
 from test_framework.util import (
@@ -260,18 +260,17 @@
         # Good proof
         assert node.verifyavalancheproof(proof)
+        peer = node.add_p2p_connection(P2PInterface())
+
         proofid = FromHex(AvalancheProof(), proof).proofid
         node.sendavalancheproof(proof)
         assert proofid in get_proof_ids(node)
-        # TODO Once implemented we expect the sendavalancheproof to trigger the
-        # sending of an inv message with our proof:
-        #
-        # def inv_found():
-        #     with p2p_lock:
-        #         return peer.last_message.get(
-        #             "inv") and peer.last_message["inv"].inv[-1].hash == proofid
-        # wait_until(inv_found)
+        def inv_found():
+            with p2p_lock:
+                return peer.last_message.get(
+                    "inv") and peer.last_message["inv"].inv[-1].hash == proofid
+        wait_until(inv_found)
         self.log.info("Bad proof should be rejected at startup")