diff --git a/src/avalanche.h b/src/avalanche.h --- a/src/avalanche.h +++ b/src/avalanche.h @@ -149,11 +149,32 @@ class AvalancheProcessor { private: + CConnman *connman; + /** * Blocks to run avalanche on. */ RWCollection> vote_records; + /** + * Keep track of peers and queries sent. + */ + struct RequestReccord { + private: + int64_t timestamp; + std::vector invs; + + public: + RequestReccord(int64_t timestampIn, std::vector invIn) + : timestamp(timestampIn), invs(std::move(invIn)) {} + + int64_t GetTimestamp() const { return timestamp; } + const std::vector &GetInvs() const { return invs; } + }; + + RWCollection> nodeids; + RWCollection> queries; + /** * Start stop machinery. */ @@ -164,13 +185,14 @@ std::condition_variable cond_running; public: - AvalancheProcessor() : stopRequest(false), running(false) {} + AvalancheProcessor(CConnman *connmanIn) + : connman(connmanIn), stopRequest(false), running(false) {} ~AvalancheProcessor() { stopEventLoop(); } bool addBlockToReconciliate(const CBlockIndex *pindex); bool isAccepted(const CBlockIndex *pindex) const; - bool registerVotes(const AvalancheResponse &response, + bool registerVotes(NodeId nodeid, const AvalancheResponse &response, std::vector &finalized); bool startEventLoop(CScheduler &scheduler); @@ -179,6 +201,7 @@ private: void runEventLoop(); std::vector getInvsForNextPoll() const; + NodeId getSuitableNodeToQuery(); friend struct AvalancheTest; }; diff --git a/src/avalanche.cpp b/src/avalanche.cpp --- a/src/avalanche.cpp +++ b/src/avalanche.cpp @@ -34,24 +34,62 @@ return false; } -bool AvalancheProcessor::registerVotes(const AvalancheResponse &response, +bool AvalancheProcessor::registerVotes(NodeId nodeid, + const AvalancheResponse &response, std::vector &finalized) { + RequestReccord r{0, {}}; + + { + // Check that the query exists. + auto w = queries.getWriteView(); + auto it = w->find(nodeid); + if (it == w.end()) { + // NB: The request may be old, so we don't increase banscore. 
+ return false; + } + + r = std::move(it->second); + w->erase(it); + } + + // Verify that the request and the vote are consistent. + const std::vector &invs = r.GetInvs(); const std::vector &votes = response.GetVotes(); + size_t size = invs.size(); + if (votes.size() != size) { + // TODO: increase banscore for inconsistent response. + // NB: This isn't timeout but actually node misbehaving. + return false; + } - // Register votes. - auto w = vote_records.getWriteView(); - for (auto &v : votes) { - auto &r = w[v.GetHash()]; - r.registerVote(v.IsValid()); - if (r.hasFinalized()) { - finalized.push_back(v.GetHash()); + for (size_t i = 0; i < size; i++) { + if (invs[i].hash != votes[i].GetHash()) { + // TODO: increase banscore for inconsistent response. + // NB: This isn't timeout but actually node misbehaving. + return false; } } - for (auto &h : finalized) { - w->erase(h); + { + // Register votes. + auto w = vote_records.getWriteView(); + for (auto &v : votes) { + auto &vr = w[v.GetHash()]; + vr.registerVote(v.IsValid()); + if (vr.hasFinalized()) { + finalized.push_back(v.GetHash()); + } + } + + // Clear finalized results. + for (auto &h : finalized) { + w->erase(h); + } } + // Put the node back in the list of queriable nodes. + auto w = nodeids.getWriteView(); + w->insert(nodeid); return true; } @@ -78,6 +116,7 @@ // Start the event loop. scheduler.scheduleEvery( [this]() -> bool { + runEventLoop(); if (!stopRequest) { return true; } @@ -134,6 +173,65 @@ return invs; } +NodeId AvalancheProcessor::getSuitableNodeToQuery() { + auto w = nodeids.getWriteView(); + if (w->empty()) { + auto r = queries.getReadView(); + + // We don't have any candidate node, so let's try to find some. + connman->ForEachNode([&w, &r](CNode *pnode) { + // If this node doesn't support avalanche, we skip it. + if (!(pnode->nServices & NODE_AVALANCHE)) { + return; + } + + // Skip the node if we already have a request in flight for it. 
+ if (r->find(pnode->GetId()) != r.end()) { + return; + } + + w->insert(pnode->GetId()); + }); + } + + // We don't have any suitable candidate. + if (w->empty()) { + return -1; + } + + auto it = w.begin(); + NodeId nodeid = *it; + w->erase(it); + + return nodeid; +} + void AvalancheProcessor::runEventLoop() { std::vector invs = getInvsForNextPoll(); + if (invs.empty()) { + // If there are no invs to poll, we are done. + return; + } + + NodeId nodeid = getSuitableNodeToQuery(); + + /** + * If we lost contact to that node, then we remove it from nodeids, but + * never add the request to queries, which ensures bad nodes get cleaned up + * over time. + */ + connman->ForNode(nodeid, [this, &invs](CNode *pfrom) { + { + // Register the query. + queries.getWriteView()->emplace( + pfrom->GetId(), RequestReccord(GetAdjustedTime(), invs)); + } + + // Send the query to the node. + connman->PushMessage( + pfrom, + CNetMsgMaker(pfrom->GetSendVersion()) + .Make(NetMsgType::AVA_POLL, AvalanchePoll(std::move(invs)))); + return true; + }); } diff --git a/src/net.h b/src/net.h --- a/src/net.h +++ b/src/net.h @@ -444,6 +444,7 @@ friend struct CConnmanTest; }; + extern std::unique_ptr g_connman; void Discover(); void StartMapPort(); diff --git a/src/protocol.h b/src/protocol.h --- a/src/protocol.h +++ b/src/protocol.h @@ -252,6 +252,16 @@ * @since protocol version 70014 as described by BIP 152 */ extern const char *BLOCKTXN; +/** + * Contains an AvalanchePoll. + * Peer should respond with "ava_vote" message. + */ +extern const char *AVA_POLL; +/** + * Contains a vector of AvalancheVote. + * Sent in response to an "ava_poll" message. + */ +extern const char *AVA_VOTE; /** * Indicate if the message is used to transmit the content of a block. @@ -294,6 +304,9 @@ // TODO: remove (free up) the NODE_BITCOIN_CASH service bit once no longer // needed. NODE_BITCOIN_CASH = (1 << 5), + // NODE_AVALANCHE means the node supports Bitcoin Cash's avalanche + // preconsensus mechanism. 
+ NODE_AVALANCHE = (1 << 6), // Bits 24-31 are reserved for temporary experiments. Just pick a bit that // isn't getting used, or one not being used much, and notify the diff --git a/src/protocol.cpp b/src/protocol.cpp --- a/src/protocol.cpp +++ b/src/protocol.cpp @@ -41,6 +41,8 @@ const char *CMPCTBLOCK = "cmpctblock"; const char *GETBLOCKTXN = "getblocktxn"; const char *BLOCKTXN = "blocktxn"; +const char *AVA_POLL = "ava_poll"; +const char *AVA_VOTE = "ava_vote"; bool IsBlockLike(const std::string &strCommand) { return strCommand == NetMsgType::BLOCK || diff --git a/src/test/avalanche_tests.cpp b/src/test/avalanche_tests.cpp --- a/src/test/avalanche_tests.cpp +++ b/src/test/avalanche_tests.cpp @@ -3,6 +3,9 @@ // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include "avalanche.h" + +#include "config.h" +#include "net_processing.h" #include "scheduler.h" #include "test/test_bitcoin.h" @@ -10,12 +13,18 @@ #include struct AvalancheTest { + static void runEventLoop(AvalancheProcessor &p) { p.runEventLoop(); } + static std::vector getInvsForNextPoll(const AvalancheProcessor &p) { return p.getInvsForNextPoll(); } + + static NodeId getSuitableNodeToQuery(AvalancheProcessor &p) { + return p.getSuitableNodeToQuery(); + } }; -BOOST_FIXTURE_TEST_SUITE(avalanche_tests, BasicTestingSetup) +BOOST_FIXTURE_TEST_SUITE(avalanche_tests, TestingSetup) #define REGISTER_VOTE_AND_CHECK(vr, vote, state, finalized, confidence) \ vr.registerVote(vote); \ @@ -73,11 +82,42 @@ AVALANCHE_FINALIZATION_SCORE); } +CService ip(uint32_t i) { + struct in_addr s; + s.s_addr = i; + return CService(CNetAddr(s), Params().GetDefaultPort()); +} + +std::unique_ptr ConnectNode(const Config &config, ServiceFlags nServices, + PeerLogicValidation &peerLogic) { + static NodeId id = 0; + + CAddress addr(ip(GetRandInt(0xffffffff)), NODE_NONE); + std::unique_ptr nodeptr(new CNode(id++, ServiceFlags(NODE_NETWORK), + 0, INVALID_SOCKET, addr, 0, 0, + CAddress(), "", + /*fInboundIn=*/false)); + 
+ CNode &node = *nodeptr; + node.SetSendVersion(PROTOCOL_VERSION); + node.nServices = nServices; + peerLogic.InitializeNode(config, &node); + node.nVersion = 1; + node.fSuccessfullyConnected = true; + + CConnmanTest::AddNode(node); + return nodeptr; +} + BOOST_AUTO_TEST_CASE(block_register) { - AvalancheProcessor p; + AvalancheProcessor p(g_connman.get()); CBlockIndex index; std::vector finalized; + const Config &config = GetConfig(); + + // Create a node that supports avalanche. + auto avanode = ConnectNode(config, NODE_AVALANCHE, *peerLogic); + NodeId nodeid = avanode->GetId(); // Make sure the block has a hash. static const uint256 blockHash(uint256S( @@ -100,14 +140,16 @@ // Let's vote for this block a few times. AvalancheResponse resp{{AvalancheVote(blockHash, 0)}}; for (int i = 0; i < 5; i++) { - p.registerVotes(resp, finalized); + AvalancheTest::runEventLoop(p); + BOOST_CHECK(p.registerVotes(nodeid, resp, finalized)); BOOST_CHECK(!p.isAccepted(&index)); BOOST_CHECK_EQUAL(finalized.size(), 0); } // Now it is accepeted, but we can vote for it numerous times. for (int i = 0; i < AVALANCHE_FINALIZATION_SCORE; i++) { - p.registerVotes(resp, finalized); + AvalancheTest::runEventLoop(p); + BOOST_CHECK(p.registerVotes(nodeid, resp, finalized)); BOOST_CHECK(p.isAccepted(&index)); BOOST_CHECK_EQUAL(finalized.size(), 0); } @@ -119,7 +161,8 @@ BOOST_CHECK(invs[0].hash == blockHash); // Now finalize the decision. - p.registerVotes(resp, finalized); + AvalancheTest::runEventLoop(p); + BOOST_CHECK(p.registerVotes(nodeid, resp, finalized)); BOOST_CHECK_EQUAL(finalized.size(), 1); BOOST_CHECK(finalized[0] == blockHash); finalized = {}; @@ -138,14 +181,16 @@ // Only 3 here as we don't need to flip state. 
+ resp = {{AvalancheVote(blockHash, 1)}}; for (int i = 0; i < 3; i++) { - p.registerVotes(resp, finalized); + AvalancheTest::runEventLoop(p); + BOOST_CHECK(p.registerVotes(nodeid, resp, finalized)); BOOST_CHECK(!p.isAccepted(&index)); BOOST_CHECK_EQUAL(finalized.size(), 0); } // Now it is rejected, but we can vote for it numerous times. for (int i = 0; i < AVALANCHE_FINALIZATION_SCORE; i++) { - p.registerVotes(resp, finalized); + AvalancheTest::runEventLoop(p); + BOOST_CHECK(p.registerVotes(nodeid, resp, finalized)); BOOST_CHECK(!p.isAccepted(&index)); BOOST_CHECK_EQUAL(finalized.size(), 0); } @@ -157,7 +202,8 @@ BOOST_CHECK(invs[0].hash == blockHash); // Now finalize the decision. - p.registerVotes(resp, finalized); + AvalancheTest::runEventLoop(p); + BOOST_CHECK(p.registerVotes(nodeid, resp, finalized)); BOOST_CHECK(!p.isAccepted(&index)); BOOST_CHECK_EQUAL(finalized.size(), 1); BOOST_CHECK(finalized[0] == blockHash); @@ -171,60 +217,149 @@ BOOST_CHECK(p.addBlockToReconciliate(&index)); BOOST_CHECK(!p.addBlockToReconciliate(&index)); BOOST_CHECK(!p.isAccepted(&index)); + + CConnmanTest::ClearNodes(); } BOOST_AUTO_TEST_CASE(event_loop) { - AvalancheProcessor p; - CScheduler scheduler; + AvalancheProcessor p(g_connman.get()); + CScheduler s; // Starting the event loop. - BOOST_CHECK(p.startEventLoop(scheduler)); + BOOST_CHECK(p.startEventLoop(s)); // There is one task planned in the next hour (our event loop). boost::chrono::system_clock::time_point start, stop; - BOOST_CHECK_EQUAL(scheduler.getQueueInfo(start, stop), 1); + BOOST_CHECK_EQUAL(s.getQueueInfo(start, stop), 1); // Starting twice doesn't start it twice. - BOOST_CHECK(!p.startEventLoop(scheduler)); + BOOST_CHECK(!p.startEventLoop(s)); // Start the scheduler thread. - std::thread schedulerThread( - std::bind(&CScheduler::serviceQueue, &scheduler)); + std::thread schedulerThread(std::bind(&CScheduler::serviceQueue, &s)); // Stop event loop. 
+ BOOST_CHECK(p.stopEventLoop()); // We don't have any task scheduled anymore. - BOOST_CHECK_EQUAL(scheduler.getQueueInfo(start, stop), 0); + BOOST_CHECK_EQUAL(s.getQueueInfo(start, stop), 0); // Can't stop the event loop twice. BOOST_CHECK(!p.stopEventLoop()); // Wait for the scheduler to stop. - scheduler.stop(true); + s.stop(true); schedulerThread.join(); } BOOST_AUTO_TEST_CASE(destructor) { - CScheduler scheduler; + CScheduler s; boost::chrono::system_clock::time_point start, stop; // Start the scheduler thread. - std::thread schedulerThread( - std::bind(&CScheduler::serviceQueue, &scheduler)); + std::thread schedulerThread(std::bind(&CScheduler::serviceQueue, &s)); { - AvalancheProcessor p; - BOOST_CHECK(p.startEventLoop(scheduler)); - BOOST_CHECK_EQUAL(scheduler.getQueueInfo(start, stop), 1); + AvalancheProcessor p(g_connman.get()); + BOOST_CHECK(p.startEventLoop(s)); + BOOST_CHECK_EQUAL(s.getQueueInfo(start, stop), 1); } - // We don't have any task scheduled anymore taht avalanche is destroyed. - BOOST_CHECK_EQUAL(scheduler.getQueueInfo(start, stop), 0); + // We don't have any task scheduled anymore that avalanche is destroyed. + BOOST_CHECK_EQUAL(s.getQueueInfo(start, stop), 0); // Wait for the scheduler to stop. - scheduler.stop(true); + s.stop(true); schedulerThread.join(); } +BOOST_AUTO_TEST_CASE(poll_and_response) { + std::vector finalized; + const Config &config = GetConfig(); + + AvalancheProcessor p(g_connman.get()); + + // There is no node to query. + BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), -1); + + // Create a node that supports avalanche and one that doesn't. + auto oldnode = ConnectNode(config, NODE_NONE, *peerLogic); + auto avanode = ConnectNode(config, NODE_AVALANCHE, *peerLogic); + NodeId avanodeid = avanode->GetId(); + + // It returns the avalanche peer. + BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid); + + // Let's register a block to vote on and do one round of event loop. 
+ static const uint256 blockHash(uint256S( + "abcdef0000000000000000000000000000000000000000000000000000000001")); + CBlockIndex index; + index.phashBlock = &blockHash; + + // Register a block and check it is added to the list of elements to poll. + BOOST_CHECK(p.addBlockToReconciliate(&index)); + auto invs = AvalancheTest::getInvsForNextPoll(p); + BOOST_CHECK_EQUAL(invs.size(), 1); + BOOST_CHECK_EQUAL(invs[0].type, MSG_BLOCK); + BOOST_CHECK(invs[0].hash == blockHash); + + // Trigger a poll on avanode. + AvalancheTest::runEventLoop(p); + + // There is no more suitable peer available, so return nothing. + BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), -1); + + // Respond to the request. + AvalancheResponse resp = {{AvalancheVote(blockHash, 0)}}; + BOOST_CHECK(p.registerVotes(avanode->GetId(), resp, finalized)); + BOOST_CHECK_EQUAL(finalized.size(), 0); + + // Now that avanode fulfilled its request, it is added back to the list of + // queriable nodes. + BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid); + + // Sending a response when not polled fails. + BOOST_CHECK(!p.registerVotes(avanode->GetId(), resp, finalized)); + BOOST_CHECK_EQUAL(finalized.size(), 0); + + // Trigger a poll on avanode. + AvalancheTest::runEventLoop(p); + BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), -1); + + // Sending responses that do not match the request also fails. + // 1. Too many results. + resp = {{AvalancheVote(blockHash, 0), AvalancheVote(blockHash, 0)}}; + AvalancheTest::runEventLoop(p); + BOOST_CHECK(!p.registerVotes(avanode->GetId(), resp, finalized)); + BOOST_CHECK_EQUAL(finalized.size(), 0); + BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid); + + // 2. Not enough results. 
+ resp = {{}}; + AvalancheTest::runEventLoop(p); + BOOST_CHECK(!p.registerVotes(avanode->GetId(), resp, finalized)); + BOOST_CHECK_EQUAL(finalized.size(), 0); + BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid); + + // 3. Do not match the poll. + resp = {{AvalancheVote(uint256(), 0)}}; + AvalancheTest::runEventLoop(p); + BOOST_CHECK(!p.registerVotes(avanode->GetId(), resp, finalized)); + BOOST_CHECK_EQUAL(finalized.size(), 0); + BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid); + + // Proper response gets processed and avanode is available again. + resp = {{AvalancheVote(blockHash, 0)}}; + AvalancheTest::runEventLoop(p); + BOOST_CHECK(p.registerVotes(avanode->GetId(), resp, finalized)); + BOOST_CHECK_EQUAL(finalized.size(), 0); + BOOST_CHECK_EQUAL(AvalancheTest::getSuitableNodeToQuery(p), avanodeid); + + // Making requests for invalid nodes does not work. + BOOST_CHECK(!p.registerVotes(avanode->GetId() + 1234, resp, finalized)); + BOOST_CHECK_EQUAL(finalized.size(), 0); + + CConnmanTest::ClearNodes(); +} + BOOST_AUTO_TEST_SUITE_END()