diff --git a/src/avalanche/avalanche.h b/src/avalanche/avalanche.h --- a/src/avalanche/avalanche.h +++ b/src/avalanche/avalanche.h @@ -67,6 +67,15 @@ */ static constexpr double AVALANCHE_DEFAULT_MIN_QUORUM_CONNECTED_STAKE_RATIO = 0; +/** + * Default minimum number of nodes that sent us an avaproofs message before we + * can consider our quorum suitable for polling. + * + * FIXME: The default is set to 0 to allow existing tests to pass for now. We + * need to set a sane default and update tests later. + */ +static constexpr double AVALANCHE_DEFAULT_MIN_AVAPROOFS_NODE_COUNT = 0; + /** * Global avalanche instance. */ diff --git a/src/avalanche/node.h b/src/avalanche/node.h --- a/src/avalanche/node.h +++ b/src/avalanche/node.h @@ -22,10 +22,12 @@ NodeId nodeid; PeerId peerid; TimePoint nextRequestTime; + bool avaproofsSent; Node(NodeId nodeid_, PeerId peerid_) : nodeid(nodeid_), peerid(peerid_), - nextRequestTime(std::chrono::steady_clock::now()) {} + nextRequestTime(std::chrono::steady_clock::now()), + avaproofsSent(false) {} }; } // namespace avalanche diff --git a/src/avalanche/peermanager.h b/src/avalanche/peermanager.h --- a/src/avalanche/peermanager.h +++ b/src/avalanche/peermanager.h @@ -228,6 +228,12 @@ // Update when a node is to be polled next. bool updateNextRequestTime(NodeId nodeid, TimePoint timeout); + /** + * Flag that a node did send its compact proofs. + * @return True if the flag changed state, i;e. if this is the first time + * the message is accounted for this node. + */ + bool latchAvaproofsSent(NodeId nodeid); // Randomly select a node to poll. NodeId selectNode(); diff --git a/src/avalanche/peermanager.cpp b/src/avalanche/peermanager.cpp --- a/src/avalanche/peermanager.cpp +++ b/src/avalanche/peermanager.cpp @@ -153,6 +153,16 @@ return nodes.modify(it, [&](Node &n) { n.nextRequestTime = timeout; }); } +bool PeerManager::latchAvaproofsSent(NodeId nodeid) { + auto it = nodes.find(nodeid); + if (it == nodes.end()) { + return false; + } + + return !it->avaproofsSent && + nodes.modify(it, [&](Node &n) { n.avaproofsSent = true; }); +} + static bool isOrphanState(const ProofValidationState &state) { return state.GetResult() == ProofValidationResult::MISSING_UTXO || state.GetResult() == ProofValidationResult::HEIGHT_MISMATCH; diff --git a/src/avalanche/processor.h b/src/avalanche/processor.h --- a/src/avalanche/processor.h +++ b/src/avalanche/processor.h @@ -162,6 +162,8 @@ uint32_t minQuorumScore; double minQuorumConnectedScoreRatio; std::atomic quorumIsEstablished{false}; + int64_t minAvaproofsNodeCount; + std::atomic avaproofsNodeCounter{0}; /** Voting parameters. */ const uint32_t staleVoteThreshold; @@ -175,7 +177,8 @@ CConnman *connmanIn, std::unique_ptr peerDataIn, CKey sessionKeyIn, uint32_t minQuorumTotalScoreIn, double minQuorumConnectedScoreRatioIn, - uint32_t staleVoteThresholdIn, uint32_t staleVoteFactorIn); + int64_t minAvaproofsNodeCountIn, uint32_t staleVoteThresholdIn, + uint32_t staleVoteFactorIn); public: ~Processor(); @@ -221,6 +224,7 @@ bool startEventLoop(CScheduler &scheduler); bool stopEventLoop(); + void avaproofsSent(NodeId nodeid); bool isQuorumEstablished(); // Implement NetEventInterface. Only FinalizeNode is of interest. diff --git a/src/avalanche/processor.cpp b/src/avalanche/processor.cpp --- a/src/avalanche/processor.cpp +++ b/src/avalanche/processor.cpp @@ -141,6 +141,7 @@ CConnman *connmanIn, std::unique_ptr peerDataIn, CKey sessionKeyIn, uint32_t minQuorumTotalScoreIn, double minQuorumConnectedScoreRatioIn, + int64_t minAvaproofsNodeCountIn, uint32_t staleVoteThresholdIn, uint32_t staleVoteFactorIn) : connman(connmanIn), queryTimeoutDuration(argsman.GetArg( @@ -149,6 +150,7 @@ peerData(std::move(peerDataIn)), sessionKey(std::move(sessionKeyIn)), minQuorumScore(minQuorumTotalScoreIn), minQuorumConnectedScoreRatio(minQuorumConnectedScoreRatioIn), + minAvaproofsNodeCount(minAvaproofsNodeCountIn), staleVoteThreshold(staleVoteThresholdIn), staleVoteFactor(staleVoteFactorIn) { // Make sure we get notified of chain state changes. @@ -283,6 +285,15 @@ return nullptr; } + int64_t minAvaproofsNodeCount = + argsman.GetArg("-avaminavaproofsnodecount", + AVALANCHE_DEFAULT_MIN_AVAPROOFS_NODE_COUNT); + if (minAvaproofsNodeCount < 0) { + error = _("The minimum number of node that sent avaproofs message " + "should be non-negative"); + return nullptr; + } + // Determine voting parameters int64_t staleVoteThreshold = argsman.GetArg("-avastalevotethreshold", AVALANCHE_VOTE_STALE_THRESHOLD); @@ -316,7 +327,7 @@ return std::unique_ptr(new Processor( argsman, chain, connman, std::move(peerData), std::move(sessionKey), Proof::amountToScore(minQuorumStake), minQuorumConnectedStakeRatio, - staleVoteThreshold, staleVoteFactor)); + minAvaproofsNodeCount, staleVoteThreshold, staleVoteFactor)); } bool Processor::addBlockToReconcile(const CBlockIndex *pindex) { @@ -829,6 +840,21 @@ } while (nodeid != NO_NODE); } +void Processor::avaproofsSent(NodeId nodeid) { + if (::ChainstateActive().IsInitialBlockDownload()) { + // Before IBD is complete there is no way to make sure a proof is valid + // or not, e.g. it can be spent in a block we don't know yet. In order + // to increase confidence that our proof set is similar to other nodes + // on the network, the messages received during IBD are not accounted. + return; + } + + LOCK(cs_peerManager); + if (peerManager->latchAvaproofsSent(nodeid)) { + avaproofsNodeCounter++; + } +} + /* * Returns a bool indicating whether we have a usable Avalanche quorum enabling * us to take decisions based on polls. @@ -838,6 +864,10 @@ return true; } + if (avaproofsNodeCounter < minAvaproofsNodeCount) { + return false; + } + auto localProof = getLocalProof(); // Get the registered proof score and registered score we have nodes for diff --git a/src/avalanche/test/init_tests.cpp b/src/avalanche/test/init_tests.cpp --- a/src/avalanche/test/init_tests.cpp +++ b/src/avalanche/test/init_tests.cpp @@ -48,6 +48,7 @@ tenBillion); BOOST_CHECK_EQUAL( args.GetArg("-avaminquorumconnectedstakeratio", "0.42"), "0.8"); + BOOST_CHECK_EQUAL(args.GetArg("-avaminavaproofsnodecount", 42), 8); } { @@ -69,6 +70,8 @@ BOOST_CHECK_EQUAL( args.GetArg("-avaminquorumconnectedstakeratio", "0.8"), ToString(AVALANCHE_DEFAULT_MIN_QUORUM_CONNECTED_STAKE_RATIO)); + BOOST_CHECK_EQUAL(args.GetArg("-avaminavaproofsnodecount", 42), + AVALANCHE_DEFAULT_MIN_AVAPROOFS_NODE_COUNT); } { @@ -79,6 +82,7 @@ args.ForceSetArg("-enableavalancheproofreplacement", "0"); args.ForceSetArg("-automaticunparking", "1"); args.ForceSetArg("-avaminquorumstake", FormatMoney(123 * COIN)); + args.ForceSetArg("-avaminavaproofsnodecount", "42"); InitParameterInteraction(args); BOOST_CHECK_EQUAL(args.GetBoolArg("-enableavalanche", false), true); @@ -92,6 +96,7 @@ 123 * COIN); BOOST_CHECK_EQUAL( args.GetArg("-avaminquorumconnectedstakeratio", "0.42"), "0.8"); + BOOST_CHECK_EQUAL(args.GetArg("-avaminavaproofsnodecount", 0), 42); } gArgs.ClearForcedArg("-ecash"); diff --git a/src/avalanche/test/peermanager_tests.cpp b/src/avalanche/test/peermanager_tests.cpp --- a/src/avalanche/test/peermanager_tests.cpp +++ b/src/avalanche/test/peermanager_tests.cpp @@ -1815,4 +1815,25 @@ gArgs.ClearForcedArg("-enableavalancheproofreplacement"); } +BOOST_AUTO_TEST_CASE(received_avaproofs) { + avalanche::PeerManager pm; + + auto addNode = [&](NodeId nodeid) { + auto proof = buildRandomProof(MIN_VALID_PROOF_SCORE); + BOOST_CHECK(pm.registerProof(proof)); + BOOST_CHECK(pm.addNode(nodeid, proof->getId())); + }; + + for (NodeId nodeid = 0; nodeid < 10; nodeid++) { + // Node doesn't exist + BOOST_CHECK(!pm.latchAvaproofsSent(nodeid)); + + addNode(nodeid); + BOOST_CHECK(pm.latchAvaproofsSent(nodeid)); + + // The flag is already set + BOOST_CHECK(!pm.latchAvaproofsSent(nodeid)); + } +} + BOOST_AUTO_TEST_SUITE_END() diff --git a/src/avalanche/test/processor_tests.cpp b/src/avalanche/test/processor_tests.cpp --- a/src/avalanche/test/processor_tests.cpp +++ b/src/avalanche/test/processor_tests.cpp @@ -53,6 +53,14 @@ static double getMinQuorumConnectedScoreRatio(const Processor &p) { return p.minQuorumConnectedScoreRatio; } + + static int64_t getavaproofsNodeCounter(Processor &p) { + return p.avaproofsNodeCounter; + } + + static void clearavaproofsNodeCounter(Processor &p) { + p.avaproofsNodeCounter = 0; + } }; } // namespace } // namespace avalanche @@ -1242,42 +1250,50 @@ } BOOST_AUTO_TEST_CASE(quorum_detection_parameter_validation) { - // Create vector of tuples of - std::vector> tests = { - // Both parameters are invalid - {"", "", false}, - {"-1", "-1", false}, - - // Min stake is out of range - {"-1", "0", false}, - {"-0.01", "0", false}, - {"21000000000000.01", "0", false}, - - // Min connected ratio is out of range - {"0", "-1", false}, - {"0", "1.1", false}, - - // Both parameters are valid - {"0", "0", true}, - {"0.00", "0", true}, - {"0.01", "0", true}, - {"1", "0.1", true}, - {"10", "0.5", true}, - {"10", "1", true}, - {"21000000000000.00", "0", true}, - }; + // Create vector of tuples of: + // + std::vector> tests = + { + // All parameters are invalid + {"", "", "", false}, + {"-1", "-1", "-1", false}, + + // Min stake is out of range + {"-1", "0", "0", false}, + {"-0.01", "0", "0", false}, + {"21000000000000.01", "0", "0", false}, + + // Min connected ratio is out of range + {"0", "-1", "0", false}, + {"0", "1.1", "0", false}, + + // Min avaproofs messages ratio is out of range + {"0", "0", "-1", false}, + + // All parameters are valid + {"0", "0", "0", true}, + {"0.00", "0", "0", true}, + {"0.01", "0", "0", true}, + {"1", "0.1", "0", true}, + {"10", "0.5", "0", true}, + {"10", "1", "0", true}, + {"21000000000000.00", "0", "0", true}, + {"0", "0", "1", true}, + {"0", "0", "100", true}, + }; // For each case set the parameters and check that making the processor // succeeds or fails as expected for (auto it = tests.begin(); it != tests.end(); ++it) { gArgs.ForceSetArg("-avaminquorumstake", std::get<0>(*it)); gArgs.ForceSetArg("-avaminquorumconnectedstakeratio", std::get<1>(*it)); + gArgs.ForceSetArg("-avaminavaproofsnodecount", std::get<2>(*it)); bilingual_str error; std::unique_ptr processor = Processor::MakeProcessor( *m_node.args, *m_node.chain, m_node.connman.get(), error); - if (std::get<2>(*it)) { + if (std::get<3>(*it)) { BOOST_CHECK(processor != nullptr); BOOST_CHECK(error.empty()); BOOST_CHECK_EQUAL(error.original, ""); @@ -1290,6 +1306,61 @@ gArgs.ClearForcedArg("-avaminquorumstake"); gArgs.ClearForcedArg("-avaminquorumconnectedstakeratio"); + gArgs.ClearForcedArg("-avaminavaproofsnodecount"); +} + +BOOST_AUTO_TEST_CASE(min_avaproofs_messages) { + ArgsManager argsman; + argsman.ForceSetArg("-avaminquorumstake", "0"); + argsman.ForceSetArg("-avaminquorumconnectedstakeratio", "0"); + + auto checkMinAvaproofsMessages = [&](int64_t minAvaproofsMessages) { + argsman.ForceSetArg("-avaminavaproofsnodecount", + ToString(minAvaproofsMessages)); + + bilingual_str error; + auto processor = Processor::MakeProcessor(argsman, *m_node.chain, + m_node.connman.get(), error); + + BOOST_CHECK_EQUAL(processor->isQuorumEstablished(), + minAvaproofsMessages <= 0); + + auto addNode = [&](NodeId nodeid) { + auto proof = buildRandomProof(MIN_VALID_PROOF_SCORE); + processor->withPeerManager([&](avalanche::PeerManager &pm) { + BOOST_CHECK(pm.registerProof(proof)); + BOOST_CHECK(pm.addNode(nodeid, proof->getId())); + }); + }; + + for (int64_t i = 0; i < minAvaproofsMessages - 1; i++) { + addNode(i); + + processor->avaproofsSent(i); + BOOST_CHECK_EQUAL( + AvalancheTest::getavaproofsNodeCounter(*processor), i + 1); + + // Receiving again on the same node does not increase the counter + processor->avaproofsSent(i); + BOOST_CHECK_EQUAL( + AvalancheTest::getavaproofsNodeCounter(*processor), i + 1); + + BOOST_CHECK(!processor->isQuorumEstablished()); + } + + addNode(minAvaproofsMessages); + processor->avaproofsSent(minAvaproofsMessages); + BOOST_CHECK(processor->isQuorumEstablished()); + + // Check the latch + AvalancheTest::clearavaproofsNodeCounter(*processor); + BOOST_CHECK(processor->isQuorumEstablished()); + }; + + checkMinAvaproofsMessages(0); + checkMinAvaproofsMessages(1); + checkMinAvaproofsMessages(10); + checkMinAvaproofsMessages(100); } BOOST_AUTO_TEST_CASE_TEMPLATE(voting_parameters, P, VoteItemProviders) { diff --git a/src/init.cpp b/src/init.cpp --- a/src/init.cpp +++ b/src/init.cpp @@ -1334,6 +1334,13 @@ " need nodes for to have a usable quorum (default: %s)", AVALANCHE_DEFAULT_MIN_QUORUM_CONNECTED_STAKE_RATIO), ArgsManager::ALLOW_STRING, OptionsCategory::AVALANCHE); + argsman.AddArg( + "-avaminavaproofsnodecount", + strprintf("Minimum number of node that needs to send us an avaproofs" + " message before we consider we have a usable quorum" + " (default: %s)", + AVALANCHE_DEFAULT_MIN_AVAPROOFS_NODE_COUNT), + ArgsManager::ALLOW_INT, OptionsCategory::AVALANCHE); argsman.AddArg( "-avastalevotethreshold", strprintf("Number of avalanche votes before a voted item goes stale " @@ -1662,6 +1669,10 @@ fAvalanche ? "0.8" : ToString(AVALANCHE_DEFAULT_MIN_QUORUM_CONNECTED_STAKE_RATIO)); + args.SoftSetArg( + "-avaminavaproofsnodecount", + fAvalanche ? "8" + : ToString(AVALANCHE_DEFAULT_MIN_AVAPROOFS_NODE_COUNT)); } } diff --git a/src/net_processing.cpp b/src/net_processing.cpp --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -5308,6 +5308,12 @@ m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::AVAPROOFSREQ, req)); + + // We want to keep a count of how many nodes we successfully requested + // avaproofs from as this is used to determine when we are confident our + // quorum is close enough to the other participants. + g_avalanche->avaproofsSent(pfrom.GetId()); + return; } diff --git a/test/functional/abc_p2p_avalanche_quorum.py b/test/functional/abc_p2p_avalanche_quorum.py --- a/test/functional/abc_p2p_avalanche_quorum.py +++ b/test/functional/abc_p2p_avalanche_quorum.py @@ -7,11 +7,18 @@ from time import time from test_framework.avatools import ( + AvaP2PInterface, create_coinbase_stakes, get_ava_p2p_interface, ) from test_framework.key import ECKey, ECPubKey -from test_framework.messages import AvalancheVote, AvalancheVoteError +from test_framework.messages import ( + NODE_AVALANCHE, + NODE_NETWORK, + AvalancheVote, + AvalancheVoteError, + msg_avaproofs, +) from test_framework.test_framework import BitcoinTestFramework from test_framework.util import assert_equal from test_framework.wallet_util import bytes_to_wif @@ -21,12 +28,16 @@ def set_test_params(self): self.setup_clean_chain = True self.num_nodes = 1 + self.min_avaproofs_node_count = 8 self.extra_args = [ - ['-enableavalanche=1', - '-avacooldown=0', - '-avatimeout=0', - '-avaminquorumstake=100000000', - '-avaminquorumconnectedstakeratio=0.8'] + [ + '-enableavalanche=1', + '-avacooldown=0', + '-avatimeout=0', + '-avaminquorumstake=100000000', + '-avaminquorumconnectedstakeratio=0.8', + f'-avaminavaproofsnodecount={self.min_avaproofs_node_count}', + ] ] def mock_forward(self, delta): @@ -60,13 +71,12 @@ assert_equal(actual, expected) # Create peers to poll - num_quorum_peers = 2 coinbase_key = node.get_deterministic_priv_key().key - blocks = node.generate(num_quorum_peers) + blocks = node.generate(self.min_avaproofs_node_count) peers = [] - for i in range(0, num_quorum_peers): - keyHex = "12b004fff7f4b69ef8650e767f18f11ede158148b425660723b9f9a66e61f75" + \ - str(i) + + for i in range(0, self.min_avaproofs_node_count): + keyHex = f"12b004fff7f4b69ef8650e767f18f11ede158148b425660723b9f9a66e61f7{i:02}" k = ECKey() k.set(bytes.fromhex(keyHex), True) stakes = create_coinbase_stakes( @@ -80,18 +90,51 @@ assert node.addavalanchenode( peer['node'].nodeid, pubkey, peer['proof']) is True + p2p_idx = 0 + + def get_ava_outbound(n, peer): + nonlocal p2p_idx + + avapeer = AvaP2PInterface() + n.add_outbound_p2p_connection( + avapeer, + p2p_idx=p2p_idx, + connection_type="avalanche", + services=NODE_NETWORK | NODE_AVALANCHE, + ) + p2p_idx += 1 + avapeer.nodeid = node.getpeerinfo()[-1]['id'] + + peer['node'] = avapeer + addavalanchenode(peer) + + avapeer.wait_until( + lambda: avapeer.last_message.get("getavaproofs")) + avapeer.send_and_ping(msg_avaproofs()) + avapeer.wait_until( + lambda: avapeer.last_message.get("avaproofsreq")) + + return avapeer + # Start polling. The response should be UNKNOWN because there's no # score poll_and_assert_response(AvalancheVoteError.UNKNOWN) # Create one peer with half the score and add one node - peers[0]['node'] = get_ava_p2p_interface(node) - addavalanchenode(peers[0]) + get_ava_outbound(node, peers[0]) poll_and_assert_response(AvalancheVoteError.UNKNOWN) - # Create a second peer with the other half and add one node - peers[1]['node'] = get_ava_p2p_interface(node) - addavalanchenode(peers[1]) + # Create a second peer with the other half and add one node. + # This is not enough because we are lacking avaproofs messages + get_ava_outbound(node, peers[1]) + poll_and_assert_response(AvalancheVoteError.UNKNOWN) + + # Add more peers for triggering the avaproofs messaging + for i in range(2, self.min_avaproofs_node_count - 1): + get_ava_outbound(node, peers[i]) + poll_and_assert_response(AvalancheVoteError.UNKNOWN) + + get_ava_outbound(node, peers[self.min_avaproofs_node_count - 1]) poll_and_assert_response(AvalancheVoteError.ACCEPTED) # Disconnect peer 1's node which drops us below the threshold, but we've @@ -102,8 +145,7 @@ poll_and_assert_response(AvalancheVoteError.ACCEPTED) # Reconnect node and re-establish quorum - peers[1]['node'] = get_ava_p2p_interface(node) - addavalanchenode(peers[1]) + get_ava_outbound(node, peers[1]) poll_and_assert_response(AvalancheVoteError.ACCEPTED)