diff --git a/src/init.cpp b/src/init.cpp --- a/src/init.cpp +++ b/src/init.cpp @@ -196,12 +196,15 @@ } #endif MapPort(false); + + // Because these depend on each-other, we make sure that neither can be + // using the other before destroying them. UnregisterValidationInterface(peerLogic.get()); + g_connman->Stop(); peerLogic.reset(); g_connman.reset(); StopTorControl(); - UnregisterNodeSignals(GetNodeSignals()); if (fDumpMempoolLater && gArgs.GetArg("-persistmempool", DEFAULT_PERSIST_MEMPOOL)) { DumpMempool(); @@ -1819,7 +1822,6 @@ peerLogic.reset(new PeerLogicValidation(&connman)); RegisterValidationInterface(peerLogic.get()); - RegisterNodeSignals(GetNodeSignals()); if (gArgs.IsArgSet("-onlynet")) { std::set nets; @@ -2227,6 +2229,7 @@ connOptions.nMaxFeeler = 1; connOptions.nBestHeight = chainActive.Height(); connOptions.uiInterface = &uiInterface; + connOptions.m_msgproc = peerLogic.get(); connOptions.nSendBufferMaxSize = 1000 * gArgs.GetArg("-maxsendbuffer", DEFAULT_MAXSENDBUFFER); connOptions.nReceiveFloodSize = diff --git a/src/net.h b/src/net.h --- a/src/net.h +++ b/src/net.h @@ -35,8 +35,6 @@ #include #endif -#include - class CAddrMan; class Config; class CNode; @@ -123,6 +121,7 @@ std::string command; }; +class NetEventsInterface; class CConnman { public: enum NumConnections { @@ -141,6 +140,7 @@ int nMaxFeeler = 0; int nBestHeight = 0; CClientUIInterface *uiInterface = nullptr; + NetEventsInterface *m_msgproc = nullptr; unsigned int nSendBufferMaxSize = 0; unsigned int nReceiveFloodSize = 0; uint64_t nMaxOutboundTimeframe = 0; @@ -160,6 +160,7 @@ nMaxFeeler = connOptions.nMaxFeeler; nBestHeight = connOptions.nBestHeight; clientInterface = connOptions.uiInterface; + m_msgproc = connOptions.m_msgproc; nSendBufferMaxSize = connOptions.nSendBufferMaxSize; nReceiveFloodSize = connOptions.nReceiveFloodSize; nMaxOutboundTimeframe = connOptions.nMaxOutboundTimeframe; @@ -407,6 +408,7 @@ int nMaxFeeler; std::atomic nBestHeight; CClientUIInterface *clientInterface; + NetEventsInterface *m_msgproc; /** SipHasher seeds for deterministic randomness */ const uint64_t nSeed0, nSeed1; @@ -447,23 +449,20 @@ } }; -// Signals for message handling -struct CNodeSignals { - boost::signals2::signal &), - CombinerAll> - ProcessMessages; - boost::signals2::signal &), - CombinerAll> - SendMessages; - boost::signals2::signal - InitializeNode; - boost::signals2::signal FinalizeNode; +/** + * Interface for message handling + */ +class NetEventsInterface { +public: + virtual bool ProcessMessages(const Config &config, CNode *pnode, + std::atomic &interrupt) = 0; + virtual bool SendMessages(const Config &config, CNode *pnode, + std::atomic &interrupt) = 0; + virtual void InitializeNode(const Config &config, CNode *pnode) = 0; + virtual void FinalizeNode(const Config &config, NodeId id, + bool &update_connection_time) = 0; }; -CNodeSignals &GetNodeSignals(); - enum { // unknown LOCAL_NONE, diff --git a/src/net.cpp b/src/net.cpp --- a/src/net.cpp +++ b/src/net.cpp @@ -87,12 +87,6 @@ limitedmap mapAlreadyAskedFor(MAX_INV_SZ); -// Signals for message handling -static CNodeSignals g_signals; -CNodeSignals &GetNodeSignals() { - return g_signals; -} - void CConnman::AddOneShot(const std::string &strDest) { LOCK(cs_vOneShots); vOneShots.push_back(strDest); @@ -1199,8 +1193,7 @@ CalculateKeyedNetGroup(addr), nonce, "", true); pnode->AddRef(); pnode->fWhitelisted = whitelisted; - - GetNodeSignals().InitializeNode(*config, pnode, this); + m_msgproc->InitializeNode(*config, pnode); LogPrint(BCLog::NET, "connection from %s accepted\n", addr.ToString()); @@ -2080,7 +2073,7 @@ pnode->fAddnode = true; } - GetNodeSignals().InitializeNode(*config, pnode, this); + m_msgproc->InitializeNode(*config, pnode); { LOCK(cs_vNodes); vNodes.push_back(pnode); @@ -2108,8 +2101,8 @@ } // Receive messages - bool fMoreNodeWork = GetNodeSignals().ProcessMessages( - *config, pnode, this, flagInterruptMsgProc); + bool fMoreNodeWork = m_msgproc->ProcessMessages( + *config, pnode, flagInterruptMsgProc); fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend); if (flagInterruptMsgProc) { return; @@ -2118,9 +2111,9 @@ // Send messages { LOCK(pnode->cs_sendProcessing); - GetNodeSignals().SendMessages(*config, pnode, this, - flagInterruptMsgProc); + m_msgproc->SendMessages(*config, pnode, flagInterruptMsgProc); } + if (flagInterruptMsgProc) { return; } @@ -2467,6 +2460,7 @@ // // Start threads // + assert(m_msgproc); InterruptSocks5(false); interruptNet.reset(); flagInterruptMsgProc = false; @@ -2607,7 +2601,7 @@ void CConnman::DeleteNode(CNode *pnode) { assert(pnode); bool fUpdateConnectionTime = false; - GetNodeSignals().FinalizeNode(pnode->GetId(), fUpdateConnectionTime); + m_msgproc->FinalizeNode(*config, pnode->GetId(), fUpdateConnectionTime); if (fUpdateConnectionTime) { addrman.Connected(pnode->addr); } diff --git a/src/net_processing.h b/src/net_processing.h --- a/src/net_processing.h +++ b/src/net_processing.h @@ -11,28 +11,28 @@ class Config; -/** Default for -maxorphantx, maximum number of orphan transactions kept in - * memory */ +/** + * Default for -maxorphantx, maximum number of orphan transactions kept in + * memory. + */ static const unsigned int DEFAULT_MAX_ORPHAN_TRANSACTIONS = 100; /** Expiration time for orphan transactions in seconds */ static const int64_t ORPHAN_TX_EXPIRE_TIME = 20 * 60; /** Minimum time between orphan transactions expire time checks in seconds */ static const int64_t ORPHAN_TX_EXPIRE_INTERVAL = 5 * 60; -/** Default number of orphan+recently-replaced txn to keep around for block - * reconstruction */ +/** + * Default number of orphan+recently-replaced txn to keep around for block + * reconstruction. + */ static const unsigned int DEFAULT_BLOCK_RECONSTRUCTION_EXTRA_TXN = 100; -/** Register with a network node to receive its signals */ -void RegisterNodeSignals(CNodeSignals &nodeSignals); -/** Unregister a network node */ -void UnregisterNodeSignals(CNodeSignals &nodeSignals); - -class PeerLogicValidation : public CValidationInterface { +class PeerLogicValidation : public CValidationInterface, + public NetEventsInterface { private: CConnman *connman; public: - PeerLogicValidation(CConnman *connmanIn); + explicit PeerLogicValidation(CConnman *connman); void BlockConnected(const std::shared_ptr &pblock, @@ -45,6 +45,24 @@ const CValidationState &state) override; void NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr &pblock) override; + + void InitializeNode(const Config &config, CNode *pnode) override; + void FinalizeNode(const Config &config, NodeId nodeid, + bool &fUpdateConnectionTime) override; + /** + * Process protocol messages received from a given node. + */ + bool ProcessMessages(const Config &config, CNode *pfrom, + std::atomic &interrupt) override; + /** + * Send queued protocol messages to be sent to a give node. + * + * @param[in] pto The node which we are sending messages to. + * @param[in] interrupt Interrupt condition for processing threads + * @return True if there is more work to be done + */ + bool SendMessages(const Config &config, CNode *pto, + std::atomic &interrupt) override; }; struct CNodeStateStats { @@ -59,18 +77,4 @@ /** Increase a node's misbehavior score. */ void Misbehaving(NodeId nodeid, int howmuch, const std::string &reason); -/** Process protocol messages received from a given node */ -bool ProcessMessages(const Config &config, CNode *pfrom, CConnman *connman, - const std::atomic &interrupt); -/** - * Send queued protocol messages to be sent to a give node. - * - * @param[in] pto The node which we are sending messages to. - * @param[in] connman The connection manager for that node. - * @param[in] interrupt Interrupt condition for processing threads - * @return True if there is more work to be done - */ -bool SendMessages(const Config &config, CNode *pto, CConnman *connman, - const std::atomic &interrupt); - #endif // BITCOIN_NET_PROCESSING_H diff --git a/src/net_processing.cpp b/src/net_processing.cpp --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -134,11 +134,6 @@ std::deque> vRelayExpiration; } // namespace -////////////////////////////////////////////////////////////////////////////// -// -// Registration of network node signals. -// - namespace { struct CBlockReject { @@ -290,70 +285,6 @@ } } -void InitializeNode(const Config &config, CNode *pnode, CConnman *connman) { - CAddress addr = pnode->addr; - std::string addrName = pnode->GetAddrName(); - NodeId nodeid = pnode->GetId(); - { - LOCK(cs_main); - mapNodeState.emplace_hint( - mapNodeState.end(), std::piecewise_construct, - std::forward_as_tuple(nodeid), - std::forward_as_tuple(addr, std::move(addrName))); - } - - if (!pnode->fInbound) { - PushNodeVersion(config, pnode, connman, GetTime()); - } -} - -void FinalizeNode(NodeId nodeid, bool &fUpdateConnectionTime) { - fUpdateConnectionTime = false; - LOCK(cs_main); - CNodeState *state = State(nodeid); - - if (state->fSyncStarted) { - nSyncStarted--; - } - - if (state->nMisbehavior == 0 && state->fCurrentlyConnected) { - fUpdateConnectionTime = true; - } - - for (const QueuedBlock &entry : state->vBlocksInFlight) { - mapBlocksInFlight.erase(entry.hash); - } - // Get rid of stale mapBlockSource entries for this peer as they may leak - // if we don't clean them up (I saw on the order of ~100 stale entries on - // a full resynch in my testing -- these entries stay forever). - // Performance note: most of the time mapBlockSource has 0 or 1 entries. - // During synch of blockchain it may end up with as many as 1000 entries, - // which still only takes ~1ms to iterate through on even old hardware. - // So this memleak cleanup is not expensive and worth doing since even - // small leaks are bad. :) - for (auto it = mapBlockSource.begin(); it != mapBlockSource.end(); /*NA*/) { - if (it->second.first == nodeid) { - mapBlockSource.erase(it++); - } else { - ++it; - } - } - - EraseOrphansFor(nodeid); - nPreferredDownload -= state->fPreferredDownload; - nPeersWithValidatedDownloads -= (state->nBlocksInFlightValidHeaders != 0); - assert(nPeersWithValidatedDownloads >= 0); - - mapNodeState.erase(nodeid); - - if (mapNodeState.empty()) { - // Do a consistency check after the last peer is removed. - assert(mapBlocksInFlight.empty()); - assert(nPreferredDownload == 0); - assert(nPeersWithValidatedDownloads == 0); - } -} - // Requires cs_main. // Returns a bool indicating whether we requested this block. // Also used if a block was /not/ received and timed out or started with another @@ -653,6 +584,56 @@ } // namespace +void PeerLogicValidation::InitializeNode(const Config &config, CNode *pnode) { + CAddress addr = pnode->addr; + std::string addrName = pnode->GetAddrName(); + NodeId nodeid = pnode->GetId(); + { + LOCK(cs_main); + mapNodeState.emplace_hint( + mapNodeState.end(), std::piecewise_construct, + std::forward_as_tuple(nodeid), + std::forward_as_tuple(addr, std::move(addrName))); + } + if (!pnode->fInbound) { + PushNodeVersion(config, pnode, connman, GetTime()); + } +} + +void PeerLogicValidation::FinalizeNode(const Config &config, NodeId nodeid, + bool &fUpdateConnectionTime) { + fUpdateConnectionTime = false; + LOCK(cs_main); + CNodeState *state = State(nodeid); + assert(state != nullptr); + + if (state->fSyncStarted) { + nSyncStarted--; + } + + if (state->nMisbehavior == 0 && state->fCurrentlyConnected) { + fUpdateConnectionTime = true; + } + + for (const QueuedBlock &entry : state->vBlocksInFlight) { + mapBlocksInFlight.erase(entry.hash); + } + EraseOrphansFor(nodeid); + nPreferredDownload -= state->fPreferredDownload; + nPeersWithValidatedDownloads -= (state->nBlocksInFlightValidHeaders != 0); + assert(nPeersWithValidatedDownloads >= 0); + + mapNodeState.erase(nodeid); + + if (mapNodeState.empty()) { + // Do a consistency check after the last peer is removed. + assert(mapBlocksInFlight.empty()); + assert(nPreferredDownload == 0); + assert(nPeersWithValidatedDownloads == 0); + } + LogPrint(BCLog::NET, "Cleared nodestate for peer=%d\n", nodeid); +} + bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats) { LOCK(cs_main); CNodeState *state = State(nodeid); @@ -673,20 +654,6 @@ return true; } -void RegisterNodeSignals(CNodeSignals &nodeSignals) { - nodeSignals.ProcessMessages.connect(&ProcessMessages); - nodeSignals.SendMessages.connect(&SendMessages); - nodeSignals.InitializeNode.connect(&InitializeNode); - nodeSignals.FinalizeNode.connect(&FinalizeNode); -} - -void UnregisterNodeSignals(CNodeSignals &nodeSignals) { - nodeSignals.ProcessMessages.disconnect(&ProcessMessages); - nodeSignals.SendMessages.disconnect(&SendMessages); - nodeSignals.InitializeNode.disconnect(&InitializeNode); - nodeSignals.FinalizeNode.disconnect(&FinalizeNode); -} - ////////////////////////////////////////////////////////////////////////////// // // mapOrphanTransactions @@ -3040,8 +3007,8 @@ return false; } -bool ProcessMessages(const Config &config, CNode *pfrom, CConnman *connman, - const std::atomic &interruptMsgProc) { +bool PeerLogicValidation::ProcessMessages(const Config &config, CNode *pfrom, + std::atomic &interruptMsgProc) { const CChainParams &chainparams = config.GetChainParams(); // // Message format @@ -3196,8 +3163,8 @@ } }; -bool SendMessages(const Config &config, CNode *pto, CConnman *connman, - const std::atomic &interruptMsgProc) { +bool PeerLogicValidation::SendMessages(const Config &config, CNode *pto, + std::atomic &interruptMsgProc) { const Consensus::Params &consensusParams = config.GetChainParams().GetConsensus(); diff --git a/src/test/DoS_tests.cpp b/src/test/DoS_tests.cpp --- a/src/test/DoS_tests.cpp +++ b/src/test/DoS_tests.cpp @@ -52,12 +52,12 @@ CNode dummyNode1(id++, NODE_NETWORK, 0, INVALID_SOCKET, addr1, 0, 0, "", true); dummyNode1.SetSendVersion(PROTOCOL_VERSION); - GetNodeSignals().InitializeNode(config, &dummyNode1, connman); + peerLogic->InitializeNode(config, &dummyNode1); dummyNode1.nVersion = 1; dummyNode1.fSuccessfullyConnected = true; // Should get banned. Misbehaving(dummyNode1.GetId(), 100, ""); - SendMessages(config, &dummyNode1, connman, interruptDummy); + peerLogic->SendMessages(config, &dummyNode1, interruptDummy); BOOST_CHECK(connman->IsBanned(addr1)); // Different IP, not banned. BOOST_CHECK(!connman->IsBanned(ip(0xa0b0c001 | 0x0000ff00))); @@ -66,17 +66,17 @@ CNode dummyNode2(id++, NODE_NETWORK, 0, INVALID_SOCKET, addr2, 1, 1, "", true); dummyNode2.SetSendVersion(PROTOCOL_VERSION); - GetNodeSignals().InitializeNode(config, &dummyNode2, connman); + peerLogic->InitializeNode(config, &dummyNode2); dummyNode2.nVersion = 1; dummyNode2.fSuccessfullyConnected = true; Misbehaving(dummyNode2.GetId(), 50, ""); - SendMessages(config, &dummyNode2, connman, interruptDummy); + peerLogic->SendMessages(config, &dummyNode2, interruptDummy); // 2 not banned yet... BOOST_CHECK(!connman->IsBanned(addr2)); // ... but 1 still should be. BOOST_CHECK(connman->IsBanned(addr1)); Misbehaving(dummyNode2.GetId(), 50, ""); - SendMessages(config, &dummyNode2, connman, interruptDummy); + peerLogic->SendMessages(config, &dummyNode2, interruptDummy); BOOST_CHECK(connman->IsBanned(addr2)); } @@ -91,17 +91,17 @@ CNode dummyNode1(id++, NODE_NETWORK, 0, INVALID_SOCKET, addr1, 3, 1, "", true); dummyNode1.SetSendVersion(PROTOCOL_VERSION); - GetNodeSignals().InitializeNode(config, &dummyNode1, connman); + peerLogic->InitializeNode(config, &dummyNode1); dummyNode1.nVersion = 1; dummyNode1.fSuccessfullyConnected = true; Misbehaving(dummyNode1.GetId(), 100, ""); - SendMessages(config, &dummyNode1, connman, interruptDummy); + peerLogic->SendMessages(config, &dummyNode1, interruptDummy); BOOST_CHECK(!connman->IsBanned(addr1)); Misbehaving(dummyNode1.GetId(), 10, ""); - SendMessages(config, &dummyNode1, connman, interruptDummy); + peerLogic->SendMessages(config, &dummyNode1, interruptDummy); BOOST_CHECK(!connman->IsBanned(addr1)); Misbehaving(dummyNode1.GetId(), 1, ""); - SendMessages(config, &dummyNode1, connman, interruptDummy); + peerLogic->SendMessages(config, &dummyNode1, interruptDummy); BOOST_CHECK(connman->IsBanned(addr1)); gArgs.ForceSetArg("-banscore", std::to_string(DEFAULT_BANSCORE_THRESHOLD)); } @@ -119,12 +119,12 @@ CNode dummyNode(id++, NODE_NETWORK, 0, INVALID_SOCKET, addr, 4, 4, "", true); dummyNode.SetSendVersion(PROTOCOL_VERSION); - GetNodeSignals().InitializeNode(config, &dummyNode, connman); + peerLogic->InitializeNode(config, &dummyNode); dummyNode.nVersion = 1; dummyNode.fSuccessfullyConnected = true; Misbehaving(dummyNode.GetId(), 100, ""); - SendMessages(config, &dummyNode, connman, interruptDummy); + peerLogic->SendMessages(config, &dummyNode, interruptDummy); BOOST_CHECK(connman->IsBanned(addr)); SetMockTime(nStartTime + 60 * 60); diff --git a/src/test/test_bitcoin.h b/src/test/test_bitcoin.h --- a/src/test/test_bitcoin.h +++ b/src/test/test_bitcoin.h @@ -61,11 +61,13 @@ * Included are data directory, coins database, script check threads setup. */ class CConnman; +class PeerLogicValidation; struct TestingSetup : public BasicTestingSetup { CCoinsViewDB *pcoinsdbview; fs::path pathTemp; boost::thread_group threadGroup; CConnman *connman; + std::unique_ptr peerLogic; TestingSetup(const std::string &chainName = CBaseChainParams::MAIN); ~TestingSetup(); diff --git a/src/test/test_bitcoin.cpp b/src/test/test_bitcoin.cpp --- a/src/test/test_bitcoin.cpp +++ b/src/test/test_bitcoin.cpp @@ -64,7 +64,6 @@ BasicTestingSetup::~BasicTestingSetup() { ECC_Stop(); - g_connman.reset(); } TestingSetup::TestingSetup(const std::string &chainName) @@ -101,13 +100,14 @@ // Deterministic randomness for tests. g_connman = std::unique_ptr(new CConnman(config, 0x1337, 0x1337)); connman = g_connman.get(); - RegisterNodeSignals(GetNodeSignals()); + peerLogic.reset(new PeerLogicValidation(connman)); } TestingSetup::~TestingSetup() { - UnregisterNodeSignals(GetNodeSignals()); threadGroup.interrupt_all(); threadGroup.join_all(); + g_connman.reset(); + peerLogic.reset(); UnloadBlockIndex(); delete pcoinsTip; delete pcoinsdbview;