diff --git a/src/avalanche.h b/src/avalanche.h --- a/src/avalanche.h +++ b/src/avalanche.h @@ -139,6 +139,26 @@ } }; +class AvalanchePoll { + uint32_t round; + std::vector invs; + +public: + AvalanchePoll(uint32_t roundIn, std::vector invsIn) + : round(roundIn), invs(invsIn) {} + + const std::vector &GetInvs() const { return invs; } + + // serialization support + ADD_SERIALIZE_METHODS; + + template + inline void SerializationOp(Stream &s, Operation ser_action) { + READWRITE(round); + READWRITE(invs); + } +}; + class AvalancheBlockUpdate { union { CBlockIndex *pindex; @@ -174,11 +194,34 @@ class AvalancheProcessor { private: + CConnman *connman; + /** * Blocks to run avalanche on. */ RWCollection vote_records; + /** + * Keep track of peers and queries sent. + */ + struct RequestRecord { + private: + int64_t timestamp; + std::vector invs; + + public: + RequestRecord() : timestamp(0), invs() {} + RequestRecord(int64_t timestampIn, std::vector invIn) + : timestamp(timestampIn), invs(std::move(invIn)) {} + + int64_t GetTimestamp() const { return timestamp; } + const std::vector &GetInvs() const { return invs; } + }; + + std::atomic round; + RWCollection> nodeids; + RWCollection> queries; + /** * Start stop machinery. */ @@ -189,20 +232,23 @@ std::condition_variable cond_running; public: - AvalancheProcessor() : stopRequest(false), running(false) {} + AvalancheProcessor(CConnman *connmanIn) + : connman(connmanIn), stopRequest(false), running(false) {} ~AvalancheProcessor() { stopEventLoop(); } bool addBlockToReconcile(const CBlockIndex *pindex); bool isAccepted(const CBlockIndex *pindex) const; - bool registerVotes(const AvalancheResponse &response, + bool registerVotes(NodeId nodeid, const AvalancheResponse &response, std::vector &updates); bool startEventLoop(CScheduler &scheduler); bool stopEventLoop(); private: + void runEventLoop(); std::vector getInvsForNextPoll() const; + NodeId getSuitableNodeToQuery(); friend struct AvalancheTest; }; diff --git a/src/avalanche.cpp b/src/avalanche.cpp --- a/src/avalanche.cpp +++ b/src/avalanche.cpp @@ -38,9 +38,40 @@ } bool AvalancheProcessor::registerVotes( - const AvalancheResponse &response, + NodeId nodeid, const AvalancheResponse &response, std::vector &updates) { + RequestRecord r; + + { + // Check that the query exists. + auto w = queries.getWriteView(); + auto it = w->find(nodeid); + if (it == w.end()) { + // NB: The request may be old, so we don't increase banscore. + return false; + } + + r = std::move(it->second); + w->erase(it); + } + + // Verify that the request and the vote are consistent. + const std::vector &invs = r.GetInvs(); const std::vector &votes = response.GetVotes(); + size_t size = invs.size(); + if (votes.size() != size) { + // TODO: increase banscore for inconsistent response. + // NB: This isn't timeout but actually node misbehaving. + return false; + } + + for (size_t i = 0; i < size; i++) { + if (invs[i].hash != votes[i].GetHash()) { + // TODO: increase banscore for inconsistent response. + // NB: This isn't timeout but actually node misbehaving. + return false; + } + } std::map responseIndex; @@ -96,6 +127,9 @@ } } + // Put the node back in the list of queriable nodes. + auto w = nodeids.getWriteView(); + w->insert(nodeid); return true; } @@ -122,6 +156,7 @@ // Start the event loop. scheduler.scheduleEvery( [this]() -> bool { + runEventLoop(); if (!stopRequest) { return true; } @@ -178,3 +213,67 @@ return invs; } + +NodeId AvalancheProcessor::getSuitableNodeToQuery() { + auto w = nodeids.getWriteView(); + if (w->empty()) { + auto r = queries.getReadView(); + + // We don't have any candidate node, so let's try to find some. + connman->ForEachNode([&w, &r](CNode *pnode) { + // If this node doesn't support avalanche, we remove. + if (!(pnode->nServices & NODE_AVALANCHE)) { + return; + } + + // if we have a request in flight for that node. + if (r->find(pnode->GetId()) != r.end()) { + return; + } + + w->insert(pnode->GetId()); + }); + } + + // We don't have any suitable candidate. + if (w->empty()) { + return -1; + } + + auto it = w.begin(); + NodeId nodeid = *it; + w->erase(it); + + return nodeid; +} + +void AvalancheProcessor::runEventLoop() { + std::vector invs = getInvsForNextPoll(); + if (invs.empty()) { + // If there are no invs to poll, we are done. + return; + } + + NodeId nodeid = getSuitableNodeToQuery(); + + /** + * If we lost contact to that node, then we remove it from nodeids, but + * never add the request to queries, which ensures bad nodes get cleaned up + * over time. + */ + connman->ForNode(nodeid, [this, &invs](CNode *pnode) { + { + // Register the query. + queries.getWriteView()->emplace( + pnode->GetId(), RequestRecord(GetAdjustedTime(), invs)); + } + + // Send the query to the node. + connman->PushMessage( + pnode, + CNetMsgMaker(pnode->GetSendVersion()) + .Make(NetMsgType::AVAPOLL, + AvalanchePoll(round++, std::move(invs)))); + return true; + }); +} diff --git a/src/net.h b/src/net.h --- a/src/net.h +++ b/src/net.h @@ -444,6 +444,7 @@ friend struct CConnmanTest; }; + extern std::unique_ptr g_connman; void Discover(); void StartMapPort(); diff --git a/src/protocol.h b/src/protocol.h --- a/src/protocol.h +++ b/src/protocol.h @@ -252,6 +252,16 @@ * @since protocol version 70014 as described by BIP 152 */ extern const char *BLOCKTXN; +/** + * Contains an AvalanchePoll. + * Peer should respond with "avaresponse" message. + */ +extern const char *AVAPOLL; +/** + * Contains an AvalancheResponse. + * Sent in response to a "avapoll" message. + */ +extern const char *AVARESPONSE; /** * Indicate if the message is used to transmit the content of a block. @@ -302,6 +312,10 @@ // collisions and other cases where nodes may be advertising a service they // do not actually support. Other service bits should be allocated via the // BIP process. + + // NODE_AVALANCHE means the node supports Bitcoin Cash's avalanche + // preconsensus mechanism. + NODE_AVALANCHE = (1 << 24), }; /** diff --git a/src/protocol.cpp b/src/protocol.cpp --- a/src/protocol.cpp +++ b/src/protocol.cpp @@ -41,6 +41,8 @@ const char *CMPCTBLOCK = "cmpctblock"; const char *GETBLOCKTXN = "getblocktxn"; const char *BLOCKTXN = "blocktxn"; +const char *AVAPOLL = "avapoll"; +const char *AVARESPONSE = "avaresponse"; bool IsBlockLike(const std::string &strCommand) { return strCommand == NetMsgType::BLOCK || diff --git a/src/test/avalanche_tests.cpp b/src/test/avalanche_tests.cpp --- a/src/test/avalanche_tests.cpp +++ b/src/test/avalanche_tests.cpp @@ -3,15 +3,23 @@ // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include "avalanche.h" +#include "config.h" +#include "net_processing.h" // For PeerLogicValidation #include "test/test_bitcoin.h" #include struct AvalancheTest { + static void runEventLoop(AvalancheProcessor &p) { p.runEventLoop(); } + static std::vector getInvsForNextPoll(const AvalancheProcessor &p) { return p.getInvsForNextPoll(); } + + static NodeId getSuitableNodeToQuery(AvalancheProcessor &p) { + return p.getSuitableNodeToQuery(); + } }; BOOST_FIXTURE_TEST_SUITE(avalanche_tests, TestChain100Setup) @@ -90,14 +98,46 @@ } } +CService ip(uint32_t i) { + struct in_addr s; + s.s_addr = i; + return CService(CNetAddr(s), Params().GetDefaultPort()); +} + +std::unique_ptr ConnectNode(const Config &config, ServiceFlags nServices, + PeerLogicValidation &peerLogic) { + static NodeId id = 0; + + CAddress addr(ip(GetRandInt(0xffffffff)), NODE_NONE); + std::unique_ptr nodeptr(new CNode(id++, ServiceFlags(NODE_NETWORK), + 0, INVALID_SOCKET, addr, 0, 0, + CAddress(), "", + /*fInboundIn=*/false)); + CNode &node = *nodeptr; + node.SetSendVersion(PROTOCOL_VERSION); + node.nServices = nServices; + peerLogic.InitializeNode(config, &node); + node.nVersion = 1; + node.fSuccessfullyConnected = true; + + CConnmanTest::AddNode(node); + return nodeptr; +} + BOOST_AUTO_TEST_CASE(block_register) { - AvalancheProcessor p; + AvalancheProcessor p(g_connman.get()); std::vector updates; CBlock block = CreateAndProcessBlock({}, CScript()); const uint256 blockHash = block.GetHash(); const CBlockIndex *pindex = mapBlockIndex[blockHash]; + const Config &config = GetConfig(); + + // Create a node that supports avalanche. + auto avanode = ConnectNode(config, NODE_AVALANCHE, *peerLogic); + NodeId nodeid = avanode->GetId(); + // Querying for random block returns false. BOOST_CHECK(!p.isAccepted(pindex)); @@ -114,13 +154,15 @@ // Let's vote for this block a few times. AvalancheResponse resp{0, {AvalancheVote(0, blockHash)}}; for (int i = 0; i < 5; i++) { - p.registerVotes(resp, updates); + AvalancheTest::runEventLoop(p); + BOOST_CHECK(p.registerVotes(nodeid, resp, updates)); BOOST_CHECK(!p.isAccepted(pindex)); BOOST_CHECK_EQUAL(updates.size(), 0); } // Now the state will flip. - p.registerVotes(resp, updates); + AvalancheTest::runEventLoop(p); + BOOST_CHECK(p.registerVotes(nodeid, resp, updates)); BOOST_CHECK(p.isAccepted(pindex)); BOOST_CHECK_EQUAL(updates.size(), 1); BOOST_CHECK(updates[0].getBlockIndex() == pindex); @@ -130,7 +172,8 @@ // Now it is accepted, but we can vote for it numerous times. for (int i = 1; i < AVALANCHE_FINALIZATION_SCORE; i++) { - p.registerVotes(resp, updates); + AvalancheTest::runEventLoop(p); + BOOST_CHECK(p.registerVotes(nodeid, resp, updates)); BOOST_CHECK(p.isAccepted(pindex)); BOOST_CHECK_EQUAL(updates.size(), 0); } @@ -142,7 +185,8 @@ BOOST_CHECK(invs[0].hash == blockHash); // Now finalize the decision. - p.registerVotes(resp, updates); + AvalancheTest::runEventLoop(p); + BOOST_CHECK(p.registerVotes(nodeid, resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 1); BOOST_CHECK(updates[0].getBlockIndex() == pindex); BOOST_CHECK_EQUAL(updates[0].getStatus(), @@ -163,14 +207,16 @@ // Only 3 here as we don't need to flip state. resp = {0, {AvalancheVote(1, blockHash)}}; for (int i = 0; i < 3; i++) { - p.registerVotes(resp, updates); + AvalancheTest::runEventLoop(p); + BOOST_CHECK(p.registerVotes(nodeid, resp, updates)); BOOST_CHECK(!p.isAccepted(pindex)); BOOST_CHECK_EQUAL(updates.size(), 0); } // Now it is rejected, but we can vote for it numerous times. for (int i = 0; i < AVALANCHE_FINALIZATION_SCORE; i++) { - p.registerVotes(resp, updates); + AvalancheTest::runEventLoop(p); + BOOST_CHECK(p.registerVotes(nodeid, resp, updates)); BOOST_CHECK(!p.isAccepted(pindex)); BOOST_CHECK_EQUAL(updates.size(), 0); } @@ -182,7 +228,8 @@ BOOST_CHECK(invs[0].hash == blockHash); // Now finalize the decision. - p.registerVotes(resp, updates); + AvalancheTest::runEventLoop(p); + BOOST_CHECK(p.registerVotes(nodeid, resp, updates)); BOOST_CHECK(!p.isAccepted(pindex)); BOOST_CHECK_EQUAL(updates.size(), 1); BOOST_CHECK(updates[0].getBlockIndex() == pindex); @@ -198,14 +245,22 @@ BOOST_CHECK(p.addBlockToReconcile(pindex)); BOOST_CHECK(!p.addBlockToReconcile(pindex)); BOOST_CHECK(!p.isAccepted(pindex)); + + CConnmanTest::ClearNodes(); } BOOST_AUTO_TEST_CASE(multi_block_register) { - AvalancheProcessor p; + AvalancheProcessor p(g_connman.get()); CBlockIndex indexA, indexB; std::vector updates; + const Config &config = GetConfig(); + + // Create a node that supports avalanche. + auto node0 = ConnectNode(config, NODE_AVALANCHE, *peerLogic); + auto node1 = ConnectNode(config, NODE_AVALANCHE, *peerLogic); + // Make sure the block has a hash. CBlock blockA = CreateAndProcessBlock({}, CScript()); const uint256 blockHashA = blockA.GetHash(); @@ -226,12 +281,14 @@ BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK); BOOST_CHECK(invs[0].hash == blockHashA); - AvalancheResponse resp{ - 0, {AvalancheVote(0, blockHashA), AvalancheVote(0, blockHashB)}}; - p.registerVotes(resp, updates); + AvalancheTest::runEventLoop(p); + BOOST_CHECK(p.registerVotes(node0->GetId(), + {0, {AvalancheVote(0, blockHashA)}}, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); // Start voting on block B after one vote. + AvalancheResponse resp{ + 0, {AvalancheVote(0, blockHashB), AvalancheVote(0, blockHashA)}}; BOOST_CHECK(p.addBlockToReconcile(pindexB)); invs = AvalancheTest::getInvsForNextPoll(p); BOOST_CHECK_EQUAL(invs.size(), 2); @@ -242,14 +299,16 @@ BOOST_CHECK_EQUAL(invs[1].type, MSG_BLOCK); BOOST_CHECK(invs[1].hash == blockHashA); - // Let's vote for this block a few times. + // Let's vote for these blocks a few times. for (int i = 0; i < 4; i++) { - p.registerVotes(resp, updates); + AvalancheTest::runEventLoop(p); + BOOST_CHECK(p.registerVotes(node0->GetId(), resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); } // Now the state will flip for A. - p.registerVotes(resp, updates); + AvalancheTest::runEventLoop(p); + BOOST_CHECK(p.registerVotes(node0->GetId(), resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 1); BOOST_CHECK(updates[0].getBlockIndex() == pindexA); BOOST_CHECK_EQUAL(updates[0].getStatus(), @@ -257,7 +316,8 @@ updates = {}; // And then for B. - p.registerVotes(resp, updates); + AvalancheTest::runEventLoop(p); + BOOST_CHECK(p.registerVotes(node0->GetId(), resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 1); BOOST_CHECK(updates[0].getBlockIndex() == pindexB); BOOST_CHECK_EQUAL(updates[0].getStatus(), @@ -266,12 +326,18 @@ // Now it is rejected, but we can vote for it numerous times. for (int i = 2; i < AVALANCHE_FINALIZATION_SCORE; i++) { - p.registerVotes(resp, updates); + AvalancheTest::runEventLoop(p); + BOOST_CHECK(p.registerVotes(node0->GetId(), resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 0); } + // Running two iterration of the event loop so that vote gets triggerd on A + // and B. + AvalancheTest::runEventLoop(p); + AvalancheTest::runEventLoop(p); + // Next vote will finalize block A. - p.registerVotes(resp, updates); + BOOST_CHECK(p.registerVotes(node1->GetId(), resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 1); BOOST_CHECK(updates[0].getBlockIndex() == pindexA); BOOST_CHECK_EQUAL(updates[0].getStatus(), @@ -285,7 +351,7 @@ BOOST_CHECK(invs[0].hash == blockHashB); // Next vote will finalize block B. - p.registerVotes(resp, updates); + BOOST_CHECK(p.registerVotes(node0->GetId(), resp, updates)); BOOST_CHECK_EQUAL(updates.size(), 1); BOOST_CHECK(updates[0].getBlockIndex() == pindexB); BOOST_CHECK_EQUAL(updates[0].getStatus(), @@ -295,12 +361,118 @@ // There is nothing left to vote on. invs = AvalancheTest::getInvsForNextPoll(p); BOOST_CHECK_EQUAL(invs.size(), 0); + + CConnmanTest::ClearNodes(); +} + +BOOST_AUTO_TEST_CASE(poll_and_response) { + AvalancheProcessor p(g_connman.get()); + + std::vector updates; + + CBlock block = CreateAndProcessBlock({}, CScript()); + const uint256 blockHash = block.GetHash(); + const CBlockIndex *pindex = mapBlockIndex[blockHash]; + + const Config &config = GetConfig(); + + // There is no node to query. + BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), -1); + + // Create a node that supports avalanche and one that doesn't. + auto oldnode = ConnectNode(config, NODE_NONE, *peerLogic); + auto avanode = ConnectNode(config, NODE_AVALANCHE, *peerLogic); + NodeId avanodeid = avanode->GetId(); + + // It returns the avalanche peer. + BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid); + + // Register a block and check it is added to the list of elements to poll. + BOOST_CHECK(p.addBlockToReconcile(pindex)); + auto invs = AvalancheTest::getInvsForNextPoll(p); + BOOST_CHECK_EQUAL(invs.size(), 1); + BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK); + BOOST_CHECK(invs[0].hash == blockHash); + + // Trigger a poll on avanode. + AvalancheTest::runEventLoop(p); + + // There is no more suitable peer available, so return nothing. + BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), -1); + + // Respond to the request. + AvalancheResponse resp = {0, {AvalancheVote(0, blockHash)}}; + BOOST_CHECK(p.registerVotes(avanode->GetId(), resp, updates)); + BOOST_CHECK_EQUAL(updates.size(), 0); + + // Now that avanode fullfilled his request, it is added back to the list of + // queriable nodes. + BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid); + + // Sending a response when not polled fails. + BOOST_CHECK(!p.registerVotes(avanode->GetId(), resp, updates)); + BOOST_CHECK_EQUAL(updates.size(), 0); + + // Trigger a poll on avanode. + AvalancheTest::runEventLoop(p); + BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), -1); + + // Sending responses that do not match the request also fails. + // 1. Too many results. + resp = {0, {AvalancheVote(0, blockHash), AvalancheVote(0, blockHash)}}; + AvalancheTest::runEventLoop(p); + BOOST_CHECK(!p.registerVotes(avanode->GetId(), resp, updates)); + BOOST_CHECK_EQUAL(updates.size(), 0); + BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid); + + // 2. Not enough results. + resp = {0, {}}; + AvalancheTest::runEventLoop(p); + BOOST_CHECK(!p.registerVotes(avanode->GetId(), resp, updates)); + BOOST_CHECK_EQUAL(updates.size(), 0); + BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid); + + // 3. Do not match the poll. + resp = {0, {AvalancheVote()}}; + AvalancheTest::runEventLoop(p); + BOOST_CHECK(!p.registerVotes(avanode->GetId(), resp, updates)); + BOOST_CHECK_EQUAL(updates.size(), 0); + BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid); + + // Proper response gets processed and avanode is available again. + resp = {0, {AvalancheVote(0, blockHash)}}; + AvalancheTest::runEventLoop(p); + BOOST_CHECK(p.registerVotes(avanode->GetId(), resp, updates)); + BOOST_CHECK_EQUAL(updates.size(), 0); + BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid); + + // Making request for invalid nodes do not work. + BOOST_CHECK(!p.registerVotes(avanode->GetId() + 1234, resp, updates)); + BOOST_CHECK_EQUAL(updates.size(), 0); + + // Out of order response are rejected. + CBlock block2 = CreateAndProcessBlock({}, CScript()); + const uint256 blockHash2 = block2.GetHash(); + const CBlockIndex *pindex2 = mapBlockIndex[blockHash2]; + BOOST_CHECK(p.addBlockToReconcile(pindex2)); + + resp = {0, {AvalancheVote(0, blockHash), AvalancheVote(0, blockHash2)}}; + AvalancheTest::runEventLoop(p); + BOOST_CHECK(!p.registerVotes(avanode->GetId(), resp, updates)); + BOOST_CHECK_EQUAL(updates.size(), 0); + BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid); + + CConnmanTest::ClearNodes(); } BOOST_AUTO_TEST_CASE(event_loop) { - AvalancheProcessor p; + AvalancheProcessor p(g_connman.get()); CScheduler s; + CBlock block = CreateAndProcessBlock({}, CScript()); + const uint256 blockHash = block.GetHash(); + const CBlockIndex *pindex = mapBlockIndex[blockHash]; + // Starting the event loop. BOOST_CHECK(p.startEventLoop(s)); @@ -314,6 +486,32 @@ // Start the scheduler thread. std::thread schedulerThread(std::bind(&CScheduler::serviceQueue, &s)); + // Create a node and a block to query. + const Config &config = GetConfig(); + + // Create a node that supports avalanche. + auto avanode = ConnectNode(config, NODE_AVALANCHE, *peerLogic); + NodeId nodeid = avanode->GetId(); + + // There is no query in flight at the moment. + BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), nodeid); + + // Add a new block. Check it is added to the polls. + BOOST_CHECK(p.addBlockToReconcile(pindex)); + + bool hasQueried = false; + for (int i = 0; i < 1000; i++) { + // Technically, this is a race condition, but this should do just fine + // as we wait up to 1s for an event that should take 10ms. + boost::this_thread::sleep_for(boost::chrono::milliseconds(1)); + if (AvalancheTest::getSuitableNodeToQuery(p) == -1) { + hasQueried = true; + break; + } + } + + BOOST_CHECK(hasQueried); + // Stop event loop. BOOST_CHECK(p.stopEventLoop()); @@ -326,6 +524,8 @@ // Wait for the scheduler to stop. s.stop(true); schedulerThread.join(); + + CConnmanTest::ClearNodes(); } BOOST_AUTO_TEST_CASE(destructor) { @@ -336,7 +536,7 @@ std::thread schedulerThread(std::bind(&CScheduler::serviceQueue, &s)); { - AvalancheProcessor p; + AvalancheProcessor p(g_connman.get()); BOOST_CHECK(p.startEventLoop(s)); BOOST_CHECK_EQUAL(s.getQueueInfo(start, stop), 1); }