diff --git a/src/avalanche/processor.cpp b/src/avalanche/processor.cpp --- a/src/avalanche/processor.cpp +++ b/src/avalanche/processor.cpp @@ -673,6 +673,25 @@ * us to take decisions based on polls. */ bool Processor::isQuorumEstablished() { + { + LOCK(cs_peerManager); + if (peerManager->getNodeCount() < 8) { + // There is no point polling if we know the vote cannot converge + return false; + } + } + + /* + * The following parameters can naturally go temporarily below the threshold + * under normal circumstances, like during a proof replacement with a lower + * stake amount, or the discovery of new proofs for which we don't have a + * node yet. + * In order to prevent our node from starting and stopping the polls + * spuriously on such events, the quorum establishment is latched. The only + * parameter that should not be latched is the minimum node count, as this + * would cause the poll to be inconclusive anyway and should not happen + * under normal circumstances. + */ if (quorumIsEstablished) { return true; } diff --git a/src/avalanche/test/processor_tests.cpp b/src/avalanche/test/processor_tests.cpp --- a/src/avalanche/test/processor_tests.cpp +++ b/src/avalanche/test/processor_tests.cpp @@ -641,20 +641,29 @@ auto &updates = provider.updates; const uint32_t invType = provider.invType; - const auto item = provider.buildVoteItem(); - const auto itemid = provider.getVoteItemId(item); + auto item = provider.buildVoteItem(); + auto itemid = provider.getVoteItemId(item); // There is no node to query. BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), NO_NODE); - // Create a node that supports avalanche and one that doesn't. 
- ConnectNode(NODE_NONE); - auto avanode = ConnectNode(NODE_AVALANCHE); - NodeId avanodeid = avanode->GetId(); - BOOST_CHECK(addNode(avanodeid)); + // Add enough nodes to have a valid quorum, and the same amount with no + // avalanche support + std::set avanodeIds; + auto avanodes = ConnectNodes(); + for (auto avanode : avanodes) { + ConnectNode(NODE_NONE); + avanodeIds.insert(avanode->GetId()); + } + + auto getSelectedAvanodeId = [&]() { + NodeId avanodeid = getSuitableNodeToQuery(); + BOOST_CHECK(avanodeIds.find(avanodeid) != avanodeIds.end()); + return avanodeid; + }; - // It returns the avalanche peer. - BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), avanodeid); + // It returns one of the avalanche peers. + NodeId avanodeid = getSelectedAvanodeId(); // Register an item and check it is added to the list of elements to poll. BOOST_CHECK(provider.addToReconcile(item)); @@ -663,11 +672,24 @@ BOOST_CHECK_EQUAL(invs[0].type, invType); BOOST_CHECK(invs[0].hash == itemid); - // Trigger a poll on avanode. + std::set unselectedNodeids = avanodeIds; + unselectedNodeids.erase(avanodeid); + const size_t remainingNodeIds = unselectedNodeids.size(); + uint64_t round = getRound(); - runEventLoop(); + for (size_t i = 0; i < remainingNodeIds; i++) { + // Trigger a poll on avanode. + runEventLoop(); + + // Another node is selected + NodeId nodeid = getSuitableNodeToQuery(); + BOOST_CHECK(unselectedNodeids.find(nodeid) != avanodeIds.end()); + unselectedNodeids.erase(nodeid); + } // There is no more suitable peer available, so return nothing. + BOOST_CHECK(unselectedNodeids.empty()); + runEventLoop(); BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), NO_NODE); // Respond to the request. @@ -715,6 +737,20 @@ checkRegisterVotesError(avanodeid, resp, "invalid-ava-response-content"); BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), avanodeid); + // At this stage we have reached the max inflight requests for our inv, so + // it won't be requested anymore until the requests are fulfilled. 
Let's + // vote on another item with no inflight request so the remaining tests + // makes sense. + invs = getInvsForNextPoll(); + BOOST_CHECK(invs.empty()); + + item = provider.buildVoteItem(); + itemid = provider.getVoteItemId(item); + BOOST_CHECK(provider.addToReconcile(item)); + + invs = getInvsForNextPoll(); + BOOST_CHECK_EQUAL(invs.size(), 1); + // 4. Invalid round count. Request is not discarded. uint64_t queryRound = getRound(); runEventLoop(); @@ -804,16 +840,16 @@ // Add the item BOOST_CHECK(provider.addToReconcile(item)); - // Create a node that supports avalanche. - auto avanode = ConnectNode(NODE_AVALANCHE); - NodeId avanodeid = avanode->GetId(); - BOOST_CHECK(addNode(avanodeid)); + // Create a quorum of nodes that support avalanche. + ConnectNodes(); + NodeId avanodeid = NO_NODE; // Expire requests after some time. auto queryTimeDuration = std::chrono::milliseconds(10); m_processor->setQueryTimeoutDuration(queryTimeDuration); for (int i = 0; i < 10; i++) { Response resp = {getRound(), 0, {Vote(0, itemid)}}; + avanodeid = getSuitableNodeToQuery(); auto start = std::chrono::steady_clock::now(); runEventLoop(); @@ -834,6 +870,8 @@ // We are within time bounds, so the vote should have worked. BOOST_CHECK(ret); + avanodeid = getSuitableNodeToQuery(); + // Now try again but wait for expiration. runEventLoop(); std::this_thread::sleep_for(queryTimeDuration); @@ -984,23 +1022,23 @@ // Start the scheduler thread. std::thread schedulerThread(std::bind(&CScheduler::serviceQueue, &s)); - // Create a node that supports avalanche. - auto avanode = ConnectNode(NODE_AVALANCHE); - NodeId nodeid = avanode->GetId(); - BOOST_CHECK(addNode(nodeid)); + // Create a quorum of nodes that support avalanche. + auto avanodes = ConnectNodes(); // There is no query in flight at the moment. - BOOST_CHECK_EQUAL(getSuitableNodeToQuery(), nodeid); + NodeId nodeid = getSuitableNodeToQuery(); + BOOST_CHECK_NE(nodeid, NO_NODE); // Add a new block. Check it is added to the polls. 
uint64_t queryRound = getRound(); BOOST_CHECK(m_processor->addBlockToReconcile(pindex)); + // Wait until all nodes got a poll for (int i = 0; i < 60 * 1000; i++) { // Technically, this is a race condition, but this should do just fine - // as we wait up to 1 minute for an event that should take 10ms. + // as we wait up to 1 minute for an event that should take 80ms. UninterruptibleSleep(std::chrono::milliseconds(1)); - if (getRound() != queryRound) { + if (getRound() == queryRound + avanodes.size()) { break; } } @@ -1014,7 +1052,9 @@ std::chrono::steady_clock::now() + std::chrono::milliseconds(100); std::vector updates; + // Only the first node answers, so it's the only one that gets polled again registerVotes(nodeid, {queryRound, 100, {Vote(0, blockHash)}}, updates); + for (int i = 0; i < 10000; i++) { // We make sure that we do not get a request before queryTime. UninterruptibleSleep(std::chrono::milliseconds(1)); @@ -1275,12 +1315,21 @@ }); BOOST_CHECK(!processor->isQuorumEstablished()); + // Add enough nodes to get a conclusive vote + for (NodeId id = 0; id < 8; id++) { + processor->withPeerManager([&](avalanche::PeerManager &pm) { + pm.addNode(id, processor->getLocalProof()->getId()); + BOOST_CHECK_EQUAL(pm.getTotalPeersScore(), minScore / 4); + BOOST_CHECK_EQUAL(pm.getConnectedPeersScore(), minScore / 4); + }); + } + // Add part of the required stake and make sure we still report no quorum auto proof1 = buildRandomProof(active_chainstate, minScore / 2); processor->withPeerManager([&](avalanche::PeerManager &pm) { BOOST_CHECK(pm.registerProof(proof1)); BOOST_CHECK_EQUAL(pm.getTotalPeersScore(), 3 * minScore / 4); - BOOST_CHECK_EQUAL(pm.getConnectedPeersScore(), 0); + BOOST_CHECK_EQUAL(pm.getConnectedPeersScore(), minScore / 4); }); BOOST_CHECK(!processor->isQuorumEstablished()); @@ -1289,30 +1338,42 @@ processor->withPeerManager([&](avalanche::PeerManager &pm) { BOOST_CHECK(pm.registerProof(proof2)); BOOST_CHECK_EQUAL(pm.getTotalPeersScore(), minScore); - 
BOOST_CHECK_EQUAL(pm.getConnectedPeersScore(), 0); + BOOST_CHECK_EQUAL(pm.getConnectedPeersScore(), minScore / 4); }); BOOST_CHECK(!processor->isQuorumEstablished()); // Adding a node should cause the quorum to be detected and locked-in processor->withPeerManager([&](avalanche::PeerManager &pm) { - pm.addNode(0, proof2->getId()); + pm.addNode(8, proof2->getId()); BOOST_CHECK_EQUAL(pm.getTotalPeersScore(), minScore); // The peer manager knows that proof2 has a node attached ... - BOOST_CHECK_EQUAL(pm.getConnectedPeersScore(), minScore / 4); + BOOST_CHECK_EQUAL(pm.getConnectedPeersScore(), minScore / 2); }); // ... but the processor also account for the local proof, so we reached 50% BOOST_CHECK(processor->isQuorumEstablished()); - // Go back to not having enough connected nodes, but we've already latched + // Go back to not having enough connected score, but we've already latched // the quorum as established processor->withPeerManager([&](avalanche::PeerManager &pm) { - pm.removeNode(0); + pm.removeNode(8); BOOST_CHECK_EQUAL(pm.getTotalPeersScore(), minScore); - BOOST_CHECK_EQUAL(pm.getConnectedPeersScore(), 0); + BOOST_CHECK_EQUAL(pm.getConnectedPeersScore(), minScore / 4); + }); + BOOST_CHECK(processor->isQuorumEstablished()); + + // Removing one more node drops our count below the minimum and the quorum + // is no longer ready + processor->withPeerManager( + [&](avalanche::PeerManager &pm) { pm.removeNode(7); }); + BOOST_CHECK(!processor->isQuorumEstablished()); + + // It resumes when we have enough nodes again + processor->withPeerManager([&](avalanche::PeerManager &pm) { + pm.addNode(7, processor->getLocalProof()->getId()); }); BOOST_CHECK(processor->isQuorumEstablished()); - // Remove peers one at a time and ensure the quorum stays established + // Remove peers one at a time until the quorum is no longer established auto spendProofUtxo = [&processor, &chainman](ProofRef proof) { { LOCK(cs_main); @@ -1328,14 +1389,14 @@ spendProofUtxo(proof2); 
processor->withPeerManager([&](avalanche::PeerManager &pm) { BOOST_CHECK_EQUAL(pm.getTotalPeersScore(), 3 * minScore / 4); - BOOST_CHECK_EQUAL(pm.getConnectedPeersScore(), 0); + BOOST_CHECK_EQUAL(pm.getConnectedPeersScore(), minScore / 4); }); BOOST_CHECK(processor->isQuorumEstablished()); spendProofUtxo(proof1); processor->withPeerManager([&](avalanche::PeerManager &pm) { BOOST_CHECK_EQUAL(pm.getTotalPeersScore(), minScore / 4); - BOOST_CHECK_EQUAL(pm.getConnectedPeersScore(), 0); + BOOST_CHECK_EQUAL(pm.getConnectedPeersScore(), minScore / 4); }); BOOST_CHECK(processor->isQuorumEstablished()); @@ -1344,7 +1405,8 @@ BOOST_CHECK_EQUAL(pm.getTotalPeersScore(), 0); BOOST_CHECK_EQUAL(pm.getConnectedPeersScore(), 0); }); - BOOST_CHECK(processor->isQuorumEstablished()); + // There is no node left + BOOST_CHECK(!processor->isQuorumEstablished()); gArgs.ClearForcedArg("-avamasterkey"); gArgs.ClearForcedArg("-avaproof"); @@ -1429,9 +1491,6 @@ argsman, *m_node.chain, m_node.connman.get(), chainman, *m_node.scheduler, error); - BOOST_CHECK_EQUAL(processor->isQuorumEstablished(), - minAvaproofsMessages <= 0); - auto addNode = [&](NodeId nodeid) { auto proof = buildRandomProof(chainman.ActiveChainstate(), MIN_VALID_PROOF_SCORE); @@ -1441,6 +1500,17 @@ }); }; + // Add enough nodes to have a conclusive vote, but don't account any + // avaproofs. + // NOTE: we can't use the test facilities like ConnectNodes() because we + // are not testing on m_processor. 
+ for (NodeId id = 100; id < 108; id++) { + addNode(id); + } + + BOOST_CHECK_EQUAL(processor->isQuorumEstablished(), + minAvaproofsMessages <= 0); + for (int64_t i = 0; i < minAvaproofsMessages - 1; i++) { addNode(i); diff --git a/test/functional/abc_p2p_avalanche_proof_voting.py b/test/functional/abc_p2p_avalanche_proof_voting.py --- a/test/functional/abc_p2p_avalanche_proof_voting.py +++ b/test/functional/abc_p2p_avalanche_proof_voting.py @@ -323,6 +323,8 @@ '-avalancheconflictingproofcooldown=0', '-whitelist=noban@127.0.0.1', ]) + self.get_quorum(node) + ava_node = get_ava_p2p_interface(node) # Generate coinbases to use for stakes diff --git a/test/functional/abc_p2p_avalanche_quorum.py b/test/functional/abc_p2p_avalanche_quorum.py --- a/test/functional/abc_p2p_avalanche_quorum.py +++ b/test/functional/abc_p2p_avalanche_quorum.py @@ -9,6 +9,7 @@ build_msg_avaproofs, gen_proof, get_ava_p2p_interface, + wait_for_proof, ) from test_framework.key import ECPubKey from test_framework.messages import ( @@ -33,7 +34,7 @@ '-avaproofstakeutxoconfirmations=1', '-avacooldown=0', '-avatimeout=0', - '-avaminquorumstake=100000000', + '-avaminquorumstake=150000000', '-avaminquorumconnectedstakeratio=0.8', '-minimumchainwork=0', ]] * self.num_nodes @@ -45,6 +46,20 @@ [f'-avaminavaproofsnodecount={self.min_avaproofs_node_count}'] def run_test(self): + # Initially all nodes start with 8 nodes attached to a single proof + privkey, proof = gen_proof(self.nodes[0]) + for node in self.nodes: + quorum = [get_ava_p2p_interface(node) + for _ in range(0, 8)] + + for n in quorum: + success = node.addavalanchenode( + n.nodeid, + privkey.get_pubkey().get_bytes().hex(), + proof.serialize().hex(), + ) + assert success is True + # Prepare peers proofs peers = [] for i in range(0, self.min_avaproofs_node_count + 1): @@ -132,12 +147,13 @@ get_ava_outbound(node, peer, empty_avaproof) poll_and_assert_response(node, expected_status[i]) - # Start polling. 
The response should be UNKNOWN because there's no - # score + # Start polling. The response should be UNKNOWN because there's not + # enough stake [poll_and_assert_response(node, AvalancheVoteError.UNKNOWN) for node in self.nodes] - # Create one peer with half the score and add one node + # Create one peer with half the remaining missing stake and add one + # node add_avapeer_and_check_status( peers[0], [ AvalancheVoteError.UNKNOWN, @@ -202,6 +218,20 @@ AvalancheVoteError.ACCEPTED, ]) + # Unless there is not enough nodes to poll + for node in self.nodes: + avapeers = [p2p for p2p in node.p2ps if p2p not in pollers] + for peer in avapeers[7:]: + peer.peer_disconnect() + peer.wait_for_disconnect() + poll_and_assert_response(node, AvalancheVoteError.UNKNOWN) + + # Add a node back and check it resumes the quorum status + avapeer = AvaP2PInterface(node) + node.add_p2p_connection(avapeer) + wait_for_proof(node, f"{avapeer.proof.proofid:0{64}x}") + poll_and_assert_response(node, AvalancheVoteError.ACCEPTED) + if __name__ == '__main__': AvalancheQuorumTest().main() diff --git a/test/functional/abc_p2p_avalanche_voting.py b/test/functional/abc_p2p_avalanche_voting.py --- a/test/functional/abc_p2p_avalanche_voting.py +++ b/test/functional/abc_p2p_avalanche_voting.py @@ -50,6 +50,24 @@ # Use the first coinbase to create a stake stakes = create_coinbase_stakes(node, [blockhashes[0]], addrkey0.key) + # duplicate the deterministic sig test from src/test/key_tests.cpp + privkey = ECKey() + privkey.set(bytes.fromhex( + "12b004fff7f4b69ef8650e767f18f11ede158148b425660723b9f9a66e61f747"), True) + + proof_sequence = 11 + proof_expiration = 12 + proof = node.buildavalancheproof( + proof_sequence, proof_expiration, bytes_to_wif( + privkey.get_bytes()), + stakes) + + # Activate the quorum. 
+ for n in quorum: + success = node.addavalanchenode( + n.nodeid, privkey.get_pubkey().get_bytes().hex(), proof) + assert success is True + fork_node = self.nodes[1] # Make sure the fork node has synced the blocks self.sync_blocks([node, fork_node]) @@ -126,23 +144,6 @@ [AvalancheVote(AvalancheVoteError.UNKNOWN, h) for h in various_block_hashes[-3:]]) self.log.info("Trigger polling from the node...") - # duplicate the deterministic sig test from src/test/key_tests.cpp - privkey = ECKey() - privkey.set(bytes.fromhex( - "12b004fff7f4b69ef8650e767f18f11ede158148b425660723b9f9a66e61f747"), True) - - proof_sequence = 11 - proof_expiration = 12 - proof = node.buildavalancheproof( - proof_sequence, proof_expiration, bytes_to_wif( - privkey.get_bytes()), - stakes) - - # Activate the quorum. - for n in quorum: - success = node.addavalanchenode( - n.nodeid, privkey.get_pubkey().get_bytes().hex(), proof) - assert success is True def can_find_block_in_poll(hash, resp=AvalancheVoteError.ACCEPTED): found_hash = False diff --git a/test/functional/abc_p2p_compactproofs.py b/test/functional/abc_p2p_compactproofs.py --- a/test/functional/abc_p2p_compactproofs.py +++ b/test/functional/abc_p2p_compactproofs.py @@ -56,6 +56,7 @@ '-enableavalanche=1', '-avaproofstakeutxoconfirmations=1', '-avacooldown=0', + '-enableavalanchepeerdiscovery=1', ]] * self.num_nodes def setup_network(self): @@ -647,14 +648,10 @@ [inbound, outbound]) > current_total) current_total = count_getavaproofs([inbound, outbound]) - # Connect the minimum amount of stake - privkey, proof = gen_proof(node) - assert node.addavalanchenode( - inbound.nodeid, - privkey.get_pubkey().get_bytes().hex(), - proof.serialize().hex()) - - assert_equal(node.getavalancheinfo()['active'], True) + # Connect the minimum amount of stake and nodes + for _ in range(8): + node.add_p2p_connection(AvaP2PInterface(node)) + self.wait_until(lambda: node.getavalancheinfo()['active'] is True) # From now only the outbound is requested count_inbound 
= count_getavaproofs([inbound]) diff --git a/test/functional/abc_p2p_getavaaddr.py b/test/functional/abc_p2p_getavaaddr.py --- a/test/functional/abc_p2p_getavaaddr.py +++ b/test/functional/abc_p2p_getavaaddr.py @@ -403,14 +403,10 @@ [inbound, outbound]) > current_total) current_total = count_getavaaddr([inbound, outbound]) - # Connect the minimum amount of stake - privkey, proof = gen_proof(node) - assert node.addavalanchenode( - inbound.nodeid, - privkey.get_pubkey().get_bytes().hex(), - proof.serialize().hex()) - - assert_equal(node.getavalancheinfo()['active'], True) + # Connect the minimum amount of stake and nodes + for _ in range(8): + node.add_p2p_connection(AvaP2PInterface(node)) + self.wait_until(lambda: node.getavalancheinfo()['active'] is True) # From now only the outbound is requested count_inbound = count_getavaaddr([inbound]) diff --git a/test/functional/abc_rpc_getavalancheinfo.py b/test/functional/abc_rpc_getavalancheinfo.py --- a/test/functional/abc_rpc_getavalancheinfo.py +++ b/test/functional/abc_rpc_getavalancheinfo.py @@ -351,12 +351,10 @@ self.log.info("Disconnect all the nodes") - for n in node.p2ps: - n.peer_disconnect() - n.wait_for_disconnect() + node.disconnect_p2ps() assert_avalancheinfo({ - "active": True, + "active": False, "local": { "live": True, "proofid": f"{proof.proofid:0{64}x}",