diff --git a/src/avalanche.h b/src/avalanche.h --- a/src/avalanche.h +++ b/src/avalanche.h @@ -29,11 +29,24 @@ class CScheduler; namespace { +/** + * Whether avalanche is enabled by default (see -enableavalanche). + */ +static const bool AVALANCHE_DEFAULT_ENABLED = false; + /** * Finalization score. */ static const int AVALANCHE_FINALIZATION_SCORE = 128; +/** + * Maximum number of items that can be polled in a single avapoll message. + */ +static const size_t AVALANCHE_MAX_ELEMENT_POLL = 16; +/** + * Default cooldown between two avapoll from a peer, in milliseconds. + */ +static const size_t AVALANCHE_DEFAULT_COOLDOWN = 100; /** * How long before we consider that a query timed out. */ @@ -319,12 +332,8 @@ std::condition_variable cond_running; public: - AvalancheProcessor(CConnman *connmanIn) - : connman(connmanIn), - queryTimeoutDuration( - AVALANCHE_DEFAULT_QUERY_TIMEOUT_DURATION_MILLISECONDS), - round(0), stopRequest(false), running(false) {} - ~AvalancheProcessor() { stopEventLoop(); } + AvalancheProcessor(CConnman *connmanIn); + ~AvalancheProcessor(); void setQueryTimeoutDuration(std::chrono::milliseconds d) { queryTimeoutDuration = d; @@ -334,6 +343,8 @@ bool isAccepted(const CBlockIndex *pindex) const; int getConfidence(const CBlockIndex *pindex) const; + void sendResponse(CNode *pfrom, AvalancheResponse response) const; + bool registerVotes(NodeId nodeid, const AvalancheResponse &response, std::vector &updates); diff --git a/src/avalanche.cpp b/src/avalanche.cpp --- a/src/avalanche.cpp +++ b/src/avalanche.cpp @@ -18,11 +18,6 @@ */ static const int64_t AVALANCHE_TIME_STEP_MILLISECONDS = 10; -/** - * Maximum item count that can be polled at once. - */ -static const size_t AVALANCHE_MAX_ELEMENT_POLL = 4096; - // Unfortunately, the bitcoind codebase is full of global and we are kinda // forced into it here. 
std::unique_ptr g_avalanche; @@ -133,6 +128,16 @@ return true; } +AvalancheProcessor::AvalancheProcessor(CConnman *connmanIn) + : connman(connmanIn), + queryTimeoutDuration( + AVALANCHE_DEFAULT_QUERY_TIMEOUT_DURATION_MILLISECONDS), + round(0), stopRequest(false), running(false) {} + +AvalancheProcessor::~AvalancheProcessor() { + stopEventLoop(); +} + bool AvalancheProcessor::addBlockToReconcile(const CBlockIndex *pindex) { bool isAccepted; @@ -171,6 +176,13 @@ return it->second.getConfidence(); } +void AvalancheProcessor::sendResponse(CNode *pfrom, + AvalancheResponse response) const { + connman->PushMessage( + pfrom, CNetMsgMaker(pfrom->GetSendVersion()) + .Make(NetMsgType::AVARESPONSE, std::move(response))); +} + bool AvalancheProcessor::registerVotes( NodeId nodeid, const AvalancheResponse &response, std::vector &updates) { diff --git a/src/init.cpp b/src/init.cpp --- a/src/init.cpp +++ b/src/init.cpp @@ -1027,6 +1027,17 @@ hidden_args.emplace_back("-daemon"); #endif + // Avalanche options. 
+ gArgs.AddArg( + "-enableavalanche", + strprintf("Enable avalanche (default: %u)", AVALANCHE_DEFAULT_ENABLED), + false, OptionsCategory::AVALANCHE); + gArgs.AddArg( + "-avacooldown", + strprintf("Mandatory cooldown between two avapoll (default: %u)", + AVALANCHE_DEFAULT_COOLDOWN), + false, OptionsCategory::AVALANCHE); + // Add the hidden options gArgs.AddHiddenArgs(hidden_args); } diff --git a/src/net_processing.cpp b/src/net_processing.cpp --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -7,6 +7,7 @@ #include #include +#include <avalanche.h> #include #include #include @@ -422,6 +423,12 @@ TxDownloadState m_tx_download; + struct AvalancheState { + std::chrono::time_point<std::chrono::steady_clock> last_poll; + }; + + AvalancheState m_avalanche_state; + CNodeState(CAddress addrIn, std::string addrNameIn) : address(addrIn), name(addrNameIn) { fCurrentlyConnected = false; @@ -3418,6 +3425,68 @@ return true; } + // Only answer avalanche polls when enabled and not importing/reindexing + if (strCommand == NetMsgType::AVAPOLL && !fImporting && !fReindex && + g_avalanche && + gArgs.GetBoolArg("-enableavalanche", AVALANCHE_DEFAULT_ENABLED)) { + auto now = std::chrono::steady_clock::now(); + int64_t cooldown = + gArgs.GetArg("-avacooldown", AVALANCHE_DEFAULT_COOLDOWN); + + { + LOCK(cs_main); + auto &node_state = State(pfrom->GetId())->m_avalanche_state; + + if (now < + node_state.last_poll + std::chrono::milliseconds(cooldown)) { + Misbehaving(pfrom, 20, "avapoll-cooldown"); + } + + node_state.last_poll = now; + } + + uint64_t round; + Unserialize(vRecv, round); + + unsigned int nCount = ReadCompactSize(vRecv); + if (nCount > AVALANCHE_MAX_ELEMENT_POLL) { + LOCK(cs_main); + Misbehaving(pfrom, 20, "too-many-poll"); + return error("poll message size = %u", nCount); + } + + std::vector<AvalancheVote> votes; + votes.reserve(nCount); + + LogPrint(BCLog::NET, "received avalanche poll from peer=%d\n", + pfrom->GetId()); + + { + LOCK(cs_main); + + for (unsigned int n = 0; n < nCount; n++) { + CInv inv; + vRecv >> inv; + + uint32_t error = -1; + if 
(inv.type == MSG_BLOCK) { + BlockMap::iterator mi = + mapBlockIndex.find(BlockHash(inv.hash)); + if (mi != mapBlockIndex.end()) { + error = ::ChainActive().Contains(mi->second) ? 0 : 1; + } + } + + votes.emplace_back(error, inv.hash); + } + } + + // Send the votes back to the polling node. + g_avalanche->sendResponse( + pfrom, AvalancheResponse(round, cooldown, std::move(votes))); + return true; + } + if (strCommand == NetMsgType::GETADDR) { // This asymmetric behavior for inbound and outbound connections was // introduced to prevent a fingerprinting attack: an attacker can send diff --git a/src/util/system.h b/src/util/system.h --- a/src/util/system.h +++ b/src/util/system.h @@ -133,6 +133,9 @@ // Always the last option to avoid printing these in the help HIDDEN, + + // Avalanche is still experimental, so we keep it hidden for now. + AVALANCHE, }; class ArgsManager { diff --git a/test/functional/abc-p2p-avalanche.py b/test/functional/abc-p2p-avalanche.py new file mode 100755 --- /dev/null +++ b/test/functional/abc-p2p-avalanche.py @@ -0,0 +1,127 @@ +#!/usr/bin/env python3 +# Copyright (c) 2018 The Bitcoin developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. 
+"""Test that a node answers avalanche polls with the expected votes.""" +import random + +from test_framework.test_framework import BitcoinTestFramework +from test_framework.util import assert_equal, wait_until +from test_framework.mininode import P2PInterface, mininode_lock +from test_framework.messages import AvalancheVote, CInv, msg_avapoll + + +BLOCK_ACCEPTED = 0 +BLOCK_REJECTED = 1 +BLOCK_UNKNOWN = -1 + + +class TestNode(P2PInterface): + + def __init__(self): + self.last_avaresponse = None + super().__init__() + + def on_avaresponse(self, message): + self.last_avaresponse = message.response + + def send_poll(self, hashes): + msg = msg_avapoll() + for h in hashes: + msg.poll.invs.append(CInv(2, h)) + self.send_message(msg) + + def wait_for_avaresponse(self, timeout=10): + self.sync_with_ping() + + def test_function(): + m = self.last_message.get("avaresponse") + return m is not None and m != self.last_avaresponse + wait_until(test_function, timeout=timeout, lock=mininode_lock) + + +class AvalancheTest(BitcoinTestFramework): + def set_test_params(self): + self.setup_clean_chain = True + self.num_nodes = 1 + self.extra_args = [['-enableavalanche=1', '-avacooldown=0']] + + def run_test(self): + node = self.nodes[0] + + # Create a fake node and connect it to our real node. + poll_node = TestNode() + node.add_p2p_connection(poll_node) + poll_node.wait_for_verack() + poll_node.sync_with_ping() + + # Generate many blocks and poll for them. 
+ node.generate(100) + + self.log.info("Poll for the chain tip...") + best_block_hash = int(node.getbestblockhash(), 16) + poll_node.send_poll([best_block_hash]) + poll_node.wait_for_avaresponse() + + def assert_response(response, expected): + votes = response.votes + self.log.info("response: {}".format(repr(response))) + assert_equal(len(votes), len(expected)) + for i in range(0, len(votes)): + assert_equal(repr(votes[i]), repr(expected[i])) + + assert_response(poll_node.last_avaresponse, [ + AvalancheVote(BLOCK_ACCEPTED, best_block_hash)]) + + self.log.info("Poll for a selection of blocks...") + various_block_hashes = [ + int(node.getblockhash(0), 16), + int(node.getblockhash(1), 16), + int(node.getblockhash(10), 16), + int(node.getblockhash(25), 16), + int(node.getblockhash(42), 16), + int(node.getblockhash(96), 16), + int(node.getblockhash(99), 16), + int(node.getblockhash(100), 16), + ] + + poll_node.send_poll(various_block_hashes) + poll_node.wait_for_avaresponse() + assert_response(poll_node.last_avaresponse, + [AvalancheVote(BLOCK_ACCEPTED, h) for h in various_block_hashes]) + + self.log.info( + "Poll for a selection of blocks, but some are now invalid...") + invalidated_block = node.getblockhash(75) + node.invalidateblock(invalidated_block) + node.generate(30) + node.reconsiderblock(invalidated_block) + + poll_node.send_poll(various_block_hashes) + poll_node.wait_for_avaresponse() + assert_response(poll_node.last_avaresponse, + [AvalancheVote(BLOCK_ACCEPTED, h) for h in various_block_hashes[:5]] + + [AvalancheVote(BLOCK_REJECTED, h) for h in various_block_hashes[-3:]]) + + self.log.info("Poll for unknown blocks...") + various_block_hashes = [ + int(node.getblockhash(0), 16), + int(node.getblockhash(25), 16), + int(node.getblockhash(42), 16), + various_block_hashes[5], + various_block_hashes[6], + various_block_hashes[7], + random.randrange(1 << 255, (1 << 256) - 1), + random.randrange(1 << 255, (1 << 256) - 1), + random.randrange(1 << 255, (1 << 256) - 1), 
+ ] + poll_node.send_poll(various_block_hashes) + poll_node.wait_for_avaresponse() + assert_response(poll_node.last_avaresponse, + [AvalancheVote(BLOCK_ACCEPTED, h) for h in various_block_hashes[:3]] + + [AvalancheVote(BLOCK_REJECTED, h) for h in various_block_hashes[3:6]] + + [AvalancheVote(BLOCK_UNKNOWN, h) for h in various_block_hashes[-3:]]) + + +if __name__ == '__main__': + AvalancheTest().main() diff --git a/test/functional/test_framework/messages.py b/test/functional/test_framework/messages.py --- a/test/functional/test_framework/messages.py +++ b/test/functional/test_framework/messages.py @@ -52,6 +52,7 @@ NODE_XTHIN = (1 << 4) NODE_BITCOIN_CASH = (1 << 5) NODE_NETWORK_LIMITED = (1 << 10) +NODE_AVALANCHE = (1 << 24) MSG_TX = 1 MSG_BLOCK = 2 @@ -764,6 +765,75 @@ self.blockhash, repr(self.transactions)) +class AvalanchePoll(): + __slots__ = ("round", "invs") + + def __init__(self, round=0, invs=None): + self.round = round + self.invs = invs if invs is not None else [] + + def deserialize(self, f): + self.round = struct.unpack("