diff --git a/src/net_processing.h b/src/net_processing.h --- a/src/net_processing.h +++ b/src/net_processing.h @@ -120,6 +120,12 @@ void EvictExtraOutboundPeers(int64_t time_in_seconds) EXCLUSIVE_LOCKS_REQUIRED(cs_main); + /** Process a single message from a peer. Public for fuzz testing */ + void ProcessMessage(const Config &config, CNode &pfrom, + const std::string &msg_type, CDataStream &vRecv, + int64_t nTimeReceived, + const std::atomic &interruptMsgProc); + private: //! Next time to check for stale tip int64_t m_stale_tip_check_time; @@ -145,10 +151,4 @@ /** Relay transaction to every node */ void RelayTransaction(const TxId &txid, const CConnman &connman); -void ProcessMessage(const Config &config, CNode &pfrom, - const std::string &msg_type, CDataStream &vRecv, - int64_t nTimeReceived, CTxMemPool &mempool, - ChainstateManager &chainman, CConnman &connman, - BanMan *banman, const std::atomic &interruptMsgProc); - #endif // BITCOIN_NET_PROCESSING_H diff --git a/src/net_processing.cpp b/src/net_processing.cpp --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -2519,11 +2519,10 @@ connman.PushMessage(&peer, std::move(msg)); } -void ProcessMessage(const Config &config, CNode &pfrom, - const std::string &msg_type, CDataStream &vRecv, - int64_t nTimeReceived, CTxMemPool &mempool, - ChainstateManager &chainman, CConnman &connman, - BanMan *banman, const std::atomic &interruptMsgProc) { +void PeerLogicValidation::ProcessMessage( + const Config &config, CNode &pfrom, const std::string &msg_type, + CDataStream &vRecv, int64_t nTimeReceived, + const std::atomic &interruptMsgProc) { const CChainParams &chainparams = config.GetChainParams(); LogPrint(BCLog::NET, "received: %s (%u bytes) peer=%d\n", SanitizeString(msg_type), vRecv.size(), pfrom.GetId()); @@ -2569,7 +2568,7 @@ nSendVersion = std::min(nVersion, PROTOCOL_VERSION); nServices = ServiceFlags(nServiceInt); if (!pfrom.IsInboundConn()) { - connman.SetServices(pfrom.addr, nServices); + m_connman.SetServices(pfrom.addr, nServices); } if (pfrom.ExpectServicesFromConn() && !HasAllDesirableServiceFlags(nServices)) { @@ -2606,7 +2605,7 @@ vRecv >> fRelay; } // Disconnect if we connected to ourself - if (pfrom.IsInboundConn() && !connman.CheckIncomingNonce(nNonce)) { + if (pfrom.IsInboundConn() && !m_connman.CheckIncomingNonce(nNonce)) { LogPrintf("connected to self at %s, disconnecting\n", pfrom.addr.ToString()); pfrom.fDisconnect = true; @@ -2619,10 +2618,10 @@ // Be shy and don't send version until we hear if (pfrom.IsInboundConn()) { - PushNodeVersion(config, pfrom, connman, GetAdjustedTime()); + PushNodeVersion(config, pfrom, m_connman, GetAdjustedTime()); } - connman.PushMessage( + m_connman.PushMessage( &pfrom, CNetMsgMaker(INIT_PROTO_VERSION).Make(NetMsgType::VERACK)); pfrom.nServices = nServices; @@ -2680,10 +2679,10 @@ } // Get recent addresses - connman.PushMessage( + m_connman.PushMessage( &pfrom, CNetMsgMaker(nSendVersion).Make(NetMsgType::GETADDR)); pfrom.fGetAddr = true; - connman.MarkAddressGood(pfrom.addr); + m_connman.MarkAddressGood(pfrom.addr); } std::string remoteAddr; @@ -2750,7 +2749,8 @@ // We send this to non-NODE NETWORK peers as well, because even // non-NODE NETWORK peers can announce blocks (such as pruning // nodes) - connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::SENDHEADERS)); + m_connman.PushMessage(&pfrom, + msgMaker.Make(NetMsgType::SENDHEADERS)); } if (pfrom.nVersion >= SHORT_IDS_BLOCKS_VERSION) { // Tell our peer we are willing to provide version 1 or 2 @@ -2759,9 +2759,10 @@ // as well, because they may wish to request compact blocks from us. bool fAnnounceUsingCMPCTBLOCK = false; uint64_t nCMPCTBLOCKVersion = 1; - connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::SENDCMPCT, - fAnnounceUsingCMPCTBLOCK, - nCMPCTBLOCKVersion)); + m_connman.PushMessage(&pfrom, + msgMaker.Make(NetMsgType::SENDCMPCT, + fAnnounceUsingCMPCTBLOCK, + nCMPCTBLOCKVersion)); } pfrom.fSuccessfullyConnected = true; return; @@ -2812,17 +2813,17 @@ pfrom.AddAddressKnown(addr); // Do not process banned/discouraged addresses beyond remembering we // received them - if (banman->IsDiscouraged(addr)) { + if (m_banman->IsDiscouraged(addr)) { continue; } - if (banman->IsBanned(addr)) { + if (m_banman->IsBanned(addr)) { continue; } bool fReachable = IsReachable(addr); if (addr.nTime > nSince && !pfrom.fGetAddr && vAddr.size() <= 10 && addr.IsRoutable()) { // Relay to a limited number of other nodes - RelayAddress(addr, fReachable, connman); + RelayAddress(addr, fReachable, m_connman); } // Do not store addresses outside our network if (fReachable) { @@ -2830,7 +2831,7 @@ } } - connman.AddNewAddresses(vAddrOk, pfrom.addr, 2 * 60 * 60); + m_connman.AddNewAddresses(vAddrOk, pfrom.addr, 2 * 60 * 60); if (vAddr.size() < 1000) { pfrom.fGetAddr = false; } @@ -2897,7 +2898,7 @@ return; } - bool fAlreadyHave = AlreadyHave(inv, mempool); + bool fAlreadyHave = AlreadyHave(inv, m_mempool); LogPrint(BCLog::NET, "got inv: %s %s peer=%d\n", inv.ToString(), fAlreadyHave ? "have" : "new", pfrom.GetId()); @@ -2913,7 +2914,7 @@ // should get the headers for first, we now only provide a // getheaders response here. When we receive the headers, we // will then ask for the blocks we need. - connman.PushMessage( + m_connman.PushMessage( &pfrom, msgMaker.Make(NetMsgType::GETHEADERS, ::ChainActive().GetLocator( pindexBestHeader), @@ -2962,7 +2963,7 @@ pfrom.vRecvGetData.insert(pfrom.vRecvGetData.end(), vInv.begin(), vInv.end()); - ProcessGetData(config, pfrom, connman, mempool, interruptMsgProc); + ProcessGetData(config, pfrom, m_connman, m_mempool, interruptMsgProc); return; } @@ -3062,7 +3063,7 @@ // Unlock cs_most_recent_block to avoid cs_main lock inversion } if (recent_block) { - SendBlockTransactions(*recent_block, req, pfrom, connman); + SendBlockTransactions(*recent_block, req, pfrom, m_connman); return; } @@ -3101,7 +3102,7 @@ bool ret = ReadBlockFromDisk(block, pindex, chainparams.GetConsensus()); assert(ret); - SendBlockTransactions(block, req, pfrom, connman); + SendBlockTransactions(block, req, pfrom, m_connman); return; } @@ -3180,8 +3181,8 @@ // in the SendMessages logic. nodestate->pindexBestHeaderSent = pindex ? pindex : ::ChainActive().Tip(); - connman.PushMessage(&pfrom, - msgMaker.Make(NetMsgType::HEADERS, vHeaders)); + m_connman.PushMessage(&pfrom, + msgMaker.Make(NetMsgType::HEADERS, vHeaders)); return; } @@ -3214,12 +3215,12 @@ nodestate->m_tx_download.m_tx_in_flight.erase(txid); EraseTxRequest(txid); - if (!AlreadyHave(CInv(MSG_TX, txid), mempool) && - AcceptToMemoryPool(config, mempool, state, ptx, + if (!AlreadyHave(CInv(MSG_TX, txid), m_mempool) && + AcceptToMemoryPool(config, m_mempool, state, ptx, false /* bypass_limits */, Amount::zero() /* nAbsurdFee */)) { - mempool.check(&::ChainstateActive().CoinsTip()); - RelayTransaction(tx.GetId(), connman); + m_mempool.check(&::ChainstateActive().CoinsTip()); + RelayTransaction(tx.GetId(), m_connman); for (size_t i = 0; i < tx.vout.size(); i++) { auto it_by_prev = mapOrphanTransactionsByPrev.find(COutPoint(txid, i)); @@ -3235,12 +3236,13 @@ LogPrint(BCLog::MEMPOOL, "AcceptToMemoryPool: peer=%d: accepted %s " "(poolsz %u txn, %u kB)\n", - pfrom.GetId(), tx.GetId().ToString(), mempool.size(), - mempool.DynamicMemoryUsage() / 1000); + pfrom.GetId(), tx.GetId().ToString(), m_mempool.size(), + m_mempool.DynamicMemoryUsage() / 1000); // Recursively process any orphan transactions that depended on this // one - ProcessOrphanTx(config, connman, mempool, pfrom.orphan_work_set); + ProcessOrphanTx(config, m_connman, m_mempool, + pfrom.orphan_work_set); } else if (state.GetResult() == TxValidationResult::TX_MISSING_INPUTS) { // It may be the case that the orphans parents have all been // rejected. @@ -3258,7 +3260,7 @@ // FIXME: MSG_TX should use a TxHash, not a TxId. const TxId _txid = txin.prevout.GetTxId(); pfrom.AddKnownTx(_txid); - if (!AlreadyHave(CInv(MSG_TX, _txid), mempool)) { + if (!AlreadyHave(CInv(MSG_TX, _txid), m_mempool)) { RequestTx(State(pfrom.GetId()), _txid, current_time); } } @@ -3306,7 +3308,7 @@ } else { LogPrintf("Force relaying tx %s from whitelisted peer=%d\n", tx.GetId().ToString(), pfrom.GetId()); - RelayTransaction(tx.GetId(), connman); + RelayTransaction(tx.GetId(), m_connman); } } } @@ -3358,7 +3360,7 @@ // Doesn't connect (or is genesis), instead of DoSing in // AcceptBlockHeader, request deeper headers if (!::ChainstateActive().IsInitialBlockDownload()) { - connman.PushMessage( + m_connman.PushMessage( &pfrom, msgMaker.Make(NetMsgType::GETHEADERS, ::ChainActive().GetLocator( pindexBestHeader), @@ -3374,8 +3376,8 @@ const CBlockIndex *pindex = nullptr; BlockValidationState state; - if (!chainman.ProcessNewBlockHeaders(config, {cmpctblock.header}, state, - &pindex)) { + if (!m_chainman.ProcessNewBlockHeaders(config, {cmpctblock.header}, + state, &pindex)) { if (state.IsInvalid()) { MaybePunishNodeForBlock(pfrom.GetId(), state, /*via_compact_block*/ true, @@ -3439,7 +3441,7 @@ // normal getdata. std::vector vInv(1); vInv[0] = CInv(MSG_BLOCK, cmpctblock.header.GetHash()); - connman.PushMessage( + m_connman.PushMessage( &pfrom, msgMaker.Make(NetMsgType::GETDATA, vInv)); } return; @@ -3460,7 +3462,7 @@ (fAlreadyInFlight && blockInFlightIt->second.first == pfrom.GetId())) { std::list::iterator *queuedBlockIt = nullptr; - if (!MarkBlockAsInFlight(config, mempool, pfrom.GetId(), + if (!MarkBlockAsInFlight(config, m_mempool, pfrom.GetId(), pindex->GetBlockHash(), chainparams.GetConsensus(), pindex, &queuedBlockIt)) { @@ -3468,7 +3470,7 @@ (*queuedBlockIt) ->partialBlock.reset( new PartiallyDownloadedBlock(config, - &mempool)); + &m_mempool)); } else { // The block was already in flight using compact // blocks from the same peer. @@ -3494,7 +3496,7 @@ // just request it. std::vector vInv(1); vInv[0] = CInv(MSG_BLOCK, cmpctblock.header.GetHash()); - connman.PushMessage( + m_connman.PushMessage( &pfrom, msgMaker.Make(NetMsgType::GETDATA, vInv)); return; } @@ -3514,7 +3516,7 @@ fProcessBLOCKTXN = true; } else { req.blockhash = pindex->GetBlockHash(); - connman.PushMessage( + m_connman.PushMessage( &pfrom, msgMaker.Make(NetMsgType::GETBLOCKTXN, req)); } @@ -3523,7 +3525,7 @@ // peer, or this peer has too many blocks outstanding to // download from. Optimistically try to reconstruct anyway // since we might be able to without any round trips. - PartiallyDownloadedBlock tempBlock(config, &mempool); + PartiallyDownloadedBlock tempBlock(config, &m_mempool); ReadStatus status = tempBlock.InitData(cmpctblock, vExtraTxnForCompact); if (status != READ_STATUS_OK) { @@ -3543,7 +3545,7 @@ // normally. std::vector vInv(1); vInv[0] = CInv(MSG_BLOCK, cmpctblock.header.GetHash()); - connman.PushMessage( + m_connman.PushMessage( &pfrom, msgMaker.Make(NetMsgType::GETDATA, vInv)); return; } else { @@ -3556,8 +3558,7 @@ if (fProcessBLOCKTXN) { return ProcessMessage(config, pfrom, NetMsgType::BLOCKTXN, - blockTxnMsg, nTimeReceived, mempool, chainman, - connman, banman, interruptMsgProc); + blockTxnMsg, nTimeReceived, interruptMsgProc); } if (fRevertToHeaderProcessing) { @@ -3566,8 +3567,8 @@ // disconnect the peer if the header turns out to be for an invalid // block. Note that if a peer tries to build on an invalid chain, // that will be detected and the peer will be banned. - return ProcessHeadersMessage(config, pfrom, connman, mempool, - chainman, {cmpctblock.header}, + return ProcessHeadersMessage(config, pfrom, m_connman, m_mempool, + m_chainman, {cmpctblock.header}, /*via_compact_block=*/true); } @@ -3589,8 +3590,8 @@ // we have a chain with at least nMinimumChainWork), and we ignore // compact blocks with less work than our tip, it is safe to treat // reconstructed compact blocks as having been requested. - chainman.ProcessNewBlock(config, pblock, /*fForceProcessing=*/true, - &fNewBlock); + m_chainman.ProcessNewBlock(config, pblock, + /*fForceProcessing=*/true, &fNewBlock); if (fNewBlock) { pfrom.nLastBlockTime = GetTime(); } else { @@ -3656,8 +3657,8 @@ // Might have collided, fall back to getdata now :( std::vector invs; invs.push_back(CInv(MSG_BLOCK, resp.blockhash)); - connman.PushMessage(&pfrom, - msgMaker.Make(NetMsgType::GETDATA, invs)); + m_connman.PushMessage(&pfrom, + msgMaker.Make(NetMsgType::GETDATA, invs)); } else { // Block is either okay, or possibly we received // READ_STATUS_CHECKBLOCK_FAILED. @@ -3699,8 +3700,8 @@ // disk-space attacks), but this should be safe due to the // protections in the compact block handler -- see related comment // in compact block optimistic reconstruction handling. - chainman.ProcessNewBlock(config, pblock, /*fForceProcessing=*/true, - &fNewBlock); + m_chainman.ProcessNewBlock(config, pblock, + /*fForceProcessing=*/true, &fNewBlock); if (fNewBlock) { pfrom.nLastBlockTime = GetTime(); } else { @@ -3739,8 +3740,8 @@ ReadCompactSize(vRecv); } - return ProcessHeadersMessage(config, pfrom, connman, mempool, chainman, - headers, + return ProcessHeadersMessage(config, pfrom, m_connman, m_mempool, + m_chainman, headers, /*via_compact_block=*/false); } @@ -3777,7 +3778,7 @@ mapBlockSource.emplace(hash, std::make_pair(pfrom.GetId(), true)); } bool fNewBlock = false; - chainman.ProcessNewBlock(config, pblock, forceProcessing, &fNewBlock); + m_chainman.ProcessNewBlock(config, pblock, forceProcessing, &fNewBlock); if (fNewBlock) { pfrom.nLastBlockTime = GetTime(); } else { @@ -3991,10 +3992,10 @@ pfrom.fSentAddr = true; pfrom.vAddrToSend.clear(); - std::vector vAddr = connman.GetAddresses(); + std::vector vAddr = m_connman.GetAddresses(); FastRandomContext insecure_rand; for (const CAddress &addr : vAddr) { - if (!banman->IsDiscouraged(addr) && !banman->IsBanned(addr)) { + if (!m_banman->IsDiscouraged(addr) && !m_banman->IsBanned(addr)) { pfrom.PushAddress(addr, insecure_rand); } } @@ -4014,7 +4015,7 @@ return; } - if (connman.OutboundTargetReached(false) && + if (m_connman.OutboundTargetReached(false) && !pfrom.HasPermission(PF_MEMPOOL)) { if (!pfrom.HasPermission(PF_NOBAN)) { LogPrint(BCLog::NET, @@ -4051,7 +4052,8 @@ // pings: without it, if the remote node sends a ping once per // second and this node takes 5 seconds to respond to each, the 5th // ping the remote sends would appear to return very quickly. - connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::PONG, nonce)); + m_connman.PushMessage(&pfrom, + msgMaker.Make(NetMsgType::PONG, nonce)); } return; } @@ -4186,17 +4188,17 @@ } if (msg_type == NetMsgType::GETCFILTERS) { - ProcessGetCFilters(pfrom, vRecv, chainparams, connman); + ProcessGetCFilters(pfrom, vRecv, chainparams, m_connman); return; } if (msg_type == NetMsgType::GETCFHEADERS) { - ProcessGetCFHeaders(pfrom, vRecv, chainparams, connman); + ProcessGetCFHeaders(pfrom, vRecv, chainparams, m_connman); return; } if (msg_type == NetMsgType::GETCFCHECKPT) { - ProcessGetCFCheckPt(pfrom, vRecv, chainparams, connman); + ProcessGetCFCheckPt(pfrom, vRecv, chainparams, m_connman); return; } @@ -4365,8 +4367,8 @@ } try { - ProcessMessage(config, *pfrom, msg_type, vRecv, msg.m_time, m_mempool, - m_chainman, m_connman, m_banman, interruptMsgProc); + ProcessMessage(config, *pfrom, msg_type, vRecv, msg.m_time, + interruptMsgProc); if (interruptMsgProc) { return false; } diff --git a/src/test/fuzz/process_message.cpp b/src/test/fuzz/process_message.cpp --- a/src/test/fuzz/process_message.cpp +++ b/src/test/fuzz/process_message.cpp @@ -113,11 +113,9 @@ connman.AddTestNode(p2p_node); g_setup->m_node.peer_logic->InitializeNode(config, &p2p_node); try { - ProcessMessage(config, p2p_node, random_message_type, - random_bytes_data_stream, GetTimeMillis(), - *g_setup->m_node.mempool, *g_setup->m_node.chainman, - *g_setup->m_node.connman.get(), - g_setup->m_node.banman.get(), std::atomic{false}); + g_setup->m_node.peer_logic->ProcessMessage( + config, p2p_node, random_message_type, random_bytes_data_stream, + GetTimeMillis(), std::atomic{false}); } catch (const std::ios_base::failure &e) { const std::string exception_message{e.what()}; const auto p =